--- /dev/null
+/* SPDX-License-Identifier: Apache-2.0 */
+/**
+ * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+ *
+ * @file nnstreamer-edge-queue.c
+ * @date 24 August 2022
+ * @brief Thread-safe queue.
+ * @see https://github.com/nnstreamer/nnstreamer-edge
+ * @author Jaeyun Jung <jy1210.jung@samsung.com>
+ */
+
+#include "nnstreamer-edge-log.h"
+#include "nnstreamer-edge-queue.h"
+#include "nnstreamer-edge-util.h"
+
+/**
+ * @brief Internal structure for queue data.
+ */
+typedef struct _nns_edge_queue_data_s nns_edge_queue_data_s;
+
+/**
+ * @brief Internal structure for queue data.
+ */
+struct _nns_edge_queue_data_s
+{
+ void *data;
+ nns_edge_queue_data_destroy_cb destroy;
+ nns_edge_queue_data_s *next;
+};
+
+/**
+ * @brief Internal structure for queue.
+ */
+typedef struct
+{
+ pthread_mutex_t lock;
+ pthread_cond_t cond;
+
+ unsigned int length;
+ nns_edge_queue_data_s *head;
+ nns_edge_queue_data_s *tail;
+} nns_edge_queue_s;
+
+/**
+ * @brief Pop data from queue.
+ * @note This function should be called with lock.
+ */
+static void *
+_pop_data (nns_edge_queue_s * q)
+{
+ nns_edge_queue_data_s *qdata;
+ void *data = NULL;
+
+ qdata = q->head;
+ if (qdata) {
+ data = qdata->data;
+
+ q->head = qdata->next;
+ if ((--q->length) == 0U)
+ q->head = q->tail = NULL;
+
+ free (qdata);
+ }
+
+ return data;
+}
+
+/**
+ * @brief Create queue.
+ */
+bool
+nns_edge_queue_create (nns_edge_queue_h * handle)
+{
+ nns_edge_queue_s *q;
+
+ if (!handle) {
+ nns_edge_loge ("[Queue] Invalid param, handle is null.");
+ return false;
+ }
+
+ q = calloc (1, sizeof (nns_edge_queue_s));
+ if (!q) {
+ nns_edge_loge ("[Queue] Failed to allocate new memory.");
+ return false;
+ }
+
+ nns_edge_lock_init (q);
+ nns_edge_cond_init (q);
+
+ *handle = q;
+ return true;
+}
+
+/**
+ * @brief Destroy queue.
+ */
+bool
+nns_edge_queue_destroy (nns_edge_queue_h handle)
+{
+ nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
+ nns_edge_queue_data_s *qdata;
+
+ if (!q) {
+ nns_edge_loge ("[Queue] Invalid param, queue is null.");
+ return false;
+ }
+
+ nns_edge_lock (q);
+ nns_edge_cond_signal (q);
+ while ((qdata = q->head) != NULL) {
+ if (qdata->destroy)
+ qdata->destroy (qdata->data);
+
+ q->head = qdata->next;
+ free (qdata);
+ }
+
+ q->head = q->tail = NULL;
+ q->length = 0U;
+ nns_edge_unlock (q);
+
+ nns_edge_cond_destroy (q);
+ nns_edge_lock_destroy (q);
+ free (q);
+
+ return true;
+}
+
+/**
+ * @brief Get the length of the queue.
+ */
+unsigned int
+nns_edge_queue_get_length (nns_edge_queue_h handle)
+{
+ nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
+ unsigned int len;
+
+ if (!q) {
+ nns_edge_loge ("[Queue] Invalid param, queue is null.");
+ return 0;
+ }
+
+ nns_edge_lock (q);
+ len = q->length;
+ nns_edge_unlock (q);
+
+ return len;
+}
+
+/**
+ * @brief Add new data into queue.
+ */
+bool
+nns_edge_queue_push (nns_edge_queue_h handle, void *data,
+ nns_edge_queue_data_destroy_cb destroy)
+{
+ nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
+ nns_edge_queue_data_s *qdata;
+
+ if (!q) {
+ nns_edge_loge ("[Queue] Invalid param, queue is null.");
+ return false;
+ }
+
+ if (!data) {
+ nns_edge_loge ("[Queue] Invalid param, data is null.");
+ return false;
+ }
+
+ qdata = calloc (1, sizeof (nns_edge_queue_data_s));
+ if (!qdata) {
+ nns_edge_loge ("[Queue] Failed to allocate new memory for data.");
+ return false;
+ }
+
+ qdata->data = data;
+ qdata->destroy = destroy;
+
+ nns_edge_lock (q);
+ if (!q->head)
+ q->head = qdata;
+ if (q->tail)
+ q->tail->next = qdata;
+ q->tail = qdata;
+ q->length++;
+ nns_edge_cond_signal (q);
+ nns_edge_unlock (q);
+
+ return true;
+}
+
+/**
+ * @brief Remove and return the first data in queue.
+ */
+bool
+nns_edge_queue_pop (nns_edge_queue_h handle, void **data)
+{
+ nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
+
+ if (!q) {
+ nns_edge_loge ("[Queue] Invalid param, queue is null.");
+ return false;
+ }
+
+ if (!data) {
+ nns_edge_loge ("[Queue] Invalid param, data is null.");
+ return false;
+ }
+
+ nns_edge_lock (q);
+ *data = _pop_data (q);
+ nns_edge_unlock (q);
+
+ return (*data != NULL);
+}
+
+/**
+ * @brief Remove and return the first data in queue. If queue is empty, wait until new data is added in the queue.
+ */
+bool
+nns_edge_queue_wait_pop (nns_edge_queue_h handle, unsigned int timeout,
+ void **data)
+{
+ nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
+
+ if (!q) {
+ nns_edge_loge ("[Queue] Invalid param, queue is null.");
+ return false;
+ }
+
+ if (!data) {
+ nns_edge_loge ("[Queue] Invalid param, data is null.");
+ return false;
+ }
+
+ nns_edge_lock (q);
+ if (q->length == 0U) {
+ if (timeout > 0) {
+ struct timespec ts;
+ struct timeval now;
+
+ gettimeofday (&now, NULL);
+
+ ts.tv_sec = now.tv_sec + timeout / 1000;
+ ts.tv_nsec = now.tv_usec * 1000 + (timeout % 1000) * 1000000;
+
+ nns_edge_cond_timedwait (q, &ts);
+ } else {
+ nns_edge_cond_wait (q);
+ }
+ }
+
+ *data = _pop_data (q);
+ nns_edge_unlock (q);
+
+ return (*data != NULL);
+}
--- /dev/null
+/* SPDX-License-Identifier: Apache-2.0 */
+/**
+ * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+ *
+ * @file nnstreamer-edge-queue.h
+ * @date 24 August 2022
+ * @brief Thread-safe queue.
+ * @see https://github.com/nnstreamer/nnstreamer-edge
+ * @author Jaeyun Jung <jy1210.jung@samsung.com>
+ * @note This file is internal header for nnstreamer-edge. DO NOT export this file.
+ */
+
+#ifndef __NNSTREAMER_EDGE_QUEUE_H__
+#define __NNSTREAMER_EDGE_QUEUE_H__
+
+#include <stdbool.h>
+#include <sys/time.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif /* __cplusplus */
+
+typedef void *nns_edge_queue_h;
+
+/**
+ * @brief Callback called when data in queue is released.
+ */
+typedef void (*nns_edge_queue_data_destroy_cb) (void *data);
+
+/**
+ * @brief Create queue.
+ * @param[out] handle Newly created handle.
+ * @return true on success.
+ */
+bool nns_edge_queue_create (nns_edge_queue_h *handle);
+
+/**
+ * @brief Destroy queue.
+ * @param[in] handle The queue handle.
+ * @return true on success.
+ */
+bool nns_edge_queue_destroy (nns_edge_queue_h handle);
+
+/**
+ * @brief Get the length of the queue.
+ * @param[in] handle The queue handle.
+ * @return The number of data in the queue.
+ */
+unsigned int nns_edge_queue_get_length (nns_edge_queue_h handle);
+
+/**
+ * @brief Add new data into queue.
+ * @param[in] handle The queue handle.
+ * @param[in] data The data to be added.
+ * @param[in] destroy Nullable, the callback function to release data.
+ * @return true on success.
+ */
+bool nns_edge_queue_push (nns_edge_queue_h handle, void *data, nns_edge_queue_data_destroy_cb destroy);
+
+/**
+ * @brief Remove and return the first data in queue.
+ * @param[in] handle The queue handle.
+ * @param[out] data The data in the queue.
+ * @return true on success. false if queue is empty.
+ */
+bool nns_edge_queue_pop (nns_edge_queue_h handle, void **data);
+
+/**
+ * @brief Remove and return the first data in queue. If queue is empty, wait until new data is added in the queue.
+ * @param[in] handle The queue handle.
+ * @param[in] timeout The time to wait for new data, in milliseconds. (0 for infinite timeout)
+ * @param[out] data The data in the queue.
+ * @return true on success.
+ */
+bool nns_edge_queue_wait_pop (nns_edge_queue_h handle, unsigned int timeout, void **data);
+
+#ifdef __cplusplus
+}
+#endif /* __cplusplus */
+#endif /* __NNSTREAMER_EDGE_QUEUE_H__ */
#include "nnstreamer-edge-internal.h"\r
#include "nnstreamer-edge-log.h"\r
#include "nnstreamer-edge-util.h"\r
+#include "nnstreamer-edge-queue.h"\r
\r
/**\r
* @brief Data struct for unittest.\r
free (data);\r
}\r
\r
+/**\r
+ * @brief Thread to push new data into queue.\r
+ */\r
+static void *\r
+_test_thread_edge_queue_push (void *thread_data)\r
+{\r
+ nns_edge_queue_h queue_h = thread_data;\r
+ unsigned int i, j;\r
+ void *data;\r
+\r
+ for (i = 0; i < 6U; i++) {\r
+ usleep (50000);\r
+\r
+ data = malloc (5 * sizeof (unsigned int));\r
+ if (data) {\r
+ for (j = 0; j < 5U; j++)\r
+ ((unsigned int *) data)[j] = i * 10U + j;\r
+ }\r
+\r
+ EXPECT_TRUE (nns_edge_queue_push (queue_h, data, nns_edge_free));\r
+ }\r
+\r
+ return NULL;\r
+}\r
+\r
+/**\r
+ * @brief Push and pop data.\r
+ */\r
+TEST(edgeQueue, pushData)\r
+{\r
+ nns_edge_queue_h queue_h;\r
+ void *data1, *data2, *data3, *result;\r
+ unsigned int i, len;\r
+\r
+ data1 = malloc (5 * sizeof (unsigned int));\r
+ ASSERT_TRUE (data1 != NULL);\r
+ for (i = 0; i < 5U; i++)\r
+ ((unsigned int *) data1)[i] = i + 10U;\r
+\r
+ data2 = malloc (5 * sizeof (unsigned int));\r
+ ASSERT_TRUE (data1 != NULL);\r
+ for (i = 0; i < 5U; i++)\r
+ ((unsigned int *) data2)[i] = i + 20U;\r
+\r
+ data3 = malloc (5 * sizeof (unsigned int));\r
+ ASSERT_TRUE (data1 != NULL);\r
+ for (i = 0; i < 5U; i++)\r
+ ((unsigned int *) data3)[i] = i + 30U;\r
+\r
+ EXPECT_TRUE (nns_edge_queue_create (&queue_h));\r
+\r
+ EXPECT_TRUE (nns_edge_queue_push (queue_h, data1, NULL));\r
+ len = nns_edge_queue_get_length (queue_h);\r
+ EXPECT_EQ (len, 1U);\r
+\r
+ EXPECT_TRUE (nns_edge_queue_push (queue_h, data2, NULL));\r
+ len = nns_edge_queue_get_length (queue_h);\r
+ EXPECT_EQ (len, 2U);\r
+\r
+ EXPECT_TRUE (nns_edge_queue_push (queue_h, data3, NULL));\r
+ len = nns_edge_queue_get_length (queue_h);\r
+ EXPECT_EQ (len, 3U);\r
+\r
+ EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result));\r
+ len = nns_edge_queue_get_length (queue_h);\r
+ EXPECT_EQ (len, 2U);\r
+ EXPECT_EQ (result, data1);\r
+ for (i = 0; i < 5U; i++)\r
+ EXPECT_EQ (((unsigned int *) result)[i], i + 10U);\r
+\r
+ EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result));\r
+ len = nns_edge_queue_get_length (queue_h);\r
+ EXPECT_EQ (len, 1U);\r
+ EXPECT_EQ (result, data2);\r
+ for (i = 0; i < 5U; i++)\r
+ EXPECT_EQ (((unsigned int *) result)[i], i + 20U);\r
+\r
+ EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result));\r
+ len = nns_edge_queue_get_length (queue_h);\r
+ EXPECT_EQ (len, 0U);\r
+ EXPECT_EQ (result, data3);\r
+ for (i = 0; i < 5U; i++)\r
+ EXPECT_EQ (((unsigned int *) result)[i], i + 30U);\r
+\r
+ EXPECT_TRUE (nns_edge_queue_push (queue_h, data1, nns_edge_free));\r
+ len = nns_edge_queue_get_length (queue_h);\r
+ EXPECT_EQ (len, 1U);\r
+\r
+ EXPECT_TRUE (nns_edge_queue_push (queue_h, data2, nns_edge_free));\r
+ len = nns_edge_queue_get_length (queue_h);\r
+ EXPECT_EQ (len, 2U);\r
+\r
+ EXPECT_TRUE (nns_edge_queue_push (queue_h, data3, nns_edge_free));\r
+ len = nns_edge_queue_get_length (queue_h);\r
+ EXPECT_EQ (len, 3U);\r
+\r
+ EXPECT_TRUE (nns_edge_queue_destroy (queue_h));\r
+}\r
+\r
+/**\r
+ * @brief Wait for pushing data.\r
+ */\r
+TEST(edgeQueue, pushDataOnThread)\r
+{\r
+ nns_edge_queue_h queue_h;\r
+ pthread_t push_thread;\r
+ pthread_attr_t attr;\r
+ unsigned int i, j, len, retry;\r
+\r
+ EXPECT_TRUE (nns_edge_queue_create (&queue_h));\r
+\r
+ pthread_attr_init (&attr);\r
+ pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);\r
+ pthread_create (&push_thread, &attr, _test_thread_edge_queue_push, queue_h);\r
+ pthread_attr_destroy (&attr);\r
+\r
+ for (i = 0; i < 3U; i++) {\r
+ void *result = NULL;\r
+\r
+ EXPECT_TRUE (nns_edge_queue_wait_pop (queue_h, 0U, &result));\r
+\r
+ for (j = 0; j < 5U; j++)\r
+ EXPECT_EQ (((unsigned int *) result)[j], i * 10U + j);\r
+\r
+ free (result);\r
+ }\r
+\r
+ retry = 0U;\r
+ do {\r
+ usleep (20000);\r
+ len = nns_edge_queue_get_length (queue_h);\r
+ } while (len < 3U && retry++ < 200U);\r
+\r
+ EXPECT_TRUE (nns_edge_queue_destroy (queue_h));\r
+}\r
+\r
+/**\r
+ * @brief Create queue - invalid param.\r
+ */\r
+TEST(edgeQueue, createInvalidParam01_n)\r
+{\r
+ EXPECT_FALSE (nns_edge_queue_create (NULL));\r
+}\r
+\r
+/**\r
+ * @brief Destroy queue - invalid param.\r
+ */\r
+TEST(edgeQueue, destroyInvalidParam01_n)\r
+{\r
+ EXPECT_FALSE (nns_edge_queue_destroy (NULL));\r
+}\r
+\r
+/**\r
+ * @brief Get length of queue - invalid param.\r
+ */\r
+TEST(edgeQueue, getLengthInvalidParam01_n)\r
+{\r
+ unsigned int len;\r
+\r
+ len = nns_edge_queue_get_length (NULL);\r
+ EXPECT_EQ (len, 0U);\r
+}\r
+\r
+/**\r
+ * @brief Push data into queue - invalid param.\r
+ */\r
+TEST(edgeQueue, pushInvalidParam01_n)\r
+{\r
+ void *data;\r
+ unsigned int i;\r
+\r
+ data = malloc (5 * sizeof (unsigned int));\r
+ ASSERT_TRUE (data != NULL);\r
+\r
+ EXPECT_FALSE (nns_edge_queue_push (NULL, data, NULL));\r
+\r
+ free (data);\r
+}\r
+\r
+/**\r
+ * @brief Push data into queue - invalid param.\r
+ */\r
+TEST(edgeQueue, pushInvalidParam02_n)\r
+{\r
+ nns_edge_queue_h queue_h;\r
+\r
+ EXPECT_TRUE (nns_edge_queue_create (&queue_h));\r
+\r
+ EXPECT_FALSE (nns_edge_queue_push (queue_h, NULL, NULL));\r
+\r
+ EXPECT_TRUE (nns_edge_queue_destroy (queue_h));\r
+}\r
+\r
+/**\r
+ * @brief Pop data from queue - invalid param.\r
+ */\r
+TEST(edgeQueue, popInvalidParam01_n)\r
+{\r
+ void *data;\r
+\r
+ EXPECT_FALSE (nns_edge_queue_pop (NULL, &data));\r
+}\r
+\r
+/**\r
+ * @brief Pop data from queue - invalid param.\r
+ */\r
+TEST(edgeQueue, popInvalidParam02_n)\r
+{\r
+ nns_edge_queue_h queue_h;\r
+\r
+ EXPECT_TRUE (nns_edge_queue_create (&queue_h));\r
+\r
+ EXPECT_FALSE (nns_edge_queue_pop (queue_h, NULL));\r
+\r
+ EXPECT_TRUE (nns_edge_queue_destroy (queue_h));\r
+}\r
+\r
+/**\r
+ * @brief Wait and pop data from queue, timed out.\r
+ */\r
+TEST(edgeQueue, waitPopTimedout)\r
+{\r
+ nns_edge_queue_h queue_h;\r
+ void *data;\r
+\r
+ EXPECT_TRUE (nns_edge_queue_create (&queue_h));\r
+\r
+ EXPECT_FALSE (nns_edge_queue_wait_pop (queue_h, 10U, &data));\r
+\r
+ EXPECT_TRUE (nns_edge_queue_destroy (queue_h));\r
+}\r
+\r
+/**\r
+ * @brief Wait and pop data from queue - invalid param.\r
+ */\r
+TEST(edgeQueue, waitPopInvalidParam01_n)\r
+{\r
+ void *data;\r
+\r
+ EXPECT_FALSE (nns_edge_queue_wait_pop (NULL, 0U, &data));\r
+}\r
+\r
+/**\r
+ * @brief Wait and pop data from queue - invalid param.\r
+ */\r
+TEST(edgeQueue, waitPopInvalidParam02_n)\r
+{\r
+ nns_edge_queue_h queue_h;\r
+\r
+ EXPECT_TRUE (nns_edge_queue_create (&queue_h));\r
+\r
+ EXPECT_FALSE (nns_edge_queue_wait_pop (queue_h, 0U, NULL));\r
+\r
+ EXPECT_TRUE (nns_edge_queue_destroy (queue_h));\r
+}\r
+\r
#if defined(ENABLE_MQTT)\r
/**\r
* @brief Edge event callback for test.\r