}
}
+// Pops an item from every input queue and combine it to the final
+// result. Blocks the current thread. Returns true if the vector has
+// been obtained successfully and false if a Stop message has been
+// received. Handles Stop x-queue synchronization gracefully.
+//
+// In fact, the logic behind this method is a little bit more complex.
+// The complexity comes from handling the pipeline termination
+// messages. This version if GStreamerExecutable is running every
+// graph island in its own thread, and threads communicate via bounded
+// (limited in size) queues. Threads poll their input queues in the
+// infinite loops and pass the data to their Island executables when
+// the full input vector (a "stack frame") arrives.
+//
+// If the input stream is over or stop() is called, "Stop" messages
+// are broadcasted in the graph from island to island via queues,
+// starting with the emitters (sources). Since queues are bounded,
+// thread may block on push() if the queue is full already and is not
+// popped for some reason in the reader thread. In order to avoid
+// this, once an Island gets Stop on an input, it start reading all
+// other input queues until it reaches Stop messages there as well.
+// Only then the thread terminates so in theory queues are left
+// free'd.
+//
+// "Stop" messages are sent to the pipeline in these three cases:
+// 1. User has called stop(): a "Stop" message is sent to every input
+// queue.
+// 2. Input video stream has reached its end -- its emitter sends Stop
+// to its readers AND asks constant emitters (emitters attached to
+// const data -- infinite data generators) to push Stop messages as
+// well - in order to maintain a regular Stop procedure as defined
+// above.
+// 3. "Stop" message coming from a constant emitter after triggering an
+// EOS notification -- see (2).
+//
+// There is a problem with (3). Sometimes it terminates the pipeline
+// too early while some frames could still be produced with no issue,
+// and our test fails with error like "got 99 frames, expected 100".
+// This is how it reproduces:
+//
+// q1
+// [const input] -----------------------> [ ISL2 ] --> [output]
+// q0 q2 .->
+// [stream input] ---> [ ISL1 ] -------'
+//
+// Video emitter is pushing frames to q0, and ISL1 is taking every
+// frame from this queue and processes it. Meanwhile, q1 is a
+// const-input-queue staffed with const data already, ISL2 already
+// popped one, and is waiting for data from q2 (of ISL1) to arrive.
+//
+// When the stream is over, stream emitter pushes the last frame to
+// q0, followed by a Stop sign, and _immediately_ notifies const
+// emitters to broadcast Stop messages as well. In the above
+// configuration, the replicated Stop message via q1 may reach ISL2
+// faster than the real Stop message via q2 -- moreover, somewhere in
+// q1 or q2 there may be real frames awaiting processing. ISL2 gets
+// Stop via q1 and _discards_ any pending data coming from q2 -- so a
+// last frame or two may be lost.
+//
+// A working but not very elegant solution to this problem is to tag
+// Stop messages. Stop got via stop() is really a hard stop, while
+// broadcasted Stop from a Const input shouldn't initiate the Island
+// execution termination. Instead, its associated const data should
+// remain somewhere in islands' thread local storage until a real
+// "Stop" is received.
+//
+// Queue reader is the class which encapsulates all this logic and
+// provies threads with a managed storage and an easy API to obtain
+// data.
+class QueueReader
+{
+ bool m_finishing = false; // Set to true once a "soft" stop is received
+ std::vector<Cmd> m_cmd;
+
+public:
+ bool getInputVector(std::vector<Q*> &in_queues,
+ cv::GRunArgs &in_constants,
+ cv::GRunArgs &isl_inputs);
+};
+
+bool QueueReader::getInputVector(std::vector<Q*> &in_queues,
+ cv::GRunArgs &in_constants,
+ cv::GRunArgs &isl_inputs)
+{
+ // NOTE: in order to maintain the GRunArg's underlying object
+ // lifetime, keep the whole cmd vector (of size == # of inputs)
+ // in memory.
+ m_cmd.resize(in_queues.size());
+ isl_inputs.resize(in_queues.size());
+
+ for (auto &&it : ade::util::indexed(in_queues))
+ {
+ auto id = ade::util::index(it);
+ auto &q = ade::util::value(it);
+
+ if (q == nullptr)
+ {
+ GAPI_Assert(!in_constants.empty());
+ // NULL queue means a graph-constant value (like a
+ // value-initialized scalar)
+ // It can also hold a constant value received with
+ // Stop::Kind::CNST message (see above).
+ // FIXME: Variant move problem
+ isl_inputs[id] = const_cast<const cv::GRunArg&>(in_constants[id]);
+ continue;
+ }
+
+ q->pop(m_cmd[id]);
+ if (!cv::util::holds_alternative<Stop>(m_cmd[id]))
+ {
+ // FIXME: Variant move problem
+ isl_inputs[id] = const_cast<const cv::GRunArg &>(cv::util::get<cv::GRunArg>(m_cmd[id]));
+ }
+ else // A Stop sign
+ {
+ const auto &stop = cv::util::get<Stop>(m_cmd[id]);
+ if (stop.kind == Stop::Kind::CNST)
+ {
+ // We've got a Stop signal from a const source,
+ // propagated as a result of real stream reaching its
+ // end. Sometimes these signals come earlier than
+ // real EOS Stops so are deprioritized -- just
+ // remember the Const value here and continue
+ // processing other queues. Set queue pointer to
+ // nullptr and update the const_val vector
+ // appropriately
+ m_finishing = true;
+ in_queues[id] = nullptr;
+ in_constants.resize(in_queues.size());
+ in_constants[id] = std::move(stop.cdata);
+
+ // NEXT time (on a next call to getInputVector()), the
+ // "q==nullptr" check above will be triggered, but now
+ // we need to make it manually:
+ isl_inputs[id] = const_cast<const cv::GRunArg&>(in_constants[id]);
+ }
+ else
+ {
+ GAPI_Assert(stop.kind == Stop::Kind::HARD);
+ // Just got a stop sign. Reiterate through all
+ // _remaining valid_ queues (some of them can be
+ // set to nullptr already -- see above) and rewind
+ // data to every Stop sign per queue
+ for (auto &&qit : ade::util::indexed(in_queues))
+ {
+ auto id2 = ade::util::index(qit);
+ auto &q2 = ade::util::value(qit);
+ if (id == id2) continue;
+
+ Cmd cmd2;
+ while (q2 && !cv::util::holds_alternative<Stop>(cmd2))
+ q2->pop(cmd2);
+ }
+ // After queues are read to the proper indicator,
+ // indicate end-of-stream
+ return false;
+ } // if(Cnst)
+ } // if(Stop)
+ } // for(in_queues)
+
+ if (m_finishing)
+ {
+ // If the process is about to end (a soft Stop was received
+ // already) and an island has no other inputs than constant
+ // inputs, its queues may all become nullptrs. Indicate it as
+ // "no data".
+ return !ade::util::all_of(in_queues, [](Q *ptr){return ptr == nullptr;});
+ }
+ return true; // A regular case - there is data to process.
+}
+
+
// This thread is a plain dump source actor. What it do is just:
// - Check input queue (the only one) for a control command
// - Depending on the state, obtains next data object and pushes it to the
cv::GMetaArgs out_metas, // ...
std::shared_ptr<cv::gimpl::GIslandExecutable> island, // FIXME: ...a copy of OpDesc{}.
std::vector<Q*> in_queues,
- std::vector<cv::GRunArg> in_constants,
+ cv::GRunArgs in_constants,
std::vector< std::vector<Q*> > out_queues)
{
GAPI_Assert(in_queues.size() == in_rcs.size());
GAPI_Assert(out_queues.size() == out_rcs.size());
GAPI_Assert(out_queues.size() == out_metas.size());
+ QueueReader qr;
while (true)
{
std::vector<cv::gimpl::GIslandExecutable::InObj> isl_inputs;
isl_inputs.resize(in_rcs.size());
- // Try to obtain the full input vector.
- // Note this may block us. We also may get Stop signal here
- // and then exit the thread.
- // NOTE: in order to maintain the GRunArg's underlying object
- // lifetime, keep the whole cmd vector (of size == # of inputs)
- // in memory.
- std::vector<Cmd> cmd(in_queues.size());
- for (auto &&it : ade::util::indexed(in_queues))
+ cv::GRunArgs isl_input_args;
+ if (!qr.getInputVector(in_queues, in_constants, isl_input_args))
{
- auto id = ade::util::index(it);
- auto &q = ade::util::value(it);
-
- isl_inputs[id].first = in_rcs[id];
- if (q == nullptr)
+ // Stop received -- broadcast Stop down to the pipeline and quit
+ for (auto &&out_qq : out_queues)
{
- // NULL queue means a graph-constant value
- // (like a value-initialized scalar)
- // FIXME: Variant move problem
- isl_inputs[id].second = const_cast<const cv::GRunArg&>(in_constants[id]);
+ for (auto &&out_q : out_qq) out_q->push(Cmd{Stop{}});
}
- else
- {
- q->pop(cmd[id]);
- if (cv::util::holds_alternative<Stop>(cmd[id]))
- {
- // FIXME: This logic must be unified with what collectorThread is doing!
- // Just got a stop sign. Reiterate through all queues
- // and rewind data to every Stop sign per queue
- for (auto &&qit : ade::util::indexed(in_queues))
- {
- auto id2 = ade::util::index(qit);
- auto &q2 = ade::util::value(qit);
- if (id == id2) continue;
-
- Cmd cmd2;
- while (q2 && !cv::util::holds_alternative<Stop>(cmd2))
- q2->pop(cmd2);
- }
- // Broadcast Stop down to the pipeline and quit
- for (auto &&out_qq : out_queues)
- {
- for (auto &&out_q : out_qq) out_q->push(Cmd{Stop{}});
- }
- return;
- }
- // FIXME: MOVE PROBLEM
- const cv::GRunArg &in_arg = cv::util::get<cv::GRunArg>(cmd[id]);
+ return;
+ }
+ GAPI_Assert(isl_inputs.size() == isl_input_args.size());
+ for (auto &&it : ade::util::zip(ade::util::toRange(in_rcs),
+ ade::util::toRange(isl_inputs),
+ ade::util::toRange(isl_input_args)))
+ {
+ const auto &in_rc = std::get<0>(it);
+ auto &isl_input = std::get<1>(it);
+ const auto &in_arg = std::get<2>(it); // FIXME: MOVE PROBLEM
+ isl_input.first = in_rc;
#if defined(GAPI_STANDALONE)
- // Standalone mode - simply store input argument in the vector as-is
- isl_inputs[id].second = in_arg;
+ // Standalone mode - simply store input argument in the vector as-is
+ isl_inputs[id].second = in_arg;
#else
- // Make Islands operate on own:: data types (i.e. in the same
- // environment as GExecutor provides)
- // This way several backends (e.g. Fluid) remain OpenCV-independent.
- switch (in_arg.index()) {
- case cv::GRunArg::index_of<cv::Mat>():
- isl_inputs[id].second = cv::GRunArg{cv::to_own(cv::util::get<cv::Mat>(in_arg))};
- break;
- case cv::GRunArg::index_of<cv::Scalar>():
- isl_inputs[id].second = cv::GRunArg{cv::to_own(cv::util::get<cv::Scalar>(in_arg))};
- break;
- default:
- isl_inputs[id].second = in_arg;
- break;
- }
-#endif // GAPI_STANDALONE
+ // Make Islands operate on own:: data types (i.e. in the same
+ // environment as GExecutor provides)
+ // This way several backends (e.g. Fluid) remain OpenCV-independent.
+ switch (in_arg.index()) {
+ case cv::GRunArg::index_of<cv::Mat>():
+ isl_input.second = cv::GRunArg{cv::to_own(cv::util::get<cv::Mat>(in_arg))};
+ break;
+ case cv::GRunArg::index_of<cv::Scalar>():
+ isl_input.second = cv::GRunArg{cv::to_own(cv::util::get<cv::Scalar>(in_arg))};
+ break;
+ default:
+ isl_input.second = in_arg;
+ break;
}
+#endif // GAPI_STANDALONE
}
// Once the vector is obtained, prepare data for island execution
// Note - we first allocate output vector via GRunArg!
// Then it is converted to a GRunArgP.
std::vector<cv::gimpl::GIslandExecutable::OutObj> isl_outputs;
- std::vector<cv::GRunArg> out_data;
+ cv::GRunArgs out_data;
isl_outputs.resize(out_rcs.size());
out_data.resize(out_rcs.size());
for (auto &&it : ade::util::indexed(out_rcs))
void collectorThread(std::vector<Q*> in_queues,
Q& out_queue)
{
+ QueueReader qr;
while (true)
{
cv::GRunArgs this_result(in_queues.size());
- for (auto &&it : ade::util::indexed(in_queues))
+ cv::GRunArgs this_const(in_queues.size());
+ if (!qr.getInputVector(in_queues, this_const, this_result))
{
- Cmd cmd;
- ade::util::value(it)->pop(cmd);
- if (cv::util::holds_alternative<Stop>(cmd))
- {
- // FIXME: Unify this code with island thread
- for (auto &&qit : ade::util::indexed(in_queues))
- {
- if (ade::util::index(qit) == ade::util::index(it)) continue;
- Cmd cmd2;
- while (!cv::util::holds_alternative<Stop>(cmd2))
- ade::util::value(qit)->pop(cmd2);
- }
- out_queue.push(Cmd{Stop{}});
- return;
- }
- else
- {
- // FIXME: MOVE_PROBLEM
- const cv::GRunArg &in_arg = cv::util::get<cv::GRunArg>(cmd);
- this_result[ade::util::index(it)] = in_arg;
- // FIXME: Check for other message types.
- }
+ out_queue.push(Cmd{Stop{}});
+ return;
}
out_queue.push(Cmd{this_result});
}
// Create a constant emitter.
// Produces always the same ("constant") value when pulled.
emitter.reset(new ConstEmitter{emit_arg});
+ m_const_vals.push_back(const_cast<cv::GRunArg &>(emit_arg)); // FIXME: move problem
m_const_emitter_queues.push_back(&m_emitter_queues[emit_idx]);
break;
}
// all other inputs are "constant" generators.
// Craft here a completion callback to notify Const emitters that
// a video source is over
+ GAPI_Assert(m_const_emitter_queues.size() == m_const_vals.size());
auto real_video_completion_cb = [this]()
{
- for (auto q : m_const_emitter_queues) q->push(Cmd{Stop{}});
+ for (auto it : ade::util::zip(ade::util::toRange(m_const_emitter_queues),
+ ade::util::toRange(m_const_vals)))
+ {
+ Stop stop;
+ stop.kind = Stop::Kind::CNST;
+ stop.cdata = std::get<1>(it);
+ std::get<0>(it)->push(Cmd{std::move(stop)});
+ }
};
// FIXME: ONLY now, after all executable objects are created,