if (eh->flags & NNS_EDGE_FLAG_SERVER) {
if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) {
- gchar *device, *topic, *msg;
+ char *topic, *msg;
- if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_connect (eh)) {
+ /** @todo Set unique device name.
+ * Device name should be unique. Consider using MAC address later.
+ * Now, use ID received from the user.
+ */
+ topic = nns_edge_strdup_printf ("edge/inference/device-%s/%s/",
+ eh->id, eh->topic);
+
+ ret = nns_edge_mqtt_connect (eh, topic);
+ SAFE_FREE (topic);
+
+ if (NNS_EDGE_ERROR_NONE != ret) {
nns_edge_loge
("Failed to start nnstreamer-edge. Connection failure to broker.");
- ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
goto error;
}
- /** @todo Set unique device name.
- * Device name should be unique. Consider using MAC address later.
- * Now, use ID received from the user.
- */
- device = g_strdup_printf ("device-%s", eh->id);
- topic = g_strdup_printf ("edge/inference/%s/%s/", device, eh->topic);
-
- g_free (device);
- g_free (eh->topic);
- eh->topic = topic;
- msg = nns_edge_strdup_printf ("%s:%d", eh->host, eh->port);
-
- if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_publish (eh, msg,
- strlen (msg) + 1)) {
- nns_edge_loge ("Failed to publish the meesage: %s", msg);
- ret = NNS_EDGE_ERROR_IO;
+ msg = nns_edge_get_host_string (eh->host, eh->port);
+
+ ret = nns_edge_mqtt_publish (eh, msg, strlen (msg) + 1);
+ SAFE_FREE (msg);
+
+ if (NNS_EDGE_ERROR_NONE != ret) {
+ nns_edge_loge ("Failed to publish the meesage to broker.");
goto error;
}
- nns_edge_free (msg);
}
}
{
nns_edge_handle_s *eh;
int ret;
- char *server_ip = NULL;
- int server_port;
eh = (nns_edge_handle_s *) edge_h;
if (!eh) {
eh->dest_port = dest_port;
if (NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) {
- gchar *topic, *msg = NULL;
+ char *topic, *msg = NULL;
+ char *server_ip = NULL;
+ int server_port;
if (!nns_edge_mqtt_is_connected (eh)) {
- if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_connect (eh)) {
+ topic = nns_edge_strdup_printf ("edge/inference/+/%s/#", eh->topic);
+
+ ret = nns_edge_mqtt_connect (eh, topic);
+ SAFE_FREE (topic);
+
+ if (NNS_EDGE_ERROR_NONE != ret) {
nns_edge_loge ("Connection failure to broker.");
nns_edge_unlock (eh);
- return NNS_EDGE_ERROR_CONNECTION_FAILURE;
+ return ret;
}
- topic = g_strdup_printf ("edge/inference/+/%s/#", eh->topic);
- g_free (eh->topic);
- eh->topic = topic;
- if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_subscribe (eh)) {
+ ret = nns_edge_mqtt_subscribe (eh);
+ if (NNS_EDGE_ERROR_NONE != ret) {
nns_edge_loge ("Failed to subscribe to topic: %s.", eh->topic);
nns_edge_unlock (eh);
- return NNS_EDGE_ERROR_CONNECTION_FAILURE;
+ return ret;
}
}
- ret = nns_edge_mqtt_get_message (eh, &msg);
- while (NNS_EDGE_ERROR_NONE == ret) {
- gchar **splits;
- splits = g_strsplit (msg, ":", -1);
- server_ip = g_strdup (splits[0]);
- server_port = g_ascii_strtoull (splits[1], NULL, 10);
+ while ((ret = nns_edge_mqtt_get_message (eh, &msg)) == NNS_EDGE_ERROR_NONE) {
+ nns_edge_parse_host_string (msg, &server_ip, &server_port);
+ SAFE_FREE (msg);
+
nns_edge_logd ("[DEBUG] Parsed server info: Server [%s:%d] ", server_ip,
server_port);
- g_strfreev (splits);
- g_free (msg);
-
ret = _nns_edge_connect_to (eh, eh->client_id, server_ip, server_port);
+ SAFE_FREE (server_ip);
+
if (NNS_EDGE_ERROR_NONE == ret) {
break;
}
- SAFE_FREE (server_ip);
- ret = nns_edge_mqtt_get_message (eh, &msg);
}
- } else { /** case for NNS_EDGE_CONNECT_TYPE_TCP == eh->protocol */
- server_ip = nns_edge_strdup (dest_host);
- server_port = dest_port;
- ret = _nns_edge_connect_to (eh, eh->client_id, server_ip, server_port);
+ } else {
+ ret = _nns_edge_connect_to (eh, eh->client_id, dest_host, dest_port);
if (ret != NNS_EDGE_ERROR_NONE) {
- nns_edge_loge ("Failed to connect to %s:%d", server_ip, server_port);
+ nns_edge_loge ("Failed to connect to %s:%d", dest_host, dest_port);
}
}
- SAFE_FREE (server_ip);
nns_edge_unlock (eh);
return ret;
}
GAsyncQueue *server_list;
GMutex mqtt_mutex;
GCond mqtt_gcond;
- gboolean mqtt_is_connected;
+ bool mqtt_is_connected;
+ char *topic;
} nns_edge_broker_s;
/**
bh = (nns_edge_broker_s *) eh->broker_h;
nns_edge_logw ("MQTT connection is lost (ID:%s, Cause:%s).", eh->id, cause);
g_mutex_lock (&bh->mqtt_mutex);
- bh->mqtt_is_connected = FALSE;
+ bh->mqtt_is_connected = false;
g_cond_broadcast (&bh->mqtt_gcond);
g_mutex_unlock (&bh->mqtt_mutex);
-
- if (eh->event_cb) {
- /** @todo send new event (MQTT disconnected) */
- }
}
/**
bh = (nns_edge_broker_s *) eh->broker_h;
g_mutex_lock (&bh->mqtt_mutex);
- bh->mqtt_is_connected = TRUE;
+ bh->mqtt_is_connected = true;
g_cond_broadcast (&bh->mqtt_gcond);
g_mutex_unlock (&bh->mqtt_mutex);
-
- if (eh->event_cb) {
- /** @todo send new event (MQTT connected) */
- }
}
/**
nns_edge_logw ("MQTT connection is failed (ID:%s).", eh->id);
g_mutex_lock (&bh->mqtt_mutex);
- bh->mqtt_is_connected = FALSE;
+ bh->mqtt_is_connected = false;
g_cond_broadcast (&bh->mqtt_gcond);
g_mutex_unlock (&bh->mqtt_mutex);
-
- if (eh->event_cb) {
- /** @todo send new event (MQTT connection failure) */
- }
}
/**
nns_edge_logi ("MQTT disconnection is completed (ID:%s).", eh->id);
g_mutex_lock (&bh->mqtt_mutex);
- bh->mqtt_is_connected = FALSE;
+ bh->mqtt_is_connected = false;
g_cond_broadcast (&bh->mqtt_gcond);
g_mutex_unlock (&bh->mqtt_mutex);
-
- if (eh->event_cb) {
- /** @todo send new event (MQTT disconnected) */
- }
}
/**
}
nns_edge_logw ("MQTT disconnection is failed (ID:%s).", eh->id);
- if (eh->event_cb) {
- /** @todo send new event (MQTT disconnection failure) */
- }
}
/**
nns_edge_logd ("MQTT message is arrived (ID:%s, Topic:%s).",
eh->id, eh->topic);
- msg = (char *) malloc (message->payloadlen);
- memcpy (msg, message->payload, message->payloadlen);
+ msg = nns_edge_memdup (message->payload, message->payloadlen);
g_async_queue_push (bh->server_list, msg);
- if (eh->event_cb) {
- /** @todo send new event (message arrived) */
- }
-
return TRUE;
}
* @note This is internal function for MQTT broker. You should call this with edge-handle lock.
*/
int
-nns_edge_mqtt_connect (nns_edge_h edge_h)
+nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic)
{
nns_edge_handle_s *eh;
nns_edge_broker_s *bh;
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- bh = (nns_edge_broker_s *) malloc (sizeof (nns_edge_broker_s));
+ nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).",
+ eh->id, eh->dest_host, eh->dest_port);
+
+ bh = (nns_edge_broker_s *) calloc (1, sizeof (nns_edge_broker_s));
if (!bh) {
nns_edge_loge ("Failed to allocate memory for broker handle.");
return NNS_EDGE_ERROR_OUT_OF_MEMORY;
}
- url = nns_edge_strdup_printf ("%s:%d", eh->dest_host, eh->dest_port);
+ url = nns_edge_get_host_string (eh->dest_host, eh->dest_port);
client_id = nns_edge_strdup_printf ("nns_edge_%s_%u", eh->id, getpid ());
ret = MQTTAsync_create (&handle, url, client_id,
MQTTCLIENT_PERSISTENCE_NONE, NULL);
+ SAFE_FREE (url);
+ SAFE_FREE (client_id);
+
if (MQTTASYNC_SUCCESS != ret) {
nns_edge_loge ("Failed to create MQTT handle.");
ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
g_cond_init (&bh->mqtt_gcond);
g_mutex_init (&bh->mqtt_mutex);
- bh->mqtt_is_connected = FALSE;
+ bh->topic = nns_edge_strdup (topic);
+ bh->mqtt_is_connected = false;
bh->mqtt_h = handle;
bh->server_list = g_async_queue_new ();
eh->broker_h = bh;
- bh = (nns_edge_broker_s *) eh->broker_h;
- if (!bh->mqtt_h) {
- nns_edge_loge ("Invalid state, MQTT connection was not completed.");
- ret = NNS_EDGE_ERROR_IO;
- goto error;
- }
- handle = bh->mqtt_h;
-
- nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).",
- eh->id, eh->dest_host, eh->dest_port);
-
MQTTAsync_setCallbacks (handle, edge_h,
mqtt_cb_connection_lost, mqtt_cb_message_arrived, NULL);
}
bh = (nns_edge_broker_s *) eh->broker_h;
-
- if (!bh->mqtt_h) {
- nns_edge_loge ("Invalid state, MQTT connection was not completed.");
- return NNS_EDGE_ERROR_INVALID_PARAMETER;
- }
handle = bh->mqtt_h;
- nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).",
- eh->id, eh->dest_host, eh->dest_port);
+ if (handle) {
+ nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).",
+ eh->id, eh->dest_host, eh->dest_port);
- options.onSuccess = mqtt_cb_disconnection_success;
- options.onFailure = mqtt_cb_disconnection_failure;
- options.context = edge_h;
+ options.onSuccess = mqtt_cb_disconnection_success;
+ options.onFailure = mqtt_cb_disconnection_failure;
+ options.context = edge_h;
- /** Clear retained message */
- MQTTAsync_send (handle, eh->topic, 0, NULL, 1, 1, NULL);
+ /* Clear retained message */
+ MQTTAsync_send (handle, bh->topic, 0, NULL, 1, 1, NULL);
- while (MQTTAsync_isConnected (handle)) {
- if (MQTTAsync_disconnect (handle, &options) != MQTTASYNC_SUCCESS) {
- nns_edge_loge ("Failed to disconnect MQTT.");
- return NNS_EDGE_ERROR_IO;
+ while (MQTTAsync_isConnected (handle)) {
+ if (MQTTAsync_disconnect (handle, &options) != MQTTASYNC_SUCCESS) {
+ nns_edge_loge ("Failed to disconnect MQTT.");
+ return NNS_EDGE_ERROR_IO;
+ }
+ g_usleep (10000);
}
- g_usleep (10000);
+
+ MQTTAsync_destroy (&handle);
}
+
g_cond_clear (&bh->mqtt_gcond);
g_mutex_clear (&bh->mqtt_mutex);
- MQTTAsync_destroy (&handle);
-
while ((msg = g_async_queue_try_pop (bh->server_list))) {
SAFE_FREE (msg);
}
g_async_queue_unref (bh->server_list);
bh->server_list = NULL;
+
+ SAFE_FREE (bh->topic);
SAFE_FREE (bh);
+ eh->broker_h = NULL;
return NNS_EDGE_ERROR_NONE;
}
}
/* Publish a message (default QoS 1 - at least once and retained true). */
- ret = MQTTAsync_send (handle, eh->topic, length, data, 1, 1, NULL);
+ ret = MQTTAsync_send (handle, bh->topic, length, data, 1, 1, NULL);
if (ret != MQTTASYNC_SUCCESS) {
nns_edge_loge ("Failed to publish a message (ID:%s, Topic:%s).",
eh->id, eh->topic);
}
/* Subscribe a topic (default QoS 1 - at least once). */
- ret = MQTTAsync_subscribe (handle, eh->topic, 1, NULL);
+ ret = MQTTAsync_subscribe (handle, bh->topic, 1, NULL);
if (ret != MQTTASYNC_SUCCESS) {
nns_edge_loge ("Failed to subscribe a topic (ID:%s, Topic:%s).",
eh->id, eh->topic);
*msg = g_async_queue_timeout_pop (bh->server_list, DEFAULT_SUB_TIMEOUT);
if (!*msg) {
- nns_edge_loge ("Failed to get message from mqtt broker within timeout");
+ nns_edge_loge ("Failed to get message from mqtt broker within timeout.");
return NNS_EDGE_ERROR_UNKNOWN;
}