[Edge] Tensor query TCP connection
authorgichan <gichan2.jang@samsung.com>
Wed, 8 Jun 2022 00:55:44 +0000 (09:55 +0900)
committerjaeyun-jung <39614140+jaeyun-jung@users.noreply.github.com>
Wed, 20 Jul 2022 06:36:16 +0000 (15:36 +0900)
Implement tensor query TCP connection using nnstreamer-edge lib.

Signed-off-by: gichan <gichan2.jang@samsung.com>
22 files changed:
debian/control
debian/control.debian
debian/control.ubuntu.ppa
gst/nnstreamer/meson.build
gst/nnstreamer/registerer/nnstreamer.c
gst/nnstreamer/tensor_query/meson.build
gst/nnstreamer/tensor_query/nnstreamer-edge.h
gst/nnstreamer/tensor_query/nnstreamer_edge_internal.c
gst/nnstreamer/tensor_query/tensor_query_client.c
gst/nnstreamer/tensor_query/tensor_query_client.h
gst/nnstreamer/tensor_query/tensor_query_common.c
gst/nnstreamer/tensor_query/tensor_query_common.h
gst/nnstreamer/tensor_query/tensor_query_server.c
gst/nnstreamer/tensor_query/tensor_query_server.h
gst/nnstreamer/tensor_query/tensor_query_serversink.c
gst/nnstreamer/tensor_query/tensor_query_serversink.h
gst/nnstreamer/tensor_query/tensor_query_serversrc.c
gst/nnstreamer/tensor_query/tensor_query_serversrc.h
jni/nnstreamer.mk
meson.build
meson_options.txt
packaging/nnstreamer.spec

index eb7b581..4c12cb1 100644 (file)
@@ -3,7 +3,7 @@ Section: libs
 Priority: optional
 Maintainer: MyungJoo Ham <myungjoo.ham@samsung.com>
 Build-Depends: gcc-9 | gcc-8 | gcc-7 | gcc-6 | gcc-5 (>=5.4),
- ninja-build, meson (>=0.50), debhelper (>=9),
+ ninja-build, meson (>=0.50), debhelper (>=9), nnstreamer-edge-dev,
  libgstreamer1.0-dev, libgstreamer-plugins-base1.0-dev, libglib2.0-dev,
  gstreamer1.0-tools, gstreamer1.0-plugins-base, gstreamer1.0-plugins-good,
  libgtest-dev, ssat, libpng-dev, libopencv-dev, liborc-0.4-dev, flex, bison,
index e984241..e91a20d 100644 (file)
@@ -3,7 +3,7 @@ Section: libs
 Priority: optional
 Maintainer: MyungJoo Ham <myungjoo.ham@samsung.com>
 Build-Depends: gcc-9 | gcc-8 | gcc-7 | gcc-6 | gcc-5 (>=5.4),
- ninja-build, meson (>=0.49), debhelper (>=9),
+ ninja-build, meson (>=0.49), debhelper (>=9), nnstreamer-edge-dev,
  libgstreamer1.0-dev, libgstreamer-plugins-base1.0-dev, libglib2.0-dev,
  gstreamer1.0-tools, gstreamer1.0-plugins-base, gstreamer1.0-plugins-good,
  libgtest-dev, libpng-dev, libopencv-dev, liborc-0.4-dev, flex, bison,
index 9b6741e..b2c631d 100644 (file)
@@ -3,7 +3,7 @@ Section: libs
 Priority: optional
 Maintainer: MyungJoo Ham <myungjoo.ham@samsung.com>
 Build-Depends: gcc-9 | gcc-8 | gcc-7 | gcc-6 | gcc-5 (>=5.4),
- ninja-build, meson (>=0.49), debhelper (>=9),
+ ninja-build, meson (>=0.49), debhelper (>=9), nnstreamer-edge-dev,
  libgstreamer1.0-dev, libgstreamer-plugins-base1.0-dev, libglib2.0-dev,
  gstreamer1.0-tools, gstreamer1.0-plugins-base, gstreamer1.0-plugins-good,
  libgtest-dev, ssat, libpng-dev, libopencv-dev, liborc-0.4-dev,
