diff --git a/modules/gapi/CMakeLists.txt b/modules/gapi/CMakeLists.txt index 20a73a55951063ea7626c20805bfab6b4a7baef6..79ae30ae57f4a5e1f3a9ae780b4a5e05361a15dd 100644 --- a/modules/gapi/CMakeLists.txt +++ b/modules/gapi/CMakeLists.txt @@ -180,6 +180,8 @@ set(gapi_srcs src/streaming/onevpl/engine/processing_engine_base.cpp src/streaming/onevpl/engine/decode/decode_engine_legacy.cpp src/streaming/onevpl/engine/decode/decode_session.cpp + src/streaming/onevpl/demux/async_mfp_demux_data_provider.cpp + src/streaming/onevpl/data_provider_dispatcher.cpp src/streaming/onevpl/cfg_param_device_selector.cpp src/streaming/onevpl/device_selector_interface.cpp diff --git a/modules/gapi/include/opencv2/gapi/streaming/onevpl/data_provider_interface.hpp b/modules/gapi/include/opencv2/gapi/streaming/onevpl/data_provider_interface.hpp index ac3444757d3858ca7b638de9342f16e076798d4b..c70e3db0ac67f261b0b644f8726c855a9f88be1a 100644 --- a/modules/gapi/include/opencv2/gapi/streaming/onevpl/data_provider_interface.hpp +++ b/modules/gapi/include/opencv2/gapi/streaming/onevpl/data_provider_interface.hpp @@ -7,28 +7,39 @@ #ifndef GAPI_STREAMING_ONEVPL_ONEVPL_DATA_PROVIDER_INTERFACE_HPP #define GAPI_STREAMING_ONEVPL_ONEVPL_DATA_PROVIDER_INTERFACE_HPP #include +#include #include #include // GAPI_EXPORTS - namespace cv { namespace gapi { namespace wip { namespace onevpl { struct GAPI_EXPORTS DataProviderException : public std::exception { - virtual ~DataProviderException() {} -}; + DataProviderException(const std::string& descr); + DataProviderException(std::string&& descr); -struct GAPI_EXPORTS DataProviderSystemErrorException : public DataProviderException { - DataProviderSystemErrorException(int error_code, const std::string& desription = std::string()); - virtual ~DataProviderSystemErrorException(); + virtual ~DataProviderException() = default; virtual const char* what() const noexcept override; - private: std::string reason; }; +struct GAPI_EXPORTS DataProviderSystemErrorException final : public DataProviderException { + DataProviderSystemErrorException(int error_code, const std::string& desription = std::string()); + ~DataProviderSystemErrorException() = default; +}; + +struct GAPI_EXPORTS DataProviderUnsupportedException final : public DataProviderException { + DataProviderUnsupportedException(const std::string& description); + ~DataProviderUnsupportedException() = default; +}; + +struct GAPI_EXPORTS DataProviderImplementationException : public DataProviderException { + DataProviderImplementationException(const std::string& description); + ~DataProviderImplementationException() = default; +}; /** * @brief Public interface allows to customize extraction of video stream data * used by onevpl::GSource instead of reading stream from file (by default). @@ -41,21 +52,41 @@ private: */ struct GAPI_EXPORTS IDataProvider { using Ptr = std::shared_ptr; + using mfx_codec_id_type = uint32_t; + + /** + * NB: here is supposed to be forward declaration of mfxBitstream + * But according to current oneVPL implementation it is impossible to forward + * declare untagged struct mfxBitstream. + * + * IDataProvider makes sense only for HAVE_VPL is ON and to keep IDataProvider + * interface API/ABI compliant between core library and user application layer + * let's introduce wrapper mfx_bitstream which inherits mfxBitstream in private + * G-API code section and declare forward for wrapper mfx_bitstream here + */ + struct mfx_bitstream; - virtual ~IDataProvider() {} + virtual ~IDataProvider() = default; + + /** + * The function is used by onevpl::GSource to extract codec id from data + * + */ + virtual mfx_codec_id_type get_mfx_codec_id() const = 0; /** * The function is used by onevpl::GSource to extract binary data stream from @ref IDataProvider * implementation. * * It MUST throw `DataProviderException` kind exceptions in fail cases. - * It MUST return 0 in EOF which considered as not-fail case. + * It MUST return MFX_ERR_MORE_DATA in EOF which considered as not-fail case. * - * @param out_data_bytes_size the available capacity of out_data buffer. - * @param out_data the output consumer buffer with capacity out_data_bytes_size. - * @return fetched bytes count. + * @param in_out_bitsream the input-output reference on MFX bitstream buffer which MUST be empty at the first request + * to allow implementation to allocate it by itself and to return back. Subsequent invocation of `fetch_bitstream_data` + * MUST use the previously used in_out_bitsream to avoid skipping rest of frames which haven't been consumed + * @return true for fetched data, false on EOF and throws exception on error */ - virtual size_t fetch_data(size_t out_data_bytes_size, void* out_data) = 0; + virtual bool fetch_bitstream_data(std::shared_ptr &in_out_bitsream) = 0; /** * The function is used by onevpl::GSource to check more binary data availability. diff --git a/modules/gapi/src/streaming/onevpl/cfg_param_device_selector.cpp b/modules/gapi/src/streaming/onevpl/cfg_param_device_selector.cpp index 0bdec7098603d24fe4945f9a315135a4635a5499..64dc34329b027f7e6546fd9beda141fb22ea0395 100644 --- a/modules/gapi/src/streaming/onevpl/cfg_param_device_selector.cpp +++ b/modules/gapi/src/streaming/onevpl/cfg_param_device_selector.cpp @@ -10,6 +10,7 @@ #include #include "streaming/onevpl/cfg_param_device_selector.hpp" +#include "streaming/onevpl/cfg_params_parser.hpp" #include "streaming/onevpl/utils.hpp" #include "logger.hpp" @@ -37,45 +38,6 @@ namespace gapi { namespace wip { namespace onevpl { -// TODO Will be changed on generic function from `onevpl_param_parser` as soons as feature merges -static mfxVariant cfg_param_to_mfx_variant(const CfgParam& accel_param) { - mfxVariant ret; - const CfgParam::value_t& accel_val = accel_param.get_value(); - if (!cv::util::holds_alternative(accel_val)) { - // expected string or uint32_t as value - if (!cv::util::holds_alternative(accel_val)) { - throw std::logic_error("Incorrect value type of \"mfxImplDescription.AccelerationMode\" " - " std::string is expected" ); - } - ret.Type = MFX_VARIANT_TYPE_U32; - ret.Data.U32 = cv::util::get(accel_val); - return ret; - } - - const std::string& accel_val_str = cv::util::get(accel_val); - ret.Type = MFX_VARIANT_TYPE_U32; - if (accel_val_str == "MFX_ACCEL_MODE_NA") { - ret.Data.U32 = MFX_ACCEL_MODE_NA; - } else if (accel_val_str == "MFX_ACCEL_MODE_VIA_D3D9") { - ret.Data.U32 = MFX_ACCEL_MODE_VIA_D3D9; - } else if (accel_val_str == "MFX_ACCEL_MODE_VIA_D3D11") { - ret.Data.U32 = MFX_ACCEL_MODE_VIA_D3D11; - } else if (accel_val_str == "MFX_ACCEL_MODE_VIA_VAAPI") { - ret.Data.U32 = MFX_ACCEL_MODE_VIA_VAAPI; - } else if (accel_val_str == "MFX_ACCEL_MODE_VIA_VAAPI_DRM_MODESET") { - ret.Data.U32 = MFX_ACCEL_MODE_VIA_VAAPI_DRM_MODESET; - } else if (accel_val_str == "MFX_ACCEL_MODE_VIA_VAAPI_GLX") { - ret.Data.U32 = MFX_ACCEL_MODE_VIA_VAAPI_GLX; - } else if (accel_val_str == "MFX_ACCEL_MODE_VIA_VAAPI_X11") { - ret.Data.U32 = MFX_ACCEL_MODE_VIA_VAAPI_X11; - } else if (accel_val_str == "MFX_ACCEL_MODE_VIA_VAAPI_WAYLAND") { - ret.Data.U32 = MFX_ACCEL_MODE_VIA_VAAPI_WAYLAND; - } else if (accel_val_str == "MFX_ACCEL_MODE_VIA_HDDLUNITE") { - ret.Data.U32 = MFX_ACCEL_MODE_VIA_HDDLUNITE; - } - return ret; -} - CfgParamDeviceSelector::CfgParamDeviceSelector(const CfgParams& cfg_params) : suggested_device(IDeviceSelector::create(nullptr, "CPU", AccelType::HOST)), suggested_context(IDeviceSelector::create(nullptr, AccelType::HOST)) { diff --git a/modules/gapi/src/streaming/onevpl/data_provider_defines.hpp b/modules/gapi/src/streaming/onevpl/data_provider_defines.hpp new file mode 100644 index 0000000000000000000000000000000000000000..d31bece9feeec8af0951092ff9d0e39c878a998a --- /dev/null +++ b/modules/gapi/src/streaming/onevpl/data_provider_defines.hpp @@ -0,0 +1,32 @@ +#ifndef GAPI_STREAMING_ONEVPL_DATA_PROVIDER_DEFINES_HPP +#define GAPI_STREAMING_ONEVPL_DATA_PROVIDER_DEFINES_HPP + +#ifdef HAVE_ONEVPL +#include +#include +#endif // HAVE_ONEVPL + +#include +#include + +namespace cv { +namespace gapi { +namespace wip { +namespace onevpl { + +#ifdef HAVE_ONEVPL +struct IDataProvider::mfx_bitstream : public mfxBitstream {}; +#else // HAVE_ONEVPL +struct IDataProvider::mfx_bitstream { + mfx_bitstream() { + GAPI_Assert(false && "Reject to create `mfxBitstream` because library compiled without VPL/MFX support"); + } +}; +#endif // HAVE_ONEVPL + +} // namespace onevpl +} // namespace wip +} // namespace gapi +} // namespace cv + +#endif // GAPI_STREAMING_ONEVPL_DATA_PROVIDER_DEFINES_HPP diff --git a/modules/gapi/src/streaming/onevpl/data_provider_dispatcher.cpp b/modules/gapi/src/streaming/onevpl/data_provider_dispatcher.cpp new file mode 100644 index 0000000000000000000000000000000000000000..bc712eb1a1bbda0d2d24267e3294f3b43689a8f4 --- /dev/null +++ b/modules/gapi/src/streaming/onevpl/data_provider_dispatcher.cpp @@ -0,0 +1,68 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2021 Intel Corporation + +#ifdef HAVE_ONEVPL + +#include "streaming/onevpl/data_provider_dispatcher.hpp" +#include "streaming/onevpl/file_data_provider.hpp" +#include "streaming/onevpl/demux/async_mfp_demux_data_provider.hpp" +#include "logger.hpp" + +namespace cv { +namespace gapi { +namespace wip { +namespace onevpl { + +IDataProvider::Ptr DataProviderDispatcher::create(const std::string& file_path, + const std::vector &cfg_params) { + GAPI_LOG_INFO(nullptr, "try select suitable IDataProvider for source: " << + file_path); + + IDataProvider::Ptr provider; + + // Look-up CodecId from input params + // If set then raw data provider is preferred + GAPI_LOG_DEBUG(nullptr, "try find explicit cfg param\"mfxImplDescription.mfxDecoderDescription.decoder.CodecID\""); + auto codec_it = + std::find_if(cfg_params.begin(), cfg_params.end(), [] (const CfgParam& value) { + return value.get_name() == "mfxImplDescription.mfxDecoderDescription.decoder.CodecID"; + }); + if (codec_it != cfg_params.end()) { + GAPI_LOG_DEBUG(nullptr, "Dispatcher found \"mfxImplDescription.mfxDecoderDescription.decoder.CodecID\"" + " so try on raw data provider at first"); + + try { + provider = std::make_shared(file_path, cfg_params); + GAPI_LOG_INFO(nullptr, "raw data provider created"); + } catch (const DataProviderUnsupportedException& ex) { + GAPI_LOG_INFO(nullptr, "raw data provider creation is failed, reason: " << + ex.what()); + } + } + + if (!provider) { + GAPI_LOG_DEBUG(nullptr, "Try on MFP data provider"); + try { + provider = std::make_shared(file_path); + GAPI_LOG_INFO(nullptr, "MFP data provider created"); + } catch (const DataProviderUnsupportedException& ex) { + GAPI_LOG_INFO(nullptr, "MFP data provider creation is failed, reason: " << + ex.what()); + } + } + + // final check + if (!provider) { + GAPI_LOG_WARNING(nullptr, "Cannot find suitable data provider"); + throw DataProviderUnsupportedException("Unsupported source or configuration parameters");; + } + return provider; +} +} // namespace onevpl +} // namespace wip +} // namespace gapi +} // namespace cv +#endif // HAVE_ONEVPL diff --git a/modules/gapi/src/streaming/onevpl/data_provider_dispatcher.hpp b/modules/gapi/src/streaming/onevpl/data_provider_dispatcher.hpp new file mode 100644 index 0000000000000000000000000000000000000000..01e24a1f4c809e3a62c34eb98c5863eb5add0867 --- /dev/null +++ b/modules/gapi/src/streaming/onevpl/data_provider_dispatcher.hpp @@ -0,0 +1,29 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2021 Intel Corporation + +#ifndef GAPI_STREAMING_ONEVPL_DATA_PROVIDER_DISPATCHER_HPP +#define GAPI_STREAMING_ONEVPL_DATA_PROVIDER_DISPATCHER_HPP + +#ifdef HAVE_ONEVPL +#include +#include + +namespace cv { +namespace gapi { +namespace wip { +namespace onevpl { + +struct GAPI_EXPORTS DataProviderDispatcher { + + static IDataProvider::Ptr create(const std::string& file_path, + const std::vector &codec_params = {}); +}; +} // namespace onevpl +} // namespace wip +} // namespace gapi +} // namespace cv +#endif // HAVE_ONEVPL +#endif // GAPI_STREAMING_ONEVPL_DATA_PROVIDER_DISPATCHER_HPP diff --git a/modules/gapi/src/streaming/onevpl/data_provider_interface_exception.cpp b/modules/gapi/src/streaming/onevpl/data_provider_interface_exception.cpp index feaae7a727e453e1a37a0930d51d3d72d3787f29..f30ab1cfadfef13f63f01103eb92dd1f8018fa9e 100644 --- a/modules/gapi/src/streaming/onevpl/data_provider_interface_exception.cpp +++ b/modules/gapi/src/streaming/onevpl/data_provider_interface_exception.cpp @@ -1,3 +1,14 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2021 Intel Corporation + +#ifdef HAVE_ONEVPL +#include +#include +#endif // HAVE_ONEVPL + #include #include @@ -7,15 +18,31 @@ namespace cv { namespace gapi { namespace wip { namespace onevpl { -DataProviderSystemErrorException::DataProviderSystemErrorException(int error_code, const std::string& desription) { - reason = desription + ", error: " + std::to_string(error_code) + ", desctiption: " + strerror(error_code); -} -DataProviderSystemErrorException::~DataProviderSystemErrorException() = default; +DataProviderException::DataProviderException(const std::string& descr) : + reason(descr) { +} +DataProviderException::DataProviderException(std::string&& descr) : + reason(std::move(descr)) { +} -const char* DataProviderSystemErrorException::what() const noexcept { +const char* DataProviderException::what() const noexcept { return reason.c_str(); } + +DataProviderSystemErrorException::DataProviderSystemErrorException(int error_code, + const std::string& description) : + DataProviderException(description + ", error code: " + std::to_string(error_code) + " - " + strerror(error_code)) { + +} + +DataProviderUnsupportedException::DataProviderUnsupportedException(const std::string& description) : + DataProviderException(description) { +} + +DataProviderImplementationException::DataProviderImplementationException(const std::string& description) : + DataProviderException(description) { +} } // namespace onevpl } // namespace wip } // namespace gapi diff --git a/modules/gapi/src/streaming/onevpl/demux/async_mfp_demux_data_provider.cpp b/modules/gapi/src/streaming/onevpl/demux/async_mfp_demux_data_provider.cpp new file mode 100644 index 0000000000000000000000000000000000000000..85d7d1aaf01d0635c27f703f9ceb7569aadc9207 --- /dev/null +++ b/modules/gapi/src/streaming/onevpl/demux/async_mfp_demux_data_provider.cpp @@ -0,0 +1,820 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2021 Intel Corporation +#ifdef HAVE_ONEVPL +#include +#ifdef _WIN32 + +#pragma comment(lib, "Mf.lib") +#pragma comment(lib, "Mfuuid.lib") +#pragma comment(lib, "Mfplat.lib") +#pragma comment(lib, "shlwapi.lib") +#pragma comment(lib, "mfreadwrite.lib") +#endif // _WIN32 + +#include "streaming/onevpl/demux/async_mfp_demux_data_provider.hpp" +#include "logger.hpp" + +namespace cv { +namespace gapi { +namespace wip { +namespace onevpl { +#ifdef _WIN32 +static HRESULT create_media_source(const std::string& url, IMFMediaSource **ppSource) { + wchar_t sURL[MAX_PATH]; + GAPI_Assert(url.size() < MAX_PATH && "Windows MAX_PATH limit was reached"); + size_t ret_url_length = 0; + mbstowcs_s(&ret_url_length, sURL, url.data(), url.size()); + + HRESULT hr = S_OK; + ComPtrGuard source_resolver = createCOMPtrGuard(); + { + IMFSourceResolver *source_resolver_tmp = nullptr; + hr = MFCreateSourceResolver(&source_resolver_tmp); + if (FAILED(hr)) { + throw DataProviderSystemErrorException(HRESULT_CODE(hr), + "cannot create MFCreateSourceResolver from URI: " + + url); + } + source_resolver.reset(source_resolver_tmp); + } + + MF_OBJECT_TYPE ObjectType = MF_OBJECT_INVALID; + /** + * NB: + * CreateObjectFromURL throws exception if actual container type is mismatched with + * file extension. To overcome this situation by MFP it is possible to apply 2 step + * approach: at first step we pass special flag + * `MF_RESOLUTION_KEEP_BYTE_STREAM_ALIVE_ON_FAIL` which claims to fail with error + * in any case of input instead exception throwing; + * at the second step we must cease `MF_RESOLUTION_KEEP_BYTE_STREAM_ALIVE_ON_FAIL` + * flag AND set another special flag + * `MF_RESOLUTION_CONTENT_DOES_NOT_HAVE_TO_MATCH_EXTENSION_OR_MIME_TYPE` + * to filter out container type & file extension mismatch errors. + * + * If it failed at second phase then some other errors were not related + * to types-extension disturbance would happen and data provider must fail ultimately. + * + * If second step passed then data provider would continue execution + */ + IUnknown *source_unknown_tmp = nullptr; + DWORD resolver_flags = MF_RESOLUTION_MEDIASOURCE | MF_RESOLUTION_READ | + MF_RESOLUTION_KEEP_BYTE_STREAM_ALIVE_ON_FAIL; + hr = source_resolver->CreateObjectFromURL(sURL, + resolver_flags, + nullptr, &ObjectType, + &source_unknown_tmp); + if (FAILED(hr)) { + GAPI_LOG_DEBUG(nullptr, "Cannot create MF_RESOLUTION_MEDIASOURCE using file extension, " + " looks like actual media container type doesn't match to file extension. " + "Try special mode"); + resolver_flags ^= MF_RESOLUTION_KEEP_BYTE_STREAM_ALIVE_ON_FAIL; + resolver_flags ^= MF_RESOLUTION_CONTENT_DOES_NOT_HAVE_TO_MATCH_EXTENSION_OR_MIME_TYPE; + hr = source_resolver->CreateObjectFromURL(sURL, resolver_flags, + nullptr, &ObjectType, + &source_unknown_tmp); + if (FAILED(hr)) { + GAPI_LOG_WARNING(nullptr, "Cannot create MF_RESOLUTION_MEDIASOURCE from URI: " << + url << ". Abort"); + throw DataProviderSystemErrorException(HRESULT_CODE(hr), + "CreateObjectFromURL failed"); + } + } + + ComPtrGuard source_unknown = createCOMPtrGuard(source_unknown_tmp); + hr = source_unknown->QueryInterface(__uuidof(IMFMediaSource), (void**)ppSource); + if (FAILED(hr)) { + throw DataProviderSystemErrorException(HRESULT_CODE(hr), + "QueryInterface for IMFMediaSource failed"); + } + + return hr; +} + +/* + * The next part of converting GUID into string function GetGUIDNameConst + * was copied and modified from + * https://docs.microsoft.com/en-us/windows/win32/medfound/media-type-debugging-code + */ +#ifndef IF_EQUAL_RETURN +#define IF_EQUAL_RETURN(param, val) if(val == param) return #val +#endif + +static const char* GetGUIDNameConst(const GUID& guid) +{ + IF_EQUAL_RETURN(guid, MF_MT_MAJOR_TYPE); + IF_EQUAL_RETURN(guid, MF_MT_MAJOR_TYPE); + IF_EQUAL_RETURN(guid, MF_MT_SUBTYPE); + IF_EQUAL_RETURN(guid, MF_MT_ALL_SAMPLES_INDEPENDENT); + IF_EQUAL_RETURN(guid, MF_MT_FIXED_SIZE_SAMPLES); + IF_EQUAL_RETURN(guid, MF_MT_COMPRESSED); + IF_EQUAL_RETURN(guid, MF_MT_SAMPLE_SIZE); + IF_EQUAL_RETURN(guid, MF_MT_WRAPPED_TYPE); + IF_EQUAL_RETURN(guid, MF_MT_AUDIO_NUM_CHANNELS); + IF_EQUAL_RETURN(guid, MF_MT_AUDIO_SAMPLES_PER_SECOND); + IF_EQUAL_RETURN(guid, MF_MT_AUDIO_FLOAT_SAMPLES_PER_SECOND); + IF_EQUAL_RETURN(guid, MF_MT_AUDIO_AVG_BYTES_PER_SECOND); + IF_EQUAL_RETURN(guid, MF_MT_AUDIO_BLOCK_ALIGNMENT); + IF_EQUAL_RETURN(guid, MF_MT_AUDIO_BITS_PER_SAMPLE); + IF_EQUAL_RETURN(guid, MF_MT_AUDIO_VALID_BITS_PER_SAMPLE); + IF_EQUAL_RETURN(guid, MF_MT_AUDIO_SAMPLES_PER_BLOCK); + IF_EQUAL_RETURN(guid, MF_MT_AUDIO_CHANNEL_MASK); + IF_EQUAL_RETURN(guid, MF_MT_AUDIO_FOLDDOWN_MATRIX); + IF_EQUAL_RETURN(guid, MF_MT_AUDIO_WMADRC_PEAKREF); + IF_EQUAL_RETURN(guid, MF_MT_AUDIO_WMADRC_PEAKTARGET); + IF_EQUAL_RETURN(guid, MF_MT_AUDIO_WMADRC_AVGREF); + IF_EQUAL_RETURN(guid, MF_MT_AUDIO_WMADRC_AVGTARGET); + IF_EQUAL_RETURN(guid, MF_MT_AUDIO_PREFER_WAVEFORMATEX); + IF_EQUAL_RETURN(guid, MF_MT_AAC_PAYLOAD_TYPE); + IF_EQUAL_RETURN(guid, MF_MT_AAC_AUDIO_PROFILE_LEVEL_INDICATION); + IF_EQUAL_RETURN(guid, MF_MT_FRAME_SIZE); + IF_EQUAL_RETURN(guid, MF_MT_FRAME_RATE); + IF_EQUAL_RETURN(guid, MF_MT_FRAME_RATE_RANGE_MAX); + IF_EQUAL_RETURN(guid, MF_MT_FRAME_RATE_RANGE_MIN); + IF_EQUAL_RETURN(guid, MF_MT_PIXEL_ASPECT_RATIO); + IF_EQUAL_RETURN(guid, MF_MT_DRM_FLAGS); + IF_EQUAL_RETURN(guid, MF_MT_PAD_CONTROL_FLAGS); + IF_EQUAL_RETURN(guid, MF_MT_SOURCE_CONTENT_HINT); + IF_EQUAL_RETURN(guid, MF_MT_VIDEO_CHROMA_SITING); + IF_EQUAL_RETURN(guid, MF_MT_INTERLACE_MODE); + IF_EQUAL_RETURN(guid, MF_MT_TRANSFER_FUNCTION); + IF_EQUAL_RETURN(guid, MF_MT_VIDEO_PRIMARIES); + IF_EQUAL_RETURN(guid, MF_MT_CUSTOM_VIDEO_PRIMARIES); + IF_EQUAL_RETURN(guid, MF_MT_YUV_MATRIX); + IF_EQUAL_RETURN(guid, MF_MT_VIDEO_LIGHTING); + IF_EQUAL_RETURN(guid, MF_MT_VIDEO_NOMINAL_RANGE); + IF_EQUAL_RETURN(guid, MF_MT_GEOMETRIC_APERTURE); + IF_EQUAL_RETURN(guid, MF_MT_MINIMUM_DISPLAY_APERTURE); + IF_EQUAL_RETURN(guid, MF_MT_PAN_SCAN_APERTURE); + IF_EQUAL_RETURN(guid, MF_MT_PAN_SCAN_ENABLED); + IF_EQUAL_RETURN(guid, MF_MT_AVG_BITRATE); + IF_EQUAL_RETURN(guid, MF_MT_AVG_BIT_ERROR_RATE); + IF_EQUAL_RETURN(guid, MF_MT_MAX_KEYFRAME_SPACING); + IF_EQUAL_RETURN(guid, MF_MT_DEFAULT_STRIDE); + IF_EQUAL_RETURN(guid, MF_MT_PALETTE); + IF_EQUAL_RETURN(guid, MF_MT_USER_DATA); + IF_EQUAL_RETURN(guid, MF_MT_AM_FORMAT_TYPE); + IF_EQUAL_RETURN(guid, MF_MT_MPEG_START_TIME_CODE); + IF_EQUAL_RETURN(guid, MF_MT_MPEG2_PROFILE); + IF_EQUAL_RETURN(guid, MF_MT_MPEG2_LEVEL); + IF_EQUAL_RETURN(guid, MF_MT_MPEG2_FLAGS); + IF_EQUAL_RETURN(guid, MF_MT_MPEG_SEQUENCE_HEADER); + IF_EQUAL_RETURN(guid, MF_MT_DV_AAUX_SRC_PACK_0); + IF_EQUAL_RETURN(guid, MF_MT_DV_AAUX_CTRL_PACK_0); + IF_EQUAL_RETURN(guid, MF_MT_DV_AAUX_SRC_PACK_1); + IF_EQUAL_RETURN(guid, MF_MT_DV_AAUX_CTRL_PACK_1); + IF_EQUAL_RETURN(guid, MF_MT_DV_VAUX_SRC_PACK); + IF_EQUAL_RETURN(guid, MF_MT_DV_VAUX_CTRL_PACK); + IF_EQUAL_RETURN(guid, MF_MT_ARBITRARY_HEADER); + IF_EQUAL_RETURN(guid, MF_MT_ARBITRARY_FORMAT); + IF_EQUAL_RETURN(guid, MF_MT_IMAGE_LOSS_TOLERANT); + IF_EQUAL_RETURN(guid, MF_MT_MPEG4_SAMPLE_DESCRIPTION); + IF_EQUAL_RETURN(guid, MF_MT_MPEG4_CURRENT_SAMPLE_ENTRY); + IF_EQUAL_RETURN(guid, MF_MT_ORIGINAL_4CC); + IF_EQUAL_RETURN(guid, MF_MT_ORIGINAL_WAVE_FORMAT_TAG); + + // Media types + + IF_EQUAL_RETURN(guid, MFMediaType_Audio); + IF_EQUAL_RETURN(guid, MFMediaType_Video); + IF_EQUAL_RETURN(guid, MFMediaType_Protected); + IF_EQUAL_RETURN(guid, MFMediaType_SAMI); + IF_EQUAL_RETURN(guid, MFMediaType_Script); + IF_EQUAL_RETURN(guid, MFMediaType_Image); + IF_EQUAL_RETURN(guid, MFMediaType_HTML); + IF_EQUAL_RETURN(guid, MFMediaType_Binary); + IF_EQUAL_RETURN(guid, MFMediaType_FileTransfer); + + IF_EQUAL_RETURN(guid, MFVideoFormat_AI44); // FCC('AI44') + IF_EQUAL_RETURN(guid, MFVideoFormat_ARGB32); // D3DFMT_A8R8G8B8 + IF_EQUAL_RETURN(guid, MFVideoFormat_AV1); + IF_EQUAL_RETURN(guid, MFVideoFormat_AYUV); // FCC('AYUV') + IF_EQUAL_RETURN(guid, MFVideoFormat_DV25); // FCC('dv25') + IF_EQUAL_RETURN(guid, MFVideoFormat_DV50); // FCC('dv50') + IF_EQUAL_RETURN(guid, MFVideoFormat_DVH1); // FCC('dvh1') + IF_EQUAL_RETURN(guid, MFVideoFormat_DVSD); // FCC('dvsd') + IF_EQUAL_RETURN(guid, MFVideoFormat_DVSL); // FCC('dvsl') + IF_EQUAL_RETURN(guid, MFVideoFormat_H264); // FCC('H264') + IF_EQUAL_RETURN(guid, MFVideoFormat_H265); + IF_EQUAL_RETURN(guid, MFVideoFormat_HEVC); + IF_EQUAL_RETURN(guid, MFVideoFormat_HEVC_ES); + IF_EQUAL_RETURN(guid, MFVideoFormat_I420); // FCC('I420') + IF_EQUAL_RETURN(guid, MFVideoFormat_IYUV); // FCC('IYUV') + IF_EQUAL_RETURN(guid, MFVideoFormat_M4S2); // FCC('M4S2') + IF_EQUAL_RETURN(guid, MFVideoFormat_MJPG); + IF_EQUAL_RETURN(guid, MFVideoFormat_MP43); // FCC('MP43') + IF_EQUAL_RETURN(guid, MFVideoFormat_MP4S); // FCC('MP4S') + IF_EQUAL_RETURN(guid, MFVideoFormat_MP4V); // FCC('MP4V') + IF_EQUAL_RETURN(guid, MFVideoFormat_MPG1); // FCC('MPG1') + IF_EQUAL_RETURN(guid, MFVideoFormat_MSS1); // FCC('MSS1') + IF_EQUAL_RETURN(guid, MFVideoFormat_MSS2); // FCC('MSS2') + IF_EQUAL_RETURN(guid, MFVideoFormat_NV11); // FCC('NV11') + IF_EQUAL_RETURN(guid, MFVideoFormat_NV12); // FCC('NV12') + IF_EQUAL_RETURN(guid, MFVideoFormat_P010); // FCC('P010') + IF_EQUAL_RETURN(guid, MFVideoFormat_P016); // FCC('P016') + IF_EQUAL_RETURN(guid, MFVideoFormat_P210); // FCC('P210') + IF_EQUAL_RETURN(guid, MFVideoFormat_P216); // FCC('P216') + IF_EQUAL_RETURN(guid, MFVideoFormat_RGB24); // D3DFMT_R8G8B8 + IF_EQUAL_RETURN(guid, MFVideoFormat_RGB32); // D3DFMT_X8R8G8B8 + IF_EQUAL_RETURN(guid, MFVideoFormat_RGB555); // D3DFMT_X1R5G5B5 + IF_EQUAL_RETURN(guid, MFVideoFormat_RGB565); // D3DFMT_R5G6B5 + IF_EQUAL_RETURN(guid, MFVideoFormat_RGB8); + IF_EQUAL_RETURN(guid, MFVideoFormat_UYVY); // FCC('UYVY') + IF_EQUAL_RETURN(guid, MFVideoFormat_v210); // FCC('v210') + IF_EQUAL_RETURN(guid, MFVideoFormat_v410); // FCC('v410') + IF_EQUAL_RETURN(guid, MFVideoFormat_WMV1); // FCC('WMV1') + IF_EQUAL_RETURN(guid, MFVideoFormat_WMV2); // FCC('WMV2') + IF_EQUAL_RETURN(guid, MFVideoFormat_WMV3); // FCC('WMV3') + IF_EQUAL_RETURN(guid, MFVideoFormat_WVC1); // FCC('WVC1') + IF_EQUAL_RETURN(guid, MFVideoFormat_VP90); + IF_EQUAL_RETURN(guid, MFVideoFormat_Y210); // FCC('Y210') + IF_EQUAL_RETURN(guid, MFVideoFormat_Y216); // FCC('Y216') + IF_EQUAL_RETURN(guid, MFVideoFormat_Y410); // FCC('Y410') + IF_EQUAL_RETURN(guid, MFVideoFormat_Y416); // FCC('Y416') + IF_EQUAL_RETURN(guid, MFVideoFormat_Y41P); + IF_EQUAL_RETURN(guid, MFVideoFormat_Y41T); + IF_EQUAL_RETURN(guid, MFVideoFormat_YUY2); // FCC('YUY2') + IF_EQUAL_RETURN(guid, MFVideoFormat_YV12); // FCC('YV12') + IF_EQUAL_RETURN(guid, MFVideoFormat_YVYU); + + IF_EQUAL_RETURN(guid, MFAudioFormat_PCM); // WAVE_FORMAT_PCM + IF_EQUAL_RETURN(guid, MFAudioFormat_Float); // WAVE_FORMAT_IEEE_FLOAT + IF_EQUAL_RETURN(guid, MFAudioFormat_DTS); // WAVE_FORMAT_DTS + IF_EQUAL_RETURN(guid, MFAudioFormat_Dolby_AC3_SPDIF); // WAVE_FORMAT_DOLBY_AC3_SPDIF + IF_EQUAL_RETURN(guid, MFAudioFormat_DRM); // WAVE_FORMAT_DRM + IF_EQUAL_RETURN(guid, MFAudioFormat_WMAudioV8); // WAVE_FORMAT_WMAUDIO2 + IF_EQUAL_RETURN(guid, MFAudioFormat_WMAudioV9); // WAVE_FORMAT_WMAUDIO3 + IF_EQUAL_RETURN(guid, MFAudioFormat_WMAudio_Lossless); // WAVE_FORMAT_WMAUDIO_LOSSLESS + IF_EQUAL_RETURN(guid, MFAudioFormat_WMASPDIF); // WAVE_FORMAT_WMASPDIF + IF_EQUAL_RETURN(guid, MFAudioFormat_MSP1); // WAVE_FORMAT_WMAVOICE9 + IF_EQUAL_RETURN(guid, MFAudioFormat_MP3); // WAVE_FORMAT_MPEGLAYER3 + IF_EQUAL_RETURN(guid, MFAudioFormat_MPEG); // WAVE_FORMAT_MPEG + IF_EQUAL_RETURN(guid, MFAudioFormat_AAC); // WAVE_FORMAT_MPEG_HEAAC + IF_EQUAL_RETURN(guid, MFAudioFormat_ADTS); // WAVE_FORMAT_MPEG_ADTS_AAC + + return ""; +} + +static IDataProvider::mfx_codec_id_type convert_to_mfx_codec_id(const GUID& guid) { + if (guid == MFVideoFormat_H264) { + return MFX_CODEC_AVC; + } else if (guid == MFVideoFormat_H265 || + guid == MFVideoFormat_HEVC || + guid == MFVideoFormat_HEVC_ES) { + return MFX_CODEC_HEVC; + } else if (guid == MFAudioFormat_MPEG) { + return MFX_CODEC_MPEG2; + } else if (guid == MFVideoFormat_WVC1) { + return MFX_CODEC_VC1; + } else if (guid == MFVideoFormat_VP90) { + return MFX_CODEC_VP9; + } else if (guid == MFVideoFormat_AV1) { + return MFX_CODEC_AV1; + } else if (guid == MFVideoFormat_MJPG) { + return MFX_CODEC_JPEG; + } + + throw DataProviderUnsupportedException(std::string("unsupported codec type: ") + + GetGUIDNameConst(guid)); +} + +bool MFPAsyncDemuxDataProvider::select_supported_video_stream( + ComPtrGuard &descriptor, + mfx_codec_id_type &out_codec_id, + void *source_id) { + DWORD stream_count = 0; + BOOL is_stream_selected = false; + descriptor->GetStreamDescriptorCount(&stream_count); + GAPI_LOG_DEBUG(nullptr, "[" << source_id << "] " << + "received stream count: " << stream_count); + for (DWORD stream_index = 0; + stream_index < stream_count && !is_stream_selected; stream_index++) { + + GAPI_LOG_DEBUG(nullptr, "[" << source_id << "] " << + "check stream info by index: " << stream_index); + IMFStreamDescriptor *stream_descriptor_tmp = nullptr; + descriptor->GetStreamDescriptorByIndex(stream_index, &is_stream_selected, + &stream_descriptor_tmp); + if (!stream_descriptor_tmp) { + GAPI_LOG_WARNING(nullptr, "[" << source_id << "] " << + "Cannot get stream descriptor by index: " << + stream_index); + continue; + } + + ComPtrGuard stream_descriptor = + createCOMPtrGuard(stream_descriptor_tmp); + is_stream_selected = false; // deselect until supported stream found + IMFMediaTypeHandler *handler_tmp = nullptr; + stream_descriptor->GetMediaTypeHandler(&handler_tmp); + if (!handler_tmp) { + GAPI_LOG_WARNING(nullptr, "[" << source_id << "] " << + "Cannot get media type handler for stream by index: " << + stream_index); + continue; + } + + ComPtrGuard handler = createCOMPtrGuard(handler_tmp); + GUID guidMajorType; + if (FAILED(handler->GetMajorType(&guidMajorType))) { + GAPI_LOG_WARNING(nullptr, "[" << source_id << "] " << + "Cannot get major GUID type for stream by index: " << + stream_index); + continue; + } + + if (guidMajorType != MFMediaType_Video) { + GAPI_LOG_DEBUG(nullptr, "[" << source_id << "] " << + "Skipping non-video stream"); + continue; + } + GAPI_LOG_DEBUG(nullptr, "[" << source_id << "] " << + "video stream detected"); + IMFMediaType *media_type_tmp = nullptr; + handler->GetCurrentMediaType(&media_type_tmp); + if (!media_type_tmp) { + GAPI_LOG_WARNING(nullptr, "[" << source_id << "] " << + "Cannot determine media type for stream by index: " << + stream_index); + continue; + } + + ComPtrGuard media_type = createCOMPtrGuard(media_type_tmp); + GUID subtype; + if (SUCCEEDED(media_type->GetGUID(MF_MT_SUBTYPE, &subtype))) { + GAPI_LOG_DEBUG(nullptr, "[" << source_id << "] " << + "video type: " << GetGUIDNameConst(subtype)); + + std::string is_codec_supported("unsupported, skip..."); + try { + out_codec_id = convert_to_mfx_codec_id(subtype); + is_stream_selected = true; + is_codec_supported = "selected!"; + } catch (...) {} + + GAPI_LOG_INFO(nullptr, "[" << source_id << "] " << + "video stream index: " << stream_index << + ", codec: " << GetGUIDNameConst(subtype) << + " - " << is_codec_supported) + } else { + GAPI_LOG_WARNING(nullptr, "[" << source_id << "] " << + "Cannot get media GUID subtype for stream by index: " << + stream_index); + continue; + } + } + return is_stream_selected; +} + +MFPAsyncDemuxDataProvider::MFPAsyncDemuxDataProvider(const std::string& file_path, + size_t keep_preprocessed_buf_count_value) : + keep_preprocessed_buf_count(keep_preprocessed_buf_count_value), + source(createCOMPtrGuard()), + source_reader(createCOMPtrGuard()), + codec(std::numeric_limits::max()), + provider_state(State::InProgress) { + + submit_read_request.clear(); + com_interface_reference_count = 1; // object itself + + HRESULT hr = S_OK; + hr = MFStartup(MF_VERSION); + if (FAILED(hr)) { + throw DataProviderSystemErrorException(HRESULT_CODE(hr), "Cannot initialize MFStartup"); + } + + GAPI_LOG_INFO(nullptr, "[" << this << "] " << + " initializing, URI " << file_path); + IMFMediaSource *source_tmp = nullptr; + hr = create_media_source(file_path, &source_tmp); + if (FAILED(hr)) { + throw DataProviderSystemErrorException(HRESULT_CODE(hr), "Cannot create IMFMediaSource"); + } + source.reset(source_tmp); + + GAPI_LOG_DEBUG(nullptr, "[" << this << "] " << + " start creating source attributes"); + IMFAttributes *attrs_tmp = nullptr; + + // NB: create 2 attributes for disable converters & async callback capability + const UINT32 relevant_attributes_count = 2; + hr = MFCreateAttributes(&attrs_tmp, relevant_attributes_count); + if (FAILED(hr)) { + throw DataProviderSystemErrorException(HRESULT_CODE(hr), "MFCreateAttributes failed"); + } + + ComPtrGuard attributes = createCOMPtrGuard(attrs_tmp); + hr = attributes->SetUINT32(MF_READWRITE_DISABLE_CONVERTERS, TRUE); + + // set the callback pointer. + if (SUCCEEDED(hr)) + { + hr = attributes->SetUnknown( + MF_SOURCE_READER_ASYNC_CALLBACK, + this + ); + } + if (FAILED(hr)) { + throw DataProviderSystemErrorException(HRESULT_CODE(hr), "Cannot set MFP async callback "); + } + + GAPI_LOG_DEBUG(nullptr, "[" << this << "] " << + "is getting presentation descriptor"); + IMFPresentationDescriptor* descriptor_tmp = nullptr; + hr = source->CreatePresentationDescriptor(&descriptor_tmp); + if (FAILED(hr)) { + throw DataProviderSystemErrorException(HRESULT_CODE(hr), + "CreatePresentationDescriptor failed"); + } + ComPtrGuard descriptor = createCOMPtrGuard(descriptor_tmp); + if (!MFPAsyncDemuxDataProvider::select_supported_video_stream(descriptor, codec, this)) { + // NB: let's pretty notify clients about list of supported codecs to keep + // contract in explicit way to avoid continuous troubleshooting + const auto &supported_codecs = get_supported_mfx_codec_ids(); + std::string ss; + for (mfxU32 id : supported_codecs) { + ss += mfx_codec_id_to_cstr(id); + ss += ", "; + } + if (!ss.empty()) { + ss.erase(ss.size() - 2, 2); + } + + GAPI_LOG_WARNING(nullptr, "[" << this << "] " + "couldn't find video stream with supported params, " + "expected codecs: " << ss); + throw DataProviderUnsupportedException("couldn't find supported video stream"); + } + + GAPI_LOG_DEBUG(nullptr, "[" << this << "] " << + "is creating media source"); + IMFSourceReader *source_reader_tmp = nullptr; + hr = MFCreateSourceReaderFromMediaSource(source.get(), attributes.get(), + &source_reader_tmp); + if (FAILED(hr)) { + throw DataProviderSystemErrorException(HRESULT_CODE(hr), + "MFCreateSourceReaderFromMediaSource failed"); + } + source_reader = createCOMPtrGuard(source_reader_tmp); + + GAPI_LOG_DEBUG(nullptr, "[" << this << "] " << + "created IMFSourceReader: " << source_reader); + + // Ask for the first sample. + hr = request_next(hr, 0, 0); + if (FAILED(hr)) { + throw DataProviderSystemErrorException(HRESULT_CODE(hr), + "ReadSample failed while requesting initial sample"); + } + GAPI_LOG_INFO(nullptr, "[" << this << "] " << + "initialized"); +} + +MFPAsyncDemuxDataProvider::~MFPAsyncDemuxDataProvider() { + GAPI_LOG_INFO(nullptr, "[" << this << "] " << + "begin deinitializing"); + + flush(); + + { + std::unique_lock l(buffer_storage_mutex); + GAPI_LOG_INFO(nullptr, "Clean up async storage, count: " << + worker_key_to_buffer_mapping_storage.size()); + for (auto& buffer : worker_key_to_buffer_mapping_storage) { + if (buffer.second) { + buffer.second->Unlock(); + } + } + worker_key_to_buffer_mapping_storage.clear(); + } + + GAPI_LOG_INFO(nullptr, "Clean data storage, elapsed buffer count: " << + processing_key_to_buffer_mapping_storage.size()); + for (auto& buffer : processing_key_to_buffer_mapping_storage) { + if (buffer.second) { + buffer.second->Unlock(); + } + } + processing_key_to_buffer_mapping_storage.clear(); + + // release COM object before overall MFP shutdown + source_reader.reset(); + source.reset(); + + MFShutdown(); + GAPI_LOG_INFO(nullptr, "[" << this << "] " << + "deinitialized"); +} + + +ULONG MFPAsyncDemuxDataProvider::AddRef() { + // align behavior with InterlockedIncrement + return com_interface_reference_count.fetch_add(1) + 1; +} + +ULONG MFPAsyncDemuxDataProvider::Release() { + auto count = com_interface_reference_count.fetch_sub(1); + GAPI_Assert(count != 0 && "Incorrect reference counting for MFPAsyncDemuxDataProvider"); + count -= 1; // align behavior with InterlockedDecrement + return count; +} + +HRESULT MFPAsyncDemuxDataProvider::QueryInterface(REFIID riid, void** ppv) +{ + static const QITAB qit[] = + { + QITABENT(MFPAsyncDemuxDataProvider, IMFSourceReaderCallback), + { 0 }, + }; + return QISearch(this, qit, riid, ppv); +} + + +STDMETHODIMP +MFPAsyncDemuxDataProvider::OnReadSample(HRESULT status, DWORD, + DWORD stream_flag, LONGLONG, + IMFSample *sample_ptr) { + GAPI_LOG_DEBUG(nullptr, "[" << this << "] status: " << std::to_string(HRESULT_CODE(status)) << + ", stream flags: " << stream_flag << + ", sample: " << sample_ptr); + HRESULT hr = S_OK; + if (FAILED(status)) { + hr = status; + } + + // check EOF + if (stream_flag & MF_SOURCE_READERF_ENDOFSTREAM) { + GAPI_LOG_DEBUG(nullptr, "[" << this << "] EOF"); + + // close reader + provider_state.store(State::Exhausted); + buffer_storage_non_empty_cond.notify_all(); + return hr; + } + + submit_read_request.clear(); + + // extract stream data + size_t worker_buffer_count = 0; + if (SUCCEEDED(hr)) { + if (sample_ptr) { + // Get the video frame buffer from the sample. + IMFMediaBuffer *buffer_ptr = nullptr; + hr = sample_ptr->ConvertToContiguousBuffer(&buffer_ptr); + GAPI_Assert(SUCCEEDED(hr) && + "MFPAsyncDemuxDataProvider::OnReadSample - ConvertToContiguousBuffer failed"); + + DWORD max_buffer_size = 0; + DWORD curr_size = 0; + + // lock buffer directly into mfx bitstream + std::shared_ptr staging_stream = std::make_shared(); + staging_stream->Data = nullptr; + + hr = buffer_ptr->Lock(&staging_stream->Data, &max_buffer_size, &curr_size); + GAPI_Assert(SUCCEEDED(hr) && + "MFPAsyncDemuxDataProvider::OnReadSample - Lock failed"); + + staging_stream->MaxLength = max_buffer_size; + staging_stream->DataLength = curr_size; + staging_stream->CodecId = get_mfx_codec_id(); + + GAPI_LOG_DEBUG(nullptr, "[" << this << "] bitstream created, data: " << + static_cast(staging_stream->Data) << + ", MaxLength: " << staging_stream->MaxLength << + ", DataLength: " << staging_stream->DataLength); + + worker_buffer_count = produce_worker_data(staging_stream->Data, + createCOMPtrGuard(buffer_ptr), + std::move(staging_stream)); + } + } else { + GAPI_LOG_WARNING(nullptr, "[" << this << "] callback failed" + ", status: " << std::to_string(HRESULT_CODE(status)) << + ", stream flags: " << stream_flag << + ", sample: " << sample_ptr); + } + + hr = request_next(hr, stream_flag, worker_buffer_count); + return hr; +} + +size_t MFPAsyncDemuxDataProvider::get_locked_buffer_size() const { + std::unique_lock l(buffer_storage_mutex); + return worker_locked_buffer_storage.size(); +} + +STDMETHODIMP MFPAsyncDemuxDataProvider::OnEvent(DWORD, IMFMediaEvent *) { + return S_OK; +} + +STDMETHODIMP MFPAsyncDemuxDataProvider::OnFlush(DWORD) { + provider_state.store(State::Exhausted); + buffer_storage_non_empty_cond.notify_all(); + return S_OK; +} + +void MFPAsyncDemuxDataProvider::flush() { + if(source_reader) { + GAPI_LOG_INFO(nullptr, "[" << this << "] set flush"); + source_reader->Flush(static_cast(MF_SOURCE_READER_ALL_STREAMS)); + } + + size_t iterations = 0; + const int waiting_ms = 100; + const size_t warning_iteration_wait_count = 300; // approx 30 sec + while (provider_state.load() != State::Exhausted) { + iterations++; + if (iterations > warning_iteration_wait_count) { + GAPI_LOG_WARNING(nullptr, "[" << this << "] is still waiting for flush finishing, " + "iteration: " << iterations); + } else { + GAPI_LOG_DEBUG(nullptr, "[" << this << "] is waiting for flush finishing, " + "iteration: " << iterations); + } + std::unique_lock l(buffer_storage_mutex); + buffer_storage_non_empty_cond.wait_for(l, std::chrono::milliseconds(waiting_ms)); + } + + GAPI_LOG_INFO(nullptr, "[" << this << "] has flushed in: " << + iterations * waiting_ms << "ms interval"); +} + +HRESULT MFPAsyncDemuxDataProvider::request_next(HRESULT hr, + DWORD stream_flag, + size_t worker_buffer_count) { + GAPI_LOG_DEBUG(nullptr, "[" << this << "] status: " << + std::to_string(HRESULT_CODE(hr)) << + ", stream flags: " << stream_flag << + ", worker buffer count: (" << worker_buffer_count << + "/" << keep_preprocessed_buf_count << ")"); + // check gap in stream + if (stream_flag & MF_SOURCE_READERF_STREAMTICK ) { + GAPI_LOG_INFO(nullptr, "[" << this << "] stream gap detected"); + return hr; + } + + if (FAILED(hr)) { + GAPI_LOG_WARNING(nullptr, "[" << this << "] callback error " + ", status: " << std::to_string(HRESULT_CODE(hr)) << + ", stream flags: " << stream_flag); + } + + // put on worker buffers available ready + if (worker_buffer_count < keep_preprocessed_buf_count) { + // only one consumer might make submit + if (!submit_read_request.test_and_set()) { + hr = source_reader->ReadSample((DWORD)MF_SOURCE_READER_FIRST_VIDEO_STREAM, + 0, NULL, NULL, NULL, NULL); + GAPI_LOG_DEBUG(nullptr, "[" << this << "] submit read sample, status: " << + std::to_string(HRESULT_CODE(hr))); + } + } + return hr; +} + +void MFPAsyncDemuxDataProvider::consume_worker_data() { + // wait callback exchange + std::unique_lock l(buffer_storage_mutex); + buffer_storage_non_empty_cond.wait(l, [this] { + bool empty = worker_locked_buffer_storage.empty(); + if (empty) { + if (!submit_read_request.test_and_set()) { + (void)source_reader->ReadSample((DWORD)MF_SOURCE_READER_FIRST_VIDEO_STREAM, + 0, NULL, NULL, NULL, NULL); + } + } else { + worker_key_to_buffer_mapping_storage.swap(processing_key_to_buffer_mapping_storage); + worker_locked_buffer_storage.swap(processing_locked_buffer_storage); + } + + return !empty || provider_state == State::Exhausted; + }); +} + +size_t MFPAsyncDemuxDataProvider::produce_worker_data(void *key, + ComPtrGuard &&buffer, + std::shared_ptr &&staging_stream) { + size_t bitstream_count = 0; + size_t worker_buffer_count = 0; + { + std::unique_lock l(buffer_storage_mutex); + + // remember sample buffer to keep data safe + worker_key_to_buffer_mapping_storage.emplace(key, std::move(buffer)); + worker_buffer_count = worker_key_to_buffer_mapping_storage.size(); + + // remember bitstream for consuming + worker_locked_buffer_storage.push(std::move(staging_stream)); + bitstream_count = worker_locked_buffer_storage.size(); + buffer_storage_non_empty_cond.notify_all(); + } + GAPI_DbgAssert(worker_buffer_count == bitstream_count && + "worker_key_to_buffer_mapping_storage & worker_locked_buffer_storage" + " must be the same size" ); + GAPI_LOG_DEBUG(nullptr, "[" << this << "] created dmux buffer by key: " << + key << ", ready bitstream count: " << + bitstream_count); + + return worker_buffer_count; +} + +/////////////// IDataProvider methods /////////////// +IDataProvider::mfx_codec_id_type MFPAsyncDemuxDataProvider::get_mfx_codec_id() const { + return codec; +} + +bool MFPAsyncDemuxDataProvider::fetch_bitstream_data(std::shared_ptr &out_bitsream) { + if (empty()) { + GAPI_LOG_DEBUG(nullptr, "[" << this << "] empty"); + return false; + } + + // utilize consumed bitstream portion allocated at prev step + if (out_bitsream) { + // make dmux buffer unlock for not empty bitstream + GAPI_LOG_DEBUG(nullptr, "[" << this << "] " << + "bitstream before fetch: " << + out_bitsream.get() << + ", DataOffset: " << + out_bitsream->DataOffset << + ", DataLength: " << + out_bitsream->DataLength); + if (out_bitsream->DataOffset < out_bitsream->DataLength) { + return true; + } + + // cleanup + auto it = processing_key_to_buffer_mapping_storage.find(out_bitsream->Data); + if (it == processing_key_to_buffer_mapping_storage.end()) { + GAPI_LOG_WARNING(nullptr, "[" << this << "] " << + "cannot find appropriate dmux buffer by key: " << + static_cast(out_bitsream->Data)); + GAPI_Assert(false && "invalid bitstream key"); + } + if (it->second) { + it->second->Unlock(); + } + processing_key_to_buffer_mapping_storage.erase(it); + } + + // consume new bitstream portion + if (processing_locked_buffer_storage.empty() && + provider_state.load() == State::InProgress) { + // get worker data collected from another thread + consume_worker_data(); + } + + // EOF check: nothing to process at this point + if (processing_locked_buffer_storage.empty()) { + GAPI_DbgAssert(provider_state == State::Exhausted && "Source reader must be drained"); + out_bitsream.reset(); + return false; + } + + out_bitsream = processing_locked_buffer_storage.front(); + processing_locked_buffer_storage.pop(); + + GAPI_LOG_DEBUG(nullptr, "[" << this << "] " + "bitstream after fetch: " << + out_bitsream.get() << + ", DataOffset: " << + out_bitsream->DataOffset << + ", DataLength: " << + out_bitsream->DataLength); + return true; +} + +bool MFPAsyncDemuxDataProvider::empty() const { + return (provider_state.load() == State::Exhausted) && + (processing_locked_buffer_storage.size() == 0) && + (get_locked_buffer_size() == 0); +} +#else // _WIN32 + +MFPAsyncDemuxDataProvider::MFPAsyncDemuxDataProvider(const std::string&) { + GAPI_Assert(false && "Unsupported: Microsoft Media Foundation is not available"); +} +IDataProvider::mfx_codec_id_type MFPAsyncDemuxDataProvider::get_mfx_codec_id() const { + GAPI_Assert(false && "Unsupported: Microsoft Media Foundation is not available"); + return std::numeric_limits::max(); +} + +bool MFPAsyncDemuxDataProvider::fetch_bitstream_data(std::shared_ptr &) { + GAPI_Assert(false && "Unsupported: Microsoft Media Foundation is not available"); + return false; +} + +bool MFPAsyncDemuxDataProvider::empty() const override { + GAPI_Assert(false && "Unsupported: Microsoft Media Foundation is not available"); + return true; +} +#endif // _WIN32 +} // namespace onevpl +} // namespace wip +} // namespace gapi +} // namespace cv + +#endif // HAVE_ONEVPL diff --git a/modules/gapi/src/streaming/onevpl/demux/async_mfp_demux_data_provider.hpp b/modules/gapi/src/streaming/onevpl/demux/async_mfp_demux_data_provider.hpp new file mode 100644 index 0000000000000000000000000000000000000000..706d63dca73e87cca08cd193e3bec949205e63c6 --- /dev/null +++ b/modules/gapi/src/streaming/onevpl/demux/async_mfp_demux_data_provider.hpp @@ -0,0 +1,126 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2021 Intel Corporation + +#ifndef GAPI_STREAMING_ONEVPL_DEMUX_ASYNC_MFP_DEMUX_DATA_PROVIDER_HPP +#define GAPI_STREAMING_ONEVPL_DEMUX_ASYNC_MFP_DEMUX_DATA_PROVIDER_HPP + +#include +#include +#include +#include + +#ifdef HAVE_ONEVPL +#include + +#ifdef _WIN32 +#define NOMINMAX +#include +#include +#include +#include +#include +#include +#include +#include +#include +#undef NOMINMAX + +#include +#include "streaming/onevpl/data_provider_defines.hpp" +#include "streaming/onevpl/utils.hpp" + +namespace cv { +namespace gapi { +namespace wip { +namespace onevpl { +struct GAPI_EXPORTS MFPAsyncDemuxDataProvider : public IDataProvider, + public IMFSourceReaderCallback { + MFPAsyncDemuxDataProvider(const std::string& file_path, + size_t keep_preprocessed_buf_count_value = 3); + ~MFPAsyncDemuxDataProvider(); + + mfx_codec_id_type get_mfx_codec_id() const override; + bool fetch_bitstream_data(std::shared_ptr &out_bitsream) override; + bool empty() const override; + +protected: /* For Unit tests only */ + enum class State { + InProgress, + Exhausted + }; + + // IUnknown methods forbidden for current implementations + STDMETHODIMP QueryInterface(REFIID iid, void** ppv) override; + STDMETHODIMP_(ULONG) AddRef() override; + STDMETHODIMP_(ULONG) Release() override; + + // IMFSourceReaderCallback methods + virtual STDMETHODIMP OnReadSample(HRESULT status, DWORD stream_index, + DWORD stream_flag, LONGLONG timestamp, + IMFSample *sample_ptr) override; + STDMETHODIMP OnEvent(DWORD, IMFMediaEvent *) override; + STDMETHODIMP OnFlush(DWORD) override; + + // implementation methods + void flush(); + HRESULT request_next(HRESULT hr, DWORD stream_flag, + size_t worker_buffer_count); + void consume_worker_data(); + virtual size_t produce_worker_data(void *key, + ComPtrGuard &&buffer, + std::shared_ptr &&staging_stream); + size_t get_locked_buffer_size() const; + +private: + static bool select_supported_video_stream(ComPtrGuard &descriptor, + mfx_codec_id_type &out_codec_id, + void *source_id); + // members + size_t keep_preprocessed_buf_count; + + // COM members + ComPtrGuard source; + ComPtrGuard source_reader; + std::atomic com_interface_reference_count; + + mfx_codec_id_type codec; + + // worker & processing buffers + std::map> worker_key_to_buffer_mapping_storage; + std::map> processing_key_to_buffer_mapping_storage; + std::queue> worker_locked_buffer_storage; + std::queue> processing_locked_buffer_storage; + std::condition_variable buffer_storage_non_empty_cond; + mutable std::mutex buffer_storage_mutex; + + std::atomic_flag submit_read_request; + std::atomic provider_state; +}; +} // namespace onevpl +} // namespace wip +} // namespace gapi +} // namespace cv + +#else // _WIN32 +namespace cv { +namespace gapi { +namespace wip { +namespace onevpl { +struct GAPI_EXPORTS MFPAsyncDemuxDataProvider : public IDataProvider { + explicit MFPAsyncDemuxDataProvider(const std::string&); + + mfx_codec_id_type get_mfx_codec_id() const override; + bool fetch_bitstream_data(std::shared_ptr &out_bitsream) override; + bool empty() const override; +}; +} // namespace onevpl +} // namespace wip +} // namespace gapi +} // namespace cv + +#endif // _WIN32 +#endif // HAVE_ONEVPL +#endif // GAPI_STREAMING_ONEVPL_DEMUX_ASYNC_MFP_DEMUX_DATA_PROVIDER_HPP diff --git a/modules/gapi/src/streaming/onevpl/engine/decode/decode_engine_legacy.cpp b/modules/gapi/src/streaming/onevpl/engine/decode/decode_engine_legacy.cpp index a5e98cc118cfc5246563dd1fcdf657b0224df93d..0abca7f5a58e4a9ce756db04a9d7628b4825cd32 100644 --- a/modules/gapi/src/streaming/onevpl/engine/decode/decode_engine_legacy.cpp +++ b/modules/gapi/src/streaming/onevpl/engine/decode/decode_engine_legacy.cpp @@ -10,6 +10,7 @@ #include #include +#include "streaming/onevpl/data_provider_defines.hpp" #include "streaming/onevpl/engine/decode/decode_engine_legacy.hpp" #include "streaming/onevpl/engine/decode/decode_session.hpp" @@ -122,8 +123,14 @@ VPLLegacyDecodeEngine::VPLLegacyDecodeEngine(std::unique_ptr ExecutionStatus { LegacyDecodeSession &my_sess = static_cast(sess); - my_sess.last_status = ReadEncodedStream(my_sess.stream, my_sess.data_provider); - if (my_sess.last_status != MFX_ERR_NONE) { + if (!my_sess.data_provider) { + my_sess.last_status = MFX_ERR_MORE_DATA; + return ExecutionStatus::Continue; + } + + my_sess.last_status = MFX_ERR_NONE; + if (!my_sess.data_provider->fetch_bitstream_data(my_sess.stream)) { + my_sess.last_status = MFX_ERR_MORE_DATA; my_sess.data_provider.reset(); //close source } return ExecutionStatus::Continue; @@ -140,7 +147,7 @@ VPLLegacyDecodeEngine::VPLLegacyDecodeEngine(std::unique_ptrget_handle(), &sync_pair.second, @@ -339,8 +346,10 @@ ProcessingEngineBase::ExecutionStatus VPLLegacyDecodeEngine::process_error(mfxSt // The decoder detected a new sequence header in the bitstream. // Video parameters may have changed. // In external memory allocation case, might need to reallocate the output surface - GAPI_DbgAssert(false && "VPLLegacyDecodeEngine::process_error - " + /*GAPI_DbgAssert(false && "VPLLegacyDecodeEngine::process_error - " "MFX_WRN_VIDEO_PARAM_CHANGED is not processed"); + */ + return ExecutionStatus::Continue; break; case MFX_ERR_INCOMPATIBLE_VIDEO_PARAM: // The function detected that video parameters provided by the application diff --git a/modules/gapi/src/streaming/onevpl/engine/engine_session.cpp b/modules/gapi/src/streaming/onevpl/engine/engine_session.cpp index 9f8028361a90de93c99e02dd54153cd2ce82caf3..4915b51e34764fb2e448af4ca522022bf36c88b0 100644 --- a/modules/gapi/src/streaming/onevpl/engine/engine_session.cpp +++ b/modules/gapi/src/streaming/onevpl/engine/engine_session.cpp @@ -14,7 +14,7 @@ namespace gapi { namespace wip { namespace onevpl { -EngineSession::EngineSession(mfxSession sess, mfxBitstream&& str) : +EngineSession::EngineSession(mfxSession sess, std::shared_ptr&& str) : session(sess), stream(std::move(str)) {} EngineSession::~EngineSession() { diff --git a/modules/gapi/src/streaming/onevpl/engine/engine_session.hpp b/modules/gapi/src/streaming/onevpl/engine/engine_session.hpp index d8f2f6b31246a2f0424a795f21810b0322e42bc4..e0c6d01f8b416c9d5a65381ed5b320b6e8970bce 100644 --- a/modules/gapi/src/streaming/onevpl/engine/engine_session.hpp +++ b/modules/gapi/src/streaming/onevpl/engine/engine_session.hpp @@ -15,6 +15,7 @@ #include #include "opencv2/gapi/own/exports.hpp" // GAPI_EXPORTS +#include #ifdef HAVE_ONEVPL #include @@ -26,17 +27,17 @@ namespace onevpl { // GAPI_EXPORTS for tests struct GAPI_EXPORTS DecoderParams { - mfxBitstream stream; + std::shared_ptr stream; mfxVideoParam param; }; struct GAPI_EXPORTS EngineSession { mfxSession session; - mfxBitstream stream; + std::shared_ptr stream; mfxSyncPoint sync; mfxStatus last_status; - EngineSession(mfxSession sess, mfxBitstream&& str); + EngineSession(mfxSession sess, std::shared_ptr&& str); std::string error_code_to_str() const; virtual ~EngineSession(); }; diff --git a/modules/gapi/src/streaming/onevpl/engine/processing_engine_base.cpp b/modules/gapi/src/streaming/onevpl/engine/processing_engine_base.cpp index 382c3ae88badc0ca42a37316d2711a85a955703b..9a4c4a7fb0873e6c806b9cd771de4ae19a1a7b5c 100644 --- a/modules/gapi/src/streaming/onevpl/engine/processing_engine_base.cpp +++ b/modules/gapi/src/streaming/onevpl/engine/processing_engine_base.cpp @@ -99,34 +99,6 @@ const VPLAccelerationPolicy* ProcessingEngineBase::get_accel() const { VPLAccelerationPolicy* ProcessingEngineBase::get_accel() { return const_cast(static_cast(this)->get_accel()); } - - -// Read encoded stream from file -mfxStatus ReadEncodedStream(mfxBitstream &bs, std::shared_ptr& data_provider) { - - if (!data_provider) { - return MFX_ERR_MORE_DATA; - } - - mfxU8 *p0 = bs.Data; - mfxU8 *p1 = bs.Data + bs.DataOffset; - if (bs.DataOffset > bs.MaxLength - 1) { - return MFX_ERR_NOT_ENOUGH_BUFFER; - } - if (bs.DataLength + bs.DataOffset > bs.MaxLength) { - return MFX_ERR_NOT_ENOUGH_BUFFER; - } - - std::copy_n(p1, bs.DataLength, p0); - - bs.DataOffset = 0; - bs.DataLength += static_cast(data_provider->fetch_data(bs.MaxLength - bs.DataLength, - bs.Data + bs.DataLength)); - if (bs.DataLength == 0) - return MFX_ERR_MORE_DATA; - - return MFX_ERR_NONE; -} } // namespace onevpl } // namespace wip } // namespace gapi diff --git a/modules/gapi/src/streaming/onevpl/engine/processing_engine_base.hpp b/modules/gapi/src/streaming/onevpl/engine/processing_engine_base.hpp index b307c18112e356fdb70ae941584741b2a6fd1758..4b34721d6741962b244ef38a9050b22a785041ed 100644 --- a/modules/gapi/src/streaming/onevpl/engine/processing_engine_base.hpp +++ b/modules/gapi/src/streaming/onevpl/engine/processing_engine_base.hpp @@ -87,9 +87,6 @@ protected: return sess_impl; } }; - - -mfxStatus ReadEncodedStream(mfxBitstream &bs, std::shared_ptr& data_provider); } // namespace onevpl } // namespace wip } // namespace gapi diff --git a/modules/gapi/src/streaming/onevpl/file_data_provider.cpp b/modules/gapi/src/streaming/onevpl/file_data_provider.cpp index 2a3df07c512e657a920bf816de4d8074f1f96064..020d471b55cea3377507cb9b8c9d4fed56dcf5c1 100644 --- a/modules/gapi/src/streaming/onevpl/file_data_provider.cpp +++ b/modules/gapi/src/streaming/onevpl/file_data_provider.cpp @@ -3,44 +3,152 @@ // of this distribution and at http://opencv.org/license.html. // // Copyright (C) 2021 Intel Corporation + #include #include "streaming/onevpl/file_data_provider.hpp" +#include "streaming/onevpl/cfg_params_parser.hpp" +#include "streaming/onevpl/utils.hpp" +#include "logger.hpp" namespace cv { namespace gapi { namespace wip { namespace onevpl { -FileDataProvider::FileDataProvider(const std::string& file_path) : - source_handle(fopen(file_path.c_str(), "rb"), &fclose) { +#ifdef HAVE_ONEVPL +FileDataProvider::FileDataProvider(const std::string& file_path, + const std::vector &codec_params, + uint32_t bitstream_data_size_value) : + source_handle(nullptr, &fclose), + bitstream_data_size(bitstream_data_size_value) { + + GAPI_LOG_DEBUG(nullptr, "[" << this << "] " << + "check codec Id from CfgParam, total param count: " << + codec_params.size()); + auto codec_it = + std::find_if(codec_params.begin(), codec_params.end(), [] (const CfgParam& value) { + return value.get_name() == "mfxImplDescription.mfxDecoderDescription.decoder.CodecID"; + }); + if (codec_it == codec_params.end()) + { + GAPI_LOG_WARNING(nullptr, "[" << this << "] " << + "\"mfxImplDescription.mfxDecoderDescription.decoder.CodecID\" " + "is absent, total param count" << codec_params.size()); + throw DataProviderUnsupportedException("\"mfxImplDescription.mfxDecoderDescription.decoder.CodecID\" " + "is required for FileDataProvider"); + } + + codec = cfg_param_to_mfx_variant(*codec_it).Data.U32; + + GAPI_LOG_DEBUG(nullptr, "[" << this << "] " << + "opening file: " << file_path); + source_handle.reset(fopen(file_path.c_str(), "rb")); if (!source_handle) { throw DataProviderSystemErrorException(errno, "FileDataProvider: cannot open source file: " + file_path); } + + GAPI_LOG_INFO(nullptr, "[" << this << "] " << + "file: " << file_path << " opened, codec requested: " << mfx_codec_id_to_cstr(codec)); } FileDataProvider::~FileDataProvider() = default; -size_t FileDataProvider::fetch_data(size_t out_data_bytes_size, void* out_data) { +IDataProvider::mfx_codec_id_type FileDataProvider::get_mfx_codec_id() const { + return codec; +} + +bool FileDataProvider::fetch_bitstream_data(std::shared_ptr &out_bitstream) { + + GAPI_LOG_DEBUG(nullptr, "[" << this << "] " << + ", dst: " << out_bitstream.get()); if (empty()) { - return 0; + return false; + } + + if (!out_bitstream) { + out_bitstream = std::make_shared(); + out_bitstream->MaxLength = bitstream_data_size; + out_bitstream->Data = (mfxU8 *)calloc(out_bitstream->MaxLength, sizeof(mfxU8)); + if(!out_bitstream->Data) { + throw std::runtime_error("Cannot allocate bitstream.Data bytes: " + + std::to_string(out_bitstream->MaxLength * sizeof(mfxU8))); + } + out_bitstream->CodecId = get_mfx_codec_id(); + } + GAPI_LOG_DEBUG(nullptr, "[" << this << "] " << + "bitstream before fetch, DataOffset: " << + out_bitstream->DataOffset << + ", DataLength: " << + out_bitstream->DataLength); + mfxU8 *p0 = out_bitstream->Data; + mfxU8 *p1 = out_bitstream->Data + out_bitstream->DataOffset; + if (out_bitstream->DataOffset > out_bitstream->MaxLength - 1) { + throw DataProviderImplementationException(mfxstatus_to_string(MFX_ERR_NOT_ENOUGH_BUFFER)); + } + if (out_bitstream->DataLength + out_bitstream->DataOffset > out_bitstream->MaxLength) { + throw DataProviderImplementationException(mfxstatus_to_string(MFX_ERR_NOT_ENOUGH_BUFFER)); } - size_t ret = fread(out_data, 1, out_data_bytes_size, source_handle.get()); - if (ret == 0) { + std::copy_n(p1, out_bitstream->DataLength, p0); + + out_bitstream->DataOffset = 0; + size_t bytes_count = fread(out_bitstream->Data + out_bitstream->DataLength, + 1, out_bitstream->MaxLength - out_bitstream->DataLength, + source_handle.get()); + if (bytes_count == 0) { if (feof(source_handle.get())) { source_handle.reset(); } else { - throw DataProviderSystemErrorException (errno, "FileDataProvider::fetch_data error read"); + throw DataProviderSystemErrorException (errno, "FileDataProvider::fetch_bitstream_data error read"); } } - return ret; + out_bitstream->DataLength += static_cast(bytes_count); + GAPI_LOG_DEBUG(nullptr, "bitstream after fetch, DataOffset: " << out_bitstream->DataOffset << + ", DataLength: " << out_bitstream->DataLength); + if (out_bitstream->DataLength == 0) + return false; + + GAPI_LOG_DEBUG(nullptr, "[" << this << "] " << + "buff fetched: " << out_bitstream.get()); + return true; } bool FileDataProvider::empty() const { return !source_handle; } + +#else // HAVE_ONEVPL + +FileDataProvider::FileDataProvider(const std::string&, + const std::vector &, + uint32_t bitstream_data_size_value) : + source_handle(nullptr, &fclose), + codec(std::numeric_limits::max()), + bitstream_data_size(bitstream_data_size_value) { + GAPI_Assert(false && "Unsupported: G-API compiled without `WITH_GAPI_ONEVPL=ON`"); +} + +FileDataProvider::~FileDataProvider() = default; + +IDataProvider::mfx_codec_id_type FileDataProvider::get_mfx_codec_id() const { + cv::util::suppress_unused_warning(codec); + GAPI_Assert(false && "Unsupported: G-API compiled without `WITH_GAPI_ONEVPL=ON`"); + return codec; +} + +bool FileDataProvider::fetch_bitstream_data(std::shared_ptr &) { + cv::util::suppress_unused_warning(bitstream_data_size); + GAPI_Assert(false && "Unsupported: G-API compiled without `WITH_GAPI_ONEVPL=ON`"); + return false; +} + +bool FileDataProvider::empty() const { + GAPI_Assert(false && "Unsupported: G-API compiled without `WITH_GAPI_ONEVPL=ON`"); + return true; +} +#endif // HAVE_ONEVPL } // namespace onevpl } // namespace wip } // namespace gapi diff --git a/modules/gapi/src/streaming/onevpl/file_data_provider.hpp b/modules/gapi/src/streaming/onevpl/file_data_provider.hpp index 22348290be3790c788b54d5ccafa0ced69c72cbe..cfa12459169f0dd96b3adda4167a6296c4c69736 100644 --- a/modules/gapi/src/streaming/onevpl/file_data_provider.hpp +++ b/modules/gapi/src/streaming/onevpl/file_data_provider.hpp @@ -6,9 +6,13 @@ #ifndef GAPI_STREAMING_ONEVPL_ONEVPL_FILE_DATA_PROVIDER_HPP #define GAPI_STREAMING_ONEVPL_ONEVPL_FILE_DATA_PROVIDER_HPP + #include #include +#include + +#include "streaming/onevpl/data_provider_defines.hpp" namespace cv { namespace gapi { @@ -17,13 +21,18 @@ namespace onevpl { struct FileDataProvider : public IDataProvider { using file_ptr = std::unique_ptr; - FileDataProvider(const std::string& file_path); + FileDataProvider(const std::string& file_path, + const std::vector &codec_params = {}, + uint32_t bitstream_data_size_value = 2000000); ~FileDataProvider(); - size_t fetch_data(size_t out_data_bytes_size, void* out_data) override; + mfx_codec_id_type get_mfx_codec_id() const override; + bool fetch_bitstream_data(std::shared_ptr &out_bitsream) override; bool empty() const override; private: file_ptr source_handle; + mfx_codec_id_type codec; + const uint32_t bitstream_data_size; }; } // namespace onevpl } // namespace wip diff --git a/modules/gapi/src/streaming/onevpl/source.cpp b/modules/gapi/src/streaming/onevpl/source.cpp index 806017a90dd270b6c0efee709635f6b5ac92701e..e5b045188d3797f4878e2719e76509ae14111191 100644 --- a/modules/gapi/src/streaming/onevpl/source.cpp +++ b/modules/gapi/src/streaming/onevpl/source.cpp @@ -7,7 +7,7 @@ #include #include "streaming/onevpl/source_priv.hpp" -#include "streaming/onevpl/file_data_provider.hpp" +#include "streaming/onevpl/data_provider_dispatcher.hpp" #include "streaming/onevpl/cfg_param_device_selector.hpp" namespace cv { @@ -36,7 +36,7 @@ GSource::GSource(const std::string& filePath, GSource::GSource(const std::string& filePath, const CfgParams& cfg_params, std::shared_ptr selector) : - GSource(std::make_shared(filePath), cfg_params, selector) { + GSource(DataProviderDispatcher::create(filePath, cfg_params), cfg_params, selector) { if (filePath.empty()) { util::throw_error(std::logic_error("Cannot create 'GSource' on empty source file name")); } diff --git a/modules/gapi/src/streaming/onevpl/source_priv.cpp b/modules/gapi/src/streaming/onevpl/source_priv.cpp index d00074925d4242c315100bd875cc903c925c77c3..c5de2a6998b201a09a51f8c40936fe35b306bf03 100644 --- a/modules/gapi/src/streaming/onevpl/source_priv.cpp +++ b/modules/gapi/src/streaming/onevpl/source_priv.cpp @@ -12,6 +12,7 @@ #include "streaming/onevpl/accelerators/accel_policy_cpu.hpp" #include "streaming/onevpl/utils.hpp" #include "streaming/onevpl/cfg_params_parser.hpp" +#include "streaming/onevpl/data_provider_defines.hpp" #include "streaming/onevpl/source_priv.hpp" #include "logger.hpp" @@ -44,7 +45,6 @@ enum { VPL_NEW_API_MINOR_VERSION = 2 }; - GSource::Priv::Priv() : mfx_handle(MFXLoad()), mfx_impl_description(), @@ -147,7 +147,7 @@ GSource::Priv::Priv(std::shared_ptr provider, // An available VPL implementation with max matching count std::vector impl_params = get_params_from_string(ss.str()); std::sort(impl_params.begin(), impl_params.end()); - GAPI_LOG_DEBUG(nullptr, "Find implementation cfg params count" << impl_params.size()); + GAPI_LOG_DEBUG(nullptr, "Find implementation cfg params count: " << impl_params.size()); std::vector matched_params; std::set_intersection(impl_params.begin(), impl_params.end(), @@ -195,10 +195,7 @@ GSource::Priv::Priv(std::shared_ptr provider, // initialize decoder // Find codec ID from config - auto dec_it = std::find_if(cfg_params.begin(), cfg_params.end(), [] (const CfgParam& value) { - return value.get_name() == "mfxImplDescription.mfxDecoderDescription.decoder.CodecID"; - }); - GAPI_Assert (dec_it != cfg_params.end() && "Cannot determine DecoderID from oneVPL config. Abort"); + IDataProvider::mfx_codec_id_type decoder_id = provider->get_mfx_codec_id(); // create session driving engine if required if (!engine) { @@ -215,7 +212,7 @@ GSource::Priv::Priv(std::shared_ptr provider, } //create decoder for session accoring to header recovered from source file - DecoderParams decoder_param = create_decoder_from_file(*dec_it, provider); + DecoderParams decoder_param = create_decoder_from_file(decoder_id, provider); // create engine session for processing mfx session pipeline engine->initialize_session(mfx_session, std::move(decoder_param), @@ -233,41 +230,42 @@ GSource::Priv::~Priv() MFXUnload(mfx_handle); } -DecoderParams GSource::Priv::create_decoder_from_file(const CfgParam& decoder_cfg, +DecoderParams GSource::Priv::create_decoder_from_file(uint32_t decoder_id, std::shared_ptr provider) { GAPI_DbgAssert(provider && "Cannot create decoder, data provider is nullptr"); - mfxBitstream bitstream{}; - const int BITSTREAM_BUFFER_SIZE = 2000000; - bitstream.MaxLength = BITSTREAM_BUFFER_SIZE; - bitstream.Data = (mfxU8 *)calloc(bitstream.MaxLength, sizeof(mfxU8)); - if(!bitstream.Data) { - throw std::runtime_error("Cannot allocate bitstream.Data bytes: " + - std::to_string(bitstream.MaxLength * sizeof(mfxU8))); - } - - mfxVariant decoder = cfg_param_to_mfx_variant(decoder_cfg); - // according to oneVPL documentation references - // https://spec.oneapi.io/versions/latest/elements/oneVPL/source/API_ref/VPL_disp_api_struct.html - // mfxVariant is an `union` type and considered different meaning for different param ids - // So CodecId has U32 data type - bitstream.CodecId = decoder.Data.U32; - - mfxStatus sts = ReadEncodedStream(bitstream, provider); - if(MFX_ERR_NONE != sts) { - throw std::runtime_error("Error reading bitstream, error: " + - mfxstatus_to_string(sts)); - } + std::shared_ptr bitstream{}; // Retrieve the frame information from input stream mfxVideoParam mfxDecParams {}; - mfxDecParams.mfx.CodecId = decoder.Data.U32; + mfxDecParams.mfx.CodecId = decoder_id; mfxDecParams.IOPattern = MFX_IOPATTERN_OUT_SYSTEM_MEMORY;//MFX_IOPATTERN_OUT_VIDEO_MEMORY; - sts = MFXVideoDECODE_DecodeHeader(mfx_session, &bitstream, &mfxDecParams); - if(MFX_ERR_NONE != sts) { - throw std::runtime_error("Error decoding header, error: " + - mfxstatus_to_string(sts)); + mfxStatus sts = MFX_ERR_NONE; + bool can_fetch_data = false; + do { + can_fetch_data = provider->fetch_bitstream_data(bitstream); + if (!can_fetch_data) { + // must fetch data always because EOF critical at this point + GAPI_LOG_WARNING(nullptr, "cannot decode header from provider: " << provider.get() << + ". Unexpected EOF"); + throw std::runtime_error("Error reading bitstream: EOF"); + } + + sts = MFXVideoDECODE_DecodeHeader(mfx_session, bitstream.get(), &mfxDecParams); + if(MFX_ERR_NONE != sts && MFX_ERR_MORE_DATA != sts) { + throw std::runtime_error("Error decoding header, error: " + + mfxstatus_to_string(sts)); + } + } while (sts == MFX_ERR_MORE_DATA && !provider->empty()); + + if (MFX_ERR_NONE != sts) { + GAPI_LOG_WARNING(nullptr, "cannot decode header from provider: " << provider.get() + << ". Make sure data source is valid and/or " + "\"mfxImplDescription.mfxDecoderDescription.decoder.CodecID\"" + " has correct value in case of demultiplexed raw input"); + throw std::runtime_error("Error decode header, error: " + + mfxstatus_to_string(sts)); } // Input parameters finished, now initialize decode diff --git a/modules/gapi/src/streaming/onevpl/source_priv.hpp b/modules/gapi/src/streaming/onevpl/source_priv.hpp index 955184c05c88031b8b375bb2193fdcd5b90c959d..07e113919103fa1bbcfa298e1251af0da26cdb8f 100644 --- a/modules/gapi/src/streaming/onevpl/source_priv.hpp +++ b/modules/gapi/src/streaming/onevpl/source_priv.hpp @@ -49,7 +49,7 @@ struct GSource::Priv GMetaArg descr_of() const; private: Priv(); - DecoderParams create_decoder_from_file(const CfgParam& decoder, + DecoderParams create_decoder_from_file(uint32_t decoder_id, std::shared_ptr provider); std::unique_ptr initializeHWAccel(); diff --git a/modules/gapi/src/streaming/onevpl/utils.cpp b/modules/gapi/src/streaming/onevpl/utils.cpp index 449d2868135f4dae49b314c7a01dd808de8c824f..6cbe0e7ea11468d478ed5ae8e564a511a6c6c271 100644 --- a/modules/gapi/src/streaming/onevpl/utils.cpp +++ b/modules/gapi/src/streaming/onevpl/utils.cpp @@ -152,6 +152,39 @@ mfxU32 cstr_to_mfx_codec_id(const char* cstr) { throw std::logic_error(std::string("Cannot parse \"mfxImplDescription.mfxDecoderDescription.decoder.CodecID\" value: ") + cstr); } +const char* mfx_codec_id_to_cstr(mfxU32 mfx_id) { + switch(mfx_id) { + case MFX_CODEC_AVC: + return "MFX_CODEC_AVC"; + case MFX_CODEC_HEVC: + return "MFX_CODEC_HEVC"; + case MFX_CODEC_MPEG2: + return "MFX_CODEC_MPEG2"; + case MFX_CODEC_VC1: + return "MFX_CODEC_VC1"; + case MFX_CODEC_VP9: + return "MFX_CODEC_VP9"; + case MFX_CODEC_AV1: + return "MFX_CODEC_AV1"; + case MFX_CODEC_JPEG: + return "MFX_CODEC_JPEG"; + default: + return ""; + } +} + +const std::set& get_supported_mfx_codec_ids() +{ + static std::set supported_codecs({MFX_CODEC_AVC, + MFX_CODEC_HEVC, + MFX_CODEC_MPEG2, + MFX_CODEC_VC1, + MFX_CODEC_VP9, + MFX_CODEC_AV1, + MFX_CODEC_JPEG}); + return supported_codecs; +} + const char* mfx_codec_type_to_cstr(const mfxU32 fourcc, const mfxU32 type) { switch (fourcc) { case MFX_CODEC_JPEG: { @@ -384,6 +417,10 @@ std::ostream& operator<< (std::ostream& out, const mfxImplDescription& idesc) return out; } +std::string mfxstatus_to_string(int64_t err) { + return mfxstatus_to_string(static_cast(err)); +} + std::string mfxstatus_to_string(mfxStatus err) { switch(err) { diff --git a/modules/gapi/src/streaming/onevpl/utils.hpp b/modules/gapi/src/streaming/onevpl/utils.hpp index 19c1b135dc504d8f5e49f715c988ae0e7e7986d8..723ba16a98b40e93e89a146489cd4b36f9082e7a 100644 --- a/modules/gapi/src/streaming/onevpl/utils.hpp +++ b/modules/gapi/src/streaming/onevpl/utils.hpp @@ -17,6 +17,7 @@ #include #include +#include #include #include @@ -61,10 +62,15 @@ mfxResourceType cstr_to_mfx_resource_type(const char* cstr); mfxU32 cstr_to_mfx_codec_id(const char* cstr); +const char* mfx_codec_id_to_cstr(mfxU32 mfx_id); + +const std::set &get_supported_mfx_codec_ids(); + const char* mfx_codec_type_to_cstr(const mfxU32 fourcc, const mfxU32 type); mfxU32 cstr_to_mfx_version(const char* cstr); +std::string mfxstatus_to_string(int64_t err); std::string mfxstatus_to_string(mfxStatus err); std::ostream& operator<< (std::ostream& out, const mfxImplDescription& idesc); diff --git a/modules/gapi/test/streaming/gapi_streaming_tests.cpp b/modules/gapi/test/streaming/gapi_streaming_tests.cpp index 3b37e57f54e684dd23cc9a3ac306f2b1c81119bc..fefd3e07e175bc58e2d0dba62d03527896c5811c 100644 --- a/modules/gapi/test/streaming/gapi_streaming_tests.cpp +++ b/modules/gapi/test/streaming/gapi_streaming_tests.cpp @@ -26,6 +26,7 @@ #include #include +#include "streaming/onevpl/data_provider_defines.hpp" #ifdef HAVE_ONEVPL @@ -268,13 +269,47 @@ void checkPullOverload(const cv::Mat& ref, EXPECT_EQ(0., cv::norm(ref, out_mat, cv::NORM_INF)); } +#ifdef HAVE_ONEVPL struct StreamDataProvider : public cv::gapi::wip::onevpl::IDataProvider { StreamDataProvider(std::istream& in) : data_stream (in) { EXPECT_TRUE(in); } - size_t fetch_data(size_t out_data_size, void* out_data_buf) override { + mfx_codec_id_type get_mfx_codec_id() const override { + return MFX_CODEC_HEVC; + } + + bool fetch_bitstream_data(std::shared_ptr &out_bitstream) override { + if (empty()) { + return false; + } + + if (!out_bitstream) { + out_bitstream = std::make_shared(); + out_bitstream->MaxLength = 2000000; + out_bitstream->Data = (mfxU8 *)calloc(out_bitstream->MaxLength, sizeof(mfxU8)); + if(!out_bitstream->Data) { + throw std::runtime_error("Cannot allocate bitstream.Data bytes: " + + std::to_string(out_bitstream->MaxLength * sizeof(mfxU8))); + } + out_bitstream->CodecId = get_mfx_codec_id(); + } + + mfxU8 *p0 = out_bitstream->Data; + mfxU8 *p1 = out_bitstream->Data + out_bitstream->DataOffset; + EXPECT_FALSE(out_bitstream->DataOffset > out_bitstream->MaxLength - 1); + EXPECT_FALSE(out_bitstream->DataLength + out_bitstream->DataOffset > out_bitstream->MaxLength); + + std::copy_n(p1, out_bitstream->DataLength, p0); + + out_bitstream->DataOffset = 0; + out_bitstream->DataLength += static_cast(fetch_data(out_bitstream->MaxLength - out_bitstream->DataLength, + out_bitstream->Data + out_bitstream->DataLength)); + return out_bitstream->DataLength != 0; + } + + size_t fetch_data(size_t out_data_size, void* out_data_buf) { data_stream.read(reinterpret_cast(out_data_buf), out_data_size); return (size_t)data_stream.gcount(); } @@ -284,6 +319,7 @@ struct StreamDataProvider : public cv::gapi::wip::onevpl::IDataProvider { private: std::istream& data_stream; }; +#endif // HAVE_ONEVPL } // anonymous namespace TEST_P(GAPI_Streaming, SmokeTest_ConstInput_GMat) @@ -2232,7 +2268,7 @@ TEST(OneVPL_Source, Init) std::stringstream stream(std::ios_base::in | std::ios_base::out | std::ios_base::binary); EXPECT_TRUE(stream.write(reinterpret_cast(const_cast(hevc_header)), sizeof(hevc_header))); - std::shared_ptr stream_data_provider = std::make_shared(stream); + auto stream_data_provider = std::make_shared(stream); cv::Ptr cap; bool cap_created = false; diff --git a/modules/gapi/test/streaming/gapi_streaming_vpl_core_test.cpp b/modules/gapi/test/streaming/gapi_streaming_vpl_core_test.cpp index 3ffdaf0fc9bf5311aabfcdfe3c0f7cc7c3c345b6..a84f92fafc1872c00f16dd3d553e07d4953f4a7d 100644 --- a/modules/gapi/test/streaming/gapi_streaming_vpl_core_test.cpp +++ b/modules/gapi/test/streaming/gapi_streaming_vpl_core_test.cpp @@ -45,12 +45,15 @@ namespace struct EmptyDataProvider : public cv::gapi::wip::onevpl::IDataProvider { - size_t fetch_data(size_t, void*) override { - return 0; - } bool empty() const override { return true; } + mfx_codec_id_type get_mfx_codec_id() const override { + return std::numeric_limits::max(); + } + bool fetch_bitstream_data(std::shared_ptr &) override { + return false; + } }; struct TestProcessingSession : public cv::gapi::wip::onevpl::EngineSession { diff --git a/modules/gapi/test/streaming/gapi_streaming_vpl_data_provider.cpp b/modules/gapi/test/streaming/gapi_streaming_vpl_data_provider.cpp new file mode 100644 index 0000000000000000000000000000000000000000..4e797ae9ef12547062f7e500ac84c2aa99f1f0d3 --- /dev/null +++ b/modules/gapi/test/streaming/gapi_streaming_vpl_data_provider.cpp @@ -0,0 +1,304 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2021 Intel Corporation + +#ifdef HAVE_ONEVPL + +#include + +#include "../test_precomp.hpp" + +#include "../common/gapi_tests_common.hpp" +#include "streaming/onevpl/data_provider_dispatcher.hpp" +#include "streaming/onevpl/file_data_provider.hpp" +#include "streaming/onevpl/demux/async_mfp_demux_data_provider.hpp" +#include "streaming/onevpl/source_priv.hpp" + +namespace opencv_test +{ +namespace +{ +using source_t = std::string; +using dd_valid_t = bool; +using demux_valid_t = bool; +using dec_valid_t = bool; +using array_element_t = + std::tuple; +array_element_t files[] = { + array_element_t {"highgui/video/VID00003-20100701-2204.3GP", + false, true, false}, + array_element_t {"highgui/video/VID00003-20100701-2204.avi", + false, true, false}, + array_element_t {"highgui/video/VID00003-20100701-2204.mpg", + false, true, false}, + array_element_t {"highgui/video/VID00003-20100701-2204.wmv", + false, true, false}, + array_element_t {"highgui/video/sample_322x242_15frames.yuv420p.libaom-av1.mp4", + true, true, true}, + array_element_t {"highgui/video/sample_322x242_15frames.yuv420p.libvpx-vp9.mp4", + true, true, true}, + array_element_t {"highgui/video/sample_322x242_15frames.yuv420p.libx264.avi", + true, true, true}, + array_element_t {"highgui/video/sample_322x242_15frames.yuv420p.libx264.mp4", + true, true, true}, + array_element_t {"highgui/video/sample_322x242_15frames.yuv420p.libx265.mp4", + true, true, true}, + array_element_t {"highgui/video/sample_322x242_15frames.yuv420p.mjpeg.mp4", + /* MFP cannot extract video MJPEG subtype from that */ + false, false, true}, + array_element_t {"highgui/video/big_buck_bunny.h264", + false, false, false}, + array_element_t {"highgui/video/big_buck_bunny.h265", + false, false, false} +}; + +class OneVPL_Source_MFPAsyncDispatcherTest : public ::testing::TestWithParam {}; +TEST_P(OneVPL_Source_MFPAsyncDispatcherTest, open_and_decode_file) +{ + using namespace cv::gapi::wip::onevpl; + + source_t path = findDataFile(std::get<0>(GetParam())); + dd_valid_t dd_result = std::get<1>(GetParam()); + dec_valid_t dec_result = std::get<3>(GetParam()); + + // open demux source & check format support + std::unique_ptr provider_ptr; + try { + provider_ptr.reset(new MFPAsyncDemuxDataProvider(path)); + } catch (...) { + EXPECT_FALSE(dd_result); + GTEST_SUCCEED(); + return; + } + EXPECT_TRUE(dd_result); + + // initialize MFX + mfxLoader mfx_handle = MFXLoad(); + + mfxConfig cfg_inst_0 = MFXCreateConfig(mfx_handle); + EXPECT_TRUE(cfg_inst_0); + mfxVariant mfx_param_0; + mfx_param_0.Type = MFX_VARIANT_TYPE_U32; + mfx_param_0.Data.U32 = provider_ptr->get_mfx_codec_id(); + EXPECT_EQ(MFXSetConfigFilterProperty(cfg_inst_0,(mfxU8 *)"mfxImplDescription.mfxDecoderDescription.decoder.CodecID", + mfx_param_0), MFX_ERR_NONE); + + // create MFX session + mfxSession mfx_session{}; + mfxStatus sts = MFXCreateSession(mfx_handle, 0, &mfx_session); + EXPECT_EQ(MFX_ERR_NONE, sts); + + // create proper bitstream + std::shared_ptr bitstream{}; + + // prepare dec params + mfxVideoParam mfxDecParams {}; + mfxDecParams.mfx.CodecId = mfx_param_0.Data.U32; + mfxDecParams.IOPattern = MFX_IOPATTERN_OUT_SYSTEM_MEMORY; + do { + bool fetched = provider_ptr->fetch_bitstream_data(bitstream); + if (dec_result) { + EXPECT_TRUE(fetched); + } + sts = MFXVideoDECODE_DecodeHeader(mfx_session, bitstream.get(), &mfxDecParams); + EXPECT_TRUE(MFX_ERR_NONE == sts || MFX_ERR_MORE_DATA == sts); + } while (sts == MFX_ERR_MORE_DATA && !provider_ptr->empty()); + + if (dec_result) { + EXPECT_EQ(MFX_ERR_NONE, sts); + } else { + EXPECT_FALSE(MFX_ERR_NONE == sts); + } + + MFXVideoDECODE_Close(mfx_session); + MFXClose(mfx_session); + MFXUnload(mfx_handle); +} + + +TEST_P(OneVPL_Source_MFPAsyncDispatcherTest, choose_dmux_provider) +{ + using namespace cv::gapi::wip::onevpl; + + + source_t path = findDataFile(std::get<0>(GetParam())); + dd_valid_t dd_result = std::get<1>(GetParam()); + + std::shared_ptr provider_ptr; + + // choose demux provider for empty CfgParams + try { + provider_ptr = DataProviderDispatcher::create(path); + } catch (...) { + EXPECT_FALSE(dd_result); + provider_ptr = DataProviderDispatcher::create(path, + { CfgParam::create( + "mfxImplDescription.mfxDecoderDescription.decoder.CodecID", + "MFX_CODEC_HEVC") /* Doesn't matter what codec for RAW here*/}); + EXPECT_TRUE(std::dynamic_pointer_cast(provider_ptr)); + GTEST_SUCCEED(); + return; + } + + EXPECT_TRUE(dd_result); + EXPECT_TRUE(std::dynamic_pointer_cast(provider_ptr)); +} + +INSTANTIATE_TEST_CASE_P(MFP_VPL_DecodeHeaderTests, OneVPL_Source_MFPAsyncDispatcherTest, + testing::ValuesIn(files)); + +namespace test { + struct IntrusiveAsyncDemuxDataProvider : + public cv::gapi::wip::onevpl::MFPAsyncDemuxDataProvider { + + using base_t = cv::gapi::wip::onevpl::MFPAsyncDemuxDataProvider; + using base_t::base_t; + + ~IntrusiveAsyncDemuxDataProvider() { + destroyed = true; + } + + STDMETHODIMP OnReadSample(HRESULT status, DWORD stream_index, + DWORD stream_flag, LONGLONG timestamp, + IMFSample *sample_ptr) override { + if (IntrusiveAsyncDemuxDataProvider::need_request_next) { + return base_t::OnReadSample(status, stream_index, stream_flag, + timestamp, sample_ptr); + } + return status; + } + + + // implementation methods + size_t produce_worker_data(void *key, + cv::gapi::wip::onevpl::ComPtrGuard &&buffer, + std::shared_ptr &&staging_stream) override { + return base_t::produce_worker_data(key, std::move(buffer), + std::move(staging_stream)); + } + + static bool need_request_next; + static bool destroyed; +}; + +bool IntrusiveAsyncDemuxDataProvider::need_request_next{}; +bool IntrusiveAsyncDemuxDataProvider::destroyed{}; +} // namespace test + +TEST(OneVPL_Source_MFPAsyncDemux, sync_flush) { + using namespace cv::gapi::wip::onevpl; + + source_t path = findDataFile("highgui/video/sample_322x242_15frames.yuv420p.libx265.mp4"); + test::IntrusiveAsyncDemuxDataProvider::need_request_next = false; + const size_t preprocessed_samples_count = 3; + { + test::IntrusiveAsyncDemuxDataProvider provider(path, preprocessed_samples_count); + size_t produce_buffer_count = 199 * preprocessed_samples_count; + std::thread producer([&provider, produce_buffer_count]() { + size_t total_produced_count = 0; + for (size_t i = 0; i < produce_buffer_count; i ++) { + total_produced_count += provider.produce_worker_data( + reinterpret_cast(i), + createCOMPtrGuard(nullptr), + {}); + } + }); + producer.join(); + } + + EXPECT_EQ(test::IntrusiveAsyncDemuxDataProvider::destroyed, true); +} + +TEST(OneVPL_Source_MFPAsyncDemux, async_flush) { + using namespace cv::gapi::wip::onevpl; + + source_t path = findDataFile("highgui/video/sample_322x242_15frames.yuv420p.libx265.mp4"); + test::IntrusiveAsyncDemuxDataProvider::need_request_next = true; + const size_t preprocessed_samples_count = 999; + { + std::shared_ptr stream; + test::IntrusiveAsyncDemuxDataProvider provider(path, preprocessed_samples_count); + EXPECT_TRUE(provider.fetch_bitstream_data(stream)); + EXPECT_TRUE(stream); + } + + EXPECT_EQ(test::IntrusiveAsyncDemuxDataProvider::destroyed, true); +} + +TEST(OneVPL_Source_MFPAsyncDemux, eof_async_detection) { + using namespace cv::gapi::wip::onevpl; + + source_t path = findDataFile("highgui/video/sample_322x242_15frames.yuv420p.libx265.mp4"); + test::IntrusiveAsyncDemuxDataProvider::need_request_next = false; + const size_t preprocessed_samples_count = 0; // do not ask sample at start + test::IntrusiveAsyncDemuxDataProvider provider(path, preprocessed_samples_count); + std::promise start_consume_data; + std::future wait_consume_data = start_consume_data.get_future(); + + std::thread fetcher([&provider, &start_consume_data]() { + std::shared_ptr stream; + start_consume_data.set_value(); + EXPECT_FALSE(provider.fetch_bitstream_data(stream)); + EXPECT_FALSE(stream); + }); + + wait_consume_data.wait(); + std::this_thread::sleep_for(std::chrono::seconds(2)); // hope fetched has slept on condition + + test::IntrusiveAsyncDemuxDataProvider::need_request_next = true; + provider.OnReadSample(S_OK, 0, MF_SOURCE_READERF_ENDOFSTREAM, 0, nullptr); + fetcher.join(); +} + +TEST(OneVPL_Source_MFPAsyncDemux, produce_consume) { + using namespace cv::gapi::wip::onevpl; + + source_t path = findDataFile("highgui/video/sample_322x242_15frames.yuv420p.libx265.mp4"); + test::IntrusiveAsyncDemuxDataProvider::need_request_next = false; + const size_t preprocessed_samples_count = 3; + test::IntrusiveAsyncDemuxDataProvider provider(path, preprocessed_samples_count); + + std::promise start_consume_data; + std::future wait_consume_data = start_consume_data.get_future(); + size_t produce_buffer_count = 199 * preprocessed_samples_count; + std::thread producer([&provider, &wait_consume_data, produce_buffer_count]() { + wait_consume_data.wait(); + size_t total_produced_count = 0; + for (size_t i = 0; i < produce_buffer_count; i ++) { + std::shared_ptr dummy_stream = + std::make_shared(); + dummy_stream->DataLength = static_cast(i); // control block + dummy_stream->DataOffset = static_cast(i); // control block + dummy_stream->Data = reinterpret_cast(i); + total_produced_count = provider.produce_worker_data( + dummy_stream->Data, + createCOMPtrGuard(nullptr), + std::move(dummy_stream)); + EXPECT_TRUE(total_produced_count <= produce_buffer_count); + } + }); + + std::thread consumer([&provider, &start_consume_data, produce_buffer_count]() { + + start_consume_data.set_value(); + size_t total_consumed_count = 0; + std::shared_ptr dummy_stream; + size_t stream_idx = 0; + do { + EXPECT_TRUE(provider.fetch_bitstream_data(dummy_stream)); + EXPECT_TRUE(dummy_stream); + EXPECT_EQ(dummy_stream->DataLength, stream_idx); + stream_idx ++; + total_consumed_count++; + } while (total_consumed_count != produce_buffer_count); + }); + + producer.join(); + consumer.join(); +} +} +} // namespace opencv_test + +#endif // HAVE_ONEVPL