wQueueOrderer.hpp 4.5 KB
Newer Older
G
gineshidalgo99 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
#ifndef OPENPOSE__THREAD__W_QUEUE_ORDERER_HPP
#define OPENPOSE__THREAD__W_QUEUE_ORDERER_HPP

#include <queue> // std::priority_queue
#include "worker.hpp"
#include "../utilities/pointerContainer.hpp"

namespace op
{
    template<typename TDatums>
    class WQueueOrderer : public Worker<TDatums>
    {
    public:
        explicit WQueueOrderer(const int maxBufferSize = 64);

        void initializationOnThread();

        void work(TDatums& tDatums);

        void tryStop();

    private:
        const int mMaxBufferSize;
        bool mStopWhenEmpty;
        unsigned long long mNextExpectedId;
        std::priority_queue<TDatums, std::vector<TDatums>, PointerContainerGreater<TDatums>> mPriorityQueueBuffer;

        DELETE_COPY(WQueueOrderer);
    };
}





// Implementation
#include <chrono>
#include <thread>
#include "../utilities/errorAndLog.hpp"
#include "../utilities/macros.hpp"
#include "../utilities/profiler.hpp"
namespace op
{
    template<typename TDatums>
    WQueueOrderer<TDatums>::WQueueOrderer(const int maxBufferSize) :
        mMaxBufferSize{maxBufferSize},
        mStopWhenEmpty{false},
        mNextExpectedId{0}
    {
    }

    template<typename TDatums>
    void WQueueOrderer<TDatums>::initializationOnThread()
    {
    }

    template<typename TDatums>
    void WQueueOrderer<TDatums>::work(TDatums& tDatums)
    {
        try
        {
            // Profiling speed
            const auto profilerKey = Profiler::timerInit(__LINE__, __FUNCTION__, __FILE__);
            bool profileSpeed = (tDatums != nullptr);
            // Input TDatum -> enqueue or return it back
            if (checkNoNullNorEmpty(tDatums))
            {
                // T* to T
                auto& tDatumsNoPtr = *tDatums;
                // tDatums is the next expected, update counter
                if (tDatumsNoPtr[0].id == mNextExpectedId)
                    mNextExpectedId++;
                // Else push it to our buffered queue
                else
                {
                    // Enqueue current tDatums
                    mPriorityQueueBuffer.emplace(tDatums);
                    tDatums = nullptr;
                    // Else if buffer full -> remove one tDatums
                    if (mPriorityQueueBuffer.size() > mMaxBufferSize)
                    {
                        tDatums = mPriorityQueueBuffer.top();
                        mPriorityQueueBuffer.pop();
                    }
                }
            }
            // If input TDatum enqueued -> check if previously enqueued next desired frame and pop it
            if (!checkNoNullNorEmpty(tDatums))
            {
                // Retrieve frame if next is desired frame or if we want to stop this worker
                if (!mPriorityQueueBuffer.empty()   &&   ((*mPriorityQueueBuffer.top())[0].id == mNextExpectedId || mStopWhenEmpty))
                {
                    tDatums = { mPriorityQueueBuffer.top() };
                    mPriorityQueueBuffer.pop();
                }
            }
            // If TDatum ready to be returned -> updated next expected id
            if (checkNoNullNorEmpty(tDatums))
            {
                const auto& tDatumsNoPtr = *tDatums;
                mNextExpectedId = tDatumsNoPtr[0].id + 1;
            }
            // Sleep if no new tDatums to either pop
            if (!checkNoNullNorEmpty(tDatums) && mPriorityQueueBuffer.size() < mMaxBufferSize / 2)
                std::this_thread::sleep_for(std::chrono::milliseconds{1});
            // If TDatum popped and/or pushed
            if (profileSpeed || tDatums != nullptr)
            {
                // Profiling speed
                Profiler::timerEnd(profilerKey);
G
gineshidalgo99 已提交
111
                Profiler::printAveragedTimeMsOnIterationX(profilerKey, __LINE__, __FUNCTION__, __FILE__, Profiler::DEFAULT_X);
G
gineshidalgo99 已提交
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
                // Debugging log
                dLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
            }
        }
        catch (const std::exception& e)
        {
            this->stop();
            tDatums = nullptr;
            error(e.what(), __LINE__, __FUNCTION__, __FILE__);
        }
    }

    template<typename TDatums>
    void WQueueOrderer<TDatums>::tryStop()
    {
        try
        {
            // Close if all frames were retrieved from the queue
            if (mPriorityQueueBuffer.empty())
                this->stop();
            mStopWhenEmpty = true;

        }
        catch (const std::exception& e)
        {
            error(e.what(), __LINE__, __FUNCTION__, __FILE__);
        }
    }

    COMPILE_TEMPLATE_DATUM(WQueueOrderer);
}

#endif // OPENPOSE__THREAD__W_QUEUE_ORDERER_HPP