Imported Upstream version 0.10.36
[profile/ivi/gstreamer.git] / libs / gst / base / gstcollectpads.c
1 /* GStreamer
2  * Copyright (C) 2005 Wim Taymans <wim@fluendo.com>
3  *
4  * gstcollectpads.c:
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21 /**
22  * SECTION:gstcollectpads
23  * @short_description: manages a set of pads that operate in collect mode
24  * @see_also:
25  *
26  * Manages a set of pads that operate in collect mode. This means that control
27  * is given to the manager of this object when all pads have data.
28  * <itemizedlist>
29  *   <listitem><para>
30  *     Collectpads are created with gst_collect_pads_new(). A callback should then
31  *     be installed with gst_collect_pads_set_function ().
32  *   </para></listitem>
33  *   <listitem><para>
34  *     Pads are added to the collection with gst_collect_pads_add_pad()/
35  *     gst_collect_pads_remove_pad(). The pad
36  *     has to be a sinkpad. The chain and event functions of the pad are
37  *     overridden. The element_private of the pad is used to store
38  *     private information for the collectpads.
39  *   </para></listitem>
40  *   <listitem><para>
41  *     For each pad, data is queued in the _chain function or by
42  *     performing a pull_range.
43  *   </para></listitem>
44  *   <listitem><para>
45  *     When data is queued on all pads, the callback function is called.
46  *   </para></listitem>
47  *   <listitem><para>
48  *     Data can be dequeued from the pad with the gst_collect_pads_pop() method.
49  *     One can peek at the data with the gst_collect_pads_peek() function.
50  *     These functions will return NULL if the pad received an EOS event. When all
51  *     pads return NULL from a gst_collect_pads_peek(), the element can emit an EOS
52  *     event itself.
53  *   </para></listitem>
54  *   <listitem><para>
55  *     Data can also be dequeued in byte units using the gst_collect_pads_available(),
56  *     gst_collect_pads_read() and gst_collect_pads_flush() calls.
57  *   </para></listitem>
58  *   <listitem><para>
59  *     Elements should call gst_collect_pads_start() and gst_collect_pads_stop() in
60  *     their state change functions to start and stop the processing of the collecpads.
61  *     The gst_collect_pads_stop() call should be called before calling the parent
62  *     element state change function in the PAUSED_TO_READY state change to ensure
63  *     no pad is blocked and the element can finish streaming.
64  *   </para></listitem>
65  *   <listitem><para>
66  *     gst_collect_pads_collect() and gst_collect_pads_collect_range() can be used by
67  *     elements that start a #GstTask to drive the collect_pads. This feature is however
68  *     not yet implemented.
69  *   </para></listitem>
70  * </itemizedlist>
71  *
72  * Last reviewed on 2006-05-10 (0.10.6)
73  */
74
75 #include "gstcollectpads.h"
76 #include "gst/glib-compat-private.h"
77
78 GST_DEBUG_CATEGORY_STATIC (collect_pads_debug);
79 #define GST_CAT_DEFAULT collect_pads_debug
80
81 #define GST_COLLECT_PADS_GET_PRIVATE(obj)  \
82   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_COLLECT_PADS, GstCollectPadsPrivate))
83
84 struct _GstCollectPadsPrivate
85 {
86   GstCollectPadsClipFunction clipfunc;
87   gpointer clipfunc_user_data;
88 };
89
90 GST_BOILERPLATE (GstCollectPads, gst_collect_pads, GstObject, GST_TYPE_OBJECT);
91
92 static void gst_collect_pads_clear (GstCollectPads * pads,
93     GstCollectData * data);
94 static GstFlowReturn gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer);
95 static gboolean gst_collect_pads_event (GstPad * pad, GstEvent * event);
96 static void gst_collect_pads_finalize (GObject * object);
97 static void ref_data (GstCollectData * data);
98 static void unref_data (GstCollectData * data);
99 static void gst_collect_pads_check_pads_unlocked (GstCollectPads * pads);
100
101 static void
102 gst_collect_pads_base_init (gpointer g_class)
103 {
104   /* Do nothing here */
105 }
106
107 static void
108 gst_collect_pads_class_init (GstCollectPadsClass * klass)
109 {
110   GObjectClass *gobject_class = (GObjectClass *) klass;
111
112   g_type_class_add_private (klass, sizeof (GstCollectPadsPrivate));
113
114   GST_DEBUG_CATEGORY_INIT (collect_pads_debug, "collectpads", 0,
115       "GstCollectPads");
116
117   gobject_class->finalize = gst_collect_pads_finalize;
118 }
119
120 static void
121 gst_collect_pads_init (GstCollectPads * pads, GstCollectPadsClass * g_class)
122 {
123   pads->abidata.ABI.priv = GST_COLLECT_PADS_GET_PRIVATE (pads);
124
125   pads->cond = g_cond_new ();
126   pads->data = NULL;
127   pads->cookie = 0;
128   pads->numpads = 0;
129   pads->queuedpads = 0;
130   pads->eospads = 0;
131   pads->started = FALSE;
132
133   /* members to manage the pad list */
134   pads->abidata.ABI.pad_lock = g_mutex_new ();
135   pads->abidata.ABI.pad_cookie = 0;
136   pads->abidata.ABI.pad_list = NULL;
137 }
138
139 static void
140 gst_collect_pads_finalize (GObject * object)
141 {
142   GSList *collected;
143   GstCollectPads *pads = GST_COLLECT_PADS (object);
144
145   GST_DEBUG ("finalize");
146
147   g_cond_free (pads->cond);
148   g_mutex_free (pads->abidata.ABI.pad_lock);
149
150   /* Remove pads */
151   collected = pads->abidata.ABI.pad_list;
152   for (; collected; collected = g_slist_next (collected)) {
153     GstCollectData *pdata = (GstCollectData *) collected->data;
154
155     unref_data (pdata);
156   }
157   /* Free pads list */
158   g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
159   g_slist_free (pads->data);
160   g_slist_free (pads->abidata.ABI.pad_list);
161
162   G_OBJECT_CLASS (parent_class)->finalize (object);
163 }
164
165 /**
166  * gst_collect_pads_new:
167  *
168  * Create a new instance of #GstCollectPads.
169  *
170  * MT safe.
171  *
172  * Returns: (transfer full): a new #GstCollectPads, or NULL in case of an error.
173  */
174 GstCollectPads *
175 gst_collect_pads_new (void)
176 {
177   GstCollectPads *newcoll;
178
179   newcoll = g_object_newv (GST_TYPE_COLLECT_PADS, 0, NULL);
180
181   return newcoll;
182 }
183
184 /**
185  * gst_collect_pads_set_function:
186  * @pads: the collectspads to use
187  * @func: the function to set
188  * @user_data: (closure): user data passed to the function
189  *
190  * Set the callback function and user data that will be called when
191  * all the pads added to the collection have buffers queued.
192  *
193  * MT safe.
194  */
195 void
196 gst_collect_pads_set_function (GstCollectPads * pads,
197     GstCollectPadsFunction func, gpointer user_data)
198 {
199   g_return_if_fail (pads != NULL);
200   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
201
202   GST_OBJECT_LOCK (pads);
203   pads->func = func;
204   pads->user_data = user_data;
205   GST_OBJECT_UNLOCK (pads);
206 }
207
208 static void
209 ref_data (GstCollectData * data)
210 {
211   g_assert (data != NULL);
212
213   g_atomic_int_inc (&(data->abidata.ABI.refcount));
214 }
215
216 static void
217 unref_data (GstCollectData * data)
218 {
219   GstCollectDataDestroyNotify destroy_notify;
220
221   g_assert (data != NULL);
222   g_assert (data->abidata.ABI.refcount > 0);
223
224   if (!g_atomic_int_dec_and_test (&(data->abidata.ABI.refcount)))
225     return;
226
227   /* FIXME: Ugly hack as we can't add more fields to GstCollectData */
228   destroy_notify = (GstCollectDataDestroyNotify)
229       g_object_get_data (G_OBJECT (data->pad),
230       "gst-collect-data-destroy-notify");
231
232   if (destroy_notify)
233     destroy_notify (data);
234
235   g_object_unref (data->pad);
236   if (data->buffer) {
237     gst_buffer_unref (data->buffer);
238   }
239   g_free (data);
240 }
241
242 /**
243  * gst_collect_pads_add_pad:
244  * @pads: the collectspads to use
245  * @pad: (transfer none): the pad to add
246  * @size: the size of the returned #GstCollectData structure
247  *
248  * Add a pad to the collection of collect pads. The pad has to be
249  * a sinkpad. The refcount of the pad is incremented. Use
250  * gst_collect_pads_remove_pad() to remove the pad from the collection
251  * again.
252  *
253  * This function will override the chain and event functions of the pad
254  * along with the element_private data, which is used to store private
255  * information for the collectpads.
256  *
257  * You specify a size for the returned #GstCollectData structure
258  * so that you can use it to store additional information.
259  *
260  * The pad will be automatically activated in push mode when @pads is
261  * started.
262  *
263  * This function calls gst_collect_pads_add_pad_full() passing a value of NULL
264  * for destroy_notify.
265  *
266  * MT safe.
267  *
268  * Returns: a new #GstCollectData to identify the new pad. Or NULL
269  *   if wrong parameters are supplied.
270  */
271 GstCollectData *
272 gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size)
273 {
274   return gst_collect_pads_add_pad_full (pads, pad, size, NULL);
275 }
276
277 /**
278  * gst_collect_pads_add_pad_full:
279  * @pads: the collectspads to use
280  * @pad: (transfer none): the pad to add
281  * @size: the size of the returned #GstCollectData structure
282  * @destroy_notify: function to be called before the returned #GstCollectData
283  * structure is freed
284  *
285  * Add a pad to the collection of collect pads. The pad has to be
286  * a sinkpad. The refcount of the pad is incremented. Use
287  * gst_collect_pads_remove_pad() to remove the pad from the collection
288  * again.
289  *
290  * You specify a size for the returned #GstCollectData structure
291  * so that you can use it to store additional information.
292  *
293  * You can also specify a #GstCollectDataDestroyNotify that will be called
294  * just before the #GstCollectData structure is freed. It is passed the
295  * pointer to the structure and should free any custom memory and resources
296  * allocated for it.
297  *
298  * The pad will be automatically activated in push mode when @pads is
299  * started.
300  *
301  * MT safe.
302  *
303  * Since: 0.10.12
304  *
305  * Returns: a new #GstCollectData to identify the new pad. Or NULL
306  *   if wrong parameters are supplied.
307  */
308 GstCollectData *
309 gst_collect_pads_add_pad_full (GstCollectPads * pads, GstPad * pad, guint size,
310     GstCollectDataDestroyNotify destroy_notify)
311 {
312   GstCollectData *data;
313
314   g_return_val_if_fail (pads != NULL, NULL);
315   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
316   g_return_val_if_fail (pad != NULL, NULL);
317   g_return_val_if_fail (GST_PAD_IS_SINK (pad), NULL);
318   g_return_val_if_fail (size >= sizeof (GstCollectData), NULL);
319
320   GST_DEBUG ("adding pad %s:%s", GST_DEBUG_PAD_NAME (pad));
321
322   data = g_malloc0 (size);
323   data->collect = pads;
324   data->pad = gst_object_ref (pad);
325   data->buffer = NULL;
326   data->pos = 0;
327   gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
328   data->abidata.ABI.flushing = FALSE;
329   data->abidata.ABI.new_segment = FALSE;
330   data->abidata.ABI.eos = FALSE;
331   data->abidata.ABI.refcount = 1;
332
333   /* FIXME: Ugly hack as we can't add more fields to GstCollectData */
334   g_object_set_data (G_OBJECT (pad), "gst-collect-data-destroy-notify",
335       (void *) destroy_notify);
336
337   GST_COLLECT_PADS_PAD_LOCK (pads);
338   GST_OBJECT_LOCK (pad);
339   gst_pad_set_element_private (pad, data);
340   GST_OBJECT_UNLOCK (pad);
341   pads->abidata.ABI.pad_list =
342       g_slist_append (pads->abidata.ABI.pad_list, data);
343   gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_chain));
344   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_event));
345   /* activate the pad when needed */
346   if (pads->started)
347     gst_pad_set_active (pad, TRUE);
348   pads->abidata.ABI.pad_cookie++;
349   GST_COLLECT_PADS_PAD_UNLOCK (pads);
350
351   return data;
352 }
353
354 static gint
355 find_pad (GstCollectData * data, GstPad * pad)
356 {
357   if (data->pad == pad)
358     return 0;
359   return 1;
360 }
361
362 /**
363  * gst_collect_pads_set_clip_function:
364  * @pads: the collectspads to use
365  * @clipfunc: clip function to install
366  * @user_data: (closure): user data to pass to @clip_func
367  *
368  * Install a clipping function that is called right after a buffer is received
369  * on a pad managed by @pads. See #GstCollectPadsClipFunction for more info.
370  *
371  * Since: 0.10.26
372  */
373 void
374 gst_collect_pads_set_clip_function (GstCollectPads * pads,
375     GstCollectPadsClipFunction clipfunc, gpointer user_data)
376 {
377   GstCollectPadsPrivate *priv;
378
379   g_return_if_fail (pads != NULL);
380   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
381
382   priv = pads->abidata.ABI.priv;
383
384   priv->clipfunc = clipfunc;
385   priv->clipfunc_user_data = user_data;
386 }
387
388 /**
389  * gst_collect_pads_remove_pad:
390  * @pads: the collectspads to use
391  * @pad: (transfer none): the pad to remove
392  *
393  * Remove a pad from the collection of collect pads. This function will also
394  * free the #GstCollectData and all the resources that were allocated with
395  * gst_collect_pads_add_pad().
396  *
397  * The pad will be deactivated automatically when @pads is stopped.
398  *
399  * MT safe.
400  *
401  * Returns: %TRUE if the pad could be removed.
402  */
403 gboolean
404 gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad)
405 {
406   GstCollectData *data;
407   GSList *list;
408
409   g_return_val_if_fail (pads != NULL, FALSE);
410   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
411   g_return_val_if_fail (pad != NULL, FALSE);
412   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
413
414   GST_DEBUG ("removing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
415
416   GST_COLLECT_PADS_PAD_LOCK (pads);
417   list =
418       g_slist_find_custom (pads->abidata.ABI.pad_list, pad,
419       (GCompareFunc) find_pad);
420   if (!list)
421     goto unknown_pad;
422
423   data = (GstCollectData *) list->data;
424
425   GST_DEBUG ("found pad %s:%s at %p", GST_DEBUG_PAD_NAME (pad), data);
426
427   /* clear the stuff we configured */
428   gst_pad_set_chain_function (pad, NULL);
429   gst_pad_set_event_function (pad, NULL);
430   GST_OBJECT_LOCK (pad);
431   gst_pad_set_element_private (pad, NULL);
432   GST_OBJECT_UNLOCK (pad);
433
434   /* backward compat, also remove from data if stopped, note that this function
435    * can only be called when we are stopped because we don't take the LOCK to
436    * protect the pads->data list. */
437   if (!pads->started) {
438     GSList *dlist;
439
440     dlist = g_slist_find_custom (pads->data, pad, (GCompareFunc) find_pad);
441     if (dlist) {
442       GstCollectData *pdata = dlist->data;
443
444       pads->data = g_slist_delete_link (pads->data, dlist);
445       unref_data (pdata);
446     }
447   }
448   /* remove from the pad list */
449   pads->abidata.ABI.pad_list =
450       g_slist_delete_link (pads->abidata.ABI.pad_list, list);
451   pads->abidata.ABI.pad_cookie++;
452
453   /* signal waiters because something changed */
454   GST_COLLECT_PADS_BROADCAST (pads);
455
456   /* deactivate the pad when needed */
457   if (!pads->started)
458     gst_pad_set_active (pad, FALSE);
459
460   /* clean and free the collect data */
461   unref_data (data);
462
463   GST_COLLECT_PADS_PAD_UNLOCK (pads);
464
465   return TRUE;
466
467 unknown_pad:
468   {
469     GST_WARNING ("cannot remove unknown pad %s:%s", GST_DEBUG_PAD_NAME (pad));
470     GST_COLLECT_PADS_PAD_UNLOCK (pads);
471     return FALSE;
472   }
473 }
474
475 /**
476  * gst_collect_pads_is_active:
477  * @pads: (transfer none): the collectspads to use
478  * @pad: the pad to check
479  *
480  * Check if a pad is active.
481  *
482  * This function is currently not implemented.
483  *
484  * MT safe.
485  *
486  * Returns: %TRUE if the pad is active.
487  */
488 gboolean
489 gst_collect_pads_is_active (GstCollectPads * pads, GstPad * pad)
490 {
491   g_return_val_if_fail (pads != NULL, FALSE);
492   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
493   g_return_val_if_fail (pad != NULL, FALSE);
494   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
495
496   g_warning ("gst_collect_pads_is_active() is not implemented");
497
498   return FALSE;
499 }
500
501 /**
502  * gst_collect_pads_collect:
503  * @pads: the collectspads to use
504  *
505  * Collect data on all pads. This function is usually called
506  * from a #GstTask function in an element.
507  *
508  * This function is currently not implemented.
509  *
510  * MT safe.
511  *
512  * Returns: #GstFlowReturn of the operation.
513  */
514 GstFlowReturn
515 gst_collect_pads_collect (GstCollectPads * pads)
516 {
517   g_return_val_if_fail (pads != NULL, GST_FLOW_ERROR);
518   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
519
520   g_warning ("gst_collect_pads_collect() is not implemented");
521
522   return GST_FLOW_NOT_SUPPORTED;
523 }
524
525 /**
526  * gst_collect_pads_collect_range:
527  * @pads: the collectspads to use
528  * @offset: the offset to collect
529  * @length: the length to collect
530  *
531  * Collect data with @offset and @length on all pads. This function
532  * is typically called in the getrange function of an element.
533  *
534  * This function is currently not implemented.
535  *
536  * MT safe.
537  *
538  * Returns: #GstFlowReturn of the operation.
539  */
540 GstFlowReturn
541 gst_collect_pads_collect_range (GstCollectPads * pads, guint64 offset,
542     guint length)
543 {
544   g_return_val_if_fail (pads != NULL, GST_FLOW_ERROR);
545   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
546
547   g_warning ("gst_collect_pads_collect_range() is not implemented");
548
549   return GST_FLOW_NOT_SUPPORTED;
550 }
551
552 static gboolean
553 gst_collect_pads_is_flushing (GstCollectPads * pads)
554 {
555   GSList *walk = NULL;
556   gboolean res = TRUE;
557
558   GST_COLLECT_PADS_PAD_LOCK (pads);
559
560   /* Ensure pads->data state */
561   gst_collect_pads_check_pads_unlocked (pads);
562
563   GST_DEBUG ("Getting flushing state (pads:%p, pads->data:%p)",
564       pads, pads->data);
565
566   for (walk = pads->data; walk; walk = g_slist_next (walk)) {
567     GstCollectData *cdata = walk->data;
568
569     GST_DEBUG_OBJECT (cdata->pad, "flushing:%d", cdata->abidata.ABI.flushing);
570
571     if (cdata->abidata.ABI.flushing) {
572       goto done;
573     }
574   }
575
576   res = FALSE;
577 done:
578   GST_COLLECT_PADS_PAD_UNLOCK (pads);
579   return res;
580 }
581
582 /* FIXME, I think this function is used to work around bad behaviour
583  * of elements that add pads to themselves without activating them.
584  *
585  * Must be called with PAD_LOCK.
586  */
587 static void
588 gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads,
589     gboolean flushing)
590 {
591   GSList *walk = NULL;
592
593   GST_DEBUG ("Setting flushing (%d)", flushing);
594
595   /* Update the pads flushing flag */
596   for (walk = pads->data; walk; walk = g_slist_next (walk)) {
597     GstCollectData *cdata = walk->data;
598
599     if (GST_IS_PAD (cdata->pad)) {
600       GST_OBJECT_LOCK (cdata->pad);
601       if (flushing)
602         GST_PAD_SET_FLUSHING (cdata->pad);
603       else
604         GST_PAD_UNSET_FLUSHING (cdata->pad);
605       cdata->abidata.ABI.flushing = flushing;
606       gst_collect_pads_clear (pads, cdata);
607       GST_OBJECT_UNLOCK (cdata->pad);
608     }
609   }
610   /* Setting the pads to flushing means that we changed the values which
611    * are 'protected' by the cookie. We therefore update it to force a
612    * recalculation of the current pad status. */
613   pads->abidata.ABI.pad_cookie++;
614 }
615
616 /**
617  * gst_collect_pads_set_flushing:
618  * @pads: the collectspads to use
619  * @flushing: desired state of the pads
620  *
621  * Change the flushing state of all the pads in the collection. No pad
622  * is able to accept anymore data when @flushing is %TRUE. Calling this
623  * function with @flushing %FALSE makes @pads accept data again.
624  *
625  * MT safe.
626  *
627  * Since: 0.10.7.
628  */
629 void
630 gst_collect_pads_set_flushing (GstCollectPads * pads, gboolean flushing)
631 {
632   g_return_if_fail (pads != NULL);
633   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
634
635   GST_COLLECT_PADS_PAD_LOCK (pads);
636   /* Ensure pads->data state */
637   gst_collect_pads_check_pads_unlocked (pads);
638   gst_collect_pads_set_flushing_unlocked (pads, flushing);
639   GST_COLLECT_PADS_PAD_UNLOCK (pads);
640 }
641
642 /**
643  * gst_collect_pads_start:
644  * @pads: the collectspads to use
645  *
646  * Starts the processing of data in the collect_pads.
647  *
648  * MT safe.
649  */
650 void
651 gst_collect_pads_start (GstCollectPads * pads)
652 {
653   GSList *collected;
654
655   g_return_if_fail (pads != NULL);
656   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
657
658   GST_DEBUG_OBJECT (pads, "starting collect pads");
659
660   /* make sure stop and collect cannot be called anymore */
661   GST_OBJECT_LOCK (pads);
662
663   /* make pads streamable */
664   GST_COLLECT_PADS_PAD_LOCK (pads);
665
666   /* loop over the master pad list and reset the segment */
667   collected = pads->abidata.ABI.pad_list;
668   for (; collected; collected = g_slist_next (collected)) {
669     GstCollectData *data;
670
671     data = collected->data;
672     gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
673   }
674
675   gst_collect_pads_set_flushing_unlocked (pads, FALSE);
676
677   /* Start collect pads */
678   pads->started = TRUE;
679   GST_COLLECT_PADS_PAD_UNLOCK (pads);
680   GST_OBJECT_UNLOCK (pads);
681 }
682
683 /**
684  * gst_collect_pads_stop:
685  * @pads: the collectspads to use
686  *
687  * Stops the processing of data in the collect_pads. this function
688  * will also unblock any blocking operations.
689  *
690  * MT safe.
691  */
692 void
693 gst_collect_pads_stop (GstCollectPads * pads)
694 {
695   GSList *collected;
696
697   g_return_if_fail (pads != NULL);
698   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
699
700   GST_DEBUG_OBJECT (pads, "stopping collect pads");
701
702   /* make sure collect and start cannot be called anymore */
703   GST_OBJECT_LOCK (pads);
704
705   /* make pads not accept data anymore */
706   GST_COLLECT_PADS_PAD_LOCK (pads);
707   gst_collect_pads_set_flushing_unlocked (pads, TRUE);
708
709   /* Stop collect pads */
710   pads->started = FALSE;
711   pads->eospads = 0;
712   pads->queuedpads = 0;
713
714   /* loop over the master pad list and flush buffers */
715   collected = pads->abidata.ABI.pad_list;
716   for (; collected; collected = g_slist_next (collected)) {
717     GstCollectData *data;
718     GstBuffer **buffer_p;
719
720     data = collected->data;
721     if (data->buffer) {
722       buffer_p = &data->buffer;
723       gst_buffer_replace (buffer_p, NULL);
724       data->pos = 0;
725     }
726     data->abidata.ABI.eos = FALSE;
727   }
728
729   GST_COLLECT_PADS_PAD_UNLOCK (pads);
730   /* Wake them up so they can end the chain functions. */
731   GST_COLLECT_PADS_BROADCAST (pads);
732
733   GST_OBJECT_UNLOCK (pads);
734 }
735
736 /**
737  * gst_collect_pads_peek:
738  * @pads: the collectspads to peek
739  * @data: the data to use
740  *
741  * Peek at the buffer currently queued in @data. This function
742  * should be called with the @pads LOCK held, such as in the callback
743  * handler.
744  *
745  * MT safe.
746  *
747  * Returns: (transfer full): The buffer in @data or NULL if no buffer is queued.
748  *  should unref the buffer after usage.
749  */
750 GstBuffer *
751 gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data)
752 {
753   GstBuffer *result;
754
755   g_return_val_if_fail (pads != NULL, NULL);
756   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
757   g_return_val_if_fail (data != NULL, NULL);
758
759   if ((result = data->buffer))
760     gst_buffer_ref (result);
761
762   GST_DEBUG ("Peeking at pad %s:%s: buffer=%p",
763       GST_DEBUG_PAD_NAME (data->pad), result);
764
765   return result;
766 }
767
768 /**
769  * gst_collect_pads_pop:
770  * @pads: the collectspads to pop
771  * @data: the data to use
772  *
773  * Pop the buffer currently queued in @data. This function
774  * should be called with the @pads LOCK held, such as in the callback
775  * handler.
776  *
777  * Free-function: gst_buffer_unref
778  *
779  * MT safe.
780  *
781  * Returns: (transfer full): The buffer in @data or NULL if no buffer was
782  *   queued. You should unref the buffer after usage.
783  */
784 GstBuffer *
785 gst_collect_pads_pop (GstCollectPads * pads, GstCollectData * data)
786 {
787   GstBuffer *result;
788
789   g_return_val_if_fail (pads != NULL, NULL);
790   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
791   g_return_val_if_fail (data != NULL, NULL);
792
793   if ((result = data->buffer)) {
794     data->buffer = NULL;
795     data->pos = 0;
796     /* one less pad with queued data now */
797     pads->queuedpads--;
798   }
799
800   GST_COLLECT_PADS_BROADCAST (pads);
801
802   GST_DEBUG ("Pop buffer on pad %s:%s: buffer=%p",
803       GST_DEBUG_PAD_NAME (data->pad), result);
804
805   return result;
806 }
807
808 /* pop and unref the currently queued buffer, should e called with the LOCK
809  * helt. */
810 static void
811 gst_collect_pads_clear (GstCollectPads * pads, GstCollectData * data)
812 {
813   GstBuffer *buf;
814
815   if ((buf = gst_collect_pads_pop (pads, data)))
816     gst_buffer_unref (buf);
817 }
818
819 /**
820  * gst_collect_pads_available:
821  * @pads: the collectspads to query
822  *
823  * Query how much bytes can be read from each queued buffer. This means
824  * that the result of this call is the maximum number of bytes that can
825  * be read from each of the pads.
826  *
827  * This function should be called with @pads LOCK held, such as
828  * in the callback.
829  *
830  * MT safe.
831  *
832  * Returns: The maximum number of bytes queued on all pads. This function
833  * returns 0 if a pad has no queued buffer.
834  */
835 /* FIXME, we can do this in the _chain functions */
836 guint
837 gst_collect_pads_available (GstCollectPads * pads)
838 {
839   GSList *collected;
840   guint result = G_MAXUINT;
841
842   g_return_val_if_fail (pads != NULL, 0);
843   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
844
845   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
846     GstCollectData *pdata;
847     GstBuffer *buffer;
848     gint size;
849
850     pdata = (GstCollectData *) collected->data;
851
852     /* ignore pad with EOS */
853     if (G_UNLIKELY (pdata->abidata.ABI.eos)) {
854       GST_DEBUG ("pad %s:%s is EOS", GST_DEBUG_PAD_NAME (pdata->pad));
855       continue;
856     }
857
858     /* an empty buffer without EOS is weird when we get here.. */
859     if (G_UNLIKELY ((buffer = pdata->buffer) == NULL)) {
860       GST_WARNING ("pad %s:%s has no buffer", GST_DEBUG_PAD_NAME (pdata->pad));
861       goto not_filled;
862     }
863
864     /* this is the size left of the buffer */
865     size = GST_BUFFER_SIZE (buffer) - pdata->pos;
866     GST_DEBUG ("pad %s:%s has %d bytes left",
867         GST_DEBUG_PAD_NAME (pdata->pad), size);
868
869     /* need to return the min of all available data */
870     if (size < result)
871       result = size;
872   }
873   /* nothing changed, all must be EOS then, return 0 */
874   if (G_UNLIKELY (result == G_MAXUINT))
875     result = 0;
876
877   return result;
878
879 not_filled:
880   {
881     return 0;
882   }
883 }
884
885 /**
886  * gst_collect_pads_read:
887  * @pads: the collectspads to query
888  * @data: the data to use
889  * @bytes: (out) (transfer none) (array length=size): a pointer to a byte array
890  * @size: the number of bytes to read
891  *
892  * Get a pointer in @bytes where @size bytes can be read from the
893  * given pad @data.
894  *
895  * This function should be called with @pads LOCK held, such as
896  * in the callback.
897  *
898  * MT safe.
899  *
900  * Returns: The number of bytes available for consumption in the
901  * memory pointed to by @bytes. This can be less than @size and
902  * is 0 if the pad is end-of-stream.
903  */
904 guint
905 gst_collect_pads_read (GstCollectPads * pads, GstCollectData * data,
906     guint8 ** bytes, guint size)
907 {
908   guint readsize;
909   GstBuffer *buffer;
910
911   g_return_val_if_fail (pads != NULL, 0);
912   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
913   g_return_val_if_fail (data != NULL, 0);
914   g_return_val_if_fail (bytes != NULL, 0);
915
916   /* no buffer, must be EOS */
917   if ((buffer = data->buffer) == NULL)
918     return 0;
919
920   readsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos);
921
922   *bytes = GST_BUFFER_DATA (buffer) + data->pos;
923
924   return readsize;
925 }
926
927 /**
928  * gst_collect_pads_read_buffer:
929  * @pads: the collectspads to query
930  * @data: the data to use
931  * @size: the number of bytes to read
932  *
933  * Get a buffer of @size bytes from the given pad @data.
934  *
935  * This function should be called with @pads LOCK held, such as in the callback.
936  *
937  * Free-function: gst_buffer_unref
938  *
939  * Returns: (transfer full): a #GstBuffer. The size of the buffer can be less
940  *     that requested. A return of NULL signals that the pad is end-of-stream.
941  *     Unref the buffer with gst_buffer_unref() after use.
942  *
943  * MT safe.
944  *
945  * Since: 0.10.18
946  */
947 GstBuffer *
948 gst_collect_pads_read_buffer (GstCollectPads * pads, GstCollectData * data,
949     guint size)
950 {
951   guint readsize, bufsize;
952   GstBuffer *buffer;
953
954   g_return_val_if_fail (pads != NULL, NULL);
955   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
956   g_return_val_if_fail (data != NULL, NULL);
957
958   /* no buffer, must be EOS */
959   if ((buffer = data->buffer) == NULL)
960     return NULL;
961
962   bufsize = GST_BUFFER_SIZE (buffer);
963
964   readsize = MIN (size, bufsize - data->pos);
965
966   if (data->pos == 0 && readsize == bufsize)
967     return gst_buffer_ref (buffer);
968   else
969     return gst_buffer_create_sub (buffer, data->pos, readsize);
970 }
971
972 /**
973  * gst_collect_pads_take_buffer:
974  * @pads: the collectspads to query
975  * @data: the data to use
976  * @size: the number of bytes to read
977  *
978  * Get a buffer of @size bytes from the given pad @data. Flushes the amount
979  * of read bytes.
980  *
981  * This function should be called with @pads LOCK held, such as in the callback.
982  *
983  * Free-function: gst_buffer_unref
984  *
985  * MT safe.
986  *
987  * Returns: (transfer full): a #GstBuffer. The size of the buffer can be less
988  *     that requested. A return of NULL signals that the pad is end-of-stream.
989  *     Unref the buffer after use.
990  *
991  * Since: 0.10.18
992  */
993 GstBuffer *
994 gst_collect_pads_take_buffer (GstCollectPads * pads, GstCollectData * data,
995     guint size)
996 {
997   GstBuffer *buffer = gst_collect_pads_read_buffer (pads, data, size);
998
999   if (buffer) {
1000     gst_collect_pads_flush (pads, data, GST_BUFFER_SIZE (buffer));
1001   }
1002   return buffer;
1003 }
1004
1005 /**
1006  * gst_collect_pads_flush:
1007  * @pads: the collectspads to query
1008  * @data: the data to use
1009  * @size: the number of bytes to flush
1010  *
1011  * Flush @size bytes from the pad @data.
1012  *
1013  * This function should be called with @pads LOCK held, such as
1014  * in the callback.
1015  *
1016  * MT safe.
1017  *
1018  * Returns: The number of bytes flushed. This can be less than @size and
1019  * is 0 if the pad was end-of-stream.
1020  */
1021 guint
1022 gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data,
1023     guint size)
1024 {
1025   guint flushsize;
1026   GstBuffer *buffer;
1027
1028   g_return_val_if_fail (pads != NULL, 0);
1029   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
1030   g_return_val_if_fail (data != NULL, 0);
1031
1032   /* no buffer, must be EOS */
1033   if ((buffer = data->buffer) == NULL)
1034     return 0;
1035
1036   /* this is what we can flush at max */
1037   flushsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos);
1038
1039   data->pos += size;
1040
1041   GST_LOG_OBJECT (pads, "Flushing %d bytes, requested %u", flushsize, size);
1042
1043   if (data->pos >= GST_BUFFER_SIZE (buffer))
1044     /* _clear will also reset data->pos to 0 */
1045     gst_collect_pads_clear (pads, data);
1046
1047   return flushsize;
1048 }
1049
1050 /* see if pads were added or removed and update our stats. Any pad
1051  * added after releasing the PAD_LOCK will get collected in the next
1052  * round.
1053  *
1054  * We can do a quick check by checking the cookies, that get changed
1055  * whenever the pad list is updated.
1056  *
1057  * Must be called with LOCK.
1058  */
1059 static void
1060 gst_collect_pads_check_pads_unlocked (GstCollectPads * pads)
1061 {
1062   GST_DEBUG ("stored cookie : %d, used_cookie:%d",
1063       pads->abidata.ABI.pad_cookie, pads->cookie);
1064   if (G_UNLIKELY (pads->abidata.ABI.pad_cookie != pads->cookie)) {
1065     GSList *collected;
1066
1067     /* clear list and stats */
1068     g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
1069     g_slist_free (pads->data);
1070     pads->data = NULL;
1071     pads->numpads = 0;
1072     pads->queuedpads = 0;
1073     pads->eospads = 0;
1074
1075     /* loop over the master pad list */
1076     collected = pads->abidata.ABI.pad_list;
1077     for (; collected; collected = g_slist_next (collected)) {
1078       GstCollectData *data;
1079
1080       /* update the stats */
1081       pads->numpads++;
1082       data = collected->data;
1083
1084       if (G_LIKELY (!data->abidata.ABI.flushing)) {
1085         if (data->buffer)
1086           pads->queuedpads++;
1087         if (data->abidata.ABI.eos)
1088           pads->eospads++;
1089       }
1090
1091       /* add to the list of pads to collect */
1092       ref_data (data);
1093       pads->data = g_slist_prepend (pads->data, data);
1094     }
1095     /* and update the cookie */
1096     pads->cookie = pads->abidata.ABI.pad_cookie;
1097   }
1098 }
1099
1100 static inline void
1101 gst_collect_pads_check_pads (GstCollectPads * pads)
1102 {
1103   /* the master list and cookie are protected with the PAD_LOCK */
1104   GST_COLLECT_PADS_PAD_LOCK (pads);
1105   gst_collect_pads_check_pads_unlocked (pads);
1106   GST_COLLECT_PADS_PAD_UNLOCK (pads);
1107 }
1108
1109 /* checks if all the pads are collected and call the collectfunction
1110  *
1111  * Should be called with LOCK.
1112  *
1113  * Returns: The #GstFlowReturn of collection.
1114  */
1115 static GstFlowReturn
1116 gst_collect_pads_check_collected (GstCollectPads * pads)
1117 {
1118   GstFlowReturn flow_ret = GST_FLOW_OK;
1119
1120   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
1121   g_return_val_if_fail (pads->func != NULL, GST_FLOW_NOT_SUPPORTED);
1122
1123   /* check for new pads, update stats etc.. */
1124   gst_collect_pads_check_pads (pads);
1125
1126   if (G_UNLIKELY (pads->eospads == pads->numpads)) {
1127     /* If all our pads are EOS just collect once to let the element
1128      * do its final EOS handling. */
1129     GST_DEBUG ("All active pads (%d) are EOS, calling %s",
1130         pads->numpads, GST_DEBUG_FUNCPTR_NAME (pads->func));
1131     flow_ret = pads->func (pads, pads->user_data);
1132   } else {
1133     gboolean collected = FALSE;
1134
1135     /* We call the collected function as long as our condition matches.
1136      * FIXME: should we error out if the collect function did not pop anything ?
1137      * we can get a busy loop here if the element does not pop from the collect
1138      * function
1139      */
1140     while (((pads->queuedpads + pads->eospads) >= pads->numpads)) {
1141       GST_DEBUG ("All active pads (%d + %d >= %d) have data, calling %s",
1142           pads->queuedpads, pads->eospads, pads->numpads,
1143           GST_DEBUG_FUNCPTR_NAME (pads->func));
1144       flow_ret = pads->func (pads, pads->user_data);
1145       collected = TRUE;
1146
1147       /* break on error */
1148       if (flow_ret != GST_FLOW_OK)
1149         break;
1150       /* Don't keep looping after telling the element EOS or flushing */
1151       if (pads->queuedpads == 0)
1152         break;
1153     }
1154     if (!collected)
1155       GST_DEBUG ("Not all active pads (%d) have data, continuing",
1156           pads->numpads);
1157   }
1158   return flow_ret;
1159 }
1160
1161 static gboolean
1162 gst_collect_pads_event (GstPad * pad, GstEvent * event)
1163 {
1164   gboolean res;
1165   GstCollectData *data;
1166   GstCollectPads *pads;
1167
1168   /* some magic to get the managing collect_pads */
1169   GST_OBJECT_LOCK (pad);
1170   data = (GstCollectData *) gst_pad_get_element_private (pad);
1171   if (G_UNLIKELY (data == NULL))
1172     goto pad_removed;
1173   ref_data (data);
1174   GST_OBJECT_UNLOCK (pad);
1175
1176   res = TRUE;
1177
1178   pads = data->collect;
1179
1180   GST_DEBUG ("Got %s event on pad %s:%s", GST_EVENT_TYPE_NAME (event),
1181       GST_DEBUG_PAD_NAME (data->pad));
1182
1183   switch (GST_EVENT_TYPE (event)) {
1184     case GST_EVENT_FLUSH_START:
1185     {
1186       /* forward event to unblock check_collected */
1187       gst_pad_event_default (pad, event);
1188
1189       /* now unblock the chain function.
1190        * no cond per pad, so they all unblock,
1191        * non-flushing block again */
1192       GST_OBJECT_LOCK (pads);
1193       data->abidata.ABI.flushing = TRUE;
1194       gst_collect_pads_clear (pads, data);
1195       GST_OBJECT_UNLOCK (pads);
1196
1197       /* event already cleaned up by forwarding */
1198       goto done;
1199     }
1200     case GST_EVENT_FLUSH_STOP:
1201     {
1202       /* flush the 1 buffer queue */
1203       GST_OBJECT_LOCK (pads);
1204       data->abidata.ABI.flushing = FALSE;
1205       gst_collect_pads_clear (pads, data);
1206       /* we need new segment info after the flush */
1207       gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
1208       data->abidata.ABI.new_segment = FALSE;
1209       /* if the pad was EOS, remove the EOS flag and
1210        * decrement the number of eospads */
1211       if (G_UNLIKELY (data->abidata.ABI.eos == TRUE)) {
1212         pads->eospads--;
1213         data->abidata.ABI.eos = FALSE;
1214       }
1215
1216       if (!gst_collect_pads_is_flushing (pads)) {
1217         /* forward event if all pads are no longer flushing */
1218         GST_DEBUG ("No more pads are flushing, forwarding FLUSH_STOP");
1219         GST_OBJECT_UNLOCK (pads);
1220         goto forward;
1221       }
1222       gst_event_unref (event);
1223       GST_OBJECT_UNLOCK (pads);
1224       goto done;
1225     }
1226     case GST_EVENT_EOS:
1227     {
1228       GST_OBJECT_LOCK (pads);
1229       /* if the pad was not EOS, make it EOS and so we
1230        * have one more eospad */
1231       if (G_LIKELY (data->abidata.ABI.eos == FALSE)) {
1232         data->abidata.ABI.eos = TRUE;
1233         pads->eospads++;
1234       }
1235       /* check if we need collecting anything, we ignore the
1236        * result. */
1237       gst_collect_pads_check_collected (pads);
1238       GST_OBJECT_UNLOCK (pads);
1239
1240       /* We eat this event, element should do something
1241        * in the collected callback. */
1242       gst_event_unref (event);
1243       goto done;
1244     }
1245     case GST_EVENT_NEWSEGMENT:
1246     {
1247       gint64 start, stop, time;
1248       gdouble rate, arate;
1249       GstFormat format;
1250       gboolean update;
1251
1252       gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1253           &start, &stop, &time);
1254
1255       GST_DEBUG_OBJECT (data->pad, "got newsegment, start %" GST_TIME_FORMAT
1256           ", stop %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
1257           GST_TIME_ARGS (stop));
1258
1259       gst_segment_set_newsegment_full (&data->segment, update, rate, arate,
1260           format, start, stop, time);
1261
1262       data->abidata.ABI.new_segment = TRUE;
1263
1264       /* we must not forward this event since multiple segments will be
1265        * accumulated and this is certainly not what we want. */
1266       gst_event_unref (event);
1267       /* FIXME: collect-pads based elements need to create their own newsegment
1268        * event (and only one really)
1269        * (a) make the segment part of the GstCollectData structure of each pad,
1270        * so you can just check that once you have a buffer queued on that pad,
1271        * (b) you can override a pad's event function with your own,
1272        * catch the newsegment event and then pass it on to the original
1273        * gstcollectpads event function
1274        * (that's what avimux does for something IIRC)
1275        * see #340060
1276        */
1277       goto done;
1278     }
1279     default:
1280       /* forward other events */
1281       goto forward;
1282   }
1283
1284 forward:
1285   GST_DEBUG_OBJECT (pads, "forward unhandled event: %s",
1286       GST_EVENT_TYPE_NAME (event));
1287   res = gst_pad_event_default (pad, event);
1288
1289 done:
1290   unref_data (data);
1291   return res;
1292
1293   /* ERRORS */
1294 pad_removed:
1295   {
1296     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
1297     GST_OBJECT_UNLOCK (pad);
1298     return FALSE;
1299   }
1300 }
1301
1302 /* For each buffer we receive we check if our collected condition is reached
1303  * and if so we call the collected function. When this is done we check if
1304  * data has been unqueued. If data is still queued we wait holding the stream
1305  * lock to make sure no EOS event can happen while we are ready to be
1306  * collected
1307  */
1308 static GstFlowReturn
1309 gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer)
1310 {
1311   GstCollectData *data;
1312   GstCollectPads *pads;
1313   GstCollectPadsPrivate *priv;
1314   GstFlowReturn ret;
1315
1316   GST_DEBUG ("Got buffer for pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1317
1318   /* some magic to get the managing collect_pads */
1319   GST_OBJECT_LOCK (pad);
1320   data = (GstCollectData *) gst_pad_get_element_private (pad);
1321   if (G_UNLIKELY (data == NULL))
1322     goto no_data;
1323   ref_data (data);
1324   GST_OBJECT_UNLOCK (pad);
1325
1326   pads = data->collect;
1327   priv = pads->abidata.ABI.priv;
1328
1329   GST_OBJECT_LOCK (pads);
1330   /* if not started, bail out */
1331   if (G_UNLIKELY (!pads->started))
1332     goto not_started;
1333   /* check if this pad is flushing */
1334   if (G_UNLIKELY (data->abidata.ABI.flushing))
1335     goto flushing;
1336   /* pad was EOS, we can refuse this data */
1337   if (G_UNLIKELY (data->abidata.ABI.eos))
1338     goto unexpected;
1339
1340   /* see if we need to clip */
1341   if (priv->clipfunc) {
1342     buffer = priv->clipfunc (pads, data, buffer, priv->clipfunc_user_data);
1343
1344     if (G_UNLIKELY (buffer == NULL))
1345       goto clipped;
1346   }
1347
1348   GST_DEBUG ("Queuing buffer %p for pad %s:%s", buffer,
1349       GST_DEBUG_PAD_NAME (pad));
1350
1351   /* One more pad has data queued */
1352   pads->queuedpads++;
1353   /* take ownership of the buffer */
1354   if (data->buffer)
1355     gst_buffer_unref (data->buffer);
1356   data->buffer = buffer;
1357   buffer = NULL;
1358
1359   /* update segment last position if in TIME */
1360   if (G_LIKELY (data->segment.format == GST_FORMAT_TIME)) {
1361     GstClockTime timestamp = GST_BUFFER_TIMESTAMP (data->buffer);
1362
1363     if (GST_CLOCK_TIME_IS_VALID (timestamp))
1364       gst_segment_set_last_stop (&data->segment, GST_FORMAT_TIME, timestamp);
1365   }
1366
1367   /* While we have data queued on this pad try to collect stuff */
1368   do {
1369     GST_DEBUG ("Pad %s:%s checking", GST_DEBUG_PAD_NAME (pad));
1370     /* Check if our collected condition is matched and call the collected function
1371      * if it is */
1372     ret = gst_collect_pads_check_collected (pads);
1373     /* when an error occurs, we want to report this back to the caller ASAP
1374      * without having to block if the buffer was not popped */
1375     if (G_UNLIKELY (ret != GST_FLOW_OK))
1376       goto error;
1377
1378     /* data was consumed, we can exit and accept new data */
1379     if (data->buffer == NULL)
1380       break;
1381
1382     /* Check if we got removed in the mean time, FIXME, this is racy.
1383      * Between this check and the _WAIT, the pad could be removed which will
1384      * makes us hang in the _WAIT. */
1385     GST_OBJECT_LOCK (pad);
1386     if (G_UNLIKELY (gst_pad_get_element_private (pad) == NULL))
1387       goto pad_removed;
1388     GST_OBJECT_UNLOCK (pad);
1389
1390     GST_DEBUG ("Pad %s:%s has a buffer queued, waiting",
1391         GST_DEBUG_PAD_NAME (pad));
1392
1393     /* wait to be collected, this must happen from another thread triggered
1394      * by the _chain function of another pad. We release the lock so we
1395      * can get stopped or flushed as well. We can however not get EOS
1396      * because we still hold the STREAM_LOCK.
1397      */
1398     GST_COLLECT_PADS_WAIT (pads);
1399
1400     GST_DEBUG ("Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad));
1401
1402     /* after a signal, we could be stopped */
1403     if (G_UNLIKELY (!pads->started))
1404       goto not_started;
1405     /* check if this pad is flushing */
1406     if (G_UNLIKELY (data->abidata.ABI.flushing))
1407       goto flushing;
1408   }
1409   while (data->buffer != NULL);
1410
1411 unlock_done:
1412   GST_DEBUG ("Pad %s:%s done", GST_DEBUG_PAD_NAME (pad));
1413   GST_OBJECT_UNLOCK (pads);
1414   unref_data (data);
1415   if (buffer)
1416     gst_buffer_unref (buffer);
1417   return ret;
1418
1419 pad_removed:
1420   {
1421     GST_WARNING ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
1422     GST_OBJECT_UNLOCK (pad);
1423     ret = GST_FLOW_NOT_LINKED;
1424     goto unlock_done;
1425   }
1426   /* ERRORS */
1427 no_data:
1428   {
1429     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
1430     GST_OBJECT_UNLOCK (pad);
1431     gst_buffer_unref (buffer);
1432     return GST_FLOW_NOT_LINKED;
1433   }
1434 not_started:
1435   {
1436     GST_DEBUG ("not started");
1437     gst_collect_pads_clear (pads, data);
1438     ret = GST_FLOW_WRONG_STATE;
1439     goto unlock_done;
1440   }
1441 flushing:
1442   {
1443     GST_DEBUG ("pad %s:%s is flushing", GST_DEBUG_PAD_NAME (pad));
1444     gst_collect_pads_clear (pads, data);
1445     ret = GST_FLOW_WRONG_STATE;
1446     goto unlock_done;
1447   }
1448 unexpected:
1449   {
1450     /* we should not post an error for this, just inform upstream that
1451      * we don't expect anything anymore */
1452     GST_DEBUG ("pad %s:%s is eos", GST_DEBUG_PAD_NAME (pad));
1453     ret = GST_FLOW_UNEXPECTED;
1454     goto unlock_done;
1455   }
1456 clipped:
1457   {
1458     GST_DEBUG ("clipped buffer on pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1459     ret = GST_FLOW_OK;
1460     goto unlock_done;
1461   }
1462 error:
1463   {
1464     /* we print the error, the element should post a reasonable error
1465      * message for fatal errors */
1466     GST_DEBUG ("collect failed, reason %d (%s)", ret, gst_flow_get_name (ret));
1467     gst_collect_pads_clear (pads, data);
1468     goto unlock_done;
1469   }
1470 }