}
/**
- * @brief publish callback.
+ * @brief Publish callback for clearing retained message.
* @note This callback is called both if the message is sent successfully or if the broker responded with an error.
*/
static void
-_publish_cb (struct mosquitto *mosq, void *obj, int mid)
+_clear_retained_cb (struct mosquitto *mosq, void *obj, int mid)
{
nns_edge_broker_s *bh = NULL;
nns_edge_unlock (bh);
}
+/**
+ * @brief Clear retained message.
+ */
+static void
+_nns_edge_clear_retained (nns_edge_broker_s * bh)
+{
+ struct mosquitto *handle;
+ unsigned int wait = 0U;
+
+ if (!bh)
+ return;
+
+ handle = bh->mqtt_h;
+ if (handle) {
+ nns_edge_lock (bh);
+ bh->cleared = false;
+
+ mosquitto_publish_callback_set (handle, _clear_retained_cb);
+ mosquitto_publish (handle, NULL, bh->topic, 0, NULL, 1, true);
+
+ /* Wait up to 10 seconds. */
+ while (!bh->cleared && ++wait < 1000U)
+ nns_edge_cond_wait_until (bh, 10);
+
+ mosquitto_publish_callback_set (handle, NULL);
+ bh->cleared = true;
+ nns_edge_unlock (bh);
+ }
+}
+
/**
* @brief Close the connection to MQTT.
* @note This is internal function for MQTT broker. You should call this with edge-handle lock.
nns_edge_logd ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).",
bh->id, bh->host, bh->port);
- /* Clear retained message and wait up to 1 second before removing the message. */
- mosquitto_publish_callback_set (handle, _publish_cb);
- mosquitto_publish (handle, NULL, bh->topic, 0, NULL, 1, true);
-
- nns_edge_lock (bh);
- if (!bh->cleared) {
- nns_edge_cond_wait_until (bh, 1000U);
- }
- bh->cleared = true;
- nns_edge_cond_destroy (bh);
- nns_edge_unlock (bh);
- nns_edge_lock_destroy (bh);
+ _nns_edge_clear_retained (bh);
mosquitto_disconnect (handle);
mosquitto_destroy (handle);
nns_edge_queue_destroy (bh->message_queue);
bh->message_queue = NULL;
+ nns_edge_lock_destroy (bh);
+ nns_edge_cond_destroy (bh);
SAFE_FREE (bh->id);
SAFE_FREE (bh->topic);
SAFE_FREE (bh->host);
{
nns_edge_broker_s *bh;
MQTTAsync handle;
- MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer;
- unsigned int wait_count;
+ MQTTAsync_disconnectOptions dopts = MQTTAsync_disconnectOptions_initializer;
MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer;
+ unsigned int wait_count;
if (!broker_h) {
nns_edge_loge ("Invalid param, given broker handle is invalid.");
nns_edge_logd ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).",
bh->id, bh->host, bh->port);
- options.context = bh;
-
- /* Clear retained message and wait up to 1 second before removing the message. */
+ /* Clear retained message and wait up to 10 seconds before removing the message. */
MQTTAsync_send (handle, bh->topic, 0, NULL, 1, 1, &ropts);
- MQTTAsync_waitForCompletion (handle, ropts.token, 1000U);
+ MQTTAsync_waitForCompletion (handle, ropts.token, 10000U);
+
+ /* Wait for message transfer, 10 milliseconds. */
+ dopts.timeout = 10;
+ dopts.context = bh;
wait_count = 0U;
do {
- if (MQTTAsync_disconnect (handle, &options) != MQTTASYNC_SUCCESS) {
+ if (MQTTAsync_disconnect (handle, &dopts) != MQTTASYNC_SUCCESS) {
nns_edge_loge ("Failed to disconnect MQTT.");
break;
}
- if (wait_count > 500U) {
+ if (++wait_count > 500U) {
nns_edge_loge ("Failed to disconnect MQTT, timed out.");
break;
}
-
- usleep (10000);
- wait_count++;
} while (MQTTAsync_isConnected (handle));
MQTTAsync_destroy (&handle);