Last active 1 year ago

mga's Avatar mga revised this gist 1 year ago. Go to revision

2 files changed, 143 insertions

threadPool.cpp(file created)

@@ -0,0 +1,72 @@
1 +
2 + #include "threadPool.h"
3 +
4 + ThreadPool::ThreadPool()
5 + : m_busyThreads{ 0 }
6 + , m_threads{std::vector<std::thread>(std::thread::hardware_concurrency())}
7 + , m_stopRequested{ false }
8 + {
9 + for (size_t i = 0; i < std::thread::hardware_concurrency(); ++i)
10 + {
11 + m_threads[i] = std::thread(ThreadWorker(this));
12 + }
13 + }
14 +
15 + ThreadPool::~ThreadPool()
16 + {
17 + {
18 + std::lock_guard<std::mutex> lock(m_mutex);
19 + m_stopRequested = true;
20 + m_condition_task.notify_all();
21 + }
22 +
23 + for (size_t i = 0; i < m_threads.size(); ++i)
24 + {
25 + if (m_threads[i].joinable())
26 + {
27 + m_threads[i].join();
28 + }
29 + }
30 + }
31 +
32 + void ThreadPool::wait()
33 + {
34 + std::unique_lock<std::mutex> lock(m_mutex);
35 + m_condition_finish.wait(lock, [this] {
36 + return m_tasks.empty() && m_busyThreads == 0;
37 + });
38 + lock.unlock();
39 + }
40 +
41 + ThreadWorker::ThreadWorker(ThreadPool* pool)
42 + : m_pool(pool)
43 + {}
44 +
45 + ThreadWorker::~ThreadWorker()
46 + {}
47 +
48 + void ThreadWorker::operator()()
49 + {
50 + std::unique_lock<std::mutex> lock(m_pool->m_mutex);
51 + while (!m_pool->m_stopRequested || (m_pool->m_stopRequested && !m_pool->m_tasks.empty()))
52 + {
53 + m_pool->m_condition_task.wait(lock, [this] {
54 + return m_pool->m_stopRequested || !m_pool->m_tasks.empty();
55 + });
56 +
57 + if (!m_pool->m_tasks.empty())
58 + {
59 + m_pool->m_busyThreads++;
60 +
61 + auto task = m_pool->m_tasks.front();
62 + m_pool->m_tasks.pop();
63 + lock.unlock();
64 + task();
65 + lock.lock();
66 +
67 + m_pool->m_busyThreads--;
68 + m_pool->m_condition_finish.notify_one();
69 + }
70 + }
71 + }
72 +

threadPool.h(file created)

@@ -0,0 +1,71 @@
1 + #ifndef THREAD_POOL_H
2 + #define THREAD_POOL_H
3 +
4 + #include <thread>
5 + #include <mutex>
6 + #include <condition_variable>
7 + #include <functional>
8 + #include <future>
9 + #include <queue>
10 + #include <vector>
11 + #include <memory>
12 + #include <condition_variable>
13 +
14 + class ThreadPool
15 + {
16 + public:
17 + ThreadPool();
18 + ~ThreadPool();
19 +
20 + ThreadPool(const ThreadPool&) = delete;
21 + ThreadPool(ThreadPool&&) = delete;
22 + ThreadPool& operator=(const ThreadPool&) = delete;
23 + ThreadPool& operator=(ThreadPool&&) = delete;
24 +
25 + template<typename F, typename... Args>
26 + auto enqueue(F&& f, Args&&... args) -> std::future<decltype(f(args...))>
27 + {
28 + auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
29 + auto taskPtr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(task);
30 + auto wrapper = [taskPtr]() {
31 + (*taskPtr)();
32 + };
33 +
34 + {
35 + std::lock_guard<std::mutex> lock(m_mutex);
36 + m_tasks.push(wrapper);
37 + m_condition_task.notify_one();
38 + }
39 +
40 + return taskPtr->get_future();
41 + }
42 + void wait();
43 +
44 + private:
45 + mutable std::mutex m_mutex;
46 + std::condition_variable m_condition_task;
47 + std::condition_variable m_condition_finish;
48 + unsigned int m_busyThreads;
49 +
50 + std::vector<std::thread> m_threads;
51 + bool m_stopRequested;
52 +
53 + std::queue<std::function<void()>> m_tasks;
54 +
55 + friend class ThreadWorker;
56 + };
57 +
58 +
59 + class ThreadWorker
60 + {
61 + public:
62 + ThreadWorker(ThreadPool* pool);
63 + ~ThreadWorker();
64 +
65 + void operator()();
66 +
67 + private:
68 + ThreadPool* m_pool;
69 + };
70 +
71 + #endif // !THREAD_POOL_H
Newer Older