Rework GstSegment handling
[platform/upstream/gstreamer.git] / libs / gst / base / gstcollectpads.c
1 /* GStreamer
2  * Copyright (C) 2005 Wim Taymans <wim@fluendo.com>
3  *
4  * gstcollectpads.c:
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21 /**
22  * SECTION:gstcollectpads
23  * @short_description: manages a set of pads that operate in collect mode
24  * @see_also:
25  *
26  * Manages a set of pads that operate in collect mode. This means that control
27  * is given to the manager of this object when all pads have data.
28  * <itemizedlist>
29  *   <listitem><para>
30  *     Collectpads are created with gst_collect_pads_new(). A callback should then
31  *     be installed with gst_collect_pads_set_function ().
32  *   </para></listitem>
33  *   <listitem><para>
34  *     Pads are added to the collection with gst_collect_pads_add_pad()/
35  *     gst_collect_pads_remove_pad(). The pad
36  *     has to be a sinkpad. The chain and event functions of the pad are
37  *     overridden. The element_private of the pad is used to store
38  *     private information for the collectpads.
39  *   </para></listitem>
40  *   <listitem><para>
41  *     For each pad, data is queued in the _chain function or by
42  *     performing a pull_range.
43  *   </para></listitem>
44  *   <listitem><para>
45  *     When data is queued on all pads, the callback function is called.
46  *   </para></listitem>
47  *   <listitem><para>
48  *     Data can be dequeued from the pad with the gst_collect_pads_pop() method.
49  *     One can peek at the data with the gst_collect_pads_peek() function.
50  *     These functions will return NULL if the pad received an EOS event. When all
51  *     pads return NULL from a gst_collect_pads_peek(), the element can emit an EOS
52  *     event itself.
53  *   </para></listitem>
54  *   <listitem><para>
55  *     Data can also be dequeued in byte units using the gst_collect_pads_available(),
56  *     gst_collect_pads_read() and gst_collect_pads_flush() calls.
57  *   </para></listitem>
58  *   <listitem><para>
59  *     Elements should call gst_collect_pads_start() and gst_collect_pads_stop() in
60  *     their state change functions to start and stop the processing of the collecpads.
61  *     The gst_collect_pads_stop() call should be called before calling the parent
62  *     element state change function in the PAUSED_TO_READY state change to ensure
63  *     no pad is blocked and the element can finish streaming.
64  *   </para></listitem>
65  *   <listitem><para>
66  *     gst_collect_pads_collect() and gst_collect_pads_collect_range() can be used by
67  *     elements that start a #GstTask to drive the collect_pads. This feature is however
68  *     not yet implemented.
69  *   </para></listitem>
70  * </itemizedlist>
71  *
72  * Last reviewed on 2006-05-10 (0.10.6)
73  */
74
75 #include "gstcollectpads.h"
76
77 GST_DEBUG_CATEGORY_STATIC (collect_pads_debug);
78 #define GST_CAT_DEFAULT collect_pads_debug
79
80 #define GST_COLLECT_PADS_GET_PRIVATE(obj)  \
81   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_COLLECT_PADS, GstCollectPadsPrivate))
82
83 struct _GstCollectPadsPrivate
84 {
85   GstCollectPadsClipFunction clipfunc;
86   gpointer clipfunc_user_data;
87 };
88
89 #define gst_collect_pads_parent_class parent_class
90 G_DEFINE_TYPE (GstCollectPads, gst_collect_pads, GST_TYPE_OBJECT);
91
92 static void gst_collect_pads_clear (GstCollectPads * pads,
93     GstCollectData * data);
94 static GstFlowReturn gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer);
95 static gboolean gst_collect_pads_event (GstPad * pad, GstEvent * event);
96 static void gst_collect_pads_finalize (GObject * object);
97 static void ref_data (GstCollectData * data);
98 static void unref_data (GstCollectData * data);
99 static void gst_collect_pads_check_pads_unlocked (GstCollectPads * pads);
100
101 static void
102 gst_collect_pads_class_init (GstCollectPadsClass * klass)
103 {
104   GObjectClass *gobject_class = (GObjectClass *) klass;
105
106   g_type_class_add_private (klass, sizeof (GstCollectPadsPrivate));
107
108   GST_DEBUG_CATEGORY_INIT (collect_pads_debug, "collectpads", 0,
109       "GstCollectPads");
110
111   gobject_class->finalize = gst_collect_pads_finalize;
112 }
113
114 static void
115 gst_collect_pads_init (GstCollectPads * pads)
116 {
117   pads->abidata.ABI.priv = GST_COLLECT_PADS_GET_PRIVATE (pads);
118
119   pads->cond = g_cond_new ();
120   pads->data = NULL;
121   pads->cookie = 0;
122   pads->numpads = 0;
123   pads->queuedpads = 0;
124   pads->eospads = 0;
125   pads->started = FALSE;
126
127   /* members to manage the pad list */
128   pads->abidata.ABI.pad_lock = g_mutex_new ();
129   pads->abidata.ABI.pad_cookie = 0;
130   pads->abidata.ABI.pad_list = NULL;
131 }
132
133 static void
134 gst_collect_pads_finalize (GObject * object)
135 {
136   GSList *collected;
137   GstCollectPads *pads = GST_COLLECT_PADS (object);
138
139   GST_DEBUG ("finalize");
140
141   g_cond_free (pads->cond);
142   g_mutex_free (pads->abidata.ABI.pad_lock);
143
144   /* Remove pads */
145   collected = pads->abidata.ABI.pad_list;
146   for (; collected; collected = g_slist_next (collected)) {
147     GstCollectData *pdata = (GstCollectData *) collected->data;
148
149     unref_data (pdata);
150   }
151   /* Free pads list */
152   g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
153   g_slist_free (pads->data);
154   g_slist_free (pads->abidata.ABI.pad_list);
155
156   G_OBJECT_CLASS (parent_class)->finalize (object);
157 }
158
159 /**
160  * gst_collect_pads_new:
161  *
162  * Create a new instance of #GstCollectPads.
163  *
164  * MT safe.
165  *
166  * Returns: (transfer full): a new #GstCollectPads, or NULL in case of an error.
167  */
168 GstCollectPads *
169 gst_collect_pads_new (void)
170 {
171   GstCollectPads *newcoll;
172
173   newcoll = g_object_newv (GST_TYPE_COLLECT_PADS, 0, NULL);
174
175   return newcoll;
176 }
177
178 /**
179  * gst_collect_pads_set_function:
180  * @pads: the collectspads to use
181  * @func: the function to set
182  * @user_data: (closure): user data passed to the function
183  *
184  * Set the callback function and user data that will be called when
185  * all the pads added to the collection have buffers queued.
186  *
187  * MT safe.
188  */
189 void
190 gst_collect_pads_set_function (GstCollectPads * pads,
191     GstCollectPadsFunction func, gpointer user_data)
192 {
193   g_return_if_fail (pads != NULL);
194   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
195
196   GST_OBJECT_LOCK (pads);
197   pads->func = func;
198   pads->user_data = user_data;
199   GST_OBJECT_UNLOCK (pads);
200 }
201
202 static void
203 ref_data (GstCollectData * data)
204 {
205   g_assert (data != NULL);
206
207   g_atomic_int_inc (&(data->abidata.ABI.refcount));
208 }
209
210 static void
211 unref_data (GstCollectData * data)
212 {
213   GstCollectDataDestroyNotify destroy_notify;
214
215   g_assert (data != NULL);
216   g_assert (data->abidata.ABI.refcount > 0);
217
218   if (!g_atomic_int_dec_and_test (&(data->abidata.ABI.refcount)))
219     return;
220
221   /* FIXME: Ugly hack as we can't add more fields to GstCollectData */
222   destroy_notify = (GstCollectDataDestroyNotify)
223       g_object_get_data (G_OBJECT (data->pad),
224       "gst-collect-data-destroy-notify");
225
226   if (destroy_notify)
227     destroy_notify (data);
228
229   g_object_unref (data->pad);
230   if (data->buffer) {
231     gst_buffer_unref (data->buffer);
232   }
233   g_free (data);
234 }
235
236 /**
237  * gst_collect_pads_add_pad:
238  * @pads: the collectspads to use
239  * @pad: (transfer none): the pad to add
240  * @size: the size of the returned #GstCollectData structure
241  *
242  * Add a pad to the collection of collect pads. The pad has to be
243  * a sinkpad. The refcount of the pad is incremented. Use
244  * gst_collect_pads_remove_pad() to remove the pad from the collection
245  * again.
246  *
247  * This function will override the chain and event functions of the pad
248  * along with the element_private data, which is used to store private
249  * information for the collectpads.
250  *
251  * You specify a size for the returned #GstCollectData structure
252  * so that you can use it to store additional information.
253  *
254  * The pad will be automatically activated in push mode when @pads is
255  * started.
256  *
257  * This function calls gst_collect_pads_add_pad_full() passing a value of NULL
258  * for destroy_notify.
259  *
260  * MT safe.
261  *
262  * Returns: a new #GstCollectData to identify the new pad. Or NULL
263  *   if wrong parameters are supplied.
264  */
265 GstCollectData *
266 gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size)
267 {
268   return gst_collect_pads_add_pad_full (pads, pad, size, NULL);
269 }
270
271 /**
272  * gst_collect_pads_add_pad_full:
273  * @pads: the collectspads to use
274  * @pad: (transfer none): the pad to add
275  * @size: the size of the returned #GstCollectData structure
276  * @destroy_notify: function to be called before the returned #GstCollectData
277  * structure is freed
278  *
279  * Add a pad to the collection of collect pads. The pad has to be
280  * a sinkpad. The refcount of the pad is incremented. Use
281  * gst_collect_pads_remove_pad() to remove the pad from the collection
282  * again.
283  *
284  * You specify a size for the returned #GstCollectData structure
285  * so that you can use it to store additional information.
286  *
287  * You can also specify a #GstCollectDataDestroyNotify that will be called
288  * just before the #GstCollectData structure is freed. It is passed the
289  * pointer to the structure and should free any custom memory and resources
290  * allocated for it.
291  *
292  * The pad will be automatically activated in push mode when @pads is
293  * started.
294  *
295  * MT safe.
296  *
297  * Since: 0.10.12
298  *
299  * Returns: a new #GstCollectData to identify the new pad. Or NULL
300  *   if wrong parameters are supplied.
301  */
302 GstCollectData *
303 gst_collect_pads_add_pad_full (GstCollectPads * pads, GstPad * pad, guint size,
304     GstCollectDataDestroyNotify destroy_notify)
305 {
306   GstCollectData *data;
307
308   g_return_val_if_fail (pads != NULL, NULL);
309   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
310   g_return_val_if_fail (pad != NULL, NULL);
311   g_return_val_if_fail (GST_PAD_IS_SINK (pad), NULL);
312   g_return_val_if_fail (size >= sizeof (GstCollectData), NULL);
313
314   GST_DEBUG ("adding pad %s:%s", GST_DEBUG_PAD_NAME (pad));
315
316   data = g_malloc0 (size);
317   data->collect = pads;
318   data->pad = gst_object_ref (pad);
319   data->buffer = NULL;
320   data->pos = 0;
321   gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
322   data->abidata.ABI.flushing = FALSE;
323   data->abidata.ABI.new_segment = FALSE;
324   data->abidata.ABI.eos = FALSE;
325   data->abidata.ABI.refcount = 1;
326
327   /* FIXME: Ugly hack as we can't add more fields to GstCollectData */
328   g_object_set_data (G_OBJECT (pad), "gst-collect-data-destroy-notify",
329       (void *) destroy_notify);
330
331   GST_COLLECT_PADS_PAD_LOCK (pads);
332   GST_OBJECT_LOCK (pad);
333   gst_pad_set_element_private (pad, data);
334   GST_OBJECT_UNLOCK (pad);
335   pads->abidata.ABI.pad_list =
336       g_slist_append (pads->abidata.ABI.pad_list, data);
337   gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_chain));
338   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_event));
339   /* activate the pad when needed */
340   if (pads->started)
341     gst_pad_set_active (pad, TRUE);
342   pads->abidata.ABI.pad_cookie++;
343   GST_COLLECT_PADS_PAD_UNLOCK (pads);
344
345   return data;
346 }
347
348 static gint
349 find_pad (GstCollectData * data, GstPad * pad)
350 {
351   if (data->pad == pad)
352     return 0;
353   return 1;
354 }
355
356 /**
357  * gst_collect_pads_set_clip_function:
358  * @pads: the collectspads to use
359  * @clipfunc: clip function to install
360  * @user_data: (closure): user data to pass to @clip_func
361  *
362  * Install a clipping function that is called right after a buffer is received
363  * on a pad managed by @pads. See #GstCollectPadsClipFunction for more info.
364  *
365  * Since: 0.10.26
366  */
367 void
368 gst_collect_pads_set_clip_function (GstCollectPads * pads,
369     GstCollectPadsClipFunction clipfunc, gpointer user_data)
370 {
371   GstCollectPadsPrivate *priv;
372
373   g_return_if_fail (pads != NULL);
374   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
375
376   priv = pads->abidata.ABI.priv;
377
378   priv->clipfunc = clipfunc;
379   priv->clipfunc_user_data = user_data;
380 }
381
382 /**
383  * gst_collect_pads_remove_pad:
384  * @pads: the collectspads to use
385  * @pad: (transfer none): the pad to remove
386  *
387  * Remove a pad from the collection of collect pads. This function will also
388  * free the #GstCollectData and all the resources that were allocated with
389  * gst_collect_pads_add_pad().
390  *
391  * The pad will be deactivated automatically when @pads is stopped.
392  *
393  * MT safe.
394  *
395  * Returns: %TRUE if the pad could be removed.
396  */
397 gboolean
398 gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad)
399 {
400   GstCollectData *data;
401   GSList *list;
402
403   g_return_val_if_fail (pads != NULL, FALSE);
404   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
405   g_return_val_if_fail (pad != NULL, FALSE);
406   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
407
408   GST_DEBUG ("removing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
409
410   GST_COLLECT_PADS_PAD_LOCK (pads);
411   list =
412       g_slist_find_custom (pads->abidata.ABI.pad_list, pad,
413       (GCompareFunc) find_pad);
414   if (!list)
415     goto unknown_pad;
416
417   data = (GstCollectData *) list->data;
418
419   GST_DEBUG ("found pad %s:%s at %p", GST_DEBUG_PAD_NAME (pad), data);
420
421   /* clear the stuff we configured */
422   gst_pad_set_chain_function (pad, NULL);
423   gst_pad_set_event_function (pad, NULL);
424   GST_OBJECT_LOCK (pad);
425   gst_pad_set_element_private (pad, NULL);
426   GST_OBJECT_UNLOCK (pad);
427
428   /* backward compat, also remove from data if stopped, note that this function
429    * can only be called when we are stopped because we don't take the LOCK to
430    * protect the pads->data list. */
431   if (!pads->started) {
432     GSList *dlist;
433
434     dlist = g_slist_find_custom (pads->data, pad, (GCompareFunc) find_pad);
435     if (dlist) {
436       GstCollectData *pdata = dlist->data;
437
438       pads->data = g_slist_delete_link (pads->data, dlist);
439       unref_data (pdata);
440     }
441   }
442   /* remove from the pad list */
443   pads->abidata.ABI.pad_list =
444       g_slist_delete_link (pads->abidata.ABI.pad_list, list);
445   pads->abidata.ABI.pad_cookie++;
446
447   /* signal waiters because something changed */
448   GST_COLLECT_PADS_BROADCAST (pads);
449
450   /* deactivate the pad when needed */
451   if (!pads->started)
452     gst_pad_set_active (pad, FALSE);
453
454   /* clean and free the collect data */
455   unref_data (data);
456
457   GST_COLLECT_PADS_PAD_UNLOCK (pads);
458
459   return TRUE;
460
461 unknown_pad:
462   {
463     GST_WARNING ("cannot remove unknown pad %s:%s", GST_DEBUG_PAD_NAME (pad));
464     GST_COLLECT_PADS_PAD_UNLOCK (pads);
465     return FALSE;
466   }
467 }
468
469 /**
470  * gst_collect_pads_is_active:
471  * @pads: (transfer none): the collectspads to use
472  * @pad: the pad to check
473  *
474  * Check if a pad is active.
475  *
476  * This function is currently not implemented.
477  *
478  * MT safe.
479  *
480  * Returns: %TRUE if the pad is active.
481  */
482 gboolean
483 gst_collect_pads_is_active (GstCollectPads * pads, GstPad * pad)
484 {
485   g_return_val_if_fail (pads != NULL, FALSE);
486   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
487   g_return_val_if_fail (pad != NULL, FALSE);
488   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
489
490   g_warning ("gst_collect_pads_is_active() is not implemented");
491
492   return FALSE;
493 }
494
495 /**
496  * gst_collect_pads_collect:
497  * @pads: the collectspads to use
498  *
499  * Collect data on all pads. This function is usually called
500  * from a #GstTask function in an element.
501  *
502  * This function is currently not implemented.
503  *
504  * MT safe.
505  *
506  * Returns: #GstFlowReturn of the operation.
507  */
508 GstFlowReturn
509 gst_collect_pads_collect (GstCollectPads * pads)
510 {
511   g_return_val_if_fail (pads != NULL, GST_FLOW_ERROR);
512   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
513
514   g_warning ("gst_collect_pads_collect() is not implemented");
515
516   return GST_FLOW_NOT_SUPPORTED;
517 }
518
519 /**
520  * gst_collect_pads_collect_range:
521  * @pads: the collectspads to use
522  * @offset: the offset to collect
523  * @length: the length to collect
524  *
525  * Collect data with @offset and @length on all pads. This function
526  * is typically called in the getrange function of an element.
527  *
528  * This function is currently not implemented.
529  *
530  * MT safe.
531  *
532  * Returns: #GstFlowReturn of the operation.
533  */
534 GstFlowReturn
535 gst_collect_pads_collect_range (GstCollectPads * pads, guint64 offset,
536     guint length)
537 {
538   g_return_val_if_fail (pads != NULL, GST_FLOW_ERROR);
539   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
540
541   g_warning ("gst_collect_pads_collect_range() is not implemented");
542
543   return GST_FLOW_NOT_SUPPORTED;
544 }
545
546 static gboolean
547 gst_collect_pads_is_flushing (GstCollectPads * pads)
548 {
549   GSList *walk = NULL;
550   gboolean res = TRUE;
551
552   GST_COLLECT_PADS_PAD_LOCK (pads);
553
554   /* Ensure pads->data state */
555   gst_collect_pads_check_pads_unlocked (pads);
556
557   GST_DEBUG ("Getting flushing state (pads:%p, pads->data:%p)",
558       pads, pads->data);
559
560   for (walk = pads->data; walk; walk = g_slist_next (walk)) {
561     GstCollectData *cdata = walk->data;
562
563     GST_DEBUG_OBJECT (cdata->pad, "flushing:%d", cdata->abidata.ABI.flushing);
564
565     if (cdata->abidata.ABI.flushing) {
566       goto done;
567     }
568   }
569
570   res = FALSE;
571 done:
572   GST_COLLECT_PADS_PAD_UNLOCK (pads);
573   return res;
574 }
575
576 /* FIXME, I think this function is used to work around bad behaviour
577  * of elements that add pads to themselves without activating them.
578  *
579  * Must be called with PAD_LOCK.
580  */
581 static void
582 gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads,
583     gboolean flushing)
584 {
585   GSList *walk = NULL;
586
587   GST_DEBUG ("Setting flushing (%d)", flushing);
588
589   /* Update the pads flushing flag */
590   for (walk = pads->data; walk; walk = g_slist_next (walk)) {
591     GstCollectData *cdata = walk->data;
592
593     if (GST_IS_PAD (cdata->pad)) {
594       GST_OBJECT_LOCK (cdata->pad);
595       if (flushing)
596         GST_PAD_SET_FLUSHING (cdata->pad);
597       else
598         GST_PAD_UNSET_FLUSHING (cdata->pad);
599       cdata->abidata.ABI.flushing = flushing;
600       gst_collect_pads_clear (pads, cdata);
601       GST_OBJECT_UNLOCK (cdata->pad);
602     }
603   }
604   /* Setting the pads to flushing means that we changed the values which
605    * are 'protected' by the cookie. We therefore update it to force a
606    * recalculation of the current pad status. */
607   pads->abidata.ABI.pad_cookie++;
608 }
609
610 /**
611  * gst_collect_pads_set_flushing:
612  * @pads: the collectspads to use
613  * @flushing: desired state of the pads
614  *
615  * Change the flushing state of all the pads in the collection. No pad
616  * is able to accept anymore data when @flushing is %TRUE. Calling this
617  * function with @flushing %FALSE makes @pads accept data again.
618  *
619  * MT safe.
620  *
621  * Since: 0.10.7.
622  */
623 void
624 gst_collect_pads_set_flushing (GstCollectPads * pads, gboolean flushing)
625 {
626   g_return_if_fail (pads != NULL);
627   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
628
629   GST_COLLECT_PADS_PAD_LOCK (pads);
630   /* Ensure pads->data state */
631   gst_collect_pads_check_pads_unlocked (pads);
632   gst_collect_pads_set_flushing_unlocked (pads, flushing);
633   GST_COLLECT_PADS_PAD_UNLOCK (pads);
634 }
635
636 /**
637  * gst_collect_pads_start:
638  * @pads: the collectspads to use
639  *
640  * Starts the processing of data in the collect_pads.
641  *
642  * MT safe.
643  */
644 void
645 gst_collect_pads_start (GstCollectPads * pads)
646 {
647   GSList *collected;
648
649   g_return_if_fail (pads != NULL);
650   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
651
652   GST_DEBUG_OBJECT (pads, "starting collect pads");
653
654   /* make sure stop and collect cannot be called anymore */
655   GST_OBJECT_LOCK (pads);
656
657   /* make pads streamable */
658   GST_COLLECT_PADS_PAD_LOCK (pads);
659
660   /* loop over the master pad list and reset the segment */
661   collected = pads->abidata.ABI.pad_list;
662   for (; collected; collected = g_slist_next (collected)) {
663     GstCollectData *data;
664
665     data = collected->data;
666     gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
667   }
668
669   gst_collect_pads_set_flushing_unlocked (pads, FALSE);
670
671   /* Start collect pads */
672   pads->started = TRUE;
673   GST_COLLECT_PADS_PAD_UNLOCK (pads);
674   GST_OBJECT_UNLOCK (pads);
675 }
676
677 /**
678  * gst_collect_pads_stop:
679  * @pads: the collectspads to use
680  *
681  * Stops the processing of data in the collect_pads. this function
682  * will also unblock any blocking operations.
683  *
684  * MT safe.
685  */
686 void
687 gst_collect_pads_stop (GstCollectPads * pads)
688 {
689   GSList *collected;
690
691   g_return_if_fail (pads != NULL);
692   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
693
694   GST_DEBUG_OBJECT (pads, "stopping collect pads");
695
696   /* make sure collect and start cannot be called anymore */
697   GST_OBJECT_LOCK (pads);
698
699   /* make pads not accept data anymore */
700   GST_COLLECT_PADS_PAD_LOCK (pads);
701   gst_collect_pads_set_flushing_unlocked (pads, TRUE);
702
703   /* Stop collect pads */
704   pads->started = FALSE;
705   pads->eospads = 0;
706   pads->queuedpads = 0;
707
708   /* loop over the master pad list and flush buffers */
709   collected = pads->abidata.ABI.pad_list;
710   for (; collected; collected = g_slist_next (collected)) {
711     GstCollectData *data;
712     GstBuffer **buffer_p;
713
714     data = collected->data;
715     if (data->buffer) {
716       buffer_p = &data->buffer;
717       gst_buffer_replace (buffer_p, NULL);
718       data->pos = 0;
719     }
720     data->abidata.ABI.eos = FALSE;
721   }
722
723   GST_COLLECT_PADS_PAD_UNLOCK (pads);
724   /* Wake them up so they can end the chain functions. */
725   GST_COLLECT_PADS_BROADCAST (pads);
726
727   GST_OBJECT_UNLOCK (pads);
728 }
729
730 /**
731  * gst_collect_pads_peek:
732  * @pads: the collectspads to peek
733  * @data: the data to use
734  *
735  * Peek at the buffer currently queued in @data. This function
736  * should be called with the @pads LOCK held, such as in the callback
737  * handler.
738  *
739  * MT safe.
740  *
741  * Returns: (transfer full): The buffer in @data or NULL if no buffer is queued.
742  *  should unref the buffer after usage.
743  */
744 GstBuffer *
745 gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data)
746 {
747   GstBuffer *result;
748
749   g_return_val_if_fail (pads != NULL, NULL);
750   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
751   g_return_val_if_fail (data != NULL, NULL);
752
753   if ((result = data->buffer))
754     gst_buffer_ref (result);
755
756   GST_DEBUG ("Peeking at pad %s:%s: buffer=%p",
757       GST_DEBUG_PAD_NAME (data->pad), result);
758
759   return result;
760 }
761
762 /**
763  * gst_collect_pads_pop:
764  * @pads: the collectspads to pop
765  * @data: the data to use
766  *
767  * Pop the buffer currently queued in @data. This function
768  * should be called with the @pads LOCK held, such as in the callback
769  * handler.
770  *
771  * Free-function: gst_buffer_unref
772  *
773  * MT safe.
774  *
775  * Returns: (transfer full): The buffer in @data or NULL if no buffer was
776  *   queued. You should unref the buffer after usage.
777  */
778 GstBuffer *
779 gst_collect_pads_pop (GstCollectPads * pads, GstCollectData * data)
780 {
781   GstBuffer *result;
782
783   g_return_val_if_fail (pads != NULL, NULL);
784   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
785   g_return_val_if_fail (data != NULL, NULL);
786
787   if ((result = data->buffer)) {
788     data->buffer = NULL;
789     data->pos = 0;
790     /* one less pad with queued data now */
791     pads->queuedpads--;
792   }
793
794   GST_COLLECT_PADS_BROADCAST (pads);
795
796   GST_DEBUG ("Pop buffer on pad %s:%s: buffer=%p",
797       GST_DEBUG_PAD_NAME (data->pad), result);
798
799   return result;
800 }
801
802 /* pop and unref the currently queued buffer, should e called with the LOCK
803  * helt. */
804 static void
805 gst_collect_pads_clear (GstCollectPads * pads, GstCollectData * data)
806 {
807   GstBuffer *buf;
808
809   if ((buf = gst_collect_pads_pop (pads, data)))
810     gst_buffer_unref (buf);
811 }
812
813 /**
814  * gst_collect_pads_available:
815  * @pads: the collectspads to query
816  *
817  * Query how much bytes can be read from each queued buffer. This means
818  * that the result of this call is the maximum number of bytes that can
819  * be read from each of the pads.
820  *
821  * This function should be called with @pads LOCK held, such as
822  * in the callback.
823  *
824  * MT safe.
825  *
826  * Returns: The maximum number of bytes queued on all pads. This function
827  * returns 0 if a pad has no queued buffer.
828  */
829 /* FIXME, we can do this in the _chain functions */
830 guint
831 gst_collect_pads_available (GstCollectPads * pads)
832 {
833   GSList *collected;
834   guint result = G_MAXUINT;
835
836   g_return_val_if_fail (pads != NULL, 0);
837   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
838
839   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
840     GstCollectData *pdata;
841     GstBuffer *buffer;
842     gint size;
843
844     pdata = (GstCollectData *) collected->data;
845
846     /* ignore pad with EOS */
847     if (G_UNLIKELY (pdata->abidata.ABI.eos)) {
848       GST_DEBUG ("pad %s:%s is EOS", GST_DEBUG_PAD_NAME (pdata->pad));
849       continue;
850     }
851
852     /* an empty buffer without EOS is weird when we get here.. */
853     if (G_UNLIKELY ((buffer = pdata->buffer) == NULL)) {
854       GST_WARNING ("pad %s:%s has no buffer", GST_DEBUG_PAD_NAME (pdata->pad));
855       goto not_filled;
856     }
857
858     /* this is the size left of the buffer */
859     size = gst_buffer_get_size (buffer) - pdata->pos;
860     GST_DEBUG ("pad %s:%s has %d bytes left",
861         GST_DEBUG_PAD_NAME (pdata->pad), size);
862
863     /* need to return the min of all available data */
864     if (size < result)
865       result = size;
866   }
867   /* nothing changed, all must be EOS then, return 0 */
868   if (G_UNLIKELY (result == G_MAXUINT))
869     result = 0;
870
871   return result;
872
873 not_filled:
874   {
875     return 0;
876   }
877 }
878
879 /**
880  * gst_collect_pads_read_buffer:
881  * @pads: the collectspads to query
882  * @data: the data to use
883  * @size: the number of bytes to read
884  *
885  * Get a buffer of @size bytes from the given pad @data.
886  *
887  * This function should be called with @pads LOCK held, such as in the callback.
888  *
889  * Free-function: gst_buffer_unref
890  *
891  * Returns: (transfer full): a #GstBuffer. The size of the buffer can be less
892  *     that requested. A return of NULL signals that the pad is end-of-stream.
893  *     Unref the buffer with gst_buffer_unref() after use.
894  *
895  * MT safe.
896  *
897  * Since: 0.10.18
898  */
899 GstBuffer *
900 gst_collect_pads_read_buffer (GstCollectPads * pads, GstCollectData * data,
901     guint size)
902 {
903   guint readsize, bufsize;
904   GstBuffer *buffer;
905
906   g_return_val_if_fail (pads != NULL, NULL);
907   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
908   g_return_val_if_fail (data != NULL, NULL);
909
910   /* no buffer, must be EOS */
911   if ((buffer = data->buffer) == NULL)
912     return NULL;
913
914   bufsize = gst_buffer_get_size (buffer);
915
916   readsize = MIN (size, bufsize - data->pos);
917
918   if (data->pos == 0 && readsize == bufsize)
919     return gst_buffer_ref (buffer);
920   else
921     return gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, data->pos,
922         readsize);
923 }
924
925 /**
926  * gst_collect_pads_take_buffer:
927  * @pads: the collectspads to query
928  * @data: the data to use
929  * @size: the number of bytes to read
930  *
931  * Get a buffer of @size bytes from the given pad @data. Flushes the amount
932  * of read bytes.
933  *
934  * This function should be called with @pads LOCK held, such as in the callback.
935  *
936  * Free-function: gst_buffer_unref
937  *
938  * MT safe.
939  *
940  * Returns: (transfer full): a #GstBuffer. The size of the buffer can be less
941  *     that requested. A return of NULL signals that the pad is end-of-stream.
942  *     Unref the buffer after use.
943  *
944  * Since: 0.10.18
945  */
946 GstBuffer *
947 gst_collect_pads_take_buffer (GstCollectPads * pads, GstCollectData * data,
948     guint size)
949 {
950   GstBuffer *buffer = gst_collect_pads_read_buffer (pads, data, size);
951
952   if (buffer) {
953     gst_collect_pads_flush (pads, data, gst_buffer_get_size (buffer));
954   }
955   return buffer;
956 }
957
958 /**
959  * gst_collect_pads_flush:
960  * @pads: the collectspads to query
961  * @data: the data to use
962  * @size: the number of bytes to flush
963  *
964  * Flush @size bytes from the pad @data.
965  *
966  * This function should be called with @pads LOCK held, such as
967  * in the callback.
968  *
969  * MT safe.
970  *
971  * Returns: The number of bytes flushed. This can be less than @size and
972  * is 0 if the pad was end-of-stream.
973  */
974 guint
975 gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data,
976     guint size)
977 {
978   guint flushsize;
979   GstBuffer *buffer;
980   gsize bsize;
981
982   g_return_val_if_fail (pads != NULL, 0);
983   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
984   g_return_val_if_fail (data != NULL, 0);
985
986   /* no buffer, must be EOS */
987   if ((buffer = data->buffer) == NULL)
988     return 0;
989
990   bsize = gst_buffer_get_size (buffer);
991
992   /* this is what we can flush at max */
993   flushsize = MIN (size, bsize - data->pos);
994
995   data->pos += size;
996
997   GST_LOG_OBJECT (pads, "Flushing %d bytes, requested %u", flushsize, size);
998
999   if (data->pos >= bsize)
1000     /* _clear will also reset data->pos to 0 */
1001     gst_collect_pads_clear (pads, data);
1002
1003   return flushsize;
1004 }
1005
1006 /* see if pads were added or removed and update our stats. Any pad
1007  * added after releasing the PAD_LOCK will get collected in the next
1008  * round.
1009  *
1010  * We can do a quick check by checking the cookies, that get changed
1011  * whenever the pad list is updated.
1012  *
1013  * Must be called with LOCK.
1014  */
1015 static void
1016 gst_collect_pads_check_pads_unlocked (GstCollectPads * pads)
1017 {
1018   GST_DEBUG ("stored cookie : %d, used_cookie:%d",
1019       pads->abidata.ABI.pad_cookie, pads->cookie);
1020   if (G_UNLIKELY (pads->abidata.ABI.pad_cookie != pads->cookie)) {
1021     GSList *collected;
1022
1023     /* clear list and stats */
1024     g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
1025     g_slist_free (pads->data);
1026     pads->data = NULL;
1027     pads->numpads = 0;
1028     pads->queuedpads = 0;
1029     pads->eospads = 0;
1030
1031     /* loop over the master pad list */
1032     collected = pads->abidata.ABI.pad_list;
1033     for (; collected; collected = g_slist_next (collected)) {
1034       GstCollectData *data;
1035
1036       /* update the stats */
1037       pads->numpads++;
1038       data = collected->data;
1039
1040       if (G_LIKELY (!data->abidata.ABI.flushing)) {
1041         if (data->buffer)
1042           pads->queuedpads++;
1043         if (data->abidata.ABI.eos)
1044           pads->eospads++;
1045       }
1046
1047       /* add to the list of pads to collect */
1048       ref_data (data);
1049       pads->data = g_slist_prepend (pads->data, data);
1050     }
1051     /* and update the cookie */
1052     pads->cookie = pads->abidata.ABI.pad_cookie;
1053   }
1054 }
1055
1056 static inline void
1057 gst_collect_pads_check_pads (GstCollectPads * pads)
1058 {
1059   /* the master list and cookie are protected with the PAD_LOCK */
1060   GST_COLLECT_PADS_PAD_LOCK (pads);
1061   gst_collect_pads_check_pads_unlocked (pads);
1062   GST_COLLECT_PADS_PAD_UNLOCK (pads);
1063 }
1064
1065 /* checks if all the pads are collected and call the collectfunction
1066  *
1067  * Should be called with LOCK.
1068  *
1069  * Returns: The #GstFlowReturn of collection.
1070  */
1071 static GstFlowReturn
1072 gst_collect_pads_check_collected (GstCollectPads * pads)
1073 {
1074   GstFlowReturn flow_ret = GST_FLOW_OK;
1075
1076   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
1077   g_return_val_if_fail (pads->func != NULL, GST_FLOW_NOT_SUPPORTED);
1078
1079   /* check for new pads, update stats etc.. */
1080   gst_collect_pads_check_pads (pads);
1081
1082   if (G_UNLIKELY (pads->eospads == pads->numpads)) {
1083     /* If all our pads are EOS just collect once to let the element
1084      * do its final EOS handling. */
1085     GST_DEBUG ("All active pads (%d) are EOS, calling %s",
1086         pads->numpads, GST_DEBUG_FUNCPTR_NAME (pads->func));
1087     flow_ret = pads->func (pads, pads->user_data);
1088   } else {
1089     gboolean collected = FALSE;
1090
1091     /* We call the collected function as long as our condition matches.
1092      * FIXME: should we error out if the collect function did not pop anything ?
1093      * we can get a busy loop here if the element does not pop from the collect
1094      * function
1095      */
1096     while (((pads->queuedpads + pads->eospads) >= pads->numpads)) {
1097       GST_DEBUG ("All active pads (%d + %d >= %d) have data, calling %s",
1098           pads->queuedpads, pads->eospads, pads->numpads,
1099           GST_DEBUG_FUNCPTR_NAME (pads->func));
1100       flow_ret = pads->func (pads, pads->user_data);
1101       collected = TRUE;
1102
1103       /* break on error */
1104       if (flow_ret != GST_FLOW_OK)
1105         break;
1106       /* Don't keep looping after telling the element EOS or flushing */
1107       if (pads->queuedpads == 0)
1108         break;
1109     }
1110     if (!collected)
1111       GST_DEBUG ("Not all active pads (%d) have data, continuing",
1112           pads->numpads);
1113   }
1114   return flow_ret;
1115 }
1116
1117 static gboolean
1118 gst_collect_pads_event (GstPad * pad, GstEvent * event)
1119 {
1120   gboolean res;
1121   GstCollectData *data;
1122   GstCollectPads *pads;
1123
1124   /* some magic to get the managing collect_pads */
1125   GST_OBJECT_LOCK (pad);
1126   data = (GstCollectData *) gst_pad_get_element_private (pad);
1127   if (G_UNLIKELY (data == NULL))
1128     goto pad_removed;
1129   ref_data (data);
1130   GST_OBJECT_UNLOCK (pad);
1131
1132   res = TRUE;
1133
1134   pads = data->collect;
1135
1136   GST_DEBUG ("Got %s event on pad %s:%s", GST_EVENT_TYPE_NAME (event),
1137       GST_DEBUG_PAD_NAME (data->pad));
1138
1139   switch (GST_EVENT_TYPE (event)) {
1140     case GST_EVENT_FLUSH_START:
1141     {
1142       /* forward event to unblock check_collected */
1143       gst_pad_event_default (pad, event);
1144
1145       /* now unblock the chain function.
1146        * no cond per pad, so they all unblock,
1147        * non-flushing block again */
1148       GST_OBJECT_LOCK (pads);
1149       data->abidata.ABI.flushing = TRUE;
1150       gst_collect_pads_clear (pads, data);
1151       GST_OBJECT_UNLOCK (pads);
1152
1153       /* event already cleaned up by forwarding */
1154       goto done;
1155     }
1156     case GST_EVENT_FLUSH_STOP:
1157     {
1158       /* flush the 1 buffer queue */
1159       GST_OBJECT_LOCK (pads);
1160       data->abidata.ABI.flushing = FALSE;
1161       gst_collect_pads_clear (pads, data);
1162       /* we need new segment info after the flush */
1163       gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
1164       data->abidata.ABI.new_segment = FALSE;
1165       /* if the pad was EOS, remove the EOS flag and
1166        * decrement the number of eospads */
1167       if (G_UNLIKELY (data->abidata.ABI.eos == TRUE)) {
1168         pads->eospads--;
1169         data->abidata.ABI.eos = FALSE;
1170       }
1171
1172       if (!gst_collect_pads_is_flushing (pads)) {
1173         /* forward event if all pads are no longer flushing */
1174         GST_DEBUG ("No more pads are flushing, forwarding FLUSH_STOP");
1175         GST_OBJECT_UNLOCK (pads);
1176         goto forward;
1177       }
1178       gst_event_unref (event);
1179       GST_OBJECT_UNLOCK (pads);
1180       goto done;
1181     }
1182     case GST_EVENT_EOS:
1183     {
1184       GST_OBJECT_LOCK (pads);
1185       /* if the pad was not EOS, make it EOS and so we
1186        * have one more eospad */
1187       if (G_LIKELY (data->abidata.ABI.eos == FALSE)) {
1188         data->abidata.ABI.eos = TRUE;
1189         pads->eospads++;
1190       }
1191       /* check if we need collecting anything, we ignore the
1192        * result. */
1193       gst_collect_pads_check_collected (pads);
1194       GST_OBJECT_UNLOCK (pads);
1195
1196       /* We eat this event, element should do something
1197        * in the collected callback. */
1198       gst_event_unref (event);
1199       goto done;
1200     }
1201     case GST_EVENT_SEGMENT:
1202     {
1203       gst_event_parse_segment (event, &data->segment);
1204
1205       GST_DEBUG_OBJECT (data->pad, "got newsegment %" GST_SEGMENT_FORMAT,
1206           &data->segment);
1207       data->abidata.ABI.new_segment = TRUE;
1208
1209       /* we must not forward this event since multiple segments will be
1210        * accumulated and this is certainly not what we want. */
1211       gst_event_unref (event);
1212       /* FIXME: collect-pads based elements need to create their own newsegment
1213        * event (and only one really)
1214        * (a) make the segment part of the GstCollectData structure of each pad,
1215        * so you can just check that once you have a buffer queued on that pad,
1216        * (b) you can override a pad's event function with your own,
1217        * catch the newsegment event and then pass it on to the original
1218        * gstcollectpads event function
1219        * (that's what avimux does for something IIRC)
1220        * see #340060
1221        */
1222       goto done;
1223     }
1224     default:
1225       /* forward other events */
1226       goto forward;
1227   }
1228
1229 forward:
1230   GST_DEBUG_OBJECT (pads, "forward unhandled event: %s",
1231       GST_EVENT_TYPE_NAME (event));
1232   res = gst_pad_event_default (pad, event);
1233
1234 done:
1235   unref_data (data);
1236   return res;
1237
1238   /* ERRORS */
1239 pad_removed:
1240   {
1241     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
1242     GST_OBJECT_UNLOCK (pad);
1243     return FALSE;
1244   }
1245 }
1246
1247 /* For each buffer we receive we check if our collected condition is reached
1248  * and if so we call the collected function. When this is done we check if
1249  * data has been unqueued. If data is still queued we wait holding the stream
1250  * lock to make sure no EOS event can happen while we are ready to be
1251  * collected
1252  */
1253 static GstFlowReturn
1254 gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer)
1255 {
1256   GstCollectData *data;
1257   GstCollectPads *pads;
1258   GstCollectPadsPrivate *priv;
1259   GstFlowReturn ret;
1260
1261   GST_DEBUG ("Got buffer for pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1262
1263   /* some magic to get the managing collect_pads */
1264   GST_OBJECT_LOCK (pad);
1265   data = (GstCollectData *) gst_pad_get_element_private (pad);
1266   if (G_UNLIKELY (data == NULL))
1267     goto no_data;
1268   ref_data (data);
1269   GST_OBJECT_UNLOCK (pad);
1270
1271   pads = data->collect;
1272   priv = pads->abidata.ABI.priv;
1273
1274   GST_OBJECT_LOCK (pads);
1275   /* if not started, bail out */
1276   if (G_UNLIKELY (!pads->started))
1277     goto not_started;
1278   /* check if this pad is flushing */
1279   if (G_UNLIKELY (data->abidata.ABI.flushing))
1280     goto flushing;
1281   /* pad was EOS, we can refuse this data */
1282   if (G_UNLIKELY (data->abidata.ABI.eos))
1283     goto unexpected;
1284
1285   /* see if we need to clip */
1286   if (priv->clipfunc) {
1287     buffer = priv->clipfunc (pads, data, buffer, priv->clipfunc_user_data);
1288
1289     if (G_UNLIKELY (buffer == NULL))
1290       goto clipped;
1291   }
1292
1293   GST_DEBUG ("Queuing buffer %p for pad %s:%s", buffer,
1294       GST_DEBUG_PAD_NAME (pad));
1295
1296   /* One more pad has data queued */
1297   pads->queuedpads++;
1298   /* take ownership of the buffer */
1299   if (data->buffer)
1300     gst_buffer_unref (data->buffer);
1301   data->buffer = buffer;
1302   buffer = NULL;
1303
1304   /* update segment last position if in TIME */
1305   if (G_LIKELY (data->segment.format == GST_FORMAT_TIME)) {
1306     GstClockTime timestamp = GST_BUFFER_TIMESTAMP (data->buffer);
1307
1308     if (GST_CLOCK_TIME_IS_VALID (timestamp))
1309       data->segment.position = timestamp;
1310   }
1311
1312   /* While we have data queued on this pad try to collect stuff */
1313   do {
1314     GST_DEBUG ("Pad %s:%s checking", GST_DEBUG_PAD_NAME (pad));
1315     /* Check if our collected condition is matched and call the collected function
1316      * if it is */
1317     ret = gst_collect_pads_check_collected (pads);
1318     /* when an error occurs, we want to report this back to the caller ASAP
1319      * without having to block if the buffer was not popped */
1320     if (G_UNLIKELY (ret != GST_FLOW_OK))
1321       goto error;
1322
1323     /* data was consumed, we can exit and accept new data */
1324     if (data->buffer == NULL)
1325       break;
1326
1327     /* Check if we got removed in the mean time, FIXME, this is racy.
1328      * Between this check and the _WAIT, the pad could be removed which will
1329      * makes us hang in the _WAIT. */
1330     GST_OBJECT_LOCK (pad);
1331     if (G_UNLIKELY (gst_pad_get_element_private (pad) == NULL))
1332       goto pad_removed;
1333     GST_OBJECT_UNLOCK (pad);
1334
1335     GST_DEBUG ("Pad %s:%s has a buffer queued, waiting",
1336         GST_DEBUG_PAD_NAME (pad));
1337
1338     /* wait to be collected, this must happen from another thread triggered
1339      * by the _chain function of another pad. We release the lock so we
1340      * can get stopped or flushed as well. We can however not get EOS
1341      * because we still hold the STREAM_LOCK.
1342      */
1343     GST_COLLECT_PADS_WAIT (pads);
1344
1345     GST_DEBUG ("Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad));
1346
1347     /* after a signal, we could be stopped */
1348     if (G_UNLIKELY (!pads->started))
1349       goto not_started;
1350     /* check if this pad is flushing */
1351     if (G_UNLIKELY (data->abidata.ABI.flushing))
1352       goto flushing;
1353   }
1354   while (data->buffer != NULL);
1355
1356 unlock_done:
1357   GST_DEBUG ("Pad %s:%s done", GST_DEBUG_PAD_NAME (pad));
1358   GST_OBJECT_UNLOCK (pads);
1359   unref_data (data);
1360   if (buffer)
1361     gst_buffer_unref (buffer);
1362   return ret;
1363
1364 pad_removed:
1365   {
1366     GST_WARNING ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
1367     GST_OBJECT_UNLOCK (pad);
1368     ret = GST_FLOW_NOT_LINKED;
1369     goto unlock_done;
1370   }
1371   /* ERRORS */
1372 no_data:
1373   {
1374     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
1375     GST_OBJECT_UNLOCK (pad);
1376     gst_buffer_unref (buffer);
1377     return GST_FLOW_NOT_LINKED;
1378   }
1379 not_started:
1380   {
1381     GST_DEBUG ("not started");
1382     gst_collect_pads_clear (pads, data);
1383     ret = GST_FLOW_WRONG_STATE;
1384     goto unlock_done;
1385   }
1386 flushing:
1387   {
1388     GST_DEBUG ("pad %s:%s is flushing", GST_DEBUG_PAD_NAME (pad));
1389     gst_collect_pads_clear (pads, data);
1390     ret = GST_FLOW_WRONG_STATE;
1391     goto unlock_done;
1392   }
1393 unexpected:
1394   {
1395     /* we should not post an error for this, just inform upstream that
1396      * we don't expect anything anymore */
1397     GST_DEBUG ("pad %s:%s is eos", GST_DEBUG_PAD_NAME (pad));
1398     ret = GST_FLOW_UNEXPECTED;
1399     goto unlock_done;
1400   }
1401 clipped:
1402   {
1403     GST_DEBUG ("clipped buffer on pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1404     ret = GST_FLOW_OK;
1405     goto unlock_done;
1406   }
1407 error:
1408   {
1409     /* we print the error, the element should post a reasonable error
1410      * message for fatal errors */
1411     GST_DEBUG ("collect failed, reason %d (%s)", ret, gst_flow_get_name (ret));
1412     gst_collect_pads_clear (pads, data);
1413     goto unlock_done;
1414   }
1415 }