From b91121c8ca477fe789bd626e2c209c11852ad88a Mon Sep 17 00:00:00 2001 From: Jaeyun Date: Wed, 24 Aug 2022 18:46:45 +0900 Subject: [PATCH] [Queue] base code for queue Add base code for queue in nns-edge. This will be used on TCP connection - thread to send data. Signed-off-by: Jaeyun --- src/CMakeLists.txt | 1 + .../nnstreamer-edge-queue.c | 257 ++++++++++++++++++ .../nnstreamer-edge-queue.h | 80 ++++++ src/libnnstreamer-edge/nnstreamer-edge-util.h | 6 + tests/unittest_nnstreamer-edge.cc | 257 ++++++++++++++++++ 5 files changed, 601 insertions(+) create mode 100644 src/libnnstreamer-edge/nnstreamer-edge-queue.c create mode 100644 src/libnnstreamer-edge/nnstreamer-edge-queue.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 4626f2d..54ac18f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -4,6 +4,7 @@ SET(NNS_EDGE_SRCS ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-common.c ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-internal.c ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-util.c + ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-queue.c ) IF(ENABLE_PAHO_MQTT) diff --git a/src/libnnstreamer-edge/nnstreamer-edge-queue.c b/src/libnnstreamer-edge/nnstreamer-edge-queue.c new file mode 100644 index 0000000..7e56da9 --- /dev/null +++ b/src/libnnstreamer-edge/nnstreamer-edge-queue.c @@ -0,0 +1,257 @@ +/* SPDX-License-Identifier: Apache-2.0 */ +/** + * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. + * + * @file nnstreamer-edge-queue.c + * @date 24 August 2022 + * @brief Thread-safe queue. + * @see https://github.com/nnstreamer/nnstreamer-edge + * @author Jaeyun Jung + */ + +#include "nnstreamer-edge-log.h" +#include "nnstreamer-edge-queue.h" +#include "nnstreamer-edge-util.h" + +/** + * @brief Internal structure for queue data. + */ +typedef struct _nns_edge_queue_data_s nns_edge_queue_data_s; + +/** + * @brief Internal structure for queue data. + */ +struct _nns_edge_queue_data_s +{ + void *data; + nns_edge_queue_data_destroy_cb destroy; + nns_edge_queue_data_s *next; +}; + +/** + * @brief Internal structure for queue. + */ +typedef struct +{ + pthread_mutex_t lock; + pthread_cond_t cond; + + unsigned int length; + nns_edge_queue_data_s *head; + nns_edge_queue_data_s *tail; +} nns_edge_queue_s; + +/** + * @brief Pop data from queue. + * @note This function should be called with lock. + */ +static void * +_pop_data (nns_edge_queue_s * q) +{ + nns_edge_queue_data_s *qdata; + void *data = NULL; + + qdata = q->head; + if (qdata) { + data = qdata->data; + + q->head = qdata->next; + if ((--q->length) == 0U) + q->head = q->tail = NULL; + + free (qdata); + } + + return data; +} + +/** + * @brief Create queue. + */ +bool +nns_edge_queue_create (nns_edge_queue_h * handle) +{ + nns_edge_queue_s *q; + + if (!handle) { + nns_edge_loge ("[Queue] Invalid param, handle is null."); + return false; + } + + q = calloc (1, sizeof (nns_edge_queue_s)); + if (!q) { + nns_edge_loge ("[Queue] Failed to allocate new memory."); + return false; + } + + nns_edge_lock_init (q); + nns_edge_cond_init (q); + + *handle = q; + return true; +} + +/** + * @brief Destroy queue. + */ +bool +nns_edge_queue_destroy (nns_edge_queue_h handle) +{ + nns_edge_queue_s *q = (nns_edge_queue_s *) handle; + nns_edge_queue_data_s *qdata; + + if (!q) { + nns_edge_loge ("[Queue] Invalid param, queue is null."); + return false; + } + + nns_edge_lock (q); + nns_edge_cond_signal (q); + while ((qdata = q->head) != NULL) { + if (qdata->destroy) + qdata->destroy (qdata->data); + + q->head = qdata->next; + free (qdata); + } + + q->head = q->tail = NULL; + q->length = 0U; + nns_edge_unlock (q); + + nns_edge_cond_destroy (q); + nns_edge_lock_destroy (q); + free (q); + + return true; +} + +/** + * @brief Get the length of the queue. + */ +unsigned int +nns_edge_queue_get_length (nns_edge_queue_h handle) +{ + nns_edge_queue_s *q = (nns_edge_queue_s *) handle; + unsigned int len; + + if (!q) { + nns_edge_loge ("[Queue] Invalid param, queue is null."); + return 0; + } + + nns_edge_lock (q); + len = q->length; + nns_edge_unlock (q); + + return len; +} + +/** + * @brief Add new data into queue. + */ +bool +nns_edge_queue_push (nns_edge_queue_h handle, void *data, + nns_edge_queue_data_destroy_cb destroy) +{ + nns_edge_queue_s *q = (nns_edge_queue_s *) handle; + nns_edge_queue_data_s *qdata; + + if (!q) { + nns_edge_loge ("[Queue] Invalid param, queue is null."); + return false; + } + + if (!data) { + nns_edge_loge ("[Queue] Invalid param, data is null."); + return false; + } + + qdata = calloc (1, sizeof (nns_edge_queue_data_s)); + if (!qdata) { + nns_edge_loge ("[Queue] Failed to allocate new memory for data."); + return false; + } + + qdata->data = data; + qdata->destroy = destroy; + + nns_edge_lock (q); + if (!q->head) + q->head = qdata; + if (q->tail) + q->tail->next = qdata; + q->tail = qdata; + q->length++; + nns_edge_cond_signal (q); + nns_edge_unlock (q); + + return true; +} + +/** + * @brief Remove and return the first data in queue. + */ +bool +nns_edge_queue_pop (nns_edge_queue_h handle, void **data) +{ + nns_edge_queue_s *q = (nns_edge_queue_s *) handle; + + if (!q) { + nns_edge_loge ("[Queue] Invalid param, queue is null."); + return false; + } + + if (!data) { + nns_edge_loge ("[Queue] Invalid param, data is null."); + return false; + } + + nns_edge_lock (q); + *data = _pop_data (q); + nns_edge_unlock (q); + + return (*data != NULL); +} + +/** + * @brief Remove and return the first data in queue. If queue is empty, wait until new data is added in the queue. + */ +bool +nns_edge_queue_wait_pop (nns_edge_queue_h handle, unsigned int timeout, + void **data) +{ + nns_edge_queue_s *q = (nns_edge_queue_s *) handle; + + if (!q) { + nns_edge_loge ("[Queue] Invalid param, queue is null."); + return false; + } + + if (!data) { + nns_edge_loge ("[Queue] Invalid param, data is null."); + return false; + } + + nns_edge_lock (q); + if (q->length == 0U) { + if (timeout > 0) { + struct timespec ts; + struct timeval now; + + gettimeofday (&now, NULL); + + ts.tv_sec = now.tv_sec + timeout / 1000; + ts.tv_nsec = now.tv_usec * 1000 + (timeout % 1000) * 1000000; + + nns_edge_cond_timedwait (q, &ts); + } else { + nns_edge_cond_wait (q); + } + } + + *data = _pop_data (q); + nns_edge_unlock (q); + + return (*data != NULL); +} diff --git a/src/libnnstreamer-edge/nnstreamer-edge-queue.h b/src/libnnstreamer-edge/nnstreamer-edge-queue.h new file mode 100644 index 0000000..4f7edd3 --- /dev/null +++ b/src/libnnstreamer-edge/nnstreamer-edge-queue.h @@ -0,0 +1,80 @@ +/* SPDX-License-Identifier: Apache-2.0 */ +/** + * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. + * + * @file nnstreamer-edge-queue.h + * @date 24 August 2022 + * @brief Thread-safe queue. + * @see https://github.com/nnstreamer/nnstreamer-edge + * @author Jaeyun Jung + * @note This file is internal header for nnstreamer-edge. DO NOT export this file. + */ + +#ifndef __NNSTREAMER_EDGE_QUEUE_H__ +#define __NNSTREAMER_EDGE_QUEUE_H__ + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif /* __cplusplus */ + +typedef void *nns_edge_queue_h; + +/** + * @brief Callback called when data in queue is released. + */ +typedef void (*nns_edge_queue_data_destroy_cb) (void *data); + +/** + * @brief Create queue. + * @param[out] handle Newly created handle. + * @return true on success. + */ +bool nns_edge_queue_create (nns_edge_queue_h *handle); + +/** + * @brief Destroy queue. + * @param[in] handle The queue handle. + * @return true on success. + */ +bool nns_edge_queue_destroy (nns_edge_queue_h handle); + +/** + * @brief Get the length of the queue. + * @param[in] handle The queue handle. + * @return The number of data in the queue. + */ +unsigned int nns_edge_queue_get_length (nns_edge_queue_h handle); + +/** + * @brief Add new data into queue. + * @param[in] handle The queue handle. + * @param[in] data The data to be added. + * @param[in] destroy Nullable, the callback function to release data. + * @return true on success. + */ +bool nns_edge_queue_push (nns_edge_queue_h handle, void *data, nns_edge_queue_data_destroy_cb destroy); + +/** + * @brief Remove and return the first data in queue. + * @param[in] handle The queue handle. + * @param[out] data The data in the queue. + * @return true on success. false if queue is empty. + */ +bool nns_edge_queue_pop (nns_edge_queue_h handle, void **data); + +/** + * @brief Remove and return the first data in queue. If queue is empty, wait until new data is added in the queue. + * @param[in] handle The queue handle. + * @param[in] timeout The time to wait for new data, in milliseconds. (0 for infinite timeout) + * @param[out] data The data in the queue. + * @return true on success. + */ +bool nns_edge_queue_wait_pop (nns_edge_queue_h handle, unsigned int timeout, void **data); + +#ifdef __cplusplus +} +#endif /* __cplusplus */ +#endif /* __NNSTREAMER_EDGE_QUEUE_H__ */ diff --git a/src/libnnstreamer-edge/nnstreamer-edge-util.h b/src/libnnstreamer-edge/nnstreamer-edge-util.h index 97aa77d..b10a958 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-util.h +++ b/src/libnnstreamer-edge/nnstreamer-edge-util.h @@ -43,6 +43,12 @@ extern "C" { #define nns_edge_lock(h) do { pthread_mutex_lock (&(h)->lock); } while (0) #define nns_edge_unlock(h) do { pthread_mutex_unlock (&(h)->lock); } while (0) +#define nns_edge_cond_init(h) do { pthread_cond_init (&(h)->cond, NULL); } while (0) +#define nns_edge_cond_destroy(h) do { pthread_cond_destroy (&(h)->cond); } while (0) +#define nns_edge_cond_wait(h) do { pthread_cond_wait (&(h)->cond, &(h)->lock); } while (0) +#define nns_edge_cond_timedwait(h,t) do { pthread_cond_timedwait (&(h)->cond, &(h)->lock, (t)); } while (0) +#define nns_edge_cond_signal(h) do { pthread_cond_signal (&(h)->cond); } while (0) + /** * @brief Get available port number. */ diff --git a/tests/unittest_nnstreamer-edge.cc b/tests/unittest_nnstreamer-edge.cc index e686dc0..6467221 100644 --- a/tests/unittest_nnstreamer-edge.cc +++ b/tests/unittest_nnstreamer-edge.cc @@ -13,6 +13,7 @@ #include "nnstreamer-edge-internal.h" #include "nnstreamer-edge-log.h" #include "nnstreamer-edge-util.h" +#include "nnstreamer-edge-queue.h" /** * @brief Data struct for unittest. @@ -2985,6 +2986,262 @@ TEST(edgeMeta, deserializeInvalidParam03_n) free (data); } +/** + * @brief Thread to push new data into queue. + */ +static void * +_test_thread_edge_queue_push (void *thread_data) +{ + nns_edge_queue_h queue_h = thread_data; + unsigned int i, j; + void *data; + + for (i = 0; i < 6U; i++) { + usleep (50000); + + data = malloc (5 * sizeof (unsigned int)); + if (data) { + for (j = 0; j < 5U; j++) + ((unsigned int *) data)[j] = i * 10U + j; + } + + EXPECT_TRUE (nns_edge_queue_push (queue_h, data, nns_edge_free)); + } + + return NULL; +} + +/** + * @brief Push and pop data. + */ +TEST(edgeQueue, pushData) +{ + nns_edge_queue_h queue_h; + void *data1, *data2, *data3, *result; + unsigned int i, len; + + data1 = malloc (5 * sizeof (unsigned int)); + ASSERT_TRUE (data1 != NULL); + for (i = 0; i < 5U; i++) + ((unsigned int *) data1)[i] = i + 10U; + + data2 = malloc (5 * sizeof (unsigned int)); + ASSERT_TRUE (data1 != NULL); + for (i = 0; i < 5U; i++) + ((unsigned int *) data2)[i] = i + 20U; + + data3 = malloc (5 * sizeof (unsigned int)); + ASSERT_TRUE (data1 != NULL); + for (i = 0; i < 5U; i++) + ((unsigned int *) data3)[i] = i + 30U; + + EXPECT_TRUE (nns_edge_queue_create (&queue_h)); + + EXPECT_TRUE (nns_edge_queue_push (queue_h, data1, NULL)); + len = nns_edge_queue_get_length (queue_h); + EXPECT_EQ (len, 1U); + + EXPECT_TRUE (nns_edge_queue_push (queue_h, data2, NULL)); + len = nns_edge_queue_get_length (queue_h); + EXPECT_EQ (len, 2U); + + EXPECT_TRUE (nns_edge_queue_push (queue_h, data3, NULL)); + len = nns_edge_queue_get_length (queue_h); + EXPECT_EQ (len, 3U); + + EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result)); + len = nns_edge_queue_get_length (queue_h); + EXPECT_EQ (len, 2U); + EXPECT_EQ (result, data1); + for (i = 0; i < 5U; i++) + EXPECT_EQ (((unsigned int *) result)[i], i + 10U); + + EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result)); + len = nns_edge_queue_get_length (queue_h); + EXPECT_EQ (len, 1U); + EXPECT_EQ (result, data2); + for (i = 0; i < 5U; i++) + EXPECT_EQ (((unsigned int *) result)[i], i + 20U); + + EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result)); + len = nns_edge_queue_get_length (queue_h); + EXPECT_EQ (len, 0U); + EXPECT_EQ (result, data3); + for (i = 0; i < 5U; i++) + EXPECT_EQ (((unsigned int *) result)[i], i + 30U); + + EXPECT_TRUE (nns_edge_queue_push (queue_h, data1, nns_edge_free)); + len = nns_edge_queue_get_length (queue_h); + EXPECT_EQ (len, 1U); + + EXPECT_TRUE (nns_edge_queue_push (queue_h, data2, nns_edge_free)); + len = nns_edge_queue_get_length (queue_h); + EXPECT_EQ (len, 2U); + + EXPECT_TRUE (nns_edge_queue_push (queue_h, data3, nns_edge_free)); + len = nns_edge_queue_get_length (queue_h); + EXPECT_EQ (len, 3U); + + EXPECT_TRUE (nns_edge_queue_destroy (queue_h)); +} + +/** + * @brief Wait for pushing data. + */ +TEST(edgeQueue, pushDataOnThread) +{ + nns_edge_queue_h queue_h; + pthread_t push_thread; + pthread_attr_t attr; + unsigned int i, j, len, retry; + + EXPECT_TRUE (nns_edge_queue_create (&queue_h)); + + pthread_attr_init (&attr); + pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); + pthread_create (&push_thread, &attr, _test_thread_edge_queue_push, queue_h); + pthread_attr_destroy (&attr); + + for (i = 0; i < 3U; i++) { + void *result = NULL; + + EXPECT_TRUE (nns_edge_queue_wait_pop (queue_h, 0U, &result)); + + for (j = 0; j < 5U; j++) + EXPECT_EQ (((unsigned int *) result)[j], i * 10U + j); + + free (result); + } + + retry = 0U; + do { + usleep (20000); + len = nns_edge_queue_get_length (queue_h); + } while (len < 3U && retry++ < 200U); + + EXPECT_TRUE (nns_edge_queue_destroy (queue_h)); +} + +/** + * @brief Create queue - invalid param. + */ +TEST(edgeQueue, createInvalidParam01_n) +{ + EXPECT_FALSE (nns_edge_queue_create (NULL)); +} + +/** + * @brief Destroy queue - invalid param. + */ +TEST(edgeQueue, destroyInvalidParam01_n) +{ + EXPECT_FALSE (nns_edge_queue_destroy (NULL)); +} + +/** + * @brief Get length of queue - invalid param. + */ +TEST(edgeQueue, getLengthInvalidParam01_n) +{ + unsigned int len; + + len = nns_edge_queue_get_length (NULL); + EXPECT_EQ (len, 0U); +} + +/** + * @brief Push data into queue - invalid param. + */ +TEST(edgeQueue, pushInvalidParam01_n) +{ + void *data; + unsigned int i; + + data = malloc (5 * sizeof (unsigned int)); + ASSERT_TRUE (data != NULL); + + EXPECT_FALSE (nns_edge_queue_push (NULL, data, NULL)); + + free (data); +} + +/** + * @brief Push data into queue - invalid param. + */ +TEST(edgeQueue, pushInvalidParam02_n) +{ + nns_edge_queue_h queue_h; + + EXPECT_TRUE (nns_edge_queue_create (&queue_h)); + + EXPECT_FALSE (nns_edge_queue_push (queue_h, NULL, NULL)); + + EXPECT_TRUE (nns_edge_queue_destroy (queue_h)); +} + +/** + * @brief Pop data from queue - invalid param. + */ +TEST(edgeQueue, popInvalidParam01_n) +{ + void *data; + + EXPECT_FALSE (nns_edge_queue_pop (NULL, &data)); +} + +/** + * @brief Pop data from queue - invalid param. + */ +TEST(edgeQueue, popInvalidParam02_n) +{ + nns_edge_queue_h queue_h; + + EXPECT_TRUE (nns_edge_queue_create (&queue_h)); + + EXPECT_FALSE (nns_edge_queue_pop (queue_h, NULL)); + + EXPECT_TRUE (nns_edge_queue_destroy (queue_h)); +} + +/** + * @brief Wait and pop data from queue, timed out. + */ +TEST(edgeQueue, waitPopTimedout) +{ + nns_edge_queue_h queue_h; + void *data; + + EXPECT_TRUE (nns_edge_queue_create (&queue_h)); + + EXPECT_FALSE (nns_edge_queue_wait_pop (queue_h, 10U, &data)); + + EXPECT_TRUE (nns_edge_queue_destroy (queue_h)); +} + +/** + * @brief Wait and pop data from queue - invalid param. + */ +TEST(edgeQueue, waitPopInvalidParam01_n) +{ + void *data; + + EXPECT_FALSE (nns_edge_queue_wait_pop (NULL, 0U, &data)); +} + +/** + * @brief Wait and pop data from queue - invalid param. + */ +TEST(edgeQueue, waitPopInvalidParam02_n) +{ + nns_edge_queue_h queue_h; + + EXPECT_TRUE (nns_edge_queue_create (&queue_h)); + + EXPECT_FALSE (nns_edge_queue_wait_pop (queue_h, 0U, NULL)); + + EXPECT_TRUE (nns_edge_queue_destroy (queue_h)); +} + #if defined(ENABLE_MQTT) /** * @brief Edge event callback for test. -- 2.34.1