Imported Upstream version 1.25.0
[platform/core/ml/nnfw.git] / runtime / onert / backend / trix / DevContext.cc
1 /*
2  * Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "DevContext.h"
18
19 #include "Convert.h"
20
21 #include <stdexcept>
22
23 namespace onert
24 {
25 namespace backend
26 {
27 namespace trix
28 {
29
30 // All things related to npu device handle are gathered this Class, but when implementing npu
31 // deamon, others except the context roles should be seperated.
32 DevContext::DevContext() : _dev_handles{}, _model_ids{}, _meta_map{}
33 {
34   auto dev_count = getnumNPUdeviceByType(NPUCOND_TRIV2_CONN_SOCIP);
35   if (dev_count <= 0)
36   {
37     throw std::runtime_error("Unable to find TRIX NPU device");
38   }
39
40   // Get NPU device handles
41   for (int i = 0; i < dev_count; ++i)
42   {
43     npudev_h handle;
44     if (getNPUdeviceByType(&handle, NPUCOND_TRIV2_CONN_SOCIP, i) < 0)
45     {
46       throw std::runtime_error("Failed to get TRIX NPU device handle");
47     }
48     _dev_handles.emplace_back(handle);
49   }
50
51   // NOTE Do not change the number of threads as long as jobs in thread call
52   //      the synchronous APIs such as submitNPU_request()
53   _batch_thread_pool = std::make_unique<BatchThreadPool>(_dev_handles.size());
54   // We need to careful not to create multiple `BatchThreadPool`. In case of multiple models, there
55   // may be a problem having multiple `BatchThreadPool` in current implementation. But if this
56   // creating thread pool is moved to npu deamon, I think this problem will be solved smoothly.
57 }
58
59 DevContext::~DevContext()
60 {
61   // NOTE Must release _batch_thread_pool before releasing _dev_handles to wait for all threads to
62   //      be terminated
63   _batch_thread_pool.reset(nullptr);
64
65   for (const auto &dev_handle : _dev_handles)
66   {
67     unregisterNPUmodel_all(dev_handle);
68     putNPUdevice(dev_handle);
69   }
70 }
71
72 ModelID DevContext::registerModel(const std::string &model_file_path)
73 {
74   if (_dev_handles.size() == 0)
75   {
76     throw std::runtime_error("No npu device is available");
77   }
78
79   std::unique_ptr<npubin_meta, decltype(&free)> meta(
80     getNPUmodel_metadata(model_file_path.c_str(), false), free);
81
82   if (meta == nullptr)
83   {
84     throw std::runtime_error("Unable to extract the model metadata");
85   }
86
87   generic_buffer file_info;
88   file_info.type = BUFFER_FILE;
89   file_info.filepath = model_file_path.c_str();
90   file_info.size = meta->size;
91
92   ModelID model_id = 0;
93
94   for (uint32_t dev_num = 0; dev_num < _dev_handles.size(); ++dev_num)
95   {
96     // Register model for each device
97     uint32_t model_id_at_device;
98     if (registerNPUmodel(_dev_handles.at(dev_num), &file_info, &model_id_at_device) < 0)
99     {
100       throw std::runtime_error("Failed to register npu model");
101     }
102
103     if (dev_num == 0)
104     {
105       model_id = model_id_at_device;
106       _meta_map[model_id_at_device] = std::shared_ptr<npubin_meta>(std::move(meta));
107     }
108     else
109     {
110       _meta_map[model_id_at_device] = _meta_map[model_id];
111     }
112
113     _model_ids[model_id].resize(dev_num + 1);
114     _model_ids[model_id].at(dev_num) = model_id_at_device;
115   }
116
117   // Return the model id for device 0 only
118   return model_id;
119 }
120
121 void DevContext::unRegisterModel(ModelID model_id)
122 {
123   for (uint32_t dev_num = 0; dev_num < _dev_handles.size(); ++dev_num)
124   {
125     const auto model_id_at_device = _model_ids.at(model_id).at(dev_num);
126     const auto &dev_handle = _dev_handles.at(dev_num);
127
128     // Remove meta data
129     _meta_map.erase(model_id_at_device);
130
131     // Unregister Model for each device
132     unregisterNPUmodel(dev_handle, model_id_at_device);
133   }
134   // Remove model IDs
135   _model_ids.erase(model_id);
136 }
137
138 void DevContext::requestRun(ModelID model_id, input_buffers *input_bufs, tensors_data_info *in_info,
139                             output_buffers *output_bufs, tensors_data_info *out_info,
140                             size_t batch_size)
141 {
142   if (batch_size > 1)
143   {
144     if (in_info->num_info != 1)
145     {
146       throw std::runtime_error("Supported only an input that has batch now");
147     }
148     if (out_info->num_info != 1)
149     {
150       throw std::runtime_error("Supported only one output now");
151     }
152
153     if (input_bufs->bufs[0].size % batch_size != 0)
154     {
155       throw std::runtime_error("Invalid batch size. batch size :" + std::to_string(batch_size) +
156                                ", input buffer size : " + std::to_string(input_bufs->bufs[0].size));
157     }
158
159     if (output_bufs->bufs[0].size % batch_size != 0)
160     {
161       throw std::runtime_error(
162         "Invalid batch size. batch size :" + std::to_string(batch_size) +
163         ", output tensor size : " + std::to_string(output_bufs->bufs[0].size));
164     }
165
166     // inputs/outputs for each batch
167     std::vector<input_buffers> in_buffers_vec(batch_size);
168     std::vector<output_buffers> out_buffers_vec(batch_size);
169
170     // Run on thread pool
171     std::vector<std::future<int32_t>> batch_futures;
172     for (uint32_t batch_num = 0; batch_num < batch_size; ++batch_num)
173     {
174       // Enqueue jobs
175       // The in_info and out_info are always the same even if they are divided by batch, so they are
176       // used as they are.
177       auto future = _batch_thread_pool->enqueueJob(
178         [batch_size, in_info, out_info,
179          this](uint32_t dev_num, ModelID model_id, const input_buffers *input_bufs,
180                const output_buffers *output_bufs, uint32_t batch_num) -> int32_t {
181           // Set buffers of inputs/outputs for each batch
182           // TODO Support multiple inputs/outputs
183           input_buffers in_batch_buffers;
184           in_batch_buffers.num_buffers = input_bufs->num_buffers;
185           const uint64_t in_batch_offset = input_bufs->bufs[0].size / batch_size;
186           setBufferByBatch(input_bufs->bufs[0], batch_num, in_batch_offset,
187                            &in_batch_buffers.bufs[0]);
188
189           output_buffers out_batch_buffers;
190           out_batch_buffers.num_buffers = output_bufs->num_buffers;
191           const uint64_t out_batch_offset = output_bufs->bufs[0].size / batch_size;
192           setBufferByBatch(output_bufs->bufs[0], batch_num, out_batch_offset,
193                            &out_batch_buffers.bufs[0]);
194
195           try
196           {
197             // dev_num is the same as the thread number in _batch_thread_pool
198             this->runOneBatch(dev_num, model_id, &in_batch_buffers, in_info, &out_batch_buffers,
199                               out_info);
200           }
201           catch (...)
202           {
203             _eptr = std::current_exception();
204           }
205
206           return batch_num;
207         },
208         model_id, input_bufs, output_bufs, batch_num);
209       batch_futures.emplace_back(std::move(future));
210     }
211
212     for (auto &&future : batch_futures)
213     {
214       future.get();
215     }
216
217     if (_eptr)
218     {
219       std::exception_ptr eptr(nullptr);
220       _eptr.swap(eptr);
221       std::rethrow_exception(eptr);
222     }
223   }
224   else
225   {
226     runOneBatch(0, model_id, input_bufs, in_info, output_bufs, out_info);
227   }
228 }
229
230 void DevContext::runOneBatch(uint32_t dev_num, ModelID model_id, input_buffers *input_bufs,
231                              tensors_data_info *in_info, output_buffers *output_bufs,
232                              tensors_data_info *out_info)
233 {
234   const auto &model_id_at_device = _model_ids.at(model_id).at(dev_num);
235
236   const auto meta = _meta_map.at(model_id_at_device);
237   if (meta->input_seg_num != in_info->num_info)
238   {
239     throw std::runtime_error("The number of inputs does not match to model input seg num");
240   }
241
242   if (meta->output_seg_num != out_info->num_info)
243   {
244     throw std::runtime_error("The number of outputs does not match to model output seg num");
245   }
246
247   const auto &dev_handle = _dev_handles.at(dev_num);
248   int req_id;
249
250   if (auto error_code = createNPU_request(dev_handle, model_id_at_device, &req_id))
251   {
252     throw std::runtime_error("Unable to create NPU request with model id (" +
253                              std::to_string(model_id_at_device) + ")" +
254                              " error code : " + std::to_string(error_code));
255   }
256
257   if (auto error_code =
258         setNPU_requestData(dev_handle, req_id, input_bufs, in_info, output_bufs, out_info))
259   {
260     removeNPU_request(dev_handle, req_id);
261     throw std::runtime_error("Unable to create NPU request for model id (" +
262                              std::to_string(model_id_at_device) + ")" +
263                              " error code : " + std::to_string(error_code));
264   }
265
266   // NOTE submitNPU_request is not thread-safe(?). It is rarely hanging(unresponsive).
267   //      Ultimately, to solve this problem, we have to either use other thread-safe API or
268   //      change submitNPU_request to be thread-safe, but both works take time.
269   //      As a workaround, let's allow hanging thread.
270   // TODO Change submitNPU_request to be thread-safe or replaced with other thread-safe API
271   std::packaged_task<int(npudev_h, int)> task(submitNPU_request);
272   auto f = task.get_future();
273   std::thread thread_submit_request(std::move(task), dev_handle, req_id);
274   auto status = f.wait_until(std::chrono::system_clock::now() + std::chrono::seconds(60));
275   if (status == std::future_status::timeout)
276   {
277     // There is no way to terminate hanging submitNPU_request from the outside.
278     // If a hanging thread is detached, it will remain as a hanging thread. Even so, it's better
279     // than having the main thread hanging.
280     thread_submit_request.detach();
281
282     // TODO Enable removeNPU_request after resolving hanging.
283     // removeNPU_request(dev_handle, req_id);
284     throw std::runtime_error("The npu API \"submitNPU_request\" timeout");
285   }
286
287   auto error_code = f.get();
288   thread_submit_request.join();
289   if (error_code != 0)
290   {
291     removeNPU_request(dev_handle, req_id);
292     throw std::runtime_error("Unable to submit NPU request with req id (" + std::to_string(req_id) +
293                              ")" + " error code : " + std::to_string(error_code));
294   }
295
296   if (auto error_code = removeNPU_request(dev_handle, req_id))
297   {
298     throw std::runtime_error("Unable to remove NPU request with req id (" + std::to_string(req_id) +
299                              ")" + " error code : " + std::to_string(error_code));
300   }
301 }
302
303 void DevContext::setBufferByBatch(const generic_buffer &origin_buf, uint32_t batch_num,
304                                   uint64_t batch_offset, generic_buffer *batch_buf)
305 {
306   batch_buf->addr = reinterpret_cast<uint8_t *>(origin_buf.addr) + batch_num * batch_offset;
307   batch_buf->size = batch_offset;
308   batch_buf->type = BUFFER_MAPPED;
309 }
310
311 } // namespace trix
312 } // namespace backend
313 } // namespace onert