From acd33be25b438eb0fae72f7da6fbb6f4fc6c0953 Mon Sep 17 00:00:00 2001 From: Jaeyun Date: Fri, 12 Aug 2022 20:11:59 +0900 Subject: [PATCH] [MQTT] remove connection cb Remove callbacks for connection, waiting for connection or disconnection state. Signed-off-by: Jaeyun --- src/libnnstreamer-edge/nnstreamer-edge-mqtt.c | 191 ++++-------------- tests/unittest_nnstreamer-edge.cc | 2 - 2 files changed, 34 insertions(+), 159 deletions(-) diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c b/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c index bcf0164..5fccb51 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c @@ -13,7 +13,6 @@ #if !defined(ENABLE_MQTT) #error "This file can be built with Paho MQTT library." #endif -#define DEFAULT_SUB_TIMEOUT 1000000 /** 1 second */ #include #include @@ -27,132 +26,9 @@ typedef struct { void *mqtt_h; GAsyncQueue *server_list; - GMutex mqtt_mutex; - GCond mqtt_gcond; - bool mqtt_is_connected; char *topic; } nns_edge_broker_s; -/** - * @brief Callback function to be called when the connection is lost. - */ -static void -mqtt_cb_connection_lost (void *context, char *cause) -{ - nns_edge_handle_s *eh; - nns_edge_broker_s *bh; - - eh = (nns_edge_handle_s *) context; - - if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); - return; - } - - bh = (nns_edge_broker_s *) eh->broker_h; - nns_edge_logw ("MQTT connection is lost (ID:%s, Cause:%s).", eh->id, cause); - g_mutex_lock (&bh->mqtt_mutex); - bh->mqtt_is_connected = false; - g_cond_broadcast (&bh->mqtt_gcond); - g_mutex_unlock (&bh->mqtt_mutex); -} - -/** - * @brief Callback function to be called when the connection is completed. - */ -static void -mqtt_cb_connection_success (void *context, MQTTAsync_successData * response) -{ - nns_edge_handle_s *eh; - nns_edge_broker_s *bh; - - UNUSED (response); - eh = (nns_edge_handle_s *) context; - - if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); - return; - } - - bh = (nns_edge_broker_s *) eh->broker_h; - - g_mutex_lock (&bh->mqtt_mutex); - bh->mqtt_is_connected = true; - g_cond_broadcast (&bh->mqtt_gcond); - g_mutex_unlock (&bh->mqtt_mutex); -} - -/** - * @brief Callback function to be called when the connection is failed. - */ -static void -mqtt_cb_connection_failure (void *context, MQTTAsync_failureData * response) -{ - nns_edge_handle_s *eh; - nns_edge_broker_s *bh; - - UNUSED (response); - eh = (nns_edge_handle_s *) context; - - if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); - return; - } - - bh = (nns_edge_broker_s *) eh->broker_h; - - nns_edge_logw ("MQTT connection is failed (ID:%s).", eh->id); - g_mutex_lock (&bh->mqtt_mutex); - bh->mqtt_is_connected = false; - g_cond_broadcast (&bh->mqtt_gcond); - g_mutex_unlock (&bh->mqtt_mutex); -} - -/** - * @brief Callback function to be called when the disconnection is completed. - */ -static void -mqtt_cb_disconnection_success (void *context, MQTTAsync_successData * response) -{ - nns_edge_handle_s *eh; - nns_edge_broker_s *bh; - - UNUSED (response); - eh = (nns_edge_handle_s *) context; - - if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); - return; - } - - bh = (nns_edge_broker_s *) eh->broker_h; - - nns_edge_logi ("MQTT disconnection is completed (ID:%s).", eh->id); - g_mutex_lock (&bh->mqtt_mutex); - bh->mqtt_is_connected = false; - g_cond_broadcast (&bh->mqtt_gcond); - g_mutex_unlock (&bh->mqtt_mutex); -} - -/** - * @brief Callback function to be called when the disconnection is failed. - */ -static void -mqtt_cb_disconnection_failure (void *context, MQTTAsync_failureData * response) -{ - nns_edge_handle_s *eh; - - UNUSED (response); - eh = (nns_edge_handle_s *) context; - - if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); - return; - } - - nns_edge_logw ("MQTT disconnection is failed (ID:%s).", eh->id); -} - /** * @brief Callback function to be called when a message is arrived. * @return Return TRUE to prevent delivering the message again. @@ -203,10 +79,10 @@ nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic) nns_edge_broker_s *bh; MQTTAsync_connectOptions options = MQTTAsync_connectOptions_initializer; int ret = NNS_EDGE_ERROR_NONE; - int64_t end_time; MQTTAsync handle; char *url; char *client_id; + unsigned int wait_count; if (!STR_IS_VALID (topic)) { nns_edge_loge ("Invalid param, given topic is invalid."); @@ -243,21 +119,15 @@ nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic) goto error; } - g_cond_init (&bh->mqtt_gcond); - g_mutex_init (&bh->mqtt_mutex); bh->topic = nns_edge_strdup (topic); - bh->mqtt_is_connected = false; bh->mqtt_h = handle; bh->server_list = g_async_queue_new (); eh->broker_h = bh; - MQTTAsync_setCallbacks (handle, edge_h, - mqtt_cb_connection_lost, mqtt_cb_message_arrived, NULL); + MQTTAsync_setCallbacks (handle, edge_h, NULL, mqtt_cb_message_arrived, NULL); options.cleansession = 1; options.keepAliveInterval = 6; - options.onSuccess = mqtt_cb_connection_success; - options.onFailure = mqtt_cb_connection_failure; options.context = edge_h; if (MQTTAsync_connect (handle, &options) != MQTTASYNC_SUCCESS) { @@ -267,18 +137,17 @@ nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic) } /* Waiting for the connection */ - end_time = g_get_monotonic_time () + 5 * G_TIME_SPAN_SECOND; - g_mutex_lock (&bh->mqtt_mutex); - while (!bh->mqtt_is_connected) { - if (!g_cond_wait_until (&bh->mqtt_gcond, &bh->mqtt_mutex, end_time)) { - g_mutex_unlock (&bh->mqtt_mutex); - nns_edge_loge ("Failed to connect to MQTT broker." - "Please check broker is running status or broker host address."); + wait_count = 0U; + do { + if (wait_count > 500U) { ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; goto error; } - } - g_mutex_unlock (&bh->mqtt_mutex); + + usleep (10000); + wait_count++; + } while (!MQTTAsync_isConnected (handle)); + return NNS_EDGE_ERROR_NONE; error: @@ -298,6 +167,7 @@ nns_edge_mqtt_close (nns_edge_h edge_h) MQTTAsync handle; MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer; char *msg; + unsigned int wait_count; eh = (nns_edge_handle_s *) edge_h; @@ -313,27 +183,30 @@ nns_edge_mqtt_close (nns_edge_h edge_h) nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).", eh->id, eh->dest_host, eh->dest_port); - options.onSuccess = mqtt_cb_disconnection_success; - options.onFailure = mqtt_cb_disconnection_failure; options.context = edge_h; /* Clear retained message */ MQTTAsync_send (handle, bh->topic, 0, NULL, 1, 1, NULL); - while (MQTTAsync_isConnected (handle)) { + wait_count = 0U; + do { if (MQTTAsync_disconnect (handle, &options) != MQTTASYNC_SUCCESS) { nns_edge_loge ("Failed to disconnect MQTT."); - return NNS_EDGE_ERROR_IO; + break; + } + + if (wait_count > 500U) { + nns_edge_loge ("Failed to disconnect MQTT, timed out."); + break; } - g_usleep (10000); - } + + usleep (10000); + wait_count++; + } while (MQTTAsync_isConnected (handle)); MQTTAsync_destroy (&handle); } - g_cond_clear (&bh->mqtt_gcond); - g_mutex_clear (&bh->mqtt_mutex); - while ((msg = g_async_queue_try_pop (bh->server_list))) { SAFE_FREE (msg); } @@ -372,11 +245,12 @@ nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length) } bh = (nns_edge_broker_s *) eh->broker_h; - if (!bh->mqtt_h) { + handle = bh->mqtt_h; + + if (!handle) { nns_edge_loge ("Invalid state, MQTT connection was not completed."); return NNS_EDGE_ERROR_INVALID_PARAMETER; } - handle = bh->mqtt_h; if (!MQTTAsync_isConnected (handle)) { nns_edge_loge ("Failed to publish message, MQTT is not connected."); @@ -414,11 +288,12 @@ nns_edge_mqtt_subscribe (nns_edge_h edge_h) } bh = (nns_edge_broker_s *) eh->broker_h; - if (!bh->mqtt_h) { + handle = bh->mqtt_h; + + if (!handle) { nns_edge_loge ("Invalid state, MQTT connection was not completed."); return NNS_EDGE_ERROR_INVALID_PARAMETER; } - handle = bh->mqtt_h; if (!MQTTAsync_isConnected (handle)) { nns_edge_loge ("Invalid state, MQTT connection was not completed."); @@ -453,11 +328,12 @@ nns_edge_mqtt_is_connected (nns_edge_h edge_h) } bh = (nns_edge_broker_s *) eh->broker_h; - if (!bh->mqtt_h) { + handle = bh->mqtt_h; + + if (!handle) { nns_edge_loge ("Invalid state, MQTT connection was not completed."); return false; } - handle = bh->mqtt_h; if (MQTTAsync_isConnected (handle)) { return true; @@ -489,7 +365,8 @@ nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg) bh = (nns_edge_broker_s *) eh->broker_h; - *msg = g_async_queue_timeout_pop (bh->server_list, DEFAULT_SUB_TIMEOUT); + /* Wait for 1 second */ + *msg = g_async_queue_timeout_pop (bh->server_list, 1000000U); if (!*msg) { nns_edge_loge ("Failed to get message from mqtt broker within timeout."); return NNS_EDGE_ERROR_UNKNOWN; diff --git a/tests/unittest_nnstreamer-edge.cc b/tests/unittest_nnstreamer-edge.cc index 9cf19f5..7014c4c 100644 --- a/tests/unittest_nnstreamer-edge.cc +++ b/tests/unittest_nnstreamer-edge.cc @@ -3100,7 +3100,6 @@ TEST(edgeMqtt, connectInvalidParam2_n) { int ret = -1; nns_edge_h edge_h; - char *msg = NULL; if (!_check_mqtt_broker ()) return; @@ -3125,7 +3124,6 @@ TEST(edgeMqtt, connectInvalidParam3_n) { int ret = -1; nns_edge_h edge_h; - char *msg = NULL; if (!_check_mqtt_broker ()) return; -- 2.34.1