Merge pull request #20119 from TolyaTalamanov:at/compile-arg-for-queue-capacity
authorAnatoliy Talamanov <anatoliy.talamanov@intel.com>
Mon, 24 May 2021 18:48:23 +0000 (21:48 +0300)
committerGitHub <noreply@github.com>
Mon, 24 May 2021 18:48:23 +0000 (18:48 +0000)
* Support queue capacity as graph compilation argument

* Fix comments to review

* Fix comments to review

* Fix comments to review

modules/gapi/include/opencv2/gapi/gcommon.hpp
modules/gapi/include/opencv2/gapi/gstreaming.hpp
modules/gapi/src/executor/gstreamingexecutor.cpp
modules/gapi/test/streaming/gapi_streaming_tests.cpp

index 8119e397ebf48bbf05b95de75a98e4fab13f39f3..a9cb0159014e18e27244fb6b0a9e20fd6811d800 100644 (file)
@@ -195,6 +195,14 @@ private:
 
 using GCompileArgs = std::vector<GCompileArg>;
 
+inline cv::GCompileArgs& operator += (      cv::GCompileArgs &lhs,
+                                      const cv::GCompileArgs &rhs)
+{
+    lhs.reserve(lhs.size() + rhs.size());
+    lhs.insert(lhs.end(), rhs.begin(), rhs.end());
+    return lhs;
+}
+
 /**
  * @brief Wraps a list of arguments (a parameter pack) into a vector of
  *        compilation arguments (cv::GCompileArg).
index 4e579caafbaad3f8886c02694fc36b8b55ac200f..371581345f201290195c17a1a9e84d9501db4df1 100644 (file)
@@ -371,6 +371,31 @@ protected:
 };
 /** @} */
 
+namespace gapi {
+namespace streaming {
+/**
+ * @brief Specify queue capacity for streaming execution.
+ *
+ * In the streaming mode the pipeline steps are connected with queues
+ * and this compile argument controls every queue's size.
+ */
+struct GAPI_EXPORTS queue_capacity
+{
+    explicit queue_capacity(size_t cap = 1) : capacity(cap) { };
+    size_t capacity;
+};
+/** @} */
+} // namespace streaming
+} // namespace gapi
+
+namespace detail
+{
+template<> struct CompileArgTag<cv::gapi::streaming::queue_capacity>
+{
+    static const char* tag() { return "gapi.queue_capacity"; }
+};
+}
+
 }
 
 #endif // OPENCV_GAPI_GSTREAMING_COMPILED_HPP
index bb1c33860b094e97d5d2db150bc7563e67ed8fc4..74c96bdf3ef3c885cd6776838343e04f981e0659 100644 (file)
@@ -1126,14 +1126,17 @@ cv::gimpl::GStreamingExecutor::GStreamingExecutor(std::unique_ptr<ade::Graph> &&
     m_sink_queues   .resize(proto.out_nhs.size(), nullptr);
     m_sink_sync     .resize(proto.out_nhs.size(), -1);
 
-    // Very rough estimation to limit internal queue sizes.
+    // Very rough estimation to limit internal queue sizes if not specified by the user.
     // Pipeline depth is equal to number of its (pipeline) steps.
-    const auto queue_capacity = 3*std::count_if
-        (m_gim.nodes().begin(),
-         m_gim.nodes().end(),
-         [&](ade::NodeHandle nh) {
-            return m_gim.metadata(nh).get<NodeKind>().k == NodeKind::ISLAND;
-         });
+    auto has_queue_capacity = cv::gapi::getCompileArg<cv::gapi::streaming::queue_capacity>(m_comp_args);
+    const auto queue_capacity = has_queue_capacity ? has_queue_capacity->capacity :
+            3*std::count_if
+            (m_gim.nodes().begin(),
+            m_gim.nodes().end(),
+            [&](ade::NodeHandle nh) {
+                return m_gim.metadata(nh).get<NodeKind>().k == NodeKind::ISLAND;
+            });
+    GAPI_Assert(queue_capacity != 0u);
 
     auto sync_policy = cv::gimpl::getCompileArg<cv::gapi::streaming::sync_policy>(m_comp_args)
                        .value_or(cv::gapi::streaming::sync_policy::dont_sync);
index 064511880e3ddc6b8288557a537466258404ee75..f3179a70813a75f7a0882a8bcdc07abd0b14b191 100644 (file)
@@ -67,13 +67,24 @@ std::ostream& operator<< (std::ostream &os, const KernelPackage &e)
     return os;
 }
 
