pads->data = g_slist_delete_link (pads->data, list);
}
pads->numpads--;
+ /* FIXME : if the pad has data queued we should decrease the number of
+ queuedpads */
pads->cookie++;
GST_OBJECT_UNLOCK (pads);
}
static gboolean
+gst_collect_pads_is_collected (GstCollectPads * pads, GstFlowReturn * ret)
+{
+ GstFlowReturn flow_ret = GST_FLOW_OK;
+ gboolean res = FALSE;
+
+ g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
+
+ /* If all our pads are EOS just collect once */
+ if (pads->eospads == pads->numpads) {
+ GST_DEBUG ("All active pads are EOS, calling %s",
+ GST_DEBUG_FUNCPTR_NAME (pads->func));
+ flow_ret = pads->func (pads, pads->user_data);
+ res = TRUE;
+ goto beach;
+ }
+
+ /* We call the collected function as long as our condition matches.
+ FIXME: should we error out if the collect function did not pop anything ?
+ we can get a busy loop here if the element does not pop from the collect
+ function */
+ while (((pads->queuedpads + pads->eospads) >= pads->numpads) && pads->func) {
+ GST_DEBUG ("All active pads have data, calling %s",
+ GST_DEBUG_FUNCPTR_NAME (pads->func));
+ flow_ret = pads->func (pads, pads->user_data);
+ res = TRUE;
+ }
+
+beach:
+ if (!res) {
+ GST_DEBUG ("Not all active pads have data, continuing");
+ }
+
+ if (ret) {
+ *ret = flow_ret;
+ }
+
+ return res;
+}
+
+static gboolean
gst_collect_pads_event (GstPad * pad, GstEvent * event)
{
GstCollectData *data;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS:
{
- GstFlowReturn ret = GST_FLOW_OK;
-
GST_OBJECT_LOCK (pads);
pads->eospads++;
- /* if all pads are EOS and we have a function, call it */
- if ((pads->eospads == pads->numpads) && pads->func) {
- ret = pads->func (pads, pads->user_data);
- }
+ gst_collect_pads_is_collected (pads, NULL);
GST_OBJECT_UNLOCK (pads);
/* We eat this event */
gst_event_unref (event);
return TRUE;
- break;
}
case GST_EVENT_NEWSEGMENT:
{
}
}
-
+/* For each buffer we receive we check if our collected condition is reached
+ and if so we call the collected function. When this is done we check if
+ data has been unqueued. If data is still queued we wait holding the stream
+ lock to make sure no EOS event can happen while we are ready to be
+ collected */
static GstFlowReturn
gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer)
{
if (!pads->started)
goto not_started;
- /* Call the collected callback until a pad with a buffer is popped. */
- while (((pads->queuedpads + pads->eospads) == pads->numpads) && pads->func)
- ret = pads->func (pads, pads->user_data);
+ GST_DEBUG ("Queuing buffer %p for pad %s:%s", buffer,
+ GST_DEBUG_PAD_NAME (pad));
+
+ /* One more pad has data queued */
+ pads->queuedpads++;
+ gst_buffer_replace (&data->buffer, buffer);
- /* queue buffer on this pad, block if filled */
+ /* Check if our collected condition is matched and call the collected function
+ if it is */
+ gst_collect_pads_is_collected (pads, &ret);
+
+ /* We still have data queued on this pad, wait for something to happen */
while (data->buffer != NULL) {
- GST_DEBUG ("Pad %s:%s already has a buffer queued, waiting",
+ GST_DEBUG ("Pad %s:%s has a buffer queued, waiting",
GST_DEBUG_PAD_NAME (pad));
GST_COLLECT_PADS_WAIT (pads);
GST_DEBUG ("Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad));
goto not_started;
}
- GST_DEBUG ("Queuing buffer %p for pad %s:%s", buffer,
- GST_DEBUG_PAD_NAME (pad));
-
- pads->queuedpads++;
- gst_buffer_replace (&data->buffer, buffer);
-
- /* if all pads have data and we have a function, call it */
- if (((pads->queuedpads + pads->eospads) == pads->numpads) && pads->func) {
- GST_DEBUG ("All active pads have data, calling %s",
- GST_DEBUG_FUNCPTR_NAME (pads->func));
- ret = pads->func (pads, pads->user_data);
- } else {
- GST_DEBUG ("Not all active pads have data, continuing");
- ret = GST_FLOW_OK;
- }
GST_OBJECT_UNLOCK (pads);
return ret;