From: Jaeyun Date: Thu, 13 Oct 2022 09:06:37 +0000 (+0900) Subject: [Queue] limit in data queue X-Git-Tag: accepted/tizen/unified/20221115.172904~17 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=32746b3c782e26eacf74b222c67fcbf1da8a193a;p=platform%2Fupstream%2Fnnstreamer-edge.git [Queue] limit in data queue Set max number in queue when sending data to other node. Signed-off-by: Jaeyun --- diff --git a/include/nnstreamer-edge.h b/include/nnstreamer-edge.h index f2547e3..3d7e33f 100644 --- a/include/nnstreamer-edge.h +++ b/include/nnstreamer-edge.h @@ -290,6 +290,7 @@ int nns_edge_send (nns_edge_h edge_h, nns_edge_data_h data_h); * DEST_IP or DEST_HOST | IP address of the destination node. In case of TCP connection, it is the IP address of the destination node, and in the case of Hybrid or AITT connection, it is the IP address of the broker. * DEST_PORT | Port of the destination node. In case of TCP connection, it is the port number of the destination node, and in the case of Hybrid or AITT connection, it is the port number of the broker. The value should be 0 or higher. * TOPIC | Topic used to publish/subscribe to/from the broker. + * QUEUE_SIZE | Max number of data in the queue, when sending edge data to other node. Default 0 means unlimited. * ID or CLIENT_ID | Unique identifier of the edge handle or client ID. (Read-only) */ int nns_edge_set_info (nns_edge_h edge_h, const char *key, const char *value); diff --git a/src/libnnstreamer-edge/nnstreamer-edge-internal.c b/src/libnnstreamer-edge/nnstreamer-edge-internal.c index 81f9cd7..2b79ab7 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-internal.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-internal.c @@ -1764,6 +1764,9 @@ nns_edge_set_info (nns_edge_h edge_h, const char *key, const char *value) /* Not allowed key */ nns_edge_loge ("Cannot update %s.", key); ret = NNS_EDGE_ERROR_INVALID_PARAMETER; + } else if (0 == strcasecmp (key, "QUEUE_SIZE")) { + unsigned int limit = (unsigned int) strtoull (value, NULL, 10); + nns_edge_queue_set_limit (eh->send_queue, limit); } else { ret = nns_edge_metadata_set (eh->metadata, key, value); } diff --git a/src/libnnstreamer-edge/nnstreamer-edge-queue.c b/src/libnnstreamer-edge/nnstreamer-edge-queue.c index 7e56da9..c51b96e 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-queue.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-queue.c @@ -36,6 +36,7 @@ typedef struct pthread_mutex_t lock; pthread_cond_t cond; + unsigned int max_data; /**< Max data in queue (default 0 means unlimited) */ unsigned int length; nns_edge_queue_data_s *head; nns_edge_queue_data_s *tail; @@ -147,6 +148,26 @@ nns_edge_queue_get_length (nns_edge_queue_h handle) return len; } +/** + * @brief Set the max length of the queue. + */ +bool +nns_edge_queue_set_limit (nns_edge_queue_h handle, unsigned int limit) +{ + nns_edge_queue_s *q = (nns_edge_queue_s *) handle; + + if (!q) { + nns_edge_loge ("[Queue] Invalid param, queue is null."); + return false; + } + + nns_edge_lock (q); + q->max_data = limit; + nns_edge_unlock (q); + + return true; +} + /** * @brief Add new data into queue. */ @@ -177,12 +198,17 @@ nns_edge_queue_push (nns_edge_queue_h handle, void *data, qdata->destroy = destroy; nns_edge_lock (q); - if (!q->head) - q->head = qdata; - if (q->tail) - q->tail->next = qdata; - q->tail = qdata; - q->length++; + if (q->max_data > 0U && q->length >= q->max_data) { + nns_edge_logw ("[Queue] Cannot push new data, max data in queue is %u.", + q->max_data); + } else { + if (!q->head) + q->head = qdata; + if (q->tail) + q->tail->next = qdata; + q->tail = qdata; + q->length++; + } nns_edge_cond_signal (q); nns_edge_unlock (q); diff --git a/src/libnnstreamer-edge/nnstreamer-edge-queue.h b/src/libnnstreamer-edge/nnstreamer-edge-queue.h index 25c7a9f..e8bd6dc 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-queue.h +++ b/src/libnnstreamer-edge/nnstreamer-edge-queue.h @@ -49,6 +49,14 @@ bool nns_edge_queue_destroy (nns_edge_queue_h handle); */ unsigned int nns_edge_queue_get_length (nns_edge_queue_h handle); +/** + * @brief Set the max length of the queue. + * @param[in] handle The queue handle. + * @param[in] limit The max data in queue. Default 0 means unlimited. + * @return true on success. + */ +bool nns_edge_queue_set_limit (nns_edge_queue_h handle, unsigned int limit); + /** * @brief Add new data into queue. * @param[in] handle The queue handle. diff --git a/tests/unittest_nnstreamer-edge.cc b/tests/unittest_nnstreamer-edge.cc index 6476665..e4ec8a0 100644 --- a/tests/unittest_nnstreamer-edge.cc +++ b/tests/unittest_nnstreamer-edge.cc @@ -158,6 +158,7 @@ TEST(edge, connectLocal) nns_edge_set_info (server_h, "IP", "127.0.0.1"); nns_edge_set_info (server_h, "PORT", val); nns_edge_set_info (server_h, "CAPS", "test server"); + nns_edge_set_info (server_h, "QUEUE_SIZE", "10"); _td_server->handle = server_h; nns_edge_free (val); @@ -3311,6 +3312,40 @@ TEST(edgeQueue, getLengthInvalidParam01_n) EXPECT_EQ (len, 0U); } +/** + * @brief Set limit of queue. + */ +TEST(edgeQueue, setLimit) +{ + nns_edge_queue_h queue_h; + void *data; + unsigned int i, len; + + data = malloc (sizeof (unsigned int)); + ASSERT_TRUE (data != NULL); + + EXPECT_TRUE (nns_edge_queue_create (&queue_h)); + EXPECT_TRUE (nns_edge_queue_set_limit (queue_h, 3U)); + + for (i = 0; i < 5U; i++) + nns_edge_queue_push (queue_h, data, NULL); + + len = nns_edge_queue_get_length (queue_h); + EXPECT_EQ (len, 3U); + + EXPECT_TRUE (nns_edge_queue_destroy (queue_h)); + + free (data); +} + +/** + * @brief Set limit of queue - invalid param. + */ +TEST(edgeQueue, setLimitInvalidParam01_n) +{ + EXPECT_FALSE (nns_edge_queue_set_limit (NULL, 5U)); +} + /** * @brief Push data into queue - invalid param. */