[Gst/MQTTSink] Reduce unnecessary mutex lock when changing states
authorWook Song <wook16.song@samsung.com>
Fri, 28 May 2021 13:56:24 +0000 (22:56 +0900)
committerMyungJoo Ham <myungjoo.ham@samsung.com>
Mon, 7 Jun 2021 07:41:04 +0000 (16:41 +0900)
This patch reduces unnecessary mutex lock operations when changing
MQTTSink's internal states. In addition, g_cond_wait is replaced with
g_cond_wait_until to avoid possible deadlocks.

Signed-off-by: Wook Song <wook16.song@samsung.com>
gst/mqtt/mqttsink.c

index 513f9a9..4c724c2 100644 (file)
@@ -551,18 +551,24 @@ gst_mqtt_sink_stop (GstBaseSink * basesink)
   disconn_opts.onFailure = cb_mqtt_on_disconnect_failure;
   disconn_opts.context = self;
 
-  g_mutex_lock (&self->mqtt_sink_mutex);
-  self->mqtt_sink_state = SINK_RENDER_STOPPED;
+  g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_STOPPED);
   while (MQTTAsync_isConnected (self->mqtt_client_handle)) {
+    gint64 end_time = g_get_monotonic_time () +
+        DEFAULT_MQTT_DISCONNECT_TIMEOUT / 10;
+    mqtt_sink_state_t cur_state;
+
     MQTTAsync_disconnect (self->mqtt_client_handle, &disconn_opts);
-    g_cond_wait (&self->mqtt_sink_gcond, &self->mqtt_sink_mutex);
-    if ((self->mqtt_sink_state == MQTT_DISCONNECTED) ||
-        (self->mqtt_sink_state == MQTT_DISCONNECT_FAILED) ||
-        (self->mqtt_sink_state == SINK_RENDER_EOS) ||
-        (self->mqtt_sink_state == SINK_RENDER_ERROR))
+    g_mutex_lock (&self->mqtt_sink_mutex);
+    g_cond_wait_until (&self->mqtt_sink_gcond, &self->mqtt_sink_mutex,
+        end_time);
+    g_mutex_unlock (&self->mqtt_sink_mutex);
+    cur_state = g_atomic_int_get (&self->mqtt_sink_state);
+
+    if ((cur_state == MQTT_DISCONNECTED) ||
+        (cur_state == MQTT_DISCONNECT_FAILED) ||
+        (cur_state == SINK_RENDER_EOS) || (cur_state == SINK_RENDER_ERROR))
       break;
   }
-  g_mutex_unlock (&self->mqtt_sink_mutex);
   MQTTAsync_destroy (&self->mqtt_client_handle);
 
   return TRUE;
@@ -667,19 +673,25 @@ gst_mqtt_sink_render (GstBaseSink * basesink, GstBuffer * in_buf)
   static gboolean is_static_sized_buf = FALSE;
   GstMqttSink *self = GST_MQTT_SINK (basesink);
   GstFlowReturn ret = GST_FLOW_ERROR;
+  mqtt_sink_state_t cur_state;
   GstMemory *in_buf_mem;
   GstMapInfo in_buf_map;
   gint mqtt_rc;
   guint8 *msg_pub;
 
