From c367da858310bb44e2ab13ad250d22f08257253d Mon Sep 17 00:00:00 2001 From: gichan Date: Mon, 17 Oct 2022 18:41:50 +0900 Subject: [PATCH] [MQTT] Implement mqtt using mosquitto lib Implement mqtt using moosquitto lib. Signed-off-by: gichan --- .../nnstreamer-edge-mqtt-mosquitto.c | 307 ++++++++++++++++-- tests/unittest_nnstreamer-edge.cc | 18 +- 2 files changed, 294 insertions(+), 31 deletions(-) diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c index 291a13e..fa52859 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c @@ -10,6 +10,7 @@ * @bug No known bugs except for NYI items */ +#include #include "nnstreamer-edge-internal.h" #include "nnstreamer-edge-log.h" #include "nnstreamer-edge-util.h" @@ -23,8 +24,102 @@ typedef struct void *mqtt_h; nns_edge_queue_h server_list; char *topic; + bool connected; } nns_edge_broker_s; +/** + * @brief Callback function to be called when a message is arrived. + */ +static void +on_message_callback (struct mosquitto *client, void *data, + const struct mosquitto_message *message) +{ + nns_edge_broker_s *bh = (nns_edge_broker_s *) data; + char *msg = NULL; + + if (!bh) { + nns_edge_loge ("Invalid param, given broker handle is invalid."); + return; + } + + if (0 >= message->payloadlen) { + nns_edge_logw ("Invalid payload lenth: %d", message->payloadlen); + return; + } + + nns_edge_logd ("MQTT message is arrived (ID:%d, Topic:%s).", + message->mid, message->topic); + + msg = nns_edge_memdup (message->payload, message->payloadlen); + if (msg) + nns_edge_queue_push (bh->server_list, msg, nns_edge_free); + + return; +} + +/** + * @brief Initializes MQTT object. + */ +static int +_nns_edge_mqtt_init_client (nns_edge_handle_s * eh, const char *topic) +{ + nns_edge_broker_s *bh; + int mret; + char *client_id; + struct mosquitto *handle; + int ver = MQTT_PROTOCOL_V311; + + mosquitto_lib_init (); + + bh = (nns_edge_broker_s *) calloc (1, sizeof (nns_edge_broker_s)); + if (!bh) { + nns_edge_loge ("Failed to allocate memory for broker handle."); + return NNS_EDGE_ERROR_OUT_OF_MEMORY; + } + + client_id = nns_edge_strdup_printf ("nns_edge_%s_%u", eh->id, getpid ()); + + handle = mosquitto_new (client_id, TRUE, NULL); + SAFE_FREE (client_id); + + if (!handle) { + nns_edge_loge ("Failed to create mosquitto client instance."); + SAFE_FREE (bh); + mosquitto_lib_cleanup (); + return NNS_EDGE_ERROR_UNKNOWN; + } + + mosquitto_user_data_set (handle, bh); + + mret = mosquitto_opts_set (handle, MOSQ_OPT_PROTOCOL_VERSION, &ver); + if (MOSQ_ERR_SUCCESS != mret) { + nns_edge_loge ("Failed to set MQTT protocol version 3.1.1."); + goto error; + } + + mosquitto_message_callback_set (handle, on_message_callback); + + mret = mosquitto_loop_start (handle); + if (mret != MOSQ_ERR_SUCCESS) { + nns_edge_loge ("Failed to start mosquitto loop."); + goto error; + } + + bh->topic = nns_edge_strdup (topic); + bh->mqtt_h = handle; + bh->connected = false; + nns_edge_queue_create (&bh->server_list); + eh->broker_h = bh; + + return NNS_EDGE_ERROR_NONE; + +error: + SAFE_FREE (bh); + mosquitto_destroy (handle); + mosquitto_lib_cleanup (); + return NNS_EDGE_ERROR_UNKNOWN; +} + /** * @brief Connect to MQTT. * @note This is internal function for MQTT broker. You should call this with edge-handle lock. @@ -32,9 +127,47 @@ typedef struct int nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic) { - UNUSED (edge_h); - UNUSED (topic); - return NNS_EDGE_ERROR_NOT_SUPPORTED; + nns_edge_handle_s *eh; + nns_edge_broker_s *bh; + int ret = NNS_EDGE_ERROR_NONE; + struct mosquitto *handle; + + if (!STR_IS_VALID (topic)) { + nns_edge_loge ("Invalid param, given topic is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + eh = (nns_edge_handle_s *) edge_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).", + eh->id, eh->dest_host, eh->dest_port); + + if (NNS_EDGE_ERROR_NONE != _nns_edge_mqtt_init_client (eh, topic)) { + nns_edge_loge ("Failed to initialize the mqtt client objects."); + return NNS_EDGE_ERROR_CONNECTION_FAILURE; + } + + bh = (nns_edge_broker_s *) eh->broker_h; + handle = bh->mqtt_h; + + if (MOSQ_ERR_SUCCESS != mosquitto_connect (handle, eh->dest_host, + eh->dest_port, 60)) { + nns_edge_loge ("Failed to connect MQTT."); + ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; + goto error; + } + + bh->connected = true; + return NNS_EDGE_ERROR_NONE; + +error: + nns_edge_mqtt_close (eh); + return ret; } /** @@ -44,8 +177,39 @@ nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic) int nns_edge_mqtt_close (nns_edge_h edge_h) { - UNUSED (edge_h); - return NNS_EDGE_ERROR_NOT_SUPPORTED; + nns_edge_handle_s *eh; + nns_edge_broker_s *bh; + struct mosquitto *handle; + + eh = (nns_edge_handle_s *) edge_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + bh = (nns_edge_broker_s *) eh->broker_h; + handle = bh->mqtt_h; + + if (handle) { + nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).", + eh->id, eh->dest_host, eh->dest_port); + + /* Clear retained message */ + mosquitto_publish (handle, NULL, bh->topic, 0, NULL, 1, true); + + mosquitto_disconnect (handle); + mosquitto_destroy (handle); + mosquitto_lib_cleanup (); + } + + nns_edge_queue_destroy (bh->server_list); + bh->server_list = NULL; + SAFE_FREE (bh->topic); + SAFE_FREE (bh); + eh->broker_h = NULL; + + return NNS_EDGE_ERROR_NONE; } /** @@ -55,10 +219,45 @@ nns_edge_mqtt_close (nns_edge_h edge_h) int nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length) { - UNUSED (edge_h); - UNUSED (data); - UNUSED (length); - return NNS_EDGE_ERROR_NOT_SUPPORTED; + nns_edge_handle_s *eh; + nns_edge_broker_s *bh; + struct mosquitto *handle; + int ret; + + eh = (nns_edge_handle_s *) edge_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!data || length <= 0) { + nns_edge_loge ("Invalid param, given data is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + bh = (nns_edge_broker_s *) eh->broker_h; + handle = bh->mqtt_h; + + if (!handle) { + nns_edge_loge ("Invalid state, MQTT connection was not completed."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!bh->connected) { + nns_edge_loge ("Failed to publish message, MQTT is not connected."); + return NNS_EDGE_ERROR_IO; + } + + /* Publish a message (default QoS 1 - at least once and retained true). */ + ret = mosquitto_publish (handle, NULL, bh->topic, length, data, 1, true); + if (MOSQ_ERR_SUCCESS != ret) { + nns_edge_loge ("Failed to publish a message (ID:%s, Topic:%s).", + eh->id, eh->topic); + return NNS_EDGE_ERROR_IO; + } + + return NNS_EDGE_ERROR_NONE; } /** @@ -68,27 +267,91 @@ nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length) int nns_edge_mqtt_subscribe (nns_edge_h edge_h) { - UNUSED (edge_h); - return NNS_EDGE_ERROR_NOT_SUPPORTED; + nns_edge_handle_s *eh; + nns_edge_broker_s *bh; + void *handle; + int ret; + + eh = (nns_edge_handle_s *) edge_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + bh = (nns_edge_broker_s *) eh->broker_h; + handle = bh->mqtt_h; + + if (!handle) { + nns_edge_loge ("Invalid state, MQTT connection was not completed."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!bh->connected) { + nns_edge_loge ("Failed to subscribe, MQTT is not connected."); + return NNS_EDGE_ERROR_IO; + } + + /* Subscribe a topic (default QoS 1 - at least once). */ + ret = mosquitto_subscribe (handle, NULL, bh->topic, 1); + if (MOSQ_ERR_SUCCESS != ret) { + nns_edge_loge ("Failed to subscribe a topic (ID:%s, Topic:%s).", + eh->id, eh->topic); + return NNS_EDGE_ERROR_IO; + } + + return NNS_EDGE_ERROR_NONE; } /** - * @brief Check mqtt connection + * @brief Get message from mqtt broker. */ -bool -nns_edge_mqtt_is_connected (nns_edge_h edge_h) +int +nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg) { - UNUSED (edge_h); - return false; + nns_edge_handle_s *eh; + nns_edge_broker_s *bh; + + eh = (nns_edge_handle_s *) edge_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!msg) { + nns_edge_loge ("Invalid param, given msg param is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + bh = (nns_edge_broker_s *) eh->broker_h; + + /* Wait for 1 second */ + if (!nns_edge_queue_wait_pop (bh->server_list, 1000U, (void **) msg)) { + nns_edge_loge ("Failed to get message from mqtt broker within timeout."); + return NNS_EDGE_ERROR_UNKNOWN; + } + + return NNS_EDGE_ERROR_NONE; } /** - * @brief Get message from mqtt broker. + * @brief Check mqtt connection */ -int -nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg) +bool +nns_edge_mqtt_is_connected (nns_edge_h edge_h) { - UNUSED (edge_h); - UNUSED (msg); - return NNS_EDGE_ERROR_NOT_SUPPORTED; + nns_edge_handle_s *eh; + nns_edge_broker_s *bh; + + eh = (nns_edge_handle_s *) edge_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return false; + } + + bh = (nns_edge_broker_s *) eh->broker_h; + + return bh->connected; } diff --git a/tests/unittest_nnstreamer-edge.cc b/tests/unittest_nnstreamer-edge.cc index 8748969..3ceabd0 100644 --- a/tests/unittest_nnstreamer-edge.cc +++ b/tests/unittest_nnstreamer-edge.cc @@ -3548,7 +3548,7 @@ TEST(edgeMqtt, connectLocal) nns_edge_set_event_callback (server_h, _test_edge_hybrid_event_cb, _td_server); nns_edge_set_info (server_h, "HOST", "localhost"); nns_edge_set_info (server_h, "PORT", "0"); - nns_edge_set_info (server_h, "DEST_HOST", "tcp://localhost"); + nns_edge_set_info (server_h, "DEST_HOST", "127.0.0.1"); nns_edge_set_info (server_h, "DEST_PORT", "1883"); nns_edge_set_info (server_h, "TOPIC", "temp-mqtt-topic"); nns_edge_set_info (server_h, "CAPS", "test server"); @@ -3571,7 +3571,7 @@ TEST(edgeMqtt, connectLocal) usleep (200000); - ret = nns_edge_connect (client_h, "tcp://localhost", 1883); + ret = nns_edge_connect (client_h, "127.0.0.1", 1883); EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); usleep (10000); @@ -3655,7 +3655,7 @@ TEST(edgeMqtt, connectInvalidParam2_n) ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h); EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - nns_edge_set_info (edge_h, "DEST_HOST", "tcp://localhost"); + nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1"); nns_edge_set_info (edge_h, "DEST_PORT", "1883"); ret = nns_edge_mqtt_connect (edge_h, NULL); @@ -3679,7 +3679,7 @@ TEST(edgeMqtt, connectInvalidParam3_n) ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h); EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - nns_edge_set_info (edge_h, "DEST_HOST", "tcp://localhost"); + nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1"); nns_edge_set_info (edge_h, "DEST_PORT", "1883"); ret = nns_edge_mqtt_connect (edge_h, ""); @@ -3779,7 +3779,7 @@ TEST(edgeMqtt, publishInvalidParam2_n) ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h); EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - nns_edge_set_info (edge_h, "DEST_HOST", "tcp://localhost"); + nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1"); nns_edge_set_info (edge_h, "DEST_PORT", "1883"); ret = nns_edge_start (edge_h); @@ -3808,7 +3808,7 @@ TEST(edgeMqtt, publishInvalidParam3_n) ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h); EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - nns_edge_set_info (edge_h, "DEST_HOST", "tcp://localhost"); + nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1"); nns_edge_set_info (edge_h, "DEST_PORT", "1883"); ret = nns_edge_start (edge_h); @@ -3837,7 +3837,7 @@ TEST(edgeMqtt, publishInvalidParam4_n) ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h); EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - nns_edge_set_info (edge_h, "DEST_HOST", "tcp://localhost"); + nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1"); nns_edge_set_info (edge_h, "DEST_PORT", "1883"); ret = nns_edge_mqtt_publish (edge_h, msg, strlen (msg) + 1); @@ -3912,7 +3912,7 @@ TEST(edgeMqtt, getMessageInvalidParam2_n) ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h); EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - nns_edge_set_info (edge_h, "DEST_HOST", "tcp://localhost"); + nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1"); nns_edge_set_info (edge_h, "DEST_PORT", "1883"); ret = nns_edge_start (edge_h); @@ -3940,7 +3940,7 @@ TEST(edgeMqtt, getMessageWithinTimeout_n) ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h); EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - nns_edge_set_info (edge_h, "DEST_HOST", "tcp://localhost"); + nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1"); nns_edge_set_info (edge_h, "DEST_PORT", "1883"); ret = nns_edge_mqtt_connect (edge_h, "temp-mqtt-topic"); -- 2.34.1