Merge pull request #16081 from dmatveev:dm/ocv42_gapi_bugfixes
[platform/upstream/opencv.git] / modules / gapi / src / executor / gstreamingexecutor.cpp
index 9444ffb..bf8bdf0 100644 (file)
@@ -132,6 +132,177 @@ void sync_data(cv::GRunArgs &results, cv::GRunArgsP &outputs)
     }
 }
 
+// Pops an item from every input queue and combine it to the final
+// result.  Blocks the current thread.  Returns true if the vector has
+// been obtained successfully and false if a Stop message has been
+// received. Handles Stop x-queue synchronization gracefully.
+//
+// In fact, the logic behind this method is a little bit more complex.
+// The complexity comes from handling the pipeline termination
+// messages.  This version if GStreamerExecutable is running every
+// graph island in its own thread, and threads communicate via bounded
+// (limited in size) queues.  Threads poll their input queues in the
+// infinite loops and pass the data to their Island executables when
+// the full input vector (a "stack frame") arrives.
+//
+// If the input stream is over or stop() is called, "Stop" messages
+// are broadcasted in the graph from island to island via queues,
+// starting with the emitters (sources). Since queues are bounded,
+// thread may block on push() if the queue is full already and is not
+// popped for some reason in the reader thread. In order to avoid
+// this, once an Island gets Stop on an input, it start reading all
+// other input queues until it reaches Stop messages there as well.
+// Only then the thread terminates so in theory queues are left
+// free'd.
+//
+// "Stop" messages are sent to the pipeline in these three cases:
+// 1. User has called stop(): a "Stop" message is sent to every input
+//    queue.
+// 2. Input video stream has reached its end -- its emitter sends Stop
+//    to its readers AND asks constant emitters (emitters attached to
+//    const data -- infinite data generators) to push Stop messages as
+//    well - in order to maintain a regular Stop procedure as defined
+//    above.
+// 3. "Stop" message coming from a constant emitter after triggering an
+//    EOS notification -- see (2).
+//
+// There is a problem with (3). Sometimes it terminates the pipeline
+// too early while some frames could still be produced with no issue,
+// and our test fails with error like "got 99 frames, expected 100".
+// This is how it reproduces:
+//
+//                   q1
+//   [const input]   -----------------------> [ ISL2 ] --> [output]
+//                   q0             q2    .->
+//   [stream input]  ---> [ ISL1 ] -------'
+//
+// Video emitter is pushing frames to q0, and ISL1 is taking every
+// frame from this queue and processes it. Meanwhile, q1 is a
+// const-input-queue staffed with const data already, ISL2 already
+// popped one, and is waiting for data from q2 (of ISL1) to arrive.
+//
+// When the stream is over, stream emitter pushes the last frame to
+// q0, followed by a Stop sign, and _immediately_ notifies const
+// emitters to broadcast Stop messages as well.  In the above
+// configuration, the replicated Stop message via q1 may reach ISL2
+// faster than the real Stop message via q2 -- moreover, somewhere in
+// q1 or q2 there may be real frames awaiting processing. ISL2 gets
+// Stop via q1 and _discards_ any pending data coming from q2 -- so a
+// last frame or two may be lost.
+//
+// A working but not very elegant solution to this problem is to tag
+// Stop messages. Stop got via stop() is really a hard stop, while
+// broadcasted Stop from a Const input shouldn't initiate the Island
+// execution termination. Instead, its associated const data should
+// remain somewhere in islands' thread local storage until a real
+// "Stop" is received.
+//
+// Queue reader is the class which encapsulates all this logic and
+// provies threads with a managed storage and an easy API to obtain
+// data.
+class QueueReader
+{
+    bool m_finishing = false; // Set to true once a "soft" stop is received
+    std::vector<Cmd> m_cmd;
+
+public:
+    bool getInputVector(std::vector<Q*> &in_queues,
+                        cv::GRunArgs    &in_constants,
+                        cv::GRunArgs    &isl_inputs);
+};
+
+bool QueueReader::getInputVector(std::vector<Q*> &in_queues,
+                                 cv::GRunArgs    &in_constants,
+                                 cv::GRunArgs    &isl_inputs)
+{
+    // NOTE: in order to maintain the GRunArg's underlying object
+    // lifetime, keep the whole cmd vector (of size == # of inputs)
+    // in memory.
+    m_cmd.resize(in_queues.size());
+    isl_inputs.resize(in_queues.size());
+
+    for (auto &&it : ade::util::indexed(in_queues))
+    {
+        auto id = ade::util::index(it);
+        auto &q = ade::util::value(it);
+
+        if (q == nullptr)
+        {
+            GAPI_Assert(!in_constants.empty());
+            // NULL queue means a graph-constant value (like a
+            // value-initialized scalar)
+            // It can also hold a constant value received with
+            // Stop::Kind::CNST message (see above).
+            // FIXME: Variant move problem
+            isl_inputs[id] = const_cast<const cv::GRunArg&>(in_constants[id]);
+            continue;
+        }
+
+        q->pop(m_cmd[id]);
+        if (!cv::util::holds_alternative<Stop>(m_cmd[id]))
+        {
+            // FIXME: Variant move problem
+            isl_inputs[id] = const_cast<const cv::GRunArg &>(cv::util::get<cv::GRunArg>(m_cmd[id]));
+        }
+        else // A Stop sign
+        {
+            const auto &stop = cv::util::get<Stop>(m_cmd[id]);
+            if (stop.kind == Stop::Kind::CNST)
+            {
+                // We've got a Stop signal from a const source,
+                // propagated as a result of real stream reaching its
+                // end.  Sometimes these signals come earlier than
+                // real EOS Stops so are deprioritized -- just
+                // remember the Const value here and continue
+                // processing other queues. Set queue pointer to
+                // nullptr and update the const_val vector
+                // appropriately
+                m_finishing = true;
+                in_queues[id] = nullptr;
+                in_constants.resize(in_queues.size());
+                in_constants[id] = std::move(stop.cdata);
+
+                // NEXT time (on a next call to getInputVector()), the
+                // "q==nullptr" check above will be triggered, but now
+                // we need to make it manually:
+                isl_inputs[id] = const_cast<const cv::GRunArg&>(in_constants[id]);
+            }
+            else
+            {
+                GAPI_Assert(stop.kind == Stop::Kind::HARD);
+                // Just got a stop sign. Reiterate through all
+                // _remaining valid_ queues (some of them can be
+                // set to nullptr already -- see above) and rewind
+                // data to every Stop sign per queue
+                for (auto &&qit : ade::util::indexed(in_queues))
+                {
+                    auto id2 = ade::util::index(qit);
+                    auto &q2 = ade::util::value(qit);
+                    if (id == id2) continue;
+
+                    Cmd cmd2;
+                    while (q2 && !cv::util::holds_alternative<Stop>(cmd2))
+                        q2->pop(cmd2);
+                }
+                // After queues are read to the proper indicator,
+                // indicate end-of-stream
+                return false;
+            } // if(Cnst)
+        } // if(Stop)
+    } // for(in_queues)
+
+    if (m_finishing)
+    {
+        // If the process is about to end (a soft Stop was received
+        // already) and an island has no other inputs than constant
+        // inputs, its queues may all become nullptrs. Indicate it as
+        // "no data".
+        return !ade::util::all_of(in_queues, [](Q *ptr){return ptr == nullptr;});
+    }
+    return true; // A regular case - there is data to process.
+}
+
+
 // This thread is a plain dump source actor. What it do is just:
 // - Check input queue (the only one) for a control command
 // - Depending on the state, obtains next data object and pushes it to the
