From: Wim Taymans Date: Wed, 25 May 2005 19:33:39 +0000 (+0000) Subject: gst/: Fix state changes for non sinks. We now change sinks, then elements with unconn... X-Git-Tag: RELEASE-0_9_2~455 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=56d9730d2059513388f21fdefbb8c32bc1c6348a;p=platform%2Fupstream%2Fgstreamer.git gst/: Fix state changes for non sinks. We now change sinks, then elements with unconnected srcpads, then the rest. Original commit message from CVS: * gst/gstbin.c: (bin_element_is_sink), (has_ancestor), (bin_element_is_semi_sink), (append_child), (gst_bin_change_state): * gst/gstpad.c: (gst_pad_set_active), (gst_pad_link_prepare), (gst_pad_link), (gst_pad_accept_caps), (gst_pad_query), (gst_pad_send_event), (gst_pad_start_task): * gst/gstqueue.c: (gst_queue_init), (gst_queue_locked_flush), (gst_queue_handle_sink_event), (gst_queue_chain), (gst_queue_loop), (gst_queue_sink_activate), (gst_queue_src_activate), (gst_queue_change_state): * gst/gstqueue.h: Fix state changes for non sinks. We now change sinks, then elements with unconnected srcpads, then the rest. More efficient queue unlocking in flush and state changes. Set the pad activate mode even if it does not have an activate function. --- diff --git a/ChangeLog b/ChangeLog index d936cc47ef..fca4155f89 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,21 @@ +2005-05-25 Wim Taymans + + * gst/gstbin.c: (bin_element_is_sink), (has_ancestor), + (bin_element_is_semi_sink), (append_child), (gst_bin_change_state): + * gst/gstpad.c: (gst_pad_set_active), (gst_pad_link_prepare), + (gst_pad_link), (gst_pad_accept_caps), (gst_pad_query), + (gst_pad_send_event), (gst_pad_start_task): + * gst/gstqueue.c: (gst_queue_init), (gst_queue_locked_flush), + (gst_queue_handle_sink_event), (gst_queue_chain), (gst_queue_loop), + (gst_queue_sink_activate), (gst_queue_src_activate), + (gst_queue_change_state): + * gst/gstqueue.h: + Fix state changes for non sinks. We now change sinks, then elements + with unconnected srcpads, then the rest. + More efficient queue unlocking in flush and state changes. + Set the pad activate mode even if it does not have an activate + function. + 2005-05-25 Ronald S. Bultje * gst/base/gstbasesrc.c: (gst_basesrc_activate): diff --git a/gst/gstbin.c b/gst/gstbin.c index abb80f9164..92b33269b1 100644 --- a/gst/gstbin.c +++ b/gst/gstbin.c @@ -686,25 +686,97 @@ gst_bin_iterate_recurse (GstBin * bin) return result; } +/* returns 0 when TRUE because this is a GCompareFunc */ /* MT safe */ static gint bin_element_is_sink (GstElement * child, GstBin * bin) { + gboolean is_sink; + /* we lock the child here for the remainder of the function to * get its name safely. */ GST_LOCK (child); - if (GST_FLAG_IS_SET (child, GST_ELEMENT_IS_SINK)) { + is_sink = GST_FLAG_IS_SET (child, GST_ELEMENT_IS_SINK); + GST_UNLOCK (child); + + GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin, + "child %s %s sink", GST_OBJECT_NAME (child), is_sink ? "is" : "is not"); + + return is_sink ? 0 : 1; +} + +static gboolean +has_ancestor (GstObject * object, GstObject * ancestor) +{ + GstObject *parent; + gboolean result = FALSE; + + if (object == NULL) + return FALSE; + + if (object == ancestor) + return TRUE; + + parent = gst_object_get_parent (object); + result = has_ancestor (parent, ancestor); + if (parent) + gst_object_unref (GST_OBJECT_CAST (parent)); + + return result; +} + +/* returns 0 when TRUE because this is a GCompareFunc. + * This function returns elements that have no connected srcpads and + * are therefore not reachable from a real sink. */ +/* MT safe */ +static gint +bin_element_is_semi_sink (GstElement * child, GstBin * bin) +{ + int ret = 1; + + /* we lock the child here for the remainder of the function to + * get its pads and name safely. */ + GST_LOCK (child); + + /* check if this is a sink element, these are the elements + * without (linked) source pads. */ + if (child->numsrcpads == 0) { + /* shortcut */ GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin, - "finding child %s as sink", GST_OBJECT_NAME (child)); - GST_UNLOCK (child); - /* returns 0 because this is a GCompareFunc */ - return 0; + "adding child %s as sink", GST_OBJECT_NAME (child)); + ret = 0; } else { - GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin, - "child %s is not a sink", GST_OBJECT_NAME (child)); - GST_UNLOCK (child); - return 1; + /* loop over all pads, try to figure out if this element + * is a semi sink because it has no linked source pads */ + GList *pads; + gboolean connected_src = FALSE; + + for (pads = child->srcpads; pads; pads = g_list_next (pads)) { + GstPad *peer; + + if ((peer = gst_pad_get_peer (GST_PAD_CAST (pads->data)))) { + connected_src = + has_ancestor (GST_OBJECT_CAST (peer), GST_OBJECT_CAST (bin)); + gst_object_unref (GST_OBJECT_CAST (peer)); + if (connected_src) { + break; + } + } + } + if (connected_src) { + GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin, + "not adding child %s as sink: linked source pads", + GST_OBJECT_NAME (child)); + } else { + GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin, + "adding child %s as sink since it has unlinked source pads in this bin", + GST_OBJECT_NAME (child)); + ret = 0; + } } + GST_UNLOCK (child); + + return ret; } static gint @@ -828,6 +900,12 @@ restart: return ret; } +static void +append_child (gpointer child, GQueue * queue) +{ + g_queue_push_tail (queue, child); +} + /* this function is called with the STATE_LOCK held. It works * as follows: * @@ -852,7 +930,8 @@ gst_bin_change_state (GstElement * element) GList *children; guint32 children_cookie; GQueue *elem_queue; /* list of elements waiting for a state change */ - GQueue *temp; /* temp queue of non sinks */ + GQueue *semi_queue; /* list of elements with no connected srcpads */ + GQueue *temp; /* temp queue of leftovers */ bin = GST_BIN (element); @@ -871,6 +950,7 @@ gst_bin_change_state (GstElement * element) /* all elements added to this queue should have their refcount * incremented */ elem_queue = g_queue_new (); + semi_queue = g_queue_new (); temp = g_queue_new (); /* first step, find all sink elements, these are the elements @@ -887,6 +967,8 @@ restart: if (bin_element_is_sink (child, bin) == 0) { g_queue_push_tail (elem_queue, child); + } else if (bin_element_is_semi_sink (child, bin) == 0) { + g_queue_push_tail (semi_queue, child); } else { g_queue_push_tail (temp, child); } @@ -895,8 +977,10 @@ restart: if (G_UNLIKELY (children_cookie != bin->children_cookie)) { /* undo what we had */ g_queue_foreach (elem_queue, (GFunc) gst_object_unref, NULL); + g_queue_foreach (semi_queue, (GFunc) gst_object_unref, NULL); g_queue_foreach (temp, (GFunc) gst_object_unref, NULL); while (g_queue_pop_head (elem_queue)); + while (g_queue_pop_head (semi_queue)); while (g_queue_pop_head (temp)); goto restart; } @@ -905,12 +989,17 @@ restart: } GST_UNLOCK (bin); + /* now change state for semi sink elements first so add them in + * front of the other elements */ + g_queue_foreach (temp, (GFunc) append_child, semi_queue); + g_queue_free (temp); + /* can be the case for a bin like ( identity ) */ - if (g_queue_is_empty (elem_queue) && !g_queue_is_empty (temp)) { + if (g_queue_is_empty (elem_queue) && !g_queue_is_empty (semi_queue)) { GQueue *q = elem_queue; - elem_queue = temp; - temp = q; + elem_queue = semi_queue; + semi_queue = q; } /* second step, change state of elements in the queue */ @@ -921,15 +1010,15 @@ restart: /* take element */ qelement = g_queue_pop_head (elem_queue); - /* we don't need it in the temp anymore */ - g_queue_remove_all (temp, qelement); + /* we don't need it in the semi_queue anymore */ + g_queue_remove_all (semi_queue, qelement); /* if queue is empty now, continue with a non-sink */ if (g_queue_is_empty (elem_queue)) { GstElement *non_sink; GST_DEBUG ("sinks and upstream elements exhausted"); - non_sink = g_queue_pop_head (temp); + non_sink = g_queue_pop_head (semi_queue); if (non_sink) { GST_DEBUG ("found lefover non-sink %s", GST_OBJECT_NAME (non_sink)); g_queue_push_tail (elem_queue, non_sink); @@ -1044,8 +1133,8 @@ exit: /* release refcounts in queue, should normally be empty */ g_queue_foreach (elem_queue, (GFunc) gst_object_unref, NULL); g_queue_free (elem_queue); - g_queue_foreach (temp, (GFunc) gst_object_unref, NULL); - g_queue_free (temp); + g_queue_foreach (semi_queue, (GFunc) gst_object_unref, NULL); + g_queue_free (semi_queue); return ret; } diff --git a/gst/gstpad.c b/gst/gstpad.c index f553f5b366..a1d5b5214b 100644 --- a/gst/gstpad.c +++ b/gst/gstpad.c @@ -506,10 +506,9 @@ gst_pad_set_active (GstPad * pad, GstActivateMode mode) GST_LOCK (realpad); if (result == FALSE) goto activate_error; - - /* store the mode */ - GST_RPAD_ACTIVATE_MODE (realpad) = mode; } + /* store the mode */ + GST_RPAD_ACTIVATE_MODE (realpad) = mode; /* when going to active allow data passing now */ if (active) { @@ -529,7 +528,8 @@ gst_pad_set_active (GstPad * pad, GstActivateMode mode) was_ok: { GST_CAT_DEBUG (GST_CAT_PADS, - "pad %s:%s was active", GST_DEBUG_PAD_NAME (realpad)); + "pad %s:%s was active, old %d, new %d", + GST_DEBUG_PAD_NAME (realpad), old, mode); GST_UNLOCK (realpad); return TRUE; } diff --git a/gst/gstqueue.c b/gst/gstqueue.c index 256d0d10eb..5fe3fb1e75 100644 --- a/gst/gstqueue.c +++ b/gst/gstqueue.c @@ -141,6 +141,7 @@ static GstPadLinkReturn gst_queue_link_src (GstPad * pad, GstPad * peer); static void gst_queue_locked_flush (GstQueue * queue); static gboolean gst_queue_src_activate (GstPad * pad, GstActivateMode mode); +static gboolean gst_queue_sink_activate (GstPad * pad, GstActivateMode mode); static GstElementStateReturn gst_queue_change_state (GstElement * element); @@ -299,6 +300,8 @@ gst_queue_init (GstQueue * queue) "sink"); gst_pad_set_chain_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_chain)); + gst_pad_set_activate_function (queue->sinkpad, + GST_DEBUG_FUNCPTR (gst_queue_sink_activate)); gst_pad_set_event_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_handle_sink_event)); gst_pad_set_link_function (queue->sinkpad, @@ -338,7 +341,7 @@ gst_queue_init (GstQueue * queue) queue->leaky = GST_QUEUE_NO_LEAK; queue->may_deadlock = TRUE; queue->block_timeout = GST_CLOCK_TIME_NONE; - queue->flush = FALSE; + queue->flushing = FALSE; queue->qlock = g_mutex_new (); queue->item_add = g_cond_new (); @@ -446,9 +449,6 @@ gst_queue_locked_flush (GstQueue * queue) queue->cur_level.bytes = 0; queue->cur_level.time = 0; - /* make sure any pending buffers to be added are flushed too */ - queue->flush = TRUE; - /* we deleted something... */ g_cond_signal (queue->item_del); } @@ -483,19 +483,22 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) /* forward event */ gst_pad_event_default (pad, event); if (GST_EVENT_FLUSH_DONE (event)) { + GST_QUEUE_MUTEX_LOCK; + queue->flushing = FALSE; gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop, queue->srcpad); + GST_QUEUE_MUTEX_UNLOCK; } else { /* now unblock the chain function */ GST_QUEUE_MUTEX_LOCK; + queue->flushing = TRUE; gst_queue_locked_flush (queue); + /* unblock the loop function */ + g_cond_signal (queue->item_add); GST_QUEUE_MUTEX_UNLOCK; STATUS (queue, "after flush"); - /* unblock the loop function */ - g_cond_signal (queue->item_add); - /* make sure it stops */ gst_pad_pause_task (queue->srcpad); GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped"); @@ -628,8 +631,11 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) STATUS (queue, "waiting for item_del signal from thread using qlock"); g_cond_wait (queue->item_del, queue->qlock); - if (GST_RPAD_IS_FLUSHING (pad)) - goto out_flushing; + if (queue->flushing) +#if 0 + if (GST_RPAD_IS_FLUSHING (pad)) +#endif + goto out_flushing; /* if there's a pending state change for this queue * or its manager, switch back to iterator so bottom @@ -644,9 +650,11 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) break; } } +#if 0 /* we are flushing */ if (GST_RPAD_IS_FLUSHING (pad)) goto out_flushing; +#endif g_queue_push_tail (queue->queue, buffer); @@ -703,17 +711,30 @@ restart: while (gst_queue_is_empty (queue)) { STATUS (queue, "waiting for item_add"); +#if 0 /* we are flushing */ + if (GST_RPAD_IS_FLUSHING (pad)) + goto out_flushing; if (GST_RPAD_IS_FLUSHING (queue->sinkpad)) goto out_flushing; +#endif + + if (queue->flushing) + goto out_flushing; GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p", g_thread_self ()); g_cond_wait (queue->item_add, queue->qlock); + if (queue->flushing) + goto out_flushing; +#if 0 /* we got unlocked because we are flushing */ + if (GST_RPAD_IS_FLUSHING (pad)) + goto out_flushing; if (GST_RPAD_IS_FLUSHING (queue->sinkpad)) goto out_flushing; +#endif GST_LOG_OBJECT (queue, "done g_cond_wait using qlock from thread %p", g_thread_self ()); @@ -843,28 +864,67 @@ gst_queue_handle_src_query (GstPad * pad, GstQuery * query) } static gboolean -gst_queue_src_activate (GstPad * pad, GstActivateMode mode) +gst_queue_sink_activate (GstPad * pad, GstActivateMode mode) { gboolean result = FALSE; GstQueue *queue; queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); - if (mode == GST_ACTIVATE_PUSH) { - result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad); - } else { - /* step 1, unblock chain and loop functions */ - GST_QUEUE_MUTEX_LOCK; - g_cond_signal (queue->item_add); - g_cond_signal (queue->item_del); - GST_QUEUE_MUTEX_UNLOCK; + switch (mode) { + case GST_ACTIVATE_PUSH: + queue->flushing = FALSE; + result = TRUE; + break; + case GST_ACTIVATE_PULL: + result = FALSE; + break; + case GST_ACTIVATE_NONE: + /* step 1, unblock chain and loop functions */ + GST_QUEUE_MUTEX_LOCK; + queue->flushing = TRUE; + gst_queue_locked_flush (queue); + g_cond_signal (queue->item_del); + GST_QUEUE_MUTEX_UNLOCK; - /* step 2, make sure streaming finishes */ - result = gst_pad_stop_task (pad); + /* step 2, make sure streaming finishes */ + result = gst_pad_stop_task (pad); + break; } return result; } +static gboolean +gst_queue_src_activate (GstPad * pad, GstActivateMode mode) +{ + gboolean result = FALSE; + GstQueue *queue; + + queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); + + switch (mode) { + case GST_ACTIVATE_PUSH: + GST_QUEUE_MUTEX_LOCK; + queue->flushing = FALSE; + result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad); + GST_QUEUE_MUTEX_UNLOCK; + break; + case GST_ACTIVATE_PULL: + result = FALSE; + break; + case GST_ACTIVATE_NONE: + /* step 1, unblock chain and loop functions */ + GST_QUEUE_MUTEX_LOCK; + queue->flushing = TRUE; + g_cond_signal (queue->item_add); + GST_QUEUE_MUTEX_UNLOCK; + + /* step 2, make sure streaming finishes */ + result = gst_pad_stop_task (pad); + break; + } + return result; +} static GstElementStateReturn gst_queue_change_state (GstElement * element) @@ -883,9 +943,6 @@ gst_queue_change_state (GstElement * element) case GST_STATE_NULL_TO_READY: break; case GST_STATE_READY_TO_PAUSED: - GST_QUEUE_MUTEX_LOCK; - gst_queue_locked_flush (queue); - GST_QUEUE_MUTEX_UNLOCK; break; case GST_STATE_PAUSED_TO_PLAYING: break; @@ -899,9 +956,6 @@ gst_queue_change_state (GstElement * element) case GST_STATE_PLAYING_TO_PAUSED: break; case GST_STATE_PAUSED_TO_READY: - GST_QUEUE_MUTEX_LOCK; - gst_queue_locked_flush (queue); - GST_QUEUE_MUTEX_UNLOCK; break; case GST_STATE_READY_TO_NULL: break; diff --git a/gst/gstqueue.h b/gst/gstqueue.h index 220a5cb456..b0a2479036 100644 --- a/gst/gstqueue.h +++ b/gst/gstqueue.h @@ -79,7 +79,7 @@ struct _GstQueue { /* it the queue should fail on possible deadlocks */ gboolean may_deadlock; - gboolean flush; + gboolean flushing; GMutex *qlock; /* lock for queue (vs object lock) */ GCond *item_add; /* signals buffers now available for reading */ diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index 256d0d10eb..5fe3fb1e75 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -141,6 +141,7 @@ static GstPadLinkReturn gst_queue_link_src (GstPad * pad, GstPad * peer); static void gst_queue_locked_flush (GstQueue * queue); static gboolean gst_queue_src_activate (GstPad * pad, GstActivateMode mode); +static gboolean gst_queue_sink_activate (GstPad * pad, GstActivateMode mode); static GstElementStateReturn gst_queue_change_state (GstElement * element); @@ -299,6 +300,8 @@ gst_queue_init (GstQueue * queue) "sink"); gst_pad_set_chain_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_chain)); + gst_pad_set_activate_function (queue->sinkpad, + GST_DEBUG_FUNCPTR (gst_queue_sink_activate)); gst_pad_set_event_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_handle_sink_event)); gst_pad_set_link_function (queue->sinkpad, @@ -338,7 +341,7 @@ gst_queue_init (GstQueue * queue) queue->leaky = GST_QUEUE_NO_LEAK; queue->may_deadlock = TRUE; queue->block_timeout = GST_CLOCK_TIME_NONE; - queue->flush = FALSE; + queue->flushing = FALSE; queue->qlock = g_mutex_new (); queue->item_add = g_cond_new (); @@ -446,9 +449,6 @@ gst_queue_locked_flush (GstQueue * queue) queue->cur_level.bytes = 0; queue->cur_level.time = 0; - /* make sure any pending buffers to be added are flushed too */ - queue->flush = TRUE; - /* we deleted something... */ g_cond_signal (queue->item_del); } @@ -483,19 +483,22 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) /* forward event */ gst_pad_event_default (pad, event); if (GST_EVENT_FLUSH_DONE (event)) { + GST_QUEUE_MUTEX_LOCK; + queue->flushing = FALSE; gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop, queue->srcpad); + GST_QUEUE_MUTEX_UNLOCK; } else { /* now unblock the chain function */ GST_QUEUE_MUTEX_LOCK; + queue->flushing = TRUE; gst_queue_locked_flush (queue); + /* unblock the loop function */ + g_cond_signal (queue->item_add); GST_QUEUE_MUTEX_UNLOCK; STATUS (queue, "after flush"); - /* unblock the loop function */ - g_cond_signal (queue->item_add); - /* make sure it stops */ gst_pad_pause_task (queue->srcpad); GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped"); @@ -628,8 +631,11 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) STATUS (queue, "waiting for item_del signal from thread using qlock"); g_cond_wait (queue->item_del, queue->qlock); - if (GST_RPAD_IS_FLUSHING (pad)) - goto out_flushing; + if (queue->flushing) +#if 0 + if (GST_RPAD_IS_FLUSHING (pad)) +#endif + goto out_flushing; /* if there's a pending state change for this queue * or its manager, switch back to iterator so bottom @@ -644,9 +650,11 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer) break; } } +#if 0 /* we are flushing */ if (GST_RPAD_IS_FLUSHING (pad)) goto out_flushing; +#endif g_queue_push_tail (queue->queue, buffer); @@ -703,17 +711,30 @@ restart: while (gst_queue_is_empty (queue)) { STATUS (queue, "waiting for item_add"); +#if 0 /* we are flushing */ + if (GST_RPAD_IS_FLUSHING (pad)) + goto out_flushing; if (GST_RPAD_IS_FLUSHING (queue->sinkpad)) goto out_flushing; +#endif + + if (queue->flushing) + goto out_flushing; GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p", g_thread_self ()); g_cond_wait (queue->item_add, queue->qlock); + if (queue->flushing) + goto out_flushing; +#if 0 /* we got unlocked because we are flushing */ + if (GST_RPAD_IS_FLUSHING (pad)) + goto out_flushing; if (GST_RPAD_IS_FLUSHING (queue->sinkpad)) goto out_flushing; +#endif GST_LOG_OBJECT (queue, "done g_cond_wait using qlock from thread %p", g_thread_self ()); @@ -843,28 +864,67 @@ gst_queue_handle_src_query (GstPad * pad, GstQuery * query) } static gboolean -gst_queue_src_activate (GstPad * pad, GstActivateMode mode) +gst_queue_sink_activate (GstPad * pad, GstActivateMode mode) { gboolean result = FALSE; GstQueue *queue; queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); - if (mode == GST_ACTIVATE_PUSH) { - result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad); - } else { - /* step 1, unblock chain and loop functions */ - GST_QUEUE_MUTEX_LOCK; - g_cond_signal (queue->item_add); - g_cond_signal (queue->item_del); - GST_QUEUE_MUTEX_UNLOCK; + switch (mode) { + case GST_ACTIVATE_PUSH: + queue->flushing = FALSE; + result = TRUE; + break; + case GST_ACTIVATE_PULL: + result = FALSE; + break; + case GST_ACTIVATE_NONE: + /* step 1, unblock chain and loop functions */ + GST_QUEUE_MUTEX_LOCK; + queue->flushing = TRUE; + gst_queue_locked_flush (queue); + g_cond_signal (queue->item_del); + GST_QUEUE_MUTEX_UNLOCK; - /* step 2, make sure streaming finishes */ - result = gst_pad_stop_task (pad); + /* step 2, make sure streaming finishes */ + result = gst_pad_stop_task (pad); + break; } return result; } +static gboolean +gst_queue_src_activate (GstPad * pad, GstActivateMode mode) +{ + gboolean result = FALSE; + GstQueue *queue; + + queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); + + switch (mode) { + case GST_ACTIVATE_PUSH: + GST_QUEUE_MUTEX_LOCK; + queue->flushing = FALSE; + result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad); + GST_QUEUE_MUTEX_UNLOCK; + break; + case GST_ACTIVATE_PULL: + result = FALSE; + break; + case GST_ACTIVATE_NONE: + /* step 1, unblock chain and loop functions */ + GST_QUEUE_MUTEX_LOCK; + queue->flushing = TRUE; + g_cond_signal (queue->item_add); + GST_QUEUE_MUTEX_UNLOCK; + + /* step 2, make sure streaming finishes */ + result = gst_pad_stop_task (pad); + break; + } + return result; +} static GstElementStateReturn gst_queue_change_state (GstElement * element) @@ -883,9 +943,6 @@ gst_queue_change_state (GstElement * element) case GST_STATE_NULL_TO_READY: break; case GST_STATE_READY_TO_PAUSED: - GST_QUEUE_MUTEX_LOCK; - gst_queue_locked_flush (queue); - GST_QUEUE_MUTEX_UNLOCK; break; case GST_STATE_PAUSED_TO_PLAYING: break; @@ -899,9 +956,6 @@ gst_queue_change_state (GstElement * element) case GST_STATE_PLAYING_TO_PAUSED: break; case GST_STATE_PAUSED_TO_READY: - GST_QUEUE_MUTEX_LOCK; - gst_queue_locked_flush (queue); - GST_QUEUE_MUTEX_UNLOCK; break; case GST_STATE_READY_TO_NULL: break; diff --git a/plugins/elements/gstqueue.h b/plugins/elements/gstqueue.h index 220a5cb456..b0a2479036 100644 --- a/plugins/elements/gstqueue.h +++ b/plugins/elements/gstqueue.h @@ -79,7 +79,7 @@ struct _GstQueue { /* it the queue should fail on possible deadlocks */ gboolean may_deadlock; - gboolean flush; + gboolean flushing; GMutex *qlock; /* lock for queue (vs object lock) */ GCond *item_add; /* signals buffers now available for reading */