[Edge] migrate nnstreamer-edge
authorJaeyun <jy1210.jung@samsung.com>
Mon, 27 Jun 2022 10:45:49 +0000 (19:45 +0900)
committerSangjung Woo <again4you@gmail.com>
Fri, 1 Jul 2022 02:07:29 +0000 (11:07 +0900)
Migrate nnstreamer-edge library from nnstreamer repo.

Signed-off-by: Jaeyun <jy1210.jung@samsung.com>
14 files changed:
CMakeLists.txt
debian/control
debian/nnstreamer-edge-dev.install [new file with mode: 0644]
debian/nnstreamer-edge.install [new file with mode: 0644]
include/nnstreamer-edge.h [new file with mode: 0644]
nnstreamer-edge.pc.in [new file with mode: 0644]
packaging/nnstreamer-edge.spec
src/CMakeLists.txt
src/libnnstreamer-edge/nnstreamer-edge-aitt.c [new file with mode: 0644]
src/libnnstreamer-edge/nnstreamer-edge-common.c [new file with mode: 0644]
src/libnnstreamer-edge/nnstreamer-edge-common.h [new file with mode: 0644]
src/libnnstreamer-edge/nnstreamer-edge-internal.c [new file with mode: 0644]
src/libnnstreamer-edge/nnstreamer-edge-internal.h [new file with mode: 0644]
src/libnnstreamer-edge/nnstreamer-edge-mqtt.c [new file with mode: 0644]

index cd05304bcfd022d55bfa8d5ffd661838a50a33a0..604ea83ad798fb4d33f8eed6ebc860449e565ff9 100644 (file)
@@ -8,9 +8,15 @@ IF (NOT DEFINED VERSION)
     SET(VERSION    0.0.1)
 ENDIF()
 
+# GoogleTest requires at least C++11 and match the nnstreamer cpp version.
+SET(CMAKE_CXX_STANDARD 14)
+
 # Set CFLAGS
 SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -Werror")
 SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${EXTRA_CFLAGS} -pthread -fPIE -fPIC -g")
+SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DENABLE_MQTT=1")
+# TODO FIXME remove glib dependency
+SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DGLIB_USE_G_MEMDUP2=1")
 
 IF (ENABLE_DEBUG)
     SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DDEBUG=1")
@@ -25,21 +31,22 @@ SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,--as-needed")
 SET(SO_VERSION    ${VERSION})
 SET(PREFIX        ${CMAKE_INSTALL_PREFIX})
 SET(EXEC_PREFIX   ${PREFIX}/bin)
+SET(LIB_INSTALL_DIR ${CMAKE_INSTALL_LIBDIR})
 SET(INCLUDE_INSTALL_DIR "${PREFIX}/include")
 
 SET(SOURCE_DIR    ${CMAKE_CURRENT_SOURCE_DIR}/src)
 SET(INCLUDE_DIR   ${CMAKE_CURRENT_SOURCE_DIR}/include)
+SET(NNS_EDGE_SRC_DIR  ${SOURCE_DIR}/libnnstreamer-edge)
 SET(SENSOR_SRC_DIR  ${SOURCE_DIR}/libsensor)
 
 # Check requires packages
-SET(REQUIRES_LIST
-    paho-mqtt-c
-)
+# TODO FIXME remove glib dependency
+SET(REQUIRES_LIST "paho-mqtt-c glib-2.0 gio-2.0")
 
 INCLUDE(FindPkgConfig)
 PKG_CHECK_MODULES(EDGE_REQUIRE_PKGS REQUIRED ${REQUIRES_LIST})
 
-FOREACH(flag ${EDGE__REQUIRE_PKGS_CFLAGS})
+FOREACH(flag ${EDGE_REQUIRE_PKGS_CFLAGS})
     SET(EXTRA_CFLAGS "${EXTRA_CFLAGS} ${flag}")
 ENDFOREACH(flag)
 
@@ -53,6 +60,10 @@ IF (ENABLE_TEST)
 ENDIF()
 
 # pkgconfig file
+CONFIGURE_FILE(nnstreamer-edge.pc.in nnstreamer-edge.pc @ONLY)
+INSTALL(FILES ${CMAKE_CURRENT_BINARY_DIR}/nnstreamer-edge.pc
+    DESTINATION ${CMAKE_INSTALL_LIBDIR}/pkgconfig)
+
 CONFIGURE_FILE(nnstreamer-edge-sensor.pc.in nnstreamer-edge-sensor.pc @ONLY)
 INSTALL(FILES ${CMAKE_CURRENT_BINARY_DIR}/nnstreamer-edge-sensor.pc
     DESTINATION ${CMAKE_INSTALL_LIBDIR}/pkgconfig)
index 249a39ac5036a5ad928e32bd7f2ee18425d2b98d..4ef5b8d92a8eaff286b03386681a558ba97714b5 100644 (file)
@@ -7,6 +7,20 @@ Build-Depends: gcc-9 | gcc-8 | gcc-7 | gcc-6 | gcc-5 (>=5.4),
 Standards-Version: 0.0.1
 Homepage: https://github.com/nnstreamer/nnstreamer-edge
 
+Package: nnstreamer-edge
+Architecture: any
+Multi-Arch: same
+Depends: ${shlibs:Depends}, ${misc:Depends}
+Description: nnstreamer-edge library
+ It provides interfaces to support among-device feature, publishing or subscribing raw data and specific messages.
+
+Package: nnstreamer-edge-dev
+Architecture: any
+Multi-Arch: same
+Depends: nnstreamer-edge
+Description: development package for nnstreamer-edge
+ It is a development package for nnstreamer-edge.
+
 Package: nnstreamer-edge-sensor
 Architecture: any
 Multi-Arch: same
diff --git a/debian/nnstreamer-edge-dev.install b/debian/nnstreamer-edge-dev.install
new file mode 100644 (file)
index 0000000..c47950c
--- /dev/null
@@ -0,0 +1,3 @@
+/usr/include/nnstreamer-edge.h
+/usr/lib/*/pkgconfig/nnstreamer-edge.pc
+/usr/lib/*/libnnstreamer-edge.so
diff --git a/debian/nnstreamer-edge.install b/debian/nnstreamer-edge.install
new file mode 100644 (file)
index 0000000..838ac9a
--- /dev/null
@@ -0,0 +1 @@
+/usr/lib/*/libnnstreamer-edge.so.*
diff --git a/include/nnstreamer-edge.h b/include/nnstreamer-edge.h
new file mode 100644 (file)
index 0000000..5c4fd9c
--- /dev/null
@@ -0,0 +1,217 @@
+/* SPDX-License-Identifier: Apache-2.0 */
+/**
+ * Copyright (C) 2022 Gichan Jang <gichan2.jang@samsung.com>
+ *
+ * @file   nnstreamer-edge.h
+ * @date   25 Mar 2022
+ * @brief  Common library to support communication among devices.
+ * @see    https://github.com/nnstreamer/nnstreamer
+ * @author Gichan Jang <gichan2.jang@samsung.com>
+ * @bug    No known bugs except for NYI items
+ *
+ * @note This file will be moved to nnstreamer-edge repo. (https://github.com/nnstreamer/nnstreamer-edge)
+ *
+ * @todo Update document and sample code.
+ * 1. Add sample code when the 1st API set is complete - connection, pub/sub, request, ...
+ * 2. Update license when migrating this into edge repo. (Apache-2.0)
+ */
+
+#ifndef __NNSTREAMER_EDGE_H__
+#define __NNSTREAMER_EDGE_H__
+
+#include <errno.h>
+#include <limits.h>
+#include <stddef.h>
+#include <stdbool.h>
+#include <stdlib.h>
+#include <string.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif /* __cplusplus */
+
+typedef void *nns_edge_h;
+typedef void *nns_edge_event_h;
+typedef void *nns_edge_data_h;
+
+/**
+ * @brief The maximum number of data instances that nnstreamer-edge data may have.
+ */
+#define NNS_EDGE_DATA_LIMIT (16)
+
+/**
+ * @brief Enumeration for the error codes of nnstreamer-edge.
+ * @todo define detailed error code later (linux standard error, sync with tizen error code)
+ */
+typedef enum {
+  NNS_EDGE_ERROR_NONE = 0,
+  NNS_EDGE_ERROR_INVALID_PARAMETER = -EINVAL,
+  NNS_EDGE_ERROR_OUT_OF_MEMORY = -ENOMEM,
+  NNS_EDGE_ERROR_IO = -EIO,
+  NNS_EDGE_ERROR_CONNECTION_FAILURE = -ECONNREFUSED,
+  NNS_EDGE_ERROR_UNKNOWN = (-1073741824LL),
+  NNS_EDGE_ERROR_NOT_SUPPORTED = (NNS_EDGE_ERROR_UNKNOWN + 2),
+} nns_edge_error_e;
+
+typedef enum {
+  NNS_EDGE_EVENT_UNKNOWN = 0,
+  NNS_EDGE_EVENT_CAPABILITY,
+  NNS_EDGE_EVENT_NEW_DATA_RECEIVED,
+  NNS_EDGE_EVENT_CALLBACK_RELEASED,
+
+  NNS_EDGE_EVENT_CUSTOM = 0x01000000
+} nns_edge_event_e;
+
+typedef enum {
+  NNS_EDGE_PROTOCOL_TCP = 0,
+  NNS_EDGE_PROTOCOL_MQTT,
+  NNS_EDGE_PROTOCOL_AITT,
+  NNS_EDGE_PROTOCOL_AITT_TCP,
+
+  NNS_EDGE_PROTOCOL_MAX
+} nns_edge_protocol_e;
+
+/**
+ * @brief Callback for the nnstreamer edge event.
+ * @note This callback will suspend data stream. Do not spend too much time in the callback.
+ * @return User should return NNS_EDGE_ERROR_NONE if an event is successfully handled.
+ */
+typedef int (*nns_edge_event_cb) (nns_edge_event_h event_h, void *user_data);
+
+/**
+ * @brief Callback called when nnstreamer-edge data is released.
+ */
+typedef void (*nns_edge_data_destroy_cb) (void *data);
+
+/**
+ * @brief Get registered handle. If not registered, create new handle and register it.
+ */
+int nns_edge_create_handle (const char *id, const char *topic, nns_edge_h *edge_h);
+
+/**
+ * @brief Start the nnstreamer edge.
+ */
+int nns_edge_start (nns_edge_h edge_h, bool is_server);
+
+/**
+ * @brief Release the given handle.
+ */
+int nns_edge_release_handle (nns_edge_h edge_h);
+
+/**
+ * @brief Set the event callback.
+ */
+int nns_edge_set_event_callback (nns_edge_h edge_h, nns_edge_event_cb cb, void *user_data);
+
+/**
+ * @brief Connect to the destination node.
+ */
+int nns_edge_connect (nns_edge_h edge_h, nns_edge_protocol_e protocol, const char *ip, int port);
+
+/**
+ * @brief Disconnect from the destination node.
+ */
+int nns_edge_disconnect (nns_edge_h edge_h);
+
+/**
+ * @brief Publish a message to a given topic.
+ */
+int nns_edge_publish (nns_edge_h edge_h, nns_edge_data_h data_h);
+
+/**
+ * @brief Request result to the server.
+ */
+int nns_edge_request (nns_edge_h edge_h, nns_edge_data_h data_h, void *user_data);
+
+/**
+ * @brief Respond to a request.
+ */
+int nns_edge_respond (nns_edge_h edge_h, nns_edge_data_h data_h);
+
+/**
+ * @brief Subscribe a message to a given topic.
+ */
+int nns_edge_subscribe (nns_edge_h edge_h, nns_edge_data_h data_h, void *user_data);
+
+/**
+ * @brief Unsubscribe a message to a given topic.
+ */
+int nns_edge_unsubscribe (nns_edge_h edge_h);
+
+/**
+ * @brief Get the topic of edge handle. Caller should release returned string using free().
+ */
+int nns_edge_get_topic (nns_edge_h edge_h, char **topic);
+
+/**
+ * @brief Set nnstreamer edge info.
+ */
+int nns_edge_set_info (nns_edge_h edge_h, const char *key, const char *value);
+
+/**
+ * @brief Get the nnstreamer edge event type.
+ */
+int nns_edge_event_get_type (nns_edge_event_h event_h, nns_edge_event_e *event);
+
+/**
+ * @brief Parse edge event (NNS_EDGE_EVENT_NEW_DATA_RECEIVED) and get received data.
+ * @note Caller should release returned edge data using nns_edge_data_destroy().
+ */
+int nns_edge_event_parse_new_data (nns_edge_event_h event_h, nns_edge_data_h *data_h);
+
+/**
+ * @brief Parse edge event (NNS_EDGE_EVENT_CAPABILITY) and get capability string.
+ * @note Caller should release returned string using free().
+ */
+int nns_edge_event_parse_capability (nns_edge_event_h event_h, char **capability);
+
+/**
+ * @brief Create nnstreamer edge data.
+ */
+int nns_edge_data_create (nns_edge_data_h *data_h);
+
+/**
+ * @brief Destroy nnstreamer edge data.
+ */
+int nns_edge_data_destroy (nns_edge_data_h data_h);
+
+/**
+ * @brief Validate edge data handle.
+ */
+int nns_edge_data_is_valid (nns_edge_data_h data_h);
+
+/**
+ * @brief Copy edge data and return new handle.
+ */
+int nns_edge_data_copy (nns_edge_data_h data_h, nns_edge_data_h *new_data_h);
+
+/**
+ * @brief Add raw data into nnstreamer edge data.
+ */
+int nns_edge_data_add (nns_edge_data_h data_h, void *data, size_t data_len, nns_edge_data_destroy_cb destroy_cb);
+
+/**
+ * @brief Get the nnstreamer edge data.
+ * @note DO NOT release returned data. You should copy the data to another buffer if the returned data is necessary.
+ */
+int nns_edge_data_get (nns_edge_data_h data_h, unsigned int index, void **data, size_t *data_len);
+
+/**
+ * @brief Get the number of nnstreamer edge data.
+ */
+int nns_edge_data_get_count (nns_edge_data_h data_h, unsigned int *count);
+
+/**
+ * @brief Set the information of edge data.
+ */
+int nns_edge_data_set_info (nns_edge_data_h data_h, const char *key, const char *value);
+
+/**
+ * @brief Get the information of edge data. Caller should release the returned value using free().
+ */
+int nns_edge_data_get_info (nns_edge_data_h data_h, const char *key, char **value);
+
+#ifdef __cplusplus
+}
+#endif /* __cplusplus */
+#endif /* __NNSTREAMER_EDGE_H__ */
diff --git a/nnstreamer-edge.pc.in b/nnstreamer-edge.pc.in
new file mode 100644 (file)
index 0000000..cf31b4c
--- /dev/null
@@ -0,0 +1,13 @@
+# Package information for pkg-config
+
+prefix=@PREFIX@
+exec_prefix=@EXEC_PREFIX@
+libdir=@LIB_INSTALL_DIR@
+includedir=@INCLUDE_INSTALL_DIR@
+
+Name: nnstreamer-edge
+Description: NNStreamer-Edge library
+Version: @VERSION@
+Requires: @REQUIRES_LIST@
+Libs: -L${libdir} -lnnstreamer-edge
+Cflags: -I${includedir} -pthread
index 888e54030d545aadf1660271273ddf572262116b..5b2e1f85f293e8a723ae5470a42aba7c025c40ec 100644 (file)
@@ -12,6 +12,8 @@ Source1001: nnstreamer-edge.manifest
 
 BuildRequires:  cmake
 BuildRequires:  pkgconfig(paho-mqtt-c)
+# TODO remove glib
+BuildRequires:  glib2-devel
 %if 0%{?sensor_test}
 BuildRequires:  gtest-devel
 %endif
@@ -22,7 +24,13 @@ BuildRequires:       lcov
 
 %description
 nnstreamer-edge provides remote source nodes for NNStreamer pipelines without GStreamer dependencies.
-It also contains communicaton library for sharing server node information & status
+It also contains communicaton library for sharing server node information & status.
+
+%package devel
+Summary: development package for nnstreamer-edge
+Requires: nnstreamer-edge = %{version}-%{release}
+%description devel
+It is a development package for nnstreamer-edge.
 
 %package sensor
 Summary: communication library for edge sensor
@@ -121,6 +129,15 @@ cp -r result %{buildroot}%{_datadir}/nnstreamer-edge/unittest/
 %clean
 rm -rf %{buildroot}
 
+%files
+%manifest nnstreamer-edge.manifest
+%defattr(-,root,root,-)
+%{_libdir}/libnnstreamer-edge.so*
+
+%files devel
+%{_includedir}/nnstreamer-edge.h
+%{_libdir}/pkgconfig/nnstreamer-edge.pc
+
 %files sensor
 %manifest nnstreamer-edge.manifest
 %defattr(-,root,root,-)
index 1dbd8d06f3cbaefc326eb876fd1f3a77c665565d..8f820e98335fd65a36dd0b3ff38af1d15b46398b 100644 (file)
@@ -1,3 +1,19 @@
+# NNStreamer-Edge library
+SET(NNS_EDGE_LIB_NAME nnstreamer-edge)
+SET(NNS_EDGE_SRCS
+    ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-common.c
+    ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-internal.c
+    ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-mqtt.c
+)
+
+ADD_LIBRARY(${NNS_EDGE_LIB_NAME} SHARED ${NNS_EDGE_SRCS})
+SET_TARGET_PROPERTIES(${NNS_EDGE_LIB_NAME} PROPERTIES VERSION ${SO_VERSION})
+TARGET_INCLUDE_DIRECTORIES(${NNS_EDGE_LIB_NAME} PRIVATE ${INCLUDE_DIR} ${EDGE_REQUIRE_PKGS_INCLUDE_DIRS})
+TARGET_LINK_LIBRARIES(${NNS_EDGE_LIB_NAME} ${EDGE_REQUIRE_PKGS_LDFLAGS})
+
+INSTALL (TARGETS ${NNS_EDGE_LIB_NAME} DESTINATION ${CMAKE_INSTALL_LIBDIR})
+INSTALL (FILES ${INCLUDE_DIR}/nnstreamer-edge.h DESTINATION ${INCLUDE_INSTALL_DIR})
+
 # Edge Sensor
 SET(SENSOR_SRCS ${SENSOR_SRC_DIR}/edge_sensor.c)
 ADD_LIBRARY(edge-sensor SHARED ${SENSOR_SRCS})
diff --git a/src/libnnstreamer-edge/nnstreamer-edge-aitt.c b/src/libnnstreamer-edge/nnstreamer-edge-aitt.c
new file mode 100644 (file)
index 0000000..1a1b024
--- /dev/null
@@ -0,0 +1,33 @@
+/* SPDX-License-Identifier: Apache-2.0 */
+/**
+ * Copyright (C) 2022 Gichan Jang <gichan2.jang@samsung.com>
+ *
+ * @file   nnstreamer-edge-aitt.c
+ * @date   28 Mar 2022
+ * @brief  Common library to support communication among devices using aitt.
+ * @see    https://github.com/nnstreamer/nnstreamer
+ * @author Gichan Jang <gichan2.jang@samsung.com>
+ * @bug    No known bugs except for NYI items
+ */
+
+#include <aitt_c.h>
+#include "nnstreamer-edge-common.h"
+
+typedef void *nns_edge_aitt_h;
+typedef void *nns_edge_aitt_msg_h;
+typedef void *nns_edge_aitt_sub_h;
+
+/**
+ * @brief Data structure for aitt handle.
+ * @todo Update AITT-related handle later. This is internal struct to manage edge-handle with AITT.
+ */
+typedef struct
+{
+  nns_edge_protocol_e protocol;
+  struct
+  {
+    nns_edge_aitt_h aitt_h;
+    nns_edge_aitt_msg_h msg_h;
+    nns_edge_aitt_sub_h sub_h;
+  };
+} nns_edge_handle_s;
diff --git a/src/libnnstreamer-edge/nnstreamer-edge-common.c b/src/libnnstreamer-edge/nnstreamer-edge-common.c
new file mode 100644 (file)
index 0000000..b3eba68
--- /dev/null
@@ -0,0 +1,472 @@
+/* SPDX-License-Identifier: Apache-2.0 */
+/**
+ * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+ *
+ * @file   nnstreamer-edge-common.c
+ * @date   6 April 2022
+ * @brief  Common util functions for nnstreamer edge.
+ * @see    https://github.com/nnstreamer/nnstreamer
+ * @author Gichan Jang <gichan2.jang@samsung.com>
+ * @bug    No known bugs except for NYI items
+ */
+
+#include "nnstreamer-edge-common.h"
+
+/**
+ * @brief Create nnstreamer edge event.
+ */
+int
+nns_edge_event_create (nns_edge_event_e event, nns_edge_event_h * event_h)
+{
+  nns_edge_event_s *ee;
+
+  if (!event_h) {
+    nns_edge_loge ("Invalid param, event_h should not be null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (event <= NNS_EDGE_EVENT_UNKNOWN) {
+    nns_edge_loge ("Invalid param, given event type is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  ee = (nns_edge_event_s *) malloc (sizeof (nns_edge_event_s));
+  if (!ee) {
+    nns_edge_loge ("Failed to allocate memory for edge event.");
+    return NNS_EDGE_ERROR_OUT_OF_MEMORY;
+  }
+
+  memset (ee, 0, sizeof (nns_edge_event_s));
+  ee->magic = NNS_EDGE_MAGIC;
+  ee->event = event;
+
+  *event_h = ee;
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Destroy nnstreamer edge event.
+ */
+int
+nns_edge_event_destroy (nns_edge_event_h event_h)
+{
+  nns_edge_event_s *ee;
+
+  ee = (nns_edge_event_s *) event_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (ee)) {
+    nns_edge_loge ("Invalid param, given edge event is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  ee->magic = NNS_EDGE_MAGIC_DEAD;
+
+  if (ee->data.destroy_cb)
+    ee->data.destroy_cb (ee->data.data);
+
+  g_free (ee);
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Set event data.
+ */
+int
+nns_edge_event_set_data (nns_edge_event_h event_h, void *data, size_t data_len,
+    nns_edge_data_destroy_cb destroy_cb)
+{
+  nns_edge_event_s *ee;
+
+  ee = (nns_edge_event_s *) event_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (ee)) {
+    nns_edge_loge ("Invalid param, given edge event is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!data || data_len <= 0) {
+    nns_edge_loge ("Invalid param, data should not be null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  /* Clear old data and set new one. */
+  if (ee->data.destroy_cb)
+    ee->data.destroy_cb (ee->data.data);
+
+  ee->data.data = data;
+  ee->data.data_len = data_len;
+  ee->data.destroy_cb = destroy_cb;
+
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Get the nnstreamer edge event type.
+ */
+int
+nns_edge_event_get_type (nns_edge_event_h event_h, nns_edge_event_e * event)
+{
+  nns_edge_event_s *ee;
+
+  ee = (nns_edge_event_s *) event_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (ee)) {
+    nns_edge_loge ("Invalid param, given edge event is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!event) {
+    nns_edge_loge ("Invalid param, event should not be null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  *event = ee->event;
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Parse edge event (NNS_EDGE_EVENT_NEW_DATA_RECEIVED) and get received data.
+ * @note Caller should release returned edge data using nns_edge_data_destroy().
+ */
+int
+nns_edge_event_parse_new_data (nns_edge_event_h event_h,
+    nns_edge_data_h * data_h)
+{
+  nns_edge_event_s *ee;
+
+  ee = (nns_edge_event_s *) event_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (ee)) {
+    nns_edge_loge ("Invalid param, given edge event is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!data_h) {
+    nns_edge_loge ("Invalid param, data_h should not be null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (ee->event != NNS_EDGE_EVENT_NEW_DATA_RECEIVED) {
+    nns_edge_loge ("The edge event has invalid event type.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  return nns_edge_data_copy ((nns_edge_data_h) ee->data.data, data_h);
+}
+
+/**
+ * @brief Parse edge event (NNS_EDGE_EVENT_CAPABILITY) and get capability string.
+ * @note Caller should release returned string using free().
+ */
+int
+nns_edge_event_parse_capability (nns_edge_event_h event_h, char **capability)
+{
+  nns_edge_event_s *ee;
+
+  ee = (nns_edge_event_s *) event_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (ee)) {
+    nns_edge_loge ("Invalid param, given edge event is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!capability) {
+    nns_edge_loge ("Invalid param, capability should not be null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (ee->event != NNS_EDGE_EVENT_CAPABILITY) {
+    nns_edge_loge ("The edge event has invalid event type.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  *capability = g_strdup (ee->data.data);
+
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Create nnstreamer edge data.
+ */
+int
+nns_edge_data_create (nns_edge_data_h * data_h)
+{
+  nns_edge_data_s *ed;
+
+  if (!data_h) {
+    nns_edge_loge ("Invalid param, data_h should not be null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  ed = (nns_edge_data_s *) malloc (sizeof (nns_edge_data_s));
+  if (!ed) {
+    nns_edge_loge ("Failed to allocate memory for edge data.");
+    return NNS_EDGE_ERROR_OUT_OF_MEMORY;
+  }
+
+  memset (ed, 0, sizeof (nns_edge_data_s));
+  ed->magic = NNS_EDGE_MAGIC;
+  ed->info_table =
+      g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_free);
+
+  *data_h = ed;
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Destroy nnstreamer edge data.
+ */
+int
+nns_edge_data_destroy (nns_edge_data_h data_h)
+{
+  nns_edge_data_s *ed;
+  unsigned int i;
+
+  ed = (nns_edge_data_s *) data_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (ed)) {
+    nns_edge_loge ("Invalid param, given edge data is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  ed->magic = NNS_EDGE_MAGIC_DEAD;
+
+  for (i = 0; i < ed->num; i++) {
+    if (ed->data[i].destroy_cb)
+      ed->data[i].destroy_cb (ed->data[i].data);
+  }
+
+  g_hash_table_destroy (ed->info_table);
+
+  g_free (ed);
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Validate edge data handle.
+ */
+int
+nns_edge_data_is_valid (nns_edge_data_h data_h)
+{
+  nns_edge_data_s *ed;
+
+  ed = (nns_edge_data_s *) data_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (ed)) {
+    nns_edge_loge ("Invalid param, edge data handle is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Copy edge data and return new handle.
+ */
+int
+nns_edge_data_copy (nns_edge_data_h data_h, nns_edge_data_h * new_data_h)
+{
+  nns_edge_data_s *ed;
+  nns_edge_data_s *copied;
+  GHashTableIter iter;
+  gpointer key, value;
+  unsigned int i;
+  int ret;
+
+  ed = (nns_edge_data_s *) data_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (ed)) {
+    nns_edge_loge ("Invalid param, edge data handle is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!new_data_h) {
+    nns_edge_loge ("Invalid param, new_data_h should not be null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  ret = nns_edge_data_create (new_data_h);
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("Failed to create new data handle.");
+    return ret;
+  }
+
+  copied = (nns_edge_data_s *) (*new_data_h);
+
+  copied->num = ed->num;
+  for (i = 0; i < ed->num; i++) {
+    copied->data[i].data = _g_memdup (ed->data[i].data, ed->data[i].data_len);
+    copied->data[i].data_len = ed->data[i].data_len;
+    copied->data[i].destroy_cb = g_free;
+  }
+
+  g_hash_table_iter_init (&iter, ed->info_table);
+  while (g_hash_table_iter_next (&iter, &key, &value)) {
+    g_hash_table_insert (copied->info_table, g_strdup (key), g_strdup (value));
+  }
+
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Add raw data into nnstreamer edge data.
+ */
+int
+nns_edge_data_add (nns_edge_data_h data_h, void *data, size_t data_len,
+    nns_edge_data_destroy_cb destroy_cb)
+{
+  nns_edge_data_s *ed;
+
+  ed = (nns_edge_data_s *) data_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (ed)) {
+    nns_edge_loge ("Invalid param, given edge data is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (ed->num >= NNS_EDGE_DATA_LIMIT) {
+    nns_edge_loge ("Cannot add data, the maximum number of edge data is %d.",
+        NNS_EDGE_DATA_LIMIT);
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!data || data_len <= 0) {
+    nns_edge_loge ("Invalid param, data should not be null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  ed->data[ed->num].data = data;
+  ed->data[ed->num].data_len = data_len;
+  ed->data[ed->num].destroy_cb = destroy_cb;
+  ed->num++;
+
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Get the nnstreamer edge data.
+ * @note DO NOT release returned data. You should copy the data to another buffer if the returned data is necessary.
+ */
+int
+nns_edge_data_get (nns_edge_data_h data_h, unsigned int index, void **data,
+    size_t *data_len)
+{
+  nns_edge_data_s *ed;
+
+  ed = (nns_edge_data_s *) data_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (ed)) {
+    nns_edge_loge ("Invalid param, given edge data is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!data || !data_len) {
+    nns_edge_loge ("Invalid param, data and len should not be null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (index >= ed->num) {
+    nns_edge_loge
+        ("Invalid param, the number of edge data is %u but requested %uth data.",
+        ed->num, index);
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  *data = ed->data[index].data;
+  *data_len = ed->data[index].data_len;
+
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Get the number of nnstreamer edge data.
+ */
+int
+nns_edge_data_get_count (nns_edge_data_h data_h, unsigned int *count)
+{
+  nns_edge_data_s *ed;
+
+  ed = (nns_edge_data_s *) data_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (ed)) {
+    nns_edge_loge ("Invalid param, given edge data is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!count) {
+    nns_edge_loge ("Invalid param, count should not be null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  *count = ed->num;
+
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Set the information of edge data.
+ */
+int
+nns_edge_data_set_info (nns_edge_data_h data_h, const char *key,
+    const char *value)
+{
+  nns_edge_data_s *ed;
+
+  ed = (nns_edge_data_s *) data_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (ed)) {
+    nns_edge_loge ("Invalid param, given edge data is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!key || *key == '\0') {
+    nns_edge_loge ("Invalid param, given key is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!value || *value == '\0') {
+    nns_edge_loge ("Invalid param, given value is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  g_hash_table_insert (ed->info_table, g_strdup (key), g_strdup (value));
+
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Get the information of edge data. Caller should release the returned value using free().
+ */
+int
+nns_edge_data_get_info (nns_edge_data_h data_h, const char *key, char **value)
+{
+  nns_edge_data_s *ed;
+  char *val;
+
+  ed = (nns_edge_data_s *) data_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (ed)) {
+    nns_edge_loge ("Invalid param, given edge data is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!key || *key == '\0') {
+    nns_edge_loge ("Invalid param, given key is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!value) {
+    nns_edge_loge ("Invalid param, value should not be null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  val = g_hash_table_lookup (ed->info_table, key);
+  if (!val) {
+    nns_edge_loge ("Invalid param, cannot find info about '%s'.", key);
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  *value = g_strdup (val);
+
+  return NNS_EDGE_ERROR_NONE;
+}
diff --git a/src/libnnstreamer-edge/nnstreamer-edge-common.h b/src/libnnstreamer-edge/nnstreamer-edge-common.h
new file mode 100644 (file)
index 0000000..892ef4a
--- /dev/null
@@ -0,0 +1,111 @@
+/* SPDX-License-Identifier: Apache-2.0 */
+/**
+ * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+ *
+ * @file   nnstreamer-edge-common.h
+ * @date   6 April 2022
+ * @brief  Common util functions for nnstreamer edge.
+ * @see    https://github.com/nnstreamer/nnstreamer
+ * @author Gichan Jang <gichan2.jang@samsung.com>
+ * @bug    No known bugs except for NYI items
+ * @note   This file is internal header for nnstreamer edge utils. DO NOT export this file.
+ */
+
+#ifndef __NNSTREAMER_EDGE_COMMON_H__
+#define __NNSTREAMER_EDGE_COMMON_H__
+
+#include <glib.h> /** @todo remove glib */
+#include <pthread.h>
+#include "nnstreamer-edge.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif /* __cplusplus */
+
+/**
+ * @brief Utility to silence unused parameter warning for intentionally unused parameters (e.g., callback functions of a framework)
+ */
+#ifndef UNUSED
+#define UNUSED(expr) do { (void)(expr); } while (0)
+#endif
+
+/**
+ * @brief g_memdup() function replaced by g_memdup2() in glib version >= 2.68
+ */
+#if GLIB_USE_G_MEMDUP2
+#define _g_memdup g_memdup2
+#else
+#define _g_memdup g_memdup
+#endif
+
+#define NNS_EDGE_MAGIC 0xfeedfeed
+#define NNS_EDGE_MAGIC_DEAD 0xdeaddead
+#define NNS_EDGE_MAGIC_IS_VALID(h) ((h) && (h)->magic == NNS_EDGE_MAGIC)
+
+#define nns_edge_lock_init(h) do { pthread_mutex_init (&(h)->lock, NULL); } while (0)
+#define nns_edge_lock_destroy(h) do { pthread_mutex_destroy (&(h)->lock); } while (0)
+#define nns_edge_lock(h) do { pthread_mutex_lock (&(h)->lock); } while (0)
+#define nns_edge_unlock(h) do { pthread_mutex_unlock (&(h)->lock); } while (0)
+
+/**
+ * @brief Internal data structure for raw data.
+ */
+typedef struct {
+  void *data;
+  size_t data_len;
+  nns_edge_data_destroy_cb destroy_cb;
+} nns_edge_raw_data_s;
+
+/**
+ * @brief Internal data structure for edge data.
+ */
+typedef struct {
+  unsigned int magic;
+  unsigned int num;
+  nns_edge_raw_data_s data[NNS_EDGE_DATA_LIMIT];
+  GHashTable *info_table;
+} nns_edge_data_s;
+
+/**
+ * @brief Internal data structure for edge event.
+ */
+typedef struct {
+  unsigned int magic;
+  nns_edge_event_e event;
+  nns_edge_raw_data_s data;
+} nns_edge_event_s;
+
+/**
+ * @todo add log util for nnstreamer-edge.
+ * 1. define tag (e.g., "nnstreamer-edge").
+ * 2. consider macros to print function and line.
+ * 3. new API to get last error.
+ */
+#define nns_edge_logi g_info
+#define nns_edge_logw g_warning
+#define nns_edge_loge g_critical
+#define nns_edge_logd g_debug
+#define nns_edge_logf g_error
+
+/**
+ * @brief Create nnstreamer edge event.
+ * @note This is internal function for edge event.
+ */
+int nns_edge_event_create (nns_edge_event_e event, nns_edge_event_h * event_h);
+
+/**
+ * @brief Destroy nnstreamer edge event.
+ * @note This is internal function for edge event.
+ */
+int nns_edge_event_destroy (nns_edge_event_h event_h);
+
+/**
+ * @brief Set event data.
+ * @note This is internal function for edge event.
+ */
+int nns_edge_event_set_data (nns_edge_event_h event_h, void *data, size_t data_len, nns_edge_data_destroy_cb destroy_cb);
+
+#ifdef __cplusplus
+}
+#endif /* __cplusplus */
+#endif /* __NNSTREAMER_EDGE_COMMON_H__ */
diff --git a/src/libnnstreamer-edge/nnstreamer-edge-internal.c b/src/libnnstreamer-edge/nnstreamer-edge-internal.c
new file mode 100644 (file)
index 0000000..f507f08
--- /dev/null
@@ -0,0 +1,1437 @@
+/* SPDX-License-Identifier: Apache-2.0 */
+/**
+ * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+ *
+ * @file   nnstreamer-edge-internal.c
+ * @date   6 April 2022
+ * @brief  Common library to support communication among devices.
+ * @see    https://github.com/nnstreamer/nnstreamer
+ * @author Gichan Jang <gichan2.jang@samsung.com>
+ * @bug    No known bugs except for NYI items
+ */
+
+#include "nnstreamer-edge-common.h"
+#include "nnstreamer-edge-internal.h"
+
+#define N_BACKLOG 10
+#define DEFAULT_TIMEOUT_SEC 10
+#define _STR_NULL(str) ((str) ? (str) : "(NULL)")
+
+/**
+ * @brief enum for nnstreamer edge query commands.
+ */
+typedef enum
+{
+  _NNS_EDGE_CMD_ERROR = 0,
+  _NNS_EDGE_CMD_TRANSFER_DATA,
+  _NNS_EDGE_CMD_HOST_INFO,
+  _NNS_EDGE_CMD_CAPABILITY,
+  _NNS_EDGE_CMD_END
+} nns_edge_cmd_e;
+
+/**
+ * @brief Structure for edge command info. It should be fixed size.
+ */
+typedef struct
+{
+  nns_edge_cmd_e cmd;
+  int64_t client_id;
+
+  /* memory info */
+  uint32_t num;
+  size_t mem_size[NNS_EDGE_DATA_LIMIT];
+} nns_edge_cmd_info_s;
+
+/**
+ * @brief Structure for edge command and buffers.
+ */
+typedef struct
+{
+  nns_edge_cmd_info_s info;
+  void *mem[NNS_EDGE_DATA_LIMIT];
+} nns_edge_cmd_s;
+
+/**
+ * @brief Data structure for edge connection.
+ */
+typedef struct
+{
+  char *ip;
+  int port;
+  int8_t running;
+  pthread_t msg_thread;
+  GSocket *socket;
+  GCancellable *cancellable;
+} nns_edge_conn_s;
+
+/**
+ * @brief Data structure for connection data.
+ */
+typedef struct
+{
+  nns_edge_conn_s *src_conn;
+  nns_edge_conn_s *sink_conn;
+  int64_t id;
+} nns_edge_conn_data_s;
+
+/**
+ * @brief Structures for thread data of message handling.
+ */
+typedef struct
+{
+  nns_edge_handle_s *eh;
+  int64_t client_id;
+  nns_edge_conn_s *conn;
+} nns_edge_thread_data_s;
+
+/**
+ * @brief Send data to connected socket.
+ */
+static bool
+_send_raw_data (GSocket * socket, void *data, size_t size,
+    GCancellable * cancellable)
+{
+  size_t bytes_sent = 0;
+  ssize_t rret;
+  GError *err = NULL;
+
+  while (bytes_sent < size) {
+    rret = g_socket_send (socket, (char *) data + bytes_sent,
+        size - bytes_sent, cancellable, &err);
+
+    if (rret == 0) {
+      nns_edge_loge ("Connection closed.");
+      return false;
+    }
+
+    if (rret < 0) {
+      nns_edge_loge ("Error while sending data (%s).", err->message);
+      g_clear_error (&err);
+      return false;
+    }
+
+    bytes_sent += rret;
+  }
+
+  return true;
+}
+
+/**
+ * @brief Receive data from connected socket.
+ */
+static bool
+_receive_raw_data (GSocket * socket, void *data, size_t size,
+    GCancellable * cancellable)
+{
+  size_t bytes_received = 0;
+  ssize_t rret;
+  GError *err = NULL;
+
+  while (bytes_received < size) {
+    rret = g_socket_receive (socket, (char *) data + bytes_received,
+        size - bytes_received, cancellable, &err);
+
+    if (rret == 0) {
+      nns_edge_loge ("Connection closed.");
+      return false;
+    }
+
+    if (rret < 0) {
+      nns_edge_loge ("Failed to read from socket (%s).", err->message);
+      g_clear_error (&err);
+      return false;
+    }
+
+    bytes_received += rret;
+  }
+
+  return true;
+}
+
+/**
+ * @brief Parse string and get host IP:port.
+ */
+static void
+_parse_host_str (const char *host, char **ip, int *port)
+{
+  char *p = g_strrstr (host, ":");
+
+  if (p) {
+    *ip = g_strndup (host, (p - host));
+    *port = (int) g_ascii_strtoll (p + 1, NULL, 10);
+  }
+}
+
+/**
+ * @brief Get host string (IP:port).
+ */
+static void
+_get_host_str (const char *ip, const int port, char **host)
+{
+  *host = g_strdup_printf ("%s:%d", ip, port);
+}
+
+/**
+ * @brief Get available port number.
+ */
+static int
+_get_available_port (void)
+{
+  struct sockaddr_in sin;
+  int port = 0, sock;
+  socklen_t len = sizeof (struct sockaddr);
+
+  sin.sin_family = AF_INET;
+  sin.sin_addr.s_addr = INADDR_ANY;
+  sock = socket (AF_INET, SOCK_STREAM, 0);
+  sin.sin_port = port;
+  if (bind (sock, (struct sockaddr *) &sin, sizeof (struct sockaddr)) == 0) {
+    getsockname (sock, (struct sockaddr *) &sin, &len);
+    port = ntohs (sin.sin_port);
+    nns_edge_logi ("Available port number: %d", port);
+  }
+  close (sock);
+
+  return port;
+}
+
+/**
+ * @brief initialize edge command.
+ */
+static void
+_nns_edge_cmd_init (nns_edge_cmd_s * cmd, nns_edge_cmd_e c, int64_t cid)
+{
+  if (!cmd)
+    return;
+
+  memset (cmd, 0, sizeof (nns_edge_cmd_s));
+  cmd->info.cmd = c;
+  cmd->info.client_id = cid;
+}
+
+/**
+ * @brief Clear allocated memory in edge command.
+ */
+static void
+_nns_edge_cmd_clear (nns_edge_cmd_s * cmd)
+{
+  unsigned int i;
+
+  if (!cmd)
+    return;
+
+  for (i = 0; i < cmd->info.num; i++) {
+    if (cmd->mem[i])
+      free (cmd->mem[i]);
+    cmd->mem[i] = NULL;
+  }
+}
+
+/**
+ * @brief Send edge command to connected device.
+ */
+static int
+_nns_edge_cmd_send (nns_edge_conn_s * conn, nns_edge_cmd_s * cmd)
+{
+  unsigned int n;
+
+  if (!conn || !cmd)
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+
+  if (!_send_raw_data (conn->socket, &cmd->info,
+          sizeof (nns_edge_cmd_info_s), conn->cancellable)) {
+    nns_edge_loge ("Failed to send command to socket.");
+    return NNS_EDGE_ERROR_IO;
+  }
+
+  for (n = 0; n < cmd->info.num; n++) {
+    if (!_send_raw_data (conn->socket, cmd->mem[n],
+            cmd->info.mem_size[n], conn->cancellable)) {
+      nns_edge_loge ("Failed to send %uth memory to socket.", n);
+      return NNS_EDGE_ERROR_IO;
+    }
+  }
+
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Receive edge command from connected device.
+ */
+static int
+_nns_edge_cmd_receive (nns_edge_conn_s * conn, nns_edge_cmd_s * cmd)
+{
+  unsigned int i, n;
+  int ret = NNS_EDGE_ERROR_NONE;
+
+  if (!conn || !cmd)
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+
+  if (!_receive_raw_data (conn->socket, &cmd->info,
+          sizeof (nns_edge_cmd_info_s), conn->cancellable)) {
+    nns_edge_loge ("Failed to receive command from socket.");
+    return NNS_EDGE_ERROR_IO;
+  }
+
+  nns_edge_logd ("Received command:%d (num:%u)", cmd->info.cmd, cmd->info.num);
+
+  for (n = 0; n < cmd->info.num; n++) {
+    cmd->mem[n] = malloc (cmd->info.mem_size[n]);
+    if (!cmd->mem[n]) {
+      nns_edge_loge ("Failed to allocate memory to receive data from socket.");
+      ret = NNS_EDGE_ERROR_OUT_OF_MEMORY;
+      break;
+    }
+
+    if (!_receive_raw_data (conn->socket, cmd->mem[n],
+            cmd->info.mem_size[n], conn->cancellable)) {
+      nns_edge_loge ("Failed to receive %uth memory from socket.", n++);
+      ret = NNS_EDGE_ERROR_IO;
+      break;
+    }
+  }
+
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    for (i = 0; i < n; i++) {
+      free (cmd->mem[i]);
+      cmd->mem[i] = NULL;
+    }
+  }
+
+  return ret;
+}
+
+/**
+ * @brief Internal function to invoke event callback.
+ * @note This function should be called with handle lock.
+ */
+static int
+_nns_edge_invoke_event_cb (nns_edge_handle_s * eh, nns_edge_event_e event,
+    void *data, size_t data_len, nns_edge_data_destroy_cb destroy_cb)
+{
+  nns_edge_event_h event_h;
+  int ret;
+
+  if (!eh) {
+    nns_edge_loge ("Invalid param, given edge handle is null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  /* If event callback is null, return ok. */
+  if (!eh->event_cb) {
+    nns_edge_logw ("The event callback is null, do nothing!");
+    return NNS_EDGE_ERROR_NONE;
+  }
+
+  ret = nns_edge_event_create (event, &event_h);
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("Failed to create new edge event.");
+    return ret;
+  }
+
+  if (data) {
+    ret = nns_edge_event_set_data (event_h, data, data_len, destroy_cb);
+    if (ret != NNS_EDGE_ERROR_NONE) {
+      nns_edge_loge ("Failed to handle edge event due to invalid event data.");
+      goto error;
+    }
+  }
+
+  ret = eh->event_cb (event_h, eh->user_data);
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("The event callback returns error.");
+  }
+
+error:
+  nns_edge_event_destroy (event_h);
+  return ret;
+}
+
+/**
+ * @brief Close connection
+ */
+static bool
+_nns_edge_close_connection (nns_edge_conn_s * conn)
+{
+  GError *err = NULL;
+
+  if (!conn)
+    return false;
+
+  if (conn->running) {
+    conn->running = 0;
+    pthread_join (conn->msg_thread, NULL);
+  }
+
+  if (conn->socket) {
+    if (!g_socket_close (conn->socket, &err)) {
+      nns_edge_loge ("Failed to close socket: %s", err->message);
+      g_clear_error (&err);
+      return false;
+    }
+    g_object_unref (conn->socket);
+    conn->socket = NULL;
+  }
+
+  if (conn->cancellable) {
+    g_object_unref (conn->cancellable);
+    conn->cancellable = NULL;
+  }
+
+  g_free (conn->ip);
+  g_free (conn);
+  return true;
+}
+
+/**
+ * @brief Get nnstreamer-edge connection data.
+ * @note This function should be called with handle lock.
+ */
+static nns_edge_conn_data_s *
+_nns_edge_get_connection (nns_edge_handle_s * eh, int64_t client_id)
+{
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return NULL;
+  }
+
+  return g_hash_table_lookup (eh->conn_table, GUINT_TO_POINTER (client_id));
+}
+
+/**
+ * @brief Get nnstreamer-edge connection data.
+ * @note This function should be called with handle lock.
+ */
+static nns_edge_conn_data_s *
+_nns_edge_add_connection (nns_edge_handle_s * eh, int64_t client_id)
+{
+  nns_edge_conn_data_s *data = NULL;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return NULL;
+  }
+
+  data = g_hash_table_lookup (eh->conn_table, GUINT_TO_POINTER (client_id));
+
+  if (NULL == data) {
+    data = (nns_edge_conn_data_s *) malloc (sizeof (nns_edge_conn_data_s));
+    if (NULL == data) {
+      nns_edge_loge ("Failed to allocate memory for connection data.");
+      return NULL;
+    }
+
+    memset (data, 0, sizeof (nns_edge_conn_data_s));
+    data->id = client_id;
+
+    g_hash_table_insert (eh->conn_table, GUINT_TO_POINTER (client_id), data);
+  }
+
+  return data;
+}
+
+/**
+ * @brief Remove nnstreamer-edge connection data. This will be called when removing connection data from hash table.
+ */
+static void
+_nns_edge_remove_connection (gpointer data)
+{
+  nns_edge_conn_data_s *cdata = (nns_edge_conn_data_s *) data;
+
+  if (cdata) {
+    _nns_edge_close_connection (cdata->src_conn);
+    _nns_edge_close_connection (cdata->sink_conn);
+
+    g_free (cdata);
+  }
+}
+
+/**
+ * @brief Internal function to check connection.
+ */
+static bool
+_nns_edge_check_connection (nns_edge_conn_s * conn)
+{
+  size_t size;
+  GIOCondition condition;
+
+  if (!conn)
+    return false;
+
+  condition = g_socket_condition_check (conn->socket,
+      G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
+  size = g_socket_get_available_bytes (conn->socket);
+
+  if (condition && size <= 0) {
+    nns_edge_logw ("Socket is not available, possibly EOS.");
+    return false;
+  }
+
+  return true;
+}
+
+/**
+ * @brief Get socket address
+ */
+static bool
+_nns_edge_get_saddr (const char *ip, const int port,
+    GCancellable * cancellable, GSocketAddress ** saddr)
+{
+  GError *err = NULL;
+  GInetAddress *addr;
+
+  /* look up name if we need to */
+  addr = g_inet_address_new_from_string (ip);
+  if (!addr) {
+    GList *results;
+    GResolver *resolver;
+    resolver = g_resolver_get_default ();
+    results = g_resolver_lookup_by_name (resolver, ip, cancellable, &err);
+    if (!results) {
+      if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+        nns_edge_loge ("Failed to resolve ip, name resolver is cancelled.");
+      } else {
+        nns_edge_loge ("Failed to resolve ip '%s': %s", ip, err->message);
+      }
+      g_clear_error (&err);
+      g_object_unref (resolver);
+      return false;
+    }
+    /** @todo Try with the second address if the first fails */
+    addr = G_INET_ADDRESS (g_object_ref (results->data));
+    g_resolver_free_addresses (results);
+    g_object_unref (resolver);
+  }
+
+  *saddr = g_inet_socket_address_new (addr, port);
+  g_object_unref (addr);
+
+  return true;
+}
+
+/**
+ * @brief Connect to requested socket.
+ */
+static bool
+_nns_edge_connect_socket (nns_edge_conn_s * conn)
+{
+  GError *err = NULL;
+  GSocketAddress *saddr = NULL;
+  bool ret = false;
+
+  if (!_nns_edge_get_saddr (conn->ip, conn->port, conn->cancellable, &saddr)) {
+    nns_edge_loge ("Failed to get socket address");
+    return ret;
+  }
+
+  /* create sending client socket */
+  conn->socket =
+      g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM,
+      G_SOCKET_PROTOCOL_TCP, &err);
+
+  if (!conn->socket) {
+    nns_edge_loge ("Failed to create new socket");
+    goto done;
+  }
+
+  /* setting TCP_NODELAY to true in order to avoid packet batching as known as Nagle's algorithm */
+  if (!g_socket_set_option (conn->socket, IPPROTO_TCP, TCP_NODELAY, true, &err)) {
+    nns_edge_loge ("Failed to set socket TCP_NODELAY option: %s", err->message);
+    goto done;
+  }
+
+  if (!g_socket_connect (conn->socket, saddr, conn->cancellable, &err)) {
+    if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+      nns_edge_logd ("Cancelled connecting");
+    } else {
+      nns_edge_loge ("Failed to connect to host, %s:%d", conn->ip, conn->port);
+    }
+    goto done;
+  }
+
+  /* now connected to the requested socket */
+  ret = true;
+
+done:
+  g_object_unref (saddr);
+  g_clear_error (&err);
+  return ret;
+}
+
+/**
+ * @brief Connect to the destination node.
+ */
+static int
+_nns_edge_connect_to (nns_edge_handle_s * eh, const char *ip, int port)
+{
+  nns_edge_conn_s *conn = NULL;
+  nns_edge_conn_data_s *conn_data;
+  nns_edge_cmd_s cmd;
+  char *host;
+  int64_t client_id;
+  bool done = false;
+  int ret;
+
+  conn = (nns_edge_conn_s *) malloc (sizeof (nns_edge_conn_s));
+  if (!conn) {
+    nns_edge_loge ("Failed to allocate client data.");
+    goto error;
+  }
+
+  memset (conn, 0, sizeof (nns_edge_conn_s));
+  conn->ip = g_strdup (ip);
+  conn->port = port;
+  conn->cancellable = g_cancellable_new ();
+
+  if (!_nns_edge_connect_socket (conn)) {
+    goto error;
+  }
+
+  /* Get destination capability. */
+  ret = _nns_edge_cmd_receive (conn, &cmd);
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("Failed to receive capability.");
+    goto error;
+  }
+
+  if (cmd.info.cmd != _NNS_EDGE_CMD_CAPABILITY) {
+    nns_edge_loge ("Failed to get capability.");
+    _nns_edge_cmd_clear (&cmd);
+    goto error;
+  }
+
+  client_id = eh->client_id = cmd.info.client_id;
+
+  /* Check compatibility. */
+  ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_CAPABILITY,
+      cmd.mem[0], cmd.info.mem_size[0], NULL);
+  _nns_edge_cmd_clear (&cmd);
+
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("The event returns error, capability is not acceptable.");
+    _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id);
+  } else {
+    /* Send ip and port to destination. */
+    _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_HOST_INFO, client_id);
+
+    _get_host_str (eh->recv_ip, eh->recv_port, &host);
+    cmd.info.num = 1;
+    cmd.info.mem_size[0] = strlen (host) + 1;
+    cmd.mem[0] = host;
+  }
+
+  ret = _nns_edge_cmd_send (conn, &cmd);
+  _nns_edge_cmd_clear (&cmd);
+
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("Failed to send host info.");
+    goto error;
+  }
+
+  conn_data = _nns_edge_add_connection (eh, client_id);
+  if (conn_data) {
+    /* Close old connection and set new one. */
+    _nns_edge_close_connection (conn_data->sink_conn);
+    conn_data->sink_conn = conn;
+    done = true;
+  }
+
+error:
+  if (!done) {
+    _nns_edge_close_connection (conn);
+    return NNS_EDGE_ERROR_CONNECTION_FAILURE;
+  }
+
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Message thread, receive buffer from the client.
+ */
+static void *
+_nns_edge_message_handler (void *thread_data)
+{
+  nns_edge_thread_data_s *_tdata = (nns_edge_thread_data_s *) thread_data;
+  nns_edge_handle_s *eh;
+  nns_edge_conn_s *conn;
+  nns_edge_cmd_s cmd;
+  int64_t client_id;
+  char *val;
+  int ret;
+
+  if (!_tdata) {
+    nns_edge_loge ("Internal error, thread data is null.");
+    return NULL;
+  }
+
+  eh = (nns_edge_handle_s *) _tdata->eh;
+  conn = _tdata->conn;
+  client_id = _tdata->client_id;
+  g_free (_tdata);
+
+  while (conn->running) {
+    nns_edge_data_h data_h;
+    unsigned int i;
+
+    /* Validate edge handle */
+    if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+      nns_edge_loge ("The edge handle is invalid, it would be expired.");
+      break;
+    }
+
+    if (!_nns_edge_check_connection (conn))
+      break;
+
+    /** Receive data from the client */
+    ret = _nns_edge_cmd_receive (conn, &cmd);
+    if (ret != NNS_EDGE_ERROR_NONE) {
+      nns_edge_loge ("Failed to receive data from the connected node.");
+      break;
+    }
+
+    if (cmd.info.cmd == _NNS_EDGE_CMD_ERROR) {
+      nns_edge_loge ("Received error, stop msg thread.");
+      _nns_edge_cmd_clear (&cmd);
+      break;
+    } else if (cmd.info.cmd != _NNS_EDGE_CMD_TRANSFER_DATA) {
+      /** @todo handle other cmd later */
+      _nns_edge_cmd_clear (&cmd);
+      continue;
+    }
+
+    ret = nns_edge_data_create (&data_h);
+    if (ret != NNS_EDGE_ERROR_NONE) {
+      nns_edge_loge ("Failed to create data handle in msg thread.");
+      _nns_edge_cmd_clear (&cmd);
+      continue;
+    }
+
+    /* Set client ID in edge data */
+    val = g_strdup_printf ("%ld", (long int) client_id);
+    nns_edge_data_set_info (data_h, "client_id", val);
+    g_free (val);
+
+    for (i = 0; i < cmd.info.num; i++) {
+      nns_edge_data_add (data_h, cmd.mem[i], cmd.info.mem_size[i], NULL);
+    }
+
+    ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_NEW_DATA_RECEIVED,
+        data_h, sizeof (data_h), NULL);
+    if (ret != NNS_EDGE_ERROR_NONE) {
+      /* Try to get next request if server does not accept data from client. */
+      nns_edge_logw ("The server does not accept data from client.");
+    }
+
+    nns_edge_data_destroy (data_h);
+    _nns_edge_cmd_clear (&cmd);
+  }
+
+  conn->running = 0;
+  return NULL;
+}
+
+/**
+ * @brief Create message handle thread.
+ */
+static int
+_nns_edge_create_message_thread (nns_edge_handle_s * eh, nns_edge_conn_s * conn,
+    int64_t client_id)
+{
+  pthread_attr_t attr;
+  int tid;
+  nns_edge_thread_data_s *thread_data = NULL;
+
+  thread_data =
+      (nns_edge_thread_data_s *) malloc (sizeof (nns_edge_thread_data_s));
+  if (!thread_data) {
+    nns_edge_loge ("Failed to allocate edge thread data.");
+    return NNS_EDGE_ERROR_OUT_OF_MEMORY;
+  }
+
+   /** Create message receving thread */
+  pthread_attr_init (&attr);
+  pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
+  conn->running = 1;
+  thread_data->eh = eh;
+  thread_data->conn = conn;
+  thread_data->client_id = client_id;
+
+  tid = pthread_create (&conn->msg_thread, &attr, _nns_edge_message_handler,
+      thread_data);
+  pthread_attr_destroy (&attr);
+
+  if (tid < 0) {
+    nns_edge_loge ("Failed to create message handler thread.");
+    conn->running = 0;
+    g_free (thread_data);
+    return NNS_EDGE_ERROR_IO;
+  }
+
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Callback for socket listener, accept socket and create message thread.
+ */
+static void
+_nns_edge_accept_socket_async_cb (GObject * source, GAsyncResult * result,
+    gpointer user_data)
+{
+  GSocketListener *socket_listener = G_SOCKET_LISTENER (source);
+  GSocket *socket = NULL;
+  GError *err = NULL;
+  nns_edge_handle_s *eh = (nns_edge_handle_s *) user_data;
+  nns_edge_conn_s *conn = NULL;
+  nns_edge_cmd_s cmd;
+  bool done = false;
+  char *connected_ip = NULL;
+  int connected_port = 0;
+  nns_edge_conn_data_s *conn_data = NULL;
+  int64_t client_id;
+  int ret;
+
+  socket =
+      g_socket_listener_accept_socket_finish (socket_listener, result, NULL,
+      &err);
+
+  if (!socket) {
+    nns_edge_loge ("Failed to get socket: %s", err->message);
+    g_clear_error (&err);
+    goto error;
+  }
+  g_socket_set_timeout (socket, DEFAULT_TIMEOUT_SEC);
+
+  /* create socket with connection */
+  conn = (nns_edge_conn_s *) malloc (sizeof (nns_edge_conn_s));
+  if (!conn) {
+    nns_edge_loge ("Failed to allocate edge connection");
+    goto error;
+  }
+
+  memset (conn, 0, sizeof (nns_edge_conn_s));
+  conn->socket = socket;
+  conn->cancellable = g_cancellable_new ();
+
+  /* setting TCP_NODELAY to true in order to avoid packet batching as known as Nagle's algorithm */
+  if (!g_socket_set_option (socket, IPPROTO_TCP, TCP_NODELAY, true, &err)) {
+    nns_edge_loge ("Failed to set socket TCP_NODELAY option: %s", err->message);
+    g_clear_error (&err);
+    goto error;
+  }
+
+  /* Send capability and info to check compatibility. */
+  client_id = eh->is_server ? g_get_monotonic_time () : eh->client_id;
+
+  _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_CAPABILITY, client_id);
+  cmd.info.num = 1;
+  cmd.info.mem_size[0] = strlen (eh->caps_str) + 1;
+  cmd.mem[0] = eh->caps_str;
+
+  ret = _nns_edge_cmd_send (conn, &cmd);
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("Failed to send capability.");
+    goto error;
+  }
+
+  /* Receive ip and port from destination. */
+  ret = _nns_edge_cmd_receive (conn, &cmd);
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("Failed to receive node info.");
+    goto error;
+  }
+
+  if (cmd.info.cmd != _NNS_EDGE_CMD_HOST_INFO) {
+    nns_edge_loge ("Failed to get host info.");
+    _nns_edge_cmd_clear (&cmd);
+    goto error;
+  }
+
+  _parse_host_str (cmd.mem[0], &connected_ip, &connected_port);
+  _nns_edge_cmd_clear (&cmd);
+
+  ret = _nns_edge_create_message_thread (eh, conn, client_id);
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("Failed to create message handle thread.");
+    goto error;
+  }
+
+  conn_data = _nns_edge_add_connection (eh, client_id);
+  if (conn_data) {
+    /* Close old connection and set new one. */
+    _nns_edge_close_connection (conn_data->src_conn);
+    conn_data->src_conn = conn;
+    done = true;
+  }
+
+error:
+  if (done) {
+    if (eh->is_server) {
+      _nns_edge_connect_to (eh, connected_ip, connected_port);
+    }
+  } else {
+    _nns_edge_close_connection (conn);
+  }
+
+  g_socket_listener_accept_socket_async (socket_listener, eh->cancellable,
+      (GAsyncReadyCallback) _nns_edge_accept_socket_async_cb, eh);
+
+  g_free (connected_ip);
+}
+
+/**
+ * @brief Get registered handle. If not registered, create new handle and register it.
+ */
+int
+nns_edge_create_handle (const char *id, const char *topic, nns_edge_h * edge_h)
+{
+  nns_edge_handle_s *eh;
+
+  if (!id || *id == '\0') {
+    nns_edge_loge ("Invalid param, given ID is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!topic || *topic == '\0') {
+    nns_edge_loge ("Invalid param, given topic is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!edge_h) {
+    nns_edge_loge ("Invalid param, edge_h should not be null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  /**
+   * @todo manage edge handles
+   * 1. consider adding hash table or list to manage edge handles.
+   * 2. compare topic and return error if existing topic in handle is different.
+   */
+  eh = (nns_edge_handle_s *) malloc (sizeof (nns_edge_handle_s));
+  if (!eh) {
+    nns_edge_loge ("Failed to allocate memory for edge handle.");
+    return NNS_EDGE_ERROR_OUT_OF_MEMORY;
+  }
+
+  memset (eh, 0, sizeof (nns_edge_handle_s));
+  nns_edge_lock_init (eh);
+  eh->magic = NNS_EDGE_MAGIC;
+  eh->id = g_strdup (id);
+  eh->topic = g_strdup (topic);
+  eh->protocol = NNS_EDGE_PROTOCOL_TCP;
+  eh->is_server = true;
+  eh->recv_ip = g_strdup ("localhost");
+  eh->recv_port = 0;
+  eh->caps_str = NULL;
+
+  /* Connection data for each client ID. */
+  eh->conn_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
+      _nns_edge_remove_connection);
+
+  *edge_h = eh;
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Initialize the nnstreamer edge handle.
+ */
+int
+nns_edge_start (nns_edge_h edge_h, bool is_server)
+{
+  GSocketAddress *saddr = NULL;
+  GError *err = NULL;
+  int ret = 0;
+  nns_edge_handle_s *eh;
+
+  eh = (nns_edge_handle_s *) edge_h;
+  if (!eh) {
+    nns_edge_loge ("Invalid param, given edge handle is null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  nns_edge_lock (eh);
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    nns_edge_unlock (eh);
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  eh->is_server = is_server;
+  if (!is_server && 0 == eh->recv_port)
+    eh->recv_port = _get_available_port ();
+
+  /** Initialize server src data. */
+  eh->cancellable = g_cancellable_new ();
+  eh->listener = g_socket_listener_new ();
+  g_socket_listener_set_backlog (eh->listener, N_BACKLOG);
+
+  if (!_nns_edge_get_saddr (eh->recv_ip, eh->recv_port, eh->cancellable,
+          &saddr)) {
+    nns_edge_loge ("Failed to get socket address");
+    ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
+    goto error;
+  }
+  if (!g_socket_listener_add_address (eh->listener, saddr,
+          G_SOCKET_TYPE_STREAM, G_SOCKET_PROTOCOL_TCP, NULL, NULL, &err)) {
+    nns_edge_loge ("Failed to add address: %s", err->message);
+    g_clear_error (&err);
+    ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
+    goto error;
+  }
+  g_object_unref (saddr);
+  saddr = NULL;
+
+  g_socket_listener_accept_socket_async (eh->listener, eh->cancellable,
+      (GAsyncReadyCallback) _nns_edge_accept_socket_async_cb, eh);
+
+error:
+  if (saddr)
+    g_object_unref (saddr);
+
+  nns_edge_unlock (eh);
+  return ret;
+}
+
+/**
+ * @brief Release the given handle.
+ */
+int
+nns_edge_release_handle (nns_edge_h edge_h)
+{
+  nns_edge_handle_s *eh;
+
+  eh = (nns_edge_handle_s *) edge_h;
+  if (!eh) {
+    nns_edge_loge ("Invalid param, given edge handle is null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  nns_edge_lock (eh);
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    nns_edge_unlock (eh);
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  eh->magic = NNS_EDGE_MAGIC_DEAD;
+  eh->event_cb = NULL;
+  eh->user_data = NULL;
+  g_free (eh->id);
+  g_free (eh->topic);
+  g_free (eh->ip);
+  g_free (eh->recv_ip);
+  g_hash_table_destroy (eh->conn_table);
+
+  nns_edge_unlock (eh);
+  nns_edge_lock_destroy (eh);
+  g_free (eh);
+
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Set the event callback.
+ */
+int
+nns_edge_set_event_callback (nns_edge_h edge_h, nns_edge_event_cb cb,
+    void *user_data)
+{
+  nns_edge_handle_s *eh;
+  int ret;
+
+  eh = (nns_edge_handle_s *) edge_h;
+  if (!eh) {
+    nns_edge_loge ("Invalid param, given edge handle is null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  nns_edge_lock (eh);
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    nns_edge_unlock (eh);
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_CALLBACK_RELEASED,
+      NULL, 0, NULL);
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("Failed to set new event callback.");
+    nns_edge_unlock (eh);
+    return ret;
+  }
+
+  eh->event_cb = cb;
+  eh->user_data = user_data;
+
+  nns_edge_unlock (eh);
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Connect to the destination node.
+ */
+int
+nns_edge_connect (nns_edge_h edge_h, nns_edge_protocol_e protocol,
+    const char *ip, int port)
+{
+  nns_edge_handle_s *eh;
+  int ret;
+
+  eh = (nns_edge_handle_s *) edge_h;
+  if (!eh) {
+    nns_edge_loge ("Invalid param, given edge handle is null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!ip || *ip == '\0') {
+    nns_edge_loge ("Invalid param, given IP is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  nns_edge_lock (eh);
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    nns_edge_unlock (eh);
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!eh->event_cb) {
+    nns_edge_loge ("NNStreamer-edge event callback is not registered.");
+    nns_edge_unlock (eh);
+    return NNS_EDGE_ERROR_CONNECTION_FAILURE;
+  }
+
+  eh->is_server = false;
+  eh->protocol = protocol;
+
+  /** Connect to info channel. */
+  ret = _nns_edge_connect_to (eh, ip, port);
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("Failed to connect to %s:%d", ip, port);
+  }
+
+  nns_edge_unlock (eh);
+  return ret;
+}
+
+/**
+ * @brief Disconnect from the destination node.
+ */
+int
+nns_edge_disconnect (nns_edge_h edge_h)
+{
+  nns_edge_handle_s *eh;
+
+  eh = (nns_edge_handle_s *) edge_h;
+  if (!eh) {
+    nns_edge_loge ("Invalid param, given edge handle is null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  nns_edge_lock (eh);
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    nns_edge_unlock (eh);
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  g_hash_table_remove_all (eh->conn_table);
+
+  nns_edge_unlock (eh);
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Publish a message to a given topic.
+ */
+int
+nns_edge_publish (nns_edge_h edge_h, nns_edge_data_h data_h)
+{
+  nns_edge_handle_s *eh;
+
+  eh = (nns_edge_handle_s *) edge_h;
+  if (!eh) {
+    nns_edge_loge ("Invalid param, given edge handle is null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (nns_edge_data_is_valid (data_h) != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("Invalid param, given edge data is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  nns_edge_lock (eh);
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    nns_edge_unlock (eh);
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  /** @todo update code (publish data) */
+
+  nns_edge_unlock (eh);
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Request result to the server.
+ */
+int
+nns_edge_request (nns_edge_h edge_h, nns_edge_data_h data_h, void *user_data)
+{
+  nns_edge_handle_s *eh;
+  nns_edge_conn_data_s *conn_data;
+  nns_edge_cmd_s cmd;
+  int ret;
+  unsigned int i;
+
+  UNUSED (user_data);
+  eh = (nns_edge_handle_s *) edge_h;
+  if (!eh) {
+    nns_edge_loge ("Invalid param, given edge handle is null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (nns_edge_data_is_valid (data_h) != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("Invalid param, given edge data is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  nns_edge_lock (eh);
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    nns_edge_unlock (eh);
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  conn_data = _nns_edge_get_connection (eh, eh->client_id);
+  if (!_nns_edge_check_connection (conn_data->sink_conn)) {
+    nns_edge_loge ("Failed to request, connection failure.");
+    nns_edge_unlock (eh);
+    return NNS_EDGE_ERROR_CONNECTION_FAILURE;
+  }
+
+  _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_TRANSFER_DATA, eh->client_id);
+
+  nns_edge_data_get_count (data_h, &cmd.info.num);
+  for (i = 0; i < cmd.info.num; i++) {
+    nns_edge_data_get (data_h, i, &cmd.mem[i], &cmd.info.mem_size[i]);
+  }
+
+  ret = _nns_edge_cmd_send (conn_data->sink_conn, &cmd);
+
+  nns_edge_unlock (eh);
+  return ret;
+}
+
+/**
+ * @brief Subscribe a message to a given topic.
+ */
+int
+nns_edge_subscribe (nns_edge_h edge_h, nns_edge_data_h data_h, void *user_data)
+{
+  nns_edge_handle_s *eh;
+
+  UNUSED (user_data);
+  eh = (nns_edge_handle_s *) edge_h;
+  if (!eh) {
+    nns_edge_loge ("Invalid param, given edge handle is null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (nns_edge_data_is_valid (data_h) != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("Invalid param, given edge data is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  nns_edge_lock (eh);
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    nns_edge_unlock (eh);
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  /** @todo update code (subscribe) */
+
+  nns_edge_unlock (eh);
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Unsubscribe a message to a given topic.
+ */
+int
+nns_edge_unsubscribe (nns_edge_h edge_h)
+{
+  nns_edge_handle_s *eh;
+
+  eh = (nns_edge_handle_s *) edge_h;
+  if (!eh) {
+    nns_edge_loge ("Invalid param, given edge handle is null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  nns_edge_lock (eh);
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    nns_edge_unlock (eh);
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  /** @todo update code (unsubscribe) */
+
+  nns_edge_unlock (eh);
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Get the topic of edge handle. Caller should release returned string using free().
+ * @todo is this necessary?
+ */
+int
+nns_edge_get_topic (nns_edge_h edge_h, char **topic)
+{
+  nns_edge_handle_s *eh;
+
+  eh = (nns_edge_handle_s *) edge_h;
+  if (!eh) {
+    nns_edge_loge ("Invalid param, given edge handle is null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!topic) {
+    nns_edge_loge ("Invalid param, topic should not be null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  nns_edge_lock (eh);
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    nns_edge_unlock (eh);
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  *topic = g_strdup (eh->topic);
+
+  nns_edge_unlock (eh);
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Set nnstreamer edge info.
+ */
+int
+nns_edge_set_info (nns_edge_h edge_h, const char *key, const char *value)
+{
+  nns_edge_handle_s *eh;
+  char *ret_str = NULL;
+
+  eh = (nns_edge_handle_s *) edge_h;
+  if (!eh) {
+    nns_edge_loge ("Invalid param, given edge handle is null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  nns_edge_lock (eh);
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    nns_edge_unlock (eh);
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  /**
+   * @todo User handles (replace or append) the capability of edge handle.
+   * @todo Change key-value set as json or hash table.
+   */
+  if (0 == g_ascii_strcasecmp (key, "CAPS")) {
+    ret_str = g_strdup_printf ("%s%s", _STR_NULL (eh->caps_str), value);
+    g_free (eh->caps_str);
+    eh->caps_str = ret_str;
+  } else if (0 == g_ascii_strcasecmp (key, "IP")) {
+    g_free (eh->recv_ip);
+    eh->recv_ip = g_strdup (value);
+  } else if (0 == g_ascii_strcasecmp (key, "PORT")) {
+    eh->recv_port = g_ascii_strtoll (value, NULL, 10);
+  } else if (0 == g_ascii_strcasecmp (key, "TOPIC")) {
+    g_free (eh->topic);
+    eh->topic = g_strdup (value);
+  } else {
+    nns_edge_logw ("Failed to set edge info. Unknown key: %s", key);
+  }
+
+  nns_edge_unlock (eh);
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Respond to a request.
+ */
+int
+nns_edge_respond (nns_edge_h edge_h, nns_edge_data_h data_h)
+{
+  nns_edge_handle_s *eh;
+  nns_edge_conn_data_s *conn_data;
+  nns_edge_cmd_s cmd;
+  int64_t client_id;
+  char *val;
+  int ret;
+  unsigned int i;
+
+  eh = (nns_edge_handle_s *) edge_h;
+  if (!eh) {
+    nns_edge_loge ("Invalid param, given edge handle is null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (nns_edge_data_is_valid (data_h) != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("Invalid param, given edge data is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  nns_edge_lock (eh);
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    nns_edge_unlock (eh);
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  ret = nns_edge_data_get_info (data_h, "client_id", &val);
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("Cannot find client ID in edge data.");
+    nns_edge_unlock (eh);
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  client_id = g_ascii_strtoll (val, NULL, 10);
+  g_free (val);
+
+  conn_data = _nns_edge_get_connection (eh, client_id);
+  if (!conn_data) {
+    nns_edge_loge ("Cannot find connection, invalid client ID.");
+    nns_edge_unlock (eh);
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_TRANSFER_DATA, client_id);
+
+  nns_edge_data_get_count (data_h, &cmd.info.num);
+  for (i = 0; i < cmd.info.num; i++) {
+    nns_edge_data_get (data_h, i, &cmd.mem[i], &cmd.info.mem_size[i]);
+  }
+
+  ret = _nns_edge_cmd_send (conn_data->sink_conn, &cmd);
+
+  nns_edge_unlock (eh);
+  return ret;
+}
diff --git a/src/libnnstreamer-edge/nnstreamer-edge-internal.h b/src/libnnstreamer-edge/nnstreamer-edge-internal.h
new file mode 100644 (file)
index 0000000..8e3a928
--- /dev/null
@@ -0,0 +1,99 @@
+/* SPDX-License-Identifier: Apache-2.0 */
+/**
+ * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+ *
+ * @file   nnstreamer-edge-internal.h
+ * @date   11 May 2022
+ * @brief  Internal functions to support communication among devices.
+ * @see    https://github.com/nnstreamer/nnstreamer
+ * @author Gichan Jang <gichan2.jang@samsung.com>
+ * @bug    No known bugs except for NYI items
+ * @note   This file is internal header for nnstreamer edge. DO NOT export this file.
+ */
+
+#ifndef __NNSTREAMER_EDGE_INTERNAL_H__
+#define __NNSTREAMER_EDGE_INTERNAL_H__
+
+#ifdef __cplusplus
+extern "C" {
+#endif /* __cplusplus */
+
+#include "nnstreamer-edge.h"
+#include <gio/gio.h>
+#include <netinet/tcp.h>
+#include <netinet/in.h>
+
+/**
+ * @brief Data structure for edge handle.
+ */
+typedef struct {
+  unsigned int magic;
+  pthread_mutex_t lock;
+  char *id;
+  char *topic;
+  nns_edge_protocol_e protocol;
+  char *ip;
+  int port;
+
+  /* Edge event callback and user data */
+  nns_edge_event_cb event_cb;
+  void *user_data;
+
+  bool is_server;
+  int64_t client_id;
+  char *caps_str;
+  char *recv_ip;
+  int recv_port;
+  GHashTable *conn_table;
+
+  GSocketListener *listener;
+  GCancellable *cancellable;
+
+  /* MQTT */
+  void *mqtt_handle;
+} nns_edge_handle_s;
+
+#if defined(ENABLE_MQTT)
+/**
+ * @brief Connect to MQTT.
+ * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
+ */
+int nns_edge_mqtt_connect (nns_edge_h edge_h);
+
+/**
+ * @brief Close the connection to MQTT.
+ * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
+ */
+int nns_edge_mqtt_close (nns_edge_h edge_h);
+
+/**
+ * @brief Publish raw data.
+ * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
+ */
+int nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length);
+
+/**
+ * @brief Subscribe a topic.
+ * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
+ */
+int nns_edge_mqtt_subscribe (nns_edge_h edge_h);
+#else
+/**
+ * @todo consider to change code style later.
+ * If MQTT is disabled, nnstreamer does not include nnstreamer_edge_mqtt.c, and changing code style will make error as it is not used function now.
+ *
+ * static int nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
+ * {
+ *   return NNS_EDGE_ERROR_NOT_SUPPORTED;
+ * }
+ */
+#define nns_edge_mqtt_connect(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
+#define nns_edge_mqtt_close(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
+#define nns_edge_mqtt_publish(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
+#define nns_edge_mqtt_subscribe(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
+#endif
+
+#ifdef __cplusplus
+}
+#endif /* __cplusplus */
+#endif /* __NNSTREAMER_EDGE_INTERNAL_H__ */
diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c b/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c
new file mode 100644 (file)
index 0000000..3443672
--- /dev/null
@@ -0,0 +1,340 @@
+/* SPDX-License-Identifier: Apache-2.0 */
+/**
+ * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+ *
+ * @file   nnstreamer-edge-mqtt.c
+ * @date   11 May 2022
+ * @brief  Internal functions to support MQTT protocol (Paho Asynchronous MQTT C Client Library).
+ * @see    https://github.com/nnstreamer/nnstreamer
+ * @author Sangjung Woo <sangjung.woo@samsung.com>
+ * @bug    No known bugs except for NYI items
+ */
+
+#if !defined(ENABLE_MQTT)
+#error "This file can be built with Paho MQTT library."
+#endif
+
+#include <unistd.h>
+#include <MQTTAsync.h>
+#include "nnstreamer-edge-common.h"
+#include "nnstreamer-edge-internal.h"
+
+/**
+ * @brief Callback function to be called when the connection is lost.
+ */
+static void
+mqtt_cb_connection_lost (void *context, char *cause)
+{
+  nns_edge_handle_s *eh;
+
+  eh = (nns_edge_handle_s *) context;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return;
+  }
+
+  nns_edge_logw ("MQTT connection is lost (ID:%s, Cause:%s).", eh->id, cause);
+  if (eh->event_cb) {
+    /** @todo send new event (MQTT disconnected) */
+  }
+}
+
+/**
+ * @brief Callback function to be called when the connection is completed.
+ */
+static void
+mqtt_cb_connection_success (void *context, MQTTAsync_successData * response)
+{
+  nns_edge_handle_s *eh;
+
+  UNUSED (response);
+  eh = (nns_edge_handle_s *) context;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return;
+  }
+
+  nns_edge_logi ("MQTT connection is completed (ID:%s).", eh->id);
+  if (eh->event_cb) {
+    /** @todo send new event (MQTT connected) */
+  }
+}
+
+/**
+ * @brief Callback function to be called when the connection is failed.
+ */
+static void
+mqtt_cb_connection_failure (void *context, MQTTAsync_failureData * response)
+{
+  nns_edge_handle_s *eh;
+
+  UNUSED (response);
+  eh = (nns_edge_handle_s *) context;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return;
+  }
+
+  nns_edge_logw ("MQTT connection is failed (ID:%s).", eh->id);
+  if (eh->event_cb) {
+    /** @todo send new event (MQTT connection failure) */
+  }
+}
+
+/**
+ * @brief Callback function to be called when the disconnection is completed.
+ */
+static void
+mqtt_cb_disconnection_success (void *context, MQTTAsync_successData * response)
+{
+  nns_edge_handle_s *eh;
+
+  UNUSED (response);
+  eh = (nns_edge_handle_s *) context;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return;
+  }
+
+  nns_edge_logi ("MQTT disconnection is completed (ID:%s).", eh->id);
+  if (eh->event_cb) {
+    /** @todo send new event (MQTT disconnected) */
+  }
+}
+
+/**
+ * @brief Callback function to be called when the disconnection is failed.
+ */
+static void
+mqtt_cb_disconnection_failure (void *context, MQTTAsync_failureData * response)
+{
+  nns_edge_handle_s *eh;
+
+  UNUSED (response);
+  eh = (nns_edge_handle_s *) context;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return;
+  }
+
+  nns_edge_logw ("MQTT disconnection is failed (ID:%s).", eh->id);
+  if (eh->event_cb) {
+    /** @todo send new event (MQTT disconnection failure) */
+  }
+}
+
+/**
+ * @brief Callback function to be called when a message is arrived.
+ * @return Return TRUE to prevent delivering the message again.
+ */
+static int
+mqtt_cb_message_arrived (void *context, char *topic, int topic_len,
+    MQTTAsync_message * message)
+{
+  nns_edge_handle_s *eh;
+
+  UNUSED (topic);
+  UNUSED (topic_len);
+  UNUSED (message);
+  eh = (nns_edge_handle_s *) context;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return TRUE;
+  }
+
+  nns_edge_logd ("MQTT message is arrived (ID:%s, Topic:%s).",
+      eh->id, eh->topic);
+  if (eh->event_cb) {
+    /** @todo send new event (message arrived) */
+  }
+
+  return TRUE;
+}
+
+/**
+ * @brief Connect to MQTT.
+ * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
+ */
+int
+nns_edge_mqtt_connect (nns_edge_h edge_h)
+{
+  nns_edge_handle_s *eh;
+  MQTTAsync handle;
+  MQTTAsync_connectOptions options = MQTTAsync_connectOptions_initializer;
+  char *url;
+  char *client_id;
+  int ret;
+
+  eh = (nns_edge_handle_s *) edge_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).",
+      eh->id, eh->ip, eh->port);
+
+  url = g_strdup_printf ("%s:%d", eh->ip, eh->port);
+  client_id = g_strdup_printf ("nns_edge_%s_%u", eh->id, getpid ());
+
+  ret = MQTTAsync_create (&handle, url, client_id,
+      MQTTCLIENT_PERSISTENCE_NONE, NULL);
+  if (MQTTASYNC_SUCCESS != ret) {
+    nns_edge_loge ("Failed to create MQTT handle.");
+    ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
+    goto error;
+  }
+
+  MQTTAsync_setCallbacks (handle, edge_h,
+      mqtt_cb_connection_lost, mqtt_cb_message_arrived, NULL);
+
+  options.cleansession = 1;
+  options.keepAliveInterval = 6;
+  options.onSuccess = mqtt_cb_connection_success;
+  options.onFailure = mqtt_cb_connection_failure;
+  options.context = edge_h;
+
+  if (MQTTAsync_connect (handle, &options) != MQTTASYNC_SUCCESS) {
+    nns_edge_loge ("Failed to connect MQTT.");
+    MQTTAsync_destroy (&handle);
+    ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
+    goto error;
+  }
+
+  eh->mqtt_handle = handle;
+  ret = NNS_EDGE_ERROR_NONE;
+
+error:
+  g_free (url);
+  g_free (client_id);
+  return ret;
+}
+
+/**
+ * @brief Close the connection to MQTT.
+ * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
+ */
+int
+nns_edge_mqtt_close (nns_edge_h edge_h)
+{
+  nns_edge_handle_s *eh;
+  MQTTAsync handle;
+  MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer;
+
+  eh = (nns_edge_handle_s *) edge_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  handle = eh->mqtt_handle;
+
+  if (!handle) {
+    nns_edge_loge ("Invalid state, MQTT connection was not completed.");
+    return NNS_EDGE_ERROR_IO;
+  }
+
+  nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).",
+      eh->id, eh->ip, eh->port);
+
+  options.onSuccess = mqtt_cb_disconnection_success;
+  options.onFailure = mqtt_cb_disconnection_failure;
+  options.context = edge_h;
+
+  while (MQTTAsync_isConnected (handle)) {
+    if (MQTTAsync_disconnect (handle, &options) != MQTTASYNC_SUCCESS) {
+      nns_edge_loge ("Failed to disconnect MQTT.");
+      return NNS_EDGE_ERROR_IO;
+    }
+  }
+
+  MQTTAsync_destroy (&handle);
+  eh->mqtt_handle = NULL;
+
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Publish raw data.
+ * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
+ */
+int
+nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
+{
+  nns_edge_handle_s *eh;
+  MQTTAsync handle;
+  int ret;
+
+  eh = (nns_edge_handle_s *) edge_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!data || length <= 0) {
+    nns_edge_loge ("Invalid param, given data is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  handle = eh->mqtt_handle;
+
+  if (!handle || !MQTTAsync_isConnected (handle)) {
+    nns_edge_loge ("Invalid state, MQTT connection was not completed.");
+    return NNS_EDGE_ERROR_IO;
+  }
+
+  /* Publish a message (default QoS 1 - at least once and retained true). */
+  ret = MQTTAsync_send (handle, eh->topic, length, data, 1, 1, NULL);
+  if (ret != MQTTASYNC_SUCCESS) {
+    nns_edge_loge ("Failed to publish a message (ID:%s, Topic:%s).",
+        eh->id, eh->topic);
+    return NNS_EDGE_ERROR_IO;
+  }
+
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Subscribe a topic.
+ * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
+ */
+int
+nns_edge_mqtt_subscribe (nns_edge_h edge_h)
+{
+  nns_edge_handle_s *eh;
+  MQTTAsync handle;
+  int ret;
+
+  eh = (nns_edge_handle_s *) edge_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  handle = eh->mqtt_handle;
+
+  if (!handle || !MQTTAsync_isConnected (handle)) {
+    nns_edge_loge ("Invalid state, MQTT connection was not completed.");
+    return NNS_EDGE_ERROR_IO;
+  }
+
+  /* Subscribe a topic (default QoS 1 - at least once). */
+  ret = MQTTAsync_subscribe (handle, eh->topic, 1, NULL);
+  if (ret != MQTTASYNC_SUCCESS) {
+    nns_edge_loge ("Failed to subscribe a topic (ID:%s, Topic:%s).",
+        eh->id, eh->topic);
+    return NNS_EDGE_ERROR_IO;
+  }
+
+  return NNS_EDGE_ERROR_NONE;
+}