使用 C++11 特性写一个线程池

C++
2020年3月14日 20:19

前段时间偶然看到了一个使用 C++11 编写的线程池代码,一个很小巧的头文件,在 Github 收获了 3000 多个 Stars,感觉还是很值得学习的,源码地址在这:C++11 ThreadPool

基本设计

一个简单的线程池的基本工作流程:启动时预创建多个线程,每个线程都处于阻塞状态,每当有新任务压入任务队列,就从线程池中唤醒一个线程执行任务,执行完毕后该线程继续进入阻塞状态,等待下一次唤醒。总体来说逻辑十分简单,但是在实际操作中,需要考虑任务的适配以及代码的执行效率:

  • 我们希望支持多种多种任务类型:函数指针、函数对象、匿名函数;
  • 支持多种函数签名;
  • 容器的操作要提高效率,能够减少不必要的内存重分配。

这些都是我们需要解决的问题,好在 C++11 已经有足够多的特性,能够在语言层面提供非常简洁的解决方案。

总体结构:

写一个线程池类,对外提供功能

线程池初始化(构造函数)
ThreadPool::ThreadPool(size_t);
添加任务
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;
析构函数
ThreadPool::~ThreadPool()

这里有几个值得注意的地方:
为了支持多种参数,这里使用了可变参数模板 class... Args
右值引用和移动语义,C++11 中我们可以引用临时变量(右值)了,临时变量不需要在内存上进行拷贝,提高了传递效率(使用 &&);
使用 std::future 接收异步任务的执行结果;
返回类型后置 auto xxx -> xxx,适用于返回类型前置时,编译器暂时无法判断出返回类型的情况,比如使用了模板,但是编译器无法判断出 F(Args...) 的返回值。

设计细节:

线程池总体来说分为三部分:主线程提交任务、线程池消费任务,任务队列将两者进行解耦;
任务队列处于生产和消费的中心,存在多个线程的写操作,因此需要加锁;
使用 std::condition_variable 来控制线程的阻塞与唤醒;
线程任务执行完毕后,重新进入等待状态,因此需要一个死循环来控制,但是也需要退出条件;
使用 templatestd::functionstd::packaged_task 支持多种可调用对象及多种函数签名。

类成员变量:
  • 线程池——存储一定数量的线程 std::vector<std::thread> workers
  • 任务队列——存储添加的任务 std::queue<std::function<void()>> tasks
  • 互斥量——控制任务队列写线程的同步 std::mutex queue_mutex
  • 条件变量——线程等待和唤醒的控制 std::condition_variable condition
  • 退出标志——线程池结束标志 bool stop
线程池构造函数

构造函数需要实现的功能比较简单,初始化线程池,并使所有线程处于等待状态。

核心部分只有短短几行:

workers.emplace_back(
    [this] {
        for(;;) {
            std::function<void()> task;
             {
                std::unique_lock<std::mutex> lock(this->queue_mutex);
                this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });
                if (this->stop && this->tasks.empty())
                    return;
                task = std::move(this->tasks.front());
                this->tasks.pop();
            }
            task();
        }
    }
);

单个线程的行为如下:
等待另一个线程通过条件变量的 notify_one 方法唤醒,之后持有锁,从任务队列取出一个任务,由于外围增加了作用域 { },出了作用域后锁被释放,在线程中同步执行任务,任务结束后,由于外层的死循环,线程重新进入 wait 状态,等待下一次唤醒。

添加一个任务

向任务队列中添加一个任务,同时唤醒一个等待的线程来执行任务,这里的任务应该是一个可调用对象,但是 C++ 中的可调用对象五花八门,有函数指针、函数对象、匿名函数,有没有统一的方式对它们进行描述和保存呢?C++11 提供了 std::functionstd::bind 来保存可调用对象,std::function 是一个可调用对象包装器,不同于函数指针,它是类型安全的,std::bind 来自于 boost::bind,通常用于实现回调,可以将可调用对象的某些参数绑定到已有变量,并返回一个新的可调用对象,配合可变参数模板,可以描述并保存大多数的可调用对象。
此外还需要拿到可调用对象的返回结果,一般来说,我们需要定义一个变量,在任务执行时对其赋值拿到结果,不过 C++11 提供了 std::future 来访问异步操作的结果。

方法的定义:

template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)-> std::future<typename std::result_of<F(Args...)>::type>

传入对象的包装:

using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
    std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();

这里使用 std::bind 以及 std::package_task 来保存传入的可调用对象,并使用智能指针管理其生命周期;为何不直接使用 std::function 保存可执行对象呢?主要原因是我们需要拿到任务的执行结果,std::packaged_taskstd::function 功能类似,但是可以将包装的可调用对象的执行结果传递给一个 std::future 对象,其本身的接收参数是一个可调用对象,因此可以使用 std::bind 将可调用对象及其参数进行绑定。

核心部分:

{
    std::unique_lock<std::mutex> lock(queue_mutex);
    if (stop)
        throw std::runtime_error("enqueue on stopped ThreadPool");
    tasks.emplace([task]() { (*task)(); });
}
condition.notify_one();

逻辑十分简单,向任务队列中添加一个可调用对象,并通知线程池,唤醒一个线程来执行任务,锁出了作用域自动释放,不会造成死锁。

析构函数
inline ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    conditon.notify_all();
    for (std::thread &worker: workers)
        worker.join();
}

释放所有线程资源


这个线程池代码逻辑十分简单,是一个学习 C++11 的绝好案例,用到的 C++11 特性如下:

  • std::thread ~ 语言级的线程支持;
  • range for ~ 区间迭代,新的循环表达式写法;
  • auto/decltype/result_of ~ 编译期类型推导;
  • auto xxx -> xxx ~ 返回类型后置;
  • std::condition_variable ~ 条件变量,提供线程同步的支持;
  • std::bind ~ 可以将可调用对象的某些参数绑定到已有变量,并返回一个新的可调用对象;
  • std::function ~ 可调用对象包装器,是类型安全的;
  • std::packaged_task ~ 与 std::function 类似,但会将返回结果传递给 std::future 对象;
  • std::future ~ 保存异步操作的结果;
  • std::forward ~ 完美转发,参数在传递过程中,保证其本身的左值或右值属性不变;
  • std::move ~ 通过引用折叠、特殊类型推断等规则,将参数转换为右值引用;
  • emplaceemplace_back ~ 提高数据传递效率,避免不必要的内存拷贝。