[Query] util function using integer id accepted/tizen/unified/20240423.164552 accepted/tizen/unified/x/20240425.051115
authorJaeyun Jung <jy1210.jung@samsung.com>
Fri, 12 Apr 2024 12:46:21 +0000 (21:46 +0900)
committerSangjung Woo <again4you@gmail.com>
Mon, 15 Apr 2024 11:11:38 +0000 (20:11 +0900)
To prevent double free case, remove query handle and use integer id.
Also, add util function to hide edge handle in query elements and fix null ptr case.

Signed-off-by: Jaeyun Jung <jy1210.jung@samsung.com>
gst/nnstreamer/tensor_query/tensor_query_server.c
gst/nnstreamer/tensor_query/tensor_query_server.h
gst/nnstreamer/tensor_query/tensor_query_serversink.c
gst/nnstreamer/tensor_query/tensor_query_serversink.h
gst/nnstreamer/tensor_query/tensor_query_serversrc.c
gst/nnstreamer/tensor_query/tensor_query_serversrc.h

index 8f6f6fb..bea34ab 100644 (file)
@@ -42,123 +42,240 @@ _release_server_data (gpointer data)
   if (!_data)
     return;
 
+  g_mutex_lock (&_data->lock);
   if (_data->edge_h) {
     nns_edge_release_handle (_data->edge_h);
     _data->edge_h = NULL;
   }
+  g_mutex_unlock (&_data->lock);
 
   g_mutex_clear (&_data->lock);
   g_cond_clear (&_data->cond);
-  g_free (_data->id);
+
   g_free (_data);
 }
 
 /**
  * @brief Get nnstreamer edge server handle.
  */
-edge_server_handle
-gst_tensor_query_server_get_handle (const char *id)
+static GstTensorQueryServer *
+gst_tensor_query_server_get_handle (const guint id)
 {
-  edge_server_handle p;
+  GstTensorQueryServer *data;
 
   G_LOCK (query_server_table);
-  p = g_hash_table_lookup (_qs_table, id);
+  data = g_hash_table_lookup (_qs_table, GUINT_TO_POINTER (id));
   G_UNLOCK (query_server_table);
 
-  return p;
+  return data;
 }
 
 /**
  * @brief Add nnstreamer edge server handle into hash table.
  */
-edge_server_handle
-gst_tensor_query_server_add_data (const char *id)
+gboolean
+gst_tensor_query_server_add_data (const guint id)
 {
-  GstTensorQueryServer *data = NULL;
+  GstTensorQueryServer *data;
 
   data = gst_tensor_query_server_get_handle (id);
 
   if (NULL != data) {
-    return data;
+    return TRUE;
   }
 
   data = g_try_new0 (GstTensorQueryServer, 1);
   if (NULL == data) {
-    GST_ERROR ("Failed to allocate memory for tensor query server data.");
-    return NULL;
+    nns_loge ("Failed to allocate memory for tensor query server data.");
+    return FALSE;
   }
 
   g_mutex_init (&data->lock);
   g_cond_init (&data->cond);
-  data->id = g_strdup (id);
+  data->id = id;
   data->configured = FALSE;
 
   G_LOCK (query_server_table);
-  g_hash_table_insert (_qs_table, g_strdup (id), data);
+  g_hash_table_insert (_qs_table, GUINT_TO_POINTER (id), data);
   G_UNLOCK (query_server_table);
 
-  return data;
+  return TRUE;
 }
 
 /**
- * @brief Get nnstreamer edge handle of query server.
+ * @brief Prepare edge connection and its handle.
  */
