* @author MyungJoo Ham <myungjoo.ham@samsung.com>
* @author Parichay Kapoor <pk.kapoor@samsung.com>
* @bug No known bugs except for NYI items
- * @todo Complete the support for timeout
*/
#include <string.h>
+#include <pthread.h>
#include <nnstreamer-single.h>
#include <nnstreamer-capi-private.h>
*/
#define SINGLE_DEFAULT_TIMEOUT 3000
+/** Convert time in millisecond to timespec format */
+#define MSEC_TO_TIMESPEC(ts, msec) do { \
+ (ts).tv_sec = (msec) / 1000; \
+ (ts).tv_nsec = ((msec) % 1000) * 1000000; \
+} while (0)
+
/* ML single api data structure for handle */
typedef struct
{
- GTensorFilterSingle *filter;
- ml_tensors_info_s in_info;
- ml_tensors_info_s out_info;
+ GTensorFilterSingle *filter; /**< tensor filter element */
+ ml_tensors_info_s in_info; /**< info about input */
+ ml_tensors_info_s out_info; /**< info about output */
+
+ pthread_t thread; /**< thread for invoking */
+ pthread_mutex_t mutex; /**< mutex for synchronization */
+ pthread_cond_t cond; /**< condition for synchronization */
+ ml_tensors_data_h input; /**< input received from user */
+ ml_tensors_data_h * output; /**< output to be sent back to user */
+ struct timespec timeout; /**< timeout for invoking */
+ gboolean data_ready; /**< data is ready to be processed */
+ gboolean join; /**< thread should be joined */
+ int status; /**< status of processing */
} ml_single;
/**
+ * @brief thread to execute calls to invoke
+ */
+static void *
+invoke_thread (void * arg)
+{
+ ml_single *single_h;
+ GTensorFilterSingleClass *klass;
+ GstTensorMemory in_tensors[NNS_TENSOR_SIZE_LIMIT];
+ GstTensorMemory out_tensors[NNS_TENSOR_SIZE_LIMIT];
+ ml_tensors_data_s *in_data, *out_data;
+ int i, status = ML_ERROR_NONE;
+
+ single_h = (ml_single *) arg;
+
+ pthread_mutex_lock (&single_h->mutex);
+
+ /** get the tensor_filter element */
+ klass = g_type_class_peek (G_TYPE_TENSOR_FILTER_SINGLE);
+ if (!klass) {
+ single_h->join = TRUE;
+ goto exit;
+ }
+
+ while (single_h->join != TRUE) {
+
+ /** wait for data */
+ while (single_h->data_ready != TRUE) {
+ pthread_cond_wait (&single_h->cond, &single_h->mutex);
+ if (single_h->join == TRUE)
+ goto exit;
+ }
+
+ in_data = (ml_tensors_data_s *) single_h->input;
+
+ /** Setup input buffer */
+ for (i = 0; i < in_data->num_tensors; i++) {
+ in_tensors[i].data = in_data->tensors[i].tensor;
+ in_tensors[i].size = in_data->tensors[i].size;
+ in_tensors[i].type = single_h->in_info.info[i].type;
+ }
+
+ /** Setup output buffer */
+ for (i = 0; i < single_h->out_info.num_tensors; i++) {
+ /** memory will be allocated by tensor_filter_single */
+ out_tensors[i].data = NULL;
+ out_tensors[i].size = ml_tensor_info_get_size (&single_h->out_info.info[i]);
+ out_tensors[i].type = single_h->out_info.info[i].type;
+ }
+ pthread_mutex_unlock (&single_h->mutex);
+
+ /** invoke the thread */
+ if (klass->invoke (single_h->filter, in_tensors, out_tensors) == FALSE) {
+ status = ML_ERROR_INVALID_PARAMETER;
+ pthread_mutex_lock (&single_h->mutex);
+ goto wait_for_next;
+ }
+
+ pthread_mutex_lock (&single_h->mutex);
+ /** Allocate output buffer */
+ if (single_h->data_ready == TRUE) {
+ status = ml_tensors_data_create_no_alloc (&single_h->out_info,
+ single_h->output);
+ if (status != ML_ERROR_NONE) {
+ ml_loge ("Failed to allocate the memory block.");
+ (*single_h->output) = NULL;
+ goto wait_for_next;
+ }
+
+ /** set the result */
+ out_data = (ml_tensors_data_s *) (*single_h->output);
+ for (i = 0; i < single_h->out_info.num_tensors; i++) {
+ out_data->tensors[i].tensor = out_tensors[i].data;
+ }
+ }
+
+ /** loop over to wait for the next element */
+wait_for_next:
+ single_h->status = status;
+ single_h->data_ready = FALSE;
+ pthread_cond_broadcast (&single_h->cond);
+ }
+
+exit:
+ single_h->data_ready = FALSE;
+ pthread_cond_broadcast (&single_h->cond);
+ pthread_mutex_unlock (&single_h->mutex);
+ return NULL;
+}
+
+/**
* @brief Set the info for input/output tensors
*/
static int
single_h = g_new0 (ml_single, 1);
g_assert (single_h);
single_h->filter = g_object_new (G_TYPE_TENSOR_FILTER_SINGLE, NULL);
+ MSEC_TO_TIMESPEC (single_h->timeout, SINGLE_DEFAULT_TIMEOUT);
if (single_h->filter == NULL) {
status = ML_ERROR_INVALID_PARAMETER;
goto error;
}
}
+ pthread_mutex_init (&single_h->mutex, NULL);
+ pthread_cond_init (&single_h->cond, NULL);
+ single_h->data_ready = FALSE;
+ single_h->join = FALSE;
+
+ if (pthread_create (&single_h->thread, NULL, invoke_thread, (void *)single_h) < 0) {
+ ml_loge ("Failed to create the invoke thread.");
+ status = ML_ERROR_UNKNOWN;
+ goto error;
+ }
+
*single = single_h;
return ML_ERROR_NONE;
single_h = (ml_single *) single;
+ pthread_mutex_lock (&single_h->mutex);
+ single_h->join = TRUE;
+ pthread_cond_broadcast (&single_h->cond);
+ pthread_mutex_unlock (&single_h->mutex);
+ pthread_join (single_h->thread, NULL);
+
if (single_h->filter) {
GTensorFilterSingleClass *klass;
klass = g_type_class_peek (G_TYPE_TENSOR_FILTER_SINGLE);
const ml_tensors_data_h input, ml_tensors_data_h * output)
{
ml_single *single_h;
- ml_tensors_data_s *in_data, *result;
- GstTensorMemory in_tensors[NNS_TENSOR_SIZE_LIMIT];
- GstTensorMemory out_tensors[NNS_TENSOR_SIZE_LIMIT];
- GTensorFilterSingleClass *klass;
+ ml_tensors_data_s *in_data;
+ struct timespec ts;
int i, status = ML_ERROR_NONE;
+ int err;
check_feature_state ();
in_data = (ml_tensors_data_s *) input;
*output = NULL;
- if (!single_h->filter) {
+ if (!single_h->filter || single_h->join) {
ml_loge ("The given param is invalid, model is missing.");
return ML_ERROR_INVALID_PARAMETER;
}
}
}
- /** Setup input buffer */
- for (i = 0; i < in_data->num_tensors; i++) {
- in_tensors[i].data = in_data->tensors[i].tensor;
- in_tensors[i].size = in_data->tensors[i].size;
- in_tensors[i].type = single_h->in_info.info[i].type;
+ pthread_mutex_lock (&single_h->mutex);
+ if (single_h->data_ready == TRUE) {
+ status = ML_ERROR_TRY_AGAIN;
+ goto exit;
}
- /** Setup output buffer */
- for (i = 0; i < single_h->out_info.num_tensors; i++) {
- /** memory will be allocated by tensor_filter_single */
- out_tensors[i].data = NULL;
- out_tensors[i].size = ml_tensor_info_get_size (&single_h->out_info.info[i]);
- out_tensors[i].type = single_h->out_info.info[i].type;
- }
+ single_h->input = input;
+ single_h->output = output;
+ single_h->data_ready = TRUE;
- klass = g_type_class_peek (G_TYPE_TENSOR_FILTER_SINGLE);
- if (!klass)
- return ML_ERROR_PERMISSION_DENIED;
+ clock_gettime (CLOCK_REALTIME, &ts);
+ ts.tv_nsec += single_h->timeout.tv_nsec;
+ ts.tv_sec += single_h->timeout.tv_sec;
- /** TODO: create a new thread, which will invoke and wait with a timeout */
- if (klass->invoke (single_h->filter, in_tensors, out_tensors) == FALSE)
- return ML_ERROR_INVALID_PARAMETER;
+ pthread_cond_broadcast (&single_h->cond);
+ err = pthread_cond_timedwait (&single_h->cond, &single_h->mutex, &ts);
- /* Allocate output buffer */
- status = ml_tensors_data_create_no_alloc (&single_h->out_info, output);
- if (status != ML_ERROR_NONE) {
- ml_loge ("Failed to allocate the memory block.");
- *output = NULL;
- return status;
- }
-
- result = (ml_tensors_data_s *) (*output);
-
- /* set the result */
- for (i = 0; i < single_h->out_info.num_tensors; i++) {
- result->tensors[i].tensor = out_tensors[i].data;
+ if (err == 0)
+ status = single_h->status;
+ else if (err == ETIMEDOUT) {
+ status = ML_ERROR_TIMED_OUT;
+ /** This is set to notify invoke_thread to not process if timedout */
+ single_h->data_ready = FALSE;
}
+ else if (err == EPERM)
+ status = ML_ERROR_PERMISSION_DENIED;
+ else
+ status = ML_ERROR_INVALID_PARAMETER;
- return ML_ERROR_NONE;
+exit:
+ pthread_mutex_unlock (&single_h->mutex);
+ return status;
}
/**
int
ml_single_set_timeout (ml_single_h single, unsigned int timeout)
{
- return ML_ERROR_NOT_SUPPORTED;
+ ml_single *single_h;
+
+ check_feature_state ();
+
+ if (!single || timeout == 0)
+ return ML_ERROR_INVALID_PARAMETER;
+
+ single_h = (ml_single *) single;
+
+ MSEC_TO_TIMESPEC (single_h->timeout, timeout);
+ return ML_ERROR_NONE;
}