线程池与队列系统的设计 线程池的设计原理在很多新手的认知中线程池和队列系统是一项非常高深的技术其实也不然当你熟练掌握了多线程编程技术后这一切将会变的很容易你需要克服的是内心的恐惧而已。所谓线程池不过是一组线程而已一般情况下我们需要异步执行一些任务这些任务的产生和执行是存在于我们程序的整个生命周期的与其让操作系统频繁地为我们创建和销毁线程我们通常需要创建一组在我们程序生命周期内不会退出的线程为了不浪费系统资源我们的基本要求是当有任务需要执行时这些线程可以自动拿到任务去执行没有任务时这些线程处于阻塞或者睡眠状态。这里就涉及到这些处理任务的工作线程的唤醒与睡眠如果你理解了上文中介绍的各种线程同步技术相信你现在对如何唤醒和睡眠线程已经很熟悉了。既然在程序生命周期内会产生很多任务那么这些任务必须有一个存放的地方而这个地方就是队列所以不要一提到队列就认为是一个具体的 list它可以是一个全局变量、链表等等。而线程池中的线程从队列中如何取任务则也可以设计得非常灵活如从尾部放入任务从头部取出或者从头部放入从尾部取出等等。而队列也可以根据实际应用设计得”丰富多彩“如可以根据任务得优先级设计多个队列如分为高中低三个级别、分为关键和普通两个级别。这本质上就是生产者消费者模式产生任务的线程是生产者线程池中的线程是消费者。当然这不是绝对的线程池中的线程处理一个任务以后可能会产生一个新的关联任务那么此时这个工作线程又是生产者的角色。既然会有多个线程同时操作这个队列根据多线程程序的原则这个队列我们一般需要对其加锁以避免多线程竞争产生非预期的结果。当然技术上除了要解决线程池的创建、往队列中投递任务、从队列中取任务处理我们还需要做一些善后工作如线程池的清理即如何退出线程池中的工作线程和清理任务队列。这就是线程池和任务队列的核心原理希望读者能认真体会。说了这么多结合前文介绍的具体的实现也变得很容易我们来看一个具体的例子/** * 任务池模型TaskPool.h * zhangyl 2019.02.14 */ #include thread #include mutex #include condition_variable #include list #include vector #include memory #include iostream class Task { public: virtual void doIt() { std::cout handle a task... std::endl; } virtual ~Task() { //为了看到一个task的销毁这里刻意补上其析构函数 std::cout a task destructed... std::endl; } }; class TaskPool final { public: TaskPool(); ~TaskPool(); TaskPool(const TaskPool rhs) delete; TaskPool operator(const TaskPool rhs) delete; public: void init(int threadNum 5); void stop(); void addTask(Task* task); void removeAllTasks(); private: void threadFunc(); private: std::liststd::shared_ptrTask m_taskList; std::mutex m_mutexList; std::condition_variable m_cv; bool m_bRunning; std::vectorstd::shared_ptrstd::thread m_threads; };/** * 任务池模型TaskPool.cpp * zhangyl 2019.02.14 */ #include TaskPool.h TaskPool::TaskPool() : m_bRunning(false) { } TaskPool::~TaskPool() { removeAllTasks(); } void TaskPool::init(int threadNum/* 5*/) { if (threadNum 0) threadNum 5; m_bRunning true; for (int i 0; i threadNum; i) { std::shared_ptrstd::thread spThread; spThread.reset(new std::thread(std::bind(TaskPool::threadFunc, this))); m_threads.push_back(spThread); } } void TaskPool::threadFunc() { std::shared_ptrTask spTask; while (true) { {// 减小m_mutexList锁的范围 std::unique_lockstd::mutex guard(m_mutexList); while (m_taskList.empty()) { if (!m_bRunning) break; //如果获得了互斥锁但是条件不满足的话m_cv.wait()调用会释放锁且挂起当前 //线程因此不往下执行。 //当发生变化后条件满足m_cv.wait() 将唤醒挂起的线程且获得锁。 m_cv.wait(guard); } if (!m_bRunning) break; spTask m_taskList.front(); m_taskList.pop_front(); } if (spTask NULL) continue; spTask-doIt(); spTask.reset(); } std::cout exit thread, threadID: std::this_thread::get_id() std::endl; } void TaskPool::stop() { m_bRunning false; m_cv.notify_all(); //等待所有线程退出 for (auto iter : m_threads) { if (iter-joinable()) iter-join(); } } void TaskPool::addTask(Task* task) { std::shared_ptrTask spTask; spTask.reset(task); { std::lock_guardstd::mutex guard(m_mutexList); m_taskList.push_back(spTask); std::cout add a Task. std::endl; } m_cv.notify_one(); } void TaskPool::removeAllTasks() { { std::lock_guardstd::mutex guard(m_mutexList); for (auto iter : m_taskList) { iter.reset(); } m_taskList.clear(); } }上述代码封装了一个简单的任务队列模型我们可以这么使用这个TaskPool对象#include TaskPool.h #include chrono int main() { TaskPool threadPool; threadPool.init(); Task* task NULL; for (int i 0; i 10; i) { task new Task(); threadPool.addTask(task); } std::this_thread::sleep_for(std::chrono::seconds(5)); threadPool.stop(); return 0; }程序执行结果如下由于退出线程的输出提示不是原子的多个线程并行执行因此上图中这部分的输出出现了”错乱“。上述代码演示了一个基本的多线程队列模型虽然简单但是具有典型性可以应付实际生产中的一部分需求你可以基于这个基础模型进行扩展不管怎么扩展其基本原理都是一样的。例如如果生产者和消费者即产生任务者和处理任务者的速度差不多可以将队列改成环形队列以节省内存空间。另外很多应用为了追求效率利用一些技巧将队列无锁化。这些都是仁者见仁智者见智的扩展了本文不再介绍。不管如何希望读者一定要理解线程池和任务队列的基本设计原理只有这样你才能做更多高级的扩展和设计。消息中间件基于生产者消费者理论模型的队列系统在实际开发中实在是太常用了以至于在一组服务中可能每个进程都需要一个这样的队列系统。既然如此出于复用和解耦的目的业界产生了许多独立的队列系统这些队列系统或以一个独立的进程运行或以支持分布式的一组服务运行。我们把这种独立的队列系统称之为消息中间件。这些消息中间件在功能上做了丰富的扩展如消费的方式、主备切换、容灾容错数据自动备份和过期数据自动清理等等比较典型的有 Kafka、ActiveMQ、RabbitMQ、RocketMQ 等。下图是 Kafka 官网提供的一张介绍 Kafka 作用的图片下图是笔者开发过的一个金融交易系统后台服务拓扑图其大量使用消息中间件 kafka整个交易的流程如下前端通过 HTTP 请求向下单服务请求下单下单服务在校验完数据后会向**消息中间件 A1 **投递一条下单请求成交服务订阅了消息中间件 A1的消息取出下单请求结合自己的成交规则如果可以成交向**消息中间件 A2 **投递一条成交后的消息结算服务订阅了消息中间件A2从其中拿到成交消息后对用户资金账户进行结算结算完成后用户的下单就算正式完成了然后产生一条行情消息投递给消息中间件 A3行情推送服务器从消息中间件 A3中拿到行情消息后推送给所有已经连接的客户端。上述过程中每个消息中间件kafka都有一个生产者和消费者虚线箭头表示短连接实线箭头表示长连接。当然实际的金融交易系统要比这里的模型复杂许多这里为了演示方便做了大量简化。有了这种专门的队列系统生产者和消费者将最大化解耦利用消息中间件提供的对外消息接口生产者只需要负责生产消息它不必关心谁是消费者消费者也不用关心生产者是谁、何时有数据而队列系统本身也不关心自己有多少生产者和消费者。当然这种消息中间件还有其他一些非常优秀的功能如对数据的备份、负载和容灾容错措施。笔者建议学有余力的读者适当地去了解一两种开源的队列系统的使用方法如果掌握其设计思路那就善莫大焉了。