-nns_edge_h
-gst_tensor_query_server_get_edge_handle (const char *id,
-    nns_edge_connect_type_e connect_type)
+gboolean
+gst_tensor_query_server_prepare (const guint id,
+    nns_edge_connect_type_e connect_type, GstTensorQueryEdgeInfo * edge_info)
 {
-  GstTensorQueryServer *data = NULL;
+  GstTensorQueryServer *data;
+  gchar *port_str, *id_str;
+  gboolean prepared = FALSE;
   int ret;
 
   data = gst_tensor_query_server_get_handle (id);
-  if (!data)
-    return NULL;
+  if (NULL == data) {
+    return FALSE;
+  }
+
+  id_str = g_strdup_printf ("%u", id);
 
   g_mutex_lock (&data->lock);
-  if (data->edge_h) {
-    goto done;
+  if (data->edge_h == NULL) {
+    ret = nns_edge_create_handle (id_str, connect_type,
+        NNS_EDGE_NODE_TYPE_QUERY_SERVER, &data->edge_h);
+    if (NNS_EDGE_ERROR_NONE != ret) {
+      GST_ERROR ("Failed to get nnstreamer edge handle.");
+      goto done;
+    }
   }
 
-  ret = nns_edge_create_handle (id, connect_type,
-      NNS_EDGE_NODE_TYPE_QUERY_SERVER, &data->edge_h);
-  if (NNS_EDGE_ERROR_NONE != ret) {
-    GST_ERROR ("Failed to get nnstreamer edge handle.");
+  if (edge_info) {
+    if (edge_info->host) {
+      nns_edge_set_info (data->edge_h, "HOST", edge_info->host);
+    }
+    if (edge_info->port > 0) {
+      port_str = g_strdup_printf ("%u", edge_info->port);
+      nns_edge_set_info (data->edge_h, "PORT", port_str);
+      g_free (port_str);
+    }
+    if (edge_info->dest_host) {
+      nns_edge_set_info (data->edge_h, "DEST_HOST", edge_info->dest_host);
+    }
+    if (edge_info->dest_port > 0) {
+      port_str = g_strdup_printf ("%u", edge_info->dest_port);
+      nns_edge_set_info (data->edge_h, "DEST_PORT", port_str);
+      g_free (port_str);
+    }
+    if (edge_info->topic) {
+      nns_edge_set_info (data->edge_h, "TOPIC", edge_info->topic);
+    }
+
+    nns_edge_set_event_callback (data->edge_h, edge_info->cb, edge_info->pdata);
+
+    ret = nns_edge_start (data->edge_h);
+    if (NNS_EDGE_ERROR_NONE != ret) {
+      nns_loge
+          ("Failed to start NNStreamer-edge. Please check server IP and port.");
+      goto done;
+    }
   }
 
+  prepared = TRUE;
+
 done:
   g_mutex_unlock (&data->lock);
-  return data->edge_h;
+  g_free (id_str);
+  return prepared;
 }
 
 /**
- * @brief Release nnstreamer edge handle of query server.
+ * @brief Send buffer to connected edge device.
  */
-void
-gst_tensor_query_server_release_edge_handle (edge_server_handle server_h)
+gboolean
+gst_tensor_query_server_send_buffer (const guint id, GstBuffer * buffer)
 {
-  GstTensorQueryServer *data = (GstTensorQueryServer *) server_h;
+  GstTensorQueryServer *data;
+  GstMetaQuery *meta_query;
+  nns_edge_data_h data_h;
+  guint i, num_tensors = 0;
+  gint ret = NNS_EDGE_ERROR_NONE;
+  GstMemory *mem[NNS_TENSOR_SIZE_LIMIT];
+  GstMapInfo map[NNS_TENSOR_SIZE_LIMIT];
+  gchar *val;
+  gboolean sent = FALSE;
+
+  data = gst_tensor_query_server_get_handle (id);
+
+  if (NULL == data) {
+    nns_loge ("Failed to send buffer, server handle is null.");
+    return FALSE;
+  }
+
+  meta_query = gst_buffer_get_meta_query (buffer);
+  if (!meta_query) {
+    nns_loge ("Failed to send buffer, cannot get tensor query meta.");
+    return FALSE;
+  }
+
+  ret = nns_edge_data_create (&data_h);
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    nns_loge ("Failed to create edge data handle in query server.");
+    return FALSE;
+  }
+
+  num_tensors = gst_tensor_buffer_get_count (buffer);
+  for (i = 0; i < num_tensors; i++) {
+    mem[i] = gst_tensor_buffer_get_nth_memory (buffer, i);
+
+    if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) {
+      ml_loge ("Cannot map the %uth memory in gst-buffer.", i);
+      gst_memory_unref (mem[i]);
+      num_tensors = i;
+      goto done;
+    }
+
+    nns_edge_data_add (data_h, map[i].data, map[i].size, NULL);
+  }
+
+  val = g_strdup_printf ("%lld", (long long) meta_query->client_id);
+  nns_edge_data_set_info (data_h, "client_id", val);
+  g_free (val);
 
   g_mutex_lock (&data->lock);
-  nns_edge_release_handle (data->edge_h);
-  data->edge_h = NULL;
+  ret = nns_edge_send (data->edge_h, data_h);
   g_mutex_unlock (&data->lock);
+
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    nns_loge ("Failed to send edge data handle in query server.");
+    goto done;
+  }
+
+  sent = TRUE;
+
+done:
+  for (i = 0; i < num_tensors; i++) {
+    gst_memory_unmap (mem[i], &map[i]);
+    gst_memory_unref (mem[i]);
+  }
+
+  nns_edge_data_destroy (data_h);
+
+  return sent;
 }
 
 /**
- * @brief Remove GstTensorQueryServer.
+ * @brief Release nnstreamer edge handle of query server.
  */
 void
