From: Wim Taymans Date: Tue, 19 Jul 2005 17:46:37 +0000 (+0000) Subject: gst/gstqueue.*: Propagate GstFlowReturn more intelligently upstream and output an... X-Git-Tag: RELEASE-0_9_2~205 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=f529a0bafcd29b2516e02a960a90fadd27ab6155;p=platform%2Fupstream%2Fgstreamer.git gst/gstqueue.*: Propagate GstFlowReturn more intelligently upstream and output an ERROR/EOS when streaming stopped du... Original commit message from CVS: * gst/gstqueue.c: (gst_queue_init), (gst_queue_handle_sink_event), (gst_queue_chain), (gst_queue_loop), (gst_queue_handle_src_event), (gst_queue_handle_src_query), (gst_queue_sink_activate_push), (gst_queue_src_activate_push), (gst_queue_change_state), (gst_queue_get_property): * gst/gstqueue.h: Propagate GstFlowReturn more intelligently upstream and output an ERROR/EOS when streaming stopped due to fatal error. --- diff --git a/ChangeLog b/ChangeLog index 42acdcd..8ae0249 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,16 @@ 2005-07-19 Wim Taymans + * gst/gstqueue.c: (gst_queue_init), (gst_queue_handle_sink_event), + (gst_queue_chain), (gst_queue_loop), (gst_queue_handle_src_event), + (gst_queue_handle_src_query), (gst_queue_sink_activate_push), + (gst_queue_src_activate_push), (gst_queue_change_state), + (gst_queue_get_property): + * gst/gstqueue.h: + Propagate GstFlowReturn more intelligently upstream and output + an ERROR/EOS when streaming stopped due to fatal error. + +2005-07-19 Wim Taymans + * tools/gst-launch.c: (check_intr), (event_loop), (main): Don't block forever for the state change to complete, the pipeline already did with a sensible timeout. diff --git a/gst/gstqueue.c b/gst/gstqueue.c index 0629763..f8d3708 100644 --- a/gst/gstqueue.c +++ b/gst/gstqueue.c @@ -338,7 +338,7 @@ gst_queue_init (GstQueue * queue) queue->leaky = GST_QUEUE_NO_LEAK; queue->may_deadlock = TRUE; queue->block_timeout = GST_CLOCK_TIME_NONE; - queue->flushing = FALSE; + queue->srcresult = GST_FLOW_WRONG_STATE; queue->qlock = g_mutex_new (); queue->item_add = g_cond_new (); @@ -473,22 +473,22 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) gst_pad_push_event (queue->srcpad, event); if (GST_EVENT_FLUSH_DONE (event)) { GST_QUEUE_MUTEX_LOCK; - queue->flushing = FALSE; + gst_queue_locked_flush (queue); + queue->srcresult = GST_FLOW_OK; gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop, queue->srcpad); GST_QUEUE_MUTEX_UNLOCK; } else { /* now unblock the chain function */ GST_QUEUE_MUTEX_LOCK; - queue->flushing = TRUE; - gst_queue_locked_flush (queue); + queue->srcresult = GST_FLOW_WRONG_STATE; /* unblock the loop function */ g_cond_signal (queue->item_add); GST_QUEUE_MUTEX_UNLOCK; STATUS (queue, "after flush"); - /* make sure it stops */ + /* make sure it pauses */ gst_pad_pause_task (queue->srcpad); GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped"); } @@ -548,6 +548,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) /* we have to lock the queue since we span threads */ GST_QUEUE_MUTEX_LOCK; + if (queue->srcresult != GST_FLOW_OK) + goto out_flushing; + GST_CAT_LOG_OBJECT (queue_dataflow, queue, "adding buffer %p of size %d", buffer, GST_BUFFER_SIZE (buffer)); @@ -559,6 +562,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0); GST_QUEUE_MUTEX_LOCK; + if (queue->srcresult != GST_FLOW_OK) + goto out_flushing; + /* how are we going to make space for this buffer? */ switch (queue->leaky) { /* leak current buffer */ @@ -621,7 +627,7 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) STATUS (queue, "waiting for item_del signal from thread using qlock"); g_cond_wait (queue->item_del, queue->qlock); - if (queue->flushing) + if (queue->srcresult != GST_FLOW_OK) goto out_flushing; /* if there's a pending state change for this queue @@ -634,6 +640,10 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) GST_QUEUE_MUTEX_UNLOCK; g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0); GST_QUEUE_MUTEX_LOCK; + + if (queue->srcresult != GST_FLOW_OK) + goto out_flushing; + break; } } @@ -654,6 +664,7 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) return GST_FLOW_OK; + /* special conditions */ out_unref: { GST_QUEUE_MUTEX_UNLOCK; @@ -664,13 +675,15 @@ out_unref: } out_flushing: { - GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush"); + GstFlowReturn ret = queue->srcresult; + + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "exit because task paused, reason: %d", ret); GST_QUEUE_MUTEX_UNLOCK; - gst_pad_pause_task (queue->srcpad); gst_buffer_unref (buffer); - return GST_FLOW_WRONG_STATE; + return ret; } } @@ -687,23 +700,28 @@ gst_queue_loop (GstPad * pad) GST_QUEUE_MUTEX_LOCK; restart: + if (queue->srcresult != GST_FLOW_OK) + goto out_flushing; + while (gst_queue_is_empty (queue)) { GST_QUEUE_MUTEX_UNLOCK; g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0); GST_QUEUE_MUTEX_LOCK; + if (queue->srcresult != GST_FLOW_OK) + goto out_flushing; + STATUS (queue, "pre-empty wait"); while (gst_queue_is_empty (queue)) { STATUS (queue, "waiting for item_add"); - if (queue->flushing) - goto out_flushing; - GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p", g_thread_self ()); g_cond_wait (queue->item_add, queue->qlock); - if (queue->flushing) + /* we released the lock in the g_cond above so we might be + * flushing now */ + if (queue->srcresult != GST_FLOW_OK) goto out_flushing; GST_LOG_OBJECT (queue, "done g_cond_wait using qlock from thread %p", @@ -715,6 +733,9 @@ restart: GST_QUEUE_MUTEX_UNLOCK; g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0); GST_QUEUE_MUTEX_LOCK; + + if (queue->srcresult != GST_FLOW_OK) + goto out_flushing; } /* There's something in the list now, whatever it is */ @@ -734,11 +755,23 @@ restart: GST_QUEUE_MUTEX_UNLOCK; result = gst_pad_push (pad, GST_BUFFER (data)); GST_QUEUE_MUTEX_LOCK; + /* can opt to check for srcresult here but the push should + * return an error value that is more accurate */ if (result != GST_FLOW_OK) { + queue->srcresult = result; + if (GST_FLOW_IS_FATAL (result)) { + GST_ELEMENT_ERROR (queue, STREAM, STOPPED, + ("streaming stopped, reason %d", result), + ("streaming stopped, reason %d", result)); + gst_pad_push_event (queue->srcpad, gst_event_new (GST_EVENT_EOS)); + } gst_pad_pause_task (queue->srcpad); } } else { if (GST_EVENT_TYPE (data) == GST_EVENT_EOS) { + /* all incomming data is now unexpected */ + queue->srcresult = GST_FLOW_UNEXPECTED; + /* and we don't need to process anymore */ gst_pad_pause_task (queue->srcpad); restart = FALSE; } @@ -754,13 +787,17 @@ restart: GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del"); g_cond_signal (queue->item_del); GST_QUEUE_MUTEX_UNLOCK; + return; out_flushing: - GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush"); - gst_pad_pause_task (pad); - GST_QUEUE_MUTEX_UNLOCK; - return; + { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "exit because task paused, reason: %d", queue->srcresult); + GST_QUEUE_MUTEX_UNLOCK; + + return; + } } @@ -768,15 +805,14 @@ static gboolean gst_queue_handle_src_event (GstPad * pad, GstEvent * event) { gboolean res = TRUE; - -#ifndef GST_DISABLE_GST_DEBUG GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad)); +#ifndef GST_DISABLE_GST_DEBUG GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)", event, GST_EVENT_TYPE (event)); #endif - res = gst_pad_event_default (pad, event); + res = gst_pad_push_event (queue->sinkpad, event); return res; } @@ -785,10 +821,15 @@ static gboolean gst_queue_handle_src_query (GstPad * pad, GstQuery * query) { GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad)); + GstPad *peer; + gboolean res; - if (!GST_PAD_PEER (queue->sinkpad)) + if (!(peer = gst_pad_get_peer (queue->sinkpad))) return FALSE; - if (!gst_pad_query (GST_PAD_PEER (queue->sinkpad), query)) + + res = gst_pad_query (peer, query); + gst_object_unref (peer); + if (!res) return FALSE; switch (GST_QUERY_TYPE (query)) { @@ -829,15 +870,15 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active) gboolean result = FALSE; GstQueue *queue; - queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); + queue = GST_QUEUE (gst_pad_get_parent (pad)); if (active) { - queue->flushing = FALSE; + queue->srcresult = GST_FLOW_OK; result = TRUE; } else { /* step 1, unblock chain and loop functions */ GST_QUEUE_MUTEX_LOCK; - queue->flushing = TRUE; + queue->srcresult = GST_FLOW_WRONG_STATE; gst_queue_locked_flush (queue); g_cond_signal (queue->item_del); GST_QUEUE_MUTEX_UNLOCK; @@ -845,6 +886,7 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active) /* step 2, make sure streaming finishes */ result = gst_pad_stop_task (pad); } + gst_object_unref (queue); return result; } @@ -855,17 +897,17 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active) gboolean result = FALSE; GstQueue *queue; - queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); + queue = GST_QUEUE (gst_pad_get_parent (pad)); if (active) { GST_QUEUE_MUTEX_LOCK; - queue->flushing = FALSE; + queue->srcresult = GST_FLOW_OK; result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad); GST_QUEUE_MUTEX_UNLOCK; } else { /* step 1, unblock chain and loop functions */ GST_QUEUE_MUTEX_LOCK; - queue->flushing = TRUE; + queue->srcresult = GST_FLOW_WRONG_STATE; g_cond_signal (queue->item_add); GST_QUEUE_MUTEX_UNLOCK; @@ -873,6 +915,8 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active) result = gst_pad_stop_task (pad); } + gst_object_unref (queue); + return result; } @@ -886,9 +930,6 @@ gst_queue_change_state (GstElement * element) GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "starting state change"); - /* lock the queue so another thread (not in sync with this thread's state) - * can't call this queue's _loop (or whatever) */ - switch (GST_STATE_TRANSITION (element)) { case GST_STATE_NULL_TO_READY: break; @@ -913,8 +954,6 @@ gst_queue_change_state (GstElement * element) break; } - GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change"); - return ret; } @@ -970,6 +1009,8 @@ gst_queue_get_property (GObject * object, { GstQueue *queue = GST_QUEUE (object); + GST_QUEUE_MUTEX_LOCK; + switch (prop_id) { case ARG_CUR_LEVEL_BYTES: g_value_set_uint (value, queue->cur_level.bytes); @@ -1011,4 +1052,6 @@ gst_queue_get_property (GObject * object, G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } + + GST_QUEUE_MUTEX_UNLOCK; } diff --git a/gst/gstqueue.h b/gst/gstqueue.h index b0a2479..aec0946 100644 --- a/gst/gstqueue.h +++ b/gst/gstqueue.h @@ -62,6 +62,9 @@ struct _GstQueue { GstPad *sinkpad; GstPad *srcpad; + /* flowreturn when srcpad is paused */ + GstFlowReturn srcresult; + /* the queue of data we're keeping our grubby hands on */ GQueue *queue; @@ -79,7 +82,6 @@ struct _GstQueue { /* it the queue should fail on possible deadlocks */ gboolean may_deadlock; - gboolean flushing; GMutex *qlock; /* lock for queue (vs object lock) */ GCond *item_add; /* signals buffers now available for reading */ diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index 0629763..f8d3708 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -338,7 +338,7 @@ gst_queue_init (GstQueue * queue) queue->leaky = GST_QUEUE_NO_LEAK; queue->may_deadlock = TRUE; queue->block_timeout = GST_CLOCK_TIME_NONE; - queue->flushing = FALSE; + queue->srcresult = GST_FLOW_WRONG_STATE; queue->qlock = g_mutex_new (); queue->item_add = g_cond_new (); @@ -473,22 +473,22 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) gst_pad_push_event (queue->srcpad, event); if (GST_EVENT_FLUSH_DONE (event)) { GST_QUEUE_MUTEX_LOCK; - queue->flushing = FALSE; + gst_queue_locked_flush (queue); + queue->srcresult = GST_FLOW_OK; gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop, queue->srcpad); GST_QUEUE_MUTEX_UNLOCK; } else { /* now unblock the chain function */ GST_QUEUE_MUTEX_LOCK; - queue->flushing = TRUE; - gst_queue_locked_flush (queue); + queue->srcresult = GST_FLOW_WRONG_STATE; /* unblock the loop function */ g_cond_signal (queue->item_add); GST_QUEUE_MUTEX_UNLOCK; STATUS (queue, "after flush"); - /* make sure it stops */ + /* make sure it pauses */ gst_pad_pause_task (queue->srcpad); GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped"); } @@ -548,6 +548,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) /* we have to lock the queue since we span threads */ GST_QUEUE_MUTEX_LOCK; + if (queue->srcresult != GST_FLOW_OK) + goto out_flushing; + GST_CAT_LOG_OBJECT (queue_dataflow, queue, "adding buffer %p of size %d", buffer, GST_BUFFER_SIZE (buffer)); @@ -559,6 +562,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0); GST_QUEUE_MUTEX_LOCK; + if (queue->srcresult != GST_FLOW_OK) + goto out_flushing; + /* how are we going to make space for this buffer? */ switch (queue->leaky) { /* leak current buffer */ @@ -621,7 +627,7 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) STATUS (queue, "waiting for item_del signal from thread using qlock"); g_cond_wait (queue->item_del, queue->qlock); - if (queue->flushing) + if (queue->srcresult != GST_FLOW_OK) goto out_flushing; /* if there's a pending state change for this queue @@ -634,6 +640,10 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) GST_QUEUE_MUTEX_UNLOCK; g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0); GST_QUEUE_MUTEX_LOCK; + + if (queue->srcresult != GST_FLOW_OK) + goto out_flushing; + break; } } @@ -654,6 +664,7 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) return GST_FLOW_OK; + /* special conditions */ out_unref: { GST_QUEUE_MUTEX_UNLOCK; @@ -664,13 +675,15 @@ out_unref: } out_flushing: { - GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush"); + GstFlowReturn ret = queue->srcresult; + + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "exit because task paused, reason: %d", ret); GST_QUEUE_MUTEX_UNLOCK; - gst_pad_pause_task (queue->srcpad); gst_buffer_unref (buffer); - return GST_FLOW_WRONG_STATE; + return ret; } } @@ -687,23 +700,28 @@ gst_queue_loop (GstPad * pad) GST_QUEUE_MUTEX_LOCK; restart: + if (queue->srcresult != GST_FLOW_OK) + goto out_flushing; + while (gst_queue_is_empty (queue)) { GST_QUEUE_MUTEX_UNLOCK; g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0); GST_QUEUE_MUTEX_LOCK; + if (queue->srcresult != GST_FLOW_OK) + goto out_flushing; + STATUS (queue, "pre-empty wait"); while (gst_queue_is_empty (queue)) { STATUS (queue, "waiting for item_add"); - if (queue->flushing) - goto out_flushing; - GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p", g_thread_self ()); g_cond_wait (queue->item_add, queue->qlock); - if (queue->flushing) + /* we released the lock in the g_cond above so we might be + * flushing now */ + if (queue->srcresult != GST_FLOW_OK) goto out_flushing; GST_LOG_OBJECT (queue, "done g_cond_wait using qlock from thread %p", @@ -715,6 +733,9 @@ restart: GST_QUEUE_MUTEX_UNLOCK; g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0); GST_QUEUE_MUTEX_LOCK; + + if (queue->srcresult != GST_FLOW_OK) + goto out_flushing; } /* There's something in the list now, whatever it is */ @@ -734,11 +755,23 @@ restart: GST_QUEUE_MUTEX_UNLOCK; result = gst_pad_push (pad, GST_BUFFER (data)); GST_QUEUE_MUTEX_LOCK; + /* can opt to check for srcresult here but the push should + * return an error value that is more accurate */ if (result != GST_FLOW_OK) { + queue->srcresult = result; + if (GST_FLOW_IS_FATAL (result)) { + GST_ELEMENT_ERROR (queue, STREAM, STOPPED, + ("streaming stopped, reason %d", result), + ("streaming stopped, reason %d", result)); + gst_pad_push_event (queue->srcpad, gst_event_new (GST_EVENT_EOS)); + } gst_pad_pause_task (queue->srcpad); } } else { if (GST_EVENT_TYPE (data) == GST_EVENT_EOS) { + /* all incomming data is now unexpected */ + queue->srcresult = GST_FLOW_UNEXPECTED; + /* and we don't need to process anymore */ gst_pad_pause_task (queue->srcpad); restart = FALSE; } @@ -754,13 +787,17 @@ restart: GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del"); g_cond_signal (queue->item_del); GST_QUEUE_MUTEX_UNLOCK; + return; out_flushing: - GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush"); - gst_pad_pause_task (pad); - GST_QUEUE_MUTEX_UNLOCK; - return; + { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "exit because task paused, reason: %d", queue->srcresult); + GST_QUEUE_MUTEX_UNLOCK; + + return; + } } @@ -768,15 +805,14 @@ static gboolean gst_queue_handle_src_event (GstPad * pad, GstEvent * event) { gboolean res = TRUE; - -#ifndef GST_DISABLE_GST_DEBUG GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad)); +#ifndef GST_DISABLE_GST_DEBUG GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)", event, GST_EVENT_TYPE (event)); #endif - res = gst_pad_event_default (pad, event); + res = gst_pad_push_event (queue->sinkpad, event); return res; } @@ -785,10 +821,15 @@ static gboolean gst_queue_handle_src_query (GstPad * pad, GstQuery * query) { GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad)); + GstPad *peer; + gboolean res; - if (!GST_PAD_PEER (queue->sinkpad)) + if (!(peer = gst_pad_get_peer (queue->sinkpad))) return FALSE; - if (!gst_pad_query (GST_PAD_PEER (queue->sinkpad), query)) + + res = gst_pad_query (peer, query); + gst_object_unref (peer); + if (!res) return FALSE; switch (GST_QUERY_TYPE (query)) { @@ -829,15 +870,15 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active) gboolean result = FALSE; GstQueue *queue; - queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); + queue = GST_QUEUE (gst_pad_get_parent (pad)); if (active) { - queue->flushing = FALSE; + queue->srcresult = GST_FLOW_OK; result = TRUE; } else { /* step 1, unblock chain and loop functions */ GST_QUEUE_MUTEX_LOCK; - queue->flushing = TRUE; + queue->srcresult = GST_FLOW_WRONG_STATE; gst_queue_locked_flush (queue); g_cond_signal (queue->item_del); GST_QUEUE_MUTEX_UNLOCK; @@ -845,6 +886,7 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active) /* step 2, make sure streaming finishes */ result = gst_pad_stop_task (pad); } + gst_object_unref (queue); return result; } @@ -855,17 +897,17 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active) gboolean result = FALSE; GstQueue *queue; - queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); + queue = GST_QUEUE (gst_pad_get_parent (pad)); if (active) { GST_QUEUE_MUTEX_LOCK; - queue->flushing = FALSE; + queue->srcresult = GST_FLOW_OK; result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad); GST_QUEUE_MUTEX_UNLOCK; } else { /* step 1, unblock chain and loop functions */ GST_QUEUE_MUTEX_LOCK; - queue->flushing = TRUE; + queue->srcresult = GST_FLOW_WRONG_STATE; g_cond_signal (queue->item_add); GST_QUEUE_MUTEX_UNLOCK; @@ -873,6 +915,8 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active) result = gst_pad_stop_task (pad); } + gst_object_unref (queue); + return result; } @@ -886,9 +930,6 @@ gst_queue_change_state (GstElement * element) GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "starting state change"); - /* lock the queue so another thread (not in sync with this thread's state) - * can't call this queue's _loop (or whatever) */ - switch (GST_STATE_TRANSITION (element)) { case GST_STATE_NULL_TO_READY: break; @@ -913,8 +954,6 @@ gst_queue_change_state (GstElement * element) break; } - GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change"); - return ret; } @@ -970,6 +1009,8 @@ gst_queue_get_property (GObject * object, { GstQueue *queue = GST_QUEUE (object); + GST_QUEUE_MUTEX_LOCK; + switch (prop_id) { case ARG_CUR_LEVEL_BYTES: g_value_set_uint (value, queue->cur_level.bytes); @@ -1011,4 +1052,6 @@ gst_queue_get_property (GObject * object, G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } + + GST_QUEUE_MUTEX_UNLOCK; } diff --git a/plugins/elements/gstqueue.h b/plugins/elements/gstqueue.h index b0a2479..aec0946 100644 --- a/plugins/elements/gstqueue.h +++ b/plugins/elements/gstqueue.h @@ -62,6 +62,9 @@ struct _GstQueue { GstPad *sinkpad; GstPad *srcpad; + /* flowreturn when srcpad is paused */ + GstFlowReturn srcresult; + /* the queue of data we're keeping our grubby hands on */ GQueue *queue; @@ -79,7 +82,6 @@ struct _GstQueue { /* it the queue should fail on possible deadlocks */ gboolean may_deadlock; - gboolean flushing; GMutex *qlock; /* lock for queue (vs object lock) */ GCond *item_add; /* signals buffers now available for reading */