[CodeClean/Queue] handle data size
authorJaeyun <jy1210.jung@samsung.com>
Tue, 22 Nov 2022 10:38:28 +0000 (19:38 +0900)
committerjaeyun-jung <39614140+jaeyun-jung@users.noreply.github.com>
Tue, 6 Dec 2022 07:15:17 +0000 (16:15 +0900)
Code clean, prepare next PR. Handle data size of queue.

Signed-off-by: Jaeyun <jy1210.jung@samsung.com>
src/libnnstreamer-edge/nnstreamer-edge-internal.c
src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c
src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c
src/libnnstreamer-edge/nnstreamer-edge-mqtt.h
src/libnnstreamer-edge/nnstreamer-edge-queue.c
src/libnnstreamer-edge/nnstreamer-edge-queue.h
tests/unittest_nnstreamer-edge.cc

index cb65f593d348242a0609afccbd9351d91a6296a4..0f4552aca8e5322cfb595d0768d795c1b6575d0e 100644 (file)
@@ -797,11 +797,12 @@ _nns_edge_send_thread (void *thread_data)
   nns_edge_conn_data_s *conn_data;
   nns_edge_conn_s *conn;
   nns_edge_data_h data_h;
+  nns_size_t data_size;
   int64_t client_id;
   char *val;
   int ret;
 
