2 * Copyright (C) 2005 Wim Taymans <wim@fluendo.com>
3 * Copyright (C) 2008 Mark Nauwelaerts <mnauw@users.sourceforge.net>
4 * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Library General Public
10 * License as published by the Free Software Foundation; either
11 * version 2 of the License, or (at your option) any later version.
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Library General Public License for more details.
18 * You should have received a copy of the GNU Library General Public
19 * License along with this library; if not, write to the
20 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21 * Boston, MA 02110-1301, USA.
24 * SECTION:gstcollectpads
25 * @short_description: manages a set of pads that operate in collect mode
28 * Manages a set of pads that operate in collect mode. This means that control
29 * is given to the manager of this object when all pads have data.
32 * Collectpads are created with gst_collect_pads_new(). A callback should then
33 * be installed with gst_collect_pads_set_function ().
36 * Pads are added to the collection with gst_collect_pads_add_pad()/
37 * gst_collect_pads_remove_pad(). The pad
38 * has to be a sinkpad. The chain and event functions of the pad are
39 * overridden. The element_private of the pad is used to store
40 * private information for the collectpads.
43 * For each pad, data is queued in the _chain function or by
44 * performing a pull_range.
47 * When data is queued on all pads in waiting mode, the callback function is called.
50 * Data can be dequeued from the pad with the gst_collect_pads_pop() method.
51 * One can peek at the data with the gst_collect_pads_peek() function.
52 * These functions will return %NULL if the pad received an EOS event. When all
53 * pads return %NULL from a gst_collect_pads_peek(), the element can emit an EOS
57 * Data can also be dequeued in byte units using the gst_collect_pads_available(),
58 * gst_collect_pads_read_buffer() and gst_collect_pads_flush() calls.
61 * Elements should call gst_collect_pads_start() and gst_collect_pads_stop() in
62 * their state change functions to start and stop the processing of the collectpads.
63 * The gst_collect_pads_stop() call should be called before calling the parent
64 * element state change function in the PAUSED_TO_READY state change to ensure
65 * no pad is blocked and the element can finish streaming.
68 * gst_collect_pads_set_waiting() sets a pad to waiting or non-waiting mode.
69 * CollectPads element is not waiting for data to be collected on non-waiting pads.
70 * Thus these pads may but need not have data when the callback is called.
71 * All pads are in waiting mode by default.
80 #include <gst/gst_private.h>
82 #include "gstcollectpads.h"
84 #include "../../../gst/glib-compat-private.h"
86 GST_DEBUG_CATEGORY_STATIC (collect_pads_debug);
87 #define GST_CAT_DEFAULT collect_pads_debug
89 #define parent_class gst_collect_pads_parent_class
90 G_DEFINE_TYPE (GstCollectPads, gst_collect_pads, GST_TYPE_OBJECT);
92 struct _GstCollectDataPrivate
94 /* refcounting for struct, and destroy callback */
95 GstCollectDataDestroyNotify destroy_notify;
99 struct _GstCollectPadsPrivate
101 /* with LOCK and/or STREAM_LOCK */
104 /* with STREAM_LOCK */
105 guint32 cookie; /* @data list cookie */
106 guint numpads; /* number of pads in @data */
107 guint queuedpads; /* number of pads with a buffer */
108 guint eospads; /* number of pads that are EOS */
109 GstClockTime earliest_time; /* Current earliest time */
110 GstCollectData *earliest_data; /* Pad data for current earliest time */
113 GSList *pad_list; /* updated pad list */
114 guint32 pad_cookie; /* updated cookie */
116 GstCollectPadsFunction func; /* function and user_data for callback */
118 GstCollectPadsBufferFunction buffer_func; /* function and user_data for buffer callback */
119 gpointer buffer_user_data;
120 GstCollectPadsCompareFunction compare_func;
121 gpointer compare_user_data;
122 GstCollectPadsEventFunction event_func; /* function and data for event callback */
123 gpointer event_user_data;
124 GstCollectPadsQueryFunction query_func;
125 gpointer query_user_data;
126 GstCollectPadsClipFunction clip_func;
127 gpointer clip_user_data;
128 GstCollectPadsFlushFunction flush_func;
129 gpointer flush_user_data;
131 /* no other lock needed */
132 GMutex evt_lock; /* these make up sort of poor man's event signaling */
137 gboolean pending_flush_start;
138 gboolean pending_flush_stop;
141 static void gst_collect_pads_clear (GstCollectPads * pads,
142 GstCollectData * data);
143 static GstFlowReturn gst_collect_pads_chain (GstPad * pad, GstObject * parent,
145 static gboolean gst_collect_pads_event (GstPad * pad, GstObject * parent,
147 static gboolean gst_collect_pads_query (GstPad * pad, GstObject * parent,
149 static void gst_collect_pads_finalize (GObject * object);
150 static GstFlowReturn gst_collect_pads_default_collected (GstCollectPads *
151 pads, gpointer user_data);
152 static gint gst_collect_pads_default_compare_func (GstCollectPads * pads,
153 GstCollectData * data1, GstClockTime timestamp1, GstCollectData * data2,
154 GstClockTime timestamp2, gpointer user_data);
155 static gboolean gst_collect_pads_recalculate_full (GstCollectPads * pads);
156 static void ref_data (GstCollectData * data);
157 static void unref_data (GstCollectData * data);
159 static gboolean gst_collect_pads_event_default_internal (GstCollectPads *
160 pads, GstCollectData * data, GstEvent * event, gpointer user_data);
161 static gboolean gst_collect_pads_query_default_internal (GstCollectPads *
162 pads, GstCollectData * data, GstQuery * query, gpointer user_data);
165 /* Some properties are protected by LOCK, others by STREAM_LOCK
166 * However, manipulating either of these partitions may require
167 * to signal/wake a _WAIT, so use a separate (sort of) event to prevent races
168 * Alternative implementations are possible, e.g. some low-level re-implementing
169 * of the 2 above locks to drop both of them atomically when going into _WAIT.
171 #define GST_COLLECT_PADS_GET_EVT_COND(pads) (&((GstCollectPads *)pads)->priv->evt_cond)
172 #define GST_COLLECT_PADS_GET_EVT_LOCK(pads) (&((GstCollectPads *)pads)->priv->evt_lock)
173 #define GST_COLLECT_PADS_EVT_WAIT(pads, cookie) G_STMT_START { \
174 g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \
175 /* should work unless a lot of event'ing and thread starvation */\
176 while (cookie == ((GstCollectPads *) pads)->priv->evt_cookie) \
177 g_cond_wait (GST_COLLECT_PADS_GET_EVT_COND (pads), \
178 GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \
179 cookie = ((GstCollectPads *) pads)->priv->evt_cookie; \
180 g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \
182 #define GST_COLLECT_PADS_EVT_WAIT_TIMED(pads, cookie, timeout) G_STMT_START { \
185 g_get_current_time (&tv); \
186 g_time_val_add (&tv, timeout); \
188 g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \
189 /* should work unless a lot of event'ing and thread starvation */\
190 while (cookie == ((GstCollectPads *) pads)->priv->evt_cookie) \
191 g_cond_timed_wait (GST_COLLECT_PADS_GET_EVT_COND (pads), \
192 GST_COLLECT_PADS_GET_EVT_LOCK (pads), &tv); \
193 cookie = ((GstCollectPads *) pads)->priv->evt_cookie; \
194 g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \
196 #define GST_COLLECT_PADS_EVT_BROADCAST(pads) G_STMT_START { \
197 g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \
198 /* never mind wrap-around */ \
199 ++(((GstCollectPads *) pads)->priv->evt_cookie); \
200 g_cond_broadcast (GST_COLLECT_PADS_GET_EVT_COND (pads)); \
201 g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \
203 #define GST_COLLECT_PADS_EVT_INIT(cookie) G_STMT_START { \
204 g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \
205 cookie = ((GstCollectPads *) pads)->priv->evt_cookie; \
206 g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads)); \
210 gst_collect_pads_class_init (GstCollectPadsClass * klass)
212 GObjectClass *gobject_class = (GObjectClass *) klass;
214 g_type_class_add_private (klass, sizeof (GstCollectPadsPrivate));
216 GST_DEBUG_CATEGORY_INIT (collect_pads_debug, "collectpads", 0,
219 gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_collect_pads_finalize);
223 gst_collect_pads_init (GstCollectPads * pads)
226 G_TYPE_INSTANCE_GET_PRIVATE (pads, GST_TYPE_COLLECT_PADS,
227 GstCollectPadsPrivate);
230 pads->priv->cookie = 0;
231 pads->priv->numpads = 0;
232 pads->priv->queuedpads = 0;
233 pads->priv->eospads = 0;
234 pads->priv->started = FALSE;
236 g_rec_mutex_init (&pads->stream_lock);
238 pads->priv->func = gst_collect_pads_default_collected;
239 pads->priv->user_data = NULL;
240 pads->priv->event_func = NULL;
241 pads->priv->event_user_data = NULL;
243 /* members for default muxing */
244 pads->priv->buffer_func = NULL;
245 pads->priv->buffer_user_data = NULL;
246 pads->priv->compare_func = gst_collect_pads_default_compare_func;
247 pads->priv->compare_user_data = NULL;
248 pads->priv->earliest_data = NULL;
249 pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
251 pads->priv->event_func = gst_collect_pads_event_default_internal;
252 pads->priv->query_func = gst_collect_pads_query_default_internal;
254 /* members to manage the pad list */
255 pads->priv->pad_cookie = 0;
256 pads->priv->pad_list = NULL;
258 /* members for event */
259 g_mutex_init (&pads->priv->evt_lock);
260 g_cond_init (&pads->priv->evt_cond);
261 pads->priv->evt_cookie = 0;
263 pads->priv->seeking = FALSE;
264 pads->priv->pending_flush_start = FALSE;
265 pads->priv->pending_flush_stop = FALSE;
267 /* clear floating flag */
268 gst_object_ref_sink (pads);
272 gst_collect_pads_finalize (GObject * object)
274 GstCollectPads *pads = GST_COLLECT_PADS (object);
276 GST_DEBUG_OBJECT (object, "finalize");
278 g_rec_mutex_clear (&pads->stream_lock);
280 g_cond_clear (&pads->priv->evt_cond);
281 g_mutex_clear (&pads->priv->evt_lock);
283 /* Remove pads and free pads list */
284 g_slist_foreach (pads->priv->pad_list, (GFunc) unref_data, NULL);
285 g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
286 g_slist_free (pads->data);
287 g_slist_free (pads->priv->pad_list);
289 G_OBJECT_CLASS (parent_class)->finalize (object);
293 * gst_collect_pads_new:
295 * Create a new instance of #GstCollectPads.
299 * Returns: (transfer full): a new #GstCollectPads, or %NULL in case of an error.
302 gst_collect_pads_new (void)
304 GstCollectPads *newcoll;
306 newcoll = g_object_new (GST_TYPE_COLLECT_PADS, NULL);
311 /* Must be called with GstObject lock! */
313 gst_collect_pads_set_buffer_function_locked (GstCollectPads * pads,
314 GstCollectPadsBufferFunction func, gpointer user_data)
316 pads->priv->buffer_func = func;
317 pads->priv->buffer_user_data = user_data;
321 * gst_collect_pads_set_buffer_function:
322 * @pads: the collectpads to use
323 * @func: the function to set
324 * @user_data: (closure): user data passed to the function
326 * Set the callback function and user data that will be called with
327 * the oldest buffer when all pads have been collected, or %NULL on EOS.
328 * If a buffer is passed, the callback owns a reference and must unref
334 gst_collect_pads_set_buffer_function (GstCollectPads * pads,
335 GstCollectPadsBufferFunction func, gpointer user_data)
337 g_return_if_fail (pads != NULL);
338 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
340 GST_OBJECT_LOCK (pads);
341 gst_collect_pads_set_buffer_function_locked (pads, func, user_data);
342 GST_OBJECT_UNLOCK (pads);
346 * gst_collect_pads_set_compare_function:
347 * @pads: the pads to use
348 * @func: the function to set
349 * @user_data: (closure): user data passed to the function
351 * Set the timestamp comparison function.
355 /* NOTE allowing to change comparison seems not advisable;
356 no known use-case, and collaboration with default algorithm is unpredictable.
357 If custom compairing/operation is needed, just use a collect function of
360 gst_collect_pads_set_compare_function (GstCollectPads * pads,
361 GstCollectPadsCompareFunction func, gpointer user_data)
363 g_return_if_fail (pads != NULL);
364 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
366 GST_OBJECT_LOCK (pads);
367 pads->priv->compare_func = func;
368 pads->priv->compare_user_data = user_data;
369 GST_OBJECT_UNLOCK (pads);
373 * gst_collect_pads_set_function:
374 * @pads: the collectpads to use
375 * @func: the function to set
376 * @user_data: user data passed to the function
378 * CollectPads provides a default collection algorithm that will determine
379 * the oldest buffer available on all of its pads, and then delegate
380 * to a configured callback.
381 * However, if circumstances are more complicated and/or more control
382 * is desired, this sets a callback that will be invoked instead when
383 * all the pads added to the collection have buffers queued.
384 * Evidently, this callback is not compatible with
385 * gst_collect_pads_set_buffer_function() callback.
386 * If this callback is set, the former will be unset.
391 gst_collect_pads_set_function (GstCollectPads * pads,
392 GstCollectPadsFunction func, gpointer user_data)
394 g_return_if_fail (pads != NULL);
395 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
397 GST_OBJECT_LOCK (pads);
398 pads->priv->func = func;
399 pads->priv->user_data = user_data;
400 gst_collect_pads_set_buffer_function_locked (pads, NULL, NULL);
401 GST_OBJECT_UNLOCK (pads);
405 ref_data (GstCollectData * data)
407 g_assert (data != NULL);
409 g_atomic_int_inc (&(data->priv->refcount));
413 unref_data (GstCollectData * data)
415 g_assert (data != NULL);
416 g_assert (data->priv->refcount > 0);
418 if (!g_atomic_int_dec_and_test (&(data->priv->refcount)))
421 if (data->priv->destroy_notify)
422 data->priv->destroy_notify (data);
424 g_object_unref (data->pad);
426 gst_buffer_unref (data->buffer);
433 * gst_collect_pads_set_event_function:
434 * @pads: the collectpads to use
435 * @func: the function to set
436 * @user_data: user data passed to the function
438 * Set the event callback function and user data that will be called when
439 * collectpads has received an event originating from one of the collected
440 * pads. If the event being processed is a serialized one, this callback is
441 * called with @pads STREAM_LOCK held, otherwise not. As this lock should be
442 * held when calling a number of CollectPads functions, it should be acquired
443 * if so (unusually) needed.
448 gst_collect_pads_set_event_function (GstCollectPads * pads,
449 GstCollectPadsEventFunction func, gpointer user_data)
451 g_return_if_fail (pads != NULL);
452 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
454 GST_OBJECT_LOCK (pads);
455 pads->priv->event_func = func;
456 pads->priv->event_user_data = user_data;
457 GST_OBJECT_UNLOCK (pads);
461 * gst_collect_pads_set_query_function:
462 * @pads: the collectpads to use
463 * @func: the function to set
464 * @user_data: user data passed to the function
466 * Set the query callback function and user data that will be called after
467 * collectpads has received a query originating from one of the collected
468 * pads. If the query being processed is a serialized one, this callback is
469 * called with @pads STREAM_LOCK held, otherwise not. As this lock should be
470 * held when calling a number of CollectPads functions, it should be acquired
471 * if so (unusually) needed.
476 gst_collect_pads_set_query_function (GstCollectPads * pads,
477 GstCollectPadsQueryFunction func, gpointer user_data)
479 g_return_if_fail (pads != NULL);
480 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
482 GST_OBJECT_LOCK (pads);
483 pads->priv->query_func = func;
484 pads->priv->query_user_data = user_data;
485 GST_OBJECT_UNLOCK (pads);
489 * gst_collect_pads_clip_running_time:
490 * @pads: the collectpads to use
491 * @cdata: collect data of corresponding pad
492 * @buf: buffer being clipped
493 * @outbuf: (allow-none): output buffer with running time, or NULL if clipped
494 * @user_data: user data (unused)
496 * Convenience clipping function that converts incoming buffer's timestamp
497 * to running time, or clips the buffer if outside configured segment.
500 gst_collect_pads_clip_running_time (GstCollectPads * pads,
501 GstCollectData * cdata, GstBuffer * buf, GstBuffer ** outbuf,
507 time = GST_BUFFER_PTS (buf);
509 /* invalid left alone and passed */
510 if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) {
511 time = gst_segment_to_running_time (&cdata->segment, GST_FORMAT_TIME, time);
512 if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) {
513 GST_DEBUG_OBJECT (cdata->pad, "clipping buffer on pad outside segment %"
514 GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_PTS (buf)));
515 gst_buffer_unref (buf);
518 GST_LOG_OBJECT (cdata->pad, "buffer ts %" GST_TIME_FORMAT " -> %"
519 GST_TIME_FORMAT " running time",
520 GST_TIME_ARGS (GST_BUFFER_PTS (buf)), GST_TIME_ARGS (time));
521 *outbuf = gst_buffer_make_writable (buf);
522 GST_BUFFER_PTS (*outbuf) = time;
523 GST_BUFFER_DTS (*outbuf) = gst_segment_to_running_time (&cdata->segment,
524 GST_FORMAT_TIME, GST_BUFFER_DTS (*outbuf));
532 * gst_collect_pads_set_clip_function:
533 * @pads: the collectpads to use
534 * @clipfunc: clip function to install
535 * @user_data: user data to pass to @clip_func
537 * Install a clipping function that is called right after a buffer is received
538 * on a pad managed by @pads. See #GstCollectPadsClipFunction for more info.
541 gst_collect_pads_set_clip_function (GstCollectPads * pads,
542 GstCollectPadsClipFunction clipfunc, gpointer user_data)
544 g_return_if_fail (pads != NULL);
545 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
547 pads->priv->clip_func = clipfunc;
548 pads->priv->clip_user_data = user_data;
552 * gst_collect_pads_set_flush_function:
553 * @pads: the collectpads to use
554 * @func: flush function to install
555 * @user_data: user data to pass to @func
557 * Install a flush function that is called when the internal
558 * state of all pads should be flushed as part of flushing seek
559 * handling. See #GstCollectPadsFlushFunction for more info.
564 gst_collect_pads_set_flush_function (GstCollectPads * pads,
565 GstCollectPadsFlushFunction func, gpointer user_data)
567 g_return_if_fail (pads != NULL);
568 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
570 pads->priv->flush_func = func;
571 pads->priv->flush_user_data = user_data;
575 * gst_collect_pads_add_pad:
576 * @pads: the collectpads to use
577 * @pad: (transfer none): the pad to add
578 * @size: the size of the returned #GstCollectData structure
579 * @destroy_notify: (scope async): function to be called before the returned
580 * #GstCollectData structure is freed
581 * @lock: whether to lock this pad in usual waiting state
583 * Add a pad to the collection of collect pads. The pad has to be
584 * a sinkpad. The refcount of the pad is incremented. Use
585 * gst_collect_pads_remove_pad() to remove the pad from the collection
588 * You specify a size for the returned #GstCollectData structure
589 * so that you can use it to store additional information.
591 * You can also specify a #GstCollectDataDestroyNotify that will be called
592 * just before the #GstCollectData structure is freed. It is passed the
593 * pointer to the structure and should free any custom memory and resources
596 * Keeping a pad locked in waiting state is only relevant when using
597 * the default collection algorithm (providing the oldest buffer).
598 * It ensures a buffer must be available on this pad for a collection
599 * to take place. This is of typical use to a muxer element where
600 * non-subtitle streams should always be in waiting state,
601 * e.g. to assure that caps information is available on all these streams
602 * when initial headers have to be written.
604 * The pad will be automatically activated in push mode when @pads is
609 * Returns: (nullable) (transfer none): a new #GstCollectData to identify the
610 * new pad. Or %NULL if wrong parameters are supplied.
613 gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size,
614 GstCollectDataDestroyNotify destroy_notify, gboolean lock)
616 GstCollectData *data;
618 g_return_val_if_fail (pads != NULL, NULL);
619 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
620 g_return_val_if_fail (pad != NULL, NULL);
621 g_return_val_if_fail (GST_PAD_IS_SINK (pad), NULL);
622 g_return_val_if_fail (size >= sizeof (GstCollectData), NULL);
624 GST_DEBUG_OBJECT (pads, "adding pad %s:%s", GST_DEBUG_PAD_NAME (pad));
626 data = g_malloc0 (size);
627 data->priv = g_new0 (GstCollectDataPrivate, 1);
628 data->collect = pads;
629 data->pad = gst_object_ref (pad);
632 gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
633 data->state = GST_COLLECT_PADS_STATE_WAITING;
634 data->state |= lock ? GST_COLLECT_PADS_STATE_LOCKED : 0;
635 data->priv->refcount = 1;
636 data->priv->destroy_notify = destroy_notify;
638 GST_OBJECT_LOCK (pads);
639 GST_OBJECT_LOCK (pad);
640 gst_pad_set_element_private (pad, data);
641 GST_OBJECT_UNLOCK (pad);
642 pads->priv->pad_list = g_slist_append (pads->priv->pad_list, data);
643 gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_chain));
644 gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_event));
645 gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_query));
646 /* backward compat, also add to data if stopped, so that the element already
647 * has this in the public data list before going PAUSED (typically)
648 * this can only be done when we are stopped because we don't take the
649 * STREAM_LOCK to protect the pads->data list. */
650 if (!pads->priv->started) {
651 pads->data = g_slist_append (pads->data, data);
654 /* activate the pad when needed */
655 if (pads->priv->started)
656 gst_pad_set_active (pad, TRUE);
657 pads->priv->pad_cookie++;
658 GST_OBJECT_UNLOCK (pads);
664 find_pad (GstCollectData * data, GstPad * pad)
666 if (data->pad == pad)
672 * gst_collect_pads_remove_pad:
673 * @pads: the collectpads to use
674 * @pad: (transfer none): the pad to remove
676 * Remove a pad from the collection of collect pads. This function will also
677 * free the #GstCollectData and all the resources that were allocated with
678 * gst_collect_pads_add_pad().
680 * The pad will be deactivated automatically when @pads is stopped.
684 * Returns: %TRUE if the pad could be removed.
687 gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad)
689 GstCollectData *data;
692 g_return_val_if_fail (pads != NULL, FALSE);
693 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
694 g_return_val_if_fail (pad != NULL, FALSE);
695 g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
697 GST_DEBUG_OBJECT (pads, "removing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
699 GST_OBJECT_LOCK (pads);
701 g_slist_find_custom (pads->priv->pad_list, pad, (GCompareFunc) find_pad);
705 data = (GstCollectData *) list->data;
707 GST_DEBUG_OBJECT (pads, "found pad %s:%s at %p", GST_DEBUG_PAD_NAME (pad),
710 /* clear the stuff we configured */
711 gst_pad_set_chain_function (pad, NULL);
712 gst_pad_set_event_function (pad, NULL);
713 GST_OBJECT_LOCK (pad);
714 gst_pad_set_element_private (pad, NULL);
715 GST_OBJECT_UNLOCK (pad);
717 /* backward compat, also remove from data if stopped, note that this function
718 * can only be called when we are stopped because we don't take the
719 * STREAM_LOCK to protect the pads->data list. */
720 if (!pads->priv->started) {
723 dlist = g_slist_find_custom (pads->data, pad, (GCompareFunc) find_pad);
725 GstCollectData *pdata = dlist->data;
727 pads->data = g_slist_delete_link (pads->data, dlist);
731 /* remove from the pad list */
732 pads->priv->pad_list = g_slist_delete_link (pads->priv->pad_list, list);
733 pads->priv->pad_cookie++;
735 /* signal waiters because something changed */
736 GST_COLLECT_PADS_EVT_BROADCAST (pads);
738 /* deactivate the pad when needed */
739 if (!pads->priv->started)
740 gst_pad_set_active (pad, FALSE);
742 /* clean and free the collect data */
745 GST_OBJECT_UNLOCK (pads);
751 GST_WARNING_OBJECT (pads, "cannot remove unknown pad %s:%s",
752 GST_DEBUG_PAD_NAME (pad));
753 GST_OBJECT_UNLOCK (pads);
759 * Must be called with STREAM_LOCK and OBJECT_LOCK.
762 gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads,
767 /* Update the pads flushing flag */
768 for (walk = pads->priv->pad_list; walk; walk = g_slist_next (walk)) {
769 GstCollectData *cdata = walk->data;
771 if (GST_IS_PAD (cdata->pad)) {
772 GST_OBJECT_LOCK (cdata->pad);
774 GST_PAD_SET_FLUSHING (cdata->pad);
776 GST_PAD_UNSET_FLUSHING (cdata->pad);
778 GST_COLLECT_PADS_STATE_SET (cdata, GST_COLLECT_PADS_STATE_FLUSHING);
780 GST_COLLECT_PADS_STATE_UNSET (cdata, GST_COLLECT_PADS_STATE_FLUSHING);
781 gst_collect_pads_clear (pads, cdata);
782 GST_OBJECT_UNLOCK (cdata->pad);
786 /* inform _chain of changes */
787 GST_COLLECT_PADS_EVT_BROADCAST (pads);
791 * gst_collect_pads_set_flushing:
792 * @pads: the collectpads to use
793 * @flushing: desired state of the pads
795 * Change the flushing state of all the pads in the collection. No pad
796 * is able to accept anymore data when @flushing is %TRUE. Calling this
797 * function with @flushing %FALSE makes @pads accept data again.
798 * Caller must ensure that downstream streaming (thread) is not blocked,
799 * e.g. by sending a FLUSH_START downstream.
804 gst_collect_pads_set_flushing (GstCollectPads * pads, gboolean flushing)
806 g_return_if_fail (pads != NULL);
807 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
809 /* NOTE since this eventually calls _pop, some (STREAM_)LOCK is needed here */
810 GST_COLLECT_PADS_STREAM_LOCK (pads);
811 GST_OBJECT_LOCK (pads);
812 gst_collect_pads_set_flushing_unlocked (pads, flushing);
813 GST_OBJECT_UNLOCK (pads);
814 GST_COLLECT_PADS_STREAM_UNLOCK (pads);
818 * gst_collect_pads_start:
819 * @pads: the collectpads to use
821 * Starts the processing of data in the collect_pads.
826 gst_collect_pads_start (GstCollectPads * pads)
830 g_return_if_fail (pads != NULL);
831 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
833 GST_DEBUG_OBJECT (pads, "starting collect pads");
835 /* make sure stop and collect cannot be called anymore */
836 GST_COLLECT_PADS_STREAM_LOCK (pads);
838 /* make pads streamable */
839 GST_OBJECT_LOCK (pads);
841 /* loop over the master pad list and reset the segment */
842 collected = pads->priv->pad_list;
843 for (; collected; collected = g_slist_next (collected)) {
844 GstCollectData *data;
846 data = collected->data;
847 gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
850 gst_collect_pads_set_flushing_unlocked (pads, FALSE);
852 /* Start collect pads */
853 pads->priv->started = TRUE;
854 GST_OBJECT_UNLOCK (pads);
855 GST_COLLECT_PADS_STREAM_UNLOCK (pads);
859 * gst_collect_pads_stop:
860 * @pads: the collectpads to use
862 * Stops the processing of data in the collect_pads. this function
863 * will also unblock any blocking operations.
868 gst_collect_pads_stop (GstCollectPads * pads)
872 g_return_if_fail (pads != NULL);
873 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
875 GST_DEBUG_OBJECT (pads, "stopping collect pads");
877 /* make sure collect and start cannot be called anymore */
878 GST_COLLECT_PADS_STREAM_LOCK (pads);
880 /* make pads not accept data anymore */
881 GST_OBJECT_LOCK (pads);
882 gst_collect_pads_set_flushing_unlocked (pads, TRUE);
884 /* Stop collect pads */
885 pads->priv->started = FALSE;
886 pads->priv->eospads = 0;
887 pads->priv->queuedpads = 0;
889 /* loop over the master pad list and flush buffers */
890 collected = pads->priv->pad_list;
891 for (; collected; collected = g_slist_next (collected)) {
892 GstCollectData *data;
893 GstBuffer **buffer_p;
895 data = collected->data;
897 buffer_p = &data->buffer;
898 gst_buffer_replace (buffer_p, NULL);
901 GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_EOS);
904 if (pads->priv->earliest_data)
905 unref_data (pads->priv->earliest_data);
906 pads->priv->earliest_data = NULL;
907 pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
909 GST_OBJECT_UNLOCK (pads);
910 /* Wake them up so they can end the chain functions. */
911 GST_COLLECT_PADS_EVT_BROADCAST (pads);
913 GST_COLLECT_PADS_STREAM_UNLOCK (pads);
917 * gst_collect_pads_peek:
918 * @pads: the collectpads to peek
919 * @data: the data to use
921 * Peek at the buffer currently queued in @data. This function
922 * should be called with the @pads STREAM_LOCK held, such as in the callback
927 * Returns: The buffer in @data or %NULL if no buffer is queued.
928 * should unref the buffer after usage.
931 gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data)
935 g_return_val_if_fail (pads != NULL, NULL);
936 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
937 g_return_val_if_fail (data != NULL, NULL);
939 if ((result = data->buffer))
940 gst_buffer_ref (result);
942 GST_DEBUG_OBJECT (pads, "Peeking at pad %s:%s: buffer=%p",
943 GST_DEBUG_PAD_NAME (data->pad), result);
949 * gst_collect_pads_pop:
950 * @pads: the collectpads to pop
951 * @data: the data to use
953 * Pop the buffer currently queued in @data. This function
954 * should be called with the @pads STREAM_LOCK held, such as in the callback
959 * Returns: (transfer full): The buffer in @data or %NULL if no buffer was
960 * queued. You should unref the buffer after usage.
963 gst_collect_pads_pop (GstCollectPads * pads, GstCollectData * data)
967 g_return_val_if_fail (pads != NULL, NULL);
968 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
969 g_return_val_if_fail (data != NULL, NULL);
971 if ((result = data->buffer)) {
974 /* one less pad with queued data now */
975 if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING))
976 pads->priv->queuedpads--;
979 GST_COLLECT_PADS_EVT_BROADCAST (pads);
981 GST_DEBUG_OBJECT (pads, "Pop buffer on pad %s:%s: buffer=%p",
982 GST_DEBUG_PAD_NAME (data->pad), result);
987 /* pop and unref the currently queued buffer, should be called with STREAM_LOCK
990 gst_collect_pads_clear (GstCollectPads * pads, GstCollectData * data)
994 if ((buf = gst_collect_pads_pop (pads, data)))
995 gst_buffer_unref (buf);
999 * gst_collect_pads_available:
1000 * @pads: the collectpads to query
1002 * Query how much bytes can be read from each queued buffer. This means
1003 * that the result of this call is the maximum number of bytes that can
1004 * be read from each of the pads.
1006 * This function should be called with @pads STREAM_LOCK held, such as
1011 * Returns: The maximum number of bytes queued on all pads. This function
1012 * returns 0 if a pad has no queued buffer.
1014 /* we might pre-calculate this in some struct field,
1015 * but would then have to maintain this in _chain and particularly _pop, etc,
1016 * even if element is never interested in this information */
1018 gst_collect_pads_available (GstCollectPads * pads)
1021 guint result = G_MAXUINT;
1023 g_return_val_if_fail (pads != NULL, 0);
1024 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
1026 collected = pads->data;
1027 for (; collected; collected = g_slist_next (collected)) {
1028 GstCollectData *pdata;
1032 pdata = (GstCollectData *) collected->data;
1034 /* ignore pad with EOS */
1035 if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (pdata,
1036 GST_COLLECT_PADS_STATE_EOS))) {
1037 GST_DEBUG_OBJECT (pads, "pad %p is EOS", pdata);
1041 /* an empty buffer without EOS is weird when we get here.. */
1042 if (G_UNLIKELY ((buffer = pdata->buffer) == NULL)) {
1043 GST_WARNING_OBJECT (pads, "pad %p has no buffer", pdata);
1047 /* this is the size left of the buffer */
1048 size = gst_buffer_get_size (buffer) - pdata->pos;
1049 GST_DEBUG_OBJECT (pads, "pad %p has %d bytes left", pdata, size);
1051 /* need to return the min of all available data */
1055 /* nothing changed, all must be EOS then, return 0 */
1056 if (G_UNLIKELY (result == G_MAXUINT))
1068 * gst_collect_pads_flush:
1069 * @pads: the collectpads to query
1070 * @data: the data to use
1071 * @size: the number of bytes to flush
1073 * Flush @size bytes from the pad @data.
1075 * This function should be called with @pads STREAM_LOCK held, such as
1080 * Returns: The number of bytes flushed This can be less than @size and
1081 * is 0 if the pad was end-of-stream.
1084 gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data,
1091 g_return_val_if_fail (pads != NULL, 0);
1092 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
1093 g_return_val_if_fail (data != NULL, 0);
1095 /* no buffer, must be EOS */
1096 if ((buffer = data->buffer) == NULL)
1099 bsize = gst_buffer_get_size (buffer);
1101 /* this is what we can flush at max */
1102 flushsize = MIN (size, bsize - data->pos);
1106 if (data->pos >= bsize)
1107 /* _clear will also reset data->pos to 0 */
1108 gst_collect_pads_clear (pads, data);
1114 * gst_collect_pads_read_buffer:
1115 * @pads: the collectpads to query
1116 * @data: the data to use
1117 * @size: the number of bytes to read
1119 * Get a subbuffer of @size bytes from the given pad @data.
1121 * This function should be called with @pads STREAM_LOCK held, such as in the
1126 * Returns: (transfer full): A sub buffer. The size of the buffer can be less that requested.
1127 * A return of %NULL signals that the pad is end-of-stream.
1128 * Unref the buffer after use.
1131 gst_collect_pads_read_buffer (GstCollectPads * pads, GstCollectData * data,
1137 g_return_val_if_fail (pads != NULL, NULL);
1138 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
1139 g_return_val_if_fail (data != NULL, NULL);
1141 /* no buffer, must be EOS */
1142 if ((buffer = data->buffer) == NULL)
1145 readsize = MIN (size, gst_buffer_get_size (buffer) - data->pos);
1147 return gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, data->pos,
1152 * gst_collect_pads_take_buffer:
1153 * @pads: the collectpads to query
1154 * @data: the data to use
1155 * @size: the number of bytes to read
1157 * Get a subbuffer of @size bytes from the given pad @data. Flushes the amount
1160 * This function should be called with @pads STREAM_LOCK held, such as in the
1165 * Returns: A sub buffer. The size of the buffer can be less that requested.
1166 * A return of %NULL signals that the pad is end-of-stream.
1167 * Unref the buffer after use.
1170 gst_collect_pads_take_buffer (GstCollectPads * pads, GstCollectData * data,
1173 GstBuffer *buffer = gst_collect_pads_read_buffer (pads, data, size);
1176 gst_collect_pads_flush (pads, data, gst_buffer_get_size (buffer));
1182 * gst_collect_pads_set_waiting:
1183 * @pads: the collectpads
1184 * @data: the data to use
1185 * @waiting: boolean indicating whether this pad should operate
1186 * in waiting or non-waiting mode
1188 * Sets a pad to waiting or non-waiting mode, if at least this pad
1189 * has not been created with locked waiting state,
1190 * in which case nothing happens.
1192 * This function should be called with @pads STREAM_LOCK held, such as
1198 gst_collect_pads_set_waiting (GstCollectPads * pads, GstCollectData * data,
1201 g_return_if_fail (pads != NULL);
1202 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
1203 g_return_if_fail (data != NULL);
1205 GST_DEBUG_OBJECT (pads, "Setting pad %s to waiting %d, locked %d",
1206 GST_PAD_NAME (data->pad), waiting,
1207 GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED));
1209 /* Do something only on a change and if not locked */
1210 if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED) &&
1211 (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING) !=
1213 /* Set waiting state for this pad */
1215 GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_WAITING);
1217 GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_WAITING);
1218 /* Update number of queued pads if needed */
1219 if (!data->buffer &&
1220 !GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_EOS)) {
1222 pads->priv->queuedpads--;
1224 pads->priv->queuedpads++;
1227 /* signal waiters because something changed */
1228 GST_COLLECT_PADS_EVT_BROADCAST (pads);
1232 /* see if pads were added or removed and update our stats. Any pad
1233 * added after releasing the LOCK will get collected in the next
1236 * We can do a quick check by checking the cookies, that get changed
1237 * whenever the pad list is updated.
1239 * Must be called with STREAM_LOCK.
1242 gst_collect_pads_check_pads (GstCollectPads * pads)
1244 /* the master list and cookie are protected with LOCK */
1245 GST_OBJECT_LOCK (pads);
1246 if (G_UNLIKELY (pads->priv->pad_cookie != pads->priv->cookie)) {
1249 /* clear list and stats */
1250 g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
1251 g_slist_free (pads->data);
1253 pads->priv->numpads = 0;
1254 pads->priv->queuedpads = 0;
1255 pads->priv->eospads = 0;
1256 if (pads->priv->earliest_data)
1257 unref_data (pads->priv->earliest_data);
1258 pads->priv->earliest_data = NULL;
1259 pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
1261 /* loop over the master pad list */
1262 collected = pads->priv->pad_list;
1263 for (; collected; collected = g_slist_next (collected)) {
1264 GstCollectData *data;
1266 /* update the stats */
1267 pads->priv->numpads++;
1268 data = collected->data;
1269 if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_EOS))
1270 pads->priv->eospads++;
1271 else if (data->buffer || !GST_COLLECT_PADS_STATE_IS_SET (data,
1272 GST_COLLECT_PADS_STATE_WAITING))
1273 pads->priv->queuedpads++;
1275 /* add to the list of pads to collect */
1277 /* preserve order of adding/requesting pads */
1278 pads->data = g_slist_append (pads->data, data);
1280 /* and update the cookie */
1281 pads->priv->cookie = pads->priv->pad_cookie;
1283 GST_OBJECT_UNLOCK (pads);
1286 /* checks if all the pads are collected and call the collectfunction
1288 * Should be called with STREAM_LOCK.
1290 * Returns: The #GstFlowReturn of collection.
1292 static GstFlowReturn
1293 gst_collect_pads_check_collected (GstCollectPads * pads)
1295 GstFlowReturn flow_ret = GST_FLOW_OK;
1296 GstCollectPadsFunction func;
1299 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
1301 GST_OBJECT_LOCK (pads);
1302 func = pads->priv->func;
1303 user_data = pads->priv->user_data;
1304 GST_OBJECT_UNLOCK (pads);
1306 g_return_val_if_fail (pads->priv->func != NULL, GST_FLOW_NOT_SUPPORTED);
1308 /* check for new pads, update stats etc.. */
1309 gst_collect_pads_check_pads (pads);
1311 if (G_UNLIKELY (pads->priv->eospads == pads->priv->numpads)) {
1312 /* If all our pads are EOS just collect once to let the element
1313 * do its final EOS handling. */
1314 GST_DEBUG_OBJECT (pads, "All active pads (%d) are EOS, calling %s",
1315 pads->priv->numpads, GST_DEBUG_FUNCPTR_NAME (func));
1317 if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking,
1318 TRUE, FALSE) == TRUE)) {
1319 GST_INFO_OBJECT (pads, "finished seeking");
1322 flow_ret = func (pads, user_data);
1323 } while (flow_ret == GST_FLOW_OK);
1325 gboolean collected = FALSE;
1327 /* We call the collected function as long as our condition matches. */
1328 while (((pads->priv->queuedpads + pads->priv->eospads) >=
1329 pads->priv->numpads)) {
1330 GST_DEBUG_OBJECT (pads,
1331 "All active pads (%d + %d >= %d) have data, " "calling %s",
1332 pads->priv->queuedpads, pads->priv->eospads, pads->priv->numpads,
1333 GST_DEBUG_FUNCPTR_NAME (func));
1335 if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking,
1336 TRUE, FALSE) == TRUE)) {
1337 GST_INFO_OBJECT (pads, "finished seeking");
1339 flow_ret = func (pads, user_data);
1342 /* break on error */
1343 if (flow_ret != GST_FLOW_OK)
1345 /* Don't keep looping after telling the element EOS or flushing */
1346 if (pads->priv->queuedpads == 0)
1350 GST_DEBUG_OBJECT (pads, "Not all active pads (%d) have data, continuing",
1351 pads->priv->numpads);
1357 /* General overview:
1358 * - only pad with a buffer can determine earliest_data (and earliest_time)
1359 * - only segment info determines (non-)waiting state
1360 * - ? perhaps use _stream_time for comparison
1361 * (which muxers might have use as well ?)
1365 * Function to recalculate the waiting state of all pads.
1367 * Must be called with STREAM_LOCK.
1369 * Returns %TRUE if a pad was set to waiting
1370 * (from non-waiting state).
1373 gst_collect_pads_recalculate_waiting (GstCollectPads * pads)
1376 gboolean result = FALSE;
1378 /* If earliest time is not known, there is nothing to do. */
1379 if (pads->priv->earliest_data == NULL)
1382 for (collected = pads->data; collected; collected = g_slist_next (collected)) {
1383 GstCollectData *data = (GstCollectData *) collected->data;
1385 GstClockTime comp_time;
1387 /* check if pad has a segment */
1388 if (data->segment.format == GST_FORMAT_UNDEFINED) {
1389 GST_WARNING_OBJECT (pads,
1390 "GstCollectPads has no time segment, assuming 0 based.");
1391 gst_segment_init (&data->segment, GST_FORMAT_TIME);
1392 GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
1395 /* check segment format */
1396 if (data->segment.format != GST_FORMAT_TIME) {
1397 GST_ERROR_OBJECT (pads, "GstCollectPads can handle only time segments.");
1401 /* check if the waiting state should be changed */
1402 comp_time = data->segment.position;
1403 cmp_res = pads->priv->compare_func (pads, data, comp_time,
1404 pads->priv->earliest_data, pads->priv->earliest_time,
1405 pads->priv->compare_user_data);
1408 gst_collect_pads_set_waiting (pads, data, FALSE);
1410 if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING)) {
1412 gst_collect_pads_set_waiting (pads, data, TRUE);
1422 * gst_collect_pads_find_best_pad:
1423 * @pads: the collectpads to use
1424 * @data: returns the collectdata for earliest data
1425 * @time: returns the earliest available buffertime
1427 * Find the oldest/best pad, i.e. pad holding the oldest buffer and
1428 * and return the corresponding #GstCollectData and buffertime.
1430 * This function should be called with STREAM_LOCK held,
1431 * such as in the callback.
1434 gst_collect_pads_find_best_pad (GstCollectPads * pads,
1435 GstCollectData ** data, GstClockTime * time)
1438 GstCollectData *best = NULL;
1439 GstClockTime best_time = GST_CLOCK_TIME_NONE;
1441 g_return_if_fail (data != NULL);
1442 g_return_if_fail (time != NULL);
1444 for (collected = pads->data; collected; collected = g_slist_next (collected)) {
1446 GstCollectData *data = (GstCollectData *) collected->data;
1447 GstClockTime timestamp;
1449 buffer = gst_collect_pads_peek (pads, data);
1450 /* if we have a buffer check if it is better then the current best one */
1451 if (buffer != NULL) {
1452 timestamp = GST_BUFFER_DTS (buffer);
1453 if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
1454 timestamp = GST_BUFFER_PTS (buffer);
1456 gst_buffer_unref (buffer);
1457 if (best == NULL || pads->priv->compare_func (pads, data, timestamp,
1458 best, best_time, pads->priv->compare_user_data) < 0) {
1460 best_time = timestamp;
1465 /* set earliest time */
1469 GST_DEBUG_OBJECT (pads, "best pad %s, best time %" GST_TIME_FORMAT,
1470 best ? GST_PAD_NAME (((GstCollectData *) best)->pad) : "(nil)",
1471 GST_TIME_ARGS (best_time));
1475 * Function to recalculate earliest_data and earliest_timestamp. This also calls
1476 * gst_collect_pads_recalculate_waiting
1478 * Must be called with STREAM_LOCK.
1481 gst_collect_pads_recalculate_full (GstCollectPads * pads)
1483 if (pads->priv->earliest_data)
1484 unref_data (pads->priv->earliest_data);
1485 gst_collect_pads_find_best_pad (pads, &pads->priv->earliest_data,
1486 &pads->priv->earliest_time);
1487 if (pads->priv->earliest_data)
1488 ref_data (pads->priv->earliest_data);
1489 return gst_collect_pads_recalculate_waiting (pads);
1493 * Default collect callback triggered when #GstCollectPads gathered all data.
1495 * Called with STREAM_LOCK.
1497 static GstFlowReturn
1498 gst_collect_pads_default_collected (GstCollectPads * pads, gpointer user_data)
1500 GstCollectData *best = NULL;
1502 GstFlowReturn ret = GST_FLOW_OK;
1503 GstCollectPadsBufferFunction func;
1504 gpointer buffer_user_data;
1506 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
1508 GST_OBJECT_LOCK (pads);
1509 func = pads->priv->buffer_func;
1510 buffer_user_data = pads->priv->buffer_user_data;
1511 GST_OBJECT_UNLOCK (pads);
1513 g_return_val_if_fail (func != NULL, GST_FLOW_NOT_SUPPORTED);
1515 /* Find the oldest pad at all cost */
1516 if (gst_collect_pads_recalculate_full (pads)) {
1517 /* waiting was switched on,
1518 * so give another thread a chance to deliver a possibly
1519 * older buffer; don't charge on yet with the current oldest */
1524 best = pads->priv->earliest_data;
1526 /* No data collected means EOS. */
1527 if (G_UNLIKELY (best == NULL)) {
1528 ret = func (pads, best, NULL, buffer_user_data);
1529 if (ret == GST_FLOW_OK)
1534 /* make sure that the pad we take a buffer from is waiting;
1535 * otherwise popping a buffer will seem not to have happened
1536 * and collectpads can get into a busy loop */
1537 gst_collect_pads_set_waiting (pads, best, TRUE);
1540 buffer = gst_collect_pads_pop (pads, best);
1541 ret = func (pads, best, buffer, buffer_user_data);
1543 /* maybe non-waiting was forced to waiting above due to
1544 * newsegment events coming too sparsely,
1545 * so re-check to restore state to avoid hanging/waiting */
1546 gst_collect_pads_recalculate_full (pads);
1553 * Default timestamp compare function.
1556 gst_collect_pads_default_compare_func (GstCollectPads * pads,
1557 GstCollectData * data1, GstClockTime timestamp1,
1558 GstCollectData * data2, GstClockTime timestamp2, gpointer user_data)
1561 GST_LOG_OBJECT (pads, "comparing %" GST_TIME_FORMAT
1562 " and %" GST_TIME_FORMAT, GST_TIME_ARGS (timestamp1),
1563 GST_TIME_ARGS (timestamp2));
1564 /* non-valid timestamps go first as they are probably headers or so */
1565 if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (timestamp1)))
1566 return GST_CLOCK_TIME_IS_VALID (timestamp2) ? -1 : 0;
1568 if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (timestamp2)))
1571 /* compare timestamp */
1572 if (timestamp1 < timestamp2)
1575 if (timestamp1 > timestamp2)
1581 /* called with STREAM_LOCK */
1583 gst_collect_pads_handle_position_update (GstCollectPads * pads,
1584 GstCollectData * data, GstClockTime new_pos)
1588 /* If oldest time is not known, or current pad got newsegment;
1589 * recalculate the state */
1590 if (!pads->priv->earliest_data || pads->priv->earliest_data == data) {
1591 gst_collect_pads_recalculate_full (pads);
1595 /* Check if the waiting state of the pad should change. */
1597 pads->priv->compare_func (pads, data, new_pos,
1598 pads->priv->earliest_data, pads->priv->earliest_time,
1599 pads->priv->compare_user_data);
1603 gst_collect_pads_set_waiting (pads, data, FALSE);
1611 gst_collect_pads_clip_time (GstCollectPads * pads, GstCollectData * data,
1614 GstClockTime otime = time;
1615 GstBuffer *in, *out = NULL;
1617 if (pads->priv->clip_func) {
1618 in = gst_buffer_new ();
1619 GST_BUFFER_PTS (in) = time;
1620 GST_BUFFER_DTS (in) = time;
1621 pads->priv->clip_func (pads, data, in, &out, pads->priv->clip_user_data);
1623 otime = GST_BUFFER_PTS (out);
1624 gst_buffer_unref (out);
1626 /* FIXME should distinguish between ahead or after segment,
1627 * let's assume after segment and use some large time ... */
1628 otime = G_MAXINT64 / 2;
1636 * gst_collect_pads_event_default:
1637 * @pads: the collectpads to use
1638 * @data: collect data of corresponding pad
1639 * @event: event being processed
1640 * @discard: process but do not send event downstream
1642 * Default #GstCollectPads event handling that elements should always
1643 * chain up to to ensure proper operation. Element might however indicate
1644 * event should not be forwarded downstream.
1647 gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data,
1648 GstEvent * event, gboolean discard)
1650 gboolean res = TRUE;
1651 GstCollectPadsBufferFunction buffer_func;
1655 GST_OBJECT_LOCK (pads);
1656 buffer_func = pads->priv->buffer_func;
1657 GST_OBJECT_UNLOCK (pads);
1660 parent = GST_OBJECT_PARENT (pad);
1662 switch (GST_EVENT_TYPE (event)) {
1663 case GST_EVENT_FLUSH_START:
1665 if (g_atomic_int_get (&pads->priv->seeking)) {
1666 /* drop all but the first FLUSH_STARTs when seeking */
1667 if (g_atomic_int_compare_and_exchange (&pads->priv->pending_flush_start,
1668 TRUE, FALSE) == FALSE)
1671 /* unblock collect pads */
1672 gst_pad_event_default (pad, parent, event);
1675 GST_COLLECT_PADS_STREAM_LOCK (pads);
1676 /* Start flushing. We never call gst_collect_pads_set_flushing (FALSE), we
1677 * instead wait until each pad gets its FLUSH_STOP and let that reset the pad to
1678 * non-flushing (which happens in gst_collect_pads_event_default).
1680 gst_collect_pads_set_flushing (pads, TRUE);
1682 if (pads->priv->flush_func)
1683 pads->priv->flush_func (pads, pads->priv->flush_user_data);
1685 g_atomic_int_set (&pads->priv->pending_flush_stop, TRUE);
1686 GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1690 /* forward event to unblock check_collected */
1691 GST_DEBUG_OBJECT (pad, "forwarding flush start");
1692 res = gst_pad_event_default (pad, parent, event);
1695 /* now unblock the chain function.
1696 * no cond per pad, so they all unblock,
1697 * non-flushing block again */
1698 GST_COLLECT_PADS_STREAM_LOCK (pads);
1699 GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_FLUSHING);
1700 gst_collect_pads_clear (pads, data);
1702 /* cater for possible default muxing functionality */
1704 /* restore to initial state */
1705 gst_collect_pads_set_waiting (pads, data, TRUE);
1706 /* if the current pad is affected, reset state, recalculate later */
1707 if (pads->priv->earliest_data == data) {
1709 pads->priv->earliest_data = NULL;
1710 pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
1714 GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1719 case GST_EVENT_FLUSH_STOP:
1721 /* flush the 1 buffer queue */
1722 GST_COLLECT_PADS_STREAM_LOCK (pads);
1723 GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_FLUSHING);
1724 gst_collect_pads_clear (pads, data);
1725 /* we need new segment info after the flush */
1726 gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
1727 GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
1728 /* if the pad was EOS, remove the EOS flag and
1729 * decrement the number of eospads */
1730 if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
1731 GST_COLLECT_PADS_STATE_EOS))) {
1732 if (!GST_COLLECT_PADS_STATE_IS_SET (data,
1733 GST_COLLECT_PADS_STATE_WAITING))
1734 pads->priv->queuedpads++;
1735 if (!g_atomic_int_get (&pads->priv->seeking)) {
1736 pads->priv->eospads--;
1738 GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_EOS);
1740 GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1742 if (g_atomic_int_get (&pads->priv->seeking)) {
1743 if (g_atomic_int_compare_and_exchange (&pads->priv->pending_flush_stop,
1754 GST_COLLECT_PADS_STREAM_LOCK (pads);
1755 /* if the pad was not EOS, make it EOS and so we
1756 * have one more eospad */
1757 if (G_LIKELY (!GST_COLLECT_PADS_STATE_IS_SET (data,
1758 GST_COLLECT_PADS_STATE_EOS))) {
1759 GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_EOS);
1760 if (!GST_COLLECT_PADS_STATE_IS_SET (data,
1761 GST_COLLECT_PADS_STATE_WAITING))
1762 pads->priv->queuedpads--;
1763 pads->priv->eospads++;
1765 /* check if we need collecting anything, we ignore the result. */
1766 gst_collect_pads_check_collected (pads);
1767 GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1771 case GST_EVENT_SEGMENT:
1775 GST_COLLECT_PADS_STREAM_LOCK (pads);
1777 gst_event_copy_segment (event, &seg);
1779 GST_DEBUG_OBJECT (data->pad, "got segment %" GST_SEGMENT_FORMAT, &seg);
1781 /* default collection can not handle other segment formats than time */
1782 if (buffer_func && seg.format != GST_FORMAT_TIME) {
1783 GST_WARNING_OBJECT (pads, "GstCollectPads default collecting "
1784 "can only handle time segments. Non time segment ignored.");
1785 goto newsegment_done;
1788 /* need to update segment first */
1789 data->segment = seg;
1790 GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
1792 /* now we can use for e.g. running time */
1794 gst_collect_pads_clip_time (pads, data, seg.start + seg.offset);
1796 data->segment = seg;
1798 /* default muxing functionality */
1800 goto newsegment_done;
1802 gst_collect_pads_handle_position_update (pads, data, seg.position);
1805 GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1806 /* we must not forward this event since multiple segments will be
1807 * accumulated and this is certainly not what we want. */
1812 GstClockTime start, duration;
1814 GST_COLLECT_PADS_STREAM_LOCK (pads);
1816 gst_event_parse_gap (event, &start, &duration);
1817 /* FIXME, handle reverse playback case */
1818 if (GST_CLOCK_TIME_IS_VALID (duration))
1820 /* we do not expect another buffer until after gap,
1821 * so that is our position now */
1822 data->segment.position = gst_collect_pads_clip_time (pads, data, start);
1824 gst_collect_pads_handle_position_update (pads, data,
1825 data->segment.position);
1827 GST_COLLECT_PADS_STREAM_UNLOCK (pads);
1830 case GST_EVENT_STREAM_START:
1831 /* drop stream start events, element must create its own start event,
1832 * we can't just forward the first random stream start event we get */
1834 case GST_EVENT_CAPS:
1837 /* forward other events */
1843 gst_event_unref (event);
1850 return gst_pad_event_default (pad, parent, event);
1860 event_forward_func (GstPad * pad, EventData * data)
1862 gboolean ret = TRUE;
1863 GstPad *peer = gst_pad_get_peer (pad);
1866 ret = gst_pad_send_event (peer, gst_event_ref (data->event));
1867 gst_object_unref (peer);
1870 data->result &= ret;
1871 /* Always send to all pads */
1876 forward_event_to_all_sinkpads (GstPad * srcpad, GstEvent * event)
1883 gst_pad_forward (srcpad, (GstPadForwardFunction) event_forward_func, &data);
1885 gst_event_unref (event);
1891 * gst_collect_pads_src_event_default:
1892 * @pads: the collectpads to use
1893 * @pad: src #GstPad that received the event
1894 * @event: event being processed
1896 * Default #GstCollectPads event handling for the src pad of elements.
1897 * Elements can chain up to this to let flushing seek event handling
1898 * be done by GstCollectPads.
1903 gst_collect_pads_src_event_default (GstCollectPads * pads, GstPad * pad,
1907 gboolean res = TRUE;
1909 parent = GST_OBJECT_PARENT (pad);
1911 switch (GST_EVENT_TYPE (event)) {
1912 case GST_EVENT_SEEK:{
1915 pads->priv->eospads = 0;
1917 GST_INFO_OBJECT (pads, "starting seek");
1919 gst_event_parse_seek (event, NULL, NULL, &flags, NULL, NULL, NULL, NULL);
1920 if (flags & GST_SEEK_FLAG_FLUSH) {
1921 g_atomic_int_set (&pads->priv->seeking, TRUE);
1922 g_atomic_int_set (&pads->priv->pending_flush_start, TRUE);
1923 /* forward the seek upstream */
1924 res = forward_event_to_all_sinkpads (pad, event);
1927 g_atomic_int_set (&pads->priv->seeking, FALSE);
1928 g_atomic_int_set (&pads->priv->pending_flush_start, FALSE);
1932 GST_INFO_OBJECT (pads, "seek done, result: %d", res);
1941 res = gst_pad_event_default (pad, parent, event);
1947 gst_collect_pads_event_default_internal (GstCollectPads * pads,
1948 GstCollectData * data, GstEvent * event, gpointer user_data)
1950 return gst_collect_pads_event_default (pads, data, event, FALSE);
1954 gst_collect_pads_event (GstPad * pad, GstObject * parent, GstEvent * event)
1956 gboolean res = FALSE, need_unlock = FALSE;
1957 GstCollectData *data;
1958 GstCollectPads *pads;
1959 GstCollectPadsEventFunction event_func;
1960 gpointer event_user_data;
1962 /* some magic to get the managing collect_pads */
1963 GST_OBJECT_LOCK (pad);
1964 data = (GstCollectData *) gst_pad_get_element_private (pad);
1965 if (G_UNLIKELY (data == NULL))
1968 GST_OBJECT_UNLOCK (pad);
1972 pads = data->collect;
1974 GST_DEBUG_OBJECT (data->pad, "Got %s event on sink pad",
1975 GST_EVENT_TYPE_NAME (event));
1977 GST_OBJECT_LOCK (pads);
1978 event_func = pads->priv->event_func;
1979 event_user_data = pads->priv->event_user_data;
1980 GST_OBJECT_UNLOCK (pads);
1982 if (GST_EVENT_IS_SERIALIZED (event)) {
1983 GST_COLLECT_PADS_STREAM_LOCK (pads);
1987 if (G_LIKELY (event_func)) {
1988 res = event_func (pads, data, event, event_user_data);
1992 GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2000 GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2001 GST_OBJECT_UNLOCK (pad);
2007 * gst_collect_pads_query_default:
2008 * @pads: the collectpads to use
2009 * @data: collect data of corresponding pad
2010 * @query: query being processed
2011 * @discard: process but do not send event downstream
2013 * Default #GstCollectPads query handling that elements should always
2014 * chain up to to ensure proper operation. Element might however indicate
2015 * query should not be forwarded downstream.
2018 gst_collect_pads_query_default (GstCollectPads * pads, GstCollectData * data,
2019 GstQuery * query, gboolean discard)
2021 gboolean res = TRUE;
2026 parent = GST_OBJECT_PARENT (pad);
2028 switch (GST_QUERY_TYPE (query)) {
2029 case GST_QUERY_SEEKING:
2033 /* don't pass it along as some (file)sink might claim it does
2034 * whereas with a collectpads in between that will not likely work */
2035 gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
2036 gst_query_set_seeking (query, format, FALSE, 0, -1);
2046 return gst_pad_query_default (pad, parent, query);
2052 gst_collect_pads_query_default_internal (GstCollectPads * pads,
2053 GstCollectData * data, GstQuery * query, gpointer user_data)
2055 return gst_collect_pads_query_default (pads, data, query, FALSE);
2059 gst_collect_pads_query (GstPad * pad, GstObject * parent, GstQuery * query)
2061 gboolean res = FALSE, need_unlock = FALSE;
2062 GstCollectData *data;
2063 GstCollectPads *pads;
2064 GstCollectPadsQueryFunction query_func;
2065 gpointer query_user_data;
2067 GST_DEBUG_OBJECT (pad, "Got %s query on sink pad",
2068 GST_QUERY_TYPE_NAME (query));
2070 /* some magic to get the managing collect_pads */
2071 GST_OBJECT_LOCK (pad);
2072 data = (GstCollectData *) gst_pad_get_element_private (pad);
2073 if (G_UNLIKELY (data == NULL))
2076 GST_OBJECT_UNLOCK (pad);
2078 pads = data->collect;
2080 GST_OBJECT_LOCK (pads);
2081 query_func = pads->priv->query_func;
2082 query_user_data = pads->priv->query_user_data;
2083 GST_OBJECT_UNLOCK (pads);
2085 if (GST_QUERY_IS_SERIALIZED (query)) {
2086 GST_COLLECT_PADS_STREAM_LOCK (pads);
2090 if (G_LIKELY (query_func)) {
2091 res = query_func (pads, data, query, query_user_data);
2095 GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2103 GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2104 GST_OBJECT_UNLOCK (pad);
2110 /* For each buffer we receive we check if our collected condition is reached
2111 * and if so we call the collected function. When this is done we check if
2112 * data has been unqueued. If data is still queued we wait holding the stream
2113 * lock to make sure no EOS event can happen while we are ready to be
2116 static GstFlowReturn
2117 gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2119 GstCollectData *data;
2120 GstCollectPads *pads;
2122 GstBuffer **buffer_p;
2125 GST_DEBUG ("Got buffer for pad %s:%s", GST_DEBUG_PAD_NAME (pad));
2127 /* some magic to get the managing collect_pads */
2128 GST_OBJECT_LOCK (pad);
2129 data = (GstCollectData *) gst_pad_get_element_private (pad);
2130 if (G_UNLIKELY (data == NULL))
2133 GST_OBJECT_UNLOCK (pad);
2135 pads = data->collect;
2137 GST_COLLECT_PADS_STREAM_LOCK (pads);
2138 /* if not started, bail out */
2139 if (G_UNLIKELY (!pads->priv->started))
2141 /* check if this pad is flushing */
2142 if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
2143 GST_COLLECT_PADS_STATE_FLUSHING)))
2145 /* pad was EOS, we can refuse this data */
2146 if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
2147 GST_COLLECT_PADS_STATE_EOS)))
2150 /* see if we need to clip */
2151 if (pads->priv->clip_func) {
2152 GstBuffer *outbuf = NULL;
2154 pads->priv->clip_func (pads, data, buffer, &outbuf,
2155 pads->priv->clip_user_data);
2158 if (G_UNLIKELY (outbuf == NULL))
2161 if (G_UNLIKELY (ret == GST_FLOW_EOS))
2163 else if (G_UNLIKELY (ret != GST_FLOW_OK))
2167 GST_DEBUG_OBJECT (pads, "Queuing buffer %p for pad %s:%s", buffer,
2168 GST_DEBUG_PAD_NAME (pad));
2170 /* One more pad has data queued */
2171 if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING))
2172 pads->priv->queuedpads++;
2173 buffer_p = &data->buffer;
2174 gst_buffer_replace (buffer_p, buffer);
2176 /* update segment last position if in TIME */
2177 if (G_LIKELY (data->segment.format == GST_FORMAT_TIME)) {
2178 GstClockTime timestamp;
2180 timestamp = GST_BUFFER_DTS (buffer);
2181 if (!GST_CLOCK_TIME_IS_VALID (timestamp))
2182 timestamp = GST_BUFFER_PTS (buffer);
2184 if (GST_CLOCK_TIME_IS_VALID (timestamp))
2185 data->segment.position = timestamp;
2188 /* While we have data queued on this pad try to collect stuff */
2190 /* Check if our collected condition is matched and call the collected
2191 * function if it is */
2192 ret = gst_collect_pads_check_collected (pads);
2193 /* when an error occurs, we want to report this back to the caller ASAP
2194 * without having to block if the buffer was not popped */
2195 if (G_UNLIKELY (ret != GST_FLOW_OK))
2198 /* data was consumed, we can exit and accept new data */
2199 if (data->buffer == NULL)
2202 /* Having the _INIT here means we don't care about any broadcast up to here
2203 * (most of which occur with STREAM_LOCK held, so could not have happened
2204 * anyway). We do care about e.g. a remove initiated broadcast as of this
2205 * point. Putting it here also makes this thread ignores any evt it raised
2206 * itself (as is a usual WAIT semantic).
2208 GST_COLLECT_PADS_EVT_INIT (cookie);
2210 /* pad could be removed and re-added */
2212 GST_OBJECT_LOCK (pad);
2213 if (G_UNLIKELY ((data = gst_pad_get_element_private (pad)) == NULL))
2216 GST_OBJECT_UNLOCK (pad);
2218 GST_DEBUG_OBJECT (pads, "Pad %s:%s has a buffer queued, waiting",
2219 GST_DEBUG_PAD_NAME (pad));
2221 /* wait to be collected, this must happen from another thread triggered
2222 * by the _chain function of another pad. We release the lock so we
2223 * can get stopped or flushed as well. We can however not get EOS
2224 * because we still hold the STREAM_LOCK.
2226 GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2227 GST_COLLECT_PADS_EVT_WAIT (pads, cookie);
2228 GST_COLLECT_PADS_STREAM_LOCK (pads);
2230 GST_DEBUG_OBJECT (pads, "Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad));
2232 /* after a signal, we could be stopped */
2233 if (G_UNLIKELY (!pads->priv->started))
2235 /* check if this pad is flushing */
2236 if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
2237 GST_COLLECT_PADS_STATE_FLUSHING)))
2240 while (data->buffer != NULL);
2243 GST_COLLECT_PADS_STREAM_UNLOCK (pads);
2244 /* data is definitely NULL if pad_removed goto was run. */
2248 gst_buffer_unref (buffer);
2253 GST_WARNING ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2254 GST_OBJECT_UNLOCK (pad);
2255 ret = GST_FLOW_NOT_LINKED;
2261 GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
2262 GST_OBJECT_UNLOCK (pad);
2263 gst_buffer_unref (buffer);
2264 return GST_FLOW_NOT_LINKED;
2268 GST_DEBUG ("not started");
2269 gst_collect_pads_clear (pads, data);
2270 ret = GST_FLOW_FLUSHING;
2275 GST_DEBUG ("pad %s:%s is flushing", GST_DEBUG_PAD_NAME (pad));
2276 gst_collect_pads_clear (pads, data);
2277 ret = GST_FLOW_FLUSHING;
2282 /* we should not post an error for this, just inform upstream that
2283 * we don't expect anything anymore */
2284 GST_DEBUG ("pad %s:%s is eos", GST_DEBUG_PAD_NAME (pad));
2290 GST_DEBUG ("clipped buffer on pad %s:%s", GST_DEBUG_PAD_NAME (pad));
2296 /* we print the error, the element should post a reasonable error
2297 * message for fatal errors */
2298 GST_DEBUG ("collect failed, reason %d (%s)", ret, gst_flow_get_name (ret));
2299 gst_collect_pads_clear (pads, data);