Merge pull request #20298 from mpashchenkov:mp/python-desync
[platform/upstream/opencv.git] / modules / gapi / src / executor / gstreamingexecutor.hpp
1 // This file is part of OpenCV project.
2 // It is subject to the license terms in the LICENSE file found in the top-level directory
3 // of this distribution and at http://opencv.org/license.html.
4 //
5 // Copyright (C) 2019-2020 Intel Corporation
6
7 #ifndef OPENCV_GAPI_GSTREAMING_EXECUTOR_HPP
8 #define OPENCV_GAPI_GSTREAMING_EXECUTOR_HPP
9
10 #ifdef _MSC_VER
11 #pragma warning(disable: 4503)  // "decorated name length exceeded"
12                                 // on concurrent_bounded_queue
13 #endif
14
15 #include <memory> // unique_ptr, shared_ptr
16 #include <thread> // thread
17 #include <vector>
18 #include <unordered_map>
19
20 #if defined(HAVE_TBB)
21 #  include <tbb/concurrent_queue.h> // FIXME: drop it from here!
22 template<typename T> using QueueClass = tbb::concurrent_bounded_queue<T>;
23 #else
24 #  include "executor/conc_queue.hpp"
25 template<typename T> using QueueClass = cv::gapi::own::concurrent_bounded_queue<T>;
26 #endif // TBB
27 #include "executor/last_value.hpp"
28
29 #include <ade/graph.hpp>
30
31 #include "backends/common/gbackend.hpp"
32
33 namespace cv {
34 namespace gimpl {
35
36 namespace stream {
37 struct Start {};
38 struct Stop {
39     enum class Kind {
40         HARD, // a hard-stop: end-of-pipeline reached or stop() called
41         CNST, // a soft-stop emitted for/by constant sources (see QueueReader)
42     } kind = Kind::HARD;
43     cv::GRunArg cdata; // const data for CNST stop
44 };
45
46 struct Result {
47     cv::GRunArgs      args;  // Full results vector
48     std::vector<bool> flags; // Availability flags (in case of desync)
49 };
50
51 using Cmd = cv::util::variant
52     < cv::util::monostate
53     , Start        // Tells emitters to start working. Not broadcasted to workers.
54     , Stop         // Tells emitters to stop working. Broadcasted to workers.
55     , cv::GRunArg  // Workers data payload to process.
56     , Result       // Pipeline's data for gout()
57     >;
58
59 // Interface over a queue. The underlying queue implementation may be
60 // different. This class is mainly introduced to bring some
61 // abstraction over the real queues (bounded in-order) and a
62 // desynchronized data slots (see required to implement
63 // cv::gapi::desync)
64
65 class Q {
66 public:
67     virtual void push(const Cmd &cmd) = 0;
68     virtual void pop(Cmd &cmd) = 0;
69     virtual bool try_pop(Cmd &cmd) = 0;
70     virtual void clear() = 0;
71     virtual ~Q() = default;
72 };
73
74 // A regular queue implementation
75 class SyncQueue final: public Q {
76     QueueClass<Cmd> m_q;    // FIXME: OWN or WRAP??
77
78 public:
79     virtual void push(const Cmd &cmd) override { m_q.push(cmd); }
80     virtual void pop(Cmd &cmd)        override { m_q.pop(cmd);  }
81     virtual bool try_pop(Cmd &cmd)    override { return m_q.try_pop(cmd); }
82     virtual void clear()              override { m_q.clear(); }
83
84     void set_capacity(std::size_t c) { m_q.set_capacity(c);}
85 };
86
87 // Desynchronized "queue" implementation
88 // Every push overwrites value which is not yet popped
89 // This container can hold 0 or 1 element
90 // Special handling for Stop is implemented (FIXME: not really)
91 class DesyncQueue final: public Q {
92     cv::gapi::own::last_written_value<Cmd> m_v;
93
94 public:
95     virtual void push(const Cmd &cmd) override { m_v.push(cmd); }
96     virtual void pop(Cmd &cmd)        override { m_v.pop(cmd);  }
97     virtual bool try_pop(Cmd &cmd)    override { return m_v.try_pop(cmd); }
98     virtual void clear()              override { m_v.clear(); }
99 };
100
101 } // namespace stream
102
103 // FIXME: Currently all GExecutor comments apply also
104 // to this one. Please document it separately in the future.
105
106 class GStreamingExecutor final
107 {
108 protected:
109     // GStreamingExecutor is a state machine described as follows
110     //
111     //              setSource() called
112     //   STOPPED:  - - - - - - - - - ->READY:
113     //   --------                      ------
114     //   Initial state                 Input data specified
115     //   No threads running            Threads are created and IDLE
116     //   ^                             (Currently our emitter threads
117     //   :                             are bounded to input data)
118     //   : stop() called               No processing happending
119     //   : OR                          :
120     //   : end-of-stream reached       :  start() called
121     //   : during pull()/try_pull()    V
122     //   :                             RUNNING:
123     //   :                             --------
124     //   :                             Actual pipeline execution
125     //    - - - - - - - - - - - - - -  Threads are running
126     //
127     enum class State {
128         STOPPED,
129         READY,
130         RUNNING,
131     } state = State::STOPPED;
132
133     std::unique_ptr<ade::Graph> m_orig_graph;
134     std::shared_ptr<ade::Graph> m_island_graph;
135     cv::GCompileArgs m_comp_args;
136     cv::GMetaArgs m_last_metas;
137     util::optional<bool> m_reshapable;
138
139     cv::gimpl::GIslandModel::Graph m_gim; // FIXME: make const?
140     const bool m_desync;
141
142     // FIXME: Naive executor details are here for now
143     // but then it should be moved to another place
144     struct OpDesc
145     {
146         std::vector<RcDesc> in_objects;
147         std::vector<RcDesc> out_objects;
148         cv::GMetaArgs       out_metas;
149         ade::NodeHandle     nh;
150
151         cv::GRunArgs in_constants;
152
153         std::shared_ptr<GIslandExecutable> isl_exec;
154     };
155     std::vector<OpDesc> m_ops;
156
157     struct DataDesc
158     {
159         ade::NodeHandle slot_nh;
160         ade::NodeHandle data_nh;
161     };
162     std::vector<DataDesc> m_slots;
163
164     cv::GRunArgs m_const_vals;
165
166     // Order in these vectors follows the GComputaion's protocol
167     std::vector<ade::NodeHandle> m_emitters;
168     std::vector<ade::NodeHandle> m_sinks;
169
170     class Synchronizer;
171     std::unique_ptr<Synchronizer> m_sync;
172
173     std::vector<std::thread> m_threads;
174     std::vector<stream::SyncQueue>   m_emitter_queues;
175
176     // a view over m_emitter_queues
177     std::vector<stream::SyncQueue*>  m_const_emitter_queues;
178
179     std::vector<stream::Q*>          m_sink_queues;
180
181     // desync path tags for outputs. -1 means that output
182     // doesn't belong to a desync path
183     std::vector<int>                 m_sink_sync;
184
185     std::unordered_set<stream::Q*>   m_internal_queues;
186     stream::SyncQueue m_out_queue;
187
188     // Describes mapping from desync paths to collector threads
189     struct CollectorThreadInfo {
190         std::vector<stream::Q*>  queues;
191         std::vector<int> mapping;
192     };
193     std::unordered_map<int, CollectorThreadInfo> m_collector_map;
194
195
196     void wait_shutdown();
197
198     cv::GTypesInfo out_info;
199
200 public:
201     explicit GStreamingExecutor(std::unique_ptr<ade::Graph> &&g_model,
202                                 const cv::GCompileArgs &comp_args);
203     ~GStreamingExecutor();
204     void setSource(GRunArgs &&args);
205     void start();
206     bool pull(cv::GRunArgsP &&outs);
207     bool pull(cv::GOptRunArgsP &&outs);
208     std::tuple<bool, cv::util::variant<cv::GRunArgs, cv::GOptRunArgs>> pull();
209     bool try_pull(cv::GRunArgsP &&outs);
210     void stop();
211     bool running() const;
212 };
213
214 } // namespace gimpl
215 } // namespace cv
216
217 #endif // OPENCV_GAPI_GSTREAMING_EXECUTOR_HPP