// Copyright (C) 2018-2019 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
#pragma once

#include <atomic>
#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

#include <cpp_interfaces/impl/ie_infer_request_internal.hpp>
#include <cpp_interfaces/ie_task_executor.hpp>
#include "ie_parallel.hpp"
#include "mkldnn/omp_manager.h"
/* CPU "streams" implement a feature that allows multiple Infer Requests to be efficiently run simultaneously.
 * To avoid potential oversubscription, the CPU execution resources are divided accordingly.
 * The feature enables much better performance for networks that originally do not scale well with #threads,
 * even for large batches. Examples are lightweight topologies or topologies with many sequential, memory-bound,
 * or otherwise non-scalable layers. This is especially pronounced for many-core (e.g. server) machines.
 * This is a rather throughput-oriented feature, because running multiple requests in parallel might increase the
 * latency of each individual request.
 * Additionally, the streams relax the need for a large batch to improve the throughput, and simplify the
 * application logic by saturating the CPU with multiple requests instead.
 * Implementation-wise, the "streams" constitute the following:
 * - Pure "graph-less" Infer Requests that are not connected to a specific MKLDNNGraph (as in the regular/legacy
 *   approach)
 * - Just like regular requests, the graph-less requests go to the common (per ExecutableNetwork) queue
 * - But unlike the conventional case, there are multiple threads that grab the requests (see MultiWorkerTaskExecutor)
 * - So every stream is in fact an independent "worker" thread that monitors the queue
 * - Every worker thread (stream) has its own copy of the graph (which holds the intermediate data required for
 *   execution), while the Infer Requests keep only the input/output data
 * See the illustrative sketch of this worker pattern right below.
 */
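/* A minimal, self-contained sketch of the worker pattern described above (illustrative only, not the
 * plugin's actual implementation; GraphCopy and num_streams are hypothetical placeholders). Each worker
 * owns a private graph copy, while the queued tasks carry only the per-request data:
 *
 *   std::queue<std::function<void()>> tasks;   // common task queue (one per ExecutableNetwork)
 *   std::mutex m;
 *   std::condition_variable cv;
 *   bool stopped = false;
 *   std::vector<std::thread> workers;
 *   for (int s = 0; s < num_streams; s++) {    // one worker thread per stream
 *       workers.emplace_back([&] {
 *           thread_local GraphCopy graph;      // per-stream copy of the graph
 *           for (;;) {
 *               std::unique_lock<std::mutex> lock(m);
 *               cv.wait(lock, [&] { return stopped || !tasks.empty(); });
 *               if (stopped) return;
 *               auto task = std::move(tasks.front());
 *               tasks.pop();
 *               lock.unlock();
 *               task();                        // executes against this worker's graph
 *           }
 *       });
 *   }
 */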
namespace MKLDNNPlugin {

using namespace InferenceEngine;

class MKLDNNGraph;
class pinning_observer;
/* This structure handles an "execution context" - the data required to execute an Infer Request.
 * This includes the graph (which holds the intermediate data) and the arena/observer for TBB. */
struct MultiWorkerTaskContext {
    std::shared_ptr<MKLDNNGraph> ptrGraph;
};
#if defined(__APPLE__) || defined(_WIN32)
typedef void cpu_set_t;
#define CPU_FREE(cpuset)
// note that the functions below are just stubs for OSes other than Linux
#endif
/* Check whether any affinity-related environment variables are set (relevant for OpenMP). */
bool check_env_variables();
/* Get the core affinity mask of the current process. */
bool get_process_mask(int& ncpus, cpu_set_t*& mask);
/* Pin the current thread to the set of cores determined by the mask. */
bool pin_current_thread_by_mask(int ncores, const cpu_set_t* proc_mask);
/* Pin a thread to a vacant core in a round-robin scheme, while respecting the given process mask.
 * The function can also handle hyper-threading (by populating the physical cores first). */
bool pin_thread_to_vacant_core(int thr_idx, int hyperthreads, int ncores, const cpu_set_t* proc_mask);
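/* A hypothetical usage sketch of the helpers above, from within worker thread number thr_idx
 * (thr_idx and the "1" for the hyperthreads argument are assumptions for illustration):
 *
 *   int ncpus = 0;
 *   cpu_set_t* mask = nullptr;
 *   if (get_process_mask(ncpus, mask)) {
 *       pin_thread_to_vacant_core(thr_idx, 1, ncpus, mask);  // round-robin over the process mask
 *       // ... do the pinned work ...
 *       pin_current_thread_by_mask(ncpus, mask);             // restore the original process mask
 *       CPU_FREE(mask);                                      // release the mask (no-op stub off-Linux)
 *   }
 */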
#if IE_THREAD == IE_THREAD_TBB
/* A simple observer that handles pinning threads to cores; it serves as a callback for threads entering the arena. */
class pinning_observer : public tbb::task_scheduler_observer {
    cpu_set_t* mask;
    int ncpus;
    int stream_id, threads_per_stream;
    const int pinning_step;

public:
    pinning_observer(tbb::task_arena& _arena, int _stream_id, int _threads_per_stream, int _pinning_step = 1) :
            tbb::task_scheduler_observer(_arena),
            stream_id(_stream_id), threads_per_stream(_threads_per_stream), pinning_step(_pinning_step) {
        get_process_mask(ncpus, mask);
    }
    void on_scheduler_entry(bool) override {
        if (!mask) return;
        int thread_idx = tbb::task_arena::current_thread_index();
        int thr_idx = stream_id * threads_per_stream + thread_idx;
        // pin the thread to its vacant slot
        pin_thread_to_vacant_core(thr_idx, pinning_step, ncpus, mask);
    }
    void on_scheduler_exit(bool) override {
        if (!mask) return;
        // reset the thread's affinity back to the original process mask
        pin_current_thread_by_mask(ncpus, mask);
    }

    ~pinning_observer() {
        if (mask)
            CPU_FREE(mask);
    }
};
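/* A hypothetical usage sketch: give each stream its own TBB arena and attach a pinning_observer,
 * so that worker threads get pinned as they join the arena (stream_id and threads_per_stream are
 * placeholders here):
 *
 *   tbb::task_arena arena(threads_per_stream);
 *   std::unique_ptr<tbb::task_scheduler_observer> observer(
 *       new pinning_observer(arena, stream_id, threads_per_stream));
 *   observer->observe(true);  // from now on, threads entering the arena hit on_scheduler_entry()
 */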
/* RAII helper that enables observation for the lifetime of the enclosing scope. */
class auto_scope_observing {
public:
    explicit auto_scope_observing(std::unique_ptr<tbb::task_scheduler_observer>& _p) : p(_p) {
        if (p)
            p->observe(true);
    }
    ~auto_scope_observing() {
        if (p)
            p->observe(false);
    }

protected:
    std::unique_ptr<tbb::task_scheduler_observer>& p;
};
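/* A hypothetical usage sketch: limit the observation (and hence the pinning callbacks) to one scope,
 * e.g. a single inference, reusing the observer created above:
 *
 *   {
 *       auto_scope_observing guard(observer);  // calls observer->observe(true)
 *       // ... execute the request within the arena; threads get pinned on entry ...
 *   }  // ~auto_scope_observing() calls observer->observe(false)
 */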
#endif  // IE_THREAD == IE_THREAD_TBB
/* Class wrapping multiple worker threads that monitor the same queue of Infer Requests. */
class MultiWorkerTaskExecutor : public ITaskExecutor {
public:
    typedef std::shared_ptr<MultiWorkerTaskExecutor> Ptr;

    explicit MultiWorkerTaskExecutor(const std::vector<Task::Ptr>&, std::string name = "Default");

    ~MultiWorkerTaskExecutor();
    /**
     * @brief Adds a task for execution and notifies one of the worker threads about the new task.
     * @note Can be called from multiple threads - tasks will be added to the queue and executed one-by-one in FIFO order.
     * @param task - shared pointer to the task
     * @return true if the task was added successfully, false otherwise
     */
    bool startTask(Task::Ptr task) override;
    /* Per-worker execution context; each worker thread binds its own graph copy here. */
    static thread_local MultiWorkerTaskContext ptrContext;
private:
    std::vector<std::thread> _threads;
    std::mutex _queueMutex;
    std::condition_variable _queueCondVar;
    std::queue<Task::Ptr> _taskQueue;
    std::atomic<bool> _isStopped;
    std::string _name;
    std::atomic<int> _initCount;
};
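/* A hypothetical usage sketch, assuming the legacy InferenceEngine::Task can wrap a callable
 * (the init tasks and the "CPUStreamsExecutor" name are placeholders): one init task per stream
 * is passed to the constructor, then requests are fed through startTask():
 *
 *   std::vector<Task::Ptr> initTasks;
 *   for (int s = 0; s < num_streams; s++)  // num_streams is a placeholder
 *       initTasks.push_back(std::make_shared<Task>([=]() {
 *           // per-worker initialization, e.g. setting up ptrContext.ptrGraph for stream s
 *       }));
 *   MultiWorkerTaskExecutor::Ptr executor(new MultiWorkerTaskExecutor(initTasks, "CPUStreamsExecutor"));
 *   executor->startTask(std::make_shared<Task>([]() {
 *       // runs on one of the workers, against that worker's thread_local graph copy
 *   }));
 */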
/* Pure Infer Requests - they hold just the input and output data. */
class MKLDNNGraphlessInferRequest : public InferenceEngine::InferRequestInternal {
public:
    typedef std::shared_ptr<MKLDNNGraphlessInferRequest> Ptr;

    explicit MKLDNNGraphlessInferRequest(InferenceEngine::InputsDataMap networkInputs,
                                         InferenceEngine::OutputsDataMap networkOutputs);
    void InferImpl() override;

    void GetPerformanceCounts(std::map<std::string, InferenceEngine::InferenceEngineProfileInfo> &perfMap) const override;
    /**
     * @brief Optional default implementation of setting a blob, provided so that a plugin does not need to implement it.
     * @param name - the name of an input or output blob
     * @param data - a reference to the input or output blob. The type of the Blob must correspond to the network input precision and size.
     */
    void SetBlob(const char *name, const InferenceEngine::Blob::Ptr &data) override;
    /**
     * @brief Optional default implementation of getting a blob, provided so that a plugin does not need to implement it.
     * @param name - the name of an input or output blob
     * @param data - a reference to the input or output blob. The type of the Blob must correspond to the network input precision and size.
     */
    void GetBlob(const char *name, InferenceEngine::Blob::Ptr &data) override;
    void SetBatch(int batch = -1) override;

private:
    int m_curBatch;
    std::map<std::string, InferenceEngine::InferenceEngineProfileInfo> m_perfMap;
};
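/* A hypothetical usage sketch ("input"/"output" are placeholder blob names): the request carries only
 * blobs, and InferImpl() executes on whichever worker thread picks it up, using that worker's graph copy:
 *
 *   MKLDNNGraphlessInferRequest::Ptr request(
 *       new MKLDNNGraphlessInferRequest(networkInputs, networkOutputs));
 *   request->SetBlob("input", inputBlob);
 *   request->InferImpl();  // runs against the calling worker's thread_local graph
 *   InferenceEngine::Blob::Ptr result;
 *   request->GetBlob("output", result);
 */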
}  // namespace MKLDNNPlugin