-gst_tensor_query_server_remove_data (edge_server_handle server_h)
+gst_tensor_query_server_release_edge_handle (const guint id)
 {
-  GstTensorQueryServer *data = (GstTensorQueryServer *) server_h;
+  GstTensorQueryServer *data;
+
+  data = gst_tensor_query_server_get_handle (id);
 
   if (NULL == data) {
     return;
   }
 
+  g_mutex_lock (&data->lock);
+  if (data->edge_h) {
+    nns_edge_release_handle (data->edge_h);
+    data->edge_h = NULL;
+  }
+  g_mutex_unlock (&data->lock);
+}
+
+/**
+ * @brief Remove GstTensorQueryServer.
+ */
+void
+gst_tensor_query_server_remove_data (const guint id)
+{
   G_LOCK (query_server_table);
-  if (g_hash_table_lookup (_qs_table, data->id))
-    g_hash_table_remove (_qs_table, data->id);
+  if (g_hash_table_lookup (_qs_table, GUINT_TO_POINTER (id)))
+    g_hash_table_remove (_qs_table, GUINT_TO_POINTER (id));
   G_UNLOCK (query_server_table);
 }
 
@@ -166,10 +283,12 @@ gst_tensor_query_server_remove_data (edge_server_handle server_h)
  * @brief Wait until the sink is configured and get server info handle.
  */
 gboolean
