From cb408fc99a012ab9355177a987c8f05631989006 Mon Sep 17 00:00:00 2001 From: gichan Date: Thu, 11 Aug 2022 15:36:49 +0900 Subject: [PATCH] [Edge] Add mqtt hybrid unittest - Add mqtt hybrid unittest. Signed-off-by: gichan --- CMakeLists.txt | 1 + packaging/nnstreamer-edge.spec | 1 + src/libnnstreamer-edge/nnstreamer-edge-mqtt.c | 6 + tests/unittest_nnstreamer-edge.cc | 452 ++++++++++++++++++ 4 files changed, 460 insertions(+) diff --git a/CMakeLists.txt b/CMakeLists.txt index ae8891e..8e8c221 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -15,6 +15,7 @@ SET(CMAKE_CXX_STANDARD 14) SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -Werror") SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${EXTRA_CFLAGS} -pthread -fPIE -fPIC -g") SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DENABLE_MQTT=1") +SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DENABLE_MQTT=1") IF (ENABLE_DEBUG) SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DDEBUG=1") diff --git a/packaging/nnstreamer-edge.spec b/packaging/nnstreamer-edge.spec index 61830ad..5586cee 100644 --- a/packaging/nnstreamer-edge.spec +++ b/packaging/nnstreamer-edge.spec @@ -21,6 +21,7 @@ BuildRequires: glib2-devel %if 0%{?unit_test} BuildRequires: gtest-devel +BuildRequires: procps %if 0%{?testcoverage} BuildRequires: lcov diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c b/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c index 1403061..5991c73 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c @@ -268,6 +268,7 @@ nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic) g_mutex_unlock (&bh->mqtt_mutex); nns_edge_loge ("Failed to connect to MQTT broker." "Please check broker is running status or broker host address."); + ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; goto error; } } @@ -475,6 +476,11 @@ nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg) return NNS_EDGE_ERROR_INVALID_PARAMETER; } + if (!msg) { + nns_edge_loge ("Invalid param, given msg param is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + bh = (nns_edge_broker_s *) eh->broker_h; *msg = g_async_queue_timeout_pop (bh->server_list, DEFAULT_SUB_TIMEOUT); diff --git a/tests/unittest_nnstreamer-edge.cc b/tests/unittest_nnstreamer-edge.cc index 02f7a37..025670b 100644 --- a/tests/unittest_nnstreamer-edge.cc +++ b/tests/unittest_nnstreamer-edge.cc @@ -2815,6 +2815,458 @@ TEST(edgeMeta, deserializeInvalidParam03_n) free (data); } +/** + * @brief Edge event callback for test. + */ +static int +_test_edge_hybrid_event_cb (nns_edge_event_h event_h, void *user_data) +{ + ne_test_data_s *_td = (ne_test_data_s *) user_data; + nns_edge_event_e event = NNS_EDGE_EVENT_UNKNOWN; + nns_edge_data_h data_h; + void *data; + size_t data_len; + char *val; + unsigned int i, count; + int ret; + + if (!_td) { + /* Cannot update event status. */ + return NNS_EDGE_ERROR_NONE; + } + + ret = nns_edge_event_get_type (event_h, &event); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + switch (event) { + case NNS_EDGE_EVENT_NEW_DATA_RECEIVED: + _td->received++; + + ret = nns_edge_event_parse_new_data (event_h, &data_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + /* Compare metadata */ + ret = nns_edge_data_get_info (data_h, "test-key", &val); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + EXPECT_STREQ (val, "test-value"); + nns_edge_free (val); + + if (_td->is_server) { + /** + * @note This is test code, responding to client. + * Recommend not to call edge API in event callback. + */ + ret = nns_edge_publish (_td->handle, data_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + } else { + /* Compare received data */ + ret = nns_edge_data_get_count (data_h, &count); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + ret = nns_edge_data_get (data_h, 0, &data, &data_len); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + EXPECT_EQ (count, 1U); + for (i = 0; i < 10U; i++) + EXPECT_EQ (((unsigned int *) data)[i], i); + } + + ret = nns_edge_data_destroy (data_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + break; + default: + break; + } + + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Check whether MQTT broker is running or not. + */ +static int +_check_mqtt_broker () +{ + int ret = 0; + + ret = system ("ps aux | grep mosquitto | grep -v grep"); + if (0 != ret) { + nns_edge_loge ("MQTT broker is not running. Skip query hybrid test."); + ret = -1; + } + + return ret; +} + +/** + * @brief Connect to the local host using the information received from mqtt. + */ +TEST(edgeMqtt, connectLocal) +{ + nns_edge_h server_h, client_h; + ne_test_data_s *_td_server, *_td_client; + nns_edge_data_h data_h; + pthread_t server_thread, client_thread; + pthread_attr_t attr; + size_t data_len; + void *data; + unsigned int i, retry; + int ret = 0; + char *val; + + if (0 != _check_mqtt_broker ()) + return; + + _td_server = _get_test_data (true); + _td_client = _get_test_data (false); + + /* Prepare server (127.0.0.1:port) */ + nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, NNS_EDGE_FLAG_RECV | NNS_EDGE_FLAG_SEND | NNS_EDGE_FLAG_SERVER, &server_h); + nns_edge_set_event_callback (server_h, _test_edge_hybrid_event_cb, _td_server); + nns_edge_set_info (server_h, "HOST", "localhost"); + nns_edge_set_info (server_h, "PORT", "0"); + nns_edge_set_info (server_h, "DEST_HOST", "tcp://localhost"); + nns_edge_set_info (server_h, "DEST_PORT", "1883"); + nns_edge_set_info (server_h, "TOPIC", "temp-mqtt-topic"); + nns_edge_set_info (server_h, "CAPS", "test server"); + _td_server->handle = server_h; + + /* Prepare client */ + nns_edge_create_handle ("temp-client", NNS_EDGE_CONNECT_TYPE_HYBRID, NNS_EDGE_FLAG_RECV | NNS_EDGE_FLAG_SEND, &client_h); + nns_edge_set_event_callback (client_h, _test_edge_hybrid_event_cb, _td_client); + nns_edge_set_info (client_h, "CAPS", "test client"); + nns_edge_set_info (client_h, "HOST", "localhost"); + nns_edge_set_info (client_h, "port", "0"); + nns_edge_set_info (client_h, "TOPIC", "temp-mqtt-topic"); + _td_client->handle = client_h; + + /* Start server/client thread */ + pthread_attr_init (&attr); + pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); + pthread_create (&server_thread, &attr, _test_edge_thread, _td_server); + pthread_create (&client_thread, &attr, _test_edge_thread, _td_client); + pthread_attr_destroy (&attr); + + /* Wait for server/client thread */ + do { + usleep (20000); + } while (!g_main_loop_is_running (_td_server->loop)); + + do { + usleep (20000); + } while (!g_main_loop_is_running (_td_client->loop)); + + ret = nns_edge_connect (client_h, "tcp://localhost", 1883); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + usleep (10000); + + sleep (2); + + /* Send request to server */ + data_len = 10U * sizeof (unsigned int); + data = malloc (data_len); + ASSERT_TRUE (data != NULL); + + for (i = 0; i < 10U; i++) + ((unsigned int *) data)[i] = i; + + ret = nns_edge_data_create (&data_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_data_add (data_h, data, data_len, nns_edge_free); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + nns_edge_get_info (client_h, "client_id", &val); + nns_edge_data_set_info (data_h, "client_id", val); + g_free (val); + + ret = nns_edge_data_set_info (data_h, "test-key", "test-value"); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + for (i = 0; i < 5U; i++) { + ret = nns_edge_publish (client_h, data_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + usleep (10000); + } + + ret = nns_edge_data_destroy (data_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + /* Wait for responding data (20 seconds) */ + retry = 0U; + do { + usleep (100000); + if (_td_client->received > 0) + break; + } while (retry++ < 200U); + + g_main_loop_quit (_td_server->loop); + g_main_loop_quit (_td_client->loop); + + ret = nns_edge_release_handle (server_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + ret = nns_edge_release_handle (client_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + EXPECT_TRUE (_td_server->received > 0); + EXPECT_TRUE (_td_client->received > 0); + + _free_test_data (_td_server); + _free_test_data (_td_client); +} + + +/** + * @brief Connect to the mqtt broker with invalid param. + */ +TEST(edgeMqtt, connectInvalidParam_n) +{ + int ret = -1; + if (0 != _check_mqtt_broker ()) + return; + + ret = nns_edge_mqtt_connect (NULL); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Connect to the mqtt broker with invalid hostaddress. + */ +TEST(edgeMqtt, connectInvalidParam2_n) +{ + int ret = -1; + nns_edge_h edge_h; + + if (0 != _check_mqtt_broker ()) + return; + + ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, NNS_EDGE_FLAG_RECV | NNS_EDGE_FLAG_SEND | NNS_EDGE_FLAG_SERVER, &edge_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + nns_edge_set_info (edge_h, "DEST_HOST", "tcp://none"); + nns_edge_set_info (edge_h, "DEST_PORT", "1883"); + + ret = nns_edge_mqtt_connect (edge_h); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_release_handle (edge_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Close the mqtt handle with invalid param. + */ +TEST(edgeMqtt, closeInvalidParam_n) +{ + int ret = -1; + if (0 != _check_mqtt_broker ()) + return; + + ret = nns_edge_mqtt_close (NULL); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Close the mqtt handle before the connection. + */ +TEST(edgeMqtt, closeInvalidParam2_n) +{ + int ret = -1; + nns_edge_h edge_h; + + if (0 != _check_mqtt_broker ()) + return; + + ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, NNS_EDGE_FLAG_RECV | NNS_EDGE_FLAG_SEND | NNS_EDGE_FLAG_SERVER, &edge_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_mqtt_close (edge_h); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_release_handle (edge_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Publish with invalid param. + */ +TEST(edgeMqtt, publishInvalidParam_n) +{ + int ret = -1; + const char* msg = "TEMP_MESSAGE"; + + if (0 != _check_mqtt_broker ()) + return; + + ret = nns_edge_mqtt_publish (NULL, msg, strlen (msg) + 1); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Publish with invalid param. + */ +TEST(edgeMqtt, publishInvalidParam2_n) +{ + int ret = -1; + nns_edge_h edge_h; + const char* msg = "TEMP_MESSAGE"; + + if (0 != _check_mqtt_broker ()) + return; + + ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, NNS_EDGE_FLAG_RECV | NNS_EDGE_FLAG_SEND | NNS_EDGE_FLAG_SERVER, &edge_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_mqtt_publish (edge_h, NULL, strlen (msg) + 1); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_release_handle (edge_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); +} + + +/** + * @brief Publish with invalid param. + */ +TEST(edgeMqtt, publishInvalidParam3_n) +{ + int ret = -1; + nns_edge_h edge_h; + const char* msg = "TEMP_MESSAGE"; + + if (0 != _check_mqtt_broker ()) + return; + + ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, NNS_EDGE_FLAG_RECV | NNS_EDGE_FLAG_SEND | NNS_EDGE_FLAG_SERVER, &edge_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_mqtt_publish (edge_h, msg, 0); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_release_handle (edge_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Publish the message without the connection. + */ +TEST(edgeMqtt, publishInvalidParam4_n) +{ + int ret = -1; + nns_edge_h edge_h; + const char* msg = "TEMP_MESSAGE"; + + if (0 != _check_mqtt_broker ()) + return; + + ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, NNS_EDGE_FLAG_RECV | NNS_EDGE_FLAG_SEND | NNS_EDGE_FLAG_SERVER, &edge_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_mqtt_publish (edge_h, msg, strlen (msg) + 1); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_release_handle (edge_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Subscribe the topic with invalid param. + */ +TEST(edgeMqtt, subscribeInvalidParam_n) +{ + int ret = -1; + if (0 != _check_mqtt_broker ()) + return; + + ret = nns_edge_mqtt_subscribe (NULL); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Subscribe the topic before the connection. + */ +TEST(edgeMqtt, subscribeInvalidParam2_n) +{ + int ret = -1; + nns_edge_h edge_h; + + if (0 != _check_mqtt_broker ()) + return; + + ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, NNS_EDGE_FLAG_RECV | NNS_EDGE_FLAG_SEND | NNS_EDGE_FLAG_SERVER, &edge_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_mqtt_subscribe (edge_h); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_release_handle (edge_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Get message with invalid param. + */ +TEST(edgeMqtt, getMessageInvalidParam_n) +{ + int ret = -1; + char *msg = NULL; + + if (0 != _check_mqtt_broker ()) + return; + + ret = nns_edge_mqtt_get_message (NULL, &msg); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Get message with invalid param. + */ +TEST(edgeMqtt, getMessageInvalidParam2_n) +{ + int ret = -1; + nns_edge_h edge_h; + + if (0 != _check_mqtt_broker ()) + return; + + ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, NNS_EDGE_FLAG_RECV | NNS_EDGE_FLAG_SEND | NNS_EDGE_FLAG_SERVER, &edge_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_mqtt_get_message (edge_h, NULL); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_release_handle (edge_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); +} + +/** + * @brief Get message from empty message queue. + */ +TEST(edgeMqtt, getMessageWithinTimeout_n) +{ + int ret = -1; + nns_edge_h edge_h; + char *msg = NULL; + + if (0 != _check_mqtt_broker ()) + return; + + ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, NNS_EDGE_FLAG_RECV | NNS_EDGE_FLAG_SEND | NNS_EDGE_FLAG_SERVER, &edge_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + nns_edge_set_info (edge_h, "DEST_HOST", "tcp://localhost"); + nns_edge_set_info (edge_h, "DEST_PORT", "1883"); + + ret = nns_edge_mqtt_connect (edge_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_mqtt_get_message (edge_h, &msg); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_mqtt_close (edge_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_release_handle (edge_h); + EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); +} + /** * @brief Main gtest */ -- 2.34.1