queue/queue2/multiqueue: When flushing, make sure to not lose any sticky events
authorSebastian Dröge <slomo@circular-chaos.org>
Mon, 27 May 2013 11:01:43 +0000 (13:01 +0200)
committerSebastian Dröge <slomo@circular-chaos.org>
Mon, 27 May 2013 11:01:43 +0000 (13:01 +0200)
https://bugzilla.gnome.org/show_bug.cgi?id=688824

plugins/elements/gstmultiqueue.c
plugins/elements/gstqueue.c
plugins/elements/gstqueue2.c

index fd5fe2e..1f258e8 100644 (file)
@@ -198,6 +198,8 @@ static void compute_high_time (GstMultiQueue * mq);
 static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
 static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
 
+static void gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full);
+
 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
     GST_PAD_SINK,
     GST_PAD_REQUEST,
@@ -733,7 +735,8 @@ gst_multi_queue_change_state (GstElement * element, GstStateChange transition)
 }
 
 static gboolean
-gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
+gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
+    gboolean full)
 {
   gboolean result;
 
@@ -760,7 +763,7 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
     sq->sink_tainted = sq->src_tainted = TRUE;
   } else {
     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
-    gst_data_queue_flush (sq->queue);
+    gst_single_queue_flush_queue (sq, full);
     gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
     gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
     /* All pads start off not-linked for a smooth kick-off */
@@ -1372,7 +1375,7 @@ out_flushing:
      * so empty this one and trigger dynamic queue growth. At
      * this point the srcresult is not OK, NOT_LINKED
      * or EOS, i.e. a real failure */
-    gst_data_queue_flush (sq->queue);
+    gst_single_queue_flush_queue (sq, FALSE);
     single_queue_underrun_cb (sq->queue, sq);
     gst_data_queue_set_flushing (sq->queue, TRUE);
     gst_pad_pause_task (sq->srcpad);
@@ -1511,7 +1514,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
 
       res = gst_pad_push_event (sq->srcpad, event);
 
-      gst_single_queue_flush (mq, sq, TRUE);
+      gst_single_queue_flush (mq, sq, TRUE, FALSE);
       goto done;
 
     case GST_EVENT_FLUSH_STOP:
@@ -1520,7 +1523,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
 
       res = gst_pad_push_event (sq->srcpad, event);
 
-      gst_single_queue_flush (mq, sq, FALSE);
+      gst_single_queue_flush (mq, sq, FALSE, FALSE);
       goto done;
     case GST_EVENT_SEGMENT:
       /* take ref because the queue will take ownership and we need the event
@@ -1646,9 +1649,9 @@ gst_multi_queue_src_activate_mode (GstPad * pad, GstObject * parent,
   switch (mode) {
     case GST_PAD_MODE_PUSH:
       if (active) {
-        result = gst_single_queue_flush (mq, sq, FALSE);
+        result = gst_single_queue_flush (mq, sq, FALSE, TRUE);
       } else {
-        result = gst_single_queue_flush (mq, sq, TRUE);
+        result = gst_single_queue_flush (mq, sq, TRUE, TRUE);
         /* make sure streaming finishes */
         result |= gst_pad_stop_task (pad);
       }
@@ -1952,6 +1955,41 @@ single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes,
 }
 
 static void
+gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full)
+{
+  GstDataQueueItem *sitem;
+  gboolean was_flushing = FALSE;
+
+  while (!gst_data_queue_is_empty (sq->queue)) {
+    GstMiniObject *data;
+
+    /* FIXME: If this fails here although the queue is not empty,
+     * we're flushing... but we want to rescue all sticky
+     * events nonetheless.
+     */
+    if (!gst_data_queue_pop (sq->queue, &sitem)) {
+      was_flushing = TRUE;
+      gst_data_queue_set_flushing (sq->queue, FALSE);
+      continue;
+    }
+
+    data = sitem->object;
+
+    if (!full && GST_IS_EVENT (data) && GST_EVENT_IS_STICKY (data) &&
+        GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT
+        && GST_EVENT_TYPE (data) != GST_EVENT_EOS) {
+      gst_pad_store_sticky_event (sq->srcpad, GST_EVENT_CAST (data));
+    }
+
+    sitem->destroy (sitem);
+  }
+
+  gst_data_queue_flush (sq->queue);
+  if (was_flushing)
+    gst_data_queue_set_flushing (sq->queue, TRUE);
+}
+
+static void
 gst_single_queue_free (GstSingleQueue * sq)
 {
   /* DRAIN QUEUE */
index d3780ba..8348265 100644 (file)
@@ -209,7 +209,7 @@ static gboolean gst_queue_handle_src_event (GstPad * pad, GstObject * parent,
 static gboolean gst_queue_handle_src_query (GstPad * pad, GstObject * parent,
     GstQuery * query);
 
-static void gst_queue_locked_flush (GstQueue * queue);
+static void gst_queue_locked_flush (GstQueue * queue, gboolean full);
 
 static gboolean gst_queue_src_activate_mode (GstPad * pad, GstObject * parent,
     GstPadMode mode, gboolean active);
@@ -573,7 +573,7 @@ apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment,
 }
 
 static void
-gst_queue_locked_flush (GstQueue * queue)
+gst_queue_locked_flush (GstQueue * queue, gboolean full)
 {
   GstMiniObject *data;
 
@@ -581,6 +581,11 @@ gst_queue_locked_flush (GstQueue * queue)
     data = gst_queue_array_pop_head (queue->queue);
     /* Then lose another reference because we are supposed to destroy that
        data when flushing */
+    if (!full && GST_IS_EVENT (data) && GST_EVENT_IS_STICKY (data) &&
+        GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT
+        && GST_EVENT_TYPE (data) != GST_EVENT_EOS) {
+      gst_pad_store_sticky_event (queue->srcpad, GST_EVENT_CAST (data));
+    }
     if (!GST_IS_QUERY (data))
       gst_mini_object_unref (data);
   }
@@ -627,7 +632,7 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item)
       /* Zero the thresholds, this makes sure the queue is completely
        * filled and we can read all data from the queue. */
       if (queue->flush_on_eos)
