[MQTT] Support MQTT direct data transmission
authorgichan <gichan2.jang@samsung.com>
Mon, 13 Mar 2023 07:44:23 +0000 (16:44 +0900)
committerjaeyun-jung <39614140+jaeyun-jung@users.noreply.github.com>
Mon, 20 Mar 2023 03:47:02 +0000 (12:47 +0900)
 - Support MQTT direct data transmission.
 - Add unit test for mqtt

Signed-off-by: gichan <gichan2.jang@samsung.com>
packaging/nnstreamer-edge.spec
src/libnnstreamer-edge/nnstreamer-edge-internal.c
src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c
src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c
src/libnnstreamer-edge/nnstreamer-edge-mqtt.h
tests/CMakeLists.txt
tests/unittest_nnstreamer-edge-mqtt.cc [new file with mode: 0644]
tests/unittest_nnstreamer-edge.cc

index f4a4bb06a054712ebf3e97326e6201c3d4d6858f..31e14ff3f7a537ae965f31f1322c3f5b06e190f7 100644 (file)
@@ -142,6 +142,10 @@ LD_LIBRARY_PATH=./src bash %{test_script} ./tests/unittest_nnstreamer-edge
 LD_LIBRARY_PATH=./src bash %{test_script} ./tests/unittest_nnstreamer-edge-aitt
 %endif
 
+%if 0%{?mqtt_support}
+LD_LIBRARY_PATH=./src bash %{test_script} ./tests/unittest_nnstreamer-edge-mqtt
+%endif
+
 %if 0%{?testcoverage}
 # 'lcov' generates the date format with UTC time zone by default. Let's replace UTC with KST.
 # If you can get a root privilege, run ln -sf /usr/share/zoneinfo/Asia/Seoul /etc/localtime
@@ -191,6 +195,10 @@ rm -rf %{buildroot}
 %{_bindir}/unittest_nnstreamer-edge-aitt
 %endif
 
+%if 0%{?mqtt_support}
+%{_bindir}/unittest_nnstreamer-edge-mqtt
+%endif
+
 %if 0%{?testcoverage}
 %files unittest-coverage
 %{_datadir}/nnstreamer-edge/unittest/*
index 8eef0dc7689c90de768c41cfd3bd725498351e0b..dbc1b295269d96500ce314a5284410609e361d3f 100644 (file)
@@ -862,6 +862,11 @@ _nns_edge_send_thread (void *thread_data)
         if (NNS_EDGE_ERROR_NONE != ret)
           nns_edge_loge ("Failed to send data via AITT connection.");
         break;
+      case NNS_EDGE_CONNECT_TYPE_MQTT:
+        ret = nns_edge_mqtt_publish_data (eh->broker_h, data_h);
+        if (NNS_EDGE_ERROR_NONE != ret)
+          nns_edge_loge ("Failed to send data via MQTT connection.");
+        break;
       default:
         break;
     }
@@ -1283,8 +1288,9 @@ nns_edge_start (nns_edge_h edge_h)
 
   if ((NNS_EDGE_NODE_TYPE_QUERY_SERVER == eh->node_type)
       || (NNS_EDGE_NODE_TYPE_PUB == eh->node_type)) {
-    if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) {
-      char *topic, *msg;
+    if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type
+        || NNS_EDGE_CONNECT_TYPE_MQTT == eh->connect_type) {
+      char *topic;
 
       /** @todo Set unique device name.
        * Device name should be unique. Consider using MAC address later.
@@ -1303,14 +1309,24 @@ nns_edge_start (nns_edge_h edge_h)
         goto done;
       }
 
-      msg = nns_edge_get_host_string (eh->host, eh->port);
+      if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) {
+        char *msg;
+        msg = nns_edge_get_host_string (eh->host, eh->port);
 
-      ret = nns_edge_mqtt_publish (eh->broker_h, msg, strlen (msg) + 1);
-      SAFE_FREE (msg);
+        ret = nns_edge_mqtt_publish (eh->broker_h, msg, strlen (msg) + 1);
+        SAFE_FREE (msg);
 
-      if (NNS_EDGE_ERROR_NONE != ret) {
-        nns_edge_loge ("Failed to publish the meesage to broker.");
-        goto done;
+        if (NNS_EDGE_ERROR_NONE != ret) {
+          nns_edge_loge ("Failed to publish the meesage to broker.");
+          goto done;
+        }
+      } else {
+        ret = nns_edge_mqtt_set_event_callback (eh->broker_h, eh->event_cb,
+            eh->user_data);
+        if (NNS_EDGE_ERROR_NONE != ret) {
+          nns_edge_loge ("Failed to set event callback to MQTT broker.");
+          goto done;
+        }
       }
     } else if (NNS_EDGE_CONNECT_TYPE_AITT == eh->connect_type) {
       ret = nns_edge_aitt_connect (eh->id, eh->topic, eh->dest_host,
@@ -1370,6 +1386,7 @@ nns_edge_release_handle (nns_edge_h edge_h)
 
   switch (eh->connect_type) {
     case NNS_EDGE_CONNECT_TYPE_HYBRID:
+    case NNS_EDGE_CONNECT_TYPE_MQTT:
       if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_close (eh->broker_h)) {
         nns_edge_logw ("Failed to close mqtt connection.");
       }
@@ -1464,6 +1481,71 @@ nns_edge_set_event_callback (nns_edge_h edge_h, nns_edge_event_cb cb,
   return NNS_EDGE_ERROR_NONE;
 }
 
+/**
+ * @brief Parse the message received from the MQTT broker and connect to the server directly.
+ */
+static int
+_mqtt_hybrid_direct_connection (nns_edge_handle_s * eh)
+{
+  int ret;
+
+  do {
+    char *msg = NULL;
+    char *server_ip = NULL;
+    int server_port = 0;
+    nns_size_t msg_len = 0;
+
+    ret = nns_edge_mqtt_get_message (eh->broker_h, (void **) &msg, &msg_len);
+    if (ret != NNS_EDGE_ERROR_NONE || !msg || msg_len == 0)
+      return ret;
+
+    nns_edge_parse_host_string (msg, &server_ip, &server_port);
+    SAFE_FREE (msg);
+
+    nns_edge_logd ("[DEBUG] Parsed server info: Server [%s:%d] ", server_ip,
+        server_port);
+
+    ret = _nns_edge_connect_to (eh, eh->client_id, server_ip, server_port);
+    SAFE_FREE (server_ip);
+
+    if (NNS_EDGE_ERROR_NONE == ret) {
+      return ret;
+    }
+  } while (TRUE);
+
+  return ret;
+}
+
+/**
+ * @brief Start subsciption to MQTT message
+ */
+static int
+_nns_edge_start_mqtt_sub (nns_edge_handle_s * eh)
+{
+  char *topic;
+  int ret;
+
+  if (!nns_edge_mqtt_is_connected (eh->broker_h)) {
+    topic = nns_edge_strdup_printf ("edge/inference/+/%s/#", eh->topic);
+
+    ret = nns_edge_mqtt_connect (eh->id, topic, eh->dest_host, eh->dest_port,
+        &eh->broker_h);
+    SAFE_FREE (topic);
+
+    if (NNS_EDGE_ERROR_NONE != ret) {
+      return NNS_EDGE_ERROR_CONNECTION_FAILURE;
+    }
+
+    ret = nns_edge_mqtt_subscribe (eh->broker_h);
+    if (NNS_EDGE_ERROR_NONE != ret) {
+      nns_edge_loge ("Failed to subscribe to topic: %s.", eh->topic);
+      return ret;
+    }
+  }
+
+  return NNS_EDGE_ERROR_NONE;
+}
+
 /**
  * @brief Connect to the destination node.
  */
