From ac9a1c44a539484415a2c32130e0fde3fd4b5842 Mon Sep 17 00:00:00 2001 From: Jaeyun Date: Fri, 26 Aug 2022 15:31:23 +0900 Subject: [PATCH] [MQTT] replace glib queue Replace g-async-queue to internal queue in MQTT functions. Signed-off-by: Jaeyun --- src/libnnstreamer-edge/nnstreamer-edge-mqtt.c | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c b/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c index 09acaeb..db743c7 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c @@ -18,6 +18,7 @@ #include "nnstreamer-edge-internal.h" #include "nnstreamer-edge-log.h" #include "nnstreamer-edge-util.h" +#include "nnstreamer-edge-queue.h" /** * @brief Data structure for mqtt broker handle. @@ -25,7 +26,7 @@ typedef struct { void *mqtt_h; - GAsyncQueue *server_list; + nns_edge_queue_h server_list; char *topic; } nns_edge_broker_s; @@ -62,7 +63,7 @@ mqtt_cb_message_arrived (void *context, char *topic, int topic_len, msg = nns_edge_memdup (message->payload, message->payloadlen); if (msg) - g_async_queue_push (bh->server_list, msg); + nns_edge_queue_push (bh->server_list, msg, nns_edge_free); return TRUE; } @@ -120,7 +121,7 @@ nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic) bh->topic = nns_edge_strdup (topic); bh->mqtt_h = handle; - bh->server_list = g_async_queue_new (); + nns_edge_queue_create (&bh->server_list); eh->broker_h = bh; MQTTAsync_setCallbacks (handle, edge_h, NULL, mqtt_cb_message_arrived, NULL); @@ -165,7 +166,6 @@ nns_edge_mqtt_close (nns_edge_h edge_h) nns_edge_broker_s *bh; MQTTAsync handle; MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer; - char *msg; unsigned int wait_count; eh = (nns_edge_handle_s *) edge_h; @@ -206,10 +206,7 @@ nns_edge_mqtt_close (nns_edge_h edge_h) MQTTAsync_destroy (&handle); } - while ((msg = g_async_queue_try_pop (bh->server_list))) { - SAFE_FREE (msg); - } - g_async_queue_unref (bh->server_list); + nns_edge_queue_destroy (bh->server_list); bh->server_list = NULL; SAFE_FREE (bh->topic); @@ -365,8 +362,7 @@ nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg) bh = (nns_edge_broker_s *) eh->broker_h; /* Wait for 1 second */ - *msg = g_async_queue_timeout_pop (bh->server_list, 1000000U); - if (!*msg) { + if (!nns_edge_queue_wait_pop (bh->server_list, 1000U, (void **) msg)) { nns_edge_loge ("Failed to get message from mqtt broker within timeout."); return NNS_EDGE_ERROR_UNKNOWN; } -- 2.34.1