g_object_class_install_property (gobject_class, PROP_MQTT_HOST,
g_param_spec_string ("mqtt-host", "MQTT Host",
"MQTT host address to connect.",
- DEFAULT_MQTT_HOST_ADDRESS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ DEFAULT_MQTT_HOST_ADDRESS,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MQTT_PORT,
- g_param_spec_string ("mqtt-port", "MQTT Port",
- "MQTT port to connect.",
+ g_param_spec_string ("mqtt-port", "MQTT Port", "MQTT port to connect.",
DEFAULT_MQTT_HOST_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gst_element_class_add_pad_template (gstelement_class,
self->srv_info_queue = g_async_queue_new ();
self->mqtt_host = g_strdup (DEFAULT_MQTT_HOST_ADDRESS);
self->mqtt_port = g_strdup (DEFAULT_MQTT_HOST_PORT);
+ self->mqtt_state = MQTT_INITIALIZING;
gst_tensors_config_init (&self->in_config);
gst_tensors_config_init (&self->out_config);
}
/**
+ * @brief MQTT State change callback
+ */
+static void
+_state_change_cb (void *user_data, query_mqtt_state_t state)
+{
+ GstTensorQueryClient *self = (GstTensorQueryClient *) (user_data);
+ self->mqtt_state = state;
+ nns_logd ("MQTT stated changed to %d", self->mqtt_state);
+}
+
+/**
* @brief MQTT raw message received callback function.
*/
static void
gboolean ret = TRUE;
topic = g_strdup_printf ("edge/inference/+/%s/#", self->operation);
- err = query_open_connection (&self->query_handle, self->mqtt_host, self->mqtt_port, NULL, NULL);
+ err =
+ query_open_connection (&self->query_handle, self->mqtt_host,
+ self->mqtt_port, _state_change_cb, self);
if (err != 0) {
nns_loge ("[MQTT] Failed to connect mqtt broker. err: %d\n", err);
ret = FALSE;
goto done;
}
+
/** Wait until connection is established. */
- g_usleep (2000000);
+ while (MQTT_CONNECTED != self->mqtt_state) {
+ g_usleep (10000);
+ }
+
err =
query_subscribe_topic (self->query_handle, topic, _msg_received_cb, self);
if (err != 0) {
nns_loge ("[MQTT] Failed to subscribe mqtt broker. err: %d\n", err);
ret = FALSE;
}
- g_usleep (2000000);
done:
g_free (topic);
/**
* @todo Need to update server selection policy. Now, use first received info.
*/
- if ((srv_info = g_async_queue_try_pop (self->srv_info_queue))) {
+ if ((srv_info =
+ g_async_queue_timeout_pop (self->srv_info_queue,
+ DEFAULT_TIMEOUT_MS))) {
_copy_srv_info (self, srv_info);
_free_srv_info (srv_info);
}
src->mqtt_topic = NULL;
src->mqtt_host = g_strdup (DEFAULT_MQTT_HOST_ADDRESS);
src->mqtt_port = g_strdup (DEFAULT_MQTT_HOST_PORT);
+ src->mqtt_state = MQTT_INITIALIZING;
gst_tensors_config_init (&src->src_config);
src->server_data = nnstreamer_query_server_data_new ();
}
}
/**
+ * @brief MQTT State change callback
+ */
+static void
+_state_change_cb (void *user_data, query_mqtt_state_t state)
+{
+ GstTensorQueryServerSrc *src = (GstTensorQueryServerSrc *) (user_data);
+ src->mqtt_state = state;
+ nns_logd ("MQTT stated changed to %d", src->mqtt_state);
+}
+
+/**
* @brief start processing of query_serversrc, setting up the server
*/
static gboolean
err =
query_open_connection (&src->query_handle, src->mqtt_host,
- src->mqtt_port, NULL, NULL);
+ src->mqtt_port, _state_change_cb, src);
if (err != 0) {
nns_loge ("[MQTT] Failed to connect mqtt broker. err: %d\n", err);
ret = FALSE;
goto done;
}
- g_usleep (1000000);
+
+ /** Wait until connection is established. */
+ while (MQTT_CONNECTED != src->mqtt_state) {
+ g_usleep (10000);
+ }
+
err =
query_publish_raw_data (src->query_handle, src->mqtt_topic, msg,
strlen (msg), TRUE);