[single-shot] Added support for timeout in single-shot
authorParichay Kapoor <pk.kapoor@samsung.com>
Thu, 5 Sep 2019 05:39:05 +0000 (14:39 +0900)
committerMyungJoo Ham <myungjoo.ham@samsung.com>
Mon, 16 Sep 2019 04:44:33 +0000 (13:44 +0900)
Added support for timeout in new single-shot API implementation to
comply with the existing single-shot API

Signed-off-by: Parichay Kapoor <pk.kapoor@samsung.com>
api/capi/src/nnstreamer-capi-single-new.c

index 34d4ba8..2251496 100644 (file)
  * @author MyungJoo Ham <myungjoo.ham@samsung.com>
  * @author Parichay Kapoor <pk.kapoor@samsung.com>
  * @bug No known bugs except for NYI items
- * @todo Complete the support for timeout
  */
 
 #include <string.h>
+#include <pthread.h>
 
 #include <nnstreamer-single.h>
 #include <nnstreamer-capi-private.h>
  */
 #define SINGLE_DEFAULT_TIMEOUT 3000
 
+/** Convert time in millisecond to timespec format */
+#define MSEC_TO_TIMESPEC(ts, msec) do { \
+  (ts).tv_sec = (msec) / 1000; \
+  (ts).tv_nsec = ((msec) % 1000) * 1000000; \
+} while (0)
+
 /* ML single api data structure for handle */
 typedef struct
 {
-  GTensorFilterSingle *filter;
-  ml_tensors_info_s in_info;
-  ml_tensors_info_s out_info;
+  GTensorFilterSingle *filter;  /**< tensor filter element */
+  ml_tensors_info_s in_info;    /**< info about input */
+  ml_tensors_info_s out_info;   /**< info about output */
+
+  pthread_t thread;             /**< thread for invoking */
+  pthread_mutex_t mutex;        /**< mutex for synchronization */
+  pthread_cond_t cond;          /**< condition for synchronization */
+  ml_tensors_data_h input;      /**< input received from user */
+  ml_tensors_data_h * output;   /**< output to be sent back to user */
+  struct timespec timeout;      /**< timeout for invoking */
+  gboolean data_ready;          /**< data is ready to be processed */
+  gboolean join;                /**< thread should be joined */
+  int status;                   /**< status of processing */
 } ml_single;
 
 /**
+ * @brief thread to execute calls to invoke
+ */
+static void *
+invoke_thread (void * arg)
+{
+  ml_single *single_h;
+  GTensorFilterSingleClass *klass;
+  GstTensorMemory in_tensors[NNS_TENSOR_SIZE_LIMIT];
+  GstTensorMemory out_tensors[NNS_TENSOR_SIZE_LIMIT];
+  ml_tensors_data_s *in_data, *out_data;
+  int i, status = ML_ERROR_NONE;
+
+  single_h = (ml_single *) arg;
+
+  pthread_mutex_lock (&single_h->mutex);
+
+  /** get the tensor_filter element */
+  klass = g_type_class_peek (G_TYPE_TENSOR_FILTER_SINGLE);
+  if (!klass) {
+    single_h->join = TRUE;
+    goto exit;
+  }
+
+  while (single_h->join != TRUE) {
+
+    /** wait for data */
+    while (single_h->data_ready != TRUE) {
+      pthread_cond_wait (&single_h->cond, &single_h->mutex);
+      if (single_h->join == TRUE)
+        goto exit;
+    }
+
+    in_data = (ml_tensors_data_s *) single_h->input;
+
+    /** Setup input buffer */
+    for (i = 0; i < in_data->num_tensors; i++) {
+      in_tensors[i].data = in_data->tensors[i].tensor;
+      in_tensors[i].size = in_data->tensors[i].size;
+      in_tensors[i].type = single_h->in_info.info[i].type;
+    }
+
+    /** Setup output buffer */
+    for (i = 0; i < single_h->out_info.num_tensors; i++) {
+      /** memory will be allocated by tensor_filter_single */
+      out_tensors[i].data = NULL;
+      out_tensors[i].size = ml_tensor_info_get_size (&single_h->out_info.info[i]);
+      out_tensors[i].type = single_h->out_info.info[i].type;
+    }
+    pthread_mutex_unlock (&single_h->mutex);
+
+    /** invoke the thread */
+    if (klass->invoke (single_h->filter, in_tensors, out_tensors) == FALSE) {
+      status = ML_ERROR_INVALID_PARAMETER;
+      pthread_mutex_lock (&single_h->mutex);
+      goto wait_for_next;
+    }
+
+    pthread_mutex_lock (&single_h->mutex);
+    /** Allocate output buffer */
+    if (single_h->data_ready == TRUE) {
+      status = ml_tensors_data_create_no_alloc (&single_h->out_info,
+          single_h->output);
+      if (status != ML_ERROR_NONE) {
+        ml_loge ("Failed to allocate the memory block.");
+        (*single_h->output) = NULL;
+        goto wait_for_next;
+      }
+
+      /** set the result */
+      out_data = (ml_tensors_data_s *) (*single_h->output);
+      for (i = 0; i < single_h->out_info.num_tensors; i++) {
+        out_data->tensors[i].tensor = out_tensors[i].data;
+      }
+    }
+
+    /** loop over to wait for the next element */
+wait_for_next:
+    single_h->status = status;
+    single_h->data_ready = FALSE;
+    pthread_cond_broadcast (&single_h->cond);
+  }
+
+exit:
+  single_h->data_ready = FALSE;
+  pthread_cond_broadcast (&single_h->cond);
+  pthread_mutex_unlock (&single_h->mutex);
+  return NULL;
+}
+
+/**
  * @brief Set the info for input/output tensors
  */
 static int
