multiqueue: Fix tracking of unlinked streams.
authorJan Schmidt <thaytan@noraisin.net>
Wed, 27 Oct 2010 16:12:36 +0000 (18:12 +0200)
committerEdward Hervey <bilboed@bilboed.com>
Wed, 27 Oct 2010 16:12:36 +0000 (18:12 +0200)
33082eb9e42c52e4df848195946f1b7bbce768c5 introduced a bug
preventing sparse unlinked streams from advancing properly,
leading to the queue blocking.

Fixes: #633176

plugins/elements/gstmultiqueue.c

index 38d263e..17a6f8a 100644 (file)
@@ -152,6 +152,7 @@ struct _GstSingleQueue
   /* Protected by global lock */
   guint32 nextid;               /* ID of the next object waiting to be pushed */
   guint32 oldid;                /* ID of the last object pushed (last in a series) */
+  guint32 last_oldid;           /* Previously observed old_id, reset to MAXUINT32 on flush */
   GCond *turn;                  /* SingleQueue turn waiting conditional */
 };
 
@@ -681,6 +682,7 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
     sq->is_eos = FALSE;
     sq->nextid = 0;
     sq->oldid = 0;
+    sq->last_oldid = G_MAXUINT32;
     gst_data_queue_set_flushing (sq->queue, FALSE);
 
     GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
@@ -1035,14 +1037,17 @@ gst_multi_queue_loop (GstPad * pad)
   object = gst_multi_queue_item_steal_object (item);
   gst_multi_queue_item_destroy (item);
 
-  GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d", sq->id, newid);
+  GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d",
+      sq->id, newid, sq->last_oldid);
 
   /* If we're not-linked, we do some extra work because we might need to
    * wait before pushing. If we're linked but there's a gap in the IDs,
    * or it's the first loop, or we just passed the previous highid, 
    * we might need to wake some sleeping pad up, so there's extra work 
    * there too */
-  if (sq->srcresult == GST_FLOW_NOT_LINKED) {
+  if (sq->srcresult == GST_FLOW_NOT_LINKED ||
+      (sq->last_oldid == G_MAXUINT32) || (newid != (sq->last_oldid + 1)) ||
+      sq->last_oldid > mq->highid) {
     GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
         gst_flow_get_name (sq->srcresult));
 
@@ -1051,6 +1056,10 @@ gst_multi_queue_loop (GstPad * pad)
     /* Update the nextid so other threads know when to wake us up */
     sq->nextid = newid;
 
+    /* Update the oldid (the last ID we output) for highid tracking */
+    if (sq->last_oldid != G_MAXUINT32)
+      sq->oldid = sq->last_oldid;
+
     if (sq->srcresult == GST_FLOW_NOT_LINKED) {
       /* Go to sleep until it's time to push this buffer */
 
@@ -1091,7 +1100,6 @@ gst_multi_queue_loop (GstPad * pad)
   /* Try to push out the new object */
   result = gst_single_queue_push_one (mq, sq, object);
   sq->srcresult = result;
-  sq->oldid = newid;
 
   if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED
       && result != GST_FLOW_UNEXPECTED)
@@ -1100,6 +1108,7 @@ gst_multi_queue_loop (GstPad * pad)
   GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s",
       gst_flow_get_name (sq->srcresult));
 
+  sq->last_oldid = newid;
   return;
 
 out_flushing: