[Edge] Add mqtt-hubrid feature
authorgichan <gichan2.jang@samsung.com>
Fri, 22 Jul 2022 01:24:31 +0000 (10:24 +0900)
committerMyungJoo Ham <myungjoo.ham@samsung.com>
Thu, 28 Jul 2022 23:55:11 +0000 (08:55 +0900)
Add mqtt-hybrid feature to nns-edge lib.

Signed-off-by: gichan <gichan2.jang@samsung.com>
gst/nnstreamer/tensor_query/nnstreamer-edge-common.h
gst/nnstreamer/tensor_query/nnstreamer-edge-internal.h
gst/nnstreamer/tensor_query/nnstreamer_edge_internal.c
gst/nnstreamer/tensor_query/nnstreamer_edge_mqtt.c

index 24a98a4..1f003d9 100644 (file)
@@ -22,6 +22,8 @@
 #include <unistd.h>
 #include "nnstreamer-edge.h"
 
+typedef void *nns_edge_broker_h;
+
 #ifdef __cplusplus
 extern "C" {
 #endif /* __cplusplus */
index 55a3fd6..0e66b46 100644 (file)
@@ -33,6 +33,8 @@ typedef struct {
   nns_edge_protocol_e protocol;
   char *ip; /**< host IP */
   int port; /**< host port */
+  char *listener_ip;
+  int listener_port;
 
   /* Edge event callback and user data */
   nns_edge_event_cb event_cb;
@@ -46,7 +48,7 @@ typedef struct {
   GSocketListener *listener;
 
   /* MQTT */
-  void *mqtt_handle;
+  nns_edge_broker_h broker_h;
 } nns_edge_handle_s;
 
 #if defined(ENABLE_MQTT)
@@ -73,6 +75,17 @@ int nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length
  * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
  */
 int nns_edge_mqtt_subscribe (nns_edge_h edge_h);
+
+/**
+ * @brief Check mqtt connection
+ */
+bool nns_edge_mqtt_is_connected (nns_edge_h edge_h);
+
+/**
+ * @brief Get message from mqtt broker.
+ */
+int nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg);
+
 #else
 /**
  * @todo consider to change code style later.
@@ -87,6 +100,8 @@ int nns_edge_mqtt_subscribe (nns_edge_h edge_h);
 #define nns_edge_mqtt_close(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
 #define nns_edge_mqtt_publish(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
 #define nns_edge_mqtt_subscribe(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
+#define nns_edge_mqtt_is_connected(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
+#define nns_edge_mqtt_get_message(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
 #endif
 
 #ifdef __cplusplus
index 2732eb5..57c4926 100644 (file)
@@ -12,6 +12,8 @@
 
 #include "nnstreamer-edge-common.h"
 #include "nnstreamer-edge-internal.h"
+#include <arpa/inet.h>
+#include <ifaddrs.h>
 
 #define N_BACKLOG 10
 #define DEFAULT_TIMEOUT_SEC 10
@@ -360,7 +362,7 @@ _nns_edge_invoke_event_cb (nns_edge_handle_s * eh, nns_edge_event_e event,
 
   /* If event callback is null, return ok. */
   if (!eh->event_cb) {
-    nns_edge_logw ("The event callback is null, do nothing!");
+    nns_edge_logi ("The event callback is null, do nothing!");
     return NNS_EDGE_ERROR_NONE;
   }
 
@@ -633,7 +635,7 @@ _nns_edge_connect_to (nns_edge_handle_s * eh, int64_t client_id,
       /* Send ip and port to destination. */
       _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_HOST_INFO, client_id);
 
-      _get_host_str (eh->ip, eh->port, &host);
+      _get_host_str (eh->listener_ip, eh->listener_port, &host);
       cmd.info.num = 1;
       cmd.info.mem_size[0] = strlen (host) + 1;
       cmd.mem[0] = host;
@@ -700,7 +702,7 @@ _nns_edge_message_handler (void *thread_data)
       break;
     }
 
-    /** Receive data from the client */
+    /* Receive data from the client */
     _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id);
     ret = _nns_edge_cmd_receive (conn, &cmd);
     if (ret != NNS_EDGE_ERROR_NONE) {
@@ -767,7 +769,7 @@ _nns_edge_create_message_thread (nns_edge_handle_s * eh, nns_edge_conn_s * conn,
     return NNS_EDGE_ERROR_OUT_OF_MEMORY;
   }
 
-   /** Create message receving thread */
+  /* Create message receving thread */
   pthread_attr_init (&attr);
   pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_JOINABLE);
 
@@ -932,7 +934,7 @@ nns_edge_create_handle (const char *id, const char *topic, nns_edge_h * edge_h)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  /**
+                                    /**
    * @todo manage edge handles
    * 1. consider adding hash table or list to manage edge handles.
    * 2. compare topic and return error if existing topic in handle is different.
@@ -953,6 +955,7 @@ nns_edge_create_handle (const char *id, const char *topic, nns_edge_h * edge_h)
   eh->ip = nns_edge_strdup ("localhost");
   eh->port = 0;
   eh->caps_str = NULL;
+  eh->broker_h = NULL;
 
   /* Connection data for each client ID. */
   eh->conn_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
@@ -963,6 +966,39 @@ nns_edge_create_handle (const char *id, const char *topic, nns_edge_h * edge_h)
 }
 
 /**
+ * @brief Get IP address
+ */
+static gchar *
+_get_ip_address (void)
+{
+  struct ifaddrs *addrs;
+  gchar *ret = NULL;
+
+  getifaddrs (&addrs);
+  while (addrs) {
+    if (addrs->ifa_addr && addrs->ifa_addr->sa_family == AF_INET) {
+      struct sockaddr_in *pAddr = (struct sockaddr_in *) addrs->ifa_addr;
+
+      if (NULL != strstr (addrs->ifa_name, "en") ||
+          NULL != strstr (addrs->ifa_name, "et")) {
+        g_free (ret);
+        ret = g_strdup (inet_ntoa (pAddr->sin_addr));
+        break;
+      }
+    }
+    addrs = addrs->ifa_next;
+  }
+  while ((addrs = addrs->ifa_next));
+
+  freeifaddrs (addrs);
+
+  if (NULL == ret)
+    ret = g_strdup ("localhost");
+
+  return ret;
+}
+
+/**
  * @brief Start the nnstreamer edge.
  */
 int
@@ -987,21 +1023,53 @@ nns_edge_start (nns_edge_h edge_h, bool is_server)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
+  eh->listener_ip = _get_ip_address ();
+  eh->listener_port = nns_edge_get_available_port ();
+
   eh->is_server = is_server;
-  if (!is_server && 0 == eh->port) {
-    eh->port = nns_edge_get_available_port ();
-    if (eh->port <= 0) {
-      nns_edge_loge ("Failed to start edge. Cannot get available port.");
-      nns_edge_unlock (eh);
-      return NNS_EDGE_ERROR_CONNECTION_FAILURE;
+  if (eh->is_server) {
+    if (NNS_EDGE_PROTOCOL_MQTT == eh->protocol) {
+      gchar *device, *topic, *msg;
+
+      if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_connect (eh)) {
+        nns_edge_loge
+            ("Failed to start nnstreamer-edge. Connection failure to broker.");
+        ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
+        goto error;
+      }
+
+      /* @todo Set unique device name.
+       * Device name should be unique. Consider using MAC address later.
+       * Now, use ID received from the user.
+       */
+      device = g_strdup_printf ("device-%s", eh->id);
+      topic = g_strdup_printf ("edge/inference/%s/%s/", device, eh->topic);
+
+      g_free (device);
+      g_free (eh->topic);
+      eh->topic = topic;
+      msg = g_strdup_printf ("%s:%d", eh->listener_ip, eh->listener_port);
+
+      if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_publish (eh, msg,
+              strlen (msg) + 1)) {
+        nns_edge_loge ("Failed to publish the meesage: %s", msg);
+        ret = NNS_EDGE_ERROR_IO;
+        g_free (msg);
+        goto error;
+      }
+      g_free (msg);
+    } else { /** case for NNS_EDGE_PROTOCOL_TCP == eh->protocol */
+      g_free (eh->listener_ip);
+      eh->listener_ip = g_strdup (eh->ip);
+      eh->listener_port = eh->port;
     }
   }
 
-  /** Initialize server src data. */
+  /* Initialize server src data. */
   eh->listener = g_socket_listener_new ();
   g_socket_listener_set_backlog (eh->listener, N_BACKLOG);
 
-  if (!_nns_edge_get_saddr (eh->ip, eh->port, &saddr)) {
+  if (!_nns_edge_get_saddr (eh->listener_ip, eh->listener_port, &saddr)) {
     nns_edge_loge ("Failed to get socket address");
     ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
     goto error;
@@ -1047,6 +1115,12 @@ nns_edge_release_handle (nns_edge_h edge_h)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
+  if (nns_edge_mqtt_is_connected (eh)) {
+    if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_close (eh)) {
+      nns_edge_logw ("Failed to close mqtt connection.");
+    }
+  }
+
   eh->magic = NNS_EDGE_MAGIC_DEAD;
   eh->event_cb = NULL;
   eh->user_data = NULL;
@@ -1061,6 +1135,7 @@ nns_edge_release_handle (nns_edge_h edge_h)
   SAFE_FREE (eh->topic);
   SAFE_FREE (eh->ip);
   SAFE_FREE (eh->caps_str);
+  SAFE_FREE (eh->listener_ip);
 
   nns_edge_unlock (eh);
   nns_edge_lock_destroy (eh);
@@ -1117,6 +1192,8 @@ nns_edge_connect (nns_edge_h edge_h, nns_edge_protocol_e protocol,
 {
   nns_edge_handle_s *eh;
   int ret;
+  char *server_ip = NULL;
+  int server_port;
 
   eh = (nns_edge_handle_s *) edge_h;
   if (!eh) {
@@ -1144,11 +1221,53 @@ nns_edge_connect (nns_edge_h edge_h, nns_edge_protocol_e protocol,
   }
 
   eh->protocol = protocol;
+  if (0 != g_ascii_strcasecmp (eh->topic, "TCP_DIRECT")) {
+    gchar *topic, *msg = NULL;
+
+    g_free (eh->ip);
+    eh->ip = g_strdup (ip);
+    eh->port = port;
+    if (!nns_edge_mqtt_is_connected (eh)) {
+      if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_connect (eh)) {
+        nns_edge_loge ("Connection failure to broker.");
+        nns_edge_unlock (eh);
+        return NNS_EDGE_ERROR_CONNECTION_FAILURE;
+      }
+      topic = g_strdup_printf ("edge/inference/+/%s/#", eh->topic);
+      g_free (eh->topic);
+      eh->topic = topic;
+
+      if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_subscribe (eh)) {
+        nns_edge_loge ("Failed to subscribe to topic: %s.", eh->topic);
+        nns_edge_unlock (eh);
+        return NNS_EDGE_ERROR_CONNECTION_FAILURE;
+      }
+    }
+
+    if (NNS_EDGE_ERROR_NONE == nns_edge_mqtt_get_message (eh, &msg)) {
+      gchar **splits;
+      splits = g_strsplit (msg, ":", -1);
+      server_ip = g_strdup (splits[0]);
+      server_port = g_ascii_strtoull (splits[1], NULL, 10);
+      nns_edge_logd ("[DEBUG] Parsed server info: Server [%s:%d] ", server_ip,
+          server_port);
+
+      g_strfreev (splits);
+      g_free (msg);
+    } else {
+      nns_edge_loge ("Failed to get server IP addr within timeout.");
+      nns_edge_unlock (eh);
+      return NNS_EDGE_ERROR_CONNECTION_FAILURE;
+    }
+  } else { /** case for NNS_EDGE_PROTOCOL_TCP == eh->protocol */
+    server_ip = g_strdup (ip);
+    server_port = port;
+  }
 
   /** Connect to info channel. */
-  ret = _nns_edge_connect_to (eh, eh->client_id, ip, port);
+  ret = _nns_edge_connect_to (eh, eh->client_id, server_ip, server_port);
   if (ret != NNS_EDGE_ERROR_NONE) {
-    nns_edge_loge ("Failed to connect to %s:%d", ip, port);
+    nns_edge_loge ("Failed to connect to %s:%d", server_ip, server_port);
   }
 
   nns_edge_unlock (eh);
@@ -1210,7 +1329,7 @@ nns_edge_publish (nns_edge_h edge_h, nns_edge_data_h data_h)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  /** @todo update code (publish data) */
+  /* @todo update code (publish data) */
 
   nns_edge_unlock (eh);
   return NNS_EDGE_ERROR_NONE;
@@ -1296,7 +1415,7 @@ nns_edge_subscribe (nns_edge_h edge_h, nns_edge_data_h data_h)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  /** @todo update code (subscribe) */
+                                    /** @todo update code (subscribe) */
 
   nns_edge_unlock (eh);
   return NNS_EDGE_ERROR_NONE;
@@ -1324,7 +1443,7 @@ nns_edge_unsubscribe (nns_edge_h edge_h)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  /** @todo update code (unsubscribe) */
+                                    /** @todo update code (unsubscribe) */
 
   nns_edge_unlock (eh);
   return NNS_EDGE_ERROR_NONE;
@@ -1396,7 +1515,7 @@ nns_edge_set_info (nns_edge_h edge_h, const char *key, const char *value)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  /**
+  /*
    * @todo User handles (replace or append) the capability of edge handle.
    * @todo Change key-value set as json or hash table.
    */
@@ -1411,6 +1530,8 @@ nns_edge_set_info (nns_edge_h edge_h, const char *key, const char *value)
   } else if (0 == strcasecmp (key, "TOPIC")) {
     SAFE_FREE (eh->topic);
     eh->topic = nns_edge_strdup (value);
+  } else if (0 == strcasecmp (key, "PROTOCOL")) {
+    eh->protocol = g_ascii_strtoll (value, NULL, 10);
   } else {
     nns_edge_logw ("Failed to set edge info. Unknown key: %s", key);
   }
@@ -1451,7 +1572,7 @@ nns_edge_get_info (nns_edge_h edge_h, const char *key, char **value)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  /**
+  /*
    * @todo User handles (replace or append) the capability of edge handle.
    * @todo Change key-value set as json or hash table.
    */
index 69a7c49..07ce98a 100644 (file)
@@ -13,6 +13,7 @@
 #if !defined(ENABLE_MQTT)
 #error "This file can be built with Paho MQTT library."
 #endif
+#define DEFAULT_SUB_TIMEOUT 1000000 /** 1 second */
 
 #include <unistd.h>
 #include <MQTTAsync.h>
 #include "nnstreamer-edge-internal.h"
 
 /**
+ * @brief Data structure for mqtt broker handle.
+ */
+typedef struct
+{
+  void *mqtt_h;
+  GAsyncQueue *server_list;
+  GMutex mqtt_mutex;
+  GCond mqtt_gcond;
+  gboolean mqtt_is_connected;
+} nns_edge_broker_s;
+
+/**
  * @brief Callback function to be called when the connection is lost.
  */
 static void
 mqtt_cb_connection_lost (void *context, char *cause)
 {
   nns_edge_handle_s *eh;
+  nns_edge_broker_s *bh;
 
   eh = (nns_edge_handle_s *) context;
 
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
     nns_edge_loge ("Invalid param, given edge handle is invalid.");
     return;
   }
 
+  bh = (nns_edge_broker_s *) eh->broker_h;
   nns_edge_logw ("MQTT connection is lost (ID:%s, Cause:%s).", eh->id, cause);
+  g_mutex_lock (&bh->mqtt_mutex);
+  bh->mqtt_is_connected = FALSE;
+  g_cond_broadcast (&bh->mqtt_gcond);
+  g_mutex_unlock (&bh->mqtt_mutex);
+
   if (eh->event_cb) {
     /** @todo send new event (MQTT disconnected) */
   }
@@ -47,16 +67,23 @@ static void
 mqtt_cb_connection_success (void *context, MQTTAsync_successData * response)
 {
   nns_edge_handle_s *eh;
+  nns_edge_broker_s *bh;
 
   UNUSED (response);
   eh = (nns_edge_handle_s *) context;
 
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
     nns_edge_loge ("Invalid param, given edge handle is invalid.");
     return;
   }
 
-  nns_edge_logi ("MQTT connection is completed (ID:%s).", eh->id);
+  bh = (nns_edge_broker_s *) eh->broker_h;
+
+  g_mutex_lock (&bh->mqtt_mutex);
+  bh->mqtt_is_connected = TRUE;
+  g_cond_broadcast (&bh->mqtt_gcond);
+  g_mutex_unlock (&bh->mqtt_mutex);
+
   if (eh->event_cb) {
     /** @todo send new event (MQTT connected) */
   }
@@ -69,16 +96,24 @@ static void
 mqtt_cb_connection_failure (void *context, MQTTAsync_failureData * response)
 {
   nns_edge_handle_s *eh;
+  nns_edge_broker_s *bh;
 
   UNUSED (response);
   eh = (nns_edge_handle_s *) context;
 
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
     nns_edge_loge ("Invalid param, given edge handle is invalid.");
     return;
   }
 
+  bh = (nns_edge_broker_s *) eh->broker_h;
+
   nns_edge_logw ("MQTT connection is failed (ID:%s).", eh->id);
+  g_mutex_lock (&bh->mqtt_mutex);
+  bh->mqtt_is_connected = FALSE;
+  g_cond_broadcast (&bh->mqtt_gcond);
+  g_mutex_unlock (&bh->mqtt_mutex);
+
   if (eh->event_cb) {
     /** @todo send new event (MQTT connection failure) */
   }
@@ -91,16 +126,24 @@ static void
 mqtt_cb_disconnection_success (void *context, MQTTAsync_successData * response)
 {
   nns_edge_handle_s *eh;
+  nns_edge_broker_s *bh;
 
   UNUSED (response);
   eh = (nns_edge_handle_s *) context;
 
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
     nns_edge_loge ("Invalid param, given edge handle is invalid.");
     return;
   }
 
+  bh = (nns_edge_broker_s *) eh->broker_h;
+
   nns_edge_logi ("MQTT disconnection is completed (ID:%s).", eh->id);
+  g_mutex_lock (&bh->mqtt_mutex);
+  bh->mqtt_is_connected = FALSE;
+  g_cond_broadcast (&bh->mqtt_gcond);
+  g_mutex_unlock (&bh->mqtt_mutex);
+
   if (eh->event_cb) {
     /** @todo send new event (MQTT disconnected) */
   }
@@ -137,19 +180,33 @@ mqtt_cb_message_arrived (void *context, char *topic, int topic_len,
     MQTTAsync_message * message)
 {
   nns_edge_handle_s *eh;
+  nns_edge_broker_s *bh;
+  char *msg = NULL;
 
   UNUSED (topic);
   UNUSED (topic_len);
   UNUSED (message);
   eh = (nns_edge_handle_s *) context;
 
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
     nns_edge_loge ("Invalid param, given edge handle is invalid.");
     return TRUE;
   }
 
+  if (0 >= message->payloadlen) {
+    nns_edge_logw ("Invalid payload lenth: %d", message->payloadlen);
+    return TRUE;
+  }
+
+  bh = (nns_edge_broker_s *) eh->broker_h;
+
   nns_edge_logd ("MQTT message is arrived (ID:%s, Topic:%s).",
       eh->id, eh->topic);
+
+  msg = (char *) malloc (message->payloadlen);
+  memcpy (msg, message->payload, message->payloadlen);
+  g_async_queue_push (bh->server_list, msg);
+
   if (eh->event_cb) {
     /** @todo send new event (message arrived) */
   }
@@ -165,11 +222,13 @@ int
 nns_edge_mqtt_connect (nns_edge_h edge_h)
 {
   nns_edge_handle_s *eh;
-  MQTTAsync handle;
+  nns_edge_broker_s *bh;
   MQTTAsync_connectOptions options = MQTTAsync_connectOptions_initializer;
+  int ret = NNS_EDGE_ERROR_NONE;
+  int64_t end_time;
+  MQTTAsync handle;
   char *url;
   char *client_id;
-  int ret;
 
   eh = (nns_edge_handle_s *) edge_h;
 
@@ -178,8 +237,13 @@ nns_edge_mqtt_connect (nns_edge_h edge_h)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).",
-      eh->id, eh->ip, eh->port);
+  bh = (nns_edge_broker_s *) malloc (sizeof (nns_edge_broker_s));
+  if (!bh) {
+    nns_edge_loge ("Failed to allocate memory for broker handle.");
+    return NNS_EDGE_ERROR_OUT_OF_MEMORY;
+  }
+
+  memset (bh, 0, sizeof (nns_edge_broker_s));
 
   url = g_strdup_printf ("%s:%d", eh->ip, eh->port);
   client_id = g_strdup_printf ("nns_edge_%s_%u", eh->id, getpid ());
