multiqueue: do not post messages holding the lock
authorThiago Santos <thiagoss@osg.samsung.com>
Thu, 11 Sep 2014 21:01:58 +0000 (18:01 -0300)
committerThiago Santos <thiagoss@osg.samsung.com>
Tue, 16 Sep 2014 19:56:53 +0000 (16:56 -0300)
It might cause deadlocks to post messages while holding the multiqueue
lock. To avoid this a new boolean flag is set whenever a new buffering percent
is found. The message is posted after the lock can be released.

To make sure the buffering messages are posted in the right order, messages
are posted holding another lock. This prevents 2 threads trying to post
messages at the same time.

https://bugzilla.gnome.org/show_bug.cgi?id=736295

plugins/elements/gstmultiqueue.c
plugins/elements/gstmultiqueue.h

index d01ee75..9cd30f2 100644 (file)
@@ -200,6 +200,7 @@ static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
 static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
 
 static void update_buffering (GstMultiQueue * mq, GstSingleQueue * sq);
+static void gst_multi_queue_post_buffering (GstMultiQueue * mq);
 
 static void gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full);
 
@@ -271,6 +272,14 @@ enum
   g_mutex_unlock (&q->qlock);                                            \
 } G_STMT_END
 
+#define SET_PERCENT(mq, perc) G_STMT_START {                             \
+  if (perc != mq->percent) {                                             \
+    mq->percent = perc;                                                  \
+    mq->percent_changed = TRUE;                                          \
+    GST_DEBUG_OBJECT (mq, "buffering %d percent", perc);                 \
+  }                                                                      \
+} G_STMT_END
+
 static void gst_multi_queue_finalize (GObject * object);
 static void gst_multi_queue_set_property (GObject * object,
     guint prop_id, const GValue * value, GParamSpec * pspec);
@@ -461,6 +470,7 @@ gst_multi_queue_init (GstMultiQueue * mqueue)
   mqueue->high_time = GST_CLOCK_TIME_NONE;
 
   g_mutex_init (&mqueue->qlock);
+  g_mutex_init (&mqueue->buffering_post_lock);
 }
 
 static void
@@ -475,6 +485,7 @@ gst_multi_queue_finalize (GObject * object)
 
   /* free/unref instance data */
   g_mutex_clear (&mqueue->qlock);
