collectpads2: always recording incoming segment info if no buffer_func set
[platform/upstream/gstreamer.git] / libs / gst / base / gstcollectpads2.c
1 /* GStreamer
2  * Copyright (C) 2005 Wim Taymans <wim@fluendo.com>
3  * Copyright (C) 2008 Mark Nauwelaerts <mnauw@users.sourceforge.net>
4  * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
5  *
6  * gstcollectpads2.c:
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
21  * Boston, MA 02111-1307, USA.
22  */
23 /**
24  * SECTION:gstcollectpads2
25  * @short_description: manages a set of pads that operate in collect mode
26  * @see_also:
27  *
28  * Manages a set of pads that operate in collect mode. This means that control
29  * is given to the manager of this object when all pads have data.
30  * <itemizedlist>
31  *   <listitem><para>
32  *     Collectpads are created with gst_collect_pads2_new(). A callback should then
33  *     be installed with gst_collect_pads2_set_function ().
34  *   </para></listitem>
35  *   <listitem><para>
36  *     Pads are added to the collection with gst_collect_pads2_add_pad()/
37  *     gst_collect_pads2_remove_pad(). The pad
38  *     has to be a sinkpad. The chain and event functions of the pad are
39  *     overridden. The element_private of the pad is used to store
40  *     private information for the collectpads.
41  *   </para></listitem>
42  *   <listitem><para>
43  *     For each pad, data is queued in the _chain function or by
44  *     performing a pull_range.
45  *   </para></listitem>
46  *   <listitem><para>
47  *     When data is queued on all pads in waiting mode, the callback function is called.
48  *   </para></listitem>
49  *   <listitem><para>
50  *     Data can be dequeued from the pad with the gst_collect_pads2_pop() method.
51  *     One can peek at the data with the gst_collect_pads2_peek() function.
52  *     These functions will return NULL if the pad received an EOS event. When all
53  *     pads return NULL from a gst_collect_pads2_peek(), the element can emit an EOS
54  *     event itself.
55  *   </para></listitem>
56  *   <listitem><para>
57  *     Data can also be dequeued in byte units using the gst_collect_pads2_available(),
58  *     gst_collect_pads2_read() and gst_collect_pads2_flush() calls.
59  *   </para></listitem>
60  *   <listitem><para>
61  *     Elements should call gst_collect_pads2_start() and gst_collect_pads2_stop() in
62  *     their state change functions to start and stop the processing of the collectpads.
63  *     The gst_collect_pads2_stop() call should be called before calling the parent
64  *     element state change function in the PAUSED_TO_READY state change to ensure
65  *     no pad is blocked and the element can finish streaming.
66  *   </para></listitem>
67  *   <listitem><para>
68  *     gst_collect_pads2_collect() and gst_collect_pads2_collect_range() can be used by
69  *     elements that start a #GstTask to drive the collect_pads2. This feature is however
70  *     not yet implemented.
71  *   </para></listitem>
72  *   <listitem><para>
73  *     gst_collect_pads2_set_waiting() sets a pad to waiting or non-waiting mode.
74  *     CollectPads element is not waiting for data to be collected on non-waiting pads.
75  *     Thus these pads may but need not have data when the callback is called.
76  *     All pads are in waiting mode by default.
77  *   </para></listitem>
78  * </itemizedlist>
79  *
80  * Last reviewed on 2011-10-28 (0.10.36)
81  *
82  * Since: 0.10.36
83  */
84
85 #ifdef HAVE_CONFIG_H
86 #  include "config.h"
87 #endif
88
89 /* FIXME 0.11: suppress warnings for deprecated API such as GStaticRecMutex
90  * with newer GLib versions (>= 2.31.0) */
91 #define GLIB_DISABLE_DEPRECATION_WARNINGS
92 #include <gst/gst_private.h>
93
94 #include "gstcollectpads2.h"
95
96 #include "../../../gst/glib-compat-private.h"
97
98 GST_DEBUG_CATEGORY_STATIC (collect_pads2_debug);
99 #define GST_CAT_DEFAULT collect_pads2_debug
100
101 #define parent_class gst_collect_pads2_parent_class
102 G_DEFINE_TYPE (GstCollectPads2, gst_collect_pads2, GST_TYPE_OBJECT);
103
104 struct _GstCollectData2Private
105 {
106   /* refcounting for struct, and destroy callback */
107   GstCollectData2DestroyNotify destroy_notify;
108   gint refcount;
109 };
110
111 struct _GstCollectPads2Private
112 {
113   /* with LOCK and/or STREAM_LOCK */
114   gboolean started;
115
116   /* with STREAM_LOCK */
117   guint32 cookie;               /* @data list cookie */
118   guint numpads;                /* number of pads in @data */
119   guint queuedpads;             /* number of pads with a buffer */
120   guint eospads;                /* number of pads that are EOS */
121   GstClockTime earliest_time;   /* Current earliest time */
122   GstCollectData2 *earliest_data;       /* Pad data for current earliest time */
123
124   /* with LOCK */
125   GSList *pad_list;             /* updated pad list */
126   guint32 pad_cookie;           /* updated cookie */
127
128   GstCollectPads2Function func; /* function and user_data for callback */
129   gpointer user_data;
130   GstCollectPads2BufferFunction buffer_func;    /* function and user_data for buffer callback */
131   gpointer buffer_user_data;
132   GstCollectPads2CompareFunction compare_func;
133   gpointer compare_user_data;
134   GstCollectPads2EventFunction event_func;      /* function and data for event callback */
135   gpointer event_user_data;
136   GstCollectPads2ClipFunction clip_func;
137   gpointer clip_user_data;
138
139   /* no other lock needed */
140   GMutex *evt_lock;             /* these make up sort of poor man's event signaling */
141   GCond *evt_cond;
142   guint32 evt_cookie;
143 };
144
145 static void gst_collect_pads2_clear (GstCollectPads2 * pads,
146     GstCollectData2 * data);
147 static GstFlowReturn gst_collect_pads2_chain (GstPad * pad, GstBuffer * buffer);
148 static gboolean gst_collect_pads2_event (GstPad * pad, GstEvent * event);
149 static void gst_collect_pads2_finalize (GObject * object);
150 static GstFlowReturn gst_collect_pads2_default_collected (GstCollectPads2 *
151     pads, gpointer user_data);
152 static gint gst_collect_pads2_default_compare_func (GstCollectPads2 * pads,
153     GstCollectData2 * data1, GstClockTime timestamp1, GstCollectData2 * data2,
154     GstClockTime timestamp2, gpointer user_data);
155 static gboolean gst_collect_pads2_recalculate_full (GstCollectPads2 * pads);
156 static void ref_data (GstCollectData2 * data);
157 static void unref_data (GstCollectData2 * data);
158
159 /* Some properties are protected by LOCK, others by STREAM_LOCK
160  * However, manipulating either of these partitions may require
161  * to signal/wake a _WAIT, so use a separate (sort of) event to prevent races
162  * Alternative implementations are possible, e.g. some low-level re-implementing
163  * of the 2 above locks to drop both of them atomically when going into _WAIT.
164  */
165 #define GST_COLLECT_PADS2_GET_EVT_COND(pads) (((GstCollectPads2 *)pads)->priv->evt_cond)
166 #define GST_COLLECT_PADS2_GET_EVT_LOCK(pads) (((GstCollectPads2 *)pads)->priv->evt_lock)
167 #define GST_COLLECT_PADS2_EVT_WAIT(pads, cookie) G_STMT_START {    \
168   g_mutex_lock (GST_COLLECT_PADS2_GET_EVT_LOCK (pads));            \
169   /* should work unless a lot of event'ing and thread starvation */\
170   while (cookie == ((GstCollectPads2 *) pads)->priv->evt_cookie)         \
171     g_cond_wait (GST_COLLECT_PADS2_GET_EVT_COND (pads),            \
172         GST_COLLECT_PADS2_GET_EVT_LOCK (pads));                    \
173   cookie = ((GstCollectPads2 *) pads)->priv->evt_cookie;                 \
174   g_mutex_unlock (GST_COLLECT_PADS2_GET_EVT_LOCK (pads));          \
175 } G_STMT_END
176 #define GST_COLLECT_PADS2_EVT_WAIT_TIMED(pads, cookie, timeout) G_STMT_START { \
177   GTimeVal __tv; \
178   \
179   g_get_current_time (&tv); \
180   g_time_val_add (&tv, timeout); \
181   \
182   g_mutex_lock (GST_COLLECT_PADS2_GET_EVT_LOCK (pads));            \
183   /* should work unless a lot of event'ing and thread starvation */\
184   while (cookie == ((GstCollectPads2 *) pads)->priv->evt_cookie)         \
185     g_cond_timed_wait (GST_COLLECT_PADS2_GET_EVT_COND (pads),            \
186         GST_COLLECT_PADS2_GET_EVT_LOCK (pads), &tv);                    \
187   cookie = ((GstCollectPads2 *) pads)->priv->evt_cookie;                 \
188   g_mutex_unlock (GST_COLLECT_PADS2_GET_EVT_LOCK (pads));          \
189 } G_STMT_END
190 #define GST_COLLECT_PADS2_EVT_BROADCAST(pads) G_STMT_START {       \
191   g_mutex_lock (GST_COLLECT_PADS2_GET_EVT_LOCK (pads));            \
192   /* never mind wrap-around */                                     \
193   ++(((GstCollectPads2 *) pads)->priv->evt_cookie);                      \
194   g_cond_broadcast (GST_COLLECT_PADS2_GET_EVT_COND (pads));        \
195   g_mutex_unlock (GST_COLLECT_PADS2_GET_EVT_LOCK (pads));          \
196 } G_STMT_END
197 #define GST_COLLECT_PADS2_EVT_INIT(cookie) G_STMT_START {          \
198   g_mutex_lock (GST_COLLECT_PADS2_GET_EVT_LOCK (pads));            \
199   cookie = ((GstCollectPads2 *) pads)->priv->evt_cookie;                 \
200   g_mutex_unlock (GST_COLLECT_PADS2_GET_EVT_LOCK (pads));          \
201 } G_STMT_END
202
203 static void
204 gst_collect_pads2_class_init (GstCollectPads2Class * klass)
205 {
206   GObjectClass *gobject_class = (GObjectClass *) klass;
207
208   g_type_class_add_private (klass, sizeof (GstCollectPads2Private));
209
210   GST_DEBUG_CATEGORY_INIT (collect_pads2_debug, "collectpads2", 0,
211       "GstCollectPads2");
212
213   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_collect_pads2_finalize);
214 }
215
216 static void
217 gst_collect_pads2_init (GstCollectPads2 * pads)
218 {
219   pads->priv =
220       G_TYPE_INSTANCE_GET_PRIVATE (pads, GST_TYPE_COLLECT_PADS2,
221       GstCollectPads2Private);
222
223   pads->data = NULL;
224   pads->priv->cookie = 0;
225   pads->priv->numpads = 0;
226   pads->priv->queuedpads = 0;
227   pads->priv->eospads = 0;
228   pads->priv->started = FALSE;
229
230   g_static_rec_mutex_init (&pads->stream_lock);
231
232   pads->priv->func = gst_collect_pads2_default_collected;
233   pads->priv->user_data = NULL;
234   pads->priv->event_func = NULL;
235   pads->priv->event_user_data = NULL;
236
237   /* members for default muxing */
238   pads->priv->buffer_func = NULL;
239   pads->priv->buffer_user_data = NULL;
240   pads->priv->compare_func = gst_collect_pads2_default_compare_func;
241   pads->priv->compare_user_data = NULL;
242   pads->priv->earliest_data = NULL;
243   pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
244
245   /* members to manage the pad list */
246   pads->priv->pad_cookie = 0;
247   pads->priv->pad_list = NULL;
248
249   /* members for event */
250   pads->priv->evt_lock = g_mutex_new ();
251   pads->priv->evt_cond = g_cond_new ();
252   pads->priv->evt_cookie = 0;
253 }
254
255 static void
256 gst_collect_pads2_finalize (GObject * object)
257 {
258   GstCollectPads2 *pads = GST_COLLECT_PADS2 (object);
259
260   GST_DEBUG_OBJECT (object, "finalize");
261
262   g_static_rec_mutex_free (&pads->stream_lock);
263
264   g_cond_free (pads->priv->evt_cond);
265   g_mutex_free (pads->priv->evt_lock);
266
267   /* Remove pads and free pads list */
268   g_slist_foreach (pads->priv->pad_list, (GFunc) unref_data, NULL);
269   g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
270   g_slist_free (pads->data);
271   g_slist_free (pads->priv->pad_list);
272
273   G_OBJECT_CLASS (parent_class)->finalize (object);
274 }
275
276 /**
277  * gst_collect_pads2_new:
278  *
279  * Create a new instance of #GstCollectsPads.
280  *
281  * MT safe.
282  *
283  * Returns: (transfer full): a new #GstCollectPads2, or NULL in case of an error.
284  *
285  * Since: 0.10.36
286  */
287 GstCollectPads2 *
288 gst_collect_pads2_new (void)
289 {
290   GstCollectPads2 *newcoll;
291
292   newcoll = g_object_new (GST_TYPE_COLLECT_PADS2, NULL);
293
294   return newcoll;
295 }
296
297 /* Must be called with GstObject lock! */
298 static void
299 gst_collect_pads2_set_buffer_function_locked (GstCollectPads2 * pads,
300     GstCollectPads2BufferFunction func, gpointer user_data)
301 {
302   pads->priv->buffer_func = func;
303   pads->priv->buffer_user_data = user_data;
304 }
305
306 /**
307  * gst_collect_pads2_set_buffer_function:
308  * @pads: the collectpads to use
309  * @func: the function to set
310  * @user_data: (closure): user data passed to the function
311  *
312  * Set the callback function and user data that will be called with
313  * the oldest buffer when all pads have been collected.
314  *
315  * MT safe.
316  *
317  * Since: 0.10.36
318  */
319 void
320 gst_collect_pads2_set_buffer_function (GstCollectPads2 * pads,
321     GstCollectPads2BufferFunction func, gpointer user_data)
322 {
323   g_return_if_fail (pads != NULL);
324   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
325
326   GST_OBJECT_LOCK (pads);
327   gst_collect_pads2_set_buffer_function_locked (pads, func, user_data);
328   GST_OBJECT_UNLOCK (pads);
329 }
330
331 /**
332  * gst_collect_pads2_set_compare_function:
333  * @pads: the pads to use
334  * @func: the function to set
335  * @user_data: (closure): user data passed to the function
336  *
337  * Set the timestamp comparison function.
338  *
339  * MT safe.
340  *
341  * Since: 0.10.36
342  */
343 /* NOTE allowing to change comparison seems not advisable;
344 no known use-case, and collaboration with default algorithm is unpredictable.
345 If custom compairing/operation is needed, just use a collect function of
346 your own */
347 void
348 gst_collect_pads2_set_compare_function (GstCollectPads2 * pads,
349     GstCollectPads2CompareFunction func, gpointer user_data)
350 {
351   g_return_if_fail (pads != NULL);
352   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
353
354   GST_OBJECT_LOCK (pads);
355   pads->priv->compare_func = func;
356   pads->priv->compare_user_data = user_data;
357   GST_OBJECT_UNLOCK (pads);
358 }
359
360 /**
361  * gst_collect_pads2_set_function:
362  * @pads: the collectspads to use
363  * @func: the function to set
364  * @user_data: user data passed to the function
365  *
366  * CollectPads provides a default collection algorithm that will determine
367  * the oldest buffer available on all of its pads, and then delegate
368  * to a configured callback.
369  * However, if circumstances are more complicated and/or more control
370  * is desired, this sets a callback that will be invoked instead when
371  * all the pads added to the collection have buffers queued.
372  * Evidently, this callback is not compatible with
373  * gst_collect_pads2_set_buffer_function() callback.
374  * If this callback is set, the former will be unset.
375  *
376  * MT safe.
377  *
378  * Since: 0.10.36
379  */
380 void
381 gst_collect_pads2_set_function (GstCollectPads2 * pads,
382     GstCollectPads2Function func, gpointer user_data)
383 {
384   g_return_if_fail (pads != NULL);
385   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
386
387   GST_OBJECT_LOCK (pads);
388   pads->priv->func = func;
389   pads->priv->user_data = user_data;
390   gst_collect_pads2_set_buffer_function_locked (pads, NULL, NULL);
391   GST_OBJECT_UNLOCK (pads);
392 }
393
394 static void
395 ref_data (GstCollectData2 * data)
396 {
397   g_assert (data != NULL);
398
399   g_atomic_int_inc (&(data->priv->refcount));
400 }
401
402 static void
403 unref_data (GstCollectData2 * data)
404 {
405   g_assert (data != NULL);
406   g_assert (data->priv->refcount > 0);
407
408   if (!g_atomic_int_dec_and_test (&(data->priv->refcount)))
409     return;
410
411   if (data->priv->destroy_notify)
412     data->priv->destroy_notify (data);
413
414   g_object_unref (data->pad);
415   if (data->buffer) {
416     gst_buffer_unref (data->buffer);
417   }
418   g_free (data->priv);
419   g_free (data);
420 }
421
422 /**
423  * gst_collect_pads2_set_event_function:
424  * @pads: the collectspads to use
425  * @func: the function to set
426  * @user_data: user data passed to the function
427  *
428  * Set the event callback function and user data that will be called after
429  * collectpads has processed and event originating from one of the collected
430  * pads.  If the event being processed is a serialized one, this callback is
431  * called with @pads STREAM_LOCK held, otherwise not.  As this lock should be
432  * held when calling a number of CollectPads functions, it should be acquired
433  * if so (unusually) needed.
434  *
435  * MT safe.
436  *
437  * Since: 0.10.36
438  */
439 void
440 gst_collect_pads2_set_event_function (GstCollectPads2 * pads,
441     GstCollectPads2EventFunction func, gpointer user_data)
442 {
443   g_return_if_fail (pads != NULL);
444   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
445
446   GST_OBJECT_LOCK (pads);
447   pads->priv->event_func = func;
448   pads->priv->event_user_data = user_data;
449   GST_OBJECT_UNLOCK (pads);
450 }
451
452 /**
453 * gst_collect_pads2_clip_running_time:
454 * @pads: the collectspads to use
455 * @cdata: collect data of corresponding pad
456 * @buf: buffer being clipped
457 * @outbuf: output buffer with running time, or NULL if clipped
458 * @user_data: user data (unused)
459 *
460 * Convenience clipping function that converts incoming buffer's timestamp
461 * to running time, or clips the buffer if outside configured segment.
462 *
463 * Since: 0.10.37
464 */
465 GstFlowReturn
466 gst_collect_pads2_clip_running_time (GstCollectPads2 * pads,
467     GstCollectData2 * cdata, GstBuffer * buf, GstBuffer ** outbuf,
468     gpointer user_data)
469 {
470   GstClockTime time;
471
472   *outbuf = buf;
473   time = GST_BUFFER_TIMESTAMP (buf);
474
475   /* invalid left alone and passed */
476   if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) {
477     time = gst_segment_to_running_time (&cdata->segment, GST_FORMAT_TIME, time);
478     if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) {
479       GST_DEBUG_OBJECT (cdata->pad, "clipping buffer on pad outside segment");
480       gst_buffer_unref (buf);
481       *outbuf = NULL;
482     } else {
483       GST_LOG_OBJECT (cdata->pad, "buffer ts %" GST_TIME_FORMAT " -> %"
484           GST_TIME_FORMAT " running time",
485           GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)), GST_TIME_ARGS (time));
486       *outbuf = gst_buffer_make_metadata_writable (buf);
487       GST_BUFFER_TIMESTAMP (*outbuf) = time;
488     }
489   }
490
491   return GST_FLOW_OK;
492 }
493
494  /**
495  * gst_collect_pads2_set_clip_function:
496  * @pads: the collectspads to use
497  * @clipfunc: clip function to install
498  * @user_data: user data to pass to @clip_func
499  *
500  * Install a clipping function that is called right after a buffer is received
501  * on a pad managed by @pads. See #GstCollectPad2ClipFunction for more info.
502  *
503  * Since: 0.10.36
504  */
505 void
506 gst_collect_pads2_set_clip_function (GstCollectPads2 * pads,
507     GstCollectPads2ClipFunction clipfunc, gpointer user_data)
508 {
509   g_return_if_fail (pads != NULL);
510   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
511
512   pads->priv->clip_func = clipfunc;
513   pads->priv->clip_user_data = user_data;
514 }
515
516 /**
517  * gst_collect_pads2_add_pad:
518  * @pads: the collectspads to use
519  * @pad: (transfer none): the pad to add
520  * @size: the size of the returned #GstCollectData2 structure
521  *
522  * Add a pad to the collection of collect pads. The pad has to be
523  * a sinkpad. The refcount of the pad is incremented. Use
524  * gst_collect_pads2_remove_pad() to remove the pad from the collection
525  * again.
526  *
527  * You specify a size for the returned #GstCollectData2 structure
528  * so that you can use it to store additional information.
529  *
530  * The pad will be automatically activated in push mode when @pads is
531  * started.
532  *
533  * This function calls gst_collect_pads2_add_pad_full() passing a value of NULL
534  * for destroy_notify and TRUE for locked.
535  *
536  * MT safe.
537  *
538  * Returns: a new #GstCollectData2 to identify the new pad. Or NULL
539  *   if wrong parameters are supplied.
540  *
541  * Since: 0.10.36
542  */
543 GstCollectData2 *
544 gst_collect_pads2_add_pad (GstCollectPads2 * pads, GstPad * pad, guint size)
545 {
546   return gst_collect_pads2_add_pad_full (pads, pad, size, NULL, TRUE);
547 }
548
549 /**
550  * gst_collect_pads2_add_pad_full:
551  * @pads: the collectspads to use
552  * @pad: (transfer none): the pad to add
553  * @size: the size of the returned #GstCollectData2 structure
554  * @destroy_notify: function to be called before the returned #GstCollectData2
555  * structure is freed
556  * @lock: whether to lock this pad in usual waiting state
557  *
558  * Add a pad to the collection of collect pads. The pad has to be
559  * a sinkpad. The refcount of the pad is incremented. Use
560  * gst_collect_pads2_remove_pad() to remove the pad from the collection
561  * again.
562  *
563  * You specify a size for the returned #GstCollectData2 structure
564  * so that you can use it to store additional information.
565  *
566  * You can also specify a #GstCollectData2DestroyNotify that will be called
567  * just before the #GstCollectData2 structure is freed. It is passed the
568  * pointer to the structure and should free any custom memory and resources
569  * allocated for it.
570  *
571  * Keeping a pad locked in waiting state is only relevant when using
572  * the default collection algorithm (providing the oldest buffer).
573  * It ensures a buffer must be available on this pad for a collection
574  * to take place.  This is of typical use to a muxer element where
575  * non-subtitle streams should always be in waiting state,
576  * e.g. to assure that caps information is available on all these streams
577  * when initial headers have to be written.
578  *
579  * The pad will be automatically activated in push mode when @pads is
580  * started.
581  *
582  * MT safe.
583  *
584  * Since: 0.10.36
585  *
586  * Returns: a new #GstCollectData2 to identify the new pad. Or NULL
587  *   if wrong parameters are supplied.
588  */
589 GstCollectData2 *
590 gst_collect_pads2_add_pad_full (GstCollectPads2 * pads, GstPad * pad,
591     guint size, GstCollectData2DestroyNotify destroy_notify, gboolean lock)
592 {
593   GstCollectData2 *data;
594
595   g_return_val_if_fail (pads != NULL, NULL);
596   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), NULL);
597   g_return_val_if_fail (pad != NULL, NULL);
598   g_return_val_if_fail (GST_PAD_IS_SINK (pad), NULL);
599   g_return_val_if_fail (size >= sizeof (GstCollectData2), NULL);
600
601   GST_DEBUG_OBJECT (pads, "adding pad %s:%s", GST_DEBUG_PAD_NAME (pad));
602
603   data = g_malloc0 (size);
604   data->priv = g_new0 (GstCollectData2Private, 1);
605   data->collect = pads;
606   data->pad = gst_object_ref (pad);
607   data->buffer = NULL;
608   data->pos = 0;
609   gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
610   data->state = GST_COLLECT_PADS2_STATE_WAITING;
611   data->state |= lock ? GST_COLLECT_PADS2_STATE_LOCKED : 0;
612   data->priv->refcount = 1;
613   data->priv->destroy_notify = destroy_notify;
614
615   GST_OBJECT_LOCK (pads);
616   GST_OBJECT_LOCK (pad);
617   gst_pad_set_element_private (pad, data);
618   GST_OBJECT_UNLOCK (pad);
619   pads->priv->pad_list = g_slist_append (pads->priv->pad_list, data);
620   gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads2_chain));
621   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads2_event));
622   /* backward compat, also add to data if stopped, so that the element already
623    * has this in the public data list before going PAUSED (typically)
624    * this can only be done when we are stopped because we don't take the
625    * STREAM_LOCK to protect the pads->data list. */
626   if (!pads->priv->started) {
627     pads->data = g_slist_append (pads->data, data);
628     ref_data (data);
629   }
630   /* activate the pad when needed */
631   if (pads->priv->started)
632     gst_pad_set_active (pad, TRUE);
633   pads->priv->pad_cookie++;
634   GST_OBJECT_UNLOCK (pads);
635
636   return data;
637 }
638
639 static gint
640 find_pad (GstCollectData2 * data, GstPad * pad)
641 {
642   if (data->pad == pad)
643     return 0;
644   return 1;
645 }
646
647 /**
648  * gst_collect_pads2_remove_pad:
649  * @pads: the collectspads to use
650  * @pad: (transfer none): the pad to remove
651  *
652  * Remove a pad from the collection of collect pads. This function will also
653  * free the #GstCollectData2 and all the resources that were allocated with
654  * gst_collect_pads2_add_pad().
655  *
656  * The pad will be deactivated automatically when @pads is stopped.
657  *
658  * MT safe.
659  *
660  * Returns: %TRUE if the pad could be removed.
661  *
662  * Since: 0.10.36
663  */
664 gboolean
665 gst_collect_pads2_remove_pad (GstCollectPads2 * pads, GstPad * pad)
666 {
667   GstCollectData2 *data;
668   GSList *list;
669
670   g_return_val_if_fail (pads != NULL, FALSE);
671   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), FALSE);
672   g_return_val_if_fail (pad != NULL, FALSE);
673   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
674
675   GST_DEBUG_OBJECT (pads, "removing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
676
677   GST_OBJECT_LOCK (pads);
678   list =
679       g_slist_find_custom (pads->priv->pad_list, pad, (GCompareFunc) find_pad);
680   if (!list)
681     goto unknown_pad;
682
683   data = (GstCollectData2 *) list->data;
684
685   GST_DEBUG_OBJECT (pads, "found pad %s:%s at %p", GST_DEBUG_PAD_NAME (pad),
686       data);
687
688   /* clear the stuff we configured */
689   gst_pad_set_chain_function (pad, NULL);
690   gst_pad_set_event_function (pad, NULL);
691   GST_OBJECT_LOCK (pad);
692   gst_pad_set_element_private (pad, NULL);
693   GST_OBJECT_UNLOCK (pad);
694
695   /* backward compat, also remove from data if stopped, note that this function
696    * can only be called when we are stopped because we don't take the
697    * STREAM_LOCK to protect the pads->data list. */
698   if (!pads->priv->started) {
699     GSList *dlist;
700
701     dlist = g_slist_find_custom (pads->data, pad, (GCompareFunc) find_pad);
702     if (dlist) {
703       GstCollectData2 *pdata = dlist->data;
704
705       pads->data = g_slist_delete_link (pads->data, dlist);
706       unref_data (pdata);
707     }
708   }
709   /* remove from the pad list */
710   pads->priv->pad_list = g_slist_delete_link (pads->priv->pad_list, list);
711   pads->priv->pad_cookie++;
712
713   /* signal waiters because something changed */
714   GST_COLLECT_PADS2_EVT_BROADCAST (pads);
715
716   /* deactivate the pad when needed */
717   if (!pads->priv->started)
718     gst_pad_set_active (pad, FALSE);
719
720   /* clean and free the collect data */
721   unref_data (data);
722
723   GST_OBJECT_UNLOCK (pads);
724
725   return TRUE;
726
727 unknown_pad:
728   {
729     GST_WARNING_OBJECT (pads, "cannot remove unknown pad %s:%s",
730         GST_DEBUG_PAD_NAME (pad));
731     GST_OBJECT_UNLOCK (pads);
732     return FALSE;
733   }
734 }
735
736 /**
737  * gst_collect_pads2_is_active:
738  * @pads: the collectspads to use
739  * @pad: the pad to check
740  *
741  * Check if a pad is active.
742  *
743  * This function is currently not implemented.
744  *
745  * MT safe.
746  *
747  * Returns: %TRUE if the pad is active.
748  *
749  * Since: 0.10.36
750  */
751 gboolean
752 gst_collect_pads2_is_active (GstCollectPads2 * pads, GstPad * pad)
753 {
754   g_return_val_if_fail (pads != NULL, FALSE);
755   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), FALSE);
756   g_return_val_if_fail (pad != NULL, FALSE);
757   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
758
759   g_warning ("gst_collect_pads2_is_active() is not implemented");
760
761   return FALSE;
762 }
763
764 /**
765  * gst_collect_pads2_collect:
766  * @pads: the collectspads to use
767  *
768  * Collect data on all pads. This function is usually called
769  * from a #GstTask function in an element. 
770  *
771  * This function is currently not implemented.
772  *
773  * MT safe.
774  *
775  * Returns: #GstFlowReturn of the operation.
776  *
777  * Since: 0.10.36
778  */
779 GstFlowReturn
780 gst_collect_pads2_collect (GstCollectPads2 * pads)
781 {
782   g_return_val_if_fail (pads != NULL, GST_FLOW_ERROR);
783   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), GST_FLOW_ERROR);
784
785   g_warning ("gst_collect_pads2_collect() is not implemented");
786
787   return GST_FLOW_NOT_SUPPORTED;
788 }
789
790 /**
791  * gst_collect_pads2_collect_range:
792  * @pads: the collectspads to use
793  * @offset: the offset to collect
794  * @length: the length to collect
795  *
796  * Collect data with @offset and @length on all pads. This function
797  * is typically called in the getrange function of an element. 
798  *
799  * This function is currently not implemented.
800  *
801  * MT safe.
802  *
803  * Returns: #GstFlowReturn of the operation.
804  *
805  * Since: 0.10.36
806  */
807 GstFlowReturn
808 gst_collect_pads2_collect_range (GstCollectPads2 * pads, guint64 offset,
809     guint length)
810 {
811   g_return_val_if_fail (pads != NULL, GST_FLOW_ERROR);
812   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), GST_FLOW_ERROR);
813
814   g_warning ("gst_collect_pads2_collect_range() is not implemented");
815
816   return GST_FLOW_NOT_SUPPORTED;
817 }
818
819 /*
820  * Must be called with STREAM_LOCK.
821  */
822 static void
823 gst_collect_pads2_set_flushing_unlocked (GstCollectPads2 * pads,
824     gboolean flushing)
825 {
826   GSList *walk = NULL;
827
828   /* Update the pads flushing flag */
829   for (walk = pads->data; walk; walk = g_slist_next (walk)) {
830     GstCollectData2 *cdata = walk->data;
831
832     if (GST_IS_PAD (cdata->pad)) {
833       GST_OBJECT_LOCK (cdata->pad);
834       if (flushing)
835         GST_PAD_SET_FLUSHING (cdata->pad);
836       else
837         GST_PAD_UNSET_FLUSHING (cdata->pad);
838       if (flushing)
839         GST_COLLECT_PADS2_STATE_SET (cdata, GST_COLLECT_PADS2_STATE_FLUSHING);
840       else
841         GST_COLLECT_PADS2_STATE_UNSET (cdata, GST_COLLECT_PADS2_STATE_FLUSHING);
842       gst_collect_pads2_clear (pads, cdata);
843       GST_OBJECT_UNLOCK (cdata->pad);
844     }
845   }
846
847   /* inform _chain of changes */
848   GST_COLLECT_PADS2_EVT_BROADCAST (pads);
849 }
850
851 /**
852  * gst_collect_pads2_set_flushing:
853  * @pads: the collectspads to use
854  * @flushing: desired state of the pads
855  *
856  * Change the flushing state of all the pads in the collection. No pad
857  * is able to accept anymore data when @flushing is %TRUE. Calling this
858  * function with @flushing %FALSE makes @pads accept data again.
859  * Caller must ensure that downstream streaming (thread) is not blocked,
860  * e.g. by sending a FLUSH_START downstream.
861  *
862  * MT safe.
863  *
864  * Since: 0.10.36
865  */
866 void
867 gst_collect_pads2_set_flushing (GstCollectPads2 * pads, gboolean flushing)
868 {
869   g_return_if_fail (pads != NULL);
870   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
871
872   /* NOTE since this eventually calls _pop, some (STREAM_)LOCK is needed here */
873   GST_COLLECT_PADS2_STREAM_LOCK (pads);
874   gst_collect_pads2_set_flushing_unlocked (pads, flushing);
875   GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
876 }
877
878 /**
879  * gst_collect_pads2_start:
880  * @pads: the collectspads to use
881  *
882  * Starts the processing of data in the collect_pads2.
883  *
884  * MT safe.
885  *
886  * Since: 0.10.36
887  */
888 void
889 gst_collect_pads2_start (GstCollectPads2 * pads)
890 {
891   GSList *collected;
892
893   g_return_if_fail (pads != NULL);
894   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
895
896   GST_DEBUG_OBJECT (pads, "starting collect pads");
897
898   /* make sure stop and collect cannot be called anymore */
899   GST_COLLECT_PADS2_STREAM_LOCK (pads);
900
901   /* make pads streamable */
902   GST_OBJECT_LOCK (pads);
903
904   /* loop over the master pad list and reset the segment */
905   collected = pads->priv->pad_list;
906   for (; collected; collected = g_slist_next (collected)) {
907     GstCollectData2 *data;
908
909     data = collected->data;
910     gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
911   }
912
913   gst_collect_pads2_set_flushing_unlocked (pads, FALSE);
914
915   /* Start collect pads */
916   pads->priv->started = TRUE;
917   GST_OBJECT_UNLOCK (pads);
918   GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
919 }
920
921 /**
922  * gst_collect_pads2_stop:
923  * @pads: the collectspads to use
924  *
925  * Stops the processing of data in the collect_pads2. this function
926  * will also unblock any blocking operations.
927  *
928  * MT safe.
929  *
930  * Since: 0.10.36
931  */
932 void
933 gst_collect_pads2_stop (GstCollectPads2 * pads)
934 {
935   GSList *collected;
936
937   g_return_if_fail (pads != NULL);
938   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
939
940   GST_DEBUG_OBJECT (pads, "stopping collect pads");
941
942   /* make sure collect and start cannot be called anymore */
943   GST_COLLECT_PADS2_STREAM_LOCK (pads);
944
945   /* make pads not accept data anymore */
946   GST_OBJECT_LOCK (pads);
947   gst_collect_pads2_set_flushing_unlocked (pads, TRUE);
948
949   /* Stop collect pads */
950   pads->priv->started = FALSE;
951   pads->priv->eospads = 0;
952   pads->priv->queuedpads = 0;
953
954   /* loop over the master pad list and flush buffers */
955   collected = pads->priv->pad_list;
956   for (; collected; collected = g_slist_next (collected)) {
957     GstCollectData2 *data;
958     GstBuffer **buffer_p;
959
960     data = collected->data;
961     if (data->buffer) {
962       buffer_p = &data->buffer;
963       gst_buffer_replace (buffer_p, NULL);
964       data->pos = 0;
965     }
966     GST_COLLECT_PADS2_STATE_UNSET (data, GST_COLLECT_PADS2_STATE_EOS);
967   }
968
969   if (pads->priv->earliest_data)
970     unref_data (pads->priv->earliest_data);
971   pads->priv->earliest_data = NULL;
972   pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
973
974   GST_OBJECT_UNLOCK (pads);
975   /* Wake them up so they can end the chain functions. */
976   GST_COLLECT_PADS2_EVT_BROADCAST (pads);
977
978   GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
979 }
980
981 /**
982  * gst_collect_pads2_peek:
983  * @pads: the collectspads to peek
984  * @data: the data to use
985  *
986  * Peek at the buffer currently queued in @data. This function
987  * should be called with the @pads STREAM_LOCK held, such as in the callback
988  * handler.
989  *
990  * MT safe.
991  *
992  * Returns: The buffer in @data or NULL if no buffer is queued.
993  *  should unref the buffer after usage.
994  *
995  * Since: 0.10.36
996  */
997 GstBuffer *
998 gst_collect_pads2_peek (GstCollectPads2 * pads, GstCollectData2 * data)
999 {
1000   GstBuffer *result;
1001
1002   g_return_val_if_fail (pads != NULL, NULL);
1003   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), NULL);
1004   g_return_val_if_fail (data != NULL, NULL);
1005
1006   if ((result = data->buffer))
1007     gst_buffer_ref (result);
1008
1009   GST_DEBUG_OBJECT (pads, "Peeking at pad %s:%s: buffer=%p",
1010       GST_DEBUG_PAD_NAME (data->pad), result);
1011
1012   return result;
1013 }
1014
1015 /**
1016  * gst_collect_pads2_pop:
1017  * @pads: the collectspads to pop
1018  * @data: the data to use
1019  *
1020  * Pop the buffer currently queued in @data. This function
1021  * should be called with the @pads STREAM_LOCK held, such as in the callback
1022  * handler.
1023  *
1024  * MT safe.
1025  *
1026  * Returns: (transfer full): The buffer in @data or NULL if no buffer was
1027  *   queued. You should unref the buffer after usage.
1028  *
1029  * Since: 0.10.36
1030  */
1031 GstBuffer *
1032 gst_collect_pads2_pop (GstCollectPads2 * pads, GstCollectData2 * data)
1033 {
1034   GstBuffer *result;
1035
1036   g_return_val_if_fail (pads != NULL, NULL);
1037   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), NULL);
1038   g_return_val_if_fail (data != NULL, NULL);
1039
1040   if ((result = data->buffer)) {
1041     data->buffer = NULL;
1042     data->pos = 0;
1043     /* one less pad with queued data now */
1044     if (GST_COLLECT_PADS2_STATE_IS_SET (data, GST_COLLECT_PADS2_STATE_WAITING))
1045       pads->priv->queuedpads--;
1046   }
1047
1048   GST_COLLECT_PADS2_EVT_BROADCAST (pads);
1049
1050   GST_DEBUG_OBJECT (pads, "Pop buffer on pad %s:%s: buffer=%p",
1051       GST_DEBUG_PAD_NAME (data->pad), result);
1052
1053   return result;
1054 }
1055
1056 /* pop and unref the currently queued buffer, should be called with STREAM_LOCK
1057  * held */
1058 static void
1059 gst_collect_pads2_clear (GstCollectPads2 * pads, GstCollectData2 * data)
1060 {
1061   GstBuffer *buf;
1062
1063   if ((buf = gst_collect_pads2_pop (pads, data)))
1064     gst_buffer_unref (buf);
1065 }
1066
1067 /**
1068  * gst_collect_pads2_available:
1069  * @pads: the collectspads to query
1070  *
1071  * Query how much bytes can be read from each queued buffer. This means
1072  * that the result of this call is the maximum number of bytes that can
1073  * be read from each of the pads.
1074  *
1075  * This function should be called with @pads STREAM_LOCK held, such as
1076  * in the callback.
1077  *
1078  * MT safe.
1079  *
1080  * Returns: The maximum number of bytes queued on all pads. This function
1081  * returns 0 if a pad has no queued buffer.
1082  *
1083  * Since: 0.10.36
1084  */
1085 /* we might pre-calculate this in some struct field,
1086  * but would then have to maintain this in _chain and particularly _pop, etc,
1087  * even if element is never interested in this information */
1088 guint
1089 gst_collect_pads2_available (GstCollectPads2 * pads)
1090 {
1091   GSList *collected;
1092   guint result = G_MAXUINT;
1093
1094   g_return_val_if_fail (pads != NULL, 0);
1095   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), 0);
1096
1097   collected = pads->data;
1098   for (; collected; collected = g_slist_next (collected)) {
1099     GstCollectData2 *pdata;
1100     GstBuffer *buffer;
1101     gint size;
1102
1103     pdata = (GstCollectData2 *) collected->data;
1104
1105     /* ignore pad with EOS */
1106     if (G_UNLIKELY (GST_COLLECT_PADS2_STATE_IS_SET (pdata,
1107                 GST_COLLECT_PADS2_STATE_EOS))) {
1108       GST_DEBUG_OBJECT (pads, "pad %p is EOS", pdata);
1109       continue;
1110     }
1111
1112     /* an empty buffer without EOS is weird when we get here.. */
1113     if (G_UNLIKELY ((buffer = pdata->buffer) == NULL)) {
1114       GST_WARNING_OBJECT (pads, "pad %p has no buffer", pdata);
1115       goto not_filled;
1116     }
1117
1118     /* this is the size left of the buffer */
1119     size = GST_BUFFER_SIZE (buffer) - pdata->pos;
1120     GST_DEBUG_OBJECT (pads, "pad %p has %d bytes left", pdata, size);
1121
1122     /* need to return the min of all available data */
1123     if (size < result)
1124       result = size;
1125   }
1126   /* nothing changed, all must be EOS then, return 0 */
1127   if (G_UNLIKELY (result == G_MAXUINT))
1128     result = 0;
1129
1130   return result;
1131
1132 not_filled:
1133   {
1134     return 0;
1135   }
1136 }
1137
1138 /**
1139  * gst_collect_pads2_read:
1140  * @pads: the collectspads to query
1141  * @data: the data to use
1142  * @bytes: (out) (transfer none) (array length=size): a pointer to a byte array
1143  * @size: the number of bytes to read
1144  *
1145  * Get a pointer in @bytes where @size bytes can be read from the
1146  * given pad data.
1147  *
1148  * This function should be called with @pads STREAM_LOCK held, such as
1149  * in the callback.
1150  *
1151  * MT safe.
1152  *
1153  * Returns: The number of bytes available for consumption in the
1154  * memory pointed to by @bytes. This can be less than @size and
1155  * is 0 if the pad is end-of-stream.
1156  *
1157  * Since: 0.10.36
1158  */
1159 guint
1160 gst_collect_pads2_read (GstCollectPads2 * pads, GstCollectData2 * data,
1161     guint8 ** bytes, guint size)
1162 {
1163   guint readsize;
1164   GstBuffer *buffer;
1165
1166   g_return_val_if_fail (pads != NULL, 0);
1167   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), 0);
1168   g_return_val_if_fail (data != NULL, 0);
1169   g_return_val_if_fail (bytes != NULL, 0);
1170
1171   /* no buffer, must be EOS */
1172   if ((buffer = data->buffer) == NULL)
1173     return 0;
1174
1175   readsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos);
1176
1177   *bytes = GST_BUFFER_DATA (buffer) + data->pos;
1178
1179   return readsize;
1180 }
1181
1182 /**
1183  * gst_collect_pads2_flush:
1184  * @pads: the collectspads to query
1185  * @data: the data to use
1186  * @size: the number of bytes to flush
1187  *
1188  * Flush @size bytes from the pad @data.
1189  *
1190  * This function should be called with @pads STREAM_LOCK held, such as
1191  * in the callback.
1192  *
1193  * MT safe.
1194  *
1195  * Returns: The number of bytes flushed This can be less than @size and
1196  * is 0 if the pad was end-of-stream.
1197  *
1198  * Since: 0.10.36
1199  */
1200 guint
1201 gst_collect_pads2_flush (GstCollectPads2 * pads, GstCollectData2 * data,
1202     guint size)
1203 {
1204   guint flushsize;
1205   GstBuffer *buffer;
1206
1207   g_return_val_if_fail (pads != NULL, 0);
1208   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), 0);
1209   g_return_val_if_fail (data != NULL, 0);
1210
1211   /* no buffer, must be EOS */
1212   if ((buffer = data->buffer) == NULL)
1213     return 0;
1214
1215   /* this is what we can flush at max */
1216   flushsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos);
1217
1218   data->pos += size;
1219
1220   if (data->pos >= GST_BUFFER_SIZE (buffer))
1221     /* _clear will also reset data->pos to 0 */
1222     gst_collect_pads2_clear (pads, data);
1223
1224   return flushsize;
1225 }
1226
1227 /**
1228  * gst_collect_pads2_read_buffer:
1229  * @pads: the collectspads to query
1230  * @data: the data to use
1231  * @size: the number of bytes to read
1232  *
1233  * Get a subbuffer of @size bytes from the given pad @data.
1234  *
1235  * This function should be called with @pads STREAM_LOCK held, such as in the
1236  * callback.
1237  *
1238  * MT safe.
1239  *
1240  * Since: 0.10.36
1241  *
1242  * Returns: (transfer full): A sub buffer. The size of the buffer can be less that requested.
1243  * A return of NULL signals that the pad is end-of-stream.
1244  * Unref the buffer after use.
1245  */
1246 GstBuffer *
1247 gst_collect_pads2_read_buffer (GstCollectPads2 * pads, GstCollectData2 * data,
1248     guint size)
1249 {
1250   guint readsize;
1251   GstBuffer *buffer;
1252
1253   g_return_val_if_fail (pads != NULL, NULL);
1254   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), NULL);
1255   g_return_val_if_fail (data != NULL, NULL);
1256
1257   /* no buffer, must be EOS */
1258   if ((buffer = data->buffer) == NULL)
1259     return NULL;
1260
1261   readsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos);
1262
1263   return gst_buffer_create_sub (buffer, data->pos, readsize);
1264 }
1265
1266 /**
1267  * gst_collect_pads2_take_buffer:
1268  * @pads: the collectspads to query
1269  * @data: the data to use
1270  * @size: the number of bytes to read
1271  *
1272  * Get a subbuffer of @size bytes from the given pad @data. Flushes the amount
1273  * of read bytes.
1274  *
1275  * This function should be called with @pads STREAM_LOCK held, such as in the
1276  * callback.
1277  *
1278  * MT safe.
1279  *
1280  * Since: 0.10.36
1281  *
1282  * Returns: A sub buffer. The size of the buffer can be less that requested.
1283  * A return of NULL signals that the pad is end-of-stream.
1284  * Unref the buffer after use.
1285  */
1286 GstBuffer *
1287 gst_collect_pads2_take_buffer (GstCollectPads2 * pads, GstCollectData2 * data,
1288     guint size)
1289 {
1290   GstBuffer *buffer = gst_collect_pads2_read_buffer (pads, data, size);
1291
1292   if (buffer) {
1293     gst_collect_pads2_flush (pads, data, GST_BUFFER_SIZE (buffer));
1294   }
1295   return buffer;
1296 }
1297
1298 /**
1299  * gst_collect_pads2_set_waiting:
1300  * @pads: the collectspads
1301  * @data: the data to use
1302  * @waiting: boolean indicating whether this pad should operate
1303  *           in waiting or non-waiting mode
1304  *
1305  * Sets a pad to waiting or non-waiting mode, if at least this pad
1306  * has not been created with locked waiting state,
1307  * in which case nothing happens.
1308  *
1309  * This function should be called with @pads STREAM_LOCK held, such as
1310  * in the callback.
1311  *
1312  * MT safe.
1313  *
1314  * Since: 0.10.36
1315  */
1316 void
1317 gst_collect_pads2_set_waiting (GstCollectPads2 * pads, GstCollectData2 * data,
1318     gboolean waiting)
1319 {
1320   g_return_if_fail (pads != NULL);
1321   g_return_if_fail (GST_IS_COLLECT_PADS2 (pads));
1322   g_return_if_fail (data != NULL);
1323
1324   GST_DEBUG_OBJECT (pads, "Setting pad %s to waiting %d, locked %d",
1325       GST_PAD_NAME (data->pad), waiting,
1326       GST_COLLECT_PADS2_STATE_IS_SET (data, GST_COLLECT_PADS2_STATE_LOCKED));
1327
1328   /* Do something only on a change and if not locked */
1329   if (!GST_COLLECT_PADS2_STATE_IS_SET (data, GST_COLLECT_PADS2_STATE_LOCKED) &&
1330       (GST_COLLECT_PADS2_STATE_IS_SET (data, GST_COLLECT_PADS2_STATE_WAITING) !=
1331           !!waiting)) {
1332     /* Set waiting state for this pad */
1333     if (waiting)
1334       GST_COLLECT_PADS2_STATE_SET (data, GST_COLLECT_PADS2_STATE_WAITING);
1335     else
1336       GST_COLLECT_PADS2_STATE_UNSET (data, GST_COLLECT_PADS2_STATE_WAITING);
1337     /* Update number of queued pads if needed */
1338     if (!data->buffer &&
1339         !GST_COLLECT_PADS2_STATE_IS_SET (data, GST_COLLECT_PADS2_STATE_EOS)) {
1340       if (waiting)
1341         pads->priv->queuedpads--;
1342       else
1343         pads->priv->queuedpads++;
1344     }
1345
1346     /* signal waiters because something changed */
1347     GST_COLLECT_PADS2_EVT_BROADCAST (pads);
1348   }
1349 }
1350
1351 /* see if pads were added or removed and update our stats. Any pad
1352  * added after releasing the LOCK will get collected in the next
1353  * round.
1354  *
1355  * We can do a quick check by checking the cookies, that get changed
1356  * whenever the pad list is updated.
1357  *
1358  * Must be called with STREAM_LOCK.
1359  */
1360 static void
1361 gst_collect_pads2_check_pads (GstCollectPads2 * pads)
1362 {
1363   /* the master list and cookie are protected with LOCK */
1364   GST_OBJECT_LOCK (pads);
1365   if (G_UNLIKELY (pads->priv->pad_cookie != pads->priv->cookie)) {
1366     GSList *collected;
1367
1368     /* clear list and stats */
1369     g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
1370     g_slist_free (pads->data);
1371     pads->data = NULL;
1372     pads->priv->numpads = 0;
1373     pads->priv->queuedpads = 0;
1374     pads->priv->eospads = 0;
1375     if (pads->priv->earliest_data)
1376       unref_data (pads->priv->earliest_data);
1377     pads->priv->earliest_data = NULL;
1378     pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
1379
1380     /* loop over the master pad list */
1381     collected = pads->priv->pad_list;
1382     for (; collected; collected = g_slist_next (collected)) {
1383       GstCollectData2 *data;
1384
1385       /* update the stats */
1386       pads->priv->numpads++;
1387       data = collected->data;
1388       if (GST_COLLECT_PADS2_STATE_IS_SET (data, GST_COLLECT_PADS2_STATE_EOS))
1389         pads->priv->eospads++;
1390       else if (data->buffer || !GST_COLLECT_PADS2_STATE_IS_SET (data,
1391               GST_COLLECT_PADS2_STATE_WAITING))
1392         pads->priv->queuedpads++;
1393
1394       /* add to the list of pads to collect */
1395       ref_data (data);
1396       /* preserve order of adding/requesting pads */
1397       pads->data = g_slist_append (pads->data, data);
1398     }
1399     /* and update the cookie */
1400     pads->priv->cookie = pads->priv->pad_cookie;
1401   }
1402   GST_OBJECT_UNLOCK (pads);
1403 }
1404
1405 /* checks if all the pads are collected and call the collectfunction
1406  *
1407  * Should be called with STREAM_LOCK.
1408  *
1409  * Returns: The #GstFlowReturn of collection.
1410  */
1411 static GstFlowReturn
1412 gst_collect_pads2_check_collected (GstCollectPads2 * pads)
1413 {
1414   GstFlowReturn flow_ret = GST_FLOW_OK;
1415   GstCollectPads2Function func;
1416   gpointer user_data;
1417
1418   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), GST_FLOW_ERROR);
1419
1420   GST_OBJECT_LOCK (pads);
1421   func = pads->priv->func;
1422   user_data = pads->priv->user_data;
1423   GST_OBJECT_UNLOCK (pads);
1424
1425   g_return_val_if_fail (pads->priv->func != NULL, GST_FLOW_NOT_SUPPORTED);
1426
1427   /* check for new pads, update stats etc.. */
1428   gst_collect_pads2_check_pads (pads);
1429
1430   if (G_UNLIKELY (pads->priv->eospads == pads->priv->numpads)) {
1431     /* If all our pads are EOS just collect once to let the element
1432      * do its final EOS handling. */
1433     GST_DEBUG_OBJECT (pads, "All active pads (%d) are EOS, calling %s",
1434         pads->priv->numpads, GST_DEBUG_FUNCPTR_NAME (func));
1435
1436     flow_ret = func (pads, user_data);
1437   } else {
1438     gboolean collected = FALSE;
1439
1440     /* We call the collected function as long as our condition matches. */
1441     while (((pads->priv->queuedpads + pads->priv->eospads) >=
1442             pads->priv->numpads)) {
1443       GST_DEBUG_OBJECT (pads,
1444           "All active pads (%d + %d >= %d) have data, " "calling %s",
1445           pads->priv->queuedpads, pads->priv->eospads, pads->priv->numpads,
1446           GST_DEBUG_FUNCPTR_NAME (func));
1447
1448       flow_ret = func (pads, user_data);
1449       collected = TRUE;
1450
1451       /* break on error */
1452       if (flow_ret != GST_FLOW_OK)
1453         break;
1454       /* Don't keep looping after telling the element EOS or flushing */
1455       if (pads->priv->queuedpads == 0)
1456         break;
1457     }
1458     if (!collected)
1459       GST_DEBUG_OBJECT (pads, "Not all active pads (%d) have data, continuing",
1460           pads->priv->numpads);
1461   }
1462   return flow_ret;
1463 }
1464
1465
1466 /* General overview:
1467  * - only pad with a buffer can determine earliest_data (and earliest_time)
1468  * - only segment info determines (non-)waiting state
1469  * - ? perhaps use _stream_time for comparison
1470  *   (which muxers might have use as well ?)
1471  */
1472
1473 /*
1474  * Function to recalculate the waiting state of all pads.
1475  *
1476  * Must be called with STREAM_LOCK.
1477  *
1478  * Returns TRUE if a pad was set to waiting
1479  * (from non-waiting state).
1480  */
1481 static gboolean
1482 gst_collect_pads2_recalculate_waiting (GstCollectPads2 * pads)
1483 {
1484   GSList *collected;
1485   gboolean result = FALSE;
1486
1487   /* If earliest time is not known, there is nothing to do. */
1488   if (pads->priv->earliest_data == NULL)
1489     return FALSE;
1490
1491   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
1492     GstCollectData2 *data = (GstCollectData2 *) collected->data;
1493     int cmp_res;
1494
1495     /* check if pad has a segment */
1496     if (data->segment.format == GST_FORMAT_UNDEFINED) {
1497       GST_WARNING_OBJECT (pads,
1498           "GstCollectPads2 has no time segment, assuming 0 based.");
1499       gst_segment_init (&data->segment, GST_FORMAT_TIME);
1500       gst_segment_set_newsegment (&data->segment, FALSE, 1.0f,
1501           GST_FORMAT_TIME, 0, -1, 0);
1502       GST_COLLECT_PADS2_STATE_SET (data, GST_COLLECT_PADS2_STATE_NEW_SEGMENT);
1503     }
1504
1505     /* check segment format */
1506     if (data->segment.format != GST_FORMAT_TIME) {
1507       GST_ERROR_OBJECT (pads, "GstCollectPads2 can handle only time segments.");
1508       continue;
1509     }
1510
1511     /* check if the waiting state should be changed */
1512     cmp_res = pads->priv->compare_func (pads, data, data->segment.start,
1513         pads->priv->earliest_data, pads->priv->earliest_time,
1514         pads->priv->compare_user_data);
1515     if (cmp_res > 0)
1516       /* stop waiting */
1517       gst_collect_pads2_set_waiting (pads, data, FALSE);
1518     else {
1519       if (!GST_COLLECT_PADS2_STATE_IS_SET (data,
1520               GST_COLLECT_PADS2_STATE_WAITING)) {
1521         /* start waiting */
1522         gst_collect_pads2_set_waiting (pads, data, TRUE);
1523         result = TRUE;
1524       }
1525     }
1526   }
1527
1528   return result;
1529 }
1530
1531 /**
1532  * gst_collect_pads2_find_best_pad:
1533  * @pads: the collectpads to use
1534  * @data: returns the collectdata for earliest data
1535  * @time: returns the earliest available buffertime
1536  *
1537  * Find the oldest/best pad, i.e. pad holding the oldest buffer and
1538  * and return the corresponding #GstCollectData2 and buffertime.
1539  *
1540  * This function should be called with STREAM_LOCK held,
1541  * such as in the callback.
1542  *
1543  * Since: 0.10.36
1544  */
1545 static void
1546 gst_collect_pads2_find_best_pad (GstCollectPads2 * pads,
1547     GstCollectData2 ** data, GstClockTime * time)
1548 {
1549   GSList *collected;
1550   GstCollectData2 *best = NULL;
1551   GstClockTime best_time = GST_CLOCK_TIME_NONE;
1552
1553   g_return_if_fail (data != NULL);
1554   g_return_if_fail (time != NULL);
1555
1556   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
1557     GstBuffer *buffer;
1558     GstCollectData2 *data = (GstCollectData2 *) collected->data;
1559     GstClockTime timestamp;
1560
1561     buffer = gst_collect_pads2_peek (pads, data);
1562     /* if we have a buffer check if it is better then the current best one */
1563     if (buffer != NULL) {
1564       timestamp = GST_BUFFER_TIMESTAMP (buffer);
1565       gst_buffer_unref (buffer);
1566       if (best == NULL || pads->priv->compare_func (pads, data, timestamp,
1567               best, best_time, pads->priv->compare_user_data) < 0) {
1568         best = data;
1569         best_time = timestamp;
1570       }
1571     }
1572   }
1573
1574   /* set earliest time */
1575   *data = best;
1576   *time = best_time;
1577
1578   GST_DEBUG_OBJECT (pads, "best pad %s, best time %" GST_TIME_FORMAT,
1579       best ? GST_PAD_NAME (((GstCollectData2 *) best)->pad) : "(nil)",
1580       GST_TIME_ARGS (best_time));
1581 }
1582
1583 /*
1584  * Function to recalculate earliest_data and earliest_timestamp. This also calls
1585  * gst_collect_pads2_recalculate_waiting
1586  *
1587  * Must be called with STREAM_LOCK.
1588  */
1589 static gboolean
1590 gst_collect_pads2_recalculate_full (GstCollectPads2 * pads)
1591 {
1592   if (pads->priv->earliest_data)
1593     unref_data (pads->priv->earliest_data);
1594   gst_collect_pads2_find_best_pad (pads, &pads->priv->earliest_data,
1595       &pads->priv->earliest_time);
1596   if (pads->priv->earliest_data)
1597     ref_data (pads->priv->earliest_data);
1598   return gst_collect_pads2_recalculate_waiting (pads);
1599 }
1600
1601 /*
1602  * Default collect callback triggered when #GstCollectPads2 gathered all data.
1603  *
1604  * Called with STREAM_LOCK.
1605  */
1606 static GstFlowReturn
1607 gst_collect_pads2_default_collected (GstCollectPads2 * pads, gpointer user_data)
1608 {
1609   GstCollectData2 *best = NULL;
1610   GstBuffer *buffer;
1611   GstFlowReturn ret = GST_FLOW_OK;
1612   GstCollectPads2BufferFunction func;
1613   gpointer buffer_user_data;
1614
1615   g_return_val_if_fail (GST_IS_COLLECT_PADS2 (pads), GST_FLOW_ERROR);
1616
1617   GST_OBJECT_LOCK (pads);
1618   func = pads->priv->buffer_func;
1619   buffer_user_data = pads->priv->buffer_user_data;
1620   GST_OBJECT_UNLOCK (pads);
1621
1622   g_return_val_if_fail (func != NULL, GST_FLOW_NOT_SUPPORTED);
1623
1624   /* Find the oldest pad at all cost */
1625   if (gst_collect_pads2_recalculate_full (pads)) {
1626     /* waiting was switched on,
1627      * so give another thread a chance to deliver a possibly
1628      * older buffer; don't charge on yet with the current oldest */
1629     ret = GST_FLOW_OK;
1630     goto done;
1631   }
1632
1633   best = pads->priv->earliest_data;
1634
1635   /* No data collected means EOS. */
1636   if (G_UNLIKELY (best == NULL)) {
1637     ret = func (pads, best, NULL, buffer_user_data);
1638     if (ret == GST_FLOW_OK)
1639       ret = GST_FLOW_UNEXPECTED;
1640     goto done;
1641   }
1642
1643   /* make sure that the pad we take a buffer from is waiting;
1644    * otherwise popping a buffer will seem not to have happened
1645    * and collectpads can get into a busy loop */
1646   gst_collect_pads2_set_waiting (pads, best, TRUE);
1647
1648   /* Send buffer */
1649   buffer = gst_collect_pads2_pop (pads, best);
1650   ret = func (pads, best, buffer, buffer_user_data);
1651
1652   /* maybe non-waiting was forced to waiting above due to
1653    * newsegment events coming too sparsely,
1654    * so re-check to restore state to avoid hanging/waiting */
1655   gst_collect_pads2_recalculate_full (pads);
1656
1657 done:
1658   return ret;
1659 }
1660
1661 /*
1662  * Default timestamp compare function.
1663  */
1664 static gint
1665 gst_collect_pads2_default_compare_func (GstCollectPads2 * pads,
1666     GstCollectData2 * data1, GstClockTime timestamp1,
1667     GstCollectData2 * data2, GstClockTime timestamp2, gpointer user_data)
1668 {
1669
1670   GST_LOG_OBJECT (pads, "comparing %" GST_TIME_FORMAT
1671       " and %" GST_TIME_FORMAT, GST_TIME_ARGS (timestamp1),
1672       GST_TIME_ARGS (timestamp2));
1673   /* non-valid timestamps go first as they are probably headers or so */
1674   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (timestamp1)))
1675     return GST_CLOCK_TIME_IS_VALID (timestamp2) ? -1 : 0;
1676
1677   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (timestamp2)))
1678     return 1;
1679
1680   /* compare timestamp */
1681   if (timestamp1 < timestamp2)
1682     return -1;
1683
1684   if (timestamp1 > timestamp2)
1685     return 1;
1686
1687   return 0;
1688 }
1689
1690 static gboolean
1691 gst_collect_pads2_event (GstPad * pad, GstEvent * event)
1692 {
1693   gboolean res = FALSE, need_unlock = FALSE;
1694   GstCollectData2 *data;
1695   GstCollectPads2 *pads;
1696   GstCollectPads2EventFunction event_func;
1697   GstCollectPads2BufferFunction buffer_func;
1698   gpointer event_user_data;
1699
1700   /* some magic to get the managing collect_pads2 */
1701   GST_OBJECT_LOCK (pad);
1702   data = (GstCollectData2 *) gst_pad_get_element_private (pad);
1703   if (G_UNLIKELY (data == NULL))
1704     goto pad_removed;
1705   ref_data (data);
1706   GST_OBJECT_UNLOCK (pad);
1707
1708   res = FALSE;
1709
1710   pads = data->collect;
1711
1712   GST_DEBUG_OBJECT (data->pad, "Got %s event on sink pad from %s",
1713       GST_EVENT_TYPE_NAME (event), GST_OBJECT_NAME (GST_EVENT_SRC (event)));
1714
1715   GST_OBJECT_LOCK (pads);
1716   event_func = pads->priv->event_func;
1717   event_user_data = pads->priv->event_user_data;
1718   buffer_func = pads->priv->buffer_func;
1719   GST_OBJECT_UNLOCK (pads);
1720
1721   switch (GST_EVENT_TYPE (event)) {
1722     case GST_EVENT_FLUSH_START:
1723     {
1724       /* forward event to unblock check_collected */
1725       if (event_func)
1726         res = event_func (pads, data, event, event_user_data);
1727       if (!res) {
1728         GST_DEBUG_OBJECT (pad, "forwarding flush start");
1729         res = gst_pad_event_default (pad, event);
1730       }
1731
1732       /* now unblock the chain function.
1733        * no cond per pad, so they all unblock, 
1734        * non-flushing block again */
1735       GST_COLLECT_PADS2_STREAM_LOCK (pads);
1736       GST_COLLECT_PADS2_STATE_SET (data, GST_COLLECT_PADS2_STATE_FLUSHING);
1737       gst_collect_pads2_clear (pads, data);
1738
1739       /* cater for possible default muxing functionality */
1740       if (buffer_func) {
1741         /* restore to initial state */
1742         gst_collect_pads2_set_waiting (pads, data, TRUE);
1743         /* if the current pad is affected, reset state, recalculate later */
1744         if (pads->priv->earliest_data == data) {
1745           unref_data (data);
1746           pads->priv->earliest_data = NULL;
1747           pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
1748         }
1749       }
1750
1751       GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
1752
1753       /* event already cleaned up by forwarding */
1754       res = TRUE;
1755       goto done;
1756     }
1757     case GST_EVENT_FLUSH_STOP:
1758     {
1759       /* flush the 1 buffer queue */
1760       GST_COLLECT_PADS2_STREAM_LOCK (pads);
1761       GST_COLLECT_PADS2_STATE_UNSET (data, GST_COLLECT_PADS2_STATE_FLUSHING);
1762       gst_collect_pads2_clear (pads, data);
1763       /* we need new segment info after the flush */
1764       gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
1765       GST_COLLECT_PADS2_STATE_UNSET (data, GST_COLLECT_PADS2_STATE_NEW_SEGMENT);
1766       /* if the pad was EOS, remove the EOS flag and
1767        * decrement the number of eospads */
1768       if (G_UNLIKELY (GST_COLLECT_PADS2_STATE_IS_SET (data,
1769                   GST_COLLECT_PADS2_STATE_EOS))) {
1770         if (!GST_COLLECT_PADS2_STATE_IS_SET (data,
1771                 GST_COLLECT_PADS2_STATE_WAITING))
1772           pads->priv->queuedpads++;
1773         pads->priv->eospads--;
1774         GST_COLLECT_PADS2_STATE_UNSET (data, GST_COLLECT_PADS2_STATE_EOS);
1775       }
1776       GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
1777
1778       /* forward event */
1779       goto forward_or_default;
1780     }
1781     case GST_EVENT_EOS:
1782     {
1783       GST_COLLECT_PADS2_STREAM_LOCK (pads);
1784       /* if the pad was not EOS, make it EOS and so we
1785        * have one more eospad */
1786       if (G_LIKELY (!GST_COLLECT_PADS2_STATE_IS_SET (data,
1787                   GST_COLLECT_PADS2_STATE_EOS))) {
1788         GST_COLLECT_PADS2_STATE_SET (data, GST_COLLECT_PADS2_STATE_EOS);
1789         if (!GST_COLLECT_PADS2_STATE_IS_SET (data,
1790                 GST_COLLECT_PADS2_STATE_WAITING))
1791           pads->priv->queuedpads--;
1792         pads->priv->eospads++;
1793       }
1794       /* check if we need collecting anything, we ignore the result. */
1795       gst_collect_pads2_check_collected (pads);
1796       GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
1797
1798       goto forward_or_eat;
1799     }
1800     case GST_EVENT_NEWSEGMENT:
1801     {
1802       gint64 start, stop, time;
1803       gdouble rate, arate;
1804       GstFormat format;
1805       gboolean update;
1806       gint cmp_res;
1807
1808       GST_COLLECT_PADS2_STREAM_LOCK (pads);
1809
1810       gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1811           &start, &stop, &time);
1812
1813       GST_DEBUG_OBJECT (data->pad, "got newsegment, start %" GST_TIME_FORMAT
1814           ", stop %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
1815           GST_TIME_ARGS (stop));
1816
1817       /* default collection can not handle other segment formats than time */
1818       if (buffer_func && format != GST_FORMAT_TIME) {
1819         GST_WARNING_OBJECT (pads, "GstCollectPads2 default collecting "
1820             "can only handle time segments. Non time segment ignored.");
1821         goto newsegment_done;
1822       }
1823
1824       /* accept segment */
1825       gst_segment_set_newsegment_full (&data->segment, update, rate, arate,
1826           format, start, stop, time);
1827
1828       GST_COLLECT_PADS2_STATE_SET (data, GST_COLLECT_PADS2_STATE_NEW_SEGMENT);
1829
1830       /* default muxing functionality */
1831       if (!buffer_func)
1832         goto newsegment_done;
1833
1834       /* If oldest time is not known, or current pad got newsegment;
1835        * recalculate the state */
1836       if (!pads->priv->earliest_data || pads->priv->earliest_data == data) {
1837         gst_collect_pads2_recalculate_full (pads);
1838         goto newsegment_done;
1839       }
1840
1841       /* Check if the waiting state of the pad should change. */
1842       cmp_res =
1843           pads->priv->compare_func (pads, data, start,
1844           pads->priv->earliest_data, pads->priv->earliest_time,
1845           pads->priv->compare_user_data);
1846
1847       if (cmp_res > 0)
1848         /* Stop waiting */
1849         gst_collect_pads2_set_waiting (pads, data, FALSE);
1850
1851     newsegment_done:
1852       GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
1853       /* we must not forward this event since multiple segments will be
1854        * accumulated and this is certainly not what we want. */
1855       goto forward_or_eat;
1856     }
1857     default:
1858       /* forward other events */
1859       goto forward_or_default;
1860   }
1861
1862 forward_or_default:
1863   if (GST_EVENT_IS_SERIALIZED (event)) {
1864     GST_COLLECT_PADS2_STREAM_LOCK (pads);
1865     need_unlock = TRUE;
1866   }
1867   if (event_func)
1868     res = event_func (pads, data, event, event_user_data);
1869   if (!res) {
1870     GST_DEBUG_OBJECT (pad, "forwarding %s", GST_EVENT_TYPE_NAME (event));
1871     res = gst_pad_event_default (pad, event);
1872   }
1873   if (need_unlock)
1874     GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
1875   goto done;
1876
1877 forward_or_eat:
1878   if (GST_EVENT_IS_SERIALIZED (event)) {
1879     GST_COLLECT_PADS2_STREAM_LOCK (pads);
1880     need_unlock = TRUE;
1881   }
1882   if (event_func)
1883     res = event_func (pads, data, event, event_user_data);
1884   if (!res) {
1885     gst_event_unref (event);
1886     res = TRUE;
1887   }
1888   if (need_unlock)
1889     GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
1890   goto done;
1891
1892 done:
1893   unref_data (data);
1894   return res;
1895
1896   /* ERRORS */
1897 pad_removed:
1898   {
1899     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
1900     GST_OBJECT_UNLOCK (pad);
1901     return FALSE;
1902   }
1903 }
1904
1905 /* For each buffer we receive we check if our collected condition is reached
1906  * and if so we call the collected function. When this is done we check if
1907  * data has been unqueued. If data is still queued we wait holding the stream
1908  * lock to make sure no EOS event can happen while we are ready to be
1909  * collected 
1910  */
1911 static GstFlowReturn
1912 gst_collect_pads2_chain (GstPad * pad, GstBuffer * buffer)
1913 {
1914   GstCollectData2 *data;
1915   GstCollectPads2 *pads;
1916   GstFlowReturn ret;
1917   GstBuffer **buffer_p;
1918   guint32 cookie;
1919
1920   GST_DEBUG ("Got buffer for pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1921
1922   /* some magic to get the managing collect_pads2 */
1923   GST_OBJECT_LOCK (pad);
1924   data = (GstCollectData2 *) gst_pad_get_element_private (pad);
1925   if (G_UNLIKELY (data == NULL))
1926     goto no_data;
1927   ref_data (data);
1928   GST_OBJECT_UNLOCK (pad);
1929
1930   pads = data->collect;
1931
1932   GST_COLLECT_PADS2_STREAM_LOCK (pads);
1933   /* if not started, bail out */
1934   if (G_UNLIKELY (!pads->priv->started))
1935     goto not_started;
1936   /* check if this pad is flushing */
1937   if (G_UNLIKELY (GST_COLLECT_PADS2_STATE_IS_SET (data,
1938               GST_COLLECT_PADS2_STATE_FLUSHING)))
1939     goto flushing;
1940   /* pad was EOS, we can refuse this data */
1941   if (G_UNLIKELY (GST_COLLECT_PADS2_STATE_IS_SET (data,
1942               GST_COLLECT_PADS2_STATE_EOS)))
1943     goto unexpected;
1944
1945   /* see if we need to clip */
1946   if (pads->priv->clip_func) {
1947     GstBuffer *outbuf = NULL;
1948     ret =
1949         pads->priv->clip_func (pads, data, buffer, &outbuf,
1950         pads->priv->clip_user_data);
1951     buffer = outbuf;
1952
1953     if (G_UNLIKELY (outbuf == NULL))
1954       goto clipped;
1955
1956     if (G_UNLIKELY (ret == GST_FLOW_UNEXPECTED))
1957       goto unexpected;
1958     else if (G_UNLIKELY (ret != GST_FLOW_OK))
1959       goto error;
1960   }
1961
1962   GST_DEBUG_OBJECT (pads, "Queuing buffer %p for pad %s:%s", buffer,
1963       GST_DEBUG_PAD_NAME (pad));
1964
1965   /* One more pad has data queued */
1966   if (GST_COLLECT_PADS2_STATE_IS_SET (data, GST_COLLECT_PADS2_STATE_WAITING))
1967     pads->priv->queuedpads++;
1968   buffer_p = &data->buffer;
1969   gst_buffer_replace (buffer_p, buffer);
1970
1971   /* update segment last position if in TIME */
1972   if (G_LIKELY (data->segment.format == GST_FORMAT_TIME)) {
1973     GstClockTime timestamp = GST_BUFFER_TIMESTAMP (buffer);
1974
1975     if (GST_CLOCK_TIME_IS_VALID (timestamp))
1976       gst_segment_set_last_stop (&data->segment, GST_FORMAT_TIME, timestamp);
1977   }
1978
1979   /* While we have data queued on this pad try to collect stuff */
1980   do {
1981     /* Check if our collected condition is matched and call the collected
1982      * function if it is */
1983     ret = gst_collect_pads2_check_collected (pads);
1984     /* when an error occurs, we want to report this back to the caller ASAP
1985      * without having to block if the buffer was not popped */
1986     if (G_UNLIKELY (ret != GST_FLOW_OK))
1987       goto error;
1988
1989     /* data was consumed, we can exit and accept new data */
1990     if (data->buffer == NULL)
1991       break;
1992
1993     /* Having the _INIT here means we don't care about any broadcast up to here
1994      * (most of which occur with STREAM_LOCK held, so could not have happened
1995      * anyway).  We do care about e.g. a remove initiated broadcast as of this
1996      * point.  Putting it here also makes this thread ignores any evt it raised
1997      * itself (as is a usual WAIT semantic).
1998      */
1999     GST_COLLECT_PADS2_EVT_INIT (cookie);
2000
2001     /* pad could be removed and re-added */
2002     unref_data (data);
2003     GST_OBJECT_LOCK (pad);
2004     if (G_UNLIKELY ((data = gst_pad_get_element_private (pad)) == NULL))
2005       goto pad_removed;
2006     ref_data (data);
2007     GST_OBJECT_UNLOCK (pad);
2008
2009     GST_DEBUG_OBJECT (pads, "Pad %s:%s has a buffer queued, waiting",
2010         GST_DEBUG_PAD_NAME (pad));
2011
2012     /* wait to be collected, this must happen from another thread triggered
2013      * by the _chain function of another pad. We release the lock so we
2014      * can get stopped or flushed as well. We can however not get EOS
2015      * because we still hold the STREAM_LOCK.
2016      */
2017     GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
2018     GST_COLLECT_PADS2_EVT_WAIT (pads, cookie);
2019     GST_COLLECT_PADS2_STREAM_LOCK (pads);
2020
2021     GST_DEBUG_OBJECT (pads, "Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad));
2022
2023     /* after a signal, we could be stopped */
2024     if (G_UNLIKELY (!pads->priv->started))
2025       goto not_started;
2026     /* check if this pad is flushing */
2027     if (G_UNLIKELY (GST_COLLECT_PADS2_STATE_IS_SET (data,
2028                 GST_COLLECT_PADS2_STATE_FLUSHING)))
2029       goto flushing;
2030   }
2031   while (data->buffer != NULL);
2032
2033 unlock_done:
2034   GST_COLLECT_PADS2_STREAM_UNLOCK (pads);
2035   unref_data (data);
2036   if (buffer)
2037     gst_buffer_unref (buffer);
2038   return ret;
2039
2040 pad_removed:
2041   {
2042     GST_WARNING ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2043     GST_OBJECT_UNLOCK (pad);
2044     ret = GST_FLOW_NOT_LINKED;
2045     goto unlock_done;
2046   }
2047   /* ERRORS */
2048 no_data:
2049   {
2050     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2051     GST_OBJECT_UNLOCK (pad);
2052     gst_buffer_unref (buffer);
2053     return GST_FLOW_NOT_LINKED;
2054   }
2055 not_started:
2056   {
2057     GST_DEBUG ("not started");
2058     gst_collect_pads2_clear (pads, data);
2059     ret = GST_FLOW_WRONG_STATE;
2060     goto unlock_done;
2061   }
2062 flushing:
2063   {
2064     GST_DEBUG ("pad %s:%s is flushing", GST_DEBUG_PAD_NAME (pad));
2065     gst_collect_pads2_clear (pads, data);
2066     ret = GST_FLOW_WRONG_STATE;
2067     goto unlock_done;
2068   }
2069 unexpected:
2070   {
2071     /* we should not post an error for this, just inform upstream that
2072      * we don't expect anything anymore */
2073     GST_DEBUG ("pad %s:%s is eos", GST_DEBUG_PAD_NAME (pad));
2074     ret = GST_FLOW_UNEXPECTED;
2075     goto unlock_done;
2076   }
2077 clipped:
2078   {
2079     GST_DEBUG ("clipped buffer on pad %s:%s", GST_DEBUG_PAD_NAME (pad));
2080     ret = GST_FLOW_OK;
2081     goto unlock_done;
2082   }
2083 error:
2084   {
2085     /* we print the error, the element should post a reasonable error
2086      * message for fatal errors */
2087     GST_DEBUG ("collect failed, reason %d (%s)", ret, gst_flow_get_name (ret));
2088     gst_collect_pads2_clear (pads, data);
2089     goto unlock_done;
2090   }
2091 }