- Published on
Go语言的并发模式之pipeline模式
特征
- 多个任务阶段(stage),分阶段以解耦业务流程
- 使用channel接收参数和产出结果,保证数据在不同任务阶段中传递
- 每个阶段的任务可能运行在单独的goroutine之中,以增强并发性
示例
pipeline功能代码
https://github.com/apuppy/go-play/blob/main/demo/concurrency_pattern/pipeline.go
package concurrency_pattern
import (
"fmt"
)
// Pipeline demonstrates the pipeline concurrency pattern. A generator stage
// feeds the integers 1, 2, 3, 4 through multiply(2) -> add(1) -> multiply(2)
// stages connected by unbuffered channels, and each final value is printed on
// its own line (6, 10, 14, 18). Every stage runs in its own goroutine and
// returns early once the shared done channel is closed.
func Pipeline() {
	// generator sends each of the given integers on the returned channel and
	// closes that channel when all values are sent or done is closed.
	generator := func(done <-chan struct{}, integers ...int) <-chan int {
		intStream := make(chan int)
		go func() {
			defer close(intStream)
			for _, i := range integers {
				select {
				case <-done:
					return
				case intStream <- i:
				}
			}
		}()
		return intStream
	}

	// multiply reads values from intStream and sends each one, multiplied by
	// multiplier, on the returned channel. The output channel is closed when
	// the input is exhausted or done is closed.
	multiply := func(done <-chan struct{}, intStream <-chan int, multiplier int) <-chan int {
		multipliedStream := make(chan int)
		go func() {
			defer close(multipliedStream)
			for i := range intStream {
				select {
				case <-done:
					return
				case multipliedStream <- i * multiplier:
				}
			}
		}()
		return multipliedStream
	}

	// add reads values from intStream and sends each one, increased by
	// additive, on the returned channel. The output channel is closed when
	// the input is exhausted or done is closed.
	add := func(done <-chan struct{}, intStream <-chan int, additive int) <-chan int {
		addedStream := make(chan int)
		go func() {
			defer close(addedStream)
			for i := range intStream {
				select {
				case <-done:
					return
				case addedStream <- i + additive:
				}
			}
		}()
		return addedStream
	}

	// done carries no data; closing it is the cancellation signal, so the
	// empty struct type is used for the element.
	done := make(chan struct{})
	defer close(done)

	// Wire the stages together: ((x * 2) + 1) * 2 for every generated x.
	intStream := generator(done, 1, 2, 3, 4)
	pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)

	// Ranging over the last stage drains the pipeline; the loop ends when
	// that stage closes its output channel.
	for v := range pipeline {
		fmt.Println(v)
	}
}
单元测试
https://github.com/apuppy/go-play/blob/main/tests/con_pattern_test.go
package tests
import (
"github.com/apuppy/go-play/demo/concurrency_pattern"
"testing"
)
// TestPatternPipeline is a smoke test: it runs concurrency_pattern.Pipeline
// and passes as long as the call returns without panicking. The values that
// Pipeline prints are not asserted here.
func TestPatternPipeline(t *testing.T) {
concurrency_pattern.Pipeline()
}
运行测试
$ go test -v github.com/apuppy/go-play/tests -run TestPatternPipeline
=== RUN TestPatternPipeline
6
10
14
18
--- PASS: TestPatternPipeline (0.00s)
PASS
ok github.com/apuppy/go-play/tests (cached)
参考
- 《Go语言并发之道》- 第四章 Go语言的并发模式 - pipeline
- 4. Concurrency Patterns in Go | Concurrency in Go