Set max number in queue when sending data to other node.
Signed-off-by: Jaeyun <jy1210.jung@samsung.com>
* DEST_IP or DEST_HOST | IP address of the destination node. In case of TCP connection, it is the IP address of the destination node, and in the case of Hybrid or AITT connection, it is the IP address of the broker.
* DEST_PORT | Port of the destination node. In case of TCP connection, it is the port number of the destination node, and in the case of Hybrid or AITT connection, it is the port number of the broker. The value should be 0 or higher.
* TOPIC | Topic used to publish/subscribe to/from the broker.
+ * QUEUE_SIZE | Max number of data in the queue, when sending edge data to other node. Default 0 means unlimited.
* ID or CLIENT_ID | Unique identifier of the edge handle or client ID. (Read-only)
*/
int nns_edge_set_info (nns_edge_h edge_h, const char *key, const char *value);
/* Not allowed key */
nns_edge_loge ("Cannot update %s.", key);
ret = NNS_EDGE_ERROR_INVALID_PARAMETER;
+ } else if (0 == strcasecmp (key, "QUEUE_SIZE")) {
+ unsigned int limit = (unsigned int) strtoull (value, NULL, 10);
+ nns_edge_queue_set_limit (eh->send_queue, limit);
} else {
ret = nns_edge_metadata_set (eh->metadata, key, value);
}
pthread_mutex_t lock;
pthread_cond_t cond;
+ unsigned int max_data; /**< Max data in queue (default 0 means unlimited) */
unsigned int length;
nns_edge_queue_data_s *head;
nns_edge_queue_data_s *tail;
return len;
}
+/**
+ * @brief Set the max length of the queue.
+ */
+bool
+nns_edge_queue_set_limit (nns_edge_queue_h handle, unsigned int limit)
+{
+ nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
+
+ if (!q) {
+ nns_edge_loge ("[Queue] Invalid param, queue is null.");
+ return false;
+ }
+
+ nns_edge_lock (q);
+ q->max_data = limit;
+ nns_edge_unlock (q);
+
+ return true;
+}
+
/**
* @brief Add new data into queue.
*/
qdata->destroy = destroy;
nns_edge_lock (q);
- if (!q->head)
- q->head = qdata;
- if (q->tail)
- q->tail->next = qdata;
- q->tail = qdata;
- q->length++;
+ if (q->max_data > 0U && q->length >= q->max_data) {
+ nns_edge_logw ("[Queue] Cannot push new data, max data in queue is %u.",
+ q->max_data);
+ } else {
+ if (!q->head)
+ q->head = qdata;
+ if (q->tail)
+ q->tail->next = qdata;
+ q->tail = qdata;
+ q->length++;
+ }
nns_edge_cond_signal (q);
nns_edge_unlock (q);
*/
unsigned int nns_edge_queue_get_length (nns_edge_queue_h handle);
+/**
+ * @brief Set the max length of the queue.
+ * @param[in] handle The queue handle.
+ * @param[in] limit The max data in queue. Default 0 means unlimited.
+ * @return true on success.
+ */
+bool nns_edge_queue_set_limit (nns_edge_queue_h handle, unsigned int limit);
+
/**
* @brief Add new data into queue.
* @param[in] handle The queue handle.
nns_edge_set_info (server_h, "IP", "127.0.0.1");
nns_edge_set_info (server_h, "PORT", val);
nns_edge_set_info (server_h, "CAPS", "test server");
+ nns_edge_set_info (server_h, "QUEUE_SIZE", "10");
_td_server->handle = server_h;
nns_edge_free (val);
EXPECT_EQ (len, 0U);
}
+/**
+ * @brief Set limit of queue.
+ */
+TEST(edgeQueue, setLimit)
+{
+ nns_edge_queue_h queue_h;
+ void *data;
+ unsigned int i, len;
+
+ data = malloc (sizeof (unsigned int));
+ ASSERT_TRUE (data != NULL);
+
+ EXPECT_TRUE (nns_edge_queue_create (&queue_h));
+ EXPECT_TRUE (nns_edge_queue_set_limit (queue_h, 3U));
+
+ for (i = 0; i < 5U; i++)
+ nns_edge_queue_push (queue_h, data, NULL);
+
+ len = nns_edge_queue_get_length (queue_h);
+ EXPECT_EQ (len, 3U);
+
+ EXPECT_TRUE (nns_edge_queue_destroy (queue_h));
+
+ free (data);
+}
+
+/**
+ * @brief Set limit of queue - invalid param.
+ */
+TEST(edgeQueue, setLimitInvalidParam01_n)
+{
+ EXPECT_FALSE (nns_edge_queue_set_limit (NULL, 5U));
+}
+
/**
* @brief Push data into queue - invalid param.
*/