Published on

Go语言的并发模式之pipeline模式

特征

  • 多个任务阶段(stage),分阶段以解耦业务流程
  • 使用channel接收参数和产出结果,保证数据在不同任务阶段中传递
  • 每个阶段的任务可能运行在单独的goroutine之中,以增强并发性

示例

worker-pool功能代码

https://github.com/apuppy/go-play/blob/main/demo/concurrency_pattern/pipeline.go
package concurrency_pattern

import (
	"fmt"
)

func Pipeline() {
	// generator, generate some int values that can be retrieved through a channel by other goroutines.
	generator := func(done <-chan interface{}, integers ...int) <-chan int {
		intStream := make(chan int)
		go func() {
			defer close(intStream)
			for _, i := range integers {
				select {
				case <-done:
					return
				case intStream <- i:
				}
			}
		}()
		return intStream
	}

	// multiply stage, accept an input stream and push multiplied value an output stream
	multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int {
		multipliedStream := make(chan int)
		go func() {
			defer close(multipliedStream)
			for i := range intStream {
				select {
				case <-done:
					return
				case multipliedStream <- i*multiplier:
				}
			}
		}()
		return multipliedStream
	}

	// add stage, accept an input stream and push added value an output stream
	add := func(done <-chan interface{}, intStream <-chan int, additive int) <-chan int {
		addedStream := make(chan int)
		go func(){
			defer close(addedStream)
			for i := range intStream {
				select {
				case <-done:
					return
				case addedStream <- i + additive:
				}
			}
		}()
		return addedStream
	}


	// invoke the pipeline process
	done := make(chan interface{})
	defer close(done)

	intStream := generator(done, 1,2,3,4)
	pipeline := multiply(done, add(done, multiply(done,intStream,2),1), 2)
	for v := range pipeline {
		fmt.Println(v)
	}
}

单元测试

https://github.com/apuppy/go-play/blob/main/tests/con_pattern_test.go
package tests

import (
	"github.com/apuppy/go-play/demo/concurrency_pattern"
	"testing"
)

func TestPatternPipeline(t *testing.T) {
	concurrency_pattern.Pipeline()
}

运行测试

$ go test -v github.com/apuppy/go-play/tests -run TestPatternPipeline
=== RUN   TestPatternPipeline
6
10
14
18
--- PASS: TestPatternPipeline (0.00s)
PASS
ok  	github.com/apuppy/go-play/tests	(cached)

参考