2 * Copyright (c) 2020-2022 Hailo Technologies Ltd. All rights reserved.
3 * Distributed under the MIT license (https://opensource.org/licenses/MIT)
6 * @file raw_async_streams_single_thread_example
7 * This example demonstrates using low level async streams using single thread over c++.
10 #include "hailo/hailort.hpp"
15 #include <condition_variable>
21 using namespace hailort;
23 using AlignedBuffer = std::shared_ptr<uint8_t>;
24 static AlignedBuffer page_aligned_alloc(size_t size)
27 auto addr = mmap(NULL, size, PROT_WRITE | PROT_READ, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
28 if (MAP_FAILED == addr) throw std::bad_alloc();
29 return AlignedBuffer(reinterpret_cast<uint8_t*>(addr), [size](void *addr) { munmap(addr, size); });
30 #elif defined(_MSC_VER)
31 auto addr = VirtualAlloc(NULL, size, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
32 if (!addr) throw std::bad_alloc();
33 return AlignedBuffer(reinterpret_cast<uint8_t*>(addr), [](void *addr){ VirtualFree(addr, 0, MEM_RELEASE); });
35 #pragma error("Aligned alloc not supported")
39 static hailo_status infer(ConfiguredNetworkGroup &network_group, InputStream &input, OutputStream &output)
41 auto input_queue_size = input.get_async_max_queue_size();
42 auto output_queue_size = output.get_async_max_queue_size();
43 if (!input_queue_size || !output_queue_size) {
44 std::cerr << "Failed getting async queue size" << std::endl;
45 return HAILO_INTERNAL_FAILURE;
48 // We store buffers vector here as a guard for the memory. The buffer will be freed only after
49 // activated_network_group will be released.
50 std::vector<AlignedBuffer> buffer_guards;
52 OutputStream::TransferDoneCallback read_done = [&output, &read_done](const OutputStream::CompletionInfo &completion_info) {
53 hailo_status status = HAILO_UNINITIALIZED;
54 switch (completion_info.status) {
56 // Real applications can forward the buffer to post-process/display. Here we just re-launch new async read.
57 status = output.read_async(completion_info.buffer_addr, completion_info.buffer_size, read_done);
58 if ((HAILO_SUCCESS != status) && (HAILO_STREAM_NOT_ACTIVATED != status)) {
59 std::cerr << "Failed read async with status=" << status << std::endl;
62 case HAILO_STREAM_ABORTED_BY_USER:
63 // Transfer was canceled, finish gracefully.
66 std::cerr << "Got an unexpected status on callback. status=" << completion_info.status << std::endl;
70 InputStream::TransferDoneCallback write_done = [&input, &write_done](const InputStream::CompletionInfo &completion_info) {
71 hailo_status status = HAILO_UNINITIALIZED;
72 switch (completion_info.status) {
74 // Real applications may free the buffer and replace it with new buffer ready to be sent. Here we just
75 // re-launch new async write.
76 status = input.write_async(completion_info.buffer_addr, completion_info.buffer_size, write_done);
77 if ((HAILO_SUCCESS != status) && (HAILO_STREAM_NOT_ACTIVATED != status)) {
78 std::cerr << "Failed read async with status=" << status << std::endl;
81 case HAILO_STREAM_ABORTED_BY_USER:
82 // Transfer was canceled, finish gracefully.
85 std::cerr << "Got an unexpected status on callback. status=" << completion_info.status << std::endl;
89 // The destructor of activated_network_group will make sure that all async operations are done. All pending
90 // operations will be canceled and their callbacks will be called with status=HAILO_STREAM_ABORTED_BY_USER.
91 // Be sure to capture variables in the callbacks that will be destructed after the activated_network_group.
92 // Otherwise, the lambda would have access an uninitialized data.
93 auto activated_network_group = network_group.activate();
94 if (!activated_network_group) {
95 std::cerr << "Failed to activate network group " << activated_network_group.status() << std::endl;
96 return activated_network_group.status();
99 // We launch "*output_queue_size" async read operation. On each async callback, we launch a new async read operation.
100 for (size_t i = 0; i < *output_queue_size; i++) {
101 // Buffers read from async operation must be page aligned.
102 auto buffer = page_aligned_alloc(output.get_frame_size());
103 auto status = output.read_async(buffer.get(), output.get_frame_size(), read_done);
104 if (HAILO_SUCCESS != status) {
105 std::cerr << "read_async failed with status=" << status << std::endl;
109 buffer_guards.emplace_back(buffer);
112 // We launch "*input_queue_size" async write operation. On each async callback, we launch a new async write operation.
113 for (size_t i = 0; i < *input_queue_size; i++) {
114 // Buffers written to async operation must be page aligned.
115 auto buffer = page_aligned_alloc(input.get_frame_size());
116 auto status = input.write_async(buffer.get(), input.get_frame_size(), write_done);
117 if (HAILO_SUCCESS != status) {
118 std::cerr << "write_async failed with status=" << status << std::endl;
122 buffer_guards.emplace_back(buffer);
125 // After all async operations are launched, the inference will continue until the activated_network_group
126 // destructor is called.
127 std::this_thread::sleep_for(std::chrono::seconds(5));
129 return HAILO_SUCCESS;
133 static Expected<std::shared_ptr<ConfiguredNetworkGroup>> configure_network_group(Device &device, const std::string &hef_path)
135 auto hef = Hef::create(hef_path);
137 return make_unexpected(hef.status());
140 auto configure_params = device.create_configure_params(hef.value());
141 if (!configure_params) {
142 return make_unexpected(configure_params.status());
145 // change stream_params to operate in async mode
146 for (auto &ng_name_params_pair : *configure_params) {
147 for (auto &stream_params_name_pair : ng_name_params_pair.second.stream_params_by_name) {
148 stream_params_name_pair.second.flags = HAILO_STREAM_FLAGS_ASYNC;
152 auto network_groups = device.configure(hef.value(), configure_params.value());
153 if (!network_groups) {
154 return make_unexpected(network_groups.status());
157 if (1 != network_groups->size()) {
158 std::cerr << "Invalid amount of network groups" << std::endl;
159 return make_unexpected(HAILO_INTERNAL_FAILURE);
162 return std::move(network_groups->at(0));
167 auto device = Device::create();
169 std::cerr << "Failed to create device " << device.status() << std::endl;
170 return device.status();
173 static const auto HEF_FILE = "hefs/shortcut_net.hef";
174 auto network_group = configure_network_group(*device.value(), HEF_FILE);
175 if (!network_group) {
176 std::cerr << "Failed to configure network group" << HEF_FILE << std::endl;
177 return network_group.status();
180 // Assume one input and output
181 auto output = network_group->get()->get_output_streams()[0];
182 auto input = network_group->get()->get_input_streams()[0];
184 // Now start the inference
185 auto status = infer(*network_group.value(), input.get(), output.get());
186 if (HAILO_SUCCESS != status) {
187 std::cerr << "Inference failed with " << status << std::endl;
191 std::cout << "Inference finished successfully" << std::endl;
192 return HAILO_SUCCESS;