From ee801ff3849f2d2d924745bc43a3f84a073038cb Mon Sep 17 00:00:00 2001 From: Wook Song Date: Fri, 28 May 2021 22:56:24 +0900 Subject: [PATCH] [Gst/MQTTSink] Reduce unnecessary mutex lock when changing states This patch reduces unnecessary mutex lock operations when changing MQTTSink's internal states. In addition, g_cond_wait is replaced with g_cond_wait_until to avoid possible deadlocks. Signed-off-by: Wook Song --- gst/mqtt/mqttsink.c | 76 ++++++++++++++++++++++++++++++----------------------- 1 file changed, 43 insertions(+), 33 deletions(-) diff --git a/gst/mqtt/mqttsink.c b/gst/mqtt/mqttsink.c index 513f9a9..4c724c2 100644 --- a/gst/mqtt/mqttsink.c +++ b/gst/mqtt/mqttsink.c @@ -551,18 +551,24 @@ gst_mqtt_sink_stop (GstBaseSink * basesink) disconn_opts.onFailure = cb_mqtt_on_disconnect_failure; disconn_opts.context = self; - g_mutex_lock (&self->mqtt_sink_mutex); - self->mqtt_sink_state = SINK_RENDER_STOPPED; + g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_STOPPED); while (MQTTAsync_isConnected (self->mqtt_client_handle)) { + gint64 end_time = g_get_monotonic_time () + + DEFAULT_MQTT_DISCONNECT_TIMEOUT / 10; + mqtt_sink_state_t cur_state; + MQTTAsync_disconnect (self->mqtt_client_handle, &disconn_opts); - g_cond_wait (&self->mqtt_sink_gcond, &self->mqtt_sink_mutex); - if ((self->mqtt_sink_state == MQTT_DISCONNECTED) || - (self->mqtt_sink_state == MQTT_DISCONNECT_FAILED) || - (self->mqtt_sink_state == SINK_RENDER_EOS) || - (self->mqtt_sink_state == SINK_RENDER_ERROR)) + g_mutex_lock (&self->mqtt_sink_mutex); + g_cond_wait_until (&self->mqtt_sink_gcond, &self->mqtt_sink_mutex, + end_time); + g_mutex_unlock (&self->mqtt_sink_mutex); + cur_state = g_atomic_int_get (&self->mqtt_sink_state); + + if ((cur_state == MQTT_DISCONNECTED) || + (cur_state == MQTT_DISCONNECT_FAILED) || + (cur_state == SINK_RENDER_EOS) || (cur_state == SINK_RENDER_ERROR)) break; } - g_mutex_unlock (&self->mqtt_sink_mutex); MQTTAsync_destroy (&self->mqtt_client_handle); return TRUE; @@ -667,19 +673,25 @@ gst_mqtt_sink_render (GstBaseSink * basesink, GstBuffer * in_buf) static gboolean is_static_sized_buf = FALSE; GstMqttSink *self = GST_MQTT_SINK (basesink); GstFlowReturn ret = GST_FLOW_ERROR; + mqtt_sink_state_t cur_state; GstMemory *in_buf_mem; GstMapInfo in_buf_map; gint mqtt_rc; guint8 *msg_pub; - g_mutex_lock (&self->mqtt_sink_mutex); - while (self->mqtt_sink_state != MQTT_CONNECTED) { + while ((cur_state = + g_atomic_int_get (&self->mqtt_sink_state)) != MQTT_CONNECTED) { gint64 end_time = g_get_monotonic_time (); + mqtt_sink_state_t _state; end_time += (self->mqtt_pub_wait_timeout * G_TIME_SPAN_SECOND); + g_mutex_lock (&self->mqtt_sink_mutex); g_cond_wait_until (&self->mqtt_sink_gcond, &self->mqtt_sink_mutex, end_time); - switch (self->mqtt_sink_state) { + g_mutex_unlock (&self->mqtt_sink_mutex); + + _state = g_atomic_int_get (&self->mqtt_sink_state); + switch (_state) { case MQTT_CONNECT_FAILURE: case MQTT_DISCONNECTED: case MQTT_CONNECTION_LOST: @@ -692,9 +704,8 @@ gst_mqtt_sink_render (GstBaseSink * basesink, GstBuffer * in_buf) default: continue; } - goto ret_unlock; + goto ret_with; } - g_mutex_unlock (&self->mqtt_sink_mutex); if (self->num_buffers == 0) { ret = GST_FLOW_EOS; @@ -775,11 +786,6 @@ ret_unref_in_buf_mem: ret_with: return ret; - -ret_unlock: - g_mutex_unlock (&self->mqtt_sink_mutex); - - return ret; } /** @@ -816,8 +822,8 @@ gst_mqtt_sink_event (GstBaseSink * basesink, GstEvent * event) switch (type) { case GST_EVENT_EOS: + g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_EOS); g_mutex_lock (&self->mqtt_sink_mutex); - self->mqtt_sink_state = SINK_RENDER_EOS; g_cond_broadcast (&self->mqtt_sink_gcond); g_mutex_unlock (&self->mqtt_sink_mutex); break; @@ -1072,8 +1078,8 @@ cb_mqtt_on_connect (void *context, MQTTAsync_successData * response) { GstMqttSink *self = (GstMqttSink *) context; + g_atomic_int_set (&self->mqtt_sink_state, MQTT_CONNECTED); g_mutex_lock (&self->mqtt_sink_mutex); - self->mqtt_sink_state = MQTT_CONNECTED; g_cond_broadcast (&self->mqtt_sink_gcond); g_mutex_unlock (&self->mqtt_sink_mutex); } @@ -1088,8 +1094,8 @@ cb_mqtt_on_connect_failure (void *context, MQTTAsync_failureData * response) { GstMqttSink *self = (GstMqttSink *) context; + g_atomic_int_set (&self->mqtt_sink_state, MQTT_CONNECT_FAILURE); g_mutex_lock (&self->mqtt_sink_mutex); - self->mqtt_sink_state = MQTT_CONNECT_FAILURE; g_cond_broadcast (&self->mqtt_sink_gcond); g_mutex_unlock (&self->mqtt_sink_mutex); } @@ -1104,8 +1110,8 @@ cb_mqtt_on_disconnect (void *context, MQTTAsync_successData * response) { GstMqttSink *self = (GstMqttSink *) context; + g_atomic_int_set (&self->mqtt_sink_state, MQTT_DISCONNECTED); g_mutex_lock (&self->mqtt_sink_mutex); - self->mqtt_sink_state = MQTT_DISCONNECTED; g_cond_broadcast (&self->mqtt_sink_gcond); g_mutex_unlock (&self->mqtt_sink_mutex); } @@ -1120,11 +1126,10 @@ cb_mqtt_on_disconnect_failure (void *context, MQTTAsync_failureData * response) { GstMqttSink *self = (GstMqttSink *) context; + g_atomic_int_set (&self->mqtt_sink_state, MQTT_DISCONNECT_FAILED); g_mutex_lock (&self->mqtt_sink_mutex); - self->mqtt_sink_state = MQTT_DISCONNECT_FAILED; g_cond_broadcast (&self->mqtt_sink_gcond); g_mutex_unlock (&self->mqtt_sink_mutex); - } /** @@ -1151,8 +1156,8 @@ cb_mqtt_on_connection_lost (void *context, char *cause) { GstMqttSink *self = (GstMqttSink *) context; + g_atomic_int_set (&self->mqtt_sink_state, MQTT_CONNECTION_LOST); g_mutex_lock (&self->mqtt_sink_mutex); - self->mqtt_sink_state = MQTT_CONNECTION_LOST; g_cond_broadcast (&self->mqtt_sink_gcond); g_mutex_unlock (&self->mqtt_sink_mutex); } @@ -1177,13 +1182,15 @@ static void cb_mqtt_on_send_success (void *context, MQTTAsync_successData * response) { GstMqttSink *self = (GstMqttSink *) context; + mqtt_sink_state_t state = g_atomic_int_get (&self->mqtt_sink_state); - g_mutex_lock (&self->mqtt_sink_mutex); - if (self->mqtt_sink_state == SINK_RENDER_STOPPED) { - self->mqtt_sink_state = SINK_RENDER_EOS; + if (state == SINK_RENDER_STOPPED) { + g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_EOS); + + g_mutex_lock (&self->mqtt_sink_mutex); g_cond_broadcast (&self->mqtt_sink_gcond); + g_mutex_unlock (&self->mqtt_sink_mutex); } - g_mutex_unlock (&self->mqtt_sink_mutex); } /** @@ -1194,11 +1201,14 @@ static void cb_mqtt_on_send_failure (void *context, MQTTAsync_failureData * response) { GstMqttSink *self = (GstMqttSink *) context; + mqtt_sink_state_t state = g_atomic_int_get (&self->mqtt_sink_state); - g_mutex_lock (&self->mqtt_sink_mutex); - if (self->mqtt_sink_state == SINK_RENDER_STOPPED) { - self->mqtt_sink_state = SINK_RENDER_ERROR; + if (state == SINK_RENDER_STOPPED) { + g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_ERROR); + + g_mutex_lock (&self->mqtt_sink_mutex); g_cond_broadcast (&self->mqtt_sink_gcond); + g_mutex_unlock (&self->mqtt_sink_mutex); } - g_mutex_unlock (&self->mqtt_sink_mutex); + } -- 2.7.4