Add few missing allow-none annotation
[platform/upstream/gstreamer.git] / libs / gst / base / gstcollectpads.c
1 /* GStreamer
2  * Copyright (C) 2005 Wim Taymans <wim@fluendo.com>
3  * Copyright (C) 2008 Mark Nauwelaerts <mnauw@users.sourceforge.net>
4  * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
5  *
6  * gstcollectpads.c:
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23 /**
24  * SECTION:gstcollectpads
25  * @short_description: manages a set of pads that operate in collect mode
26  * @see_also:
27  *
28  * Manages a set of pads that operate in collect mode. This means that control
29  * is given to the manager of this object when all pads have data.
30  * <itemizedlist>
31  *   <listitem><para>
32  *     Collectpads are created with gst_collect_pads_new(). A callback should then
33  *     be installed with gst_collect_pads_set_function ().
34  *   </para></listitem>
35  *   <listitem><para>
36  *     Pads are added to the collection with gst_collect_pads_add_pad()/
37  *     gst_collect_pads_remove_pad(). The pad
38  *     has to be a sinkpad. The chain and event functions of the pad are
39  *     overridden. The element_private of the pad is used to store
40  *     private information for the collectpads.
41  *   </para></listitem>
42  *   <listitem><para>
43  *     For each pad, data is queued in the _chain function or by
44  *     performing a pull_range.
45  *   </para></listitem>
46  *   <listitem><para>
47  *     When data is queued on all pads in waiting mode, the callback function is called.
48  *   </para></listitem>
49  *   <listitem><para>
50  *     Data can be dequeued from the pad with the gst_collect_pads_pop() method.
51  *     One can peek at the data with the gst_collect_pads_peek() function.
52  *     These functions will return NULL if the pad received an EOS event. When all
53  *     pads return NULL from a gst_collect_pads_peek(), the element can emit an EOS
54  *     event itself.
55  *   </para></listitem>
56  *   <listitem><para>
57  *     Data can also be dequeued in byte units using the gst_collect_pads_available(),
58  *     gst_collect_pads_read_buffer() and gst_collect_pads_flush() calls.
59  *   </para></listitem>
60  *   <listitem><para>
61  *     Elements should call gst_collect_pads_start() and gst_collect_pads_stop() in
62  *     their state change functions to start and stop the processing of the collectpads.
63  *     The gst_collect_pads_stop() call should be called before calling the parent
64  *     element state change function in the PAUSED_TO_READY state change to ensure
65  *     no pad is blocked and the element can finish streaming.
66  *   </para></listitem>
67  *   <listitem><para>
68  *     gst_collect_pads_collect() and gst_collect_pads_collect_range() can be used by
69  *     elements that start a #GstTask to drive the collect_pads. This feature is however
70  *     not yet implemented.
71  *   </para></listitem>
72  *   <listitem><para>
73  *     gst_collect_pads_set_waiting() sets a pad to waiting or non-waiting mode.
74  *     CollectPads element is not waiting for data to be collected on non-waiting pads.
75  *     Thus these pads may but need not have data when the callback is called.
76  *     All pads are in waiting mode by default.
77  *   </para></listitem>
78  * </itemizedlist>
79  *
80  * Last reviewed on 2011-10-28 (0.10.36)
81  */
82
83 #ifdef HAVE_CONFIG_H
84 #  include "config.h"
85 #endif
86
87 #include <gst/gst_private.h>
88
89 #include "gstcollectpads.h"
90
91 #include "../../../gst/glib-compat-private.h"
92
93 GST_DEBUG_CATEGORY_STATIC (collect_pads_debug);
94 #define GST_CAT_DEFAULT collect_pads_debug
95
96 #define parent_class gst_collect_pads_parent_class
97 G_DEFINE_TYPE (GstCollectPads, gst_collect_pads, GST_TYPE_OBJECT);
98
99 struct _GstCollectDataPrivate
100 {
101   /* refcounting for struct, and destroy callback */
102   GstCollectDataDestroyNotify destroy_notify;
103   gint refcount;
104 };
105
106 struct _GstCollectPadsPrivate
107 {
108   /* with LOCK and/or STREAM_LOCK */
109   gboolean started;
110
111   /* with STREAM_LOCK */
112   guint32 cookie;               /* @data list cookie */
113   guint numpads;                /* number of pads in @data */
114   guint queuedpads;             /* number of pads with a buffer */
115   guint eospads;                /* number of pads that are EOS */
116   GstClockTime earliest_time;   /* Current earliest time */
117   GstCollectData *earliest_data;        /* Pad data for current earliest time */
118
119   /* with LOCK */
120   GSList *pad_list;             /* updated pad list */
121   guint32 pad_cookie;           /* updated cookie */
122
123   GstCollectPadsFunction func;  /* function and user_data for callback */
124   gpointer user_data;
125   GstCollectPadsBufferFunction buffer_func;     /* function and user_data for buffer callback */
126   gpointer buffer_user_data;
127   GstCollectPadsCompareFunction compare_func;
128   gpointer compare_user_data;
129   GstCollectPadsEventFunction event_func;       /* function and data for event callback */
130   gpointer event_user_data;
131   GstCollectPadsQueryFunction query_func;
132   gpointer query_user_data;
133   GstCollectPadsClipFunction clip_func;
134   gpointer clip_user_data;
135
136   /* no other lock needed */
137   GMutex evt_lock;              /* these make up sort of poor man's event signaling */
138   GCond evt_cond;
139   guint32 evt_cookie;
140 };
141
142 static void gst_collect_pads_clear (GstCollectPads * pads,
143     GstCollectData * data);
144 static GstFlowReturn gst_collect_pads_chain (GstPad * pad, GstObject * parent,
145     GstBuffer * buffer);
146 static gboolean gst_collect_pads_event (GstPad * pad, GstObject * parent,
147     GstEvent * event);
148 static gboolean gst_collect_pads_query (GstPad * pad, GstObject * parent,
149     GstQuery * query);
150 static void gst_collect_pads_finalize (GObject * object);
151 static GstFlowReturn gst_collect_pads_default_collected (GstCollectPads *
152     pads, gpointer user_data);
153 static gint gst_collect_pads_default_compare_func (GstCollectPads * pads,
154     GstCollectData * data1, GstClockTime timestamp1, GstCollectData * data2,
155     GstClockTime timestamp2, gpointer user_data);
156 static gboolean gst_collect_pads_recalculate_full (GstCollectPads * pads);
157 static void ref_data (GstCollectData * data);
158 static void unref_data (GstCollectData * data);
159
160 static gboolean gst_collect_pads_event_default_internal (GstCollectPads *
161     pads, GstCollectData * data, GstEvent * event, gpointer user_data);
162 static gboolean gst_collect_pads_query_default_internal (GstCollectPads *
163     pads, GstCollectData * data, GstQuery * query, gpointer user_data);
164
165
166 /* Some properties are protected by LOCK, others by STREAM_LOCK
167  * However, manipulating either of these partitions may require
168  * to signal/wake a _WAIT, so use a separate (sort of) event to prevent races
169  * Alternative implementations are possible, e.g. some low-level re-implementing
170  * of the 2 above locks to drop both of them atomically when going into _WAIT.
171  */
172 #define GST_COLLECT_PADS_GET_EVT_COND(pads) (&((GstCollectPads *)pads)->priv->evt_cond)
173 #define GST_COLLECT_PADS_GET_EVT_LOCK(pads) (&((GstCollectPads *)pads)->priv->evt_lock)
174 #define GST_COLLECT_PADS_EVT_WAIT(pads, cookie) G_STMT_START {    \
175   g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
176   /* should work unless a lot of event'ing and thread starvation */\
177   while (cookie == ((GstCollectPads *) pads)->priv->evt_cookie)         \
178     g_cond_wait (GST_COLLECT_PADS_GET_EVT_COND (pads),            \
179         GST_COLLECT_PADS_GET_EVT_LOCK (pads));                    \
180   cookie = ((GstCollectPads *) pads)->priv->evt_cookie;                 \
181   g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
182 } G_STMT_END
183 #define GST_COLLECT_PADS_EVT_WAIT_TIMED(pads, cookie, timeout) G_STMT_START { \
184   GTimeVal __tv; \
185   \
186   g_get_current_time (&tv); \
187   g_time_val_add (&tv, timeout); \
188   \
189   g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
190   /* should work unless a lot of event'ing and thread starvation */\
191   while (cookie == ((GstCollectPads *) pads)->priv->evt_cookie)         \
192     g_cond_timed_wait (GST_COLLECT_PADS_GET_EVT_COND (pads),            \
193         GST_COLLECT_PADS_GET_EVT_LOCK (pads), &tv);                    \
194   cookie = ((GstCollectPads *) pads)->priv->evt_cookie;                 \
195   g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
196 } G_STMT_END
197 #define GST_COLLECT_PADS_EVT_BROADCAST(pads) G_STMT_START {       \
198   g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
199   /* never mind wrap-around */                                     \
200   ++(((GstCollectPads *) pads)->priv->evt_cookie);                      \
201   g_cond_broadcast (GST_COLLECT_PADS_GET_EVT_COND (pads));        \
202   g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
203 } G_STMT_END
204 #define GST_COLLECT_PADS_EVT_INIT(cookie) G_STMT_START {          \
205   g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
206   cookie = ((GstCollectPads *) pads)->priv->evt_cookie;                 \
207   g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
208 } G_STMT_END
209
210 static void
211 gst_collect_pads_class_init (GstCollectPadsClass * klass)
212 {
213   GObjectClass *gobject_class = (GObjectClass *) klass;
214
215   g_type_class_add_private (klass, sizeof (GstCollectPadsPrivate));
216
217   GST_DEBUG_CATEGORY_INIT (collect_pads_debug, "collectpads", 0,
218       "GstCollectPads");
219
220   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_collect_pads_finalize);
221 }
222
223 static void
224 gst_collect_pads_init (GstCollectPads * pads)
225 {
226   pads->priv =
227       G_TYPE_INSTANCE_GET_PRIVATE (pads, GST_TYPE_COLLECT_PADS,
228       GstCollectPadsPrivate);
229
230   pads->data = NULL;
231   pads->priv->cookie = 0;
232   pads->priv->numpads = 0;
233   pads->priv->queuedpads = 0;
234   pads->priv->eospads = 0;
235   pads->priv->started = FALSE;
236
237   g_rec_mutex_init (&pads->stream_lock);
238
239   pads->priv->func = gst_collect_pads_default_collected;
240   pads->priv->user_data = NULL;
241   pads->priv->event_func = NULL;
242   pads->priv->event_user_data = NULL;
243
244   /* members for default muxing */
245   pads->priv->buffer_func = NULL;
246   pads->priv->buffer_user_data = NULL;
247   pads->priv->compare_func = gst_collect_pads_default_compare_func;
248   pads->priv->compare_user_data = NULL;
249   pads->priv->earliest_data = NULL;
250   pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
251
252   pads->priv->event_func = gst_collect_pads_event_default_internal;
253   pads->priv->query_func = gst_collect_pads_query_default_internal;
254
255   /* members to manage the pad list */
256   pads->priv->pad_cookie = 0;
257   pads->priv->pad_list = NULL;
258
259   /* members for event */
260   g_mutex_init (&pads->priv->evt_lock);
261   g_cond_init (&pads->priv->evt_cond);
262   pads->priv->evt_cookie = 0;
263 }
264
265 static void
266 gst_collect_pads_finalize (GObject * object)
267 {
268   GstCollectPads *pads = GST_COLLECT_PADS (object);
269
270   GST_DEBUG_OBJECT (object, "finalize");
271
272   g_rec_mutex_clear (&pads->stream_lock);
273
274   g_cond_clear (&pads->priv->evt_cond);
275   g_mutex_clear (&pads->priv->evt_lock);
276
277   /* Remove pads and free pads list */
278   g_slist_foreach (pads->priv->pad_list, (GFunc) unref_data, NULL);
279   g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
280   g_slist_free (pads->data);
281   g_slist_free (pads->priv->pad_list);
282
283   G_OBJECT_CLASS (parent_class)->finalize (object);
284 }
285
286 /**
287  * gst_collect_pads_new:
288  *
289  * Create a new instance of #GstCollectPads.
290  *
291  * MT safe.
292  *
293  * Returns: (transfer full): a new #GstCollectPads, or NULL in case of an error.
294  */
295 GstCollectPads *
296 gst_collect_pads_new (void)
297 {
298   GstCollectPads *newcoll;
299
300   newcoll = g_object_new (GST_TYPE_COLLECT_PADS, NULL);
301
302   return newcoll;
303 }
304
305 /* Must be called with GstObject lock! */
306 static void
307 gst_collect_pads_set_buffer_function_locked (GstCollectPads * pads,
308     GstCollectPadsBufferFunction func, gpointer user_data)
309 {
310   pads->priv->buffer_func = func;
311   pads->priv->buffer_user_data = user_data;
312 }
313
314 /**
315  * gst_collect_pads_set_buffer_function:
316  * @pads: the collectpads to use
317  * @func: the function to set
318  * @user_data: (closure): user data passed to the function
319  *
320  * Set the callback function and user data that will be called with
321  * the oldest buffer when all pads have been collected, or NULL on EOS.
322  * If a buffer is passed, the callback owns a reference and must unref
323  * it.
324  *
325  * MT safe.
326  */
327 void
328 gst_collect_pads_set_buffer_function (GstCollectPads * pads,
329     GstCollectPadsBufferFunction func, gpointer user_data)
330 {
331   g_return_if_fail (pads != NULL);
332   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
333
334   GST_OBJECT_LOCK (pads);
335   gst_collect_pads_set_buffer_function_locked (pads, func, user_data);
336   GST_OBJECT_UNLOCK (pads);
337 }
338
339 /**
340  * gst_collect_pads_set_compare_function:
341  * @pads: the pads to use
342  * @func: the function to set
343  * @user_data: (closure): user data passed to the function
344  *
345  * Set the timestamp comparison function.
346  *
347  * MT safe.
348  */
349 /* NOTE allowing to change comparison seems not advisable;
350 no known use-case, and collaboration with default algorithm is unpredictable.
351 If custom compairing/operation is needed, just use a collect function of
352 your own */
353 void
354 gst_collect_pads_set_compare_function (GstCollectPads * pads,
355     GstCollectPadsCompareFunction func, gpointer user_data)
356 {
357   g_return_if_fail (pads != NULL);
358   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
359
360   GST_OBJECT_LOCK (pads);
361   pads->priv->compare_func = func;
362   pads->priv->compare_user_data = user_data;
363   GST_OBJECT_UNLOCK (pads);
364 }
365
366 /**
367  * gst_collect_pads_set_function:
368  * @pads: the collectpads to use
369  * @func: the function to set
370  * @user_data: user data passed to the function
371  *
372  * CollectPads provides a default collection algorithm that will determine
373  * the oldest buffer available on all of its pads, and then delegate
374  * to a configured callback.
375  * However, if circumstances are more complicated and/or more control
376  * is desired, this sets a callback that will be invoked instead when
377  * all the pads added to the collection have buffers queued.
378  * Evidently, this callback is not compatible with
379  * gst_collect_pads_set_buffer_function() callback.
380  * If this callback is set, the former will be unset.
381  *
382  * MT safe.
383  */
384 void
385 gst_collect_pads_set_function (GstCollectPads * pads,
386     GstCollectPadsFunction func, gpointer user_data)
387 {
388   g_return_if_fail (pads != NULL);
389   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
390
391   GST_OBJECT_LOCK (pads);
392   pads->priv->func = func;
393   pads->priv->user_data = user_data;
394   gst_collect_pads_set_buffer_function_locked (pads, NULL, NULL);
395   GST_OBJECT_UNLOCK (pads);
396 }
397
398 static void
399 ref_data (GstCollectData * data)
400 {
401   g_assert (data != NULL);
402
403   g_atomic_int_inc (&(data->priv->refcount));
404 }
405
406 static void
407 unref_data (GstCollectData * data)
408 {
409   g_assert (data != NULL);
410   g_assert (data->priv->refcount > 0);
411
412   if (!g_atomic_int_dec_and_test (&(data->priv->refcount)))
413     return;
414
415   if (data->priv->destroy_notify)
416     data->priv->destroy_notify (data);
417
418   g_object_unref (data->pad);
419   if (data->buffer) {
420     gst_buffer_unref (data->buffer);
421   }
422   g_free (data->priv);
423   g_free (data);
424 }
425
426 /**
427  * gst_collect_pads_set_event_function:
428  * @pads: the collectpads to use
429  * @func: the function to set
430  * @user_data: user data passed to the function
431  *
432  * Set the event callback function and user data that will be called when
433  * collectpads has received an event originating from one of the collected
434  * pads.  If the event being processed is a serialized one, this callback is
435  * called with @pads STREAM_LOCK held, otherwise not.  As this lock should be
436  * held when calling a number of CollectPads functions, it should be acquired
437  * if so (unusually) needed.
438  *
439  * MT safe.
440  */
441 void
442 gst_collect_pads_set_event_function (GstCollectPads * pads,
443     GstCollectPadsEventFunction func, gpointer user_data)
444 {
445   g_return_if_fail (pads != NULL);
446   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
447
448   GST_OBJECT_LOCK (pads);
449   pads->priv->event_func = func;
450   pads->priv->event_user_data = user_data;
451   GST_OBJECT_UNLOCK (pads);
452 }
453
454 /**
455  * gst_collect_pads_set_query_function:
456  * @pads: the collectpads to use
457  * @func: the function to set
458  * @user_data: user data passed to the function
459  *
460  * Set the query callback function and user data that will be called after
461  * collectpads has received a query originating from one of the collected
462  * pads.  If the query being processed is a serialized one, this callback is
463  * called with @pads STREAM_LOCK held, otherwise not.  As this lock should be
464  * held when calling a number of CollectPads functions, it should be acquired
465  * if so (unusually) needed.
466  *
467  * MT safe.
468  */
469 void
470 gst_collect_pads_set_query_function (GstCollectPads * pads,
471     GstCollectPadsQueryFunction func, gpointer user_data)
472 {
473   g_return_if_fail (pads != NULL);
474   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
475
476   GST_OBJECT_LOCK (pads);
477   pads->priv->query_func = func;
478   pads->priv->query_user_data = user_data;
479   GST_OBJECT_UNLOCK (pads);
480 }
481
482 /**
483 * gst_collect_pads_clip_running_time:
484 * @pads: the collectpads to use
485 * @cdata: collect data of corresponding pad
486 * @buf: buffer being clipped
487 * @outbuf: (allow-none): output buffer with running time, or NULL if clipped
488 * @user_data: user data (unused)
489 *
490 * Convenience clipping function that converts incoming buffer's timestamp
491 * to running time, or clips the buffer if outside configured segment.
492 */
493 GstFlowReturn
494 gst_collect_pads_clip_running_time (GstCollectPads * pads,
495     GstCollectData * cdata, GstBuffer * buf, GstBuffer ** outbuf,
496     gpointer user_data)
497 {
498   GstClockTime time;
499
500   *outbuf = buf;
501   time = GST_BUFFER_PTS (buf);
502
503   /* invalid left alone and passed */
504   if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) {
505     time = gst_segment_to_running_time (&cdata->segment, GST_FORMAT_TIME, time);
506     if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) {
507       GST_DEBUG_OBJECT (cdata->pad, "clipping buffer on pad outside segment");
508       gst_buffer_unref (buf);
509       *outbuf = NULL;
510     } else {
511       GST_LOG_OBJECT (cdata->pad, "buffer ts %" GST_TIME_FORMAT " -> %"
512           GST_TIME_FORMAT " running time",
513           GST_TIME_ARGS (GST_BUFFER_PTS (buf)), GST_TIME_ARGS (time));
514       *outbuf = gst_buffer_make_writable (buf);
515       GST_BUFFER_PTS (*outbuf) = time;
516       GST_BUFFER_DTS (*outbuf) = gst_segment_to_running_time (&cdata->segment,
517           GST_FORMAT_TIME, GST_BUFFER_DTS (*outbuf));
518     }
519   }
520
521   return GST_FLOW_OK;
522 }
523
524  /**
525  * gst_collect_pads_set_clip_function:
526  * @pads: the collectpads to use
527  * @clipfunc: clip function to install
528  * @user_data: user data to pass to @clip_func
529  *
530  * Install a clipping function that is called right after a buffer is received
531  * on a pad managed by @pads. See #GstCollectPadsClipFunction for more info.
532  */
533 void
534 gst_collect_pads_set_clip_function (GstCollectPads * pads,
535     GstCollectPadsClipFunction clipfunc, gpointer user_data)
536 {
537   g_return_if_fail (pads != NULL);
538   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
539
540   pads->priv->clip_func = clipfunc;
541   pads->priv->clip_user_data = user_data;
542 }
543
544 /**
545  * gst_collect_pads_add_pad:
546  * @pads: the collectpads to use
547  * @pad: (transfer none): the pad to add
548  * @size: the size of the returned #GstCollectData structure
549  * @destroy_notify: function to be called before the returned #GstCollectData
550  * structure is freed
551  * @lock: whether to lock this pad in usual waiting state
552  *
553  * Add a pad to the collection of collect pads. The pad has to be
554  * a sinkpad. The refcount of the pad is incremented. Use
555  * gst_collect_pads_remove_pad() to remove the pad from the collection
556  * again.
557  *
558  * You specify a size for the returned #GstCollectData structure
559  * so that you can use it to store additional information.
560  *
561  * You can also specify a #GstCollectDataDestroyNotify that will be called
562  * just before the #GstCollectData structure is freed. It is passed the
563  * pointer to the structure and should free any custom memory and resources
564  * allocated for it.
565  *
566  * Keeping a pad locked in waiting state is only relevant when using
567  * the default collection algorithm (providing the oldest buffer).
568  * It ensures a buffer must be available on this pad for a collection
569  * to take place.  This is of typical use to a muxer element where
570  * non-subtitle streams should always be in waiting state,
571  * e.g. to assure that caps information is available on all these streams
572  * when initial headers have to be written.
573  *
574  * The pad will be automatically activated in push mode when @pads is
575  * started.
576  *
577  * MT safe.
578  *
579  * Returns: a new #GstCollectData to identify the new pad. Or NULL
580  *   if wrong parameters are supplied.
581  */
582 GstCollectData *
583 gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size,
584     GstCollectDataDestroyNotify destroy_notify, gboolean lock)
585 {
586   GstCollectData *data;
587
588   g_return_val_if_fail (pads != NULL, NULL);
589   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
590   g_return_val_if_fail (pad != NULL, NULL);
591   g_return_val_if_fail (GST_PAD_IS_SINK (pad), NULL);
592   g_return_val_if_fail (size >= sizeof (GstCollectData), NULL);
593
594   GST_DEBUG_OBJECT (pads, "adding pad %s:%s", GST_DEBUG_PAD_NAME (pad));
595
596   data = g_malloc0 (size);
597   data->priv = g_new0 (GstCollectDataPrivate, 1);
598   data->collect = pads;
599   data->pad = gst_object_ref (pad);
600   data->buffer = NULL;
601   data->pos = 0;
602   gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
603   data->state = GST_COLLECT_PADS_STATE_WAITING;
604   data->state |= lock ? GST_COLLECT_PADS_STATE_LOCKED : 0;
605   data->priv->refcount = 1;
606   data->priv->destroy_notify = destroy_notify;
607
608   GST_OBJECT_LOCK (pads);
609   GST_OBJECT_LOCK (pad);
610   gst_pad_set_element_private (pad, data);
611   GST_OBJECT_UNLOCK (pad);
612   pads->priv->pad_list = g_slist_append (pads->priv->pad_list, data);
613   gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_chain));
614   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_event));
615   gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_query));
616   /* backward compat, also add to data if stopped, so that the element already
617    * has this in the public data list before going PAUSED (typically)
618    * this can only be done when we are stopped because we don't take the
619    * STREAM_LOCK to protect the pads->data list. */
620   if (!pads->priv->started) {
621     pads->data = g_slist_append (pads->data, data);
622     ref_data (data);
623   }
624   /* activate the pad when needed */
625   if (pads->priv->started)
626     gst_pad_set_active (pad, TRUE);
627   pads->priv->pad_cookie++;
628   GST_OBJECT_UNLOCK (pads);
629
630   return data;
631 }
632
633 static gint
634 find_pad (GstCollectData * data, GstPad * pad)
635 {
636   if (data->pad == pad)
637     return 0;
638   return 1;
639 }
640
641 /**
642  * gst_collect_pads_remove_pad:
643  * @pads: the collectpads to use
644  * @pad: (transfer none): the pad to remove
645  *
646  * Remove a pad from the collection of collect pads. This function will also
647  * free the #GstCollectData and all the resources that were allocated with
648  * gst_collect_pads_add_pad().
649  *
650  * The pad will be deactivated automatically when @pads is stopped.
651  *
652  * MT safe.
653  *
654  * Returns: %TRUE if the pad could be removed.
655  */
656 gboolean
657 gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad)
658 {
659   GstCollectData *data;
660   GSList *list;
661
662   g_return_val_if_fail (pads != NULL, FALSE);
663   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
664   g_return_val_if_fail (pad != NULL, FALSE);
665   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
666
667   GST_DEBUG_OBJECT (pads, "removing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
668
669   GST_OBJECT_LOCK (pads);
670   list =
671       g_slist_find_custom (pads->priv->pad_list, pad, (GCompareFunc) find_pad);
672   if (!list)
673     goto unknown_pad;
674
675   data = (GstCollectData *) list->data;
676
677   GST_DEBUG_OBJECT (pads, "found pad %s:%s at %p", GST_DEBUG_PAD_NAME (pad),
678       data);
679
680   /* clear the stuff we configured */
681   gst_pad_set_chain_function (pad, NULL);
682   gst_pad_set_event_function (pad, NULL);
683   GST_OBJECT_LOCK (pad);
684   gst_pad_set_element_private (pad, NULL);
685   GST_OBJECT_UNLOCK (pad);
686
687   /* backward compat, also remove from data if stopped, note that this function
688    * can only be called when we are stopped because we don't take the
689    * STREAM_LOCK to protect the pads->data list. */
690   if (!pads->priv->started) {
691     GSList *dlist;
692
693     dlist = g_slist_find_custom (pads->data, pad, (GCompareFunc) find_pad);
694     if (dlist) {
695       GstCollectData *pdata = dlist->data;
696
697       pads->data = g_slist_delete_link (pads->data, dlist);
698       unref_data (pdata);
699     }
700   }
701   /* remove from the pad list */
702   pads->priv->pad_list = g_slist_delete_link (pads->priv->pad_list, list);
703   pads->priv->pad_cookie++;
704
705   /* signal waiters because something changed */
706   GST_COLLECT_PADS_EVT_BROADCAST (pads);
707
708   /* deactivate the pad when needed */
709   if (!pads->priv->started)
710     gst_pad_set_active (pad, FALSE);
711
712   /* clean and free the collect data */
713   unref_data (data);
714
715   GST_OBJECT_UNLOCK (pads);
716
717   return TRUE;
718
719 unknown_pad:
720   {
721     GST_WARNING_OBJECT (pads, "cannot remove unknown pad %s:%s",
722         GST_DEBUG_PAD_NAME (pad));
723     GST_OBJECT_UNLOCK (pads);
724     return FALSE;
725   }
726 }
727
728 /*
729  * Must be called with STREAM_LOCK.
730  */
731 static void
732 gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads,
733     gboolean flushing)
734 {
735   GSList *walk = NULL;
736
737   /* Update the pads flushing flag */
738   for (walk = pads->data; walk; walk = g_slist_next (walk)) {
739     GstCollectData *cdata = walk->data;
740
741     if (GST_IS_PAD (cdata->pad)) {
742       GST_OBJECT_LOCK (cdata->pad);
743       if (flushing)
744         GST_PAD_SET_FLUSHING (cdata->pad);
745       else
746         GST_PAD_UNSET_FLUSHING (cdata->pad);
747       if (flushing)
748         GST_COLLECT_PADS_STATE_SET (cdata, GST_COLLECT_PADS_STATE_FLUSHING);
749       else
750         GST_COLLECT_PADS_STATE_UNSET (cdata, GST_COLLECT_PADS_STATE_FLUSHING);
751       gst_collect_pads_clear (pads, cdata);
752       GST_OBJECT_UNLOCK (cdata->pad);
753     }
754   }
755
756   /* inform _chain of changes */
757   GST_COLLECT_PADS_EVT_BROADCAST (pads);
758 }
759
760 /**
761  * gst_collect_pads_set_flushing:
762  * @pads: the collectpads to use
763  * @flushing: desired state of the pads
764  *
765  * Change the flushing state of all the pads in the collection. No pad
766  * is able to accept anymore data when @flushing is %TRUE. Calling this
767  * function with @flushing %FALSE makes @pads accept data again.
768  * Caller must ensure that downstream streaming (thread) is not blocked,
769  * e.g. by sending a FLUSH_START downstream.
770  *
771  * MT safe.
772  */
773 void
774 gst_collect_pads_set_flushing (GstCollectPads * pads, gboolean flushing)
775 {
776   g_return_if_fail (pads != NULL);
777   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
778
779   /* NOTE since this eventually calls _pop, some (STREAM_)LOCK is needed here */
780   GST_COLLECT_PADS_STREAM_LOCK (pads);
781   gst_collect_pads_set_flushing_unlocked (pads, flushing);
782   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
783 }
784
785 /**
786  * gst_collect_pads_start:
787  * @pads: the collectpads to use
788  *
789  * Starts the processing of data in the collect_pads.
790  *
791  * MT safe.
792  */
793 void
794 gst_collect_pads_start (GstCollectPads * pads)
795 {
796   GSList *collected;
797
798   g_return_if_fail (pads != NULL);
799   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
800
801   GST_DEBUG_OBJECT (pads, "starting collect pads");
802
803   /* make sure stop and collect cannot be called anymore */
804   GST_COLLECT_PADS_STREAM_LOCK (pads);
805
806   /* make pads streamable */
807   GST_OBJECT_LOCK (pads);
808
809   /* loop over the master pad list and reset the segment */
810   collected = pads->priv->pad_list;
811   for (; collected; collected = g_slist_next (collected)) {
812     GstCollectData *data;
813
814     data = collected->data;
815     gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
816   }
817
818   gst_collect_pads_set_flushing_unlocked (pads, FALSE);
819
820   /* Start collect pads */
821   pads->priv->started = TRUE;
822   GST_OBJECT_UNLOCK (pads);
823   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
824 }
825
826 /**
827  * gst_collect_pads_stop:
828  * @pads: the collectpads to use
829  *
830  * Stops the processing of data in the collect_pads. this function
831  * will also unblock any blocking operations.
832  *
833  * MT safe.
834  */
835 void
836 gst_collect_pads_stop (GstCollectPads * pads)
837 {
838   GSList *collected;
839
840   g_return_if_fail (pads != NULL);
841   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
842
843   GST_DEBUG_OBJECT (pads, "stopping collect pads");
844
845   /* make sure collect and start cannot be called anymore */
846   GST_COLLECT_PADS_STREAM_LOCK (pads);
847
848   /* make pads not accept data anymore */
849   GST_OBJECT_LOCK (pads);
850   gst_collect_pads_set_flushing_unlocked (pads, TRUE);
851
852   /* Stop collect pads */
853   pads->priv->started = FALSE;
854   pads->priv->eospads = 0;
855   pads->priv->queuedpads = 0;
856
857   /* loop over the master pad list and flush buffers */
858   collected = pads->priv->pad_list;
859   for (; collected; collected = g_slist_next (collected)) {
860     GstCollectData *data;
861     GstBuffer **buffer_p;
862
863     data = collected->data;
864     if (data->buffer) {
865       buffer_p = &data->buffer;
866       gst_buffer_replace (buffer_p, NULL);
867       data->pos = 0;
868     }
869     GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_EOS);
870   }
871
872   if (pads->priv->earliest_data)
873     unref_data (pads->priv->earliest_data);
874   pads->priv->earliest_data = NULL;
875   pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
876
877   GST_OBJECT_UNLOCK (pads);
878   /* Wake them up so they can end the chain functions. */
879   GST_COLLECT_PADS_EVT_BROADCAST (pads);
880
881   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
882 }
883
884 /**
885  * gst_collect_pads_peek:
886  * @pads: the collectpads to peek
887  * @data: the data to use
888  *
889  * Peek at the buffer currently queued in @data. This function
890  * should be called with the @pads STREAM_LOCK held, such as in the callback
891  * handler.
892  *
893  * MT safe.
894  *
895  * Returns: The buffer in @data or NULL if no buffer is queued.
896  *  should unref the buffer after usage.
897  */
898 GstBuffer *
899 gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data)
900 {
901   GstBuffer *result;
902
903   g_return_val_if_fail (pads != NULL, NULL);
904   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
905   g_return_val_if_fail (data != NULL, NULL);
906
907   if ((result = data->buffer))
908     gst_buffer_ref (result);
909
910   GST_DEBUG_OBJECT (pads, "Peeking at pad %s:%s: buffer=%p",
911       GST_DEBUG_PAD_NAME (data->pad), result);
912
913   return result;
914 }
915
916 /**
917  * gst_collect_pads_pop:
918  * @pads: the collectpads to pop
919  * @data: the data to use
920  *
921  * Pop the buffer currently queued in @data. This function
922  * should be called with the @pads STREAM_LOCK held, such as in the callback
923  * handler.
924  *
925  * MT safe.
926  *
927  * Returns: (transfer full): The buffer in @data or NULL if no buffer was
928  *   queued. You should unref the buffer after usage.
929  */
930 GstBuffer *
931 gst_collect_pads_pop (GstCollectPads * pads, GstCollectData * data)
932 {
933   GstBuffer *result;
934
935   g_return_val_if_fail (pads != NULL, NULL);
936   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
937   g_return_val_if_fail (data != NULL, NULL);
938
939   if ((result = data->buffer)) {
940     data->buffer = NULL;
941     data->pos = 0;
942     /* one less pad with queued data now */
943     if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING))
944       pads->priv->queuedpads--;
945   }
946
947   GST_COLLECT_PADS_EVT_BROADCAST (pads);
948
949   GST_DEBUG_OBJECT (pads, "Pop buffer on pad %s:%s: buffer=%p",
950       GST_DEBUG_PAD_NAME (data->pad), result);
951
952   return result;
953 }
954
955 /* pop and unref the currently queued buffer, should be called with STREAM_LOCK
956  * held */
957 static void
958 gst_collect_pads_clear (GstCollectPads * pads, GstCollectData * data)
959 {
960   GstBuffer *buf;
961
962   if ((buf = gst_collect_pads_pop (pads, data)))
963     gst_buffer_unref (buf);
964 }
965
966 /**
967  * gst_collect_pads_available:
968  * @pads: the collectpads to query
969  *
970  * Query how much bytes can be read from each queued buffer. This means
971  * that the result of this call is the maximum number of bytes that can
972  * be read from each of the pads.
973  *
974  * This function should be called with @pads STREAM_LOCK held, such as
975  * in the callback.
976  *
977  * MT safe.
978  *
979  * Returns: The maximum number of bytes queued on all pads. This function
980  * returns 0 if a pad has no queued buffer.
981  */
982 /* we might pre-calculate this in some struct field,
983  * but would then have to maintain this in _chain and particularly _pop, etc,
984  * even if element is never interested in this information */
985 guint
986 gst_collect_pads_available (GstCollectPads * pads)
987 {
988   GSList *collected;
989   guint result = G_MAXUINT;
990
991   g_return_val_if_fail (pads != NULL, 0);
992   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
993
994   collected = pads->data;
995   for (; collected; collected = g_slist_next (collected)) {
996     GstCollectData *pdata;
997     GstBuffer *buffer;
998     gint size;
999
1000     pdata = (GstCollectData *) collected->data;
1001
1002     /* ignore pad with EOS */
1003     if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (pdata,
1004                 GST_COLLECT_PADS_STATE_EOS))) {
1005       GST_DEBUG_OBJECT (pads, "pad %p is EOS", pdata);
1006       continue;
1007     }
1008
1009     /* an empty buffer without EOS is weird when we get here.. */
1010     if (G_UNLIKELY ((buffer = pdata->buffer) == NULL)) {
1011       GST_WARNING_OBJECT (pads, "pad %p has no buffer", pdata);
1012       goto not_filled;
1013     }
1014
1015     /* this is the size left of the buffer */
1016     size = gst_buffer_get_size (buffer) - pdata->pos;
1017     GST_DEBUG_OBJECT (pads, "pad %p has %d bytes left", pdata, size);
1018
1019     /* need to return the min of all available data */
1020     if (size < result)
1021       result = size;
1022   }
1023   /* nothing changed, all must be EOS then, return 0 */
1024   if (G_UNLIKELY (result == G_MAXUINT))
1025     result = 0;
1026
1027   return result;
1028
1029 not_filled:
1030   {
1031     return 0;
1032   }
1033 }
1034
1035 /**
1036  * gst_collect_pads_flush:
1037  * @pads: the collectpads to query
1038  * @data: the data to use
1039  * @size: the number of bytes to flush
1040  *
1041  * Flush @size bytes from the pad @data.
1042  *
1043  * This function should be called with @pads STREAM_LOCK held, such as
1044  * in the callback.
1045  *
1046  * MT safe.
1047  *
1048  * Returns: The number of bytes flushed This can be less than @size and
1049  * is 0 if the pad was end-of-stream.
1050  */
1051 guint
1052 gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data,
1053     guint size)
1054 {
1055   guint flushsize;
1056   gsize bsize;
1057   GstBuffer *buffer;
1058
1059   g_return_val_if_fail (pads != NULL, 0);
1060   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
1061   g_return_val_if_fail (data != NULL, 0);
1062
1063   /* no buffer, must be EOS */
1064   if ((buffer = data->buffer) == NULL)
1065     return 0;
1066
1067   bsize = gst_buffer_get_size (buffer);
1068
1069   /* this is what we can flush at max */
1070   flushsize = MIN (size, bsize - data->pos);
1071
1072   data->pos += size;
1073
1074   if (data->pos >= bsize)
1075     /* _clear will also reset data->pos to 0 */
1076     gst_collect_pads_clear (pads, data);
1077
1078   return flushsize;
1079 }
1080
1081 /**
1082  * gst_collect_pads_read_buffer:
1083  * @pads: the collectpads to query
1084  * @data: the data to use
1085  * @size: the number of bytes to read
1086  *
1087  * Get a subbuffer of @size bytes from the given pad @data.
1088  *
1089  * This function should be called with @pads STREAM_LOCK held, such as in the
1090  * callback.
1091  *
1092  * MT safe.
1093  *
1094  * Returns: (transfer full): A sub buffer. The size of the buffer can be less that requested.
1095  * A return of NULL signals that the pad is end-of-stream.
1096  * Unref the buffer after use.
1097  */
1098 GstBuffer *
1099 gst_collect_pads_read_buffer (GstCollectPads * pads, GstCollectData * data,
1100     guint size)
1101 {
1102   guint readsize;
1103   GstBuffer *buffer;
1104
1105   g_return_val_if_fail (pads != NULL, NULL);
1106   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
1107   g_return_val_if_fail (data != NULL, NULL);
1108
1109   /* no buffer, must be EOS */
1110   if ((buffer = data->buffer) == NULL)
1111     return NULL;
1112
1113   readsize = MIN (size, gst_buffer_get_size (buffer) - data->pos);
1114
1115   return gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, data->pos,
1116       readsize);
1117 }
1118
1119 /**
1120  * gst_collect_pads_take_buffer:
1121  * @pads: the collectpads to query
1122  * @data: the data to use
1123  * @size: the number of bytes to read
1124  *
1125  * Get a subbuffer of @size bytes from the given pad @data. Flushes the amount
1126  * of read bytes.
1127  *
1128  * This function should be called with @pads STREAM_LOCK held, such as in the
1129  * callback.
1130  *
1131  * MT safe.
1132  *
1133  * Returns: A sub buffer. The size of the buffer can be less that requested.
1134  * A return of NULL signals that the pad is end-of-stream.
1135  * Unref the buffer after use.
1136  */
1137 GstBuffer *
1138 gst_collect_pads_take_buffer (GstCollectPads * pads, GstCollectData * data,
1139     guint size)
1140 {
1141   GstBuffer *buffer = gst_collect_pads_read_buffer (pads, data, size);
1142
1143   if (buffer) {
1144     gst_collect_pads_flush (pads, data, gst_buffer_get_size (buffer));
1145   }
1146   return buffer;
1147 }
1148
1149 /**
1150  * gst_collect_pads_set_waiting:
1151  * @pads: the collectpads
1152  * @data: the data to use
1153  * @waiting: boolean indicating whether this pad should operate
1154  *           in waiting or non-waiting mode
1155  *
1156  * Sets a pad to waiting or non-waiting mode, if at least this pad
1157  * has not been created with locked waiting state,
1158  * in which case nothing happens.
1159  *
1160  * This function should be called with @pads STREAM_LOCK held, such as
1161  * in the callback.
1162  *
1163  * MT safe.
1164  */
1165 void
1166 gst_collect_pads_set_waiting (GstCollectPads * pads, GstCollectData * data,
1167     gboolean waiting)
1168 {
1169   g_return_if_fail (pads != NULL);
1170   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
1171   g_return_if_fail (data != NULL);
1172
1173   GST_DEBUG_OBJECT (pads, "Setting pad %s to waiting %d, locked %d",
1174       GST_PAD_NAME (data->pad), waiting,
1175       GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED));
1176
1177   /* Do something only on a change and if not locked */
1178   if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED) &&
1179       (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING) !=
1180           ! !waiting)) {
1181     /* Set waiting state for this pad */
1182     if (waiting)
1183       GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_WAITING);
1184     else
1185       GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_WAITING);
1186     /* Update number of queued pads if needed */
1187     if (!data->buffer &&
1188         !GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_EOS)) {
1189       if (waiting)
1190         pads->priv->queuedpads--;
1191       else
1192         pads->priv->queuedpads++;
1193     }
1194
1195     /* signal waiters because something changed */
1196     GST_COLLECT_PADS_EVT_BROADCAST (pads);
1197   }
1198 }
1199
1200 /* see if pads were added or removed and update our stats. Any pad
1201  * added after releasing the LOCK will get collected in the next
1202  * round.
1203  *
1204  * We can do a quick check by checking the cookies, that get changed
1205  * whenever the pad list is updated.
1206  *
1207  * Must be called with STREAM_LOCK.
1208  */
1209 static void
1210 gst_collect_pads_check_pads (GstCollectPads * pads)
1211 {
1212   /* the master list and cookie are protected with LOCK */
1213   GST_OBJECT_LOCK (pads);
1214   if (G_UNLIKELY (pads->priv->pad_cookie != pads->priv->cookie)) {
1215     GSList *collected;
1216
1217     /* clear list and stats */
1218     g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
1219     g_slist_free (pads->data);
1220     pads->data = NULL;
1221     pads->priv->numpads = 0;
1222     pads->priv->queuedpads = 0;
1223     pads->priv->eospads = 0;
1224     if (pads->priv->earliest_data)
1225       unref_data (pads->priv->earliest_data);
1226     pads->priv->earliest_data = NULL;
1227     pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
1228
1229     /* loop over the master pad list */
1230     collected = pads->priv->pad_list;
1231     for (; collected; collected = g_slist_next (collected)) {
1232       GstCollectData *data;
1233
1234       /* update the stats */
1235       pads->priv->numpads++;
1236       data = collected->data;
1237       if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_EOS))
1238         pads->priv->eospads++;
1239       else if (data->buffer || !GST_COLLECT_PADS_STATE_IS_SET (data,
1240               GST_COLLECT_PADS_STATE_WAITING))
1241         pads->priv->queuedpads++;
1242
1243       /* add to the list of pads to collect */
1244       ref_data (data);
1245       /* preserve order of adding/requesting pads */
1246       pads->data = g_slist_append (pads->data, data);
1247     }
1248     /* and update the cookie */
1249     pads->priv->cookie = pads->priv->pad_cookie;
1250   }
1251   GST_OBJECT_UNLOCK (pads);
1252 }
1253
1254 /* checks if all the pads are collected and call the collectfunction
1255  *
1256  * Should be called with STREAM_LOCK.
1257  *
1258  * Returns: The #GstFlowReturn of collection.
1259  */
1260 static GstFlowReturn
1261 gst_collect_pads_check_collected (GstCollectPads * pads)
1262 {
1263   GstFlowReturn flow_ret = GST_FLOW_OK;
1264   GstCollectPadsFunction func;
1265   gpointer user_data;
1266
1267   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
1268
1269   GST_OBJECT_LOCK (pads);
1270   func = pads->priv->func;
1271   user_data = pads->priv->user_data;
1272   GST_OBJECT_UNLOCK (pads);
1273
1274   g_return_val_if_fail (pads->priv->func != NULL, GST_FLOW_NOT_SUPPORTED);
1275
1276   /* check for new pads, update stats etc.. */
1277   gst_collect_pads_check_pads (pads);
1278
1279   if (G_UNLIKELY (pads->priv->eospads == pads->priv->numpads)) {
1280     /* If all our pads are EOS just collect once to let the element
1281      * do its final EOS handling. */
1282     GST_DEBUG_OBJECT (pads, "All active pads (%d) are EOS, calling %s",
1283         pads->priv->numpads, GST_DEBUG_FUNCPTR_NAME (func));
1284
1285     flow_ret = func (pads, user_data);
1286   } else {
1287     gboolean collected = FALSE;
1288
1289     /* We call the collected function as long as our condition matches. */
1290     while (((pads->priv->queuedpads + pads->priv->eospads) >=
1291             pads->priv->numpads)) {
1292       GST_DEBUG_OBJECT (pads,
1293           "All active pads (%d + %d >= %d) have data, " "calling %s",
1294           pads->priv->queuedpads, pads->priv->eospads, pads->priv->numpads,
1295           GST_DEBUG_FUNCPTR_NAME (func));
1296
1297       flow_ret = func (pads, user_data);
1298       collected = TRUE;
1299
1300       /* break on error */
1301       if (flow_ret != GST_FLOW_OK)
1302         break;
1303       /* Don't keep looping after telling the element EOS or flushing */
1304       if (pads->priv->queuedpads == 0)
1305         break;
1306     }
1307     if (!collected)
1308       GST_DEBUG_OBJECT (pads, "Not all active pads (%d) have data, continuing",
1309           pads->priv->numpads);
1310   }
1311   return flow_ret;
1312 }
1313
1314
1315 /* General overview:
1316  * - only pad with a buffer can determine earliest_data (and earliest_time)
1317  * - only segment info determines (non-)waiting state
1318  * - ? perhaps use _stream_time for comparison
1319  *   (which muxers might have use as well ?)
1320  */
1321
1322 /*
1323  * Function to recalculate the waiting state of all pads.
1324  *
1325  * Must be called with STREAM_LOCK.
1326  *
1327  * Returns TRUE if a pad was set to waiting
1328  * (from non-waiting state).
1329  */
1330 static gboolean
1331 gst_collect_pads_recalculate_waiting (GstCollectPads * pads)
1332 {
1333   GSList *collected;
1334   gboolean result = FALSE;
1335
1336   /* If earliest time is not known, there is nothing to do. */
1337   if (pads->priv->earliest_data == NULL)
1338     return FALSE;
1339
1340   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
1341     GstCollectData *data = (GstCollectData *) collected->data;
1342     int cmp_res;
1343     GstClockTime comp_time;
1344
1345     /* check if pad has a segment */
1346     if (data->segment.format == GST_FORMAT_UNDEFINED) {
1347       GST_WARNING_OBJECT (pads,
1348           "GstCollectPads has no time segment, assuming 0 based.");
1349       gst_segment_init (&data->segment, GST_FORMAT_TIME);
1350       GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
1351     }
1352
1353     /* check segment format */
1354     if (data->segment.format != GST_FORMAT_TIME) {
1355       GST_ERROR_OBJECT (pads, "GstCollectPads can handle only time segments.");
1356       continue;
1357     }
1358
1359     /* check if the waiting state should be changed */
1360     comp_time = data->segment.position;
1361     cmp_res = pads->priv->compare_func (pads, data, comp_time,
1362         pads->priv->earliest_data, pads->priv->earliest_time,
1363         pads->priv->compare_user_data);
1364     if (cmp_res > 0)
1365       /* stop waiting */
1366       gst_collect_pads_set_waiting (pads, data, FALSE);
1367     else {
1368       if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING)) {
1369         /* start waiting */
1370         gst_collect_pads_set_waiting (pads, data, TRUE);
1371         result = TRUE;
1372       }
1373     }
1374   }
1375
1376   return result;
1377 }
1378
1379 /**
1380  * gst_collect_pads_find_best_pad:
1381  * @pads: the collectpads to use
1382  * @data: returns the collectdata for earliest data
1383  * @time: returns the earliest available buffertime
1384  *
1385  * Find the oldest/best pad, i.e. pad holding the oldest buffer and
1386  * and return the corresponding #GstCollectData and buffertime.
1387  *
1388  * This function should be called with STREAM_LOCK held,
1389  * such as in the callback.
1390  */
1391 static void
1392 gst_collect_pads_find_best_pad (GstCollectPads * pads,
1393     GstCollectData ** data, GstClockTime * time)
1394 {
1395   GSList *collected;
1396   GstCollectData *best = NULL;
1397   GstClockTime best_time = GST_CLOCK_TIME_NONE;
1398
1399   g_return_if_fail (data != NULL);
1400   g_return_if_fail (time != NULL);
1401
1402   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
1403     GstBuffer *buffer;
1404     GstCollectData *data = (GstCollectData *) collected->data;
1405     GstClockTime timestamp;
1406
1407     buffer = gst_collect_pads_peek (pads, data);
1408     /* if we have a buffer check if it is better then the current best one */
1409     if (buffer != NULL) {
1410       timestamp = GST_BUFFER_DTS (buffer);
1411       if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
1412         timestamp = GST_BUFFER_PTS (buffer);
1413       }
1414       gst_buffer_unref (buffer);
1415       if (best == NULL || pads->priv->compare_func (pads, data, timestamp,
1416               best, best_time, pads->priv->compare_user_data) < 0) {
1417         best = data;
1418         best_time = timestamp;
1419       }
1420     }
1421   }
1422
1423   /* set earliest time */
1424   *data = best;
1425   *time = best_time;
1426
1427   GST_DEBUG_OBJECT (pads, "best pad %s, best time %" GST_TIME_FORMAT,
1428       best ? GST_PAD_NAME (((GstCollectData *) best)->pad) : "(nil)",
1429       GST_TIME_ARGS (best_time));
1430 }
1431
1432 /*
1433  * Function to recalculate earliest_data and earliest_timestamp. This also calls
1434  * gst_collect_pads_recalculate_waiting
1435  *
1436  * Must be called with STREAM_LOCK.
1437  */
1438 static gboolean
1439 gst_collect_pads_recalculate_full (GstCollectPads * pads)
1440 {
1441   if (pads->priv->earliest_data)
1442     unref_data (pads->priv->earliest_data);
1443   gst_collect_pads_find_best_pad (pads, &pads->priv->earliest_data,
1444       &pads->priv->earliest_time);
1445   if (pads->priv->earliest_data)
1446     ref_data (pads->priv->earliest_data);
1447   return gst_collect_pads_recalculate_waiting (pads);
1448 }
1449
1450 /*
1451  * Default collect callback triggered when #GstCollectPads gathered all data.
1452  *
1453  * Called with STREAM_LOCK.
1454  */
1455 static GstFlowReturn
1456 gst_collect_pads_default_collected (GstCollectPads * pads, gpointer user_data)
1457 {
1458   GstCollectData *best = NULL;
1459   GstBuffer *buffer;
1460   GstFlowReturn ret = GST_FLOW_OK;
1461   GstCollectPadsBufferFunction func;
1462   gpointer buffer_user_data;
1463
1464   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
1465
1466   GST_OBJECT_LOCK (pads);
1467   func = pads->priv->buffer_func;
1468   buffer_user_data = pads->priv->buffer_user_data;
1469   GST_OBJECT_UNLOCK (pads);
1470
1471   g_return_val_if_fail (func != NULL, GST_FLOW_NOT_SUPPORTED);
1472
1473   /* Find the oldest pad at all cost */
1474   if (gst_collect_pads_recalculate_full (pads)) {
1475     /* waiting was switched on,
1476      * so give another thread a chance to deliver a possibly
1477      * older buffer; don't charge on yet with the current oldest */
1478     ret = GST_FLOW_OK;
1479     goto done;
1480   }
1481
1482   best = pads->priv->earliest_data;
1483
1484   /* No data collected means EOS. */
1485   if (G_UNLIKELY (best == NULL)) {
1486     ret = func (pads, best, NULL, buffer_user_data);
1487     if (ret == GST_FLOW_OK)
1488       ret = GST_FLOW_EOS;
1489     goto done;
1490   }
1491
1492   /* make sure that the pad we take a buffer from is waiting;
1493    * otherwise popping a buffer will seem not to have happened
1494    * and collectpads can get into a busy loop */
1495   gst_collect_pads_set_waiting (pads, best, TRUE);
1496
1497   /* Send buffer */
1498   buffer = gst_collect_pads_pop (pads, best);
1499   ret = func (pads, best, buffer, buffer_user_data);
1500
1501   /* maybe non-waiting was forced to waiting above due to
1502    * newsegment events coming too sparsely,
1503    * so re-check to restore state to avoid hanging/waiting */
1504   gst_collect_pads_recalculate_full (pads);
1505
1506 done:
1507   return ret;
1508 }
1509
1510 /*
1511  * Default timestamp compare function.
1512  */
1513 static gint
1514 gst_collect_pads_default_compare_func (GstCollectPads * pads,
1515     GstCollectData * data1, GstClockTime timestamp1,
1516     GstCollectData * data2, GstClockTime timestamp2, gpointer user_data)
1517 {
1518
1519   GST_LOG_OBJECT (pads, "comparing %" GST_TIME_FORMAT
1520       " and %" GST_TIME_FORMAT, GST_TIME_ARGS (timestamp1),
1521       GST_TIME_ARGS (timestamp2));
1522   /* non-valid timestamps go first as they are probably headers or so */
1523   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (timestamp1)))
1524     return GST_CLOCK_TIME_IS_VALID (timestamp2) ? -1 : 0;
1525
1526   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (timestamp2)))
1527     return 1;
1528
1529   /* compare timestamp */
1530   if (timestamp1 < timestamp2)
1531     return -1;
1532
1533   if (timestamp1 > timestamp2)
1534     return 1;
1535
1536   return 0;
1537 }
1538
1539 /* called with STREAM_LOCK */
1540 static void
1541 gst_collect_pads_handle_position_update (GstCollectPads * pads,
1542     GstCollectData * data, GstClockTime new_pos)
1543 {
1544   gint cmp_res;
1545
1546   /* If oldest time is not known, or current pad got newsegment;
1547    * recalculate the state */
1548   if (!pads->priv->earliest_data || pads->priv->earliest_data == data) {
1549     gst_collect_pads_recalculate_full (pads);
1550     goto exit;
1551   }
1552
1553   /* Check if the waiting state of the pad should change. */
1554   cmp_res =
1555       pads->priv->compare_func (pads, data, new_pos,
1556       pads->priv->earliest_data, pads->priv->earliest_time,
1557       pads->priv->compare_user_data);
1558
1559   if (cmp_res > 0)
1560     /* Stop waiting */
1561     gst_collect_pads_set_waiting (pads, data, FALSE);
1562
1563 exit:
1564   return;
1565
1566 }
1567
1568 static GstClockTime
1569 gst_collect_pads_clip_time (GstCollectPads * pads, GstCollectData * data,
1570     GstClockTime time)
1571 {
1572   GstClockTime otime = time;
1573   GstBuffer *in, *out = NULL;
1574
1575   if (pads->priv->clip_func) {
1576     in = gst_buffer_new ();
1577     GST_BUFFER_PTS (in) = time;
1578     GST_BUFFER_DTS (in) = time;
1579     pads->priv->clip_func (pads, data, in, &out, pads->priv->clip_user_data);
1580     if (out) {
1581       otime = GST_BUFFER_PTS (out);
1582       gst_buffer_unref (out);
1583     } else {
1584       /* FIXME should distinguish between ahead or after segment,
1585        * let's assume after segment and use some large time ... */
1586       otime = G_MAXINT64 / 2;
1587     }
1588   }
1589
1590   return otime;
1591 }
1592
1593 /**
1594  * gst_collect_pads_event_default:
1595  * @pads: the collectpads to use
1596  * @data: collect data of corresponding pad
1597  * @event: event being processed
1598  * @discard: process but do not send event downstream
1599  *
1600  * Default GstCollectPads event handling that elements should always
1601  * chain up to to ensure proper operation.  Element might however indicate
1602  * event should not be forwarded downstream.
1603  */
1604 gboolean
1605 gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data,
1606     GstEvent * event, gboolean discard)
1607 {
1608   gboolean res = TRUE;
1609   GstCollectPadsBufferFunction buffer_func;
1610   GstObject *parent;
1611   GstPad *pad;
1612
1613   GST_OBJECT_LOCK (pads);
1614   buffer_func = pads->priv->buffer_func;
1615   GST_OBJECT_UNLOCK (pads);
1616
1617   pad = data->pad;
1618   parent = GST_OBJECT_PARENT (pad);
1619
1620   switch (GST_EVENT_TYPE (event)) {
1621     case GST_EVENT_FLUSH_START:
1622     {
1623       /* forward event to unblock check_collected */
1624       GST_DEBUG_OBJECT (pad, "forwarding flush start");
1625       res = gst_pad_event_default (pad, parent, event);
1626       event = NULL;
1627
1628       /* now unblock the chain function.
1629        * no cond per pad, so they all unblock,
1630        * non-flushing block again */
1631       GST_COLLECT_PADS_STREAM_LOCK (pads);
1632       GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_FLUSHING);
1633       gst_collect_pads_clear (pads, data);
1634
1635       /* cater for possible default muxing functionality */
1636       if (buffer_func) {
1637         /* restore to initial state */
1638         gst_collect_pads_set_waiting (pads, data, TRUE);
1639         /* if the current pad is affected, reset state, recalculate later */
1640         if (pads->priv->earliest_data == data) {
1641           unref_data (data);
1642           pads->priv->earliest_data = NULL;
1643           pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
1644         }
1645       }
1646
1647       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1648
1649       goto eat;
1650     }
1651     case GST_EVENT_FLUSH_STOP:
1652     {
1653       /* flush the 1 buffer queue */
1654       GST_COLLECT_PADS_STREAM_LOCK (pads);
1655       GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_FLUSHING);
1656       gst_collect_pads_clear (pads, data);
1657       /* we need new segment info after the flush */
1658       gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
1659       GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
1660       /* if the pad was EOS, remove the EOS flag and
1661        * decrement the number of eospads */
1662       if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
1663                   GST_COLLECT_PADS_STATE_EOS))) {
1664         if (!GST_COLLECT_PADS_STATE_IS_SET (data,
1665                 GST_COLLECT_PADS_STATE_WAITING))
1666           pads->priv->queuedpads++;
1667         pads->priv->eospads--;
1668         GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_EOS);
1669       }
1670       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1671
1672       goto forward;
1673     }
1674     case GST_EVENT_EOS:
1675     {
1676       GST_COLLECT_PADS_STREAM_LOCK (pads);
1677       /* if the pad was not EOS, make it EOS and so we
1678        * have one more eospad */
1679       if (G_LIKELY (!GST_COLLECT_PADS_STATE_IS_SET (data,
1680                   GST_COLLECT_PADS_STATE_EOS))) {
1681         GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_EOS);
1682         if (!GST_COLLECT_PADS_STATE_IS_SET (data,
1683                 GST_COLLECT_PADS_STATE_WAITING))
1684           pads->priv->queuedpads--;
1685         pads->priv->eospads++;
1686       }
1687       /* check if we need collecting anything, we ignore the result. */
1688       gst_collect_pads_check_collected (pads);
1689       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1690
1691       goto eat;
1692     }
1693     case GST_EVENT_SEGMENT:
1694     {
1695       GstSegment seg;
1696
1697       GST_COLLECT_PADS_STREAM_LOCK (pads);
1698
1699       gst_event_copy_segment (event, &seg);
1700
1701       GST_DEBUG_OBJECT (data->pad, "got segment %" GST_SEGMENT_FORMAT, &seg);
1702
1703       /* default collection can not handle other segment formats than time */
1704       if (buffer_func && seg.format != GST_FORMAT_TIME) {
1705         GST_WARNING_OBJECT (pads, "GstCollectPads default collecting "
1706             "can only handle time segments. Non time segment ignored.");
1707         goto newsegment_done;
1708       }
1709
1710       /* need to update segment first */
1711       data->segment = seg;
1712       GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
1713
1714       /* now we can use for e.g. running time */
1715       seg.position = gst_collect_pads_clip_time (pads, data, seg.start);
1716       /* update again */
1717       data->segment = seg;
1718
1719       /* default muxing functionality */
1720       if (!buffer_func)
1721         goto newsegment_done;
1722
1723       gst_collect_pads_handle_position_update (pads, data, seg.position);
1724
1725     newsegment_done:
1726       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1727       /* we must not forward this event since multiple segments will be
1728        * accumulated and this is certainly not what we want. */
1729       goto eat;
1730     }
1731     case GST_EVENT_GAP:
1732     {
1733       GstClockTime start, duration;
1734
1735       GST_COLLECT_PADS_STREAM_LOCK (pads);
1736
1737       gst_event_parse_gap (event, &start, &duration);
1738       if (GST_CLOCK_TIME_IS_VALID (duration))
1739         start += duration;
1740       /* we do not expect another buffer until after gap,
1741        * so that is our position now */
1742       data->segment.position = gst_collect_pads_clip_time (pads, data, start);
1743
1744       gst_collect_pads_handle_position_update (pads, data,
1745           data->segment.position);
1746
1747       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1748       goto eat;
1749     }
1750     case GST_EVENT_STREAM_START:
1751       /* drop stream start events, element must create its own start event,
1752        * we can't just forward the first random stream start event we get */
1753       goto eat;
1754     case GST_EVENT_CAPS:
1755       goto eat;
1756     default:
1757       /* forward other events */
1758       goto forward;
1759   }
1760
1761 eat:
1762   if (event)
1763     gst_event_unref (event);
1764   return res;
1765
1766 forward:
1767   if (discard)
1768     goto eat;
1769   else
1770     return gst_pad_event_default (pad, parent, event);
1771 }
1772
1773 static gboolean
1774 gst_collect_pads_event_default_internal (GstCollectPads * pads,
1775     GstCollectData * data, GstEvent * event, gpointer user_data)
1776 {
1777   return gst_collect_pads_event_default (pads, data, event, FALSE);
1778 }
1779
1780 static gboolean
1781 gst_collect_pads_event (GstPad * pad, GstObject * parent, GstEvent * event)
1782 {
1783   gboolean res = FALSE, need_unlock = FALSE;
1784   GstCollectData *data;
1785   GstCollectPads *pads;
1786   GstCollectPadsEventFunction event_func;
1787   gpointer event_user_data;
1788
1789   /* some magic to get the managing collect_pads */
1790   GST_OBJECT_LOCK (pad);
1791   data = (GstCollectData *) gst_pad_get_element_private (pad);
1792   if (G_UNLIKELY (data == NULL))
1793     goto pad_removed;
1794   ref_data (data);
1795   GST_OBJECT_UNLOCK (pad);
1796
1797   res = FALSE;
1798
1799   pads = data->collect;
1800
1801   GST_DEBUG_OBJECT (data->pad, "Got %s event on sink pad",
1802       GST_EVENT_TYPE_NAME (event));
1803
1804   GST_OBJECT_LOCK (pads);
1805   event_func = pads->priv->event_func;
1806   event_user_data = pads->priv->event_user_data;
1807   GST_OBJECT_UNLOCK (pads);
1808
1809   if (GST_EVENT_IS_SERIALIZED (event)) {
1810     GST_COLLECT_PADS_STREAM_LOCK (pads);
1811     need_unlock = TRUE;
1812   }
1813
1814   if (G_LIKELY (event_func)) {
1815     res = event_func (pads, data, event, event_user_data);
1816   }
1817
1818   if (need_unlock)
1819     GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1820
1821   unref_data (data);
1822   return res;
1823
1824   /* ERRORS */
1825 pad_removed:
1826   {
1827     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
1828     GST_OBJECT_UNLOCK (pad);
1829     return FALSE;
1830   }
1831 }
1832
1833 /**
1834  * gst_collect_pads_query_default:
1835  * @pads: the collectpads to use
1836  * @data: collect data of corresponding pad
1837  * @query: query being processed
1838  * @discard: process but do not send event downstream
1839  *
1840  * Default GstCollectPads query handling that elements should always
1841  * chain up to to ensure proper operation.  Element might however indicate
1842  * query should not be forwarded downstream.
1843  */
1844 gboolean
1845 gst_collect_pads_query_default (GstCollectPads * pads, GstCollectData * data,
1846     GstQuery * query, gboolean discard)
1847 {
1848   gboolean res = TRUE;
1849   GstObject *parent;
1850   GstPad *pad;
1851
1852   pad = data->pad;
1853   parent = GST_OBJECT_PARENT (pad);
1854
1855   switch (GST_QUERY_TYPE (query)) {
1856     case GST_QUERY_SEEKING:
1857     {
1858       GstFormat format;
1859
1860       /* don't pass it along as some (file)sink might claim it does
1861        * whereas with a collectpads in between that will not likely work */
1862       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1863       gst_query_set_seeking (query, format, FALSE, 0, -1);
1864       res = TRUE;
1865       discard = TRUE;
1866       break;
1867     }
1868     default:
1869       break;
1870   }
1871
1872   if (!discard)
1873     return gst_pad_query_default (pad, parent, query);
1874   else
1875     return res;
1876 }
1877
1878 static gboolean
1879 gst_collect_pads_query_default_internal (GstCollectPads * pads,
1880     GstCollectData * data, GstQuery * query, gpointer user_data)
1881 {
1882   return gst_collect_pads_query_default (pads, data, query, FALSE);
1883 }
1884
1885 static gboolean
1886 gst_collect_pads_query (GstPad * pad, GstObject * parent, GstQuery * query)
1887 {
1888   gboolean res = FALSE, need_unlock = FALSE;
1889   GstCollectData *data;
1890   GstCollectPads *pads;
1891   GstCollectPadsQueryFunction query_func;
1892   gpointer query_user_data;
1893
1894   GST_DEBUG_OBJECT (pad, "Got %s query on sink pad",
1895       GST_QUERY_TYPE_NAME (query));
1896
1897   /* some magic to get the managing collect_pads */
1898   GST_OBJECT_LOCK (pad);
1899   data = (GstCollectData *) gst_pad_get_element_private (pad);
1900   if (G_UNLIKELY (data == NULL))
1901     goto pad_removed;
1902   ref_data (data);
1903   GST_OBJECT_UNLOCK (pad);
1904
1905   pads = data->collect;
1906
1907   GST_OBJECT_LOCK (pads);
1908   query_func = pads->priv->query_func;
1909   query_user_data = pads->priv->query_user_data;
1910   GST_OBJECT_UNLOCK (pads);
1911
1912   if (GST_QUERY_IS_SERIALIZED (query)) {
1913     GST_COLLECT_PADS_STREAM_LOCK (pads);
1914     need_unlock = TRUE;
1915   }
1916
1917   if (G_LIKELY (query_func)) {
1918     res = query_func (pads, data, query, query_user_data);
1919   }
1920
1921   if (need_unlock)
1922     GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1923
1924   unref_data (data);
1925   return res;
1926
1927   /* ERRORS */
1928 pad_removed:
1929   {
1930     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
1931     GST_OBJECT_UNLOCK (pad);
1932     return FALSE;
1933   }
1934 }
1935
1936
1937 /* For each buffer we receive we check if our collected condition is reached
1938  * and if so we call the collected function. When this is done we check if
1939  * data has been unqueued. If data is still queued we wait holding the stream
1940  * lock to make sure no EOS event can happen while we are ready to be
1941  * collected 
1942  */
1943 static GstFlowReturn
1944 gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
1945 {
1946   GstCollectData *data;
1947   GstCollectPads *pads;
1948   GstFlowReturn ret;
1949   GstBuffer **buffer_p;
1950   guint32 cookie;
1951
1952   GST_DEBUG ("Got buffer for pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1953
1954   /* some magic to get the managing collect_pads */
1955   GST_OBJECT_LOCK (pad);
1956   data = (GstCollectData *) gst_pad_get_element_private (pad);
1957   if (G_UNLIKELY (data == NULL))
1958     goto no_data;
1959   ref_data (data);
1960   GST_OBJECT_UNLOCK (pad);
1961
1962   pads = data->collect;
1963
1964   GST_COLLECT_PADS_STREAM_LOCK (pads);
1965   /* if not started, bail out */
1966   if (G_UNLIKELY (!pads->priv->started))
1967     goto not_started;
1968   /* check if this pad is flushing */
1969   if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
1970               GST_COLLECT_PADS_STATE_FLUSHING)))
1971     goto flushing;
1972   /* pad was EOS, we can refuse this data */
1973   if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
1974               GST_COLLECT_PADS_STATE_EOS)))
1975     goto eos;
1976
1977   /* see if we need to clip */
1978   if (pads->priv->clip_func) {
1979     GstBuffer *outbuf = NULL;
1980     ret =
1981         pads->priv->clip_func (pads, data, buffer, &outbuf,
1982         pads->priv->clip_user_data);
1983     buffer = outbuf;
1984
1985     if (G_UNLIKELY (outbuf == NULL))
1986       goto clipped;
1987
1988     if (G_UNLIKELY (ret == GST_FLOW_EOS))
1989       goto eos;
1990     else if (G_UNLIKELY (ret != GST_FLOW_OK))
1991       goto error;
1992   }
1993
1994   GST_DEBUG_OBJECT (pads, "Queuing buffer %p for pad %s:%s", buffer,
1995       GST_DEBUG_PAD_NAME (pad));
1996
1997   /* One more pad has data queued */
1998   if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING))
1999     pads->priv->queuedpads++;
2000   buffer_p = &data->buffer;
2001   gst_buffer_replace (buffer_p, buffer);
2002
2003   /* update segment last position if in TIME */
2004   if (G_LIKELY (data->segment.format == GST_FORMAT_TIME)) {
2005     GstClockTime timestamp;
2006
2007     timestamp = GST_BUFFER_DTS (buffer);
2008     if (!GST_CLOCK_TIME_IS_VALID (timestamp))
2009       timestamp = GST_BUFFER_PTS (buffer);
2010
2011     if (GST_CLOCK_TIME_IS_VALID (timestamp))
2012       data->segment.position = timestamp;
2013   }
2014
2015   /* While we have data queued on this pad try to collect stuff */
2016   do {
2017     /* Check if our collected condition is matched and call the collected
2018      * function if it is */
2019     ret = gst_collect_pads_check_collected (pads);
2020     /* when an error occurs, we want to report this back to the caller ASAP
2021      * without having to block if the buffer was not popped */
2022     if (G_UNLIKELY (ret != GST_FLOW_OK))
2023       goto error;
2024
2025     /* data was consumed, we can exit and accept new data */
2026     if (data->buffer == NULL)
2027       break;
2028
2029     /* Having the _INIT here means we don't care about any broadcast up to here
2030      * (most of which occur with STREAM_LOCK held, so could not have happened
2031      * anyway).  We do care about e.g. a remove initiated broadcast as of this
2032      * point.  Putting it here also makes this thread ignores any evt it raised
2033      * itself (as is a usual WAIT semantic).
2034      */
2035     GST_COLLECT_PADS_EVT_INIT (cookie);
2036
2037     /* pad could be removed and re-added */
2038     unref_data (data);
2039     GST_OBJECT_LOCK (pad);
2040     if (G_UNLIKELY ((data = gst_pad_get_element_private (pad)) == NULL))
2041       goto pad_removed;
2042     ref_data (data);
2043     GST_OBJECT_UNLOCK (pad);
2044
2045     GST_DEBUG_OBJECT (pads, "Pad %s:%s has a buffer queued, waiting",
2046         GST_DEBUG_PAD_NAME (pad));
2047
2048     /* wait to be collected, this must happen from another thread triggered
2049      * by the _chain function of another pad. We release the lock so we
2050      * can get stopped or flushed as well. We can however not get EOS
2051      * because we still hold the STREAM_LOCK.
2052      */
2053     GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2054     GST_COLLECT_PADS_EVT_WAIT (pads, cookie);
2055     GST_COLLECT_PADS_STREAM_LOCK (pads);
2056
2057     GST_DEBUG_OBJECT (pads, "Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad));
2058
2059     /* after a signal, we could be stopped */
2060     if (G_UNLIKELY (!pads->priv->started))
2061       goto not_started;
2062     /* check if this pad is flushing */
2063     if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
2064                 GST_COLLECT_PADS_STATE_FLUSHING)))
2065       goto flushing;
2066   }
2067   while (data->buffer != NULL);
2068
2069 unlock_done:
2070   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2071   unref_data (data);
2072   if (buffer)
2073     gst_buffer_unref (buffer);
2074   return ret;
2075
2076 pad_removed:
2077   {
2078     GST_WARNING ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2079     GST_OBJECT_UNLOCK (pad);
2080     ret = GST_FLOW_NOT_LINKED;
2081     goto unlock_done;
2082   }
2083   /* ERRORS */
2084 no_data:
2085   {
2086     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2087     GST_OBJECT_UNLOCK (pad);
2088     gst_buffer_unref (buffer);
2089     return GST_FLOW_NOT_LINKED;
2090   }
2091 not_started:
2092   {
2093     GST_DEBUG ("not started");
2094     gst_collect_pads_clear (pads, data);
2095     ret = GST_FLOW_FLUSHING;
2096     goto unlock_done;
2097   }
2098 flushing:
2099   {
2100     GST_DEBUG ("pad %s:%s is flushing", GST_DEBUG_PAD_NAME (pad));
2101     gst_collect_pads_clear (pads, data);
2102     ret = GST_FLOW_FLUSHING;
2103     goto unlock_done;
2104   }
2105 eos:
2106   {
2107     /* we should not post an error for this, just inform upstream that
2108      * we don't expect anything anymore */
2109     GST_DEBUG ("pad %s:%s is eos", GST_DEBUG_PAD_NAME (pad));
2110     ret = GST_FLOW_EOS;
2111     goto unlock_done;
2112   }
2113 clipped:
2114   {
2115     GST_DEBUG ("clipped buffer on pad %s:%s", GST_DEBUG_PAD_NAME (pad));
2116     ret = GST_FLOW_OK;
2117     goto unlock_done;
2118   }
2119 error:
2120   {
2121     /* we print the error, the element should post a reasonable error
2122      * message for fatal errors */
2123     GST_DEBUG ("collect failed, reason %d (%s)", ret, gst_flow_get_name (ret));
2124     gst_collect_pads_clear (pads, data);
2125     goto unlock_done;
2126   }
2127 }