脚本宝典收集整理的这篇文章主要介绍了简单的线程池(九),脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。
本文中,笔者尝试组合了非阻塞式和阻塞式的队列,成为新的组合式线程池。线程池有一个共享任务队列,每个工作线程各有一个工作任务队列。线程池用户提交的任务,先被保存在共享任务队列中。线程池的调度器线程将共享任务队列中的任务分派给工作线程的工作任务队列,工作线程从工作任务队列中获取任务并执行。
【注】图中 * 表示工作线程获取任务的方式会因工作任务队列类型的不同而变化。笔者对共享任务队列和工作任务队列采用不同的类型,组合成了三种方案,
No | 共享任务队列类型 | 工作任务队列类型 |
---|---|---|
1 | 阻塞 | 阻塞独占 |
2 | 阻塞 | 非阻塞互助 |
3 | 阻塞 | 非阻塞互助2B |
以下关于此线程池的说明中,将简述与 阻塞共享任务队列 、 阻塞独占任务队列 、非阻塞互助任务队列 和 非阻塞互助2B任务队列 的内容。如有不明之处,请先参考链接对应的博文。
以下代码给出了方案一的实现,(blocking_shared_blocking_unique_pool.h)
class Thread_Pool {
private:
struct Task_Wrapper { ...
};
atomic<bool> _suspend_;
atomic<bool> _done_;
Blocking_Queue<Task_Wrapper> _poolqueue_; // #1
thread _scheduler_; // #3
unsigned _workersize_;
thread* _workers_;
Blocking_Queue<Task_Wrapper>* _workerqueues_; // #2
void work(unsigned index) {
Task_Wrapper task;
while (!_done_.load(memory_order_acquire)) {
_workerqueues_[index].pop(task); // #7
task();
while (_suspend_.load(memory_order_acquire))
std::this_thread::yield();
}
}
void stop() {
size_t remaining = 0;
_suspend_.store(true, memory_order_release);
remaining = _poolqueue_.size(); // #8
for (unsigned i = 0; i < _workersize_; ++i)
remaining += _workerqueues_[i].size();
_suspend_.store(false, memory_order_release);
while (!_poolqueue_.empty())
std::this_thread::yield();
for (unsigned i = 0; i < _workersize_; ++i)
while (!_workerqueues_[i].empty())
std::this_thread::yield();
std::fprintf(stderr, "n%zu tasks remain before destructing pool.n", remaining);
_done_.store(true, memory_order_release);
_poolqueue_.push([] {}); // #9
for (unsigned i = 0; i < _workersize_; ++i)
_workerqueues_[i].push([] {});
for (unsigned i = 0; i < _workersize_; ++i)
if (_workers_[i].joinable())
_workers_[i].join();
if (_scheduler_.joinable())
_scheduler_.join();
delete[] _workers_;
delete[] _workerqueues_;
}
void schedule() {
Task_Wrapper task;
while (!_done_.load(memory_order_acquire)) {
_poolqueue_.pop(task); // #6
_workerqueues_[rand() % _workersize_].push(std::move(task));
}
}
public:
Thread_Pool() : _suspend_(false), _done_(false) {
try {
_workersize_ = thread::hardware_concurrency();
_workers_ = new thread[_workersize_]();
_workerqueues_ = new Blocking_Queue<Task_Wrapper>[_workersize_](); // #4
for (unsigned i = 0; i < _workersize_; ++i)
_workers_[i] = thread(&Thread_Pool::work, this, i);
_scheduler_ = thread(&Thread_Pool::schedule, this); // #5
} catch (...) { ...
}
}
...
};
线程池中定义了阻塞式的共享任务队列(#1)、阻塞式的工作任务队列(#2)和调度器线程(#3)。线程池对象被创建时,它们一并被初始化(#4、#5)。线程池的调度器线程将共享任务队列中的任务分派给工作任务队列(#6),工作线程从各自的工作任务队列的头部获取任务并执行(#7)。在统计剩余的工作任务时,合计共享任务队列和工作任务队列中剩余的任务(#8)。为了避免发生死锁问题,向共享任务队列和每个工作任务队列中各放入一个假任务(#9),确保调度器线程和各个工作线程都能退出循环等待。
以下代码给出了方案二的实现,(blocking_shared_lockwise_mutual_pool.h)
class Thread_Pool {
private:
struct Task_Wrapper { ...
};
atomic<bool> _suspend_;
atomic<bool> _done_;
Blocking_Queue<Task_Wrapper> _poolqueue_; // #1
thread _scheduler_; // #3
unsigned _workersize_;
thread* _workers_;
Lockwise_Queue<Task_Wrapper>* _workerqueues_; // #2
void work(unsigned index) {
Task_Wrapper task;
while (!_done_.load(memory_order_acquire)) {
if (_workerqueues_[index].pop(task))
task();
else
for (unsigned i = 0; i < _workersize_; ++i)
if (_workerqueues_[(index + i + 1) % _workersize_].pop(task)) { // #7
task();
break;
}
while (_suspend_.load(memory_order_acquire))
std::this_thread::yield();
}
}
void stop() {
size_t remaining = 0;
_suspend_.store(true, memory_order_release);
remaining = _poolqueue_.size(); // #8
for (unsigned i = 0; i < _workersize_; ++i)
remaining += _workerqueues_[i].size();
_suspend_.store(false, memory_order_release);
while (!_poolqueue_.empty())
std::this_thread::yield();
for (unsigned i = 0; i < _workersize_; ++i)
while (!_workerqueues_[i].empty())
std::this_thread::yield();
std::fprintf(stderr, "n%zu tasks remain before destructing pool.n", remaining);
_done_.store(true, memory_order_release);
_poolqueue_.push([] {}); // #9
for (unsigned i = 0; i < _workersize_; ++i)
if (_workers_[i].joinable())
_workers_[i].join();
if (_scheduler_.joinable())
_scheduler_.join();
delete[] _workers_;
delete[] _workerqueues_;
}
void schedule() {
Task_Wrapper task;
while (!_done_.load(memory_order_acquire)) {
_poolqueue_.pop(task); // #6
_workerqueues_[rand() % _workersize_].push(std::move(task));
}
}
public:
Thread_Pool() : _suspend_(false), _done_(false) {
try {
_workersize_ = thread::hardware_concurrency();
_workers_ = new thread[_workersize_]();
_workerqueues_ = new Lockwise_Queue<Task_Wrapper>[_workersize_](); // #4
for (unsigned i = 0; i < _workersize_; ++i)
_workers_[i] = thread(&Thread_Pool::work, this, i);
_scheduler_ = thread(&Thread_Pool::schedule, this); // #5
} catch (...) { ...
}
}
...
};
线程池中定义了阻塞式的共享任务队列(#1)、非阻塞互助式的工作任务队列(#2)和调度器线程(#3)。线程池对象被创建时,它们一并被初始化(#4、#5)。线程池的调度器线程将共享任务队列中的任务分派给工作任务队列(#6),工作线程从各自的工作任务队列的头部获取任务并执行。当自己的工作任务队列中无任务时,此工作线程会从其他工作线程的工作任务队列头部获取任务(#7)。在统计剩余的工作任务时,合计共享任务队列和工作任务队列中剩余的任务(#8)。为了避免发生死锁问题,向共享任务队列中放入一个假任务(#9),确保调度器线程能退出循环等待。
以下代码给出了方案三的实现,(blocking_shared_lockwise_mutual_2b_pool.h)
class Thread_Pool {
private:
struct Task_Wrapper { ...
};
atomic<bool> _suspend_;
atomic<bool> _done_;
Blocking_Queue<Task_Wrapper> _poolqueue_; // #1
thread _scheduler_; // #3
unsigned _workersize_;
thread* _workers_;
Lockwise_Deque<Task_Wrapper>* _workerqueues_; // #2
void work(unsigned index) {
Task_Wrapper task;
while (!_done_.load(memory_order_acquire)) {
if (_workerqueues_[index].pull(task))
task();
else
for (unsigned i = 0; i < _workersize_; ++i)
if (_workerqueues_[(index + i + 1) % _workersize_].pop(task)) { // #7
task();
break;
}
while (_suspend_.load(memory_order_acquire))
std::this_thread::yield();
}
}
void stop() {
size_t remaining = 0;
_suspend_.store(true, memory_order_release);
remaining = _poolqueue_.size(); // #8
for (unsigned i = 0; i < _workersize_; ++i)
remaining += _workerqueues_[i].size();
_suspend_.store(false, memory_order_release);
while (!_poolqueue_.empty())
std::this_thread::yield();
for (unsigned i = 0; i < _workersize_; ++i)
while (!_workerqueues_[i].empty())
std::this_thread::yield();
std::fprintf(stderr, "n%zu tasks remain before destructing pool.n", remaining);
_done_.store(true, memory_order_release);
_poolqueue_.push([] {}); // #9
for (unsigned i = 0; i < _workersize_; ++i)
if (_workers_[i].joinable())
_workers_[i].join();
if (_scheduler_.joinable())
_scheduler_.join();
delete[] _workers_;
delete[] _workerqueues_;
}
void schedule() {
Task_Wrapper task;
while (!_done_.load(memory_order_acquire)) {
_poolqueue_.pop(task); // #6
_workerqueues_[rand() % _workersize_].push(std::move(task));
}
}
public:
Thread_Pool() : _suspend_(false), _done_(false) {
try {
_workersize_ = thread::hardware_concurrency();
_workers_ = new thread[_workersize_]();
_workerqueues_ = new Lockwise_Deque<Task_Wrapper>[_workersize_](); // #4
for (unsigned i = 0; i < _workersize_; ++i)
_workers_[i] = thread(&Thread_Pool::work, this, i);
_scheduler_ = thread(&Thread_Pool::schedule, this); // #5
} catch (...) { ...
}
}
...
};
线程池中定义了阻塞式的共享任务队列(#1)、非阻塞互助2B式的工作任务队列(#2)和调度器线程(#3)。线程池对象被创建时,它们一并被初始化(#4、#5)。线程池的调度器线程将共享任务队列中的任务分派给工作任务队列(#6),工作线程从各自的工作任务队列的尾部获取任务并执行。当自己的工作任务队列中无任务时,此工作线程会从其他工作线程的工作任务队列的头部获取任务(#7)。在统计剩余的工作任务时,合计共享任务队列和工作任务队列中剩余的任务(#8)。为了避免发生死锁问题,向共享任务队列中放入一个假任务(#9),确保调度器线程能退出循环等待。
以下类图和顺序图分别展现了方案一的主要逻辑结构以及线程池用户提交任务与调度器线程、工作线程执行任务的并发过程,
[注] 图中用构造型(stereotype)标识出调度器线程和工作线程的初始函数,并在注解中加以说明调用关系,下同。
以下为方案二的逻辑,
以下为方案三的逻辑,
验证过程采用了 《简单的线程池(三)》 中定义的的测试用例。笔者对比了测试结果与 《简单的线程池(八)》 的数据,结果如下,
图1 列举了 吞吐量1的差异 在 0.5 分钟、1 分钟和 3 分钟的提交周期内不同思考时间上的对比。
【注】三种组合方案分别略称为 BSBU、BSLM、BSLM2B,下同。
可以看到,
图2 列举了 吞吐量2的差异 在 0.5 分钟、1 分钟和 3 分钟的提交周期内不同思考时间上的对比。
可以看到,
图3 列举了 吞吐量3的差异 在 0.5 分钟、1 分钟和 3 分钟的提交周期内不同思考时间上的对比。
基于以上的对比分析,笔者认为,
完整的代码示例和测试数据请参考 [github] cnblogs/15754987 。
以上是脚本宝典为你收集整理的简单的线程池(九)全部内容,希望文章能够帮你解决简单的线程池(九)所遇到的问题。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。