-  while (nns_edge_queue_wait_pop (eh->send_queue, 0U, &data_h)) {
+  while (nns_edge_queue_wait_pop (eh->send_queue, 0U, &data_h, &data_size)) {
     /* Send data to destination */
     switch (eh->connect_type) {
       case NNS_EDGE_CONNECT_TYPE_TCP:
@@ -1500,9 +1501,10 @@ nns_edge_connect (nns_edge_h edge_h, const char *dest_host, int dest_port)
       char *msg = NULL;
       char *server_ip = NULL;
       int server_port = 0;
+      nns_size_t msg_len = 0;
 
-      ret = nns_edge_mqtt_get_message (eh->broker_h, &msg);
-      if (ret != NNS_EDGE_ERROR_NONE || !msg)
+      ret = nns_edge_mqtt_get_message (eh->broker_h, (void **) &msg, &msg_len);
+      if (ret != NNS_EDGE_ERROR_NONE || !msg || msg_len == 0)
         break;
 
       nns_edge_parse_host_string (msg, &server_ip, &server_port);
@@ -1642,7 +1644,7 @@ nns_edge_send (nns_edge_h edge_h, nns_edge_data_h data_h)
   nns_edge_data_copy (data_h, &new_data_h);
 
   if (!nns_edge_queue_push (eh->send_queue, new_data_h,
-          nns_edge_data_release_handle)) {
+          sizeof (nns_edge_data_h), nns_edge_data_release_handle)) {
     nns_edge_loge ("Failed to send data, cannot push data into queue.");
     nns_edge_unlock (eh);
     return NNS_EDGE_ERROR_IO;
index 235b302a4c0d678c309b7a801ac465dd02a38319..14c838c302a296d399a641c9b1920db22575acfb 100644 (file)
@@ -43,6 +43,7 @@ on_message_callback (struct mosquitto *client, void *data,
 {
   nns_edge_broker_s *bh = (nns_edge_broker_s *) data;
   char *msg = NULL;
+  nns_size_t msg_len;
 
   if (!bh) {
     nns_edge_loge ("Invalid param, given broker handle is invalid.");
@@ -57,9 +58,10 @@ on_message_callback (struct mosquitto *client, void *data,
   nns_edge_logd ("MQTT message is arrived (ID:%d, Topic:%s).",
       message->mid, message->topic);
 
-  msg = nns_edge_memdup (message->payload, message->payloadlen);
+  msg_len = (nns_size_t) message->payloadlen;
+  msg = nns_edge_memdup (message->payload, msg_len);
   if (msg)
-    nns_edge_queue_push (bh->server_list, msg, nns_edge_free);
+    nns_edge_queue_push (bh->server_list, msg, msg_len, nns_edge_free);
 
   return;
 }
@@ -309,7 +311,8 @@ nns_edge_mqtt_subscribe (nns_edge_broker_h broker_h)
  * @brief Get message from mqtt broker.
  */
 int
-nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, char **msg)
+nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg,
+    nns_size_t * msg_len)
 {
   nns_edge_broker_s *bh;
 
@@ -326,7 +329,7 @@ nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, char **msg)
   bh = (nns_edge_broker_s *) broker_h;
 
   /* Wait for 1 second */
-  if (!nns_edge_queue_wait_pop (bh->server_list, 1000U, (void **) msg)) {
+  if (!nns_edge_queue_wait_pop (bh->server_list, 1000U, msg, msg_len)) {
     nns_edge_loge ("Failed to get message from mqtt broker within timeout.");
     return NNS_EDGE_ERROR_UNKNOWN;
   }
index b45dde104784034a5cd0fbd30d003189be56873c..d5ea8f61af22b556e91d9f2b99f4573f3962f453 100644 (file)
@@ -43,6 +43,7 @@ mqtt_cb_message_arrived (void *context, char *topic, int topic_len,
 {
   nns_edge_broker_s *bh;
   char *msg = NULL;
+  nns_size_t msg_len;
 
   UNUSED (topic);
   UNUSED (topic_len);
@@ -61,9 +62,10 @@ mqtt_cb_message_arrived (void *context, char *topic, int topic_len,
   nns_edge_logd ("MQTT message is arrived (ID:%s, Topic:%s).",
       bh->id, bh->topic);
 
-  msg = nns_edge_memdup (message->payload, message->payloadlen);
+  msg_len = (nns_size_t) message->payloadlen;
+  msg = nns_edge_memdup (message->payload, msg_len);
   if (msg)
-    nns_edge_queue_push (bh->server_list, msg, nns_edge_free);
+    nns_edge_queue_push (bh->server_list, msg, msg_len, nns_edge_free);
 
   return TRUE;
 }
@@ -348,7 +350,8 @@ nns_edge_mqtt_is_connected (nns_edge_broker_h broker_h)
  * @brief Get message from mqtt broker.
  */
 int
-nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, char **msg)
+nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg,
+    nns_size_t * msg_len)
 {
   nns_edge_broker_s *bh;
 
@@ -365,7 +368,7 @@ nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, char **msg)
   bh = (nns_edge_broker_s *) broker_h;
 
   /* Wait for 1 second */
-  if (!nns_edge_queue_wait_pop (bh->server_list, 1000U, (void **) msg)) {
+  if (!nns_edge_queue_wait_pop (bh->server_list, 1000U, msg, msg_len)) {
     nns_edge_loge ("Failed to get message from mqtt broker within timeout.");
     return NNS_EDGE_ERROR_UNKNOWN;
   }
index 76cd8aa2ed3195749d935d43d9fe5aa57cd8c4eb..f6bc732d2bdf29d55e122b7e0e2a69573f2e1a88 100644 (file)
@@ -53,10 +53,9 @@ int nns_edge_mqtt_subscribe (nns_edge_broker_h broker_h);
 bool nns_edge_mqtt_is_connected (nns_edge_broker_h broker_h);
 
 /**
- * @brief Get message from mqtt broker.
+ * @brief Get message from mqtt broker. If no message in the queue, it waits up to 1 second for new message.
  */
-int nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, char **msg);
-
+int nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg, nns_size_t *msg_len);
 #else
 /**
  * @todo consider to change code style later.
index 46808881868ee697ea8a015e6585bef4dda85ece..0078e88aad5164ad527963fb63c99e4afc25d57b 100644 (file)
@@ -10,6 +10,7 @@
  * @bug    No known bugs except for NYI items.
  */
 
+#include "nnstreamer-edge-internal.h"
 #include "nnstreamer-edge-log.h"
 #include "nnstreamer-edge-queue.h"
 #include "nnstreamer-edge-util.h"
@@ -24,8 +25,7 @@ typedef struct _nns_edge_queue_data_s nns_edge_queue_data_s;
  */
 struct _nns_edge_queue_data_s
 {
-  void *data;
-  nns_edge_data_destroy_cb destroy;
+  nns_edge_raw_data_s data;
   nns_edge_queue_data_s *next;
 };
 
@@ -48,11 +48,11 @@ typedef struct
  * @brief Pop data from queue. If the param 'clear' is true, release old data and return null.
  * @note This function should be called with lock.
  */
-static void *
-_pop_data (nns_edge_queue_s * q, bool clear)
+static bool
+_pop_data (nns_edge_queue_s * q, bool clear, void **data, nns_size_t * size)
 {
   nns_edge_queue_data_s *qdata;
-  void *data = NULL;
+  bool popped = false;
 
   qdata = q->head;
   if (qdata) {
@@ -61,16 +61,20 @@ _pop_data (nns_edge_queue_s * q, bool clear)
       q->head = q->tail = NULL;
 
     if (clear) {
-      if (qdata->destroy)
-        qdata->destroy (qdata->data);
+      if (qdata->data.destroy_cb)
+        qdata->data.destroy_cb (qdata->data.data);
     } else {
-      data = qdata->data;
+      if (data)
+        *data = qdata->data.data;
+      if (size)
+        *size = qdata->data.data_len;
+      popped = true;
     }
 
     free (qdata);
   }
 
-  return data;
+  return popped;
 }
 
 /**
@@ -117,7 +121,7 @@ nns_edge_queue_destroy (nns_edge_queue_h handle)
   nns_edge_cond_signal (q);
 
   while (q->length > 0U)
-    _pop_data (q, true);
+    _pop_data (q, true, NULL, NULL);
 
   nns_edge_unlock (q);
 
@@ -176,7 +180,7 @@ nns_edge_queue_set_limit (nns_edge_queue_h handle, unsigned int limit,
  * @brief Add new data into queue.
  */
 bool
-nns_edge_queue_push (nns_edge_queue_h handle, void *data,
+nns_edge_queue_push (nns_edge_queue_h handle, void *data, nns_size_t size,
     nns_edge_data_destroy_cb destroy)
 {
   nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
@@ -193,11 +197,16 @@ nns_edge_queue_push (nns_edge_queue_h handle, void *data,
     return false;
   }
 
+  if (size == 0U) {
+    nns_edge_loge ("[Queue] Invalid param, size should be larger than zero.");
+    return false;
+  }
+
   nns_edge_lock (q);
   if (q->max_data > 0U && q->length >= q->max_data) {
     /* Clear old data in queue if leaky option is 'old'. */
     if (q->leaky == NNS_EDGE_QUEUE_LEAK_OLD) {
-      _pop_data (q, true);
+      _pop_data (q, true, NULL, NULL);
     } else {
       nns_edge_logw ("[Queue] Cannot push new data, max data in queue is %u.",
           q->max_data);
@@ -211,8 +220,9 @@ nns_edge_queue_push (nns_edge_queue_h handle, void *data,
     goto done;
   }
 
-  qdata->data = data;
-  qdata->destroy = destroy;
+  qdata->data.data = data;
+  qdata->data.data_len = size;
+  qdata->data.destroy_cb = destroy;
 
   if (!q->head)
     q->head = qdata;
@@ -233,9 +243,10 @@ done:
  * @brief Remove and return the first data in queue.
  */
 bool
-nns_edge_queue_pop (nns_edge_queue_h handle, void **data)
+nns_edge_queue_pop (nns_edge_queue_h handle, void **data, nns_size_t * size)
 {
   nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
+  bool popped = false;
 
   if (!q) {
     nns_edge_loge ("[Queue] Invalid param, queue is null.");
@@ -247,11 +258,16 @@ nns_edge_queue_pop (nns_edge_queue_h handle, void **data)
     return false;
   }
 
+  if (!size) {
+    nns_edge_loge ("[Queue] Invalid param, size is null.");
+    return false;
+  }
+
   nns_edge_lock (q);
-  *data = _pop_data (q, false);
+  popped = _pop_data (q, false, data, size);
   nns_edge_unlock (q);
 
-  return (*data != NULL);
+  return (popped && *data != NULL);
 }
 
 /**
@@ -259,9 +275,10 @@ nns_edge_queue_pop (nns_edge_queue_h handle, void **data)
  */
 bool
 nns_edge_queue_wait_pop (nns_edge_queue_h handle, unsigned int timeout,
-    void **data)
+    void **data, nns_size_t * size)
 {
   nns_edge_queue_s *q = (nns_edge_queue_s *) handle;
+  bool popped = false;
 
   if (!q) {
     nns_edge_loge ("[Queue] Invalid param, queue is null.");
@@ -273,6 +290,11 @@ nns_edge_queue_wait_pop (nns_edge_queue_h handle, unsigned int timeout,
     return false;
   }
 
+  if (!size) {
+    nns_edge_loge ("[Queue] Invalid param, size is null.");
+    return false;
+  }
+
   nns_edge_lock (q);
   if (q->length == 0U) {
     if (timeout > 0) {
@@ -290,8 +312,8 @@ nns_edge_queue_wait_pop (nns_edge_queue_h handle, unsigned int timeout,
     }
   }
 
-  *data = _pop_data (q, false);
+  popped = _pop_data (q, false, data, size);
   nns_edge_unlock (q);
 
-  return (*data != NULL);
+  return (popped && *data != NULL);
 }
index 176ccdf4e6e195581d9bae5b0bc40c048b8c37cd..7bc199436fdcebe13456965ff7972601992a0b0d 100644 (file)
@@ -67,27 +67,30 @@ bool nns_edge_queue_set_limit (nns_edge_queue_h handle, unsigned int limit, nns_
  * @brief Add new data into queue.
  * @param[in] handle The queue handle.
  * @param[in] data The data to be added.
+ * @param[in] size The size of pushed data.
  * @param[in] destroy Nullable, the callback function to release data.
  * @return true on success.
  */
-bool nns_edge_queue_push (nns_edge_queue_h handle, void *data, nns_edge_data_destroy_cb destroy);
+bool nns_edge_queue_push (nns_edge_queue_h handle, void *data, nns_size_t size, nns_edge_data_destroy_cb destroy);
 
 /**
  * @brief Remove and return the first data in queue.
  * @param[in] handle The queue handle.
  * @param[out] data The data in the queue.
+ * @param[out] size The size of data.
  * @return true on success. false if queue is empty.
  */
-bool nns_edge_queue_pop (nns_edge_queue_h handle, void **data);
+bool nns_edge_queue_pop (nns_edge_queue_h handle, void **data, nns_size_t *size);
 
 /**
  * @brief Remove and return the first data in queue. If queue is empty, wait until new data is added in the queue.
  * @param[in] handle The queue handle.
  * @param[in] timeout The time to wait for new data, in milliseconds. (0 for infinite timeout)
  * @param[out] data The data in the queue.
+ * @param[out] size The size of data.
  * @return true on success.
  */
-bool nns_edge_queue_wait_pop (nns_edge_queue_h handle, unsigned int timeout, void **data);
+bool nns_edge_queue_wait_pop (nns_edge_queue_h handle, unsigned int timeout, void **data, nns_size_t *size);
 
 #ifdef __cplusplus
 }
index 5f1f5756283043322c5477e62612aae46aaebfcf..e9456a813cc1f3f65530c6ed8b278cc45b97f868 100644 (file)
@@ -3216,17 +3216,19 @@ _test_thread_edge_queue_push (void *thread_data)
   nns_edge_queue_h queue_h = thread_data;
   unsigned int i, j;
   void *data;
+  nns_size_t dsize;
 
   for (i = 0; i < 6U; i++) {
     usleep (50000);
 
-    data = malloc (5 * sizeof (unsigned int));
+    dsize = 5 * sizeof (unsigned int);
+    data = malloc (dsize);
     if (data) {
       for (j = 0; j < 5U; j++)
         ((unsigned int *) data)[j] = i * 10U + j;
     }
 
-    EXPECT_TRUE (nns_edge_queue_push (queue_h, data, nns_edge_free));
+    EXPECT_TRUE (nns_edge_queue_push (queue_h, data, dsize, nns_edge_free));
   }
 
   return NULL;
@@ -3239,67 +3241,76 @@ TEST(edgeQueue, pushData)
 {
   nns_edge_queue_h queue_h;
   void *data1, *data2, *data3, *result;
+  nns_size_t dsize, rsize;
   unsigned int i, len;
 
-  data1 = malloc (5 * sizeof (unsigned int));
+  dsize = 5 * sizeof (unsigned int);
+
+  data1 = malloc (dsize);
   ASSERT_TRUE (data1 != NULL);
   for (i = 0; i < 5U; i++)
     ((unsigned int *) data1)[i] = i + 10U;
 
-  data2 = malloc (5 * sizeof (unsigned int));
+  data2 = malloc (dsize);
   ASSERT_TRUE (data1 != NULL);
   for (i = 0; i < 5U; i++)
     ((unsigned int *) data2)[i] = i + 20U;
 
-  data3 = malloc (5 * sizeof (unsigned int));
+  data3 = malloc (dsize);
   ASSERT_TRUE (data1 != NULL);
   for (i = 0; i < 5U; i++)
     ((unsigned int *) data3)[i] = i + 30U;
 
   EXPECT_TRUE (nns_edge_queue_create (&queue_h));
 
-  EXPECT_TRUE (nns_edge_queue_push (queue_h, data1, NULL));
+  EXPECT_TRUE (nns_edge_queue_push (queue_h, data1, dsize, NULL));
   len = nns_edge_queue_get_length (queue_h);
   EXPECT_EQ (len, 1U);
 
-  EXPECT_TRUE (nns_edge_queue_push (queue_h, data2, NULL));
+  EXPECT_TRUE (nns_edge_queue_push (queue_h, data2, dsize, NULL));
   len = nns_edge_queue_get_length (queue_h);
   EXPECT_EQ (len, 2U);
 
-  EXPECT_TRUE (nns_edge_queue_push (queue_h, data3, NULL));
+  EXPECT_TRUE (nns_edge_queue_push (queue_h, data3, dsize, NULL));
   len = nns_edge_queue_get_length (queue_h);
   EXPECT_EQ (len, 3U);
 
-  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result));
+  rsize = 0U;
+  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result, &rsize));
   len = nns_edge_queue_get_length (queue_h);
   EXPECT_EQ (len, 2U);
   EXPECT_EQ (result, data1);
+  EXPECT_EQ (dsize, rsize);
   for (i = 0; i < 5U; i++)
     EXPECT_EQ (((unsigned int *) result)[i], i + 10U);
 
-  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result));
+  rsize = 0U;
+  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result, &rsize));
   len = nns_edge_queue_get_length (queue_h);
   EXPECT_EQ (len, 1U);
   EXPECT_EQ (result, data2);
+  EXPECT_EQ (dsize, rsize);
   for (i = 0; i < 5U; i++)
     EXPECT_EQ (((unsigned int *) result)[i], i + 20U);
 
-  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result));
+  rsize = 0U;
+  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &result, &rsize));
   len = nns_edge_queue_get_length (queue_h);
   EXPECT_EQ (len, 0U);
   EXPECT_EQ (result, data3);
