[Edge] refactoring edge cmd
authorJaeyun <jy1210.jung@samsung.com>
Fri, 24 Jun 2022 10:47:02 +0000 (19:47 +0900)
committerjaeyun-jung <39614140+jaeyun-jung@users.noreply.github.com>
Fri, 1 Jul 2022 02:25:58 +0000 (11:25 +0900)
Refactoring to send/receive edge command and buffer.
- define command info struct to handle command, id, and memories.
- util functions to init, clear edge command.

Signed-off-by: Jaeyun <jy1210.jung@samsung.com>
gst/nnstreamer/tensor_query/nnstreamer_edge_internal.c

index 53918f7..0896e59 100644 (file)
  */
 typedef enum
 {
-  _NNS_EDGE_CMD_TRANSFER_DATA = 0,
-  _NNS_EDGE_CMD_CLIENT_ID,
-  _NNS_EDGE_CMD_SRC_IP,
-  _NNS_EDGE_CMD_SRC_PORT,
+  _NNS_EDGE_CMD_ERROR = 0,
+  _NNS_EDGE_CMD_TRANSFER_DATA,
+  _NNS_EDGE_CMD_HOST_INFO,
   _NNS_EDGE_CMD_CAPABILITY,
   _NNS_EDGE_CMD_END
 } nns_edge_cmd_e;
 
 /**
- * @brief Structures for tensor query command buffers.
+ * @brief Structure for edge command info. It should be fixed size.
  */
 typedef struct
 {
   nns_edge_cmd_e cmd;
-  union
-  {
-    nns_edge_data_s data;
-    int64_t client_id;
-    int port;
-  };
-} nns_edge_cmd_buf_s;
+  int64_t client_id;
+
+  /* memory info */
+  uint32_t num;
+  size_t mem_size[NNS_EDGE_DATA_LIMIT];
+} nns_edge_cmd_info_s;
+
+/**
+ * @brief Structure for edge command and buffers.
+ */
+typedef struct
+{
+  nns_edge_cmd_info_s info;
+  void *mem[NNS_EDGE_DATA_LIMIT];
+} nns_edge_cmd_s;
 
 /**
  * @brief Data structure for edge TCP connection.
@@ -68,7 +75,7 @@ typedef struct
 } nns_edge_conn_data_s;
 
 /**
- * @brief Structures for thread data of tensor query messge handling.
+ * @brief Structures for thread data of message handling.
  */
 typedef struct
 {
@@ -78,13 +85,203 @@ typedef struct
 } nns_edge_thread_data_s;
 
 static bool _nns_edge_close_connection (nns_edge_conn_s * conn);
-static int _nns_edge_send (nns_edge_conn_s * conn,
-    nns_edge_cmd_buf_s * cmd_buf);
-static int _nns_edge_receive (nns_edge_conn_s * conn,
-    nns_edge_cmd_buf_s * cmd_buf);
 static int _nns_edge_create_message_thread (nns_edge_handle_s * eh,
     nns_edge_conn_s * conn, int64_t client_id);
