[MQTT] topic in broker handle
authorJaeyun <jy1210.jung@samsung.com>
Thu, 11 Aug 2022 07:19:13 +0000 (16:19 +0900)
committerjaeyun-jung <39614140+jaeyun-jung@users.noreply.github.com>
Fri, 12 Aug 2022 01:31:10 +0000 (10:31 +0900)
1. Do not replace topic string in edge handle (set topic in mqtt-handle).
2. Use util function to get/parse host string.
3. Fix res leak case when mqtt connection is failed.
4. Code clean, remove unnecessary conversion after allocating new memory for mqtt-handle.

Signed-off-by: Jaeyun <jy1210.jung@samsung.com>
src/libnnstreamer-edge/nnstreamer-edge-internal.c
src/libnnstreamer-edge/nnstreamer-edge-internal.h
src/libnnstreamer-edge/nnstreamer-edge-mqtt.c

index 08322b2fc791f48c24bd59c8f749ee7d7c785b0c..2802621cba3b8bfbca14a850f980b0939101e7a9 100644 (file)
@@ -1052,34 +1052,33 @@ nns_edge_start (nns_edge_h edge_h)
 
   if (eh->flags & NNS_EDGE_FLAG_SERVER) {
     if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) {
-      gchar *device, *topic, *msg;
+      char *topic, *msg;
 
-      if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_connect (eh)) {
+      /** @todo Set unique device name.
+       * Device name should be unique. Consider using MAC address later.
+       * Now, use ID received from the user.
+       */
+      topic = nns_edge_strdup_printf ("edge/inference/device-%s/%s/",
+          eh->id, eh->topic);
+
+      ret = nns_edge_mqtt_connect (eh, topic);
+      SAFE_FREE (topic);
+
+      if (NNS_EDGE_ERROR_NONE != ret) {
         nns_edge_loge
             ("Failed to start nnstreamer-edge. Connection failure to broker.");
-        ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
         goto error;
       }
 
-      /** @todo Set unique device name.
-       * Device name should be unique. Consider using MAC address later.
-       * Now, use ID received from the user.
-      */
-      device = g_strdup_printf ("device-%s", eh->id);
-      topic = g_strdup_printf ("edge/inference/%s/%s/", device, eh->topic);
-
-      g_free (device);
-      g_free (eh->topic);
-      eh->topic = topic;
-      msg = nns_edge_strdup_printf ("%s:%d", eh->host, eh->port);
-
-      if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_publish (eh, msg,
-              strlen (msg) + 1)) {
-        nns_edge_loge ("Failed to publish the meesage: %s", msg);
-        ret = NNS_EDGE_ERROR_IO;
+      msg = nns_edge_get_host_string (eh->host, eh->port);
+
+      ret = nns_edge_mqtt_publish (eh, msg, strlen (msg) + 1);
+      SAFE_FREE (msg);
+
+      if (NNS_EDGE_ERROR_NONE != ret) {
+        nns_edge_loge ("Failed to publish the meesage to broker.");
         goto error;
       }
-      nns_edge_free (msg);
     }
   }
 
