- Published on
Go语言的并发模式之worker pool模式
特征
- 工作者协程(worker)的数量是固定的,以避免无限制的并发
- 任务队列:使用一个channel来传递工作请求
- 结果队列:使用另一个channel来传递工作结果
- 调度功能:确定worker协程数、分发任务、收集结果
示例
worker-pool功能代码
https://github.com/apuppy/go-play/blob/main/demo/concurrency_pattern/worker_pool.go
package concurrency_pattern
import (
"fmt"
"net/http"
"time"
)
type JobTask struct {
ID int
URL string
}
type JobResult struct {
ID int
URL string
Success bool
Status int
}
func WorkerPool() {
// worker协程数,取任务的channel, 取回任务结果的channel
jobNum := 3
jobTaskCh := make(chan JobTask, jobNum)
jobResultCh := make(chan JobResult, jobNum)
// 启动worker
for i := 1; i <= jobNum; i++ {
go worker(i, jobTaskCh, jobResultCh)
}
// 添加任务
taskURLs := []string{
"https://www.google.com",
"https://www.youtube.com",
"https://www.wikipedia.org",
"https://www.github.com",
"https://www.apple.com",
"https://www.x.com",
"https://chatgpt.com/",
}
for i, v := range taskURLs {
jobTaskCh <- JobTask{
ID: i,
URL: v,
}
}
close(jobTaskCh)
for i := 1; i <= len(taskURLs); i++ {
jobResult := <-jobResultCh
fmt.Printf("job result, id: %d, URL: %s, success: %t, status: %d\n",
jobResult.ID, jobResult.URL, jobResult.Success, jobResult.Status)
}
}
func worker(wID int, jobTask <-chan JobTask, jobResult chan<- JobResult) {
for job := range jobTask {
client := http.Client{
Timeout: 3 * time.Second,
}
resp, err := client.Get(job.URL)
result := JobResult{
ID: job.ID,
URL: job.URL,
Success: false,
Status: 0,
}
if err != nil {
result.Success = false
} else {
result.Success = true
result.Status = resp.StatusCode
}
jobResult <- result
fmt.Printf("worker id: %d, job id: %d\n", wID, job.ID)
}
}
单元测试
https://github.com/apuppy/go-play/blob/main/tests/con_pattern_test.go
package tests
import (
"github.com/apuppy/go-play/demo/concurrency_pattern"
"testing"
)
func TestPatternWorkerPool(t *testing.T) {
concurrency_pattern.WorkerPool()
}
运行测试
$ go test -v github.com/apuppy/go-play/tests -run TestPatternWorkerPool
=== RUN TestPatternWorkerPool
worker id: 1, job id: 1
job result, id: 1, URL: https://www.youtube.com, success: true, status: 200
worker id: 3, job id: 0
job result, id: 0, URL: https://www.google.com, success: true, status: 200
worker id: 3, job id: 4
job result, id: 4, URL: https://www.apple.com, success: true, status: 200
worker id: 2, job id: 2
job result, id: 2, URL: https://www.wikipedia.org, success: true, status: 200
worker id: 2, job id: 6
job result, id: 6, URL: https://chatgpt.com/, success: true, status: 403
worker id: 1, job id: 3
job result, id: 3, URL: https://www.github.com, success: true, status: 200
worker id: 3, job id: 5
job result, id: 5, URL: https://www.x.com, success: true, status: 200
--- PASS: TestPatternWorkerPool (2.34s)
PASS
ok github.com/apuppy/go-play/tests (cached)