[Query] Add mqtt state change callback.
authorgichan <gichan2.jang@samsung.com>
Thu, 30 Sep 2021 09:45:46 +0000 (18:45 +0900)
committerMyungJoo Ham <myungjoo.ham@samsung.com>
Sat, 9 Oct 2021 05:20:32 +0000 (14:20 +0900)
Add mqtt state change callback instead of uncertain sleep.

Signed-off-by: gichan <gichan2.jang@samsung.com>
Signed-off-by: Gichan Jang <gichan2.jang@samsung.com>
gst/nnstreamer/tensor_query/tensor_query_client.c
gst/nnstreamer/tensor_query/tensor_query_client.h
gst/nnstreamer/tensor_query/tensor_query_serversrc.c

index 482dd43..6181ae0 100644 (file)
@@ -155,10 +155,10 @@ gst_tensor_query_client_class_init (GstTensorQueryClientClass * klass)
   g_object_class_install_property (gobject_class, PROP_MQTT_HOST,
       g_param_spec_string ("mqtt-host", "MQTT Host",
           "MQTT host address to connect.",
-          DEFAULT_MQTT_HOST_ADDRESS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          DEFAULT_MQTT_HOST_ADDRESS,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_MQTT_PORT,
-      g_param_spec_string ("mqtt-port", "MQTT Port",
-          "MQTT port to connect.",
+      g_param_spec_string ("mqtt-port", "MQTT Port", "MQTT port to connect.",
           DEFAULT_MQTT_HOST_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   gst_element_class_add_pad_template (gstelement_class,
@@ -213,6 +213,7 @@ gst_tensor_query_client_init (GstTensorQueryClient * self)
   self->srv_info_queue = g_async_queue_new ();
   self->mqtt_host = g_strdup (DEFAULT_MQTT_HOST_ADDRESS);
   self->mqtt_port = g_strdup (DEFAULT_MQTT_HOST_PORT);
+  self->mqtt_state = MQTT_INITIALIZING;
 
   gst_tensors_config_init (&self->in_config);
   gst_tensors_config_init (&self->out_config);
@@ -441,6 +442,17 @@ _parse_mqtt_message (GstTensorQueryClient * self, gchar * payload)
 }
 
 /**
+ * @brief MQTT State change callback
+ */
+static void
+_state_change_cb (void *user_data, query_mqtt_state_t state)
+{
+  GstTensorQueryClient *self = (GstTensorQueryClient *) (user_data);
+  self->mqtt_state = state;
+  nns_logd ("MQTT stated changed to %d", self->mqtt_state);
+}
+
+/**
  * @brief MQTT raw message received callback function.
  */
 static void
@@ -478,21 +490,26 @@ _mqtt_subcribe_topic (GstTensorQueryClient * self)
   gboolean ret = TRUE;
   topic = g_strdup_printf ("edge/inference/+/%s/#", self->operation);
 
-  err = query_open_connection (&self->query_handle, self->mqtt_host, self->mqtt_port, NULL, NULL);
+  err =
+      query_open_connection (&self->query_handle, self->mqtt_host,
+      self->mqtt_port, _state_change_cb, self);
   if (err != 0) {
     nns_loge ("[MQTT] Failed to connect mqtt broker. err: %d\n", err);
     ret = FALSE;
     goto done;
   }
+
   /** Wait until connection is established. */
-  g_usleep (2000000);
+  while (MQTT_CONNECTED != self->mqtt_state) {
+    g_usleep (10000);
+  }
+
   err =
       query_subscribe_topic (self->query_handle, topic, _msg_received_cb, self);
   if (err != 0) {
     nns_loge ("[MQTT] Failed to subscribe mqtt broker. err: %d\n", err);
     ret = FALSE;
   }
-  g_usleep (2000000);
 
 done:
   g_free (topic);
@@ -649,7 +666,9 @@ gst_tensor_query_client_sink_event (GstPad * pad,
           /**
            * @todo Need to update server selection policy. Now, use first received info.
           */
-          if ((srv_info = g_async_queue_try_pop (self->srv_info_queue))) {
+          if ((srv_info =
+                  g_async_queue_timeout_pop (self->srv_info_queue,
+                      DEFAULT_TIMEOUT_MS))) {
             _copy_srv_info (self, srv_info);
             _free_srv_info (srv_info);
           }
index 2e658f4..31bb695 100644 (file)
@@ -55,6 +55,8 @@ struct _GstTensorQueryClient
   GAsyncQueue *srv_info_queue;
   gchar *mqtt_host;
   gchar *mqtt_port;
+  query_mqtt_state_t mqtt_state;
+
   /* src information (Connect to query server source) */
   query_connection_handle src_conn;
   gchar *src_host;
index a97fede..17cda82 100644 (file)
@@ -151,6 +151,7 @@ gst_tensor_query_serversrc_init (GstTensorQueryServerSrc * src)
   src->mqtt_topic = NULL;
   src->mqtt_host = g_strdup (DEFAULT_MQTT_HOST_ADDRESS);
   src->mqtt_port = g_strdup (DEFAULT_MQTT_HOST_PORT);
+  src->mqtt_state = MQTT_INITIALIZING;
   gst_tensors_config_init (&src->src_config);
   src->server_data = nnstreamer_query_server_data_new ();
 }
@@ -273,6 +274,17 @@ gst_tensor_query_serversrc_get_property (GObject * object, guint prop_id,
 }
 
 /**
+ * @brief MQTT State change callback
+ */
+static void
+_state_change_cb (void *user_data, query_mqtt_state_t state)
+{
+  GstTensorQueryServerSrc *src = (GstTensorQueryServerSrc *) (user_data);
+  src->mqtt_state = state;
+  nns_logd ("MQTT stated changed to %d", src->mqtt_state);
+}
+
+/**
  * @brief start processing of query_serversrc, setting up the server
  */
 static gboolean
@@ -333,13 +345,18 @@ gst_tensor_query_serversrc_start (GstBaseSrc * bsrc)
 
     err =
         query_open_connection (&src->query_handle, src->mqtt_host,
-        src->mqtt_port, NULL, NULL);
+        src->mqtt_port, _state_change_cb, src);
     if (err != 0) {
       nns_loge ("[MQTT] Failed to connect mqtt broker. err: %d\n", err);
       ret = FALSE;
       goto done;
     }
-    g_usleep (1000000);
+
+    /** Wait until connection is established. */
+    while (MQTT_CONNECTED != src->mqtt_state) {
+      g_usleep (10000);
+    }
+
     err =
         query_publish_raw_data (src->query_handle, src->mqtt_topic, msg,
         strlen (msg), TRUE);