Goroutine 工作池

2020年12月5日 18:14

线程池

在其他语言中,可以通过预创建多个线程的方式,降低线程创建和销毁的成本。在这些线程组成的线程池中,每一个线程都处于阻塞状态,每当有新任务压入任务队列,就唤醒一个线程来执行任务,当任务执行完毕,该线程继续进入阻塞状态,等待下一次唤醒,这就是线程池的基本工作原理。

Goroutine 工作池

在 Golang 中,可以通过类似的方式创建资源池。虽然 goroutine 创建和销毁的代价比线程小得多,但是在部分情况下,比如每个 goroutine 中的任务执行时间极短,且有大量 goroutine 频繁的创建和销毁,这个时间成本就会变得不可忽视。很自然的,我们可以想到创建一个 goroutine 工作池。

创建工作池

一提到资源池,可能就会想到任务队列,进而想到带缓冲的通道。但资源池的核心是一定数量的阻塞资源等待被唤醒,阻塞和唤醒,是一种典型的同步工作方式,很符合无缓冲通道的应用场景。

先定义一个接口,只有符合该接口定义的对象,才能使用工作池:

type Worker interface {
    Task()
}

定义一个 goroutine 池结构,使用无缓冲通道的同步工作方式,实现工作池:

type Pool struct {
    work chan Worker
    wg sync.WaitGroup
}

我们可以通过等待 work 通道的方式来实现阻塞等待唤醒机制,这个循环在通道关闭前不会退出:

// 创建工作池对象 p,如果无法从 work 中拿到接口对象,就一直阻塞
for w := range p.work {
    w.Task()
}

工作池工厂函数:

func New(maxGoroutines int) *Pool {
    p := Pool{
        work: make(chan Worker),
    }
    p.wg.Add(maxGoroutines)
    for i := 0; i < maxGoroutines; i++ {
        go func() {
            // 阻塞状态,一直等待新的任务
            for w := range p.work {
                w.Task()
            }
            p.wg.Done()
        } ()
    }
    return &p
}

工厂函数指定需要创建的 goroutine 工作池数量,在循环创建过程中 ,每一个 goroutine 都处于阻塞状态,等待执行 work 通道中的新任务。WaitGroup 对象的值可以用来判断有多少个 goroutine 在工作中,这个阻塞机制可以避免工作池在销毁时,由于部分 goroutine 仍处于执行状态而导致的资源泄漏问题。

使用和销毁

提交任务到工作池中:

func (p *Pool) Run(w Worker) {
    p.work <- w
}

关闭工作池:

func (p *Pool) Shutdown() {
    close(p.work)    // 关闭通道
    p.wg.Wait()    // 等待所有 goroutine 停止工作
}

调用 close 后,for w := range p.work 循环结束,goroutine 跳出阻塞状态,紧接着调用 WaitGroup.Done(),将计数减 1,当 WaitGroup 值为 0 时,工作池完全销毁。WaitGroup.Wait() 很关键,保证了所有 goroutine 退出后才销毁工作池。


参考资料:

  1. Go in action——William Kennedy, Brian Ketelsen, Erik St. Martin