[Queue] limit in data queue
authorJaeyun <jy1210.jung@samsung.com>
Thu, 13 Oct 2022 09:06:37 +0000 (18:06 +0900)
committerjaeyun-jung <39614140+jaeyun-jung@users.noreply.github.com>
Wed, 19 Oct 2022 07:11:54 +0000 (16:11 +0900)
Set max number in queue when sending data to other node.

Signed-off-by: Jaeyun <jy1210.jung@samsung.com>
include/nnstreamer-edge.h
src/libnnstreamer-edge/nnstreamer-edge-internal.c
src/libnnstreamer-edge/nnstreamer-edge-queue.c
src/libnnstreamer-edge/nnstreamer-edge-queue.h
tests/unittest_nnstreamer-edge.cc

index f2547e30bca2ff32252b913b99e6bba5312d47b2..3d7e33f7f12af762a847baf8cfff1a4c8e9a4d9a 100644 (file)
@@ -290,6 +290,7 @@ int nns_edge_send (nns_edge_h edge_h, nns_edge_data_h data_h);
  * DEST_IP or DEST_HOST | IP address of the destination node. In case of TCP connection, it is the IP address of the destination node, and in the case of Hybrid or AITT connection, it is the IP address of the broker.
  * DEST_PORT            | Port of the destination node. In case of TCP connection, it is the port number of the destination node, and in the case of Hybrid or AITT connection, it is the port number of the broker. The value should be 0 or higher.
  * TOPIC                | Topic used to publish/subscribe to/from the broker.
+ * QUEUE_SIZE           | Max number of data in the queue, when sending edge data to other node. Default 0 means unlimited.
  * ID or CLIENT_ID      | Unique identifier of the edge handle or client ID. (Read-only)
  */
 int nns_edge_set_info (nns_edge_h edge_h, const char *key, const char *value);
index 81f9cd7211cacdb24123e764361c11512a5be04c..2b79ab7908cbdfb758c5c393236b8ac43cd0cd7a 100644 (file)
@@ -1764,6 +1764,9 @@ nns_edge_set_info (nns_edge_h edge_h, const char *key, const char *value)
     /* Not allowed key */
     nns_edge_loge ("Cannot update %s.", key);
     ret = NNS_EDGE_ERROR_INVALID_PARAMETER;
+  } else if (0 == strcasecmp (key, "QUEUE_SIZE")) {
+    unsigned int limit = (unsigned int) strtoull (value, NULL, 10);
+    nns_edge_queue_set_limit (eh->send_queue, limit);
   } else {
     ret = nns_edge_metadata_set (eh->metadata, key, value);
   }
index 7e56da9cc481490d946d874e4bfc7460e6ea0a84..c51b96eb024851929c2bef3e2f0e70c741762e25 100644 (file)
@@ -36,6 +36,7 @@ typedef struct
   pthread_mutex_t lock;
   pthread_cond_t cond;
 
+  unsigned int max_data; /**< Max data in queue (default 0 means unlimited) */
   unsigned int length;
   nns_edge_queue_data_s *head;
   nns_edge_queue_data_s *tail;
@@ -147,6 +148,26 @@ nns_edge_queue_get_length (nns_edge_queue_h handle)
   return len;
 }
 
+/**
+ * @brief Set the max length of the queue.
+ */
+bool
+nns_edge_queue_set_limit (nns_edge_queue_h handle, unsigned int limit)
+{
+  nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
+
+  if (!q) {
+    nns_edge_loge ("[Queue] Invalid param, queue is null.");
+    return false;
+  }
+
+  nns_edge_lock (q);
+  q->max_data = limit;
+  nns_edge_unlock (q);
+
+  return true;
+}
+
 /**
  * @brief Add new data into queue.
  */
@@ -177,12 +198,17 @@ nns_edge_queue_push (nns_edge_queue_h handle, void *data,
   qdata->destroy = destroy;
 
   nns_edge_lock (q);
-  if (!q->head)
-    q->head = qdata;
-  if (q->tail)
-    q->tail->next = qdata;
-  q->tail = qdata;
-  q->length++;
+  if (q->max_data > 0U && q->length >= q->max_data) {
+    nns_edge_logw ("[Queue] Cannot push new data, max data in queue is %u.",
+        q->max_data);
+  } else {
+    if (!q->head)
+      q->head = qdata;
+    if (q->tail)
+      q->tail->next = qdata;
+    q->tail = qdata;
+    q->length++;
+  }
   nns_edge_cond_signal (q);
   nns_edge_unlock (q);
 
index 25c7a9f6e887fffac2e5006c6021af623a7e88ed..e8bd6dca6a9cd39934cf7f6101c46ba5d8784bbb 100644 (file)
@@ -49,6 +49,14 @@ bool nns_edge_queue_destroy (nns_edge_queue_h handle);
  */
 unsigned int nns_edge_queue_get_length (nns_edge_queue_h handle);
 
+/**
+ * @brief Set the max length of the queue.
+ * @param[in] handle The queue handle.
+ * @param[in] limit The max data in queue. Default 0 means unlimited.
+ * @return true on success.
+ */
+bool nns_edge_queue_set_limit (nns_edge_queue_h handle, unsigned int limit);
+
 /**
  * @brief Add new data into queue.
  * @param[in] handle The queue handle.
index 6476665e93326f7183e2adba7279bfb12f535a8d..e4ec8a0c5ca9eb69f72a4cf2ca8bcb63b6928ecc 100644 (file)
@@ -158,6 +158,7 @@ TEST(edge, connectLocal)
   nns_edge_set_info (server_h, "IP", "127.0.0.1");
   nns_edge_set_info (server_h, "PORT", val);
   nns_edge_set_info (server_h, "CAPS", "test server");
+  nns_edge_set_info (server_h, "QUEUE_SIZE", "10");
   _td_server->handle = server_h;
   nns_edge_free (val);
 
@@ -3311,6 +3312,40 @@ TEST(edgeQueue, getLengthInvalidParam01_n)
   EXPECT_EQ (len, 0U);
 }
 
+/**
+ * @brief Set limit of queue.
+ */
+TEST(edgeQueue, setLimit)
+{
+  nns_edge_queue_h queue_h;
+  void *data;
+  unsigned int i, len;
+
+  data = malloc (sizeof (unsigned int));
+  ASSERT_TRUE (data != NULL);
+
+  EXPECT_TRUE (nns_edge_queue_create (&queue_h));
+  EXPECT_TRUE (nns_edge_queue_set_limit (queue_h, 3U));
+
+  for (i = 0; i < 5U; i++)
+    nns_edge_queue_push (queue_h, data, NULL);
+
+  len = nns_edge_queue_get_length (queue_h);
+  EXPECT_EQ (len, 3U);
+
+  EXPECT_TRUE (nns_edge_queue_destroy (queue_h));
+
+  free (data);
+}
+
+/**
+ * @brief Set limit of queue - invalid param.
+ */
+TEST(edgeQueue, setLimitInvalidParam01_n)
+{
+  EXPECT_FALSE (nns_edge_queue_set_limit (NULL, 5U));
+}
+
 /**
  * @brief Push data into queue - invalid param.
  */