Go编程模式:Pipeline

Go编程模式:Pipeline

本篇文章,我们着重介绍Go编程中的Pipeline模式。对于Pipeline用过Unix/Linux命令行的人都不会陌生,他是一种把各种命令拼接起来完成一个更强功能的技术方法。在今天,流式处理,函数式编程,以及应用网关对微服务进行简单的API编排,其实都是受pipeline这种技术方式的影响,Pipeline这种技术在可以很容易的把代码按单一职责的原则拆分成多个高内聚低耦合的小模块,然后可以很方便地拼装起来去完成比较复杂的功能。

本文是全系列中第8 / 10篇: Go编程模式

HTTP 处理

这种Pipeline的模式,我们在《 Go编程模式:修饰器 》中有过一个示例,我们在这里再重温一下。在那篇文章中,我们有一堆如 WithServerHead() WithBasicAuth() WithDebugLog() 这样的小功能代码,在我们需要实现某个HTTP API 的时候,我们就可以很容易的组织起来。

原来的代码是下面这个样子:

http.HandleFunc("/v1/hello", WithServerHeader(WithAuthCookie(hello)))
http.HandleFunc("/v2/hello", WithServerHeader(WithBasicAuth(hello)))
http.HandleFunc("/v3/hello", WithServerHeader(WithBasicAuth(WithDebugLog(hello))))

通过一个代理函数:

type HttpHandlerDecorator func(http.HandlerFunc) http.HandlerFunc
func Handler(h http.HandlerFunc, decors ...HttpHandlerDecorator) http.HandlerFunc {
    for i := range decors {
        d := decors[len(decors)-1-i] // iterate in reverse
        h = d(h)
    }
    return h
}

我们就可以移除不断的嵌套像下面这样使用了:

http.HandleFunc("/v4/hello", Handler(hello,
                WithServerHeader, WithBasicAuth, WithDebugLog))

Channel 管理

当然,如果你要写出一个 泛型的pipeline框架 并不容易,而使用 Go Generation ,但是,我们别忘了Go语言最具特色的 Go Routine 和 Channel 这两个神器完全也可以被我们用来构造这种编程。

Rob Pike在 Go Concurrency Patterns: Pipelines and cancellation 这篇blog中介绍了如下的一种编程模式。

Channel转发函数

首先,我们需一个 echo() 函数,其会把一个整型数组放到一个Channel中,并返回这个Channel

func echo(nums []int) <-chan int {
  out := make(chan int)
  go func() {
    for _, n := range nums {
      out <- n
    }
    close(out)
  }()
  return out
}

然后,我们依照这个模式,我们可以写下这个函数。

平方函数
func sq(in <-chan int) <-chan int {
  out := make(chan int)
  go func() {
    for n := range in {
      out <- n * n
    }
    close(out)
  }()
  return out
}
过滤奇数函数
func odd(in <-chan int) <-chan int {
  out := make(chan int)
  go func() {
    for n := range in {
      if n%2 != 0 {
        out <- n
      }
    }
    close(out)
  }()
  return out
}
求和函数
func sum(in <-chan int) <-chan int {
  out := make(chan int)
  go func() {
    var sum = 0
    for n := range in {
      sum += n
    }
    out <- sum
    close(out)
  }()
  return out
}

然后,我们的用户端的代码如下所示:(注: 你可能会觉得, sum() odd() sq() 太过于相似。你其实可以通过我们之前的 Map/Reduce编程模式 或是 Go Generation的方式 来合并一下

var nums = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
for n := range sum(sq(odd(echo(nums)))) {
  fmt.Println(n)
}

上面的代码类似于我们执行了Unix/Linux命令: echo $nums | sq | sum

同样,如果你不想有那么多的函数嵌套,你可以使用一个代理函数来完成。

type EchoFunc func ([]int) (<- chan int) 
type PipeFunc func (<- chan int) (<- chan int) 

func pipeline(nums []int, echo EchoFunc, pipeFns ... PipeFunc) <- chan int {
  ch  := echo(nums)
  for i := range pipeFns {
    ch = pipeFns[i](ch)
  }
  return ch
}

然后,就可以这样做了:

var nums = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}    
for n := range pipeline(nums, gen, odd, sq, sum) {
    fmt.Println(n)
  }

