OpenPose  1.0.0rc2
OpenPose: A Real-Time Multi-Person Key-Point Detection And Multi-Threading C++ Library
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
threadManager.hpp
Go to the documentation of this file.
1 #ifndef OPENPOSE_THREAD_THREAD_MANAGER_HPP
2 #define OPENPOSE_THREAD_THREAD_MANAGER_HPP
3 
4 #include <atomic>
5 #include <set> // std::multiset
6 #include <tuple>
12 
13 namespace op
14 {
15  template<typename TDatums, typename TWorker = std::shared_ptr<Worker<TDatums>>, typename TQueue = Queue<TDatums>>
17  {
18  public:
19  // Completely customizable case
20  explicit ThreadManager(const ThreadManagerMode threadManagerMode = ThreadManagerMode::Synchronous);
21 
22  virtual ~ThreadManager();
23 
24  void setDefaultMaxSizeQueues(const long long defaultMaxSizeQueues = -1);
25 
26  void add(const unsigned long long threadId, const std::vector<TWorker>& tWorkers, const unsigned long long queueInId,
27  const unsigned long long queueOutId);
28 
29  void add(const unsigned long long threadId, const TWorker& tWorker, const unsigned long long queueInId,
30  const unsigned long long queueOutId);
31 
32  void reset();
33 
34  void exec();
35 
36  void start();
37 
38  void stop();
39 
40  inline std::shared_ptr<std::atomic<bool>> getIsRunningSharedPtr()
41  {
42  return spIsRunning;
43  }
44 
45  inline bool isRunning() const
46  {
47  return *spIsRunning;
48  }
49 
50  bool tryEmplace(TDatums& tDatums);
51 
52  bool waitAndEmplace(TDatums& tDatums);
53 
54  bool tryPush(const TDatums& tDatums);
55 
56  bool waitAndPush(const TDatums& tDatums);
57 
58  bool tryPop(TDatums& tDatums);
59 
60  bool waitAndPop(TDatums& tDatums);
61 
62  private:
63  const ThreadManagerMode mThreadManagerMode;
64  std::shared_ptr<std::atomic<bool>> spIsRunning;
65  long long mDefaultMaxSizeQueues;
66  std::multiset<std::tuple<unsigned long long, std::vector<TWorker>, unsigned long long, unsigned long long>> mThreadWorkerQueues;
67  std::vector<std::shared_ptr<Thread<TDatums, TWorker>>> mThreads;
68  std::vector<std::shared_ptr<TQueue>> mTQueues;
69 
70  void add(const std::vector<std::tuple<unsigned long long, std::vector<TWorker>, unsigned long long, unsigned long long>>& threadWorkerQueues);
71 
72  void add(const std::vector<std::tuple<unsigned long long, TWorker, unsigned long long, unsigned long long>>& threadWorkerQueues);
73 
74  void multisetToThreads();
75 
76  void checkAndCreateEmptyThreads();
77 
78  void checkAndCreateQueues();
79 
80  DELETE_COPY(ThreadManager);
81  };
82 }
83 
84 
85 
86 
87 
88 // Implementation
89 #include <utility> // std::pair
96 namespace op
97 {
98  template<typename TDatums, typename TWorker, typename TQueue>
100  mThreadManagerMode{threadManagerMode},
101  spIsRunning{std::make_shared<std::atomic<bool>>(false)},
102  mDefaultMaxSizeQueues{-1ll}
103  {
104  }
105 
106  template<typename TDatums, typename TWorker, typename TQueue>
108  {
109  }
110 
111  template<typename TDatums, typename TWorker, typename TQueue>
112  void ThreadManager<TDatums, TWorker, TQueue>::setDefaultMaxSizeQueues(const long long defaultMaxSizeQueues)
113  {
114  try
115  {
116  mDefaultMaxSizeQueues = {defaultMaxSizeQueues};
117  }
118  catch (const std::exception& e)
119  {
120  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
121  }
122  }
123 
124  template<typename TDatums, typename TWorker, typename TQueue>
125  void ThreadManager<TDatums, TWorker, TQueue>::add(const unsigned long long threadId,
126  const std::vector<TWorker>& tWorkers,
127  const unsigned long long queueInId,
128  const unsigned long long queueOutId)
129  {
130  try
131  {
132  add({std::make_tuple(threadId, tWorkers, queueInId, queueOutId)});
133  }
134  catch (const std::exception& e)
135  {
136  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
137  }
138  }
139 
140  template<typename TDatums, typename TWorker, typename TQueue>
141  void ThreadManager<TDatums, TWorker, TQueue>::add(const unsigned long long threadId,
142  const TWorker& tWorker,
143  const unsigned long long queueInId,
144  const unsigned long long queueOutId)
145  {
146  try
147  {
148  add({std::make_tuple(threadId, std::vector<TWorker>{tWorker}, queueInId, queueOutId)});
149  }
150  catch (const std::exception& e)
151  {
152  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
153  }
154  }
155 
156  template<typename TDatums, typename TWorker, typename TQueue>
158  {
159  try
160  {
161  mThreadWorkerQueues.clear();
162  mThreads.clear();
163  mTQueues.clear();
164  }
165  catch (const std::exception& e)
166  {
167  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
168  }
169  }
170 
171  template<typename TDatums, typename TWorker, typename TQueue>
173  {
174  try
175  {
176  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
177  // Set threads
178  multisetToThreads();
179  if (!mThreads.empty())
180  {
181  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
182  // Start threads
183  for (auto i = 0u; i < mThreads.size() - 1; i++)
184  mThreads.at(i)->startInThread();
185  (*mThreads.rbegin())->exec(spIsRunning);
186  // Stop threads - It will arrive here when the exec() command has finished
187  stop();
188  }
189  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
190  }
191  catch (const std::exception& e)
192  {
193  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
194  }
195  }
196 
197  template<typename TDatums, typename TWorker, typename TQueue>
199  {
200  try
201  {
202  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
203  // Set threads
204  multisetToThreads();
205  // Start threads
206  for (auto& thread : mThreads)
207  thread->startInThread();
208  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
209  }
210  catch (const std::exception& e)
211  {
212  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
213  }
214  }
215 
216  template<typename TDatums, typename TWorker, typename TQueue>
218  {
219  try
220  {
221  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
222  for (auto& tQueue : mTQueues)
223  tQueue->stop();
224  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
225  *spIsRunning = false;
226  for (auto& thread : mThreads)
227  thread->stopAndJoin();
228  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
229  }
230  catch (const std::exception& e)
231  {
232  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
233  }
234  }
235 
236  template<typename TDatums, typename TWorker, typename TQueue>
238  {
239  try
240  {
241  if (mThreadManagerMode != ThreadManagerMode::Asynchronous
242  && mThreadManagerMode != ThreadManagerMode::AsynchronousIn)
243  error("Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
244  if (mTQueues.empty())
245  error("ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
246  return mTQueues[0]->tryEmplace(tDatums);
247  }
248  catch (const std::exception& e)
249  {
250  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
251  return false;
252  }
253  }
254 
255  template<typename TDatums, typename TWorker, typename TQueue>
257  {
258  try
259  {
260  if (mThreadManagerMode != ThreadManagerMode::Asynchronous
261  && mThreadManagerMode != ThreadManagerMode::AsynchronousIn)
262  error("Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
263  if (mTQueues.empty())
264  error("ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
265  return mTQueues[0]->waitAndEmplace(tDatums);
266  }
267  catch (const std::exception& e)
268  {
269  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
270  return false;
271  }
272  }
273 
274  template<typename TDatums, typename TWorker, typename TQueue>
276  {
277  try
278  {
279  if (mThreadManagerMode != ThreadManagerMode::Asynchronous
280  && mThreadManagerMode != ThreadManagerMode::AsynchronousIn)
281  error("Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
282  if (mTQueues.empty())
283  error("ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
284  return mTQueues[0]->tryPush(tDatums);
285  }
286  catch (const std::exception& e)
287  {
288  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
289  return false;
290  }
291  }
292 
293  template<typename TDatums, typename TWorker, typename TQueue>
295  {
296  try
297  {
298  if (mThreadManagerMode != ThreadManagerMode::Asynchronous
299  && mThreadManagerMode != ThreadManagerMode::AsynchronousIn)
300  error("Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
301  if (mTQueues.empty())
302  error("ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
303  return mTQueues[0]->waitAndPush(tDatums);
304  }
305  catch (const std::exception& e)
306  {
307  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
308  return false;
309  }
310  }
311 
312  template<typename TDatums, typename TWorker, typename TQueue>
314  {
315  try
316  {
317  if (mThreadManagerMode != ThreadManagerMode::Asynchronous
318  && mThreadManagerMode != ThreadManagerMode::AsynchronousOut)
319  error("Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
320  if (mTQueues.empty())
321  error("ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
322  return (*mTQueues.rbegin())->tryPop(tDatums);
323  }
324  catch (const std::exception& e)
325  {
326  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
327  return false;
328  }
329  }
330 
331  template<typename TDatums, typename TWorker, typename TQueue>
333  {
334  try
335  {
336  if (mThreadManagerMode != ThreadManagerMode::Asynchronous
337  && mThreadManagerMode != ThreadManagerMode::AsynchronousOut)
338  error("Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
339  if (mTQueues.empty())
340  error("ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
341  return (*mTQueues.rbegin())->waitAndPop(tDatums);
342  }
343  catch (const std::exception& e)
344  {
345  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
346  return false;
347  }
348  }
349 
350  template<typename TDatums, typename TWorker, typename TQueue>
351  void ThreadManager<TDatums, TWorker, TQueue>::add(const std::vector<std::tuple<unsigned long long, std::vector<TWorker>,
352  unsigned long long, unsigned long long>>& threadWorkerQueues)
353  {
354  try
355  {
356  for (const auto& threadWorkerQueue : threadWorkerQueues)
357  mThreadWorkerQueues.insert(threadWorkerQueue);
358  }
359  catch (const std::exception& e)
360  {
361  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
362  }
363  }
364 
365  template<typename TDatums, typename TWorker, typename TQueue>
366  void ThreadManager<TDatums, TWorker, TQueue>::add(const std::vector<std::tuple<unsigned long long, TWorker, unsigned long long,
367  unsigned long long>>& threadWorkerQueues)
368  {
369  try
370  {
371  for (const auto& threadWorkerQueue : threadWorkerQueues)
372  add({std::make_tuple(std::get<0>(threadWorkerQueue),
373  std::vector<TWorker>{std::get<1>(threadWorkerQueue)},
374  std::get<2>(threadWorkerQueue),
375  std::get<3>(threadWorkerQueue))});
376  }
377  catch (const std::exception& e)
378  {
379  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
380  }
381  }
382 
383  template<typename TDatums, typename TWorker, typename TQueue>
384  void ThreadManager<TDatums, TWorker, TQueue>::multisetToThreads()
385  {
386  try
387  {
388  if (!mThreadWorkerQueues.empty())
389  {
390  // Check threads
391  checkAndCreateEmptyThreads();
392 
393  // Check and create queues
394  checkAndCreateQueues();
395 
396  // Data
397  const auto maxQueueIdSynchronous = mTQueues.size()+1;
398 
399  // Set up threads
400  for (const auto& threadWorkerQueue : mThreadWorkerQueues)
401  {
402  auto& thread = mThreads[std::get<0>(threadWorkerQueue)];
403  const auto& tWorkers = std::get<1>(threadWorkerQueue);
404  const auto queueIn = std::get<2>(threadWorkerQueue);
405  const auto queueOut = std::get<3>(threadWorkerQueue);
406  std::shared_ptr<SubThread<TDatums, TWorker>> subThread;
407  // If AsynchronousIn -> queue indexes are OK
408  if (mThreadManagerMode == ThreadManagerMode::Asynchronous
409  || mThreadManagerMode == ThreadManagerMode::AsynchronousIn)
410  {
411  if (mThreadManagerMode == ThreadManagerMode::AsynchronousIn
412  && queueOut == mTQueues.size())
413  subThread = {std::make_shared<SubThreadQueueIn<TDatums, TWorker, TQueue>>(
414  tWorkers, mTQueues.at(queueIn))};
415  else
416  subThread = {std::make_shared<SubThreadQueueInOut<TDatums, TWorker, TQueue>>(
417  tWorkers, mTQueues.at(queueIn), mTQueues.at(queueOut))};
418  }
419  // If !AsynchronousIn -> queue indexes - 1
420  else if (queueOut != maxQueueIdSynchronous
421  || mThreadManagerMode == ThreadManagerMode::AsynchronousOut)
422  {
423  // Queue in + out
424  if (queueIn != 0)
425  subThread = {std::make_shared<SubThreadQueueInOut<TDatums, TWorker, TQueue>>(
426  tWorkers, mTQueues.at(queueIn-1), mTQueues.at(queueOut-1))};
427  // Case queue out (first TWorker(s))
428  else
429  subThread = {std::make_shared<SubThreadQueueOut<TDatums, TWorker, TQueue>>(
430  tWorkers, mTQueues.at(queueOut-1))};
431  }
432  // Case queue in (last TWorker(s))
433  else if (queueIn != 0) // && queueOut == maxQueueIdSynchronous
434  subThread = {std::make_shared<SubThreadQueueIn<TDatums, TWorker, TQueue>>(
435  tWorkers, mTQueues.at(queueIn-1))};
436  // Case no queue
437  else // if (queueIn == 0 && queueOut == maxQueueIdSynchronous)
438  subThread = {std::make_shared<SubThreadNoQueue<TDatums, TWorker>>(tWorkers)};
439  thread->add(subThread);
440  }
441  }
442  else
443  error("Empty, no TWorker(s) added.", __LINE__);
444  }
445  catch (const std::exception& e)
446  {
447  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
448  }
449  }
450 
451  template<typename TDatums, typename TWorker, typename TQueue>
452  void ThreadManager<TDatums, TWorker, TQueue>::checkAndCreateEmptyThreads()
453  {
454  try
455  {
456  // Check all thread ids from 0-maxThreadId are present
457  const auto maxThreadId = std::get<0>(*mThreadWorkerQueues.crbegin());
458  auto previousThreadId = std::get<0>(*mThreadWorkerQueues.cbegin());
459  for (const auto& threadWorkerQueue : mThreadWorkerQueues)
460  {
461  const auto currentThreadId = std::get<0>(threadWorkerQueue);
462  if (currentThreadId - previousThreadId > 1)
463  error("Missing thread id " + std::to_string(currentThreadId) + " of "
464  + std::to_string(maxThreadId) + ".", __LINE__, __FUNCTION__, __FILE__);
465  previousThreadId = currentThreadId;
466  }
467 
468  // Create Threads
469  // #threads = maxThreadId+1
470  mThreads.resize(maxThreadId);
471  for (auto& thread : mThreads)
472  thread = std::make_shared<Thread<TDatums, TWorker>>();
473  mThreads.emplace_back(std::make_shared<Thread<TDatums, TWorker>>(spIsRunning));
474  }
475  catch (const std::exception& e)
476  {
477  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
478  }
479  }
480 
481  template<typename TDatums, typename TWorker, typename TQueue>
482  void ThreadManager<TDatums, TWorker, TQueue>::checkAndCreateQueues()
483  {
484  try
485  {
486  if (!mThreadWorkerQueues.empty())
487  {
488  // Get max queue id to get queue size
489  auto maxQueueId = std::get<3>(*mThreadWorkerQueues.cbegin());
490  for (const auto& threadWorkerQueue : mThreadWorkerQueues)
491  maxQueueId = fastMax(
492  maxQueueId, fastMax(std::get<2>(threadWorkerQueue), std::get<3>(threadWorkerQueue)));
493 
494  // Check each queue id has at least a worker that uses it as input and another one as output.
495  // Special cases:
496  std::vector<std::pair<bool, bool>> usedQueueIds(maxQueueId+1, {false, false});
497  for (const auto& threadWorkerQueue : mThreadWorkerQueues)
498  {
499  usedQueueIds.at(std::get<2>(threadWorkerQueue)).first = true;
500  usedQueueIds.at(std::get<3>(threadWorkerQueue)).second = true;
501  }
502  // Id 0 must only needs a worker using it as input.
503  usedQueueIds.begin()->second = true;
504  // Id maxQueueId only needs a worker using it as output.
505  usedQueueIds.rbegin()->first = true;
506  // Error if missing queue id
507  for (auto i = 0ull ; i < usedQueueIds.size() ; i++)
508  {
509  if (!usedQueueIds[i].first)
510  error("Missing queue id " + std::to_string(i) + " (of "
511  + std::to_string(maxQueueId) + ") as input.", __LINE__, __FUNCTION__, __FILE__);
512  if (!usedQueueIds[i].second)
513  error("Missing queue id " + std::to_string(i) + " (of "
514  + std::to_string(maxQueueId) + ") as output.", __LINE__, __FUNCTION__, __FILE__);
515  }
516 
517  // Create Queues
518  if (mThreadManagerMode == ThreadManagerMode::Asynchronous)
519  mTQueues.resize(maxQueueId+1); // First and last one are queues
520  else if (mThreadManagerMode == ThreadManagerMode::Synchronous)
521  mTQueues.resize(maxQueueId-1); // First and last one are not actually queues
522  else if (mThreadManagerMode == ThreadManagerMode::AsynchronousIn
523  || mThreadManagerMode == ThreadManagerMode::AsynchronousOut)
524  mTQueues.resize(maxQueueId); // First or last one is queue
525  else
526  error("Unknown ThreadManagerMode", __LINE__, __FUNCTION__, __FILE__);
527  for (auto& tQueue : mTQueues)
528  tQueue = std::make_shared<TQueue>(mDefaultMaxSizeQueues);
529  }
530  }
531  catch (const std::exception& e)
532  {
533  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
534  }
535  }
536 
537  COMPILE_TEMPLATE_DATUM(ThreadManager);
538 }
539 
540 #endif // OPENPOSE_THREAD_THREAD_MANAGER_HPP
virtual ~ThreadManager()
Definition: threadManager.hpp:107
ThreadManager(const ThreadManagerMode threadManagerMode=ThreadManagerMode::Synchronous)
Definition: threadManager.hpp:99
void stop()
Definition: threadManager.hpp:217
bool isRunning() const
Definition: threadManager.hpp:45
void start()
Definition: threadManager.hpp:198
void setDefaultMaxSizeQueues(const long long defaultMaxSizeQueues=-1)
Definition: threadManager.hpp:112
bool waitAndEmplace(TDatums &tDatums)
Definition: threadManager.hpp:256
std::shared_ptr< std::atomic< bool > > getIsRunningSharedPtr()
Definition: threadManager.hpp:40
T fastMax(const T a, const T b)
Definition: fastMath.hpp:70
bool tryPop(TDatums &tDatums)
Definition: threadManager.hpp:313
void add(const unsigned long long threadId, const std::vector< TWorker > &tWorkers, const unsigned long long queueInId, const unsigned long long queueOutId)
Definition: threadManager.hpp:125
void reset()
Definition: threadManager.hpp:157
OP_API void error(const std::string &message, const int line=-1, const std::string &function="", const std::string &file="")
bool waitAndPop(TDatums &tDatums)
Definition: threadManager.hpp:332
bool tryEmplace(TDatums &tDatums)
Definition: threadManager.hpp:237
Definition: threadManager.hpp:16
void exec()
Definition: threadManager.hpp:172
OP_API void log(const std::string &message, const Priority priority=Priority::Max, const int line=-1, const std::string &function="", const std::string &file="")
ThreadManagerMode
Definition: enumClasses.hpp:9
COMPILE_TEMPLATE_DATUM(WPoseTriangulation)
std::vector< T, Alloc > vector
Definition: cl2.hpp:567
bool waitAndPush(const TDatums &tDatums)
Definition: threadManager.hpp:294
bool tryPush(const TDatums &tDatums)
Definition: threadManager.hpp:275