@@ -1210,8 +1209,6 @@ nns_edge_connect (nns_edge_h edge_h, const char *dest_host, int dest_port)
 {
   nns_edge_handle_s *eh;
   int ret;
-  char *server_ip = NULL;
-  int server_port;
 
   eh = (nns_edge_handle_s *) edge_h;
   if (!eh) {
@@ -1248,54 +1245,51 @@ nns_edge_connect (nns_edge_h edge_h, const char *dest_host, int dest_port)
   eh->dest_port = dest_port;
 
   if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) {
-    gchar *topic, *msg = NULL;
+    char *topic, *msg = NULL;
+    char *server_ip = NULL;
+    int server_port;
 
     if (!nns_edge_mqtt_is_connected (eh)) {
-      if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_connect (eh)) {
+      topic = nns_edge_strdup_printf ("edge/inference/+/%s/#", eh->topic);
+
+      ret = nns_edge_mqtt_connect (eh, topic);
+      SAFE_FREE (topic);
+
+      if (NNS_EDGE_ERROR_NONE != ret) {
         nns_edge_loge ("Connection failure to broker.");
         nns_edge_unlock (eh);
-        return NNS_EDGE_ERROR_CONNECTION_FAILURE;
+        return ret;
       }
-      topic = g_strdup_printf ("edge/inference/+/%s/#", eh->topic);
-      g_free (eh->topic);
-      eh->topic = topic;
 
-      if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_subscribe (eh)) {
+      ret = nns_edge_mqtt_subscribe (eh);
+      if (NNS_EDGE_ERROR_NONE != ret) {
         nns_edge_loge ("Failed to subscribe to topic: %s.", eh->topic);
         nns_edge_unlock (eh);
-        return NNS_EDGE_ERROR_CONNECTION_FAILURE;
+        return ret;
       }
     }
 
-    ret = nns_edge_mqtt_get_message (eh, &msg);
-    while (NNS_EDGE_ERROR_NONE == ret) {
-      gchar **splits;
-      splits = g_strsplit (msg, ":", -1);
-      server_ip = g_strdup (splits[0]);
-      server_port = g_ascii_strtoull (splits[1], NULL, 10);
+    while ((ret = nns_edge_mqtt_get_message (eh, &msg)) == NNS_EDGE_ERROR_NONE) {
+      nns_edge_parse_host_string (msg, &server_ip, &server_port);
+      SAFE_FREE (msg);
+
       nns_edge_logd ("[DEBUG] Parsed server info: Server [%s:%d] ", server_ip,
           server_port);
 
-      g_strfreev (splits);
-      g_free (msg);
-
       ret = _nns_edge_connect_to (eh, eh->client_id, server_ip, server_port);
+      SAFE_FREE (server_ip);
+
       if (NNS_EDGE_ERROR_NONE == ret) {
         break;
       }
-      SAFE_FREE (server_ip);
-      ret = nns_edge_mqtt_get_message (eh, &msg);
     }
-  } else { /** case for NNS_EDGE_CONNECT_TYPE_TCP == eh->protocol */
-    server_ip = nns_edge_strdup (dest_host);
-    server_port = dest_port;
-    ret = _nns_edge_connect_to (eh, eh->client_id, server_ip, server_port);
+  } else {
+    ret = _nns_edge_connect_to (eh, eh->client_id, dest_host, dest_port);
     if (ret != NNS_EDGE_ERROR_NONE) {
-      nns_edge_loge ("Failed to connect to %s:%d", server_ip, server_port);
+      nns_edge_loge ("Failed to connect to %s:%d", dest_host, dest_port);
     }
   }
 
-  SAFE_FREE (server_ip);
   nns_edge_unlock (eh);
   return ret;
 }
index c3bec0e6057e65552047784b7e079ab0fa542695..23461776423a155b29401caabbade671cc914252 100644 (file)
@@ -57,7 +57,7 @@ typedef struct {
  * @brief Connect to MQTT.
  * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
  */
-int nns_edge_mqtt_connect (nns_edge_h edge_h);
+int nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic);
 
 /**
  * @brief Close the connection to MQTT.
@@ -101,7 +101,7 @@ int nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg);
 #define nns_edge_mqtt_close(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
 #define nns_edge_mqtt_publish(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
 #define nns_edge_mqtt_subscribe(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
-#define nns_edge_mqtt_is_connected(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
+#define nns_edge_mqtt_is_connected(...) (false)
 #define nns_edge_mqtt_get_message(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
 #endif
 
index a5434358a6eaa003c320d8bdbf58fb107117f294..1403061b5fda9c97ff1a1ac27f42259f8bb8ff72 100644 (file)
@@ -29,7 +29,8 @@ typedef struct
   GAsyncQueue *server_list;
   GMutex mqtt_mutex;
   GCond mqtt_gcond;
-  gboolean mqtt_is_connected;
+  bool mqtt_is_connected;
+  char *topic;
 } nns_edge_broker_s;
 
 /**
@@ -51,13 +52,9 @@ mqtt_cb_connection_lost (void *context, char *cause)
   bh = (nns_edge_broker_s *) eh->broker_h;
   nns_edge_logw ("MQTT connection is lost (ID:%s, Cause:%s).", eh->id, cause);
   g_mutex_lock (&bh->mqtt_mutex);
-  bh->mqtt_is_connected = FALSE;
+  bh->mqtt_is_connected = false;
   g_cond_broadcast (&bh->mqtt_gcond);
   g_mutex_unlock (&bh->mqtt_mutex);
-
-  if (eh->event_cb) {
-    /** @todo send new event (MQTT disconnected) */
-  }
 }
 
 /**
@@ -80,13 +77,9 @@ mqtt_cb_connection_success (void *context, MQTTAsync_successData * response)
   bh = (nns_edge_broker_s *) eh->broker_h;
 
   g_mutex_lock (&bh->mqtt_mutex);
-  bh->mqtt_is_connected = TRUE;
+  bh->mqtt_is_connected = true;
   g_cond_broadcast (&bh->mqtt_gcond);
   g_mutex_unlock (&bh->mqtt_mutex);
-
-  if (eh->event_cb) {
-    /** @todo send new event (MQTT connected) */
-  }
 }
 
 /**
@@ -110,13 +103,9 @@ mqtt_cb_connection_failure (void *context, MQTTAsync_failureData * response)
 
   nns_edge_logw ("MQTT connection is failed (ID:%s).", eh->id);
   g_mutex_lock (&bh->mqtt_mutex);
-  bh->mqtt_is_connected = FALSE;
+  bh->mqtt_is_connected = false;
   g_cond_broadcast (&bh->mqtt_gcond);
   g_mutex_unlock (&bh->mqtt_mutex);
-
-  if (eh->event_cb) {
-    /** @todo send new event (MQTT connection failure) */
-  }
 }
 
 /**
@@ -140,13 +129,9 @@ mqtt_cb_disconnection_success (void *context, MQTTAsync_successData * response)
 
   nns_edge_logi ("MQTT disconnection is completed (ID:%s).", eh->id);
   g_mutex_lock (&bh->mqtt_mutex);
-  bh->mqtt_is_connected = FALSE;
+  bh->mqtt_is_connected = false;
   g_cond_broadcast (&bh->mqtt_gcond);
   g_mutex_unlock (&bh->mqtt_mutex);
-
-  if (eh->event_cb) {
-    /** @todo send new event (MQTT disconnected) */
-  }
 }
 
 /**
@@ -166,9 +151,6 @@ mqtt_cb_disconnection_failure (void *context, MQTTAsync_failureData * response)
   }
 
   nns_edge_logw ("MQTT disconnection is failed (ID:%s).", eh->id);
-  if (eh->event_cb) {
-    /** @todo send new event (MQTT disconnection failure) */
-  }
 }
 
 /**
@@ -203,14 +185,9 @@ mqtt_cb_message_arrived (void *context, char *topic, int topic_len,
   nns_edge_logd ("MQTT message is arrived (ID:%s, Topic:%s).",
       eh->id, eh->topic);
 
-  msg = (char *) malloc (message->payloadlen);
-  memcpy (msg, message->payload, message->payloadlen);
+  msg = nns_edge_memdup (message->payload, message->payloadlen);
   g_async_queue_push (bh->server_list, msg);
 
-  if (eh->event_cb) {
-    /** @todo send new event (message arrived) */
-  }
-
   return TRUE;
 }
 