+  g_mutex_clear (&mqueue->buffering_post_lock);
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
@@ -502,6 +513,7 @@ gst_multi_queue_set_property (GObject * object, guint prop_id,
       mq->max_size.bytes = g_value_get_uint (value);
       SET_CHILD_PROPERTY (mq, bytes);
       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+      gst_multi_queue_post_buffering (mq);
       break;
     case PROP_MAX_SIZE_BUFFERS:
     {
@@ -537,6 +549,7 @@ gst_multi_queue_set_property (GObject * object, guint prop_id,
       }
 
       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+      gst_multi_queue_post_buffering (mq);
 
       break;
     }
@@ -545,6 +558,7 @@ gst_multi_queue_set_property (GObject * object, guint prop_id,
       mq->max_size.time = g_value_get_uint64 (value);
       SET_CHILD_PROPERTY (mq, time);
       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+      gst_multi_queue_post_buffering (mq);
       break;
     case PROP_EXTRA_SIZE_BYTES:
       mq->extra_size.bytes = g_value_get_uint (value);
@@ -558,13 +572,11 @@ gst_multi_queue_set_property (GObject * object, guint prop_id,
     case PROP_USE_BUFFERING:
       mq->use_buffering = g_value_get_boolean (value);
       if (!mq->use_buffering && mq->buffering) {
-        GstMessage *message;
-
+        GST_MULTI_QUEUE_MUTEX_LOCK (mq);
         mq->buffering = FALSE;
         GST_DEBUG_OBJECT (mq, "buffering 100 percent");
-        message = gst_message_new_buffering (GST_OBJECT_CAST (mq), 100);
-
-        gst_element_post_message (GST_ELEMENT_CAST (mq), message);
+        SET_PERCENT (mq, 100);
+        GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
       }
 
       if (mq->use_buffering) {
@@ -583,6 +595,7 @@ gst_multi_queue_set_property (GObject * object, guint prop_id,
 
         GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
       }
+      gst_multi_queue_post_buffering (mq);
       break;
     case PROP_LOW_PERCENT:
       mq->low_percent = g_value_get_int (value);
@@ -775,6 +788,7 @@ gst_multi_queue_change_state (GstElement * element, GstStateChange transition)
       SET_CHILD_PROPERTY (mqueue, visible);
 
       GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
+      gst_multi_queue_post_buffering (mqueue);
 
       break;
     }
@@ -906,7 +920,6 @@ static void
 update_buffering (GstMultiQueue * mq, GstSingleQueue * sq)
 {
   gint percent;
-  gboolean post = FALSE;
 
   /* nothing to dowhen we are not in buffering mode */
   if (!mq->use_buffering)
@@ -915,19 +928,13 @@ update_buffering (GstMultiQueue * mq, GstSingleQueue * sq)
   percent = get_percentage (sq);
 
   if (mq->buffering) {
-    post = TRUE;
     if (percent >= mq->high_percent) {
       mq->buffering = FALSE;
     }
     /* make sure it increases */
     percent = MAX (mq->percent, percent);
 
-    if (percent == mq->percent)
-      /* don't post if nothing changed */
-      post = FALSE;
-    else
-      /* else keep last value we posted */
-      mq->percent = percent;
+    SET_PERCENT (mq, percent);
   } else {
     GList *iter;
     gboolean is_buffering = TRUE;
@@ -944,26 +951,37 @@ update_buffering (GstMultiQueue * mq, GstSingleQueue * sq)
 
     if (is_buffering && percent < mq->low_percent) {
       mq->buffering = TRUE;
-      mq->percent = percent;
-      post = TRUE;
+      SET_PERCENT (mq, percent);
     }
   }
-  if (post) {
-    GstMessage *message;
+}
+
+static void
+gst_multi_queue_post_buffering (GstMultiQueue * mq)
+{
+  GstMessage *msg = NULL;
+
+  g_mutex_lock (&mq->buffering_post_lock);
+  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+  if (mq->percent_changed) {
+    gint percent = mq->percent;
+
+    mq->percent_changed = FALSE;
 
-    /* scale to high percent so that it becomes the 100% mark */
     percent = percent * 100 / mq->high_percent;
     /* clip */
     if (percent > 100)
       percent = 100;
 
-    GST_DEBUG_OBJECT (mq, "buffering %d percent", percent);
-    message = gst_message_new_buffering (GST_OBJECT_CAST (mq), percent);
-
-    gst_element_post_message (GST_ELEMENT_CAST (mq), message);
-  } else {
-    GST_DEBUG_OBJECT (mq, "filled %d percent", percent);
+    GST_DEBUG_OBJECT (mq, "Going to post buffering: %d%%", percent);
+    msg = gst_message_new_buffering (GST_OBJECT_CAST (mq), percent);
   }
+  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+
+  if (msg != NULL)
+    gst_element_post_message (GST_ELEMENT_CAST (mq), msg);
+
+  g_mutex_unlock (&mq->buffering_post_lock);
 }
 
 /* calculate the diff between running time on the sink and src of the queue.
@@ -1064,6 +1082,7 @@ apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
   update_time_level (mq, sq);
 
   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+  gst_multi_queue_post_buffering (mq);
 }
 
 /* take a buffer and update segment, updating the time level of the queue. */
@@ -1095,6 +1114,7 @@ apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp,
   /* calc diff with other end */
   update_time_level (mq, sq);
   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+  gst_multi_queue_post_buffering (mq);
 }
 
 static GstClockTime
@@ -1473,6 +1493,7 @@ gst_multi_queue_loop (GstPad * pad)
     update_buffering (mq, sq);
 
   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+  gst_multi_queue_post_buffering (mq);
 
   if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED
       && result != GST_FLOW_EOS)
@@ -1732,6 +1753,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       update_buffering (mq, sq);
       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
       single_queue_overrun_cb (sq->queue, sq);
+      gst_multi_queue_post_buffering (mq);
       break;
     case GST_EVENT_SEGMENT:
       apply_segment (mq, sq, sref, &sq->sink_segment);
@@ -2200,6 +2222,7 @@ gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full)
   GST_MULTI_QUEUE_MUTEX_LOCK (sq->mqueue);
   update_buffering (sq->mqueue, sq);
   GST_MULTI_QUEUE_MUTEX_UNLOCK (sq->mqueue);
+  gst_multi_queue_post_buffering (sq->mqueue);
 }
 
 static void
index adeeb18..d63eda5 100644 (file)
@@ -74,6 +74,9 @@ struct _GstMultiQueue {
                        /* GstMultiQueueSize, counter and highid */
 
   gint numwaiting;     /* number of not-linked pads waiting */
+
+  gboolean percent_changed;
+  GMutex buffering_post_lock; /* assures only one posted at a time */
 };
 
 struct _GstMultiQueueClass {