#include "nnstreamer-edge-common.h"
#include "nnstreamer-edge-internal.h"
+#include <arpa/inet.h>
+#include <ifaddrs.h>
#define N_BACKLOG 10
#define DEFAULT_TIMEOUT_SEC 10
/* If event callback is null, return ok. */
if (!eh->event_cb) {
- nns_edge_logw ("The event callback is null, do nothing!");
+ nns_edge_logi ("The event callback is null, do nothing!");
return NNS_EDGE_ERROR_NONE;
}
/* Send ip and port to destination. */
_nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_HOST_INFO, client_id);
- _get_host_str (eh->ip, eh->port, &host);
+ _get_host_str (eh->listener_ip, eh->listener_port, &host);
cmd.info.num = 1;
cmd.info.mem_size[0] = strlen (host) + 1;
cmd.mem[0] = host;
break;
}
- /** Receive data from the client */
+ /* Receive data from the client */
_nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id);
ret = _nns_edge_cmd_receive (conn, &cmd);
if (ret != NNS_EDGE_ERROR_NONE) {
return NNS_EDGE_ERROR_OUT_OF_MEMORY;
}
- /** Create message receving thread */
+ /* Create message receving thread */
pthread_attr_init (&attr);
pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_JOINABLE);
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- /**
+ /**
* @todo manage edge handles
* 1. consider adding hash table or list to manage edge handles.
* 2. compare topic and return error if existing topic in handle is different.
eh->ip = nns_edge_strdup ("localhost");
eh->port = 0;
eh->caps_str = NULL;
+ eh->broker_h = NULL;
/* Connection data for each client ID. */
eh->conn_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
}
/**
+ * @brief Get IP address
+ */
+static gchar *
+_get_ip_address (void)
+{
+ struct ifaddrs *addrs;
+ gchar *ret = NULL;
+
+ getifaddrs (&addrs);
+ while (addrs) {
+ if (addrs->ifa_addr && addrs->ifa_addr->sa_family == AF_INET) {
+ struct sockaddr_in *pAddr = (struct sockaddr_in *) addrs->ifa_addr;
+
+ if (NULL != strstr (addrs->ifa_name, "en") ||
+ NULL != strstr (addrs->ifa_name, "et")) {
+ g_free (ret);
+ ret = g_strdup (inet_ntoa (pAddr->sin_addr));
+ break;
+ }
+ }
+ addrs = addrs->ifa_next;
+ }
+ while ((addrs = addrs->ifa_next));
+
+ freeifaddrs (addrs);
+
+ if (NULL == ret)
+ ret = g_strdup ("localhost");
+
+ return ret;
+}
+
+/**
* @brief Start the nnstreamer edge.
*/
int
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
+ eh->listener_ip = _get_ip_address ();
+ eh->listener_port = nns_edge_get_available_port ();
+
eh->is_server = is_server;
- if (!is_server && 0 == eh->port) {
- eh->port = nns_edge_get_available_port ();
- if (eh->port <= 0) {
- nns_edge_loge ("Failed to start edge. Cannot get available port.");
- nns_edge_unlock (eh);
- return NNS_EDGE_ERROR_CONNECTION_FAILURE;
+ if (eh->is_server) {
+ if (NNS_EDGE_PROTOCOL_MQTT == eh->protocol) {
+ gchar *device, *topic, *msg;
+
+ if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_connect (eh)) {
+ nns_edge_loge
+ ("Failed to start nnstreamer-edge. Connection failure to broker.");
+ ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
+ goto error;
+ }
+
+ /* @todo Set unique device name.
+ * Device name should be unique. Consider using MAC address later.
+ * Now, use ID received from the user.
+ */
+ device = g_strdup_printf ("device-%s", eh->id);
+ topic = g_strdup_printf ("edge/inference/%s/%s/", device, eh->topic);
+
+ g_free (device);
+ g_free (eh->topic);
+ eh->topic = topic;
+ msg = g_strdup_printf ("%s:%d", eh->listener_ip, eh->listener_port);
+
+ if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_publish (eh, msg,
+ strlen (msg) + 1)) {
+ nns_edge_loge ("Failed to publish the meesage: %s", msg);
+ ret = NNS_EDGE_ERROR_IO;
+ g_free (msg);
+ goto error;
+ }
+ g_free (msg);
+ } else { /** case for NNS_EDGE_PROTOCOL_TCP == eh->protocol */
+ g_free (eh->listener_ip);
+ eh->listener_ip = g_strdup (eh->ip);
+ eh->listener_port = eh->port;
}
}
- /** Initialize server src data. */
+ /* Initialize server src data. */
eh->listener = g_socket_listener_new ();
g_socket_listener_set_backlog (eh->listener, N_BACKLOG);
- if (!_nns_edge_get_saddr (eh->ip, eh->port, &saddr)) {
+ if (!_nns_edge_get_saddr (eh->listener_ip, eh->listener_port, &saddr)) {
nns_edge_loge ("Failed to get socket address");
ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
goto error;
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
+ if (nns_edge_mqtt_is_connected (eh)) {
+ if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_close (eh)) {
+ nns_edge_logw ("Failed to close mqtt connection.");
+ }
+ }
+
eh->magic = NNS_EDGE_MAGIC_DEAD;
eh->event_cb = NULL;
eh->user_data = NULL;
SAFE_FREE (eh->topic);
SAFE_FREE (eh->ip);
SAFE_FREE (eh->caps_str);
+ SAFE_FREE (eh->listener_ip);
nns_edge_unlock (eh);
nns_edge_lock_destroy (eh);
{
nns_edge_handle_s *eh;
int ret;
+ char *server_ip = NULL;
+ int server_port;
eh = (nns_edge_handle_s *) edge_h;
if (!eh) {
}
eh->protocol = protocol;
+ if (0 != g_ascii_strcasecmp (eh->topic, "TCP_DIRECT")) {
+ gchar *topic, *msg = NULL;
+
+ g_free (eh->ip);
+ eh->ip = g_strdup (ip);
+ eh->port = port;
+ if (!nns_edge_mqtt_is_connected (eh)) {
+ if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_connect (eh)) {
+ nns_edge_loge ("Connection failure to broker.");
+ nns_edge_unlock (eh);
+ return NNS_EDGE_ERROR_CONNECTION_FAILURE;
+ }
+ topic = g_strdup_printf ("edge/inference/+/%s/#", eh->topic);
+ g_free (eh->topic);
+ eh->topic = topic;
+
+ if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_subscribe (eh)) {
+ nns_edge_loge ("Failed to subscribe to topic: %s.", eh->topic);
+ nns_edge_unlock (eh);
+ return NNS_EDGE_ERROR_CONNECTION_FAILURE;
+ }
+ }
+
+ if (NNS_EDGE_ERROR_NONE == nns_edge_mqtt_get_message (eh, &msg)) {
+ gchar **splits;
+ splits = g_strsplit (msg, ":", -1);
+ server_ip = g_strdup (splits[0]);
+ server_port = g_ascii_strtoull (splits[1], NULL, 10);
+ nns_edge_logd ("[DEBUG] Parsed server info: Server [%s:%d] ", server_ip,
+ server_port);
+
+ g_strfreev (splits);
+ g_free (msg);
+ } else {
+ nns_edge_loge ("Failed to get server IP addr within timeout.");
+ nns_edge_unlock (eh);
+ return NNS_EDGE_ERROR_CONNECTION_FAILURE;
+ }
+ } else { /** case for NNS_EDGE_PROTOCOL_TCP == eh->protocol */
+ server_ip = g_strdup (ip);
+ server_port = port;
+ }
/** Connect to info channel. */
- ret = _nns_edge_connect_to (eh, eh->client_id, ip, port);
+ ret = _nns_edge_connect_to (eh, eh->client_id, server_ip, server_port);
if (ret != NNS_EDGE_ERROR_NONE) {
- nns_edge_loge ("Failed to connect to %s:%d", ip, port);
+ nns_edge_loge ("Failed to connect to %s:%d", server_ip, server_port);
}
nns_edge_unlock (eh);
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- /** @todo update code (publish data) */
+ /* @todo update code (publish data) */
nns_edge_unlock (eh);
return NNS_EDGE_ERROR_NONE;
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- /** @todo update code (subscribe) */
+ /** @todo update code (subscribe) */
nns_edge_unlock (eh);
return NNS_EDGE_ERROR_NONE;
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- /** @todo update code (unsubscribe) */
+ /** @todo update code (unsubscribe) */
nns_edge_unlock (eh);
return NNS_EDGE_ERROR_NONE;
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- /**
+ /*
* @todo User handles (replace or append) the capability of edge handle.
* @todo Change key-value set as json or hash table.
*/
} else if (0 == strcasecmp (key, "TOPIC")) {
SAFE_FREE (eh->topic);
eh->topic = nns_edge_strdup (value);
+ } else if (0 == strcasecmp (key, "PROTOCOL")) {
+ eh->protocol = g_ascii_strtoll (value, NULL, 10);
} else {
nns_edge_logw ("Failed to set edge info. Unknown key: %s", key);
}
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- /**
+ /*
* @todo User handles (replace or append) the capability of edge handle.
* @todo Change key-value set as json or hash table.
*/
#if !defined(ENABLE_MQTT)
#error "This file can be built with Paho MQTT library."
#endif
+#define DEFAULT_SUB_TIMEOUT 1000000 /** 1 second */
#include <unistd.h>
#include <MQTTAsync.h>
#include "nnstreamer-edge-internal.h"
/**
+ * @brief Data structure for mqtt broker handle.
+ */
+typedef struct
+{
+ void *mqtt_h;
+ GAsyncQueue *server_list;
+ GMutex mqtt_mutex;
+ GCond mqtt_gcond;
+ gboolean mqtt_is_connected;
+} nns_edge_broker_s;
+
+/**
* @brief Callback function to be called when the connection is lost.
*/
static void
mqtt_cb_connection_lost (void *context, char *cause)
{
nns_edge_handle_s *eh;
+ nns_edge_broker_s *bh;
eh = (nns_edge_handle_s *) context;
- if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+ if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
nns_edge_loge ("Invalid param, given edge handle is invalid.");
return;
}
+ bh = (nns_edge_broker_s *) eh->broker_h;
nns_edge_logw ("MQTT connection is lost (ID:%s, Cause:%s).", eh->id, cause);
+ g_mutex_lock (&bh->mqtt_mutex);
+ bh->mqtt_is_connected = FALSE;
+ g_cond_broadcast (&bh->mqtt_gcond);
+ g_mutex_unlock (&bh->mqtt_mutex);
+
if (eh->event_cb) {
/** @todo send new event (MQTT disconnected) */
}
mqtt_cb_connection_success (void *context, MQTTAsync_successData * response)
{
nns_edge_handle_s *eh;
+ nns_edge_broker_s *bh;
UNUSED (response);
eh = (nns_edge_handle_s *) context;
- if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+ if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
nns_edge_loge ("Invalid param, given edge handle is invalid.");
return;
}
- nns_edge_logi ("MQTT connection is completed (ID:%s).", eh->id);
+ bh = (nns_edge_broker_s *) eh->broker_h;
+
+ g_mutex_lock (&bh->mqtt_mutex);
+ bh->mqtt_is_connected = TRUE;
+ g_cond_broadcast (&bh->mqtt_gcond);
+ g_mutex_unlock (&bh->mqtt_mutex);
+
if (eh->event_cb) {
/** @todo send new event (MQTT connected) */
}
mqtt_cb_connection_failure (void *context, MQTTAsync_failureData * response)
{
nns_edge_handle_s *eh;
+ nns_edge_broker_s *bh;
UNUSED (response);
eh = (nns_edge_handle_s *) context;
- if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+ if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
nns_edge_loge ("Invalid param, given edge handle is invalid.");
return;
}
+ bh = (nns_edge_broker_s *) eh->broker_h;
+
nns_edge_logw ("MQTT connection is failed (ID:%s).", eh->id);
+ g_mutex_lock (&bh->mqtt_mutex);
+ bh->mqtt_is_connected = FALSE;
+ g_cond_broadcast (&bh->mqtt_gcond);
+ g_mutex_unlock (&bh->mqtt_mutex);
+
if (eh->event_cb) {
/** @todo send new event (MQTT connection failure) */
}
mqtt_cb_disconnection_success (void *context, MQTTAsync_successData * response)
{
nns_edge_handle_s *eh;
+ nns_edge_broker_s *bh;
UNUSED (response);
eh = (nns_edge_handle_s *) context;
- if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+ if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
nns_edge_loge ("Invalid param, given edge handle is invalid.");
return;
}
+ bh = (nns_edge_broker_s *) eh->broker_h;
+
nns_edge_logi ("MQTT disconnection is completed (ID:%s).", eh->id);
+ g_mutex_lock (&bh->mqtt_mutex);
+ bh->mqtt_is_connected = FALSE;
+ g_cond_broadcast (&bh->mqtt_gcond);
+ g_mutex_unlock (&bh->mqtt_mutex);
+
if (eh->event_cb) {
/** @todo send new event (MQTT disconnected) */
}
MQTTAsync_message * message)
{
nns_edge_handle_s *eh;
+ nns_edge_broker_s *bh;
+ char *msg = NULL;
UNUSED (topic);
UNUSED (topic_len);
UNUSED (message);
eh = (nns_edge_handle_s *) context;
- if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+ if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
nns_edge_loge ("Invalid param, given edge handle is invalid.");
return TRUE;
}
+ if (0 >= message->payloadlen) {
+ nns_edge_logw ("Invalid payload lenth: %d", message->payloadlen);
+ return TRUE;
+ }
+
+ bh = (nns_edge_broker_s *) eh->broker_h;
+
nns_edge_logd ("MQTT message is arrived (ID:%s, Topic:%s).",
eh->id, eh->topic);
+
+ msg = (char *) malloc (message->payloadlen);
+ memcpy (msg, message->payload, message->payloadlen);
+ g_async_queue_push (bh->server_list, msg);
+
if (eh->event_cb) {
/** @todo send new event (message arrived) */
}
nns_edge_mqtt_connect (nns_edge_h edge_h)
{
nns_edge_handle_s *eh;
- MQTTAsync handle;
+ nns_edge_broker_s *bh;
MQTTAsync_connectOptions options = MQTTAsync_connectOptions_initializer;
+ int ret = NNS_EDGE_ERROR_NONE;
+ int64_t end_time;
+ MQTTAsync handle;
char *url;
char *client_id;
- int ret;
eh = (nns_edge_handle_s *) edge_h;
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).",
- eh->id, eh->ip, eh->port);
+ bh = (nns_edge_broker_s *) malloc (sizeof (nns_edge_broker_s));
+ if (!bh) {
+ nns_edge_loge ("Failed to allocate memory for broker handle.");
+ return NNS_EDGE_ERROR_OUT_OF_MEMORY;
+ }
+
+ memset (bh, 0, sizeof (nns_edge_broker_s));
url = g_strdup_printf ("%s:%d", eh->ip, eh->port);
client_id = g_strdup_printf ("nns_edge_%s_%u", eh->id, getpid ());
goto error;
}
+ g_cond_init (&bh->mqtt_gcond);
+ g_mutex_init (&bh->mqtt_mutex);
+ bh->mqtt_is_connected = FALSE;
+ bh->mqtt_h = handle;
+ bh->server_list = g_async_queue_new ();
+ eh->broker_h = bh;
+
+ bh = (nns_edge_broker_s *) eh->broker_h;
+ if (!bh->mqtt_h) {
+ nns_edge_loge ("Invalid state, MQTT connection was not completed.");
+ ret = NNS_EDGE_ERROR_IO;
+ goto error;
+ }
+ handle = bh->mqtt_h;
+
+ nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).",
+ eh->id, eh->ip, eh->port);
+
MQTTAsync_setCallbacks (handle, edge_h,
mqtt_cb_connection_lost, mqtt_cb_message_arrived, NULL);
if (MQTTAsync_connect (handle, &options) != MQTTASYNC_SUCCESS) {
nns_edge_loge ("Failed to connect MQTT.");
- MQTTAsync_destroy (&handle);
ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
goto error;
}
- eh->mqtt_handle = handle;
- ret = NNS_EDGE_ERROR_NONE;
+ /* Waiting for the connection */
+ end_time = g_get_monotonic_time () + 5 * G_TIME_SPAN_SECOND;
+ g_mutex_lock (&bh->mqtt_mutex);
+ while (!bh->mqtt_is_connected) {
+ if (!g_cond_wait_until (&bh->mqtt_gcond, &bh->mqtt_mutex, end_time)) {
+ g_mutex_unlock (&bh->mqtt_mutex);
+ nns_edge_loge ("Failed to connect to MQTT broker."
+ "Please check broker is running status or broker host address.");
+ goto error;
+ }
+ }
+ g_mutex_unlock (&bh->mqtt_mutex);
+ return NNS_EDGE_ERROR_NONE;
error:
- g_free (url);
- g_free (client_id);
+ nns_edge_mqtt_close (eh);
return ret;
}
nns_edge_mqtt_close (nns_edge_h edge_h)
{
nns_edge_handle_s *eh;
+ nns_edge_broker_s *bh;
MQTTAsync handle;
MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer;
+ char *msg;
eh = (nns_edge_handle_s *) edge_h;
- if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+ if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
nns_edge_loge ("Invalid param, given edge handle is invalid.");
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- handle = eh->mqtt_handle;
+ bh = (nns_edge_broker_s *) eh->broker_h;
- if (!handle) {
+ if (!bh->mqtt_h) {
nns_edge_loge ("Invalid state, MQTT connection was not completed.");
- return NNS_EDGE_ERROR_IO;
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
+ handle = bh->mqtt_h;
nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).",
eh->id, eh->ip, eh->port);
options.onFailure = mqtt_cb_disconnection_failure;
options.context = edge_h;
+ /** Clear retained message */
+ MQTTAsync_send (handle, eh->topic, 0, NULL, 1, 1, NULL);
+
while (MQTTAsync_isConnected (handle)) {
if (MQTTAsync_disconnect (handle, &options) != MQTTASYNC_SUCCESS) {
nns_edge_loge ("Failed to disconnect MQTT.");
return NNS_EDGE_ERROR_IO;
}
+ g_usleep (10000);
}
+ g_cond_clear (&bh->mqtt_gcond);
+ g_mutex_clear (&bh->mqtt_mutex);
MQTTAsync_destroy (&handle);
- eh->mqtt_handle = NULL;
+
+ while ((msg = g_async_queue_try_pop (bh->server_list))) {
+ SAFE_FREE (msg);
+ }
+ g_async_queue_unref (bh->server_list);
+ bh->server_list = NULL;
+ SAFE_FREE (bh);
return NNS_EDGE_ERROR_NONE;
}
nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
{
nns_edge_handle_s *eh;
+ nns_edge_broker_s *bh;
MQTTAsync handle;
int ret;
eh = (nns_edge_handle_s *) edge_h;
- if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+ if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
nns_edge_loge ("Invalid param, given edge handle is invalid.");
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- handle = eh->mqtt_handle;
-
- if (!handle || MQTTAsync_isConnected (handle)) {
+ bh = (nns_edge_broker_s *) eh->broker_h;
+ if (!bh->mqtt_h) {
nns_edge_loge ("Invalid state, MQTT connection was not completed.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+ handle = bh->mqtt_h;
+
+ if (!MQTTAsync_isConnected (handle)) {
+ nns_edge_loge ("Failed to publish message, MQTT is not connected.");
return NNS_EDGE_ERROR_IO;
}
nns_edge_mqtt_subscribe (nns_edge_h edge_h)
{
nns_edge_handle_s *eh;
+ nns_edge_broker_s *bh;
MQTTAsync handle;
int ret;
eh = (nns_edge_handle_s *) edge_h;
- if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+ if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
nns_edge_loge ("Invalid param, given edge handle is invalid.");
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- handle = eh->mqtt_handle;
+ bh = (nns_edge_broker_s *) eh->broker_h;
+ if (!bh->mqtt_h) {
+ nns_edge_loge ("Invalid state, MQTT connection was not completed.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+ handle = bh->mqtt_h;
- if (!handle || MQTTAsync_isConnected (handle)) {
+ if (!MQTTAsync_isConnected (handle)) {
nns_edge_loge ("Invalid state, MQTT connection was not completed.");
return NNS_EDGE_ERROR_IO;
}
return NNS_EDGE_ERROR_NONE;
}
+
+/**
+ * @brief Check mqtt connection
+ */
+bool
+nns_edge_mqtt_is_connected (nns_edge_h edge_h)
+{
+ nns_edge_handle_s *eh;
+ nns_edge_broker_s *bh;
+ MQTTAsync handle;
+ eh = (nns_edge_handle_s *) edge_h;
+
+ if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+ nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ return false;
+ }
+
+ bh = (nns_edge_broker_s *) eh->broker_h;
+ if (!bh->mqtt_h) {
+ nns_edge_loge ("Invalid state, MQTT connection was not completed.");
+ return false;
+ }
+ handle = bh->mqtt_h;
+
+ if (MQTTAsync_isConnected (handle)) {
+ return true;
+ }
+
+ return false;
+}
+
+/**
+ * @brief Get message from mqtt broker.
+ */
+int
+nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg)
+{
+ nns_edge_handle_s *eh;
+ nns_edge_broker_s *bh;
+
+ eh = (nns_edge_handle_s *) edge_h;
+
+ if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+ nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ bh = (nns_edge_broker_s *) eh->broker_h;
+
+ *msg = g_async_queue_timeout_pop (bh->server_list, DEFAULT_SUB_TIMEOUT);
+ if (!*msg) {
+ nns_edge_loge ("Failed to get message from mqtt broker within timeout");
+ return NNS_EDGE_ERROR_UNKNOWN;
+ }
+
+ return NNS_EDGE_ERROR_NONE;
+}