+  EXPECT_EQ (dsize, rsize);
   for (i = 0; i < 5U; i++)
     EXPECT_EQ (((unsigned int *) result)[i], i + 30U);
 
-  EXPECT_TRUE (nns_edge_queue_push (queue_h, data1, nns_edge_free));
+  EXPECT_TRUE (nns_edge_queue_push (queue_h, data1, dsize, nns_edge_free));
   len = nns_edge_queue_get_length (queue_h);
   EXPECT_EQ (len, 1U);
 
-  EXPECT_TRUE (nns_edge_queue_push (queue_h, data2, nns_edge_free));
+  EXPECT_TRUE (nns_edge_queue_push (queue_h, data2, dsize, nns_edge_free));
   len = nns_edge_queue_get_length (queue_h);
   EXPECT_EQ (len, 2U);
 
-  EXPECT_TRUE (nns_edge_queue_push (queue_h, data3, nns_edge_free));
+  EXPECT_TRUE (nns_edge_queue_push (queue_h, data3, dsize, nns_edge_free));
   len = nns_edge_queue_get_length (queue_h);
   EXPECT_EQ (len, 3U);
 
@@ -3325,11 +3336,13 @@ TEST(edgeQueue, pushDataOnThread)
 
   for (i = 0; i < 3U; i++) {
     void *result = NULL;
+    nns_size_t rsize = 0U;
 
-    EXPECT_TRUE (nns_edge_queue_wait_pop (queue_h, 0U, &result));
+    EXPECT_TRUE (nns_edge_queue_wait_pop (queue_h, 0U, &result, &rsize));
 
     for (j = 0; j < 5U; j++)
       EXPECT_EQ (((unsigned int *) result)[j], i * 10U + j);
+    EXPECT_EQ (rsize, 5 * sizeof (unsigned int));
 
     free (result);
   }
