[MQTT] remove connection cb
authorJaeyun <jy1210.jung@samsung.com>
Fri, 12 Aug 2022 11:11:59 +0000 (20:11 +0900)
committerjaeyun-jung <39614140+jaeyun-jung@users.noreply.github.com>
Tue, 16 Aug 2022 04:39:09 +0000 (13:39 +0900)
Remove callbacks for connection, waiting for connection or disconnection state.

Signed-off-by: Jaeyun <jy1210.jung@samsung.com>
src/libnnstreamer-edge/nnstreamer-edge-mqtt.c
tests/unittest_nnstreamer-edge.cc

index bcf0164bcd6cc936076cab22e99b6322f77848f8..5fccb512d465ea474125fa79252683cfbd5c2046 100644 (file)
@@ -13,7 +13,6 @@
 #if !defined(ENABLE_MQTT)
 #error "This file can be built with Paho MQTT library."
 #endif
-#define DEFAULT_SUB_TIMEOUT 1000000 /** 1 second */
 
 #include <unistd.h>
 #include <MQTTAsync.h>
@@ -27,132 +26,9 @@ typedef struct
 {
   void *mqtt_h;
   GAsyncQueue *server_list;
-  GMutex mqtt_mutex;
-  GCond mqtt_gcond;
-  bool mqtt_is_connected;
   char *topic;
 } nns_edge_broker_s;
 
-/**
- * @brief Callback function to be called when the connection is lost.
- */
-static void
-mqtt_cb_connection_lost (void *context, char *cause)
-{
-  nns_edge_handle_s *eh;
-  nns_edge_broker_s *bh;
-
-  eh = (nns_edge_handle_s *) context;
-
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
-    return;
-  }
-
-  bh = (nns_edge_broker_s *) eh->broker_h;
-  nns_edge_logw ("MQTT connection is lost (ID:%s, Cause:%s).", eh->id, cause);
-  g_mutex_lock (&bh->mqtt_mutex);
-  bh->mqtt_is_connected = false;
-  g_cond_broadcast (&bh->mqtt_gcond);
-  g_mutex_unlock (&bh->mqtt_mutex);
-}
-
-/**
- * @brief Callback function to be called when the connection is completed.
- */
-static void
-mqtt_cb_connection_success (void *context, MQTTAsync_successData * response)
-{
-  nns_edge_handle_s *eh;
-  nns_edge_broker_s *bh;
-
-  UNUSED (response);
-  eh = (nns_edge_handle_s *) context;
-
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
-    return;
-  }
-
-  bh = (nns_edge_broker_s *) eh->broker_h;
-
-  g_mutex_lock (&bh->mqtt_mutex);
-  bh->mqtt_is_connected = true;
-  g_cond_broadcast (&bh->mqtt_gcond);
-  g_mutex_unlock (&bh->mqtt_mutex);
-}
-
-/**
- * @brief Callback function to be called when the connection is failed.
- */
-static void
-mqtt_cb_connection_failure (void *context, MQTTAsync_failureData * response)
-{
-  nns_edge_handle_s *eh;
-  nns_edge_broker_s *bh;
-
-  UNUSED (response);
-  eh = (nns_edge_handle_s *) context;
-
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
-    return;
-  }
-
-  bh = (nns_edge_broker_s *) eh->broker_h;
-
-  nns_edge_logw ("MQTT connection is failed (ID:%s).", eh->id);
-  g_mutex_lock (&bh->mqtt_mutex);
-  bh->mqtt_is_connected = false;
-  g_cond_broadcast (&bh->mqtt_gcond);
-  g_mutex_unlock (&bh->mqtt_mutex);
-}
-
-/**
- * @brief Callback function to be called when the disconnection is completed.
- */
-static void
-mqtt_cb_disconnection_success (void *context, MQTTAsync_successData * response)
-{
-  nns_edge_handle_s *eh;
-  nns_edge_broker_s *bh;
-
-  UNUSED (response);
-  eh = (nns_edge_handle_s *) context;
-
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
-    return;
-  }
-
-  bh = (nns_edge_broker_s *) eh->broker_h;
-
-  nns_edge_logi ("MQTT disconnection is completed (ID:%s).", eh->id);
-  g_mutex_lock (&bh->mqtt_mutex);
-  bh->mqtt_is_connected = false;
-  g_cond_broadcast (&bh->mqtt_gcond);
-  g_mutex_unlock (&bh->mqtt_mutex);
-}
-
-/**
- * @brief Callback function to be called when the disconnection is failed.
- */
-static void
-mqtt_cb_disconnection_failure (void *context, MQTTAsync_failureData * response)
-{
-  nns_edge_handle_s *eh;
-
-  UNUSED (response);
-  eh = (nns_edge_handle_s *) context;
-
-  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
-    nns_edge_loge ("Invalid param, given edge handle is invalid.");
-    return;
-  }
-
-  nns_edge_logw ("MQTT disconnection is failed (ID:%s).", eh->id);
-}
-
 /**
  * @brief Callback function to be called when a message is arrived.
  * @return Return TRUE to prevent delivering the message again.
@@ -203,10 +79,10 @@ nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic)
   nns_edge_broker_s *bh;
   MQTTAsync_connectOptions options = MQTTAsync_connectOptions_initializer;
   int ret = NNS_EDGE_ERROR_NONE;
-  int64_t end_time;
   MQTTAsync handle;
   char *url;
   char *client_id;
+  unsigned int wait_count;
 
   if (!STR_IS_VALID (topic)) {
     nns_edge_loge ("Invalid param, given topic is invalid.");
@@ -243,21 +119,15 @@ nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic)
     goto error;
   }
 
-  g_cond_init (&bh->mqtt_gcond);
-  g_mutex_init (&bh->mqtt_mutex);
   bh->topic = nns_edge_strdup (topic);
-  bh->mqtt_is_connected = false;
   bh->mqtt_h = handle;
   bh->server_list = g_async_queue_new ();
   eh->broker_h = bh;
 
-  MQTTAsync_setCallbacks (handle, edge_h,
-      mqtt_cb_connection_lost, mqtt_cb_message_arrived, NULL);
+  MQTTAsync_setCallbacks (handle, edge_h, NULL, mqtt_cb_message_arrived, NULL);
 
   options.cleansession = 1;
   options.keepAliveInterval = 6;
-  options.onSuccess = mqtt_cb_connection_success;
-  options.onFailure = mqtt_cb_connection_failure;
   options.context = edge_h;
 
   if (MQTTAsync_connect (handle, &options) != MQTTASYNC_SUCCESS) {
@@ -267,18 +137,17 @@ nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic)
   }
 
   /* Waiting for the connection */
