Port gtk-doc comments to their equivalent markdown syntax
[platform/upstream/gstreamer.git] / libs / gst / base / gstcollectpads.c
1 /* GStreamer
2  * Copyright (C) 2005 Wim Taymans <wim@fluendo.com>
3  * Copyright (C) 2008 Mark Nauwelaerts <mnauw@users.sourceforge.net>
4  * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
5  *
6  * gstcollectpads.c:
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23 /**
24  * SECTION:gstcollectpads
25  * @title: GstCollectPads
26  * @short_description: manages a set of pads that operate in collect mode
27  * @see_also:
28  *
29  * Manages a set of pads that operate in collect mode. This means that control
30  * is given to the manager of this object when all pads have data.
31  *
32  *   * Collectpads are created with gst_collect_pads_new(). A callback should then
33  *     be installed with gst_collect_pads_set_function ().
34  *
35  *   * Pads are added to the collection with gst_collect_pads_add_pad()/
36  *     gst_collect_pads_remove_pad(). The pad
37  *     has to be a sinkpad. The chain and event functions of the pad are
38  *     overridden. The element_private of the pad is used to store
39  *     private information for the collectpads.
40  *
41  *   * For each pad, data is queued in the _chain function or by
42  *     performing a pull_range.
43  *
44  *   * When data is queued on all pads in waiting mode, the callback function is called.
45  *
46  *   * Data can be dequeued from the pad with the gst_collect_pads_pop() method.
47  *     One can peek at the data with the gst_collect_pads_peek() function.
48  *     These functions will return %NULL if the pad received an EOS event. When all
49  *     pads return %NULL from a gst_collect_pads_peek(), the element can emit an EOS
50  *     event itself.
51  *
52  *   * Data can also be dequeued in byte units using the gst_collect_pads_available(),
53  *     gst_collect_pads_read_buffer() and gst_collect_pads_flush() calls.
54  *
55  *   * Elements should call gst_collect_pads_start() and gst_collect_pads_stop() in
56  *     their state change functions to start and stop the processing of the collectpads.
57  *     The gst_collect_pads_stop() call should be called before calling the parent
58  *     element state change function in the PAUSED_TO_READY state change to ensure
59  *     no pad is blocked and the element can finish streaming.
60  *
61  *   * gst_collect_pads_set_waiting() sets a pad to waiting or non-waiting mode.
62  *     CollectPads element is not waiting for data to be collected on non-waiting pads.
63  *     Thus these pads may but need not have data when the callback is called.
64  *     All pads are in waiting mode by default.
65  *
66  */
67
68 #ifdef HAVE_CONFIG_H
69 #  include "config.h"
70 #endif
71
72 #include <gst/gst_private.h>
73
74 #include "gstcollectpads.h"
75
76 #include "../../../gst/glib-compat-private.h"
77
78 GST_DEBUG_CATEGORY_STATIC (collect_pads_debug);
79 #define GST_CAT_DEFAULT collect_pads_debug
80
81 #define parent_class gst_collect_pads_parent_class
82 G_DEFINE_TYPE (GstCollectPads, gst_collect_pads, GST_TYPE_OBJECT);
83
84 struct _GstCollectDataPrivate
85 {
86   /* refcounting for struct, and destroy callback */
87   GstCollectDataDestroyNotify destroy_notify;
88   gint refcount;
89 };
90
91 struct _GstCollectPadsPrivate
92 {
93   /* with LOCK and/or STREAM_LOCK */
94   gboolean started;
95
96   /* with STREAM_LOCK */
97   guint32 cookie;               /* @data list cookie */
98   guint numpads;                /* number of pads in @data */
99   guint queuedpads;             /* number of pads with a buffer */
100   guint eospads;                /* number of pads that are EOS */
101   GstClockTime earliest_time;   /* Current earliest time */
102   GstCollectData *earliest_data;        /* Pad data for current earliest time */
103
104   /* with LOCK */
105   GSList *pad_list;             /* updated pad list */
106   guint32 pad_cookie;           /* updated cookie */
107
108   GstCollectPadsFunction func;  /* function and user_data for callback */
109   gpointer user_data;
110   GstCollectPadsBufferFunction buffer_func;     /* function and user_data for buffer callback */
111   gpointer buffer_user_data;
112   GstCollectPadsCompareFunction compare_func;
113   gpointer compare_user_data;
114   GstCollectPadsEventFunction event_func;       /* function and data for event callback */
115   gpointer event_user_data;
116   GstCollectPadsQueryFunction query_func;
117   gpointer query_user_data;
118   GstCollectPadsClipFunction clip_func;
119   gpointer clip_user_data;
120   GstCollectPadsFlushFunction flush_func;
121   gpointer flush_user_data;
122
123   /* no other lock needed */
124   GMutex evt_lock;              /* these make up sort of poor man's event signaling */
125   GCond evt_cond;
126   guint32 evt_cookie;
127
128   gboolean seeking;
129   gboolean pending_flush_start;
130   gboolean pending_flush_stop;
131 };
132
133 static void gst_collect_pads_clear (GstCollectPads * pads,
134     GstCollectData * data);
135 static GstFlowReturn gst_collect_pads_chain (GstPad * pad, GstObject * parent,
136     GstBuffer * buffer);
137 static gboolean gst_collect_pads_event (GstPad * pad, GstObject * parent,
138     GstEvent * event);
139 static gboolean gst_collect_pads_query (GstPad * pad, GstObject * parent,
140     GstQuery * query);
141 static void gst_collect_pads_finalize (GObject * object);
142 static GstFlowReturn gst_collect_pads_default_collected (GstCollectPads *
143     pads, gpointer user_data);
144 static gint gst_collect_pads_default_compare_func (GstCollectPads * pads,
145     GstCollectData * data1, GstClockTime timestamp1, GstCollectData * data2,
146     GstClockTime timestamp2, gpointer user_data);
147 static gboolean gst_collect_pads_recalculate_full (GstCollectPads * pads);
148 static void ref_data (GstCollectData * data);
149 static void unref_data (GstCollectData * data);
150
151 static gboolean gst_collect_pads_event_default_internal (GstCollectPads *
152     pads, GstCollectData * data, GstEvent * event, gpointer user_data);
153 static gboolean gst_collect_pads_query_default_internal (GstCollectPads *
154     pads, GstCollectData * data, GstQuery * query, gpointer user_data);
155
156
157 /* Some properties are protected by LOCK, others by STREAM_LOCK
158  * However, manipulating either of these partitions may require
159  * to signal/wake a _WAIT, so use a separate (sort of) event to prevent races
160  * Alternative implementations are possible, e.g. some low-level re-implementing
161  * of the 2 above locks to drop both of them atomically when going into _WAIT.
162  */
163 #define GST_COLLECT_PADS_GET_EVT_COND(pads) (&((GstCollectPads *)pads)->priv->evt_cond)
164 #define GST_COLLECT_PADS_GET_EVT_LOCK(pads) (&((GstCollectPads *)pads)->priv->evt_lock)
165 #define GST_COLLECT_PADS_EVT_WAIT(pads, cookie) G_STMT_START {    \
166   g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
167   /* should work unless a lot of event'ing and thread starvation */\
168   while (cookie == ((GstCollectPads *) pads)->priv->evt_cookie)         \
169     g_cond_wait (GST_COLLECT_PADS_GET_EVT_COND (pads),            \
170         GST_COLLECT_PADS_GET_EVT_LOCK (pads));                    \
171   cookie = ((GstCollectPads *) pads)->priv->evt_cookie;                 \
172   g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
173 } G_STMT_END
174 #define GST_COLLECT_PADS_EVT_WAIT_TIMED(pads, cookie, timeout) G_STMT_START { \
175   GTimeVal __tv; \
176   \
177   g_get_current_time (&tv); \
178   g_time_val_add (&tv, timeout); \
179   \
180   g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
181   /* should work unless a lot of event'ing and thread starvation */\
182   while (cookie == ((GstCollectPads *) pads)->priv->evt_cookie)         \
183     g_cond_timed_wait (GST_COLLECT_PADS_GET_EVT_COND (pads),            \
184         GST_COLLECT_PADS_GET_EVT_LOCK (pads), &tv);                    \
185   cookie = ((GstCollectPads *) pads)->priv->evt_cookie;                 \
186   g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
187 } G_STMT_END
188 #define GST_COLLECT_PADS_EVT_BROADCAST(pads) G_STMT_START {       \
189   g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
190   /* never mind wrap-around */                                     \
191   ++(((GstCollectPads *) pads)->priv->evt_cookie);                      \
192   g_cond_broadcast (GST_COLLECT_PADS_GET_EVT_COND (pads));        \
193   g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
194 } G_STMT_END
195 #define GST_COLLECT_PADS_EVT_INIT(cookie) G_STMT_START {          \
196   g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
197   cookie = ((GstCollectPads *) pads)->priv->evt_cookie;                 \
198   g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
199 } G_STMT_END
200
201 static void
202 gst_collect_pads_class_init (GstCollectPadsClass * klass)
203 {
204   GObjectClass *gobject_class = (GObjectClass *) klass;
205
206   g_type_class_add_private (klass, sizeof (GstCollectPadsPrivate));
207
208   GST_DEBUG_CATEGORY_INIT (collect_pads_debug, "collectpads", 0,
209       "GstCollectPads");
210
211   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_collect_pads_finalize);
212 }
213
214 static void
215 gst_collect_pads_init (GstCollectPads * pads)
216 {
217   pads->priv =
218       G_TYPE_INSTANCE_GET_PRIVATE (pads, GST_TYPE_COLLECT_PADS,
219       GstCollectPadsPrivate);
220
221   pads->data = NULL;
222   pads->priv->cookie = 0;
223   pads->priv->numpads = 0;
224   pads->priv->queuedpads = 0;
225   pads->priv->eospads = 0;
226   pads->priv->started = FALSE;
227
228   g_rec_mutex_init (&pads->stream_lock);
229
230   pads->priv->func = gst_collect_pads_default_collected;
231   pads->priv->user_data = NULL;
232   pads->priv->event_func = NULL;
233   pads->priv->event_user_data = NULL;
234
235   /* members for default muxing */
236   pads->priv->buffer_func = NULL;
237   pads->priv->buffer_user_data = NULL;
238   pads->priv->compare_func = gst_collect_pads_default_compare_func;
239   pads->priv->compare_user_data = NULL;
240   pads->priv->earliest_data = NULL;
241   pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
242
243   pads->priv->event_func = gst_collect_pads_event_default_internal;
244   pads->priv->query_func = gst_collect_pads_query_default_internal;
245
246   /* members to manage the pad list */
247   pads->priv->pad_cookie = 0;
248   pads->priv->pad_list = NULL;
249
250   /* members for event */
251   g_mutex_init (&pads->priv->evt_lock);
252   g_cond_init (&pads->priv->evt_cond);
253   pads->priv->evt_cookie = 0;
254
255   pads->priv->seeking = FALSE;
256   pads->priv->pending_flush_start = FALSE;
257   pads->priv->pending_flush_stop = FALSE;
258
259   /* clear floating flag */
260   gst_object_ref_sink (pads);
261 }
262
263 static void
264 gst_collect_pads_finalize (GObject * object)
265 {
266   GstCollectPads *pads = GST_COLLECT_PADS (object);
267
268   GST_DEBUG_OBJECT (object, "finalize");
269
270   g_rec_mutex_clear (&pads->stream_lock);
271
272   g_cond_clear (&pads->priv->evt_cond);
273   g_mutex_clear (&pads->priv->evt_lock);
274
275   /* Remove pads and free pads list */
276   g_slist_foreach (pads->priv->pad_list, (GFunc) unref_data, NULL);
277   g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
278   g_slist_free (pads->data);
279   g_slist_free (pads->priv->pad_list);
280
281   G_OBJECT_CLASS (parent_class)->finalize (object);
282 }
283
284 /**
285  * gst_collect_pads_new:
286  *
287  * Create a new instance of #GstCollectPads.
288  *
289  * MT safe.
290  *
291  * Returns: (transfer full): a new #GstCollectPads, or %NULL in case of an error.
292  */
293 GstCollectPads *
294 gst_collect_pads_new (void)
295 {
296   GstCollectPads *newcoll;
297
298   newcoll = g_object_new (GST_TYPE_COLLECT_PADS, NULL);
299
300   return newcoll;
301 }
302
303 /* Must be called with GstObject lock! */
304 static void
305 gst_collect_pads_set_buffer_function_locked (GstCollectPads * pads,
306     GstCollectPadsBufferFunction func, gpointer user_data)
307 {
308   pads->priv->buffer_func = func;
309   pads->priv->buffer_user_data = user_data;
310 }
311
312 /**
313  * gst_collect_pads_set_buffer_function:
314  * @pads: the collectpads to use
315  * @func: (scope call): the function to set
316  * @user_data: (closure): user data passed to the function
317  *
318  * Set the callback function and user data that will be called with
319  * the oldest buffer when all pads have been collected, or %NULL on EOS.
320  * If a buffer is passed, the callback owns a reference and must unref
321  * it.
322  *
323  * MT safe.
324  */
325 void
326 gst_collect_pads_set_buffer_function (GstCollectPads * pads,
327     GstCollectPadsBufferFunction func, gpointer user_data)
328 {
329   g_return_if_fail (pads != NULL);
330   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
331
332   GST_OBJECT_LOCK (pads);
333   gst_collect_pads_set_buffer_function_locked (pads, func, user_data);
334   GST_OBJECT_UNLOCK (pads);
335 }
336
337 /**
338  * gst_collect_pads_set_compare_function:
339  * @pads: the pads to use
340  * @func: (scope call): the function to set
341  * @user_data: (closure): user data passed to the function
342  *
343  * Set the timestamp comparison function.
344  *
345  * MT safe.
346  */
347 /* NOTE allowing to change comparison seems not advisable;
348 no known use-case, and collaboration with default algorithm is unpredictable.
349 If custom compairing/operation is needed, just use a collect function of
350 your own */
351 void
352 gst_collect_pads_set_compare_function (GstCollectPads * pads,
353     GstCollectPadsCompareFunction func, gpointer user_data)
354 {
355   g_return_if_fail (pads != NULL);
356   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
357
358   GST_OBJECT_LOCK (pads);
359   pads->priv->compare_func = func;
360   pads->priv->compare_user_data = user_data;
361   GST_OBJECT_UNLOCK (pads);
362 }
363
364 /**
365  * gst_collect_pads_set_function:
366  * @pads: the collectpads to use
367  * @func: (scope call): the function to set
368  * @user_data: user data passed to the function
369  *
370  * CollectPads provides a default collection algorithm that will determine
371  * the oldest buffer available on all of its pads, and then delegate
372  * to a configured callback.
373  * However, if circumstances are more complicated and/or more control
374  * is desired, this sets a callback that will be invoked instead when
375  * all the pads added to the collection have buffers queued.
376  * Evidently, this callback is not compatible with
377  * gst_collect_pads_set_buffer_function() callback.
378  * If this callback is set, the former will be unset.
379  *
380  * MT safe.
381  */
382 void
383 gst_collect_pads_set_function (GstCollectPads * pads,
384     GstCollectPadsFunction func, gpointer user_data)
385 {
386   g_return_if_fail (pads != NULL);
387   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
388
389   GST_OBJECT_LOCK (pads);
390   pads->priv->func = func;
391   pads->priv->user_data = user_data;
392   gst_collect_pads_set_buffer_function_locked (pads, NULL, NULL);
393   GST_OBJECT_UNLOCK (pads);
394 }
395
396 static void
397 ref_data (GstCollectData * data)
398 {
399   g_assert (data != NULL);
400
401   g_atomic_int_inc (&(data->priv->refcount));
402 }
403
404 static void
405 unref_data (GstCollectData * data)
406 {
407   g_assert (data != NULL);
408   g_assert (data->priv->refcount > 0);
409
410   if (!g_atomic_int_dec_and_test (&(data->priv->refcount)))
411     return;
412
413   if (data->priv->destroy_notify)
414     data->priv->destroy_notify (data);
415
416   g_object_unref (data->pad);
417   if (data->buffer) {
418     gst_buffer_unref (data->buffer);
419   }
420   g_free (data->priv);
421   g_free (data);
422 }
423
424 /**
425  * gst_collect_pads_set_event_function:
426  * @pads: the collectpads to use
427  * @func: (scope call): the function to set
428  * @user_data: user data passed to the function
429  *
430  * Set the event callback function and user data that will be called when
431  * collectpads has received an event originating from one of the collected
432  * pads.  If the event being processed is a serialized one, this callback is
433  * called with @pads STREAM_LOCK held, otherwise not.  As this lock should be
434  * held when calling a number of CollectPads functions, it should be acquired
435  * if so (unusually) needed.
436  *
437  * MT safe.
438  */
439 void
440 gst_collect_pads_set_event_function (GstCollectPads * pads,
441     GstCollectPadsEventFunction func, gpointer user_data)
442 {
443   g_return_if_fail (pads != NULL);
444   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
445
446   GST_OBJECT_LOCK (pads);
447   pads->priv->event_func = func;
448   pads->priv->event_user_data = user_data;
449   GST_OBJECT_UNLOCK (pads);
450 }
451
452 /**
453  * gst_collect_pads_set_query_function:
454  * @pads: the collectpads to use
455  * @func: (scope call): the function to set
456  * @user_data: user data passed to the function
457  *
458  * Set the query callback function and user data that will be called after
459  * collectpads has received a query originating from one of the collected
460  * pads.  If the query being processed is a serialized one, this callback is
461  * called with @pads STREAM_LOCK held, otherwise not.  As this lock should be
462  * held when calling a number of CollectPads functions, it should be acquired
463  * if so (unusually) needed.
464  *
465  * MT safe.
466  */
467 void
468 gst_collect_pads_set_query_function (GstCollectPads * pads,
469     GstCollectPadsQueryFunction func, gpointer user_data)
470 {
471   g_return_if_fail (pads != NULL);
472   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
473
474   GST_OBJECT_LOCK (pads);
475   pads->priv->query_func = func;
476   pads->priv->query_user_data = user_data;
477   GST_OBJECT_UNLOCK (pads);
478 }
479
480 /**
481 * gst_collect_pads_clip_running_time:
482 * @pads: the collectpads to use
483 * @cdata: collect data of corresponding pad
484 * @buf: buffer being clipped
485 * @outbuf: (allow-none): output buffer with running time, or NULL if clipped
486 * @user_data: user data (unused)
487 *
488 * Convenience clipping function that converts incoming buffer's timestamp
489 * to running time, or clips the buffer if outside configured segment.
490 *
491 * Since 1.6, this clipping function also sets the DTS parameter of the
492 * GstCollectData structure. This version of the running time DTS can be
493 * negative. G_MININT64 is used to indicate invalid value.
494 */
495 GstFlowReturn
496 gst_collect_pads_clip_running_time (GstCollectPads * pads,
497     GstCollectData * cdata, GstBuffer * buf, GstBuffer ** outbuf,
498     gpointer user_data)
499 {
500   *outbuf = buf;
501
502   /* invalid left alone and passed */
503   if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DTS_OR_PTS (buf)))) {
504     GstClockTime time;
505     GstClockTime buf_dts, abs_dts;
506     gint dts_sign;
507
508     time = GST_BUFFER_PTS (buf);
509
510     if (GST_CLOCK_TIME_IS_VALID (time)) {
511       time =
512           gst_segment_to_running_time (&cdata->segment, GST_FORMAT_TIME, time);
513       if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) {
514         GST_DEBUG_OBJECT (cdata->pad, "clipping buffer on pad outside segment %"
515             GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_PTS (buf)));
516         gst_buffer_unref (buf);
517         *outbuf = NULL;
518         return GST_FLOW_OK;
519       }
520     }
521
522     GST_LOG_OBJECT (cdata->pad, "buffer pts %" GST_TIME_FORMAT " -> %"
523         GST_TIME_FORMAT " running time",
524         GST_TIME_ARGS (GST_BUFFER_PTS (buf)), GST_TIME_ARGS (time));
525     *outbuf = gst_buffer_make_writable (buf);
526     GST_BUFFER_PTS (*outbuf) = time;
527
528     dts_sign = gst_segment_to_running_time_full (&cdata->segment,
529         GST_FORMAT_TIME, GST_BUFFER_DTS (*outbuf), &abs_dts);
530     buf_dts = GST_BUFFER_DTS (*outbuf);
531     if (dts_sign > 0) {
532       GST_BUFFER_DTS (*outbuf) = abs_dts;
533       GST_COLLECT_PADS_DTS (cdata) = abs_dts;
534     } else if (dts_sign < 0) {
535       GST_BUFFER_DTS (*outbuf) = GST_CLOCK_TIME_NONE;
536       GST_COLLECT_PADS_DTS (cdata) = -((gint64) abs_dts);
537     } else {
538       GST_BUFFER_DTS (*outbuf) = GST_CLOCK_TIME_NONE;
539       GST_COLLECT_PADS_DTS (cdata) = GST_CLOCK_STIME_NONE;
540     }
541
542     GST_LOG_OBJECT (cdata->pad, "buffer dts %" GST_TIME_FORMAT " -> %"
543         GST_STIME_FORMAT " running time", GST_TIME_ARGS (buf_dts),
544         GST_STIME_ARGS (GST_COLLECT_PADS_DTS (cdata)));
545   }
546
547   return GST_FLOW_OK;
548 }
549
550 /**
551  * gst_collect_pads_set_clip_function:
552  * @pads: the collectpads to use
553  * @clipfunc: (scope call): clip function to install
554  * @user_data: user data to pass to @clip_func
555  *
556  * Install a clipping function that is called right after a buffer is received
557  * on a pad managed by @pads. See #GstCollectPadsClipFunction for more info.
558  */
559 void
560 gst_collect_pads_set_clip_function (GstCollectPads * pads,
561     GstCollectPadsClipFunction clipfunc, gpointer user_data)
562 {
563   g_return_if_fail (pads != NULL);
564   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
565
566   pads->priv->clip_func = clipfunc;
567   pads->priv->clip_user_data = user_data;
568 }
569
570 /**
571  * gst_collect_pads_set_flush_function:
572  * @pads: the collectpads to use
573  * @func: (scope call): flush function to install
574  * @user_data: user data to pass to @func
575  *
576  * Install a flush function that is called when the internal
577  * state of all pads should be flushed as part of flushing seek
578  * handling. See #GstCollectPadsFlushFunction for more info.
579  *
580  * Since: 1.4
581  */
582 void
583 gst_collect_pads_set_flush_function (GstCollectPads * pads,
584     GstCollectPadsFlushFunction func, gpointer user_data)
585 {
586   g_return_if_fail (pads != NULL);
587   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
588
589   pads->priv->flush_func = func;
590   pads->priv->flush_user_data = user_data;
591 }
592
593 /**
594  * gst_collect_pads_add_pad:
595  * @pads: the collectpads to use
596  * @pad: (transfer none): the pad to add
597  * @size: the size of the returned #GstCollectData structure
598  * @destroy_notify: (scope async): function to be called before the returned
599  *   #GstCollectData structure is freed
600  * @lock: whether to lock this pad in usual waiting state
601  *
602  * Add a pad to the collection of collect pads. The pad has to be
603  * a sinkpad. The refcount of the pad is incremented. Use
604  * gst_collect_pads_remove_pad() to remove the pad from the collection
605  * again.
606  *
607  * You specify a size for the returned #GstCollectData structure
608  * so that you can use it to store additional information.
609  *
610  * You can also specify a #GstCollectDataDestroyNotify that will be called
611  * just before the #GstCollectData structure is freed. It is passed the
612  * pointer to the structure and should free any custom memory and resources
613  * allocated for it.
614  *
615  * Keeping a pad locked in waiting state is only relevant when using
616  * the default collection algorithm (providing the oldest buffer).
617  * It ensures a buffer must be available on this pad for a collection
618  * to take place.  This is of typical use to a muxer element where
619  * non-subtitle streams should always be in waiting state,
620  * e.g. to assure that caps information is available on all these streams
621  * when initial headers have to be written.
622  *
623  * The pad will be automatically activated in push mode when @pads is
624  * started.
625  *
626  * MT safe.
627  *
628  * Returns: (nullable) (transfer none): a new #GstCollectData to identify the
629  *   new pad. Or %NULL if wrong parameters are supplied.
630  */
631 GstCollectData *
632 gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size,
633     GstCollectDataDestroyNotify destroy_notify, gboolean lock)
634 {
635   GstCollectData *data;
636
637   g_return_val_if_fail (pads != NULL, NULL);
638   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
639   g_return_val_if_fail (pad != NULL, NULL);
640   g_return_val_if_fail (GST_PAD_IS_SINK (pad), NULL);
641   g_return_val_if_fail (size >= sizeof (GstCollectData), NULL);
642
643   GST_DEBUG_OBJECT (pads, "adding pad %s:%s", GST_DEBUG_PAD_NAME (pad));
644
645   data = g_malloc0 (size);
646   data->priv = g_new0 (GstCollectDataPrivate, 1);
647   data->collect = pads;
648   data->pad = gst_object_ref (pad);
649   data->buffer = NULL;
650   data->pos = 0;
651   gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
652   data->state = GST_COLLECT_PADS_STATE_WAITING;
653   data->state |= lock ? GST_COLLECT_PADS_STATE_LOCKED : 0;
654   data->priv->refcount = 1;
655   data->priv->destroy_notify = destroy_notify;
656   data->ABI.abi.dts = G_MININT64;
657
658   GST_OBJECT_LOCK (pads);
659   GST_OBJECT_LOCK (pad);
660   gst_pad_set_element_private (pad, data);
661   GST_OBJECT_UNLOCK (pad);
662   pads->priv->pad_list = g_slist_append (pads->priv->pad_list, data);
663   gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_chain));
664   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_event));
665   gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_query));
666   /* backward compat, also add to data if stopped, so that the element already
667    * has this in the public data list before going PAUSED (typically)
668    * this can only be done when we are stopped because we don't take the
669    * STREAM_LOCK to protect the pads->data list. */
670   if (!pads->priv->started) {
671     pads->data = g_slist_append (pads->data, data);
672     ref_data (data);
673   }
674   /* activate the pad when needed */
675   if (pads->priv->started)
676     gst_pad_set_active (pad, TRUE);
677   pads->priv->pad_cookie++;
678   GST_OBJECT_UNLOCK (pads);
679
680   return data;
681 }
682
683 static gint
684 find_pad (GstCollectData * data, GstPad * pad)
685 {
686   if (data->pad == pad)
687     return 0;
688   return 1;
689 }
690
691 /**
692  * gst_collect_pads_remove_pad:
693  * @pads: the collectpads to use
694  * @pad: (transfer none): the pad to remove
695  *
696  * Remove a pad from the collection of collect pads. This function will also
697  * free the #GstCollectData and all the resources that were allocated with
698  * gst_collect_pads_add_pad().
699  *
700  * The pad will be deactivated automatically when @pads is stopped.
701  *
702  * MT safe.
703  *
704  * Returns: %TRUE if the pad could be removed.
705  */
706 gboolean
707 gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad)
708 {
709   GstCollectData *data;
710   GSList *list;
711
712   g_return_val_if_fail (pads != NULL, FALSE);
713   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
714   g_return_val_if_fail (pad != NULL, FALSE);
715   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
716
717   GST_DEBUG_OBJECT (pads, "removing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
718
719   GST_OBJECT_LOCK (pads);
720   list =
721       g_slist_find_custom (pads->priv->pad_list, pad, (GCompareFunc) find_pad);
722   if (!list)
723     goto unknown_pad;
724
725   data = (GstCollectData *) list->data;
726
727   GST_DEBUG_OBJECT (pads, "found pad %s:%s at %p", GST_DEBUG_PAD_NAME (pad),
728       data);
729
730   /* clear the stuff we configured */
731   gst_pad_set_chain_function (pad, NULL);
732   gst_pad_set_event_function (pad, NULL);
733   GST_OBJECT_LOCK (pad);
734   gst_pad_set_element_private (pad, NULL);
735   GST_OBJECT_UNLOCK (pad);
736
737   /* backward compat, also remove from data if stopped, note that this function
738    * can only be called when we are stopped because we don't take the
739    * STREAM_LOCK to protect the pads->data list. */
740   if (!pads->priv->started) {
741     GSList *dlist;
742
743     dlist = g_slist_find_custom (pads->data, pad, (GCompareFunc) find_pad);
744     if (dlist) {
745       GstCollectData *pdata = dlist->data;
746
747       pads->data = g_slist_delete_link (pads->data, dlist);
748       unref_data (pdata);
749     }
750   }
751   /* remove from the pad list */
752   pads->priv->pad_list = g_slist_delete_link (pads->priv->pad_list, list);
753   pads->priv->pad_cookie++;
754
755   /* signal waiters because something changed */
756   GST_COLLECT_PADS_EVT_BROADCAST (pads);
757
758   /* deactivate the pad when needed */
759   if (!pads->priv->started)
760     gst_pad_set_active (pad, FALSE);
761
762   /* clean and free the collect data */
763   unref_data (data);
764
765   GST_OBJECT_UNLOCK (pads);
766
767   return TRUE;
768
769 unknown_pad:
770   {
771     GST_WARNING_OBJECT (pads, "cannot remove unknown pad %s:%s",
772         GST_DEBUG_PAD_NAME (pad));
773     GST_OBJECT_UNLOCK (pads);
774     return FALSE;
775   }
776 }
777
778 /*
779  * Must be called with STREAM_LOCK and OBJECT_LOCK.
780  */
781 static void
782 gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads,
783     gboolean flushing)
784 {
785   GSList *walk = NULL;
786
787   /* Update the pads flushing flag */
788   for (walk = pads->priv->pad_list; walk; walk = g_slist_next (walk)) {
789     GstCollectData *cdata = walk->data;
790
791     if (GST_IS_PAD (cdata->pad)) {
792       GST_OBJECT_LOCK (cdata->pad);
793       if (flushing)
794         GST_PAD_SET_FLUSHING (cdata->pad);
795       else
796         GST_PAD_UNSET_FLUSHING (cdata->pad);
797       if (flushing)
798         GST_COLLECT_PADS_STATE_SET (cdata, GST_COLLECT_PADS_STATE_FLUSHING);
799       else
800         GST_COLLECT_PADS_STATE_UNSET (cdata, GST_COLLECT_PADS_STATE_FLUSHING);
801       gst_collect_pads_clear (pads, cdata);
802       GST_OBJECT_UNLOCK (cdata->pad);
803     }
804   }
805
806   /* inform _chain of changes */
807   GST_COLLECT_PADS_EVT_BROADCAST (pads);
808 }
809
810 /**
811  * gst_collect_pads_set_flushing:
812  * @pads: the collectpads to use
813  * @flushing: desired state of the pads
814  *
815  * Change the flushing state of all the pads in the collection. No pad
816  * is able to accept anymore data when @flushing is %TRUE. Calling this
817  * function with @flushing %FALSE makes @pads accept data again.
818  * Caller must ensure that downstream streaming (thread) is not blocked,
819  * e.g. by sending a FLUSH_START downstream.
820  *
821  * MT safe.
822  */
823 void
824 gst_collect_pads_set_flushing (GstCollectPads * pads, gboolean flushing)
825 {
826   g_return_if_fail (pads != NULL);
827   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
828
829   /* NOTE since this eventually calls _pop, some (STREAM_)LOCK is needed here */
830   GST_COLLECT_PADS_STREAM_LOCK (pads);
831   GST_OBJECT_LOCK (pads);
832   gst_collect_pads_set_flushing_unlocked (pads, flushing);
833   GST_OBJECT_UNLOCK (pads);
834   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
835 }
836
837 /**
838  * gst_collect_pads_start:
839  * @pads: the collectpads to use
840  *
841  * Starts the processing of data in the collect_pads.
842  *
843  * MT safe.
844  */
845 void
846 gst_collect_pads_start (GstCollectPads * pads)
847 {
848   GSList *collected;
849
850   g_return_if_fail (pads != NULL);
851   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
852
853   GST_DEBUG_OBJECT (pads, "starting collect pads");
854
855   /* make sure stop and collect cannot be called anymore */
856   GST_COLLECT_PADS_STREAM_LOCK (pads);
857
858   /* make pads streamable */
859   GST_OBJECT_LOCK (pads);
860
861   /* loop over the master pad list and reset the segment */
862   collected = pads->priv->pad_list;
863   for (; collected; collected = g_slist_next (collected)) {
864     GstCollectData *data;
865
866     data = collected->data;
867     gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
868   }
869
870   gst_collect_pads_set_flushing_unlocked (pads, FALSE);
871
872   /* Start collect pads */
873   pads->priv->started = TRUE;
874   GST_OBJECT_UNLOCK (pads);
875   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
876 }
877
878 /**
879  * gst_collect_pads_stop:
880  * @pads: the collectpads to use
881  *
882  * Stops the processing of data in the collect_pads. this function
883  * will also unblock any blocking operations.
884  *
885  * MT safe.
886  */
887 void
888 gst_collect_pads_stop (GstCollectPads * pads)
889 {
890   GSList *collected;
891
892   g_return_if_fail (pads != NULL);
893   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
894
895   GST_DEBUG_OBJECT (pads, "stopping collect pads");
896
897   /* make sure collect and start cannot be called anymore */
898   GST_COLLECT_PADS_STREAM_LOCK (pads);
899
900   /* make pads not accept data anymore */
901   GST_OBJECT_LOCK (pads);
902   gst_collect_pads_set_flushing_unlocked (pads, TRUE);
903
904   /* Stop collect pads */
905   pads->priv->started = FALSE;
906   pads->priv->eospads = 0;
907   pads->priv->queuedpads = 0;
908
909   /* loop over the master pad list and flush buffers */
910   collected = pads->priv->pad_list;
911   for (; collected; collected = g_slist_next (collected)) {
912     GstCollectData *data;
913     GstBuffer **buffer_p;
914
915     data = collected->data;
916     if (data->buffer) {
917       buffer_p = &data->buffer;
918       gst_buffer_replace (buffer_p, NULL);
919       data->pos = 0;
920     }
921     GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_EOS);
922   }
923
924   if (pads->priv->earliest_data)
925     unref_data (pads->priv->earliest_data);
926   pads->priv->earliest_data = NULL;
927   pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
928
929   GST_OBJECT_UNLOCK (pads);
930   /* Wake them up so they can end the chain functions. */
931   GST_COLLECT_PADS_EVT_BROADCAST (pads);
932
933   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
934 }
935
936 /**
937  * gst_collect_pads_peek:
938  * @pads: the collectpads to peek
939  * @data: the data to use
940  *
941  * Peek at the buffer currently queued in @data. This function
942  * should be called with the @pads STREAM_LOCK held, such as in the callback
943  * handler.
944  *
945  * MT safe.
946  *
947  * Returns: (transfer full) (nullable): The buffer in @data or %NULL if no
948  * buffer is queued. should unref the buffer after usage.
949  */
950 GstBuffer *
951 gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data)
952 {
953   GstBuffer *result;
954
955   g_return_val_if_fail (pads != NULL, NULL);
956   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
957   g_return_val_if_fail (data != NULL, NULL);
958
959   if ((result = data->buffer))
960     gst_buffer_ref (result);
961
962   GST_DEBUG_OBJECT (pads, "Peeking at pad %s:%s: buffer=%" GST_PTR_FORMAT,
963       GST_DEBUG_PAD_NAME (data->pad), result);
964
965   return result;
966 }
967
968 /**
969  * gst_collect_pads_pop:
970  * @pads: the collectpads to pop
971  * @data: the data to use
972  *
973  * Pop the buffer currently queued in @data. This function
974  * should be called with the @pads STREAM_LOCK held, such as in the callback
975  * handler.
976  *
977  * MT safe.
978  *
979  * Returns: (transfer full) (nullable): The buffer in @data or %NULL if no
980  * buffer was queued. You should unref the buffer after usage.
981  */
982 GstBuffer *
983 gst_collect_pads_pop (GstCollectPads * pads, GstCollectData * data)
984 {
985   GstBuffer *result;
986
987   g_return_val_if_fail (pads != NULL, NULL);
988   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
989   g_return_val_if_fail (data != NULL, NULL);
990
991   if ((result = data->buffer)) {
992     data->buffer = NULL;
993     data->pos = 0;
994     /* one less pad with queued data now */
995     if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING))
996       pads->priv->queuedpads--;
997   }
998
999   GST_COLLECT_PADS_EVT_BROADCAST (pads);
1000
1001   GST_DEBUG_OBJECT (pads, "Pop buffer on pad %s:%s: buffer=%" GST_PTR_FORMAT,
1002       GST_DEBUG_PAD_NAME (data->pad), result);
1003
1004   return result;
1005 }
1006
1007 /* pop and unref the currently queued buffer, should be called with STREAM_LOCK
1008  * held */
1009 static void
1010 gst_collect_pads_clear (GstCollectPads * pads, GstCollectData * data)
1011 {
1012   GstBuffer *buf;
1013
1014   if ((buf = gst_collect_pads_pop (pads, data)))
1015     gst_buffer_unref (buf);
1016 }
1017
1018 /**
1019  * gst_collect_pads_available:
1020  * @pads: the collectpads to query
1021  *
1022  * Query how much bytes can be read from each queued buffer. This means
1023  * that the result of this call is the maximum number of bytes that can
1024  * be read from each of the pads.
1025  *
1026  * This function should be called with @pads STREAM_LOCK held, such as
1027  * in the callback.
1028  *
1029  * MT safe.
1030  *
1031  * Returns: The maximum number of bytes queued on all pads. This function
1032  * returns 0 if a pad has no queued buffer.
1033  */
1034 /* we might pre-calculate this in some struct field,
1035  * but would then have to maintain this in _chain and particularly _pop, etc,
1036  * even if element is never interested in this information */
1037 guint
1038 gst_collect_pads_available (GstCollectPads * pads)
1039 {
1040   GSList *collected;
1041   guint result = G_MAXUINT;
1042
1043   g_return_val_if_fail (pads != NULL, 0);
1044   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
1045
1046   collected = pads->data;
1047   for (; collected; collected = g_slist_next (collected)) {
1048     GstCollectData *pdata;
1049     GstBuffer *buffer;
1050     gint size;
1051
1052     pdata = (GstCollectData *) collected->data;
1053
1054     /* ignore pad with EOS */
1055     if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (pdata,
1056                 GST_COLLECT_PADS_STATE_EOS))) {
1057       GST_DEBUG_OBJECT (pads, "pad %p is EOS", pdata);
1058       continue;
1059     }
1060
1061     /* an empty buffer without EOS is weird when we get here.. */
1062     if (G_UNLIKELY ((buffer = pdata->buffer) == NULL)) {
1063       GST_WARNING_OBJECT (pads, "pad %p has no buffer", pdata);
1064       goto not_filled;
1065     }
1066
1067     /* this is the size left of the buffer */
1068     size = gst_buffer_get_size (buffer) - pdata->pos;
1069     GST_DEBUG_OBJECT (pads, "pad %p has %d bytes left", pdata, size);
1070
1071     /* need to return the min of all available data */
1072     if (size < result)
1073       result = size;
1074   }
1075   /* nothing changed, all must be EOS then, return 0 */
1076   if (G_UNLIKELY (result == G_MAXUINT))
1077     result = 0;
1078
1079   return result;
1080
1081 not_filled:
1082   {
1083     return 0;
1084   }
1085 }
1086
1087 /**
1088  * gst_collect_pads_flush:
1089  * @pads: the collectpads to query
1090  * @data: the data to use
1091  * @size: the number of bytes to flush
1092  *
1093  * Flush @size bytes from the pad @data.
1094  *
1095  * This function should be called with @pads STREAM_LOCK held, such as
1096  * in the callback.
1097  *
1098  * MT safe.
1099  *
1100  * Returns: The number of bytes flushed This can be less than @size and
1101  * is 0 if the pad was end-of-stream.
1102  */
1103 guint
1104 gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data,
1105     guint size)
1106 {
1107   guint flushsize;
1108   gsize bsize;
1109   GstBuffer *buffer;
1110
1111   g_return_val_if_fail (pads != NULL, 0);
1112   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
1113   g_return_val_if_fail (data != NULL, 0);
1114
1115   /* no buffer, must be EOS */
1116   if ((buffer = data->buffer) == NULL)
1117     return 0;
1118
1119   bsize = gst_buffer_get_size (buffer);
1120
1121   /* this is what we can flush at max */
1122   flushsize = MIN (size, bsize - data->pos);
1123
1124   data->pos += size;
1125
1126   if (data->pos >= bsize)
1127     /* _clear will also reset data->pos to 0 */
1128     gst_collect_pads_clear (pads, data);
1129
1130   return flushsize;
1131 }
1132
1133 /**
1134  * gst_collect_pads_read_buffer:
1135  * @pads: the collectpads to query
1136  * @data: the data to use
1137  * @size: the number of bytes to read
1138  *
1139  * Get a subbuffer of @size bytes from the given pad @data.
1140  *
1141  * This function should be called with @pads STREAM_LOCK held, such as in the
1142  * callback.
1143  *
1144  * MT safe.
1145  *
1146  * Returns: (transfer full) (nullable): A sub buffer. The size of the buffer can
1147  * be less that requested. A return of %NULL signals that the pad is
1148  * end-of-stream. Unref the buffer after use.
1149  */
1150 GstBuffer *
1151 gst_collect_pads_read_buffer (GstCollectPads * pads, GstCollectData * data,
1152     guint size)
1153 {
1154   guint readsize, buf_size;
1155   GstBuffer *buffer;
1156
1157   g_return_val_if_fail (pads != NULL, NULL);
1158   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
1159   g_return_val_if_fail (data != NULL, NULL);
1160
1161   /* no buffer, must be EOS */
1162   if ((buffer = data->buffer) == NULL)
1163     return NULL;
1164
1165   buf_size = gst_buffer_get_size (buffer);
1166   readsize = MIN (size, buf_size - data->pos);
1167
1168   return gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, data->pos,
1169       readsize);
1170 }
1171
1172 /**
1173  * gst_collect_pads_take_buffer:
1174  * @pads: the collectpads to query
1175  * @data: the data to use
1176  * @size: the number of bytes to read
1177  *
1178  * Get a subbuffer of @size bytes from the given pad @data. Flushes the amount
1179  * of read bytes.
1180  *
1181  * This function should be called with @pads STREAM_LOCK held, such as in the
1182  * callback.
1183  *
1184  * MT safe.
1185  *
1186  * Returns: (transfer full) (nullable): A sub buffer. The size of the buffer can
1187  * be less that requested. A return of %NULL signals that the pad is
1188  * end-of-stream. Unref the buffer after use.
1189  */
1190 GstBuffer *
1191 gst_collect_pads_take_buffer (GstCollectPads * pads, GstCollectData * data,
1192     guint size)
1193 {
1194   GstBuffer *buffer = gst_collect_pads_read_buffer (pads, data, size);
1195
1196   if (buffer) {
1197     gst_collect_pads_flush (pads, data, gst_buffer_get_size (buffer));
1198   }
1199   return buffer;
1200 }
1201
1202 /**
1203  * gst_collect_pads_set_waiting:
1204  * @pads: the collectpads
1205  * @data: the data to use
1206  * @waiting: boolean indicating whether this pad should operate
1207  *           in waiting or non-waiting mode
1208  *
1209  * Sets a pad to waiting or non-waiting mode, if at least this pad
1210  * has not been created with locked waiting state,
1211  * in which case nothing happens.
1212  *
1213  * This function should be called with @pads STREAM_LOCK held, such as
1214  * in the callback.
1215  *
1216  * MT safe.
1217  */
1218 void
1219 gst_collect_pads_set_waiting (GstCollectPads * pads, GstCollectData * data,
1220     gboolean waiting)
1221 {
1222   g_return_if_fail (pads != NULL);
1223   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
1224   g_return_if_fail (data != NULL);
1225
1226   GST_DEBUG_OBJECT (pads, "Setting pad %s to waiting %d, locked %d",
1227       GST_PAD_NAME (data->pad), waiting,
1228       GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED));
1229
1230   /* Do something only on a change and if not locked */
1231   if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED) &&
1232       (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING) !=
1233           ! !waiting)) {
1234     /* Set waiting state for this pad */
1235     if (waiting)
1236       GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_WAITING);
1237     else
1238       GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_WAITING);
1239     /* Update number of queued pads if needed */
1240     if (!data->buffer &&
1241         !GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_EOS)) {
1242       if (waiting)
1243         pads->priv->queuedpads--;
1244       else
1245         pads->priv->queuedpads++;
1246     }
1247
1248     /* signal waiters because something changed */
1249     GST_COLLECT_PADS_EVT_BROADCAST (pads);
1250   }
1251 }
1252
1253 /* see if pads were added or removed and update our stats. Any pad
1254  * added after releasing the LOCK will get collected in the next
1255  * round.
1256  *
1257  * We can do a quick check by checking the cookies, that get changed
1258  * whenever the pad list is updated.
1259  *
1260  * Must be called with STREAM_LOCK.
1261  */
1262 static void
1263 gst_collect_pads_check_pads (GstCollectPads * pads)
1264 {
1265   /* the master list and cookie are protected with LOCK */
1266   GST_OBJECT_LOCK (pads);
1267   if (G_UNLIKELY (pads->priv->pad_cookie != pads->priv->cookie)) {
1268     GSList *collected;
1269
1270     /* clear list and stats */
1271     g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
1272     g_slist_free (pads->data);
1273     pads->data = NULL;
1274     pads->priv->numpads = 0;
1275     pads->priv->queuedpads = 0;
1276     pads->priv->eospads = 0;
1277     if (pads->priv->earliest_data)
1278       unref_data (pads->priv->earliest_data);
1279     pads->priv->earliest_data = NULL;
1280     pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
1281
1282     /* loop over the master pad list */
1283     collected = pads->priv->pad_list;
1284     for (; collected; collected = g_slist_next (collected)) {
1285       GstCollectData *data;
1286
1287       /* update the stats */
1288       pads->priv->numpads++;
1289       data = collected->data;
1290       if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_EOS))
1291         pads->priv->eospads++;
1292       else if (data->buffer || !GST_COLLECT_PADS_STATE_IS_SET (data,
1293               GST_COLLECT_PADS_STATE_WAITING))
1294         pads->priv->queuedpads++;
1295
1296       /* add to the list of pads to collect */
1297       ref_data (data);
1298       /* preserve order of adding/requesting pads */
1299       pads->data = g_slist_append (pads->data, data);
1300     }
1301     /* and update the cookie */
1302     pads->priv->cookie = pads->priv->pad_cookie;
1303   }
1304   GST_OBJECT_UNLOCK (pads);
1305 }
1306
1307 /* checks if all the pads are collected and call the collectfunction
1308  *
1309  * Should be called with STREAM_LOCK.
1310  *
1311  * Returns: The #GstFlowReturn of collection.
1312  */
1313 static GstFlowReturn
1314 gst_collect_pads_check_collected (GstCollectPads * pads)
1315 {
1316   GstFlowReturn flow_ret = GST_FLOW_OK;
1317   GstCollectPadsFunction func;
1318   gpointer user_data;
1319
1320   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
1321
1322   GST_OBJECT_LOCK (pads);
1323   func = pads->priv->func;
1324   user_data = pads->priv->user_data;
1325   GST_OBJECT_UNLOCK (pads);
1326
1327   g_return_val_if_fail (pads->priv->func != NULL, GST_FLOW_NOT_SUPPORTED);
1328
1329   /* check for new pads, update stats etc.. */
1330   gst_collect_pads_check_pads (pads);
1331
1332   if (G_UNLIKELY (pads->priv->eospads == pads->priv->numpads)) {
1333     /* If all our pads are EOS just collect once to let the element
1334      * do its final EOS handling. */
1335     GST_DEBUG_OBJECT (pads, "All active pads (%d) are EOS, calling %s",
1336         pads->priv->numpads, GST_DEBUG_FUNCPTR_NAME (func));
1337
1338     if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking,
1339                 TRUE, FALSE))) {
1340       GST_INFO_OBJECT (pads, "finished seeking");
1341     }
1342     do {
1343       flow_ret = func (pads, user_data);
1344     } while (flow_ret == GST_FLOW_OK);
1345   } else {
1346     gboolean collected = FALSE;
1347
1348     /* We call the collected function as long as our condition matches. */
1349     while (((pads->priv->queuedpads + pads->priv->eospads) >=
1350             pads->priv->numpads)) {
1351       GST_DEBUG_OBJECT (pads,
1352           "All active pads (%d + %d >= %d) have data, " "calling %s",
1353           pads->priv->queuedpads, pads->priv->eospads, pads->priv->numpads,
1354           GST_DEBUG_FUNCPTR_NAME (func));
1355
1356       if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking,
1357                   TRUE, FALSE))) {
1358         GST_INFO_OBJECT (pads, "finished seeking");
1359       }
1360       flow_ret = func (pads, user_data);
1361       collected = TRUE;
1362
1363       /* break on error */
1364       if (flow_ret != GST_FLOW_OK)
1365         break;
1366       /* Don't keep looping after telling the element EOS or flushing */
1367       if (pads->priv->queuedpads == 0)
1368         break;
1369     }
1370     if (!collected)
1371       GST_DEBUG_OBJECT (pads, "Not all active pads (%d) have data, continuing",
1372           pads->priv->numpads);
1373   }
1374   return flow_ret;
1375 }
1376
1377
1378 /* General overview:
1379  * - only pad with a buffer can determine earliest_data (and earliest_time)
1380  * - only segment info determines (non-)waiting state
1381  * - ? perhaps use _stream_time for comparison
1382  *   (which muxers might have use as well ?)
1383  */
1384
1385 /*
1386  * Function to recalculate the waiting state of all pads.
1387  *
1388  * Must be called with STREAM_LOCK.
1389  *
1390  * Returns %TRUE if a pad was set to waiting
1391  * (from non-waiting state).
1392  */
1393 static gboolean
1394 gst_collect_pads_recalculate_waiting (GstCollectPads * pads)
1395 {
1396   GSList *collected;
1397   gboolean result = FALSE;
1398
1399   /* If earliest time is not known, there is nothing to do. */
1400   if (pads->priv->earliest_data == NULL)
1401     return FALSE;
1402
1403   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
1404     GstCollectData *data = (GstCollectData *) collected->data;
1405     int cmp_res;
1406     GstClockTime comp_time;
1407
1408     /* check if pad has a segment */
1409     if (data->segment.format == GST_FORMAT_UNDEFINED) {
1410       GST_WARNING_OBJECT (pads,
1411           "GstCollectPads has no time segment, assuming 0 based.");
1412       gst_segment_init (&data->segment, GST_FORMAT_TIME);
1413       GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
1414     }
1415
1416     /* check segment format */
1417     if (data->segment.format != GST_FORMAT_TIME) {
1418       GST_ERROR_OBJECT (pads, "GstCollectPads can handle only time segments.");
1419       continue;
1420     }
1421
1422     /* check if the waiting state should be changed */
1423     comp_time = data->segment.position;
1424     cmp_res = pads->priv->compare_func (pads, data, comp_time,
1425         pads->priv->earliest_data, pads->priv->earliest_time,
1426         pads->priv->compare_user_data);
1427     if (cmp_res > 0)
1428       /* stop waiting */
1429       gst_collect_pads_set_waiting (pads, data, FALSE);
1430     else {
1431       if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING)) {
1432         /* start waiting */
1433         gst_collect_pads_set_waiting (pads, data, TRUE);
1434         result = TRUE;
1435       }
1436     }
1437   }
1438
1439   return result;
1440 }
1441
1442 /**
1443  * gst_collect_pads_find_best_pad:
1444  * @pads: the collectpads to use
1445  * @data: returns the collectdata for earliest data
1446  * @time: returns the earliest available buffertime
1447  *
1448  * Find the oldest/best pad, i.e. pad holding the oldest buffer and
1449  * and return the corresponding #GstCollectData and buffertime.
1450  *
1451  * This function should be called with STREAM_LOCK held,
1452  * such as in the callback.
1453  */
1454 static void
1455 gst_collect_pads_find_best_pad (GstCollectPads * pads,
1456     GstCollectData ** data, GstClockTime * time)
1457 {
1458   GSList *collected;
1459   GstCollectData *best = NULL;
1460   GstClockTime best_time = GST_CLOCK_TIME_NONE;
1461
1462   g_return_if_fail (data != NULL);
1463   g_return_if_fail (time != NULL);
1464
1465   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
1466     GstBuffer *buffer;
1467     GstCollectData *data = (GstCollectData *) collected->data;
1468     GstClockTime timestamp;
1469
1470     buffer = gst_collect_pads_peek (pads, data);
1471     /* if we have a buffer check if it is better then the current best one */
1472     if (buffer != NULL) {
1473       timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
1474       gst_buffer_unref (buffer);
1475       if (best == NULL || pads->priv->compare_func (pads, data, timestamp,
1476               best, best_time, pads->priv->compare_user_data) < 0) {
1477         best = data;
1478         best_time = timestamp;
1479       }
1480     }
1481   }
1482
1483   /* set earliest time */
1484   *data = best;
1485   *time = best_time;
1486
1487   GST_DEBUG_OBJECT (pads, "best pad %s, best time %" GST_TIME_FORMAT,
1488       best ? GST_PAD_NAME (((GstCollectData *) best)->pad) : "(nil)",
1489       GST_TIME_ARGS (best_time));
1490 }
1491
1492 /*
1493  * Function to recalculate earliest_data and earliest_timestamp. This also calls
1494  * gst_collect_pads_recalculate_waiting
1495  *
1496  * Must be called with STREAM_LOCK.
1497  */
1498 static gboolean
1499 gst_collect_pads_recalculate_full (GstCollectPads * pads)
1500 {
1501   if (pads->priv->earliest_data)
1502     unref_data (pads->priv->earliest_data);
1503   gst_collect_pads_find_best_pad (pads, &pads->priv->earliest_data,
1504       &pads->priv->earliest_time);
1505   if (pads->priv->earliest_data)
1506     ref_data (pads->priv->earliest_data);
1507   return gst_collect_pads_recalculate_waiting (pads);
1508 }
1509
1510 /*
1511  * Default collect callback triggered when #GstCollectPads gathered all data.
1512  *
1513  * Called with STREAM_LOCK.
1514  */
1515 static GstFlowReturn
1516 gst_collect_pads_default_collected (GstCollectPads * pads, gpointer user_data)
1517 {
1518   GstCollectData *best = NULL;
1519   GstBuffer *buffer;
1520   GstFlowReturn ret = GST_FLOW_OK;
1521   GstCollectPadsBufferFunction func;
1522   gpointer buffer_user_data;
1523
1524   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
1525
1526   GST_OBJECT_LOCK (pads);
1527   func = pads->priv->buffer_func;
1528   buffer_user_data = pads->priv->buffer_user_data;
1529   GST_OBJECT_UNLOCK (pads);
1530
1531   g_return_val_if_fail (func != NULL, GST_FLOW_NOT_SUPPORTED);
1532
1533   /* Find the oldest pad at all cost */
1534   if (gst_collect_pads_recalculate_full (pads)) {
1535     /* waiting was switched on,
1536      * so give another thread a chance to deliver a possibly
1537      * older buffer; don't charge on yet with the current oldest */
1538     ret = GST_FLOW_OK;
1539     goto done;
1540   }
1541
1542   best = pads->priv->earliest_data;
1543
1544   /* No data collected means EOS. */
1545   if (G_UNLIKELY (best == NULL)) {
1546     ret = func (pads, best, NULL, buffer_user_data);
1547     if (ret == GST_FLOW_OK)
1548       ret = GST_FLOW_EOS;
1549     goto done;
1550   }
1551
1552   /* make sure that the pad we take a buffer from is waiting;
1553    * otherwise popping a buffer will seem not to have happened
1554    * and collectpads can get into a busy loop */
1555   gst_collect_pads_set_waiting (pads, best, TRUE);
1556
1557   /* Send buffer */
1558   buffer = gst_collect_pads_pop (pads, best);
1559   ret = func (pads, best, buffer, buffer_user_data);
1560
1561   /* maybe non-waiting was forced to waiting above due to
1562    * newsegment events coming too sparsely,
1563    * so re-check to restore state to avoid hanging/waiting */
1564   gst_collect_pads_recalculate_full (pads);
1565
1566 done:
1567   return ret;
1568 }
1569
1570 /*
1571  * Default timestamp compare function.
1572  */
1573 static gint
1574 gst_collect_pads_default_compare_func (GstCollectPads * pads,
1575     GstCollectData * data1, GstClockTime timestamp1,
1576     GstCollectData * data2, GstClockTime timestamp2, gpointer user_data)
1577 {
1578
1579   GST_LOG_OBJECT (pads, "comparing %" GST_TIME_FORMAT
1580       " and %" GST_TIME_FORMAT, GST_TIME_ARGS (timestamp1),
1581       GST_TIME_ARGS (timestamp2));
1582   /* non-valid timestamps go first as they are probably headers or so */
1583   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (timestamp1)))
1584     return GST_CLOCK_TIME_IS_VALID (timestamp2) ? -1 : 0;
1585
1586   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (timestamp2)))
1587     return 1;
1588
1589   /* compare timestamp */
1590   if (timestamp1 < timestamp2)
1591     return -1;
1592
1593   if (timestamp1 > timestamp2)
1594     return 1;
1595
1596   return 0;
1597 }
1598
1599 /* called with STREAM_LOCK */
1600 static void
1601 gst_collect_pads_handle_position_update (GstCollectPads * pads,
1602     GstCollectData * data, GstClockTime new_pos)
1603 {
1604   gint cmp_res;
1605
1606   /* If oldest time is not known, or current pad got newsegment;
1607    * recalculate the state */
1608   if (!pads->priv->earliest_data || pads->priv->earliest_data == data) {
1609     gst_collect_pads_recalculate_full (pads);
1610     goto exit;
1611   }
1612
1613   /* Check if the waiting state of the pad should change. */
1614   cmp_res =
1615       pads->priv->compare_func (pads, data, new_pos,
1616       pads->priv->earliest_data, pads->priv->earliest_time,
1617       pads->priv->compare_user_data);
1618
1619   if (cmp_res > 0)
1620     /* Stop waiting */
1621     gst_collect_pads_set_waiting (pads, data, FALSE);
1622
1623 exit:
1624   return;
1625
1626 }
1627
1628 static GstClockTime
1629 gst_collect_pads_clip_time (GstCollectPads * pads, GstCollectData * data,
1630     GstClockTime time)
1631 {
1632   GstClockTime otime = time;
1633   GstBuffer *in, *out = NULL;
1634
1635   if (pads->priv->clip_func) {
1636     in = gst_buffer_new ();
1637     GST_BUFFER_PTS (in) = time;
1638     GST_BUFFER_DTS (in) = GST_CLOCK_TIME_NONE;
1639     pads->priv->clip_func (pads, data, in, &out, pads->priv->clip_user_data);
1640     if (out) {
1641       otime = GST_BUFFER_PTS (out);
1642       gst_buffer_unref (out);
1643     } else {
1644       /* FIXME should distinguish between ahead or after segment,
1645        * let's assume after segment and use some large time ... */
1646       otime = G_MAXINT64 / 2;
1647     }
1648   }
1649
1650   return otime;
1651 }
1652
1653 /**
1654  * gst_collect_pads_event_default:
1655  * @pads: the collectpads to use
1656  * @data: collect data of corresponding pad
1657  * @event: event being processed
1658  * @discard: process but do not send event downstream
1659  *
1660  * Default #GstCollectPads event handling that elements should always
1661  * chain up to to ensure proper operation.  Element might however indicate
1662  * event should not be forwarded downstream.
1663  */
1664 gboolean
1665 gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data,
1666     GstEvent * event, gboolean discard)
1667 {
1668   gboolean res = TRUE;
1669   GstCollectPadsBufferFunction buffer_func;
1670   GstObject *parent;
1671   GstPad *pad;
1672
1673   GST_OBJECT_LOCK (pads);
1674   buffer_func = pads->priv->buffer_func;
1675   GST_OBJECT_UNLOCK (pads);
1676
1677   pad = data->pad;
1678   parent = GST_OBJECT_PARENT (pad);
1679
1680   switch (GST_EVENT_TYPE (event)) {
1681     case GST_EVENT_FLUSH_START:
1682     {
1683       if (g_atomic_int_get (&pads->priv->seeking)) {
1684         /* drop all but the first FLUSH_STARTs when seeking */
1685         if (!g_atomic_int_compare_and_exchange (&pads->
1686                 priv->pending_flush_start, TRUE, FALSE))
1687           goto eat;
1688
1689         /* unblock collect pads */
1690         gst_pad_event_default (pad, parent, event);
1691         event = NULL;
1692
1693         GST_COLLECT_PADS_STREAM_LOCK (pads);
1694         /* Start flushing. We never call gst_collect_pads_set_flushing (FALSE), we
1695          * instead wait until each pad gets its FLUSH_STOP and let that reset the pad to
1696          * non-flushing (which happens in gst_collect_pads_event_default).
1697          */
1698         gst_collect_pads_set_flushing (pads, TRUE);
1699
1700         if (pads->priv->flush_func)
1701           pads->priv->flush_func (pads, pads->priv->flush_user_data);
1702
1703         g_atomic_int_set (&pads->priv->pending_flush_stop, TRUE);
1704         GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1705
1706         goto eat;
1707       } else {
1708         /* forward event to unblock check_collected */
1709         GST_DEBUG_OBJECT (pad, "forwarding flush start");
1710         res = gst_pad_event_default (pad, parent, event);
1711         event = NULL;
1712
1713         /* now unblock the chain function.
1714          * no cond per pad, so they all unblock,
1715          * non-flushing block again */
1716         GST_COLLECT_PADS_STREAM_LOCK (pads);
1717         GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_FLUSHING);
1718         gst_collect_pads_clear (pads, data);
1719
1720         /* cater for possible default muxing functionality */
1721         if (buffer_func) {
1722           /* restore to initial state */
1723           gst_collect_pads_set_waiting (pads, data, TRUE);
1724           /* if the current pad is affected, reset state, recalculate later */
1725           if (pads->priv->earliest_data == data) {
1726             unref_data (data);
1727             pads->priv->earliest_data = NULL;
1728             pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
1729           }
1730         }
1731
1732         GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1733
1734         goto eat;
1735       }
1736     }
1737     case GST_EVENT_FLUSH_STOP:
1738     {
1739       /* flush the 1 buffer queue */
1740       GST_COLLECT_PADS_STREAM_LOCK (pads);
1741       GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_FLUSHING);
1742       gst_collect_pads_clear (pads, data);
1743       /* we need new segment info after the flush */
1744       gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
1745       GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
1746       /* if the pad was EOS, remove the EOS flag and
1747        * decrement the number of eospads */
1748       if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
1749                   GST_COLLECT_PADS_STATE_EOS))) {
1750         if (!GST_COLLECT_PADS_STATE_IS_SET (data,
1751                 GST_COLLECT_PADS_STATE_WAITING))
1752           pads->priv->queuedpads++;
1753         if (!g_atomic_int_get (&pads->priv->seeking)) {
1754           pads->priv->eospads--;
1755         }
1756         GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_EOS);
1757       }
1758       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1759
1760       if (g_atomic_int_get (&pads->priv->seeking)) {
1761         if (g_atomic_int_compare_and_exchange (&pads->priv->pending_flush_stop,
1762                 TRUE, FALSE))
1763           goto forward;
1764         else
1765           goto eat;
1766       } else {
1767         goto forward;
1768       }
1769     }
1770     case GST_EVENT_EOS:
1771     {
1772       GST_COLLECT_PADS_STREAM_LOCK (pads);
1773       /* if the pad was not EOS, make it EOS and so we
1774        * have one more eospad */
1775       if (G_LIKELY (!GST_COLLECT_PADS_STATE_IS_SET (data,
1776                   GST_COLLECT_PADS_STATE_EOS))) {
1777         GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_EOS);
1778         if (!GST_COLLECT_PADS_STATE_IS_SET (data,
1779                 GST_COLLECT_PADS_STATE_WAITING))
1780           pads->priv->queuedpads--;
1781         pads->priv->eospads++;
1782       }
1783       /* check if we need collecting anything, we ignore the result. */
1784       gst_collect_pads_check_collected (pads);
1785       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1786
1787       goto eat;
1788     }
1789     case GST_EVENT_SEGMENT:
1790     {
1791       GstSegment seg;
1792
1793       GST_COLLECT_PADS_STREAM_LOCK (pads);
1794
1795       gst_event_copy_segment (event, &seg);
1796
1797       GST_DEBUG_OBJECT (data->pad, "got segment %" GST_SEGMENT_FORMAT, &seg);
1798
1799       /* default collection can not handle other segment formats than time */
1800       if (buffer_func && seg.format != GST_FORMAT_TIME) {
1801         GST_WARNING_OBJECT (pads, "GstCollectPads default collecting "
1802             "can only handle time segments. Non time segment ignored.");
1803         goto newsegment_done;
1804       }
1805
1806       /* need to update segment first */
1807       data->segment = seg;
1808       GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
1809
1810       /* now we can use for e.g. running time */
1811       seg.position =
1812           gst_collect_pads_clip_time (pads, data, seg.start + seg.offset);
1813       /* update again */
1814       data->segment = seg;
1815
1816       /* default muxing functionality */
1817       if (!buffer_func)
1818         goto newsegment_done;
1819
1820       gst_collect_pads_handle_position_update (pads, data, seg.position);
1821
1822     newsegment_done:
1823       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1824       /* we must not forward this event since multiple segments will be
1825        * accumulated and this is certainly not what we want. */
1826       goto eat;
1827     }
1828     case GST_EVENT_GAP:
1829     {
1830       GstClockTime start, duration;
1831
1832       GST_COLLECT_PADS_STREAM_LOCK (pads);
1833
1834       gst_event_parse_gap (event, &start, &duration);
1835       /* FIXME, handle reverse playback case */
1836       if (GST_CLOCK_TIME_IS_VALID (duration))
1837         start += duration;
1838       /* we do not expect another buffer until after gap,
1839        * so that is our position now */
1840       data->segment.position = gst_collect_pads_clip_time (pads, data, start);
1841
1842       gst_collect_pads_handle_position_update (pads, data,
1843           data->segment.position);
1844
1845       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1846       goto eat;
1847     }
1848     case GST_EVENT_STREAM_START:
1849       /* drop stream start events, element must create its own start event,
1850        * we can't just forward the first random stream start event we get */
1851       goto eat;
1852     case GST_EVENT_CAPS:
1853       goto eat;
1854     default:
1855       /* forward other events */
1856       goto forward;
1857   }
1858
1859 eat:
1860   if (event)
1861     gst_event_unref (event);
1862   return res;
1863
1864 forward:
1865   if (discard)
1866     goto eat;
1867   else
1868     return gst_pad_event_default (pad, parent, event);
1869 }
1870
1871 typedef struct
1872 {
1873   GstEvent *event;
1874   gboolean result;
1875 } EventData;
1876
1877 static gboolean
1878 event_forward_func (GstPad * pad, EventData * data)
1879 {
1880   gboolean ret = TRUE;
1881   GstPad *peer = gst_pad_get_peer (pad);
1882
1883   if (peer) {
1884     ret = gst_pad_send_event (peer, gst_event_ref (data->event));
1885     gst_object_unref (peer);
1886   }
1887
1888   data->result &= ret;
1889   /* Always send to all pads */
1890   return FALSE;
1891 }
1892
1893 static gboolean
1894 forward_event_to_all_sinkpads (GstPad * srcpad, GstEvent * event)
1895 {
1896   EventData data;
1897
1898   data.event = event;
1899   data.result = TRUE;
1900
1901   gst_pad_forward (srcpad, (GstPadForwardFunction) event_forward_func, &data);
1902
1903   gst_event_unref (event);
1904
1905   return data.result;
1906 }
1907
1908 /**
1909  * gst_collect_pads_src_event_default:
1910  * @pads: the #GstCollectPads to use
1911  * @pad: src #GstPad that received the event
1912  * @event: event being processed
1913  *
1914  * Default #GstCollectPads event handling for the src pad of elements.
1915  * Elements can chain up to this to let flushing seek event handling
1916  * be done by #GstCollectPads.
1917  *
1918  * Since: 1.4
1919  */
1920 gboolean
1921 gst_collect_pads_src_event_default (GstCollectPads * pads, GstPad * pad,
1922     GstEvent * event)
1923 {
1924   GstObject *parent;
1925   gboolean res = TRUE;
1926
1927   parent = GST_OBJECT_PARENT (pad);
1928
1929   switch (GST_EVENT_TYPE (event)) {
1930     case GST_EVENT_SEEK:{
1931       GstSeekFlags flags;
1932
1933       pads->priv->eospads = 0;
1934
1935       GST_INFO_OBJECT (pads, "starting seek");
1936
1937       gst_event_parse_seek (event, NULL, NULL, &flags, NULL, NULL, NULL, NULL);
1938       if (flags & GST_SEEK_FLAG_FLUSH) {
1939         g_atomic_int_set (&pads->priv->seeking, TRUE);
1940         g_atomic_int_set (&pads->priv->pending_flush_start, TRUE);
1941         /* forward the seek upstream */
1942         res = forward_event_to_all_sinkpads (pad, event);
1943         event = NULL;
1944         if (!res) {
1945           g_atomic_int_set (&pads->priv->seeking, FALSE);
1946           g_atomic_int_set (&pads->priv->pending_flush_start, FALSE);
1947         }
1948       }
1949
1950       GST_INFO_OBJECT (pads, "seek done, result: %d", res);
1951
1952       break;
1953     }
1954     default:
1955       break;
1956   }
1957
1958   if (event)
1959     res = gst_pad_event_default (pad, parent, event);
1960
1961   return res;
1962 }
1963
1964 static gboolean
1965 gst_collect_pads_event_default_internal (GstCollectPads * pads,
1966     GstCollectData * data, GstEvent * event, gpointer user_data)
1967 {
1968   return gst_collect_pads_event_default (pads, data, event, FALSE);
1969 }
1970
1971 static gboolean
1972 gst_collect_pads_event (GstPad * pad, GstObject * parent, GstEvent * event)
1973 {
1974   gboolean res = FALSE, need_unlock = FALSE;
1975   GstCollectData *data;
1976   GstCollectPads *pads;
1977   GstCollectPadsEventFunction event_func;
1978   gpointer event_user_data;
1979
1980   /* some magic to get the managing collect_pads */
1981   GST_OBJECT_LOCK (pad);
1982   data = (GstCollectData *) gst_pad_get_element_private (pad);
1983   if (G_UNLIKELY (data == NULL))
1984     goto pad_removed;
1985   ref_data (data);
1986   GST_OBJECT_UNLOCK (pad);
1987
1988   res = FALSE;
1989
1990   pads = data->collect;
1991
1992   GST_DEBUG_OBJECT (data->pad, "Got %s event on sink pad",
1993       GST_EVENT_TYPE_NAME (event));
1994
1995   GST_OBJECT_LOCK (pads);
1996   event_func = pads->priv->event_func;
1997   event_user_data = pads->priv->event_user_data;
1998   GST_OBJECT_UNLOCK (pads);
1999
2000   if (GST_EVENT_IS_SERIALIZED (event)) {
2001     GST_COLLECT_PADS_STREAM_LOCK (pads);
2002     need_unlock = TRUE;
2003   }
2004
2005   if (G_LIKELY (event_func)) {
2006     res = event_func (pads, data, event, event_user_data);
2007   }
2008
2009   if (need_unlock)
2010     GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2011
2012   unref_data (data);
2013   return res;
2014
2015   /* ERRORS */
2016 pad_removed:
2017   {
2018     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2019     GST_OBJECT_UNLOCK (pad);
2020     return FALSE;
2021   }
2022 }
2023
2024 /**
2025  * gst_collect_pads_query_default:
2026  * @pads: the collectpads to use
2027  * @data: collect data of corresponding pad
2028  * @query: query being processed
2029  * @discard: process but do not send event downstream
2030  *
2031  * Default #GstCollectPads query handling that elements should always
2032  * chain up to to ensure proper operation.  Element might however indicate
2033  * query should not be forwarded downstream.
2034  */
2035 gboolean
2036 gst_collect_pads_query_default (GstCollectPads * pads, GstCollectData * data,
2037     GstQuery * query, gboolean discard)
2038 {
2039   gboolean res = TRUE;
2040   GstObject *parent;
2041   GstPad *pad;
2042
2043   pad = data->pad;
2044   parent = GST_OBJECT_PARENT (pad);
2045
2046   switch (GST_QUERY_TYPE (query)) {
2047     case GST_QUERY_SEEKING:
2048     {
2049       GstFormat format;
2050
2051       /* don't pass it along as some (file)sink might claim it does
2052        * whereas with a collectpads in between that will not likely work */
2053       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
2054       gst_query_set_seeking (query, format, FALSE, 0, -1);
2055       res = TRUE;
2056       discard = TRUE;
2057       break;
2058     }
2059     default:
2060       break;
2061   }
2062
2063   if (!discard)
2064     return gst_pad_query_default (pad, parent, query);
2065   else
2066     return res;
2067 }
2068
2069 static gboolean
2070 gst_collect_pads_query_default_internal (GstCollectPads * pads,
2071     GstCollectData * data, GstQuery * query, gpointer user_data)
2072 {
2073   return gst_collect_pads_query_default (pads, data, query, FALSE);
2074 }
2075
2076 static gboolean
2077 gst_collect_pads_query (GstPad * pad, GstObject * parent, GstQuery * query)
2078 {
2079   gboolean res = FALSE, need_unlock = FALSE;
2080   GstCollectData *data;
2081   GstCollectPads *pads;
2082   GstCollectPadsQueryFunction query_func;
2083   gpointer query_user_data;
2084
2085   GST_DEBUG_OBJECT (pad, "Got %s query on sink pad",
2086       GST_QUERY_TYPE_NAME (query));
2087
2088   /* some magic to get the managing collect_pads */
2089   GST_OBJECT_LOCK (pad);
2090   data = (GstCollectData *) gst_pad_get_element_private (pad);
2091   if (G_UNLIKELY (data == NULL))
2092     goto pad_removed;
2093   ref_data (data);
2094   GST_OBJECT_UNLOCK (pad);
2095
2096   pads = data->collect;
2097
2098   GST_OBJECT_LOCK (pads);
2099   query_func = pads->priv->query_func;
2100   query_user_data = pads->priv->query_user_data;
2101   GST_OBJECT_UNLOCK (pads);
2102
2103   if (GST_QUERY_IS_SERIALIZED (query)) {
2104     GST_COLLECT_PADS_STREAM_LOCK (pads);
2105     need_unlock = TRUE;
2106   }
2107
2108   if (G_LIKELY (query_func)) {
2109     res = query_func (pads, data, query, query_user_data);
2110   }
2111
2112   if (need_unlock)
2113     GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2114
2115   unref_data (data);
2116   return res;
2117
2118   /* ERRORS */
2119 pad_removed:
2120   {
2121     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2122     GST_OBJECT_UNLOCK (pad);
2123     return FALSE;
2124   }
2125 }
2126
2127
2128 /* For each buffer we receive we check if our collected condition is reached
2129  * and if so we call the collected function. When this is done we check if
2130  * data has been unqueued. If data is still queued we wait holding the stream
2131  * lock to make sure no EOS event can happen while we are ready to be
2132  * collected
2133  */
2134 static GstFlowReturn
2135 gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2136 {
2137   GstCollectData *data;
2138   GstCollectPads *pads;
2139   GstFlowReturn ret;
2140   GstBuffer **buffer_p;
2141   guint32 cookie;
2142
2143   GST_DEBUG ("Got buffer for pad %s:%s", GST_DEBUG_PAD_NAME (pad));
2144
2145   /* some magic to get the managing collect_pads */
2146   GST_OBJECT_LOCK (pad);
2147   data = (GstCollectData *) gst_pad_get_element_private (pad);
2148   if (G_UNLIKELY (data == NULL))
2149     goto no_data;
2150   ref_data (data);
2151   GST_OBJECT_UNLOCK (pad);
2152
2153   pads = data->collect;
2154
2155   GST_COLLECT_PADS_STREAM_LOCK (pads);
2156   /* if not started, bail out */
2157   if (G_UNLIKELY (!pads->priv->started))
2158     goto not_started;
2159   /* check if this pad is flushing */
2160   if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
2161               GST_COLLECT_PADS_STATE_FLUSHING)))
2162     goto flushing;
2163   /* pad was EOS, we can refuse this data */
2164   if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
2165               GST_COLLECT_PADS_STATE_EOS)))
2166     goto eos;
2167
2168   /* see if we need to clip */
2169   if (pads->priv->clip_func) {
2170     GstBuffer *outbuf = NULL;
2171     ret =
2172         pads->priv->clip_func (pads, data, buffer, &outbuf,
2173         pads->priv->clip_user_data);
2174     buffer = outbuf;
2175
2176     if (G_UNLIKELY (outbuf == NULL))
2177       goto clipped;
2178
2179     if (G_UNLIKELY (ret == GST_FLOW_EOS))
2180       goto eos;
2181     else if (G_UNLIKELY (ret != GST_FLOW_OK))
2182       goto error;
2183   }
2184
2185   GST_DEBUG_OBJECT (pads, "Queuing buffer %p for pad %s:%s", buffer,
2186       GST_DEBUG_PAD_NAME (pad));
2187
2188   /* One more pad has data queued */
2189   if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING))
2190     pads->priv->queuedpads++;
2191   buffer_p = &data->buffer;
2192   gst_buffer_replace (buffer_p, buffer);
2193
2194   /* update segment last position if in TIME */
2195   if (G_LIKELY (data->segment.format == GST_FORMAT_TIME)) {
2196     GstClockTime timestamp;
2197
2198     timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
2199
2200     if (GST_CLOCK_TIME_IS_VALID (timestamp))
2201       data->segment.position = timestamp;
2202   }
2203
2204   /* While we have data queued on this pad try to collect stuff */
2205   do {
2206     /* Check if our collected condition is matched and call the collected
2207      * function if it is */
2208     ret = gst_collect_pads_check_collected (pads);
2209     /* when an error occurs, we want to report this back to the caller ASAP
2210      * without having to block if the buffer was not popped */
2211     if (G_UNLIKELY (ret != GST_FLOW_OK))
2212       goto error;
2213
2214     /* data was consumed, we can exit and accept new data */
2215     if (data->buffer == NULL)
2216       break;
2217
2218     /* Having the _INIT here means we don't care about any broadcast up to here
2219      * (most of which occur with STREAM_LOCK held, so could not have happened
2220      * anyway).  We do care about e.g. a remove initiated broadcast as of this
2221      * point.  Putting it here also makes this thread ignores any evt it raised
2222      * itself (as is a usual WAIT semantic).
2223      */
2224     GST_COLLECT_PADS_EVT_INIT (cookie);
2225
2226     /* pad could be removed and re-added */
2227     unref_data (data);
2228     GST_OBJECT_LOCK (pad);
2229     if (G_UNLIKELY ((data = gst_pad_get_element_private (pad)) == NULL))
2230       goto pad_removed;
2231     ref_data (data);
2232     GST_OBJECT_UNLOCK (pad);
2233
2234     GST_DEBUG_OBJECT (pads, "Pad %s:%s has a buffer queued, waiting",
2235         GST_DEBUG_PAD_NAME (pad));
2236
2237     /* wait to be collected, this must happen from another thread triggered
2238      * by the _chain function of another pad. We release the lock so we
2239      * can get stopped or flushed as well. We can however not get EOS
2240      * because we still hold the STREAM_LOCK.
2241      */
2242     GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2243     GST_COLLECT_PADS_EVT_WAIT (pads, cookie);
2244     GST_COLLECT_PADS_STREAM_LOCK (pads);
2245
2246     GST_DEBUG_OBJECT (pads, "Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad));
2247
2248     /* after a signal, we could be stopped */
2249     if (G_UNLIKELY (!pads->priv->started))
2250       goto not_started;
2251     /* check if this pad is flushing */
2252     if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
2253                 GST_COLLECT_PADS_STATE_FLUSHING)))
2254       goto flushing;
2255   }
2256   while (data->buffer != NULL);
2257
2258 unlock_done:
2259   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2260   /* data is definitely NULL if pad_removed goto was run. */
2261   if (data)
2262     unref_data (data);
2263   if (buffer)
2264     gst_buffer_unref (buffer);
2265   return ret;
2266
2267 pad_removed:
2268   {
2269     GST_WARNING ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2270     GST_OBJECT_UNLOCK (pad);
2271     ret = GST_FLOW_NOT_LINKED;
2272     goto unlock_done;
2273   }
2274   /* ERRORS */
2275 no_data:
2276   {
2277     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2278     GST_OBJECT_UNLOCK (pad);
2279     gst_buffer_unref (buffer);
2280     return GST_FLOW_NOT_LINKED;
2281   }
2282 not_started:
2283   {
2284     GST_DEBUG ("not started");
2285     gst_collect_pads_clear (pads, data);
2286     ret = GST_FLOW_FLUSHING;
2287     goto unlock_done;
2288   }
2289 flushing:
2290   {
2291     GST_DEBUG ("pad %s:%s is flushing", GST_DEBUG_PAD_NAME (pad));
2292     gst_collect_pads_clear (pads, data);
2293     ret = GST_FLOW_FLUSHING;
2294     goto unlock_done;
2295   }
2296 eos:
2297   {
2298     /* we should not post an error for this, just inform upstream that
2299      * we don't expect anything anymore */
2300     GST_DEBUG ("pad %s:%s is eos", GST_DEBUG_PAD_NAME (pad));
2301     ret = GST_FLOW_EOS;
2302     goto unlock_done;
2303   }
2304 clipped:
2305   {
2306     GST_DEBUG ("clipped buffer on pad %s:%s", GST_DEBUG_PAD_NAME (pad));
2307     ret = GST_FLOW_OK;
2308     goto unlock_done;
2309   }
2310 error:
2311   {
2312     /* we print the error, the element should post a reasonable error
2313      * message for fatal errors */
2314     GST_DEBUG ("collect failed, reason %d (%s)", ret, gst_flow_get_name (ret));
2315     gst_collect_pads_clear (pads, data);
2316     goto unlock_done;
2317   }
2318 }