Add leaky option in nns-edge queue.
Signed-off-by: Jaeyun <jy1210.jung@samsung.com>
* DEST_IP or DEST_HOST | IP address of the destination node. In case of TCP connection, it is the IP address of the destination node, and in the case of Hybrid or AITT connection, it is the IP address of the broker.
* DEST_PORT | Port of the destination node. In case of TCP connection, it is the port number of the destination node, and in the case of Hybrid or AITT connection, it is the port number of the broker. The value should be 0 or higher.
* TOPIC | Topic used to publish/subscribe to/from the broker.
- * QUEUE_SIZE | Max number of data in the queue, when sending edge data to other node. Default 0 means unlimited.
+ * QUEUE_SIZE | Max number of data in the queue, when sending edge data to other node. Default 0 means unlimited. N:<leaky [NEW, OLD]> where leaky 'OLD' drops old buffer (default NEW). (e.g., QUEUE_SIZE=5:OLD drops old buffer and pushes new data when queue size reaches 5.)
* ID or CLIENT_ID | Unique identifier of the edge handle or client ID. (Read-only)
*/
int nns_edge_set_info (nns_edge_h edge_h, const char *key, const char *value);
nns_edge_loge ("Cannot update %s.", key);
ret = NNS_EDGE_ERROR_INVALID_PARAMETER;
} else if (0 == strcasecmp (key, "QUEUE_SIZE")) {
- unsigned int limit = (unsigned int) strtoull (value, NULL, 10);
- nns_edge_queue_set_limit (eh->send_queue, limit);
+ char *s;
+ unsigned int limit;
+ nns_edge_queue_leak_e leaky = NNS_EDGE_QUEUE_LEAK_UNKNOWN;
+
+ s = strstr (value, ":");
+ if (s) {
+ char *v = nns_edge_strndup (value, s - value);
+
+ limit = (unsigned int) strtoull (v, NULL, 10);
+ nns_edge_free (v);
+
+ if (strcasecmp (s + 1, "NEW") == 0) {
+ leaky = NNS_EDGE_QUEUE_LEAK_NEW;
+ } else if (strcasecmp (s + 1, "OLD") == 0) {
+ leaky = NNS_EDGE_QUEUE_LEAK_OLD;
+ } else {
+ nns_edge_loge ("Cannot set queue leaky option (%s).", s + 1);
+ ret = NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+ } else {
+ limit = (unsigned int) strtoull (value, NULL, 10);
+ }
+
+ nns_edge_queue_set_limit (eh->send_queue, limit, leaky);
} else {
ret = nns_edge_metadata_set (eh->metadata, key, value);
}
pthread_mutex_t lock;
pthread_cond_t cond;
+ nns_edge_queue_leak_e leaky;
unsigned int max_data; /**< Max data in queue (default 0 means unlimited) */
unsigned int length;
nns_edge_queue_data_s *head;
} nns_edge_queue_s;
/**
- * @brief Pop data from queue.
+ * @brief Pop data from queue. If the param 'clear' is true, release old data and return null.
* @note This function should be called with lock.
*/
static void *
-_pop_data (nns_edge_queue_s * q)
+_pop_data (nns_edge_queue_s * q, bool clear)
{
nns_edge_queue_data_s *qdata;
void *data = NULL;
qdata = q->head;
if (qdata) {
- data = qdata->data;
-
q->head = qdata->next;
if ((--q->length) == 0U)
q->head = q->tail = NULL;
+ if (clear) {
+ if (qdata->destroy)
+ qdata->destroy (qdata->data);
+ } else {
+ data = qdata->data;
+ }
+
free (qdata);
}
nns_edge_lock_init (q);
nns_edge_cond_init (q);
+ q->leaky = NNS_EDGE_QUEUE_LEAK_NEW;
*handle = q;
return true;
* @brief Set the max length of the queue.
*/
bool
-nns_edge_queue_set_limit (nns_edge_queue_h handle, unsigned int limit)
+nns_edge_queue_set_limit (nns_edge_queue_h handle, unsigned int limit,
+ nns_edge_queue_leak_e leaky)
{
nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
nns_edge_lock (q);
q->max_data = limit;
+ if (leaky != NNS_EDGE_QUEUE_LEAK_UNKNOWN)
+ q->leaky = leaky;
nns_edge_unlock (q);
return true;
{
nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
nns_edge_queue_data_s *qdata;
+ bool pushed = false;
if (!q) {
nns_edge_loge ("[Queue] Invalid param, queue is null.");
nns_edge_lock (q);
if (q->max_data > 0U && q->length >= q->max_data) {
- nns_edge_logw ("[Queue] Cannot push new data, max data in queue is %u.",
- q->max_data);
- } else {
- if (!q->head)
- q->head = qdata;
- if (q->tail)
- q->tail->next = qdata;
- q->tail = qdata;
- q->length++;
+ /* Clear old data in queue if leaky option is 'old'. */
+ if (q->leaky == NNS_EDGE_QUEUE_LEAK_OLD) {
+ _pop_data (q, true);
+ } else {
+ nns_edge_logw ("[Queue] Cannot push new data, max data in queue is %u.",
+ q->max_data);
+ goto done;
+ }
}
+
+ if (!q->head)
+ q->head = qdata;
+ if (q->tail)
+ q->tail->next = qdata;
+ q->tail = qdata;
+ q->length++;
+ pushed = true;
+
+done:
nns_edge_cond_signal (q);
nns_edge_unlock (q);
- return true;
+ return pushed;
}
/**
}
nns_edge_lock (q);
- *data = _pop_data (q);
+ *data = _pop_data (q, false);
nns_edge_unlock (q);
return (*data != NULL);
}
}
- *data = _pop_data (q);
+ *data = _pop_data (q, false);
nns_edge_unlock (q);
return (*data != NULL);
*/
typedef void (*nns_edge_queue_data_destroy_cb) (void *data);
+/**
+ * @brief Enumeration for the queue leaky option.
+ */
+typedef enum {
+ NNS_EDGE_QUEUE_LEAK_UNKNOWN = 0,
+ NNS_EDGE_QUEUE_LEAK_NEW,
+ NNS_EDGE_QUEUE_LEAK_OLD
+} nns_edge_queue_leak_e;
+
/**
* @brief Create queue.
* @param[out] handle Newly created handle.
* @brief Set the max length of the queue.
* @param[in] handle The queue handle.
* @param[in] limit The max data in queue. Default 0 means unlimited.
+ * @param[in] leaky The queue leaky option.
* @return true on success.
*/
-bool nns_edge_queue_set_limit (nns_edge_queue_h handle, unsigned int limit);
+bool nns_edge_queue_set_limit (nns_edge_queue_h handle, unsigned int limit, nns_edge_queue_leak_e leaky);
/**
* @brief Add new data into queue.
nns_edge_set_info (server_h, "IP", "127.0.0.1");
nns_edge_set_info (server_h, "PORT", val);
nns_edge_set_info (server_h, "CAPS", "test server");
- nns_edge_set_info (server_h, "QUEUE_SIZE", "10");
+ nns_edge_set_info (server_h, "QUEUE_SIZE", "10:OLD");
_td_server->handle = server_h;
nns_edge_free (val);
EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
}
+/**
+ * @brief Set info - invalid param.
+ */
+TEST(edge, setInfoInvalidParam09_n)
+{
+ nns_edge_h edge_h;
+ int ret;
+
+ ret = nns_edge_create_handle ("temp-id", NNS_EDGE_CONNECT_TYPE_TCP,
+ NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+ /* Invalid option */
+ ret = nns_edge_set_info (edge_h, "QUEUE_SIZE", "15:INVALID_LEAKY");
+ EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+
+ ret = nns_edge_release_handle (edge_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+}
+
/**
* @brief Get info.
*/
ASSERT_TRUE (data != NULL);
EXPECT_TRUE (nns_edge_queue_create (&queue_h));
- EXPECT_TRUE (nns_edge_queue_set_limit (queue_h, 3U));
+ EXPECT_TRUE (nns_edge_queue_set_limit (queue_h, 3U, NNS_EDGE_QUEUE_LEAK_NEW));
for (i = 0; i < 5U; i++)
nns_edge_queue_push (queue_h, data, NULL);
free (data);
}
+/**
+ * @brief Set leaky option of queue.
+ */
+TEST(edgeQueue, setLeaky)
+{
+ nns_edge_queue_h queue_h;
+ void *data;
+ unsigned int i, len;
+ bool res;
+
+ EXPECT_TRUE (nns_edge_queue_create (&queue_h));
+
+ /* leaky option new */
+ EXPECT_TRUE (nns_edge_queue_set_limit (queue_h, 3U, NNS_EDGE_QUEUE_LEAK_NEW));
+
+ for (i = 0; i < 5U; i++) {
+ data = malloc (sizeof (unsigned int));
+ ASSERT_TRUE (data != NULL);
+
+ *((unsigned int *) data) = i + 1;
+
+ res = nns_edge_queue_push (queue_h, data, free);
+ if (i < 3U) {
+ EXPECT_TRUE (res);
+ } else {
+ EXPECT_FALSE (res);
+ free (data);
+ }
+ }
+
+ len = nns_edge_queue_get_length (queue_h);
+ EXPECT_EQ (len, 3U);
+
+ EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data));
+ EXPECT_EQ (*((unsigned int *) data), 1U);
+ free (data);
+ EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data));
+ EXPECT_EQ (*((unsigned int *) data), 2U);
+ free (data);
+ EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data));
+ EXPECT_EQ (*((unsigned int *) data), 3U);
+ free (data);
+
+ len = nns_edge_queue_get_length (queue_h);
+ EXPECT_EQ (len, 0U);
+
+ /* leaky option old */
+ EXPECT_TRUE (nns_edge_queue_set_limit (queue_h, 3U, NNS_EDGE_QUEUE_LEAK_OLD));
+
+ for (i = 0; i < 5U; i++) {
+ data = malloc (sizeof (unsigned int));
+ ASSERT_TRUE (data != NULL);
+
+ *((unsigned int *) data) = i + 1;
+
+ EXPECT_TRUE (nns_edge_queue_push (queue_h, data, free));
+ }
+
+ len = nns_edge_queue_get_length (queue_h);
+ EXPECT_EQ (len, 3U);
+
+ EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data));
+ EXPECT_EQ (*((unsigned int *) data), 3U);
+ free (data);
+ EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data));
+ EXPECT_EQ (*((unsigned int *) data), 4U);
+ free (data);
+ EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data));
+ EXPECT_EQ (*((unsigned int *) data), 5U);
+ free (data);
+
+ len = nns_edge_queue_get_length (queue_h);
+ EXPECT_EQ (len, 0U);
+
+ EXPECT_TRUE (nns_edge_queue_destroy (queue_h));
+}
+
/**
* @brief Set limit of queue - invalid param.
*/
TEST(edgeQueue, setLimitInvalidParam01_n)
{
- EXPECT_FALSE (nns_edge_queue_set_limit (NULL, 5U));
+ EXPECT_FALSE (nns_edge_queue_set_limit (NULL, 5U, NNS_EDGE_QUEUE_LEAK_NEW));
}
/**
nns_edge_set_info (server_h, "DEST_PORT", "1883");
nns_edge_set_info (server_h, "TOPIC", "temp-mqtt-topic");
nns_edge_set_info (server_h, "CAPS", "test server");
+ nns_edge_set_info (server_h, "QUEUE_SIZE", "10:NEW");
_td_server->handle = server_h;
/* Prepare client */