Merge pull request #20709 from AsyaPronina:asyadev/integrate_gstreamer_source
authorAnastasiya(Asya) Pronina <anastasiya.pronina@intel.com>
Mon, 6 Dec 2021 16:54:21 +0000 (19:54 +0300)
committerGitHub <noreply@github.com>
Mon, 6 Dec 2021 16:54:21 +0000 (16:54 +0000)
Ported GStreamerSource to OpenCV

* Ported GStreamerSource to OpenCV

* Fixed CI failures

* Whitespaces

* Whitespaces + removed exception from destructors C4722

* Removed assert for Priv's getSS and descr_of

* Removed assert for pull

* Fixed last review comment

Co-authored-by: Pashchenkov Maxim <maxim.pashchenkov@intel.com>
19 files changed:
modules/gapi/CMakeLists.txt
modules/gapi/include/opencv2/gapi/streaming/gstreamer/gstreamerpipeline.hpp [new file with mode: 0644]
modules/gapi/include/opencv2/gapi/streaming/gstreamer/gstreamersource.hpp [new file with mode: 0644]
modules/gapi/include/opencv2/gapi/streaming/onevpl/device_selector_interface.hpp
modules/gapi/src/streaming/gstreamer/gstreamer_buffer_utils.cpp [new file with mode: 0644]
modules/gapi/src/streaming/gstreamer/gstreamer_buffer_utils.hpp [new file with mode: 0644]
modules/gapi/src/streaming/gstreamer/gstreamer_media_adapter.cpp [new file with mode: 0644]
modules/gapi/src/streaming/gstreamer/gstreamer_media_adapter.hpp [new file with mode: 0644]
modules/gapi/src/streaming/gstreamer/gstreamer_pipeline_facade.cpp [new file with mode: 0644]
modules/gapi/src/streaming/gstreamer/gstreamer_pipeline_facade.hpp [new file with mode: 0644]
modules/gapi/src/streaming/gstreamer/gstreamerenv.cpp [new file with mode: 0644]
modules/gapi/src/streaming/gstreamer/gstreamerenv.hpp [new file with mode: 0644]
modules/gapi/src/streaming/gstreamer/gstreamerpipeline.cpp [new file with mode: 0644]
modules/gapi/src/streaming/gstreamer/gstreamerpipeline_priv.hpp [new file with mode: 0644]
modules/gapi/src/streaming/gstreamer/gstreamerptr.hpp [new file with mode: 0644]
modules/gapi/src/streaming/gstreamer/gstreamersource.cpp [new file with mode: 0644]
modules/gapi/src/streaming/gstreamer/gstreamersource_priv.hpp [new file with mode: 0644]
modules/gapi/test/streaming/gapi_gstreamer_pipeline_facade_int_tests.cpp [new file with mode: 0644]
modules/gapi/test/streaming/gapi_gstreamersource_tests.cpp [new file with mode: 0644]