@@ -3377,16 +3390,18 @@ TEST(edgeQueue, setLimit)
 {
   nns_edge_queue_h queue_h;
   void *data;
+  nns_size_t dsize;
   unsigned int i, len;
 
-  data = malloc (sizeof (unsigned int));
+  dsize = sizeof (unsigned int);
+  data = malloc (dsize);
   ASSERT_TRUE (data != NULL);
 
   EXPECT_TRUE (nns_edge_queue_create (&queue_h));
   EXPECT_TRUE (nns_edge_queue_set_limit (queue_h, 3U, NNS_EDGE_QUEUE_LEAK_NEW));
 
   for (i = 0; i < 5U; i++)
-    nns_edge_queue_push (queue_h, data, NULL);
+    nns_edge_queue_push (queue_h, data, dsize, NULL);
 
   len = nns_edge_queue_get_length (queue_h);
   EXPECT_EQ (len, 3U);
@@ -3403,6 +3418,7 @@ TEST(edgeQueue, setLeaky)
 {
   nns_edge_queue_h queue_h;
   void *data;
+  nns_size_t dsize, rsize;
   unsigned int i, len;
   bool res;
 
@@ -3411,13 +3427,15 @@ TEST(edgeQueue, setLeaky)
   /* leaky option new */
   EXPECT_TRUE (nns_edge_queue_set_limit (queue_h, 3U, NNS_EDGE_QUEUE_LEAK_NEW));
 
+  dsize = sizeof (unsigned int);
+
   for (i = 0; i < 5U; i++) {
-    data = malloc (sizeof (unsigned int));
+    data = malloc (dsize);
     ASSERT_TRUE (data != NULL);
 
     *((unsigned int *) data) = i + 1;
 
-    res = nns_edge_queue_push (queue_h, data, free);
+    res = nns_edge_queue_push (queue_h, data, dsize, free);
     if (i < 3U) {
       EXPECT_TRUE (res);
     } else {
@@ -3429,13 +3447,13 @@ TEST(edgeQueue, setLeaky)
   len = nns_edge_queue_get_length (queue_h);
   EXPECT_EQ (len, 3U);
 
-  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data));
+  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data, &rsize));
   EXPECT_EQ (*((unsigned int *) data), 1U);
   free (data);