Fan in/Out

动用Go语言的 Go Routine和 Channel还有一个好处,就是可以写出1对多,或多对1的pipeline,也就是Fan In/ Fan Out。下面,我们来看一个Fan in的示例:

我们想通过并发的方式来对一个很长的数组中的质数进行求和运算,我们想先把数组分段求和,然后再把其集中起来。

下面是我们的主函数:

func makeRange(min, max int) []int {
  a := make([]int, max-min+1)
  for i := range a {
    a[i] = min + i
  }
  return a
}

func main() {
  nums := makeRange(1, 10000)
  in := echo(nums)

  const nProcess = 5
  var chans [nProcess]<-chan int
  for i := range chans {
    chans[i] = sum(prime(in))
  }

  for n := range sum(merge(chans[:])) {
    fmt.Println(n)
  }
}

再看我们的 prime() 函数的实现 :

func is_prime(value int) bool {
  for i := 2; i <= int(math.Floor(float64(value) / 2)); i++ {
    if value%i == 0 {
      return false
    }
  }
  return value > 1
}

func prime(in <-chan int) <-chan int {
  out := make(chan int)
  go func ()  {
    for n := range in {
      if is_prime(n) {
        out <- n
      }
    }
    close(out)
  }()
  return out
}

我们可以看到,

  • 我们先制造了从1到10000的一个数组,
  • 然后,把这堆数组全部 echo 到一个channel里 – in
  • 此时,生成 5 个 Channel,然后都调用 sum(prime(in)) ,于是每个Sum的Go Routine都会开始计算和
  • 最后再把所有的结果再求和拼起来,得到最终的结果。

其中的merge代码如下:

func merge(cs []<-chan int) <-chan int {
  var wg sync.WaitGroup
  out := make(chan int)

  wg.Add(len(cs))
  for _, c := range cs {
    go func(c <-chan int) {
      for n := range c {
        out <- n
      }
      wg.Done()
    }(c)
  }
  go func() {
    wg.Wait()
    close(out)
  }()
  return out
}

用图片表示一下,整个程序的结构如下所示:

延伸阅读

如果你还想了解更多的这样的与并发相关的技术,可以参看下面这些资源:

(全文完)

(转载本站文章请注明作者和出处 酷 壳 – CoolShell ,请勿用于任何商业用途)

好烂啊 有点差 凑合看看 还不错 很精彩 ( 38 人打了分,平均分: 3.92 )
Loading...

Go编程模式:Pipeline 》的相关评论

  1. fan in/out一节的例子,虽然中间分成5个channel和goroutine,但是最后又merge成一个channel。真正处理数据只会有一个goroutine,其余4个都是阻塞住的,没有并发起来。不知我理解的对不对?

      1. merge就是需要待所有的channel都处理完成了。但是计算的事都并行完了,所以,在计算上是并发的,在merge上并不是。但真正耗时的是计算而不是merge,所以,并发是有用的。

        另,使不使用buffered channel都差不多,无非就是阻塞在merge还是merge后而已。

  2. 耗子哥,可以打招聘广告么……不行您就删了哈

    如果对耗子哥这几篇go编程模式都掌握的炉火纯青,热爱并熟悉函数式,热爱并熟悉函数式,热爱并熟悉函数式,又在寻找新的工作机会的同学。欢迎私聊 wx: starc_mo 坐标深圳,海外电商 SaaS 赛道,薪酬可与 BAT 竞争,寻找有野心的技术人入局。

  3. merge时是否使用buffered chan在这个demo里没多大区别。因为merge是将5个no buffered chan合并到一个no buffered chan,并最终使用sum消费这个chan。瓶颈在最后一步的sum操作。

  4. 第一次调用sum时启动了5个goroutine,然后在merge中等待5个goroutine全部完成。

    第二次调用sum就没有用到并发了。而且第二次的goroutine编号,与第一次的没有任何重叠。

    虽然是pipeline,其实每次调用都用了不同的goroutine,这和多线程编程是不一样的。

  5. 请问is_prime中的实现,int(math.Floor(float64(value)/2))为什么要这么写啊?我自己验证发现效果和value/2一样。

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注