[Queue] base code for queue
authorJaeyun <jy1210.jung@samsung.com>
Wed, 24 Aug 2022 09:46:45 +0000 (18:46 +0900)
committerjaeyun-jung <39614140+jaeyun-jung@users.noreply.github.com>
Wed, 31 Aug 2022 02:04:23 +0000 (11:04 +0900)
Add base code for queue in nns-edge.
This will be used on TCP connection - thread to send data.

Signed-off-by: Jaeyun <jy1210.jung@samsung.com>
src/CMakeLists.txt
src/libnnstreamer-edge/nnstreamer-edge-queue.c [new file with mode: 0644]
src/libnnstreamer-edge/nnstreamer-edge-queue.h [new file with mode: 0644]
src/libnnstreamer-edge/nnstreamer-edge-util.h
tests/unittest_nnstreamer-edge.cc

index 4626f2dfd5a5bb58dff791e4f903917cddcb8e6f..54ac18f9b2bd602eff4ae9f53dc49a2174a37ab1 100644 (file)
@@ -4,6 +4,7 @@ SET(NNS_EDGE_SRCS
     ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-common.c
     ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-internal.c
     ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-util.c
+    ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-queue.c
 )
 
 IF(ENABLE_PAHO_MQTT)
diff --git a/src/libnnstreamer-edge/nnstreamer-edge-queue.c b/src/libnnstreamer-edge/nnstreamer-edge-queue.c
new file mode 100644 (file)
index 0000000..7e56da9
--- /dev/null
@@ -0,0 +1,257 @@
+/* SPDX-License-Identifier: Apache-2.0 */
+/**
+ * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+ *
+ * @file   nnstreamer-edge-queue.c
+ * @date   24 August 2022
+ * @brief  Thread-safe queue.
+ * @see    https://github.com/nnstreamer/nnstreamer-edge
+ * @author Jaeyun Jung <jy1210.jung@samsung.com>
+ */
+
+#include "nnstreamer-edge-log.h"
+#include "nnstreamer-edge-queue.h"
+#include "nnstreamer-edge-util.h"
+
+/**
+ * @brief Internal structure for queue data.
+ */
+typedef struct _nns_edge_queue_data_s nns_edge_queue_data_s;
+
+/**
+ * @brief Internal structure for queue data.
+ */
+struct _nns_edge_queue_data_s
+{
+  void *data;
+  nns_edge_queue_data_destroy_cb destroy;
+  nns_edge_queue_data_s *next;
+};
+
+/**
+ * @brief Internal structure for queue.
+ */
+typedef struct
+{
+  pthread_mutex_t lock;
+  pthread_cond_t cond;
+
+  unsigned int length;
+  nns_edge_queue_data_s *head;
+  nns_edge_queue_data_s *tail;
+} nns_edge_queue_s;
+
+/**
+ * @brief Pop data from queue.
+ * @note This function should be called with lock.
+ */
+static void *
+_pop_data (nns_edge_queue_s * q)
+{
+  nns_edge_queue_data_s *qdata;
+  void *data = NULL;
+
+  qdata = q->head;
+  if (qdata) {
+    data = qdata->data;
+
+    q->head = qdata->next;
+    if ((--q->length) == 0U)
+      q->head = q->tail = NULL;
+
+    free (qdata);
+  }
+
+  return data;
+}
+
+/**
+ * @brief Create queue.
+ */
+bool
+nns_edge_queue_create (nns_edge_queue_h * handle)
+{
+  nns_edge_queue_s *q;
+
+  if (!handle) {
+    nns_edge_loge ("[Queue] Invalid param, handle is null.");
+    return false;
+  }
+
+  q = calloc (1, sizeof (nns_edge_queue_s));
+  if (!q) {
+    nns_edge_loge ("[Queue] Failed to allocate new memory.");
+    return false;
+  }
+
+  nns_edge_lock_init (q);
+  nns_edge_cond_init (q);
+
+  *handle = q;
+  return true;
+}
+
+/**
+ * @brief Destroy queue.
+ */
+bool
+nns_edge_queue_destroy (nns_edge_queue_h handle)
+{
+  nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
+  nns_edge_queue_data_s *qdata;
+
+  if (!q) {
+    nns_edge_loge ("[Queue] Invalid param, queue is null.");
+    return false;
+  }
+
+  nns_edge_lock (q);
+  nns_edge_cond_signal (q);
+  while ((qdata = q->head) != NULL) {
+    if (qdata->destroy)
+      qdata->destroy (qdata->data);
+
+    q->head = qdata->next;
+    free (qdata);
+  }
+
+  q->head = q->tail = NULL;
+  q->length = 0U;
+  nns_edge_unlock (q);
+
+  nns_edge_cond_destroy (q);
+  nns_edge_lock_destroy (q);
+  free (q);
+
+  return true;
+}
+
+/**
+ * @brief Get the length of the queue.
+ */
+unsigned int
+nns_edge_queue_get_length (nns_edge_queue_h handle)
+{
+  nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
+  unsigned int len;
+
+  if (!q) {
+    nns_edge_loge ("[Queue] Invalid param, queue is null.");
+    return 0;
+  }
+
+  nns_edge_lock (q);
+  len = q->length;
+  nns_edge_unlock (q);
+
+  return len;
+}
+
+/**
+ * @brief Add new data into queue.
+ */
+bool
+nns_edge_queue_push (nns_edge_queue_h handle, void *data,
+    nns_edge_queue_data_destroy_cb destroy)
+{
+  nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
+  nns_edge_queue_data_s *qdata;
+
+  if (!q) {
+    nns_edge_loge ("[Queue] Invalid param, queue is null.");
+    return false;
+  }
+
+  if (!data) {
+    nns_edge_loge ("[Queue] Invalid param, data is null.");
+    return false;
+  }
+
+  qdata = calloc (1, sizeof (nns_edge_queue_data_s));
+  if (!qdata) {
+    nns_edge_loge ("[Queue] Failed to allocate new memory for data.");
+    return false;
+  }
+
+  qdata->data = data;
+  qdata->destroy = destroy;
+
+  nns_edge_lock (q);
+  if (!q->head)
+    q->head = qdata;
+  if (q->tail)
+    q->tail->next = qdata;
+  q->tail = qdata;
+  q->length++;
+  nns_edge_cond_signal (q);
+  nns_edge_unlock (q);
+
+  return true;
+}
+
+/**
+ * @brief Remove and return the first data in queue.
+ */
+bool
+nns_edge_queue_pop (nns_edge_queue_h handle, void **data)
+{
+  nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
+
+  if (!q) {
+    nns_edge_loge ("[Queue] Invalid param, queue is null.");
+    return false;
+  }
+
+  if (!data) {
+    nns_edge_loge ("[Queue] Invalid param, data is null.");
+    return false;
+  }
+
+  nns_edge_lock (q);
+  *data = _pop_data (q);
+  nns_edge_unlock (q);
+
+  return (*data != NULL);
+}
+
+/**
+ * @brief Remove and return the first data in queue. If queue is empty, wait until new data is added in the queue.
+ */
+bool
+nns_edge_queue_wait_pop (nns_edge_queue_h handle, unsigned int timeout,
+    void **data)
+{
+  nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
+
+  if (!q) {
+    nns_edge_loge ("[Queue] Invalid param, queue is null.");
+    return false;
+  }
+
+  if (!data) {
+    nns_edge_loge ("[Queue] Invalid param, data is null.");
+    return false;
+  }
+
+  nns_edge_lock (q);
+  if (q->length == 0U) {
+    if (timeout > 0) {
+      struct timespec ts;
+      struct timeval now;
+
+      gettimeofday (&now, NULL);
+
+      ts.tv_sec = now.tv_sec + timeout / 1000;
+      ts.tv_nsec = now.tv_usec * 1000 + (timeout % 1000) * 1000000;
+
+      nns_edge_cond_timedwait (q, &ts);
+    } else {
+      nns_edge_cond_wait (q);
+    }
+  }
+
+  *data = _pop_data (q);
+  nns_edge_unlock (q);
+
+  return (*data != NULL);
+}
diff --git a/src/libnnstreamer-edge/nnstreamer-edge-queue.h b/src/libnnstreamer-edge/nnstreamer-edge-queue.h
new file mode 100644 (file)
index 0000000..4f7edd3
--- /dev/null
@@ -0,0 +1,80 @@
+/* SPDX-License-Identifier: Apache-2.0 */
+/**
+ * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+ *
+ * @file   nnstreamer-edge-queue.h
+ * @date   24 August 2022
+ * @brief  Thread-safe queue.
+ * @see    https://github.com/nnstreamer/nnstreamer-edge
+ * @author Jaeyun Jung <jy1210.jung@samsung.com>
+ * @note   This file is internal header for nnstreamer-edge. DO NOT export this file.
+ */
+
+#ifndef __NNSTREAMER_EDGE_QUEUE_H__
+#define __NNSTREAMER_EDGE_QUEUE_H__
+
+#include <stdbool.h>
+#include <sys/time.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif /* __cplusplus */
+
+typedef void *nns_edge_queue_h;
+
+/**
+ * @brief Callback called when data in queue is released.
+ */
+typedef void (*nns_edge_queue_data_destroy_cb) (void *data);
+
+/**
+ * @brief Create queue.
+ * @param[out] handle Newly created handle.
+ * @return true on success.
+ */
+bool nns_edge_queue_create (nns_edge_queue_h *handle);
+
+/**
+ * @brief Destroy queue.
+ * @param[in] handle The queue handle.
+ * @return true on success.
+ */
+bool nns_edge_queue_destroy (nns_edge_queue_h handle);
+
+/**
+ * @brief Get the length of the queue.
+ * @param[in] handle The queue handle.
+ * @return The number of data in the queue.
+ */
+unsigned int nns_edge_queue_get_length (nns_edge_queue_h handle);
+
+/**
+ * @brief Add new data into queue.
+ * @param[in] handle The queue handle.
+ * @param[in] data The data to be added.
+ * @param[in] destroy Nullable, the callback function to release data.
+ * @return true on success.
+ */
+bool nns_edge_queue_push (nns_edge_queue_h handle, void *data, nns_edge_queue_data_destroy_cb destroy);
+
+/**
+ * @brief Remove and return the first data in queue.
+ * @param[in] handle The queue handle.
+ * @param[out] data The data in the queue.
+ * @return true on success. false if queue is empty.
+ */
+bool nns_edge_queue_pop (nns_edge_queue_h handle, void **data);
+
+/**
+ * @brief Remove and return the first data in queue. If queue is empty, wait until new data is added in the queue.
+ * @param[in] handle The queue handle.
+ * @param[in] timeout The time to wait for new data, in milliseconds. (0 for infinite timeout)
+ * @param[out] data The data in the queue.
+ * @return true on success.
+ */
+bool nns_edge_queue_wait_pop (nns_edge_queue_h handle, unsigned int timeout, void **data);
+
+#ifdef __cplusplus
+}
+#endif /* __cplusplus */
+#endif /* __NNSTREAMER_EDGE_QUEUE_H__ */
index 97aa77db27bc1a328f3c4d9a2eb20cb004ba1c78..b10a958c58228657221854e9382db097d2d02f48 100644 (file)
@@ -43,6 +43,12 @@ extern "C" {
 #define nns_edge_lock(h) do { pthread_mutex_lock (&(h)->lock); } while (0)
 #define nns_edge_unlock(h) do { pthread_mutex_unlock (&(h)->lock); } while (0)
 
+#define nns_edge_cond_init(h) do { pthread_cond_init (&(h)->cond, NULL); } while (0)
+#define nns_edge_cond_destroy(h) do { pthread_cond_destroy (&(h)->cond); } while (0)
+#define nns_edge_cond_wait(h) do { pthread_cond_wait (&(h)->cond, &(h)->lock); } while (0)
+#define nns_edge_cond_timedwait(h,t) do { pthread_cond_timedwait (&(h)->cond, &(h)->lock, (t)); } while (0)
+#define nns_edge_cond_signal(h) do { pthread_cond_signal (&(h)->cond); } while (0)
+
 /**
  * @brief Get available port number.
  */
index e686dc0ee382a63802432d3ef621587d1280950a..64672210df0d7b9205f3ca7d6bbb776db1dbf027 100644 (file)
@@ -13,6 +13,7 @@
 #include "nnstreamer-edge-internal.h"\r
 #include "nnstreamer-edge-log.h"\r
 #include "nnstreamer-edge-util.h"\r
+#include "nnstreamer-edge-queue.h"\r
 \r
 /**\r
  * @brief Data struct for unittest.\r
@@ -2985,6 +2986,262 @@ TEST(edgeMeta, deserializeInvalidParam03_n)
   free (data);\r
 }\r
 \r
+/**\r
+ * @brief Thread to push new data into queue.\r
+ */\r
+static void *\r
+_test_thread_edge_queue_push (void *thread_data)\r
+{\r
+  nns_edge_queue_h queue_h = thread_data;\r
+  unsigned int i, j;\r
+  void *data;\r
+\r
+  for (i = 0; i < 6U; i++) {\r
+    usleep (50000);\r
+\r
+    data = malloc (5 * sizeof (unsigned int));\r
+    if (data) {\r
+      for (j = 0; j < 5U; j++)\r
+        ((unsigned int *) data)[j] = i * 10U + j;\r
+    }\r
+\r
+    EXPECT_TRUE (nns_edge_queue_push (queue_h, data, nns_edge_free));\r
+  }\r
+\r
+  return NULL;\r
+}\r
+\r
+/**\r
+ * @brief Push and pop data.\r
+ */\r
+TEST(edgeQueue, pushData)\r
+{\r
+  nns_edge_queue_h queue_h;\r
+  void *data1, *data2, *data3, *result;\r
+  unsigned int i, len;\r
+\r
+  data1 = malloc (5 * sizeof (unsigned int));\r
+  ASSERT_TRUE (data1 != NULL);\r
+  for (i = 0; i < 5U; i++)\r
+    ((unsigned int *) data1)[i] = i + 10U;\r
+\r
+  data2 = malloc (5 * sizeof (unsigned int));\r
+  ASSERT_TRUE (data1 != NULL);\r
+  for (i = 0; i < 5U; i++)\r
+    ((unsigned int *) data2)[i] = i + 20U;\r
+\r
+  data3 = malloc (5 * sizeof (unsigned int));\r
+  ASSERT_TRUE (data1 != NULL);\r
+  for (i = 0; i < 5U; i++)\r
+    ((unsigned int *) data3)[i] = i + 30U;\r
+\r
+  EXPECT_TRUE (nns_edge_queue_create (&queue_h));\r
+\r
+  EXPECT_TRUE (nns_edge_queue_push (queue_h, data1, NULL));\r
+  len = nns_edge_queue_get_length (queue_h);\r
+  EXPECT_EQ (len, 1U);\r
+\r
+  EXPECT_TRUE (nns_edge_queue_push (queue_h, data2, NULL));\r
+  len = nns_edge_queue_get_length (queue_h);\r
+  EXPECT_EQ (len, 2U);\r
+\r
+  EXPECT_TRUE (nns_edge_queue_push (queue_h, data3, NULL));\r
+  len = nns_edge_queue_get_length (queue_h);\r
+  EXPECT_EQ (len, 3U);\r
+\r
+  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result));\r
+  len = nns_edge_queue_get_length (queue_h);\r
+  EXPECT_EQ (len, 2U);\r
+  EXPECT_EQ (result, data1);\r
+  for (i = 0; i < 5U; i++)\r
+    EXPECT_EQ (((unsigned int *) result)[i], i + 10U);\r
+\r
+  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result));\r
+  len = nns_edge_queue_get_length (queue_h);\r
+  EXPECT_EQ (len, 1U);\r
+  EXPECT_EQ (result, data2);\r
+  for (i = 0; i < 5U; i++)\r
+    EXPECT_EQ (((unsigned int *) result)[i], i + 20U);\r
+\r
+  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result));\r
+  len = nns_edge_queue_get_length (queue_h);\r
+  EXPECT_EQ (len, 0U);\r
+  EXPECT_EQ (result, data3);\r
+  for (i = 0; i < 5U; i++)\r
+    EXPECT_EQ (((unsigned int *) result)[i], i + 30U);\r
+\r
+  EXPECT_TRUE (nns_edge_queue_push (queue_h, data1, nns_edge_free));\r
+  len = nns_edge_queue_get_length (queue_h);\r
+  EXPECT_EQ (len, 1U);\r
+\r
+  EXPECT_TRUE (nns_edge_queue_push (queue_h, data2, nns_edge_free));\r
+  len = nns_edge_queue_get_length (queue_h);\r
+  EXPECT_EQ (len, 2U);\r
+\r
+  EXPECT_TRUE (nns_edge_queue_push (queue_h, data3, nns_edge_free));\r
+  len = nns_edge_queue_get_length (queue_h);\r
+  EXPECT_EQ (len, 3U);\r
+\r
+  EXPECT_TRUE (nns_edge_queue_destroy (queue_h));\r
+}\r
+\r
+/**\r
+ * @brief Wait for pushing data.\r
+ */\r
+TEST(edgeQueue, pushDataOnThread)\r
+{\r
+  nns_edge_queue_h queue_h;\r
+  pthread_t push_thread;\r
+  pthread_attr_t attr;\r
+  unsigned int i, j, len, retry;\r
+\r
+  EXPECT_TRUE (nns_edge_queue_create (&queue_h));\r
+\r
+  pthread_attr_init (&attr);\r
+  pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);\r
+  pthread_create (&push_thread, &attr, _test_thread_edge_queue_push, queue_h);\r
+  pthread_attr_destroy (&attr);\r
+\r
+  for (i = 0; i < 3U; i++) {\r
+    void *result = NULL;\r
+\r
+    EXPECT_TRUE (nns_edge_queue_wait_pop (queue_h, 0U, &result));\r
+\r
+    for (j = 0; j < 5U; j++)\r
+      EXPECT_EQ (((unsigned int *) result)[j], i * 10U + j);\r
+\r
+    free (result);\r
+  }\r
+\r
+  retry = 0U;\r
+  do {\r
+    usleep (20000);\r
+    len = nns_edge_queue_get_length (queue_h);\r
+  } while (len < 3U && retry++ < 200U);\r
+\r
+  EXPECT_TRUE (nns_edge_queue_destroy (queue_h));\r
+}\r
+\r
+/**\r
+ * @brief Create queue - invalid param.\r
+ */\r
+TEST(edgeQueue, createInvalidParam01_n)\r
+{\r
+  EXPECT_FALSE (nns_edge_queue_create (NULL));\r
+}\r
+\r
+/**\r
+ * @brief Destroy queue - invalid param.\r
+ */\r
+TEST(edgeQueue, destroyInvalidParam01_n)\r
+{\r
+  EXPECT_FALSE (nns_edge_queue_destroy (NULL));\r
+}\r
+\r
+/**\r
+ * @brief Get length of queue - invalid param.\r
+ */\r
+TEST(edgeQueue, getLengthInvalidParam01_n)\r
+{\r
+  unsigned int len;\r
+\r
+  len = nns_edge_queue_get_length (NULL);\r
+  EXPECT_EQ (len, 0U);\r
+}\r
+\r
+/**\r
+ * @brief Push data into queue - invalid param.\r
+ */\r
+TEST(edgeQueue, pushInvalidParam01_n)\r
+{\r
+  void *data;\r
+  unsigned int i;\r
+\r
+  data = malloc (5 * sizeof (unsigned int));\r
+  ASSERT_TRUE (data != NULL);\r
+\r
+  EXPECT_FALSE (nns_edge_queue_push (NULL, data, NULL));\r
+\r
+  free (data);\r
+}\r
+\r
+/**\r
+ * @brief Push data into queue - invalid param.\r
+ */\r
+TEST(edgeQueue, pushInvalidParam02_n)\r
+{\r
+  nns_edge_queue_h queue_h;\r
+\r
+  EXPECT_TRUE (nns_edge_queue_create (&queue_h));\r
+\r
+  EXPECT_FALSE (nns_edge_queue_push (queue_h, NULL, NULL));\r
+\r
+  EXPECT_TRUE (nns_edge_queue_destroy (queue_h));\r
+}\r
+\r
+/**\r
+ * @brief Pop data from queue - invalid param.\r
+ */\r
+TEST(edgeQueue, popInvalidParam01_n)\r
+{\r
+  void *data;\r
+\r
+  EXPECT_FALSE (nns_edge_queue_pop (NULL, &data));\r
+}\r
+\r
+/**\r
+ * @brief Pop data from queue - invalid param.\r
+ */\r
+TEST(edgeQueue, popInvalidParam02_n)\r
+{\r
+  nns_edge_queue_h queue_h;\r
+\r
+  EXPECT_TRUE (nns_edge_queue_create (&queue_h));\r
+\r
+  EXPECT_FALSE (nns_edge_queue_pop (queue_h, NULL));\r
+\r
+  EXPECT_TRUE (nns_edge_queue_destroy (queue_h));\r
+}\r
+\r
+/**\r
+ * @brief Wait and pop data from queue, timed out.\r
+ */\r
+TEST(edgeQueue, waitPopTimedout)\r
+{\r
+  nns_edge_queue_h queue_h;\r
+  void *data;\r
+\r
+  EXPECT_TRUE (nns_edge_queue_create (&queue_h));\r
+\r
+  EXPECT_FALSE (nns_edge_queue_wait_pop (queue_h, 10U, &data));\r
+\r
+  EXPECT_TRUE (nns_edge_queue_destroy (queue_h));\r
+}\r
+\r
+/**\r
+ * @brief Wait and pop data from queue - invalid param.\r
+ */\r
+TEST(edgeQueue, waitPopInvalidParam01_n)\r
+{\r
+  void *data;\r
+\r
+  EXPECT_FALSE (nns_edge_queue_wait_pop (NULL, 0U, &data));\r
+}\r
+\r
+/**\r
+ * @brief Wait and pop data from queue - invalid param.\r
+ */\r
+TEST(edgeQueue, waitPopInvalidParam02_n)\r
+{\r
+  nns_edge_queue_h queue_h;\r
+\r
+  EXPECT_TRUE (nns_edge_queue_create (&queue_h));\r
+\r
+  EXPECT_FALSE (nns_edge_queue_wait_pop (queue_h, 0U, NULL));\r
+\r
+  EXPECT_TRUE (nns_edge_queue_destroy (queue_h));\r
+}\r
+\r
 #if defined(ENABLE_MQTT)\r
 /**\r
  * @brief Edge event callback for test.\r