Separate implementation for MQTT into paho-mqtt-c and mosquitto.
Let's use mosquitto lib in environments where the paho lib is not supported, such as tizenRT.
Signed-off-by: gichan <gichan2.jang@samsung.com>
FIND_LIBRARY(PAHO_MQTT_LIB NAMES paho-mqtt3a paho-mqtt3c paho-mqtt3as paho-mqtt3cs)
IF(NOT PAHO_MQTT_LIB)
- MESSAGE(FATAL_ERROR "Cannot find Paho MQTT library.")
+ FIND_LIBRARY(MOSQUITTO_LIB NAMES mosquitto)
+ IF(NOT MOSQUITTO_LIB)
+ MESSAGE("FATAL_ERROR Cannot find paho-mqtt-c and mosquitto library.")
+ ELSE()
+ MESSAGE("FOUND MOSQUITTO LIB.")
+ ENDIF()
ELSE()
MESSAGE("Found Paho MQTT library.")
ENDIF()
)
IF(MQTT_SUPPORT)
- SET(NNS_EDGE_SRCS ${NNS_EDGE_SRCS} ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-mqtt.c)
+ IF(PAHO_MQTT_LIB)
+ SET(NNS_EDGE_SRCS ${NNS_EDGE_SRCS} ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-mqtt-paho.c)
+ ELSE()
+ SET(NNS_EDGE_SRCS ${NNS_EDGE_SRCS} ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-mqtt-mosquitto.c)
+ ENDIF()
ENDIF()
IF(AITT_SUPPORT)
TARGET_LINK_LIBRARIES(${NNS_EDGE_LIB_NAME} ${EDGE_REQUIRE_PKGS_LDFLAGS})
IF(MQTT_SUPPORT)
- TARGET_LINK_LIBRARIES(${NNS_EDGE_LIB_NAME} ${PAHO_MQTT_LIB})
+ IF(PAHO_MQTT_LIB)
+ TARGET_LINK_LIBRARIES(${NNS_EDGE_LIB_NAME} ${PAHO_MQTT_LIB})
+ ELSE()
+ TARGET_LINK_LIBRARIES(${NNS_EDGE_LIB_NAME} ${MOSQUITTO_LIB})
+ ENDIF()
ENDIF()
IF(AITT_SUPPORT)
--- /dev/null
+/* SPDX-License-Identifier: Apache-2.0 */
+/**
+ * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+ *
+ * @file nnstreamer-edge-mqtt-mosquitto.c
+ * @date 14 Oct 2022
+ * @brief Internal functions to support MQTT protocol (mosquitto Library).
+ * @see https://github.com/nnstreamer/nnstreamer-edge
+ * @author Gichan Jang <gichan2.jang@samsung.com>
+ * @bug No known bugs except for NYI items
+ */
+
+#include "nnstreamer-edge-internal.h"
+#include "nnstreamer-edge-log.h"
+#include "nnstreamer-edge-util.h"
+#include "nnstreamer-edge-queue.h"
+
+/**
+ * @brief Data structure for mqtt broker handle.
+ */
+typedef struct
+{
+ void *mqtt_h;
+ nns_edge_queue_h server_list;
+ char *topic;
+} nns_edge_broker_s;
+
+/**
+ * @brief Connect to MQTT.
+ * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
+ */
+int
+nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic)
+{
+ UNUSED (edge_h);
+ UNUSED (topic);
+ return NNS_EDGE_ERROR_NOT_SUPPORTED;
+}
+
+/**
+ * @brief Close the connection to MQTT.
+ * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
+ */
+int
+nns_edge_mqtt_close (nns_edge_h edge_h)
+{
+ UNUSED (edge_h);
+ return NNS_EDGE_ERROR_NOT_SUPPORTED;
+}
+
+/**
+ * @brief Publish raw data.
+ * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
+ */
+int
+nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
+{
+ UNUSED (edge_h);
+ UNUSED (data);
+ UNUSED (length);
+ return NNS_EDGE_ERROR_NOT_SUPPORTED;
+}
+
+/**
+ * @brief Subscribe a topic.
+ * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
+ */
+int
+nns_edge_mqtt_subscribe (nns_edge_h edge_h)
+{
+ UNUSED (edge_h);
+ return NNS_EDGE_ERROR_NOT_SUPPORTED;
+}
+
+/**
+ * @brief Check mqtt connection
+ */
+bool
+nns_edge_mqtt_is_connected (nns_edge_h edge_h)
+{
+ UNUSED (edge_h);
+ return false;
+}
+
+/**
+ * @brief Get message from mqtt broker.
+ */
+int
+nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg)
+{
+ UNUSED (edge_h);
+ UNUSED (msg);
+ return NNS_EDGE_ERROR_NOT_SUPPORTED;
+}
--- /dev/null
+/* SPDX-License-Identifier: Apache-2.0 */
+/**
+ * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
+ *
+ * @file nnstreamer-edge-mqtt-paho.c
+ * @date 11 May 2022
+ * @brief Internal functions to support MQTT protocol (Paho Asynchronous MQTT C Client Library).
+ * @see https://github.com/nnstreamer/nnstreamer
+ * @author Sangjung Woo <sangjung.woo@samsung.com>
+ * @bug No known bugs except for NYI items
+ */
+
+#if !defined(ENABLE_MQTT)
+#error "This file can be built with Paho MQTT library."
+#endif
+
+#include <MQTTAsync.h>
+#include "nnstreamer-edge-internal.h"
+#include "nnstreamer-edge-log.h"
+#include "nnstreamer-edge-util.h"
+#include "nnstreamer-edge-queue.h"
+
+/**
+ * @brief Data structure for mqtt broker handle.
+ */
+typedef struct
+{
+ void *mqtt_h;
+ nns_edge_queue_h server_list;
+ char *topic;
+} nns_edge_broker_s;
+
+/**
+ * @brief Callback function to be called when a message is arrived.
+ * @return Return TRUE to prevent delivering the message again.
+ */
+static int
+mqtt_cb_message_arrived (void *context, char *topic, int topic_len,
+ MQTTAsync_message * message)
+{
+ nns_edge_handle_s *eh;
+ nns_edge_broker_s *bh;
+ char *msg = NULL;
+
+ UNUSED (topic);
+ UNUSED (topic_len);
+ eh = (nns_edge_handle_s *) context;
+
+ if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+ nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ return TRUE;
+ }
+
+ if (0 >= message->payloadlen) {
+ nns_edge_logw ("Invalid payload lenth: %d", message->payloadlen);
+ return TRUE;
+ }
+
+ bh = (nns_edge_broker_s *) eh->broker_h;
+
+ nns_edge_logd ("MQTT message is arrived (ID:%s, Topic:%s).",
+ eh->id, eh->topic);
+
+ msg = nns_edge_memdup (message->payload, message->payloadlen);
+ if (msg)
+ nns_edge_queue_push (bh->server_list, msg, nns_edge_free);
+
+ return TRUE;
+}
+
+/**
+ * @brief Connect to MQTT.
+ * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
+ */
+int
+nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic)
+{
+ nns_edge_handle_s *eh;
+ nns_edge_broker_s *bh;
+ MQTTAsync_connectOptions options = MQTTAsync_connectOptions_initializer;
+ int ret = NNS_EDGE_ERROR_NONE;
+ MQTTAsync handle;
+ char *url;
+ char *client_id;
+ unsigned int wait_count;
+
+ if (!STR_IS_VALID (topic)) {
+ nns_edge_loge ("Invalid param, given topic is invalid.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ eh = (nns_edge_handle_s *) edge_h;
+
+ if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+ nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).",
+ eh->id, eh->dest_host, eh->dest_port);
+
+ bh = (nns_edge_broker_s *) calloc (1, sizeof (nns_edge_broker_s));
+ if (!bh) {
+ nns_edge_loge ("Failed to allocate memory for broker handle.");
+ return NNS_EDGE_ERROR_OUT_OF_MEMORY;
+ }
+
+ url = nns_edge_get_host_string (eh->dest_host, eh->dest_port);
+ client_id = nns_edge_strdup_printf ("nns_edge_%s_%u", eh->id, getpid ());
+
+ ret = MQTTAsync_create (&handle, url, client_id,
+ MQTTCLIENT_PERSISTENCE_NONE, NULL);
+ SAFE_FREE (url);
+ SAFE_FREE (client_id);
+
+ if (MQTTASYNC_SUCCESS != ret) {
+ nns_edge_loge ("Failed to create MQTT handle.");
+ ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
+ goto error;
+ }
+
+ bh->topic = nns_edge_strdup (topic);
+ bh->mqtt_h = handle;
+ nns_edge_queue_create (&bh->server_list);
+ eh->broker_h = bh;
+
+ MQTTAsync_setCallbacks (handle, edge_h, NULL, mqtt_cb_message_arrived, NULL);
+
+ options.cleansession = 1;
+ options.keepAliveInterval = 6;
+ options.context = edge_h;
+
+ if (MQTTAsync_connect (handle, &options) != MQTTASYNC_SUCCESS) {
+ nns_edge_loge ("Failed to connect MQTT.");
+ ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
+ goto error;
+ }
+
+ /* Waiting for the connection */
+ wait_count = 0U;
+ do {
+ if (wait_count > 500U) {
+ ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
+ goto error;
+ }
+
+ usleep (10000);
+ wait_count++;
+ } while (!MQTTAsync_isConnected (handle));
+
+ return NNS_EDGE_ERROR_NONE;
+
+error:
+ nns_edge_mqtt_close (eh);
+ return ret;
+}
+
+/**
+ * @brief Close the connection to MQTT.
+ * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
+ */
+int
+nns_edge_mqtt_close (nns_edge_h edge_h)
+{
+ nns_edge_handle_s *eh;
+ nns_edge_broker_s *bh;
+ MQTTAsync handle;
+ MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer;
+ unsigned int wait_count;
+
+ eh = (nns_edge_handle_s *) edge_h;
+
+ if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+ nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ bh = (nns_edge_broker_s *) eh->broker_h;
+ handle = bh->mqtt_h;
+
+ if (handle) {
+ nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).",
+ eh->id, eh->dest_host, eh->dest_port);
+
+ options.context = edge_h;
+
+ /* Clear retained message */
+ MQTTAsync_send (handle, bh->topic, 0, NULL, 1, 1, NULL);
+
+ wait_count = 0U;
+ do {
+ if (MQTTAsync_disconnect (handle, &options) != MQTTASYNC_SUCCESS) {
+ nns_edge_loge ("Failed to disconnect MQTT.");
+ break;
+ }
+
+ if (wait_count > 500U) {
+ nns_edge_loge ("Failed to disconnect MQTT, timed out.");
+ break;
+ }
+
+ usleep (10000);
+ wait_count++;
+ } while (MQTTAsync_isConnected (handle));
+
+ MQTTAsync_destroy (&handle);
+ }
+
+ nns_edge_queue_destroy (bh->server_list);
+ bh->server_list = NULL;
+
+ SAFE_FREE (bh->topic);
+ SAFE_FREE (bh);
+
+ eh->broker_h = NULL;
+ return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Publish raw data.
+ * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
+ */
+int
+nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
+{
+ nns_edge_handle_s *eh;
+ nns_edge_broker_s *bh;
+ MQTTAsync handle;
+ int ret;
+
+ eh = (nns_edge_handle_s *) edge_h;
+
+ if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+ nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ if (!data || length <= 0) {
+ nns_edge_loge ("Invalid param, given data is invalid.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ bh = (nns_edge_broker_s *) eh->broker_h;
+ handle = bh->mqtt_h;
+
+ if (!handle) {
+ nns_edge_loge ("Invalid state, MQTT connection was not completed.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ if (!MQTTAsync_isConnected (handle)) {
+ nns_edge_loge ("Failed to publish message, MQTT is not connected.");
+ return NNS_EDGE_ERROR_IO;
+ }
+
+ /* Publish a message (default QoS 1 - at least once and retained true). */
+ ret = MQTTAsync_send (handle, bh->topic, length, data, 1, 1, NULL);
+ if (ret != MQTTASYNC_SUCCESS) {
+ nns_edge_loge ("Failed to publish a message (ID:%s, Topic:%s).",
+ eh->id, eh->topic);
+ return NNS_EDGE_ERROR_IO;
+ }
+
+ return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Subscribe a topic.
+ * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
+ */
+int
+nns_edge_mqtt_subscribe (nns_edge_h edge_h)
+{
+ nns_edge_handle_s *eh;
+ nns_edge_broker_s *bh;
+ MQTTAsync handle;
+ int ret;
+
+ eh = (nns_edge_handle_s *) edge_h;
+
+ if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+ nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ bh = (nns_edge_broker_s *) eh->broker_h;
+ handle = bh->mqtt_h;
+
+ if (!handle) {
+ nns_edge_loge ("Invalid state, MQTT connection was not completed.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ if (!MQTTAsync_isConnected (handle)) {
+ nns_edge_loge ("Failed to subscribe, MQTT is not connected.");
+ return NNS_EDGE_ERROR_IO;
+ }
+
+ /* Subscribe a topic (default QoS 1 - at least once). */
+ ret = MQTTAsync_subscribe (handle, bh->topic, 1, NULL);
+ if (ret != MQTTASYNC_SUCCESS) {
+ nns_edge_loge ("Failed to subscribe a topic (ID:%s, Topic:%s).",
+ eh->id, eh->topic);
+ return NNS_EDGE_ERROR_IO;
+ }
+
+ return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Check mqtt connection
+ */
+bool
+nns_edge_mqtt_is_connected (nns_edge_h edge_h)
+{
+ nns_edge_handle_s *eh;
+ nns_edge_broker_s *bh;
+ MQTTAsync handle;
+ eh = (nns_edge_handle_s *) edge_h;
+
+ if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+ nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ return false;
+ }
+
+ bh = (nns_edge_broker_s *) eh->broker_h;
+ handle = bh->mqtt_h;
+
+ if (!handle) {
+ nns_edge_loge ("Invalid state, MQTT connection was not completed.");
+ return false;
+ }
+
+ if (MQTTAsync_isConnected (handle)) {
+ return true;
+ }
+
+ return false;
+}
+
+/**
+ * @brief Get message from mqtt broker.
+ */
+int
+nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg)
+{
+ nns_edge_handle_s *eh;
+ nns_edge_broker_s *bh;
+
+ eh = (nns_edge_handle_s *) edge_h;
+
+ if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+ nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ if (!msg) {
+ nns_edge_loge ("Invalid param, given msg param is invalid.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ bh = (nns_edge_broker_s *) eh->broker_h;
+
+ /* Wait for 1 second */
+ if (!nns_edge_queue_wait_pop (bh->server_list, 1000U, (void **) msg)) {
+ nns_edge_loge ("Failed to get message from mqtt broker within timeout.");
+ return NNS_EDGE_ERROR_UNKNOWN;
+ }
+
+ return NNS_EDGE_ERROR_NONE;
+}
+++ /dev/null
-/* SPDX-License-Identifier: Apache-2.0 */
-/**
- * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
- *
- * @file nnstreamer-edge-mqtt.c
- * @date 11 May 2022
- * @brief Internal functions to support MQTT protocol (Paho Asynchronous MQTT C Client Library).
- * @see https://github.com/nnstreamer/nnstreamer
- * @author Sangjung Woo <sangjung.woo@samsung.com>
- * @bug No known bugs except for NYI items
- */
-
-#if !defined(ENABLE_MQTT)
-#error "This file can be built with Paho MQTT library."
-#endif
-
-#include <MQTTAsync.h>
-#include "nnstreamer-edge-internal.h"
-#include "nnstreamer-edge-log.h"
-#include "nnstreamer-edge-util.h"
-#include "nnstreamer-edge-queue.h"
-
-/**
- * @brief Data structure for mqtt broker handle.
- */
-typedef struct
-{
- void *mqtt_h;
- nns_edge_queue_h server_list;
- char *topic;
-} nns_edge_broker_s;
-
-/**
- * @brief Callback function to be called when a message is arrived.
- * @return Return TRUE to prevent delivering the message again.
- */
-static int
-mqtt_cb_message_arrived (void *context, char *topic, int topic_len,
- MQTTAsync_message * message)
-{
- nns_edge_handle_s *eh;
- nns_edge_broker_s *bh;
- char *msg = NULL;
-
- UNUSED (topic);
- UNUSED (topic_len);
- eh = (nns_edge_handle_s *) context;
-
- if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
- return TRUE;
- }
-
- if (0 >= message->payloadlen) {
- nns_edge_logw ("Invalid payload lenth: %d", message->payloadlen);
- return TRUE;
- }
-
- bh = (nns_edge_broker_s *) eh->broker_h;
-
- nns_edge_logd ("MQTT message is arrived (ID:%s, Topic:%s).",
- eh->id, eh->topic);
-
- msg = nns_edge_memdup (message->payload, message->payloadlen);
- if (msg)
- nns_edge_queue_push (bh->server_list, msg, nns_edge_free);
-
- return TRUE;
-}
-
-/**
- * @brief Connect to MQTT.
- * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
- */
-int
-nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic)
-{
- nns_edge_handle_s *eh;
- nns_edge_broker_s *bh;
- MQTTAsync_connectOptions options = MQTTAsync_connectOptions_initializer;
- int ret = NNS_EDGE_ERROR_NONE;
- MQTTAsync handle;
- char *url;
- char *client_id;
- unsigned int wait_count;
-
- if (!STR_IS_VALID (topic)) {
- nns_edge_loge ("Invalid param, given topic is invalid.");
- return NNS_EDGE_ERROR_INVALID_PARAMETER;
- }
-
- eh = (nns_edge_handle_s *) edge_h;
-
- if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
- return NNS_EDGE_ERROR_INVALID_PARAMETER;
- }
-
- nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).",
- eh->id, eh->dest_host, eh->dest_port);
-
- bh = (nns_edge_broker_s *) calloc (1, sizeof (nns_edge_broker_s));
- if (!bh) {
- nns_edge_loge ("Failed to allocate memory for broker handle.");
- return NNS_EDGE_ERROR_OUT_OF_MEMORY;
- }
-
- url = nns_edge_get_host_string (eh->dest_host, eh->dest_port);
- client_id = nns_edge_strdup_printf ("nns_edge_%s_%u", eh->id, getpid ());
-
- ret = MQTTAsync_create (&handle, url, client_id,
- MQTTCLIENT_PERSISTENCE_NONE, NULL);
- SAFE_FREE (url);
- SAFE_FREE (client_id);
-
- if (MQTTASYNC_SUCCESS != ret) {
- nns_edge_loge ("Failed to create MQTT handle.");
- ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
- goto error;
- }
-
- bh->topic = nns_edge_strdup (topic);
- bh->mqtt_h = handle;
- nns_edge_queue_create (&bh->server_list);
- eh->broker_h = bh;
-
- MQTTAsync_setCallbacks (handle, edge_h, NULL, mqtt_cb_message_arrived, NULL);
-
- options.cleansession = 1;
- options.keepAliveInterval = 6;
- options.context = edge_h;
-
- if (MQTTAsync_connect (handle, &options) != MQTTASYNC_SUCCESS) {
- nns_edge_loge ("Failed to connect MQTT.");
- ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
- goto error;
- }
-
- /* Waiting for the connection */
- wait_count = 0U;
- do {
- if (wait_count > 500U) {
- ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
- goto error;
- }
-
- usleep (10000);
- wait_count++;
- } while (!MQTTAsync_isConnected (handle));
-
- return NNS_EDGE_ERROR_NONE;
-
-error:
- nns_edge_mqtt_close (eh);
- return ret;
-}
-
-/**
- * @brief Close the connection to MQTT.
- * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
- */
-int
-nns_edge_mqtt_close (nns_edge_h edge_h)
-{
- nns_edge_handle_s *eh;
- nns_edge_broker_s *bh;
- MQTTAsync handle;
- MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer;
- unsigned int wait_count;
-
- eh = (nns_edge_handle_s *) edge_h;
-
- if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
- return NNS_EDGE_ERROR_INVALID_PARAMETER;
- }
-
- bh = (nns_edge_broker_s *) eh->broker_h;
- handle = bh->mqtt_h;
-
- if (handle) {
- nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).",
- eh->id, eh->dest_host, eh->dest_port);
-
- options.context = edge_h;
-
- /* Clear retained message */
- MQTTAsync_send (handle, bh->topic, 0, NULL, 1, 1, NULL);
-
- wait_count = 0U;
- do {
- if (MQTTAsync_disconnect (handle, &options) != MQTTASYNC_SUCCESS) {
- nns_edge_loge ("Failed to disconnect MQTT.");
- break;
- }
-
- if (wait_count > 500U) {
- nns_edge_loge ("Failed to disconnect MQTT, timed out.");
- break;
- }
-
- usleep (10000);
- wait_count++;
- } while (MQTTAsync_isConnected (handle));
-
- MQTTAsync_destroy (&handle);
- }
-
- nns_edge_queue_destroy (bh->server_list);
- bh->server_list = NULL;
-
- SAFE_FREE (bh->topic);
- SAFE_FREE (bh);
-
- eh->broker_h = NULL;
- return NNS_EDGE_ERROR_NONE;
-}
-
-/**
- * @brief Publish raw data.
- * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
- */
-int
-nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
-{
- nns_edge_handle_s *eh;
- nns_edge_broker_s *bh;
- MQTTAsync handle;
- int ret;
-
- eh = (nns_edge_handle_s *) edge_h;
-
- if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
- return NNS_EDGE_ERROR_INVALID_PARAMETER;
- }
-
- if (!data || length <= 0) {
- nns_edge_loge ("Invalid param, given data is invalid.");
- return NNS_EDGE_ERROR_INVALID_PARAMETER;
- }
-
- bh = (nns_edge_broker_s *) eh->broker_h;
- handle = bh->mqtt_h;
-
- if (!handle) {
- nns_edge_loge ("Invalid state, MQTT connection was not completed.");
- return NNS_EDGE_ERROR_INVALID_PARAMETER;
- }
-
- if (!MQTTAsync_isConnected (handle)) {
- nns_edge_loge ("Failed to publish message, MQTT is not connected.");
- return NNS_EDGE_ERROR_IO;
- }
-
- /* Publish a message (default QoS 1 - at least once and retained true). */
- ret = MQTTAsync_send (handle, bh->topic, length, data, 1, 1, NULL);
- if (ret != MQTTASYNC_SUCCESS) {
- nns_edge_loge ("Failed to publish a message (ID:%s, Topic:%s).",
- eh->id, eh->topic);
- return NNS_EDGE_ERROR_IO;
- }
-
- return NNS_EDGE_ERROR_NONE;
-}
-
-/**
- * @brief Subscribe a topic.
- * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
- */
-int
-nns_edge_mqtt_subscribe (nns_edge_h edge_h)
-{
- nns_edge_handle_s *eh;
- nns_edge_broker_s *bh;
- MQTTAsync handle;
- int ret;
-
- eh = (nns_edge_handle_s *) edge_h;
-
- if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
- return NNS_EDGE_ERROR_INVALID_PARAMETER;
- }
-
- bh = (nns_edge_broker_s *) eh->broker_h;
- handle = bh->mqtt_h;
-
- if (!handle) {
- nns_edge_loge ("Invalid state, MQTT connection was not completed.");
- return NNS_EDGE_ERROR_INVALID_PARAMETER;
- }
-
- if (!MQTTAsync_isConnected (handle)) {
- nns_edge_loge ("Failed to subscribe, MQTT is not connected.");
- return NNS_EDGE_ERROR_IO;
- }
-
- /* Subscribe a topic (default QoS 1 - at least once). */
- ret = MQTTAsync_subscribe (handle, bh->topic, 1, NULL);
- if (ret != MQTTASYNC_SUCCESS) {
- nns_edge_loge ("Failed to subscribe a topic (ID:%s, Topic:%s).",
- eh->id, eh->topic);
- return NNS_EDGE_ERROR_IO;
- }
-
- return NNS_EDGE_ERROR_NONE;
-}
-
-/**
- * @brief Check mqtt connection
- */
-bool
-nns_edge_mqtt_is_connected (nns_edge_h edge_h)
-{
- nns_edge_handle_s *eh;
- nns_edge_broker_s *bh;
- MQTTAsync handle;
- eh = (nns_edge_handle_s *) edge_h;
-
- if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
- return false;
- }
-
- bh = (nns_edge_broker_s *) eh->broker_h;
- handle = bh->mqtt_h;
-
- if (!handle) {
- nns_edge_loge ("Invalid state, MQTT connection was not completed.");
- return false;
- }
-
- if (MQTTAsync_isConnected (handle)) {
- return true;
- }
-
- return false;
-}
-
-/**
- * @brief Get message from mqtt broker.
- */
-int
-nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg)
-{
- nns_edge_handle_s *eh;
- nns_edge_broker_s *bh;
-
- eh = (nns_edge_handle_s *) edge_h;
-
- if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
- return NNS_EDGE_ERROR_INVALID_PARAMETER;
- }
-
- if (!msg) {
- nns_edge_loge ("Invalid param, given msg param is invalid.");
- return NNS_EDGE_ERROR_INVALID_PARAMETER;
- }
-
- bh = (nns_edge_broker_s *) eh->broker_h;
-
- /* Wait for 1 second */
- if (!nns_edge_queue_wait_pop (bh->server_list, 1000U, (void **) msg)) {
- nns_edge_loge ("Failed to get message from mqtt broker within timeout.");
- return NNS_EDGE_ERROR_UNKNOWN;
- }
-
- return NNS_EDGE_ERROR_NONE;
-}