0595148789209b8eb00afe2ea39b98248cabbc28
[platform/core/ml/nnfw.git] / runtime / onert / backend / trix / DevContext.cc
1 /*
2  * Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "DevContext.h"
18
19 #include "Convert.h"
20
21 #include <stdexcept>
22
23 namespace onert
24 {
25 namespace backend
26 {
27 namespace trix
28 {
29
30 // All things related to npu device handle are gathered this Class, but when implementing npu
31 // deamon, others except the context roles should be seperated.
32 DevContext::DevContext() : _dev_handles{}, _model_ids{}, _meta_map{}
33 {
34   auto dev_count = getnumNPUdeviceByType(NPUCOND_TRIV2_CONN_SOCIP);
35   if (dev_count <= 0)
36   {
37     throw std::runtime_error("Unable to find TRIX NPU device");
38   }
39
40   // Get NPU device handles
41   for (int i = 0; i < dev_count; ++i)
42   {
43     npudev_h handle;
44     if (getNPUdeviceByType(&handle, NPUCOND_TRIV2_CONN_SOCIP, i) < 0)
45     {
46       throw std::runtime_error("Failed to get TRIX NPU device handle");
47     }
48     _dev_handles.emplace_back(handle);
49   }
50
51   // NOTE Do not change the number of threads as long as jobs in thread call
52   //      the synchronous APIs such as submitNPU_request()
53   _batch_thread_pool = std::make_unique<BatchThreadPool>(_dev_handles.size());
54   // We need to careful not to create multiple `BatchThreadPool`. In case of multiple models, there
55   // may be a problem having multiple `BatchThreadPool` in current implementation. But if this
56   // creating thread pool is moved to npu deamon, I think this problem will be solved smoothly.
57 }
58
59 DevContext::~DevContext()
60 {
61   // NOTE Must release _batch_thread_pool before releasing _dev_handles to wait for all threads to
62   //      be terminated
63   _batch_thread_pool.reset(nullptr);
64
65   for (const auto &dev_handle : _dev_handles)
66   {
67     unregisterNPUmodel_all(dev_handle);
68     putNPUdevice(dev_handle);
69   }
70 }
71
72 ModelID DevContext::registerModel(const std::string &model_file_path)
73 {
74   auto meta = getNPUmodel_metadata(model_file_path.c_str(), false);
75
76   if (meta == nullptr)
77   {
78     throw std::runtime_error("Unable to extract the model metadata");
79   }
80
81   generic_buffer file_info;
82   file_info.type = BUFFER_FILE;
83   file_info.filepath = model_file_path.c_str();
84   file_info.size = meta->size;
85
86   ModelID model_id;
87
88   for (uint32_t dev_num = 0; dev_num < _dev_handles.size(); ++dev_num)
89   {
90     // Register model for each device
91     uint32_t model_id_at_device;
92     if (registerNPUmodel(_dev_handles.at(dev_num), &file_info, &model_id_at_device) < 0)
93     {
94       throw std::runtime_error("Failed to register npu model");
95     }
96
97     if (dev_num == 0)
98     {
99       model_id = model_id_at_device;
100       _meta_map[model_id_at_device] = std::shared_ptr<npubin_meta>(meta);
101     }
102     else
103     {
104       _meta_map[model_id_at_device] = _meta_map[model_id];
105     }
106
107     _model_ids[model_id].resize(dev_num + 1);
108     _model_ids[model_id].at(dev_num) = model_id_at_device;
109   }
110
111   // Return the model id for device 0 only
112   return model_id;
113 }
114
115 void DevContext::unRegisterModel(ModelID model_id)
116 {
117   for (uint32_t dev_num = 0; dev_num < _dev_handles.size(); ++dev_num)
118   {
119     const auto model_id_at_device = _model_ids.at(model_id).at(dev_num);
120     const auto &dev_handle = _dev_handles.at(dev_num);
121
122     // Remove meta data
123     _meta_map.erase(model_id_at_device);
124
125     // Unregister Model for each device
126     unregisterNPUmodel(dev_handle, model_id_at_device);
127   }
128   // Remove model IDs
129   _model_ids.erase(model_id);
130 }
131
132 void DevContext::requestRun(ModelID model_id, input_buffers *input_bufs, tensors_data_info *in_info,
133                             output_buffers *output_bufs, tensors_data_info *out_info,
134                             size_t batch_size)
135 {
136   if (batch_size > 1)
137   {
138     if (in_info->num_info != 1)
139     {
140       throw std::runtime_error("Supported only an input that has batch now");
141     }
142     if (out_info->num_info != 1)
143     {
144       throw std::runtime_error("Supported only one output now");
145     }
146
147     if (input_bufs->bufs[0].size % batch_size != 0)
148     {
149       throw std::runtime_error("Invalid batch size. batch size :" + std::to_string(batch_size) +
150                                ", input buffer size : " + std::to_string(input_bufs->bufs[0].size));
151     }
152
153     if (output_bufs->bufs[0].size % batch_size != 0)
154     {
155       throw std::runtime_error(
156         "Invalid batch size. batch size :" + std::to_string(batch_size) +
157         ", output tensor size : " + std::to_string(output_bufs->bufs[0].size));
158     }
159
160     // inputs/outputs for each batch
161     std::vector<input_buffers> in_buffers_vec(batch_size);
162     std::vector<output_buffers> out_buffers_vec(batch_size);
163
164     // Run on thread pool
165     std::vector<std::future<int32_t>> batch_futures;
166     for (uint32_t batch_num = 0; batch_num < batch_size; ++batch_num)
167     {
168       // Enqueue jobs
169       // The in_info and out_info are always the same even if they are divided by batch, so they are
170       // used as they are.
171       auto future = _batch_thread_pool->enqueueJob(
172         [batch_size, in_info, out_info,
173          this](uint32_t dev_num, ModelID model_id, const input_buffers *input_bufs,
174                const output_buffers *output_bufs, uint32_t batch_num) -> int32_t {
175           // Set buffers of inputs/outputs for each batch
176           // TODO Support multiple inputs/outputs
177           input_buffers in_batch_buffers;
178           in_batch_buffers.num_buffers = input_bufs->num_buffers;
179           const uint64_t in_batch_offset = input_bufs->bufs[0].size / batch_size;
180           setBufferByBatch(input_bufs->bufs[0], batch_num, in_batch_offset,
181                            &in_batch_buffers.bufs[0]);
182
183           output_buffers out_batch_buffers;
184           out_batch_buffers.num_buffers = output_bufs->num_buffers;
185           const uint64_t out_batch_offset = output_bufs->bufs[0].size / batch_size;
186           setBufferByBatch(output_bufs->bufs[0], batch_num, out_batch_offset,
187                            &out_batch_buffers.bufs[0]);
188
189           try
190           {
191             // dev_num is the same as the thread number in _batch_thread_pool
192             this->runOneBatch(dev_num, model_id, &in_batch_buffers, in_info, &out_batch_buffers,
193                               out_info);
194           }
195           catch (...)
196           {
197             _eptr = std::current_exception();
198           }
199
200           return batch_num;
201         },
202         model_id, input_bufs, output_bufs, batch_num);
203       batch_futures.emplace_back(std::move(future));
204     }
205
206     for (auto &&future : batch_futures)
207     {
208       future.get();
209     }
210
211     if (_eptr)
212     {
213       std::exception_ptr eptr(nullptr);
214       _eptr.swap(eptr);
215       std::rethrow_exception(eptr);
216     }
217   }
218   else
219   {
220     runOneBatch(0, model_id, input_bufs, in_info, output_bufs, out_info);
221   }
222 }
223
224 void DevContext::runOneBatch(uint32_t dev_num, ModelID model_id, input_buffers *input_bufs,
225                              tensors_data_info *in_info, output_buffers *output_bufs,
226                              tensors_data_info *out_info)
227 {
228   const auto &model_id_at_device = _model_ids.at(model_id).at(dev_num);
229
230   const auto meta = _meta_map.at(model_id_at_device);
231   if (meta->input_seg_num != in_info->num_info)
232   {
233     throw std::runtime_error("The number of inputs does not match to model input seg num");
234   }
235
236   if (meta->output_seg_num != out_info->num_info)
237   {
238     throw std::runtime_error("The number of outputs does not match to model output seg num");
239   }
240
241   const auto &dev_handle = _dev_handles.at(dev_num);
242   int req_id;
243
244   if (auto error_code = createNPU_request(dev_handle, model_id_at_device, &req_id))
245   {
246     throw std::runtime_error("Unable to create NPU request with model id (" +
247                              std::to_string(model_id_at_device) + ")" +
248                              " error code : " + std::to_string(error_code));
249   }
250
251   if (auto error_code =
252         setNPU_requestData(dev_handle, req_id, input_bufs, in_info, output_bufs, out_info))
253   {
254     removeNPU_request(dev_handle, req_id);
255     throw std::runtime_error("Unable to create NPU request for model id (" +
256                              std::to_string(model_id_at_device) + ")" +
257                              " error code : " + std::to_string(error_code));
258   }
259
260   // NOTE submitNPU_request is not thread-safe(?). It is rarely hanging(unresponsive).
261   //      Ultimately, to solve this problem, we have to either use other thread-safe API or
262   //      change submitNPU_request to be thread-safe, but both works take time.
263   //      As a workaround, let's allow hanging thread.
264   // TODO Change submitNPU_request to be thread-safe or replaced with other thread-safe API
265   std::packaged_task<int(npudev_h, int)> task(submitNPU_request);
266   auto f = task.get_future();
267   std::thread thread_submit_request(std::move(task), dev_handle, req_id);
268   auto status = f.wait_until(std::chrono::system_clock::now() + std::chrono::seconds(60));
269   if (status == std::future_status::timeout)
270   {
271     // There is no way to terminate hanging submitNPU_request from the outside.
272     // If a hanging thread is detached, it will remain as a hanging thread. Even so, it's better
273     // than having the main thread hanging.
274     thread_submit_request.detach();
275
276     // TODO Enable removeNPU_request after resolving hanging.
277     // removeNPU_request(dev_handle, req_id);
278     throw std::runtime_error("The npu API \"submitNPU_request\" timeout");
279   }
280
281   auto error_code = f.get();
282   thread_submit_request.join();
283   if (error_code != 0)
284   {
285     removeNPU_request(dev_handle, req_id);
286     throw std::runtime_error("Unable to submit NPU request with req id (" + std::to_string(req_id) +
287                              ")" + " error code : " + std::to_string(error_code));
288   }
289
290   if (auto error_code = removeNPU_request(dev_handle, req_id))
291   {
292     throw std::runtime_error("Unable to remove NPU request with req id (" + std::to_string(req_id) +
293                              ")" + " error code : " + std::to_string(error_code));
294   }
295 }
296
297 void DevContext::setBufferByBatch(const generic_buffer &origin_buf, uint32_t batch_num,
298                                   uint64_t batch_offset, generic_buffer *batch_buf)
299 {
300   batch_buf->addr = reinterpret_cast<uint8_t *>(origin_buf.addr) + batch_num * batch_offset;
301   batch_buf->size = batch_offset;
302   batch_buf->type = BUFFER_MAPPED;
303 }
304
305 } // namespace trix
306 } // namespace backend
307 } // namespace onert