前段时间偶然看到了一个使用 C++11 编写的线程池代码,一个很小巧的头文件,在 Github 收获了 3000 多个 Stars,感觉还是很值得学习的,源码地址在这:C++11 ThreadPool。
基本设计
一个简单的线程池的基本工作流程:启动时预创建多个线程,每个线程都处于阻塞状态,每当有新任务压入任务队列,就从线程池中唤醒一个线程执行任务,执行完毕后该线程继续进入阻塞状态,等待下一次唤醒。总体来说逻辑十分简单,但是在实际操作中,需要考虑任务的适配以及代码的执行效率:
- 我们希望支持多种多种任务类型:函数指针、函数对象、匿名函数;
- 支持多种函数签名;
- 容器的操作要提高效率,能够减少不必要的内存重分配。
这些都是我们需要解决的问题,好在 C++11 已经有足够多的特性,能够在语言层面提供非常简洁的解决方案。
总体结构:
写一个线程池类,对外提供功能
线程池初始化(构造函数)
ThreadPool::ThreadPool(size_t);
添加任务
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;
析构函数
ThreadPool::~ThreadPool()
这里有几个值得注意的地方:
为了支持多种参数,这里使用了可变参数模板 class... Args
;
右值引用和移动语义,C++11 中我们可以引用临时变量(右值)了,临时变量不需要在内存上进行拷贝,提高了传递效率(使用 &&
);
使用 std::future
接收异步任务的执行结果;
返回类型后置 auto xxx -> xxx
,适用于返回类型前置时,编译器暂时无法判断出返回类型的情况,比如使用了模板,但是编译器无法判断出 F(Args...)
的返回值。
设计细节:
线程池总体来说分为三部分:主线程提交任务、线程池消费任务,任务队列将两者进行解耦;
任务队列处于生产和消费的中心,存在多个线程的写操作,因此需要加锁;
使用 std::condition_variable
来控制线程的阻塞与唤醒;
线程任务执行完毕后,重新进入等待状态,因此需要一个死循环来控制,但是也需要退出条件;
使用 template
、std::function
、std::packaged_task
支持多种可调用对象及多种函数签名。
类成员变量:
- 线程池——存储一定数量的线程
std::vector<std::thread> workers
; - 任务队列——存储添加的任务
std::queue<std::function<void()>> tasks
; - 互斥量——控制任务队列写线程的同步
std::mutex queue_mutex
; - 条件变量——线程等待和唤醒的控制
std::condition_variable condition
; - 退出标志——线程池结束标志
bool stop
。
线程池构造函数
构造函数需要实现的功能比较简单,初始化线程池,并使所有线程处于等待状态。
核心部分只有短短几行:
workers.emplace_back(
[this] {
for(;;) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });
if (this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
}
);
单个线程的行为如下:
等待另一个线程通过条件变量的 notify_one
方法唤醒,之后持有锁,从任务队列取出一个任务,由于外围增加了作用域 { }
,出了作用域后锁被释放,在线程中同步执行任务,任务结束后,由于外层的死循环,线程重新进入 wait 状态,等待下一次唤醒。
添加一个任务
向任务队列中添加一个任务,同时唤醒一个等待的线程来执行任务,这里的任务应该是一个可调用对象,但是 C++ 中的可调用对象五花八门,有函数指针、函数对象、匿名函数,有没有统一的方式对它们进行描述和保存呢?C++11 提供了 std::function
和 std::bind
来保存可调用对象,std::function
是一个可调用对象包装器,不同于函数指针,它是类型安全的,std::bind
来自于 boost::bind
,通常用于实现回调,可以将可调用对象的某些参数绑定到已有变量,并返回一个新的可调用对象,配合可变参数模板,可以描述并保存大多数的可调用对象。
此外还需要拿到可调用对象的返回结果,一般来说,我们需要定义一个变量,在任务执行时对其赋值拿到结果,不过 C++11 提供了 std::future
来访问异步操作的结果。
方法的定义:
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)-> std::future<typename std::result_of<F(Args...)>::type>
传入对象的包装:
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
这里使用 std::bind
以及 std::package_task
来保存传入的可调用对象,并使用智能指针管理其生命周期;为何不直接使用 std::function
保存可执行对象呢?主要原因是我们需要拿到任务的执行结果,std::packaged_task
和 std::function
功能类似,但是可以将包装的可调用对象的执行结果传递给一个 std::future
对象,其本身的接收参数是一个可调用对象,因此可以使用 std::bind
将可调用对象及其参数进行绑定。
核心部分:
{
std::unique_lock<std::mutex> lock(queue_mutex);
if (stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task]() { (*task)(); });
}
condition.notify_one();
逻辑十分简单,向任务队列中添加一个可调用对象,并通知线程池,唤醒一个线程来执行任务,锁出了作用域自动释放,不会造成死锁。
析构函数
inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
conditon.notify_all();
for (std::thread &worker: workers)
worker.join();
}
释放所有线程资源
这个线程池代码逻辑十分简单,是一个学习 C++11 的绝好案例,用到的 C++11 特性如下:
std::thread
~ 语言级的线程支持;range for
~ 区间迭代,新的循环表达式写法;auto/decltype/result_of
~ 编译期类型推导;auto xxx -> xxx
~ 返回类型后置;std::condition_variable
~ 条件变量,提供线程同步的支持;std::bind
~ 可以将可调用对象的某些参数绑定到已有变量,并返回一个新的可调用对象;std::function
~ 可调用对象包装器,是类型安全的;std::packaged_task
~ 与std::function
类似,但会将返回结果传递给std::future
对象;std::future
~ 保存异步操作的结果;std::forward
~ 完美转发,参数在传递过程中,保证其本身的左值或右值属性不变;std::move
~ 通过引用折叠、特殊类型推断等规则,将参数转换为右值引用;emplace
和emplace_back
~ 提高数据传递效率,避免不必要的内存拷贝。