-        gst_queue_locked_flush (queue);
+        gst_queue_locked_flush (queue, FALSE);
       else
         GST_QUEUE_CLEAR_LEVEL (queue->min_threshold);
       /* mark the queue as EOS. This prevents us from accepting more data. */
@@ -758,7 +763,7 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       gst_pad_push_event (queue->srcpad, event);
 
       GST_QUEUE_MUTEX_LOCK (queue);
-      gst_queue_locked_flush (queue);
+      gst_queue_locked_flush (queue, FALSE);
       queue->srcresult = GST_FLOW_OK;
       queue->eos = FALSE;
       queue->unexpected = FALSE;
@@ -1228,7 +1233,7 @@ out_flushing:
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "pause task, reason:  %s", gst_flow_get_name (ret));
     if (ret == GST_FLOW_FLUSHING)
-      gst_queue_locked_flush (queue);
+      gst_queue_locked_flush (queue, FALSE);
     else
       GST_QUEUE_SIGNAL_DEL (queue);
     GST_QUEUE_MUTEX_UNLOCK (queue);
@@ -1377,7 +1382,7 @@ gst_queue_sink_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
         /* step 1, unblock chain function */
         GST_QUEUE_MUTEX_LOCK (queue);
         queue->srcresult = GST_FLOW_FLUSHING;
-        gst_queue_locked_flush (queue);
+        gst_queue_locked_flush (queue, TRUE);
         GST_QUEUE_MUTEX_UNLOCK (queue);
       }
       result = TRUE;
index c1f1ce6..90395df 100644 (file)
@@ -1520,7 +1520,7 @@ gst_queue2_flush_temp_file (GstQueue2 * queue)
 }
 
 static void
-gst_queue2_locked_flush (GstQueue2 * queue)
+gst_queue2_locked_flush (GstQueue2 * queue, gboolean full)
 {
   if (!QUEUE_IS_USING_QUEUE (queue)) {
     if (QUEUE_IS_USING_TEMP_FILE (queue))
@@ -1530,6 +1530,12 @@ gst_queue2_locked_flush (GstQueue2 * queue)
     while (!g_queue_is_empty (&queue->queue)) {
       GstMiniObject *data = g_queue_pop_head (&queue->queue);
 
+      if (!full && GST_IS_EVENT (data) && GST_EVENT_IS_STICKY (data) &&
+          GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT
+          && GST_EVENT_TYPE (data) != GST_EVENT_EOS) {
+        gst_pad_store_sticky_event (queue->srcpad, GST_EVENT_CAST (data));
+      }
+
       /* Then lose another reference because we are supposed to destroy that
          data when flushing */
       if (!GST_IS_QUERY (data))
@@ -2202,7 +2208,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
         gst_pad_push_event (queue->srcpad, event);
 
         GST_QUEUE2_MUTEX_LOCK (queue);
-        gst_queue2_locked_flush (queue);
+        gst_queue2_locked_flush (queue, FALSE);
         queue->srcresult = GST_FLOW_OK;
         queue->sinkresult = GST_FLOW_OK;
         queue->is_eos = FALSE;
@@ -3043,7 +3049,7 @@ gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
         GST_DEBUG_OBJECT (queue, "deactivating push mode");
         queue->srcresult = GST_FLOW_FLUSHING;
         queue->sinkresult = GST_FLOW_FLUSHING;
-        gst_queue2_locked_flush (queue);
+        gst_queue2_locked_flush (queue, TRUE);
         GST_QUEUE2_MUTEX_UNLOCK (queue);
       }
       result = TRUE;