OpenPose  1.0.0rc2
OpenPose: A Real-Time Multi-Person Key-Point Detection And Multi-Threading C++ Library
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
wQueueOrderer.hpp
Go to the documentation of this file.
1 #ifndef OPENPOSE_THREAD_W_QUEUE_ORDERER_HPP
2 #define OPENPOSE_THREAD_W_QUEUE_ORDERER_HPP
3 
4 #include <queue> // std::priority_queue
8 
9 namespace op
10 {
11  template<typename TDatums>
12  class WQueueOrderer : public Worker<TDatums>
13  {
14  public:
15  explicit WQueueOrderer(const unsigned int maxBufferSize = 64u);
16 
18 
19  void work(TDatums& tDatums);
20 
21  void tryStop();
22 
23  private:
24  const unsigned int mMaxBufferSize;
25  bool mStopWhenEmpty;
26  unsigned long long mNextExpectedId;
27  unsigned long long mNextExpectedSubId;
28  std::priority_queue<TDatums, std::vector<TDatums>, PointerContainerGreater<TDatums>> mPriorityQueueBuffer;
29 
30  DELETE_COPY(WQueueOrderer);
31  };
32 }
33 
34 
35 
36 
37 
38 // Implementation
39 #include <chrono>
40 #include <thread>
41 namespace op
42 {
43  template<typename TDatums>
44  WQueueOrderer<TDatums>::WQueueOrderer(const unsigned int maxBufferSize) :
45  mMaxBufferSize{maxBufferSize},
46  mStopWhenEmpty{false},
47  mNextExpectedId{0},
48  mNextExpectedSubId{0}
49  {
50  }
51 
52  template<typename TDatums>
54  {
55  }
56 
57  template<typename TDatums>
58  void WQueueOrderer<TDatums>::work(TDatums& tDatums)
59  {
60  try
61  {
62  // Profiling speed
63  const auto profilerKey = Profiler::timerInit(__LINE__, __FUNCTION__, __FILE__);
64  bool profileSpeed = (tDatums != nullptr);
65  // Input TDatum -> enqueue or return it back
66  if (checkNoNullNorEmpty(tDatums))
67  {
68  // T* to T
69  auto& tDatumsNoPtr = *tDatums;
70  // tDatums is the next expected, update counter
71  if (tDatumsNoPtr[0].id == mNextExpectedId && tDatumsNoPtr[0].subId == mNextExpectedSubId)
72  {
73  // If single-view
74  if (tDatumsNoPtr[0].subIdMax == 0)
75  mNextExpectedId++;
76  // If muilti-view system
77  else
78  {
79  mNextExpectedSubId++;
80  if (mNextExpectedSubId > tDatumsNoPtr[0].subIdMax)
81  {
82  mNextExpectedSubId = 0;
83  mNextExpectedId++;
84  }
85  }
86  }
87  // Else push it to our buffered queue
88  else
89  {
90  // Enqueue current tDatums
91  mPriorityQueueBuffer.emplace(tDatums);
92  tDatums = nullptr;
93  // Else if buffer full -> remove one tDatums
94  if (mPriorityQueueBuffer.size() > mMaxBufferSize)
95  {
96  tDatums = mPriorityQueueBuffer.top();
97  mPriorityQueueBuffer.pop();
98  }
99  }
100  }
101  // If input TDatum enqueued -> check if previously enqueued next desired frame and pop it
102  if (!checkNoNullNorEmpty(tDatums))
103  {
104  // Retrieve frame if next is desired frame or if we want to stop this worker
105  if (!mPriorityQueueBuffer.empty()
106  && (mStopWhenEmpty ||
107  ((*mPriorityQueueBuffer.top())[0].id == mNextExpectedId
108  && (*mPriorityQueueBuffer.top())[0].subId == mNextExpectedSubId)))
109  {
110  tDatums = { mPriorityQueueBuffer.top() };
111  mPriorityQueueBuffer.pop();
112  }
113  }
114  // If TDatum ready to be returned -> updated next expected id
115  if (checkNoNullNorEmpty(tDatums))
116  {
117  const auto& tDatumsNoPtr = *tDatums;
118  // If single-view
119  if (tDatumsNoPtr[0].subIdMax == 0)
120  mNextExpectedId = tDatumsNoPtr[0].id + 1;
121  // If muilti-view system
122  else
123  {
124  mNextExpectedSubId = tDatumsNoPtr[0].subId + 1;
125  if (mNextExpectedSubId > tDatumsNoPtr[0].subIdMax)
126  {
127  mNextExpectedSubId = 0;
128  mNextExpectedId = tDatumsNoPtr[0].id + 1;
129  }
130  }
131  }
132  // Sleep if no new tDatums to either pop or push
133  if (!checkNoNullNorEmpty(tDatums) && mPriorityQueueBuffer.size() < mMaxBufferSize / 2u)
134  std::this_thread::sleep_for(std::chrono::milliseconds{1});
135  // If TDatum popped and/or pushed
136  if (profileSpeed || tDatums != nullptr)
137  {
138  // Profiling speed
139  Profiler::timerEnd(profilerKey);
140  Profiler::printAveragedTimeMsOnIterationX(profilerKey, __LINE__, __FUNCTION__, __FILE__);
141  // Debugging log
142  dLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
143  }
144  }
145  catch (const std::exception& e)
146  {
147  this->stop();
148  tDatums = nullptr;
149  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
150  }
151  }
152 
153  template<typename TDatums>
155  {
156  try
157  {
158  // Close if all frames were retrieved from the queue
159  if (mPriorityQueueBuffer.empty())
160  this->stop();
161  mStopWhenEmpty = true;
162 
163  }
164  catch (const std::exception& e)
165  {
166  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
167  }
168  }
169 
171 }
172 
173 #endif // OPENPOSE_THREAD_W_QUEUE_ORDERER_HPP
void work(TDatums &tDatums)
Definition: wQueueOrderer.hpp:58
void initializationOnThread()
Definition: wQueueOrderer.hpp:53
Definition: worker.hpp:9
Definition: pointerContainer.hpp:13
static const std::string timerInit(const int line, const std::string &function, const std::string &file)
OP_API void error(const std::string &message, const int line=-1, const std::string &function="", const std::string &file="")
Definition: wQueueOrderer.hpp:12
void tryStop()
Definition: wQueueOrderer.hpp:154
void dLog(const T &message, const Priority priority=Priority::Max, const int line=-1, const std::string &function="", const std::string &file="")
Definition: errorAndLog.hpp:53
bool checkNoNullNorEmpty(const TPointerContainer &tPointerContainer)
Definition: pointerContainer.hpp:7
WQueueOrderer(const unsigned int maxBufferSize=64u)
Definition: wQueueOrderer.hpp:44
static void printAveragedTimeMsOnIterationX(const std::string &key, const int line, const std::string &function, const std::string &file, const unsigned long long x=DEFAULT_X)
COMPILE_TEMPLATE_DATUM(WPoseTriangulation)
static void timerEnd(const std::string &key)