Last active 1 year ago

threadPool.cpp Raw
1
2#include "threadPool.h"
3
4ThreadPool::ThreadPool()
5 : m_busyThreads{ 0 }
6 , m_threads{std::vector<std::thread>(std::thread::hardware_concurrency())}
7 , m_stopRequested{ false }
8{
9 for (size_t i = 0; i < std::thread::hardware_concurrency(); ++i)
10 {
11 m_threads[i] = std::thread(ThreadWorker(this));
12 }
13}
14
15ThreadPool::~ThreadPool()
16{
17 {
18 std::lock_guard<std::mutex> lock(m_mutex);
19 m_stopRequested = true;
20 m_condition_task.notify_all();
21 }
22
23 for (size_t i = 0; i < m_threads.size(); ++i)
24 {
25 if (m_threads[i].joinable())
26 {
27 m_threads[i].join();
28 }
29 }
30}
31
32void ThreadPool::wait()
33{
34 std::unique_lock<std::mutex> lock(m_mutex);
35 m_condition_finish.wait(lock, [this] {
36 return m_tasks.empty() && m_busyThreads == 0;
37 });
38 lock.unlock();
39}
40
41ThreadWorker::ThreadWorker(ThreadPool* pool)
42 : m_pool(pool)
43{}
44
45ThreadWorker::~ThreadWorker()
46{}
47
48void ThreadWorker::operator()()
49{
50 std::unique_lock<std::mutex> lock(m_pool->m_mutex);
51 while (!m_pool->m_stopRequested || (m_pool->m_stopRequested && !m_pool->m_tasks.empty()))
52 {
53 m_pool->m_condition_task.wait(lock, [this] {
54 return m_pool->m_stopRequested || !m_pool->m_tasks.empty();
55 });
56
57 if (!m_pool->m_tasks.empty())
58 {
59 m_pool->m_busyThreads++;
60
61 auto task = m_pool->m_tasks.front();
62 m_pool->m_tasks.pop();
63 lock.unlock();
64 task();
65 lock.lock();
66
67 m_pool->m_busyThreads--;
68 m_pool->m_condition_finish.notify_one();
69 }
70 }
71}
72
73
threadPool.h Raw
1#ifndef THREAD_POOL_H
2#define THREAD_POOL_H
3
4#include <thread>
5#include <mutex>
6#include <condition_variable>
7#include <functional>
8#include <future>
9#include <queue>
10#include <vector>
11#include <memory>
12#include <condition_variable>
13
/// Pool of worker threads that execute queued tasks in FIFO order.
///
/// The constructor starts the workers (count derived from
/// std::thread::hardware_concurrency()); the destructor requests a stop and
/// joins them. The pool is neither copyable nor movable because the workers
/// hold a pointer back to it.
class ThreadPool
{
public:
    ThreadPool();
    ~ThreadPool();

    ThreadPool(const ThreadPool&) = delete;
    ThreadPool(ThreadPool&&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;
    ThreadPool& operator=(ThreadPool&&) = delete;

    /// Queues `f(args...)` for execution on a worker thread.
    ///
    /// The callable and its arguments are bound with std::bind, so they are
    /// copied/moved into the task; wrap an argument in std::ref to pass it
    /// by reference.
    ///
    /// @param f    Callable to run.
    /// @param args Arguments to bind to `f`.
    /// @return Future that yields the call's result, or rethrows the
    ///         exception the call exited with.
    template<typename F, typename... Args>
    auto enqueue(F&& f, Args&&... args) -> std::future<decltype(f(args...))>
    {
        using Result = decltype(f(args...));

        // std::function requires a copyable target, while packaged_task is
        // move-only; sharing it through a shared_ptr bridges the two.
        auto taskPtr = std::make_shared<std::packaged_task<Result()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));

        // Obtain the future BEFORE publishing the task: once the task is in
        // the queue a worker may already be invoking the packaged_task, and
        // touching the same object from this thread would be unsynchronized.
        std::future<Result> result = taskPtr->get_future();

        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_tasks.emplace([taskPtr]() {
                (*taskPtr)();
            });
            m_condition_task.notify_one();
        }

        return result;
    }

    /// Blocks until the queue is empty and no worker is running a task.
    void wait();

private:
    mutable std::mutex m_mutex;                 // guards every member below
    std::condition_variable m_condition_task;   // signalled on new task / stop request
    std::condition_variable m_condition_finish; // signalled by workers after finishing a task
    unsigned int m_busyThreads;                 // workers currently running a task

    std::vector<std::thread> m_threads;         // worker threads, joined in the destructor
    bool m_stopRequested;                       // set by the destructor to end the workers

    std::queue<std::function<void()>> m_tasks;  // pending tasks, oldest at the front

    friend class ThreadWorker;                  // workers access the queue and counters directly
};
57
58
59class ThreadWorker
60{
61public:
62 ThreadWorker(ThreadPool* pool);
63 ~ThreadWorker();
64
65 void operator()();
66
67private:
68 ThreadPool* m_pool;
69};
70
71#endif // !THREAD_POOL_H