[Edge] Update edge lib
authorgichan <gichan2.jang@samsung.com>
Thu, 14 Jul 2022 05:32:02 +0000 (14:32 +0900)
committerjaeyun-jung <39614140+jaeyun-jung@users.noreply.github.com>
Wed, 20 Jul 2022 06:36:16 +0000 (15:36 +0900)
Update edge lib
 - This chagnes will be removed soon.

Signed-off-by: gichan <gichan2.jang@samsung.com>
gst/nnstreamer/tensor_query/meson.build
gst/nnstreamer/tensor_query/nnstreamer-edge-common.h [moved from gst/nnstreamer/tensor_query/nnstreamer_edge_common.h with 77% similarity]
gst/nnstreamer/tensor_query/nnstreamer-edge-internal.h [moved from gst/nnstreamer/tensor_query/nnstreamer_edge_internal.h with 98% similarity]
gst/nnstreamer/tensor_query/nnstreamer_edge_common.c
gst/nnstreamer/tensor_query/nnstreamer_edge_internal.c
gst/nnstreamer/tensor_query/nnstreamer_edge_mqtt.c

index 755fcbd..1830053 100644 (file)
@@ -26,8 +26,6 @@ nnstreamer_edge_deps = [
   glib_dep, gio_dep, thread_dep
 ]
 
-nnstreamer_headers += join_paths(meson.current_source_dir(), 'nnstreamer-edge.h')
-
 if aitt_support_is_available
   nnstreamer_edge_sources += join_paths(meson.current_source_dir(), 'nnstreamer_edge_aitt.c')
   nnstreamer_edge_deps += aitt_support_deps
@@ -1,8 +1,8 @@
-/* SPDX-License-Identifier: LGPL-2.1-only */
+/* SPDX-License-Identifier: Apache-2.0 */
 /**
  * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
  *
- * @file   nnstreamer_edge_common.h
+ * @file   nnstreamer-edge-common.h
  * @date   6 April 2022
  * @brief  Common util functions for nnstreamer edge.
  * @see    https://github.com/nnstreamer/nnstreamer
@@ -29,14 +29,8 @@ extern "C" {
 #define UNUSED(expr) do { (void)(expr); } while (0)
 #endif
 
-/**
- * @brief g_memdup() function replaced by g_memdup2() in glib version >= 2.68
- */
-#if GLIB_USE_G_MEMDUP2
-#define _g_memdup g_memdup2
-#else
-#define _g_memdup g_memdup
-#endif
+#define STR_IS_VALID(s) ((s) && (s)[0] != '\0')
+#define SAFE_FREE(p) do { if (p) { free (p); (p) = NULL; } } while (0)
 
 #define NNS_EDGE_MAGIC 0xfeedfeed
 #define NNS_EDGE_MAGIC_DEAD 0xdeaddead