@@ -202,90 +373,62 @@ void islandActorThread(std::vector<cv::gimpl::RcDesc> in_rcs,                //
                        cv::GMetaArgs out_metas,                              // ...
                        std::shared_ptr<cv::gimpl::GIslandExecutable> island, // FIXME: ...a copy of OpDesc{}.
                        std::vector<Q*> in_queues,
-                       std::vector<cv::GRunArg> in_constants,
+                       cv::GRunArgs in_constants,
                        std::vector< std::vector<Q*> > out_queues)
 {
     GAPI_Assert(in_queues.size() == in_rcs.size());
     GAPI_Assert(out_queues.size() == out_rcs.size());
     GAPI_Assert(out_queues.size() == out_metas.size());
+    QueueReader qr;
     while (true)
     {
         std::vector<cv::gimpl::GIslandExecutable::InObj> isl_inputs;
         isl_inputs.resize(in_rcs.size());
 
-        // Try to obtain the full input vector.
-        // Note this may block us. We also may get Stop signal here
-        // and then exit the thread.
-        // NOTE: in order to maintain the GRunArg's underlying object
-        // lifetime, keep the whole cmd vector (of size == # of inputs)
-        // in memory.
-        std::vector<Cmd> cmd(in_queues.size());
-        for (auto &&it : ade::util::indexed(in_queues))
+        cv::GRunArgs isl_input_args;
+        if (!qr.getInputVector(in_queues, in_constants, isl_input_args))
         {
-            auto id = ade::util::index(it);
-            auto &q = ade::util::value(it);
-
-            isl_inputs[id].first  = in_rcs[id];
-            if (q == nullptr)
+            // Stop received -- broadcast Stop down to the pipeline and quit
+            for (auto &&out_qq : out_queues)
             {
-                // NULL queue means a graph-constant value
-                // (like a value-initialized scalar)
-                // FIXME: Variant move problem
-                isl_inputs[id].second = const_cast<const cv::GRunArg&>(in_constants[id]);
+                for (auto &&out_q : out_qq) out_q->push(Cmd{Stop{}});
             }
-            else
-            {
-                q->pop(cmd[id]);
-                if (cv::util::holds_alternative<Stop>(cmd[id]))
-                {
-                    // FIXME: This logic must be unified with what collectorThread is doing!
-                    // Just got a stop sign. Reiterate through all queues
-                    // and rewind data to every Stop sign per queue
-                    for (auto &&qit : ade::util::indexed(in_queues))
-                    {
-                        auto id2 = ade::util::index(qit);
-                        auto &q2 = ade::util::value(qit);
-                        if (id == id2) continue;
-
-                        Cmd cmd2;
-                        while (q2 && !cv::util::holds_alternative<Stop>(cmd2))
-                            q2->pop(cmd2);
-                    }
-                    // Broadcast Stop down to the pipeline and quit
-                    for (auto &&out_qq : out_queues)
-                    {
-                        for (auto &&out_q : out_qq) out_q->push(Cmd{Stop{}});
-                    }
-                    return;
-                }
-                // FIXME: MOVE PROBLEM
-                const cv::GRunArg &in_arg = cv::util::get<cv::GRunArg>(cmd[id]);
+            return;
+        }
+        GAPI_Assert(isl_inputs.size() == isl_input_args.size());
+        for (auto &&it : ade::util::zip(ade::util::toRange(in_rcs),
+                                        ade::util::toRange(isl_inputs),
+                                        ade::util::toRange(isl_input_args)))
+        {
+            const auto &in_rc     = std::get<0>(it);
+            auto       &isl_input = std::get<1>(it);
+            const auto &in_arg    = std::get<2>(it); // FIXME: MOVE PROBLEM
+            isl_input.first = in_rc;
 #if defined(GAPI_STANDALONE)
-                // Standalone mode - simply store input argument in the vector as-is
-                isl_inputs[id].second = in_arg;
+            // Standalone mode - simply store input argument in the vector as-is
+            isl_inputs[id].second = in_arg;
 #else
-                // Make Islands operate on own:: data types (i.e. in the same
-                // environment as GExecutor provides)
-                // This way several backends (e.g. Fluid) remain OpenCV-independent.
-                switch (in_arg.index()) {
-                case cv::GRunArg::index_of<cv::Mat>():
-                    isl_inputs[id].second = cv::GRunArg{cv::to_own(cv::util::get<cv::Mat>(in_arg))};
-                    break;
-                case cv::GRunArg::index_of<cv::Scalar>():
-                    isl_inputs[id].second = cv::GRunArg{cv::to_own(cv::util::get<cv::Scalar>(in_arg))};
-                    break;
-                default:
-                    isl_inputs[id].second = in_arg;
-                    break;
-                }
-#endif // GAPI_STANDALONE
+            // Make Islands operate on own:: data types (i.e. in the same
+            // environment as GExecutor provides)
+            // This way several backends (e.g. Fluid) remain OpenCV-independent.
+            switch (in_arg.index()) {
+            case cv::GRunArg::index_of<cv::Mat>():
+                isl_input.second = cv::GRunArg{cv::to_own(cv::util::get<cv::Mat>(in_arg))};
+                break;
+            case cv::GRunArg::index_of<cv::Scalar>():
+                isl_input.second = cv::GRunArg{cv::to_own(cv::util::get<cv::Scalar>(in_arg))};
+                break;
+            default:
+                isl_input.second = in_arg;
+                break;
             }
+#endif // GAPI_STANDALONE
         }
         // Once the vector is obtained, prepare data for island execution
         // Note - we first allocate output vector via GRunArg!
         // Then it is converted to a GRunArgP.
         std::vector<cv::gimpl::GIslandExecutable::OutObj> isl_outputs;
-        std::vector<cv::GRunArg> out_data;
+        cv::GRunArgs out_data;
         isl_outputs.resize(out_rcs.size());
         out_data.resize(out_rcs.size());
         for (auto &&it : ade::util::indexed(out_rcs))
@@ -363,33 +506,15 @@ void islandActorThread(std::vector<cv::gimpl::RcDesc> in_rcs,                //
 void collectorThread(std::vector<Q*> in_queues,
                      Q&              out_queue)
 {
+    QueueReader qr;
     while (true)
     {
         cv::GRunArgs this_result(in_queues.size());
-        for (auto &&it : ade::util::indexed(in_queues))
+        cv::GRunArgs this_const(in_queues.size());
+        if (!qr.getInputVector(in_queues, this_const, this_result))
         {
-            Cmd cmd;
-            ade::util::value(it)->pop(cmd);
-            if (cv::util::holds_alternative<Stop>(cmd))
-            {
-                // FIXME: Unify this code with island thread
-                for (auto &&qit : ade::util::indexed(in_queues))
-                {
-                    if (ade::util::index(qit) == ade::util::index(it)) continue;
-                    Cmd cmd2;
-                    while (!cv::util::holds_alternative<Stop>(cmd2))
-                        ade::util::value(qit)->pop(cmd2);
-                }
-                out_queue.push(Cmd{Stop{}});
-                return;
-            }
-            else
-            {
-                // FIXME: MOVE_PROBLEM
-                const cv::GRunArg &in_arg = cv::util::get<cv::GRunArg>(cmd);
-                this_result[ade::util::index(it)] = in_arg;
-                // FIXME: Check for other message types.
-            }
+            out_queue.push(Cmd{Stop{}});
+            return;
         }
         out_queue.push(Cmd{this_result});
     }
@@ -654,6 +779,7 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
             // Create a constant emitter.
             // Produces always the same ("constant") value when pulled.
             emitter.reset(new ConstEmitter{emit_arg});
+            m_const_vals.push_back(const_cast<cv::GRunArg &>(emit_arg)); // FIXME: move problem
             m_const_emitter_queues.push_back(&m_emitter_queues[emit_idx]);
             break;
         }
@@ -664,9 +790,17 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
     // all other inputs are "constant" generators.
     // Craft here a completion callback to notify Const emitters that
     // a video source is over
+    GAPI_Assert(m_const_emitter_queues.size() == m_const_vals.size());
     auto real_video_completion_cb = [this]()
     {
-        for (auto q : m_const_emitter_queues) q->push(Cmd{Stop{}});
+        for (auto it : ade::util::zip(ade::util::toRange(m_const_emitter_queues),
+                                      ade::util::toRange(m_const_vals)))
+        {
+            Stop stop;
+            stop.kind = Stop::Kind::CNST;
+            stop.cdata = std::get<1>(it);
+            std::get<0>(it)->push(Cmd{std::move(stop)});
+        }
     };
 
     // FIXME: ONLY now, after all executable objects are created,