/* flowreturn of previous srcpad push */
GstFlowReturn srcresult;
+ /* If something was actually pushed on
+ * this pad after flushing/pad activation
+ * and the srcresult corresponds to something
+ * real
+ */
+ gboolean pushed;
/* segments */
GstSegment sink_segment;
sq->id);
if (flush) {
+ GST_MULTI_QUEUE_MUTEX_LOCK (mq);
sq->srcresult = GST_FLOW_WRONG_STATE;
gst_data_queue_set_flushing (sq->queue, TRUE);
sq->flushing = TRUE;
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
/* wake up non-linked task */
GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task",
sq->id);
- GST_MULTI_QUEUE_MUTEX_LOCK (mq);
g_cond_signal (sq->turn);
- GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
result = gst_pad_pause_task (sq->srcpad);
sq->sink_tainted = sq->src_tainted = TRUE;
} else {
+ GST_MULTI_QUEUE_MUTEX_LOCK (mq);
gst_data_queue_flush (sq->queue);
gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
/* All pads start off not-linked for a smooth kick-off */
sq->srcresult = GST_FLOW_OK;
+ sq->pushed = FALSE;
sq->cur_time = 0;
sq->max_size.visible = mq->max_size.visible;
sq->is_eos = FALSE;
gst_data_queue_set_flushing (sq->queue, FALSE);
/* Reset high time to be recomputed next */
- GST_MULTI_QUEUE_MUTEX_LOCK (mq);
mq->high_time = GST_CLOCK_TIME_NONE;
- GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
sq->flushing = FALSE;
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
result =
* or it's the first loop, or we just passed the previous highid,
* we might need to wake some sleeping pad up, so there's extra work
* there too */
+ GST_MULTI_QUEUE_MUTEX_LOCK (mq);
if (sq->srcresult == GST_FLOW_NOT_LINKED
|| (sq->last_oldid == G_MAXUINT32) || (newid != (sq->last_oldid + 1))
|| sq->last_oldid > mq->highid) {
GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
gst_flow_get_name (sq->srcresult));
- GST_MULTI_QUEUE_MUTEX_LOCK (mq);
-
/* Check again if we're flushing after the lock is taken,
* the flush flag might have been changed in the meantime */
if (sq->flushing) {
/* We're done waiting, we can clear the nextid and nexttime */
sq->nextid = 0;
sq->next_time = GST_CLOCK_TIME_NONE;
-
- GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
}
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
if (sq->flushing)
goto out_flushing;
gst_flow_get_name (sq->srcresult));
/* Update time stats */
+ GST_MULTI_QUEUE_MUTEX_LOCK (mq);
next_time = get_running_time (&sq->src_segment, object, FALSE);
if (next_time != GST_CLOCK_TIME_NONE) {
if (sq->last_time == GST_CLOCK_TIME_NONE || sq->last_time < next_time)
wake_up_next_non_linked (mq);
}
}
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
/* Try to push out the new object */
result = gst_single_queue_push_one (mq, sq, object);
+
+ /* Check if we pushed something already and if this is
+ * now a switch from an active to a non-active stream.
+ *
+ * If it is, we reset all the waiting streams, let them
+ * push another buffer to see if they're now active again.
+ * This allows faster switching between streams and prevents
+ * deadlocks if downstream does any waiting too.
+ */
+ GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+ if (sq->pushed && sq->srcresult == GST_FLOW_OK
+ && result == GST_FLOW_NOT_LINKED) {
+ GList *tmp;
+
+ GST_LOG_OBJECT (mq, "SingleQueue %d : Changed from active to non-active",
+ sq->id);
+
+ compute_high_id (mq);
+
+ /* maybe no-one is waiting */
+ if (mq->numwaiting > 0) {
+ /* Else figure out which singlequeue(s) need waking up */
+ for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
+ GstSingleQueue *sq2 = (GstSingleQueue *) tmp->data;
+
+ if (sq2->srcresult == GST_FLOW_NOT_LINKED) {
+ GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq2->id);
+ sq2->pushed = FALSE;
+ sq2->srcresult = GST_FLOW_OK;
+ g_cond_signal (sq2->turn);
+ }
+ }
+ }
+ }
+
+ if (GST_IS_BUFFER (object))
+ sq->pushed = TRUE;
sq->srcresult = result;
+ sq->last_oldid = newid;
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+
object = NULL;
if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED
GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s",
gst_flow_get_name (sq->srcresult));
- sq->last_oldid = newid;
-
return;
out_flushing:
gst_multi_queue_sink_activate_push (GstPad * pad, gboolean active)
{
GstSingleQueue *sq;
+ GstMultiQueue *mq;
sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
+ mq = (GstMultiQueue *) gst_pad_get_parent (pad);
+
+ /* mq is NULL if the pad is activated/deactivated before being
+ * added to the multiqueue */
+ if (mq)
+ GST_MULTI_QUEUE_MUTEX_LOCK (mq);
if (active) {
/* All pads start off linked until they push one buffer */
sq->srcresult = GST_FLOW_OK;
+ sq->pushed = FALSE;
} else {
sq->srcresult = GST_FLOW_WRONG_STATE;
gst_data_queue_flush (sq->queue);
}
+
+ if (mq) {
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+ gst_object_unref (mq);
+ }
+
return TRUE;
}
sq->mqueue = mqueue;
sq->srcresult = GST_FLOW_WRONG_STATE;
+ sq->pushed = FALSE;
sq->queue = gst_data_queue_new_full ((GstDataQueueCheckFullFunction)
single_queue_check_full,
(GstDataQueueFullCallback) single_queue_overrun_cb,