-struct GAPI_Streaming: public ::testing::TestWithParam<KernelPackage> {
-    GAPI_Streaming() { initTestDataPath(); }
+struct GAPI_Streaming: public ::testing::TestWithParam<std::tuple<KernelPackage,
+                                                                  cv::optional<size_t>>> {
+    GAPI_Streaming() {
+        initTestDataPath();
+        KernelPackage pkg_kind;
+        std::tie(pkg_kind, cap) = GetParam();
+        pkg = getKernelPackage(pkg_kind);
+    }
 
-    cv::gapi::GKernelPackage getKernelPackage()
+    const cv::optional<size_t>& getQueueCapacity()
+    {
+        return cap;
+    }
+
+    cv::gapi::GKernelPackage getKernelPackage(KernelPackage pkg_kind)
     {
         using namespace cv::gapi;
-        switch (GetParam())
+        switch (pkg_kind)
         {
         case KernelPackage::OCV:
             return cv::gapi::combine(core::cpu::kernels(),
@@ -104,6 +115,18 @@ struct GAPI_Streaming: public ::testing::TestWithParam<KernelPackage> {
         }
         throw std::logic_error("Unknown package");
     }
+
+    cv::GCompileArgs getCompileArgs() {
+        using namespace cv::gapi;
+        auto args = cv::compile_args(use_only{pkg});
+        if (cap) {
+            args += cv::compile_args(streaming::queue_capacity{cap.value()});
+        }
+        return args;
+    }
+
+    cv::gapi::GKernelPackage pkg;
+    cv::optional<size_t>     cap;
 };
 
 G_API_OP(Delay, <cv::GMat(cv::GMat, int)>, "org.opencv.test.delay") {
@@ -260,8 +283,7 @@ TEST_P(GAPI_Streaming, SmokeTest_ConstInput_GMat)
     }
 
     // Compilation & testing
-    auto ccomp = c.compileStreaming(cv::descr_of(in_mat),
-                                    cv::compile_args(cv::gapi::use_only{getKernelPackage()}));
+    auto ccomp = c.compileStreaming(cv::descr_of(in_mat), getCompileArgs());
     EXPECT_TRUE(ccomp);
     EXPECT_FALSE(ccomp.running());
 
@@ -306,7 +328,7 @@ TEST_P(GAPI_Streaming, SmokeTest_VideoInput_GMat)
 
     // Compilation & testing
     auto ccomp = c.compileStreaming(cv::GMatDesc{CV_8U,3,cv::Size{768,576}},
-                                    cv::compile_args(cv::gapi::use_only{getKernelPackage()}));
+                                    getCompileArgs());
     EXPECT_TRUE(ccomp);
     EXPECT_FALSE(ccomp.running());
 
@@ -356,7 +378,7 @@ TEST_P(GAPI_Streaming, Regression_CompileTimeScalar)
     cv::GComputation c(cv::GIn(in), cv::GOut(tmp, tmp + 1));
 
     auto ccomp = c.compileStreaming(cv::GMatDesc{CV_8U,3,cv::Size{768,512}},
-                                    cv::compile_args(cv::gapi::use_only{getKernelPackage()}));
+                                    getCompileArgs());
 
     cv::Mat in_mat = cv::imread(findDataFile("cv/edgefilter/kodim23.png"));
     cv::Mat out_mat1, out_mat2;
@@ -379,7 +401,7 @@ TEST_P(GAPI_Streaming, SmokeTest_StartRestart)
     cv::GComputation c(cv::GIn(in), cv::GOut(cv::gapi::copy(in), out));
 
     auto ccomp = c.compileStreaming(cv::GMatDesc{CV_8U,3,cv::Size{768,576}},
-                                    cv::compile_args(cv::gapi::use_only{getKernelPackage()}));
+                                    getCompileArgs());
     EXPECT_TRUE(ccomp);
     EXPECT_FALSE(ccomp.running());
 
