[Queue] leaky option
authorJaeyun <jy1210.jung@samsung.com>
Wed, 19 Oct 2022 07:16:51 +0000 (16:16 +0900)
committerjaeyun-jung <39614140+jaeyun-jung@users.noreply.github.com>
Tue, 1 Nov 2022 01:29:48 +0000 (10:29 +0900)
Add leaky option in nns-edge queue.

Signed-off-by: Jaeyun <jy1210.jung@samsung.com>
include/nnstreamer-edge.h
src/libnnstreamer-edge/nnstreamer-edge-internal.c
src/libnnstreamer-edge/nnstreamer-edge-queue.c
src/libnnstreamer-edge/nnstreamer-edge-queue.h
tests/unittest_nnstreamer-edge.cc

index 0f8367d636fe41b83d3c5fd1c269aa4251d0291d..7d2dd74290332553786ab13e223394a17587c35a 100644 (file)
@@ -305,7 +305,7 @@ int nns_edge_send (nns_edge_h edge_h, nns_edge_data_h data_h);
  * DEST_IP or DEST_HOST | IP address of the destination node. In case of TCP connection, it is the IP address of the destination node, and in the case of Hybrid or AITT connection, it is the IP address of the broker.
  * DEST_PORT            | Port of the destination node. In case of TCP connection, it is the port number of the destination node, and in the case of Hybrid or AITT connection, it is the port number of the broker. The value should be 0 or higher.
  * TOPIC                | Topic used to publish/subscribe to/from the broker.
- * QUEUE_SIZE           | Max number of data in the queue, when sending edge data to other node. Default 0 means unlimited.
+ * QUEUE_SIZE           | Max number of data in the queue, when sending edge data to other node. Default 0 means unlimited. N:<leaky [NEW, OLD]> where leaky 'OLD' drops old buffer (default NEW). (e.g., QUEUE_SIZE=5:OLD drops old buffer and pushes new data when queue size reaches 5.)
  * ID or CLIENT_ID      | Unique identifier of the edge handle or client ID. (Read-only)
  */
 int nns_edge_set_info (nns_edge_h edge_h, const char *key, const char *value);
index 8f8274cf6b828c86878f8aed30f20d71a1fbe767..942ad077fabd347196a546e45a2ed68193207ce0 100644 (file)
@@ -1772,8 +1772,30 @@ nns_edge_set_info (nns_edge_h edge_h, const char *key, const char *value)
     nns_edge_loge ("Cannot update %s.", key);
     ret = NNS_EDGE_ERROR_INVALID_PARAMETER;
   } else if (0 == strcasecmp (key, "QUEUE_SIZE")) {
-    unsigned int limit = (unsigned int) strtoull (value, NULL, 10);
-    nns_edge_queue_set_limit (eh->send_queue, limit);
+    char *s;
+    unsigned int limit;
+    nns_edge_queue_leak_e leaky = NNS_EDGE_QUEUE_LEAK_UNKNOWN;
+
+    s = strstr (value, ":");
+    if (s) {
+      char *v = nns_edge_strndup (value, s - value);
+
+      limit = (unsigned int) strtoull (v, NULL, 10);
+      nns_edge_free (v);
+
+      if (strcasecmp (s + 1, "NEW") == 0) {
+        leaky = NNS_EDGE_QUEUE_LEAK_NEW;
+      } else if (strcasecmp (s + 1, "OLD") == 0) {
+        leaky = NNS_EDGE_QUEUE_LEAK_OLD;
+      } else {
+        nns_edge_loge ("Cannot set queue leaky option (%s).", s + 1);
+        ret = NNS_EDGE_ERROR_INVALID_PARAMETER;
+      }
+    } else {
+      limit = (unsigned int) strtoull (value, NULL, 10);
+    }
+
+    nns_edge_queue_set_limit (eh->send_queue, limit, leaky);
   } else {
     ret = nns_edge_metadata_set (eh->metadata, key, value);
   }
