From 31b768cdcbd6087d6d87cdbdcf42bb7b8fac643e Mon Sep 17 00:00:00 2001 From: Parichay Kapoor Date: Thu, 5 Sep 2019 14:39:05 +0900 Subject: [PATCH] [single-shot] Added support for timeout in single-shot Added support for timeout in new single-shot API implementation to comply with the existing single-shot API Signed-off-by: Parichay Kapoor --- api/capi/src/nnstreamer-capi-single-new.c | 210 ++++++++++++++++++++++++------ 1 file changed, 168 insertions(+), 42 deletions(-) diff --git a/api/capi/src/nnstreamer-capi-single-new.c b/api/capi/src/nnstreamer-capi-single-new.c index 34d4ba8..2251496 100644 --- a/api/capi/src/nnstreamer-capi-single-new.c +++ b/api/capi/src/nnstreamer-capi-single-new.c @@ -20,10 +20,10 @@ * @author MyungJoo Ham * @author Parichay Kapoor * @bug No known bugs except for NYI items - * @todo Complete the support for timeout */ #include +#include #include #include @@ -38,15 +38,121 @@ */ #define SINGLE_DEFAULT_TIMEOUT 3000 +/** Convert time in millisecond to timespec format */ +#define MSEC_TO_TIMESPEC(ts, msec) do { \ + (ts).tv_sec = (msec) / 1000; \ + (ts).tv_nsec = ((msec) % 1000) * 1000000; \ +} while (0) + /* ML single api data structure for handle */ typedef struct { - GTensorFilterSingle *filter; - ml_tensors_info_s in_info; - ml_tensors_info_s out_info; + GTensorFilterSingle *filter; /**< tensor filter element */ + ml_tensors_info_s in_info; /**< info about input */ + ml_tensors_info_s out_info; /**< info about output */ + + pthread_t thread; /**< thread for invoking */ + pthread_mutex_t mutex; /**< mutex for synchronization */ + pthread_cond_t cond; /**< condition for synchronization */ + ml_tensors_data_h input; /**< input received from user */ + ml_tensors_data_h * output; /**< output to be sent back to user */ + struct timespec timeout; /**< timeout for invoking */ + gboolean data_ready; /**< data is ready to be processed */ + gboolean join; /**< thread should be joined */ + int status; /**< status of processing */ } ml_single; /** + * @brief thread to execute calls to invoke + */ +static void * +invoke_thread (void * arg) +{ + ml_single *single_h; + GTensorFilterSingleClass *klass; + GstTensorMemory in_tensors[NNS_TENSOR_SIZE_LIMIT]; + GstTensorMemory out_tensors[NNS_TENSOR_SIZE_LIMIT]; + ml_tensors_data_s *in_data, *out_data; + int i, status = ML_ERROR_NONE; + + single_h = (ml_single *) arg; + + pthread_mutex_lock (&single_h->mutex); + + /** get the tensor_filter element */ + klass = g_type_class_peek (G_TYPE_TENSOR_FILTER_SINGLE); + if (!klass) { + single_h->join = TRUE; + goto exit; + } + + while (single_h->join != TRUE) { + + /** wait for data */ + while (single_h->data_ready != TRUE) { + pthread_cond_wait (&single_h->cond, &single_h->mutex); + if (single_h->join == TRUE) + goto exit; + } + + in_data = (ml_tensors_data_s *) single_h->input; + + /** Setup input buffer */ + for (i = 0; i < in_data->num_tensors; i++) { + in_tensors[i].data = in_data->tensors[i].tensor; + in_tensors[i].size = in_data->tensors[i].size; + in_tensors[i].type = single_h->in_info.info[i].type; + } + + /** Setup output buffer */ + for (i = 0; i < single_h->out_info.num_tensors; i++) { + /** memory will be allocated by tensor_filter_single */ + out_tensors[i].data = NULL; + out_tensors[i].size = ml_tensor_info_get_size (&single_h->out_info.info[i]); + out_tensors[i].type = single_h->out_info.info[i].type; + } + pthread_mutex_unlock (&single_h->mutex); + + /** invoke the thread */ + if (klass->invoke (single_h->filter, in_tensors, out_tensors) == FALSE) { + status = ML_ERROR_INVALID_PARAMETER; + pthread_mutex_lock (&single_h->mutex); + goto wait_for_next; + } + + pthread_mutex_lock (&single_h->mutex); + /** Allocate output buffer */ + if (single_h->data_ready == TRUE) { + status = ml_tensors_data_create_no_alloc (&single_h->out_info, + single_h->output); + if (status != ML_ERROR_NONE) { + ml_loge ("Failed to allocate the memory block."); + (*single_h->output) = NULL; + goto wait_for_next; + } + + /** set the result */ + out_data = (ml_tensors_data_s *) (*single_h->output); + for (i = 0; i < single_h->out_info.num_tensors; i++) { + out_data->tensors[i].tensor = out_tensors[i].data; + } + } + + /** loop over to wait for the next element */ +wait_for_next: + single_h->status = status; + single_h->data_ready = FALSE; + pthread_cond_broadcast (&single_h->cond); + } + +exit: + single_h->data_ready = FALSE; + pthread_cond_broadcast (&single_h->cond); + pthread_mutex_unlock (&single_h->mutex); + return NULL; +} + +/** * @brief Set the info for input/output tensors */ static int @@ -227,6 +333,7 @@ ml_single_open (ml_single_h * single, const char *model, single_h = g_new0 (ml_single, 1); g_assert (single_h); single_h->filter = g_object_new (G_TYPE_TENSOR_FILTER_SINGLE, NULL); + MSEC_TO_TIMESPEC (single_h->timeout, SINGLE_DEFAULT_TIMEOUT); if (single_h->filter == NULL) { status = ML_ERROR_INVALID_PARAMETER; goto error; @@ -359,6 +466,17 @@ ml_single_open (ml_single_h * single, const char *model, } } + pthread_mutex_init (&single_h->mutex, NULL); + pthread_cond_init (&single_h->cond, NULL); + single_h->data_ready = FALSE; + single_h->join = FALSE; + + if (pthread_create (&single_h->thread, NULL, invoke_thread, (void *)single_h) < 0) { + ml_loge ("Failed to create the invoke thread."); + status = ML_ERROR_UNKNOWN; + goto error; + } + *single = single_h; return ML_ERROR_NONE; @@ -384,6 +502,12 @@ ml_single_close (ml_single_h single) single_h = (ml_single *) single; + pthread_mutex_lock (&single_h->mutex); + single_h->join = TRUE; + pthread_cond_broadcast (&single_h->cond); + pthread_mutex_unlock (&single_h->mutex); + pthread_join (single_h->thread, NULL); + if (single_h->filter) { GTensorFilterSingleClass *klass; klass = g_type_class_peek (G_TYPE_TENSOR_FILTER_SINGLE); @@ -408,11 +532,10 @@ ml_single_invoke (ml_single_h single, const ml_tensors_data_h input, ml_tensors_data_h * output) { ml_single *single_h; - ml_tensors_data_s *in_data, *result; - GstTensorMemory in_tensors[NNS_TENSOR_SIZE_LIMIT]; - GstTensorMemory out_tensors[NNS_TENSOR_SIZE_LIMIT]; - GTensorFilterSingleClass *klass; + ml_tensors_data_s *in_data; + struct timespec ts; int i, status = ML_ERROR_NONE; + int err; check_feature_state (); @@ -425,7 +548,7 @@ ml_single_invoke (ml_single_h single, in_data = (ml_tensors_data_s *) input; *output = NULL; - if (!single_h->filter) { + if (!single_h->filter || single_h->join) { ml_loge ("The given param is invalid, model is missing."); return ML_ERROR_INVALID_PARAMETER; } @@ -447,45 +570,38 @@ ml_single_invoke (ml_single_h single, } } - /** Setup input buffer */ - for (i = 0; i < in_data->num_tensors; i++) { - in_tensors[i].data = in_data->tensors[i].tensor; - in_tensors[i].size = in_data->tensors[i].size; - in_tensors[i].type = single_h->in_info.info[i].type; + pthread_mutex_lock (&single_h->mutex); + if (single_h->data_ready == TRUE) { + status = ML_ERROR_TRY_AGAIN; + goto exit; } - /** Setup output buffer */ - for (i = 0; i < single_h->out_info.num_tensors; i++) { - /** memory will be allocated by tensor_filter_single */ - out_tensors[i].data = NULL; - out_tensors[i].size = ml_tensor_info_get_size (&single_h->out_info.info[i]); - out_tensors[i].type = single_h->out_info.info[i].type; - } + single_h->input = input; + single_h->output = output; + single_h->data_ready = TRUE; - klass = g_type_class_peek (G_TYPE_TENSOR_FILTER_SINGLE); - if (!klass) - return ML_ERROR_PERMISSION_DENIED; + clock_gettime (CLOCK_REALTIME, &ts); + ts.tv_nsec += single_h->timeout.tv_nsec; + ts.tv_sec += single_h->timeout.tv_sec; - /** TODO: create a new thread, which will invoke and wait with a timeout */ - if (klass->invoke (single_h->filter, in_tensors, out_tensors) == FALSE) - return ML_ERROR_INVALID_PARAMETER; + pthread_cond_broadcast (&single_h->cond); + err = pthread_cond_timedwait (&single_h->cond, &single_h->mutex, &ts); - /* Allocate output buffer */ - status = ml_tensors_data_create_no_alloc (&single_h->out_info, output); - if (status != ML_ERROR_NONE) { - ml_loge ("Failed to allocate the memory block."); - *output = NULL; - return status; - } - - result = (ml_tensors_data_s *) (*output); - - /* set the result */ - for (i = 0; i < single_h->out_info.num_tensors; i++) { - result->tensors[i].tensor = out_tensors[i].data; + if (err == 0) + status = single_h->status; + else if (err == ETIMEDOUT) { + status = ML_ERROR_TIMED_OUT; + /** This is set to notify invoke_thread to not process if timedout */ + single_h->data_ready = FALSE; } + else if (err == EPERM) + status = ML_ERROR_PERMISSION_DENIED; + else + status = ML_ERROR_INVALID_PARAMETER; - return ML_ERROR_NONE; +exit: + pthread_mutex_unlock (&single_h->mutex); + return status; } /** @@ -602,5 +718,15 @@ ml_single_get_output_info (ml_single_h single, ml_tensors_info_h * info) int ml_single_set_timeout (ml_single_h single, unsigned int timeout) { - return ML_ERROR_NOT_SUPPORTED; + ml_single *single_h; + + check_feature_state (); + + if (!single || timeout == 0) + return ML_ERROR_INVALID_PARAMETER; + + single_h = (ml_single *) single; + + MSEC_TO_TIMESPEC (single_h->timeout, timeout); + return ML_ERROR_NONE; } -- 2.7.4