Code clean, add broker handle and revise MQTT functions.
This will remove dependency to edge-handle in MQTT impl.
Signed-off-by: Jaeyun <jy1210.jung@samsung.com>
topic = nns_edge_strdup_printf ("edge/inference/device-%s/%s/",
eh->id, eh->topic);
- ret = nns_edge_mqtt_connect (eh, topic);
+ ret = nns_edge_mqtt_connect (eh->id, topic, eh->dest_host, eh->dest_port,
+ &eh->broker_h);
SAFE_FREE (topic);
if (NNS_EDGE_ERROR_NONE != ret) {
msg = nns_edge_get_host_string (eh->host, eh->port);
- ret = nns_edge_mqtt_publish (eh, msg, strlen (msg) + 1);
+ ret = nns_edge_mqtt_publish (eh->broker_h, msg, strlen (msg) + 1);
SAFE_FREE (msg);
if (NNS_EDGE_ERROR_NONE != ret) {
switch (eh->connect_type) {
case NNS_EDGE_CONNECT_TYPE_HYBRID:
- if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_close (eh)) {
+ if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_close (eh->broker_h)) {
nns_edge_logw ("Failed to close mqtt connection.");
}
eh->broker_h = NULL;
eh->dest_port = dest_port;
if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) {
- char *topic, *msg = NULL;
- char *server_ip = NULL;
- int server_port;
+ char *topic;
- if (!nns_edge_mqtt_is_connected (eh)) {
+ if (!nns_edge_mqtt_is_connected (eh->broker_h)) {
topic = nns_edge_strdup_printf ("edge/inference/+/%s/#", eh->topic);
- ret = nns_edge_mqtt_connect (eh, topic);
+ ret = nns_edge_mqtt_connect (eh->id, topic, dest_host, dest_port,
+ &eh->broker_h);
SAFE_FREE (topic);
if (NNS_EDGE_ERROR_NONE != ret) {
goto done;
}
- ret = nns_edge_mqtt_subscribe (eh);
+ ret = nns_edge_mqtt_subscribe (eh->broker_h);
if (NNS_EDGE_ERROR_NONE != ret) {
nns_edge_loge ("Failed to subscribe to topic: %s.", eh->topic);
goto done;
}
}
- while ((ret = nns_edge_mqtt_get_message (eh, &msg)) == NNS_EDGE_ERROR_NONE) {
+ do {
+ char *msg = NULL;
+ char *server_ip = NULL;
+ int server_port = 0;
+
+ ret = nns_edge_mqtt_get_message (eh->broker_h, &msg);
+ if (ret != NNS_EDGE_ERROR_NONE || !msg)
+ break;
+
nns_edge_parse_host_string (msg, &server_ip, &server_port);
SAFE_FREE (msg);
if (NNS_EDGE_ERROR_NONE == ret) {
break;
}
- }
+ } while (TRUE);
} else if (NNS_EDGE_CONNECT_TYPE_AITT == eh->connect_type) {
ret = nns_edge_aitt_connect (eh);
if (ret != NNS_EDGE_ERROR_NONE) {
{
void *mqtt_h;
nns_edge_queue_h server_list;
+ char *id;
char *topic;
+ char *host;
+ int port;
bool connected;
} nns_edge_broker_s;
* @brief Initializes MQTT object.
*/
static int
-_nns_edge_mqtt_init_client (nns_edge_handle_s * eh, const char *topic)
+_nns_edge_mqtt_init_client (const char *id, const char *topic, const char *host,
+ const int port, nns_edge_broker_h * broker_h)
{
nns_edge_broker_s *bh;
int mret;
struct mosquitto *handle;
int ver = MQTT_PROTOCOL_V311; /** @todo check mqtt version (TizenRT repo) */
- nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).",
- eh->id, eh->dest_host, eh->dest_port);
+ nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).", id, host, port);
bh = (nns_edge_broker_s *) calloc (1, sizeof (nns_edge_broker_s));
if (!bh) {
}
mosquitto_lib_init ();
- client_id = nns_edge_strdup_printf ("nns_edge_%s_%u", eh->id, getpid ());
+ client_id = nns_edge_strdup_printf ("nns_edge_%s_%u", id, getpid ());
handle = mosquitto_new (client_id, TRUE, NULL);
SAFE_FREE (client_id);
goto error;
}
- mret = mosquitto_connect (handle, eh->dest_host, eh->dest_port, 60);
+ mret = mosquitto_connect (handle, host, port, 60);
if (mret != MOSQ_ERR_SUCCESS) {
nns_edge_loge ("Failed to connect MQTT.");
goto error;
nns_edge_queue_create (&bh->server_list);
bh->mqtt_h = handle;
+ bh->id = nns_edge_strdup (id);
bh->topic = nns_edge_strdup (topic);
+ bh->host = nns_edge_strdup (host);
+ bh->port = port;
bh->connected = true;
- eh->broker_h = bh;
+ *broker_h = bh;
return NNS_EDGE_ERROR_NONE;
error:
* @note This is internal function for MQTT broker. You should call this with edge-handle lock.
*/
int
-nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic)
+nns_edge_mqtt_connect (const char *id, const char *topic, const char *host,
+ const int port, nns_edge_broker_h * broker_h)
{
- nns_edge_handle_s *eh;
int ret = NNS_EDGE_ERROR_NONE;
+ if (!STR_IS_VALID (id)) {
+ nns_edge_loge ("Invalid param, given id is invalid.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
if (!STR_IS_VALID (topic)) {
nns_edge_loge ("Invalid param, given topic is invalid.");
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- eh = (nns_edge_handle_s *) edge_h;
+ if (!STR_IS_VALID (host)) {
+ nns_edge_loge ("Invalid param, given host is invalid.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
- if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ if (!PORT_IS_VALID (port)) {
+ nns_edge_loge ("Invalid param, given port is invalid.");
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- ret = _nns_edge_mqtt_init_client (eh, topic);
+ if (!broker_h) {
+ nns_edge_loge ("Invalid param, mqtt_h should not be null.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ ret = _nns_edge_mqtt_init_client (id, topic, host, port, broker_h);
if (NNS_EDGE_ERROR_NONE != ret)
nns_edge_loge ("Failed to initialize the MQTT client object.");
* @note This is internal function for MQTT broker. You should call this with edge-handle lock.
*/
int
-nns_edge_mqtt_close (nns_edge_h edge_h)
+nns_edge_mqtt_close (nns_edge_broker_h broker_h)
{
- nns_edge_handle_s *eh;
nns_edge_broker_s *bh;
struct mosquitto *handle;
- eh = (nns_edge_handle_s *) edge_h;
-
- if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ if (!broker_h) {
+ nns_edge_loge ("Invalid param, given broker handle is invalid.");
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- bh = (nns_edge_broker_s *) eh->broker_h;
+ bh = (nns_edge_broker_s *) broker_h;
handle = bh->mqtt_h;
if (handle) {
nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).",
- eh->id, eh->dest_host, eh->dest_port);
+ bh->id, bh->host, bh->port);
/* Clear retained message */
mosquitto_publish (handle, NULL, bh->topic, 0, NULL, 1, true);
nns_edge_queue_destroy (bh->server_list);
bh->server_list = NULL;
+ SAFE_FREE (bh->id);
SAFE_FREE (bh->topic);
+ SAFE_FREE (bh->host);
SAFE_FREE (bh);
- eh->broker_h = NULL;
return NNS_EDGE_ERROR_NONE;
}
* @note This is internal function for MQTT broker. You should call this with edge-handle lock.
*/
int
-nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
+nns_edge_mqtt_publish (nns_edge_broker_h broker_h, const void *data,
+ const int length)
{
- nns_edge_handle_s *eh;
nns_edge_broker_s *bh;
struct mosquitto *handle;
int ret;
- eh = (nns_edge_handle_s *) edge_h;
-
- if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ if (!broker_h) {
+ nns_edge_loge ("Invalid param, given broker handle is invalid.");
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- bh = (nns_edge_broker_s *) eh->broker_h;
+ bh = (nns_edge_broker_s *) broker_h;
handle = bh->mqtt_h;
if (!handle) {
ret = mosquitto_publish (handle, NULL, bh->topic, length, data, 1, true);
if (MOSQ_ERR_SUCCESS != ret) {
nns_edge_loge ("Failed to publish a message (ID:%s, Topic:%s).",
- eh->id, eh->topic);
+ bh->id, bh->topic);
return NNS_EDGE_ERROR_IO;
}
* @note This is internal function for MQTT broker. You should call this with edge-handle lock.
*/
int
-nns_edge_mqtt_subscribe (nns_edge_h edge_h)
+nns_edge_mqtt_subscribe (nns_edge_broker_h broker_h)
{
- nns_edge_handle_s *eh;
nns_edge_broker_s *bh;
void *handle;
int ret;
- eh = (nns_edge_handle_s *) edge_h;
-
- if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ if (!broker_h) {
+ nns_edge_loge ("Invalid param, given broker handle is invalid.");
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- bh = (nns_edge_broker_s *) eh->broker_h;
+ bh = (nns_edge_broker_s *) broker_h;
handle = bh->mqtt_h;
if (!handle) {
ret = mosquitto_subscribe (handle, NULL, bh->topic, 1);
if (MOSQ_ERR_SUCCESS != ret) {
nns_edge_loge ("Failed to subscribe a topic (ID:%s, Topic:%s).",
- eh->id, eh->topic);
+ bh->id, bh->topic);
return NNS_EDGE_ERROR_IO;
}
* @brief Get message from mqtt broker.
*/
int
-nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg)
+nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, char **msg)
{
- nns_edge_handle_s *eh;
nns_edge_broker_s *bh;
- eh = (nns_edge_handle_s *) edge_h;
-
- if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ if (!broker_h) {
+ nns_edge_loge ("Invalid param, given broker handle is invalid.");
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- bh = (nns_edge_broker_s *) eh->broker_h;
+ bh = (nns_edge_broker_s *) broker_h;
/* Wait for 1 second */
if (!nns_edge_queue_wait_pop (bh->server_list, 1000U, (void **) msg)) {
* @brief Check mqtt connection
*/
bool
-nns_edge_mqtt_is_connected (nns_edge_h edge_h)
+nns_edge_mqtt_is_connected (nns_edge_broker_h broker_h)
{
- nns_edge_handle_s *eh;
nns_edge_broker_s *bh;
- eh = (nns_edge_handle_s *) edge_h;
-
- if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ if (!broker_h) {
+ nns_edge_loge ("Invalid param, given broker handle is invalid.");
return false;
}
- bh = (nns_edge_broker_s *) eh->broker_h;
+ bh = (nns_edge_broker_s *) broker_h;
return bh->connected;
}
{
void *mqtt_h;
nns_edge_queue_h server_list;
+ char *id;
char *topic;
+ char *host;
+ int port;
} nns_edge_broker_s;
/**
mqtt_cb_message_arrived (void *context, char *topic, int topic_len,
MQTTAsync_message * message)
{
- nns_edge_handle_s *eh;
nns_edge_broker_s *bh;
char *msg = NULL;
UNUSED (topic);
UNUSED (topic_len);
- eh = (nns_edge_handle_s *) context;
+ bh = (nns_edge_broker_s *) context;
- if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ if (!bh) {
+ nns_edge_loge ("Invalid param, given broker handle is invalid.");
return TRUE;
}
if (0 >= message->payloadlen) {
- nns_edge_logw ("Invalid payload lenth: %d", message->payloadlen);
+ nns_edge_logw ("Invalid payload length: %d", message->payloadlen);
return TRUE;
}
- bh = (nns_edge_broker_s *) eh->broker_h;
-
nns_edge_logd ("MQTT message is arrived (ID:%s, Topic:%s).",
- eh->id, eh->topic);
+ bh->id, bh->topic);
msg = nns_edge_memdup (message->payload, message->payloadlen);
if (msg)
* @note This is internal function for MQTT broker. You should call this with edge-handle lock.
*/
int
-nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic)
+nns_edge_mqtt_connect (const char *id, const char *topic, const char *host,
+ const int port, nns_edge_broker_h * broker_h)
{
- nns_edge_handle_s *eh;
nns_edge_broker_s *bh;
MQTTAsync_connectOptions options = MQTTAsync_connectOptions_initializer;
int ret = NNS_EDGE_ERROR_NONE;
char *client_id;
unsigned int wait_count;
+ if (!STR_IS_VALID (id)) {
+ nns_edge_loge ("Invalid param, given id is invalid.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
if (!STR_IS_VALID (topic)) {
nns_edge_loge ("Invalid param, given topic is invalid.");
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- eh = (nns_edge_handle_s *) edge_h;
+ if (!STR_IS_VALID (host)) {
+ nns_edge_loge ("Invalid param, given host is invalid.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ if (!PORT_IS_VALID (port)) {
+ nns_edge_loge ("Invalid param, given port is invalid.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
- if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ if (!broker_h) {
+ nns_edge_loge ("Invalid param, mqtt_h should not be null.");
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).",
- eh->id, eh->dest_host, eh->dest_port);
+ nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).", id, host, port);
bh = (nns_edge_broker_s *) calloc (1, sizeof (nns_edge_broker_s));
if (!bh) {
return NNS_EDGE_ERROR_OUT_OF_MEMORY;
}
- url = nns_edge_get_host_string (eh->dest_host, eh->dest_port);
- client_id = nns_edge_strdup_printf ("nns_edge_%s_%u", eh->id, getpid ());
+ url = nns_edge_get_host_string (host, port);
+ client_id = nns_edge_strdup_printf ("nns_edge_%s_%u", id, getpid ());
ret = MQTTAsync_create (&handle, url, client_id,
MQTTCLIENT_PERSISTENCE_NONE, NULL);
goto error;
}
+ bh->id = nns_edge_strdup (id);
bh->topic = nns_edge_strdup (topic);
+ bh->host = nns_edge_strdup (host);
+ bh->port = port;
bh->mqtt_h = handle;
nns_edge_queue_create (&bh->server_list);
- eh->broker_h = bh;
- MQTTAsync_setCallbacks (handle, edge_h, NULL, mqtt_cb_message_arrived, NULL);
+ MQTTAsync_setCallbacks (handle, bh, NULL, mqtt_cb_message_arrived, NULL);
options.cleansession = 1;
options.keepAliveInterval = 6;
- options.context = edge_h;
+ options.context = bh;
if (MQTTAsync_connect (handle, &options) != MQTTASYNC_SUCCESS) {
nns_edge_loge ("Failed to connect MQTT.");
wait_count++;
} while (!MQTTAsync_isConnected (handle));
+ *broker_h = bh;
return NNS_EDGE_ERROR_NONE;
error:
- nns_edge_mqtt_close (eh);
+ nns_edge_mqtt_close (bh);
return ret;
}
* @note This is internal function for MQTT broker. You should call this with edge-handle lock.
*/
int
-nns_edge_mqtt_close (nns_edge_h edge_h)
+nns_edge_mqtt_close (nns_edge_broker_h broker_h)
{
- nns_edge_handle_s *eh;
nns_edge_broker_s *bh;
MQTTAsync handle;
MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer;
unsigned int wait_count;
- eh = (nns_edge_handle_s *) edge_h;
-
- if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ if (!broker_h) {
+ nns_edge_loge ("Invalid param, given broker handle is invalid.");
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- bh = (nns_edge_broker_s *) eh->broker_h;
+ bh = (nns_edge_broker_s *) broker_h;
handle = bh->mqtt_h;
if (handle) {
nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).",
- eh->id, eh->dest_host, eh->dest_port);
+ bh->id, bh->host, bh->port);
- options.context = edge_h;
+ options.context = bh;
/* Clear retained message */
MQTTAsync_send (handle, bh->topic, 0, NULL, 1, 1, NULL);
nns_edge_queue_destroy (bh->server_list);
bh->server_list = NULL;
+ SAFE_FREE (bh->id);
SAFE_FREE (bh->topic);
+ SAFE_FREE (bh->host);
SAFE_FREE (bh);
- eh->broker_h = NULL;
return NNS_EDGE_ERROR_NONE;
}
* @note This is internal function for MQTT broker. You should call this with edge-handle lock.
*/
int
-nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
+nns_edge_mqtt_publish (nns_edge_broker_h broker_h, const void *data,
+ const int length)
{
- nns_edge_handle_s *eh;
nns_edge_broker_s *bh;
MQTTAsync handle;
int ret;
- eh = (nns_edge_handle_s *) edge_h;
-
- if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ if (!broker_h) {
+ nns_edge_loge ("Invalid param, given broker handle is invalid.");
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- bh = (nns_edge_broker_s *) eh->broker_h;
+ bh = (nns_edge_broker_s *) broker_h;
handle = bh->mqtt_h;
if (!handle) {
ret = MQTTAsync_send (handle, bh->topic, length, data, 1, 1, NULL);
if (ret != MQTTASYNC_SUCCESS) {
nns_edge_loge ("Failed to publish a message (ID:%s, Topic:%s).",
- eh->id, eh->topic);
+ bh->id, bh->topic);
return NNS_EDGE_ERROR_IO;
}
* @note This is internal function for MQTT broker. You should call this with edge-handle lock.
*/
int
-nns_edge_mqtt_subscribe (nns_edge_h edge_h)
+nns_edge_mqtt_subscribe (nns_edge_broker_h broker_h)
{
- nns_edge_handle_s *eh;
nns_edge_broker_s *bh;
MQTTAsync handle;
int ret;
- eh = (nns_edge_handle_s *) edge_h;
-
- if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ if (!broker_h) {
+ nns_edge_loge ("Invalid param, given broker handle is invalid.");
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- bh = (nns_edge_broker_s *) eh->broker_h;
+ bh = (nns_edge_broker_s *) broker_h;
handle = bh->mqtt_h;
if (!handle) {
ret = MQTTAsync_subscribe (handle, bh->topic, 1, NULL);
if (ret != MQTTASYNC_SUCCESS) {
nns_edge_loge ("Failed to subscribe a topic (ID:%s, Topic:%s).",
- eh->id, eh->topic);
+ bh->id, bh->topic);
return NNS_EDGE_ERROR_IO;
}
* @brief Check mqtt connection
*/
bool
-nns_edge_mqtt_is_connected (nns_edge_h edge_h)
+nns_edge_mqtt_is_connected (nns_edge_broker_h broker_h)
{
- nns_edge_handle_s *eh;
nns_edge_broker_s *bh;
MQTTAsync handle;
- eh = (nns_edge_handle_s *) edge_h;
- if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ if (!broker_h) {
+ nns_edge_loge ("Invalid param, given broker handle is invalid.");
return false;
}
- bh = (nns_edge_broker_s *) eh->broker_h;
+ bh = (nns_edge_broker_s *) broker_h;
handle = bh->mqtt_h;
if (!handle) {
* @brief Get message from mqtt broker.
*/
int
-nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg)
+nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, char **msg)
{
- nns_edge_handle_s *eh;
nns_edge_broker_s *bh;
- eh = (nns_edge_handle_s *) edge_h;
-
- if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ if (!broker_h) {
+ nns_edge_loge ("Invalid param, given broker handle is invalid.");
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- bh = (nns_edge_broker_s *) eh->broker_h;
+ bh = (nns_edge_broker_s *) broker_h;
/* Wait for 1 second */
if (!nns_edge_queue_wait_pop (bh->server_list, 1000U, (void **) msg)) {
extern "C" {
#endif /* __cplusplus */
-typedef void *nns_edge_mqtt_h;
+typedef void *nns_edge_broker_h;
#if defined(ENABLE_MQTT)
/**
* @brief Connect to MQTT.
* @note This is internal function for MQTT broker. You should call this with edge-handle lock.
*/
-int nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic);
+int nns_edge_mqtt_connect (const char *id, const char *topic, const char *host, const int port, nns_edge_broker_h *broker_h);
/**
* @brief Close the connection to MQTT.
* @note This is internal function for MQTT broker. You should call this with edge-handle lock.
*/
-int nns_edge_mqtt_close (nns_edge_h edge_h);
+int nns_edge_mqtt_close (nns_edge_broker_h broker_h);
/**
* @brief Publish raw data.
* @note This is internal function for MQTT broker. You should call this with edge-handle lock.
*/
-int nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length);
+int nns_edge_mqtt_publish (nns_edge_broker_h broker_h, const void *data, const int length);
/**
* @brief Subscribe a topic.
* @note This is internal function for MQTT broker. You should call this with edge-handle lock.
*/
-int nns_edge_mqtt_subscribe (nns_edge_h edge_h);
+int nns_edge_mqtt_subscribe (nns_edge_broker_h broker_h);
/**
* @brief Check mqtt connection
*/
-bool nns_edge_mqtt_is_connected (nns_edge_h edge_h);
+bool nns_edge_mqtt_is_connected (nns_edge_broker_h broker_h);
/**
* @brief Get message from mqtt broker.
*/
-int nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg);
+int nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, char **msg);
#else
/**
* @todo consider to change code style later.
* If MQTT is disabled, nnstreamer does not include nnstreamer_edge_mqtt.c, and changing code style will make error as it is not used function now.
*
- * static int nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
+ * static int nns_edge_mqtt_publish (nns_edge_broker_h broker_h, const void *data, const int length)
* {
* return NNS_EDGE_ERROR_NOT_SUPPORTED;
* }
TEST(edgeMqtt, connectInvalidParam1_n)
{
int ret = -1;
+ nns_edge_broker_h broker_h;
if (!_check_mqtt_broker ())
return;
- ret = nns_edge_mqtt_connect (NULL, "temp-mqtt-topic");
+ ret = nns_edge_mqtt_connect (NULL, "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
+ EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
+
+ ret = nns_edge_mqtt_connect ("", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
}
TEST(edgeMqtt, connectInvalidParam2_n)
{
int ret = -1;
- nns_edge_h edge_h;
+ nns_edge_broker_h broker_h;
if (!_check_mqtt_broker ())
return;
- ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
- NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
- nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1");
- nns_edge_set_info (edge_h, "DEST_PORT", "1883");
-
- ret = nns_edge_mqtt_connect (edge_h, NULL);
+ ret = nns_edge_mqtt_connect ("temp-mqtt-id", NULL, "127.0.0.1", 1883, &broker_h);
EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
- ret = nns_edge_release_handle (edge_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+ ret = nns_edge_mqtt_connect ("temp-mqtt-id", "", "127.0.0.1", 1883, &broker_h);
+ EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
}
/**
TEST(edgeMqtt, connectInvalidParam3_n)
{
int ret = -1;
- nns_edge_h edge_h;
+ nns_edge_broker_h broker_h;
if (!_check_mqtt_broker ())
return;
- ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
- NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
- nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1");
- nns_edge_set_info (edge_h, "DEST_PORT", "1883");
-
- ret = nns_edge_mqtt_connect (edge_h, "");
+ ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", NULL, 1883, &broker_h);
EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
- ret = nns_edge_release_handle (edge_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
+ ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "", 1883, &broker_h);
+ EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
}
/**
- * @brief Connect to the mqtt broker with invalid hostaddress.
+ * @brief Connect to the mqtt broker with invalid param.
*/
TEST(edgeMqtt, connectInvalidParam4_n)
{
int ret = -1;
- nns_edge_h edge_h;
if (!_check_mqtt_broker ())
return;
- ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
- NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
- nns_edge_set_info (edge_h, "DEST_HOST", "tcp://none");
- nns_edge_set_info (edge_h, "DEST_PORT", "1883");
-
- ret = nns_edge_mqtt_connect (edge_h, "temp-mqtt-topic");
+ ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, NULL);
EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-
- ret = nns_edge_release_handle (edge_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
}
/**
- * @brief Close the mqtt handle with invalid param.
+ * @brief Connect to the mqtt broker with invalid host address.
*/
-TEST(edgeMqtt, closeInvalidParam_n)
+TEST(edgeMqtt, connectInvalidParam5_n)
{
int ret = -1;
+ nns_edge_broker_h broker_h;
if (!_check_mqtt_broker ())
return;
- ret = nns_edge_mqtt_close (NULL);
+ ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "tcp://none", 1883, &broker_h);
EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
}
/**
- * @brief Close the mqtt handle before the connection.
+ * @brief Connect to the mqtt broker with invalid port number.
*/
-TEST(edgeMqtt, closeInvalidParam2_n)
+TEST(edgeMqtt, connectInvalidParam6_n)
{
int ret = -1;
- nns_edge_h edge_h;
+ nns_edge_broker_h broker_h;
if (!_check_mqtt_broker ())
return;
- ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
- NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
- ret = nns_edge_mqtt_close (edge_h);
+ ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 0, &broker_h);
EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-
- ret = nns_edge_release_handle (edge_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
}
/**
- * @brief Publish with invalid param.
+ * @brief Close the mqtt handle with invalid param.
*/
-TEST(edgeMqtt, publishInvalidParam_n)
+TEST(edgeMqtt, closeInvalidParam_n)
{
int ret = -1;
- const char *msg = "TEMP_MESSAGE";
if (!_check_mqtt_broker ())
return;
- ret = nns_edge_mqtt_publish (NULL, msg, strlen (msg) + 1);
+ ret = nns_edge_mqtt_close (NULL);
EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
}
/**
* @brief Publish with invalid param.
*/
-TEST(edgeMqtt, publishInvalidParam2_n)
+TEST(edgeMqtt, publishInvalidParam_n)
{
int ret = -1;
- nns_edge_h edge_h;
const char *msg = "TEMP_MESSAGE";
if (!_check_mqtt_broker ())
return;
- ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
- NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
- nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1");
- nns_edge_set_info (edge_h, "DEST_PORT", "1883");
-
- ret = nns_edge_start (edge_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
- /* data is null */
- ret = nns_edge_mqtt_publish (edge_h, NULL, strlen (msg) + 1);
+ ret = nns_edge_mqtt_publish (NULL, msg, strlen (msg) + 1);
EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-
- ret = nns_edge_release_handle (edge_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
}
/**
* @brief Publish with invalid param.
*/
-TEST(edgeMqtt, publishInvalidParam3_n)
+TEST(edgeMqtt, publishInvalidParam2_n)
{
int ret = -1;
- nns_edge_h edge_h;
+ nns_edge_broker_h broker_h;
const char *msg = "TEMP_MESSAGE";
if (!_check_mqtt_broker ())
return;
- ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
- NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
- nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1");
- nns_edge_set_info (edge_h, "DEST_PORT", "1883");
-
- ret = nns_edge_start (edge_h);
+ ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
- /* data length is 0 */
- ret = nns_edge_mqtt_publish (edge_h, msg, 0);
+ /* data is null */
+ ret = nns_edge_mqtt_publish (broker_h, NULL, strlen (msg) + 1);
EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
- ret = nns_edge_release_handle (edge_h);
+ ret = nns_edge_mqtt_close (broker_h);
EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
}
/**
- * @brief Publish the message without the connection.
+ * @brief Publish with invalid param.
*/
-TEST(edgeMqtt, publishInvalidParam4_n)
+TEST(edgeMqtt, publishInvalidParam3_n)
{
int ret = -1;
- nns_edge_h edge_h;
+ nns_edge_broker_h broker_h;
const char *msg = "TEMP_MESSAGE";
if (!_check_mqtt_broker ())
return;
- ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
- NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
+ ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
- nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1");
- nns_edge_set_info (edge_h, "DEST_PORT", "1883");
- ret = nns_edge_mqtt_publish (edge_h, msg, strlen (msg) + 1);
+ /* data length is 0 */
+ ret = nns_edge_mqtt_publish (broker_h, msg, 0);
EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
- ret = nns_edge_release_handle (edge_h);
+ ret = nns_edge_mqtt_close (broker_h);
EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
}
EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
}
-/**
- * @brief Subscribe the topic before the connection.
- */
-TEST(edgeMqtt, subscribeInvalidParam2_n)
-{
- int ret = -1;
- nns_edge_h edge_h;
-
- if (!_check_mqtt_broker ())
- return;
-
- ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
- NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
- ret = nns_edge_mqtt_subscribe (edge_h);
- EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
-
- ret = nns_edge_release_handle (edge_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-}
-
/**
* @brief Get message with invalid param.
*/
TEST(edgeMqtt, getMessageInvalidParam2_n)
{
int ret = -1;
- nns_edge_h edge_h;
+ nns_edge_broker_h broker_h;
if (!_check_mqtt_broker ())
return;
- ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
- NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
- nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1");
- nns_edge_set_info (edge_h, "DEST_PORT", "1883");
-
- ret = nns_edge_start (edge_h);
+ ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
- ret = nns_edge_mqtt_get_message (edge_h, NULL);
+ ret = nns_edge_mqtt_get_message (broker_h, NULL);
EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
- ret = nns_edge_release_handle (edge_h);
+ ret = nns_edge_mqtt_close (broker_h);
EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
}
TEST(edgeMqtt, getMessageWithinTimeout_n)
{
int ret = -1;
- nns_edge_h edge_h;
+ nns_edge_broker_h broker_h;
char *msg = NULL;
if (!_check_mqtt_broker ())
return;
- ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
- NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
- nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1");
- nns_edge_set_info (edge_h, "DEST_PORT", "1883");
-
- ret = nns_edge_mqtt_connect (edge_h, "temp-mqtt-topic");
+ ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
- ret = nns_edge_mqtt_get_message (edge_h, &msg);
+ ret = nns_edge_mqtt_get_message (broker_h, &msg);
EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
- ret = nns_edge_mqtt_close (edge_h);
- EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-
- ret = nns_edge_release_handle (edge_h);
+ ret = nns_edge_mqtt_close (broker_h);
EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
}
#endif /* ENABLE_MQTT */