From: Jaeyun Date: Tue, 22 Nov 2022 10:38:28 +0000 (+0900) Subject: [CodeClean/Queue] handle data size X-Git-Tag: accepted/tizen/unified/20221226.070011~7 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=1129c4812f7776c5e22ff361347f4ccb80516897;p=platform%2Fupstream%2Fnnstreamer-edge.git [CodeClean/Queue] handle data size Code clean, prepare next PR. Handle data size of queue. Signed-off-by: Jaeyun --- diff --git a/src/libnnstreamer-edge/nnstreamer-edge-internal.c b/src/libnnstreamer-edge/nnstreamer-edge-internal.c index cb65f59..0f4552a 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-internal.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-internal.c @@ -797,11 +797,12 @@ _nns_edge_send_thread (void *thread_data) nns_edge_conn_data_s *conn_data; nns_edge_conn_s *conn; nns_edge_data_h data_h; + nns_size_t data_size; int64_t client_id; char *val; int ret; - while (nns_edge_queue_wait_pop (eh->send_queue, 0U, &data_h)) { + while (nns_edge_queue_wait_pop (eh->send_queue, 0U, &data_h, &data_size)) { /* Send data to destination */ switch (eh->connect_type) { case NNS_EDGE_CONNECT_TYPE_TCP: @@ -1500,9 +1501,10 @@ nns_edge_connect (nns_edge_h edge_h, const char *dest_host, int dest_port) char *msg = NULL; char *server_ip = NULL; int server_port = 0; + nns_size_t msg_len = 0; - ret = nns_edge_mqtt_get_message (eh->broker_h, &msg); - if (ret != NNS_EDGE_ERROR_NONE || !msg) + ret = nns_edge_mqtt_get_message (eh->broker_h, (void **) &msg, &msg_len); + if (ret != NNS_EDGE_ERROR_NONE || !msg || msg_len == 0) break; nns_edge_parse_host_string (msg, &server_ip, &server_port); @@ -1642,7 +1644,7 @@ nns_edge_send (nns_edge_h edge_h, nns_edge_data_h data_h) nns_edge_data_copy (data_h, &new_data_h); if (!nns_edge_queue_push (eh->send_queue, new_data_h, - nns_edge_data_release_handle)) { + sizeof (nns_edge_data_h), nns_edge_data_release_handle)) { nns_edge_loge ("Failed to send data, cannot push data into queue."); nns_edge_unlock (eh); return NNS_EDGE_ERROR_IO; diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c index 235b302..14c838c 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c @@ -43,6 +43,7 @@ on_message_callback (struct mosquitto *client, void *data, { nns_edge_broker_s *bh = (nns_edge_broker_s *) data; char *msg = NULL; + nns_size_t msg_len; if (!bh) { nns_edge_loge ("Invalid param, given broker handle is invalid."); @@ -57,9 +58,10 @@ on_message_callback (struct mosquitto *client, void *data, nns_edge_logd ("MQTT message is arrived (ID:%d, Topic:%s).", message->mid, message->topic); - msg = nns_edge_memdup (message->payload, message->payloadlen); + msg_len = (nns_size_t) message->payloadlen; + msg = nns_edge_memdup (message->payload, msg_len); if (msg) - nns_edge_queue_push (bh->server_list, msg, nns_edge_free); + nns_edge_queue_push (bh->server_list, msg, msg_len, nns_edge_free); return; } @@ -309,7 +311,8 @@ nns_edge_mqtt_subscribe (nns_edge_broker_h broker_h) * @brief Get message from mqtt broker. */ int -nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, char **msg) +nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg, + nns_size_t * msg_len) { nns_edge_broker_s *bh; @@ -326,7 +329,7 @@ nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, char **msg) bh = (nns_edge_broker_s *) broker_h; /* Wait for 1 second */ - if (!nns_edge_queue_wait_pop (bh->server_list, 1000U, (void **) msg)) { + if (!nns_edge_queue_wait_pop (bh->server_list, 1000U, msg, msg_len)) { nns_edge_loge ("Failed to get message from mqtt broker within timeout."); return NNS_EDGE_ERROR_UNKNOWN; } diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c index b45dde1..d5ea8f6 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c @@ -43,6 +43,7 @@ mqtt_cb_message_arrived (void *context, char *topic, int topic_len, { nns_edge_broker_s *bh; char *msg = NULL; + nns_size_t msg_len; UNUSED (topic); UNUSED (topic_len); @@ -61,9 +62,10 @@ mqtt_cb_message_arrived (void *context, char *topic, int topic_len, nns_edge_logd ("MQTT message is arrived (ID:%s, Topic:%s).", bh->id, bh->topic); - msg = nns_edge_memdup (message->payload, message->payloadlen); + msg_len = (nns_size_t) message->payloadlen; + msg = nns_edge_memdup (message->payload, msg_len); if (msg) - nns_edge_queue_push (bh->server_list, msg, nns_edge_free); + nns_edge_queue_push (bh->server_list, msg, msg_len, nns_edge_free); return TRUE; } @@ -348,7 +350,8 @@ nns_edge_mqtt_is_connected (nns_edge_broker_h broker_h) * @brief Get message from mqtt broker. */ int -nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, char **msg) +nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg, + nns_size_t * msg_len) { nns_edge_broker_s *bh; @@ -365,7 +368,7 @@ nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, char **msg) bh = (nns_edge_broker_s *) broker_h; /* Wait for 1 second */ - if (!nns_edge_queue_wait_pop (bh->server_list, 1000U, (void **) msg)) { + if (!nns_edge_queue_wait_pop (bh->server_list, 1000U, msg, msg_len)) { nns_edge_loge ("Failed to get message from mqtt broker within timeout."); return NNS_EDGE_ERROR_UNKNOWN; } diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt.h b/src/libnnstreamer-edge/nnstreamer-edge-mqtt.h index 76cd8aa..f6bc732 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-mqtt.h +++ b/src/libnnstreamer-edge/nnstreamer-edge-mqtt.h @@ -53,10 +53,9 @@ int nns_edge_mqtt_subscribe (nns_edge_broker_h broker_h); bool nns_edge_mqtt_is_connected (nns_edge_broker_h broker_h); /** - * @brief Get message from mqtt broker. + * @brief Get message from mqtt broker. If no message in the queue, it waits up to 1 second for new message. */ -int nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, char **msg); - +int nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg, nns_size_t *msg_len); #else /** * @todo consider to change code style later. diff --git a/src/libnnstreamer-edge/nnstreamer-edge-queue.c b/src/libnnstreamer-edge/nnstreamer-edge-queue.c index 4680888..0078e88 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-queue.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-queue.c @@ -10,6 +10,7 @@ * @bug No known bugs except for NYI items. */ +#include "nnstreamer-edge-internal.h" #include "nnstreamer-edge-log.h" #include "nnstreamer-edge-queue.h" #include "nnstreamer-edge-util.h" @@ -24,8 +25,7 @@ typedef struct _nns_edge_queue_data_s nns_edge_queue_data_s; */ struct _nns_edge_queue_data_s { - void *data; - nns_edge_data_destroy_cb destroy; + nns_edge_raw_data_s data; nns_edge_queue_data_s *next; }; @@ -48,11 +48,11 @@ typedef struct * @brief Pop data from queue. If the param 'clear' is true, release old data and return null. * @note This function should be called with lock. */ -static void * -_pop_data (nns_edge_queue_s * q, bool clear) +static bool +_pop_data (nns_edge_queue_s * q, bool clear, void **data, nns_size_t * size) { nns_edge_queue_data_s *qdata; - void *data = NULL; + bool popped = false; qdata = q->head; if (qdata) { @@ -61,16 +61,20 @@ _pop_data (nns_edge_queue_s * q, bool clear) q->head = q->tail = NULL; if (clear) { - if (qdata->destroy) - qdata->destroy (qdata->data); + if (qdata->data.destroy_cb) + qdata->data.destroy_cb (qdata->data.data); } else { - data = qdata->data; + if (data) + *data = qdata->data.data; + if (size) + *size = qdata->data.data_len; + popped = true; } free (qdata); } - return data; + return popped; } /** @@ -117,7 +121,7 @@ nns_edge_queue_destroy (nns_edge_queue_h handle) nns_edge_cond_signal (q); while (q->length > 0U) - _pop_data (q, true); + _pop_data (q, true, NULL, NULL); nns_edge_unlock (q); @@ -176,7 +180,7 @@ nns_edge_queue_set_limit (nns_edge_queue_h handle, unsigned int limit, * @brief Add new data into queue. */ bool -nns_edge_queue_push (nns_edge_queue_h handle, void *data, +nns_edge_queue_push (nns_edge_queue_h handle, void *data, nns_size_t size, nns_edge_data_destroy_cb destroy) { nns_edge_queue_s *q = (nns_edge_queue_s *) handle; @@ -193,11 +197,16 @@ nns_edge_queue_push (nns_edge_queue_h handle, void *data, return false; } + if (size == 0U) { + nns_edge_loge ("[Queue] Invalid param, size should be larger than zero."); + return false; + } + nns_edge_lock (q); if (q->max_data > 0U && q->length >= q->max_data) { /* Clear old data in queue if leaky option is 'old'. */ if (q->leaky == NNS_EDGE_QUEUE_LEAK_OLD) { - _pop_data (q, true); + _pop_data (q, true, NULL, NULL); } else { nns_edge_logw ("[Queue] Cannot push new data, max data in queue is %u.", q->max_data); @@ -211,8 +220,9 @@ nns_edge_queue_push (nns_edge_queue_h handle, void *data, goto done; } - qdata->data = data; - qdata->destroy = destroy; + qdata->data.data = data; + qdata->data.data_len = size; + qdata->data.destroy_cb = destroy; if (!q->head) q->head = qdata; @@ -233,9 +243,10 @@ done: * @brief Remove and return the first data in queue. */ bool -nns_edge_queue_pop (nns_edge_queue_h handle, void **data) +nns_edge_queue_pop (nns_edge_queue_h handle, void **data, nns_size_t * size) { nns_edge_queue_s *q = (nns_edge_queue_s *) handle; + bool popped = false; if (!q) { nns_edge_loge ("[Queue] Invalid param, queue is null."); @@ -247,11 +258,16 @@ nns_edge_queue_pop (nns_edge_queue_h handle, void **data) return false; } + if (!size) { + nns_edge_loge ("[Queue] Invalid param, size is null."); + return false; + } + nns_edge_lock (q); - *data = _pop_data (q, false); + popped = _pop_data (q, false, data, size); nns_edge_unlock (q); - return (*data != NULL); + return (popped && *data != NULL); } /** @@ -259,9 +275,10 @@ nns_edge_queue_pop (nns_edge_queue_h handle, void **data) */ bool nns_edge_queue_wait_pop (nns_edge_queue_h handle, unsigned int timeout, - void **data) + void **data, nns_size_t * size) { nns_edge_queue_s *q = (nns_edge_queue_s *) handle; + bool popped = false; if (!q) { nns_edge_loge ("[Queue] Invalid param, queue is null."); @@ -273,6 +290,11 @@ nns_edge_queue_wait_pop (nns_edge_queue_h handle, unsigned int timeout, return false; } + if (!size) { + nns_edge_loge ("[Queue] Invalid param, size is null."); + return false; + } + nns_edge_lock (q); if (q->length == 0U) { if (timeout > 0) { @@ -290,8 +312,8 @@ nns_edge_queue_wait_pop (nns_edge_queue_h handle, unsigned int timeout, } } - *data = _pop_data (q, false); + popped = _pop_data (q, false, data, size); nns_edge_unlock (q); - return (*data != NULL); + return (popped && *data != NULL); } diff --git a/src/libnnstreamer-edge/nnstreamer-edge-queue.h b/src/libnnstreamer-edge/nnstreamer-edge-queue.h index 176ccdf..7bc1994 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-queue.h +++ b/src/libnnstreamer-edge/nnstreamer-edge-queue.h @@ -67,27 +67,30 @@ bool nns_edge_queue_set_limit (nns_edge_queue_h handle, unsigned int limit, nns_ * @brief Add new data into queue. * @param[in] handle The queue handle. * @param[in] data The data to be added. + * @param[in] size The size of pushed data. * @param[in] destroy Nullable, the callback function to release data. * @return true on success. */ -bool nns_edge_queue_push (nns_edge_queue_h handle, void *data, nns_edge_data_destroy_cb destroy); +bool nns_edge_queue_push (nns_edge_queue_h handle, void *data, nns_size_t size, nns_edge_data_destroy_cb destroy); /** * @brief Remove and return the first data in queue. * @param[in] handle The queue handle. * @param[out] data The data in the queue. + * @param[out] size The size of data. * @return true on success. false if queue is empty. */ -bool nns_edge_queue_pop (nns_edge_queue_h handle, void **data); +bool nns_edge_queue_pop (nns_edge_queue_h handle, void **data, nns_size_t *size); /** * @brief Remove and return the first data in queue. If queue is empty, wait until new data is added in the queue. * @param[in] handle The queue handle. * @param[in] timeout The time to wait for new data, in milliseconds. (0 for infinite timeout) * @param[out] data The data in the queue. + * @param[out] size The size of data. * @return true on success. */ -bool nns_edge_queue_wait_pop (nns_edge_queue_h handle, unsigned int timeout, void **data); +bool nns_edge_queue_wait_pop (nns_edge_queue_h handle, unsigned int timeout, void **data, nns_size_t *size); #ifdef __cplusplus } diff --git a/tests/unittest_nnstreamer-edge.cc b/tests/unittest_nnstreamer-edge.cc index 5f1f575..e9456a8 100644 --- a/tests/unittest_nnstreamer-edge.cc +++ b/tests/unittest_nnstreamer-edge.cc @@ -3216,17 +3216,19 @@ _test_thread_edge_queue_push (void *thread_data) nns_edge_queue_h queue_h = thread_data; unsigned int i, j; void *data; + nns_size_t dsize; for (i = 0; i < 6U; i++) { usleep (50000); - data = malloc (5 * sizeof (unsigned int)); + dsize = 5 * sizeof (unsigned int); + data = malloc (dsize); if (data) { for (j = 0; j < 5U; j++) ((unsigned int *) data)[j] = i * 10U + j; } - EXPECT_TRUE (nns_edge_queue_push (queue_h, data, nns_edge_free)); + EXPECT_TRUE (nns_edge_queue_push (queue_h, data, dsize, nns_edge_free)); } return NULL; @@ -3239,67 +3241,76 @@ TEST(edgeQueue, pushData) { nns_edge_queue_h queue_h; void *data1, *data2, *data3, *result; + nns_size_t dsize, rsize; unsigned int i, len; - data1 = malloc (5 * sizeof (unsigned int)); + dsize = 5 * sizeof (unsigned int); + + data1 = malloc (dsize); ASSERT_TRUE (data1 != NULL); for (i = 0; i < 5U; i++) ((unsigned int *) data1)[i] = i + 10U; - data2 = malloc (5 * sizeof (unsigned int)); + data2 = malloc (dsize); ASSERT_TRUE (data1 != NULL); for (i = 0; i < 5U; i++) ((unsigned int *) data2)[i] = i + 20U; - data3 = malloc (5 * sizeof (unsigned int)); + data3 = malloc (dsize); ASSERT_TRUE (data1 != NULL); for (i = 0; i < 5U; i++) ((unsigned int *) data3)[i] = i + 30U; EXPECT_TRUE (nns_edge_queue_create (&queue_h)); - EXPECT_TRUE (nns_edge_queue_push (queue_h, data1, NULL)); + EXPECT_TRUE (nns_edge_queue_push (queue_h, data1, dsize, NULL)); len = nns_edge_queue_get_length (queue_h); EXPECT_EQ (len, 1U); - EXPECT_TRUE (nns_edge_queue_push (queue_h, data2, NULL)); + EXPECT_TRUE (nns_edge_queue_push (queue_h, data2, dsize, NULL)); len = nns_edge_queue_get_length (queue_h); EXPECT_EQ (len, 2U); - EXPECT_TRUE (nns_edge_queue_push (queue_h, data3, NULL)); + EXPECT_TRUE (nns_edge_queue_push (queue_h, data3, dsize, NULL)); len = nns_edge_queue_get_length (queue_h); EXPECT_EQ (len, 3U); - EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result)); + rsize = 0U; + EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result, &rsize)); len = nns_edge_queue_get_length (queue_h); EXPECT_EQ (len, 2U); EXPECT_EQ (result, data1); + EXPECT_EQ (dsize, rsize); for (i = 0; i < 5U; i++) EXPECT_EQ (((unsigned int *) result)[i], i + 10U); - EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result)); + rsize = 0U; + EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result, &rsize)); len = nns_edge_queue_get_length (queue_h); EXPECT_EQ (len, 1U); EXPECT_EQ (result, data2); + EXPECT_EQ (dsize, rsize); for (i = 0; i < 5U; i++) EXPECT_EQ (((unsigned int *) result)[i], i + 20U); - EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result)); + rsize = 0U; + EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result, &rsize)); len = nns_edge_queue_get_length (queue_h); EXPECT_EQ (len, 0U); EXPECT_EQ (result, data3); + EXPECT_EQ (dsize, rsize); for (i = 0; i < 5U; i++) EXPECT_EQ (((unsigned int *) result)[i], i + 30U); - EXPECT_TRUE (nns_edge_queue_push (queue_h, data1, nns_edge_free)); + EXPECT_TRUE (nns_edge_queue_push (queue_h, data1, dsize, nns_edge_free)); len = nns_edge_queue_get_length (queue_h); EXPECT_EQ (len, 1U); - EXPECT_TRUE (nns_edge_queue_push (queue_h, data2, nns_edge_free)); + EXPECT_TRUE (nns_edge_queue_push (queue_h, data2, dsize, nns_edge_free)); len = nns_edge_queue_get_length (queue_h); EXPECT_EQ (len, 2U); - EXPECT_TRUE (nns_edge_queue_push (queue_h, data3, nns_edge_free)); + EXPECT_TRUE (nns_edge_queue_push (queue_h, data3, dsize, nns_edge_free)); len = nns_edge_queue_get_length (queue_h); EXPECT_EQ (len, 3U); @@ -3325,11 +3336,13 @@ TEST(edgeQueue, pushDataOnThread) for (i = 0; i < 3U; i++) { void *result = NULL; + nns_size_t rsize = 0U; - EXPECT_TRUE (nns_edge_queue_wait_pop (queue_h, 0U, &result)); + EXPECT_TRUE (nns_edge_queue_wait_pop (queue_h, 0U, &result, &rsize)); for (j = 0; j < 5U; j++) EXPECT_EQ (((unsigned int *) result)[j], i * 10U + j); + EXPECT_EQ (rsize, 5 * sizeof (unsigned int)); free (result); } @@ -3377,16 +3390,18 @@ TEST(edgeQueue, setLimit) { nns_edge_queue_h queue_h; void *data; + nns_size_t dsize; unsigned int i, len; - data = malloc (sizeof (unsigned int)); + dsize = sizeof (unsigned int); + data = malloc (dsize); ASSERT_TRUE (data != NULL); EXPECT_TRUE (nns_edge_queue_create (&queue_h)); EXPECT_TRUE (nns_edge_queue_set_limit (queue_h, 3U, NNS_EDGE_QUEUE_LEAK_NEW)); for (i = 0; i < 5U; i++) - nns_edge_queue_push (queue_h, data, NULL); + nns_edge_queue_push (queue_h, data, dsize, NULL); len = nns_edge_queue_get_length (queue_h); EXPECT_EQ (len, 3U); @@ -3403,6 +3418,7 @@ TEST(edgeQueue, setLeaky) { nns_edge_queue_h queue_h; void *data; + nns_size_t dsize, rsize; unsigned int i, len; bool res; @@ -3411,13 +3427,15 @@ TEST(edgeQueue, setLeaky) /* leaky option new */ EXPECT_TRUE (nns_edge_queue_set_limit (queue_h, 3U, NNS_EDGE_QUEUE_LEAK_NEW)); + dsize = sizeof (unsigned int); + for (i = 0; i < 5U; i++) { - data = malloc (sizeof (unsigned int)); + data = malloc (dsize); ASSERT_TRUE (data != NULL); *((unsigned int *) data) = i + 1; - res = nns_edge_queue_push (queue_h, data, free); + res = nns_edge_queue_push (queue_h, data, dsize, free); if (i < 3U) { EXPECT_TRUE (res); } else { @@ -3429,13 +3447,13 @@ TEST(edgeQueue, setLeaky) len = nns_edge_queue_get_length (queue_h); EXPECT_EQ (len, 3U); - EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data)); + EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data, &rsize)); EXPECT_EQ (*((unsigned int *) data), 1U); free (data); - EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data)); + EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data, &rsize)); EXPECT_EQ (*((unsigned int *) data), 2U); free (data); - EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data)); + EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data, &rsize)); EXPECT_EQ (*((unsigned int *) data), 3U); free (data); @@ -3446,24 +3464,24 @@ TEST(edgeQueue, setLeaky) EXPECT_TRUE (nns_edge_queue_set_limit (queue_h, 3U, NNS_EDGE_QUEUE_LEAK_OLD)); for (i = 0; i < 5U; i++) { - data = malloc (sizeof (unsigned int)); + data = malloc (dsize); ASSERT_TRUE (data != NULL); *((unsigned int *) data) = i + 1; - EXPECT_TRUE (nns_edge_queue_push (queue_h, data, free)); + EXPECT_TRUE (nns_edge_queue_push (queue_h, data, dsize, free)); } len = nns_edge_queue_get_length (queue_h); EXPECT_EQ (len, 3U); - EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data)); + EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data, &rsize)); EXPECT_EQ (*((unsigned int *) data), 3U); free (data); - EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data)); + EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data, &rsize)); EXPECT_EQ (*((unsigned int *) data), 4U); free (data); - EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data)); + EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data, &rsize)); EXPECT_EQ (*((unsigned int *) data), 5U); free (data); @@ -3487,11 +3505,13 @@ TEST(edgeQueue, setLimitInvalidParam01_n) TEST(edgeQueue, pushInvalidParam01_n) { void *data; + nns_size_t dsize; - data = malloc (5 * sizeof (unsigned int)); + dsize = 5 * sizeof (unsigned int); + data = malloc (dsize); ASSERT_TRUE (data != NULL); - EXPECT_FALSE (nns_edge_queue_push (NULL, data, NULL)); + EXPECT_FALSE (nns_edge_queue_push (NULL, data, dsize, NULL)); free (data); } @@ -3502,10 +3522,30 @@ TEST(edgeQueue, pushInvalidParam01_n) TEST(edgeQueue, pushInvalidParam02_n) { nns_edge_queue_h queue_h; + nns_size_t dsize; + + EXPECT_TRUE (nns_edge_queue_create (&queue_h)); + + dsize = 5 * sizeof (unsigned int); + EXPECT_FALSE (nns_edge_queue_push (queue_h, NULL, dsize, NULL)); + + EXPECT_TRUE (nns_edge_queue_destroy (queue_h)); +} + +/** + * @brief Push data into queue - invalid param. + */ +TEST(edgeQueue, pushInvalidParam03_n) +{ + nns_edge_queue_h queue_h; + void *data; EXPECT_TRUE (nns_edge_queue_create (&queue_h)); - EXPECT_FALSE (nns_edge_queue_push (queue_h, NULL, NULL)); + data = malloc (5 * sizeof (unsigned int)); + ASSERT_TRUE (data != NULL); + + EXPECT_FALSE (nns_edge_queue_push (queue_h, data, 0U, NULL)); EXPECT_TRUE (nns_edge_queue_destroy (queue_h)); } @@ -3516,8 +3556,9 @@ TEST(edgeQueue, pushInvalidParam02_n) TEST(edgeQueue, popInvalidParam01_n) { void *data; + nns_size_t size; - EXPECT_FALSE (nns_edge_queue_pop (NULL, &data)); + EXPECT_FALSE (nns_edge_queue_pop (NULL, &data, &size)); } /** @@ -3526,10 +3567,26 @@ TEST(edgeQueue, popInvalidParam01_n) TEST(edgeQueue, popInvalidParam02_n) { nns_edge_queue_h queue_h; + nns_size_t size; + + EXPECT_TRUE (nns_edge_queue_create (&queue_h)); + + EXPECT_FALSE (nns_edge_queue_pop (queue_h, NULL, &size)); + + EXPECT_TRUE (nns_edge_queue_destroy (queue_h)); +} + +/** + * @brief Pop data from queue - invalid param. + */ +TEST(edgeQueue, popInvalidParam03_n) +{ + nns_edge_queue_h queue_h; + void *data; EXPECT_TRUE (nns_edge_queue_create (&queue_h)); - EXPECT_FALSE (nns_edge_queue_pop (queue_h, NULL)); + EXPECT_FALSE (nns_edge_queue_pop (queue_h, &data, NULL)); EXPECT_TRUE (nns_edge_queue_destroy (queue_h)); } @@ -3541,10 +3598,11 @@ TEST(edgeQueue, waitPopTimedout) { nns_edge_queue_h queue_h; void *data; + nns_size_t size; EXPECT_TRUE (nns_edge_queue_create (&queue_h)); - EXPECT_FALSE (nns_edge_queue_wait_pop (queue_h, 10U, &data)); + EXPECT_FALSE (nns_edge_queue_wait_pop (queue_h, 10U, &data, &size)); EXPECT_TRUE (nns_edge_queue_destroy (queue_h)); } @@ -3555,8 +3613,9 @@ TEST(edgeQueue, waitPopTimedout) TEST(edgeQueue, waitPopInvalidParam01_n) { void *data; + nns_size_t size; - EXPECT_FALSE (nns_edge_queue_wait_pop (NULL, 0U, &data)); + EXPECT_FALSE (nns_edge_queue_wait_pop (NULL, 0U, &data, &size)); } /** @@ -3565,10 +3624,26 @@ TEST(edgeQueue, waitPopInvalidParam01_n) TEST(edgeQueue, waitPopInvalidParam02_n) { nns_edge_queue_h queue_h; + nns_size_t size; EXPECT_TRUE (nns_edge_queue_create (&queue_h)); - EXPECT_FALSE (nns_edge_queue_wait_pop (queue_h, 0U, NULL)); + EXPECT_FALSE (nns_edge_queue_wait_pop (queue_h, 0U, NULL, &size)); + + EXPECT_TRUE (nns_edge_queue_destroy (queue_h)); +} + +/** + * @brief Wait and pop data from queue - invalid param. + */ +TEST(edgeQueue, waitPopInvalidParam03_n) +{ + nns_edge_queue_h queue_h; + void *data; + + EXPECT_TRUE (nns_edge_queue_create (&queue_h)); + + EXPECT_FALSE (nns_edge_queue_wait_pop (queue_h, 0U, &data, NULL)); EXPECT_TRUE (nns_edge_queue_destroy (queue_h)); } @@ -3953,15 +4028,16 @@ TEST(edgeMqtt, subscribeInvalidParam_n) /** * @brief Get message with invalid param. */ -TEST(edgeMqtt, getMessageInvalidParam_n) +TEST(edgeMqtt, getMessageInvalidParam1_n) { int ret = -1; - char *msg = NULL; + void *msg = NULL; + nns_size_t msg_len; if (!_check_mqtt_broker ()) return; - ret = nns_edge_mqtt_get_message (NULL, &msg); + ret = nns_edge_mqtt_get_message (NULL, &msg, &msg_len); EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); } @@ -3972,6 +4048,29 @@ TEST(edgeMqtt, getMessageInvalidParam2_n) { int ret = -1; nns_edge_broker_h broker_h; + nns_size_t msg_len; + + if (!_check_mqtt_broker ()) + return; + + ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_mqtt_get_message (broker_h, NULL, &msg_len); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_mqtt_close (broker_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Get message with invalid param. + */ +TEST(edgeMqtt, getMessageInvalidParam3_n) +{ + int ret = -1; + nns_edge_broker_h broker_h; + void *msg = NULL; if (!_check_mqtt_broker ()) return; @@ -3979,7 +4078,7 @@ TEST(edgeMqtt, getMessageInvalidParam2_n) ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h); EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - ret = nns_edge_mqtt_get_message (broker_h, NULL); + ret = nns_edge_mqtt_get_message (broker_h, &msg, NULL); EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); ret = nns_edge_mqtt_close (broker_h); @@ -3993,7 +4092,8 @@ TEST(edgeMqtt, getMessageWithinTimeout_n) { int ret = -1; nns_edge_broker_h broker_h; - char *msg = NULL; + void *msg = NULL; + nns_size_t msg_len; if (!_check_mqtt_broker ()) return; @@ -4001,7 +4101,7 @@ TEST(edgeMqtt, getMessageWithinTimeout_n) ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h); EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - ret = nns_edge_mqtt_get_message (broker_h, &msg); + ret = nns_edge_mqtt_get_message (broker_h, &msg, &msg_len); EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); ret = nns_edge_mqtt_close (broker_h);