#if !defined(ENABLE_MQTT)
#error "This file can be built with Paho MQTT library."
#endif
-#define DEFAULT_SUB_TIMEOUT 1000000 /** 1 second */
#include <unistd.h>
#include <MQTTAsync.h>
{
void *mqtt_h;
GAsyncQueue *server_list;
- GMutex mqtt_mutex;
- GCond mqtt_gcond;
- bool mqtt_is_connected;
char *topic;
} nns_edge_broker_s;
-/**
- * @brief Callback function to be called when the connection is lost.
- */
-static void
-mqtt_cb_connection_lost (void *context, char *cause)
-{
- nns_edge_handle_s *eh;
- nns_edge_broker_s *bh;
-
- eh = (nns_edge_handle_s *) context;
-
- if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
- return;
- }
-
- bh = (nns_edge_broker_s *) eh->broker_h;
- nns_edge_logw ("MQTT connection is lost (ID:%s, Cause:%s).", eh->id, cause);
- g_mutex_lock (&bh->mqtt_mutex);
- bh->mqtt_is_connected = false;
- g_cond_broadcast (&bh->mqtt_gcond);
- g_mutex_unlock (&bh->mqtt_mutex);
-}
-
-/**
- * @brief Callback function to be called when the connection is completed.
- */
-static void
-mqtt_cb_connection_success (void *context, MQTTAsync_successData * response)
-{
- nns_edge_handle_s *eh;
- nns_edge_broker_s *bh;
-
- UNUSED (response);
- eh = (nns_edge_handle_s *) context;
-
- if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
- return;
- }
-
- bh = (nns_edge_broker_s *) eh->broker_h;
-
- g_mutex_lock (&bh->mqtt_mutex);
- bh->mqtt_is_connected = true;
- g_cond_broadcast (&bh->mqtt_gcond);
- g_mutex_unlock (&bh->mqtt_mutex);
-}
-
-/**
- * @brief Callback function to be called when the connection is failed.
- */
-static void
-mqtt_cb_connection_failure (void *context, MQTTAsync_failureData * response)
-{
- nns_edge_handle_s *eh;
- nns_edge_broker_s *bh;
-
- UNUSED (response);
- eh = (nns_edge_handle_s *) context;
-
- if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
- return;
- }
-
- bh = (nns_edge_broker_s *) eh->broker_h;
-
- nns_edge_logw ("MQTT connection is failed (ID:%s).", eh->id);
- g_mutex_lock (&bh->mqtt_mutex);
- bh->mqtt_is_connected = false;
- g_cond_broadcast (&bh->mqtt_gcond);
- g_mutex_unlock (&bh->mqtt_mutex);
-}
-
-/**
- * @brief Callback function to be called when the disconnection is completed.
- */
-static void
-mqtt_cb_disconnection_success (void *context, MQTTAsync_successData * response)
-{
- nns_edge_handle_s *eh;
- nns_edge_broker_s *bh;
-
- UNUSED (response);
- eh = (nns_edge_handle_s *) context;
-
- if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
- return;
- }
-
- bh = (nns_edge_broker_s *) eh->broker_h;
-
- nns_edge_logi ("MQTT disconnection is completed (ID:%s).", eh->id);
- g_mutex_lock (&bh->mqtt_mutex);
- bh->mqtt_is_connected = false;
- g_cond_broadcast (&bh->mqtt_gcond);
- g_mutex_unlock (&bh->mqtt_mutex);
-}
-
-/**
- * @brief Callback function to be called when the disconnection is failed.
- */
-static void
-mqtt_cb_disconnection_failure (void *context, MQTTAsync_failureData * response)
-{
- nns_edge_handle_s *eh;
-
- UNUSED (response);
- eh = (nns_edge_handle_s *) context;
-
- if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
- return;
- }
-
- nns_edge_logw ("MQTT disconnection is failed (ID:%s).", eh->id);
-}
-
/**
* @brief Callback function to be called when a message is arrived.
* @return Return TRUE to prevent delivering the message again.
nns_edge_broker_s *bh;
MQTTAsync_connectOptions options = MQTTAsync_connectOptions_initializer;
int ret = NNS_EDGE_ERROR_NONE;
- int64_t end_time;
MQTTAsync handle;
char *url;
char *client_id;
+ unsigned int wait_count;
if (!STR_IS_VALID (topic)) {
nns_edge_loge ("Invalid param, given topic is invalid.");
goto error;
}
- g_cond_init (&bh->mqtt_gcond);
- g_mutex_init (&bh->mqtt_mutex);
bh->topic = nns_edge_strdup (topic);
- bh->mqtt_is_connected = false;
bh->mqtt_h = handle;
bh->server_list = g_async_queue_new ();
eh->broker_h = bh;
- MQTTAsync_setCallbacks (handle, edge_h,
- mqtt_cb_connection_lost, mqtt_cb_message_arrived, NULL);
+ MQTTAsync_setCallbacks (handle, edge_h, NULL, mqtt_cb_message_arrived, NULL);
options.cleansession = 1;
options.keepAliveInterval = 6;
- options.onSuccess = mqtt_cb_connection_success;
- options.onFailure = mqtt_cb_connection_failure;
options.context = edge_h;
if (MQTTAsync_connect (handle, &options) != MQTTASYNC_SUCCESS) {
}
/* Waiting for the connection */
- end_time = g_get_monotonic_time () + 5 * G_TIME_SPAN_SECOND;
- g_mutex_lock (&bh->mqtt_mutex);
- while (!bh->mqtt_is_connected) {
- if (!g_cond_wait_until (&bh->mqtt_gcond, &bh->mqtt_mutex, end_time)) {
- g_mutex_unlock (&bh->mqtt_mutex);
- nns_edge_loge ("Failed to connect to MQTT broker."
- "Please check broker is running status or broker host address.");
+ wait_count = 0U;
+ do {
+ if (wait_count > 500U) {
ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
goto error;
}
- }
- g_mutex_unlock (&bh->mqtt_mutex);
+
+ usleep (10000);
+ wait_count++;
+ } while (!MQTTAsync_isConnected (handle));
+
return NNS_EDGE_ERROR_NONE;
error:
MQTTAsync handle;
MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer;
char *msg;
+ unsigned int wait_count;
eh = (nns_edge_handle_s *) edge_h;
nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).",
eh->id, eh->dest_host, eh->dest_port);
- options.onSuccess = mqtt_cb_disconnection_success;
- options.onFailure = mqtt_cb_disconnection_failure;
options.context = edge_h;
/* Clear retained message */
MQTTAsync_send (handle, bh->topic, 0, NULL, 1, 1, NULL);
- while (MQTTAsync_isConnected (handle)) {
+ wait_count = 0U;
+ do {
if (MQTTAsync_disconnect (handle, &options) != MQTTASYNC_SUCCESS) {
nns_edge_loge ("Failed to disconnect MQTT.");
- return NNS_EDGE_ERROR_IO;
+ break;
+ }
+
+ if (wait_count > 500U) {
+ nns_edge_loge ("Failed to disconnect MQTT, timed out.");
+ break;
}
- g_usleep (10000);
- }
+
+ usleep (10000);
+ wait_count++;
+ } while (MQTTAsync_isConnected (handle));
MQTTAsync_destroy (&handle);
}
- g_cond_clear (&bh->mqtt_gcond);
- g_mutex_clear (&bh->mqtt_mutex);
-
while ((msg = g_async_queue_try_pop (bh->server_list))) {
SAFE_FREE (msg);
}
}
bh = (nns_edge_broker_s *) eh->broker_h;
- if (!bh->mqtt_h) {
+ handle = bh->mqtt_h;
+
+ if (!handle) {
nns_edge_loge ("Invalid state, MQTT connection was not completed.");
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- handle = bh->mqtt_h;
if (!MQTTAsync_isConnected (handle)) {
nns_edge_loge ("Failed to publish message, MQTT is not connected.");
}
bh = (nns_edge_broker_s *) eh->broker_h;
- if (!bh->mqtt_h) {
+ handle = bh->mqtt_h;
+
+ if (!handle) {
nns_edge_loge ("Invalid state, MQTT connection was not completed.");
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- handle = bh->mqtt_h;
if (!MQTTAsync_isConnected (handle)) {
nns_edge_loge ("Invalid state, MQTT connection was not completed.");
}
bh = (nns_edge_broker_s *) eh->broker_h;
- if (!bh->mqtt_h) {
+ handle = bh->mqtt_h;
+
+ if (!handle) {
nns_edge_loge ("Invalid state, MQTT connection was not completed.");
return false;
}
- handle = bh->mqtt_h;
if (MQTTAsync_isConnected (handle)) {
return true;
bh = (nns_edge_broker_s *) eh->broker_h;
- *msg = g_async_queue_timeout_pop (bh->server_list, DEFAULT_SUB_TIMEOUT);
+ /* Wait for 1 second */
+ *msg = g_async_queue_timeout_pop (bh->server_list, 1000000U);
if (!*msg) {
nns_edge_loge ("Failed to get message from mqtt broker within timeout.");
return NNS_EDGE_ERROR_UNKNOWN;