[MQTT] Wait until removing the retained message.
authorgichan2-jang <gichan2.jang@samsung.com>
Wed, 16 Aug 2023 09:29:30 +0000 (18:29 +0900)
committerjaeyun-jung <39614140+jaeyun-jung@users.noreply.github.com>
Thu, 7 Sep 2023 06:57:59 +0000 (15:57 +0900)
Change to wait until removing the retained message.

Signed-off-by: gichan2-jang <gichan2.jang@samsung.com>
src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c
src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c

index c69698caac9685a7db6b0913eecedb5a38075b3a..90a765beb24dc5c08a12db8c0396ae77d09024dd 100644 (file)
@@ -38,6 +38,10 @@ typedef struct
   /* event callback for new message */
   nns_edge_event_cb event_cb;
   void *user_data;
+
+  pthread_mutex_t lock;
+  pthread_cond_t cond;
+  bool cleared;
 } nns_edge_broker_s;
 
 /**
@@ -160,6 +164,9 @@ _nns_edge_mqtt_init_client (const char *id, const char *topic, const char *host,
   bh->connected = true;
   bh->event_cb = NULL;
   bh->user_data = NULL;
+  bh->cleared = false;
+  nns_edge_lock_init (bh);
+  nns_edge_cond_init (bh);
 
   *broker_h = bh;
   return NNS_EDGE_ERROR_NONE;
@@ -214,6 +221,26 @@ nns_edge_mqtt_connect (const char *id, const char *topic, const char *host,
   return ret;
 }
 
+/**
+ * @brief publish callback.
+ * @note This callback is called both if the message is sent successfully or if the broker responded with an error.
+ */
+static void
+_publish_cb (struct mosquitto *mosq, void *obj, int mid)
+{
+  nns_edge_broker_s *bh = NULL;
+
+  bh = (nns_edge_broker_s *) mosquitto_userdata (mosq);
+
+  if (!bh || bh->cleared)
+    return;
+
+  nns_edge_lock (bh);
+  bh->cleared = true;
+  nns_edge_cond_signal (bh);
+  nns_edge_unlock (bh);
+}
+
 /**
  * @brief Close the connection to MQTT.
  * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
@@ -236,9 +263,19 @@ nns_edge_mqtt_close (nns_edge_broker_h broker_h)
     nns_edge_logd ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).",
         bh->id, bh->host, bh->port);
 
-    /* Clear retained message */
+    /* Clear retained message and wait up to 1 second before removing the message. */
+    mosquitto_publish_callback_set (handle, _publish_cb);
     mosquitto_publish (handle, NULL, bh->topic, 0, NULL, 1, true);
 
+    nns_edge_lock (bh);
+    if (!bh->cleared) {
+      nns_edge_cond_wait_until (bh, 1000U);
+    }
+    bh->cleared = true;
+    nns_edge_cond_destroy (bh);
+    nns_edge_unlock (bh);
+    nns_edge_lock_destroy (bh);
+
     mosquitto_disconnect (handle);
     mosquitto_destroy (handle);
     mosquitto_lib_cleanup ();
index 01742b4e07d29829ed223cabb8c8eb792c389fbc..df13adae918047652ae69174c6faa570ba8f1d41 100644 (file)
@@ -216,6 +216,7 @@ nns_edge_mqtt_close (nns_edge_broker_h broker_h)
   MQTTAsync handle;
   MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer;
   unsigned int wait_count;
+  MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer;
 
   if (!broker_h) {
     nns_edge_loge ("Invalid param, given broker handle is invalid.");
@@ -231,8 +232,9 @@ nns_edge_mqtt_close (nns_edge_broker_h broker_h)
 
     options.context = bh;
 
-    /* Clear retained message */
-    MQTTAsync_send (handle, bh->topic, 0, NULL, 1, 1, NULL);
+    /* Clear retained message and wait up to 1 second before removing the message. */
+    MQTTAsync_send (handle, bh->topic, 0, NULL, 1, 1, &ropts);
+    MQTTAsync_waitForCompletion (handle, ropts.token, 1000U);
 
     wait_count = 0U;
     do {