-  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data));
+  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data, &rsize));
   EXPECT_EQ (*((unsigned int *) data), 2U);
   free (data);
-  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data));
+  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data, &rsize));
   EXPECT_EQ (*((unsigned int *) data), 3U);
   free (data);
 
@@ -3446,24 +3464,24 @@ TEST(edgeQueue, setLeaky)
   EXPECT_TRUE (nns_edge_queue_set_limit (queue_h, 3U, NNS_EDGE_QUEUE_LEAK_OLD));
 
   for (i = 0; i < 5U; i++) {
-    data = malloc (sizeof (unsigned int));
+    data = malloc (dsize);
     ASSERT_TRUE (data != NULL);
 
     *((unsigned int *) data) = i + 1;
 
-    EXPECT_TRUE (nns_edge_queue_push (queue_h, data, free));
+    EXPECT_TRUE (nns_edge_queue_push (queue_h, data, dsize, free));
   }
 
   len = nns_edge_queue_get_length (queue_h);
   EXPECT_EQ (len, 3U);
 
-  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data));
+  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data, &rsize));
   EXPECT_EQ (*((unsigned int *) data), 3U);
   free (data);
-  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data));
+  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data, &rsize));
   EXPECT_EQ (*((unsigned int *) data), 4U);
   free (data);
