From 7e39399feed9270d1927b4e079a827862e42978b Mon Sep 17 00:00:00 2001 From: gichan2-jang Date: Wed, 16 Aug 2023 18:29:30 +0900 Subject: [PATCH] [MQTT] Wait until removing the retained message. Change to wait until removing the retained message. Signed-off-by: gichan2-jang --- .../nnstreamer-edge-mqtt-mosquitto.c | 39 ++++++++++++++++++- .../nnstreamer-edge-mqtt-paho.c | 6 ++- 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c index c69698c..90a765b 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c @@ -38,6 +38,10 @@ typedef struct /* event callback for new message */ nns_edge_event_cb event_cb; void *user_data; + + pthread_mutex_t lock; + pthread_cond_t cond; + bool cleared; } nns_edge_broker_s; /** @@ -160,6 +164,9 @@ _nns_edge_mqtt_init_client (const char *id, const char *topic, const char *host, bh->connected = true; bh->event_cb = NULL; bh->user_data = NULL; + bh->cleared = false; + nns_edge_lock_init (bh); + nns_edge_cond_init (bh); *broker_h = bh; return NNS_EDGE_ERROR_NONE; @@ -214,6 +221,26 @@ nns_edge_mqtt_connect (const char *id, const char *topic, const char *host, return ret; } +/** + * @brief publish callback. + * @note This callback is called both if the message is sent successfully or if the broker responded with an error. + */ +static void +_publish_cb (struct mosquitto *mosq, void *obj, int mid) +{ + nns_edge_broker_s *bh = NULL; + + bh = (nns_edge_broker_s *) mosquitto_userdata (mosq); + + if (!bh || bh->cleared) + return; + + nns_edge_lock (bh); + bh->cleared = true; + nns_edge_cond_signal (bh); + nns_edge_unlock (bh); +} + /** * @brief Close the connection to MQTT. * @note This is internal function for MQTT broker. You should call this with edge-handle lock. @@ -236,9 +263,19 @@ nns_edge_mqtt_close (nns_edge_broker_h broker_h) nns_edge_logd ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).", bh->id, bh->host, bh->port); - /* Clear retained message */ + /* Clear retained message and wait up to 1 second before removing the message. */ + mosquitto_publish_callback_set (handle, _publish_cb); mosquitto_publish (handle, NULL, bh->topic, 0, NULL, 1, true); + nns_edge_lock (bh); + if (!bh->cleared) { + nns_edge_cond_wait_until (bh, 1000U); + } + bh->cleared = true; + nns_edge_cond_destroy (bh); + nns_edge_unlock (bh); + nns_edge_lock_destroy (bh); + mosquitto_disconnect (handle); mosquitto_destroy (handle); mosquitto_lib_cleanup (); diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c index 01742b4..df13ada 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c @@ -216,6 +216,7 @@ nns_edge_mqtt_close (nns_edge_broker_h broker_h) MQTTAsync handle; MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer; unsigned int wait_count; + MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer; if (!broker_h) { nns_edge_loge ("Invalid param, given broker handle is invalid."); @@ -231,8 +232,9 @@ nns_edge_mqtt_close (nns_edge_broker_h broker_h) options.context = bh; - /* Clear retained message */ - MQTTAsync_send (handle, bh->topic, 0, NULL, 1, 1, NULL); + /* Clear retained message and wait up to 1 second before removing the message. */ + MQTTAsync_send (handle, bh->topic, 0, NULL, 1, 1, &ropts); + MQTTAsync_waitForCompletion (handle, ropts.token, 1000U); wait_count = 0U; do { -- 2.34.1