upload tizen1.0 source
[framework/multimedia/gst-plugins-good0.10.git] / gst / videomixer / gstcollectpads2.c
1 /* GStreamer
2  * Copyright (C) 2005 Wim Taymans <wim@fluendo.com>
3  * Copyright (C) 2008 Mark Nauwelaerts <mnauw@users.sourceforge.net>
4  * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
5  *
6  * gstcollectpads2.c:
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
21  * Boston, MA 02111-1307, USA.
22  */
23 /**
24  * SECTION:gstcollectpads2
25  * @short_description: manages a set of pads that operate in collect mode
26  * @see_also:
27  *
28  * Manages a set of pads that operate in collect mode. This means that control
29  * is given to the manager of this object when all pads have data.
30  * <itemizedlist>
31  *   <listitem><para>
32  *     Collectpads are created with gst_collect_pads2_new(). A callback should then
33  *     be installed with gst_collect_pads2_set_function ().
34  *   </para></listitem>
35  *   <listitem><para>
36  *     Pads are added to the collection with gst_collect_pads2_add_pad()/
37  *     gst_collect_pads2_remove_pad(). The pad
38  *     has to be a sinkpad. The chain and event functions of the pad are
39  *     overridden. The element_private of the pad is used to store
40  *     private information for the collectpads.
41  *   </para></listitem>
42  *   <listitem><para>
43  *     For each pad, data is queued in the _chain function or by
44  *     performing a pull_range.
45  *   </para></listitem>
46  *   <listitem><para>
47  *     When data is queued on all pads in waiting mode, the callback function is called.
48  *   </para></listitem>
49  *   <listitem><para>
50  *     Data can be dequeued from the pad with the gst_collect_pads2_pop() method.
51  *     One can peek at the data with the gst_collect_pads2_peek() function.
52  *     These functions will return NULL if the pad received an EOS event. When all
53  *     pads return NULL from a gst_collect_pads2_peek(), the element can emit an EOS
54  *     event itself.
55  *   </para></listitem>
56  *   <listitem><para>
57  *     Data can also be dequeued in byte units using the gst_collect_pads2_available(),
58  *     gst_collect_pads2_read() and gst_collect_pads2_flush() calls.
59  *   </para></listitem>
60  *   <listitem><para>
61  *     Elements should call gst_collect_pads2_start() and gst_collect_pads2_stop() in
62  *     their state change functions to start and stop the processing of the collectpads.
63  *     The gst_collect_pads2_stop() call should be called before calling the parent
64  *     element state change function in the PAUSED_TO_READY state change to ensure
65  *     no pad is blocked and the element can finish streaming.
66  *   </para></listitem>
67  *   <listitem><para>
68  *     gst_collect_pads2_collect() and gst_collect_pads2_collect_range() can be used by
69  *     elements that start a #GstTask to drive the collect_pads2. This feature is however
70  *     not yet implemented.
71  *   </para></listitem>
72  *   <listitem><para>
73  *     gst_collect_pads2_set_waiting() sets a pad to waiting or non-waiting mode.
74  *     CollectPads element is not waiting for data to be collected on non-waiting pads.
75  *     Thus these pads may but need not have data when the callback is called.
76  *     All pads are in waiting mode by default.
77  *   </para></listitem>
78  * </itemizedlist>
79  *
80  * Last reviewed on 2008-03-14 (0.10.17)
81  */
82
83 #include "gstcollectpads2.h"
84
85 GST_DEBUG_CATEGORY_STATIC (collect_pads2_debug);
86 #define GST_CAT_DEFAULT collect_pads2_debug
87
88 GST_BOILERPLATE (GstCollectPads2, gst_collect_pads2, GstObject,
89     GST_TYPE_OBJECT);
90
91 static void gst_collect_pads2_clear (GstCollectPads2 * pads,
92     GstCollectData2 * data);
93 static GstFlowReturn gst_collect_pads2_chain (GstPad * pad, GstBuffer * buffer);
94 static gboolean gst_collect_pads2_event (GstPad * pad, GstEvent * event);
95 static void gst_collect_pads2_finalize (GObject * object);
96 static GstFlowReturn gst_collect_pads2_default_collected (GstCollectPads2 *
97     pads, gpointer user_data);
98 static gint gst_collect_pads2_default_compare_func (GstCollectPads2 * pads,
99     GstCollectData2 * data1, GstClockTime timestamp1, GstCollectData2 * data2,
100     GstClockTime timestamp2, gpointer user_data);
101 static gboolean gst_collect_pads2_recalculate_full (GstCollectPads2 * pads);
102 static void ref_data (GstCollectData2 * data);
103 static void unref_data (GstCollectData2 * data);
104
105 /* Some properties are protected by LOCK, others by STREAM_LOCK
106  * However, manipulating either of these partitions may require
107  * to signal/wake a _WAIT, so use a separate (sort of) event to prevent races
108  * Alternative implementations are possible, e.g. some low-level re-implementing
109  * of the 2 above locks to drop both of them atomically when going into _WAIT.
110  */
111 #define GST_COLLECT_PADS2_GET_EVT_COND(pads) (((GstCollectPads2 *)pads)->evt_cond)
112 #define GST_COLLECT_PADS2_GET_EVT_LOCK(pads) (((GstCollectPads2 *)pads)->evt_lock)
113 #define GST_COLLECT_PADS2_EVT_WAIT(pads, cookie) G_STMT_START {    \
114   g_mutex_lock (GST_COLLECT_PADS2_GET_EVT_LOCK (pads));            \
115   /* should work unless a lot of event'ing and thread starvation */\
116   while (cookie == ((GstCollectPads2 *) pads)->evt_cookie)         \
117     g_cond_wait (GST_COLLECT_PADS2_GET_EVT_COND (pads),            \
118         GST_COLLECT_PADS2_GET_EVT_LOCK (pads));                    \
119   cookie = ((GstCollectPads2 *) pads)->evt_cookie;                 \
120   g_mutex_unlock (GST_COLLECT_PADS2_GET_EVT_LOCK (pads));          \
121 } G_STMT_END
122 #define GST_COLLECT_PADS2_EVT_WAIT_TIMED(pads, cookie, timeout) G_STMT_START { \
123   GTimeVal __tv; \
124   \
125   g_get_current_time (&tv); \
126   g_time_val_add (&tv, timeout); \
127   \
128   g_mutex_lock (GST_COLLECT_PADS2_GET_EVT_LOCK (pads));            \
129   /* should work unless a lot of event'ing and thread starvation */\
130   while (cookie == ((GstCollectPads2 *) pads)->evt_cookie)         \
131     g_cond_timed_wait (GST_COLLECT_PADS2_GET_EVT_COND (pads),            \
132         GST_COLLECT_PADS2_GET_EVT_LOCK (pads), &tv);                    \
133   cookie = ((GstCollectPads2 *) pads)->evt_cookie;                 \
134   g_mutex_unlock (GST_COLLECT_PADS2_GET_EVT_LOCK (pads));          \
135 } G_STMT_END
136 #define GST_COLLECT_PADS2_EVT_BROADCAST(pads) G_STMT_START {       \
137   g_mutex_lock (GST_COLLECT_PADS2_GET_EVT_LOCK (pads));            \
138   /* never mind wrap-around */                                     \
139   ++(((GstCollectPads2 *) pads)->evt_cookie);                      \
140   g_cond_broadcast (GST_COLLECT_PADS2_GET_EVT_COND (pads));        \
141   g_mutex_unlock (GST_COLLECT_PADS2_GET_EVT_LOCK (pads));          \
142 } G_STMT_END
143 #define GST_COLLECT_PADS2_EVT_INIT(cookie) G_STMT_START {          \
144   g_mutex_lock (GST_COLLECT_PADS2_GET_EVT_LOCK (pads));            \
145   cookie = ((GstCollectPads2 *) pads)->evt_cookie;                 \
146   g_mutex_unlock (GST_COLLECT_PADS2_GET_EVT_LOCK (pads));          \
147 } G_STMT_END
148
149 static void
150 gst_collect_pads2_base_init (gpointer g_class)
151 {
152   /* Do nothing here */
153 }
154
155 static void
156 gst_collect_pads2_class_init (GstCollectPads2Class * klass)
157 {
158   GObjectClass *gobject_class = (GObjectClass *) klass;
159
160   GST_DEBUG_CATEGORY_INIT (collect_pads2_debug, "collectpads2", 0,
161       "GstCollectPads2");
162
163   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_collect_pads2_finalize);
164 }
165
166 static void
167 gst_collect_pads2_init (GstCollectPads2 * pads, GstCollectPads2Class * g_class)
168 {
169   pads->data = NULL;
170   pads->cookie = 0;
171   pads->numpads = 0;
172   pads->queuedpads = 0;
173   pads->eospads = 0;
174   pads->started = FALSE;
175
176   g_static_rec_mutex_init (&pads->stream_lock);
177
178   pads->func = gst_collect_pads2_default_collected;
179   pads->user_data = NULL;
180   pads->event_func = NULL;
181   pads->event_user_data = NULL;
182
183   pads->prepare_buffer_func = NULL;
184   pads->prepare_buffer_user_data = NULL;
185
186   /* members for default muxing */
187   pads->buffer_func = NULL;
188   pads->buffer_user_data = NULL;
189   pads->compare_func = gst_collect_pads2_default_compare_func;
190   pads->compare_user_data = NULL;
191   pads->earliest_data = NULL;
192   pads->earliest_time = GST_CLOCK_TIME_NONE;
193
194   /* members to manage the pad list */
195   pads->pad_cookie = 0;
196   pads->pad_list = NULL;
197
198   /* members for event */
199   pads->evt_lock = g_mutex_new ();
200   pads->evt_cond = g_cond_new ();
201   pads->evt_cookie = 0;
202 }
203
204 static void
205 gst_collect_pads2_finalize (GObject * object)
206 {
207   GstCollectPads2 *pads = GST_COLLECT_PADS2 (object);
208
209   GST_DEBUG_OBJECT (object, "finalize");
210
211   g_static_rec_mutex_free (&pads->stream_lock);
212
213   g_cond_free (pads->evt_cond);
214   g_mutex_free (pads->evt_lock);
215
216   /* Remove pads and free pads list */
217   g_slist_foreach (pads->pad_list, (GFunc) unref_data, NULL);
218   g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
219   g_slist_free (pads->data);
220   g_slist_free (pads->pad_list);
221
222   G_OBJECT_CLASS (parent_class)->finalize (object);
223 }
224
225 /**
226  * gst_collect_pads2_new:
227  *
228  * Create a new instance of #GstCollectsPads.
229  *
230  * Returns: a new #GstCollectPads2, or NULL in case of an error.
231  *
232  * MT safe.
233  */
234 GstCollectPads2 *
235 gst_collect_pads2_new (void)
236 {
237   GstCollectPads2 *newcoll;
238
239   newcoll = g_object_new (GST_TYPE_COLLECT_PADS2, NULL);
240
241   return newcoll;
242 }
243
244 /**
245  * gst_collect_pads2_set_prepare_buffer_function:
246  * @pads: the collectpads to use
247  * @func: the function to set
248  * @user_data: user data passed to the function
249  *
250  * Set the callback function and user data that will be called
251  * for every buffer that arrives.
252  *
253  * MT safe.
254  */
255 void
256 gst_collect_pads2_set_prepare_buffer_function (GstCollectPads2 * pads,
257     GstCollectPads2BufferFunction func, gpointer user_data)
258 {
259   g_return_if_fail (pads != NULL);
260   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
261
262   GST_OBJECT_LOCK (pads);
263   pads->prepare_buffer_func = func;
264   pads->prepare_buffer_user_data = user_data;
265   GST_OBJECT_UNLOCK (pads);
266 }
267
268 /* Must be called with GstObject lock! */
269 static void
270 gst_collect_pads2_set_buffer_function_locked (GstCollectPads2 * pads,
271     GstCollectPads2BufferFunction func, gpointer user_data)
272 {
273   pads->buffer_func = func;
274   pads->buffer_user_data = user_data;
275 }
276
277 /**
278  * gst_collect_pads2_set_buffer_function:
279  * @pads: the collectpads to use
280  * @func: the function to set
281  * @user_data: user data passed to the function
282  *
283  * Set the callback function and user data that will be called with
284  * the oldest buffer when all pads have been collected.
285  *
286  * MT safe.
287  */
288 void
289 gst_collect_pads2_set_buffer_function (GstCollectPads2 * pads,
290     GstCollectPads2BufferFunction func, gpointer user_data)
291 {
292   g_return_if_fail (pads != NULL);
293   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
294
295   GST_OBJECT_LOCK (pads);
296   gst_collect_pads2_set_buffer_function_locked (pads, func, user_data);
297   GST_OBJECT_UNLOCK (pads);
298 }
299
300 /**
301  * gst_collect_pads2_set_compare_function:
302  * @pads: the pads to use
303  * @func: the function to set
304  * @user_data: user data passed to the function
305  *
306  * Set the timestamp comparison function.
307  *
308  * MT safe.
309  */
310 /* NOTE allowing to change comparison seems not advisable;
311 no known use-case, and collaboration with default algorithm is unpredictable.
312 If custom compairing/operation is needed, just use a collect function of
313 your own */
314 void
315 gst_collect_pads2_set_compare_function (GstCollectPads2 * pads,
316     GstCollectPads2CompareFunction func, gpointer user_data)
317 {
318   g_return_if_fail (pads != NULL);
319   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
320
321   GST_OBJECT_LOCK (pads);
322   pads->compare_func = func;
323   pads->compare_user_data = user_data;
324   GST_OBJECT_UNLOCK (pads);
325 }
326
327 /**
328  * gst_collect_pads2_set_function:
329  * @pads: the collectspads to use
330  * @func: the function to set
331  * @user_data: user data passed to the function
332  *
333  * CollectPads provides a default collection algorithm that will determine
334  * the oldest buffer available on all of its pads, and then delegate
335  * to a configured callback.
336  * However, if circumstances are more complicated and/or more control
337  * is desired, this sets a callback that will be invoked instead when
338  * all the pads added to the collection have buffers queued.
339  * Evidently, this callback is not compatible with
340  * gst_collect_pads2_set_buffer_function() callback.
341  * If this callback is set, the former will be unset.
342  *
343  * MT safe.
344  */
345 void
346 gst_collect_pads2_set_function (GstCollectPads2 * pads,
347     GstCollectPads2Function func, gpointer user_data)
348 {
349   g_return_if_fail (pads != NULL);
350   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
351
352   GST_OBJECT_LOCK (pads);
353   pads->func = func;
354   pads->user_data = user_data;
355   gst_collect_pads2_set_buffer_function_locked (pads, NULL, NULL);
356   GST_OBJECT_UNLOCK (pads);
357 }
358
359 static void
360 ref_data (GstCollectData2 * data)
361 {
362   g_assert (data != NULL);
363
364   g_atomic_int_inc (&(data->refcount));
365 }
366
367 static void
368 unref_data (GstCollectData2 * data)
369 {
370   g_assert (data != NULL);
371   g_assert (data->refcount > 0);
372
373   if (!g_atomic_int_dec_and_test (&(data->refcount)))
374     return;
375
376   if (data->destroy_notify)
377     data->destroy_notify (data);
378
379   g_object_unref (data->pad);
380   if (data->buffer) {
381     gst_buffer_unref (data->buffer);
382   }
383   g_free (data);
384 }
385
386 /**
387  * gst_collect_pads2_set_event_function:
388  * @pads: the collectspads to use
389  * @func: the function to set
390  * @user_data: user data passed to the function
391  *
392  * Set the event callback function and user data that will be called after
393  * collectpads has processed and event originating from one of the collected
394  * pads.  If the event being processed is a serialized one, this callback is
395  * called with @pads STREAM_LOCK held, otherwise not.  As this lock should be
396  * held when calling a number of CollectPads functions, it should be acquired
397  * if so (unusually) needed.
398  *
399  * MT safe.
400  */
401 void
402 gst_collect_pads2_set_event_function (GstCollectPads2 * pads,
403     GstCollectPads2EventFunction func, gpointer user_data)
404 {
405   g_return_if_fail (pads != NULL);
406   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
407
408   GST_OBJECT_LOCK (pads);
409   pads->event_func = func;
410   pads->event_user_data = user_data;
411   GST_OBJECT_UNLOCK (pads);
412 }
413
414 /**
415  * gst_collect_pads2_add_pad:
416  * @pads: the collectspads to use
417  * @pad: the pad to add
418  * @size: the size of the returned #GstCollectData2 structure
419  *
420  * Add a pad to the collection of collect pads. The pad has to be
421  * a sinkpad. The refcount of the pad is incremented. Use
422  * gst_collect_pads2_remove_pad() to remove the pad from the collection
423  * again.
424  *
425  * You specify a size for the returned #GstCollectData2 structure
426  * so that you can use it to store additional information.
427  *
428  * The pad will be automatically activated in push mode when @pads is
429  * started.
430  *
431  * This function calls gst_collect_pads2_add_pad() passing a value of NULL
432  * for destroy_notify and TRUE for locked.
433  *
434  * Returns: a new #GstCollectData2 to identify the new pad. Or NULL
435  *   if wrong parameters are supplied.
436  *
437  * MT safe.
438  */
439 GstCollectData2 *
440 gst_collect_pads2_add_pad (GstCollectPads2 * pads, GstPad * pad, guint size)
441 {
442   return gst_collect_pads2_add_pad_full (pads, pad, size, NULL, TRUE);
443 }
444
445 /**
446  * gst_collect_pads2_add_pad_full:
447  * @pads: the collectspads to use
448  * @pad: the pad to add
449  * @size: the size of the returned #GstCollectData2 structure
450  * @destroy_notify: function to be called before the returned #GstCollectData2
451  * structure is freed
452  * @lock: whether to lock this pad in usual waiting state
453  *
454  * Add a pad to the collection of collect pads. The pad has to be
455  * a sinkpad. The refcount of the pad is incremented. Use
456  * gst_collect_pads2_remove_pad() to remove the pad from the collection
457  * again.
458  *
459  * You specify a size for the returned #GstCollectData2 structure
460  * so that you can use it to store additional information.
461  *
462  * You can also specify a #GstCollectData2DestroyNotify that will be called
463  * just before the #GstCollectData2 structure is freed. It is passed the
464  * pointer to the structure and should free any custom memory and resources
465  * allocated for it.
466  *
467  * Keeping a pad locked in waiting state is only relevant when using
468  * the default collection algorithm (providing the oldest buffer).
469  * It ensures a buffer must be available on this pad for a collection
470  * to take place.  This is of typical use to a muxer element where
471  * non-subtitle streams should always be in waiting state,
472  * e.g. to assure that caps information is available on all these streams
473  * when initial headers have to be written.
474  *
475  * The pad will be automatically activated in push mode when @pads is
476  * started.
477  *
478  * Since: 0.10.12
479  *
480  * Returns: a new #GstCollectData2 to identify the new pad. Or NULL
481  *   if wrong parameters are supplied.
482  *
483  * MT safe.
484  */
485 GstCollectData2 *
486 gst_collect_pads2_add_pad_full (GstCollectPads2 * pads, GstPad * pad,
487     guint size, GstCollectData2DestroyNotify destroy_notify, gboolean lock)
488 {
489   GstCollectData2 *data;
490
491   g_return_val_if_fail (pads != NULL, NULL);
492   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), NULL);
493   g_return_val_if_fail (pad != NULL, NULL);
494   g_return_val_if_fail (GST_PAD_IS_SINK (pad), NULL);
495   g_return_val_if_fail (size >= sizeof (GstCollectData2), NULL);
496
497   GST_DEBUG_OBJECT (pads, "adding pad %s:%s", GST_DEBUG_PAD_NAME (pad));
498
499   data = g_malloc0 (size);
500   data->collect = pads;
501   data->pad = gst_object_ref (pad);
502   data->buffer = NULL;
503   data->pos = 0;
504   gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
505   data->state = GST_COLLECT_PADS2_STATE_WAITING;
506   data->state |= lock ? GST_COLLECT_PADS2_STATE_LOCKED : 0;
507   data->refcount = 1;
508   data->destroy_notify = destroy_notify;
509
510   GST_OBJECT_LOCK (pads);
511   GST_OBJECT_LOCK (pad);
512   gst_pad_set_element_private (pad, data);
513   GST_OBJECT_UNLOCK (pad);
514   pads->pad_list = g_slist_append (pads->pad_list, data);
515   gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads2_chain));
516   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads2_event));
517   /* backward compat, also add to data if stopped, so that the element already
518    * has this in the public data list before going PAUSED (typically)
519    * this can only be done when we are stopped because we don't take the
520    * STREAM_LOCK to protect the pads->data list. */
521   if (!pads->started) {
522     pads->data = g_slist_append (pads->data, data);
523     ref_data (data);
524   }
525   /* activate the pad when needed */
526   if (pads->started)
527     gst_pad_set_active (pad, TRUE);
528   pads->pad_cookie++;
529   GST_OBJECT_UNLOCK (pads);
530
531   return data;
532 }
533
534 static gint
535 find_pad (GstCollectData2 * data, GstPad * pad)
536 {
537   if (data->pad == pad)
538     return 0;
539   return 1;
540 }
541
542 /**
543  * gst_collect_pads2_remove_pad:
544  * @pads: the collectspads to use
545  * @pad: the pad to remove
546  *
547  * Remove a pad from the collection of collect pads. This function will also
548  * free the #GstCollectData2 and all the resources that were allocated with
549  * gst_collect_pads2_add_pad().
550  *
551  * The pad will be deactivated automatically when @pads is stopped.
552  *
553  * Returns: %TRUE if the pad could be removed.
554  *
555  * MT safe.
556  */
557 gboolean
558 gst_collect_pads2_remove_pad (GstCollectPads2 * pads, GstPad * pad)
559 {
560   GstCollectData2 *data;
561   GSList *list;
562
563   g_return_val_if_fail (pads != NULL, FALSE);
564   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), FALSE);
565   g_return_val_if_fail (pad != NULL, FALSE);
566   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
567
568   GST_DEBUG_OBJECT (pads, "removing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
569
570   GST_OBJECT_LOCK (pads);
571   list = g_slist_find_custom (pads->pad_list, pad, (GCompareFunc) find_pad);
572   if (!list)
573     goto unknown_pad;
574
575   data = (GstCollectData2 *) list->data;
576
577   GST_DEBUG_OBJECT (pads, "found pad %s:%s at %p", GST_DEBUG_PAD_NAME (pad),
578       data);
579
580   /* clear the stuff we configured */
581   gst_pad_set_chain_function (pad, NULL);
582   gst_pad_set_event_function (pad, NULL);
583   GST_OBJECT_LOCK (pad);
584   gst_pad_set_element_private (pad, NULL);
585   GST_OBJECT_UNLOCK (pad);
586
587   /* backward compat, also remove from data if stopped, note that this function
588    * can only be called when we are stopped because we don't take the
589    * STREAM_LOCK to protect the pads->data list. */
590   if (!pads->started) {
591     GSList *dlist;
592
593     dlist = g_slist_find_custom (pads->data, pad, (GCompareFunc) find_pad);
594     if (dlist) {
595       GstCollectData2 *pdata = dlist->data;
596
597       pads->data = g_slist_delete_link (pads->data, dlist);
598       unref_data (pdata);
599     }
600   }
601   /* remove from the pad list */
602   pads->pad_list = g_slist_delete_link (pads->pad_list, list);
603   pads->pad_cookie++;
604
605   /* signal waiters because something changed */
606   GST_COLLECT_PADS2_EVT_BROADCAST (pads);
607
608   /* deactivate the pad when needed */
609   if (!pads->started)
610     gst_pad_set_active (pad, FALSE);
611
612   /* clean and free the collect data */
613   unref_data (data);
614
615   GST_OBJECT_UNLOCK (pads);
616
617   return TRUE;
618
619 unknown_pad:
620   {
621     GST_WARNING_OBJECT (pads, "cannot remove unknown pad %s:%s",
622         GST_DEBUG_PAD_NAME (pad));
623     GST_OBJECT_UNLOCK (pads);
624     return FALSE;
625   }
626 }
627
628 /**
629  * gst_collect_pads2_is_active:
630  * @pads: the collectspads to use
631  * @pad: the pad to check
632  *
633  * Check if a pad is active.
634  *
635  * This function is currently not implemented.
636  *
637  * Returns: %TRUE if the pad is active.
638  *
639  * MT safe.
640  */
641 gboolean
642 gst_collect_pads2_is_active (GstCollectPads2 * pads, GstPad * pad)
643 {
644   g_return_val_if_fail (pads != NULL, FALSE);
645   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), FALSE);
646   g_return_val_if_fail (pad != NULL, FALSE);
647   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
648
649   g_warning ("gst_collect_pads2_is_active() is not implemented");
650
651   return FALSE;
652 }
653
654 /**
655  * gst_collect_pads2_collect:
656  * @pads: the collectspads to use
657  *
658  * Collect data on all pads. This function is usually called
659  * from a #GstTask function in an element. 
660  *
661  * This function is currently not implemented.
662  *
663  * Returns: #GstFlowReturn of the operation.
664  *
665  * MT safe.
666  */
667 GstFlowReturn
668 gst_collect_pads2_collect (GstCollectPads2 * pads)
669 {
670   g_return_val_if_fail (pads != NULL, GST_FLOW_ERROR);
671   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), GST_FLOW_ERROR);
672
673   g_warning ("gst_collect_pads2_collect() is not implemented");
674
675   return GST_FLOW_NOT_SUPPORTED;
676 }
677
678 /**
679  * gst_collect_pads2_collect_range:
680  * @pads: the collectspads to use
681  * @offset: the offset to collect
682  * @length: the length to collect
683  *
684  * Collect data with @offset and @length on all pads. This function
685  * is typically called in the getrange function of an element. 
686  *
687  * This function is currently not implemented.
688  *
689  * Returns: #GstFlowReturn of the operation.
690  *
691  * MT safe.
692  */
693 GstFlowReturn
694 gst_collect_pads2_collect_range (GstCollectPads2 * pads, guint64 offset,
695     guint length)
696 {
697   g_return_val_if_fail (pads != NULL, GST_FLOW_ERROR);
698   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), GST_FLOW_ERROR);
699
700   g_warning ("gst_collect_pads2_collect_range() is not implemented");
701
702   return GST_FLOW_NOT_SUPPORTED;
703 }
704
705 /*
706  * Must be called with STREAM_LOCK.
707  */
708 static void
709 gst_collect_pads2_set_flushing_unlocked (GstCollectPads2 * pads,
710     gboolean flushing)
711 {
712   GSList *walk = NULL;
713
714   /* Update the pads flushing flag */
715   for (walk = pads->data; walk; walk = g_slist_next (walk)) {
716     GstCollectData2 *cdata = walk->data;
717
718     if (GST_IS_PAD (cdata->pad)) {
719       GST_OBJECT_LOCK (cdata->pad);
720       if (flushing)
721         GST_PAD_SET_FLUSHING (cdata->pad);
722       else
723         GST_PAD_UNSET_FLUSHING (cdata->pad);
724       if (flushing)
725         GST_COLLECT_PADS2_STATE_SET (cdata, GST_COLLECT_PADS2_STATE_FLUSHING);
726       else
727         GST_COLLECT_PADS2_STATE_UNSET (cdata, GST_COLLECT_PADS2_STATE_FLUSHING);
728       gst_collect_pads2_clear (pads, cdata);
729       GST_OBJECT_UNLOCK (cdata->pad);
730     }
731   }
732
733   /* inform _chain of changes */
734   GST_COLLECT_PADS2_EVT_BROADCAST (pads);
735 }
736
737 /**
738  * gst_collect_pads2_set_flushing:
739  * @pads: the collectspads to use
740  * @flushing: desired state of the pads
741  *
742  * Change the flushing state of all the pads in the collection. No pad
743  * is able to accept anymore data when @flushing is %TRUE. Calling this
744  * function with @flushing %FALSE makes @pads accept data again.
745  * Caller must ensure that downstream streaming (thread) is not blocked,
746  * e.g. by sending a FLUSH_START downstream.
747  *
748  * MT safe.
749  *
750  * Since: 0.10.7.
751  */
752 void
753 gst_collect_pads2_set_flushing (GstCollectPads2 * pads, gboolean flushing)
754 {
755   g_return_if_fail (pads != NULL);
756   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
757
758   /* NOTE since this eventually calls _pop, some (STREAM_)LOCK is needed here */
759   GST_COLLECT_PADS2_STREAM_LOCK (pads);
760   gst_collect_pads2_set_flushing_unlocked (pads, flushing);
761   GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
762 }
763
764 /**
765  * gst_collect_pads2_start:
766  * @pads: the collectspads to use
767  *
768  * Starts the processing of data in the collect_pads2.
769  *
770  * MT safe.
771  */
772 void
773 gst_collect_pads2_start (GstCollectPads2 * pads)
774 {
775   GSList *collected;
776
777   g_return_if_fail (pads != NULL);
778   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
779
780   GST_DEBUG_OBJECT (pads, "starting collect pads");
781
782   /* make sure stop and collect cannot be called anymore */
783   GST_COLLECT_PADS2_STREAM_LOCK (pads);
784
785   /* make pads streamable */
786   GST_OBJECT_LOCK (pads);
787
788   /* loop over the master pad list and reset the segment */
789   collected = pads->pad_list;
790   for (; collected; collected = g_slist_next (collected)) {
791     GstCollectData2 *data;
792
793     data = collected->data;
794     gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
795   }
796
797   gst_collect_pads2_set_flushing_unlocked (pads, FALSE);
798
799   /* Start collect pads */
800   pads->started = TRUE;
801   GST_OBJECT_UNLOCK (pads);
802   GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
803 }
804
805 /**
806  * gst_collect_pads2_stop:
807  * @pads: the collectspads to use
808  *
809  * Stops the processing of data in the collect_pads2. this function
810  * will also unblock any blocking operations.
811  *
812  * MT safe.
813  */
814 void
815 gst_collect_pads2_stop (GstCollectPads2 * pads)
816 {
817   GSList *collected;
818
819   g_return_if_fail (pads != NULL);
820   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
821
822   GST_DEBUG_OBJECT (pads, "stopping collect pads");
823
824   /* make sure collect and start cannot be called anymore */
825   GST_COLLECT_PADS2_STREAM_LOCK (pads);
826
827   /* make pads not accept data anymore */
828   GST_OBJECT_LOCK (pads);
829   gst_collect_pads2_set_flushing_unlocked (pads, TRUE);
830
831   /* Stop collect pads */
832   pads->started = FALSE;
833   pads->eospads = 0;
834   pads->queuedpads = 0;
835
836   /* loop over the master pad list and flush buffers */
837   collected = pads->pad_list;
838   for (; collected; collected = g_slist_next (collected)) {
839     GstCollectData2 *data;
840     GstBuffer **buffer_p;
841
842     data = collected->data;
843     if (data->buffer) {
844       buffer_p = &data->buffer;
845       gst_buffer_replace (buffer_p, NULL);
846       data->pos = 0;
847     }
848     GST_COLLECT_PADS2_STATE_UNSET (data, GST_COLLECT_PADS2_STATE_EOS);
849   }
850
851   if (pads->earliest_data)
852     unref_data (pads->earliest_data);
853   pads->earliest_data = NULL;
854   pads->earliest_time = GST_CLOCK_TIME_NONE;
855
856   GST_OBJECT_UNLOCK (pads);
857   /* Wake them up so they can end the chain functions. */
858   GST_COLLECT_PADS2_EVT_BROADCAST (pads);
859
860   GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
861 }
862
863 /**
864  * gst_collect_pads2_peek:
865  * @pads: the collectspads to peek
866  * @data: the data to use
867  *
868  * Peek at the buffer currently queued in @data. This function
869  * should be called with the @pads STREAM_LOCK held, such as in the callback
870  * handler.
871  *
872  * Returns: The buffer in @data or NULL if no buffer is queued.
873  *  should unref the buffer after usage.
874  *
875  * MT safe.
876  */
877 GstBuffer *
878 gst_collect_pads2_peek (GstCollectPads2 * pads, GstCollectData2 * data)
879 {
880   GstBuffer *result;
881
882   g_return_val_if_fail (pads != NULL, NULL);
883   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), NULL);
884   g_return_val_if_fail (data != NULL, NULL);
885
886   if ((result = data->buffer))
887     gst_buffer_ref (result);
888
889   GST_DEBUG_OBJECT (pads, "Peeking at pad %s:%s: buffer=%p",
890       GST_DEBUG_PAD_NAME (data->pad), result);
891
892   return result;
893 }
894
895 /**
896  * gst_collect_pads2_pop:
897  * @pads: the collectspads to pop
898  * @data: the data to use
899  *
900  * Pop the buffer currently queued in @data. This function
901  * should be called with the @pads STREAM_LOCK held, such as in the callback
902  * handler.
903  *
904  * Returns: The buffer in @data or NULL if no buffer was queued.
905  *   You should unref the buffer after usage.
906  *
907  * MT safe.
908  */
909 GstBuffer *
910 gst_collect_pads2_pop (GstCollectPads2 * pads, GstCollectData2 * data)
911 {
912   GstBuffer *result;
913
914   g_return_val_if_fail (pads != NULL, NULL);
915   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), NULL);
916   g_return_val_if_fail (data != NULL, NULL);
917
918   if ((result = data->buffer)) {
919     data->buffer = NULL;
920     data->pos = 0;
921     /* one less pad with queued data now */
922     if (GST_COLLECT_PADS2_STATE_IS_SET (data, GST_COLLECT_PADS2_STATE_WAITING))
923       pads->queuedpads--;
924   }
925
926   GST_COLLECT_PADS2_EVT_BROADCAST (pads);
927
928   GST_DEBUG_OBJECT (pads, "Pop buffer on pad %s:%s: buffer=%p",
929       GST_DEBUG_PAD_NAME (data->pad), result);
930
931   return result;
932 }
933
934 /* pop and unref the currently queued buffer, should be called with STREAM_LOCK
935  * held */
936 static void
937 gst_collect_pads2_clear (GstCollectPads2 * pads, GstCollectData2 * data)
938 {
939   GstBuffer *buf;
940
941   if ((buf = gst_collect_pads2_pop (pads, data)))
942     gst_buffer_unref (buf);
943 }
944
945 /**
946  * gst_collect_pads2_available:
947  * @pads: the collectspads to query
948  *
949  * Query how much bytes can be read from each queued buffer. This means
950  * that the result of this call is the maximum number of bytes that can
951  * be read from each of the pads.
952  *
953  * This function should be called with @pads STREAM_LOCK held, such as
954  * in the callback.
955  *
956  * Returns: The maximum number of bytes queued on all pads. This function
957  * returns 0 if a pad has no queued buffer.
958  *
959  * MT safe.
960  */
961 /* we might pre-calculate this in some struct field,
962  * but would then have to maintain this in _chain and particularly _pop, etc,
963  * even if element is never interested in this information */
964 guint
965 gst_collect_pads2_available (GstCollectPads2 * pads)
966 {
967   GSList *collected;
968   guint result = G_MAXUINT;
969
970   g_return_val_if_fail (pads != NULL, 0);
971   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), 0);
972
973   collected = pads->data;
974   for (; collected; collected = g_slist_next (collected)) {
975     GstCollectData2 *pdata;
976     GstBuffer *buffer;
977     gint size;
978
979     pdata = (GstCollectData2 *) collected->data;
980
981     /* ignore pad with EOS */
982     if (G_UNLIKELY (GST_COLLECT_PADS2_STATE_IS_SET (pdata,
983                 GST_COLLECT_PADS2_STATE_EOS))) {
984       GST_DEBUG_OBJECT (pads, "pad %p is EOS", pdata);
985       continue;
986     }
987
988     /* an empty buffer without EOS is weird when we get here.. */
989     if (G_UNLIKELY ((buffer = pdata->buffer) == NULL)) {
990       GST_WARNING_OBJECT (pads, "pad %p has no buffer", pdata);
991       goto not_filled;
992     }
993
994     /* this is the size left of the buffer */
995     size = GST_BUFFER_SIZE (buffer) - pdata->pos;
996     GST_DEBUG_OBJECT (pads, "pad %p has %d bytes left", pdata, size);
997
998     /* need to return the min of all available data */
999     if (size < result)
1000       result = size;
1001   }
1002   /* nothing changed, all must be EOS then, return 0 */
1003   if (G_UNLIKELY (result == G_MAXUINT))
1004     result = 0;
1005
1006   return result;
1007
1008 not_filled:
1009   {
1010     return 0;
1011   }
1012 }
1013
1014 /**
1015  * gst_collect_pads2_read:
1016  * @pads: the collectspads to query
1017  * @data: the data to use
1018  * @bytes: a pointer to a byte array
1019  * @size: the number of bytes to read
1020  *
1021  * Get a pointer in @bytes where @size bytes can be read from the
1022  * given pad data.
1023  *
1024  * This function should be called with @pads STREAM_LOCK held, such as
1025  * in the callback.
1026  *
1027  * Returns: The number of bytes available for consumption in the
1028  * memory pointed to by @bytes. This can be less than @size and
1029  * is 0 if the pad is end-of-stream.
1030  *
1031  * MT safe.
1032  */
1033 guint
1034 gst_collect_pads2_read (GstCollectPads2 * pads, GstCollectData2 * data,
1035     guint8 ** bytes, guint size)
1036 {
1037   guint readsize;
1038   GstBuffer *buffer;
1039
1040   g_return_val_if_fail (pads != NULL, 0);
1041   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), 0);
1042   g_return_val_if_fail (data != NULL, 0);
1043   g_return_val_if_fail (bytes != NULL, 0);
1044
1045   /* no buffer, must be EOS */
1046   if ((buffer = data->buffer) == NULL)
1047     return 0;
1048
1049   readsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos);
1050
1051   *bytes = GST_BUFFER_DATA (buffer) + data->pos;
1052
1053   return readsize;
1054 }
1055
1056 /**
1057  * gst_collect_pads2_flush:
1058  * @pads: the collectspads to query
1059  * @data: the data to use
1060  * @size: the number of bytes to flush
1061  *
1062  * Flush @size bytes from the pad @data.
1063  *
1064  * This function should be called with @pads STREAM_LOCK held, such as
1065  * in the callback.
1066  *
1067  * Returns: The number of bytes flushed This can be less than @size and
1068  * is 0 if the pad was end-of-stream.
1069  *
1070  * MT safe.
1071  */
1072 guint
1073 gst_collect_pads2_flush (GstCollectPads2 * pads, GstCollectData2 * data,
1074     guint size)
1075 {
1076   guint flushsize;
1077   GstBuffer *buffer;
1078
1079   g_return_val_if_fail (pads != NULL, 0);
1080   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), 0);
1081   g_return_val_if_fail (data != NULL, 0);
1082
1083   /* no buffer, must be EOS */
1084   if ((buffer = data->buffer) == NULL)
1085     return 0;
1086
1087   /* this is what we can flush at max */
1088   flushsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos);
1089
1090   data->pos += size;
1091
1092   if (data->pos >= GST_BUFFER_SIZE (buffer))
1093     /* _clear will also reset data->pos to 0 */
1094     gst_collect_pads2_clear (pads, data);
1095
1096   return flushsize;
1097 }
1098
1099 /**
1100  * gst_collect_pads2_read_buffer:
1101  * @pads: the collectspads to query
1102  * @data: the data to use
1103  * @size: the number of bytes to read
1104  *
1105  * Get a subbuffer of @size bytes from the given pad @data.
1106  *
1107  * This function should be called with @pads STREAM_LOCK held, such as in the
1108  * callback.
1109  *
1110  * Since: 0.10.18
1111  *
1112  * Returns: A sub buffer. The size of the buffer can be less that requested.
1113  * A return of NULL signals that the pad is end-of-stream.
1114  * Unref the buffer after use.
1115  *
1116  * MT safe.
1117  */
1118 GstBuffer *
1119 gst_collect_pads2_read_buffer (GstCollectPads2 * pads, GstCollectData2 * data,
1120     guint size)
1121 {
1122   guint readsize;
1123   GstBuffer *buffer;
1124
1125   g_return_val_if_fail (pads != NULL, NULL);
1126   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), NULL);
1127   g_return_val_if_fail (data != NULL, NULL);
1128
1129   /* no buffer, must be EOS */
1130   if ((buffer = data->buffer) == NULL)
1131     return NULL;
1132
1133   readsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos);
1134
1135   return gst_buffer_create_sub (buffer, data->pos, readsize);
1136 }
1137
1138 /**
1139  * gst_collect_pads2_take_buffer:
1140  * @pads: the collectspads to query
1141  * @data: the data to use
1142  * @size: the number of bytes to read
1143  *
1144  * Get a subbuffer of @size bytes from the given pad @data. Flushes the amount
1145  * of read bytes.
1146  *
1147  * This function should be called with @pads STREAM_LOCK held, such as in the
1148  * callback.
1149  *
1150  * Since: 0.10.18
1151  *
1152  * Returns: A sub buffer. The size of the buffer can be less that requested.
1153  * A return of NULL signals that the pad is end-of-stream.
1154  * Unref the buffer after use.
1155  *
1156  * MT safe.
1157  */
1158 GstBuffer *
1159 gst_collect_pads2_take_buffer (GstCollectPads2 * pads, GstCollectData2 * data,
1160     guint size)
1161 {
1162   GstBuffer *buffer = gst_collect_pads2_read_buffer (pads, data, size);
1163
1164   if (buffer) {
1165     gst_collect_pads2_flush (pads, data, GST_BUFFER_SIZE (buffer));
1166   }
1167   return buffer;
1168 }
1169
1170 /**
1171  * gst_collect_pads2_set_waiting:
1172  * @pads: the collectspads
1173  * @data: the data to use
1174  * @waiting: boolean indicating whether this pad should operate
1175  *           in waiting or non-waiting mode
1176  *
1177  * Sets a pad to waiting or non-waiting mode, if at least this pad
1178  * has not been created with locked waiting state,
1179  * in which case nothing happens.
1180  *
1181  * This function should be called with @pads STREAM_LOCK held, such as
1182  * in the callback.
1183  *
1184  * MT safe.
1185  */
1186 void
1187 gst_collect_pads2_set_waiting (GstCollectPads2 * pads, GstCollectData2 * data,
1188     gboolean waiting)
1189 {
1190   g_return_if_fail (pads != NULL);
1191   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
1192   g_return_if_fail (data != NULL);
1193
1194   GST_DEBUG_OBJECT (pads, "Setting pad %s to waiting %d, locked %d",
1195       GST_PAD_NAME (data->pad), waiting,
1196       GST_COLLECT_PADS2_STATE_IS_SET (data, GST_COLLECT_PADS2_STATE_LOCKED));
1197
1198   /* Do something only on a change and if not locked */
1199   if (!GST_COLLECT_PADS2_STATE_IS_SET (data, GST_COLLECT_PADS2_STATE_LOCKED) &&
1200       (GST_COLLECT_PADS2_STATE_IS_SET (data, GST_COLLECT_PADS2_STATE_WAITING) !=
1201           ! !waiting)) {
1202     /* Set waiting state for this pad */
1203     if (waiting)
1204       GST_COLLECT_PADS2_STATE_SET (data, GST_COLLECT_PADS2_STATE_WAITING);
1205     else
1206       GST_COLLECT_PADS2_STATE_UNSET (data, GST_COLLECT_PADS2_STATE_WAITING);
1207     /* Update number of queued pads if needed */
1208     if (!data->buffer &&
1209         !GST_COLLECT_PADS2_STATE_IS_SET (data, GST_COLLECT_PADS2_STATE_EOS)) {
1210       if (waiting)
1211         pads->queuedpads--;
1212       else
1213         pads->queuedpads++;
1214     }
1215
1216     /* signal waiters because something changed */
1217     GST_COLLECT_PADS2_EVT_BROADCAST (pads);
1218   }
1219 }
1220
1221 /* see if pads were added or removed and update our stats. Any pad
1222  * added after releasing the LOCK will get collected in the next
1223  * round.
1224  *
1225  * We can do a quick check by checking the cookies, that get changed
1226  * whenever the pad list is updated.
1227  *
1228  * Must be called with STREAM_LOCK.
1229  */
1230 static void
1231 gst_collect_pads2_check_pads (GstCollectPads2 * pads)
1232 {
1233   /* the master list and cookie are protected with LOCK */
1234   GST_OBJECT_LOCK (pads);
1235   if (G_UNLIKELY (pads->pad_cookie != pads->cookie)) {
1236     GSList *collected;
1237
1238     /* clear list and stats */
1239     g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
1240     g_slist_free (pads->data);
1241     pads->data = NULL;
1242     pads->numpads = 0;
1243     pads->queuedpads = 0;
1244     pads->eospads = 0;
1245     if (pads->earliest_data)
1246       unref_data (pads->earliest_data);
1247     pads->earliest_data = NULL;
1248     pads->earliest_time = GST_CLOCK_TIME_NONE;
1249
1250     /* loop over the master pad list */
1251     collected = pads->pad_list;
1252     for (; collected; collected = g_slist_next (collected)) {
1253       GstCollectData2 *data;
1254
1255       /* update the stats */
1256       pads->numpads++;
1257       data = collected->data;
1258       if (GST_COLLECT_PADS2_STATE_IS_SET (data, GST_COLLECT_PADS2_STATE_EOS))
1259         pads->eospads++;
1260       else if (data->buffer || !GST_COLLECT_PADS2_STATE_IS_SET (data,
1261               GST_COLLECT_PADS2_STATE_WAITING))
1262         pads->queuedpads++;
1263
1264       /* add to the list of pads to collect */
1265       ref_data (data);
1266       /* preserve order of adding/requesting pads */
1267       pads->data = g_slist_append (pads->data, data);
1268     }
1269     /* and update the cookie */
1270     pads->cookie = pads->pad_cookie;
1271   }
1272   GST_OBJECT_UNLOCK (pads);
1273 }
1274
1275 /* checks if all the pads are collected and call the collectfunction
1276  *
1277  * Should be called with STREAM_LOCK.
1278  *
1279  * Returns: The #GstFlowReturn of collection.
1280  */
1281 static GstFlowReturn
1282 gst_collect_pads2_check_collected (GstCollectPads2 * pads)
1283 {
1284   GstFlowReturn flow_ret = GST_FLOW_OK;
1285   GstCollectPads2Function func;
1286   gpointer user_data;
1287
1288   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), GST_FLOW_ERROR);
1289
1290   GST_OBJECT_LOCK (pads);
1291   func = pads->func;
1292   user_data = pads->user_data;
1293   GST_OBJECT_UNLOCK (pads);
1294
1295   g_return_val_if_fail (pads->func != NULL, GST_FLOW_NOT_SUPPORTED);
1296
1297   /* check for new pads, update stats etc.. */
1298   gst_collect_pads2_check_pads (pads);
1299
1300   if (G_UNLIKELY (pads->eospads == pads->numpads)) {
1301     /* If all our pads are EOS just collect once to let the element
1302      * do its final EOS handling. */
1303     GST_DEBUG_OBJECT (pads, "All active pads (%d) are EOS, calling %s",
1304         pads->numpads, GST_DEBUG_FUNCPTR_NAME (func));
1305
1306     flow_ret = func (pads, user_data);
1307   } else {
1308     gboolean collected = FALSE;
1309
1310     /* We call the collected function as long as our condition matches. */
1311     while (((pads->queuedpads + pads->eospads) >= pads->numpads)) {
1312       GST_DEBUG_OBJECT (pads, "All active pads (%d + %d >= %d) have data, "
1313           "calling %s", pads->queuedpads, pads->eospads, pads->numpads,
1314           GST_DEBUG_FUNCPTR_NAME (func));
1315
1316       flow_ret = func (pads, user_data);
1317       collected = TRUE;
1318
1319       /* break on error */
1320       if (flow_ret != GST_FLOW_OK)
1321         break;
1322       /* Don't keep looping after telling the element EOS or flushing */
1323       if (pads->queuedpads == 0)
1324         break;
1325     }
1326     if (!collected)
1327       GST_DEBUG_OBJECT (pads, "Not all active pads (%d) have data, continuing",
1328           pads->numpads);
1329   }
1330   return flow_ret;
1331 }
1332
1333
1334 /* General overview:
1335  * - only pad with a buffer can determine earliest_data (and earliest_time)
1336  * - only segment info determines (non-)waiting state
1337  * - ? perhaps use _stream_time for comparison
1338  *   (which muxers might have use as well ?)
1339  */
1340
1341 /*
1342  * Function to recalculate the waiting state of all pads.
1343  *
1344  * Must be called with STREAM_LOCK.
1345  *
1346  * Returns TRUE if a pad was set to waiting
1347  * (from non-waiting state).
1348  */
1349 static gboolean
1350 gst_collect_pads2_recalculate_waiting (GstCollectPads2 * pads)
1351 {
1352   GSList *collected;
1353   gboolean result = FALSE;
1354
1355   /* If earliest time is not known, there is nothing to do. */
1356   if (pads->earliest_data == NULL)
1357     return FALSE;
1358
1359   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
1360     GstCollectData2 *data = (GstCollectData2 *) collected->data;
1361     int cmp_res;
1362
1363     /* check if pad has a segment */
1364     if (data->segment.format == GST_FORMAT_UNDEFINED)
1365       continue;
1366
1367     /* check segment format */
1368     if (data->segment.format != GST_FORMAT_TIME) {
1369       GST_ERROR_OBJECT (pads, "GstCollectPads2 can handle only time segments.");
1370       continue;
1371     }
1372
1373     /* check if the waiting state should be changed */
1374     cmp_res = pads->compare_func (pads, data, data->segment.start,
1375         pads->earliest_data, pads->earliest_time, pads->compare_user_data);
1376     if (cmp_res > 0)
1377       /* stop waiting */
1378       gst_collect_pads2_set_waiting (pads, data, FALSE);
1379     else {
1380       if (!GST_COLLECT_PADS2_STATE_IS_SET (data,
1381               GST_COLLECT_PADS2_STATE_WAITING)) {
1382         /* start waiting */
1383         gst_collect_pads2_set_waiting (pads, data, TRUE);
1384         result = TRUE;
1385       }
1386     }
1387   }
1388
1389   return result;
1390 }
1391
1392 /**
1393  * gst_collect_pads2_find_best_pad:
1394  * @pads: the collectpads to use
1395  * @data: returns the collectdata for earliest data
1396  * @time: returns the earliest available buffertime
1397  *
1398  * Find the oldest/best pad, i.e. pad holding the oldest buffer and
1399  * and return the corresponding #GstCollectData2 and buffertime.
1400  *
1401  * This function should be called with STREAM_LOCK held,
1402  * such as in the callback.
1403  */
1404 static void
1405 gst_collect_pads2_find_best_pad (GstCollectPads2 * pads,
1406     GstCollectData2 ** data, GstClockTime * time)
1407 {
1408   GSList *collected;
1409   GstCollectData2 *best = NULL;
1410   GstClockTime best_time = GST_CLOCK_TIME_NONE;
1411
1412   g_return_if_fail (data != NULL);
1413   g_return_if_fail (time != NULL);
1414
1415   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
1416     GstBuffer *buffer;
1417     GstCollectData2 *data = (GstCollectData2 *) collected->data;
1418     GstClockTime timestamp;
1419
1420     buffer = gst_collect_pads2_peek (pads, data);
1421     /* if we have a buffer check if it is better then the current best one */
1422     if (buffer != NULL) {
1423       timestamp = GST_BUFFER_TIMESTAMP (buffer);
1424       gst_buffer_unref (buffer);
1425       if (best == NULL || pads->compare_func (pads, data, timestamp,
1426               best, best_time, pads->compare_user_data) < 0) {
1427         best = data;
1428         best_time = timestamp;
1429       }
1430     }
1431   }
1432
1433   /* set earliest time */
1434   *data = best;
1435   *time = best_time;
1436
1437   GST_DEBUG_OBJECT (pads, "best pad %s, best time %" GST_TIME_FORMAT,
1438       best ? GST_PAD_NAME (((GstCollectData2 *) best)->pad) : "(nil)",
1439       GST_TIME_ARGS (best_time));
1440 }
1441
1442 /*
1443  * Function to recalculate earliest_data and earliest_timestamp. This also calls
1444  * gst_collect_pads2_recalculate_waiting
1445  *
1446  * Must be called with STREAM_LOCK.
1447  */
1448 static gboolean
1449 gst_collect_pads2_recalculate_full (GstCollectPads2 * pads)
1450 {
1451   if (pads->earliest_data)
1452     unref_data (pads->earliest_data);
1453   gst_collect_pads2_find_best_pad (pads, &pads->earliest_data,
1454       &pads->earliest_time);
1455   if (pads->earliest_data)
1456     ref_data (pads->earliest_data);
1457   return gst_collect_pads2_recalculate_waiting (pads);
1458 }
1459
1460 /*
1461  * Default collect callback triggered when #GstCollectPads2 gathered all data.
1462  *
1463  * Called with STREAM_LOCK.
1464  */
1465 static GstFlowReturn
1466 gst_collect_pads2_default_collected (GstCollectPads2 * pads, gpointer user_data)
1467 {
1468   GstCollectData2 *best = NULL;
1469   GstBuffer *buffer;
1470   GstFlowReturn ret = GST_FLOW_OK;
1471   GstCollectPads2BufferFunction func;
1472   gpointer buffer_user_data;
1473
1474   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), GST_FLOW_ERROR);
1475
1476   GST_OBJECT_LOCK (pads);
1477   func = pads->buffer_func;
1478   buffer_user_data = pads->buffer_user_data;
1479   GST_OBJECT_UNLOCK (pads);
1480
1481   g_return_val_if_fail (func != NULL, GST_FLOW_NOT_SUPPORTED);
1482
1483   /* Find the oldest pad at all cost */
1484   if (gst_collect_pads2_recalculate_full (pads)) {
1485     /* waiting was switched on,
1486      * so give another thread a chance to deliver a possibly
1487      * older buffer; don't charge on yet with the current oldest */
1488     ret = GST_FLOW_OK;
1489   }
1490
1491   best = pads->earliest_data;
1492
1493   /* No data collected means EOS. */
1494   if (G_UNLIKELY (best == NULL)) {
1495     ret = func (pads, best, NULL, buffer_user_data);
1496     if (ret == GST_FLOW_OK)
1497       ret = GST_FLOW_UNEXPECTED;
1498     goto done;
1499   }
1500
1501   /* make sure that the pad we take a buffer from is waiting;
1502    * otherwise popping a buffer will seem not to have happened
1503    * and collectpads can get into a busy loop */
1504   gst_collect_pads2_set_waiting (pads, best, TRUE);
1505
1506   /* Send buffer */
1507   buffer = gst_collect_pads2_pop (pads, best);
1508   ret = func (pads, best, buffer, buffer_user_data);
1509
1510 done:
1511   return ret;
1512 }
1513
1514 /*
1515  * Default timestamp compare function.
1516  */
1517 static gint
1518 gst_collect_pads2_default_compare_func (GstCollectPads2 * pads,
1519     GstCollectData2 * data1, GstClockTime timestamp1,
1520     GstCollectData2 * data2, GstClockTime timestamp2, gpointer user_data)
1521 {
1522
1523   GST_LOG_OBJECT (pads, "comparing %" GST_TIME_FORMAT
1524       " and %" GST_TIME_FORMAT, GST_TIME_ARGS (timestamp1),
1525       GST_TIME_ARGS (timestamp2));
1526   /* non-valid timestamps go first as they are probably headers or so */
1527   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (timestamp1)))
1528     return GST_CLOCK_TIME_IS_VALID (timestamp2) ? -1 : 0;
1529
1530   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (timestamp2)))
1531     return 1;
1532
1533   /* compare timestamp */
1534   if (timestamp1 < timestamp2)
1535     return -1;
1536
1537   if (timestamp1 > timestamp2)
1538     return 1;
1539
1540   return 0;
1541 }
1542
1543 static gboolean
1544 gst_collect_pads2_event (GstPad * pad, GstEvent * event)
1545 {
1546   gboolean res = FALSE, need_unlock = FALSE;
1547   GstCollectData2 *data;
1548   GstCollectPads2 *pads;
1549   GstCollectPads2EventFunction event_func;
1550   GstCollectPads2BufferFunction buffer_func;
1551   gpointer event_user_data;
1552
1553   /* some magic to get the managing collect_pads2 */
1554   GST_OBJECT_LOCK (pad);
1555   data = (GstCollectData2 *) gst_pad_get_element_private (pad);
1556   if (G_UNLIKELY (data == NULL))
1557     goto pad_removed;
1558   ref_data (data);
1559   GST_OBJECT_UNLOCK (pad);
1560
1561   res = FALSE;
1562
1563   pads = data->collect;
1564
1565   GST_DEBUG ("Got %s event on pad %s:%s", GST_EVENT_TYPE_NAME (event),
1566       GST_DEBUG_PAD_NAME (data->pad));
1567
1568   GST_OBJECT_LOCK (pads);
1569   event_func = pads->event_func;
1570   event_user_data = pads->event_user_data;
1571   buffer_func = pads->buffer_func;
1572   GST_OBJECT_UNLOCK (pads);
1573
1574   switch (GST_EVENT_TYPE (event)) {
1575     case GST_EVENT_FLUSH_START:
1576     {
1577       /* forward event to unblock check_collected */
1578       if (event_func)
1579         res = event_func (pads, data, event, event_user_data);
1580       if (!res)
1581         res = gst_pad_event_default (pad, event);
1582
1583       /* now unblock the chain function.
1584        * no cond per pad, so they all unblock, 
1585        * non-flushing block again */
1586       GST_COLLECT_PADS2_STREAM_LOCK (pads);
1587       GST_COLLECT_PADS2_STATE_SET (data, GST_COLLECT_PADS2_STATE_FLUSHING);
1588       gst_collect_pads2_clear (pads, data);
1589
1590       /* cater for possible default muxing functionality */
1591       if (buffer_func) {
1592         /* restore to initial state */
1593         gst_collect_pads2_set_waiting (pads, data, TRUE);
1594         /* if the current pad is affected, reset state, recalculate later */
1595         if (pads->earliest_data == data) {
1596           unref_data (data);
1597           pads->earliest_data = NULL;
1598           pads->earliest_time = GST_CLOCK_TIME_NONE;
1599         }
1600       }
1601
1602       GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
1603
1604       /* event already cleaned up by forwarding */
1605       res = TRUE;
1606       goto done;
1607     }
1608     case GST_EVENT_FLUSH_STOP:
1609     {
1610       /* flush the 1 buffer queue */
1611       GST_COLLECT_PADS2_STREAM_LOCK (pads);
1612       GST_COLLECT_PADS2_STATE_UNSET (data, GST_COLLECT_PADS2_STATE_FLUSHING);
1613       gst_collect_pads2_clear (pads, data);
1614       /* we need new segment info after the flush */
1615       gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
1616       GST_COLLECT_PADS2_STATE_UNSET (data, GST_COLLECT_PADS2_STATE_NEW_SEGMENT);
1617       /* if the pad was EOS, remove the EOS flag and
1618        * decrement the number of eospads */
1619       if (G_UNLIKELY (GST_COLLECT_PADS2_STATE_IS_SET (data,
1620                   GST_COLLECT_PADS2_STATE_EOS))) {
1621         if (!GST_COLLECT_PADS2_STATE_IS_SET (data,
1622                 GST_COLLECT_PADS2_STATE_WAITING))
1623           pads->queuedpads++;
1624         pads->eospads--;
1625         GST_COLLECT_PADS2_STATE_UNSET (data, GST_COLLECT_PADS2_STATE_EOS);
1626       }
1627       GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
1628
1629       /* forward event */
1630       goto forward_or_default;
1631     }
1632     case GST_EVENT_EOS:
1633     {
1634       GST_COLLECT_PADS2_STREAM_LOCK (pads);
1635       /* if the pad was not EOS, make it EOS and so we
1636        * have one more eospad */
1637       if (G_LIKELY (!GST_COLLECT_PADS2_STATE_IS_SET (data,
1638                   GST_COLLECT_PADS2_STATE_EOS))) {
1639         GST_COLLECT_PADS2_STATE_SET (data, GST_COLLECT_PADS2_STATE_EOS);
1640         if (!GST_COLLECT_PADS2_STATE_IS_SET (data,
1641                 GST_COLLECT_PADS2_STATE_WAITING))
1642           pads->queuedpads--;
1643         pads->eospads++;
1644       }
1645       /* check if we need collecting anything, we ignore the result. */
1646       gst_collect_pads2_check_collected (pads);
1647       GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
1648
1649       goto forward_or_eat;
1650     }
1651     case GST_EVENT_NEWSEGMENT:
1652     {
1653       gint64 start, stop, time;
1654       gdouble rate, arate;
1655       GstFormat format;
1656       gboolean update;
1657       gint cmp_res;
1658
1659       GST_COLLECT_PADS2_STREAM_LOCK (pads);
1660
1661       gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1662           &start, &stop, &time);
1663
1664       GST_DEBUG_OBJECT (data->pad, "got newsegment, start %" GST_TIME_FORMAT
1665           ", stop %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
1666           GST_TIME_ARGS (stop));
1667
1668       gst_segment_set_newsegment_full (&data->segment, update, rate, arate,
1669           format, start, stop, time);
1670
1671       GST_COLLECT_PADS2_STATE_SET (data, GST_COLLECT_PADS2_STATE_NEW_SEGMENT);
1672
1673       /* default muxing functionality */
1674       if (!buffer_func)
1675         goto newsegment_done;
1676
1677       /* default collection can not handle other segment formats than time */
1678       if (format != GST_FORMAT_TIME) {
1679         GST_ERROR_OBJECT (pads, "GstCollectPads2 default collecting "
1680             "can only handle time segments.");
1681         goto newsegment_done;
1682       }
1683
1684       /* If oldest time is not known, or current pad got newsegment;
1685        * recalculate the state */
1686       if (!pads->earliest_data || pads->earliest_data == data) {
1687         gst_collect_pads2_recalculate_full (pads);
1688         goto newsegment_done;
1689       }
1690
1691       /* Check if the waiting state of the pad should change. */
1692       cmp_res = pads->compare_func (pads, data, start, pads->earliest_data,
1693           pads->earliest_time, pads->compare_user_data);
1694
1695       if (cmp_res > 0)
1696         /* Stop waiting */
1697         gst_collect_pads2_set_waiting (pads, data, FALSE);
1698
1699     newsegment_done:
1700       GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
1701       /* we must not forward this event since multiple segments will be
1702        * accumulated and this is certainly not what we want. */
1703       goto forward_or_eat;
1704     }
1705     default:
1706       /* forward other events */
1707       goto forward_or_default;
1708   }
1709
1710 forward_or_default:
1711   if (GST_EVENT_IS_SERIALIZED (event)) {
1712     GST_COLLECT_PADS2_STREAM_LOCK (pads);
1713     need_unlock = TRUE;
1714   }
1715   if (event_func)
1716     res = event_func (pads, data, event, event_user_data);
1717   if (!res)
1718     res = gst_pad_event_default (pad, event);
1719   if (need_unlock)
1720     GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
1721   goto done;
1722
1723 forward_or_eat:
1724   if (GST_EVENT_IS_SERIALIZED (event)) {
1725     GST_COLLECT_PADS2_STREAM_LOCK (pads);
1726     need_unlock = TRUE;
1727   }
1728   if (event_func)
1729     res = event_func (pads, data, event, event_user_data);
1730   if (!res) {
1731     gst_event_unref (event);
1732     res = TRUE;
1733   }
1734   if (need_unlock)
1735     GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
1736   goto done;
1737
1738 done:
1739   unref_data (data);
1740   return res;
1741
1742   /* ERRORS */
1743 pad_removed:
1744   {
1745     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
1746     GST_OBJECT_UNLOCK (pad);
1747     return FALSE;
1748   }
1749 }
1750
1751 /* For each buffer we receive we check if our collected condition is reached
1752  * and if so we call the collected function. When this is done we check if
1753  * data has been unqueued. If data is still queued we wait holding the stream
1754  * lock to make sure no EOS event can happen while we are ready to be
1755  * collected 
1756  */
1757 static GstFlowReturn
1758 gst_collect_pads2_chain (GstPad * pad, GstBuffer * buffer)
1759 {
1760   GstCollectData2 *data;
1761   GstCollectPads2 *pads;
1762   GstFlowReturn ret;
1763   GstBuffer **buffer_p;
1764   guint32 cookie;
1765   GstCollectPads2BufferFunction prepare_buffer_func;
1766   gpointer prepare_buffer_user_data;
1767
1768   GST_DEBUG ("Got buffer for pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1769
1770   /* some magic to get the managing collect_pads2 */
1771   GST_OBJECT_LOCK (pad);
1772   data = (GstCollectData2 *) gst_pad_get_element_private (pad);
1773   if (G_UNLIKELY (data == NULL))
1774     goto no_data;
1775   ref_data (data);
1776   GST_OBJECT_UNLOCK (pad);
1777
1778   pads = data->collect;
1779   GST_OBJECT_LOCK (pads);
1780   prepare_buffer_func = pads->prepare_buffer_func;
1781   prepare_buffer_user_data = pads->prepare_buffer_user_data;
1782   GST_OBJECT_UNLOCK (pads);
1783
1784   GST_COLLECT_PADS2_STREAM_LOCK (pads);
1785   /* if not started, bail out */
1786   if (G_UNLIKELY (!pads->started))
1787     goto not_started;
1788   /* check if this pad is flushing */
1789   if (G_UNLIKELY (GST_COLLECT_PADS2_STATE_IS_SET (data,
1790               GST_COLLECT_PADS2_STATE_FLUSHING)))
1791     goto flushing;
1792   /* pad was EOS, we can refuse this data */
1793   if (G_UNLIKELY (GST_COLLECT_PADS2_STATE_IS_SET (data,
1794               GST_COLLECT_PADS2_STATE_EOS)))
1795     goto unexpected;
1796
1797   GST_DEBUG_OBJECT (pads, "Queuing buffer %p for pad %s:%s", buffer,
1798       GST_DEBUG_PAD_NAME (pad));
1799
1800   if (prepare_buffer_func) {
1801     ret = prepare_buffer_func (pads, data, buffer, prepare_buffer_user_data);
1802     if (ret == GST_COLLECT_PADS2_FLOW_DROP) {
1803       GST_DEBUG_OBJECT (pads, "Dropping buffer as requested");
1804       ret = GST_FLOW_OK;
1805       goto unlock_done;
1806     } else if (ret == GST_FLOW_UNEXPECTED) {
1807       goto unexpected;
1808     } else if (ret != GST_FLOW_OK) {
1809       goto error;
1810     }
1811   }
1812
1813   /* One more pad has data queued */
1814   if (GST_COLLECT_PADS2_STATE_IS_SET (data, GST_COLLECT_PADS2_STATE_WAITING))
1815     pads->queuedpads++;
1816   buffer_p = &data->buffer;
1817   gst_buffer_replace (buffer_p, buffer);
1818
1819   /* update segment last position if in TIME */
1820   if (G_LIKELY (data->segment.format == GST_FORMAT_TIME)) {
1821     GstClockTime timestamp = GST_BUFFER_TIMESTAMP (buffer);
1822
1823     if (GST_CLOCK_TIME_IS_VALID (timestamp))
1824       gst_segment_set_last_stop (&data->segment, GST_FORMAT_TIME, timestamp);
1825   }
1826
1827   /* While we have data queued on this pad try to collect stuff */
1828   do {
1829     /* Check if our collected condition is matched and call the collected
1830      * function if it is */
1831     ret = gst_collect_pads2_check_collected (pads);
1832     /* when an error occurs, we want to report this back to the caller ASAP
1833      * without having to block if the buffer was not popped */
1834     if (G_UNLIKELY (ret != GST_FLOW_OK))
1835       goto error;
1836
1837     /* data was consumed, we can exit and accept new data */
1838     if (data->buffer == NULL)
1839       break;
1840
1841     /* Having the _INIT here means we don't care about any broadcast up to here
1842      * (most of which occur with STREAM_LOCK held, so could not have happened
1843      * anyway).  We do care about e.g. a remove initiated broadcast as of this
1844      * point.  Putting it here also makes this thread ignores any evt it raised
1845      * itself (as is a usual WAIT semantic).
1846      */
1847     GST_COLLECT_PADS2_EVT_INIT (cookie);
1848
1849     /* pad could be removed and re-added */
1850     unref_data (data);
1851     GST_OBJECT_LOCK (pad);
1852     if (G_UNLIKELY ((data = gst_pad_get_element_private (pad)) == NULL))
1853       goto pad_removed;
1854     ref_data (data);
1855     GST_OBJECT_UNLOCK (pad);
1856
1857     GST_DEBUG_OBJECT (pads, "Pad %s:%s has a buffer queued, waiting",
1858         GST_DEBUG_PAD_NAME (pad));
1859
1860     /* wait to be collected, this must happen from another thread triggered
1861      * by the _chain function of another pad. We release the lock so we
1862      * can get stopped or flushed as well. We can however not get EOS
1863      * because we still hold the STREAM_LOCK.
1864      */
1865     GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
1866     GST_COLLECT_PADS2_EVT_WAIT (pads, cookie);
1867     GST_COLLECT_PADS2_STREAM_LOCK (pads);
1868
1869     GST_DEBUG_OBJECT (pads, "Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad));
1870
1871     /* after a signal, we could be stopped */
1872     if (G_UNLIKELY (!pads->started))
1873       goto not_started;
1874     /* check if this pad is flushing */
1875     if (G_UNLIKELY (GST_COLLECT_PADS2_STATE_IS_SET (data,
1876                 GST_COLLECT_PADS2_STATE_FLUSHING)))
1877       goto flushing;
1878   }
1879   while (data->buffer != NULL);
1880
1881 unlock_done:
1882   GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
1883   unref_data (data);
1884   gst_buffer_unref (buffer);
1885   return ret;
1886
1887 pad_removed:
1888   {
1889     GST_WARNING ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
1890     GST_OBJECT_UNLOCK (pad);
1891     ret = GST_FLOW_NOT_LINKED;
1892     goto unlock_done;
1893   }
1894   /* ERRORS */
1895 no_data:
1896   {
1897     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
1898     GST_OBJECT_UNLOCK (pad);
1899     gst_buffer_unref (buffer);
1900     return GST_FLOW_NOT_LINKED;
1901   }
1902 not_started:
1903   {
1904     GST_DEBUG ("not started");
1905     gst_collect_pads2_clear (pads, data);
1906     ret = GST_FLOW_WRONG_STATE;
1907     goto unlock_done;
1908   }
1909 flushing:
1910   {
1911     GST_DEBUG ("pad %s:%s is flushing", GST_DEBUG_PAD_NAME (pad));
1912     gst_collect_pads2_clear (pads, data);
1913     ret = GST_FLOW_WRONG_STATE;
1914     goto unlock_done;
1915   }
1916 unexpected:
1917   {
1918     /* we should not post an error for this, just inform upstream that
1919      * we don't expect anything anymore */
1920     GST_DEBUG ("pad %s:%s is eos", GST_DEBUG_PAD_NAME (pad));
1921     ret = GST_FLOW_UNEXPECTED;
1922     goto unlock_done;
1923   }
1924 error:
1925   {
1926     /* we print the error, the element should post a reasonable error
1927      * message for fatal errors */
1928     GST_DEBUG ("collect failed, reason %d (%s)", ret, gst_flow_get_name (ret));
1929     gst_collect_pads2_clear (pads, data);
1930     goto unlock_done;
1931   }
1932 }