-  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data));
+  EXPECT_TRUE (nns_edge_queue_pop (queue_h, &data, &rsize));
   EXPECT_EQ (*((unsigned int *) data), 5U);
   free (data);
 
@@ -3487,11 +3505,13 @@ TEST(edgeQueue, setLimitInvalidParam01_n)
 TEST(edgeQueue, pushInvalidParam01_n)
 {
   void *data;
+  nns_size_t dsize;
 
-  data = malloc (5 * sizeof (unsigned int));
+  dsize = 5 * sizeof (unsigned int);
+  data = malloc (dsize);
   ASSERT_TRUE (data != NULL);
 
-  EXPECT_FALSE (nns_edge_queue_push (NULL, data, NULL));
+  EXPECT_FALSE (nns_edge_queue_push (NULL, data, dsize, NULL));
 
   free (data);
 }
@@ -3502,10 +3522,30 @@ TEST(edgeQueue, pushInvalidParam01_n)
 TEST(edgeQueue, pushInvalidParam02_n)
 {
   nns_edge_queue_h queue_h;
+  nns_size_t dsize;
+
+  EXPECT_TRUE (nns_edge_queue_create (&queue_h));
+
+  dsize = 5 * sizeof (unsigned int);
+  EXPECT_FALSE (nns_edge_queue_push (queue_h, NULL, dsize, NULL));
+
+  EXPECT_TRUE (nns_edge_queue_destroy (queue_h));
+}
+
+/**
+ * @brief Push data into queue - invalid param.
+ */
+TEST(edgeQueue, pushInvalidParam03_n)
+{
+  nns_edge_queue_h queue_h;
+  void *data;
 
   EXPECT_TRUE (nns_edge_queue_create (&queue_h));
 
-  EXPECT_FALSE (nns_edge_queue_push (queue_h, NULL, NULL));
+  data = malloc (5 * sizeof (unsigned int));
+  ASSERT_TRUE (data != NULL);
+
+  EXPECT_FALSE (nns_edge_queue_push (queue_h, data, 0U, NULL));
 
   EXPECT_TRUE (nns_edge_queue_destroy (queue_h));
 }
