multiqueue: avoid returning downstream GST_FLOW_EOS from previous segment to current...
authorMark Nauwelaerts <mnauw@users.sourceforge.net>
Sat, 21 Feb 2015 16:13:26 +0000 (17:13 +0100)
committerMark Nauwelaerts <mnauw@users.sourceforge.net>
Mon, 23 Feb 2015 19:08:20 +0000 (20:08 +0100)
plugins/elements/gstmultiqueue.c

index c60596b..aef27ad 100644 (file)
@@ -1209,7 +1209,7 @@ done:
 
 static GstFlowReturn
 gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
-    GstMiniObject * object)
+    GstMiniObject * object, gboolean * allow_drop)
 {
   GstFlowReturn result = sq->srcresult;
 
@@ -1226,11 +1226,17 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
     /* Applying the buffer may have made the queue non-full again, unblock it if needed */
     gst_data_queue_limits_changed (sq->queue);
 
-    GST_DEBUG_OBJECT (mq,
-        "SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT,
-        sq->id, buffer, GST_TIME_ARGS (timestamp));
-
-    result = gst_pad_push (sq->srcpad, buffer);
+    if (G_UNLIKELY (*allow_drop)) {
+      GST_DEBUG_OBJECT (mq,
+          "SingleQueue %d : Dropping EOS buffer %p with ts %" GST_TIME_FORMAT,
+          sq->id, buffer, GST_TIME_ARGS (timestamp));
+      gst_buffer_unref (buffer);
+    } else {
+      GST_DEBUG_OBJECT (mq,
+          "SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT,
+          sq->id, buffer, GST_TIME_ARGS (timestamp));
+      result = gst_pad_push (sq->srcpad, buffer);
+    }
   } else if (GST_IS_EVENT (object)) {
     GstEvent *event;
 
@@ -1239,11 +1245,17 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
     switch (GST_EVENT_TYPE (event)) {
       case GST_EVENT_EOS:
         result = GST_FLOW_EOS;
+        if (G_UNLIKELY (*allow_drop))
+          *allow_drop = FALSE;
         break;
       case GST_EVENT_SEGMENT:
         apply_segment (mq, sq, event, &sq->src_segment);
         /* Applying the segment may have made the queue non-full again, unblock it if needed */
         gst_data_queue_limits_changed (sq->queue);
+        if (G_UNLIKELY (*allow_drop)) {
+          result = GST_FLOW_OK;
+          *allow_drop = FALSE;
+        }
         break;
       case GST_EVENT_GAP:
         apply_gap (mq, sq, event, &sq->src_segment);
@@ -1254,18 +1266,32 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
         break;
     }
 
-    GST_DEBUG_OBJECT (mq,
-        "SingleQueue %d : Pushing event %p of type %s",
-        sq->id, event, GST_EVENT_TYPE_NAME (event));
+    if (G_UNLIKELY (*allow_drop)) {
+      GST_DEBUG_OBJECT (mq,
+          "SingleQueue %d : Dropping EOS event %p of type %s",
+          sq->id, event, GST_EVENT_TYPE_NAME (event));
+      gst_event_unref (event);
+    } else {
+      GST_DEBUG_OBJECT (mq,
+          "SingleQueue %d : Pushing event %p of type %s",
+          sq->id, event, GST_EVENT_TYPE_NAME (event));
 
-    gst_pad_push_event (sq->srcpad, event);
+      gst_pad_push_event (sq->srcpad, event);
+    }
   } else if (GST_IS_QUERY (object)) {
     GstQuery *query;
     gboolean res;
 
     query = GST_QUERY_CAST (object);
 
-    res = gst_pad_peer_query (sq->srcpad, query);
+    if (G_UNLIKELY (*allow_drop)) {
+      GST_DEBUG_OBJECT (mq,
+          "SingleQueue %d : Dropping EOS query %p", sq->id, query);
+      gst_query_unref (query);
+      res = FALSE;
+    } else {
+      res = gst_pad_peer_query (sq->srcpad, query);
+    }
 
     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
     sq->last_query = res;
@@ -1354,10 +1380,12 @@ gst_multi_queue_loop (GstPad * pad)
   GstClockTime next_time;
   gboolean is_buffer;
   gboolean do_update_buffering = FALSE;
+  gboolean dropping = FALSE;
 
   sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
   mq = sq->mqueue;
 
+next:
   GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
 
   if (sq->flushing)
@@ -1485,7 +1513,7 @@ gst_multi_queue_loop (GstPad * pad)
   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
 
   /* Try to push out the new object */
-  result = gst_single_queue_push_one (mq, sq, object);
+  result = gst_single_queue_push_one (mq, sq, object, &dropping);
   object = NULL;
 
   /* Check if we pushed something already and if this is
@@ -1525,6 +1553,25 @@ gst_multi_queue_loop (GstPad * pad)
 
   if (is_buffer)
     sq->pushed = TRUE;
+
+  /* now hold on a bit;
+   * can not simply throw this result to upstream, because
+   * that might already be onto another segment, so we have to make
+   * sure we are relaying the correct info wrt proper segment */
+  if (result == GST_FLOW_EOS && !dropping &&
+      sq->srcresult != GST_FLOW_NOT_LINKED) {
+    GST_DEBUG_OBJECT (mq, "starting EOS drop on sq %d", sq->id);
+    dropping = TRUE;
+    /* pretend we have not seen EOS yet for upstream's sake */
+    result = sq->srcresult;
+  } else if (dropping && gst_data_queue_is_empty (sq->queue)) {
+    /* queue empty, so stop dropping
+     * we can commit the result we have now,
+     * which is either OK after a segment, or EOS */
+    GST_DEBUG_OBJECT (mq, "committed EOS drop on sq %d", sq->id);
+    dropping = FALSE;
+    result = GST_FLOW_EOS;
+  }
   sq->srcresult = result;
   sq->last_oldid = newid;
 
@@ -1534,6 +1581,9 @@ gst_multi_queue_loop (GstPad * pad)
   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
   gst_multi_queue_post_buffering (mq);
 
+  if (dropping)
+    goto next;
+
   if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED
       && result != GST_FLOW_EOS)
     goto out_flushing;
@@ -1807,6 +1857,11 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
     case GST_EVENT_SEGMENT:
       apply_segment (mq, sq, sref, &sq->sink_segment);
       gst_event_unref (sref);
+      /* a new segment allows us to accept more buffers if we got EOS
+       * from downstream */
+      GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+      sq->srcresult = GST_FLOW_OK;
+      GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
       break;
     case GST_EVENT_GAP:
       apply_gap (mq, sq, sref, &sq->sink_segment);