[MQTT] Separate MQTT implementation
authorgichan <gichan2.jang@samsung.com>
Fri, 14 Oct 2022 02:11:22 +0000 (11:11 +0900)
committerjaeyun-jung <39614140+jaeyun-jung@users.noreply.github.com>
Wed, 19 Oct 2022 01:49:20 +0000 (10:49 +0900)
Separate implementation for MQTT into paho-mqtt-c and mosquitto.
Let's use mosquitto lib in environments where the paho lib is not supported, such as tizenRT.

Signed-off-by: gichan <gichan2.jang@samsung.com>
CMakeLists.txt
src/CMakeLists.txt
src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c [new file with mode: 0644]
src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c [new file with mode: 0644]
src/libnnstreamer-edge/nnstreamer-edge-mqtt.c [deleted file]

index 668422d5d6e658dfa029765a13d9ca0c9d7a0d71..679fd27c25930f360d94c49cf2df6e0e1956185c 100644 (file)
@@ -50,7 +50,12 @@ IF(MQTT_SUPPORT)
         FIND_LIBRARY(PAHO_MQTT_LIB NAMES paho-mqtt3a paho-mqtt3c paho-mqtt3as paho-mqtt3cs)
 
         IF(NOT PAHO_MQTT_LIB)
-            MESSAGE(FATAL_ERROR "Cannot find Paho MQTT library.")
+            FIND_LIBRARY(MOSQUITTO_LIB NAMES mosquitto)
+            IF(NOT MOSQUITTO_LIB)
+                MESSAGE("FATAL_ERROR Cannot find paho-mqtt-c and mosquitto library.")
+            ELSE()
+                MESSAGE("FOUND MOSQUITTO LIB.")
+            ENDIF()
         ELSE()
             MESSAGE("Found Paho MQTT library.")
         ENDIF()
index 8920f47be48535cdb7c887758b2e39b4a9d8f8f1..e1354ce6889d10b4e0a4a0b940377b8c5d5b9773 100644 (file)
@@ -10,7 +10,11 @@ SET(NNS_EDGE_SRCS
 )
 
 IF(MQTT_SUPPORT)
-    SET(NNS_EDGE_SRCS ${NNS_EDGE_SRCS} ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-mqtt.c)
+    IF(PAHO_MQTT_LIB)
+        SET(NNS_EDGE_SRCS ${NNS_EDGE_SRCS} ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-mqtt-paho.c)
+    ELSE()
+        SET(NNS_EDGE_SRCS ${NNS_EDGE_SRCS} ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-mqtt-mosquitto.c)
+    ENDIF()
 ENDIF()
 
 IF(AITT_SUPPORT)
@@ -23,7 +27,11 @@ TARGET_INCLUDE_DIRECTORIES(${NNS_EDGE_LIB_NAME} PRIVATE ${INCLUDE_DIR} ${EDGE_RE
 TARGET_LINK_LIBRARIES(${NNS_EDGE_LIB_NAME} ${EDGE_REQUIRE_PKGS_LDFLAGS})
 
 IF(MQTT_SUPPORT)
-    TARGET_LINK_LIBRARIES(${NNS_EDGE_LIB_NAME} ${PAHO_MQTT_LIB})
+    IF(PAHO_MQTT_LIB)
+        TARGET_LINK_LIBRARIES(${NNS_EDGE_LIB_NAME} ${PAHO_MQTT_LIB})
+    ELSE()
+        TARGET_LINK_LIBRARIES(${NNS_EDGE_LIB_NAME} ${MOSQUITTO_LIB})
+    ENDIF()
 ENDIF()
 
 IF(AITT_SUPPORT)
diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c
new file mode 100644 (file)
index 0000000..291a13e
--- /dev/null
@@ -0,0 +1,94 @@
+/* SPDX-License-Identifier: Apache-2.0 */
+/**
+ * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+ *
+ * @file   nnstreamer-edge-mqtt-mosquitto.c
+ * @date   14 Oct 2022
+ * @brief  Internal functions to support MQTT protocol (mosquitto Library).
+ * @see    https://github.com/nnstreamer/nnstreamer-edge
+ * @author Gichan Jang <gichan2.jang@samsung.com>
+ * @bug    No known bugs except for NYI items
+ */
+
+#include "nnstreamer-edge-internal.h"
+#include "nnstreamer-edge-log.h"
+#include "nnstreamer-edge-util.h"
+#include "nnstreamer-edge-queue.h"
+
+/**
+ * @brief Data structure for mqtt broker handle.
+ */
+typedef struct
+{
+  void *mqtt_h;
+  nns_edge_queue_h server_list;
+  char *topic;
+} nns_edge_broker_s;
+
+/**
+ * @brief Connect to MQTT.
+ * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
+ */
+int
+nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic)
+{
+  UNUSED (edge_h);
+  UNUSED (topic);
+  return NNS_EDGE_ERROR_NOT_SUPPORTED;
+}
+
+/**
+ * @brief Close the connection to MQTT.
+ * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
+ */
+int
+nns_edge_mqtt_close (nns_edge_h edge_h)
+{
+  UNUSED (edge_h);
+  return NNS_EDGE_ERROR_NOT_SUPPORTED;
+}
+
+/**
+ * @brief Publish raw data.
+ * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
+ */
+int
+nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
+{
+  UNUSED (edge_h);
+  UNUSED (data);
+  UNUSED (length);
+  return NNS_EDGE_ERROR_NOT_SUPPORTED;
+}
+
+/**
+ * @brief Subscribe a topic.
+ * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
+ */
+int
+nns_edge_mqtt_subscribe (nns_edge_h edge_h)
+{
+  UNUSED (edge_h);
+  return NNS_EDGE_ERROR_NOT_SUPPORTED;
+}
+
+/**
+ * @brief Check mqtt connection
+ */
+bool
+nns_edge_mqtt_is_connected (nns_edge_h edge_h)
+{
+  UNUSED (edge_h);
+  return false;
+}
+
+/**
+ * @brief Get message from mqtt broker.
+ */
+int
+nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg)
+{
+  UNUSED (edge_h);
+  UNUSED (msg);
+  return NNS_EDGE_ERROR_NOT_SUPPORTED;
+}
diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c
new file mode 100644 (file)
index 0000000..49c23ff
--- /dev/null
@@ -0,0 +1,371 @@
+/* SPDX-License-Identifier: Apache-2.0 */
+/**
+ * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+ *
+ * @file   nnstreamer-edge-mqtt-paho.c
+ * @date   11 May 2022
+ * @brief  Internal functions to support MQTT protocol (Paho Asynchronous MQTT C Client Library).
+ * @see    https://github.com/nnstreamer/nnstreamer
+ * @author Sangjung Woo <sangjung.woo@samsung.com>
+ * @bug    No known bugs except for NYI items
+ */
+
+#if !defined(ENABLE_MQTT)
+#error "This file can be built with Paho MQTT library."
+#endif
+
+#include <MQTTAsync.h>
+#include "nnstreamer-edge-internal.h"
+#include "nnstreamer-edge-log.h"
+#include "nnstreamer-edge-util.h"
+#include "nnstreamer-edge-queue.h"
+
+/**
+ * @brief Data structure for mqtt broker handle.
+ */
+typedef struct
+{
+  void *mqtt_h;
+  nns_edge_queue_h server_list;
+  char *topic;
+} nns_edge_broker_s;
+
+/**
+ * @brief Callback function to be called when a message is arrived.
+ * @return Return TRUE to prevent delivering the message again.
+ */
+static int
+mqtt_cb_message_arrived (void *context, char *topic, int topic_len,
+    MQTTAsync_message * message)
+{
+  nns_edge_handle_s *eh;
+  nns_edge_broker_s *bh;
+  char *msg = NULL;
+
+  UNUSED (topic);
+  UNUSED (topic_len);
+  eh = (nns_edge_handle_s *) context;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return TRUE;
+  }
+
+  if (0 >= message->payloadlen) {
+    nns_edge_logw ("Invalid payload lenth: %d", message->payloadlen);
+    return TRUE;
+  }
+
+  bh = (nns_edge_broker_s *) eh->broker_h;
+
+  nns_edge_logd ("MQTT message is arrived (ID:%s, Topic:%s).",
+      eh->id, eh->topic);
+
+  msg = nns_edge_memdup (message->payload, message->payloadlen);
+  if (msg)
+    nns_edge_queue_push (bh->server_list, msg, nns_edge_free);
+
+  return TRUE;
+}
+
+/**
+ * @brief Connect to MQTT.
+ * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
+ */
+int
+nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic)
+{
+  nns_edge_handle_s *eh;
+  nns_edge_broker_s *bh;
+  MQTTAsync_connectOptions options = MQTTAsync_connectOptions_initializer;
+  int ret = NNS_EDGE_ERROR_NONE;
+  MQTTAsync handle;
+  char *url;
+  char *client_id;
+  unsigned int wait_count;
+
+  if (!STR_IS_VALID (topic)) {
+    nns_edge_loge ("Invalid param, given topic is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  eh = (nns_edge_handle_s *) edge_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).",
+      eh->id, eh->dest_host, eh->dest_port);
+
+  bh = (nns_edge_broker_s *) calloc (1, sizeof (nns_edge_broker_s));
+  if (!bh) {
+    nns_edge_loge ("Failed to allocate memory for broker handle.");
+    return NNS_EDGE_ERROR_OUT_OF_MEMORY;
+  }
+
+  url = nns_edge_get_host_string (eh->dest_host, eh->dest_port);
+  client_id = nns_edge_strdup_printf ("nns_edge_%s_%u", eh->id, getpid ());
+
+  ret = MQTTAsync_create (&handle, url, client_id,
+      MQTTCLIENT_PERSISTENCE_NONE, NULL);
+  SAFE_FREE (url);
+  SAFE_FREE (client_id);
+
+  if (MQTTASYNC_SUCCESS != ret) {
+    nns_edge_loge ("Failed to create MQTT handle.");
+    ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
+    goto error;
+  }
+
+  bh->topic = nns_edge_strdup (topic);
+  bh->mqtt_h = handle;
+  nns_edge_queue_create (&bh->server_list);
+  eh->broker_h = bh;
+
+  MQTTAsync_setCallbacks (handle, edge_h, NULL, mqtt_cb_message_arrived, NULL);
+
+  options.cleansession = 1;
+  options.keepAliveInterval = 6;
+  options.context = edge_h;
+
+  if (MQTTAsync_connect (handle, &options) != MQTTASYNC_SUCCESS) {
+    nns_edge_loge ("Failed to connect MQTT.");
+    ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
+    goto error;
+  }
+
+  /* Waiting for the connection */
+  wait_count = 0U;
+  do {
+    if (wait_count > 500U) {
+      ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
+      goto error;
+    }
+
+    usleep (10000);
+    wait_count++;
+  } while (!MQTTAsync_isConnected (handle));
+
+  return NNS_EDGE_ERROR_NONE;
+
+error:
+  nns_edge_mqtt_close (eh);
+  return ret;
+}
+
+/**
+ * @brief Close the connection to MQTT.
+ * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
+ */
+int
+nns_edge_mqtt_close (nns_edge_h edge_h)
+{
+  nns_edge_handle_s *eh;
+  nns_edge_broker_s *bh;
+  MQTTAsync handle;
+  MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer;
+  unsigned int wait_count;
+
+  eh = (nns_edge_handle_s *) edge_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  bh = (nns_edge_broker_s *) eh->broker_h;
+  handle = bh->mqtt_h;
+
+  if (handle) {
+    nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).",
+        eh->id, eh->dest_host, eh->dest_port);
+
+    options.context = edge_h;
+
+    /* Clear retained message */
+    MQTTAsync_send (handle, bh->topic, 0, NULL, 1, 1, NULL);
+
+    wait_count = 0U;
+    do {
+      if (MQTTAsync_disconnect (handle, &options) != MQTTASYNC_SUCCESS) {
+        nns_edge_loge ("Failed to disconnect MQTT.");
+        break;
+      }
+
+      if (wait_count > 500U) {
+        nns_edge_loge ("Failed to disconnect MQTT, timed out.");
+        break;
+      }
+
+      usleep (10000);
+      wait_count++;
+    } while (MQTTAsync_isConnected (handle));
+
+    MQTTAsync_destroy (&handle);
+  }
+
+  nns_edge_queue_destroy (bh->server_list);
+  bh->server_list = NULL;
+
+  SAFE_FREE (bh->topic);
+  SAFE_FREE (bh);
+
+  eh->broker_h = NULL;
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Publish raw data.
+ * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
+ */
+int
+nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
+{
+  nns_edge_handle_s *eh;
+  nns_edge_broker_s *bh;
+  MQTTAsync handle;
+  int ret;
+
+  eh = (nns_edge_handle_s *) edge_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!data || length <= 0) {
+    nns_edge_loge ("Invalid param, given data is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  bh = (nns_edge_broker_s *) eh->broker_h;
+  handle = bh->mqtt_h;
+
+  if (!handle) {
+    nns_edge_loge ("Invalid state, MQTT connection was not completed.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!MQTTAsync_isConnected (handle)) {
+    nns_edge_loge ("Failed to publish message, MQTT is not connected.");
+    return NNS_EDGE_ERROR_IO;
+  }
+
+  /* Publish a message (default QoS 1 - at least once and retained true). */
+  ret = MQTTAsync_send (handle, bh->topic, length, data, 1, 1, NULL);
+  if (ret != MQTTASYNC_SUCCESS) {
+    nns_edge_loge ("Failed to publish a message (ID:%s, Topic:%s).",
+        eh->id, eh->topic);
+    return NNS_EDGE_ERROR_IO;
+  }
+
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Subscribe a topic.
+ * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
+ */
+int
+nns_edge_mqtt_subscribe (nns_edge_h edge_h)
+{
+  nns_edge_handle_s *eh;
+  nns_edge_broker_s *bh;
+  MQTTAsync handle;
+  int ret;
+
+  eh = (nns_edge_handle_s *) edge_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  bh = (nns_edge_broker_s *) eh->broker_h;
+  handle = bh->mqtt_h;
+
+  if (!handle) {
+    nns_edge_loge ("Invalid state, MQTT connection was not completed.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!MQTTAsync_isConnected (handle)) {
+    nns_edge_loge ("Failed to subscribe, MQTT is not connected.");
+    return NNS_EDGE_ERROR_IO;
+  }
+
+  /* Subscribe a topic (default QoS 1 - at least once). */
+  ret = MQTTAsync_subscribe (handle, bh->topic, 1, NULL);
+  if (ret != MQTTASYNC_SUCCESS) {
+    nns_edge_loge ("Failed to subscribe a topic (ID:%s, Topic:%s).",
+        eh->id, eh->topic);
+    return NNS_EDGE_ERROR_IO;
+  }
+
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Check mqtt connection
+ */
+bool
+nns_edge_mqtt_is_connected (nns_edge_h edge_h)
+{
+  nns_edge_handle_s *eh;
+  nns_edge_broker_s *bh;
+  MQTTAsync handle;
+  eh = (nns_edge_handle_s *) edge_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return false;
+  }
+
+  bh = (nns_edge_broker_s *) eh->broker_h;
+  handle = bh->mqtt_h;
+
+  if (!handle) {
+    nns_edge_loge ("Invalid state, MQTT connection was not completed.");
+    return false;
+  }
+
+  if (MQTTAsync_isConnected (handle)) {
+    return true;
+  }
+
+  return false;
+}
+
+/**
+ * @brief Get message from mqtt broker.
+ */
+int
+nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg)
+{
+  nns_edge_handle_s *eh;
+  nns_edge_broker_s *bh;
+
+  eh = (nns_edge_handle_s *) edge_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!msg) {
+    nns_edge_loge ("Invalid param, given msg param is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  bh = (nns_edge_broker_s *) eh->broker_h;
+
+  /* Wait for 1 second */
+  if (!nns_edge_queue_wait_pop (bh->server_list, 1000U, (void **) msg)) {
+    nns_edge_loge ("Failed to get message from mqtt broker within timeout.");
+    return NNS_EDGE_ERROR_UNKNOWN;
+  }
+
+  return NNS_EDGE_ERROR_NONE;
+}
diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c b/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c
deleted file mode 100644 (file)
index db743c7..0000000
+++ /dev/null
@@ -1,371 +0,0 @@
-/* SPDX-License-Identifier: Apache-2.0 */
-/**
- * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
- *
- * @file   nnstreamer-edge-mqtt.c
- * @date   11 May 2022
- * @brief  Internal functions to support MQTT protocol (Paho Asynchronous MQTT C Client Library).
- * @see    https://github.com/nnstreamer/nnstreamer
- * @author Sangjung Woo <sangjung.woo@samsung.com>
- * @bug    No known bugs except for NYI items
- */
-
-#if !defined(ENABLE_MQTT)
-#error "This file can be built with Paho MQTT library."
-#endif
-
-#include <MQTTAsync.h>
-#include "nnstreamer-edge-internal.h"
-#include "nnstreamer-edge-log.h"
-#include "nnstreamer-edge-util.h"
-#include "nnstreamer-edge-queue.h"
-
-/**
- * @brief Data structure for mqtt broker handle.
- */
-typedef struct
-{
-  void *mqtt_h;
-  nns_edge_queue_h server_list;
-  char *topic;
-} nns_edge_broker_s;
-
-/**
- * @brief Callback function to be called when a message is arrived.
- * @return Return TRUE to prevent delivering the message again.
- */
-static int
-mqtt_cb_message_arrived (void *context, char *topic, int topic_len,
-    MQTTAsync_message * message)
-{
-  nns_edge_handle_s *eh;
-  nns_edge_broker_s *bh;
-  char *msg = NULL;
-
-  UNUSED (topic);
-  UNUSED (topic_len);
-  eh = (nns_edge_handle_s *) context;
-
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
-    return TRUE;
-  }
-
-  if (0 >= message->payloadlen) {
-    nns_edge_logw ("Invalid payload lenth: %d", message->payloadlen);
-    return TRUE;
-  }
-
-  bh = (nns_edge_broker_s *) eh->broker_h;
-
-  nns_edge_logd ("MQTT message is arrived (ID:%s, Topic:%s).",
-      eh->id, eh->topic);
-
-  msg = nns_edge_memdup (message->payload, message->payloadlen);
-  if (msg)
-    nns_edge_queue_push (bh->server_list, msg, nns_edge_free);
-
-  return TRUE;
-}
-
-/**
- * @brief Connect to MQTT.
- * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
- */
-int
-nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic)
-{
-  nns_edge_handle_s *eh;
-  nns_edge_broker_s *bh;
-  MQTTAsync_connectOptions options = MQTTAsync_connectOptions_initializer;
-  int ret = NNS_EDGE_ERROR_NONE;
-  MQTTAsync handle;
-  char *url;
-  char *client_id;
-  unsigned int wait_count;
-
-  if (!STR_IS_VALID (topic)) {
-    nns_edge_loge ("Invalid param, given topic is invalid.");
-    return NNS_EDGE_ERROR_INVALID_PARAMETER;
-  }
-
-  eh = (nns_edge_handle_s *) edge_h;
-
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
-    return NNS_EDGE_ERROR_INVALID_PARAMETER;
-  }
-
-  nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).",
-      eh->id, eh->dest_host, eh->dest_port);
-
-  bh = (nns_edge_broker_s *) calloc (1, sizeof (nns_edge_broker_s));
-  if (!bh) {
-    nns_edge_loge ("Failed to allocate memory for broker handle.");
-    return NNS_EDGE_ERROR_OUT_OF_MEMORY;
-  }
-
-  url = nns_edge_get_host_string (eh->dest_host, eh->dest_port);
-  client_id = nns_edge_strdup_printf ("nns_edge_%s_%u", eh->id, getpid ());
-
-  ret = MQTTAsync_create (&handle, url, client_id,
-      MQTTCLIENT_PERSISTENCE_NONE, NULL);
-  SAFE_FREE (url);
-  SAFE_FREE (client_id);
-
-  if (MQTTASYNC_SUCCESS != ret) {
-    nns_edge_loge ("Failed to create MQTT handle.");
-    ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
-    goto error;
-  }
-
-  bh->topic = nns_edge_strdup (topic);
-  bh->mqtt_h = handle;
-  nns_edge_queue_create (&bh->server_list);
-  eh->broker_h = bh;
-
-  MQTTAsync_setCallbacks (handle, edge_h, NULL, mqtt_cb_message_arrived, NULL);
-
-  options.cleansession = 1;
-  options.keepAliveInterval = 6;
-  options.context = edge_h;
-
-  if (MQTTAsync_connect (handle, &options) != MQTTASYNC_SUCCESS) {
-    nns_edge_loge ("Failed to connect MQTT.");
-    ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
-    goto error;
-  }
-
-  /* Waiting for the connection */
-  wait_count = 0U;
-  do {
-    if (wait_count > 500U) {
-      ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
-      goto error;
-    }
-
-    usleep (10000);
-    wait_count++;
-  } while (!MQTTAsync_isConnected (handle));
-
-  return NNS_EDGE_ERROR_NONE;
-
-error:
-  nns_edge_mqtt_close (eh);
-  return ret;
-}
-
-/**
- * @brief Close the connection to MQTT.
- * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
- */
-int
-nns_edge_mqtt_close (nns_edge_h edge_h)
-{
-  nns_edge_handle_s *eh;
-  nns_edge_broker_s *bh;
-  MQTTAsync handle;
-  MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer;
-  unsigned int wait_count;
-
-  eh = (nns_edge_handle_s *) edge_h;
-
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
-    return NNS_EDGE_ERROR_INVALID_PARAMETER;
-  }
-
-  bh = (nns_edge_broker_s *) eh->broker_h;
-  handle = bh->mqtt_h;
-
-  if (handle) {
-    nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).",
-        eh->id, eh->dest_host, eh->dest_port);
-
-    options.context = edge_h;
-
-    /* Clear retained message */
-    MQTTAsync_send (handle, bh->topic, 0, NULL, 1, 1, NULL);
-
-    wait_count = 0U;
-    do {
-      if (MQTTAsync_disconnect (handle, &options) != MQTTASYNC_SUCCESS) {
-        nns_edge_loge ("Failed to disconnect MQTT.");
-        break;
-      }
-
-      if (wait_count > 500U) {
-        nns_edge_loge ("Failed to disconnect MQTT, timed out.");
-        break;
-      }
-
-      usleep (10000);
-      wait_count++;
-    } while (MQTTAsync_isConnected (handle));
-
-    MQTTAsync_destroy (&handle);
-  }
-
-  nns_edge_queue_destroy (bh->server_list);
-  bh->server_list = NULL;
-
-  SAFE_FREE (bh->topic);
-  SAFE_FREE (bh);
-
-  eh->broker_h = NULL;
-  return NNS_EDGE_ERROR_NONE;
-}
-
-/**
- * @brief Publish raw data.
- * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
- */
-int
-nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
-{
-  nns_edge_handle_s *eh;
-  nns_edge_broker_s *bh;
-  MQTTAsync handle;
-  int ret;
-
-  eh = (nns_edge_handle_s *) edge_h;
-
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
-    return NNS_EDGE_ERROR_INVALID_PARAMETER;
-  }
-
-  if (!data || length <= 0) {
-    nns_edge_loge ("Invalid param, given data is invalid.");
-    return NNS_EDGE_ERROR_INVALID_PARAMETER;
-  }
-
-  bh = (nns_edge_broker_s *) eh->broker_h;
-  handle = bh->mqtt_h;
-
-  if (!handle) {
-    nns_edge_loge ("Invalid state, MQTT connection was not completed.");
-    return NNS_EDGE_ERROR_INVALID_PARAMETER;
-  }
-
-  if (!MQTTAsync_isConnected (handle)) {
-    nns_edge_loge ("Failed to publish message, MQTT is not connected.");
-    return NNS_EDGE_ERROR_IO;
-  }
-
-  /* Publish a message (default QoS 1 - at least once and retained true). */
-  ret = MQTTAsync_send (handle, bh->topic, length, data, 1, 1, NULL);
-  if (ret != MQTTASYNC_SUCCESS) {
-    nns_edge_loge ("Failed to publish a message (ID:%s, Topic:%s).",
-        eh->id, eh->topic);
-    return NNS_EDGE_ERROR_IO;
-  }
-
-  return NNS_EDGE_ERROR_NONE;
-}
-
-/**
- * @brief Subscribe a topic.
- * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
- */
-int
-nns_edge_mqtt_subscribe (nns_edge_h edge_h)
-{
-  nns_edge_handle_s *eh;
-  nns_edge_broker_s *bh;
-  MQTTAsync handle;
-  int ret;
-
-  eh = (nns_edge_handle_s *) edge_h;
-
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
-    return NNS_EDGE_ERROR_INVALID_PARAMETER;
-  }
-
-  bh = (nns_edge_broker_s *) eh->broker_h;
-  handle = bh->mqtt_h;
-
-  if (!handle) {
-    nns_edge_loge ("Invalid state, MQTT connection was not completed.");
-    return NNS_EDGE_ERROR_INVALID_PARAMETER;
-  }
-
-  if (!MQTTAsync_isConnected (handle)) {
-    nns_edge_loge ("Failed to subscribe, MQTT is not connected.");
-    return NNS_EDGE_ERROR_IO;
-  }
-
-  /* Subscribe a topic (default QoS 1 - at least once). */
-  ret = MQTTAsync_subscribe (handle, bh->topic, 1, NULL);
-  if (ret != MQTTASYNC_SUCCESS) {
-    nns_edge_loge ("Failed to subscribe a topic (ID:%s, Topic:%s).",
-        eh->id, eh->topic);
-    return NNS_EDGE_ERROR_IO;
-  }
-
-  return NNS_EDGE_ERROR_NONE;
-}
-
-/**
- * @brief Check mqtt connection
- */
-bool
-nns_edge_mqtt_is_connected (nns_edge_h edge_h)
-{
-  nns_edge_handle_s *eh;
-  nns_edge_broker_s *bh;
-  MQTTAsync handle;
-  eh = (nns_edge_handle_s *) edge_h;
-
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
-    return false;
-  }
-
-  bh = (nns_edge_broker_s *) eh->broker_h;
-  handle = bh->mqtt_h;
-
-  if (!handle) {
-    nns_edge_loge ("Invalid state, MQTT connection was not completed.");
-    return false;
-  }
-
-  if (MQTTAsync_isConnected (handle)) {
-    return true;
-  }
-
-  return false;
-}
-
-/**
- * @brief Get message from mqtt broker.
- */
-int
-nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg)
-{
-  nns_edge_handle_s *eh;
-  nns_edge_broker_s *bh;
-
-  eh = (nns_edge_handle_s *) edge_h;
-
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
-    return NNS_EDGE_ERROR_INVALID_PARAMETER;
-  }
-
-  if (!msg) {
-    nns_edge_loge ("Invalid param, given msg param is invalid.");
-    return NNS_EDGE_ERROR_INVALID_PARAMETER;
-  }
-
-  bh = (nns_edge_broker_s *) eh->broker_h;
-
-  /* Wait for 1 second */
-  if (!nns_edge_queue_wait_pop (bh->server_list, 1000U, (void **) msg)) {
-    nns_edge_loge ("Failed to get message from mqtt broker within timeout.");
-    return NNS_EDGE_ERROR_UNKNOWN;
-  }
-
-  return NNS_EDGE_ERROR_NONE;
-}