Merge pull request #21731 from AsyaPronina:asyadev/fix_new_stream_event
authorAnastasiya(Asya) Pronina <anastasiya.pronina@intel.com>
Mon, 4 Apr 2022 17:39:02 +0000 (20:39 +0300)
committerGitHub <noreply@github.com>
Mon, 4 Apr 2022 17:39:02 +0000 (17:39 +0000)
Fixed handling of new stream, especially for stateful OCV kernels

* Fixed handling of new stream, especially for stateful OCV kernels

* Removed duplication from StateInitOnce tests

* Addressed review comments for PR #21731
- Fixed explanation comments
- Expanded test for stateful OCV kernels in Regular mode

* Addressed review comments for PR #21731
- Moved notification about new stream to the constructor
- Added test on state reset for Regular mode

* Addresed review comments

* Addressed review comments

Co-authored-by: Ruslan Garnov <ruslan.garnov@intel.com>
modules/gapi/src/backends/cpu/gcpubackend.cpp
modules/gapi/src/backends/cpu/gcpubackend.hpp
modules/gapi/src/executor/gexecutor.cpp
modules/gapi/src/executor/gstreamingexecutor.cpp
modules/gapi/test/cpu/gapi_ocv_stateful_kernel_tests.cpp

index b1e716f..f50f8ec 100644 (file)
@@ -27,6 +27,7 @@
 #include "api/gbackend_priv.hpp" // FIXME: Make it part of Backend SDK!
 
 #include "utils/itt.hpp"
+#include "logger.hpp"
 
 // FIXME: Is there a way to take a typed graph (our GModel),
 // and create a new typed graph _ATOP_ of that (by extending with a couple of
@@ -113,8 +114,6 @@ cv::gimpl::GCPUExecutable::GCPUExecutable(const ade::Graph &g,
         }
     }
     makeReshape();
-    // For each stateful kernel call 'setup' user callback to initialize state.
-    setupKernelStates();
 }
 
 // FIXME: Document what it does
@@ -190,18 +189,23 @@ void cv::gimpl::GCPUExecutable::makeReshape() {
 void cv::gimpl::GCPUExecutable::reshape(ade::Graph&, const GCompileArgs& args) {
     m_compileArgs = args;
     makeReshape();
-    // Signal to reset stateful kernels` state.
-    // There can be no handleNewStream() call to set this flag
-    // if user didn't call GCompiled`s prepareForNewStream()
-    m_newStreamStarted = true;
+    // TODO: Add an input meta sensitivity flag to stateful kernels.
+    // When reshape() happens, reset state for meta-sensitive kernels only
+    if (!m_nodesToStates.empty()) {
+        std::call_once(m_warnFlag,
+            [](){
+                GAPI_LOG_WARNING(NULL,
+                    "\nGCPUExecutable::reshape was called. Resetting states of stateful kernels.");
+            });
+        setupKernelStates();
+    }
 }
 
 void cv::gimpl::GCPUExecutable::handleNewStream()
 {
-    // Signal to reset stateful kernels` state.
-    // No need to call reshape() here since it'll
-    // be called automatically if input meta was changed
-    m_newStreamStarted = true;
+    // In case if new video-stream happens - for each stateful kernel
+    // call 'setup' user callback to re-initialize state.
+    setupKernelStates();
 }
 
 void cv::gimpl::GCPUExecutable::run(std::vector<InObj>  &&input_objs,
@@ -231,14 +235,6 @@ void cv::gimpl::GCPUExecutable::run(std::vector<InObj>  &&input_objs,
         }
     }
 
-    // In case if new video-stream happens - for each stateful kernel
-    // call 'setup' user callback to re-initialize state.
-    if (m_newStreamStarted)
-    {
-        setupKernelStates();
-        m_newStreamStarted = false;
-    }
-
     // OpenCV backend execution is not a rocket science at all.
     // Simply invoke our kernels in the proper order.
     GConstGCPUModel gcm(m_g);
index 6a7b41e..c8bad6c 100644 (file)
@@ -56,8 +56,8 @@ class GCPUExecutable final: public GIslandExecutable
     // Actual data of all resources in graph (both internal and external)
     Mag m_res;
 
-    // Flag which identifies if new stream was started
-    bool m_newStreamStarted = false;
+    // A flag for call_once() (used for log warnings)
+    std::once_flag m_warnFlag;
 
     GArg packArg(const GArg &arg);
     void setupKernelStates();
index b7b0b5c..a8abde2 100644 (file)
@@ -30,10 +30,11 @@ cv::gimpl::GExecutor::GExecutor(std::unique_ptr<ade::Graph> &&g_model)
     // 1. Allocate all internal resources first (NB - CPU plugin doesn't do it)
     // 2. Put input/output GComputation arguments to the storage
     // 3. For every Island, prepare vectors of input/output parameter descs
-    // 4. Iterate over a list of operations (sorted in the topological order)
-    // 5. For every operation, form a list of input/output data objects
-    // 6. Run GIslandExecutable
-    // 7. writeBack
+    // 4. Ask every GIslandExecutable to prepare its internal states for a new stream
+    // 5. Iterate over a list of operations (sorted in the topological order)
+    // 6. For every operation, form a list of input/output data objects
+    // 7. Run GIslandExecutable
+    // 8. writeBack
 
     auto sorted = m_gim.metadata().get<ade::passes::TopologicalSortData>();
     for (auto nh : sorted.nodes())
@@ -82,6 +83,9 @@ cv::gimpl::GExecutor::GExecutor(std::unique_ptr<ade::Graph> &&g_model)
             break;
         } // switch(kind)
     } // for(gim nodes)
