multiqueue: Wake up all not-linked streams when a stream switches from linked to...
authorSebastian Dröge <sebastian.droege@collabora.co.uk>
Thu, 29 Mar 2012 12:45:41 +0000 (14:45 +0200)
committerSebastian Dröge <sebastian.droege@collabora.co.uk>
Thu, 29 Mar 2012 12:49:53 +0000 (14:49 +0200)
We reset all the waiting streams, let them push another buffer to
see if they're now active again. This allows faster switching
between streams and prevents deadlocks if downstream does any
waiting too.

Also improve locking a bit, srcresult must be protected by the
multiqueue lock too because it's used/set from random threads.

plugins/elements/gstmultiqueue.c

index e882ff7..9891b93 100644 (file)
@@ -140,6 +140,12 @@ struct _GstSingleQueue
 
   /* flowreturn of previous srcpad push */
   GstFlowReturn srcresult;
+  /* If something was actually pushed on
+   * this pad after flushing/pad activation
+   * and the srcresult corresponds to something
+   * real
+   */
+  gboolean pushed;
 
   /* segments */
   GstSegment sink_segment;
@@ -748,27 +754,29 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
       sq->id);
 
   if (flush) {
+    GST_MULTI_QUEUE_MUTEX_LOCK (mq);
     sq->srcresult = GST_FLOW_WRONG_STATE;
     gst_data_queue_set_flushing (sq->queue, TRUE);
 
     sq->flushing = TRUE;
+    GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
 
     /* wake up non-linked task */
     GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task",
         sq->id);
-    GST_MULTI_QUEUE_MUTEX_LOCK (mq);
     g_cond_signal (sq->turn);
-    GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
 
     GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
     result = gst_pad_pause_task (sq->srcpad);
     sq->sink_tainted = sq->src_tainted = TRUE;
   } else {
+    GST_MULTI_QUEUE_MUTEX_LOCK (mq);
     gst_data_queue_flush (sq->queue);
     gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
     gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
     /* All pads start off not-linked for a smooth kick-off */
     sq->srcresult = GST_FLOW_OK;
+    sq->pushed = FALSE;
     sq->cur_time = 0;
     sq->max_size.visible = mq->max_size.visible;
     sq->is_eos = FALSE;
@@ -780,11 +788,10 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
     gst_data_queue_set_flushing (sq->queue, FALSE);
 
     /* Reset high time to be recomputed next */
-    GST_MULTI_QUEUE_MUTEX_LOCK (mq);
     mq->high_time = GST_CLOCK_TIME_NONE;
-    GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
 
     sq->flushing = FALSE;
+    GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
 
     GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
     result =
@@ -1218,14 +1225,13 @@ gst_multi_queue_loop (GstPad * pad)
    * or it's the first loop, or we just passed the previous highid, 
    * we might need to wake some sleeping pad up, so there's extra work 
    * there too */
+  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
   if (sq->srcresult == GST_FLOW_NOT_LINKED
       || (sq->last_oldid == G_MAXUINT32) || (newid != (sq->last_oldid + 1))
       || sq->last_oldid > mq->highid) {
     GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
         gst_flow_get_name (sq->srcresult));
 
-    GST_MULTI_QUEUE_MUTEX_LOCK (mq);
-
     /* Check again if we're flushing after the lock is taken,
      * the flush flag might have been changed in the meantime */
     if (sq->flushing) {
@@ -1292,9 +1298,8 @@ gst_multi_queue_loop (GstPad * pad)
     /* We're done waiting, we can clear the nextid and nexttime */
     sq->nextid = 0;
     sq->next_time = GST_CLOCK_TIME_NONE;
-
-    GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
   }
+  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
 
   if (sq->flushing)
     goto out_flushing;
@@ -1303,6 +1308,7 @@ gst_multi_queue_loop (GstPad * pad)
       gst_flow_get_name (sq->srcresult));
 
   /* Update time stats */
+  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
   next_time = get_running_time (&sq->src_segment, object, FALSE);
   if (next_time != GST_CLOCK_TIME_NONE) {
     if (sq->last_time == GST_CLOCK_TIME_NONE || sq->last_time < next_time)
@@ -1313,10 +1319,51 @@ gst_multi_queue_loop (GstPad * pad)
       wake_up_next_non_linked (mq);
     }
   }
+  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
 
   /* Try to push out the new object */
   result = gst_single_queue_push_one (mq, sq, object);
+
+  /* Check if we pushed something already and if this is
+   * now a switch from an active to a non-active stream.
+   *
+   * If it is, we reset all the waiting streams, let them
+   * push another buffer to see if they're now active again.
+   * This allows faster switching between streams and prevents
+   * deadlocks if downstream does any waiting too.
+   */
+  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+  if (sq->pushed && sq->srcresult == GST_FLOW_OK
+      && result == GST_FLOW_NOT_LINKED) {
+    GList *tmp;
+
+    GST_LOG_OBJECT (mq, "SingleQueue %d : Changed from active to non-active",
+        sq->id);
+
+    compute_high_id (mq);
+
+    /* maybe no-one is waiting */
+    if (mq->numwaiting > 0) {
+      /* Else figure out which singlequeue(s) need waking up */
+      for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
+        GstSingleQueue *sq2 = (GstSingleQueue *) tmp->data;
+
+        if (sq2->srcresult == GST_FLOW_NOT_LINKED) {
+          GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq2->id);
+          sq2->pushed = FALSE;
+          sq2->srcresult = GST_FLOW_OK;
+          g_cond_signal (sq2->turn);
+        }
+      }
+    }
+  }
+
+  if (GST_IS_BUFFER (object))
+    sq->pushed = TRUE;
   sq->srcresult = result;
+  sq->last_oldid = newid;
+  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+
   object = NULL;
 
   if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED
@@ -1326,8 +1373,6 @@ gst_multi_queue_loop (GstPad * pad)
   GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s",
       gst_flow_get_name (sq->srcresult));
 
-  sq->last_oldid = newid;
-
   return;
 
 out_flushing:
@@ -1422,16 +1467,30 @@ static gboolean
 gst_multi_queue_sink_activate_push (GstPad * pad, gboolean active)
 {
   GstSingleQueue *sq;
+  GstMultiQueue *mq;
 
   sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
+  mq = (GstMultiQueue *) gst_pad_get_parent (pad);
+
+  /* mq is NULL if the pad is activated/deactivated before being
+   * added to the multiqueue */
+  if (mq)
+    GST_MULTI_QUEUE_MUTEX_LOCK (mq);
 
   if (active) {
     /* All pads start off linked until they push one buffer */
     sq->srcresult = GST_FLOW_OK;
+    sq->pushed = FALSE;
   } else {
     sq->srcresult = GST_FLOW_WRONG_STATE;
     gst_data_queue_flush (sq->queue);
   }
+
+  if (mq) {
+    GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+    gst_object_unref (mq);
+  }
+
   return TRUE;
 }
 
@@ -1944,6 +2003,7 @@ gst_single_queue_new (GstMultiQueue * mqueue, gint id)
 
   sq->mqueue = mqueue;
   sq->srcresult = GST_FLOW_WRONG_STATE;
+  sq->pushed = FALSE;
   sq->queue = gst_data_queue_new_full ((GstDataQueueCheckFullFunction)
       single_queue_check_full,
       (GstDataQueueFullCallback) single_queue_overrun_cb,