@@ -192,6 +256,24 @@ nns_edge_mqtt_connect (nns_edge_h edge_h)
     goto error;
   }
 
+  g_cond_init (&bh->mqtt_gcond);
+  g_mutex_init (&bh->mqtt_mutex);
+  bh->mqtt_is_connected = FALSE;
+  bh->mqtt_h = handle;
+  bh->server_list = g_async_queue_new ();
+  eh->broker_h = bh;
+
+  bh = (nns_edge_broker_s *) eh->broker_h;
+  if (!bh->mqtt_h) {
+    nns_edge_loge ("Invalid state, MQTT connection was not completed.");
+    ret = NNS_EDGE_ERROR_IO;
+    goto error;
+  }
+  handle = bh->mqtt_h;
+
+  nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).",
+      eh->id, eh->ip, eh->port);
+
   MQTTAsync_setCallbacks (handle, edge_h,
       mqtt_cb_connection_lost, mqtt_cb_message_arrived, NULL);
 
@@ -203,17 +285,26 @@ nns_edge_mqtt_connect (nns_edge_h edge_h)
 
   if (MQTTAsync_connect (handle, &options) != MQTTASYNC_SUCCESS) {
     nns_edge_loge ("Failed to connect MQTT.");
-    MQTTAsync_destroy (&handle);
     ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
     goto error;
   }
 
-  eh->mqtt_handle = handle;
-  ret = NNS_EDGE_ERROR_NONE;
+  /* Waiting for the connection */
+  end_time = g_get_monotonic_time () + 5 * G_TIME_SPAN_SECOND;
+  g_mutex_lock (&bh->mqtt_mutex);
+  while (!bh->mqtt_is_connected) {
+    if (!g_cond_wait_until (&bh->mqtt_gcond, &bh->mqtt_mutex, end_time)) {
+      g_mutex_unlock (&bh->mqtt_mutex);
+      nns_edge_loge ("Failed to connect to MQTT broker."
+          "Please check broker is running status or broker host address.");
+      goto error;
+    }
+  }
+  g_mutex_unlock (&bh->mqtt_mutex);
+  return NNS_EDGE_ERROR_NONE;
 
 error:
-  g_free (url);
-  g_free (client_id);
+  nns_edge_mqtt_close (eh);
   return ret;
 }
 
@@ -225,22 +316,25 @@ int
 nns_edge_mqtt_close (nns_edge_h edge_h)
 {
   nns_edge_handle_s *eh;
+  nns_edge_broker_s *bh;
   MQTTAsync handle;
   MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer;
+  char *msg;
 
   eh = (nns_edge_handle_s *) edge_h;
 
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
     nns_edge_loge ("Invalid param, given edge handle is invalid.");
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  handle = eh->mqtt_handle;
+  bh = (nns_edge_broker_s *) eh->broker_h;
 
-  if (!handle) {
+  if (!bh->mqtt_h) {
     nns_edge_loge ("Invalid state, MQTT connection was not completed.");
-    return NNS_EDGE_ERROR_IO;
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
+  handle = bh->mqtt_h;
 
   nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).",
       eh->id, eh->ip, eh->port);
@@ -249,15 +343,27 @@ nns_edge_mqtt_close (nns_edge_h edge_h)
   options.onFailure = mqtt_cb_disconnection_failure;
   options.context = edge_h;
 
+  /** Clear retained message */
+  MQTTAsync_send (handle, eh->topic, 0, NULL, 1, 1, NULL);
+
   while (MQTTAsync_isConnected (handle)) {
     if (MQTTAsync_disconnect (handle, &options) != MQTTASYNC_SUCCESS) {
       nns_edge_loge ("Failed to disconnect MQTT.");
       return NNS_EDGE_ERROR_IO;
     }
+    g_usleep (10000);
   }
+  g_cond_clear (&bh->mqtt_gcond);
+  g_mutex_clear (&bh->mqtt_mutex);
 
   MQTTAsync_destroy (&handle);
-  eh->mqtt_handle = NULL;
+
+  while ((msg = g_async_queue_try_pop (bh->server_list))) {
+    SAFE_FREE (msg);
+  }
+  g_async_queue_unref (bh->server_list);
+  bh->server_list = NULL;
+  SAFE_FREE (bh);
 
   return NNS_EDGE_ERROR_NONE;
 }