index b8f533a2e6687629eec8a5993b97e3f865c3f96f..d96f26ef2108ede2e99316f9c00acf7f479ee5c9 100644 (file)
@@ -37,6 +37,7 @@ typedef struct
   pthread_mutex_t lock;
   pthread_cond_t cond;
 
+  nns_edge_queue_leak_e leaky;
   unsigned int max_data; /**< Max data in queue (default 0 means unlimited) */
   unsigned int length;
   nns_edge_queue_data_s *head;
@@ -44,23 +45,28 @@ typedef struct
 } nns_edge_queue_s;
 
 /**
- * @brief Pop data from queue.
+ * @brief Pop data from queue. If the param 'clear' is true, release old data and return null.
  * @note This function should be called with lock.
  */
 static void *
-_pop_data (nns_edge_queue_s * q)
+_pop_data (nns_edge_queue_s * q, bool clear)
 {
   nns_edge_queue_data_s *qdata;
   void *data = NULL;
 
   qdata = q->head;
   if (qdata) {
-    data = qdata->data;
-
     q->head = qdata->next;
     if ((--q->length) == 0U)
       q->head = q->tail = NULL;
 
+    if (clear) {
+      if (qdata->destroy)
+        qdata->destroy (qdata->data);
+    } else {
+      data = qdata->data;
+    }
+
     free (qdata);
   }
 
@@ -88,6 +94,7 @@ nns_edge_queue_create (nns_edge_queue_h * handle)
 
   nns_edge_lock_init (q);
   nns_edge_cond_init (q);
+  q->leaky = NNS_EDGE_QUEUE_LEAK_NEW;
 
   *handle = q;
   return true;
@@ -153,7 +160,8 @@ nns_edge_queue_get_length (nns_edge_queue_h handle)
  * @brief Set the max length of the queue.
  */
 bool
-nns_edge_queue_set_limit (nns_edge_queue_h handle, unsigned int limit)
+nns_edge_queue_set_limit (nns_edge_queue_h handle, unsigned int limit,
+    nns_edge_queue_leak_e leaky)
 {
   nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
 
@@ -164,6 +172,8 @@ nns_edge_queue_set_limit (nns_edge_queue_h handle, unsigned int limit)
 
   nns_edge_lock (q);
   q->max_data = limit;
+  if (leaky != NNS_EDGE_QUEUE_LEAK_UNKNOWN)
+    q->leaky = leaky;
   nns_edge_unlock (q);
 
   return true;
@@ -178,6 +188,7 @@ nns_edge_queue_push (nns_edge_queue_h handle, void *data,
 {
   nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
   nns_edge_queue_data_s *qdata;
+  bool pushed = false;
 
   if (!q) {
     nns_edge_loge ("[Queue] Invalid param, queue is null.");
@@ -200,20 +211,29 @@ nns_edge_queue_push (nns_edge_queue_h handle, void *data,
 
   nns_edge_lock (q);
   if (q->max_data > 0U && q->length >= q->max_data) {
-    nns_edge_logw ("[Queue] Cannot push new data, max data in queue is %u.",
-        q->max_data);
-  } else {
-    if (!q->head)
-      q->head = qdata;
-    if (q->tail)
-      q->tail->next = qdata;
-    q->tail = qdata;
-    q->length++;
+    /* Clear old data in queue if leaky option is 'old'. */
+    if (q->leaky == NNS_EDGE_QUEUE_LEAK_OLD) {
+      _pop_data (q, true);
+    } else {
+      nns_edge_logw ("[Queue] Cannot push new data, max data in queue is %u.",
+          q->max_data);
+      goto done;
+    }
   }
+
+  if (!q->head)
+    q->head = qdata;
+  if (q->tail)
+    q->tail->next = qdata;
+  q->tail = qdata;
+  q->length++;
+  pushed = true;
+
+done:
   nns_edge_cond_signal (q);
   nns_edge_unlock (q);
 
-  return true;
+  return pushed;
 }
 
 /**
@@ -235,7 +255,7 @@ nns_edge_queue_pop (nns_edge_queue_h handle, void **data)
   }
 
   nns_edge_lock (q);
-  *data = _pop_data (q);
+  *data = _pop_data (q, false);
   nns_edge_unlock (q);
 
   return (*data != NULL);
@@ -277,7 +297,7 @@ nns_edge_queue_wait_pop (nns_edge_queue_h handle, unsigned int timeout,
     }
   }
 
-  *data = _pop_data (q);
+  *data = _pop_data (q, false);
   nns_edge_unlock (q);
 
   return (*data != NULL);
index e8bd6dca6a9cd39934cf7f6101c46ba5d8784bbb..2a480642bc7a175db4c9ca8d273ce583496ddf2e 100644 (file)
@@ -28,6 +28,15 @@ typedef void *nns_edge_queue_h;
  */
 typedef void (*nns_edge_queue_data_destroy_cb) (void *data);
 
+/**
+ * @brief Enumeration for the queue leaky option.
+ */
+typedef enum {
+  NNS_EDGE_QUEUE_LEAK_UNKNOWN = 0,
+  NNS_EDGE_QUEUE_LEAK_NEW,
+  NNS_EDGE_QUEUE_LEAK_OLD
+} nns_edge_queue_leak_e;
+
 /**
  * @brief Create queue.
  * @param[out] handle Newly created handle.
@@ -53,9 +62,10 @@ unsigned int nns_edge_queue_get_length (nns_edge_queue_h handle);
  * @brief Set the max length of the queue.
  * @param[in] handle The queue handle.
  * @param[in] limit The max data in queue. Default 0 means unlimited.
+ * @param[in] leaky The queue leaky option.
  * @return true on success.
  */
-bool nns_edge_queue_set_limit (nns_edge_queue_h handle, unsigned int limit);
+bool nns_edge_queue_set_limit (nns_edge_queue_h handle, unsigned int limit, nns_edge_queue_leak_e leaky);
 
 /**
  * @brief Add new data into queue.
index 3ceabd040257330ed0d43e1b306fa5e2505aab4a..6acdf5348acc64f05068d154130d6f571625ea8b 100644 (file)
@@ -158,7 +158,7 @@ TEST(edge, connectLocal)
   nns_edge_set_info (server_h, "IP", "127.0.0.1");
   nns_edge_set_info (server_h, "PORT", val);
   nns_edge_set_info (server_h, "CAPS", "test server");
-  nns_edge_set_info (server_h, "QUEUE_SIZE", "10");
+  nns_edge_set_info (server_h, "QUEUE_SIZE", "10:OLD");
   _td_server->handle = server_h;
   nns_edge_free (val);
 
@@ -826,6 +826,26 @@ TEST(edge, setInfoInvalidParam08_n)
   EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
 }
 
+/**
+ * @brief Set info - invalid param.
+ */
+TEST(edge, setInfoInvalidParam09_n)
+{
+  nns_edge_h edge_h;
+  int ret;
+
+  ret = nns_edge_create_handle ("temp-id", NNS_EDGE_CONNECT_TYPE_TCP,
+      NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+  /* Invalid option */
+  ret = nns_edge_set_info (edge_h, "QUEUE_SIZE", "15:INVALID_LEAKY");
+  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+
+  ret = nns_edge_release_handle (edge_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+}
+
 /**
  * @brief Get info.
  */
@@ -3325,7 +3345,7 @@ TEST(edgeQueue, setLimit)
   ASSERT_TRUE (data != NULL);
 
   EXPECT_TRUE (nns_edge_queue_create (&queue_h));
-  EXPECT_TRUE (nns_edge_queue_set_limit (queue_h, 3U));
+  EXPECT_TRUE (nns_edge_queue_set_limit (queue_h, 3U, NNS_EDGE_QUEUE_LEAK_NEW));
 
   for (i = 0; i < 5U; i++)
     nns_edge_queue_push (queue_h, data, NULL);
@@ -3338,12 +3358,89 @@ TEST(edgeQueue, setLimit)
   free (data);
 }
 
+/**
+ * @brief Set leaky option of queue.
+ */
+TEST(edgeQueue, setLeaky)
+{
+  nns_edge_queue_h queue_h;
+  void *data;
+  unsigned int i, len;
+  bool res;
+
+  EXPECT_TRUE (nns_edge_queue_create (&queue_h));
+
+  /* leaky option new */
+  EXPECT_TRUE (nns_edge_queue_set_limit (queue_h, 3U, NNS_EDGE_QUEUE_LEAK_NEW));
+
+  for (i = 0; i < 5U; i++) {
+    data = malloc (sizeof (unsigned int));
+    ASSERT_TRUE (data != NULL);
+
+    *((unsigned int *) data) = i + 1;
+
+    res = nns_edge_queue_push (queue_h, data, free);
+    if (i < 3U) {
+      EXPECT_TRUE (res);
+    } else {
+      EXPECT_FALSE (res);
+      free (data);
+    }
+  }
+
+  len = nns_edge_queue_get_length (queue_h);
+  EXPECT_EQ (len, 3U);
+
+  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data));
+  EXPECT_EQ (*((unsigned int *) data), 1U);
+  free (data);
+  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data));
+  EXPECT_EQ (*((unsigned int *) data), 2U);
+  free (data);
+  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data));
+  EXPECT_EQ (*((unsigned int *) data), 3U);
+  free (data);
+
+  len = nns_edge_queue_get_length (queue_h);
+  EXPECT_EQ (len, 0U);
+
+  /* leaky option old */
+  EXPECT_TRUE (nns_edge_queue_set_limit (queue_h, 3U, NNS_EDGE_QUEUE_LEAK_OLD));
+
+  for (i = 0; i < 5U; i++) {
+    data = malloc (sizeof (unsigned int));
+    ASSERT_TRUE (data != NULL);
+
+    *((unsigned int *) data) = i + 1;
+
+    EXPECT_TRUE (nns_edge_queue_push (queue_h, data, free));
+  }
+
+  len = nns_edge_queue_get_length (queue_h);
+  EXPECT_EQ (len, 3U);
+
+  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data));
+  EXPECT_EQ (*((unsigned int *) data), 3U);
+  free (data);
+  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data));
+  EXPECT_EQ (*((unsigned int *) data), 4U);
+  free (data);
+  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data));
+  EXPECT_EQ (*((unsigned int *) data), 5U);
+  free (data);
+
+  len = nns_edge_queue_get_length (queue_h);
+  EXPECT_EQ (len, 0U);
+
+  EXPECT_TRUE (nns_edge_queue_destroy (queue_h));
+}
+
 /**
  * @brief Set limit of queue - invalid param.
  */
 TEST(edgeQueue, setLimitInvalidParam01_n)
 {
-  EXPECT_FALSE (nns_edge_queue_set_limit (NULL, 5U));
+  EXPECT_FALSE (nns_edge_queue_set_limit (NULL, 5U, NNS_EDGE_QUEUE_LEAK_NEW));
 }
 
 /**
@@ -3552,6 +3649,7 @@ TEST(edgeMqtt, connectLocal)
   nns_edge_set_info (server_h, "DEST_PORT", "1883");
   nns_edge_set_info (server_h, "TOPIC", "temp-mqtt-topic");
   nns_edge_set_info (server_h, "CAPS", "test server");
+  nns_edge_set_info (server_h, "QUEUE_SIZE", "10:NEW");
   _td_server->handle = server_h;
 
   /* Prepare client */