[Query] Implement mqtt-hybrid
authorgichan <gichan2.jang@samsung.com>
Fri, 22 Jul 2022 01:47:58 +0000 (10:47 +0900)
committerMyungJoo Ham <myungjoo.ham@samsung.com>
Thu, 28 Jul 2022 23:55:11 +0000 (08:55 +0900)
Implement mqtt-hybrid using nnstreamer-edge lib.

Signed-off-by: gichan <gichan2.jang@samsung.com>
15 files changed:
gst/nnstreamer/meson.build
gst/nnstreamer/tensor_query/meson.build
gst/nnstreamer/tensor_query/tensor_query_client.c
gst/nnstreamer/tensor_query/tensor_query_client.h
gst/nnstreamer/tensor_query/tensor_query_hybrid.c [deleted file]
gst/nnstreamer/tensor_query/tensor_query_hybrid.h [deleted file]
gst/nnstreamer/tensor_query/tensor_query_server.c
gst/nnstreamer/tensor_query/tensor_query_server.h
gst/nnstreamer/tensor_query/tensor_query_serversink.c
gst/nnstreamer/tensor_query/tensor_query_serversink.h
gst/nnstreamer/tensor_query/tensor_query_serversrc.c
gst/nnstreamer/tensor_query/tensor_query_serversrc.h
jni/nnstreamer.mk
meson.build
tests/nnstreamer_query/runTest.sh

index b7a1e41..4a2d4cc 100644 (file)
@@ -73,10 +73,6 @@ foreach p : nnst_plugins
   subdir(p)
 endforeach
 
-if query_hybrid_support_is_available
-  nnstreamer_deps += query_hybrid_support_deps
-endif
-
 subdir('include')
 
 # Private header for sub-plugins and native APIs
index 1830053..af1ed6e 100644 (file)
@@ -6,10 +6,6 @@ tensor_query_sources = [
   'tensor_query_server.c',
 ]
 
-if query_hybrid_support_is_available
-  tensor_query_sources += 'tensor_query_hybrid.c'
-endif
-
 foreach s : tensor_query_sources
   nnstreamer_sources += join_paths(meson.current_source_dir(), s)
 endforeach
index ebf7110..378eadb 100644 (file)
 #include <gio/gio.h>
 #include <glib.h>
 #include <string.h>
-#include "nnstreamer-edge.h"
 #include "tensor_query_common.h"
 
 #include <stdio.h>
 #include <stdlib.h>
 #include <unistd.h>
-#include <arpa/inet.h>
-#include <ifaddrs.h>
 
 /**
  * @brief Macro for debug mode.
@@ -44,7 +41,7 @@ enum
   PROP_HOST,
   PROP_PORT,
   PROP_PROTOCOL,
-  PROP_OPERATION,
+  PROP_TOPIC,
   PROP_SILENT,
 };
 
@@ -124,9 +121,9 @@ gst_tensor_query_client_class_init (GstTensorQueryClientClass * klass)
           "The network protocol to establish connections between client and server.",
           GST_TYPE_QUERY_PROTOCOL, DEFAULT_PROTOCOL,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_OPERATION,
-      g_param_spec_string ("operation", "Operation",
-          "The main operation of the host.",
+  g_object_class_install_property (gobject_class, PROP_TOPIC,
+      g_param_spec_string ("topic", "Topic",
+          "The main topic of the host.",
           "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   gst_element_class_add_pad_template (gstelement_class,
@@ -144,40 +141,6 @@ gst_tensor_query_client_class_init (GstTensorQueryClientClass * klass)
 }
 
 /**
- * @brief Get IP address
- */
-static gchar *
-_get_ip_address (void)
-{
-  struct ifaddrs *addrs, *run;
-  gchar *ret = NULL;
-
-  if (0 != getifaddrs (&addrs))
-    goto done;
-  run = addrs;
-
-  while (run) {
-    if (run->ifa_addr && run->ifa_addr->sa_family == AF_INET) {
-      struct sockaddr_in *pAddr = (struct sockaddr_in *) run->ifa_addr;
-
-      if (NULL != strstr (run->ifa_name, "en") ||
-          NULL != strstr (run->ifa_name, "et")) {
-        g_free (ret);
-        ret = g_strdup (inet_ntoa (pAddr->sin_addr));
-      }
-    }
-    run = run->ifa_next;
-  }
-  freeifaddrs (addrs);
-
-done:
-  if (NULL == ret)
-    ret = g_strdup ("localhost");
-
-  return ret;
-}
-
-/**
  * @brief initialize the new element
  */
 static void
@@ -202,14 +165,9 @@ gst_tensor_query_client_init (GstTensorQueryClient * self)
   self->protocol = DEFAULT_PROTOCOL;
   self->host = g_strdup (TCP_DEFAULT_HOST);
   self->port = TCP_DEFAULT_SRC_PORT;
-  self->srv_host = g_strdup (TCP_DEFAULT_HOST);
-  self->srv_port = TCP_DEFAULT_SRC_PORT;
-  self->operation = NULL;
+  self->topic = NULL;
   self->in_caps_str = NULL;
   self->msg_queue = g_async_queue_new ();