@@ -270,12 +376,13 @@ int
 nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
 {
   nns_edge_handle_s *eh;
+  nns_edge_broker_s *bh;
   MQTTAsync handle;
   int ret;
 
   eh = (nns_edge_handle_s *) edge_h;
 
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
     nns_edge_loge ("Invalid param, given edge handle is invalid.");
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
@@ -285,10 +392,15 @@ nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  handle = eh->mqtt_handle;
-
-  if (!handle || MQTTAsync_isConnected (handle)) {
+  bh = (nns_edge_broker_s *) eh->broker_h;
+  if (!bh->mqtt_h) {
     nns_edge_loge ("Invalid state, MQTT connection was not completed.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+  handle = bh->mqtt_h;
+
+  if (!MQTTAsync_isConnected (handle)) {
+    nns_edge_loge ("Failed to publish message, MQTT is not connected.");
     return NNS_EDGE_ERROR_IO;
   }
 
@@ -311,19 +423,25 @@ int
 nns_edge_mqtt_subscribe (nns_edge_h edge_h)
 {
   nns_edge_handle_s *eh;
+  nns_edge_broker_s *bh;
   MQTTAsync handle;
   int ret;
 
   eh = (nns_edge_handle_s *) edge_h;
 
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
     nns_edge_loge ("Invalid param, given edge handle is invalid.");
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  handle = eh->mqtt_handle;
+  bh = (nns_edge_broker_s *) eh->broker_h;
+  if (!bh->mqtt_h) {
+    nns_edge_loge ("Invalid state, MQTT connection was not completed.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+  handle = bh->mqtt_h;
 
-  if (!handle || MQTTAsync_isConnected (handle)) {
+  if (!MQTTAsync_isConnected (handle)) {
     nns_edge_loge ("Invalid state, MQTT connection was not completed.");
     return NNS_EDGE_ERROR_IO;
   }
@@ -338,3 +456,60 @@ nns_edge_mqtt_subscribe (nns_edge_h edge_h)
 
   return NNS_EDGE_ERROR_NONE;
 }
+
+/**
+ * @brief Check mqtt connection
+ */
+bool
+nns_edge_mqtt_is_connected (nns_edge_h edge_h)
+{
+  nns_edge_handle_s *eh;
+  nns_edge_broker_s *bh;
+  MQTTAsync handle;
+  eh = (nns_edge_handle_s *) edge_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return false;
+  }
+
+  bh = (nns_edge_broker_s *) eh->broker_h;
+  if (!bh->mqtt_h) {
+    nns_edge_loge ("Invalid state, MQTT connection was not completed.");
+    return false;
+  }
+  handle = bh->mqtt_h;
+
+  if (MQTTAsync_isConnected (handle)) {
+    return true;
+  }
+
+  return false;
+}
+
+/**
+ * @brief Get message from mqtt broker.
+ */
+int
+nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg)
+{
+  nns_edge_handle_s *eh;
+  nns_edge_broker_s *bh;
+
+  eh = (nns_edge_handle_s *) edge_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  bh = (nns_edge_broker_s *) eh->broker_h;
+
+  *msg = g_async_queue_timeout_pop (bh->server_list, DEFAULT_SUB_TIMEOUT);
+  if (!*msg) {
+    nns_edge_loge ("Failed to get message from mqtt broker within timeout");
+    return NNS_EDGE_ERROR_UNKNOWN;
+  }
+
+  return NNS_EDGE_ERROR_NONE;
+}