a322df2071841e7e2c92c19a51e69edcc3c341a0
[platform/upstream/gstreamer.git] / libs / gst / base / gstcollectpads.c
1 /* GStreamer
2  * Copyright (C) 2005 Wim Taymans <wim@fluendo.com>
3  * Copyright (C) 2008 Mark Nauwelaerts <mnauw@users.sourceforge.net>
4  * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
5  *
6  * gstcollectpads.c:
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23 /**
24  * SECTION:gstcollectpads
25  * @short_description: manages a set of pads that operate in collect mode
26  * @see_also:
27  *
28  * Manages a set of pads that operate in collect mode. This means that control
29  * is given to the manager of this object when all pads have data.
30  * <itemizedlist>
31  *   <listitem><para>
32  *     Collectpads are created with gst_collect_pads_new(). A callback should then
33  *     be installed with gst_collect_pads_set_function ().
34  *   </para></listitem>
35  *   <listitem><para>
36  *     Pads are added to the collection with gst_collect_pads_add_pad()/
37  *     gst_collect_pads_remove_pad(). The pad
38  *     has to be a sinkpad. The chain and event functions of the pad are
39  *     overridden. The element_private of the pad is used to store
40  *     private information for the collectpads.
41  *   </para></listitem>
42  *   <listitem><para>
43  *     For each pad, data is queued in the _chain function or by
44  *     performing a pull_range.
45  *   </para></listitem>
46  *   <listitem><para>
47  *     When data is queued on all pads in waiting mode, the callback function is called.
48  *   </para></listitem>
49  *   <listitem><para>
50  *     Data can be dequeued from the pad with the gst_collect_pads_pop() method.
51  *     One can peek at the data with the gst_collect_pads_peek() function.
52  *     These functions will return NULL if the pad received an EOS event. When all
53  *     pads return NULL from a gst_collect_pads_peek(), the element can emit an EOS
54  *     event itself.
55  *   </para></listitem>
56  *   <listitem><para>
57  *     Data can also be dequeued in byte units using the gst_collect_pads_available(),
58  *     gst_collect_pads_read_buffer() and gst_collect_pads_flush() calls.
59  *   </para></listitem>
60  *   <listitem><para>
61  *     Elements should call gst_collect_pads_start() and gst_collect_pads_stop() in
62  *     their state change functions to start and stop the processing of the collectpads.
63  *     The gst_collect_pads_stop() call should be called before calling the parent
64  *     element state change function in the PAUSED_TO_READY state change to ensure
65  *     no pad is blocked and the element can finish streaming.
66  *   </para></listitem>
67  *   <listitem><para>
68  *     gst_collect_pads_collect() and gst_collect_pads_collect_range() can be used by
69  *     elements that start a #GstTask to drive the collect_pads. This feature is however
70  *     not yet implemented.
71  *   </para></listitem>
72  *   <listitem><para>
73  *     gst_collect_pads_set_waiting() sets a pad to waiting or non-waiting mode.
74  *     CollectPads element is not waiting for data to be collected on non-waiting pads.
75  *     Thus these pads may but need not have data when the callback is called.
76  *     All pads are in waiting mode by default.
77  *   </para></listitem>
78  * </itemizedlist>
79  *
80  * Last reviewed on 2011-10-28 (0.10.36)
81  */
82
83 #ifdef HAVE_CONFIG_H
84 #  include "config.h"
85 #endif
86
87 #include <gst/gst_private.h>
88
89 #include "gstcollectpads.h"
90
91 #include "../../../gst/glib-compat-private.h"
92
93 GST_DEBUG_CATEGORY_STATIC (collect_pads_debug);
94 #define GST_CAT_DEFAULT collect_pads_debug
95
96 #define parent_class gst_collect_pads_parent_class
97 G_DEFINE_TYPE (GstCollectPads, gst_collect_pads, GST_TYPE_OBJECT);
98
99 struct _GstCollectDataPrivate
100 {
101   /* refcounting for struct, and destroy callback */
102   GstCollectDataDestroyNotify destroy_notify;
103   gint refcount;
104 };
105
106 struct _GstCollectPadsPrivate
107 {
108   /* with LOCK and/or STREAM_LOCK */
109   gboolean started;
110
111   /* with STREAM_LOCK */
112   guint32 cookie;               /* @data list cookie */
113   guint numpads;                /* number of pads in @data */
114   guint queuedpads;             /* number of pads with a buffer */
115   guint eospads;                /* number of pads that are EOS */
116   GstClockTime earliest_time;   /* Current earliest time */
117   GstCollectData *earliest_data;        /* Pad data for current earliest time */
118
119   /* with LOCK */
120   GSList *pad_list;             /* updated pad list */
121   guint32 pad_cookie;           /* updated cookie */
122
123   GstCollectPadsFunction func;  /* function and user_data for callback */
124   gpointer user_data;
125   GstCollectPadsBufferFunction buffer_func;     /* function and user_data for buffer callback */
126   gpointer buffer_user_data;
127   GstCollectPadsCompareFunction compare_func;
128   gpointer compare_user_data;
129   GstCollectPadsEventFunction event_func;       /* function and data for event callback */
130   gpointer event_user_data;
131   GstCollectPadsQueryFunction query_func;
132   gpointer query_user_data;
133   GstCollectPadsClipFunction clip_func;
134   gpointer clip_user_data;
135   GstCollectPadsFlushFunction flush_func;
136   gpointer flush_user_data;
137
138   /* no other lock needed */
139   GMutex evt_lock;              /* these make up sort of poor man's event signaling */
140   GCond evt_cond;
141   guint32 evt_cookie;
142
143   gboolean seeking;
144   gboolean pending_flush_start;
145   gboolean pending_flush_stop;
146 };
147
148 static void gst_collect_pads_clear (GstCollectPads * pads,
149     GstCollectData * data);
150 static GstFlowReturn gst_collect_pads_chain (GstPad * pad, GstObject * parent,
151     GstBuffer * buffer);
152 static gboolean gst_collect_pads_event (GstPad * pad, GstObject * parent,
153     GstEvent * event);
154 static gboolean gst_collect_pads_query (GstPad * pad, GstObject * parent,
155     GstQuery * query);
156 static void gst_collect_pads_finalize (GObject * object);
157 static GstFlowReturn gst_collect_pads_default_collected (GstCollectPads *
158     pads, gpointer user_data);
159 static gint gst_collect_pads_default_compare_func (GstCollectPads * pads,
160     GstCollectData * data1, GstClockTime timestamp1, GstCollectData * data2,
161     GstClockTime timestamp2, gpointer user_data);
162 static gboolean gst_collect_pads_recalculate_full (GstCollectPads * pads);
163 static void ref_data (GstCollectData * data);
164 static void unref_data (GstCollectData * data);
165
166 static gboolean gst_collect_pads_event_default_internal (GstCollectPads *
167     pads, GstCollectData * data, GstEvent * event, gpointer user_data);
168 static gboolean gst_collect_pads_query_default_internal (GstCollectPads *
169     pads, GstCollectData * data, GstQuery * query, gpointer user_data);
170
171
172 /* Some properties are protected by LOCK, others by STREAM_LOCK
173  * However, manipulating either of these partitions may require
174  * to signal/wake a _WAIT, so use a separate (sort of) event to prevent races
175  * Alternative implementations are possible, e.g. some low-level re-implementing
176  * of the 2 above locks to drop both of them atomically when going into _WAIT.
177  */
178 #define GST_COLLECT_PADS_GET_EVT_COND(pads) (&((GstCollectPads *)pads)->priv->evt_cond)
179 #define GST_COLLECT_PADS_GET_EVT_LOCK(pads) (&((GstCollectPads *)pads)->priv->evt_lock)
180 #define GST_COLLECT_PADS_EVT_WAIT(pads, cookie) G_STMT_START {    \
181   g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
182   /* should work unless a lot of event'ing and thread starvation */\
183   while (cookie == ((GstCollectPads *) pads)->priv->evt_cookie)         \
184     g_cond_wait (GST_COLLECT_PADS_GET_EVT_COND (pads),            \
185         GST_COLLECT_PADS_GET_EVT_LOCK (pads));                    \
186   cookie = ((GstCollectPads *) pads)->priv->evt_cookie;                 \
187   g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
188 } G_STMT_END
189 #define GST_COLLECT_PADS_EVT_WAIT_TIMED(pads, cookie, timeout) G_STMT_START { \
190   GTimeVal __tv; \
191   \
192   g_get_current_time (&tv); \
193   g_time_val_add (&tv, timeout); \
194   \
195   g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
196   /* should work unless a lot of event'ing and thread starvation */\
197   while (cookie == ((GstCollectPads *) pads)->priv->evt_cookie)         \
198     g_cond_timed_wait (GST_COLLECT_PADS_GET_EVT_COND (pads),            \
199         GST_COLLECT_PADS_GET_EVT_LOCK (pads), &tv);                    \
200   cookie = ((GstCollectPads *) pads)->priv->evt_cookie;                 \
201   g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
202 } G_STMT_END
203 #define GST_COLLECT_PADS_EVT_BROADCAST(pads) G_STMT_START {       \
204   g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
205   /* never mind wrap-around */                                     \
206   ++(((GstCollectPads *) pads)->priv->evt_cookie);                      \
207   g_cond_broadcast (GST_COLLECT_PADS_GET_EVT_COND (pads));        \
208   g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
209 } G_STMT_END
210 #define GST_COLLECT_PADS_EVT_INIT(cookie) G_STMT_START {          \
211   g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
212   cookie = ((GstCollectPads *) pads)->priv->evt_cookie;                 \
213   g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
214 } G_STMT_END
215
216 static void
217 gst_collect_pads_class_init (GstCollectPadsClass * klass)
218 {
219   GObjectClass *gobject_class = (GObjectClass *) klass;
220
221   g_type_class_add_private (klass, sizeof (GstCollectPadsPrivate));
222
223   GST_DEBUG_CATEGORY_INIT (collect_pads_debug, "collectpads", 0,
224       "GstCollectPads");
225
226   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_collect_pads_finalize);
227 }
228
229 static void
230 gst_collect_pads_init (GstCollectPads * pads)
231 {
232   pads->priv =
233       G_TYPE_INSTANCE_GET_PRIVATE (pads, GST_TYPE_COLLECT_PADS,
234       GstCollectPadsPrivate);
235
236   pads->data = NULL;
237   pads->priv->cookie = 0;
238   pads->priv->numpads = 0;
239   pads->priv->queuedpads = 0;
240   pads->priv->eospads = 0;
241   pads->priv->started = FALSE;
242
243   g_rec_mutex_init (&pads->stream_lock);
244
245   pads->priv->func = gst_collect_pads_default_collected;
246   pads->priv->user_data = NULL;
247   pads->priv->event_func = NULL;
248   pads->priv->event_user_data = NULL;
249
250   /* members for default muxing */
251   pads->priv->buffer_func = NULL;
252   pads->priv->buffer_user_data = NULL;
253   pads->priv->compare_func = gst_collect_pads_default_compare_func;
254   pads->priv->compare_user_data = NULL;
255   pads->priv->earliest_data = NULL;
256   pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
257
258   pads->priv->event_func = gst_collect_pads_event_default_internal;
259   pads->priv->query_func = gst_collect_pads_query_default_internal;
260
261   /* members to manage the pad list */
262   pads->priv->pad_cookie = 0;
263   pads->priv->pad_list = NULL;
264
265   /* members for event */
266   g_mutex_init (&pads->priv->evt_lock);
267   g_cond_init (&pads->priv->evt_cond);
268   pads->priv->evt_cookie = 0;
269
270   pads->priv->seeking = FALSE;
271   pads->priv->pending_flush_start = FALSE;
272   pads->priv->pending_flush_stop = FALSE;
273 }
274
275 static void
276 gst_collect_pads_finalize (GObject * object)
277 {
278   GstCollectPads *pads = GST_COLLECT_PADS (object);
279
280   GST_DEBUG_OBJECT (object, "finalize");
281
282   g_rec_mutex_clear (&pads->stream_lock);
283
284   g_cond_clear (&pads->priv->evt_cond);
285   g_mutex_clear (&pads->priv->evt_lock);
286
287   /* Remove pads and free pads list */
288   g_slist_foreach (pads->priv->pad_list, (GFunc) unref_data, NULL);
289   g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
290   g_slist_free (pads->data);
291   g_slist_free (pads->priv->pad_list);
292
293   G_OBJECT_CLASS (parent_class)->finalize (object);
294 }
295
296 /**
297  * gst_collect_pads_new:
298  *
299  * Create a new instance of #GstCollectPads.
300  *
301  * MT safe.
302  *
303  * Returns: (transfer full): a new #GstCollectPads, or NULL in case of an error.
304  */
305 GstCollectPads *
306 gst_collect_pads_new (void)
307 {
308   GstCollectPads *newcoll;
309
310   newcoll = g_object_new (GST_TYPE_COLLECT_PADS, NULL);
311
312   return newcoll;
313 }
314
315 /* Must be called with GstObject lock! */
316 static void
317 gst_collect_pads_set_buffer_function_locked (GstCollectPads * pads,
318     GstCollectPadsBufferFunction func, gpointer user_data)
319 {
320   pads->priv->buffer_func = func;
321   pads->priv->buffer_user_data = user_data;
322 }
323
324 /**
325  * gst_collect_pads_set_buffer_function:
326  * @pads: the collectpads to use
327  * @func: the function to set
328  * @user_data: (closure): user data passed to the function
329  *
330  * Set the callback function and user data that will be called with
331  * the oldest buffer when all pads have been collected, or NULL on EOS.
332  * If a buffer is passed, the callback owns a reference and must unref
333  * it.
334  *
335  * MT safe.
336  */
337 void
338 gst_collect_pads_set_buffer_function (GstCollectPads * pads,
339     GstCollectPadsBufferFunction func, gpointer user_data)
340 {
341   g_return_if_fail (pads != NULL);
342   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
343
344   GST_OBJECT_LOCK (pads);
345   gst_collect_pads_set_buffer_function_locked (pads, func, user_data);
346   GST_OBJECT_UNLOCK (pads);
347 }
348
349 /**
350  * gst_collect_pads_set_compare_function:
351  * @pads: the pads to use
352  * @func: the function to set
353  * @user_data: (closure): user data passed to the function
354  *
355  * Set the timestamp comparison function.
356  *
357  * MT safe.
358  */
359 /* NOTE allowing to change comparison seems not advisable;
360 no known use-case, and collaboration with default algorithm is unpredictable.
361 If custom compairing/operation is needed, just use a collect function of
362 your own */
363 void
364 gst_collect_pads_set_compare_function (GstCollectPads * pads,
365     GstCollectPadsCompareFunction func, gpointer user_data)
366 {
367   g_return_if_fail (pads != NULL);
368   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
369
370   GST_OBJECT_LOCK (pads);
371   pads->priv->compare_func = func;
372   pads->priv->compare_user_data = user_data;
373   GST_OBJECT_UNLOCK (pads);
374 }
375
376 /**
377  * gst_collect_pads_set_function:
378  * @pads: the collectpads to use
379  * @func: the function to set
380  * @user_data: user data passed to the function
381  *
382  * CollectPads provides a default collection algorithm that will determine
383  * the oldest buffer available on all of its pads, and then delegate
384  * to a configured callback.
385  * However, if circumstances are more complicated and/or more control
386  * is desired, this sets a callback that will be invoked instead when
387  * all the pads added to the collection have buffers queued.
388  * Evidently, this callback is not compatible with
389  * gst_collect_pads_set_buffer_function() callback.
390  * If this callback is set, the former will be unset.
391  *
392  * MT safe.
393  */
394 void
395 gst_collect_pads_set_function (GstCollectPads * pads,
396     GstCollectPadsFunction func, gpointer user_data)
397 {
398   g_return_if_fail (pads != NULL);
399   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
400
401   GST_OBJECT_LOCK (pads);
402   pads->priv->func = func;
403   pads->priv->user_data = user_data;
404   gst_collect_pads_set_buffer_function_locked (pads, NULL, NULL);
405   GST_OBJECT_UNLOCK (pads);
406 }
407
408 static void
409 ref_data (GstCollectData * data)
410 {
411   g_assert (data != NULL);
412
413   g_atomic_int_inc (&(data->priv->refcount));
414 }
415
416 static void
417 unref_data (GstCollectData * data)
418 {
419   g_assert (data != NULL);
420   g_assert (data->priv->refcount > 0);
421
422   if (!g_atomic_int_dec_and_test (&(data->priv->refcount)))
423     return;
424
425   if (data->priv->destroy_notify)
426     data->priv->destroy_notify (data);
427
428   g_object_unref (data->pad);
429   if (data->buffer) {
430     gst_buffer_unref (data->buffer);
431   }
432   g_free (data->priv);
433   g_free (data);
434 }
435
436 /**
437  * gst_collect_pads_set_event_function:
438  * @pads: the collectpads to use
439  * @func: the function to set
440  * @user_data: user data passed to the function
441  *
442  * Set the event callback function and user data that will be called when
443  * collectpads has received an event originating from one of the collected
444  * pads.  If the event being processed is a serialized one, this callback is
445  * called with @pads STREAM_LOCK held, otherwise not.  As this lock should be
446  * held when calling a number of CollectPads functions, it should be acquired
447  * if so (unusually) needed.
448  *
449  * MT safe.
450  */
451 void
452 gst_collect_pads_set_event_function (GstCollectPads * pads,
453     GstCollectPadsEventFunction func, gpointer user_data)
454 {
455   g_return_if_fail (pads != NULL);
456   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
457
458   GST_OBJECT_LOCK (pads);
459   pads->priv->event_func = func;
460   pads->priv->event_user_data = user_data;
461   GST_OBJECT_UNLOCK (pads);
462 }
463
464 /**
465  * gst_collect_pads_set_query_function:
466  * @pads: the collectpads to use
467  * @func: the function to set
468  * @user_data: user data passed to the function
469  *
470  * Set the query callback function and user data that will be called after
471  * collectpads has received a query originating from one of the collected
472  * pads.  If the query being processed is a serialized one, this callback is
473  * called with @pads STREAM_LOCK held, otherwise not.  As this lock should be
474  * held when calling a number of CollectPads functions, it should be acquired
475  * if so (unusually) needed.
476  *
477  * MT safe.
478  */
479 void
480 gst_collect_pads_set_query_function (GstCollectPads * pads,
481     GstCollectPadsQueryFunction func, gpointer user_data)
482 {
483   g_return_if_fail (pads != NULL);
484   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
485
486   GST_OBJECT_LOCK (pads);
487   pads->priv->query_func = func;
488   pads->priv->query_user_data = user_data;
489   GST_OBJECT_UNLOCK (pads);
490 }
491
492 /**
493 * gst_collect_pads_clip_running_time:
494 * @pads: the collectpads to use
495 * @cdata: collect data of corresponding pad
496 * @buf: buffer being clipped
497 * @outbuf: (allow-none): output buffer with running time, or NULL if clipped
498 * @user_data: user data (unused)
499 *
500 * Convenience clipping function that converts incoming buffer's timestamp
501 * to running time, or clips the buffer if outside configured segment.
502 */
503 GstFlowReturn
504 gst_collect_pads_clip_running_time (GstCollectPads * pads,
505     GstCollectData * cdata, GstBuffer * buf, GstBuffer ** outbuf,
506     gpointer user_data)
507 {
508   GstClockTime time;
509
510   *outbuf = buf;
511   time = GST_BUFFER_PTS (buf);
512
513   /* invalid left alone and passed */
514   if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) {
515     time = gst_segment_to_running_time (&cdata->segment, GST_FORMAT_TIME, time);
516     if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) {
517       GST_DEBUG_OBJECT (cdata->pad, "clipping buffer on pad outside segment");
518       gst_buffer_unref (buf);
519       *outbuf = NULL;
520     } else {
521       GST_LOG_OBJECT (cdata->pad, "buffer ts %" GST_TIME_FORMAT " -> %"
522           GST_TIME_FORMAT " running time",
523           GST_TIME_ARGS (GST_BUFFER_PTS (buf)), GST_TIME_ARGS (time));
524       *outbuf = gst_buffer_make_writable (buf);
525       GST_BUFFER_PTS (*outbuf) = time;
526       GST_BUFFER_DTS (*outbuf) = gst_segment_to_running_time (&cdata->segment,
527           GST_FORMAT_TIME, GST_BUFFER_DTS (*outbuf));
528     }
529   }
530
531   return GST_FLOW_OK;
532 }
533
534 /**
535  * gst_collect_pads_set_clip_function:
536  * @pads: the collectpads to use
537  * @clipfunc: clip function to install
538  * @user_data: user data to pass to @clip_func
539  *
540  * Install a clipping function that is called right after a buffer is received
541  * on a pad managed by @pads. See #GstCollectPadsClipFunction for more info.
542  */
543 void
544 gst_collect_pads_set_clip_function (GstCollectPads * pads,
545     GstCollectPadsClipFunction clipfunc, gpointer user_data)
546 {
547   g_return_if_fail (pads != NULL);
548   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
549
550   pads->priv->clip_func = clipfunc;
551   pads->priv->clip_user_data = user_data;
552 }
553
554 /**
555  * gst_collect_pads_set_flush_function:
556  * @pads: the collectpads to use
557  * @func: flush function to install
558  * @user_data: user data to pass to @func
559  *
560  * Install a flush function that is called when the internal
561  * state of all pads should be flushed as part of flushing seek
562  * handling. See #GstCollectPadsFlushFunction for more info.
563  *
564  * Since: 1.4
565  */
566 void
567 gst_collect_pads_set_flush_function (GstCollectPads * pads,
568     GstCollectPadsFlushFunction func, gpointer user_data)
569 {
570   g_return_if_fail (pads != NULL);
571   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
572
573   pads->priv->flush_func = func;
574   pads->priv->flush_user_data = user_data;
575 }
576
577 /**
578  * gst_collect_pads_add_pad:
579  * @pads: the collectpads to use
580  * @pad: (transfer none): the pad to add
581  * @size: the size of the returned #GstCollectData structure
582  * @destroy_notify: function to be called before the returned #GstCollectData
583  * structure is freed
584  * @lock: whether to lock this pad in usual waiting state
585  *
586  * Add a pad to the collection of collect pads. The pad has to be
587  * a sinkpad. The refcount of the pad is incremented. Use
588  * gst_collect_pads_remove_pad() to remove the pad from the collection
589  * again.
590  *
591  * You specify a size for the returned #GstCollectData structure
592  * so that you can use it to store additional information.
593  *
594  * You can also specify a #GstCollectDataDestroyNotify that will be called
595  * just before the #GstCollectData structure is freed. It is passed the
596  * pointer to the structure and should free any custom memory and resources
597  * allocated for it.
598  *
599  * Keeping a pad locked in waiting state is only relevant when using
600  * the default collection algorithm (providing the oldest buffer).
601  * It ensures a buffer must be available on this pad for a collection
602  * to take place.  This is of typical use to a muxer element where
603  * non-subtitle streams should always be in waiting state,
604  * e.g. to assure that caps information is available on all these streams
605  * when initial headers have to be written.
606  *
607  * The pad will be automatically activated in push mode when @pads is
608  * started.
609  *
610  * MT safe.
611  *
612  * Returns: a new #GstCollectData to identify the new pad. Or NULL
613  *   if wrong parameters are supplied.
614  */
615 GstCollectData *
616 gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size,
617     GstCollectDataDestroyNotify destroy_notify, gboolean lock)
618 {
619   GstCollectData *data;
620
621   g_return_val_if_fail (pads != NULL, NULL);
622   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
623   g_return_val_if_fail (pad != NULL, NULL);
624   g_return_val_if_fail (GST_PAD_IS_SINK (pad), NULL);
625   g_return_val_if_fail (size >= sizeof (GstCollectData), NULL);
626
627   GST_DEBUG_OBJECT (pads, "adding pad %s:%s", GST_DEBUG_PAD_NAME (pad));
628
629   data = g_malloc0 (size);
630   data->priv = g_new0 (GstCollectDataPrivate, 1);
631   data->collect = pads;
632   data->pad = gst_object_ref (pad);
633   data->buffer = NULL;
634   data->pos = 0;
635   gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
636   data->state = GST_COLLECT_PADS_STATE_WAITING;
637   data->state |= lock ? GST_COLLECT_PADS_STATE_LOCKED : 0;
638   data->priv->refcount = 1;
639   data->priv->destroy_notify = destroy_notify;
640
641   GST_OBJECT_LOCK (pads);
642   GST_OBJECT_LOCK (pad);
643   gst_pad_set_element_private (pad, data);
644   GST_OBJECT_UNLOCK (pad);
645   pads->priv->pad_list = g_slist_append (pads->priv->pad_list, data);
646   gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_chain));
647   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_event));
648   gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_query));
649   /* backward compat, also add to data if stopped, so that the element already
650    * has this in the public data list before going PAUSED (typically)
651    * this can only be done when we are stopped because we don't take the
652    * STREAM_LOCK to protect the pads->data list. */
653   if (!pads->priv->started) {
654     pads->data = g_slist_append (pads->data, data);
655     ref_data (data);
656   }
657   /* activate the pad when needed */
658   if (pads->priv->started)
659     gst_pad_set_active (pad, TRUE);
660   pads->priv->pad_cookie++;
661   GST_OBJECT_UNLOCK (pads);
662
663   return data;
664 }
665
666 static gint
667 find_pad (GstCollectData * data, GstPad * pad)
668 {
669   if (data->pad == pad)
670     return 0;
671   return 1;
672 }
673
674 /**
675  * gst_collect_pads_remove_pad:
676  * @pads: the collectpads to use
677  * @pad: (transfer none): the pad to remove
678  *
679  * Remove a pad from the collection of collect pads. This function will also
680  * free the #GstCollectData and all the resources that were allocated with
681  * gst_collect_pads_add_pad().
682  *
683  * The pad will be deactivated automatically when @pads is stopped.
684  *
685  * MT safe.
686  *
687  * Returns: %TRUE if the pad could be removed.
688  */
689 gboolean
690 gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad)
691 {
692   GstCollectData *data;
693   GSList *list;
694
695   g_return_val_if_fail (pads != NULL, FALSE);
696   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
697   g_return_val_if_fail (pad != NULL, FALSE);
698   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
699
700   GST_DEBUG_OBJECT (pads, "removing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
701
702   GST_OBJECT_LOCK (pads);
703   list =
704       g_slist_find_custom (pads->priv->pad_list, pad, (GCompareFunc) find_pad);
705   if (!list)
706     goto unknown_pad;
707
708   data = (GstCollectData *) list->data;
709
710   GST_DEBUG_OBJECT (pads, "found pad %s:%s at %p", GST_DEBUG_PAD_NAME (pad),
711       data);
712
713   /* clear the stuff we configured */
714   gst_pad_set_chain_function (pad, NULL);
715   gst_pad_set_event_function (pad, NULL);
716   GST_OBJECT_LOCK (pad);
717   gst_pad_set_element_private (pad, NULL);
718   GST_OBJECT_UNLOCK (pad);
719
720   /* backward compat, also remove from data if stopped, note that this function
721    * can only be called when we are stopped because we don't take the
722    * STREAM_LOCK to protect the pads->data list. */
723   if (!pads->priv->started) {
724     GSList *dlist;
725
726     dlist = g_slist_find_custom (pads->data, pad, (GCompareFunc) find_pad);
727     if (dlist) {
728       GstCollectData *pdata = dlist->data;
729
730       pads->data = g_slist_delete_link (pads->data, dlist);
731       unref_data (pdata);
732     }
733   }
734   /* remove from the pad list */
735   pads->priv->pad_list = g_slist_delete_link (pads->priv->pad_list, list);
736   pads->priv->pad_cookie++;
737
738   /* signal waiters because something changed */
739   GST_COLLECT_PADS_EVT_BROADCAST (pads);
740
741   /* deactivate the pad when needed */
742   if (!pads->priv->started)
743     gst_pad_set_active (pad, FALSE);
744
745   /* clean and free the collect data */
746   unref_data (data);
747
748   GST_OBJECT_UNLOCK (pads);
749
750   return TRUE;
751
752 unknown_pad:
753   {
754     GST_WARNING_OBJECT (pads, "cannot remove unknown pad %s:%s",
755         GST_DEBUG_PAD_NAME (pad));
756     GST_OBJECT_UNLOCK (pads);
757     return FALSE;
758   }
759 }
760
761 /*
762  * Must be called with STREAM_LOCK and OBJECT_LOCK.
763  */
764 static void
765 gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads,
766     gboolean flushing)
767 {
768   GSList *walk = NULL;
769
770   /* Update the pads flushing flag */
771   for (walk = pads->priv->pad_list; walk; walk = g_slist_next (walk)) {
772     GstCollectData *cdata = walk->data;
773
774     if (GST_IS_PAD (cdata->pad)) {
775       GST_OBJECT_LOCK (cdata->pad);
776       if (flushing)
777         GST_PAD_SET_FLUSHING (cdata->pad);
778       else
779         GST_PAD_UNSET_FLUSHING (cdata->pad);
780       if (flushing)
781         GST_COLLECT_PADS_STATE_SET (cdata, GST_COLLECT_PADS_STATE_FLUSHING);
782       else
783         GST_COLLECT_PADS_STATE_UNSET (cdata, GST_COLLECT_PADS_STATE_FLUSHING);
784       gst_collect_pads_clear (pads, cdata);
785       GST_OBJECT_UNLOCK (cdata->pad);
786     }
787   }
788
789   /* inform _chain of changes */
790   GST_COLLECT_PADS_EVT_BROADCAST (pads);
791 }
792
793 /**
794  * gst_collect_pads_set_flushing:
795  * @pads: the collectpads to use
796  * @flushing: desired state of the pads
797  *
798  * Change the flushing state of all the pads in the collection. No pad
799  * is able to accept anymore data when @flushing is %TRUE. Calling this
800  * function with @flushing %FALSE makes @pads accept data again.
801  * Caller must ensure that downstream streaming (thread) is not blocked,
802  * e.g. by sending a FLUSH_START downstream.
803  *
804  * MT safe.
805  */
806 void
807 gst_collect_pads_set_flushing (GstCollectPads * pads, gboolean flushing)
808 {
809   g_return_if_fail (pads != NULL);
810   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
811
812   /* NOTE since this eventually calls _pop, some (STREAM_)LOCK is needed here */
813   GST_COLLECT_PADS_STREAM_LOCK (pads);
814   GST_OBJECT_LOCK (pads);
815   gst_collect_pads_set_flushing_unlocked (pads, flushing);
816   GST_OBJECT_UNLOCK (pads);
817   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
818 }
819
820 /**
821  * gst_collect_pads_start:
822  * @pads: the collectpads to use
823  *
824  * Starts the processing of data in the collect_pads.
825  *
826  * MT safe.
827  */
828 void
829 gst_collect_pads_start (GstCollectPads * pads)
830 {
831   GSList *collected;
832
833   g_return_if_fail (pads != NULL);
834   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
835
836   GST_DEBUG_OBJECT (pads, "starting collect pads");
837
838   /* make sure stop and collect cannot be called anymore */
839   GST_COLLECT_PADS_STREAM_LOCK (pads);
840
841   /* make pads streamable */
842   GST_OBJECT_LOCK (pads);
843
844   /* loop over the master pad list and reset the segment */
845   collected = pads->priv->pad_list;
846   for (; collected; collected = g_slist_next (collected)) {
847     GstCollectData *data;
848
849     data = collected->data;
850     gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
851   }
852
853   gst_collect_pads_set_flushing_unlocked (pads, FALSE);
854
855   /* Start collect pads */
856   pads->priv->started = TRUE;
857   GST_OBJECT_UNLOCK (pads);
858   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
859 }
860
861 /**
862  * gst_collect_pads_stop:
863  * @pads: the collectpads to use
864  *
865  * Stops the processing of data in the collect_pads. this function
866  * will also unblock any blocking operations.
867  *
868  * MT safe.
869  */
870 void
871 gst_collect_pads_stop (GstCollectPads * pads)
872 {
873   GSList *collected;
874
875   g_return_if_fail (pads != NULL);
876   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
877
878   GST_DEBUG_OBJECT (pads, "stopping collect pads");
879
880   /* make sure collect and start cannot be called anymore */
881   GST_COLLECT_PADS_STREAM_LOCK (pads);
882
883   /* make pads not accept data anymore */
884   GST_OBJECT_LOCK (pads);
885   gst_collect_pads_set_flushing_unlocked (pads, TRUE);
886
887   /* Stop collect pads */
888   pads->priv->started = FALSE;
889   pads->priv->eospads = 0;
890   pads->priv->queuedpads = 0;
891
892   /* loop over the master pad list and flush buffers */
893   collected = pads->priv->pad_list;
894   for (; collected; collected = g_slist_next (collected)) {
895     GstCollectData *data;
896     GstBuffer **buffer_p;
897
898     data = collected->data;
899     if (data->buffer) {
900       buffer_p = &data->buffer;
901       gst_buffer_replace (buffer_p, NULL);
902       data->pos = 0;
903     }
904     GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_EOS);
905   }
906
907   if (pads->priv->earliest_data)
908     unref_data (pads->priv->earliest_data);
909   pads->priv->earliest_data = NULL;
910   pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
911
912   GST_OBJECT_UNLOCK (pads);
913   /* Wake them up so they can end the chain functions. */
914   GST_COLLECT_PADS_EVT_BROADCAST (pads);
915
916   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
917 }
918
919 /**
920  * gst_collect_pads_peek:
921  * @pads: the collectpads to peek
922  * @data: the data to use
923  *
924  * Peek at the buffer currently queued in @data. This function
925  * should be called with the @pads STREAM_LOCK held, such as in the callback
926  * handler.
927  *
928  * MT safe.
929  *
930  * Returns: The buffer in @data or NULL if no buffer is queued.
931  *  should unref the buffer after usage.
932  */
933 GstBuffer *
934 gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data)
935 {
936   GstBuffer *result;
937
938   g_return_val_if_fail (pads != NULL, NULL);
939   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
940   g_return_val_if_fail (data != NULL, NULL);
941
942   if ((result = data->buffer))
943     gst_buffer_ref (result);
944
945   GST_DEBUG_OBJECT (pads, "Peeking at pad %s:%s: buffer=%p",
946       GST_DEBUG_PAD_NAME (data->pad), result);
947
948   return result;
949 }
950
951 /**
952  * gst_collect_pads_pop:
953  * @pads: the collectpads to pop
954  * @data: the data to use
955  *
956  * Pop the buffer currently queued in @data. This function
957  * should be called with the @pads STREAM_LOCK held, such as in the callback
958  * handler.
959  *
960  * MT safe.
961  *
962  * Returns: (transfer full): The buffer in @data or NULL if no buffer was
963  *   queued. You should unref the buffer after usage.
964  */
965 GstBuffer *
966 gst_collect_pads_pop (GstCollectPads * pads, GstCollectData * data)
967 {
968   GstBuffer *result;
969
970   g_return_val_if_fail (pads != NULL, NULL);
971   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
972   g_return_val_if_fail (data != NULL, NULL);
973
974   if ((result = data->buffer)) {
975     data->buffer = NULL;
976     data->pos = 0;
977     /* one less pad with queued data now */
978     if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING))
979       pads->priv->queuedpads--;
980   }
981
982   GST_COLLECT_PADS_EVT_BROADCAST (pads);
983
984   GST_DEBUG_OBJECT (pads, "Pop buffer on pad %s:%s: buffer=%p",
985       GST_DEBUG_PAD_NAME (data->pad), result);
986
987   return result;
988 }
989
990 /* pop and unref the currently queued buffer, should be called with STREAM_LOCK
991  * held */
992 static void
993 gst_collect_pads_clear (GstCollectPads * pads, GstCollectData * data)
994 {
995   GstBuffer *buf;
996
997   if ((buf = gst_collect_pads_pop (pads, data)))
998     gst_buffer_unref (buf);
999 }
1000
1001 /**
1002  * gst_collect_pads_available:
1003  * @pads: the collectpads to query
1004  *
1005  * Query how much bytes can be read from each queued buffer. This means
1006  * that the result of this call is the maximum number of bytes that can
1007  * be read from each of the pads.
1008  *
1009  * This function should be called with @pads STREAM_LOCK held, such as
1010  * in the callback.
1011  *
1012  * MT safe.
1013  *
1014  * Returns: The maximum number of bytes queued on all pads. This function
1015  * returns 0 if a pad has no queued buffer.
1016  */
1017 /* we might pre-calculate this in some struct field,
1018  * but would then have to maintain this in _chain and particularly _pop, etc,
1019  * even if element is never interested in this information */
1020 guint
1021 gst_collect_pads_available (GstCollectPads * pads)
1022 {
1023   GSList *collected;
1024   guint result = G_MAXUINT;
1025
1026   g_return_val_if_fail (pads != NULL, 0);
1027   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
1028
1029   collected = pads->data;
1030   for (; collected; collected = g_slist_next (collected)) {
1031     GstCollectData *pdata;
1032     GstBuffer *buffer;
1033     gint size;
1034
1035     pdata = (GstCollectData *) collected->data;
1036
1037     /* ignore pad with EOS */
1038     if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (pdata,
1039                 GST_COLLECT_PADS_STATE_EOS))) {
1040       GST_DEBUG_OBJECT (pads, "pad %p is EOS", pdata);
1041       continue;
1042     }
1043
1044     /* an empty buffer without EOS is weird when we get here.. */
1045     if (G_UNLIKELY ((buffer = pdata->buffer) == NULL)) {
1046       GST_WARNING_OBJECT (pads, "pad %p has no buffer", pdata);
1047       goto not_filled;
1048     }
1049
1050     /* this is the size left of the buffer */
1051     size = gst_buffer_get_size (buffer) - pdata->pos;
1052     GST_DEBUG_OBJECT (pads, "pad %p has %d bytes left", pdata, size);
1053
1054     /* need to return the min of all available data */
1055     if (size < result)
1056       result = size;
1057   }
1058   /* nothing changed, all must be EOS then, return 0 */
1059   if (G_UNLIKELY (result == G_MAXUINT))
1060     result = 0;
1061
1062   return result;
1063
1064 not_filled:
1065   {
1066     return 0;
1067   }
1068 }
1069
1070 /**
1071  * gst_collect_pads_flush:
1072  * @pads: the collectpads to query
1073  * @data: the data to use
1074  * @size: the number of bytes to flush
1075  *
1076  * Flush @size bytes from the pad @data.
1077  *
1078  * This function should be called with @pads STREAM_LOCK held, such as
1079  * in the callback.
1080  *
1081  * MT safe.
1082  *
1083  * Returns: The number of bytes flushed This can be less than @size and
1084  * is 0 if the pad was end-of-stream.
1085  */
1086 guint
1087 gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data,
1088     guint size)
1089 {
1090   guint flushsize;
1091   gsize bsize;
1092   GstBuffer *buffer;
1093
1094   g_return_val_if_fail (pads != NULL, 0);
1095   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
1096   g_return_val_if_fail (data != NULL, 0);
1097
1098   /* no buffer, must be EOS */
1099   if ((buffer = data->buffer) == NULL)
1100     return 0;
1101
1102   bsize = gst_buffer_get_size (buffer);
1103
1104   /* this is what we can flush at max */
1105   flushsize = MIN (size, bsize - data->pos);
1106
1107   data->pos += size;
1108
1109   if (data->pos >= bsize)
1110     /* _clear will also reset data->pos to 0 */
1111     gst_collect_pads_clear (pads, data);
1112
1113   return flushsize;
1114 }
1115
1116 /**
1117  * gst_collect_pads_read_buffer:
1118  * @pads: the collectpads to query
1119  * @data: the data to use
1120  * @size: the number of bytes to read
1121  *
1122  * Get a subbuffer of @size bytes from the given pad @data.
1123  *
1124  * This function should be called with @pads STREAM_LOCK held, such as in the
1125  * callback.
1126  *
1127  * MT safe.
1128  *
1129  * Returns: (transfer full): A sub buffer. The size of the buffer can be less that requested.
1130  * A return of NULL signals that the pad is end-of-stream.
1131  * Unref the buffer after use.
1132  */
1133 GstBuffer *
1134 gst_collect_pads_read_buffer (GstCollectPads * pads, GstCollectData * data,
1135     guint size)
1136 {
1137   guint readsize;
1138   GstBuffer *buffer;
1139
1140   g_return_val_if_fail (pads != NULL, NULL);
1141   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
1142   g_return_val_if_fail (data != NULL, NULL);
1143
1144   /* no buffer, must be EOS */
1145   if ((buffer = data->buffer) == NULL)
1146     return NULL;
1147
1148   readsize = MIN (size, gst_buffer_get_size (buffer) - data->pos);
1149
1150   return gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, data->pos,
1151       readsize);
1152 }
1153
1154 /**
1155  * gst_collect_pads_take_buffer:
1156  * @pads: the collectpads to query
1157  * @data: the data to use
1158  * @size: the number of bytes to read
1159  *
1160  * Get a subbuffer of @size bytes from the given pad @data. Flushes the amount
1161  * of read bytes.
1162  *
1163  * This function should be called with @pads STREAM_LOCK held, such as in the
1164  * callback.
1165  *
1166  * MT safe.
1167  *
1168  * Returns: A sub buffer. The size of the buffer can be less that requested.
1169  * A return of NULL signals that the pad is end-of-stream.
1170  * Unref the buffer after use.
1171  */
1172 GstBuffer *
1173 gst_collect_pads_take_buffer (GstCollectPads * pads, GstCollectData * data,
1174     guint size)
1175 {
1176   GstBuffer *buffer = gst_collect_pads_read_buffer (pads, data, size);
1177
1178   if (buffer) {
1179     gst_collect_pads_flush (pads, data, gst_buffer_get_size (buffer));
1180   }
1181   return buffer;
1182 }
1183
1184 /**
1185  * gst_collect_pads_set_waiting:
1186  * @pads: the collectpads
1187  * @data: the data to use
1188  * @waiting: boolean indicating whether this pad should operate
1189  *           in waiting or non-waiting mode
1190  *
1191  * Sets a pad to waiting or non-waiting mode, if at least this pad
1192  * has not been created with locked waiting state,
1193  * in which case nothing happens.
1194  *
1195  * This function should be called with @pads STREAM_LOCK held, such as
1196  * in the callback.
1197  *
1198  * MT safe.
1199  */
1200 void
1201 gst_collect_pads_set_waiting (GstCollectPads * pads, GstCollectData * data,
1202     gboolean waiting)
1203 {
1204   g_return_if_fail (pads != NULL);
1205   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
1206   g_return_if_fail (data != NULL);
1207
1208   GST_DEBUG_OBJECT (pads, "Setting pad %s to waiting %d, locked %d",
1209       GST_PAD_NAME (data->pad), waiting,
1210       GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED));
1211
1212   /* Do something only on a change and if not locked */
1213   if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED) &&
1214       (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING) !=
1215           ! !waiting)) {
1216     /* Set waiting state for this pad */
1217     if (waiting)
1218       GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_WAITING);
1219     else
1220       GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_WAITING);
1221     /* Update number of queued pads if needed */
1222     if (!data->buffer &&
1223         !GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_EOS)) {
1224       if (waiting)
1225         pads->priv->queuedpads--;
1226       else
1227         pads->priv->queuedpads++;
1228     }
1229
1230     /* signal waiters because something changed */
1231     GST_COLLECT_PADS_EVT_BROADCAST (pads);
1232   }
1233 }
1234
1235 /* see if pads were added or removed and update our stats. Any pad
1236  * added after releasing the LOCK will get collected in the next
1237  * round.
1238  *
1239  * We can do a quick check by checking the cookies, that get changed
1240  * whenever the pad list is updated.
1241  *
1242  * Must be called with STREAM_LOCK.
1243  */
1244 static void
1245 gst_collect_pads_check_pads (GstCollectPads * pads)
1246 {
1247   /* the master list and cookie are protected with LOCK */
1248   GST_OBJECT_LOCK (pads);
1249   if (G_UNLIKELY (pads->priv->pad_cookie != pads->priv->cookie)) {
1250     GSList *collected;
1251
1252     /* clear list and stats */
1253     g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
1254     g_slist_free (pads->data);
1255     pads->data = NULL;
1256     pads->priv->numpads = 0;
1257     pads->priv->queuedpads = 0;
1258     pads->priv->eospads = 0;
1259     if (pads->priv->earliest_data)
1260       unref_data (pads->priv->earliest_data);
1261     pads->priv->earliest_data = NULL;
1262     pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
1263
1264     /* loop over the master pad list */
1265     collected = pads->priv->pad_list;
1266     for (; collected; collected = g_slist_next (collected)) {
1267       GstCollectData *data;
1268
1269       /* update the stats */
1270       pads->priv->numpads++;
1271       data = collected->data;
1272       if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_EOS))
1273         pads->priv->eospads++;
1274       else if (data->buffer || !GST_COLLECT_PADS_STATE_IS_SET (data,
1275               GST_COLLECT_PADS_STATE_WAITING))
1276         pads->priv->queuedpads++;
1277
1278       /* add to the list of pads to collect */
1279       ref_data (data);
1280       /* preserve order of adding/requesting pads */
1281       pads->data = g_slist_append (pads->data, data);
1282     }
1283     /* and update the cookie */
1284     pads->priv->cookie = pads->priv->pad_cookie;
1285   }
1286   GST_OBJECT_UNLOCK (pads);
1287 }
1288
1289 /* checks if all the pads are collected and call the collectfunction
1290  *
1291  * Should be called with STREAM_LOCK.
1292  *
1293  * Returns: The #GstFlowReturn of collection.
1294  */
1295 static GstFlowReturn
1296 gst_collect_pads_check_collected (GstCollectPads * pads)
1297 {
1298   GstFlowReturn flow_ret = GST_FLOW_OK;
1299   GstCollectPadsFunction func;
1300   gpointer user_data;
1301
1302   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
1303
1304   GST_OBJECT_LOCK (pads);
1305   func = pads->priv->func;
1306   user_data = pads->priv->user_data;
1307   GST_OBJECT_UNLOCK (pads);
1308
1309   g_return_val_if_fail (pads->priv->func != NULL, GST_FLOW_NOT_SUPPORTED);
1310
1311   /* check for new pads, update stats etc.. */
1312   gst_collect_pads_check_pads (pads);
1313
1314   if (G_UNLIKELY (pads->priv->eospads == pads->priv->numpads)) {
1315     /* If all our pads are EOS just collect once to let the element
1316      * do its final EOS handling. */
1317     GST_DEBUG_OBJECT (pads, "All active pads (%d) are EOS, calling %s",
1318         pads->priv->numpads, GST_DEBUG_FUNCPTR_NAME (func));
1319
1320     if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking,
1321                 TRUE, FALSE) == TRUE)) {
1322       GST_INFO_OBJECT (pads, "finished seeking");
1323     }
1324     do {
1325       flow_ret = func (pads, user_data);
1326     } while (flow_ret == GST_FLOW_OK);
1327   } else {
1328     gboolean collected = FALSE;
1329
1330     /* We call the collected function as long as our condition matches. */
1331     while (((pads->priv->queuedpads + pads->priv->eospads) >=
1332             pads->priv->numpads)) {
1333       GST_DEBUG_OBJECT (pads,
1334           "All active pads (%d + %d >= %d) have data, " "calling %s",
1335           pads->priv->queuedpads, pads->priv->eospads, pads->priv->numpads,
1336           GST_DEBUG_FUNCPTR_NAME (func));
1337
1338       if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking,
1339                   TRUE, FALSE) == TRUE)) {
1340         GST_INFO_OBJECT (pads, "finished seeking");
1341       }
1342       flow_ret = func (pads, user_data);
1343       collected = TRUE;
1344
1345       /* break on error */
1346       if (flow_ret != GST_FLOW_OK)
1347         break;
1348       /* Don't keep looping after telling the element EOS or flushing */
1349       if (pads->priv->queuedpads == 0)
1350         break;
1351     }
1352     if (!collected)
1353       GST_DEBUG_OBJECT (pads, "Not all active pads (%d) have data, continuing",
1354           pads->priv->numpads);
1355   }
1356   return flow_ret;
1357 }
1358
1359
1360 /* General overview:
1361  * - only pad with a buffer can determine earliest_data (and earliest_time)
1362  * - only segment info determines (non-)waiting state
1363  * - ? perhaps use _stream_time for comparison
1364  *   (which muxers might have use as well ?)
1365  */
1366
1367 /*
1368  * Function to recalculate the waiting state of all pads.
1369  *
1370  * Must be called with STREAM_LOCK.
1371  *
1372  * Returns TRUE if a pad was set to waiting
1373  * (from non-waiting state).
1374  */
1375 static gboolean
1376 gst_collect_pads_recalculate_waiting (GstCollectPads * pads)
1377 {
1378   GSList *collected;
1379   gboolean result = FALSE;
1380
1381   /* If earliest time is not known, there is nothing to do. */
1382   if (pads->priv->earliest_data == NULL)
1383     return FALSE;
1384
1385   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
1386     GstCollectData *data = (GstCollectData *) collected->data;
1387     int cmp_res;
1388     GstClockTime comp_time;
1389
1390     /* check if pad has a segment */
1391     if (data->segment.format == GST_FORMAT_UNDEFINED) {
1392       GST_WARNING_OBJECT (pads,
1393           "GstCollectPads has no time segment, assuming 0 based.");
1394       gst_segment_init (&data->segment, GST_FORMAT_TIME);
1395       GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
1396     }
1397
1398     /* check segment format */
1399     if (data->segment.format != GST_FORMAT_TIME) {
1400       GST_ERROR_OBJECT (pads, "GstCollectPads can handle only time segments.");
1401       continue;
1402     }
1403
1404     /* check if the waiting state should be changed */
1405     comp_time = data->segment.position;
1406     cmp_res = pads->priv->compare_func (pads, data, comp_time,
1407         pads->priv->earliest_data, pads->priv->earliest_time,
1408         pads->priv->compare_user_data);
1409     if (cmp_res > 0)
1410       /* stop waiting */
1411       gst_collect_pads_set_waiting (pads, data, FALSE);
1412     else {
1413       if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING)) {
1414         /* start waiting */
1415         gst_collect_pads_set_waiting (pads, data, TRUE);
1416         result = TRUE;
1417       }
1418     }
1419   }
1420
1421   return result;
1422 }
1423
1424 /**
1425  * gst_collect_pads_find_best_pad:
1426  * @pads: the collectpads to use
1427  * @data: returns the collectdata for earliest data
1428  * @time: returns the earliest available buffertime
1429  *
1430  * Find the oldest/best pad, i.e. pad holding the oldest buffer and
1431  * and return the corresponding #GstCollectData and buffertime.
1432  *
1433  * This function should be called with STREAM_LOCK held,
1434  * such as in the callback.
1435  */
1436 static void
1437 gst_collect_pads_find_best_pad (GstCollectPads * pads,
1438     GstCollectData ** data, GstClockTime * time)
1439 {
1440   GSList *collected;
1441   GstCollectData *best = NULL;
1442   GstClockTime best_time = GST_CLOCK_TIME_NONE;
1443
1444   g_return_if_fail (data != NULL);
1445   g_return_if_fail (time != NULL);
1446
1447   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
1448     GstBuffer *buffer;
1449     GstCollectData *data = (GstCollectData *) collected->data;
1450     GstClockTime timestamp;
1451
1452     buffer = gst_collect_pads_peek (pads, data);
1453     /* if we have a buffer check if it is better then the current best one */
1454     if (buffer != NULL) {
1455       timestamp = GST_BUFFER_DTS (buffer);
1456       if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
1457         timestamp = GST_BUFFER_PTS (buffer);
1458       }
1459       gst_buffer_unref (buffer);
1460       if (best == NULL || pads->priv->compare_func (pads, data, timestamp,
1461               best, best_time, pads->priv->compare_user_data) < 0) {
1462         best = data;
1463         best_time = timestamp;
1464       }
1465     }
1466   }
1467
1468   /* set earliest time */
1469   *data = best;
1470   *time = best_time;
1471
1472   GST_DEBUG_OBJECT (pads, "best pad %s, best time %" GST_TIME_FORMAT,
1473       best ? GST_PAD_NAME (((GstCollectData *) best)->pad) : "(nil)",
1474       GST_TIME_ARGS (best_time));
1475 }
1476
1477 /*
1478  * Function to recalculate earliest_data and earliest_timestamp. This also calls
1479  * gst_collect_pads_recalculate_waiting
1480  *
1481  * Must be called with STREAM_LOCK.
1482  */
1483 static gboolean
1484 gst_collect_pads_recalculate_full (GstCollectPads * pads)
1485 {
1486   if (pads->priv->earliest_data)
1487     unref_data (pads->priv->earliest_data);
1488   gst_collect_pads_find_best_pad (pads, &pads->priv->earliest_data,
1489       &pads->priv->earliest_time);
1490   if (pads->priv->earliest_data)
1491     ref_data (pads->priv->earliest_data);
1492   return gst_collect_pads_recalculate_waiting (pads);
1493 }
1494
1495 /*
1496  * Default collect callback triggered when #GstCollectPads gathered all data.
1497  *
1498  * Called with STREAM_LOCK.
1499  */
1500 static GstFlowReturn
1501 gst_collect_pads_default_collected (GstCollectPads * pads, gpointer user_data)
1502 {
1503   GstCollectData *best = NULL;
1504   GstBuffer *buffer;
1505   GstFlowReturn ret = GST_FLOW_OK;
1506   GstCollectPadsBufferFunction func;
1507   gpointer buffer_user_data;
1508
1509   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
1510
1511   GST_OBJECT_LOCK (pads);
1512   func = pads->priv->buffer_func;
1513   buffer_user_data = pads->priv->buffer_user_data;
1514   GST_OBJECT_UNLOCK (pads);
1515
1516   g_return_val_if_fail (func != NULL, GST_FLOW_NOT_SUPPORTED);
1517
1518   /* Find the oldest pad at all cost */
1519   if (gst_collect_pads_recalculate_full (pads)) {
1520     /* waiting was switched on,
1521      * so give another thread a chance to deliver a possibly
1522      * older buffer; don't charge on yet with the current oldest */
1523     ret = GST_FLOW_OK;
1524     goto done;
1525   }
1526
1527   best = pads->priv->earliest_data;
1528
1529   /* No data collected means EOS. */
1530   if (G_UNLIKELY (best == NULL)) {
1531     ret = func (pads, best, NULL, buffer_user_data);
1532     if (ret == GST_FLOW_OK)
1533       ret = GST_FLOW_EOS;
1534     goto done;
1535   }
1536
1537   /* make sure that the pad we take a buffer from is waiting;
1538    * otherwise popping a buffer will seem not to have happened
1539    * and collectpads can get into a busy loop */
1540   gst_collect_pads_set_waiting (pads, best, TRUE);
1541
1542   /* Send buffer */
1543   buffer = gst_collect_pads_pop (pads, best);
1544   ret = func (pads, best, buffer, buffer_user_data);
1545
1546   /* maybe non-waiting was forced to waiting above due to
1547    * newsegment events coming too sparsely,
1548    * so re-check to restore state to avoid hanging/waiting */
1549   gst_collect_pads_recalculate_full (pads);
1550
1551 done:
1552   return ret;
1553 }
1554
1555 /*
1556  * Default timestamp compare function.
1557  */
1558 static gint
1559 gst_collect_pads_default_compare_func (GstCollectPads * pads,
1560     GstCollectData * data1, GstClockTime timestamp1,
1561     GstCollectData * data2, GstClockTime timestamp2, gpointer user_data)
1562 {
1563
1564   GST_LOG_OBJECT (pads, "comparing %" GST_TIME_FORMAT
1565       " and %" GST_TIME_FORMAT, GST_TIME_ARGS (timestamp1),
1566       GST_TIME_ARGS (timestamp2));
1567   /* non-valid timestamps go first as they are probably headers or so */
1568   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (timestamp1)))
1569     return GST_CLOCK_TIME_IS_VALID (timestamp2) ? -1 : 0;
1570
1571   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (timestamp2)))
1572     return 1;
1573
1574   /* compare timestamp */
1575   if (timestamp1 < timestamp2)
1576     return -1;
1577
1578   if (timestamp1 > timestamp2)
1579     return 1;
1580
1581   return 0;
1582 }
1583
1584 /* called with STREAM_LOCK */
1585 static void
1586 gst_collect_pads_handle_position_update (GstCollectPads * pads,
1587     GstCollectData * data, GstClockTime new_pos)
1588 {
1589   gint cmp_res;
1590
1591   /* If oldest time is not known, or current pad got newsegment;
1592    * recalculate the state */
1593   if (!pads->priv->earliest_data || pads->priv->earliest_data == data) {
1594     gst_collect_pads_recalculate_full (pads);
1595     goto exit;
1596   }
1597
1598   /* Check if the waiting state of the pad should change. */
1599   cmp_res =
1600       pads->priv->compare_func (pads, data, new_pos,
1601       pads->priv->earliest_data, pads->priv->earliest_time,
1602       pads->priv->compare_user_data);
1603
1604   if (cmp_res > 0)
1605     /* Stop waiting */
1606     gst_collect_pads_set_waiting (pads, data, FALSE);
1607
1608 exit:
1609   return;
1610
1611 }
1612
1613 static GstClockTime
1614 gst_collect_pads_clip_time (GstCollectPads * pads, GstCollectData * data,
1615     GstClockTime time)
1616 {
1617   GstClockTime otime = time;
1618   GstBuffer *in, *out = NULL;
1619
1620   if (pads->priv->clip_func) {
1621     in = gst_buffer_new ();
1622     GST_BUFFER_PTS (in) = time;
1623     GST_BUFFER_DTS (in) = time;
1624     pads->priv->clip_func (pads, data, in, &out, pads->priv->clip_user_data);
1625     if (out) {
1626       otime = GST_BUFFER_PTS (out);
1627       gst_buffer_unref (out);
1628     } else {
1629       /* FIXME should distinguish between ahead or after segment,
1630        * let's assume after segment and use some large time ... */
1631       otime = G_MAXINT64 / 2;
1632     }
1633   }
1634
1635   return otime;
1636 }
1637
1638 /**
1639  * gst_collect_pads_event_default:
1640  * @pads: the collectpads to use
1641  * @data: collect data of corresponding pad
1642  * @event: event being processed
1643  * @discard: process but do not send event downstream
1644  *
1645  * Default GstCollectPads event handling that elements should always
1646  * chain up to to ensure proper operation.  Element might however indicate
1647  * event should not be forwarded downstream.
1648  */
1649 gboolean
1650 gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data,
1651     GstEvent * event, gboolean discard)
1652 {
1653   gboolean res = TRUE;
1654   GstCollectPadsBufferFunction buffer_func;
1655   GstObject *parent;
1656   GstPad *pad;
1657
1658   GST_OBJECT_LOCK (pads);
1659   buffer_func = pads->priv->buffer_func;
1660   GST_OBJECT_UNLOCK (pads);
1661
1662   pad = data->pad;
1663   parent = GST_OBJECT_PARENT (pad);
1664
1665   switch (GST_EVENT_TYPE (event)) {
1666     case GST_EVENT_FLUSH_START:
1667     {
1668       if (g_atomic_int_get (&pads->priv->seeking)) {
1669         /* drop all but the first FLUSH_STARTs when seeking */
1670         if (g_atomic_int_compare_and_exchange (&pads->priv->pending_flush_start,
1671                 TRUE, FALSE) == FALSE)
1672           goto eat;
1673
1674         /* unblock collect pads */
1675         gst_pad_event_default (pad, parent, event);
1676         event = NULL;
1677
1678         GST_COLLECT_PADS_STREAM_LOCK (pads);
1679         /* Start flushing. We never call gst_collect_pads_set_flushing (FALSE), we
1680          * instead wait until each pad gets its FLUSH_STOP and let that reset the pad to
1681          * non-flushing (which happens in gst_collect_pads_event_default).
1682          */
1683         gst_collect_pads_set_flushing (pads, TRUE);
1684
1685         if (pads->priv->flush_func)
1686           pads->priv->flush_func (pads, pads->priv->flush_user_data);
1687
1688         g_atomic_int_set (&pads->priv->pending_flush_stop, TRUE);
1689         GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1690
1691         goto eat;
1692       } else {
1693         /* forward event to unblock check_collected */
1694         GST_DEBUG_OBJECT (pad, "forwarding flush start");
1695         res = gst_pad_event_default (pad, parent, event);
1696         event = NULL;
1697
1698         /* now unblock the chain function.
1699          * no cond per pad, so they all unblock,
1700          * non-flushing block again */
1701         GST_COLLECT_PADS_STREAM_LOCK (pads);
1702         GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_FLUSHING);
1703         gst_collect_pads_clear (pads, data);
1704
1705         /* cater for possible default muxing functionality */
1706         if (buffer_func) {
1707           /* restore to initial state */
1708           gst_collect_pads_set_waiting (pads, data, TRUE);
1709           /* if the current pad is affected, reset state, recalculate later */
1710           if (pads->priv->earliest_data == data) {
1711             unref_data (data);
1712             pads->priv->earliest_data = NULL;
1713             pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
1714           }
1715         }
1716
1717         GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1718
1719         goto eat;
1720       }
1721     }
1722     case GST_EVENT_FLUSH_STOP:
1723     {
1724       /* flush the 1 buffer queue */
1725       GST_COLLECT_PADS_STREAM_LOCK (pads);
1726       GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_FLUSHING);
1727       gst_collect_pads_clear (pads, data);
1728       /* we need new segment info after the flush */
1729       gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
1730       GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
1731       /* if the pad was EOS, remove the EOS flag and
1732        * decrement the number of eospads */
1733       if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
1734                   GST_COLLECT_PADS_STATE_EOS))) {
1735         if (!GST_COLLECT_PADS_STATE_IS_SET (data,
1736                 GST_COLLECT_PADS_STATE_WAITING))
1737           pads->priv->queuedpads++;
1738         pads->priv->eospads--;
1739         GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_EOS);
1740       }
1741       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1742
1743       if (g_atomic_int_get (&pads->priv->seeking)) {
1744         if (g_atomic_int_compare_and_exchange (&pads->priv->pending_flush_stop,
1745                 TRUE, FALSE))
1746           goto forward;
1747         else
1748           goto eat;
1749       } else {
1750         goto forward;
1751       }
1752     }
1753     case GST_EVENT_EOS:
1754     {
1755       GST_COLLECT_PADS_STREAM_LOCK (pads);
1756       /* if the pad was not EOS, make it EOS and so we
1757        * have one more eospad */
1758       if (G_LIKELY (!GST_COLLECT_PADS_STATE_IS_SET (data,
1759                   GST_COLLECT_PADS_STATE_EOS))) {
1760         GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_EOS);
1761         if (!GST_COLLECT_PADS_STATE_IS_SET (data,
1762                 GST_COLLECT_PADS_STATE_WAITING))
1763           pads->priv->queuedpads--;
1764         pads->priv->eospads++;
1765       }
1766       /* check if we need collecting anything, we ignore the result. */
1767       gst_collect_pads_check_collected (pads);
1768       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1769
1770       goto eat;
1771     }
1772     case GST_EVENT_SEGMENT:
1773     {
1774       GstSegment seg;
1775
1776       GST_COLLECT_PADS_STREAM_LOCK (pads);
1777
1778       gst_event_copy_segment (event, &seg);
1779
1780       GST_DEBUG_OBJECT (data->pad, "got segment %" GST_SEGMENT_FORMAT, &seg);
1781
1782       /* default collection can not handle other segment formats than time */
1783       if (buffer_func && seg.format != GST_FORMAT_TIME) {
1784         GST_WARNING_OBJECT (pads, "GstCollectPads default collecting "
1785             "can only handle time segments. Non time segment ignored.");
1786         goto newsegment_done;
1787       }
1788
1789       /* need to update segment first */
1790       data->segment = seg;
1791       GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
1792
1793       /* now we can use for e.g. running time */
1794       seg.position = gst_collect_pads_clip_time (pads, data, seg.start);
1795       /* update again */
1796       data->segment = seg;
1797
1798       /* default muxing functionality */
1799       if (!buffer_func)
1800         goto newsegment_done;
1801
1802       gst_collect_pads_handle_position_update (pads, data, seg.position);
1803
1804     newsegment_done:
1805       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1806       /* we must not forward this event since multiple segments will be
1807        * accumulated and this is certainly not what we want. */
1808       goto eat;
1809     }
1810     case GST_EVENT_GAP:
1811     {
1812       GstClockTime start, duration;
1813
1814       GST_COLLECT_PADS_STREAM_LOCK (pads);
1815
1816       gst_event_parse_gap (event, &start, &duration);
1817       if (GST_CLOCK_TIME_IS_VALID (duration))
1818         start += duration;
1819       /* we do not expect another buffer until after gap,
1820        * so that is our position now */
1821       data->segment.position = gst_collect_pads_clip_time (pads, data, start);
1822
1823       gst_collect_pads_handle_position_update (pads, data,
1824           data->segment.position);
1825
1826       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1827       goto eat;
1828     }
1829     case GST_EVENT_STREAM_START:
1830       /* drop stream start events, element must create its own start event,
1831        * we can't just forward the first random stream start event we get */
1832       goto eat;
1833     case GST_EVENT_CAPS:
1834       goto eat;
1835     default:
1836       /* forward other events */
1837       goto forward;
1838   }
1839
1840 eat:
1841   if (event)
1842     gst_event_unref (event);
1843   return res;
1844
1845 forward:
1846   if (discard)
1847     goto eat;
1848   else
1849     return gst_pad_event_default (pad, parent, event);
1850 }
1851
1852 typedef struct
1853 {
1854   GstEvent *event;
1855   gboolean result;
1856 } EventData;
1857
1858 static gboolean
1859 event_forward_func (GstPad * pad, EventData * data)
1860 {
1861   data->result &= gst_pad_push_event (pad, gst_event_ref (data->event));
1862
1863   /* Always send to all pads */
1864   return FALSE;
1865 }
1866
1867 static gboolean
1868 forward_event_to_all_sinkpads (GstPad * srcpad, GstEvent * event)
1869 {
1870   EventData data;
1871
1872   data.event = event;
1873   data.result = TRUE;
1874
1875   gst_pad_forward (srcpad, (GstPadForwardFunction) event_forward_func, &data);
1876
1877   gst_event_unref (event);
1878
1879   return data.result;
1880 }
1881
1882 /**
1883  * gst_collect_pads_src_event_default:
1884  * @pads: the collectpads to use
1885  * @pad: src #GstPad that received the event
1886  * @event: event being processed
1887  *
1888  * Default GstCollectPads event handling for the src pad of elements.
1889  * Elements can chain up to this to let flushing seek event handling
1890  * be done by GstCollectPads.
1891  *
1892  * Since: 1.4
1893  */
1894 gboolean
1895 gst_collect_pads_src_event_default (GstCollectPads * pads, GstPad * pad,
1896     GstEvent * event)
1897 {
1898   GstObject *parent;
1899   gboolean res = TRUE;
1900
1901   parent = GST_OBJECT_PARENT (pad);
1902
1903   switch (GST_EVENT_TYPE (event)) {
1904     case GST_EVENT_SEEK:{
1905       GstSeekFlags flags;
1906
1907       GST_INFO_OBJECT (pads, "starting seek");
1908
1909       gst_event_parse_seek (event, NULL, NULL, &flags, NULL, NULL, NULL, NULL);
1910       if (flags & GST_SEEK_FLAG_FLUSH) {
1911         g_atomic_int_set (&pads->priv->seeking, TRUE);
1912         g_atomic_int_set (&pads->priv->pending_flush_start, TRUE);
1913         /* forward the seek upstream */
1914         res = forward_event_to_all_sinkpads (pad, event);
1915         event = NULL;
1916         if (!res) {
1917           g_atomic_int_set (&pads->priv->seeking, FALSE);
1918           g_atomic_int_set (&pads->priv->pending_flush_start, FALSE);
1919         }
1920       }
1921
1922       GST_INFO_OBJECT (pads, "seek done, result: %d", res);
1923
1924       break;
1925     }
1926     default:
1927       break;
1928   }
1929
1930   if (event)
1931     res = gst_pad_event_default (pad, parent, event);
1932
1933   return res;
1934 }
1935
1936 static gboolean
1937 gst_collect_pads_event_default_internal (GstCollectPads * pads,
1938     GstCollectData * data, GstEvent * event, gpointer user_data)
1939 {
1940   return gst_collect_pads_event_default (pads, data, event, FALSE);
1941 }
1942
1943 static gboolean
1944 gst_collect_pads_event (GstPad * pad, GstObject * parent, GstEvent * event)
1945 {
1946   gboolean res = FALSE, need_unlock = FALSE;
1947   GstCollectData *data;
1948   GstCollectPads *pads;
1949   GstCollectPadsEventFunction event_func;
1950   gpointer event_user_data;
1951
1952   /* some magic to get the managing collect_pads */
1953   GST_OBJECT_LOCK (pad);
1954   data = (GstCollectData *) gst_pad_get_element_private (pad);
1955   if (G_UNLIKELY (data == NULL))
1956     goto pad_removed;
1957   ref_data (data);
1958   GST_OBJECT_UNLOCK (pad);
1959
1960   res = FALSE;
1961
1962   pads = data->collect;
1963
1964   GST_DEBUG_OBJECT (data->pad, "Got %s event on sink pad",
1965       GST_EVENT_TYPE_NAME (event));
1966
1967   GST_OBJECT_LOCK (pads);
1968   event_func = pads->priv->event_func;
1969   event_user_data = pads->priv->event_user_data;
1970   GST_OBJECT_UNLOCK (pads);
1971
1972   if (GST_EVENT_IS_SERIALIZED (event)) {
1973     GST_COLLECT_PADS_STREAM_LOCK (pads);
1974     need_unlock = TRUE;
1975   }
1976
1977   if (G_LIKELY (event_func)) {
1978     res = event_func (pads, data, event, event_user_data);
1979   }
1980
1981   if (need_unlock)
1982     GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1983
1984   unref_data (data);
1985   return res;
1986
1987   /* ERRORS */
1988 pad_removed:
1989   {
1990     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
1991     GST_OBJECT_UNLOCK (pad);
1992     return FALSE;
1993   }
1994 }
1995
1996 /**
1997  * gst_collect_pads_query_default:
1998  * @pads: the collectpads to use
1999  * @data: collect data of corresponding pad
2000  * @query: query being processed
2001  * @discard: process but do not send event downstream
2002  *
2003  * Default GstCollectPads query handling that elements should always
2004  * chain up to to ensure proper operation.  Element might however indicate
2005  * query should not be forwarded downstream.
2006  */
2007 gboolean
2008 gst_collect_pads_query_default (GstCollectPads * pads, GstCollectData * data,
2009     GstQuery * query, gboolean discard)
2010 {
2011   gboolean res = TRUE;
2012   GstObject *parent;
2013   GstPad *pad;
2014
2015   pad = data->pad;
2016   parent = GST_OBJECT_PARENT (pad);
2017
2018   switch (GST_QUERY_TYPE (query)) {
2019     case GST_QUERY_SEEKING:
2020     {
2021       GstFormat format;
2022
2023       /* don't pass it along as some (file)sink might claim it does
2024        * whereas with a collectpads in between that will not likely work */
2025       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
2026       gst_query_set_seeking (query, format, FALSE, 0, -1);
2027       res = TRUE;
2028       discard = TRUE;
2029       break;
2030     }
2031     default:
2032       break;
2033   }
2034
2035   if (!discard)
2036     return gst_pad_query_default (pad, parent, query);
2037   else
2038     return res;
2039 }
2040
2041 static gboolean
2042 gst_collect_pads_query_default_internal (GstCollectPads * pads,
2043     GstCollectData * data, GstQuery * query, gpointer user_data)
2044 {
2045   return gst_collect_pads_query_default (pads, data, query, FALSE);
2046 }
2047
2048 static gboolean
2049 gst_collect_pads_query (GstPad * pad, GstObject * parent, GstQuery * query)
2050 {
2051   gboolean res = FALSE, need_unlock = FALSE;
2052   GstCollectData *data;
2053   GstCollectPads *pads;
2054   GstCollectPadsQueryFunction query_func;
2055   gpointer query_user_data;
2056
2057   GST_DEBUG_OBJECT (pad, "Got %s query on sink pad",
2058       GST_QUERY_TYPE_NAME (query));
2059
2060   /* some magic to get the managing collect_pads */
2061   GST_OBJECT_LOCK (pad);
2062   data = (GstCollectData *) gst_pad_get_element_private (pad);
2063   if (G_UNLIKELY (data == NULL))
2064     goto pad_removed;
2065   ref_data (data);
2066   GST_OBJECT_UNLOCK (pad);
2067
2068   pads = data->collect;
2069
2070   GST_OBJECT_LOCK (pads);
2071   query_func = pads->priv->query_func;
2072   query_user_data = pads->priv->query_user_data;
2073   GST_OBJECT_UNLOCK (pads);
2074
2075   if (GST_QUERY_IS_SERIALIZED (query)) {
2076     GST_COLLECT_PADS_STREAM_LOCK (pads);
2077     need_unlock = TRUE;
2078   }
2079
2080   if (G_LIKELY (query_func)) {
2081     res = query_func (pads, data, query, query_user_data);
2082   }
2083
2084   if (need_unlock)
2085     GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2086
2087   unref_data (data);
2088   return res;
2089
2090   /* ERRORS */
2091 pad_removed:
2092   {
2093     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2094     GST_OBJECT_UNLOCK (pad);
2095     return FALSE;
2096   }
2097 }
2098
2099
2100 /* For each buffer we receive we check if our collected condition is reached
2101  * and if so we call the collected function. When this is done we check if
2102  * data has been unqueued. If data is still queued we wait holding the stream
2103  * lock to make sure no EOS event can happen while we are ready to be
2104  * collected 
2105  */
2106 static GstFlowReturn
2107 gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2108 {
2109   GstCollectData *data;
2110   GstCollectPads *pads;
2111   GstFlowReturn ret;
2112   GstBuffer **buffer_p;
2113   guint32 cookie;
2114
2115   GST_DEBUG ("Got buffer for pad %s:%s", GST_DEBUG_PAD_NAME (pad));
2116
2117   /* some magic to get the managing collect_pads */
2118   GST_OBJECT_LOCK (pad);
2119   data = (GstCollectData *) gst_pad_get_element_private (pad);
2120   if (G_UNLIKELY (data == NULL))
2121     goto no_data;
2122   ref_data (data);
2123   GST_OBJECT_UNLOCK (pad);
2124
2125   pads = data->collect;
2126
2127   GST_COLLECT_PADS_STREAM_LOCK (pads);
2128   /* if not started, bail out */
2129   if (G_UNLIKELY (!pads->priv->started))
2130     goto not_started;
2131   /* check if this pad is flushing */
2132   if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
2133               GST_COLLECT_PADS_STATE_FLUSHING)))
2134     goto flushing;
2135   /* pad was EOS, we can refuse this data */
2136   if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
2137               GST_COLLECT_PADS_STATE_EOS)))
2138     goto eos;
2139
2140   /* see if we need to clip */
2141   if (pads->priv->clip_func) {
2142     GstBuffer *outbuf = NULL;
2143     ret =
2144         pads->priv->clip_func (pads, data, buffer, &outbuf,
2145         pads->priv->clip_user_data);
2146     buffer = outbuf;
2147
2148     if (G_UNLIKELY (outbuf == NULL))
2149       goto clipped;
2150
2151     if (G_UNLIKELY (ret == GST_FLOW_EOS))
2152       goto eos;
2153     else if (G_UNLIKELY (ret != GST_FLOW_OK))
2154       goto error;
2155   }
2156
2157   GST_DEBUG_OBJECT (pads, "Queuing buffer %p for pad %s:%s", buffer,
2158       GST_DEBUG_PAD_NAME (pad));
2159
2160   /* One more pad has data queued */
2161   if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING))
2162     pads->priv->queuedpads++;
2163   buffer_p = &data->buffer;
2164   gst_buffer_replace (buffer_p, buffer);
2165
2166   /* update segment last position if in TIME */
2167   if (G_LIKELY (data->segment.format == GST_FORMAT_TIME)) {
2168     GstClockTime timestamp;
2169
2170     timestamp = GST_BUFFER_DTS (buffer);
2171     if (!GST_CLOCK_TIME_IS_VALID (timestamp))
2172       timestamp = GST_BUFFER_PTS (buffer);
2173
2174     if (GST_CLOCK_TIME_IS_VALID (timestamp))
2175       data->segment.position = timestamp;
2176   }
2177
2178   /* While we have data queued on this pad try to collect stuff */
2179   do {
2180     /* Check if our collected condition is matched and call the collected
2181      * function if it is */
2182     ret = gst_collect_pads_check_collected (pads);
2183     /* when an error occurs, we want to report this back to the caller ASAP
2184      * without having to block if the buffer was not popped */
2185     if (G_UNLIKELY (ret != GST_FLOW_OK))
2186       goto error;
2187
2188     /* data was consumed, we can exit and accept new data */
2189     if (data->buffer == NULL)
2190       break;
2191
2192     /* Having the _INIT here means we don't care about any broadcast up to here
2193      * (most of which occur with STREAM_LOCK held, so could not have happened
2194      * anyway).  We do care about e.g. a remove initiated broadcast as of this
2195      * point.  Putting it here also makes this thread ignores any evt it raised
2196      * itself (as is a usual WAIT semantic).
2197      */
2198     GST_COLLECT_PADS_EVT_INIT (cookie);
2199
2200     /* pad could be removed and re-added */
2201     unref_data (data);
2202     GST_OBJECT_LOCK (pad);
2203     if (G_UNLIKELY ((data = gst_pad_get_element_private (pad)) == NULL))
2204       goto pad_removed;
2205     ref_data (data);
2206     GST_OBJECT_UNLOCK (pad);
2207
2208     GST_DEBUG_OBJECT (pads, "Pad %s:%s has a buffer queued, waiting",
2209         GST_DEBUG_PAD_NAME (pad));
2210
2211     /* wait to be collected, this must happen from another thread triggered
2212      * by the _chain function of another pad. We release the lock so we
2213      * can get stopped or flushed as well. We can however not get EOS
2214      * because we still hold the STREAM_LOCK.
2215      */
2216     GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2217     GST_COLLECT_PADS_EVT_WAIT (pads, cookie);
2218     GST_COLLECT_PADS_STREAM_LOCK (pads);
2219
2220     GST_DEBUG_OBJECT (pads, "Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad));
2221
2222     /* after a signal, we could be stopped */
2223     if (G_UNLIKELY (!pads->priv->started))
2224       goto not_started;
2225     /* check if this pad is flushing */
2226     if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
2227                 GST_COLLECT_PADS_STATE_FLUSHING)))
2228       goto flushing;
2229   }
2230   while (data->buffer != NULL);
2231
2232 unlock_done:
2233   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2234   /* data is definitely NULL if pad_removed goto was run. */
2235   if (data)
2236     unref_data (data);
2237   if (buffer)
2238     gst_buffer_unref (buffer);
2239   return ret;
2240
2241 pad_removed:
2242   {
2243     GST_WARNING ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2244     GST_OBJECT_UNLOCK (pad);
2245     ret = GST_FLOW_NOT_LINKED;
2246     goto unlock_done;
2247   }
2248   /* ERRORS */
2249 no_data:
2250   {
2251     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2252     GST_OBJECT_UNLOCK (pad);
2253     gst_buffer_unref (buffer);
2254     return GST_FLOW_NOT_LINKED;
2255   }
2256 not_started:
2257   {
2258     GST_DEBUG ("not started");
2259     gst_collect_pads_clear (pads, data);
2260     ret = GST_FLOW_FLUSHING;
2261     goto unlock_done;
2262   }
2263 flushing:
2264   {
2265     GST_DEBUG ("pad %s:%s is flushing", GST_DEBUG_PAD_NAME (pad));
2266     gst_collect_pads_clear (pads, data);
2267     ret = GST_FLOW_FLUSHING;
2268     goto unlock_done;
2269   }
2270 eos:
2271   {
2272     /* we should not post an error for this, just inform upstream that
2273      * we don't expect anything anymore */
2274     GST_DEBUG ("pad %s:%s is eos", GST_DEBUG_PAD_NAME (pad));
2275     ret = GST_FLOW_EOS;
2276     goto unlock_done;
2277   }
2278 clipped:
2279   {
2280     GST_DEBUG ("clipped buffer on pad %s:%s", GST_DEBUG_PAD_NAME (pad));
2281     ret = GST_FLOW_OK;
2282     goto unlock_done;
2283   }
2284 error:
2285   {
2286     /* we print the error, the element should post a reasonable error
2287      * message for fatal errors */
2288     GST_DEBUG ("collect failed, reason %d (%s)", ret, gst_flow_get_name (ret));
2289     gst_collect_pads_clear (pads, data);
2290     goto unlock_done;
2291   }
2292 }