@@ -227,6 +333,7 @@ ml_single_open (ml_single_h * single, const char *model,
   single_h = g_new0 (ml_single, 1);
   g_assert (single_h);
   single_h->filter = g_object_new (G_TYPE_TENSOR_FILTER_SINGLE, NULL);
+  MSEC_TO_TIMESPEC (single_h->timeout, SINGLE_DEFAULT_TIMEOUT);
   if (single_h->filter == NULL) {
     status = ML_ERROR_INVALID_PARAMETER;
     goto error;
@@ -359,6 +466,17 @@ ml_single_open (ml_single_h * single, const char *model,
     }
   }
 
+  pthread_mutex_init (&single_h->mutex, NULL);
+  pthread_cond_init (&single_h->cond, NULL);
+  single_h->data_ready = FALSE;
+  single_h->join = FALSE;
+
+  if (pthread_create (&single_h->thread, NULL, invoke_thread, (void *)single_h) < 0) {
+    ml_loge ("Failed to create the invoke thread.");
+    status = ML_ERROR_UNKNOWN;
+    goto error;
+  }
+
   *single = single_h;
   return ML_ERROR_NONE;
 
@@ -384,6 +502,12 @@ ml_single_close (ml_single_h single)
 
   single_h = (ml_single *) single;
 
+  pthread_mutex_lock (&single_h->mutex);
+  single_h->join = TRUE;
+  pthread_cond_broadcast (&single_h->cond);
+  pthread_mutex_unlock (&single_h->mutex);
+  pthread_join (single_h->thread, NULL);
+
   if (single_h->filter) {
     GTensorFilterSingleClass *klass;
     klass = g_type_class_peek (G_TYPE_TENSOR_FILTER_SINGLE);
@@ -408,11 +532,10 @@ ml_single_invoke (ml_single_h single,
     const ml_tensors_data_h input, ml_tensors_data_h * output)
 {
   ml_single *single_h;
-  ml_tensors_data_s *in_data, *result;
-  GstTensorMemory in_tensors[NNS_TENSOR_SIZE_LIMIT];
-  GstTensorMemory out_tensors[NNS_TENSOR_SIZE_LIMIT];
-  GTensorFilterSingleClass *klass;
+  ml_tensors_data_s *in_data;
+  struct timespec ts;
   int i, status = ML_ERROR_NONE;
+  int err;
 
   check_feature_state ();
 
@@ -425,7 +548,7 @@ ml_single_invoke (ml_single_h single,
   in_data = (ml_tensors_data_s *) input;
   *output = NULL;
 
-  if (!single_h->filter) {
+  if (!single_h->filter || single_h->join) {
     ml_loge ("The given param is invalid, model is missing.");
     return ML_ERROR_INVALID_PARAMETER;
   }
@@ -447,45 +570,38 @@ ml_single_invoke (ml_single_h single,
     }
   }
 
-  /** Setup input buffer */
-  for (i = 0; i < in_data->num_tensors; i++) {
-    in_tensors[i].data = in_data->tensors[i].tensor;
-    in_tensors[i].size = in_data->tensors[i].size;
-    in_tensors[i].type = single_h->in_info.info[i].type;
+  pthread_mutex_lock (&single_h->mutex);
+  if (single_h->data_ready == TRUE) {
+    status = ML_ERROR_TRY_AGAIN;
+    goto exit;
   }
 
-  /** Setup output buffer */
-  for (i = 0; i < single_h->out_info.num_tensors; i++) {
-    /** memory will be allocated by tensor_filter_single */
-    out_tensors[i].data = NULL;
-    out_tensors[i].size = ml_tensor_info_get_size (&single_h->out_info.info[i]);
-    out_tensors[i].type = single_h->out_info.info[i].type;
-  }
+  single_h->input = input;
+  single_h->output = output;
+  single_h->data_ready = TRUE;
 
-  klass = g_type_class_peek (G_TYPE_TENSOR_FILTER_SINGLE);
-  if (!klass)
-    return ML_ERROR_PERMISSION_DENIED;
+  clock_gettime (CLOCK_REALTIME, &ts);
+  ts.tv_nsec += single_h->timeout.tv_nsec;
+  ts.tv_sec += single_h->timeout.tv_sec;
 
-  /** TODO: create a new thread, which will invoke and wait with a timeout */
-  if (klass->invoke (single_h->filter, in_tensors, out_tensors) == FALSE)
-    return ML_ERROR_INVALID_PARAMETER;
+  pthread_cond_broadcast (&single_h->cond);
+  err = pthread_cond_timedwait (&single_h->cond, &single_h->mutex, &ts);
 
-  /* Allocate output buffer */
-  status = ml_tensors_data_create_no_alloc (&single_h->out_info, output);
-  if (status != ML_ERROR_NONE) {
-    ml_loge ("Failed to allocate the memory block.");
-    *output = NULL;
-    return status;
-  }
-
-  result = (ml_tensors_data_s *) (*output);
-
-  /* set the result */
-  for (i = 0; i < single_h->out_info.num_tensors; i++) {
-    result->tensors[i].tensor = out_tensors[i].data;
+  if (err == 0)
+    status = single_h->status;
+  else if (err == ETIMEDOUT) {
+    status = ML_ERROR_TIMED_OUT;
+    /** This is set to notify invoke_thread to not process if timedout */
+    single_h->data_ready = FALSE;
   }
+  else if (err == EPERM)
+    status = ML_ERROR_PERMISSION_DENIED;
+  else
+    status = ML_ERROR_INVALID_PARAMETER;
 
-  return ML_ERROR_NONE;
+exit:
+  pthread_mutex_unlock (&single_h->mutex);
+  return status;
 }
 
 /**
@@ -602,5 +718,15 @@ ml_single_get_output_info (ml_single_h single, ml_tensors_info_h * info)
 int
 ml_single_set_timeout (ml_single_h single, unsigned int timeout)
 {
-  return ML_ERROR_NOT_SUPPORTED;
+  ml_single *single_h;
+
+  check_feature_state ();
+
+  if (!single || timeout == 0)
+    return ML_ERROR_INVALID_PARAMETER;
+
+  single_h = (ml_single *) single;
+
+  MSEC_TO_TIMESPEC (single_h->timeout, timeout);
+  return ML_ERROR_NONE;
 }