Publishing 2019 R1 content
[platform/upstream/dldt.git] / inference-engine / src / inference_engine / cpp_interfaces / ie_task_executor.cpp
1 // Copyright (C) 2018-2019 Intel Corporation
2 // SPDX-License-Identifier: Apache-2.0
3 //
4
5 #include <string>
6 #include <vector>
7 #include <memory>
8 #include <mutex>
9 #include <condition_variable>
10 #include <thread>
11 #include <queue>
12 #include <ie_profiling.hpp>
13 #include "details/ie_exception.hpp"
14 #include "ie_task.hpp"
15 #include "ie_task_executor.hpp"
16
17 namespace InferenceEngine {
18
19 TaskExecutor::TaskExecutor(std::string name) : _isStopped(false), _name(name) {
20     _thread = std::make_shared<std::thread>([&] {
21         anotateSetThreadName(("TaskExecutor thread for " + _name).c_str());
22         while (!_isStopped) {
23             bool isQueueEmpty;
24             Task::Ptr currentTask;
25             {  // waiting for the new task or for stop signal
26                 std::unique_lock<std::mutex> lock(_queueMutex);
27                 _queueCondVar.wait(lock, [&]() { return !_taskQueue.empty() || _isStopped; });
28                 isQueueEmpty = _taskQueue.empty();
29                 if (!isQueueEmpty) currentTask = _taskQueue.front();
30             }
31             if (_isStopped && isQueueEmpty)
32                 break;
33             if (!isQueueEmpty) {
34                 currentTask->runNoThrowNoBusyCheck();
35                 std::unique_lock<std::mutex> lock(_queueMutex);
36                 _taskQueue.pop();
37                 isQueueEmpty = _taskQueue.empty();
38                 if (isQueueEmpty) {
39                     // notify dtor, that all tasks were completed
40                     _queueCondVar.notify_all();
41                 }
42             }
43         }
44     });
45 }
46
47 TaskExecutor::~TaskExecutor() {
48     {
49         std::unique_lock<std::mutex> lock(_queueMutex);
50         if (!_taskQueue.empty()) {
51             _queueCondVar.wait(lock, [this]() { return _taskQueue.empty(); });
52         }
53         _isStopped = true;
54         _queueCondVar.notify_all();
55     }
56     if (_thread && _thread->joinable()) {
57         _thread->join();
58         _thread.reset();
59     }
60 }
61
62 bool TaskExecutor::startTask(Task::Ptr task) {
63     if (!task->occupy()) return false;
64     std::unique_lock<std::mutex> lock(_queueMutex);
65     _taskQueue.push(task);
66     _queueCondVar.notify_all();
67     return true;
68 }
69
70 }  // namespace InferenceEngine