*/
#define ML_SINGLE_HANDLE_UNLOCK(single_h) g_mutex_unlock (&single_h->mutex);
-/* ML single api data structure for handle */
+/** States for invoke thread */
+typedef enum {
+ IDLE = 0, /**< ready to accept next input */
+ RUNNING, /**< running an input, cannot accept more input */
+ JOIN_REQUESTED, /**< should join the thread, will exit soon */
+ ERROR /**< error on thread, will exit soon */
+} thread_state;
+
+/** ML single api data structure for handle */
typedef struct
{
GTensorFilterSingle *filter; /**< tensor filter element */
ml_tensors_data_h input; /**< input received from user */
ml_tensors_data_h *output; /**< output to be sent back to user */
guint timeout; /**< timeout for invoking */
- gboolean data_ready; /**< data is ready to be processed */
- gboolean join; /**< thread should be joined */
+ thread_state state; /**< current state of the thread */
+ gboolean ignore_output; /**< ignore and free the output */
int status; /**< status of processing */
} ml_single;
/** get the tensor_filter element */
klass = g_type_class_peek (G_TYPE_TENSOR_FILTER_SINGLE);
if (!klass) {
- single_h->join = TRUE;
+ single_h->state = ERROR;
goto exit;
}
- while (single_h->join != TRUE) {
+ while (single_h->state <= RUNNING) {
/** wait for data */
- while (single_h->data_ready != TRUE) {
+ while (single_h->state != RUNNING) {
g_cond_wait (&single_h->cond, &single_h->mutex);
- if (single_h->join == TRUE)
+ if (single_h->state >= JOIN_REQUESTED)
goto exit;
}
g_mutex_lock (&single_h->mutex);
/** Allocate output buffer */
- if (single_h->data_ready == TRUE) {
+ if (single_h->ignore_output == FALSE) {
status = ml_tensors_data_create_no_alloc (&single_h->out_info,
single_h->output);
if (status != ML_ERROR_NONE) {
for (i = 0; i < single_h->out_info.num_tensors; i++) {
out_data->tensors[i].tensor = out_tensors[i].data;
}
+ } else {
+ /**
+ * Caller of the invoke thread has returned back with timeout
+ * so, free the memory allocated by the invoke as their is no receiver
+ */
+ for (i = 0; i < single_h->out_info.num_tensors; i++)
+ g_free (out_tensors[i].data);
}
/** loop over to wait for the next element */
wait_for_next:
single_h->status = status;
- single_h->data_ready = FALSE;
+ if (single_h->state == RUNNING)
+ single_h->state = IDLE;
g_cond_broadcast (&single_h->cond);
}
exit:
- single_h->data_ready = FALSE;
+ if (single_h->state != ERROR)
+ single_h->state = IDLE;
g_mutex_unlock (&single_h->mutex);
return NULL;
}
g_mutex_init (&single_h->mutex);
g_cond_init (&single_h->cond);
- single_h->data_ready = FALSE;
- single_h->join = FALSE;
+ single_h->state = IDLE;
+ single_h->ignore_output = FALSE;
single_h->thread = g_thread_try_new (NULL, invoke_thread, (gpointer) single_h,
&error);
ML_SINGLE_GET_VALID_HANDLE_LOCKED (single_h, single, 1);
- single_h->join = TRUE;
+ single_h->state = JOIN_REQUESTED;
g_cond_broadcast (&single_h->cond);
ML_SINGLE_HANDLE_UNLOCK (single_h);
in_data = (ml_tensors_data_s *) input;
*output = NULL;
- if (!single_h->filter || single_h->join) {
+ if (!single_h->filter || single_h->state >= JOIN_REQUESTED) {
ml_loge ("The given param is invalid, model is missing.");
status = ML_ERROR_INVALID_PARAMETER;
goto exit;
}
}
- if (single_h->data_ready == TRUE) {
+ if (single_h->state != IDLE) {
status = ML_ERROR_TRY_AGAIN;
goto exit;
}
single_h->input = input;
single_h->output = output;
- single_h->data_ready = TRUE;
+ single_h->state = RUNNING;
+ single_h->ignore_output = FALSE;
end_time = g_get_monotonic_time () +
single_h->timeout * G_TIME_SPAN_MILLISECOND;
if (g_cond_wait_until (&single_h->cond, &single_h->mutex, end_time)) {
status = single_h->status;
} else {
+ ml_logw ("Wait for invoke has timed out");
status = ML_ERROR_TIMED_OUT;
/** This is set to notify invoke_thread to not process if timedout */
- single_h->data_ready = FALSE;
+ single_h->ignore_output = TRUE;
+
+ /** Free if any output memory was allocated */
+ if (*single_h->output != NULL) {
+ ml_tensors_data_destroy ((ml_tensors_data_h) *single_h->output);
+ *single_h->output = NULL;
+ }
}
exit:
status = ml_single_invoke (single, input, &output);
if (ss_data->expect) {
if (ss_data->timeout != 0 && ss_data->timeout < ss_data->min_time_to_run) {
- EXPECT_EQ (status, ML_ERROR_TIMED_OUT);
+ EXPECT_TRUE (status == ML_ERROR_TIMED_OUT ||
+ status == ML_ERROR_TRY_AGAIN);
EXPECT_TRUE (output == NULL);
} else {
EXPECT_EQ (status, ML_ERROR_NONE);
/* check the old buffer is dropped */
status = ml_single_invoke (single, input, &output);
- EXPECT_EQ (status, ML_ERROR_TIMED_OUT);
+ /* try_again implies that previous invoke hasn't finished yet */
+ EXPECT_TRUE (status == ML_ERROR_TIMED_OUT || status == ML_ERROR_TRY_AGAIN);
EXPECT_TRUE (output == NULL);
/* set timeout 5 s */
status = ml_single_set_timeout (single, 5000);
+ /* clear out previous buffers */
+ g_usleep (1000000); /** 1 sec */
status = ml_single_invoke (single, input, &output);
EXPECT_EQ (status, ML_ERROR_NONE);