9444ffbe98ac419657225926dceb3bdb49b9c872
[platform/upstream/opencv.git] / modules / gapi / src / executor / gstreamingexecutor.cpp
1 // This file is part of OpenCV project.
2 // It is subject to the license terms in the LICENSE file found in the top-level directory
3 // of this distribution and at http://opencv.org/license.html.
4 //
5 // Copyright (C) 2019 Intel Corporation
6
7 #include "precomp.hpp"
8
9 #include <iostream>
10
11 #include <ade/util/zip_range.hpp>
12
13 #include <opencv2/gapi/opencv_includes.hpp>
14
15 #include "executor/gstreamingexecutor.hpp"
16 #include "compiler/passes/passes.hpp"
17 #include "backends/common/gbackend.hpp" // createMat
18 #include "compiler/gcompiler.hpp" // for compileIslands
19
20 namespace
21 {
22 using namespace cv::gimpl::stream;
23
24 #if !defined(GAPI_STANDALONE)
25 class VideoEmitter final: public cv::gimpl::GIslandEmitter {
26     cv::gapi::wip::IStreamSource::Ptr src;
27
28     virtual bool pull(cv::GRunArg &arg) override {
29         // FIXME: probably we can maintain a pool of (then) pre-allocated
30         // buffers to avoid runtime allocations.
31         // Pool size can be determined given the internal queue size.
32         cv::gapi::wip::Data newData;
33         if (!src->pull(newData)) {
34             return false;
35         }
36         arg = std::move(static_cast<cv::GRunArg&>(newData));
37         return true;
38     }
39 public:
40     explicit VideoEmitter(const cv::GRunArg &arg) {
41         src = cv::util::get<cv::gapi::wip::IStreamSource::Ptr>(arg);
42     }
43 };
44 #endif // GAPI_STANDALONE
45
46 class ConstEmitter final: public cv::gimpl::GIslandEmitter {
47     cv::GRunArg m_arg;
48
49     virtual bool pull(cv::GRunArg &arg) override {
50         arg = const_cast<const cv::GRunArg&>(m_arg); // FIXME: variant workaround
51         return true;
52     }
53 public:
54
55     explicit ConstEmitter(const cv::GRunArg &arg) : m_arg(arg) {
56     }
57 };
58
59 struct DataQueue {
60     static const char *name() { return "StreamingDataQueue"; }
61
62     explicit DataQueue(std::size_t capacity) {
63         if (capacity) {
64             q.set_capacity(capacity);
65         }
66     }
67
68     cv::gimpl::stream::Q q;
69 };
70
71 std::vector<cv::gimpl::stream::Q*> reader_queues(      ade::Graph &g,
72                                                  const ade::NodeHandle &obj)
73 {
74     ade::TypedGraph<DataQueue> qgr(g);
75     std::vector<cv::gimpl::stream::Q*> result;
76     for (auto &&out_eh : obj->outEdges())
77     {
78         result.push_back(&qgr.metadata(out_eh).get<DataQueue>().q);
79     }
80     return result;
81 }
82
83 std::vector<cv::gimpl::stream::Q*> input_queues(      ade::Graph &g,
84                                                 const ade::NodeHandle &obj)
85 {
86     ade::TypedGraph<DataQueue> qgr(g);
87     std::vector<cv::gimpl::stream::Q*> result;
88     for (auto &&in_eh : obj->inEdges())
89     {
90         result.push_back(qgr.metadata(in_eh).contains<DataQueue>()
91                          ? &qgr.metadata(in_eh).get<DataQueue>().q
92                          : nullptr);
93     }
94     return result;
95 }
96
97 void sync_data(cv::GRunArgs &results, cv::GRunArgsP &outputs)
98 {
99     namespace own = cv::gapi::own;
100
101     for (auto && it : ade::util::zip(ade::util::toRange(outputs),
102                                      ade::util::toRange(results)))
103     {
104         auto &out_obj = std::get<0>(it);
105         auto &res_obj = std::get<1>(it);
106
107         // FIXME: this conversion should be unified
108         using T = cv::GRunArgP;
109         switch (out_obj.index())
110         {
111 #if !defined(GAPI_STANDALONE)
112         case T::index_of<cv::Mat*>():
113             *cv::util::get<cv::Mat*>(out_obj) = std::move(cv::util::get<cv::Mat>(res_obj));
114             break;
115         case T::index_of<cv::Scalar*>():
116             *cv::util::get<cv::Scalar*>(out_obj) = std::move(cv::util::get<cv::Scalar>(res_obj));
117             break;
118 #endif // !GAPI_STANDALONE
119         case T::index_of<own::Mat*>():
120             *cv::util::get<own::Mat*>(out_obj) = std::move(cv::util::get<own::Mat>(res_obj));
121             break;
122         case T::index_of<own::Scalar*>():
123             *cv::util::get<own::Scalar*>(out_obj) = std::move(cv::util::get<own::Scalar>(res_obj));
124             break;
125         case T::index_of<cv::detail::VectorRef>():
126             cv::util::get<cv::detail::VectorRef>(out_obj).mov(cv::util::get<cv::detail::VectorRef>(res_obj));
127             break;
128         default:
129             GAPI_Assert(false && "This value type is not supported!"); // ...maybe because of STANDALONE mode.
130             break;
131         }
132     }
133 }
134
135 // This thread is a plain dump source actor. What it do is just:
136 // - Check input queue (the only one) for a control command
137 // - Depending on the state, obtains next data object and pushes it to the
138 //   pipeline.
139 void emitterActorThread(std::shared_ptr<cv::gimpl::GIslandEmitter> emitter,
140                         Q& in_queue,
141                         std::vector<Q*> out_queues,
142                         std::function<void()> cb_completion)
143 {
144     // Wait for the explicit Start command.
145     // ...or Stop command, this also happens.
146     Cmd cmd;
147     in_queue.pop(cmd);
148     GAPI_Assert(   cv::util::holds_alternative<Start>(cmd)
149                 || cv::util::holds_alternative<Stop>(cmd));
150     if (cv::util::holds_alternative<Stop>(cmd))
151     {
152         for (auto &&oq : out_queues) oq->push(cmd);
153         return;
154     }
155
156     // Now start emitting the data from the source to the pipeline.
157     while (true)
158     {
159         Cmd cancel;
160         if (in_queue.try_pop(cancel))
161         {
162             // if we just popped a cancellation command...
163             GAPI_Assert(cv::util::holds_alternative<Stop>(cancel));
164             // Broadcast it to the readers and quit.
165             for (auto &&oq : out_queues) oq->push(cancel);
166             return;
167         }
168
169         // Try to obrain next data chunk from the source
170         cv::GRunArg data;
171         if (emitter->pull(data))
172         {
173             // // On success, broadcast it to our readers
174             for (auto &&oq : out_queues)
175             {
176                 // FIXME: FOR SOME REASON, oq->push(Cmd{data}) doesn't work!!
177                 // empty mats are arrived to the receivers!
178                 // There may be a fatal bug in our variant!
179                 const auto tmp = data;
180                 oq->push(Cmd{tmp});
181             }
182         }
183         else
184         {
185             // Otherwise, broadcast STOP message to our readers and quit.
186             // This usually means end-of-stream, so trigger a callback
187             for (auto &&oq : out_queues) oq->push(Cmd{Stop{}});
188             if (cb_completion) cb_completion();
189             return;
190         }
191     }
192 }
193
194 // This thread is a plain dumb processing actor. What it do is just:
195 // - Reads input from the input queue(s), sleeps if there's nothing to read
196 // - Once a full input vector is obtained, passes it to the underlying island
197 //   executable for processing.
198 // - Pushes processing results down to consumers - to the subsequent queues.
199 //   Note: Every data object consumer has its own queue.
200 void islandActorThread(std::vector<cv::gimpl::RcDesc> in_rcs,                // FIXME: this is...
201                        std::vector<cv::gimpl::RcDesc> out_rcs,               // FIXME: ...basically just...
202                        cv::GMetaArgs out_metas,                              // ...
203                        std::shared_ptr<cv::gimpl::GIslandExecutable> island, // FIXME: ...a copy of OpDesc{}.
204                        std::vector<Q*> in_queues,
205                        std::vector<cv::GRunArg> in_constants,
206                        std::vector< std::vector<Q*> > out_queues)
207 {
208     GAPI_Assert(in_queues.size() == in_rcs.size());
209     GAPI_Assert(out_queues.size() == out_rcs.size());
210     GAPI_Assert(out_queues.size() == out_metas.size());
211     while (true)
212     {
213         std::vector<cv::gimpl::GIslandExecutable::InObj> isl_inputs;
214         isl_inputs.resize(in_rcs.size());
215
216         // Try to obtain the full input vector.
217         // Note this may block us. We also may get Stop signal here
218         // and then exit the thread.
219         // NOTE: in order to maintain the GRunArg's underlying object
220         // lifetime, keep the whole cmd vector (of size == # of inputs)
221         // in memory.
222         std::vector<Cmd> cmd(in_queues.size());
223         for (auto &&it : ade::util::indexed(in_queues))
224         {
225             auto id = ade::util::index(it);
226             auto &q = ade::util::value(it);
227
228             isl_inputs[id].first  = in_rcs[id];
229             if (q == nullptr)
230             {
231                 // NULL queue means a graph-constant value
232                 // (like a value-initialized scalar)
233                 // FIXME: Variant move problem
234                 isl_inputs[id].second = const_cast<const cv::GRunArg&>(in_constants[id]);
235             }
236             else
237             {
238                 q->pop(cmd[id]);
239                 if (cv::util::holds_alternative<Stop>(cmd[id]))
240                 {
241                     // FIXME: This logic must be unified with what collectorThread is doing!
242                     // Just got a stop sign. Reiterate through all queues
243                     // and rewind data to every Stop sign per queue
244                     for (auto &&qit : ade::util::indexed(in_queues))
245                     {
246                         auto id2 = ade::util::index(qit);
247                         auto &q2 = ade::util::value(qit);
248                         if (id == id2) continue;
249
250                         Cmd cmd2;
251                         while (q2 && !cv::util::holds_alternative<Stop>(cmd2))
252                             q2->pop(cmd2);
253                     }
254                     // Broadcast Stop down to the pipeline and quit
255                     for (auto &&out_qq : out_queues)
256                     {
257                         for (auto &&out_q : out_qq) out_q->push(Cmd{Stop{}});
258                     }
259                     return;
260                 }
261                 // FIXME: MOVE PROBLEM
262                 const cv::GRunArg &in_arg = cv::util::get<cv::GRunArg>(cmd[id]);
263 #if defined(GAPI_STANDALONE)
264                 // Standalone mode - simply store input argument in the vector as-is
265                 isl_inputs[id].second = in_arg;
266 #else
267                 // Make Islands operate on own:: data types (i.e. in the same
268                 // environment as GExecutor provides)
269                 // This way several backends (e.g. Fluid) remain OpenCV-independent.
270                 switch (in_arg.index()) {
271                 case cv::GRunArg::index_of<cv::Mat>():
272                     isl_inputs[id].second = cv::GRunArg{cv::to_own(cv::util::get<cv::Mat>(in_arg))};
273                     break;
274                 case cv::GRunArg::index_of<cv::Scalar>():
275                     isl_inputs[id].second = cv::GRunArg{cv::to_own(cv::util::get<cv::Scalar>(in_arg))};
276                     break;
277                 default:
278                     isl_inputs[id].second = in_arg;
279                     break;
280                 }
281 #endif // GAPI_STANDALONE
282             }
283         }
284         // Once the vector is obtained, prepare data for island execution
285         // Note - we first allocate output vector via GRunArg!
286         // Then it is converted to a GRunArgP.
287         std::vector<cv::gimpl::GIslandExecutable::OutObj> isl_outputs;
288         std::vector<cv::GRunArg> out_data;
289         isl_outputs.resize(out_rcs.size());
290         out_data.resize(out_rcs.size());
291         for (auto &&it : ade::util::indexed(out_rcs))
292         {
293             auto id = ade::util::index(it);
294             auto &r = ade::util::value(it);
295
296 #if !defined(GAPI_STANDALONE)
297             using MatType = cv::Mat;
298             using SclType = cv::Scalar;
299 #else
300             using MatType = cv::gapi::own::Mat;
301             using SclType = cv::gapi::own::Scalar;
302 #endif // GAPI_STANDALONE
303
304             switch (r.shape) {
305                 // Allocate a data object based on its shape & meta, and put it into our vectors.
306                 // Yes, first we put a cv::Mat GRunArg, and then specify _THAT_
307                 // pointer as an output parameter - to make sure that after island completes,
308                 // our GRunArg still has the right (up-to-date) value.
309                 // Same applies to other types.
310                 // FIXME: This is absolutely ugly but seem to work perfectly for its purpose.
311             case cv::GShape::GMAT:
312                 {
313                     MatType newMat;
314                     cv::gimpl::createMat(cv::util::get<cv::GMatDesc>(out_metas[id]), newMat);
315                     out_data[id] = cv::GRunArg(std::move(newMat));
316                     isl_outputs[id] = { r, cv::GRunArgP(&cv::util::get<MatType>(out_data[id])) };
317                 }
318                 break;
319             case cv::GShape::GSCALAR:
320                 {
321                     SclType newScl;
322                     out_data[id] = cv::GRunArg(std::move(newScl));
323                     isl_outputs[id] = { r, cv::GRunArgP(&cv::util::get<SclType>(out_data[id])) };
324                 }
325                 break;
326             case cv::GShape::GARRAY:
327                 {
328                     cv::detail::VectorRef newVec;
329                     cv::util::get<cv::detail::ConstructVec>(r.ctor)(newVec);
330                     out_data[id] = cv::GRunArg(std::move(newVec));
331                     // VectorRef is implicitly shared so no pointer is taken here
332                     const auto &rr = cv::util::get<cv::detail::VectorRef>(out_data[id]); // FIXME: that variant MOVE problem again
333                     isl_outputs[id] = { r, cv::GRunArgP(rr) };
334                 }
335                 break;
336             default:
337                 cv::util::throw_error(std::logic_error("Unsupported GShape"));
338                 break;
339             }
340         }
341         // Now ask Island to execute on this data
342         island->run(std::move(isl_inputs), std::move(isl_outputs));
343
344         // Once executed, dispatch our results down to the pipeline.
345         for (auto &&it : ade::util::zip(ade::util::toRange(out_queues),
346                                         ade::util::toRange(out_data)))
347         {
348             for (auto &&q : std::get<0>(it))
349             {
350                 // FIXME: FATAL VARIANT ISSUE!!
351                 const auto tmp = std::get<1>(it);
352                 q->push(Cmd{tmp});
353             }
354         }
355     }
356 }
357
358 // The idea of collectorThread is easy.  If there're multiple outputs
359 // in the graph, we need to pull an object from every associated queue
360 // and then put the resulting vector into one single queue.  While it
361 // looks redundant, it simplifies dramatically the way how try_pull()
362 // is implemented - we need to check one queue instead of many.
363 void collectorThread(std::vector<Q*> in_queues,
364                      Q&              out_queue)
365 {
366     while (true)
367     {
368         cv::GRunArgs this_result(in_queues.size());
369         for (auto &&it : ade::util::indexed(in_queues))
370         {
371             Cmd cmd;
372             ade::util::value(it)->pop(cmd);
373             if (cv::util::holds_alternative<Stop>(cmd))
374             {
375                 // FIXME: Unify this code with island thread
376                 for (auto &&qit : ade::util::indexed(in_queues))
377                 {
378                     if (ade::util::index(qit) == ade::util::index(it)) continue;
379                     Cmd cmd2;
380                     while (!cv::util::holds_alternative<Stop>(cmd2))
381                         ade::util::value(qit)->pop(cmd2);
382                 }
383                 out_queue.push(Cmd{Stop{}});
384                 return;
385             }
386             else
387             {
388                 // FIXME: MOVE_PROBLEM
389                 const cv::GRunArg &in_arg = cv::util::get<cv::GRunArg>(cmd);
390                 this_result[ade::util::index(it)] = in_arg;
391                 // FIXME: Check for other message types.
392             }
393         }
394         out_queue.push(Cmd{this_result});
395     }
396 }
397 } // anonymous namespace
398
399 cv::gimpl::GStreamingExecutor::GStreamingExecutor(std::unique_ptr<ade::Graph> &&g_model)
400     : m_orig_graph(std::move(g_model))
401     , m_island_graph(GModel::Graph(*m_orig_graph).metadata()
402                      .get<IslandModel>().model)
403     , m_gim(*m_island_graph)
404 {
405     GModel::Graph gm(*m_orig_graph);
406     // NB: Right now GIslandModel is acyclic, and all the below code assumes that.
407     // NB: This naive execution code is taken from GExecutor nearly "as-is"
408
409     const auto proto = gm.metadata().get<Protocol>();
410     m_emitters      .resize(proto.in_nhs.size());
411     m_emitter_queues.resize(proto.in_nhs.size());
412     m_sinks         .resize(proto.out_nhs.size());
413     m_sink_queues   .resize(proto.out_nhs.size());
414
415     // Very rough estimation to limit internal queue sizes.
416     // Pipeline depth is equal to number of its (pipeline) steps.
417     const auto queue_capacity = std::count_if
418         (m_gim.nodes().begin(),
419          m_gim.nodes().end(),
420          [&](ade::NodeHandle nh) {
421             return m_gim.metadata(nh).get<NodeKind>().k == NodeKind::ISLAND;
422          });
423
424     // If metadata was not passed to compileStreaming, Islands are not compiled at this point.
425     // It is fine -- Islands are then compiled in setSource (at the first valid call).
426     const bool islands_compiled = m_gim.metadata().contains<IslandsCompiled>();
427
428     auto sorted = m_gim.metadata().get<ade::passes::TopologicalSortData>();
429     for (auto nh : sorted.nodes())
430     {
431         switch (m_gim.metadata(nh).get<NodeKind>().k)
432         {
433         case NodeKind::ISLAND:
434             {
435                 std::vector<RcDesc> input_rcs;
436                 std::vector<RcDesc> output_rcs;
437                 std::vector<GRunArg> in_constants;
438                 cv::GMetaArgs output_metas;
439                 input_rcs.reserve(nh->inNodes().size());
440                 in_constants.reserve(nh->inNodes().size()); // FIXME: Ugly
441                 output_rcs.reserve(nh->outNodes().size());
442                 output_metas.reserve(nh->outNodes().size());
443
444                 std::unordered_set<ade::NodeHandle, ade::HandleHasher<ade::Node> > const_ins;
445
446                 // FIXME: THIS ORDER IS IRRELEVANT TO PROTOCOL OR ANY OTHER ORDER!
447                 // FIXME: SAME APPLIES TO THE REGULAR GEEXECUTOR!!
448                 auto xtract_in = [&](ade::NodeHandle slot_nh, std::vector<RcDesc> &vec)
449                 {
450                     const auto orig_data_nh
451                         = m_gim.metadata(slot_nh).get<DataSlot>().original_data_node;
452                     const auto &orig_data_info
453                         = gm.metadata(orig_data_nh).get<Data>();
454                     if (orig_data_info.storage == Data::Storage::CONST_VAL) {
455                         const_ins.insert(slot_nh);
456                         // FIXME: Variant move issue
457                         in_constants.push_back(const_cast<const cv::GRunArg&>(gm.metadata(orig_data_nh).get<ConstValue>().arg));
458                     } else in_constants.push_back(cv::GRunArg{}); // FIXME: Make it in some smarter way pls
459                     if (orig_data_info.shape == GShape::GARRAY) {
460                         // FIXME: GArray lost host constructor problem
461                         GAPI_Assert(!cv::util::holds_alternative<cv::util::monostate>(orig_data_info.ctor));
462                     }
463                     vec.emplace_back(RcDesc{ orig_data_info.rc
464                                            , orig_data_info.shape
465                                            , orig_data_info.ctor});
466                 };
467                 auto xtract_out = [&](ade::NodeHandle slot_nh, std::vector<RcDesc> &vec, cv::GMetaArgs &metas)
468                 {
469                     const auto orig_data_nh
470                         = m_gim.metadata(slot_nh).get<DataSlot>().original_data_node;
471                     const auto &orig_data_info
472                         = gm.metadata(orig_data_nh).get<Data>();
473                     if (orig_data_info.shape == GShape::GARRAY) {
474                         // FIXME: GArray lost host constructor problem
475                         GAPI_Assert(!cv::util::holds_alternative<cv::util::monostate>(orig_data_info.ctor));
476                     }
477                     vec.emplace_back(RcDesc{ orig_data_info.rc
478                                            , orig_data_info.shape
479                                            , orig_data_info.ctor});
480                     metas.emplace_back(orig_data_info.meta);
481                 };
482                 // FIXME: JEZ IT WAS SO AWFUL!!!!
483                 for (auto in_slot_nh  : nh->inNodes())  xtract_in(in_slot_nh,  input_rcs);
484                 for (auto out_slot_nh : nh->outNodes()) xtract_out(out_slot_nh, output_rcs, output_metas);
485
486                 std::shared_ptr<GIslandExecutable> isl_exec = islands_compiled
487                     ? m_gim.metadata(nh).get<IslandExec>().object
488                     : nullptr;
489                 m_ops.emplace_back(OpDesc{ std::move(input_rcs)
490                                          , std::move(output_rcs)
491                                          , std::move(output_metas)
492                                          , nh
493                                          , in_constants
494                                          , isl_exec
495                                          });
496                 // Initialize queues for every operation's input
497                 ade::TypedGraph<DataQueue> qgr(*m_island_graph);
498                 for (auto eh : nh->inEdges())
499                 {
500                     // ...only if the data is not compile-const
501                     if (const_ins.count(eh->srcNode()) == 0) {
502                         qgr.metadata(eh).set(DataQueue(queue_capacity));
503                         m_internal_queues.insert(&qgr.metadata(eh).get<DataQueue>().q);
504                     }
505                 }
506             }
507             break;
508         case NodeKind::SLOT:
509             {
510                 const auto orig_data_nh
511                     = m_gim.metadata(nh).get<DataSlot>().original_data_node;
512                 m_slots.emplace_back(DataDesc{nh, orig_data_nh});
513             }
514             break;
515         case NodeKind::EMIT:
516             {
517                 const auto emitter_idx
518                     = m_gim.metadata(nh).get<Emitter>().proto_index;
519                 GAPI_Assert(emitter_idx < m_emitters.size());
520                 m_emitters[emitter_idx] = nh;
521             }
522             break;
523         case NodeKind::SINK:
524             {
525                 const auto sink_idx
526                     = m_gim.metadata(nh).get<Sink>().proto_index;
527                 GAPI_Assert(sink_idx < m_sinks.size());
528                 m_sinks[sink_idx] = nh;
529
530                 // Also initialize Sink's input queue
531                 ade::TypedGraph<DataQueue> qgr(*m_island_graph);
532                 GAPI_Assert(nh->inEdges().size() == 1u);
533                 qgr.metadata(nh->inEdges().front()).set(DataQueue(queue_capacity));
534                 m_sink_queues[sink_idx] = &qgr.metadata(nh->inEdges().front()).get<DataQueue>().q;
535             }
536             break;
537         default:
538             GAPI_Assert(false);
539             break;
540         } // switch(kind)
541     } // for(gim nodes)
542     m_out_queue.set_capacity(queue_capacity);
543 }
544
545 cv::gimpl::GStreamingExecutor::~GStreamingExecutor()
546 {
547     if (state == State::READY || state == State::RUNNING)
548         stop();
549 }
550
551 void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
552 {
553     GAPI_Assert(state == State::READY || state == State::STOPPED);
554
555     const auto is_video = [](const GRunArg &arg)
556     {
557         return util::holds_alternative<cv::gapi::wip::IStreamSource::Ptr>(arg);
558     };
559     const auto num_videos = std::count_if(ins.begin(), ins.end(), is_video);
560     if (num_videos > 1)
561     {
562         // See below why (another reason - no documented behavior
563         // on handling videos streams of different length)
564         util::throw_error(std::logic_error("Only one video source is"
565                                            " currently supported!"));
566     }
567
568     GModel::ConstGraph gm(*m_orig_graph);
569     // Now the tricky-part: completing Islands compilation if compileStreaming
570     // has been called without meta arguments.
571     // The logic is basically the following:
572     // - (0) Collect metadata from input vector;
573     // - (1) If graph is compiled with meta
574     //   - (2) Just check if the passed objects have correct meta.
575     // - (3) Otherwise:
576     //   - (4) Run metadata inference;
577     //   - (5) If islands are not compiled at this point OR are not reshapeable:
578     //     - (6) Compile them for a first time with this meta;
579     //     - (7) Update internal structures with this island information
580     //   - (8) Otherwise:
581     //     - (9) Reshape islands to this new metadata.
582     //     - (10) Update internal structures again
583     const auto update_int_metas = [&]()
584     {
585         for (auto& op : m_ops)
586         {
587             op.out_metas.resize(0);
588             for (auto out_slot_nh : op.nh->outNodes())
589             {
590                 const auto &orig_nh = m_gim.metadata(out_slot_nh).get<DataSlot>().original_data_node;
591                 const auto &orig_info = gm.metadata(orig_nh).get<Data>();
592                 op.out_metas.emplace_back(orig_info.meta);
593             }
594         }
595     };
596     const auto new_meta = cv::descr_of(ins); // 0
597     if (gm.metadata().contains<OriginalInputMeta>()) // (1)
598     {
599         // NB: Metadata is tested in setSource() already - just put an assert here
600         GAPI_Assert(new_meta == gm.metadata().get<OriginalInputMeta>().inputMeta); // (2)
601     }
602     else // (3)
603     {
604         GCompiler::runMetaPasses(*m_orig_graph.get(), new_meta); // (4)
605         if (!m_gim.metadata().contains<IslandsCompiled>()
606             || (m_reshapable.has_value() && m_reshapable.value() == false)) // (5)
607         {
608             bool is_reshapable = true;
609             GCompiler::compileIslands(*m_orig_graph.get(), m_comp_args); // (6)
610             for (auto& op : m_ops)
611             {
612                 op.isl_exec = m_gim.metadata(op.nh).get<IslandExec>().object;
613                 is_reshapable &= op.isl_exec->canReshape();
614             }
615             update_int_metas(); // (7)
616             m_reshapable = util::make_optional(is_reshapable);
617         }
618         else // (8)
619         {
620             for (auto& op : m_ops)
621             {
622                 op.isl_exec->reshape(*m_orig_graph, m_comp_args); // (9)
623             }
624             update_int_metas(); // (10)
625         }
626     }
627     // Metadata handling is done!
628
629     // Walk through the protocol, set-up emitters appropriately
630     // There's a 1:1 mapping between emitters and corresponding data inputs.
631     for (auto it : ade::util::zip(ade::util::toRange(m_emitters),
632                                   ade::util::toRange(ins),
633                                   ade::util::iota(m_emitters.size())))
634     {
635         auto  emit_nh  = std::get<0>(it);
636         auto& emit_arg = std::get<1>(it);
637         auto  emit_idx = std::get<2>(it);
638         auto& emitter  = m_gim.metadata(emit_nh).get<Emitter>().object;
639
640         using T = GRunArg;
641         switch (emit_arg.index())
642         {
643         // Create a streaming emitter.
644         // Produces the next video frame when pulled.
645         case T::index_of<cv::gapi::wip::IStreamSource::Ptr>():
646 #if !defined(GAPI_STANDALONE)
647             emitter.reset(new VideoEmitter{emit_arg});
648 #else
649             util::throw_error(std::logic_error("Video is not supported in the "
650                                                "standalone mode"));
651 #endif
652             break;
653         default:
654             // Create a constant emitter.
655             // Produces always the same ("constant") value when pulled.
656             emitter.reset(new ConstEmitter{emit_arg});
657             m_const_emitter_queues.push_back(&m_emitter_queues[emit_idx]);
658             break;
659         }
660     }
661
662     // FIXME: The below code assumes our graph may have only one
663     // real video source (and so, only one stream which may really end)
664     // all other inputs are "constant" generators.
665     // Craft here a completion callback to notify Const emitters that
666     // a video source is over
667     auto real_video_completion_cb = [this]()
668     {
669         for (auto q : m_const_emitter_queues) q->push(Cmd{Stop{}});
670     };
671
672     // FIXME: ONLY now, after all executable objects are created,
673     // we can set up our execution threads. Let's do it.
674     // First create threads for all the emitters.
675     // FIXME: One way to avoid this may be including an Emitter object as a part of
676     // START message. Why not?
677     if (state == State::READY)
678     {
679         stop();
680     }
681
682     for (auto it : ade::util::indexed(m_emitters))
683     {
684         const auto id = ade::util::index(it); // = index in GComputation's protocol
685         const auto eh = ade::util::value(it);
686
687         // Prepare emitter thread parameters
688         auto emitter = m_gim.metadata(eh).get<Emitter>().object;
689
690         // Collect all reader queues from the emitter's the only output object
691         auto out_queues = reader_queues(*m_island_graph, eh->outNodes().front());
692
693         m_threads.emplace_back(emitterActorThread,
694                                emitter,
695                                std::ref(m_emitter_queues[id]),
696                                out_queues,
697                                real_video_completion_cb);
698     }
699
700
701     // Now do this for every island (in a topological order)
702     for (auto &&op : m_ops)
703     {
704         // Prepare island thread parameters
705         auto island = m_gim.metadata(op.nh).get<IslandExec>().object;
706
707         // Collect actor's input queues
708         auto in_queues = input_queues(*m_island_graph, op.nh);
709
710         // Collect actor's output queues.
711         // This may be tricky...
712         std::vector< std::vector<stream::Q*> > out_queues;
713         for (auto &&out_eh : op.nh->outNodes()) {
714             out_queues.push_back(reader_queues(*m_island_graph, out_eh));
715         }
716
717         m_threads.emplace_back(islandActorThread,
718                                op.in_objects,
719                                op.out_objects,
720                                op.out_metas,
721                                island,
722                                in_queues,
723                                op.in_constants,
724                                out_queues);
725     }
726
727     // Finally, start a collector thread.
728     m_threads.emplace_back(collectorThread,
729                            m_sink_queues,
730                            std::ref(m_out_queue));
731     state = State::READY;
732 }
733
734 void cv::gimpl::GStreamingExecutor::start()
735 {
736     if (state == State::STOPPED)
737     {
738         util::throw_error(std::logic_error("Please call setSource() before start() "
739                                            "if the pipeline has been already stopped"));
740     }
741     GAPI_Assert(state == State::READY);
742
743     // Currently just trigger our emitters to work
744     state = State::RUNNING;
745     for (auto &q : m_emitter_queues)
746     {
747         q.push(stream::Cmd{stream::Start{}});
748     }
749 }
750
751 void cv::gimpl::GStreamingExecutor::wait_shutdown()
752 {
753     // This utility is used by pull/try_pull/stop() to uniformly
754     // shutdown the worker threads.
755     // FIXME: Of course it can be designed much better
756     for (auto &t : m_threads) t.join();
757     m_threads.clear();
758
759     // Clear all queues
760     // If there are constant emitters, internal queues
761     // may be polluted with constant values and have extra
762     // data at the point of shutdown.
763     // It usually happens when there's multiple inputs,
764     // one constant and one is not, and the latter ends (e.g.
765     // with end-of-stream).
766     for (auto &q : m_emitter_queues) q.clear();
767     for (auto &q : m_sink_queues) q->clear();
768     for (auto &q : m_internal_queues) q->clear();
769     m_out_queue.clear();
770
771     state = State::STOPPED;
772 }
773
774 bool cv::gimpl::GStreamingExecutor::pull(cv::GRunArgsP &&outs)
775 {
776     if (state == State::STOPPED)
777         return false;
778     GAPI_Assert(state == State::RUNNING);
779     GAPI_Assert(m_sink_queues.size() == outs.size());
780
781     Cmd cmd;
782     m_out_queue.pop(cmd);
783     if (cv::util::holds_alternative<Stop>(cmd))
784     {
785         wait_shutdown();
786         return false;
787     }
788
789     GAPI_Assert(cv::util::holds_alternative<cv::GRunArgs>(cmd));
790     cv::GRunArgs &this_result = cv::util::get<cv::GRunArgs>(cmd);
791     sync_data(this_result, outs);
792     return true;
793 }
794
795 bool cv::gimpl::GStreamingExecutor::try_pull(cv::GRunArgsP &&outs)
796 {
797     if (state == State::STOPPED)
798         return false;
799
800     GAPI_Assert(m_sink_queues.size() == outs.size());
801
802     Cmd cmd;
803     if (!m_out_queue.try_pop(cmd)) {
804         return false;
805     }
806     if (cv::util::holds_alternative<Stop>(cmd))
807     {
808         wait_shutdown();
809         return false;
810     }
811
812     GAPI_Assert(cv::util::holds_alternative<cv::GRunArgs>(cmd));
813     cv::GRunArgs &this_result = cv::util::get<cv::GRunArgs>(cmd);
814     sync_data(this_result, outs);
815     return true;
816 }
817
818 void cv::gimpl::GStreamingExecutor::stop()
819 {
820     if (state == State::STOPPED)
821         return;
822
823     // FIXME: ...and how to deal with still-unread data then?
824     // Push a Stop message to the every emitter,
825     // wait until it broadcasts within the pipeline,
826     // FIXME: worker threads could stuck on push()!
827     // need to read the output queues until Stop!
828     for (auto &q : m_emitter_queues) {
829         q.push(stream::Cmd{stream::Stop{}});
830     }
831
832     // Pull messages from the final queue to ensure completion
833     Cmd cmd;
834     while (!cv::util::holds_alternative<Stop>(cmd))
835     {
836         m_out_queue.pop(cmd);
837     }
838     GAPI_Assert(cv::util::holds_alternative<Stop>(cmd));
839     wait_shutdown();
840 }
841
842 bool cv::gimpl::GStreamingExecutor::running() const
843 {
844     return (state == State::RUNNING);
845 }