From ff38415af587d8250c3ee8927f2745015808a775 Mon Sep 17 00:00:00 2001 From: Jaeyun Date: Thu, 11 Aug 2022 16:19:13 +0900 Subject: [PATCH] [MQTT] topic in broker handle 1. Do not replace topic string in edge handle (set topic in mqtt-handle). 2. Use util function to get/parse host string. 3. Fix res leak case when mqtt connection is failed. 4. Code clean, remove unnecessary conversion after allocating new memory for mqtt-handle. Signed-off-by: Jaeyun --- .../nnstreamer-edge-internal.c | 88 +++++++------- .../nnstreamer-edge-internal.h | 4 +- src/libnnstreamer-edge/nnstreamer-edge-mqtt.c | 108 +++++++----------- 3 files changed, 84 insertions(+), 116 deletions(-) diff --git a/src/libnnstreamer-edge/nnstreamer-edge-internal.c b/src/libnnstreamer-edge/nnstreamer-edge-internal.c index 08322b2..2802621 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-internal.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-internal.c @@ -1052,34 +1052,33 @@ nns_edge_start (nns_edge_h edge_h) if (eh->flags & NNS_EDGE_FLAG_SERVER) { if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) { - gchar *device, *topic, *msg; + char *topic, *msg; - if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_connect (eh)) { + /** @todo Set unique device name. + * Device name should be unique. Consider using MAC address later. + * Now, use ID received from the user. + */ + topic = nns_edge_strdup_printf ("edge/inference/device-%s/%s/", + eh->id, eh->topic); + + ret = nns_edge_mqtt_connect (eh, topic); + SAFE_FREE (topic); + + if (NNS_EDGE_ERROR_NONE != ret) { nns_edge_loge ("Failed to start nnstreamer-edge. Connection failure to broker."); - ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; goto error; } - /** @todo Set unique device name. - * Device name should be unique. Consider using MAC address later. - * Now, use ID received from the user. - */ - device = g_strdup_printf ("device-%s", eh->id); - topic = g_strdup_printf ("edge/inference/%s/%s/", device, eh->topic); - - g_free (device); - g_free (eh->topic); - eh->topic = topic; - msg = nns_edge_strdup_printf ("%s:%d", eh->host, eh->port); - - if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_publish (eh, msg, - strlen (msg) + 1)) { - nns_edge_loge ("Failed to publish the meesage: %s", msg); - ret = NNS_EDGE_ERROR_IO; + msg = nns_edge_get_host_string (eh->host, eh->port); + + ret = nns_edge_mqtt_publish (eh, msg, strlen (msg) + 1); + SAFE_FREE (msg); + + if (NNS_EDGE_ERROR_NONE != ret) { + nns_edge_loge ("Failed to publish the meesage to broker."); goto error; } - nns_edge_free (msg); } } @@ -1210,8 +1209,6 @@ nns_edge_connect (nns_edge_h edge_h, const char *dest_host, int dest_port) { nns_edge_handle_s *eh; int ret; - char *server_ip = NULL; - int server_port; eh = (nns_edge_handle_s *) edge_h; if (!eh) { @@ -1248,54 +1245,51 @@ nns_edge_connect (nns_edge_h edge_h, const char *dest_host, int dest_port) eh->dest_port = dest_port; if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) { - gchar *topic, *msg = NULL; + char *topic, *msg = NULL; + char *server_ip = NULL; + int server_port; if (!nns_edge_mqtt_is_connected (eh)) { - if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_connect (eh)) { + topic = nns_edge_strdup_printf ("edge/inference/+/%s/#", eh->topic); + + ret = nns_edge_mqtt_connect (eh, topic); + SAFE_FREE (topic); + + if (NNS_EDGE_ERROR_NONE != ret) { nns_edge_loge ("Connection failure to broker."); nns_edge_unlock (eh); - return NNS_EDGE_ERROR_CONNECTION_FAILURE; + return ret; } - topic = g_strdup_printf ("edge/inference/+/%s/#", eh->topic); - g_free (eh->topic); - eh->topic = topic; - if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_subscribe (eh)) { + ret = nns_edge_mqtt_subscribe (eh); + if (NNS_EDGE_ERROR_NONE != ret) { nns_edge_loge ("Failed to subscribe to topic: %s.", eh->topic); nns_edge_unlock (eh); - return NNS_EDGE_ERROR_CONNECTION_FAILURE; + return ret; } } - ret = nns_edge_mqtt_get_message (eh, &msg); - while (NNS_EDGE_ERROR_NONE == ret) { - gchar **splits; - splits = g_strsplit (msg, ":", -1); - server_ip = g_strdup (splits[0]); - server_port = g_ascii_strtoull (splits[1], NULL, 10); + while ((ret = nns_edge_mqtt_get_message (eh, &msg)) == NNS_EDGE_ERROR_NONE) { + nns_edge_parse_host_string (msg, &server_ip, &server_port); + SAFE_FREE (msg); + nns_edge_logd ("[DEBUG] Parsed server info: Server [%s:%d] ", server_ip, server_port); - g_strfreev (splits); - g_free (msg); - ret = _nns_edge_connect_to (eh, eh->client_id, server_ip, server_port); + SAFE_FREE (server_ip); + if (NNS_EDGE_ERROR_NONE == ret) { break; } - SAFE_FREE (server_ip); - ret = nns_edge_mqtt_get_message (eh, &msg); } - } else { /** case for NNS_EDGE_CONNECT_TYPE_TCP == eh->protocol */ - server_ip = nns_edge_strdup (dest_host); - server_port = dest_port; - ret = _nns_edge_connect_to (eh, eh->client_id, server_ip, server_port); + } else { + ret = _nns_edge_connect_to (eh, eh->client_id, dest_host, dest_port); if (ret != NNS_EDGE_ERROR_NONE) { - nns_edge_loge ("Failed to connect to %s:%d", server_ip, server_port); + nns_edge_loge ("Failed to connect to %s:%d", dest_host, dest_port); } } - SAFE_FREE (server_ip); nns_edge_unlock (eh); return ret; } diff --git a/src/libnnstreamer-edge/nnstreamer-edge-internal.h b/src/libnnstreamer-edge/nnstreamer-edge-internal.h index c3bec0e..2346177 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-internal.h +++ b/src/libnnstreamer-edge/nnstreamer-edge-internal.h @@ -57,7 +57,7 @@ typedef struct { * @brief Connect to MQTT. * @note This is internal function for MQTT broker. You should call this with edge-handle lock. */ -int nns_edge_mqtt_connect (nns_edge_h edge_h); +int nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic); /** * @brief Close the connection to MQTT. @@ -101,7 +101,7 @@ int nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg); #define nns_edge_mqtt_close(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) #define nns_edge_mqtt_publish(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) #define nns_edge_mqtt_subscribe(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) -#define nns_edge_mqtt_is_connected(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) +#define nns_edge_mqtt_is_connected(...) (false) #define nns_edge_mqtt_get_message(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) #endif diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c b/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c index a543435..1403061 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c @@ -29,7 +29,8 @@ typedef struct GAsyncQueue *server_list; GMutex mqtt_mutex; GCond mqtt_gcond; - gboolean mqtt_is_connected; + bool mqtt_is_connected; + char *topic; } nns_edge_broker_s; /** @@ -51,13 +52,9 @@ mqtt_cb_connection_lost (void *context, char *cause) bh = (nns_edge_broker_s *) eh->broker_h; nns_edge_logw ("MQTT connection is lost (ID:%s, Cause:%s).", eh->id, cause); g_mutex_lock (&bh->mqtt_mutex); - bh->mqtt_is_connected = FALSE; + bh->mqtt_is_connected = false; g_cond_broadcast (&bh->mqtt_gcond); g_mutex_unlock (&bh->mqtt_mutex); - - if (eh->event_cb) { - /** @todo send new event (MQTT disconnected) */ - } } /** @@ -80,13 +77,9 @@ mqtt_cb_connection_success (void *context, MQTTAsync_successData * response) bh = (nns_edge_broker_s *) eh->broker_h; g_mutex_lock (&bh->mqtt_mutex); - bh->mqtt_is_connected = TRUE; + bh->mqtt_is_connected = true; g_cond_broadcast (&bh->mqtt_gcond); g_mutex_unlock (&bh->mqtt_mutex); - - if (eh->event_cb) { - /** @todo send new event (MQTT connected) */ - } } /** @@ -110,13 +103,9 @@ mqtt_cb_connection_failure (void *context, MQTTAsync_failureData * response) nns_edge_logw ("MQTT connection is failed (ID:%s).", eh->id); g_mutex_lock (&bh->mqtt_mutex); - bh->mqtt_is_connected = FALSE; + bh->mqtt_is_connected = false; g_cond_broadcast (&bh->mqtt_gcond); g_mutex_unlock (&bh->mqtt_mutex); - - if (eh->event_cb) { - /** @todo send new event (MQTT connection failure) */ - } } /** @@ -140,13 +129,9 @@ mqtt_cb_disconnection_success (void *context, MQTTAsync_successData * response) nns_edge_logi ("MQTT disconnection is completed (ID:%s).", eh->id); g_mutex_lock (&bh->mqtt_mutex); - bh->mqtt_is_connected = FALSE; + bh->mqtt_is_connected = false; g_cond_broadcast (&bh->mqtt_gcond); g_mutex_unlock (&bh->mqtt_mutex); - - if (eh->event_cb) { - /** @todo send new event (MQTT disconnected) */ - } } /** @@ -166,9 +151,6 @@ mqtt_cb_disconnection_failure (void *context, MQTTAsync_failureData * response) } nns_edge_logw ("MQTT disconnection is failed (ID:%s).", eh->id); - if (eh->event_cb) { - /** @todo send new event (MQTT disconnection failure) */ - } } /** @@ -203,14 +185,9 @@ mqtt_cb_message_arrived (void *context, char *topic, int topic_len, nns_edge_logd ("MQTT message is arrived (ID:%s, Topic:%s).", eh->id, eh->topic); - msg = (char *) malloc (message->payloadlen); - memcpy (msg, message->payload, message->payloadlen); + msg = nns_edge_memdup (message->payload, message->payloadlen); g_async_queue_push (bh->server_list, msg); - if (eh->event_cb) { - /** @todo send new event (message arrived) */ - } - return TRUE; } @@ -219,7 +196,7 @@ mqtt_cb_message_arrived (void *context, char *topic, int topic_len, * @note This is internal function for MQTT broker. You should call this with edge-handle lock. */ int -nns_edge_mqtt_connect (nns_edge_h edge_h) +nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic) { nns_edge_handle_s *eh; nns_edge_broker_s *bh; @@ -237,17 +214,23 @@ nns_edge_mqtt_connect (nns_edge_h edge_h) return NNS_EDGE_ERROR_INVALID_PARAMETER; } - bh = (nns_edge_broker_s *) malloc (sizeof (nns_edge_broker_s)); + nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).", + eh->id, eh->dest_host, eh->dest_port); + + bh = (nns_edge_broker_s *) calloc (1, sizeof (nns_edge_broker_s)); if (!bh) { nns_edge_loge ("Failed to allocate memory for broker handle."); return NNS_EDGE_ERROR_OUT_OF_MEMORY; } - url = nns_edge_strdup_printf ("%s:%d", eh->dest_host, eh->dest_port); + url = nns_edge_get_host_string (eh->dest_host, eh->dest_port); client_id = nns_edge_strdup_printf ("nns_edge_%s_%u", eh->id, getpid ()); ret = MQTTAsync_create (&handle, url, client_id, MQTTCLIENT_PERSISTENCE_NONE, NULL); + SAFE_FREE (url); + SAFE_FREE (client_id); + if (MQTTASYNC_SUCCESS != ret) { nns_edge_loge ("Failed to create MQTT handle."); ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; @@ -256,22 +239,12 @@ nns_edge_mqtt_connect (nns_edge_h edge_h) g_cond_init (&bh->mqtt_gcond); g_mutex_init (&bh->mqtt_mutex); - bh->mqtt_is_connected = FALSE; + bh->topic = nns_edge_strdup (topic); + bh->mqtt_is_connected = false; bh->mqtt_h = handle; bh->server_list = g_async_queue_new (); eh->broker_h = bh; - bh = (nns_edge_broker_s *) eh->broker_h; - if (!bh->mqtt_h) { - nns_edge_loge ("Invalid state, MQTT connection was not completed."); - ret = NNS_EDGE_ERROR_IO; - goto error; - } - handle = bh->mqtt_h; - - nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).", - eh->id, eh->dest_host, eh->dest_port); - MQTTAsync_setCallbacks (handle, edge_h, mqtt_cb_connection_lost, mqtt_cb_message_arrived, NULL); @@ -327,42 +300,43 @@ nns_edge_mqtt_close (nns_edge_h edge_h) } bh = (nns_edge_broker_s *) eh->broker_h; - - if (!bh->mqtt_h) { - nns_edge_loge ("Invalid state, MQTT connection was not completed."); - return NNS_EDGE_ERROR_INVALID_PARAMETER; - } handle = bh->mqtt_h; - nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).", - eh->id, eh->dest_host, eh->dest_port); + if (handle) { + nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).", + eh->id, eh->dest_host, eh->dest_port); - options.onSuccess = mqtt_cb_disconnection_success; - options.onFailure = mqtt_cb_disconnection_failure; - options.context = edge_h; + options.onSuccess = mqtt_cb_disconnection_success; + options.onFailure = mqtt_cb_disconnection_failure; + options.context = edge_h; - /** Clear retained message */ - MQTTAsync_send (handle, eh->topic, 0, NULL, 1, 1, NULL); + /* Clear retained message */ + MQTTAsync_send (handle, bh->topic, 0, NULL, 1, 1, NULL); - while (MQTTAsync_isConnected (handle)) { - if (MQTTAsync_disconnect (handle, &options) != MQTTASYNC_SUCCESS) { - nns_edge_loge ("Failed to disconnect MQTT."); - return NNS_EDGE_ERROR_IO; + while (MQTTAsync_isConnected (handle)) { + if (MQTTAsync_disconnect (handle, &options) != MQTTASYNC_SUCCESS) { + nns_edge_loge ("Failed to disconnect MQTT."); + return NNS_EDGE_ERROR_IO; + } + g_usleep (10000); } - g_usleep (10000); + + MQTTAsync_destroy (&handle); } + g_cond_clear (&bh->mqtt_gcond); g_mutex_clear (&bh->mqtt_mutex); - MQTTAsync_destroy (&handle); - while ((msg = g_async_queue_try_pop (bh->server_list))) { SAFE_FREE (msg); } g_async_queue_unref (bh->server_list); bh->server_list = NULL; + + SAFE_FREE (bh->topic); SAFE_FREE (bh); + eh->broker_h = NULL; return NNS_EDGE_ERROR_NONE; } @@ -403,7 +377,7 @@ nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length) } /* Publish a message (default QoS 1 - at least once and retained true). */ - ret = MQTTAsync_send (handle, eh->topic, length, data, 1, 1, NULL); + ret = MQTTAsync_send (handle, bh->topic, length, data, 1, 1, NULL); if (ret != MQTTASYNC_SUCCESS) { nns_edge_loge ("Failed to publish a message (ID:%s, Topic:%s).", eh->id, eh->topic); @@ -445,7 +419,7 @@ nns_edge_mqtt_subscribe (nns_edge_h edge_h) } /* Subscribe a topic (default QoS 1 - at least once). */ - ret = MQTTAsync_subscribe (handle, eh->topic, 1, NULL); + ret = MQTTAsync_subscribe (handle, bh->topic, 1, NULL); if (ret != MQTTASYNC_SUCCESS) { nns_edge_loge ("Failed to subscribe a topic (ID:%s, Topic:%s).", eh->id, eh->topic); @@ -505,7 +479,7 @@ nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg) *msg = g_async_queue_timeout_pop (bh->server_list, DEFAULT_SUB_TIMEOUT); if (!*msg) { - nns_edge_loge ("Failed to get message from mqtt broker within timeout"); + nns_edge_loge ("Failed to get message from mqtt broker within timeout."); return NNS_EDGE_ERROR_UNKNOWN; } -- 2.34.1