Implement tensor query TCP connection using nnstreamer-edge lib.
Signed-off-by: gichan <gichan2.jang@samsung.com>
Priority: optional
Maintainer: MyungJoo Ham <myungjoo.ham@samsung.com>
Build-Depends: gcc-9 | gcc-8 | gcc-7 | gcc-6 | gcc-5 (>=5.4),
- ninja-build, meson (>=0.50), debhelper (>=9),
+ ninja-build, meson (>=0.50), debhelper (>=9), nnstreamer-edge-dev,
libgstreamer1.0-dev, libgstreamer-plugins-base1.0-dev, libglib2.0-dev,
gstreamer1.0-tools, gstreamer1.0-plugins-base, gstreamer1.0-plugins-good,
libgtest-dev, ssat, libpng-dev, libopencv-dev, liborc-0.4-dev, flex, bison,
Priority: optional
Maintainer: MyungJoo Ham <myungjoo.ham@samsung.com>
Build-Depends: gcc-9 | gcc-8 | gcc-7 | gcc-6 | gcc-5 (>=5.4),
- ninja-build, meson (>=0.49), debhelper (>=9),
+ ninja-build, meson (>=0.49), debhelper (>=9), nnstreamer-edge-dev,
libgstreamer1.0-dev, libgstreamer-plugins-base1.0-dev, libglib2.0-dev,
gstreamer1.0-tools, gstreamer1.0-plugins-base, gstreamer1.0-plugins-good,
libgtest-dev, libpng-dev, libopencv-dev, liborc-0.4-dev, flex, bison,
Priority: optional
Maintainer: MyungJoo Ham <myungjoo.ham@samsung.com>
Build-Depends: gcc-9 | gcc-8 | gcc-7 | gcc-6 | gcc-5 (>=5.4),
- ninja-build, meson (>=0.49), debhelper (>=9),
+ ninja-build, meson (>=0.49), debhelper (>=9), nnstreamer-edge-dev,
libgstreamer1.0-dev, libgstreamer-plugins-base1.0-dev, libglib2.0-dev,
gstreamer1.0-tools, gstreamer1.0-plugins-base, gstreamer1.0-plugins-good,
libgtest-dev, ssat, libpng-dev, libopencv-dev, liborc-0.4-dev,
# Build libraries ("both_libraries" are supported from 0.46.)
nnstreamer_shared = shared_library('nnstreamer',
nnstreamer_sources,
- dependencies: nnstreamer_deps,
+ dependencies: [nnstreamer_deps, nns_edge_dep],
include_directories: nnstreamer_inc,
install: true,
install_dir: plugins_install_dir
nnstreamer_static = static_library('nnstreamer',
nnstreamer_sources,
- dependencies: nnstreamer_deps,
+ dependencies: [nnstreamer_deps, nns_edge_dep],
include_directories: nnstreamer_inc,
install: true,
install_dir: nnstreamer_libdir
#endif /* __gnu_linux__ && !__ANDROID__ */
#include <tensor_filter/tensor_filter.h>
+#if defined(ENABLE_NNSTREAMER_EDGE)
#include <tensor_query/tensor_query_serversrc.h>
#include <tensor_query/tensor_query_serversink.h>
#include <tensor_query/tensor_query_client.h>
+#endif
#define NNSTREAMER_INIT(plugin,name,type) \
do { \
NNSTREAMER_INIT (plugin, transform, TRANSFORM);
NNSTREAMER_INIT (plugin, if, IF);
NNSTREAMER_INIT (plugin, rate, RATE);
+#if defined(ENABLE_NNSTREAMER_EDGE)
NNSTREAMER_INIT (plugin, query_serversrc, QUERY_SERVERSRC);
NNSTREAMER_INIT (plugin, query_serversink, QUERY_SERVERSINK);
NNSTREAMER_INIT (plugin, query_client, QUERY_CLIENT);
+#endif
#if defined(__gnu_linux__) && !defined(__ANDROID__)
/* IIO requires Linux / non-Android */
#if (GST_VERSION_MAJOR == 1) && (GST_VERSION_MINOR >= 8)
glib_dep, gio_dep, thread_dep
]
+nnstreamer_headers += join_paths(meson.current_source_dir(), 'nnstreamer-edge.h')
+
if aitt_support_is_available
nnstreamer_edge_sources += join_paths(meson.current_source_dir(), 'nnstreamer_edge_aitt.c')
nnstreamer_edge_deps += aitt_support_deps
typedef enum {
NNS_EDGE_PROTOCOL_TCP = 0,
+ NNS_EDGE_PROTOCOL_UDP,
NNS_EDGE_PROTOCOL_MQTT,
NNS_EDGE_PROTOCOL_AITT,
NNS_EDGE_PROTOCOL_AITT_TCP,
g_free (eh->topic);
g_free (eh->ip);
g_free (eh->recv_ip);
+ g_free (eh->caps_str);
g_hash_table_destroy (eh->conn_table);
nns_edge_unlock (eh);
#include <gio/gio.h>
#include <glib.h>
#include <string.h>
+#include "nnstreamer-edge.h"
+#include "tensor_query_common.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <arpa/inet.h>
+#include <ifaddrs.h>
/**
* @brief Macro for debug mode.
enum
{
PROP_0,
- PROP_SINK_HOST,
- PROP_SINK_PORT,
- PROP_SRC_HOST,
- PROP_SRC_PORT,
+ PROP_HOST,
+ PROP_PORT,
PROP_PROTOCOL,
PROP_OPERATION,
- PROP_BROKER_HOST,
- PROP_BROKER_PORT,
PROP_SILENT,
};
#define TCP_HIGHEST_PORT 65535
#define TCP_DEFAULT_HOST "localhost"
-#define TCP_DEFAULT_SINK_PORT 3000
#define TCP_DEFAULT_SRC_PORT 3001
#define DEFAULT_SILENT TRUE
gobject_class->finalize = gst_tensor_query_client_finalize;
/** install property goes here */
- g_object_class_install_property (gobject_class, PROP_SINK_HOST,
- g_param_spec_string ("sink-host", "Sink Host",
- "A tenor query sink host to send the packets to/from",
- TCP_DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_SINK_PORT,
- g_param_spec_uint ("sink-port", "Sink Port",
- "The port of tensor query sink to send the packets to/from", 0,
- TCP_HIGHEST_PORT, TCP_DEFAULT_SINK_PORT,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_SRC_HOST,
- g_param_spec_string ("src-host", "Source Host",
- "A tenor query src host to send the packets to/from",
+ g_object_class_install_property (gobject_class, PROP_HOST,
+ g_param_spec_string ("host", "Host",
+ "A tenor query server host to send the packets to/from",
TCP_DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_SRC_PORT,
- g_param_spec_uint ("src-port", "Source Port",
- "The port of tensor query src to send the packets to/from", 0,
+ g_object_class_install_property (gobject_class, PROP_PORT,
+ g_param_spec_uint ("port", "Port",
+ "The port of tensor query server to send the packets to/from", 0,
TCP_HIGHEST_PORT, TCP_DEFAULT_SRC_PORT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_SILENT,
g_param_spec_string ("operation", "Operation",
"The main operation of the host.",
"", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_BROKER_HOST,
- g_param_spec_string ("broker-host", "Broker Host",
- "Broker host address to connect.", DEFAULT_BROKER_HOST,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_BROKER_PORT,
- g_param_spec_uint ("broker-port", "Broker Port",
- "Broker port to connect.", 0, 65535,
- DEFAULT_BROKER_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&sinktemplate));
}
/**
+ * @brief Get IP address
+ */
+static gchar *
+_get_ip_address (void)
+{
+ struct ifaddrs *addrs, *run;
+ gchar *ret = NULL;
+
+ if (0 != getifaddrs (&addrs))
+ goto done;
+ run = addrs;
+
+ while (run) {
+ if (run->ifa_addr && run->ifa_addr->sa_family == AF_INET) {
+ struct sockaddr_in *pAddr = (struct sockaddr_in *) run->ifa_addr;
+
+ if (NULL != strstr (run->ifa_name, "en") ||
+ NULL != strstr (run->ifa_name, "et")) {
+ g_free (ret);
+ ret = g_strdup (inet_ntoa (pAddr->sin_addr));
+ }
+ }
+ run = run->ifa_next;
+ }
+ freeifaddrs (addrs);
+
+done:
+ if (NULL == ret)
+ ret = g_strdup ("localhost");
+
+ return ret;
+}
+
+/**
* @brief initialize the new element
*/
static void
/* init properties */
self->silent = DEFAULT_SILENT;
self->protocol = DEFAULT_PROTOCOL;
- self->sink_conn = NULL;
- self->sink_host = g_strdup (TCP_DEFAULT_HOST);
- self->sink_port = TCP_DEFAULT_SINK_PORT;
- self->src_conn = NULL;
- self->src_host = g_strdup (TCP_DEFAULT_HOST);
- self->src_port = TCP_DEFAULT_SRC_PORT;
+ self->host = g_strdup (TCP_DEFAULT_HOST);
+ self->port = TCP_DEFAULT_SRC_PORT;
+ self->srv_host = g_strdup (TCP_DEFAULT_HOST);
+ self->srv_port = TCP_DEFAULT_SRC_PORT;
self->operation = NULL;
- self->broker_host = g_strdup (DEFAULT_BROKER_HOST);
- self->broker_port = DEFAULT_BROKER_PORT;
self->in_caps_str = NULL;
+ self->msg_queue = g_async_queue_new ();
tensor_query_hybrid_init (&self->hybrid_info, NULL, 0, FALSE);
+ nns_edge_create_handle ("TEMP_ID", "TEMP_CLIENT_TOPIC", &self->edge_h);
}
/**
gst_tensor_query_client_finalize (GObject * object)
{
GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (object);
+ nns_edge_data_h data_h;
- g_free (self->sink_host);
- self->sink_host = NULL;
- g_free (self->src_host);
- self->src_host = NULL;
+ g_free (self->host);
+ self->host = NULL;
+ g_free (self->srv_host);
+ self->srv_host = NULL;
g_free (self->operation);
self->operation = NULL;
- g_free (self->broker_host);
- self->broker_host = NULL;
g_free (self->in_caps_str);
self->in_caps_str = NULL;
- tensor_query_hybrid_close (&self->hybrid_info);
- if (self->sink_conn) {
- nnstreamer_query_close (self->sink_conn);
- self->sink_conn = NULL;
+
+ while ((data_h = g_async_queue_try_pop (self->msg_queue))) {
+ nns_edge_data_destroy (data_h);
}
- if (self->src_conn) {
- nnstreamer_query_close (self->src_conn);
- self->src_conn = NULL;
+ g_async_queue_unref (self->msg_queue);
+
+ tensor_query_hybrid_close (&self->hybrid_info);
+ if (self->edge_h) {
+ nns_edge_release_handle (self->edge_h);
+ self->edge_h = NULL;
}
G_OBJECT_CLASS (parent_class)->finalize (object);
GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (object);
switch (prop_id) {
- case PROP_SINK_HOST:
+ case PROP_HOST:
if (!g_value_get_string (value)) {
nns_logw ("Sink host property cannot be NULL");
break;
}
- g_free (self->sink_host);
- self->sink_host = g_value_dup_string (value);
- break;
- case PROP_SINK_PORT:
- self->sink_port = g_value_get_uint (value);
- break;
- case PROP_SRC_HOST:
- if (!g_value_get_string (value)) {
- nns_logw ("Source host property cannot be NULL");
- break;
- }
- g_free (self->src_host);
- self->src_host = g_value_dup_string (value);
+ g_free (self->host);
+ self->host = g_value_dup_string (value);
break;
- case PROP_SRC_PORT:
- self->src_port = g_value_get_uint (value);
+ case PROP_PORT:
+ self->port = g_value_get_uint (value);
break;
case PROP_PROTOCOL:
{
/** @todo expand when other protocols are ready */
- TensorQueryProtocol protocol = g_value_get_enum (value);
- if (protocol == _TENSOR_QUERY_PROTOCOL_TCP)
+ nns_edge_protocol_e protocol = g_value_get_enum (value);
+ if (protocol == NNS_EDGE_PROTOCOL_TCP)
self->protocol = protocol;
}
break;
g_free (self->operation);
self->operation = g_value_dup_string (value);
break;
- case PROP_BROKER_HOST:
- if (!g_value_get_string (value)) {
- nns_logw ("Broker host property cannot be NULL");
- break;
- }
- g_free (self->broker_host);
- self->broker_host = g_value_dup_string (value);
- break;
- case PROP_BROKER_PORT:
- self->broker_port = g_value_get_uint (value);
- break;
case PROP_SILENT:
self->silent = g_value_get_boolean (value);
break;
GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (object);
switch (prop_id) {
- case PROP_SINK_HOST:
- g_value_set_string (value, self->sink_host);
- break;
- case PROP_SINK_PORT:
- g_value_set_uint (value, self->sink_port);
+ case PROP_HOST:
+ g_value_set_string (value, self->host);
break;
- case PROP_SRC_HOST:
- g_value_set_string (value, self->src_host);
- break;
- case PROP_SRC_PORT:
- g_value_set_uint (value, self->src_port);
+ case PROP_PORT:
+ g_value_set_uint (value, self->port);
break;
case PROP_PROTOCOL:
g_value_set_enum (value, self->protocol);
case PROP_OPERATION:
g_value_set_string (value, self->operation);
break;
- case PROP_BROKER_HOST:
- g_value_set_string (value, self->broker_host);
- break;
- case PROP_BROKER_PORT:
- g_value_set_uint (value, self->broker_port);
break;
case PROP_SILENT:
g_value_set_boolean (value, self->silent);
}
/**
- * @brief Connect to query server. (Direct connection)
- */
-static gboolean
-_connect_to_server (GstTensorQueryClient * self)
-{
- TensorQueryCommandData cmd_buf;
- query_client_id_t client_id;
-
- nns_logd ("Server src info: %s:%u", self->src_host, self->src_port);
- self->src_conn = nnstreamer_query_connect (self->protocol, self->src_host,
- self->src_port, QUERY_DEFAULT_TIMEOUT_SEC);
- if (!self->src_conn) {
- nns_loge ("Failed to connect server source ");
- return FALSE;
- }
-
- /** Receive client ID from server src */
- if (0 != nnstreamer_query_receive (self->src_conn, &cmd_buf) ||
- cmd_buf.cmd != _TENSOR_QUERY_CMD_CLIENT_ID) {
- nns_loge ("Failed to receive client ID.");
- return FALSE;
- }
-
- client_id = cmd_buf.client_id;
- nnstreamer_query_set_client_id (self->src_conn, client_id);
-
- cmd_buf.cmd = _TENSOR_QUERY_CMD_REQUEST_INFO;
- cmd_buf.data.data = (uint8_t *) self->in_caps_str;
- cmd_buf.data.size = (size_t) strlen (self->in_caps_str) + 1;
-
- if (0 != nnstreamer_query_send (self->src_conn, &cmd_buf)) {
- nns_loge ("Failed to send request info cmd buf");
- return FALSE;
- }
-
- if (0 != nnstreamer_query_receive (self->src_conn, &cmd_buf)) {
- nns_loge ("Failed to receive response from the query server.");
- return FALSE;
- }
-
- if (cmd_buf.cmd == _TENSOR_QUERY_CMD_RESPOND_APPROVE) {
- if (!gst_tensor_query_client_update_caps (self, (char *) cmd_buf.data.data)) {
- nns_loge ("Failed to update client source caps.");
- return FALSE;
- }
- } else {
- /** @todo Retry for info */
- nns_loge ("Failed to receive approve command.");
- return FALSE;
- }
-
- nns_logd ("Server sink info: %s:%u", self->sink_host, self->sink_port);
- self->sink_conn =
- nnstreamer_query_connect (self->protocol, self->sink_host,
- self->sink_port, QUERY_DEFAULT_TIMEOUT_SEC);
- if (!self->sink_conn) {
- nns_loge ("Failed to connect server sink ");
- return FALSE;
- }
-
- nnstreamer_query_set_client_id (self->sink_conn, client_id);
- cmd_buf.cmd = _TENSOR_QUERY_CMD_CLIENT_ID;
- cmd_buf.client_id = client_id;
- if (0 != nnstreamer_query_send (self->sink_conn, &cmd_buf)) {
- nns_loge ("Failed to send client ID to server sink");
- return FALSE;
- }
- return TRUE;
-}
-
-/**
* @brief Copy server info.
*/
static void
_copy_srv_info (GstTensorQueryClient * self, query_server_info_s * server)
{
- g_free (self->src_host);
- self->src_host = g_strdup (server->src.host);
- self->src_port = server->src.port;
- g_free (self->sink_host);
- self->sink_host = g_strdup (server->sink.host);
- self->sink_port = server->sink.port;
+ g_free (self->srv_host);
+ self->srv_host = g_strdup (server->src.host);
+ self->srv_port = server->src.port;
}
/**
nns_logd ("Retry to connect to available server.");
while ((server = tensor_query_hybrid_get_server_info (&self->hybrid_info))) {
- nnstreamer_query_close (self->sink_conn);
- nnstreamer_query_close (self->src_conn);
- self->sink_conn = NULL;
- self->src_conn = NULL;
+ nns_edge_disconnect (self->edge_h);
_copy_srv_info (self, server);
tensor_query_hybrid_free_server_info (server);
- if (_connect_to_server (self)) {
- nns_logi ("Connected to new server. src: %s:%u, sink: %s:%u",
- self->src_host, self->src_port, self->sink_host, self->sink_port);
+ if (nns_edge_connect (self->edge_h, NNS_EDGE_PROTOCOL_TCP, self->srv_host,
+ self->srv_port)) {
+ nns_logi ("Connected to new server: %s:%u.", self->srv_host,
+ self->srv_port);
ret = TRUE;
break;
}
}
/**
+ * @brief Parse caps from received event data.
+ */
+static gchar *
+_nns_edge_parse_caps (gchar * caps_str, gboolean is_src)
+{
+ gchar **strv = g_strsplit (caps_str, "@", -1);
+ gint num = g_strv_length (strv), i;
+ gchar *find_key = NULL;
+ gchar *ret_str = NULL;
+
+ find_key =
+ is_src ==
+ TRUE ? g_strdup ("query_server_src_caps") :
+ g_strdup ("query_server_sink_caps");
+
+ for (i = 1; i < num; i += 2) {
+ if (0 == g_strcmp0 (find_key, strv[i])) {
+ ret_str = g_strdup (strv[i + 1]);
+ break;
+ }
+ }
+
+ g_free (find_key);
+ g_strfreev (strv);
+
+ return ret_str;
+}
+
+/**
+ * @brief nnstreamer-edge event callback.
+ */
+static int
+_nns_edge_event_cb (nns_edge_event_h event_h, void *user_data)
+{
+ nns_edge_event_e event_type;
+ int ret = NNS_EDGE_ERROR_NONE;
+ gchar *caps_str = NULL;
+ GstTensorQueryClient *self = (GstTensorQueryClient *) user_data;
+
+ if (NNS_EDGE_ERROR_NONE != nns_edge_event_get_type (event_h, &event_type)) {
+ nns_loge ("Failed to get event type!");
+ return NNS_EDGE_ERROR_NOT_SUPPORTED;
+ }
+
+ switch (event_type) {
+ case NNS_EDGE_EVENT_CAPABILITY:
+ {
+ GstCaps *server_caps, *client_caps;
+ GstStructure *server_st, *client_st;
+ gboolean result = FALSE;
+ gchar *ret_str;
+
+ nns_edge_event_parse_capability (event_h, &caps_str);
+ ret_str = _nns_edge_parse_caps (caps_str, TRUE);
+ client_caps = gst_caps_from_string ((gchar *) self->in_caps_str);
+ server_caps = gst_caps_from_string (ret_str);
+ g_free (ret_str);
+
+ /** Server framerate may vary. Let's skip comparing the framerate. */
+ gst_caps_set_simple (server_caps, "framerate", GST_TYPE_FRACTION, 0, 1,
+ NULL);
+ gst_caps_set_simple (client_caps, "framerate", GST_TYPE_FRACTION, 0, 1,
+ NULL);
+
+ server_st = gst_caps_get_structure (server_caps, 0);
+ client_st = gst_caps_get_structure (client_caps, 0);
+
+ if (gst_structure_is_tensor_stream (server_st)) {
+ GstTensorsConfig server_config, client_config;
+
+ gst_tensors_config_from_structure (&server_config, server_st);
+ gst_tensors_config_from_structure (&client_config, client_st);
+
+ result = gst_tensors_config_is_equal (&server_config, &client_config);
+ }
+
+ if (result || gst_caps_can_intersect (client_caps, server_caps)) {
+ /** Update client src caps */
+ ret_str = _nns_edge_parse_caps (caps_str, FALSE);
+ if (!gst_tensor_query_client_update_caps (self, ret_str)) {
+ nns_loge ("Failed to update client source caps.");
+ ret = NNS_EDGE_ERROR_UNKNOWN;
+ }
+ g_free (ret_str);
+ } else {
+ /* respond deny with src caps string */
+ nns_loge ("Query caps is not acceptable!");
+ ret = NNS_EDGE_ERROR_UNKNOWN;
+ }
+
+ gst_caps_unref (server_caps);
+ gst_caps_unref (client_caps);
+
+ break;
+ }
+ case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
+ {
+ nns_edge_data_h data;
+
+ nns_edge_event_parse_new_data (event_h, &data);
+ g_async_queue_push (self->msg_queue, data);
+ break;
+ }
+ default:
+ break;
+ }
+
+ g_free (caps_str);
+ return ret;
+}
+
+/**
* @brief This function handles sink event.
*/
static gboolean
{
GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (parent);
gboolean ret = TRUE;
+ gchar *ip;
GST_DEBUG_OBJECT (self, "Received %s event: %" GST_PTR_FORMAT,
GST_EVENT_TYPE_NAME (event), event);
query_server_info_s *server;
tensor_query_hybrid_set_broker (&self->hybrid_info,
- self->broker_host, self->broker_port);
+ self->host, self->port);
if (!tensor_query_hybrid_subscribe (&self->hybrid_info,
self->operation)) {
nns_logi ("Query-hybrid feature is disabled.");
nns_logi
("Specify operation to subscribe to the available broker (e.g., operation=object_detection).");
+ g_free (self->srv_host);
+ self->srv_host = g_strdup (self->host);
+ self->srv_port = self->port;
}
g_free (self->in_caps_str);
self->in_caps_str = gst_caps_to_string (caps);
+ nns_edge_set_info (self->edge_h, "CAPS", self->in_caps_str);
+ nns_edge_set_event_callback (self->edge_h, _nns_edge_event_cb, self);
+
+ ip = _get_ip_address ();
+ nns_edge_set_info (self->edge_h, "IP", ip);
+ nns_edge_set_info (self->edge_h, "PORT", "0");
+ g_free (ip);
+
+ if (0 != nns_edge_start (self->edge_h, false)) {
+ nns_loge
+ ("Failed to start NNStreamer-edge. Please check server IP and port");
+ return FALSE;
+ }
- if (!_connect_to_server (self)) {
- ret = _client_retry_connection (self);
+ if (0 != nns_edge_connect (self->edge_h, NNS_EDGE_PROTOCOL_TCP,
+ self->srv_host, self->srv_port)) {
+ nns_loge ("Failed to connect to edge server!");
+ return FALSE;
}
gst_event_unref (event);
GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (parent);
GstBuffer *out_buf = NULL;
GstFlowReturn res = GST_FLOW_OK;
-
+ nns_edge_data_h data_h;
+ guint i, num_mems, num_data;
+ int ret;
+ GstMemory *mem[NNS_TENSOR_SIZE_LIMIT];
+ GstMapInfo map[NNS_TENSOR_SIZE_LIMIT];
UNUSED (pad);
- if (!tensor_query_send_buffer (self->src_conn, GST_ELEMENT (self), buf)) {
- nns_logw ("Failed to send buffer to server node, retry connection.");
+ ret = nns_edge_data_create (&data_h);
+ if (ret != NNS_EDGE_ERROR_NONE) {
+ nns_loge ("Failed to create data handle in client chain.");
+ return GST_FLOW_ERROR;
+ }
+
+ num_mems = gst_buffer_n_memory (buf);
+ for (i = 0; i < num_mems; i++) {
+ mem[i] = gst_buffer_peek_memory (buf, i);
+ if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) {
+ ml_loge ("Cannot map the %uth memory in gst-buffer.", i);
+ num_mems = i;
+ goto done;
+ }
+ nns_edge_data_add (data_h, map[i].data, map[i].size, NULL);
+ }
+ if (0 != nns_edge_request (self->edge_h, data_h)) {
+ nns_logw ("Failed to request to server node, retry connection.");
goto retry;
}
- out_buf = tensor_query_receive_buffer (self->sink_conn);
- if (out_buf) {
+ nns_edge_data_destroy (data_h);
+
+ data_h = g_async_queue_try_pop (self->msg_queue);
+ if (data_h) {
+ out_buf = gst_buffer_new ();
+ if (NNS_EDGE_ERROR_NONE != nns_edge_data_get_count (data_h, &num_data)) {
+ nns_loge ("Failed to get the number of memories of the edge data.");
+ res = GST_FLOW_ERROR;
+ goto done;
+ }
+ for (i = 0; i < num_data; i++) {
+ void *data = NULL;
+ size_t data_len;
+ gpointer new_data;
+
+ nns_edge_data_get (data_h, i, &data, &data_len);
+ new_data = _g_memdup (data, data_len);
+ gst_buffer_append_memory (out_buf,
+ gst_memory_new_wrapped (0, new_data, data_len, 0,
+ data_len, new_data, g_free));
+ }
/* metadata from incoming buffer */
gst_buffer_copy_into (out_buf, buf, GST_BUFFER_COPY_METADATA, 0, -1);
res = gst_pad_push (self->srcpad, out_buf);
- goto done;
}
-
- nns_logw ("Failed to receive result from server node, retry connection.");
+ goto done;
retry:
if (!self->operation || !_client_retry_connection (self)) {
res = GST_FLOW_ERROR;
}
done:
+ if (data_h) {
+ nns_edge_data_destroy (data_h);
+ }
+
+ for (i = 0; i < num_mems; i++)
+ gst_memory_unmap (mem[i], &map[i]);
+
gst_buffer_unref (buf);
return res;
}
#include <gst/gst.h>
#include <gio/gio.h>
#include <tensor_common.h>
-#include "tensor_query_common.h"
#include "tensor_query_hybrid.h"
G_BEGIN_DECLS
gboolean silent; /**< True if logging is minimized */
gchar *in_caps_str;
- TensorQueryProtocol protocol;
-
/* Query-hybrid feature */
gchar *operation; /**< Main operation such as 'object_detection' or 'image_segmentation' */
query_hybrid_info_s hybrid_info;
- gchar *broker_host;
- guint16 broker_port;
+ gchar *host;
+ guint16 port;
- /* src information (Connect to query server source) */
- query_connection_handle src_conn;
- gchar *src_host;
- guint16 src_port;
+ gchar *srv_host;
+ guint16 srv_port;
- /* sink socket and information (Connect to query server sink) */
- query_connection_handle sink_conn;
- gchar *sink_host;
- guint16 sink_port;
+ nns_edge_protocol_e protocol;
+ nns_edge_h edge_h;
+ GAsyncQueue *msg_queue;
};
/**
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
-
-#include <gio/gio.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
-#include <errno.h>
-#include <nnstreamer_util.h>
-#include <nnstreamer_log.h>
#include "tensor_query_common.h"
-#include <pthread.h>
-
-#define TENSOR_QUERY_SERVER_DATA_LEN 128
-#define N_BACKLOG 10
-#define CLIENT_ID_LEN sizeof(query_client_id_t)
#ifndef EREMOTEIO
#define EREMOTEIO 121 /* This is Linux-specific. Define this for non-Linux systems */
#endif
/**
- * @brief Query server dependent network data
- */
-typedef struct
-{
- TensorQueryProtocol protocol;
- int8_t is_src;
- char *src_caps_str;
- char *sink_caps_str;
- GAsyncQueue *msg_queue;
- union
- {
- struct
- {
- GSocketListener *socket_listener;
- GCancellable *cancellable;
- GAsyncQueue *conn_queue;
- };
- /* check the size of struct is less */
- uint8_t _dummy[TENSOR_QUERY_SERVER_DATA_LEN];
- };
-} TensorQueryServerData;
-
-/**
- * @brief Structures for tensor query client data.
- */
-typedef struct
-{
- TensorQueryProtocol protocol;
- union
- {
- struct
- {
- GSocket *client_socket;
- GCancellable *cancellable;
- };
- };
-} TensorQueryClientData;
-
-/**
- * @brief Connection info structure
- */
-typedef struct
-{
- TensorQueryProtocol protocol;
- char *host;
- uint16_t port;
- uint32_t timeout;
- query_client_id_t client_id;
- pthread_t msg_thread;
- int8_t running;
-
- /* network info */
- union
- {
- /* TCP */
- struct
- {
- GSocket *socket;
- GCancellable *cancellable;
- };
- };
-} TensorQueryConnection;
-
-/**
- * @brief Structures for tensor query message handling thread data.
- */
-typedef struct
-{
- TensorQueryServerData *sdata;
- TensorQueryConnection *conn;
-} TensorQueryMsgThreadData;
-
-static int
-query_tcp_receive (GSocket * socket, uint8_t * data, size_t size,
- GCancellable * cancellable);
-static gboolean query_tcp_send (GSocket * socket, uint8_t * data, size_t size,
- GCancellable * cancellable);
-static void
-accept_socket_async_cb (GObject * source, GAsyncResult * result,
- gpointer user_data);
-
-/**
- * @brief Internal function to check connection.
- */
-static gboolean
-_query_check_connection (query_connection_handle connection)
-{
- TensorQueryConnection *conn;
- size_t size;
- GIOCondition condition;
-
- conn = (TensorQueryConnection *) connection;
-
- condition = g_socket_condition_check (conn->socket,
- G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
- size = g_socket_get_available_bytes (conn->socket);
-
- if (condition && size <= 0) {
- nns_logw ("Socket is not available, possibly EOS.");
- return FALSE;
- }
-
- return TRUE;
-}
-
-/**
- * @brief Internal function to get client id.
- */
-static query_client_id_t
-_query_get_client_id (query_connection_handle connection)
-{
- TensorQueryConnection *conn = (TensorQueryConnection *) connection;
- return conn->client_id;
-}
-
-/**
- * @brief Get socket address
- */
-static gboolean
-gst_tensor_query_get_saddr (const char *host, uint16_t port,
- GCancellable * cancellable, GSocketAddress ** saddr)
-{
- GError *err = NULL;
- GInetAddress *addr;
-
- /* look up name if we need to */
- addr = g_inet_address_new_from_string (host);
- if (!addr) {
- GList *results;
- GResolver *resolver;
- resolver = g_resolver_get_default ();
- results = g_resolver_lookup_by_name (resolver, host, cancellable, &err);
- if (!results) {
- if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
- nns_logd ("gst_tensor_query_socket_new: Cancelled name resolval");
- } else {
- nns_loge ("Failed to resolve host '%s': %s", host, err->message);
- }
- g_clear_error (&err);
- g_object_unref (resolver);
- return FALSE;
- }
- /** @todo Try with the second address if the first fails */
- addr = G_INET_ADDRESS (g_object_ref (results->data));
- g_resolver_free_addresses (results);
- g_object_unref (resolver);
- }
-
- *saddr = g_inet_socket_address_new (addr, port);
- g_object_unref (addr);
-
- return TRUE;
-}
-
-/**
- * @brief Create and connect to requested socket.
- */
-static gboolean
-gst_tensor_query_connect (query_connection_handle conn_h)
-{
- GError *err = NULL;
- GSocketAddress *saddr = NULL;
- TensorQueryConnection *conn = (TensorQueryConnection *) conn_h;
- gboolean ret = FALSE;
-
- if (!gst_tensor_query_get_saddr (conn->host, conn->port, conn->cancellable,
- &saddr)) {
- nns_loge ("Failed to get socket address");
- return ret;
- }
-
- /* create sending client socket */
- conn->socket =
- g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM,
- G_SOCKET_PROTOCOL_TCP, &err);
- /** @todo Support UDP protocol */
-
- if (!conn->socket) {
- nns_loge ("Failed to create new socket");
- goto done;
- }
-
- /* setting TCP_NODELAY to TRUE in order to avoid packet batching as known as Nagle's algorithm */
- if (!g_socket_set_option (conn->socket, IPPROTO_TCP, TCP_NODELAY, TRUE, &err)) {
- nns_loge ("Failed to set socket TCP_NODELAY option: %s", err->message);
- goto done;
- }
-
- if (!g_socket_connect (conn->socket, saddr, conn->cancellable, &err)) {
- if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
- nns_logd ("Cancelled connecting");
- } else {
- nns_loge ("Failed to connect to host");
- }
- goto done;
- }
-
- /* now connected to the requested socket */
- ret = TRUE;
-
-done:
- g_object_unref (saddr);
- g_clear_error (&err);
- return ret;
-}
-
-/**
- * @brief Set timeout.
- */
-void
-nnstreamer_query_set_timeout (query_connection_handle connection,
- uint32_t timeout)
-{
- TensorQueryConnection *conn = (TensorQueryConnection *) connection;
-
- if (conn->timeout != timeout) {
- conn->timeout = timeout;
-
- switch (conn->protocol) {
- case _TENSOR_QUERY_PROTOCOL_TCP:
- g_socket_set_timeout (conn->socket, timeout);
- break;
- default:
- /* NYI */
- nns_loge ("Invalid protocol");
- break;
- }
- }
-}
-
-/**
- * @brief Set client ID.
- */
-void
-nnstreamer_query_set_client_id (query_connection_handle connection,
- query_client_id_t id)
-{
- TensorQueryConnection *conn = (TensorQueryConnection *) connection;
- conn->client_id = id;
-}
-
-/**
- * @brief Connect to the specified address.
- */
-query_connection_handle
-nnstreamer_query_connect (TensorQueryProtocol protocol, const char *ip,
- uint16_t port, uint32_t timeout)
-{
- TensorQueryConnection *conn = g_new0 (TensorQueryConnection, 1);
-
- conn->protocol = protocol;
- conn->host = g_strdup (ip);
- conn->port = port;
- conn->timeout = timeout;
-
- switch (protocol) {
- case _TENSOR_QUERY_PROTOCOL_TCP:
- {
- conn->cancellable = g_cancellable_new ();
- if (!gst_tensor_query_connect (conn)) {
- nns_loge ("Failed to create new socket");
- nnstreamer_query_close (conn);
- return NULL;
- }
- break;
- }
- default:
- nns_loge ("Unsupported protocol.");
- nnstreamer_query_close (conn);
- return NULL;
- }
-
- nnstreamer_query_set_timeout (conn, timeout);
- return conn;
-}
-
-/**
- * @brief receive command from connected device.
- * @return 0 if OK, negative value if error
- * @note The socket operates in two modes: blocking and non-blocking.
- * In non-blocking mode, if there is no data available, it is immediately returned.
- */
-int
-nnstreamer_query_receive (query_connection_handle connection,
- TensorQueryCommandData * data)
-{
- TensorQueryConnection *conn = (TensorQueryConnection *) connection;
-
- if (!conn) {
- nns_loge ("Invalid connection data");
- return -EINVAL;
- }
-
- switch (conn->protocol) {
- case _TENSOR_QUERY_PROTOCOL_TCP:
- {
- TensorQueryCommand cmd;
-
- if (query_tcp_receive (conn->socket, (uint8_t *) & cmd, sizeof (cmd),
- conn->cancellable) < 0) {
- nns_loge ("Failed to receive command from socket");
- return -EREMOTEIO;
- }
-
- nns_logd ("Received command: %d", cmd);
- data->cmd = cmd;
-
- if (cmd == _TENSOR_QUERY_CMD_TRANSFER_DATA ||
- cmd <= _TENSOR_QUERY_CMD_RESPOND_DENY) {
- /* receive size */
- if (query_tcp_receive (conn->socket, (uint8_t *) & data->data.size,
- sizeof (data->data.size), conn->cancellable) < 0) {
- nns_loge ("Failed to receive data size from socket");
- return -EREMOTEIO;
- }
-
- if (cmd <= _TENSOR_QUERY_CMD_RESPOND_DENY) {
- data->data.data = (uint8_t *) g_malloc0 (data->data.size);
- }
-
- /* receive data */
- if (query_tcp_receive (conn->socket, (uint8_t *) data->data.data,
- data->data.size, conn->cancellable) < 0) {
- nns_loge ("Failed to receive data from socket");
- return -EREMOTEIO;
- }
- return 0;
- } else if (data->cmd == _TENSOR_QUERY_CMD_CLIENT_ID) {
- /* receive client id */
- if (query_tcp_receive (conn->socket, (uint8_t *) & data->client_id,
- CLIENT_ID_LEN, conn->cancellable) < 0) {
- nns_logd ("Failed to receive client id from socket");
- return -EREMOTEIO;
- }
- } else {
- /* receive data_info */
- if (query_tcp_receive (conn->socket, (uint8_t *) & data->data_info,
- sizeof (TensorQueryDataInfo), conn->cancellable) < 0) {
- nns_logd ("Failed to receive data info from socket");
- return -EREMOTEIO;
- }
- }
- }
- break;
- default:
- /* NYI */
- return -EPROTONOSUPPORT;
- }
- return 0;
-}
-
-/**
- * @brief send command to connected device.
- * @return 0 if OK, negative value if error
- */
-int
-nnstreamer_query_send (query_connection_handle connection,
- TensorQueryCommandData * data)
-{
- TensorQueryConnection *conn = (TensorQueryConnection *) connection;
-
- if (!data) {
- nns_loge ("Sending data is NULL");
- return -EINVAL;
- }
- if (!conn) {
- nns_loge ("Invalid connection data");
- return -EINVAL;
- }
-
- switch (conn->protocol) {
- case _TENSOR_QUERY_PROTOCOL_TCP:
- if (!query_tcp_send (conn->socket, (uint8_t *) & data->cmd,
- sizeof (TensorQueryCommand), conn->cancellable)) {
- nns_logd ("Failed to send to socket");
- return -EREMOTEIO;
- }
- if (data->cmd == _TENSOR_QUERY_CMD_TRANSFER_DATA ||
- data->cmd <= _TENSOR_QUERY_CMD_RESPOND_DENY) {
- /* send size */
- if (!query_tcp_send (conn->socket, (uint8_t *) & data->data.size,
- sizeof (data->data.size), conn->cancellable)) {
- nns_logd ("Failed to send size to socket");
- return -EREMOTEIO;
- }
- /* send data */
- if (!query_tcp_send (conn->socket, (uint8_t *) data->data.data,
- data->data.size, conn->cancellable)) {
- nns_logd ("Failed to send data to socket");
- return -EREMOTEIO;
- }
- } else if (data->cmd == _TENSOR_QUERY_CMD_CLIENT_ID) {
- /* send client id */
- if (!query_tcp_send (conn->socket, (uint8_t *) & data->client_id,
- CLIENT_ID_LEN, conn->cancellable)) {
- nns_logd ("Failed to send client id to socket");
- return -EREMOTEIO;
- }
- } else {
- /* send data_info */
- if (!query_tcp_send (conn->socket, (uint8_t *) & data->data_info,
- sizeof (TensorQueryDataInfo), conn->cancellable)) {
- nns_logd ("Failed to send data_info to socket");
- return -EREMOTEIO;
- }
- }
- break;
- default:
- /* NYI */
- return -EPROTONOSUPPORT;
- }
- return 0;
-}
-
-/**
- * @brief free connection
- * @return 0 if OK, negative value if error
- */
-int
-nnstreamer_query_close (query_connection_handle connection)
-{
- TensorQueryConnection *conn = (TensorQueryConnection *) connection;
- GError *err = NULL;
- int ret = 0;
-
- if (!conn) {
- nns_loge ("Invalid connection data");
- return -EINVAL;
- }
- switch (conn->protocol) {
- case _TENSOR_QUERY_PROTOCOL_TCP:
- {
- if (conn->socket) {
- if (!g_socket_close (conn->socket, &err)) {
- nns_loge ("Failed to close socket: %s", err->message);
- g_clear_error (&err);
- }
- g_object_unref (conn->socket);
- conn->socket = NULL;
- }
-
- if (conn->cancellable) {
- g_object_unref (conn->cancellable);
- conn->cancellable = NULL;
- }
- break;
- }
- default:
- /* NYI */
- ret = -EPROTONOSUPPORT;
- break;
- }
-
- g_free (conn->host);
- conn->host = NULL;
- g_free (conn);
- return ret;
-}
-
-/**
- * @brief return initialized server handle
- * @return query_server_handle, NULL if error
- */
-query_server_handle
-nnstreamer_query_server_data_new (void)
-{
- TensorQueryServerData *sdata = g_try_new0 (TensorQueryServerData, 1);
- if (!sdata) {
- nns_loge ("Failed to allocate server data");
- return NULL;
- }
-
- return (query_server_handle) sdata;
-}
-
-/**
- * @brief free server handle
- */
-void
-nnstreamer_query_server_data_free (query_server_handle server_data)
-{
- TensorQueryServerData *sdata = (TensorQueryServerData *) server_data;
- if (!sdata)
- return;
-
- switch (sdata->protocol) {
- case _TENSOR_QUERY_PROTOCOL_TCP:
- {
- TensorQueryConnection *conn_remained;
- GstBuffer *buf_remained;
-
- if (sdata->conn_queue) {
- while ((conn_remained = g_async_queue_try_pop (sdata->conn_queue))) {
- conn_remained->running = 0;
- pthread_join (conn_remained->msg_thread, NULL);
- nnstreamer_query_close (conn_remained);
- }
- g_async_queue_unref (sdata->conn_queue);
- sdata->conn_queue = NULL;
- }
-
- if (sdata->is_src && sdata->msg_queue) {
- while ((buf_remained = g_async_queue_try_pop (sdata->msg_queue))) {
- gst_buffer_unref (buf_remained);
- }
- g_async_queue_unref (sdata->msg_queue);
- sdata->msg_queue = NULL;
- }
-
- if (sdata->socket_listener) {
- g_socket_listener_close (sdata->socket_listener);
- g_object_unref (sdata->socket_listener);
- sdata->socket_listener = NULL;
- }
-
- if (sdata->cancellable) {
- g_object_unref (sdata->cancellable);
- sdata->cancellable = NULL;
- }
- break;
- }
- default:
- /* NYI */
- nns_loge ("Invalid protocol");
- break;
- }
- g_free (sdata->src_caps_str);
- g_free (sdata->sink_caps_str);
- g_free (sdata);
-}
-
-/**
- * @brief set server handle params and setup server
- * @return 0 if OK, negative value if error
- */
-int
-nnstreamer_query_server_init (query_server_handle server_data,
- TensorQueryProtocol protocol, const char *host, uint16_t port,
- int8_t is_src)
-{
- TensorQueryServerData *sdata = (TensorQueryServerData *) server_data;
- GSocketAddress *saddr = NULL;
- GError *err = NULL;
- int ret = 0;
-
- if (!sdata)
- return -EINVAL;
-
- sdata->protocol = protocol;
- sdata->is_src = is_src;
-
- switch (protocol) {
- case _TENSOR_QUERY_PROTOCOL_TCP:
- sdata->cancellable = g_cancellable_new ();
- if (!gst_tensor_query_get_saddr (host, port, sdata->cancellable, &saddr)) {
- nns_loge ("Failed to get socket address");
- ret = -EADDRNOTAVAIL;
- goto error;
- }
-
- sdata->socket_listener = g_socket_listener_new ();
- g_socket_listener_set_backlog (sdata->socket_listener, N_BACKLOG);
-
- if (!g_socket_listener_add_address (sdata->socket_listener, saddr,
- G_SOCKET_TYPE_STREAM, G_SOCKET_PROTOCOL_TCP, NULL, NULL, &err)) {
- nns_loge ("Failed to add address: %s", err->message);
- g_clear_error (&err);
- ret = -EADDRNOTAVAIL;
- goto error;
- }
-
- sdata->conn_queue = g_async_queue_new ();
- if (sdata->is_src)
- sdata->msg_queue = g_async_queue_new ();
-
- g_socket_listener_accept_socket_async (sdata->socket_listener,
- sdata->cancellable, (GAsyncReadyCallback) accept_socket_async_cb,
- sdata);
- break;
- default:
- /* NYI */
- nns_loge ("Invalid protocol");
- ret = -EPROTONOSUPPORT;
- break;
- }
-
-error:
- if (saddr)
- g_object_unref (saddr);
-
- return ret;
-}
-
-/**
- * @brief accept connection from remote
- * @return query_connection_handle including connection data
- */
-query_connection_handle
-nnstreamer_query_server_accept (query_server_handle server_data,
- query_client_id_t client_id)
-{
- TensorQueryServerData *sdata = (TensorQueryServerData *) server_data;
- TensorQueryConnection *conn;
- gint total, checked;
-
- if (!sdata)
- return NULL;
-
- switch (sdata->protocol) {
- case _TENSOR_QUERY_PROTOCOL_TCP:
- {
- total = g_async_queue_length (sdata->conn_queue);
- checked = 0;
-
- while (checked < total) {
- conn = g_async_queue_pop (sdata->conn_queue);
- checked++;
-
- if (!_query_check_connection (conn)) {
- nnstreamer_query_close (conn);
- continue;
- }
-
- g_async_queue_push (sdata->conn_queue, conn);
-
- if (client_id == _query_get_client_id (conn))
- return conn;
- }
- break;
- }
- default:
- /* NYI */
- nns_loge ("Invalid protocol");
- break;
- }
-
- return NULL;
-}
-
-/**
- * @brief [TCP] receive data for tcp server
- * @return 0 if OK, negative value if error
- */
-static int
-query_tcp_receive (GSocket * socket, uint8_t * data, size_t size,
- GCancellable * cancellable)
-{
- size_t bytes_received = 0;
- ssize_t rret;
- GError *err = NULL;
-
- while (bytes_received < size) {
- rret = g_socket_receive (socket, (char *) data + bytes_received,
- size - bytes_received, cancellable, &err);
-
- if (rret == 0) {
- nns_logi ("Connection closed");
- return -EREMOTEIO;
- }
-
- if (rret < 0) {
- nns_logi ("Failed to read from socket: %s", err->message);
- g_clear_error (&err);
- return -EREMOTEIO;
- }
- bytes_received += rret;
- }
- return 0;
-}
-
-/**
- * @brief [TCP] send data for tcp server
- */
-static gboolean
-query_tcp_send (GSocket * socket, uint8_t * data, size_t size,
- GCancellable * cancellable)
-{
- size_t bytes_sent = 0;
- ssize_t rret;
- GError *err = NULL;
- while (bytes_sent < size) {
- rret = g_socket_send (socket, (char *) data + bytes_sent,
- size - bytes_sent, cancellable, &err);
- if (rret == 0) {
- nns_logi ("Connection closed");
- return FALSE;
- }
- if (rret < 0) {
- nns_loge ("Error while sending data %s", err->message);
- g_clear_error (&err);
- return FALSE;
- }
- bytes_sent += rret;
- }
- return TRUE;
-}
-
-/**
- * @brief [TCP] Receive buffer from the client
- * @param[in] conn connection info
- */
-static void *
-_message_handler (void *thread_data)
-{
- TensorQueryMsgThreadData *_thread_data =
- (TensorQueryMsgThreadData *) thread_data;
- TensorQueryConnection *_conn = _thread_data->conn;
- TensorQueryServerData *_sdata = _thread_data->sdata;
- TensorQueryCommandData cmd_data_receive;
- TensorQueryCommandData cmd_data_send;
- GstBuffer *outbuf = NULL;
- GstMetaQuery *meta_query;
-
- cmd_data_send.cmd = _TENSOR_QUERY_CMD_CLIENT_ID;
- cmd_data_send.client_id = g_get_monotonic_time ();
-
- if (0 != nnstreamer_query_send (_conn, &cmd_data_send)) {
- nns_loge ("Failed to send client id to client");
- goto done;
- }
- nnstreamer_query_set_client_id (_conn, cmd_data_send.client_id);
-
- if (0 != nnstreamer_query_receive (_conn, &cmd_data_receive)) {
- nns_logi ("Failed to receive cmd");
- goto done;
- }
-
- if (cmd_data_receive.cmd == _TENSOR_QUERY_CMD_REQUEST_INFO) {
- GstCaps *server_caps, *client_caps;
- GstStructure *server_st, *client_st;
- gboolean result = FALSE;
-
- server_caps = gst_caps_from_string (_sdata->src_caps_str);
- client_caps = gst_caps_from_string ((char *) cmd_data_receive.data.data);
- /** Server framerate may vary. Let's skip comparing the framerate. */
- gst_caps_set_simple (server_caps, "framerate", GST_TYPE_FRACTION, 0, 1,
- NULL);
- gst_caps_set_simple (client_caps, "framerate", GST_TYPE_FRACTION, 0, 1,
- NULL);
-
- server_st = gst_caps_get_structure (server_caps, 0);
- client_st = gst_caps_get_structure (client_caps, 0);
-
- if (gst_structure_is_tensor_stream (server_st)) {
- GstTensorsConfig server_config, client_config;
-
- gst_tensors_config_from_structure (&server_config, server_st);
- gst_tensors_config_from_structure (&client_config, client_st);
-
- result = gst_tensors_config_is_equal (&server_config, &client_config);
- }
-
- if (result || gst_caps_can_intersect (client_caps, server_caps)) {
- cmd_data_send.cmd = _TENSOR_QUERY_CMD_RESPOND_APPROVE;
- cmd_data_send.data.data = (uint8_t *) _sdata->sink_caps_str;
- cmd_data_send.data.size = (size_t) strlen (_sdata->sink_caps_str) + 1;
- } else {
- /* respond deny with src caps string */
- nns_loge ("Query caps is not acceptable!");
- nns_loge ("Query client sink caps: %s", cmd_data_receive.data.data);
- nns_loge ("Query server src caps: %s", _sdata->src_caps_str);
-
- cmd_data_send.cmd = _TENSOR_QUERY_CMD_RESPOND_DENY;
- cmd_data_send.data.data = (uint8_t *) _sdata->src_caps_str;
- cmd_data_send.data.size = (size_t) strlen (_sdata->src_caps_str) + 1;
- }
-
- g_free (cmd_data_receive.data.data);
- cmd_data_receive.data.data = NULL;
-
- gst_caps_unref (server_caps);
- gst_caps_unref (client_caps);
-
- if (nnstreamer_query_send (_conn, &cmd_data_send) != 0) {
- nns_logi ("Failed to send respond");
- goto done;
- }
- }
-
- while (_conn->running) {
- if (!_query_check_connection (_conn))
- break;
-
- outbuf = tensor_query_receive_buffer (_conn);
- if (outbuf) {
- meta_query = gst_buffer_add_meta_query (outbuf);
- if (meta_query) {
- meta_query->client_id = _query_get_client_id (_conn);
- }
-
- g_async_queue_push (_sdata->msg_queue, outbuf);
- }
- }
-
-done:
- g_free (thread_data);
- _conn->running = 0;
- return NULL;
-}
-
-/**
- * @brief [TCP] Callback for socket listener that pushes socket to the queue
- */
-static void
-accept_socket_async_cb (GObject * source, GAsyncResult * result,
- gpointer user_data)
-{
- GSocketListener *socket_listener = G_SOCKET_LISTENER (source);
- GSocket *socket = NULL;
- GSocketAddress *saddr = NULL;
- GError *err = NULL;
- TensorQueryServerData *sdata = user_data;
- TensorQueryConnection *conn = NULL;
- TensorQueryCommandData cmd_data;
- gboolean done = FALSE;
-
- socket =
- g_socket_listener_accept_socket_finish (socket_listener, result, NULL,
- &err);
- if (!socket) {
- nns_loge ("Failed to get socket: %s", err->message);
- g_clear_error (&err);
- goto error;
- }
- g_socket_set_timeout (socket, QUERY_DEFAULT_TIMEOUT_SEC);
- /* create socket with connection */
- conn = g_try_new0 (TensorQueryConnection, 1);
- if (!conn) {
- nns_loge ("Failed to allocate connection");
- goto error;
- }
-
- conn->socket = socket;
- conn->cancellable = g_cancellable_new ();
-
- /* setting TCP_NODELAY to TRUE in order to avoid packet batching as known as Nagle's algorithm */
- if (!g_socket_set_option (socket, IPPROTO_TCP, TCP_NODELAY, TRUE, &err)) {
- nns_loge ("Failed to set socket TCP_NODELAY option: %s", err->message);
- g_clear_error (&err);
- goto error;
- }
-
- saddr = g_socket_get_remote_address (socket, &err);
- if (!saddr) {
- nns_loge ("Failed to get socket address: %s", err->message);
- g_clear_error (&err);
- goto error;
- }
- conn->protocol = (g_socket_get_protocol (socket) == G_SOCKET_PROTOCOL_TCP) ?
- _TENSOR_QUERY_PROTOCOL_TCP : _TENSOR_QUERY_PROTOCOL_END;
- conn->host = g_inet_address_to_string (g_inet_socket_address_get_address (
- (GInetSocketAddress *) saddr));
- conn->port = g_inet_socket_address_get_port ((GInetSocketAddress *) saddr);
- g_object_unref (saddr);
- nns_logi ("New client connected from %s:%u", conn->host, conn->port);
-
- /** Generate and send client_id to client */
- if (sdata->is_src) {
- TensorQueryMsgThreadData *thread_data = NULL;
- pthread_attr_t attr;
- int tid;
-
- thread_data = g_try_new0 (TensorQueryMsgThreadData, 1);
- if (!thread_data) {
- nns_loge ("Failed to allocate query thread data.");
- goto error;
- }
- conn->running = 1;
- thread_data->sdata = sdata;
- thread_data->conn = conn;
-
- pthread_attr_init (&attr);
- pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
- tid = pthread_create (&conn->msg_thread, &attr, _message_handler,
- thread_data);
- pthread_attr_destroy (&attr);
-
- if (tid < 0) {
- nns_loge ("Failed to create message handler thread.");
- nnstreamer_query_close (conn);
- g_free (thread_data);
- return;
- }
- } else { /** server sink */
- if (0 != nnstreamer_query_receive (conn, &cmd_data)) {
- nns_loge ("Failed to receive command.");
- goto error;
- }
- if (cmd_data.cmd == _TENSOR_QUERY_CMD_CLIENT_ID) {
- nns_logd ("Connected client id: %ld", (long) cmd_data.client_id);
- nnstreamer_query_set_client_id (conn, cmd_data.client_id);
- }
- }
-
- done = TRUE;
- g_async_queue_push (sdata->conn_queue, conn);
-
-error:
- if (!done) {
- nnstreamer_query_close (conn);
- }
-
- g_socket_listener_accept_socket_async (socket_listener,
- sdata->cancellable, (GAsyncReadyCallback) accept_socket_async_cb, sdata);
-}
-
-/**
- * @brief set server source and sink caps string.
- */
-void
-nnstreamer_query_server_data_set_caps_str (query_server_handle server_data,
- const char *src_caps_str, const char *sink_caps_str)
-{
- TensorQueryServerData *sdata = (TensorQueryServerData *) server_data;
- g_free (sdata->src_caps_str);
- g_free (sdata->sink_caps_str);
- sdata->src_caps_str = g_strdup (src_caps_str);
- sdata->sink_caps_str = g_strdup (sink_caps_str);
-}
-
-/**
- * @brief Get buffer from message queue.
- */
-GstBuffer *
-nnstreamer_query_server_get_buffer (query_server_handle server_data)
-{
- TensorQueryServerData *sdata = (TensorQueryServerData *) server_data;
-
- return GST_BUFFER (g_async_queue_pop (sdata->msg_queue));
-}
-
-/**
- * @brief Send gst-buffer to destination node.
- * @return True if all data in gst-buffer is successfully sent. False if failed to transfer data.
- * @todo This function should be used in nnstreamer element. Update function name rule and params later.
- */
-gboolean
-tensor_query_send_buffer (query_connection_handle connection,
- GstElement * element, GstBuffer * buffer)
-{
- TensorQueryCommandData cmd_data = { 0 };
- GstMemory *mem[NNS_TENSOR_SIZE_LIMIT];
- GstMapInfo map[NNS_TENSOR_SIZE_LIMIT];
- gboolean done = FALSE;
- guint i, num_mems;
-
- num_mems = gst_buffer_n_memory (buffer);
-
- /* start */
- cmd_data.cmd = _TENSOR_QUERY_CMD_TRANSFER_START;
- cmd_data.data_info.base_time = gst_element_get_base_time (element);
- cmd_data.data_info.duration = GST_BUFFER_DURATION (buffer);
- cmd_data.data_info.dts = GST_BUFFER_DTS (buffer);
- cmd_data.data_info.pts = GST_BUFFER_PTS (buffer);
- cmd_data.data_info.num_mems = num_mems;
-
- /* memory chunks in gst-buffer */
- for (i = 0; i < num_mems; i++) {
- mem[i] = gst_buffer_peek_memory (buffer, i);
- if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) {
- ml_loge ("Cannot map the %uth memory in gst-buffer.", i);
- num_mems = i;
- goto error;
- }
-
- cmd_data.data_info.mem_sizes[i] = map[i].size;
- }
-
- if (nnstreamer_query_send (connection, &cmd_data) != 0) {
- nns_loge ("Failed to send start command.");
- goto error;
- }
-
- /* transfer data */
- cmd_data.cmd = _TENSOR_QUERY_CMD_TRANSFER_DATA;
- for (i = 0; i < num_mems; i++) {
- cmd_data.data.size = map[i].size;
- cmd_data.data.data = map[i].data;
-
- if (nnstreamer_query_send (connection, &cmd_data) != 0) {
- nns_loge ("Failed to send %uth data buffer", i);
- goto error;
- }
- }
-
- /* done */
- cmd_data.cmd = _TENSOR_QUERY_CMD_TRANSFER_END;
- if (nnstreamer_query_send (connection, &cmd_data) != 0) {
- nns_loge ("Failed to send end command.");
- goto error;
- }
-
- done = TRUE;
-
-error:
- for (i = 0; i < num_mems; i++)
- gst_memory_unmap (mem[i], &map[i]);
-
- return done;
-}
-
-/**
- * @brief Receive data and generate gst-buffer. Caller should handle metadata of returned buffer.
- * @return Newly generated gst-buffer. Null if failed to receive data.
- * @todo This function should be used in nnstreamer element. Update function name rule and params later.
- */
-GstBuffer *
-tensor_query_receive_buffer (query_connection_handle connection)
-{
- TensorQueryCommandData cmd_data = { 0 };
- GstBuffer *buffer = NULL;
- gboolean done = FALSE;
- gpointer data;
- gsize len;
- guint i, num_mems;
-
- if (nnstreamer_query_receive (connection, &cmd_data) != 0) {
- nns_loge ("Failed to receive start command.");
- goto error;
- }
-
- if (cmd_data.cmd != _TENSOR_QUERY_CMD_TRANSFER_START) {
- nns_loge ("Invalid command %d, cannot start data transfer.", cmd_data.cmd);
- goto error;
- }
-
- buffer = gst_buffer_new ();
-
- num_mems = cmd_data.data_info.num_mems;
- for (i = 0; i < num_mems; i++) {
- len = cmd_data.data_info.mem_sizes[i];
- data = g_malloc0 (len);
-
- cmd_data.data.data = data;
- cmd_data.data.size = len;
-
- if (nnstreamer_query_receive (connection, &cmd_data) != 0) {
- nns_loge ("Failed to receive %uth data.", i);
- g_free (data);
- goto error;
- }
-
- gst_buffer_append_memory (buffer,
- gst_memory_new_wrapped (0, data, len, 0, len, data, g_free));
- }
-
- /* done */
- if (nnstreamer_query_receive (connection, &cmd_data) != 0) {
- nns_loge ("Failed to receive end command.");
- goto error;
- }
-
- if (cmd_data.cmd != _TENSOR_QUERY_CMD_TRANSFER_END) {
- nns_loge ("Invalid command %d, failed to transfer data.", cmd_data.cmd);
- goto error;
- }
-
- done = TRUE;
-
-error:
- if (!done) {
- if (buffer) {
- gst_buffer_unref (buffer);
- buffer = NULL;
- }
- }
-
- return buffer;
-}
-
-/**
* @brief register GEnumValue array for query protocol property handling
*/
GType
static GType protocol = 0;
if (protocol == 0) {
static GEnumValue protocols[] = {
- {_TENSOR_QUERY_PROTOCOL_TCP, "TCP",
+ {NNS_EDGE_PROTOCOL_TCP, "TCP",
"Raw TCP protocol. Directly sending stream frames via TCP connections."},
- {_TENSOR_QUERY_PROTOCOL_UDP, "UDP",
+ {NNS_EDGE_PROTOCOL_UDP, "UDP",
"Raw UDP protocol. Directly sending stream frames via UDP connections."},
- {_TENSOR_QUERY_PROTOCOL_MQTT, "MQTT", "Connect with MQTT brokers."},
+ {NNS_EDGE_PROTOCOL_MQTT, "MQTT", "Connect with MQTT brokers."},
{0, NULL, NULL},
};
protocol = g_enum_register_static ("tensor_query_protocol", protocols);
#include "tensor_typedef.h"
#include "tensor_common.h"
#include "tensor_meta.h"
+#include "nnstreamer-edge.h"
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
-typedef void * query_connection_handle;
-typedef void * query_server_handle;
-
/**
* @brief Default timeout, in seconds.
*/
/**
* @brief protocol options for tensor query.
*/
-typedef enum
-{
- _TENSOR_QUERY_PROTOCOL_TCP = 0,
- _TENSOR_QUERY_PROTOCOL_UDP = 1,
- _TENSOR_QUERY_PROTOCOL_MQTT = 2,
- _TENSOR_QUERY_PROTOCOL_END
-} TensorQueryProtocol;
#define DEFAULT_HOST "localhost"
-#define DEFAULT_PROTOCOL (_TENSOR_QUERY_PROTOCOL_TCP)
+#define DEFAULT_PROTOCOL (NNS_EDGE_PROTOCOL_TCP)
#define GST_TYPE_QUERY_PROTOCOL (gst_tensor_query_protocol_get_type ())
+
/**
* @brief register GEnumValue array for query protocol property handling
*/
GType
gst_tensor_query_protocol_get_type (void);
-/**
- * @brief Structures for tensor query commands.
- */
-typedef enum
-{
- _TENSOR_QUERY_CMD_REQUEST_INFO = 0,
- _TENSOR_QUERY_CMD_RESPOND_APPROVE = 1,
- _TENSOR_QUERY_CMD_RESPOND_DENY = 2,
- _TENSOR_QUERY_CMD_TRANSFER_START = 3,
- _TENSOR_QUERY_CMD_TRANSFER_DATA = 4,
- _TENSOR_QUERY_CMD_TRANSFER_END = 5,
- _TENSOR_QUERY_CMD_CLIENT_ID = 6,
- _TENSOR_QUERY_CMD_END
-} TensorQueryCommand;
-
-
-/**
- * @brief Structures for tensor query data info.
- */
-typedef struct
-{
- int64_t base_time;
- int64_t sent_time;
- uint64_t duration;
- uint64_t dts;
- uint64_t pts;
- uint32_t num_mems;
- uint64_t mem_sizes[NNS_TENSOR_SIZE_LIMIT];
-} TensorQueryDataInfo;
-
-typedef struct
-{
- uint8_t *data;
- size_t size;
-} TensorQueryData;
-
-/**
- * @brief Structures for tensor query command buffers.
- */
-typedef struct
-{
- TensorQueryCommand cmd;
- union
- {
- TensorQueryDataInfo data_info; /** _TENSOR_QUERY_CMD_REQUEST_INFO */
- TensorQueryData data; /** _TENSOR_QUERY_CMD_TRANSFER_DATA */
- query_client_id_t client_id; /** _TENSOR_QUERY_CMD_CLIENT_ID */
- };
-} TensorQueryCommandData;
-
-/**
- * @brief connect to the specified address.
- * @return 0 if OK, negative value if error
- */
-extern query_connection_handle
-nnstreamer_query_connect (TensorQueryProtocol protocol, const char *ip, uint16_t port, uint32_t timeout);
-
-/**
- * @brief Set timeout.
- * @param timeout the timeout value, in seconds. 0 for none.
- */
-extern void
-nnstreamer_query_set_timeout (query_connection_handle connection, uint32_t timeout);
-
-/**
- * @brief Set client ID.
- */
-extern void
-nnstreamer_query_set_client_id (query_connection_handle connection, query_client_id_t id);
-
-/**
- * @brief send command to connected device.
- * @return 0 if OK, negative value if error
- */
-extern int
-nnstreamer_query_send (query_connection_handle connection, TensorQueryCommandData *data);
-
-/**
- * @brief receive command from connected device.
- * @return 0 if OK, negative value if error
- */
-extern int
-nnstreamer_query_receive (query_connection_handle connection, TensorQueryCommandData *data);
-
-/**
- * @brief close connection with corresponding id.
- * @return 0 if OK, negative value if error
- */
-extern int
-nnstreamer_query_close (query_connection_handle connection);
-
-/* server */
-/**
- * @brief accept connection from remote
- * @return query_connection_handle including connection data
- */
-extern query_connection_handle
-nnstreamer_query_server_accept (query_server_handle server_data, query_client_id_t client_id);
-
-/**
- * @brief return initialized server handle
- * @return query_server_handle, NULL if error
- */
-extern query_server_handle
-nnstreamer_query_server_data_new (void);
-
-/**
- * @brief free server handle
- */
-extern void
-nnstreamer_query_server_data_free (query_server_handle server_data);
-
-/**
- * @brief set server handle params and setup server
- * @return 0 if OK, negative value if error
- */
-extern int
-nnstreamer_query_server_init (query_server_handle server_data,
- TensorQueryProtocol protocol, const char *host, uint16_t port, int8_t is_src);
-
-/**
- * @brief set server source and sink tensors config.
- */
-extern void
-nnstreamer_query_server_data_set_caps_str (query_server_handle server_data,
- const char * src_caps_str, const char * sink_caps_str);
-
-/**
- * @brief Get buffer from message queue.
- */
-extern GstBuffer *
-nnstreamer_query_server_get_buffer (query_server_handle server_data);
-
-/**
- * @brief Send gst-buffer to destination node.
- * @return True if all data in gst-buffer is successfully sent. False if failed to transfer data.
- * @todo This function should be used in nnstreamer element. Update function name rule and params later.
- */
-extern gboolean
-tensor_query_send_buffer (query_connection_handle connection,
- GstElement * element, GstBuffer * buffer);
-
-/**
- * @brief Receive data and generate gst-buffer. Caller should handle metadata of returned buffer.
- * @return Newly generated gst-buffer. Null if failed to receive data.
- * @todo This function should be used in nnstreamer element. Update function name rule and params later.
- */
-extern GstBuffer *
-tensor_query_receive_buffer (query_connection_handle connection);
-
#ifdef __cplusplus
}
#endif /* __cplusplus */
#endif
#include "tensor_query_server.h"
-#include <nnstreamer_util.h>
#include <tensor_typedef.h>
#include <tensor_common.h>
+#include "nnstreamer-edge.h"
/**
* @brief mutex for tensor-query server table.
static void fini_queryserver (void) __attribute__((destructor));
/**
- * @brief Getter to get nth GstTensorQueryServerInfo.
+ * @brief Get nnstreamer edge server handle.
*/
-query_server_info_handle
-gst_tensor_query_server_get_data (guint id)
+edge_server_handle
+gst_tensor_query_server_get_handle (char *id)
{
- gpointer p;
+ edge_server_handle p;
G_LOCK (query_server_table);
- p = g_hash_table_lookup (_qs_table, GUINT_TO_POINTER (id));
+ p = g_hash_table_lookup (_qs_table, id);
G_UNLOCK (query_server_table);
return p;
}
/**
- * @brief Add GstTensorQueryServerInfo into hash table.
+ * @brief Add nnstreamer edge server handle into hash table.
*/
-query_server_info_handle
-gst_tensor_query_server_add_data (guint id)
+edge_server_handle
+gst_tensor_query_server_add_data (char *id)
{
- GstTensorQueryServerInfo *data = NULL;
+ GstTensorQueryServer *data = NULL;
- data = (GstTensorQueryServerInfo *) gst_tensor_query_server_get_data (id);
+ data = gst_tensor_query_server_get_handle (id);
if (NULL != data) {
return data;
}
- data = g_try_new0 (GstTensorQueryServerInfo, 1);
+ data = g_try_new0 (GstTensorQueryServer, 1);
if (NULL == data) {
GST_ERROR ("Failed to allocate memory for tensor query server data.");
return NULL;
g_mutex_init (&data->lock);
g_cond_init (&data->cond);
data->id = id;
- data->sink_host = NULL;
- data->sink_port = 0;
data->configured = FALSE;
- data->sink_caps_str = NULL;
+
+ if (0 != nns_edge_create_handle (id, "TEMP_SERVER_TOPIC", &data->edge_h)) {
+ GST_ERROR ("Failed to get nnstreamer edge handle.");
+ gst_tensor_query_server_remove_data (data);
+ return NULL;
+ }
G_LOCK (query_server_table);
- g_hash_table_insert (_qs_table, GUINT_TO_POINTER (id), data);
+ g_hash_table_insert (_qs_table, g_strdup (id), data);
G_UNLOCK (query_server_table);
return data;
}
+
/**
- * @brief Remove GstTensorQueryServerInfo.
+ * @brief Get edge handle from server data.
+ */
+nns_edge_h
+gst_tensor_query_server_get_edge_handle (edge_server_handle server_h)
+{
+ GstTensorQueryServer *data = (GstTensorQueryServer *) server_h;
+
+ return data->edge_h;
+}
+
+/**
+ * @brief Remove GstTensorQueryServer.
*/
void
-gst_tensor_query_server_remove_data (query_server_info_handle server_info_h)
+gst_tensor_query_server_remove_data (edge_server_handle server_h)
{
- GstTensorQueryServerInfo *data = (GstTensorQueryServerInfo *) server_info_h;
+ GstTensorQueryServer *data = (GstTensorQueryServer *) server_h;
if (NULL == data) {
return;
}
G_LOCK (query_server_table);
- g_hash_table_remove (_qs_table, GUINT_TO_POINTER (data->id));
+ if (!g_hash_table_lookup (_qs_table, data->id))
+ g_hash_table_remove (_qs_table, data->id);
G_UNLOCK (query_server_table);
- g_free (data->sink_host);
- data->sink_host = NULL;
- g_free (data->sink_caps_str);
- data->sink_caps_str = NULL;
+
+ if (data->edge_h) {
+ nns_edge_release_handle (data->edge_h);
+ data->edge_h = NULL;
+ }
+
g_cond_clear (&data->cond);
g_mutex_clear (&data->lock);
g_free (data);
+ data = NULL;
}
/**
* @brief Wait until the sink is configured and get server info handle.
*/
gboolean
-gst_tensor_query_server_wait_sink (query_server_info_handle server_info_h)
+gst_tensor_query_server_wait_sink (edge_server_handle server_h)
{
gint64 end_time;
- GstTensorQueryServerInfo *data = (GstTensorQueryServerInfo *) server_info_h;
+ GstTensorQueryServer *data = (GstTensorQueryServer *) server_h;
if (NULL == data) {
return FALSE;
}
/**
- * @brief set sink caps string.
+ * @brief set query server sink configured.
*/
void
-gst_tensor_query_server_set_sink_caps_str (query_server_info_handle
- server_info_h, const gchar * caps_str)
+gst_tensor_query_server_set_configured (edge_server_handle server_h)
{
- GstTensorQueryServerInfo *data = (GstTensorQueryServerInfo *) server_info_h;
-
+ GstTensorQueryServer *data = (GstTensorQueryServer *) server_h;
if (NULL == data) {
return;
}
g_mutex_lock (&data->lock);
- if (caps_str) {
- g_free (data->sink_caps_str);
- data->sink_caps_str = g_strdup (caps_str);
- }
data->configured = TRUE;
g_cond_broadcast (&data->cond);
g_mutex_unlock (&data->lock);
}
/**
- * @brief get sink caps string.
- */
-gchar *
-gst_tensor_query_server_get_sink_caps_str (query_server_info_handle
- server_info_h)
-{
- GstTensorQueryServerInfo *data = (GstTensorQueryServerInfo *) server_info_h;
- gchar *caps_str = NULL;
-
- if (NULL == data) {
- return caps_str;
- }
-
- g_mutex_lock (&data->lock);
- caps_str = g_strdup (data->sink_caps_str);
- g_mutex_unlock (&data->lock);
-
- return caps_str;
-}
-
-/**
- * @brief set sink host
- */
-void
-gst_tensor_query_server_set_sink_host (query_server_info_handle server_info_h,
- gchar * host, guint16 port)
-{
- GstTensorQueryServerInfo *data = (GstTensorQueryServerInfo *) server_info_h;
-
- if (NULL == data) {
- return;
- }
-
- g_mutex_lock (&data->lock);
- data->sink_host = g_strdup (host);
- data->sink_port = port;
- g_mutex_unlock (&data->lock);
-}
-
-/**
- * @brief get sink host
- */
-gchar *
-gst_tensor_query_server_get_sink_host (query_server_info_handle server_info_h)
-{
- GstTensorQueryServerInfo *data = (GstTensorQueryServerInfo *) server_info_h;
- gchar *sink_host = NULL;
-
- if (NULL == data) {
- return NULL;
- }
-
- g_mutex_lock (&data->lock);
- sink_host = g_strdup (data->sink_host);
- g_mutex_unlock (&data->lock);
-
- return sink_host;
-}
-
-/**
- * @brief get sink port
- */
-guint16
-gst_tensor_query_server_get_sink_port (query_server_info_handle server_info_h)
-{
- GstTensorQueryServerInfo *data = (GstTensorQueryServerInfo *) server_info_h;
- guint16 sink_port = 0;
-
- if (NULL == data) {
- return sink_port;
- }
-
- g_mutex_lock (&data->lock);
- sink_port = data->sink_port;
- g_mutex_unlock (&data->lock);
-
- return sink_port;
-}
-
-/**
* @brief Initialize the query server.
*/
static void
{
G_LOCK (query_server_table);
g_assert (NULL == _qs_table); /** Internal error (duplicated init call?) */
- _qs_table = g_hash_table_new (g_direct_hash, g_direct_equal);
+ _qs_table = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
G_UNLOCK (query_server_table);
}
#include <gst/gst.h>
#include <tensor_common.h>
+#include "nnstreamer-edge.h"
+#include "tensor_meta.h"
G_BEGIN_DECLS
#define DEFAULT_SERVER_ID 0
#define DEFAULT_QUERY_INFO_TIMEOUT 5
typedef void * query_server_info_handle;
+typedef void * edge_server_handle;
/**
* @brief GstTensorQueryServer internal info data structure.
*/
typedef struct
{
- gint64 id;
- gchar *sink_caps_str;
- gchar *sink_host;
- guint16 sink_port;
+ char *id;
gboolean configured;
GMutex lock;
GCond cond;
-} GstTensorQueryServerInfo;
+
+ nns_edge_h edge_h;
+} GstTensorQueryServer;
/**
- * @brief Get GstTensorQueryServerInfo.
+ * @brief Get nnstreamer edge server handle.
*/
-query_server_info_handle
-gst_tensor_query_server_get_data (guint id);
+edge_server_handle
+gst_tensor_query_server_get_handle (char *id);
/**
- * @brief Add GstTensorQueryServerInfo.
+ * @brief Add GstTensorQueryServer.
*/
-query_server_info_handle
-gst_tensor_query_server_add_data (guint id);
+edge_server_handle
+gst_tensor_query_server_add_data (char *id);
/**
- * @brief Remove GstTensorQueryServerInfo.
+ * @brief Remove GstTensorQueryServer.
*/
void
-gst_tensor_query_server_remove_data (query_server_info_handle server_info_h);
+gst_tensor_query_server_remove_data (edge_server_handle server_h);
/**
* @brief Wait until the sink is configured and get server info handle.
*/
gboolean
-gst_tensor_query_server_wait_sink (query_server_info_handle server_info_h);
+gst_tensor_query_server_wait_sink (edge_server_handle server_h);
/**
- * @brief set sink caps string.
+ * @brief Get edge handle from server data.
*/
-void gst_tensor_query_server_set_sink_caps_str (query_server_info_handle server_info_h, const gchar * caps_str);
+nns_edge_h
+gst_tensor_query_server_get_edge_handle (edge_server_handle server_h);
/**
- * @brief get sink caps string.
- */
-gchar *
-gst_tensor_query_server_get_sink_caps_str (query_server_info_handle server_info_h);
-
-/**
- * @brief set sink host address and port
+ * @brief set query server sink configured.
*/
void
-gst_tensor_query_server_set_sink_host (query_server_info_handle server_info_h, gchar *host, guint16 port);
-
-/**
- * @brief get sink host
- */
-gchar *
-gst_tensor_query_server_get_sink_host (query_server_info_handle server_info_h);
-
-/**
- * @brief get sink port
- */
-guint16
-gst_tensor_query_server_get_sink_port (query_server_info_handle server_info_h);
+gst_tensor_query_server_set_configured (edge_server_handle server_h);
G_END_DECLS
#include <string.h>
#include "tensor_query_serversink.h"
+#include "nnstreamer_util.h"
GST_DEBUG_CATEGORY_STATIC (gst_tensor_query_serversink_debug);
#define GST_CAT_DEFAULT gst_tensor_query_serversink_debug
enum
{
PROP_0,
- PROP_HOST,
- PROP_PORT,
PROP_PROTOCOL,
PROP_ID,
PROP_TIMEOUT,
static void gst_tensor_query_serversink_finalize (GObject * object);
static gboolean gst_tensor_query_serversink_start (GstBaseSink * bsink);
-static gboolean gst_tensor_query_serversink_stop (GstBaseSink * bsink);
static GstFlowReturn gst_tensor_query_serversink_render (GstBaseSink * bsink,
GstBuffer * buf);
static gboolean gst_tensor_query_serversink_set_caps (GstBaseSink * basesink,
gobject_class->get_property = gst_tensor_query_serversink_get_property;
gobject_class->finalize = gst_tensor_query_serversink_finalize;
- g_object_class_install_property (gobject_class, PROP_HOST,
- g_param_spec_string ("host", "Host", "The hostname to listen as",
- DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_PORT,
- g_param_spec_uint ("port", "Port",
- "The port to listen to (0=random available port)", 0,
- 65535, DEFAULT_PORT_SINK,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_PROTOCOL,
g_param_spec_enum ("protocol", "Protocol",
"The network protocol to establish connection",
"Samsung Electronics Co., Ltd.");
gstbasesink_class->start = gst_tensor_query_serversink_start;
- gstbasesink_class->stop = gst_tensor_query_serversink_stop;
gstbasesink_class->set_caps = gst_tensor_query_serversink_set_caps;
gstbasesink_class->render = gst_tensor_query_serversink_render;
static void
gst_tensor_query_serversink_init (GstTensorQueryServerSink * sink)
{
- sink->host = g_strdup (DEFAULT_HOST);
- sink->port = DEFAULT_PORT_SINK;
sink->protocol = DEFAULT_PROTOCOL;
sink->timeout = QUERY_DEFAULT_TIMEOUT_SEC;
sink->sink_id = DEFAULT_SERVER_ID;
- sink->server_data = nnstreamer_query_server_data_new ();
sink->metaless_frame_count = 0;
}
gst_tensor_query_serversink_finalize (GObject * object)
{
GstTensorQueryServerSink *sink = GST_TENSOR_QUERY_SERVERSINK (object);
- g_free (sink->host);
- if (sink->server_data) {
- nnstreamer_query_server_data_free (sink->server_data);
- sink->server_data = NULL;
- }
- if (sink->server_info_h) {
- gst_tensor_query_server_remove_data (sink->server_info_h);
- sink->server_info_h = NULL;
- }
+ gst_tensor_query_server_remove_data (sink->server_h);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
GstTensorQueryServerSink *serversink = GST_TENSOR_QUERY_SERVERSINK (object);
switch (prop_id) {
- case PROP_HOST:
- if (!g_value_get_string (value)) {
- nns_logw ("host property cannot be NULL");
- break;
- }
- g_free (serversink->host);
- serversink->host = g_value_dup_string (value);
- break;
- case PROP_PORT:
- serversink->port = g_value_get_uint (value);
- break;
case PROP_PROTOCOL:
serversink->protocol = g_value_get_enum (value);
break;
GstTensorQueryServerSink *serversink = GST_TENSOR_QUERY_SERVERSINK (object);
switch (prop_id) {
- case PROP_HOST:
- g_value_set_string (value, serversink->host);
- break;
- case PROP_PORT:
- g_value_set_uint (value, serversink->port);
- break;
case PROP_PROTOCOL:
g_value_set_enum (value, serversink->protocol);
break;
GstTensorQueryServerSink *sink = GST_TENSOR_QUERY_SERVERSINK (bsink);
GstCaps *caps;
gchar *caps_str = NULL;
+ gchar *id_str = NULL;
- if (!sink->server_data) {
- nns_loge ("Server_data is NULL");
- return FALSE;
- }
-
- if (nnstreamer_query_server_init (sink->server_data, sink->protocol,
- sink->host, sink->port, FALSE) != 0) {
- nns_loge ("Failed to setup server");
- return FALSE;
- }
-
- /** Set server sink information */
- sink->server_info_h = gst_tensor_query_server_add_data (sink->sink_id);
- gst_tensor_query_server_set_sink_host (sink->server_info_h, sink->host,
- sink->port);
+ id_str = g_strdup_printf ("%u", sink->sink_id);
+ sink->server_h = gst_tensor_query_server_add_data (id_str);
+ g_free (id_str);
caps = gst_pad_get_current_caps (GST_BASE_SINK_PAD (bsink));
if (!caps) {
if (caps) {
caps_str = gst_caps_to_string (caps);
}
- gst_tensor_query_server_set_sink_caps_str (sink->server_info_h, caps_str);
+ sink->edge_h = gst_tensor_query_server_get_edge_handle (sink->server_h);
+ gst_tensor_query_server_set_configured (sink->server_h);
gst_caps_unref (caps);
g_free (caps_str);
}
/**
- * @brief stop processing of query_serversink
- */
-static gboolean
-gst_tensor_query_serversink_stop (GstBaseSink * bsink)
-{
- GstTensorQueryServerSink *sink = GST_TENSOR_QUERY_SERVERSINK (bsink);
- nnstreamer_query_server_data_free (sink->server_data);
- sink->server_data = NULL;
- return TRUE;
-}
-
-/**
* @brief An implementation of the set_caps vmethod in GstBaseSinkClass
*/
static gboolean
gchar *caps_str;
caps_str = gst_caps_to_string (caps);
- gst_tensor_query_server_set_sink_caps_str (sink->server_info_h, caps_str);
+ nns_edge_set_info (sink->edge_h, "CAPS", "@query_server_sink_caps@");
+ nns_edge_set_info (sink->edge_h, "CAPS", caps_str);
g_free (caps_str);
{
GstTensorQueryServerSink *sink = GST_TENSOR_QUERY_SERVERSINK (bsink);
GstMetaQuery *meta_query;
- query_connection_handle conn;
+ nns_edge_data_h data_h = NULL;
+ guint i, num_mems = 0;
+ gint ret;
+ GstMemory *mem[NNS_TENSOR_SIZE_LIMIT];
+ GstMapInfo map[NNS_TENSOR_SIZE_LIMIT];
+ char *val;
meta_query = gst_buffer_get_meta_query (buf);
if (meta_query) {
sink->metaless_frame_count = 0;
- conn = nnstreamer_query_server_accept (sink->server_data,
- meta_query->client_id);
- if (conn) {
- if (!tensor_query_send_buffer (conn, GST_ELEMENT (sink), buf)) {
- nns_logw ("Failed to send buffer to client, drop current buffer.");
+
+ ret = nns_edge_data_create (&data_h);
+ if (ret != NNS_EDGE_ERROR_NONE) {
+ nns_loge ("Failed to create data handle in server sink.");
+ return GST_FLOW_ERROR;
+ }
+
+ num_mems = gst_buffer_n_memory (buf);
+ for (i = 0; i < num_mems; i++) {
+ mem[i] = gst_buffer_peek_memory (buf, i);
+ if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) {
+ ml_loge ("Cannot map the %uth memory in gst-buffer.", i);
+ num_mems = i;
+ goto done;
}
- } else {
- nns_logw ("Cannot get the client connection, drop current buffer.");
+ nns_edge_data_add (data_h, map[i].data, map[i].size, NULL);
}
+
+ val = g_strdup_printf ("%ld", (long int) meta_query->client_id);
+ nns_edge_data_set_info (data_h, "client_id", val);
+ g_free (val);
+
+ nns_edge_respond (sink->edge_h, data_h);
+ nns_edge_data_destroy (data_h);
+ data_h = NULL;
} else {
nns_logw ("Cannot get tensor query meta. Drop buffers!\n");
sink->metaless_frame_count++;
return GST_FLOW_ERROR;
}
}
+done:
+ for (i = 0; i < num_mems; i++)
+ gst_memory_unmap (mem[i], &map[i]);
return GST_FLOW_OK;
}
#include <gst/base/gstbasesink.h>
#include <tensor_common.h>
#include <tensor_meta.h>
-#include "tensor_query_common.h"
#include "tensor_query_server.h"
-
+#include "tensor_query_common.h"
G_BEGIN_DECLS
#define GST_TYPE_TENSOR_QUERY_SERVERSINK \
GstBaseSink element; /**< parent object */
guint sink_id;
- guint16 port;
- gchar *host;
- TensorQueryProtocol protocol;
guint timeout;
- query_server_handle server_data;
query_server_info_handle server_info_h;
gint metaless_frame_limit;
gint metaless_frame_count;
+
+ nns_edge_protocol_e protocol;
+ edge_server_handle server_h;
+ nns_edge_h edge_h;
};
/**
#include <tensor_typedef.h>
#include <tensor_common.h>
-#include "tensor_query_common.h"
#include "tensor_query_serversrc.h"
#include "tensor_query_server.h"
+#include "tensor_query_common.h"
+#include "nnstreamer_util.h"
GST_DEBUG_CATEGORY_STATIC (gst_tensor_query_serversrc_debug);
#define GST_CAT_DEFAULT gst_tensor_query_serversrc_debug
PROP_PROTOCOL,
PROP_TIMEOUT,
PROP_OPERATION,
- PROP_BROKER_HOST,
- PROP_BROKER_PORT,
PROP_ID,
PROP_IS_LIVE
};
static void gst_tensor_query_serversrc_finalize (GObject * object);
static gboolean gst_tensor_query_serversrc_start (GstBaseSrc * bsrc);
-static gboolean gst_tensor_query_serversrc_stop (GstBaseSrc * bsrc);
static GstFlowReturn gst_tensor_query_serversrc_create (GstPushSrc * psrc,
GstBuffer ** buf);
3600, QUERY_DEFAULT_TIMEOUT_SEC,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_OPERATION,
- g_param_spec_string ("operation", "Operation",
- "The main operation of the host and option if necessary. "
- "(operation)/(optional topic for main operation).",
+ g_param_spec_string ("topic", "Topic",
+ "The main topic of the host and option if necessary. "
+ "(topic)/(optional topic for main topic).",
"", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_BROKER_HOST,
- g_param_spec_string ("broker-host", "Broker Host",
- "Broker host address to connect.", DEFAULT_BROKER_HOST,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_BROKER_PORT,
- g_param_spec_uint ("broker-port", "Broker Port",
- "Broker port to connect.", 0, 65535,
- DEFAULT_BROKER_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_ID,
g_param_spec_uint ("id", "ID",
"ID for distinguishing query servers.", 0,
"Samsung Electronics Co., Ltd.");
gstbasesrc_class->start = gst_tensor_query_serversrc_start;
- gstbasesrc_class->stop = gst_tensor_query_serversrc_stop;
gstpushsrc_class->create = gst_tensor_query_serversrc_create;
GST_DEBUG_CATEGORY_INIT (gst_tensor_query_serversrc_debug,
src->port = DEFAULT_PORT_SRC;
src->protocol = DEFAULT_PROTOCOL;
src->timeout = QUERY_DEFAULT_TIMEOUT_SEC;
- src->operation = NULL;
- src->broker_host = g_strdup (DEFAULT_BROKER_HOST);
- src->broker_port = DEFAULT_BROKER_PORT;
+ src->topic = NULL;
+ src->srv_host = g_strdup (DEFAULT_HOST);
+ src->srv_port = DEFAULT_PORT_SRC;
src->src_id = DEFAULT_SERVER_ID;
tensor_query_hybrid_init (&src->hybrid_info, NULL, 0, TRUE);
- src->server_data = nnstreamer_query_server_data_new ();
src->configured = FALSE;
+ src->msg_queue = g_async_queue_new ();
+
gst_base_src_set_format (GST_BASE_SRC (src), GST_FORMAT_TIME);
/** set the timestamps on each buffer */
gst_base_src_set_do_timestamp (GST_BASE_SRC (src), TRUE);
gst_tensor_query_serversrc_finalize (GObject * object)
{
GstTensorQueryServerSrc *src = GST_TENSOR_QUERY_SERVERSRC (object);
+ nns_edge_data_h data_h;
g_free (src->host);
src->host = NULL;
- g_free (src->operation);
- src->operation = NULL;
- g_free (src->broker_host);
- src->broker_host = NULL;
- if (src->server_data) {
- nnstreamer_query_server_data_free (src->server_data);
- src->server_data = NULL;
+ g_free (src->topic);
+ src->topic = NULL;
+ g_free (src->srv_host);
+ src->srv_host = NULL;
+
+ while ((data_h = g_async_queue_try_pop (src->msg_queue))) {
+ nns_edge_data_destroy (data_h);
}
+ g_async_queue_unref (src->msg_queue);
+
G_OBJECT_CLASS (parent_class)->finalize (object);
}
break;
case PROP_OPERATION:
if (!g_value_get_string (value)) {
- nns_logw
- ("operation property cannot be NULL. Query-hybrid is disabled.");
+ nns_logw ("topic property cannot be NULL. Query-hybrid is disabled.");
break;
}
- g_free (serversrc->operation);
- serversrc->operation = g_value_dup_string (value);
- break;
- case PROP_BROKER_HOST:
- if (!g_value_get_string (value)) {
- nns_logw ("Broker host property cannot be NULL");
- break;
- }
- g_free (serversrc->broker_host);
- serversrc->broker_host = g_value_dup_string (value);
- break;
- case PROP_BROKER_PORT:
- serversrc->broker_port = g_value_get_uint (value);
+ g_free (serversrc->topic);
+ serversrc->topic = g_value_dup_string (value);
break;
case PROP_ID:
serversrc->src_id = g_value_get_uint (value);
g_value_set_uint (value, serversrc->timeout);
break;
case PROP_OPERATION:
- g_value_set_string (value, serversrc->operation);
- break;
- case PROP_BROKER_HOST:
- g_value_set_string (value, serversrc->broker_host);
- break;
- case PROP_BROKER_PORT:
- g_value_set_uint (value, serversrc->broker_port);
+ g_value_set_string (value, serversrc->topic);
break;
case PROP_ID:
g_value_set_uint (value, serversrc->src_id);
}
/**
+ * @brief nnstreamer-edge event callback.
+ */
+static int
+_nns_edge_event_cb (nns_edge_event_h event_h, void *user_data)
+{
+ nns_edge_event_e event_type;
+ int ret = NNS_EDGE_ERROR_NONE;
+
+ GstTensorQueryServerSrc *src = (GstTensorQueryServerSrc *) user_data;
+ if (0 != nns_edge_event_get_type (event_h, &event_type)) {
+ nns_loge ("Failed to get event type!");
+ return NNS_EDGE_ERROR_UNKNOWN;
+ }
+
+ switch (event_type) {
+ case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
+ {
+ nns_edge_data_h data;
+
+ nns_edge_event_parse_new_data (event_h, &data);
+ g_async_queue_push (src->msg_queue, data);
+ break;
+ }
+ default:
+ break;
+ }
+
+ return ret;
+}
+
+/**
* @brief start processing of query_serversrc, setting up the server
*/
static gboolean
gst_tensor_query_serversrc_start (GstBaseSrc * bsrc)
{
GstTensorQueryServerSrc *src = GST_TENSOR_QUERY_SERVERSRC (bsrc);
+ char *id_str = NULL;
+ char *port = NULL;
- if (!src->server_data) {
- nns_loge ("Server_data is NULL");
- return FALSE;
- }
+ id_str = g_strdup_printf ("%d", src->src_id);
+ src->server_h = gst_tensor_query_server_add_data (id_str);
+ g_free (id_str);
- if (nnstreamer_query_server_init (src->server_data, src->protocol,
- src->host, src->port, TRUE) != 0) {
- nns_loge ("Failed to setup server");
- return FALSE;
- }
-
- src->server_info_h = gst_tensor_query_server_add_data (src->src_id);
- if (!gst_tensor_query_server_wait_sink (src->server_info_h)) {
- nns_loge ("Failed to get server information from query server.");
- return FALSE;
- }
+ src->edge_h = gst_tensor_query_server_get_edge_handle (src->server_h);
+ nns_edge_set_info (src->edge_h, "IP", src->host);
+ port = g_strdup_printf ("%d", src->port);
+ nns_edge_set_info (src->edge_h, "PORT", port);
+ g_free (port);
/** Publish query sever connection info */
- if (src->operation) {
- tensor_query_hybrid_set_node (&src->hybrid_info, src->host, src->port,
- src->server_info_h);
- tensor_query_hybrid_set_broker (&src->hybrid_info,
- src->broker_host, src->broker_port);
+ if (src->topic) {
+ nns_edge_set_info (src->edge_h, "TOPIC", src->topic);
+ tensor_query_hybrid_set_node (&src->hybrid_info, src->srv_host,
+ src->srv_port, src->server_info_h);
+ tensor_query_hybrid_set_broker (&src->hybrid_info, src->host, src->port);
- if (!tensor_query_hybrid_publish (&src->hybrid_info, src->operation)) {
+ if (!tensor_query_hybrid_publish (&src->hybrid_info, src->topic)) {
nns_loge ("Failed to publish a topic.");
return FALSE;
}
} else {
+ g_free (src->srv_host);
+ src->srv_host = g_strdup (src->host);
+ src->srv_port = src->port;
nns_logi ("Query-hybrid feature is disabled.");
nns_logi
- ("Specify operation to register server to broker (e.g., operation=object_detection/mobilev3).");
+ ("Specify topic to register server to broker (e.g., topic=object_detection/mobilev3).");
+ }
+ nns_edge_set_event_callback (src->edge_h, _nns_edge_event_cb, src);
+
+ if (0 != nns_edge_start (src->edge_h, true)) {
+ nns_loge
+ ("Failed to start NNStreamer-edge. Please check server IP and port");
+ return FALSE;
+ }
+
+ if (!gst_tensor_query_server_wait_sink (src->server_h)) {
+ nns_loge ("Failed to get server information from query server.");
+ return FALSE;
}
+
return TRUE;
}
/**
- * @brief stop processing of query_serversrc
+ * @brief Get buffer from message queue.
*/
-static gboolean
-gst_tensor_query_serversrc_stop (GstBaseSrc * bsrc)
+static GstBuffer *
+_gst_tensor_query_serversrc_get_buffer (GstTensorQueryServerSrc * src)
{
- GstTensorQueryServerSrc *src = GST_TENSOR_QUERY_SERVERSRC (bsrc);
+ nns_edge_data_h data_h;
+ GstBuffer *buffer = NULL;
+ guint i, num_data;
+ GstMetaQuery *meta_query;
- tensor_query_hybrid_close (&src->hybrid_info);
- nnstreamer_query_server_data_free (src->server_data);
- src->server_data = NULL;
- return TRUE;
+ data_h = g_async_queue_pop (src->msg_queue);
+
+ if (!data_h) {
+ nns_loge ("Failed to get message from the server message queue");
+ return NULL;
+ }
+ buffer = gst_buffer_new ();
+
+ if (NNS_EDGE_ERROR_NONE != nns_edge_data_get_count (data_h, &num_data)) {
+ nns_loge ("Failed to get the number of memories of the edge data.");
+ gst_buffer_unref (buffer);
+ return NULL;
+ }
+ for (i = 0; i < num_data; i++) {
+ void *data = NULL;
+ size_t data_len = 0;
+ gpointer new_data;
+
+ nns_edge_data_get (data_h, i, &data, &data_len);
+ new_data = _g_memdup (data, data_len);
+
+ gst_buffer_append_memory (buffer,
+ gst_memory_new_wrapped (0, new_data, data_len, 0, data_len, new_data,
+ g_free));
+ }
+
+ if (buffer) {
+ meta_query = gst_buffer_add_meta_query (buffer);
+ if (meta_query) {
+ char *val;
+ int ret;
+
+ ret = nns_edge_data_get_info (data_h, "client_id", &val);
+ if (NNS_EDGE_ERROR_NONE != ret) {
+ gst_buffer_unref (buffer);
+ buffer = NULL;
+ } else {
+ meta_query->client_id = g_ascii_strtoll (val, NULL, 10);
+ g_free (val);
+ }
+ }
+ }
+ nns_edge_data_destroy (data_h);
+
+ return buffer;
}
/**
GstTensorQueryServerSrc *src = GST_TENSOR_QUERY_SERVERSRC (psrc);
GstBaseSrc *bsrc = GST_BASE_SRC (psrc);
- if (!src->server_data) {
- nns_loge ("Server data is NULL");
- return GST_FLOW_ERROR;
- }
-
if (!src->configured) {
- gchar *sink_caps_str, *src_caps_str;
+ gchar *src_caps_str;
GstCaps *caps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (bsrc), NULL);
if (gst_caps_is_fixed (caps)) {
gst_base_src_set_caps (bsrc, caps);
}
- src->server_info_h = gst_tensor_query_server_get_data (src->src_id);
- sink_caps_str =
- gst_tensor_query_server_get_sink_caps_str (src->server_info_h);
src_caps_str = gst_caps_to_string (caps);
- nnstreamer_query_server_data_set_caps_str (src->server_data, src_caps_str,
- sink_caps_str);
-
- g_free (sink_caps_str);
+ nns_edge_set_info (src->edge_h, "CAPS", "@query_server_src_caps@");
+ nns_edge_set_info (src->edge_h, "CAPS", src_caps_str);
g_free (src_caps_str);
gst_caps_unref (caps);
src->configured = TRUE;
}
- *outbuf = nnstreamer_query_server_get_buffer (src->server_data);
+ *outbuf = _gst_tensor_query_serversrc_get_buffer (src);
+ if (!outbuf) {
+ nns_loge ("Failed to get buffer to push to the tensor query serversrc.");
+ return GST_FLOW_ERROR;
+ }
return GST_FLOW_OK;
}
#include <gst/base/gstbasesrc.h>
#include <gst/base/gstpushsrc.h>
-#include "tensor_query_common.h"
-#include <nnstreamer_util.h>
#include <tensor_meta.h>
#include "tensor_query_hybrid.h"
guint16 port;
gchar *host;
- TensorQueryProtocol protocol;
+ guint16 srv_port;
+ gchar *srv_host;
guint timeout;
/* Query-hybrid feature */
- gchar *operation; /**< Main operation such as 'object_detection' or 'image_segmentation' */
+ gchar *topic; /**< Main operation such as 'object_detection' or 'image_segmentation' */
query_hybrid_info_s hybrid_info;
- gchar *broker_host;
- guint16 broker_port;
- query_server_handle server_data; /* server data passed to common functions */
query_server_info_handle server_info_h;
+
+ nns_edge_protocol_e protocol;
+ edge_server_handle server_h;
+ nns_edge_h edge_h;
+ GAsyncQueue *msg_queue;
};
/**
$(NNSTREAMER_GST_HOME)/elements/gsttensor_split.c \
$(NNSTREAMER_GST_HOME)/elements/gsttensor_transform.c \
$(NNSTREAMER_GST_HOME)/tensor_filter/tensor_filter.c \
+
+# tensor-query element with nnstreamer-edge
+NNSTREAMER_QUERY_SRCS := \
$(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_common.c \
$(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_hybrid.c \
$(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_client.c \
$(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_serversink.c \
$(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_serversrc.c \
- $(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_server.c
+ $(NNSTREAMER_GST_HOME)/tensor_query/tensor_query_server.c \
+ $(NNSTREAMER_GST_HOME)/tensor_query/nnstreamer_edge_common.c \
+ $(NNSTREAMER_GST_HOME)/tensor_query/nnstreamer_edge_internal.c
# source AMC (Android MediaCodec)
NNSTREAMER_SOURCE_AMC_SRCS := \
'target': 'aitt',
'project_args': { 'ENABLE_AITT' : 1 }
},
+ 'nnstreamer-edge-support': {
+ 'target': 'nnstreamer-edge',
+ 'project_args': { 'ENABLE_NNSTREAMER_EDGE': 1 }
+ },
'mxnet-support': {
'extra_deps': [ mxnet_dep ]
}
option('tvm-support', type: 'feature', value: 'auto')
option('query-hybrid-support', type: 'feature', value: 'auto')
option('trix-engine-support', type: 'feature', value: 'auto')
+option('nnstreamer-edge-support', type: 'feature', value: 'auto')
option('aitt-support', type: 'feature', value: 'auto')
option('mxnet-support', type: 'feature', value: 'auto')
Requires: nnstreamer-configuration = %{version}-%{release}
Requires: nnstreamer-single = %{version}-%{release}
Recommends: nnstreamer-default-configuration = %{version}-%{release}
+BuildRequires: nnstreamer-edge-devel
## Define build requirements ##
BuildRequires: gstreamer-devel