From 324d58fb4834da540abafa57f81e3d7ba9eda308 Mon Sep 17 00:00:00 2001 From: Jaeyun Jung Date: Thu, 7 Sep 2023 16:10:27 +0900 Subject: [PATCH] [MQTT] function to clear retained msg Add function to clear retained message, and set wait-time interval. Signed-off-by: Jaeyun Jung --- .../nnstreamer-edge-mqtt-mosquitto.c | 49 +++++++++++++------ .../nnstreamer-edge-mqtt-paho.c | 21 ++++---- 2 files changed, 45 insertions(+), 25 deletions(-) diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c index 90a765b..85f71a9 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c @@ -222,11 +222,11 @@ nns_edge_mqtt_connect (const char *id, const char *topic, const char *host, } /** - * @brief publish callback. + * @brief Publish callback for clearing retained message. * @note This callback is called both if the message is sent successfully or if the broker responded with an error. */ static void -_publish_cb (struct mosquitto *mosq, void *obj, int mid) +_clear_retained_cb (struct mosquitto *mosq, void *obj, int mid) { nns_edge_broker_s *bh = NULL; @@ -241,6 +241,36 @@ _publish_cb (struct mosquitto *mosq, void *obj, int mid) nns_edge_unlock (bh); } +/** + * @brief Clear retained message. + */ +static void +_nns_edge_clear_retained (nns_edge_broker_s * bh) +{ + struct mosquitto *handle; + unsigned int wait = 0U; + + if (!bh) + return; + + handle = bh->mqtt_h; + if (handle) { + nns_edge_lock (bh); + bh->cleared = false; + + mosquitto_publish_callback_set (handle, _clear_retained_cb); + mosquitto_publish (handle, NULL, bh->topic, 0, NULL, 1, true); + + /* Wait up to 10 seconds. */ + while (!bh->cleared && ++wait < 1000U) + nns_edge_cond_wait_until (bh, 10); + + mosquitto_publish_callback_set (handle, NULL); + bh->cleared = true; + nns_edge_unlock (bh); + } +} + /** * @brief Close the connection to MQTT. * @note This is internal function for MQTT broker. You should call this with edge-handle lock. @@ -263,18 +293,7 @@ nns_edge_mqtt_close (nns_edge_broker_h broker_h) nns_edge_logd ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).", bh->id, bh->host, bh->port); - /* Clear retained message and wait up to 1 second before removing the message. */ - mosquitto_publish_callback_set (handle, _publish_cb); - mosquitto_publish (handle, NULL, bh->topic, 0, NULL, 1, true); - - nns_edge_lock (bh); - if (!bh->cleared) { - nns_edge_cond_wait_until (bh, 1000U); - } - bh->cleared = true; - nns_edge_cond_destroy (bh); - nns_edge_unlock (bh); - nns_edge_lock_destroy (bh); + _nns_edge_clear_retained (bh); mosquitto_disconnect (handle); mosquitto_destroy (handle); @@ -283,6 +302,8 @@ nns_edge_mqtt_close (nns_edge_broker_h broker_h) nns_edge_queue_destroy (bh->message_queue); bh->message_queue = NULL; + nns_edge_lock_destroy (bh); + nns_edge_cond_destroy (bh); SAFE_FREE (bh->id); SAFE_FREE (bh->topic); SAFE_FREE (bh->host); diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c index df13ada..424b283 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c @@ -214,9 +214,9 @@ nns_edge_mqtt_close (nns_edge_broker_h broker_h) { nns_edge_broker_s *bh; MQTTAsync handle; - MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer; - unsigned int wait_count; + MQTTAsync_disconnectOptions dopts = MQTTAsync_disconnectOptions_initializer; MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer; + unsigned int wait_count; if (!broker_h) { nns_edge_loge ("Invalid param, given broker handle is invalid."); @@ -230,26 +230,25 @@ nns_edge_mqtt_close (nns_edge_broker_h broker_h) nns_edge_logd ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).", bh->id, bh->host, bh->port); - options.context = bh; - - /* Clear retained message and wait up to 1 second before removing the message. */ + /* Clear retained message and wait up to 10 seconds before removing the message. */ MQTTAsync_send (handle, bh->topic, 0, NULL, 1, 1, &ropts); - MQTTAsync_waitForCompletion (handle, ropts.token, 1000U); + MQTTAsync_waitForCompletion (handle, ropts.token, 10000U); + + /* Wait for message transfer, 10 milliseconds. */ + dopts.timeout = 10; + dopts.context = bh; wait_count = 0U; do { - if (MQTTAsync_disconnect (handle, &options) != MQTTASYNC_SUCCESS) { + if (MQTTAsync_disconnect (handle, &dopts) != MQTTASYNC_SUCCESS) { nns_edge_loge ("Failed to disconnect MQTT."); break; } - if (wait_count > 500U) { + if (++wait_count > 500U) { nns_edge_loge ("Failed to disconnect MQTT, timed out."); break; } - - usleep (10000); - wait_count++; } while (MQTTAsync_isConnected (handle)); MQTTAsync_destroy (&handle); -- 2.34.1