-gst_tensor_query_server_wait_sink (edge_server_handle server_h)
+gst_tensor_query_server_wait_sink (const guint id)
 {
   gint64 end_time;
-  GstTensorQueryServer *data = (GstTensorQueryServer *) server_h;
+  GstTensorQueryServer *data;
+
+  data = gst_tensor_query_server_get_handle (id);
 
   if (NULL == data) {
     return FALSE;
@@ -194,12 +313,16 @@ gst_tensor_query_server_wait_sink (edge_server_handle server_h)
  * @brief set query server sink configured.
  */
 void
-gst_tensor_query_server_set_configured (edge_server_handle server_h)
+gst_tensor_query_server_set_configured (const guint id)
 {
-  GstTensorQueryServer *data = (GstTensorQueryServer *) server_h;
+  GstTensorQueryServer *data;
+
+  data = gst_tensor_query_server_get_handle (id);
+
   if (NULL == data) {
     return;
   }
+
   g_mutex_lock (&data->lock);
   data->configured = TRUE;
   g_cond_broadcast (&data->cond);
@@ -210,17 +333,20 @@ gst_tensor_query_server_set_configured (edge_server_handle server_h)
  * @brief set query server caps.
  */
 void
-gst_tensor_query_server_set_caps (edge_server_handle server_h,
-    const char *caps_str)
+gst_tensor_query_server_set_caps (const guint id, const gchar * caps_str)
 {
-  GstTensorQueryServer *data = (GstTensorQueryServer *) server_h;
-  gchar *prev_caps_str = NULL, *new_caps_str;
+  GstTensorQueryServer *data;
+  gchar *prev_caps_str, *new_caps_str;
+
+  data = gst_tensor_query_server_get_handle (id);
 
   if (NULL == data) {
     return;
   }
+
   g_mutex_lock (&data->lock);
 
+  prev_caps_str = new_caps_str = NULL;
   nns_edge_get_info (data->edge_h, "CAPS", &prev_caps_str);
   if (!prev_caps_str)
     prev_caps_str = g_strdup ("");
@@ -241,7 +367,7 @@ init_queryserver (void)
 {
   G_LOCK (query_server_table);
   g_assert (NULL == _qs_table); /** Internal error (duplicated init call?) */
-  _qs_table = g_hash_table_new_full (g_str_hash, g_str_equal, g_free,
+  _qs_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
       _release_server_data);
   G_UNLOCK (query_server_table);
 }
index c07558b..f392cea 100644 (file)
 G_BEGIN_DECLS
 #define DEFAULT_SERVER_ID 0
 #define DEFAULT_QUERY_INFO_TIMEOUT 5
-typedef void *edge_server_handle;
+
+/**
+ * @brief Internal data structure for nns-edge info to prepare edge connection.
+ */
+typedef struct
+{
+  gchar *host;
+  guint16 port;
+  gchar *dest_host;
+  guint16 dest_port;
+  gchar *topic;
+
+  /* nns-edge callback info */
+  nns_edge_event_cb cb;
+  void *pdata;
+} GstTensorQueryEdgeInfo;
 
 /**
  * @brief GstTensorQueryServer internal info data structure.
  */
 typedef struct
 {
-  char *id;
+  guint id;
   gboolean configured;
   GMutex lock;
   GCond cond;
@@ -37,46 +52,45 @@ typedef struct
 } GstTensorQueryServer;
 
 /**
- * @brief Get nnstreamer edge server handle.
+ * @brief Add GstTensorQueryServer.
  */
-edge_server_handle gst_tensor_query_server_get_handle (const char *id);
+gboolean gst_tensor_query_server_add_data (const guint id);
 
 /**
- * @brief Add GstTensorQueryServer.
+ * @brief Prepare edge connection and its handle.
  */
-edge_server_handle gst_tensor_query_server_add_data (const char *id);
+gboolean gst_tensor_query_server_prepare (const guint id,
+    nns_edge_connect_type_e connect_type, GstTensorQueryEdgeInfo *edge_info);
 
 /**
  * @brief Remove GstTensorQueryServer.
  */
-void gst_tensor_query_server_remove_data (edge_server_handle server_h);
+void gst_tensor_query_server_remove_data (const guint id);
 
 /**
  * @brief Wait until the sink is configured and get server info handle.
  */
-gboolean gst_tensor_query_server_wait_sink (edge_server_handle server_h);
+gboolean gst_tensor_query_server_wait_sink (const guint id);
 
 /**
- * @brief Get nnstreamer edge handle of query server.
+ * @brief Send buffer to connected edge device.
  */
-nns_edge_h gst_tensor_query_server_get_edge_handle (const char *id, nns_edge_connect_type_e connect_type);
+gboolean gst_tensor_query_server_send_buffer (const guint id, GstBuffer *buffer);
 
 /**
  * @brief set query server sink configured.
  */
-void gst_tensor_query_server_set_configured (edge_server_handle server_h);
+void gst_tensor_query_server_set_configured (const guint id);
 
 /**
  * @brief set query server caps.
  */
-void
-gst_tensor_query_server_set_caps (edge_server_handle server_h,
-    const char *caps_str);
+void gst_tensor_query_server_set_caps (const guint id, const gchar *caps_str);
 
 /**
  * @brief Release nnstreamer edge handle of query server.
  */
-void gst_tensor_query_server_release_edge_handle (edge_server_handle server_h);
+void gst_tensor_query_server_release_edge_handle (const guint id);
 
 G_END_DECLS
 #endif /* __GST_TENSOR_QUERY_CLIENT_H__ */
index 847fd8d..d06e57a 100644 (file)
@@ -130,37 +130,37 @@ static void
 gst_tensor_query_serversink_finalize (GObject * object)
 {
   GstTensorQueryServerSink *sink = GST_TENSOR_QUERY_SERVERSINK (object);
-  gst_tensor_query_server_remove_data (sink->server_h);
+  gst_tensor_query_server_remove_data (sink->sink_id);
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
 /**
  * @brief start processing of query_serversink
  */
-static void
+static gboolean
 _gst_tensor_query_serversink_start (GstTensorQueryServerSink * sink)
 {
-  gchar *id_str = NULL;
+  gboolean ret;
 
-  id_str = g_strdup_printf ("%u", sink->sink_id);
-  sink->server_h = gst_tensor_query_server_add_data (id_str);
-  g_free (id_str);
+  ret = gst_tensor_query_server_add_data (sink->sink_id);
+  if (ret)
+    gst_tensor_query_server_set_configured (sink->sink_id);
 
-  gst_tensor_query_server_set_configured (sink->server_h);
+  return ret;
 }
 
 /**
  * @brief start processing of query_serversink
  */
-static void
+static gboolean
 _gst_tensor_query_serversink_playing (GstTensorQueryServerSink * sink)
 {
-  gchar *id_str = NULL;
+  gboolean ret;
+
+  ret = gst_tensor_query_server_prepare (sink->sink_id, sink->connect_type,
+      NULL);
 
-  id_str = g_strdup_printf ("%u", sink->sink_id);
-  sink->edge_h =
-      gst_tensor_query_server_get_edge_handle (id_str, sink->connect_type);
-  g_free (id_str);
+  return ret;
 }
 
 /**
@@ -175,18 +175,17 @@ gst_tensor_query_serversink_change_state (GstElement * element,
 
   switch (transition) {
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
-      _gst_tensor_query_serversink_playing (sink);
-      if (!sink->edge_h) {
+      if (!_gst_tensor_query_serversink_playing (sink)) {
         nns_loge ("Failed to change state from PAUSED to PLAYING.");
         return GST_STATE_CHANGE_FAILURE;
       }
       break;
     case GST_STATE_CHANGE_READY_TO_PAUSED:
-      _gst_tensor_query_serversink_start (sink);
-      if (!sink->server_h) {
+      if (!_gst_tensor_query_serversink_start (sink)) {
         nns_loge ("Failed to change state from READY to PAUSED.");
         return GST_STATE_CHANGE_FAILURE;
       }
+      break;
     default:
       break;
   }
@@ -199,8 +198,7 @@ gst_tensor_query_serversink_change_state (GstElement * element,
 
   switch (transition) {
     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
-      gst_tensor_query_server_release_edge_handle (sink->server_h);
-      sink->edge_h = NULL;
+      gst_tensor_query_server_release_edge_handle (sink->sink_id);
       break;
     default:
       break;
@@ -277,7 +275,7 @@ gst_tensor_query_serversink_set_caps (GstBaseSink * bsink, GstCaps * caps)
   caps_str = gst_caps_to_string (caps);
 
   new_caps_str = g_strdup_printf ("@query_server_sink_caps@%s", caps_str);
-  gst_tensor_query_server_set_caps (sink->server_h, new_caps_str);
+  gst_tensor_query_server_set_caps (sink->sink_id, new_caps_str);
 
   g_free (new_caps_str);
   g_free (caps_str);
@@ -293,42 +291,15 @@ gst_tensor_query_serversink_render (GstBaseSink * bsink, GstBuffer * buf)
 {
   GstTensorQueryServerSink *sink = GST_TENSOR_QUERY_SERVERSINK (bsink);
   GstMetaQuery *meta_query;
-  nns_edge_data_h data_h;
-  guint i, num_tensors = 0;
-  gint ret;
-  GstMemory *mem[NNS_TENSOR_SIZE_LIMIT];
-  GstMapInfo map[NNS_TENSOR_SIZE_LIMIT];
-  char *val;
 
   meta_query = gst_buffer_get_meta_query (buf);
   if (meta_query) {
     sink->metaless_frame_count = 0;
 
-    ret = nns_edge_data_create (&data_h);
-    if (ret != NNS_EDGE_ERROR_NONE) {
-      nns_loge ("Failed to create data handle in server sink.");
+    if (!gst_tensor_query_server_send_buffer (sink->sink_id, buf)) {
+      nns_loge ("Failed to send buffer to edge device in server sink.");
       return GST_FLOW_ERROR;
     }
-
-    num_tensors = gst_tensor_buffer_get_count (buf);
-    for (i = 0; i < num_tensors; i++) {
-      mem[i] = gst_tensor_buffer_get_nth_memory (buf, i);
-      if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) {
-        ml_loge ("Cannot map the %uth memory in gst-buffer.", i);
-        gst_memory_unref (mem[i]);
-        num_tensors = i;
-        nns_edge_data_destroy (data_h);
-        goto done;
-      }
-      nns_edge_data_add (data_h, map[i].data, map[i].size, NULL);
-    }
-
-    val = g_strdup_printf ("%lld", (long long) meta_query->client_id);
-    nns_edge_data_set_info (data_h, "client_id", val);
-    g_free (val);
-
-    nns_edge_send (sink->edge_h, data_h);
-    nns_edge_data_destroy (data_h);
   } else {
     nns_logw ("Cannot get tensor query meta. Drop buffers!\n");
     sink->metaless_frame_count++;
@@ -341,11 +312,6 @@ gst_tensor_query_serversink_render (GstBaseSink * bsink, GstBuffer * buf)
       return GST_FLOW_ERROR;
     }
   }
-done:
-  for (i = 0; i < num_tensors; i++) {
-    gst_memory_unmap (mem[i], &map[i]);
-    gst_memory_unref (mem[i]);
-  }
 
   return GST_FLOW_OK;
 }
index e9dae73..38b434f 100644 (file)
@@ -47,8 +47,6 @@ struct _GstTensorQueryServerSink
   gint metaless_frame_count;
 
   nns_edge_connect_type_e connect_type;
-  edge_server_handle server_h;
-  nns_edge_h edge_h;
 };
 
 /**
index 22dd2fd..a9766d0 100644 (file)
@@ -88,6 +88,7 @@ gst_tensor_query_serversrc_class_init (GstTensorQueryServerSrcClass * klass)
   gobject_class->get_property = gst_tensor_query_serversrc_get_property;
   gobject_class->finalize = gst_tensor_query_serversrc_finalize;
   gstelement_class->change_state = gst_tensor_query_serversrc_change_state;
+  gstpushsrc_class->create = gst_tensor_query_serversrc_create;
 
   g_object_class_install_property (gobject_class, PROP_HOST,
       g_param_spec_string ("host", "Host", "The hostname to listen as",
@@ -136,8 +137,6 @@ gst_tensor_query_serversrc_class_init (GstTensorQueryServerSrcClass * klass)
       "Receive tensor data as a server over the network",
       "Samsung Electronics Co., Ltd.");
 
-  gstpushsrc_class->create = gst_tensor_query_serversrc_create;
-
   GST_DEBUG_CATEGORY_INIT (gst_tensor_query_serversrc_debug,
       "tensor_query_serversrc", 0, "Tensor Query Server Source");
 }
@@ -233,18 +232,15 @@ _nns_edge_event_cb (nns_edge_event_h event_h, void *user_data)
 static gboolean
 _gst_tensor_query_serversrc_start (GstTensorQueryServerSrc * src)
 {
-  gchar *id_str = NULL;
+  gboolean ret = FALSE;
 
-  id_str = g_strdup_printf ("%u", src->src_id);
-  src->server_h = gst_tensor_query_server_add_data (id_str);
-  g_free (id_str);
+  if (gst_tensor_query_server_add_data (src->src_id))
+    ret = gst_tensor_query_server_wait_sink (src->src_id);
 
-  if (!gst_tensor_query_server_wait_sink (src->server_h)) {
+  if (!ret)
     nns_loge ("Failed to get server information from query server.");
-    return FALSE;
-  }
 
-  return TRUE;
+  return ret;
 }
 
 /**
@@ -254,44 +250,20 @@ static gboolean
 _gst_tensor_query_serversrc_playing (GstTensorQueryServerSrc * src,
     nns_edge_connect_type_e connect_type)
 {
-  gchar *id_str = NULL, *port = NULL, *dest_port = NULL;
+  GstTensorQueryEdgeInfo edge_info = { 0 };
+  gboolean ret;
 
-  id_str = g_strdup_printf ("%u", src->src_id);
-  src->edge_h = gst_tensor_query_server_get_edge_handle (id_str, connect_type);
-  g_free (id_str);
+  edge_info.host = src->host;
+  edge_info.port = src->port;
+  edge_info.dest_host = src->dest_host;
+  edge_info.dest_port = src->dest_port;
+  edge_info.topic = src->topic;
+  edge_info.cb = _nns_edge_event_cb;
+  edge_info.pdata = src;
 
-  if (!src->edge_h) {
-    return FALSE;
-  }
-
-  if (src->host)
-    nns_edge_set_info (src->edge_h, "HOST", src->host);
-  if (src->port > 0) {
-    port = g_strdup_printf ("%u", src->port);
-    nns_edge_set_info (src->edge_h, "PORT", port);
-    g_free (port);
-  }
-  if (src->dest_host)
-    nns_edge_set_info (src->edge_h, "DEST_HOST", src->dest_host);
-  if (src->dest_port > 0) {
-    dest_port = g_strdup_printf ("%u", src->dest_port);
-    nns_edge_set_info (src->edge_h, "DEST_PORT", dest_port);
-    g_free (dest_port);
-  }
-  if (src->topic)
-    nns_edge_set_info (src->edge_h, "TOPIC", src->topic);
+  ret = gst_tensor_query_server_prepare (src->src_id, connect_type, &edge_info);
 
-  nns_edge_set_event_callback (src->edge_h, _nns_edge_event_cb, src);
-
-  if (NNS_EDGE_ERROR_NONE != nns_edge_start (src->edge_h)) {
-    nns_loge
-        ("Failed to start NNStreamer-edge. Please check server IP and port.");
-    return FALSE;
-  }
-
-  src->playing = TRUE;
-
-  return TRUE;
+  return ret;
 }
 
 /**
@@ -331,11 +303,10 @@ gst_tensor_query_serversrc_change_state (GstElement * element,
   switch (transition) {
     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
       src->playing = FALSE;
-      gst_tensor_query_server_release_edge_handle (src->server_h);
-      src->edge_h = NULL;
+      gst_tensor_query_server_release_edge_handle (src->src_id);
       break;
     case GST_STATE_CHANGE_PAUSED_TO_READY:
-      gst_tensor_query_server_remove_data (src->server_h);
+      gst_tensor_query_server_remove_data (src->src_id);
       break;
     default:
       break;
@@ -459,12 +430,13 @@ _gst_tensor_query_serversrc_get_buffer (GstTensorQueryServerSrc * src)
   GstMetaQuery *meta_query;
   int ret;
 
-  while (src->playing && !data_h)
+  while (src->playing && !data_h) {
     data_h = g_async_queue_timeout_pop (src->msg_queue,
         DEFAULT_DATA_POP_TIMEOUT);
+  }
 
   if (!data_h) {
-    nns_loge ("Failed to get message from the server message queue");
+    nns_loge ("Failed to get message from the server message queue.");
     return NULL;
   }
 
@@ -515,6 +487,8 @@ gst_tensor_query_serversrc_create (GstPushSrc * psrc, GstBuffer ** outbuf)
 {
   GstTensorQueryServerSrc *src = GST_TENSOR_QUERY_SERVERSRC (psrc);
   GstBaseSrc *bsrc = GST_BASE_SRC (psrc);
+  GstStateChangeReturn sret;
+  GstState state = GST_STATE_NULL;
 
   if (!src->configured) {
     gchar *caps_str, *new_caps_str;
@@ -527,7 +501,7 @@ gst_tensor_query_serversrc_create (GstPushSrc * psrc, GstBuffer ** outbuf)
     caps_str = gst_caps_to_string (caps);
 
     new_caps_str = g_strdup_printf ("@query_server_src_caps@%s", caps_str);
-    gst_tensor_query_server_set_caps (src->server_h, new_caps_str);
+    gst_tensor_query_server_set_caps (src->src_id, new_caps_str);
     g_free (new_caps_str);
     g_free (caps_str);
 
@@ -537,6 +511,12 @@ gst_tensor_query_serversrc_create (GstPushSrc * psrc, GstBuffer ** outbuf)
 
   *outbuf = _gst_tensor_query_serversrc_get_buffer (src);
   if (*outbuf == NULL) {
+    sret = gst_element_get_state (GST_ELEMENT (psrc), &state, NULL, 0);
+    if (sret != GST_STATE_CHANGE_SUCCESS || state != GST_STATE_PLAYING) {
+      nns_logw ("Failed to get buffer for query server, not in PLAYING state.");
+      return GST_FLOW_FLUSHING;
+    }
+
     nns_loge ("Failed to get buffer to push to the tensor query serversrc.");
     return GST_FLOW_ERROR;
   }
index 2d93d62..769515f 100644 (file)
@@ -52,8 +52,6 @@ struct _GstTensorQueryServerSrc
   gchar *topic; /**< Main operation such as 'object_detection' or 'image_segmentation' */
 
   nns_edge_connect_type_e connect_type;
-  edge_server_handle server_h;
-  nns_edge_h edge_h;
   GAsyncQueue *msg_queue;
   gboolean playing;
 };