Add async manager class for async API supprort.
Async manager is used to support async API support for each service API.
Behavior summary to async manager:
There are two queues which are performed in each thread context.
One is incoming queue which manages input data feeded from input device
such as camera, screen capture module, and other external modules
through input service framework. Simply saying, input service framework
will push input data feeded from input device.
And other is outgoing queue which manages output data after the completion
of invoke such as inference. Regarding outgoing queue, a thread context
is created by async manager, and this thead context pops input data
from incoming queue, request an invoke with the input data and then
push the result to outgoing queue.
As such, one thread puts input data obtained from an input device into
an incoming queue, and the other thread retrieves input data from
the incoming queue, performs invoke, and puts the result of the execution
into an outgoing queue in parallel to utilize HW resource in maximum.
Signed-off-by: Inki Dae <inki.dae@samsung.com>
namespace singleo
{
+void SingleoRawData::setData(BaseDataType &input_data)
+{
+ auto raw_data = dynamic_cast<RawDataType &>(input_data);
+ _data = raw_data;
+}
+
BaseDataType &SingleoRawData::getData()
{
return _data;
FILE(GLOB INPUT_BACKEND_SOURCE_FILES "${PROJECT_SOURCE_DIR}/opencv/src/*.cpp")
ADD_LIBRARY(${PROJECT_NAME} SHARED ${INPUT_BACKEND_SOURCE_FILES})
-TARGET_INCLUDE_DIRECTORIES(${PROJECT_NAME} PRIVATE include ../../common/include ../../log/include opencv/include)
+TARGET_INCLUDE_DIRECTORIES(${PROJECT_NAME} PRIVATE include ../include ../../common/include ../../log/include opencv/include)
TARGET_LINK_LIBRARIES(${PROJECT_NAME} PRIVATE opencv_core opencv_imgcodecs opencv_highgui opencv_videoio singleo_log)
# Install the library
void *user_data) = 0;
virtual void configure() = 0;
virtual void capture(SingleoImageData &out_data) = 0;
+ virtual void streamOn() = 0;
+ virtual void streamOff() = 0;
};
} // input
#include "ICameraBackend.h"
#include "SingleoImageData.h"
+#include "InputTypes.h"
namespace singleo
{
private:
std::unique_ptr<cv::VideoCapture> _video_capture;
cv::Mat _captured_image;
- std::function<void(ISingleoCommonData &data, void *user_data)> _userCb;
+ InputServiceCallbackType _userCb;
void *_userData;
+ std::unique_ptr<std::thread> _thread_handle;
+ bool _exit_thread { false };
int findFirstAvailableCamera();
+ void threadLoop();
public:
OpencvBackend();
virtual ~OpencvBackend();
- void setUserCb(const std::function<void(ISingleoCommonData &data, void *user_data)> &userCb,
- void *user_data) override;
+ void setUserCb(const InputServiceCallbackType &userCb, void *user_data) override;
void configure() override;
void capture(SingleoImageData &out_data) override;
+ void streamOn() override;
+ void streamOff() override;
};
} // input
_userCb = userCb;
_userData = user_data;
- SINGLEO_LOGD("OpencvBackend::%s : user callback has been registered.", __func__);
+ SINGLEO_LOGD("OpencvBackend: user callback has been registered.");
}
void OpencvBackend::configure()
ImageDataType image_data;
- image_data._data_type = DataType::IMAGE;
image_data.width = _captured_image.cols;
image_data.height = _captured_image.rows;
image_data.byte_per_pixel = _captured_image.channels();
out_data.setData(image_data);
}
+void OpencvBackend::threadLoop()
+{
+ SINGLEO_LOGD("OpencvBackend: stream off.");
+
+ SingleoImageData data;
+
+ while (!_exit_thread) {
+ capture(data);
+
+ if (_userCb)
+ _userCb(data, _userData);
+ }
+}
+
+void OpencvBackend::streamOn()
+{
+ SINGLEO_LOGD("OpencvBackend: stream on.");
+
+ if (!_thread_handle)
+ _thread_handle = std::make_unique<std::thread>(&OpencvBackend::threadLoop, this);
+}
+
+void OpencvBackend::streamOff()
+{
+ _exit_thread = true;
+ _thread_handle->join();
+}
+
}
}
explicit CameraServiceDefault(CameraConfig &config);
virtual ~CameraServiceDefault();
- void setUserCb(const std::function<void(ISingleoCommonData &data, void *user_data)> &userCb,
- void *user_data) override;
+ void setUserCb(const InputServiceCallbackType &userCb, void *user_data) override;
void configure() override;
void capture(ISingleoCommonData &data) override;
+ void streamOn() override;
+ void streamOff() override;
};
} // input
explicit CameraServiceExternal(CameraConfig &config);
virtual ~CameraServiceExternal();
- void setUserCb(const std::function<void(ISingleoCommonData &data, void *user_data)> &userCb,
- void *user_data) override;
+ void setUserCb(const InputServiceCallbackType &userCb, void *user_data) override;
void configure() override;
void capture(ISingleoCommonData &data) override;
+ void streamOn() override;
+ void streamOff() override;
};
} // input
#define __ICAMERA_SERVICE_H__
#include "ISingleoCommonData.h"
+#include "InputTypes.h"
namespace singleo
{
public:
virtual ~ICameraService() {};
- virtual void setUserCb(const std::function<void(ISingleoCommonData &data, void *user_data)> &userCb,
- void *user_data) = 0;
+ virtual void setUserCb(const InputServiceCallbackType &userCb, void *user_data) = 0;
virtual void configure() = 0;
virtual void capture(ISingleoCommonData &data) = 0;
+ virtual void streamOn() = 0;
+ virtual void streamOff() = 0;
};
} // input
#include <functional>
#include "ISingleoCommonData.h"
+#include "InputTypes.h"
namespace singleo
{
public:
virtual ~IInputService() {};
- virtual void setUserCb(const std::function<void(ISingleoCommonData &data, void *user_data)> &userCb,
- void *user_data) = 0;
+ virtual void setUserCb(const InputServiceCallbackType &userCb, void *user_data) = 0;
virtual void configure() = 0;
virtual void capture(ISingleoCommonData &data) = 0;
+ virtual void streamOn() = 0;
+ virtual void streamOff() = 0;
};
} // input
explicit InputCamera(CameraConfig &config);
virtual ~InputCamera();
- void setUserCb(const std::function<void(ISingleoCommonData &data, void *user_data)> &userCb,
- void *user_data) override;
+ void setUserCb(const InputServiceCallbackType &userCb, void *user_data) override;
void configure() override;
- virtual void capture(ISingleoCommonData &data) override;
+ void capture(ISingleoCommonData &data) override;
+ void streamOn() override;
+ void streamOff() override;
};
} // input
#ifndef __SINGLEO_INPUT_TYPES_H__
#define __SINGLEO_INPUT_TYPES_H__
+#include <functional>
#include "SingleoCommonTypes.h"
+#include "ISingleoCommonData.h"
namespace singleo
{
bool async { false };
};
+using InputServiceCallbackType = std::function<void(ISingleoCommonData &data, void *user_data)>;
+
} // input
} // singleo
_camera->capture(dynamic_cast<SingleoImageData &>(data));
}
+void CameraServiceDefault::streamOn()
+{
+ _camera->streamOn();
+}
+
+void CameraServiceDefault::streamOff()
+{
+ _camera->streamOff();
+}
+
}
}
void CameraServiceExternal::capture(ISingleoCommonData &data)
{}
+void CameraServiceExternal::streamOn()
+{}
+
+void CameraServiceExternal::streamOff()
+{}
+
}
}
// TODO. check if image data captured by backend is valid or not such as pixel format.
}
+void InputCamera::streamOn()
+{
+ _input->streamOn();
+}
+
+void InputCamera::streamOff()
+{
+ _input->streamOff();
+}
+
}
}
#ifndef __AUTO_ZOOM_H__
#define __AUTO_ZOOM_H__
-#include <queue>
-#include <functional>
-
#include "IService.h"
#include "SingleoCommonTypes.h"
#include "IInferenceServiceInterface.h"
#include "SingleoImageData.h"
#include "IInputService.h"
#include "InputTypes.h"
+#include "AsyncManager.h"
namespace singleo
{
namespace services
{
-struct AutoZoomResult {
+struct AutoZoomResult : public BaseResultType {
+ AutoZoomResult() : BaseResultType(ServiceResultType::AUTO_ZOOM)
+ {}
+ unsigned int frame_number {};
size_t num_of_result {};
Rect rect;
};
{ "Y", AutoZoomResultType::Y },
{ "WIDTH", AutoZoomResultType::WIDTH },
{ "HEIGHT", AutoZoomResultType::HEIGHT } };
+ bool _async_mode { false };
bool isKeyValid(std::string key);
void updateResult();
+ std::unique_ptr<AsyncManager<ImageDataType, AutoZoomResult> > _async_manager;
+
public:
explicit AutoZoom(input::InputConfigBase &config);
- virtual ~AutoZoom() = default;
+ virtual ~AutoZoom();
static IService *create(input::InputConfigBase &config)
{
void performAsync() override;
void getResultCnt(unsigned int *cnt) override;
void getResultInt(unsigned int idx, std::string key, unsigned int *value) override;
+
+ std::unique_ptr<AsyncManager<ImageDataType, AutoZoomResult> > &getAsyncManager()
+ {
+ return _async_manager;
+ }
+
+ std::unique_ptr<singleo::inference::IInferenceServiceInterface> &getInferenceService()
+ {
+ return _inference_service;
+ }
};
} // service
AutoZoom::AutoZoom(InputConfigBase &config)
{
// In default, we will use InferenceServiceDefault class to use Mediavision framework
- // for inference service.
+ // for inference service. TODO. introduce meta config file approach later.
_inference_service = make_unique<InferenceServiceDefault>(TaskType::OBJECT_DETECTION);
// Create InputCamera service if input service type is CAMERA.
_inference_service->prepare();
}
+AutoZoom::~AutoZoom()
+{
+ if (_async_mode)
+ _input_service->streamOff();
+}
+
void AutoZoom::inputServiceCb(ISingleoCommonData &data, void *user_data)
{
auto auto_zoom = static_cast<AutoZoom *>(user_data);
+ ImagePreprocessor preprocessor(data.getData());
+ ImageDataType preprocessed = dynamic_cast<ImageDataType &>(preprocessor.getData());
+ ImageDataType copied = preprocessed;
+ size_t buffer_size = copied.width * copied.height * copied.byte_per_pixel;
+
+ // We have to avoid from grapping input feed stream so copy the captured data to new one.
+ // Ps. This allocated buffer should be released as soon as the completion of invoke in async manager's thread loop.
+ copied.ptr = new unsigned char[buffer_size];
+ memcpy(copied.ptr, preprocessed.ptr, buffer_size);
- SINGLEO_LOGD("AutoZoom::%s : user callback has been called.", __func__);
+ auto_zoom->getAsyncManager()->pushInput(copied);
}
bool AutoZoom::isKeyValid(std::string key)
void AutoZoom::performAsync()
{
- throw runtime_error("Not supported yet.");
+ if (!_input_service) {
+ SINGLEO_LOGE("This API is valid only the case that input feed service is used.");
+ throw runtime_error("Invalid API request.");
+ }
+
+ _input_service->streamOn();
+
+ _async_mode = true;
+ _async_manager = make_unique<AsyncManager<ImageDataType, AutoZoomResult> >();
+ _async_manager->registerInvokeCb(this, [this](IService *service, BaseDataType &data) {
+ SingleoImageData input_container;
+ auto auto_zoom = static_cast<AutoZoom *>(service);
+
+ input_container.setData(data);
+ auto_zoom->getInferenceService()->invoke(input_container);
+ auto_zoom->updateResult();
+
+ // This buffer was allocated and copied in inputServiceCb callback.
+ // So make sure to release this buffer here.
+ auto specific_data = dynamic_cast<ImageDataType &>(data);
+
+ delete specific_data.ptr;
+ });
}
void AutoZoom::updateResult()
// TODO. Temparary code for test.
_result.rect = result[0].rect;
_result.num_of_result = result.size();
+ _result.frame_number = frame_number;
+
+ if (_async_mode)
+ _async_manager->pushOutput(_result);
}
void AutoZoom::getResultCnt(unsigned int *cnt)
{
+ if (_async_mode)
+ _result = _async_manager->popOutput();
+
// TODO. Temparary code.
*cnt = static_cast<unsigned int>(_result.num_of_result);
}
--- /dev/null
+/**
+ * Copyright (c) 2024 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __ASYNC_MANAGER_H__
+#define __ASYNC_MANAGER_H__
+
+#include <queue>
+#include <thread>
+#include <mutex>
+#include <functional>
+#include <condition_variable>
+
+#include "IService.h"
+#include "SingleoCommonTypes.h"
+#include "ImagePreprocessor.h"
+
+namespace singleo
+{
+namespace services
+{
+template<typename In, typename Out> class AsyncManager
+{
+private:
+ std::queue<In> _incoming_queue;
+ std::queue<Out> _outgoing_queue;
+ std::mutex _incoming_queue_mutex;
+ std::mutex _outgoing_queue_mutex;
+ std::condition_variable _incoming_queue_event;
+ std::condition_variable _outgoing_queue_event;
+ std::unique_ptr<std::thread> _thread_handle;
+ bool _exit_thread { false };
+
+ using CallbackType = std::function<void(IService *service, BaseDataType &in_data)>;
+ CallbackType _serviceCb {};
+ IService *_serviceHandle {};
+
+ void waitForInputQueue()
+ {
+ std::unique_lock<std::mutex> lock(_incoming_queue_mutex);
+
+ _incoming_queue_event.wait(lock, [this] { return !this->_incoming_queue.empty(); });
+ }
+
+ void waitForOutputQueue()
+ {
+ std::unique_lock<std::mutex> lock(_outgoing_queue_mutex);
+
+ _outgoing_queue_event.wait(lock, [this] { return !this->_outgoing_queue.empty(); });
+ }
+
+ In popFromInput()
+ {
+ std::lock_guard<std::mutex> lock(_incoming_queue_mutex);
+ In &data = _incoming_queue.front();
+
+ _incoming_queue.pop();
+
+ return data;
+ }
+
+ Out popFromOutput()
+ {
+ std::lock_guard<std::mutex> lock(_outgoing_queue_mutex);
+ Out &output = _outgoing_queue.front();
+
+ _outgoing_queue.pop();
+
+ return output;
+ }
+
+ void invokeThread()
+ {
+ // This is a callback for internal thread created by this async manager.
+ // What to do in this callback is,
+ // 1. wait for input data in incoming queue, which will be pushed by platform specific input feed module in different thread context.
+ // 2. pop a input data from incoming queue if the incoming queue is not empty, which was already preprocessed.
+ // 3. request a invoke with this input data by calling the callback registered by a service who uses this async manager.
+ // what this cllback has to
+ // 3.1 request invoke with a given input data got by popFromInput().
+ // 3.2 update the result.
+ // 3.3 release input data - which was allocated in input service callback of each service - by calling _serviceCb
+ // which was registered by user who uses this async manager.
+
+ while (!_exit_thread) {
+ waitForInputQueue();
+
+ // Exit this thread loop if user called destroy API while in thread loop.
+ if (_exit_thread)
+ break;
+
+ In data = popFromInput();
+
+ _serviceCb(_serviceHandle, data);
+ }
+
+ // waitforOutputQueue function could wait for the notify event after while loop is exited.
+ // So make sure to call notify_one() here.
+ _outgoing_queue_event.notify_one();
+ }
+
+public:
+ AsyncManager()
+ {
+ _thread_handle = std::make_unique<std::thread>(&AsyncManager::invokeThread, this);
+ }
+
+ ~AsyncManager()
+ {
+ _exit_thread = true;
+ _thread_handle->join();
+ _thread_handle = nullptr;
+ }
+
+ void registerInvokeCb(IService *service_handle, const CallbackType &cb)
+ {
+ _serviceCb = cb;
+ _serviceHandle = service_handle;
+ }
+
+ // This function will be called by specific input service internally.
+ // Ps. caller has to provide captured data with concrete class object as data parameter.
+ void pushInput(In &in_data)
+ {
+ // This callback will be called by platform specific input feed module.
+ // The input feed module calls this callback with captured data after capturing data such as
+ // image, audio, sensor and other.
+ //
+ // What to do in this callback is,
+ // 1. copy data passed by the input feed module to new one.
+ // 2. preprocess the copied data.
+ // 3. push the preprocessed data to incoming queue.
+
+ std::lock_guard<std::mutex> lock(_incoming_queue_mutex);
+
+ // If input data exists in incoming queue then skip a new one.
+ // TODO. consider for multiple input data later.
+ if (!_incoming_queue.empty())
+ return;
+
+ _incoming_queue.push(in_data);
+ _incoming_queue_event.notify_one();
+ }
+
+ void pushOutput(Out &result)
+ {
+ std::lock_guard<std::mutex> lock(_outgoing_queue_mutex);
+
+ _outgoing_queue.push(result);
+ _outgoing_queue_event.notify_one();
+ }
+
+ Out popOutput()
+ {
+ waitForOutputQueue();
+ return popFromOutput();
+ }
+};
+
+}
+}
+
+#endif
\ No newline at end of file
~Context()
{}
- ServiceType _service_type;
+ ServiceType _service_type {};
+ bool _async_mode { false };
IService *_service_handle {};
};
{
// TODO
+enum class ServiceResultType { NONE, AUTO_ZOOM };
+
+struct BaseResultType {
+ ServiceResultType _result_type { ServiceResultType::NONE };
+ BaseResultType(ServiceResultType result_type) : _result_type(result_type)
+ {}
+ virtual ~BaseResultType()
+ {}
+};
+
}
}
return _input_feed_type;
}
+bool ServiceConfigParser::getAsyncMode()
+{
+ return _async_mode;
+}
+
void ServiceConfigParser::trim(string &str)
{
str.erase(0, str.find_first_not_of(" \n\r\t"));
{
IService *service_handle = NULL;
ServiceType service_type {};
+ bool async_mode { false };
Context *context = NULL;
try {
ServiceConfigParser parser(option);
+ async_mode = parser.getAsyncMode();
+
if (ServiceType::AUTO_ZOOM == parser.getServiceType()) {
service_handle = ServiceFactory::create("AutoZoom", parser.getConfig(parser.getInputFeedType()));
if (!service_handle)
context->_service_handle = service_handle;
context->_service_type = service_type;
+ context->_async_mode = async_mode;
*handle = static_cast<void *>(context);
{
try {
auto context = static_cast<Context *>(handle);
- context->_service_handle->perform();
+
+ if (!context->_async_mode)
+ context->_service_handle->perform();
+ else
+ context->_service_handle->performAsync();
} catch (const exception &e) {
return -1;
}
TARGET_COMPILE_DEFINITIONS(${PROJECT_NAME} PRIVATE -DTEST_RES_PATH="${TEST_RES_PATH}")
TARGET_INCLUDE_DIRECTORIES(${PROJECT_NAME} PRIVATE ../../capi/)
TARGET_LINK_LIBRARIES(${PROJECT_NAME}
- gtest gtest_main singleo_service)
+ gtest gtest_main pthread singleo_service)
install(TARGETS ${PROJECT_NAME} DESTINATION ${CMAKE_INSTALL_BINDIR})
\ No newline at end of file
#include <iostream>
#include <algorithm>
+#include <memory>
#include <string.h>
+#include <thread>
#include "gtest/gtest.h"
#include "singleo_native_capi.h"
ret = singleo_service_destroy(handle);
ASSERT_EQ(ret, 0);
-}
\ No newline at end of file
+}
+
+void autozoom_callback(void *user_data)
+{
+ singleo_service_h handle = static_cast<singleo_service_h>(user_data);
+ bool is_loop_exit = false;
+ unsigned long frame_number = 0;
+
+ while (!is_loop_exit) {
+ unsigned int cnt;
+
+ int ret = singleo_service_get_result_cnt(handle, &cnt);
+ if (ret != 0)
+ break;
+
+ ASSERT_EQ(ret, 0);
+
+ for (unsigned int idx = 0; idx < cnt; ++idx) {
+ unsigned int x, y, w, h;
+
+ ret = singleo_service_get_result_int(handle, idx, "x", &x);
+ ASSERT_EQ(ret, 0);
+ ret = singleo_service_get_result_int(handle, idx, "y", &y);
+ ASSERT_EQ(ret, 0);
+ ret = singleo_service_get_result_int(handle, idx, "width", &w);
+ ASSERT_EQ(ret, 0);
+ ret = singleo_service_get_result_int(handle, idx, "height", &h);
+ ASSERT_EQ(ret, 0);
+
+ cout << x << " x " << y << " ~ " << w << " x " << h << endl;
+ }
+
+ if (++frame_number > 50 && cnt > 0)
+ is_loop_exit = true;
+ }
+}
+
+TEST(AutoZoomAsyncTest, InferenceRequestWithCameraInputFeedShouldBeOk)
+{
+ singleo_service_h handle;
+
+ int ret =
+ singleo_service_create("service=auto_zoom, input=camera, camera_backend=opencv, fps=30, async=1", &handle);
+ ASSERT_EQ(ret, 0);
+
+ ret = singleo_service_perform(handle);
+ ASSERT_EQ(ret, 0);
+
+ unique_ptr<thread> thread_handle = make_unique<thread>(&autozoom_callback, static_cast<void *>(handle));
+
+ thread_handle->join();
+
+ ret = singleo_service_destroy(handle);
+ ASSERT_EQ(ret, 0);
+}