Publishing 2019 R1 content
[platform/upstream/dldt.git] / inference-engine / src / mkldnn_plugin / mkldnn_streams.h
1 // Copyright (C) 2018-2019 Intel Corporation
2 // SPDX-License-Identifier: Apache-2.0
3 //
4
5 #pragma once
6
7 #include <string>
8 #include <vector>
9 #include <atomic>
10 #include <map>
11 #include <queue>
12 #include <memory>
13 #include <climits>
14 #include <cpp_interfaces/impl/ie_infer_request_internal.hpp>
15 #include <cpp_interfaces/ie_task_executor.hpp>
16 #include "ie_parallel.hpp"
17 #include "mkldnn/omp_manager.h"
18
19 /* CPU "streams" implement a feature that allows multiple Infer Requests to be efficiently run simultaneously.
20  * To avoid potential oversubscription the CPU execution resources are divided accordingly.
21  * The feature enables much better performance for the networks that originally do not scale well with #threads
22  * even for a large batches. Examples are lightweight topologies or topologies with many sequential/mem-bound/etc or
23  * otherwise non-scalable layers. This is especially pronounced for many-core (e.g. server) machines.
24  * This is rather throughput-oriented feature,because running multiple requests in parallel might increase the latency
25  * of each request.
26  * Additionally, the streams help to relax the need for the large batch to improve the throughput and simplify the
27  * application logic, helping to saturate the CPU by multiple requests instead.
28  * Implementation-wise, the "streams" constitute the following:
29  *  - Pure "graph-less" Infer Requests that are not connected to the specific MKLDNNGraph (which is regular/legacy approach)
30  *  - Just like regular requests, the graph-less go to the common (per ExecutableNetwork) queue
31  *  - But unlike conventional case, there are multiple threads that grab the requests (see MultiWorkerTaskExecutor)
32  *  - So every stream is in fact is independent "worker" thread that monitors the queue.
33  *  - Every worker thread (stream) has it's own copy of the graph (which handles intermediate data required for execution)
34  *  - While the Infer Requests just keep only input/output data
35 */
36 namespace MKLDNNPlugin {
37
38 using namespace InferenceEngine;
39 class MKLDNNGraph;
40 class pinning_observer;
41
42 /* This structure handles an "execution context" - data required to execute an Infer Request.
43  * This includes graph (which handles the intermediate data) and arena/observer for the TBB */
44 struct MultiWorkerTaskContext {
45     std::shared_ptr<MKLDNNGraph> ptrGraph;
46 };
47
48 #if defined(__APPLE__) || defined(_WIN32)
49 typedef void cpu_set_t;
50 #define CPU_FREE(cpuset)
51 // notice that functions below are just stubs for OSs other than Linux
52 #endif
53 /* Check whether any affinity-related env variables are set (relevant for the OpenMP) */
54 bool check_env_variables();
55 /* Get the cores affinity mask for the current process */
56 bool get_process_mask(int& ncpus, cpu_set_t*& mask);
57 /* Pin current thread to a set of cores determined by the mask. */
58 bool pin_current_thread_by_mask(int ncores, const cpu_set_t* proc_mask);
59 /* Pin thread to a spare core in the round-robin scheme, while respecting the given process mask.
60  * The function can also handle the hyper-threading (by populating the physical cores first) */
61 bool pin_thread_to_vacant_core(int thr_idx, int hyperthreads, int ncores, const cpu_set_t* proc_mask);
62
63 #if IE_THREAD == IE_THREAD_TBB
64 /* Simple observer that handles pinning threads to the cores, it serves as a callback for threads entering the arena. */
65 class pinning_observer: public tbb::task_scheduler_observer {
66     cpu_set_t *mask;
67     int ncpus;
68     int stream_id, threads_per_stream;
69     const int pinning_step;
70
71 public:
72     pinning_observer(tbb::task_arena& _arena, int _stream_id, int _threads_per_stream, int _pinning_step = 1) :
73             tbb::task_scheduler_observer(_arena),
74             stream_id(_stream_id), threads_per_stream(_threads_per_stream), pinning_step(_pinning_step) {
75         get_process_mask(ncpus, mask);
76     }
77
78     void on_scheduler_entry(bool) override {
79         if (!mask) return;
80         int thread_idx = tbb::task_arena::current_thread_index();
81         int thr_idx = stream_id * threads_per_stream + thread_idx;
82         // pin thread to the vacant slot
83         pin_thread_to_vacant_core(thr_idx, pinning_step, ncpus, mask);
84     }
85
86     void on_scheduler_exit(bool) override {
87         if (!mask) return;
88         // reset the thread's mask (to the original process mask)
89         pin_current_thread_by_mask(ncpus, mask);
90     }
91
92     ~pinning_observer() {
93         if (mask)
94             CPU_FREE(mask);
95     }
96 };
97
98 class auto_scope_observing {
99 public:
100      explicit auto_scope_observing(std::unique_ptr<tbb::task_scheduler_observer>&  _p) : p(_p) {
101          if (p)
102              p->observe(true);
103      }
104      ~auto_scope_observing() {
105          if (p)
106             p->observe(false);
107      }
108
109 protected:
110     std::unique_ptr<tbb::task_scheduler_observer>&  p;
111 };
112 #endif  // IE_THREAD == IE_THREAD_TBB
113
114 /* Class wrapping multiple worker threads that monitors the same queue with Infer Requests. */
115 class MultiWorkerTaskExecutor : public ITaskExecutor {
116 public:
117     typedef std::shared_ptr<MultiWorkerTaskExecutor> Ptr;
118
119     explicit MultiWorkerTaskExecutor(const std::vector<Task::Ptr>&, std::string name = "Default");
120
121     ~MultiWorkerTaskExecutor();
122
123     /**
124     * @brief Adds task for execution and notifies one of the working threads about the new task.
125     * @note can be called from multiple threads - tasks will be added to the queue and executed one-by-one in FIFO mode.
126     * @param task - shared pointer to the task
127     *  @return true if succeed to add task, otherwise - false
128     */
129     bool startTask(Task::Ptr task) override;
130
131     static thread_local MultiWorkerTaskContext ptrContext;
132
133 private:
134     std::vector<std::thread> _threads;
135     std::mutex _queueMutex;
136     std::condition_variable _queueCondVar;
137     std::queue<Task::Ptr> _taskQueue;
138     std::atomic<bool> _isStopped;
139     std::string _name;
140     std::atomic<int> _initCount;
141 };
142
143 /* Pure Infer Requests - just input and output data. */
144 class MKLDNNGraphlessInferRequest : public InferenceEngine::InferRequestInternal {
145 public:
146     typedef std::shared_ptr<MKLDNNGraphlessInferRequest> Ptr;
147     explicit MKLDNNGraphlessInferRequest(InferenceEngine::InputsDataMap networkInputs,
148                                          InferenceEngine::OutputsDataMap networkOutputs);
149
150     void InferImpl() override;
151
152     void GetPerformanceCounts(std::map<std::string, InferenceEngine::InferenceEngineProfileInfo> &perfMap) const override;
153
154     /**
155      * @brief Given optional implementation of setting blob to avoid need for it to be implemented by plugin
156      * @param name - a name of input or output blob.
157      * @param data - a reference to input or output blob. The type of Blob must correspond to the network input precision and size.
158      */
159     void SetBlob(const char *name, const InferenceEngine::Blob::Ptr &data) override;
160
161     /**
162      * @brief Given optional implementation of getting blob to avoid need for it to be implemented by plugin
163      * @param name - a name of input or output blob.
164      * @param data - a reference to input or output blob. The type of Blob must correspond to the network input precision and size.
165      */
166     void GetBlob(const char *name, InferenceEngine::Blob::Ptr &data) override;
167
168
169     void SetBatch(int batch = -1) override;
170
171 private:
172     int m_curBatch;
173     std::map<std::string, InferenceEngine::InferenceEngineProfileInfo> m_perfMap;
174 };
175
176
177 }  // namespace MKLDNNPlugin