From: gichan Date: Fri, 14 Oct 2022 02:11:22 +0000 (+0900) Subject: [MQTT] Separate MQTT implementation X-Git-Tag: accepted/tizen/unified/20221115.172904~18 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=4703c596b24090b6bfb2acbdd48f742e1298940f;p=platform%2Fupstream%2Fnnstreamer-edge.git [MQTT] Separate MQTT implementation Separate implementation for MQTT into paho-mqtt-c and mosquitto. Let's use mosquitto lib in environments where the paho lib is not supported, such as tizenRT. Signed-off-by: gichan --- diff --git a/CMakeLists.txt b/CMakeLists.txt index 668422d..679fd27 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -50,7 +50,12 @@ IF(MQTT_SUPPORT) FIND_LIBRARY(PAHO_MQTT_LIB NAMES paho-mqtt3a paho-mqtt3c paho-mqtt3as paho-mqtt3cs) IF(NOT PAHO_MQTT_LIB) - MESSAGE(FATAL_ERROR "Cannot find Paho MQTT library.") + FIND_LIBRARY(MOSQUITTO_LIB NAMES mosquitto) + IF(NOT MOSQUITTO_LIB) + MESSAGE("FATAL_ERROR Cannot find paho-mqtt-c and mosquitto library.") + ELSE() + MESSAGE("FOUND MOSQUITTO LIB.") + ENDIF() ELSE() MESSAGE("Found Paho MQTT library.") ENDIF() diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 8920f47..e1354ce 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -10,7 +10,11 @@ SET(NNS_EDGE_SRCS ) IF(MQTT_SUPPORT) - SET(NNS_EDGE_SRCS ${NNS_EDGE_SRCS} ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-mqtt.c) + IF(PAHO_MQTT_LIB) + SET(NNS_EDGE_SRCS ${NNS_EDGE_SRCS} ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-mqtt-paho.c) + ELSE() + SET(NNS_EDGE_SRCS ${NNS_EDGE_SRCS} ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-mqtt-mosquitto.c) + ENDIF() ENDIF() IF(AITT_SUPPORT) @@ -23,7 +27,11 @@ TARGET_INCLUDE_DIRECTORIES(${NNS_EDGE_LIB_NAME} PRIVATE ${INCLUDE_DIR} ${EDGE_RE TARGET_LINK_LIBRARIES(${NNS_EDGE_LIB_NAME} ${EDGE_REQUIRE_PKGS_LDFLAGS}) IF(MQTT_SUPPORT) - TARGET_LINK_LIBRARIES(${NNS_EDGE_LIB_NAME} ${PAHO_MQTT_LIB}) + IF(PAHO_MQTT_LIB) + TARGET_LINK_LIBRARIES(${NNS_EDGE_LIB_NAME} ${PAHO_MQTT_LIB}) + ELSE() + TARGET_LINK_LIBRARIES(${NNS_EDGE_LIB_NAME} ${MOSQUITTO_LIB}) + ENDIF() ENDIF() IF(AITT_SUPPORT) diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c new file mode 100644 index 0000000..291a13e --- /dev/null +++ b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c @@ -0,0 +1,94 @@ +/* SPDX-License-Identifier: Apache-2.0 */ +/** + * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. + * + * @file nnstreamer-edge-mqtt-mosquitto.c + * @date 14 Oct 2022 + * @brief Internal functions to support MQTT protocol (mosquitto Library). + * @see https://github.com/nnstreamer/nnstreamer-edge + * @author Gichan Jang + * @bug No known bugs except for NYI items + */ + +#include "nnstreamer-edge-internal.h" +#include "nnstreamer-edge-log.h" +#include "nnstreamer-edge-util.h" +#include "nnstreamer-edge-queue.h" + +/** + * @brief Data structure for mqtt broker handle. + */ +typedef struct +{ + void *mqtt_h; + nns_edge_queue_h server_list; + char *topic; +} nns_edge_broker_s; + +/** + * @brief Connect to MQTT. + * @note This is internal function for MQTT broker. You should call this with edge-handle lock. + */ +int +nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic) +{ + UNUSED (edge_h); + UNUSED (topic); + return NNS_EDGE_ERROR_NOT_SUPPORTED; +} + +/** + * @brief Close the connection to MQTT. + * @note This is internal function for MQTT broker. You should call this with edge-handle lock. + */ +int +nns_edge_mqtt_close (nns_edge_h edge_h) +{ + UNUSED (edge_h); + return NNS_EDGE_ERROR_NOT_SUPPORTED; +} + +/** + * @brief Publish raw data. + * @note This is internal function for MQTT broker. You should call this with edge-handle lock. + */ +int +nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length) +{ + UNUSED (edge_h); + UNUSED (data); + UNUSED (length); + return NNS_EDGE_ERROR_NOT_SUPPORTED; +} + +/** + * @brief Subscribe a topic. + * @note This is internal function for MQTT broker. You should call this with edge-handle lock. + */ +int +nns_edge_mqtt_subscribe (nns_edge_h edge_h) +{ + UNUSED (edge_h); + return NNS_EDGE_ERROR_NOT_SUPPORTED; +} + +/** + * @brief Check mqtt connection + */ +bool +nns_edge_mqtt_is_connected (nns_edge_h edge_h) +{ + UNUSED (edge_h); + return false; +} + +/** + * @brief Get message from mqtt broker. + */ +int +nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg) +{ + UNUSED (edge_h); + UNUSED (msg); + return NNS_EDGE_ERROR_NOT_SUPPORTED; +} diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c new file mode 100644 index 0000000..49c23ff --- /dev/null +++ b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c @@ -0,0 +1,371 @@ +/* SPDX-License-Identifier: Apache-2.0 */ +/** + * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. + * + * @file nnstreamer-edge-mqtt-paho.c + * @date 11 May 2022 + * @brief Internal functions to support MQTT protocol (Paho Asynchronous MQTT C Client Library). + * @see https://github.com/nnstreamer/nnstreamer + * @author Sangjung Woo + * @bug No known bugs except for NYI items + */ + +#if !defined(ENABLE_MQTT) +#error "This file can be built with Paho MQTT library." +#endif + +#include +#include "nnstreamer-edge-internal.h" +#include "nnstreamer-edge-log.h" +#include "nnstreamer-edge-util.h" +#include "nnstreamer-edge-queue.h" + +/** + * @brief Data structure for mqtt broker handle. + */ +typedef struct +{ + void *mqtt_h; + nns_edge_queue_h server_list; + char *topic; +} nns_edge_broker_s; + +/** + * @brief Callback function to be called when a message is arrived. + * @return Return TRUE to prevent delivering the message again. + */ +static int +mqtt_cb_message_arrived (void *context, char *topic, int topic_len, + MQTTAsync_message * message) +{ + nns_edge_handle_s *eh; + nns_edge_broker_s *bh; + char *msg = NULL; + + UNUSED (topic); + UNUSED (topic_len); + eh = (nns_edge_handle_s *) context; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return TRUE; + } + + if (0 >= message->payloadlen) { + nns_edge_logw ("Invalid payload lenth: %d", message->payloadlen); + return TRUE; + } + + bh = (nns_edge_broker_s *) eh->broker_h; + + nns_edge_logd ("MQTT message is arrived (ID:%s, Topic:%s).", + eh->id, eh->topic); + + msg = nns_edge_memdup (message->payload, message->payloadlen); + if (msg) + nns_edge_queue_push (bh->server_list, msg, nns_edge_free); + + return TRUE; +} + +/** + * @brief Connect to MQTT. + * @note This is internal function for MQTT broker. You should call this with edge-handle lock. + */ +int +nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic) +{ + nns_edge_handle_s *eh; + nns_edge_broker_s *bh; + MQTTAsync_connectOptions options = MQTTAsync_connectOptions_initializer; + int ret = NNS_EDGE_ERROR_NONE; + MQTTAsync handle; + char *url; + char *client_id; + unsigned int wait_count; + + if (!STR_IS_VALID (topic)) { + nns_edge_loge ("Invalid param, given topic is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + eh = (nns_edge_handle_s *) edge_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).", + eh->id, eh->dest_host, eh->dest_port); + + bh = (nns_edge_broker_s *) calloc (1, sizeof (nns_edge_broker_s)); + if (!bh) { + nns_edge_loge ("Failed to allocate memory for broker handle."); + return NNS_EDGE_ERROR_OUT_OF_MEMORY; + } + + url = nns_edge_get_host_string (eh->dest_host, eh->dest_port); + client_id = nns_edge_strdup_printf ("nns_edge_%s_%u", eh->id, getpid ()); + + ret = MQTTAsync_create (&handle, url, client_id, + MQTTCLIENT_PERSISTENCE_NONE, NULL); + SAFE_FREE (url); + SAFE_FREE (client_id); + + if (MQTTASYNC_SUCCESS != ret) { + nns_edge_loge ("Failed to create MQTT handle."); + ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; + goto error; + } + + bh->topic = nns_edge_strdup (topic); + bh->mqtt_h = handle; + nns_edge_queue_create (&bh->server_list); + eh->broker_h = bh; + + MQTTAsync_setCallbacks (handle, edge_h, NULL, mqtt_cb_message_arrived, NULL); + + options.cleansession = 1; + options.keepAliveInterval = 6; + options.context = edge_h; + + if (MQTTAsync_connect (handle, &options) != MQTTASYNC_SUCCESS) { + nns_edge_loge ("Failed to connect MQTT."); + ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; + goto error; + } + + /* Waiting for the connection */ + wait_count = 0U; + do { + if (wait_count > 500U) { + ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; + goto error; + } + + usleep (10000); + wait_count++; + } while (!MQTTAsync_isConnected (handle)); + + return NNS_EDGE_ERROR_NONE; + +error: + nns_edge_mqtt_close (eh); + return ret; +} + +/** + * @brief Close the connection to MQTT. + * @note This is internal function for MQTT broker. You should call this with edge-handle lock. + */ +int +nns_edge_mqtt_close (nns_edge_h edge_h) +{ + nns_edge_handle_s *eh; + nns_edge_broker_s *bh; + MQTTAsync handle; + MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer; + unsigned int wait_count; + + eh = (nns_edge_handle_s *) edge_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + bh = (nns_edge_broker_s *) eh->broker_h; + handle = bh->mqtt_h; + + if (handle) { + nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).", + eh->id, eh->dest_host, eh->dest_port); + + options.context = edge_h; + + /* Clear retained message */ + MQTTAsync_send (handle, bh->topic, 0, NULL, 1, 1, NULL); + + wait_count = 0U; + do { + if (MQTTAsync_disconnect (handle, &options) != MQTTASYNC_SUCCESS) { + nns_edge_loge ("Failed to disconnect MQTT."); + break; + } + + if (wait_count > 500U) { + nns_edge_loge ("Failed to disconnect MQTT, timed out."); + break; + } + + usleep (10000); + wait_count++; + } while (MQTTAsync_isConnected (handle)); + + MQTTAsync_destroy (&handle); + } + + nns_edge_queue_destroy (bh->server_list); + bh->server_list = NULL; + + SAFE_FREE (bh->topic); + SAFE_FREE (bh); + + eh->broker_h = NULL; + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Publish raw data. + * @note This is internal function for MQTT broker. You should call this with edge-handle lock. + */ +int +nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length) +{ + nns_edge_handle_s *eh; + nns_edge_broker_s *bh; + MQTTAsync handle; + int ret; + + eh = (nns_edge_handle_s *) edge_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!data || length <= 0) { + nns_edge_loge ("Invalid param, given data is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + bh = (nns_edge_broker_s *) eh->broker_h; + handle = bh->mqtt_h; + + if (!handle) { + nns_edge_loge ("Invalid state, MQTT connection was not completed."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!MQTTAsync_isConnected (handle)) { + nns_edge_loge ("Failed to publish message, MQTT is not connected."); + return NNS_EDGE_ERROR_IO; + } + + /* Publish a message (default QoS 1 - at least once and retained true). */ + ret = MQTTAsync_send (handle, bh->topic, length, data, 1, 1, NULL); + if (ret != MQTTASYNC_SUCCESS) { + nns_edge_loge ("Failed to publish a message (ID:%s, Topic:%s).", + eh->id, eh->topic); + return NNS_EDGE_ERROR_IO; + } + + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Subscribe a topic. + * @note This is internal function for MQTT broker. You should call this with edge-handle lock. + */ +int +nns_edge_mqtt_subscribe (nns_edge_h edge_h) +{ + nns_edge_handle_s *eh; + nns_edge_broker_s *bh; + MQTTAsync handle; + int ret; + + eh = (nns_edge_handle_s *) edge_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + bh = (nns_edge_broker_s *) eh->broker_h; + handle = bh->mqtt_h; + + if (!handle) { + nns_edge_loge ("Invalid state, MQTT connection was not completed."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!MQTTAsync_isConnected (handle)) { + nns_edge_loge ("Failed to subscribe, MQTT is not connected."); + return NNS_EDGE_ERROR_IO; + } + + /* Subscribe a topic (default QoS 1 - at least once). */ + ret = MQTTAsync_subscribe (handle, bh->topic, 1, NULL); + if (ret != MQTTASYNC_SUCCESS) { + nns_edge_loge ("Failed to subscribe a topic (ID:%s, Topic:%s).", + eh->id, eh->topic); + return NNS_EDGE_ERROR_IO; + } + + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Check mqtt connection + */ +bool +nns_edge_mqtt_is_connected (nns_edge_h edge_h) +{ + nns_edge_handle_s *eh; + nns_edge_broker_s *bh; + MQTTAsync handle; + eh = (nns_edge_handle_s *) edge_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return false; + } + + bh = (nns_edge_broker_s *) eh->broker_h; + handle = bh->mqtt_h; + + if (!handle) { + nns_edge_loge ("Invalid state, MQTT connection was not completed."); + return false; + } + + if (MQTTAsync_isConnected (handle)) { + return true; + } + + return false; +} + +/** + * @brief Get message from mqtt broker. + */ +int +nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg) +{ + nns_edge_handle_s *eh; + nns_edge_broker_s *bh; + + eh = (nns_edge_handle_s *) edge_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!msg) { + nns_edge_loge ("Invalid param, given msg param is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + bh = (nns_edge_broker_s *) eh->broker_h; + + /* Wait for 1 second */ + if (!nns_edge_queue_wait_pop (bh->server_list, 1000U, (void **) msg)) { + nns_edge_loge ("Failed to get message from mqtt broker within timeout."); + return NNS_EDGE_ERROR_UNKNOWN; + } + + return NNS_EDGE_ERROR_NONE; +} diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c b/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c deleted file mode 100644 index db743c7..0000000 --- a/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c +++ /dev/null @@ -1,371 +0,0 @@ -/* SPDX-License-Identifier: Apache-2.0 */ -/** - * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. - * - * @file nnstreamer-edge-mqtt.c - * @date 11 May 2022 - * @brief Internal functions to support MQTT protocol (Paho Asynchronous MQTT C Client Library). - * @see https://github.com/nnstreamer/nnstreamer - * @author Sangjung Woo - * @bug No known bugs except for NYI items - */ - -#if !defined(ENABLE_MQTT) -#error "This file can be built with Paho MQTT library." -#endif - -#include -#include "nnstreamer-edge-internal.h" -#include "nnstreamer-edge-log.h" -#include "nnstreamer-edge-util.h" -#include "nnstreamer-edge-queue.h" - -/** - * @brief Data structure for mqtt broker handle. - */ -typedef struct -{ - void *mqtt_h; - nns_edge_queue_h server_list; - char *topic; -} nns_edge_broker_s; - -/** - * @brief Callback function to be called when a message is arrived. - * @return Return TRUE to prevent delivering the message again. - */ -static int -mqtt_cb_message_arrived (void *context, char *topic, int topic_len, - MQTTAsync_message * message) -{ - nns_edge_handle_s *eh; - nns_edge_broker_s *bh; - char *msg = NULL; - - UNUSED (topic); - UNUSED (topic_len); - eh = (nns_edge_handle_s *) context; - - if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); - return TRUE; - } - - if (0 >= message->payloadlen) { - nns_edge_logw ("Invalid payload lenth: %d", message->payloadlen); - return TRUE; - } - - bh = (nns_edge_broker_s *) eh->broker_h; - - nns_edge_logd ("MQTT message is arrived (ID:%s, Topic:%s).", - eh->id, eh->topic); - - msg = nns_edge_memdup (message->payload, message->payloadlen); - if (msg) - nns_edge_queue_push (bh->server_list, msg, nns_edge_free); - - return TRUE; -} - -/** - * @brief Connect to MQTT. - * @note This is internal function for MQTT broker. You should call this with edge-handle lock. - */ -int -nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic) -{ - nns_edge_handle_s *eh; - nns_edge_broker_s *bh; - MQTTAsync_connectOptions options = MQTTAsync_connectOptions_initializer; - int ret = NNS_EDGE_ERROR_NONE; - MQTTAsync handle; - char *url; - char *client_id; - unsigned int wait_count; - - if (!STR_IS_VALID (topic)) { - nns_edge_loge ("Invalid param, given topic is invalid."); - return NNS_EDGE_ERROR_INVALID_PARAMETER; - } - - eh = (nns_edge_handle_s *) edge_h; - - if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); - return NNS_EDGE_ERROR_INVALID_PARAMETER; - } - - nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).", - eh->id, eh->dest_host, eh->dest_port); - - bh = (nns_edge_broker_s *) calloc (1, sizeof (nns_edge_broker_s)); - if (!bh) { - nns_edge_loge ("Failed to allocate memory for broker handle."); - return NNS_EDGE_ERROR_OUT_OF_MEMORY; - } - - url = nns_edge_get_host_string (eh->dest_host, eh->dest_port); - client_id = nns_edge_strdup_printf ("nns_edge_%s_%u", eh->id, getpid ()); - - ret = MQTTAsync_create (&handle, url, client_id, - MQTTCLIENT_PERSISTENCE_NONE, NULL); - SAFE_FREE (url); - SAFE_FREE (client_id); - - if (MQTTASYNC_SUCCESS != ret) { - nns_edge_loge ("Failed to create MQTT handle."); - ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; - goto error; - } - - bh->topic = nns_edge_strdup (topic); - bh->mqtt_h = handle; - nns_edge_queue_create (&bh->server_list); - eh->broker_h = bh; - - MQTTAsync_setCallbacks (handle, edge_h, NULL, mqtt_cb_message_arrived, NULL); - - options.cleansession = 1; - options.keepAliveInterval = 6; - options.context = edge_h; - - if (MQTTAsync_connect (handle, &options) != MQTTASYNC_SUCCESS) { - nns_edge_loge ("Failed to connect MQTT."); - ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; - goto error; - } - - /* Waiting for the connection */ - wait_count = 0U; - do { - if (wait_count > 500U) { - ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; - goto error; - } - - usleep (10000); - wait_count++; - } while (!MQTTAsync_isConnected (handle)); - - return NNS_EDGE_ERROR_NONE; - -error: - nns_edge_mqtt_close (eh); - return ret; -} - -/** - * @brief Close the connection to MQTT. - * @note This is internal function for MQTT broker. You should call this with edge-handle lock. - */ -int -nns_edge_mqtt_close (nns_edge_h edge_h) -{ - nns_edge_handle_s *eh; - nns_edge_broker_s *bh; - MQTTAsync handle; - MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer; - unsigned int wait_count; - - eh = (nns_edge_handle_s *) edge_h; - - if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); - return NNS_EDGE_ERROR_INVALID_PARAMETER; - } - - bh = (nns_edge_broker_s *) eh->broker_h; - handle = bh->mqtt_h; - - if (handle) { - nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).", - eh->id, eh->dest_host, eh->dest_port); - - options.context = edge_h; - - /* Clear retained message */ - MQTTAsync_send (handle, bh->topic, 0, NULL, 1, 1, NULL); - - wait_count = 0U; - do { - if (MQTTAsync_disconnect (handle, &options) != MQTTASYNC_SUCCESS) { - nns_edge_loge ("Failed to disconnect MQTT."); - break; - } - - if (wait_count > 500U) { - nns_edge_loge ("Failed to disconnect MQTT, timed out."); - break; - } - - usleep (10000); - wait_count++; - } while (MQTTAsync_isConnected (handle)); - - MQTTAsync_destroy (&handle); - } - - nns_edge_queue_destroy (bh->server_list); - bh->server_list = NULL; - - SAFE_FREE (bh->topic); - SAFE_FREE (bh); - - eh->broker_h = NULL; - return NNS_EDGE_ERROR_NONE; -} - -/** - * @brief Publish raw data. - * @note This is internal function for MQTT broker. You should call this with edge-handle lock. - */ -int -nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length) -{ - nns_edge_handle_s *eh; - nns_edge_broker_s *bh; - MQTTAsync handle; - int ret; - - eh = (nns_edge_handle_s *) edge_h; - - if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); - return NNS_EDGE_ERROR_INVALID_PARAMETER; - } - - if (!data || length <= 0) { - nns_edge_loge ("Invalid param, given data is invalid."); - return NNS_EDGE_ERROR_INVALID_PARAMETER; - } - - bh = (nns_edge_broker_s *) eh->broker_h; - handle = bh->mqtt_h; - - if (!handle) { - nns_edge_loge ("Invalid state, MQTT connection was not completed."); - return NNS_EDGE_ERROR_INVALID_PARAMETER; - } - - if (!MQTTAsync_isConnected (handle)) { - nns_edge_loge ("Failed to publish message, MQTT is not connected."); - return NNS_EDGE_ERROR_IO; - } - - /* Publish a message (default QoS 1 - at least once and retained true). */ - ret = MQTTAsync_send (handle, bh->topic, length, data, 1, 1, NULL); - if (ret != MQTTASYNC_SUCCESS) { - nns_edge_loge ("Failed to publish a message (ID:%s, Topic:%s).", - eh->id, eh->topic); - return NNS_EDGE_ERROR_IO; - } - - return NNS_EDGE_ERROR_NONE; -} - -/** - * @brief Subscribe a topic. - * @note This is internal function for MQTT broker. You should call this with edge-handle lock. - */ -int -nns_edge_mqtt_subscribe (nns_edge_h edge_h) -{ - nns_edge_handle_s *eh; - nns_edge_broker_s *bh; - MQTTAsync handle; - int ret; - - eh = (nns_edge_handle_s *) edge_h; - - if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); - return NNS_EDGE_ERROR_INVALID_PARAMETER; - } - - bh = (nns_edge_broker_s *) eh->broker_h; - handle = bh->mqtt_h; - - if (!handle) { - nns_edge_loge ("Invalid state, MQTT connection was not completed."); - return NNS_EDGE_ERROR_INVALID_PARAMETER; - } - - if (!MQTTAsync_isConnected (handle)) { - nns_edge_loge ("Failed to subscribe, MQTT is not connected."); - return NNS_EDGE_ERROR_IO; - } - - /* Subscribe a topic (default QoS 1 - at least once). */ - ret = MQTTAsync_subscribe (handle, bh->topic, 1, NULL); - if (ret != MQTTASYNC_SUCCESS) { - nns_edge_loge ("Failed to subscribe a topic (ID:%s, Topic:%s).", - eh->id, eh->topic); - return NNS_EDGE_ERROR_IO; - } - - return NNS_EDGE_ERROR_NONE; -} - -/** - * @brief Check mqtt connection - */ -bool -nns_edge_mqtt_is_connected (nns_edge_h edge_h) -{ - nns_edge_handle_s *eh; - nns_edge_broker_s *bh; - MQTTAsync handle; - eh = (nns_edge_handle_s *) edge_h; - - if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); - return false; - } - - bh = (nns_edge_broker_s *) eh->broker_h; - handle = bh->mqtt_h; - - if (!handle) { - nns_edge_loge ("Invalid state, MQTT connection was not completed."); - return false; - } - - if (MQTTAsync_isConnected (handle)) { - return true; - } - - return false; -} - -/** - * @brief Get message from mqtt broker. - */ -int -nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg) -{ - nns_edge_handle_s *eh; - nns_edge_broker_s *bh; - - eh = (nns_edge_handle_s *) edge_h; - - if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); - return NNS_EDGE_ERROR_INVALID_PARAMETER; - } - - if (!msg) { - nns_edge_loge ("Invalid param, given msg param is invalid."); - return NNS_EDGE_ERROR_INVALID_PARAMETER; - } - - bh = (nns_edge_broker_s *) eh->broker_h; - - /* Wait for 1 second */ - if (!nns_edge_queue_wait_pop (bh->server_list, 1000U, (void **) msg)) { - nns_edge_loge ("Failed to get message from mqtt broker within timeout."); - return NNS_EDGE_ERROR_UNKNOWN; - } - - return NNS_EDGE_ERROR_NONE; -}