+
+    // (4)
+    prepareForNewStream();
 }
 
 namespace cv {
@@ -401,10 +405,10 @@ void cv::gimpl::GExecutor::run(cv::gimpl::GRuntimeArgs &&args)
         magazine::resetInternalData(m_res, data);
     }
 
-    // Run the script
+    // Run the script (5)
     for (auto &op : m_ops)
     {
-        // (5), (6)
+        // (6), (7)
         Input i{m_res, op.in_objects};
         Output o{m_res, op.out_objects};
         op.isl_exec->run(i, o);
@@ -412,7 +416,7 @@ void cv::gimpl::GExecutor::run(cv::gimpl::GRuntimeArgs &&args)
         o.verify();
     }
 
-    // (7)
+    // (8)
     for (auto it : ade::util::zip(ade::util::toRange(proto.outputs),
                                   ade::util::toRange(args.outObjs)))
     {
index 34424cb..6c8c568 100644 (file)
@@ -1564,7 +1564,7 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
             }
         }
     };
-    bool islandsRecompiled = false;
+
     const auto new_meta = cv::descr_of(ins); // 0
     if (gm.metadata().contains<OriginalInputMeta>()) // (1)
     {
@@ -1586,8 +1586,6 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
             }
             update_int_metas(); // (7)
             m_reshapable = util::make_optional(is_reshapable);
-
-            islandsRecompiled = true;
         }
         else // (8)
         {
@@ -1709,14 +1707,8 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
         island_meta_info = GIslandModel::traceIslandName(op.nh, m_gim);
 #endif // OPENCV_WITH_ITT
 
-        // If Island Executable is recompiled, all its stuff including internal kernel states
-        // are recreated and re-initialized automatically.
-        // But if not, we should notify Island Executable about new started stream to let it update
-        // its internal variables.
-        if (!islandsRecompiled)
-        {
-            op.isl_exec->handleNewStream();
-        }
+        // Notify island executable about a new stream to let it update its internal variables.
+        op.isl_exec->handleNewStream();
 
         m_threads.emplace_back(islandActorThread,
                                op.in_objects,
index cf03430..17c4e2f 100644 (file)
@@ -14,6 +14,7 @@
 #include <opencv2/video.hpp>
 #endif
 
+#include <memory> // required by std::shared_ptr
 
 namespace opencv_test
 {
@@ -21,6 +22,11 @@ namespace opencv_test
     {
         std::string method;
     };
+
+    struct CountStateSetupsParams
+    {
+        std::shared_ptr<int> pSetupsCount;
+    };
 } // namespace opencv_test
 
 namespace cv
@@ -34,6 +40,14 @@ namespace cv
                 return "org.opencv.test.background_substractor_state_params";
             }
         };
