/* event callback for new message */
nns_edge_event_cb event_cb;
void *user_data;
+
+ pthread_mutex_t lock;
+ pthread_cond_t cond;
+ bool cleared;
} nns_edge_broker_s;
/**
bh->connected = true;
bh->event_cb = NULL;
bh->user_data = NULL;
+ bh->cleared = false;
+ nns_edge_lock_init (bh);
+ nns_edge_cond_init (bh);
*broker_h = bh;
return NNS_EDGE_ERROR_NONE;
return ret;
}
+/**
+ * @brief publish callback.
+ * @note This callback is called both if the message is sent successfully or if the broker responded with an error.
+ */
+static void
+_publish_cb (struct mosquitto *mosq, void *obj, int mid)
+{
+ nns_edge_broker_s *bh = NULL;
+
+ bh = (nns_edge_broker_s *) mosquitto_userdata (mosq);
+
+ if (!bh || bh->cleared)
+ return;
+
+ nns_edge_lock (bh);
+ bh->cleared = true;
+ nns_edge_cond_signal (bh);
+ nns_edge_unlock (bh);
+}
+
/**
* @brief Close the connection to MQTT.
* @note This is internal function for MQTT broker. You should call this with edge-handle lock.
nns_edge_logd ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).",
bh->id, bh->host, bh->port);
- /* Clear retained message */
+ /* Clear retained message and wait up to 1 second before removing the message. */
+ mosquitto_publish_callback_set (handle, _publish_cb);
mosquitto_publish (handle, NULL, bh->topic, 0, NULL, 1, true);
+ nns_edge_lock (bh);
+ if (!bh->cleared) {
+ nns_edge_cond_wait_until (bh, 1000U);
+ }
+ bh->cleared = true;
+ nns_edge_cond_destroy (bh);
+ nns_edge_unlock (bh);
+ nns_edge_lock_destroy (bh);
+
mosquitto_disconnect (handle);
mosquitto_destroy (handle);
mosquitto_lib_cleanup ();
MQTTAsync handle;
MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer;
unsigned int wait_count;
+ MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer;
if (!broker_h) {
nns_edge_loge ("Invalid param, given broker handle is invalid.");
options.context = bh;
- /* Clear retained message */
- MQTTAsync_send (handle, bh->topic, 0, NULL, 1, 1, NULL);
+ /* Clear retained message and wait up to 1 second before removing the message. */
+ MQTTAsync_send (handle, bh->topic, 0, NULL, 1, 1, &ropts);
+ MQTTAsync_waitForCompletion (handle, ropts.token, 1000U);
wait_count = 0U;
do {