@@ -88,6 +82,29 @@ typedef struct {
 #define nns_edge_logf g_error
 
 /**
+ * @brief Free allocated memory.
+ */
+void nns_edge_free (void *data);
+
+/**
+ * @brief Allocate new memory and copy bytes.
+ * @note Caller should release newly allocated memory using nns_edge_free().
+ */
+void *nns_edge_memdup (const void *data, size_t size);
+
+/**
+ * @brief Allocate new memory and copy string.
+ * @note Caller should release newly allocated string using nns_edge_free().
+ */
+char *nns_edge_strdup (const char *str);
+
+/**
+ * @brief Allocate new memory and print formatted string.
+ * @note Caller should release newly allocated string using nns_edge_free().
+ */
+char *nns_edge_strdup_printf (const char *format, ...);
+
+/**
  * @brief Create nnstreamer edge event.
  * @note This is internal function for edge event.
  */
@@ -45,9 +45,7 @@ typedef struct {
   char *recv_ip;
   int recv_port;
   GHashTable *conn_table;
-
   GSocketListener *listener;
-  GCancellable *cancellable;
 
   /* MQTT */
   void *mqtt_handle;
index 7ea1533..dfd26ef 100644 (file)
@@ -1,8 +1,8 @@
-/* SPDX-License-Identifier: LGPL-2.1-only */
+/* SPDX-License-Identifier: Apache-2.0 */
 /**
  * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
  *
- * @file   nnstreamer_edge_internal.c
+ * @file   nnstreamer-edge-common.c
  * @date   6 April 2022
  * @brief  Common util functions for nnstreamer edge.
  * @see    https://github.com/nnstreamer/nnstreamer
  * @bug    No known bugs except for NYI items
  */
 
-#include "nnstreamer_edge_common.h"
+#define _GNU_SOURCE
+#include <stdio.h>
+
+#include "nnstreamer-edge-common.h"
+
+/**
+ * @brief Free allocated memory.
+ */
+void
+nns_edge_free (void *data)
+{
+  if (data)
+    free (data);
+}
+
+/**
+ * @brief Allocate new memory and copy bytes.
+ * @note Caller should release newly allocated memory using nns_edge_free().
+ */
+void *
+nns_edge_memdup (const void *data, size_t size)
+{
+  void *mem = NULL;
+
+  if (data && size > 0) {
+    mem = malloc (size);
+
+    if (mem) {
+      memcpy (mem, data, size);
+    } else {
+      nns_edge_loge ("Failed to allocate memory (%zd).", size);
+    }
+  }
+
+  return mem;
+}
+
+/**
+ * @brief Allocate new memory and copy string.
+ * @note Caller should release newly allocated string using nns_edge_free().
+ */
+char *
+nns_edge_strdup (const char *str)
+{
+  char *new_str = NULL;
+  size_t len;
+
+  if (str) {
+    len = strlen (str);
+
+    new_str = (char *) malloc (len + 1);
+    if (new_str) {
+      memcpy (new_str, str, len);
+      new_str[len] = '\0';
+    } else {
+      nns_edge_loge ("Failed to allocate memory (%zd).", len + 1);
+    }
+  }
+
+  return new_str;
+}
+
+/**
+ * @brief Allocate new memory and print formatted string.
+ * @note Caller should release newly allocated string using nns_edge_free().
+ */
+char *
+nns_edge_strdup_printf (const char *format, ...)
+{
+  char *new_str = NULL;
+  va_list args;
+  int len;
+
+  va_start (args, format);
+  len = vasprintf (&new_str, format, args);
+  if (len < 0)
+    new_str = NULL;
+  va_end (args);
+
+  return new_str;
+}
 
 /**
  * @brief Create nnstreamer edge event.
@@ -64,7 +144,7 @@ nns_edge_event_destroy (nns_edge_event_h event_h)
   if (ee->data.destroy_cb)
     ee->data.destroy_cb (ee->data.data);
 
-  g_free (ee);
+  SAFE_FREE (ee);
   return NNS_EDGE_ERROR_NONE;
 }
 
@@ -180,7 +260,7 @@ nns_edge_event_parse_capability (nns_edge_event_h event_h, char **capability)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  *capability = g_strdup (ee->data.data);
+  *capability = nns_edge_strdup (ee->data.data);
 
   return NNS_EDGE_ERROR_NONE;
 }
@@ -206,8 +286,8 @@ nns_edge_data_create (nns_edge_data_h * data_h)
 
   memset (ed, 0, sizeof (nns_edge_data_s));
   ed->magic = NNS_EDGE_MAGIC;
-  ed->info_table =
-      g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_free);
+  ed->info_table = g_hash_table_new_full (g_str_hash, g_str_equal,
+      nns_edge_free, nns_edge_free);
 
   *data_h = ed;
   return NNS_EDGE_ERROR_NONE;
@@ -238,7 +318,7 @@ nns_edge_data_destroy (nns_edge_data_h data_h)
 
   g_hash_table_destroy (ed->info_table);
 
-  g_free (ed);
+  SAFE_FREE (ed);
   return NNS_EDGE_ERROR_NONE;
 }
 
@@ -295,14 +375,16 @@ nns_edge_data_copy (nns_edge_data_h data_h, nns_edge_data_h * new_data_h)
 
   copied->num = ed->num;
   for (i = 0; i < ed->num; i++) {
-    copied->data[i].data = _g_memdup (ed->data[i].data, ed->data[i].data_len);
+    copied->data[i].data = nns_edge_memdup (ed->data[i].data,
+        ed->data[i].data_len);
     copied->data[i].data_len = ed->data[i].data_len;
-    copied->data[i].destroy_cb = g_free;
+    copied->data[i].destroy_cb = nns_edge_free;
   }
 
   g_hash_table_iter_init (&iter, ed->info_table);
   while (g_hash_table_iter_next (&iter, &key, &value)) {
-    g_hash_table_insert (copied->info_table, g_strdup (key), g_strdup (value));
+    g_hash_table_insert (copied->info_table, nns_edge_strdup (key),
+        nns_edge_strdup (value));
   }
 
   return NNS_EDGE_ERROR_NONE;
@@ -419,17 +501,18 @@ nns_edge_data_set_info (nns_edge_data_h data_h, const char *key,
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  if (!key || *key == '\0') {
+  if (!STR_IS_VALID (key)) {
     nns_edge_loge ("Invalid param, given key is invalid.");
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  if (!value || *value == '\0') {
+  if (!STR_IS_VALID (value)) {
     nns_edge_loge ("Invalid param, given value is invalid.");
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  g_hash_table_insert (ed->info_table, g_strdup (key), g_strdup (value));
+  g_hash_table_insert (ed->info_table, nns_edge_strdup (key),
+      nns_edge_strdup (value));
 
   return NNS_EDGE_ERROR_NONE;
 }
@@ -450,7 +533,7 @@ nns_edge_data_get_info (nns_edge_data_h data_h, const char *key, char **value)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  if (!key || *key == '\0') {
+  if (!STR_IS_VALID (key)) {
     nns_edge_loge ("Invalid param, given key is invalid.");
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
@@ -466,7 +549,7 @@ nns_edge_data_get_info (nns_edge_data_h data_h, const char *key, char **value)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  *value = g_strdup (val);
+  *value = nns_edge_strdup (val);
 
   return NNS_EDGE_ERROR_NONE;
 }
index 43b218b..5d76533 100644 (file)
@@ -1,8 +1,8 @@
-/* SPDX-License-Identifier: LGPL-2.1-only */
+/* SPDX-License-Identifier: Apache-2.0 */
 /**
  * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
  *
- * @file   nnstreamer_edge_internal.c
+ * @file   nnstreamer-edge-internal.c
  * @date   6 April 2022
  * @brief  Common library to support communication among devices.
  * @see    https://github.com/nnstreamer/nnstreamer
@@ -10,8 +10,8 @@
  * @bug    No known bugs except for NYI items
  */
 
-#include "nnstreamer_edge_common.h"
-#include "nnstreamer_edge_internal.h"
+#include "nnstreamer-edge-common.h"
+#include "nnstreamer-edge-internal.h"
 
 #define N_BACKLOG 10
 #define DEFAULT_TIMEOUT_SEC 10
@@ -34,6 +34,7 @@ typedef enum
  */
 typedef struct
 {
+  unsigned int magic;
   nns_edge_cmd_e cmd;
   int64_t client_id;
 
@@ -52,7 +53,7 @@ typedef struct
 } nns_edge_cmd_s;
 
 /**
- * @brief Data structure for edge TCP connection.
+ * @brief Data structure for edge connection.
  */
 typedef struct
 {
@@ -61,7 +62,6 @@ typedef struct
   int8_t running;
   pthread_t msg_thread;
   GSocket *socket;
-  GCancellable *cancellable;
 } nns_edge_conn_s;
 
 /**
@@ -84,18 +84,11 @@ typedef struct
   nns_edge_conn_s *conn;
 } nns_edge_thread_data_s;
 
-static bool _nns_edge_close_connection (nns_edge_conn_s * conn);
-static int _nns_edge_create_message_thread (nns_edge_handle_s * eh,
-    nns_edge_conn_s * conn, int64_t client_id);
-static int _nns_edge_tcp_connect (nns_edge_handle_s * eh, const char *ip,
-    int port);
-
 /**
  * @brief Send data to connected socket.
  */
 static bool
-_send_raw_data (GSocket * socket, void *data, size_t size,
-    GCancellable * cancellable)
+_send_raw_data (GSocket * socket, void *data, size_t size)
 {
   size_t bytes_sent = 0;
   ssize_t rret;
@@ -103,7 +96,7 @@ _send_raw_data (GSocket * socket, void *data, size_t size,
 
   while (bytes_sent < size) {
     rret = g_socket_send (socket, (char *) data + bytes_sent,
-        size - bytes_sent, cancellable, &err);
+        size - bytes_sent, NULL, &err);
 
     if (rret == 0) {
       nns_edge_loge ("Connection closed.");
@@ -123,11 +116,32 @@ _send_raw_data (GSocket * socket, void *data, size_t size,
 }
 
 /**
+ * @brief Internal function to check connection.
+ */
+static bool
+_nns_edge_check_connection (GSocket * socket)
+{
+  GIOCondition condition;
+
+  if (!socket || g_socket_is_closed (socket))
+    return false;
+
+  condition = g_socket_condition_check (socket,
+      G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP);
+
+  if (!condition || (condition & (G_IO_ERR | G_IO_HUP))) {
+    nns_edge_logw ("Socket is not available, possibly closed.");
+    return false;
+  }
+
+  return true;
+}
+
+/**
  * @brief Receive data from connected socket.
  */
 static bool
-_receive_raw_data (GSocket * socket, void *data, size_t size,
-    GCancellable * cancellable)
+_receive_raw_data (GSocket * socket, void *data, size_t size)
 {
   size_t bytes_received = 0;
   ssize_t rret;
@@ -135,7 +149,7 @@ _receive_raw_data (GSocket * socket, void *data, size_t size,
 
   while (bytes_received < size) {
     rret = g_socket_receive (socket, (char *) data + bytes_received,
-        size - bytes_received, cancellable, &err);
+        size - bytes_received, NULL, &err);
 
     if (rret == 0) {
       nns_edge_loge ("Connection closed.");
@@ -174,7 +188,38 @@ _parse_host_str (const char *host, char **ip, int *port)
 static void
 _get_host_str (const char *ip, const int port, char **host)
 {
-  *host = g_strdup_printf ("%s:%d", ip, port);
+  *host = nns_edge_strdup_printf ("%s:%d", ip, port);
+}
+
+/**
+ * @brief Get available port number.
+ */
+static int
+_get_available_port (void)
+{
+  struct sockaddr_in sin;
+  int port = 0, sock;
+  socklen_t len = sizeof (struct sockaddr);
+
+  sin.sin_family = AF_INET;
+  sin.sin_addr.s_addr = INADDR_ANY;
+  sock = socket (AF_INET, SOCK_STREAM, 0);
+  if (sock < 0) {
+    nns_edge_loge ("Failed to get available port. Socket creation failure.");
+    return 0;
+  }
+  sin.sin_port = port;
+  if (bind (sock, (struct sockaddr *) &sin, sizeof (struct sockaddr)) == 0) {
+    if (getsockname (sock, (struct sockaddr *) &sin, &len) == 0) {
+      port = ntohs (sin.sin_port);
+      nns_edge_logi ("Available port number: %d", port);
+    } else {
+      nns_edge_logw ("Failed to read local socket info.");
+    }
+  }
+  close (sock);
+
+  return port;
 }
 
 /**
@@ -187,6 +232,7 @@ _nns_edge_cmd_init (nns_edge_cmd_s * cmd, nns_edge_cmd_e c, int64_t cid)
     return;
 
   memset (cmd, 0, sizeof (nns_edge_cmd_s));
+  cmd->info.magic = NNS_EDGE_MAGIC;
   cmd->info.cmd = c;
   cmd->info.client_id = cid;
 }
@@ -202,11 +248,32 @@ _nns_edge_cmd_clear (nns_edge_cmd_s * cmd)
   if (!cmd)
     return;
 
+  cmd->info.magic = NNS_EDGE_MAGIC_DEAD;
+
   for (i = 0; i < cmd->info.num; i++) {
-    if (cmd->mem[i])
-      free (cmd->mem[i]);
-    cmd->mem[i] = NULL;
+    SAFE_FREE (cmd->mem[i]);
+  }
+}
+
+/**
+ * @brief Validate edge command.
+ */
+static bool
+_nns_edge_cmd_is_valid (nns_edge_cmd_s * cmd)
+{
+  int command;
+
+  if (!cmd)
+    return false;
+
+  command = (int) cmd->info.cmd;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (&cmd->info) ||
+      (command < 0 || command >= _NNS_EDGE_CMD_END)) {
+    return false;
   }
+
+  return true;
 }
 
 /**
@@ -217,18 +284,28 @@ _nns_edge_cmd_send (nns_edge_conn_s * conn, nns_edge_cmd_s * cmd)
 {
   unsigned int n;
 
-  if (!conn || !cmd)
+  if (!conn) {
+    nns_edge_loge ("Failed to send command, edge connection is null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!_nns_edge_cmd_is_valid (cmd)) {
+    nns_edge_loge ("Failed to send command, invalid command.");
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!_nns_edge_check_connection (conn->socket)) {
+    nns_edge_loge ("Failed to send command, socket has error.");
+    return NNS_EDGE_ERROR_IO;
+  }
 
-  if (!_send_raw_data (conn->socket, &cmd->info,
-          sizeof (nns_edge_cmd_info_s), conn->cancellable)) {
+  if (!_send_raw_data (conn->socket, &cmd->info, sizeof (nns_edge_cmd_info_s))) {
     nns_edge_loge ("Failed to send command to socket.");
     return NNS_EDGE_ERROR_IO;
   }
 
   for (n = 0; n < cmd->info.num; n++) {
-    if (!_send_raw_data (conn->socket, cmd->mem[n],
-            cmd->info.mem_size[n], conn->cancellable)) {
+    if (!_send_raw_data (conn->socket, cmd->mem[n], cmd->info.mem_size[n])) {
       nns_edge_loge ("Failed to send %uth memory to socket.", n);
       return NNS_EDGE_ERROR_IO;
     }
@@ -249,12 +326,22 @@ _nns_edge_cmd_receive (nns_edge_conn_s * conn, nns_edge_cmd_s * cmd)
   if (!conn || !cmd)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
 
+  if (!_nns_edge_check_connection (conn->socket)) {
+    nns_edge_loge ("Failed to receive command, socket has error.");
+    return NNS_EDGE_ERROR_IO;
+  }
+
   if (!_receive_raw_data (conn->socket, &cmd->info,
-          sizeof (nns_edge_cmd_info_s), conn->cancellable)) {
+          sizeof (nns_edge_cmd_info_s))) {
     nns_edge_loge ("Failed to receive command from socket.");
     return NNS_EDGE_ERROR_IO;
   }
 
+  if (!_nns_edge_cmd_is_valid (cmd)) {
+    nns_edge_loge ("Failed to receive command, invalid command.");
+    return NNS_EDGE_ERROR_IO;
+  }
+
   nns_edge_logd ("Received command:%d (num:%u)", cmd->info.cmd, cmd->info.num);
   if (cmd->info.num >= NNS_EDGE_DATA_LIMIT) {
     nns_edge_loge ("Invalid request, the max memories for data transfer is %d.",
@@ -270,8 +357,7 @@ _nns_edge_cmd_receive (nns_edge_conn_s * conn, nns_edge_cmd_s * cmd)
       break;
     }
 
-    if (!_receive_raw_data (conn->socket, cmd->mem[n],
-            cmd->info.mem_size[n], conn->cancellable)) {
+    if (!_receive_raw_data (conn->socket, cmd->mem[n], cmd->info.mem_size[n])) {
       nns_edge_loge ("Failed to receive %uth memory from socket.", n++);
       ret = NNS_EDGE_ERROR_IO;
       break;
@@ -280,8 +366,7 @@ _nns_edge_cmd_receive (nns_edge_conn_s * conn, nns_edge_cmd_s * cmd)
 
   if (ret != NNS_EDGE_ERROR_NONE) {
     for (i = 0; i < n; i++) {
-      free (cmd->mem[i]);
-      cmd->mem[i] = NULL;
+      SAFE_FREE (cmd->mem[i]);
     }
   }
 
@@ -335,11 +420,44 @@ error:
 }
 
 /**
+ * @brief Close connection
+ */
+static bool
+_nns_edge_close_connection (nns_edge_conn_s * conn)
+{
+  if (!conn)
+    return false;
+
+  if (conn->running && conn->msg_thread) {
+    conn->running = 0;
+    pthread_cancel (conn->msg_thread);
+    pthread_join (conn->msg_thread, NULL);
+    conn->msg_thread = 0;
+  }
+
+  if (conn->socket) {
+    nns_edge_cmd_s cmd;
+
+    /* Send error before closing the socket. */
+    nns_edge_logd ("Send error cmd to close connection.");
+    _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, 0);
+    _nns_edge_cmd_send (conn, &cmd);
+
+    /* Release socket. If its last reference is dropped, it will close socket automatically. */
+    g_clear_object (&conn->socket);
+  }
+
+  SAFE_FREE (conn->ip);
+  SAFE_FREE (conn);
+  return true;
+}
+
+/**
  * @brief Get nnstreamer-edge connection data.
  * @note This function should be called with handle lock.
  */
 static nns_edge_conn_data_s *
-_nns_edge_get_conn (nns_edge_handle_s * eh, int64_t client_id)
+_nns_edge_get_connection (nns_edge_handle_s * eh, int64_t client_id)
 {
   if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
     nns_edge_loge ("Invalid param, given edge handle is invalid.");
@@ -354,7 +472,7 @@ _nns_edge_get_conn (nns_edge_handle_s * eh, int64_t client_id)
  * @note This function should be called with handle lock.
  */
 static nns_edge_conn_data_s *
-_nns_edge_add_conn (nns_edge_handle_s * eh, int64_t client_id)
+_nns_edge_add_connection (nns_edge_handle_s * eh, int64_t client_id)
 {
   nns_edge_conn_data_s *data = NULL;
 
@@ -385,48 +503,24 @@ _nns_edge_add_conn (nns_edge_handle_s * eh, int64_t client_id)
  * @brief Remove nnstreamer-edge connection data. This will be called when removing connection data from hash table.
  */
 static void
-_nns_edge_remove_conn (gpointer data)
+_nns_edge_remove_connection (gpointer data)
 {
   nns_edge_conn_data_s *cdata = (nns_edge_conn_data_s *) data;
 
   if (cdata) {
     _nns_edge_close_connection (cdata->src_conn);
     _nns_edge_close_connection (cdata->sink_conn);
+    cdata->src_conn = cdata->sink_conn = NULL;
 
-    g_free (cdata);
-  }
-}
-
-/**
- * @brief Internal function to check connection.
- */
-static bool
-_nns_edge_check_connection (nns_edge_conn_s * conn)
-{
-  size_t size;
-  GIOCondition condition;
-
-  if (!conn)
-    return false;
-
-  condition = g_socket_condition_check (conn->socket,
-      G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
-  size = g_socket_get_available_bytes (conn->socket);
-
-  if (condition && size <= 0) {
-    nns_edge_logw ("Socket is not available, possibly EOS.");
-    return false;
+    SAFE_FREE (cdata);
   }
-
-  return true;
 }
 
 /**
  * @brief Get socket address
  */
 static bool
-_nns_edge_get_saddr (const char *ip, const int port,
-    GCancellable * cancellable, GSocketAddress ** saddr)
+_nns_edge_get_saddr (const char *ip, const int port, GSocketAddress ** saddr)
 {
   GError *err = NULL;
   GInetAddress *addr;
@@ -437,7 +531,7 @@ _nns_edge_get_saddr (const char *ip, const int port,
     GList *results;
     GResolver *resolver;
     resolver = g_resolver_get_default ();
-    results = g_resolver_lookup_by_name (resolver, ip, cancellable, &err);
+    results = g_resolver_lookup_by_name (resolver, ip, NULL, &err);
     if (!results) {
       if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
         nns_edge_loge ("Failed to resolve ip, name resolver is cancelled.");
@@ -461,384 +555,148 @@ _nns_edge_get_saddr (const char *ip, const int port,
 }
 
 /**
- * @brief Get registered handle. If not registered, create new handle and register it.
+ * @brief Connect to requested socket.
  */
-int
-nns_edge_create_handle (const char *id, const char *topic, nns_edge_h * edge_h)
+static bool
+_nns_edge_connect_socket (nns_edge_conn_s * conn)
 {
-  nns_edge_handle_s *eh;
+  GError *err = NULL;
+  GSocketAddress *saddr = NULL;
+  bool ret = false;
 
-  if (!id || *id == '\0') {
-    nns_edge_loge ("Invalid param, given ID is invalid.");
-    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  if (!_nns_edge_get_saddr (conn->ip, conn->port, &saddr)) {
+    nns_edge_loge ("Failed to get socket address");
+    return ret;
   }
 
-  if (!topic || *topic == '\0') {
-    nns_edge_loge ("Invalid param, given topic is invalid.");
-    return NNS_EDGE_ERROR_INVALID_PARAMETER;
-  }
+  /* create sending client socket */
+  conn->socket =
+      g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM,
+      G_SOCKET_PROTOCOL_TCP, &err);
 
-  if (!edge_h) {
-    nns_edge_loge ("Invalid param, edge_h should not be null.");
-    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  if (!conn->socket) {
+    nns_edge_loge ("Failed to create new socket");
+    goto done;
   }
 
-  /**
-   * @todo manage edge handles
-   * 1. consider adding hash table or list to manage edge handles.
-   * 2. compare topic and return error if existing topic in handle is different.
-   */
-  eh = (nns_edge_handle_s *) malloc (sizeof (nns_edge_handle_s));
-  if (!eh) {
-    nns_edge_loge ("Failed to allocate memory for edge handle.");
-    return NNS_EDGE_ERROR_OUT_OF_MEMORY;
+  /* setting TCP_NODELAY to true in order to avoid packet batching as known as Nagle's algorithm */
+  if (!g_socket_set_option (conn->socket, IPPROTO_TCP, TCP_NODELAY, true, &err)) {
+    nns_edge_loge ("Failed to set socket TCP_NODELAY option: %s", err->message);
+    goto done;
   }
 
-  memset (eh, 0, sizeof (nns_edge_handle_s));
-  nns_edge_lock_init (eh);
-  eh->magic = NNS_EDGE_MAGIC;
-  eh->id = g_strdup (id);
-  eh->topic = g_strdup (topic);
-  eh->protocol = NNS_EDGE_PROTOCOL_TCP;
-  eh->is_server = true;
-  eh->recv_ip = g_strdup ("localhost");
-  eh->recv_port = 0;
-  eh->caps_str = NULL;
+  if (!g_socket_connect (conn->socket, saddr, NULL, &err)) {
+    if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+      nns_edge_logd ("Cancelled connecting");
+    } else {
+      nns_edge_loge ("Failed to connect to host, %s:%d", conn->ip, conn->port);
+    }
+    goto done;
+  }
 
-  /* Connection data for each client ID. */
-  eh->conn_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
-      _nns_edge_remove_conn);
+  /* now connected to the requested socket */
+  ret = true;
 
-  *edge_h = eh;
-  return NNS_EDGE_ERROR_NONE;
+done:
+  g_object_unref (saddr);
+  g_clear_error (&err);
+  return ret;
 }
 
 /**
- * @brief [TCP] Callback for src socket listener that pushes socket to the queue
+ * @brief Connect to the destination node. (host:sender(sink) - dest:receiver(listener, src))
  */
-static void
-accept_socket_async_cb (GObject * source, GAsyncResult * result,
-    gpointer user_data)
+static int
+_nns_edge_connect_to (nns_edge_handle_s * eh, int64_t client_id,
+    const char *ip, int port)
 {
-  GSocketListener *socket_listener = G_SOCKET_LISTENER (source);
-  GSocket *socket = NULL;
-  GError *err = NULL;
-  nns_edge_handle_s *eh = (nns_edge_handle_s *) user_data;
   nns_edge_conn_s *conn = NULL;
+  nns_edge_conn_data_s *conn_data;
   nns_edge_cmd_s cmd;
+  char *host;
   bool done = false;
-  char *connected_ip = NULL;
-  int connected_port = 0;
-  nns_edge_conn_data_s *conn_data = NULL;
-  int64_t client_id;
   int ret;
 
-  socket =
-      g_socket_listener_accept_socket_finish (socket_listener, result, NULL,
-      &err);
-
-  if (!socket) {
-    nns_edge_loge ("Failed to get socket: %s", err->message);
-    g_clear_error (&err);
-    goto error;
-  }
-  g_socket_set_timeout (socket, DEFAULT_TIMEOUT_SEC);
-
-  /* create socket with connection */
   conn = (nns_edge_conn_s *) malloc (sizeof (nns_edge_conn_s));
   if (!conn) {
-    nns_edge_loge ("Failed to allocate edge connection");
+    nns_edge_loge ("Failed to allocate client data.");
     goto error;
   }
 
   memset (conn, 0, sizeof (nns_edge_conn_s));
-  conn->socket = socket;
-  conn->cancellable = g_cancellable_new ();
+  conn->ip = nns_edge_strdup (ip);
+  conn->port = port;
 
-  /* setting TCP_NODELAY to true in order to avoid packet batching as known as Nagle's algorithm */
-  if (!g_socket_set_option (socket, IPPROTO_TCP, TCP_NODELAY, true, &err)) {
-    nns_edge_loge ("Failed to set socket TCP_NODELAY option: %s", err->message);
-    g_clear_error (&err);
+  if (!_nns_edge_connect_socket (conn)) {
     goto error;
   }
 
-  /* Send capability and info to check compatibility. */
-  client_id = eh->is_server ? g_get_monotonic_time () : eh->client_id;
-
-  _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_CAPABILITY, client_id);
-  cmd.info.num = 1;
-  cmd.info.mem_size[0] = strlen (eh->caps_str) + 1;
-  cmd.mem[0] = eh->caps_str;
+  if (!eh->is_server) {
+    /* Receive capability and client ID from server. */
+    _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id);
+    ret = _nns_edge_cmd_receive (conn, &cmd);
+    if (ret != NNS_EDGE_ERROR_NONE) {
+      nns_edge_loge ("Failed to receive capability.");
+      goto error;
+    }
 
-  ret = _nns_edge_cmd_send (conn, &cmd);
-  if (ret != NNS_EDGE_ERROR_NONE) {
-    nns_edge_loge ("Failed to send capability.");
-    goto error;
-  }
+    if (cmd.info.cmd != _NNS_EDGE_CMD_CAPABILITY) {
+      nns_edge_loge ("Failed to get capability.");
+      _nns_edge_cmd_clear (&cmd);
+      goto error;
+    }
 
-  /* Receive ip and port from destination. */
-  ret = _nns_edge_cmd_receive (conn, &cmd);
-  if (ret != NNS_EDGE_ERROR_NONE) {
-    nns_edge_loge ("Failed to receive node info.");
-    goto error;
-  }
+    client_id = eh->client_id = cmd.info.client_id;
 
-  if (cmd.info.cmd != _NNS_EDGE_CMD_HOST_INFO) {
-    nns_edge_loge ("Failed to get host info.");
+    /* Check compatibility. */
+    ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_CAPABILITY,
+        cmd.mem[0], cmd.info.mem_size[0], NULL);
     _nns_edge_cmd_clear (&cmd);
-    goto error;
-  }
 
-  _parse_host_str (cmd.mem[0], &connected_ip, &connected_port);
-  _nns_edge_cmd_clear (&cmd);
+    if (ret != NNS_EDGE_ERROR_NONE) {
+      nns_edge_loge ("The event returns error, capability is not acceptable.");
+      _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id);
+    } else {
+      /* Send ip and port to destination. */
+      _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_HOST_INFO, client_id);
 
-  ret = _nns_edge_create_message_thread (eh, conn, client_id);
-  if (ret != NNS_EDGE_ERROR_NONE) {
-    nns_edge_loge ("Failed to create message handle thread.");
-    goto error;
+      _get_host_str (eh->recv_ip, eh->recv_port, &host);
+      cmd.info.num = 1;
+      cmd.info.mem_size[0] = strlen (host) + 1;
+      cmd.mem[0] = host;
+    }
+
+    ret = _nns_edge_cmd_send (conn, &cmd);
+    _nns_edge_cmd_clear (&cmd);
+
+    if (ret != NNS_EDGE_ERROR_NONE) {
+      nns_edge_loge ("Failed to send host info.");
+      goto error;
+    }
   }
 
-  conn_data = _nns_edge_add_conn (eh, client_id);
+  conn_data = _nns_edge_add_connection (eh, client_id);
   if (conn_data) {
     /* Close old connection and set new one. */
-    _nns_edge_close_connection (conn_data->src_conn);
-    conn_data->src_conn = conn;
+    _nns_edge_close_connection (conn_data->sink_conn);
+    conn_data->sink_conn = conn;
     done = true;
   }
 
 error:
-  if (done) {
-    if (eh->is_server) {
-      _nns_edge_tcp_connect (eh, connected_ip, connected_port);
-    }
-  } else {
+  if (!done) {
     _nns_edge_close_connection (conn);
+    return NNS_EDGE_ERROR_CONNECTION_FAILURE;
   }
 
-  g_socket_listener_accept_socket_async (socket_listener,
-      eh->cancellable, (GAsyncReadyCallback) accept_socket_async_cb, eh);
-
-  g_free (connected_ip);
+  return NNS_EDGE_ERROR_NONE;
 }
 
 /**
- * @brief Get available port number.
+ * @brief Message thread, receive buffer from the client.
  */
-static int
-_get_available_port (void)
-{
-  struct sockaddr_in sin;
-  int port = 0, sock;
-  socklen_t len = sizeof (struct sockaddr);
-
-  sin.sin_family = AF_INET;
-  sin.sin_addr.s_addr = INADDR_ANY;
-  sock = socket (AF_INET, SOCK_STREAM, 0);
-  sin.sin_port = port;
-  if (bind (sock, (struct sockaddr *) &sin, sizeof (struct sockaddr)) == 0) {
-    getsockname (sock, (struct sockaddr *) &sin, &len);
-    port = ntohs (sin.sin_port);
-    nns_edge_logi ("Available port number: %d", port);
-  }
-  close (sock);
-
-  return port;
-}
-
-/**
- * @brief Initialize the nnstreamer edge handle.
- */
-int
-nns_edge_start (nns_edge_h edge_h, bool is_server)
-{
-  GSocketAddress *saddr = NULL;
-  GError *err = NULL;
-  int ret = 0;
-  nns_edge_handle_s *eh;
-
-  eh = (nns_edge_handle_s *) edge_h;
-  if (!eh) {
-    nns_edge_loge ("Invalid param, given edge handle is null.");
-    return NNS_EDGE_ERROR_INVALID_PARAMETER;
-  }
-
-  nns_edge_lock (eh);
-
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
-    nns_edge_unlock (eh);
-    return NNS_EDGE_ERROR_INVALID_PARAMETER;
-  }
-
-  eh->is_server = is_server;
-  if (!is_server && 0 == eh->recv_port)
-    eh->recv_port = _get_available_port ();
-
-  /** Initialize server src data. */
-  eh->cancellable = g_cancellable_new ();
-  eh->listener = g_socket_listener_new ();
-  g_socket_listener_set_backlog (eh->listener, N_BACKLOG);
-
-  if (!_nns_edge_get_saddr (eh->recv_ip, eh->recv_port, eh->cancellable,
-          &saddr)) {
-    nns_edge_loge ("Failed to get socket address");
-    ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
-    goto error;
-  }
-  if (!g_socket_listener_add_address (eh->listener, saddr,
-          G_SOCKET_TYPE_STREAM, G_SOCKET_PROTOCOL_TCP, NULL, NULL, &err)) {
-    nns_edge_loge ("Failed to add address: %s", err->message);
-    g_clear_error (&err);
-    ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
-    goto error;
-  }
-  g_object_unref (saddr);
-  saddr = NULL;
-
-  g_socket_listener_accept_socket_async (eh->listener,
-      eh->cancellable, (GAsyncReadyCallback) accept_socket_async_cb, eh);
-
-error:
-  if (saddr)
-    g_object_unref (saddr);
-
-  nns_edge_unlock (eh);
-  return ret;
-}
-
-/**
- * @brief Release the given handle.
- */
-int
-nns_edge_release_handle (nns_edge_h edge_h)
-{
-  nns_edge_handle_s *eh;
-
-  eh = (nns_edge_handle_s *) edge_h;
-  if (!eh) {
-    nns_edge_loge ("Invalid param, given edge handle is null.");
-    return NNS_EDGE_ERROR_INVALID_PARAMETER;
-  }
-
-  nns_edge_lock (eh);
-
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
-    nns_edge_unlock (eh);
-    return NNS_EDGE_ERROR_INVALID_PARAMETER;
-  }
-
-  eh->magic = NNS_EDGE_MAGIC_DEAD;
-  eh->event_cb = NULL;
-  eh->user_data = NULL;
-  g_free (eh->id);
-  g_free (eh->topic);
-  g_free (eh->ip);
-  g_free (eh->recv_ip);
-  g_free (eh->caps_str);
-  g_hash_table_destroy (eh->conn_table);
-
-  nns_edge_unlock (eh);
-  nns_edge_lock_destroy (eh);
-  g_free (eh);
-
-  return NNS_EDGE_ERROR_NONE;
-}
-
-/**
- * @brief Set the event callback.
- */
-int
-nns_edge_set_event_callback (nns_edge_h edge_h, nns_edge_event_cb cb,
-    void *user_data)
-{
-  nns_edge_handle_s *eh;
-  int ret;
-
-  eh = (nns_edge_handle_s *) edge_h;
-  if (!eh) {
-    nns_edge_loge ("Invalid param, given edge handle is null.");
-    return NNS_EDGE_ERROR_INVALID_PARAMETER;
-  }
-
-  nns_edge_lock (eh);
-
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
-    nns_edge_unlock (eh);
-    return NNS_EDGE_ERROR_INVALID_PARAMETER;
-  }
-
-  ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_CALLBACK_RELEASED,
-      NULL, 0, NULL);
-  if (ret != NNS_EDGE_ERROR_NONE) {
-    nns_edge_loge ("Failed to set new event callback.");
-    nns_edge_unlock (eh);
-    return ret;
-  }
-
-  eh->event_cb = cb;
-  eh->user_data = user_data;
-
-  nns_edge_unlock (eh);
-  return NNS_EDGE_ERROR_NONE;
-}
-
-/**
- * @brief Connect to requested socket using TCP.
- */
-static bool
-_nns_edge_connect_socket (nns_edge_conn_s * conn)
-{
-  GError *err = NULL;
-  GSocketAddress *saddr = NULL;
-  bool ret = false;
-
-  if (!_nns_edge_get_saddr (conn->ip, conn->port, conn->cancellable, &saddr)) {
-    nns_edge_loge ("Failed to get socket address");
-    return ret;
-  }
-
-  /* create sending client socket */
-  conn->socket =
-      g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM,
-      G_SOCKET_PROTOCOL_TCP, &err);
-
-  if (!conn->socket) {
-    nns_edge_loge ("Failed to create new socket");
-    goto done;
-  }
-
-  /* setting TCP_NODELAY to true in order to avoid packet batching as known as Nagle's algorithm */
-  if (!g_socket_set_option (conn->socket, IPPROTO_TCP, TCP_NODELAY, true, &err)) {
-    nns_edge_loge ("Failed to set socket TCP_NODELAY option: %s", err->message);
-    goto done;
-  }
-
-  if (!g_socket_connect (conn->socket, saddr, conn->cancellable, &err)) {
-    if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
-      nns_edge_logd ("Cancelled connecting");
-    } else {
-      nns_edge_loge ("Failed to connect to host, %s:%d", conn->ip, conn->port);
-    }
-    goto done;
-  }
-
-  /* now connected to the requested socket */
-  ret = true;
-
-done:
-  g_object_unref (saddr);
-  g_clear_error (&err);
-  return ret;
-}
-
-/**
- * @brief [TCP] Receive buffer from the client
- * @param[in] conn connection info
- */
-static void *
-_message_handler (void *thread_data)
+static void *
+_nns_edge_message_handler (void *thread_data)
 {
   nns_edge_thread_data_s *_tdata = (nns_edge_thread_data_s *) thread_data;
   nns_edge_handle_s *eh;
@@ -856,7 +714,10 @@ _message_handler (void *thread_data)
   eh = (nns_edge_handle_s *) _tdata->eh;
   conn = _tdata->conn;
   client_id = _tdata->client_id;
-  g_free (_tdata);
+  SAFE_FREE (_tdata);
+
+  /* Start message thread, add ref to clearly make the socket live. */
+  conn->running = 1;
 
   while (conn->running) {
     nns_edge_data_h data_h;
@@ -868,10 +729,8 @@ _message_handler (void *thread_data)
       break;
     }
 
-    if (!_nns_edge_check_connection (conn))
-      break;
-
     /** Receive data from the client */
+    _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id);
     ret = _nns_edge_cmd_receive (conn, &cmd);
     if (ret != NNS_EDGE_ERROR_NONE) {
       nns_edge_loge ("Failed to receive data from the connected node.");
@@ -896,16 +755,16 @@ _message_handler (void *thread_data)
     }
 
     /* Set client ID in edge data */
-    val = g_strdup_printf ("%ld", (long int) client_id);
+    val = nns_edge_strdup_printf ("%ld", (long int) client_id);
     nns_edge_data_set_info (data_h, "client_id", val);
-    g_free (val);
+    SAFE_FREE (val);
 
     for (i = 0; i < cmd.info.num; i++) {
       nns_edge_data_add (data_h, cmd.mem[i], cmd.info.mem_size[i], NULL);
     }
 
     ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_NEW_DATA_RECEIVED,
-        data_h, sizeof (data_h), NULL);
+        data_h, sizeof (nns_edge_data_h), NULL);
     if (ret != NNS_EDGE_ERROR_NONE) {
       /* Try to get next request if server does not accept data from client. */
       nns_edge_logw ("The server does not accept data from client.");
@@ -939,20 +798,20 @@ _nns_edge_create_message_thread (nns_edge_handle_s * eh, nns_edge_conn_s * conn,
 
    /** Create message receving thread */
   pthread_attr_init (&attr);
-  pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
-  conn->running = 1;
+  pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_JOINABLE);
+
   thread_data->eh = eh;
   thread_data->conn = conn;
   thread_data->client_id = client_id;
 
-  tid =
-      pthread_create (&conn->msg_thread, &attr, _message_handler, thread_data);
+  tid = pthread_create (&conn->msg_thread, &attr, _nns_edge_message_handler,
+      thread_data);
   pthread_attr_destroy (&attr);
 
   if (tid < 0) {
     nns_edge_loge ("Failed to create message handler thread.");
     conn->running = 0;
-    g_free (thread_data);
+    SAFE_FREE (thread_data);
     return NNS_EDGE_ERROR_IO;
   }
 
@@ -960,101 +819,187 @@ _nns_edge_create_message_thread (nns_edge_handle_s * eh, nns_edge_conn_s * conn,
 }
 
 /**
- * @brief Connect to the destination node usig TCP.
+ * @brief Callback for socket listener, accept socket and create message thread.
  */
-static int
-_nns_edge_tcp_connect (nns_edge_handle_s * eh, const char *ip, int port)
+static void
+_nns_edge_accept_socket_async_cb (GObject * source, GAsyncResult * result,
+    gpointer user_data)
 {
+  GSocketListener *socket_listener = G_SOCKET_LISTENER (source);
+  GSocket *socket = NULL;
+  GError *err = NULL;
+  nns_edge_handle_s *eh = (nns_edge_handle_s *) user_data;
   nns_edge_conn_s *conn = NULL;
-  nns_edge_conn_data_s *conn_data;
   nns_edge_cmd_s cmd;
-  char *host;
-  int64_t client_id;
   bool done = false;
+  char *connected_ip = NULL;
+  int connected_port = 0;
+  nns_edge_conn_data_s *conn_data = NULL;
+  int64_t client_id;
   int ret;
 
+  socket =
+      g_socket_listener_accept_socket_finish (socket_listener, result, NULL,
+      &err);
+
+  if (!socket) {
+    nns_edge_loge ("Failed to get socket: %s", err->message);
+    g_clear_error (&err);
+    return;
+  }
+  g_socket_set_timeout (socket, DEFAULT_TIMEOUT_SEC);
+
+  /* create socket with connection */
   conn = (nns_edge_conn_s *) malloc (sizeof (nns_edge_conn_s));
   if (!conn) {
-    nns_edge_loge ("Failed to allocate client data.");
+    nns_edge_loge ("Failed to allocate edge connection");
     goto error;
   }
 
   memset (conn, 0, sizeof (nns_edge_conn_s));
-  conn->ip = g_strdup (ip);
-  conn->port = port;
-  conn->cancellable = g_cancellable_new ();
+  conn->socket = socket;
 
-  if (!_nns_edge_connect_socket (conn)) {
+  /* setting TCP_NODELAY to true in order to avoid packet batching as known as Nagle's algorithm */
+  if (!g_socket_set_option (socket, IPPROTO_TCP, TCP_NODELAY, true, &err)) {
+    nns_edge_loge ("Failed to set socket TCP_NODELAY option: %s", err->message);
+    g_clear_error (&err);
     goto error;
   }
 
-  /* Get destination capability. */
-  ret = _nns_edge_cmd_receive (conn, &cmd);
-  if (ret != NNS_EDGE_ERROR_NONE) {
-    nns_edge_loge ("Failed to receive capability.");
-    goto error;
-  }
+  client_id = eh->is_server ? g_get_monotonic_time () : eh->client_id;
 
-  if (cmd.info.cmd != _NNS_EDGE_CMD_CAPABILITY) {
-    nns_edge_loge ("Failed to get capability.");
-    _nns_edge_cmd_clear (&cmd);
-    goto error;
-  }
+  /* Send capability and info to check compatibility. */
+  if (eh->is_server) {
+    if (!STR_IS_VALID (eh->caps_str)) {
+      nns_edge_loge ("Cannot accept socket, invalid capability.");
+      goto error;
+    }
 
-  client_id = eh->client_id = cmd.info.client_id;
+    _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_CAPABILITY, client_id);
+    cmd.info.num = 1;
+    cmd.info.mem_size[0] = strlen (eh->caps_str) + 1;
+    cmd.mem[0] = eh->caps_str;
 
-  /* Check compatibility. */
-  ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_CAPABILITY,
-      cmd.mem[0], cmd.info.mem_size[0], NULL);
-  _nns_edge_cmd_clear (&cmd);
+    ret = _nns_edge_cmd_send (conn, &cmd);
+    if (ret != NNS_EDGE_ERROR_NONE) {
+      nns_edge_loge ("Failed to send capability.");
+      goto error;
+    }
 
-  if (ret != NNS_EDGE_ERROR_NONE) {
-    nns_edge_loge ("The event returns error, capability is not acceptable.");
-    _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id);
-  } else {
-    /* Send ip and port to destination. */
-    _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_HOST_INFO, client_id);
+    /* Receive ip and port from destination. */
+    ret = _nns_edge_cmd_receive (conn, &cmd);
+    if (ret != NNS_EDGE_ERROR_NONE) {
+      nns_edge_loge ("Failed to receive node info.");
+      goto error;
+    }
 
-    _get_host_str (eh->recv_ip, eh->recv_port, &host);
-    cmd.info.num = 1;
-    cmd.info.mem_size[0] = strlen (host) + 1;
-    cmd.mem[0] = host;
-  }
+    if (cmd.info.cmd != _NNS_EDGE_CMD_HOST_INFO) {
+      nns_edge_loge ("Failed to get host info.");
+      _nns_edge_cmd_clear (&cmd);
+      goto error;
+    }
+
+    _parse_host_str (cmd.mem[0], &connected_ip, &connected_port);
+    _nns_edge_cmd_clear (&cmd);
 
-  ret = _nns_edge_cmd_send (conn, &cmd);
-  _nns_edge_cmd_clear (&cmd);
+    /* Connect to client listener. */
+    ret = _nns_edge_connect_to (eh, client_id, connected_ip, connected_port);
+    if (ret != NNS_EDGE_ERROR_NONE) {
+      nns_edge_loge ("Failed to connect host %s:%d.",
+          connected_ip, connected_port);
+      goto error;
+    }
+  }
 
+  ret = _nns_edge_create_message_thread (eh, conn, client_id);
   if (ret != NNS_EDGE_ERROR_NONE) {
-    nns_edge_loge ("Failed to send host info.");
+    nns_edge_loge ("Failed to create message handle thread.");
     goto error;
   }
 
-  conn_data = _nns_edge_add_conn (eh, client_id);
+  conn_data = _nns_edge_add_connection (eh, client_id);
   if (conn_data) {
     /* Close old connection and set new one. */
-    _nns_edge_close_connection (conn_data->sink_conn);
-    conn_data->sink_conn = conn;
+    _nns_edge_close_connection (conn_data->src_conn);
+    conn_data->src_conn = conn;
     done = true;
   }
 
 error:
   if (!done) {
     _nns_edge_close_connection (conn);
-    return NNS_EDGE_ERROR_CONNECTION_FAILURE;
   }
 
+  if (eh->listener)
+    g_socket_listener_accept_socket_async (eh->listener, NULL,
+        (GAsyncReadyCallback) _nns_edge_accept_socket_async_cb, eh);
+
+  SAFE_FREE (connected_ip);
+}
+
+/**
+ * @brief Get registered handle. If not registered, create new handle and register it.
+ */
+int
+nns_edge_create_handle (const char *id, const char *topic, nns_edge_h * edge_h)
+{
+  nns_edge_handle_s *eh;
+
+  if (!STR_IS_VALID (id)) {
+    nns_edge_loge ("Invalid param, given ID is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!STR_IS_VALID (topic)) {
+    nns_edge_loge ("Invalid param, given topic is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!edge_h) {
+    nns_edge_loge ("Invalid param, edge_h should not be null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  /**
+   * @todo manage edge handles
+   * 1. consider adding hash table or list to manage edge handles.
+   * 2. compare topic and return error if existing topic in handle is different.
+   */
+  eh = (nns_edge_handle_s *) malloc (sizeof (nns_edge_handle_s));
+  if (!eh) {
+    nns_edge_loge ("Failed to allocate memory for edge handle.");
+    return NNS_EDGE_ERROR_OUT_OF_MEMORY;
+  }
+
+  memset (eh, 0, sizeof (nns_edge_handle_s));
+  nns_edge_lock_init (eh);
+  eh->magic = NNS_EDGE_MAGIC;
+  eh->id = nns_edge_strdup (id);
+  eh->topic = nns_edge_strdup (topic);
+  eh->protocol = NNS_EDGE_PROTOCOL_TCP;
+  eh->is_server = true;
+  eh->recv_ip = nns_edge_strdup ("localhost");
+  eh->recv_port = 0;
+  eh->caps_str = NULL;
+
+  /* Connection data for each client ID. */
+  eh->conn_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
+      _nns_edge_remove_connection);
+
+  *edge_h = eh;
   return NNS_EDGE_ERROR_NONE;
 }
 
 /**
- * @brief Connect to the destination node.
+ * @brief Start the nnstreamer edge.
  */
 int
-nns_edge_connect (nns_edge_h edge_h, nns_edge_protocol_e protocol,
-    const char *ip, int port)
+nns_edge_start (nns_edge_h edge_h, bool is_server)
 {
+  GSocketAddress *saddr = NULL;
+  GError *err = NULL;
+  int ret = 0;
   nns_edge_handle_s *eh;
-  int ret;
 
   eh = (nns_edge_handle_s *) edge_h;
   if (!eh) {
@@ -1062,8 +1007,63 @@ nns_edge_connect (nns_edge_h edge_h, nns_edge_protocol_e protocol,
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  if (!ip || *ip == '\0') {
-    nns_edge_loge ("Invalid param, given IP is invalid.");
+  nns_edge_lock (eh);
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    nns_edge_unlock (eh);
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  eh->is_server = is_server;
+  if (!is_server && 0 == eh->recv_port) {
+    eh->recv_port = _get_available_port ();
+    if (eh->recv_port <= 0) {
+      nns_edge_loge ("Failed to start edge. Cannot get available port.");
+      nns_edge_unlock (eh);
+      return NNS_EDGE_ERROR_CONNECTION_FAILURE;
+    }
+  }
+
+  /** Initialize server src data. */
+  eh->listener = g_socket_listener_new ();
+  g_socket_listener_set_backlog (eh->listener, N_BACKLOG);
+
+  if (!_nns_edge_get_saddr (eh->recv_ip, eh->recv_port, &saddr)) {
+    nns_edge_loge ("Failed to get socket address");
+    ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
+    goto error;
+  }
+  if (!g_socket_listener_add_address (eh->listener, saddr,
+          G_SOCKET_TYPE_STREAM, G_SOCKET_PROTOCOL_TCP, NULL, NULL, &err)) {
+    nns_edge_loge ("Failed to add address: %s", err->message);
+    g_clear_error (&err);
+    ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
+    goto error;
+  }
+
+  g_socket_listener_accept_socket_async (eh->listener, NULL,
+      (GAsyncReadyCallback) _nns_edge_accept_socket_async_cb, eh);
+
+error:
+  if (saddr)
+    g_object_unref (saddr);
+
+  nns_edge_unlock (eh);
+  return ret;
+}
+
+/**
+ * @brief Release the given handle.
+ */
+int
+nns_edge_release_handle (nns_edge_h edge_h)
+{
+  nns_edge_handle_s *eh;
+
+  eh = (nns_edge_handle_s *) edge_h;
+  if (!eh) {
+    nns_edge_loge ("Invalid param, given edge handle is null.");
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
@@ -1075,59 +1075,113 @@ nns_edge_connect (nns_edge_h edge_h, nns_edge_protocol_e protocol,
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  if (!eh->event_cb) {
-    nns_edge_loge ("NNStreamer-edge event callback is not registered.");
-    nns_edge_unlock (eh);
-    return NNS_EDGE_ERROR_CONNECTION_FAILURE;
+  eh->magic = NNS_EDGE_MAGIC_DEAD;
+  eh->event_cb = NULL;
+  eh->user_data = NULL;
+
+  if (eh->listener)
+    g_clear_object (&eh->listener);
+
+  g_hash_table_destroy (eh->conn_table);
+  eh->conn_table = NULL;
+
+  SAFE_FREE (eh->id);
+  SAFE_FREE (eh->topic);
+  SAFE_FREE (eh->ip);
+  SAFE_FREE (eh->recv_ip);
+  SAFE_FREE (eh->caps_str);
+
+  nns_edge_unlock (eh);
+  nns_edge_lock_destroy (eh);
+  SAFE_FREE (eh);
+
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Set the event callback.
+ */
+int
+nns_edge_set_event_callback (nns_edge_h edge_h, nns_edge_event_cb cb,
+    void *user_data)
+{
+  nns_edge_handle_s *eh;
+  int ret;
+
+  eh = (nns_edge_handle_s *) edge_h;
+  if (!eh) {
+    nns_edge_loge ("Invalid param, given edge handle is null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  eh->is_server = false;
-  eh->protocol = protocol;
+  nns_edge_lock (eh);
 
-  /** Connect to info channel. */
-  ret = _nns_edge_tcp_connect (eh, ip, port);
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    nns_edge_unlock (eh);
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_CALLBACK_RELEASED,
+      NULL, 0, NULL);
   if (ret != NNS_EDGE_ERROR_NONE) {
-    nns_edge_loge ("Failed to connect to %s:%d", ip, port);
+    nns_edge_loge ("Failed to set new event callback.");
+    nns_edge_unlock (eh);
+    return ret;
   }
 
+  eh->event_cb = cb;
+  eh->user_data = user_data;
+
   nns_edge_unlock (eh);
-  return ret;
+  return NNS_EDGE_ERROR_NONE;
 }
 
 /**
- * @brief Close connection
+ * @brief Connect to the destination node.
  */
-static bool
-_nns_edge_close_connection (nns_edge_conn_s * conn)
+int
+nns_edge_connect (nns_edge_h edge_h, nns_edge_protocol_e protocol,
+    const char *ip, int port)
 {
-  GError *err = NULL;
+  nns_edge_handle_s *eh;
+  int ret;
 
-  if (!conn)
-    return false;
+  eh = (nns_edge_handle_s *) edge_h;
+  if (!eh) {
+    nns_edge_loge ("Invalid param, given edge handle is null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
 
-  if (conn->running) {
-    conn->running = 0;
-    pthread_join (conn->msg_thread, NULL);
+  if (!STR_IS_VALID (ip)) {
+    nns_edge_loge ("Invalid param, given IP is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  if (conn->socket) {
-    if (!g_socket_close (conn->socket, &err)) {
-      nns_edge_loge ("Failed to close socket: %s", err->message);
-      g_clear_error (&err);
-      return false;
-    }
-    g_object_unref (conn->socket);
-    conn->socket = NULL;
+  nns_edge_lock (eh);
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    nns_edge_unlock (eh);
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  if (conn->cancellable) {
-    g_object_unref (conn->cancellable);
-    conn->cancellable = NULL;
+  if (!eh->event_cb) {
+    nns_edge_loge ("NNStreamer-edge event callback is not registered.");
+    nns_edge_unlock (eh);
+    return NNS_EDGE_ERROR_CONNECTION_FAILURE;
   }
 
-  g_free (conn->ip);
-  g_free (conn);
-  return true;
+  eh->protocol = protocol;
+
+  /** Connect to info channel. */
+  ret = _nns_edge_connect_to (eh, eh->client_id, ip, port);
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("Failed to connect to %s:%d", ip, port);
+  }
+
+  nns_edge_unlock (eh);
+  return ret;
 }
 
 /**
@@ -1222,8 +1276,8 @@ nns_edge_request (nns_edge_h edge_h, nns_edge_data_h data_h)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  conn_data = _nns_edge_get_conn (eh, eh->client_id);
-  if (!_nns_edge_check_connection (conn_data->sink_conn)) {
+  conn_data = _nns_edge_get_connection (eh, eh->client_id);
+  if (!_nns_edge_check_connection (conn_data->sink_conn->socket)) {
     nns_edge_loge ("Failed to request, connection failure.");
     nns_edge_unlock (eh);
     return NNS_EDGE_ERROR_CONNECTION_FAILURE;
@@ -1237,6 +1291,8 @@ nns_edge_request (nns_edge_h edge_h, nns_edge_data_h data_h)
   }
 
   ret = _nns_edge_cmd_send (conn_data->sink_conn, &cmd);
+  if (ret != NNS_EDGE_ERROR_NONE)
+    nns_edge_loge ("Failed to request, cannot send edge data.");
 
   nns_edge_unlock (eh);
   return ret;
@@ -1331,7 +1387,7 @@ nns_edge_get_topic (nns_edge_h edge_h, char **topic)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  *topic = g_strdup (eh->topic);
+  *topic = nns_edge_strdup (eh->topic);
 
   nns_edge_unlock (eh);
   return NNS_EDGE_ERROR_NONE;
@@ -1352,6 +1408,16 @@ nns_edge_set_info (nns_edge_h edge_h, const char *key, const char *value)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
+  if (!STR_IS_VALID (key)) {
+    nns_edge_loge ("Invalid param, given key is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!STR_IS_VALID (value)) {
+    nns_edge_loge ("Invalid param, given value is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
   nns_edge_lock (eh);
 
   if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
@@ -1365,17 +1431,17 @@ nns_edge_set_info (nns_edge_h edge_h, const char *key, const char *value)
    * @todo Change key-value set as json or hash table.
    */
   if (0 == g_ascii_strcasecmp (key, "CAPS")) {
-    ret_str = g_strdup_printf ("%s%s", _STR_NULL (eh->caps_str), value);
-    g_free (eh->caps_str);
+    ret_str = nns_edge_strdup_printf ("%s%s", _STR_NULL (eh->caps_str), value);
+    SAFE_FREE (eh->caps_str);
     eh->caps_str = ret_str;
   } else if (0 == g_ascii_strcasecmp (key, "IP")) {
-    g_free (eh->recv_ip);
-    eh->recv_ip = g_strdup (value);
+    SAFE_FREE (eh->recv_ip);
+    eh->recv_ip = nns_edge_strdup (value);
   } else if (0 == g_ascii_strcasecmp (key, "PORT")) {
     eh->recv_port = g_ascii_strtoll (value, NULL, 10);
   } else if (0 == g_ascii_strcasecmp (key, "TOPIC")) {
-    g_free (eh->topic);
-    eh->topic = g_strdup (value);
+    SAFE_FREE (eh->topic);
+    eh->topic = nns_edge_strdup (value);
   } else {
     nns_edge_logw ("Failed to set edge info. Unknown key: %s", key);
   }
@@ -1425,9 +1491,9 @@ nns_edge_respond (nns_edge_h edge_h, nns_edge_data_h data_h)
   }
 
   client_id = g_ascii_strtoll (val, NULL, 10);
-  g_free (val);
+  SAFE_FREE (val);
 
-  conn_data = _nns_edge_get_conn (eh, client_id);
+  conn_data = _nns_edge_get_connection (eh, client_id);
   if (!conn_data) {
     nns_edge_loge ("Cannot find connection, invalid client ID.");
     nns_edge_unlock (eh);
@@ -1442,6 +1508,8 @@ nns_edge_respond (nns_edge_h edge_h, nns_edge_data_h data_h)
   }
 
   ret = _nns_edge_cmd_send (conn_data->sink_conn, &cmd);
+  if (ret != NNS_EDGE_ERROR_NONE)
+    nns_edge_loge ("Failed to respond, cannot send edge data.");
 
   nns_edge_unlock (eh);
   return ret;
index 3dc49dd..69a7c49 100644 (file)
@@ -16,8 +16,8 @@
 
 #include <unistd.h>
 #include <MQTTAsync.h>
-#include "nnstreamer_edge_common.h"
-#include "nnstreamer_edge_internal.h"
+#include "nnstreamer-edge-common.h"
+#include "nnstreamer-edge-internal.h"
 
 /**
  * @brief Callback function to be called when the connection is lost.