From a0e66cacff8cee9d2e18dd2ac9bc655d19e449a7 Mon Sep 17 00:00:00 2001 From: gichan Date: Fri, 22 Jul 2022 09:47:54 +0900 Subject: [PATCH] [Edge] Sync with nnst-edge repo latest Sync with nnstreamer-edge repo latest. Signed-off-by: gichan --- .../tensor_query/nnstreamer-edge-common.h | 9 ++ .../tensor_query/nnstreamer-edge-internal.h | 14 +-- .../tensor_query/nnstreamer_edge_common.c | 33 ++++++ .../tensor_query/nnstreamer_edge_internal.c | 125 ++++++++------------- 4 files changed, 96 insertions(+), 85 deletions(-) diff --git a/gst/nnstreamer/tensor_query/nnstreamer-edge-common.h b/gst/nnstreamer/tensor_query/nnstreamer-edge-common.h index 4125ef4..24a98a4 100644 --- a/gst/nnstreamer/tensor_query/nnstreamer-edge-common.h +++ b/gst/nnstreamer/tensor_query/nnstreamer-edge-common.h @@ -15,7 +15,11 @@ #define __NNSTREAMER_EDGE_COMMON_H__ #include /** @todo remove glib */ +#include +#include +#include #include +#include #include "nnstreamer-edge.h" #ifdef __cplusplus @@ -82,6 +86,11 @@ typedef struct { #define nns_edge_logf g_error /** + * @brief Internal util function to get available port number. + */ +int nns_edge_get_available_port (void); + +/** * @brief Free allocated memory. */ void nns_edge_free (void *data); diff --git a/gst/nnstreamer/tensor_query/nnstreamer-edge-internal.h b/gst/nnstreamer/tensor_query/nnstreamer-edge-internal.h index 1e755cc..55a3fd6 100644 --- a/gst/nnstreamer/tensor_query/nnstreamer-edge-internal.h +++ b/gst/nnstreamer/tensor_query/nnstreamer-edge-internal.h @@ -1,8 +1,8 @@ -/* SPDX-License-Identifier: LGPL-2.1-only */ +/* SPDX-License-Identifier: Apache-2.0 */ /** * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. * - * @file nnstreamer_edge_internal.h + * @file nnstreamer-edge-internal.h * @date 11 May 2022 * @brief Internal functions to support communication among devices. * @see https://github.com/nnstreamer/nnstreamer @@ -19,9 +19,8 @@ extern "C" { #endif /* __cplusplus */ #include "nnstreamer-edge.h" +#include "nnstreamer-edge-common.h" #include -#include -#include /** * @brief Data structure for edge handle. @@ -32,8 +31,8 @@ typedef struct { char *id; char *topic; nns_edge_protocol_e protocol; - char *ip; - int port; + char *ip; /**< host IP */ + int port; /**< host port */ /* Edge event callback and user data */ nns_edge_event_cb event_cb; @@ -42,8 +41,7 @@ typedef struct { bool is_server; int64_t client_id; char *caps_str; - char *recv_ip; - int recv_port; + GHashTable *conn_table; GSocketListener *listener; diff --git a/gst/nnstreamer/tensor_query/nnstreamer_edge_common.c b/gst/nnstreamer/tensor_query/nnstreamer_edge_common.c index 26fa39e..b6ca905 100644 --- a/gst/nnstreamer/tensor_query/nnstreamer_edge_common.c +++ b/gst/nnstreamer/tensor_query/nnstreamer_edge_common.c @@ -16,6 +16,39 @@ #include "nnstreamer-edge-common.h" /** + * @brief Internal util function to get available port number. + */ +int +nns_edge_get_available_port (void) +{ + struct sockaddr_in sin; + int port = 0, sock; + socklen_t len = sizeof (struct sockaddr); + + sin.sin_family = AF_INET; + sin.sin_addr.s_addr = INADDR_ANY; + sin.sin_port = 0; + + sock = socket (AF_INET, SOCK_STREAM, 0); + if (sock < 0) { + nns_edge_loge ("Failed to get available port, socket creation failure."); + return 0; + } + + if (bind (sock, (struct sockaddr *) &sin, sizeof (struct sockaddr)) == 0) { + if (getsockname (sock, (struct sockaddr *) &sin, &len) == 0) { + port = ntohs (sin.sin_port); + nns_edge_logi ("Available port number: %d", port); + } else { + nns_edge_logw ("Failed to read local socket info."); + } + } + close (sock); + + return port; +} + +/** * @brief Free allocated memory. */ void diff --git a/gst/nnstreamer/tensor_query/nnstreamer_edge_internal.c b/gst/nnstreamer/tensor_query/nnstreamer_edge_internal.c index 89df5f3..2732eb5 100644 --- a/gst/nnstreamer/tensor_query/nnstreamer_edge_internal.c +++ b/gst/nnstreamer/tensor_query/nnstreamer_edge_internal.c @@ -116,28 +116,6 @@ _send_raw_data (GSocket * socket, void *data, size_t size) } /** - * @brief Internal function to check connection. - */ -static bool -_nns_edge_check_connection (GSocket * socket) -{ - GIOCondition condition; - - if (!socket || g_socket_is_closed (socket)) - return false; - - condition = g_socket_condition_check (socket, - G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP); - - if (!condition || (condition & (G_IO_ERR | G_IO_HUP))) { - nns_edge_logw ("Socket is not available, possibly closed."); - return false; - } - - return true; -} - -/** * @brief Receive data from connected socket. */ static bool @@ -192,34 +170,25 @@ _get_host_str (const char *ip, const int port, char **host) } /** - * @brief Get available port number. + * @brief Internal function to check connection. */ -static int -_get_available_port (void) +static bool +_nns_edge_check_connection (nns_edge_conn_s * conn) { - struct sockaddr_in sin; - int port = 0, sock; - socklen_t len = sizeof (struct sockaddr); - - sin.sin_family = AF_INET; - sin.sin_addr.s_addr = INADDR_ANY; - sock = socket (AF_INET, SOCK_STREAM, 0); - if (sock < 0) { - nns_edge_loge ("Failed to get available port. Socket creation failure."); - return 0; - } - sin.sin_port = port; - if (bind (sock, (struct sockaddr *) &sin, sizeof (struct sockaddr)) == 0) { - if (getsockname (sock, (struct sockaddr *) &sin, &len) == 0) { - port = ntohs (sin.sin_port); - nns_edge_logi ("Available port number: %d", port); - } else { - nns_edge_logw ("Failed to read local socket info."); - } + GIOCondition condition; + + if (!conn || !conn->socket || g_socket_is_closed (conn->socket)) + return false; + + condition = g_socket_condition_check (conn->socket, + G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP); + + if (!condition || (condition & (G_IO_ERR | G_IO_HUP))) { + nns_edge_logw ("Socket is not available, possibly closed."); + return false; } - close (sock); - return port; + return true; } /** @@ -294,7 +263,7 @@ _nns_edge_cmd_send (nns_edge_conn_s * conn, nns_edge_cmd_s * cmd) return NNS_EDGE_ERROR_INVALID_PARAMETER; } - if (!_nns_edge_check_connection (conn->socket)) { + if (!_nns_edge_check_connection (conn)) { nns_edge_loge ("Failed to send command, socket has error."); return NNS_EDGE_ERROR_IO; } @@ -326,7 +295,7 @@ _nns_edge_cmd_receive (nns_edge_conn_s * conn, nns_edge_cmd_s * cmd) if (!conn || !cmd) return NNS_EDGE_ERROR_INVALID_PARAMETER; - if (!_nns_edge_check_connection (conn->socket)) { + if (!_nns_edge_check_connection (conn)) { nns_edge_loge ("Failed to receive command, socket has error."); return NNS_EDGE_ERROR_IO; } @@ -428,7 +397,8 @@ _nns_edge_close_connection (nns_edge_conn_s * conn) if (!conn) return false; - if (conn->running && conn->msg_thread) { + /* Stop and clear the message thread. */ + if (conn->msg_thread) { conn->running = 0; pthread_cancel (conn->msg_thread); pthread_join (conn->msg_thread, NULL); @@ -443,7 +413,10 @@ _nns_edge_close_connection (nns_edge_conn_s * conn) _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, 0); _nns_edge_cmd_send (conn, &cmd); - /* Release socket. If its last reference is dropped, it will close socket automatically. */ + /** + * Close and release the socket. + * Using GSocket, if its last reference is dropped, it will close socket automatically. + */ g_clear_object (&conn->socket); } @@ -660,7 +633,7 @@ _nns_edge_connect_to (nns_edge_handle_s * eh, int64_t client_id, /* Send ip and port to destination. */ _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_HOST_INFO, client_id); - _get_host_str (eh->recv_ip, eh->recv_port, &host); + _get_host_str (eh->ip, eh->port, &host); cmd.info.num = 1; cmd.info.mem_size[0] = strlen (host) + 1; cmd.mem[0] = host; @@ -716,9 +689,7 @@ _nns_edge_message_handler (void *thread_data) client_id = _tdata->client_id; SAFE_FREE (_tdata); - /* Start message thread, add ref to clearly make the socket live. */ conn->running = 1; - while (conn->running) { nns_edge_data_h data_h; unsigned int i; @@ -811,6 +782,7 @@ _nns_edge_create_message_thread (nns_edge_handle_s * eh, nns_edge_conn_s * conn, if (tid < 0) { nns_edge_loge ("Failed to create message handler thread."); conn->running = 0; + conn->msg_thread = 0; SAFE_FREE (thread_data); return NNS_EDGE_ERROR_IO; } @@ -977,9 +949,9 @@ nns_edge_create_handle (const char *id, const char *topic, nns_edge_h * edge_h) eh->id = nns_edge_strdup (id); eh->topic = nns_edge_strdup (topic); eh->protocol = NNS_EDGE_PROTOCOL_TCP; - eh->is_server = true; - eh->recv_ip = nns_edge_strdup ("localhost"); - eh->recv_port = 0; + eh->is_server = false; + eh->ip = nns_edge_strdup ("localhost"); + eh->port = 0; eh->caps_str = NULL; /* Connection data for each client ID. */ @@ -1016,9 +988,9 @@ nns_edge_start (nns_edge_h edge_h, bool is_server) } eh->is_server = is_server; - if (!is_server && 0 == eh->recv_port) { - eh->recv_port = _get_available_port (); - if (eh->recv_port <= 0) { + if (!is_server && 0 == eh->port) { + eh->port = nns_edge_get_available_port (); + if (eh->port <= 0) { nns_edge_loge ("Failed to start edge. Cannot get available port."); nns_edge_unlock (eh); return NNS_EDGE_ERROR_CONNECTION_FAILURE; @@ -1029,7 +1001,7 @@ nns_edge_start (nns_edge_h edge_h, bool is_server) eh->listener = g_socket_listener_new (); g_socket_listener_set_backlog (eh->listener, N_BACKLOG); - if (!_nns_edge_get_saddr (eh->recv_ip, eh->recv_port, &saddr)) { + if (!_nns_edge_get_saddr (eh->ip, eh->port, &saddr)) { nns_edge_loge ("Failed to get socket address"); ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; goto error; @@ -1088,7 +1060,6 @@ nns_edge_release_handle (nns_edge_h edge_h) SAFE_FREE (eh->id); SAFE_FREE (eh->topic); SAFE_FREE (eh->ip); - SAFE_FREE (eh->recv_ip); SAFE_FREE (eh->caps_str); nns_edge_unlock (eh); @@ -1277,7 +1248,7 @@ nns_edge_request (nns_edge_h edge_h, nns_edge_data_h data_h) } conn_data = _nns_edge_get_connection (eh, eh->client_id); - if (!_nns_edge_check_connection (conn_data->sink_conn->socket)) { + if (!_nns_edge_check_connection (conn_data->sink_conn)) { nns_edge_loge ("Failed to request, connection failure."); nns_edge_unlock (eh); return NNS_EDGE_ERROR_CONNECTION_FAILURE; @@ -1429,15 +1400,15 @@ nns_edge_set_info (nns_edge_h edge_h, const char *key, const char *value) * @todo User handles (replace or append) the capability of edge handle. * @todo Change key-value set as json or hash table. */ - if (0 == g_ascii_strcasecmp (key, "CAPS")) { + if (0 == strcasecmp (key, "CAPS") || 0 == strcasecmp (key, "CAPABILITY")) { SAFE_FREE (eh->caps_str); - eh->caps_str = g_strdup (value); - } else if (0 == g_ascii_strcasecmp (key, "IP")) { - SAFE_FREE (eh->recv_ip); - eh->recv_ip = nns_edge_strdup (value); - } else if (0 == g_ascii_strcasecmp (key, "PORT")) { - eh->recv_port = g_ascii_strtoll (value, NULL, 10); - } else if (0 == g_ascii_strcasecmp (key, "TOPIC")) { + eh->caps_str = nns_edge_strdup (value); + } else if (0 == strcasecmp (key, "IP")) { + SAFE_FREE (eh->ip); + eh->ip = nns_edge_strdup (value); + } else if (0 == strcasecmp (key, "PORT")) { + eh->port = g_ascii_strtoll (value, NULL, 10); + } else if (0 == strcasecmp (key, "TOPIC")) { SAFE_FREE (eh->topic); eh->topic = nns_edge_strdup (value); } else { @@ -1468,7 +1439,7 @@ nns_edge_get_info (nns_edge_h edge_h, const char *key, char **value) } if (!value) { - nns_edge_loge ("Invalid param, given value is invalid."); + nns_edge_loge ("Invalid param, value should not be null."); return NNS_EDGE_ERROR_INVALID_PARAMETER; } @@ -1484,13 +1455,13 @@ nns_edge_get_info (nns_edge_h edge_h, const char *key, char **value) * @todo User handles (replace or append) the capability of edge handle. * @todo Change key-value set as json or hash table. */ - if (0 == g_ascii_strcasecmp (key, "CAPS")) { + if (0 == strcasecmp (key, "CAPS") || 0 == strcasecmp (key, "CAPABILITY")) { *value = nns_edge_strdup (eh->caps_str); - } else if (0 == g_ascii_strcasecmp (key, "IP")) { - *value = nns_edge_strdup (eh->recv_ip); - } else if (0 == g_ascii_strcasecmp (key, "PORT")) { - *value = nns_edge_strdup_printf ("%d", eh->recv_port); - } else if (0 == g_ascii_strcasecmp (key, "TOPIC")) { + } else if (0 == strcasecmp (key, "IP")) { + *value = nns_edge_strdup (eh->ip); + } else if (0 == strcasecmp (key, "PORT")) { + *value = nns_edge_strdup_printf ("%d", eh->port); + } else if (0 == strcasecmp (key, "TOPIC")) { *value = nns_edge_strdup (eh->topic); } else { nns_edge_logw ("Failed to get edge info. Unknown key: %s", key); -- 2.7.4