-static int _nns_edge_tcp_connect (nns_edge_h edge_h, const char *ip, int port);
+static int _nns_edge_tcp_connect (nns_edge_handle_s * eh, const char *ip,
+    int port);
+
+/**
+ * @brief Send data to connected socket.
+ */
+static bool
+_send_raw_data (GSocket * socket, void *data, size_t size,
+    GCancellable * cancellable)
+{
+  size_t bytes_sent = 0;
+  ssize_t rret;
+  GError *err = NULL;
+
+  while (bytes_sent < size) {
+    rret = g_socket_send (socket, (char *) data + bytes_sent,
+        size - bytes_sent, cancellable, &err);
+
+    if (rret == 0) {
+      nns_edge_loge ("Connection closed.");
+      return false;
+    }
+
+    if (rret < 0) {
+      nns_edge_loge ("Error while sending data (%s).", err->message);
+      g_clear_error (&err);
+      return false;
+    }
+
+    bytes_sent += rret;
+  }
+
+  return true;
+}
+
+/**
+ * @brief Receive data from connected socket.
+ */
+static bool
+_receive_raw_data (GSocket * socket, void *data, size_t size,
+    GCancellable * cancellable)
+{
+  size_t bytes_received = 0;
+  ssize_t rret;
+  GError *err = NULL;
+
+  while (bytes_received < size) {
+    rret = g_socket_receive (socket, (char *) data + bytes_received,
+        size - bytes_received, cancellable, &err);
+
+    if (rret == 0) {
+      nns_edge_loge ("Connection closed.");
+      return false;
+    }
+
+    if (rret < 0) {
+      nns_edge_loge ("Failed to read from socket (%s).", err->message);
+      g_clear_error (&err);
+      return false;
+    }
+
+    bytes_received += rret;
+  }
+
+  return true;
+}
+
+/**
+ * @brief Parse string and get host IP:port.
+ */
+static void
+_parse_host_str (const char *host, char **ip, int *port)
+{
+  char *p = g_strrstr (host, ":");
+
+  if (p) {
+    *ip = g_strndup (host, (p - host));
+    *port = (int) g_ascii_strtoll (p + 1, NULL, 10);
+  }
+}
+
+/**
+ * @brief Get host string (IP:port).
+ */
+static void
+_get_host_str (const char *ip, const int port, char **host)
+{
+  *host = g_strdup_printf ("%s:%d", ip, port);
+}
+
+/**
+ * @brief initialize edge command.
+ */
+static void
+_nns_edge_cmd_init (nns_edge_cmd_s * cmd, nns_edge_cmd_e c, int64_t cid)
+{
+  if (!cmd)
+    return;
+
+  memset (cmd, 0, sizeof (nns_edge_cmd_s));
+  cmd->info.cmd = c;
+  cmd->info.client_id = cid;
+}
+
+/**
+ * @brief Clear allocated memory in edge command.
+ */
+static void
+_nns_edge_cmd_clear (nns_edge_cmd_s * cmd)
+{
+  unsigned int i;
+
+  if (!cmd)
+    return;
+
+  for (i = 0; i < cmd->info.num; i++) {
+    if (cmd->mem[i])
+      free (cmd->mem[i]);
+    cmd->mem[i] = NULL;
+  }
+}
+
+/**
+ * @brief Send edge command to connected device.
+ */
+static int
+_nns_edge_cmd_send (nns_edge_conn_s * conn, nns_edge_cmd_s * cmd)
+{
+  unsigned int n;
+
+  if (!conn || !cmd)
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+
+  if (!_send_raw_data (conn->socket, &cmd->info,
+          sizeof (nns_edge_cmd_info_s), conn->cancellable)) {
+    nns_edge_loge ("Failed to send command to socket.");
+    return NNS_EDGE_ERROR_IO;
+  }
+
+  for (n = 0; n < cmd->info.num; n++) {
+    if (!_send_raw_data (conn->socket, cmd->mem[n],
+            cmd->info.mem_size[n], conn->cancellable)) {
+      nns_edge_loge ("Failed to send %uth memory to socket.", n);
+      return NNS_EDGE_ERROR_IO;
+    }
+  }
+
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Receive edge command from connected device.
+ */
+static int
+_nns_edge_cmd_receive (nns_edge_conn_s * conn, nns_edge_cmd_s * cmd)
+{
+  unsigned int i, n;
+  int ret = NNS_EDGE_ERROR_NONE;
+
+  if (!conn || !cmd)
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+
+  if (!_receive_raw_data (conn->socket, &cmd->info,
+          sizeof (nns_edge_cmd_info_s), conn->cancellable)) {
+    nns_edge_loge ("Failed to receive command from socket.");
+    return NNS_EDGE_ERROR_IO;
+  }
+
+  nns_edge_logd ("Received command:%d (num:%u)", cmd->info.cmd, cmd->info.num);
+
+  for (n = 0; n < cmd->info.num; n++) {
+    cmd->mem[n] = malloc (cmd->info.mem_size[n]);
+    if (!cmd->mem[n]) {
+      nns_edge_loge ("Failed to allocate memory to receive data from socket.");
+      ret = NNS_EDGE_ERROR_OUT_OF_MEMORY;
+      break;
+    }
+
+    if (!_receive_raw_data (conn->socket, cmd->mem[n],
+            cmd->info.mem_size[n], conn->cancellable)) {
+      nns_edge_loge ("Failed to receive %uth memory from socket.", n++);
+      ret = NNS_EDGE_ERROR_IO;
+      break;
+    }
+  }
+
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    for (i = 0; i < n; i++) {
+      free (cmd->mem[i]);
+      cmd->mem[i] = NULL;
+    }
+  }
+
+  return ret;
+}
 
 /**
  * @brief Internal function to invoke event callback.
@@ -223,7 +420,7 @@ _nns_edge_check_connection (nns_edge_conn_s * conn)
  * @brief Get socket address
  */
 static bool
