1 #ifndef OPENPOSE_THREAD_THREAD_MANAGER_HPP
2 #define OPENPOSE_THREAD_THREAD_MANAGER_HPP
15 template<
typename TDatums,
typename TWorker = std::shared_ptr<Worker<TDatums>>,
typename TQueue = Queue<TDatums>>
24 void add(
const unsigned long long threadId,
const std::vector<TWorker>& tWorkers,
const unsigned long long queueInId,
25 const unsigned long long queueOutId);
27 void add(
const unsigned long long threadId,
const TWorker& tWorker,
const unsigned long long queueInId,
28 const unsigned long long queueOutId);
52 bool tryPush(
const TDatums& tDatums);
56 bool tryPop(TDatums& tDatums);
62 std::shared_ptr<std::atomic<bool>> spIsRunning;
63 long long mDefaultMaxSizeQueues;
64 std::multiset<std::tuple<unsigned long long, std::vector<TWorker>,
unsigned long long,
unsigned long long>> mThreadWorkerQueues;
65 std::vector<std::shared_ptr<Thread<TDatums, TWorker>>> mThreads;
66 std::vector<std::shared_ptr<TQueue>> mTQueues;
68 void add(
const std::vector<std::tuple<
unsigned long long, std::vector<TWorker>,
unsigned long long,
unsigned long long>>& threadWorkerQueues);
70 void add(
const std::vector<std::tuple<unsigned long long, TWorker, unsigned long long, unsigned long long>>& threadWorkerQueues);
72 void multisetToThreads();
74 void checkAndCreateEmptyThreads();
76 void checkAndCreateQueues();
96 template<
typename TDatums,
typename TWorker,
typename TQueue>
98 mThreadManagerMode{threadManagerMode},
99 spIsRunning{std::make_shared<std::atomic<bool>>(
false)},
100 mDefaultMaxSizeQueues{-1ll}
104 template<
typename TDatums,
typename TWorker,
typename TQueue>
109 mDefaultMaxSizeQueues = {defaultMaxSizeQueues};
111 catch (
const std::exception& e)
113 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
117 template<
typename TDatums,
typename TWorker,
typename TQueue>
119 const std::vector<TWorker>& tWorkers,
120 const unsigned long long queueInId,
121 const unsigned long long queueOutId)
125 add({std::make_tuple(threadId, tWorkers, queueInId, queueOutId)});
127 catch (
const std::exception& e)
129 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
133 template<
typename TDatums,
typename TWorker,
typename TQueue>
135 const TWorker& tWorker,
136 const unsigned long long queueInId,
137 const unsigned long long queueOutId)
141 add({std::make_tuple(threadId, std::vector<TWorker>{tWorker}, queueInId, queueOutId)});
143 catch (
const std::exception& e)
145 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
149 template<
typename TDatums,
typename TWorker,
typename TQueue>
154 mThreadWorkerQueues.clear();
158 catch (
const std::exception& e)
160 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
164 template<
typename TDatums,
typename TWorker,
typename TQueue>
172 if (!mThreads.empty())
176 for (
auto i = 0u; i < mThreads.size() - 1; i++)
177 mThreads.at(i)->startInThread();
178 (*mThreads.rbegin())->exec(spIsRunning);
184 catch (
const std::exception& e)
186 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
190 template<
typename TDatums,
typename TWorker,
typename TQueue>
199 for (
auto& thread : mThreads)
200 thread->startInThread();
203 catch (
const std::exception& e)
205 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
209 template<
typename TDatums,
typename TWorker,
typename TQueue>
215 for (
auto& tQueue : mTQueues)
218 *spIsRunning =
false;
219 for (
auto& thread : mThreads)
220 thread->stopAndJoin();
223 catch (
const std::exception& e)
225 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
229 template<
typename TDatums,
typename TWorker,
typename TQueue>
236 error(
"Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
237 if (mTQueues.empty())
238 error(
"ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
239 return mTQueues[0]->tryEmplace(tDatums);
241 catch (
const std::exception& e)
243 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
248 template<
typename TDatums,
typename TWorker,
typename TQueue>
255 error(
"Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
256 if (mTQueues.empty())
257 error(
"ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
258 return mTQueues[0]->waitAndEmplace(tDatums);
260 catch (
const std::exception& e)
262 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
267 template<
typename TDatums,
typename TWorker,
typename TQueue>
274 error(
"Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
275 if (mTQueues.empty())
276 error(
"ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
277 return mTQueues[0]->tryPush(tDatums);
279 catch (
const std::exception& e)
281 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
286 template<
typename TDatums,
typename TWorker,
typename TQueue>
293 error(
"Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
294 if (mTQueues.empty())
295 error(
"ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
296 return mTQueues[0]->waitAndPush(tDatums);
298 catch (
const std::exception& e)
300 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
305 template<
typename TDatums,
typename TWorker,
typename TQueue>
312 error(
"Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
313 if (mTQueues.empty())
314 error(
"ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
315 return (*mTQueues.rbegin())->tryPop(tDatums);
317 catch (
const std::exception& e)
319 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
324 template<
typename TDatums,
typename TWorker,
typename TQueue>
331 error(
"Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
332 if (mTQueues.empty())
333 error(
"ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
334 return (*mTQueues.rbegin())->waitAndPop(tDatums);
336 catch (
const std::exception& e)
338 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
343 template<
typename TDatums,
typename TWorker,
typename TQueue>
345 unsigned long long,
unsigned long long>>& threadWorkerQueues)
349 for (
const auto& threadWorkerQueue : threadWorkerQueues)
350 mThreadWorkerQueues.insert(threadWorkerQueue);
352 catch (
const std::exception& e)
354 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
358 template<
typename TDatums,
typename TWorker,
typename TQueue>
360 unsigned long long>>& threadWorkerQueues)
364 for (
const auto& threadWorkerQueue : threadWorkerQueues)
365 add({std::make_tuple(std::get<0>(threadWorkerQueue),
366 std::vector<TWorker>{std::get<1>(threadWorkerQueue)},
367 std::get<2>(threadWorkerQueue),
368 std::get<3>(threadWorkerQueue))});
370 catch (
const std::exception& e)
372 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
376 template<
typename TDatums,
typename TWorker,
typename TQueue>
377 void ThreadManager<TDatums, TWorker, TQueue>::multisetToThreads()
381 if (!mThreadWorkerQueues.empty())
384 checkAndCreateEmptyThreads();
387 checkAndCreateQueues();
390 const auto maxQueueIdSynchronous = mTQueues.size()+1;
393 for (
const auto& threadWorkerQueue : mThreadWorkerQueues)
395 auto& thread = mThreads[std::get<0>(threadWorkerQueue)];
396 const auto& tWorkers = std::get<1>(threadWorkerQueue);
397 const auto queueIn = std::get<2>(threadWorkerQueue);
398 const auto queueOut = std::get<3>(threadWorkerQueue);
399 std::shared_ptr<SubThread<TDatums, TWorker>> subThread;
405 && queueOut == mTQueues.size())
406 subThread = {std::make_shared<SubThreadQueueIn<TDatums, TWorker, TQueue>>(
407 tWorkers, mTQueues.at(queueIn))};
409 subThread = {std::make_shared<SubThreadQueueInOut<TDatums, TWorker, TQueue>>(
410 tWorkers, mTQueues.at(queueIn), mTQueues.at(queueOut))};
413 else if (queueOut != maxQueueIdSynchronous
418 subThread = {std::make_shared<SubThreadQueueInOut<TDatums, TWorker, TQueue>>(
419 tWorkers, mTQueues.at(queueIn-1), mTQueues.at(queueOut-1))};
422 subThread = {std::make_shared<SubThreadQueueOut<TDatums, TWorker, TQueue>>(
423 tWorkers, mTQueues.at(queueOut-1))};
426 else if (queueIn != 0)
427 subThread = {std::make_shared<SubThreadQueueIn<TDatums, TWorker, TQueue>>(
428 tWorkers, mTQueues.at(queueIn-1))};
431 subThread = {std::make_shared<SubThreadNoQueue<TDatums, TWorker>>(tWorkers)};
432 thread->add(subThread);
436 error(
"Empty, no TWorker(s) added.", __LINE__);
438 catch (
const std::exception& e)
440 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
444 template<
typename TDatums,
typename TWorker,
typename TQueue>
445 void ThreadManager<TDatums, TWorker, TQueue>::checkAndCreateEmptyThreads()
450 const auto maxThreadId = std::get<0>(*mThreadWorkerQueues.crbegin());
451 auto previousThreadId = std::get<0>(*mThreadWorkerQueues.cbegin());
452 for (
const auto& threadWorkerQueue : mThreadWorkerQueues)
454 const auto currentThreadId = std::get<0>(threadWorkerQueue);
455 if (currentThreadId - previousThreadId > 1)
456 error(
"Missing thread id " + std::to_string(currentThreadId) +
" of "
457 + std::to_string(maxThreadId) +
".", __LINE__, __FUNCTION__, __FILE__);
458 previousThreadId = currentThreadId;
463 mThreads.resize(maxThreadId);
464 for (
auto& thread : mThreads)
465 thread = std::make_shared<Thread<TDatums, TWorker>>();
466 mThreads.emplace_back(std::make_shared<Thread<TDatums, TWorker>>(spIsRunning));
468 catch (
const std::exception& e)
470 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
474 template<
typename TDatums,
typename TWorker,
typename TQueue>
475 void ThreadManager<TDatums, TWorker, TQueue>::checkAndCreateQueues()
479 if (!mThreadWorkerQueues.empty())
482 auto maxQueueId = std::get<3>(*mThreadWorkerQueues.cbegin());
483 for (
const auto& threadWorkerQueue : mThreadWorkerQueues)
485 maxQueueId,
fastMax(std::get<2>(threadWorkerQueue), std::get<3>(threadWorkerQueue)));
489 std::vector<std::pair<bool, bool>> usedQueueIds(maxQueueId+1, {
false,
false});
490 for (
const auto& threadWorkerQueue : mThreadWorkerQueues)
492 usedQueueIds.at(std::get<2>(threadWorkerQueue)).first =
true;
493 usedQueueIds.at(std::get<3>(threadWorkerQueue)).second =
true;
496 usedQueueIds.begin()->second =
true;
498 usedQueueIds.rbegin()->first =
true;
500 for (
auto i = 0ull ; i < usedQueueIds.size() ; i++)
502 if (!usedQueueIds[i].first)
503 error(
"Missing queue id " + std::to_string(i) +
" (of "
504 + std::to_string(maxQueueId) +
") as input.", __LINE__, __FUNCTION__, __FILE__);
505 if (!usedQueueIds[i].second)
506 error(
"Missing queue id " + std::to_string(i) +
" (of "
507 + std::to_string(maxQueueId) +
") as output.", __LINE__, __FUNCTION__, __FILE__);
512 mTQueues.resize(maxQueueId+1);
514 mTQueues.resize(maxQueueId-1);
517 mTQueues.resize(maxQueueId);
519 error(
"Unknown ThreadManagerMode", __LINE__, __FUNCTION__, __FILE__);
520 for (
auto& tQueue : mTQueues)
521 tQueue = std::make_shared<TQueue>(mDefaultMaxSizeQueues);
524 catch (
const std::exception& e)
526 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
533 #endif // OPENPOSE_THREAD_THREAD_MANAGER_HPP
ThreadManager(const ThreadManagerMode threadManagerMode=ThreadManagerMode::Synchronous)
Definition: threadManager.hpp:97
void stop()
Definition: threadManager.hpp:210
bool isRunning() const
Definition: threadManager.hpp:43
void start()
Definition: threadManager.hpp:191
void setDefaultMaxSizeQueues(const long long defaultMaxSizeQueues=-1)
Definition: threadManager.hpp:105
bool waitAndEmplace(TDatums &tDatums)
Definition: threadManager.hpp:249
std::shared_ptr< std::atomic< bool > > getIsRunningSharedPtr()
Definition: threadManager.hpp:38
T fastMax(const T a, const T b)
Definition: fastMath.hpp:70
bool tryPop(TDatums &tDatums)
Definition: threadManager.hpp:306
void add(const unsigned long long threadId, const std::vector< TWorker > &tWorkers, const unsigned long long queueInId, const unsigned long long queueOutId)
Definition: threadManager.hpp:118
void reset()
Definition: threadManager.hpp:150
OP_API void error(const std::string &message, const int line=-1, const std::string &function="", const std::string &file="")
bool waitAndPop(TDatums &tDatums)
Definition: threadManager.hpp:325
bool tryEmplace(TDatums &tDatums)
Definition: threadManager.hpp:230
Definition: threadManager.hpp:16
void exec()
Definition: threadManager.hpp:165
OP_API void log(const std::string &message, const Priority priority=Priority::Max, const int line=-1, const std::string &function="", const std::string &file="")
ThreadManagerMode
Definition: enumClasses.hpp:9
COMPILE_TEMPLATE_DATUM(WPoseTriangulation)
std::vector< T, Alloc > vector
Definition: cl2.hpp:567
bool waitAndPush(const TDatums &tDatums)
Definition: threadManager.hpp:287
bool tryPush(const TDatums &tDatums)
Definition: threadManager.hpp:268