From ed8fcd46b268a6d73224da259f54f770097e3824 Mon Sep 17 00:00:00 2001 From: Julien Moutte Date: Wed, 14 Dec 2005 17:08:36 +0000 Subject: [PATCH] libs/gst/base/gstcollectpads.c: Refactoring of collectpads. This version removes a lot of races without touching API/... Original commit message from CVS: 2005-12-14 Julien MOUTTE * libs/gst/base/gstcollectpads.c: (gst_collect_pads_base_init), (gst_collect_pads_remove_pad), (gst_collect_pads_is_collected), (gst_collect_pads_event), (gst_collect_pads_chain): Refactoring of collectpads. This version removes a lot of races without touching API/ABI. Yay ! --- ChangeLog | 8 ++++ libs/gst/base/gstcollectpads.c | 88 ++++++++++++++++++++++++++++-------------- 2 files changed, 68 insertions(+), 28 deletions(-) diff --git a/ChangeLog b/ChangeLog index ed943d6..fbab0bc 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,11 @@ +2005-12-14 Julien MOUTTE + + * libs/gst/base/gstcollectpads.c: (gst_collect_pads_base_init), + (gst_collect_pads_remove_pad), (gst_collect_pads_is_collected), + (gst_collect_pads_event), (gst_collect_pads_chain): Refactoring + of collectpads. This version removes a lot of races without + touching API/ABI. Yay ! + 2005-12-14 Jan Schmidt * gst/gstpad.c: (gst_pad_activate_pull), (gst_pad_link_prepare): diff --git a/libs/gst/base/gstcollectpads.c b/libs/gst/base/gstcollectpads.c index affa92d..e7ce98f 100644 --- a/libs/gst/base/gstcollectpads.c +++ b/libs/gst/base/gstcollectpads.c @@ -246,6 +246,8 @@ gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad) pads->data = g_slist_delete_link (pads->data, list); } pads->numpads--; + /* FIXME : if the pad has data queued we should decrease the number of + queuedpads */ pads->cookie++; GST_OBJECT_UNLOCK (pads); @@ -562,6 +564,46 @@ gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data, } static gboolean +gst_collect_pads_is_collected (GstCollectPads * pads, GstFlowReturn * ret) +{ + GstFlowReturn flow_ret = GST_FLOW_OK; + gboolean res = FALSE; + + g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE); + + /* If all our pads are EOS just collect once */ + if (pads->eospads == pads->numpads) { + GST_DEBUG ("All active pads are EOS, calling %s", + GST_DEBUG_FUNCPTR_NAME (pads->func)); + flow_ret = pads->func (pads, pads->user_data); + res = TRUE; + goto beach; + } + + /* We call the collected function as long as our condition matches. + FIXME: should we error out if the collect function did not pop anything ? + we can get a busy loop here if the element does not pop from the collect + function */ + while (((pads->queuedpads + pads->eospads) >= pads->numpads) && pads->func) { + GST_DEBUG ("All active pads have data, calling %s", + GST_DEBUG_FUNCPTR_NAME (pads->func)); + flow_ret = pads->func (pads, pads->user_data); + res = TRUE; + } + +beach: + if (!res) { + GST_DEBUG ("Not all active pads have data, continuing"); + } + + if (ret) { + *ret = flow_ret; + } + + return res; +} + +static gboolean gst_collect_pads_event (GstPad * pad, GstEvent * event) { GstCollectData *data; @@ -580,23 +622,17 @@ gst_collect_pads_event (GstPad * pad, GstEvent * event) switch (GST_EVENT_TYPE (event)) { case GST_EVENT_EOS: { - GstFlowReturn ret = GST_FLOW_OK; - GST_OBJECT_LOCK (pads); pads->eospads++; - /* if all pads are EOS and we have a function, call it */ - if ((pads->eospads == pads->numpads) && pads->func) { - ret = pads->func (pads, pads->user_data); - } + gst_collect_pads_is_collected (pads, NULL); GST_OBJECT_UNLOCK (pads); /* We eat this event */ gst_event_unref (event); return TRUE; - break; } case GST_EVENT_NEWSEGMENT: { @@ -627,7 +663,11 @@ not_ours: } } - +/* For each buffer we receive we check if our collected condition is reached + and if so we call the collected function. When this is done we check if + data has been unqueued. If data is still queued we wait holding the stream + lock to make sure no EOS event can happen while we are ready to be + collected */ static GstFlowReturn gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer) { @@ -652,13 +692,20 @@ gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer) if (!pads->started) goto not_started; - /* Call the collected callback until a pad with a buffer is popped. */ - while (((pads->queuedpads + pads->eospads) == pads->numpads) && pads->func) - ret = pads->func (pads, pads->user_data); + GST_DEBUG ("Queuing buffer %p for pad %s:%s", buffer, + GST_DEBUG_PAD_NAME (pad)); + + /* One more pad has data queued */ + pads->queuedpads++; + gst_buffer_replace (&data->buffer, buffer); - /* queue buffer on this pad, block if filled */ + /* Check if our collected condition is matched and call the collected function + if it is */ + gst_collect_pads_is_collected (pads, &ret); + + /* We still have data queued on this pad, wait for something to happen */ while (data->buffer != NULL) { - GST_DEBUG ("Pad %s:%s already has a buffer queued, waiting", + GST_DEBUG ("Pad %s:%s has a buffer queued, waiting", GST_DEBUG_PAD_NAME (pad)); GST_COLLECT_PADS_WAIT (pads); GST_DEBUG ("Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad)); @@ -667,21 +714,6 @@ gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer) goto not_started; } - GST_DEBUG ("Queuing buffer %p for pad %s:%s", buffer, - GST_DEBUG_PAD_NAME (pad)); - - pads->queuedpads++; - gst_buffer_replace (&data->buffer, buffer); - - /* if all pads have data and we have a function, call it */ - if (((pads->queuedpads + pads->eospads) == pads->numpads) && pads->func) { - GST_DEBUG ("All active pads have data, calling %s", - GST_DEBUG_FUNCPTR_NAME (pads->func)); - ret = pads->func (pads, pads->user_data); - } else { - GST_DEBUG ("Not all active pads have data, continuing"); - ret = GST_FLOW_OK; - } GST_OBJECT_UNLOCK (pads); return ret; -- 2.7.4