* @bug No known bugs except for NYI items.
*/
+#include "nnstreamer-edge-internal.h"
#include "nnstreamer-edge-log.h"
#include "nnstreamer-edge-queue.h"
#include "nnstreamer-edge-util.h"
*/
struct _nns_edge_queue_data_s
{
- void *data;
- nns_edge_data_destroy_cb destroy;
+ nns_edge_raw_data_s data;
nns_edge_queue_data_s *next;
};
* @brief Pop data from queue. If the param 'clear' is true, release old data and return null.
* @note This function should be called with lock.
*/
-static void *
-_pop_data (nns_edge_queue_s * q, bool clear)
+static bool
+_pop_data (nns_edge_queue_s * q, bool clear, void **data, nns_size_t * size)
{
nns_edge_queue_data_s *qdata;
- void *data = NULL;
+ bool popped = false;
qdata = q->head;
if (qdata) {
q->head = q->tail = NULL;
if (clear) {
- if (qdata->destroy)
- qdata->destroy (qdata->data);
+ if (qdata->data.destroy_cb)
+ qdata->data.destroy_cb (qdata->data.data);
} else {
- data = qdata->data;
+ if (data)
+ *data = qdata->data.data;
+ if (size)
+ *size = qdata->data.data_len;
+ popped = true;
}
free (qdata);
}
- return data;
+ return popped;
}
/**
nns_edge_cond_signal (q);
while (q->length > 0U)
- _pop_data (q, true);
+ _pop_data (q, true, NULL, NULL);
nns_edge_unlock (q);
* @brief Add new data into queue.
*/
bool
-nns_edge_queue_push (nns_edge_queue_h handle, void *data,
+nns_edge_queue_push (nns_edge_queue_h handle, void *data, nns_size_t size,
nns_edge_data_destroy_cb destroy)
{
nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
return false;
}
+ if (size == 0U) {
+ nns_edge_loge ("[Queue] Invalid param, size should be larger than zero.");
+ return false;
+ }
+
nns_edge_lock (q);
if (q->max_data > 0U && q->length >= q->max_data) {
/* Clear old data in queue if leaky option is 'old'. */
if (q->leaky == NNS_EDGE_QUEUE_LEAK_OLD) {
- _pop_data (q, true);
+ _pop_data (q, true, NULL, NULL);
} else {
nns_edge_logw ("[Queue] Cannot push new data, max data in queue is %u.",
q->max_data);
goto done;
}
- qdata->data = data;
- qdata->destroy = destroy;
+ qdata->data.data = data;
+ qdata->data.data_len = size;
+ qdata->data.destroy_cb = destroy;
if (!q->head)
q->head = qdata;
* @brief Remove and return the first data in queue.
*/
bool
-nns_edge_queue_pop (nns_edge_queue_h handle, void **data)
+nns_edge_queue_pop (nns_edge_queue_h handle, void **data, nns_size_t * size)
{
nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
+ bool popped = false;
if (!q) {
nns_edge_loge ("[Queue] Invalid param, queue is null.");
return false;
}
+ if (!size) {
+ nns_edge_loge ("[Queue] Invalid param, size is null.");
+ return false;
+ }
+
nns_edge_lock (q);
- *data = _pop_data (q, false);
+ popped = _pop_data (q, false, data, size);
nns_edge_unlock (q);
- return (*data != NULL);
+ return (popped && *data != NULL);
}
/**
*/
bool
nns_edge_queue_wait_pop (nns_edge_queue_h handle, unsigned int timeout,
- void **data)
+ void **data, nns_size_t * size)
{
nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
+ bool popped = false;
if (!q) {
nns_edge_loge ("[Queue] Invalid param, queue is null.");
return false;
}
+ if (!size) {
+ nns_edge_loge ("[Queue] Invalid param, size is null.");
+ return false;
+ }
+
nns_edge_lock (q);
if (q->length == 0U) {
if (timeout > 0) {
}
}
- *data = _pop_data (q, false);
+ popped = _pop_data (q, false, data, size);
nns_edge_unlock (q);
- return (*data != NULL);
+ return (popped && *data != NULL);
}
nns_edge_queue_h queue_h = thread_data;
unsigned int i, j;
void *data;
+ nns_size_t dsize;
for (i = 0; i < 6U; i++) {
usleep (50000);
- data = malloc (5 * sizeof (unsigned int));
+ dsize = 5 * sizeof (unsigned int);
+ data = malloc (dsize);
if (data) {
for (j = 0; j < 5U; j++)
((unsigned int *) data)[j] = i * 10U + j;
}
- EXPECT_TRUE (nns_edge_queue_push (queue_h, data, nns_edge_free));
+ EXPECT_TRUE (nns_edge_queue_push (queue_h, data, dsize, nns_edge_free));
}
return NULL;
{
nns_edge_queue_h queue_h;
void *data1, *data2, *data3, *result;
+ nns_size_t dsize, rsize;
unsigned int i, len;
- data1 = malloc (5 * sizeof (unsigned int));
+ dsize = 5 * sizeof (unsigned int);
+
+ data1 = malloc (dsize);
ASSERT_TRUE (data1 != NULL);
for (i = 0; i < 5U; i++)
((unsigned int *) data1)[i] = i + 10U;
- data2 = malloc (5 * sizeof (unsigned int));
+ data2 = malloc (dsize);
ASSERT_TRUE (data1 != NULL);
for (i = 0; i < 5U; i++)
((unsigned int *) data2)[i] = i + 20U;
- data3 = malloc (5 * sizeof (unsigned int));
+ data3 = malloc (dsize);
ASSERT_TRUE (data1 != NULL);
for (i = 0; i < 5U; i++)
((unsigned int *) data3)[i] = i + 30U;
EXPECT_TRUE (nns_edge_queue_create (&queue_h));
- EXPECT_TRUE (nns_edge_queue_push (queue_h, data1, NULL));
+ EXPECT_TRUE (nns_edge_queue_push (queue_h, data1, dsize, NULL));
len = nns_edge_queue_get_length (queue_h);
EXPECT_EQ (len, 1U);
- EXPECT_TRUE (nns_edge_queue_push (queue_h, data2, NULL));
+ EXPECT_TRUE (nns_edge_queue_push (queue_h, data2, dsize, NULL));
len = nns_edge_queue_get_length (queue_h);
EXPECT_EQ (len, 2U);
- EXPECT_TRUE (nns_edge_queue_push (queue_h, data3, NULL));
+ EXPECT_TRUE (nns_edge_queue_push (queue_h, data3, dsize, NULL));
len = nns_edge_queue_get_length (queue_h);
EXPECT_EQ (len, 3U);
- EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result));
+ rsize = 0U;
+ EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result, &rsize));
len = nns_edge_queue_get_length (queue_h);
EXPECT_EQ (len, 2U);
EXPECT_EQ (result, data1);
+ EXPECT_EQ (dsize, rsize);
for (i = 0; i < 5U; i++)
EXPECT_EQ (((unsigned int *) result)[i], i + 10U);
- EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result));
+ rsize = 0U;
+ EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result, &rsize));
len = nns_edge_queue_get_length (queue_h);
EXPECT_EQ (len, 1U);
EXPECT_EQ (result, data2);
+ EXPECT_EQ (dsize, rsize);
for (i = 0; i < 5U; i++)
EXPECT_EQ (((unsigned int *) result)[i], i + 20U);
- EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result));
+ rsize = 0U;
+ EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result, &rsize));
len = nns_edge_queue_get_length (queue_h);
EXPECT_EQ (len, 0U);
EXPECT_EQ (result, data3);
+ EXPECT_EQ (dsize, rsize);
for (i = 0; i < 5U; i++)
EXPECT_EQ (((unsigned int *) result)[i], i + 30U);
- EXPECT_TRUE (nns_edge_queue_push (queue_h, data1, nns_edge_free));
+ EXPECT_TRUE (nns_edge_queue_push (queue_h, data1, dsize, nns_edge_free));
len = nns_edge_queue_get_length (queue_h);
EXPECT_EQ (len, 1U);
- EXPECT_TRUE (nns_edge_queue_push (queue_h, data2, nns_edge_free));
+ EXPECT_TRUE (nns_edge_queue_push (queue_h, data2, dsize, nns_edge_free));
len = nns_edge_queue_get_length (queue_h);
EXPECT_EQ (len, 2U);
- EXPECT_TRUE (nns_edge_queue_push (queue_h, data3, nns_edge_free));
+ EXPECT_TRUE (nns_edge_queue_push (queue_h, data3, dsize, nns_edge_free));
len = nns_edge_queue_get_length (queue_h);
EXPECT_EQ (len, 3U);
for (i = 0; i < 3U; i++) {
void *result = NULL;
+ nns_size_t rsize = 0U;
- EXPECT_TRUE (nns_edge_queue_wait_pop (queue_h, 0U, &result));
+ EXPECT_TRUE (nns_edge_queue_wait_pop (queue_h, 0U, &result, &rsize));
for (j = 0; j < 5U; j++)
EXPECT_EQ (((unsigned int *) result)[j], i * 10U + j);
+ EXPECT_EQ (rsize, 5 * sizeof (unsigned int));
free (result);
}
{
nns_edge_queue_h queue_h;
void *data;
+ nns_size_t dsize;
unsigned int i, len;
- data = malloc (sizeof (unsigned int));
+ dsize = sizeof (unsigned int);
+ data = malloc (dsize);
ASSERT_TRUE (data != NULL);
EXPECT_TRUE (nns_edge_queue_create (&queue_h));
EXPECT_TRUE (nns_edge_queue_set_limit (queue_h, 3U, NNS_EDGE_QUEUE_LEAK_NEW));
for (i = 0; i < 5U; i++)
- nns_edge_queue_push (queue_h, data, NULL);
+ nns_edge_queue_push (queue_h, data, dsize, NULL);
len = nns_edge_queue_get_length (queue_h);
EXPECT_EQ (len, 3U);
{
nns_edge_queue_h queue_h;
void *data;
+ nns_size_t dsize, rsize;
unsigned int i, len;
bool res;
/* leaky option new */
EXPECT_TRUE (nns_edge_queue_set_limit (queue_h, 3U, NNS_EDGE_QUEUE_LEAK_NEW));
+ dsize = sizeof (unsigned int);
+
for (i = 0; i < 5U; i++) {
- data = malloc (sizeof (unsigned int));
+ data = malloc (dsize);
ASSERT_TRUE (data != NULL);
*((unsigned int *) data) = i + 1;
- res = nns_edge_queue_push (queue_h, data, free);
+ res = nns_edge_queue_push (queue_h, data, dsize, free);
if (i < 3U) {
EXPECT_TRUE (res);
} else {
len = nns_edge_queue_get_length (queue_h);
EXPECT_EQ (len, 3U);
- EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data));
+ EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data, &rsize));
EXPECT_EQ (*((unsigned int *) data), 1U);
free (data);
- EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data));
+ EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data, &rsize));
EXPECT_EQ (*((unsigned int *) data), 2U);
free (data);
- EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data));
+ EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data, &rsize));
EXPECT_EQ (*((unsigned int *) data), 3U);
free (data);
EXPECT_TRUE (nns_edge_queue_set_limit (queue_h, 3U, NNS_EDGE_QUEUE_LEAK_OLD));
for (i = 0; i < 5U; i++) {
- data = malloc (sizeof (unsigned int));
+ data = malloc (dsize);
ASSERT_TRUE (data != NULL);
*((unsigned int *) data) = i + 1;
- EXPECT_TRUE (nns_edge_queue_push (queue_h, data, free));
+ EXPECT_TRUE (nns_edge_queue_push (queue_h, data, dsize, free));
}
len = nns_edge_queue_get_length (queue_h);
EXPECT_EQ (len, 3U);
- EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data));
+ EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data, &rsize));
EXPECT_EQ (*((unsigned int *) data), 3U);
free (data);
- EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data));
+ EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data, &rsize));
EXPECT_EQ (*((unsigned int *) data), 4U);
free (data);
- EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data));
+ EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data, &rsize));
EXPECT_EQ (*((unsigned int *) data), 5U);
free (data);
TEST(edgeQueue, pushInvalidParam01_n)
{
void *data;
+ nns_size_t dsize;
- data = malloc (5 * sizeof (unsigned int));
+ dsize = 5 * sizeof (unsigned int);
+ data = malloc (dsize);
ASSERT_TRUE (data != NULL);
- EXPECT_FALSE (nns_edge_queue_push (NULL, data, NULL));
+ EXPECT_FALSE (nns_edge_queue_push (NULL, data, dsize, NULL));
free (data);
}
TEST(edgeQueue, pushInvalidParam02_n)
{
nns_edge_queue_h queue_h;
+ nns_size_t dsize;
+
+ EXPECT_TRUE (nns_edge_queue_create (&queue_h));
+
+ dsize = 5 * sizeof (unsigned int);
+ EXPECT_FALSE (nns_edge_queue_push (queue_h, NULL, dsize, NULL));
+
+ EXPECT_TRUE (nns_edge_queue_destroy (queue_h));
+}
+
+/**
+ * @brief Push data into queue - invalid param.
+ */
+TEST(edgeQueue, pushInvalidParam03_n)
+{
+ nns_edge_queue_h queue_h;
+ void *data;
EXPECT_TRUE (nns_edge_queue_create (&queue_h));
- EXPECT_FALSE (nns_edge_queue_push (queue_h, NULL, NULL));
+ data = malloc (5 * sizeof (unsigned int));
+ ASSERT_TRUE (data != NULL);
+
+ EXPECT_FALSE (nns_edge_queue_push (queue_h, data, 0U, NULL));
EXPECT_TRUE (nns_edge_queue_destroy (queue_h));
}
TEST(edgeQueue, popInvalidParam01_n)
{
void *data;
+ nns_size_t size;
- EXPECT_FALSE (nns_edge_queue_pop (NULL, &data));
+ EXPECT_FALSE (nns_edge_queue_pop (NULL, &data, &size));
}
/**
TEST(edgeQueue, popInvalidParam02_n)
{
nns_edge_queue_h queue_h;
+ nns_size_t size;
+
+ EXPECT_TRUE (nns_edge_queue_create (&queue_h));
+
+ EXPECT_FALSE (nns_edge_queue_pop (queue_h, NULL, &size));
+
+ EXPECT_TRUE (nns_edge_queue_destroy (queue_h));
+}
+
+/**
+ * @brief Pop data from queue - invalid param.
+ */
+TEST(edgeQueue, popInvalidParam03_n)
+{
+ nns_edge_queue_h queue_h;
+ void *data;
EXPECT_TRUE (nns_edge_queue_create (&queue_h));
- EXPECT_FALSE (nns_edge_queue_pop (queue_h, NULL));
+ EXPECT_FALSE (nns_edge_queue_pop (queue_h, &data, NULL));
EXPECT_TRUE (nns_edge_queue_destroy (queue_h));
}
{
nns_edge_queue_h queue_h;
void *data;
+ nns_size_t size;
EXPECT_TRUE (nns_edge_queue_create (&queue_h));
- EXPECT_FALSE (nns_edge_queue_wait_pop (queue_h, 10U, &data));
+ EXPECT_FALSE (nns_edge_queue_wait_pop (queue_h, 10U, &data, &size));
EXPECT_TRUE (nns_edge_queue_destroy (queue_h));
}
TEST(edgeQueue, waitPopInvalidParam01_n)
{
void *data;
+ nns_size_t size;
- EXPECT_FALSE (nns_edge_queue_wait_pop (NULL, 0U, &data));
+ EXPECT_FALSE (nns_edge_queue_wait_pop (NULL, 0U, &data, &size));
}
/**
TEST(edgeQueue, waitPopInvalidParam02_n)
{
nns_edge_queue_h queue_h;
+ nns_size_t size;
EXPECT_TRUE (nns_edge_queue_create (&queue_h));
- EXPECT_FALSE (nns_edge_queue_wait_pop (queue_h, 0U, NULL));
+ EXPECT_FALSE (nns_edge_queue_wait_pop (queue_h, 0U, NULL, &size));
+
+ EXPECT_TRUE (nns_edge_queue_destroy (queue_h));
+}
+
+/**
+ * @brief Wait and pop data from queue - invalid param.
+ */
+TEST(edgeQueue, waitPopInvalidParam03_n)
+{
+ nns_edge_queue_h queue_h;
+ void *data;
+
+ EXPECT_TRUE (nns_edge_queue_create (&queue_h));
+
+ EXPECT_FALSE (nns_edge_queue_wait_pop (queue_h, 0U, &data, NULL));
EXPECT_TRUE (nns_edge_queue_destroy (queue_h));
}
/**
* @brief Get message with invalid param.
*/
-TEST(edgeMqtt, getMessageInvalidParam_n)
+TEST(edgeMqtt, getMessageInvalidParam1_n)
{
int ret = -1;
- char *msg = NULL;
+ void *msg = NULL;
+ nns_size_t msg_len;
if (!_check_mqtt_broker ())
return;
- ret = nns_edge_mqtt_get_message (NULL, &msg);
+ ret = nns_edge_mqtt_get_message (NULL, &msg, &msg_len);
EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
}
{
int ret = -1;
nns_edge_broker_h broker_h;
+ nns_size_t msg_len;
+
+ if (!_check_mqtt_broker ())
+ return;
+
+ ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+ ret = nns_edge_mqtt_get_message (broker_h, NULL, &msg_len);
+ EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+
+ ret = nns_edge_mqtt_close (broker_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Get message with invalid param.
+ */
+TEST(edgeMqtt, getMessageInvalidParam3_n)
+{
+ int ret = -1;
+ nns_edge_broker_h broker_h;
+ void *msg = NULL;
if (!_check_mqtt_broker ())
return;
ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
- ret = nns_edge_mqtt_get_message (broker_h, NULL);
+ ret = nns_edge_mqtt_get_message (broker_h, &msg, NULL);
EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
ret = nns_edge_mqtt_close (broker_h);
{
int ret = -1;
nns_edge_broker_h broker_h;
- char *msg = NULL;
+ void *msg = NULL;
+ nns_size_t msg_len;
if (!_check_mqtt_broker ())
return;
ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
- ret = nns_edge_mqtt_get_message (broker_h, &msg);
+ ret = nns_edge_mqtt_get_message (broker_h, &msg, &msg_len);
EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
ret = nns_edge_mqtt_close (broker_h);