[MQTT] function to clear retained msg
authorJaeyun Jung <jy1210.jung@samsung.com>
Thu, 7 Sep 2023 07:10:27 +0000 (16:10 +0900)
committerjaeyun-jung <39614140+jaeyun-jung@users.noreply.github.com>
Thu, 7 Sep 2023 07:44:25 +0000 (16:44 +0900)
Add function to clear retained message, and set wait-time interval.

Signed-off-by: Jaeyun Jung <jy1210.jung@samsung.com>
src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c
src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c

index 90a765beb24dc5c08a12db8c0396ae77d09024dd..85f71a92a169da5a39270cbc2aa5f9e61e73ddac 100644 (file)
@@ -222,11 +222,11 @@ nns_edge_mqtt_connect (const char *id, const char *topic, const char *host,
 }
 
 /**
- * @brief publish callback.
+ * @brief Publish callback for clearing retained message.
  * @note This callback is called both if the message is sent successfully or if the broker responded with an error.
  */
 static void
-_publish_cb (struct mosquitto *mosq, void *obj, int mid)
+_clear_retained_cb (struct mosquitto *mosq, void *obj, int mid)
 {
   nns_edge_broker_s *bh = NULL;
 
@@ -241,6 +241,36 @@ _publish_cb (struct mosquitto *mosq, void *obj, int mid)
   nns_edge_unlock (bh);
 }
 
+/**
+ * @brief Clear retained message.
+ */
+static void
+_nns_edge_clear_retained (nns_edge_broker_s * bh)
+{
+  struct mosquitto *handle;
+  unsigned int wait = 0U;
+
+  if (!bh)
+    return;
+
+  handle = bh->mqtt_h;
+  if (handle) {
+    nns_edge_lock (bh);
+    bh->cleared = false;
+
+    mosquitto_publish_callback_set (handle, _clear_retained_cb);
+    mosquitto_publish (handle, NULL, bh->topic, 0, NULL, 1, true);
+
+    /* Wait up to 10 seconds. */
+    while (!bh->cleared && ++wait < 1000U)
+      nns_edge_cond_wait_until (bh, 10);
+
+    mosquitto_publish_callback_set (handle, NULL);
+    bh->cleared = true;
+    nns_edge_unlock (bh);
+  }
+}
+
 /**
  * @brief Close the connection to MQTT.
  * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
@@ -263,18 +293,7 @@ nns_edge_mqtt_close (nns_edge_broker_h broker_h)
     nns_edge_logd ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).",
         bh->id, bh->host, bh->port);
 
-    /* Clear retained message and wait up to 1 second before removing the message. */
-    mosquitto_publish_callback_set (handle, _publish_cb);
-    mosquitto_publish (handle, NULL, bh->topic, 0, NULL, 1, true);
-
-    nns_edge_lock (bh);
-    if (!bh->cleared) {
-      nns_edge_cond_wait_until (bh, 1000U);
-    }
-    bh->cleared = true;
-    nns_edge_cond_destroy (bh);
-    nns_edge_unlock (bh);
-    nns_edge_lock_destroy (bh);
+    _nns_edge_clear_retained (bh);
 
     mosquitto_disconnect (handle);
     mosquitto_destroy (handle);
@@ -283,6 +302,8 @@ nns_edge_mqtt_close (nns_edge_broker_h broker_h)
 
   nns_edge_queue_destroy (bh->message_queue);
   bh->message_queue = NULL;
+  nns_edge_lock_destroy (bh);
+  nns_edge_cond_destroy (bh);
   SAFE_FREE (bh->id);
   SAFE_FREE (bh->topic);
   SAFE_FREE (bh->host);
index df13adae918047652ae69174c6faa570ba8f1d41..424b283739e754281b66e6a54a124254609a39d3 100644 (file)
@@ -214,9 +214,9 @@ nns_edge_mqtt_close (nns_edge_broker_h broker_h)
 {
   nns_edge_broker_s *bh;
   MQTTAsync handle;
-  MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer;
-  unsigned int wait_count;
+  MQTTAsync_disconnectOptions dopts = MQTTAsync_disconnectOptions_initializer;
   MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer;
+  unsigned int wait_count;
 
   if (!broker_h) {
     nns_edge_loge ("Invalid param, given broker handle is invalid.");
@@ -230,26 +230,25 @@ nns_edge_mqtt_close (nns_edge_broker_h broker_h)
     nns_edge_logd ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).",
         bh->id, bh->host, bh->port);
 
-    options.context = bh;
-
-    /* Clear retained message and wait up to 1 second before removing the message. */
+    /* Clear retained message and wait up to 10 seconds before removing the message. */
     MQTTAsync_send (handle, bh->topic, 0, NULL, 1, 1, &ropts);
-    MQTTAsync_waitForCompletion (handle, ropts.token, 1000U);
+    MQTTAsync_waitForCompletion (handle, ropts.token, 10000U);
+
+    /* Wait for message transfer, 10 milliseconds. */
+    dopts.timeout = 10;
+    dopts.context = bh;
 
     wait_count = 0U;
     do {
-      if (MQTTAsync_disconnect (handle, &options) != MQTTASYNC_SUCCESS) {
+      if (MQTTAsync_disconnect (handle, &dopts) != MQTTASYNC_SUCCESS) {
         nns_edge_loge ("Failed to disconnect MQTT.");
         break;
       }
 
-      if (wait_count > 500U) {
+      if (++wait_count > 500U) {
         nns_edge_loge ("Failed to disconnect MQTT, timed out.");
         break;
       }
-
-      usleep (10000);
-      wait_count++;
     } while (MQTTAsync_isConnected (handle));
 
     MQTTAsync_destroy (&handle);