@@ -219,7 +196,7 @@ mqtt_cb_message_arrived (void *context, char *topic, int topic_len,
  * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
  */
 int
-nns_edge_mqtt_connect (nns_edge_h edge_h)
+nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic)
 {
   nns_edge_handle_s *eh;
   nns_edge_broker_s *bh;
@@ -237,17 +214,23 @@ nns_edge_mqtt_connect (nns_edge_h edge_h)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  bh = (nns_edge_broker_s *) malloc (sizeof (nns_edge_broker_s));
+  nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).",
+      eh->id, eh->dest_host, eh->dest_port);
+
+  bh = (nns_edge_broker_s *) calloc (1, sizeof (nns_edge_broker_s));
   if (!bh) {
     nns_edge_loge ("Failed to allocate memory for broker handle.");
     return NNS_EDGE_ERROR_OUT_OF_MEMORY;
   }
 
-  url = nns_edge_strdup_printf ("%s:%d", eh->dest_host, eh->dest_port);
+  url = nns_edge_get_host_string (eh->dest_host, eh->dest_port);
   client_id = nns_edge_strdup_printf ("nns_edge_%s_%u", eh->id, getpid ());
 
   ret = MQTTAsync_create (&handle, url, client_id,
       MQTTCLIENT_PERSISTENCE_NONE, NULL);
+  SAFE_FREE (url);
+  SAFE_FREE (client_id);
+
   if (MQTTASYNC_SUCCESS != ret) {
     nns_edge_loge ("Failed to create MQTT handle.");
     ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
@@ -256,22 +239,12 @@ nns_edge_mqtt_connect (nns_edge_h edge_h)
 
   g_cond_init (&bh->mqtt_gcond);
   g_mutex_init (&bh->mqtt_mutex);
-  bh->mqtt_is_connected = FALSE;
+  bh->topic = nns_edge_strdup (topic);
+  bh->mqtt_is_connected = false;
   bh->mqtt_h = handle;
   bh->server_list = g_async_queue_new ();
   eh->broker_h = bh;
 
-  bh = (nns_edge_broker_s *) eh->broker_h;
-  if (!bh->mqtt_h) {
-    nns_edge_loge ("Invalid state, MQTT connection was not completed.");
-    ret = NNS_EDGE_ERROR_IO;
-    goto error;
-  }
-  handle = bh->mqtt_h;
-
-  nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).",
-      eh->id, eh->dest_host, eh->dest_port);
-
   MQTTAsync_setCallbacks (handle, edge_h,
       mqtt_cb_connection_lost, mqtt_cb_message_arrived, NULL);
 