+
+        template<> struct CompileArgTag<opencv_test::CountStateSetupsParams>
+        {
+            static const char* tag()
+            {
+                return "org.opencv.test.count_state_setups_params";
+            }
+        };
     } // namespace detail
 } // namespace cv
 
@@ -127,8 +141,101 @@ namespace
         }
     };
 #endif
+
+    G_TYPED_KERNEL(GCountStateSetups, <cv::GOpaque<bool>(GMat)>,
+                   "org.opencv.test.count_state_setups")
+    {
+        static GOpaqueDesc outMeta(GMatDesc /* in */) { return empty_gopaque_desc(); }
+    };
+
+    GAPI_OCV_KERNEL_ST(GOCVCountStateSetups, GCountStateSetups, int)
+    {
+        static void setup(const cv::GMatDesc &, std::shared_ptr<int> &,
+                          const cv::GCompileArgs &compileArgs)
+        {
+            auto params = cv::gapi::getCompileArg<CountStateSetupsParams>(compileArgs)
+                .value_or(CountStateSetupsParams { });
+            if (params.pSetupsCount != nullptr) {
+                (*params.pSetupsCount)++;
+            }
+        }
+
+        static void run(const cv::Mat & , bool &out, int &)
+        {
+            out = true;
+        }
+    };
+};
+
+TEST(StatefulKernel, StateInitOnceInRegularMode)
+{
+    cv::GMat in;
+    cv::GOpaque<bool> out = GCountStateSetups::on(in);
+    cv::GComputation c(cv::GIn(in), cv::GOut(out));
+
+    // Input mat:
+    cv::Mat inputData(1080, 1920, CV_8UC1);
+    cv::randu(inputData, cv::Scalar::all(1), cv::Scalar::all(128));
+
+    // variable to update when state is initialized in the kernel
+    CountStateSetupsParams params;
+    params.pSetupsCount.reset(new int(0));
+
+    // Testing for 100 frames
+    bool result { };
+    for (int i = 0; i < 100; ++i) {
+        c.apply(cv::gin(inputData), cv::gout(result),
+                cv::compile_args(cv::gapi::kernels<GOCVCountStateSetups>(), params));
+        EXPECT_TRUE(result);
+        EXPECT_TRUE(params.pSetupsCount != nullptr);
+        EXPECT_EQ(1, *params.pSetupsCount);
+    }
 };
 
+struct StateInitOnce : public ::testing::TestWithParam<bool>{};
+TEST_P(StateInitOnce, StreamingCompiledWithMeta)
+{
+    bool compileWithMeta = GetParam();
+    cv::GMat in;
+    cv::GOpaque<bool> out = GCountStateSetups::on(in);
+    cv::GComputation c(cv::GIn(in), cv::GOut(out));
+
+    // Input mat:
+    cv::Mat inputData(1080, 1920, CV_8UC1);
+    cv::randu(inputData, cv::Scalar::all(1), cv::Scalar::all(128));
+
+    // variable to update when state is initialized in the kernel
+    CountStateSetupsParams params;
+    params.pSetupsCount.reset(new int(0));
+
+    // Compilation & testing
+    auto ccomp = (compileWithMeta)
+        ? c.compileStreaming(cv::descr_of(inputData),
+              cv::compile_args(cv::gapi::kernels<GOCVCountStateSetups>(),
+                               params))
+        : c.compileStreaming(
+              cv::compile_args(cv::gapi::kernels<GOCVCountStateSetups>(),
+                               params));
+
+    ccomp.setSource(cv::gin(inputData));
+
+    ccomp.start();
+    EXPECT_TRUE(ccomp.running());
+
+    int counter { };
+    bool result;
+    // Process mat 100 times
+    while (ccomp.pull(cv::gout(result)) && (counter++ < 100)) {
+        EXPECT_TRUE(params.pSetupsCount != nullptr);
+        EXPECT_EQ(1, *params.pSetupsCount);
+    }
+
+    ccomp.stop();
+    EXPECT_FALSE(ccomp.running());
+}
+
+INSTANTIATE_TEST_CASE_P(StatefulKernel, StateInitOnce, ::testing::Bool());
+
 TEST(StatefulKernel, StateIsMutableInRuntime)
 {
     constexpr int expectedCallsCount = 10;
@@ -163,7 +270,43 @@ TEST(StatefulKernel, StateIsMutableInRuntime)
 
 }
 
