#include "nnstreamer-edge-internal.h"
#include "nnstreamer-edge-log.h"
#include "nnstreamer-edge-util.h"
+#include "nnstreamer-edge-queue.h"
/**
* @brief Data structure for mqtt broker handle.
typedef struct
{
void *mqtt_h;
- GAsyncQueue *server_list;
+ nns_edge_queue_h server_list;
char *topic;
} nns_edge_broker_s;
msg = nns_edge_memdup (message->payload, message->payloadlen);
if (msg)
- g_async_queue_push (bh->server_list, msg);
+ nns_edge_queue_push (bh->server_list, msg, nns_edge_free);
return TRUE;
}
bh->topic = nns_edge_strdup (topic);
bh->mqtt_h = handle;
- bh->server_list = g_async_queue_new ();
+ nns_edge_queue_create (&bh->server_list);
eh->broker_h = bh;
MQTTAsync_setCallbacks (handle, edge_h, NULL, mqtt_cb_message_arrived, NULL);
nns_edge_broker_s *bh;
MQTTAsync handle;
MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer;
- char *msg;
unsigned int wait_count;
eh = (nns_edge_handle_s *) edge_h;
MQTTAsync_destroy (&handle);
}
- while ((msg = g_async_queue_try_pop (bh->server_list))) {
- SAFE_FREE (msg);
- }
- g_async_queue_unref (bh->server_list);
+ nns_edge_queue_destroy (bh->server_list);
bh->server_list = NULL;
SAFE_FREE (bh->topic);
bh = (nns_edge_broker_s *) eh->broker_h;
/* Wait for 1 second */
- *msg = g_async_queue_timeout_pop (bh->server_list, 1000000U);
- if (!*msg) {
+ if (!nns_edge_queue_wait_pop (bh->server_list, 1000U, (void **) msg)) {
nns_edge_loge ("Failed to get message from mqtt broker within timeout.");
return NNS_EDGE_ERROR_UNKNOWN;
}