[MQTT] replace glib queue
authorJaeyun <jy1210.jung@samsung.com>
Fri, 26 Aug 2022 06:31:23 +0000 (15:31 +0900)
committerjaeyun-jung <39614140+jaeyun-jung@users.noreply.github.com>
Wed, 31 Aug 2022 02:23:33 +0000 (11:23 +0900)
Replace g-async-queue to internal queue in MQTT functions.

Signed-off-by: Jaeyun <jy1210.jung@samsung.com>
src/libnnstreamer-edge/nnstreamer-edge-mqtt.c

index 09acaebe31ff7803b4728a0e2ce949842c4600c2..db743c7ad3b8f7651ff58e5257ec7eddad6ec787 100644 (file)
@@ -18,6 +18,7 @@
 #include "nnstreamer-edge-internal.h"
 #include "nnstreamer-edge-log.h"
 #include "nnstreamer-edge-util.h"
+#include "nnstreamer-edge-queue.h"
 
 /**
  * @brief Data structure for mqtt broker handle.
@@ -25,7 +26,7 @@
 typedef struct
 {
   void *mqtt_h;
-  GAsyncQueue *server_list;
+  nns_edge_queue_h server_list;
   char *topic;
 } nns_edge_broker_s;
 
@@ -62,7 +63,7 @@ mqtt_cb_message_arrived (void *context, char *topic, int topic_len,
 
   msg = nns_edge_memdup (message->payload, message->payloadlen);
   if (msg)
-    g_async_queue_push (bh->server_list, msg);
+    nns_edge_queue_push (bh->server_list, msg, nns_edge_free);
 
   return TRUE;
 }
@@ -120,7 +121,7 @@ nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic)
 
   bh->topic = nns_edge_strdup (topic);
   bh->mqtt_h = handle;
-  bh->server_list = g_async_queue_new ();
+  nns_edge_queue_create (&bh->server_list);
   eh->broker_h = bh;
 
   MQTTAsync_setCallbacks (handle, edge_h, NULL, mqtt_cb_message_arrived, NULL);
@@ -165,7 +166,6 @@ nns_edge_mqtt_close (nns_edge_h edge_h)
   nns_edge_broker_s *bh;
   MQTTAsync handle;
   MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer;
-  char *msg;
   unsigned int wait_count;
 
   eh = (nns_edge_handle_s *) edge_h;
@@ -206,10 +206,7 @@ nns_edge_mqtt_close (nns_edge_h edge_h)
     MQTTAsync_destroy (&handle);
   }
 
-  while ((msg = g_async_queue_try_pop (bh->server_list))) {
-    SAFE_FREE (msg);
-  }
-  g_async_queue_unref (bh->server_list);
+  nns_edge_queue_destroy (bh->server_list);
   bh->server_list = NULL;
 
   SAFE_FREE (bh->topic);
@@ -365,8 +362,7 @@ nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg)
   bh = (nns_edge_broker_s *) eh->broker_h;
 
   /* Wait for 1 second */
-  *msg = g_async_queue_timeout_pop (bh->server_list, 1000000U);
-  if (!*msg) {
+  if (!nns_edge_queue_wait_pop (bh->server_list, 1000U, (void **) msg)) {
     nns_edge_loge ("Failed to get message from mqtt broker within timeout.");
     return NNS_EDGE_ERROR_UNKNOWN;
   }