2 * Copyright (C) 2005 Wim Taymans <wim@fluendo.com>
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 02111-1307, USA.
22 * SECTION:gstcollectpads
23 * @short_description: manages a set of pads that operate in collect mode
26 * Manages a set of pads that operate in collect mode. This means that control
27 * is given to the manager of this object when all pads have data.
30 * Collectpads are created with gst_collect_pads_new(). A callback should then
31 * be installed with gst_collect_pads_set_function ().
34 * Pads are added to the collection with gst_collect_pads_add_pad()/
35 * gst_collect_pads_remove_pad(). The pad
36 * has to be a sinkpad. The chain function of the pad is
37 * overridden. The element_private of the pad is used to store
38 * private information.
41 * For each pad, data is queued in the chain function or by
42 * performing a pull_range.
45 * When data is queued on all pads, the callback function is called.
48 * Data can be dequeued from the pad with the gst_collect_pads_pop() method.
49 * One can peek at the data with the gst_collect_pads_peek() function.
50 * These functions will return NULL if the pad received an EOS event. When all
51 * pads return NULL from a gst_collect_pads_peek(), the element can emit an EOS
55 * Data can also be dequeued in byte units using the gst_collect_pads_available(),
56 * gst_collect_pads_read() and gst_collect_pads_flush() calls.
59 * Elements should call gst_collect_pads_start() and gst_collect_pads_stop() in
60 * their state change functions to start and stop the processing of the collecpads.
61 * The gst_collect_pads_stop() call should be called before calling the parent
62 * element state change function in the PAUSED_TO_READY state change to ensure
63 * no pad is blocked and the element can finish streaming.
66 * gst_collect_pads_collect() and gst_collect_pads_collect_range() can be used by
67 * elements that start a #GstTask to drive the collect_pads.
72 #include "gstcollectpads.h"
74 GST_DEBUG_CATEGORY_STATIC (collect_pads_debug);
75 #define GST_CAT_DEFAULT collect_pads_debug
77 GST_BOILERPLATE (GstCollectPads, gst_collect_pads, GstObject, GST_TYPE_OBJECT)
79 static GstFlowReturn gst_collect_pads_chain (GstPad * pad,
81 static gboolean gst_collect_pads_event (GstPad * pad, GstEvent * event);
82 static void gst_collect_pads_finalize (GObject * object);
83 static void gst_collect_pads_init (GstCollectPads * pads,
84 GstCollectPadsClass * g_class);
86 static void gst_collect_pads_base_init (gpointer g_class)
88 GST_DEBUG_CATEGORY_INIT (collect_pads_debug, "collect_pads", 0,
93 gst_collect_pads_class_init (GstCollectPadsClass * klass)
95 GObjectClass *gobject_class = (GObjectClass *) klass;
97 gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_collect_pads_finalize);
101 gst_collect_pads_init (GstCollectPads * pads, GstCollectPadsClass * g_class)
103 pads->cond = g_cond_new ();
107 pads->queuedpads = 0;
109 pads->started = FALSE;
113 gst_collect_pads_finalize (GObject * object)
115 GstCollectPads *pads = GST_COLLECT_PADS (object);
117 gst_collect_pads_stop (pads);
118 g_cond_free (pads->cond);
119 /* FIXME, free data */
121 G_OBJECT_CLASS (parent_class)->finalize (object);
125 * gst_collect_pads_new:
127 * Create a new instance of #GstCollectsPads.
129 * Returns: a new #GstCollectPads, or NULL in case of an error.
134 gst_collect_pads_new (void)
136 GstCollectPads *newcoll;
138 newcoll = g_object_new (GST_TYPE_COLLECT_PADS, NULL);
144 * gst_collect_pads_set_function:
145 * @pads: the collectspads to use
146 * @func: the function to set
147 * @user_data: user data passed to the function
149 * Set the callback function and user data that will be called when
150 * all the pads added to the collection have buffers queued.
155 gst_collect_pads_set_function (GstCollectPads * pads,
156 GstCollectPadsFunction func, gpointer user_data)
158 g_return_if_fail (pads != NULL);
159 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
161 GST_OBJECT_LOCK (pads);
163 pads->user_data = user_data;
164 GST_OBJECT_UNLOCK (pads);
168 * gst_collect_pads_add_pad:
169 * @pads: the collectspads to use
170 * @pad: the pad to add
171 * @size: the size of the returned GstCollectData structure
173 * Add a pad to the collection of collect pads. The pad has to be
176 * You specify a size for the returned #GstCollectData structure
177 * so that you can use it to store additional information.
179 * Returns: a new #GstCollectData to identify the new pad. Or NULL
180 * if wrong parameters are supplied.
185 gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size)
187 GstCollectData *data;
189 g_return_val_if_fail (pads != NULL, NULL);
190 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
191 g_return_val_if_fail (pad != NULL, NULL);
192 g_return_val_if_fail (GST_PAD_IS_SINK (pad), NULL);
193 g_return_val_if_fail (size >= sizeof (GstCollectData), NULL);
195 data = g_malloc0 (size);
196 data->collect = pads;
199 gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
201 GST_OBJECT_LOCK (pads);
202 pads->data = g_slist_append (pads->data, data);
203 gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_chain));
204 gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_event));
205 gst_pad_set_element_private (pad, data);
208 GST_OBJECT_UNLOCK (pads);
214 find_pad (GstCollectData * data, GstPad * pad)
216 if (data->pad == pad)
222 * gst_collect_pads_remove_pad:
223 * @pads: the collectspads to use
224 * @pad: the pad to remove
226 * Remove a pad from the collection of collect pads.
228 * Returns: TRUE if the pad could be removed.
233 gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad)
237 g_return_val_if_fail (pads != NULL, FALSE);
238 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
239 g_return_val_if_fail (pad != NULL, FALSE);
240 g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
242 GST_OBJECT_LOCK (pads);
243 list = g_slist_find_custom (pads->data, pad, (GCompareFunc) find_pad);
246 pads->data = g_slist_delete_link (pads->data, list);
250 GST_OBJECT_UNLOCK (pads);
256 * gst_collect_pads_is_active:
257 * @pads: the collectspads to use
258 * @pad: the pad to check
260 * Check if a pad is active.
262 * Returns: TRUE if the pad is active.
267 gst_collect_pads_is_active (GstCollectPads * pads, GstPad * pad)
269 g_return_val_if_fail (pads != NULL, FALSE);
270 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
271 g_return_val_if_fail (pad != NULL, FALSE);
272 g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
274 g_warning ("gst_collect_pads_is_active() is not implemented");
280 * gst_collect_pads_collect:
281 * @pads: the collectspads to use
283 * Collect data on all pads. This function is usually called
284 * from a GstTask function in an element. This function is
285 * currently not implemented.
287 * Returns: GstFlowReturn of the operation.
292 gst_collect_pads_collect (GstCollectPads * pads)
294 g_return_val_if_fail (pads != NULL, GST_FLOW_ERROR);
295 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
297 g_warning ("gst_collect_pads_collect() is not implemented");
299 return GST_FLOW_ERROR;
303 * gst_collect_pads_collect_range:
304 * @pads: the collectspads to use
305 * @offset: the offset to collect
306 * @length: the length to collect
308 * Collect data with @offset and @length on all pads. This function
309 * is typically called in the getrange function of an element. This
310 * function is currently not implemented.
312 * Returns: GstFlowReturn of the operation.
317 gst_collect_pads_collect_range (GstCollectPads * pads, guint64 offset,
320 g_return_val_if_fail (pads != NULL, GST_FLOW_ERROR);
321 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
323 g_warning ("gst_collect_pads_collect_range() is not implemented");
325 return GST_FLOW_ERROR;
329 * gst_collect_pads_start:
330 * @pads: the collectspads to use
332 * Starts the processing of data in the collect_pads.
337 gst_collect_pads_start (GstCollectPads * pads)
339 g_return_if_fail (pads != NULL);
340 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
342 GST_OBJECT_LOCK (pads);
343 pads->started = TRUE;
344 GST_OBJECT_UNLOCK (pads);
348 * gst_collect_pads_stop:
349 * @pads: the collectspads to use
351 * Stops the processing of data in the collect_pads. this function
352 * will also unblock any blocking operations.
357 gst_collect_pads_stop (GstCollectPads * pads)
359 g_return_if_fail (pads != NULL);
360 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
362 GST_OBJECT_LOCK (pads);
363 pads->started = FALSE;
364 GST_COLLECT_PADS_BROADCAST (pads);
365 GST_OBJECT_UNLOCK (pads);
369 * gst_collect_pads_peek:
370 * @pads: the collectspads to peek
371 * @data: the data to use
373 * Peek at the buffer currently queued in @data. This function
374 * should be called with the @pads LOCK held, such as in the callback
377 * Returns: The buffer in @data or NULL if no buffer is queued.
378 * should unref the buffer after usage.
383 gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data)
387 g_return_val_if_fail (pads != NULL, NULL);
388 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
389 g_return_val_if_fail (data != NULL, NULL);
391 result = data->buffer;
394 gst_buffer_ref (result);
396 GST_DEBUG ("Peeking at pad %s:%s: buffer=%p",
397 GST_DEBUG_PAD_NAME (data->pad), result);
403 * gst_collect_pads_pop:
404 * @pads: the collectspads to pop
405 * @data: the data to use
407 * Pop the buffer currently queued in @data. This function
408 * should be called with the @pads LOCK held, such as in the callback
411 * Returns: The buffer in @data or NULL if no buffer was queued.
412 * You should unref the buffer after usage.
417 gst_collect_pads_pop (GstCollectPads * pads, GstCollectData * data)
421 g_return_val_if_fail (pads != NULL, NULL);
422 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
423 g_return_val_if_fail (data != NULL, NULL);
425 result = data->buffer;
427 gst_buffer_replace (&data->buffer, NULL);
432 GST_COLLECT_PADS_SIGNAL (pads);
434 GST_DEBUG ("Pop buffer on pad %s:%s: buffer=%p",
435 GST_DEBUG_PAD_NAME (data->pad), result);
441 * gst_collect_pads_available:
442 * @pads: the collectspads to query
444 * Query how much bytes can be read from each queued buffer. This means
445 * that the result of this call is the maximum number of bytes that can
446 * be read from each of the pads.
448 * This function should be called with @pads LOCK held, such as
451 * Returns: The maximum number of bytes queued on all pad. This function
452 * returns 0 if a pad has no queued buffer.
457 gst_collect_pads_available (GstCollectPads * pads)
460 guint result = G_MAXUINT;
462 g_return_val_if_fail (pads != NULL, 0);
463 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
465 for (collected = pads->data; collected; collected = g_slist_next (collected)) {
466 GstCollectData *pdata;
469 pdata = (GstCollectData *) collected->data;
471 if (pdata->buffer == NULL)
474 size = GST_BUFFER_SIZE (pdata->buffer) - pdata->pos;
488 * gst_collect_pads_read:
489 * @pads: the collectspads to query
490 * @data: the data to use
491 * @bytes: a pointer to a byte array
492 * @size: the number of bytes to read
494 * Get a pointer in @bytes where @size bytes can be read from the
497 * This function should be called with @pads LOCK held, such as
500 * Returns: The number of bytes available for consumption in the
501 * memory pointed to by @bytes. This can be less than @size and
502 * is 0 if the pad is end-of-stream.
507 gst_collect_pads_read (GstCollectPads * pads, GstCollectData * data,
508 guint8 ** bytes, guint size)
512 g_return_val_if_fail (pads != NULL, 0);
513 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
514 g_return_val_if_fail (data != NULL, 0);
515 g_return_val_if_fail (bytes != NULL, 0);
517 readsize = MIN (size, GST_BUFFER_SIZE (data->buffer) - data->pos);
519 *bytes = GST_BUFFER_DATA (data->buffer) + data->pos;
525 * gst_collect_pads_flush:
526 * @pads: the collectspads to query
527 * @data: the data to use
528 * @size: the number of bytes to flush
530 * Flush @size bytes from the pad @data.
532 * This function should be called with @pads LOCK held, such as
535 * Returns: The number of bytes flushed This can be less than @size and
536 * is 0 if the pad was end-of-stream.
541 gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data,
546 g_return_val_if_fail (pads != NULL, 0);
547 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
548 g_return_val_if_fail (data != NULL, 0);
550 flushsize = MIN (size, GST_BUFFER_SIZE (data->buffer) - data->pos);
554 if (data->pos >= GST_BUFFER_SIZE (data->buffer)) {
557 buf = gst_collect_pads_pop (pads, data);
558 gst_buffer_unref (buf);
565 gst_collect_pads_event (GstPad * pad, GstEvent * event)
567 GstCollectData *data;
568 GstCollectPads *pads;
570 /* some magic to get the managing collect_pads */
571 data = (GstCollectData *) gst_pad_get_element_private (pad);
575 pads = data->collect;
577 GST_DEBUG ("Got %s event on pad %s:%s", GST_EVENT_TYPE_NAME (event),
578 GST_DEBUG_PAD_NAME (data->pad));
580 switch (GST_EVENT_TYPE (event)) {
583 GstFlowReturn ret = GST_FLOW_OK;
585 GST_OBJECT_LOCK (pads);
589 /* if all pads are EOS and we have a function, call it */
590 if ((pads->eospads == pads->numpads) && pads->func) {
591 ret = pads->func (pads, pads->user_data);
594 GST_OBJECT_UNLOCK (pads);
596 /* We eat this event */
597 gst_event_unref (event);
601 case GST_EVENT_NEWSEGMENT:
603 gint64 start, stop, time;
608 gst_event_parse_new_segment (event, &update, &rate, &format,
609 &start, &stop, &time);
611 gst_segment_set_newsegment (&data->segment, update, rate, format,
620 return gst_pad_event_default (pad, event);
625 GST_DEBUG ("collect_pads not ours");
632 gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer)
634 GstCollectData *data;
635 GstCollectPads *pads;
639 GST_DEBUG ("Got buffer for pad %s:%s", GST_DEBUG_PAD_NAME (pad));
641 /* some magic to get the managing collect_pads */
642 data = (GstCollectData *) gst_pad_get_element_private (pad);
646 pads = data->collect;
647 size = GST_BUFFER_SIZE (buffer);
649 GST_OBJECT_LOCK (pads);
651 /* if not started, bail out */
655 /* Call the collected callback until a pad with a buffer is popped. */
656 while (((pads->queuedpads + pads->eospads) == pads->numpads) && pads->func)
657 ret = pads->func (pads, pads->user_data);
659 /* queue buffer on this pad, block if filled */
660 while (data->buffer != NULL) {
661 GST_DEBUG ("Pad %s:%s already has a buffer queued, waiting",
662 GST_DEBUG_PAD_NAME (pad));
663 GST_COLLECT_PADS_WAIT (pads);
664 GST_DEBUG ("Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad));
665 /* after a signal, we could be stopped */
670 GST_DEBUG ("Queuing buffer %p for pad %s:%s", buffer,
671 GST_DEBUG_PAD_NAME (pad));
674 gst_buffer_replace (&data->buffer, buffer);
676 /* if all pads have data and we have a function, call it */
677 if (((pads->queuedpads + pads->eospads) == pads->numpads) && pads->func) {
678 GST_DEBUG ("All active pads have data, calling %s",
679 GST_DEBUG_FUNCPTR_NAME (pads->func));
680 ret = pads->func (pads, pads->user_data);
682 GST_DEBUG ("Not all active pads have data, continuing");
685 GST_OBJECT_UNLOCK (pads);
692 GST_DEBUG ("collect_pads not ours");
693 return GST_FLOW_ERROR;
697 GST_OBJECT_UNLOCK (pads);
698 GST_DEBUG ("collect_pads not started");
699 return GST_FLOW_WRONG_STATE;