muduo库-ThreadPool类
线程池模型
模型图如下:
这个是通用线程池,双端队列存放的是多个可调用对象(即用户任务),而非函数指针,因此可以通过std::bind
配接器传参。双端队列queue_
,有时也称为工作队列。
工作原理:首先创建并启动一组线程,称为线程池threads_
,由用户指定其大小maxQueueSize_
,每个元素对对应一个线程。每个线程函数都是一样的,在其中会运行一个loop循环:从双端队列取出一个任务对象task,如果非空,就执行之,如此往复。
当有一个用户线程想要通过线程池运行一个用户任务时,就可以将用户任务函数及参数封装成一个可调用对象Task
f,然后通过线程池接口,将f加入双端队列末尾。当线程池有线程空闲时(未执行用户任务),就会从双端队列头部取出一个Task对象task,然后执行之。
ThreadPool结构
线程池的构造
1 2 3 4 5 6 7 8 9
| ThreadPool::ThreadPool(const string &nameArg) : mutex_(), notEmpty_(mutex_), notFull_(mutex_), name_(nameArg), maxQueueSize_(0), running_(false) { }
|
用户可以指定线程池名称,默认为"ThreadPool",便于调试跟踪,日志诊断问题;值得注意的是工作队列最大大小maxQueueSize_初值0,用户可通过setMaxQueueSize修改其大小;
启动与停止
用户可通过start()
启动线程池,需要指定线程组中子线程数量,一旦创建成功后,各子线程就会投入运行,直到调用stop()
停止线程池运行。
由于Thread
已内含一个门阀,会让调用线程等待新线程函数启动,因此,这里不必再设置门阀等待线程池中线程的启动。相反,如果有子线程运行所需要的数据,就需要在创建之前就准备好,比如running_
,要在线程循环前就设置为true
,否则子线程loop不会运行,而是直接退出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| void ThreadPool::start(int numThreads) { assert(threads_.empty()); running_ = true; threads_.reserve(static_cast<size_t>(numThreads)); for (int i = 0; i < numThreads; ++i) { char id[32]; snprintf(id, sizeof(id), "%d", i + 1); threads_.emplace_back(new muduo::Thread( std::bind(&ThreadPool::runInThread, this), name_ + id)); threads_[i]->start(); } if (numThreads == 0 && threadInitCallback_) { threadInitCallback_(); } }
void ThreadPool::stop() { { MutexLockGuard lock(mutex_); running_ = false; notEmpty_.notifyAll(); notFull_.notifyAll(); } for (auto& thr : threads_) { thr->join(); } }
|
为什么start()
中不加锁,而stop()
却要加锁?
因为start()
中,在子线程启动后,并没有对共享数据进行访问,也就不存在竞态条件。而stop()
中,有对共享数据,如running_
、notEmpty
、notFull
,因此,需要加锁对这些数据进行保护。
这里,子线程退出方式是连接(join)线程,而非分离(detach)线程。个人认为两种方案都可以,不过,join更容易在开发阶段,排查问题,因为如果线程无法正常退出,调用线程会阻塞在join调用上。
往工作队列加入任务对象
调用线程通过run()
,向线程池的请求运行用户指定的任务对象,该对象会被加入到工作队列末尾,空闲子线程会自动从工作队列中取任务对象执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| void ThreadPool::run(Task task) { if (threads_.empty()) { task(); } else { MutexLockGuard lock(mutex_); while (isFull() && running_) { notFull_.wait(); } if (!running_) return; assert(!isFull());
queue_.push_back(std::move(task)); notEmpty_.notify(); } }
|
这里有2个特殊情况需要注意:
threads_
为空,即没有创建线程,可能是用户指定线程数为0
或非法数量(如负数),也有可能是进程创建的线程数达到系统限制,从而创建线程失败。不论什么原因,为避免进程崩溃,可以直接在当前线程中调用用户任务。
- 采用的是isFull()成员来判断工作队列是否满,而不是容器自带的size()来判断。
在isFull()内部,添加了一个互斥锁断言,确保isFull()的调用线程已经取得了mutex_锁;否则,一旦有其他线程在未取得锁的情况下,访问应受锁保护工作队列成员,可能导致意外情况。
1 2 3 4 5
| bool ThreadPool::isFull() const { mutex_.assertLocked(); return maxQueueSize_ > 0 && queue_.size() >= maxQueueSize_; }
|
从工作队列取任务对象
用take从工作队列头部取出一个任务对象。通常是子线程空闲时调用,取出后,用来执行用户任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| ThreadPool::Task ThreadPool::take() { MutexLockGuard lock(mutex_); while (queue_.empty() && running_) { notEmpty_.wait(); } Task task; if (!queue_.empty()) { task = queue_.front(); queue_.pop_front(); if (maxQueueSize_ > 0) { notFull_.notify(); } } return task; }
|
子线程loop
主要工作:从工作队列取用户任务,然后执行之。循环往复,直到线程池停止工作。
实现该工作的runInThread()是在用户调用start()时,自动启动的,不需要用户自行调研。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| void ThreadPool::runInThread() { try { if (threadInitCallback_) { threadInitCallback_(); } while (running_) { Task task(take()); if (task) { task(); } } } catch (const Exception& ex) { fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str()); fprintf(stderr, "reason: %s\n", ex.what()); fprintf(stderr, "stack trace: %s\n", ex.stackTrace()); abort(); } catch (const std::exception& ex) { fprintf(stderr, "exception caught in TheadPool %s\n", name_.c_str()); fprintf(stderr, "reason: %s\n", ex.what()); abort(); } catch (...) { fprintf(stderr, "unknown exception caught in ThreadPool %s\n", name_.c_str()); throw ; } }
|
这里,用了try-catch语句块将代码包裹起来,因为不知道用户代码会干些什么,很有可能会产生异常,因此需要捕获异常。对于不确定的异常,可以rethrow(继续上抛)。另外,threadInitCallback_让用户有机会在线程初始化完成后,运行用户任务之前,做一些事情。
ThreadPool的使用、测试
基本流程:
1 2 3 4 5 6 7 8 9 10 11
| ThreadPool pool("MyThreadPool");
pool.setMaxQueueSize(maxSize);
pool.start(threadNum);
pool.run(userTask); ...
pool.stop();
|
截取自muduo的部分代码,对ThreadPool进行测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
|
void print() { printf("tid=%d\n", muduo::CurrentThread::tid()); }
void printString(const std::string& str) { LOG_INFO << str; usleep(100*1000); }
void test(int maxSize) { LOG_WARN << "Test ThreadPool with max queue size = " << maxSize; muduo::ThreadPool pool("MainThreadPool"); pool.setMaxQueueSize(maxSize); pool.start(5);
LOG_WARN << "Adding"; pool.run(print); pool.run(print); for (int i = 0; i < 100; ++i) { char buf[32]; snprintf(buf, sizeof(buf), "task %d", i); pool.run(std::bind(printString, std::string(buf))); } LOG_WARN << "Done"; muduo::CountDownLatch latch(1); pool.run(std::bind(&muduo::CountDownLatch::countDown, &latch)); latch.wait(); pool.stop(); }
int main() { test(0); test(1); test(5); test(10); test(50); return 0; }
|
有2点问题:
run只接受一个参数,那么调用线程如何向线程池传参?
解决方案有很多,一种是使用模板函数,为向工作队列加用户任务的run函数添加不定参数的重载版本;另一种,是使用std::bind配机器,向run传递一个新的可调用对象。muduo采用的是后者。
调用线程端的用户,如何获取用户任务执行结果?
run()没有任何返回值,用户只能自行设计用户任务函数及参数,通过参数状态取得结果。
当然,还有另外的办法就是,让run()返回一个std::future,通过future异步获取结果。
线程池大小的选择
若池中执行任务时,密集计算所占用的时间比重为\(P(0<P<=1)1\),而系统一共有\(C\)个CPU,为了让\(C\)个CPU都能得到充分利用而不过载,线程池大小的经验公式为:\(T=C/P\),即\(T*P=C\)(让CPU刚好跑满)
总结
参考: