1 // This file is part of OpenCV project.
2 // It is subject to the license terms in the LICENSE file found in the top-level directory
3 // of this distribution and at http://opencv.org/license.html.
5 // Copyright (C) 2019-2020 Intel Corporation
7 #ifndef OPENCV_GAPI_GSTREAMING_EXECUTOR_HPP
8 #define OPENCV_GAPI_GSTREAMING_EXECUTOR_HPP
11 #pragma warning(disable: 4503) // "decorated name length exceeded"
12 // on concurrent_bounded_queue
15 #include <memory> // unique_ptr, shared_ptr
16 #include <thread> // thread
18 #include <unordered_map>
21 # include <tbb/concurrent_queue.h> // FIXME: drop it from here!
22 template<typename T> using QueueClass = tbb::concurrent_bounded_queue<T>;
24 # include "executor/conc_queue.hpp"
25 template<typename T> using QueueClass = cv::gapi::own::concurrent_bounded_queue<T>;
27 #include "executor/last_value.hpp"
29 #include <ade/graph.hpp>
31 #include "backends/common/gbackend.hpp"
40 HARD, // a hard-stop: end-of-pipeline reached or stop() called
41 CNST, // a soft-stop emitted for/by constant sources (see QueueReader)
43 cv::GRunArg cdata; // const data for CNST stop
47 cv::GRunArgs args; // Full results vector
48 std::vector<bool> flags; // Availability flags (in case of desync)
// Cmd is the message type carried by the queues declared below
// (see Q::push()/pop()): a variant over the alternatives listed here.
51 using Cmd = cv::util::variant
53 , Start // Tells emitters to start working. Not broadcast to workers.
54 , Stop // Tells emitters to stop working. Broadcast to workers.
55 , cv::GRunArg // Workers' data payload to process.
56 , Result // Pipeline's data for gout()
59 // Interface over a queue. The underlying queue implementation may be
60 // different. This class is mainly introduced to bring some
61 // abstraction over the real queues (bounded in-order) and the
62 // desynchronized data slots (see required to implement
67 virtual void push(const Cmd &cmd) = 0; // Hands cmd over to the underlying container
68 virtual void pop(Cmd &cmd) = 0; // Extracts a command into cmd; presumably blocks while empty -- TODO confirm
69 virtual bool try_pop(Cmd &cmd) = 0; // Like pop(), but reports the outcome via bool; presumably false means nothing was available -- TODO confirm
70 virtual void clear() = 0; // Presumably discards whatever is currently stored -- TODO confirm
71 virtual ~Q() = default; // Virtual, as implementations are referred to via Q* (e.g. m_sink_queues)
74 // A regular queue implementation
// A thin Q adapter: every operation is forwarded unchanged to the wrapped
// QueueClass<Cmd> (aliased at the top of this file to either
// tbb::concurrent_bounded_queue or cv::gapi::own::concurrent_bounded_queue).
75 class SyncQueue final: public Q {
76 QueueClass<Cmd> m_q; // FIXME: OWN or WRAP??
79 virtual void push(const Cmd &cmd) override { m_q.push(cmd); } // forwards to m_q.push()
80 virtual void pop(Cmd &cmd) override { m_q.pop(cmd); } // forwards to m_q.pop()
81 virtual bool try_pop(Cmd &cmd) override { return m_q.try_pop(cmd); } // returns whatever m_q.try_pop() returns
82 virtual void clear() override { m_q.clear(); } // forwards to m_q.clear()
84 void set_capacity(std::size_t c) { m_q.set_capacity(c);} // Not part of the Q interface; forwards c to m_q.set_capacity()
87 // Desynchronized "queue" implementation
88 // Every push overwrites the value which has not been popped yet
89 // This container can hold 0 or 1 element
90 // Special handling for Stop is implemented (FIXME: not really)
// All operations are forwarded unchanged to the wrapped
// cv::gapi::own::last_written_value<Cmd>.
91 class DesyncQueue final: public Q {
92 cv::gapi::own::last_written_value<Cmd> m_v;
95 virtual void push(const Cmd &cmd) override { m_v.push(cmd); } // forwards to m_v.push()
96 virtual void pop(Cmd &cmd) override { m_v.pop(cmd); } // forwards to m_v.pop()
97 virtual bool try_pop(Cmd &cmd) override { return m_v.try_pop(cmd); } // returns whatever m_v.try_pop() returns
98 virtual void clear() override { m_v.clear(); } // forwards to m_v.clear()
101 } // namespace stream
103 // FIXME: Currently all GExecutor comments apply also
104 // to this one. Please document it separately in the future.
106 class GStreamingExecutor final
109 // GStreamingExecutor is a state machine described as follows
111 // setSource() called
112 // STOPPED: - - - - - - - - - ->READY:
114 // Initial state Input data specified
115 // No threads running Threads are created and IDLE
116 // ^ (Currently our emitter threads
117 // : are bound to input data)
118 // : stop() called No processing happening
120 // : end-of-stream reached : start() called
121 // : during pull()/try_pull() V
124 // : Actual pipeline execution
125 // - - - - - - - - - - - - - - Threads are running
131 } state = State::STOPPED; // the state machine starts in STOPPED
133 std::unique_ptr<ade::Graph> m_orig_graph;
134 std::shared_ptr<ade::Graph> m_island_graph;
135 cv::GCompileArgs m_comp_args;
136 cv::GMetaArgs m_last_metas;
137 util::optional<bool> m_reshapable;
139 cv::gimpl::GIslandModel::Graph m_gim; // FIXME: make const?
142 // FIXME: Naive executor details are here for now
143 // but then it should be moved to another place
146 std::vector<RcDesc> in_objects;
147 std::vector<RcDesc> out_objects;
148 cv::GMetaArgs out_metas;
151 cv::GRunArgs in_constants;
153 std::shared_ptr<GIslandExecutable> isl_exec;
155 std::vector<OpDesc> m_ops;
159 ade::NodeHandle slot_nh;
160 ade::NodeHandle data_nh;
162 std::vector<DataDesc> m_slots;
164 cv::GRunArgs m_const_vals;
166 // Order in these vectors follows the GComputation's protocol
167 std::vector<ade::NodeHandle> m_emitters;
168 std::vector<ade::NodeHandle> m_sinks;
171 std::unique_ptr<Synchronizer> m_sync;
173 std::vector<std::thread> m_threads;
174 std::vector<stream::SyncQueue> m_emitter_queues;
176 // a view over m_emitter_queues
177 std::vector<stream::SyncQueue*> m_const_emitter_queues;
179 std::vector<stream::Q*> m_sink_queues;
181 // desync path tags for outputs. -1 means that output
182 // doesn't belong to a desync path
183 std::vector<int> m_sink_sync;
185 std::unordered_set<stream::Q*> m_internal_queues; // NOTE(review): needs <unordered_set>, which is not among the includes visible here -- confirm
186 stream::SyncQueue m_out_queue;
188 // Describes mapping from desync paths to collector threads
189 struct CollectorThreadInfo {
190 std::vector<stream::Q*> queues;
191 std::vector<int> mapping;
193 std::unordered_map<int, CollectorThreadInfo> m_collector_map; // key is presumably the desync path tag (cf. m_sink_sync) -- TODO confirm
196 void wait_shutdown();
198 cv::GTypesInfo out_info;
201 explicit GStreamingExecutor(std::unique_ptr<ade::Graph> &&g_model,
202 const cv::GCompileArgs &comp_args);
203 ~GStreamingExecutor();
204 void setSource(GRunArgs &&args); // per the state diagram above: STOPPED -> READY
206 bool pull(cv::GRunArgsP &&outs); // presumably returns false once end-of-stream is reached -- TODO confirm
207 bool pull(cv::GOptRunArgsP &&outs); // overload taking optional outputs (GOptRunArgsP)
208 std::tuple<bool, cv::util::variant<cv::GRunArgs, cv::GOptRunArgs>> pull(); // NOTE(review): std::tuple needs <tuple>, not among the includes visible here -- confirm
209 bool try_pull(cv::GRunArgsP &&outs); // presumably the non-blocking counterpart of pull() -- TODO confirm
211 bool running() const;
217 #endif // OPENCV_GAPI_GSTREAMING_EXECUTOR_HPP