Implement mqtt-hybrid using nnstreamer-edge lib.
Signed-off-by: gichan <gichan2.jang@samsung.com>
subdir(p)
endforeach
-if query_hybrid_support_is_available
- nnstreamer_deps += query_hybrid_support_deps
-endif
-
subdir('include')
# Private header for sub-plugins and native APIs
'tensor_query_server.c',
]
-if query_hybrid_support_is_available
- tensor_query_sources += 'tensor_query_hybrid.c'
-endif
-
foreach s : tensor_query_sources
nnstreamer_sources += join_paths(meson.current_source_dir(), s)
endforeach
#include <gio/gio.h>
#include <glib.h>
#include <string.h>
-#include "nnstreamer-edge.h"
#include "tensor_query_common.h"
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
-#include <arpa/inet.h>
-#include <ifaddrs.h>
/**
* @brief Macro for debug mode.
PROP_HOST,
PROP_PORT,
PROP_PROTOCOL,
- PROP_OPERATION,
+ PROP_TOPIC,
PROP_SILENT,
};
"The network protocol to establish connections between client and server.",
GST_TYPE_QUERY_PROTOCOL, DEFAULT_PROTOCOL,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_OPERATION,
- g_param_spec_string ("operation", "Operation",
- "The main operation of the host.",
+ g_object_class_install_property (gobject_class, PROP_TOPIC,
+ g_param_spec_string ("topic", "Topic",
+ "The main topic of the host.",
"", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gst_element_class_add_pad_template (gstelement_class,
}
/**
- * @brief Get IP address
- */
-static gchar *
-_get_ip_address (void)
-{
- struct ifaddrs *addrs, *run;
- gchar *ret = NULL;
-
- if (0 != getifaddrs (&addrs))
- goto done;
- run = addrs;
-
- while (run) {
- if (run->ifa_addr && run->ifa_addr->sa_family == AF_INET) {
- struct sockaddr_in *pAddr = (struct sockaddr_in *) run->ifa_addr;
-
- if (NULL != strstr (run->ifa_name, "en") ||
- NULL != strstr (run->ifa_name, "et")) {
- g_free (ret);
- ret = g_strdup (inet_ntoa (pAddr->sin_addr));
- }
- }
- run = run->ifa_next;
- }
- freeifaddrs (addrs);
-
-done:
- if (NULL == ret)
- ret = g_strdup ("localhost");
-
- return ret;
-}
-
-/**
* @brief initialize the new element
*/
static void
self->protocol = DEFAULT_PROTOCOL;
self->host = g_strdup (TCP_DEFAULT_HOST);
self->port = TCP_DEFAULT_SRC_PORT;
- self->srv_host = g_strdup (TCP_DEFAULT_HOST);
- self->srv_port = TCP_DEFAULT_SRC_PORT;
- self->operation = NULL;
+ self->topic = NULL;
self->in_caps_str = NULL;
self->msg_queue = g_async_queue_new ();
-
- tensor_query_hybrid_init (&self->hybrid_info, NULL, 0, FALSE);
- nns_edge_create_handle ("TEMP_ID", "TEMP_CLIENT_TOPIC", &self->edge_h);
}
/**
g_free (self->host);
self->host = NULL;
- g_free (self->srv_host);
- self->srv_host = NULL;
- g_free (self->operation);
- self->operation = NULL;
+ g_free (self->topic);
+ self->topic = NULL;
g_free (self->in_caps_str);
self->in_caps_str = NULL;
while ((data_h = g_async_queue_try_pop (self->msg_queue))) {
nns_edge_data_destroy (data_h);
}
- g_async_queue_unref (self->msg_queue);
- tensor_query_hybrid_close (&self->hybrid_info);
+ if (self->msg_queue) {
+ g_async_queue_unref (self->msg_queue);
+ self->msg_queue = NULL;
+ }
+
if (self->edge_h) {
nns_edge_release_handle (self->edge_h);
self->edge_h = NULL;
self->port = g_value_get_uint (value);
break;
case PROP_PROTOCOL:
- {
- /** @todo expand when other protocols are ready */
- nns_edge_protocol_e protocol = g_value_get_enum (value);
- if (protocol == NNS_EDGE_PROTOCOL_TCP)
- self->protocol = protocol;
- }
+ self->protocol = g_value_get_enum (value);
break;
- case PROP_OPERATION:
+ case PROP_TOPIC:
if (!g_value_get_string (value)) {
- nns_logw
- ("Operation property cannot be NULL. Query-hybrid is disabled.");
+ nns_logw ("Topic property cannot be NULL. Query-hybrid is disabled.");
break;
}
- g_free (self->operation);
- self->operation = g_value_dup_string (value);
+ g_free (self->topic);
+ self->topic = g_value_dup_string (value);
+ if (NNS_EDGE_PROTOCOL_TCP == self->protocol)
+ self->protocol = NNS_EDGE_PROTOCOL_MQTT;
break;
case PROP_SILENT:
self->silent = g_value_get_boolean (value);
case PROP_PROTOCOL:
g_value_set_enum (value, self->protocol);
break;
- case PROP_OPERATION:
- g_value_set_string (value, self->operation);
- break;
+ case PROP_TOPIC:
+ g_value_set_string (value, self->topic);
break;
case PROP_SILENT:
g_value_set_boolean (value, self->silent);
}
/**
- * @brief Copy server info.
- */
-static void
-_copy_srv_info (GstTensorQueryClient * self, query_server_info_s * server)
-{
- g_free (self->srv_host);
- self->srv_host = g_strdup (server->src.host);
- self->srv_port = server->src.port;
-}
-
-/**
* @brief Retry to connect to available server.
*/
static gboolean
_client_retry_connection (GstTensorQueryClient * self)
{
- gboolean ret = FALSE;
- query_server_info_s *server = NULL;
-
- g_return_val_if_fail (self->operation, FALSE);
- nns_logd ("Retry to connect to available server.");
-
- while ((server = tensor_query_hybrid_get_server_info (&self->hybrid_info))) {
- nns_edge_disconnect (self->edge_h);
-
- _copy_srv_info (self, server);
- tensor_query_hybrid_free_server_info (server);
+ if (NNS_EDGE_ERROR_NONE != nns_edge_disconnect (self->edge_h)) {
+ nns_loge ("Failed to retry connection, disconnection failure");
+ return FALSE;
+ }
- if (nns_edge_connect (self->edge_h, NNS_EDGE_PROTOCOL_TCP, self->srv_host,
- self->srv_port)) {
- nns_logi ("Connected to new server: %s:%u.", self->srv_host,
- self->srv_port);
- ret = TRUE;
- break;
- }
+ if (NNS_EDGE_ERROR_NONE != nns_edge_connect (self->edge_h,
+ self->protocol, self->host, self->port)) {
+ nns_loge ("Failed to retry connection, connection failure");
+ return FALSE;
}
- return ret;
+ return TRUE;
}
/**
static gchar *
_nns_edge_parse_caps (gchar * caps_str, gboolean is_src)
{
- gchar **strv = g_strsplit (caps_str, "@", -1);
- gint num = g_strv_length (strv), i;
+ gchar **strv;
+ gint num, i;
gchar *find_key = NULL;
gchar *ret_str = NULL;
+ if (!caps_str)
+ return NULL;
+
+ strv = g_strsplit (caps_str, "@", -1);
+ num = g_strv_length (strv);
+
find_key =
is_src ==
TRUE ? g_strdup ("query_server_src_caps") :
{
GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (parent);
gboolean ret = TRUE;
- gchar *ip, *prev_caps_str, *new_caps_str;
+ gchar *prev_caps_str, *new_caps_str;
GST_DEBUG_OBJECT (self, "Received %s event: %" GST_PTR_FORMAT,
GST_EVENT_TYPE_NAME (event), event);
case GST_EVENT_CAPS:
{
GstCaps *caps;
- gst_event_parse_caps (event, &caps);
+ gchar *_topic = NULL, *protocol_str = NULL;
- /** Subscribe server info from broker */
- if (self->operation) {
- query_server_info_s *server;
-
- tensor_query_hybrid_set_broker (&self->hybrid_info,
- self->host, self->port);
-
- if (!tensor_query_hybrid_subscribe (&self->hybrid_info,
- self->operation)) {
- nns_loge ("Failed to subscribe a topic.");
- gst_event_unref (event);
- return FALSE;
- }
+ if (self->topic)
+ _topic = g_strdup (self->topic);
+ else
+ _topic = g_strdup ("TCP_DIRECT");
- server = tensor_query_hybrid_get_server_info (&self->hybrid_info);
- if (server) {
- _copy_srv_info (self, server);
- tensor_query_hybrid_free_server_info (server);
- }
- } else {
- nns_logi ("Query-hybrid feature is disabled.");
- nns_logi
- ("Specify operation to subscribe to the available broker (e.g., operation=object_detection).");
- g_free (self->srv_host);
- self->srv_host = g_strdup (self->host);
- self->srv_port = self->port;
- }
+ nns_edge_create_handle ("TEMP_ID", _topic, &self->edge_h);
+ g_free (_topic);
+ gst_event_parse_caps (event, &caps);
g_free (self->in_caps_str);
self->in_caps_str = gst_caps_to_string (caps);
nns_edge_get_info (self->edge_h, "CAPS", &prev_caps_str);
nns_edge_set_info (self->edge_h, "CAPS", new_caps_str);
g_free (prev_caps_str);
g_free (new_caps_str);
+ protocol_str = g_strdup_printf ("%d", self->protocol);
+ nns_edge_set_info (self->edge_h, "PROTOCOL", protocol_str);
+ g_free (protocol_str);
- nns_edge_set_event_callback (self->edge_h, _nns_edge_event_cb, self);
- ip = _get_ip_address ();
- nns_edge_set_info (self->edge_h, "IP", ip);
- nns_edge_set_info (self->edge_h, "PORT", "0");
- g_free (ip);
+ nns_edge_set_event_callback (self->edge_h, _nns_edge_event_cb, self);
if (0 != nns_edge_start (self->edge_h, false)) {
nns_loge
return FALSE;
}
- if (0 != nns_edge_connect (self->edge_h, NNS_EDGE_PROTOCOL_TCP,
- self->srv_host, self->srv_port)) {
+ if (0 != nns_edge_connect (self->edge_h, self->protocol,
+ self->host, self->port)) {
nns_loge ("Failed to connect to edge server!");
return FALSE;
}
goto done;
retry:
- if (!self->operation || !_client_retry_connection (self)) {
+ if (!self->topic || !_client_retry_connection (self)) {
nns_loge ("Failed to retry connection");
res = GST_FLOW_ERROR;
}
#include <gst/gst.h>
#include <gio/gio.h>
#include <tensor_common.h>
-#include "tensor_query_hybrid.h"
+#include "nnstreamer-edge.h"
G_BEGIN_DECLS
gchar *in_caps_str;
/* Query-hybrid feature */
- gchar *operation; /**< Main operation such as 'object_detection' or 'image_segmentation' */
- query_hybrid_info_s hybrid_info;
+ gchar *topic; /**< Main operation such as 'object_detection' or 'image_segmentation' */
gchar *host;
guint16 port;
+++ /dev/null
-/* SPDX-License-Identifier: LGPL-2.1-only */
-/**
- * Copyright (C) 2021 Gichan Jang <gichan2.jang@samsung.com>
- *
- * @file tensor_query_hybrid.c
- * @date 12 Oct 2021
- * @brief Utility functions for tensor-query hybrid feature
- * @see https://github.com/nnstreamer/nnstreamer
- * @author Gichan Jang <gichan2.jang@samsung.com>
- * @bug No known bugs except for NYI items
- */
-
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-
-#include <string.h>
-#include "tensor_query_hybrid.h"
-#include "nnstreamer_log.h"
-
-#if defined(ENABLE_QUERY_HYBRID)
-#include <nnsquery.h>
-
-/**
- * @brief Internal function to free node info.
- */
-static void
-_free_node_info (query_node_info_s * node)
-{
- if (node) {
- g_free (node->host);
- node->host = NULL;
- }
-}
-
-/**
- * @brief Internal function to free server info.
- */
-static void
-_free_server_info (query_server_info_s * server)
-{
- if (server) {
- _free_node_info (&server->src);
- _free_node_info (&server->sink);
- }
-}
-
-/**
- * @brief Parse received message.
- */
-static void
-_broker_parse_message (query_hybrid_info_s * info, gchar * payload)
-{
- gchar **splits;
- query_server_info_s *server;
-
- server = g_try_new0 (query_server_info_s, 1);
- if (!server) {
- nns_loge ("[Query-hybrid] Failed to allocate query server info.");
- return;
- }
-
- splits = g_strsplit (payload, "/", -1);
- server->src.host = g_strdup (splits[0]);
- server->src.port = g_ascii_strtoull (splits[1], NULL, 10);
- server->sink.host = g_strdup (splits[2]);
- server->sink.port = g_ascii_strtoull (splits[3], NULL, 10);
-
- nns_logd ("[Query-hybrid] Parsed info: src[%s:%u] sink[%s:%u]",
- server->src.host, server->src.port, server->sink.host, server->sink.port);
-
- g_async_queue_push (info->server_list, server);
- g_strfreev (splits);
-}
-
-/**
- * @brief State change callback.
- */
-static void
-_broker_state_change_cb (void *user_data, query_mqtt_state_t state)
-{
- query_hybrid_info_s *info = (query_hybrid_info_s *) user_data;
-
- info->state = (gint) state;
- nns_logd ("[Query-hybrid] Broker state changed to %d.", state);
-}
-
-/**
- * @brief Raw message received callback.
- */
-static void
-_broker_msg_received_cb (const gchar * topic, msg_data * msg, gint msg_len,
- void *user_data)
-{
- query_hybrid_info_s *info = (query_hybrid_info_s *) user_data;
- gchar *payload;
- gint size;
-
- if (msg_len <= 0) {
- nns_logd ("[Query-hybrid] There is no data to receive from broker.");
- return;
- }
-
- size = msg_len - sizeof (msg->type);
- payload = (gchar *) g_malloc0 (size + 1);
- memcpy (payload, msg->payload, size);
- payload[size] = '\0';
-
- nns_logd ("[Query-hybrid] Received topic: %s (size: %d)\n", topic, msg_len);
- nns_logd ("[Query-hybrid] payload: %s", payload);
-
- _broker_parse_message (info, payload);
- g_free (payload);
-}
-
-/**
- * @brief Connect to broker.
- */
-static gboolean
-_broker_connect (query_hybrid_info_s * info)
-{
- gchar *host, *port;
- gint err;
-
- host = g_strdup (info->broker.host);
- port = g_strdup_printf ("%u", info->broker.port);
-
- err = query_open_connection (&info->handle, host, port,
- _broker_state_change_cb, info);
- if (err == 0) {
- /* Wait until connection is established. */
- while (MQTT_CONNECTED != info->state) {
- g_usleep (10000);
- }
- }
-
- g_free (host);
- g_free (port);
-
- if (err != 0) {
- nns_loge ("[Query-hybrid] Failed to connect broker (error: %d).", err);
- return FALSE;
- }
-
- return TRUE;
-}
-
-/**
- * @brief Initialize query-hybrid info.
- */
-void
-tensor_query_hybrid_init (query_hybrid_info_s * info,
- const gchar * host, const guint16 port, gboolean is_server)
-{
- g_return_if_fail (info != NULL);
-
- memset (info, 0, sizeof (query_hybrid_info_s));
- info->node.host = g_strdup (host);
- info->node.port = port;
- info->is_server = is_server;
- info->state = MQTT_DISCONNECTED;
-
- if (!is_server) {
- info->server_list = g_async_queue_new ();
- }
-}
-
-/**
- * @brief Close connection and free query-hybrid info.
- */
-void
-tensor_query_hybrid_close (query_hybrid_info_s * info)
-{
- g_return_if_fail (info != NULL);
-
- if (info->handle) {
- if (info->topic) {
- query_clear_retained_topic (info->handle, info->topic);
- g_free (info->topic);
- info->topic = NULL;
- }
-
- if (0 != query_close_connection (info->handle)) {
- nns_loge ("[Query-hybrid] Failed to close broker connection.");
- }
-
- info->handle = NULL;
- }
-
- if (info->server_list) {
- gpointer data;
-
- while ((data = g_async_queue_try_pop (info->server_list))) {
- tensor_query_hybrid_free_server_info (data);
- }
-
- g_async_queue_unref (info->server_list);
- info->server_list = NULL;
- }
-
- _free_node_info (&info->broker);
- _free_node_info (&info->node);
-}
-
-/**
- * @brief Set current node info.
- */
-void
-tensor_query_hybrid_set_node (query_hybrid_info_s * info, const gchar * host,
- const guint16 port, query_server_info_handle server_info_h)
-{
- g_return_if_fail (info != NULL);
-
- _free_node_info (&info->node);
- info->node.host = g_strdup (host);
- info->node.port = port;
- info->node.server_info_h = server_info_h;
-}
-
-/**
- * @brief Set broker info.
- */
-void
-tensor_query_hybrid_set_broker (query_hybrid_info_s * info,
- const gchar * host, const guint16 port)
-{
- g_return_if_fail (info != NULL);
-
- _free_node_info (&info->broker);
- info->broker.host = g_strdup (host);
- info->broker.port = port;
-}
-
-/**
- * @brief Get server info from node list.
- * @return Server node info. Caller should release server info using tensor_query_hybrid_free_server_info().
- */
-query_server_info_s *
-tensor_query_hybrid_get_server_info (query_hybrid_info_s * info)
-{
- query_server_info_s *server = NULL;
-
- g_return_val_if_fail (info != NULL, FALSE);
-
- if (info->is_server) {
- nns_logw ("[Query-hybrid] It is server node, cannot get the server info.");
- } else {
- /**
- * @todo Need to update server selection policy. Now, use first received info.
- */
- server = (query_server_info_s *) g_async_queue_try_pop (info->server_list);
- }
-
- return server;
-}
-
-/**
- * @brief Free server info.
- */
-void
-tensor_query_hybrid_free_server_info (query_server_info_s * server)
-{
- if (server) {
- _free_server_info (server);
- g_free (server);
- }
-}
-
-/**
- * @brief Connect to broker and publish topic.
- */
-gboolean
-tensor_query_hybrid_publish (query_hybrid_info_s * info,
- const gchar * operation)
-{
- gchar *device, *topic, *msg;
- gchar *sink_host = NULL;
- guint16 sink_port;
- gint err;
-
- g_return_val_if_fail (info != NULL, FALSE);
- g_return_val_if_fail (operation != NULL, FALSE);
-
- if (!info->is_server) {
- nns_logw ("[Query-hybrid] It is client node, cannot publish topic.");
- return FALSE;
- }
-
- if (!info->node.host || info->node.port == 0) {
- nns_loge ("[Query-hybrid] Invalid node info, cannot publish topic.");
- return FALSE;
- }
-
- if (!_broker_connect (info)) {
- nns_loge ("[Query-hybrid] Failed to publish, connection failed.");
- return FALSE;
- }
-
- /**
- * @todo Set unique device name.
- * Device name should be unique. Consider using MAC address later.
- * Now, use IP and port number temporarily.
- */
- device = g_strdup_printf ("device-%s-%u", info->node.host, info->node.port);
- topic = g_strdup_printf ("edge/inference/%s/%s/", device, operation);
- nns_logd ("[Query-hybrid] Query server topic: %s", topic);
- g_free (device);
-
- sink_host = gst_tensor_query_server_get_sink_host (info->node.server_info_h);
- if (!sink_host) {
- nns_logw
- ("[Query-hybrid] Sink host is not given. Use default host (localhost).");
- sink_host = g_strdup ("localhost");
- }
- sink_port = gst_tensor_query_server_get_sink_port (info->node.server_info_h);
- if (0 == sink_port) {
- nns_logw
- ("[Query-hybrid] Sink port is not given. Use default port (3000).");
- sink_port = 3000;
- }
-
- msg = g_strdup_printf ("%s/%u/%s/%u", info->node.host, info->node.port,
- sink_host, sink_port);
- g_free (sink_host);
- nns_logd ("[Query-hybrid] Query server source msg: %s", msg);
-
- err = query_publish_raw_data (info->handle, topic, msg, strlen (msg), TRUE);
- g_free (msg);
-
- if (err != 0) {
- nns_loge ("[Query-hybrid] Failed to publish raw data (error: %d).", err);
- g_free (topic);
- return FALSE;
- }
-
- info->topic = topic;
- return TRUE;
-}
-
-/**
- * @brief Connect to broker and subcribe topic.
- */
-gboolean
-tensor_query_hybrid_subscribe (query_hybrid_info_s * info,
- const gchar * operation)
-{
- gchar *topic;
- gint err;
-
- g_return_val_if_fail (info != NULL, FALSE);
- g_return_val_if_fail (operation != NULL, FALSE);
-
- if (info->is_server) {
- nns_logw ("[Query-hybrid] It is server node, cannot subcribe topic.");
- return FALSE;
- }
-
- if (!_broker_connect (info)) {
- nns_loge ("[Query-hybrid] Failed to subscribe, connection failed.");
- return FALSE;
- }
-
- topic = g_strdup_printf ("edge/inference/+/%s/#", operation);
- err = query_subscribe_topic (info->handle, topic,
- _broker_msg_received_cb, info);
- g_free (topic);
-
- if (err != 0) {
- nns_loge ("[Query-hybrid] Failed to subscribe topic (error: %d).", err);
- return FALSE;
- }
-
- return TRUE;
-}
-#endif /* ENABLE_QUERY_HYBRID */
+++ /dev/null
-/* SPDX-License-Identifier: LGPL-2.1-only */
-/**
- * Copyright (C) 2021 Gichan Jang <gichan2.jang@samsung.com>
- *
- * @file tensor_query_hybrid.h
- * @date 12 Oct 2021
- * @brief Utility functions for tensor-query hybrid feature
- * @see https://github.com/nnstreamer/nnstreamer
- * @author Gichan Jang <gichan2.jang@samsung.com>
- * @bug No known bugs except for NYI items
- */
-
-#ifndef __TENSOR_QUERY_HYBRID_H__
-#define __TENSOR_QUERY_HYBRID_H__
-
-#include <glib.h>
-#include "tensor_query_server.h"
-
-#define DEFAULT_BROKER_HOST "tcp://localhost"
-#define DEFAULT_BROKER_PORT 1883
-
-G_BEGIN_DECLS
-
-/**
- * @brief Data structure for node info.
- */
-typedef struct
-{
- gchar *host;
- guint16 port;
- query_server_info_handle server_info_h;
-} query_node_info_s;
-
-/**
- * @brief Data structure for server info.
- */
-typedef struct
-{
- query_node_info_s src;
- query_node_info_s sink;
-} query_server_info_s;
-
-/**
- * @brief Data structure for query-hybrid feature.
- */
-typedef struct
-{
- query_node_info_s node;
- query_node_info_s broker;
- gboolean is_server;
- GAsyncQueue *server_list;
- gpointer handle;
- gint state;
- gchar *topic;
-} query_hybrid_info_s;
-
-#if defined(ENABLE_QUERY_HYBRID)
-/**
- * @brief Initialize query-hybrid info.
- */
-extern void
-tensor_query_hybrid_init (query_hybrid_info_s * info, const gchar * host, const guint16 port, gboolean is_server);
-
-/**
- * @brief Close connection and free query-hybrid info.
- */
-extern void
-tensor_query_hybrid_close (query_hybrid_info_s * info);
-
-/**
- * @brief Set current node info.
- */
-extern void
-tensor_query_hybrid_set_node (query_hybrid_info_s * info, const gchar * host, const guint16 port, query_server_info_handle server_info_h);
-
-/**
- * @brief Set broker info.
- */
-extern void
-tensor_query_hybrid_set_broker (query_hybrid_info_s * info, const gchar * host, const guint16 port);
-
-/**
- * @brief Get server info from node list.
- * @return Server node info. Caller should release server info using tensor_query_hybrid_free_server_info().
- */
-extern query_server_info_s *
-tensor_query_hybrid_get_server_info (query_hybrid_info_s * info);
-
-/**
- * @brief Free server info.
- */
-extern void
-tensor_query_hybrid_free_server_info (query_server_info_s * server);
-
-/**
- * @brief Connect to broker and publish topic.
- * @note Before calling this function, user should set broker info using tensor_query_hybrid_set_broker().
- */
-extern gboolean
-tensor_query_hybrid_publish (query_hybrid_info_s * info, const gchar * operation);
-
-/**
- * @brief Connect to broker and subcribe topic.
- * @note Before calling this function, user should set broker info using tensor_query_hybrid_set_broker().
- */
-extern gboolean
-tensor_query_hybrid_subscribe (query_hybrid_info_s * info, const gchar * operation);
-#else
-#define tensor_query_hybrid_init(...)
-#define tensor_query_hybrid_close(...)
-#define tensor_query_hybrid_set_node(...)
-#define tensor_query_hybrid_set_broker(...)
-#define tensor_query_hybrid_get_server_info(...) NULL
-#define tensor_query_hybrid_free_server_info(...)
-#define tensor_query_hybrid_publish(...) FALSE
-#define tensor_query_hybrid_subscribe(...) FALSE
-#endif /* ENABLE_QUERY_HYBRID */
-
-G_END_DECLS
-#endif /* __TENSOR_QUERY_HYBRID_H__ */
#include "tensor_query_server.h"
#include <tensor_typedef.h>
#include <tensor_common.h>
-#include "nnstreamer-edge.h"
/**
* @brief mutex for tensor-query server table.
* @brief Add nnstreamer edge server handle into hash table.
*/
edge_server_handle
-gst_tensor_query_server_add_data (char *id)
+gst_tensor_query_server_add_data (char *id, const gchar * topic)
{
GstTensorQueryServer *data = NULL;
int ret;
+ gchar *_topic = NULL;
data = gst_tensor_query_server_get_handle (id);
data->id = id;
data->configured = FALSE;
- ret = nns_edge_create_handle (id, "TEMP_SERVER_TOPIC", &data->edge_h);
- if (ret != NNS_EDGE_ERROR_NONE) {
+ if (topic)
+ _topic = g_strdup (topic);
+ else
+ _topic = g_strdup ("TCP_DIRECT");
+
+ ret = nns_edge_create_handle (id, _topic, &data->edge_h);
+ if (NNS_EDGE_ERROR_NONE != ret) {
GST_ERROR ("Failed to get nnstreamer edge handle.");
gst_tensor_query_server_remove_data (data);
return NULL;
g_hash_table_insert (_qs_table, g_strdup (id), data);
G_UNLOCK (query_server_table);
+ g_free (_topic);
return data;
}
#define DEFAULT_SERVER_ID 0
#define DEFAULT_QUERY_INFO_TIMEOUT 5
-typedef void * query_server_info_handle;
typedef void * edge_server_handle;
/**
* @brief Add GstTensorQueryServer.
*/
edge_server_handle
-gst_tensor_query_server_add_data (char *id);
+gst_tensor_query_server_add_data (char *id, const gchar *topic);
/**
* @brief Remove GstTensorQueryServer.
gchar *id_str = NULL;
id_str = g_strdup_printf ("%u", sink->sink_id);
- sink->server_h = gst_tensor_query_server_add_data (id_str);
+ sink->server_h = gst_tensor_query_server_add_data (id_str, NULL);
g_free (id_str);
caps = gst_pad_get_current_caps (GST_BASE_SINK_PAD (bsink));
guint sink_id;
guint timeout;
- query_server_info_handle server_info_h;
gint metaless_frame_limit;
gint metaless_frame_count;
#include <tensor_typedef.h>
#include <tensor_common.h>
#include "tensor_query_serversrc.h"
-#include "tensor_query_server.h"
#include "tensor_query_common.h"
#include "nnstreamer_util.h"
PROP_PORT,
PROP_PROTOCOL,
PROP_TIMEOUT,
- PROP_OPERATION,
+ PROP_TOPIC,
PROP_ID,
PROP_IS_LIVE
};
"The timeout as seconds to maintain connection", 0,
3600, QUERY_DEFAULT_TIMEOUT_SEC,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_OPERATION,
+ g_object_class_install_property (gobject_class, PROP_TOPIC,
g_param_spec_string ("topic", "Topic",
"The main topic of the host and option if necessary. "
"(topic)/(optional topic for main topic).",
src->protocol = DEFAULT_PROTOCOL;
src->timeout = QUERY_DEFAULT_TIMEOUT_SEC;
src->topic = NULL;
- src->srv_host = g_strdup (DEFAULT_HOST);
- src->srv_port = DEFAULT_PORT_SRC;
src->src_id = DEFAULT_SERVER_ID;
- tensor_query_hybrid_init (&src->hybrid_info, NULL, 0, TRUE);
src->configured = FALSE;
src->msg_queue = g_async_queue_new ();
src->host = NULL;
g_free (src->topic);
src->topic = NULL;
- g_free (src->srv_host);
- src->srv_host = NULL;
while ((data_h = g_async_queue_try_pop (src->msg_queue))) {
nns_edge_data_destroy (data_h);
case PROP_TIMEOUT:
serversrc->timeout = g_value_get_uint (value);
break;
- case PROP_OPERATION:
+ case PROP_TOPIC:
if (!g_value_get_string (value)) {
nns_logw ("topic property cannot be NULL. Query-hybrid is disabled.");
break;
}
g_free (serversrc->topic);
serversrc->topic = g_value_dup_string (value);
+ if (NNS_EDGE_PROTOCOL_TCP == serversrc->protocol)
+ serversrc->protocol = NNS_EDGE_PROTOCOL_MQTT;
break;
case PROP_ID:
serversrc->src_id = g_value_get_uint (value);
case PROP_TIMEOUT:
g_value_set_uint (value, serversrc->timeout);
break;
- case PROP_OPERATION:
+ case PROP_TOPIC:
g_value_set_string (value, serversrc->topic);
break;
case PROP_ID:
gst_tensor_query_serversrc_start (GstBaseSrc * bsrc)
{
GstTensorQueryServerSrc *src = GST_TENSOR_QUERY_SERVERSRC (bsrc);
- char *id_str = NULL;
- char *port = NULL;
+ char *id_str = NULL, *port = NULL, *protocol_str = NULL;
id_str = g_strdup_printf ("%d", src->src_id);
- src->server_h = gst_tensor_query_server_add_data (id_str);
+ src->server_h = gst_tensor_query_server_add_data (id_str, src->topic);
g_free (id_str);
src->edge_h = gst_tensor_query_server_get_edge_handle (src->server_h);
g_free (port);
/** Publish query sever connection info */
- if (src->topic) {
+ if (src->topic)
nns_edge_set_info (src->edge_h, "TOPIC", src->topic);
- tensor_query_hybrid_set_node (&src->hybrid_info, src->srv_host,
- src->srv_port, src->server_info_h);
- tensor_query_hybrid_set_broker (&src->hybrid_info, src->host, src->port);
- if (!tensor_query_hybrid_publish (&src->hybrid_info, src->topic)) {
- nns_loge ("Failed to publish a topic.");
- return FALSE;
- }
- } else {
- g_free (src->srv_host);
- src->srv_host = g_strdup (src->host);
- src->srv_port = src->port;
- nns_logi ("Query-hybrid feature is disabled.");
- nns_logi
- ("Specify topic to register server to broker (e.g., topic=object_detection/mobilev3).");
- }
+ protocol_str = g_strdup_printf ("%d", src->protocol);
+ nns_edge_set_info (src->edge_h, "PROTOCOL", protocol_str);
+ g_free (protocol_str);
+
nns_edge_set_event_callback (src->edge_h, _nns_edge_event_cb, src);
if (0 != nns_edge_start (src->edge_h, true)) {
#include <gst/base/gstbasesrc.h>
#include <gst/base/gstpushsrc.h>
#include <tensor_meta.h>
-#include "tensor_query_hybrid.h"
+#include "tensor_query_server.h"
G_BEGIN_DECLS
guint16 port;
gchar *host;
- guint16 srv_port;
- gchar *srv_host;
guint timeout;
/* Query-hybrid feature */
gchar *topic; /**< Main operation such as 'object_detection' or 'image_segmentation' */
- query_hybrid_info_s hybrid_info;
-
- query_server_info_handle server_info_h;
nns_edge_protocol_e protocol;
edge_server_handle server_h;
# tensor-query element with nnstreamer-edge
NNSTREAMER_QUERY_SRCS := \
$(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_common.c \
- $(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_hybrid.c \
$(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_client.c \
$(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_serversink.c \
$(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_serversrc.c \
'target': 'tvm_runtime',
'project_args': { 'ENABLE_TVM' : 1 }
},
- 'query-hybrid-support': {
- 'target': 'nnsquery',
- 'project_args': { 'ENABLE_QUERY_HYBRID': 1 }
- },
'trix-engine-support': {
'target': 'npu-engine',
'project_args': { 'ENABLE_TRIX_ENGINE' : 1 }
kill -9 $pid &> /dev/null
wait $pid
-# TODO enable query-hybrid test
-# Now nnsquery library is not available.
-# After publishing the nnsquery pkg, enable below testcases.
-# rm *.log
-report
-exit
-
# Check whether mqtt broker is running or not
pid=`ps aux | grep mosquitto | grep -v grep | awk '{print $2}'`
if [ $pid > 0 ]; then
fi
# Test Query-hybrid. Get server info from broker.
-gstTest "--gst-plugin-path=${PATH_TO_PLUGIN} tensor_query_serversrc num-buffers=3 operation=passthrough ! other/tensors,format=flexible ! tee name = t t. ! queue ! multifilesink location=server1_%1d.log t. ! queue ! tensor_query_serversink" 4-1 0 0 $PERFORMANCE $TIMEOUT_SEC &
-gstTest "--gst-plugin-path=${PATH_TO_PLUGIN} tensor_query_serversrc num-buffers=3 operation=passthrough port=5000 ! other/tensors,format=flexible ! tee name = t t. ! queue ! multifilesink location=server2_%1d.log t. ! queue ! tensor_query_serversink port=5001" 4-2 0 0 $PERFORMANCE $TIMEOUT_SEC &
-sleep $SLEEPTIME_SEC
-gstTest "--gst-plugin-path=${PATH_TO_PLUGIN} videotestsrc is-live=true num-buffers=7 ! videoconvert ! videoscale ! video/x-raw,width=300,height=300,format=RGB ! tensor_converter ! other/tensors,format=flexible ! tee name = t t. ! queue ! multifilesink location= raw4_%1d.log t. ! queue ! tensor_query_client operation=passthrough ! multifilesink location=result4_%1d.log" 4-3 0 0 $PERFORMANCE
-_callCompareTest raw4_0.log result4_0.log 4-4 "Compare 4-4" 1 0
-_callCompareTest raw4_1.log result4_1.log 4-5 "Compare 4-5" 1 0
-_callCompareTest raw4_2.log result4_2.log 4-6 "Compare 4-6" 1 0
-# Server 1 is stopped and lost the fourth buffer.
-_callCompareTest raw4_4.log result4_3.log 4-7 "Compare 4-7" 1 0
-_callCompareTest raw4_5.log result4_4.log 4-8 "Compare 4-8" 1 0
-_callCompareTest raw4_6.log result4_5.log 4-9 "Compare 4-9" 1 0
-
-# Compare the results of the server and the client.
-_callCompareTest server1_0.log result4_0.log 4-10 "Compare 4-10" 1 0
-_callCompareTest server1_1.log result4_1.log 4-11 "Compare 4-11" 1 0
-_callCompareTest server1_2.log result4_2.log 4-12 "Compare 4-12" 1 0
-_callCompareTest server2_0.log result4_3.log 4-13 "Compare 4-13" 1 0
-_callCompareTest server2_1.log result4_4.log 4-14 "Compare 4-14" 1 0
-_callCompareTest server2_2.log result4_5.log 4-15 "Compare 4-15" 1 0
-
-sleep $SLEEPTIME_SEC
+gstTestBackground "--gst-plugin-path=${PATH_TO_PLUGIN} tensor_query_serversrc id=12345 host=tcp://localhost port=1883 topic=passthrough ! other/tensors,format=flexible,framerate=0/1 ! tee name = t t. ! queue ! multifilesink location=server1_%1d.log t. ! queue ! tensor_query_serversink id=12345 async=false" 4-1 0 0 5
+gstTest "--gst-plugin-path=${PATH_TO_PLUGIN} videotestsrc num-buffers=50 is-live=true ! videoconvert ! videoscale ! video/x-raw,width=300,height=300,format=RGB ! tensor_converter ! other/tensors,format=flexible ! tee name = t t. ! queue ! multifilesink location= raw4_%1d.log t. ! queue ! tensor_query_client host=tcp://localhost port=1883 topic=passthrough ! multifilesink location=result4_%1d.log" 4-3 0 0 $PERFORMANCE
+_callCompareTest raw4_0.log result4_0.log 4-4 "Compare the raw file and client received file 4-4" 1 0
+_callCompareTest raw4_1.log result4_1.log 4-5 "Compare the raw file and client received file 4-5" 1 0
+_callCompareTest raw4_2.log result4_2.log 4-6 "Compare the raw file and client received file 4-6" 1 0
+_callCompareTest server1_0.log result4_0.log 4-7 "Compare the server 1 received file and client received file 4-7" 1 0
+_callCompareTest server1_1.log result4_1.log 4-8 "Compare the server 1 received file and client received file 4-8" 1 0
+_callCompareTest server1_2.log result4_2.log 4-9 "Compare the server 1 received file and client received file 4-9" 1 0
+
+kill -9 $pid &> /dev/null
+wait $pid
+
rm *.log
report