@@ -327,42 +300,43 @@ nns_edge_mqtt_close (nns_edge_h edge_h)
   }
 
   bh = (nns_edge_broker_s *) eh->broker_h;
-
-  if (!bh->mqtt_h) {
-    nns_edge_loge ("Invalid state, MQTT connection was not completed.");
-    return NNS_EDGE_ERROR_INVALID_PARAMETER;
-  }
   handle = bh->mqtt_h;
 
-  nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).",
-      eh->id, eh->dest_host, eh->dest_port);
+  if (handle) {
+    nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).",
+        eh->id, eh->dest_host, eh->dest_port);
 
-  options.onSuccess = mqtt_cb_disconnection_success;
-  options.onFailure = mqtt_cb_disconnection_failure;
-  options.context = edge_h;
+    options.onSuccess = mqtt_cb_disconnection_success;
+    options.onFailure = mqtt_cb_disconnection_failure;
+    options.context = edge_h;
 
-  /** Clear retained message */
-  MQTTAsync_send (handle, eh->topic, 0, NULL, 1, 1, NULL);
+    /* Clear retained message */
+    MQTTAsync_send (handle, bh->topic, 0, NULL, 1, 1, NULL);
 
-  while (MQTTAsync_isConnected (handle)) {
-    if (MQTTAsync_disconnect (handle, &options) != MQTTASYNC_SUCCESS) {
-      nns_edge_loge ("Failed to disconnect MQTT.");
-      return NNS_EDGE_ERROR_IO;
+    while (MQTTAsync_isConnected (handle)) {
+      if (MQTTAsync_disconnect (handle, &options) != MQTTASYNC_SUCCESS) {
+        nns_edge_loge ("Failed to disconnect MQTT.");
+        return NNS_EDGE_ERROR_IO;
+      }
+      g_usleep (10000);
     }
-    g_usleep (10000);
+
+    MQTTAsync_destroy (&handle);
   }
+
   g_cond_clear (&bh->mqtt_gcond);
   g_mutex_clear (&bh->mqtt_mutex);
 
-  MQTTAsync_destroy (&handle);
-
   while ((msg = g_async_queue_try_pop (bh->server_list))) {
     SAFE_FREE (msg);
   }
   g_async_queue_unref (bh->server_list);
   bh->server_list = NULL;
+
+  SAFE_FREE (bh->topic);
   SAFE_FREE (bh);
 
+  eh->broker_h = NULL;
   return NNS_EDGE_ERROR_NONE;
 }
 
@@ -403,7 +377,7 @@ nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
   }
 
   /* Publish a message (default QoS 1 - at least once and retained true). */
-  ret = MQTTAsync_send (handle, eh->topic, length, data, 1, 1, NULL);
+  ret = MQTTAsync_send (handle, bh->topic, length, data, 1, 1, NULL);
   if (ret != MQTTASYNC_SUCCESS) {
     nns_edge_loge ("Failed to publish a message (ID:%s, Topic:%s).",
         eh->id, eh->topic);
@@ -445,7 +419,7 @@ nns_edge_mqtt_subscribe (nns_edge_h edge_h)
   }
 
   /* Subscribe a topic (default QoS 1 - at least once). */
-  ret = MQTTAsync_subscribe (handle, eh->topic, 1, NULL);
+  ret = MQTTAsync_subscribe (handle, bh->topic, 1, NULL);
   if (ret != MQTTASYNC_SUCCESS) {
     nns_edge_loge ("Failed to subscribe a topic (ID:%s, Topic:%s).",
         eh->id, eh->topic);
@@ -505,7 +479,7 @@ nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg)
 
   *msg = g_async_queue_timeout_pop (bh->server_list, DEFAULT_SUB_TIMEOUT);
   if (!*msg) {
-    nns_edge_loge ("Failed to get message from mqtt broker within timeout");
+    nns_edge_loge ("Failed to get message from mqtt broker within timeout.");
     return NNS_EDGE_ERROR_UNKNOWN;
   }