- Support MQTT direct data transmission.
- Add unit test for mqtt
Signed-off-by: gichan <gichan2.jang@samsung.com>
LD_LIBRARY_PATH=./src bash %{test_script} ./tests/unittest_nnstreamer-edge-aitt
%endif
+%if 0%{?mqtt_support}
+LD_LIBRARY_PATH=./src bash %{test_script} ./tests/unittest_nnstreamer-edge-mqtt
+%endif
+
%if 0%{?testcoverage}
# 'lcov' generates the date format with UTC time zone by default. Let's replace UTC with KST.
# If you can get a root privilege, run ln -sf /usr/share/zoneinfo/Asia/Seoul /etc/localtime
%{_bindir}/unittest_nnstreamer-edge-aitt
%endif
+%if 0%{?mqtt_support}
+%{_bindir}/unittest_nnstreamer-edge-mqtt
+%endif
+
%if 0%{?testcoverage}
%files unittest-coverage
%{_datadir}/nnstreamer-edge/unittest/*
if (NNS_EDGE_ERROR_NONE != ret)
nns_edge_loge ("Failed to send data via AITT connection.");
break;
+ case NNS_EDGE_CONNECT_TYPE_MQTT:
+ ret = nns_edge_mqtt_publish_data (eh->broker_h, data_h);
+ if (NNS_EDGE_ERROR_NONE != ret)
+ nns_edge_loge ("Failed to send data via MQTT connection.");
+ break;
default:
break;
}
if ((NNS_EDGE_NODE_TYPE_QUERY_SERVER == eh->node_type)
|| (NNS_EDGE_NODE_TYPE_PUB == eh->node_type)) {
- if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) {
- char *topic, *msg;
+ if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type
+ || NNS_EDGE_CONNECT_TYPE_MQTT == eh->connect_type) {
+ char *topic;
/** @todo Set unique device name.
* Device name should be unique. Consider using MAC address later.
goto done;
}
- msg = nns_edge_get_host_string (eh->host, eh->port);
+ if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) {
+ char *msg;
+ msg = nns_edge_get_host_string (eh->host, eh->port);
- ret = nns_edge_mqtt_publish (eh->broker_h, msg, strlen (msg) + 1);
- SAFE_FREE (msg);
+ ret = nns_edge_mqtt_publish (eh->broker_h, msg, strlen (msg) + 1);
+ SAFE_FREE (msg);
- if (NNS_EDGE_ERROR_NONE != ret) {
- nns_edge_loge ("Failed to publish the meesage to broker.");
- goto done;
+ if (NNS_EDGE_ERROR_NONE != ret) {
+ nns_edge_loge ("Failed to publish the meesage to broker.");
+ goto done;
+ }
+ } else {
+ ret = nns_edge_mqtt_set_event_callback (eh->broker_h, eh->event_cb,
+ eh->user_data);
+ if (NNS_EDGE_ERROR_NONE != ret) {
+ nns_edge_loge ("Failed to set event callback to MQTT broker.");
+ goto done;
+ }
}
} else if (NNS_EDGE_CONNECT_TYPE_AITT == eh->connect_type) {
ret = nns_edge_aitt_connect (eh->id, eh->topic, eh->dest_host,
switch (eh->connect_type) {
case NNS_EDGE_CONNECT_TYPE_HYBRID:
+ case NNS_EDGE_CONNECT_TYPE_MQTT:
if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_close (eh->broker_h)) {
nns_edge_logw ("Failed to close mqtt connection.");
}
return NNS_EDGE_ERROR_NONE;
}
+/**
+ * @brief Parse the message received from the MQTT broker and connect to the server directly.
+ */
+static int
+_mqtt_hybrid_direct_connection (nns_edge_handle_s * eh)
+{
+ int ret;
+
+ do {
+ char *msg = NULL;
+ char *server_ip = NULL;
+ int server_port = 0;
+ nns_size_t msg_len = 0;
+
+ ret = nns_edge_mqtt_get_message (eh->broker_h, (void **) &msg, &msg_len);
+ if (ret != NNS_EDGE_ERROR_NONE || !msg || msg_len == 0)
+ return ret;
+
+ nns_edge_parse_host_string (msg, &server_ip, &server_port);
+ SAFE_FREE (msg);
+
+ nns_edge_logd ("[DEBUG] Parsed server info: Server [%s:%d] ", server_ip,
+ server_port);
+
+ ret = _nns_edge_connect_to (eh, eh->client_id, server_ip, server_port);
+ SAFE_FREE (server_ip);
+
+ if (NNS_EDGE_ERROR_NONE == ret) {
+ return ret;
+ }
+ } while (TRUE);
+
+ return ret;
+}
+
+/**
+ * @brief Start subsciption to MQTT message
+ */
+static int
+_nns_edge_start_mqtt_sub (nns_edge_handle_s * eh)
+{
+ char *topic;
+ int ret;
+
+ if (!nns_edge_mqtt_is_connected (eh->broker_h)) {
+ topic = nns_edge_strdup_printf ("edge/inference/+/%s/#", eh->topic);
+
+ ret = nns_edge_mqtt_connect (eh->id, topic, eh->dest_host, eh->dest_port,
+ &eh->broker_h);
+ SAFE_FREE (topic);
+
+ if (NNS_EDGE_ERROR_NONE != ret) {
+ return NNS_EDGE_ERROR_CONNECTION_FAILURE;
+ }
+
+ ret = nns_edge_mqtt_subscribe (eh->broker_h);
+ if (NNS_EDGE_ERROR_NONE != ret) {
+ nns_edge_loge ("Failed to subscribe to topic: %s.", eh->topic);
+ return ret;
+ }
+ }
+
+ return NNS_EDGE_ERROR_NONE;
+}
+
/**
* @brief Connect to the destination node.
*/
eh->dest_host = nns_edge_strdup (dest_host);
eh->dest_port = dest_port;
- if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) {
- char *topic;
-
- if (!nns_edge_mqtt_is_connected (eh->broker_h)) {
- topic = nns_edge_strdup_printf ("edge/inference/+/%s/#", eh->topic);
-
- ret = nns_edge_mqtt_connect (eh->id, topic, dest_host, dest_port,
- &eh->broker_h);
- SAFE_FREE (topic);
-
- if (NNS_EDGE_ERROR_NONE != ret) {
- nns_edge_loge ("Connection failure to broker.");
- goto done;
- }
+ if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type
+ || NNS_EDGE_CONNECT_TYPE_MQTT == eh->connect_type) {
+ ret = _nns_edge_start_mqtt_sub (eh);
+ if (NNS_EDGE_ERROR_NONE != ret)
+ goto done;
- ret = nns_edge_mqtt_subscribe (eh->broker_h);
+ if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) {
+ ret = _mqtt_hybrid_direct_connection (eh);
+ } else {
+ ret = nns_edge_mqtt_set_event_callback (eh->broker_h, eh->event_cb,
+ eh->user_data);
if (NNS_EDGE_ERROR_NONE != ret) {
- nns_edge_loge ("Failed to subscribe to topic: %s.", eh->topic);
- goto done;
+ nns_edge_loge ("Failed to set event callback to MQTT broker.");
+ return ret;
}
}
-
- do {
- char *msg = NULL;
- char *server_ip = NULL;
- int server_port = 0;
- nns_size_t msg_len = 0;
-
- ret = nns_edge_mqtt_get_message (eh->broker_h, (void **) &msg, &msg_len);
- if (ret != NNS_EDGE_ERROR_NONE || !msg || msg_len == 0)
- break;
-
- nns_edge_parse_host_string (msg, &server_ip, &server_port);
- SAFE_FREE (msg);
-
- nns_edge_logd ("[DEBUG] Parsed server info: Server [%s:%d] ", server_ip,
- server_port);
-
- ret = _nns_edge_connect_to (eh, eh->client_id, server_ip, server_port);
- SAFE_FREE (server_ip);
-
- if (NNS_EDGE_ERROR_NONE == ret) {
- break;
- }
- } while (TRUE);
} else if (NNS_EDGE_CONNECT_TYPE_AITT == eh->connect_type) {
ret = nns_edge_aitt_connect (eh->id, eh->topic, dest_host, dest_port,
&eh->broker_h);
NNS_EDGE_ERROR_NONE == nns_edge_aitt_is_connected (eh->broker_h))
return true;
+ if (NNS_EDGE_CONNECT_TYPE_MQTT == eh->connect_type &&
+ nns_edge_mqtt_is_connected (eh->broker_h))
+ return true;
+
conn_data = (nns_edge_conn_data_s *) eh->connections;
while (conn_data) {
conn = conn_data->sink_conn;
#include "nnstreamer-edge-log.h"
#include "nnstreamer-edge-util.h"
#include "nnstreamer-edge-queue.h"
+#include "nnstreamer-edge-data.h"
+#include "nnstreamer-edge-event.h"
/**
* @brief Data structure for mqtt broker handle.
char *host;
int port;
bool connected;
+
+ /* event callback for new message */
+ nns_edge_event_cb event_cb;
+ void *user_data;
} nns_edge_broker_s;
/**
nns_edge_broker_s *bh = (nns_edge_broker_s *) data;
char *msg = NULL;
nns_size_t msg_len;
+ int ret;
if (!bh) {
nns_edge_loge ("Invalid param, given broker handle is invalid.");
msg_len = (nns_size_t) message->payloadlen;
msg = nns_edge_memdup (message->payload, msg_len);
- if (msg)
- nns_edge_queue_push (bh->message_queue, msg, msg_len, nns_edge_free);
+
+ if (msg) {
+ if (bh->event_cb) {
+ nns_edge_data_h data_h;
+
+ if (nns_edge_data_create (&data_h) != NNS_EDGE_ERROR_NONE) {
+ nns_edge_loge ("Failed to create data handle in msg thread.");
+ return;
+ }
+
+ nns_edge_data_deserialize (data_h, (void *) msg, (nns_size_t) msg_len);
+
+ ret = nns_edge_event_invoke_callback (bh->event_cb, bh->user_data,
+ NNS_EDGE_EVENT_NEW_DATA_RECEIVED, data_h, sizeof (nns_edge_data_h),
+ NULL);
+ if (ret != NNS_EDGE_ERROR_NONE)
+ nns_edge_loge ("Failed to send an event for received message.");
+
+ nns_edge_data_destroy (data_h);
+ SAFE_FREE (msg);
+ return;
+ } else {
+ nns_edge_queue_push (bh->message_queue, msg, msg_len, nns_edge_free);
+ }
+ }
return;
}
bh->host = nns_edge_strdup (host);
bh->port = port;
bh->connected = true;
+ bh->event_cb = NULL;
+ bh->user_data = NULL;
*broker_h = bh;
return NNS_EDGE_ERROR_NONE;
return NNS_EDGE_ERROR_NONE;
}
+/**
+ * @brief Internal util function to send edge-data via MQTT connection.
+ */
+int
+nns_edge_mqtt_publish_data (nns_edge_broker_h handle, nns_edge_data_h data_h)
+{
+ int ret;
+ void *data = NULL;
+ nns_size_t size;
+
+ ret = nns_edge_data_serialize (data_h, &data, &size);
+ if (NNS_EDGE_ERROR_NONE != ret) {
+ nns_edge_loge ("Failed to serialize the edge data.");
+ return ret;
+ }
+
+ ret = nns_edge_mqtt_publish (handle, data, size);
+ if (NNS_EDGE_ERROR_NONE != ret)
+ nns_edge_loge ("Failed to send data to destination.");
+
+ SAFE_FREE (data);
+ return ret;
+}
+
/**
* @brief Publish raw data.
* @note This is internal function for MQTT broker. You should call this with edge-handle lock.
return bh->connected;
}
+
+/**
+ * @brief Set event callback for new message.
+ */
+int
+nns_edge_mqtt_set_event_callback (nns_edge_broker_h broker_h,
+ nns_edge_event_cb cb, void *user_data)
+{
+ nns_edge_broker_s *bh;
+
+ if (!broker_h) {
+ nns_edge_loge ("Invalid param, given MQTT handle is invalid.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ bh = (nns_edge_broker_s *) broker_h;
+
+ bh->event_cb = cb;
+ bh->user_data = user_data;
+
+ return NNS_EDGE_ERROR_NONE;
+}
#include "nnstreamer-edge-log.h"
#include "nnstreamer-edge-util.h"
#include "nnstreamer-edge-queue.h"
+#include "nnstreamer-edge-data.h"
+#include "nnstreamer-edge-event.h"
/**
* @brief Data structure for mqtt broker handle.
char *topic;
char *host;
int port;
+
+ /* event callback for new message */
+ nns_edge_event_cb event_cb;
+ void *user_data;
} nns_edge_broker_s;
/**
msg_len = (nns_size_t) message->payloadlen;
msg = nns_edge_memdup (message->payload, msg_len);
- if (msg)
- nns_edge_queue_push (bh->message_queue, msg, msg_len, nns_edge_free);
+
+ if (msg) {
+ if (bh->event_cb) {
+ nns_edge_data_h data_h;
+
+ if (nns_edge_data_create (&data_h) != NNS_EDGE_ERROR_NONE) {
+ nns_edge_loge ("Failed to create data handle in msg thread.");
+ return;
+ }
+
+ nns_edge_data_deserialize (data_h, (void *) msg, (nns_size_t) msg_len);
+
+ ret = nns_edge_event_invoke_callback (bh->event_cb, bh->user_data,
+ NNS_EDGE_EVENT_NEW_DATA_RECEIVED, data_h, sizeof (nns_edge_data_h),
+ NULL);
+ if (ret != NNS_EDGE_ERROR_NONE)
+ nns_edge_loge ("Failed to send an event for received message.");
+
+ nns_edge_data_destroy (data_h);
+ SAFE_FREE (msg);
+ return;
+ } else {
+ nns_edge_queue_push (bh->message_queue, msg, msg_len, nns_edge_free);
+ }
+ }
return TRUE;
}
bh->host = nns_edge_strdup (host);
bh->port = port;
bh->mqtt_h = handle;
+ bh->event_cb = NULL;
+ bh->user_data = NULL;
nns_edge_queue_create (&bh->message_queue);
MQTTAsync_setCallbacks (handle, bh, NULL, mqtt_cb_message_arrived, NULL);
return NNS_EDGE_ERROR_NONE;
}
+/**
+ * @brief Internal util function to send edge-data via MQTT connection.
+ */
+int
+nns_edge_mqtt_publish_data (nns_edge_broker_h handle, nns_edge_data_h data_h)
+{
+ int ret;
+ void *data = NULL;
+ nns_size_t size;
+
+ ret = nns_edge_data_serialize (data_h, &data, &size);
+ if (NNS_EDGE_ERROR_NONE != ret) {
+ nns_edge_loge ("Failed to serialize the edge data.");
+ return ret;
+ }
+
+ ret = nns_edge_mqtt_publish (handle, data, size);
+ if (NNS_EDGE_ERROR_NONE != ret)
+ nns_edge_loge ("Failed to send data to destination.");
+
+ SAFE_FREE (data);
+ return ret;
+}
+
/**
* @brief Publish raw data.
* @note This is internal function for MQTT broker. You should call this with edge-handle lock.
return NNS_EDGE_ERROR_NONE;
}
+
+/**
+ * @brief Set event callback for new message.
+ */
+int
+nns_edge_mqtt_set_event_callback (nns_edge_broker_h broker_h,
+ nns_edge_event_cb cb, void *user_data)
+{
+ nns_edge_broker_s *bh;
+
+ if (!broker_h) {
+ nns_edge_loge ("Invalid param, given MQTT handle is invalid.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ bh = (nns_edge_broker_s *) broker_h;
+
+ bh->event_cb = cb;
+ bh->user_data = user_data;
+
+ return NNS_EDGE_ERROR_NONE;
+}
* @brief Get message from mqtt broker. If no message in the queue, it waits up to 1 second for new message.
*/
int nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg, nns_size_t *msg_len);
+
+/**
+ * @brief Internal util function to send edge-data via MQTT connection.
+ */
+int nns_edge_mqtt_publish_data (nns_edge_broker_h handle, nns_edge_data_h data_h);
+
+/**
+ * @brief Set event callback for new message.
+ */
+int nns_edge_mqtt_set_event_callback (nns_edge_broker_h broker_h, nns_edge_event_cb cb, void *user_data);
+
#else
/**
* @todo consider to change code style later.
#define nns_edge_mqtt_subscribe(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
#define nns_edge_mqtt_is_connected(...) (false)
#define nns_edge_mqtt_get_message(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
+#define nns_edge_mqtt_publish_data(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
+#define nns_edge_mqtt_set_event_callback(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
#endif /* ENABLE_MQTT */
#ifdef __cplusplus
TARGET_LINK_LIBRARIES(unittest_nnstreamer-edge-aitt ${TEST_REQUIRE_PKGS_LDFLAGS} nnstreamer-edge)
INSTALL (TARGETS unittest_nnstreamer-edge-aitt DESTINATION ${BIN_INSTALL_DIR})
ENDIF()
+
+# MQTT test
+IF(MQTT_SUPPORT)
+ADD_EXECUTABLE(unittest_nnstreamer-edge-mqtt unittest_nnstreamer-edge-mqtt.cc)
+TARGET_INCLUDE_DIRECTORIES(unittest_nnstreamer-edge-mqtt PRIVATE ${EDGE_REQUIRE_PKGS_INCLUDE_DIRS} ${INCLUDE_DIR} ${NNS_EDGE_SRC_DIR})
+TARGET_LINK_LIBRARIES(unittest_nnstreamer-edge-mqtt ${TEST_REQUIRE_PKGS_LDFLAGS} nnstreamer-edge)
+INSTALL (TARGETS unittest_nnstreamer-edge-mqtt DESTINATION ${BIN_INSTALL_DIR})
+ENDIF()
--- /dev/null
+/**
+ * @file unittest_nnstreamer-edge-mqtt.cc
+ * @date 15 Mar 2023
+ * @brief Unittest for nnstreamer-edge MQTT direct data transmission.
+ * @see https://github.com/nnstreamer/nnstreamer-edge
+ * @author Gichan Jang <gichan2.jang@samsung.com>
+ * @bug No known bugs
+ */
+
+#include <gtest/gtest.h>
+#include "nnstreamer-edge.h"
+#include "nnstreamer-edge-mqtt.h"
+#include "nnstreamer-edge-log.h"
+#include "nnstreamer-edge-util.h"
+
+/**
+ * @brief Data struct for unittest.
+ */
+typedef struct
+{
+ nns_edge_h handle;
+ bool running;
+ bool is_server;
+ bool event_cb_released;
+ unsigned int received;
+} ne_test_data_s;
+
+/**
+ * @brief Allocate and initialize test data.
+ */
+static ne_test_data_s *
+_get_test_data (bool is_server)
+{
+ ne_test_data_s *_td;
+
+ _td = (ne_test_data_s *) calloc (1, sizeof (ne_test_data_s));
+
+ if (_td) {
+ _td->is_server = is_server;
+ }
+
+ return _td;
+}
+
+/**
+ * @brief Release test data.
+ */
+static void
+_free_test_data (ne_test_data_s *_td)
+{
+ if (!_td)
+ return;
+
+ SAFE_FREE (_td);
+}
+
+/**
+ * @brief Edge event callback for test.
+ */
+static int
+_test_edge_hybrid_event_cb (nns_edge_event_h event_h, void *user_data)
+{
+ ne_test_data_s *_td = (ne_test_data_s *) user_data;
+ nns_edge_event_e event = NNS_EDGE_EVENT_UNKNOWN;
+ nns_edge_data_h data_h;
+ void *data;
+ nns_size_t data_len;
+ char *val;
+ unsigned int i, count;
+ int ret;
+
+ if (!_td) {
+ /* Cannot update event status. */
+ return NNS_EDGE_ERROR_NONE;
+ }
+
+ ret = nns_edge_event_get_type (event_h, &event);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+ switch (event) {
+ case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
+ _td->received++;
+
+ ret = nns_edge_event_parse_new_data (event_h, &data_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+ /* Compare metadata */
+ ret = nns_edge_data_get_info (data_h, "test-key", &val);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+ EXPECT_STREQ (val, "test-value");
+ SAFE_FREE (val);
+
+ if (_td->is_server) {
+ /**
+ * @note This is test code, responding to client.
+ * Recommend not to call edge API in event callback.
+ */
+ ret = nns_edge_send (_td->handle, data_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+ } else {
+ /* Compare received data */
+ ret = nns_edge_data_get_count (data_h, &count);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+ ret = nns_edge_data_get (data_h, 0, &data, &data_len);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+ EXPECT_EQ (count, 1U);
+ for (i = 0; i < 10U; i++)
+ EXPECT_EQ (((unsigned int *) data)[i], i);
+ }
+
+ ret = nns_edge_data_destroy (data_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+ break;
+ default:
+ break;
+ }
+
+ return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Check whether MQTT broker is running or not.
+ */
+static bool
+_check_mqtt_broker ()
+{
+ int ret = 0;
+
+ ret = system ("ps aux | grep mosquitto | grep -v grep");
+ if (0 != ret) {
+ nns_edge_logw ("MQTT broker is not running. Skip query hybrid test.");
+ return false;
+ }
+
+ return true;
+}
+
+/**
+ * @brief Connect to the local host using the information received from mqtt.
+ */
+TEST(edgeMqttHybrid, connectLocal)
+{
+ nns_edge_h server_h, client_h;
+ ne_test_data_s *_td_server, *_td_client;
+ nns_edge_data_h data_h;
+ nns_size_t data_len;
+ void *data;
+ unsigned int i, retry;
+ int ret = 0;
+ char *val;
+
+ if (!_check_mqtt_broker ())
+ return;
+
+ _td_server = _get_test_data (true);
+ _td_client = _get_test_data (false);
+ ASSERT_TRUE (_td_server != NULL && _td_client != NULL);
+
+ /* Prepare server (127.0.0.1:port) */
+ nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
+ NNS_EDGE_NODE_TYPE_QUERY_SERVER, &server_h);
+ nns_edge_set_event_callback (server_h, _test_edge_hybrid_event_cb, _td_server);
+ nns_edge_set_info (server_h, "DEST_HOST", "127.0.0.1");
+ nns_edge_set_info (server_h, "DEST_PORT", "1883");
+ nns_edge_set_info (server_h, "TOPIC", "temp-mqtt-topic");
+ nns_edge_set_info (server_h, "CAPS", "test server");
+ nns_edge_set_info (server_h, "QUEUE_SIZE", "10:NEW");
+ _td_server->handle = server_h;
+
+ /* Prepare client */
+ nns_edge_create_handle ("temp-client", NNS_EDGE_CONNECT_TYPE_HYBRID,
+ NNS_EDGE_NODE_TYPE_QUERY_CLIENT, &client_h);
+ nns_edge_set_event_callback (client_h, _test_edge_hybrid_event_cb, _td_client);
+ nns_edge_set_info (client_h, "CAPS", "test client");
+ nns_edge_set_info (client_h, "TOPIC", "temp-mqtt-topic");
+ _td_client->handle = client_h;
+
+ ret = nns_edge_start (server_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+ ret = nns_edge_start (client_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+ usleep (200000);
+
+ ret = nns_edge_connect (client_h, "127.0.0.1", 1883);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+ usleep (10000);
+
+ sleep (2);
+
+ /* Send request to server */
+ data_len = 10U * sizeof (unsigned int);
+ data = malloc (data_len);
+ ASSERT_TRUE (data != NULL);
+
+ for (i = 0; i < 10U; i++)
+ ((unsigned int *) data)[i] = i;
+
+ ret = nns_edge_data_create (&data_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+ ret = nns_edge_data_add (data_h, data, data_len, nns_edge_free);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+ nns_edge_get_info (client_h, "client_id", &val);
+ nns_edge_data_set_info (data_h, "client_id", val);
+ SAFE_FREE (val);
+
+ ret = nns_edge_data_set_info (data_h, "test-key", "test-value");
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+ for (i = 0; i < 5U; i++) {
+ ret = nns_edge_send (client_h, data_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+ usleep (10000);
+ }
+
+ ret = nns_edge_data_destroy (data_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+ /* Wait for responding data (20 seconds) */
+ retry = 0U;
+ do {
+ usleep (100000);
+ if (_td_client->received > 0)
+ break;
+ } while (retry++ < 200U);
+
+ ret = nns_edge_release_handle (server_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+ ret = nns_edge_release_handle (client_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+ EXPECT_TRUE (_td_server->received > 0);
+ EXPECT_TRUE (_td_client->received > 0);
+
+ _free_test_data (_td_server);
+ _free_test_data (_td_client);
+}
+
+/**
+ * @brief Connect to the mqtt broker with invalid param.
+ */
+TEST(edgeMqttHybrid, connectInvalidParam1_n)
+{
+ int ret = -1;
+ nns_edge_broker_h broker_h;
+
+ if (!_check_mqtt_broker ())
+ return;
+
+ ret = nns_edge_mqtt_connect (NULL, "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
+ EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+
+ ret = nns_edge_mqtt_connect ("", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
+ EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Connect to the mqtt broker with invalid param.
+ */
+TEST(edgeMqttHybrid, connectInvalidParam2_n)
+{
+ int ret = -1;
+ nns_edge_broker_h broker_h;
+
+ if (!_check_mqtt_broker ())
+ return;
+
+ ret = nns_edge_mqtt_connect ("temp-mqtt-id", NULL, "127.0.0.1", 1883, &broker_h);
+ EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+
+ ret = nns_edge_mqtt_connect ("temp-mqtt-id", "", "127.0.0.1", 1883, &broker_h);
+ EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Connect to the mqtt broker with invalid param.
+ */
+TEST(edgeMqttHybrid, connectInvalidParam3_n)
+{
+ int ret = -1;
+ nns_edge_broker_h broker_h;
+
+ if (!_check_mqtt_broker ())
+ return;
+
+ ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", NULL, 1883, &broker_h);
+ EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+
+ ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "", 1883, &broker_h);
+ EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Connect to the mqtt broker with invalid param.
+ */
+TEST(edgeMqttHybrid, connectInvalidParam4_n)
+{
+ int ret = -1;
+
+ if (!_check_mqtt_broker ())
+ return;
+
+ ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, NULL);
+ EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Connect to the mqtt broker with invalid host address.
+ */
+TEST(edgeMqttHybrid, connectInvalidParam5_n)
+{
+ int ret = -1;
+ nns_edge_broker_h broker_h;
+
+ if (!_check_mqtt_broker ())
+ return;
+
+ ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "tcp://none", 1883, &broker_h);
+ EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Connect to the mqtt broker with invalid port number.
+ */
+TEST(edgeMqttHybrid, connectInvalidParam6_n)
+{
+ int ret = -1;
+ nns_edge_broker_h broker_h;
+
+ if (!_check_mqtt_broker ())
+ return;
+
+ ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 0, &broker_h);
+ EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Close the mqtt handle with invalid param.
+ */
+TEST(edgeMqttHybrid, closeInvalidParam_n)
+{
+ int ret = -1;
+
+ if (!_check_mqtt_broker ())
+ return;
+
+ ret = nns_edge_mqtt_close (NULL);
+ EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Publish with invalid param.
+ */
+TEST(edgeMqttHybrid, publishInvalidParam_n)
+{
+ int ret = -1;
+ const char *msg = "TEMP_MESSAGE";
+
+ if (!_check_mqtt_broker ())
+ return;
+
+ ret = nns_edge_mqtt_publish (NULL, msg, strlen (msg) + 1);
+ EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Publish with invalid param.
+ */
+TEST(edgeMqttHybrid, publishInvalidParam2_n)
+{
+ int ret = -1;
+ nns_edge_broker_h broker_h;
+ const char *msg = "TEMP_MESSAGE";
+
+ if (!_check_mqtt_broker ())
+ return;
+
+ ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+ /* data is null */
+ ret = nns_edge_mqtt_publish (broker_h, NULL, strlen (msg) + 1);
+ EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+
+ ret = nns_edge_mqtt_close (broker_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Publish with invalid param.
+ */
+TEST(edgeMqttHybrid, publishInvalidParam3_n)
+{
+ int ret = -1;
+ nns_edge_broker_h broker_h;
+ const char *msg = "TEMP_MESSAGE";
+
+ if (!_check_mqtt_broker ())
+ return;
+
+ ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+ /* data length is 0 */
+ ret = nns_edge_mqtt_publish (broker_h, msg, 0);
+ EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+
+ ret = nns_edge_mqtt_close (broker_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Subscribe the topic with invalid param.
+ */
+TEST(edgeMqttHybrid, subscribeInvalidParam_n)
+{
+ int ret = -1;
+
+ if (!_check_mqtt_broker ())
+ return;
+
+ ret = nns_edge_mqtt_subscribe (NULL);
+ EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Get message with invalid param.
+ */
+TEST(edgeMqttHybrid, getMessageInvalidParam1_n)
+{
+ int ret = -1;
+ void *msg = NULL;
+ nns_size_t msg_len;
+
+ if (!_check_mqtt_broker ())
+ return;
+
+ ret = nns_edge_mqtt_get_message (NULL, &msg, &msg_len);
+ EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Get message with invalid param.
+ */
+TEST(edgeMqttHybrid, getMessageInvalidParam2_n)
+{
+ int ret = -1;
+ nns_edge_broker_h broker_h;
+ nns_size_t msg_len;
+
+ if (!_check_mqtt_broker ())
+ return;
+
+ ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+ ret = nns_edge_mqtt_get_message (broker_h, NULL, &msg_len);
+ EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+
+ ret = nns_edge_mqtt_close (broker_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Get message with invalid param.
+ */
+TEST(edgeMqttHybrid, getMessageInvalidParam3_n)
+{
+ int ret = -1;
+ nns_edge_broker_h broker_h;
+ void *msg = NULL;
+
+ if (!_check_mqtt_broker ())
+ return;
+
+ ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+ ret = nns_edge_mqtt_get_message (broker_h, &msg, NULL);
+ EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+
+ ret = nns_edge_mqtt_close (broker_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Get message from empty message queue.
+ */
+TEST(edgeMqttHybrid, getMessageWithinTimeout_n)
+{
+ int ret = -1;
+ nns_edge_broker_h broker_h;
+ void *msg = NULL;
+ nns_size_t msg_len;
+
+ if (!_check_mqtt_broker ())
+ return;
+
+ ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+ ret = nns_edge_mqtt_get_message (broker_h, &msg, &msg_len);
+ EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+
+ ret = nns_edge_mqtt_close (broker_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+}
+
+/**
+ * @brief Edge event callback for test MQTT data transmission.
+ */
+static int
+_test_edge_event_cb (nns_edge_event_h event_h, void *user_data)
+{
+ ne_test_data_s *_td = (ne_test_data_s *) user_data;
+ nns_edge_event_e event = NNS_EDGE_EVENT_UNKNOWN;
+ nns_edge_data_h data_h;
+ void *data;
+ nns_size_t data_len;
+ unsigned int i, count;
+ int ret;
+
+ if (!_td) {
+ /* Cannot update event status. */
+ return NNS_EDGE_ERROR_NONE;
+ }
+
+ ret = nns_edge_event_get_type (event_h, &event);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+ switch (event) {
+ case NNS_EDGE_EVENT_CALLBACK_RELEASED:
+ _td->event_cb_released = true;
+ break;
+ case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
+ _td->received++;
+ ret = nns_edge_event_parse_new_data (event_h, &data_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+ /* Compare received data */
+ ret = nns_edge_data_get_count (data_h, &count);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+ EXPECT_EQ (count, 2U);
+
+ ret = nns_edge_data_get (data_h, 0, &data, &data_len);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+ for (i = 0; i < 10U; i++)
+ EXPECT_EQ (((unsigned int *) data)[i], i);
+
+ ret = nns_edge_data_get (data_h, 1, &data, &data_len);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+ for (i = 0; i < 20U; i++)
+ EXPECT_EQ (((unsigned int *) data)[i], 20 - i);
+
+ ret = nns_edge_data_destroy (data_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+ break;
+ default:
+ break;
+ }
+
+ return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Connect to local host, multiple clients.
+ */
+TEST(edgeMqtt, connectLocal)
+{
+ nns_edge_h server_h, client1_h, client2_h;
+ ne_test_data_s *_td_server, *_td_client1, *_td_client2;
+ nns_edge_data_h data_h;
+ nns_size_t data_len;
+ void *data1, *data2;
+ unsigned int i, retry;
+ int ret, port;
+ char *val;
+
+ if (!_check_mqtt_broker ())
+ return;
+
+ _td_server = _get_test_data (true);
+ _td_client1 = _get_test_data (false);
+ _td_client2 = _get_test_data (false);
+ ASSERT_TRUE (_td_server != NULL && _td_client1 != NULL && _td_client2 != NULL);
+ port = nns_edge_get_available_port ();
+
+ /* Prepare server (127.0.0.1:port) */
+ val = nns_edge_strdup_printf ("%d", port);
+ nns_edge_create_handle ("temp-sender", NNS_EDGE_CONNECT_TYPE_MQTT,
+ NNS_EDGE_NODE_TYPE_PUB, &server_h);
+ nns_edge_set_info (server_h, "IP", "127.0.0.1");
+ nns_edge_set_info (server_h, "PORT", val);
+ nns_edge_set_info (server_h, "DEST_IP", "127.0.0.1");
+ nns_edge_set_info (server_h, "DEST_PORT", "1883");
+ nns_edge_set_info (server_h, "TOPIC", "MQTT_TEST_TOPIC");
+ _td_server->handle = server_h;
+ SAFE_FREE (val);
+
+ /* Prepare client */
+ nns_edge_create_handle ("temp-receiver", NNS_EDGE_CONNECT_TYPE_MQTT,
+ NNS_EDGE_NODE_TYPE_SUB, &client1_h);
+ nns_edge_set_event_callback (client1_h, _test_edge_event_cb, _td_client1);
+ nns_edge_set_info (client1_h, "TOPIC", "MQTT_TEST_TOPIC");
+ _td_client1->handle = client1_h;
+
+ nns_edge_create_handle ("temp-client2", NNS_EDGE_CONNECT_TYPE_MQTT,
+ NNS_EDGE_NODE_TYPE_SUB, &client2_h);
+ nns_edge_set_event_callback (client2_h, _test_edge_event_cb, _td_client2);
+ nns_edge_set_info (client2_h, "TOPIC", "MQTT_TEST_TOPIC");
+ _td_client2->handle = client2_h;
+
+
+ ret = nns_edge_start (server_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+ ret = nns_edge_start (client1_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+ ret = nns_edge_start (client2_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+ usleep (200000);
+
+ ret = nns_edge_connect (client1_h, "127.0.0.1", 1883);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+ usleep (10000);
+ ret = nns_edge_connect (client2_h, "127.0.0.1", 1883);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+ sleep (2);
+
+ /* Send request to server */
+ data_len = 10U * sizeof (unsigned int);
+ data1 = malloc (data_len);
+ ASSERT_TRUE (data1 != NULL);
+
+ data2 = malloc (data_len * 2);
+ ASSERT_TRUE (data2 != NULL);
+
+ for (i = 0; i < 10U; i++)
+ ((unsigned int *) data1)[i] = i;
+
+ for (i = 0; i < 20U; i++)
+ ((unsigned int *) data2)[i] = 20 - i;
+
+ ret = nns_edge_data_create (&data_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+ ret = nns_edge_data_add (data_h, data1, data_len, nns_edge_free);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+ ret = nns_edge_data_add (data_h, data2, data_len * 2, nns_edge_free);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+ for (i = 0; i < 5U; i++) {
+ ret = nns_edge_send (server_h, data_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+ usleep (10000);
+ }
+
+ ret = nns_edge_data_destroy (data_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+ /* Wait for responding data (20 seconds) */
+ retry = 0U;
+ do {
+ usleep (100000);
+ if (_td_client1->received > 0 && _td_client2->received > 0)
+ break;
+ } while (retry++ < 200U);
+
+ ret = nns_edge_disconnect (server_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+ ret = nns_edge_release_handle (server_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+ ret = nns_edge_release_handle (client1_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+ ret = nns_edge_release_handle (client2_h);
+ EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+
+ EXPECT_TRUE (_td_client1->received > 0);
+ EXPECT_TRUE (_td_client2->received > 0);
+
+ _free_test_data (_td_server);
+ _free_test_data (_td_client1);
+ _free_test_data (_td_client2);
+}
+
+/**
+ * @brief Check connection with invalid param.
+ */
+TEST(edgeMqtt, checkConnectionInvalidParam_n)
+{
+ int ret = -1;
+
+ if (!_check_mqtt_broker ())
+ return;
+
+ ret = nns_edge_mqtt_is_connected (NULL);
+ EXPECT_NE (ret, true);
+}
+
+/**
+ * @brief Main gtest
+ */
+int
+main (int argc, char **argv)
+{
+ int result = -1;
+
+ try {
+ testing::InitGoogleTest (&argc, argv);
+ } catch (...) {
+ nns_edge_loge ("Catch exception, failed to init google test.");
+ }
+
+ try {
+ result = RUN_ALL_TESTS ();
+ } catch (...) {
+ nns_edge_loge ("Catch exception, failed to run the unittest.");
+ }
+
+ return result;
+}
EXPECT_TRUE (nns_edge_queue_destroy (queue_h));
}
-#if defined(ENABLE_MQTT)
-/**
- * @brief Edge event callback for test.
- */
-static int
-_test_edge_hybrid_event_cb (nns_edge_event_h event_h, void *user_data)
-{
- ne_test_data_s *_td = (ne_test_data_s *) user_data;
- nns_edge_event_e event = NNS_EDGE_EVENT_UNKNOWN;
- nns_edge_data_h data_h;
- void *data;
- nns_size_t data_len;
- char *val;
- unsigned int i, count;
- int ret;
-
- if (!_td) {
- /* Cannot update event status. */
- return NNS_EDGE_ERROR_NONE;
- }
-
- ret = nns_edge_event_get_type (event_h, &event);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
- switch (event) {
- case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
- _td->received++;
-
- ret = nns_edge_event_parse_new_data (event_h, &data_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
- /* Compare metadata */
- ret = nns_edge_data_get_info (data_h, "test-key", &val);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
- EXPECT_STREQ (val, "test-value");
- SAFE_FREE (val);
-
- if (_td->is_server) {
- /**
- * @note This is test code, responding to client.
- * Recommend not to call edge API in event callback.
- */
- ret = nns_edge_send (_td->handle, data_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
- } else {
- /* Compare received data */
- ret = nns_edge_data_get_count (data_h, &count);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
- ret = nns_edge_data_get (data_h, 0, &data, &data_len);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
- EXPECT_EQ (count, 1U);
- for (i = 0; i < 10U; i++)
- EXPECT_EQ (((unsigned int *) data)[i], i);
- }
-
- ret = nns_edge_data_destroy (data_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
- break;
- default:
- break;
- }
-
- return NNS_EDGE_ERROR_NONE;
-}
-
-/**
- * @brief Check whether MQTT broker is running or not.
- */
-static bool
-_check_mqtt_broker ()
-{
- int ret = 0;
-
- ret = system ("ps aux | grep mosquitto | grep -v grep");
- if (0 != ret) {
- nns_edge_logw ("MQTT broker is not running. Skip query hybrid test.");
- return false;
- }
-
- return true;
-}
-
-/**
- * @brief Connect to the local host using the information received from mqtt.
- */
-TEST(edgeMqtt, connectLocal)
-{
- nns_edge_h server_h, client_h;
- ne_test_data_s *_td_server, *_td_client;
- nns_edge_data_h data_h;
- nns_size_t data_len;
- void *data;
- unsigned int i, retry;
- int ret = 0;
- char *val;
-
- if (!_check_mqtt_broker ())
- return;
-
- _td_server = _get_test_data (true);
- _td_client = _get_test_data (false);
- ASSERT_TRUE (_td_server != NULL && _td_client != NULL);
-
- /* Prepare server (127.0.0.1:port) */
- nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
- NNS_EDGE_NODE_TYPE_QUERY_SERVER, &server_h);
- nns_edge_set_event_callback (server_h, _test_edge_hybrid_event_cb, _td_server);
- nns_edge_set_info (server_h, "DEST_HOST", "127.0.0.1");
- nns_edge_set_info (server_h, "DEST_PORT", "1883");
- nns_edge_set_info (server_h, "TOPIC", "temp-mqtt-topic");
- nns_edge_set_info (server_h, "CAPS", "test server");
- nns_edge_set_info (server_h, "QUEUE_SIZE", "10:NEW");
- _td_server->handle = server_h;
-
- /* Prepare client */
- nns_edge_create_handle ("temp-client", NNS_EDGE_CONNECT_TYPE_HYBRID,
- NNS_EDGE_NODE_TYPE_QUERY_CLIENT, &client_h);
- nns_edge_set_event_callback (client_h, _test_edge_hybrid_event_cb, _td_client);
- nns_edge_set_info (client_h, "CAPS", "test client");
- nns_edge_set_info (client_h, "TOPIC", "temp-mqtt-topic");
- _td_client->handle = client_h;
-
- ret = nns_edge_start (server_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
- ret = nns_edge_start (client_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
- usleep (200000);
-
- ret = nns_edge_connect (client_h, "127.0.0.1", 1883);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
- usleep (10000);
-
- sleep (2);
-
- /* Send request to server */
- data_len = 10U * sizeof (unsigned int);
- data = malloc (data_len);
- ASSERT_TRUE (data != NULL);
-
- for (i = 0; i < 10U; i++)
- ((unsigned int *) data)[i] = i;
-
- ret = nns_edge_data_create (&data_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
- ret = nns_edge_data_add (data_h, data, data_len, nns_edge_free);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
- nns_edge_get_info (client_h, "client_id", &val);
- nns_edge_data_set_info (data_h, "client_id", val);
- SAFE_FREE (val);
-
- ret = nns_edge_data_set_info (data_h, "test-key", "test-value");
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
- for (i = 0; i < 5U; i++) {
- ret = nns_edge_send (client_h, data_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
- usleep (10000);
- }
-
- ret = nns_edge_data_destroy (data_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
- /* Wait for responding data (20 seconds) */
- retry = 0U;
- do {
- usleep (100000);
- if (_td_client->received > 0)
- break;
- } while (retry++ < 200U);
-
- ret = nns_edge_release_handle (server_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
- ret = nns_edge_release_handle (client_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
- EXPECT_TRUE (_td_server->received > 0);
- EXPECT_TRUE (_td_client->received > 0);
-
- _free_test_data (_td_server);
- _free_test_data (_td_client);
-}
-
-/**
- * @brief Connect to the mqtt broker with invalid param.
- */
-TEST(edgeMqtt, connectInvalidParam1_n)
-{
- int ret = -1;
- nns_edge_broker_h broker_h;
-
- if (!_check_mqtt_broker ())
- return;
-
- ret = nns_edge_mqtt_connect (NULL, "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
- EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-
- ret = nns_edge_mqtt_connect ("", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
- EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Connect to the mqtt broker with invalid param.
- */
-TEST(edgeMqtt, connectInvalidParam2_n)
-{
- int ret = -1;
- nns_edge_broker_h broker_h;
-
- if (!_check_mqtt_broker ())
- return;
-
- ret = nns_edge_mqtt_connect ("temp-mqtt-id", NULL, "127.0.0.1", 1883, &broker_h);
- EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-
- ret = nns_edge_mqtt_connect ("temp-mqtt-id", "", "127.0.0.1", 1883, &broker_h);
- EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Connect to the mqtt broker with invalid param.
- */
-TEST(edgeMqtt, connectInvalidParam3_n)
-{
- int ret = -1;
- nns_edge_broker_h broker_h;
-
- if (!_check_mqtt_broker ())
- return;
-
- ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", NULL, 1883, &broker_h);
- EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-
- ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "", 1883, &broker_h);
- EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Connect to the mqtt broker with invalid param.
- */
-TEST(edgeMqtt, connectInvalidParam4_n)
-{
- int ret = -1;
-
- if (!_check_mqtt_broker ())
- return;
-
- ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, NULL);
- EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Connect to the mqtt broker with invalid host address.
- */
-TEST(edgeMqtt, connectInvalidParam5_n)
-{
- int ret = -1;
- nns_edge_broker_h broker_h;
-
- if (!_check_mqtt_broker ())
- return;
-
- ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "tcp://none", 1883, &broker_h);
- EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Connect to the mqtt broker with invalid port number.
- */
-TEST(edgeMqtt, connectInvalidParam6_n)
-{
- int ret = -1;
- nns_edge_broker_h broker_h;
-
- if (!_check_mqtt_broker ())
- return;
-
- ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 0, &broker_h);
- EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Close the mqtt handle with invalid param.
- */
-TEST(edgeMqtt, closeInvalidParam_n)
-{
- int ret = -1;
-
- if (!_check_mqtt_broker ())
- return;
-
- ret = nns_edge_mqtt_close (NULL);
- EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Publish with invalid param.
- */
-TEST(edgeMqtt, publishInvalidParam_n)
-{
- int ret = -1;
- const char *msg = "TEMP_MESSAGE";
-
- if (!_check_mqtt_broker ())
- return;
-
- ret = nns_edge_mqtt_publish (NULL, msg, strlen (msg) + 1);
- EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Publish with invalid param.
- */
-TEST(edgeMqtt, publishInvalidParam2_n)
-{
- int ret = -1;
- nns_edge_broker_h broker_h;
- const char *msg = "TEMP_MESSAGE";
-
- if (!_check_mqtt_broker ())
- return;
-
- ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
- /* data is null */
- ret = nns_edge_mqtt_publish (broker_h, NULL, strlen (msg) + 1);
- EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-
- ret = nns_edge_mqtt_close (broker_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Publish with invalid param.
- */
-TEST(edgeMqtt, publishInvalidParam3_n)
-{
- int ret = -1;
- nns_edge_broker_h broker_h;
- const char *msg = "TEMP_MESSAGE";
-
- if (!_check_mqtt_broker ())
- return;
-
- ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
- /* data length is 0 */
- ret = nns_edge_mqtt_publish (broker_h, msg, 0);
- EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-
- ret = nns_edge_mqtt_close (broker_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Subscribe the topic with invalid param.
- */
-TEST(edgeMqtt, subscribeInvalidParam_n)
-{
- int ret = -1;
-
- if (!_check_mqtt_broker ())
- return;
-
- ret = nns_edge_mqtt_subscribe (NULL);
- EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Get message with invalid param.
- */
-TEST(edgeMqtt, getMessageInvalidParam1_n)
-{
- int ret = -1;
- void *msg = NULL;
- nns_size_t msg_len;
-
- if (!_check_mqtt_broker ())
- return;
-
- ret = nns_edge_mqtt_get_message (NULL, &msg, &msg_len);
- EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Get message with invalid param.
- */
-TEST(edgeMqtt, getMessageInvalidParam2_n)
-{
- int ret = -1;
- nns_edge_broker_h broker_h;
- nns_size_t msg_len;
-
- if (!_check_mqtt_broker ())
- return;
-
- ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
- ret = nns_edge_mqtt_get_message (broker_h, NULL, &msg_len);
- EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-
- ret = nns_edge_mqtt_close (broker_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Get message with invalid param.
- */
-TEST(edgeMqtt, getMessageInvalidParam3_n)
-{
- int ret = -1;
- nns_edge_broker_h broker_h;
- void *msg = NULL;
-
- if (!_check_mqtt_broker ())
- return;
-
- ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
- ret = nns_edge_mqtt_get_message (broker_h, &msg, NULL);
- EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-
- ret = nns_edge_mqtt_close (broker_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-}
-
-/**
- * @brief Get message from empty message queue.
- */
-TEST(edgeMqtt, getMessageWithinTimeout_n)
-{
- int ret = -1;
- nns_edge_broker_h broker_h;
- void *msg = NULL;
- nns_size_t msg_len;
-
- if (!_check_mqtt_broker ())
- return;
-
- ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
- ret = nns_edge_mqtt_get_message (broker_h, &msg, &msg_len);
- EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-
- ret = nns_edge_mqtt_close (broker_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-}
-#endif /* ENABLE_MQTT */
-
/**
* @brief Main gtest
*/