-
-  tensor_query_hybrid_init (&self->hybrid_info, NULL, 0, FALSE);
-  nns_edge_create_handle ("TEMP_ID", "TEMP_CLIENT_TOPIC", &self->edge_h);
 }
 
 /**
@@ -223,19 +181,20 @@ gst_tensor_query_client_finalize (GObject * object)
 
   g_free (self->host);
   self->host = NULL;
-  g_free (self->srv_host);
-  self->srv_host = NULL;
-  g_free (self->operation);
-  self->operation = NULL;
+  g_free (self->topic);
+  self->topic = NULL;
   g_free (self->in_caps_str);
   self->in_caps_str = NULL;
 
   while ((data_h = g_async_queue_try_pop (self->msg_queue))) {
     nns_edge_data_destroy (data_h);
   }
-  g_async_queue_unref (self->msg_queue);
 
-  tensor_query_hybrid_close (&self->hybrid_info);
+  if (self->msg_queue) {
+    g_async_queue_unref (self->msg_queue);
+    self->msg_queue = NULL;
+  }
+
   if (self->edge_h) {
     nns_edge_release_handle (self->edge_h);
     self->edge_h = NULL;
@@ -266,21 +225,17 @@ gst_tensor_query_client_set_property (GObject * object, guint prop_id,
       self->port = g_value_get_uint (value);
       break;
     case PROP_PROTOCOL:
-    {
-        /** @todo expand when other protocols are ready */
-      nns_edge_protocol_e protocol = g_value_get_enum (value);
-      if (protocol == NNS_EDGE_PROTOCOL_TCP)
-        self->protocol = protocol;
-    }
+      self->protocol = g_value_get_enum (value);
       break;
-    case PROP_OPERATION:
+    case PROP_TOPIC:
       if (!g_value_get_string (value)) {
-        nns_logw
-            ("Operation property cannot be NULL. Query-hybrid is disabled.");
+        nns_logw ("Topic property cannot be NULL. Query-hybrid is disabled.");
         break;
       }
-      g_free (self->operation);
-      self->operation = g_value_dup_string (value);
+      g_free (self->topic);
+      self->topic = g_value_dup_string (value);
+      if (NNS_EDGE_PROTOCOL_TCP == self->protocol)
+        self->protocol = NNS_EDGE_PROTOCOL_MQTT;
       break;
     case PROP_SILENT:
       self->silent = g_value_get_boolean (value);
