[CodeClean/MQTT] broker handle
authorJaeyun <jy1210.jung@samsung.com>
Mon, 21 Nov 2022 10:20:01 +0000 (19:20 +0900)
committerjaeyun-jung <39614140+jaeyun-jung@users.noreply.github.com>
Tue, 6 Dec 2022 04:35:44 +0000 (13:35 +0900)
Code clean, add broker handle and revise MQTT functions.
This will remove dependency to edge-handle in MQTT impl.

Signed-off-by: Jaeyun <jy1210.jung@samsung.com>
src/libnnstreamer-edge/nnstreamer-edge-internal.c
src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c
src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c
src/libnnstreamer-edge/nnstreamer-edge-mqtt.h
tests/unittest_nnstreamer-edge.cc

index 831f217df48793488031a94985f399ed1e4596f7..cb65f593d348242a0609afccbd9351d91a6296a4 100644 (file)
@@ -1269,7 +1269,8 @@ nns_edge_start (nns_edge_h edge_h)
       topic = nns_edge_strdup_printf ("edge/inference/device-%s/%s/",
           eh->id, eh->topic);
 
-      ret = nns_edge_mqtt_connect (eh, topic);
+      ret = nns_edge_mqtt_connect (eh->id, topic, eh->dest_host, eh->dest_port,
+          &eh->broker_h);
       SAFE_FREE (topic);
 
       if (NNS_EDGE_ERROR_NONE != ret) {
@@ -1280,7 +1281,7 @@ nns_edge_start (nns_edge_h edge_h)
 
       msg = nns_edge_get_host_string (eh->host, eh->port);
 
-      ret = nns_edge_mqtt_publish (eh, msg, strlen (msg) + 1);
+      ret = nns_edge_mqtt_publish (eh->broker_h, msg, strlen (msg) + 1);
       SAFE_FREE (msg);
 
       if (NNS_EDGE_ERROR_NONE != ret) {
@@ -1337,7 +1338,7 @@ nns_edge_release_handle (nns_edge_h edge_h)
 
   switch (eh->connect_type) {
     case NNS_EDGE_CONNECT_TYPE_HYBRID:
-      if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_close (eh)) {
+      if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_close (eh->broker_h)) {
         nns_edge_logw ("Failed to close mqtt connection.");
       }
       eh->broker_h = NULL;
@@ -1474,14 +1475,13 @@ nns_edge_connect (nns_edge_h edge_h, const char *dest_host, int dest_port)
   eh->dest_port = dest_port;
 
   if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) {
-    char *topic, *msg = NULL;
-    char *server_ip = NULL;
-    int server_port;
+    char *topic;
 
-    if (!nns_edge_mqtt_is_connected (eh)) {
+    if (!nns_edge_mqtt_is_connected (eh->broker_h)) {
       topic = nns_edge_strdup_printf ("edge/inference/+/%s/#", eh->topic);
 
-      ret = nns_edge_mqtt_connect (eh, topic);
+      ret = nns_edge_mqtt_connect (eh->id, topic, dest_host, dest_port,
+          &eh->broker_h);
       SAFE_FREE (topic);
 
       if (NNS_EDGE_ERROR_NONE != ret) {
@@ -1489,14 +1489,22 @@ nns_edge_connect (nns_edge_h edge_h, const char *dest_host, int dest_port)
         goto done;
       }
 
-      ret = nns_edge_mqtt_subscribe (eh);
+      ret = nns_edge_mqtt_subscribe (eh->broker_h);
       if (NNS_EDGE_ERROR_NONE != ret) {
         nns_edge_loge ("Failed to subscribe to topic: %s.", eh->topic);
         goto done;
       }
     }
 
-    while ((ret = nns_edge_mqtt_get_message (eh, &msg)) == NNS_EDGE_ERROR_NONE) {
+    do {
+      char *msg = NULL;
+      char *server_ip = NULL;
+      int server_port = 0;
+
+      ret = nns_edge_mqtt_get_message (eh->broker_h, &msg);
+      if (ret != NNS_EDGE_ERROR_NONE || !msg)
+        break;
+
       nns_edge_parse_host_string (msg, &server_ip, &server_port);
       SAFE_FREE (msg);
 
@@ -1509,7 +1517,7 @@ nns_edge_connect (nns_edge_h edge_h, const char *dest_host, int dest_port)
       if (NNS_EDGE_ERROR_NONE == ret) {
         break;
       }
-    }
+    } while (TRUE);
   } else if (NNS_EDGE_CONNECT_TYPE_AITT == eh->connect_type) {
     ret = nns_edge_aitt_connect (eh);
     if (ret != NNS_EDGE_ERROR_NONE) {
index 921ee7935e380625ee33b0200eb3362f2d25eaf4..235b302a4c0d678c309b7a801ac465dd02a38319 100644 (file)
@@ -27,7 +27,10 @@ typedef struct
 {
   void *mqtt_h;
   nns_edge_queue_h server_list;
+  char *id;
   char *topic;
+  char *host;
+  int port;
   bool connected;
 } nns_edge_broker_s;
 
@@ -65,7 +68,8 @@ on_message_callback (struct mosquitto *client, void *data,
  * @brief Initializes MQTT object.
  */
 static int
-_nns_edge_mqtt_init_client (nns_edge_handle_s * eh, const char *topic)
+_nns_edge_mqtt_init_client (const char *id, const char *topic, const char *host,
+    const int port, nns_edge_broker_h * broker_h)
 {
   nns_edge_broker_s *bh;
   int mret;
@@ -73,8 +77,7 @@ _nns_edge_mqtt_init_client (nns_edge_handle_s * eh, const char *topic)
   struct mosquitto *handle;
   int ver = MQTT_PROTOCOL_V311; /** @todo check mqtt version (TizenRT repo) */
 
-  nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).",
-      eh->id, eh->dest_host, eh->dest_port);
+  nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).", id, host, port);
 
   bh = (nns_edge_broker_s *) calloc (1, sizeof (nns_edge_broker_s));
   if (!bh) {
@@ -83,7 +86,7 @@ _nns_edge_mqtt_init_client (nns_edge_handle_s * eh, const char *topic)
   }
 
   mosquitto_lib_init ();
-  client_id = nns_edge_strdup_printf ("nns_edge_%s_%u", eh->id, getpid ());
+  client_id = nns_edge_strdup_printf ("nns_edge_%s_%u", id, getpid ());
 
   handle = mosquitto_new (client_id, TRUE, NULL);
   SAFE_FREE (client_id);
@@ -109,7 +112,7 @@ _nns_edge_mqtt_init_client (nns_edge_handle_s * eh, const char *topic)
     goto error;
   }
 
-  mret = mosquitto_connect (handle, eh->dest_host, eh->dest_port, 60);
+  mret = mosquitto_connect (handle, host, port, 60);
   if (mret != MOSQ_ERR_SUCCESS) {
     nns_edge_loge ("Failed to connect MQTT.");
     goto error;
@@ -117,10 +120,13 @@ _nns_edge_mqtt_init_client (nns_edge_handle_s * eh, const char *topic)
 
   nns_edge_queue_create (&bh->server_list);
   bh->mqtt_h = handle;
+  bh->id = nns_edge_strdup (id);
   bh->topic = nns_edge_strdup (topic);
+  bh->host = nns_edge_strdup (host);
+  bh->port = port;
   bh->connected = true;
 
-  eh->broker_h = bh;
+  *broker_h = bh;
   return NNS_EDGE_ERROR_NONE;
 
 error:
@@ -136,24 +142,37 @@ error:
  * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
  */
 int
-nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic)
+nns_edge_mqtt_connect (const char *id, const char *topic, const char *host,
+    const int port, nns_edge_broker_h * broker_h)
 {
-  nns_edge_handle_s *eh;
   int ret = NNS_EDGE_ERROR_NONE;
 
+  if (!STR_IS_VALID (id)) {
+    nns_edge_loge ("Invalid param, given id is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
   if (!STR_IS_VALID (topic)) {
     nns_edge_loge ("Invalid param, given topic is invalid.");
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  eh = (nns_edge_handle_s *) edge_h;
+  if (!STR_IS_VALID (host)) {
+    nns_edge_loge ("Invalid param, given host is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
 
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+  if (!PORT_IS_VALID (port)) {
+    nns_edge_loge ("Invalid param, given port is invalid.");
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  ret = _nns_edge_mqtt_init_client (eh, topic);
+  if (!broker_h) {
+    nns_edge_loge ("Invalid param, mqtt_h should not be null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  ret = _nns_edge_mqtt_init_client (id, topic, host, port, broker_h);
   if (NNS_EDGE_ERROR_NONE != ret)
     nns_edge_loge ("Failed to initialize the MQTT client object.");
 
@@ -165,25 +184,22 @@ nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic)
  * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
  */
 int
-nns_edge_mqtt_close (nns_edge_h edge_h)
+nns_edge_mqtt_close (nns_edge_broker_h broker_h)
 {
-  nns_edge_handle_s *eh;
   nns_edge_broker_s *bh;
   struct mosquitto *handle;
 
-  eh = (nns_edge_handle_s *) edge_h;
-
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+  if (!broker_h) {
+    nns_edge_loge ("Invalid param, given broker handle is invalid.");
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  bh = (nns_edge_broker_s *) eh->broker_h;
+  bh = (nns_edge_broker_s *) broker_h;
   handle = bh->mqtt_h;
 
   if (handle) {
     nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).",
-        eh->id, eh->dest_host, eh->dest_port);
+        bh->id, bh->host, bh->port);
 
     /* Clear retained message */
     mosquitto_publish (handle, NULL, bh->topic, 0, NULL, 1, true);
@@ -195,9 +211,10 @@ nns_edge_mqtt_close (nns_edge_h edge_h)
 
   nns_edge_queue_destroy (bh->server_list);
   bh->server_list = NULL;
+  SAFE_FREE (bh->id);
   SAFE_FREE (bh->topic);
+  SAFE_FREE (bh->host);
   SAFE_FREE (bh);
-  eh->broker_h = NULL;
 
   return NNS_EDGE_ERROR_NONE;
 }
@@ -207,17 +224,15 @@ nns_edge_mqtt_close (nns_edge_h edge_h)
  * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
  */
 int
-nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
+nns_edge_mqtt_publish (nns_edge_broker_h broker_h, const void *data,
+    const int length)
 {
-  nns_edge_handle_s *eh;
   nns_edge_broker_s *bh;
   struct mosquitto *handle;
   int ret;
 
-  eh = (nns_edge_handle_s *) edge_h;
-
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+  if (!broker_h) {
+    nns_edge_loge ("Invalid param, given broker handle is invalid.");
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
@@ -226,7 +241,7 @@ nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  bh = (nns_edge_broker_s *) eh->broker_h;
+  bh = (nns_edge_broker_s *) broker_h;
   handle = bh->mqtt_h;
 
   if (!handle) {
@@ -243,7 +258,7 @@ nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
   ret = mosquitto_publish (handle, NULL, bh->topic, length, data, 1, true);
   if (MOSQ_ERR_SUCCESS != ret) {
     nns_edge_loge ("Failed to publish a message (ID:%s, Topic:%s).",
-        eh->id, eh->topic);
+        bh->id, bh->topic);
     return NNS_EDGE_ERROR_IO;
   }
 
@@ -255,21 +270,18 @@ nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
  * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
  */
 int
-nns_edge_mqtt_subscribe (nns_edge_h edge_h)
+nns_edge_mqtt_subscribe (nns_edge_broker_h broker_h)
 {
-  nns_edge_handle_s *eh;
   nns_edge_broker_s *bh;
   void *handle;
   int ret;
 
-  eh = (nns_edge_handle_s *) edge_h;
-
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+  if (!broker_h) {
+    nns_edge_loge ("Invalid param, given broker handle is invalid.");
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  bh = (nns_edge_broker_s *) eh->broker_h;
+  bh = (nns_edge_broker_s *) broker_h;
   handle = bh->mqtt_h;
 
   if (!handle) {
@@ -286,7 +298,7 @@ nns_edge_mqtt_subscribe (nns_edge_h edge_h)
   ret = mosquitto_subscribe (handle, NULL, bh->topic, 1);
   if (MOSQ_ERR_SUCCESS != ret) {
     nns_edge_loge ("Failed to subscribe a topic (ID:%s, Topic:%s).",
-        eh->id, eh->topic);
+        bh->id, bh->topic);
     return NNS_EDGE_ERROR_IO;
   }
 
@@ -297,15 +309,12 @@ nns_edge_mqtt_subscribe (nns_edge_h edge_h)
  * @brief Get message from mqtt broker.
  */
 int
-nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg)
+nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, char **msg)
 {
-  nns_edge_handle_s *eh;
   nns_edge_broker_s *bh;
 
-  eh = (nns_edge_handle_s *) edge_h;
-
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+  if (!broker_h) {
+    nns_edge_loge ("Invalid param, given broker handle is invalid.");
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
@@ -314,7 +323,7 @@ nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  bh = (nns_edge_broker_s *) eh->broker_h;
+  bh = (nns_edge_broker_s *) broker_h;
 
   /* Wait for 1 second */
   if (!nns_edge_queue_wait_pop (bh->server_list, 1000U, (void **) msg)) {
@@ -329,19 +338,16 @@ nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg)
  * @brief Check mqtt connection
  */
 bool
-nns_edge_mqtt_is_connected (nns_edge_h edge_h)
+nns_edge_mqtt_is_connected (nns_edge_broker_h broker_h)
 {
-  nns_edge_handle_s *eh;
   nns_edge_broker_s *bh;
 
-  eh = (nns_edge_handle_s *) edge_h;
-
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+  if (!broker_h) {
+    nns_edge_loge ("Invalid param, given broker handle is invalid.");
     return false;
   }
 
-  bh = (nns_edge_broker_s *) eh->broker_h;
+  bh = (nns_edge_broker_s *) broker_h;
 
   return bh->connected;
 }
index 49c23ff37d45efc585b80ffc851467eddd3f5222..b45dde104784034a5cd0fbd30d003189be56873c 100644 (file)
@@ -27,7 +27,10 @@ typedef struct
 {
   void *mqtt_h;
   nns_edge_queue_h server_list;
+  char *id;
   char *topic;
+  char *host;
+  int port;
 } nns_edge_broker_s;
 
 /**
@@ -38,28 +41,25 @@ static int
 mqtt_cb_message_arrived (void *context, char *topic, int topic_len,
     MQTTAsync_message * message)
 {
-  nns_edge_handle_s *eh;
   nns_edge_broker_s *bh;
   char *msg = NULL;
 
   UNUSED (topic);
   UNUSED (topic_len);
-  eh = (nns_edge_handle_s *) context;
+  bh = (nns_edge_broker_s *) context;
 
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+  if (!bh) {
+    nns_edge_loge ("Invalid param, given broker handle is invalid.");
     return TRUE;
   }
 
   if (0 >= message->payloadlen) {
-    nns_edge_logw ("Invalid payload lenth: %d", message->payloadlen);
+    nns_edge_logw ("Invalid payload length: %d", message->payloadlen);
     return TRUE;
   }
 
-  bh = (nns_edge_broker_s *) eh->broker_h;
-
   nns_edge_logd ("MQTT message is arrived (ID:%s, Topic:%s).",
-      eh->id, eh->topic);
+      bh->id, bh->topic);
 
   msg = nns_edge_memdup (message->payload, message->payloadlen);
   if (msg)
@@ -73,9 +73,9 @@ mqtt_cb_message_arrived (void *context, char *topic, int topic_len,
  * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
  */
 int
-nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic)
+nns_edge_mqtt_connect (const char *id, const char *topic, const char *host,
+    const int port, nns_edge_broker_h * broker_h)
 {
-  nns_edge_handle_s *eh;
   nns_edge_broker_s *bh;
   MQTTAsync_connectOptions options = MQTTAsync_connectOptions_initializer;
   int ret = NNS_EDGE_ERROR_NONE;
@@ -84,20 +84,32 @@ nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic)
   char *client_id;
   unsigned int wait_count;
 
+  if (!STR_IS_VALID (id)) {
+    nns_edge_loge ("Invalid param, given id is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
   if (!STR_IS_VALID (topic)) {
     nns_edge_loge ("Invalid param, given topic is invalid.");
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  eh = (nns_edge_handle_s *) edge_h;
+  if (!STR_IS_VALID (host)) {
+    nns_edge_loge ("Invalid param, given host is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!PORT_IS_VALID (port)) {
+    nns_edge_loge ("Invalid param, given port is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
 
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+  if (!broker_h) {
+    nns_edge_loge ("Invalid param, mqtt_h should not be null.");
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).",
-      eh->id, eh->dest_host, eh->dest_port);
+  nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).", id, host, port);
 
   bh = (nns_edge_broker_s *) calloc (1, sizeof (nns_edge_broker_s));
   if (!bh) {
@@ -105,8 +117,8 @@ nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic)
     return NNS_EDGE_ERROR_OUT_OF_MEMORY;
   }
 
-  url = nns_edge_get_host_string (eh->dest_host, eh->dest_port);
-  client_id = nns_edge_strdup_printf ("nns_edge_%s_%u", eh->id, getpid ());
+  url = nns_edge_get_host_string (host, port);
+  client_id = nns_edge_strdup_printf ("nns_edge_%s_%u", id, getpid ());
 
   ret = MQTTAsync_create (&handle, url, client_id,
       MQTTCLIENT_PERSISTENCE_NONE, NULL);
@@ -119,16 +131,18 @@ nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic)
     goto error;
   }
 
+  bh->id = nns_edge_strdup (id);
   bh->topic = nns_edge_strdup (topic);
+  bh->host = nns_edge_strdup (host);
+  bh->port = port;
   bh->mqtt_h = handle;
   nns_edge_queue_create (&bh->server_list);
-  eh->broker_h = bh;
 
-  MQTTAsync_setCallbacks (handle, edge_h, NULL, mqtt_cb_message_arrived, NULL);
+  MQTTAsync_setCallbacks (handle, bh, NULL, mqtt_cb_message_arrived, NULL);
 
   options.cleansession = 1;
   options.keepAliveInterval = 6;
-  options.context = edge_h;
+  options.context = bh;
 
   if (MQTTAsync_connect (handle, &options) != MQTTASYNC_SUCCESS) {
     nns_edge_loge ("Failed to connect MQTT.");
@@ -148,10 +162,11 @@ nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic)
     wait_count++;
   } while (!MQTTAsync_isConnected (handle));
 
+  *broker_h = bh;
   return NNS_EDGE_ERROR_NONE;
 
 error:
-  nns_edge_mqtt_close (eh);
+  nns_edge_mqtt_close (bh);
   return ret;
 }
 
@@ -160,29 +175,26 @@ error:
  * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
  */
 int
-nns_edge_mqtt_close (nns_edge_h edge_h)
+nns_edge_mqtt_close (nns_edge_broker_h broker_h)
 {
-  nns_edge_handle_s *eh;
   nns_edge_broker_s *bh;
   MQTTAsync handle;
   MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer;
   unsigned int wait_count;
 
-  eh = (nns_edge_handle_s *) edge_h;
-
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+  if (!broker_h) {
+    nns_edge_loge ("Invalid param, given broker handle is invalid.");
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  bh = (nns_edge_broker_s *) eh->broker_h;
+  bh = (nns_edge_broker_s *) broker_h;
   handle = bh->mqtt_h;
 
   if (handle) {
     nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).",
-        eh->id, eh->dest_host, eh->dest_port);
+        bh->id, bh->host, bh->port);
 
-    options.context = edge_h;
+    options.context = bh;
 
     /* Clear retained message */
     MQTTAsync_send (handle, bh->topic, 0, NULL, 1, 1, NULL);
@@ -209,10 +221,11 @@ nns_edge_mqtt_close (nns_edge_h edge_h)
   nns_edge_queue_destroy (bh->server_list);
   bh->server_list = NULL;
 
+  SAFE_FREE (bh->id);
   SAFE_FREE (bh->topic);
+  SAFE_FREE (bh->host);
   SAFE_FREE (bh);
 
-  eh->broker_h = NULL;
   return NNS_EDGE_ERROR_NONE;
 }
 
@@ -221,17 +234,15 @@ nns_edge_mqtt_close (nns_edge_h edge_h)
  * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
  */
 int
-nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
+nns_edge_mqtt_publish (nns_edge_broker_h broker_h, const void *data,
+    const int length)
 {
-  nns_edge_handle_s *eh;
   nns_edge_broker_s *bh;
   MQTTAsync handle;
   int ret;
 
-  eh = (nns_edge_handle_s *) edge_h;
-
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+  if (!broker_h) {
+    nns_edge_loge ("Invalid param, given broker handle is invalid.");
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
@@ -240,7 +251,7 @@ nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  bh = (nns_edge_broker_s *) eh->broker_h;
+  bh = (nns_edge_broker_s *) broker_h;
   handle = bh->mqtt_h;
 
   if (!handle) {
@@ -257,7 +268,7 @@ nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
   ret = MQTTAsync_send (handle, bh->topic, length, data, 1, 1, NULL);
   if (ret != MQTTASYNC_SUCCESS) {
     nns_edge_loge ("Failed to publish a message (ID:%s, Topic:%s).",
-        eh->id, eh->topic);
+        bh->id, bh->topic);
     return NNS_EDGE_ERROR_IO;
   }
 
@@ -269,21 +280,18 @@ nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
  * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
  */
 int
-nns_edge_mqtt_subscribe (nns_edge_h edge_h)
+nns_edge_mqtt_subscribe (nns_edge_broker_h broker_h)
 {
-  nns_edge_handle_s *eh;
   nns_edge_broker_s *bh;
   MQTTAsync handle;
   int ret;
 
-  eh = (nns_edge_handle_s *) edge_h;
-
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+  if (!broker_h) {
+    nns_edge_loge ("Invalid param, given broker handle is invalid.");
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  bh = (nns_edge_broker_s *) eh->broker_h;
+  bh = (nns_edge_broker_s *) broker_h;
   handle = bh->mqtt_h;
 
   if (!handle) {
@@ -300,7 +308,7 @@ nns_edge_mqtt_subscribe (nns_edge_h edge_h)
   ret = MQTTAsync_subscribe (handle, bh->topic, 1, NULL);
   if (ret != MQTTASYNC_SUCCESS) {
     nns_edge_loge ("Failed to subscribe a topic (ID:%s, Topic:%s).",
-        eh->id, eh->topic);
+        bh->id, bh->topic);
     return NNS_EDGE_ERROR_IO;
   }
 
@@ -311,19 +319,17 @@ nns_edge_mqtt_subscribe (nns_edge_h edge_h)
  * @brief Check mqtt connection
  */
 bool
-nns_edge_mqtt_is_connected (nns_edge_h edge_h)
+nns_edge_mqtt_is_connected (nns_edge_broker_h broker_h)
 {
-  nns_edge_handle_s *eh;
   nns_edge_broker_s *bh;
   MQTTAsync handle;
-  eh = (nns_edge_handle_s *) edge_h;
 
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+  if (!broker_h) {
+    nns_edge_loge ("Invalid param, given broker handle is invalid.");
     return false;
   }
 
-  bh = (nns_edge_broker_s *) eh->broker_h;
+  bh = (nns_edge_broker_s *) broker_h;
   handle = bh->mqtt_h;
 
   if (!handle) {
@@ -342,15 +348,12 @@ nns_edge_mqtt_is_connected (nns_edge_h edge_h)
  * @brief Get message from mqtt broker.
  */
 int
-nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg)
+nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, char **msg)
 {
-  nns_edge_handle_s *eh;
   nns_edge_broker_s *bh;
 
-  eh = (nns_edge_handle_s *) edge_h;
-
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+  if (!broker_h) {
+    nns_edge_loge ("Invalid param, given broker handle is invalid.");
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
@@ -359,7 +362,7 @@ nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  bh = (nns_edge_broker_s *) eh->broker_h;
+  bh = (nns_edge_broker_s *) broker_h;
 
   /* Wait for 1 second */
   if (!nns_edge_queue_wait_pop (bh->server_list, 1000U, (void **) msg)) {
index 1253d1e1a09637b462ec0a7be395950b1f8e8eed..76cd8aa2ed3195749d935d43d9fe5aa57cd8c4eb 100644 (file)
 extern "C" {
 #endif /* __cplusplus */
 
-typedef void *nns_edge_mqtt_h;
+typedef void *nns_edge_broker_h;
 
 #if defined(ENABLE_MQTT)
 /**
  * @brief Connect to MQTT.
  * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
  */
-int nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic);
+int nns_edge_mqtt_connect (const char *id, const char *topic, const char *host, const int port, nns_edge_broker_h *broker_h);
 
 /**
  * @brief Close the connection to MQTT.
  * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
  */
-int nns_edge_mqtt_close (nns_edge_h edge_h);
+int nns_edge_mqtt_close (nns_edge_broker_h broker_h);
 
 /**
  * @brief Publish raw data.
  * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
  */
-int nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length);
+int nns_edge_mqtt_publish (nns_edge_broker_h broker_h, const void *data, const int length);
 
 /**
  * @brief Subscribe a topic.
  * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
  */
-int nns_edge_mqtt_subscribe (nns_edge_h edge_h);
+int nns_edge_mqtt_subscribe (nns_edge_broker_h broker_h);
 
 /**
  * @brief Check mqtt connection
  */
-bool nns_edge_mqtt_is_connected (nns_edge_h edge_h);
+bool nns_edge_mqtt_is_connected (nns_edge_broker_h broker_h);
 
 /**
  * @brief Get message from mqtt broker.
  */
-int nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg);
+int nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, char **msg);
 
 #else
 /**
  * @todo consider to change code style later.
  * If MQTT is disabled, nnstreamer does not include nnstreamer_edge_mqtt.c, and changing code style will make error as it is not used function now.
  *
- * static int nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
+ * static int nns_edge_mqtt_publish (nns_edge_broker_h broker_h, const void *data, const int length)
  * {
  *   return NNS_EDGE_ERROR_NOT_SUPPORTED;
  * }
index 8ebe2574332cb916fa1b133aa5b7a327fc948f5e..5f1f5756283043322c5477e62612aae46aaebfcf 100644 (file)
@@ -3769,11 +3769,15 @@ TEST(edgeMqtt, connectLocal)
 TEST(edgeMqtt, connectInvalidParam1_n)
 {
   int ret = -1;
+  nns_edge_broker_h broker_h;
 
   if (!_check_mqtt_broker ())
     return;
 
-  ret = nns_edge_mqtt_connect (NULL, "temp-mqtt-topic");
+  ret = nns_edge_mqtt_connect (NULL, "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
+  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+
+  ret = nns_edge_mqtt_connect ("", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
   EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
 }
 
@@ -3783,22 +3787,16 @@ TEST(edgeMqtt, connectInvalidParam1_n)
 TEST(edgeMqtt, connectInvalidParam2_n)
 {
   int ret = -1;
-  nns_edge_h edge_h;
+  nns_edge_broker_h broker_h;
 
   if (!_check_mqtt_broker ())
     return;
 
-  ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
-      NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-  nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1");
-  nns_edge_set_info (edge_h, "DEST_PORT", "1883");
-
-  ret = nns_edge_mqtt_connect (edge_h, NULL);
+  ret = nns_edge_mqtt_connect ("temp-mqtt-id", NULL, "127.0.0.1", 1883, &broker_h);
   EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
 
-  ret = nns_edge_release_handle (edge_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "", "127.0.0.1", 1883, &broker_h);
+  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
 }
 
 /**
@@ -3807,179 +3805,134 @@ TEST(edgeMqtt, connectInvalidParam2_n)
 TEST(edgeMqtt, connectInvalidParam3_n)
 {
   int ret = -1;
-  nns_edge_h edge_h;
+  nns_edge_broker_h broker_h;
 
   if (!_check_mqtt_broker ())
     return;
 
-  ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
-      NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-  nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1");
-  nns_edge_set_info (edge_h, "DEST_PORT", "1883");
-
-  ret = nns_edge_mqtt_connect (edge_h, "");
+  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", NULL, 1883, &broker_h);
   EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
 
-  ret = nns_edge_release_handle (edge_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "", 1883, &broker_h);
+  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
 }
 
 /**
- * @brief Connect to the mqtt broker with invalid hostaddress.
+ * @brief Connect to the mqtt broker with invalid param.
  */
 TEST(edgeMqtt, connectInvalidParam4_n)
 {
   int ret = -1;
-  nns_edge_h edge_h;
 
   if (!_check_mqtt_broker ())
     return;
 
-  ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
-      NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-  nns_edge_set_info (edge_h, "DEST_HOST", "tcp://none");
-  nns_edge_set_info (edge_h, "DEST_PORT", "1883");
-
-  ret = nns_edge_mqtt_connect (edge_h, "temp-mqtt-topic");
+  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, NULL);
   EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-
-  ret = nns_edge_release_handle (edge_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
 }
 
 /**
- * @brief Close the mqtt handle with invalid param.
+ * @brief Connect to the mqtt broker with invalid host address.
  */
-TEST(edgeMqtt, closeInvalidParam_n)
+TEST(edgeMqtt, connectInvalidParam5_n)
 {
   int ret = -1;
+  nns_edge_broker_h broker_h;
 
   if (!_check_mqtt_broker ())
     return;
 
-  ret = nns_edge_mqtt_close (NULL);
+  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "tcp://none", 1883, &broker_h);
   EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
 }
 
 /**
- * @brief Close the mqtt handle before the connection.
+ * @brief Connect to the mqtt broker with invalid port number.
  */
-TEST(edgeMqtt, closeInvalidParam2_n)
+TEST(edgeMqtt, connectInvalidParam6_n)
 {
   int ret = -1;
-  nns_edge_h edge_h;
+  nns_edge_broker_h broker_h;
 
   if (!_check_mqtt_broker ())
     return;
 
-  ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
-      NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
-  ret = nns_edge_mqtt_close (edge_h);
+  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 0, &broker_h);
   EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-
-  ret = nns_edge_release_handle (edge_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
 }
 
 /**
- * @brief Publish with invalid param.
+ * @brief Close the mqtt handle with invalid param.
  */
-TEST(edgeMqtt, publishInvalidParam_n)
+TEST(edgeMqtt, closeInvalidParam_n)
 {
   int ret = -1;
-  const char *msg = "TEMP_MESSAGE";
 
   if (!_check_mqtt_broker ())
     return;
 
-  ret = nns_edge_mqtt_publish (NULL, msg, strlen (msg) + 1);
+  ret = nns_edge_mqtt_close (NULL);
   EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
 }
 
 /**
  * @brief Publish with invalid param.
  */
-TEST(edgeMqtt, publishInvalidParam2_n)
+TEST(edgeMqtt, publishInvalidParam_n)
 {
   int ret = -1;
-  nns_edge_h edge_h;
   const char *msg = "TEMP_MESSAGE";
 
   if (!_check_mqtt_broker ())
     return;
 
-  ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
-      NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-  nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1");
-  nns_edge_set_info (edge_h, "DEST_PORT", "1883");
-
-  ret = nns_edge_start (edge_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
-  /* data is null */
-  ret = nns_edge_mqtt_publish (edge_h, NULL, strlen (msg) + 1);
+  ret = nns_edge_mqtt_publish (NULL, msg, strlen (msg) + 1);
   EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-
-  ret = nns_edge_release_handle (edge_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
 }
 
 /**
  * @brief Publish with invalid param.
  */
-TEST(edgeMqtt, publishInvalidParam3_n)
+TEST(edgeMqtt, publishInvalidParam2_n)
 {
   int ret = -1;
-  nns_edge_h edge_h;
+  nns_edge_broker_h broker_h;
   const char *msg = "TEMP_MESSAGE";
 
   if (!_check_mqtt_broker ())
     return;
 
-  ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
-      NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-  nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1");
-  nns_edge_set_info (edge_h, "DEST_PORT", "1883");
-
-  ret = nns_edge_start (edge_h);
+  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
   EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
 
-  /* data length is 0 */
-  ret = nns_edge_mqtt_publish (edge_h, msg, 0);
+  /* data is null */
+  ret = nns_edge_mqtt_publish (broker_h, NULL, strlen (msg) + 1);
   EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
 
-  ret = nns_edge_release_handle (edge_h);
+  ret = nns_edge_mqtt_close (broker_h);
   EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
 }
 
 /**
- * @brief Publish the message without the connection.
+ * @brief Publish with invalid param.
  */
-TEST(edgeMqtt, publishInvalidParam4_n)
+TEST(edgeMqtt, publishInvalidParam3_n)
 {
   int ret = -1;
-  nns_edge_h edge_h;
+  nns_edge_broker_h broker_h;
   const char *msg = "TEMP_MESSAGE";
 
   if (!_check_mqtt_broker ())
     return;
 
-  ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
-      NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
+  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
   EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-  nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1");
-  nns_edge_set_info (edge_h, "DEST_PORT", "1883");
 
-  ret = nns_edge_mqtt_publish (edge_h, msg, strlen (msg) + 1);
+  /* data length is 0 */
+  ret = nns_edge_mqtt_publish (broker_h, msg, 0);
   EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
 
-  ret = nns_edge_release_handle (edge_h);
+  ret = nns_edge_mqtt_close (broker_h);
   EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
 }
 
@@ -3997,28 +3950,6 @@ TEST(edgeMqtt, subscribeInvalidParam_n)
   EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
 }
 
-/**
- * @brief Subscribe the topic before the connection.
- */
-TEST(edgeMqtt, subscribeInvalidParam2_n)
-{
-  int ret = -1;
-  nns_edge_h edge_h;
-
-  if (!_check_mqtt_broker ())
-    return;
-
-  ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
-      NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
-  ret = nns_edge_mqtt_subscribe (edge_h);
-  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-
-  ret = nns_edge_release_handle (edge_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-}
-
 /**
  * @brief Get message with invalid param.
  */
@@ -4040,24 +3971,18 @@ TEST(edgeMqtt, getMessageInvalidParam_n)
 TEST(edgeMqtt, getMessageInvalidParam2_n)
 {
   int ret = -1;
-  nns_edge_h edge_h;
+  nns_edge_broker_h broker_h;
 
   if (!_check_mqtt_broker ())
     return;
 
-  ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
-      NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-  nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1");
-  nns_edge_set_info (edge_h, "DEST_PORT", "1883");
-
-  ret = nns_edge_start (edge_h);
+  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
   EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
 
-  ret = nns_edge_mqtt_get_message (edge_h, NULL);
+  ret = nns_edge_mqtt_get_message (broker_h, NULL);
   EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
 
-  ret = nns_edge_release_handle (edge_h);
+  ret = nns_edge_mqtt_close (broker_h);
   EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
 }
 
@@ -4067,28 +3992,19 @@ TEST(edgeMqtt, getMessageInvalidParam2_n)
 TEST(edgeMqtt, getMessageWithinTimeout_n)
 {
   int ret = -1;
-  nns_edge_h edge_h;
+  nns_edge_broker_h broker_h;
   char *msg = NULL;
 
   if (!_check_mqtt_broker ())
     return;
 
-  ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
-      NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-  nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1");
-  nns_edge_set_info (edge_h, "DEST_PORT", "1883");
-
-  ret = nns_edge_mqtt_connect (edge_h, "temp-mqtt-topic");
+  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
   EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
 
-  ret = nns_edge_mqtt_get_message (edge_h, &msg);
+  ret = nns_edge_mqtt_get_message (broker_h, &msg);
   EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
 
-  ret = nns_edge_mqtt_close (edge_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
-  ret = nns_edge_release_handle (edge_h);
+  ret = nns_edge_mqtt_close (broker_h);
   EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
 }
 #endif /* ENABLE_MQTT */