@@ -1506,51 +1588,22 @@ nns_edge_connect (nns_edge_h edge_h, const char *dest_host, int dest_port)
   eh->dest_host = nns_edge_strdup (dest_host);
   eh->dest_port = dest_port;
 
-  if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) {
-    char *topic;
-
-    if (!nns_edge_mqtt_is_connected (eh->broker_h)) {
-      topic = nns_edge_strdup_printf ("edge/inference/+/%s/#", eh->topic);
-
-      ret = nns_edge_mqtt_connect (eh->id, topic, dest_host, dest_port,
-          &eh->broker_h);
-      SAFE_FREE (topic);
-
-      if (NNS_EDGE_ERROR_NONE != ret) {
-        nns_edge_loge ("Connection failure to broker.");
-        goto done;
-      }
+  if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type
+      || NNS_EDGE_CONNECT_TYPE_MQTT == eh->connect_type) {
+    ret = _nns_edge_start_mqtt_sub (eh);
+    if (NNS_EDGE_ERROR_NONE != ret)
+      goto done;
 
-      ret = nns_edge_mqtt_subscribe (eh->broker_h);
+    if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) {
+      ret = _mqtt_hybrid_direct_connection (eh);
+    } else {
+      ret = nns_edge_mqtt_set_event_callback (eh->broker_h, eh->event_cb,
+          eh->user_data);
       if (NNS_EDGE_ERROR_NONE != ret) {
-        nns_edge_loge ("Failed to subscribe to topic: %s.", eh->topic);
-        goto done;
+        nns_edge_loge ("Failed to set event callback to MQTT broker.");
+        return ret;
       }
     }
-
-    do {
-      char *msg = NULL;
-      char *server_ip = NULL;
-      int server_port = 0;
-      nns_size_t msg_len = 0;
-
-      ret = nns_edge_mqtt_get_message (eh->broker_h, (void **) &msg, &msg_len);
-      if (ret != NNS_EDGE_ERROR_NONE || !msg || msg_len == 0)
-        break;
-
-      nns_edge_parse_host_string (msg, &server_ip, &server_port);
-      SAFE_FREE (msg);
-
-      nns_edge_logd ("[DEBUG] Parsed server info: Server [%s:%d] ", server_ip,
-          server_port);
-
-      ret = _nns_edge_connect_to (eh, eh->client_id, server_ip, server_port);
-      SAFE_FREE (server_ip);
-
-      if (NNS_EDGE_ERROR_NONE == ret) {
-        break;
-      }
-    } while (TRUE);
   } else if (NNS_EDGE_CONNECT_TYPE_AITT == eh->connect_type) {
     ret = nns_edge_aitt_connect (eh->id, eh->topic, dest_host, dest_port,
         &eh->broker_h);
@@ -1624,6 +1677,10 @@ _nns_edge_is_connected (nns_edge_h edge_h)
       NNS_EDGE_ERROR_NONE == nns_edge_aitt_is_connected (eh->broker_h))
     return true;
 
