From 0c6e13227881ac270b54d8fd80a3c5065e1526a8 Mon Sep 17 00:00:00 2001 From: gichan Date: Mon, 13 Mar 2023 16:44:23 +0900 Subject: [PATCH] [MQTT] Support MQTT direct data transmission - Support MQTT direct data transmission. - Add unit test for mqtt Signed-off-by: gichan --- packaging/nnstreamer-edge.spec | 8 + .../nnstreamer-edge-internal.c | 155 ++-- .../nnstreamer-edge-mqtt-mosquitto.c | 82 +- .../nnstreamer-edge-mqtt-paho.c | 81 +- src/libnnstreamer-edge/nnstreamer-edge-mqtt.h | 13 + tests/CMakeLists.txt | 8 + tests/unittest_nnstreamer-edge-mqtt.cc | 727 ++++++++++++++++++ tests/unittest_nnstreamer-edge.cc | 457 ----------- 8 files changed, 1021 insertions(+), 510 deletions(-) create mode 100644 tests/unittest_nnstreamer-edge-mqtt.cc diff --git a/packaging/nnstreamer-edge.spec b/packaging/nnstreamer-edge.spec index f4a4bb0..31e14ff 100644 --- a/packaging/nnstreamer-edge.spec +++ b/packaging/nnstreamer-edge.spec @@ -142,6 +142,10 @@ LD_LIBRARY_PATH=./src bash %{test_script} ./tests/unittest_nnstreamer-edge LD_LIBRARY_PATH=./src bash %{test_script} ./tests/unittest_nnstreamer-edge-aitt %endif +%if 0%{?mqtt_support} +LD_LIBRARY_PATH=./src bash %{test_script} ./tests/unittest_nnstreamer-edge-mqtt +%endif + %if 0%{?testcoverage} # 'lcov' generates the date format with UTC time zone by default. Let's replace UTC with KST. # If you can get a root privilege, run ln -sf /usr/share/zoneinfo/Asia/Seoul /etc/localtime @@ -191,6 +195,10 @@ rm -rf %{buildroot} %{_bindir}/unittest_nnstreamer-edge-aitt %endif +%if 0%{?mqtt_support} +%{_bindir}/unittest_nnstreamer-edge-mqtt +%endif + %if 0%{?testcoverage} %files unittest-coverage %{_datadir}/nnstreamer-edge/unittest/* diff --git a/src/libnnstreamer-edge/nnstreamer-edge-internal.c b/src/libnnstreamer-edge/nnstreamer-edge-internal.c index 8eef0dc..dbc1b29 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-internal.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-internal.c @@ -862,6 +862,11 @@ _nns_edge_send_thread (void *thread_data) if (NNS_EDGE_ERROR_NONE != ret) nns_edge_loge ("Failed to send data via AITT connection."); break; + case NNS_EDGE_CONNECT_TYPE_MQTT: + ret = nns_edge_mqtt_publish_data (eh->broker_h, data_h); + if (NNS_EDGE_ERROR_NONE != ret) + nns_edge_loge ("Failed to send data via MQTT connection."); + break; default: break; } @@ -1283,8 +1288,9 @@ nns_edge_start (nns_edge_h edge_h) if ((NNS_EDGE_NODE_TYPE_QUERY_SERVER == eh->node_type) || (NNS_EDGE_NODE_TYPE_PUB == eh->node_type)) { - if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) { - char *topic, *msg; + if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type + || NNS_EDGE_CONNECT_TYPE_MQTT == eh->connect_type) { + char *topic; /** @todo Set unique device name. * Device name should be unique. Consider using MAC address later. @@ -1303,14 +1309,24 @@ nns_edge_start (nns_edge_h edge_h) goto done; } - msg = nns_edge_get_host_string (eh->host, eh->port); + if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) { + char *msg; + msg = nns_edge_get_host_string (eh->host, eh->port); - ret = nns_edge_mqtt_publish (eh->broker_h, msg, strlen (msg) + 1); - SAFE_FREE (msg); + ret = nns_edge_mqtt_publish (eh->broker_h, msg, strlen (msg) + 1); + SAFE_FREE (msg); - if (NNS_EDGE_ERROR_NONE != ret) { - nns_edge_loge ("Failed to publish the meesage to broker."); - goto done; + if (NNS_EDGE_ERROR_NONE != ret) { + nns_edge_loge ("Failed to publish the meesage to broker."); + goto done; + } + } else { + ret = nns_edge_mqtt_set_event_callback (eh->broker_h, eh->event_cb, + eh->user_data); + if (NNS_EDGE_ERROR_NONE != ret) { + nns_edge_loge ("Failed to set event callback to MQTT broker."); + goto done; + } } } else if (NNS_EDGE_CONNECT_TYPE_AITT == eh->connect_type) { ret = nns_edge_aitt_connect (eh->id, eh->topic, eh->dest_host, @@ -1370,6 +1386,7 @@ nns_edge_release_handle (nns_edge_h edge_h) switch (eh->connect_type) { case NNS_EDGE_CONNECT_TYPE_HYBRID: + case NNS_EDGE_CONNECT_TYPE_MQTT: if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_close (eh->broker_h)) { nns_edge_logw ("Failed to close mqtt connection."); } @@ -1464,6 +1481,71 @@ nns_edge_set_event_callback (nns_edge_h edge_h, nns_edge_event_cb cb, return NNS_EDGE_ERROR_NONE; } +/** + * @brief Parse the message received from the MQTT broker and connect to the server directly. + */ +static int +_mqtt_hybrid_direct_connection (nns_edge_handle_s * eh) +{ + int ret; + + do { + char *msg = NULL; + char *server_ip = NULL; + int server_port = 0; + nns_size_t msg_len = 0; + + ret = nns_edge_mqtt_get_message (eh->broker_h, (void **) &msg, &msg_len); + if (ret != NNS_EDGE_ERROR_NONE || !msg || msg_len == 0) + return ret; + + nns_edge_parse_host_string (msg, &server_ip, &server_port); + SAFE_FREE (msg); + + nns_edge_logd ("[DEBUG] Parsed server info: Server [%s:%d] ", server_ip, + server_port); + + ret = _nns_edge_connect_to (eh, eh->client_id, server_ip, server_port); + SAFE_FREE (server_ip); + + if (NNS_EDGE_ERROR_NONE == ret) { + return ret; + } + } while (TRUE); + + return ret; +} + +/** + * @brief Start subsciption to MQTT message + */ +static int +_nns_edge_start_mqtt_sub (nns_edge_handle_s * eh) +{ + char *topic; + int ret; + + if (!nns_edge_mqtt_is_connected (eh->broker_h)) { + topic = nns_edge_strdup_printf ("edge/inference/+/%s/#", eh->topic); + + ret = nns_edge_mqtt_connect (eh->id, topic, eh->dest_host, eh->dest_port, + &eh->broker_h); + SAFE_FREE (topic); + + if (NNS_EDGE_ERROR_NONE != ret) { + return NNS_EDGE_ERROR_CONNECTION_FAILURE; + } + + ret = nns_edge_mqtt_subscribe (eh->broker_h); + if (NNS_EDGE_ERROR_NONE != ret) { + nns_edge_loge ("Failed to subscribe to topic: %s.", eh->topic); + return ret; + } + } + + return NNS_EDGE_ERROR_NONE; +} + /** * @brief Connect to the destination node. */ @@ -1506,51 +1588,22 @@ nns_edge_connect (nns_edge_h edge_h, const char *dest_host, int dest_port) eh->dest_host = nns_edge_strdup (dest_host); eh->dest_port = dest_port; - if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) { - char *topic; - - if (!nns_edge_mqtt_is_connected (eh->broker_h)) { - topic = nns_edge_strdup_printf ("edge/inference/+/%s/#", eh->topic); - - ret = nns_edge_mqtt_connect (eh->id, topic, dest_host, dest_port, - &eh->broker_h); - SAFE_FREE (topic); - - if (NNS_EDGE_ERROR_NONE != ret) { - nns_edge_loge ("Connection failure to broker."); - goto done; - } + if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type + || NNS_EDGE_CONNECT_TYPE_MQTT == eh->connect_type) { + ret = _nns_edge_start_mqtt_sub (eh); + if (NNS_EDGE_ERROR_NONE != ret) + goto done; - ret = nns_edge_mqtt_subscribe (eh->broker_h); + if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) { + ret = _mqtt_hybrid_direct_connection (eh); + } else { + ret = nns_edge_mqtt_set_event_callback (eh->broker_h, eh->event_cb, + eh->user_data); if (NNS_EDGE_ERROR_NONE != ret) { - nns_edge_loge ("Failed to subscribe to topic: %s.", eh->topic); - goto done; + nns_edge_loge ("Failed to set event callback to MQTT broker."); + return ret; } } - - do { - char *msg = NULL; - char *server_ip = NULL; - int server_port = 0; - nns_size_t msg_len = 0; - - ret = nns_edge_mqtt_get_message (eh->broker_h, (void **) &msg, &msg_len); - if (ret != NNS_EDGE_ERROR_NONE || !msg || msg_len == 0) - break; - - nns_edge_parse_host_string (msg, &server_ip, &server_port); - SAFE_FREE (msg); - - nns_edge_logd ("[DEBUG] Parsed server info: Server [%s:%d] ", server_ip, - server_port); - - ret = _nns_edge_connect_to (eh, eh->client_id, server_ip, server_port); - SAFE_FREE (server_ip); - - if (NNS_EDGE_ERROR_NONE == ret) { - break; - } - } while (TRUE); } else if (NNS_EDGE_CONNECT_TYPE_AITT == eh->connect_type) { ret = nns_edge_aitt_connect (eh->id, eh->topic, dest_host, dest_port, &eh->broker_h); @@ -1624,6 +1677,10 @@ _nns_edge_is_connected (nns_edge_h edge_h) NNS_EDGE_ERROR_NONE == nns_edge_aitt_is_connected (eh->broker_h)) return true; + if (NNS_EDGE_CONNECT_TYPE_MQTT == eh->connect_type && + nns_edge_mqtt_is_connected (eh->broker_h)) + return true; + conn_data = (nns_edge_conn_data_s *) eh->connections; while (conn_data) { conn = conn_data->sink_conn; diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c index 460599a..cd8a659 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c @@ -19,6 +19,8 @@ #include "nnstreamer-edge-log.h" #include "nnstreamer-edge-util.h" #include "nnstreamer-edge-queue.h" +#include "nnstreamer-edge-data.h" +#include "nnstreamer-edge-event.h" /** * @brief Data structure for mqtt broker handle. @@ -32,6 +34,10 @@ typedef struct char *host; int port; bool connected; + + /* event callback for new message */ + nns_edge_event_cb event_cb; + void *user_data; } nns_edge_broker_s; /** @@ -44,6 +50,7 @@ on_message_callback (struct mosquitto *client, void *data, nns_edge_broker_s *bh = (nns_edge_broker_s *) data; char *msg = NULL; nns_size_t msg_len; + int ret; if (!bh) { nns_edge_loge ("Invalid param, given broker handle is invalid."); @@ -60,8 +67,31 @@ on_message_callback (struct mosquitto *client, void *data, msg_len = (nns_size_t) message->payloadlen; msg = nns_edge_memdup (message->payload, msg_len); - if (msg) - nns_edge_queue_push (bh->message_queue, msg, msg_len, nns_edge_free); + + if (msg) { + if (bh->event_cb) { + nns_edge_data_h data_h; + + if (nns_edge_data_create (&data_h) != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to create data handle in msg thread."); + return; + } + + nns_edge_data_deserialize (data_h, (void *) msg, (nns_size_t) msg_len); + + ret = nns_edge_event_invoke_callback (bh->event_cb, bh->user_data, + NNS_EDGE_EVENT_NEW_DATA_RECEIVED, data_h, sizeof (nns_edge_data_h), + NULL); + if (ret != NNS_EDGE_ERROR_NONE) + nns_edge_loge ("Failed to send an event for received message."); + + nns_edge_data_destroy (data_h); + SAFE_FREE (msg); + return; + } else { + nns_edge_queue_push (bh->message_queue, msg, msg_len, nns_edge_free); + } + } return; } @@ -127,6 +157,8 @@ _nns_edge_mqtt_init_client (const char *id, const char *topic, const char *host, bh->host = nns_edge_strdup (host); bh->port = port; bh->connected = true; + bh->event_cb = NULL; + bh->user_data = NULL; *broker_h = bh; return NNS_EDGE_ERROR_NONE; @@ -221,6 +253,30 @@ nns_edge_mqtt_close (nns_edge_broker_h broker_h) return NNS_EDGE_ERROR_NONE; } +/** + * @brief Internal util function to send edge-data via MQTT connection. + */ +int +nns_edge_mqtt_publish_data (nns_edge_broker_h handle, nns_edge_data_h data_h) +{ + int ret; + void *data = NULL; + nns_size_t size; + + ret = nns_edge_data_serialize (data_h, &data, &size); + if (NNS_EDGE_ERROR_NONE != ret) { + nns_edge_loge ("Failed to serialize the edge data."); + return ret; + } + + ret = nns_edge_mqtt_publish (handle, data, size); + if (NNS_EDGE_ERROR_NONE != ret) + nns_edge_loge ("Failed to send data to destination."); + + SAFE_FREE (data); + return ret; +} + /** * @brief Publish raw data. * @note This is internal function for MQTT broker. You should call this with edge-handle lock. @@ -354,3 +410,25 @@ nns_edge_mqtt_is_connected (nns_edge_broker_h broker_h) return bh->connected; } + +/** + * @brief Set event callback for new message. + */ +int +nns_edge_mqtt_set_event_callback (nns_edge_broker_h broker_h, + nns_edge_event_cb cb, void *user_data) +{ + nns_edge_broker_s *bh; + + if (!broker_h) { + nns_edge_loge ("Invalid param, given MQTT handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + bh = (nns_edge_broker_s *) broker_h; + + bh->event_cb = cb; + bh->user_data = user_data; + + return NNS_EDGE_ERROR_NONE; +} diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c index 804795f..ef90e4f 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c @@ -19,6 +19,8 @@ #include "nnstreamer-edge-log.h" #include "nnstreamer-edge-util.h" #include "nnstreamer-edge-queue.h" +#include "nnstreamer-edge-data.h" +#include "nnstreamer-edge-event.h" /** * @brief Data structure for mqtt broker handle. @@ -31,6 +33,10 @@ typedef struct char *topic; char *host; int port; + + /* event callback for new message */ + nns_edge_event_cb event_cb; + void *user_data; } nns_edge_broker_s; /** @@ -64,8 +70,31 @@ mqtt_cb_message_arrived (void *context, char *topic, int topic_len, msg_len = (nns_size_t) message->payloadlen; msg = nns_edge_memdup (message->payload, msg_len); - if (msg) - nns_edge_queue_push (bh->message_queue, msg, msg_len, nns_edge_free); + + if (msg) { + if (bh->event_cb) { + nns_edge_data_h data_h; + + if (nns_edge_data_create (&data_h) != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to create data handle in msg thread."); + return; + } + + nns_edge_data_deserialize (data_h, (void *) msg, (nns_size_t) msg_len); + + ret = nns_edge_event_invoke_callback (bh->event_cb, bh->user_data, + NNS_EDGE_EVENT_NEW_DATA_RECEIVED, data_h, sizeof (nns_edge_data_h), + NULL); + if (ret != NNS_EDGE_ERROR_NONE) + nns_edge_loge ("Failed to send an event for received message."); + + nns_edge_data_destroy (data_h); + SAFE_FREE (msg); + return; + } else { + nns_edge_queue_push (bh->message_queue, msg, msg_len, nns_edge_free); + } + } return TRUE; } @@ -138,6 +167,8 @@ nns_edge_mqtt_connect (const char *id, const char *topic, const char *host, bh->host = nns_edge_strdup (host); bh->port = port; bh->mqtt_h = handle; + bh->event_cb = NULL; + bh->user_data = NULL; nns_edge_queue_create (&bh->message_queue); MQTTAsync_setCallbacks (handle, bh, NULL, mqtt_cb_message_arrived, NULL); @@ -231,6 +262,30 @@ nns_edge_mqtt_close (nns_edge_broker_h broker_h) return NNS_EDGE_ERROR_NONE; } +/** + * @brief Internal util function to send edge-data via MQTT connection. + */ +int +nns_edge_mqtt_publish_data (nns_edge_broker_h handle, nns_edge_data_h data_h) +{ + int ret; + void *data = NULL; + nns_size_t size; + + ret = nns_edge_data_serialize (data_h, &data, &size); + if (NNS_EDGE_ERROR_NONE != ret) { + nns_edge_loge ("Failed to serialize the edge data."); + return ret; + } + + ret = nns_edge_mqtt_publish (handle, data, size); + if (NNS_EDGE_ERROR_NONE != ret) + nns_edge_loge ("Failed to send data to destination."); + + SAFE_FREE (data); + return ret; +} + /** * @brief Publish raw data. * @note This is internal function for MQTT broker. You should call this with edge-handle lock. @@ -375,3 +430,25 @@ nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg, return NNS_EDGE_ERROR_NONE; } + +/** + * @brief Set event callback for new message. + */ +int +nns_edge_mqtt_set_event_callback (nns_edge_broker_h broker_h, + nns_edge_event_cb cb, void *user_data) +{ + nns_edge_broker_s *bh; + + if (!broker_h) { + nns_edge_loge ("Invalid param, given MQTT handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + bh = (nns_edge_broker_s *) broker_h; + + bh->event_cb = cb; + bh->user_data = user_data; + + return NNS_EDGE_ERROR_NONE; +} diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt.h b/src/libnnstreamer-edge/nnstreamer-edge-mqtt.h index f6bc732..2a8d642 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-mqtt.h +++ b/src/libnnstreamer-edge/nnstreamer-edge-mqtt.h @@ -56,6 +56,17 @@ bool nns_edge_mqtt_is_connected (nns_edge_broker_h broker_h); * @brief Get message from mqtt broker. If no message in the queue, it waits up to 1 second for new message. */ int nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg, nns_size_t *msg_len); + +/** + * @brief Internal util function to send edge-data via MQTT connection. + */ +int nns_edge_mqtt_publish_data (nns_edge_broker_h handle, nns_edge_data_h data_h); + +/** + * @brief Set event callback for new message. + */ +int nns_edge_mqtt_set_event_callback (nns_edge_broker_h broker_h, nns_edge_event_cb cb, void *user_data); + #else /** * @todo consider to change code style later. @@ -72,6 +83,8 @@ int nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg, nns_size_ #define nns_edge_mqtt_subscribe(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) #define nns_edge_mqtt_is_connected(...) (false) #define nns_edge_mqtt_get_message(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) +#define nns_edge_mqtt_publish_data(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) +#define nns_edge_mqtt_set_event_callback(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) #endif /* ENABLE_MQTT */ #ifdef __cplusplus diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 3efd8c0..65bc4e0 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -11,3 +11,11 @@ TARGET_INCLUDE_DIRECTORIES(unittest_nnstreamer-edge-aitt PRIVATE ${EDGE_REQUIRE_ TARGET_LINK_LIBRARIES(unittest_nnstreamer-edge-aitt ${TEST_REQUIRE_PKGS_LDFLAGS} nnstreamer-edge) INSTALL (TARGETS unittest_nnstreamer-edge-aitt DESTINATION ${BIN_INSTALL_DIR}) ENDIF() + +# MQTT test +IF(MQTT_SUPPORT) +ADD_EXECUTABLE(unittest_nnstreamer-edge-mqtt unittest_nnstreamer-edge-mqtt.cc) +TARGET_INCLUDE_DIRECTORIES(unittest_nnstreamer-edge-mqtt PRIVATE ${EDGE_REQUIRE_PKGS_INCLUDE_DIRS} ${INCLUDE_DIR} ${NNS_EDGE_SRC_DIR}) +TARGET_LINK_LIBRARIES(unittest_nnstreamer-edge-mqtt ${TEST_REQUIRE_PKGS_LDFLAGS} nnstreamer-edge) +INSTALL (TARGETS unittest_nnstreamer-edge-mqtt DESTINATION ${BIN_INSTALL_DIR}) +ENDIF() diff --git a/tests/unittest_nnstreamer-edge-mqtt.cc b/tests/unittest_nnstreamer-edge-mqtt.cc new file mode 100644 index 0000000..1a55a92 --- /dev/null +++ b/tests/unittest_nnstreamer-edge-mqtt.cc @@ -0,0 +1,727 @@ +/** + * @file unittest_nnstreamer-edge-mqtt.cc + * @date 15 Mar 2023 + * @brief Unittest for nnstreamer-edge MQTT direct data transmission. + * @see https://github.com/nnstreamer/nnstreamer-edge + * @author Gichan Jang + * @bug No known bugs + */ + +#include +#include "nnstreamer-edge.h" +#include "nnstreamer-edge-mqtt.h" +#include "nnstreamer-edge-log.h" +#include "nnstreamer-edge-util.h" + +/** + * @brief Data struct for unittest. + */ +typedef struct +{ + nns_edge_h handle; + bool running; + bool is_server; + bool event_cb_released; + unsigned int received; +} ne_test_data_s; + +/** + * @brief Allocate and initialize test data. + */ +static ne_test_data_s * +_get_test_data (bool is_server) +{ + ne_test_data_s *_td; + + _td = (ne_test_data_s *) calloc (1, sizeof (ne_test_data_s)); + + if (_td) { + _td->is_server = is_server; + } + + return _td; +} + +/** + * @brief Release test data. + */ +static void +_free_test_data (ne_test_data_s *_td) +{ + if (!_td) + return; + + SAFE_FREE (_td); +} + +/** + * @brief Edge event callback for test. + */ +static int +_test_edge_hybrid_event_cb (nns_edge_event_h event_h, void *user_data) +{ + ne_test_data_s *_td = (ne_test_data_s *) user_data; + nns_edge_event_e event = NNS_EDGE_EVENT_UNKNOWN; + nns_edge_data_h data_h; + void *data; + nns_size_t data_len; + char *val; + unsigned int i, count; + int ret; + + if (!_td) { + /* Cannot update event status. */ + return NNS_EDGE_ERROR_NONE; + } + + ret = nns_edge_event_get_type (event_h, &event); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + switch (event) { + case NNS_EDGE_EVENT_NEW_DATA_RECEIVED: + _td->received++; + + ret = nns_edge_event_parse_new_data (event_h, &data_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + /* Compare metadata */ + ret = nns_edge_data_get_info (data_h, "test-key", &val); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + EXPECT_STREQ (val, "test-value"); + SAFE_FREE (val); + + if (_td->is_server) { + /** + * @note This is test code, responding to client. + * Recommend not to call edge API in event callback. + */ + ret = nns_edge_send (_td->handle, data_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + } else { + /* Compare received data */ + ret = nns_edge_data_get_count (data_h, &count); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + ret = nns_edge_data_get (data_h, 0, &data, &data_len); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + EXPECT_EQ (count, 1U); + for (i = 0; i < 10U; i++) + EXPECT_EQ (((unsigned int *) data)[i], i); + } + + ret = nns_edge_data_destroy (data_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + break; + default: + break; + } + + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Check whether MQTT broker is running or not. + */ +static bool +_check_mqtt_broker () +{ + int ret = 0; + + ret = system ("ps aux | grep mosquitto | grep -v grep"); + if (0 != ret) { + nns_edge_logw ("MQTT broker is not running. Skip query hybrid test."); + return false; + } + + return true; +} + +/** + * @brief Connect to the local host using the information received from mqtt. + */ +TEST(edgeMqttHybrid, connectLocal) +{ + nns_edge_h server_h, client_h; + ne_test_data_s *_td_server, *_td_client; + nns_edge_data_h data_h; + nns_size_t data_len; + void *data; + unsigned int i, retry; + int ret = 0; + char *val; + + if (!_check_mqtt_broker ()) + return; + + _td_server = _get_test_data (true); + _td_client = _get_test_data (false); + ASSERT_TRUE (_td_server != NULL && _td_client != NULL); + + /* Prepare server (127.0.0.1:port) */ + nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, + NNS_EDGE_NODE_TYPE_QUERY_SERVER, &server_h); + nns_edge_set_event_callback (server_h, _test_edge_hybrid_event_cb, _td_server); + nns_edge_set_info (server_h, "DEST_HOST", "127.0.0.1"); + nns_edge_set_info (server_h, "DEST_PORT", "1883"); + nns_edge_set_info (server_h, "TOPIC", "temp-mqtt-topic"); + nns_edge_set_info (server_h, "CAPS", "test server"); + nns_edge_set_info (server_h, "QUEUE_SIZE", "10:NEW"); + _td_server->handle = server_h; + + /* Prepare client */ + nns_edge_create_handle ("temp-client", NNS_EDGE_CONNECT_TYPE_HYBRID, + NNS_EDGE_NODE_TYPE_QUERY_CLIENT, &client_h); + nns_edge_set_event_callback (client_h, _test_edge_hybrid_event_cb, _td_client); + nns_edge_set_info (client_h, "CAPS", "test client"); + nns_edge_set_info (client_h, "TOPIC", "temp-mqtt-topic"); + _td_client->handle = client_h; + + ret = nns_edge_start (server_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + ret = nns_edge_start (client_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + usleep (200000); + + ret = nns_edge_connect (client_h, "127.0.0.1", 1883); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + usleep (10000); + + sleep (2); + + /* Send request to server */ + data_len = 10U * sizeof (unsigned int); + data = malloc (data_len); + ASSERT_TRUE (data != NULL); + + for (i = 0; i < 10U; i++) + ((unsigned int *) data)[i] = i; + + ret = nns_edge_data_create (&data_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_data_add (data_h, data, data_len, nns_edge_free); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + nns_edge_get_info (client_h, "client_id", &val); + nns_edge_data_set_info (data_h, "client_id", val); + SAFE_FREE (val); + + ret = nns_edge_data_set_info (data_h, "test-key", "test-value"); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + for (i = 0; i < 5U; i++) { + ret = nns_edge_send (client_h, data_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + usleep (10000); + } + + ret = nns_edge_data_destroy (data_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + /* Wait for responding data (20 seconds) */ + retry = 0U; + do { + usleep (100000); + if (_td_client->received > 0) + break; + } while (retry++ < 200U); + + ret = nns_edge_release_handle (server_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + ret = nns_edge_release_handle (client_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + EXPECT_TRUE (_td_server->received > 0); + EXPECT_TRUE (_td_client->received > 0); + + _free_test_data (_td_server); + _free_test_data (_td_client); +} + +/** + * @brief Connect to the mqtt broker with invalid param. + */ +TEST(edgeMqttHybrid, connectInvalidParam1_n) +{ + int ret = -1; + nns_edge_broker_h broker_h; + + if (!_check_mqtt_broker ()) + return; + + ret = nns_edge_mqtt_connect (NULL, "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_mqtt_connect ("", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Connect to the mqtt broker with invalid param. + */ +TEST(edgeMqttHybrid, connectInvalidParam2_n) +{ + int ret = -1; + nns_edge_broker_h broker_h; + + if (!_check_mqtt_broker ()) + return; + + ret = nns_edge_mqtt_connect ("temp-mqtt-id", NULL, "127.0.0.1", 1883, &broker_h); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_mqtt_connect ("temp-mqtt-id", "", "127.0.0.1", 1883, &broker_h); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Connect to the mqtt broker with invalid param. + */ +TEST(edgeMqttHybrid, connectInvalidParam3_n) +{ + int ret = -1; + nns_edge_broker_h broker_h; + + if (!_check_mqtt_broker ()) + return; + + ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", NULL, 1883, &broker_h); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "", 1883, &broker_h); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Connect to the mqtt broker with invalid param. + */ +TEST(edgeMqttHybrid, connectInvalidParam4_n) +{ + int ret = -1; + + if (!_check_mqtt_broker ()) + return; + + ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, NULL); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Connect to the mqtt broker with invalid host address. + */ +TEST(edgeMqttHybrid, connectInvalidParam5_n) +{ + int ret = -1; + nns_edge_broker_h broker_h; + + if (!_check_mqtt_broker ()) + return; + + ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "tcp://none", 1883, &broker_h); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Connect to the mqtt broker with invalid port number. + */ +TEST(edgeMqttHybrid, connectInvalidParam6_n) +{ + int ret = -1; + nns_edge_broker_h broker_h; + + if (!_check_mqtt_broker ()) + return; + + ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 0, &broker_h); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Close the mqtt handle with invalid param. + */ +TEST(edgeMqttHybrid, closeInvalidParam_n) +{ + int ret = -1; + + if (!_check_mqtt_broker ()) + return; + + ret = nns_edge_mqtt_close (NULL); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Publish with invalid param. + */ +TEST(edgeMqttHybrid, publishInvalidParam_n) +{ + int ret = -1; + const char *msg = "TEMP_MESSAGE"; + + if (!_check_mqtt_broker ()) + return; + + ret = nns_edge_mqtt_publish (NULL, msg, strlen (msg) + 1); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Publish with invalid param. + */ +TEST(edgeMqttHybrid, publishInvalidParam2_n) +{ + int ret = -1; + nns_edge_broker_h broker_h; + const char *msg = "TEMP_MESSAGE"; + + if (!_check_mqtt_broker ()) + return; + + ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + /* data is null */ + ret = nns_edge_mqtt_publish (broker_h, NULL, strlen (msg) + 1); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_mqtt_close (broker_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Publish with invalid param. + */ +TEST(edgeMqttHybrid, publishInvalidParam3_n) +{ + int ret = -1; + nns_edge_broker_h broker_h; + const char *msg = "TEMP_MESSAGE"; + + if (!_check_mqtt_broker ()) + return; + + ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + /* data length is 0 */ + ret = nns_edge_mqtt_publish (broker_h, msg, 0); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_mqtt_close (broker_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Subscribe the topic with invalid param. + */ +TEST(edgeMqttHybrid, subscribeInvalidParam_n) +{ + int ret = -1; + + if (!_check_mqtt_broker ()) + return; + + ret = nns_edge_mqtt_subscribe (NULL); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Get message with invalid param. + */ +TEST(edgeMqttHybrid, getMessageInvalidParam1_n) +{ + int ret = -1; + void *msg = NULL; + nns_size_t msg_len; + + if (!_check_mqtt_broker ()) + return; + + ret = nns_edge_mqtt_get_message (NULL, &msg, &msg_len); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Get message with invalid param. + */ +TEST(edgeMqttHybrid, getMessageInvalidParam2_n) +{ + int ret = -1; + nns_edge_broker_h broker_h; + nns_size_t msg_len; + + if (!_check_mqtt_broker ()) + return; + + ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_mqtt_get_message (broker_h, NULL, &msg_len); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_mqtt_close (broker_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Get message with invalid param. + */ +TEST(edgeMqttHybrid, getMessageInvalidParam3_n) +{ + int ret = -1; + nns_edge_broker_h broker_h; + void *msg = NULL; + + if (!_check_mqtt_broker ()) + return; + + ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_mqtt_get_message (broker_h, &msg, NULL); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_mqtt_close (broker_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Get message from empty message queue. + */ +TEST(edgeMqttHybrid, getMessageWithinTimeout_n) +{ + int ret = -1; + nns_edge_broker_h broker_h; + void *msg = NULL; + nns_size_t msg_len; + + if (!_check_mqtt_broker ()) + return; + + ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_mqtt_get_message (broker_h, &msg, &msg_len); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_mqtt_close (broker_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Edge event callback for test MQTT data transmission. + */ +static int +_test_edge_event_cb (nns_edge_event_h event_h, void *user_data) +{ + ne_test_data_s *_td = (ne_test_data_s *) user_data; + nns_edge_event_e event = NNS_EDGE_EVENT_UNKNOWN; + nns_edge_data_h data_h; + void *data; + nns_size_t data_len; + unsigned int i, count; + int ret; + + if (!_td) { + /* Cannot update event status. */ + return NNS_EDGE_ERROR_NONE; + } + + ret = nns_edge_event_get_type (event_h, &event); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + switch (event) { + case NNS_EDGE_EVENT_CALLBACK_RELEASED: + _td->event_cb_released = true; + break; + case NNS_EDGE_EVENT_NEW_DATA_RECEIVED: + _td->received++; + ret = nns_edge_event_parse_new_data (event_h, &data_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + /* Compare received data */ + ret = nns_edge_data_get_count (data_h, &count); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + EXPECT_EQ (count, 2U); + + ret = nns_edge_data_get (data_h, 0, &data, &data_len); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + for (i = 0; i < 10U; i++) + EXPECT_EQ (((unsigned int *) data)[i], i); + + ret = nns_edge_data_get (data_h, 1, &data, &data_len); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + for (i = 0; i < 20U; i++) + EXPECT_EQ (((unsigned int *) data)[i], 20 - i); + + ret = nns_edge_data_destroy (data_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + break; + default: + break; + } + + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Connect to local host, multiple clients. + */ +TEST(edgeMqtt, connectLocal) +{ + nns_edge_h server_h, client1_h, client2_h; + ne_test_data_s *_td_server, *_td_client1, *_td_client2; + nns_edge_data_h data_h; + nns_size_t data_len; + void *data1, *data2; + unsigned int i, retry; + int ret, port; + char *val; + + if (!_check_mqtt_broker ()) + return; + + _td_server = _get_test_data (true); + _td_client1 = _get_test_data (false); + _td_client2 = _get_test_data (false); + ASSERT_TRUE (_td_server != NULL && _td_client1 != NULL && _td_client2 != NULL); + port = nns_edge_get_available_port (); + + /* Prepare server (127.0.0.1:port) */ + val = nns_edge_strdup_printf ("%d", port); + nns_edge_create_handle ("temp-sender", NNS_EDGE_CONNECT_TYPE_MQTT, + NNS_EDGE_NODE_TYPE_PUB, &server_h); + nns_edge_set_info (server_h, "IP", "127.0.0.1"); + nns_edge_set_info (server_h, "PORT", val); + nns_edge_set_info (server_h, "DEST_IP", "127.0.0.1"); + nns_edge_set_info (server_h, "DEST_PORT", "1883"); + nns_edge_set_info (server_h, "TOPIC", "MQTT_TEST_TOPIC"); + _td_server->handle = server_h; + SAFE_FREE (val); + + /* Prepare client */ + nns_edge_create_handle ("temp-receiver", NNS_EDGE_CONNECT_TYPE_MQTT, + NNS_EDGE_NODE_TYPE_SUB, &client1_h); + nns_edge_set_event_callback (client1_h, _test_edge_event_cb, _td_client1); + nns_edge_set_info (client1_h, "TOPIC", "MQTT_TEST_TOPIC"); + _td_client1->handle = client1_h; + + nns_edge_create_handle ("temp-client2", NNS_EDGE_CONNECT_TYPE_MQTT, + NNS_EDGE_NODE_TYPE_SUB, &client2_h); + nns_edge_set_event_callback (client2_h, _test_edge_event_cb, _td_client2); + nns_edge_set_info (client2_h, "TOPIC", "MQTT_TEST_TOPIC"); + _td_client2->handle = client2_h; + + + ret = nns_edge_start (server_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + ret = nns_edge_start (client1_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + ret = nns_edge_start (client2_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + usleep (200000); + + ret = nns_edge_connect (client1_h, "127.0.0.1", 1883); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + usleep (10000); + ret = nns_edge_connect (client2_h, "127.0.0.1", 1883); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + sleep (2); + + /* Send request to server */ + data_len = 10U * sizeof (unsigned int); + data1 = malloc (data_len); + ASSERT_TRUE (data1 != NULL); + + data2 = malloc (data_len * 2); + ASSERT_TRUE (data2 != NULL); + + for (i = 0; i < 10U; i++) + ((unsigned int *) data1)[i] = i; + + for (i = 0; i < 20U; i++) + ((unsigned int *) data2)[i] = 20 - i; + + ret = nns_edge_data_create (&data_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_data_add (data_h, data1, data_len, nns_edge_free); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_data_add (data_h, data2, data_len * 2, nns_edge_free); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + for (i = 0; i < 5U; i++) { + ret = nns_edge_send (server_h, data_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + usleep (10000); + } + + ret = nns_edge_data_destroy (data_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + /* Wait for responding data (20 seconds) */ + retry = 0U; + do { + usleep (100000); + if (_td_client1->received > 0 && _td_client2->received > 0) + break; + } while (retry++ < 200U); + + ret = nns_edge_disconnect (server_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_release_handle (server_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + ret = nns_edge_release_handle (client1_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + ret = nns_edge_release_handle (client2_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + EXPECT_TRUE (_td_client1->received > 0); + EXPECT_TRUE (_td_client2->received > 0); + + _free_test_data (_td_server); + _free_test_data (_td_client1); + _free_test_data (_td_client2); +} + +/** + * @brief Check connection with invalid param. + */ +TEST(edgeMqtt, checkConnectionInvalidParam_n) +{ + int ret = -1; + + if (!_check_mqtt_broker ()) + return; + + ret = nns_edge_mqtt_is_connected (NULL); + EXPECT_NE (ret, true); +} + +/** + * @brief Main gtest + */ +int +main (int argc, char **argv) +{ + int result = -1; + + try { + testing::InitGoogleTest (&argc, argv); + } catch (...) { + nns_edge_loge ("Catch exception, failed to init google test."); + } + + try { + result = RUN_ALL_TESTS (); + } catch (...) { + nns_edge_loge ("Catch exception, failed to run the unittest."); + } + + return result; +} diff --git a/tests/unittest_nnstreamer-edge.cc b/tests/unittest_nnstreamer-edge.cc index e6b564a..9bd8cdc 100644 --- a/tests/unittest_nnstreamer-edge.cc +++ b/tests/unittest_nnstreamer-edge.cc @@ -3657,463 +3657,6 @@ TEST(edgeQueue, waitPopInvalidParam03_n) EXPECT_TRUE (nns_edge_queue_destroy (queue_h)); } -#if defined(ENABLE_MQTT) -/** - * @brief Edge event callback for test. - */ -static int -_test_edge_hybrid_event_cb (nns_edge_event_h event_h, void *user_data) -{ - ne_test_data_s *_td = (ne_test_data_s *) user_data; - nns_edge_event_e event = NNS_EDGE_EVENT_UNKNOWN; - nns_edge_data_h data_h; - void *data; - nns_size_t data_len; - char *val; - unsigned int i, count; - int ret; - - if (!_td) { - /* Cannot update event status. */ - return NNS_EDGE_ERROR_NONE; - } - - ret = nns_edge_event_get_type (event_h, &event); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - - switch (event) { - case NNS_EDGE_EVENT_NEW_DATA_RECEIVED: - _td->received++; - - ret = nns_edge_event_parse_new_data (event_h, &data_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - - /* Compare metadata */ - ret = nns_edge_data_get_info (data_h, "test-key", &val); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - EXPECT_STREQ (val, "test-value"); - SAFE_FREE (val); - - if (_td->is_server) { - /** - * @note This is test code, responding to client. - * Recommend not to call edge API in event callback. - */ - ret = nns_edge_send (_td->handle, data_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - } else { - /* Compare received data */ - ret = nns_edge_data_get_count (data_h, &count); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - ret = nns_edge_data_get (data_h, 0, &data, &data_len); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - - EXPECT_EQ (count, 1U); - for (i = 0; i < 10U; i++) - EXPECT_EQ (((unsigned int *) data)[i], i); - } - - ret = nns_edge_data_destroy (data_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - break; - default: - break; - } - - return NNS_EDGE_ERROR_NONE; -} - -/** - * @brief Check whether MQTT broker is running or not. - */ -static bool -_check_mqtt_broker () -{ - int ret = 0; - - ret = system ("ps aux | grep mosquitto | grep -v grep"); - if (0 != ret) { - nns_edge_logw ("MQTT broker is not running. Skip query hybrid test."); - return false; - } - - return true; -} - -/** - * @brief Connect to the local host using the information received from mqtt. - */ -TEST(edgeMqtt, connectLocal) -{ - nns_edge_h server_h, client_h; - ne_test_data_s *_td_server, *_td_client; - nns_edge_data_h data_h; - nns_size_t data_len; - void *data; - unsigned int i, retry; - int ret = 0; - char *val; - - if (!_check_mqtt_broker ()) - return; - - _td_server = _get_test_data (true); - _td_client = _get_test_data (false); - ASSERT_TRUE (_td_server != NULL && _td_client != NULL); - - /* Prepare server (127.0.0.1:port) */ - nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, - NNS_EDGE_NODE_TYPE_QUERY_SERVER, &server_h); - nns_edge_set_event_callback (server_h, _test_edge_hybrid_event_cb, _td_server); - nns_edge_set_info (server_h, "DEST_HOST", "127.0.0.1"); - nns_edge_set_info (server_h, "DEST_PORT", "1883"); - nns_edge_set_info (server_h, "TOPIC", "temp-mqtt-topic"); - nns_edge_set_info (server_h, "CAPS", "test server"); - nns_edge_set_info (server_h, "QUEUE_SIZE", "10:NEW"); - _td_server->handle = server_h; - - /* Prepare client */ - nns_edge_create_handle ("temp-client", NNS_EDGE_CONNECT_TYPE_HYBRID, - NNS_EDGE_NODE_TYPE_QUERY_CLIENT, &client_h); - nns_edge_set_event_callback (client_h, _test_edge_hybrid_event_cb, _td_client); - nns_edge_set_info (client_h, "CAPS", "test client"); - nns_edge_set_info (client_h, "TOPIC", "temp-mqtt-topic"); - _td_client->handle = client_h; - - ret = nns_edge_start (server_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - ret = nns_edge_start (client_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - - usleep (200000); - - ret = nns_edge_connect (client_h, "127.0.0.1", 1883); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - usleep (10000); - - sleep (2); - - /* Send request to server */ - data_len = 10U * sizeof (unsigned int); - data = malloc (data_len); - ASSERT_TRUE (data != NULL); - - for (i = 0; i < 10U; i++) - ((unsigned int *) data)[i] = i; - - ret = nns_edge_data_create (&data_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - - ret = nns_edge_data_add (data_h, data, data_len, nns_edge_free); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - - nns_edge_get_info (client_h, "client_id", &val); - nns_edge_data_set_info (data_h, "client_id", val); - SAFE_FREE (val); - - ret = nns_edge_data_set_info (data_h, "test-key", "test-value"); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - - for (i = 0; i < 5U; i++) { - ret = nns_edge_send (client_h, data_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - usleep (10000); - } - - ret = nns_edge_data_destroy (data_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - - /* Wait for responding data (20 seconds) */ - retry = 0U; - do { - usleep (100000); - if (_td_client->received > 0) - break; - } while (retry++ < 200U); - - ret = nns_edge_release_handle (server_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - ret = nns_edge_release_handle (client_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - - EXPECT_TRUE (_td_server->received > 0); - EXPECT_TRUE (_td_client->received > 0); - - _free_test_data (_td_server); - _free_test_data (_td_client); -} - -/** - * @brief Connect to the mqtt broker with invalid param. - */ -TEST(edgeMqtt, connectInvalidParam1_n) -{ - int ret = -1; - nns_edge_broker_h broker_h; - - if (!_check_mqtt_broker ()) - return; - - ret = nns_edge_mqtt_connect (NULL, "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h); - EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); - - ret = nns_edge_mqtt_connect ("", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h); - EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); -} - -/** - * @brief Connect to the mqtt broker with invalid param. - */ -TEST(edgeMqtt, connectInvalidParam2_n) -{ - int ret = -1; - nns_edge_broker_h broker_h; - - if (!_check_mqtt_broker ()) - return; - - ret = nns_edge_mqtt_connect ("temp-mqtt-id", NULL, "127.0.0.1", 1883, &broker_h); - EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); - - ret = nns_edge_mqtt_connect ("temp-mqtt-id", "", "127.0.0.1", 1883, &broker_h); - EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); -} - -/** - * @brief Connect to the mqtt broker with invalid param. - */ -TEST(edgeMqtt, connectInvalidParam3_n) -{ - int ret = -1; - nns_edge_broker_h broker_h; - - if (!_check_mqtt_broker ()) - return; - - ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", NULL, 1883, &broker_h); - EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); - - ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "", 1883, &broker_h); - EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); -} - -/** - * @brief Connect to the mqtt broker with invalid param. - */ -TEST(edgeMqtt, connectInvalidParam4_n) -{ - int ret = -1; - - if (!_check_mqtt_broker ()) - return; - - ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, NULL); - EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); -} - -/** - * @brief Connect to the mqtt broker with invalid host address. - */ -TEST(edgeMqtt, connectInvalidParam5_n) -{ - int ret = -1; - nns_edge_broker_h broker_h; - - if (!_check_mqtt_broker ()) - return; - - ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "tcp://none", 1883, &broker_h); - EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); -} - -/** - * @brief Connect to the mqtt broker with invalid port number. - */ -TEST(edgeMqtt, connectInvalidParam6_n) -{ - int ret = -1; - nns_edge_broker_h broker_h; - - if (!_check_mqtt_broker ()) - return; - - ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 0, &broker_h); - EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); -} - -/** - * @brief Close the mqtt handle with invalid param. - */ -TEST(edgeMqtt, closeInvalidParam_n) -{ - int ret = -1; - - if (!_check_mqtt_broker ()) - return; - - ret = nns_edge_mqtt_close (NULL); - EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); -} - -/** - * @brief Publish with invalid param. - */ -TEST(edgeMqtt, publishInvalidParam_n) -{ - int ret = -1; - const char *msg = "TEMP_MESSAGE"; - - if (!_check_mqtt_broker ()) - return; - - ret = nns_edge_mqtt_publish (NULL, msg, strlen (msg) + 1); - EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); -} - -/** - * @brief Publish with invalid param. - */ -TEST(edgeMqtt, publishInvalidParam2_n) -{ - int ret = -1; - nns_edge_broker_h broker_h; - const char *msg = "TEMP_MESSAGE"; - - if (!_check_mqtt_broker ()) - return; - - ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - - /* data is null */ - ret = nns_edge_mqtt_publish (broker_h, NULL, strlen (msg) + 1); - EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); - - ret = nns_edge_mqtt_close (broker_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); -} - -/** - * @brief Publish with invalid param. - */ -TEST(edgeMqtt, publishInvalidParam3_n) -{ - int ret = -1; - nns_edge_broker_h broker_h; - const char *msg = "TEMP_MESSAGE"; - - if (!_check_mqtt_broker ()) - return; - - ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - - /* data length is 0 */ - ret = nns_edge_mqtt_publish (broker_h, msg, 0); - EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); - - ret = nns_edge_mqtt_close (broker_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); -} - -/** - * @brief Subscribe the topic with invalid param. - */ -TEST(edgeMqtt, subscribeInvalidParam_n) -{ - int ret = -1; - - if (!_check_mqtt_broker ()) - return; - - ret = nns_edge_mqtt_subscribe (NULL); - EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); -} - -/** - * @brief Get message with invalid param. - */ -TEST(edgeMqtt, getMessageInvalidParam1_n) -{ - int ret = -1; - void *msg = NULL; - nns_size_t msg_len; - - if (!_check_mqtt_broker ()) - return; - - ret = nns_edge_mqtt_get_message (NULL, &msg, &msg_len); - EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); -} - -/** - * @brief Get message with invalid param. - */ -TEST(edgeMqtt, getMessageInvalidParam2_n) -{ - int ret = -1; - nns_edge_broker_h broker_h; - nns_size_t msg_len; - - if (!_check_mqtt_broker ()) - return; - - ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - - ret = nns_edge_mqtt_get_message (broker_h, NULL, &msg_len); - EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); - - ret = nns_edge_mqtt_close (broker_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); -} - -/** - * @brief Get message with invalid param. - */ -TEST(edgeMqtt, getMessageInvalidParam3_n) -{ - int ret = -1; - nns_edge_broker_h broker_h; - void *msg = NULL; - - if (!_check_mqtt_broker ()) - return; - - ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - - ret = nns_edge_mqtt_get_message (broker_h, &msg, NULL); - EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); - - ret = nns_edge_mqtt_close (broker_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); -} - -/** - * @brief Get message from empty message queue. - */ -TEST(edgeMqtt, getMessageWithinTimeout_n) -{ - int ret = -1; - nns_edge_broker_h broker_h; - void *msg = NULL; - nns_size_t msg_len; - - if (!_check_mqtt_broker ()) - return; - - ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - - ret = nns_edge_mqtt_get_message (broker_h, &msg, &msg_len); - EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); - - ret = nns_edge_mqtt_close (broker_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); -} -#endif /* ENABLE_MQTT */ - /** * @brief Main gtest */ -- 2.34.1