1 // This file is part of OpenCV project.
2 // It is subject to the license terms in the LICENSE file found in the top-level directory
3 // of this distribution and at http://opencv.org/license.html.
5 // Copyright (C) 2019 Intel Corporation
11 #include <ade/util/zip_range.hpp>
13 #include <opencv2/gapi/opencv_includes.hpp>
15 #include "executor/gstreamingexecutor.hpp"
16 #include "compiler/passes/passes.hpp"
17 #include "backends/common/gbackend.hpp" // createMat
18 #include "compiler/gcompiler.hpp" // for compileIslands
22 using namespace cv::gimpl::stream;
24 #if !defined(GAPI_STANDALONE)
// VideoEmitter: GIslandEmitter implementation backed by a user-supplied
// stream source (cv::gapi::wip::IStreamSource). Every pull() fetches the
// next data object from the source; pull() reports false when the source
// is exhausted (end-of-stream). Only available in non-standalone builds.
25 class VideoEmitter final: public cv::gimpl::GIslandEmitter {
26 cv::gapi::wip::IStreamSource::Ptr src;
28 virtual bool pull(cv::GRunArg &arg) override {
29 // FIXME: probably we can maintain a pool of (then) pre-allocated
30 // buffers to avoid runtime allocations.
31 // Pool size can be determined given the internal queue size.
32 cv::gapi::wip::Data newData;
33 if (!src->pull(newData)) {
// Data derives from GRunArg, so slicing-cast + move hands the payload over.
36 arg = std::move(static_cast<cv::GRunArg&>(newData));
// Constructor expects the GRunArg to hold an IStreamSource::Ptr
// (the caller has already dispatched on the variant index).
40 explicit VideoEmitter(const cv::GRunArg &arg) {
41 src = cv::util::get<cv::gapi::wip::IStreamSource::Ptr>(arg);
44 #endif // GAPI_STANDALONE
// ConstEmitter: GIslandEmitter which produces the same stored value
// (captured at construction) on every pull(). Used for graph inputs
// which are plain values rather than streams.
46 class ConstEmitter final: public cv::gimpl::GIslandEmitter {
49 virtual bool pull(cv::GRunArg &arg) override {
50 arg = const_cast<const cv::GRunArg&>(m_arg); // FIXME: variant workaround
55 explicit ConstEmitter(const cv::GRunArg &arg) : m_arg(arg) {
// DataQueue (ade metadata type): wraps a single bounded stream queue (Q)
// attached to a graph edge. name() is the ade metadata identifier;
// the constructor caps the queue at the given capacity to provide
// back-pressure between pipeline stages.
60 static const char *name() { return "StreamingDataQueue"; }
62 explicit DataQueue(std::size_t capacity) {
64 q.set_capacity(capacity);
68 cv::gimpl::stream::Q q;
// Collects the queues of all consumers of a data node: one DataQueue
// per outgoing edge of `obj`. Returns non-owning pointers into the
// graph's edge metadata.
71 std::vector<cv::gimpl::stream::Q*> reader_queues( ade::Graph &g,
72 const ade::NodeHandle &obj)
74 ade::TypedGraph<DataQueue> qgr(g);
75 std::vector<cv::gimpl::stream::Q*> result;
76 for (auto &&out_eh : obj->outEdges())
78 result.push_back(&qgr.metadata(out_eh).get<DataQueue>().q);
// Collects the input queues of a node: one entry per incoming edge.
// An edge without DataQueue metadata (a compile-time constant input)
// yields a null entry — consumers treat null as "graph-constant value".
83 std::vector<cv::gimpl::stream::Q*> input_queues( ade::Graph &g,
84 const ade::NodeHandle &obj)
86 ade::TypedGraph<DataQueue> qgr(g);
87 std::vector<cv::gimpl::stream::Q*> result;
88 for (auto &&in_eh : obj->inEdges())
90 result.push_back(qgr.metadata(in_eh).contains<DataQueue>()
91 ? &qgr.metadata(in_eh).get<DataQueue>().q
// Moves a vector of produced results into the user-provided output
// pointers (GRunArgsP), dispatching on the output's variant index.
// Both ranges are walked pairwise; each case moves the matching
// alternative from res_obj into the object out_obj points to.
97 void sync_data(cv::GRunArgs &results, cv::GRunArgsP &outputs)
99 namespace own = cv::gapi::own;
101 for (auto && it : ade::util::zip(ade::util::toRange(outputs),
102 ade::util::toRange(results)))
104 auto &out_obj = std::get<0>(it);
105 auto &res_obj = std::get<1>(it);
107 // FIXME: this conversion should be unified
108 using T = cv::GRunArgP;
109 switch (out_obj.index())
111 #if !defined(GAPI_STANDALONE)
112 case T::index_of<cv::Mat*>():
113 *cv::util::get<cv::Mat*>(out_obj) = std::move(cv::util::get<cv::Mat>(res_obj));
115 case T::index_of<cv::Scalar*>():
116 *cv::util::get<cv::Scalar*>(out_obj) = std::move(cv::util::get<cv::Scalar>(res_obj));
118 #endif // !GAPI_STANDALONE
119 case T::index_of<own::Mat*>():
120 *cv::util::get<own::Mat*>(out_obj) = std::move(cv::util::get<own::Mat>(res_obj));
122 case T::index_of<own::Scalar*>():
123 *cv::util::get<own::Scalar*>(out_obj) = std::move(cv::util::get<own::Scalar>(res_obj));
125 case T::index_of<cv::detail::VectorRef>():
// VectorRef is reference-counted, so mov() transfers the payload in-place.
126 cv::util::get<cv::detail::VectorRef>(out_obj).mov(cv::util::get<cv::detail::VectorRef>(res_obj));
129 GAPI_Assert(false && "This value type is not supported!"); // ...maybe because of STANDALONE mode.
135 // This thread is a plain dumb source actor. What it does is just:
136 // - Check input queue (the only one) for a control command
137 // - Depending on the state, obtains next data object and pushes it to the
// Parameters: the emitter to pull data from, the (single) control queue,
// the consumers' queues to broadcast into, and an optional callback
// fired once on end-of-stream.
139 void emitterActorThread(std::shared_ptr<cv::gimpl::GIslandEmitter> emitter,
141 std::vector<Q*> out_queues,
142 std::function<void()> cb_completion)
144 // Wait for the explicit Start command.
145 // ...or Stop command, this also happens.
148 GAPI_Assert( cv::util::holds_alternative<Start>(cmd)
149 || cv::util::holds_alternative<Stop>(cmd));
150 if (cv::util::holds_alternative<Stop>(cmd))
// Stop before Start: just propagate it downstream and exit.
152 for (auto &&oq : out_queues) oq->push(cmd);
156 // Now start emitting the data from the source to the pipeline.
// Non-blocking check: a Stop may arrive at any time during emission.
160 if (in_queue.try_pop(cancel))
162 // if we just popped a cancellation command...
163 GAPI_Assert(cv::util::holds_alternative<Stop>(cancel));
164 // Broadcast it to the readers and quit.
165 for (auto &&oq : out_queues) oq->push(cancel);
169 // Try to obtain next data chunk from the source
171 if (emitter->pull(data))
173 // // On success, broadcast it to our readers
174 for (auto &&oq : out_queues)
176 // FIXME: FOR SOME REASON, oq->push(Cmd{data}) doesn't work!!
177 // empty mats arrive to the receivers!
178 // There may be a fatal bug in our variant!
179 const auto tmp = data;
185 // Otherwise, broadcast STOP message to our readers and quit.
186 // This usually means end-of-stream, so trigger a callback
187 for (auto &&oq : out_queues) oq->push(Cmd{Stop{}});
188 if (cb_completion) cb_completion();
194 // This thread is a plain dumb processing actor. What it does is just:
195 // - Reads input from the input queue(s), sleeps if there's nothing to read
196 // - Once a full input vector is obtained, passes it to the underlying island
197 // executable for processing.
198 // - Pushes processing results down to consumers - to the subsequent queues.
199 // Note: Every data object consumer has its own queue.
200 void islandActorThread(std::vector<cv::gimpl::RcDesc> in_rcs, // FIXME: this is...
201 std::vector<cv::gimpl::RcDesc> out_rcs, // FIXME: ...basically just...
202 cv::GMetaArgs out_metas, // ...
203 std::shared_ptr<cv::gimpl::GIslandExecutable> island, // FIXME: ...a copy of OpDesc{}.
204 std::vector<Q*> in_queues,
205 std::vector<cv::GRunArg> in_constants,
206 std::vector< std::vector<Q*> > out_queues)
// Invariants: one input queue per input object; for every output object,
// one list of reader queues and one meta descriptor.
208 GAPI_Assert(in_queues.size() == in_rcs.size());
209 GAPI_Assert(out_queues.size() == out_rcs.size());
210 GAPI_Assert(out_queues.size() == out_metas.size());
213 std::vector<cv::gimpl::GIslandExecutable::InObj> isl_inputs;
214 isl_inputs.resize(in_rcs.size());
216 // Try to obtain the full input vector.
217 // Note this may block us. We also may get Stop signal here
218 // and then exit the thread.
219 // NOTE: in order to maintain the GRunArg's underlying object
220 // lifetime, keep the whole cmd vector (of size == # of inputs)
222 std::vector<Cmd> cmd(in_queues.size());
223 for (auto &&it : ade::util::indexed(in_queues))
225 auto id = ade::util::index(it);
226 auto &q = ade::util::value(it);
228 isl_inputs[id].first = in_rcs[id];
231 // NULL queue means a graph-constant value
232 // (like a value-initialized scalar)
233 // FIXME: Variant move problem
234 isl_inputs[id].second = const_cast<const cv::GRunArg&>(in_constants[id]);
239 if (cv::util::holds_alternative<Stop>(cmd[id]))
241 // FIXME: This logic must be unified with what collectorThread is doing!
242 // Just got a stop sign. Reiterate through all queues
243 // and rewind data to every Stop sign per queue
244 for (auto &&qit : ade::util::indexed(in_queues))
246 auto id2 = ade::util::index(qit);
247 auto &q2 = ade::util::value(qit);
248 if (id == id2) continue;
// Drain every other (non-null) queue until its own Stop is consumed,
// so no stale data remains when this actor exits.
251 while (q2 && !cv::util::holds_alternative<Stop>(cmd2))
254 // Broadcast Stop down to the pipeline and quit
255 for (auto &&out_qq : out_queues)
257 for (auto &&out_q : out_qq) out_q->push(Cmd{Stop{}});
261 // FIXME: MOVE PROBLEM
262 const cv::GRunArg &in_arg = cv::util::get<cv::GRunArg>(cmd[id]);
263 #if defined(GAPI_STANDALONE)
264 // Standalone mode - simply store input argument in the vector as-is
265 isl_inputs[id].second = in_arg;
267 // Make Islands operate on own:: data types (i.e. in the same
268 // environment as GExecutor provides)
269 // This way several backends (e.g. Fluid) remain OpenCV-independent.
270 switch (in_arg.index()) {
271 case cv::GRunArg::index_of<cv::Mat>():
272 isl_inputs[id].second = cv::GRunArg{cv::to_own(cv::util::get<cv::Mat>(in_arg))};
274 case cv::GRunArg::index_of<cv::Scalar>():
275 isl_inputs[id].second = cv::GRunArg{cv::to_own(cv::util::get<cv::Scalar>(in_arg))};
278 isl_inputs[id].second = in_arg;
281 #endif // GAPI_STANDALONE
284 // Once the vector is obtained, prepare data for island execution
285 // Note - we first allocate output vector via GRunArg!
286 // Then it is converted to a GRunArgP.
287 std::vector<cv::gimpl::GIslandExecutable::OutObj> isl_outputs;
288 std::vector<cv::GRunArg> out_data;
289 isl_outputs.resize(out_rcs.size());
290 out_data.resize(out_rcs.size());
291 for (auto &&it : ade::util::indexed(out_rcs))
293 auto id = ade::util::index(it);
294 auto &r = ade::util::value(it);
// Pick cv:: vs own:: types to match what islands operate on in each build mode.
296 #if !defined(GAPI_STANDALONE)
297 using MatType = cv::Mat;
298 using SclType = cv::Scalar;
300 using MatType = cv::gapi::own::Mat;
301 using SclType = cv::gapi::own::Scalar;
302 #endif // GAPI_STANDALONE
305 // Allocate a data object based on its shape & meta, and put it into our vectors.
306 // Yes, first we put a cv::Mat GRunArg, and then specify _THAT_
307 // pointer as an output parameter - to make sure that after island completes,
308 // our GRunArg still has the right (up-to-date) value.
309 // Same applies to other types.
310 // FIXME: This is absolutely ugly but seem to work perfectly for its purpose.
311 case cv::GShape::GMAT:
314 cv::gimpl::createMat(cv::util::get<cv::GMatDesc>(out_metas[id]), newMat);
315 out_data[id] = cv::GRunArg(std::move(newMat));
316 isl_outputs[id] = { r, cv::GRunArgP(&cv::util::get<MatType>(out_data[id])) };
319 case cv::GShape::GSCALAR:
322 out_data[id] = cv::GRunArg(std::move(newScl));
323 isl_outputs[id] = { r, cv::GRunArgP(&cv::util::get<SclType>(out_data[id])) };
326 case cv::GShape::GARRAY:
// GArray needs its host-side constructor (stored in RcDesc::ctor) to
// build a correctly-typed VectorRef.
328 cv::detail::VectorRef newVec;
329 cv::util::get<cv::detail::ConstructVec>(r.ctor)(newVec);
330 out_data[id] = cv::GRunArg(std::move(newVec));
331 // VectorRef is implicitly shared so no pointer is taken here
332 const auto &rr = cv::util::get<cv::detail::VectorRef>(out_data[id]); // FIXME: that variant MOVE problem again
333 isl_outputs[id] = { r, cv::GRunArgP(rr) };
337 cv::util::throw_error(std::logic_error("Unsupported GShape"));
341 // Now ask Island to execute on this data
342 island->run(std::move(isl_inputs), std::move(isl_outputs));
344 // Once executed, dispatch our results down to the pipeline.
345 for (auto &&it : ade::util::zip(ade::util::toRange(out_queues),
346 ade::util::toRange(out_data)))
// Each output object may have several readers — push a copy to each queue.
348 for (auto &&q : std::get<0>(it))
350 // FIXME: FATAL VARIANT ISSUE!!
351 const auto tmp = std::get<1>(it);
358 // The idea of collectorThread is easy. If there're multiple outputs
359 // in the graph, we need to pull an object from every associated queue
360 // and then put the resulting vector into one single queue. While it
361 // looks redundant, it simplifies dramatically the way how try_pull()
362 // is implemented - we need to check one queue instead of many.
363 void collectorThread(std::vector<Q*> in_queues,
// Assemble one result vector per iteration: one element per sink queue.
368 cv::GRunArgs this_result(in_queues.size());
369 for (auto &&it : ade::util::indexed(in_queues))
372 ade::util::value(it)->pop(cmd);
373 if (cv::util::holds_alternative<Stop>(cmd))
375 // FIXME: Unify this code with island thread
// Drain the remaining queues up to their Stop markers before quitting,
// then forward a single Stop to the output queue.
376 for (auto &&qit : ade::util::indexed(in_queues))
378 if (ade::util::index(qit) == ade::util::index(it)) continue;
380 while (!cv::util::holds_alternative<Stop>(cmd2))
381 ade::util::value(qit)->pop(cmd2);
383 out_queue.push(Cmd{Stop{}});
388 // FIXME: MOVE_PROBLEM
389 const cv::GRunArg &in_arg = cv::util::get<cv::GRunArg>(cmd);
390 this_result[ade::util::index(it)] = in_arg;
391 // FIXME: Check for other message types.
394 out_queue.push(Cmd{this_result});
397 } // anonymous namespace
// Constructor: takes ownership of the compiled ade graph, extracts the
// island model, and builds the static execution structures: emitter/sink
// slots sized by the protocol, per-edge bounded data queues, and the
// per-island OpDesc records (in topological order) later used to spawn
// worker threads in setSource().
399 cv::gimpl::GStreamingExecutor::GStreamingExecutor(std::unique_ptr<ade::Graph> &&g_model)
400 : m_orig_graph(std::move(g_model))
401 , m_island_graph(GModel::Graph(*m_orig_graph).metadata()
402 .get<IslandModel>().model)
403 , m_gim(*m_island_graph)
405 GModel::Graph gm(*m_orig_graph);
406 // NB: Right now GIslandModel is acyclic, and all the below code assumes that.
407 // NB: This naive execution code is taken from GExecutor nearly "as-is"
409 const auto proto = gm.metadata().get<Protocol>();
410 m_emitters .resize(proto.in_nhs.size());
411 m_emitter_queues.resize(proto.in_nhs.size());
412 m_sinks .resize(proto.out_nhs.size());
413 m_sink_queues .resize(proto.out_nhs.size());
415 // Very rough estimation to limit internal queue sizes.
416 // Pipeline depth is equal to number of its (pipeline) steps.
417 const auto queue_capacity = std::count_if
418 (m_gim.nodes().begin(),
420 [&](ade::NodeHandle nh) {
421 return m_gim.metadata(nh).get<NodeKind>().k == NodeKind::ISLAND;
424 // If metadata was not passed to compileStreaming, Islands are not compiled at this point.
425 // It is fine -- Islands are then compiled in setSource (at the first valid call).
426 const bool islands_compiled = m_gim.metadata().contains<IslandsCompiled>();
428 auto sorted = m_gim.metadata().get<ade::passes::TopologicalSortData>();
429 for (auto nh : sorted.nodes())
431 switch (m_gim.metadata(nh).get<NodeKind>().k)
433 case NodeKind::ISLAND:
// For an island node: gather its input/output resource descriptors,
// output metas, and graph-constant inputs into an OpDesc.
435 std::vector<RcDesc> input_rcs;
436 std::vector<RcDesc> output_rcs;
437 std::vector<GRunArg> in_constants;
438 cv::GMetaArgs output_metas;
439 input_rcs.reserve(nh->inNodes().size());
440 in_constants.reserve(nh->inNodes().size()); // FIXME: Ugly
441 output_rcs.reserve(nh->outNodes().size());
442 output_metas.reserve(nh->outNodes().size());
// Remembers input slots fed by compile-time constants — those edges
// get no data queue below.
444 std::unordered_set<ade::NodeHandle, ade::HandleHasher<ade::Node> > const_ins;
446 // FIXME: THIS ORDER IS IRRELEVANT TO PROTOCOL OR ANY OTHER ORDER!
447 // FIXME: SAME APPLIES TO THE REGULAR GEEXECUTOR!!
448 auto xtract_in = [&](ade::NodeHandle slot_nh, std::vector<RcDesc> &vec)
450 const auto orig_data_nh
451 = m_gim.metadata(slot_nh).get<DataSlot>().original_data_node;
452 const auto &orig_data_info
453 = gm.metadata(orig_data_nh).get<Data>();
454 if (orig_data_info.storage == Data::Storage::CONST_VAL) {
455 const_ins.insert(slot_nh);
456 // FIXME: Variant move issue
457 in_constants.push_back(const_cast<const cv::GRunArg&>(gm.metadata(orig_data_nh).get<ConstValue>().arg));
458 } else in_constants.push_back(cv::GRunArg{}); // FIXME: Make it in some smarter way pls
459 if (orig_data_info.shape == GShape::GARRAY) {
460 // FIXME: GArray lost host constructor problem
461 GAPI_Assert(!cv::util::holds_alternative<cv::util::monostate>(orig_data_info.ctor));
463 vec.emplace_back(RcDesc{ orig_data_info.rc
464 , orig_data_info.shape
465 , orig_data_info.ctor});
467 auto xtract_out = [&](ade::NodeHandle slot_nh, std::vector<RcDesc> &vec, cv::GMetaArgs &metas)
469 const auto orig_data_nh
470 = m_gim.metadata(slot_nh).get<DataSlot>().original_data_node;
471 const auto &orig_data_info
472 = gm.metadata(orig_data_nh).get<Data>();
473 if (orig_data_info.shape == GShape::GARRAY) {
474 // FIXME: GArray lost host constructor problem
475 GAPI_Assert(!cv::util::holds_alternative<cv::util::monostate>(orig_data_info.ctor));
477 vec.emplace_back(RcDesc{ orig_data_info.rc
478 , orig_data_info.shape
479 , orig_data_info.ctor});
480 metas.emplace_back(orig_data_info.meta);
482 // FIXME: JEZ IT WAS SO AWFUL!!!!
483 for (auto in_slot_nh : nh->inNodes()) xtract_in(in_slot_nh, input_rcs);
484 for (auto out_slot_nh : nh->outNodes()) xtract_out(out_slot_nh, output_rcs, output_metas);
// If islands were already compiled, pick the executable now; otherwise
// it stays unset until setSource() compiles them.
486 std::shared_ptr<GIslandExecutable> isl_exec = islands_compiled
487 ? m_gim.metadata(nh).get<IslandExec>().object
489 m_ops.emplace_back(OpDesc{ std::move(input_rcs)
490 , std::move(output_rcs)
491 , std::move(output_metas)
496 // Initialize queues for every operation's input
497 ade::TypedGraph<DataQueue> qgr(*m_island_graph);
498 for (auto eh : nh->inEdges())
500 // ...only if the data is not compile-const
501 if (const_ins.count(eh->srcNode()) == 0) {
502 qgr.metadata(eh).set(DataQueue(queue_capacity));
503 m_internal_queues.insert(&qgr.metadata(eh).get<DataQueue>().q);
// SLOT node: record the data slot and its originating data node.
510 const auto orig_data_nh
511 = m_gim.metadata(nh).get<DataSlot>().original_data_node;
512 m_slots.emplace_back(DataDesc{nh, orig_data_nh});
// EMITTER node: store its handle at the protocol-defined input index.
517 const auto emitter_idx
518 = m_gim.metadata(nh).get<Emitter>().proto_index;
519 GAPI_Assert(emitter_idx < m_emitters.size());
520 m_emitters[emitter_idx] = nh;
// SINK node: store its handle at the protocol-defined output index.
526 = m_gim.metadata(nh).get<Sink>().proto_index;
527 GAPI_Assert(sink_idx < m_sinks.size());
528 m_sinks[sink_idx] = nh;
530 // Also initialize Sink's input queue
531 ade::TypedGraph<DataQueue> qgr(*m_island_graph);
532 GAPI_Assert(nh->inEdges().size() == 1u);
533 qgr.metadata(nh->inEdges().front()).set(DataQueue(queue_capacity));
534 m_sink_queues[sink_idx] = &qgr.metadata(nh->inEdges().front()).get<DataQueue>().q;
// The final (collector) queue is bounded the same way as internal ones.
542 m_out_queue.set_capacity(queue_capacity);
// Destructor: if the pipeline was left READY or RUNNING, shut it down
// before the member queues/threads are destroyed.
545 cv::gimpl::GStreamingExecutor::~GStreamingExecutor()
547 if (state == State::READY || state == State::RUNNING)
// Binds concrete input arguments (values and/or a single stream source)
// to the pipeline: validates input count/kind, completes or reshapes
// island compilation for the inputs' metadata, creates the emitters, and
// (on the first valid call) spawns all worker threads. Leaves the
// executor in State::READY.
551 void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
553 GAPI_Assert(state == State::READY || state == State::STOPPED);
555 const auto is_video = [](const GRunArg &arg)
557 return util::holds_alternative<cv::gapi::wip::IStreamSource::Ptr>(arg);
559 const auto num_videos = std::count_if(ins.begin(), ins.end(), is_video);
562 // See below why (another reason - no documented behavior
563 // on handling videos streams of different length)
564 util::throw_error(std::logic_error("Only one video source is"
565 " currently supported!"));
568 GModel::ConstGraph gm(*m_orig_graph);
569 // Now the tricky-part: completing Islands compilation if compileStreaming
570 // has been called without meta arguments.
571 // The logic is basically the following:
572 // - (0) Collect metadata from input vector;
573 // - (1) If graph is compiled with meta
574 // - (2) Just check if the passed objects have correct meta.
576 // - (4) Run metadata inference;
577 // - (5) If islands are not compiled at this point OR are not reshapeable:
578 // - (6) Compile them for a first time with this meta;
579 // - (7) Update internal structures with this island information
581 // - (9) Reshape islands to this new metadata.
582 // - (10) Update internal structures again
// Refreshes every OpDesc's out_metas from the (re)inferred graph metadata.
583 const auto update_int_metas = [&]()
585 for (auto& op : m_ops)
587 op.out_metas.resize(0);
588 for (auto out_slot_nh : op.nh->outNodes())
590 const auto &orig_nh = m_gim.metadata(out_slot_nh).get<DataSlot>().original_data_node;
591 const auto &orig_info = gm.metadata(orig_nh).get<Data>();
592 op.out_metas.emplace_back(orig_info.meta);
596 const auto new_meta = cv::descr_of(ins); // 0
597 if (gm.metadata().contains<OriginalInputMeta>()) // (1)
599 // NB: Metadata is tested in setSource() already - just put an assert here
600 GAPI_Assert(new_meta == gm.metadata().get<OriginalInputMeta>().inputMeta); // (2)
604 GCompiler::runMetaPasses(*m_orig_graph.get(), new_meta); // (4)
605 if (!m_gim.metadata().contains<IslandsCompiled>()
606 || (m_reshapable.has_value() && m_reshapable.value() == false)) // (5)
608 bool is_reshapable = true;
609 GCompiler::compileIslands(*m_orig_graph.get(), m_comp_args); // (6)
610 for (auto& op : m_ops)
612 op.isl_exec = m_gim.metadata(op.nh).get<IslandExec>().object;
// The whole pipeline is reshapeable only if every island is.
613 is_reshapable &= op.isl_exec->canReshape();
615 update_int_metas(); // (7)
616 m_reshapable = util::make_optional(is_reshapable);
620 for (auto& op : m_ops)
622 op.isl_exec->reshape(*m_orig_graph, m_comp_args); // (9)
624 update_int_metas(); // (10)
627 // Metadata handling is done!
629 // Walk through the protocol, set-up emitters appropriately
630 // There's a 1:1 mapping between emitters and corresponding data inputs.
631 for (auto it : ade::util::zip(ade::util::toRange(m_emitters),
632 ade::util::toRange(ins),
633 ade::util::iota(m_emitters.size())))
635 auto emit_nh = std::get<0>(it);
636 auto& emit_arg = std::get<1>(it);
637 auto emit_idx = std::get<2>(it);
638 auto& emitter = m_gim.metadata(emit_nh).get<Emitter>().object;
641 switch (emit_arg.index())
643 // Create a streaming emitter.
644 // Produces the next video frame when pulled.
645 case T::index_of<cv::gapi::wip::IStreamSource::Ptr>():
646 #if !defined(GAPI_STANDALONE)
647 emitter.reset(new VideoEmitter{emit_arg});
649 util::throw_error(std::logic_error("Video is not supported in the "
654 // Create a constant emitter.
655 // Produces always the same ("constant") value when pulled.
656 emitter.reset(new ConstEmitter{emit_arg});
// Track const-emitter control queues so they can be stopped when the
// real video source ends (see real_video_completion_cb below).
657 m_const_emitter_queues.push_back(&m_emitter_queues[emit_idx]);
662 // FIXME: The below code assumes our graph may have only one
663 // real video source (and so, only one stream which may really end)
664 // all other inputs are "constant" generators.
665 // Craft here a completion callback to notify Const emitters that
666 // a video source is over
667 auto real_video_completion_cb = [this]()
669 for (auto q : m_const_emitter_queues) q->push(Cmd{Stop{}});
672 // FIXME: ONLY now, after all executable objects are created,
673 // we can set up our execution threads. Let's do it.
674 // First create threads for all the emitters.
675 // FIXME: One way to avoid this may be including an Emitter object as a part of
676 // START message. Why not?
677 if (state == State::READY)
682 for (auto it : ade::util::indexed(m_emitters))
684 const auto id = ade::util::index(it); // = index in GComputation's protocol
685 const auto eh = ade::util::value(it);
687 // Prepare emitter thread parameters
688 auto emitter = m_gim.metadata(eh).get<Emitter>().object;
690 // Collect all reader queues from the emitter's only output object
691 auto out_queues = reader_queues(*m_island_graph, eh->outNodes().front());
693 m_threads.emplace_back(emitterActorThread,
695 std::ref(m_emitter_queues[id]),
697 real_video_completion_cb);
701 // Now do this for every island (in a topological order)
702 for (auto &&op : m_ops)
704 // Prepare island thread parameters
705 auto island = m_gim.metadata(op.nh).get<IslandExec>().object;
707 // Collect actor's input queues
708 auto in_queues = input_queues(*m_island_graph, op.nh);
710 // Collect actor's output queues.
711 // This may be tricky...
712 std::vector< std::vector<stream::Q*> > out_queues;
713 for (auto &&out_eh : op.nh->outNodes()) {
714 out_queues.push_back(reader_queues(*m_island_graph, out_eh));
717 m_threads.emplace_back(islandActorThread,
727 // Finally, start a collector thread.
728 m_threads.emplace_back(collectorThread,
730 std::ref(m_out_queue));
731 state = State::READY;
// Transitions READY -> RUNNING by sending Start to every emitter's
// control queue. Throws if the pipeline was stopped and setSource()
// has not been called again.
734 void cv::gimpl::GStreamingExecutor::start()
736 if (state == State::STOPPED)
738 util::throw_error(std::logic_error("Please call setSource() before start() "
739 "if the pipeline has been already stopped"));
741 GAPI_Assert(state == State::READY);
743 // Currently just trigger our emitters to work
744 state = State::RUNNING;
745 for (auto &q : m_emitter_queues)
747 q.push(stream::Cmd{stream::Start{}});
// Joins all worker threads, clears every queue of leftover data, and
// marks the executor STOPPED. Callers must have already ensured the
// threads are on their way out (e.g. a Stop has propagated).
751 void cv::gimpl::GStreamingExecutor::wait_shutdown()
753 // This utility is used by pull/try_pull/stop() to uniformly
754 // shutdown the worker threads.
755 // FIXME: Of course it can be designed much better
756 for (auto &t : m_threads) t.join();
760 // If there are constant emitters, internal queues
761 // may be polluted with constant values and have extra
762 // data at the point of shutdown.
763 // It usually happens when there's multiple inputs,
764 // one constant and one is not, and the latter ends (e.g.
765 // with end-of-stream).
766 for (auto &q : m_emitter_queues) q.clear();
767 for (auto &q : m_sink_queues) q->clear();
768 for (auto &q : m_internal_queues) q->clear();
771 state = State::STOPPED;
// Blocking pull of one result set from the collector queue into the
// user-provided output objects. A Stop command means end-of-stream
// (pipeline shuts down); otherwise the GRunArgs payload is synced
// into `outs` via sync_data().
774 bool cv::gimpl::GStreamingExecutor::pull(cv::GRunArgsP &&outs)
776 if (state == State::STOPPED)
778 GAPI_Assert(state == State::RUNNING);
779 GAPI_Assert(m_sink_queues.size() == outs.size());
782 m_out_queue.pop(cmd);
783 if (cv::util::holds_alternative<Stop>(cmd))
789 GAPI_Assert(cv::util::holds_alternative<cv::GRunArgs>(cmd));
790 cv::GRunArgs &this_result = cv::util::get<cv::GRunArgs>(cmd);
791 sync_data(this_result, outs);
// Non-blocking variant of pull(): returns without outputs if nothing is
// available in the collector queue yet; otherwise behaves like pull().
795 bool cv::gimpl::GStreamingExecutor::try_pull(cv::GRunArgsP &&outs)
797 if (state == State::STOPPED)
800 GAPI_Assert(m_sink_queues.size() == outs.size());
803 if (!m_out_queue.try_pop(cmd)) {
806 if (cv::util::holds_alternative<Stop>(cmd))
812 GAPI_Assert(cv::util::holds_alternative<cv::GRunArgs>(cmd));
813 cv::GRunArgs &this_result = cv::util::get<cv::GRunArgs>(cmd);
814 sync_data(this_result, outs);
// Cooperative shutdown: inject Stop into every emitter's control queue,
// then drain the collector queue until that Stop propagates through the
// whole pipeline. No-op if already stopped.
818 void cv::gimpl::GStreamingExecutor::stop()
820 if (state == State::STOPPED)
823 // FIXME: ...and how to deal with still-unread data then?
824 // Push a Stop message to the every emitter,
825 // wait until it broadcasts within the pipeline,
826 // FIXME: worker threads could stuck on push()!
827 // need to read the output queues until Stop!
828 for (auto &q : m_emitter_queues) {
829 q.push(stream::Cmd{stream::Stop{}});
832 // Pull messages from the final queue to ensure completion
834 while (!cv::util::holds_alternative<Stop>(cmd))
836 m_out_queue.pop(cmd);
838 GAPI_Assert(cv::util::holds_alternative<Stop>(cmd));
// Reports whether the pipeline is currently in the RUNNING state.
842 bool cv::gimpl::GStreamingExecutor::running() const
844 return (state == State::RUNNING);