From 05500b4500c5acc2f382bf60312e73435affd588 Mon Sep 17 00:00:00 2001 From: zhaoyong Date: Wed, 27 Jun 2018 01:15:22 +0800 Subject: [PATCH] added ThreadPool --- GSLAM/core/Mutex.h | 85 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/GSLAM/core/Mutex.h b/GSLAM/core/Mutex.h index a7a36962..86100959 100644 --- a/GSLAM/core/Mutex.h +++ b/GSLAM/core/Mutex.h @@ -20,6 +20,10 @@ typedef pi::Event Event; #include #include #include +#include +#include +#include +#include "Glog.h" namespace GSLAM{ typedef std::mutex Mutex; @@ -74,6 +78,87 @@ class Event std::condition_variable _cond; }; +// A simple threadpool implementation. +class ThreadPool { + public: + // All the threads are created upon construction. + explicit ThreadPool(const int num_threads): stop(false) { + CHECK_GE(num_threads, 1) + << "The number of threads specified to the ThreadPool is insufficient."; + for (size_t i = 0; i < num_threads; ++i) { + workers.emplace_back([this] { + for (;;) { + std::function task; + + { + std::unique_lock lock(this->queue_mutex); + this->condition.wait(lock, [this] { + return this->stop || !this->tasks.empty(); + }); + if (this->stop && this->tasks.empty()) return; + task = std::move(this->tasks.front()); + this->tasks.pop(); + } + + task(); + } + }); + } + } + ~ThreadPool(){ + { + std::unique_lock lock(queue_mutex); + stop = true; + } + condition.notify_all(); + for (std::thread& worker : workers) + worker.join(); + } + + + // Adds a task to the threadpool. + template + auto Add(F&& f, Args&& ... args) + ->std::future::type>; + + private: + // Keep track of threads so we can join them + std::vector workers; + // The task queue + std::queue > tasks; + + // Synchronization + std::mutex queue_mutex; + std::condition_variable condition; + bool stop; + +}; + +// add new work item to the pool +template +auto ThreadPool::Add(F&& f, Args&& ... args) + ->std::future::type> { + using return_type = typename std::result_of::type; + + auto task = std::make_shared >( + std::bind(std::forward(f), std::forward(args)...)); + + std::future res = task->get_future(); + { + std::unique_lock lock(queue_mutex); + + // don't allow enqueueing after stopping the pool + CHECK(!stop) << "The ThreadPool object has been destroyed! Cannot add more " + "tasks to the ThreadPool!"; + + tasks.emplace([task]() { + (*task)(); + }); + } + condition.notify_one(); + return res; +} + } #endif