collectpads: implement flushing seek support
[platform/upstream/gstreamer.git] / libs / gst / base / gstcollectpads.c
1 /* GStreamer
2  * Copyright (C) 2005 Wim Taymans <wim@fluendo.com>
3  * Copyright (C) 2008 Mark Nauwelaerts <mnauw@users.sourceforge.net>
4  * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
5  *
6  * gstcollectpads.c:
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23 /**
24  * SECTION:gstcollectpads
25  * @short_description: manages a set of pads that operate in collect mode
26  * @see_also:
27  *
28  * Manages a set of pads that operate in collect mode. This means that control
29  * is given to the manager of this object when all pads have data.
30  * <itemizedlist>
31  *   <listitem><para>
32  *     Collectpads are created with gst_collect_pads_new(). A callback should then
33  *     be installed with gst_collect_pads_set_function ().
34  *   </para></listitem>
35  *   <listitem><para>
36  *     Pads are added to the collection with gst_collect_pads_add_pad()/
37  *     gst_collect_pads_remove_pad(). The pad
38  *     has to be a sinkpad. The chain and event functions of the pad are
39  *     overridden. The element_private of the pad is used to store
40  *     private information for the collectpads.
41  *   </para></listitem>
42  *   <listitem><para>
43  *     For each pad, data is queued in the _chain function or by
44  *     performing a pull_range.
45  *   </para></listitem>
46  *   <listitem><para>
47  *     When data is queued on all pads in waiting mode, the callback function is called.
48  *   </para></listitem>
49  *   <listitem><para>
50  *     Data can be dequeued from the pad with the gst_collect_pads_pop() method.
51  *     One can peek at the data with the gst_collect_pads_peek() function.
52  *     These functions will return NULL if the pad received an EOS event. When all
53  *     pads return NULL from a gst_collect_pads_peek(), the element can emit an EOS
54  *     event itself.
55  *   </para></listitem>
56  *   <listitem><para>
57  *     Data can also be dequeued in byte units using the gst_collect_pads_available(),
58  *     gst_collect_pads_read_buffer() and gst_collect_pads_flush() calls.
59  *   </para></listitem>
60  *   <listitem><para>
61  *     Elements should call gst_collect_pads_start() and gst_collect_pads_stop() in
62  *     their state change functions to start and stop the processing of the collectpads.
63  *     The gst_collect_pads_stop() call should be called before calling the parent
64  *     element state change function in the PAUSED_TO_READY state change to ensure
65  *     no pad is blocked and the element can finish streaming.
66  *   </para></listitem>
67  *   <listitem><para>
68  *     gst_collect_pads_collect() and gst_collect_pads_collect_range() can be used by
69  *     elements that start a #GstTask to drive the collect_pads. This feature is however
70  *     not yet implemented.
71  *   </para></listitem>
72  *   <listitem><para>
73  *     gst_collect_pads_set_waiting() sets a pad to waiting or non-waiting mode.
74  *     CollectPads element is not waiting for data to be collected on non-waiting pads.
75  *     Thus these pads may but need not have data when the callback is called.
76  *     All pads are in waiting mode by default.
77  *   </para></listitem>
78  * </itemizedlist>
79  *
80  * Last reviewed on 2011-10-28 (0.10.36)
81  */
82
83 #ifdef HAVE_CONFIG_H
84 #  include "config.h"
85 #endif
86
87 #include <gst/gst_private.h>
88
89 #include "gstcollectpads.h"
90
91 #include "../../../gst/glib-compat-private.h"
92
93 GST_DEBUG_CATEGORY_STATIC (collect_pads_debug);
94 #define GST_CAT_DEFAULT collect_pads_debug
95
96 #define parent_class gst_collect_pads_parent_class
97 G_DEFINE_TYPE (GstCollectPads, gst_collect_pads, GST_TYPE_OBJECT);
98
99 struct _GstCollectDataPrivate
100 {
101   /* refcounting for struct, and destroy callback */
102   GstCollectDataDestroyNotify destroy_notify;
103   gint refcount;
104 };
105
106 struct _GstCollectPadsPrivate
107 {
108   /* with LOCK and/or STREAM_LOCK */
109   gboolean started;
110
111   /* with STREAM_LOCK */
112   guint32 cookie;               /* @data list cookie */
113   guint numpads;                /* number of pads in @data */
114   guint queuedpads;             /* number of pads with a buffer */
115   guint eospads;                /* number of pads that are EOS */
116   GstClockTime earliest_time;   /* Current earliest time */
117   GstCollectData *earliest_data;        /* Pad data for current earliest time */
118
119   /* with LOCK */
120   GSList *pad_list;             /* updated pad list */
121   guint32 pad_cookie;           /* updated cookie */
122
123   GstCollectPadsFunction func;  /* function and user_data for callback */
124   gpointer user_data;
125   GstCollectPadsBufferFunction buffer_func;     /* function and user_data for buffer callback */
126   gpointer buffer_user_data;
127   GstCollectPadsCompareFunction compare_func;
128   gpointer compare_user_data;
129   GstCollectPadsEventFunction event_func;       /* function and data for event callback */
130   gpointer event_user_data;
131   GstCollectPadsQueryFunction query_func;
132   gpointer query_user_data;
133   GstCollectPadsClipFunction clip_func;
134   gpointer clip_user_data;
135   GstCollectPadsFlushFunction flush_func;
136   gpointer flush_user_data;
137
138   /* no other lock needed */
139   GMutex evt_lock;              /* these make up sort of poor man's event signaling */
140   GCond evt_cond;
141   guint32 evt_cookie;
142
143   gboolean seeking;
144   gboolean pending_flush_start;
145   gboolean pending_flush_stop;
146 };
147
148 static void gst_collect_pads_clear (GstCollectPads * pads,
149     GstCollectData * data);
150 static GstFlowReturn gst_collect_pads_chain (GstPad * pad, GstObject * parent,
151     GstBuffer * buffer);
152 static gboolean gst_collect_pads_event (GstPad * pad, GstObject * parent,
153     GstEvent * event);
154 static gboolean gst_collect_pads_query (GstPad * pad, GstObject * parent,
155     GstQuery * query);
156 static void gst_collect_pads_finalize (GObject * object);
157 static GstFlowReturn gst_collect_pads_default_collected (GstCollectPads *
158     pads, gpointer user_data);
159 static gint gst_collect_pads_default_compare_func (GstCollectPads * pads,
160     GstCollectData * data1, GstClockTime timestamp1, GstCollectData * data2,
161     GstClockTime timestamp2, gpointer user_data);
162 static gboolean gst_collect_pads_recalculate_full (GstCollectPads * pads);
163 static void ref_data (GstCollectData * data);
164 static void unref_data (GstCollectData * data);
165
166 static gboolean gst_collect_pads_event_default_internal (GstCollectPads *
167     pads, GstCollectData * data, GstEvent * event, gpointer user_data);
168 static gboolean gst_collect_pads_query_default_internal (GstCollectPads *
169     pads, GstCollectData * data, GstQuery * query, gpointer user_data);
170
171
172 /* Some properties are protected by LOCK, others by STREAM_LOCK
173  * However, manipulating either of these partitions may require
174  * to signal/wake a _WAIT, so use a separate (sort of) event to prevent races
175  * Alternative implementations are possible, e.g. some low-level re-implementing
176  * of the 2 above locks to drop both of them atomically when going into _WAIT.
177  */
178 #define GST_COLLECT_PADS_GET_EVT_COND(pads) (&((GstCollectPads *)pads)->priv->evt_cond)
179 #define GST_COLLECT_PADS_GET_EVT_LOCK(pads) (&((GstCollectPads *)pads)->priv->evt_lock)
180 #define GST_COLLECT_PADS_EVT_WAIT(pads, cookie) G_STMT_START {    \
181   g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
182   /* should work unless a lot of event'ing and thread starvation */\
183   while (cookie == ((GstCollectPads *) pads)->priv->evt_cookie)         \
184     g_cond_wait (GST_COLLECT_PADS_GET_EVT_COND (pads),            \
185         GST_COLLECT_PADS_GET_EVT_LOCK (pads));                    \
186   cookie = ((GstCollectPads *) pads)->priv->evt_cookie;                 \
187   g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
188 } G_STMT_END
189 #define GST_COLLECT_PADS_EVT_WAIT_TIMED(pads, cookie, timeout) G_STMT_START { \
190   GTimeVal __tv; \
191   \
192   g_get_current_time (&tv); \
193   g_time_val_add (&tv, timeout); \
194   \
195   g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
196   /* should work unless a lot of event'ing and thread starvation */\
197   while (cookie == ((GstCollectPads *) pads)->priv->evt_cookie)         \
198     g_cond_timed_wait (GST_COLLECT_PADS_GET_EVT_COND (pads),            \
199         GST_COLLECT_PADS_GET_EVT_LOCK (pads), &tv);                    \
200   cookie = ((GstCollectPads *) pads)->priv->evt_cookie;                 \
201   g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
202 } G_STMT_END
203 #define GST_COLLECT_PADS_EVT_BROADCAST(pads) G_STMT_START {       \
204   g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
205   /* never mind wrap-around */                                     \
206   ++(((GstCollectPads *) pads)->priv->evt_cookie);                      \
207   g_cond_broadcast (GST_COLLECT_PADS_GET_EVT_COND (pads));        \
208   g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
209 } G_STMT_END
210 #define GST_COLLECT_PADS_EVT_INIT(cookie) G_STMT_START {          \
211   g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
212   cookie = ((GstCollectPads *) pads)->priv->evt_cookie;                 \
213   g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
214 } G_STMT_END
215
216 static void
217 gst_collect_pads_class_init (GstCollectPadsClass * klass)
218 {
219   GObjectClass *gobject_class = (GObjectClass *) klass;
220
221   g_type_class_add_private (klass, sizeof (GstCollectPadsPrivate));
222
223   GST_DEBUG_CATEGORY_INIT (collect_pads_debug, "collectpads", 0,
224       "GstCollectPads");
225
226   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_collect_pads_finalize);
227 }
228
229 static void
230 gst_collect_pads_init (GstCollectPads * pads)
231 {
232   pads->priv =
233       G_TYPE_INSTANCE_GET_PRIVATE (pads, GST_TYPE_COLLECT_PADS,
234       GstCollectPadsPrivate);
235
236   pads->data = NULL;
237   pads->priv->cookie = 0;
238   pads->priv->numpads = 0;
239   pads->priv->queuedpads = 0;
240   pads->priv->eospads = 0;
241   pads->priv->started = FALSE;
242
243   g_rec_mutex_init (&pads->stream_lock);
244
245   pads->priv->func = gst_collect_pads_default_collected;
246   pads->priv->user_data = NULL;
247   pads->priv->event_func = NULL;
248   pads->priv->event_user_data = NULL;
249
250   /* members for default muxing */
251   pads->priv->buffer_func = NULL;
252   pads->priv->buffer_user_data = NULL;
253   pads->priv->compare_func = gst_collect_pads_default_compare_func;
254   pads->priv->compare_user_data = NULL;
255   pads->priv->earliest_data = NULL;
256   pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
257
258   pads->priv->event_func = gst_collect_pads_event_default_internal;
259   pads->priv->query_func = gst_collect_pads_query_default_internal;
260
261   /* members to manage the pad list */
262   pads->priv->pad_cookie = 0;
263   pads->priv->pad_list = NULL;
264
265   /* members for event */
266   g_mutex_init (&pads->priv->evt_lock);
267   g_cond_init (&pads->priv->evt_cond);
268   pads->priv->evt_cookie = 0;
269
270   pads->priv->seeking = FALSE;
271   pads->priv->pending_flush_start = FALSE;
272   pads->priv->pending_flush_stop = FALSE;
273 }
274
275 static void
276 gst_collect_pads_finalize (GObject * object)
277 {
278   GstCollectPads *pads = GST_COLLECT_PADS (object);
279
280   GST_DEBUG_OBJECT (object, "finalize");
281
282   g_rec_mutex_clear (&pads->stream_lock);
283
284   g_cond_clear (&pads->priv->evt_cond);
285   g_mutex_clear (&pads->priv->evt_lock);
286
287   /* Remove pads and free pads list */
288   g_slist_foreach (pads->priv->pad_list, (GFunc) unref_data, NULL);
289   g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
290   g_slist_free (pads->data);
291   g_slist_free (pads->priv->pad_list);
292
293   G_OBJECT_CLASS (parent_class)->finalize (object);
294 }
295
296 /**
297  * gst_collect_pads_new:
298  *
299  * Create a new instance of #GstCollectPads.
300  *
301  * MT safe.
302  *
303  * Returns: (transfer full): a new #GstCollectPads, or NULL in case of an error.
304  */
305 GstCollectPads *
306 gst_collect_pads_new (void)
307 {
308   GstCollectPads *newcoll;
309
310   newcoll = g_object_new (GST_TYPE_COLLECT_PADS, NULL);
311
312   return newcoll;
313 }
314
315 /* Must be called with GstObject lock! */
316 static void
317 gst_collect_pads_set_buffer_function_locked (GstCollectPads * pads,
318     GstCollectPadsBufferFunction func, gpointer user_data)
319 {
320   pads->priv->buffer_func = func;
321   pads->priv->buffer_user_data = user_data;
322 }
323
324 /**
325  * gst_collect_pads_set_buffer_function:
326  * @pads: the collectpads to use
327  * @func: the function to set
328  * @user_data: (closure): user data passed to the function
329  *
330  * Set the callback function and user data that will be called with
331  * the oldest buffer when all pads have been collected, or NULL on EOS.
332  * If a buffer is passed, the callback owns a reference and must unref
333  * it.
334  *
335  * MT safe.
336  */
337 void
338 gst_collect_pads_set_buffer_function (GstCollectPads * pads,
339     GstCollectPadsBufferFunction func, gpointer user_data)
340 {
341   g_return_if_fail (pads != NULL);
342   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
343
344   GST_OBJECT_LOCK (pads);
345   gst_collect_pads_set_buffer_function_locked (pads, func, user_data);
346   GST_OBJECT_UNLOCK (pads);
347 }
348
349 /**
350  * gst_collect_pads_set_compare_function:
351  * @pads: the pads to use
352  * @func: the function to set
353  * @user_data: (closure): user data passed to the function
354  *
355  * Set the timestamp comparison function.
356  *
357  * MT safe.
358  */
359 /* NOTE allowing to change comparison seems not advisable;
360 no known use-case, and collaboration with default algorithm is unpredictable.
361 If custom compairing/operation is needed, just use a collect function of
362 your own */
363 void
364 gst_collect_pads_set_compare_function (GstCollectPads * pads,
365     GstCollectPadsCompareFunction func, gpointer user_data)
366 {
367   g_return_if_fail (pads != NULL);
368   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
369
370   GST_OBJECT_LOCK (pads);
371   pads->priv->compare_func = func;
372   pads->priv->compare_user_data = user_data;
373   GST_OBJECT_UNLOCK (pads);
374 }
375
376 /**
377  * gst_collect_pads_set_function:
378  * @pads: the collectpads to use
379  * @func: the function to set
380  * @user_data: user data passed to the function
381  *
382  * CollectPads provides a default collection algorithm that will determine
383  * the oldest buffer available on all of its pads, and then delegate
384  * to a configured callback.
385  * However, if circumstances are more complicated and/or more control
386  * is desired, this sets a callback that will be invoked instead when
387  * all the pads added to the collection have buffers queued.
388  * Evidently, this callback is not compatible with
389  * gst_collect_pads_set_buffer_function() callback.
390  * If this callback is set, the former will be unset.
391  *
392  * MT safe.
393  */
394 void
395 gst_collect_pads_set_function (GstCollectPads * pads,
396     GstCollectPadsFunction func, gpointer user_data)
397 {
398   g_return_if_fail (pads != NULL);
399   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
400
401   GST_OBJECT_LOCK (pads);
402   pads->priv->func = func;
403   pads->priv->user_data = user_data;
404   gst_collect_pads_set_buffer_function_locked (pads, NULL, NULL);
405   GST_OBJECT_UNLOCK (pads);
406 }
407
408 static void
409 ref_data (GstCollectData * data)
410 {
411   g_assert (data != NULL);
412
413   g_atomic_int_inc (&(data->priv->refcount));
414 }
415
416 static void
417 unref_data (GstCollectData * data)
418 {
419   g_assert (data != NULL);
420   g_assert (data->priv->refcount > 0);
421
422   if (!g_atomic_int_dec_and_test (&(data->priv->refcount)))
423     return;
424
425   if (data->priv->destroy_notify)
426     data->priv->destroy_notify (data);
427
428   g_object_unref (data->pad);
429   if (data->buffer) {
430     gst_buffer_unref (data->buffer);
431   }
432   g_free (data->priv);
433   g_free (data);
434 }
435
436 /**
437  * gst_collect_pads_set_event_function:
438  * @pads: the collectpads to use
439  * @func: the function to set
440  * @user_data: user data passed to the function
441  *
442  * Set the event callback function and user data that will be called when
443  * collectpads has received an event originating from one of the collected
444  * pads.  If the event being processed is a serialized one, this callback is
445  * called with @pads STREAM_LOCK held, otherwise not.  As this lock should be
446  * held when calling a number of CollectPads functions, it should be acquired
447  * if so (unusually) needed.
448  *
449  * MT safe.
450  */
451 void
452 gst_collect_pads_set_event_function (GstCollectPads * pads,
453     GstCollectPadsEventFunction func, gpointer user_data)
454 {
455   g_return_if_fail (pads != NULL);
456   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
457
458   GST_OBJECT_LOCK (pads);
459   pads->priv->event_func = func;
460   pads->priv->event_user_data = user_data;
461   GST_OBJECT_UNLOCK (pads);
462 }
463
464 /**
465  * gst_collect_pads_set_query_function:
466  * @pads: the collectpads to use
467  * @func: the function to set
468  * @user_data: user data passed to the function
469  *
470  * Set the query callback function and user data that will be called after
471  * collectpads has received a query originating from one of the collected
472  * pads.  If the query being processed is a serialized one, this callback is
473  * called with @pads STREAM_LOCK held, otherwise not.  As this lock should be
474  * held when calling a number of CollectPads functions, it should be acquired
475  * if so (unusually) needed.
476  *
477  * MT safe.
478  */
479 void
480 gst_collect_pads_set_query_function (GstCollectPads * pads,
481     GstCollectPadsQueryFunction func, gpointer user_data)
482 {
483   g_return_if_fail (pads != NULL);
484   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
485
486   GST_OBJECT_LOCK (pads);
487   pads->priv->query_func = func;
488   pads->priv->query_user_data = user_data;
489   GST_OBJECT_UNLOCK (pads);
490 }
491
492 /**
493 * gst_collect_pads_clip_running_time:
494 * @pads: the collectpads to use
495 * @cdata: collect data of corresponding pad
496 * @buf: buffer being clipped
497 * @outbuf: (allow-none): output buffer with running time, or NULL if clipped
498 * @user_data: user data (unused)
499 *
500 * Convenience clipping function that converts incoming buffer's timestamp
501 * to running time, or clips the buffer if outside configured segment.
502 */
503 GstFlowReturn
504 gst_collect_pads_clip_running_time (GstCollectPads * pads,
505     GstCollectData * cdata, GstBuffer * buf, GstBuffer ** outbuf,
506     gpointer user_data)
507 {
508   GstClockTime time;
509
510   *outbuf = buf;
511   time = GST_BUFFER_PTS (buf);
512
513   /* invalid left alone and passed */
514   if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) {
515     time = gst_segment_to_running_time (&cdata->segment, GST_FORMAT_TIME, time);
516     if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) {
517       GST_DEBUG_OBJECT (cdata->pad, "clipping buffer on pad outside segment");
518       gst_buffer_unref (buf);
519       *outbuf = NULL;
520     } else {
521       GST_LOG_OBJECT (cdata->pad, "buffer ts %" GST_TIME_FORMAT " -> %"
522           GST_TIME_FORMAT " running time",
523           GST_TIME_ARGS (GST_BUFFER_PTS (buf)), GST_TIME_ARGS (time));
524       *outbuf = gst_buffer_make_writable (buf);
525       GST_BUFFER_PTS (*outbuf) = time;
526       GST_BUFFER_DTS (*outbuf) = gst_segment_to_running_time (&cdata->segment,
527           GST_FORMAT_TIME, GST_BUFFER_DTS (*outbuf));
528     }
529   }
530
531   return GST_FLOW_OK;
532 }
533
534  /**
535  * gst_collect_pads_set_clip_function:
536  * @pads: the collectpads to use
537  * @clipfunc: clip function to install
538  * @user_data: user data to pass to @clip_func
539  *
540  * Install a clipping function that is called right after a buffer is received
541  * on a pad managed by @pads. See #GstCollectPadsClipFunction for more info.
542  */
543 void
544 gst_collect_pads_set_clip_function (GstCollectPads * pads,
545     GstCollectPadsClipFunction clipfunc, gpointer user_data)
546 {
547   g_return_if_fail (pads != NULL);
548   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
549
550   pads->priv->clip_func = clipfunc;
551   pads->priv->clip_user_data = user_data;
552 }
553
554 void
555 gst_collect_pads_set_flush_function (GstCollectPads * pads,
556     GstCollectPadsFlushFunction func, gpointer user_data)
557 {
558   g_return_if_fail (pads != NULL);
559   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
560
561   pads->priv->flush_func = func;
562   pads->priv->flush_user_data = user_data;
563 }
564
565 /**
566  * gst_collect_pads_add_pad:
567  * @pads: the collectpads to use
568  * @pad: (transfer none): the pad to add
569  * @size: the size of the returned #GstCollectData structure
570  * @destroy_notify: function to be called before the returned #GstCollectData
571  * structure is freed
572  * @lock: whether to lock this pad in usual waiting state
573  *
574  * Add a pad to the collection of collect pads. The pad has to be
575  * a sinkpad. The refcount of the pad is incremented. Use
576  * gst_collect_pads_remove_pad() to remove the pad from the collection
577  * again.
578  *
579  * You specify a size for the returned #GstCollectData structure
580  * so that you can use it to store additional information.
581  *
582  * You can also specify a #GstCollectDataDestroyNotify that will be called
583  * just before the #GstCollectData structure is freed. It is passed the
584  * pointer to the structure and should free any custom memory and resources
585  * allocated for it.
586  *
587  * Keeping a pad locked in waiting state is only relevant when using
588  * the default collection algorithm (providing the oldest buffer).
589  * It ensures a buffer must be available on this pad for a collection
590  * to take place.  This is of typical use to a muxer element where
591  * non-subtitle streams should always be in waiting state,
592  * e.g. to assure that caps information is available on all these streams
593  * when initial headers have to be written.
594  *
595  * The pad will be automatically activated in push mode when @pads is
596  * started.
597  *
598  * MT safe.
599  *
600  * Returns: a new #GstCollectData to identify the new pad. Or NULL
601  *   if wrong parameters are supplied.
602  */
603 GstCollectData *
604 gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size,
605     GstCollectDataDestroyNotify destroy_notify, gboolean lock)
606 {
607   GstCollectData *data;
608
609   g_return_val_if_fail (pads != NULL, NULL);
610   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
611   g_return_val_if_fail (pad != NULL, NULL);
612   g_return_val_if_fail (GST_PAD_IS_SINK (pad), NULL);
613   g_return_val_if_fail (size >= sizeof (GstCollectData), NULL);
614
615   GST_DEBUG_OBJECT (pads, "adding pad %s:%s", GST_DEBUG_PAD_NAME (pad));
616
617   data = g_malloc0 (size);
618   data->priv = g_new0 (GstCollectDataPrivate, 1);
619   data->collect = pads;
620   data->pad = gst_object_ref (pad);
621   data->buffer = NULL;
622   data->pos = 0;
623   gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
624   data->state = GST_COLLECT_PADS_STATE_WAITING;
625   data->state |= lock ? GST_COLLECT_PADS_STATE_LOCKED : 0;
626   data->priv->refcount = 1;
627   data->priv->destroy_notify = destroy_notify;
628
629   GST_OBJECT_LOCK (pads);
630   GST_OBJECT_LOCK (pad);
631   gst_pad_set_element_private (pad, data);
632   GST_OBJECT_UNLOCK (pad);
633   pads->priv->pad_list = g_slist_append (pads->priv->pad_list, data);
634   gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_chain));
635   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_event));
636   gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_query));
637   /* backward compat, also add to data if stopped, so that the element already
638    * has this in the public data list before going PAUSED (typically)
639    * this can only be done when we are stopped because we don't take the
640    * STREAM_LOCK to protect the pads->data list. */
641   if (!pads->priv->started) {
642     pads->data = g_slist_append (pads->data, data);
643     ref_data (data);
644   }
645   /* activate the pad when needed */
646   if (pads->priv->started)
647     gst_pad_set_active (pad, TRUE);
648   pads->priv->pad_cookie++;
649   GST_OBJECT_UNLOCK (pads);
650
651   return data;
652 }
653
654 static gint
655 find_pad (GstCollectData * data, GstPad * pad)
656 {
657   if (data->pad == pad)
658     return 0;
659   return 1;
660 }
661
662 /**
663  * gst_collect_pads_remove_pad:
664  * @pads: the collectpads to use
665  * @pad: (transfer none): the pad to remove
666  *
667  * Remove a pad from the collection of collect pads. This function will also
668  * free the #GstCollectData and all the resources that were allocated with
669  * gst_collect_pads_add_pad().
670  *
671  * The pad will be deactivated automatically when @pads is stopped.
672  *
673  * MT safe.
674  *
675  * Returns: %TRUE if the pad could be removed.
676  */
677 gboolean
678 gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad)
679 {
680   GstCollectData *data;
681   GSList *list;
682
683   g_return_val_if_fail (pads != NULL, FALSE);
684   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
685   g_return_val_if_fail (pad != NULL, FALSE);
686   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
687
688   GST_DEBUG_OBJECT (pads, "removing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
689
690   GST_OBJECT_LOCK (pads);
691   list =
692       g_slist_find_custom (pads->priv->pad_list, pad, (GCompareFunc) find_pad);
693   if (!list)
694     goto unknown_pad;
695
696   data = (GstCollectData *) list->data;
697
698   GST_DEBUG_OBJECT (pads, "found pad %s:%s at %p", GST_DEBUG_PAD_NAME (pad),
699       data);
700
701   /* clear the stuff we configured */
702   gst_pad_set_chain_function (pad, NULL);
703   gst_pad_set_event_function (pad, NULL);
704   GST_OBJECT_LOCK (pad);
705   gst_pad_set_element_private (pad, NULL);
706   GST_OBJECT_UNLOCK (pad);
707
708   /* backward compat, also remove from data if stopped, note that this function
709    * can only be called when we are stopped because we don't take the
710    * STREAM_LOCK to protect the pads->data list. */
711   if (!pads->priv->started) {
712     GSList *dlist;
713
714     dlist = g_slist_find_custom (pads->data, pad, (GCompareFunc) find_pad);
715     if (dlist) {
716       GstCollectData *pdata = dlist->data;
717
718       pads->data = g_slist_delete_link (pads->data, dlist);
719       unref_data (pdata);
720     }
721   }
722   /* remove from the pad list */
723   pads->priv->pad_list = g_slist_delete_link (pads->priv->pad_list, list);
724   pads->priv->pad_cookie++;
725
726   /* signal waiters because something changed */
727   GST_COLLECT_PADS_EVT_BROADCAST (pads);
728
729   /* deactivate the pad when needed */
730   if (!pads->priv->started)
731     gst_pad_set_active (pad, FALSE);
732
733   /* clean and free the collect data */
734   unref_data (data);
735
736   GST_OBJECT_UNLOCK (pads);
737
738   return TRUE;
739
740 unknown_pad:
741   {
742     GST_WARNING_OBJECT (pads, "cannot remove unknown pad %s:%s",
743         GST_DEBUG_PAD_NAME (pad));
744     GST_OBJECT_UNLOCK (pads);
745     return FALSE;
746   }
747 }
748
749 /*
750  * Must be called with STREAM_LOCK and OBJECT_LOCK.
751  */
752 static void
753 gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads,
754     gboolean flushing)
755 {
756   GSList *walk = NULL;
757
758   /* Update the pads flushing flag */
759   for (walk = pads->priv->pad_list; walk; walk = g_slist_next (walk)) {
760     GstCollectData *cdata = walk->data;
761
762     if (GST_IS_PAD (cdata->pad)) {
763       GST_OBJECT_LOCK (cdata->pad);
764       if (flushing)
765         GST_PAD_SET_FLUSHING (cdata->pad);
766       else
767         GST_PAD_UNSET_FLUSHING (cdata->pad);
768       if (flushing)
769         GST_COLLECT_PADS_STATE_SET (cdata, GST_COLLECT_PADS_STATE_FLUSHING);
770       else
771         GST_COLLECT_PADS_STATE_UNSET (cdata, GST_COLLECT_PADS_STATE_FLUSHING);
772       gst_collect_pads_clear (pads, cdata);
773       GST_OBJECT_UNLOCK (cdata->pad);
774     }
775   }
776
777   /* inform _chain of changes */
778   GST_COLLECT_PADS_EVT_BROADCAST (pads);
779 }
780
781 /**
782  * gst_collect_pads_set_flushing:
783  * @pads: the collectpads to use
784  * @flushing: desired state of the pads
785  *
786  * Change the flushing state of all the pads in the collection. No pad
787  * is able to accept anymore data when @flushing is %TRUE. Calling this
788  * function with @flushing %FALSE makes @pads accept data again.
789  * Caller must ensure that downstream streaming (thread) is not blocked,
790  * e.g. by sending a FLUSH_START downstream.
791  *
792  * MT safe.
793  */
794 void
795 gst_collect_pads_set_flushing (GstCollectPads * pads, gboolean flushing)
796 {
797   g_return_if_fail (pads != NULL);
798   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
799
800   /* NOTE since this eventually calls _pop, some (STREAM_)LOCK is needed here */
801   GST_COLLECT_PADS_STREAM_LOCK (pads);
802   GST_OBJECT_LOCK (pads);
803   gst_collect_pads_set_flushing_unlocked (pads, flushing);
804   GST_OBJECT_UNLOCK (pads);
805   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
806 }
807
808 /**
809  * gst_collect_pads_start:
810  * @pads: the collectpads to use
811  *
812  * Starts the processing of data in the collect_pads.
813  *
814  * MT safe.
815  */
816 void
817 gst_collect_pads_start (GstCollectPads * pads)
818 {
819   GSList *collected;
820
821   g_return_if_fail (pads != NULL);
822   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
823
824   GST_DEBUG_OBJECT (pads, "starting collect pads");
825
826   /* make sure stop and collect cannot be called anymore */
827   GST_COLLECT_PADS_STREAM_LOCK (pads);
828
829   /* make pads streamable */
830   GST_OBJECT_LOCK (pads);
831
832   /* loop over the master pad list and reset the segment */
833   collected = pads->priv->pad_list;
834   for (; collected; collected = g_slist_next (collected)) {
835     GstCollectData *data;
836
837     data = collected->data;
838     gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
839   }
840
841   gst_collect_pads_set_flushing_unlocked (pads, FALSE);
842
843   /* Start collect pads */
844   pads->priv->started = TRUE;
845   GST_OBJECT_UNLOCK (pads);
846   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
847 }
848
849 /**
850  * gst_collect_pads_stop:
851  * @pads: the collectpads to use
852  *
853  * Stops the processing of data in the collect_pads. this function
854  * will also unblock any blocking operations.
855  *
856  * MT safe.
857  */
858 void
859 gst_collect_pads_stop (GstCollectPads * pads)
860 {
861   GSList *collected;
862
863   g_return_if_fail (pads != NULL);
864   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
865
866   GST_DEBUG_OBJECT (pads, "stopping collect pads");
867
868   /* make sure collect and start cannot be called anymore */
869   GST_COLLECT_PADS_STREAM_LOCK (pads);
870
871   /* make pads not accept data anymore */
872   GST_OBJECT_LOCK (pads);
873   gst_collect_pads_set_flushing_unlocked (pads, TRUE);
874
875   /* Stop collect pads */
876   pads->priv->started = FALSE;
877   pads->priv->eospads = 0;
878   pads->priv->queuedpads = 0;
879
880   /* loop over the master pad list and flush buffers */
881   collected = pads->priv->pad_list;
882   for (; collected; collected = g_slist_next (collected)) {
883     GstCollectData *data;
884     GstBuffer **buffer_p;
885
886     data = collected->data;
887     if (data->buffer) {
888       buffer_p = &data->buffer;
889       gst_buffer_replace (buffer_p, NULL);
890       data->pos = 0;
891     }
892     GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_EOS);
893   }
894
895   if (pads->priv->earliest_data)
896     unref_data (pads->priv->earliest_data);
897   pads->priv->earliest_data = NULL;
898   pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
899
900   GST_OBJECT_UNLOCK (pads);
901   /* Wake them up so they can end the chain functions. */
902   GST_COLLECT_PADS_EVT_BROADCAST (pads);
903
904   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
905 }
906
907 /**
908  * gst_collect_pads_peek:
909  * @pads: the collectpads to peek
910  * @data: the data to use
911  *
912  * Peek at the buffer currently queued in @data. This function
913  * should be called with the @pads STREAM_LOCK held, such as in the callback
914  * handler.
915  *
916  * MT safe.
917  *
918  * Returns: The buffer in @data or NULL if no buffer is queued.
919  *  should unref the buffer after usage.
920  */
921 GstBuffer *
922 gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data)
923 {
924   GstBuffer *result;
925
926   g_return_val_if_fail (pads != NULL, NULL);
927   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
928   g_return_val_if_fail (data != NULL, NULL);
929
930   if ((result = data->buffer))
931     gst_buffer_ref (result);
932
933   GST_DEBUG_OBJECT (pads, "Peeking at pad %s:%s: buffer=%p",
934       GST_DEBUG_PAD_NAME (data->pad), result);
935
936   return result;
937 }
938
939 /**
940  * gst_collect_pads_pop:
941  * @pads: the collectpads to pop
942  * @data: the data to use
943  *
944  * Pop the buffer currently queued in @data. This function
945  * should be called with the @pads STREAM_LOCK held, such as in the callback
946  * handler.
947  *
948  * MT safe.
949  *
950  * Returns: (transfer full): The buffer in @data or NULL if no buffer was
951  *   queued. You should unref the buffer after usage.
952  */
953 GstBuffer *
954 gst_collect_pads_pop (GstCollectPads * pads, GstCollectData * data)
955 {
956   GstBuffer *result;
957
958   g_return_val_if_fail (pads != NULL, NULL);
959   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
960   g_return_val_if_fail (data != NULL, NULL);
961
962   if ((result = data->buffer)) {
963     data->buffer = NULL;
964     data->pos = 0;
965     /* one less pad with queued data now */
966     if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING))
967       pads->priv->queuedpads--;
968   }
969
970   GST_COLLECT_PADS_EVT_BROADCAST (pads);
971
972   GST_DEBUG_OBJECT (pads, "Pop buffer on pad %s:%s: buffer=%p",
973       GST_DEBUG_PAD_NAME (data->pad), result);
974
975   return result;
976 }
977
978 /* pop and unref the currently queued buffer, should be called with STREAM_LOCK
979  * held */
980 static void
981 gst_collect_pads_clear (GstCollectPads * pads, GstCollectData * data)
982 {
983   GstBuffer *buf;
984
985   if ((buf = gst_collect_pads_pop (pads, data)))
986     gst_buffer_unref (buf);
987 }
988
989 /**
990  * gst_collect_pads_available:
991  * @pads: the collectpads to query
992  *
993  * Query how much bytes can be read from each queued buffer. This means
994  * that the result of this call is the maximum number of bytes that can
995  * be read from each of the pads.
996  *
997  * This function should be called with @pads STREAM_LOCK held, such as
998  * in the callback.
999  *
1000  * MT safe.
1001  *
1002  * Returns: The maximum number of bytes queued on all pads. This function
1003  * returns 0 if a pad has no queued buffer.
1004  */
1005 /* we might pre-calculate this in some struct field,
1006  * but would then have to maintain this in _chain and particularly _pop, etc,
1007  * even if element is never interested in this information */
1008 guint
1009 gst_collect_pads_available (GstCollectPads * pads)
1010 {
1011   GSList *collected;
1012   guint result = G_MAXUINT;
1013
1014   g_return_val_if_fail (pads != NULL, 0);
1015   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
1016
1017   collected = pads->data;
1018   for (; collected; collected = g_slist_next (collected)) {
1019     GstCollectData *pdata;
1020     GstBuffer *buffer;
1021     gint size;
1022
1023     pdata = (GstCollectData *) collected->data;
1024
1025     /* ignore pad with EOS */
1026     if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (pdata,
1027                 GST_COLLECT_PADS_STATE_EOS))) {
1028       GST_DEBUG_OBJECT (pads, "pad %p is EOS", pdata);
1029       continue;
1030     }
1031
1032     /* an empty buffer without EOS is weird when we get here.. */
1033     if (G_UNLIKELY ((buffer = pdata->buffer) == NULL)) {
1034       GST_WARNING_OBJECT (pads, "pad %p has no buffer", pdata);
1035       goto not_filled;
1036     }
1037
1038     /* this is the size left of the buffer */
1039     size = gst_buffer_get_size (buffer) - pdata->pos;
1040     GST_DEBUG_OBJECT (pads, "pad %p has %d bytes left", pdata, size);
1041
1042     /* need to return the min of all available data */
1043     if (size < result)
1044       result = size;
1045   }
1046   /* nothing changed, all must be EOS then, return 0 */
1047   if (G_UNLIKELY (result == G_MAXUINT))
1048     result = 0;
1049
1050   return result;
1051
1052 not_filled:
1053   {
1054     return 0;
1055   }
1056 }
1057
1058 /**
1059  * gst_collect_pads_flush:
1060  * @pads: the collectpads to query
1061  * @data: the data to use
1062  * @size: the number of bytes to flush
1063  *
1064  * Flush @size bytes from the pad @data.
1065  *
1066  * This function should be called with @pads STREAM_LOCK held, such as
1067  * in the callback.
1068  *
1069  * MT safe.
1070  *
1071  * Returns: The number of bytes flushed This can be less than @size and
1072  * is 0 if the pad was end-of-stream.
1073  */
1074 guint
1075 gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data,
1076     guint size)
1077 {
1078   guint flushsize;
1079   gsize bsize;
1080   GstBuffer *buffer;
1081
1082   g_return_val_if_fail (pads != NULL, 0);
1083   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
1084   g_return_val_if_fail (data != NULL, 0);
1085
1086   /* no buffer, must be EOS */
1087   if ((buffer = data->buffer) == NULL)
1088     return 0;
1089
1090   bsize = gst_buffer_get_size (buffer);
1091
1092   /* this is what we can flush at max */
1093   flushsize = MIN (size, bsize - data->pos);
1094
1095   data->pos += size;
1096
1097   if (data->pos >= bsize)
1098     /* _clear will also reset data->pos to 0 */
1099     gst_collect_pads_clear (pads, data);
1100
1101   return flushsize;
1102 }
1103
1104 /**
1105  * gst_collect_pads_read_buffer:
1106  * @pads: the collectpads to query
1107  * @data: the data to use
1108  * @size: the number of bytes to read
1109  *
1110  * Get a subbuffer of @size bytes from the given pad @data.
1111  *
1112  * This function should be called with @pads STREAM_LOCK held, such as in the
1113  * callback.
1114  *
1115  * MT safe.
1116  *
1117  * Returns: (transfer full): A sub buffer. The size of the buffer can be less that requested.
1118  * A return of NULL signals that the pad is end-of-stream.
1119  * Unref the buffer after use.
1120  */
1121 GstBuffer *
1122 gst_collect_pads_read_buffer (GstCollectPads * pads, GstCollectData * data,
1123     guint size)
1124 {
1125   guint readsize;
1126   GstBuffer *buffer;
1127
1128   g_return_val_if_fail (pads != NULL, NULL);
1129   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
1130   g_return_val_if_fail (data != NULL, NULL);
1131
1132   /* no buffer, must be EOS */
1133   if ((buffer = data->buffer) == NULL)
1134     return NULL;
1135
1136   readsize = MIN (size, gst_buffer_get_size (buffer) - data->pos);
1137
1138   return gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, data->pos,
1139       readsize);
1140 }
1141
1142 /**
1143  * gst_collect_pads_take_buffer:
1144  * @pads: the collectpads to query
1145  * @data: the data to use
1146  * @size: the number of bytes to read
1147  *
1148  * Get a subbuffer of @size bytes from the given pad @data. Flushes the amount
1149  * of read bytes.
1150  *
1151  * This function should be called with @pads STREAM_LOCK held, such as in the
1152  * callback.
1153  *
1154  * MT safe.
1155  *
1156  * Returns: A sub buffer. The size of the buffer can be less that requested.
1157  * A return of NULL signals that the pad is end-of-stream.
1158  * Unref the buffer after use.
1159  */
1160 GstBuffer *
1161 gst_collect_pads_take_buffer (GstCollectPads * pads, GstCollectData * data,
1162     guint size)
1163 {
1164   GstBuffer *buffer = gst_collect_pads_read_buffer (pads, data, size);
1165
1166   if (buffer) {
1167     gst_collect_pads_flush (pads, data, gst_buffer_get_size (buffer));
1168   }
1169   return buffer;
1170 }
1171
1172 /**
1173  * gst_collect_pads_set_waiting:
1174  * @pads: the collectpads
1175  * @data: the data to use
1176  * @waiting: boolean indicating whether this pad should operate
1177  *           in waiting or non-waiting mode
1178  *
1179  * Sets a pad to waiting or non-waiting mode, if at least this pad
1180  * has not been created with locked waiting state,
1181  * in which case nothing happens.
1182  *
1183  * This function should be called with @pads STREAM_LOCK held, such as
1184  * in the callback.
1185  *
1186  * MT safe.
1187  */
1188 void
1189 gst_collect_pads_set_waiting (GstCollectPads * pads, GstCollectData * data,
1190     gboolean waiting)
1191 {
1192   g_return_if_fail (pads != NULL);
1193   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
1194   g_return_if_fail (data != NULL);
1195
1196   GST_DEBUG_OBJECT (pads, "Setting pad %s to waiting %d, locked %d",
1197       GST_PAD_NAME (data->pad), waiting,
1198       GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED));
1199
1200   /* Do something only on a change and if not locked */
1201   if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED) &&
1202       (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING) !=
1203           !!waiting)) {
1204     /* Set waiting state for this pad */
1205     if (waiting)
1206       GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_WAITING);
1207     else
1208       GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_WAITING);
1209     /* Update number of queued pads if needed */
1210     if (!data->buffer &&
1211         !GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_EOS)) {
1212       if (waiting)
1213         pads->priv->queuedpads--;
1214       else
1215         pads->priv->queuedpads++;
1216     }
1217
1218     /* signal waiters because something changed */
1219     GST_COLLECT_PADS_EVT_BROADCAST (pads);
1220   }
1221 }
1222
1223 /* see if pads were added or removed and update our stats. Any pad
1224  * added after releasing the LOCK will get collected in the next
1225  * round.
1226  *
1227  * We can do a quick check by checking the cookies, that get changed
1228  * whenever the pad list is updated.
1229  *
1230  * Must be called with STREAM_LOCK.
1231  */
1232 static void
1233 gst_collect_pads_check_pads (GstCollectPads * pads)
1234 {
1235   /* the master list and cookie are protected with LOCK */
1236   GST_OBJECT_LOCK (pads);
1237   if (G_UNLIKELY (pads->priv->pad_cookie != pads->priv->cookie)) {
1238     GSList *collected;
1239
1240     /* clear list and stats */
1241     g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
1242     g_slist_free (pads->data);
1243     pads->data = NULL;
1244     pads->priv->numpads = 0;
1245     pads->priv->queuedpads = 0;
1246     pads->priv->eospads = 0;
1247     if (pads->priv->earliest_data)
1248       unref_data (pads->priv->earliest_data);
1249     pads->priv->earliest_data = NULL;
1250     pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
1251
1252     /* loop over the master pad list */
1253     collected = pads->priv->pad_list;
1254     for (; collected; collected = g_slist_next (collected)) {
1255       GstCollectData *data;
1256
1257       /* update the stats */
1258       pads->priv->numpads++;
1259       data = collected->data;
1260       if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_EOS))
1261         pads->priv->eospads++;
1262       else if (data->buffer || !GST_COLLECT_PADS_STATE_IS_SET (data,
1263               GST_COLLECT_PADS_STATE_WAITING))
1264         pads->priv->queuedpads++;
1265
1266       /* add to the list of pads to collect */
1267       ref_data (data);
1268       /* preserve order of adding/requesting pads */
1269       pads->data = g_slist_append (pads->data, data);
1270     }
1271     /* and update the cookie */
1272     pads->priv->cookie = pads->priv->pad_cookie;
1273   }
1274   GST_OBJECT_UNLOCK (pads);
1275 }
1276
1277 /* checks if all the pads are collected and call the collectfunction
1278  *
1279  * Should be called with STREAM_LOCK.
1280  *
1281  * Returns: The #GstFlowReturn of collection.
1282  */
1283 static GstFlowReturn
1284 gst_collect_pads_check_collected (GstCollectPads * pads)
1285 {
1286   GstFlowReturn flow_ret = GST_FLOW_OK;
1287   GstCollectPadsFunction func;
1288   gpointer user_data;
1289
1290   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
1291
1292   GST_OBJECT_LOCK (pads);
1293   func = pads->priv->func;
1294   user_data = pads->priv->user_data;
1295   GST_OBJECT_UNLOCK (pads);
1296
1297   g_return_val_if_fail (pads->priv->func != NULL, GST_FLOW_NOT_SUPPORTED);
1298
1299   /* check for new pads, update stats etc.. */
1300   gst_collect_pads_check_pads (pads);
1301
1302   if (G_UNLIKELY (pads->priv->eospads == pads->priv->numpads)) {
1303     /* If all our pads are EOS just collect once to let the element
1304      * do its final EOS handling. */
1305     GST_DEBUG_OBJECT (pads, "All active pads (%d) are EOS, calling %s",
1306         pads->priv->numpads, GST_DEBUG_FUNCPTR_NAME (func));
1307
1308     if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking,
1309                 TRUE, FALSE) == TRUE)) {
1310       GST_INFO_OBJECT (pads, "finished seeking");
1311     }
1312     do {
1313       flow_ret = func (pads, user_data);
1314     } while (flow_ret == GST_FLOW_OK);
1315   } else {
1316     gboolean collected = FALSE;
1317
1318     /* We call the collected function as long as our condition matches. */
1319     while (((pads->priv->queuedpads + pads->priv->eospads) >=
1320             pads->priv->numpads)) {
1321       GST_DEBUG_OBJECT (pads,
1322           "All active pads (%d + %d >= %d) have data, " "calling %s",
1323           pads->priv->queuedpads, pads->priv->eospads, pads->priv->numpads,
1324           GST_DEBUG_FUNCPTR_NAME (func));
1325
1326       if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking,
1327                   TRUE, FALSE) == TRUE)) {
1328         GST_INFO_OBJECT (pads, "finished seeking");
1329       }
1330       flow_ret = func (pads, user_data);
1331       collected = TRUE;
1332
1333       /* break on error */
1334       if (flow_ret != GST_FLOW_OK)
1335         break;
1336       /* Don't keep looping after telling the element EOS or flushing */
1337       if (pads->priv->queuedpads == 0)
1338         break;
1339     }
1340     if (!collected)
1341       GST_DEBUG_OBJECT (pads, "Not all active pads (%d) have data, continuing",
1342           pads->priv->numpads);
1343   }
1344   return flow_ret;
1345 }
1346
1347
1348 /* General overview:
1349  * - only pad with a buffer can determine earliest_data (and earliest_time)
1350  * - only segment info determines (non-)waiting state
1351  * - ? perhaps use _stream_time for comparison
1352  *   (which muxers might have use as well ?)
1353  */
1354
1355 /*
1356  * Function to recalculate the waiting state of all pads.
1357  *
1358  * Must be called with STREAM_LOCK.
1359  *
1360  * Returns TRUE if a pad was set to waiting
1361  * (from non-waiting state).
1362  */
1363 static gboolean
1364 gst_collect_pads_recalculate_waiting (GstCollectPads * pads)
1365 {
1366   GSList *collected;
1367   gboolean result = FALSE;
1368
1369   /* If earliest time is not known, there is nothing to do. */
1370   if (pads->priv->earliest_data == NULL)
1371     return FALSE;
1372
1373   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
1374     GstCollectData *data = (GstCollectData *) collected->data;
1375     int cmp_res;
1376     GstClockTime comp_time;
1377
1378     /* check if pad has a segment */
1379     if (data->segment.format == GST_FORMAT_UNDEFINED) {
1380       GST_WARNING_OBJECT (pads,
1381           "GstCollectPads has no time segment, assuming 0 based.");
1382       gst_segment_init (&data->segment, GST_FORMAT_TIME);
1383       GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
1384     }
1385
1386     /* check segment format */
1387     if (data->segment.format != GST_FORMAT_TIME) {
1388       GST_ERROR_OBJECT (pads, "GstCollectPads can handle only time segments.");
1389       continue;
1390     }
1391
1392     /* check if the waiting state should be changed */
1393     comp_time = data->segment.position;
1394     cmp_res = pads->priv->compare_func (pads, data, comp_time,
1395         pads->priv->earliest_data, pads->priv->earliest_time,
1396         pads->priv->compare_user_data);
1397     if (cmp_res > 0)
1398       /* stop waiting */
1399       gst_collect_pads_set_waiting (pads, data, FALSE);
1400     else {
1401       if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING)) {
1402         /* start waiting */
1403         gst_collect_pads_set_waiting (pads, data, TRUE);
1404         result = TRUE;
1405       }
1406     }
1407   }
1408
1409   return result;
1410 }
1411
1412 /**
1413  * gst_collect_pads_find_best_pad:
1414  * @pads: the collectpads to use
1415  * @data: returns the collectdata for earliest data
1416  * @time: returns the earliest available buffertime
1417  *
1418  * Find the oldest/best pad, i.e. pad holding the oldest buffer and
1419  * and return the corresponding #GstCollectData and buffertime.
1420  *
1421  * This function should be called with STREAM_LOCK held,
1422  * such as in the callback.
1423  */
1424 static void
1425 gst_collect_pads_find_best_pad (GstCollectPads * pads,
1426     GstCollectData ** data, GstClockTime * time)
1427 {
1428   GSList *collected;
1429   GstCollectData *best = NULL;
1430   GstClockTime best_time = GST_CLOCK_TIME_NONE;
1431
1432   g_return_if_fail (data != NULL);
1433   g_return_if_fail (time != NULL);
1434
1435   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
1436     GstBuffer *buffer;
1437     GstCollectData *data = (GstCollectData *) collected->data;
1438     GstClockTime timestamp;
1439
1440     buffer = gst_collect_pads_peek (pads, data);
1441     /* if we have a buffer check if it is better then the current best one */
1442     if (buffer != NULL) {
1443       timestamp = GST_BUFFER_DTS (buffer);
1444       if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
1445         timestamp = GST_BUFFER_PTS (buffer);
1446       }
1447       gst_buffer_unref (buffer);
1448       if (best == NULL || pads->priv->compare_func (pads, data, timestamp,
1449               best, best_time, pads->priv->compare_user_data) < 0) {
1450         best = data;
1451         best_time = timestamp;
1452       }
1453     }
1454   }
1455
1456   /* set earliest time */
1457   *data = best;
1458   *time = best_time;
1459
1460   GST_DEBUG_OBJECT (pads, "best pad %s, best time %" GST_TIME_FORMAT,
1461       best ? GST_PAD_NAME (((GstCollectData *) best)->pad) : "(nil)",
1462       GST_TIME_ARGS (best_time));
1463 }
1464
1465 /*
1466  * Function to recalculate earliest_data and earliest_timestamp. This also calls
1467  * gst_collect_pads_recalculate_waiting
1468  *
1469  * Must be called with STREAM_LOCK.
1470  */
1471 static gboolean
1472 gst_collect_pads_recalculate_full (GstCollectPads * pads)
1473 {
1474   if (pads->priv->earliest_data)
1475     unref_data (pads->priv->earliest_data);
1476   gst_collect_pads_find_best_pad (pads, &pads->priv->earliest_data,
1477       &pads->priv->earliest_time);
1478   if (pads->priv->earliest_data)
1479     ref_data (pads->priv->earliest_data);
1480   return gst_collect_pads_recalculate_waiting (pads);
1481 }
1482
1483 /*
1484  * Default collect callback triggered when #GstCollectPads gathered all data.
1485  *
1486  * Called with STREAM_LOCK.
1487  */
1488 static GstFlowReturn
1489 gst_collect_pads_default_collected (GstCollectPads * pads, gpointer user_data)
1490 {
1491   GstCollectData *best = NULL;
1492   GstBuffer *buffer;
1493   GstFlowReturn ret = GST_FLOW_OK;
1494   GstCollectPadsBufferFunction func;
1495   gpointer buffer_user_data;
1496
1497   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
1498
1499   GST_OBJECT_LOCK (pads);
1500   func = pads->priv->buffer_func;
1501   buffer_user_data = pads->priv->buffer_user_data;
1502   GST_OBJECT_UNLOCK (pads);
1503
1504   g_return_val_if_fail (func != NULL, GST_FLOW_NOT_SUPPORTED);
1505
1506   /* Find the oldest pad at all cost */
1507   if (gst_collect_pads_recalculate_full (pads)) {
1508     /* waiting was switched on,
1509      * so give another thread a chance to deliver a possibly
1510      * older buffer; don't charge on yet with the current oldest */
1511     ret = GST_FLOW_OK;
1512     goto done;
1513   }
1514
1515   best = pads->priv->earliest_data;
1516
1517   /* No data collected means EOS. */
1518   if (G_UNLIKELY (best == NULL)) {
1519     ret = func (pads, best, NULL, buffer_user_data);
1520     if (ret == GST_FLOW_OK)
1521       ret = GST_FLOW_EOS;
1522     goto done;
1523   }
1524
1525   /* make sure that the pad we take a buffer from is waiting;
1526    * otherwise popping a buffer will seem not to have happened
1527    * and collectpads can get into a busy loop */
1528   gst_collect_pads_set_waiting (pads, best, TRUE);
1529
1530   /* Send buffer */
1531   buffer = gst_collect_pads_pop (pads, best);
1532   ret = func (pads, best, buffer, buffer_user_data);
1533
1534   /* maybe non-waiting was forced to waiting above due to
1535    * newsegment events coming too sparsely,
1536    * so re-check to restore state to avoid hanging/waiting */
1537   gst_collect_pads_recalculate_full (pads);
1538
1539 done:
1540   return ret;
1541 }
1542
1543 /*
1544  * Default timestamp compare function.
1545  */
1546 static gint
1547 gst_collect_pads_default_compare_func (GstCollectPads * pads,
1548     GstCollectData * data1, GstClockTime timestamp1,
1549     GstCollectData * data2, GstClockTime timestamp2, gpointer user_data)
1550 {
1551
1552   GST_LOG_OBJECT (pads, "comparing %" GST_TIME_FORMAT
1553       " and %" GST_TIME_FORMAT, GST_TIME_ARGS (timestamp1),
1554       GST_TIME_ARGS (timestamp2));
1555   /* non-valid timestamps go first as they are probably headers or so */
1556   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (timestamp1)))
1557     return GST_CLOCK_TIME_IS_VALID (timestamp2) ? -1 : 0;
1558
1559   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (timestamp2)))
1560     return 1;
1561
1562   /* compare timestamp */
1563   if (timestamp1 < timestamp2)
1564     return -1;
1565
1566   if (timestamp1 > timestamp2)
1567     return 1;
1568
1569   return 0;
1570 }
1571
1572 /* called with STREAM_LOCK */
1573 static void
1574 gst_collect_pads_handle_position_update (GstCollectPads * pads,
1575     GstCollectData * data, GstClockTime new_pos)
1576 {
1577   gint cmp_res;
1578
1579   /* If oldest time is not known, or current pad got newsegment;
1580    * recalculate the state */
1581   if (!pads->priv->earliest_data || pads->priv->earliest_data == data) {
1582     gst_collect_pads_recalculate_full (pads);
1583     goto exit;
1584   }
1585
1586   /* Check if the waiting state of the pad should change. */
1587   cmp_res =
1588       pads->priv->compare_func (pads, data, new_pos,
1589       pads->priv->earliest_data, pads->priv->earliest_time,
1590       pads->priv->compare_user_data);
1591
1592   if (cmp_res > 0)
1593     /* Stop waiting */
1594     gst_collect_pads_set_waiting (pads, data, FALSE);
1595
1596 exit:
1597   return;
1598
1599 }
1600
1601 static GstClockTime
1602 gst_collect_pads_clip_time (GstCollectPads * pads, GstCollectData * data,
1603     GstClockTime time)
1604 {
1605   GstClockTime otime = time;
1606   GstBuffer *in, *out = NULL;
1607
1608   if (pads->priv->clip_func) {
1609     in = gst_buffer_new ();
1610     GST_BUFFER_PTS (in) = time;
1611     GST_BUFFER_DTS (in) = time;
1612     pads->priv->clip_func (pads, data, in, &out, pads->priv->clip_user_data);
1613     if (out) {
1614       otime = GST_BUFFER_PTS (out);
1615       gst_buffer_unref (out);
1616     } else {
1617       /* FIXME should distinguish between ahead or after segment,
1618        * let's assume after segment and use some large time ... */
1619       otime = G_MAXINT64 / 2;
1620     }
1621   }
1622
1623   return otime;
1624 }
1625
1626 /**
1627  * gst_collect_pads_event_default:
1628  * @pads: the collectpads to use
1629  * @data: collect data of corresponding pad
1630  * @event: event being processed
1631  * @discard: process but do not send event downstream
1632  *
1633  * Default GstCollectPads event handling that elements should always
1634  * chain up to to ensure proper operation.  Element might however indicate
1635  * event should not be forwarded downstream.
1636  */
1637 gboolean
1638 gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data,
1639     GstEvent * event, gboolean discard)
1640 {
1641   gboolean res = TRUE;
1642   GstCollectPadsBufferFunction buffer_func;
1643   GstObject *parent;
1644   GstPad *pad;
1645
1646   GST_OBJECT_LOCK (pads);
1647   buffer_func = pads->priv->buffer_func;
1648   GST_OBJECT_UNLOCK (pads);
1649
1650   pad = data->pad;
1651   parent = GST_OBJECT_PARENT (pad);
1652
1653   switch (GST_EVENT_TYPE (event)) {
1654     case GST_EVENT_FLUSH_START:
1655     {
1656       if (g_atomic_int_get (&pads->priv->seeking)) {
1657         /* drop all but the first FLUSH_STARTs when seeking */
1658         if (g_atomic_int_compare_and_exchange (&pads->priv->pending_flush_start,
1659                 TRUE, FALSE) == FALSE)
1660           goto eat;
1661
1662         /* unblock collect pads */
1663         gst_pad_event_default (pad, parent, event);
1664         event = NULL;
1665
1666         GST_COLLECT_PADS_STREAM_LOCK (pads);
1667         /* Start flushing. We never call gst_collect_pads_set_flushing (FALSE), we
1668          * instead wait until each pad gets its FLUSH_STOP and let that reset the pad to
1669          * non-flushing (which happens in gst_collect_pads_event_default).
1670          */
1671         gst_collect_pads_set_flushing (pads, TRUE);
1672
1673         if (pads->priv->flush_func)
1674           pads->priv->flush_func (pads, pads->priv->flush_user_data);
1675
1676         g_atomic_int_set (&pads->priv->pending_flush_stop, TRUE);
1677         GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1678
1679         goto eat;
1680       } else {
1681         /* forward event to unblock check_collected */
1682         GST_DEBUG_OBJECT (pad, "forwarding flush start");
1683         res = gst_pad_event_default (pad, parent, event);
1684         event = NULL;
1685
1686         /* now unblock the chain function.
1687          * no cond per pad, so they all unblock,
1688          * non-flushing block again */
1689         GST_COLLECT_PADS_STREAM_LOCK (pads);
1690         GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_FLUSHING);
1691         gst_collect_pads_clear (pads, data);
1692
1693         /* cater for possible default muxing functionality */
1694         if (buffer_func) {
1695           /* restore to initial state */
1696           gst_collect_pads_set_waiting (pads, data, TRUE);
1697           /* if the current pad is affected, reset state, recalculate later */
1698           if (pads->priv->earliest_data == data) {
1699             unref_data (data);
1700             pads->priv->earliest_data = NULL;
1701             pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
1702           }
1703         }
1704
1705         GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1706
1707         goto eat;
1708       }
1709     }
1710     case GST_EVENT_FLUSH_STOP:
1711     {
1712       /* flush the 1 buffer queue */
1713       GST_COLLECT_PADS_STREAM_LOCK (pads);
1714       GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_FLUSHING);
1715       gst_collect_pads_clear (pads, data);
1716       /* we need new segment info after the flush */
1717       gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
1718       GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
1719       /* if the pad was EOS, remove the EOS flag and
1720        * decrement the number of eospads */
1721       if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
1722                   GST_COLLECT_PADS_STATE_EOS))) {
1723         if (!GST_COLLECT_PADS_STATE_IS_SET (data,
1724                 GST_COLLECT_PADS_STATE_WAITING))
1725           pads->priv->queuedpads++;
1726         pads->priv->eospads--;
1727         GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_EOS);
1728       }
1729       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1730
1731       if (g_atomic_int_get (&pads->priv->seeking)) {
1732         if (g_atomic_int_compare_and_exchange (&pads->priv->pending_flush_stop,
1733                 TRUE, FALSE))
1734           goto forward;
1735         else
1736           goto eat;
1737       } else {
1738         goto forward;
1739       }
1740     }
1741     case GST_EVENT_EOS:
1742     {
1743       GST_COLLECT_PADS_STREAM_LOCK (pads);
1744       /* if the pad was not EOS, make it EOS and so we
1745        * have one more eospad */
1746       if (G_LIKELY (!GST_COLLECT_PADS_STATE_IS_SET (data,
1747                   GST_COLLECT_PADS_STATE_EOS))) {
1748         GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_EOS);
1749         if (!GST_COLLECT_PADS_STATE_IS_SET (data,
1750                 GST_COLLECT_PADS_STATE_WAITING))
1751           pads->priv->queuedpads--;
1752         pads->priv->eospads++;
1753       }
1754       /* check if we need collecting anything, we ignore the result. */
1755       gst_collect_pads_check_collected (pads);
1756       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1757
1758       goto eat;
1759     }
1760     case GST_EVENT_SEGMENT:
1761     {
1762       GstSegment seg;
1763
1764       GST_COLLECT_PADS_STREAM_LOCK (pads);
1765
1766       gst_event_copy_segment (event, &seg);
1767
1768       GST_DEBUG_OBJECT (data->pad, "got segment %" GST_SEGMENT_FORMAT, &seg);
1769
1770       /* default collection can not handle other segment formats than time */
1771       if (buffer_func && seg.format != GST_FORMAT_TIME) {
1772         GST_WARNING_OBJECT (pads, "GstCollectPads default collecting "
1773             "can only handle time segments. Non time segment ignored.");
1774         goto newsegment_done;
1775       }
1776
1777       /* need to update segment first */
1778       data->segment = seg;
1779       GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
1780
1781       /* now we can use for e.g. running time */
1782       seg.position = gst_collect_pads_clip_time (pads, data, seg.start);
1783       /* update again */
1784       data->segment = seg;
1785
1786       /* default muxing functionality */
1787       if (!buffer_func)
1788         goto newsegment_done;
1789
1790       gst_collect_pads_handle_position_update (pads, data, seg.position);
1791
1792     newsegment_done:
1793       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1794       /* we must not forward this event since multiple segments will be
1795        * accumulated and this is certainly not what we want. */
1796       goto eat;
1797     }
1798     case GST_EVENT_GAP:
1799     {
1800       GstClockTime start, duration;
1801
1802       GST_COLLECT_PADS_STREAM_LOCK (pads);
1803
1804       gst_event_parse_gap (event, &start, &duration);
1805       if (GST_CLOCK_TIME_IS_VALID (duration))
1806         start += duration;
1807       /* we do not expect another buffer until after gap,
1808        * so that is our position now */
1809       data->segment.position = gst_collect_pads_clip_time (pads, data, start);
1810
1811       gst_collect_pads_handle_position_update (pads, data,
1812           data->segment.position);
1813
1814       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1815       goto eat;
1816     }
1817     case GST_EVENT_STREAM_START:
1818       /* drop stream start events, element must create its own start event,
1819        * we can't just forward the first random stream start event we get */
1820       goto eat;
1821     case GST_EVENT_CAPS:
1822       goto eat;
1823     default:
1824       /* forward other events */
1825       goto forward;
1826   }
1827
1828 eat:
1829   if (event)
1830     gst_event_unref (event);
1831   return res;
1832
1833 forward:
1834   if (discard)
1835     goto eat;
1836   else
1837     return gst_pad_event_default (pad, parent, event);
1838 }
1839
1840 typedef struct
1841 {
1842   GstEvent *event;
1843   gboolean result;
1844 } EventData;
1845
1846 static gboolean
1847 event_forward_func (GstPad * pad, EventData * data)
1848 {
1849   data->result &= gst_pad_push_event (pad, gst_event_ref (data->event));
1850   return !data->result;
1851 }
1852
1853 static gboolean
1854 forward_event_to_all_sinkpads (GstPad * srcpad, GstEvent * event)
1855 {
1856   EventData data;
1857
1858   data.event = event;
1859   data.result = TRUE;
1860
1861   gst_pad_forward (srcpad, (GstPadForwardFunction) event_forward_func, &data);
1862   return data.result;
1863 }
1864
1865 gboolean
1866 gst_collect_pads_src_event_default (GstCollectPads * pads, GstPad * pad,
1867     GstEvent * event)
1868 {
1869   GstObject *parent;
1870   gboolean res = TRUE;
1871
1872   parent = GST_OBJECT_PARENT (pad);
1873
1874   switch (GST_EVENT_TYPE (event)) {
1875     case GST_EVENT_SEEK:{
1876       GstSeekFlags flags;
1877
1878       GST_INFO_OBJECT (pads, "starting seek");
1879
1880       gst_event_parse_seek (event, NULL, NULL, &flags, NULL, NULL, NULL, NULL);
1881       if (flags & GST_SEEK_FLAG_FLUSH) {
1882         g_atomic_int_set (&pads->priv->seeking, TRUE);
1883         g_atomic_int_set (&pads->priv->pending_flush_start, TRUE);
1884         /* forward the seek upstream */
1885         res = forward_event_to_all_sinkpads (pad, event);
1886         event = NULL;
1887         if (!res) {
1888           g_atomic_int_set (&pads->priv->seeking, FALSE);
1889           g_atomic_int_set (&pads->priv->pending_flush_start, FALSE);
1890         }
1891       }
1892
1893       GST_INFO_OBJECT (pads, "seek done, result: %d", res);
1894
1895       break;
1896     }
1897     default:
1898       break;
1899   }
1900
1901   if (event)
1902     res = gst_pad_event_default (pad, parent, event);
1903
1904   return res;
1905 }
1906
1907 static gboolean
1908 gst_collect_pads_event_default_internal (GstCollectPads * pads,
1909     GstCollectData * data, GstEvent * event, gpointer user_data)
1910 {
1911   return gst_collect_pads_event_default (pads, data, event, FALSE);
1912 }
1913
1914 static gboolean
1915 gst_collect_pads_event (GstPad * pad, GstObject * parent, GstEvent * event)
1916 {
1917   gboolean res = FALSE, need_unlock = FALSE;
1918   GstCollectData *data;
1919   GstCollectPads *pads;
1920   GstCollectPadsEventFunction event_func;
1921   gpointer event_user_data;
1922
1923   /* some magic to get the managing collect_pads */
1924   GST_OBJECT_LOCK (pad);
1925   data = (GstCollectData *) gst_pad_get_element_private (pad);
1926   if (G_UNLIKELY (data == NULL))
1927     goto pad_removed;
1928   ref_data (data);
1929   GST_OBJECT_UNLOCK (pad);
1930
1931   res = FALSE;
1932
1933   pads = data->collect;
1934
1935   GST_DEBUG_OBJECT (data->pad, "Got %s event on sink pad",
1936       GST_EVENT_TYPE_NAME (event));
1937
1938   GST_OBJECT_LOCK (pads);
1939   event_func = pads->priv->event_func;
1940   event_user_data = pads->priv->event_user_data;
1941   GST_OBJECT_UNLOCK (pads);
1942
1943   if (GST_EVENT_IS_SERIALIZED (event)) {
1944     GST_COLLECT_PADS_STREAM_LOCK (pads);
1945     need_unlock = TRUE;
1946   }
1947
1948   if (G_LIKELY (event_func)) {
1949     res = event_func (pads, data, event, event_user_data);
1950   }
1951
1952   if (need_unlock)
1953     GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1954
1955   unref_data (data);
1956   return res;
1957
1958   /* ERRORS */
1959 pad_removed:
1960   {
1961     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
1962     GST_OBJECT_UNLOCK (pad);
1963     return FALSE;
1964   }
1965 }
1966
1967 /**
1968  * gst_collect_pads_query_default:
1969  * @pads: the collectpads to use
1970  * @data: collect data of corresponding pad
1971  * @query: query being processed
1972  * @discard: process but do not send event downstream
1973  *
1974  * Default GstCollectPads query handling that elements should always
1975  * chain up to to ensure proper operation.  Element might however indicate
1976  * query should not be forwarded downstream.
1977  */
1978 gboolean
1979 gst_collect_pads_query_default (GstCollectPads * pads, GstCollectData * data,
1980     GstQuery * query, gboolean discard)
1981 {
1982   gboolean res = TRUE;
1983   GstObject *parent;
1984   GstPad *pad;
1985
1986   pad = data->pad;
1987   parent = GST_OBJECT_PARENT (pad);
1988
1989   switch (GST_QUERY_TYPE (query)) {
1990     case GST_QUERY_SEEKING:
1991     {
1992       GstFormat format;
1993
1994       /* don't pass it along as some (file)sink might claim it does
1995        * whereas with a collectpads in between that will not likely work */
1996       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1997       gst_query_set_seeking (query, format, FALSE, 0, -1);
1998       res = TRUE;
1999       discard = TRUE;
2000       break;
2001     }
2002     default:
2003       break;
2004   }
2005
2006   if (!discard)
2007     return gst_pad_query_default (pad, parent, query);
2008   else
2009     return res;
2010 }
2011
2012 static gboolean
2013 gst_collect_pads_query_default_internal (GstCollectPads * pads,
2014     GstCollectData * data, GstQuery * query, gpointer user_data)
2015 {
2016   return gst_collect_pads_query_default (pads, data, query, FALSE);
2017 }
2018
2019 static gboolean
2020 gst_collect_pads_query (GstPad * pad, GstObject * parent, GstQuery * query)
2021 {
2022   gboolean res = FALSE, need_unlock = FALSE;
2023   GstCollectData *data;
2024   GstCollectPads *pads;
2025   GstCollectPadsQueryFunction query_func;
2026   gpointer query_user_data;
2027
2028   GST_DEBUG_OBJECT (pad, "Got %s query on sink pad",
2029       GST_QUERY_TYPE_NAME (query));
2030
2031   /* some magic to get the managing collect_pads */
2032   GST_OBJECT_LOCK (pad);
2033   data = (GstCollectData *) gst_pad_get_element_private (pad);
2034   if (G_UNLIKELY (data == NULL))
2035     goto pad_removed;
2036   ref_data (data);
2037   GST_OBJECT_UNLOCK (pad);
2038
2039   pads = data->collect;
2040
2041   GST_OBJECT_LOCK (pads);
2042   query_func = pads->priv->query_func;
2043   query_user_data = pads->priv->query_user_data;
2044   GST_OBJECT_UNLOCK (pads);
2045
2046   if (GST_QUERY_IS_SERIALIZED (query)) {
2047     GST_COLLECT_PADS_STREAM_LOCK (pads);
2048     need_unlock = TRUE;
2049   }
2050
2051   if (G_LIKELY (query_func)) {
2052     res = query_func (pads, data, query, query_user_data);
2053   }
2054
2055   if (need_unlock)
2056     GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2057
2058   unref_data (data);
2059   return res;
2060
2061   /* ERRORS */
2062 pad_removed:
2063   {
2064     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2065     GST_OBJECT_UNLOCK (pad);
2066     return FALSE;
2067   }
2068 }
2069
2070
2071 /* For each buffer we receive we check if our collected condition is reached
2072  * and if so we call the collected function. When this is done we check if
2073  * data has been unqueued. If data is still queued we wait holding the stream
2074  * lock to make sure no EOS event can happen while we are ready to be
2075  * collected 
2076  */
2077 static GstFlowReturn
2078 gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2079 {
2080   GstCollectData *data;
2081   GstCollectPads *pads;
2082   GstFlowReturn ret;
2083   GstBuffer **buffer_p;
2084   guint32 cookie;
2085
2086   GST_DEBUG ("Got buffer for pad %s:%s", GST_DEBUG_PAD_NAME (pad));
2087
2088   /* some magic to get the managing collect_pads */
2089   GST_OBJECT_LOCK (pad);
2090   data = (GstCollectData *) gst_pad_get_element_private (pad);
2091   if (G_UNLIKELY (data == NULL))
2092     goto no_data;
2093   ref_data (data);
2094   GST_OBJECT_UNLOCK (pad);
2095
2096   pads = data->collect;
2097
2098   GST_COLLECT_PADS_STREAM_LOCK (pads);
2099   /* if not started, bail out */
2100   if (G_UNLIKELY (!pads->priv->started))
2101     goto not_started;
2102   /* check if this pad is flushing */
2103   if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
2104               GST_COLLECT_PADS_STATE_FLUSHING)))
2105     goto flushing;
2106   /* pad was EOS, we can refuse this data */
2107   if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
2108               GST_COLLECT_PADS_STATE_EOS)))
2109     goto eos;
2110
2111   /* see if we need to clip */
2112   if (pads->priv->clip_func) {
2113     GstBuffer *outbuf = NULL;
2114     ret =
2115         pads->priv->clip_func (pads, data, buffer, &outbuf,
2116         pads->priv->clip_user_data);
2117     buffer = outbuf;
2118
2119     if (G_UNLIKELY (outbuf == NULL))
2120       goto clipped;
2121
2122     if (G_UNLIKELY (ret == GST_FLOW_EOS))
2123       goto eos;
2124     else if (G_UNLIKELY (ret != GST_FLOW_OK))
2125       goto error;
2126   }
2127
2128   GST_DEBUG_OBJECT (pads, "Queuing buffer %p for pad %s:%s", buffer,
2129       GST_DEBUG_PAD_NAME (pad));
2130
2131   /* One more pad has data queued */
2132   if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING))
2133     pads->priv->queuedpads++;
2134   buffer_p = &data->buffer;
2135   gst_buffer_replace (buffer_p, buffer);
2136
2137   /* update segment last position if in TIME */
2138   if (G_LIKELY (data->segment.format == GST_FORMAT_TIME)) {
2139     GstClockTime timestamp;
2140
2141     timestamp = GST_BUFFER_DTS (buffer);
2142     if (!GST_CLOCK_TIME_IS_VALID (timestamp))
2143       timestamp = GST_BUFFER_PTS (buffer);
2144
2145     if (GST_CLOCK_TIME_IS_VALID (timestamp))
2146       data->segment.position = timestamp;
2147   }
2148
2149   /* While we have data queued on this pad try to collect stuff */
2150   do {
2151     /* Check if our collected condition is matched and call the collected
2152      * function if it is */
2153     ret = gst_collect_pads_check_collected (pads);
2154     /* when an error occurs, we want to report this back to the caller ASAP
2155      * without having to block if the buffer was not popped */
2156     if (G_UNLIKELY (ret != GST_FLOW_OK))
2157       goto error;
2158
2159     /* data was consumed, we can exit and accept new data */
2160     if (data->buffer == NULL)
2161       break;
2162
2163     /* Having the _INIT here means we don't care about any broadcast up to here
2164      * (most of which occur with STREAM_LOCK held, so could not have happened
2165      * anyway).  We do care about e.g. a remove initiated broadcast as of this
2166      * point.  Putting it here also makes this thread ignores any evt it raised
2167      * itself (as is a usual WAIT semantic).
2168      */
2169     GST_COLLECT_PADS_EVT_INIT (cookie);
2170
2171     /* pad could be removed and re-added */
2172     unref_data (data);
2173     GST_OBJECT_LOCK (pad);
2174     if (G_UNLIKELY ((data = gst_pad_get_element_private (pad)) == NULL))
2175       goto pad_removed;
2176     ref_data (data);
2177     GST_OBJECT_UNLOCK (pad);
2178
2179     GST_DEBUG_OBJECT (pads, "Pad %s:%s has a buffer queued, waiting",
2180         GST_DEBUG_PAD_NAME (pad));
2181
2182     /* wait to be collected, this must happen from another thread triggered
2183      * by the _chain function of another pad. We release the lock so we
2184      * can get stopped or flushed as well. We can however not get EOS
2185      * because we still hold the STREAM_LOCK.
2186      */
2187     GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2188     GST_COLLECT_PADS_EVT_WAIT (pads, cookie);
2189     GST_COLLECT_PADS_STREAM_LOCK (pads);
2190
2191     GST_DEBUG_OBJECT (pads, "Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad));
2192
2193     /* after a signal, we could be stopped */
2194     if (G_UNLIKELY (!pads->priv->started))
2195       goto not_started;
2196     /* check if this pad is flushing */
2197     if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
2198                 GST_COLLECT_PADS_STATE_FLUSHING)))
2199       goto flushing;
2200   }
2201   while (data->buffer != NULL);
2202
2203 unlock_done:
2204   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2205   /* data is definitely NULL if pad_removed goto was run. */
2206   if (data)
2207     unref_data (data);
2208   if (buffer)
2209     gst_buffer_unref (buffer);
2210   return ret;
2211
2212 pad_removed:
2213   {
2214     GST_WARNING ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2215     GST_OBJECT_UNLOCK (pad);
2216     ret = GST_FLOW_NOT_LINKED;
2217     goto unlock_done;
2218   }
2219   /* ERRORS */
2220 no_data:
2221   {
2222     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2223     GST_OBJECT_UNLOCK (pad);
2224     gst_buffer_unref (buffer);
2225     return GST_FLOW_NOT_LINKED;
2226   }
2227 not_started:
2228   {
2229     GST_DEBUG ("not started");
2230     gst_collect_pads_clear (pads, data);
2231     ret = GST_FLOW_FLUSHING;
2232     goto unlock_done;
2233   }
2234 flushing:
2235   {
2236     GST_DEBUG ("pad %s:%s is flushing", GST_DEBUG_PAD_NAME (pad));
2237     gst_collect_pads_clear (pads, data);
2238     ret = GST_FLOW_FLUSHING;
2239     goto unlock_done;
2240   }
2241 eos:
2242   {
2243     /* we should not post an error for this, just inform upstream that
2244      * we don't expect anything anymore */
2245     GST_DEBUG ("pad %s:%s is eos", GST_DEBUG_PAD_NAME (pad));
2246     ret = GST_FLOW_EOS;
2247     goto unlock_done;
2248   }
2249 clipped:
2250   {
2251     GST_DEBUG ("clipped buffer on pad %s:%s", GST_DEBUG_PAD_NAME (pad));
2252     ret = GST_FLOW_OK;
2253     goto unlock_done;
2254   }
2255 error:
2256   {
2257     /* we print the error, the element should post a reasonable error
2258      * message for fatal errors */
2259     GST_DEBUG ("collect failed, reason %d (%s)", ret, gst_flow_get_name (ret));
2260     gst_collect_pads_clear (pads, data);
2261     goto unlock_done;
2262   }
2263 }