+  if (NNS_EDGE_CONNECT_TYPE_MQTT == eh->connect_type &&
+      nns_edge_mqtt_is_connected (eh->broker_h))
+    return true;
+
   conn_data = (nns_edge_conn_data_s *) eh->connections;
   while (conn_data) {
     conn = conn_data->sink_conn;
index 460599aa65e7c09a64f7ddbb282c115307b832da..cd8a659604188f7f5714355f48388ce370cacd00 100644 (file)
@@ -19,6 +19,8 @@
 #include "nnstreamer-edge-log.h"
 #include "nnstreamer-edge-util.h"
 #include "nnstreamer-edge-queue.h"
+#include "nnstreamer-edge-data.h"
+#include "nnstreamer-edge-event.h"
 
 /**
  * @brief Data structure for mqtt broker handle.
@@ -32,6 +34,10 @@ typedef struct
   char *host;
   int port;
   bool connected;
+
+  /* event callback for new message */
+  nns_edge_event_cb event_cb;
+  void *user_data;
 } nns_edge_broker_s;
 
 /**
@@ -44,6 +50,7 @@ on_message_callback (struct mosquitto *client, void *data,
   nns_edge_broker_s *bh = (nns_edge_broker_s *) data;
   char *msg = NULL;
   nns_size_t msg_len;
+  int ret;
 
   if (!bh) {
     nns_edge_loge ("Invalid param, given broker handle is invalid.");
@@ -60,8 +67,31 @@ on_message_callback (struct mosquitto *client, void *data,
 
   msg_len = (nns_size_t) message->payloadlen;
   msg = nns_edge_memdup (message->payload, msg_len);
-  if (msg)
-    nns_edge_queue_push (bh->message_queue, msg, msg_len, nns_edge_free);
+
+  if (msg) {
+    if (bh->event_cb) {
+      nns_edge_data_h data_h;
+
+      if (nns_edge_data_create (&data_h) != NNS_EDGE_ERROR_NONE) {
+        nns_edge_loge ("Failed to create data handle in msg thread.");
+        return;
+      }
+
+      nns_edge_data_deserialize (data_h, (void *) msg, (nns_size_t) msg_len);
+
+      ret = nns_edge_event_invoke_callback (bh->event_cb, bh->user_data,
+          NNS_EDGE_EVENT_NEW_DATA_RECEIVED, data_h, sizeof (nns_edge_data_h),
+          NULL);
+      if (ret != NNS_EDGE_ERROR_NONE)
+        nns_edge_loge ("Failed to send an event for received message.");
+
+      nns_edge_data_destroy (data_h);
+      SAFE_FREE (msg);
+      return;
+    } else {
+      nns_edge_queue_push (bh->message_queue, msg, msg_len, nns_edge_free);
+    }
+  }
 
   return;
 }
@@ -127,6 +157,8 @@ _nns_edge_mqtt_init_client (const char *id, const char *topic, const char *host,
   bh->host = nns_edge_strdup (host);
   bh->port = port;
   bh->connected = true;
+  bh->event_cb = NULL;
+  bh->user_data = NULL;
 
   *broker_h = bh;
   return NNS_EDGE_ERROR_NONE;
@@ -221,6 +253,30 @@ nns_edge_mqtt_close (nns_edge_broker_h broker_h)
   return NNS_EDGE_ERROR_NONE;
 }
 
+/**
+ * @brief Internal util function to send edge-data via MQTT connection.
+ */
+int
+nns_edge_mqtt_publish_data (nns_edge_broker_h handle, nns_edge_data_h data_h)
+{
+  int ret;
+  void *data = NULL;
+  nns_size_t size;
+
+  ret = nns_edge_data_serialize (data_h, &data, &size);
+  if (NNS_EDGE_ERROR_NONE != ret) {
+    nns_edge_loge ("Failed to serialize the edge data.");
+    return ret;
+  }
+
+  ret = nns_edge_mqtt_publish (handle, data, size);
+  if (NNS_EDGE_ERROR_NONE != ret)
+    nns_edge_loge ("Failed to send data to destination.");
+
+  SAFE_FREE (data);
+  return ret;
+}
+
 /**
  * @brief Publish raw data.
  * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
@@ -354,3 +410,25 @@ nns_edge_mqtt_is_connected (nns_edge_broker_h broker_h)
 
   return bh->connected;
 }
+
+/**
+ * @brief Set event callback for new message.
+ */
+int
+nns_edge_mqtt_set_event_callback (nns_edge_broker_h broker_h,
+    nns_edge_event_cb cb, void *user_data)
+{
+  nns_edge_broker_s *bh;
+
+  if (!broker_h) {
+    nns_edge_loge ("Invalid param, given MQTT handle is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  bh = (nns_edge_broker_s *) broker_h;
+
+  bh->event_cb = cb;
+  bh->user_data = user_data;
+
+  return NNS_EDGE_ERROR_NONE;
+}
index 804795f30026b14a34734911ff309c10241098af..ef90e4f5393b4c272086c79d6b92382a8964da9e 100644 (file)
@@ -19,6 +19,8 @@
 #include "nnstreamer-edge-log.h"
 #include "nnstreamer-edge-util.h"
 #include "nnstreamer-edge-queue.h"
+#include "nnstreamer-edge-data.h"
+#include "nnstreamer-edge-event.h"
 
 /**
  * @brief Data structure for mqtt broker handle.
@@ -31,6 +33,10 @@ typedef struct
   char *topic;
   char *host;
   int port;
+
+  /* event callback for new message */
+  nns_edge_event_cb event_cb;
+  void *user_data;
 } nns_edge_broker_s;
 
 /**
@@ -64,8 +70,31 @@ mqtt_cb_message_arrived (void *context, char *topic, int topic_len,
 
   msg_len = (nns_size_t) message->payloadlen;
   msg = nns_edge_memdup (message->payload, msg_len);
-  if (msg)
-    nns_edge_queue_push (bh->message_queue, msg, msg_len, nns_edge_free);
+
+  if (msg) {
+    if (bh->event_cb) {
+      nns_edge_data_h data_h;
+
+      if (nns_edge_data_create (&data_h) != NNS_EDGE_ERROR_NONE) {
+        nns_edge_loge ("Failed to create data handle in msg thread.");
+        return;
+      }
+
+      nns_edge_data_deserialize (data_h, (void *) msg, (nns_size_t) msg_len);
+
+      ret = nns_edge_event_invoke_callback (bh->event_cb, bh->user_data,
+          NNS_EDGE_EVENT_NEW_DATA_RECEIVED, data_h, sizeof (nns_edge_data_h),
+          NULL);
+      if (ret != NNS_EDGE_ERROR_NONE)
+        nns_edge_loge ("Failed to send an event for received message.");
+
+      nns_edge_data_destroy (data_h);
+      SAFE_FREE (msg);
+      return;
+    } else {
+      nns_edge_queue_push (bh->message_queue, msg, msg_len, nns_edge_free);
+    }
+  }
 
   return TRUE;
 }
@@ -138,6 +167,8 @@ nns_edge_mqtt_connect (const char *id, const char *topic, const char *host,
   bh->host = nns_edge_strdup (host);
   bh->port = port;
   bh->mqtt_h = handle;
+  bh->event_cb = NULL;
+  bh->user_data = NULL;
   nns_edge_queue_create (&bh->message_queue);
 
   MQTTAsync_setCallbacks (handle, bh, NULL, mqtt_cb_message_arrived, NULL);
@@ -231,6 +262,30 @@ nns_edge_mqtt_close (nns_edge_broker_h broker_h)
   return NNS_EDGE_ERROR_NONE;
 }
 
+/**
+ * @brief Internal util function to send edge-data via MQTT connection.
+ */
+int
+nns_edge_mqtt_publish_data (nns_edge_broker_h handle, nns_edge_data_h data_h)
+{
+  int ret;
+  void *data = NULL;
+  nns_size_t size;
+
+  ret = nns_edge_data_serialize (data_h, &data, &size);
+  if (NNS_EDGE_ERROR_NONE != ret) {
+    nns_edge_loge ("Failed to serialize the edge data.");
+    return ret;
+  }
+
+  ret = nns_edge_mqtt_publish (handle, data, size);
+  if (NNS_EDGE_ERROR_NONE != ret)
+    nns_edge_loge ("Failed to send data to destination.");
+
+  SAFE_FREE (data);
+  return ret;
+}
+
 /**
  * @brief Publish raw data.
  * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
@@ -375,3 +430,25 @@ nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg,
 
   return NNS_EDGE_ERROR_NONE;
 }
+
+/**
+ * @brief Set event callback for new message.
+ */
+int
+nns_edge_mqtt_set_event_callback (nns_edge_broker_h broker_h,
+    nns_edge_event_cb cb, void *user_data)
+{
+  nns_edge_broker_s *bh;
+
+  if (!broker_h) {
+    nns_edge_loge ("Invalid param, given MQTT handle is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  bh = (nns_edge_broker_s *) broker_h;
+
+  bh->event_cb = cb;
+  bh->user_data = user_data;
+
+  return NNS_EDGE_ERROR_NONE;
+}
index f6bc732d2bdf29d55e122b7e0e2a69573f2e1a88..2a8d642c0cc96498592a30d347be176ff669dce4 100644 (file)
@@ -56,6 +56,17 @@ bool nns_edge_mqtt_is_connected (nns_edge_broker_h broker_h);
  * @brief Get message from mqtt broker. If no message in the queue, it waits up to 1 second for new message.
  */
 int nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg, nns_size_t *msg_len);
+
+/**
+ * @brief Internal util function to send edge-data via MQTT connection.
+ */
+int nns_edge_mqtt_publish_data (nns_edge_broker_h handle, nns_edge_data_h data_h);
+
+/**
+ * @brief Set event callback for new message.
+ */
+int nns_edge_mqtt_set_event_callback (nns_edge_broker_h broker_h, nns_edge_event_cb cb, void *user_data);
+
 #else
 /**
  * @todo consider to change code style later.
@@ -72,6 +83,8 @@ int nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg, nns_size_
 #define nns_edge_mqtt_subscribe(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
 #define nns_edge_mqtt_is_connected(...) (false)
 #define nns_edge_mqtt_get_message(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
+#define nns_edge_mqtt_publish_data(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
+#define nns_edge_mqtt_set_event_callback(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
 #endif /* ENABLE_MQTT */
 
 #ifdef __cplusplus
index 3efd8c0345b1499c49c42c3676f939b2d32c8fc1..65bc4e0fedbde8c4b03a4e962d5724cbf81b96c4 100644 (file)
@@ -11,3 +11,11 @@ TARGET_INCLUDE_DIRECTORIES(unittest_nnstreamer-edge-aitt PRIVATE ${EDGE_REQUIRE_
 TARGET_LINK_LIBRARIES(unittest_nnstreamer-edge-aitt ${TEST_REQUIRE_PKGS_LDFLAGS} nnstreamer-edge)
 INSTALL (TARGETS unittest_nnstreamer-edge-aitt DESTINATION ${BIN_INSTALL_DIR})
 ENDIF()
+
+# MQTT test
+IF(MQTT_SUPPORT)
+ADD_EXECUTABLE(unittest_nnstreamer-edge-mqtt unittest_nnstreamer-edge-mqtt.cc)
+TARGET_INCLUDE_DIRECTORIES(unittest_nnstreamer-edge-mqtt PRIVATE ${EDGE_REQUIRE_PKGS_INCLUDE_DIRS} ${INCLUDE_DIR} ${NNS_EDGE_SRC_DIR})
+TARGET_LINK_LIBRARIES(unittest_nnstreamer-edge-mqtt ${TEST_REQUIRE_PKGS_LDFLAGS} nnstreamer-edge)
+INSTALL (TARGETS unittest_nnstreamer-edge-mqtt DESTINATION ${BIN_INSTALL_DIR})
+ENDIF()
diff --git a/tests/unittest_nnstreamer-edge-mqtt.cc b/tests/unittest_nnstreamer-edge-mqtt.cc
new file mode 100644 (file)
index 0000000..1a55a92
--- /dev/null
@@ -0,0 +1,727 @@
+/**
+ * @file        unittest_nnstreamer-edge-mqtt.cc
+ * @date        15 Mar 2023
+ * @brief       Unittest for nnstreamer-edge MQTT direct data transmission.
+ * @see         https://github.com/nnstreamer/nnstreamer-edge
+ * @author      Gichan Jang <gichan2.jang@samsung.com>
+ * @bug         No known bugs
+ */
+
+#include <gtest/gtest.h>
+#include "nnstreamer-edge.h"
+#include "nnstreamer-edge-mqtt.h"
+#include "nnstreamer-edge-log.h"
+#include "nnstreamer-edge-util.h"
+
+/**
+ * @brief Data struct for unittest.
+ */
+typedef struct
+{
+  nns_edge_h handle;
+  bool running;
+  bool is_server;
+  bool event_cb_released;
+  unsigned int received;
+} ne_test_data_s;
+
+/**
+ * @brief Allocate and initialize test data.
+ */
+static ne_test_data_s *
+_get_test_data (bool is_server)
+{
+  ne_test_data_s *_td;
+
+  _td = (ne_test_data_s *) calloc (1, sizeof (ne_test_data_s));
+
+  if (_td) {
+    _td->is_server = is_server;
+  }
+
+  return _td;
+}
+
+/**
+ * @brief Release test data.
+ */
+static void
+_free_test_data (ne_test_data_s *_td)
+{
+  if (!_td)
+    return;
+
+  SAFE_FREE (_td);
+}
+
+/**
+ * @brief Edge event callback for test.
+ */
+static int
+_test_edge_hybrid_event_cb (nns_edge_event_h event_h, void *user_data)
+{
+  ne_test_data_s *_td = (ne_test_data_s *) user_data;
+  nns_edge_event_e event = NNS_EDGE_EVENT_UNKNOWN;
+  nns_edge_data_h data_h;
+  void *data;
+  nns_size_t data_len;
+  char *val;
+  unsigned int i, count;
+  int ret;
+
+  if (!_td) {
+    /* Cannot update event status. */
+    return NNS_EDGE_ERROR_NONE;
+  }
+
+  ret = nns_edge_event_get_type (event_h, &event);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+  switch (event) {
+    case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
+      _td->received++;
+
+      ret = nns_edge_event_parse_new_data (event_h, &data_h);
+      EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+      /* Compare metadata */
+      ret = nns_edge_data_get_info (data_h, "test-key", &val);
+      EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+      EXPECT_STREQ (val, "test-value");
+      SAFE_FREE (val);
+
+      if (_td->is_server) {
+        /**
+         * @note This is test code, responding to client.
+         * Recommend not to call edge API in event callback.
+         */
+        ret = nns_edge_send (_td->handle, data_h);
+        EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+      } else {
+        /* Compare received data */
+        ret = nns_edge_data_get_count (data_h, &count);
+        EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+        ret = nns_edge_data_get (data_h, 0, &data, &data_len);
+        EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+        EXPECT_EQ (count, 1U);
+        for (i = 0; i < 10U; i++)
+          EXPECT_EQ (((unsigned int *) data)[i], i);
+      }
+
+      ret = nns_edge_data_destroy (data_h);
+      EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+      break;
+    default:
+      break;
+  }
+
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Check whether MQTT broker is running or not.
+ */
+static bool
+_check_mqtt_broker ()
+{
+  int ret = 0;
+
+  ret = system ("ps aux | grep mosquitto | grep -v grep");
+  if (0 != ret) {
+    nns_edge_logw ("MQTT broker is not running. Skip query hybrid test.");
+    return false;
+  }
+
+  return true;
+}
+
+/**
+ * @brief Connect to the local host using the information received from mqtt.
+ */
+TEST(edgeMqttHybrid, connectLocal)
+{
+  nns_edge_h server_h, client_h;
+  ne_test_data_s *_td_server, *_td_client;
+  nns_edge_data_h data_h;
+  nns_size_t data_len;
+  void *data;
+  unsigned int i, retry;
+  int ret = 0;
+  char *val;
+
+  if (!_check_mqtt_broker ())
+    return;
+
+  _td_server = _get_test_data (true);
+  _td_client = _get_test_data (false);
+  ASSERT_TRUE (_td_server != NULL && _td_client != NULL);
+
+  /* Prepare server (127.0.0.1:port) */
+  nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
+      NNS_EDGE_NODE_TYPE_QUERY_SERVER, &server_h);
+  nns_edge_set_event_callback (server_h, _test_edge_hybrid_event_cb, _td_server);
+  nns_edge_set_info (server_h, "DEST_HOST", "127.0.0.1");
+  nns_edge_set_info (server_h, "DEST_PORT", "1883");
+  nns_edge_set_info (server_h, "TOPIC", "temp-mqtt-topic");
+  nns_edge_set_info (server_h, "CAPS", "test server");
+  nns_edge_set_info (server_h, "QUEUE_SIZE", "10:NEW");
+  _td_server->handle = server_h;
+
+  /* Prepare client */
+  nns_edge_create_handle ("temp-client", NNS_EDGE_CONNECT_TYPE_HYBRID,
+     NNS_EDGE_NODE_TYPE_QUERY_CLIENT, &client_h);
+  nns_edge_set_event_callback (client_h, _test_edge_hybrid_event_cb, _td_client);
+  nns_edge_set_info (client_h, "CAPS", "test client");
+  nns_edge_set_info (client_h, "TOPIC", "temp-mqtt-topic");
+  _td_client->handle = client_h;
+
+  ret = nns_edge_start (server_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+  ret = nns_edge_start (client_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+  usleep (200000);
+
+  ret = nns_edge_connect (client_h, "127.0.0.1", 1883);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+  usleep (10000);
+
+  sleep (2);
+
+  /* Send request to server */
+  data_len = 10U * sizeof (unsigned int);
+  data = malloc (data_len);
+  ASSERT_TRUE (data != NULL);
+
+  for (i = 0; i < 10U; i++)
+    ((unsigned int *) data)[i] = i;
+
+  ret = nns_edge_data_create (&data_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+  ret = nns_edge_data_add (data_h, data, data_len, nns_edge_free);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+  nns_edge_get_info (client_h, "client_id", &val);
+  nns_edge_data_set_info (data_h, "client_id", val);
+  SAFE_FREE (val);
+
+  ret = nns_edge_data_set_info (data_h, "test-key", "test-value");
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+  for (i = 0; i < 5U; i++) {
+    ret = nns_edge_send (client_h, data_h);
+    EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+    usleep (10000);
+  }
+
+  ret = nns_edge_data_destroy (data_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+  /* Wait for responding data (20 seconds) */
+  retry = 0U;
+  do {
+    usleep (100000);
+    if (_td_client->received > 0)
+      break;
+  } while (retry++ < 200U);
+
+  ret = nns_edge_release_handle (server_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+  ret = nns_edge_release_handle (client_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+  EXPECT_TRUE (_td_server->received > 0);
+  EXPECT_TRUE (_td_client->received > 0);
+
+  _free_test_data (_td_server);
+  _free_test_data (_td_client);
+}
+
+/**
+ * @brief Connect to the mqtt broker with invalid param.
+ */
+TEST(edgeMqttHybrid, connectInvalidParam1_n)
+{
+  int ret = -1;
+  nns_edge_broker_h broker_h;
+
+  if (!_check_mqtt_broker ())
+    return;
+
+  ret = nns_edge_mqtt_connect (NULL, "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
+  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+
+  ret = nns_edge_mqtt_connect ("", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
+  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Connect to the mqtt broker with invalid param.
+ */
+TEST(edgeMqttHybrid, connectInvalidParam2_n)
+{
+  int ret = -1;
+  nns_edge_broker_h broker_h;
+
+  if (!_check_mqtt_broker ())
+    return;
+
+  ret = nns_edge_mqtt_connect ("temp-mqtt-id", NULL, "127.0.0.1", 1883, &broker_h);
+  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+
+  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "", "127.0.0.1", 1883, &broker_h);
+  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Connect to the mqtt broker with invalid param.
+ */
+TEST(edgeMqttHybrid, connectInvalidParam3_n)
+{
+  int ret = -1;
+  nns_edge_broker_h broker_h;
+
+  if (!_check_mqtt_broker ())
+    return;
+
+  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", NULL, 1883, &broker_h);
+  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+
+  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "", 1883, &broker_h);
+  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Connect to the mqtt broker with invalid param.
+ */
+TEST(edgeMqttHybrid, connectInvalidParam4_n)
+{
+  int ret = -1;
+
+  if (!_check_mqtt_broker ())
+    return;
+
+  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, NULL);
+  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Connect to the mqtt broker with invalid host address.
+ */
+TEST(edgeMqttHybrid, connectInvalidParam5_n)
+{
+  int ret = -1;
+  nns_edge_broker_h broker_h;
+
+  if (!_check_mqtt_broker ())
+    return;
+
+  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "tcp://none", 1883, &broker_h);
+  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Connect to the mqtt broker with invalid port number.
+ */
+TEST(edgeMqttHybrid, connectInvalidParam6_n)
+{
+  int ret = -1;
+  nns_edge_broker_h broker_h;
+
+  if (!_check_mqtt_broker ())
+    return;
+
+  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 0, &broker_h);
+  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Close the mqtt handle with invalid param.
+ */
+TEST(edgeMqttHybrid, closeInvalidParam_n)
+{
+  int ret = -1;
+
+  if (!_check_mqtt_broker ())
+    return;
+
+  ret = nns_edge_mqtt_close (NULL);
+  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Publish with invalid param.
+ */
+TEST(edgeMqttHybrid, publishInvalidParam_n)
+{
+  int ret = -1;
+  const char *msg = "TEMP_MESSAGE";
+
+  if (!_check_mqtt_broker ())
+    return;
+
+  ret = nns_edge_mqtt_publish (NULL, msg, strlen (msg) + 1);
+  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Publish with invalid param.
+ */
+TEST(edgeMqttHybrid, publishInvalidParam2_n)
+{
+  int ret = -1;
+  nns_edge_broker_h broker_h;
+  const char *msg = "TEMP_MESSAGE";
+
+  if (!_check_mqtt_broker ())
+    return;
+
+  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+  /* data is null */
+  ret = nns_edge_mqtt_publish (broker_h, NULL, strlen (msg) + 1);
+  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+
+  ret = nns_edge_mqtt_close (broker_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Publish with invalid param.
+ */
+TEST(edgeMqttHybrid, publishInvalidParam3_n)
+{
+  int ret = -1;
+  nns_edge_broker_h broker_h;
+  const char *msg = "TEMP_MESSAGE";
+
+  if (!_check_mqtt_broker ())
+    return;
+
+  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+  /* data length is 0 */
+  ret = nns_edge_mqtt_publish (broker_h, msg, 0);
+  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+
+  ret = nns_edge_mqtt_close (broker_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Subscribe the topic with invalid param.
+ */
+TEST(edgeMqttHybrid, subscribeInvalidParam_n)
+{
+  int ret = -1;
+
+  if (!_check_mqtt_broker ())
+    return;
+
+  ret = nns_edge_mqtt_subscribe (NULL);
+  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Get message with invalid param.
+ */
+TEST(edgeMqttHybrid, getMessageInvalidParam1_n)
+{
+  int ret = -1;
+  void *msg = NULL;
+  nns_size_t msg_len;
+
+  if (!_check_mqtt_broker ())
+    return;
+
+  ret = nns_edge_mqtt_get_message (NULL, &msg, &msg_len);
+  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Get message with invalid param.
+ */
+TEST(edgeMqttHybrid, getMessageInvalidParam2_n)
+{
+  int ret = -1;
+  nns_edge_broker_h broker_h;
+  nns_size_t msg_len;
+
+  if (!_check_mqtt_broker ())
+    return;
+
+  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+  ret = nns_edge_mqtt_get_message (broker_h, NULL, &msg_len);
+  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+
+  ret = nns_edge_mqtt_close (broker_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Get message with invalid param.
+ */
+TEST(edgeMqttHybrid, getMessageInvalidParam3_n)
+{
+  int ret = -1;
+  nns_edge_broker_h broker_h;
+  void *msg = NULL;
+
+  if (!_check_mqtt_broker ())
+    return;
+
+  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+  ret = nns_edge_mqtt_get_message (broker_h, &msg, NULL);
+  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+
+  ret = nns_edge_mqtt_close (broker_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Get message from empty message queue.
+ */
+TEST(edgeMqttHybrid, getMessageWithinTimeout_n)
+{
+  int ret = -1;
+  nns_edge_broker_h broker_h;
+  void *msg = NULL;
+  nns_size_t msg_len;
+
+  if (!_check_mqtt_broker ())
+    return;
+
+  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+  ret = nns_edge_mqtt_get_message (broker_h, &msg, &msg_len);
+  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+
+  ret = nns_edge_mqtt_close (broker_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Edge event callback for test MQTT data transmission.
+ */
+static int
+_test_edge_event_cb (nns_edge_event_h event_h, void *user_data)
+{
+  ne_test_data_s *_td = (ne_test_data_s *) user_data;
+  nns_edge_event_e event = NNS_EDGE_EVENT_UNKNOWN;
+  nns_edge_data_h data_h;
+  void *data;
+  nns_size_t data_len;
+  unsigned int i, count;
+  int ret;
+
+  if (!_td) {
+    /* Cannot update event status. */
+    return NNS_EDGE_ERROR_NONE;
+  }
+
+  ret = nns_edge_event_get_type (event_h, &event);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+  switch (event) {
+    case NNS_EDGE_EVENT_CALLBACK_RELEASED:
+      _td->event_cb_released = true;
+      break;
+    case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
+      _td->received++;
+      ret = nns_edge_event_parse_new_data (event_h, &data_h);
+      EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+      /* Compare received data */
+      ret = nns_edge_data_get_count (data_h, &count);
+      EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+      EXPECT_EQ (count, 2U);
+
+      ret = nns_edge_data_get (data_h, 0, &data, &data_len);
+      EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+      for (i = 0; i < 10U; i++)
+        EXPECT_EQ (((unsigned int *) data)[i], i);
+
+      ret = nns_edge_data_get (data_h, 1, &data, &data_len);
+      EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+      for (i = 0; i < 20U; i++)
+        EXPECT_EQ (((unsigned int *) data)[i], 20 - i);
+
+      ret = nns_edge_data_destroy (data_h);
+      EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+      break;
+    default:
+      break;
+  }
+
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Connect to local host, multiple clients.
+ */
+TEST(edgeMqtt, connectLocal)
+{
+  nns_edge_h server_h, client1_h, client2_h;
+  ne_test_data_s *_td_server, *_td_client1, *_td_client2;
+  nns_edge_data_h data_h;
+  nns_size_t data_len;
+  void *data1, *data2;
+  unsigned int i, retry;
+  int ret, port;
+  char *val;
+
+  if (!_check_mqtt_broker ())
+    return;
+
+  _td_server = _get_test_data (true);
+  _td_client1 = _get_test_data (false);
+  _td_client2 = _get_test_data (false);
+  ASSERT_TRUE (_td_server != NULL && _td_client1 != NULL && _td_client2 != NULL);
+  port = nns_edge_get_available_port ();
+
+  /* Prepare server (127.0.0.1:port) */
+  val = nns_edge_strdup_printf ("%d", port);
+  nns_edge_create_handle ("temp-sender", NNS_EDGE_CONNECT_TYPE_MQTT,
+      NNS_EDGE_NODE_TYPE_PUB, &server_h);
+  nns_edge_set_info (server_h, "IP", "127.0.0.1");
+  nns_edge_set_info (server_h, "PORT", val);
+  nns_edge_set_info (server_h, "DEST_IP", "127.0.0.1");
+  nns_edge_set_info (server_h, "DEST_PORT", "1883");
+  nns_edge_set_info (server_h, "TOPIC", "MQTT_TEST_TOPIC");
+  _td_server->handle = server_h;
+  SAFE_FREE (val);
+
+  /* Prepare client */
+  nns_edge_create_handle ("temp-receiver", NNS_EDGE_CONNECT_TYPE_MQTT,
+      NNS_EDGE_NODE_TYPE_SUB, &client1_h);
+  nns_edge_set_event_callback (client1_h, _test_edge_event_cb, _td_client1);
+  nns_edge_set_info (client1_h, "TOPIC", "MQTT_TEST_TOPIC");
+  _td_client1->handle = client1_h;
+
+  nns_edge_create_handle ("temp-client2", NNS_EDGE_CONNECT_TYPE_MQTT,
+      NNS_EDGE_NODE_TYPE_SUB, &client2_h);
+  nns_edge_set_event_callback (client2_h, _test_edge_event_cb, _td_client2);
+  nns_edge_set_info (client2_h, "TOPIC", "MQTT_TEST_TOPIC");
+  _td_client2->handle = client2_h;
+
+
+  ret = nns_edge_start (server_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+  ret = nns_edge_start (client1_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+  ret = nns_edge_start (client2_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+  usleep (200000);
+
+  ret = nns_edge_connect (client1_h, "127.0.0.1", 1883);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+  usleep (10000);
+  ret = nns_edge_connect (client2_h, "127.0.0.1", 1883);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+  sleep (2);
+
+  /* Send request to server */
+  data_len = 10U * sizeof (unsigned int);
+  data1 = malloc (data_len);
+  ASSERT_TRUE (data1 != NULL);
+
+  data2 = malloc (data_len * 2);
+  ASSERT_TRUE (data2 != NULL);
+
+  for (i = 0; i < 10U; i++)
+    ((unsigned int *) data1)[i] = i;
+
+  for (i = 0; i < 20U; i++)
+    ((unsigned int *) data2)[i] = 20 - i;
+
+  ret = nns_edge_data_create (&data_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+  ret = nns_edge_data_add (data_h, data1, data_len, nns_edge_free);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+  ret = nns_edge_data_add (data_h, data2, data_len * 2, nns_edge_free);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+  for (i = 0; i < 5U; i++) {
+    ret = nns_edge_send (server_h, data_h);
+    EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+    usleep (10000);
+  }
+
+  ret = nns_edge_data_destroy (data_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+  /* Wait for responding data (20 seconds) */
+  retry = 0U;
+  do {
+    usleep (100000);
+    if (_td_client1->received > 0 && _td_client2->received > 0)
+      break;
+  } while (retry++ < 200U);
+
+  ret = nns_edge_disconnect (server_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+  ret = nns_edge_release_handle (server_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+  ret = nns_edge_release_handle (client1_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+  ret = nns_edge_release_handle (client2_h);
+  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+  EXPECT_TRUE (_td_client1->received > 0);
+  EXPECT_TRUE (_td_client2->received > 0);
+
+  _free_test_data (_td_server);
+  _free_test_data (_td_client1);
+  _free_test_data (_td_client2);
+}
+
+/**
+ * @brief Check connection with invalid param.
+ */
+TEST(edgeMqtt, checkConnectionInvalidParam_n)
+{
+  int ret = -1;
+
+  if (!_check_mqtt_broker ())
+    return;
+
+  ret = nns_edge_mqtt_is_connected (NULL);
+  EXPECT_NE (ret, true);
+}
+
+/**
+ * @brief Main gtest
+ */
+int
+main (int argc, char **argv)
+{
+  int result = -1;
+
+  try {
+    testing::InitGoogleTest (&argc, argv);
+  } catch (...) {
+    nns_edge_loge ("Catch exception, failed to init google test.");
+  }
+
+  try {
+    result = RUN_ALL_TESTS ();
+  } catch (...) {
+    nns_edge_loge ("Catch exception, failed to run the unittest.");
+  }
+
+  return result;
+}
index e6b564a7ae49f1c68709363ad12640da10730784..9bd8cdced115dd399d0c851e0790d2cee64f04d9 100644 (file)
@@ -3657,463 +3657,6 @@ TEST(edgeQueue, waitPopInvalidParam03_n)
   EXPECT_TRUE (nns_edge_queue_destroy (queue_h));
 }
 
-#if defined(ENABLE_MQTT)
-/**
- * @brief Edge event callback for test.
- */
-static int
-_test_edge_hybrid_event_cb (nns_edge_event_h event_h, void *user_data)
-{
-  ne_test_data_s *_td = (ne_test_data_s *) user_data;
-  nns_edge_event_e event = NNS_EDGE_EVENT_UNKNOWN;
-  nns_edge_data_h data_h;
-  void *data;
-  nns_size_t data_len;
-  char *val;
-  unsigned int i, count;
-  int ret;
-
-  if (!_td) {
-    /* Cannot update event status. */
-    return NNS_EDGE_ERROR_NONE;
-  }
-
-  ret = nns_edge_event_get_type (event_h, &event);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
-  switch (event) {
-    case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
-      _td->received++;
-
-      ret = nns_edge_event_parse_new_data (event_h, &data_h);
-      EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
-      /* Compare metadata */
-      ret = nns_edge_data_get_info (data_h, "test-key", &val);
-      EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-      EXPECT_STREQ (val, "test-value");
-      SAFE_FREE (val);
-
-      if (_td->is_server) {
-        /**
-         * @note This is test code, responding to client.
-         * Recommend not to call edge API in event callback.
-         */
-        ret = nns_edge_send (_td->handle, data_h);
-        EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-      } else {
-        /* Compare received data */
-        ret = nns_edge_data_get_count (data_h, &count);
-        EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-        ret = nns_edge_data_get (data_h, 0, &data, &data_len);
-        EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
-        EXPECT_EQ (count, 1U);
-        for (i = 0; i < 10U; i++)
-          EXPECT_EQ (((unsigned int *) data)[i], i);
-      }
-
-      ret = nns_edge_data_destroy (data_h);
-      EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-      break;
-    default:
-      break;
-  }
-
-  return NNS_EDGE_ERROR_NONE;
-}
-
-/**
- * @brief Check whether MQTT broker is running or not.
- */
-static bool
-_check_mqtt_broker ()
-{
-  int ret = 0;
-
-  ret = system ("ps aux | grep mosquitto | grep -v grep");
-  if (0 != ret) {
-    nns_edge_logw ("MQTT broker is not running. Skip query hybrid test.");
-    return false;
-  }
-
-  return true;
-}
-
-/**
- * @brief Connect to the local host using the information received from mqtt.
- */
-TEST(edgeMqtt, connectLocal)
-{
-  nns_edge_h server_h, client_h;
-  ne_test_data_s *_td_server, *_td_client;
-  nns_edge_data_h data_h;
-  nns_size_t data_len;
-  void *data;
-  unsigned int i, retry;
-  int ret = 0;
-  char *val;
-
-  if (!_check_mqtt_broker ())
-    return;
-
-  _td_server = _get_test_data (true);
-  _td_client = _get_test_data (false);
-  ASSERT_TRUE (_td_server != NULL && _td_client != NULL);
-
-  /* Prepare server (127.0.0.1:port) */
-  nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
-      NNS_EDGE_NODE_TYPE_QUERY_SERVER, &server_h);
-  nns_edge_set_event_callback (server_h, _test_edge_hybrid_event_cb, _td_server);
-  nns_edge_set_info (server_h, "DEST_HOST", "127.0.0.1");
-  nns_edge_set_info (server_h, "DEST_PORT", "1883");
-  nns_edge_set_info (server_h, "TOPIC", "temp-mqtt-topic");
-  nns_edge_set_info (server_h, "CAPS", "test server");
-  nns_edge_set_info (server_h, "QUEUE_SIZE", "10:NEW");
-  _td_server->handle = server_h;
-
-  /* Prepare client */
-  nns_edge_create_handle ("temp-client", NNS_EDGE_CONNECT_TYPE_HYBRID,
-     NNS_EDGE_NODE_TYPE_QUERY_CLIENT, &client_h);
-  nns_edge_set_event_callback (client_h, _test_edge_hybrid_event_cb, _td_client);
-  nns_edge_set_info (client_h, "CAPS", "test client");
-  nns_edge_set_info (client_h, "TOPIC", "temp-mqtt-topic");
-  _td_client->handle = client_h;
-
-  ret = nns_edge_start (server_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-  ret = nns_edge_start (client_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
-  usleep (200000);
-
-  ret = nns_edge_connect (client_h, "127.0.0.1", 1883);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-  usleep (10000);
-
-  sleep (2);
-
-  /* Send request to server */
-  data_len = 10U * sizeof (unsigned int);
-  data = malloc (data_len);
-  ASSERT_TRUE (data != NULL);
-
-  for (i = 0; i < 10U; i++)
-    ((unsigned int *) data)[i] = i;
-
-  ret = nns_edge_data_create (&data_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
-  ret = nns_edge_data_add (data_h, data, data_len, nns_edge_free);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
-  nns_edge_get_info (client_h, "client_id", &val);
-  nns_edge_data_set_info (data_h, "client_id", val);
-  SAFE_FREE (val);
-
-  ret = nns_edge_data_set_info (data_h, "test-key", "test-value");
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
-  for (i = 0; i < 5U; i++) {
-    ret = nns_edge_send (client_h, data_h);
-    EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-    usleep (10000);
-  }
-
-  ret = nns_edge_data_destroy (data_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
-  /* Wait for responding data (20 seconds) */
-  retry = 0U;
-  do {
-    usleep (100000);
-    if (_td_client->received > 0)
-      break;
-  } while (retry++ < 200U);
-
-  ret = nns_edge_release_handle (server_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-  ret = nns_edge_release_handle (client_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
-  EXPECT_TRUE (_td_server->received > 0);
-  EXPECT_TRUE (_td_client->received > 0);
-
-  _free_test_data (_td_server);
-  _free_test_data (_td_client);
-}
-
-/**
- * @brief Connect to the mqtt broker with invalid param.
- */
-TEST(edgeMqtt, connectInvalidParam1_n)
-{
-  int ret = -1;
-  nns_edge_broker_h broker_h;
-
-  if (!_check_mqtt_broker ())
-    return;
-
-  ret = nns_edge_mqtt_connect (NULL, "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
-  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-
-  ret = nns_edge_mqtt_connect ("", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
-  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Connect to the mqtt broker with invalid param.
- */
-TEST(edgeMqtt, connectInvalidParam2_n)
-{
-  int ret = -1;
-  nns_edge_broker_h broker_h;
-
-  if (!_check_mqtt_broker ())
-    return;
-
-  ret = nns_edge_mqtt_connect ("temp-mqtt-id", NULL, "127.0.0.1", 1883, &broker_h);
-  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-
-  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "", "127.0.0.1", 1883, &broker_h);
-  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Connect to the mqtt broker with invalid param.
- */
-TEST(edgeMqtt, connectInvalidParam3_n)
-{
-  int ret = -1;
-  nns_edge_broker_h broker_h;
-
-  if (!_check_mqtt_broker ())
-    return;
-
-  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", NULL, 1883, &broker_h);
-  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-
-  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "", 1883, &broker_h);
-  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Connect to the mqtt broker with invalid param.
- */
-TEST(edgeMqtt, connectInvalidParam4_n)
-{
-  int ret = -1;
-
-  if (!_check_mqtt_broker ())
-    return;
-
-  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, NULL);
-  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Connect to the mqtt broker with invalid host address.
- */
-TEST(edgeMqtt, connectInvalidParam5_n)
-{
-  int ret = -1;
-  nns_edge_broker_h broker_h;
-
-  if (!_check_mqtt_broker ())
-    return;
-
-  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "tcp://none", 1883, &broker_h);
-  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Connect to the mqtt broker with invalid port number.
- */
-TEST(edgeMqtt, connectInvalidParam6_n)
-{
-  int ret = -1;
-  nns_edge_broker_h broker_h;
-
-  if (!_check_mqtt_broker ())
-    return;
-
-  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 0, &broker_h);
-  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Close the mqtt handle with invalid param.
- */
-TEST(edgeMqtt, closeInvalidParam_n)
-{
-  int ret = -1;
-
-  if (!_check_mqtt_broker ())
-    return;
-
-  ret = nns_edge_mqtt_close (NULL);
-  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Publish with invalid param.
- */
-TEST(edgeMqtt, publishInvalidParam_n)
-{
-  int ret = -1;
-  const char *msg = "TEMP_MESSAGE";
-
-  if (!_check_mqtt_broker ())
-    return;
-
-  ret = nns_edge_mqtt_publish (NULL, msg, strlen (msg) + 1);
-  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Publish with invalid param.
- */
-TEST(edgeMqtt, publishInvalidParam2_n)
-{
-  int ret = -1;
-  nns_edge_broker_h broker_h;
-  const char *msg = "TEMP_MESSAGE";
-
-  if (!_check_mqtt_broker ())
-    return;
-
-  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
-  /* data is null */
-  ret = nns_edge_mqtt_publish (broker_h, NULL, strlen (msg) + 1);
-  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-
-  ret = nns_edge_mqtt_close (broker_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Publish with invalid param.
- */
-TEST(edgeMqtt, publishInvalidParam3_n)
-{
-  int ret = -1;
-  nns_edge_broker_h broker_h;
-  const char *msg = "TEMP_MESSAGE";
-
-  if (!_check_mqtt_broker ())
-    return;
-
-  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
-  /* data length is 0 */
-  ret = nns_edge_mqtt_publish (broker_h, msg, 0);
-  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-
-  ret = nns_edge_mqtt_close (broker_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Subscribe the topic with invalid param.
- */
-TEST(edgeMqtt, subscribeInvalidParam_n)
-{
-  int ret = -1;
-
-  if (!_check_mqtt_broker ())
-    return;
-
-  ret = nns_edge_mqtt_subscribe (NULL);
-  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Get message with invalid param.
- */
-TEST(edgeMqtt, getMessageInvalidParam1_n)
-{
-  int ret = -1;
-  void *msg = NULL;
-  nns_size_t msg_len;
-
-  if (!_check_mqtt_broker ())
-    return;
-
-  ret = nns_edge_mqtt_get_message (NULL, &msg, &msg_len);
-  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Get message with invalid param.
- */
-TEST(edgeMqtt, getMessageInvalidParam2_n)
-{
-  int ret = -1;
-  nns_edge_broker_h broker_h;
-  nns_size_t msg_len;
-
-  if (!_check_mqtt_broker ())
-    return;
-
-  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
-  ret = nns_edge_mqtt_get_message (broker_h, NULL, &msg_len);
-  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-
-  ret = nns_edge_mqtt_close (broker_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Get message with invalid param.
- */
-TEST(edgeMqtt, getMessageInvalidParam3_n)
-{
-  int ret = -1;
-  nns_edge_broker_h broker_h;
-  void *msg = NULL;
-
-  if (!_check_mqtt_broker ())
-    return;
-
-  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
-  ret = nns_edge_mqtt_get_message (broker_h, &msg, NULL);
-  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-
-  ret = nns_edge_mqtt_close (broker_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Get message from empty message queue.
- */
-TEST(edgeMqtt, getMessageWithinTimeout_n)
-{
-  int ret = -1;
-  nns_edge_broker_h broker_h;
-  void *msg = NULL;
-  nns_size_t msg_len;
-
-  if (!_check_mqtt_broker ())
-    return;
-
-  ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
-  ret = nns_edge_mqtt_get_message (broker_h, &msg, &msg_len);
-  EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-
-  ret = nns_edge_mqtt_close (broker_h);
-  EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-}
-#endif /* ENABLE_MQTT */
-
 /**
  * @brief Main gtest
  */