From 7ea2029d906a0d9d9dab609e57042fbd03f0988e Mon Sep 17 00:00:00 2001 From: gichan Date: Thu, 14 Jul 2022 14:32:02 +0900 Subject: [PATCH] [Edge] Update edge lib Update edge lib - This chagnes will be removed soon. Signed-off-by: gichan --- gst/nnstreamer/tensor_query/meson.build | 2 - ...amer_edge_common.h => nnstreamer-edge-common.h} | 37 +- ..._edge_internal.h => nnstreamer-edge-internal.h} | 2 - .../tensor_query/nnstreamer_edge_common.c | 115 ++- .../tensor_query/nnstreamer_edge_internal.c | 1058 +++++++++++--------- gst/nnstreamer/tensor_query/nnstreamer_edge_mqtt.c | 4 +- 6 files changed, 691 insertions(+), 527 deletions(-) rename gst/nnstreamer/tensor_query/{nnstreamer_edge_common.h => nnstreamer-edge-common.h} (77%) rename gst/nnstreamer/tensor_query/{nnstreamer_edge_internal.h => nnstreamer-edge-internal.h} (98%) diff --git a/gst/nnstreamer/tensor_query/meson.build b/gst/nnstreamer/tensor_query/meson.build index 755fcbd..1830053 100644 --- a/gst/nnstreamer/tensor_query/meson.build +++ b/gst/nnstreamer/tensor_query/meson.build @@ -26,8 +26,6 @@ nnstreamer_edge_deps = [ glib_dep, gio_dep, thread_dep ] -nnstreamer_headers += join_paths(meson.current_source_dir(), 'nnstreamer-edge.h') - if aitt_support_is_available nnstreamer_edge_sources += join_paths(meson.current_source_dir(), 'nnstreamer_edge_aitt.c') nnstreamer_edge_deps += aitt_support_deps diff --git a/gst/nnstreamer/tensor_query/nnstreamer_edge_common.h b/gst/nnstreamer/tensor_query/nnstreamer-edge-common.h similarity index 77% rename from gst/nnstreamer/tensor_query/nnstreamer_edge_common.h rename to gst/nnstreamer/tensor_query/nnstreamer-edge-common.h index 8d9bd8f..4125ef4 100644 --- a/gst/nnstreamer/tensor_query/nnstreamer_edge_common.h +++ b/gst/nnstreamer/tensor_query/nnstreamer-edge-common.h @@ -1,8 +1,8 @@ -/* SPDX-License-Identifier: LGPL-2.1-only */ +/* SPDX-License-Identifier: Apache-2.0 */ /** * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. * - * @file nnstreamer_edge_common.h + * @file nnstreamer-edge-common.h * @date 6 April 2022 * @brief Common util functions for nnstreamer edge. * @see https://github.com/nnstreamer/nnstreamer @@ -29,14 +29,8 @@ extern "C" { #define UNUSED(expr) do { (void)(expr); } while (0) #endif -/** - * @brief g_memdup() function replaced by g_memdup2() in glib version >= 2.68 - */ -#if GLIB_USE_G_MEMDUP2 -#define _g_memdup g_memdup2 -#else -#define _g_memdup g_memdup -#endif +#define STR_IS_VALID(s) ((s) && (s)[0] != '\0') +#define SAFE_FREE(p) do { if (p) { free (p); (p) = NULL; } } while (0) #define NNS_EDGE_MAGIC 0xfeedfeed #define NNS_EDGE_MAGIC_DEAD 0xdeaddead @@ -88,6 +82,29 @@ typedef struct { #define nns_edge_logf g_error /** + * @brief Free allocated memory. + */ +void nns_edge_free (void *data); + +/** + * @brief Allocate new memory and copy bytes. + * @note Caller should release newly allocated memory using nns_edge_free(). + */ +void *nns_edge_memdup (const void *data, size_t size); + +/** + * @brief Allocate new memory and copy string. + * @note Caller should release newly allocated string using nns_edge_free(). + */ +char *nns_edge_strdup (const char *str); + +/** + * @brief Allocate new memory and print formatted string. + * @note Caller should release newly allocated string using nns_edge_free(). + */ +char *nns_edge_strdup_printf (const char *format, ...); + +/** * @brief Create nnstreamer edge event. * @note This is internal function for edge event. */ diff --git a/gst/nnstreamer/tensor_query/nnstreamer_edge_internal.h b/gst/nnstreamer/tensor_query/nnstreamer-edge-internal.h similarity index 98% rename from gst/nnstreamer/tensor_query/nnstreamer_edge_internal.h rename to gst/nnstreamer/tensor_query/nnstreamer-edge-internal.h index 2d3eabf..1e755cc 100644 --- a/gst/nnstreamer/tensor_query/nnstreamer_edge_internal.h +++ b/gst/nnstreamer/tensor_query/nnstreamer-edge-internal.h @@ -45,9 +45,7 @@ typedef struct { char *recv_ip; int recv_port; GHashTable *conn_table; - GSocketListener *listener; - GCancellable *cancellable; /* MQTT */ void *mqtt_handle; diff --git a/gst/nnstreamer/tensor_query/nnstreamer_edge_common.c b/gst/nnstreamer/tensor_query/nnstreamer_edge_common.c index 7ea1533..dfd26ef 100644 --- a/gst/nnstreamer/tensor_query/nnstreamer_edge_common.c +++ b/gst/nnstreamer/tensor_query/nnstreamer_edge_common.c @@ -1,8 +1,8 @@ -/* SPDX-License-Identifier: LGPL-2.1-only */ +/* SPDX-License-Identifier: Apache-2.0 */ /** * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. * - * @file nnstreamer_edge_internal.c + * @file nnstreamer-edge-common.c * @date 6 April 2022 * @brief Common util functions for nnstreamer edge. * @see https://github.com/nnstreamer/nnstreamer @@ -10,7 +10,87 @@ * @bug No known bugs except for NYI items */ -#include "nnstreamer_edge_common.h" +#define _GNU_SOURCE +#include + +#include "nnstreamer-edge-common.h" + +/** + * @brief Free allocated memory. + */ +void +nns_edge_free (void *data) +{ + if (data) + free (data); +} + +/** + * @brief Allocate new memory and copy bytes. + * @note Caller should release newly allocated memory using nns_edge_free(). + */ +void * +nns_edge_memdup (const void *data, size_t size) +{ + void *mem = NULL; + + if (data && size > 0) { + mem = malloc (size); + + if (mem) { + memcpy (mem, data, size); + } else { + nns_edge_loge ("Failed to allocate memory (%zd).", size); + } + } + + return mem; +} + +/** + * @brief Allocate new memory and copy string. + * @note Caller should release newly allocated string using nns_edge_free(). + */ +char * +nns_edge_strdup (const char *str) +{ + char *new_str = NULL; + size_t len; + + if (str) { + len = strlen (str); + + new_str = (char *) malloc (len + 1); + if (new_str) { + memcpy (new_str, str, len); + new_str[len] = '\0'; + } else { + nns_edge_loge ("Failed to allocate memory (%zd).", len + 1); + } + } + + return new_str; +} + +/** + * @brief Allocate new memory and print formatted string. + * @note Caller should release newly allocated string using nns_edge_free(). + */ +char * +nns_edge_strdup_printf (const char *format, ...) +{ + char *new_str = NULL; + va_list args; + int len; + + va_start (args, format); + len = vasprintf (&new_str, format, args); + if (len < 0) + new_str = NULL; + va_end (args); + + return new_str; +} /** * @brief Create nnstreamer edge event. @@ -64,7 +144,7 @@ nns_edge_event_destroy (nns_edge_event_h event_h) if (ee->data.destroy_cb) ee->data.destroy_cb (ee->data.data); - g_free (ee); + SAFE_FREE (ee); return NNS_EDGE_ERROR_NONE; } @@ -180,7 +260,7 @@ nns_edge_event_parse_capability (nns_edge_event_h event_h, char **capability) return NNS_EDGE_ERROR_INVALID_PARAMETER; } - *capability = g_strdup (ee->data.data); + *capability = nns_edge_strdup (ee->data.data); return NNS_EDGE_ERROR_NONE; } @@ -206,8 +286,8 @@ nns_edge_data_create (nns_edge_data_h * data_h) memset (ed, 0, sizeof (nns_edge_data_s)); ed->magic = NNS_EDGE_MAGIC; - ed->info_table = - g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_free); + ed->info_table = g_hash_table_new_full (g_str_hash, g_str_equal, + nns_edge_free, nns_edge_free); *data_h = ed; return NNS_EDGE_ERROR_NONE; @@ -238,7 +318,7 @@ nns_edge_data_destroy (nns_edge_data_h data_h) g_hash_table_destroy (ed->info_table); - g_free (ed); + SAFE_FREE (ed); return NNS_EDGE_ERROR_NONE; } @@ -295,14 +375,16 @@ nns_edge_data_copy (nns_edge_data_h data_h, nns_edge_data_h * new_data_h) copied->num = ed->num; for (i = 0; i < ed->num; i++) { - copied->data[i].data = _g_memdup (ed->data[i].data, ed->data[i].data_len); + copied->data[i].data = nns_edge_memdup (ed->data[i].data, + ed->data[i].data_len); copied->data[i].data_len = ed->data[i].data_len; - copied->data[i].destroy_cb = g_free; + copied->data[i].destroy_cb = nns_edge_free; } g_hash_table_iter_init (&iter, ed->info_table); while (g_hash_table_iter_next (&iter, &key, &value)) { - g_hash_table_insert (copied->info_table, g_strdup (key), g_strdup (value)); + g_hash_table_insert (copied->info_table, nns_edge_strdup (key), + nns_edge_strdup (value)); } return NNS_EDGE_ERROR_NONE; @@ -419,17 +501,18 @@ nns_edge_data_set_info (nns_edge_data_h data_h, const char *key, return NNS_EDGE_ERROR_INVALID_PARAMETER; } - if (!key || *key == '\0') { + if (!STR_IS_VALID (key)) { nns_edge_loge ("Invalid param, given key is invalid."); return NNS_EDGE_ERROR_INVALID_PARAMETER; } - if (!value || *value == '\0') { + if (!STR_IS_VALID (value)) { nns_edge_loge ("Invalid param, given value is invalid."); return NNS_EDGE_ERROR_INVALID_PARAMETER; } - g_hash_table_insert (ed->info_table, g_strdup (key), g_strdup (value)); + g_hash_table_insert (ed->info_table, nns_edge_strdup (key), + nns_edge_strdup (value)); return NNS_EDGE_ERROR_NONE; } @@ -450,7 +533,7 @@ nns_edge_data_get_info (nns_edge_data_h data_h, const char *key, char **value) return NNS_EDGE_ERROR_INVALID_PARAMETER; } - if (!key || *key == '\0') { + if (!STR_IS_VALID (key)) { nns_edge_loge ("Invalid param, given key is invalid."); return NNS_EDGE_ERROR_INVALID_PARAMETER; } @@ -466,7 +549,7 @@ nns_edge_data_get_info (nns_edge_data_h data_h, const char *key, char **value) return NNS_EDGE_ERROR_INVALID_PARAMETER; } - *value = g_strdup (val); + *value = nns_edge_strdup (val); return NNS_EDGE_ERROR_NONE; } diff --git a/gst/nnstreamer/tensor_query/nnstreamer_edge_internal.c b/gst/nnstreamer/tensor_query/nnstreamer_edge_internal.c index 43b218b..5d76533 100644 --- a/gst/nnstreamer/tensor_query/nnstreamer_edge_internal.c +++ b/gst/nnstreamer/tensor_query/nnstreamer_edge_internal.c @@ -1,8 +1,8 @@ -/* SPDX-License-Identifier: LGPL-2.1-only */ +/* SPDX-License-Identifier: Apache-2.0 */ /** * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. * - * @file nnstreamer_edge_internal.c + * @file nnstreamer-edge-internal.c * @date 6 April 2022 * @brief Common library to support communication among devices. * @see https://github.com/nnstreamer/nnstreamer @@ -10,8 +10,8 @@ * @bug No known bugs except for NYI items */ -#include "nnstreamer_edge_common.h" -#include "nnstreamer_edge_internal.h" +#include "nnstreamer-edge-common.h" +#include "nnstreamer-edge-internal.h" #define N_BACKLOG 10 #define DEFAULT_TIMEOUT_SEC 10 @@ -34,6 +34,7 @@ typedef enum */ typedef struct { + unsigned int magic; nns_edge_cmd_e cmd; int64_t client_id; @@ -52,7 +53,7 @@ typedef struct } nns_edge_cmd_s; /** - * @brief Data structure for edge TCP connection. + * @brief Data structure for edge connection. */ typedef struct { @@ -61,7 +62,6 @@ typedef struct int8_t running; pthread_t msg_thread; GSocket *socket; - GCancellable *cancellable; } nns_edge_conn_s; /** @@ -84,18 +84,11 @@ typedef struct nns_edge_conn_s *conn; } nns_edge_thread_data_s; -static bool _nns_edge_close_connection (nns_edge_conn_s * conn); -static int _nns_edge_create_message_thread (nns_edge_handle_s * eh, - nns_edge_conn_s * conn, int64_t client_id); -static int _nns_edge_tcp_connect (nns_edge_handle_s * eh, const char *ip, - int port); - /** * @brief Send data to connected socket. */ static bool -_send_raw_data (GSocket * socket, void *data, size_t size, - GCancellable * cancellable) +_send_raw_data (GSocket * socket, void *data, size_t size) { size_t bytes_sent = 0; ssize_t rret; @@ -103,7 +96,7 @@ _send_raw_data (GSocket * socket, void *data, size_t size, while (bytes_sent < size) { rret = g_socket_send (socket, (char *) data + bytes_sent, - size - bytes_sent, cancellable, &err); + size - bytes_sent, NULL, &err); if (rret == 0) { nns_edge_loge ("Connection closed."); @@ -123,11 +116,32 @@ _send_raw_data (GSocket * socket, void *data, size_t size, } /** + * @brief Internal function to check connection. + */ +static bool +_nns_edge_check_connection (GSocket * socket) +{ + GIOCondition condition; + + if (!socket || g_socket_is_closed (socket)) + return false; + + condition = g_socket_condition_check (socket, + G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP); + + if (!condition || (condition & (G_IO_ERR | G_IO_HUP))) { + nns_edge_logw ("Socket is not available, possibly closed."); + return false; + } + + return true; +} + +/** * @brief Receive data from connected socket. */ static bool -_receive_raw_data (GSocket * socket, void *data, size_t size, - GCancellable * cancellable) +_receive_raw_data (GSocket * socket, void *data, size_t size) { size_t bytes_received = 0; ssize_t rret; @@ -135,7 +149,7 @@ _receive_raw_data (GSocket * socket, void *data, size_t size, while (bytes_received < size) { rret = g_socket_receive (socket, (char *) data + bytes_received, - size - bytes_received, cancellable, &err); + size - bytes_received, NULL, &err); if (rret == 0) { nns_edge_loge ("Connection closed."); @@ -174,7 +188,38 @@ _parse_host_str (const char *host, char **ip, int *port) static void _get_host_str (const char *ip, const int port, char **host) { - *host = g_strdup_printf ("%s:%d", ip, port); + *host = nns_edge_strdup_printf ("%s:%d", ip, port); +} + +/** + * @brief Get available port number. + */ +static int +_get_available_port (void) +{ + struct sockaddr_in sin; + int port = 0, sock; + socklen_t len = sizeof (struct sockaddr); + + sin.sin_family = AF_INET; + sin.sin_addr.s_addr = INADDR_ANY; + sock = socket (AF_INET, SOCK_STREAM, 0); + if (sock < 0) { + nns_edge_loge ("Failed to get available port. Socket creation failure."); + return 0; + } + sin.sin_port = port; + if (bind (sock, (struct sockaddr *) &sin, sizeof (struct sockaddr)) == 0) { + if (getsockname (sock, (struct sockaddr *) &sin, &len) == 0) { + port = ntohs (sin.sin_port); + nns_edge_logi ("Available port number: %d", port); + } else { + nns_edge_logw ("Failed to read local socket info."); + } + } + close (sock); + + return port; } /** @@ -187,6 +232,7 @@ _nns_edge_cmd_init (nns_edge_cmd_s * cmd, nns_edge_cmd_e c, int64_t cid) return; memset (cmd, 0, sizeof (nns_edge_cmd_s)); + cmd->info.magic = NNS_EDGE_MAGIC; cmd->info.cmd = c; cmd->info.client_id = cid; } @@ -202,11 +248,32 @@ _nns_edge_cmd_clear (nns_edge_cmd_s * cmd) if (!cmd) return; + cmd->info.magic = NNS_EDGE_MAGIC_DEAD; + for (i = 0; i < cmd->info.num; i++) { - if (cmd->mem[i]) - free (cmd->mem[i]); - cmd->mem[i] = NULL; + SAFE_FREE (cmd->mem[i]); + } +} + +/** + * @brief Validate edge command. + */ +static bool +_nns_edge_cmd_is_valid (nns_edge_cmd_s * cmd) +{ + int command; + + if (!cmd) + return false; + + command = (int) cmd->info.cmd; + + if (!NNS_EDGE_MAGIC_IS_VALID (&cmd->info) || + (command < 0 || command >= _NNS_EDGE_CMD_END)) { + return false; } + + return true; } /** @@ -217,18 +284,28 @@ _nns_edge_cmd_send (nns_edge_conn_s * conn, nns_edge_cmd_s * cmd) { unsigned int n; - if (!conn || !cmd) + if (!conn) { + nns_edge_loge ("Failed to send command, edge connection is null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!_nns_edge_cmd_is_valid (cmd)) { + nns_edge_loge ("Failed to send command, invalid command."); return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!_nns_edge_check_connection (conn->socket)) { + nns_edge_loge ("Failed to send command, socket has error."); + return NNS_EDGE_ERROR_IO; + } - if (!_send_raw_data (conn->socket, &cmd->info, - sizeof (nns_edge_cmd_info_s), conn->cancellable)) { + if (!_send_raw_data (conn->socket, &cmd->info, sizeof (nns_edge_cmd_info_s))) { nns_edge_loge ("Failed to send command to socket."); return NNS_EDGE_ERROR_IO; } for (n = 0; n < cmd->info.num; n++) { - if (!_send_raw_data (conn->socket, cmd->mem[n], - cmd->info.mem_size[n], conn->cancellable)) { + if (!_send_raw_data (conn->socket, cmd->mem[n], cmd->info.mem_size[n])) { nns_edge_loge ("Failed to send %uth memory to socket.", n); return NNS_EDGE_ERROR_IO; } @@ -249,12 +326,22 @@ _nns_edge_cmd_receive (nns_edge_conn_s * conn, nns_edge_cmd_s * cmd) if (!conn || !cmd) return NNS_EDGE_ERROR_INVALID_PARAMETER; + if (!_nns_edge_check_connection (conn->socket)) { + nns_edge_loge ("Failed to receive command, socket has error."); + return NNS_EDGE_ERROR_IO; + } + if (!_receive_raw_data (conn->socket, &cmd->info, - sizeof (nns_edge_cmd_info_s), conn->cancellable)) { + sizeof (nns_edge_cmd_info_s))) { nns_edge_loge ("Failed to receive command from socket."); return NNS_EDGE_ERROR_IO; } + if (!_nns_edge_cmd_is_valid (cmd)) { + nns_edge_loge ("Failed to receive command, invalid command."); + return NNS_EDGE_ERROR_IO; + } + nns_edge_logd ("Received command:%d (num:%u)", cmd->info.cmd, cmd->info.num); if (cmd->info.num >= NNS_EDGE_DATA_LIMIT) { nns_edge_loge ("Invalid request, the max memories for data transfer is %d.", @@ -270,8 +357,7 @@ _nns_edge_cmd_receive (nns_edge_conn_s * conn, nns_edge_cmd_s * cmd) break; } - if (!_receive_raw_data (conn->socket, cmd->mem[n], - cmd->info.mem_size[n], conn->cancellable)) { + if (!_receive_raw_data (conn->socket, cmd->mem[n], cmd->info.mem_size[n])) { nns_edge_loge ("Failed to receive %uth memory from socket.", n++); ret = NNS_EDGE_ERROR_IO; break; @@ -280,8 +366,7 @@ _nns_edge_cmd_receive (nns_edge_conn_s * conn, nns_edge_cmd_s * cmd) if (ret != NNS_EDGE_ERROR_NONE) { for (i = 0; i < n; i++) { - free (cmd->mem[i]); - cmd->mem[i] = NULL; + SAFE_FREE (cmd->mem[i]); } } @@ -335,11 +420,44 @@ error: } /** + * @brief Close connection + */ +static bool +_nns_edge_close_connection (nns_edge_conn_s * conn) +{ + if (!conn) + return false; + + if (conn->running && conn->msg_thread) { + conn->running = 0; + pthread_cancel (conn->msg_thread); + pthread_join (conn->msg_thread, NULL); + conn->msg_thread = 0; + } + + if (conn->socket) { + nns_edge_cmd_s cmd; + + /* Send error before closing the socket. */ + nns_edge_logd ("Send error cmd to close connection."); + _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, 0); + _nns_edge_cmd_send (conn, &cmd); + + /* Release socket. If its last reference is dropped, it will close socket automatically. */ + g_clear_object (&conn->socket); + } + + SAFE_FREE (conn->ip); + SAFE_FREE (conn); + return true; +} + +/** * @brief Get nnstreamer-edge connection data. * @note This function should be called with handle lock. */ static nns_edge_conn_data_s * -_nns_edge_get_conn (nns_edge_handle_s * eh, int64_t client_id) +_nns_edge_get_connection (nns_edge_handle_s * eh, int64_t client_id) { if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { nns_edge_loge ("Invalid param, given edge handle is invalid."); @@ -354,7 +472,7 @@ _nns_edge_get_conn (nns_edge_handle_s * eh, int64_t client_id) * @note This function should be called with handle lock. */ static nns_edge_conn_data_s * -_nns_edge_add_conn (nns_edge_handle_s * eh, int64_t client_id) +_nns_edge_add_connection (nns_edge_handle_s * eh, int64_t client_id) { nns_edge_conn_data_s *data = NULL; @@ -385,48 +503,24 @@ _nns_edge_add_conn (nns_edge_handle_s * eh, int64_t client_id) * @brief Remove nnstreamer-edge connection data. This will be called when removing connection data from hash table. */ static void -_nns_edge_remove_conn (gpointer data) +_nns_edge_remove_connection (gpointer data) { nns_edge_conn_data_s *cdata = (nns_edge_conn_data_s *) data; if (cdata) { _nns_edge_close_connection (cdata->src_conn); _nns_edge_close_connection (cdata->sink_conn); + cdata->src_conn = cdata->sink_conn = NULL; - g_free (cdata); - } -} - -/** - * @brief Internal function to check connection. - */ -static bool -_nns_edge_check_connection (nns_edge_conn_s * conn) -{ - size_t size; - GIOCondition condition; - - if (!conn) - return false; - - condition = g_socket_condition_check (conn->socket, - G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP); - size = g_socket_get_available_bytes (conn->socket); - - if (condition && size <= 0) { - nns_edge_logw ("Socket is not available, possibly EOS."); - return false; + SAFE_FREE (cdata); } - - return true; } /** * @brief Get socket address */ static bool -_nns_edge_get_saddr (const char *ip, const int port, - GCancellable * cancellable, GSocketAddress ** saddr) +_nns_edge_get_saddr (const char *ip, const int port, GSocketAddress ** saddr) { GError *err = NULL; GInetAddress *addr; @@ -437,7 +531,7 @@ _nns_edge_get_saddr (const char *ip, const int port, GList *results; GResolver *resolver; resolver = g_resolver_get_default (); - results = g_resolver_lookup_by_name (resolver, ip, cancellable, &err); + results = g_resolver_lookup_by_name (resolver, ip, NULL, &err); if (!results) { if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { nns_edge_loge ("Failed to resolve ip, name resolver is cancelled."); @@ -461,384 +555,148 @@ _nns_edge_get_saddr (const char *ip, const int port, } /** - * @brief Get registered handle. If not registered, create new handle and register it. + * @brief Connect to requested socket. */ -int -nns_edge_create_handle (const char *id, const char *topic, nns_edge_h * edge_h) +static bool +_nns_edge_connect_socket (nns_edge_conn_s * conn) { - nns_edge_handle_s *eh; + GError *err = NULL; + GSocketAddress *saddr = NULL; + bool ret = false; - if (!id || *id == '\0') { - nns_edge_loge ("Invalid param, given ID is invalid."); - return NNS_EDGE_ERROR_INVALID_PARAMETER; + if (!_nns_edge_get_saddr (conn->ip, conn->port, &saddr)) { + nns_edge_loge ("Failed to get socket address"); + return ret; } - if (!topic || *topic == '\0') { - nns_edge_loge ("Invalid param, given topic is invalid."); - return NNS_EDGE_ERROR_INVALID_PARAMETER; - } + /* create sending client socket */ + conn->socket = + g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM, + G_SOCKET_PROTOCOL_TCP, &err); - if (!edge_h) { - nns_edge_loge ("Invalid param, edge_h should not be null."); - return NNS_EDGE_ERROR_INVALID_PARAMETER; + if (!conn->socket) { + nns_edge_loge ("Failed to create new socket"); + goto done; } - /** - * @todo manage edge handles - * 1. consider adding hash table or list to manage edge handles. - * 2. compare topic and return error if existing topic in handle is different. - */ - eh = (nns_edge_handle_s *) malloc (sizeof (nns_edge_handle_s)); - if (!eh) { - nns_edge_loge ("Failed to allocate memory for edge handle."); - return NNS_EDGE_ERROR_OUT_OF_MEMORY; + /* setting TCP_NODELAY to true in order to avoid packet batching as known as Nagle's algorithm */ + if (!g_socket_set_option (conn->socket, IPPROTO_TCP, TCP_NODELAY, true, &err)) { + nns_edge_loge ("Failed to set socket TCP_NODELAY option: %s", err->message); + goto done; } - memset (eh, 0, sizeof (nns_edge_handle_s)); - nns_edge_lock_init (eh); - eh->magic = NNS_EDGE_MAGIC; - eh->id = g_strdup (id); - eh->topic = g_strdup (topic); - eh->protocol = NNS_EDGE_PROTOCOL_TCP; - eh->is_server = true; - eh->recv_ip = g_strdup ("localhost"); - eh->recv_port = 0; - eh->caps_str = NULL; + if (!g_socket_connect (conn->socket, saddr, NULL, &err)) { + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + nns_edge_logd ("Cancelled connecting"); + } else { + nns_edge_loge ("Failed to connect to host, %s:%d", conn->ip, conn->port); + } + goto done; + } - /* Connection data for each client ID. */ - eh->conn_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, - _nns_edge_remove_conn); + /* now connected to the requested socket */ + ret = true; - *edge_h = eh; - return NNS_EDGE_ERROR_NONE; +done: + g_object_unref (saddr); + g_clear_error (&err); + return ret; } /** - * @brief [TCP] Callback for src socket listener that pushes socket to the queue + * @brief Connect to the destination node. (host:sender(sink) - dest:receiver(listener, src)) */ -static void -accept_socket_async_cb (GObject * source, GAsyncResult * result, - gpointer user_data) +static int +_nns_edge_connect_to (nns_edge_handle_s * eh, int64_t client_id, + const char *ip, int port) { - GSocketListener *socket_listener = G_SOCKET_LISTENER (source); - GSocket *socket = NULL; - GError *err = NULL; - nns_edge_handle_s *eh = (nns_edge_handle_s *) user_data; nns_edge_conn_s *conn = NULL; + nns_edge_conn_data_s *conn_data; nns_edge_cmd_s cmd; + char *host; bool done = false; - char *connected_ip = NULL; - int connected_port = 0; - nns_edge_conn_data_s *conn_data = NULL; - int64_t client_id; int ret; - socket = - g_socket_listener_accept_socket_finish (socket_listener, result, NULL, - &err); - - if (!socket) { - nns_edge_loge ("Failed to get socket: %s", err->message); - g_clear_error (&err); - goto error; - } - g_socket_set_timeout (socket, DEFAULT_TIMEOUT_SEC); - - /* create socket with connection */ conn = (nns_edge_conn_s *) malloc (sizeof (nns_edge_conn_s)); if (!conn) { - nns_edge_loge ("Failed to allocate edge connection"); + nns_edge_loge ("Failed to allocate client data."); goto error; } memset (conn, 0, sizeof (nns_edge_conn_s)); - conn->socket = socket; - conn->cancellable = g_cancellable_new (); + conn->ip = nns_edge_strdup (ip); + conn->port = port; - /* setting TCP_NODELAY to true in order to avoid packet batching as known as Nagle's algorithm */ - if (!g_socket_set_option (socket, IPPROTO_TCP, TCP_NODELAY, true, &err)) { - nns_edge_loge ("Failed to set socket TCP_NODELAY option: %s", err->message); - g_clear_error (&err); + if (!_nns_edge_connect_socket (conn)) { goto error; } - /* Send capability and info to check compatibility. */ - client_id = eh->is_server ? g_get_monotonic_time () : eh->client_id; - - _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_CAPABILITY, client_id); - cmd.info.num = 1; - cmd.info.mem_size[0] = strlen (eh->caps_str) + 1; - cmd.mem[0] = eh->caps_str; + if (!eh->is_server) { + /* Receive capability and client ID from server. */ + _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id); + ret = _nns_edge_cmd_receive (conn, &cmd); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to receive capability."); + goto error; + } - ret = _nns_edge_cmd_send (conn, &cmd); - if (ret != NNS_EDGE_ERROR_NONE) { - nns_edge_loge ("Failed to send capability."); - goto error; - } + if (cmd.info.cmd != _NNS_EDGE_CMD_CAPABILITY) { + nns_edge_loge ("Failed to get capability."); + _nns_edge_cmd_clear (&cmd); + goto error; + } - /* Receive ip and port from destination. */ - ret = _nns_edge_cmd_receive (conn, &cmd); - if (ret != NNS_EDGE_ERROR_NONE) { - nns_edge_loge ("Failed to receive node info."); - goto error; - } + client_id = eh->client_id = cmd.info.client_id; - if (cmd.info.cmd != _NNS_EDGE_CMD_HOST_INFO) { - nns_edge_loge ("Failed to get host info."); + /* Check compatibility. */ + ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_CAPABILITY, + cmd.mem[0], cmd.info.mem_size[0], NULL); _nns_edge_cmd_clear (&cmd); - goto error; - } - _parse_host_str (cmd.mem[0], &connected_ip, &connected_port); - _nns_edge_cmd_clear (&cmd); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("The event returns error, capability is not acceptable."); + _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id); + } else { + /* Send ip and port to destination. */ + _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_HOST_INFO, client_id); - ret = _nns_edge_create_message_thread (eh, conn, client_id); - if (ret != NNS_EDGE_ERROR_NONE) { - nns_edge_loge ("Failed to create message handle thread."); - goto error; + _get_host_str (eh->recv_ip, eh->recv_port, &host); + cmd.info.num = 1; + cmd.info.mem_size[0] = strlen (host) + 1; + cmd.mem[0] = host; + } + + ret = _nns_edge_cmd_send (conn, &cmd); + _nns_edge_cmd_clear (&cmd); + + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to send host info."); + goto error; + } } - conn_data = _nns_edge_add_conn (eh, client_id); + conn_data = _nns_edge_add_connection (eh, client_id); if (conn_data) { /* Close old connection and set new one. */ - _nns_edge_close_connection (conn_data->src_conn); - conn_data->src_conn = conn; + _nns_edge_close_connection (conn_data->sink_conn); + conn_data->sink_conn = conn; done = true; } error: - if (done) { - if (eh->is_server) { - _nns_edge_tcp_connect (eh, connected_ip, connected_port); - } - } else { + if (!done) { _nns_edge_close_connection (conn); + return NNS_EDGE_ERROR_CONNECTION_FAILURE; } - g_socket_listener_accept_socket_async (socket_listener, - eh->cancellable, (GAsyncReadyCallback) accept_socket_async_cb, eh); - - g_free (connected_ip); + return NNS_EDGE_ERROR_NONE; } /** - * @brief Get available port number. + * @brief Message thread, receive buffer from the client. */ -static int -_get_available_port (void) -{ - struct sockaddr_in sin; - int port = 0, sock; - socklen_t len = sizeof (struct sockaddr); - - sin.sin_family = AF_INET; - sin.sin_addr.s_addr = INADDR_ANY; - sock = socket (AF_INET, SOCK_STREAM, 0); - sin.sin_port = port; - if (bind (sock, (struct sockaddr *) &sin, sizeof (struct sockaddr)) == 0) { - getsockname (sock, (struct sockaddr *) &sin, &len); - port = ntohs (sin.sin_port); - nns_edge_logi ("Available port number: %d", port); - } - close (sock); - - return port; -} - -/** - * @brief Initialize the nnstreamer edge handle. - */ -int -nns_edge_start (nns_edge_h edge_h, bool is_server) -{ - GSocketAddress *saddr = NULL; - GError *err = NULL; - int ret = 0; - nns_edge_handle_s *eh; - - eh = (nns_edge_handle_s *) edge_h; - if (!eh) { - nns_edge_loge ("Invalid param, given edge handle is null."); - return NNS_EDGE_ERROR_INVALID_PARAMETER; - } - - nns_edge_lock (eh); - - if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); - nns_edge_unlock (eh); - return NNS_EDGE_ERROR_INVALID_PARAMETER; - } - - eh->is_server = is_server; - if (!is_server && 0 == eh->recv_port) - eh->recv_port = _get_available_port (); - - /** Initialize server src data. */ - eh->cancellable = g_cancellable_new (); - eh->listener = g_socket_listener_new (); - g_socket_listener_set_backlog (eh->listener, N_BACKLOG); - - if (!_nns_edge_get_saddr (eh->recv_ip, eh->recv_port, eh->cancellable, - &saddr)) { - nns_edge_loge ("Failed to get socket address"); - ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; - goto error; - } - if (!g_socket_listener_add_address (eh->listener, saddr, - G_SOCKET_TYPE_STREAM, G_SOCKET_PROTOCOL_TCP, NULL, NULL, &err)) { - nns_edge_loge ("Failed to add address: %s", err->message); - g_clear_error (&err); - ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; - goto error; - } - g_object_unref (saddr); - saddr = NULL; - - g_socket_listener_accept_socket_async (eh->listener, - eh->cancellable, (GAsyncReadyCallback) accept_socket_async_cb, eh); - -error: - if (saddr) - g_object_unref (saddr); - - nns_edge_unlock (eh); - return ret; -} - -/** - * @brief Release the given handle. - */ -int -nns_edge_release_handle (nns_edge_h edge_h) -{ - nns_edge_handle_s *eh; - - eh = (nns_edge_handle_s *) edge_h; - if (!eh) { - nns_edge_loge ("Invalid param, given edge handle is null."); - return NNS_EDGE_ERROR_INVALID_PARAMETER; - } - - nns_edge_lock (eh); - - if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); - nns_edge_unlock (eh); - return NNS_EDGE_ERROR_INVALID_PARAMETER; - } - - eh->magic = NNS_EDGE_MAGIC_DEAD; - eh->event_cb = NULL; - eh->user_data = NULL; - g_free (eh->id); - g_free (eh->topic); - g_free (eh->ip); - g_free (eh->recv_ip); - g_free (eh->caps_str); - g_hash_table_destroy (eh->conn_table); - - nns_edge_unlock (eh); - nns_edge_lock_destroy (eh); - g_free (eh); - - return NNS_EDGE_ERROR_NONE; -} - -/** - * @brief Set the event callback. - */ -int -nns_edge_set_event_callback (nns_edge_h edge_h, nns_edge_event_cb cb, - void *user_data) -{ - nns_edge_handle_s *eh; - int ret; - - eh = (nns_edge_handle_s *) edge_h; - if (!eh) { - nns_edge_loge ("Invalid param, given edge handle is null."); - return NNS_EDGE_ERROR_INVALID_PARAMETER; - } - - nns_edge_lock (eh); - - if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); - nns_edge_unlock (eh); - return NNS_EDGE_ERROR_INVALID_PARAMETER; - } - - ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_CALLBACK_RELEASED, - NULL, 0, NULL); - if (ret != NNS_EDGE_ERROR_NONE) { - nns_edge_loge ("Failed to set new event callback."); - nns_edge_unlock (eh); - return ret; - } - - eh->event_cb = cb; - eh->user_data = user_data; - - nns_edge_unlock (eh); - return NNS_EDGE_ERROR_NONE; -} - -/** - * @brief Connect to requested socket using TCP. - */ -static bool -_nns_edge_connect_socket (nns_edge_conn_s * conn) -{ - GError *err = NULL; - GSocketAddress *saddr = NULL; - bool ret = false; - - if (!_nns_edge_get_saddr (conn->ip, conn->port, conn->cancellable, &saddr)) { - nns_edge_loge ("Failed to get socket address"); - return ret; - } - - /* create sending client socket */ - conn->socket = - g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM, - G_SOCKET_PROTOCOL_TCP, &err); - - if (!conn->socket) { - nns_edge_loge ("Failed to create new socket"); - goto done; - } - - /* setting TCP_NODELAY to true in order to avoid packet batching as known as Nagle's algorithm */ - if (!g_socket_set_option (conn->socket, IPPROTO_TCP, TCP_NODELAY, true, &err)) { - nns_edge_loge ("Failed to set socket TCP_NODELAY option: %s", err->message); - goto done; - } - - if (!g_socket_connect (conn->socket, saddr, conn->cancellable, &err)) { - if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { - nns_edge_logd ("Cancelled connecting"); - } else { - nns_edge_loge ("Failed to connect to host, %s:%d", conn->ip, conn->port); - } - goto done; - } - - /* now connected to the requested socket */ - ret = true; - -done: - g_object_unref (saddr); - g_clear_error (&err); - return ret; -} - -/** - * @brief [TCP] Receive buffer from the client - * @param[in] conn connection info - */ -static void * -_message_handler (void *thread_data) +static void * +_nns_edge_message_handler (void *thread_data) { nns_edge_thread_data_s *_tdata = (nns_edge_thread_data_s *) thread_data; nns_edge_handle_s *eh; @@ -856,7 +714,10 @@ _message_handler (void *thread_data) eh = (nns_edge_handle_s *) _tdata->eh; conn = _tdata->conn; client_id = _tdata->client_id; - g_free (_tdata); + SAFE_FREE (_tdata); + + /* Start message thread, add ref to clearly make the socket live. */ + conn->running = 1; while (conn->running) { nns_edge_data_h data_h; @@ -868,10 +729,8 @@ _message_handler (void *thread_data) break; } - if (!_nns_edge_check_connection (conn)) - break; - /** Receive data from the client */ + _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id); ret = _nns_edge_cmd_receive (conn, &cmd); if (ret != NNS_EDGE_ERROR_NONE) { nns_edge_loge ("Failed to receive data from the connected node."); @@ -896,16 +755,16 @@ _message_handler (void *thread_data) } /* Set client ID in edge data */ - val = g_strdup_printf ("%ld", (long int) client_id); + val = nns_edge_strdup_printf ("%ld", (long int) client_id); nns_edge_data_set_info (data_h, "client_id", val); - g_free (val); + SAFE_FREE (val); for (i = 0; i < cmd.info.num; i++) { nns_edge_data_add (data_h, cmd.mem[i], cmd.info.mem_size[i], NULL); } ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_NEW_DATA_RECEIVED, - data_h, sizeof (data_h), NULL); + data_h, sizeof (nns_edge_data_h), NULL); if (ret != NNS_EDGE_ERROR_NONE) { /* Try to get next request if server does not accept data from client. */ nns_edge_logw ("The server does not accept data from client."); @@ -939,20 +798,20 @@ _nns_edge_create_message_thread (nns_edge_handle_s * eh, nns_edge_conn_s * conn, /** Create message receving thread */ pthread_attr_init (&attr); - pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); - conn->running = 1; + pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_JOINABLE); + thread_data->eh = eh; thread_data->conn = conn; thread_data->client_id = client_id; - tid = - pthread_create (&conn->msg_thread, &attr, _message_handler, thread_data); + tid = pthread_create (&conn->msg_thread, &attr, _nns_edge_message_handler, + thread_data); pthread_attr_destroy (&attr); if (tid < 0) { nns_edge_loge ("Failed to create message handler thread."); conn->running = 0; - g_free (thread_data); + SAFE_FREE (thread_data); return NNS_EDGE_ERROR_IO; } @@ -960,101 +819,187 @@ _nns_edge_create_message_thread (nns_edge_handle_s * eh, nns_edge_conn_s * conn, } /** - * @brief Connect to the destination node usig TCP. + * @brief Callback for socket listener, accept socket and create message thread. */ -static int -_nns_edge_tcp_connect (nns_edge_handle_s * eh, const char *ip, int port) +static void +_nns_edge_accept_socket_async_cb (GObject * source, GAsyncResult * result, + gpointer user_data) { + GSocketListener *socket_listener = G_SOCKET_LISTENER (source); + GSocket *socket = NULL; + GError *err = NULL; + nns_edge_handle_s *eh = (nns_edge_handle_s *) user_data; nns_edge_conn_s *conn = NULL; - nns_edge_conn_data_s *conn_data; nns_edge_cmd_s cmd; - char *host; - int64_t client_id; bool done = false; + char *connected_ip = NULL; + int connected_port = 0; + nns_edge_conn_data_s *conn_data = NULL; + int64_t client_id; int ret; + socket = + g_socket_listener_accept_socket_finish (socket_listener, result, NULL, + &err); + + if (!socket) { + nns_edge_loge ("Failed to get socket: %s", err->message); + g_clear_error (&err); + return; + } + g_socket_set_timeout (socket, DEFAULT_TIMEOUT_SEC); + + /* create socket with connection */ conn = (nns_edge_conn_s *) malloc (sizeof (nns_edge_conn_s)); if (!conn) { - nns_edge_loge ("Failed to allocate client data."); + nns_edge_loge ("Failed to allocate edge connection"); goto error; } memset (conn, 0, sizeof (nns_edge_conn_s)); - conn->ip = g_strdup (ip); - conn->port = port; - conn->cancellable = g_cancellable_new (); + conn->socket = socket; - if (!_nns_edge_connect_socket (conn)) { + /* setting TCP_NODELAY to true in order to avoid packet batching as known as Nagle's algorithm */ + if (!g_socket_set_option (socket, IPPROTO_TCP, TCP_NODELAY, true, &err)) { + nns_edge_loge ("Failed to set socket TCP_NODELAY option: %s", err->message); + g_clear_error (&err); goto error; } - /* Get destination capability. */ - ret = _nns_edge_cmd_receive (conn, &cmd); - if (ret != NNS_EDGE_ERROR_NONE) { - nns_edge_loge ("Failed to receive capability."); - goto error; - } + client_id = eh->is_server ? g_get_monotonic_time () : eh->client_id; - if (cmd.info.cmd != _NNS_EDGE_CMD_CAPABILITY) { - nns_edge_loge ("Failed to get capability."); - _nns_edge_cmd_clear (&cmd); - goto error; - } + /* Send capability and info to check compatibility. */ + if (eh->is_server) { + if (!STR_IS_VALID (eh->caps_str)) { + nns_edge_loge ("Cannot accept socket, invalid capability."); + goto error; + } - client_id = eh->client_id = cmd.info.client_id; + _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_CAPABILITY, client_id); + cmd.info.num = 1; + cmd.info.mem_size[0] = strlen (eh->caps_str) + 1; + cmd.mem[0] = eh->caps_str; - /* Check compatibility. */ - ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_CAPABILITY, - cmd.mem[0], cmd.info.mem_size[0], NULL); - _nns_edge_cmd_clear (&cmd); + ret = _nns_edge_cmd_send (conn, &cmd); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to send capability."); + goto error; + } - if (ret != NNS_EDGE_ERROR_NONE) { - nns_edge_loge ("The event returns error, capability is not acceptable."); - _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id); - } else { - /* Send ip and port to destination. */ - _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_HOST_INFO, client_id); + /* Receive ip and port from destination. */ + ret = _nns_edge_cmd_receive (conn, &cmd); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to receive node info."); + goto error; + } - _get_host_str (eh->recv_ip, eh->recv_port, &host); - cmd.info.num = 1; - cmd.info.mem_size[0] = strlen (host) + 1; - cmd.mem[0] = host; - } + if (cmd.info.cmd != _NNS_EDGE_CMD_HOST_INFO) { + nns_edge_loge ("Failed to get host info."); + _nns_edge_cmd_clear (&cmd); + goto error; + } + + _parse_host_str (cmd.mem[0], &connected_ip, &connected_port); + _nns_edge_cmd_clear (&cmd); - ret = _nns_edge_cmd_send (conn, &cmd); - _nns_edge_cmd_clear (&cmd); + /* Connect to client listener. */ + ret = _nns_edge_connect_to (eh, client_id, connected_ip, connected_port); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to connect host %s:%d.", + connected_ip, connected_port); + goto error; + } + } + ret = _nns_edge_create_message_thread (eh, conn, client_id); if (ret != NNS_EDGE_ERROR_NONE) { - nns_edge_loge ("Failed to send host info."); + nns_edge_loge ("Failed to create message handle thread."); goto error; } - conn_data = _nns_edge_add_conn (eh, client_id); + conn_data = _nns_edge_add_connection (eh, client_id); if (conn_data) { /* Close old connection and set new one. */ - _nns_edge_close_connection (conn_data->sink_conn); - conn_data->sink_conn = conn; + _nns_edge_close_connection (conn_data->src_conn); + conn_data->src_conn = conn; done = true; } error: if (!done) { _nns_edge_close_connection (conn); - return NNS_EDGE_ERROR_CONNECTION_FAILURE; } + if (eh->listener) + g_socket_listener_accept_socket_async (eh->listener, NULL, + (GAsyncReadyCallback) _nns_edge_accept_socket_async_cb, eh); + + SAFE_FREE (connected_ip); +} + +/** + * @brief Get registered handle. If not registered, create new handle and register it. + */ +int +nns_edge_create_handle (const char *id, const char *topic, nns_edge_h * edge_h) +{ + nns_edge_handle_s *eh; + + if (!STR_IS_VALID (id)) { + nns_edge_loge ("Invalid param, given ID is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!STR_IS_VALID (topic)) { + nns_edge_loge ("Invalid param, given topic is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!edge_h) { + nns_edge_loge ("Invalid param, edge_h should not be null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + /** + * @todo manage edge handles + * 1. consider adding hash table or list to manage edge handles. + * 2. compare topic and return error if existing topic in handle is different. + */ + eh = (nns_edge_handle_s *) malloc (sizeof (nns_edge_handle_s)); + if (!eh) { + nns_edge_loge ("Failed to allocate memory for edge handle."); + return NNS_EDGE_ERROR_OUT_OF_MEMORY; + } + + memset (eh, 0, sizeof (nns_edge_handle_s)); + nns_edge_lock_init (eh); + eh->magic = NNS_EDGE_MAGIC; + eh->id = nns_edge_strdup (id); + eh->topic = nns_edge_strdup (topic); + eh->protocol = NNS_EDGE_PROTOCOL_TCP; + eh->is_server = true; + eh->recv_ip = nns_edge_strdup ("localhost"); + eh->recv_port = 0; + eh->caps_str = NULL; + + /* Connection data for each client ID. */ + eh->conn_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, + _nns_edge_remove_connection); + + *edge_h = eh; return NNS_EDGE_ERROR_NONE; } /** - * @brief Connect to the destination node. + * @brief Start the nnstreamer edge. */ int -nns_edge_connect (nns_edge_h edge_h, nns_edge_protocol_e protocol, - const char *ip, int port) +nns_edge_start (nns_edge_h edge_h, bool is_server) { + GSocketAddress *saddr = NULL; + GError *err = NULL; + int ret = 0; nns_edge_handle_s *eh; - int ret; eh = (nns_edge_handle_s *) edge_h; if (!eh) { @@ -1062,8 +1007,63 @@ nns_edge_connect (nns_edge_h edge_h, nns_edge_protocol_e protocol, return NNS_EDGE_ERROR_INVALID_PARAMETER; } - if (!ip || *ip == '\0') { - nns_edge_loge ("Invalid param, given IP is invalid."); + nns_edge_lock (eh); + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + eh->is_server = is_server; + if (!is_server && 0 == eh->recv_port) { + eh->recv_port = _get_available_port (); + if (eh->recv_port <= 0) { + nns_edge_loge ("Failed to start edge. Cannot get available port."); + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_CONNECTION_FAILURE; + } + } + + /** Initialize server src data. */ + eh->listener = g_socket_listener_new (); + g_socket_listener_set_backlog (eh->listener, N_BACKLOG); + + if (!_nns_edge_get_saddr (eh->recv_ip, eh->recv_port, &saddr)) { + nns_edge_loge ("Failed to get socket address"); + ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; + goto error; + } + if (!g_socket_listener_add_address (eh->listener, saddr, + G_SOCKET_TYPE_STREAM, G_SOCKET_PROTOCOL_TCP, NULL, NULL, &err)) { + nns_edge_loge ("Failed to add address: %s", err->message); + g_clear_error (&err); + ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; + goto error; + } + + g_socket_listener_accept_socket_async (eh->listener, NULL, + (GAsyncReadyCallback) _nns_edge_accept_socket_async_cb, eh); + +error: + if (saddr) + g_object_unref (saddr); + + nns_edge_unlock (eh); + return ret; +} + +/** + * @brief Release the given handle. + */ +int +nns_edge_release_handle (nns_edge_h edge_h) +{ + nns_edge_handle_s *eh; + + eh = (nns_edge_handle_s *) edge_h; + if (!eh) { + nns_edge_loge ("Invalid param, given edge handle is null."); return NNS_EDGE_ERROR_INVALID_PARAMETER; } @@ -1075,59 +1075,113 @@ nns_edge_connect (nns_edge_h edge_h, nns_edge_protocol_e protocol, return NNS_EDGE_ERROR_INVALID_PARAMETER; } - if (!eh->event_cb) { - nns_edge_loge ("NNStreamer-edge event callback is not registered."); - nns_edge_unlock (eh); - return NNS_EDGE_ERROR_CONNECTION_FAILURE; + eh->magic = NNS_EDGE_MAGIC_DEAD; + eh->event_cb = NULL; + eh->user_data = NULL; + + if (eh->listener) + g_clear_object (&eh->listener); + + g_hash_table_destroy (eh->conn_table); + eh->conn_table = NULL; + + SAFE_FREE (eh->id); + SAFE_FREE (eh->topic); + SAFE_FREE (eh->ip); + SAFE_FREE (eh->recv_ip); + SAFE_FREE (eh->caps_str); + + nns_edge_unlock (eh); + nns_edge_lock_destroy (eh); + SAFE_FREE (eh); + + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Set the event callback. + */ +int +nns_edge_set_event_callback (nns_edge_h edge_h, nns_edge_event_cb cb, + void *user_data) +{ + nns_edge_handle_s *eh; + int ret; + + eh = (nns_edge_handle_s *) edge_h; + if (!eh) { + nns_edge_loge ("Invalid param, given edge handle is null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; } - eh->is_server = false; - eh->protocol = protocol; + nns_edge_lock (eh); - /** Connect to info channel. */ - ret = _nns_edge_tcp_connect (eh, ip, port); + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_CALLBACK_RELEASED, + NULL, 0, NULL); if (ret != NNS_EDGE_ERROR_NONE) { - nns_edge_loge ("Failed to connect to %s:%d", ip, port); + nns_edge_loge ("Failed to set new event callback."); + nns_edge_unlock (eh); + return ret; } + eh->event_cb = cb; + eh->user_data = user_data; + nns_edge_unlock (eh); - return ret; + return NNS_EDGE_ERROR_NONE; } /** - * @brief Close connection + * @brief Connect to the destination node. */ -static bool -_nns_edge_close_connection (nns_edge_conn_s * conn) +int +nns_edge_connect (nns_edge_h edge_h, nns_edge_protocol_e protocol, + const char *ip, int port) { - GError *err = NULL; + nns_edge_handle_s *eh; + int ret; - if (!conn) - return false; + eh = (nns_edge_handle_s *) edge_h; + if (!eh) { + nns_edge_loge ("Invalid param, given edge handle is null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } - if (conn->running) { - conn->running = 0; - pthread_join (conn->msg_thread, NULL); + if (!STR_IS_VALID (ip)) { + nns_edge_loge ("Invalid param, given IP is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; } - if (conn->socket) { - if (!g_socket_close (conn->socket, &err)) { - nns_edge_loge ("Failed to close socket: %s", err->message); - g_clear_error (&err); - return false; - } - g_object_unref (conn->socket); - conn->socket = NULL; + nns_edge_lock (eh); + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_INVALID_PARAMETER; } - if (conn->cancellable) { - g_object_unref (conn->cancellable); - conn->cancellable = NULL; + if (!eh->event_cb) { + nns_edge_loge ("NNStreamer-edge event callback is not registered."); + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_CONNECTION_FAILURE; } - g_free (conn->ip); - g_free (conn); - return true; + eh->protocol = protocol; + + /** Connect to info channel. */ + ret = _nns_edge_connect_to (eh, eh->client_id, ip, port); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to connect to %s:%d", ip, port); + } + + nns_edge_unlock (eh); + return ret; } /** @@ -1222,8 +1276,8 @@ nns_edge_request (nns_edge_h edge_h, nns_edge_data_h data_h) return NNS_EDGE_ERROR_INVALID_PARAMETER; } - conn_data = _nns_edge_get_conn (eh, eh->client_id); - if (!_nns_edge_check_connection (conn_data->sink_conn)) { + conn_data = _nns_edge_get_connection (eh, eh->client_id); + if (!_nns_edge_check_connection (conn_data->sink_conn->socket)) { nns_edge_loge ("Failed to request, connection failure."); nns_edge_unlock (eh); return NNS_EDGE_ERROR_CONNECTION_FAILURE; @@ -1237,6 +1291,8 @@ nns_edge_request (nns_edge_h edge_h, nns_edge_data_h data_h) } ret = _nns_edge_cmd_send (conn_data->sink_conn, &cmd); + if (ret != NNS_EDGE_ERROR_NONE) + nns_edge_loge ("Failed to request, cannot send edge data."); nns_edge_unlock (eh); return ret; @@ -1331,7 +1387,7 @@ nns_edge_get_topic (nns_edge_h edge_h, char **topic) return NNS_EDGE_ERROR_INVALID_PARAMETER; } - *topic = g_strdup (eh->topic); + *topic = nns_edge_strdup (eh->topic); nns_edge_unlock (eh); return NNS_EDGE_ERROR_NONE; @@ -1352,6 +1408,16 @@ nns_edge_set_info (nns_edge_h edge_h, const char *key, const char *value) return NNS_EDGE_ERROR_INVALID_PARAMETER; } + if (!STR_IS_VALID (key)) { + nns_edge_loge ("Invalid param, given key is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!STR_IS_VALID (value)) { + nns_edge_loge ("Invalid param, given value is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + nns_edge_lock (eh); if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { @@ -1365,17 +1431,17 @@ nns_edge_set_info (nns_edge_h edge_h, const char *key, const char *value) * @todo Change key-value set as json or hash table. */ if (0 == g_ascii_strcasecmp (key, "CAPS")) { - ret_str = g_strdup_printf ("%s%s", _STR_NULL (eh->caps_str), value); - g_free (eh->caps_str); + ret_str = nns_edge_strdup_printf ("%s%s", _STR_NULL (eh->caps_str), value); + SAFE_FREE (eh->caps_str); eh->caps_str = ret_str; } else if (0 == g_ascii_strcasecmp (key, "IP")) { - g_free (eh->recv_ip); - eh->recv_ip = g_strdup (value); + SAFE_FREE (eh->recv_ip); + eh->recv_ip = nns_edge_strdup (value); } else if (0 == g_ascii_strcasecmp (key, "PORT")) { eh->recv_port = g_ascii_strtoll (value, NULL, 10); } else if (0 == g_ascii_strcasecmp (key, "TOPIC")) { - g_free (eh->topic); - eh->topic = g_strdup (value); + SAFE_FREE (eh->topic); + eh->topic = nns_edge_strdup (value); } else { nns_edge_logw ("Failed to set edge info. Unknown key: %s", key); } @@ -1425,9 +1491,9 @@ nns_edge_respond (nns_edge_h edge_h, nns_edge_data_h data_h) } client_id = g_ascii_strtoll (val, NULL, 10); - g_free (val); + SAFE_FREE (val); - conn_data = _nns_edge_get_conn (eh, client_id); + conn_data = _nns_edge_get_connection (eh, client_id); if (!conn_data) { nns_edge_loge ("Cannot find connection, invalid client ID."); nns_edge_unlock (eh); @@ -1442,6 +1508,8 @@ nns_edge_respond (nns_edge_h edge_h, nns_edge_data_h data_h) } ret = _nns_edge_cmd_send (conn_data->sink_conn, &cmd); + if (ret != NNS_EDGE_ERROR_NONE) + nns_edge_loge ("Failed to respond, cannot send edge data."); nns_edge_unlock (eh); return ret; diff --git a/gst/nnstreamer/tensor_query/nnstreamer_edge_mqtt.c b/gst/nnstreamer/tensor_query/nnstreamer_edge_mqtt.c index 3dc49dd..69a7c49 100644 --- a/gst/nnstreamer/tensor_query/nnstreamer_edge_mqtt.c +++ b/gst/nnstreamer/tensor_query/nnstreamer_edge_mqtt.c @@ -16,8 +16,8 @@ #include #include -#include "nnstreamer_edge_common.h" -#include "nnstreamer_edge_internal.h" +#include "nnstreamer-edge-common.h" +#include "nnstreamer-edge-internal.h" /** * @brief Callback function to be called when the connection is lost. -- 2.7.4