From: Jaeyun Date: Mon, 21 Nov 2022 10:20:01 +0000 (+0900) Subject: [CodeClean/MQTT] broker handle X-Git-Tag: accepted/tizen/unified/20221226.070011~8 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=eda1604942df827ed32908b188f61186d5398997;p=platform%2Fupstream%2Fnnstreamer-edge.git [CodeClean/MQTT] broker handle Code clean, add broker handle and revise MQTT functions. This will remove dependency to edge-handle in MQTT impl. Signed-off-by: Jaeyun --- diff --git a/src/libnnstreamer-edge/nnstreamer-edge-internal.c b/src/libnnstreamer-edge/nnstreamer-edge-internal.c index 831f217..cb65f59 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-internal.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-internal.c @@ -1269,7 +1269,8 @@ nns_edge_start (nns_edge_h edge_h) topic = nns_edge_strdup_printf ("edge/inference/device-%s/%s/", eh->id, eh->topic); - ret = nns_edge_mqtt_connect (eh, topic); + ret = nns_edge_mqtt_connect (eh->id, topic, eh->dest_host, eh->dest_port, + &eh->broker_h); SAFE_FREE (topic); if (NNS_EDGE_ERROR_NONE != ret) { @@ -1280,7 +1281,7 @@ nns_edge_start (nns_edge_h edge_h) msg = nns_edge_get_host_string (eh->host, eh->port); - ret = nns_edge_mqtt_publish (eh, msg, strlen (msg) + 1); + ret = nns_edge_mqtt_publish (eh->broker_h, msg, strlen (msg) + 1); SAFE_FREE (msg); if (NNS_EDGE_ERROR_NONE != ret) { @@ -1337,7 +1338,7 @@ nns_edge_release_handle (nns_edge_h edge_h) switch (eh->connect_type) { case NNS_EDGE_CONNECT_TYPE_HYBRID: - if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_close (eh)) { + if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_close (eh->broker_h)) { nns_edge_logw ("Failed to close mqtt connection."); } eh->broker_h = NULL; @@ -1474,14 +1475,13 @@ nns_edge_connect (nns_edge_h edge_h, const char *dest_host, int dest_port) eh->dest_port = dest_port; if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) { - char *topic, *msg = NULL; - char *server_ip = NULL; - int server_port; + char *topic; - if (!nns_edge_mqtt_is_connected (eh)) { + if (!nns_edge_mqtt_is_connected (eh->broker_h)) { topic = nns_edge_strdup_printf ("edge/inference/+/%s/#", eh->topic); - ret = nns_edge_mqtt_connect (eh, topic); + ret = nns_edge_mqtt_connect (eh->id, topic, dest_host, dest_port, + &eh->broker_h); SAFE_FREE (topic); if (NNS_EDGE_ERROR_NONE != ret) { @@ -1489,14 +1489,22 @@ nns_edge_connect (nns_edge_h edge_h, const char *dest_host, int dest_port) goto done; } - ret = nns_edge_mqtt_subscribe (eh); + ret = nns_edge_mqtt_subscribe (eh->broker_h); if (NNS_EDGE_ERROR_NONE != ret) { nns_edge_loge ("Failed to subscribe to topic: %s.", eh->topic); goto done; } } - while ((ret = nns_edge_mqtt_get_message (eh, &msg)) == NNS_EDGE_ERROR_NONE) { + do { + char *msg = NULL; + char *server_ip = NULL; + int server_port = 0; + + ret = nns_edge_mqtt_get_message (eh->broker_h, &msg); + if (ret != NNS_EDGE_ERROR_NONE || !msg) + break; + nns_edge_parse_host_string (msg, &server_ip, &server_port); SAFE_FREE (msg); @@ -1509,7 +1517,7 @@ nns_edge_connect (nns_edge_h edge_h, const char *dest_host, int dest_port) if (NNS_EDGE_ERROR_NONE == ret) { break; } - } + } while (TRUE); } else if (NNS_EDGE_CONNECT_TYPE_AITT == eh->connect_type) { ret = nns_edge_aitt_connect (eh); if (ret != NNS_EDGE_ERROR_NONE) { diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c index 921ee79..235b302 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c @@ -27,7 +27,10 @@ typedef struct { void *mqtt_h; nns_edge_queue_h server_list; + char *id; char *topic; + char *host; + int port; bool connected; } nns_edge_broker_s; @@ -65,7 +68,8 @@ on_message_callback (struct mosquitto *client, void *data, * @brief Initializes MQTT object. */ static int -_nns_edge_mqtt_init_client (nns_edge_handle_s * eh, const char *topic) +_nns_edge_mqtt_init_client (const char *id, const char *topic, const char *host, + const int port, nns_edge_broker_h * broker_h) { nns_edge_broker_s *bh; int mret; @@ -73,8 +77,7 @@ _nns_edge_mqtt_init_client (nns_edge_handle_s * eh, const char *topic) struct mosquitto *handle; int ver = MQTT_PROTOCOL_V311; /** @todo check mqtt version (TizenRT repo) */ - nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).", - eh->id, eh->dest_host, eh->dest_port); + nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).", id, host, port); bh = (nns_edge_broker_s *) calloc (1, sizeof (nns_edge_broker_s)); if (!bh) { @@ -83,7 +86,7 @@ _nns_edge_mqtt_init_client (nns_edge_handle_s * eh, const char *topic) } mosquitto_lib_init (); - client_id = nns_edge_strdup_printf ("nns_edge_%s_%u", eh->id, getpid ()); + client_id = nns_edge_strdup_printf ("nns_edge_%s_%u", id, getpid ()); handle = mosquitto_new (client_id, TRUE, NULL); SAFE_FREE (client_id); @@ -109,7 +112,7 @@ _nns_edge_mqtt_init_client (nns_edge_handle_s * eh, const char *topic) goto error; } - mret = mosquitto_connect (handle, eh->dest_host, eh->dest_port, 60); + mret = mosquitto_connect (handle, host, port, 60); if (mret != MOSQ_ERR_SUCCESS) { nns_edge_loge ("Failed to connect MQTT."); goto error; @@ -117,10 +120,13 @@ _nns_edge_mqtt_init_client (nns_edge_handle_s * eh, const char *topic) nns_edge_queue_create (&bh->server_list); bh->mqtt_h = handle; + bh->id = nns_edge_strdup (id); bh->topic = nns_edge_strdup (topic); + bh->host = nns_edge_strdup (host); + bh->port = port; bh->connected = true; - eh->broker_h = bh; + *broker_h = bh; return NNS_EDGE_ERROR_NONE; error: @@ -136,24 +142,37 @@ error: * @note This is internal function for MQTT broker. You should call this with edge-handle lock. */ int -nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic) +nns_edge_mqtt_connect (const char *id, const char *topic, const char *host, + const int port, nns_edge_broker_h * broker_h) { - nns_edge_handle_s *eh; int ret = NNS_EDGE_ERROR_NONE; + if (!STR_IS_VALID (id)) { + nns_edge_loge ("Invalid param, given id is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + if (!STR_IS_VALID (topic)) { nns_edge_loge ("Invalid param, given topic is invalid."); return NNS_EDGE_ERROR_INVALID_PARAMETER; } - eh = (nns_edge_handle_s *) edge_h; + if (!STR_IS_VALID (host)) { + nns_edge_loge ("Invalid param, given host is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } - if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); + if (!PORT_IS_VALID (port)) { + nns_edge_loge ("Invalid param, given port is invalid."); return NNS_EDGE_ERROR_INVALID_PARAMETER; } - ret = _nns_edge_mqtt_init_client (eh, topic); + if (!broker_h) { + nns_edge_loge ("Invalid param, mqtt_h should not be null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + ret = _nns_edge_mqtt_init_client (id, topic, host, port, broker_h); if (NNS_EDGE_ERROR_NONE != ret) nns_edge_loge ("Failed to initialize the MQTT client object."); @@ -165,25 +184,22 @@ nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic) * @note This is internal function for MQTT broker. You should call this with edge-handle lock. */ int -nns_edge_mqtt_close (nns_edge_h edge_h) +nns_edge_mqtt_close (nns_edge_broker_h broker_h) { - nns_edge_handle_s *eh; nns_edge_broker_s *bh; struct mosquitto *handle; - eh = (nns_edge_handle_s *) edge_h; - - if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); + if (!broker_h) { + nns_edge_loge ("Invalid param, given broker handle is invalid."); return NNS_EDGE_ERROR_INVALID_PARAMETER; } - bh = (nns_edge_broker_s *) eh->broker_h; + bh = (nns_edge_broker_s *) broker_h; handle = bh->mqtt_h; if (handle) { nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).", - eh->id, eh->dest_host, eh->dest_port); + bh->id, bh->host, bh->port); /* Clear retained message */ mosquitto_publish (handle, NULL, bh->topic, 0, NULL, 1, true); @@ -195,9 +211,10 @@ nns_edge_mqtt_close (nns_edge_h edge_h) nns_edge_queue_destroy (bh->server_list); bh->server_list = NULL; + SAFE_FREE (bh->id); SAFE_FREE (bh->topic); + SAFE_FREE (bh->host); SAFE_FREE (bh); - eh->broker_h = NULL; return NNS_EDGE_ERROR_NONE; } @@ -207,17 +224,15 @@ nns_edge_mqtt_close (nns_edge_h edge_h) * @note This is internal function for MQTT broker. You should call this with edge-handle lock. */ int -nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length) +nns_edge_mqtt_publish (nns_edge_broker_h broker_h, const void *data, + const int length) { - nns_edge_handle_s *eh; nns_edge_broker_s *bh; struct mosquitto *handle; int ret; - eh = (nns_edge_handle_s *) edge_h; - - if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); + if (!broker_h) { + nns_edge_loge ("Invalid param, given broker handle is invalid."); return NNS_EDGE_ERROR_INVALID_PARAMETER; } @@ -226,7 +241,7 @@ nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length) return NNS_EDGE_ERROR_INVALID_PARAMETER; } - bh = (nns_edge_broker_s *) eh->broker_h; + bh = (nns_edge_broker_s *) broker_h; handle = bh->mqtt_h; if (!handle) { @@ -243,7 +258,7 @@ nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length) ret = mosquitto_publish (handle, NULL, bh->topic, length, data, 1, true); if (MOSQ_ERR_SUCCESS != ret) { nns_edge_loge ("Failed to publish a message (ID:%s, Topic:%s).", - eh->id, eh->topic); + bh->id, bh->topic); return NNS_EDGE_ERROR_IO; } @@ -255,21 +270,18 @@ nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length) * @note This is internal function for MQTT broker. You should call this with edge-handle lock. */ int -nns_edge_mqtt_subscribe (nns_edge_h edge_h) +nns_edge_mqtt_subscribe (nns_edge_broker_h broker_h) { - nns_edge_handle_s *eh; nns_edge_broker_s *bh; void *handle; int ret; - eh = (nns_edge_handle_s *) edge_h; - - if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); + if (!broker_h) { + nns_edge_loge ("Invalid param, given broker handle is invalid."); return NNS_EDGE_ERROR_INVALID_PARAMETER; } - bh = (nns_edge_broker_s *) eh->broker_h; + bh = (nns_edge_broker_s *) broker_h; handle = bh->mqtt_h; if (!handle) { @@ -286,7 +298,7 @@ nns_edge_mqtt_subscribe (nns_edge_h edge_h) ret = mosquitto_subscribe (handle, NULL, bh->topic, 1); if (MOSQ_ERR_SUCCESS != ret) { nns_edge_loge ("Failed to subscribe a topic (ID:%s, Topic:%s).", - eh->id, eh->topic); + bh->id, bh->topic); return NNS_EDGE_ERROR_IO; } @@ -297,15 +309,12 @@ nns_edge_mqtt_subscribe (nns_edge_h edge_h) * @brief Get message from mqtt broker. */ int -nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg) +nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, char **msg) { - nns_edge_handle_s *eh; nns_edge_broker_s *bh; - eh = (nns_edge_handle_s *) edge_h; - - if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); + if (!broker_h) { + nns_edge_loge ("Invalid param, given broker handle is invalid."); return NNS_EDGE_ERROR_INVALID_PARAMETER; } @@ -314,7 +323,7 @@ nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg) return NNS_EDGE_ERROR_INVALID_PARAMETER; } - bh = (nns_edge_broker_s *) eh->broker_h; + bh = (nns_edge_broker_s *) broker_h; /* Wait for 1 second */ if (!nns_edge_queue_wait_pop (bh->server_list, 1000U, (void **) msg)) { @@ -329,19 +338,16 @@ nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg) * @brief Check mqtt connection */ bool -nns_edge_mqtt_is_connected (nns_edge_h edge_h) +nns_edge_mqtt_is_connected (nns_edge_broker_h broker_h) { - nns_edge_handle_s *eh; nns_edge_broker_s *bh; - eh = (nns_edge_handle_s *) edge_h; - - if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); + if (!broker_h) { + nns_edge_loge ("Invalid param, given broker handle is invalid."); return false; } - bh = (nns_edge_broker_s *) eh->broker_h; + bh = (nns_edge_broker_s *) broker_h; return bh->connected; } diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c index 49c23ff..b45dde1 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c @@ -27,7 +27,10 @@ typedef struct { void *mqtt_h; nns_edge_queue_h server_list; + char *id; char *topic; + char *host; + int port; } nns_edge_broker_s; /** @@ -38,28 +41,25 @@ static int mqtt_cb_message_arrived (void *context, char *topic, int topic_len, MQTTAsync_message * message) { - nns_edge_handle_s *eh; nns_edge_broker_s *bh; char *msg = NULL; UNUSED (topic); UNUSED (topic_len); - eh = (nns_edge_handle_s *) context; + bh = (nns_edge_broker_s *) context; - if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); + if (!bh) { + nns_edge_loge ("Invalid param, given broker handle is invalid."); return TRUE; } if (0 >= message->payloadlen) { - nns_edge_logw ("Invalid payload lenth: %d", message->payloadlen); + nns_edge_logw ("Invalid payload length: %d", message->payloadlen); return TRUE; } - bh = (nns_edge_broker_s *) eh->broker_h; - nns_edge_logd ("MQTT message is arrived (ID:%s, Topic:%s).", - eh->id, eh->topic); + bh->id, bh->topic); msg = nns_edge_memdup (message->payload, message->payloadlen); if (msg) @@ -73,9 +73,9 @@ mqtt_cb_message_arrived (void *context, char *topic, int topic_len, * @note This is internal function for MQTT broker. You should call this with edge-handle lock. */ int -nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic) +nns_edge_mqtt_connect (const char *id, const char *topic, const char *host, + const int port, nns_edge_broker_h * broker_h) { - nns_edge_handle_s *eh; nns_edge_broker_s *bh; MQTTAsync_connectOptions options = MQTTAsync_connectOptions_initializer; int ret = NNS_EDGE_ERROR_NONE; @@ -84,20 +84,32 @@ nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic) char *client_id; unsigned int wait_count; + if (!STR_IS_VALID (id)) { + nns_edge_loge ("Invalid param, given id is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + if (!STR_IS_VALID (topic)) { nns_edge_loge ("Invalid param, given topic is invalid."); return NNS_EDGE_ERROR_INVALID_PARAMETER; } - eh = (nns_edge_handle_s *) edge_h; + if (!STR_IS_VALID (host)) { + nns_edge_loge ("Invalid param, given host is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!PORT_IS_VALID (port)) { + nns_edge_loge ("Invalid param, given port is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } - if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); + if (!broker_h) { + nns_edge_loge ("Invalid param, mqtt_h should not be null."); return NNS_EDGE_ERROR_INVALID_PARAMETER; } - nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).", - eh->id, eh->dest_host, eh->dest_port); + nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).", id, host, port); bh = (nns_edge_broker_s *) calloc (1, sizeof (nns_edge_broker_s)); if (!bh) { @@ -105,8 +117,8 @@ nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic) return NNS_EDGE_ERROR_OUT_OF_MEMORY; } - url = nns_edge_get_host_string (eh->dest_host, eh->dest_port); - client_id = nns_edge_strdup_printf ("nns_edge_%s_%u", eh->id, getpid ()); + url = nns_edge_get_host_string (host, port); + client_id = nns_edge_strdup_printf ("nns_edge_%s_%u", id, getpid ()); ret = MQTTAsync_create (&handle, url, client_id, MQTTCLIENT_PERSISTENCE_NONE, NULL); @@ -119,16 +131,18 @@ nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic) goto error; } + bh->id = nns_edge_strdup (id); bh->topic = nns_edge_strdup (topic); + bh->host = nns_edge_strdup (host); + bh->port = port; bh->mqtt_h = handle; nns_edge_queue_create (&bh->server_list); - eh->broker_h = bh; - MQTTAsync_setCallbacks (handle, edge_h, NULL, mqtt_cb_message_arrived, NULL); + MQTTAsync_setCallbacks (handle, bh, NULL, mqtt_cb_message_arrived, NULL); options.cleansession = 1; options.keepAliveInterval = 6; - options.context = edge_h; + options.context = bh; if (MQTTAsync_connect (handle, &options) != MQTTASYNC_SUCCESS) { nns_edge_loge ("Failed to connect MQTT."); @@ -148,10 +162,11 @@ nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic) wait_count++; } while (!MQTTAsync_isConnected (handle)); + *broker_h = bh; return NNS_EDGE_ERROR_NONE; error: - nns_edge_mqtt_close (eh); + nns_edge_mqtt_close (bh); return ret; } @@ -160,29 +175,26 @@ error: * @note This is internal function for MQTT broker. You should call this with edge-handle lock. */ int -nns_edge_mqtt_close (nns_edge_h edge_h) +nns_edge_mqtt_close (nns_edge_broker_h broker_h) { - nns_edge_handle_s *eh; nns_edge_broker_s *bh; MQTTAsync handle; MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer; unsigned int wait_count; - eh = (nns_edge_handle_s *) edge_h; - - if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); + if (!broker_h) { + nns_edge_loge ("Invalid param, given broker handle is invalid."); return NNS_EDGE_ERROR_INVALID_PARAMETER; } - bh = (nns_edge_broker_s *) eh->broker_h; + bh = (nns_edge_broker_s *) broker_h; handle = bh->mqtt_h; if (handle) { nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).", - eh->id, eh->dest_host, eh->dest_port); + bh->id, bh->host, bh->port); - options.context = edge_h; + options.context = bh; /* Clear retained message */ MQTTAsync_send (handle, bh->topic, 0, NULL, 1, 1, NULL); @@ -209,10 +221,11 @@ nns_edge_mqtt_close (nns_edge_h edge_h) nns_edge_queue_destroy (bh->server_list); bh->server_list = NULL; + SAFE_FREE (bh->id); SAFE_FREE (bh->topic); + SAFE_FREE (bh->host); SAFE_FREE (bh); - eh->broker_h = NULL; return NNS_EDGE_ERROR_NONE; } @@ -221,17 +234,15 @@ nns_edge_mqtt_close (nns_edge_h edge_h) * @note This is internal function for MQTT broker. You should call this with edge-handle lock. */ int -nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length) +nns_edge_mqtt_publish (nns_edge_broker_h broker_h, const void *data, + const int length) { - nns_edge_handle_s *eh; nns_edge_broker_s *bh; MQTTAsync handle; int ret; - eh = (nns_edge_handle_s *) edge_h; - - if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); + if (!broker_h) { + nns_edge_loge ("Invalid param, given broker handle is invalid."); return NNS_EDGE_ERROR_INVALID_PARAMETER; } @@ -240,7 +251,7 @@ nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length) return NNS_EDGE_ERROR_INVALID_PARAMETER; } - bh = (nns_edge_broker_s *) eh->broker_h; + bh = (nns_edge_broker_s *) broker_h; handle = bh->mqtt_h; if (!handle) { @@ -257,7 +268,7 @@ nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length) ret = MQTTAsync_send (handle, bh->topic, length, data, 1, 1, NULL); if (ret != MQTTASYNC_SUCCESS) { nns_edge_loge ("Failed to publish a message (ID:%s, Topic:%s).", - eh->id, eh->topic); + bh->id, bh->topic); return NNS_EDGE_ERROR_IO; } @@ -269,21 +280,18 @@ nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length) * @note This is internal function for MQTT broker. You should call this with edge-handle lock. */ int -nns_edge_mqtt_subscribe (nns_edge_h edge_h) +nns_edge_mqtt_subscribe (nns_edge_broker_h broker_h) { - nns_edge_handle_s *eh; nns_edge_broker_s *bh; MQTTAsync handle; int ret; - eh = (nns_edge_handle_s *) edge_h; - - if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); + if (!broker_h) { + nns_edge_loge ("Invalid param, given broker handle is invalid."); return NNS_EDGE_ERROR_INVALID_PARAMETER; } - bh = (nns_edge_broker_s *) eh->broker_h; + bh = (nns_edge_broker_s *) broker_h; handle = bh->mqtt_h; if (!handle) { @@ -300,7 +308,7 @@ nns_edge_mqtt_subscribe (nns_edge_h edge_h) ret = MQTTAsync_subscribe (handle, bh->topic, 1, NULL); if (ret != MQTTASYNC_SUCCESS) { nns_edge_loge ("Failed to subscribe a topic (ID:%s, Topic:%s).", - eh->id, eh->topic); + bh->id, bh->topic); return NNS_EDGE_ERROR_IO; } @@ -311,19 +319,17 @@ nns_edge_mqtt_subscribe (nns_edge_h edge_h) * @brief Check mqtt connection */ bool -nns_edge_mqtt_is_connected (nns_edge_h edge_h) +nns_edge_mqtt_is_connected (nns_edge_broker_h broker_h) { - nns_edge_handle_s *eh; nns_edge_broker_s *bh; MQTTAsync handle; - eh = (nns_edge_handle_s *) edge_h; - if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); + if (!broker_h) { + nns_edge_loge ("Invalid param, given broker handle is invalid."); return false; } - bh = (nns_edge_broker_s *) eh->broker_h; + bh = (nns_edge_broker_s *) broker_h; handle = bh->mqtt_h; if (!handle) { @@ -342,15 +348,12 @@ nns_edge_mqtt_is_connected (nns_edge_h edge_h) * @brief Get message from mqtt broker. */ int -nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg) +nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, char **msg) { - nns_edge_handle_s *eh; nns_edge_broker_s *bh; - eh = (nns_edge_handle_s *) edge_h; - - if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { - nns_edge_loge ("Invalid param, given edge handle is invalid."); + if (!broker_h) { + nns_edge_loge ("Invalid param, given broker handle is invalid."); return NNS_EDGE_ERROR_INVALID_PARAMETER; } @@ -359,7 +362,7 @@ nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg) return NNS_EDGE_ERROR_INVALID_PARAMETER; } - bh = (nns_edge_broker_s *) eh->broker_h; + bh = (nns_edge_broker_s *) broker_h; /* Wait for 1 second */ if (!nns_edge_queue_wait_pop (bh->server_list, 1000U, (void **) msg)) { diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt.h b/src/libnnstreamer-edge/nnstreamer-edge-mqtt.h index 1253d1e..76cd8aa 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-mqtt.h +++ b/src/libnnstreamer-edge/nnstreamer-edge-mqtt.h @@ -20,49 +20,49 @@ extern "C" { #endif /* __cplusplus */ -typedef void *nns_edge_mqtt_h; +typedef void *nns_edge_broker_h; #if defined(ENABLE_MQTT) /** * @brief Connect to MQTT. * @note This is internal function for MQTT broker. You should call this with edge-handle lock. */ -int nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic); +int nns_edge_mqtt_connect (const char *id, const char *topic, const char *host, const int port, nns_edge_broker_h *broker_h); /** * @brief Close the connection to MQTT. * @note This is internal function for MQTT broker. You should call this with edge-handle lock. */ -int nns_edge_mqtt_close (nns_edge_h edge_h); +int nns_edge_mqtt_close (nns_edge_broker_h broker_h); /** * @brief Publish raw data. * @note This is internal function for MQTT broker. You should call this with edge-handle lock. */ -int nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length); +int nns_edge_mqtt_publish (nns_edge_broker_h broker_h, const void *data, const int length); /** * @brief Subscribe a topic. * @note This is internal function for MQTT broker. You should call this with edge-handle lock. */ -int nns_edge_mqtt_subscribe (nns_edge_h edge_h); +int nns_edge_mqtt_subscribe (nns_edge_broker_h broker_h); /** * @brief Check mqtt connection */ -bool nns_edge_mqtt_is_connected (nns_edge_h edge_h); +bool nns_edge_mqtt_is_connected (nns_edge_broker_h broker_h); /** * @brief Get message from mqtt broker. */ -int nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg); +int nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, char **msg); #else /** * @todo consider to change code style later. * If MQTT is disabled, nnstreamer does not include nnstreamer_edge_mqtt.c, and changing code style will make error as it is not used function now. * - * static int nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length) + * static int nns_edge_mqtt_publish (nns_edge_broker_h broker_h, const void *data, const int length) * { * return NNS_EDGE_ERROR_NOT_SUPPORTED; * } diff --git a/tests/unittest_nnstreamer-edge.cc b/tests/unittest_nnstreamer-edge.cc index 8ebe257..5f1f575 100644 --- a/tests/unittest_nnstreamer-edge.cc +++ b/tests/unittest_nnstreamer-edge.cc @@ -3769,11 +3769,15 @@ TEST(edgeMqtt, connectLocal) TEST(edgeMqtt, connectInvalidParam1_n) { int ret = -1; + nns_edge_broker_h broker_h; if (!_check_mqtt_broker ()) return; - ret = nns_edge_mqtt_connect (NULL, "temp-mqtt-topic"); + ret = nns_edge_mqtt_connect (NULL, "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); + + ret = nns_edge_mqtt_connect ("", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h); EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); } @@ -3783,22 +3787,16 @@ TEST(edgeMqtt, connectInvalidParam1_n) TEST(edgeMqtt, connectInvalidParam2_n) { int ret = -1; - nns_edge_h edge_h; + nns_edge_broker_h broker_h; if (!_check_mqtt_broker ()) return; - ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, - NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1"); - nns_edge_set_info (edge_h, "DEST_PORT", "1883"); - - ret = nns_edge_mqtt_connect (edge_h, NULL); + ret = nns_edge_mqtt_connect ("temp-mqtt-id", NULL, "127.0.0.1", 1883, &broker_h); EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); - ret = nns_edge_release_handle (edge_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + ret = nns_edge_mqtt_connect ("temp-mqtt-id", "", "127.0.0.1", 1883, &broker_h); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); } /** @@ -3807,179 +3805,134 @@ TEST(edgeMqtt, connectInvalidParam2_n) TEST(edgeMqtt, connectInvalidParam3_n) { int ret = -1; - nns_edge_h edge_h; + nns_edge_broker_h broker_h; if (!_check_mqtt_broker ()) return; - ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, - NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1"); - nns_edge_set_info (edge_h, "DEST_PORT", "1883"); - - ret = nns_edge_mqtt_connect (edge_h, ""); + ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", NULL, 1883, &broker_h); EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); - ret = nns_edge_release_handle (edge_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); + ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "", 1883, &broker_h); + EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); } /** - * @brief Connect to the mqtt broker with invalid hostaddress. + * @brief Connect to the mqtt broker with invalid param. */ TEST(edgeMqtt, connectInvalidParam4_n) { int ret = -1; - nns_edge_h edge_h; if (!_check_mqtt_broker ()) return; - ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, - NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - nns_edge_set_info (edge_h, "DEST_HOST", "tcp://none"); - nns_edge_set_info (edge_h, "DEST_PORT", "1883"); - - ret = nns_edge_mqtt_connect (edge_h, "temp-mqtt-topic"); + ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, NULL); EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); - - ret = nns_edge_release_handle (edge_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); } /** - * @brief Close the mqtt handle with invalid param. + * @brief Connect to the mqtt broker with invalid host address. */ -TEST(edgeMqtt, closeInvalidParam_n) +TEST(edgeMqtt, connectInvalidParam5_n) { int ret = -1; + nns_edge_broker_h broker_h; if (!_check_mqtt_broker ()) return; - ret = nns_edge_mqtt_close (NULL); + ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "tcp://none", 1883, &broker_h); EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); } /** - * @brief Close the mqtt handle before the connection. + * @brief Connect to the mqtt broker with invalid port number. */ -TEST(edgeMqtt, closeInvalidParam2_n) +TEST(edgeMqtt, connectInvalidParam6_n) { int ret = -1; - nns_edge_h edge_h; + nns_edge_broker_h broker_h; if (!_check_mqtt_broker ()) return; - ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, - NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - - ret = nns_edge_mqtt_close (edge_h); + ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 0, &broker_h); EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); - - ret = nns_edge_release_handle (edge_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); } /** - * @brief Publish with invalid param. + * @brief Close the mqtt handle with invalid param. */ -TEST(edgeMqtt, publishInvalidParam_n) +TEST(edgeMqtt, closeInvalidParam_n) { int ret = -1; - const char *msg = "TEMP_MESSAGE"; if (!_check_mqtt_broker ()) return; - ret = nns_edge_mqtt_publish (NULL, msg, strlen (msg) + 1); + ret = nns_edge_mqtt_close (NULL); EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); } /** * @brief Publish with invalid param. */ -TEST(edgeMqtt, publishInvalidParam2_n) +TEST(edgeMqtt, publishInvalidParam_n) { int ret = -1; - nns_edge_h edge_h; const char *msg = "TEMP_MESSAGE"; if (!_check_mqtt_broker ()) return; - ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, - NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1"); - nns_edge_set_info (edge_h, "DEST_PORT", "1883"); - - ret = nns_edge_start (edge_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - - /* data is null */ - ret = nns_edge_mqtt_publish (edge_h, NULL, strlen (msg) + 1); + ret = nns_edge_mqtt_publish (NULL, msg, strlen (msg) + 1); EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); - - ret = nns_edge_release_handle (edge_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); } /** * @brief Publish with invalid param. */ -TEST(edgeMqtt, publishInvalidParam3_n) +TEST(edgeMqtt, publishInvalidParam2_n) { int ret = -1; - nns_edge_h edge_h; + nns_edge_broker_h broker_h; const char *msg = "TEMP_MESSAGE"; if (!_check_mqtt_broker ()) return; - ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, - NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1"); - nns_edge_set_info (edge_h, "DEST_PORT", "1883"); - - ret = nns_edge_start (edge_h); + ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h); EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - /* data length is 0 */ - ret = nns_edge_mqtt_publish (edge_h, msg, 0); + /* data is null */ + ret = nns_edge_mqtt_publish (broker_h, NULL, strlen (msg) + 1); EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); - ret = nns_edge_release_handle (edge_h); + ret = nns_edge_mqtt_close (broker_h); EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); } /** - * @brief Publish the message without the connection. + * @brief Publish with invalid param. */ -TEST(edgeMqtt, publishInvalidParam4_n) +TEST(edgeMqtt, publishInvalidParam3_n) { int ret = -1; - nns_edge_h edge_h; + nns_edge_broker_h broker_h; const char *msg = "TEMP_MESSAGE"; if (!_check_mqtt_broker ()) return; - ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, - NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h); + ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h); EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1"); - nns_edge_set_info (edge_h, "DEST_PORT", "1883"); - ret = nns_edge_mqtt_publish (edge_h, msg, strlen (msg) + 1); + /* data length is 0 */ + ret = nns_edge_mqtt_publish (broker_h, msg, 0); EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); - ret = nns_edge_release_handle (edge_h); + ret = nns_edge_mqtt_close (broker_h); EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); } @@ -3997,28 +3950,6 @@ TEST(edgeMqtt, subscribeInvalidParam_n) EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); } -/** - * @brief Subscribe the topic before the connection. - */ -TEST(edgeMqtt, subscribeInvalidParam2_n) -{ - int ret = -1; - nns_edge_h edge_h; - - if (!_check_mqtt_broker ()) - return; - - ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, - NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - - ret = nns_edge_mqtt_subscribe (edge_h); - EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); - - ret = nns_edge_release_handle (edge_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); -} - /** * @brief Get message with invalid param. */ @@ -4040,24 +3971,18 @@ TEST(edgeMqtt, getMessageInvalidParam_n) TEST(edgeMqtt, getMessageInvalidParam2_n) { int ret = -1; - nns_edge_h edge_h; + nns_edge_broker_h broker_h; if (!_check_mqtt_broker ()) return; - ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, - NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1"); - nns_edge_set_info (edge_h, "DEST_PORT", "1883"); - - ret = nns_edge_start (edge_h); + ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h); EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - ret = nns_edge_mqtt_get_message (edge_h, NULL); + ret = nns_edge_mqtt_get_message (broker_h, NULL); EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); - ret = nns_edge_release_handle (edge_h); + ret = nns_edge_mqtt_close (broker_h); EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); } @@ -4067,28 +3992,19 @@ TEST(edgeMqtt, getMessageInvalidParam2_n) TEST(edgeMqtt, getMessageWithinTimeout_n) { int ret = -1; - nns_edge_h edge_h; + nns_edge_broker_h broker_h; char *msg = NULL; if (!_check_mqtt_broker ()) return; - ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID, - NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1"); - nns_edge_set_info (edge_h, "DEST_PORT", "1883"); - - ret = nns_edge_mqtt_connect (edge_h, "temp-mqtt-topic"); + ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h); EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - ret = nns_edge_mqtt_get_message (edge_h, &msg); + ret = nns_edge_mqtt_get_message (broker_h, &msg); EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); - ret = nns_edge_mqtt_close (edge_h); - EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - - ret = nns_edge_release_handle (edge_h); + ret = nns_edge_mqtt_close (broker_h); EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); } #endif /* ENABLE_MQTT */