-TEST(StatefulKernel, StateIsAutoResetForNewStream)
+TEST(StateIsResetOnNewStream, RegularMode)
+{
+    cv::GMat in;
+    cv::GOpaque<bool> out = GCountStateSetups::on(in);
+    cv::GComputation c(cv::GIn(in), cv::GOut(out));
+
+    // Input mat:
+    cv::Mat inputData(1080, 1920, CV_8UC1);
+    cv::randu(inputData, cv::Scalar::all(1), cv::Scalar::all(128));
+
+    // variable to update when state is initialized in the kernel
+    CountStateSetupsParams params;
+    params.pSetupsCount.reset(new int(0));
+
+    auto setupsCounter = c.compile(cv::descr_of(inputData),
+                                   cv::compile_args(cv::gapi::kernels<GOCVCountStateSetups>(),
+                                                    params));
+
+    bool result { };
+    for (int i = 0; i < 2; ++i) {
+        setupsCounter(cv::gin(inputData), cv::gout(result));
+        EXPECT_TRUE(params.pSetupsCount != nullptr);
+        EXPECT_EQ(1, *params.pSetupsCount);
+    }
+
+    EXPECT_TRUE(params.pSetupsCount != nullptr);
+    EXPECT_EQ(1, *params.pSetupsCount);
+    setupsCounter.prepareForNewStream();
+
+    for (int i = 0; i < 2; ++i) {
+        setupsCounter(cv::gin(inputData), cv::gout(result));
+        EXPECT_TRUE(params.pSetupsCount != nullptr);
+        EXPECT_EQ(2, *params.pSetupsCount);
+    }
+}
+
+TEST(StateIsResetOnNewStream, StreamingMode)
 {
     cv::GMat in;
     cv::GOpaque<bool> out = GIsStateUpToDate::on(in);
@@ -387,6 +530,45 @@ TEST(StatefulKernel, StateIsChangedViaCompArgsOnReshape)
     run("cv/video/768x576.avi", "knn");
     run("cv/video/1920x1080.avi", "mog2");
 }
+
+TEST(StatefulKernel, StateIsResetOnceOnReshapeInStreaming)
+{
+    cv::GMat in;
+    cv::GOpaque<bool> out = GCountStateSetups::on(in);
+    cv::GComputation c(cv::GIn(in), cv::GOut(out));
+
+    // variable to update when state is initialized in the kernel
+    CountStateSetupsParams params;
+    params.pSetupsCount.reset(new int(0));
+
+    auto ccomp = c.compileStreaming(
+        cv::compile_args(cv::gapi::kernels<GOCVCountStateSetups>(), params));
+
+    auto run = [&ccomp, &params](const std::string& videoPath, int expectedSetupsCount) {
+        auto path = findDataFile(videoPath);
+        try {
+            ccomp.setSource<cv::gapi::wip::GCaptureSource>(path);
+        } catch(...) {
+            throw SkipTestException("Video file can not be opened");
+        }
+        ccomp.start();
+
+        int frames = 0;
+        bool result = false;
+        while (ccomp.pull(cv::gout(result)) && (frames++ < 10)) {
+            EXPECT_TRUE(result);
+            EXPECT_TRUE(params.pSetupsCount != nullptr);
+            EXPECT_EQ(expectedSetupsCount, *params.pSetupsCount);
+        }
+        ccomp.stop();
+    };
+
+    run("cv/video/768x576.avi", 1);
+    // FIXME: it should be 2, not 3 for expectedSetupsCount here.
+    // With current implemention both GCPUExecutable reshape() and
+    // handleNewStream() call setupKernelStates()
+    run("cv/video/1920x1080.avi", 3);
+}
 #endif
 
 TEST(StatefulKernel, StateIsAutoResetOnReshape)