@@ -424,8 +446,7 @@ TEST_P(GAPI_Streaming, SmokeTest_VideoConstSource_NoHang)
     auto refc = cv::GComputation([](){
         cv::GMat in;
         return cv::GComputation(in, cv::gapi::copy(in));
-    }).compileStreaming(cv::GMatDesc{CV_8U,3,cv::Size{768,576}},
-                        cv::compile_args(cv::gapi::use_only{getKernelPackage()}));
+    }).compileStreaming(cv::GMatDesc{CV_8U,3,cv::Size{768,576}}, getCompileArgs());
 
     auto path = findDataFile("cv/video/768x576.avi");
     try {
@@ -447,7 +468,7 @@ TEST_P(GAPI_Streaming, SmokeTest_VideoConstSource_NoHang)
     auto testc = cv::GComputation(cv::GIn(in, in2), cv::GOut(out))
         .compileStreaming(cv::GMatDesc{CV_8U,3,cv::Size{256,256}},
                           cv::GMatDesc{CV_8U,3,cv::Size{768,576}},
-                          cv::compile_args(cv::gapi::use_only{getKernelPackage()}));
+                          getCompileArgs());
 
     cv::Mat in_const = cv::Mat::eye(cv::Size(256,256), CV_8UC3);
     testc.setSource(cv::gin(in_const,
@@ -468,7 +489,7 @@ TEST_P(GAPI_Streaming, SmokeTest_AutoMeta)
     cv::GMat out = blr - in;
 
     auto testc = cv::GComputation(cv::GIn(in, in2), cv::GOut(out))
-        .compileStreaming(cv::compile_args(cv::gapi::use_only{getKernelPackage()}));
+        .compileStreaming(getCompileArgs());
 
     cv::Mat in_const = cv::Mat::eye(cv::Size(256,256), CV_8UC3);
     cv::Mat tmp;
@@ -510,7 +531,7 @@ TEST_P(GAPI_Streaming, SmokeTest_AutoMeta_2xConstMat)
     cv::GMat out = blr - in;
 
     auto testc = cv::GComputation(cv::GIn(in, in2), cv::GOut(out))
-        .compileStreaming(cv::compile_args(cv::gapi::use_only{getKernelPackage()}));
+        .compileStreaming(getCompileArgs());
 
     cv::Mat in_const = cv::Mat::eye(cv::Size(256,256), CV_8UC3);
     cv::Mat tmp;
@@ -541,7 +562,7 @@ TEST_P(GAPI_Streaming, SmokeTest_AutoMeta_VideoScalar)
     cv::GMat out_m = in_m * in_s;
 
     auto testc = cv::GComputation(cv::GIn(in_m, in_s), cv::GOut(out_m))
-        .compileStreaming(cv::compile_args(cv::gapi::use_only{getKernelPackage()}));
+        .compileStreaming(getCompileArgs());
 
     cv::Mat tmp;
     // Test with one video source and scalar
@@ -572,11 +593,13 @@ TEST_P(GAPI_Streaming, SmokeTest_AutoMeta_VideoScalar)
 }
 
 INSTANTIATE_TEST_CASE_P(TestStreaming, GAPI_Streaming,
-                        Values(  KernelPackage::OCV
-                             //, KernelPackage::OCL // FIXME: Fails bit-exactness check, maybe relax it?
-                               , KernelPackage::OCV_FLUID
-                             //, KernelPackage::OCL // FIXME: Fails bit-exactness check, maybe relax it?
-                               ));
+                        Combine(Values(  KernelPackage::OCV
+                                    //, KernelPackage::OCL // FIXME: Fails bit-exactness check, maybe relax it?
+                                      , KernelPackage::OCV_FLUID
+                                    //, KernelPackage::OCL // FIXME: Fails bit-exactness check, maybe relax it?
+                                ),
+                                Values(cv::optional<size_t>{}, 1u, 4u))
+                        );
 
 namespace TypesTest
 {
@@ -653,8 +676,15 @@ TEST_P(GAPI_Streaming, SmokeTest_AutoMeta_VideoArray)
     cv::GMat out_m = TypesTest::AddV::on(in_m, in_v) - in_m;
 
     // Run pipeline
+    auto args = cv::compile_args(cv::gapi::kernels<TypesTest::OCVAddV>());
+    auto capacity = getQueueCapacity();
+    if (capacity)
+    {
+        args += cv::compile_args(
+                    cv::gapi::streaming::queue_capacity{capacity.value()});
+    }
     auto testc = cv::GComputation(cv::GIn(in_m, in_v), cv::GOut(out_m))
-                    .compileStreaming(cv::compile_args(cv::gapi::kernels<TypesTest::OCVAddV>()));
+                    .compileStreaming(std::move(args));
 
     cv::Mat tmp;
     // Test with one video source and vector