Published on

在Go语言中使用errgroup

使用场景

  • 有并发处理任务的需求
  • 简化sync.WaitGroup的代码风格,不需要手动Add(), Done()
  • 能控制协程的并发数量setLimit(),不会因为协程并发导致资源耗尽
  • 出错时可以取消其余协程任务的执行(但是不保证除出错任务外其它的任务都能取消掉

特点

  • 可以返回错误,但是只能接收到第一个返回的错误

示例

忽略错误

  • 不需要context
https://github.com/apuppy/go-play/blob/main/demo/error_group.go
package demo

import (
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func ErrGroupWithoutCancel() {
	var g errgroup.Group

	g.Go(func() error {
		time.Sleep(5 * time.Second)
		fmt.Println("exec #1")
		return nil
	})

	g.Go(func() error {
		time.Sleep(10 * time.Second)
		fmt.Println("exec #2")
		return errors.New("failed to exec #2")
	})

	g.Go(func() error {
		time.Sleep(15 * time.Second)
		fmt.Println("exec #3")
		return nil
	})

	if err := g.Wait(); err == nil {
		fmt.Println("exec all")
	} else {
		fmt.Println("failed: ", err)
	}
}

不忽略错误

  • 使用contextWithContext()
  • Go()调用的方法出错时需要返回error
  • 使用select<-ctx.Done()监听错误
https://github.com/apuppy/go-play/blob/main/demo/error_group.go
package demo

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func ErrGroupWithCancel() {
	g, ctx := errgroup.WithContext(context.Background())
	input := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	g.SetLimit(10)
	//g.SetLimit(100) // If you set the limit to 100, seems that the context canceling does not work.

	g.Go(func() error {
		for v := range input {
			func(n int) {
				g.Go(func() error {
					select {
					case <-ctx.Done():
						fmt.Printf("Canceling, goroutine cancel, number: %d, err: %s.\n", n, ctx.Err())
						return nil
					default:
						// simulation error occurred here
						if n == 5 {
							return fmt.Errorf("bigger than 5")
						}
						fmt.Printf("Working, goroutine work, number: %d\n", n)
						// Simulate time-consuming task for a while use time.Sleep()
						// If you comment the time.Sleep line bellow, you will see context canceling seems did not work. But it's not true.
						//  Why: It may meet the real-world scenario because the task executed very fast,
						//       so it's too late to call the ctx.Done() to cancel other goroutines.
						time.Sleep(1 * time.Second)
						return nil
					}
				})
			}(v)
		}
		return nil
	})

	if err := g.Wait(); err != nil {
		fmt.Printf("Failed: %s\n", err)
	} else {
		fmt.Println("Success")
	}
	return
}

NOTE

经测试,无法确保取消操作全部奏效,想想为什么呢?从goroutine队列,时间片分配等方面考虑。

参考