Published on

Go语言的并发模式之worker pool模式

特征

  • 工作者协程(worker)的数量是固定的,以避免无限制的并发
  • 任务队列:使用一个channel来传递工作请求
  • 结果队列:使用另一个channel来传递工作结果
  • 调度功能:确定worker协程数分发任务收集结果

示例

worker-pool功能代码

https://github.com/apuppy/go-play/blob/main/demo/concurrency_pattern/worker_pool.go
package concurrency_pattern

import (
	"fmt"
	"net/http"
	"time"
)

type JobTask struct {
	ID  int
	URL string
}

type JobResult struct {
	ID      int
	URL     string
	Success bool
	Status  int
}

func WorkerPool() {

	// worker协程数,取任务的channel, 取回任务结果的channel
	jobNum := 3
	jobTaskCh := make(chan JobTask, jobNum)
	jobResultCh := make(chan JobResult, jobNum)

	// 启动worker
	for i := 1; i <= jobNum; i++ {
		go worker(i, jobTaskCh, jobResultCh)
	}

	// 添加任务
	taskURLs := []string{
		"https://www.google.com",
		"https://www.youtube.com",
		"https://www.wikipedia.org",
		"https://www.github.com",
		"https://www.apple.com",
		"https://www.x.com",
		"https://chatgpt.com/",
	}

	for i, v := range taskURLs {
		jobTaskCh <- JobTask{
			ID:  i,
			URL: v,
		}
	}
	close(jobTaskCh)

	for i := 1; i <= len(taskURLs); i++ {
		jobResult := <-jobResultCh
		fmt.Printf("job result, id: %d, URL: %s, success: %t, status: %d\n",
			jobResult.ID, jobResult.URL, jobResult.Success, jobResult.Status)
	}
}

func worker(wID int, jobTask <-chan JobTask, jobResult chan<- JobResult) {
	for job := range jobTask {
		client := http.Client{
			Timeout: 3 * time.Second,
		}
		resp, err := client.Get(job.URL)
		result := JobResult{
			ID:      job.ID,
			URL:     job.URL,
			Success: false,
			Status:  0,
		}
		if err != nil {
			result.Success = false
		} else {
			result.Success = true
			result.Status = resp.StatusCode
		}

		jobResult <- result
		fmt.Printf("worker id: %d, job id: %d\n", wID, job.ID)
	}
}

单元测试

https://github.com/apuppy/go-play/blob/main/tests/con_pattern_test.go
package tests

import (
	"github.com/apuppy/go-play/demo/concurrency_pattern"
	"testing"
)

func TestPatternWorkerPool(t *testing.T) {
	concurrency_pattern.WorkerPool()
}

运行测试

$ go test -v github.com/apuppy/go-play/tests -run TestPatternWorkerPool
=== RUN   TestPatternWorkerPool
worker id: 1, job id: 1
job result, id: 1, URL: https://www.youtube.com, success: true, status: 200
worker id: 3, job id: 0
job result, id: 0, URL: https://www.google.com, success: true, status: 200
worker id: 3, job id: 4
job result, id: 4, URL: https://www.apple.com, success: true, status: 200
worker id: 2, job id: 2
job result, id: 2, URL: https://www.wikipedia.org, success: true, status: 200
worker id: 2, job id: 6
job result, id: 6, URL: https://chatgpt.com/, success: true, status: 403
worker id: 1, job id: 3
job result, id: 3, URL: https://www.github.com, success: true, status: 200
worker id: 3, job id: 5
job result, id: 5, URL: https://www.x.com, success: true, status: 200
--- PASS: TestPatternWorkerPool (2.34s)
PASS
ok  	github.com/apuppy/go-play/tests	(cached)

参考