From d353debf23fffdb1b48f9d6eb467661cae162ddd Mon Sep 17 00:00:00 2001 From: gichan Date: Mon, 8 Aug 2022 17:26:55 +0900 Subject: [PATCH] [MQTT] Implement query mqtt hybrid feature Implement query mqtt hybrid feature. Signed-off-by: gichan --- .../nnstreamer-edge-common.h | 2 + .../nnstreamer-edge-internal.c | 121 ++++++++- .../nnstreamer-edge-internal.h | 15 +- src/libnnstreamer-edge/nnstreamer-edge-mqtt.c | 231 +++++++++++++++--- 4 files changed, 331 insertions(+), 38 deletions(-) diff --git a/src/libnnstreamer-edge/nnstreamer-edge-common.h b/src/libnnstreamer-edge/nnstreamer-edge-common.h index fe54543..4d9aa3e 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-common.h +++ b/src/libnnstreamer-edge/nnstreamer-edge-common.h @@ -45,6 +45,8 @@ extern "C" { #define nns_edge_lock(h) do { pthread_mutex_lock (&(h)->lock); } while (0) #define nns_edge_unlock(h) do { pthread_mutex_unlock (&(h)->lock); } while (0) +typedef void *nns_edge_broker_h; + /** * @brief Internal data structure for metadata. */ diff --git a/src/libnnstreamer-edge/nnstreamer-edge-internal.c b/src/libnnstreamer-edge/nnstreamer-edge-internal.c index 2a8d946..628233f 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-internal.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-internal.c @@ -1002,7 +1002,10 @@ nns_edge_create_handle (const char *id, nns_edge_connect_type_e connect_type, eh->connect_type = connect_type; eh->host = nns_edge_strdup ("localhost"); eh->port = 0; + eh->dest_host = nns_edge_strdup ("localhost"); + eh->dest_port = 0; eh->flags = flags; + eh->broker_h = NULL; nns_edge_metadata_init (&eh->meta); /* Connection data for each client ID. */ @@ -1047,6 +1050,39 @@ nns_edge_start (nns_edge_h edge_h) } } + if (eh->flags & NNS_EDGE_FLAG_SERVER) { + if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) { + gchar *device, *topic, *msg; + + if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_connect (eh)) { + nns_edge_loge + ("Failed to start nnstreamer-edge. Connection failure to broker."); + ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; + goto error; + } + + /** @todo Set unique device name. + * Device name should be unique. Consider using MAC address later. + * Now, use ID received from the user. + */ + device = g_strdup_printf ("device-%s", eh->id); + topic = g_strdup_printf ("edge/inference/%s/%s/", device, eh->topic); + + g_free (device); + g_free (eh->topic); + eh->topic = topic; + msg = nns_edge_strdup_printf ("%s:%d", eh->host, eh->port); + + if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_publish (eh, msg, + strlen (msg) + 1)) { + nns_edge_loge ("Failed to publish the meesage: %s", msg); + ret = NNS_EDGE_ERROR_IO; + goto error; + } + nns_edge_free (msg); + } + } + /** Initialize server src data. */ eh->listener = g_socket_listener_new (); g_socket_listener_set_backlog (eh->listener, N_BACKLOG); @@ -1097,6 +1133,12 @@ nns_edge_release_handle (nns_edge_h edge_h) return NNS_EDGE_ERROR_INVALID_PARAMETER; } + if (nns_edge_mqtt_is_connected (eh)) { + if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_close (eh)) { + nns_edge_logw ("Failed to close mqtt connection."); + } + } + eh->magic = NNS_EDGE_MAGIC_DEAD; eh->event_cb = NULL; eh->user_data = NULL; @@ -1168,6 +1210,8 @@ nns_edge_connect (nns_edge_h edge_h, const char *dest_host, int dest_port) { nns_edge_handle_s *eh; int ret; + char *server_ip = NULL; + int server_port; eh = (nns_edge_handle_s *) edge_h; if (!eh) { @@ -1199,16 +1243,59 @@ nns_edge_connect (nns_edge_h edge_h, const char *dest_host, int dest_port) return NNS_EDGE_ERROR_CONNECTION_FAILURE; } - /* Connect to info channel. */ - ret = _nns_edge_connect_to (eh, eh->client_id, dest_host, dest_port); - if (ret != NNS_EDGE_ERROR_NONE) { - nns_edge_loge ("Failed to connect host %s:%d.", dest_host, dest_port); - } else { - SAFE_FREE (eh->dest_host); - eh->dest_host = nns_edge_strdup (dest_host); - eh->dest_port = dest_port; + SAFE_FREE (eh->dest_host); + eh->dest_host = nns_edge_strdup (dest_host); + eh->dest_port = dest_port; + + if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) { + gchar *topic, *msg = NULL; + + if (!nns_edge_mqtt_is_connected (eh)) { + if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_connect (eh)) { + nns_edge_loge ("Connection failure to broker."); + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_CONNECTION_FAILURE; + } + topic = g_strdup_printf ("edge/inference/+/%s/#", eh->topic); + g_free (eh->topic); + eh->topic = topic; + + if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_subscribe (eh)) { + nns_edge_loge ("Failed to subscribe to topic: %s.", eh->topic); + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_CONNECTION_FAILURE; + } + } + + ret = nns_edge_mqtt_get_message (eh, &msg); + while (NNS_EDGE_ERROR_NONE == ret) { + gchar **splits; + splits = g_strsplit (msg, ":", -1); + server_ip = g_strdup (splits[0]); + server_port = g_ascii_strtoull (splits[1], NULL, 10); + nns_edge_logd ("[DEBUG] Parsed server info: Server [%s:%d] ", server_ip, + server_port); + + g_strfreev (splits); + g_free (msg); + + ret = _nns_edge_connect_to (eh, eh->client_id, server_ip, server_port); + if (NNS_EDGE_ERROR_NONE == ret) { + break; + } + SAFE_FREE (server_ip); + ret = nns_edge_mqtt_get_message (eh, &msg); + } + } else { /** case for NNS_EDGE_CONNECT_TYPE_TCP == eh->protocol */ + server_ip = nns_edge_strdup (dest_host); + server_port = dest_port; + ret = _nns_edge_connect_to (eh, eh->client_id, server_ip, server_port); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to connect to %s:%d", server_ip, server_port); + } } + SAFE_FREE (server_ip); nns_edge_unlock (eh); return ret; } @@ -1441,6 +1528,19 @@ nns_edge_set_info (nns_edge_h edge_h, const char *key, const char *value) } else { eh->port = port; } + } else if (0 == strcasecmp (key, "DEST_IP") + || 0 == strcasecmp (key, "DEST_HOST")) { + SAFE_FREE (eh->dest_host); + eh->dest_host = nns_edge_strdup (value); + } else if (0 == strcasecmp (key, "DEST_PORT")) { + int port = (int) strtoll (value, NULL, 10); + + if (port <= 0 || port > 65535) { + nns_edge_loge ("Invalid port number %d.", port); + ret = NNS_EDGE_ERROR_INVALID_PARAMETER; + } else { + eh->dest_port = port; + } } else if (0 == strcasecmp (key, "TOPIC")) { SAFE_FREE (eh->topic); eh->topic = nns_edge_strdup (value); @@ -1503,6 +1603,11 @@ nns_edge_get_info (nns_edge_h edge_h, const char *key, char **value) *value = nns_edge_strdup (eh->topic); } else if (0 == strcasecmp (key, "ID")) { *value = nns_edge_strdup (eh->id); + } else if (0 == strcasecmp (key, "DEST_IP") + || 0 == strcasecmp (key, "DEST_HOST")) { + *value = nns_edge_strdup (eh->dest_host); + } else if (0 == strcasecmp (key, "DEST_PORT")) { + *value = nns_edge_strdup_printf ("%d", eh->dest_port); } else if (0 == strcasecmp (key, "CLIENT_ID")) { if (eh->flags & NNS_EDGE_FLAG_SERVER) { nns_edge_loge ("Cannot get the client ID, it was started as a server."); diff --git a/src/libnnstreamer-edge/nnstreamer-edge-internal.h b/src/libnnstreamer-edge/nnstreamer-edge-internal.h index ffb670d..c3bec0e 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-internal.h +++ b/src/libnnstreamer-edge/nnstreamer-edge-internal.h @@ -49,7 +49,7 @@ typedef struct { GSocketListener *listener; /* MQTT */ - void *mqtt_handle; + nns_edge_broker_h broker_h; } nns_edge_handle_s; #if defined(ENABLE_MQTT) @@ -76,6 +76,17 @@ int nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length * @note This is internal function for MQTT broker. You should call this with edge-handle lock. */ int nns_edge_mqtt_subscribe (nns_edge_h edge_h); + +/** + * @brief Check mqtt connection + */ +bool nns_edge_mqtt_is_connected (nns_edge_h edge_h); + +/** + * @brief Get message from mqtt broker. + */ +int nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg); + #else /** * @todo consider to change code style later. @@ -90,6 +101,8 @@ int nns_edge_mqtt_subscribe (nns_edge_h edge_h); #define nns_edge_mqtt_close(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) #define nns_edge_mqtt_publish(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) #define nns_edge_mqtt_subscribe(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) +#define nns_edge_mqtt_is_connected(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) +#define nns_edge_mqtt_get_message(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) #endif #ifdef __cplusplus diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c b/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c index 9761a63..a543435 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c @@ -13,12 +13,25 @@ #if !defined(ENABLE_MQTT) #error "This file can be built with Paho MQTT library." #endif +#define DEFAULT_SUB_TIMEOUT 1000000 /** 1 second */ #include #include #include "nnstreamer-edge-common.h" #include "nnstreamer-edge-internal.h" +/** + * @brief Data structure for mqtt broker handle. + */ +typedef struct +{ + void *mqtt_h; + GAsyncQueue *server_list; + GMutex mqtt_mutex; + GCond mqtt_gcond; + gboolean mqtt_is_connected; +} nns_edge_broker_s; + /** * @brief Callback function to be called when the connection is lost. */ @@ -26,15 +39,22 @@ static void mqtt_cb_connection_lost (void *context, char *cause) { nns_edge_handle_s *eh; + nns_edge_broker_s *bh; eh = (nns_edge_handle_s *) context; - if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { nns_edge_loge ("Invalid param, given edge handle is invalid."); return; } + bh = (nns_edge_broker_s *) eh->broker_h; nns_edge_logw ("MQTT connection is lost (ID:%s, Cause:%s).", eh->id, cause); + g_mutex_lock (&bh->mqtt_mutex); + bh->mqtt_is_connected = FALSE; + g_cond_broadcast (&bh->mqtt_gcond); + g_mutex_unlock (&bh->mqtt_mutex); + if (eh->event_cb) { /** @todo send new event (MQTT disconnected) */ } @@ -47,16 +67,23 @@ static void mqtt_cb_connection_success (void *context, MQTTAsync_successData * response) { nns_edge_handle_s *eh; + nns_edge_broker_s *bh; UNUSED (response); eh = (nns_edge_handle_s *) context; - if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { nns_edge_loge ("Invalid param, given edge handle is invalid."); return; } - nns_edge_logi ("MQTT connection is completed (ID:%s).", eh->id); + bh = (nns_edge_broker_s *) eh->broker_h; + + g_mutex_lock (&bh->mqtt_mutex); + bh->mqtt_is_connected = TRUE; + g_cond_broadcast (&bh->mqtt_gcond); + g_mutex_unlock (&bh->mqtt_mutex); + if (eh->event_cb) { /** @todo send new event (MQTT connected) */ } @@ -69,16 +96,24 @@ static void mqtt_cb_connection_failure (void *context, MQTTAsync_failureData * response) { nns_edge_handle_s *eh; + nns_edge_broker_s *bh; UNUSED (response); eh = (nns_edge_handle_s *) context; - if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { nns_edge_loge ("Invalid param, given edge handle is invalid."); return; } + bh = (nns_edge_broker_s *) eh->broker_h; + nns_edge_logw ("MQTT connection is failed (ID:%s).", eh->id); + g_mutex_lock (&bh->mqtt_mutex); + bh->mqtt_is_connected = FALSE; + g_cond_broadcast (&bh->mqtt_gcond); + g_mutex_unlock (&bh->mqtt_mutex); + if (eh->event_cb) { /** @todo send new event (MQTT connection failure) */ } @@ -91,16 +126,24 @@ static void mqtt_cb_disconnection_success (void *context, MQTTAsync_successData * response) { nns_edge_handle_s *eh; + nns_edge_broker_s *bh; UNUSED (response); eh = (nns_edge_handle_s *) context; - if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { nns_edge_loge ("Invalid param, given edge handle is invalid."); return; } + bh = (nns_edge_broker_s *) eh->broker_h; + nns_edge_logi ("MQTT disconnection is completed (ID:%s).", eh->id); + g_mutex_lock (&bh->mqtt_mutex); + bh->mqtt_is_connected = FALSE; + g_cond_broadcast (&bh->mqtt_gcond); + g_mutex_unlock (&bh->mqtt_mutex); + if (eh->event_cb) { /** @todo send new event (MQTT disconnected) */ } @@ -137,19 +180,33 @@ mqtt_cb_message_arrived (void *context, char *topic, int topic_len, MQTTAsync_message * message) { nns_edge_handle_s *eh; + nns_edge_broker_s *bh; + char *msg = NULL; UNUSED (topic); UNUSED (topic_len); UNUSED (message); eh = (nns_edge_handle_s *) context; - if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { nns_edge_loge ("Invalid param, given edge handle is invalid."); return TRUE; } + if (0 >= message->payloadlen) { + nns_edge_logw ("Invalid payload lenth: %d", message->payloadlen); + return TRUE; + } + + bh = (nns_edge_broker_s *) eh->broker_h; + nns_edge_logd ("MQTT message is arrived (ID:%s, Topic:%s).", eh->id, eh->topic); + + msg = (char *) malloc (message->payloadlen); + memcpy (msg, message->payload, message->payloadlen); + g_async_queue_push (bh->server_list, msg); + if (eh->event_cb) { /** @todo send new event (message arrived) */ } @@ -165,11 +222,13 @@ int nns_edge_mqtt_connect (nns_edge_h edge_h) { nns_edge_handle_s *eh; - MQTTAsync handle; + nns_edge_broker_s *bh; MQTTAsync_connectOptions options = MQTTAsync_connectOptions_initializer; + int ret = NNS_EDGE_ERROR_NONE; + int64_t end_time; + MQTTAsync handle; char *url; char *client_id; - int ret; eh = (nns_edge_handle_s *) edge_h; @@ -178,10 +237,13 @@ nns_edge_mqtt_connect (nns_edge_h edge_h) return NNS_EDGE_ERROR_INVALID_PARAMETER; } - nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).", - eh->id, eh->host, eh->port); + bh = (nns_edge_broker_s *) malloc (sizeof (nns_edge_broker_s)); + if (!bh) { + nns_edge_loge ("Failed to allocate memory for broker handle."); + return NNS_EDGE_ERROR_OUT_OF_MEMORY; + } - url = nns_edge_get_host_string (eh->host, eh->port); + url = nns_edge_strdup_printf ("%s:%d", eh->dest_host, eh->dest_port); client_id = nns_edge_strdup_printf ("nns_edge_%s_%u", eh->id, getpid ()); ret = MQTTAsync_create (&handle, url, client_id, @@ -192,6 +254,24 @@ nns_edge_mqtt_connect (nns_edge_h edge_h) goto error; } + g_cond_init (&bh->mqtt_gcond); + g_mutex_init (&bh->mqtt_mutex); + bh->mqtt_is_connected = FALSE; + bh->mqtt_h = handle; + bh->server_list = g_async_queue_new (); + eh->broker_h = bh; + + bh = (nns_edge_broker_s *) eh->broker_h; + if (!bh->mqtt_h) { + nns_edge_loge ("Invalid state, MQTT connection was not completed."); + ret = NNS_EDGE_ERROR_IO; + goto error; + } + handle = bh->mqtt_h; + + nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).", + eh->id, eh->dest_host, eh->dest_port); + MQTTAsync_setCallbacks (handle, edge_h, mqtt_cb_connection_lost, mqtt_cb_message_arrived, NULL); @@ -203,17 +283,26 @@ nns_edge_mqtt_connect (nns_edge_h edge_h) if (MQTTAsync_connect (handle, &options) != MQTTASYNC_SUCCESS) { nns_edge_loge ("Failed to connect MQTT."); - MQTTAsync_destroy (&handle); ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; goto error; } - eh->mqtt_handle = handle; - ret = NNS_EDGE_ERROR_NONE; + /* Waiting for the connection */ + end_time = g_get_monotonic_time () + 5 * G_TIME_SPAN_SECOND; + g_mutex_lock (&bh->mqtt_mutex); + while (!bh->mqtt_is_connected) { + if (!g_cond_wait_until (&bh->mqtt_gcond, &bh->mqtt_mutex, end_time)) { + g_mutex_unlock (&bh->mqtt_mutex); + nns_edge_loge ("Failed to connect to MQTT broker." + "Please check broker is running status or broker host address."); + goto error; + } + } + g_mutex_unlock (&bh->mqtt_mutex); + return NNS_EDGE_ERROR_NONE; error: - nns_edge_free (url); - nns_edge_free (client_id); + nns_edge_mqtt_close (eh); return ret; } @@ -225,39 +314,54 @@ int nns_edge_mqtt_close (nns_edge_h edge_h) { nns_edge_handle_s *eh; + nns_edge_broker_s *bh; MQTTAsync handle; MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer; + char *msg; eh = (nns_edge_handle_s *) edge_h; - if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { nns_edge_loge ("Invalid param, given edge handle is invalid."); return NNS_EDGE_ERROR_INVALID_PARAMETER; } - handle = eh->mqtt_handle; + bh = (nns_edge_broker_s *) eh->broker_h; - if (!handle) { + if (!bh->mqtt_h) { nns_edge_loge ("Invalid state, MQTT connection was not completed."); - return NNS_EDGE_ERROR_IO; + return NNS_EDGE_ERROR_INVALID_PARAMETER; } + handle = bh->mqtt_h; nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).", - eh->id, eh->host, eh->port); + eh->id, eh->dest_host, eh->dest_port); options.onSuccess = mqtt_cb_disconnection_success; options.onFailure = mqtt_cb_disconnection_failure; options.context = edge_h; + /** Clear retained message */ + MQTTAsync_send (handle, eh->topic, 0, NULL, 1, 1, NULL); + while (MQTTAsync_isConnected (handle)) { if (MQTTAsync_disconnect (handle, &options) != MQTTASYNC_SUCCESS) { nns_edge_loge ("Failed to disconnect MQTT."); return NNS_EDGE_ERROR_IO; } + g_usleep (10000); } + g_cond_clear (&bh->mqtt_gcond); + g_mutex_clear (&bh->mqtt_mutex); MQTTAsync_destroy (&handle); - eh->mqtt_handle = NULL; + + while ((msg = g_async_queue_try_pop (bh->server_list))) { + SAFE_FREE (msg); + } + g_async_queue_unref (bh->server_list); + bh->server_list = NULL; + SAFE_FREE (bh); return NNS_EDGE_ERROR_NONE; } @@ -270,12 +374,13 @@ int nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length) { nns_edge_handle_s *eh; + nns_edge_broker_s *bh; MQTTAsync handle; int ret; eh = (nns_edge_handle_s *) edge_h; - if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { nns_edge_loge ("Invalid param, given edge handle is invalid."); return NNS_EDGE_ERROR_INVALID_PARAMETER; } @@ -285,10 +390,15 @@ nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length) return NNS_EDGE_ERROR_INVALID_PARAMETER; } - handle = eh->mqtt_handle; - - if (!handle || !MQTTAsync_isConnected (handle)) { + bh = (nns_edge_broker_s *) eh->broker_h; + if (!bh->mqtt_h) { nns_edge_loge ("Invalid state, MQTT connection was not completed."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + handle = bh->mqtt_h; + + if (!MQTTAsync_isConnected (handle)) { + nns_edge_loge ("Failed to publish message, MQTT is not connected."); return NNS_EDGE_ERROR_IO; } @@ -311,19 +421,25 @@ int nns_edge_mqtt_subscribe (nns_edge_h edge_h) { nns_edge_handle_s *eh; + nns_edge_broker_s *bh; MQTTAsync handle; int ret; eh = (nns_edge_handle_s *) edge_h; - if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { nns_edge_loge ("Invalid param, given edge handle is invalid."); return NNS_EDGE_ERROR_INVALID_PARAMETER; } - handle = eh->mqtt_handle; + bh = (nns_edge_broker_s *) eh->broker_h; + if (!bh->mqtt_h) { + nns_edge_loge ("Invalid state, MQTT connection was not completed."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + handle = bh->mqtt_h; - if (!handle || !MQTTAsync_isConnected (handle)) { + if (!MQTTAsync_isConnected (handle)) { nns_edge_loge ("Invalid state, MQTT connection was not completed."); return NNS_EDGE_ERROR_IO; } @@ -338,3 +454,60 @@ nns_edge_mqtt_subscribe (nns_edge_h edge_h) return NNS_EDGE_ERROR_NONE; } + +/** + * @brief Check mqtt connection + */ +bool +nns_edge_mqtt_is_connected (nns_edge_h edge_h) +{ + nns_edge_handle_s *eh; + nns_edge_broker_s *bh; + MQTTAsync handle; + eh = (nns_edge_handle_s *) edge_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return false; + } + + bh = (nns_edge_broker_s *) eh->broker_h; + if (!bh->mqtt_h) { + nns_edge_loge ("Invalid state, MQTT connection was not completed."); + return false; + } + handle = bh->mqtt_h; + + if (MQTTAsync_isConnected (handle)) { + return true; + } + + return false; +} + +/** + * @brief Get message from mqtt broker. + */ +int +nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg) +{ + nns_edge_handle_s *eh; + nns_edge_broker_s *bh; + + eh = (nns_edge_handle_s *) edge_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + bh = (nns_edge_broker_s *) eh->broker_h; + + *msg = g_async_queue_timeout_pop (bh->server_list, DEFAULT_SUB_TIMEOUT); + if (!*msg) { + nns_edge_loge ("Failed to get message from mqtt broker within timeout"); + return NNS_EDGE_ERROR_UNKNOWN; + } + + return NNS_EDGE_ERROR_NONE; +} -- 2.34.1