index e5e30c2..b7a1e41 100644 (file)
@@ -112,7 +112,7 @@ nnstreamer_deps += nnstreamer_single_dep
 # Build libraries ("both_libraries" are supported from 0.46.)
 nnstreamer_shared = shared_library('nnstreamer',
   nnstreamer_sources,
-  dependencies: nnstreamer_deps,
+  dependencies: [nnstreamer_deps, nns_edge_dep],
   include_directories: nnstreamer_inc,
   install: true,
   install_dir: plugins_install_dir
@@ -132,7 +132,7 @@ meson.add_install_script(
 
 nnstreamer_static = static_library('nnstreamer',
   nnstreamer_sources,
-  dependencies: nnstreamer_deps,
+  dependencies: [nnstreamer_deps, nns_edge_dep],
   include_directories: nnstreamer_inc,
   install: true,
   install_dir: nnstreamer_libdir
index 1fd9895..6f78592 100644 (file)
 #endif /* __gnu_linux__ && !__ANDROID__ */
 
 #include <tensor_filter/tensor_filter.h>
+#if defined(ENABLE_NNSTREAMER_EDGE)
 #include <tensor_query/tensor_query_serversrc.h>
 #include <tensor_query/tensor_query_serversink.h>
 #include <tensor_query/tensor_query_client.h>
+#endif
 
 #define NNSTREAMER_INIT(plugin,name,type) \
   do { \
@@ -104,9 +106,11 @@ gst_nnstreamer_init (GstPlugin * plugin)
   NNSTREAMER_INIT (plugin, transform, TRANSFORM);
   NNSTREAMER_INIT (plugin, if, IF);
   NNSTREAMER_INIT (plugin, rate, RATE);
+#if defined(ENABLE_NNSTREAMER_EDGE)
   NNSTREAMER_INIT (plugin, query_serversrc, QUERY_SERVERSRC);
   NNSTREAMER_INIT (plugin, query_serversink, QUERY_SERVERSINK);
   NNSTREAMER_INIT (plugin, query_client, QUERY_CLIENT);
+#endif
 #if defined(__gnu_linux__) && !defined(__ANDROID__)
   /* IIO requires Linux / non-Android */
 #if (GST_VERSION_MAJOR == 1) && (GST_VERSION_MINOR >= 8)
index 1830053..755fcbd 100644 (file)
@@ -26,6 +26,8 @@ nnstreamer_edge_deps = [
   glib_dep, gio_dep, thread_dep
 ]
 
+nnstreamer_headers += join_paths(meson.current_source_dir(), 'nnstreamer-edge.h')
+
 if aitt_support_is_available
   nnstreamer_edge_sources += join_paths(meson.current_source_dir(), 'nnstreamer_edge_aitt.c')
   nnstreamer_edge_deps += aitt_support_deps
index b7b3955..0382909 100644 (file)
@@ -64,6 +64,7 @@ typedef enum {
 
 typedef enum {
   NNS_EDGE_PROTOCOL_TCP = 0,
+  NNS_EDGE_PROTOCOL_UDP,
   NNS_EDGE_PROTOCOL_MQTT,
   NNS_EDGE_PROTOCOL_AITT,
   NNS_EDGE_PROTOCOL_AITT_TCP,
index 7c7ac5a..43b218b 100644 (file)
@@ -735,6 +735,7 @@ nns_edge_release_handle (nns_edge_h edge_h)
   g_free (eh->topic);
   g_free (eh->ip);
   g_free (eh->recv_ip);
+  g_free (eh->caps_str);
   g_hash_table_destroy (eh->conn_table);
 
   nns_edge_unlock (eh);
index 3317a58..3397b8e 100644 (file)
 #include <gio/gio.h>
 #include <glib.h>
 #include <string.h>
+#include "nnstreamer-edge.h"
+#include "tensor_query_common.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <arpa/inet.h>
+#include <ifaddrs.h>
 
 /**
  * @brief Macro for debug mode.
 enum
 {
   PROP_0,
-  PROP_SINK_HOST,
-  PROP_SINK_PORT,
-  PROP_SRC_HOST,
-  PROP_SRC_PORT,
+  PROP_HOST,
+  PROP_PORT,
   PROP_PROTOCOL,
   PROP_OPERATION,
-  PROP_BROKER_HOST,
-  PROP_BROKER_PORT,
   PROP_SILENT,
 };
 
 #define TCP_HIGHEST_PORT        65535
 #define TCP_DEFAULT_HOST        "localhost"
-#define TCP_DEFAULT_SINK_PORT        3000
 #define TCP_DEFAULT_SRC_PORT        3001
 #define DEFAULT_SILENT TRUE
 
@@ -104,22 +107,13 @@ gst_tensor_query_client_class_init (GstTensorQueryClientClass * klass)
   gobject_class->finalize = gst_tensor_query_client_finalize;
 
   /** install property goes here */
-  g_object_class_install_property (gobject_class, PROP_SINK_HOST,
-      g_param_spec_string ("sink-host", "Sink Host",
-          "A tenor query sink host to send the packets to/from",
-          TCP_DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_SINK_PORT,
-      g_param_spec_uint ("sink-port", "Sink Port",
-          "The port of tensor query sink to send the packets to/from", 0,
-          TCP_HIGHEST_PORT, TCP_DEFAULT_SINK_PORT,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_SRC_HOST,
-      g_param_spec_string ("src-host", "Source Host",
-          "A tenor query src host to send the packets to/from",
+  g_object_class_install_property (gobject_class, PROP_HOST,
+      g_param_spec_string ("host", "Host",
+          "A tenor query server host to send the packets to/from",
           TCP_DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_SRC_PORT,
-      g_param_spec_uint ("src-port", "Source Port",
-          "The port of tensor query src to send the packets to/from", 0,
+  g_object_class_install_property (gobject_class, PROP_PORT,
+      g_param_spec_uint ("port", "Port",
+          "The port of tensor query server to send the packets to/from", 0,
           TCP_HIGHEST_PORT, TCP_DEFAULT_SRC_PORT,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_SILENT,
@@ -134,14 +128,6 @@ gst_tensor_query_client_class_init (GstTensorQueryClientClass * klass)
       g_param_spec_string ("operation", "Operation",
           "The main operation of the host.",
           "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_BROKER_HOST,
-      g_param_spec_string ("broker-host", "Broker Host",
-          "Broker host address to connect.", DEFAULT_BROKER_HOST,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_BROKER_PORT,
-      g_param_spec_uint ("broker-port", "Broker Port",
-          "Broker port to connect.", 0, 65535,
-          DEFAULT_BROKER_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   gst_element_class_add_pad_template (gstelement_class,
       gst_static_pad_template_get (&sinktemplate));
@@ -158,6 +144,40 @@ gst_tensor_query_client_class_init (GstTensorQueryClientClass * klass)
 }
 
 /**
+ * @brief Get IP address
+ */
+static gchar *
+_get_ip_address (void)
+{
+  struct ifaddrs *addrs, *run;
+  gchar *ret = NULL;
+
+  if (0 != getifaddrs (&addrs))
+    goto done;
+  run = addrs;
+
+  while (run) {
+    if (run->ifa_addr && run->ifa_addr->sa_family == AF_INET) {
+      struct sockaddr_in *pAddr = (struct sockaddr_in *) run->ifa_addr;
+
+      if (NULL != strstr (run->ifa_name, "en") ||
+          NULL != strstr (run->ifa_name, "et")) {
+        g_free (ret);
+        ret = g_strdup (inet_ntoa (pAddr->sin_addr));
+      }
+    }
+    run = run->ifa_next;
+  }
+  freeifaddrs (addrs);
+
+done:
+  if (NULL == ret)
+    ret = g_strdup ("localhost");
+
+  return ret;
+}
+
+/**
  * @brief initialize the new element
  */
 static void
@@ -180,18 +200,16 @@ gst_tensor_query_client_init (GstTensorQueryClient * self)
   /* init properties */
   self->silent = DEFAULT_SILENT;
   self->protocol = DEFAULT_PROTOCOL;
-  self->sink_conn = NULL;
-  self->sink_host = g_strdup (TCP_DEFAULT_HOST);
-  self->sink_port = TCP_DEFAULT_SINK_PORT;
-  self->src_conn = NULL;
-  self->src_host = g_strdup (TCP_DEFAULT_HOST);
-  self->src_port = TCP_DEFAULT_SRC_PORT;
+  self->host = g_strdup (TCP_DEFAULT_HOST);
+  self->port = TCP_DEFAULT_SRC_PORT;
+  self->srv_host = g_strdup (TCP_DEFAULT_HOST);
+  self->srv_port = TCP_DEFAULT_SRC_PORT;
   self->operation = NULL;
-  self->broker_host = g_strdup (DEFAULT_BROKER_HOST);
-  self->broker_port = DEFAULT_BROKER_PORT;
   self->in_caps_str = NULL;
+  self->msg_queue = g_async_queue_new ();
 
   tensor_query_hybrid_init (&self->hybrid_info, NULL, 0, FALSE);
+  nns_edge_create_handle ("TEMP_ID", "TEMP_CLIENT_TOPIC", &self->edge_h);
 }
 
 /**
@@ -201,25 +219,26 @@ static void
 gst_tensor_query_client_finalize (GObject * object)
 {
   GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (object);
+  nns_edge_data_h data_h;
 
-  g_free (self->sink_host);
-  self->sink_host = NULL;
-  g_free (self->src_host);
-  self->src_host = NULL;
+  g_free (self->host);
+  self->host = NULL;
+  g_free (self->srv_host);
+  self->srv_host = NULL;
   g_free (self->operation);
   self->operation = NULL;
-  g_free (self->broker_host);
-  self->broker_host = NULL;
   g_free (self->in_caps_str);
   self->in_caps_str = NULL;
-  tensor_query_hybrid_close (&self->hybrid_info);
-  if (self->sink_conn) {
-    nnstreamer_query_close (self->sink_conn);
-    self->sink_conn = NULL;
+
+  while ((data_h = g_async_queue_try_pop (self->msg_queue))) {
+    nns_edge_data_destroy (data_h);
   }
-  if (self->src_conn) {
-    nnstreamer_query_close (self->src_conn);
-    self->src_conn = NULL;
+  g_async_queue_unref (self->msg_queue);
+
+  tensor_query_hybrid_close (&self->hybrid_info);
+  if (self->edge_h) {
+    nns_edge_release_handle (self->edge_h);
+    self->edge_h = NULL;
   }
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
@@ -235,33 +254,22 @@ gst_tensor_query_client_set_property (GObject * object, guint prop_id,
   GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (object);
 
   switch (prop_id) {
-    case PROP_SINK_HOST:
+    case PROP_HOST:
       if (!g_value_get_string (value)) {
         nns_logw ("Sink host property cannot be NULL");
         break;
       }
-      g_free (self->sink_host);
-      self->sink_host = g_value_dup_string (value);
-      break;
-    case PROP_SINK_PORT:
-      self->sink_port = g_value_get_uint (value);
-      break;
-    case PROP_SRC_HOST:
-      if (!g_value_get_string (value)) {
-        nns_logw ("Source host property cannot be NULL");
-        break;
-      }
-      g_free (self->src_host);
-      self->src_host = g_value_dup_string (value);
+      g_free (self->host);
+      self->host = g_value_dup_string (value);
       break;
-    case PROP_SRC_PORT:
-      self->src_port = g_value_get_uint (value);
+    case PROP_PORT:
+      self->port = g_value_get_uint (value);
       break;
     case PROP_PROTOCOL:
     {
         /** @todo expand when other protocols are ready */
-      TensorQueryProtocol protocol = g_value_get_enum (value);
-      if (protocol == _TENSOR_QUERY_PROTOCOL_TCP)
+      nns_edge_protocol_e protocol = g_value_get_enum (value);
+      if (protocol == NNS_EDGE_PROTOCOL_TCP)
         self->protocol = protocol;
     }
       break;
@@ -274,17 +282,6 @@ gst_tensor_query_client_set_property (GObject * object, guint prop_id,
       g_free (self->operation);
       self->operation = g_value_dup_string (value);
       break;
-    case PROP_BROKER_HOST:
-      if (!g_value_get_string (value)) {
-        nns_logw ("Broker host property cannot be NULL");
-        break;
-      }
-      g_free (self->broker_host);
-      self->broker_host = g_value_dup_string (value);
-      break;
-    case PROP_BROKER_PORT:
-      self->broker_port = g_value_get_uint (value);
-      break;
     case PROP_SILENT:
       self->silent = g_value_get_boolean (value);
       break;
@@ -304,17 +301,11 @@ gst_tensor_query_client_get_property (GObject * object, guint prop_id,
   GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (object);
 
   switch (prop_id) {
-    case PROP_SINK_HOST:
-      g_value_set_string (value, self->sink_host);
-      break;
-    case PROP_SINK_PORT:
-      g_value_set_uint (value, self->sink_port);
+    case PROP_HOST:
+      g_value_set_string (value, self->host);
       break;
-    case PROP_SRC_HOST:
-      g_value_set_string (value, self->src_host);
-      break;
-    case PROP_SRC_PORT:
-      g_value_set_uint (value, self->src_port);
+    case PROP_PORT:
+      g_value_set_uint (value, self->port);
       break;
     case PROP_PROTOCOL:
       g_value_set_enum (value, self->protocol);
@@ -322,11 +313,6 @@ gst_tensor_query_client_get_property (GObject * object, guint prop_id,
     case PROP_OPERATION:
       g_value_set_string (value, self->operation);
       break;
-    case PROP_BROKER_HOST:
-      g_value_set_string (value, self->broker_host);
-      break;
-    case PROP_BROKER_PORT:
-      g_value_set_uint (value, self->broker_port);
       break;
     case PROP_SILENT:
       g_value_set_boolean (value, self->silent);
@@ -372,88 +358,14 @@ gst_tensor_query_client_update_caps (GstTensorQueryClient * self,
 }
 
 /**
- * @brief Connect to query server. (Direct connection)
- */
-static gboolean
-_connect_to_server (GstTensorQueryClient * self)
-{
-  TensorQueryCommandData cmd_buf;
-  query_client_id_t client_id;
-
-  nns_logd ("Server src info: %s:%u", self->src_host, self->src_port);
-  self->src_conn = nnstreamer_query_connect (self->protocol, self->src_host,
-      self->src_port, QUERY_DEFAULT_TIMEOUT_SEC);
-  if (!self->src_conn) {
-    nns_loge ("Failed to connect server source ");
-    return FALSE;
-  }
-
-  /** Receive client ID from server src */
-  if (0 != nnstreamer_query_receive (self->src_conn, &cmd_buf) ||
-      cmd_buf.cmd != _TENSOR_QUERY_CMD_CLIENT_ID) {
-    nns_loge ("Failed to receive client ID.");
-    return FALSE;
-  }
-
-  client_id = cmd_buf.client_id;
-  nnstreamer_query_set_client_id (self->src_conn, client_id);
-
-  cmd_buf.cmd = _TENSOR_QUERY_CMD_REQUEST_INFO;
-  cmd_buf.data.data = (uint8_t *) self->in_caps_str;
-  cmd_buf.data.size = (size_t) strlen (self->in_caps_str) + 1;
-
-  if (0 != nnstreamer_query_send (self->src_conn, &cmd_buf)) {
-    nns_loge ("Failed to send request info cmd buf");
-    return FALSE;
-  }
-
-  if (0 != nnstreamer_query_receive (self->src_conn, &cmd_buf)) {
-    nns_loge ("Failed to receive response from the query server.");
-    return FALSE;
-  }
-
-  if (cmd_buf.cmd == _TENSOR_QUERY_CMD_RESPOND_APPROVE) {
-    if (!gst_tensor_query_client_update_caps (self, (char *) cmd_buf.data.data)) {
-      nns_loge ("Failed to update client source caps.");
-      return FALSE;
-    }
-  } else {
-    /** @todo Retry for info */
-    nns_loge ("Failed to receive approve command.");
-    return FALSE;
-  }
-
-  nns_logd ("Server sink info: %s:%u", self->sink_host, self->sink_port);
-  self->sink_conn =
-      nnstreamer_query_connect (self->protocol, self->sink_host,
-      self->sink_port, QUERY_DEFAULT_TIMEOUT_SEC);
-  if (!self->sink_conn) {
-    nns_loge ("Failed to connect server sink ");
-    return FALSE;
-  }
-
-  nnstreamer_query_set_client_id (self->sink_conn, client_id);
-  cmd_buf.cmd = _TENSOR_QUERY_CMD_CLIENT_ID;
-  cmd_buf.client_id = client_id;
-  if (0 != nnstreamer_query_send (self->sink_conn, &cmd_buf)) {
-    nns_loge ("Failed to send client ID to server sink");
-    return FALSE;
-  }
-  return TRUE;
-}
-
-/**
  * @brief Copy server info.
  */
 static void
 _copy_srv_info (GstTensorQueryClient * self, query_server_info_s * server)
 {
-  g_free (self->src_host);
-  self->src_host = g_strdup (server->src.host);
-  self->src_port = server->src.port;
-  g_free (self->sink_host);
-  self->sink_host = g_strdup (server->sink.host);
-  self->sink_port = server->sink.port;
+  g_free (self->srv_host);
+  self->srv_host = g_strdup (server->src.host);
+  self->srv_port = server->src.port;
 }
 
 /**
@@ -469,17 +381,15 @@ _client_retry_connection (GstTensorQueryClient * self)
   nns_logd ("Retry to connect to available server.");
 
   while ((server = tensor_query_hybrid_get_server_info (&self->hybrid_info))) {
-    nnstreamer_query_close (self->sink_conn);
-    nnstreamer_query_close (self->src_conn);
-    self->sink_conn = NULL;
-    self->src_conn = NULL;
+    nns_edge_disconnect (self->edge_h);
 
     _copy_srv_info (self, server);
     tensor_query_hybrid_free_server_info (server);
 
-    if (_connect_to_server (self)) {
-      nns_logi ("Connected to new server. src: %s:%u, sink: %s:%u",
-          self->src_host, self->src_port, self->sink_host, self->sink_port);
+    if (nns_edge_connect (self->edge_h, NNS_EDGE_PROTOCOL_TCP, self->srv_host,
+            self->srv_port)) {
+      nns_logi ("Connected to new server: %s:%u.", self->srv_host,
+          self->srv_port);
       ret = TRUE;
       break;
     }
@@ -489,6 +399,118 @@ _client_retry_connection (GstTensorQueryClient * self)
 }
 
 /**
+ * @brief Parse caps from received event data.
+ */
+static gchar *
+_nns_edge_parse_caps (gchar * caps_str, gboolean is_src)
+{
+  gchar **strv = g_strsplit (caps_str, "@", -1);
+  gint num = g_strv_length (strv), i;
+  gchar *find_key = NULL;
+  gchar *ret_str = NULL;
+
+  find_key =
+      is_src ==
+      TRUE ? g_strdup ("query_server_src_caps") :
+      g_strdup ("query_server_sink_caps");
+
+  for (i = 1; i < num; i += 2) {
+    if (0 == g_strcmp0 (find_key, strv[i])) {
+      ret_str = g_strdup (strv[i + 1]);
+      break;
+    }
+  }
+
+  g_free (find_key);
+  g_strfreev (strv);
+
+  return ret_str;
+}
+
+/**
+ * @brief nnstreamer-edge event callback.
+ */
+static int
+_nns_edge_event_cb (nns_edge_event_h event_h, void *user_data)
+{
+  nns_edge_event_e event_type;
+  int ret = NNS_EDGE_ERROR_NONE;
+  gchar *caps_str = NULL;
+  GstTensorQueryClient *self = (GstTensorQueryClient *) user_data;
+
+  if (NNS_EDGE_ERROR_NONE != nns_edge_event_get_type (event_h, &event_type)) {
+    nns_loge ("Failed to get event type!");
+    return NNS_EDGE_ERROR_NOT_SUPPORTED;
+  }
+
+  switch (event_type) {
+    case NNS_EDGE_EVENT_CAPABILITY:
+    {
+      GstCaps *server_caps, *client_caps;
+      GstStructure *server_st, *client_st;
+      gboolean result = FALSE;
+      gchar *ret_str;
+
+      nns_edge_event_parse_capability (event_h, &caps_str);
+      ret_str = _nns_edge_parse_caps (caps_str, TRUE);
+      client_caps = gst_caps_from_string ((gchar *) self->in_caps_str);
+      server_caps = gst_caps_from_string (ret_str);
+      g_free (ret_str);
+
+      /** Server framerate may vary. Let's skip comparing the framerate. */
+      gst_caps_set_simple (server_caps, "framerate", GST_TYPE_FRACTION, 0, 1,
+          NULL);
+      gst_caps_set_simple (client_caps, "framerate", GST_TYPE_FRACTION, 0, 1,
+          NULL);
+
+      server_st = gst_caps_get_structure (server_caps, 0);
+      client_st = gst_caps_get_structure (client_caps, 0);
+
+      if (gst_structure_is_tensor_stream (server_st)) {
+        GstTensorsConfig server_config, client_config;
+
+        gst_tensors_config_from_structure (&server_config, server_st);
+        gst_tensors_config_from_structure (&client_config, client_st);
+
+        result = gst_tensors_config_is_equal (&server_config, &client_config);
+      }
+
+      if (result || gst_caps_can_intersect (client_caps, server_caps)) {
+        /** Update client src caps */
+        ret_str = _nns_edge_parse_caps (caps_str, FALSE);
+        if (!gst_tensor_query_client_update_caps (self, ret_str)) {
+          nns_loge ("Failed to update client source caps.");
+          ret = NNS_EDGE_ERROR_UNKNOWN;
+        }
+        g_free (ret_str);
+      } else {
+        /* respond deny with src caps string */
+        nns_loge ("Query caps is not acceptable!");
+        ret = NNS_EDGE_ERROR_UNKNOWN;
+      }
+
+      gst_caps_unref (server_caps);
+      gst_caps_unref (client_caps);
+
+      break;
+    }
+    case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
+    {
+      nns_edge_data_h data;
+
+      nns_edge_event_parse_new_data (event_h, &data);
+      g_async_queue_push (self->msg_queue, data);
+      break;
+    }
+    default:
+      break;
+  }
+
+  g_free (caps_str);
+  return ret;
+}
+
+/**
  * @brief This function handles sink event.
  */
 static gboolean
@@ -497,6 +519,7 @@ gst_tensor_query_client_sink_event (GstPad * pad,
 {
   GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (parent);
   gboolean ret = TRUE;
+  gchar *ip;
 
   GST_DEBUG_OBJECT (self, "Received %s event: %" GST_PTR_FORMAT,
       GST_EVENT_TYPE_NAME (event), event);
@@ -512,7 +535,7 @@ gst_tensor_query_client_sink_event (GstPad * pad,
         query_server_info_s *server;
 
         tensor_query_hybrid_set_broker (&self->hybrid_info,
-            self->broker_host, self->broker_port);
+            self->host, self->port);
 
         if (!tensor_query_hybrid_subscribe (&self->hybrid_info,
                 self->operation)) {
@@ -530,13 +553,31 @@ gst_tensor_query_client_sink_event (GstPad * pad,
         nns_logi ("Query-hybrid feature is disabled.");
         nns_logi
             ("Specify operation to subscribe to the available broker (e.g., operation=object_detection).");
+        g_free (self->srv_host);
+        self->srv_host = g_strdup (self->host);
+        self->srv_port = self->port;
       }
 
       g_free (self->in_caps_str);
       self->in_caps_str = gst_caps_to_string (caps);
+      nns_edge_set_info (self->edge_h, "CAPS", self->in_caps_str);
+      nns_edge_set_event_callback (self->edge_h, _nns_edge_event_cb, self);
+
+      ip = _get_ip_address ();
+      nns_edge_set_info (self->edge_h, "IP", ip);
+      nns_edge_set_info (self->edge_h, "PORT", "0");
+      g_free (ip);
+
+      if (0 != nns_edge_start (self->edge_h, false)) {
+        nns_loge
+            ("Failed to start NNStreamer-edge. Please check server IP and port");
+        return FALSE;
+      }
 
-      if (!_connect_to_server (self)) {
-        ret = _client_retry_connection (self);
+      if (0 != nns_edge_connect (self->edge_h, NNS_EDGE_PROTOCOL_TCP,
+              self->srv_host, self->srv_port)) {
+        nns_loge ("Failed to connect to edge server!");
+        return FALSE;
       }
 
       gst_event_unref (event);
@@ -610,24 +651,61 @@ gst_tensor_query_client_chain (GstPad * pad,
   GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (parent);
   GstBuffer *out_buf = NULL;
   GstFlowReturn res = GST_FLOW_OK;
-
+  nns_edge_data_h data_h;
+  guint i, num_mems, num_data;
+  int ret;
+  GstMemory *mem[NNS_TENSOR_SIZE_LIMIT];
+  GstMapInfo map[NNS_TENSOR_SIZE_LIMIT];
   UNUSED (pad);
 
-  if (!tensor_query_send_buffer (self->src_conn, GST_ELEMENT (self), buf)) {
-    nns_logw ("Failed to send buffer to server node, retry connection.");
+  ret = nns_edge_data_create (&data_h);
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    nns_loge ("Failed to create data handle in client chain.");
+    return GST_FLOW_ERROR;
+  }
+
+  num_mems = gst_buffer_n_memory (buf);
+  for (i = 0; i < num_mems; i++) {
+    mem[i] = gst_buffer_peek_memory (buf, i);
+    if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) {
+      ml_loge ("Cannot map the %uth memory in gst-buffer.", i);
+      num_mems = i;
+      goto done;
+    }
+    nns_edge_data_add (data_h, map[i].data, map[i].size, NULL);
+  }
+  if (0 != nns_edge_request (self->edge_h, data_h)) {
+    nns_logw ("Failed to request to server node, retry connection.");
     goto retry;
   }
 
-  out_buf = tensor_query_receive_buffer (self->sink_conn);
-  if (out_buf) {
+  nns_edge_data_destroy (data_h);
+
+  data_h = g_async_queue_try_pop (self->msg_queue);
+  if (data_h) {
+    out_buf = gst_buffer_new ();
+    if (NNS_EDGE_ERROR_NONE != nns_edge_data_get_count (data_h, &num_data)) {
+      nns_loge ("Failed to get the number of memories of the edge data.");
+      res = GST_FLOW_ERROR;
+      goto done;
+    }
+    for (i = 0; i < num_data; i++) {
+      void *data = NULL;
+      size_t data_len;
+      gpointer new_data;
+
+      nns_edge_data_get (data_h, i, &data, &data_len);
+      new_data = _g_memdup (data, data_len);
+      gst_buffer_append_memory (out_buf,
+          gst_memory_new_wrapped (0, new_data, data_len, 0,
+              data_len, new_data, g_free));
+    }
     /* metadata from incoming buffer */
     gst_buffer_copy_into (out_buf, buf, GST_BUFFER_COPY_METADATA, 0, -1);
 
     res = gst_pad_push (self->srcpad, out_buf);
-    goto done;
   }
-
-  nns_logw ("Failed to receive result from server node, retry connection.");
+  goto done;
 
 retry:
   if (!self->operation || !_client_retry_connection (self)) {
@@ -635,6 +713,13 @@ retry:
     res = GST_FLOW_ERROR;
   }
 done:
+  if (data_h) {
+    nns_edge_data_destroy (data_h);
+  }
+
+  for (i = 0; i < num_mems; i++)
+    gst_memory_unmap (mem[i], &map[i]);
+
   gst_buffer_unref (buf);
   return res;
 }
index 0a66038..175fa13 100644 (file)
@@ -16,7 +16,6 @@
 #include <gst/gst.h>
 #include <gio/gio.h>
 #include <tensor_common.h>
-#include "tensor_query_common.h"
 #include "tensor_query_hybrid.h"
 
 G_BEGIN_DECLS
@@ -48,23 +47,18 @@ struct _GstTensorQueryClient
   gboolean silent; /**< True if logging is minimized */
   gchar *in_caps_str;
 
-  TensorQueryProtocol protocol;
-
   /* Query-hybrid feature */
   gchar *operation; /**< Main operation such as 'object_detection' or 'image_segmentation' */
   query_hybrid_info_s hybrid_info;
-  gchar *broker_host;
-  guint16 broker_port;
+  gchar *host;
+  guint16 port;
 
-  /* src information (Connect to query server source) */
-  query_connection_handle src_conn;
-  gchar *src_host;
-  guint16 src_port;
+  gchar *srv_host;
+  guint16 srv_port;
 
-  /* sink socket and information (Connect to query server sink) */
-  query_connection_handle sink_conn;
-  gchar *sink_host;
-  guint16 sink_port;
+  nns_edge_protocol_e protocol;
+  nns_edge_h edge_h;
+  GAsyncQueue *msg_queue;
 };
 
 /**
index 5cc4c24..bad1db4 100644 (file)
 #ifdef HAVE_CONFIG_H
 #include "config.h"
 #endif
-
-#include <gio/gio.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
 #include <stdint.h>
 #include <stdlib.h>
 #include <string.h>
-#include <errno.h>
-#include <nnstreamer_util.h>
-#include <nnstreamer_log.h>
 #include "tensor_query_common.h"
-#include <pthread.h>
-
-#define TENSOR_QUERY_SERVER_DATA_LEN 128
-#define N_BACKLOG 10
-#define CLIENT_ID_LEN sizeof(query_client_id_t)
 
 #ifndef EREMOTEIO
 #define EREMOTEIO 121           /* This is Linux-specific. Define this for non-Linux systems */
 #endif
 
 /**
- * @brief Query server dependent network data
- */
-typedef struct
-{
-  TensorQueryProtocol protocol;
-  int8_t is_src;
-  char *src_caps_str;
-  char *sink_caps_str;
-  GAsyncQueue *msg_queue;
-  union
-  {
-    struct
-    {
-      GSocketListener *socket_listener;
-      GCancellable *cancellable;
-      GAsyncQueue *conn_queue;
-    };
-    /* check the size of struct is less */
-    uint8_t _dummy[TENSOR_QUERY_SERVER_DATA_LEN];
-  };
-} TensorQueryServerData;
-
-/**
- * @brief Structures for tensor query client data.
- */
-typedef struct
-{
-  TensorQueryProtocol protocol;
-  union
-  {
-    struct
-    {
-      GSocket *client_socket;
-      GCancellable *cancellable;
-    };
-  };
-} TensorQueryClientData;
-
-/**
- * @brief Connection info structure
- */
-typedef struct
-{
-  TensorQueryProtocol protocol;
-  char *host;
-  uint16_t port;
-  uint32_t timeout;
-  query_client_id_t client_id;
-  pthread_t msg_thread;
-  int8_t running;
-
-  /* network info */
-  union
-  {
-    /* TCP */
-    struct
-    {
-      GSocket *socket;
-      GCancellable *cancellable;
-    };
-  };
-} TensorQueryConnection;
-
-/**
- * @brief Structures for tensor query message handling thread data.
- */
-typedef struct
-{
-  TensorQueryServerData *sdata;
-  TensorQueryConnection *conn;
-} TensorQueryMsgThreadData;
-
-static int
-query_tcp_receive (GSocket * socket, uint8_t * data, size_t size,
-    GCancellable * cancellable);
-static gboolean query_tcp_send (GSocket * socket, uint8_t * data, size_t size,
-    GCancellable * cancellable);
-static void
-accept_socket_async_cb (GObject * source, GAsyncResult * result,
-    gpointer user_data);
-
-/**
- * @brief Internal function to check connection.
- */
-static gboolean
-_query_check_connection (query_connection_handle connection)
-{
-  TensorQueryConnection *conn;
-  size_t size;
-  GIOCondition condition;
-
-  conn = (TensorQueryConnection *) connection;
-
-  condition = g_socket_condition_check (conn->socket,
-      G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
-  size = g_socket_get_available_bytes (conn->socket);
-
-  if (condition && size <= 0) {
-    nns_logw ("Socket is not available, possibly EOS.");
-    return FALSE;
-  }
-
-  return TRUE;
-}
-
-/**
- * @brief Internal function to get client id.
- */
-static query_client_id_t
-_query_get_client_id (query_connection_handle connection)
-{
-  TensorQueryConnection *conn = (TensorQueryConnection *) connection;
-  return conn->client_id;
-}
-
-/**
- * @brief Get socket address
- */
-static gboolean
-gst_tensor_query_get_saddr (const char *host, uint16_t port,
-    GCancellable * cancellable, GSocketAddress ** saddr)
-{
-  GError *err = NULL;
-  GInetAddress *addr;
-
-  /* look up name if we need to */
-  addr = g_inet_address_new_from_string (host);
-  if (!addr) {
-    GList *results;
-    GResolver *resolver;
-    resolver = g_resolver_get_default ();
-    results = g_resolver_lookup_by_name (resolver, host, cancellable, &err);
-    if (!results) {
-      if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
-        nns_logd ("gst_tensor_query_socket_new: Cancelled name resolval");
-      } else {
-        nns_loge ("Failed to resolve host '%s': %s", host, err->message);
-      }
-      g_clear_error (&err);
-      g_object_unref (resolver);
-      return FALSE;
-    }
-    /** @todo Try with the second address if the first fails */
-    addr = G_INET_ADDRESS (g_object_ref (results->data));
-    g_resolver_free_addresses (results);
-    g_object_unref (resolver);
-  }
-
-  *saddr = g_inet_socket_address_new (addr, port);
-  g_object_unref (addr);
-
-  return TRUE;
-}
-
-/**
- * @brief Create and connect to requested socket.
- */
-static gboolean
-gst_tensor_query_connect (query_connection_handle conn_h)
-{
-  GError *err = NULL;
-  GSocketAddress *saddr = NULL;
-  TensorQueryConnection *conn = (TensorQueryConnection *) conn_h;
-  gboolean ret = FALSE;
-
-  if (!gst_tensor_query_get_saddr (conn->host, conn->port, conn->cancellable,
-          &saddr)) {
-    nns_loge ("Failed to get socket address");
-    return ret;
-  }
-
-  /* create sending client socket */
-  conn->socket =
-      g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM,
-      G_SOCKET_PROTOCOL_TCP, &err);
-  /** @todo Support UDP protocol */
-
-  if (!conn->socket) {
-    nns_loge ("Failed to create new socket");
-    goto done;
-  }
-
-  /* setting TCP_NODELAY to TRUE in order to avoid packet batching as known as Nagle's algorithm */
-  if (!g_socket_set_option (conn->socket, IPPROTO_TCP, TCP_NODELAY, TRUE, &err)) {
-    nns_loge ("Failed to set socket TCP_NODELAY option: %s", err->message);
-    goto done;
-  }
-
-  if (!g_socket_connect (conn->socket, saddr, conn->cancellable, &err)) {
-    if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
-      nns_logd ("Cancelled connecting");
-    } else {
-      nns_loge ("Failed to connect to host");
-    }
-    goto done;
-  }
-
-  /* now connected to the requested socket */
-  ret = TRUE;
-
-done:
-  g_object_unref (saddr);
-  g_clear_error (&err);
-  return ret;
-}
-
-/**
- * @brief Set timeout.
- */
-void
-nnstreamer_query_set_timeout (query_connection_handle connection,
-    uint32_t timeout)
-{
-  TensorQueryConnection *conn = (TensorQueryConnection *) connection;
-
-  if (conn->timeout != timeout) {
-    conn->timeout = timeout;
-
-    switch (conn->protocol) {
-      case _TENSOR_QUERY_PROTOCOL_TCP:
-        g_socket_set_timeout (conn->socket, timeout);
-        break;
-      default:
-        /* NYI */
-        nns_loge ("Invalid protocol");
-        break;
-    }
-  }
-}
-
-/**
- * @brief Set client ID.
- */
-void
-nnstreamer_query_set_client_id (query_connection_handle connection,
-    query_client_id_t id)
-{
-  TensorQueryConnection *conn = (TensorQueryConnection *) connection;
-  conn->client_id = id;
-}
-
-/**
- * @brief Connect to the specified address.
- */
-query_connection_handle
-nnstreamer_query_connect (TensorQueryProtocol protocol, const char *ip,
-    uint16_t port, uint32_t timeout)
-{
-  TensorQueryConnection *conn = g_new0 (TensorQueryConnection, 1);
-
-  conn->protocol = protocol;
-  conn->host = g_strdup (ip);
-  conn->port = port;
-  conn->timeout = timeout;
-
-  switch (protocol) {
-    case _TENSOR_QUERY_PROTOCOL_TCP:
-    {
-      conn->cancellable = g_cancellable_new ();
-      if (!gst_tensor_query_connect (conn)) {
-        nns_loge ("Failed to create new socket");
-        nnstreamer_query_close (conn);
-        return NULL;
-      }
-      break;
-    }
-    default:
-      nns_loge ("Unsupported protocol.");
-      nnstreamer_query_close (conn);
-      return NULL;
-  }
-
-  nnstreamer_query_set_timeout (conn, timeout);
-  return conn;
-}
-
-/**
- * @brief receive command from connected device.
- * @return 0 if OK, negative value if error
- * @note The socket operates in two modes: blocking and non-blocking.
- *       In non-blocking mode, if there is no data available, it is immediately returned.
- */
-int
-nnstreamer_query_receive (query_connection_handle connection,
-    TensorQueryCommandData * data)
-{
-  TensorQueryConnection *conn = (TensorQueryConnection *) connection;
-
-  if (!conn) {
-    nns_loge ("Invalid connection data");
-    return -EINVAL;
-  }
-
-  switch (conn->protocol) {
-    case _TENSOR_QUERY_PROTOCOL_TCP:
-    {
-      TensorQueryCommand cmd;
-
-      if (query_tcp_receive (conn->socket, (uint8_t *) & cmd, sizeof (cmd),
-              conn->cancellable) < 0) {
-        nns_loge ("Failed to receive command from socket");
-        return -EREMOTEIO;
-      }
-
-      nns_logd ("Received command: %d", cmd);
-      data->cmd = cmd;
-
-      if (cmd == _TENSOR_QUERY_CMD_TRANSFER_DATA ||
-          cmd <= _TENSOR_QUERY_CMD_RESPOND_DENY) {
-        /* receive size */
-        if (query_tcp_receive (conn->socket, (uint8_t *) & data->data.size,
-                sizeof (data->data.size), conn->cancellable) < 0) {
-          nns_loge ("Failed to receive data size from socket");
-          return -EREMOTEIO;
-        }
-
-        if (cmd <= _TENSOR_QUERY_CMD_RESPOND_DENY) {
-          data->data.data = (uint8_t *) g_malloc0 (data->data.size);
-        }
-
-        /* receive data */
-        if (query_tcp_receive (conn->socket, (uint8_t *) data->data.data,
-                data->data.size, conn->cancellable) < 0) {
-          nns_loge ("Failed to receive data from socket");
-          return -EREMOTEIO;
-        }
-        return 0;
-      } else if (data->cmd == _TENSOR_QUERY_CMD_CLIENT_ID) {
-        /* receive client id */
-        if (query_tcp_receive (conn->socket, (uint8_t *) & data->client_id,
-                CLIENT_ID_LEN, conn->cancellable) < 0) {
-          nns_logd ("Failed to receive client id from socket");
-          return -EREMOTEIO;
-        }
-      } else {
-        /* receive data_info */
-        if (query_tcp_receive (conn->socket, (uint8_t *) & data->data_info,
-                sizeof (TensorQueryDataInfo), conn->cancellable) < 0) {
-          nns_logd ("Failed to receive data info from socket");
-          return -EREMOTEIO;
-        }
-      }
-    }
-      break;
-    default:
-      /* NYI */
-      return -EPROTONOSUPPORT;
-  }
-  return 0;
-}
-
-/**
- * @brief send command to connected device.
- * @return 0 if OK, negative value if error
- */
-int
-nnstreamer_query_send (query_connection_handle connection,
-    TensorQueryCommandData * data)
-{
-  TensorQueryConnection *conn = (TensorQueryConnection *) connection;
-
-  if (!data) {
-    nns_loge ("Sending data is NULL");
-    return -EINVAL;
-  }
-  if (!conn) {
-    nns_loge ("Invalid connection data");
-    return -EINVAL;
-  }
-
-  switch (conn->protocol) {
-    case _TENSOR_QUERY_PROTOCOL_TCP:
-      if (!query_tcp_send (conn->socket, (uint8_t *) & data->cmd,
-              sizeof (TensorQueryCommand), conn->cancellable)) {
-        nns_logd ("Failed to send to socket");
-        return -EREMOTEIO;
-      }
-      if (data->cmd == _TENSOR_QUERY_CMD_TRANSFER_DATA ||
-          data->cmd <= _TENSOR_QUERY_CMD_RESPOND_DENY) {
-        /* send size */
-        if (!query_tcp_send (conn->socket, (uint8_t *) & data->data.size,
-                sizeof (data->data.size), conn->cancellable)) {
-          nns_logd ("Failed to send size to socket");
-          return -EREMOTEIO;
-        }
-        /* send data */
-        if (!query_tcp_send (conn->socket, (uint8_t *) data->data.data,
-                data->data.size, conn->cancellable)) {
-          nns_logd ("Failed to send data to socket");
-          return -EREMOTEIO;
-        }
-      } else if (data->cmd == _TENSOR_QUERY_CMD_CLIENT_ID) {
-        /* send client id */
-        if (!query_tcp_send (conn->socket, (uint8_t *) & data->client_id,
-                CLIENT_ID_LEN, conn->cancellable)) {
-          nns_logd ("Failed to send client id to socket");
-          return -EREMOTEIO;
-        }
-      } else {
-        /* send data_info */
-        if (!query_tcp_send (conn->socket, (uint8_t *) & data->data_info,
-                sizeof (TensorQueryDataInfo), conn->cancellable)) {
-          nns_logd ("Failed to send data_info to socket");
-          return -EREMOTEIO;
-        }
-      }
-      break;
-    default:
-      /* NYI */
-      return -EPROTONOSUPPORT;
-  }
-  return 0;
-}
-
-/**
- * @brief free connection
- * @return 0 if OK, negative value if error
- */
-int
-nnstreamer_query_close (query_connection_handle connection)
-{
-  TensorQueryConnection *conn = (TensorQueryConnection *) connection;
-  GError *err = NULL;
-  int ret = 0;
-
-  if (!conn) {
-    nns_loge ("Invalid connection data");
-    return -EINVAL;
-  }
-  switch (conn->protocol) {
-    case _TENSOR_QUERY_PROTOCOL_TCP:
-    {
-      if (conn->socket) {
-        if (!g_socket_close (conn->socket, &err)) {
-          nns_loge ("Failed to close socket: %s", err->message);
-          g_clear_error (&err);
-        }
-        g_object_unref (conn->socket);
-        conn->socket = NULL;
-      }
-
-      if (conn->cancellable) {
-        g_object_unref (conn->cancellable);
-        conn->cancellable = NULL;
-      }
-      break;
-    }
-    default:
-      /* NYI */
-      ret = -EPROTONOSUPPORT;
-      break;
-  }
-
-  g_free (conn->host);
-  conn->host = NULL;
-  g_free (conn);
-  return ret;
-}
-
-/**
- * @brief return initialized server handle
- * @return query_server_handle, NULL if error
- */
-query_server_handle
-nnstreamer_query_server_data_new (void)
-{
-  TensorQueryServerData *sdata = g_try_new0 (TensorQueryServerData, 1);
-  if (!sdata) {
-    nns_loge ("Failed to allocate server data");
-    return NULL;
-  }
-
-  return (query_server_handle) sdata;
-}
-
-/**
- * @brief free server handle
- */
-void
-nnstreamer_query_server_data_free (query_server_handle server_data)
-{
-  TensorQueryServerData *sdata = (TensorQueryServerData *) server_data;
-  if (!sdata)
-    return;
-
-  switch (sdata->protocol) {
-    case _TENSOR_QUERY_PROTOCOL_TCP:
-    {
-      TensorQueryConnection *conn_remained;
-      GstBuffer *buf_remained;
-
-      if (sdata->conn_queue) {
-        while ((conn_remained = g_async_queue_try_pop (sdata->conn_queue))) {
-          conn_remained->running = 0;
-          pthread_join (conn_remained->msg_thread, NULL);
-          nnstreamer_query_close (conn_remained);
-        }
-        g_async_queue_unref (sdata->conn_queue);
-        sdata->conn_queue = NULL;
-      }
-
-      if (sdata->is_src && sdata->msg_queue) {
-        while ((buf_remained = g_async_queue_try_pop (sdata->msg_queue))) {
-          gst_buffer_unref (buf_remained);
-        }
-        g_async_queue_unref (sdata->msg_queue);
-        sdata->msg_queue = NULL;
-      }
-
-      if (sdata->socket_listener) {
-        g_socket_listener_close (sdata->socket_listener);
-        g_object_unref (sdata->socket_listener);
-        sdata->socket_listener = NULL;
-      }
-
-      if (sdata->cancellable) {
-        g_object_unref (sdata->cancellable);
-        sdata->cancellable = NULL;
-      }
-      break;
-    }
-    default:
-      /* NYI */
-      nns_loge ("Invalid protocol");
-      break;
-  }
-  g_free (sdata->src_caps_str);
-  g_free (sdata->sink_caps_str);
-  g_free (sdata);
-}
-
-/**
- * @brief set server handle params and setup server
- * @return 0 if OK, negative value if error
- */
-int
-nnstreamer_query_server_init (query_server_handle server_data,
-    TensorQueryProtocol protocol, const char *host, uint16_t port,
-    int8_t is_src)
-{
-  TensorQueryServerData *sdata = (TensorQueryServerData *) server_data;
-  GSocketAddress *saddr = NULL;
-  GError *err = NULL;
-  int ret = 0;
-
-  if (!sdata)
-    return -EINVAL;
-
-  sdata->protocol = protocol;
-  sdata->is_src = is_src;
-
-  switch (protocol) {
-    case _TENSOR_QUERY_PROTOCOL_TCP:
-      sdata->cancellable = g_cancellable_new ();
-      if (!gst_tensor_query_get_saddr (host, port, sdata->cancellable, &saddr)) {
-        nns_loge ("Failed to get socket address");
-        ret = -EADDRNOTAVAIL;
-        goto error;
-      }
-
-      sdata->socket_listener = g_socket_listener_new ();
-      g_socket_listener_set_backlog (sdata->socket_listener, N_BACKLOG);
-
-      if (!g_socket_listener_add_address (sdata->socket_listener, saddr,
-              G_SOCKET_TYPE_STREAM, G_SOCKET_PROTOCOL_TCP, NULL, NULL, &err)) {
-        nns_loge ("Failed to add address: %s", err->message);
-        g_clear_error (&err);
-        ret = -EADDRNOTAVAIL;
-        goto error;
-      }
-
-      sdata->conn_queue = g_async_queue_new ();
-      if (sdata->is_src)
-        sdata->msg_queue = g_async_queue_new ();
-
-      g_socket_listener_accept_socket_async (sdata->socket_listener,
-          sdata->cancellable, (GAsyncReadyCallback) accept_socket_async_cb,
-          sdata);
-      break;
-    default:
-      /* NYI */
-      nns_loge ("Invalid protocol");
-      ret = -EPROTONOSUPPORT;
-      break;
-  }
-
-error:
-  if (saddr)
-    g_object_unref (saddr);
-
-  return ret;
-}
-
-/**
- * @brief accept connection from remote
- * @return query_connection_handle including connection data
- */
-query_connection_handle
-nnstreamer_query_server_accept (query_server_handle server_data,
-    query_client_id_t client_id)
-{
-  TensorQueryServerData *sdata = (TensorQueryServerData *) server_data;
-  TensorQueryConnection *conn;
-  gint total, checked;
-
-  if (!sdata)
-    return NULL;
-
-  switch (sdata->protocol) {
-    case _TENSOR_QUERY_PROTOCOL_TCP:
-    {
-      total = g_async_queue_length (sdata->conn_queue);
-      checked = 0;
-
-      while (checked < total) {
-        conn = g_async_queue_pop (sdata->conn_queue);
-        checked++;
-
-        if (!_query_check_connection (conn)) {
-          nnstreamer_query_close (conn);
-          continue;
-        }
-
-        g_async_queue_push (sdata->conn_queue, conn);
-
-        if (client_id == _query_get_client_id (conn))
-          return conn;
-      }
-      break;
-    }
-    default:
-      /* NYI */
-      nns_loge ("Invalid protocol");
-      break;
-  }
-
-  return NULL;
-}
-
-/**
- * @brief [TCP] receive data for tcp server
- * @return 0 if OK, negative value if error
- */
-static int
-query_tcp_receive (GSocket * socket, uint8_t * data, size_t size,
-    GCancellable * cancellable)
-{
-  size_t bytes_received = 0;
-  ssize_t rret;
-  GError *err = NULL;
-
-  while (bytes_received < size) {
-    rret = g_socket_receive (socket, (char *) data + bytes_received,
-        size - bytes_received, cancellable, &err);
-
-    if (rret == 0) {
-      nns_logi ("Connection closed");
-      return -EREMOTEIO;
-    }
-
-    if (rret < 0) {
-      nns_logi ("Failed to read from socket: %s", err->message);
-      g_clear_error (&err);
-      return -EREMOTEIO;
-    }
-    bytes_received += rret;
-  }
-  return 0;
-}
-
-/**
- * @brief [TCP] send data for tcp server
- */
-static gboolean
-query_tcp_send (GSocket * socket, uint8_t * data, size_t size,
-    GCancellable * cancellable)
-{
-  size_t bytes_sent = 0;
-  ssize_t rret;
-  GError *err = NULL;
-  while (bytes_sent < size) {
-    rret = g_socket_send (socket, (char *) data + bytes_sent,
-        size - bytes_sent, cancellable, &err);
-    if (rret == 0) {
-      nns_logi ("Connection closed");
-      return FALSE;
-    }
-    if (rret < 0) {
-      nns_loge ("Error while sending data %s", err->message);
-      g_clear_error (&err);
-      return FALSE;
-    }
-    bytes_sent += rret;
-  }
-  return TRUE;
-}
-
-/**
- * @brief [TCP] Receive buffer from the client
- * @param[in] conn connection info
- */
-static void *
-_message_handler (void *thread_data)
-{
-  TensorQueryMsgThreadData *_thread_data =
-      (TensorQueryMsgThreadData *) thread_data;
-  TensorQueryConnection *_conn = _thread_data->conn;
-  TensorQueryServerData *_sdata = _thread_data->sdata;
-  TensorQueryCommandData cmd_data_receive;
-  TensorQueryCommandData cmd_data_send;
-  GstBuffer *outbuf = NULL;
-  GstMetaQuery *meta_query;
-
-  cmd_data_send.cmd = _TENSOR_QUERY_CMD_CLIENT_ID;
-  cmd_data_send.client_id = g_get_monotonic_time ();
-
-  if (0 != nnstreamer_query_send (_conn, &cmd_data_send)) {
-    nns_loge ("Failed to send client id to client");
-    goto done;
-  }
-  nnstreamer_query_set_client_id (_conn, cmd_data_send.client_id);
-
-  if (0 != nnstreamer_query_receive (_conn, &cmd_data_receive)) {
-    nns_logi ("Failed to receive cmd");
-    goto done;
-  }
-
-  if (cmd_data_receive.cmd == _TENSOR_QUERY_CMD_REQUEST_INFO) {
-    GstCaps *server_caps, *client_caps;
-    GstStructure *server_st, *client_st;
-    gboolean result = FALSE;
-
-    server_caps = gst_caps_from_string (_sdata->src_caps_str);
-    client_caps = gst_caps_from_string ((char *) cmd_data_receive.data.data);
-    /** Server framerate may vary. Let's skip comparing the framerate. */
-    gst_caps_set_simple (server_caps, "framerate", GST_TYPE_FRACTION, 0, 1,
-        NULL);
-    gst_caps_set_simple (client_caps, "framerate", GST_TYPE_FRACTION, 0, 1,
-        NULL);
-
-    server_st = gst_caps_get_structure (server_caps, 0);
-    client_st = gst_caps_get_structure (client_caps, 0);
-
-    if (gst_structure_is_tensor_stream (server_st)) {
-      GstTensorsConfig server_config, client_config;
-
-      gst_tensors_config_from_structure (&server_config, server_st);
-      gst_tensors_config_from_structure (&client_config, client_st);
-
-      result = gst_tensors_config_is_equal (&server_config, &client_config);
-    }
-
-    if (result || gst_caps_can_intersect (client_caps, server_caps)) {
-      cmd_data_send.cmd = _TENSOR_QUERY_CMD_RESPOND_APPROVE;
-      cmd_data_send.data.data = (uint8_t *) _sdata->sink_caps_str;
-      cmd_data_send.data.size = (size_t) strlen (_sdata->sink_caps_str) + 1;
-    } else {
-      /* respond deny with src caps string */
-      nns_loge ("Query caps is not acceptable!");
-      nns_loge ("Query client sink caps: %s", cmd_data_receive.data.data);
-      nns_loge ("Query server src caps: %s", _sdata->src_caps_str);
-
-      cmd_data_send.cmd = _TENSOR_QUERY_CMD_RESPOND_DENY;
-      cmd_data_send.data.data = (uint8_t *) _sdata->src_caps_str;
-      cmd_data_send.data.size = (size_t) strlen (_sdata->src_caps_str) + 1;
-    }
-
-    g_free (cmd_data_receive.data.data);
-    cmd_data_receive.data.data = NULL;
-
-    gst_caps_unref (server_caps);
-    gst_caps_unref (client_caps);
-
-    if (nnstreamer_query_send (_conn, &cmd_data_send) != 0) {
-      nns_logi ("Failed to send respond");
-      goto done;
-    }
-  }
-
-  while (_conn->running) {
-    if (!_query_check_connection (_conn))
-      break;
-
-    outbuf = tensor_query_receive_buffer (_conn);
-    if (outbuf) {
-      meta_query = gst_buffer_add_meta_query (outbuf);
-      if (meta_query) {
-        meta_query->client_id = _query_get_client_id (_conn);
-      }
-
-      g_async_queue_push (_sdata->msg_queue, outbuf);
-    }
-  }
-
-done:
-  g_free (thread_data);
-  _conn->running = 0;
-  return NULL;
-}
-
-/**
- * @brief [TCP] Callback for socket listener that pushes socket to the queue
- */
-static void
-accept_socket_async_cb (GObject * source, GAsyncResult * result,
-    gpointer user_data)
-{
-  GSocketListener *socket_listener = G_SOCKET_LISTENER (source);
-  GSocket *socket = NULL;
-  GSocketAddress *saddr = NULL;
-  GError *err = NULL;
-  TensorQueryServerData *sdata = user_data;
-  TensorQueryConnection *conn = NULL;
-  TensorQueryCommandData cmd_data;
-  gboolean done = FALSE;
-
-  socket =
-      g_socket_listener_accept_socket_finish (socket_listener, result, NULL,
-      &err);
-  if (!socket) {
-    nns_loge ("Failed to get socket: %s", err->message);
-    g_clear_error (&err);
-    goto error;
-  }
-  g_socket_set_timeout (socket, QUERY_DEFAULT_TIMEOUT_SEC);
-  /* create socket with connection */
-  conn = g_try_new0 (TensorQueryConnection, 1);
-  if (!conn) {
-    nns_loge ("Failed to allocate connection");
-    goto error;
-  }
-
-  conn->socket = socket;
-  conn->cancellable = g_cancellable_new ();
-
-  /* setting TCP_NODELAY to TRUE in order to avoid packet batching as known as Nagle's algorithm */
-  if (!g_socket_set_option (socket, IPPROTO_TCP, TCP_NODELAY, TRUE, &err)) {
-    nns_loge ("Failed to set socket TCP_NODELAY option: %s", err->message);
-    g_clear_error (&err);
-    goto error;
-  }
-
-  saddr = g_socket_get_remote_address (socket, &err);
-  if (!saddr) {
-    nns_loge ("Failed to get socket address: %s", err->message);
-    g_clear_error (&err);
-    goto error;
-  }
-  conn->protocol = (g_socket_get_protocol (socket) == G_SOCKET_PROTOCOL_TCP) ?
-      _TENSOR_QUERY_PROTOCOL_TCP : _TENSOR_QUERY_PROTOCOL_END;
-  conn->host = g_inet_address_to_string (g_inet_socket_address_get_address (
-          (GInetSocketAddress *) saddr));
-  conn->port = g_inet_socket_address_get_port ((GInetSocketAddress *) saddr);
-  g_object_unref (saddr);
-  nns_logi ("New client connected from %s:%u", conn->host, conn->port);
-
-  /** Generate and send client_id to client */
-  if (sdata->is_src) {
-    TensorQueryMsgThreadData *thread_data = NULL;
-    pthread_attr_t attr;
-    int tid;
-
-    thread_data = g_try_new0 (TensorQueryMsgThreadData, 1);
-    if (!thread_data) {
-      nns_loge ("Failed to allocate query thread data.");
-      goto error;
-    }
-    conn->running = 1;
-    thread_data->sdata = sdata;
-    thread_data->conn = conn;
-
-    pthread_attr_init (&attr);
-    pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
-    tid = pthread_create (&conn->msg_thread, &attr, _message_handler,
-        thread_data);
-    pthread_attr_destroy (&attr);
-
-    if (tid < 0) {
-      nns_loge ("Failed to create message handler thread.");
-      nnstreamer_query_close (conn);
-      g_free (thread_data);
-      return;
-    }
-  } else { /** server sink */
-    if (0 != nnstreamer_query_receive (conn, &cmd_data)) {
-      nns_loge ("Failed to receive command.");
-      goto error;
-    }
-    if (cmd_data.cmd == _TENSOR_QUERY_CMD_CLIENT_ID) {
-      nns_logd ("Connected client id: %ld", (long) cmd_data.client_id);
-      nnstreamer_query_set_client_id (conn, cmd_data.client_id);
-    }
-  }
-
-  done = TRUE;
-  g_async_queue_push (sdata->conn_queue, conn);
-
-error:
-  if (!done) {
-    nnstreamer_query_close (conn);
-  }
-
-  g_socket_listener_accept_socket_async (socket_listener,
-      sdata->cancellable, (GAsyncReadyCallback) accept_socket_async_cb, sdata);
-}
-
-/**
- * @brief set server source and sink caps string.
- */
-void
-nnstreamer_query_server_data_set_caps_str (query_server_handle server_data,
-    const char *src_caps_str, const char *sink_caps_str)
-{
-  TensorQueryServerData *sdata = (TensorQueryServerData *) server_data;
-  g_free (sdata->src_caps_str);
-  g_free (sdata->sink_caps_str);
-  sdata->src_caps_str = g_strdup (src_caps_str);
-  sdata->sink_caps_str = g_strdup (sink_caps_str);
-}
-
-/**
- * @brief Get buffer from message queue.
- */
-GstBuffer *
-nnstreamer_query_server_get_buffer (query_server_handle server_data)
-{
-  TensorQueryServerData *sdata = (TensorQueryServerData *) server_data;
-
-  return GST_BUFFER (g_async_queue_pop (sdata->msg_queue));
-}
-
-/**
- * @brief Send gst-buffer to destination node.
- * @return True if all data in gst-buffer is successfully sent. False if failed to transfer data.
- * @todo This function should be used in nnstreamer element. Update function name rule and params later.
- */
-gboolean
-tensor_query_send_buffer (query_connection_handle connection,
-    GstElement * element, GstBuffer * buffer)
-{
-  TensorQueryCommandData cmd_data = { 0 };
-  GstMemory *mem[NNS_TENSOR_SIZE_LIMIT];
-  GstMapInfo map[NNS_TENSOR_SIZE_LIMIT];
-  gboolean done = FALSE;
-  guint i, num_mems;
-
-  num_mems = gst_buffer_n_memory (buffer);
-
-  /* start */
-  cmd_data.cmd = _TENSOR_QUERY_CMD_TRANSFER_START;
-  cmd_data.data_info.base_time = gst_element_get_base_time (element);
-  cmd_data.data_info.duration = GST_BUFFER_DURATION (buffer);
-  cmd_data.data_info.dts = GST_BUFFER_DTS (buffer);
-  cmd_data.data_info.pts = GST_BUFFER_PTS (buffer);
-  cmd_data.data_info.num_mems = num_mems;
-
-  /* memory chunks in gst-buffer */
-  for (i = 0; i < num_mems; i++) {
-    mem[i] = gst_buffer_peek_memory (buffer, i);
-    if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) {
-      ml_loge ("Cannot map the %uth memory in gst-buffer.", i);
-      num_mems = i;
-      goto error;
-    }
-
-    cmd_data.data_info.mem_sizes[i] = map[i].size;
-  }
-
-  if (nnstreamer_query_send (connection, &cmd_data) != 0) {
-    nns_loge ("Failed to send start command.");
-    goto error;
-  }
-
-  /* transfer data */
-  cmd_data.cmd = _TENSOR_QUERY_CMD_TRANSFER_DATA;
-  for (i = 0; i < num_mems; i++) {
-    cmd_data.data.size = map[i].size;
-    cmd_data.data.data = map[i].data;
-
-    if (nnstreamer_query_send (connection, &cmd_data) != 0) {
-      nns_loge ("Failed to send %uth data buffer", i);
-      goto error;
-    }
-  }
-
-  /* done */
-  cmd_data.cmd = _TENSOR_QUERY_CMD_TRANSFER_END;
-  if (nnstreamer_query_send (connection, &cmd_data) != 0) {
-    nns_loge ("Failed to send end command.");
-    goto error;
-  }
-
-  done = TRUE;
-
-error:
-  for (i = 0; i < num_mems; i++)
-    gst_memory_unmap (mem[i], &map[i]);
-
-  return done;
-}
-
-/**
- * @brief Receive data and generate gst-buffer. Caller should handle metadata of returned buffer.
- * @return Newly generated gst-buffer. Null if failed to receive data.
- * @todo This function should be used in nnstreamer element. Update function name rule and params later.
- */
-GstBuffer *
-tensor_query_receive_buffer (query_connection_handle connection)
-{
-  TensorQueryCommandData cmd_data = { 0 };
-  GstBuffer *buffer = NULL;
-  gboolean done = FALSE;
-  gpointer data;
-  gsize len;
-  guint i, num_mems;
-
-  if (nnstreamer_query_receive (connection, &cmd_data) != 0) {
-    nns_loge ("Failed to receive start command.");
-    goto error;
-  }
-
-  if (cmd_data.cmd != _TENSOR_QUERY_CMD_TRANSFER_START) {
-    nns_loge ("Invalid command %d, cannot start data transfer.", cmd_data.cmd);
-    goto error;
-  }
-
-  buffer = gst_buffer_new ();
-
-  num_mems = cmd_data.data_info.num_mems;
-  for (i = 0; i < num_mems; i++) {
-    len = cmd_data.data_info.mem_sizes[i];
-    data = g_malloc0 (len);
-
-    cmd_data.data.data = data;
-    cmd_data.data.size = len;
-
-    if (nnstreamer_query_receive (connection, &cmd_data) != 0) {
-      nns_loge ("Failed to receive %uth data.", i);
-      g_free (data);
-      goto error;
-    }
-
-    gst_buffer_append_memory (buffer,
-        gst_memory_new_wrapped (0, data, len, 0, len, data, g_free));
-  }
-
-  /* done */
-  if (nnstreamer_query_receive (connection, &cmd_data) != 0) {
-    nns_loge ("Failed to receive end command.");
-    goto error;
-  }
-
-  if (cmd_data.cmd != _TENSOR_QUERY_CMD_TRANSFER_END) {
-    nns_loge ("Invalid command %d, failed to transfer data.", cmd_data.cmd);
-    goto error;
-  }
-
-  done = TRUE;
-
-error:
-  if (!done) {
-    if (buffer) {
-      gst_buffer_unref (buffer);
-      buffer = NULL;
-    }
-  }
-
-  return buffer;
-}
-
-/**
  * @brief register GEnumValue array for query protocol property handling
  */
 GType
@@ -1115,11 +32,11 @@ gst_tensor_query_protocol_get_type (void)
   static GType protocol = 0;
   if (protocol == 0) {
     static GEnumValue protocols[] = {
-      {_TENSOR_QUERY_PROTOCOL_TCP, "TCP",
+      {NNS_EDGE_PROTOCOL_TCP, "TCP",
           "Raw TCP protocol. Directly sending stream frames via TCP connections."},
-      {_TENSOR_QUERY_PROTOCOL_UDP, "UDP",
+      {NNS_EDGE_PROTOCOL_UDP, "UDP",
           "Raw UDP protocol. Directly sending stream frames via UDP connections."},
-      {_TENSOR_QUERY_PROTOCOL_MQTT, "MQTT", "Connect with MQTT brokers."},
+      {NNS_EDGE_PROTOCOL_MQTT, "MQTT", "Connect with MQTT brokers."},
       {0, NULL, NULL},
     };
     protocol = g_enum_register_static ("tensor_query_protocol", protocols);
index 282e2d7..ccff038 100644 (file)
 #include "tensor_typedef.h"
 #include "tensor_common.h"
 #include "tensor_meta.h"
+#include "nnstreamer-edge.h"
 
 #ifdef __cplusplus
 extern "C" {
 #endif /* __cplusplus */
 
-typedef void * query_connection_handle;
-typedef void * query_server_handle;
-
 /**
  * @brief Default timeout, in seconds.
  */
@@ -32,173 +30,17 @@ typedef void * query_server_handle;
 /**
  * @brief protocol options for tensor query.
  */
-typedef enum
-{
-  _TENSOR_QUERY_PROTOCOL_TCP = 0,
-  _TENSOR_QUERY_PROTOCOL_UDP = 1,
-  _TENSOR_QUERY_PROTOCOL_MQTT = 2,
-  _TENSOR_QUERY_PROTOCOL_END
-} TensorQueryProtocol;
 
 #define DEFAULT_HOST "localhost"
-#define DEFAULT_PROTOCOL (_TENSOR_QUERY_PROTOCOL_TCP)
+#define DEFAULT_PROTOCOL (NNS_EDGE_PROTOCOL_TCP)
 #define GST_TYPE_QUERY_PROTOCOL (gst_tensor_query_protocol_get_type ())
+
 /**
  * @brief register GEnumValue array for query protocol property handling
  */
 GType
 gst_tensor_query_protocol_get_type (void);
 
-/**
- * @brief Structures for tensor query commands.
- */
-typedef enum
-{
-  _TENSOR_QUERY_CMD_REQUEST_INFO = 0,
-  _TENSOR_QUERY_CMD_RESPOND_APPROVE = 1,
-  _TENSOR_QUERY_CMD_RESPOND_DENY = 2,
-  _TENSOR_QUERY_CMD_TRANSFER_START = 3,
-  _TENSOR_QUERY_CMD_TRANSFER_DATA = 4,
-  _TENSOR_QUERY_CMD_TRANSFER_END = 5,
-  _TENSOR_QUERY_CMD_CLIENT_ID = 6,
-  _TENSOR_QUERY_CMD_END
-} TensorQueryCommand;
-
-
-/**
- * @brief Structures for tensor query data info.
- */
-typedef struct
-{
-  int64_t base_time;
-  int64_t sent_time;
-  uint64_t duration;
-  uint64_t dts;
-  uint64_t pts;
-  uint32_t num_mems;
-  uint64_t mem_sizes[NNS_TENSOR_SIZE_LIMIT];
-} TensorQueryDataInfo;
-
-typedef struct
-{
-  uint8_t *data;
-  size_t size;
-} TensorQueryData;
-
-/**
- * @brief Structures for tensor query command buffers.
- */
-typedef struct
-{
-  TensorQueryCommand cmd;
-  union
-  {
-    TensorQueryDataInfo data_info; /** _TENSOR_QUERY_CMD_REQUEST_INFO */
-    TensorQueryData data;          /** _TENSOR_QUERY_CMD_TRANSFER_DATA */
-    query_client_id_t client_id;   /** _TENSOR_QUERY_CMD_CLIENT_ID */
-  };
-} TensorQueryCommandData;
-
-/**
- * @brief connect to the specified address.
- * @return 0 if OK, negative value if error
- */
-extern query_connection_handle
-nnstreamer_query_connect (TensorQueryProtocol protocol, const char *ip, uint16_t port, uint32_t timeout);
-
-/**
- * @brief Set timeout.
- * @param timeout the timeout value, in seconds. 0 for none.
- */
-extern void
-nnstreamer_query_set_timeout (query_connection_handle connection, uint32_t timeout);
-
-/**
- * @brief Set client ID.
- */
-extern void
-nnstreamer_query_set_client_id (query_connection_handle connection, query_client_id_t id);
-
-/**
- * @brief send command to connected device.
- * @return 0 if OK, negative value if error
- */
-extern int
-nnstreamer_query_send (query_connection_handle connection, TensorQueryCommandData *data);
-
-/**
- * @brief receive command from connected device.
- * @return 0 if OK, negative value if error
- */
-extern int
-nnstreamer_query_receive (query_connection_handle connection, TensorQueryCommandData *data);
-
-/**
- * @brief close connection with corresponding id.
- * @return 0 if OK, negative value if error
- */
-extern int
-nnstreamer_query_close (query_connection_handle connection);
-
-/* server */
-/**
- * @brief accept connection from remote
- * @return query_connection_handle including connection data
- */
-extern query_connection_handle
-nnstreamer_query_server_accept (query_server_handle server_data, query_client_id_t client_id);
-
-/**
- * @brief return initialized server handle
- * @return query_server_handle, NULL if error
- */
-extern query_server_handle
-nnstreamer_query_server_data_new (void);
-
-/**
- * @brief free server handle
- */
-extern void
-nnstreamer_query_server_data_free (query_server_handle server_data);
-
-/**
- * @brief set server handle params and setup server
- * @return 0 if OK, negative value if error
- */
-extern int
-nnstreamer_query_server_init (query_server_handle server_data,
-    TensorQueryProtocol protocol, const char *host, uint16_t port, int8_t is_src);
-
-/**
- * @brief set server source and sink tensors config.
- */
-extern void
-nnstreamer_query_server_data_set_caps_str (query_server_handle server_data,
-    const char * src_caps_str, const char * sink_caps_str);
-
-/**
- * @brief Get buffer from message queue.
- */
-extern GstBuffer *
-nnstreamer_query_server_get_buffer (query_server_handle server_data);
-
-/**
- * @brief Send gst-buffer to destination node.
- * @return True if all data in gst-buffer is successfully sent. False if failed to transfer data.
- * @todo This function should be used in nnstreamer element. Update function name rule and params later.
- */
-extern gboolean
-tensor_query_send_buffer (query_connection_handle connection,
-    GstElement * element, GstBuffer * buffer);
-
-/**
- * @brief Receive data and generate gst-buffer. Caller should handle metadata of returned buffer.
- * @return Newly generated gst-buffer. Null if failed to receive data.
- * @todo This function should be used in nnstreamer element. Update function name rule and params later.
- */
-extern GstBuffer *
-tensor_query_receive_buffer (query_connection_handle connection);
-
 #ifdef __cplusplus
 }
 #endif /* __cplusplus */
index a0eb46b..2d55bcd 100644 (file)
@@ -15,9 +15,9 @@
 #endif
 
 #include "tensor_query_server.h"
-#include <nnstreamer_util.h>
 #include <tensor_typedef.h>
 #include <tensor_common.h>
+#include "nnstreamer-edge.h"
 
 /**
  * @brief mutex for tensor-query server table.
@@ -33,35 +33,35 @@ static void init_queryserver (void) __attribute__((constructor));
 static void fini_queryserver (void) __attribute__((destructor));
 
 /**
- * @brief Getter to get nth GstTensorQueryServerInfo.
+ * @brief Get nnstreamer edge server handle.
  */
-query_server_info_handle
-gst_tensor_query_server_get_data (guint id)
+edge_server_handle
+gst_tensor_query_server_get_handle (char *id)
 {
-  gpointer p;
+  edge_server_handle p;
 
   G_LOCK (query_server_table);
-  p = g_hash_table_lookup (_qs_table, GUINT_TO_POINTER (id));
+  p = g_hash_table_lookup (_qs_table, id);
   G_UNLOCK (query_server_table);
 
   return p;
 }
 
 /**
- * @brief Add GstTensorQueryServerInfo into hash table.
+ * @brief Add nnstreamer edge server handle into hash table.
  */
-query_server_info_handle
-gst_tensor_query_server_add_data (guint id)
+edge_server_handle
+gst_tensor_query_server_add_data (char *id)
 {
-  GstTensorQueryServerInfo *data = NULL;
+  GstTensorQueryServer *data = NULL;
 
-  data = (GstTensorQueryServerInfo *) gst_tensor_query_server_get_data (id);
+  data = gst_tensor_query_server_get_handle (id);
 
   if (NULL != data) {
     return data;
   }
 
-  data = g_try_new0 (GstTensorQueryServerInfo, 1);
+  data = g_try_new0 (GstTensorQueryServer, 1);
   if (NULL == data) {
     GST_ERROR ("Failed to allocate memory for tensor query server data.");
     return NULL;
@@ -70,50 +70,69 @@ gst_tensor_query_server_add_data (guint id)
   g_mutex_init (&data->lock);
   g_cond_init (&data->cond);
   data->id = id;
-  data->sink_host = NULL;
-  data->sink_port = 0;
   data->configured = FALSE;
-  data->sink_caps_str = NULL;
+
+  if (0 != nns_edge_create_handle (id, "TEMP_SERVER_TOPIC", &data->edge_h)) {
+    GST_ERROR ("Failed to get nnstreamer edge handle.");
+    gst_tensor_query_server_remove_data (data);
+    return NULL;
+  }
 
   G_LOCK (query_server_table);
-  g_hash_table_insert (_qs_table, GUINT_TO_POINTER (id), data);
+  g_hash_table_insert (_qs_table, g_strdup (id), data);
   G_UNLOCK (query_server_table);
 
   return data;
 }
 
+
 /**
- * @brief Remove GstTensorQueryServerInfo.
+ * @brief Get edge handle from server data.
+ */
+nns_edge_h
+gst_tensor_query_server_get_edge_handle (edge_server_handle server_h)
+{
+  GstTensorQueryServer *data = (GstTensorQueryServer *) server_h;
+
+  return data->edge_h;
+}
+
+/**
+ * @brief Remove GstTensorQueryServer.
  */
 void
-gst_tensor_query_server_remove_data (query_server_info_handle server_info_h)
+gst_tensor_query_server_remove_data (edge_server_handle server_h)
 {
-  GstTensorQueryServerInfo *data = (GstTensorQueryServerInfo *) server_info_h;
+  GstTensorQueryServer *data = (GstTensorQueryServer *) server_h;
 
   if (NULL == data) {
     return;
   }
 
   G_LOCK (query_server_table);
-  g_hash_table_remove (_qs_table, GUINT_TO_POINTER (data->id));
+  if (!g_hash_table_lookup (_qs_table, data->id))
+    g_hash_table_remove (_qs_table, data->id);
   G_UNLOCK (query_server_table);
-  g_free (data->sink_host);
-  data->sink_host = NULL;
-  g_free (data->sink_caps_str);
-  data->sink_caps_str = NULL;
+
+  if (data->edge_h) {
+    nns_edge_release_handle (data->edge_h);
+    data->edge_h = NULL;
+  }
+
   g_cond_clear (&data->cond);
   g_mutex_clear (&data->lock);
   g_free (data);
+  data = NULL;
 }
 
 /**
  * @brief Wait until the sink is configured and get server info handle.
  */
 gboolean
-gst_tensor_query_server_wait_sink (query_server_info_handle server_info_h)
+gst_tensor_query_server_wait_sink (edge_server_handle server_h)
 {
   gint64 end_time;
-  GstTensorQueryServerInfo *data = (GstTensorQueryServerInfo *) server_info_h;
+  GstTensorQueryServer *data = (GstTensorQueryServer *) server_h;
 
   if (NULL == data) {
     return FALSE;
@@ -135,108 +154,22 @@ gst_tensor_query_server_wait_sink (query_server_info_handle server_info_h)
 }
 
 /**
- * @brief set sink caps string.
+ * @brief set query server sink configured.
  */
 void
-gst_tensor_query_server_set_sink_caps_str (query_server_info_handle
-    server_info_h, const gchar * caps_str)
+gst_tensor_query_server_set_configured (edge_server_handle server_h)
 {
-  GstTensorQueryServerInfo *data = (GstTensorQueryServerInfo *) server_info_h;
-
+  GstTensorQueryServer *data = (GstTensorQueryServer *) server_h;
   if (NULL == data) {
     return;
   }
   g_mutex_lock (&data->lock);
-  if (caps_str) {
-    g_free (data->sink_caps_str);
-    data->sink_caps_str = g_strdup (caps_str);
-  }
   data->configured = TRUE;
   g_cond_broadcast (&data->cond);
   g_mutex_unlock (&data->lock);
 }
 
 /**
- * @brief get sink caps string.
- */
-gchar *
-gst_tensor_query_server_get_sink_caps_str (query_server_info_handle
-    server_info_h)
-{
-  GstTensorQueryServerInfo *data = (GstTensorQueryServerInfo *) server_info_h;
-  gchar *caps_str = NULL;
-
-  if (NULL == data) {
-    return caps_str;
-  }
-
-  g_mutex_lock (&data->lock);
-  caps_str = g_strdup (data->sink_caps_str);
-  g_mutex_unlock (&data->lock);
-
-  return caps_str;
-}
-
-/**
- * @brief set sink host
- */
-void
-gst_tensor_query_server_set_sink_host (query_server_info_handle server_info_h,
-    gchar * host, guint16 port)
-{
-  GstTensorQueryServerInfo *data = (GstTensorQueryServerInfo *) server_info_h;
-
-  if (NULL == data) {
-    return;
-  }
-
-  g_mutex_lock (&data->lock);
-  data->sink_host = g_strdup (host);
-  data->sink_port = port;
-  g_mutex_unlock (&data->lock);
-}
-
-/**
- * @brief get sink host
- */
-gchar *
-gst_tensor_query_server_get_sink_host (query_server_info_handle server_info_h)
-{
-  GstTensorQueryServerInfo *data = (GstTensorQueryServerInfo *) server_info_h;
-  gchar *sink_host = NULL;
-
-  if (NULL == data) {
-    return NULL;
-  }
-
-  g_mutex_lock (&data->lock);
-  sink_host = g_strdup (data->sink_host);
-  g_mutex_unlock (&data->lock);
-
-  return sink_host;
-}
-
-/**
- * @brief get sink port
- */
-guint16
-gst_tensor_query_server_get_sink_port (query_server_info_handle server_info_h)
-{
-  GstTensorQueryServerInfo *data = (GstTensorQueryServerInfo *) server_info_h;
-  guint16 sink_port = 0;
-
-  if (NULL == data) {
-    return sink_port;
-  }
-
-  g_mutex_lock (&data->lock);
-  sink_port = data->sink_port;
-  g_mutex_unlock (&data->lock);
-
-  return sink_port;
-}
-
-/**
  * @brief Initialize the query server.
  */
 static void
@@ -244,7 +177,7 @@ init_queryserver (void)
 {
   G_LOCK (query_server_table);
   g_assert (NULL == _qs_table); /** Internal error (duplicated init call?) */
-  _qs_table = g_hash_table_new (g_direct_hash, g_direct_equal);
+  _qs_table = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
   G_UNLOCK (query_server_table);
 }
 
index 66282c9..492fde6 100644 (file)
 
 #include <gst/gst.h>
 #include <tensor_common.h>
+#include "nnstreamer-edge.h"
+#include "tensor_meta.h"
 
 G_BEGIN_DECLS
 
 #define DEFAULT_SERVER_ID 0
 #define DEFAULT_QUERY_INFO_TIMEOUT 5
 typedef void * query_server_info_handle;
+typedef void * edge_server_handle;
 
 /**
  * @brief GstTensorQueryServer internal info data structure.
  */
 typedef struct
 {
-  gint64 id;
-  gchar *sink_caps_str;
-  gchar *sink_host;
-  guint16 sink_port;
+  char *id;
   gboolean configured;
   GMutex lock;
   GCond cond;
-} GstTensorQueryServerInfo;
+
+  nns_edge_h edge_h;
+} GstTensorQueryServer;
 
 /**
- * @brief Get GstTensorQueryServerInfo.
+ * @brief Get nnstreamer edge server handle.
  */
-query_server_info_handle
-gst_tensor_query_server_get_data (guint id);
+edge_server_handle
+gst_tensor_query_server_get_handle (char *id);
 
 /**
- * @brief Add GstTensorQueryServerInfo.
+ * @brief Add GstTensorQueryServer.
  */
-query_server_info_handle
-gst_tensor_query_server_add_data (guint id);
+edge_server_handle
+gst_tensor_query_server_add_data (char *id);
 
 /**
- * @brief Remove GstTensorQueryServerInfo.
+ * @brief Remove GstTensorQueryServer.
  */
 void
-gst_tensor_query_server_remove_data (query_server_info_handle server_info_h);
+gst_tensor_query_server_remove_data (edge_server_handle server_h);
 
 /**
  * @brief Wait until the sink is configured and get server info handle.
  */
 gboolean
-gst_tensor_query_server_wait_sink (query_server_info_handle server_info_h);
+gst_tensor_query_server_wait_sink (edge_server_handle server_h);
 
 /**
- * @brief set sink caps string.
+ * @brief Get edge handle from server data.
  */
-void gst_tensor_query_server_set_sink_caps_str (query_server_info_handle server_info_h, const gchar * caps_str);
+nns_edge_h
+gst_tensor_query_server_get_edge_handle (edge_server_handle server_h);
 
 /**
- * @brief get sink caps string.
- */
-gchar *
-gst_tensor_query_server_get_sink_caps_str (query_server_info_handle server_info_h);
-
-/**
- * @brief set sink host address and port
+ * @brief set query server sink configured.
  */
 void
-gst_tensor_query_server_set_sink_host (query_server_info_handle server_info_h, gchar *host, guint16 port);
-
-/**
- * @brief get sink host
- */
-gchar *
-gst_tensor_query_server_get_sink_host (query_server_info_handle server_info_h);
-
-/**
- * @brief get sink port
- */
-guint16
-gst_tensor_query_server_get_sink_port (query_server_info_handle server_info_h);
+gst_tensor_query_server_set_configured (edge_server_handle server_h);
 
 G_END_DECLS
 
index c895fd0..f3a6e47 100644 (file)
@@ -16,6 +16,7 @@
 
 #include <string.h>
 #include "tensor_query_serversink.h"
+#include "nnstreamer_util.h"
 
 GST_DEBUG_CATEGORY_STATIC (gst_tensor_query_serversink_debug);
 #define GST_CAT_DEFAULT gst_tensor_query_serversink_debug
@@ -34,8 +35,6 @@ static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
 enum
 {
   PROP_0,
-  PROP_HOST,
-  PROP_PORT,
   PROP_PROTOCOL,
   PROP_ID,
   PROP_TIMEOUT,
@@ -53,7 +52,6 @@ static void gst_tensor_query_serversink_get_property (GObject * object,
 static void gst_tensor_query_serversink_finalize (GObject * object);
 
 static gboolean gst_tensor_query_serversink_start (GstBaseSink * bsink);
-static gboolean gst_tensor_query_serversink_stop (GstBaseSink * bsink);
 static GstFlowReturn gst_tensor_query_serversink_render (GstBaseSink * bsink,
     GstBuffer * buf);
 static gboolean gst_tensor_query_serversink_set_caps (GstBaseSink * basesink,
@@ -77,14 +75,6 @@ gst_tensor_query_serversink_class_init (GstTensorQueryServerSinkClass * klass)
   gobject_class->get_property = gst_tensor_query_serversink_get_property;
   gobject_class->finalize = gst_tensor_query_serversink_finalize;
 
-  g_object_class_install_property (gobject_class, PROP_HOST,
-      g_param_spec_string ("host", "Host", "The hostname to listen as",
-          DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_PORT,
-      g_param_spec_uint ("port", "Port",
-          "The port to listen to (0=random available port)", 0,
-          65535, DEFAULT_PORT_SINK,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_PROTOCOL,
       g_param_spec_enum ("protocol", "Protocol",
           "The network protocol to establish connection",
@@ -116,7 +106,6 @@ gst_tensor_query_serversink_class_init (GstTensorQueryServerSinkClass * klass)
       "Samsung Electronics Co., Ltd.");
 
   gstbasesink_class->start = gst_tensor_query_serversink_start;
-  gstbasesink_class->stop = gst_tensor_query_serversink_stop;
   gstbasesink_class->set_caps = gst_tensor_query_serversink_set_caps;
   gstbasesink_class->render = gst_tensor_query_serversink_render;
 
@@ -130,12 +119,9 @@ gst_tensor_query_serversink_class_init (GstTensorQueryServerSinkClass * klass)
 static void
 gst_tensor_query_serversink_init (GstTensorQueryServerSink * sink)
 {
-  sink->host = g_strdup (DEFAULT_HOST);
-  sink->port = DEFAULT_PORT_SINK;
   sink->protocol = DEFAULT_PROTOCOL;
   sink->timeout = QUERY_DEFAULT_TIMEOUT_SEC;
   sink->sink_id = DEFAULT_SERVER_ID;
-  sink->server_data = nnstreamer_query_server_data_new ();
   sink->metaless_frame_count = 0;
 }
 
@@ -146,15 +132,7 @@ static void
 gst_tensor_query_serversink_finalize (GObject * object)
 {
   GstTensorQueryServerSink *sink = GST_TENSOR_QUERY_SERVERSINK (object);
-  g_free (sink->host);
-  if (sink->server_data) {
-    nnstreamer_query_server_data_free (sink->server_data);
-    sink->server_data = NULL;
-  }
-  if (sink->server_info_h) {
-    gst_tensor_query_server_remove_data (sink->server_info_h);
-    sink->server_info_h = NULL;
-  }
+  gst_tensor_query_server_remove_data (sink->server_h);
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
@@ -168,17 +146,6 @@ gst_tensor_query_serversink_set_property (GObject * object, guint prop_id,
   GstTensorQueryServerSink *serversink = GST_TENSOR_QUERY_SERVERSINK (object);
 
   switch (prop_id) {
-    case PROP_HOST:
-      if (!g_value_get_string (value)) {
-        nns_logw ("host property cannot be NULL");
-        break;
-      }
-      g_free (serversink->host);
-      serversink->host = g_value_dup_string (value);
-      break;
-    case PROP_PORT:
-      serversink->port = g_value_get_uint (value);
-      break;
     case PROP_PROTOCOL:
       serversink->protocol = g_value_get_enum (value);
       break;
@@ -207,12 +174,6 @@ gst_tensor_query_serversink_get_property (GObject * object, guint prop_id,
   GstTensorQueryServerSink *serversink = GST_TENSOR_QUERY_SERVERSINK (object);
 
   switch (prop_id) {
-    case PROP_HOST:
-      g_value_set_string (value, serversink->host);
-      break;
-    case PROP_PORT:
-      g_value_set_uint (value, serversink->port);
-      break;
     case PROP_PROTOCOL:
       g_value_set_enum (value, serversink->protocol);
       break;
@@ -240,22 +201,11 @@ gst_tensor_query_serversink_start (GstBaseSink * bsink)
   GstTensorQueryServerSink *sink = GST_TENSOR_QUERY_SERVERSINK (bsink);
   GstCaps *caps;
   gchar *caps_str = NULL;
+  gchar *id_str = NULL;
 
-  if (!sink->server_data) {
-    nns_loge ("Server_data is NULL");
-    return FALSE;
-  }
-
-  if (nnstreamer_query_server_init (sink->server_data, sink->protocol,
-          sink->host, sink->port, FALSE) != 0) {
-    nns_loge ("Failed to setup server");
-    return FALSE;
-  }
-
-  /** Set server sink information */
-  sink->server_info_h = gst_tensor_query_server_add_data (sink->sink_id);
-  gst_tensor_query_server_set_sink_host (sink->server_info_h, sink->host,
-      sink->port);
+  id_str = g_strdup_printf ("%u", sink->sink_id);
+  sink->server_h = gst_tensor_query_server_add_data (id_str);
+  g_free (id_str);
 
   caps = gst_pad_get_current_caps (GST_BASE_SINK_PAD (bsink));
   if (!caps) {
@@ -265,8 +215,9 @@ gst_tensor_query_serversink_start (GstBaseSink * bsink)
   if (caps) {
     caps_str = gst_caps_to_string (caps);
   }
-  gst_tensor_query_server_set_sink_caps_str (sink->server_info_h, caps_str);
 
+  sink->edge_h = gst_tensor_query_server_get_edge_handle (sink->server_h);
+  gst_tensor_query_server_set_configured (sink->server_h);
   gst_caps_unref (caps);
   g_free (caps_str);
 
@@ -274,18 +225,6 @@ gst_tensor_query_serversink_start (GstBaseSink * bsink)
 }
 
 /**
- * @brief stop processing of query_serversink
- */
-static gboolean
-gst_tensor_query_serversink_stop (GstBaseSink * bsink)
-{
-  GstTensorQueryServerSink *sink = GST_TENSOR_QUERY_SERVERSINK (bsink);
-  nnstreamer_query_server_data_free (sink->server_data);
-  sink->server_data = NULL;
-  return TRUE;
-}
-
-/**
  * @brief An implementation of the set_caps vmethod in GstBaseSinkClass
  */
 static gboolean
@@ -295,7 +234,8 @@ gst_tensor_query_serversink_set_caps (GstBaseSink * bsink, GstCaps * caps)
   gchar *caps_str;
 
   caps_str = gst_caps_to_string (caps);
-  gst_tensor_query_server_set_sink_caps_str (sink->server_info_h, caps_str);
+  nns_edge_set_info (sink->edge_h, "CAPS", "@query_server_sink_caps@");
+  nns_edge_set_info (sink->edge_h, "CAPS", caps_str);
 
   g_free (caps_str);
 
@@ -310,20 +250,41 @@ gst_tensor_query_serversink_render (GstBaseSink * bsink, GstBuffer * buf)
 {
   GstTensorQueryServerSink *sink = GST_TENSOR_QUERY_SERVERSINK (bsink);
   GstMetaQuery *meta_query;
-  query_connection_handle conn;
+  nns_edge_data_h data_h = NULL;
+  guint i, num_mems = 0;
+  gint ret;
+  GstMemory *mem[NNS_TENSOR_SIZE_LIMIT];
+  GstMapInfo map[NNS_TENSOR_SIZE_LIMIT];
+  char *val;
 
   meta_query = gst_buffer_get_meta_query (buf);
   if (meta_query) {
     sink->metaless_frame_count = 0;
-    conn = nnstreamer_query_server_accept (sink->server_data,
-        meta_query->client_id);
-    if (conn) {
-      if (!tensor_query_send_buffer (conn, GST_ELEMENT (sink), buf)) {
-        nns_logw ("Failed to send buffer to client, drop current buffer.");
+
+    ret = nns_edge_data_create (&data_h);
+    if (ret != NNS_EDGE_ERROR_NONE) {
+      nns_loge ("Failed to create data handle in server sink.");
+      return GST_FLOW_ERROR;
+    }
+
+    num_mems = gst_buffer_n_memory (buf);
+    for (i = 0; i < num_mems; i++) {
+      mem[i] = gst_buffer_peek_memory (buf, i);
+      if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) {
+        ml_loge ("Cannot map the %uth memory in gst-buffer.", i);
+        num_mems = i;
+        goto done;
       }
-    } else {
-      nns_logw ("Cannot get the client connection, drop current buffer.");
+      nns_edge_data_add (data_h, map[i].data, map[i].size, NULL);
     }
+
+    val = g_strdup_printf ("%ld", (long int) meta_query->client_id);
+    nns_edge_data_set_info (data_h, "client_id", val);
+    g_free (val);
+
+    nns_edge_respond (sink->edge_h, data_h);
+    nns_edge_data_destroy (data_h);
+    data_h = NULL;
   } else {
     nns_logw ("Cannot get tensor query meta. Drop buffers!\n");
     sink->metaless_frame_count++;
@@ -336,6 +297,9 @@ gst_tensor_query_serversink_render (GstBaseSink * bsink, GstBuffer * buf)
       return GST_FLOW_ERROR;
     }
   }
+done:
+  for (i = 0; i < num_mems; i++)
+    gst_memory_unmap (mem[i], &map[i]);
 
   return GST_FLOW_OK;
 }
index 07bba42..b88da3b 100644 (file)
@@ -17,9 +17,8 @@
 #include <gst/base/gstbasesink.h>
 #include <tensor_common.h>
 #include <tensor_meta.h>
-#include "tensor_query_common.h"
 #include "tensor_query_server.h"
-
+#include "tensor_query_common.h"
 G_BEGIN_DECLS
 
 #define GST_TYPE_TENSOR_QUERY_SERVERSINK \
@@ -45,14 +44,14 @@ struct _GstTensorQueryServerSink
   GstBaseSink element; /**< parent object */
   guint sink_id;
 
-  guint16 port;
-  gchar *host;
-  TensorQueryProtocol protocol;
   guint timeout;
-  query_server_handle server_data;
   query_server_info_handle server_info_h;
   gint metaless_frame_limit;
   gint metaless_frame_count;
+
+  nns_edge_protocol_e protocol;
+  edge_server_handle server_h;
+  nns_edge_h edge_h;
 };
 
 /**
index 080c5d9..9859abd 100644 (file)
 
 #include <tensor_typedef.h>
 #include <tensor_common.h>
-#include "tensor_query_common.h"
 #include "tensor_query_serversrc.h"
 #include "tensor_query_server.h"
+#include "tensor_query_common.h"
+#include "nnstreamer_util.h"
 
 GST_DEBUG_CATEGORY_STATIC (gst_tensor_query_serversrc_debug);
 #define GST_CAT_DEFAULT gst_tensor_query_serversrc_debug
@@ -44,8 +45,6 @@ enum
   PROP_PROTOCOL,
   PROP_TIMEOUT,
   PROP_OPERATION,
-  PROP_BROKER_HOST,
-  PROP_BROKER_PORT,
   PROP_ID,
   PROP_IS_LIVE
 };
@@ -61,7 +60,6 @@ static void gst_tensor_query_serversrc_get_property (GObject * object,
 static void gst_tensor_query_serversrc_finalize (GObject * object);
 
 static gboolean gst_tensor_query_serversrc_start (GstBaseSrc * bsrc);
-static gboolean gst_tensor_query_serversrc_stop (GstBaseSrc * bsrc);
 static GstFlowReturn gst_tensor_query_serversrc_create (GstPushSrc * psrc,
     GstBuffer ** buf);
 
@@ -103,18 +101,10 @@ gst_tensor_query_serversrc_class_init (GstTensorQueryServerSrcClass * klass)
           3600, QUERY_DEFAULT_TIMEOUT_SEC,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_OPERATION,
-      g_param_spec_string ("operation", "Operation",
-          "The main operation of the host and option if necessary. "
-          "(operation)/(optional topic for main operation).",
+      g_param_spec_string ("topic", "Topic",
+          "The main topic of the host and option if necessary. "
+          "(topic)/(optional topic for main topic).",
           "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_BROKER_HOST,
-      g_param_spec_string ("broker-host", "Broker Host",
-          "Broker host address to connect.", DEFAULT_BROKER_HOST,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_BROKER_PORT,
-      g_param_spec_uint ("broker-port", "Broker Port",
-          "Broker port to connect.", 0, 65535,
-          DEFAULT_BROKER_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_ID,
       g_param_spec_uint ("id", "ID",
           "ID for distinguishing query servers.", 0,
@@ -134,7 +124,6 @@ gst_tensor_query_serversrc_class_init (GstTensorQueryServerSrcClass * klass)
       "Samsung Electronics Co., Ltd.");
 
   gstbasesrc_class->start = gst_tensor_query_serversrc_start;
-  gstbasesrc_class->stop = gst_tensor_query_serversrc_stop;
   gstpushsrc_class->create = gst_tensor_query_serversrc_create;
 
   GST_DEBUG_CATEGORY_INIT (gst_tensor_query_serversrc_debug,
@@ -151,13 +140,14 @@ gst_tensor_query_serversrc_init (GstTensorQueryServerSrc * src)
   src->port = DEFAULT_PORT_SRC;
   src->protocol = DEFAULT_PROTOCOL;
   src->timeout = QUERY_DEFAULT_TIMEOUT_SEC;
-  src->operation = NULL;
-  src->broker_host = g_strdup (DEFAULT_BROKER_HOST);
-  src->broker_port = DEFAULT_BROKER_PORT;
+  src->topic = NULL;
+  src->srv_host = g_strdup (DEFAULT_HOST);
+  src->srv_port = DEFAULT_PORT_SRC;
   src->src_id = DEFAULT_SERVER_ID;
   tensor_query_hybrid_init (&src->hybrid_info, NULL, 0, TRUE);
-  src->server_data = nnstreamer_query_server_data_new ();
   src->configured = FALSE;
+  src->msg_queue = g_async_queue_new ();
+
   gst_base_src_set_format (GST_BASE_SRC (src), GST_FORMAT_TIME);
   /** set the timestamps on each buffer */
   gst_base_src_set_do_timestamp (GST_BASE_SRC (src), TRUE);
@@ -172,17 +162,20 @@ static void
 gst_tensor_query_serversrc_finalize (GObject * object)
 {
   GstTensorQueryServerSrc *src = GST_TENSOR_QUERY_SERVERSRC (object);
+  nns_edge_data_h data_h;
 
   g_free (src->host);
   src->host = NULL;
-  g_free (src->operation);
-  src->operation = NULL;
-  g_free (src->broker_host);
-  src->broker_host = NULL;
-  if (src->server_data) {
-    nnstreamer_query_server_data_free (src->server_data);
-    src->server_data = NULL;
+  g_free (src->topic);
+  src->topic = NULL;
+  g_free (src->srv_host);
+  src->srv_host = NULL;
+
+  while ((data_h = g_async_queue_try_pop (src->msg_queue))) {
+    nns_edge_data_destroy (data_h);
   }
+  g_async_queue_unref (src->msg_queue);
+
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
@@ -215,23 +208,11 @@ gst_tensor_query_serversrc_set_property (GObject * object, guint prop_id,
       break;
     case PROP_OPERATION:
       if (!g_value_get_string (value)) {
-        nns_logw
-            ("operation property cannot be NULL. Query-hybrid is disabled.");
+        nns_logw ("topic property cannot be NULL. Query-hybrid is disabled.");
         break;
       }
-      g_free (serversrc->operation);
-      serversrc->operation = g_value_dup_string (value);
-      break;
-    case PROP_BROKER_HOST:
-      if (!g_value_get_string (value)) {
-        nns_logw ("Broker host property cannot be NULL");
-        break;
-      }
-      g_free (serversrc->broker_host);
-      serversrc->broker_host = g_value_dup_string (value);
-      break;
-    case PROP_BROKER_PORT:
-      serversrc->broker_port = g_value_get_uint (value);
+      g_free (serversrc->topic);
+      serversrc->topic = g_value_dup_string (value);
       break;
     case PROP_ID:
       serversrc->src_id = g_value_get_uint (value);
@@ -269,13 +250,7 @@ gst_tensor_query_serversrc_get_property (GObject * object, guint prop_id,
       g_value_set_uint (value, serversrc->timeout);
       break;
     case PROP_OPERATION:
-      g_value_set_string (value, serversrc->operation);
-      break;
-    case PROP_BROKER_HOST:
-      g_value_set_string (value, serversrc->broker_host);
-      break;
-    case PROP_BROKER_PORT:
-      g_value_set_uint (value, serversrc->broker_port);
+      g_value_set_string (value, serversrc->topic);
       break;
     case PROP_ID:
       g_value_set_uint (value, serversrc->src_id);
@@ -291,61 +266,147 @@ gst_tensor_query_serversrc_get_property (GObject * object, guint prop_id,
 }
 
 /**
+ * @brief nnstreamer-edge event callback.
+ */
+static int
+_nns_edge_event_cb (nns_edge_event_h event_h, void *user_data)
+{
+  nns_edge_event_e event_type;
+  int ret = NNS_EDGE_ERROR_NONE;
+
+  GstTensorQueryServerSrc *src = (GstTensorQueryServerSrc *) user_data;
+  if (0 != nns_edge_event_get_type (event_h, &event_type)) {
+    nns_loge ("Failed to get event type!");
+    return NNS_EDGE_ERROR_UNKNOWN;
+  }
+
+  switch (event_type) {
+    case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
+    {
+      nns_edge_data_h data;
+
+      nns_edge_event_parse_new_data (event_h, &data);
+      g_async_queue_push (src->msg_queue, data);
+      break;
+    }
+    default:
+      break;
+  }
+
+  return ret;
+}
+
+/**
  * @brief start processing of query_serversrc, setting up the server
  */
 static gboolean
 gst_tensor_query_serversrc_start (GstBaseSrc * bsrc)
 {
   GstTensorQueryServerSrc *src = GST_TENSOR_QUERY_SERVERSRC (bsrc);
+  char *id_str = NULL;
+  char *port = NULL;
 
-  if (!src->server_data) {
-    nns_loge ("Server_data is NULL");
-    return FALSE;
-  }
+  id_str = g_strdup_printf ("%d", src->src_id);
+  src->server_h = gst_tensor_query_server_add_data (id_str);
+  g_free (id_str);
 
-  if (nnstreamer_query_server_init (src->server_data, src->protocol,
-          src->host, src->port, TRUE) != 0) {
-    nns_loge ("Failed to setup server");
-    return FALSE;
-  }
-
-  src->server_info_h = gst_tensor_query_server_add_data (src->src_id);
-  if (!gst_tensor_query_server_wait_sink (src->server_info_h)) {
-    nns_loge ("Failed to get server information from query server.");
-    return FALSE;
-  }
+  src->edge_h = gst_tensor_query_server_get_edge_handle (src->server_h);
+  nns_edge_set_info (src->edge_h, "IP", src->host);
+  port = g_strdup_printf ("%d", src->port);
+  nns_edge_set_info (src->edge_h, "PORT", port);
+  g_free (port);
 
   /** Publish query sever connection info */
-  if (src->operation) {
-    tensor_query_hybrid_set_node (&src->hybrid_info, src->host, src->port,
-        src->server_info_h);
-    tensor_query_hybrid_set_broker (&src->hybrid_info,
-        src->broker_host, src->broker_port);
+  if (src->topic) {
+    nns_edge_set_info (src->edge_h, "TOPIC", src->topic);
+    tensor_query_hybrid_set_node (&src->hybrid_info, src->srv_host,
+        src->srv_port, src->server_info_h);
+    tensor_query_hybrid_set_broker (&src->hybrid_info, src->host, src->port);
 
-    if (!tensor_query_hybrid_publish (&src->hybrid_info, src->operation)) {
+    if (!tensor_query_hybrid_publish (&src->hybrid_info, src->topic)) {
       nns_loge ("Failed to publish a topic.");
       return FALSE;
     }
   } else {
+    g_free (src->srv_host);
+    src->srv_host = g_strdup (src->host);
+    src->srv_port = src->port;
     nns_logi ("Query-hybrid feature is disabled.");
     nns_logi
-        ("Specify operation to register server to broker (e.g., operation=object_detection/mobilev3).");
+        ("Specify topic to register server to broker (e.g., topic=object_detection/mobilev3).");
+  }
+  nns_edge_set_event_callback (src->edge_h, _nns_edge_event_cb, src);
+
+  if (0 != nns_edge_start (src->edge_h, true)) {
+    nns_loge
+        ("Failed to start NNStreamer-edge. Please check server IP and port");
+    return FALSE;
+  }
+
+  if (!gst_tensor_query_server_wait_sink (src->server_h)) {
+    nns_loge ("Failed to get server information from query server.");
+    return FALSE;
   }
+
   return TRUE;
 }
 
 /**
- * @brief stop processing of query_serversrc
+ * @brief Get buffer from message queue.
  */
-static gboolean
-gst_tensor_query_serversrc_stop (GstBaseSrc * bsrc)
+static GstBuffer *
+_gst_tensor_query_serversrc_get_buffer (GstTensorQueryServerSrc * src)
 {
-  GstTensorQueryServerSrc *src = GST_TENSOR_QUERY_SERVERSRC (bsrc);
+  nns_edge_data_h data_h;
+  GstBuffer *buffer = NULL;
+  guint i, num_data;
+  GstMetaQuery *meta_query;
 
-  tensor_query_hybrid_close (&src->hybrid_info);
-  nnstreamer_query_server_data_free (src->server_data);
-  src->server_data = NULL;
-  return TRUE;
+  data_h = g_async_queue_pop (src->msg_queue);
+
+  if (!data_h) {
+    nns_loge ("Failed to get message from the server message queue");
+    return NULL;
+  }
+  buffer = gst_buffer_new ();
+
+  if (NNS_EDGE_ERROR_NONE != nns_edge_data_get_count (data_h, &num_data)) {
+    nns_loge ("Failed to get the number of memories of the edge data.");
+    gst_buffer_unref (buffer);
+    return NULL;
+  }
+  for (i = 0; i < num_data; i++) {
+    void *data = NULL;
+    size_t data_len = 0;
+    gpointer new_data;
+
+    nns_edge_data_get (data_h, i, &data, &data_len);
+    new_data = _g_memdup (data, data_len);
+
+    gst_buffer_append_memory (buffer,
+        gst_memory_new_wrapped (0, new_data, data_len, 0, data_len, new_data,
+            g_free));
+  }
+
+  if (buffer) {
+    meta_query = gst_buffer_add_meta_query (buffer);
+    if (meta_query) {
+      char *val;
+      int ret;
+
+      ret = nns_edge_data_get_info (data_h, "client_id", &val);
+      if (NNS_EDGE_ERROR_NONE != ret) {
+        gst_buffer_unref (buffer);
+        buffer = NULL;
+      } else {
+        meta_query->client_id = g_ascii_strtoll (val, NULL, 10);
+        g_free (val);
+      }
+    }
+  }
+  nns_edge_data_destroy (data_h);
+
+  return buffer;
 }
 
 /**
@@ -357,33 +418,27 @@ gst_tensor_query_serversrc_create (GstPushSrc * psrc, GstBuffer ** outbuf)
   GstTensorQueryServerSrc *src = GST_TENSOR_QUERY_SERVERSRC (psrc);
   GstBaseSrc *bsrc = GST_BASE_SRC (psrc);
 
-  if (!src->server_data) {
-    nns_loge ("Server data is NULL");
-    return GST_FLOW_ERROR;
-  }
-
   if (!src->configured) {
-    gchar *sink_caps_str, *src_caps_str;
+    gchar *src_caps_str;
 
     GstCaps *caps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (bsrc), NULL);
     if (gst_caps_is_fixed (caps)) {
       gst_base_src_set_caps (bsrc, caps);
     }
 
-    src->server_info_h = gst_tensor_query_server_get_data (src->src_id);
-    sink_caps_str =
-        gst_tensor_query_server_get_sink_caps_str (src->server_info_h);
     src_caps_str = gst_caps_to_string (caps);
-    nnstreamer_query_server_data_set_caps_str (src->server_data, src_caps_str,
-        sink_caps_str);
-
-    g_free (sink_caps_str);
+    nns_edge_set_info (src->edge_h, "CAPS", "@query_server_src_caps@");
+    nns_edge_set_info (src->edge_h, "CAPS", src_caps_str);
     g_free (src_caps_str);
     gst_caps_unref (caps);
     src->configured = TRUE;
   }
 
-  *outbuf = nnstreamer_query_server_get_buffer (src->server_data);
+  *outbuf = _gst_tensor_query_serversrc_get_buffer (src);
+  if (!outbuf) {
+    nns_loge ("Failed to get buffer to push to the tensor query serversrc.");
+    return GST_FLOW_ERROR;
+  }
 
   return GST_FLOW_OK;
 }
index f3616b0..2e703ec 100644 (file)
@@ -15,8 +15,6 @@
 
 #include <gst/base/gstbasesrc.h>
 #include <gst/base/gstpushsrc.h>
-#include "tensor_query_common.h"
-#include <nnstreamer_util.h>
 #include <tensor_meta.h>
 #include "tensor_query_hybrid.h"
 
@@ -48,17 +46,20 @@ struct _GstTensorQueryServerSrc
 
   guint16 port;
   gchar *host;
-  TensorQueryProtocol protocol;
+  guint16 srv_port;
+  gchar *srv_host;
   guint timeout;
 
   /* Query-hybrid feature */
-  gchar *operation; /**< Main operation such as 'object_detection' or 'image_segmentation' */
+  gchar *topic; /**< Main operation such as 'object_detection' or 'image_segmentation' */
   query_hybrid_info_s hybrid_info;
-  gchar *broker_host;
-  guint16 broker_port;
 
-  query_server_handle server_data; /* server data passed to common functions */
   query_server_info_handle server_info_h;
+
+  nns_edge_protocol_e protocol;
+  edge_server_handle server_h;
+  nns_edge_h edge_h;
+  GAsyncQueue *msg_queue;
 };
 
 /**
index bddab60..c541966 100644 (file)
@@ -68,12 +68,17 @@ NNSTREAMER_PLUGINS_SRCS := \
     $(NNSTREAMER_GST_HOME)/elements/gsttensor_split.c \
     $(NNSTREAMER_GST_HOME)/elements/gsttensor_transform.c \
     $(NNSTREAMER_GST_HOME)/tensor_filter/tensor_filter.c \
+
+# tensor-query element with nnstreamer-edge
+NNSTREAMER_QUERY_SRCS := \
     $(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_common.c \
     $(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_hybrid.c \
     $(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_client.c \
     $(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_serversink.c \
     $(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_serversrc.c \
-    $(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_server.c
+    $(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_server.c \
+    $(NNSTREAMER_GST_HOME)/tensor_query/nnstreamer_edge_common.c \
+    $(NNSTREAMER_GST_HOME)/tensor_query/nnstreamer_edge_internal.c
 
 # source AMC (Android MediaCodec)
 NNSTREAMER_SOURCE_AMC_SRCS := \
index 9ee4444..1e4a79a 100644 (file)
@@ -425,6 +425,10 @@ features = {
     'target': 'aitt',
     'project_args': { 'ENABLE_AITT' : 1 }
   },
+  'nnstreamer-edge-support': {
+    'target': 'nnstreamer-edge',
+    'project_args': { 'ENABLE_NNSTREAMER_EDGE': 1 }
+  },
   'mxnet-support': {
     'extra_deps': [ mxnet_dep ]
   }
index 04f1034..076bd98 100644 (file)
@@ -24,6 +24,7 @@ option('mqtt-support', type: 'feature', value: 'auto')
 option('tvm-support', type: 'feature', value: 'auto')
 option('query-hybrid-support', type: 'feature', value: 'auto')
 option('trix-engine-support', type: 'feature', value: 'auto')
+option('nnstreamer-edge-support', type: 'feature', value: 'auto')
 option('aitt-support', type: 'feature', value: 'auto')
 option('mxnet-support', type: 'feature', value: 'auto')
 
index 8fdf3ea..73f5ec6 100644 (file)
@@ -155,6 +155,7 @@ Requires: nnstreamer-core = %{version}-%{release}
 Requires: nnstreamer-configuration = %{version}-%{release}
 Requires: nnstreamer-single = %{version}-%{release}
 Recommends: nnstreamer-default-configuration = %{version}-%{release}
+BuildRequires: nnstreamer-edge-devel
 
 ## Define build requirements ##
 BuildRequires: gstreamer-devel