-_nns_edge_get_saddr (const char *ip, int port,
+_nns_edge_get_saddr (const char *ip, const int port,
     GCancellable * cancellable, GSocketAddress ** saddr)
 {
   GError *err = NULL;
@@ -238,7 +435,7 @@ _nns_edge_get_saddr (const char *ip, int port,
     results = g_resolver_lookup_by_name (resolver, ip, cancellable, &err);
     if (!results) {
       if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
-        nns_edge_logd ("gst_tensor_query_socket_new: Cancelled name resolval");
+        nns_edge_loge ("Failed to resolve ip, name resolver is cancelled.");
       } else {
         nns_edge_loge ("Failed to resolve ip '%s': %s", ip, err->message);
       }
@@ -322,13 +519,14 @@ accept_socket_async_cb (GObject * source, GAsyncResult * result,
   GSocket *socket = NULL;
   GError *err = NULL;
   nns_edge_handle_s *eh = (nns_edge_handle_s *) user_data;
-  nns_edge_cmd_buf_s cmd_buf;
   nns_edge_conn_s *conn = NULL;
+  nns_edge_cmd_s cmd;
   bool done = false;
-  gchar *connected_ip = NULL;
-  int connected_port;
+  char *connected_ip = NULL;
+  int connected_port = 0;
   nns_edge_conn_data_s *conn_data = NULL;
   int64_t client_id;
+  int ret;
 
   socket =
       g_socket_listener_accept_socket_finish (socket_listener, result, NULL,
@@ -359,44 +557,38 @@ accept_socket_async_cb (GObject * source, GAsyncResult * result,
     goto error;
   }
 
-  /** Send caps string to check compatibility. */
-  cmd_buf.cmd = _NNS_EDGE_CMD_CAPABILITY;
-  cmd_buf.data.num = 1;
-  cmd_buf.data.data[0].data = eh->caps_str;
-  cmd_buf.data.data[0].data_len = strlen (eh->caps_str) + 1;
-
-  if (NNS_EDGE_ERROR_NONE != _nns_edge_send (conn, &cmd_buf)) {
-    nns_edge_loge ("Failed to send server src caps to client.");
-    goto error;
-  }
-
+  /* Send capability and info to check compatibility. */
   client_id = eh->is_server ? g_get_monotonic_time () : eh->client_id;
-  cmd_buf.cmd = _NNS_EDGE_CMD_CLIENT_ID;
-  cmd_buf.client_id = client_id;
 
-  if (NNS_EDGE_ERROR_NONE != _nns_edge_send (conn, &cmd_buf)) {
-    nns_edge_loge ("Failed to send client id to client");
+  _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_CAPABILITY, client_id);
+  cmd.info.num = 1;
+  cmd.info.mem_size[0] = strlen (eh->caps_str) + 1;
+  cmd.mem[0] = eh->caps_str;
+
+  ret = _nns_edge_cmd_send (conn, &cmd);
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("Failed to send capability.");
     goto error;
   }
 
-  /** Receive SRC ip. */
-  if (NNS_EDGE_ERROR_NONE != _nns_edge_receive (conn, &cmd_buf) ||
-      _NNS_EDGE_CMD_SRC_IP != cmd_buf.cmd) {
-    nns_edge_loge ("Failed to get src IP.");
+  /* Receive ip and port from destination. */
+  ret = _nns_edge_cmd_receive (conn, &cmd);
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("Failed to receive node info.");
     goto error;
   }
-  connected_ip = g_strdup (cmd_buf.data.data[0].data);
-  g_free (cmd_buf.data.data[0].data);
 
-  /** Receive SRC port. */
-  if (NNS_EDGE_ERROR_NONE != _nns_edge_receive (conn, &cmd_buf) ||
-      _NNS_EDGE_CMD_SRC_PORT != cmd_buf.cmd) {
-    nns_edge_loge ("Failed to get src port.");
+  if (cmd.info.cmd != _NNS_EDGE_CMD_HOST_INFO) {
+    nns_edge_loge ("Failed to get host info.");
+    _nns_edge_cmd_clear (&cmd);
     goto error;
   }
-  connected_port = cmd_buf.port;
 
-  if (0 != _nns_edge_create_message_thread (eh, conn, client_id)) {
+  _parse_host_str (cmd.mem[0], &connected_ip, &connected_port);
+  _nns_edge_cmd_clear (&cmd);
+
+  ret = _nns_edge_create_message_thread (eh, conn, client_id);
+  if (ret != NNS_EDGE_ERROR_NONE) {
     nns_edge_loge ("Failed to create message handle thread.");
     goto error;
   }
@@ -636,191 +828,6 @@ done:
 }
 
 /**
- * @brief [TCP] send data for tcp server
- */
-static bool
-_send_data (GSocket * socket, uint8_t * data, size_t size,
-    GCancellable * cancellable)
-{
-  size_t bytes_sent = 0;
-  ssize_t rret;
-  GError *err = NULL;
-  while (bytes_sent < size) {
-    rret = g_socket_send (socket, (char *) data + bytes_sent,
-        size - bytes_sent, cancellable, &err);
-    if (rret == 0) {
-      nns_edge_logi ("Connection closed");
-      return false;
-    }
-    if (rret < 0) {
-      nns_edge_loge ("Error while sending data %s", err->message);
-      g_clear_error (&err);
-      return false;
-    }
-    bytes_sent += rret;
-  }
-  return true;
-}
-
-/**
- * @brief [TCP] send data for tcp server
- */
-static int
-_nns_edge_send (nns_edge_conn_s * conn, nns_edge_cmd_buf_s * cmd_buf)
-{
-  unsigned int n;
-
-  if (!_send_data (conn->socket, (uint8_t *) & cmd_buf->cmd,
-          sizeof (nns_edge_cmd_e), conn->cancellable)) {
-    nns_edge_logd ("Failed to send command to socket");
-    return NNS_EDGE_ERROR_IO;
-  }
-
-  if (cmd_buf->cmd == _NNS_EDGE_CMD_TRANSFER_DATA ||
-      cmd_buf->cmd == _NNS_EDGE_CMD_CAPABILITY ||
-      cmd_buf->cmd == _NNS_EDGE_CMD_SRC_IP) {
-    /** Send the number of memory. */
-    if (!_send_data (conn->socket, (uint8_t *) & cmd_buf->data.num,
-            sizeof (cmd_buf->data.num), conn->cancellable)) {
-      nns_edge_loge ("Failed to send the number of memory to socket");
-      return NNS_EDGE_ERROR_IO;
-    }
-    for (n = 0; n < cmd_buf->data.num; n++) {
-      /* send size */
-      if (!_send_data (conn->socket,
-              (uint8_t *) & cmd_buf->data.data[n].data_len,
-              sizeof (cmd_buf->data.data[n].data_len), conn->cancellable)) {
-        nns_edge_loge ("Failed to send size to socket");
-        return NNS_EDGE_ERROR_IO;
-      }
-      /* send data */
-      if (!_send_data (conn->socket,
-              (uint8_t *) cmd_buf->data.data[n].data,
-              cmd_buf->data.data[n].data_len, conn->cancellable)) {
-        nns_edge_loge ("Failed to send data to socket");
-        return NNS_EDGE_ERROR_IO;
-      }
-    }
-  } else if (cmd_buf->cmd == _NNS_EDGE_CMD_CLIENT_ID) {
-    /* send client id */
-    if (!_send_data (conn->socket, (uint8_t *) & cmd_buf->client_id,
-            sizeof (cmd_buf->client_id), conn->cancellable)) {
-      nns_edge_logd ("Failed to send client id to socket");
-      return NNS_EDGE_ERROR_IO;
-    }
-  } else if (cmd_buf->cmd == _NNS_EDGE_CMD_SRC_PORT) {
-    /* send client id */
-    if (!_send_data (conn->socket, (uint8_t *) & cmd_buf->port,
-            sizeof (cmd_buf->port), conn->cancellable)) {
-      nns_edge_logd ("Failed to send client id to socket");
-      return NNS_EDGE_ERROR_IO;
-    }
-  } else {
-    nns_edge_loge ("Not supported command.");
-    return NNS_EDGE_ERROR_INVALID_PARAMETER;
-  }
-
-  return NNS_EDGE_ERROR_NONE;
-}
-
-/**
- * @brief [TCP] receive data from tcp server
- * @return 0 if OK, negative value if error
- */
-static int
-_receive_data (GSocket * socket, uint8_t * data, size_t size,
-    GCancellable * cancellable)
-{
-  size_t bytes_received = 0;
-  ssize_t rret;
-  GError *err = NULL;
-
-  while (bytes_received < size) {
-    rret = g_socket_receive (socket, (char *) data + bytes_received,
-        size - bytes_received, cancellable, &err);
-
-    if (rret == 0) {
-      nns_edge_logi ("Connection closed");
-      return NNS_EDGE_ERROR_IO;
-    }
-
-    if (rret < 0) {
-      nns_edge_logi ("Failed to read from socket: %s", err->message);
-      g_clear_error (&err);
-      return NNS_EDGE_ERROR_IO;
-    }
-    bytes_received += rret;
-  }
-  return 0;
-}
-
-/**
- * @brief receive command from connected device.
- * @return 0 if OK, negative value if error
- */
-static int
-_nns_edge_receive (nns_edge_conn_s * conn, nns_edge_cmd_buf_s * cmd_buf)
-{
-  if (_receive_data (conn->socket, (uint8_t *) & cmd_buf->cmd,
-          sizeof (cmd_buf->cmd), conn->cancellable) < 0) {
-    nns_edge_loge ("Failed to receive command from socket");
-    return NNS_EDGE_ERROR_IO;
-  }
-
-  nns_edge_logd ("Received command: %d", cmd_buf->cmd);
-
-  if (cmd_buf->cmd == _NNS_EDGE_CMD_TRANSFER_DATA ||
-      cmd_buf->cmd == _NNS_EDGE_CMD_CAPABILITY ||
-      cmd_buf->cmd == _NNS_EDGE_CMD_SRC_IP) {
-    unsigned int n;
-
-    /* Receive the number of memory */
-    if (_receive_data (conn->socket, (uint8_t *) & cmd_buf->data.num,
-            sizeof (cmd_buf->data.num), conn->cancellable) < 0) {
-      nns_edge_loge ("Failed to receive data size from socket");
-      return NNS_EDGE_ERROR_IO;
-    }
-    for (n = 0; n < cmd_buf->data.num; n++) {
-      /* receive size */
-      if (_receive_data (conn->socket,
-              (uint8_t *) & cmd_buf->data.data[n].data_len,
-              sizeof (cmd_buf->data.data[n].data_len), conn->cancellable) < 0) {
-        nns_edge_loge ("Failed to receive data size from socket");
-        return NNS_EDGE_ERROR_IO;
-      }
-      cmd_buf->data.data[n].data =
-          (uint8_t *) g_malloc0 (cmd_buf->data.data[n].data_len);
-
-      /* receive data */
-      if (_receive_data (conn->socket,
-              (uint8_t *) cmd_buf->data.data[n].data,
-              cmd_buf->data.data[n].data_len, conn->cancellable) < 0) {
-        nns_edge_loge ("Failed to receive data from socket");
-        return NNS_EDGE_ERROR_IO;
-      }
-    }
-  } else if (cmd_buf->cmd == _NNS_EDGE_CMD_CLIENT_ID) {
-    /* receive client id */
-    if (_receive_data (conn->socket, (uint8_t *) & cmd_buf->client_id,
-            sizeof (cmd_buf->client_id), conn->cancellable) < 0) {
-      nns_edge_logd ("Failed to receive client id from socket");
-      return NNS_EDGE_ERROR_IO;
-    }
-  } else if (cmd_buf->cmd == _NNS_EDGE_CMD_SRC_PORT) {
-    /* receive server src port */
-    if (_receive_data (conn->socket, (uint8_t *) & cmd_buf->port,
-            sizeof (cmd_buf->port), conn->cancellable) < 0) {
-      nns_edge_logd ("Failed to receive sink port from socket");
-      return NNS_EDGE_ERROR_IO;
-    }
-  } else {
-    nns_edge_loge ("Not supported command.");
-    return NNS_EDGE_ERROR_INVALID_PARAMETER;
-  }
-  return NNS_EDGE_ERROR_NONE;
-}
-
-/**
  * @brief [TCP] Receive buffer from the client
  * @param[in] conn connection info
  */
@@ -830,8 +837,8 @@ _message_handler (void *thread_data)
   nns_edge_thread_data_s *_tdata = (nns_edge_thread_data_s *) thread_data;
   nns_edge_handle_s *eh;
   nns_edge_conn_s *conn;
+  nns_edge_cmd_s cmd;
   int64_t client_id;
-  nns_edge_cmd_buf_s cmd_buf;
   char *val;
   int ret;
 
@@ -859,23 +866,26 @@ _message_handler (void *thread_data)
       break;
 
     /** Receive data from the client */
-    if (NNS_EDGE_ERROR_NONE != _nns_edge_receive (conn, &cmd_buf)) {
+    ret = _nns_edge_cmd_receive (conn, &cmd);
+    if (ret != NNS_EDGE_ERROR_NONE) {
       nns_edge_loge ("Failed to receive data from the connected node.");
       break;
     }
 
-    if (cmd_buf.cmd != _NNS_EDGE_CMD_TRANSFER_DATA) {
-      /**
-       * @todo cmd from client
-       * 1. handle other cmd later
-       * 2. release cmd data
-       */
+    if (cmd.info.cmd == _NNS_EDGE_CMD_ERROR) {
+      nns_edge_loge ("Received error, stop msg thread.");
+      _nns_edge_cmd_clear (&cmd);
+      break;
+    } else if (cmd.info.cmd != _NNS_EDGE_CMD_TRANSFER_DATA) {
+      /** @todo handle other cmd later */
+      _nns_edge_cmd_clear (&cmd);
       continue;
     }
 
     ret = nns_edge_data_create (&data_h);
     if (ret != NNS_EDGE_ERROR_NONE) {
       nns_edge_loge ("Failed to create data handle in msg thread.");
+      _nns_edge_cmd_clear (&cmd);
       continue;
     }
 
@@ -884,9 +894,8 @@ _message_handler (void *thread_data)
     nns_edge_data_set_info (data_h, "client_id", val);
     g_free (val);
 
-    for (i = 0; i < cmd_buf.data.num; i++) {
-      nns_edge_data_add (data_h, cmd_buf.data.data[i].data,
-          cmd_buf.data.data[i].data_len, g_free);
+    for (i = 0; i < cmd.info.num; i++) {
+      nns_edge_data_add (data_h, cmd.mem[i], cmd.info.mem_size[i], NULL);
     }
 
     ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_NEW_DATA_RECEIVED,
@@ -897,6 +906,7 @@ _message_handler (void *thread_data)
     }
 
     nns_edge_data_destroy (data_h);
+    _nns_edge_cmd_clear (&cmd);
   }
 
   conn->running = 0;
@@ -917,7 +927,7 @@ _nns_edge_create_message_thread (nns_edge_handle_s * eh, nns_edge_conn_s * conn,
   thread_data =
       (nns_edge_thread_data_s *) malloc (sizeof (nns_edge_thread_data_s));
   if (!thread_data) {
-    nns_edge_loge ("Failed to allocate query thread data.");
+    nns_edge_loge ("Failed to allocate edge thread data.");
     return NNS_EDGE_ERROR_OUT_OF_MEMORY;
   }
 
@@ -947,18 +957,16 @@ _nns_edge_create_message_thread (nns_edge_handle_s * eh, nns_edge_conn_s * conn,
  * @brief Connect to the destination node usig TCP.
  */
 static int
-_nns_edge_tcp_connect (nns_edge_h edge_h, const char *ip, int port)
+_nns_edge_tcp_connect (nns_edge_handle_s * eh, const char *ip, int port)
 {
-  nns_edge_handle_s *eh;
-  nns_edge_cmd_buf_s cmd_buf;
   nns_edge_conn_s *conn = NULL;
-  int64_t client_id;
   nns_edge_conn_data_s *conn_data;
+  nns_edge_cmd_s cmd;
+  char *host;
+  int64_t client_id;
   bool done = false;
   int ret;
 
-  eh = (nns_edge_handle_s *) edge_h;
-
   conn = (nns_edge_conn_s *) malloc (sizeof (nns_edge_conn_s));
   if (!conn) {
     nns_edge_loge ("Failed to allocate client data.");
@@ -974,42 +982,44 @@ _nns_edge_tcp_connect (nns_edge_h edge_h, const char *ip, int port)
     goto error;
   }
 
-  /* Get server caps string */
-  if (NNS_EDGE_ERROR_NONE != _nns_edge_receive (conn, &cmd_buf) ||
-      _NNS_EDGE_CMD_CAPABILITY != cmd_buf.cmd) {
-    nns_edge_loge ("Failed to get server src caps.");
-    goto error;
-  }
-
-  /** Send server src and sink capability */
-  ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_CAPABILITY,
-      cmd_buf.data.data[0].data, cmd_buf.data.data[0].data_len, g_free);
+  /* Get destination capability. */
+  ret = _nns_edge_cmd_receive (conn, &cmd);
   if (ret != NNS_EDGE_ERROR_NONE) {
-    nns_edge_loge ("The server is not accepted.");
+    nns_edge_loge ("Failed to receive capability.");
     goto error;
   }
 
-  /** Get client ID from the server */
-  if (NNS_EDGE_ERROR_NONE != _nns_edge_receive (conn, &cmd_buf) ||
-      _NNS_EDGE_CMD_CLIENT_ID != cmd_buf.cmd) {
-    nns_edge_loge ("Failed to get client from the server.");
+  if (cmd.info.cmd != _NNS_EDGE_CMD_CAPABILITY) {
+    nns_edge_loge ("Failed to get capability.");
+    _nns_edge_cmd_clear (&cmd);
     goto error;
   }
-  client_id = eh->client_id = cmd_buf.client_id;
 
-  /** Send src port.  */
-  cmd_buf.cmd = _NNS_EDGE_CMD_SRC_IP;
-  cmd_buf.data.data[0].data = eh->recv_ip;
-  if (NNS_EDGE_ERROR_NONE != _nns_edge_send (conn, &cmd_buf)) {
-    nns_edge_loge ("Failed to send src IP address.");
-    goto error;
+  client_id = eh->client_id = cmd.info.client_id;
+
+  /* Check compatibility. */
+  ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_CAPABILITY,
+      cmd.mem[0], cmd.info.mem_size[0], NULL);
+  _nns_edge_cmd_clear (&cmd);
+
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("The event returns error, capability is not acceptable.");
+    _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id);
+  } else {
+    /* Send ip and port to destination. */
+    _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_HOST_INFO, client_id);
+
+    _get_host_str (eh->recv_ip, eh->recv_port, &host);
+    cmd.info.num = 1;
+    cmd.info.mem_size[0] = strlen (host) + 1;
+    cmd.mem[0] = host;
   }
 
-  /** Send src port.  */
-  cmd_buf.cmd = _NNS_EDGE_CMD_SRC_PORT;
-  cmd_buf.port = eh->recv_port;
-  if (NNS_EDGE_ERROR_NONE != _nns_edge_send (conn, &cmd_buf)) {
-    nns_edge_loge ("Failed to send src port.");
+  ret = _nns_edge_cmd_send (conn, &cmd);
+  _nns_edge_cmd_clear (&cmd);
+
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("Failed to send host info.");
     goto error;
   }
 
@@ -1069,7 +1079,7 @@ nns_edge_connect (nns_edge_h edge_h, nns_edge_protocol_e protocol,
   eh->protocol = protocol;
 
   /** Connect to info channel. */
-  ret = _nns_edge_tcp_connect (edge_h, ip, port);
+  ret = _nns_edge_tcp_connect (eh, ip, port);
   if (ret != NNS_EDGE_ERROR_NONE) {
     nns_edge_loge ("Failed to connect to %s:%d", ip, port);
   }
@@ -1182,8 +1192,8 @@ int
 nns_edge_request (nns_edge_h edge_h, nns_edge_data_h data_h, void *user_data)
 {
   nns_edge_handle_s *eh;
-  nns_edge_cmd_buf_s cmd_buf;
   nns_edge_conn_data_s *conn_data;
+  nns_edge_cmd_s cmd;
   int ret;
   unsigned int i;
 
@@ -1214,15 +1224,14 @@ nns_edge_request (nns_edge_h edge_h, nns_edge_data_h data_h, void *user_data)
     return NNS_EDGE_ERROR_CONNECTION_FAILURE;
   }
 
-  cmd_buf.cmd = _NNS_EDGE_CMD_TRANSFER_DATA;
+  _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_TRANSFER_DATA, eh->client_id);
 
-  nns_edge_data_get_count (data_h, &cmd_buf.data.num);
-  for (i = 0; i < cmd_buf.data.num; i++) {
-    nns_edge_data_get (data_h, i, &cmd_buf.data.data[i].data,
-        &cmd_buf.data.data[i].data_len);
+  nns_edge_data_get_count (data_h, &cmd.info.num);
+  for (i = 0; i < cmd.info.num; i++) {
+    nns_edge_data_get (data_h, i, &cmd.mem[i], &cmd.info.mem_size[i]);
   }
 
-  ret = _nns_edge_send (conn_data->sink_conn, &cmd_buf);
+  ret = _nns_edge_cmd_send (conn_data->sink_conn, &cmd);
 
   nns_edge_unlock (eh);
   return ret;
@@ -1379,7 +1388,8 @@ nns_edge_respond (nns_edge_h edge_h, nns_edge_data_h data_h)
 {
   nns_edge_handle_s *eh;
   nns_edge_conn_data_s *conn_data;
-  nns_edge_cmd_buf_s cmd_buf;
+  nns_edge_cmd_s cmd;
+  int64_t client_id;
   char *val;
   int ret;
   unsigned int i;
@@ -1410,24 +1420,24 @@ nns_edge_respond (nns_edge_h edge_h, nns_edge_data_h data_h)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  conn_data = _nns_edge_get_conn (eh, g_ascii_strtoll (val, NULL, 10));
+  client_id = g_ascii_strtoll (val, NULL, 10);
   g_free (val);
 
+  conn_data = _nns_edge_get_conn (eh, client_id);
   if (!conn_data) {
     nns_edge_loge ("Cannot find connection, invalid client ID.");
     nns_edge_unlock (eh);
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  cmd_buf.cmd = _NNS_EDGE_CMD_TRANSFER_DATA;
+  _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_TRANSFER_DATA, client_id);
 
-  nns_edge_data_get_count (data_h, &cmd_buf.data.num);
-  for (i = 0; i < cmd_buf.data.num; i++) {
-    nns_edge_data_get (data_h, i, &cmd_buf.data.data[i].data,
-        &cmd_buf.data.data[i].data_len);
+  nns_edge_data_get_count (data_h, &cmd.info.num);
+  for (i = 0; i < cmd.info.num; i++) {
+    nns_edge_data_get (data_h, i, &cmd.mem[i], &cmd.info.mem_size[i]);
   }
 
-  ret = _nns_edge_send (conn_data->sink_conn, &cmd_buf);
+  ret = _nns_edge_cmd_send (conn_data->sink_conn, &cmd);
 
   nns_edge_unlock (eh);
   return ret;