GstObject * parent, GstBuffer * buf)
{
GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (parent);
- TensorQueryCommandData cmd_buf;
- guint i, num_tensors = 0;
- guint mem_sizes[NNS_TENSOR_SIZE_LIMIT];
GstBuffer *out_buf = NULL;
- GstMemory *out_mem;
- GstMapInfo out_info;
GstFlowReturn res = GST_FLOW_OK;
- gint ecode;
UNUSED (pad);
goto retry;
}
- /** Receive start command buffer */
- if (0 != nnstreamer_query_receive (self->sink_conn, &cmd_buf)) {
- nns_loge ("Failed to receive start command buffer");
- goto retry;
- }
+ out_buf = tensor_query_receive_buffer (self->sink_conn);
+ if (out_buf) {
+ /* metadata from incoming buffer */
+ gst_buffer_copy_into (out_buf, buf, GST_BUFFER_COPY_METADATA, 0, -1);
- if (cmd_buf.cmd == _TENSOR_QUERY_CMD_TRANSFER_START) {
- num_tensors = cmd_buf.data_info.num_mems;
-
- for (i = 0; i < num_tensors; i++) {
- mem_sizes[i] = cmd_buf.data_info.mem_sizes[i];
- }
+ res = gst_pad_push (self->srcpad, out_buf);
+ goto done;
}
- out_buf = gst_buffer_new ();
-
- /** Receive data command buffer */
- for (i = 0; i < num_tensors; i++) {
- out_mem = gst_allocator_alloc (NULL, mem_sizes[i], NULL);
- gst_buffer_append_memory (out_buf, out_mem);
-
- if (!gst_memory_map (out_mem, &out_info, GST_MAP_WRITE)) {
- nns_loge ("Cannot map gst memory (query-client buffer)");
- goto error;
- }
- cmd_buf.data.data = out_info.data;
-
- ecode = nnstreamer_query_receive (self->sink_conn, &cmd_buf);
- gst_memory_unmap (out_mem, &out_info);
-
- if (ecode != 0) {
- nns_loge ("Failed to receive %u th data command buffer", i);
- goto error;
- }
- }
-
- /** Receive end command buffer */
- if (0 != nnstreamer_query_receive (self->sink_conn, &cmd_buf)) {
- nns_loge ("Failed to receive end command buffer");
- goto error;
- }
- if (cmd_buf.cmd != _TENSOR_QUERY_CMD_TRANSFER_END) {
- nns_loge ("Expected _TENSOR_QUERY_CMD_TRANSFER_END, but received %d.",
- cmd_buf.cmd);
- goto error;
- }
-
- out_buf = gst_buffer_make_writable (out_buf);
-
- /* metadata from incoming buffer */
- gst_buffer_copy_into (out_buf, buf, GST_BUFFER_COPY_METADATA, 0, -1);
-
- res = gst_pad_push (self->srcpad, out_buf);
-
- goto done;
+ nns_logw ("Failed to receive result from server node, retry connection.");
-error:
- gst_buffer_unref (out_buf);
retry:
if (!self->operation || !_client_retry_connection (self)) {
nns_loge ("Failed to retry connection");
TensorQueryConnection *_conn = _thread_data->conn;
TensorQueryServerData *_sdata = _thread_data->sdata;
TensorQueryCommandData cmd_data;
- TensorQueryDataInfo data_info;
size_t size;
GIOCondition condition;
GstBuffer *outbuf = NULL;
- GstMemory *mem = NULL;
- GstMapInfo map;
GstMetaQuery *meta_query;
- uint32_t i;
- int32_t ecode;
cmd_data.cmd = _TENSOR_QUERY_CMD_CLIENT_ID;
cmd_data.client_id = g_get_monotonic_time ();
break;
}
- if (0 != nnstreamer_query_receive (_conn, &cmd_data)) {
- nns_loge ("Failed to receive cmd");
- break;
- }
-
- if (cmd_data.cmd == _TENSOR_QUERY_CMD_TRANSFER_START) {
- data_info = cmd_data.data_info;
- outbuf = gst_buffer_new ();
- for (i = 0; i < data_info.num_mems; i++) {
- mem = gst_allocator_alloc (NULL, data_info.mem_sizes[i], NULL);
- gst_buffer_append_memory (outbuf, mem);
-
- if (!gst_memory_map (mem, &map, GST_MAP_READWRITE)) {
- nns_loge ("Failed to map the memory to receive data.");
- goto reset_buffer;
- }
-
- cmd_data.data.data = map.data;
- cmd_data.data.size = map.size;
-
- ecode = nnstreamer_query_receive (_conn, &cmd_data);
- gst_memory_unmap (mem, &map);
-
- if (ecode != 0) {
- nns_logi ("Failed to receive data");
- goto reset_buffer;
- }
- }
-
- /* receive end */
- if (0 != nnstreamer_query_receive (_conn, &cmd_data) ||
- cmd_data.cmd != _TENSOR_QUERY_CMD_TRANSFER_END) {
- nns_logi ("Failed to receive end command");
- goto reset_buffer;
- }
-
+ outbuf = tensor_query_receive_buffer (_conn);
+ if (outbuf) {
meta_query = gst_buffer_add_meta_query (outbuf);
if (meta_query) {
meta_query->client_id =
nnstreamer_query_connection_get_client_id (_conn);
}
- } else {
- nns_logi ("Failed to receive start command.");
- break;
+
+ g_async_queue_push (_sdata->msg_queue, outbuf);
}
- g_async_queue_push (_sdata->msg_queue, outbuf);
- continue;
- reset_buffer:
- gst_buffer_unref (outbuf);
}
done:
return done;
}
+
+/**
+ * @brief Receive data and generate gst-buffer. Caller should handle metadata of returned buffer.
+ * @return Newly generated gst-buffer. Null if failed to receive data.
+ * @todo This function should be used in nnstreamer element. Update function name rule and params later.
+ */
+GstBuffer *
+tensor_query_receive_buffer (query_connection_handle connection)
+{
+ TensorQueryCommandData cmd_data = { 0 };
+ GstBuffer *buffer = NULL;
+ gboolean done = FALSE;
+ gpointer data;
+ gsize len;
+ guint i, num_mems;
+
+ if (nnstreamer_query_receive (connection, &cmd_data) != 0) {
+ nns_loge ("Failed to receive start command.");
+ goto error;
+ }
+
+ if (cmd_data.cmd != _TENSOR_QUERY_CMD_TRANSFER_START) {
+ nns_loge ("Invalid command %d, cannot start data transfer.", cmd_data.cmd);
+ goto error;
+ }
+
+ buffer = gst_buffer_new ();
+
+ num_mems = cmd_data.data_info.num_mems;
+ for (i = 0; i < num_mems; i++) {
+ len = cmd_data.data_info.mem_sizes[i];
+ data = g_malloc0 (len);
+
+ cmd_data.data.data = data;
+ cmd_data.data.size = len;
+
+ if (nnstreamer_query_receive (connection, &cmd_data) != 0) {
+ nns_loge ("Failed to receive %uth data.", i);
+ g_free (data);
+ goto error;
+ }
+
+ gst_buffer_append_memory (buffer,
+ gst_memory_new_wrapped (0, data, len, 0, len, data, g_free));
+ }
+
+ /* done */
+ if (nnstreamer_query_receive (connection, &cmd_data) != 0) {
+ nns_loge ("Failed to receive end command.");
+ goto error;
+ }
+
+ if (cmd_data.cmd != _TENSOR_QUERY_CMD_TRANSFER_END) {
+ nns_loge ("Invalid command %d, failed to transfer data.", cmd_data.cmd);
+ goto error;
+ }
+
+ done = TRUE;
+
+error:
+ if (!done) {
+ if (buffer) {
+ gst_buffer_unref (buffer);
+ buffer = NULL;
+ }
+ }
+
+ return buffer;
+}