index 79ae30a..855ce27 100644 (file)
@@ -47,12 +47,14 @@ file(GLOB gapi_ext_hdrs
     "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/infer/*.hpp"
     "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/ocl/*.hpp"
     "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/own/*.hpp"
+    "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/plaidml/*.hpp"
+    "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/python/*.hpp"
     "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/render/*.hpp"
     "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/s11n/*.hpp"
     "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/streaming/*.hpp"
-    "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/plaidml/*.hpp"
+    "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/streaming/gstreamer/*.hpp"
+    "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/streaming/onevpl/*.hpp"
     "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/util/*.hpp"
-    "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/python/*.hpp"
     )
 
 set(gapi_srcs
@@ -163,7 +165,7 @@ set(gapi_srcs
     src/backends/ie/bindings_ie.cpp
     src/backends/python/gpythonbackend.cpp
 
-    # Streaming source
+    # OpenVPL Streaming source
     src/streaming/onevpl/source.cpp
     src/streaming/onevpl/source_priv.cpp
     src/streaming/onevpl/file_data_provider.cpp
@@ -186,6 +188,14 @@ set(gapi_srcs
     src/streaming/onevpl/cfg_param_device_selector.cpp
     src/streaming/onevpl/device_selector_interface.cpp
 
+    # GStreamer Streaming source
+    src/streaming/gstreamer/gstreamer_pipeline_facade.cpp
+    src/streaming/gstreamer/gstreamerpipeline.cpp
+    src/streaming/gstreamer/gstreamersource.cpp
+    src/streaming/gstreamer/gstreamer_buffer_utils.cpp
+    src/streaming/gstreamer/gstreamer_media_adapter.cpp
+    src/streaming/gstreamer/gstreamerenv.cpp
+
     # Utils (ITT tracing)
     src/utils/itt.cpp
     )
@@ -283,6 +293,15 @@ if(HAVE_GAPI_ONEVPL)
   endif()
 endif()
 
+if(HAVE_GSTREAMER)
+  if(TARGET opencv_test_gapi)
+    ocv_target_compile_definitions(opencv_test_gapi PRIVATE -DHAVE_GSTREAMER)
+    ocv_target_link_libraries(opencv_test_gapi PRIVATE ocv.3rdparty.gstreamer)
+  endif()
+  ocv_target_compile_definitions(${the_module} PRIVATE -DHAVE_GSTREAMER)
+  ocv_target_link_libraries(${the_module} PRIVATE ocv.3rdparty.gstreamer)
+endif()
+
 if(WIN32)
   # Required for htonl/ntohl on Windows
   ocv_target_link_libraries(${the_module} PRIVATE wsock32 ws2_32)
diff --git a/modules/gapi/include/opencv2/gapi/streaming/gstreamer/gstreamerpipeline.hpp b/modules/gapi/include/opencv2/gapi/streaming/gstreamer/gstreamerpipeline.hpp
new file mode 100644 (file)
index 0000000..83afc99
--- /dev/null
@@ -0,0 +1,47 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2021 Intel Corporation
+
+#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPIPELINE_HPP
+#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPIPELINE_HPP
+
+#include <opencv2/gapi/streaming/gstreamer/gstreamersource.hpp>
+#include <opencv2/gapi/own/exports.hpp>
+
+#include <string>
+#include <unordered_map>
+#include <memory>
+
+namespace cv {
+namespace gapi {
+namespace wip {
+namespace gst {
+
+class GAPI_EXPORTS GStreamerPipeline
+{
+public:
+    class Priv;
+
+    explicit GStreamerPipeline(const std::string& pipeline);
+    IStreamSource::Ptr getStreamingSource(const std::string& appsinkName,
+                                          const GStreamerSource::OutputType outputType =
+                                              GStreamerSource::OutputType::MAT);
+    virtual ~GStreamerPipeline();
+
+protected:
+    explicit GStreamerPipeline(std::unique_ptr<Priv> priv);
+
+    std::unique_ptr<Priv> m_priv;
+};
+
+} // namespace gst
+
+using GStreamerPipeline = gst::GStreamerPipeline;
+
+} // namespace wip
+} // namespace gapi
+} // namespace cv
+
+#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPIPELINE_HPP
diff --git a/modules/gapi/include/opencv2/gapi/streaming/gstreamer/gstreamersource.hpp b/modules/gapi/include/opencv2/gapi/streaming/gstreamer/gstreamersource.hpp
new file mode 100644 (file)
index 0000000..b81bad3
--- /dev/null
@@ -0,0 +1,89 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2021 Intel Corporation
+
+#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERSOURCE_HPP
+#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERSOURCE_HPP
+
+#include <opencv2/gapi/streaming/source.hpp>
+#include <opencv2/gapi/garg.hpp>
+
+#include <memory>
+
+namespace cv {
+namespace gapi {
+namespace wip {
+namespace gst {
+
+/**
+ * @brief OpenCV's GStreamer streaming source.
+ *        Streams cv::Mat-s/cv::MediaFrame from passed GStreamer pipeline.
+ *
+ * This class implements IStreamSource interface.
+ *
+ * To create GStreamerSource instance you need to pass 'pipeline' and, optionally, 'outputType'
+ * arguments into constructor.
+ * 'pipeline' should represent GStreamer pipeline in form of textual description.
+ * Almost any custom pipeline is supported which can be successfully ran via gst-launch.
+ * The only two limitations are:
+ *      - there should be __one__ appsink element in the pipeline to pass data to OpenCV app.
+ *        Pipeline can actually contain many sink elements, but it must have one and only one
+ *        appsink among them.
+ *
+ *      - data passed to appsink should be video-frame in NV12 format.
+ *
+ * 'outputType' is used to select type of output data to produce: 'cv::MediaFrame' or 'cv::Mat'.
+ * To produce 'cv::MediaFrame'-s you need to pass 'GStreamerSource::OutputType::FRAME' and,
+ * correspondingly, 'GStreamerSource::OutputType::MAT' to produce 'cv::Mat'-s.
+ * Please note, that in the last case, output 'cv::Mat' will be of BGR format, internal conversion
+ * from NV12 GStreamer data will happen.
+ * Default value for 'outputType' is 'GStreamerSource::OutputType::MAT'.
+ *
+ * @note Stream sources are passed to G-API via shared pointers, so please use gapi::make_src<>
+ *       to create objects and ptr() to pass a GStreamerSource to cv::gin().
+ *
+ * @note You need to build OpenCV with GStreamer support to use this class.
+ */
+
+class GStreamerPipelineFacade;
+
+class GAPI_EXPORTS GStreamerSource : public IStreamSource
+{
+public:
+    class Priv;
+
+    // Indicates what type of data should be produced by GStreamerSource: cv::MediaFrame or cv::Mat
+    enum class OutputType {
+        FRAME,
+        MAT
+    };
+
+    GStreamerSource(const std::string& pipeline,
+                    const GStreamerSource::OutputType outputType =
+                        GStreamerSource::OutputType::MAT);
+    GStreamerSource(std::shared_ptr<GStreamerPipelineFacade> pipeline,
+                    const std::string& appsinkName,
+                    const GStreamerSource::OutputType outputType =
+                        GStreamerSource::OutputType::MAT);
+
+    bool pull(cv::gapi::wip::Data& data) override;
+    GMetaArg descr_of() const override;
+    ~GStreamerSource() override;
+
+protected:
+    explicit GStreamerSource(std::unique_ptr<Priv> priv);
+
+    std::unique_ptr<Priv> m_priv;
+};
+
+} // namespace gst
+
+using GStreamerSource = gst::GStreamerSource;
+
+} // namespace wip
+} // namespace gapi
+} // namespace cv
+
+#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERSOURCE_HPP
index ca19849..04f8cae 100644 (file)
@@ -19,7 +19,7 @@ namespace gapi {
 namespace wip {
 namespace onevpl {
 
-enum class AccelType : uint8_t {
+enum class AccelType: uint8_t {
     HOST,
     DX11,
 
diff --git a/modules/gapi/src/streaming/gstreamer/gstreamer_buffer_utils.cpp b/modules/gapi/src/streaming/gstreamer/gstreamer_buffer_utils.cpp
new file mode 100644 (file)
index 0000000..2270131
--- /dev/null
@@ -0,0 +1,27 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2021 Intel Corporation
+
+#include "gstreamer_buffer_utils.hpp"
+#include "gstreamerptr.hpp"
+#include <opencv2/gapi/own/assert.hpp>
+
+#ifdef HAVE_GSTREAMER
+namespace cv {
+namespace gapi {
+namespace wip {
+namespace gstreamer_utils {
+
+void mapBufferToFrame(GstBuffer& buffer, GstVideoInfo& info, GstVideoFrame& frame,
+                      GstMapFlags mapFlags) {
+    bool mapped = gst_video_frame_map(&frame, &info, &buffer, mapFlags);
+    GAPI_Assert(mapped && "Failed to map GStreamer buffer to system memory as video-frame!");
+}
+
+} // namespace gstreamer_utils
+} // namespace wip
+} // namespace gapi
+} // namespace cv
+#endif // HAVE_GSTREAMER
diff --git a/modules/gapi/src/streaming/gstreamer/gstreamer_buffer_utils.hpp b/modules/gapi/src/streaming/gstreamer/gstreamer_buffer_utils.hpp
new file mode 100644 (file)
index 0000000..3e6f908
--- /dev/null
@@ -0,0 +1,27 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2021 Intel Corporation
+
+#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_BUFFER_UTILS_HPP
+#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_BUFFER_UTILS_HPP
+
+#ifdef HAVE_GSTREAMER
+#include <gst/gstbuffer.h>
+#include <gst/video/video-frame.h>
+
+namespace cv {
+namespace gapi {
+namespace wip {
+namespace gstreamer_utils {
+
+void mapBufferToFrame(GstBuffer& buffer, GstVideoInfo& info, GstVideoFrame& frame,
+                      GstMapFlags map_flags);
+
+} // namespace gstreamer_utils
+} // namespace wip
+} // namespace gapi
+} // namespace cv
+#endif // HAVE_GSTREAMER
+#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_BUFFER_UTILS_HPP
diff --git a/modules/gapi/src/streaming/gstreamer/gstreamer_media_adapter.cpp b/modules/gapi/src/streaming/gstreamer/gstreamer_media_adapter.cpp
new file mode 100644 (file)
index 0000000..9019289
--- /dev/null
@@ -0,0 +1,122 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2021 Intel Corporation
+
+#include "gstreamer_media_adapter.hpp"
+#include "gstreamer_buffer_utils.hpp"
+
+#ifdef HAVE_GSTREAMER
+namespace cv {
+namespace gapi {
+namespace wip {
+namespace gst {
+
+GStreamerMediaAdapter::GStreamerMediaAdapter(const cv::GFrameDesc& frameDesc,
+                                             GstVideoInfo* videoInfo,
+                                             GstBuffer* buffer) :
+    m_frameDesc(frameDesc),
+    m_videoInfo(gst_video_info_copy(videoInfo)),
+    m_buffer(gst_buffer_ref(buffer)),
+    m_isMapped(false)
+{
+#if GST_VERSION_MINOR >= 10
+    // Check that GstBuffer has mono-view, so we can retrieve only one video-meta
+    GAPI_Assert((gst_buffer_get_flags(m_buffer) & GST_VIDEO_BUFFER_FLAG_MULTIPLE_VIEW) == 0);
+#endif // GST_VERSION_MINOR >= 10
+
+    GstVideoMeta* videoMeta = gst_buffer_get_video_meta(m_buffer);
+    if (videoMeta != nullptr) {
+        m_strides = { videoMeta->stride[0], videoMeta->stride[1] };
+        m_offsets = { videoMeta->offset[0], videoMeta->offset[1] };
+    } else {
+        m_strides = { GST_VIDEO_INFO_PLANE_STRIDE(m_videoInfo.get(), 0),
+                      GST_VIDEO_INFO_PLANE_STRIDE(m_videoInfo.get(), 1) };
+        m_offsets = { GST_VIDEO_INFO_PLANE_OFFSET(m_videoInfo.get(), 0),
+                      GST_VIDEO_INFO_PLANE_OFFSET(m_videoInfo.get(), 1) };
+    }
+}
+
+GStreamerMediaAdapter::~GStreamerMediaAdapter() {
+    if (m_isMapped.load(std::memory_order_acquire)) {
+        gst_video_frame_unmap(&m_videoFrame);
+        m_isMapped.store(false, std::memory_order_release);
+        m_mappedForWrite.store(false);
+    }
+}
+
+cv::GFrameDesc GStreamerMediaAdapter::meta() const {
+    return m_frameDesc;
+}
+
+cv::MediaFrame::View GStreamerMediaAdapter::access(cv::MediaFrame::Access access) {
+    GAPI_Assert(access == cv::MediaFrame::Access::R ||
+                access == cv::MediaFrame::Access::W);
+    static std::atomic<size_t> thread_counters { };
+    ++thread_counters;
+
+    // NOTE: Framework guarantees that there should be no parallel accesses to the frame
+    //       memory if is accessing for write.
+    if (access == cv::MediaFrame::Access::W && !m_mappedForWrite.load(std::memory_order_acquire)) {
+        GAPI_Assert(thread_counters > 1 &&
+                    "Multiple access to view during mapping for write detected!");
+        gst_video_frame_unmap(&m_videoFrame);
+        m_isMapped.store(false);
+    }
+
+    if (!m_isMapped.load(std::memory_order_acquire)) {
+
+        std::lock_guard<std::mutex> lock(m_mutex);
+
+        if(!m_isMapped.load(std::memory_order_relaxed)) {
+
+            GAPI_Assert(GST_VIDEO_INFO_N_PLANES(m_videoInfo.get()) == 2);
+            GAPI_Assert(GST_VIDEO_INFO_FORMAT(m_videoInfo.get()) == GST_VIDEO_FORMAT_NV12);
+
+            // TODO: Use RAII for map/unmap
+            if (access == cv::MediaFrame::Access::W) {
+                gstreamer_utils::mapBufferToFrame(*m_buffer, *m_videoInfo, m_videoFrame,
+                                                  GST_MAP_WRITE);
+                m_mappedForWrite.store(true, std::memory_order_release);
+            } else {
+                gstreamer_utils::mapBufferToFrame(*m_buffer, *m_videoInfo, m_videoFrame,
+                                                  GST_MAP_READ);
+            }
+
+            GAPI_Assert(GST_VIDEO_FRAME_PLANE_STRIDE(&m_videoFrame, 0) == m_strides[0]);
+            GAPI_Assert(GST_VIDEO_FRAME_PLANE_STRIDE(&m_videoFrame, 1) == m_strides[1]);
+            GAPI_Assert(GST_VIDEO_FRAME_PLANE_OFFSET(&m_videoFrame, 0) == m_offsets[0]);
+            GAPI_Assert(GST_VIDEO_FRAME_PLANE_OFFSET(&m_videoFrame, 1) == m_offsets[1]);
+
+            m_isMapped.store(true, std::memory_order_release);
+        }
+    }
+
+    cv::MediaFrame::View::Ptrs ps {
+        static_cast<uint8_t*>(GST_VIDEO_FRAME_PLANE_DATA(&m_videoFrame, 0)) + m_offsets[0], // Y-plane
+        static_cast<uint8_t*>(GST_VIDEO_FRAME_PLANE_DATA(&m_videoFrame, 0)) + m_offsets[1], // UV-plane
+        nullptr,
+        nullptr
+    };
+
+    cv::MediaFrame::View::Strides ss = {
+        static_cast<std::size_t>(m_strides[0]), // Y-plane stride
+        static_cast<std::size_t>(m_strides[1]), // UV-plane stride
+        0u,
+        0u
+    };
+
+    --thread_counters;
+    return cv::MediaFrame::View(std::move(ps), std::move(ss));
+}
+
+cv::util::any GStreamerMediaAdapter::blobParams() const {
+    GAPI_Assert(false && "No implementation for GStreamerMediaAdapter::blobParams()");
+}
+
+} // namespace gst
+} // namespace wip
+} // namespace gapi
+} // namespace cv
+#endif // HAVE_GSTREAMER
diff --git a/modules/gapi/src/streaming/gstreamer/gstreamer_media_adapter.hpp b/modules/gapi/src/streaming/gstreamer/gstreamer_media_adapter.hpp
new file mode 100644 (file)
index 0000000..4c5c137
--- /dev/null
@@ -0,0 +1,63 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2021 Intel Corporation
+
+#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_MEDIA_ADAPTER_HPP
+#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_MEDIA_ADAPTER_HPP
+
+// #include <opencv2/gapi/garray.hpp>
+// #include <opencv2/gapi/streaming/meta.hpp>
+
+#include "gstreamerptr.hpp"
+#include <opencv2/gapi/streaming/gstreamer/gstreamersource.hpp>
+
+#include <atomic>
+#include <mutex>
+
+#ifdef HAVE_GSTREAMER
+#include <gst/gstbuffer.h>
+#include <gst/video/video-frame.h>
+
+namespace cv {
+namespace gapi {
+namespace wip {
+namespace gst {
+
+class GStreamerMediaAdapter : public cv::MediaFrame::IAdapter {
+public:
+    explicit GStreamerMediaAdapter(const cv::GFrameDesc& frameDesc,
+                                   GstVideoInfo* videoInfo,
+                                   GstBuffer* buffer);
+
+    ~GStreamerMediaAdapter() override;
+
+    virtual cv::GFrameDesc meta() const override;
+
+    cv::MediaFrame::View access(cv::MediaFrame::Access access) override;
+
+    cv::util::any blobParams() const override;
+
+protected:
+    cv::GFrameDesc m_frameDesc;
+
+    GStreamerPtr<GstVideoInfo> m_videoInfo;
+    GStreamerPtr<GstBuffer> m_buffer;
+
+    std::vector<gint> m_strides;
+    std::vector<gsize> m_offsets;
+
+    GstVideoFrame m_videoFrame;
+
+    std::atomic<bool> m_isMapped;
+    std::atomic<bool> m_mappedForWrite;
+    std::mutex m_mutex;
+};
+
+} // namespace gst
+} // namespace wip
+} // namespace gapi
+} // namespace cv
+#endif // HAVE_GSTREAMER
+#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_MEDIA_ADAPTER_HPP
diff --git a/modules/gapi/src/streaming/gstreamer/gstreamer_pipeline_facade.cpp b/modules/gapi/src/streaming/gstreamer/gstreamer_pipeline_facade.cpp
new file mode 100644 (file)
index 0000000..cd78253
--- /dev/null
@@ -0,0 +1,314 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2021 Intel Corporation
+
+#include "gstreamerenv.hpp"
+
+#include "gstreamer_pipeline_facade.hpp"
+
+#include <opencv2/gapi/streaming/meta.hpp>
+
+#include <logger.hpp>
+
+#include <opencv2/imgproc.hpp>
+
+#ifdef HAVE_GSTREAMER
+#include <gst/app/gstappsink.h>
+#include <gst/gstbuffer.h>
+#include <gst/video/video-frame.h>
+#include <gst/pbutils/missing-plugins.h>
+
+namespace cv {
+namespace gapi {
+namespace wip {
+namespace gst {
+
+GStreamerPipelineFacade::GStreamerPipelineFacade():
+    m_isPrerolled(false),
+    m_isPlaying(false)
+    { }
+
+GStreamerPipelineFacade::GStreamerPipelineFacade(const std::string& pipelineDesc):
+    GStreamerPipelineFacade()
+{
+    m_pipelineDesc = pipelineDesc;
+
+    // Initialize GStreamer library:
+    GStreamerEnv::init();
+
+    // Create GStreamer pipeline:
+    GError* error = NULL;
+    // gst_parse_launch [transfer floating]
+    m_pipeline = GST_ELEMENT(g_object_ref_sink(
+        gst_parse_launch(m_pipelineDesc.c_str(), &error)));
+
+    GStreamerPtr<GError> err(error);
+
+    if (err)
+    {
+        cv::util::throw_error(
+            std::runtime_error("Error in parsing pipeline: " + std::string(err->message)));
+    }
+}
+
+// The destructors are noexcept by default (since C++11).
+GStreamerPipelineFacade::~GStreamerPipelineFacade()
+{
+    // There is no mutex acquisition here, because we assume that no one will call this method
+    // directly.
+
+    // Destructor may be called on empty GStreamerSource object in case if
+    // exception is thrown during construction.
+    if (m_pipeline && GST_IS_ELEMENT(m_pipeline.get()))
+    {
+        try
+        {
+            setState(GST_STATE_NULL);
+        }
+        catch(...)
+        {
+            GAPI_LOG_WARNING(NULL, "Unable to stop pipeline in destructor.\n");
+        }
+
+        m_pipeline.release();
+    }
+}
+
+std::vector<GstElement*> GStreamerPipelineFacade::getElementsByFactoryName(
+    const std::string& factoryName)
+{
+    std::vector<GstElement*> outElements = getElements(
+        [&factoryName](GstElement* element) {
+            GStreamerPtr<gchar> name(
+                gst_object_get_name(GST_OBJECT(gst_element_get_factory(element))));
+            return name && (0 == strcmp(name, factoryName.c_str()));
+        });
+
+    return outElements;
+}
+
+GstElement* GStreamerPipelineFacade::getElementByName(const std::string& elementName)
+{
+    std::vector<GstElement*> outElements = getElements(
+    [&elementName](GstElement* element) {
+        GStreamerPtr<gchar> name(gst_element_get_name(element));
+        return name && (0 == strcmp(name, elementName.c_str()));
+    });
+
+    if (outElements.empty())
+    {
+        return nullptr;
+    }
+    else
+    {
+        GAPI_Assert(1ul == outElements.size());
+        return outElements[0];
+    }
+}
+
+void GStreamerPipelineFacade::completePreroll() {
+    // FIXME: If there are multiple sources in pipeline and one of them is live, then pipeline
+    //        will return GST_STATE_CHANGE_NO_PREROLL while pipeline pausing.
+    //        But appsink may not be connected to this live source and only to anothers,
+    //        not-live ones. So, it is not required to start the playback for appsink to complete
+    //        the preroll.
+    //        Starting of playback for the not-live sources before the first frame pull will lead
+    //        to loosing of some amount of frames and pulling of the first frame can return frame
+    //        which is far from the first.
+    //
+    //        Need to handle this case or forbid to mix multiples sources of different
+    //        categories(live, not-live) in the pipeline explicitly(assert).
+
+    if (!m_isPrerolled.load(std::memory_order_acquire))
+    {
+        std::lock_guard<std::mutex> lock(m_stateChangeMutex);
+
+        if(!m_isPrerolled.load(std::memory_order_relaxed))
+        {
+            PipelineState state = queryState();
+
+            // Only move forward in the pipeline's state machine
+            GAPI_Assert(state.current != GST_STATE_PLAYING);
+
+            GAPI_Assert(state.pending == GST_STATE_VOID_PENDING);
+            GstStateChangeReturn status = gst_element_set_state(m_pipeline, GST_STATE_PAUSED);
+            checkBusMessages();
+            if (status == GST_STATE_CHANGE_NO_PREROLL)
+            {
+                status = gst_element_set_state(m_pipeline, GST_STATE_PLAYING);
+                m_isPlaying.store(true);
+            }
+            verifyStateChange(status);
+
+            m_isPrerolled.store(true, std::memory_order_release);
+        }
+    }
+}
+
+void GStreamerPipelineFacade::play()
+{
+    if (!m_isPlaying.load(std::memory_order_acquire))
+    {
+        std::lock_guard<std::mutex> lock(m_stateChangeMutex);
+
+        if (!m_isPlaying.load(std::memory_order_relaxed))
+        {
+            setState(GST_STATE_PLAYING);
+            m_isPlaying.store(true, std::memory_order_release);
+            m_isPrerolled.store(true);
+        }
+    }
+}
+
+bool GStreamerPipelineFacade::isPlaying() {
+    return m_isPlaying.load();
+}
+
+std::vector<GstElement*> GStreamerPipelineFacade::getElements(
+    std::function<bool(GstElement*)> comparator)
+{
+    std::vector<GstElement*> outElements;
+    GStreamerPtr<GstIterator> it(gst_bin_iterate_elements(GST_BIN(m_pipeline.get())));
+    GValue value = G_VALUE_INIT;
+
+    GstIteratorResult status = gst_iterator_next(it, &value);
+    while (status != GST_ITERATOR_DONE && status != GST_ITERATOR_ERROR)
+    {
+        if (status == GST_ITERATOR_OK)
+        {
+            GstElement* element = GST_ELEMENT(g_value_get_object(&value));
+            if (comparator(element))
+            {
+                outElements.push_back(GST_ELEMENT(element));
+            }
+
+            g_value_unset(&value);
+        }
+        else if (status == GST_ITERATOR_RESYNC)
+        {
+            gst_iterator_resync(it);
+        }
+
+        status = gst_iterator_next(it, &value);
+    }
+
+    return outElements;
+}
+
+PipelineState GStreamerPipelineFacade::queryState()
+{
+    GAPI_Assert(m_pipeline && GST_IS_ELEMENT(m_pipeline.get()) &&
+                "GStreamer pipeline has not been created!");
+
+    PipelineState state;
+    GstClockTime timeout = 5 * GST_SECOND;
+    gst_element_get_state(m_pipeline, &state.current, &state.pending, timeout);
+
+    return state;
+}
+
+void GStreamerPipelineFacade::setState(GstState newState)
+{
+    PipelineState state = queryState();
+    GAPI_Assert(state.pending == GST_STATE_VOID_PENDING);
+
+    if (state.current != newState)
+    {
+        GstStateChangeReturn status = gst_element_set_state(m_pipeline, newState);
+        verifyStateChange(status);
+    }
+}
+
+void GStreamerPipelineFacade::verifyStateChange(GstStateChangeReturn status)
+{
+    if (status == GST_STATE_CHANGE_ASYNC)
+    {
+        // Wait for status update.
+        status = gst_element_get_state(m_pipeline, NULL, NULL, GST_CLOCK_TIME_NONE);
+    }
+
+    if (status == GST_STATE_CHANGE_FAILURE)
+    {
+        checkBusMessages();
+        PipelineState state = queryState();
+        const gchar* currentState = gst_element_state_get_name(state.current);
+        const gchar* pendingState = gst_element_state_get_name(state.pending);
+        cv::util::throw_error(
+            std::runtime_error(std::string("Unable to change pipeline state from ") +
+                               std::string(currentState) + std::string(" to ") +
+                               std::string(pendingState)));
+    }
+
+    checkBusMessages();
+}
+
+// Handles GStreamer bus messages.
+// For debugging purposes.
+void GStreamerPipelineFacade::checkBusMessages() const
+{
+    GStreamerPtr<GstBus> bus(gst_element_get_bus(m_pipeline));
+
+    while (gst_bus_have_pending(bus))
+    {
+        GStreamerPtr<GstMessage> msg(gst_bus_pop(bus));
+        if (!msg || !GST_IS_MESSAGE(msg.get()))
+        {
+            continue;
+        }
+
+        if (gst_is_missing_plugin_message(msg))
+        {
+            GStreamerPtr<gchar> descr(gst_missing_plugin_message_get_description(msg));
+            cv::util::throw_error(
+                std::runtime_error("Your GStreamer installation is missing a required plugin!"
+                                   "Details: " + std::string(descr)));
+        }
+        else
+        {
+            switch (GST_MESSAGE_TYPE(msg))
+            {
+                case GST_MESSAGE_STATE_CHANGED:
+                {
+                    if (GST_MESSAGE_SRC(msg.get()) == GST_OBJECT(m_pipeline.get()))
+                    {
+                        GstState oldState = GST_STATE_NULL,
+                                 newState = GST_STATE_NULL;
+                        gst_message_parse_state_changed(msg, &oldState, &newState, NULL);
+                        const gchar* oldStateName = gst_element_state_get_name(oldState);
+                        const gchar* newStateName = gst_element_state_get_name(newState);
+                        GAPI_LOG_INFO(NULL, "Pipeline state changed from " << oldStateName << " to "
+                                            << newStateName);
+                    }
+                    break;
+                }
+                case GST_MESSAGE_ERROR:
+                {
+                    GError* error = NULL;
+                    gchar*  debug = NULL;
+
+                    gst_message_parse_error(msg, &error, &debug); // transfer full for out args
+
+                    GStreamerPtr<GError> err(error);
+                    GStreamerPtr<gchar> deb(debug);
+
+                    GStreamerPtr<gchar> name(gst_element_get_name(GST_MESSAGE_SRC(msg.get())));
+                    GAPI_LOG_WARNING(NULL, "Embedded video playback halted; module " << name.get()
+                                           << " reported: " << err->message);
+                    GAPI_LOG_WARNING(NULL, "GStreamer debug: " << deb);
+
+                    break;
+                }
+                default:
+                    break;
+            }
+        }
+    }
+}
+
+} // namespace gst
+} // namespace wip
+} // namespace gapi
+} // namespace cv
+#endif // HAVE_GSTREAMER
diff --git a/modules/gapi/src/streaming/gstreamer/gstreamer_pipeline_facade.hpp b/modules/gapi/src/streaming/gstreamer/gstreamer_pipeline_facade.hpp
new file mode 100644 (file)
index 0000000..4ebd8f0
--- /dev/null
@@ -0,0 +1,89 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2021 Intel Corporation
+
+#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_PIPELINE_FACADE_HPP
+#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_PIPELINE_FACADE_HPP
+
+#include "gstreamerptr.hpp"
+
+#include <string>
+#include <atomic>
+#include <mutex>
+
+#ifdef HAVE_GSTREAMER
+#include <gst/gst.h>
+
+namespace cv {
+namespace gapi {
+namespace wip {
+namespace gst {
+
+// GAPI_EXPORTS here is only for testing purposes.
+struct GAPI_EXPORTS PipelineState
+{
+    GstState current = GST_STATE_NULL;
+    GstState pending = GST_STATE_NULL;
+};
+
+// This class represents facade for pipeline GstElement and related functions.
+// Class restricts pipeline to only move forward in its state machine:
+// NULL -> READY -> PAUSED -> PLAYING.
+// There is no possibility to pause and resume pipeline, it can be only once played.
+// GAPI_EXPORTS here is only for testing purposes.
+class GAPI_EXPORTS GStreamerPipelineFacade
+{
+public:
+    // Strong exception guarantee.
+    explicit GStreamerPipelineFacade(const std::string& pipeline);
+
+    // The destructors are noexcept by default. (since C++11)
+    ~GStreamerPipelineFacade();
+
+    // Elements getters are not guarded with mutexes because elements order is not supposed
+    // to change in the pipeline.
+    std::vector<GstElement*> getElementsByFactoryName(const std::string& factoryName);
+    GstElement* getElementByName(const std::string& elementName);
+
+    // Pipeline state modifiers: can be called only once, MT-safe, mutually exclusive.
+    void completePreroll();
+    void play();
+
+    // Pipeline state checker: MT-safe.
+    bool isPlaying();
+
+private:
+    std::string m_pipelineDesc;
+
+    GStreamerPtr<GstElement> m_pipeline;
+
+    std::atomic<bool> m_isPrerolled;
+    std::atomic<bool> m_isPlaying;
+    // Mutex to guard state(paused, playing) from changes from multiple threads
+    std::mutex m_stateChangeMutex;
+
+private:
+    // This constructor is needed only to make public constructor as delegating constructor
+    // and allow it to throw exceptions.
+    GStreamerPipelineFacade();
+
+    // Elements getter is not guarded with mutex because elements order is not supposed
+    // to change in the pipeline.
+    std::vector<GstElement*> getElements(std::function<bool(GstElement*)> comparator);
+
+    // Getters, modifiers, verifiers are not MT-safe, because they are called from
+    // MT-safe mutually exclusive public functions.
+    PipelineState queryState();
+    void setState(GstState state);
+    void verifyStateChange(GstStateChangeReturn status);
+    void checkBusMessages() const;
+};
+
+} // namespace gst
+} // namespace wip
+} // namespace gapi
+} // namespace cv
+#endif // HAVE_GSTREAMER
+#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_PIPELINE_FACADE_HPP
diff --git a/modules/gapi/src/streaming/gstreamer/gstreamerenv.cpp b/modules/gapi/src/streaming/gstreamer/gstreamerenv.cpp
new file mode 100644 (file)
index 0000000..138589b
--- /dev/null
@@ -0,0 +1,90 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2021 Intel Corporation
+
+#include "gstreamerenv.hpp"
+#include "gstreamerptr.hpp"
+
+#ifdef HAVE_GSTREAMER
+#include <gst/gst.h>
+#endif // HAVE_GSTREAMER
+
+namespace cv {
+namespace gapi {
+namespace wip {
+namespace gst {
+
+#ifdef HAVE_GSTREAMER
+
+const GStreamerEnv& GStreamerEnv::init()
+{
+    static GStreamerEnv gInit;
+    return gInit;
+}
+
+GStreamerEnv::GStreamerEnv()
+{
+    if (!gst_is_initialized())
+    {
+        GError* error = NULL;
+        gst_init_check(NULL, NULL, &error);
+
+        GStreamerPtr<GError> err(error);
+
+        if (err)
+        {
+            cv::util::throw_error(
+                std::runtime_error(std::string("GStreamer initializaton error! Details: ") +
+                                   err->message));
+        }
+    }
+
+    // FIXME: GStreamer libs which have same MAJOR and MINOR versions are API and ABI compatible.
+    //        If GStreamer runtime MAJOR version differs from the version the application was
+    //        compiled with, will it fail on the linkage stage? If so, the code below isn't needed.
+    guint major, minor, micro, nano;
+    gst_version(&major, &minor, &micro, &nano);
+    if (GST_VERSION_MAJOR != major)
+    {
+        cv::util::throw_error(
+            std::runtime_error(std::string("Incompatible GStreamer version: compiled with ") +
+                               std::to_string(GST_VERSION_MAJOR) + '.' +
+                               std::to_string(GST_VERSION_MINOR) + '.' +
+                               std::to_string(GST_VERSION_MICRO) + '.' +
+                               std::to_string(GST_VERSION_NANO) +
+                               ", but runtime has " +
+                               std::to_string(major) + '.' + std::to_string(minor) + '.' +
+                               std::to_string(micro) + '.' + std::to_string(nano) + '.'));
+    }
+}
+
+GStreamerEnv::~GStreamerEnv()
+{
+    gst_deinit();
+}
+
+#else // HAVE_GSTREAMER
+
+const GStreamerEnv& GStreamerEnv::init()
+{
+    GAPI_Assert(false && "Built without GStreamer support!");
+}
+
+GStreamerEnv::GStreamerEnv()
+{
+    GAPI_Assert(false && "Built without GStreamer support!");
+}
+
+GStreamerEnv::~GStreamerEnv()
+{
+    // No need an assert here. The assert raise C4722 warning. Constructor have already got assert.
+}
+
+#endif // HAVE_GSTREAMER
+
+} // namespace gst
+} // namespace wip
+} // namespace gapi
+} // namespace cv
diff --git a/modules/gapi/src/streaming/gstreamer/gstreamerenv.hpp b/modules/gapi/src/streaming/gstreamer/gstreamerenv.hpp
new file mode 100644 (file)
index 0000000..04f7f0c
--- /dev/null
@@ -0,0 +1,37 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2021 Intel Corporation
+
+#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERENV_HPP
+#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERENV_HPP
+
+namespace cv {
+namespace gapi {
+namespace wip {
+namespace gst {
+
+/*!
+ * \brief The GStreamerEnv class
+ * Initializes gstreamer once in the whole process
+ *
+ *
+ * @note You need to build OpenCV with GStreamer support to use this class.
+ */
+class GStreamerEnv
+{
+public:
+    static const GStreamerEnv& init();
+
+private:
+    GStreamerEnv();
+    ~GStreamerEnv();
+};
+
+} // namespace gst
+} // namespace wip
+} // namespace gapi
+} // namespace cv
+
+#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERENV_HPP
diff --git a/modules/gapi/src/streaming/gstreamer/gstreamerpipeline.cpp b/modules/gapi/src/streaming/gstreamer/gstreamerpipeline.cpp
new file mode 100644 (file)
index 0000000..6687076
--- /dev/null
@@ -0,0 +1,112 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2021 Intel Corporation
+
+#include "gstreamer_pipeline_facade.hpp"
+#include "gstreamerpipeline_priv.hpp"
+#include <opencv2/gapi/streaming/gstreamer/gstreamerpipeline.hpp>
+
+#ifdef HAVE_GSTREAMER
+#include <gst/app/gstappsink.h>
+#endif // HAVE_GSTREAMER
+
+namespace cv {
+namespace gapi {
+namespace wip {
+namespace gst {
+
+#ifdef HAVE_GSTREAMER
+
+GStreamerPipeline::Priv::Priv(const std::string& pipelineDesc):
+    m_pipeline(std::make_shared<GStreamerPipelineFacade>(pipelineDesc))
+{
+    std::vector<GstElement*> appsinks =
+        m_pipeline->getElementsByFactoryName("appsink");
+
+    for (std::size_t i = 0ul; i < appsinks.size(); ++i)
+    {
+        auto* appsink = appsinks[i];
+        GAPI_Assert(appsink != nullptr);
+        GStreamerPtr<gchar> name(gst_element_get_name(appsink));
+        auto result = m_appsinkNamesToUse.insert({ name.get(), true /* free */ });
+        GAPI_Assert(std::get<1>(result) && "Each appsink name must be unique!");
+    }
+}
+
+IStreamSource::Ptr GStreamerPipeline::Priv::getStreamingSource(
+    const std::string& appsinkName, const GStreamerSource::OutputType outputType)
+{
+    auto appsinkNameIt = m_appsinkNamesToUse.find(appsinkName);
+    if (appsinkNameIt == m_appsinkNamesToUse.end())
+    {
+        cv::util::throw_error(std::logic_error(std::string("There is no appsink element in the "
+            "pipeline with the name '") + appsinkName + "'."));
+    }
+
+    if (!appsinkNameIt->second)
+    {
+        cv::util::throw_error(std::logic_error(std::string("appsink element with the name '") +
+            appsinkName + "' has been already used to create a GStreamerSource!"));
+    }
+
+    m_appsinkNamesToUse[appsinkName] = false /* not free */;
+
+    IStreamSource::Ptr src;
+    try {
+        src = cv::gapi::wip::make_src<cv::gapi::wip::GStreamerSource>(m_pipeline, appsinkName,
+                                                                      outputType);
+    }
+    catch(...) {
+        m_appsinkNamesToUse[appsinkName] = true; /* free */
+        cv::util::throw_error(std::runtime_error(std::string("Error during creation of ") +
+            "GStreamerSource on top of '" + appsinkName + "' appsink element!"));
+    }
+
+    return src;
+}
+
+GStreamerPipeline::Priv::~Priv() { }
+
+#else // HAVE_GSTREAMER
+
+GStreamerPipeline::Priv::Priv(const std::string&)
+{
+    GAPI_Assert(false && "Built without GStreamer support!");
+}
+
+IStreamSource::Ptr GStreamerPipeline::Priv::getStreamingSource(const std::string&,
+                                                               const GStreamerSource::OutputType)
+{
+    // No need an assert here. The assert raise C4702 warning. Constructor have already got assert.
+    return nullptr;
+}
+
+GStreamerPipeline::Priv::~Priv()
+{
+    // No need an assert here. The assert raise C4722 warning. Constructor have already got assert.
+}
+
+#endif // HAVE_GSTREAMER
+
+GStreamerPipeline::GStreamerPipeline(const std::string& pipelineDesc):
+    m_priv(new Priv(pipelineDesc)) { }
+
+IStreamSource::Ptr GStreamerPipeline::getStreamingSource(
+    const std::string& appsinkName, const GStreamerSource::OutputType outputType)
+{
+    return m_priv->getStreamingSource(appsinkName, outputType);
+}
+
+GStreamerPipeline::~GStreamerPipeline()
+{ }
+
+GStreamerPipeline::GStreamerPipeline(std::unique_ptr<Priv> priv):
+    m_priv(std::move(priv))
+{ }
+
+} // namespace gst
+} // namespace wip
+} // namespace gapi
+} // namespace cv
diff --git a/modules/gapi/src/streaming/gstreamer/gstreamerpipeline_priv.hpp b/modules/gapi/src/streaming/gstreamer/gstreamerpipeline_priv.hpp
new file mode 100644 (file)
index 0000000..4b10d10
--- /dev/null
@@ -0,0 +1,58 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2021 Intel Corporation
+
+#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPIPELINE_PRIV_HPP
+#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPIPELINE_PRIV_HPP
+
+#include <opencv2/gapi/streaming/gstreamer/gstreamerpipeline.hpp>
+
+#include <string>
+#include <unordered_map>
+
+namespace cv {
+namespace gapi {
+namespace wip {
+namespace gst {
+
+#ifdef HAVE_GSTREAMER
+
+class GStreamerPipeline::Priv
+{
+public:
+    explicit Priv(const std::string& pipeline);
+
+    IStreamSource::Ptr getStreamingSource(const std::string& appsinkName,
+                                          const GStreamerSource::OutputType outputType);
+
+    virtual ~Priv();
+
+protected:
+    std::shared_ptr<GStreamerPipelineFacade> m_pipeline;
+    std::unordered_map<std::string, bool> m_appsinkNamesToUse;
+};
+
+#else // HAVE_GSTREAMER
+
+class GStreamerPipeline::Priv
+{
+public:
+    explicit Priv(const std::string& pipeline);
+
+    IStreamSource::Ptr getStreamingSource(const std::string& appsinkName,
+                                          const GStreamerSource::OutputType outputType);
+
+    virtual ~Priv();
+};
+
+#endif // HAVE_GSTREAMER
+
+} // namespace gst
+} // namespace wip
+} // namespace gapi
+} // namespace cv
+
+
+#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPIPELINE_PRIV_HPP
diff --git a/modules/gapi/src/streaming/gstreamer/gstreamerptr.hpp b/modules/gapi/src/streaming/gstreamer/gstreamerptr.hpp
new file mode 100644 (file)
index 0000000..6e6bba3
--- /dev/null
@@ -0,0 +1,177 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2021 Intel Corporation
+
+#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPTR_HPP
+#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPTR_HPP
+
+#include <opencv2/gapi.hpp>
+
+#include <utility>
+
+#ifdef HAVE_GSTREAMER
+#include <gst/gst.h>
+#include <gst/video/video-frame.h>
+
+namespace cv {
+namespace gapi {
+namespace wip {
+namespace gst {
+
+template<typename T> static inline void GStreamerPtrUnrefObject(T* ptr)
+{
+    if (ptr)
+    {
+        gst_object_unref(G_OBJECT(ptr));
+    }
+}
+
+template<typename T> static inline void GStreamerPtrRelease(T* ptr);
+
+template<> inline void GStreamerPtrRelease<GError>(GError* ptr)
+{
+    if (ptr)
+    {
+        g_error_free(ptr);
+    }
+}
+
+template<> inline void GStreamerPtrRelease<GstElement>(GstElement* ptr)
+{
+    GStreamerPtrUnrefObject<GstElement>(ptr);
+}
+
+template<> inline void GStreamerPtrRelease<GstElementFactory>(GstElementFactory* ptr)
+{
+    GStreamerPtrUnrefObject<GstElementFactory>(ptr);
+}
+
+template<> inline void GStreamerPtrRelease<GstPad>(GstPad* ptr)
+{
+    GStreamerPtrUnrefObject<GstPad>(ptr);
+}
+
+template<> inline void GStreamerPtrRelease<GstBus>(GstBus* ptr)
+{
+    GStreamerPtrUnrefObject<GstBus>(ptr);
+}
+
+template<> inline void GStreamerPtrRelease<GstAllocator>(GstAllocator* ptr)
+{
+    GStreamerPtrUnrefObject<GstAllocator>(ptr);
+}
+
+template<> inline void GStreamerPtrRelease<GstVideoInfo>(GstVideoInfo* ptr)
+{
+    if (ptr)
+    {
+        gst_video_info_free(ptr);
+    }
+}
+
+template<> inline void GStreamerPtrRelease<GstCaps>(GstCaps* ptr)
+{
+    if (ptr)
+    {
+        gst_caps_unref(ptr);
+    }
+}
+
+template<> inline void GStreamerPtrRelease<GstMemory>(GstMemory* ptr)
+{
+    if (ptr)
+    {
+        gst_memory_unref(ptr);
+    }
+}
+
+template<> inline void GStreamerPtrRelease<GstBuffer>(GstBuffer* ptr)
+{
+    if (ptr)
+    {
+        gst_buffer_unref(ptr);
+    }
+}
+
+template<> inline void GStreamerPtrRelease<GstSample>(GstSample* ptr)
+{
+    if (ptr)
+    {
+        gst_sample_unref(ptr);
+    }
+}
+
+template<> inline void GStreamerPtrRelease<GstMessage>(GstMessage* ptr)
+{
+    if (ptr)
+    {
+        gst_message_unref(ptr);
+    }
+}
+
+template<> inline void GStreamerPtrRelease<GstIterator>(GstIterator* ptr)
+{
+    if (ptr)
+    {
+        gst_iterator_free(ptr);
+    }
+}
+
+template<> inline void GStreamerPtrRelease<GstQuery>(GstQuery* ptr)
+{
+    if (ptr)
+    {
+        gst_query_unref(ptr);
+    }
+}
+
+template<> inline void GStreamerPtrRelease<char>(char* ptr)
+{
+    if (ptr)
+    {
+        g_free(ptr);
+    }
+}
+
+// NOTE: The main concept of this class is to be owner of some passed to it piece of memory.
+//       (be owner = free this memory or reduce reference count to it after use).
+//       More specifically, GStreamerPtr is designed to own memory returned from GStreamer/GLib
+//       functions, which are marked as [transfer full] in documentation.
+//       [transfer full] means that function fully transfers ownership of returned memory to the
+//       receiving piece of code.
+//
+//       Memory ownership and ownership transfer concept:
+// https://developer.gnome.org/programming-guidelines/stable/memory-management.html.en#g-clear-object
+
+// NOTE: GStreamerPtr can only own strong references, not floating ones.
+//       For floating references please call g_object_ref_sink(reference) before wrapping
+//       it with GStreamerPtr.
+//       See https://developer.gnome.org/gobject/stable/gobject-The-Base-Object-Type.html#floating-ref
+//       for floating references.
+// NOTE: GStreamerPtr doesn't support pointers to arrays, only pointers to single objects.
+template<typename T> class GStreamerPtr :
+    public std::unique_ptr<T, decltype(&GStreamerPtrRelease<T>)>
+{
+    using BaseClass = std::unique_ptr<T, decltype(&GStreamerPtrRelease<T>)>;
+
+public:
+    constexpr GStreamerPtr() noexcept : BaseClass(nullptr, GStreamerPtrRelease<T>) { }
+    constexpr GStreamerPtr(std::nullptr_t) noexcept : BaseClass(nullptr, GStreamerPtrRelease<T>) { }
+    explicit GStreamerPtr(typename BaseClass::pointer p) noexcept :
+        BaseClass(p, GStreamerPtrRelease<T>) { }
+
+    GStreamerPtr& operator=(T* p) noexcept { *this = std::move(GStreamerPtr<T>(p)); return *this; }
+
+    inline operator T*() noexcept { return this->get(); }
+    // There is no const correctness in GStreamer C API
+    inline operator /*const*/ T*() const noexcept { return (T*)this->get(); }
+};
+
+} // namespace gst
+} // namespace wip
+} // namespace gapi
+} // namespace cv
+#endif // HAVE_GSTREAMER
+#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPTR_HPP
diff --git a/modules/gapi/src/streaming/gstreamer/gstreamersource.cpp b/modules/gapi/src/streaming/gstreamer/gstreamersource.cpp
new file mode 100644 (file)
index 0000000..6611256
--- /dev/null
@@ -0,0 +1,383 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2021 Intel Corporation
+
+#include "gstreamer_buffer_utils.hpp"
+
+#include "gstreamer_media_adapter.hpp"
+
+#include "gstreamersource_priv.hpp"
+#include <opencv2/gapi/streaming/gstreamer/gstreamersource.hpp>
+
+#include <opencv2/gapi/streaming/meta.hpp>
+
+#include <logger.hpp>
+
+#include <opencv2/imgproc.hpp>
+
+#ifdef HAVE_GSTREAMER
+#include <gst/app/gstappsink.h>
+#include <gst/gstbuffer.h>
+#include <gst/video/video-frame.h>
+#endif // HAVE_GSTREAMER
+
+namespace cv {
+namespace gapi {
+namespace wip {
+namespace gst {
+
+#ifdef HAVE_GSTREAMER
+
+constexpr char NV12_CAPS_STRING[] =
+    "video/x-raw,format=NV12;video/x-raw(memory:DMABuf),format=NV12";
+
+namespace {
+GstPadProbeReturn appsinkQueryCallback(GstPad*, GstPadProbeInfo* info, gpointer)
+{
+    GstQuery *query = GST_PAD_PROBE_INFO_QUERY(info);
+
+    if (GST_QUERY_TYPE(query) != GST_QUERY_ALLOCATION)
+        return GST_PAD_PROBE_OK;
+
+    gst_query_add_allocation_meta(query, GST_VIDEO_META_API_TYPE, NULL);
+
+    return GST_PAD_PROBE_HANDLED;
+}
+} // anonymous namespace
+
+GStreamerSource::Priv::Priv(const std::string& pipelineDesc,
+                            const GStreamerSource::OutputType outputType) :
+    m_pipeline(std::make_shared<GStreamerPipelineFacade>(pipelineDesc)),
+    m_outputType(outputType)
+{
+    GAPI_Assert((m_outputType == GStreamerSource::OutputType::FRAME ||
+                 m_outputType == GStreamerSource::OutputType::MAT)
+                && "Unsupported output type for GStreamerSource!");
+
+    auto appsinks = m_pipeline->getElementsByFactoryName("appsink");
+    GAPI_Assert(1ul == appsinks.size() &&
+        "GStreamerSource can accept pipeline with only 1 appsink element inside!\n"
+        "Please, note, that amount of sink elements of other than appsink type is not limited.\n");
+
+    m_appsink = GST_ELEMENT(gst_object_ref(appsinks[0]));
+
+    configureAppsink();
+}
+
+GStreamerSource::Priv::Priv(std::shared_ptr<GStreamerPipelineFacade> pipeline,
+                            const std::string& appsinkName,
+                            const GStreamerSource::OutputType outputType) :
+    m_pipeline(pipeline),
+    m_outputType(outputType)
+{
+    GAPI_Assert((m_outputType == GStreamerSource::OutputType::FRAME ||
+                 m_outputType == GStreamerSource::OutputType::MAT)
+                && "Unsupported output type for GStreamerSource!");
+
+    m_appsink = (GST_ELEMENT(gst_object_ref(m_pipeline->getElementByName(appsinkName))));
+    configureAppsink();
+}
+
+bool GStreamerSource::Priv::pull(cv::gapi::wip::Data& data)
+{
+    bool result = false;
+    switch(m_outputType) {
+        case GStreamerSource::OutputType::FRAME: {
+            cv::MediaFrame frame;
+            result = retrieveFrame(frame);
+            if (result) {
+                data = frame;
+            }
+            break;
+        }
+        case GStreamerSource::OutputType::MAT: {
+            cv::Mat mat;
+            result = retrieveFrame(mat);
+            if (result) {
+                data = mat;
+            }
+            break;
+        }
+    }
+
+    if (result) {
+        data.meta[cv::gapi::streaming::meta_tag::timestamp] = computeTimestamp();
+        data.meta[cv::gapi::streaming::meta_tag::seq_id]    = m_frameId++;
+    }
+
+    return result;
+}
+
+GMetaArg GStreamerSource::Priv::descr_of() noexcept
+{
+    // Prepare frame metadata if it wasn't prepared yet.
+    prepareVideoMeta();
+
+    switch(m_outputType) {
+        case GStreamerSource::OutputType::FRAME: {
+            return GMetaArg { m_mediaFrameMeta };
+        }
+        case GStreamerSource::OutputType::MAT: {
+            return GMetaArg { m_matMeta };
+        }
+    }
+
+    return GMetaArg { };
+}
+
+void GStreamerSource::Priv::configureAppsink() {
+    // NOTE: appsink element should be configured before pipeline launch.
+    GAPI_Assert(!m_pipeline->isPlaying());
+
+    // TODO: is 1 single buffer really high enough?
+    gst_app_sink_set_max_buffers(GST_APP_SINK(m_appsink.get()), 1);
+
+    // Do not emit signals: all calls will be synchronous and blocking.
+    gst_app_sink_set_emit_signals(GST_APP_SINK(m_appsink.get()), FALSE);
+
+    GStreamerPtr<GstCaps> nv12Caps(gst_caps_from_string(NV12_CAPS_STRING));
+
+    GStreamerPtr<GstPad> appsinkPad(gst_element_get_static_pad(m_appsink, "sink"));
+    GStreamerPtr<GstCaps> peerCaps(gst_pad_peer_query_caps(appsinkPad, NULL));
+    if (!gst_caps_can_intersect(peerCaps, nv12Caps)) {
+        cv::util::throw_error(
+            std::logic_error("appsink element can only consume video-frame in NV12 format in "
+                             "GStreamerSource"));
+    }
+
+    gst_app_sink_set_caps(GST_APP_SINK(m_appsink.get()), nv12Caps);
+
+    gst_pad_add_probe(appsinkPad, GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM, appsinkQueryCallback,
+                      NULL, NULL);
+}
+
+void GStreamerSource::Priv::prepareVideoMeta()
+{
+    if (!m_isMetaPrepared) {
+        m_pipeline->completePreroll();
+
+        GStreamerPtr<GstSample> prerollSample(
+#if GST_VERSION_MINOR >= 10
+            gst_app_sink_try_pull_preroll(GST_APP_SINK(m_appsink.get()), 5 * GST_SECOND));
+#else // GST_VERSION_MINOR < 10
+            // TODO: This function may cause hang with some pipelines, need to check whether these
+            // pipelines are really wrong or not?
+            gst_app_sink_pull_preroll(GST_APP_SINK(m_appsink.get())));
+#endif // GST_VERSION_MINOR >= 10
+        GAPI_Assert(prerollSample != nullptr);
+
+        GstCaps* prerollCaps(gst_sample_get_caps(prerollSample));
+        GAPI_Assert(prerollCaps != nullptr);
+
+        const GstStructure* structure = gst_caps_get_structure(prerollCaps, 0);
+
+        // Width and height held in GstCaps structure are format-independent width and height
+        // of the frame. So the actual height of the output buffer in NV12 format will be
+        // height * 3/2.
+        int width = 0;
+        int height = 0;
+        if (!gst_structure_get_int(structure, "width", &width) ||
+            !gst_structure_get_int(structure, "height", &height))
+        {
+            cv::util::throw_error(std::logic_error("Cannot query video width/height."));
+        }
+
+        switch(m_outputType) {
+            case GStreamerSource::OutputType::FRAME: {
+                // Construct metadata for media frame.
+                m_mediaFrameMeta = GFrameDesc { cv::MediaFormat::NV12, cv::Size(width, height) };
+                break;
+            }
+            case GStreamerSource::OutputType::MAT: {
+                // Construct metadata for BGR mat.
+                m_matMeta = GMatDesc { CV_8U, 3, cv::Size(width, height), false };
+                break;
+            }
+        }
+
+        // Fill GstVideoInfo structure to work further with GstVideoFrame class.
+        if (!gst_video_info_from_caps(&m_videoInfo, prerollCaps)) {
+            cv::util::throw_error(std::logic_error("preroll sample has invalid caps."));
+        }
+        GAPI_Assert(GST_VIDEO_INFO_N_PLANES(&m_videoInfo) == 2);
+        GAPI_Assert(GST_VIDEO_INFO_FORMAT(&m_videoInfo) == GST_VIDEO_FORMAT_NV12);
+
+        m_isMetaPrepared = true;
+    }
+}
+
+int64_t GStreamerSource::Priv::computeTimestamp()
+{
+    GAPI_Assert(m_buffer && "Pulled buffer is nullptr!");
+
+    int64_t timestamp { };
+
+    GstClockTime pts = GST_BUFFER_PTS(m_buffer);
+    if (GST_CLOCK_TIME_IS_VALID(pts)) {
+        timestamp = GST_TIME_AS_USECONDS(pts);
+    } else {
+        const auto now = std::chrono::system_clock::now();
+        const auto dur = std::chrono::duration_cast<std::chrono::microseconds>
+            (now.time_since_epoch());
+        timestamp = int64_t{dur.count()};
+    }
+
+    return timestamp;
+}
+
+bool GStreamerSource::Priv::pullBuffer()
+{
+    // Start the pipeline if it is not in the playing state yet
+    if (!m_isPipelinePlaying) {
+        m_pipeline->play();
+        m_isPipelinePlaying = true;
+    }
+
+    // Bail out if EOS
+    if (gst_app_sink_is_eos(GST_APP_SINK(m_appsink.get())))
+    {
+        return false;
+    }
+
+    m_sample = gst_app_sink_pull_sample(GST_APP_SINK(m_appsink.get()));
+
+    // TODO: GAPI_Assert because of already existed check for EOS?
+    if (m_sample == nullptr)
+    {
+        return false;
+    }
+
+    m_buffer = gst_sample_get_buffer(m_sample);
+    GAPI_Assert(m_buffer && "Grabbed sample has no buffer!");
+
+    return true;
+}
+
+bool GStreamerSource::Priv::retrieveFrame(cv::Mat& data)
+{
+    // Prepare metadata if it isn't prepared yet.
+    prepareVideoMeta();
+
+    bool result = pullBuffer();
+    if (!result)
+    {
+        return false;
+    }
+
+    // TODO: Use RAII for map/unmap
+    GstVideoFrame videoFrame;
+    gstreamer_utils::mapBufferToFrame(*m_buffer, m_videoInfo, videoFrame, GST_MAP_READ);
+
+    try
+    {
+        // m_matMeta holds width and height for 8U BGR frame, but actual
+        // frame m_buffer we request from GStreamer pipeline has 8U NV12 format.
+        // Constructing y and uv cv::Mat-s from such a m_buffer:
+        GAPI_Assert((uint8_t*)GST_VIDEO_FRAME_PLANE_DATA(&videoFrame, 1) ==
+                    (uint8_t*)GST_VIDEO_FRAME_PLANE_DATA(&videoFrame, 0) +
+                    GST_VIDEO_FRAME_PLANE_OFFSET(&videoFrame, 1));
+
+        cv::Mat y(m_matMeta.size, CV_8UC1,
+                  (uint8_t*)GST_VIDEO_FRAME_PLANE_DATA(&videoFrame, 0) +
+                  GST_VIDEO_FRAME_PLANE_OFFSET(&videoFrame, 0),
+                  GST_VIDEO_FRAME_PLANE_STRIDE(&videoFrame, 0));
+        cv::Mat uv(m_matMeta.size / 2, CV_8UC2,
+                   (uint8_t*)GST_VIDEO_FRAME_PLANE_DATA(&videoFrame, 0) +
+                   GST_VIDEO_FRAME_PLANE_OFFSET(&videoFrame, 1),
+                   GST_VIDEO_FRAME_PLANE_STRIDE(&videoFrame, 1));
+
+        cv::cvtColorTwoPlane(y, uv, data, cv::COLOR_YUV2BGR_NV12);
+    }
+    catch (...)
+    {
+        gst_video_frame_unmap(&videoFrame);
+        cv::util::throw_error(std::runtime_error("NV12 buffer conversion to BGR is failed!"));
+    }
+    gst_video_frame_unmap(&videoFrame);
+
+    return true;
+}
+
+bool GStreamerSource::Priv::retrieveFrame(cv::MediaFrame& data)
+{
+    // Prepare metadata if it isn't prepared yet.
+    prepareVideoMeta();
+
+    bool result = pullBuffer();
+    if (!result)
+    {
+        return false;
+    }
+
+    data = cv::MediaFrame::Create<GStreamerMediaAdapter>(m_mediaFrameMeta, &m_videoInfo,
+                                                         m_buffer);
+
+    return true;
+}
+
+GStreamerSource::Priv::~Priv() { }
+
+#else // HAVE_GSTREAMER
+
+GStreamerSource::Priv::Priv(const std::string&, const GStreamerSource::OutputType)
+{
+    GAPI_Assert(false && "Built without GStreamer support!");
+}
+
+GStreamerSource::Priv::Priv(std::shared_ptr<GStreamerPipelineFacade>, const std::string&,
+                            const GStreamerSource::OutputType)
+{
+    GAPI_Assert(false && "Built without GStreamer support!");
+}
+
+bool GStreamerSource::Priv::pull(cv::gapi::wip::Data&)
+{
+    // No need an assert here. Constructor have already got assert.
+    return false;
+}
+
+GMetaArg GStreamerSource::Priv::descr_of() const noexcept
+{
+    // No need an assert here. The assert raise C4702 warning. Constructor have already got assert.
+    return GMetaArg{};
+}
+
+GStreamerSource::Priv::~Priv()
+{
+    // No need an assert here. The assert raise C4722 warning. Constructor have already got assert.
+}
+
+#endif // HAVE_GSTREAMER
+
+GStreamerSource::GStreamerSource(const std::string& pipeline,
+                                 const GStreamerSource::OutputType outputType):
+    m_priv(new Priv(pipeline, outputType)) { }
+
+GStreamerSource::GStreamerSource(std::shared_ptr<GStreamerPipelineFacade> pipeline,
+                                 const std::string& appsinkName,
+                                 const GStreamerSource::OutputType outputType):
+    m_priv(new Priv(pipeline, appsinkName, outputType)) { }
+
+bool GStreamerSource::pull(cv::gapi::wip::Data& data)
+{
+    return m_priv->pull(data);
+}
+
+GMetaArg GStreamerSource::descr_of() const
+{
+    return m_priv->descr_of();
+}
+
+GStreamerSource::~GStreamerSource()
+{ }
+
+GStreamerSource::GStreamerSource(std::unique_ptr<Priv> priv):
+    m_priv(std::move(priv))
+{ }
+
+} // namespace gst
+} // namespace wip
+} // namespace gapi
+} // namespace cv
diff --git a/modules/gapi/src/streaming/gstreamer/gstreamersource_priv.hpp b/modules/gapi/src/streaming/gstreamer/gstreamersource_priv.hpp
new file mode 100644 (file)
index 0000000..b0940c4
--- /dev/null
@@ -0,0 +1,94 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2021 Intel Corporation
+
+#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERSOURCE_PRIV_HPP
+#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERSOURCE_PRIV_HPP
+
+#include "gstreamerptr.hpp"
+#include "gstreamer_pipeline_facade.hpp"
+#include <opencv2/gapi/streaming/gstreamer/gstreamersource.hpp>
+
+#include <string>
+
+#ifdef HAVE_GSTREAMER
+#include <gst/gst.h>
+#include <gst/video/video-frame.h>
+#endif // HAVE_GSTREAMER
+
+namespace cv {
+namespace gapi {
+namespace wip {
+namespace gst {
+
+#ifdef HAVE_GSTREAMER
+
+class GStreamerSource::Priv
+{
+public:
+    Priv(const std::string& pipeline, const GStreamerSource::OutputType outputType);
+
+    Priv(std::shared_ptr<GStreamerPipelineFacade> pipeline, const std::string& appsinkName,
+         const GStreamerSource::OutputType outputType);
+
+    bool pull(cv::gapi::wip::Data& data);
+
+    // non-const in difference with GStreamerSource, because contains delayed meta initialization
+    GMetaArg descr_of() noexcept;
+
+    virtual ~Priv();
+
+protected:
+    // Shares:
+    std::shared_ptr<GStreamerPipelineFacade> m_pipeline;
+
+    // Owns:
+    GStreamerPtr<GstElement> m_appsink;
+    GStreamerPtr<GstSample> m_sample;
+    GstBuffer* m_buffer = nullptr; // Actual frame memory holder
+    GstVideoInfo m_videoInfo; // Information about Video frame
+
+    GStreamerSource::OutputType m_outputType = GStreamerSource::OutputType::MAT;
+
+    GMatDesc m_matMeta;
+    GFrameDesc m_mediaFrameMeta;
+
+    bool m_isMetaPrepared = false;
+    bool m_isPipelinePlaying = false;
+
+    int64_t m_frameId = 0L;
+
+protected:
+    void configureAppsink();
+    void prepareVideoMeta();
+
+    int64_t computeTimestamp();
+
+    bool pullBuffer();
+    bool retrieveFrame(cv::Mat& data);
+    bool retrieveFrame(cv::MediaFrame& data);
+};
+
+#else // HAVE_GSTREAMER
+
+class GStreamerSource::Priv
+{
+public:
+    Priv(const std::string& pipeline, const GStreamerSource::OutputType outputType);
+    Priv(std::shared_ptr<GStreamerPipelineFacade> pipeline, const std::string& appsinkName,
+         const GStreamerSource::OutputType outputType);
+    bool pull(cv::gapi::wip::Data& data);
+    GMetaArg descr_of() const noexcept;
+    virtual ~Priv();
+};
+
+#endif // HAVE_GSTREAMER
+
+} // namespace gst
+} // namespace wip
+} // namespace gapi
+} // namespace cv
+
+#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERSOURCE_PRIV_HPP
diff --git a/modules/gapi/test/streaming/gapi_gstreamer_pipeline_facade_int_tests.cpp b/modules/gapi/test/streaming/gapi_gstreamer_pipeline_facade_int_tests.cpp
new file mode 100644 (file)
index 0000000..7b809a8
--- /dev/null
@@ -0,0 +1,188 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2021 Intel Corporation
+
+#include "../test/common/gapi_tests_common.hpp"
+
+#include "../src/streaming/gstreamer/gstreamer_pipeline_facade.hpp"
+#include "../src/streaming/gstreamer/gstreamerptr.hpp"
+
+#include <opencv2/ts.hpp>
+
+#include <thread>
+
+#ifdef HAVE_GSTREAMER
+#include <gst/app/gstappsink.h>
+
+namespace opencv_test
+{
+
+TEST(GStreamerPipelineFacadeTest, GetElsByFactoryNameUnitTest)
+{
+    auto comparator = [](GstElement* element, const std::string& factoryName) {
+        cv::gapi::wip::gst::GStreamerPtr<gchar> name(
+            gst_object_get_name(GST_OBJECT(gst_element_get_factory(element))));
+        return name && (0 == strcmp(name, factoryName.c_str()));
+    };
+
+    cv::gapi::wip::gst::GStreamerPipelineFacade
+        pipelineFacade("videotestsrc is-live=true pattern=colors num-buffers=10 ! "
+                       "videorate ! videoscale ! "
+                       "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
+                       "appsink name=sink1 "
+                       "videotestsrc is-live=true pattern=colors num-buffers=10 ! "
+                       "videorate ! videoscale ! "
+                       "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
+                       "appsink name=sink2");
+
+    auto videotestsrcs = pipelineFacade.getElementsByFactoryName("videotestsrc");
+    EXPECT_EQ(2u, videotestsrcs.size());
+    for (auto&& src : videotestsrcs) {
+        EXPECT_TRUE(comparator(src, "videotestsrc"));
+    }
+
+    auto appsinks = pipelineFacade.getElementsByFactoryName("appsink");
+    EXPECT_EQ(2u, appsinks.size());
+    for (auto&& sink : appsinks) {
+        EXPECT_TRUE(comparator(sink, "appsink"));
+    }
+}
+
+TEST(GStreamerPipelineFacadeTest, GetElByNameUnitTest)
+{
+    auto comparator = [](GstElement* element, const std::string& elementName) {
+        cv::gapi::wip::gst::GStreamerPtr<gchar> name(gst_element_get_name(element));
+        return name && (0 == strcmp(name, elementName.c_str()));
+    };
+
+    cv::gapi::wip::gst::GStreamerPipelineFacade
+        pipelineFacade("videotestsrc is-live=true pattern=colors num-buffers=10 ! "
+                       "videorate ! videoscale ! "
+                       "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
+                       "appsink name=sink1 "
+                       "videotestsrc is-live=true pattern=colors num-buffers=10 ! "
+                       "videorate ! videoscale ! "
+                       "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
+                       "appsink name=sink2");
+
+    auto appsink1 = pipelineFacade.getElementByName("sink1");
+    GAPI_Assert(appsink1 != nullptr);
+    EXPECT_TRUE(comparator(appsink1, "sink1"));
+    auto appsink2 = pipelineFacade.getElementByName("sink2");
+    GAPI_Assert(appsink2 != nullptr);
+    EXPECT_TRUE(comparator(appsink2, "sink2"));
+}
+
+TEST(GStreamerPipelineFacadeTest, CompletePrerollUnitTest)
+{
+    cv::gapi::wip::gst::GStreamerPipelineFacade
+        pipelineFacade("videotestsrc is-live=true pattern=colors num-buffers=10 ! "
+                       "videorate ! videoscale ! "
+                       "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
+                       "appsink name=sink1 "
+                       "videotestsrc is-live=true pattern=colors num-buffers=10 ! "
+                       "videorate ! videoscale ! "
+                       "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
+                       "appsink name=sink2");
+
+    auto appsink = pipelineFacade.getElementByName("sink1");
+    pipelineFacade.completePreroll();
+
+    cv::gapi::wip::gst::GStreamerPtr<GstSample> prerollSample(
+#if GST_VERSION_MINOR >= 10
+            gst_app_sink_try_pull_preroll(GST_APP_SINK(appsink), 5 * GST_SECOND)
+#else // GST_VERSION_MINOR < 10
+            // TODO: This function may cause hang with some pipelines, need to check whether these
+            // pipelines are really wrong or not?
+            gst_app_sink_pull_preroll(GST_APP_SINK(appsink))
+#endif // GST_VERSION_MINOR >= 10
+                                                             );
+    GAPI_Assert(prerollSample != nullptr);
+}
+
+TEST(GStreamerPipelineFacadeTest, PlayUnitTest)
+{
+    cv::gapi::wip::gst::GStreamerPipelineFacade
+        pipelineFacade("videotestsrc is-live=true pattern=colors num-buffers=10 ! "
+                       "videorate ! videoscale ! "
+                       "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
+                       "appsink name=sink1 "
+                       "videotestsrc is-live=true pattern=colors num-buffers=10 ! "
+                       "videorate ! videoscale ! "
+                       "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
+                       "appsink name=sink2");
+
+    auto appsink = pipelineFacade.getElementByName("sink2");
+
+    pipelineFacade.play();
+
+    cv::gapi::wip::gst::PipelineState state;
+    GstStateChangeReturn status =
+        gst_element_get_state(appsink, &state.current, &state.pending, 5 * GST_SECOND);
+    EXPECT_EQ(GST_STATE_CHANGE_SUCCESS, status);
+    EXPECT_EQ(GST_STATE_PLAYING, state.current);
+    EXPECT_EQ(GST_STATE_VOID_PENDING, state.pending);
+}
+
+TEST(GStreamerPipelineFacadeTest, IsPlayingUnitTest)
+{
+    cv::gapi::wip::gst::GStreamerPipelineFacade
+        pipelineFacade("videotestsrc is-live=true pattern=colors num-buffers=10 ! "
+                       "videorate ! videoscale ! "
+                       "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
+                       "appsink name=sink1 "
+                       "videotestsrc is-live=true pattern=colors num-buffers=10 ! "
+                       "videorate ! videoscale ! "
+                       "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
+                       "appsink name=sink2");
+
+    EXPECT_EQ(false, pipelineFacade.isPlaying());
+    pipelineFacade.play();
+    EXPECT_EQ(true, pipelineFacade.isPlaying());
+}
+
+TEST(GStreamerPipelineFacadeTest, MTSafetyUnitTest)
+{
+    cv::gapi::wip::gst::GStreamerPipelineFacade
+        pipelineFacade("videotestsrc is-live=true pattern=colors num-buffers=10 ! "
+                       "videorate ! videoscale ! "
+                       "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
+                       "appsink name=sink1 "
+                       "videotestsrc is-live=true pattern=colors num-buffers=10 ! "
+                       "videorate ! videoscale ! "
+                       "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
+                       "appsink name=sink2");
+
+    auto prerollRoutine = [&pipelineFacade](){ pipelineFacade.completePreroll(); };
+    auto playRoutine = [&pipelineFacade](){ pipelineFacade.play(); };
+    auto isPlayingRoutine = [&pipelineFacade](){ pipelineFacade.isPlaying(); };
+
+    using f = std::function<void()>;
+
+    auto routinesLauncher = [](const f& r1, const f& r2, const f& r3) {
+        std::vector<f> routines { r1, r2, r3 };
+        std::vector<std::thread> threads { };
+
+        for (auto&& r : routines) {
+            threads.emplace_back(r);
+        }
+
+        for (auto&& t : threads) {
+            t.join();
+        }
+    };
+
+    routinesLauncher(prerollRoutine, playRoutine, isPlayingRoutine);
+    routinesLauncher(prerollRoutine, isPlayingRoutine, playRoutine);
+    routinesLauncher(isPlayingRoutine, prerollRoutine, playRoutine);
+    routinesLauncher(playRoutine, prerollRoutine, isPlayingRoutine);
+    routinesLauncher(playRoutine, isPlayingRoutine, prerollRoutine);
+    routinesLauncher(isPlayingRoutine, playRoutine, prerollRoutine);
+
+    EXPECT_TRUE(true);
+}
+} // namespace opencv_test
+
+#endif // HAVE_GSTREAMER
diff --git a/modules/gapi/test/streaming/gapi_gstreamersource_tests.cpp b/modules/gapi/test/streaming/gapi_gstreamersource_tests.cpp
new file mode 100644 (file)
index 0000000..0478d2d
--- /dev/null
@@ -0,0 +1,401 @@
+// This file is part of OpenCV project.
+// It is subject to the license terms in the LICENSE file found in the top-level directory
+// of this distribution and at http://opencv.org/license.html.
+//
+// Copyright (C) 2021 Intel Corporation
+
+#include "../test/common/gapi_tests_common.hpp"
+
+#include <opencv2/gapi/streaming/gstreamer/gstreamerpipeline.hpp>
+#include <opencv2/gapi/streaming/gstreamer/gstreamersource.hpp>
+#include <opencv2/gapi/core.hpp>
+#include <opencv2/gapi/cpu/core.hpp>
+#include <opencv2/gapi/streaming/meta.hpp>
+#include <opencv2/gapi/streaming/format.hpp>
+
+#include <opencv2/gapi/gkernel.hpp>
+#include <opencv2/gapi/cpu/gcpukernel.hpp>
+#include <opencv2/gapi/gcomputation.hpp>
+
+#include <opencv2/ts.hpp>
+
+#include <regex>
+
+#ifdef HAVE_GSTREAMER
+
+namespace opencv_test
+{
+
+struct GStreamerSourceTest : public TestWithParam<std::tuple<std::string, cv::Size, std::size_t>>
+{ };
+
+TEST_P(GStreamerSourceTest, AccuracyTest)
+{
+    std::string pipeline;
+    cv::Size expectedFrameSize;
+    std::size_t streamLength { };
+    std::tie(pipeline, expectedFrameSize, streamLength) = GetParam();
+
+    // Graph declaration:
+    cv::GMat in;
+    auto out = cv::gapi::copy(in);
+    cv::GComputation c(cv::GIn(in), cv::GOut(out));
+
+    // Graph compilation for streaming mode:
+    auto ccomp = c.compileStreaming();
+
+    EXPECT_TRUE(ccomp);
+    EXPECT_FALSE(ccomp.running());
+
+    // GStreamer streaming source configuration:
+    ccomp.setSource<cv::gapi::wip::GStreamerSource>(pipeline);
+
+    // Start of streaming:
+    ccomp.start();
+    EXPECT_TRUE(ccomp.running());
+
+    // Streaming - pulling of frames until the end:
+    cv::Mat in_mat_gapi;
+
+    EXPECT_TRUE(ccomp.pull(cv::gout(in_mat_gapi)));
+    EXPECT_TRUE(!in_mat_gapi.empty());
+    EXPECT_EQ(expectedFrameSize, in_mat_gapi.size());
+    EXPECT_EQ(CV_8UC3, in_mat_gapi.type());
+
+    std::size_t framesCount = 1UL;
+    while (ccomp.pull(cv::gout(in_mat_gapi))) {
+        EXPECT_TRUE(!in_mat_gapi.empty());
+        EXPECT_EQ(expectedFrameSize, in_mat_gapi.size());
+        EXPECT_EQ(CV_8UC3, in_mat_gapi.type());
+
+        framesCount++;
+    }
+
+    EXPECT_FALSE(ccomp.running());
+    ccomp.stop();
+
+    EXPECT_FALSE(ccomp.running());
+
+    EXPECT_EQ(streamLength, framesCount);
+}
+
+TEST_P(GStreamerSourceTest, TimestampsTest)
+{
+    std::string pipeline;
+    std::size_t streamLength { };
+    std::tie(pipeline, std::ignore, streamLength) = GetParam();
+
+    // Graph declaration:
+    cv::GMat in;
+    cv::GMat copied = cv::gapi::copy(in);
+    cv::GOpaque<int64_t> outId = cv::gapi::streaming::seq_id(copied);
+    cv::GOpaque<int64_t> outTs = cv::gapi::streaming::timestamp(copied);
+    cv::GComputation c(cv::GIn(in), cv::GOut(outId, outTs));
+
+    // Graph compilation for streaming mode:
+    auto ccomp = c.compileStreaming();
+
+    EXPECT_TRUE(ccomp);
+    EXPECT_FALSE(ccomp.running());
+
+    // GStreamer streaming source configuration:
+    ccomp.setSource<cv::gapi::wip::GStreamerSource>(pipeline);
+
+    // Start of streaming:
+    ccomp.start();
+    EXPECT_TRUE(ccomp.running());
+
+    // Streaming - pulling of frames until the end:
+    int64_t seqId;
+    int64_t timestamp;
+
+    std::vector<int64_t> allSeqIds;
+    std::vector<int64_t> allTimestamps;
+
+    while (ccomp.pull(cv::gout(seqId, timestamp))) {
+        allSeqIds.push_back(seqId);
+        allTimestamps.push_back(timestamp);
+    }
+
+    EXPECT_FALSE(ccomp.running());
+    ccomp.stop();
+
+    EXPECT_FALSE(ccomp.running());
+
+    EXPECT_EQ(0L, allSeqIds.front());
+    EXPECT_EQ(int64_t(streamLength) - 1, allSeqIds.back());
+    EXPECT_EQ(streamLength, allSeqIds.size());
+    EXPECT_TRUE(std::is_sorted(allSeqIds.begin(), allSeqIds.end()));
+    EXPECT_EQ(allSeqIds.size(), std::set<int64_t>(allSeqIds.begin(), allSeqIds.end()).size());
+
+    EXPECT_EQ(streamLength, allTimestamps.size());
+    EXPECT_TRUE(std::is_sorted(allTimestamps.begin(), allTimestamps.end()));
+}
+
+G_TYPED_KERNEL(GGstFrameCopyToNV12, <std::tuple<cv::GMat,cv::GMat>(GFrame)>,
+    "org.opencv.test.gstframe_copy_to_nv12")
+{
+    static std::tuple<GMatDesc, GMatDesc> outMeta(GFrameDesc desc) {
+        GMatDesc y  { CV_8U, 1, desc.size, false };
+        GMatDesc uv { CV_8U, 2, desc.size / 2, false };
+
+        return std::make_tuple(y, uv);
+    }
+};
+
+GAPI_OCV_KERNEL(GOCVGstFrameCopyToNV12, GGstFrameCopyToNV12)
+{
+    static void run(const cv::MediaFrame& in, cv::Mat& y, cv::Mat& uv)
+    {
+        auto view = in.access(cv::MediaFrame::Access::R);
+        cv::Mat ly(y.size(), y.type(), view.ptr[0], view.stride[0]);
+        cv::Mat luv(uv.size(), uv.type(), view.ptr[1], view.stride[1]);
+
+        ly.copyTo(y);
+        luv.copyTo(uv);
+    }
+};
+
+TEST_P(GStreamerSourceTest, GFrameTest)
+{
+    std::string pipeline;
+    cv::Size expectedFrameSize;
+    std::size_t streamLength { };
+    std::tie(pipeline, expectedFrameSize, streamLength) = GetParam();
+
+    // Graph declaration:
+    cv::GFrame in;
+    cv::GMat copiedY, copiedUV;
+    std::tie(copiedY, copiedUV) = GGstFrameCopyToNV12::on(in);
+    cv::GComputation c(cv::GIn(in), cv::GOut(copiedY, copiedUV));
+
+    // Graph compilation for streaming mode:
+    auto ccomp = c.compileStreaming(cv::compile_args(cv::gapi::kernels<GOCVGstFrameCopyToNV12>()));
+
+    EXPECT_TRUE(ccomp);
+    EXPECT_FALSE(ccomp.running());
+
+    // GStreamer streaming source configuration:
+    ccomp.setSource<cv::gapi::wip::GStreamerSource>
+        (pipeline, cv::gapi::wip::GStreamerSource::OutputType::FRAME);
+
+    // Start of streaming:
+    ccomp.start();
+    EXPECT_TRUE(ccomp.running());
+
+    // Streaming - pulling of frames until the end:
+    cv::Mat y_mat, uv_mat;
+
+    EXPECT_TRUE(ccomp.pull(cv::gout(y_mat, uv_mat)));
+    EXPECT_TRUE(!y_mat.empty());
+    EXPECT_TRUE(!uv_mat.empty());
+
+    cv::Size expectedYSize = expectedFrameSize;
+    cv::Size expectedUVSize = expectedFrameSize / 2;
+
+    EXPECT_EQ(expectedYSize, y_mat.size());
+    EXPECT_EQ(expectedUVSize, uv_mat.size());
+
+    EXPECT_EQ(CV_8UC1, y_mat.type());
+    EXPECT_EQ(CV_8UC2, uv_mat.type());
+
+    std::size_t framesCount = 1UL;
+    while (ccomp.pull(cv::gout(y_mat, uv_mat))) {
+        EXPECT_TRUE(!y_mat.empty());
+        EXPECT_TRUE(!uv_mat.empty());
+
+        EXPECT_EQ(expectedYSize, y_mat.size());
+        EXPECT_EQ(expectedUVSize, uv_mat.size());
+
+        EXPECT_EQ(CV_8UC1, y_mat.type());
+        EXPECT_EQ(CV_8UC2, uv_mat.type());
+
+        framesCount++;
+    }
+
+    EXPECT_FALSE(ccomp.running());
+    ccomp.stop();
+
+    EXPECT_FALSE(ccomp.running());
+
+    EXPECT_EQ(streamLength, framesCount);
+}
+
+// FIXME: Need to launch with sudo. May be infrastructure problems.
+// TODO: It is needed to add tests for streaming from native KMB camera: kmbcamsrc
+//       GStreamer element.
+INSTANTIATE_TEST_CASE_P(CameraEmulatingPipeline, GStreamerSourceTest,
+                        Combine(Values("videotestsrc is-live=true pattern=colors num-buffers=10 ! "
+                                       "videorate ! videoscale ! "
+                                       "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
+                                       "appsink"),
+                                Values(cv::Size(1920, 1080)),
+                                Values(10UL)));
+
+INSTANTIATE_TEST_CASE_P(FileEmulatingPipeline, GStreamerSourceTest,
+                        Combine(Values("videotestsrc pattern=colors num-buffers=10 ! "
+                                       "videorate ! videoscale ! "
+                                       "video/x-raw,width=640,height=420,framerate=3/1 ! "
+                                       "appsink"),
+                                Values(cv::Size(640, 420)),
+                                Values(10UL)));
+
+INSTANTIATE_TEST_CASE_P(MultipleLiveSources, GStreamerSourceTest,
+                        Combine(Values("videotestsrc is-live=true pattern=colors num-buffers=10 ! "
+                                       "videoscale ! video/x-raw,width=1280,height=720 ! appsink "
+                                       "videotestsrc is-live=true pattern=colors num-buffers=10 ! "
+                                       "fakesink"),
+                                Values(cv::Size(1280, 720)),
+                                Values(10UL)));
+
+INSTANTIATE_TEST_CASE_P(MultipleNotLiveSources, GStreamerSourceTest,
+                        Combine(Values("videotestsrc pattern=colors num-buffers=10 ! "
+                                       "videoscale ! video/x-raw,width=1280,height=720 ! appsink "
+                                       "videotestsrc pattern=colors num-buffers=10 ! "
+                                       "fakesink"),
+                                Values(cv::Size(1280, 720)),
+                                Values(10UL)));
+
+
+TEST(GStreamerMultiSourceSmokeTest, Test)
+{
+    // Graph declaration:
+    cv::GMat in1, in2;
+    auto out = cv::gapi::add(in1, in2);
+    cv::GComputation c(cv::GIn(in1, in2), cv::GOut(out));
+
+    // Graph compilation for streaming mode:
+    auto ccomp = c.compileStreaming();
+
+    EXPECT_TRUE(ccomp);
+    EXPECT_FALSE(ccomp.running());
+
+    cv::gapi::wip::GStreamerPipeline
+        pipeline("videotestsrc is-live=true pattern=colors num-buffers=10 ! "
+                 "videorate ! videoscale ! "
+                 "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
+                 "appsink name=sink1 "
+                 "videotestsrc is-live=true pattern=colors num-buffers=10 ! "
+                 "videorate ! videoscale ! "
+                 "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
+                 "appsink name=sink2");
+
+    // GStreamer streaming sources configuration:
+    auto src1 = pipeline.getStreamingSource("sink1");
+    auto src2 = pipeline.getStreamingSource("sink2");
+
+    ccomp.setSource(cv::gin(src1, src2));
+
+    // Start of streaming:
+    ccomp.start();
+    EXPECT_TRUE(ccomp.running());
+
+    // Streaming - pulling of frames until the end:
+    cv::Mat in_mat_gapi;
+
+    EXPECT_TRUE(ccomp.pull(cv::gout(in_mat_gapi)));
+    EXPECT_TRUE(!in_mat_gapi.empty());
+    EXPECT_EQ(CV_8UC3, in_mat_gapi.type());
+
+    while (ccomp.pull(cv::gout(in_mat_gapi))) {
+        EXPECT_TRUE(!in_mat_gapi.empty());
+        EXPECT_EQ(CV_8UC3, in_mat_gapi.type());
+    }
+
+    EXPECT_FALSE(ccomp.running());
+    ccomp.stop();
+
+    EXPECT_FALSE(ccomp.running());
+}
+
+struct GStreamerMultiSourceTest :
+    public TestWithParam<std::tuple<cv::GComputation, cv::gapi::wip::GStreamerSource::OutputType>>
+{ };
+
+TEST_P(GStreamerMultiSourceTest, ImageDataTest)
+{
+    std::string pathToLeftIm = findDataFile("cv/stereomatching/datasets/tsukuba/im6.png");
+    std::string pathToRightIm = findDataFile("cv/stereomatching/datasets/tsukuba/im2.png");
+
+    std::string pipelineToReadImage("filesrc location=LOC ! pngdec ! videoconvert ! "
+        "videoscale ! video/x-raw,format=NV12 ! appsink");
+
+    cv::gapi::wip::GStreamerSource leftImageProvider(
+        std::regex_replace(pipelineToReadImage, std::regex("LOC"), pathToLeftIm));
+    cv::gapi::wip::GStreamerSource rightImageProvider(
+        std::regex_replace(pipelineToReadImage, std::regex("LOC"), pathToRightIm));
+
+    cv::gapi::wip::Data leftImData, rightImData;
+    leftImageProvider.pull(leftImData);
+    rightImageProvider.pull(rightImData);
+
+    cv::Mat leftRefMat =  cv::util::get<cv::Mat>(leftImData);
+    cv::Mat rightRefMat = cv::util::get<cv::Mat>(rightImData);
+
+    // Retrieve test parameters:
+    std::tuple<cv::GComputation, cv::gapi::wip::GStreamerSource::OutputType> params = GetParam();
+    cv::GComputation extractImage = std::move(std::get<0>(params));
+    cv::gapi::wip::GStreamerSource::OutputType outputType = std::get<1>(params);
+
+    // Graph compilation for streaming mode:
+    auto compiled =
+        extractImage.compileStreaming();
+
+    EXPECT_TRUE(compiled);
+    EXPECT_FALSE(compiled.running());
+
+    cv::gapi::wip::GStreamerPipeline
+        pipeline(std::string("multifilesrc location=" + pathToLeftIm + " index=0 loop=true ! "
+                 "pngdec ! videoconvert ! videoscale ! video/x-raw,format=NV12 ! "
+                 "appsink name=sink1 ") +
+                 std::string("multifilesrc location=" + pathToRightIm + " index=0 loop=true ! "
+                 "pngdec ! videoconvert ! videoscale ! video/x-raw,format=NV12 ! "
+                 "appsink name=sink2"));
+
+    // GStreamer streaming sources configuration:
+    auto src1 = pipeline.getStreamingSource("sink1", outputType);
+    auto src2 = pipeline.getStreamingSource("sink2", outputType);
+
+    compiled.setSource(cv::gin(src1, src2));
+
+    // Start of streaming:
+    compiled.start();
+    EXPECT_TRUE(compiled.running());
+
+    // Streaming - pulling of frames:
+    cv::Mat in_mat1, in_mat2;
+
+    std::size_t counter { }, limit { 10 };
+    while(compiled.pull(cv::gout(in_mat1, in_mat2)) && (counter < limit)) {
+        EXPECT_EQ(0, cv::norm(in_mat1, leftRefMat, cv::NORM_INF));
+        EXPECT_EQ(0, cv::norm(in_mat2, rightRefMat, cv::NORM_INF));
+        ++counter;
+    }
+
+    compiled.stop();
+
+    EXPECT_FALSE(compiled.running());
+}
+
+INSTANTIATE_TEST_CASE_P(GStreamerMultiSourceViaGMatsTest, GStreamerMultiSourceTest,
+                        Combine(Values(cv::GComputation([]()
+                                       {
+                                           cv::GMat in1, in2;
+                                           return cv::GComputation(cv::GIn(in1, in2),
+                                                                   cv::GOut(cv::gapi::copy(in1),
+                                                                            cv::gapi::copy(in2)));
+                                       })),
+                               Values(cv::gapi::wip::GStreamerSource::OutputType::MAT)));
+
+INSTANTIATE_TEST_CASE_P(GStreamerMultiSourceViaGFramesTest, GStreamerMultiSourceTest,
+                        Combine(Values(cv::GComputation([]()
+                                       {
+                                           cv::GFrame in1, in2;
+                                           return cv::GComputation(cv::GIn(in1, in2),
+                                                cv::GOut(cv::gapi::streaming::BGR(in1),
+                                                         cv::gapi::streaming::BGR(in2)));
+                                       })),
+                               Values(cv::gapi::wip::GStreamerSource::OutputType::FRAME)));
+} // namespace opencv_test
+
+#endif // HAVE_GSTREAMER