[Query] util function to receive data
authorJaeyun <jy1210.jung@samsung.com>
Wed, 1 Dec 2021 10:41:26 +0000 (19:41 +0900)
committerjaeyun-jung <39614140+jaeyun-jung@users.noreply.github.com>
Fri, 3 Dec 2021 04:27:45 +0000 (13:27 +0900)
Add util function to receive data and generate gst-buffer.

Signed-off-by: Jaeyun <jy1210.jung@samsung.com>
gst/nnstreamer/tensor_query/tensor_query_client.c
gst/nnstreamer/tensor_query/tensor_query_common.c
gst/nnstreamer/tensor_query/tensor_query_common.h

index e2a4318..66601f9 100644 (file)
@@ -590,14 +590,8 @@ gst_tensor_query_client_chain (GstPad * pad,
     GstObject * parent, GstBuffer * buf)
 {
   GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (parent);
-  TensorQueryCommandData cmd_buf;
-  guint i, num_tensors = 0;
-  guint mem_sizes[NNS_TENSOR_SIZE_LIMIT];
   GstBuffer *out_buf = NULL;
-  GstMemory *out_mem;
-  GstMapInfo out_info;
   GstFlowReturn res = GST_FLOW_OK;
-  gint ecode;
 
   UNUSED (pad);
 
@@ -607,64 +601,17 @@ gst_tensor_query_client_chain (GstPad * pad,
     goto retry;
   }
 
-  /** Receive start command buffer */
-  if (0 != nnstreamer_query_receive (self->sink_conn, &cmd_buf)) {
-    nns_loge ("Failed to receive start command buffer");
-    goto retry;
-  }
+  out_buf = tensor_query_receive_buffer (self->sink_conn);
+  if (out_buf) {
+    /* metadata from incoming buffer */
+    gst_buffer_copy_into (out_buf, buf, GST_BUFFER_COPY_METADATA, 0, -1);
 
-  if (cmd_buf.cmd == _TENSOR_QUERY_CMD_TRANSFER_START) {
-    num_tensors = cmd_buf.data_info.num_mems;
-
-    for (i = 0; i < num_tensors; i++) {
-      mem_sizes[i] = cmd_buf.data_info.mem_sizes[i];
-    }
+    res = gst_pad_push (self->srcpad, out_buf);
+    goto done;
   }
 
-  out_buf = gst_buffer_new ();
-
-  /** Receive data command buffer */
-  for (i = 0; i < num_tensors; i++) {
-    out_mem = gst_allocator_alloc (NULL, mem_sizes[i], NULL);
-    gst_buffer_append_memory (out_buf, out_mem);
-
-    if (!gst_memory_map (out_mem, &out_info, GST_MAP_WRITE)) {
-      nns_loge ("Cannot map gst memory (query-client buffer)");
-      goto error;
-    }
-    cmd_buf.data.data = out_info.data;
-
-    ecode = nnstreamer_query_receive (self->sink_conn, &cmd_buf);
-    gst_memory_unmap (out_mem, &out_info);
-
-    if (ecode != 0) {
-      nns_loge ("Failed to receive %u th data command buffer", i);
-      goto error;
-    }
-  }
-
-  /** Receive end command buffer */
-  if (0 != nnstreamer_query_receive (self->sink_conn, &cmd_buf)) {
-    nns_loge ("Failed to receive end command buffer");
-    goto error;
-  }
-  if (cmd_buf.cmd != _TENSOR_QUERY_CMD_TRANSFER_END) {
-    nns_loge ("Expected _TENSOR_QUERY_CMD_TRANSFER_END, but received %d.",
-        cmd_buf.cmd);
-    goto error;
-  }
-
-  out_buf = gst_buffer_make_writable (out_buf);
-
-  /* metadata from incoming buffer */
-  gst_buffer_copy_into (out_buf, buf, GST_BUFFER_COPY_METADATA, 0, -1);
-
-  res = gst_pad_push (self->srcpad, out_buf);
-
-  goto done;
+  nns_logw ("Failed to receive result from server node, retry connection.");
 
-error:
-  gst_buffer_unref (out_buf);
 retry:
   if (!self->operation || !_client_retry_connection (self)) {
     nns_loge ("Failed to retry connection");
index 0ea2bbd..2609ef7 100644 (file)
@@ -682,15 +682,10 @@ _message_handler (void *thread_data)
   TensorQueryConnection *_conn = _thread_data->conn;
   TensorQueryServerData *_sdata = _thread_data->sdata;
   TensorQueryCommandData cmd_data;
-  TensorQueryDataInfo data_info;
   size_t size;
   GIOCondition condition;
   GstBuffer *outbuf = NULL;
-  GstMemory *mem = NULL;
-  GstMapInfo map;
   GstMetaQuery *meta_query;
-  uint32_t i;
-  int32_t ecode;
 
   cmd_data.cmd = _TENSOR_QUERY_CMD_CLIENT_ID;
   cmd_data.client_id = g_get_monotonic_time ();
@@ -768,55 +763,16 @@ _message_handler (void *thread_data)
       break;
     }
 
-    if (0 != nnstreamer_query_receive (_conn, &cmd_data)) {
-      nns_loge ("Failed to receive cmd");
-      break;
-    }
-
-    if (cmd_data.cmd == _TENSOR_QUERY_CMD_TRANSFER_START) {
-      data_info = cmd_data.data_info;
-      outbuf = gst_buffer_new ();
-      for (i = 0; i < data_info.num_mems; i++) {
-        mem = gst_allocator_alloc (NULL, data_info.mem_sizes[i], NULL);
-        gst_buffer_append_memory (outbuf, mem);
-
-        if (!gst_memory_map (mem, &map, GST_MAP_READWRITE)) {
-          nns_loge ("Failed to map the memory to receive data.");
-          goto reset_buffer;
-        }
-
-        cmd_data.data.data = map.data;
-        cmd_data.data.size = map.size;
-
-        ecode = nnstreamer_query_receive (_conn, &cmd_data);
-        gst_memory_unmap (mem, &map);
-
-        if (ecode != 0) {
-          nns_logi ("Failed to receive data");
-          goto reset_buffer;
-        }
-      }
-
-      /* receive end */
-      if (0 != nnstreamer_query_receive (_conn, &cmd_data) ||
-          cmd_data.cmd != _TENSOR_QUERY_CMD_TRANSFER_END) {
-        nns_logi ("Failed to receive end command");
-        goto reset_buffer;
-      }
-
+    outbuf = tensor_query_receive_buffer (_conn);
+    if (outbuf) {
       meta_query = gst_buffer_add_meta_query (outbuf);
       if (meta_query) {
         meta_query->client_id =
             nnstreamer_query_connection_get_client_id (_conn);
       }
-    } else {
-      nns_logi ("Failed to receive start command.");
-      break;
+
+      g_async_queue_push (_sdata->msg_queue, outbuf);
     }
-    g_async_queue_push (_sdata->msg_queue, outbuf);
-    continue;
-  reset_buffer:
-    gst_buffer_unref (outbuf);
   }
 
 done:
@@ -1025,3 +981,72 @@ error:
 
   return done;
 }
+
+/**
+ * @brief Receive data and generate gst-buffer. Caller should handle metadata of returned buffer.
+ * @return Newly generated gst-buffer. Null if failed to receive data.
+ * @todo This function should be used in nnstreamer element. Update function name rule and params later.
+ */
+GstBuffer *
+tensor_query_receive_buffer (query_connection_handle connection)
+{
+  TensorQueryCommandData cmd_data = { 0 };
+  GstBuffer *buffer = NULL;
+  gboolean done = FALSE;
+  gpointer data;
+  gsize len;
+  guint i, num_mems;
+
+  if (nnstreamer_query_receive (connection, &cmd_data) != 0) {
+    nns_loge ("Failed to receive start command.");
+    goto error;
+  }
+
+  if (cmd_data.cmd != _TENSOR_QUERY_CMD_TRANSFER_START) {
+    nns_loge ("Invalid command %d, cannot start data transfer.", cmd_data.cmd);
+    goto error;
+  }
+
+  buffer = gst_buffer_new ();
+
+  num_mems = cmd_data.data_info.num_mems;
+  for (i = 0; i < num_mems; i++) {
+    len = cmd_data.data_info.mem_sizes[i];
+    data = g_malloc0 (len);
+
+    cmd_data.data.data = data;
+    cmd_data.data.size = len;
+
+    if (nnstreamer_query_receive (connection, &cmd_data) != 0) {
+      nns_loge ("Failed to receive %uth data.", i);
+      g_free (data);
+      goto error;
+    }
+
+    gst_buffer_append_memory (buffer,
+        gst_memory_new_wrapped (0, data, len, 0, len, data, g_free));
+  }
+
+  /* done */
+  if (nnstreamer_query_receive (connection, &cmd_data) != 0) {
+    nns_loge ("Failed to receive end command.");
+    goto error;
+  }
+
+  if (cmd_data.cmd != _TENSOR_QUERY_CMD_TRANSFER_END) {
+    nns_loge ("Invalid command %d, failed to transfer data.", cmd_data.cmd);
+    goto error;
+  }
+
+  done = TRUE;
+
+error:
+  if (!done) {
+    if (buffer) {
+      gst_buffer_unref (buffer);
+      buffer = NULL;
+    }
+  }
+
+  return buffer;
+}
index 6ed5f48..fc14327 100644 (file)
@@ -178,6 +178,14 @@ extern gboolean
 tensor_query_send_buffer (query_connection_handle connection,
     GstElement * element, GstBuffer * buffer, guint timeout);
 
+/**
+ * @brief Receive data and generate gst-buffer. Caller should handle metadata of returned buffer.
+ * @return Newly generated gst-buffer. Null if failed to receive data.
+ * @todo This function should be used in nnstreamer element. Update function name rule and params later.
+ */
+extern GstBuffer *
+tensor_query_receive_buffer (query_connection_handle connection);
+
 #ifdef __cplusplus
 }
 #endif /* __cplusplus */