inference-engine/src/mkldnn_plugin/mkldnn_streams.cpp
// Copyright (C) 2018-2019 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//

#include <string>
#include <map>
#include <vector>
#include <limits>
#include <chrono>
#include <climits>
#include <memory>

#include "mkldnn_graph.h"
#include "ie_parallel.hpp"
#include "mkldnn_streams.h"

using namespace mkldnn;
using namespace MKLDNNPlugin;
using namespace InferenceEngine;
using namespace InferenceEngine::details;

namespace MKLDNNPlugin {

// Per-thread context: MKLDNNGraphlessInferRequest::InferImpl() reads the graph to execute from this
// thread-local slot, so each worker thread is expected to bind its own MKLDNNGraph here during initialization.
thread_local MultiWorkerTaskContext MultiWorkerTaskExecutor::ptrContext;

bool check_env_variables() {
#if IE_THREAD == IE_THREAD_OMP
    return MKLDNNPlugin::cpu::checkOpenMpEnvVars(false);
#else
    return false;
#endif
}
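// Note: checkOpenMpEnvVars() is assumed to report whether user-set OpenMP/KMP environment variables
// (e.g. OMP_NUM_THREADS or KMP_AFFINITY) are present, since such settings can conflict with the
// per-stream thread pinning implemented below; the caller can then skip or adjust the pinning.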

#if !(defined(__APPLE__) || defined(_WIN32))
/* Get the cores affinity mask for the current process */
bool get_process_mask(int& ncpus, cpu_set_t*& mask) {
    for (ncpus = sizeof(cpu_set_t) / CHAR_BIT; ncpus < 32768 /* reasonable limit of #cores */; ncpus <<= 1) {
        mask = CPU_ALLOC(ncpus);
        if (!mask) return false;

        const size_t size = CPU_ALLOC_SIZE(ncpus);
        CPU_ZERO_S(size, mask);
        const int err = sched_getaffinity(getpid(), size, mask);
        // success: the mask was large enough to hold the affinity set
        if (!err) break;
        // failure: free the mask and retry with a larger size...
        CPU_FREE(mask);
        mask = NULL;
        // ...unless the failure was not caused by an insufficient mask size
        if (errno != EINVAL) break;
    }
    if (!mask) {
        return false;
    }
    return true;
}
/* Pin the current thread to a set of cores determined by the mask. */
bool pin_current_thread_by_mask(int ncores, const cpu_set_t* proc_mask) {
    return 0 == sched_setaffinity(0, ncores, proc_mask);
}
/* Pin a thread to a vacant core in a round-robin scheme, while respecting the given process mask.
 * The function also handles hyper-threading (by populating the physical cores first). */
bool pin_thread_to_vacant_core(int thr_idx, int hyperthreads, int ncores, const cpu_set_t* proc_mask) {
    if (proc_mask == nullptr)
        return false;
    const size_t size = CPU_ALLOC_SIZE(ncores);
    const int num_cpus = CPU_COUNT_S(size, proc_mask);
    thr_idx %= num_cpus;  // limit the unique thread index to the [0; num_cpus-1] range

    // Place threads with the specified step
    int cpu_idx = 0;
    for (int i = 0, offset = 0; i < thr_idx; ++i) {
        cpu_idx += hyperthreads;
        if (cpu_idx >= num_cpus)
            cpu_idx = ++offset;
    }

    // Find the index of the 'cpu_idx'-th set bit of the process mask
    int mapped_idx = -1;
    while (cpu_idx >= 0) {
        if (CPU_ISSET_S(++mapped_idx, size, proc_mask))
            --cpu_idx;
    }

    cpu_set_t *target_mask = CPU_ALLOC(ncores);
    CPU_ZERO_S(size, target_mask);
    CPU_SET_S(mapped_idx, size, target_mask);
    bool res = pin_current_thread_by_mask(size, target_mask);
    CPU_FREE(target_mask);
    return res;
}
#else   // no thread pinning/binding on Windows/macOS
bool get_process_mask(int& ncpus, cpu_set_t*& mask) {
    ncpus = 0;
    mask = nullptr;
    return false;
}
bool pin_thread_to_vacant_core(int thr_idx, int hyperthreads, int ncores, const cpu_set_t* proc_mask) {
    return false;
}
bool pin_current_thread_by_mask(int ncores, const cpu_set_t* proc_mask) {
    return false;
}
#endif  // !(defined(__APPLE__) || defined(_WIN32))
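// Example of the round-robin mapping above (illustrative only, assuming an 8-logical-CPU process mask
// with 2 hyper-threads per physical core, i.e. hyperthreads == 2):
//   thr_idx 0 -> cpu_idx 0, thr_idx 1 -> cpu_idx 2, thr_idx 2 -> cpu_idx 4, thr_idx 3 -> cpu_idx 6,
//   thr_idx 4 -> cpu_idx 1, thr_idx 5 -> cpu_idx 3, thr_idx 6 -> cpu_idx 5, thr_idx 7 -> cpu_idx 7
// i.e. physical cores are occupied first, then their hyper-thread siblings; cpu_idx is finally
// translated to an absolute CPU id by walking the set bits of the process mask.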

MultiWorkerTaskExecutor::MultiWorkerTaskExecutor(const std::vector<Task::Ptr>& init_tasks, std::string name) :
        _isStopped(false), _name(name), _initCount(0) {
    for (auto t : init_tasks) {
        _threads.push_back(std::thread([&, t] {
            // initialization (no contention, every worker thread is doing its own task)
            t->runNoThrowNoBusyCheck();
            _initCount++;

            while (!_isStopped) {
                bool isQueueEmpty;
                Task::Ptr currentTask = nullptr;
                {  // wait for a new task or for the stop signal
                    std::unique_lock<std::mutex> lock(_queueMutex);
                    _queueCondVar.wait(lock, [&]() { return !_taskQueue.empty() || _isStopped; });
                    isQueueEmpty = _taskQueue.empty();
                    if (!isQueueEmpty) {
                        currentTask = _taskQueue.front();
                        _taskQueue.pop();
                        isQueueEmpty = _taskQueue.empty();
                    }
                }
                if (currentTask)
                    currentTask->runNoThrowNoBusyCheck();
                if (_isStopped)
                    break;
                if (isQueueEmpty)  // notify the destructor that all queued tasks have been completed
                    _queueCondVar.notify_all();
            }
        }));
    }
    // wait until every worker thread has finished its initialization task
    while (_initCount != init_tasks.size()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}
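// Minimal usage sketch (illustrative only, not part of the original sources): each "stream" is
// represented by an init task that builds a per-thread MKLDNNGraph and stores it in
// MultiWorkerTaskExecutor::ptrContext; inference work is then dispatched with startTask(), roughly:
//
//   std::vector<Task::Ptr> initTasks;          // one task per stream / worker thread
//   for (int s = 0; s < numStreams; ++s)
//       initTasks.push_back(std::make_shared<Task>([=] { /* create graph, set ptrContext.ptrGraph */ }));
//   MultiWorkerTaskExecutor executor(initTasks, "CPUStreams");
//   executor.startTask(std::make_shared<Task>([=] { /* run one inference on this thread's graph */ }));
//
// The exact Task construction API is assumed here; see the Task definition in this Inference Engine version.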

MultiWorkerTaskExecutor::~MultiWorkerTaskExecutor() {
    {
        std::unique_lock<std::mutex> lock(_queueMutex);
        if (!_taskQueue.empty()) {
            _queueCondVar.wait(lock, [this]() { return _taskQueue.empty(); });
        }
        _isStopped = true;
        _queueCondVar.notify_all();
    }
    for (auto& thread : _threads) {
        if (thread.joinable()) {
            thread.join();
        }
    }
}

bool MultiWorkerTaskExecutor::startTask(Task::Ptr task) {
    if (!task->occupy()) return false;
    std::unique_lock<std::mutex> lock(_queueMutex);
    _taskQueue.push(task);
    _queueCondVar.notify_one();
    return true;
}
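// startTask() is assumed to return false when Task::occupy() reports the task as already busy
// (still queued or running), so callers can detect a busy request without blocking on the queue.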

MKLDNNPlugin::MKLDNNGraphlessInferRequest::MKLDNNGraphlessInferRequest(InferenceEngine::InputsDataMap networkInputs,
                                                                       InferenceEngine::OutputsDataMap networkOutputs)
        : InferRequestInternal(networkInputs, networkOutputs), m_curBatch(-1) {
    // Allocate all input blobs
    for (const auto& it : networkInputs) {
        InferenceEngine::Blob::Ptr blob;
        GetBlob(it.first.c_str(), blob);
    }
    // Allocate all output blobs
    for (const auto& it : networkOutputs) {
        InferenceEngine::Blob::Ptr blob;
        GetBlob(it.first.c_str(), blob);
    }
}

void MKLDNNPlugin::MKLDNNGraphlessInferRequest::InferImpl() {
    IE_PROFILING_AUTO_SCOPE(MKLDNN_INFER)

    auto infer = [this] {
        IE_ASSERT(MKLDNNPlugin::MultiWorkerTaskExecutor::ptrContext.ptrGraph != nullptr);
        MKLDNNGraph::Ptr graph = MKLDNNPlugin::MultiWorkerTaskExecutor::ptrContext.ptrGraph;
        if (!graph->IsReady())
            THROW_IE_EXCEPTION << "Network not loaded.";
        if (m_curBatch > 0 && !graph->getProperty().enableDynamicBatch)
            THROW_IE_EXCEPTION << "Dynamic batch is not enabled.";

        if (m_curBatch > graph->getProperty().batchLimit)
            THROW_IE_EXCEPTION << "Invalid dynamic batch size " << m_curBatch <<
                               " for this request.";

        // execute input pre-processing
        execDataPreprocessing(_inputs);

        // need to retain converted blobs until the infer finishes
        std::vector<InferenceEngine::Blob::Ptr> convertedInputs;
        for (auto input : _inputs) {
            if (!_networkInputs[input.first]) {
                THROW_IE_EXCEPTION <<
                                   "input blobs map contains a blob with name " << input.first <<
                                   " that was not registered during IInferencePlugin::LoadNetwork";
            }
            InferenceEngine::Blob::Ptr iconv;
            InferenceEngine::TBlob<float> *in_f = nullptr;
            switch (input.second->precision()) {
                case InferenceEngine::Precision::FP32:
                    graph->PushInputData(input.first, input.second);
                    break;
                case InferenceEngine::Precision::U16:
                    // U16 is unsupported by mkldnn, so here we convert the blob and send FP32
                    iconv = InferenceEngine::make_shared_blob<float, const InferenceEngine::SizeVector>(
                            InferenceEngine::Precision::FP32,
                            input.second->getTensorDesc().getLayout(), input.second->dims());
                    convertedInputs.push_back(iconv);
                    iconv->allocate();
                    in_f = dynamic_cast<InferenceEngine::TBlob<float> *>(iconv.get());
                    InferenceEngine::copyToFloat<uint16_t>(in_f->data(), input.second.get());
                    graph->PushInputData(input.first, iconv);
                    break;
                case InferenceEngine::Precision::I16:
                    if (graph->hasMeanImageFor(input.first)) {
                        // If a mean image exists, we convert the blob and send FP32
                        iconv = InferenceEngine::make_shared_blob<float, const InferenceEngine::SizeVector>(
                                InferenceEngine::Precision::FP32,
                                input.second->getTensorDesc().getLayout(), input.second->dims());
                        convertedInputs.push_back(iconv);
                        iconv->allocate();
                        in_f = dynamic_cast<InferenceEngine::TBlob<float> *>(iconv.get());
                        InferenceEngine::copyToFloat<int16_t>(in_f->data(), input.second.get());
                        graph->PushInputData(input.first, iconv);
                    } else {
                        // Otherwise we can send I16 directly
                        graph->PushInputData(input.first, input.second);
                    }
                    break;
                case InferenceEngine::Precision::U8:
                    if (graph->hasMeanImageFor(input.first)) {
                        // If a mean image exists, we convert the blob and send FP32
                        iconv = InferenceEngine::make_shared_blob<float, const InferenceEngine::SizeVector>(
                                InferenceEngine::Precision::FP32,
                                input.second->getTensorDesc().getLayout(), input.second->dims());
                        convertedInputs.push_back(iconv);
                        iconv->allocate();
                        in_f = dynamic_cast<InferenceEngine::TBlob<float> *>(iconv.get());
                        InferenceEngine::copyToFloat<uint8_t>(in_f->data(), input.second.get());
                        graph->PushInputData(input.first, iconv);
                    } else {
                        // Otherwise we can send U8 directly
                        graph->PushInputData(input.first, input.second);
                    }
                    break;
                default:
                    THROW_IE_EXCEPTION << "Unsupported input precision " << input.second->precision();
            }
        }
        graph->Infer(m_curBatch);
        graph->PullOutputData(_outputs);
        if (graph->getProperty().collectPerfCounters) {
            m_perfMap.clear();
            graph->GetPerfData(m_perfMap);
        }
    };
#if IE_THREAD == IE_THREAD_TBB
    auto_scope_observing observer(MKLDNNPlugin::MultiWorkerTaskExecutor::ptrContext.ptrGraph->ptrObserver);
    // a TBB arena is made "this" for the Infer call by executing the lambda inside the arena
    MKLDNNPlugin::MultiWorkerTaskExecutor::ptrContext.ptrGraph->ptrArena->execute([&] { infer(); });
#else
    infer();
#endif
}
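// Design note (based on the code above): with the TBB threading backend each stream's graph presumably
// owns a task arena (ptrGraph->ptrArena) plus an observer that pins the arena's worker threads, so
// running infer() via arena->execute() keeps all inner parallel work of this request on the cores
// assigned to that stream.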

void MKLDNNPlugin::MKLDNNGraphlessInferRequest::GetPerformanceCounts(
        std::map<std::string, InferenceEngine::InferenceEngineProfileInfo> &perfMap) const {
    perfMap = m_perfMap;
}

void MKLDNNPlugin::MKLDNNGraphlessInferRequest::GetBlob(const char *name, InferenceEngine::Blob::Ptr &data) {
    // ROI blob is returned only if it was set previously.
    auto it = _preProcData.find(name);
    if (it != _preProcData.end()) {
        data = it->second.getRoiBlob();
        return;
    }

    if (_inputs.find(name) != _inputs.end()) {
        data = _inputs[name];
        checkBlob(data, name, true);
        return;
    } else if (_networkInputs.find(name) != _networkInputs.end()) {
        InferenceEngine::Layout l = _networkInputs[name]->getLayout();
        InferenceEngine::Precision p = _networkInputs[name]->getPrecision();
        InferenceEngine::SizeVector dims = _networkInputs[name]->getTensorDesc().getDims();

        InferenceEngine::TensorDesc desc = InferenceEngine::TensorDesc(p, dims, l);
        _inputs[name] = data = make_blob_with_precision(desc);
        _inputs[name]->allocate();
        checkBlob(data, name, true);
        return;
    }

    if (_outputs.find(name) != _outputs.end()) {
        data = _outputs[name];
        checkBlob(data, name, false);
        return;
    } else if (_networkOutputs.find(name) != _networkOutputs.end()) {
        InferenceEngine::Layout l = _networkOutputs[name]->getLayout();
        InferenceEngine::Precision p = _networkOutputs[name]->getPrecision();
        InferenceEngine::SizeVector dims = _networkOutputs[name]->getTensorDesc().getDims();

        InferenceEngine::TensorDesc desc = InferenceEngine::TensorDesc(p, dims, l);
        _outputs[name] = data = make_blob_with_precision(desc);
        _outputs[name]->allocate();
        checkBlob(data, name, false);
        return;
    }

    THROW_IE_EXCEPTION << "Cannot find blob with name: " << name;
}
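// GetBlob() therefore doubles as lazy allocation: on the first request for a network input or output it
// builds a TensorDesc from the network's layout/precision/dims and allocates a matching blob, which is
// how the constructor above pre-allocates all blobs.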

void MKLDNNPlugin::MKLDNNGraphlessInferRequest::SetBlob(const char *name, const InferenceEngine::Blob::Ptr &data) {
    // check the name first, since the messages below stream it
    if (name == nullptr) {
        THROW_IE_EXCEPTION << NOT_FOUND_str + "Failed to set blob with empty name";
    }
    if (!data)
        THROW_IE_EXCEPTION << NOT_ALLOCATED_str << "Failed to set empty blob with name: \'" << name << "\'";
    if (data->buffer() == nullptr)
        THROW_IE_EXCEPTION << "Input data was not allocated. Input name: \'" << name << "\'";
    InferenceEngine::InputInfo::Ptr foundInput;
    InferenceEngine::DataPtr foundOutput;
    size_t dataSize = data->size();
    if (findInputAndOutputBlobByName(name, foundInput, foundOutput)) {
        if (foundInput->getInputPrecision() != data->precision()) {
            THROW_IE_EXCEPTION << PARAMETER_MISMATCH_str << "Failed to set Blob with precision "
                               << data->precision();
        }

        if (foundInput->getPreProcess().getResizeAlgorithm() != InferenceEngine::ResizeAlgorithm::NO_RESIZE) {
            PreProcessData::isApplicable(data, _inputs[name]);
            // Store the given blob as an ROI blob. It will be used to fill in the network input during pre-processing.
            _preProcData[name].setRoiBlob(data);
        } else {
            size_t inputSize = InferenceEngine::details::product(foundInput->getDims());
            if (dataSize != inputSize) {
                THROW_IE_EXCEPTION << "Input blob size is not equal to the network input size ("
                                   << dataSize << "!=" << inputSize << ").";
            }
            _inputs[name] = data;
        }
    } else {
        size_t outputSize = InferenceEngine::details::product(foundOutput->getDims());
        if (dataSize != outputSize) {
            THROW_IE_EXCEPTION << "Output blob size is not equal to the network output size ("
                               << dataSize << "!=" << outputSize << ").";
        }
        if (foundOutput->getPrecision() != data->precision()) {
            THROW_IE_EXCEPTION << PARAMETER_MISMATCH_str
                               << "Failed to set Blob with precision not corresponding to the user output precision";
        }
        _outputs[name] = data;
    }
}

void MKLDNNPlugin::MKLDNNGraphlessInferRequest::SetBatch(int new_batch) {
    if (new_batch < 1) {
        THROW_IE_EXCEPTION << "Invalid dynamic batch size " << new_batch <<
                           " for this request.";
    }
    m_curBatch = new_batch;
}

}  // namespace MKLDNNPlugin