OpenPose  1.0.0rc2
OpenPose: A Real-Time Multi-Person Key-Point Detection And Multi-Threading C++ Library
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
threadManager.hpp
Go to the documentation of this file.
1 #ifndef OPENPOSE_THREAD_THREAD_MANAGER_HPP
2 #define OPENPOSE_THREAD_THREAD_MANAGER_HPP
3 
4 #include <atomic>
5 #include <set> // std::multiset
6 #include <tuple>
12 
13 namespace op
14 {
15  template<typename TDatums, typename TWorker = std::shared_ptr<Worker<TDatums>>, typename TQueue = Queue<TDatums>>
17  {
18  public:
19  // Completely customizable case
20  explicit ThreadManager(const ThreadManagerMode threadManagerMode = ThreadManagerMode::Synchronous);
21 
22  void setDefaultMaxSizeQueues(const long long defaultMaxSizeQueues = -1);
23 
24  void add(const unsigned long long threadId, const std::vector<TWorker>& tWorkers, const unsigned long long queueInId,
25  const unsigned long long queueOutId);
26 
27  void add(const unsigned long long threadId, const TWorker& tWorker, const unsigned long long queueInId,
28  const unsigned long long queueOutId);
29 
30  void reset();
31 
32  void exec();
33 
34  void start();
35 
36  void stop();
37 
38  inline std::shared_ptr<std::atomic<bool>> getIsRunningSharedPtr()
39  {
40  return spIsRunning;
41  }
42 
43  inline bool isRunning() const
44  {
45  return *spIsRunning;
46  }
47 
48  bool tryEmplace(TDatums& tDatums);
49 
50  bool waitAndEmplace(TDatums& tDatums);
51 
52  bool tryPush(const TDatums& tDatums);
53 
54  bool waitAndPush(const TDatums& tDatums);
55 
56  bool tryPop(TDatums& tDatums);
57 
58  bool waitAndPop(TDatums& tDatums);
59 
60  private:
61  const ThreadManagerMode mThreadManagerMode;
62  std::shared_ptr<std::atomic<bool>> spIsRunning;
63  long long mDefaultMaxSizeQueues;
64  std::multiset<std::tuple<unsigned long long, std::vector<TWorker>, unsigned long long, unsigned long long>> mThreadWorkerQueues;
65  std::vector<std::shared_ptr<Thread<TDatums, TWorker>>> mThreads;
66  std::vector<std::shared_ptr<TQueue>> mTQueues;
67 
68  void add(const std::vector<std::tuple<unsigned long long, std::vector<TWorker>, unsigned long long, unsigned long long>>& threadWorkerQueues);
69 
70  void add(const std::vector<std::tuple<unsigned long long, TWorker, unsigned long long, unsigned long long>>& threadWorkerQueues);
71 
72  void multisetToThreads();
73 
74  void checkAndCreateEmptyThreads();
75 
76  void checkAndCreateQueues();
77 
78  DELETE_COPY(ThreadManager);
79  };
80 }
81 
82 
83 
84 
85 
86 // Implementation
87 #include <utility> // std::pair
94 namespace op
95 {
96  template<typename TDatums, typename TWorker, typename TQueue>
98  mThreadManagerMode{threadManagerMode},
99  spIsRunning{std::make_shared<std::atomic<bool>>(false)},
100  mDefaultMaxSizeQueues{-1ll}
101  {
102  }
103 
104  template<typename TDatums, typename TWorker, typename TQueue>
105  void ThreadManager<TDatums, TWorker, TQueue>::setDefaultMaxSizeQueues(const long long defaultMaxSizeQueues)
106  {
107  try
108  {
109  mDefaultMaxSizeQueues = {defaultMaxSizeQueues};
110  }
111  catch (const std::exception& e)
112  {
113  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
114  }
115  }
116 
117  template<typename TDatums, typename TWorker, typename TQueue>
118  void ThreadManager<TDatums, TWorker, TQueue>::add(const unsigned long long threadId,
119  const std::vector<TWorker>& tWorkers,
120  const unsigned long long queueInId,
121  const unsigned long long queueOutId)
122  {
123  try
124  {
125  add({std::make_tuple(threadId, tWorkers, queueInId, queueOutId)});
126  }
127  catch (const std::exception& e)
128  {
129  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
130  }
131  }
132 
133  template<typename TDatums, typename TWorker, typename TQueue>
134  void ThreadManager<TDatums, TWorker, TQueue>::add(const unsigned long long threadId,
135  const TWorker& tWorker,
136  const unsigned long long queueInId,
137  const unsigned long long queueOutId)
138  {
139  try
140  {
141  add({std::make_tuple(threadId, std::vector<TWorker>{tWorker}, queueInId, queueOutId)});
142  }
143  catch (const std::exception& e)
144  {
145  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
146  }
147  }
148 
149  template<typename TDatums, typename TWorker, typename TQueue>
151  {
152  try
153  {
154  mThreadWorkerQueues.clear();
155  mThreads.clear();
156  mTQueues.clear();
157  }
158  catch (const std::exception& e)
159  {
160  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
161  }
162  }
163 
164  template<typename TDatums, typename TWorker, typename TQueue>
166  {
167  try
168  {
169  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
170  // Set threads
171  multisetToThreads();
172  if (!mThreads.empty())
173  {
174  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
175  // Start threads
176  for (auto i = 0u; i < mThreads.size() - 1; i++)
177  mThreads.at(i)->startInThread();
178  (*mThreads.rbegin())->exec(spIsRunning);
179  // Stop threads - It will arrive here when the exec() command has finished
180  stop();
181  }
182  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
183  }
184  catch (const std::exception& e)
185  {
186  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
187  }
188  }
189 
190  template<typename TDatums, typename TWorker, typename TQueue>
192  {
193  try
194  {
195  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
196  // Set threads
197  multisetToThreads();
198  // Start threads
199  for (auto& thread : mThreads)
200  thread->startInThread();
201  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
202  }
203  catch (const std::exception& e)
204  {
205  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
206  }
207  }
208 
209  template<typename TDatums, typename TWorker, typename TQueue>
211  {
212  try
213  {
214  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
215  for (auto& tQueue : mTQueues)
216  tQueue->stop();
217  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
218  *spIsRunning = false;
219  for (auto& thread : mThreads)
220  thread->stopAndJoin();
221  log("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
222  }
223  catch (const std::exception& e)
224  {
225  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
226  }
227  }
228 
229  template<typename TDatums, typename TWorker, typename TQueue>
231  {
232  try
233  {
234  if (mThreadManagerMode != ThreadManagerMode::Asynchronous
235  && mThreadManagerMode != ThreadManagerMode::AsynchronousIn)
236  error("Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
237  if (mTQueues.empty())
238  error("ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
239  return mTQueues[0]->tryEmplace(tDatums);
240  }
241  catch (const std::exception& e)
242  {
243  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
244  return false;
245  }
246  }
247 
248  template<typename TDatums, typename TWorker, typename TQueue>
250  {
251  try
252  {
253  if (mThreadManagerMode != ThreadManagerMode::Asynchronous
254  && mThreadManagerMode != ThreadManagerMode::AsynchronousIn)
255  error("Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
256  if (mTQueues.empty())
257  error("ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
258  return mTQueues[0]->waitAndEmplace(tDatums);
259  }
260  catch (const std::exception& e)
261  {
262  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
263  return false;
264  }
265  }
266 
267  template<typename TDatums, typename TWorker, typename TQueue>
269  {
270  try
271  {
272  if (mThreadManagerMode != ThreadManagerMode::Asynchronous
273  && mThreadManagerMode != ThreadManagerMode::AsynchronousIn)
274  error("Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
275  if (mTQueues.empty())
276  error("ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
277  return mTQueues[0]->tryPush(tDatums);
278  }
279  catch (const std::exception& e)
280  {
281  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
282  return false;
283  }
284  }
285 
286  template<typename TDatums, typename TWorker, typename TQueue>
288  {
289  try
290  {
291  if (mThreadManagerMode != ThreadManagerMode::Asynchronous
292  && mThreadManagerMode != ThreadManagerMode::AsynchronousIn)
293  error("Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
294  if (mTQueues.empty())
295  error("ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
296  return mTQueues[0]->waitAndPush(tDatums);
297  }
298  catch (const std::exception& e)
299  {
300  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
301  return false;
302  }
303  }
304 
305  template<typename TDatums, typename TWorker, typename TQueue>
307  {
308  try
309  {
310  if (mThreadManagerMode != ThreadManagerMode::Asynchronous
311  && mThreadManagerMode != ThreadManagerMode::AsynchronousOut)
312  error("Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
313  if (mTQueues.empty())
314  error("ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
315  return (*mTQueues.rbegin())->tryPop(tDatums);
316  }
317  catch (const std::exception& e)
318  {
319  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
320  return false;
321  }
322  }
323 
324  template<typename TDatums, typename TWorker, typename TQueue>
326  {
327  try
328  {
329  if (mThreadManagerMode != ThreadManagerMode::Asynchronous
330  && mThreadManagerMode != ThreadManagerMode::AsynchronousOut)
331  error("Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
332  if (mTQueues.empty())
333  error("ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
334  return (*mTQueues.rbegin())->waitAndPop(tDatums);
335  }
336  catch (const std::exception& e)
337  {
338  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
339  return false;
340  }
341  }
342 
343  template<typename TDatums, typename TWorker, typename TQueue>
344  void ThreadManager<TDatums, TWorker, TQueue>::add(const std::vector<std::tuple<unsigned long long, std::vector<TWorker>,
345  unsigned long long, unsigned long long>>& threadWorkerQueues)
346  {
347  try
348  {
349  for (const auto& threadWorkerQueue : threadWorkerQueues)
350  mThreadWorkerQueues.insert(threadWorkerQueue);
351  }
352  catch (const std::exception& e)
353  {
354  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
355  }
356  }
357 
358  template<typename TDatums, typename TWorker, typename TQueue>
359  void ThreadManager<TDatums, TWorker, TQueue>::add(const std::vector<std::tuple<unsigned long long, TWorker, unsigned long long,
360  unsigned long long>>& threadWorkerQueues)
361  {
362  try
363  {
364  for (const auto& threadWorkerQueue : threadWorkerQueues)
365  add({std::make_tuple(std::get<0>(threadWorkerQueue),
366  std::vector<TWorker>{std::get<1>(threadWorkerQueue)},
367  std::get<2>(threadWorkerQueue),
368  std::get<3>(threadWorkerQueue))});
369  }
370  catch (const std::exception& e)
371  {
372  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
373  }
374  }
375 
376  template<typename TDatums, typename TWorker, typename TQueue>
377  void ThreadManager<TDatums, TWorker, TQueue>::multisetToThreads()
378  {
379  try
380  {
381  if (!mThreadWorkerQueues.empty())
382  {
383  // Check threads
384  checkAndCreateEmptyThreads();
385 
386  // Check and create queues
387  checkAndCreateQueues();
388 
389  // Data
390  const auto maxQueueIdSynchronous = mTQueues.size()+1;
391 
392  // Set up threads
393  for (const auto& threadWorkerQueue : mThreadWorkerQueues)
394  {
395  auto& thread = mThreads[std::get<0>(threadWorkerQueue)];
396  const auto& tWorkers = std::get<1>(threadWorkerQueue);
397  const auto queueIn = std::get<2>(threadWorkerQueue);
398  const auto queueOut = std::get<3>(threadWorkerQueue);
399  std::shared_ptr<SubThread<TDatums, TWorker>> subThread;
400  // If AsynchronousIn -> queue indexes are OK
401  if (mThreadManagerMode == ThreadManagerMode::Asynchronous
402  || mThreadManagerMode == ThreadManagerMode::AsynchronousIn)
403  {
404  if (mThreadManagerMode == ThreadManagerMode::AsynchronousIn
405  && queueOut == mTQueues.size())
406  subThread = {std::make_shared<SubThreadQueueIn<TDatums, TWorker, TQueue>>(
407  tWorkers, mTQueues.at(queueIn))};
408  else
409  subThread = {std::make_shared<SubThreadQueueInOut<TDatums, TWorker, TQueue>>(
410  tWorkers, mTQueues.at(queueIn), mTQueues.at(queueOut))};
411  }
412  // If !AsynchronousIn -> queue indexes - 1
413  else if (queueOut != maxQueueIdSynchronous
414  || mThreadManagerMode == ThreadManagerMode::AsynchronousOut)
415  {
416  // Queue in + out
417  if (queueIn != 0)
418  subThread = {std::make_shared<SubThreadQueueInOut<TDatums, TWorker, TQueue>>(
419  tWorkers, mTQueues.at(queueIn-1), mTQueues.at(queueOut-1))};
420  // Case queue out (first TWorker(s))
421  else
422  subThread = {std::make_shared<SubThreadQueueOut<TDatums, TWorker, TQueue>>(
423  tWorkers, mTQueues.at(queueOut-1))};
424  }
425  // Case queue in (last TWorker(s))
426  else if (queueIn != 0) // && queueOut == maxQueueIdSynchronous
427  subThread = {std::make_shared<SubThreadQueueIn<TDatums, TWorker, TQueue>>(
428  tWorkers, mTQueues.at(queueIn-1))};
429  // Case no queue
430  else // if (queueIn == 0 && queueOut == maxQueueIdSynchronous)
431  subThread = {std::make_shared<SubThreadNoQueue<TDatums, TWorker>>(tWorkers)};
432  thread->add(subThread);
433  }
434  }
435  else
436  error("Empty, no TWorker(s) added.", __LINE__);
437  }
438  catch (const std::exception& e)
439  {
440  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
441  }
442  }
443 
444  template<typename TDatums, typename TWorker, typename TQueue>
445  void ThreadManager<TDatums, TWorker, TQueue>::checkAndCreateEmptyThreads()
446  {
447  try
448  {
449  // Check all thread ids from 0-maxThreadId are present
450  const auto maxThreadId = std::get<0>(*mThreadWorkerQueues.crbegin());
451  auto previousThreadId = std::get<0>(*mThreadWorkerQueues.cbegin());
452  for (const auto& threadWorkerQueue : mThreadWorkerQueues)
453  {
454  const auto currentThreadId = std::get<0>(threadWorkerQueue);
455  if (currentThreadId - previousThreadId > 1)
456  error("Missing thread id " + std::to_string(currentThreadId) + " of "
457  + std::to_string(maxThreadId) + ".", __LINE__, __FUNCTION__, __FILE__);
458  previousThreadId = currentThreadId;
459  }
460 
461  // Create Threads
462  // #threads = maxThreadId+1
463  mThreads.resize(maxThreadId);
464  for (auto& thread : mThreads)
465  thread = std::make_shared<Thread<TDatums, TWorker>>();
466  mThreads.emplace_back(std::make_shared<Thread<TDatums, TWorker>>(spIsRunning));
467  }
468  catch (const std::exception& e)
469  {
470  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
471  }
472  }
473 
474  template<typename TDatums, typename TWorker, typename TQueue>
475  void ThreadManager<TDatums, TWorker, TQueue>::checkAndCreateQueues()
476  {
477  try
478  {
479  if (!mThreadWorkerQueues.empty())
480  {
481  // Get max queue id to get queue size
482  auto maxQueueId = std::get<3>(*mThreadWorkerQueues.cbegin());
483  for (const auto& threadWorkerQueue : mThreadWorkerQueues)
484  maxQueueId = fastMax(
485  maxQueueId, fastMax(std::get<2>(threadWorkerQueue), std::get<3>(threadWorkerQueue)));
486 
487  // Check each queue id has at least a worker that uses it as input and another one as output.
488  // Special cases:
489  std::vector<std::pair<bool, bool>> usedQueueIds(maxQueueId+1, {false, false});
490  for (const auto& threadWorkerQueue : mThreadWorkerQueues)
491  {
492  usedQueueIds.at(std::get<2>(threadWorkerQueue)).first = true;
493  usedQueueIds.at(std::get<3>(threadWorkerQueue)).second = true;
494  }
495  // Id 0 must only needs a worker using it as input.
496  usedQueueIds.begin()->second = true;
497  // Id maxQueueId only needs a worker using it as output.
498  usedQueueIds.rbegin()->first = true;
499  // Error if missing queue id
500  for (auto i = 0ull ; i < usedQueueIds.size() ; i++)
501  {
502  if (!usedQueueIds[i].first)
503  error("Missing queue id " + std::to_string(i) + " (of "
504  + std::to_string(maxQueueId) + ") as input.", __LINE__, __FUNCTION__, __FILE__);
505  if (!usedQueueIds[i].second)
506  error("Missing queue id " + std::to_string(i) + " (of "
507  + std::to_string(maxQueueId) + ") as output.", __LINE__, __FUNCTION__, __FILE__);
508  }
509 
510  // Create Queues
511  if (mThreadManagerMode == ThreadManagerMode::Asynchronous)
512  mTQueues.resize(maxQueueId+1); // First and last one are queues
513  else if (mThreadManagerMode == ThreadManagerMode::Synchronous)
514  mTQueues.resize(maxQueueId-1); // First and last one are not actually queues
515  else if (mThreadManagerMode == ThreadManagerMode::AsynchronousIn
516  || mThreadManagerMode == ThreadManagerMode::AsynchronousOut)
517  mTQueues.resize(maxQueueId); // First or last one is queue
518  else
519  error("Unknown ThreadManagerMode", __LINE__, __FUNCTION__, __FILE__);
520  for (auto& tQueue : mTQueues)
521  tQueue = std::make_shared<TQueue>(mDefaultMaxSizeQueues);
522  }
523  }
524  catch (const std::exception& e)
525  {
526  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
527  }
528  }
529 
530  COMPILE_TEMPLATE_DATUM(ThreadManager);
531 }
532 
533 #endif // OPENPOSE_THREAD_THREAD_MANAGER_HPP
ThreadManager(const ThreadManagerMode threadManagerMode=ThreadManagerMode::Synchronous)
Definition: threadManager.hpp:97
void stop()
Definition: threadManager.hpp:210
bool isRunning() const
Definition: threadManager.hpp:43
void start()
Definition: threadManager.hpp:191
void setDefaultMaxSizeQueues(const long long defaultMaxSizeQueues=-1)
Definition: threadManager.hpp:105
bool waitAndEmplace(TDatums &tDatums)
Definition: threadManager.hpp:249
std::shared_ptr< std::atomic< bool > > getIsRunningSharedPtr()
Definition: threadManager.hpp:38
T fastMax(const T a, const T b)
Definition: fastMath.hpp:70
bool tryPop(TDatums &tDatums)
Definition: threadManager.hpp:306
void add(const unsigned long long threadId, const std::vector< TWorker > &tWorkers, const unsigned long long queueInId, const unsigned long long queueOutId)
Definition: threadManager.hpp:118
void reset()
Definition: threadManager.hpp:150
OP_API void error(const std::string &message, const int line=-1, const std::string &function="", const std::string &file="")
bool waitAndPop(TDatums &tDatums)
Definition: threadManager.hpp:325
bool tryEmplace(TDatums &tDatums)
Definition: threadManager.hpp:230
Definition: threadManager.hpp:16
void exec()
Definition: threadManager.hpp:165
OP_API void log(const std::string &message, const Priority priority=Priority::Max, const int line=-1, const std::string &function="", const std::string &file="")
ThreadManagerMode
Definition: enumClasses.hpp:9
COMPILE_TEMPLATE_DATUM(WPoseTriangulation)
std::vector< T, Alloc > vector
Definition: cl2.hpp:567
bool waitAndPush(const TDatums &tDatums)
Definition: threadManager.hpp:287
bool tryPush(const TDatums &tDatums)
Definition: threadManager.hpp:268