aggregator: Assert if the sink/src pad type that is to be used is not a GstAggregator...
[platform/upstream/gstreamer.git] / libs / gst / base / gstcollectpads.c
1 /* GStreamer
2  * Copyright (C) 2005 Wim Taymans <wim@fluendo.com>
3  * Copyright (C) 2008 Mark Nauwelaerts <mnauw@users.sourceforge.net>
4  * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
5  *
6  * gstcollectpads.c:
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23 /**
24  * SECTION:gstcollectpads
25  * @title: GstCollectPads
26  * @short_description: manages a set of pads that operate in collect mode
27  * @see_also:
28  *
29  * Manages a set of pads that operate in collect mode. This means that control
30  * is given to the manager of this object when all pads have data.
31  *
32  *   * Collectpads are created with gst_collect_pads_new(). A callback should then
33  *     be installed with gst_collect_pads_set_function ().
34  *
35  *   * Pads are added to the collection with gst_collect_pads_add_pad()/
36  *     gst_collect_pads_remove_pad(). The pad has to be a sinkpad. When added,
37  *     the chain, event and query functions of the pad are overridden. The
38  *     element_private of the pad is used to store private information for the
39  *     collectpads.
40  *
41  *   * For each pad, data is queued in the _chain function or by
42  *     performing a pull_range.
43  *
44  *   * When data is queued on all pads in waiting mode, the callback function is called.
45  *
46  *   * Data can be dequeued from the pad with the gst_collect_pads_pop() method.
47  *     One can peek at the data with the gst_collect_pads_peek() function.
48  *     These functions will return %NULL if the pad received an EOS event. When all
49  *     pads return %NULL from a gst_collect_pads_peek(), the element can emit an EOS
50  *     event itself.
51  *
52  *   * Data can also be dequeued in byte units using the gst_collect_pads_available(),
53  *     gst_collect_pads_read_buffer() and gst_collect_pads_flush() calls.
54  *
55  *   * Elements should call gst_collect_pads_start() and gst_collect_pads_stop() in
56  *     their state change functions to start and stop the processing of the collectpads.
57  *     The gst_collect_pads_stop() call should be called before calling the parent
58  *     element state change function in the PAUSED_TO_READY state change to ensure
59  *     no pad is blocked and the element can finish streaming.
60  *
61  *   * gst_collect_pads_set_waiting() sets a pad to waiting or non-waiting mode.
62  *     CollectPads element is not waiting for data to be collected on non-waiting pads.
63  *     Thus these pads may but need not have data when the callback is called.
64  *     All pads are in waiting mode by default.
65  *
66  */
67
68 #ifdef HAVE_CONFIG_H
69 #  include "config.h"
70 #endif
71
72 #include <gst/gst_private.h>
73
74 #include "gstcollectpads.h"
75
76 #include "../../../gst/glib-compat-private.h"
77
78 GST_DEBUG_CATEGORY_STATIC (collect_pads_debug);
79 #define GST_CAT_DEFAULT collect_pads_debug
80
81 struct _GstCollectDataPrivate
82 {
83   /* refcounting for struct, and destroy callback */
84   GstCollectDataDestroyNotify destroy_notify;
85   gint refcount;
86 };
87
88 struct _GstCollectPadsPrivate
89 {
90   /* with LOCK and/or STREAM_LOCK */
91   gboolean started;
92
93   /* with STREAM_LOCK */
94   guint32 cookie;               /* pad_list cookie */
95   guint numpads;                /* number of pads in @data */
96   guint queuedpads;             /* number of pads with a buffer */
97   guint eospads;                /* number of pads that are EOS */
98   GstClockTime earliest_time;   /* Current earliest time */
99   GstCollectData *earliest_data;        /* Pad data for current earliest time */
100
101   /* with LOCK */
102   GSList *pad_list;             /* list of GstCollectData* */
103   guint32 pad_cookie;           /* updated cookie */
104
105   GstCollectPadsFunction func;  /* function and user_data for callback */
106   gpointer user_data;
107   GstCollectPadsBufferFunction buffer_func;     /* function and user_data for buffer callback */
108   gpointer buffer_user_data;
109   GstCollectPadsCompareFunction compare_func;
110   gpointer compare_user_data;
111   GstCollectPadsEventFunction event_func;       /* function and data for event callback */
112   gpointer event_user_data;
113   GstCollectPadsQueryFunction query_func;
114   gpointer query_user_data;
115   GstCollectPadsClipFunction clip_func;
116   gpointer clip_user_data;
117   GstCollectPadsFlushFunction flush_func;
118   gpointer flush_user_data;
119
120   /* no other lock needed */
121   GMutex evt_lock;              /* these make up sort of poor man's event signaling */
122   GCond evt_cond;
123   guint32 evt_cookie;
124
125   gboolean seeking;
126   gboolean pending_flush_start;
127   gboolean pending_flush_stop;
128 };
129
130 #define parent_class gst_collect_pads_parent_class
131 G_DEFINE_TYPE_WITH_PRIVATE (GstCollectPads, gst_collect_pads, GST_TYPE_OBJECT);
132
133 static void gst_collect_pads_clear (GstCollectPads * pads,
134     GstCollectData * data);
135 static GstFlowReturn gst_collect_pads_chain (GstPad * pad, GstObject * parent,
136     GstBuffer * buffer);
137 static gboolean gst_collect_pads_event (GstPad * pad, GstObject * parent,
138     GstEvent * event);
139 static gboolean gst_collect_pads_query (GstPad * pad, GstObject * parent,
140     GstQuery * query);
141 static void gst_collect_pads_finalize (GObject * object);
142 static GstFlowReturn gst_collect_pads_default_collected (GstCollectPads *
143     pads, gpointer user_data);
144 static gint gst_collect_pads_default_compare_func (GstCollectPads * pads,
145     GstCollectData * data1, GstClockTime timestamp1, GstCollectData * data2,
146     GstClockTime timestamp2, gpointer user_data);
147 static gboolean gst_collect_pads_recalculate_full (GstCollectPads * pads);
148 static void ref_data (GstCollectData * data);
149 static void unref_data (GstCollectData * data);
150
151 static gboolean gst_collect_pads_event_default_internal (GstCollectPads *
152     pads, GstCollectData * data, GstEvent * event, gpointer user_data);
153 static gboolean gst_collect_pads_query_default_internal (GstCollectPads *
154     pads, GstCollectData * data, GstQuery * query, gpointer user_data);
155
156
157 /* Some properties are protected by LOCK, others by STREAM_LOCK
158  * However, manipulating either of these partitions may require
159  * to signal/wake a _WAIT, so use a separate (sort of) event to prevent races
160  * Alternative implementations are possible, e.g. some low-level re-implementing
161  * of the 2 above locks to drop both of them atomically when going into _WAIT.
162  */
163 #define GST_COLLECT_PADS_GET_EVT_COND(pads) (&((GstCollectPads *)pads)->priv->evt_cond)
164 #define GST_COLLECT_PADS_GET_EVT_LOCK(pads) (&((GstCollectPads *)pads)->priv->evt_lock)
165 #define GST_COLLECT_PADS_EVT_WAIT(pads, cookie) G_STMT_START {    \
166   g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
167   /* should work unless a lot of event'ing and thread starvation */\
168   while (cookie == ((GstCollectPads *) pads)->priv->evt_cookie)         \
169     g_cond_wait (GST_COLLECT_PADS_GET_EVT_COND (pads),            \
170         GST_COLLECT_PADS_GET_EVT_LOCK (pads));                    \
171   cookie = ((GstCollectPads *) pads)->priv->evt_cookie;                 \
172   g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
173 } G_STMT_END
174 #define GST_COLLECT_PADS_EVT_WAIT_TIMED(pads, cookie, timeout) G_STMT_START { \
175   GTimeVal __tv; \
176   \
177   g_get_current_time (&tv); \
178   g_time_val_add (&tv, timeout); \
179   \
180   g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
181   /* should work unless a lot of event'ing and thread starvation */\
182   while (cookie == ((GstCollectPads *) pads)->priv->evt_cookie)         \
183     g_cond_timed_wait (GST_COLLECT_PADS_GET_EVT_COND (pads),            \
184         GST_COLLECT_PADS_GET_EVT_LOCK (pads), &tv);                    \
185   cookie = ((GstCollectPads *) pads)->priv->evt_cookie;                 \
186   g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
187 } G_STMT_END
188 #define GST_COLLECT_PADS_EVT_BROADCAST(pads) G_STMT_START {       \
189   g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
190   /* never mind wrap-around */                                     \
191   ++(((GstCollectPads *) pads)->priv->evt_cookie);                      \
192   g_cond_broadcast (GST_COLLECT_PADS_GET_EVT_COND (pads));        \
193   g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
194 } G_STMT_END
195 #define GST_COLLECT_PADS_EVT_INIT(cookie) G_STMT_START {          \
196   g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
197   cookie = ((GstCollectPads *) pads)->priv->evt_cookie;                 \
198   g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
199 } G_STMT_END
200
201 static void
202 gst_collect_pads_class_init (GstCollectPadsClass * klass)
203 {
204   GObjectClass *gobject_class = (GObjectClass *) klass;
205
206   GST_DEBUG_CATEGORY_INIT (collect_pads_debug, "collectpads", 0,
207       "GstCollectPads");
208
209   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_collect_pads_finalize);
210 }
211
212 static void
213 gst_collect_pads_init (GstCollectPads * pads)
214 {
215   pads->priv = gst_collect_pads_get_instance_private (pads);
216
217   pads->data = NULL;
218   pads->priv->cookie = 0;
219   pads->priv->numpads = 0;
220   pads->priv->queuedpads = 0;
221   pads->priv->eospads = 0;
222   pads->priv->started = FALSE;
223
224   g_rec_mutex_init (&pads->stream_lock);
225
226   pads->priv->func = gst_collect_pads_default_collected;
227   pads->priv->user_data = NULL;
228   pads->priv->event_func = NULL;
229   pads->priv->event_user_data = NULL;
230
231   /* members for default muxing */
232   pads->priv->buffer_func = NULL;
233   pads->priv->buffer_user_data = NULL;
234   pads->priv->compare_func = gst_collect_pads_default_compare_func;
235   pads->priv->compare_user_data = NULL;
236   pads->priv->earliest_data = NULL;
237   pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
238
239   pads->priv->event_func = gst_collect_pads_event_default_internal;
240   pads->priv->query_func = gst_collect_pads_query_default_internal;
241
242   /* members to manage the pad list */
243   pads->priv->pad_cookie = 0;
244   pads->priv->pad_list = NULL;
245
246   /* members for event */
247   g_mutex_init (&pads->priv->evt_lock);
248   g_cond_init (&pads->priv->evt_cond);
249   pads->priv->evt_cookie = 0;
250
251   pads->priv->seeking = FALSE;
252   pads->priv->pending_flush_start = FALSE;
253   pads->priv->pending_flush_stop = FALSE;
254 }
255
256 static void
257 gst_collect_pads_finalize (GObject * object)
258 {
259   GstCollectPads *pads = GST_COLLECT_PADS (object);
260
261   GST_DEBUG_OBJECT (object, "finalize");
262
263   g_rec_mutex_clear (&pads->stream_lock);
264
265   g_cond_clear (&pads->priv->evt_cond);
266   g_mutex_clear (&pads->priv->evt_lock);
267
268   /* Remove pads and free pads list */
269   g_slist_foreach (pads->priv->pad_list, (GFunc) unref_data, NULL);
270   g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
271   g_slist_free (pads->data);
272   g_slist_free (pads->priv->pad_list);
273
274   G_OBJECT_CLASS (parent_class)->finalize (object);
275 }
276
277 /**
278  * gst_collect_pads_new:
279  *
280  * Create a new instance of #GstCollectPads.
281  *
282  * MT safe.
283  *
284  * Returns: (transfer full): a new #GstCollectPads, or %NULL in case of an error.
285  */
286 GstCollectPads *
287 gst_collect_pads_new (void)
288 {
289   GstCollectPads *newcoll;
290
291   newcoll = g_object_new (GST_TYPE_COLLECT_PADS, NULL);
292
293   /* clear floating flag */
294   gst_object_ref_sink (newcoll);
295
296   return newcoll;
297 }
298
299 /* Must be called with GstObject lock! */
300 static void
301 gst_collect_pads_set_buffer_function_locked (GstCollectPads * pads,
302     GstCollectPadsBufferFunction func, gpointer user_data)
303 {
304   pads->priv->buffer_func = func;
305   pads->priv->buffer_user_data = user_data;
306 }
307
308 /**
309  * gst_collect_pads_set_buffer_function:
310  * @pads: the collectpads to use
311  * @func: (scope call): the function to set
312  * @user_data: (closure): user data passed to the function
313  *
314  * Set the callback function and user data that will be called with
315  * the oldest buffer when all pads have been collected, or %NULL on EOS.
316  * If a buffer is passed, the callback owns a reference and must unref
317  * it.
318  *
319  * MT safe.
320  */
321 void
322 gst_collect_pads_set_buffer_function (GstCollectPads * pads,
323     GstCollectPadsBufferFunction func, gpointer user_data)
324 {
325   g_return_if_fail (pads != NULL);
326   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
327
328   GST_OBJECT_LOCK (pads);
329   gst_collect_pads_set_buffer_function_locked (pads, func, user_data);
330   GST_OBJECT_UNLOCK (pads);
331 }
332
333 /**
334  * gst_collect_pads_set_compare_function:
335  * @pads: the pads to use
336  * @func: (scope call): the function to set
337  * @user_data: (closure): user data passed to the function
338  *
339  * Set the timestamp comparison function.
340  *
341  * MT safe.
342  */
343 /* NOTE allowing to change comparison seems not advisable;
344 no known use-case, and collaboration with default algorithm is unpredictable.
345 If custom comparing/operation is needed, just use a collect function of
346 your own */
347 void
348 gst_collect_pads_set_compare_function (GstCollectPads * pads,
349     GstCollectPadsCompareFunction func, gpointer user_data)
350 {
351   g_return_if_fail (pads != NULL);
352   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
353
354   GST_OBJECT_LOCK (pads);
355   pads->priv->compare_func = func;
356   pads->priv->compare_user_data = user_data;
357   GST_OBJECT_UNLOCK (pads);
358 }
359
360 /**
361  * gst_collect_pads_set_function:
362  * @pads: the collectpads to use
363  * @func: (scope call): the function to set
364  * @user_data: user data passed to the function
365  *
366  * CollectPads provides a default collection algorithm that will determine
367  * the oldest buffer available on all of its pads, and then delegate
368  * to a configured callback.
369  * However, if circumstances are more complicated and/or more control
370  * is desired, this sets a callback that will be invoked instead when
371  * all the pads added to the collection have buffers queued.
372  * Evidently, this callback is not compatible with
373  * gst_collect_pads_set_buffer_function() callback.
374  * If this callback is set, the former will be unset.
375  *
376  * MT safe.
377  */
378 void
379 gst_collect_pads_set_function (GstCollectPads * pads,
380     GstCollectPadsFunction func, gpointer user_data)
381 {
382   g_return_if_fail (pads != NULL);
383   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
384
385   GST_OBJECT_LOCK (pads);
386   pads->priv->func = func;
387   pads->priv->user_data = user_data;
388   gst_collect_pads_set_buffer_function_locked (pads, NULL, NULL);
389   GST_OBJECT_UNLOCK (pads);
390 }
391
392 static void
393 ref_data (GstCollectData * data)
394 {
395   g_assert (data != NULL);
396
397   g_atomic_int_inc (&(data->priv->refcount));
398 }
399
400 static void
401 unref_data (GstCollectData * data)
402 {
403   g_assert (data != NULL);
404   g_assert (data->priv->refcount > 0);
405
406   if (!g_atomic_int_dec_and_test (&(data->priv->refcount)))
407     return;
408
409   if (data->priv->destroy_notify)
410     data->priv->destroy_notify (data);
411
412   g_object_unref (data->pad);
413   if (data->buffer) {
414     gst_buffer_unref (data->buffer);
415   }
416   g_free (data->priv);
417   g_free (data);
418 }
419
420 /**
421  * gst_collect_pads_set_event_function:
422  * @pads: the collectpads to use
423  * @func: (scope call): the function to set
424  * @user_data: user data passed to the function
425  *
426  * Set the event callback function and user data that will be called when
427  * collectpads has received an event originating from one of the collected
428  * pads.  If the event being processed is a serialized one, this callback is
429  * called with @pads STREAM_LOCK held, otherwise not.  As this lock should be
430  * held when calling a number of CollectPads functions, it should be acquired
431  * if so (unusually) needed.
432  *
433  * MT safe.
434  */
435 void
436 gst_collect_pads_set_event_function (GstCollectPads * pads,
437     GstCollectPadsEventFunction func, gpointer user_data)
438 {
439   g_return_if_fail (pads != NULL);
440   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
441
442   GST_OBJECT_LOCK (pads);
443   pads->priv->event_func = func;
444   pads->priv->event_user_data = user_data;
445   GST_OBJECT_UNLOCK (pads);
446 }
447
448 /**
449  * gst_collect_pads_set_query_function:
450  * @pads: the collectpads to use
451  * @func: (scope call): the function to set
452  * @user_data: user data passed to the function
453  *
454  * Set the query callback function and user data that will be called after
455  * collectpads has received a query originating from one of the collected
456  * pads.  If the query being processed is a serialized one, this callback is
457  * called with @pads STREAM_LOCK held, otherwise not.  As this lock should be
458  * held when calling a number of CollectPads functions, it should be acquired
459  * if so (unusually) needed.
460  *
461  * MT safe.
462  */
463 void
464 gst_collect_pads_set_query_function (GstCollectPads * pads,
465     GstCollectPadsQueryFunction func, gpointer user_data)
466 {
467   g_return_if_fail (pads != NULL);
468   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
469
470   GST_OBJECT_LOCK (pads);
471   pads->priv->query_func = func;
472   pads->priv->query_user_data = user_data;
473   GST_OBJECT_UNLOCK (pads);
474 }
475
476 /**
477 * gst_collect_pads_clip_running_time:
478 * @pads: the collectpads to use
479 * @cdata: collect data of corresponding pad
480 * @buf: buffer being clipped
481 * @outbuf: (allow-none) (out): output buffer with running time, or NULL if clipped
482 * @user_data: user data (unused)
483 *
484 * Convenience clipping function that converts incoming buffer's timestamp
485 * to running time, or clips the buffer if outside configured segment.
486 *
487 * Since 1.6, this clipping function also sets the DTS parameter of the
488 * GstCollectData structure. This version of the running time DTS can be
489 * negative. G_MININT64 is used to indicate invalid value.
490 */
491 GstFlowReturn
492 gst_collect_pads_clip_running_time (GstCollectPads * pads,
493     GstCollectData * cdata, GstBuffer * buf, GstBuffer ** outbuf,
494     gpointer user_data)
495 {
496   *outbuf = buf;
497
498   /* invalid left alone and passed */
499   if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DTS_OR_PTS (buf)))) {
500     GstClockTime time;
501     GstClockTime buf_dts, abs_dts;
502     gint dts_sign;
503
504     time = GST_BUFFER_PTS (buf);
505
506     if (GST_CLOCK_TIME_IS_VALID (time)) {
507       time =
508           gst_segment_to_running_time (&cdata->segment, GST_FORMAT_TIME, time);
509       if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) {
510         GST_DEBUG_OBJECT (cdata->pad, "clipping buffer on pad outside segment %"
511             GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_PTS (buf)));
512         gst_buffer_unref (buf);
513         *outbuf = NULL;
514         return GST_FLOW_OK;
515       }
516     }
517
518     GST_LOG_OBJECT (cdata->pad, "buffer pts %" GST_TIME_FORMAT " -> %"
519         GST_TIME_FORMAT " running time",
520         GST_TIME_ARGS (GST_BUFFER_PTS (buf)), GST_TIME_ARGS (time));
521     *outbuf = gst_buffer_make_writable (buf);
522     GST_BUFFER_PTS (*outbuf) = time;
523
524     dts_sign = gst_segment_to_running_time_full (&cdata->segment,
525         GST_FORMAT_TIME, GST_BUFFER_DTS (*outbuf), &abs_dts);
526     buf_dts = GST_BUFFER_DTS (*outbuf);
527     if (dts_sign > 0) {
528       GST_BUFFER_DTS (*outbuf) = abs_dts;
529       GST_COLLECT_PADS_DTS (cdata) = abs_dts;
530     } else if (dts_sign < 0) {
531       GST_BUFFER_DTS (*outbuf) = GST_CLOCK_TIME_NONE;
532       GST_COLLECT_PADS_DTS (cdata) = -((gint64) abs_dts);
533     } else {
534       GST_BUFFER_DTS (*outbuf) = GST_CLOCK_TIME_NONE;
535       GST_COLLECT_PADS_DTS (cdata) = GST_CLOCK_STIME_NONE;
536     }
537
538     GST_LOG_OBJECT (cdata->pad, "buffer dts %" GST_TIME_FORMAT " -> %"
539         GST_STIME_FORMAT " running time", GST_TIME_ARGS (buf_dts),
540         GST_STIME_ARGS (GST_COLLECT_PADS_DTS (cdata)));
541   }
542
543   return GST_FLOW_OK;
544 }
545
546 /**
547  * gst_collect_pads_set_clip_function:
548  * @pads: the collectpads to use
549  * @clipfunc: (scope call): clip function to install
550  * @user_data: user data to pass to @clip_func
551  *
552  * Install a clipping function that is called right after a buffer is received
553  * on a pad managed by @pads. See #GstCollectPadsClipFunction for more info.
554  */
555 void
556 gst_collect_pads_set_clip_function (GstCollectPads * pads,
557     GstCollectPadsClipFunction clipfunc, gpointer user_data)
558 {
559   g_return_if_fail (pads != NULL);
560   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
561
562   pads->priv->clip_func = clipfunc;
563   pads->priv->clip_user_data = user_data;
564 }
565
566 /**
567  * gst_collect_pads_set_flush_function:
568  * @pads: the collectpads to use
569  * @func: (scope call): flush function to install
570  * @user_data: user data to pass to @func
571  *
572  * Install a flush function that is called when the internal
573  * state of all pads should be flushed as part of flushing seek
574  * handling. See #GstCollectPadsFlushFunction for more info.
575  *
576  * Since: 1.4
577  */
578 void
579 gst_collect_pads_set_flush_function (GstCollectPads * pads,
580     GstCollectPadsFlushFunction func, gpointer user_data)
581 {
582   g_return_if_fail (pads != NULL);
583   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
584
585   pads->priv->flush_func = func;
586   pads->priv->flush_user_data = user_data;
587 }
588
589 /**
590  * gst_collect_pads_add_pad:
591  * @pads: the collectpads to use
592  * @pad: (transfer none): the pad to add
593  * @size: the size of the returned #GstCollectData structure
594  * @destroy_notify: (scope async): function to be called before the returned
595  *   #GstCollectData structure is freed
596  * @lock: whether to lock this pad in usual waiting state
597  *
598  * Add a pad to the collection of collect pads. The pad has to be
599  * a sinkpad. The refcount of the pad is incremented. Use
600  * gst_collect_pads_remove_pad() to remove the pad from the collection
601  * again.
602  *
603  * You specify a size for the returned #GstCollectData structure
604  * so that you can use it to store additional information.
605  *
606  * You can also specify a #GstCollectDataDestroyNotify that will be called
607  * just before the #GstCollectData structure is freed. It is passed the
608  * pointer to the structure and should free any custom memory and resources
609  * allocated for it.
610  *
611  * Keeping a pad locked in waiting state is only relevant when using
612  * the default collection algorithm (providing the oldest buffer).
613  * It ensures a buffer must be available on this pad for a collection
614  * to take place.  This is of typical use to a muxer element where
615  * non-subtitle streams should always be in waiting state,
616  * e.g. to assure that caps information is available on all these streams
617  * when initial headers have to be written.
618  *
619  * The pad will be automatically activated in push mode when @pads is
620  * started.
621  *
622  * MT safe.
623  *
624  * Returns: (nullable) (transfer none): a new #GstCollectData to identify the
625  *   new pad. Or %NULL if wrong parameters are supplied.
626  */
627 GstCollectData *
628 gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size,
629     GstCollectDataDestroyNotify destroy_notify, gboolean lock)
630 {
631   GstCollectData *data;
632
633   g_return_val_if_fail (pads != NULL, NULL);
634   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
635   g_return_val_if_fail (pad != NULL, NULL);
636   g_return_val_if_fail (GST_PAD_IS_SINK (pad), NULL);
637   g_return_val_if_fail (size >= sizeof (GstCollectData), NULL);
638
639   GST_DEBUG_OBJECT (pads, "adding pad %s:%s", GST_DEBUG_PAD_NAME (pad));
640
641   data = g_malloc0 (size);
642   data->priv = g_new0 (GstCollectDataPrivate, 1);
643   data->collect = pads;
644   data->pad = gst_object_ref (pad);
645   data->buffer = NULL;
646   data->pos = 0;
647   gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
648   data->state = GST_COLLECT_PADS_STATE_WAITING;
649   data->state |= lock ? GST_COLLECT_PADS_STATE_LOCKED : 0;
650   data->priv->refcount = 1;
651   data->priv->destroy_notify = destroy_notify;
652   data->ABI.abi.dts = G_MININT64;
653
654   GST_OBJECT_LOCK (pads);
655   GST_OBJECT_LOCK (pad);
656   gst_pad_set_element_private (pad, data);
657   GST_OBJECT_UNLOCK (pad);
658   pads->priv->pad_list = g_slist_append (pads->priv->pad_list, data);
659   gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_chain));
660   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_event));
661   gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_query));
662   /* backward compat, also add to data if stopped, so that the element already
663    * has this in the public data list before going PAUSED (typically)
664    * this can only be done when we are stopped because we don't take the
665    * STREAM_LOCK to protect the pads->data list. */
666   if (!pads->priv->started) {
667     pads->data = g_slist_append (pads->data, data);
668     ref_data (data);
669   }
670   /* activate the pad when needed */
671   if (pads->priv->started)
672     gst_pad_set_active (pad, TRUE);
673   pads->priv->pad_cookie++;
674   GST_OBJECT_UNLOCK (pads);
675
676   return data;
677 }
678
679 static gint
680 find_pad (GstCollectData * data, GstPad * pad)
681 {
682   if (data->pad == pad)
683     return 0;
684   return 1;
685 }
686
687 /**
688  * gst_collect_pads_remove_pad:
689  * @pads: the collectpads to use
690  * @pad: (transfer none): the pad to remove
691  *
692  * Remove a pad from the collection of collect pads. This function will also
693  * free the #GstCollectData and all the resources that were allocated with
694  * gst_collect_pads_add_pad().
695  *
696  * The pad will be deactivated automatically when @pads is stopped.
697  *
698  * MT safe.
699  *
700  * Returns: %TRUE if the pad could be removed.
701  */
702 gboolean
703 gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad)
704 {
705   GstCollectData *data;
706   GSList *list;
707
708   g_return_val_if_fail (pads != NULL, FALSE);
709   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
710   g_return_val_if_fail (pad != NULL, FALSE);
711   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
712
713   GST_DEBUG_OBJECT (pads, "removing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
714
715   GST_OBJECT_LOCK (pads);
716   list =
717       g_slist_find_custom (pads->priv->pad_list, pad, (GCompareFunc) find_pad);
718   if (!list)
719     goto unknown_pad;
720
721   data = (GstCollectData *) list->data;
722
723   GST_DEBUG_OBJECT (pads, "found pad %s:%s at %p", GST_DEBUG_PAD_NAME (pad),
724       data);
725
726   /* clear the stuff we configured */
727   gst_pad_set_chain_function (pad, NULL);
728   gst_pad_set_event_function (pad, NULL);
729   GST_OBJECT_LOCK (pad);
730   gst_pad_set_element_private (pad, NULL);
731   GST_OBJECT_UNLOCK (pad);
732
733   /* backward compat, also remove from data if stopped, note that this function
734    * can only be called when we are stopped because we don't take the
735    * STREAM_LOCK to protect the pads->data list. */
736   if (!pads->priv->started) {
737     GSList *dlist;
738
739     dlist = g_slist_find_custom (pads->data, pad, (GCompareFunc) find_pad);
740     if (dlist) {
741       GstCollectData *pdata = dlist->data;
742
743       pads->data = g_slist_delete_link (pads->data, dlist);
744       unref_data (pdata);
745     }
746   }
747   /* remove from the pad list */
748   pads->priv->pad_list = g_slist_delete_link (pads->priv->pad_list, list);
749   pads->priv->pad_cookie++;
750
751   /* signal waiters because something changed */
752   GST_COLLECT_PADS_EVT_BROADCAST (pads);
753
754   /* deactivate the pad when needed */
755   if (!pads->priv->started)
756     gst_pad_set_active (pad, FALSE);
757
758   /* clean and free the collect data */
759   unref_data (data);
760
761   GST_OBJECT_UNLOCK (pads);
762
763   return TRUE;
764
765 unknown_pad:
766   {
767     GST_WARNING_OBJECT (pads, "cannot remove unknown pad %s:%s",
768         GST_DEBUG_PAD_NAME (pad));
769     GST_OBJECT_UNLOCK (pads);
770     return FALSE;
771   }
772 }
773
774 /*
775  * Must be called with STREAM_LOCK and OBJECT_LOCK.
776  */
777 static void
778 gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads,
779     gboolean flushing)
780 {
781   GSList *walk = NULL;
782
783   GST_DEBUG ("sink-pads flushing=%d", flushing);
784
785   /* Update the pads flushing flag */
786   for (walk = pads->priv->pad_list; walk; walk = g_slist_next (walk)) {
787     GstCollectData *cdata = walk->data;
788
789     if (GST_IS_PAD (cdata->pad)) {
790       GST_OBJECT_LOCK (cdata->pad);
791       if (flushing)
792         GST_PAD_SET_FLUSHING (cdata->pad);
793       else
794         GST_PAD_UNSET_FLUSHING (cdata->pad);
795       if (flushing)
796         GST_COLLECT_PADS_STATE_SET (cdata, GST_COLLECT_PADS_STATE_FLUSHING);
797       else
798         GST_COLLECT_PADS_STATE_UNSET (cdata, GST_COLLECT_PADS_STATE_FLUSHING);
799       gst_collect_pads_clear (pads, cdata);
800       GST_OBJECT_UNLOCK (cdata->pad);
801     }
802   }
803
804   /* inform _chain of changes */
805   GST_COLLECT_PADS_EVT_BROADCAST (pads);
806 }
807
808 /**
809  * gst_collect_pads_set_flushing:
810  * @pads: the collectpads to use
811  * @flushing: desired state of the pads
812  *
813  * Change the flushing state of all the pads in the collection. No pad
814  * is able to accept anymore data when @flushing is %TRUE. Calling this
815  * function with @flushing %FALSE makes @pads accept data again.
816  * Caller must ensure that downstream streaming (thread) is not blocked,
817  * e.g. by sending a FLUSH_START downstream.
818  *
819  * MT safe.
820  */
821 void
822 gst_collect_pads_set_flushing (GstCollectPads * pads, gboolean flushing)
823 {
824   g_return_if_fail (pads != NULL);
825   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
826
827   /* NOTE since this eventually calls _pop, some (STREAM_)LOCK is needed here */
828   GST_COLLECT_PADS_STREAM_LOCK (pads);
829   GST_OBJECT_LOCK (pads);
830   gst_collect_pads_set_flushing_unlocked (pads, flushing);
831   GST_OBJECT_UNLOCK (pads);
832   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
833 }
834
835 /**
836  * gst_collect_pads_start:
837  * @pads: the collectpads to use
838  *
839  * Starts the processing of data in the collect_pads.
840  *
841  * MT safe.
842  */
843 void
844 gst_collect_pads_start (GstCollectPads * pads)
845 {
846   GSList *collected;
847
848   g_return_if_fail (pads != NULL);
849   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
850
851   GST_DEBUG_OBJECT (pads, "starting collect pads");
852
853   /* make sure stop and collect cannot be called anymore */
854   GST_COLLECT_PADS_STREAM_LOCK (pads);
855
856   /* make pads streamable */
857   GST_OBJECT_LOCK (pads);
858
859   /* loop over the master pad list and reset the segment */
860   collected = pads->priv->pad_list;
861   for (; collected; collected = g_slist_next (collected)) {
862     GstCollectData *data;
863
864     data = collected->data;
865     gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
866   }
867
868   gst_collect_pads_set_flushing_unlocked (pads, FALSE);
869
870   /* Start collect pads */
871   pads->priv->started = TRUE;
872   GST_OBJECT_UNLOCK (pads);
873   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
874 }
875
876 /**
877  * gst_collect_pads_stop:
878  * @pads: the collectpads to use
879  *
880  * Stops the processing of data in the collect_pads. this function
881  * will also unblock any blocking operations.
882  *
883  * MT safe.
884  */
885 void
886 gst_collect_pads_stop (GstCollectPads * pads)
887 {
888   GSList *collected;
889
890   g_return_if_fail (pads != NULL);
891   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
892
893   GST_DEBUG_OBJECT (pads, "stopping collect pads");
894
895   /* make sure collect and start cannot be called anymore */
896   GST_COLLECT_PADS_STREAM_LOCK (pads);
897
898   /* make pads not accept data anymore */
899   GST_OBJECT_LOCK (pads);
900   gst_collect_pads_set_flushing_unlocked (pads, TRUE);
901
902   /* Stop collect pads */
903   pads->priv->started = FALSE;
904   pads->priv->eospads = 0;
905   pads->priv->queuedpads = 0;
906
907   /* loop over the master pad list and flush buffers */
908   collected = pads->priv->pad_list;
909   for (; collected; collected = g_slist_next (collected)) {
910     GstCollectData *data;
911     GstBuffer **buffer_p;
912
913     data = collected->data;
914     if (data->buffer) {
915       buffer_p = &data->buffer;
916       gst_buffer_replace (buffer_p, NULL);
917       data->pos = 0;
918     }
919     GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_EOS);
920   }
921
922   if (pads->priv->earliest_data)
923     unref_data (pads->priv->earliest_data);
924   pads->priv->earliest_data = NULL;
925   pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
926
927   GST_OBJECT_UNLOCK (pads);
928   /* Wake them up so they can end the chain functions. */
929   GST_COLLECT_PADS_EVT_BROADCAST (pads);
930
931   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
932 }
933
934 /**
935  * gst_collect_pads_peek:
936  * @pads: the collectpads to peek
937  * @data: the data to use
938  *
939  * Peek at the buffer currently queued in @data. This function
940  * should be called with the @pads STREAM_LOCK held, such as in the callback
941  * handler.
942  *
943  * MT safe.
944  *
945  * Returns: (transfer full) (nullable): The buffer in @data or %NULL if no
946  * buffer is queued. should unref the buffer after usage.
947  */
948 GstBuffer *
949 gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data)
950 {
951   GstBuffer *result;
952
953   g_return_val_if_fail (pads != NULL, NULL);
954   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
955   g_return_val_if_fail (data != NULL, NULL);
956
957   if ((result = data->buffer))
958     gst_buffer_ref (result);
959
960   GST_DEBUG_OBJECT (pads, "Peeking at pad %s:%s: buffer=%" GST_PTR_FORMAT,
961       GST_DEBUG_PAD_NAME (data->pad), result);
962
963   return result;
964 }
965
966 /**
967  * gst_collect_pads_pop:
968  * @pads: the collectpads to pop
969  * @data: the data to use
970  *
971  * Pop the buffer currently queued in @data. This function
972  * should be called with the @pads STREAM_LOCK held, such as in the callback
973  * handler.
974  *
975  * MT safe.
976  *
977  * Returns: (transfer full) (nullable): The buffer in @data or %NULL if no
978  * buffer was queued. You should unref the buffer after usage.
979  */
980 GstBuffer *
981 gst_collect_pads_pop (GstCollectPads * pads, GstCollectData * data)
982 {
983   GstBuffer *result;
984
985   g_return_val_if_fail (pads != NULL, NULL);
986   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
987   g_return_val_if_fail (data != NULL, NULL);
988
989   if ((result = data->buffer)) {
990     data->buffer = NULL;
991     data->pos = 0;
992     /* one less pad with queued data now */
993     if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING))
994       pads->priv->queuedpads--;
995   }
996
997   GST_COLLECT_PADS_EVT_BROADCAST (pads);
998
999   GST_DEBUG_OBJECT (pads, "Pop buffer on pad %s:%s: buffer=%" GST_PTR_FORMAT,
1000       GST_DEBUG_PAD_NAME (data->pad), result);
1001
1002   return result;
1003 }
1004
1005 /* pop and unref the currently queued buffer, should be called with STREAM_LOCK
1006  * held */
1007 static void
1008 gst_collect_pads_clear (GstCollectPads * pads, GstCollectData * data)
1009 {
1010   GstBuffer *buf;
1011
1012   if ((buf = gst_collect_pads_pop (pads, data)))
1013     gst_buffer_unref (buf);
1014 }
1015
1016 /**
1017  * gst_collect_pads_available:
1018  * @pads: the collectpads to query
1019  *
1020  * Query how much bytes can be read from each queued buffer. This means
1021  * that the result of this call is the maximum number of bytes that can
1022  * be read from each of the pads.
1023  *
1024  * This function should be called with @pads STREAM_LOCK held, such as
1025  * in the callback.
1026  *
1027  * MT safe.
1028  *
1029  * Returns: The maximum number of bytes queued on all pads. This function
1030  * returns 0 if a pad has no queued buffer.
1031  */
1032 /* we might pre-calculate this in some struct field,
1033  * but would then have to maintain this in _chain and particularly _pop, etc,
1034  * even if element is never interested in this information */
1035 guint
1036 gst_collect_pads_available (GstCollectPads * pads)
1037 {
1038   GSList *collected;
1039   guint result = G_MAXUINT;
1040
1041   g_return_val_if_fail (pads != NULL, 0);
1042   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
1043
1044   collected = pads->data;
1045   for (; collected; collected = g_slist_next (collected)) {
1046     GstCollectData *pdata;
1047     GstBuffer *buffer;
1048     gint size;
1049
1050     pdata = (GstCollectData *) collected->data;
1051
1052     /* ignore pad with EOS */
1053     if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (pdata,
1054                 GST_COLLECT_PADS_STATE_EOS))) {
1055       GST_DEBUG_OBJECT (pads, "pad %p is EOS", pdata);
1056       continue;
1057     }
1058
1059     /* an empty buffer without EOS is weird when we get here.. */
1060     if (G_UNLIKELY ((buffer = pdata->buffer) == NULL)) {
1061       GST_WARNING_OBJECT (pads, "pad %p has no buffer", pdata);
1062       goto not_filled;
1063     }
1064
1065     /* this is the size left of the buffer */
1066     size = gst_buffer_get_size (buffer) - pdata->pos;
1067     GST_DEBUG_OBJECT (pads, "pad %p has %d bytes left", pdata, size);
1068
1069     /* need to return the min of all available data */
1070     if (size < result)
1071       result = size;
1072   }
1073   /* nothing changed, all must be EOS then, return 0 */
1074   if (G_UNLIKELY (result == G_MAXUINT))
1075     result = 0;
1076
1077   return result;
1078
1079 not_filled:
1080   {
1081     return 0;
1082   }
1083 }
1084
1085 /**
1086  * gst_collect_pads_flush:
1087  * @pads: the collectpads to query
1088  * @data: the data to use
1089  * @size: the number of bytes to flush
1090  *
1091  * Flush @size bytes from the pad @data.
1092  *
1093  * This function should be called with @pads STREAM_LOCK held, such as
1094  * in the callback.
1095  *
1096  * MT safe.
1097  *
1098  * Returns: The number of bytes flushed This can be less than @size and
1099  * is 0 if the pad was end-of-stream.
1100  */
1101 guint
1102 gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data,
1103     guint size)
1104 {
1105   guint flushsize;
1106   gsize bsize;
1107   GstBuffer *buffer;
1108
1109   g_return_val_if_fail (pads != NULL, 0);
1110   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
1111   g_return_val_if_fail (data != NULL, 0);
1112
1113   /* no buffer, must be EOS */
1114   if ((buffer = data->buffer) == NULL)
1115     return 0;
1116
1117   bsize = gst_buffer_get_size (buffer);
1118
1119   /* this is what we can flush at max */
1120   flushsize = MIN (size, bsize - data->pos);
1121
1122   data->pos += size;
1123
1124   if (data->pos >= bsize)
1125     /* _clear will also reset data->pos to 0 */
1126     gst_collect_pads_clear (pads, data);
1127
1128   return flushsize;
1129 }
1130
1131 /**
1132  * gst_collect_pads_read_buffer:
1133  * @pads: the collectpads to query
1134  * @data: the data to use
1135  * @size: the number of bytes to read
1136  *
1137  * Get a subbuffer of @size bytes from the given pad @data.
1138  *
1139  * This function should be called with @pads STREAM_LOCK held, such as in the
1140  * callback.
1141  *
1142  * MT safe.
1143  *
1144  * Returns: (transfer full) (nullable): A sub buffer. The size of the buffer can
1145  * be less that requested. A return of %NULL signals that the pad is
1146  * end-of-stream. Unref the buffer after use.
1147  */
1148 GstBuffer *
1149 gst_collect_pads_read_buffer (GstCollectPads * pads, GstCollectData * data,
1150     guint size)
1151 {
1152   guint readsize, buf_size;
1153   GstBuffer *buffer;
1154
1155   g_return_val_if_fail (pads != NULL, NULL);
1156   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
1157   g_return_val_if_fail (data != NULL, NULL);
1158
1159   /* no buffer, must be EOS */
1160   if ((buffer = data->buffer) == NULL)
1161     return NULL;
1162
1163   buf_size = gst_buffer_get_size (buffer);
1164   readsize = MIN (size, buf_size - data->pos);
1165
1166   return gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, data->pos,
1167       readsize);
1168 }
1169
1170 /**
1171  * gst_collect_pads_take_buffer:
1172  * @pads: the collectpads to query
1173  * @data: the data to use
1174  * @size: the number of bytes to read
1175  *
1176  * Get a subbuffer of @size bytes from the given pad @data. Flushes the amount
1177  * of read bytes.
1178  *
1179  * This function should be called with @pads STREAM_LOCK held, such as in the
1180  * callback.
1181  *
1182  * MT safe.
1183  *
1184  * Returns: (transfer full) (nullable): A sub buffer. The size of the buffer can
1185  * be less that requested. A return of %NULL signals that the pad is
1186  * end-of-stream. Unref the buffer after use.
1187  */
1188 GstBuffer *
1189 gst_collect_pads_take_buffer (GstCollectPads * pads, GstCollectData * data,
1190     guint size)
1191 {
1192   GstBuffer *buffer = gst_collect_pads_read_buffer (pads, data, size);
1193
1194   if (buffer) {
1195     gst_collect_pads_flush (pads, data, gst_buffer_get_size (buffer));
1196   }
1197   return buffer;
1198 }
1199
1200 /**
1201  * gst_collect_pads_set_waiting:
1202  * @pads: the collectpads
1203  * @data: the data to use
1204  * @waiting: boolean indicating whether this pad should operate
1205  *           in waiting or non-waiting mode
1206  *
1207  * Sets a pad to waiting or non-waiting mode, if at least this pad
1208  * has not been created with locked waiting state,
1209  * in which case nothing happens.
1210  *
1211  * This function should be called with @pads STREAM_LOCK held, such as
1212  * in the callback.
1213  *
1214  * MT safe.
1215  */
1216 void
1217 gst_collect_pads_set_waiting (GstCollectPads * pads, GstCollectData * data,
1218     gboolean waiting)
1219 {
1220   g_return_if_fail (pads != NULL);
1221   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
1222   g_return_if_fail (data != NULL);
1223
1224   GST_DEBUG_OBJECT (pads, "Setting pad %s to waiting %d, locked %d",
1225       GST_PAD_NAME (data->pad), waiting,
1226       GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED));
1227
1228   /* Do something only on a change and if not locked */
1229   if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED) &&
1230       (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING) !=
1231           ! !waiting)) {
1232     /* Set waiting state for this pad */
1233     if (waiting)
1234       GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_WAITING);
1235     else
1236       GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_WAITING);
1237     /* Update number of queued pads if needed */
1238     if (!data->buffer &&
1239         !GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_EOS)) {
1240       if (waiting)
1241         pads->priv->queuedpads--;
1242       else
1243         pads->priv->queuedpads++;
1244     }
1245
1246     /* signal waiters because something changed */
1247     GST_COLLECT_PADS_EVT_BROADCAST (pads);
1248   }
1249 }
1250
1251 /* see if pads were added or removed and update our stats. Any pad
1252  * added after releasing the LOCK will get collected in the next
1253  * round.
1254  *
1255  * We can do a quick check by checking the cookies, that get changed
1256  * whenever the pad list is updated.
1257  *
1258  * Must be called with STREAM_LOCK.
1259  */
1260 static void
1261 gst_collect_pads_check_pads (GstCollectPads * pads)
1262 {
1263   /* the master list and cookie are protected with LOCK */
1264   GST_OBJECT_LOCK (pads);
1265   if (G_UNLIKELY (pads->priv->pad_cookie != pads->priv->cookie)) {
1266     GSList *collected;
1267
1268     /* clear list and stats */
1269     g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
1270     g_slist_free (pads->data);
1271     pads->data = NULL;
1272     pads->priv->numpads = 0;
1273     pads->priv->queuedpads = 0;
1274     pads->priv->eospads = 0;
1275     if (pads->priv->earliest_data)
1276       unref_data (pads->priv->earliest_data);
1277     pads->priv->earliest_data = NULL;
1278     pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
1279
1280     /* loop over the master pad list */
1281     collected = pads->priv->pad_list;
1282     for (; collected; collected = g_slist_next (collected)) {
1283       GstCollectData *data;
1284
1285       /* update the stats */
1286       pads->priv->numpads++;
1287       data = collected->data;
1288       if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_EOS))
1289         pads->priv->eospads++;
1290       else if (data->buffer || !GST_COLLECT_PADS_STATE_IS_SET (data,
1291               GST_COLLECT_PADS_STATE_WAITING))
1292         pads->priv->queuedpads++;
1293
1294       /* add to the list of pads to collect */
1295       ref_data (data);
1296       /* preserve order of adding/requesting pads */
1297       pads->data = g_slist_append (pads->data, data);
1298     }
1299     /* and update the cookie */
1300     pads->priv->cookie = pads->priv->pad_cookie;
1301   }
1302   GST_OBJECT_UNLOCK (pads);
1303 }
1304
1305 /* checks if all the pads are collected and call the collectfunction
1306  *
1307  * Should be called with STREAM_LOCK.
1308  *
1309  * Returns: The #GstFlowReturn of collection.
1310  */
1311 static GstFlowReturn
1312 gst_collect_pads_check_collected (GstCollectPads * pads)
1313 {
1314   GstFlowReturn flow_ret = GST_FLOW_OK;
1315   GstCollectPadsFunction func;
1316   gpointer user_data;
1317
1318   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
1319
1320   GST_OBJECT_LOCK (pads);
1321   func = pads->priv->func;
1322   user_data = pads->priv->user_data;
1323   GST_OBJECT_UNLOCK (pads);
1324
1325   g_return_val_if_fail (pads->priv->func != NULL, GST_FLOW_NOT_SUPPORTED);
1326
1327   /* check for new pads, update stats etc.. */
1328   gst_collect_pads_check_pads (pads);
1329
1330   if (G_UNLIKELY (pads->priv->eospads == pads->priv->numpads)) {
1331     /* If all our pads are EOS just collect once to let the element
1332      * do its final EOS handling. */
1333     GST_DEBUG_OBJECT (pads, "All active pads (%d) are EOS, calling %s",
1334         pads->priv->numpads, GST_DEBUG_FUNCPTR_NAME (func));
1335
1336     if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking,
1337                 TRUE, FALSE))) {
1338       GST_INFO_OBJECT (pads, "finished seeking");
1339     }
1340     do {
1341       flow_ret = func (pads, user_data);
1342     } while (flow_ret == GST_FLOW_OK);
1343   } else {
1344     gboolean collected = FALSE;
1345
1346     /* We call the collected function as long as our condition matches. */
1347     while (((pads->priv->queuedpads + pads->priv->eospads) >=
1348             pads->priv->numpads)) {
1349       GST_DEBUG_OBJECT (pads,
1350           "All active pads (%d + %d >= %d) have data, " "calling %s",
1351           pads->priv->queuedpads, pads->priv->eospads, pads->priv->numpads,
1352           GST_DEBUG_FUNCPTR_NAME (func));
1353
1354       if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking,
1355                   TRUE, FALSE))) {
1356         GST_INFO_OBJECT (pads, "finished seeking");
1357       }
1358       flow_ret = func (pads, user_data);
1359       collected = TRUE;
1360
1361       /* break on error */
1362       if (flow_ret != GST_FLOW_OK)
1363         break;
1364       /* Don't keep looping after telling the element EOS or flushing */
1365       if (pads->priv->queuedpads == 0)
1366         break;
1367     }
1368     if (!collected)
1369       GST_DEBUG_OBJECT (pads, "Not all active pads (%d) have data, continuing",
1370           pads->priv->numpads);
1371   }
1372   return flow_ret;
1373 }
1374
1375
1376 /* General overview:
1377  * - only pad with a buffer can determine earliest_data (and earliest_time)
1378  * - only segment info determines (non-)waiting state
1379  * - ? perhaps use _stream_time for comparison
1380  *   (which muxers might have use as well ?)
1381  */
1382
1383 /*
1384  * Function to recalculate the waiting state of all pads.
1385  *
1386  * Must be called with STREAM_LOCK.
1387  *
1388  * Returns %TRUE if a pad was set to waiting
1389  * (from non-waiting state).
1390  */
1391 static gboolean
1392 gst_collect_pads_recalculate_waiting (GstCollectPads * pads)
1393 {
1394   GSList *collected;
1395   gboolean result = FALSE;
1396
1397   /* If earliest time is not known, there is nothing to do. */
1398   if (pads->priv->earliest_data == NULL)
1399     return FALSE;
1400
1401   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
1402     GstCollectData *data = (GstCollectData *) collected->data;
1403     int cmp_res;
1404     GstClockTime comp_time;
1405
1406     /* check if pad has a segment */
1407     if (data->segment.format == GST_FORMAT_UNDEFINED) {
1408       GST_WARNING_OBJECT (pads,
1409           "GstCollectPads has no time segment, assuming 0 based.");
1410       gst_segment_init (&data->segment, GST_FORMAT_TIME);
1411       GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
1412     }
1413
1414     /* check segment format */
1415     if (data->segment.format != GST_FORMAT_TIME) {
1416       GST_ERROR_OBJECT (pads, "GstCollectPads can handle only time segments.");
1417       continue;
1418     }
1419
1420     /* check if the waiting state should be changed */
1421     comp_time = data->segment.position;
1422     cmp_res = pads->priv->compare_func (pads, data, comp_time,
1423         pads->priv->earliest_data, pads->priv->earliest_time,
1424         pads->priv->compare_user_data);
1425     if (cmp_res > 0)
1426       /* stop waiting */
1427       gst_collect_pads_set_waiting (pads, data, FALSE);
1428     else {
1429       if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING)) {
1430         /* start waiting */
1431         gst_collect_pads_set_waiting (pads, data, TRUE);
1432         result = TRUE;
1433       }
1434     }
1435   }
1436
1437   return result;
1438 }
1439
1440 /**
1441  * gst_collect_pads_find_best_pad:
1442  * @pads: the collectpads to use
1443  * @data: returns the collectdata for earliest data
1444  * @time: returns the earliest available buffertime
1445  *
1446  * Find the oldest/best pad, i.e. pad holding the oldest buffer and
1447  * and return the corresponding #GstCollectData and buffertime.
1448  *
1449  * This function should be called with STREAM_LOCK held,
1450  * such as in the callback.
1451  */
1452 static void
1453 gst_collect_pads_find_best_pad (GstCollectPads * pads,
1454     GstCollectData ** data, GstClockTime * time)
1455 {
1456   GSList *collected;
1457   GstCollectData *best = NULL;
1458   GstClockTime best_time = GST_CLOCK_TIME_NONE;
1459
1460   g_return_if_fail (data != NULL);
1461   g_return_if_fail (time != NULL);
1462
1463   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
1464     GstBuffer *buffer;
1465     GstCollectData *data = (GstCollectData *) collected->data;
1466     GstClockTime timestamp;
1467
1468     buffer = gst_collect_pads_peek (pads, data);
1469     /* if we have a buffer check if it is better then the current best one */
1470     if (buffer != NULL) {
1471       timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
1472       gst_buffer_unref (buffer);
1473       if (best == NULL || pads->priv->compare_func (pads, data, timestamp,
1474               best, best_time, pads->priv->compare_user_data) < 0) {
1475         best = data;
1476         best_time = timestamp;
1477       }
1478     }
1479   }
1480
1481   /* set earliest time */
1482   *data = best;
1483   *time = best_time;
1484
1485   GST_DEBUG_OBJECT (pads, "best pad %s, best time %" GST_TIME_FORMAT,
1486       best ? GST_PAD_NAME (((GstCollectData *) best)->pad) : "(nil)",
1487       GST_TIME_ARGS (best_time));
1488 }
1489
1490 /*
1491  * Function to recalculate earliest_data and earliest_timestamp. This also calls
1492  * gst_collect_pads_recalculate_waiting
1493  *
1494  * Must be called with STREAM_LOCK.
1495  */
1496 static gboolean
1497 gst_collect_pads_recalculate_full (GstCollectPads * pads)
1498 {
1499   if (pads->priv->earliest_data)
1500     unref_data (pads->priv->earliest_data);
1501   gst_collect_pads_find_best_pad (pads, &pads->priv->earliest_data,
1502       &pads->priv->earliest_time);
1503   if (pads->priv->earliest_data)
1504     ref_data (pads->priv->earliest_data);
1505   return gst_collect_pads_recalculate_waiting (pads);
1506 }
1507
1508 /*
1509  * Default collect callback triggered when #GstCollectPads gathered all data.
1510  *
1511  * Called with STREAM_LOCK.
1512  */
1513 static GstFlowReturn
1514 gst_collect_pads_default_collected (GstCollectPads * pads, gpointer user_data)
1515 {
1516   GstCollectData *best = NULL;
1517   GstBuffer *buffer;
1518   GstFlowReturn ret = GST_FLOW_OK;
1519   GstCollectPadsBufferFunction func;
1520   gpointer buffer_user_data;
1521
1522   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
1523
1524   GST_OBJECT_LOCK (pads);
1525   func = pads->priv->buffer_func;
1526   buffer_user_data = pads->priv->buffer_user_data;
1527   GST_OBJECT_UNLOCK (pads);
1528
1529   g_return_val_if_fail (func != NULL, GST_FLOW_NOT_SUPPORTED);
1530
1531   /* Find the oldest pad at all cost */
1532   if (gst_collect_pads_recalculate_full (pads)) {
1533     /* waiting was switched on,
1534      * so give another thread a chance to deliver a possibly
1535      * older buffer; don't charge on yet with the current oldest */
1536     ret = GST_FLOW_OK;
1537     goto done;
1538   }
1539
1540   best = pads->priv->earliest_data;
1541
1542   /* No data collected means EOS. */
1543   if (G_UNLIKELY (best == NULL)) {
1544     ret = func (pads, best, NULL, buffer_user_data);
1545     if (ret == GST_FLOW_OK)
1546       ret = GST_FLOW_EOS;
1547     goto done;
1548   }
1549
1550   /* make sure that the pad we take a buffer from is waiting;
1551    * otherwise popping a buffer will seem not to have happened
1552    * and collectpads can get into a busy loop */
1553   gst_collect_pads_set_waiting (pads, best, TRUE);
1554
1555   /* Send buffer */
1556   buffer = gst_collect_pads_pop (pads, best);
1557   ret = func (pads, best, buffer, buffer_user_data);
1558
1559   /* maybe non-waiting was forced to waiting above due to
1560    * newsegment events coming too sparsely,
1561    * so re-check to restore state to avoid hanging/waiting */
1562   gst_collect_pads_recalculate_full (pads);
1563
1564 done:
1565   return ret;
1566 }
1567
1568 /*
1569  * Default timestamp compare function.
1570  */
1571 static gint
1572 gst_collect_pads_default_compare_func (GstCollectPads * pads,
1573     GstCollectData * data1, GstClockTime timestamp1,
1574     GstCollectData * data2, GstClockTime timestamp2, gpointer user_data)
1575 {
1576
1577   GST_LOG_OBJECT (pads, "comparing %" GST_TIME_FORMAT
1578       " and %" GST_TIME_FORMAT, GST_TIME_ARGS (timestamp1),
1579       GST_TIME_ARGS (timestamp2));
1580   /* non-valid timestamps go first as they are probably headers or so */
1581   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (timestamp1)))
1582     return GST_CLOCK_TIME_IS_VALID (timestamp2) ? -1 : 0;
1583
1584   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (timestamp2)))
1585     return 1;
1586
1587   /* compare timestamp */
1588   if (timestamp1 < timestamp2)
1589     return -1;
1590
1591   if (timestamp1 > timestamp2)
1592     return 1;
1593
1594   return 0;
1595 }
1596
1597 /* called with STREAM_LOCK */
1598 static void
1599 gst_collect_pads_handle_position_update (GstCollectPads * pads,
1600     GstCollectData * data, GstClockTime new_pos)
1601 {
1602   gint cmp_res;
1603
1604   /* If oldest time is not known, or current pad got newsegment;
1605    * recalculate the state */
1606   if (!pads->priv->earliest_data || pads->priv->earliest_data == data) {
1607     gst_collect_pads_recalculate_full (pads);
1608     goto exit;
1609   }
1610
1611   /* Check if the waiting state of the pad should change. */
1612   cmp_res =
1613       pads->priv->compare_func (pads, data, new_pos,
1614       pads->priv->earliest_data, pads->priv->earliest_time,
1615       pads->priv->compare_user_data);
1616
1617   if (cmp_res > 0)
1618     /* Stop waiting */
1619     gst_collect_pads_set_waiting (pads, data, FALSE);
1620
1621 exit:
1622   return;
1623
1624 }
1625
1626 static GstClockTime
1627 gst_collect_pads_clip_time (GstCollectPads * pads, GstCollectData * data,
1628     GstClockTime time)
1629 {
1630   GstClockTime otime = time;
1631   GstBuffer *in, *out = NULL;
1632
1633   if (pads->priv->clip_func) {
1634     in = gst_buffer_new ();
1635     GST_BUFFER_PTS (in) = time;
1636     GST_BUFFER_DTS (in) = GST_CLOCK_TIME_NONE;
1637     pads->priv->clip_func (pads, data, in, &out, pads->priv->clip_user_data);
1638     if (out) {
1639       otime = GST_BUFFER_PTS (out);
1640       gst_buffer_unref (out);
1641     } else {
1642       /* FIXME should distinguish between ahead or after segment,
1643        * let's assume after segment and use some large time ... */
1644       otime = G_MAXINT64 / 2;
1645     }
1646   }
1647
1648   return otime;
1649 }
1650
1651 /**
1652  * gst_collect_pads_event_default:
1653  * @pads: the collectpads to use
1654  * @data: collect data of corresponding pad
1655  * @event: event being processed
1656  * @discard: process but do not send event downstream
1657  *
1658  * Default #GstCollectPads event handling that elements should always
1659  * chain up to to ensure proper operation.  Element might however indicate
1660  * event should not be forwarded downstream.
1661  */
1662 gboolean
1663 gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data,
1664     GstEvent * event, gboolean discard)
1665 {
1666   gboolean res = TRUE;
1667   GstCollectPadsBufferFunction buffer_func;
1668   GstObject *parent;
1669   GstPad *pad;
1670
1671   GST_OBJECT_LOCK (pads);
1672   buffer_func = pads->priv->buffer_func;
1673   GST_OBJECT_UNLOCK (pads);
1674
1675   pad = data->pad;
1676   parent = GST_OBJECT_PARENT (pad);
1677
1678   GST_DEBUG_OBJECT (pad, "Got '%s' event", GST_EVENT_TYPE_NAME (event));
1679
1680   switch (GST_EVENT_TYPE (event)) {
1681     case GST_EVENT_FLUSH_START:
1682     {
1683       if (g_atomic_int_get (&pads->priv->seeking)) {
1684         /* drop all but the first FLUSH_STARTs when seeking */
1685         if (!g_atomic_int_compare_and_exchange (&pads->
1686                 priv->pending_flush_start, TRUE, FALSE))
1687           goto eat;
1688
1689         /* unblock collect pads */
1690         gst_pad_event_default (pad, parent, event);
1691         event = NULL;
1692
1693         GST_COLLECT_PADS_STREAM_LOCK (pads);
1694         /* Start flushing. We never call gst_collect_pads_set_flushing (FALSE), we
1695          * instead wait until each pad gets its FLUSH_STOP and let that reset the pad to
1696          * non-flushing (which happens in gst_collect_pads_event_default).
1697          */
1698         gst_collect_pads_set_flushing (pads, TRUE);
1699
1700         if (pads->priv->flush_func)
1701           pads->priv->flush_func (pads, pads->priv->flush_user_data);
1702
1703         g_atomic_int_set (&pads->priv->pending_flush_stop, TRUE);
1704         GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1705
1706         goto eat;
1707       } else {
1708         /* forward event to unblock check_collected */
1709         GST_DEBUG_OBJECT (pad, "forwarding flush start");
1710         if (!(res = gst_pad_event_default (pad, parent, event))) {
1711           GST_WARNING_OBJECT (pad, "forwarding flush start failed");
1712         }
1713         event = NULL;
1714
1715         /* now unblock the chain function.
1716          * no cond per pad, so they all unblock,
1717          * non-flushing block again */
1718         GST_COLLECT_PADS_STREAM_LOCK (pads);
1719         GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_FLUSHING);
1720         gst_collect_pads_clear (pads, data);
1721
1722         /* cater for possible default muxing functionality */
1723         if (buffer_func) {
1724           /* restore to initial state */
1725           gst_collect_pads_set_waiting (pads, data, TRUE);
1726           /* if the current pad is affected, reset state, recalculate later */
1727           if (pads->priv->earliest_data == data) {
1728             unref_data (data);
1729             pads->priv->earliest_data = NULL;
1730             pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
1731           }
1732         }
1733
1734         GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1735
1736         goto eat;
1737       }
1738     }
1739     case GST_EVENT_FLUSH_STOP:
1740     {
1741       /* flush the 1 buffer queue */
1742       GST_COLLECT_PADS_STREAM_LOCK (pads);
1743       GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_FLUSHING);
1744       gst_collect_pads_clear (pads, data);
1745       /* we need new segment info after the flush */
1746       gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
1747       GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
1748       /* if the pad was EOS, remove the EOS flag and
1749        * decrement the number of eospads */
1750       if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
1751                   GST_COLLECT_PADS_STATE_EOS))) {
1752         if (!GST_COLLECT_PADS_STATE_IS_SET (data,
1753                 GST_COLLECT_PADS_STATE_WAITING))
1754           pads->priv->queuedpads++;
1755         if (!g_atomic_int_get (&pads->priv->seeking)) {
1756           pads->priv->eospads--;
1757         }
1758         GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_EOS);
1759       }
1760       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1761
1762       if (g_atomic_int_get (&pads->priv->seeking)) {
1763         if (g_atomic_int_compare_and_exchange (&pads->priv->pending_flush_stop,
1764                 TRUE, FALSE))
1765           goto forward;
1766         else
1767           goto eat;
1768       } else {
1769         goto forward;
1770       }
1771     }
1772     case GST_EVENT_EOS:
1773     {
1774       GST_COLLECT_PADS_STREAM_LOCK (pads);
1775       /* if the pad was not EOS, make it EOS and so we
1776        * have one more eospad */
1777       if (G_LIKELY (!GST_COLLECT_PADS_STATE_IS_SET (data,
1778                   GST_COLLECT_PADS_STATE_EOS))) {
1779         GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_EOS);
1780         if (!GST_COLLECT_PADS_STATE_IS_SET (data,
1781                 GST_COLLECT_PADS_STATE_WAITING))
1782           pads->priv->queuedpads--;
1783         pads->priv->eospads++;
1784       }
1785       /* check if we need collecting anything, we ignore the result. */
1786       gst_collect_pads_check_collected (pads);
1787       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1788
1789       goto eat;
1790     }
1791     case GST_EVENT_SEGMENT:
1792     {
1793       GstSegment seg;
1794
1795       GST_COLLECT_PADS_STREAM_LOCK (pads);
1796
1797       gst_event_copy_segment (event, &seg);
1798
1799       GST_DEBUG_OBJECT (data->pad, "got segment %" GST_SEGMENT_FORMAT, &seg);
1800
1801       /* default collection can not handle other segment formats than time */
1802       if (buffer_func && seg.format != GST_FORMAT_TIME) {
1803         GST_WARNING_OBJECT (pads, "GstCollectPads default collecting "
1804             "can only handle time segments. Non time segment ignored.");
1805         goto newsegment_done;
1806       }
1807
1808       /* need to update segment first */
1809       data->segment = seg;
1810       GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
1811
1812       /* now we can use for e.g. running time */
1813       seg.position =
1814           gst_collect_pads_clip_time (pads, data, seg.start + seg.offset);
1815       /* update again */
1816       data->segment = seg;
1817
1818       /* default muxing functionality */
1819       if (!buffer_func)
1820         goto newsegment_done;
1821
1822       gst_collect_pads_handle_position_update (pads, data, seg.position);
1823
1824     newsegment_done:
1825       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1826       /* we must not forward this event since multiple segments will be
1827        * accumulated and this is certainly not what we want. */
1828       goto eat;
1829     }
1830     case GST_EVENT_GAP:
1831     {
1832       GstClockTime start, duration;
1833
1834       GST_COLLECT_PADS_STREAM_LOCK (pads);
1835
1836       gst_event_parse_gap (event, &start, &duration);
1837       /* FIXME, handle reverse playback case */
1838       if (GST_CLOCK_TIME_IS_VALID (duration))
1839         start += duration;
1840       /* we do not expect another buffer until after gap,
1841        * so that is our position now */
1842       data->segment.position = gst_collect_pads_clip_time (pads, data, start);
1843
1844       gst_collect_pads_handle_position_update (pads, data,
1845           data->segment.position);
1846
1847       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1848       goto eat;
1849     }
1850     case GST_EVENT_STREAM_START:
1851       /* drop stream start events, element must create its own start event,
1852        * we can't just forward the first random stream start event we get */
1853       goto eat;
1854     case GST_EVENT_CAPS:
1855       goto eat;
1856     default:
1857       /* forward other events */
1858       goto forward;
1859   }
1860
1861 eat:
1862   GST_DEBUG_OBJECT (pads, "dropping event: %" GST_PTR_FORMAT, event);
1863   if (event)
1864     gst_event_unref (event);
1865   return res;
1866
1867 forward:
1868   if (discard)
1869     goto eat;
1870   else {
1871     GST_DEBUG_OBJECT (pads, "forward event: %" GST_PTR_FORMAT, event);
1872     return gst_pad_event_default (pad, parent, event);
1873   }
1874 }
1875
1876 typedef struct
1877 {
1878   GstEvent *event;
1879   gboolean result;
1880 } EventData;
1881
1882 static gboolean
1883 event_forward_func (GstPad * pad, EventData * data)
1884 {
1885   gboolean ret = TRUE;
1886   GstPad *peer = gst_pad_get_peer (pad);
1887
1888   if (peer) {
1889     ret = gst_pad_send_event (peer, gst_event_ref (data->event));
1890     gst_object_unref (peer);
1891   }
1892
1893   data->result &= ret;
1894   /* Always send to all pads */
1895   return FALSE;
1896 }
1897
1898 static gboolean
1899 forward_event_to_all_sinkpads (GstPad * srcpad, GstEvent * event)
1900 {
1901   EventData data;
1902
1903   data.event = event;
1904   data.result = TRUE;
1905
1906   gst_pad_forward (srcpad, (GstPadForwardFunction) event_forward_func, &data);
1907
1908   gst_event_unref (event);
1909
1910   return data.result;
1911 }
1912
1913 /**
1914  * gst_collect_pads_src_event_default:
1915  * @pads: the #GstCollectPads to use
1916  * @pad: src #GstPad that received the event
1917  * @event: event being processed
1918  *
1919  * Default #GstCollectPads event handling for the src pad of elements.
1920  * Elements can chain up to this to let flushing seek event handling
1921  * be done by #GstCollectPads.
1922  *
1923  * Since: 1.4
1924  */
1925 gboolean
1926 gst_collect_pads_src_event_default (GstCollectPads * pads, GstPad * pad,
1927     GstEvent * event)
1928 {
1929   GstObject *parent;
1930   gboolean res = TRUE;
1931
1932   parent = GST_OBJECT_PARENT (pad);
1933
1934   switch (GST_EVENT_TYPE (event)) {
1935     case GST_EVENT_SEEK:{
1936       GstSeekFlags flags;
1937
1938       pads->priv->eospads = 0;
1939
1940       GST_INFO_OBJECT (pads, "starting seek");
1941
1942       gst_event_parse_seek (event, NULL, NULL, &flags, NULL, NULL, NULL, NULL);
1943       if (flags & GST_SEEK_FLAG_FLUSH) {
1944         g_atomic_int_set (&pads->priv->seeking, TRUE);
1945         g_atomic_int_set (&pads->priv->pending_flush_start, TRUE);
1946         /* forward the seek upstream */
1947         res = forward_event_to_all_sinkpads (pad, event);
1948         event = NULL;
1949         if (!res) {
1950           g_atomic_int_set (&pads->priv->seeking, FALSE);
1951           g_atomic_int_set (&pads->priv->pending_flush_start, FALSE);
1952         }
1953       }
1954
1955       GST_INFO_OBJECT (pads, "seek done, result: %d", res);
1956
1957       break;
1958     }
1959     default:
1960       break;
1961   }
1962
1963   if (event)
1964     res = gst_pad_event_default (pad, parent, event);
1965
1966   return res;
1967 }
1968
1969 static gboolean
1970 gst_collect_pads_event_default_internal (GstCollectPads * pads,
1971     GstCollectData * data, GstEvent * event, gpointer user_data)
1972 {
1973   return gst_collect_pads_event_default (pads, data, event, FALSE);
1974 }
1975
1976 static gboolean
1977 gst_collect_pads_event (GstPad * pad, GstObject * parent, GstEvent * event)
1978 {
1979   gboolean res = FALSE, need_unlock = FALSE;
1980   GstCollectData *data;
1981   GstCollectPads *pads;
1982   GstCollectPadsEventFunction event_func;
1983   gpointer event_user_data;
1984
1985   /* some magic to get the managing collect_pads */
1986   GST_OBJECT_LOCK (pad);
1987   data = (GstCollectData *) gst_pad_get_element_private (pad);
1988   if (G_UNLIKELY (data == NULL))
1989     goto pad_removed;
1990   ref_data (data);
1991   GST_OBJECT_UNLOCK (pad);
1992
1993   res = FALSE;
1994
1995   pads = data->collect;
1996
1997   GST_DEBUG_OBJECT (data->pad, "Got %s event on sink pad",
1998       GST_EVENT_TYPE_NAME (event));
1999
2000   GST_OBJECT_LOCK (pads);
2001   event_func = pads->priv->event_func;
2002   event_user_data = pads->priv->event_user_data;
2003   GST_OBJECT_UNLOCK (pads);
2004
2005   if (GST_EVENT_IS_SERIALIZED (event)) {
2006     GST_COLLECT_PADS_STREAM_LOCK (pads);
2007     need_unlock = TRUE;
2008   }
2009
2010   if (G_LIKELY (event_func)) {
2011     res = event_func (pads, data, event, event_user_data);
2012   }
2013
2014   if (need_unlock)
2015     GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2016
2017   unref_data (data);
2018   return res;
2019
2020   /* ERRORS */
2021 pad_removed:
2022   {
2023     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2024     GST_OBJECT_UNLOCK (pad);
2025     return FALSE;
2026   }
2027 }
2028
2029 /**
2030  * gst_collect_pads_query_default:
2031  * @pads: the collectpads to use
2032  * @data: collect data of corresponding pad
2033  * @query: query being processed
2034  * @discard: process but do not send event downstream
2035  *
2036  * Default #GstCollectPads query handling that elements should always
2037  * chain up to to ensure proper operation.  Element might however indicate
2038  * query should not be forwarded downstream.
2039  */
2040 gboolean
2041 gst_collect_pads_query_default (GstCollectPads * pads, GstCollectData * data,
2042     GstQuery * query, gboolean discard)
2043 {
2044   gboolean res = TRUE;
2045   GstObject *parent;
2046   GstPad *pad;
2047
2048   pad = data->pad;
2049   parent = GST_OBJECT_PARENT (pad);
2050
2051   switch (GST_QUERY_TYPE (query)) {
2052     case GST_QUERY_SEEKING:
2053     {
2054       GstFormat format;
2055
2056       /* don't pass it along as some (file)sink might claim it does
2057        * whereas with a collectpads in between that will not likely work */
2058       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
2059       gst_query_set_seeking (query, format, FALSE, 0, -1);
2060       res = TRUE;
2061       discard = TRUE;
2062       break;
2063     }
2064     default:
2065       break;
2066   }
2067
2068   if (!discard)
2069     return gst_pad_query_default (pad, parent, query);
2070   else
2071     return res;
2072 }
2073
2074 static gboolean
2075 gst_collect_pads_query_default_internal (GstCollectPads * pads,
2076     GstCollectData * data, GstQuery * query, gpointer user_data)
2077 {
2078   return gst_collect_pads_query_default (pads, data, query, FALSE);
2079 }
2080
2081 static gboolean
2082 gst_collect_pads_query (GstPad * pad, GstObject * parent, GstQuery * query)
2083 {
2084   gboolean res = FALSE, need_unlock = FALSE;
2085   GstCollectData *data;
2086   GstCollectPads *pads;
2087   GstCollectPadsQueryFunction query_func;
2088   gpointer query_user_data;
2089
2090   GST_DEBUG_OBJECT (pad, "Got %s query on sink pad",
2091       GST_QUERY_TYPE_NAME (query));
2092
2093   /* some magic to get the managing collect_pads */
2094   GST_OBJECT_LOCK (pad);
2095   data = (GstCollectData *) gst_pad_get_element_private (pad);
2096   if (G_UNLIKELY (data == NULL))
2097     goto pad_removed;
2098   ref_data (data);
2099   GST_OBJECT_UNLOCK (pad);
2100
2101   pads = data->collect;
2102
2103   GST_OBJECT_LOCK (pads);
2104   query_func = pads->priv->query_func;
2105   query_user_data = pads->priv->query_user_data;
2106   GST_OBJECT_UNLOCK (pads);
2107
2108   if (GST_QUERY_IS_SERIALIZED (query)) {
2109     GST_COLLECT_PADS_STREAM_LOCK (pads);
2110     need_unlock = TRUE;
2111   }
2112
2113   if (G_LIKELY (query_func)) {
2114     res = query_func (pads, data, query, query_user_data);
2115   }
2116
2117   if (need_unlock)
2118     GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2119
2120   unref_data (data);
2121   return res;
2122
2123   /* ERRORS */
2124 pad_removed:
2125   {
2126     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2127     GST_OBJECT_UNLOCK (pad);
2128     return FALSE;
2129   }
2130 }
2131
2132
2133 /* For each buffer we receive we check if our collected condition is reached
2134  * and if so we call the collected function. When this is done we check if
2135  * data has been unqueued. If data is still queued we wait holding the stream
2136  * lock to make sure no EOS event can happen while we are ready to be
2137  * collected
2138  */
2139 static GstFlowReturn
2140 gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2141 {
2142   GstCollectData *data;
2143   GstCollectPads *pads;
2144   GstFlowReturn ret;
2145   GstBuffer **buffer_p;
2146   guint32 cookie;
2147
2148   GST_DEBUG ("Got buffer for pad %s:%s", GST_DEBUG_PAD_NAME (pad));
2149
2150   /* some magic to get the managing collect_pads */
2151   GST_OBJECT_LOCK (pad);
2152   data = (GstCollectData *) gst_pad_get_element_private (pad);
2153   if (G_UNLIKELY (data == NULL))
2154     goto no_data;
2155   ref_data (data);
2156   GST_OBJECT_UNLOCK (pad);
2157
2158   pads = data->collect;
2159
2160   GST_COLLECT_PADS_STREAM_LOCK (pads);
2161   /* if not started, bail out */
2162   if (G_UNLIKELY (!pads->priv->started))
2163     goto not_started;
2164   /* check if this pad is flushing */
2165   if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
2166               GST_COLLECT_PADS_STATE_FLUSHING)))
2167     goto flushing;
2168   /* pad was EOS, we can refuse this data */
2169   if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
2170               GST_COLLECT_PADS_STATE_EOS)))
2171     goto eos;
2172
2173   /* see if we need to clip */
2174   if (pads->priv->clip_func) {
2175     GstBuffer *outbuf = NULL;
2176     ret =
2177         pads->priv->clip_func (pads, data, buffer, &outbuf,
2178         pads->priv->clip_user_data);
2179     buffer = outbuf;
2180
2181     if (G_UNLIKELY (outbuf == NULL))
2182       goto clipped;
2183
2184     if (G_UNLIKELY (ret == GST_FLOW_EOS))
2185       goto eos;
2186     else if (G_UNLIKELY (ret != GST_FLOW_OK))
2187       goto error;
2188   }
2189
2190   GST_DEBUG_OBJECT (pads, "Queuing buffer %p for pad %s:%s", buffer,
2191       GST_DEBUG_PAD_NAME (pad));
2192
2193   /* One more pad has data queued */
2194   if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING))
2195     pads->priv->queuedpads++;
2196   buffer_p = &data->buffer;
2197   gst_buffer_replace (buffer_p, buffer);
2198
2199   /* update segment last position if in TIME */
2200   if (G_LIKELY (data->segment.format == GST_FORMAT_TIME)) {
2201     GstClockTime timestamp;
2202
2203     timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
2204
2205     if (GST_CLOCK_TIME_IS_VALID (timestamp))
2206       data->segment.position = timestamp;
2207   }
2208
2209   /* While we have data queued on this pad try to collect stuff */
2210   do {
2211     /* Check if our collected condition is matched and call the collected
2212      * function if it is */
2213     ret = gst_collect_pads_check_collected (pads);
2214     /* when an error occurs, we want to report this back to the caller ASAP
2215      * without having to block if the buffer was not popped */
2216     if (G_UNLIKELY (ret != GST_FLOW_OK))
2217       goto error;
2218
2219     /* data was consumed, we can exit and accept new data */
2220     if (data->buffer == NULL)
2221       break;
2222
2223     /* Having the _INIT here means we don't care about any broadcast up to here
2224      * (most of which occur with STREAM_LOCK held, so could not have happened
2225      * anyway).  We do care about e.g. a remove initiated broadcast as of this
2226      * point.  Putting it here also makes this thread ignores any evt it raised
2227      * itself (as is a usual WAIT semantic).
2228      */
2229     GST_COLLECT_PADS_EVT_INIT (cookie);
2230
2231     /* pad could be removed and re-added */
2232     unref_data (data);
2233     GST_OBJECT_LOCK (pad);
2234     if (G_UNLIKELY ((data = gst_pad_get_element_private (pad)) == NULL))
2235       goto pad_removed;
2236     ref_data (data);
2237     GST_OBJECT_UNLOCK (pad);
2238
2239     GST_DEBUG_OBJECT (pads, "Pad %s:%s has a buffer queued, waiting",
2240         GST_DEBUG_PAD_NAME (pad));
2241
2242     /* wait to be collected, this must happen from another thread triggered
2243      * by the _chain function of another pad. We release the lock so we
2244      * can get stopped or flushed as well. We can however not get EOS
2245      * because we still hold the STREAM_LOCK.
2246      */
2247     GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2248     GST_COLLECT_PADS_EVT_WAIT (pads, cookie);
2249     GST_COLLECT_PADS_STREAM_LOCK (pads);
2250
2251     GST_DEBUG_OBJECT (pads, "Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad));
2252
2253     /* after a signal, we could be stopped */
2254     if (G_UNLIKELY (!pads->priv->started))
2255       goto not_started;
2256     /* check if this pad is flushing */
2257     if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
2258                 GST_COLLECT_PADS_STATE_FLUSHING)))
2259       goto flushing;
2260   }
2261   while (data->buffer != NULL);
2262
2263 unlock_done:
2264   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2265   /* data is definitely NULL if pad_removed goto was run. */
2266   if (data)
2267     unref_data (data);
2268   if (buffer)
2269     gst_buffer_unref (buffer);
2270   return ret;
2271
2272 pad_removed:
2273   {
2274     GST_WARNING ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2275     GST_OBJECT_UNLOCK (pad);
2276     ret = GST_FLOW_NOT_LINKED;
2277     goto unlock_done;
2278   }
2279   /* ERRORS */
2280 no_data:
2281   {
2282     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2283     GST_OBJECT_UNLOCK (pad);
2284     gst_buffer_unref (buffer);
2285     return GST_FLOW_NOT_LINKED;
2286   }
2287 not_started:
2288   {
2289     GST_DEBUG ("not started");
2290     gst_collect_pads_clear (pads, data);
2291     ret = GST_FLOW_FLUSHING;
2292     goto unlock_done;
2293   }
2294 flushing:
2295   {
2296     GST_DEBUG ("pad %s:%s is flushing", GST_DEBUG_PAD_NAME (pad));
2297     gst_collect_pads_clear (pads, data);
2298     ret = GST_FLOW_FLUSHING;
2299     goto unlock_done;
2300   }
2301 eos:
2302   {
2303     /* we should not post an error for this, just inform upstream that
2304      * we don't expect anything anymore */
2305     GST_DEBUG ("pad %s:%s is eos", GST_DEBUG_PAD_NAME (pad));
2306     ret = GST_FLOW_EOS;
2307     goto unlock_done;
2308   }
2309 clipped:
2310   {
2311     GST_DEBUG ("clipped buffer on pad %s:%s", GST_DEBUG_PAD_NAME (pad));
2312     ret = GST_FLOW_OK;
2313     goto unlock_done;
2314   }
2315 error:
2316   {
2317     /* we print the error, the element should post a reasonable error
2318      * message for fatal errors */
2319     GST_DEBUG ("collect failed, reason %d (%s)", ret, gst_flow_get_name (ret));
2320     gst_collect_pads_clear (pads, data);
2321     goto unlock_done;
2322   }
2323 }