-  g_mutex_lock (&self->mqtt_sink_mutex);
-  while (self->mqtt_sink_state != MQTT_CONNECTED) {
+  while ((cur_state =
+          g_atomic_int_get (&self->mqtt_sink_state)) != MQTT_CONNECTED) {
     gint64 end_time = g_get_monotonic_time ();
+    mqtt_sink_state_t _state;
 
     end_time += (self->mqtt_pub_wait_timeout * G_TIME_SPAN_SECOND);
+    g_mutex_lock (&self->mqtt_sink_mutex);
     g_cond_wait_until (&self->mqtt_sink_gcond, &self->mqtt_sink_mutex,
         end_time);
-    switch (self->mqtt_sink_state) {
+    g_mutex_unlock (&self->mqtt_sink_mutex);
+
+    _state = g_atomic_int_get (&self->mqtt_sink_state);
+    switch (_state) {
       case MQTT_CONNECT_FAILURE:
       case MQTT_DISCONNECTED:
       case MQTT_CONNECTION_LOST:
@@ -692,9 +704,8 @@ gst_mqtt_sink_render (GstBaseSink * basesink, GstBuffer * in_buf)
       default:
         continue;
     }
-    goto ret_unlock;
+    goto ret_with;
   }
-  g_mutex_unlock (&self->mqtt_sink_mutex);
 
   if (self->num_buffers == 0) {
     ret = GST_FLOW_EOS;
@@ -775,11 +786,6 @@ ret_unref_in_buf_mem:
 
 ret_with:
   return ret;
-
-ret_unlock:
-  g_mutex_unlock (&self->mqtt_sink_mutex);
-
-  return ret;
 }
 
 /**
@@ -816,8 +822,8 @@ gst_mqtt_sink_event (GstBaseSink * basesink, GstEvent * event)
 
   switch (type) {
     case GST_EVENT_EOS:
+      g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_EOS);
       g_mutex_lock (&self->mqtt_sink_mutex);
-      self->mqtt_sink_state = SINK_RENDER_EOS;
       g_cond_broadcast (&self->mqtt_sink_gcond);
       g_mutex_unlock (&self->mqtt_sink_mutex);
       break;
@@ -1072,8 +1078,8 @@ cb_mqtt_on_connect (void *context, MQTTAsync_successData * response)
 {
   GstMqttSink *self = (GstMqttSink *) context;
 
+  g_atomic_int_set (&self->mqtt_sink_state, MQTT_CONNECTED);
   g_mutex_lock (&self->mqtt_sink_mutex);
-  self->mqtt_sink_state = MQTT_CONNECTED;
   g_cond_broadcast (&self->mqtt_sink_gcond);
   g_mutex_unlock (&self->mqtt_sink_mutex);
 }
@@ -1088,8 +1094,8 @@ cb_mqtt_on_connect_failure (void *context, MQTTAsync_failureData * response)
 {
   GstMqttSink *self = (GstMqttSink *) context;
 
+  g_atomic_int_set (&self->mqtt_sink_state, MQTT_CONNECT_FAILURE);
   g_mutex_lock (&self->mqtt_sink_mutex);
-  self->mqtt_sink_state = MQTT_CONNECT_FAILURE;
   g_cond_broadcast (&self->mqtt_sink_gcond);
   g_mutex_unlock (&self->mqtt_sink_mutex);
 }
@@ -1104,8 +1110,8 @@ cb_mqtt_on_disconnect (void *context, MQTTAsync_successData * response)
 {
   GstMqttSink *self = (GstMqttSink *) context;
 
+  g_atomic_int_set (&self->mqtt_sink_state, MQTT_DISCONNECTED);
   g_mutex_lock (&self->mqtt_sink_mutex);
-  self->mqtt_sink_state = MQTT_DISCONNECTED;
   g_cond_broadcast (&self->mqtt_sink_gcond);
   g_mutex_unlock (&self->mqtt_sink_mutex);
 }
@@ -1120,11 +1126,10 @@ cb_mqtt_on_disconnect_failure (void *context, MQTTAsync_failureData * response)
 {
   GstMqttSink *self = (GstMqttSink *) context;
 
+  g_atomic_int_set (&self->mqtt_sink_state, MQTT_DISCONNECT_FAILED);
   g_mutex_lock (&self->mqtt_sink_mutex);
-  self->mqtt_sink_state = MQTT_DISCONNECT_FAILED;
   g_cond_broadcast (&self->mqtt_sink_gcond);
   g_mutex_unlock (&self->mqtt_sink_mutex);
-
 }
 
 /**
@@ -1151,8 +1156,8 @@ cb_mqtt_on_connection_lost (void *context, char *cause)
 {
   GstMqttSink *self = (GstMqttSink *) context;
 
+  g_atomic_int_set (&self->mqtt_sink_state, MQTT_CONNECTION_LOST);
   g_mutex_lock (&self->mqtt_sink_mutex);
-  self->mqtt_sink_state = MQTT_CONNECTION_LOST;
   g_cond_broadcast (&self->mqtt_sink_gcond);
   g_mutex_unlock (&self->mqtt_sink_mutex);
 }
@@ -1177,13 +1182,15 @@ static void
 cb_mqtt_on_send_success (void *context, MQTTAsync_successData * response)
 {
   GstMqttSink *self = (GstMqttSink *) context;
+  mqtt_sink_state_t state = g_atomic_int_get (&self->mqtt_sink_state);
 
-  g_mutex_lock (&self->mqtt_sink_mutex);
-  if (self->mqtt_sink_state == SINK_RENDER_STOPPED) {
-    self->mqtt_sink_state = SINK_RENDER_EOS;
+  if (state == SINK_RENDER_STOPPED) {
+    g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_EOS);
+
+    g_mutex_lock (&self->mqtt_sink_mutex);
     g_cond_broadcast (&self->mqtt_sink_gcond);
+    g_mutex_unlock (&self->mqtt_sink_mutex);
   }
-  g_mutex_unlock (&self->mqtt_sink_mutex);
 }
 
 /**
@@ -1194,11 +1201,14 @@ static void
 cb_mqtt_on_send_failure (void *context, MQTTAsync_failureData * response)
 {
   GstMqttSink *self = (GstMqttSink *) context;
+  mqtt_sink_state_t state = g_atomic_int_get (&self->mqtt_sink_state);
 
-  g_mutex_lock (&self->mqtt_sink_mutex);
-  if (self->mqtt_sink_state == SINK_RENDER_STOPPED) {
-    self->mqtt_sink_state = SINK_RENDER_ERROR;
+  if (state == SINK_RENDER_STOPPED) {
+    g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_ERROR);
+
+    g_mutex_lock (&self->mqtt_sink_mutex);
     g_cond_broadcast (&self->mqtt_sink_gcond);
+    g_mutex_unlock (&self->mqtt_sink_mutex);
   }
-  g_mutex_unlock (&self->mqtt_sink_mutex);
+
 }