-/* SPDX-License-Identifier: LGPL-2.1-only */
+/* SPDX-License-Identifier: Apache-2.0 */
/**
* Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
*
- * @file nnstreamer_edge_internal.c
+ * @file nnstreamer-edge-internal.c
* @date 6 April 2022
* @brief Common library to support communication among devices.
* @see https://github.com/nnstreamer/nnstreamer
* @bug No known bugs except for NYI items
*/
-#include "nnstreamer_edge_common.h"
-#include "nnstreamer_edge_internal.h"
+#include "nnstreamer-edge-common.h"
+#include "nnstreamer-edge-internal.h"
#define N_BACKLOG 10
#define DEFAULT_TIMEOUT_SEC 10
*/
typedef struct
{
+ unsigned int magic;
nns_edge_cmd_e cmd;
int64_t client_id;
} nns_edge_cmd_s;
/**
- * @brief Data structure for edge TCP connection.
+ * @brief Data structure for edge connection.
*/
typedef struct
{
int8_t running;
pthread_t msg_thread;
GSocket *socket;
- GCancellable *cancellable;
} nns_edge_conn_s;
/**
nns_edge_conn_s *conn;
} nns_edge_thread_data_s;
-static bool _nns_edge_close_connection (nns_edge_conn_s * conn);
-static int _nns_edge_create_message_thread (nns_edge_handle_s * eh,
- nns_edge_conn_s * conn, int64_t client_id);
-static int _nns_edge_tcp_connect (nns_edge_handle_s * eh, const char *ip,
- int port);
-
/**
* @brief Send data to connected socket.
*/
static bool
-_send_raw_data (GSocket * socket, void *data, size_t size,
- GCancellable * cancellable)
+_send_raw_data (GSocket * socket, void *data, size_t size)
{
size_t bytes_sent = 0;
ssize_t rret;
while (bytes_sent < size) {
rret = g_socket_send (socket, (char *) data + bytes_sent,
- size - bytes_sent, cancellable, &err);
+ size - bytes_sent, NULL, &err);
if (rret == 0) {
nns_edge_loge ("Connection closed.");
}
/**
+ * @brief Internal function to check connection.
+ */
+static bool
+_nns_edge_check_connection (GSocket * socket)
+{
+ GIOCondition condition;
+
+ if (!socket || g_socket_is_closed (socket))
+ return false;
+
+ condition = g_socket_condition_check (socket,
+ G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP);
+
+ if (!condition || (condition & (G_IO_ERR | G_IO_HUP))) {
+ nns_edge_logw ("Socket is not available, possibly closed.");
+ return false;
+ }
+
+ return true;
+}
+
+/**
* @brief Receive data from connected socket.
*/
static bool
-_receive_raw_data (GSocket * socket, void *data, size_t size,
- GCancellable * cancellable)
+_receive_raw_data (GSocket * socket, void *data, size_t size)
{
size_t bytes_received = 0;
ssize_t rret;
while (bytes_received < size) {
rret = g_socket_receive (socket, (char *) data + bytes_received,
- size - bytes_received, cancellable, &err);
+ size - bytes_received, NULL, &err);
if (rret == 0) {
nns_edge_loge ("Connection closed.");
static void
_get_host_str (const char *ip, const int port, char **host)
{
- *host = g_strdup_printf ("%s:%d", ip, port);
+ *host = nns_edge_strdup_printf ("%s:%d", ip, port);
+}
+
+/**
+ * @brief Get available port number.
+ */
+static int
+_get_available_port (void)
+{
+ struct sockaddr_in sin;
+ int port = 0, sock;
+ socklen_t len = sizeof (struct sockaddr);
+
+ sin.sin_family = AF_INET;
+ sin.sin_addr.s_addr = INADDR_ANY;
+ sock = socket (AF_INET, SOCK_STREAM, 0);
+ if (sock < 0) {
+ nns_edge_loge ("Failed to get available port. Socket creation failure.");
+ return 0;
+ }
+ sin.sin_port = port;
+ if (bind (sock, (struct sockaddr *) &sin, sizeof (struct sockaddr)) == 0) {
+ if (getsockname (sock, (struct sockaddr *) &sin, &len) == 0) {
+ port = ntohs (sin.sin_port);
+ nns_edge_logi ("Available port number: %d", port);
+ } else {
+ nns_edge_logw ("Failed to read local socket info.");
+ }
+ }
+ close (sock);
+
+ return port;
}
/**
return;
memset (cmd, 0, sizeof (nns_edge_cmd_s));
+ cmd->info.magic = NNS_EDGE_MAGIC;
cmd->info.cmd = c;
cmd->info.client_id = cid;
}
if (!cmd)
return;
+ cmd->info.magic = NNS_EDGE_MAGIC_DEAD;
+
for (i = 0; i < cmd->info.num; i++) {
- if (cmd->mem[i])
- free (cmd->mem[i]);
- cmd->mem[i] = NULL;
+ SAFE_FREE (cmd->mem[i]);
+ }
+}
+
+/**
+ * @brief Validate edge command.
+ */
+static bool
+_nns_edge_cmd_is_valid (nns_edge_cmd_s * cmd)
+{
+ int command;
+
+ if (!cmd)
+ return false;
+
+ command = (int) cmd->info.cmd;
+
+ if (!NNS_EDGE_MAGIC_IS_VALID (&cmd->info) ||
+ (command < 0 || command >= _NNS_EDGE_CMD_END)) {
+ return false;
}
+
+ return true;
}
/**
{
unsigned int n;
- if (!conn || !cmd)
+ if (!conn) {
+ nns_edge_loge ("Failed to send command, edge connection is null.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ if (!_nns_edge_cmd_is_valid (cmd)) {
+ nns_edge_loge ("Failed to send command, invalid command.");
return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ if (!_nns_edge_check_connection (conn->socket)) {
+ nns_edge_loge ("Failed to send command, socket has error.");
+ return NNS_EDGE_ERROR_IO;
+ }
- if (!_send_raw_data (conn->socket, &cmd->info,
- sizeof (nns_edge_cmd_info_s), conn->cancellable)) {
+ if (!_send_raw_data (conn->socket, &cmd->info, sizeof (nns_edge_cmd_info_s))) {
nns_edge_loge ("Failed to send command to socket.");
return NNS_EDGE_ERROR_IO;
}
for (n = 0; n < cmd->info.num; n++) {
- if (!_send_raw_data (conn->socket, cmd->mem[n],
- cmd->info.mem_size[n], conn->cancellable)) {
+ if (!_send_raw_data (conn->socket, cmd->mem[n], cmd->info.mem_size[n])) {
nns_edge_loge ("Failed to send %uth memory to socket.", n);
return NNS_EDGE_ERROR_IO;
}
if (!conn || !cmd)
return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ if (!_nns_edge_check_connection (conn->socket)) {
+ nns_edge_loge ("Failed to receive command, socket has error.");
+ return NNS_EDGE_ERROR_IO;
+ }
+
if (!_receive_raw_data (conn->socket, &cmd->info,
- sizeof (nns_edge_cmd_info_s), conn->cancellable)) {
+ sizeof (nns_edge_cmd_info_s))) {
nns_edge_loge ("Failed to receive command from socket.");
return NNS_EDGE_ERROR_IO;
}
+ if (!_nns_edge_cmd_is_valid (cmd)) {
+ nns_edge_loge ("Failed to receive command, invalid command.");
+ return NNS_EDGE_ERROR_IO;
+ }
+
nns_edge_logd ("Received command:%d (num:%u)", cmd->info.cmd, cmd->info.num);
if (cmd->info.num >= NNS_EDGE_DATA_LIMIT) {
nns_edge_loge ("Invalid request, the max memories for data transfer is %d.",
break;
}
- if (!_receive_raw_data (conn->socket, cmd->mem[n],
- cmd->info.mem_size[n], conn->cancellable)) {
+ if (!_receive_raw_data (conn->socket, cmd->mem[n], cmd->info.mem_size[n])) {
nns_edge_loge ("Failed to receive %uth memory from socket.", n++);
ret = NNS_EDGE_ERROR_IO;
break;
if (ret != NNS_EDGE_ERROR_NONE) {
for (i = 0; i < n; i++) {
- free (cmd->mem[i]);
- cmd->mem[i] = NULL;
+ SAFE_FREE (cmd->mem[i]);
}
}
}
/**
+ * @brief Close connection
+ */
+static bool
+_nns_edge_close_connection (nns_edge_conn_s * conn)
+{
+ if (!conn)
+ return false;
+
+ if (conn->running && conn->msg_thread) {
+ conn->running = 0;
+ pthread_cancel (conn->msg_thread);
+ pthread_join (conn->msg_thread, NULL);
+ conn->msg_thread = 0;
+ }
+
+ if (conn->socket) {
+ nns_edge_cmd_s cmd;
+
+ /* Send error before closing the socket. */
+ nns_edge_logd ("Send error cmd to close connection.");
+ _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, 0);
+ _nns_edge_cmd_send (conn, &cmd);
+
+ /* Release socket. If its last reference is dropped, it will close socket automatically. */
+ g_clear_object (&conn->socket);
+ }
+
+ SAFE_FREE (conn->ip);
+ SAFE_FREE (conn);
+ return true;
+}
+
+/**
* @brief Get nnstreamer-edge connection data.
* @note This function should be called with handle lock.
*/
static nns_edge_conn_data_s *
-_nns_edge_get_conn (nns_edge_handle_s * eh, int64_t client_id)
+_nns_edge_get_connection (nns_edge_handle_s * eh, int64_t client_id)
{
if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
nns_edge_loge ("Invalid param, given edge handle is invalid.");
* @note This function should be called with handle lock.
*/
static nns_edge_conn_data_s *
-_nns_edge_add_conn (nns_edge_handle_s * eh, int64_t client_id)
+_nns_edge_add_connection (nns_edge_handle_s * eh, int64_t client_id)
{
nns_edge_conn_data_s *data = NULL;
* @brief Remove nnstreamer-edge connection data. This will be called when removing connection data from hash table.
*/
static void
-_nns_edge_remove_conn (gpointer data)
+_nns_edge_remove_connection (gpointer data)
{
nns_edge_conn_data_s *cdata = (nns_edge_conn_data_s *) data;
if (cdata) {
_nns_edge_close_connection (cdata->src_conn);
_nns_edge_close_connection (cdata->sink_conn);
+ cdata->src_conn = cdata->sink_conn = NULL;
- g_free (cdata);
- }
-}
-
-/**
- * @brief Internal function to check connection.
- */
-static bool
-_nns_edge_check_connection (nns_edge_conn_s * conn)
-{
- size_t size;
- GIOCondition condition;
-
- if (!conn)
- return false;
-
- condition = g_socket_condition_check (conn->socket,
- G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
- size = g_socket_get_available_bytes (conn->socket);
-
- if (condition && size <= 0) {
- nns_edge_logw ("Socket is not available, possibly EOS.");
- return false;
+ SAFE_FREE (cdata);
}
-
- return true;
}
/**
* @brief Get socket address
*/
static bool
-_nns_edge_get_saddr (const char *ip, const int port,
- GCancellable * cancellable, GSocketAddress ** saddr)
+_nns_edge_get_saddr (const char *ip, const int port, GSocketAddress ** saddr)
{
GError *err = NULL;
GInetAddress *addr;
GList *results;
GResolver *resolver;
resolver = g_resolver_get_default ();
- results = g_resolver_lookup_by_name (resolver, ip, cancellable, &err);
+ results = g_resolver_lookup_by_name (resolver, ip, NULL, &err);
if (!results) {
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
nns_edge_loge ("Failed to resolve ip, name resolver is cancelled.");
}
/**
- * @brief Get registered handle. If not registered, create new handle and register it.
+ * @brief Connect to requested socket.
*/
-int
-nns_edge_create_handle (const char *id, const char *topic, nns_edge_h * edge_h)
+static bool
+_nns_edge_connect_socket (nns_edge_conn_s * conn)
{
- nns_edge_handle_s *eh;
+ GError *err = NULL;
+ GSocketAddress *saddr = NULL;
+ bool ret = false;
- if (!id || *id == '\0') {
- nns_edge_loge ("Invalid param, given ID is invalid.");
- return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ if (!_nns_edge_get_saddr (conn->ip, conn->port, &saddr)) {
+ nns_edge_loge ("Failed to get socket address");
+ return ret;
}
- if (!topic || *topic == '\0') {
- nns_edge_loge ("Invalid param, given topic is invalid.");
- return NNS_EDGE_ERROR_INVALID_PARAMETER;
- }
+ /* create sending client socket */
+ conn->socket =
+ g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM,
+ G_SOCKET_PROTOCOL_TCP, &err);
- if (!edge_h) {
- nns_edge_loge ("Invalid param, edge_h should not be null.");
- return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ if (!conn->socket) {
+ nns_edge_loge ("Failed to create new socket");
+ goto done;
}
- /**
- * @todo manage edge handles
- * 1. consider adding hash table or list to manage edge handles.
- * 2. compare topic and return error if existing topic in handle is different.
- */
- eh = (nns_edge_handle_s *) malloc (sizeof (nns_edge_handle_s));
- if (!eh) {
- nns_edge_loge ("Failed to allocate memory for edge handle.");
- return NNS_EDGE_ERROR_OUT_OF_MEMORY;
+ /* setting TCP_NODELAY to true in order to avoid packet batching as known as Nagle's algorithm */
+ if (!g_socket_set_option (conn->socket, IPPROTO_TCP, TCP_NODELAY, true, &err)) {
+ nns_edge_loge ("Failed to set socket TCP_NODELAY option: %s", err->message);
+ goto done;
}
- memset (eh, 0, sizeof (nns_edge_handle_s));
- nns_edge_lock_init (eh);
- eh->magic = NNS_EDGE_MAGIC;
- eh->id = g_strdup (id);
- eh->topic = g_strdup (topic);
- eh->protocol = NNS_EDGE_PROTOCOL_TCP;
- eh->is_server = true;
- eh->recv_ip = g_strdup ("localhost");
- eh->recv_port = 0;
- eh->caps_str = NULL;
+ if (!g_socket_connect (conn->socket, saddr, NULL, &err)) {
+ if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+ nns_edge_logd ("Cancelled connecting");
+ } else {
+ nns_edge_loge ("Failed to connect to host, %s:%d", conn->ip, conn->port);
+ }
+ goto done;
+ }
- /* Connection data for each client ID. */
- eh->conn_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
- _nns_edge_remove_conn);
+ /* now connected to the requested socket */
+ ret = true;
- *edge_h = eh;
- return NNS_EDGE_ERROR_NONE;
+done:
+ g_object_unref (saddr);
+ g_clear_error (&err);
+ return ret;
}
/**
- * @brief [TCP] Callback for src socket listener that pushes socket to the queue
+ * @brief Connect to the destination node. (host:sender(sink) - dest:receiver(listener, src))
*/
-static void
-accept_socket_async_cb (GObject * source, GAsyncResult * result,
- gpointer user_data)
+static int
+_nns_edge_connect_to (nns_edge_handle_s * eh, int64_t client_id,
+ const char *ip, int port)
{
- GSocketListener *socket_listener = G_SOCKET_LISTENER (source);
- GSocket *socket = NULL;
- GError *err = NULL;
- nns_edge_handle_s *eh = (nns_edge_handle_s *) user_data;
nns_edge_conn_s *conn = NULL;
+ nns_edge_conn_data_s *conn_data;
nns_edge_cmd_s cmd;
+ char *host;
bool done = false;
- char *connected_ip = NULL;
- int connected_port = 0;
- nns_edge_conn_data_s *conn_data = NULL;
- int64_t client_id;
int ret;
- socket =
- g_socket_listener_accept_socket_finish (socket_listener, result, NULL,
- &err);
-
- if (!socket) {
- nns_edge_loge ("Failed to get socket: %s", err->message);
- g_clear_error (&err);
- goto error;
- }
- g_socket_set_timeout (socket, DEFAULT_TIMEOUT_SEC);
-
- /* create socket with connection */
conn = (nns_edge_conn_s *) malloc (sizeof (nns_edge_conn_s));
if (!conn) {
- nns_edge_loge ("Failed to allocate edge connection");
+ nns_edge_loge ("Failed to allocate client data.");
goto error;
}
memset (conn, 0, sizeof (nns_edge_conn_s));
- conn->socket = socket;
- conn->cancellable = g_cancellable_new ();
+ conn->ip = nns_edge_strdup (ip);
+ conn->port = port;
- /* setting TCP_NODELAY to true in order to avoid packet batching as known as Nagle's algorithm */
- if (!g_socket_set_option (socket, IPPROTO_TCP, TCP_NODELAY, true, &err)) {
- nns_edge_loge ("Failed to set socket TCP_NODELAY option: %s", err->message);
- g_clear_error (&err);
+ if (!_nns_edge_connect_socket (conn)) {
goto error;
}
- /* Send capability and info to check compatibility. */
- client_id = eh->is_server ? g_get_monotonic_time () : eh->client_id;
-
- _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_CAPABILITY, client_id);
- cmd.info.num = 1;
- cmd.info.mem_size[0] = strlen (eh->caps_str) + 1;
- cmd.mem[0] = eh->caps_str;
+ if (!eh->is_server) {
+ /* Receive capability and client ID from server. */
+ _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id);
+ ret = _nns_edge_cmd_receive (conn, &cmd);
+ if (ret != NNS_EDGE_ERROR_NONE) {
+ nns_edge_loge ("Failed to receive capability.");
+ goto error;
+ }
- ret = _nns_edge_cmd_send (conn, &cmd);
- if (ret != NNS_EDGE_ERROR_NONE) {
- nns_edge_loge ("Failed to send capability.");
- goto error;
- }
+ if (cmd.info.cmd != _NNS_EDGE_CMD_CAPABILITY) {
+ nns_edge_loge ("Failed to get capability.");
+ _nns_edge_cmd_clear (&cmd);
+ goto error;
+ }
- /* Receive ip and port from destination. */
- ret = _nns_edge_cmd_receive (conn, &cmd);
- if (ret != NNS_EDGE_ERROR_NONE) {
- nns_edge_loge ("Failed to receive node info.");
- goto error;
- }
+ client_id = eh->client_id = cmd.info.client_id;
- if (cmd.info.cmd != _NNS_EDGE_CMD_HOST_INFO) {
- nns_edge_loge ("Failed to get host info.");
+ /* Check compatibility. */
+ ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_CAPABILITY,
+ cmd.mem[0], cmd.info.mem_size[0], NULL);
_nns_edge_cmd_clear (&cmd);
- goto error;
- }
- _parse_host_str (cmd.mem[0], &connected_ip, &connected_port);
- _nns_edge_cmd_clear (&cmd);
+ if (ret != NNS_EDGE_ERROR_NONE) {
+ nns_edge_loge ("The event returns error, capability is not acceptable.");
+ _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id);
+ } else {
+ /* Send ip and port to destination. */
+ _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_HOST_INFO, client_id);
- ret = _nns_edge_create_message_thread (eh, conn, client_id);
- if (ret != NNS_EDGE_ERROR_NONE) {
- nns_edge_loge ("Failed to create message handle thread.");
- goto error;
+ _get_host_str (eh->recv_ip, eh->recv_port, &host);
+ cmd.info.num = 1;
+ cmd.info.mem_size[0] = strlen (host) + 1;
+ cmd.mem[0] = host;
+ }
+
+ ret = _nns_edge_cmd_send (conn, &cmd);
+ _nns_edge_cmd_clear (&cmd);
+
+ if (ret != NNS_EDGE_ERROR_NONE) {
+ nns_edge_loge ("Failed to send host info.");
+ goto error;
+ }
}
- conn_data = _nns_edge_add_conn (eh, client_id);
+ conn_data = _nns_edge_add_connection (eh, client_id);
if (conn_data) {
/* Close old connection and set new one. */
- _nns_edge_close_connection (conn_data->src_conn);
- conn_data->src_conn = conn;
+ _nns_edge_close_connection (conn_data->sink_conn);
+ conn_data->sink_conn = conn;
done = true;
}
error:
- if (done) {
- if (eh->is_server) {
- _nns_edge_tcp_connect (eh, connected_ip, connected_port);
- }
- } else {
+ if (!done) {
_nns_edge_close_connection (conn);
+ return NNS_EDGE_ERROR_CONNECTION_FAILURE;
}
- g_socket_listener_accept_socket_async (socket_listener,
- eh->cancellable, (GAsyncReadyCallback) accept_socket_async_cb, eh);
-
- g_free (connected_ip);
+ return NNS_EDGE_ERROR_NONE;
}
/**
- * @brief Get available port number.
+ * @brief Message thread, receive buffer from the client.
*/
-static int
-_get_available_port (void)
-{
- struct sockaddr_in sin;
- int port = 0, sock;
- socklen_t len = sizeof (struct sockaddr);
-
- sin.sin_family = AF_INET;
- sin.sin_addr.s_addr = INADDR_ANY;
- sock = socket (AF_INET, SOCK_STREAM, 0);
- sin.sin_port = port;
- if (bind (sock, (struct sockaddr *) &sin, sizeof (struct sockaddr)) == 0) {
- getsockname (sock, (struct sockaddr *) &sin, &len);
- port = ntohs (sin.sin_port);
- nns_edge_logi ("Available port number: %d", port);
- }
- close (sock);
-
- return port;
-}
-
-/**
- * @brief Initialize the nnstreamer edge handle.
- */
-int
-nns_edge_start (nns_edge_h edge_h, bool is_server)
-{
- GSocketAddress *saddr = NULL;
- GError *err = NULL;
- int ret = 0;
- nns_edge_handle_s *eh;
-
- eh = (nns_edge_handle_s *) edge_h;
- if (!eh) {
- nns_edge_loge ("Invalid param, given edge handle is null.");
- return NNS_EDGE_ERROR_INVALID_PARAMETER;
- }
-
- nns_edge_lock (eh);
-
- if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
- nns_edge_unlock (eh);
- return NNS_EDGE_ERROR_INVALID_PARAMETER;
- }
-
- eh->is_server = is_server;
- if (!is_server && 0 == eh->recv_port)
- eh->recv_port = _get_available_port ();
-
- /** Initialize server src data. */
- eh->cancellable = g_cancellable_new ();
- eh->listener = g_socket_listener_new ();
- g_socket_listener_set_backlog (eh->listener, N_BACKLOG);
-
- if (!_nns_edge_get_saddr (eh->recv_ip, eh->recv_port, eh->cancellable,
- &saddr)) {
- nns_edge_loge ("Failed to get socket address");
- ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
- goto error;
- }
- if (!g_socket_listener_add_address (eh->listener, saddr,
- G_SOCKET_TYPE_STREAM, G_SOCKET_PROTOCOL_TCP, NULL, NULL, &err)) {
- nns_edge_loge ("Failed to add address: %s", err->message);
- g_clear_error (&err);
- ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
- goto error;
- }
- g_object_unref (saddr);
- saddr = NULL;
-
- g_socket_listener_accept_socket_async (eh->listener,
- eh->cancellable, (GAsyncReadyCallback) accept_socket_async_cb, eh);
-
-error:
- if (saddr)
- g_object_unref (saddr);
-
- nns_edge_unlock (eh);
- return ret;
-}
-
-/**
- * @brief Release the given handle.
- */
-int
-nns_edge_release_handle (nns_edge_h edge_h)
-{
- nns_edge_handle_s *eh;
-
- eh = (nns_edge_handle_s *) edge_h;
- if (!eh) {
- nns_edge_loge ("Invalid param, given edge handle is null.");
- return NNS_EDGE_ERROR_INVALID_PARAMETER;
- }
-
- nns_edge_lock (eh);
-
- if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
- nns_edge_unlock (eh);
- return NNS_EDGE_ERROR_INVALID_PARAMETER;
- }
-
- eh->magic = NNS_EDGE_MAGIC_DEAD;
- eh->event_cb = NULL;
- eh->user_data = NULL;
- g_free (eh->id);
- g_free (eh->topic);
- g_free (eh->ip);
- g_free (eh->recv_ip);
- g_free (eh->caps_str);
- g_hash_table_destroy (eh->conn_table);
-
- nns_edge_unlock (eh);
- nns_edge_lock_destroy (eh);
- g_free (eh);
-
- return NNS_EDGE_ERROR_NONE;
-}
-
-/**
- * @brief Set the event callback.
- */
-int
-nns_edge_set_event_callback (nns_edge_h edge_h, nns_edge_event_cb cb,
- void *user_data)
-{
- nns_edge_handle_s *eh;
- int ret;
-
- eh = (nns_edge_handle_s *) edge_h;
- if (!eh) {
- nns_edge_loge ("Invalid param, given edge handle is null.");
- return NNS_EDGE_ERROR_INVALID_PARAMETER;
- }
-
- nns_edge_lock (eh);
-
- if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
- nns_edge_unlock (eh);
- return NNS_EDGE_ERROR_INVALID_PARAMETER;
- }
-
- ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_CALLBACK_RELEASED,
- NULL, 0, NULL);
- if (ret != NNS_EDGE_ERROR_NONE) {
- nns_edge_loge ("Failed to set new event callback.");
- nns_edge_unlock (eh);
- return ret;
- }
-
- eh->event_cb = cb;
- eh->user_data = user_data;
-
- nns_edge_unlock (eh);
- return NNS_EDGE_ERROR_NONE;
-}
-
-/**
- * @brief Connect to requested socket using TCP.
- */
-static bool
-_nns_edge_connect_socket (nns_edge_conn_s * conn)
-{
- GError *err = NULL;
- GSocketAddress *saddr = NULL;
- bool ret = false;
-
- if (!_nns_edge_get_saddr (conn->ip, conn->port, conn->cancellable, &saddr)) {
- nns_edge_loge ("Failed to get socket address");
- return ret;
- }
-
- /* create sending client socket */
- conn->socket =
- g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM,
- G_SOCKET_PROTOCOL_TCP, &err);
-
- if (!conn->socket) {
- nns_edge_loge ("Failed to create new socket");
- goto done;
- }
-
- /* setting TCP_NODELAY to true in order to avoid packet batching as known as Nagle's algorithm */
- if (!g_socket_set_option (conn->socket, IPPROTO_TCP, TCP_NODELAY, true, &err)) {
- nns_edge_loge ("Failed to set socket TCP_NODELAY option: %s", err->message);
- goto done;
- }
-
- if (!g_socket_connect (conn->socket, saddr, conn->cancellable, &err)) {
- if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
- nns_edge_logd ("Cancelled connecting");
- } else {
- nns_edge_loge ("Failed to connect to host, %s:%d", conn->ip, conn->port);
- }
- goto done;
- }
-
- /* now connected to the requested socket */
- ret = true;
-
-done:
- g_object_unref (saddr);
- g_clear_error (&err);
- return ret;
-}
-
-/**
- * @brief [TCP] Receive buffer from the client
- * @param[in] conn connection info
- */
-static void *
-_message_handler (void *thread_data)
+static void *
+_nns_edge_message_handler (void *thread_data)
{
nns_edge_thread_data_s *_tdata = (nns_edge_thread_data_s *) thread_data;
nns_edge_handle_s *eh;
eh = (nns_edge_handle_s *) _tdata->eh;
conn = _tdata->conn;
client_id = _tdata->client_id;
- g_free (_tdata);
+ SAFE_FREE (_tdata);
+
+ /* Start message thread, add ref to clearly make the socket live. */
+ conn->running = 1;
while (conn->running) {
nns_edge_data_h data_h;
break;
}
- if (!_nns_edge_check_connection (conn))
- break;
-
/** Receive data from the client */
+ _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id);
ret = _nns_edge_cmd_receive (conn, &cmd);
if (ret != NNS_EDGE_ERROR_NONE) {
nns_edge_loge ("Failed to receive data from the connected node.");
}
/* Set client ID in edge data */
- val = g_strdup_printf ("%ld", (long int) client_id);
+ val = nns_edge_strdup_printf ("%ld", (long int) client_id);
nns_edge_data_set_info (data_h, "client_id", val);
- g_free (val);
+ SAFE_FREE (val);
for (i = 0; i < cmd.info.num; i++) {
nns_edge_data_add (data_h, cmd.mem[i], cmd.info.mem_size[i], NULL);
}
ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_NEW_DATA_RECEIVED,
- data_h, sizeof (data_h), NULL);
+ data_h, sizeof (nns_edge_data_h), NULL);
if (ret != NNS_EDGE_ERROR_NONE) {
/* Try to get next request if server does not accept data from client. */
nns_edge_logw ("The server does not accept data from client.");
/** Create message receving thread */
pthread_attr_init (&attr);
- pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
- conn->running = 1;
+ pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_JOINABLE);
+
thread_data->eh = eh;
thread_data->conn = conn;
thread_data->client_id = client_id;
- tid =
- pthread_create (&conn->msg_thread, &attr, _message_handler, thread_data);
+ tid = pthread_create (&conn->msg_thread, &attr, _nns_edge_message_handler,
+ thread_data);
pthread_attr_destroy (&attr);
if (tid < 0) {
nns_edge_loge ("Failed to create message handler thread.");
conn->running = 0;
- g_free (thread_data);
+ SAFE_FREE (thread_data);
return NNS_EDGE_ERROR_IO;
}
}
/**
- * @brief Connect to the destination node usig TCP.
+ * @brief Callback for socket listener, accept socket and create message thread.
*/
-static int
-_nns_edge_tcp_connect (nns_edge_handle_s * eh, const char *ip, int port)
+static void
+_nns_edge_accept_socket_async_cb (GObject * source, GAsyncResult * result,
+ gpointer user_data)
{
+ GSocketListener *socket_listener = G_SOCKET_LISTENER (source);
+ GSocket *socket = NULL;
+ GError *err = NULL;
+ nns_edge_handle_s *eh = (nns_edge_handle_s *) user_data;
nns_edge_conn_s *conn = NULL;
- nns_edge_conn_data_s *conn_data;
nns_edge_cmd_s cmd;
- char *host;
- int64_t client_id;
bool done = false;
+ char *connected_ip = NULL;
+ int connected_port = 0;
+ nns_edge_conn_data_s *conn_data = NULL;
+ int64_t client_id;
int ret;
+ socket =
+ g_socket_listener_accept_socket_finish (socket_listener, result, NULL,
+ &err);
+
+ if (!socket) {
+ nns_edge_loge ("Failed to get socket: %s", err->message);
+ g_clear_error (&err);
+ return;
+ }
+ g_socket_set_timeout (socket, DEFAULT_TIMEOUT_SEC);
+
+ /* create socket with connection */
conn = (nns_edge_conn_s *) malloc (sizeof (nns_edge_conn_s));
if (!conn) {
- nns_edge_loge ("Failed to allocate client data.");
+ nns_edge_loge ("Failed to allocate edge connection");
goto error;
}
memset (conn, 0, sizeof (nns_edge_conn_s));
- conn->ip = g_strdup (ip);
- conn->port = port;
- conn->cancellable = g_cancellable_new ();
+ conn->socket = socket;
- if (!_nns_edge_connect_socket (conn)) {
+ /* setting TCP_NODELAY to true in order to avoid packet batching as known as Nagle's algorithm */
+ if (!g_socket_set_option (socket, IPPROTO_TCP, TCP_NODELAY, true, &err)) {
+ nns_edge_loge ("Failed to set socket TCP_NODELAY option: %s", err->message);
+ g_clear_error (&err);
goto error;
}
- /* Get destination capability. */
- ret = _nns_edge_cmd_receive (conn, &cmd);
- if (ret != NNS_EDGE_ERROR_NONE) {
- nns_edge_loge ("Failed to receive capability.");
- goto error;
- }
+ client_id = eh->is_server ? g_get_monotonic_time () : eh->client_id;
- if (cmd.info.cmd != _NNS_EDGE_CMD_CAPABILITY) {
- nns_edge_loge ("Failed to get capability.");
- _nns_edge_cmd_clear (&cmd);
- goto error;
- }
+ /* Send capability and info to check compatibility. */
+ if (eh->is_server) {
+ if (!STR_IS_VALID (eh->caps_str)) {
+ nns_edge_loge ("Cannot accept socket, invalid capability.");
+ goto error;
+ }
- client_id = eh->client_id = cmd.info.client_id;
+ _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_CAPABILITY, client_id);
+ cmd.info.num = 1;
+ cmd.info.mem_size[0] = strlen (eh->caps_str) + 1;
+ cmd.mem[0] = eh->caps_str;
- /* Check compatibility. */
- ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_CAPABILITY,
- cmd.mem[0], cmd.info.mem_size[0], NULL);
- _nns_edge_cmd_clear (&cmd);
+ ret = _nns_edge_cmd_send (conn, &cmd);
+ if (ret != NNS_EDGE_ERROR_NONE) {
+ nns_edge_loge ("Failed to send capability.");
+ goto error;
+ }
- if (ret != NNS_EDGE_ERROR_NONE) {
- nns_edge_loge ("The event returns error, capability is not acceptable.");
- _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id);
- } else {
- /* Send ip and port to destination. */
- _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_HOST_INFO, client_id);
+ /* Receive ip and port from destination. */
+ ret = _nns_edge_cmd_receive (conn, &cmd);
+ if (ret != NNS_EDGE_ERROR_NONE) {
+ nns_edge_loge ("Failed to receive node info.");
+ goto error;
+ }
- _get_host_str (eh->recv_ip, eh->recv_port, &host);
- cmd.info.num = 1;
- cmd.info.mem_size[0] = strlen (host) + 1;
- cmd.mem[0] = host;
- }
+ if (cmd.info.cmd != _NNS_EDGE_CMD_HOST_INFO) {
+ nns_edge_loge ("Failed to get host info.");
+ _nns_edge_cmd_clear (&cmd);
+ goto error;
+ }
+
+ _parse_host_str (cmd.mem[0], &connected_ip, &connected_port);
+ _nns_edge_cmd_clear (&cmd);
- ret = _nns_edge_cmd_send (conn, &cmd);
- _nns_edge_cmd_clear (&cmd);
+ /* Connect to client listener. */
+ ret = _nns_edge_connect_to (eh, client_id, connected_ip, connected_port);
+ if (ret != NNS_EDGE_ERROR_NONE) {
+ nns_edge_loge ("Failed to connect host %s:%d.",
+ connected_ip, connected_port);
+ goto error;
+ }
+ }
+ ret = _nns_edge_create_message_thread (eh, conn, client_id);
if (ret != NNS_EDGE_ERROR_NONE) {
- nns_edge_loge ("Failed to send host info.");
+ nns_edge_loge ("Failed to create message handle thread.");
goto error;
}
- conn_data = _nns_edge_add_conn (eh, client_id);
+ conn_data = _nns_edge_add_connection (eh, client_id);
if (conn_data) {
/* Close old connection and set new one. */
- _nns_edge_close_connection (conn_data->sink_conn);
- conn_data->sink_conn = conn;
+ _nns_edge_close_connection (conn_data->src_conn);
+ conn_data->src_conn = conn;
done = true;
}
error:
if (!done) {
_nns_edge_close_connection (conn);
- return NNS_EDGE_ERROR_CONNECTION_FAILURE;
}
+ if (eh->listener)
+ g_socket_listener_accept_socket_async (eh->listener, NULL,
+ (GAsyncReadyCallback) _nns_edge_accept_socket_async_cb, eh);
+
+ SAFE_FREE (connected_ip);
+}
+
+/**
+ * @brief Get registered handle. If not registered, create new handle and register it.
+ */
+int
+nns_edge_create_handle (const char *id, const char *topic, nns_edge_h * edge_h)
+{
+ nns_edge_handle_s *eh;
+
+ if (!STR_IS_VALID (id)) {
+ nns_edge_loge ("Invalid param, given ID is invalid.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ if (!STR_IS_VALID (topic)) {
+ nns_edge_loge ("Invalid param, given topic is invalid.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ if (!edge_h) {
+ nns_edge_loge ("Invalid param, edge_h should not be null.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ /**
+ * @todo manage edge handles
+ * 1. consider adding hash table or list to manage edge handles.
+ * 2. compare topic and return error if existing topic in handle is different.
+ */
+ eh = (nns_edge_handle_s *) malloc (sizeof (nns_edge_handle_s));
+ if (!eh) {
+ nns_edge_loge ("Failed to allocate memory for edge handle.");
+ return NNS_EDGE_ERROR_OUT_OF_MEMORY;
+ }
+
+ memset (eh, 0, sizeof (nns_edge_handle_s));
+ nns_edge_lock_init (eh);
+ eh->magic = NNS_EDGE_MAGIC;
+ eh->id = nns_edge_strdup (id);
+ eh->topic = nns_edge_strdup (topic);
+ eh->protocol = NNS_EDGE_PROTOCOL_TCP;
+ eh->is_server = true;
+ eh->recv_ip = nns_edge_strdup ("localhost");
+ eh->recv_port = 0;
+ eh->caps_str = NULL;
+
+ /* Connection data for each client ID. */
+ eh->conn_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
+ _nns_edge_remove_connection);
+
+ *edge_h = eh;
return NNS_EDGE_ERROR_NONE;
}
/**
- * @brief Connect to the destination node.
+ * @brief Start the nnstreamer edge.
*/
int
-nns_edge_connect (nns_edge_h edge_h, nns_edge_protocol_e protocol,
- const char *ip, int port)
+nns_edge_start (nns_edge_h edge_h, bool is_server)
{
+ GSocketAddress *saddr = NULL;
+ GError *err = NULL;
+ int ret = 0;
nns_edge_handle_s *eh;
- int ret;
eh = (nns_edge_handle_s *) edge_h;
if (!eh) {
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- if (!ip || *ip == '\0') {
- nns_edge_loge ("Invalid param, given IP is invalid.");
+ nns_edge_lock (eh);
+
+ if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+ nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ nns_edge_unlock (eh);
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ eh->is_server = is_server;
+ if (!is_server && 0 == eh->recv_port) {
+ eh->recv_port = _get_available_port ();
+ if (eh->recv_port <= 0) {
+ nns_edge_loge ("Failed to start edge. Cannot get available port.");
+ nns_edge_unlock (eh);
+ return NNS_EDGE_ERROR_CONNECTION_FAILURE;
+ }
+ }
+
+ /** Initialize server src data. */
+ eh->listener = g_socket_listener_new ();
+ g_socket_listener_set_backlog (eh->listener, N_BACKLOG);
+
+ if (!_nns_edge_get_saddr (eh->recv_ip, eh->recv_port, &saddr)) {
+ nns_edge_loge ("Failed to get socket address");
+ ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
+ goto error;
+ }
+ if (!g_socket_listener_add_address (eh->listener, saddr,
+ G_SOCKET_TYPE_STREAM, G_SOCKET_PROTOCOL_TCP, NULL, NULL, &err)) {
+ nns_edge_loge ("Failed to add address: %s", err->message);
+ g_clear_error (&err);
+ ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
+ goto error;
+ }
+
+ g_socket_listener_accept_socket_async (eh->listener, NULL,
+ (GAsyncReadyCallback) _nns_edge_accept_socket_async_cb, eh);
+
+error:
+ if (saddr)
+ g_object_unref (saddr);
+
+ nns_edge_unlock (eh);
+ return ret;
+}
+
+/**
+ * @brief Release the given handle.
+ */
+int
+nns_edge_release_handle (nns_edge_h edge_h)
+{
+ nns_edge_handle_s *eh;
+
+ eh = (nns_edge_handle_s *) edge_h;
+ if (!eh) {
+ nns_edge_loge ("Invalid param, given edge handle is null.");
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- if (!eh->event_cb) {
- nns_edge_loge ("NNStreamer-edge event callback is not registered.");
- nns_edge_unlock (eh);
- return NNS_EDGE_ERROR_CONNECTION_FAILURE;
+ eh->magic = NNS_EDGE_MAGIC_DEAD;
+ eh->event_cb = NULL;
+ eh->user_data = NULL;
+
+ if (eh->listener)
+ g_clear_object (&eh->listener);
+
+ g_hash_table_destroy (eh->conn_table);
+ eh->conn_table = NULL;
+
+ SAFE_FREE (eh->id);
+ SAFE_FREE (eh->topic);
+ SAFE_FREE (eh->ip);
+ SAFE_FREE (eh->recv_ip);
+ SAFE_FREE (eh->caps_str);
+
+ nns_edge_unlock (eh);
+ nns_edge_lock_destroy (eh);
+ SAFE_FREE (eh);
+
+ return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Set the event callback.
+ */
+int
+nns_edge_set_event_callback (nns_edge_h edge_h, nns_edge_event_cb cb,
+ void *user_data)
+{
+ nns_edge_handle_s *eh;
+ int ret;
+
+ eh = (nns_edge_handle_s *) edge_h;
+ if (!eh) {
+ nns_edge_loge ("Invalid param, given edge handle is null.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- eh->is_server = false;
- eh->protocol = protocol;
+ nns_edge_lock (eh);
- /** Connect to info channel. */
- ret = _nns_edge_tcp_connect (eh, ip, port);
+ if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+ nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ nns_edge_unlock (eh);
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_CALLBACK_RELEASED,
+ NULL, 0, NULL);
if (ret != NNS_EDGE_ERROR_NONE) {
- nns_edge_loge ("Failed to connect to %s:%d", ip, port);
+ nns_edge_loge ("Failed to set new event callback.");
+ nns_edge_unlock (eh);
+ return ret;
}
+ eh->event_cb = cb;
+ eh->user_data = user_data;
+
nns_edge_unlock (eh);
- return ret;
+ return NNS_EDGE_ERROR_NONE;
}
/**
- * @brief Close connection
+ * @brief Connect to the destination node.
*/
-static bool
-_nns_edge_close_connection (nns_edge_conn_s * conn)
+int
+nns_edge_connect (nns_edge_h edge_h, nns_edge_protocol_e protocol,
+ const char *ip, int port)
{
- GError *err = NULL;
+ nns_edge_handle_s *eh;
+ int ret;
- if (!conn)
- return false;
+ eh = (nns_edge_handle_s *) edge_h;
+ if (!eh) {
+ nns_edge_loge ("Invalid param, given edge handle is null.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
- if (conn->running) {
- conn->running = 0;
- pthread_join (conn->msg_thread, NULL);
+ if (!STR_IS_VALID (ip)) {
+ nns_edge_loge ("Invalid param, given IP is invalid.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- if (conn->socket) {
- if (!g_socket_close (conn->socket, &err)) {
- nns_edge_loge ("Failed to close socket: %s", err->message);
- g_clear_error (&err);
- return false;
- }
- g_object_unref (conn->socket);
- conn->socket = NULL;
+ nns_edge_lock (eh);
+
+ if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+ nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ nns_edge_unlock (eh);
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- if (conn->cancellable) {
- g_object_unref (conn->cancellable);
- conn->cancellable = NULL;
+ if (!eh->event_cb) {
+ nns_edge_loge ("NNStreamer-edge event callback is not registered.");
+ nns_edge_unlock (eh);
+ return NNS_EDGE_ERROR_CONNECTION_FAILURE;
}
- g_free (conn->ip);
- g_free (conn);
- return true;
+ eh->protocol = protocol;
+
+ /** Connect to info channel. */
+ ret = _nns_edge_connect_to (eh, eh->client_id, ip, port);
+ if (ret != NNS_EDGE_ERROR_NONE) {
+ nns_edge_loge ("Failed to connect to %s:%d", ip, port);
+ }
+
+ nns_edge_unlock (eh);
+ return ret;
}
/**
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- conn_data = _nns_edge_get_conn (eh, eh->client_id);
- if (!_nns_edge_check_connection (conn_data->sink_conn)) {
+ conn_data = _nns_edge_get_connection (eh, eh->client_id);
+ if (!_nns_edge_check_connection (conn_data->sink_conn->socket)) {
nns_edge_loge ("Failed to request, connection failure.");
nns_edge_unlock (eh);
return NNS_EDGE_ERROR_CONNECTION_FAILURE;
}
ret = _nns_edge_cmd_send (conn_data->sink_conn, &cmd);
+ if (ret != NNS_EDGE_ERROR_NONE)
+ nns_edge_loge ("Failed to request, cannot send edge data.");
nns_edge_unlock (eh);
return ret;
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- *topic = g_strdup (eh->topic);
+ *topic = nns_edge_strdup (eh->topic);
nns_edge_unlock (eh);
return NNS_EDGE_ERROR_NONE;
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
+ if (!STR_IS_VALID (key)) {
+ nns_edge_loge ("Invalid param, given key is invalid.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ if (!STR_IS_VALID (value)) {
+ nns_edge_loge ("Invalid param, given value is invalid.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
nns_edge_lock (eh);
if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
* @todo Change key-value set as json or hash table.
*/
if (0 == g_ascii_strcasecmp (key, "CAPS")) {
- ret_str = g_strdup_printf ("%s%s", _STR_NULL (eh->caps_str), value);
- g_free (eh->caps_str);
+ ret_str = nns_edge_strdup_printf ("%s%s", _STR_NULL (eh->caps_str), value);
+ SAFE_FREE (eh->caps_str);
eh->caps_str = ret_str;
} else if (0 == g_ascii_strcasecmp (key, "IP")) {
- g_free (eh->recv_ip);
- eh->recv_ip = g_strdup (value);
+ SAFE_FREE (eh->recv_ip);
+ eh->recv_ip = nns_edge_strdup (value);
} else if (0 == g_ascii_strcasecmp (key, "PORT")) {
eh->recv_port = g_ascii_strtoll (value, NULL, 10);
} else if (0 == g_ascii_strcasecmp (key, "TOPIC")) {
- g_free (eh->topic);
- eh->topic = g_strdup (value);
+ SAFE_FREE (eh->topic);
+ eh->topic = nns_edge_strdup (value);
} else {
nns_edge_logw ("Failed to set edge info. Unknown key: %s", key);
}
}
client_id = g_ascii_strtoll (val, NULL, 10);
- g_free (val);
+ SAFE_FREE (val);
- conn_data = _nns_edge_get_conn (eh, client_id);
+ conn_data = _nns_edge_get_connection (eh, client_id);
if (!conn_data) {
nns_edge_loge ("Cannot find connection, invalid client ID.");
nns_edge_unlock (eh);
}
ret = _nns_edge_cmd_send (conn_data->sink_conn, &cmd);
+ if (ret != NNS_EDGE_ERROR_NONE)
+ nns_edge_loge ("Failed to respond, cannot send edge data.");
nns_edge_unlock (eh);
return ret;