From 6bd5bbde9c8fb78d3a12c47c9910f8f9d170482a Mon Sep 17 00:00:00 2001 From: Jaeyun Date: Wed, 19 Oct 2022 16:16:51 +0900 Subject: [PATCH] [Queue] leaky option Add leaky option in nns-edge queue. Signed-off-by: Jaeyun --- include/nnstreamer-edge.h | 2 +- .../nnstreamer-edge-internal.c | 26 ++++- .../nnstreamer-edge-queue.c | 54 ++++++--- .../nnstreamer-edge-queue.h | 12 +- tests/unittest_nnstreamer-edge.cc | 104 +++++++++++++++++- 5 files changed, 174 insertions(+), 24 deletions(-) diff --git a/include/nnstreamer-edge.h b/include/nnstreamer-edge.h index 0f8367d..7d2dd74 100644 --- a/include/nnstreamer-edge.h +++ b/include/nnstreamer-edge.h @@ -305,7 +305,7 @@ int nns_edge_send (nns_edge_h edge_h, nns_edge_data_h data_h); * DEST_IP or DEST_HOST | IP address of the destination node. In case of TCP connection, it is the IP address of the destination node, and in the case of Hybrid or AITT connection, it is the IP address of the broker. * DEST_PORT | Port of the destination node. In case of TCP connection, it is the port number of the destination node, and in the case of Hybrid or AITT connection, it is the port number of the broker. The value should be 0 or higher. * TOPIC | Topic used to publish/subscribe to/from the broker. - * QUEUE_SIZE | Max number of data in the queue, when sending edge data to other node. Default 0 means unlimited. + * QUEUE_SIZE | Max number of data in the queue, when sending edge data to other node. Default 0 means unlimited. N: where leaky 'OLD' drops old buffer (default NEW). (e.g., QUEUE_SIZE=5:OLD drops old buffer and pushes new data when queue size reaches 5.) * ID or CLIENT_ID | Unique identifier of the edge handle or client ID. (Read-only) */ int nns_edge_set_info (nns_edge_h edge_h, const char *key, const char *value); diff --git a/src/libnnstreamer-edge/nnstreamer-edge-internal.c b/src/libnnstreamer-edge/nnstreamer-edge-internal.c index 8f8274c..942ad07 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-internal.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-internal.c @@ -1772,8 +1772,30 @@ nns_edge_set_info (nns_edge_h edge_h, const char *key, const char *value) nns_edge_loge ("Cannot update %s.", key); ret = NNS_EDGE_ERROR_INVALID_PARAMETER; } else if (0 == strcasecmp (key, "QUEUE_SIZE")) { - unsigned int limit = (unsigned int) strtoull (value, NULL, 10); - nns_edge_queue_set_limit (eh->send_queue, limit); + char *s; + unsigned int limit; + nns_edge_queue_leak_e leaky = NNS_EDGE_QUEUE_LEAK_UNKNOWN; + + s = strstr (value, ":"); + if (s) { + char *v = nns_edge_strndup (value, s - value); + + limit = (unsigned int) strtoull (v, NULL, 10); + nns_edge_free (v); + + if (strcasecmp (s + 1, "NEW") == 0) { + leaky = NNS_EDGE_QUEUE_LEAK_NEW; + } else if (strcasecmp (s + 1, "OLD") == 0) { + leaky = NNS_EDGE_QUEUE_LEAK_OLD; + } else { + nns_edge_loge ("Cannot set queue leaky option (%s).", s + 1); + ret = NNS_EDGE_ERROR_INVALID_PARAMETER; + } + } else { + limit = (unsigned int) strtoull (value, NULL, 10); + } + + nns_edge_queue_set_limit (eh->send_queue, limit, leaky); } else { ret = nns_edge_metadata_set (eh->metadata, key, value); } diff --git a/src/libnnstreamer-edge/nnstreamer-edge-queue.c b/src/libnnstreamer-edge/nnstreamer-edge-queue.c index b8f533a..d96f26e 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-queue.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-queue.c @@ -37,6 +37,7 @@ typedef struct pthread_mutex_t lock; pthread_cond_t cond; + nns_edge_queue_leak_e leaky; unsigned int max_data; /**< Max data in queue (default 0 means unlimited) */ unsigned int length; nns_edge_queue_data_s *head; @@ -44,23 +45,28 @@ typedef struct } nns_edge_queue_s; /** - * @brief Pop data from queue. + * @brief Pop data from queue. If the param 'clear' is true, release old data and return null. * @note This function should be called with lock. */ static void * -_pop_data (nns_edge_queue_s * q) +_pop_data (nns_edge_queue_s * q, bool clear) { nns_edge_queue_data_s *qdata; void *data = NULL; qdata = q->head; if (qdata) { - data = qdata->data; - q->head = qdata->next; if ((--q->length) == 0U) q->head = q->tail = NULL; + if (clear) { + if (qdata->destroy) + qdata->destroy (qdata->data); + } else { + data = qdata->data; + } + free (qdata); } @@ -88,6 +94,7 @@ nns_edge_queue_create (nns_edge_queue_h * handle) nns_edge_lock_init (q); nns_edge_cond_init (q); + q->leaky = NNS_EDGE_QUEUE_LEAK_NEW; *handle = q; return true; @@ -153,7 +160,8 @@ nns_edge_queue_get_length (nns_edge_queue_h handle) * @brief Set the max length of the queue. */ bool -nns_edge_queue_set_limit (nns_edge_queue_h handle, unsigned int limit) +nns_edge_queue_set_limit (nns_edge_queue_h handle, unsigned int limit, + nns_edge_queue_leak_e leaky) { nns_edge_queue_s *q = (nns_edge_queue_s *) handle; @@ -164,6 +172,8 @@ nns_edge_queue_set_limit (nns_edge_queue_h handle, unsigned int limit) nns_edge_lock (q); q->max_data = limit; + if (leaky != NNS_EDGE_QUEUE_LEAK_UNKNOWN) + q->leaky = leaky; nns_edge_unlock (q); return true; @@ -178,6 +188,7 @@ nns_edge_queue_push (nns_edge_queue_h handle, void *data, { nns_edge_queue_s *q = (nns_edge_queue_s *) handle; nns_edge_queue_data_s *qdata; + bool pushed = false; if (!q) { nns_edge_loge ("[Queue] Invalid param, queue is null."); @@ -200,20 +211,29 @@ nns_edge_queue_push (nns_edge_queue_h handle, void *data, nns_edge_lock (q); if (q->max_data > 0U && q->length >= q->max_data) { - nns_edge_logw ("[Queue] Cannot push new data, max data in queue is %u.", - q->max_data); - } else { - if (!q->head) - q->head = qdata; - if (q->tail) - q->tail->next = qdata; - q->tail = qdata; - q->length++; + /* Clear old data in queue if leaky option is 'old'. */ + if (q->leaky == NNS_EDGE_QUEUE_LEAK_OLD) { + _pop_data (q, true); + } else { + nns_edge_logw ("[Queue] Cannot push new data, max data in queue is %u.", + q->max_data); + goto done; + } } + + if (!q->head) + q->head = qdata; + if (q->tail) + q->tail->next = qdata; + q->tail = qdata; + q->length++; + pushed = true; + +done: nns_edge_cond_signal (q); nns_edge_unlock (q); - return true; + return pushed; } /** @@ -235,7 +255,7 @@ nns_edge_queue_pop (nns_edge_queue_h handle, void **data) } nns_edge_lock (q); - *data = _pop_data (q); + *data = _pop_data (q, false); nns_edge_unlock (q); return (*data != NULL); @@ -277,7 +297,7 @@ nns_edge_queue_wait_pop (nns_edge_queue_h handle, unsigned int timeout, } } - *data = _pop_data (q); + *data = _pop_data (q, false); nns_edge_unlock (q); return (*data != NULL); diff --git a/src/libnnstreamer-edge/nnstreamer-edge-queue.h b/src/libnnstreamer-edge/nnstreamer-edge-queue.h index e8bd6dc..2a48064 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-queue.h +++ b/src/libnnstreamer-edge/nnstreamer-edge-queue.h @@ -28,6 +28,15 @@ typedef void *nns_edge_queue_h; */ typedef void (*nns_edge_queue_data_destroy_cb) (void *data); +/** + * @brief Enumeration for the queue leaky option. + */ +typedef enum { + NNS_EDGE_QUEUE_LEAK_UNKNOWN = 0, + NNS_EDGE_QUEUE_LEAK_NEW, + NNS_EDGE_QUEUE_LEAK_OLD +} nns_edge_queue_leak_e; + /** * @brief Create queue. * @param[out] handle Newly created handle. @@ -53,9 +62,10 @@ unsigned int nns_edge_queue_get_length (nns_edge_queue_h handle); * @brief Set the max length of the queue. * @param[in] handle The queue handle. * @param[in] limit The max data in queue. Default 0 means unlimited. + * @param[in] leaky The queue leaky option. * @return true on success. */ -bool nns_edge_queue_set_limit (nns_edge_queue_h handle, unsigned int limit); +bool nns_edge_queue_set_limit (nns_edge_queue_h handle, unsigned int limit, nns_edge_queue_leak_e leaky); /** * @brief Add new data into queue. diff --git a/tests/unittest_nnstreamer-edge.cc b/tests/unittest_nnstreamer-edge.cc index 3ceabd0..6acdf53 100644 --- a/tests/unittest_nnstreamer-edge.cc +++ b/tests/unittest_nnstreamer-edge.cc @@ -158,7 +158,7 @@ TEST(edge, connectLocal) nns_edge_set_info (server_h, "IP", "127.0.0.1"); nns_edge_set_info (server_h, "PORT", val); nns_edge_set_info (server_h, "CAPS", "test server"); - nns_edge_set_info (server_h, "QUEUE_SIZE", "10"); + nns_edge_set_info (server_h, "QUEUE_SIZE", "10:OLD"); _td_server->handle = server_h; nns_edge_free (val); @@ -826,6 +826,26 @@ TEST(edge, setInfoInvalidParam08_n) EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); } +/** + * @brief Set info - invalid param. + */ +TEST(edge, setInfoInvalidParam09_n) +{ + nns_edge_h edge_h; + int ret; + + ret = nns_edge_create_handle ("temp-id", NNS_EDGE_CONNECT_TYPE_TCP, + NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + /* Invalid option */ + ret = nns_edge_set_info (edge_h, "QUEUE_SIZE", "15:INVALID_LEAKY"); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_release_handle (edge_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); +} + /** * @brief Get info. */ @@ -3325,7 +3345,7 @@ TEST(edgeQueue, setLimit) ASSERT_TRUE (data != NULL); EXPECT_TRUE (nns_edge_queue_create (&queue_h)); - EXPECT_TRUE (nns_edge_queue_set_limit (queue_h, 3U)); + EXPECT_TRUE (nns_edge_queue_set_limit (queue_h, 3U, NNS_EDGE_QUEUE_LEAK_NEW)); for (i = 0; i < 5U; i++) nns_edge_queue_push (queue_h, data, NULL); @@ -3338,12 +3358,89 @@ TEST(edgeQueue, setLimit) free (data); } +/** + * @brief Set leaky option of queue. + */ +TEST(edgeQueue, setLeaky) +{ + nns_edge_queue_h queue_h; + void *data; + unsigned int i, len; + bool res; + + EXPECT_TRUE (nns_edge_queue_create (&queue_h)); + + /* leaky option new */ + EXPECT_TRUE (nns_edge_queue_set_limit (queue_h, 3U, NNS_EDGE_QUEUE_LEAK_NEW)); + + for (i = 0; i < 5U; i++) { + data = malloc (sizeof (unsigned int)); + ASSERT_TRUE (data != NULL); + + *((unsigned int *) data) = i + 1; + + res = nns_edge_queue_push (queue_h, data, free); + if (i < 3U) { + EXPECT_TRUE (res); + } else { + EXPECT_FALSE (res); + free (data); + } + } + + len = nns_edge_queue_get_length (queue_h); + EXPECT_EQ (len, 3U); + + EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data)); + EXPECT_EQ (*((unsigned int *) data), 1U); + free (data); + EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data)); + EXPECT_EQ (*((unsigned int *) data), 2U); + free (data); + EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data)); + EXPECT_EQ (*((unsigned int *) data), 3U); + free (data); + + len = nns_edge_queue_get_length (queue_h); + EXPECT_EQ (len, 0U); + + /* leaky option old */ + EXPECT_TRUE (nns_edge_queue_set_limit (queue_h, 3U, NNS_EDGE_QUEUE_LEAK_OLD)); + + for (i = 0; i < 5U; i++) { + data = malloc (sizeof (unsigned int)); + ASSERT_TRUE (data != NULL); + + *((unsigned int *) data) = i + 1; + + EXPECT_TRUE (nns_edge_queue_push (queue_h, data, free)); + } + + len = nns_edge_queue_get_length (queue_h); + EXPECT_EQ (len, 3U); + + EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data)); + EXPECT_EQ (*((unsigned int *) data), 3U); + free (data); + EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data)); + EXPECT_EQ (*((unsigned int *) data), 4U); + free (data); + EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data)); + EXPECT_EQ (*((unsigned int *) data), 5U); + free (data); + + len = nns_edge_queue_get_length (queue_h); + EXPECT_EQ (len, 0U); + + EXPECT_TRUE (nns_edge_queue_destroy (queue_h)); +} + /** * @brief Set limit of queue - invalid param. */ TEST(edgeQueue, setLimitInvalidParam01_n) { - EXPECT_FALSE (nns_edge_queue_set_limit (NULL, 5U)); + EXPECT_FALSE (nns_edge_queue_set_limit (NULL, 5U, NNS_EDGE_QUEUE_LEAK_NEW)); } /** @@ -3552,6 +3649,7 @@ TEST(edgeMqtt, connectLocal) nns_edge_set_info (server_h, "DEST_PORT", "1883"); nns_edge_set_info (server_h, "TOPIC", "temp-mqtt-topic"); nns_edge_set_info (server_h, "CAPS", "test server"); + nns_edge_set_info (server_h, "QUEUE_SIZE", "10:NEW"); _td_server->handle = server_h; /* Prepare client */ -- 2.34.1