Update for new gthread API
[platform/upstream/gstreamer.git] / libs / gst / base / gstcollectpads2.c
1 /* GStreamer
2  * Copyright (C) 2005 Wim Taymans <wim@fluendo.com>
3  * Copyright (C) 2008 Mark Nauwelaerts <mnauw@users.sourceforge.net>
4  * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
5  *
6  * gstcollectpads2.c:
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
21  * Boston, MA 02111-1307, USA.
22  */
23 /**
24  * SECTION:gstcollectpads2
25  * @short_description: manages a set of pads that operate in collect mode
26  * @see_also:
27  *
28  * Manages a set of pads that operate in collect mode. This means that control
29  * is given to the manager of this object when all pads have data.
30  * <itemizedlist>
31  *   <listitem><para>
32  *     Collectpads are created with gst_collect_pads2_new(). A callback should then
33  *     be installed with gst_collect_pads2_set_function ().
34  *   </para></listitem>
35  *   <listitem><para>
36  *     Pads are added to the collection with gst_collect_pads2_add_pad()/
37  *     gst_collect_pads2_remove_pad(). The pad
38  *     has to be a sinkpad. The chain and event functions of the pad are
39  *     overridden. The element_private of the pad is used to store
40  *     private information for the collectpads.
41  *   </para></listitem>
42  *   <listitem><para>
43  *     For each pad, data is queued in the _chain function or by
44  *     performing a pull_range.
45  *   </para></listitem>
46  *   <listitem><para>
47  *     When data is queued on all pads in waiting mode, the callback function is called.
48  *   </para></listitem>
49  *   <listitem><para>
50  *     Data can be dequeued from the pad with the gst_collect_pads2_pop() method.
51  *     One can peek at the data with the gst_collect_pads2_peek() function.
52  *     These functions will return NULL if the pad received an EOS event. When all
53  *     pads return NULL from a gst_collect_pads2_peek(), the element can emit an EOS
54  *     event itself.
55  *   </para></listitem>
56  *   <listitem><para>
57  *     Data can also be dequeued in byte units using the gst_collect_pads2_available(),
58  *     gst_collect_pads2_read() and gst_collect_pads2_flush() calls.
59  *   </para></listitem>
60  *   <listitem><para>
61  *     Elements should call gst_collect_pads2_start() and gst_collect_pads2_stop() in
62  *     their state change functions to start and stop the processing of the collectpads.
63  *     The gst_collect_pads2_stop() call should be called before calling the parent
64  *     element state change function in the PAUSED_TO_READY state change to ensure
65  *     no pad is blocked and the element can finish streaming.
66  *   </para></listitem>
67  *   <listitem><para>
68  *     gst_collect_pads2_collect() and gst_collect_pads2_collect_range() can be used by
69  *     elements that start a #GstTask to drive the collect_pads2. This feature is however
70  *     not yet implemented.
71  *   </para></listitem>
72  *   <listitem><para>
73  *     gst_collect_pads2_set_waiting() sets a pad to waiting or non-waiting mode.
74  *     CollectPads element is not waiting for data to be collected on non-waiting pads.
75  *     Thus these pads may but need not have data when the callback is called.
76  *     All pads are in waiting mode by default.
77  *   </para></listitem>
78  * </itemizedlist>
79  *
80  * Last reviewed on 2011-10-28 (0.10.36)
81  *
82  * Since: 0.10.36
83  */
84
85 #ifdef HAVE_CONFIG_H
86 #  include "config.h"
87 #endif
88
89 #include <gst/gst_private.h>
90
91 #include "gstcollectpads2.h"
92
93 #include "../../../gst/glib-compat-private.h"
94
95 GST_DEBUG_CATEGORY_STATIC (collect_pads2_debug);
96 #define GST_CAT_DEFAULT collect_pads2_debug
97
98 #define parent_class gst_collect_pads2_parent_class
99 G_DEFINE_TYPE (GstCollectPads2, gst_collect_pads2, GST_TYPE_OBJECT);
100
101 static void gst_collect_pads2_clear (GstCollectPads2 * pads,
102     GstCollectData2 * data);
103 static GstFlowReturn gst_collect_pads2_chain (GstPad * pad, GstObject * parent,
104     GstBuffer * buffer);
105 static gboolean gst_collect_pads2_event (GstPad * pad, GstObject * parent,
106     GstEvent * event);
107 static void gst_collect_pads2_finalize (GObject * object);
108 static GstFlowReturn gst_collect_pads2_default_collected (GstCollectPads2 *
109     pads, gpointer user_data);
110 static gint gst_collect_pads2_default_compare_func (GstCollectPads2 * pads,
111     GstCollectData2 * data1, GstClockTime timestamp1, GstCollectData2 * data2,
112     GstClockTime timestamp2, gpointer user_data);
113 static gboolean gst_collect_pads2_recalculate_full (GstCollectPads2 * pads);
114 static void ref_data (GstCollectData2 * data);
115 static void unref_data (GstCollectData2 * data);
116
117 /* Some properties are protected by LOCK, others by STREAM_LOCK
118  * However, manipulating either of these partitions may require
119  * to signal/wake a _WAIT, so use a separate (sort of) event to prevent races
120  * Alternative implementations are possible, e.g. some low-level re-implementing
121  * of the 2 above locks to drop both of them atomically when going into _WAIT.
122  */
123 #define GST_COLLECT_PADS2_GET_EVT_COND(pads) (&((GstCollectPads2 *)pads)->evt_cond)
124 #define GST_COLLECT_PADS2_GET_EVT_LOCK(pads) (&((GstCollectPads2 *)pads)->evt_lock)
125 #define GST_COLLECT_PADS2_EVT_WAIT(pads, cookie) G_STMT_START {    \
126   g_mutex_lock (GST_COLLECT_PADS2_GET_EVT_LOCK (pads));            \
127   /* should work unless a lot of event'ing and thread starvation */\
128   while (cookie == ((GstCollectPads2 *) pads)->evt_cookie)         \
129     g_cond_wait (GST_COLLECT_PADS2_GET_EVT_COND (pads),            \
130         GST_COLLECT_PADS2_GET_EVT_LOCK (pads));                    \
131   cookie = ((GstCollectPads2 *) pads)->evt_cookie;                 \
132   g_mutex_unlock (GST_COLLECT_PADS2_GET_EVT_LOCK (pads));          \
133 } G_STMT_END
134 #define GST_COLLECT_PADS2_EVT_WAIT_TIMED(pads, cookie, timeout) G_STMT_START { \
135   GTimeVal __tv; \
136   \
137   g_get_current_time (&tv); \
138   g_time_val_add (&tv, timeout); \
139   \
140   g_mutex_lock (GST_COLLECT_PADS2_GET_EVT_LOCK (pads));            \
141   /* should work unless a lot of event'ing and thread starvation */\
142   while (cookie == ((GstCollectPads2 *) pads)->evt_cookie)         \
143     g_cond_timed_wait (GST_COLLECT_PADS2_GET_EVT_COND (pads),            \
144         GST_COLLECT_PADS2_GET_EVT_LOCK (pads), &tv);                    \
145   cookie = ((GstCollectPads2 *) pads)->evt_cookie;                 \
146   g_mutex_unlock (GST_COLLECT_PADS2_GET_EVT_LOCK (pads));          \
147 } G_STMT_END
148 #define GST_COLLECT_PADS2_EVT_BROADCAST(pads) G_STMT_START {       \
149   g_mutex_lock (GST_COLLECT_PADS2_GET_EVT_LOCK (pads));            \
150   /* never mind wrap-around */                                     \
151   ++(((GstCollectPads2 *) pads)->evt_cookie);                      \
152   g_cond_broadcast (GST_COLLECT_PADS2_GET_EVT_COND (pads));        \
153   g_mutex_unlock (GST_COLLECT_PADS2_GET_EVT_LOCK (pads));          \
154 } G_STMT_END
155 #define GST_COLLECT_PADS2_EVT_INIT(cookie) G_STMT_START {          \
156   g_mutex_lock (GST_COLLECT_PADS2_GET_EVT_LOCK (pads));            \
157   cookie = ((GstCollectPads2 *) pads)->evt_cookie;                 \
158   g_mutex_unlock (GST_COLLECT_PADS2_GET_EVT_LOCK (pads));          \
159 } G_STMT_END
160
161 static void
162 gst_collect_pads2_class_init (GstCollectPads2Class * klass)
163 {
164   GObjectClass *gobject_class = (GObjectClass *) klass;
165
166   GST_DEBUG_CATEGORY_INIT (collect_pads2_debug, "collectpads2", 0,
167       "GstCollectPads2");
168
169   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_collect_pads2_finalize);
170 }
171
172 static void
173 gst_collect_pads2_init (GstCollectPads2 * pads)
174 {
175   pads->data = NULL;
176   pads->cookie = 0;
177   pads->numpads = 0;
178   pads->queuedpads = 0;
179   pads->eospads = 0;
180   pads->started = FALSE;
181
182   g_rec_mutex_init (&pads->stream_lock);
183
184   pads->func = gst_collect_pads2_default_collected;
185   pads->user_data = NULL;
186   pads->event_func = NULL;
187   pads->event_user_data = NULL;
188
189   /* members for default muxing */
190   pads->buffer_func = NULL;
191   pads->buffer_user_data = NULL;
192   pads->compare_func = gst_collect_pads2_default_compare_func;
193   pads->compare_user_data = NULL;
194   pads->earliest_data = NULL;
195   pads->earliest_time = GST_CLOCK_TIME_NONE;
196
197   /* members to manage the pad list */
198   pads->pad_cookie = 0;
199   pads->pad_list = NULL;
200
201   /* members for event */
202   g_mutex_init (&pads->evt_lock);
203   g_cond_init (&pads->evt_cond);
204   pads->evt_cookie = 0;
205 }
206
207 static void
208 gst_collect_pads2_finalize (GObject * object)
209 {
210   GstCollectPads2 *pads = GST_COLLECT_PADS2 (object);
211
212   GST_DEBUG_OBJECT (object, "finalize");
213
214   g_rec_mutex_clear (&pads->stream_lock);
215
216   g_cond_clear (&pads->evt_cond);
217   g_mutex_clear (&pads->evt_lock);
218
219   /* Remove pads and free pads list */
220   g_slist_foreach (pads->pad_list, (GFunc) unref_data, NULL);
221   g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
222   g_slist_free (pads->data);
223   g_slist_free (pads->pad_list);
224
225   G_OBJECT_CLASS (parent_class)->finalize (object);
226 }
227
228 /**
229  * gst_collect_pads2_new:
230  *
231  * Create a new instance of #GstCollectsPads.
232  *
233  * Returns: a new #GstCollectPads2, or NULL in case of an error.
234  *
235  * MT safe.
236  *
237  * Since: 0.10.36
238  */
239 GstCollectPads2 *
240 gst_collect_pads2_new (void)
241 {
242   GstCollectPads2 *newcoll;
243
244   newcoll = g_object_new (GST_TYPE_COLLECT_PADS2, NULL);
245
246   return newcoll;
247 }
248
249 /* Must be called with GstObject lock! */
250 static void
251 gst_collect_pads2_set_buffer_function_locked (GstCollectPads2 * pads,
252     GstCollectPads2BufferFunction func, gpointer user_data)
253 {
254   pads->buffer_func = func;
255   pads->buffer_user_data = user_data;
256 }
257
258 /**
259  * gst_collect_pads2_set_buffer_function:
260  * @pads: the collectpads to use
261  * @func: the function to set
262  * @user_data: user data passed to the function
263  *
264  * Set the callback function and user data that will be called with
265  * the oldest buffer when all pads have been collected.
266  *
267  * MT safe.
268  *
269  * Since: 0.10.36
270  */
271 void
272 gst_collect_pads2_set_buffer_function (GstCollectPads2 * pads,
273     GstCollectPads2BufferFunction func, gpointer user_data)
274 {
275   g_return_if_fail (pads != NULL);
276   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
277
278   GST_OBJECT_LOCK (pads);
279   gst_collect_pads2_set_buffer_function_locked (pads, func, user_data);
280   GST_OBJECT_UNLOCK (pads);
281 }
282
283 /**
284  * gst_collect_pads2_set_compare_function:
285  * @pads: the pads to use
286  * @func: the function to set
287  * @user_data: user data passed to the function
288  *
289  * Set the timestamp comparison function.
290  *
291  * MT safe.
292  *
293  * Since: 0.10.36
294  */
295 /* NOTE allowing to change comparison seems not advisable;
296 no known use-case, and collaboration with default algorithm is unpredictable.
297 If custom compairing/operation is needed, just use a collect function of
298 your own */
299 void
300 gst_collect_pads2_set_compare_function (GstCollectPads2 * pads,
301     GstCollectPads2CompareFunction func, gpointer user_data)
302 {
303   g_return_if_fail (pads != NULL);
304   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
305
306   GST_OBJECT_LOCK (pads);
307   pads->compare_func = func;
308   pads->compare_user_data = user_data;
309   GST_OBJECT_UNLOCK (pads);
310 }
311
312 /**
313  * gst_collect_pads2_set_function:
314  * @pads: the collectspads to use
315  * @func: the function to set
316  * @user_data: user data passed to the function
317  *
318  * CollectPads provides a default collection algorithm that will determine
319  * the oldest buffer available on all of its pads, and then delegate
320  * to a configured callback.
321  * However, if circumstances are more complicated and/or more control
322  * is desired, this sets a callback that will be invoked instead when
323  * all the pads added to the collection have buffers queued.
324  * Evidently, this callback is not compatible with
325  * gst_collect_pads2_set_buffer_function() callback.
326  * If this callback is set, the former will be unset.
327  *
328  * MT safe.
329  *
330  * Since: 0.10.36
331  */
332 void
333 gst_collect_pads2_set_function (GstCollectPads2 * pads,
334     GstCollectPads2Function func, gpointer user_data)
335 {
336   g_return_if_fail (pads != NULL);
337   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
338
339   GST_OBJECT_LOCK (pads);
340   pads->func = func;
341   pads->user_data = user_data;
342   gst_collect_pads2_set_buffer_function_locked (pads, NULL, NULL);
343   GST_OBJECT_UNLOCK (pads);
344 }
345
346 static void
347 ref_data (GstCollectData2 * data)
348 {
349   g_assert (data != NULL);
350
351   g_atomic_int_inc (&(data->refcount));
352 }
353
354 static void
355 unref_data (GstCollectData2 * data)
356 {
357   g_assert (data != NULL);
358   g_assert (data->refcount > 0);
359
360   if (!g_atomic_int_dec_and_test (&(data->refcount)))
361     return;
362
363   if (data->destroy_notify)
364     data->destroy_notify (data);
365
366   g_object_unref (data->pad);
367   if (data->buffer) {
368     gst_buffer_unref (data->buffer);
369   }
370   g_free (data);
371 }
372
373 /**
374  * gst_collect_pads2_set_event_function:
375  * @pads: the collectspads to use
376  * @func: the function to set
377  * @user_data: user data passed to the function
378  *
379  * Set the event callback function and user data that will be called after
380  * collectpads has processed and event originating from one of the collected
381  * pads.  If the event being processed is a serialized one, this callback is
382  * called with @pads STREAM_LOCK held, otherwise not.  As this lock should be
383  * held when calling a number of CollectPads functions, it should be acquired
384  * if so (unusually) needed.
385  *
386  * MT safe.
387  *
388  * Since: 0.10.36
389  */
390 void
391 gst_collect_pads2_set_event_function (GstCollectPads2 * pads,
392     GstCollectPads2EventFunction func, gpointer user_data)
393 {
394   g_return_if_fail (pads != NULL);
395   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
396
397   GST_OBJECT_LOCK (pads);
398   pads->event_func = func;
399   pads->event_user_data = user_data;
400   GST_OBJECT_UNLOCK (pads);
401 }
402
403 /**
404 * gst_collect_pads2_clip_running:
405 * @pads: the collectspads to use
406 * @cdata: collect data of corrsponding pad
407 * @buf: buffer being clipped
408 * @outbuf: output buffer with running time, or NULL if clipped
409 * @user_data: user data (unused)
410 *
411 * Convenience clipping function that converts incoming buffer's timestamp
412 * to running time, or clips the buffer if outside configured segment.
413 *
414 * Since: 0.10.37
415 */
416 GstFlowReturn
417 gst_collect_pads2_clip_running_time (GstCollectPads2 * pads,
418     GstCollectData2 * cdata, GstBuffer * buf, GstBuffer ** outbuf,
419     gpointer user_data)
420 {
421   GstClockTime time;
422
423   *outbuf = buf;
424   time = GST_BUFFER_TIMESTAMP (buf);
425
426   /* invalid left alone and passed */
427   if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) {
428     time = gst_segment_to_running_time (&cdata->segment, GST_FORMAT_TIME, time);
429     if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) {
430       GST_DEBUG_OBJECT (cdata->pad, "clipping buffer on pad outside segment");
431       gst_buffer_unref (buf);
432       *outbuf = NULL;
433     } else {
434       GST_LOG_OBJECT (cdata->pad, "buffer ts %" GST_TIME_FORMAT " -> %"
435           GST_TIME_FORMAT " running time",
436           GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)), GST_TIME_ARGS (time));
437       *outbuf = gst_buffer_make_writable (buf);
438       GST_BUFFER_TIMESTAMP (*outbuf) = time;
439     }
440   }
441
442   return GST_FLOW_OK;
443 }
444
445  /**
446  * gst_collect_pads2_set_clip_function:
447  * @pads: the collectspads to use
448  * @clipfunc: clip function to install
449  * @user_data: user data to pass to @clip_func
450  *
451  * Install a clipping function that is called right after a buffer is received
452  * on a pad managed by @pads. See #GstCollectPad2ClipFunction for more info.
453  *
454  * Since: 0.10.36
455  */
456 void
457 gst_collect_pads2_set_clip_function (GstCollectPads2 * pads,
458     GstCollectPads2ClipFunction clipfunc, gpointer user_data)
459 {
460   g_return_if_fail (pads != NULL);
461   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
462
463   pads->clip_func = clipfunc;
464   pads->clip_user_data = user_data;
465 }
466
467 /**
468  * gst_collect_pads2_add_pad:
469  * @pads: the collectspads to use
470  * @pad: the pad to add
471  * @size: the size of the returned #GstCollectData2 structure
472  *
473  * Add a pad to the collection of collect pads. The pad has to be
474  * a sinkpad. The refcount of the pad is incremented. Use
475  * gst_collect_pads2_remove_pad() to remove the pad from the collection
476  * again.
477  *
478  * You specify a size for the returned #GstCollectData2 structure
479  * so that you can use it to store additional information.
480  *
481  * The pad will be automatically activated in push mode when @pads is
482  * started.
483  *
484  * This function calls gst_collect_pads2_add_pad() passing a value of NULL
485  * for destroy_notify and TRUE for locked.
486  *
487  * Returns: a new #GstCollectData2 to identify the new pad. Or NULL
488  *   if wrong parameters are supplied.
489  *
490  * MT safe.
491  *
492  * Since: 0.10.36
493  */
494 GstCollectData2 *
495 gst_collect_pads2_add_pad (GstCollectPads2 * pads, GstPad * pad, guint size)
496 {
497   return gst_collect_pads2_add_pad_full (pads, pad, size, NULL, TRUE);
498 }
499
500 /**
501  * gst_collect_pads2_add_pad_full:
502  * @pads: the collectspads to use
503  * @pad: the pad to add
504  * @size: the size of the returned #GstCollectData2 structure
505  * @destroy_notify: function to be called before the returned #GstCollectData2
506  * structure is freed
507  * @lock: whether to lock this pad in usual waiting state
508  *
509  * Add a pad to the collection of collect pads. The pad has to be
510  * a sinkpad. The refcount of the pad is incremented. Use
511  * gst_collect_pads2_remove_pad() to remove the pad from the collection
512  * again.
513  *
514  * You specify a size for the returned #GstCollectData2 structure
515  * so that you can use it to store additional information.
516  *
517  * You can also specify a #GstCollectData2DestroyNotify that will be called
518  * just before the #GstCollectData2 structure is freed. It is passed the
519  * pointer to the structure and should free any custom memory and resources
520  * allocated for it.
521  *
522  * Keeping a pad locked in waiting state is only relevant when using
523  * the default collection algorithm (providing the oldest buffer).
524  * It ensures a buffer must be available on this pad for a collection
525  * to take place.  This is of typical use to a muxer element where
526  * non-subtitle streams should always be in waiting state,
527  * e.g. to assure that caps information is available on all these streams
528  * when initial headers have to be written.
529  *
530  * The pad will be automatically activated in push mode when @pads is
531  * started.
532  *
533  * Since: 0.10.36
534  *
535  * Returns: a new #GstCollectData2 to identify the new pad. Or NULL
536  *   if wrong parameters are supplied.
537  *
538  * MT safe.
539  */
540 GstCollectData2 *
541 gst_collect_pads2_add_pad_full (GstCollectPads2 * pads, GstPad * pad,
542     guint size, GstCollectData2DestroyNotify destroy_notify, gboolean lock)
543 {
544   GstCollectData2 *data;
545
546   g_return_val_if_fail (pads != NULL, NULL);
547   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), NULL);
548   g_return_val_if_fail (pad != NULL, NULL);
549   g_return_val_if_fail (GST_PAD_IS_SINK (pad), NULL);
550   g_return_val_if_fail (size >= sizeof (GstCollectData2), NULL);
551
552   GST_DEBUG_OBJECT (pads, "adding pad %s:%s", GST_DEBUG_PAD_NAME (pad));
553
554   data = g_malloc0 (size);
555   data->collect = pads;
556   data->pad = gst_object_ref (pad);
557   data->buffer = NULL;
558   data->pos = 0;
559   gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
560   data->state = GST_COLLECT_PADS2_STATE_WAITING;
561   data->state |= lock ? GST_COLLECT_PADS2_STATE_LOCKED : 0;
562   data->refcount = 1;
563   data->destroy_notify = destroy_notify;
564
565   GST_OBJECT_LOCK (pads);
566   GST_OBJECT_LOCK (pad);
567   gst_pad_set_element_private (pad, data);
568   GST_OBJECT_UNLOCK (pad);
569   pads->pad_list = g_slist_append (pads->pad_list, data);
570   gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads2_chain));
571   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads2_event));
572   /* backward compat, also add to data if stopped, so that the element already
573    * has this in the public data list before going PAUSED (typically)
574    * this can only be done when we are stopped because we don't take the
575    * STREAM_LOCK to protect the pads->data list. */
576   if (!pads->started) {
577     pads->data = g_slist_append (pads->data, data);
578     ref_data (data);
579   }
580   /* activate the pad when needed */
581   if (pads->started)
582     gst_pad_set_active (pad, TRUE);
583   pads->pad_cookie++;
584   GST_OBJECT_UNLOCK (pads);
585
586   return data;
587 }
588
589 static gint
590 find_pad (GstCollectData2 * data, GstPad * pad)
591 {
592   if (data->pad == pad)
593     return 0;
594   return 1;
595 }
596
597 /**
598  * gst_collect_pads2_remove_pad:
599  * @pads: the collectspads to use
600  * @pad: the pad to remove
601  *
602  * Remove a pad from the collection of collect pads. This function will also
603  * free the #GstCollectData2 and all the resources that were allocated with
604  * gst_collect_pads2_add_pad().
605  *
606  * The pad will be deactivated automatically when @pads is stopped.
607  *
608  * Returns: %TRUE if the pad could be removed.
609  *
610  * MT safe.
611  *
612  * Since: 0.10.36
613  */
614 gboolean
615 gst_collect_pads2_remove_pad (GstCollectPads2 * pads, GstPad * pad)
616 {
617   GstCollectData2 *data;
618   GSList *list;
619
620   g_return_val_if_fail (pads != NULL, FALSE);
621   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), FALSE);
622   g_return_val_if_fail (pad != NULL, FALSE);
623   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
624
625   GST_DEBUG_OBJECT (pads, "removing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
626
627   GST_OBJECT_LOCK (pads);
628   list = g_slist_find_custom (pads->pad_list, pad, (GCompareFunc) find_pad);
629   if (!list)
630     goto unknown_pad;
631
632   data = (GstCollectData2 *) list->data;
633
634   GST_DEBUG_OBJECT (pads, "found pad %s:%s at %p", GST_DEBUG_PAD_NAME (pad),
635       data);
636
637   /* clear the stuff we configured */
638   gst_pad_set_chain_function (pad, NULL);
639   gst_pad_set_event_function (pad, NULL);
640   GST_OBJECT_LOCK (pad);
641   gst_pad_set_element_private (pad, NULL);
642   GST_OBJECT_UNLOCK (pad);
643
644   /* backward compat, also remove from data if stopped, note that this function
645    * can only be called when we are stopped because we don't take the
646    * STREAM_LOCK to protect the pads->data list. */
647   if (!pads->started) {
648     GSList *dlist;
649
650     dlist = g_slist_find_custom (pads->data, pad, (GCompareFunc) find_pad);
651     if (dlist) {
652       GstCollectData2 *pdata = dlist->data;
653
654       pads->data = g_slist_delete_link (pads->data, dlist);
655       unref_data (pdata);
656     }
657   }
658   /* remove from the pad list */
659   pads->pad_list = g_slist_delete_link (pads->pad_list, list);
660   pads->pad_cookie++;
661
662   /* signal waiters because something changed */
663   GST_COLLECT_PADS2_EVT_BROADCAST (pads);
664
665   /* deactivate the pad when needed */
666   if (!pads->started)
667     gst_pad_set_active (pad, FALSE);
668
669   /* clean and free the collect data */
670   unref_data (data);
671
672   GST_OBJECT_UNLOCK (pads);
673
674   return TRUE;
675
676 unknown_pad:
677   {
678     GST_WARNING_OBJECT (pads, "cannot remove unknown pad %s:%s",
679         GST_DEBUG_PAD_NAME (pad));
680     GST_OBJECT_UNLOCK (pads);
681     return FALSE;
682   }
683 }
684
685 /**
686  * gst_collect_pads2_is_active:
687  * @pads: the collectspads to use
688  * @pad: the pad to check
689  *
690  * Check if a pad is active.
691  *
692  * This function is currently not implemented.
693  *
694  * Returns: %TRUE if the pad is active.
695  *
696  * MT safe.
697  *
698  * Since: 0.10.36
699  */
700 gboolean
701 gst_collect_pads2_is_active (GstCollectPads2 * pads, GstPad * pad)
702 {
703   g_return_val_if_fail (pads != NULL, FALSE);
704   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), FALSE);
705   g_return_val_if_fail (pad != NULL, FALSE);
706   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
707
708   g_warning ("gst_collect_pads2_is_active() is not implemented");
709
710   return FALSE;
711 }
712
713 /**
714  * gst_collect_pads2_collect:
715  * @pads: the collectspads to use
716  *
717  * Collect data on all pads. This function is usually called
718  * from a #GstTask function in an element. 
719  *
720  * This function is currently not implemented.
721  *
722  * Returns: #GstFlowReturn of the operation.
723  *
724  * MT safe.
725  *
726  * Since: 0.10.36
727  */
728 GstFlowReturn
729 gst_collect_pads2_collect (GstCollectPads2 * pads)
730 {
731   g_return_val_if_fail (pads != NULL, GST_FLOW_ERROR);
732   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), GST_FLOW_ERROR);
733
734   g_warning ("gst_collect_pads2_collect() is not implemented");
735
736   return GST_FLOW_NOT_SUPPORTED;
737 }
738
739 /**
740  * gst_collect_pads2_collect_range:
741  * @pads: the collectspads to use
742  * @offset: the offset to collect
743  * @length: the length to collect
744  *
745  * Collect data with @offset and @length on all pads. This function
746  * is typically called in the getrange function of an element. 
747  *
748  * This function is currently not implemented.
749  *
750  * Returns: #GstFlowReturn of the operation.
751  *
752  * MT safe.
753  *
754  * Since: 0.10.36
755  */
756 GstFlowReturn
757 gst_collect_pads2_collect_range (GstCollectPads2 * pads, guint64 offset,
758     guint length)
759 {
760   g_return_val_if_fail (pads != NULL, GST_FLOW_ERROR);
761   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), GST_FLOW_ERROR);
762
763   g_warning ("gst_collect_pads2_collect_range() is not implemented");
764
765   return GST_FLOW_NOT_SUPPORTED;
766 }
767
768 /*
769  * Must be called with STREAM_LOCK.
770  */
771 static void
772 gst_collect_pads2_set_flushing_unlocked (GstCollectPads2 * pads,
773     gboolean flushing)
774 {
775   GSList *walk = NULL;
776
777   /* Update the pads flushing flag */
778   for (walk = pads->data; walk; walk = g_slist_next (walk)) {
779     GstCollectData2 *cdata = walk->data;
780
781     if (GST_IS_PAD (cdata->pad)) {
782       GST_OBJECT_LOCK (cdata->pad);
783       if (flushing)
784         GST_PAD_SET_FLUSHING (cdata->pad);
785       else
786         GST_PAD_UNSET_FLUSHING (cdata->pad);
787       if (flushing)
788         GST_COLLECT_PADS2_STATE_SET (cdata, GST_COLLECT_PADS2_STATE_FLUSHING);
789       else
790         GST_COLLECT_PADS2_STATE_UNSET (cdata, GST_COLLECT_PADS2_STATE_FLUSHING);
791       gst_collect_pads2_clear (pads, cdata);
792       GST_OBJECT_UNLOCK (cdata->pad);
793     }
794   }
795
796   /* inform _chain of changes */
797   GST_COLLECT_PADS2_EVT_BROADCAST (pads);
798 }
799
800 /**
801  * gst_collect_pads2_set_flushing:
802  * @pads: the collectspads to use
803  * @flushing: desired state of the pads
804  *
805  * Change the flushing state of all the pads in the collection. No pad
806  * is able to accept anymore data when @flushing is %TRUE. Calling this
807  * function with @flushing %FALSE makes @pads accept data again.
808  * Caller must ensure that downstream streaming (thread) is not blocked,
809  * e.g. by sending a FLUSH_START downstream.
810  *
811  * MT safe.
812  *
813  *
814  * Since: 0.10.36
815  */
816 void
817 gst_collect_pads2_set_flushing (GstCollectPads2 * pads, gboolean flushing)
818 {
819   g_return_if_fail (pads != NULL);
820   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
821
822   /* NOTE since this eventually calls _pop, some (STREAM_)LOCK is needed here */
823   GST_COLLECT_PADS2_STREAM_LOCK (pads);
824   gst_collect_pads2_set_flushing_unlocked (pads, flushing);
825   GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
826 }
827
828 /**
829  * gst_collect_pads2_start:
830  * @pads: the collectspads to use
831  *
832  * Starts the processing of data in the collect_pads2.
833  *
834  * MT safe.
835  *
836  * Since: 0.10.36
837  */
838 void
839 gst_collect_pads2_start (GstCollectPads2 * pads)
840 {
841   GSList *collected;
842
843   g_return_if_fail (pads != NULL);
844   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
845
846   GST_DEBUG_OBJECT (pads, "starting collect pads");
847
848   /* make sure stop and collect cannot be called anymore */
849   GST_COLLECT_PADS2_STREAM_LOCK (pads);
850
851   /* make pads streamable */
852   GST_OBJECT_LOCK (pads);
853
854   /* loop over the master pad list and reset the segment */
855   collected = pads->pad_list;
856   for (; collected; collected = g_slist_next (collected)) {
857     GstCollectData2 *data;
858
859     data = collected->data;
860     gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
861   }
862
863   gst_collect_pads2_set_flushing_unlocked (pads, FALSE);
864
865   /* Start collect pads */
866   pads->started = TRUE;
867   GST_OBJECT_UNLOCK (pads);
868   GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
869 }
870
871 /**
872  * gst_collect_pads2_stop:
873  * @pads: the collectspads to use
874  *
875  * Stops the processing of data in the collect_pads2. this function
876  * will also unblock any blocking operations.
877  *
878  * MT safe.
879  *
880  * Since: 0.10.36
881  */
882 void
883 gst_collect_pads2_stop (GstCollectPads2 * pads)
884 {
885   GSList *collected;
886
887   g_return_if_fail (pads != NULL);
888   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
889
890   GST_DEBUG_OBJECT (pads, "stopping collect pads");
891
892   /* make sure collect and start cannot be called anymore */
893   GST_COLLECT_PADS2_STREAM_LOCK (pads);
894
895   /* make pads not accept data anymore */
896   GST_OBJECT_LOCK (pads);
897   gst_collect_pads2_set_flushing_unlocked (pads, TRUE);
898
899   /* Stop collect pads */
900   pads->started = FALSE;
901   pads->eospads = 0;
902   pads->queuedpads = 0;
903
904   /* loop over the master pad list and flush buffers */
905   collected = pads->pad_list;
906   for (; collected; collected = g_slist_next (collected)) {
907     GstCollectData2 *data;
908     GstBuffer **buffer_p;
909
910     data = collected->data;
911     if (data->buffer) {
912       buffer_p = &data->buffer;
913       gst_buffer_replace (buffer_p, NULL);
914       data->pos = 0;
915     }
916     GST_COLLECT_PADS2_STATE_UNSET (data, GST_COLLECT_PADS2_STATE_EOS);
917   }
918
919   if (pads->earliest_data)
920     unref_data (pads->earliest_data);
921   pads->earliest_data = NULL;
922   pads->earliest_time = GST_CLOCK_TIME_NONE;
923
924   GST_OBJECT_UNLOCK (pads);
925   /* Wake them up so they can end the chain functions. */
926   GST_COLLECT_PADS2_EVT_BROADCAST (pads);
927
928   GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
929 }
930
931 /**
932  * gst_collect_pads2_peek:
933  * @pads: the collectspads to peek
934  * @data: the data to use
935  *
936  * Peek at the buffer currently queued in @data. This function
937  * should be called with the @pads STREAM_LOCK held, such as in the callback
938  * handler.
939  *
940  * Returns: The buffer in @data or NULL if no buffer is queued.
941  *  should unref the buffer after usage.
942  *
943  * MT safe.
944  *
945  * Since: 0.10.36
946  */
947 GstBuffer *
948 gst_collect_pads2_peek (GstCollectPads2 * pads, GstCollectData2 * data)
949 {
950   GstBuffer *result;
951
952   g_return_val_if_fail (pads != NULL, NULL);
953   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), NULL);
954   g_return_val_if_fail (data != NULL, NULL);
955
956   if ((result = data->buffer))
957     gst_buffer_ref (result);
958
959   GST_DEBUG_OBJECT (pads, "Peeking at pad %s:%s: buffer=%p",
960       GST_DEBUG_PAD_NAME (data->pad), result);
961
962   return result;
963 }
964
965 /**
966  * gst_collect_pads2_pop:
967  * @pads: the collectspads to pop
968  * @data: the data to use
969  *
970  * Pop the buffer currently queued in @data. This function
971  * should be called with the @pads STREAM_LOCK held, such as in the callback
972  * handler.
973  *
974  * Returns: The buffer in @data or NULL if no buffer was queued.
975  *   You should unref the buffer after usage.
976  *
977  * MT safe.
978  *
979  * Since: 0.10.36
980  */
981 GstBuffer *
982 gst_collect_pads2_pop (GstCollectPads2 * pads, GstCollectData2 * data)
983 {
984   GstBuffer *result;
985
986   g_return_val_if_fail (pads != NULL, NULL);
987   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), NULL);
988   g_return_val_if_fail (data != NULL, NULL);
989
990   if ((result = data->buffer)) {
991     data->buffer = NULL;
992     data->pos = 0;
993     /* one less pad with queued data now */
994     if (GST_COLLECT_PADS2_STATE_IS_SET (data, GST_COLLECT_PADS2_STATE_WAITING))
995       pads->queuedpads--;
996   }
997
998   GST_COLLECT_PADS2_EVT_BROADCAST (pads);
999
1000   GST_DEBUG_OBJECT (pads, "Pop buffer on pad %s:%s: buffer=%p",
1001       GST_DEBUG_PAD_NAME (data->pad), result);
1002
1003   return result;
1004 }
1005
1006 /* pop and unref the currently queued buffer, should be called with STREAM_LOCK
1007  * held */
1008 static void
1009 gst_collect_pads2_clear (GstCollectPads2 * pads, GstCollectData2 * data)
1010 {
1011   GstBuffer *buf;
1012
1013   if ((buf = gst_collect_pads2_pop (pads, data)))
1014     gst_buffer_unref (buf);
1015 }
1016
1017 /**
1018  * gst_collect_pads2_available:
1019  * @pads: the collectspads to query
1020  *
1021  * Query how much bytes can be read from each queued buffer. This means
1022  * that the result of this call is the maximum number of bytes that can
1023  * be read from each of the pads.
1024  *
1025  * This function should be called with @pads STREAM_LOCK held, such as
1026  * in the callback.
1027  *
1028  * Returns: The maximum number of bytes queued on all pads. This function
1029  * returns 0 if a pad has no queued buffer.
1030  *
1031  * MT safe.
1032  *
1033  * Since: 0.10.36
1034  */
1035 /* we might pre-calculate this in some struct field,
1036  * but would then have to maintain this in _chain and particularly _pop, etc,
1037  * even if element is never interested in this information */
1038 guint
1039 gst_collect_pads2_available (GstCollectPads2 * pads)
1040 {
1041   GSList *collected;
1042   guint result = G_MAXUINT;
1043
1044   g_return_val_if_fail (pads != NULL, 0);
1045   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), 0);
1046
1047   collected = pads->data;
1048   for (; collected; collected = g_slist_next (collected)) {
1049     GstCollectData2 *pdata;
1050     GstBuffer *buffer;
1051     gint size;
1052
1053     pdata = (GstCollectData2 *) collected->data;
1054
1055     /* ignore pad with EOS */
1056     if (G_UNLIKELY (GST_COLLECT_PADS2_STATE_IS_SET (pdata,
1057                 GST_COLLECT_PADS2_STATE_EOS))) {
1058       GST_DEBUG_OBJECT (pads, "pad %p is EOS", pdata);
1059       continue;
1060     }
1061
1062     /* an empty buffer without EOS is weird when we get here.. */
1063     if (G_UNLIKELY ((buffer = pdata->buffer) == NULL)) {
1064       GST_WARNING_OBJECT (pads, "pad %p has no buffer", pdata);
1065       goto not_filled;
1066     }
1067
1068     /* this is the size left of the buffer */
1069     size = gst_buffer_get_size (buffer) - pdata->pos;
1070     GST_DEBUG_OBJECT (pads, "pad %p has %d bytes left", pdata, size);
1071
1072     /* need to return the min of all available data */
1073     if (size < result)
1074       result = size;
1075   }
1076   /* nothing changed, all must be EOS then, return 0 */
1077   if (G_UNLIKELY (result == G_MAXUINT))
1078     result = 0;
1079
1080   return result;
1081
1082 not_filled:
1083   {
1084     return 0;
1085   }
1086 }
1087
1088 /**
1089  * gst_collect_pads2_flush:
1090  * @pads: the collectspads to query
1091  * @data: the data to use
1092  * @size: the number of bytes to flush
1093  *
1094  * Flush @size bytes from the pad @data.
1095  *
1096  * This function should be called with @pads STREAM_LOCK held, such as
1097  * in the callback.
1098  *
1099  * Returns: The number of bytes flushed This can be less than @size and
1100  * is 0 if the pad was end-of-stream.
1101  *
1102  * MT safe.
1103  *
1104  * Since: 0.10.36
1105  */
1106 guint
1107 gst_collect_pads2_flush (GstCollectPads2 * pads, GstCollectData2 * data,
1108     guint size)
1109 {
1110   guint flushsize;
1111   gsize bsize;
1112   GstBuffer *buffer;
1113
1114   g_return_val_if_fail (pads != NULL, 0);
1115   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), 0);
1116   g_return_val_if_fail (data != NULL, 0);
1117
1118   /* no buffer, must be EOS */
1119   if ((buffer = data->buffer) == NULL)
1120     return 0;
1121
1122   bsize = gst_buffer_get_size (buffer);
1123
1124   /* this is what we can flush at max */
1125   flushsize = MIN (size, bsize - data->pos);
1126
1127   data->pos += size;
1128
1129   if (data->pos >= bsize)
1130     /* _clear will also reset data->pos to 0 */
1131     gst_collect_pads2_clear (pads, data);
1132
1133   return flushsize;
1134 }
1135
1136 /**
1137  * gst_collect_pads2_read_buffer:
1138  * @pads: the collectspads to query
1139  * @data: the data to use
1140  * @size: the number of bytes to read
1141  *
1142  * Get a subbuffer of @size bytes from the given pad @data.
1143  *
1144  * This function should be called with @pads STREAM_LOCK held, such as in the
1145  * callback.
1146  *
1147  * Since: 0.10.36
1148  *
1149  * Returns: A sub buffer. The size of the buffer can be less that requested.
1150  * A return of NULL signals that the pad is end-of-stream.
1151  * Unref the buffer after use.
1152  *
1153  * MT safe.
1154  */
1155 GstBuffer *
1156 gst_collect_pads2_read_buffer (GstCollectPads2 * pads, GstCollectData2 * data,
1157     guint size)
1158 {
1159   guint readsize;
1160   GstBuffer *buffer;
1161
1162   g_return_val_if_fail (pads != NULL, NULL);
1163   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), NULL);
1164   g_return_val_if_fail (data != NULL, NULL);
1165
1166   /* no buffer, must be EOS */
1167   if ((buffer = data->buffer) == NULL)
1168     return NULL;
1169
1170   readsize = MIN (size, gst_buffer_get_size (buffer) - data->pos);
1171
1172   return gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, data->pos,
1173       readsize);
1174 }
1175
1176 /**
1177  * gst_collect_pads2_take_buffer:
1178  * @pads: the collectspads to query
1179  * @data: the data to use
1180  * @size: the number of bytes to read
1181  *
1182  * Get a subbuffer of @size bytes from the given pad @data. Flushes the amount
1183  * of read bytes.
1184  *
1185  * This function should be called with @pads STREAM_LOCK held, such as in the
1186  * callback.
1187  *
1188  * Since: 0.10.36
1189  *
1190  * Returns: A sub buffer. The size of the buffer can be less that requested.
1191  * A return of NULL signals that the pad is end-of-stream.
1192  * Unref the buffer after use.
1193  *
1194  * MT safe.
1195  */
1196 GstBuffer *
1197 gst_collect_pads2_take_buffer (GstCollectPads2 * pads, GstCollectData2 * data,
1198     guint size)
1199 {
1200   GstBuffer *buffer = gst_collect_pads2_read_buffer (pads, data, size);
1201
1202   if (buffer) {
1203     gst_collect_pads2_flush (pads, data, gst_buffer_get_size (buffer));
1204   }
1205   return buffer;
1206 }
1207
1208 /**
1209  * gst_collect_pads2_set_waiting:
1210  * @pads: the collectspads
1211  * @data: the data to use
1212  * @waiting: boolean indicating whether this pad should operate
1213  *           in waiting or non-waiting mode
1214  *
1215  * Sets a pad to waiting or non-waiting mode, if at least this pad
1216  * has not been created with locked waiting state,
1217  * in which case nothing happens.
1218  *
1219  * This function should be called with @pads STREAM_LOCK held, such as
1220  * in the callback.
1221  *
1222  * MT safe.
1223  *
1224  * Since: 0.10.36
1225  */
1226 void
1227 gst_collect_pads2_set_waiting (GstCollectPads2 * pads, GstCollectData2 * data,
1228     gboolean waiting)
1229 {
1230   g_return_if_fail (pads != NULL);
1231   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
1232   g_return_if_fail (data != NULL);
1233
1234   GST_DEBUG_OBJECT (pads, "Setting pad %s to waiting %d, locked %d",
1235       GST_PAD_NAME (data->pad), waiting,
1236       GST_COLLECT_PADS2_STATE_IS_SET (data, GST_COLLECT_PADS2_STATE_LOCKED));
1237
1238   /* Do something only on a change and if not locked */
1239   if (!GST_COLLECT_PADS2_STATE_IS_SET (data, GST_COLLECT_PADS2_STATE_LOCKED) &&
1240       (GST_COLLECT_PADS2_STATE_IS_SET (data, GST_COLLECT_PADS2_STATE_WAITING) !=
1241           ! !waiting)) {
1242     /* Set waiting state for this pad */
1243     if (waiting)
1244       GST_COLLECT_PADS2_STATE_SET (data, GST_COLLECT_PADS2_STATE_WAITING);
1245     else
1246       GST_COLLECT_PADS2_STATE_UNSET (data, GST_COLLECT_PADS2_STATE_WAITING);
1247     /* Update number of queued pads if needed */
1248     if (!data->buffer &&
1249         !GST_COLLECT_PADS2_STATE_IS_SET (data, GST_COLLECT_PADS2_STATE_EOS)) {
1250       if (waiting)
1251         pads->queuedpads--;
1252       else
1253         pads->queuedpads++;
1254     }
1255
1256     /* signal waiters because something changed */
1257     GST_COLLECT_PADS2_EVT_BROADCAST (pads);
1258   }
1259 }
1260
1261 /* see if pads were added or removed and update our stats. Any pad
1262  * added after releasing the LOCK will get collected in the next
1263  * round.
1264  *
1265  * We can do a quick check by checking the cookies, that get changed
1266  * whenever the pad list is updated.
1267  *
1268  * Must be called with STREAM_LOCK.
1269  */
1270 static void
1271 gst_collect_pads2_check_pads (GstCollectPads2 * pads)
1272 {
1273   /* the master list and cookie are protected with LOCK */
1274   GST_OBJECT_LOCK (pads);
1275   if (G_UNLIKELY (pads->pad_cookie != pads->cookie)) {
1276     GSList *collected;
1277
1278     /* clear list and stats */
1279     g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
1280     g_slist_free (pads->data);
1281     pads->data = NULL;
1282     pads->numpads = 0;
1283     pads->queuedpads = 0;
1284     pads->eospads = 0;
1285     if (pads->earliest_data)
1286       unref_data (pads->earliest_data);
1287     pads->earliest_data = NULL;
1288     pads->earliest_time = GST_CLOCK_TIME_NONE;
1289
1290     /* loop over the master pad list */
1291     collected = pads->pad_list;
1292     for (; collected; collected = g_slist_next (collected)) {
1293       GstCollectData2 *data;
1294
1295       /* update the stats */
1296       pads->numpads++;
1297       data = collected->data;
1298       if (GST_COLLECT_PADS2_STATE_IS_SET (data, GST_COLLECT_PADS2_STATE_EOS))
1299         pads->eospads++;
1300       else if (data->buffer || !GST_COLLECT_PADS2_STATE_IS_SET (data,
1301               GST_COLLECT_PADS2_STATE_WAITING))
1302         pads->queuedpads++;
1303
1304       /* add to the list of pads to collect */
1305       ref_data (data);
1306       /* preserve order of adding/requesting pads */
1307       pads->data = g_slist_append (pads->data, data);
1308     }
1309     /* and update the cookie */
1310     pads->cookie = pads->pad_cookie;
1311   }
1312   GST_OBJECT_UNLOCK (pads);
1313 }
1314
1315 /* checks if all the pads are collected and call the collectfunction
1316  *
1317  * Should be called with STREAM_LOCK.
1318  *
1319  * Returns: The #GstFlowReturn of collection.
1320  */
1321 static GstFlowReturn
1322 gst_collect_pads2_check_collected (GstCollectPads2 * pads)
1323 {
1324   GstFlowReturn flow_ret = GST_FLOW_OK;
1325   GstCollectPads2Function func;
1326   gpointer user_data;
1327
1328   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), GST_FLOW_ERROR);
1329
1330   GST_OBJECT_LOCK (pads);
1331   func = pads->func;
1332   user_data = pads->user_data;
1333   GST_OBJECT_UNLOCK (pads);
1334
1335   g_return_val_if_fail (pads->func != NULL, GST_FLOW_NOT_SUPPORTED);
1336
1337   /* check for new pads, update stats etc.. */
1338   gst_collect_pads2_check_pads (pads);
1339
1340   if (G_UNLIKELY (pads->eospads == pads->numpads)) {
1341     /* If all our pads are EOS just collect once to let the element
1342      * do its final EOS handling. */
1343     GST_DEBUG_OBJECT (pads, "All active pads (%d) are EOS, calling %s",
1344         pads->numpads, GST_DEBUG_FUNCPTR_NAME (func));
1345
1346     flow_ret = func (pads, user_data);
1347   } else {
1348     gboolean collected = FALSE;
1349
1350     /* We call the collected function as long as our condition matches. */
1351     while (((pads->queuedpads + pads->eospads) >= pads->numpads)) {
1352       GST_DEBUG_OBJECT (pads, "All active pads (%d + %d >= %d) have data, "
1353           "calling %s", pads->queuedpads, pads->eospads, pads->numpads,
1354           GST_DEBUG_FUNCPTR_NAME (func));
1355
1356       flow_ret = func (pads, user_data);
1357       collected = TRUE;
1358
1359       /* break on error */
1360       if (flow_ret != GST_FLOW_OK)
1361         break;
1362       /* Don't keep looping after telling the element EOS or flushing */
1363       if (pads->queuedpads == 0)
1364         break;
1365     }
1366     if (!collected)
1367       GST_DEBUG_OBJECT (pads, "Not all active pads (%d) have data, continuing",
1368           pads->numpads);
1369   }
1370   return flow_ret;
1371 }
1372
1373
1374 /* General overview:
1375  * - only pad with a buffer can determine earliest_data (and earliest_time)
1376  * - only segment info determines (non-)waiting state
1377  * - ? perhaps use _stream_time for comparison
1378  *   (which muxers might have use as well ?)
1379  */
1380
1381 /*
1382  * Function to recalculate the waiting state of all pads.
1383  *
1384  * Must be called with STREAM_LOCK.
1385  *
1386  * Returns TRUE if a pad was set to waiting
1387  * (from non-waiting state).
1388  */
1389 static gboolean
1390 gst_collect_pads2_recalculate_waiting (GstCollectPads2 * pads)
1391 {
1392   GSList *collected;
1393   gboolean result = FALSE;
1394
1395   /* If earliest time is not known, there is nothing to do. */
1396   if (pads->earliest_data == NULL)
1397     return FALSE;
1398
1399   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
1400     GstCollectData2 *data = (GstCollectData2 *) collected->data;
1401     int cmp_res;
1402
1403     /* check if pad has a segment */
1404     if (data->segment.format == GST_FORMAT_UNDEFINED)
1405       continue;
1406
1407     /* check segment format */
1408     if (data->segment.format != GST_FORMAT_TIME) {
1409       GST_ERROR_OBJECT (pads, "GstCollectPads2 can handle only time segments.");
1410       continue;
1411     }
1412
1413     /* check if the waiting state should be changed */
1414     cmp_res = pads->compare_func (pads, data, data->segment.start,
1415         pads->earliest_data, pads->earliest_time, pads->compare_user_data);
1416     if (cmp_res > 0)
1417       /* stop waiting */
1418       gst_collect_pads2_set_waiting (pads, data, FALSE);
1419     else {
1420       if (!GST_COLLECT_PADS2_STATE_IS_SET (data,
1421               GST_COLLECT_PADS2_STATE_WAITING)) {
1422         /* start waiting */
1423         gst_collect_pads2_set_waiting (pads, data, TRUE);
1424         result = TRUE;
1425       }
1426     }
1427   }
1428
1429   return result;
1430 }
1431
1432 /**
1433  * gst_collect_pads2_find_best_pad:
1434  * @pads: the collectpads to use
1435  * @data: returns the collectdata for earliest data
1436  * @time: returns the earliest available buffertime
1437  *
1438  * Find the oldest/best pad, i.e. pad holding the oldest buffer and
1439  * and return the corresponding #GstCollectData2 and buffertime.
1440  *
1441  * This function should be called with STREAM_LOCK held,
1442  * such as in the callback.
1443  *
1444  * Since: 0.10.36
1445  */
1446 static void
1447 gst_collect_pads2_find_best_pad (GstCollectPads2 * pads,
1448     GstCollectData2 ** data, GstClockTime * time)
1449 {
1450   GSList *collected;
1451   GstCollectData2 *best = NULL;
1452   GstClockTime best_time = GST_CLOCK_TIME_NONE;
1453
1454   g_return_if_fail (data != NULL);
1455   g_return_if_fail (time != NULL);
1456
1457   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
1458     GstBuffer *buffer;
1459     GstCollectData2 *data = (GstCollectData2 *) collected->data;
1460     GstClockTime timestamp;
1461
1462     buffer = gst_collect_pads2_peek (pads, data);
1463     /* if we have a buffer check if it is better then the current best one */
1464     if (buffer != NULL) {
1465       timestamp = GST_BUFFER_TIMESTAMP (buffer);
1466       gst_buffer_unref (buffer);
1467       if (best == NULL || pads->compare_func (pads, data, timestamp,
1468               best, best_time, pads->compare_user_data) < 0) {
1469         best = data;
1470         best_time = timestamp;
1471       }
1472     }
1473   }
1474
1475   /* set earliest time */
1476   *data = best;
1477   *time = best_time;
1478
1479   GST_DEBUG_OBJECT (pads, "best pad %s, best time %" GST_TIME_FORMAT,
1480       best ? GST_PAD_NAME (((GstCollectData2 *) best)->pad) : "(nil)",
1481       GST_TIME_ARGS (best_time));
1482 }
1483
1484 /*
1485  * Function to recalculate earliest_data and earliest_timestamp. This also calls
1486  * gst_collect_pads2_recalculate_waiting
1487  *
1488  * Must be called with STREAM_LOCK.
1489  */
1490 static gboolean
1491 gst_collect_pads2_recalculate_full (GstCollectPads2 * pads)
1492 {
1493   if (pads->earliest_data)
1494     unref_data (pads->earliest_data);
1495   gst_collect_pads2_find_best_pad (pads, &pads->earliest_data,
1496       &pads->earliest_time);
1497   if (pads->earliest_data)
1498     ref_data (pads->earliest_data);
1499   return gst_collect_pads2_recalculate_waiting (pads);
1500 }
1501
1502 /*
1503  * Default collect callback triggered when #GstCollectPads2 gathered all data.
1504  *
1505  * Called with STREAM_LOCK.
1506  */
1507 static GstFlowReturn
1508 gst_collect_pads2_default_collected (GstCollectPads2 * pads, gpointer user_data)
1509 {
1510   GstCollectData2 *best = NULL;
1511   GstBuffer *buffer;
1512   GstFlowReturn ret = GST_FLOW_OK;
1513   GstCollectPads2BufferFunction func;
1514   gpointer buffer_user_data;
1515
1516   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), GST_FLOW_ERROR);
1517
1518   GST_OBJECT_LOCK (pads);
1519   func = pads->buffer_func;
1520   buffer_user_data = pads->buffer_user_data;
1521   GST_OBJECT_UNLOCK (pads);
1522
1523   g_return_val_if_fail (func != NULL, GST_FLOW_NOT_SUPPORTED);
1524
1525   /* Find the oldest pad at all cost */
1526   if (gst_collect_pads2_recalculate_full (pads)) {
1527     /* waiting was switched on,
1528      * so give another thread a chance to deliver a possibly
1529      * older buffer; don't charge on yet with the current oldest */
1530     ret = GST_FLOW_OK;
1531     goto done;
1532   }
1533
1534   best = pads->earliest_data;
1535
1536   /* No data collected means EOS. */
1537   if (G_UNLIKELY (best == NULL)) {
1538     ret = func (pads, best, NULL, buffer_user_data);
1539     if (ret == GST_FLOW_OK)
1540       ret = GST_FLOW_EOS;
1541     goto done;
1542   }
1543
1544   /* make sure that the pad we take a buffer from is waiting;
1545    * otherwise popping a buffer will seem not to have happened
1546    * and collectpads can get into a busy loop */
1547   gst_collect_pads2_set_waiting (pads, best, TRUE);
1548
1549   /* Send buffer */
1550   buffer = gst_collect_pads2_pop (pads, best);
1551   ret = func (pads, best, buffer, buffer_user_data);
1552
1553   /* maybe non-waiting was forced to waiting above due to
1554    * newsegment events coming too sparsely,
1555    * so re-check to restore state to avoid hanging/waiting */
1556   gst_collect_pads2_recalculate_full (pads);
1557
1558 done:
1559   return ret;
1560 }
1561
1562 /*
1563  * Default timestamp compare function.
1564  */
1565 static gint
1566 gst_collect_pads2_default_compare_func (GstCollectPads2 * pads,
1567     GstCollectData2 * data1, GstClockTime timestamp1,
1568     GstCollectData2 * data2, GstClockTime timestamp2, gpointer user_data)
1569 {
1570
1571   GST_LOG_OBJECT (pads, "comparing %" GST_TIME_FORMAT
1572       " and %" GST_TIME_FORMAT, GST_TIME_ARGS (timestamp1),
1573       GST_TIME_ARGS (timestamp2));
1574   /* non-valid timestamps go first as they are probably headers or so */
1575   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (timestamp1)))
1576     return GST_CLOCK_TIME_IS_VALID (timestamp2) ? -1 : 0;
1577
1578   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (timestamp2)))
1579     return 1;
1580
1581   /* compare timestamp */
1582   if (timestamp1 < timestamp2)
1583     return -1;
1584
1585   if (timestamp1 > timestamp2)
1586     return 1;
1587
1588   return 0;
1589 }
1590
1591 static gboolean
1592 gst_collect_pads2_event (GstPad * pad, GstObject * parent, GstEvent * event)
1593 {
1594   gboolean res = FALSE, need_unlock = FALSE;
1595   GstCollectData2 *data;
1596   GstCollectPads2 *pads;
1597   GstCollectPads2EventFunction event_func;
1598   GstCollectPads2BufferFunction buffer_func;
1599   gpointer event_user_data;
1600
1601   /* some magic to get the managing collect_pads2 */
1602   GST_OBJECT_LOCK (pad);
1603   data = (GstCollectData2 *) gst_pad_get_element_private (pad);
1604   if (G_UNLIKELY (data == NULL))
1605     goto pad_removed;
1606   ref_data (data);
1607   GST_OBJECT_UNLOCK (pad);
1608
1609   res = FALSE;
1610
1611   pads = data->collect;
1612
1613   GST_DEBUG ("Got %s event on pad %s:%s", GST_EVENT_TYPE_NAME (event),
1614       GST_DEBUG_PAD_NAME (data->pad));
1615
1616   GST_OBJECT_LOCK (pads);
1617   event_func = pads->event_func;
1618   event_user_data = pads->event_user_data;
1619   buffer_func = pads->buffer_func;
1620   GST_OBJECT_UNLOCK (pads);
1621
1622   switch (GST_EVENT_TYPE (event)) {
1623     case GST_EVENT_FLUSH_START:
1624     {
1625       /* forward event to unblock check_collected */
1626       if (event_func)
1627         res = event_func (pads, data, event, event_user_data);
1628       if (!res)
1629         res = gst_pad_event_default (pad, parent, event);
1630
1631       /* now unblock the chain function.
1632        * no cond per pad, so they all unblock, 
1633        * non-flushing block again */
1634       GST_COLLECT_PADS2_STREAM_LOCK (pads);
1635       GST_COLLECT_PADS2_STATE_SET (data, GST_COLLECT_PADS2_STATE_FLUSHING);
1636       gst_collect_pads2_clear (pads, data);
1637
1638       /* cater for possible default muxing functionality */
1639       if (buffer_func) {
1640         /* restore to initial state */
1641         gst_collect_pads2_set_waiting (pads, data, TRUE);
1642         /* if the current pad is affected, reset state, recalculate later */
1643         if (pads->earliest_data == data) {
1644           unref_data (data);
1645           pads->earliest_data = NULL;
1646           pads->earliest_time = GST_CLOCK_TIME_NONE;
1647         }
1648       }
1649
1650       GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
1651
1652       /* event already cleaned up by forwarding */
1653       res = TRUE;
1654       goto done;
1655     }
1656     case GST_EVENT_FLUSH_STOP:
1657     {
1658       /* flush the 1 buffer queue */
1659       GST_COLLECT_PADS2_STREAM_LOCK (pads);
1660       GST_COLLECT_PADS2_STATE_UNSET (data, GST_COLLECT_PADS2_STATE_FLUSHING);
1661       gst_collect_pads2_clear (pads, data);
1662       /* we need new segment info after the flush */
1663       gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
1664       GST_COLLECT_PADS2_STATE_UNSET (data, GST_COLLECT_PADS2_STATE_NEW_SEGMENT);
1665       /* if the pad was EOS, remove the EOS flag and
1666        * decrement the number of eospads */
1667       if (G_UNLIKELY (GST_COLLECT_PADS2_STATE_IS_SET (data,
1668                   GST_COLLECT_PADS2_STATE_EOS))) {
1669         if (!GST_COLLECT_PADS2_STATE_IS_SET (data,
1670                 GST_COLLECT_PADS2_STATE_WAITING))
1671           pads->queuedpads++;
1672         pads->eospads--;
1673         GST_COLLECT_PADS2_STATE_UNSET (data, GST_COLLECT_PADS2_STATE_EOS);
1674       }
1675       GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
1676
1677       /* forward event */
1678       goto forward_or_default;
1679     }
1680     case GST_EVENT_EOS:
1681     {
1682       GST_COLLECT_PADS2_STREAM_LOCK (pads);
1683       /* if the pad was not EOS, make it EOS and so we
1684        * have one more eospad */
1685       if (G_LIKELY (!GST_COLLECT_PADS2_STATE_IS_SET (data,
1686                   GST_COLLECT_PADS2_STATE_EOS))) {
1687         GST_COLLECT_PADS2_STATE_SET (data, GST_COLLECT_PADS2_STATE_EOS);
1688         if (!GST_COLLECT_PADS2_STATE_IS_SET (data,
1689                 GST_COLLECT_PADS2_STATE_WAITING))
1690           pads->queuedpads--;
1691         pads->eospads++;
1692       }
1693       /* check if we need collecting anything, we ignore the result. */
1694       gst_collect_pads2_check_collected (pads);
1695       GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
1696
1697       goto forward_or_eat;
1698     }
1699     case GST_EVENT_SEGMENT:
1700     {
1701       GstSegment seg;
1702       gint cmp_res;
1703
1704       GST_COLLECT_PADS2_STREAM_LOCK (pads);
1705
1706       gst_event_copy_segment (event, &seg);
1707
1708       GST_DEBUG_OBJECT (data->pad, "got segment %" GST_SEGMENT_FORMAT, &seg);
1709
1710       data->segment = seg;
1711       GST_COLLECT_PADS2_STATE_SET (data, GST_COLLECT_PADS2_STATE_NEW_SEGMENT);
1712
1713       /* default muxing functionality */
1714       if (!buffer_func)
1715         goto newsegment_done;
1716
1717       /* default collection can not handle other segment formats than time */
1718       if (seg.format != GST_FORMAT_TIME) {
1719         GST_ERROR_OBJECT (pads, "GstCollectPads2 default collecting "
1720             "can only handle time segments.");
1721         goto newsegment_done;
1722       }
1723
1724       /* If oldest time is not known, or current pad got newsegment;
1725        * recalculate the state */
1726       if (!pads->earliest_data || pads->earliest_data == data) {
1727         gst_collect_pads2_recalculate_full (pads);
1728         goto newsegment_done;
1729       }
1730
1731       /* Check if the waiting state of the pad should change. */
1732       cmp_res = pads->compare_func (pads, data, seg.start, pads->earliest_data,
1733           pads->earliest_time, pads->compare_user_data);
1734
1735       if (cmp_res > 0)
1736         /* Stop waiting */
1737         gst_collect_pads2_set_waiting (pads, data, FALSE);
1738
1739     newsegment_done:
1740       GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
1741       /* we must not forward this event since multiple segments will be
1742        * accumulated and this is certainly not what we want. */
1743       goto forward_or_eat;
1744     }
1745     default:
1746       /* forward other events */
1747       goto forward_or_default;
1748   }
1749
1750 forward_or_default:
1751   if (GST_EVENT_IS_SERIALIZED (event)) {
1752     GST_COLLECT_PADS2_STREAM_LOCK (pads);
1753     need_unlock = TRUE;
1754   }
1755   if (event_func)
1756     res = event_func (pads, data, event, event_user_data);
1757   if (!res)
1758     res = gst_pad_event_default (pad, parent, event);
1759   if (need_unlock)
1760     GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
1761   goto done;
1762
1763 forward_or_eat:
1764   if (GST_EVENT_IS_SERIALIZED (event)) {
1765     GST_COLLECT_PADS2_STREAM_LOCK (pads);
1766     need_unlock = TRUE;
1767   }
1768   if (event_func)
1769     res = event_func (pads, data, event, event_user_data);
1770   if (!res) {
1771     gst_event_unref (event);
1772     res = TRUE;
1773   }
1774   if (need_unlock)
1775     GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
1776   goto done;
1777
1778 done:
1779   unref_data (data);
1780   return res;
1781
1782   /* ERRORS */
1783 pad_removed:
1784   {
1785     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
1786     GST_OBJECT_UNLOCK (pad);
1787     return FALSE;
1788   }
1789 }
1790
1791 /* For each buffer we receive we check if our collected condition is reached
1792  * and if so we call the collected function. When this is done we check if
1793  * data has been unqueued. If data is still queued we wait holding the stream
1794  * lock to make sure no EOS event can happen while we are ready to be
1795  * collected 
1796  */
1797 static GstFlowReturn
1798 gst_collect_pads2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
1799 {
1800   GstCollectData2 *data;
1801   GstCollectPads2 *pads;
1802   GstFlowReturn ret;
1803   GstBuffer **buffer_p;
1804   guint32 cookie;
1805
1806   GST_DEBUG ("Got buffer for pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1807
1808   /* some magic to get the managing collect_pads2 */
1809   GST_OBJECT_LOCK (pad);
1810   data = (GstCollectData2 *) gst_pad_get_element_private (pad);
1811   if (G_UNLIKELY (data == NULL))
1812     goto no_data;
1813   ref_data (data);
1814   GST_OBJECT_UNLOCK (pad);
1815
1816   pads = data->collect;
1817
1818   GST_COLLECT_PADS2_STREAM_LOCK (pads);
1819   /* if not started, bail out */
1820   if (G_UNLIKELY (!pads->started))
1821     goto not_started;
1822   /* check if this pad is flushing */
1823   if (G_UNLIKELY (GST_COLLECT_PADS2_STATE_IS_SET (data,
1824               GST_COLLECT_PADS2_STATE_FLUSHING)))
1825     goto flushing;
1826   /* pad was EOS, we can refuse this data */
1827   if (G_UNLIKELY (GST_COLLECT_PADS2_STATE_IS_SET (data,
1828               GST_COLLECT_PADS2_STATE_EOS)))
1829     goto eos;
1830
1831   /* see if we need to clip */
1832   if (pads->clip_func) {
1833     GstBuffer *outbuf = NULL;
1834     ret = pads->clip_func (pads, data, buffer, &outbuf, pads->clip_user_data);
1835     buffer = outbuf;
1836
1837     if (G_UNLIKELY (outbuf == NULL))
1838       goto clipped;
1839
1840     if (G_UNLIKELY (ret == GST_FLOW_EOS))
1841       goto eos;
1842     else if (G_UNLIKELY (ret != GST_FLOW_OK))
1843       goto error;
1844   }
1845
1846   GST_DEBUG_OBJECT (pads, "Queuing buffer %p for pad %s:%s", buffer,
1847       GST_DEBUG_PAD_NAME (pad));
1848
1849   /* One more pad has data queued */
1850   if (GST_COLLECT_PADS2_STATE_IS_SET (data, GST_COLLECT_PADS2_STATE_WAITING))
1851     pads->queuedpads++;
1852   buffer_p = &data->buffer;
1853   gst_buffer_replace (buffer_p, buffer);
1854
1855   /* update segment last position if in TIME */
1856   if (G_LIKELY (data->segment.format == GST_FORMAT_TIME)) {
1857     GstClockTime timestamp = GST_BUFFER_TIMESTAMP (buffer);
1858
1859     if (GST_CLOCK_TIME_IS_VALID (timestamp))
1860       data->segment.position = timestamp;
1861   }
1862
1863   /* While we have data queued on this pad try to collect stuff */
1864   do {
1865     /* Check if our collected condition is matched and call the collected
1866      * function if it is */
1867     ret = gst_collect_pads2_check_collected (pads);
1868     /* when an error occurs, we want to report this back to the caller ASAP
1869      * without having to block if the buffer was not popped */
1870     if (G_UNLIKELY (ret != GST_FLOW_OK))
1871       goto error;
1872
1873     /* data was consumed, we can exit and accept new data */
1874     if (data->buffer == NULL)
1875       break;
1876
1877     /* Having the _INIT here means we don't care about any broadcast up to here
1878      * (most of which occur with STREAM_LOCK held, so could not have happened
1879      * anyway).  We do care about e.g. a remove initiated broadcast as of this
1880      * point.  Putting it here also makes this thread ignores any evt it raised
1881      * itself (as is a usual WAIT semantic).
1882      */
1883     GST_COLLECT_PADS2_EVT_INIT (cookie);
1884
1885     /* pad could be removed and re-added */
1886     unref_data (data);
1887     GST_OBJECT_LOCK (pad);
1888     if (G_UNLIKELY ((data = gst_pad_get_element_private (pad)) == NULL))
1889       goto pad_removed;
1890     ref_data (data);
1891     GST_OBJECT_UNLOCK (pad);
1892
1893     GST_DEBUG_OBJECT (pads, "Pad %s:%s has a buffer queued, waiting",
1894         GST_DEBUG_PAD_NAME (pad));
1895
1896     /* wait to be collected, this must happen from another thread triggered
1897      * by the _chain function of another pad. We release the lock so we
1898      * can get stopped or flushed as well. We can however not get EOS
1899      * because we still hold the STREAM_LOCK.
1900      */
1901     GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
1902     GST_COLLECT_PADS2_EVT_WAIT (pads, cookie);
1903     GST_COLLECT_PADS2_STREAM_LOCK (pads);
1904
1905     GST_DEBUG_OBJECT (pads, "Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad));
1906
1907     /* after a signal, we could be stopped */
1908     if (G_UNLIKELY (!pads->started))
1909       goto not_started;
1910     /* check if this pad is flushing */
1911     if (G_UNLIKELY (GST_COLLECT_PADS2_STATE_IS_SET (data,
1912                 GST_COLLECT_PADS2_STATE_FLUSHING)))
1913       goto flushing;
1914   }
1915   while (data->buffer != NULL);
1916
1917 unlock_done:
1918   GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
1919   unref_data (data);
1920   if (buffer)
1921     gst_buffer_unref (buffer);
1922   return ret;
1923
1924 pad_removed:
1925   {
1926     GST_WARNING ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
1927     GST_OBJECT_UNLOCK (pad);
1928     ret = GST_FLOW_NOT_LINKED;
1929     goto unlock_done;
1930   }
1931   /* ERRORS */
1932 no_data:
1933   {
1934     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
1935     GST_OBJECT_UNLOCK (pad);
1936     gst_buffer_unref (buffer);
1937     return GST_FLOW_NOT_LINKED;
1938   }
1939 not_started:
1940   {
1941     GST_DEBUG ("not started");
1942     gst_collect_pads2_clear (pads, data);
1943     ret = GST_FLOW_WRONG_STATE;
1944     goto unlock_done;
1945   }
1946 flushing:
1947   {
1948     GST_DEBUG ("pad %s:%s is flushing", GST_DEBUG_PAD_NAME (pad));
1949     gst_collect_pads2_clear (pads, data);
1950     ret = GST_FLOW_WRONG_STATE;
1951     goto unlock_done;
1952   }
1953 eos:
1954   {
1955     /* we should not post an error for this, just inform upstream that
1956      * we don't expect anything anymore */
1957     GST_DEBUG ("pad %s:%s is eos", GST_DEBUG_PAD_NAME (pad));
1958     ret = GST_FLOW_EOS;
1959     goto unlock_done;
1960   }
1961 clipped:
1962   {
1963     GST_DEBUG ("clipped buffer on pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1964     ret = GST_FLOW_OK;
1965     goto unlock_done;
1966   }
1967 error:
1968   {
1969     /* we print the error, the element should post a reasonable error
1970      * message for fatal errors */
1971     GST_DEBUG ("collect failed, reason %d (%s)", ret, gst_flow_get_name (ret));
1972     gst_collect_pads2_clear (pads, data);
1973     goto unlock_done;
1974   }
1975 }