@@ -3516,8 +3556,9 @@ TEST(edgeQueue, pushInvalidParam02_n)
 TEST(edgeQueue, popInvalidParam01_n)
 {
   void *data;
+  nns_size_t size;
 
-  EXPECT_FALSE (nns_edge_queue_pop (NULL, &data));
+  EXPECT_FALSE (nns_edge_queue_pop (NULL, &data, &size));
 }
 
 /**
@@ -3526,10 +3567,26 @@ TEST(edgeQueue, popInvalidParam01_n)
 TEST(edgeQueue, popInvalidParam02_n)
 {
   nns_edge_queue_h queue_h;
+  nns_size_t size;
+
+  EXPECT_TRUE (nns_edge_queue_create (&queue_h));
+
+  EXPECT_FALSE (nns_edge_queue_pop (queue_h, NULL, &size));
+
+  EXPECT_TRUE (nns_edge_queue_destroy (queue_h));
+}
+
+/**
+ * @brief Pop data from queue - invalid param.
+ */
+TEST(edgeQueue, popInvalidParam03_n)
+{
+  nns_edge_queue_h queue_h;
+  void *data;
 
   EXPECT_TRUE (nns_edge_queue_create (&queue_h));
 
-  EXPECT_FALSE (nns_edge_queue_pop (queue_h, NULL));
+  EXPECT_FALSE (nns_edge_queue_pop (queue_h, &data, NULL));
 
   EXPECT_TRUE (nns_edge_queue_destroy (queue_h));
 }
@@ -3541,10 +3598,11 @@ TEST(edgeQueue, waitPopTimedout)
 {
   nns_edge_queue_h queue_h;
   void *data;
+  nns_size_t size;
 
   EXPECT_TRUE (nns_edge_queue_create (&queue_h));
 
-  EXPECT_FALSE (nns_edge_queue_wait_pop (queue_h, 10U, &data));
+  EXPECT_FALSE (nns_edge_queue_wait_pop (queue_h, 10U, &data, &size));
 
   EXPECT_TRUE (nns_edge_queue_destroy (queue_h));
 }
