From df023850bc84505e70bde9205cd0e044d9a0df7b Mon Sep 17 00:00:00 2001 From: gichan Date: Wed, 8 Jun 2022 09:55:44 +0900 Subject: [PATCH] [Edge] Tensor query TCP connection Implement tensor query TCP connection using nnstreamer-edge lib. Signed-off-by: gichan --- debian/control | 2 +- debian/control.debian | 2 +- debian/control.ubuntu.ppa | 2 +- gst/nnstreamer/meson.build | 4 +- gst/nnstreamer/registerer/nnstreamer.c | 4 + gst/nnstreamer/tensor_query/meson.build | 2 + gst/nnstreamer/tensor_query/nnstreamer-edge.h | 1 + .../tensor_query/nnstreamer_edge_internal.c | 1 + gst/nnstreamer/tensor_query/tensor_query_client.c | 465 +++++---- gst/nnstreamer/tensor_query/tensor_query_client.h | 20 +- gst/nnstreamer/tensor_query/tensor_query_common.c | 1089 +------------------- gst/nnstreamer/tensor_query/tensor_query_common.h | 164 +-- gst/nnstreamer/tensor_query/tensor_query_server.c | 165 +-- gst/nnstreamer/tensor_query/tensor_query_server.h | 57 +- .../tensor_query/tensor_query_serversink.c | 120 +-- .../tensor_query/tensor_query_serversink.h | 11 +- .../tensor_query/tensor_query_serversrc.c | 241 +++-- .../tensor_query/tensor_query_serversrc.h | 15 +- jni/nnstreamer.mk | 7 +- meson.build | 4 + meson_options.txt | 1 + packaging/nnstreamer.spec | 1 + 22 files changed, 586 insertions(+), 1792 deletions(-) diff --git a/debian/control b/debian/control index eb7b581..4c12cb1 100644 --- a/debian/control +++ b/debian/control @@ -3,7 +3,7 @@ Section: libs Priority: optional Maintainer: MyungJoo Ham Build-Depends: gcc-9 | gcc-8 | gcc-7 | gcc-6 | gcc-5 (>=5.4), - ninja-build, meson (>=0.50), debhelper (>=9), + ninja-build, meson (>=0.50), debhelper (>=9), nnstreamer-edge-dev, libgstreamer1.0-dev, libgstreamer-plugins-base1.0-dev, libglib2.0-dev, gstreamer1.0-tools, gstreamer1.0-plugins-base, gstreamer1.0-plugins-good, libgtest-dev, ssat, libpng-dev, libopencv-dev, liborc-0.4-dev, flex, bison, diff --git a/debian/control.debian b/debian/control.debian index e984241..e91a20d 100644 --- a/debian/control.debian +++ b/debian/control.debian @@ -3,7 +3,7 @@ Section: libs Priority: optional Maintainer: MyungJoo Ham Build-Depends: gcc-9 | gcc-8 | gcc-7 | gcc-6 | gcc-5 (>=5.4), - ninja-build, meson (>=0.49), debhelper (>=9), + ninja-build, meson (>=0.49), debhelper (>=9), nnstreamer-edge-dev, libgstreamer1.0-dev, libgstreamer-plugins-base1.0-dev, libglib2.0-dev, gstreamer1.0-tools, gstreamer1.0-plugins-base, gstreamer1.0-plugins-good, libgtest-dev, libpng-dev, libopencv-dev, liborc-0.4-dev, flex, bison, diff --git a/debian/control.ubuntu.ppa b/debian/control.ubuntu.ppa index 9b6741e..b2c631d 100644 --- a/debian/control.ubuntu.ppa +++ b/debian/control.ubuntu.ppa @@ -3,7 +3,7 @@ Section: libs Priority: optional Maintainer: MyungJoo Ham Build-Depends: gcc-9 | gcc-8 | gcc-7 | gcc-6 | gcc-5 (>=5.4), - ninja-build, meson (>=0.49), debhelper (>=9), + ninja-build, meson (>=0.49), debhelper (>=9), nnstreamer-edge-dev, libgstreamer1.0-dev, libgstreamer-plugins-base1.0-dev, libglib2.0-dev, gstreamer1.0-tools, gstreamer1.0-plugins-base, gstreamer1.0-plugins-good, libgtest-dev, ssat, libpng-dev, libopencv-dev, liborc-0.4-dev, diff --git a/gst/nnstreamer/meson.build b/gst/nnstreamer/meson.build index e5e30c2..b7a1e41 100644 --- a/gst/nnstreamer/meson.build +++ b/gst/nnstreamer/meson.build @@ -112,7 +112,7 @@ nnstreamer_deps += nnstreamer_single_dep # Build libraries ("both_libraries" are supported from 0.46.) nnstreamer_shared = shared_library('nnstreamer', nnstreamer_sources, - dependencies: nnstreamer_deps, + dependencies: [nnstreamer_deps, nns_edge_dep], include_directories: nnstreamer_inc, install: true, install_dir: plugins_install_dir @@ -132,7 +132,7 @@ meson.add_install_script( nnstreamer_static = static_library('nnstreamer', nnstreamer_sources, - dependencies: nnstreamer_deps, + dependencies: [nnstreamer_deps, nns_edge_dep], include_directories: nnstreamer_inc, install: true, install_dir: nnstreamer_libdir diff --git a/gst/nnstreamer/registerer/nnstreamer.c b/gst/nnstreamer/registerer/nnstreamer.c index 1fd9895..6f78592 100644 --- a/gst/nnstreamer/registerer/nnstreamer.c +++ b/gst/nnstreamer/registerer/nnstreamer.c @@ -69,9 +69,11 @@ #endif /* __gnu_linux__ && !__ANDROID__ */ #include +#if defined(ENABLE_NNSTREAMER_EDGE) #include #include #include +#endif #define NNSTREAMER_INIT(plugin,name,type) \ do { \ @@ -104,9 +106,11 @@ gst_nnstreamer_init (GstPlugin * plugin) NNSTREAMER_INIT (plugin, transform, TRANSFORM); NNSTREAMER_INIT (plugin, if, IF); NNSTREAMER_INIT (plugin, rate, RATE); +#if defined(ENABLE_NNSTREAMER_EDGE) NNSTREAMER_INIT (plugin, query_serversrc, QUERY_SERVERSRC); NNSTREAMER_INIT (plugin, query_serversink, QUERY_SERVERSINK); NNSTREAMER_INIT (plugin, query_client, QUERY_CLIENT); +#endif #if defined(__gnu_linux__) && !defined(__ANDROID__) /* IIO requires Linux / non-Android */ #if (GST_VERSION_MAJOR == 1) && (GST_VERSION_MINOR >= 8) diff --git a/gst/nnstreamer/tensor_query/meson.build b/gst/nnstreamer/tensor_query/meson.build index 1830053..755fcbd 100644 --- a/gst/nnstreamer/tensor_query/meson.build +++ b/gst/nnstreamer/tensor_query/meson.build @@ -26,6 +26,8 @@ nnstreamer_edge_deps = [ glib_dep, gio_dep, thread_dep ] +nnstreamer_headers += join_paths(meson.current_source_dir(), 'nnstreamer-edge.h') + if aitt_support_is_available nnstreamer_edge_sources += join_paths(meson.current_source_dir(), 'nnstreamer_edge_aitt.c') nnstreamer_edge_deps += aitt_support_deps diff --git a/gst/nnstreamer/tensor_query/nnstreamer-edge.h b/gst/nnstreamer/tensor_query/nnstreamer-edge.h index b7b3955..0382909 100644 --- a/gst/nnstreamer/tensor_query/nnstreamer-edge.h +++ b/gst/nnstreamer/tensor_query/nnstreamer-edge.h @@ -64,6 +64,7 @@ typedef enum { typedef enum { NNS_EDGE_PROTOCOL_TCP = 0, + NNS_EDGE_PROTOCOL_UDP, NNS_EDGE_PROTOCOL_MQTT, NNS_EDGE_PROTOCOL_AITT, NNS_EDGE_PROTOCOL_AITT_TCP, diff --git a/gst/nnstreamer/tensor_query/nnstreamer_edge_internal.c b/gst/nnstreamer/tensor_query/nnstreamer_edge_internal.c index 7c7ac5a..43b218b 100644 --- a/gst/nnstreamer/tensor_query/nnstreamer_edge_internal.c +++ b/gst/nnstreamer/tensor_query/nnstreamer_edge_internal.c @@ -735,6 +735,7 @@ nns_edge_release_handle (nns_edge_h edge_h) g_free (eh->topic); g_free (eh->ip); g_free (eh->recv_ip); + g_free (eh->caps_str); g_hash_table_destroy (eh->conn_table); nns_edge_unlock (eh); diff --git a/gst/nnstreamer/tensor_query/tensor_query_client.c b/gst/nnstreamer/tensor_query/tensor_query_client.c index 3317a58..3397b8e 100644 --- a/gst/nnstreamer/tensor_query/tensor_query_client.c +++ b/gst/nnstreamer/tensor_query/tensor_query_client.c @@ -19,6 +19,14 @@ #include #include #include +#include "nnstreamer-edge.h" +#include "tensor_query_common.h" + +#include +#include +#include +#include +#include /** * @brief Macro for debug mode. @@ -33,20 +41,15 @@ enum { PROP_0, - PROP_SINK_HOST, - PROP_SINK_PORT, - PROP_SRC_HOST, - PROP_SRC_PORT, + PROP_HOST, + PROP_PORT, PROP_PROTOCOL, PROP_OPERATION, - PROP_BROKER_HOST, - PROP_BROKER_PORT, PROP_SILENT, }; #define TCP_HIGHEST_PORT 65535 #define TCP_DEFAULT_HOST "localhost" -#define TCP_DEFAULT_SINK_PORT 3000 #define TCP_DEFAULT_SRC_PORT 3001 #define DEFAULT_SILENT TRUE @@ -104,22 +107,13 @@ gst_tensor_query_client_class_init (GstTensorQueryClientClass * klass) gobject_class->finalize = gst_tensor_query_client_finalize; /** install property goes here */ - g_object_class_install_property (gobject_class, PROP_SINK_HOST, - g_param_spec_string ("sink-host", "Sink Host", - "A tenor query sink host to send the packets to/from", - TCP_DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_SINK_PORT, - g_param_spec_uint ("sink-port", "Sink Port", - "The port of tensor query sink to send the packets to/from", 0, - TCP_HIGHEST_PORT, TCP_DEFAULT_SINK_PORT, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_SRC_HOST, - g_param_spec_string ("src-host", "Source Host", - "A tenor query src host to send the packets to/from", + g_object_class_install_property (gobject_class, PROP_HOST, + g_param_spec_string ("host", "Host", + "A tenor query server host to send the packets to/from", TCP_DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_SRC_PORT, - g_param_spec_uint ("src-port", "Source Port", - "The port of tensor query src to send the packets to/from", 0, + g_object_class_install_property (gobject_class, PROP_PORT, + g_param_spec_uint ("port", "Port", + "The port of tensor query server to send the packets to/from", 0, TCP_HIGHEST_PORT, TCP_DEFAULT_SRC_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_SILENT, @@ -134,14 +128,6 @@ gst_tensor_query_client_class_init (GstTensorQueryClientClass * klass) g_param_spec_string ("operation", "Operation", "The main operation of the host.", "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BROKER_HOST, - g_param_spec_string ("broker-host", "Broker Host", - "Broker host address to connect.", DEFAULT_BROKER_HOST, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BROKER_PORT, - g_param_spec_uint ("broker-port", "Broker Port", - "Broker port to connect.", 0, 65535, - DEFAULT_BROKER_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); gst_element_class_add_pad_template (gstelement_class, gst_static_pad_template_get (&sinktemplate)); @@ -158,6 +144,40 @@ gst_tensor_query_client_class_init (GstTensorQueryClientClass * klass) } /** + * @brief Get IP address + */ +static gchar * +_get_ip_address (void) +{ + struct ifaddrs *addrs, *run; + gchar *ret = NULL; + + if (0 != getifaddrs (&addrs)) + goto done; + run = addrs; + + while (run) { + if (run->ifa_addr && run->ifa_addr->sa_family == AF_INET) { + struct sockaddr_in *pAddr = (struct sockaddr_in *) run->ifa_addr; + + if (NULL != strstr (run->ifa_name, "en") || + NULL != strstr (run->ifa_name, "et")) { + g_free (ret); + ret = g_strdup (inet_ntoa (pAddr->sin_addr)); + } + } + run = run->ifa_next; + } + freeifaddrs (addrs); + +done: + if (NULL == ret) + ret = g_strdup ("localhost"); + + return ret; +} + +/** * @brief initialize the new element */ static void @@ -180,18 +200,16 @@ gst_tensor_query_client_init (GstTensorQueryClient * self) /* init properties */ self->silent = DEFAULT_SILENT; self->protocol = DEFAULT_PROTOCOL; - self->sink_conn = NULL; - self->sink_host = g_strdup (TCP_DEFAULT_HOST); - self->sink_port = TCP_DEFAULT_SINK_PORT; - self->src_conn = NULL; - self->src_host = g_strdup (TCP_DEFAULT_HOST); - self->src_port = TCP_DEFAULT_SRC_PORT; + self->host = g_strdup (TCP_DEFAULT_HOST); + self->port = TCP_DEFAULT_SRC_PORT; + self->srv_host = g_strdup (TCP_DEFAULT_HOST); + self->srv_port = TCP_DEFAULT_SRC_PORT; self->operation = NULL; - self->broker_host = g_strdup (DEFAULT_BROKER_HOST); - self->broker_port = DEFAULT_BROKER_PORT; self->in_caps_str = NULL; + self->msg_queue = g_async_queue_new (); tensor_query_hybrid_init (&self->hybrid_info, NULL, 0, FALSE); + nns_edge_create_handle ("TEMP_ID", "TEMP_CLIENT_TOPIC", &self->edge_h); } /** @@ -201,25 +219,26 @@ static void gst_tensor_query_client_finalize (GObject * object) { GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (object); + nns_edge_data_h data_h; - g_free (self->sink_host); - self->sink_host = NULL; - g_free (self->src_host); - self->src_host = NULL; + g_free (self->host); + self->host = NULL; + g_free (self->srv_host); + self->srv_host = NULL; g_free (self->operation); self->operation = NULL; - g_free (self->broker_host); - self->broker_host = NULL; g_free (self->in_caps_str); self->in_caps_str = NULL; - tensor_query_hybrid_close (&self->hybrid_info); - if (self->sink_conn) { - nnstreamer_query_close (self->sink_conn); - self->sink_conn = NULL; + + while ((data_h = g_async_queue_try_pop (self->msg_queue))) { + nns_edge_data_destroy (data_h); } - if (self->src_conn) { - nnstreamer_query_close (self->src_conn); - self->src_conn = NULL; + g_async_queue_unref (self->msg_queue); + + tensor_query_hybrid_close (&self->hybrid_info); + if (self->edge_h) { + nns_edge_release_handle (self->edge_h); + self->edge_h = NULL; } G_OBJECT_CLASS (parent_class)->finalize (object); @@ -235,33 +254,22 @@ gst_tensor_query_client_set_property (GObject * object, guint prop_id, GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (object); switch (prop_id) { - case PROP_SINK_HOST: + case PROP_HOST: if (!g_value_get_string (value)) { nns_logw ("Sink host property cannot be NULL"); break; } - g_free (self->sink_host); - self->sink_host = g_value_dup_string (value); - break; - case PROP_SINK_PORT: - self->sink_port = g_value_get_uint (value); - break; - case PROP_SRC_HOST: - if (!g_value_get_string (value)) { - nns_logw ("Source host property cannot be NULL"); - break; - } - g_free (self->src_host); - self->src_host = g_value_dup_string (value); + g_free (self->host); + self->host = g_value_dup_string (value); break; - case PROP_SRC_PORT: - self->src_port = g_value_get_uint (value); + case PROP_PORT: + self->port = g_value_get_uint (value); break; case PROP_PROTOCOL: { /** @todo expand when other protocols are ready */ - TensorQueryProtocol protocol = g_value_get_enum (value); - if (protocol == _TENSOR_QUERY_PROTOCOL_TCP) + nns_edge_protocol_e protocol = g_value_get_enum (value); + if (protocol == NNS_EDGE_PROTOCOL_TCP) self->protocol = protocol; } break; @@ -274,17 +282,6 @@ gst_tensor_query_client_set_property (GObject * object, guint prop_id, g_free (self->operation); self->operation = g_value_dup_string (value); break; - case PROP_BROKER_HOST: - if (!g_value_get_string (value)) { - nns_logw ("Broker host property cannot be NULL"); - break; - } - g_free (self->broker_host); - self->broker_host = g_value_dup_string (value); - break; - case PROP_BROKER_PORT: - self->broker_port = g_value_get_uint (value); - break; case PROP_SILENT: self->silent = g_value_get_boolean (value); break; @@ -304,17 +301,11 @@ gst_tensor_query_client_get_property (GObject * object, guint prop_id, GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (object); switch (prop_id) { - case PROP_SINK_HOST: - g_value_set_string (value, self->sink_host); - break; - case PROP_SINK_PORT: - g_value_set_uint (value, self->sink_port); + case PROP_HOST: + g_value_set_string (value, self->host); break; - case PROP_SRC_HOST: - g_value_set_string (value, self->src_host); - break; - case PROP_SRC_PORT: - g_value_set_uint (value, self->src_port); + case PROP_PORT: + g_value_set_uint (value, self->port); break; case PROP_PROTOCOL: g_value_set_enum (value, self->protocol); @@ -322,11 +313,6 @@ gst_tensor_query_client_get_property (GObject * object, guint prop_id, case PROP_OPERATION: g_value_set_string (value, self->operation); break; - case PROP_BROKER_HOST: - g_value_set_string (value, self->broker_host); - break; - case PROP_BROKER_PORT: - g_value_set_uint (value, self->broker_port); break; case PROP_SILENT: g_value_set_boolean (value, self->silent); @@ -372,88 +358,14 @@ gst_tensor_query_client_update_caps (GstTensorQueryClient * self, } /** - * @brief Connect to query server. (Direct connection) - */ -static gboolean -_connect_to_server (GstTensorQueryClient * self) -{ - TensorQueryCommandData cmd_buf; - query_client_id_t client_id; - - nns_logd ("Server src info: %s:%u", self->src_host, self->src_port); - self->src_conn = nnstreamer_query_connect (self->protocol, self->src_host, - self->src_port, QUERY_DEFAULT_TIMEOUT_SEC); - if (!self->src_conn) { - nns_loge ("Failed to connect server source "); - return FALSE; - } - - /** Receive client ID from server src */ - if (0 != nnstreamer_query_receive (self->src_conn, &cmd_buf) || - cmd_buf.cmd != _TENSOR_QUERY_CMD_CLIENT_ID) { - nns_loge ("Failed to receive client ID."); - return FALSE; - } - - client_id = cmd_buf.client_id; - nnstreamer_query_set_client_id (self->src_conn, client_id); - - cmd_buf.cmd = _TENSOR_QUERY_CMD_REQUEST_INFO; - cmd_buf.data.data = (uint8_t *) self->in_caps_str; - cmd_buf.data.size = (size_t) strlen (self->in_caps_str) + 1; - - if (0 != nnstreamer_query_send (self->src_conn, &cmd_buf)) { - nns_loge ("Failed to send request info cmd buf"); - return FALSE; - } - - if (0 != nnstreamer_query_receive (self->src_conn, &cmd_buf)) { - nns_loge ("Failed to receive response from the query server."); - return FALSE; - } - - if (cmd_buf.cmd == _TENSOR_QUERY_CMD_RESPOND_APPROVE) { - if (!gst_tensor_query_client_update_caps (self, (char *) cmd_buf.data.data)) { - nns_loge ("Failed to update client source caps."); - return FALSE; - } - } else { - /** @todo Retry for info */ - nns_loge ("Failed to receive approve command."); - return FALSE; - } - - nns_logd ("Server sink info: %s:%u", self->sink_host, self->sink_port); - self->sink_conn = - nnstreamer_query_connect (self->protocol, self->sink_host, - self->sink_port, QUERY_DEFAULT_TIMEOUT_SEC); - if (!self->sink_conn) { - nns_loge ("Failed to connect server sink "); - return FALSE; - } - - nnstreamer_query_set_client_id (self->sink_conn, client_id); - cmd_buf.cmd = _TENSOR_QUERY_CMD_CLIENT_ID; - cmd_buf.client_id = client_id; - if (0 != nnstreamer_query_send (self->sink_conn, &cmd_buf)) { - nns_loge ("Failed to send client ID to server sink"); - return FALSE; - } - return TRUE; -} - -/** * @brief Copy server info. */ static void _copy_srv_info (GstTensorQueryClient * self, query_server_info_s * server) { - g_free (self->src_host); - self->src_host = g_strdup (server->src.host); - self->src_port = server->src.port; - g_free (self->sink_host); - self->sink_host = g_strdup (server->sink.host); - self->sink_port = server->sink.port; + g_free (self->srv_host); + self->srv_host = g_strdup (server->src.host); + self->srv_port = server->src.port; } /** @@ -469,17 +381,15 @@ _client_retry_connection (GstTensorQueryClient * self) nns_logd ("Retry to connect to available server."); while ((server = tensor_query_hybrid_get_server_info (&self->hybrid_info))) { - nnstreamer_query_close (self->sink_conn); - nnstreamer_query_close (self->src_conn); - self->sink_conn = NULL; - self->src_conn = NULL; + nns_edge_disconnect (self->edge_h); _copy_srv_info (self, server); tensor_query_hybrid_free_server_info (server); - if (_connect_to_server (self)) { - nns_logi ("Connected to new server. src: %s:%u, sink: %s:%u", - self->src_host, self->src_port, self->sink_host, self->sink_port); + if (nns_edge_connect (self->edge_h, NNS_EDGE_PROTOCOL_TCP, self->srv_host, + self->srv_port)) { + nns_logi ("Connected to new server: %s:%u.", self->srv_host, + self->srv_port); ret = TRUE; break; } @@ -489,6 +399,118 @@ _client_retry_connection (GstTensorQueryClient * self) } /** + * @brief Parse caps from received event data. + */ +static gchar * +_nns_edge_parse_caps (gchar * caps_str, gboolean is_src) +{ + gchar **strv = g_strsplit (caps_str, "@", -1); + gint num = g_strv_length (strv), i; + gchar *find_key = NULL; + gchar *ret_str = NULL; + + find_key = + is_src == + TRUE ? g_strdup ("query_server_src_caps") : + g_strdup ("query_server_sink_caps"); + + for (i = 1; i < num; i += 2) { + if (0 == g_strcmp0 (find_key, strv[i])) { + ret_str = g_strdup (strv[i + 1]); + break; + } + } + + g_free (find_key); + g_strfreev (strv); + + return ret_str; +} + +/** + * @brief nnstreamer-edge event callback. + */ +static int +_nns_edge_event_cb (nns_edge_event_h event_h, void *user_data) +{ + nns_edge_event_e event_type; + int ret = NNS_EDGE_ERROR_NONE; + gchar *caps_str = NULL; + GstTensorQueryClient *self = (GstTensorQueryClient *) user_data; + + if (NNS_EDGE_ERROR_NONE != nns_edge_event_get_type (event_h, &event_type)) { + nns_loge ("Failed to get event type!"); + return NNS_EDGE_ERROR_NOT_SUPPORTED; + } + + switch (event_type) { + case NNS_EDGE_EVENT_CAPABILITY: + { + GstCaps *server_caps, *client_caps; + GstStructure *server_st, *client_st; + gboolean result = FALSE; + gchar *ret_str; + + nns_edge_event_parse_capability (event_h, &caps_str); + ret_str = _nns_edge_parse_caps (caps_str, TRUE); + client_caps = gst_caps_from_string ((gchar *) self->in_caps_str); + server_caps = gst_caps_from_string (ret_str); + g_free (ret_str); + + /** Server framerate may vary. Let's skip comparing the framerate. */ + gst_caps_set_simple (server_caps, "framerate", GST_TYPE_FRACTION, 0, 1, + NULL); + gst_caps_set_simple (client_caps, "framerate", GST_TYPE_FRACTION, 0, 1, + NULL); + + server_st = gst_caps_get_structure (server_caps, 0); + client_st = gst_caps_get_structure (client_caps, 0); + + if (gst_structure_is_tensor_stream (server_st)) { + GstTensorsConfig server_config, client_config; + + gst_tensors_config_from_structure (&server_config, server_st); + gst_tensors_config_from_structure (&client_config, client_st); + + result = gst_tensors_config_is_equal (&server_config, &client_config); + } + + if (result || gst_caps_can_intersect (client_caps, server_caps)) { + /** Update client src caps */ + ret_str = _nns_edge_parse_caps (caps_str, FALSE); + if (!gst_tensor_query_client_update_caps (self, ret_str)) { + nns_loge ("Failed to update client source caps."); + ret = NNS_EDGE_ERROR_UNKNOWN; + } + g_free (ret_str); + } else { + /* respond deny with src caps string */ + nns_loge ("Query caps is not acceptable!"); + ret = NNS_EDGE_ERROR_UNKNOWN; + } + + gst_caps_unref (server_caps); + gst_caps_unref (client_caps); + + break; + } + case NNS_EDGE_EVENT_NEW_DATA_RECEIVED: + { + nns_edge_data_h data; + + nns_edge_event_parse_new_data (event_h, &data); + g_async_queue_push (self->msg_queue, data); + break; + } + default: + break; + } + + g_free (caps_str); + return ret; +} + +/** * @brief This function handles sink event. */ static gboolean @@ -497,6 +519,7 @@ gst_tensor_query_client_sink_event (GstPad * pad, { GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (parent); gboolean ret = TRUE; + gchar *ip; GST_DEBUG_OBJECT (self, "Received %s event: %" GST_PTR_FORMAT, GST_EVENT_TYPE_NAME (event), event); @@ -512,7 +535,7 @@ gst_tensor_query_client_sink_event (GstPad * pad, query_server_info_s *server; tensor_query_hybrid_set_broker (&self->hybrid_info, - self->broker_host, self->broker_port); + self->host, self->port); if (!tensor_query_hybrid_subscribe (&self->hybrid_info, self->operation)) { @@ -530,13 +553,31 @@ gst_tensor_query_client_sink_event (GstPad * pad, nns_logi ("Query-hybrid feature is disabled."); nns_logi ("Specify operation to subscribe to the available broker (e.g., operation=object_detection)."); + g_free (self->srv_host); + self->srv_host = g_strdup (self->host); + self->srv_port = self->port; } g_free (self->in_caps_str); self->in_caps_str = gst_caps_to_string (caps); + nns_edge_set_info (self->edge_h, "CAPS", self->in_caps_str); + nns_edge_set_event_callback (self->edge_h, _nns_edge_event_cb, self); + + ip = _get_ip_address (); + nns_edge_set_info (self->edge_h, "IP", ip); + nns_edge_set_info (self->edge_h, "PORT", "0"); + g_free (ip); + + if (0 != nns_edge_start (self->edge_h, false)) { + nns_loge + ("Failed to start NNStreamer-edge. Please check server IP and port"); + return FALSE; + } - if (!_connect_to_server (self)) { - ret = _client_retry_connection (self); + if (0 != nns_edge_connect (self->edge_h, NNS_EDGE_PROTOCOL_TCP, + self->srv_host, self->srv_port)) { + nns_loge ("Failed to connect to edge server!"); + return FALSE; } gst_event_unref (event); @@ -610,24 +651,61 @@ gst_tensor_query_client_chain (GstPad * pad, GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (parent); GstBuffer *out_buf = NULL; GstFlowReturn res = GST_FLOW_OK; - + nns_edge_data_h data_h; + guint i, num_mems, num_data; + int ret; + GstMemory *mem[NNS_TENSOR_SIZE_LIMIT]; + GstMapInfo map[NNS_TENSOR_SIZE_LIMIT]; UNUSED (pad); - if (!tensor_query_send_buffer (self->src_conn, GST_ELEMENT (self), buf)) { - nns_logw ("Failed to send buffer to server node, retry connection."); + ret = nns_edge_data_create (&data_h); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_loge ("Failed to create data handle in client chain."); + return GST_FLOW_ERROR; + } + + num_mems = gst_buffer_n_memory (buf); + for (i = 0; i < num_mems; i++) { + mem[i] = gst_buffer_peek_memory (buf, i); + if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) { + ml_loge ("Cannot map the %uth memory in gst-buffer.", i); + num_mems = i; + goto done; + } + nns_edge_data_add (data_h, map[i].data, map[i].size, NULL); + } + if (0 != nns_edge_request (self->edge_h, data_h)) { + nns_logw ("Failed to request to server node, retry connection."); goto retry; } - out_buf = tensor_query_receive_buffer (self->sink_conn); - if (out_buf) { + nns_edge_data_destroy (data_h); + + data_h = g_async_queue_try_pop (self->msg_queue); + if (data_h) { + out_buf = gst_buffer_new (); + if (NNS_EDGE_ERROR_NONE != nns_edge_data_get_count (data_h, &num_data)) { + nns_loge ("Failed to get the number of memories of the edge data."); + res = GST_FLOW_ERROR; + goto done; + } + for (i = 0; i < num_data; i++) { + void *data = NULL; + size_t data_len; + gpointer new_data; + + nns_edge_data_get (data_h, i, &data, &data_len); + new_data = _g_memdup (data, data_len); + gst_buffer_append_memory (out_buf, + gst_memory_new_wrapped (0, new_data, data_len, 0, + data_len, new_data, g_free)); + } /* metadata from incoming buffer */ gst_buffer_copy_into (out_buf, buf, GST_BUFFER_COPY_METADATA, 0, -1); res = gst_pad_push (self->srcpad, out_buf); - goto done; } - - nns_logw ("Failed to receive result from server node, retry connection."); + goto done; retry: if (!self->operation || !_client_retry_connection (self)) { @@ -635,6 +713,13 @@ retry: res = GST_FLOW_ERROR; } done: + if (data_h) { + nns_edge_data_destroy (data_h); + } + + for (i = 0; i < num_mems; i++) + gst_memory_unmap (mem[i], &map[i]); + gst_buffer_unref (buf); return res; } diff --git a/gst/nnstreamer/tensor_query/tensor_query_client.h b/gst/nnstreamer/tensor_query/tensor_query_client.h index 0a66038..175fa13 100644 --- a/gst/nnstreamer/tensor_query/tensor_query_client.h +++ b/gst/nnstreamer/tensor_query/tensor_query_client.h @@ -16,7 +16,6 @@ #include #include #include -#include "tensor_query_common.h" #include "tensor_query_hybrid.h" G_BEGIN_DECLS @@ -48,23 +47,18 @@ struct _GstTensorQueryClient gboolean silent; /**< True if logging is minimized */ gchar *in_caps_str; - TensorQueryProtocol protocol; - /* Query-hybrid feature */ gchar *operation; /**< Main operation such as 'object_detection' or 'image_segmentation' */ query_hybrid_info_s hybrid_info; - gchar *broker_host; - guint16 broker_port; + gchar *host; + guint16 port; - /* src information (Connect to query server source) */ - query_connection_handle src_conn; - gchar *src_host; - guint16 src_port; + gchar *srv_host; + guint16 srv_port; - /* sink socket and information (Connect to query server sink) */ - query_connection_handle sink_conn; - gchar *sink_host; - guint16 sink_port; + nns_edge_protocol_e protocol; + nns_edge_h edge_h; + GAsyncQueue *msg_queue; }; /** diff --git a/gst/nnstreamer/tensor_query/tensor_query_common.c b/gst/nnstreamer/tensor_query/tensor_query_common.c index 5cc4c24..bad1db4 100644 --- a/gst/nnstreamer/tensor_query/tensor_query_common.c +++ b/gst/nnstreamer/tensor_query/tensor_query_common.c @@ -14,1099 +14,16 @@ #ifdef HAVE_CONFIG_H #include "config.h" #endif - -#include -#include -#include #include #include #include -#include -#include -#include #include "tensor_query_common.h" -#include - -#define TENSOR_QUERY_SERVER_DATA_LEN 128 -#define N_BACKLOG 10 -#define CLIENT_ID_LEN sizeof(query_client_id_t) #ifndef EREMOTEIO #define EREMOTEIO 121 /* This is Linux-specific. Define this for non-Linux systems */ #endif /** - * @brief Query server dependent network data - */ -typedef struct -{ - TensorQueryProtocol protocol; - int8_t is_src; - char *src_caps_str; - char *sink_caps_str; - GAsyncQueue *msg_queue; - union - { - struct - { - GSocketListener *socket_listener; - GCancellable *cancellable; - GAsyncQueue *conn_queue; - }; - /* check the size of struct is less */ - uint8_t _dummy[TENSOR_QUERY_SERVER_DATA_LEN]; - }; -} TensorQueryServerData; - -/** - * @brief Structures for tensor query client data. - */ -typedef struct -{ - TensorQueryProtocol protocol; - union - { - struct - { - GSocket *client_socket; - GCancellable *cancellable; - }; - }; -} TensorQueryClientData; - -/** - * @brief Connection info structure - */ -typedef struct -{ - TensorQueryProtocol protocol; - char *host; - uint16_t port; - uint32_t timeout; - query_client_id_t client_id; - pthread_t msg_thread; - int8_t running; - - /* network info */ - union - { - /* TCP */ - struct - { - GSocket *socket; - GCancellable *cancellable; - }; - }; -} TensorQueryConnection; - -/** - * @brief Structures for tensor query message handling thread data. - */ -typedef struct -{ - TensorQueryServerData *sdata; - TensorQueryConnection *conn; -} TensorQueryMsgThreadData; - -static int -query_tcp_receive (GSocket * socket, uint8_t * data, size_t size, - GCancellable * cancellable); -static gboolean query_tcp_send (GSocket * socket, uint8_t * data, size_t size, - GCancellable * cancellable); -static void -accept_socket_async_cb (GObject * source, GAsyncResult * result, - gpointer user_data); - -/** - * @brief Internal function to check connection. - */ -static gboolean -_query_check_connection (query_connection_handle connection) -{ - TensorQueryConnection *conn; - size_t size; - GIOCondition condition; - - conn = (TensorQueryConnection *) connection; - - condition = g_socket_condition_check (conn->socket, - G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP); - size = g_socket_get_available_bytes (conn->socket); - - if (condition && size <= 0) { - nns_logw ("Socket is not available, possibly EOS."); - return FALSE; - } - - return TRUE; -} - -/** - * @brief Internal function to get client id. - */ -static query_client_id_t -_query_get_client_id (query_connection_handle connection) -{ - TensorQueryConnection *conn = (TensorQueryConnection *) connection; - return conn->client_id; -} - -/** - * @brief Get socket address - */ -static gboolean -gst_tensor_query_get_saddr (const char *host, uint16_t port, - GCancellable * cancellable, GSocketAddress ** saddr) -{ - GError *err = NULL; - GInetAddress *addr; - - /* look up name if we need to */ - addr = g_inet_address_new_from_string (host); - if (!addr) { - GList *results; - GResolver *resolver; - resolver = g_resolver_get_default (); - results = g_resolver_lookup_by_name (resolver, host, cancellable, &err); - if (!results) { - if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { - nns_logd ("gst_tensor_query_socket_new: Cancelled name resolval"); - } else { - nns_loge ("Failed to resolve host '%s': %s", host, err->message); - } - g_clear_error (&err); - g_object_unref (resolver); - return FALSE; - } - /** @todo Try with the second address if the first fails */ - addr = G_INET_ADDRESS (g_object_ref (results->data)); - g_resolver_free_addresses (results); - g_object_unref (resolver); - } - - *saddr = g_inet_socket_address_new (addr, port); - g_object_unref (addr); - - return TRUE; -} - -/** - * @brief Create and connect to requested socket. - */ -static gboolean -gst_tensor_query_connect (query_connection_handle conn_h) -{ - GError *err = NULL; - GSocketAddress *saddr = NULL; - TensorQueryConnection *conn = (TensorQueryConnection *) conn_h; - gboolean ret = FALSE; - - if (!gst_tensor_query_get_saddr (conn->host, conn->port, conn->cancellable, - &saddr)) { - nns_loge ("Failed to get socket address"); - return ret; - } - - /* create sending client socket */ - conn->socket = - g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM, - G_SOCKET_PROTOCOL_TCP, &err); - /** @todo Support UDP protocol */ - - if (!conn->socket) { - nns_loge ("Failed to create new socket"); - goto done; - } - - /* setting TCP_NODELAY to TRUE in order to avoid packet batching as known as Nagle's algorithm */ - if (!g_socket_set_option (conn->socket, IPPROTO_TCP, TCP_NODELAY, TRUE, &err)) { - nns_loge ("Failed to set socket TCP_NODELAY option: %s", err->message); - goto done; - } - - if (!g_socket_connect (conn->socket, saddr, conn->cancellable, &err)) { - if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { - nns_logd ("Cancelled connecting"); - } else { - nns_loge ("Failed to connect to host"); - } - goto done; - } - - /* now connected to the requested socket */ - ret = TRUE; - -done: - g_object_unref (saddr); - g_clear_error (&err); - return ret; -} - -/** - * @brief Set timeout. - */ -void -nnstreamer_query_set_timeout (query_connection_handle connection, - uint32_t timeout) -{ - TensorQueryConnection *conn = (TensorQueryConnection *) connection; - - if (conn->timeout != timeout) { - conn->timeout = timeout; - - switch (conn->protocol) { - case _TENSOR_QUERY_PROTOCOL_TCP: - g_socket_set_timeout (conn->socket, timeout); - break; - default: - /* NYI */ - nns_loge ("Invalid protocol"); - break; - } - } -} - -/** - * @brief Set client ID. - */ -void -nnstreamer_query_set_client_id (query_connection_handle connection, - query_client_id_t id) -{ - TensorQueryConnection *conn = (TensorQueryConnection *) connection; - conn->client_id = id; -} - -/** - * @brief Connect to the specified address. - */ -query_connection_handle -nnstreamer_query_connect (TensorQueryProtocol protocol, const char *ip, - uint16_t port, uint32_t timeout) -{ - TensorQueryConnection *conn = g_new0 (TensorQueryConnection, 1); - - conn->protocol = protocol; - conn->host = g_strdup (ip); - conn->port = port; - conn->timeout = timeout; - - switch (protocol) { - case _TENSOR_QUERY_PROTOCOL_TCP: - { - conn->cancellable = g_cancellable_new (); - if (!gst_tensor_query_connect (conn)) { - nns_loge ("Failed to create new socket"); - nnstreamer_query_close (conn); - return NULL; - } - break; - } - default: - nns_loge ("Unsupported protocol."); - nnstreamer_query_close (conn); - return NULL; - } - - nnstreamer_query_set_timeout (conn, timeout); - return conn; -} - -/** - * @brief receive command from connected device. - * @return 0 if OK, negative value if error - * @note The socket operates in two modes: blocking and non-blocking. - * In non-blocking mode, if there is no data available, it is immediately returned. - */ -int -nnstreamer_query_receive (query_connection_handle connection, - TensorQueryCommandData * data) -{ - TensorQueryConnection *conn = (TensorQueryConnection *) connection; - - if (!conn) { - nns_loge ("Invalid connection data"); - return -EINVAL; - } - - switch (conn->protocol) { - case _TENSOR_QUERY_PROTOCOL_TCP: - { - TensorQueryCommand cmd; - - if (query_tcp_receive (conn->socket, (uint8_t *) & cmd, sizeof (cmd), - conn->cancellable) < 0) { - nns_loge ("Failed to receive command from socket"); - return -EREMOTEIO; - } - - nns_logd ("Received command: %d", cmd); - data->cmd = cmd; - - if (cmd == _TENSOR_QUERY_CMD_TRANSFER_DATA || - cmd <= _TENSOR_QUERY_CMD_RESPOND_DENY) { - /* receive size */ - if (query_tcp_receive (conn->socket, (uint8_t *) & data->data.size, - sizeof (data->data.size), conn->cancellable) < 0) { - nns_loge ("Failed to receive data size from socket"); - return -EREMOTEIO; - } - - if (cmd <= _TENSOR_QUERY_CMD_RESPOND_DENY) { - data->data.data = (uint8_t *) g_malloc0 (data->data.size); - } - - /* receive data */ - if (query_tcp_receive (conn->socket, (uint8_t *) data->data.data, - data->data.size, conn->cancellable) < 0) { - nns_loge ("Failed to receive data from socket"); - return -EREMOTEIO; - } - return 0; - } else if (data->cmd == _TENSOR_QUERY_CMD_CLIENT_ID) { - /* receive client id */ - if (query_tcp_receive (conn->socket, (uint8_t *) & data->client_id, - CLIENT_ID_LEN, conn->cancellable) < 0) { - nns_logd ("Failed to receive client id from socket"); - return -EREMOTEIO; - } - } else { - /* receive data_info */ - if (query_tcp_receive (conn->socket, (uint8_t *) & data->data_info, - sizeof (TensorQueryDataInfo), conn->cancellable) < 0) { - nns_logd ("Failed to receive data info from socket"); - return -EREMOTEIO; - } - } - } - break; - default: - /* NYI */ - return -EPROTONOSUPPORT; - } - return 0; -} - -/** - * @brief send command to connected device. - * @return 0 if OK, negative value if error - */ -int -nnstreamer_query_send (query_connection_handle connection, - TensorQueryCommandData * data) -{ - TensorQueryConnection *conn = (TensorQueryConnection *) connection; - - if (!data) { - nns_loge ("Sending data is NULL"); - return -EINVAL; - } - if (!conn) { - nns_loge ("Invalid connection data"); - return -EINVAL; - } - - switch (conn->protocol) { - case _TENSOR_QUERY_PROTOCOL_TCP: - if (!query_tcp_send (conn->socket, (uint8_t *) & data->cmd, - sizeof (TensorQueryCommand), conn->cancellable)) { - nns_logd ("Failed to send to socket"); - return -EREMOTEIO; - } - if (data->cmd == _TENSOR_QUERY_CMD_TRANSFER_DATA || - data->cmd <= _TENSOR_QUERY_CMD_RESPOND_DENY) { - /* send size */ - if (!query_tcp_send (conn->socket, (uint8_t *) & data->data.size, - sizeof (data->data.size), conn->cancellable)) { - nns_logd ("Failed to send size to socket"); - return -EREMOTEIO; - } - /* send data */ - if (!query_tcp_send (conn->socket, (uint8_t *) data->data.data, - data->data.size, conn->cancellable)) { - nns_logd ("Failed to send data to socket"); - return -EREMOTEIO; - } - } else if (data->cmd == _TENSOR_QUERY_CMD_CLIENT_ID) { - /* send client id */ - if (!query_tcp_send (conn->socket, (uint8_t *) & data->client_id, - CLIENT_ID_LEN, conn->cancellable)) { - nns_logd ("Failed to send client id to socket"); - return -EREMOTEIO; - } - } else { - /* send data_info */ - if (!query_tcp_send (conn->socket, (uint8_t *) & data->data_info, - sizeof (TensorQueryDataInfo), conn->cancellable)) { - nns_logd ("Failed to send data_info to socket"); - return -EREMOTEIO; - } - } - break; - default: - /* NYI */ - return -EPROTONOSUPPORT; - } - return 0; -} - -/** - * @brief free connection - * @return 0 if OK, negative value if error - */ -int -nnstreamer_query_close (query_connection_handle connection) -{ - TensorQueryConnection *conn = (TensorQueryConnection *) connection; - GError *err = NULL; - int ret = 0; - - if (!conn) { - nns_loge ("Invalid connection data"); - return -EINVAL; - } - switch (conn->protocol) { - case _TENSOR_QUERY_PROTOCOL_TCP: - { - if (conn->socket) { - if (!g_socket_close (conn->socket, &err)) { - nns_loge ("Failed to close socket: %s", err->message); - g_clear_error (&err); - } - g_object_unref (conn->socket); - conn->socket = NULL; - } - - if (conn->cancellable) { - g_object_unref (conn->cancellable); - conn->cancellable = NULL; - } - break; - } - default: - /* NYI */ - ret = -EPROTONOSUPPORT; - break; - } - - g_free (conn->host); - conn->host = NULL; - g_free (conn); - return ret; -} - -/** - * @brief return initialized server handle - * @return query_server_handle, NULL if error - */ -query_server_handle -nnstreamer_query_server_data_new (void) -{ - TensorQueryServerData *sdata = g_try_new0 (TensorQueryServerData, 1); - if (!sdata) { - nns_loge ("Failed to allocate server data"); - return NULL; - } - - return (query_server_handle) sdata; -} - -/** - * @brief free server handle - */ -void -nnstreamer_query_server_data_free (query_server_handle server_data) -{ - TensorQueryServerData *sdata = (TensorQueryServerData *) server_data; - if (!sdata) - return; - - switch (sdata->protocol) { - case _TENSOR_QUERY_PROTOCOL_TCP: - { - TensorQueryConnection *conn_remained; - GstBuffer *buf_remained; - - if (sdata->conn_queue) { - while ((conn_remained = g_async_queue_try_pop (sdata->conn_queue))) { - conn_remained->running = 0; - pthread_join (conn_remained->msg_thread, NULL); - nnstreamer_query_close (conn_remained); - } - g_async_queue_unref (sdata->conn_queue); - sdata->conn_queue = NULL; - } - - if (sdata->is_src && sdata->msg_queue) { - while ((buf_remained = g_async_queue_try_pop (sdata->msg_queue))) { - gst_buffer_unref (buf_remained); - } - g_async_queue_unref (sdata->msg_queue); - sdata->msg_queue = NULL; - } - - if (sdata->socket_listener) { - g_socket_listener_close (sdata->socket_listener); - g_object_unref (sdata->socket_listener); - sdata->socket_listener = NULL; - } - - if (sdata->cancellable) { - g_object_unref (sdata->cancellable); - sdata->cancellable = NULL; - } - break; - } - default: - /* NYI */ - nns_loge ("Invalid protocol"); - break; - } - g_free (sdata->src_caps_str); - g_free (sdata->sink_caps_str); - g_free (sdata); -} - -/** - * @brief set server handle params and setup server - * @return 0 if OK, negative value if error - */ -int -nnstreamer_query_server_init (query_server_handle server_data, - TensorQueryProtocol protocol, const char *host, uint16_t port, - int8_t is_src) -{ - TensorQueryServerData *sdata = (TensorQueryServerData *) server_data; - GSocketAddress *saddr = NULL; - GError *err = NULL; - int ret = 0; - - if (!sdata) - return -EINVAL; - - sdata->protocol = protocol; - sdata->is_src = is_src; - - switch (protocol) { - case _TENSOR_QUERY_PROTOCOL_TCP: - sdata->cancellable = g_cancellable_new (); - if (!gst_tensor_query_get_saddr (host, port, sdata->cancellable, &saddr)) { - nns_loge ("Failed to get socket address"); - ret = -EADDRNOTAVAIL; - goto error; - } - - sdata->socket_listener = g_socket_listener_new (); - g_socket_listener_set_backlog (sdata->socket_listener, N_BACKLOG); - - if (!g_socket_listener_add_address (sdata->socket_listener, saddr, - G_SOCKET_TYPE_STREAM, G_SOCKET_PROTOCOL_TCP, NULL, NULL, &err)) { - nns_loge ("Failed to add address: %s", err->message); - g_clear_error (&err); - ret = -EADDRNOTAVAIL; - goto error; - } - - sdata->conn_queue = g_async_queue_new (); - if (sdata->is_src) - sdata->msg_queue = g_async_queue_new (); - - g_socket_listener_accept_socket_async (sdata->socket_listener, - sdata->cancellable, (GAsyncReadyCallback) accept_socket_async_cb, - sdata); - break; - default: - /* NYI */ - nns_loge ("Invalid protocol"); - ret = -EPROTONOSUPPORT; - break; - } - -error: - if (saddr) - g_object_unref (saddr); - - return ret; -} - -/** - * @brief accept connection from remote - * @return query_connection_handle including connection data - */ -query_connection_handle -nnstreamer_query_server_accept (query_server_handle server_data, - query_client_id_t client_id) -{ - TensorQueryServerData *sdata = (TensorQueryServerData *) server_data; - TensorQueryConnection *conn; - gint total, checked; - - if (!sdata) - return NULL; - - switch (sdata->protocol) { - case _TENSOR_QUERY_PROTOCOL_TCP: - { - total = g_async_queue_length (sdata->conn_queue); - checked = 0; - - while (checked < total) { - conn = g_async_queue_pop (sdata->conn_queue); - checked++; - - if (!_query_check_connection (conn)) { - nnstreamer_query_close (conn); - continue; - } - - g_async_queue_push (sdata->conn_queue, conn); - - if (client_id == _query_get_client_id (conn)) - return conn; - } - break; - } - default: - /* NYI */ - nns_loge ("Invalid protocol"); - break; - } - - return NULL; -} - -/** - * @brief [TCP] receive data for tcp server - * @return 0 if OK, negative value if error - */ -static int -query_tcp_receive (GSocket * socket, uint8_t * data, size_t size, - GCancellable * cancellable) -{ - size_t bytes_received = 0; - ssize_t rret; - GError *err = NULL; - - while (bytes_received < size) { - rret = g_socket_receive (socket, (char *) data + bytes_received, - size - bytes_received, cancellable, &err); - - if (rret == 0) { - nns_logi ("Connection closed"); - return -EREMOTEIO; - } - - if (rret < 0) { - nns_logi ("Failed to read from socket: %s", err->message); - g_clear_error (&err); - return -EREMOTEIO; - } - bytes_received += rret; - } - return 0; -} - -/** - * @brief [TCP] send data for tcp server - */ -static gboolean -query_tcp_send (GSocket * socket, uint8_t * data, size_t size, - GCancellable * cancellable) -{ - size_t bytes_sent = 0; - ssize_t rret; - GError *err = NULL; - while (bytes_sent < size) { - rret = g_socket_send (socket, (char *) data + bytes_sent, - size - bytes_sent, cancellable, &err); - if (rret == 0) { - nns_logi ("Connection closed"); - return FALSE; - } - if (rret < 0) { - nns_loge ("Error while sending data %s", err->message); - g_clear_error (&err); - return FALSE; - } - bytes_sent += rret; - } - return TRUE; -} - -/** - * @brief [TCP] Receive buffer from the client - * @param[in] conn connection info - */ -static void * -_message_handler (void *thread_data) -{ - TensorQueryMsgThreadData *_thread_data = - (TensorQueryMsgThreadData *) thread_data; - TensorQueryConnection *_conn = _thread_data->conn; - TensorQueryServerData *_sdata = _thread_data->sdata; - TensorQueryCommandData cmd_data_receive; - TensorQueryCommandData cmd_data_send; - GstBuffer *outbuf = NULL; - GstMetaQuery *meta_query; - - cmd_data_send.cmd = _TENSOR_QUERY_CMD_CLIENT_ID; - cmd_data_send.client_id = g_get_monotonic_time (); - - if (0 != nnstreamer_query_send (_conn, &cmd_data_send)) { - nns_loge ("Failed to send client id to client"); - goto done; - } - nnstreamer_query_set_client_id (_conn, cmd_data_send.client_id); - - if (0 != nnstreamer_query_receive (_conn, &cmd_data_receive)) { - nns_logi ("Failed to receive cmd"); - goto done; - } - - if (cmd_data_receive.cmd == _TENSOR_QUERY_CMD_REQUEST_INFO) { - GstCaps *server_caps, *client_caps; - GstStructure *server_st, *client_st; - gboolean result = FALSE; - - server_caps = gst_caps_from_string (_sdata->src_caps_str); - client_caps = gst_caps_from_string ((char *) cmd_data_receive.data.data); - /** Server framerate may vary. Let's skip comparing the framerate. */ - gst_caps_set_simple (server_caps, "framerate", GST_TYPE_FRACTION, 0, 1, - NULL); - gst_caps_set_simple (client_caps, "framerate", GST_TYPE_FRACTION, 0, 1, - NULL); - - server_st = gst_caps_get_structure (server_caps, 0); - client_st = gst_caps_get_structure (client_caps, 0); - - if (gst_structure_is_tensor_stream (server_st)) { - GstTensorsConfig server_config, client_config; - - gst_tensors_config_from_structure (&server_config, server_st); - gst_tensors_config_from_structure (&client_config, client_st); - - result = gst_tensors_config_is_equal (&server_config, &client_config); - } - - if (result || gst_caps_can_intersect (client_caps, server_caps)) { - cmd_data_send.cmd = _TENSOR_QUERY_CMD_RESPOND_APPROVE; - cmd_data_send.data.data = (uint8_t *) _sdata->sink_caps_str; - cmd_data_send.data.size = (size_t) strlen (_sdata->sink_caps_str) + 1; - } else { - /* respond deny with src caps string */ - nns_loge ("Query caps is not acceptable!"); - nns_loge ("Query client sink caps: %s", cmd_data_receive.data.data); - nns_loge ("Query server src caps: %s", _sdata->src_caps_str); - - cmd_data_send.cmd = _TENSOR_QUERY_CMD_RESPOND_DENY; - cmd_data_send.data.data = (uint8_t *) _sdata->src_caps_str; - cmd_data_send.data.size = (size_t) strlen (_sdata->src_caps_str) + 1; - } - - g_free (cmd_data_receive.data.data); - cmd_data_receive.data.data = NULL; - - gst_caps_unref (server_caps); - gst_caps_unref (client_caps); - - if (nnstreamer_query_send (_conn, &cmd_data_send) != 0) { - nns_logi ("Failed to send respond"); - goto done; - } - } - - while (_conn->running) { - if (!_query_check_connection (_conn)) - break; - - outbuf = tensor_query_receive_buffer (_conn); - if (outbuf) { - meta_query = gst_buffer_add_meta_query (outbuf); - if (meta_query) { - meta_query->client_id = _query_get_client_id (_conn); - } - - g_async_queue_push (_sdata->msg_queue, outbuf); - } - } - -done: - g_free (thread_data); - _conn->running = 0; - return NULL; -} - -/** - * @brief [TCP] Callback for socket listener that pushes socket to the queue - */ -static void -accept_socket_async_cb (GObject * source, GAsyncResult * result, - gpointer user_data) -{ - GSocketListener *socket_listener = G_SOCKET_LISTENER (source); - GSocket *socket = NULL; - GSocketAddress *saddr = NULL; - GError *err = NULL; - TensorQueryServerData *sdata = user_data; - TensorQueryConnection *conn = NULL; - TensorQueryCommandData cmd_data; - gboolean done = FALSE; - - socket = - g_socket_listener_accept_socket_finish (socket_listener, result, NULL, - &err); - if (!socket) { - nns_loge ("Failed to get socket: %s", err->message); - g_clear_error (&err); - goto error; - } - g_socket_set_timeout (socket, QUERY_DEFAULT_TIMEOUT_SEC); - /* create socket with connection */ - conn = g_try_new0 (TensorQueryConnection, 1); - if (!conn) { - nns_loge ("Failed to allocate connection"); - goto error; - } - - conn->socket = socket; - conn->cancellable = g_cancellable_new (); - - /* setting TCP_NODELAY to TRUE in order to avoid packet batching as known as Nagle's algorithm */ - if (!g_socket_set_option (socket, IPPROTO_TCP, TCP_NODELAY, TRUE, &err)) { - nns_loge ("Failed to set socket TCP_NODELAY option: %s", err->message); - g_clear_error (&err); - goto error; - } - - saddr = g_socket_get_remote_address (socket, &err); - if (!saddr) { - nns_loge ("Failed to get socket address: %s", err->message); - g_clear_error (&err); - goto error; - } - conn->protocol = (g_socket_get_protocol (socket) == G_SOCKET_PROTOCOL_TCP) ? - _TENSOR_QUERY_PROTOCOL_TCP : _TENSOR_QUERY_PROTOCOL_END; - conn->host = g_inet_address_to_string (g_inet_socket_address_get_address ( - (GInetSocketAddress *) saddr)); - conn->port = g_inet_socket_address_get_port ((GInetSocketAddress *) saddr); - g_object_unref (saddr); - nns_logi ("New client connected from %s:%u", conn->host, conn->port); - - /** Generate and send client_id to client */ - if (sdata->is_src) { - TensorQueryMsgThreadData *thread_data = NULL; - pthread_attr_t attr; - int tid; - - thread_data = g_try_new0 (TensorQueryMsgThreadData, 1); - if (!thread_data) { - nns_loge ("Failed to allocate query thread data."); - goto error; - } - conn->running = 1; - thread_data->sdata = sdata; - thread_data->conn = conn; - - pthread_attr_init (&attr); - pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); - tid = pthread_create (&conn->msg_thread, &attr, _message_handler, - thread_data); - pthread_attr_destroy (&attr); - - if (tid < 0) { - nns_loge ("Failed to create message handler thread."); - nnstreamer_query_close (conn); - g_free (thread_data); - return; - } - } else { /** server sink */ - if (0 != nnstreamer_query_receive (conn, &cmd_data)) { - nns_loge ("Failed to receive command."); - goto error; - } - if (cmd_data.cmd == _TENSOR_QUERY_CMD_CLIENT_ID) { - nns_logd ("Connected client id: %ld", (long) cmd_data.client_id); - nnstreamer_query_set_client_id (conn, cmd_data.client_id); - } - } - - done = TRUE; - g_async_queue_push (sdata->conn_queue, conn); - -error: - if (!done) { - nnstreamer_query_close (conn); - } - - g_socket_listener_accept_socket_async (socket_listener, - sdata->cancellable, (GAsyncReadyCallback) accept_socket_async_cb, sdata); -} - -/** - * @brief set server source and sink caps string. - */ -void -nnstreamer_query_server_data_set_caps_str (query_server_handle server_data, - const char *src_caps_str, const char *sink_caps_str) -{ - TensorQueryServerData *sdata = (TensorQueryServerData *) server_data; - g_free (sdata->src_caps_str); - g_free (sdata->sink_caps_str); - sdata->src_caps_str = g_strdup (src_caps_str); - sdata->sink_caps_str = g_strdup (sink_caps_str); -} - -/** - * @brief Get buffer from message queue. - */ -GstBuffer * -nnstreamer_query_server_get_buffer (query_server_handle server_data) -{ - TensorQueryServerData *sdata = (TensorQueryServerData *) server_data; - - return GST_BUFFER (g_async_queue_pop (sdata->msg_queue)); -} - -/** - * @brief Send gst-buffer to destination node. - * @return True if all data in gst-buffer is successfully sent. False if failed to transfer data. - * @todo This function should be used in nnstreamer element. Update function name rule and params later. - */ -gboolean -tensor_query_send_buffer (query_connection_handle connection, - GstElement * element, GstBuffer * buffer) -{ - TensorQueryCommandData cmd_data = { 0 }; - GstMemory *mem[NNS_TENSOR_SIZE_LIMIT]; - GstMapInfo map[NNS_TENSOR_SIZE_LIMIT]; - gboolean done = FALSE; - guint i, num_mems; - - num_mems = gst_buffer_n_memory (buffer); - - /* start */ - cmd_data.cmd = _TENSOR_QUERY_CMD_TRANSFER_START; - cmd_data.data_info.base_time = gst_element_get_base_time (element); - cmd_data.data_info.duration = GST_BUFFER_DURATION (buffer); - cmd_data.data_info.dts = GST_BUFFER_DTS (buffer); - cmd_data.data_info.pts = GST_BUFFER_PTS (buffer); - cmd_data.data_info.num_mems = num_mems; - - /* memory chunks in gst-buffer */ - for (i = 0; i < num_mems; i++) { - mem[i] = gst_buffer_peek_memory (buffer, i); - if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) { - ml_loge ("Cannot map the %uth memory in gst-buffer.", i); - num_mems = i; - goto error; - } - - cmd_data.data_info.mem_sizes[i] = map[i].size; - } - - if (nnstreamer_query_send (connection, &cmd_data) != 0) { - nns_loge ("Failed to send start command."); - goto error; - } - - /* transfer data */ - cmd_data.cmd = _TENSOR_QUERY_CMD_TRANSFER_DATA; - for (i = 0; i < num_mems; i++) { - cmd_data.data.size = map[i].size; - cmd_data.data.data = map[i].data; - - if (nnstreamer_query_send (connection, &cmd_data) != 0) { - nns_loge ("Failed to send %uth data buffer", i); - goto error; - } - } - - /* done */ - cmd_data.cmd = _TENSOR_QUERY_CMD_TRANSFER_END; - if (nnstreamer_query_send (connection, &cmd_data) != 0) { - nns_loge ("Failed to send end command."); - goto error; - } - - done = TRUE; - -error: - for (i = 0; i < num_mems; i++) - gst_memory_unmap (mem[i], &map[i]); - - return done; -} - -/** - * @brief Receive data and generate gst-buffer. Caller should handle metadata of returned buffer. - * @return Newly generated gst-buffer. Null if failed to receive data. - * @todo This function should be used in nnstreamer element. Update function name rule and params later. - */ -GstBuffer * -tensor_query_receive_buffer (query_connection_handle connection) -{ - TensorQueryCommandData cmd_data = { 0 }; - GstBuffer *buffer = NULL; - gboolean done = FALSE; - gpointer data; - gsize len; - guint i, num_mems; - - if (nnstreamer_query_receive (connection, &cmd_data) != 0) { - nns_loge ("Failed to receive start command."); - goto error; - } - - if (cmd_data.cmd != _TENSOR_QUERY_CMD_TRANSFER_START) { - nns_loge ("Invalid command %d, cannot start data transfer.", cmd_data.cmd); - goto error; - } - - buffer = gst_buffer_new (); - - num_mems = cmd_data.data_info.num_mems; - for (i = 0; i < num_mems; i++) { - len = cmd_data.data_info.mem_sizes[i]; - data = g_malloc0 (len); - - cmd_data.data.data = data; - cmd_data.data.size = len; - - if (nnstreamer_query_receive (connection, &cmd_data) != 0) { - nns_loge ("Failed to receive %uth data.", i); - g_free (data); - goto error; - } - - gst_buffer_append_memory (buffer, - gst_memory_new_wrapped (0, data, len, 0, len, data, g_free)); - } - - /* done */ - if (nnstreamer_query_receive (connection, &cmd_data) != 0) { - nns_loge ("Failed to receive end command."); - goto error; - } - - if (cmd_data.cmd != _TENSOR_QUERY_CMD_TRANSFER_END) { - nns_loge ("Invalid command %d, failed to transfer data.", cmd_data.cmd); - goto error; - } - - done = TRUE; - -error: - if (!done) { - if (buffer) { - gst_buffer_unref (buffer); - buffer = NULL; - } - } - - return buffer; -} - -/** * @brief register GEnumValue array for query protocol property handling */ GType @@ -1115,11 +32,11 @@ gst_tensor_query_protocol_get_type (void) static GType protocol = 0; if (protocol == 0) { static GEnumValue protocols[] = { - {_TENSOR_QUERY_PROTOCOL_TCP, "TCP", + {NNS_EDGE_PROTOCOL_TCP, "TCP", "Raw TCP protocol. Directly sending stream frames via TCP connections."}, - {_TENSOR_QUERY_PROTOCOL_UDP, "UDP", + {NNS_EDGE_PROTOCOL_UDP, "UDP", "Raw UDP protocol. Directly sending stream frames via UDP connections."}, - {_TENSOR_QUERY_PROTOCOL_MQTT, "MQTT", "Connect with MQTT brokers."}, + {NNS_EDGE_PROTOCOL_MQTT, "MQTT", "Connect with MQTT brokers."}, {0, NULL, NULL}, }; protocol = g_enum_register_static ("tensor_query_protocol", protocols); diff --git a/gst/nnstreamer/tensor_query/tensor_query_common.h b/gst/nnstreamer/tensor_query/tensor_query_common.h index 282e2d7..ccff038 100644 --- a/gst/nnstreamer/tensor_query/tensor_query_common.h +++ b/gst/nnstreamer/tensor_query/tensor_query_common.h @@ -16,14 +16,12 @@ #include "tensor_typedef.h" #include "tensor_common.h" #include "tensor_meta.h" +#include "nnstreamer-edge.h" #ifdef __cplusplus extern "C" { #endif /* __cplusplus */ -typedef void * query_connection_handle; -typedef void * query_server_handle; - /** * @brief Default timeout, in seconds. */ @@ -32,173 +30,17 @@ typedef void * query_server_handle; /** * @brief protocol options for tensor query. */ -typedef enum -{ - _TENSOR_QUERY_PROTOCOL_TCP = 0, - _TENSOR_QUERY_PROTOCOL_UDP = 1, - _TENSOR_QUERY_PROTOCOL_MQTT = 2, - _TENSOR_QUERY_PROTOCOL_END -} TensorQueryProtocol; #define DEFAULT_HOST "localhost" -#define DEFAULT_PROTOCOL (_TENSOR_QUERY_PROTOCOL_TCP) +#define DEFAULT_PROTOCOL (NNS_EDGE_PROTOCOL_TCP) #define GST_TYPE_QUERY_PROTOCOL (gst_tensor_query_protocol_get_type ()) + /** * @brief register GEnumValue array for query protocol property handling */ GType gst_tensor_query_protocol_get_type (void); -/** - * @brief Structures for tensor query commands. - */ -typedef enum -{ - _TENSOR_QUERY_CMD_REQUEST_INFO = 0, - _TENSOR_QUERY_CMD_RESPOND_APPROVE = 1, - _TENSOR_QUERY_CMD_RESPOND_DENY = 2, - _TENSOR_QUERY_CMD_TRANSFER_START = 3, - _TENSOR_QUERY_CMD_TRANSFER_DATA = 4, - _TENSOR_QUERY_CMD_TRANSFER_END = 5, - _TENSOR_QUERY_CMD_CLIENT_ID = 6, - _TENSOR_QUERY_CMD_END -} TensorQueryCommand; - - -/** - * @brief Structures for tensor query data info. - */ -typedef struct -{ - int64_t base_time; - int64_t sent_time; - uint64_t duration; - uint64_t dts; - uint64_t pts; - uint32_t num_mems; - uint64_t mem_sizes[NNS_TENSOR_SIZE_LIMIT]; -} TensorQueryDataInfo; - -typedef struct -{ - uint8_t *data; - size_t size; -} TensorQueryData; - -/** - * @brief Structures for tensor query command buffers. - */ -typedef struct -{ - TensorQueryCommand cmd; - union - { - TensorQueryDataInfo data_info; /** _TENSOR_QUERY_CMD_REQUEST_INFO */ - TensorQueryData data; /** _TENSOR_QUERY_CMD_TRANSFER_DATA */ - query_client_id_t client_id; /** _TENSOR_QUERY_CMD_CLIENT_ID */ - }; -} TensorQueryCommandData; - -/** - * @brief connect to the specified address. - * @return 0 if OK, negative value if error - */ -extern query_connection_handle -nnstreamer_query_connect (TensorQueryProtocol protocol, const char *ip, uint16_t port, uint32_t timeout); - -/** - * @brief Set timeout. - * @param timeout the timeout value, in seconds. 0 for none. - */ -extern void -nnstreamer_query_set_timeout (query_connection_handle connection, uint32_t timeout); - -/** - * @brief Set client ID. - */ -extern void -nnstreamer_query_set_client_id (query_connection_handle connection, query_client_id_t id); - -/** - * @brief send command to connected device. - * @return 0 if OK, negative value if error - */ -extern int -nnstreamer_query_send (query_connection_handle connection, TensorQueryCommandData *data); - -/** - * @brief receive command from connected device. - * @return 0 if OK, negative value if error - */ -extern int -nnstreamer_query_receive (query_connection_handle connection, TensorQueryCommandData *data); - -/** - * @brief close connection with corresponding id. - * @return 0 if OK, negative value if error - */ -extern int -nnstreamer_query_close (query_connection_handle connection); - -/* server */ -/** - * @brief accept connection from remote - * @return query_connection_handle including connection data - */ -extern query_connection_handle -nnstreamer_query_server_accept (query_server_handle server_data, query_client_id_t client_id); - -/** - * @brief return initialized server handle - * @return query_server_handle, NULL if error - */ -extern query_server_handle -nnstreamer_query_server_data_new (void); - -/** - * @brief free server handle - */ -extern void -nnstreamer_query_server_data_free (query_server_handle server_data); - -/** - * @brief set server handle params and setup server - * @return 0 if OK, negative value if error - */ -extern int -nnstreamer_query_server_init (query_server_handle server_data, - TensorQueryProtocol protocol, const char *host, uint16_t port, int8_t is_src); - -/** - * @brief set server source and sink tensors config. - */ -extern void -nnstreamer_query_server_data_set_caps_str (query_server_handle server_data, - const char * src_caps_str, const char * sink_caps_str); - -/** - * @brief Get buffer from message queue. - */ -extern GstBuffer * -nnstreamer_query_server_get_buffer (query_server_handle server_data); - -/** - * @brief Send gst-buffer to destination node. - * @return True if all data in gst-buffer is successfully sent. False if failed to transfer data. - * @todo This function should be used in nnstreamer element. Update function name rule and params later. - */ -extern gboolean -tensor_query_send_buffer (query_connection_handle connection, - GstElement * element, GstBuffer * buffer); - -/** - * @brief Receive data and generate gst-buffer. Caller should handle metadata of returned buffer. - * @return Newly generated gst-buffer. Null if failed to receive data. - * @todo This function should be used in nnstreamer element. Update function name rule and params later. - */ -extern GstBuffer * -tensor_query_receive_buffer (query_connection_handle connection); - #ifdef __cplusplus } #endif /* __cplusplus */ diff --git a/gst/nnstreamer/tensor_query/tensor_query_server.c b/gst/nnstreamer/tensor_query/tensor_query_server.c index a0eb46b..2d55bcd 100644 --- a/gst/nnstreamer/tensor_query/tensor_query_server.c +++ b/gst/nnstreamer/tensor_query/tensor_query_server.c @@ -15,9 +15,9 @@ #endif #include "tensor_query_server.h" -#include #include #include +#include "nnstreamer-edge.h" /** * @brief mutex for tensor-query server table. @@ -33,35 +33,35 @@ static void init_queryserver (void) __attribute__((constructor)); static void fini_queryserver (void) __attribute__((destructor)); /** - * @brief Getter to get nth GstTensorQueryServerInfo. + * @brief Get nnstreamer edge server handle. */ -query_server_info_handle -gst_tensor_query_server_get_data (guint id) +edge_server_handle +gst_tensor_query_server_get_handle (char *id) { - gpointer p; + edge_server_handle p; G_LOCK (query_server_table); - p = g_hash_table_lookup (_qs_table, GUINT_TO_POINTER (id)); + p = g_hash_table_lookup (_qs_table, id); G_UNLOCK (query_server_table); return p; } /** - * @brief Add GstTensorQueryServerInfo into hash table. + * @brief Add nnstreamer edge server handle into hash table. */ -query_server_info_handle -gst_tensor_query_server_add_data (guint id) +edge_server_handle +gst_tensor_query_server_add_data (char *id) { - GstTensorQueryServerInfo *data = NULL; + GstTensorQueryServer *data = NULL; - data = (GstTensorQueryServerInfo *) gst_tensor_query_server_get_data (id); + data = gst_tensor_query_server_get_handle (id); if (NULL != data) { return data; } - data = g_try_new0 (GstTensorQueryServerInfo, 1); + data = g_try_new0 (GstTensorQueryServer, 1); if (NULL == data) { GST_ERROR ("Failed to allocate memory for tensor query server data."); return NULL; @@ -70,50 +70,69 @@ gst_tensor_query_server_add_data (guint id) g_mutex_init (&data->lock); g_cond_init (&data->cond); data->id = id; - data->sink_host = NULL; - data->sink_port = 0; data->configured = FALSE; - data->sink_caps_str = NULL; + + if (0 != nns_edge_create_handle (id, "TEMP_SERVER_TOPIC", &data->edge_h)) { + GST_ERROR ("Failed to get nnstreamer edge handle."); + gst_tensor_query_server_remove_data (data); + return NULL; + } G_LOCK (query_server_table); - g_hash_table_insert (_qs_table, GUINT_TO_POINTER (id), data); + g_hash_table_insert (_qs_table, g_strdup (id), data); G_UNLOCK (query_server_table); return data; } + /** - * @brief Remove GstTensorQueryServerInfo. + * @brief Get edge handle from server data. + */ +nns_edge_h +gst_tensor_query_server_get_edge_handle (edge_server_handle server_h) +{ + GstTensorQueryServer *data = (GstTensorQueryServer *) server_h; + + return data->edge_h; +} + +/** + * @brief Remove GstTensorQueryServer. */ void -gst_tensor_query_server_remove_data (query_server_info_handle server_info_h) +gst_tensor_query_server_remove_data (edge_server_handle server_h) { - GstTensorQueryServerInfo *data = (GstTensorQueryServerInfo *) server_info_h; + GstTensorQueryServer *data = (GstTensorQueryServer *) server_h; if (NULL == data) { return; } G_LOCK (query_server_table); - g_hash_table_remove (_qs_table, GUINT_TO_POINTER (data->id)); + if (!g_hash_table_lookup (_qs_table, data->id)) + g_hash_table_remove (_qs_table, data->id); G_UNLOCK (query_server_table); - g_free (data->sink_host); - data->sink_host = NULL; - g_free (data->sink_caps_str); - data->sink_caps_str = NULL; + + if (data->edge_h) { + nns_edge_release_handle (data->edge_h); + data->edge_h = NULL; + } + g_cond_clear (&data->cond); g_mutex_clear (&data->lock); g_free (data); + data = NULL; } /** * @brief Wait until the sink is configured and get server info handle. */ gboolean -gst_tensor_query_server_wait_sink (query_server_info_handle server_info_h) +gst_tensor_query_server_wait_sink (edge_server_handle server_h) { gint64 end_time; - GstTensorQueryServerInfo *data = (GstTensorQueryServerInfo *) server_info_h; + GstTensorQueryServer *data = (GstTensorQueryServer *) server_h; if (NULL == data) { return FALSE; @@ -135,108 +154,22 @@ gst_tensor_query_server_wait_sink (query_server_info_handle server_info_h) } /** - * @brief set sink caps string. + * @brief set query server sink configured. */ void -gst_tensor_query_server_set_sink_caps_str (query_server_info_handle - server_info_h, const gchar * caps_str) +gst_tensor_query_server_set_configured (edge_server_handle server_h) { - GstTensorQueryServerInfo *data = (GstTensorQueryServerInfo *) server_info_h; - + GstTensorQueryServer *data = (GstTensorQueryServer *) server_h; if (NULL == data) { return; } g_mutex_lock (&data->lock); - if (caps_str) { - g_free (data->sink_caps_str); - data->sink_caps_str = g_strdup (caps_str); - } data->configured = TRUE; g_cond_broadcast (&data->cond); g_mutex_unlock (&data->lock); } /** - * @brief get sink caps string. - */ -gchar * -gst_tensor_query_server_get_sink_caps_str (query_server_info_handle - server_info_h) -{ - GstTensorQueryServerInfo *data = (GstTensorQueryServerInfo *) server_info_h; - gchar *caps_str = NULL; - - if (NULL == data) { - return caps_str; - } - - g_mutex_lock (&data->lock); - caps_str = g_strdup (data->sink_caps_str); - g_mutex_unlock (&data->lock); - - return caps_str; -} - -/** - * @brief set sink host - */ -void -gst_tensor_query_server_set_sink_host (query_server_info_handle server_info_h, - gchar * host, guint16 port) -{ - GstTensorQueryServerInfo *data = (GstTensorQueryServerInfo *) server_info_h; - - if (NULL == data) { - return; - } - - g_mutex_lock (&data->lock); - data->sink_host = g_strdup (host); - data->sink_port = port; - g_mutex_unlock (&data->lock); -} - -/** - * @brief get sink host - */ -gchar * -gst_tensor_query_server_get_sink_host (query_server_info_handle server_info_h) -{ - GstTensorQueryServerInfo *data = (GstTensorQueryServerInfo *) server_info_h; - gchar *sink_host = NULL; - - if (NULL == data) { - return NULL; - } - - g_mutex_lock (&data->lock); - sink_host = g_strdup (data->sink_host); - g_mutex_unlock (&data->lock); - - return sink_host; -} - -/** - * @brief get sink port - */ -guint16 -gst_tensor_query_server_get_sink_port (query_server_info_handle server_info_h) -{ - GstTensorQueryServerInfo *data = (GstTensorQueryServerInfo *) server_info_h; - guint16 sink_port = 0; - - if (NULL == data) { - return sink_port; - } - - g_mutex_lock (&data->lock); - sink_port = data->sink_port; - g_mutex_unlock (&data->lock); - - return sink_port; -} - -/** * @brief Initialize the query server. */ static void @@ -244,7 +177,7 @@ init_queryserver (void) { G_LOCK (query_server_table); g_assert (NULL == _qs_table); /** Internal error (duplicated init call?) */ - _qs_table = g_hash_table_new (g_direct_hash, g_direct_equal); + _qs_table = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL); G_UNLOCK (query_server_table); } diff --git a/gst/nnstreamer/tensor_query/tensor_query_server.h b/gst/nnstreamer/tensor_query/tensor_query_server.h index 66282c9..492fde6 100644 --- a/gst/nnstreamer/tensor_query/tensor_query_server.h +++ b/gst/nnstreamer/tensor_query/tensor_query_server.h @@ -15,79 +15,64 @@ #include #include +#include "nnstreamer-edge.h" +#include "tensor_meta.h" G_BEGIN_DECLS #define DEFAULT_SERVER_ID 0 #define DEFAULT_QUERY_INFO_TIMEOUT 5 typedef void * query_server_info_handle; +typedef void * edge_server_handle; /** * @brief GstTensorQueryServer internal info data structure. */ typedef struct { - gint64 id; - gchar *sink_caps_str; - gchar *sink_host; - guint16 sink_port; + char *id; gboolean configured; GMutex lock; GCond cond; -} GstTensorQueryServerInfo; + + nns_edge_h edge_h; +} GstTensorQueryServer; /** - * @brief Get GstTensorQueryServerInfo. + * @brief Get nnstreamer edge server handle. */ -query_server_info_handle -gst_tensor_query_server_get_data (guint id); +edge_server_handle +gst_tensor_query_server_get_handle (char *id); /** - * @brief Add GstTensorQueryServerInfo. + * @brief Add GstTensorQueryServer. */ -query_server_info_handle -gst_tensor_query_server_add_data (guint id); +edge_server_handle +gst_tensor_query_server_add_data (char *id); /** - * @brief Remove GstTensorQueryServerInfo. + * @brief Remove GstTensorQueryServer. */ void -gst_tensor_query_server_remove_data (query_server_info_handle server_info_h); +gst_tensor_query_server_remove_data (edge_server_handle server_h); /** * @brief Wait until the sink is configured and get server info handle. */ gboolean -gst_tensor_query_server_wait_sink (query_server_info_handle server_info_h); +gst_tensor_query_server_wait_sink (edge_server_handle server_h); /** - * @brief set sink caps string. + * @brief Get edge handle from server data. */ -void gst_tensor_query_server_set_sink_caps_str (query_server_info_handle server_info_h, const gchar * caps_str); +nns_edge_h +gst_tensor_query_server_get_edge_handle (edge_server_handle server_h); /** - * @brief get sink caps string. - */ -gchar * -gst_tensor_query_server_get_sink_caps_str (query_server_info_handle server_info_h); - -/** - * @brief set sink host address and port + * @brief set query server sink configured. */ void -gst_tensor_query_server_set_sink_host (query_server_info_handle server_info_h, gchar *host, guint16 port); - -/** - * @brief get sink host - */ -gchar * -gst_tensor_query_server_get_sink_host (query_server_info_handle server_info_h); - -/** - * @brief get sink port - */ -guint16 -gst_tensor_query_server_get_sink_port (query_server_info_handle server_info_h); +gst_tensor_query_server_set_configured (edge_server_handle server_h); G_END_DECLS diff --git a/gst/nnstreamer/tensor_query/tensor_query_serversink.c b/gst/nnstreamer/tensor_query/tensor_query_serversink.c index c895fd0..f3a6e47 100644 --- a/gst/nnstreamer/tensor_query/tensor_query_serversink.c +++ b/gst/nnstreamer/tensor_query/tensor_query_serversink.c @@ -16,6 +16,7 @@ #include #include "tensor_query_serversink.h" +#include "nnstreamer_util.h" GST_DEBUG_CATEGORY_STATIC (gst_tensor_query_serversink_debug); #define GST_CAT_DEFAULT gst_tensor_query_serversink_debug @@ -34,8 +35,6 @@ static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", enum { PROP_0, - PROP_HOST, - PROP_PORT, PROP_PROTOCOL, PROP_ID, PROP_TIMEOUT, @@ -53,7 +52,6 @@ static void gst_tensor_query_serversink_get_property (GObject * object, static void gst_tensor_query_serversink_finalize (GObject * object); static gboolean gst_tensor_query_serversink_start (GstBaseSink * bsink); -static gboolean gst_tensor_query_serversink_stop (GstBaseSink * bsink); static GstFlowReturn gst_tensor_query_serversink_render (GstBaseSink * bsink, GstBuffer * buf); static gboolean gst_tensor_query_serversink_set_caps (GstBaseSink * basesink, @@ -77,14 +75,6 @@ gst_tensor_query_serversink_class_init (GstTensorQueryServerSinkClass * klass) gobject_class->get_property = gst_tensor_query_serversink_get_property; gobject_class->finalize = gst_tensor_query_serversink_finalize; - g_object_class_install_property (gobject_class, PROP_HOST, - g_param_spec_string ("host", "Host", "The hostname to listen as", - DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_PORT, - g_param_spec_uint ("port", "Port", - "The port to listen to (0=random available port)", 0, - 65535, DEFAULT_PORT_SINK, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_PROTOCOL, g_param_spec_enum ("protocol", "Protocol", "The network protocol to establish connection", @@ -116,7 +106,6 @@ gst_tensor_query_serversink_class_init (GstTensorQueryServerSinkClass * klass) "Samsung Electronics Co., Ltd."); gstbasesink_class->start = gst_tensor_query_serversink_start; - gstbasesink_class->stop = gst_tensor_query_serversink_stop; gstbasesink_class->set_caps = gst_tensor_query_serversink_set_caps; gstbasesink_class->render = gst_tensor_query_serversink_render; @@ -130,12 +119,9 @@ gst_tensor_query_serversink_class_init (GstTensorQueryServerSinkClass * klass) static void gst_tensor_query_serversink_init (GstTensorQueryServerSink * sink) { - sink->host = g_strdup (DEFAULT_HOST); - sink->port = DEFAULT_PORT_SINK; sink->protocol = DEFAULT_PROTOCOL; sink->timeout = QUERY_DEFAULT_TIMEOUT_SEC; sink->sink_id = DEFAULT_SERVER_ID; - sink->server_data = nnstreamer_query_server_data_new (); sink->metaless_frame_count = 0; } @@ -146,15 +132,7 @@ static void gst_tensor_query_serversink_finalize (GObject * object) { GstTensorQueryServerSink *sink = GST_TENSOR_QUERY_SERVERSINK (object); - g_free (sink->host); - if (sink->server_data) { - nnstreamer_query_server_data_free (sink->server_data); - sink->server_data = NULL; - } - if (sink->server_info_h) { - gst_tensor_query_server_remove_data (sink->server_info_h); - sink->server_info_h = NULL; - } + gst_tensor_query_server_remove_data (sink->server_h); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -168,17 +146,6 @@ gst_tensor_query_serversink_set_property (GObject * object, guint prop_id, GstTensorQueryServerSink *serversink = GST_TENSOR_QUERY_SERVERSINK (object); switch (prop_id) { - case PROP_HOST: - if (!g_value_get_string (value)) { - nns_logw ("host property cannot be NULL"); - break; - } - g_free (serversink->host); - serversink->host = g_value_dup_string (value); - break; - case PROP_PORT: - serversink->port = g_value_get_uint (value); - break; case PROP_PROTOCOL: serversink->protocol = g_value_get_enum (value); break; @@ -207,12 +174,6 @@ gst_tensor_query_serversink_get_property (GObject * object, guint prop_id, GstTensorQueryServerSink *serversink = GST_TENSOR_QUERY_SERVERSINK (object); switch (prop_id) { - case PROP_HOST: - g_value_set_string (value, serversink->host); - break; - case PROP_PORT: - g_value_set_uint (value, serversink->port); - break; case PROP_PROTOCOL: g_value_set_enum (value, serversink->protocol); break; @@ -240,22 +201,11 @@ gst_tensor_query_serversink_start (GstBaseSink * bsink) GstTensorQueryServerSink *sink = GST_TENSOR_QUERY_SERVERSINK (bsink); GstCaps *caps; gchar *caps_str = NULL; + gchar *id_str = NULL; - if (!sink->server_data) { - nns_loge ("Server_data is NULL"); - return FALSE; - } - - if (nnstreamer_query_server_init (sink->server_data, sink->protocol, - sink->host, sink->port, FALSE) != 0) { - nns_loge ("Failed to setup server"); - return FALSE; - } - - /** Set server sink information */ - sink->server_info_h = gst_tensor_query_server_add_data (sink->sink_id); - gst_tensor_query_server_set_sink_host (sink->server_info_h, sink->host, - sink->port); + id_str = g_strdup_printf ("%u", sink->sink_id); + sink->server_h = gst_tensor_query_server_add_data (id_str); + g_free (id_str); caps = gst_pad_get_current_caps (GST_BASE_SINK_PAD (bsink)); if (!caps) { @@ -265,8 +215,9 @@ gst_tensor_query_serversink_start (GstBaseSink * bsink) if (caps) { caps_str = gst_caps_to_string (caps); } - gst_tensor_query_server_set_sink_caps_str (sink->server_info_h, caps_str); + sink->edge_h = gst_tensor_query_server_get_edge_handle (sink->server_h); + gst_tensor_query_server_set_configured (sink->server_h); gst_caps_unref (caps); g_free (caps_str); @@ -274,18 +225,6 @@ gst_tensor_query_serversink_start (GstBaseSink * bsink) } /** - * @brief stop processing of query_serversink - */ -static gboolean -gst_tensor_query_serversink_stop (GstBaseSink * bsink) -{ - GstTensorQueryServerSink *sink = GST_TENSOR_QUERY_SERVERSINK (bsink); - nnstreamer_query_server_data_free (sink->server_data); - sink->server_data = NULL; - return TRUE; -} - -/** * @brief An implementation of the set_caps vmethod in GstBaseSinkClass */ static gboolean @@ -295,7 +234,8 @@ gst_tensor_query_serversink_set_caps (GstBaseSink * bsink, GstCaps * caps) gchar *caps_str; caps_str = gst_caps_to_string (caps); - gst_tensor_query_server_set_sink_caps_str (sink->server_info_h, caps_str); + nns_edge_set_info (sink->edge_h, "CAPS", "@query_server_sink_caps@"); + nns_edge_set_info (sink->edge_h, "CAPS", caps_str); g_free (caps_str); @@ -310,20 +250,41 @@ gst_tensor_query_serversink_render (GstBaseSink * bsink, GstBuffer * buf) { GstTensorQueryServerSink *sink = GST_TENSOR_QUERY_SERVERSINK (bsink); GstMetaQuery *meta_query; - query_connection_handle conn; + nns_edge_data_h data_h = NULL; + guint i, num_mems = 0; + gint ret; + GstMemory *mem[NNS_TENSOR_SIZE_LIMIT]; + GstMapInfo map[NNS_TENSOR_SIZE_LIMIT]; + char *val; meta_query = gst_buffer_get_meta_query (buf); if (meta_query) { sink->metaless_frame_count = 0; - conn = nnstreamer_query_server_accept (sink->server_data, - meta_query->client_id); - if (conn) { - if (!tensor_query_send_buffer (conn, GST_ELEMENT (sink), buf)) { - nns_logw ("Failed to send buffer to client, drop current buffer."); + + ret = nns_edge_data_create (&data_h); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_loge ("Failed to create data handle in server sink."); + return GST_FLOW_ERROR; + } + + num_mems = gst_buffer_n_memory (buf); + for (i = 0; i < num_mems; i++) { + mem[i] = gst_buffer_peek_memory (buf, i); + if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) { + ml_loge ("Cannot map the %uth memory in gst-buffer.", i); + num_mems = i; + goto done; } - } else { - nns_logw ("Cannot get the client connection, drop current buffer."); + nns_edge_data_add (data_h, map[i].data, map[i].size, NULL); } + + val = g_strdup_printf ("%ld", (long int) meta_query->client_id); + nns_edge_data_set_info (data_h, "client_id", val); + g_free (val); + + nns_edge_respond (sink->edge_h, data_h); + nns_edge_data_destroy (data_h); + data_h = NULL; } else { nns_logw ("Cannot get tensor query meta. Drop buffers!\n"); sink->metaless_frame_count++; @@ -336,6 +297,9 @@ gst_tensor_query_serversink_render (GstBaseSink * bsink, GstBuffer * buf) return GST_FLOW_ERROR; } } +done: + for (i = 0; i < num_mems; i++) + gst_memory_unmap (mem[i], &map[i]); return GST_FLOW_OK; } diff --git a/gst/nnstreamer/tensor_query/tensor_query_serversink.h b/gst/nnstreamer/tensor_query/tensor_query_serversink.h index 07bba42..b88da3b 100644 --- a/gst/nnstreamer/tensor_query/tensor_query_serversink.h +++ b/gst/nnstreamer/tensor_query/tensor_query_serversink.h @@ -17,9 +17,8 @@ #include #include #include -#include "tensor_query_common.h" #include "tensor_query_server.h" - +#include "tensor_query_common.h" G_BEGIN_DECLS #define GST_TYPE_TENSOR_QUERY_SERVERSINK \ @@ -45,14 +44,14 @@ struct _GstTensorQueryServerSink GstBaseSink element; /**< parent object */ guint sink_id; - guint16 port; - gchar *host; - TensorQueryProtocol protocol; guint timeout; - query_server_handle server_data; query_server_info_handle server_info_h; gint metaless_frame_limit; gint metaless_frame_count; + + nns_edge_protocol_e protocol; + edge_server_handle server_h; + nns_edge_h edge_h; }; /** diff --git a/gst/nnstreamer/tensor_query/tensor_query_serversrc.c b/gst/nnstreamer/tensor_query/tensor_query_serversrc.c index 080c5d9..9859abd 100644 --- a/gst/nnstreamer/tensor_query/tensor_query_serversrc.c +++ b/gst/nnstreamer/tensor_query/tensor_query_serversrc.c @@ -16,9 +16,10 @@ #include #include -#include "tensor_query_common.h" #include "tensor_query_serversrc.h" #include "tensor_query_server.h" +#include "tensor_query_common.h" +#include "nnstreamer_util.h" GST_DEBUG_CATEGORY_STATIC (gst_tensor_query_serversrc_debug); #define GST_CAT_DEFAULT gst_tensor_query_serversrc_debug @@ -44,8 +45,6 @@ enum PROP_PROTOCOL, PROP_TIMEOUT, PROP_OPERATION, - PROP_BROKER_HOST, - PROP_BROKER_PORT, PROP_ID, PROP_IS_LIVE }; @@ -61,7 +60,6 @@ static void gst_tensor_query_serversrc_get_property (GObject * object, static void gst_tensor_query_serversrc_finalize (GObject * object); static gboolean gst_tensor_query_serversrc_start (GstBaseSrc * bsrc); -static gboolean gst_tensor_query_serversrc_stop (GstBaseSrc * bsrc); static GstFlowReturn gst_tensor_query_serversrc_create (GstPushSrc * psrc, GstBuffer ** buf); @@ -103,18 +101,10 @@ gst_tensor_query_serversrc_class_init (GstTensorQueryServerSrcClass * klass) 3600, QUERY_DEFAULT_TIMEOUT_SEC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_OPERATION, - g_param_spec_string ("operation", "Operation", - "The main operation of the host and option if necessary. " - "(operation)/(optional topic for main operation).", + g_param_spec_string ("topic", "Topic", + "The main topic of the host and option if necessary. " + "(topic)/(optional topic for main topic).", "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BROKER_HOST, - g_param_spec_string ("broker-host", "Broker Host", - "Broker host address to connect.", DEFAULT_BROKER_HOST, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BROKER_PORT, - g_param_spec_uint ("broker-port", "Broker Port", - "Broker port to connect.", 0, 65535, - DEFAULT_BROKER_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_ID, g_param_spec_uint ("id", "ID", "ID for distinguishing query servers.", 0, @@ -134,7 +124,6 @@ gst_tensor_query_serversrc_class_init (GstTensorQueryServerSrcClass * klass) "Samsung Electronics Co., Ltd."); gstbasesrc_class->start = gst_tensor_query_serversrc_start; - gstbasesrc_class->stop = gst_tensor_query_serversrc_stop; gstpushsrc_class->create = gst_tensor_query_serversrc_create; GST_DEBUG_CATEGORY_INIT (gst_tensor_query_serversrc_debug, @@ -151,13 +140,14 @@ gst_tensor_query_serversrc_init (GstTensorQueryServerSrc * src) src->port = DEFAULT_PORT_SRC; src->protocol = DEFAULT_PROTOCOL; src->timeout = QUERY_DEFAULT_TIMEOUT_SEC; - src->operation = NULL; - src->broker_host = g_strdup (DEFAULT_BROKER_HOST); - src->broker_port = DEFAULT_BROKER_PORT; + src->topic = NULL; + src->srv_host = g_strdup (DEFAULT_HOST); + src->srv_port = DEFAULT_PORT_SRC; src->src_id = DEFAULT_SERVER_ID; tensor_query_hybrid_init (&src->hybrid_info, NULL, 0, TRUE); - src->server_data = nnstreamer_query_server_data_new (); src->configured = FALSE; + src->msg_queue = g_async_queue_new (); + gst_base_src_set_format (GST_BASE_SRC (src), GST_FORMAT_TIME); /** set the timestamps on each buffer */ gst_base_src_set_do_timestamp (GST_BASE_SRC (src), TRUE); @@ -172,17 +162,20 @@ static void gst_tensor_query_serversrc_finalize (GObject * object) { GstTensorQueryServerSrc *src = GST_TENSOR_QUERY_SERVERSRC (object); + nns_edge_data_h data_h; g_free (src->host); src->host = NULL; - g_free (src->operation); - src->operation = NULL; - g_free (src->broker_host); - src->broker_host = NULL; - if (src->server_data) { - nnstreamer_query_server_data_free (src->server_data); - src->server_data = NULL; + g_free (src->topic); + src->topic = NULL; + g_free (src->srv_host); + src->srv_host = NULL; + + while ((data_h = g_async_queue_try_pop (src->msg_queue))) { + nns_edge_data_destroy (data_h); } + g_async_queue_unref (src->msg_queue); + G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -215,23 +208,11 @@ gst_tensor_query_serversrc_set_property (GObject * object, guint prop_id, break; case PROP_OPERATION: if (!g_value_get_string (value)) { - nns_logw - ("operation property cannot be NULL. Query-hybrid is disabled."); + nns_logw ("topic property cannot be NULL. Query-hybrid is disabled."); break; } - g_free (serversrc->operation); - serversrc->operation = g_value_dup_string (value); - break; - case PROP_BROKER_HOST: - if (!g_value_get_string (value)) { - nns_logw ("Broker host property cannot be NULL"); - break; - } - g_free (serversrc->broker_host); - serversrc->broker_host = g_value_dup_string (value); - break; - case PROP_BROKER_PORT: - serversrc->broker_port = g_value_get_uint (value); + g_free (serversrc->topic); + serversrc->topic = g_value_dup_string (value); break; case PROP_ID: serversrc->src_id = g_value_get_uint (value); @@ -269,13 +250,7 @@ gst_tensor_query_serversrc_get_property (GObject * object, guint prop_id, g_value_set_uint (value, serversrc->timeout); break; case PROP_OPERATION: - g_value_set_string (value, serversrc->operation); - break; - case PROP_BROKER_HOST: - g_value_set_string (value, serversrc->broker_host); - break; - case PROP_BROKER_PORT: - g_value_set_uint (value, serversrc->broker_port); + g_value_set_string (value, serversrc->topic); break; case PROP_ID: g_value_set_uint (value, serversrc->src_id); @@ -291,61 +266,147 @@ gst_tensor_query_serversrc_get_property (GObject * object, guint prop_id, } /** + * @brief nnstreamer-edge event callback. + */ +static int +_nns_edge_event_cb (nns_edge_event_h event_h, void *user_data) +{ + nns_edge_event_e event_type; + int ret = NNS_EDGE_ERROR_NONE; + + GstTensorQueryServerSrc *src = (GstTensorQueryServerSrc *) user_data; + if (0 != nns_edge_event_get_type (event_h, &event_type)) { + nns_loge ("Failed to get event type!"); + return NNS_EDGE_ERROR_UNKNOWN; + } + + switch (event_type) { + case NNS_EDGE_EVENT_NEW_DATA_RECEIVED: + { + nns_edge_data_h data; + + nns_edge_event_parse_new_data (event_h, &data); + g_async_queue_push (src->msg_queue, data); + break; + } + default: + break; + } + + return ret; +} + +/** * @brief start processing of query_serversrc, setting up the server */ static gboolean gst_tensor_query_serversrc_start (GstBaseSrc * bsrc) { GstTensorQueryServerSrc *src = GST_TENSOR_QUERY_SERVERSRC (bsrc); + char *id_str = NULL; + char *port = NULL; - if (!src->server_data) { - nns_loge ("Server_data is NULL"); - return FALSE; - } + id_str = g_strdup_printf ("%d", src->src_id); + src->server_h = gst_tensor_query_server_add_data (id_str); + g_free (id_str); - if (nnstreamer_query_server_init (src->server_data, src->protocol, - src->host, src->port, TRUE) != 0) { - nns_loge ("Failed to setup server"); - return FALSE; - } - - src->server_info_h = gst_tensor_query_server_add_data (src->src_id); - if (!gst_tensor_query_server_wait_sink (src->server_info_h)) { - nns_loge ("Failed to get server information from query server."); - return FALSE; - } + src->edge_h = gst_tensor_query_server_get_edge_handle (src->server_h); + nns_edge_set_info (src->edge_h, "IP", src->host); + port = g_strdup_printf ("%d", src->port); + nns_edge_set_info (src->edge_h, "PORT", port); + g_free (port); /** Publish query sever connection info */ - if (src->operation) { - tensor_query_hybrid_set_node (&src->hybrid_info, src->host, src->port, - src->server_info_h); - tensor_query_hybrid_set_broker (&src->hybrid_info, - src->broker_host, src->broker_port); + if (src->topic) { + nns_edge_set_info (src->edge_h, "TOPIC", src->topic); + tensor_query_hybrid_set_node (&src->hybrid_info, src->srv_host, + src->srv_port, src->server_info_h); + tensor_query_hybrid_set_broker (&src->hybrid_info, src->host, src->port); - if (!tensor_query_hybrid_publish (&src->hybrid_info, src->operation)) { + if (!tensor_query_hybrid_publish (&src->hybrid_info, src->topic)) { nns_loge ("Failed to publish a topic."); return FALSE; } } else { + g_free (src->srv_host); + src->srv_host = g_strdup (src->host); + src->srv_port = src->port; nns_logi ("Query-hybrid feature is disabled."); nns_logi - ("Specify operation to register server to broker (e.g., operation=object_detection/mobilev3)."); + ("Specify topic to register server to broker (e.g., topic=object_detection/mobilev3)."); + } + nns_edge_set_event_callback (src->edge_h, _nns_edge_event_cb, src); + + if (0 != nns_edge_start (src->edge_h, true)) { + nns_loge + ("Failed to start NNStreamer-edge. Please check server IP and port"); + return FALSE; + } + + if (!gst_tensor_query_server_wait_sink (src->server_h)) { + nns_loge ("Failed to get server information from query server."); + return FALSE; } + return TRUE; } /** - * @brief stop processing of query_serversrc + * @brief Get buffer from message queue. */ -static gboolean -gst_tensor_query_serversrc_stop (GstBaseSrc * bsrc) +static GstBuffer * +_gst_tensor_query_serversrc_get_buffer (GstTensorQueryServerSrc * src) { - GstTensorQueryServerSrc *src = GST_TENSOR_QUERY_SERVERSRC (bsrc); + nns_edge_data_h data_h; + GstBuffer *buffer = NULL; + guint i, num_data; + GstMetaQuery *meta_query; - tensor_query_hybrid_close (&src->hybrid_info); - nnstreamer_query_server_data_free (src->server_data); - src->server_data = NULL; - return TRUE; + data_h = g_async_queue_pop (src->msg_queue); + + if (!data_h) { + nns_loge ("Failed to get message from the server message queue"); + return NULL; + } + buffer = gst_buffer_new (); + + if (NNS_EDGE_ERROR_NONE != nns_edge_data_get_count (data_h, &num_data)) { + nns_loge ("Failed to get the number of memories of the edge data."); + gst_buffer_unref (buffer); + return NULL; + } + for (i = 0; i < num_data; i++) { + void *data = NULL; + size_t data_len = 0; + gpointer new_data; + + nns_edge_data_get (data_h, i, &data, &data_len); + new_data = _g_memdup (data, data_len); + + gst_buffer_append_memory (buffer, + gst_memory_new_wrapped (0, new_data, data_len, 0, data_len, new_data, + g_free)); + } + + if (buffer) { + meta_query = gst_buffer_add_meta_query (buffer); + if (meta_query) { + char *val; + int ret; + + ret = nns_edge_data_get_info (data_h, "client_id", &val); + if (NNS_EDGE_ERROR_NONE != ret) { + gst_buffer_unref (buffer); + buffer = NULL; + } else { + meta_query->client_id = g_ascii_strtoll (val, NULL, 10); + g_free (val); + } + } + } + nns_edge_data_destroy (data_h); + + return buffer; } /** @@ -357,33 +418,27 @@ gst_tensor_query_serversrc_create (GstPushSrc * psrc, GstBuffer ** outbuf) GstTensorQueryServerSrc *src = GST_TENSOR_QUERY_SERVERSRC (psrc); GstBaseSrc *bsrc = GST_BASE_SRC (psrc); - if (!src->server_data) { - nns_loge ("Server data is NULL"); - return GST_FLOW_ERROR; - } - if (!src->configured) { - gchar *sink_caps_str, *src_caps_str; + gchar *src_caps_str; GstCaps *caps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (bsrc), NULL); if (gst_caps_is_fixed (caps)) { gst_base_src_set_caps (bsrc, caps); } - src->server_info_h = gst_tensor_query_server_get_data (src->src_id); - sink_caps_str = - gst_tensor_query_server_get_sink_caps_str (src->server_info_h); src_caps_str = gst_caps_to_string (caps); - nnstreamer_query_server_data_set_caps_str (src->server_data, src_caps_str, - sink_caps_str); - - g_free (sink_caps_str); + nns_edge_set_info (src->edge_h, "CAPS", "@query_server_src_caps@"); + nns_edge_set_info (src->edge_h, "CAPS", src_caps_str); g_free (src_caps_str); gst_caps_unref (caps); src->configured = TRUE; } - *outbuf = nnstreamer_query_server_get_buffer (src->server_data); + *outbuf = _gst_tensor_query_serversrc_get_buffer (src); + if (!outbuf) { + nns_loge ("Failed to get buffer to push to the tensor query serversrc."); + return GST_FLOW_ERROR; + } return GST_FLOW_OK; } diff --git a/gst/nnstreamer/tensor_query/tensor_query_serversrc.h b/gst/nnstreamer/tensor_query/tensor_query_serversrc.h index f3616b0..2e703ec 100644 --- a/gst/nnstreamer/tensor_query/tensor_query_serversrc.h +++ b/gst/nnstreamer/tensor_query/tensor_query_serversrc.h @@ -15,8 +15,6 @@ #include #include -#include "tensor_query_common.h" -#include #include #include "tensor_query_hybrid.h" @@ -48,17 +46,20 @@ struct _GstTensorQueryServerSrc guint16 port; gchar *host; - TensorQueryProtocol protocol; + guint16 srv_port; + gchar *srv_host; guint timeout; /* Query-hybrid feature */ - gchar *operation; /**< Main operation such as 'object_detection' or 'image_segmentation' */ + gchar *topic; /**< Main operation such as 'object_detection' or 'image_segmentation' */ query_hybrid_info_s hybrid_info; - gchar *broker_host; - guint16 broker_port; - query_server_handle server_data; /* server data passed to common functions */ query_server_info_handle server_info_h; + + nns_edge_protocol_e protocol; + edge_server_handle server_h; + nns_edge_h edge_h; + GAsyncQueue *msg_queue; }; /** diff --git a/jni/nnstreamer.mk b/jni/nnstreamer.mk index bddab60..c541966 100644 --- a/jni/nnstreamer.mk +++ b/jni/nnstreamer.mk @@ -68,12 +68,17 @@ NNSTREAMER_PLUGINS_SRCS := \ $(NNSTREAMER_GST_HOME)/elements/gsttensor_split.c \ $(NNSTREAMER_GST_HOME)/elements/gsttensor_transform.c \ $(NNSTREAMER_GST_HOME)/tensor_filter/tensor_filter.c \ + +# tensor-query element with nnstreamer-edge +NNSTREAMER_QUERY_SRCS := \ $(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_common.c \ $(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_hybrid.c \ $(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_client.c \ $(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_serversink.c \ $(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_serversrc.c \ - $(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_server.c + $(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_server.c \ + $(NNSTREAMER_GST_HOME)/tensor_query/nnstreamer_edge_common.c \ + $(NNSTREAMER_GST_HOME)/tensor_query/nnstreamer_edge_internal.c # source AMC (Android MediaCodec) NNSTREAMER_SOURCE_AMC_SRCS := \ diff --git a/meson.build b/meson.build index 9ee4444..1e4a79a 100644 --- a/meson.build +++ b/meson.build @@ -425,6 +425,10 @@ features = { 'target': 'aitt', 'project_args': { 'ENABLE_AITT' : 1 } }, + 'nnstreamer-edge-support': { + 'target': 'nnstreamer-edge', + 'project_args': { 'ENABLE_NNSTREAMER_EDGE': 1 } + }, 'mxnet-support': { 'extra_deps': [ mxnet_dep ] } diff --git a/meson_options.txt b/meson_options.txt index 04f1034..076bd98 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -24,6 +24,7 @@ option('mqtt-support', type: 'feature', value: 'auto') option('tvm-support', type: 'feature', value: 'auto') option('query-hybrid-support', type: 'feature', value: 'auto') option('trix-engine-support', type: 'feature', value: 'auto') +option('nnstreamer-edge-support', type: 'feature', value: 'auto') option('aitt-support', type: 'feature', value: 'auto') option('mxnet-support', type: 'feature', value: 'auto') diff --git a/packaging/nnstreamer.spec b/packaging/nnstreamer.spec index 8fdf3ea..73f5ec6 100644 --- a/packaging/nnstreamer.spec +++ b/packaging/nnstreamer.spec @@ -155,6 +155,7 @@ Requires: nnstreamer-core = %{version}-%{release} Requires: nnstreamer-configuration = %{version}-%{release} Requires: nnstreamer-single = %{version}-%{release} Recommends: nnstreamer-default-configuration = %{version}-%{release} +BuildRequires: nnstreamer-edge-devel ## Define build requirements ## BuildRequires: gstreamer-devel -- 2.7.4