-  end_time = g_get_monotonic_time () + 5 * G_TIME_SPAN_SECOND;
-  g_mutex_lock (&bh->mqtt_mutex);
-  while (!bh->mqtt_is_connected) {
-    if (!g_cond_wait_until (&bh->mqtt_gcond, &bh->mqtt_mutex, end_time)) {
-      g_mutex_unlock (&bh->mqtt_mutex);
-      nns_edge_loge ("Failed to connect to MQTT broker."
-          "Please check broker is running status or broker host address.");
+  wait_count = 0U;
+  do {
+    if (wait_count > 500U) {
       ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
       goto error;
     }
-  }
-  g_mutex_unlock (&bh->mqtt_mutex);
+
+    usleep (10000);
+    wait_count++;
+  } while (!MQTTAsync_isConnected (handle));
+
   return NNS_EDGE_ERROR_NONE;
 
 error:
@@ -298,6 +167,7 @@ nns_edge_mqtt_close (nns_edge_h edge_h)
   MQTTAsync handle;
   MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer;
   char *msg;
+  unsigned int wait_count;
 
   eh = (nns_edge_handle_s *) edge_h;
 
@@ -313,27 +183,30 @@ nns_edge_mqtt_close (nns_edge_h edge_h)
     nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).",
         eh->id, eh->dest_host, eh->dest_port);
 
-    options.onSuccess = mqtt_cb_disconnection_success;
-    options.onFailure = mqtt_cb_disconnection_failure;
     options.context = edge_h;
 
     /* Clear retained message */
     MQTTAsync_send (handle, bh->topic, 0, NULL, 1, 1, NULL);
 
-    while (MQTTAsync_isConnected (handle)) {
+    wait_count = 0U;
+    do {
       if (MQTTAsync_disconnect (handle, &options) != MQTTASYNC_SUCCESS) {
         nns_edge_loge ("Failed to disconnect MQTT.");
-        return NNS_EDGE_ERROR_IO;
+        break;
+      }
+
+      if (wait_count > 500U) {
+        nns_edge_loge ("Failed to disconnect MQTT, timed out.");
+        break;
       }
-      g_usleep (10000);
-    }
+
+      usleep (10000);
+      wait_count++;
+    } while (MQTTAsync_isConnected (handle));
 
     MQTTAsync_destroy (&handle);
   }
 
-  g_cond_clear (&bh->mqtt_gcond);
-  g_mutex_clear (&bh->mqtt_mutex);
-
   while ((msg = g_async_queue_try_pop (bh->server_list))) {
     SAFE_FREE (msg);
   }
@@ -372,11 +245,12 @@ nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
   }
 
   bh = (nns_edge_broker_s *) eh->broker_h;
-  if (!bh->mqtt_h) {
+  handle = bh->mqtt_h;
+
+  if (!handle) {
     nns_edge_loge ("Invalid state, MQTT connection was not completed.");
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
-  handle = bh->mqtt_h;
 
   if (!MQTTAsync_isConnected (handle)) {
     nns_edge_loge ("Failed to publish message, MQTT is not connected.");
@@ -414,11 +288,12 @@ nns_edge_mqtt_subscribe (nns_edge_h edge_h)
   }
 
   bh = (nns_edge_broker_s *) eh->broker_h;
-  if (!bh->mqtt_h) {
+  handle = bh->mqtt_h;
+
+  if (!handle) {
     nns_edge_loge ("Invalid state, MQTT connection was not completed.");
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
-  handle = bh->mqtt_h;
 
   if (!MQTTAsync_isConnected (handle)) {
     nns_edge_loge ("Invalid state, MQTT connection was not completed.");
@@ -453,11 +328,12 @@ nns_edge_mqtt_is_connected (nns_edge_h edge_h)
   }
 
   bh = (nns_edge_broker_s *) eh->broker_h;
-  if (!bh->mqtt_h) {
+  handle = bh->mqtt_h;
+
+  if (!handle) {
     nns_edge_loge ("Invalid state, MQTT connection was not completed.");
     return false;
   }
-  handle = bh->mqtt_h;
 
   if (MQTTAsync_isConnected (handle)) {
     return true;
@@ -489,7 +365,8 @@ nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg)
 
   bh = (nns_edge_broker_s *) eh->broker_h;
 
-  *msg = g_async_queue_timeout_pop (bh->server_list, DEFAULT_SUB_TIMEOUT);
+  /* Wait for 1 second */
+  *msg = g_async_queue_timeout_pop (bh->server_list, 1000000U);
   if (!*msg) {
     nns_edge_loge ("Failed to get message from mqtt broker within timeout.");
     return NNS_EDGE_ERROR_UNKNOWN;
index 9cf19f57ad399b70fa68385c53780a44bcd446f6..7014c4ced88e8f751b36c132b88503f5613eb846 100644 (file)
@@ -3100,7 +3100,6 @@ TEST(edgeMqtt, connectInvalidParam2_n)
 {\r
   int ret = -1;\r
   nns_edge_h edge_h;\r
-  char *msg = NULL;\r
 \r
   if (!_check_mqtt_broker ())\r
     return;\r
@@ -3125,7 +3124,6 @@ TEST(edgeMqtt, connectInvalidParam3_n)
 {\r
   int ret = -1;\r
   nns_edge_h edge_h;\r
-  char *msg = NULL;\r
 \r
   if (!_check_mqtt_broker ())\r
     return;\r