@@ -3555,8 +3613,9 @@ TEST(edgeQueue, waitPopTimedout)
 TEST(edgeQueue, waitPopInvalidParam01_n)
 {
   void *data;
+  nns_size_t size;
 
-  EXPECT_FALSE (nns_edge_queue_wait_pop (NULL, 0U, &data));
+  EXPECT_FALSE (nns_edge_queue_wait_pop (NULL, 0U, &data, &size));
 }
 
 /**
@@ -3565,10 +3624,26 @@ TEST(edgeQueue, waitPopInvalidParam01_n)
 TEST(edgeQueue, waitPopInvalidParam02_n)
 {
   nns_edge_queue_h queue_h;
+  nns_size_t size;
 
   EXPECT_TRUE (nns_edge_queue_create (&queue_h));
 
-  EXPECT_FALSE (nns_edge_queue_wait_pop (queue_h, 0U, NULL));
+  EXPECT_FALSE (nns_edge_queue_wait_pop (queue_h, 0U, NULL, &size));
+
+  EXPECT_TRUE (nns_edge_queue_destroy (queue_h));
+}
+
+/**
+ * @brief Wait and pop data from queue - invalid param.
+ */
+TEST(edgeQueue, waitPopInvalidParam03_n)
+{
+  nns_edge_queue_h queue_h;
+  void *data;
+
+  EXPECT_TRUE (nns_edge_queue_create (&queue_h));
+
+  EXPECT_FALSE (nns_edge_queue_wait_pop (queue_h, 0U, &data, NULL));
 
   EXPECT_TRUE (nns_edge_queue_destroy (queue_h));
 }
@@ -3953,15 +4028,16 @@ TEST(edgeMqtt, subscribeInvalidParam_n)
 /**
  * @brief Get message with invalid param.
  */
-TEST(edgeMqtt, getMessageInvalidParam_n)
+TEST(edgeMqtt, getMessageInvalidParam1_n)
 {
   int ret = -1;
-  char *msg = NULL;
+  void *msg = NULL;
+  nns_size_t msg_len;
 
   if (!_check_mqtt_broker ())
     return;
 
-  ret = nns_edge_mqtt_get_message (NULL, &msg);
+  ret = nns_edge_mqtt_get_message (NULL, &msg, &msg_len);
   EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
 }
 
@@ -3972,6 +4048,29 @@ TEST(edgeMqtt, getMessageInvalidParam2_n)
 {
   int ret = -1;
   nns_edge_broker_h broker_h;
+  nns_size_t msg_len;
+
+  if (!_check_mqtt_broker ())
+    return;
+
+  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+  ret = nns_edge_mqtt_get_message (broker_h, NULL, &msg_len);
+  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+
+  ret = nns_edge_mqtt_close (broker_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Get message with invalid param.
+ */
+TEST(edgeMqtt, getMessageInvalidParam3_n)
+{
+  int ret = -1;
+  nns_edge_broker_h broker_h;
+  void *msg = NULL;
 
   if (!_check_mqtt_broker ())
     return;
@@ -3979,7 +4078,7 @@ TEST(edgeMqtt, getMessageInvalidParam2_n)
   ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
   EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
 
-  ret = nns_edge_mqtt_get_message (broker_h, NULL);
+  ret = nns_edge_mqtt_get_message (broker_h, &msg, NULL);
   EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
 
   ret = nns_edge_mqtt_close (broker_h);
@@ -3993,7 +4092,8 @@ TEST(edgeMqtt, getMessageWithinTimeout_n)
 {
   int ret = -1;
   nns_edge_broker_h broker_h;
-  char *msg = NULL;
+  void *msg = NULL;
+  nns_size_t msg_len;
 
   if (!_check_mqtt_broker ())
     return;
@@ -4001,7 +4101,7 @@ TEST(edgeMqtt, getMessageWithinTimeout_n)
   ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
   EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
 
-  ret = nns_edge_mqtt_get_message (broker_h, &msg);
+  ret = nns_edge_mqtt_get_message (broker_h, &msg, &msg_len);
   EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
 
   ret = nns_edge_mqtt_close (broker_h);