线程池
在其他语言中,可以通过预创建多个线程的方式,降低线程创建和销毁的成本。在这些线程组成的线程池中,每一个线程都处于阻塞状态,每当有新任务压入任务队列,就唤醒一个线程来执行任务,当任务执行完毕,该线程继续进入阻塞状态,等待下一次唤醒,这就是线程池的基本工作原理。
Goroutine 工作池
在 Golang 中,可以通过类似的方式创建资源池。虽然 goroutine 创建和销毁的代价比线程小得多,但是在部分情况下,比如每个 goroutine 中的任务执行时间极短,且有大量 goroutine 频繁的创建和销毁,这个时间成本就会变得不可忽视。很自然的,我们可以想到创建一个 goroutine 工作池。
创建工作池
一提到资源池,可能就会想到任务队列,进而想到带缓冲的通道。但资源池的核心是一定数量的阻塞资源等待被唤醒,阻塞和唤醒,是一种典型的同步工作方式,很符合无缓冲通道的应用场景。
先定义一个接口,只有符合该接口定义的对象,才能使用工作池:
type Worker interface {
Task()
}
定义一个 goroutine 池结构,使用无缓冲通道的同步工作方式,实现工作池:
type Pool struct {
work chan Worker
wg sync.WaitGroup
}
我们可以通过等待 work 通道的方式来实现阻塞等待唤醒机制,这个循环在通道关闭前不会退出:
// 创建工作池对象 p,如果无法从 work 中拿到接口对象,就一直阻塞
for w := range p.work {
w.Task()
}
工作池工厂函数:
func New(maxGoroutines int) *Pool {
p := Pool{
work: make(chan Worker),
}
p.wg.Add(maxGoroutines)
for i := 0; i < maxGoroutines; i++ {
go func() {
// 阻塞状态,一直等待新的任务
for w := range p.work {
w.Task()
}
p.wg.Done()
} ()
}
return &p
}
工厂函数指定需要创建的 goroutine 工作池数量,在循环创建过程中 ,每一个 goroutine 都处于阻塞状态,等待执行 work 通道中的新任务。WaitGroup 对象的值可以用来判断有多少个 goroutine 在工作中,这个阻塞机制可以避免工作池在销毁时,由于部分 goroutine 仍处于执行状态而导致的资源泄漏问题。
使用和销毁
提交任务到工作池中:
func (p *Pool) Run(w Worker) {
p.work <- w
}
关闭工作池:
func (p *Pool) Shutdown() {
close(p.work) // 关闭通道
p.wg.Wait() // 等待所有 goroutine 停止工作
}
调用 close 后,for w := range p.work
循环结束,goroutine 跳出阻塞状态,紧接着调用 WaitGroup.Done()
,将计数减 1,当 WaitGroup 值为 0 时,工作池完全销毁。WaitGroup.Wait()
很关键,保证了所有 goroutine 退出后才销毁工作池。
参考资料:
- Go in action——William Kennedy, Brian Ketelsen, Erik St. Martin