From 11304c125708b06df910716bd56e4fd610664681 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 24 Mar 2010 18:18:13 +0100 Subject: [PATCH] queue2: implement flushing in download buffering Maintain a separate variable to control src and sink flowreturn values so that we can unlock the src part without shutting down the sink part. Add flushing for upstream pull based elements that unblocks our getrange function. This implements seeking when blocking for more data. Add some arbitrary threshold before attempting a seek. Add a FIXME for this because we need to find a sensible threshold based on the input rate. --- plugins/elements/gstqueue2.c | 111 ++++++++++++++++++++++++++++++------------- plugins/elements/gstqueue2.h | 1 + 2 files changed, 80 insertions(+), 32 deletions(-) diff --git a/plugins/elements/gstqueue2.c b/plugins/elements/gstqueue2.c index aa85e0a..5c4a620 100644 --- a/plugins/elements/gstqueue2.c +++ b/plugins/elements/gstqueue2.c @@ -152,9 +152,9 @@ enum g_mutex_lock (q->qlock); \ } G_STMT_END -#define GST_QUEUE2_MUTEX_LOCK_CHECK(q,label) G_STMT_START { \ - GST_QUEUE2_MUTEX_LOCK (q); \ - if (q->srcresult != GST_FLOW_OK) \ +#define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START { \ + GST_QUEUE2_MUTEX_LOCK (q); \ + if (res != GST_FLOW_OK) \ goto label; \ } G_STMT_END @@ -162,24 +162,24 @@ enum g_mutex_unlock (q->qlock); \ } G_STMT_END -#define GST_QUEUE2_WAIT_DEL_CHECK(q, label) G_STMT_START { \ +#define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START { \ STATUS (queue, q->sinkpad, "wait for DEL"); \ q->waiting_del = TRUE; \ g_cond_wait (q->item_del, queue->qlock); \ q->waiting_del = FALSE; \ - if (q->srcresult != GST_FLOW_OK) { \ + if (res != GST_FLOW_OK) { \ STATUS (queue, q->srcpad, "received DEL wakeup"); \ goto label; \ } \ STATUS (queue, q->sinkpad, "received DEL"); \ } G_STMT_END -#define GST_QUEUE2_WAIT_ADD_CHECK(q, label) G_STMT_START { \ +#define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START { \ STATUS (queue, q->srcpad, "wait for ADD"); \ q->waiting_add = TRUE; \ g_cond_wait (q->item_add, q->qlock); \ q->waiting_add = FALSE; \ - if (q->srcresult != GST_FLOW_OK) { \ + if (res != GST_FLOW_OK) { \ STATUS (queue, q->srcpad, "received ADD wakeup"); \ goto label; \ } \ @@ -403,6 +403,7 @@ gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class) gst_segment_init (&queue->src_segment, GST_FORMAT_TIME); queue->srcresult = GST_FLOW_WRONG_STATE; + queue->sinkresult = GST_FLOW_WRONG_STATE; queue->is_eos = FALSE; queue->in_timer = g_timer_new (); queue->out_timer = g_timer_new (); @@ -987,7 +988,12 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length) return TRUE; } else { - /* we don't have the range, see how far away we are */ + /* we don't have the range, see how far away we are, FIXME, find a good + * threshold based on the incomming rate. */ + if (queue->current && offset < queue->current->writing_pos + 200000) + return FALSE; + + /* too far away, do a seek */ perform_seek_to_offset (queue, offset); } @@ -1005,7 +1011,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length, /* check if we have enough data at @offset. If there is not enough data, we * block and wait. */ while (!gst_queue2_have_data (queue, offset, length)) { - GST_QUEUE2_WAIT_ADD_CHECK (queue, out_flushing); + GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing); } #ifdef HAVE_FSEEKO @@ -1438,9 +1444,10 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event) /* now unblock the chain function */ GST_QUEUE2_MUTEX_LOCK (queue); queue->srcresult = GST_FLOW_WRONG_STATE; + queue->sinkresult = GST_FLOW_WRONG_STATE; /* unblock the loop and chain functions */ - g_cond_signal (queue->item_add); - g_cond_signal (queue->item_del); + GST_QUEUE2_SIGNAL_ADD (queue); + GST_QUEUE2_SIGNAL_DEL (queue); GST_QUEUE2_MUTEX_UNLOCK (queue); /* make sure it pauses, this should happen since we sent @@ -1461,6 +1468,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event) GST_QUEUE2_MUTEX_LOCK (queue); gst_queue2_locked_flush (queue); queue->srcresult = GST_FLOW_OK; + queue->sinkresult = GST_FLOW_OK; queue->is_eos = FALSE; queue->unexpected = FALSE; /* reset rate counters */ @@ -1480,7 +1488,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event) default: if (GST_EVENT_IS_SERIALIZED (event)) { /* serialized events go in the queue */ - GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing); + GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing); /* refuse more events on EOS */ if (queue->is_eos) goto out_eos; @@ -1575,7 +1583,7 @@ gst_queue2_chain (GstPad * pad, GstBuffer * buffer) GST_TIME_ARGS (GST_BUFFER_DURATION (buffer))); /* we have to lock the queue since we span threads */ - GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing); + GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing); /* when we received EOS, we refuse more data */ if (queue->is_eos) goto out_eos; @@ -1597,7 +1605,7 @@ gst_queue2_chain (GstPad * pad, GstBuffer * buffer) "queue is full, waiting for free space"); do { /* Wait for space to be available, we could be unlocked because of a flush. */ - GST_QUEUE2_WAIT_DEL_CHECK (queue, out_flushing); + GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing); } while (gst_queue2_is_filled (queue)); @@ -1615,7 +1623,7 @@ gst_queue2_chain (GstPad * pad, GstBuffer * buffer) /* special conditions */ out_flushing: { - GstFlowReturn ret = queue->srcresult; + GstFlowReturn ret = queue->sinkresult; GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because task paused, reason: %s", gst_flow_get_name (ret)); @@ -1673,7 +1681,7 @@ next: result = gst_pad_push (queue->srcpad, buffer); /* need to check for srcresult here as well */ - GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing); + GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing); if (result == GST_FLOW_UNEXPECTED) { GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got UNEXPECTED from downstream"); @@ -1718,7 +1726,7 @@ next: gst_pad_push_event (queue->srcpad, event); - GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing); + GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing); /* if we're EOS, return UNEXPECTED so that the task pauses. */ if (type == GST_EVENT_EOS) { GST_CAT_LOG_OBJECT (queue_dataflow, queue, @@ -1753,7 +1761,7 @@ gst_queue2_loop (GstPad * pad) queue = GST_QUEUE2 (GST_PAD_PARENT (pad)); /* have to lock for thread-safety */ - GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing); + GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing); if (gst_queue2_is_empty (queue)) { gboolean started; @@ -1767,7 +1775,7 @@ gst_queue2_loop (GstPad * pad) "queue is empty, waiting for new data"); do { /* Wait for data to be available, we could be unlocked because of a flush. */ - GST_QUEUE2_WAIT_ADD_CHECK (queue, out_flushing); + GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing); } while (gst_queue2_is_empty (queue)); @@ -1777,6 +1785,7 @@ gst_queue2_loop (GstPad * pad) } ret = gst_queue2_push_one (queue); queue->srcresult = ret; + queue->sinkresult = ret; if (ret != GST_FLOW_OK) goto out_flushing; @@ -1819,13 +1828,42 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event) event, GST_EVENT_TYPE_NAME (event)); #endif - if (!QUEUE_IS_USING_TEMP_FILE (queue)) { - /* just forward upstream */ - res = gst_pad_push_event (queue->sinkpad, event); - } else { - /* when using a temp file, we eat the event */ - res = TRUE; - gst_event_unref (event); + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_FLUSH_START: + if (!QUEUE_IS_USING_TEMP_FILE (queue)) { + /* just forward upstream */ + res = gst_pad_push_event (queue->sinkpad, event); + } else { + /* now unblock the getrange function */ + GST_QUEUE2_MUTEX_LOCK (queue); + GST_DEBUG_OBJECT (queue, "flushing"); + queue->srcresult = GST_FLOW_WRONG_STATE; + GST_QUEUE2_SIGNAL_ADD (queue); + GST_QUEUE2_MUTEX_UNLOCK (queue); + + /* when using a temp file, we eat the event */ + res = TRUE; + gst_event_unref (event); + } + break; + case GST_EVENT_FLUSH_STOP: + if (!QUEUE_IS_USING_TEMP_FILE (queue)) { + /* just forward upstream */ + res = gst_pad_push_event (queue->sinkpad, event); + } else { + /* now unblock the getrange function */ + GST_QUEUE2_MUTEX_LOCK (queue); + queue->srcresult = GST_FLOW_OK; + GST_QUEUE2_MUTEX_UNLOCK (queue); + + /* when using a temp file, we eat the event */ + res = TRUE; + gst_event_unref (event); + } + break; + default: + res = gst_pad_push_event (queue->sinkpad, event); + break; } return res; @@ -1972,7 +2010,7 @@ gst_queue2_get_range (GstPad * pad, guint64 offset, guint length, queue = GST_QUEUE2_CAST (gst_pad_get_parent (pad)); - GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing); + GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing); length = (length == -1) ? DEFAULT_BUFFER_SIZE : length; offset = (offset == -1) ? queue->current->reading_pos : offset; @@ -1987,9 +2025,11 @@ gst_queue2_get_range (GstPad * pad, guint64 offset, guint length, /* ERRORS */ out_flushing: { + ret = queue->srcresult; + GST_DEBUG_OBJECT (queue, "we are flushing"); GST_QUEUE2_MUTEX_UNLOCK (queue); - return GST_FLOW_WRONG_STATE; + return ret; } } @@ -2022,6 +2062,7 @@ gst_queue2_sink_activate_push (GstPad * pad, gboolean active) GST_QUEUE2_MUTEX_LOCK (queue); GST_DEBUG_OBJECT (queue, "activating push mode"); queue->srcresult = GST_FLOW_OK; + queue->sinkresult = GST_FLOW_OK; queue->is_eos = FALSE; queue->unexpected = FALSE; reset_rate_timer (queue); @@ -2031,6 +2072,7 @@ gst_queue2_sink_activate_push (GstPad * pad, gboolean active) GST_QUEUE2_MUTEX_LOCK (queue); GST_DEBUG_OBJECT (queue, "deactivating push mode"); queue->srcresult = GST_FLOW_WRONG_STATE; + queue->sinkresult = GST_FLOW_WRONG_STATE; gst_queue2_locked_flush (queue); GST_QUEUE2_MUTEX_UNLOCK (queue); } @@ -2054,6 +2096,7 @@ gst_queue2_src_activate_push (GstPad * pad, gboolean active) GST_QUEUE2_MUTEX_LOCK (queue); GST_DEBUG_OBJECT (queue, "activating push mode"); queue->srcresult = GST_FLOW_OK; + queue->sinkresult = GST_FLOW_OK; queue->is_eos = FALSE; queue->unexpected = FALSE; result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad); @@ -2063,8 +2106,9 @@ gst_queue2_src_activate_push (GstPad * pad, gboolean active) GST_QUEUE2_MUTEX_LOCK (queue); GST_DEBUG_OBJECT (queue, "deactivating push mode"); queue->srcresult = GST_FLOW_WRONG_STATE; + queue->sinkresult = GST_FLOW_WRONG_STATE; /* the item add signal will unblock */ - g_cond_signal (queue->item_add); + GST_QUEUE2_SIGNAL_ADD (queue); GST_QUEUE2_MUTEX_UNLOCK (queue); /* step 2, make sure streaming finishes */ @@ -2093,6 +2137,7 @@ gst_queue2_src_activate_pull (GstPad * pad, gboolean active) GST_QUEUE2_MUTEX_LOCK (queue); GST_DEBUG_OBJECT (queue, "activating pull mode"); queue->srcresult = GST_FLOW_OK; + queue->sinkresult = GST_FLOW_OK; queue->is_eos = FALSE; queue->unexpected = FALSE; GST_QUEUE2_MUTEX_UNLOCK (queue); @@ -2102,6 +2147,7 @@ gst_queue2_src_activate_pull (GstPad * pad, gboolean active) /* this is not allowed, we cannot operate in pull mode without a temp * file. */ queue->srcresult = GST_FLOW_WRONG_STATE; + queue->sinkresult = GST_FLOW_WRONG_STATE; result = FALSE; GST_QUEUE2_MUTEX_UNLOCK (queue); } @@ -2109,8 +2155,9 @@ gst_queue2_src_activate_pull (GstPad * pad, gboolean active) GST_QUEUE2_MUTEX_LOCK (queue); GST_DEBUG_OBJECT (queue, "deactivating pull mode"); queue->srcresult = GST_FLOW_WRONG_STATE; + queue->sinkresult = GST_FLOW_WRONG_STATE; /* this will unlock getrange */ - g_cond_signal (queue->item_add); + GST_QUEUE2_SIGNAL_ADD (queue); result = TRUE; GST_QUEUE2_MUTEX_UNLOCK (queue); } @@ -2170,14 +2217,14 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition) * the _chain function, it might have more room now * to store the buffer/event in the queue */ #define QUEUE_CAPACITY_CHANGE(q)\ - g_cond_signal (queue->item_del); + GST_QUEUE2_SIGNAL_DEL (queue); /* Changing the minimum required fill level must * wake up the _loop function as it might now * be able to preceed. */ #define QUEUE_THRESHOLD_CHANGE(q)\ - g_cond_signal (queue->item_add); + GST_QUEUE2_SIGNAL_ADD (queue); static void gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template) diff --git a/plugins/elements/gstqueue2.h b/plugins/elements/gstqueue2.h index a47f883..19b5431 100644 --- a/plugins/elements/gstqueue2.h +++ b/plugins/elements/gstqueue2.h @@ -80,6 +80,7 @@ struct _GstQueue2 /* flowreturn when srcpad is paused */ GstFlowReturn srcresult; + GstFlowReturn sinkresult; gboolean is_eos; gboolean unexpected; -- 2.7.4