1 // Copyright (C) 2018-2019 Intel Corporation
2 // SPDX-License-Identifier: Apache-2.0
13 #include "mkldnn_graph.h"
14 #include "ie_parallel.hpp"
15 #include "mkldnn_streams.h"
17 using namespace mkldnn;
18 using namespace MKLDNNPlugin;
19 using namespace InferenceEngine;
20 using namespace InferenceEngine::details;
22 namespace MKLDNNPlugin {
// Definition of the executor's per-thread context (declared in the header):
// each worker thread carries its own graph/stream state via thread_local storage.
24 thread_local MultiWorkerTaskContext MultiWorkerTaskExecutor::ptrContext;
// Validates threading-related environment variables for the current build.
// Only the OpenMP branch is visible in this chunk; other IE_THREAD modes
// (and the #endif) are elided here.
26 bool check_env_variables() {
27 #if IE_THREAD == IE_THREAD_OMP
// Delegate OMP_*/KMP_* environment validation to the shared CPU helper
// (false => do not print diagnostics, just report the verdict).
28 return MKLDNNPlugin::cpu::checkOpenMpEnvVars(false);
34 #if !(defined(__APPLE__) || defined(_WIN32))
35 /* Get the cores affinity mask for the current process */
36 bool get_process_mask(int& ncpus, cpu_set_t*& mask) {
// Grow the candidate mask geometrically until sched_getaffinity() accepts its
// size; CPU_ALLOC/CPU_*_S macros support systems with more than 1024 CPUs.
37 for (ncpus = sizeof(cpu_set_t) / CHAR_BIT; ncpus < 32768 /* reasonable limit of #cores*/; ncpus <<= 1) {
38 mask = CPU_ALLOC(ncpus);
39 if (!mask) return false;
41 const size_t size = CPU_ALLOC_SIZE(ncpus);
42 CPU_ZERO_S(size, mask);
// Query the process affinity into the freshly zeroed mask.
43 const int err = sched_getaffinity(getpid(), size, mask);
44 // the result fits the mask
46 // mask size is not enough
// NOTE(review): the success-return and mask-free/retry statements between
// these comments are elided in this chunk. EINVAL means "mask too small" —
// loop again with a doubled size; any other errno aborts the search.
50 if (errno != EINVAL) break;
57 /* Pin current thread to a set of cores determined by the mask. */
// 'ncores' is passed straight through as the byte size argument of
// sched_setaffinity (callers pass CPU_ALLOC_SIZE results, e.g. line 88 below).
// Thread id 0 == the calling thread. Returns true on success.
58 bool pin_current_thread_by_mask(int ncores, const cpu_set_t* proc_mask) {
59 return 0 == sched_setaffinity(0, ncores, proc_mask);
61 /* Pin thread to a spare core in the round-robin scheme, while respecting the given process mask.
62 * The function can also handle the hyper-threading (by populating the physical cores first) */
63 bool pin_thread_to_vacant_core(int thr_idx, int hyperthreads, int ncores, const cpu_set_t* proc_mask) {
64 if (proc_mask == nullptr)
// (the early-exit statement for a null mask is elided in this chunk)
66 const size_t size = CPU_ALLOC_SIZE(ncores);
67 const int num_cpus = CPU_COUNT_S(size, proc_mask);
68 thr_idx %= num_cpus; // To limit unique number in [; num_cpus-1] range
70 // Place threads with specified step
// Advance the logical slot by 'hyperthreads' per thread index so physical
// cores get populated before their HT siblings; wrap handling is elided here.
72 for (int i = 0, offset = 0; i < thr_idx; ++i) {
73 cpu_idx += hyperthreads;
74 if (cpu_idx >= num_cpus)
78 // Find index of 'cpu_idx'-th bit that equals to 1
// Translate the logical slot into a real CPU id by scanning set bits of the
// process mask (decrement of cpu_idx inside the loop is elided in this chunk).
80 while (cpu_idx >= 0) {
81 if (CPU_ISSET_S(++mapped_idx, size, proc_mask))
// Build a single-CPU mask for the chosen core and pin the calling thread;
// note pin_current_thread_by_mask receives the mask's byte size here.
85 cpu_set_t *target_mask = CPU_ALLOC(ncores);
86 CPU_ZERO_S(size, target_mask);
87 CPU_SET_S(mapped_idx, size, target_mask);
88 bool res = pin_current_thread_by_mask(size, target_mask);
89 CPU_FREE(target_mask);
92 #else // no threads pinning/binding on Win/MacOS
// Stub implementations for platforms without sched_setaffinity; bodies
// (presumably trivial returns) are elided in this chunk.
93 bool get_process_mask(int& ncpus, cpu_set_t*& mask) {
98 bool pin_thread_to_vacant_core(int thr_idx, int hyperthreads, int ncores, const cpu_set_t* proc_mask) {
101 bool pin_current_thread_by_mask(int ncores, const cpu_set_t* proc_mask) {
104 #endif // !(defined(__APPLE__) || defined(_WIN32))
// Spawns one worker thread per init task. Each worker first runs its own init
// task, then loops pulling shared tasks from _taskQueue until _isStopped.
// The constructor blocks until all workers have completed initialization.
106 MultiWorkerTaskExecutor::MultiWorkerTaskExecutor(const std::vector<Task::Ptr>& init_tasks, std::string name) :
107 _isStopped(false), _name(name), _initCount(0) {
108 for (auto t : init_tasks) {
// Capture everything by reference except the task pointer, which each worker
// owns by copy ('this' outlives the threads: dtor joins them).
109 _threads.push_back(std::thread([&, t] {
110 // initialization (no contention, every worker thread is doing it's own task)
111 t->runNoThrowNoBusyCheck();
// (statements elided in this chunk — presumably the _initCount increment
// and per-thread setup live here; confirm against the full source)
114 while (!_isStopped) {
116 Task::Ptr currentTask = nullptr;
117 { // waiting for the new task or for stop signal
118 std::unique_lock<std::mutex> lock(_queueMutex);
119 _queueCondVar.wait(lock, [&]() { return !_taskQueue.empty() || _isStopped; });
120 isQueueEmpty = _taskQueue.empty();
// Pop-under-lock statements around these lines are elided in this chunk.
122 currentTask = _taskQueue.front();
124 isQueueEmpty = _taskQueue.empty();
// Run the dequeued task outside the queue lock.
128 currentTask->runNoThrowNoBusyCheck();
131 if (isQueueEmpty) // notify dtor, that all tasks were completed
132 _queueCondVar.notify_all();
// Busy-wait (10 ms polls) until every worker finished its init task.
136 while (_initCount != init_tasks.size()) {
137 std::this_thread::sleep_for(std::chrono::milliseconds(10));
// Drains the task queue, signals stop (the _isStopped assignment is elided in
// this chunk), wakes all workers, and joins every thread.
141 MultiWorkerTaskExecutor::~MultiWorkerTaskExecutor() {
143 std::unique_lock<std::mutex> lock(_queueMutex);
// Wait for in-flight tasks: workers notify_all when the queue empties (see
// the worker loop's "notify dtor" branch).
144 if (!_taskQueue.empty()) {
145 _queueCondVar.wait(lock, [this]() { return _taskQueue.empty(); });
148 _queueCondVar.notify_all();
150 for (auto& thread : _threads) {
151 if (thread.joinable()) {
// Enqueues a task for the worker pool and wakes one worker.
// Returns false if the task is already busy (occupy() failed); the success
// return is elided in this chunk.
157 bool MultiWorkerTaskExecutor::startTask(Task::Ptr task) {
158 if (!task->occupy()) return false;
159 std::unique_lock<std::mutex> lock(_queueMutex);
160 _taskQueue.push(task);
161 _queueCondVar.notify_one();
// Graphless infer request: m_curBatch = -1 means "no dynamic batch set".
// GetBlob lazily creates + allocates a blob for an unseen name, so calling it
// here eagerly materializes every network input and output buffer up front.
165 MKLDNNPlugin::MKLDNNGraphlessInferRequest::MKLDNNGraphlessInferRequest(InferenceEngine::InputsDataMap networkInputs,
166 InferenceEngine::OutputsDataMap networkOutputs)
167 : InferRequestInternal(networkInputs, networkOutputs), m_curBatch(-1) {
168 // Allocate all input blobs
169 for (const auto& it : networkInputs) {
170 InferenceEngine::Blob::Ptr blob;
171 GetBlob(it.first.c_str(), blob);
173 // Allocate all output blobs
174 for (const auto& it : networkOutputs) {
175 InferenceEngine::Blob::Ptr blob;
176 GetBlob(it.first.c_str(), blob);
// Runs inference on the graph bound to the current worker thread
// (MultiWorkerTaskExecutor::ptrContext), pushing inputs with per-precision
// conversion, then pulling outputs and (optionally) perf counters.
181 void MKLDNNPlugin::MKLDNNGraphlessInferRequest::InferImpl() {
182 IE_PROFILING_AUTO_SCOPE(MKLDNN_INFER)
184 auto infer = [this] {
// The graph comes from thread-local context, not from a member: each worker
// thread infers on its own graph clone.
185 IE_ASSERT(MKLDNNPlugin::MultiWorkerTaskExecutor::ptrContext.ptrGraph != nullptr);
186 MKLDNNGraph::Ptr graph = MKLDNNPlugin::MultiWorkerTaskExecutor::ptrContext.ptrGraph;
187 if (!graph->IsReady())
188 THROW_IE_EXCEPTION << "Network not loaded.";
// Dynamic-batch sanity checks: a positive m_curBatch requires the feature
// enabled and must not exceed the configured batch limit.
189 if (m_curBatch > 0 && !graph->getProperty().enableDynamicBatch)
190 THROW_IE_EXCEPTION << "Dynamic batch is not enabled.";
192 if (m_curBatch > graph->getProperty().batchLimit)
193 THROW_IE_EXCEPTION << "Invalid dynamic batch size " << m_curBatch <<
194 " for this request.";
196 // execute input pre-processing.
197 execDataPreprocessing(_inputs);
199 // need to retain converted blobs until infer finish
200 std::vector<InferenceEngine::Blob::Ptr> convertedInputs;
201 for (auto input : _inputs) {
202 if (!_networkInputs[input.first]) {
203 THROW_IE_EXCEPTION <<
204 "input blobs map contains not registered during IInferencePlugin::LoadNetwork blob with name "
// (the streamed blob name and the closing of this throw are elided here)
207 InferenceEngine::Blob::Ptr iconv;
208 InferenceEngine::TBlob<float> *in_f = nullptr;
// Per-precision dispatch; 'break' statements between cases are elided in
// this chunk. FP32 is pushed as-is; other precisions may need conversion.
209 switch (input.second->precision()) {
210 case InferenceEngine::Precision::FP32:
211 graph->PushInputData(input.first, input.second);
213 case InferenceEngine::Precision::U16:
214 // U16 is unsupported by mkldnn, so here we convert the blob and send FP32
215 iconv = InferenceEngine::make_shared_blob<float, const InferenceEngine::SizeVector>(
216 InferenceEngine::Precision::FP32,
217 input.second->getTensorDesc().getLayout(), input.second->dims());
218 convertedInputs.push_back(iconv);
// (iconv->allocate() is presumably between these lines — elided here)
220 in_f = dynamic_cast<InferenceEngine::TBlob<float> *>(iconv.get());
221 InferenceEngine::copyToFloat<uint16_t>(in_f->data(), input.second.get());
222 graph->PushInputData(input.first, iconv);
224 case InferenceEngine::Precision::I16:
// I16 only needs FP32 conversion when a mean image must be subtracted;
// otherwise mkldnn accepts I16 directly.
225 if (graph->hasMeanImageFor(input.first)) {
226 // If a mean image exists, we convert the blob and send FP32
227 iconv = InferenceEngine::make_shared_blob<float, const InferenceEngine::SizeVector>(
228 InferenceEngine::Precision::FP32,
229 input.second->getTensorDesc().getLayout(), input.second->dims());
230 convertedInputs.push_back(iconv);
232 in_f = dynamic_cast<InferenceEngine::TBlob<float> *>(iconv.get());
233 InferenceEngine::copyToFloat<int16_t>(in_f->data(), input.second.get());
234 graph->PushInputData(input.first, iconv);
236 // Instead we can send I16 directly
237 graph->PushInputData(input.first, input.second);
240 case InferenceEngine::Precision::U8:
// Same mean-image rule as I16, for unsigned 8-bit input.
241 if (graph->hasMeanImageFor(input.first)) {
242 // If a mean image exists, we convert the blob and send FP32
243 iconv = InferenceEngine::make_shared_blob<float, const InferenceEngine::SizeVector>(
244 InferenceEngine::Precision::FP32,
245 input.second->getTensorDesc().getLayout(), input.second->dims());
246 convertedInputs.push_back(iconv);
248 in_f = dynamic_cast<InferenceEngine::TBlob<float> *>(iconv.get());
249 InferenceEngine::copyToFloat<uint8_t>(in_f->data(), input.second.get());
250 graph->PushInputData(input.first, iconv);
252 // Instead we can send I8 directly
253 graph->PushInputData(input.first, input.second);
// (default case label is elided; this throw handles unlisted precisions)
257 THROW_IE_EXCEPTION << "Unsupported input precision " << input.second->precision();
// Run the network at the (possibly dynamic) batch, then fetch outputs.
260 graph->Infer(m_curBatch);
261 graph->PullOutputData(_outputs);
262 if (graph->getProperty().collectPerfCounters) {
264 graph->GetPerfData(m_perfMap);
// For TBB builds the lambda is executed inside the graph's TBB arena so that
// "this" arena owns the parallel work; the non-TBB call path is elided here.
267 #if IE_THREAD == IE_THREAD_TBB
268 auto_scope_observing observer(MKLDNNPlugin::MultiWorkerTaskExecutor::ptrContext.ptrGraph->ptrObserver);
269 // a TBB arena is made "this" for Infer call via executing lambda for the arena
270 MKLDNNPlugin::MultiWorkerTaskExecutor::ptrContext.ptrGraph->ptrArena->execute([&] { infer(); });
// Copies the per-layer profiling data collected during Infer into perfMap;
// the body is elided in this chunk (presumably returns m_perfMap contents).
276 void MKLDNNPlugin::MKLDNNGraphlessInferRequest::GetPerformanceCounts(
277 std::map<std::string, InferenceEngine::InferenceEngineProfileInfo> &perfMap) const {
// Returns the blob for 'name', searching in order: previously-set ROI blob,
// existing input, lazily-created input, existing output, lazily-created
// output. Unknown names throw. Early returns between branches are elided.
281 void MKLDNNPlugin::MKLDNNGraphlessInferRequest::GetBlob(const char *name, InferenceEngine::Blob::Ptr &data) {
282 // ROI blob is returned only if it was set previously.
283 auto it = _preProcData.find(name);
284 if (it != _preProcData.end()) {
285 data = it->second.getRoiBlob();
289 if (_inputs.find(name) != _inputs.end()) {
290 data = _inputs[name];
291 checkBlob(data, name, true);
// First request for a known network input: create and allocate its blob from
// the network-declared layout/precision/dims, then cache it in _inputs.
293 } else if (_networkInputs.find(name) != _networkInputs.end()) {
294 InferenceEngine::Layout l = _networkInputs[name]->getLayout();
295 InferenceEngine::Precision p = _networkInputs[name]->getPrecision();
296 InferenceEngine::SizeVector dims = _networkInputs[name]->getTensorDesc().getDims();
298 InferenceEngine::TensorDesc desc = InferenceEngine::TensorDesc(p, dims, l);
299 _inputs[name] = data = make_blob_with_precision(desc);
300 _inputs[name]->allocate();
301 checkBlob(data, name, true);
305 if (_outputs.find(name) != _outputs.end()) {
306 data = _outputs[name];
307 checkBlob(data, name, false);
// Same lazy-create path for outputs.
309 } else if (_networkOutputs.find(name) != _networkOutputs.end()) {
310 InferenceEngine::Layout l = _networkOutputs[name]->getLayout();
311 InferenceEngine::Precision p = _networkOutputs[name]->getPrecision();
312 InferenceEngine::SizeVector dims = _networkOutputs[name]->getTensorDesc().getDims();
314 InferenceEngine::TensorDesc desc = InferenceEngine::TensorDesc(p, dims, l);
315 _outputs[name] = data = make_blob_with_precision(desc);
316 _outputs[name]->allocate();
317 checkBlob(data, name, false);
321 THROW_IE_EXCEPTION << "Cannot find blob with name: " << name;
// Installs a user-provided blob as the input or output 'name' after
// validating precision and total size against the network description.
324 void MKLDNNPlugin::MKLDNNGraphlessInferRequest::SetBlob(const char *name, const InferenceEngine::Blob::Ptr &data) {
// (the empty-blob condition for this throw is elided in this chunk)
326 THROW_IE_EXCEPTION << NOT_ALLOCATED_str << "Failed to set empty blob with name: \'" << name << "\'";
327 if (data->buffer() == nullptr)
328 THROW_IE_EXCEPTION << "Input data was not allocated. Input name: \'" << name << "\'";
// NOTE(review): the null-name check comes after 'name' was already streamed
// into the error messages above — looks like it should be first; confirm
// against the full source before changing.
329 if (name == nullptr) {
330 THROW_IE_EXCEPTION << NOT_FOUND_str + "Failed to set blob with empty name";
332 InferenceEngine::InputInfo::Ptr foundInput;
333 InferenceEngine::DataPtr foundOutput;
334 size_t dataSize = data->size();
// findInputAndOutputBlobByName: true => 'name' is an input, false => output.
335 if (findInputAndOutputBlobByName(name, foundInput, foundOutput)) {
336 if (foundInput->getInputPrecision() != data->precision()) {
337 THROW_IE_EXCEPTION << PARAMETER_MISMATCH_str << "Failed to set Blob with precision "
338 << data->precision();
// When resize preprocessing is configured, the blob is kept aside as an ROI
// blob (its dims may differ from the network input), skipping the size check.
341 if (foundInput->getPreProcess().getResizeAlgorithm() != InferenceEngine::ResizeAlgorithm::NO_RESIZE) {
342 PreProcessData::isApplicable(data, _inputs[name]);
343 // Stores the given blob as ROI blob. It will be used to fill in network input during pre-processing.
344 _preProcData[name].setRoiBlob(data);
346 size_t inputSize = InferenceEngine::details::product(foundInput->getDims());
347 if (dataSize != inputSize) {
348 THROW_IE_EXCEPTION << "Input blob size is not equal network input size ("
349 << dataSize << "!=" << inputSize << ").";
351 _inputs[name] = data;
// Output path: validate size then precision, then install the blob.
354 size_t outputSize = InferenceEngine::details::product(foundOutput->getDims());
355 if (dataSize != outputSize) {
356 THROW_IE_EXCEPTION << "Output blob size is not equal network output size ("
357 << dataSize << "!=" << outputSize << ").";
359 if (foundOutput->getPrecision() != data->precision()) {
360 THROW_IE_EXCEPTION << PARAMETER_MISMATCH_str
361 << "Failed to set Blob with precision not corresponding to user output precision";
363 _outputs[name] = data;
// Sets the dynamic batch size used by the next InferImpl call.
// The validation condition guarding the throw is elided in this chunk.
367 void MKLDNNPlugin::MKLDNNGraphlessInferRequest::SetBatch(int new_batch) {
369 THROW_IE_EXCEPTION << "Invalid dynamic batch size " << new_batch <<
370 " for this request.";
372 m_curBatch = new_batch;
375 } // namespace MKLDNNPlugin