@@ -310,9 +265,8 @@ gst_tensor_query_client_get_property (GObject * object, guint prop_id,
     case PROP_PROTOCOL:
       g_value_set_enum (value, self->protocol);
       break;
-    case PROP_OPERATION:
-      g_value_set_string (value, self->operation);
-      break;
+    case PROP_TOPIC:
+      g_value_set_string (value, self->topic);
       break;
     case PROP_SILENT:
       g_value_set_boolean (value, self->silent);
@@ -358,44 +312,23 @@ gst_tensor_query_client_update_caps (GstTensorQueryClient * self,
 }
 
 /**
- * @brief Copy server info.
- */
-static void
-_copy_srv_info (GstTensorQueryClient * self, query_server_info_s * server)
-{
-  g_free (self->srv_host);
-  self->srv_host = g_strdup (server->src.host);
-  self->srv_port = server->src.port;
-}
-
-/**
  * @brief Retry to connect to available server.
  */
 static gboolean
 _client_retry_connection (GstTensorQueryClient * self)
 {
-  gboolean ret = FALSE;
-  query_server_info_s *server = NULL;
-
-  g_return_val_if_fail (self->operation, FALSE);
-  nns_logd ("Retry to connect to available server.");
-
-  while ((server = tensor_query_hybrid_get_server_info (&self->hybrid_info))) {
-    nns_edge_disconnect (self->edge_h);
-
-    _copy_srv_info (self, server);
-    tensor_query_hybrid_free_server_info (server);
+  if (NNS_EDGE_ERROR_NONE != nns_edge_disconnect (self->edge_h)) {
+    nns_loge ("Failed to retry connection, disconnection failure");
+    return FALSE;
+  }
 
-    if (nns_edge_connect (self->edge_h, NNS_EDGE_PROTOCOL_TCP, self->srv_host,
-            self->srv_port)) {
-      nns_logi ("Connected to new server: %s:%u.", self->srv_host,
-          self->srv_port);
-      ret = TRUE;
-      break;
-    }
+  if (NNS_EDGE_ERROR_NONE != nns_edge_connect (self->edge_h,
+          self->protocol, self->host, self->port)) {
+    nns_loge ("Failed to retry connection, connection failure");
+    return FALSE;
   }
 
-  return ret;
+  return TRUE;
 }
 
 /**
@@ -404,11 +337,17 @@ _client_retry_connection (GstTensorQueryClient * self)
 static gchar *
 _nns_edge_parse_caps (gchar * caps_str, gboolean is_src)
 {
-  gchar **strv = g_strsplit (caps_str, "@", -1);
-  gint num = g_strv_length (strv), i;
+  gchar **strv;
+  gint num, i;
   gchar *find_key = NULL;
   gchar *ret_str = NULL;
 
+  if (!caps_str)
+    return NULL;
+
+  strv = g_strsplit (caps_str, "@", -1);
+  num = g_strv_length (strv);
+
   find_key =
       is_src ==
       TRUE ? g_strdup ("query_server_src_caps") :
@@ -519,7 +458,7 @@ gst_tensor_query_client_sink_event (GstPad * pad,
 {
   GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (parent);
   gboolean ret = TRUE;
-  gchar *ip, *prev_caps_str, *new_caps_str;
+  gchar *prev_caps_str, *new_caps_str;
 
   GST_DEBUG_OBJECT (self, "Received %s event: %" GST_PTR_FORMAT,
       GST_EVENT_TYPE_NAME (event), event);
@@ -528,36 +467,17 @@ gst_tensor_query_client_sink_event (GstPad * pad,
     case GST_EVENT_CAPS:
     {
       GstCaps *caps;
-      gst_event_parse_caps (event, &caps);
+      gchar *_topic = NULL, *protocol_str = NULL;
 
-      /** Subscribe server info from broker */
-      if (self->operation) {
-        query_server_info_s *server;
-
-        tensor_query_hybrid_set_broker (&self->hybrid_info,
-            self->host, self->port);
-
-        if (!tensor_query_hybrid_subscribe (&self->hybrid_info,
-                self->operation)) {
-          nns_loge ("Failed to subscribe a topic.");
-          gst_event_unref (event);
-          return FALSE;
-        }
+      if (self->topic)
+        _topic = g_strdup (self->topic);
+      else
+        _topic = g_strdup ("TCP_DIRECT");
 
-        server = tensor_query_hybrid_get_server_info (&self->hybrid_info);
-        if (server) {
-          _copy_srv_info (self, server);
-          tensor_query_hybrid_free_server_info (server);
-        }
-      } else {
-        nns_logi ("Query-hybrid feature is disabled.");
-        nns_logi
-            ("Specify operation to subscribe to the available broker (e.g., operation=object_detection).");
-        g_free (self->srv_host);
-        self->srv_host = g_strdup (self->host);
-        self->srv_port = self->port;
-      }
+      nns_edge_create_handle ("TEMP_ID", _topic, &self->edge_h);
+      g_free (_topic);
 
+      gst_event_parse_caps (event, &caps);
       g_free (self->in_caps_str);
       self->in_caps_str = gst_caps_to_string (caps);
       nns_edge_get_info (self->edge_h, "CAPS", &prev_caps_str);
@@ -567,13 +487,12 @@ gst_tensor_query_client_sink_event (GstPad * pad,
       nns_edge_set_info (self->edge_h, "CAPS", new_caps_str);
       g_free (prev_caps_str);
       g_free (new_caps_str);
+      protocol_str = g_strdup_printf ("%d", self->protocol);
+      nns_edge_set_info (self->edge_h, "PROTOCOL", protocol_str);
+      g_free (protocol_str);
 
-      nns_edge_set_event_callback (self->edge_h, _nns_edge_event_cb, self);
 
-      ip = _get_ip_address ();
-      nns_edge_set_info (self->edge_h, "IP", ip);
-      nns_edge_set_info (self->edge_h, "PORT", "0");
-      g_free (ip);
+      nns_edge_set_event_callback (self->edge_h, _nns_edge_event_cb, self);
 
       if (0 != nns_edge_start (self->edge_h, false)) {
         nns_loge
@@ -581,8 +500,8 @@ gst_tensor_query_client_sink_event (GstPad * pad,
         return FALSE;
       }
 
-      if (0 != nns_edge_connect (self->edge_h, NNS_EDGE_PROTOCOL_TCP,
-              self->srv_host, self->srv_port)) {
+      if (0 != nns_edge_connect (self->edge_h, self->protocol,
+              self->host, self->port)) {
         nns_loge ("Failed to connect to edge server!");
         return FALSE;
       }
@@ -717,7 +636,7 @@ gst_tensor_query_client_chain (GstPad * pad,
   goto done;
 
 retry:
-  if (!self->operation || !_client_retry_connection (self)) {
+  if (!self->topic || !_client_retry_connection (self)) {
     nns_loge ("Failed to retry connection");
     res = GST_FLOW_ERROR;
   }
index 175fa13..be1d556 100644 (file)
@@ -16,7 +16,7 @@
 #include <gst/gst.h>
 #include <gio/gio.h>
 #include <tensor_common.h>
-#include "tensor_query_hybrid.h"
+#include "nnstreamer-edge.h"
 
 G_BEGIN_DECLS
 
@@ -48,8 +48,7 @@ struct _GstTensorQueryClient
   gchar *in_caps_str;
 
   /* Query-hybrid feature */
-  gchar *operation; /**< Main operation such as 'object_detection' or 'image_segmentation' */
-  query_hybrid_info_s hybrid_info;
+  gchar *topic; /**< Main operation such as 'object_detection' or 'image_segmentation' */
   gchar *host;
   guint16 port;
 
diff --git a/gst/nnstreamer/tensor_query/tensor_query_hybrid.c b/gst/nnstreamer/tensor_query/tensor_query_hybrid.c
deleted file mode 100644 (file)
index 0fc6e3f..0000000
+++ /dev/null
@@ -1,375 +0,0 @@
-/* SPDX-License-Identifier: LGPL-2.1-only */
-/**
- * Copyright (C) 2021 Gichan Jang <gichan2.jang@samsung.com>
- *
- * @file   tensor_query_hybrid.c
- * @date   12 Oct 2021
- * @brief  Utility functions for tensor-query hybrid feature
- * @see    https://github.com/nnstreamer/nnstreamer
- * @author Gichan Jang <gichan2.jang@samsung.com>
- * @bug    No known bugs except for NYI items
- */
-
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-
-#include <string.h>
-#include "tensor_query_hybrid.h"
-#include "nnstreamer_log.h"
-
-#if defined(ENABLE_QUERY_HYBRID)
-#include <nnsquery.h>
-
-/**
- * @brief Internal function to free node info.
- */
-static void
-_free_node_info (query_node_info_s * node)
-{
-  if (node) {
-    g_free (node->host);
-    node->host = NULL;
-  }
-}
-
-/**
- * @brief Internal function to free server info.
- */
-static void
-_free_server_info (query_server_info_s * server)
-{
-  if (server) {
-    _free_node_info (&server->src);
-    _free_node_info (&server->sink);
-  }
-}
-
-/**
- * @brief Parse received message.
- */
-static void
-_broker_parse_message (query_hybrid_info_s * info, gchar * payload)
-{
-  gchar **splits;
-  query_server_info_s *server;
-
-  server = g_try_new0 (query_server_info_s, 1);
-  if (!server) {
-    nns_loge ("[Query-hybrid] Failed to allocate query server info.");
-    return;
-  }
-
-  splits = g_strsplit (payload, "/", -1);
-  server->src.host = g_strdup (splits[0]);
-  server->src.port = g_ascii_strtoull (splits[1], NULL, 10);
-  server->sink.host = g_strdup (splits[2]);
-  server->sink.port = g_ascii_strtoull (splits[3], NULL, 10);
-
-  nns_logd ("[Query-hybrid] Parsed info: src[%s:%u] sink[%s:%u]",
-      server->src.host, server->src.port, server->sink.host, server->sink.port);
-
-  g_async_queue_push (info->server_list, server);
-  g_strfreev (splits);
-}
-
-/**
- * @brief State change callback.
- */
-static void
-_broker_state_change_cb (void *user_data, query_mqtt_state_t state)
-{
-  query_hybrid_info_s *info = (query_hybrid_info_s *) user_data;
-
-  info->state = (gint) state;
-  nns_logd ("[Query-hybrid] Broker state changed to %d.", state);
-}
-
-/**
- * @brief Raw message received callback.
- */
-static void
-_broker_msg_received_cb (const gchar * topic, msg_data * msg, gint msg_len,
-    void *user_data)
-{
-  query_hybrid_info_s *info = (query_hybrid_info_s *) user_data;
-  gchar *payload;
-  gint size;
-
-  if (msg_len <= 0) {
-    nns_logd ("[Query-hybrid] There is no data to receive from broker.");
-    return;
-  }
-
-  size = msg_len - sizeof (msg->type);
-  payload = (gchar *) g_malloc0 (size + 1);
-  memcpy (payload, msg->payload, size);
-  payload[size] = '\0';
-
-  nns_logd ("[Query-hybrid] Received topic: %s (size: %d)\n", topic, msg_len);
-  nns_logd ("[Query-hybrid] payload: %s", payload);
-
-  _broker_parse_message (info, payload);
-  g_free (payload);
-}
-
-/**
- * @brief Connect to broker.
- */
-static gboolean
-_broker_connect (query_hybrid_info_s * info)
-{
-  gchar *host, *port;
-  gint err;
-
-  host = g_strdup (info->broker.host);
-  port = g_strdup_printf ("%u", info->broker.port);
-
-  err = query_open_connection (&info->handle, host, port,
-      _broker_state_change_cb, info);
-  if (err == 0) {
-    /* Wait until connection is established. */
-    while (MQTT_CONNECTED != info->state) {
-      g_usleep (10000);
-    }
-  }
-
-  g_free (host);
-  g_free (port);
-
-  if (err != 0) {
-    nns_loge ("[Query-hybrid] Failed to connect broker (error: %d).", err);
-    return FALSE;
-  }
-
-  return TRUE;
-}
-
-/**
- * @brief Initialize query-hybrid info.
- */
-void
-tensor_query_hybrid_init (query_hybrid_info_s * info,
-    const gchar * host, const guint16 port, gboolean is_server)
-{
-  g_return_if_fail (info != NULL);
-
-  memset (info, 0, sizeof (query_hybrid_info_s));
-  info->node.host = g_strdup (host);
-  info->node.port = port;
-  info->is_server = is_server;
-  info->state = MQTT_DISCONNECTED;
-
-  if (!is_server) {
-    info->server_list = g_async_queue_new ();
-  }
-}
-
-/**
- * @brief Close connection and free query-hybrid info.
- */
-void
-tensor_query_hybrid_close (query_hybrid_info_s * info)
-{
-  g_return_if_fail (info != NULL);
-
-  if (info->handle) {
-    if (info->topic) {
-      query_clear_retained_topic (info->handle, info->topic);
-      g_free (info->topic);
-      info->topic = NULL;
-    }
-
-    if (0 != query_close_connection (info->handle)) {
-      nns_loge ("[Query-hybrid] Failed to close broker connection.");
-    }
-
-    info->handle = NULL;
-  }
-
-  if (info->server_list) {
-    gpointer data;
-
-    while ((data = g_async_queue_try_pop (info->server_list))) {
-      tensor_query_hybrid_free_server_info (data);
-    }
-
-    g_async_queue_unref (info->server_list);
-    info->server_list = NULL;
-  }
-
-  _free_node_info (&info->broker);
-  _free_node_info (&info->node);
-}
-
-/**
- * @brief Set current node info.
- */
-void
-tensor_query_hybrid_set_node (query_hybrid_info_s * info, const gchar * host,
-    const guint16 port, query_server_info_handle server_info_h)
-{
-  g_return_if_fail (info != NULL);
-
-  _free_node_info (&info->node);
-  info->node.host = g_strdup (host);
-  info->node.port = port;
-  info->node.server_info_h = server_info_h;
-}
-
-/**
- * @brief Set broker info.
- */
-void
-tensor_query_hybrid_set_broker (query_hybrid_info_s * info,
-    const gchar * host, const guint16 port)
-{
-  g_return_if_fail (info != NULL);
-
-  _free_node_info (&info->broker);
-  info->broker.host = g_strdup (host);
-  info->broker.port = port;
-}
-
-/**
- * @brief Get server info from node list.
- * @return Server node info. Caller should release server info using tensor_query_hybrid_free_server_info().
- */
-query_server_info_s *
-tensor_query_hybrid_get_server_info (query_hybrid_info_s * info)
-{
-  query_server_info_s *server = NULL;
-
-  g_return_val_if_fail (info != NULL, FALSE);
-
-  if (info->is_server) {
-    nns_logw ("[Query-hybrid] It is server node, cannot get the server info.");
-  } else {
-    /**
-     * @todo Need to update server selection policy. Now, use first received info.
-     */
-    server = (query_server_info_s *) g_async_queue_try_pop (info->server_list);
-  }
-
-  return server;
-}
-
-/**
- * @brief Free server info.
- */
-void
-tensor_query_hybrid_free_server_info (query_server_info_s * server)
-{
-  if (server) {
-    _free_server_info (server);
-    g_free (server);
-  }
-}
-
-/**
- * @brief Connect to broker and publish topic.
- */
-gboolean
-tensor_query_hybrid_publish (query_hybrid_info_s * info,
-    const gchar * operation)
-{
-  gchar *device, *topic, *msg;
-  gchar *sink_host = NULL;
-  guint16 sink_port;
-  gint err;
-
-  g_return_val_if_fail (info != NULL, FALSE);
-  g_return_val_if_fail (operation != NULL, FALSE);
-
-  if (!info->is_server) {
-    nns_logw ("[Query-hybrid] It is client node, cannot publish topic.");
-    return FALSE;
-  }
-
-  if (!info->node.host || info->node.port == 0) {
-    nns_loge ("[Query-hybrid] Invalid node info, cannot publish topic.");
-    return FALSE;
-  }
-
-  if (!_broker_connect (info)) {
-    nns_loge ("[Query-hybrid] Failed to publish, connection failed.");
-    return FALSE;
-  }
-
-  /**
-   * @todo Set unique device name.
-   * Device name should be unique. Consider using MAC address later.
-   * Now, use IP and port number temporarily.
-   */
-  device = g_strdup_printf ("device-%s-%u", info->node.host, info->node.port);
-  topic = g_strdup_printf ("edge/inference/%s/%s/", device, operation);
-  nns_logd ("[Query-hybrid] Query server topic: %s", topic);
-  g_free (device);
-
-  sink_host = gst_tensor_query_server_get_sink_host (info->node.server_info_h);
-  if (!sink_host) {
-    nns_logw
-        ("[Query-hybrid] Sink host is not given. Use default host (localhost).");
-    sink_host = g_strdup ("localhost");
-  }
-  sink_port = gst_tensor_query_server_get_sink_port (info->node.server_info_h);
-  if (0 == sink_port) {
-    nns_logw
-        ("[Query-hybrid] Sink port is not given. Use default port (3000).");
-    sink_port = 3000;
-  }
-
-  msg = g_strdup_printf ("%s/%u/%s/%u", info->node.host, info->node.port,
-      sink_host, sink_port);
-  g_free (sink_host);
-  nns_logd ("[Query-hybrid] Query server source msg: %s", msg);
-
-  err = query_publish_raw_data (info->handle, topic, msg, strlen (msg), TRUE);
-  g_free (msg);
-
-  if (err != 0) {
-    nns_loge ("[Query-hybrid] Failed to publish raw data (error: %d).", err);
-    g_free (topic);
-    return FALSE;
-  }
-
-  info->topic = topic;
-  return TRUE;
-}
-
-/**
- * @brief Connect to broker and subcribe topic.
- */
-gboolean
-tensor_query_hybrid_subscribe (query_hybrid_info_s * info,
-    const gchar * operation)
-{
-  gchar *topic;
-  gint err;
-
-  g_return_val_if_fail (info != NULL, FALSE);
-  g_return_val_if_fail (operation != NULL, FALSE);
-
-  if (info->is_server) {
-    nns_logw ("[Query-hybrid] It is server node, cannot subcribe topic.");
-    return FALSE;
-  }
-
-  if (!_broker_connect (info)) {
-    nns_loge ("[Query-hybrid] Failed to subscribe, connection failed.");
-    return FALSE;
-  }
-
-  topic = g_strdup_printf ("edge/inference/+/%s/#", operation);
-  err = query_subscribe_topic (info->handle, topic,
-      _broker_msg_received_cb, info);
-  g_free (topic);
-
-  if (err != 0) {
-    nns_loge ("[Query-hybrid] Failed to subscribe topic (error: %d).", err);
-    return FALSE;
-  }
-
-  return TRUE;
-}
-#endif /* ENABLE_QUERY_HYBRID */
diff --git a/gst/nnstreamer/tensor_query/tensor_query_hybrid.h b/gst/nnstreamer/tensor_query/tensor_query_hybrid.h
deleted file mode 100644 (file)
index 723b1b8..0000000
+++ /dev/null
@@ -1,120 +0,0 @@
-/* SPDX-License-Identifier: LGPL-2.1-only */
-/**
- * Copyright (C) 2021 Gichan Jang <gichan2.jang@samsung.com>
- *
- * @file   tensor_query_hybrid.h
- * @date   12 Oct 2021
- * @brief  Utility functions for tensor-query hybrid feature
- * @see    https://github.com/nnstreamer/nnstreamer
- * @author Gichan Jang <gichan2.jang@samsung.com>
- * @bug    No known bugs except for NYI items
- */
-
-#ifndef __TENSOR_QUERY_HYBRID_H__
-#define __TENSOR_QUERY_HYBRID_H__
-
-#include <glib.h>
-#include "tensor_query_server.h"
-
-#define DEFAULT_BROKER_HOST "tcp://localhost"
-#define DEFAULT_BROKER_PORT 1883
-
-G_BEGIN_DECLS
-
-/**
- * @brief Data structure for node info.
- */
-typedef struct
-{
-  gchar *host;
-  guint16 port;
-  query_server_info_handle server_info_h;
-} query_node_info_s;
-
-/**
- * @brief Data structure for server info.
- */
-typedef struct
-{
-  query_node_info_s src;
-  query_node_info_s sink;
-} query_server_info_s;
-
-/**
- * @brief Data structure for query-hybrid feature.
- */
-typedef struct
-{
-  query_node_info_s node;
-  query_node_info_s broker;
-  gboolean is_server;
-  GAsyncQueue *server_list;
-  gpointer handle;
-  gint state;
-  gchar *topic;
-} query_hybrid_info_s;
-
-#if defined(ENABLE_QUERY_HYBRID)
-/**
- * @brief Initialize query-hybrid info.
- */
-extern void
-tensor_query_hybrid_init (query_hybrid_info_s * info, const gchar * host, const guint16 port, gboolean is_server);
-
-/**
- * @brief Close connection and free query-hybrid info.
- */
-extern void
-tensor_query_hybrid_close (query_hybrid_info_s * info);
-
-/**
- * @brief Set current node info.
- */
-extern void
-tensor_query_hybrid_set_node (query_hybrid_info_s * info, const gchar * host, const guint16 port, query_server_info_handle server_info_h);
-
-/**
- * @brief Set broker info.
- */
-extern void
-tensor_query_hybrid_set_broker (query_hybrid_info_s * info, const gchar * host, const guint16 port);
-
-/**
- * @brief Get server info from node list.
- * @return Server node info. Caller should release server info using tensor_query_hybrid_free_server_info().
- */
-extern query_server_info_s *
-tensor_query_hybrid_get_server_info (query_hybrid_info_s * info);
-
-/**
- * @brief Free server info.
- */
-extern void
-tensor_query_hybrid_free_server_info (query_server_info_s * server);
-
-/**
- * @brief Connect to broker and publish topic.
- * @note Before calling this function, user should set broker info using tensor_query_hybrid_set_broker().
- */
-extern gboolean
-tensor_query_hybrid_publish (query_hybrid_info_s * info, const gchar * operation);
-
-/**
- * @brief Connect to broker and subcribe topic.
- * @note Before calling this function, user should set broker info using tensor_query_hybrid_set_broker().
- */
-extern gboolean
-tensor_query_hybrid_subscribe (query_hybrid_info_s * info, const gchar * operation);
-#else
-#define tensor_query_hybrid_init(...)
-#define tensor_query_hybrid_close(...)
-#define tensor_query_hybrid_set_node(...)
-#define tensor_query_hybrid_set_broker(...)
-#define tensor_query_hybrid_get_server_info(...) NULL
-#define tensor_query_hybrid_free_server_info(...)
-#define tensor_query_hybrid_publish(...) FALSE
-#define tensor_query_hybrid_subscribe(...) FALSE
-#endif /* ENABLE_QUERY_HYBRID */
-
-G_END_DECLS
-#endif /* __TENSOR_QUERY_HYBRID_H__ */
index df2fd4a..d006e0a 100644 (file)
@@ -17,7 +17,6 @@
 #include "tensor_query_server.h"
 #include <tensor_typedef.h>
 #include <tensor_common.h>
-#include "nnstreamer-edge.h"
 
 /**
  * @brief mutex for tensor-query server table.
@@ -51,10 +50,11 @@ gst_tensor_query_server_get_handle (char *id)
  * @brief Add nnstreamer edge server handle into hash table.
  */
 edge_server_handle
-gst_tensor_query_server_add_data (char *id)
+gst_tensor_query_server_add_data (char *id, const gchar * topic)
 {
   GstTensorQueryServer *data = NULL;
   int ret;
+  gchar *_topic = NULL;
 
   data = gst_tensor_query_server_get_handle (id);
 
@@ -73,8 +73,13 @@ gst_tensor_query_server_add_data (char *id)
   data->id = id;
   data->configured = FALSE;
 
-  ret = nns_edge_create_handle (id, "TEMP_SERVER_TOPIC", &data->edge_h);
-  if (ret != NNS_EDGE_ERROR_NONE) {
+  if (topic)
+    _topic = g_strdup (topic);
+  else
+    _topic = g_strdup ("TCP_DIRECT");
+
+  ret = nns_edge_create_handle (id, _topic, &data->edge_h);
+  if (NNS_EDGE_ERROR_NONE != ret) {
     GST_ERROR ("Failed to get nnstreamer edge handle.");
     gst_tensor_query_server_remove_data (data);
     return NULL;
@@ -84,6 +89,7 @@ gst_tensor_query_server_add_data (char *id)
   g_hash_table_insert (_qs_table, g_strdup (id), data);
   G_UNLOCK (query_server_table);
 
+  g_free (_topic);
   return data;
 }
 
index 492fde6..bfd11e1 100644 (file)
@@ -22,7 +22,6 @@ G_BEGIN_DECLS
 
 #define DEFAULT_SERVER_ID 0
 #define DEFAULT_QUERY_INFO_TIMEOUT 5
-typedef void * query_server_info_handle;
 typedef void * edge_server_handle;
 
 /**
@@ -48,7 +47,7 @@ gst_tensor_query_server_get_handle (char *id);
  * @brief Add GstTensorQueryServer.
  */
 edge_server_handle
-gst_tensor_query_server_add_data (char *id);
+gst_tensor_query_server_add_data (char *id, const gchar *topic);
 
 /**
  * @brief Remove GstTensorQueryServer.
index 9c97f05..c76c194 100644 (file)
@@ -203,7 +203,7 @@ gst_tensor_query_serversink_start (GstBaseSink * bsink)
   gchar *id_str = NULL;
 
   id_str = g_strdup_printf ("%u", sink->sink_id);
-  sink->server_h = gst_tensor_query_server_add_data (id_str);
+  sink->server_h = gst_tensor_query_server_add_data (id_str, NULL);
   g_free (id_str);
 
   caps = gst_pad_get_current_caps (GST_BASE_SINK_PAD (bsink));
index b88da3b..de6722e 100644 (file)
@@ -45,7 +45,6 @@ struct _GstTensorQueryServerSink
   guint sink_id;
 
   guint timeout;
-  query_server_info_handle server_info_h;
   gint metaless_frame_limit;
   gint metaless_frame_count;
 
index ff30f6a..05b1374 100644 (file)
@@ -17,7 +17,6 @@
 #include <tensor_typedef.h>
 #include <tensor_common.h>
 #include "tensor_query_serversrc.h"
-#include "tensor_query_server.h"
 #include "tensor_query_common.h"
 #include "nnstreamer_util.h"
 
@@ -44,7 +43,7 @@ enum
   PROP_PORT,
   PROP_PROTOCOL,
   PROP_TIMEOUT,
-  PROP_OPERATION,
+  PROP_TOPIC,
   PROP_ID,
   PROP_IS_LIVE
 };
@@ -100,7 +99,7 @@ gst_tensor_query_serversrc_class_init (GstTensorQueryServerSrcClass * klass)
           "The timeout as seconds to maintain connection", 0,
           3600, QUERY_DEFAULT_TIMEOUT_SEC,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_OPERATION,
+  g_object_class_install_property (gobject_class, PROP_TOPIC,
       g_param_spec_string ("topic", "Topic",
           "The main topic of the host and option if necessary. "
           "(topic)/(optional topic for main topic).",
@@ -141,10 +140,7 @@ gst_tensor_query_serversrc_init (GstTensorQueryServerSrc * src)
   src->protocol = DEFAULT_PROTOCOL;
   src->timeout = QUERY_DEFAULT_TIMEOUT_SEC;
   src->topic = NULL;
-  src->srv_host = g_strdup (DEFAULT_HOST);
-  src->srv_port = DEFAULT_PORT_SRC;
   src->src_id = DEFAULT_SERVER_ID;
-  tensor_query_hybrid_init (&src->hybrid_info, NULL, 0, TRUE);
   src->configured = FALSE;
   src->msg_queue = g_async_queue_new ();
 
@@ -168,8 +164,6 @@ gst_tensor_query_serversrc_finalize (GObject * object)
   src->host = NULL;
   g_free (src->topic);
   src->topic = NULL;
-  g_free (src->srv_host);
-  src->srv_host = NULL;
 
   while ((data_h = g_async_queue_try_pop (src->msg_queue))) {
     nns_edge_data_destroy (data_h);
@@ -206,13 +200,15 @@ gst_tensor_query_serversrc_set_property (GObject * object, guint prop_id,
     case PROP_TIMEOUT:
       serversrc->timeout = g_value_get_uint (value);
       break;
-    case PROP_OPERATION:
+    case PROP_TOPIC:
       if (!g_value_get_string (value)) {
         nns_logw ("topic property cannot be NULL. Query-hybrid is disabled.");
         break;
       }
       g_free (serversrc->topic);
       serversrc->topic = g_value_dup_string (value);
+      if (NNS_EDGE_PROTOCOL_TCP == serversrc->protocol)
+        serversrc->protocol = NNS_EDGE_PROTOCOL_MQTT;
       break;
     case PROP_ID:
       serversrc->src_id = g_value_get_uint (value);
@@ -249,7 +245,7 @@ gst_tensor_query_serversrc_get_property (GObject * object, guint prop_id,
     case PROP_TIMEOUT:
       g_value_set_uint (value, serversrc->timeout);
       break;
-    case PROP_OPERATION:
+    case PROP_TOPIC:
       g_value_set_string (value, serversrc->topic);
       break;
     case PROP_ID:
@@ -303,11 +299,10 @@ static gboolean
 gst_tensor_query_serversrc_start (GstBaseSrc * bsrc)
 {
   GstTensorQueryServerSrc *src = GST_TENSOR_QUERY_SERVERSRC (bsrc);
-  char *id_str = NULL;
-  char *port = NULL;
+  char *id_str = NULL, *port = NULL, *protocol_str = NULL;
 
   id_str = g_strdup_printf ("%d", src->src_id);
-  src->server_h = gst_tensor_query_server_add_data (id_str);
+  src->server_h = gst_tensor_query_server_add_data (id_str, src->topic);
   g_free (id_str);
 
   src->edge_h = gst_tensor_query_server_get_edge_handle (src->server_h);
@@ -317,24 +312,13 @@ gst_tensor_query_serversrc_start (GstBaseSrc * bsrc)
   g_free (port);
 
   /** Publish query sever connection info */
-  if (src->topic) {
+  if (src->topic)
     nns_edge_set_info (src->edge_h, "TOPIC", src->topic);
-    tensor_query_hybrid_set_node (&src->hybrid_info, src->srv_host,
-        src->srv_port, src->server_info_h);
-    tensor_query_hybrid_set_broker (&src->hybrid_info, src->host, src->port);
 
-    if (!tensor_query_hybrid_publish (&src->hybrid_info, src->topic)) {
-      nns_loge ("Failed to publish a topic.");
-      return FALSE;
-    }
-  } else {
-    g_free (src->srv_host);
-    src->srv_host = g_strdup (src->host);
-    src->srv_port = src->port;
-    nns_logi ("Query-hybrid feature is disabled.");
-    nns_logi
-        ("Specify topic to register server to broker (e.g., topic=object_detection/mobilev3).");
-  }
+  protocol_str = g_strdup_printf ("%d", src->protocol);
+  nns_edge_set_info (src->edge_h, "PROTOCOL", protocol_str);
+  g_free (protocol_str);
+
   nns_edge_set_event_callback (src->edge_h, _nns_edge_event_cb, src);
 
   if (0 != nns_edge_start (src->edge_h, true)) {
index 2e703ec..3d5d125 100644 (file)
@@ -16,7 +16,7 @@
 #include <gst/base/gstbasesrc.h>
 #include <gst/base/gstpushsrc.h>
 #include <tensor_meta.h>
-#include "tensor_query_hybrid.h"
+#include "tensor_query_server.h"
 
 G_BEGIN_DECLS
 
@@ -46,15 +46,10 @@ struct _GstTensorQueryServerSrc
 
   guint16 port;
   gchar *host;
-  guint16 srv_port;
-  gchar *srv_host;
   guint timeout;
 
   /* Query-hybrid feature */
   gchar *topic; /**< Main operation such as 'object_detection' or 'image_segmentation' */
-  query_hybrid_info_s hybrid_info;
-
-  query_server_info_handle server_info_h;
 
   nns_edge_protocol_e protocol;
   edge_server_handle server_h;
index c541966..dd492d4 100644 (file)
@@ -72,7 +72,6 @@ NNSTREAMER_PLUGINS_SRCS := \
 # tensor-query element with nnstreamer-edge
 NNSTREAMER_QUERY_SRCS := \
     $(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_common.c \
-    $(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_hybrid.c \
     $(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_client.c \
     $(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_serversink.c \
     $(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_serversrc.c \
index 1e4a79a..29ac6da 100644 (file)
@@ -413,10 +413,6 @@ features = {
     'target': 'tvm_runtime',
     'project_args': { 'ENABLE_TVM' : 1 }
   },
-  'query-hybrid-support': {
-    'target': 'nnsquery',
-    'project_args': { 'ENABLE_QUERY_HYBRID': 1 }
-  },
   'trix-engine-support': {
     'target': 'npu-engine',
     'project_args': { 'ENABLE_TRIX_ENGINE' : 1 }
index eb877d7..907e797 100644 (file)
@@ -132,13 +132,6 @@ _callCompareTest raw8_2.log result8_2.log 8-5 "Compare 8-5" 1 0
 kill -9 $pid &> /dev/null
 wait $pid
 
-# TODO enable query-hybrid test
-# Now nnsquery library is not available.
-# After publishing the nnsquery pkg, enable below testcases.
-# rm *.log
-report
-exit
-
 # Check whether mqtt broker is running or not
 pid=`ps aux | grep mosquitto | grep -v grep | awk '{print $2}'`
 if [ $pid > 0 ]; then
@@ -151,27 +144,18 @@ else
 fi
 
 # Test Query-hybrid. Get server info from broker.
-gstTest "--gst-plugin-path=${PATH_TO_PLUGIN} tensor_query_serversrc num-buffers=3 operation=passthrough ! other/tensors,format=flexible ! tee name = t t. ! queue ! multifilesink location=server1_%1d.log t. ! queue ! tensor_query_serversink" 4-1 0 0 $PERFORMANCE $TIMEOUT_SEC &
-gstTest "--gst-plugin-path=${PATH_TO_PLUGIN} tensor_query_serversrc num-buffers=3 operation=passthrough port=5000 ! other/tensors,format=flexible ! tee name = t t. ! queue ! multifilesink location=server2_%1d.log t. ! queue ! tensor_query_serversink port=5001" 4-2 0 0 $PERFORMANCE $TIMEOUT_SEC &
-sleep $SLEEPTIME_SEC
-gstTest "--gst-plugin-path=${PATH_TO_PLUGIN} videotestsrc is-live=true num-buffers=7 ! videoconvert ! videoscale ! video/x-raw,width=300,height=300,format=RGB ! tensor_converter ! other/tensors,format=flexible ! tee name = t t. ! queue ! multifilesink location= raw4_%1d.log t. ! queue ! tensor_query_client operation=passthrough ! multifilesink location=result4_%1d.log" 4-3 0 0 $PERFORMANCE
-_callCompareTest raw4_0.log result4_0.log 4-4 "Compare 4-4" 1 0
-_callCompareTest raw4_1.log result4_1.log 4-5 "Compare 4-5" 1 0
-_callCompareTest raw4_2.log result4_2.log 4-6 "Compare 4-6" 1 0
-# Server 1 is stopped and lost the fourth buffer.
-_callCompareTest raw4_4.log result4_3.log 4-7 "Compare 4-7" 1 0
-_callCompareTest raw4_5.log result4_4.log 4-8 "Compare 4-8" 1 0
-_callCompareTest raw4_6.log result4_5.log 4-9 "Compare 4-9" 1 0
-
-# Compare the results of the server and the client.
-_callCompareTest server1_0.log result4_0.log 4-10 "Compare 4-10" 1 0
-_callCompareTest server1_1.log result4_1.log 4-11 "Compare 4-11" 1 0
-_callCompareTest server1_2.log result4_2.log 4-12 "Compare 4-12" 1 0
-_callCompareTest server2_0.log result4_3.log 4-13 "Compare 4-13" 1 0
-_callCompareTest server2_1.log result4_4.log 4-14 "Compare 4-14" 1 0
-_callCompareTest server2_2.log result4_5.log 4-15 "Compare 4-15" 1 0
-
-sleep $SLEEPTIME_SEC
+gstTestBackground "--gst-plugin-path=${PATH_TO_PLUGIN} tensor_query_serversrc id=12345 host=tcp://localhost port=1883 topic=passthrough ! other/tensors,format=flexible,framerate=0/1 ! tee name = t t. ! queue ! multifilesink location=server1_%1d.log t. ! queue ! tensor_query_serversink id=12345 async=false" 4-1 0 0 5
+gstTest "--gst-plugin-path=${PATH_TO_PLUGIN} videotestsrc num-buffers=50 is-live=true ! videoconvert ! videoscale ! video/x-raw,width=300,height=300,format=RGB ! tensor_converter ! other/tensors,format=flexible ! tee name = t t. ! queue ! multifilesink location= raw4_%1d.log t. ! queue ! tensor_query_client  host=tcp://localhost port=1883 topic=passthrough ! multifilesink location=result4_%1d.log" 4-3 0 0 $PERFORMANCE
+_callCompareTest raw4_0.log result4_0.log 4-4 "Compare the raw file and client received file 4-4" 1 0
+_callCompareTest raw4_1.log result4_1.log 4-5 "Compare the raw file and client received file 4-5" 1 0
+_callCompareTest raw4_2.log result4_2.log 4-6 "Compare the raw file and client received file 4-6" 1 0
+_callCompareTest server1_0.log result4_0.log 4-7 "Compare the server 1 received file and client received file 4-7" 1 0
+_callCompareTest server1_1.log result4_1.log 4-8 "Compare the server 1 received file and client received file 4-8" 1 0
+_callCompareTest server1_2.log result4_2.log 4-9 "Compare the server 1 received file and client received file 4-9" 1 0
+
+kill -9 $pid &> /dev/null
+wait $pid
+
 rm *.log
 
 report