d02fd303f405d11d61dd72a8e0aa7f2ceae4ecad
[platform/upstream/gstreamer.git] / libs / gst / base / gstcollectpads.c
1 /* GStreamer
2  * Copyright (C) 2005 Wim Taymans <wim@fluendo.com>
3  *
4  * gstcollectpads.c:
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21 /**
22  * SECTION:gstcollectpads
23  * @short_description: manages a set of pads that operate in collect mode
24  * @see_also:
25  *
26  * Manages a set of pads that operate in collect mode. This means that control
27  * is given to the manager of this object when all pads have data.
28  * <itemizedlist>
29  *   <listitem><para>
30  *     Collectpads are created with gst_collect_pads_new(). A callback should then
31  *     be installed with gst_collect_pads_set_function ().
32  *   </para></listitem>
33  *   <listitem><para>
34  *     Pads are added to the collection with gst_collect_pads_add_pad()/
35  *     gst_collect_pads_remove_pad(). The pad
36  *     has to be a sinkpad. The chain and event functions of the pad are
37  *     overridden. The element_private of the pad is used to store
38  *     private information for the collectpads.
39  *   </para></listitem>
40  *   <listitem><para>
41  *     For each pad, data is queued in the _chain function or by
42  *     performing a pull_range.
43  *   </para></listitem>
44  *   <listitem><para>
45  *     When data is queued on all pads, the callback function is called.
46  *   </para></listitem>
47  *   <listitem><para>
48  *     Data can be dequeued from the pad with the gst_collect_pads_pop() method.
49  *     One can peek at the data with the gst_collect_pads_peek() function.
50  *     These functions will return NULL if the pad received an EOS event. When all
51  *     pads return NULL from a gst_collect_pads_peek(), the element can emit an EOS
52  *     event itself.
53  *   </para></listitem>
54  *   <listitem><para>
55  *     Data can also be dequeued in byte units using the gst_collect_pads_available(),
56  *     gst_collect_pads_read() and gst_collect_pads_flush() calls.
57  *   </para></listitem>
58  *   <listitem><para>
59  *     Elements should call gst_collect_pads_start() and gst_collect_pads_stop() in
60  *     their state change functions to start and stop the processing of the collecpads.
61  *     The gst_collect_pads_stop() call should be called before calling the parent
62  *     element state change function in the PAUSED_TO_READY state change to ensure
63  *     no pad is blocked and the element can finish streaming.
64  *   </para></listitem>
65  *   <listitem><para>
66  *     gst_collect_pads_collect() and gst_collect_pads_collect_range() can be used by
67  *     elements that start a #GstTask to drive the collect_pads. This feature is however
68  *     not yet implemented.
69  *   </para></listitem>
70  * </itemizedlist>
71  *
72  * Last reviewed on 2006-05-10 (0.10.6)
73  */
74
75 #include "gstcollectpads.h"
76 #include "gst/glib-compat-private.h"
77
78 GST_DEBUG_CATEGORY_STATIC (collect_pads_debug);
79 #define GST_CAT_DEFAULT collect_pads_debug
80
81 #define GST_COLLECT_PADS_GET_PRIVATE(obj)  \
82   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_COLLECT_PADS, GstCollectPadsPrivate))
83
84 struct _GstCollectPadsPrivate
85 {
86   GstCollectPadsClipFunction clipfunc;
87   gpointer clipfunc_user_data;
88 };
89
90 #define gst_collect_pads_parent_class parent_class
91 G_DEFINE_TYPE (GstCollectPads, gst_collect_pads, GST_TYPE_OBJECT);
92
93 static void gst_collect_pads_clear (GstCollectPads * pads,
94     GstCollectData * data);
95 static GstFlowReturn gst_collect_pads_chain (GstPad * pad, GstObject * parent,
96     GstBuffer * buffer);
97 static gboolean gst_collect_pads_event (GstPad * pad, GstObject * parent,
98     GstEvent * event);
99 static void gst_collect_pads_finalize (GObject * object);
100 static void ref_data (GstCollectData * data);
101 static void unref_data (GstCollectData * data);
102 static void gst_collect_pads_check_pads_unlocked (GstCollectPads * pads);
103
104 static void
105 gst_collect_pads_class_init (GstCollectPadsClass * klass)
106 {
107   GObjectClass *gobject_class = (GObjectClass *) klass;
108
109   g_type_class_add_private (klass, sizeof (GstCollectPadsPrivate));
110
111   GST_DEBUG_CATEGORY_INIT (collect_pads_debug, "collectpads", 0,
112       "GstCollectPads");
113
114   gobject_class->finalize = gst_collect_pads_finalize;
115 }
116
117 static void
118 gst_collect_pads_init (GstCollectPads * pads)
119 {
120   pads->priv = GST_COLLECT_PADS_GET_PRIVATE (pads);
121
122   pads->cond = g_cond_new ();
123   pads->data = NULL;
124   pads->cookie = 0;
125   pads->numpads = 0;
126   pads->queuedpads = 0;
127   pads->eospads = 0;
128   pads->started = FALSE;
129
130   /* members to manage the pad list */
131   pads->pad_lock = g_mutex_new ();
132   pads->pad_cookie = 0;
133   pads->pad_list = NULL;
134 }
135
136 static void
137 gst_collect_pads_finalize (GObject * object)
138 {
139   GSList *collected;
140   GstCollectPads *pads = GST_COLLECT_PADS (object);
141
142   GST_DEBUG ("finalize");
143
144   g_cond_free (pads->cond);
145   g_mutex_free (pads->pad_lock);
146
147   /* Remove pads */
148   collected = pads->pad_list;
149   for (; collected; collected = g_slist_next (collected)) {
150     GstCollectData *pdata = (GstCollectData *) collected->data;
151
152     unref_data (pdata);
153   }
154   /* Free pads list */
155   g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
156   g_slist_free (pads->data);
157   g_slist_free (pads->pad_list);
158
159   G_OBJECT_CLASS (parent_class)->finalize (object);
160 }
161
162 /**
163  * gst_collect_pads_new:
164  *
165  * Create a new instance of #GstCollectPads.
166  *
167  * MT safe.
168  *
169  * Returns: (transfer full): a new #GstCollectPads, or NULL in case of an error.
170  */
171 GstCollectPads *
172 gst_collect_pads_new (void)
173 {
174   GstCollectPads *newcoll;
175
176   newcoll = g_object_newv (GST_TYPE_COLLECT_PADS, 0, NULL);
177
178   return newcoll;
179 }
180
181 /**
182  * gst_collect_pads_set_function:
183  * @pads: the collectspads to use
184  * @func: the function to set
185  * @user_data: (closure): user data passed to the function
186  *
187  * Set the callback function and user data that will be called when
188  * all the pads added to the collection have buffers queued.
189  *
190  * MT safe.
191  */
192 void
193 gst_collect_pads_set_function (GstCollectPads * pads,
194     GstCollectPadsFunction func, gpointer user_data)
195 {
196   g_return_if_fail (pads != NULL);
197   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
198
199   GST_OBJECT_LOCK (pads);
200   pads->func = func;
201   pads->user_data = user_data;
202   GST_OBJECT_UNLOCK (pads);
203 }
204
205 static void
206 ref_data (GstCollectData * data)
207 {
208   g_assert (data != NULL);
209
210   g_atomic_int_inc (&(data->refcount));
211 }
212
213 static void
214 unref_data (GstCollectData * data)
215 {
216   g_assert (data != NULL);
217   g_assert (data->refcount > 0);
218
219   if (!g_atomic_int_dec_and_test (&(data->refcount)))
220     return;
221
222   if (data->destroy_notify)
223     data->destroy_notify (data);
224   g_object_unref (data->pad);
225   if (data->buffer)
226     gst_buffer_unref (data->buffer);
227   g_free (data);
228 }
229
230 /**
231  * gst_collect_pads_add_pad:
232  * @pads: the collectspads to use
233  * @pad: (transfer none): the pad to add
234  * @size: the size of the returned #GstCollectData structure
235  * @destroy_notify: function to be called before the returned #GstCollectData
236  * structure is freed
237  *
238  * Add a pad to the collection of collect pads. The pad has to be
239  * a sinkpad. The refcount of the pad is incremented. Use
240  * gst_collect_pads_remove_pad() to remove the pad from the collection
241  * again.
242  *
243  * You specify a size for the returned #GstCollectData structure
244  * so that you can use it to store additional information.
245  *
246  * You can also specify a #GstCollectDataDestroyNotify that will be called
247  * just before the #GstCollectData structure is freed. It is passed the
248  * pointer to the structure and should free any custom memory and resources
249  * allocated for it.
250  *
251  * The pad will be automatically activated in push mode when @pads is
252  * started.
253  *
254  * MT safe.
255  *
256  * Returns: a new #GstCollectData to identify the new pad. Or NULL
257  *   if wrong parameters are supplied.
258  */
259 GstCollectData *
260 gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size,
261     GstCollectDataDestroyNotify destroy_notify)
262 {
263   GstCollectData *data;
264
265   g_return_val_if_fail (pads != NULL, NULL);
266   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
267   g_return_val_if_fail (pad != NULL, NULL);
268   g_return_val_if_fail (GST_PAD_IS_SINK (pad), NULL);
269   g_return_val_if_fail (size >= sizeof (GstCollectData), NULL);
270
271   GST_DEBUG ("adding pad %s:%s", GST_DEBUG_PAD_NAME (pad));
272
273   data = g_malloc0 (size);
274   data->collect = pads;
275   data->pad = gst_object_ref (pad);
276   data->buffer = NULL;
277   data->pos = 0;
278   gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
279   data->flushing = FALSE;
280   data->new_segment = FALSE;
281   data->eos = FALSE;
282   data->refcount = 1;
283   data->destroy_notify = destroy_notify;
284
285   GST_COLLECT_PADS_PAD_LOCK (pads);
286   GST_OBJECT_LOCK (pad);
287   gst_pad_set_element_private (pad, data);
288   GST_OBJECT_UNLOCK (pad);
289   pads->pad_list = g_slist_append (pads->pad_list, data);
290   gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_chain));
291   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_event));
292   /* activate the pad when needed */
293   if (pads->started)
294     gst_pad_set_active (pad, TRUE);
295   pads->pad_cookie++;
296   GST_COLLECT_PADS_PAD_UNLOCK (pads);
297
298   return data;
299 }
300
301 static gint
302 find_pad (GstCollectData * data, GstPad * pad)
303 {
304   if (data->pad == pad)
305     return 0;
306   return 1;
307 }
308
309 /**
310  * gst_collect_pads_set_clip_function:
311  * @pads: the collectspads to use
312  * @clipfunc: clip function to install
313  * @user_data: (closure): user data to pass to @clip_func
314  *
315  * Install a clipping function that is called right after a buffer is received
316  * on a pad managed by @pads. See #GstCollectPadsClipFunction for more info.
317  *
318  * Since: 0.10.26
319  */
320 void
321 gst_collect_pads_set_clip_function (GstCollectPads * pads,
322     GstCollectPadsClipFunction clipfunc, gpointer user_data)
323 {
324   GstCollectPadsPrivate *priv;
325
326   g_return_if_fail (pads != NULL);
327   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
328
329   priv = pads->priv;
330
331   priv->clipfunc = clipfunc;
332   priv->clipfunc_user_data = user_data;
333 }
334
335 /**
336  * gst_collect_pads_remove_pad:
337  * @pads: the collectspads to use
338  * @pad: (transfer none): the pad to remove
339  *
340  * Remove a pad from the collection of collect pads. This function will also
341  * free the #GstCollectData and all the resources that were allocated with
342  * gst_collect_pads_add_pad().
343  *
344  * The pad will be deactivated automatically when @pads is stopped.
345  *
346  * MT safe.
347  *
348  * Returns: %TRUE if the pad could be removed.
349  */
350 gboolean
351 gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad)
352 {
353   GstCollectData *data;
354   GSList *list;
355
356   g_return_val_if_fail (pads != NULL, FALSE);
357   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
358   g_return_val_if_fail (pad != NULL, FALSE);
359   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
360
361   GST_DEBUG ("removing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
362
363   GST_COLLECT_PADS_PAD_LOCK (pads);
364   list = g_slist_find_custom (pads->pad_list, pad, (GCompareFunc) find_pad);
365   if (!list)
366     goto unknown_pad;
367
368   data = (GstCollectData *) list->data;
369
370   GST_DEBUG ("found pad %s:%s at %p", GST_DEBUG_PAD_NAME (pad), data);
371
372   /* clear the stuff we configured */
373   gst_pad_set_chain_function (pad, NULL);
374   gst_pad_set_event_function (pad, NULL);
375   GST_OBJECT_LOCK (pad);
376   gst_pad_set_element_private (pad, NULL);
377   GST_OBJECT_UNLOCK (pad);
378
379   /* backward compat, also remove from data if stopped, note that this function
380    * can only be called when we are stopped because we don't take the LOCK to
381    * protect the pads->data list. */
382   if (!pads->started) {
383     GSList *dlist;
384
385     dlist = g_slist_find_custom (pads->data, pad, (GCompareFunc) find_pad);
386     if (dlist) {
387       GstCollectData *pdata = dlist->data;
388
389       pads->data = g_slist_delete_link (pads->data, dlist);
390       unref_data (pdata);
391     }
392   }
393   /* remove from the pad list */
394   pads->pad_list = g_slist_delete_link (pads->pad_list, list);
395   pads->pad_cookie++;
396
397   /* signal waiters because something changed */
398   GST_COLLECT_PADS_BROADCAST (pads);
399
400   /* deactivate the pad when needed */
401   if (!pads->started)
402     gst_pad_set_active (pad, FALSE);
403
404   /* clean and free the collect data */
405   unref_data (data);
406
407   GST_COLLECT_PADS_PAD_UNLOCK (pads);
408
409   return TRUE;
410
411 unknown_pad:
412   {
413     GST_WARNING ("cannot remove unknown pad %s:%s", GST_DEBUG_PAD_NAME (pad));
414     GST_COLLECT_PADS_PAD_UNLOCK (pads);
415     return FALSE;
416   }
417 }
418
419 /**
420  * gst_collect_pads_is_active:
421  * @pads: (transfer none): the collectspads to use
422  * @pad: the pad to check
423  *
424  * Check if a pad is active.
425  *
426  * This function is currently not implemented.
427  *
428  * MT safe.
429  *
430  * Returns: %TRUE if the pad is active.
431  */
432 gboolean
433 gst_collect_pads_is_active (GstCollectPads * pads, GstPad * pad)
434 {
435   g_return_val_if_fail (pads != NULL, FALSE);
436   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
437   g_return_val_if_fail (pad != NULL, FALSE);
438   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
439
440   g_warning ("gst_collect_pads_is_active() is not implemented");
441
442   return FALSE;
443 }
444
445 /**
446  * gst_collect_pads_collect:
447  * @pads: the collectspads to use
448  *
449  * Collect data on all pads. This function is usually called
450  * from a #GstTask function in an element.
451  *
452  * This function is currently not implemented.
453  *
454  * MT safe.
455  *
456  * Returns: #GstFlowReturn of the operation.
457  */
458 GstFlowReturn
459 gst_collect_pads_collect (GstCollectPads * pads)
460 {
461   g_return_val_if_fail (pads != NULL, GST_FLOW_ERROR);
462   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
463
464   g_warning ("gst_collect_pads_collect() is not implemented");
465
466   return GST_FLOW_NOT_SUPPORTED;
467 }
468
469 /**
470  * gst_collect_pads_collect_range:
471  * @pads: the collectspads to use
472  * @offset: the offset to collect
473  * @length: the length to collect
474  *
475  * Collect data with @offset and @length on all pads. This function
476  * is typically called in the getrange function of an element.
477  *
478  * This function is currently not implemented.
479  *
480  * MT safe.
481  *
482  * Returns: #GstFlowReturn of the operation.
483  */
484 GstFlowReturn
485 gst_collect_pads_collect_range (GstCollectPads * pads, guint64 offset,
486     guint length)
487 {
488   g_return_val_if_fail (pads != NULL, GST_FLOW_ERROR);
489   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
490
491   g_warning ("gst_collect_pads_collect_range() is not implemented");
492
493   return GST_FLOW_NOT_SUPPORTED;
494 }
495
496 static gboolean
497 gst_collect_pads_is_flushing (GstCollectPads * pads)
498 {
499   GSList *walk = NULL;
500   gboolean res = TRUE;
501
502   GST_COLLECT_PADS_PAD_LOCK (pads);
503
504   /* Ensure pads->data state */
505   gst_collect_pads_check_pads_unlocked (pads);
506
507   GST_DEBUG ("Getting flushing state (pads:%p, pads->data:%p)",
508       pads, pads->data);
509
510   for (walk = pads->data; walk; walk = g_slist_next (walk)) {
511     GstCollectData *cdata = walk->data;
512
513     GST_DEBUG_OBJECT (cdata->pad, "flushing:%d", cdata->flushing);
514
515     if (cdata->flushing) {
516       goto done;
517     }
518   }
519
520   res = FALSE;
521 done:
522   GST_COLLECT_PADS_PAD_UNLOCK (pads);
523   return res;
524 }
525
526 /* FIXME, I think this function is used to work around bad behaviour
527  * of elements that add pads to themselves without activating them.
528  *
529  * Must be called with PAD_LOCK.
530  */
531 static void
532 gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads,
533     gboolean flushing)
534 {
535   GSList *walk = NULL;
536
537   GST_DEBUG ("Setting flushing (%d)", flushing);
538
539   /* Update the pads flushing flag */
540   for (walk = pads->data; walk; walk = g_slist_next (walk)) {
541     GstCollectData *cdata = walk->data;
542
543     if (GST_IS_PAD (cdata->pad)) {
544       GST_OBJECT_LOCK (cdata->pad);
545       if (flushing)
546         GST_PAD_SET_FLUSHING (cdata->pad);
547       else
548         GST_PAD_UNSET_FLUSHING (cdata->pad);
549       cdata->flushing = flushing;
550       gst_collect_pads_clear (pads, cdata);
551       GST_OBJECT_UNLOCK (cdata->pad);
552     }
553   }
554   /* Setting the pads to flushing means that we changed the values which
555    * are 'protected' by the cookie. We therefore update it to force a
556    * recalculation of the current pad status. */
557   pads->pad_cookie++;
558 }
559
560 /**
561  * gst_collect_pads_set_flushing:
562  * @pads: the collectspads to use
563  * @flushing: desired state of the pads
564  *
565  * Change the flushing state of all the pads in the collection. No pad
566  * is able to accept anymore data when @flushing is %TRUE. Calling this
567  * function with @flushing %FALSE makes @pads accept data again.
568  *
569  * MT safe.
570  *
571  * Since: 0.10.7.
572  */
573 void
574 gst_collect_pads_set_flushing (GstCollectPads * pads, gboolean flushing)
575 {
576   g_return_if_fail (pads != NULL);
577   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
578
579   GST_COLLECT_PADS_PAD_LOCK (pads);
580   /* Ensure pads->data state */
581   gst_collect_pads_check_pads_unlocked (pads);
582   gst_collect_pads_set_flushing_unlocked (pads, flushing);
583   GST_COLLECT_PADS_PAD_UNLOCK (pads);
584 }
585
586 /**
587  * gst_collect_pads_start:
588  * @pads: the collectspads to use
589  *
590  * Starts the processing of data in the collect_pads.
591  *
592  * MT safe.
593  */
594 void
595 gst_collect_pads_start (GstCollectPads * pads)
596 {
597   GSList *collected;
598
599   g_return_if_fail (pads != NULL);
600   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
601
602   GST_DEBUG_OBJECT (pads, "starting collect pads");
603
604   /* make sure stop and collect cannot be called anymore */
605   GST_OBJECT_LOCK (pads);
606
607   /* make pads streamable */
608   GST_COLLECT_PADS_PAD_LOCK (pads);
609
610   /* loop over the master pad list and reset the segment */
611   collected = pads->pad_list;
612   for (; collected; collected = g_slist_next (collected)) {
613     GstCollectData *data;
614
615     data = collected->data;
616     gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
617   }
618
619   gst_collect_pads_set_flushing_unlocked (pads, FALSE);
620
621   /* Start collect pads */
622   pads->started = TRUE;
623   GST_COLLECT_PADS_PAD_UNLOCK (pads);
624   GST_OBJECT_UNLOCK (pads);
625 }
626
627 /**
628  * gst_collect_pads_stop:
629  * @pads: the collectspads to use
630  *
631  * Stops the processing of data in the collect_pads. this function
632  * will also unblock any blocking operations.
633  *
634  * MT safe.
635  */
636 void
637 gst_collect_pads_stop (GstCollectPads * pads)
638 {
639   GSList *collected;
640
641   g_return_if_fail (pads != NULL);
642   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
643
644   GST_DEBUG_OBJECT (pads, "stopping collect pads");
645
646   /* make sure collect and start cannot be called anymore */
647   GST_OBJECT_LOCK (pads);
648
649   /* make pads not accept data anymore */
650   GST_COLLECT_PADS_PAD_LOCK (pads);
651   gst_collect_pads_set_flushing_unlocked (pads, TRUE);
652
653   /* Stop collect pads */
654   pads->started = FALSE;
655   pads->eospads = 0;
656   pads->queuedpads = 0;
657
658   /* loop over the master pad list and flush buffers */
659   collected = pads->pad_list;
660   for (; collected; collected = g_slist_next (collected)) {
661     GstCollectData *data;
662     GstBuffer **buffer_p;
663
664     data = collected->data;
665     if (data->buffer) {
666       buffer_p = &data->buffer;
667       gst_buffer_replace (buffer_p, NULL);
668       data->pos = 0;
669     }
670     data->eos = FALSE;
671   }
672
673   GST_COLLECT_PADS_PAD_UNLOCK (pads);
674   /* Wake them up so they can end the chain functions. */
675   GST_COLLECT_PADS_BROADCAST (pads);
676
677   GST_OBJECT_UNLOCK (pads);
678 }
679
680 /**
681  * gst_collect_pads_peek:
682  * @pads: the collectspads to peek
683  * @data: the data to use
684  *
685  * Peek at the buffer currently queued in @data. This function
686  * should be called with the @pads LOCK held, such as in the callback
687  * handler.
688  *
689  * MT safe.
690  *
691  * Returns: (transfer full): The buffer in @data or NULL if no buffer is queued.
692  *  should unref the buffer after usage.
693  */
694 GstBuffer *
695 gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data)
696 {
697   GstBuffer *result;
698
699   g_return_val_if_fail (pads != NULL, NULL);
700   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
701   g_return_val_if_fail (data != NULL, NULL);
702
703   if ((result = data->buffer))
704     gst_buffer_ref (result);
705
706   GST_DEBUG ("Peeking at pad %s:%s: buffer=%p",
707       GST_DEBUG_PAD_NAME (data->pad), result);
708
709   return result;
710 }
711
712 /**
713  * gst_collect_pads_pop:
714  * @pads: the collectspads to pop
715  * @data: the data to use
716  *
717  * Pop the buffer currently queued in @data. This function
718  * should be called with the @pads LOCK held, such as in the callback
719  * handler.
720  *
721  * Free-function: gst_buffer_unref
722  *
723  * MT safe.
724  *
725  * Returns: (transfer full): The buffer in @data or NULL if no buffer was
726  *   queued. You should unref the buffer after usage.
727  */
728 GstBuffer *
729 gst_collect_pads_pop (GstCollectPads * pads, GstCollectData * data)
730 {
731   GstBuffer *result;
732
733   g_return_val_if_fail (pads != NULL, NULL);
734   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
735   g_return_val_if_fail (data != NULL, NULL);
736
737   if ((result = data->buffer)) {
738     data->buffer = NULL;
739     data->pos = 0;
740     /* one less pad with queued data now */
741     pads->queuedpads--;
742   }
743
744   GST_COLLECT_PADS_BROADCAST (pads);
745
746   GST_DEBUG ("Pop buffer on pad %s:%s: buffer=%p",
747       GST_DEBUG_PAD_NAME (data->pad), result);
748
749   return result;
750 }
751
752 /* pop and unref the currently queued buffer, should e called with the LOCK
753  * helt. */
754 static void
755 gst_collect_pads_clear (GstCollectPads * pads, GstCollectData * data)
756 {
757   GstBuffer *buf;
758
759   if ((buf = gst_collect_pads_pop (pads, data)))
760     gst_buffer_unref (buf);
761 }
762
763 /**
764  * gst_collect_pads_available:
765  * @pads: the collectspads to query
766  *
767  * Query how much bytes can be read from each queued buffer. This means
768  * that the result of this call is the maximum number of bytes that can
769  * be read from each of the pads.
770  *
771  * This function should be called with @pads LOCK held, such as
772  * in the callback.
773  *
774  * MT safe.
775  *
776  * Returns: The maximum number of bytes queued on all pads. This function
777  * returns 0 if a pad has no queued buffer.
778  */
779 /* FIXME, we can do this in the _chain functions */
780 guint
781 gst_collect_pads_available (GstCollectPads * pads)
782 {
783   GSList *collected;
784   guint result = G_MAXUINT;
785
786   g_return_val_if_fail (pads != NULL, 0);
787   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
788
789   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
790     GstCollectData *pdata;
791     GstBuffer *buffer;
792     gint size;
793
794     pdata = (GstCollectData *) collected->data;
795
796     /* ignore pad with EOS */
797     if (G_UNLIKELY (pdata->eos)) {
798       GST_DEBUG ("pad %s:%s is EOS", GST_DEBUG_PAD_NAME (pdata->pad));
799       continue;
800     }
801
802     /* an empty buffer without EOS is weird when we get here.. */
803     if (G_UNLIKELY ((buffer = pdata->buffer) == NULL)) {
804       GST_WARNING ("pad %s:%s has no buffer", GST_DEBUG_PAD_NAME (pdata->pad));
805       goto not_filled;
806     }
807
808     /* this is the size left of the buffer */
809     size = gst_buffer_get_size (buffer) - pdata->pos;
810     GST_DEBUG ("pad %s:%s has %d bytes left",
811         GST_DEBUG_PAD_NAME (pdata->pad), size);
812
813     /* need to return the min of all available data */
814     if (size < result)
815       result = size;
816   }
817   /* nothing changed, all must be EOS then, return 0 */
818   if (G_UNLIKELY (result == G_MAXUINT))
819     result = 0;
820
821   return result;
822
823 not_filled:
824   {
825     return 0;
826   }
827 }
828
829 /**
830  * gst_collect_pads_read_buffer:
831  * @pads: the collectspads to query
832  * @data: the data to use
833  * @size: the number of bytes to read
834  *
835  * Get a buffer of @size bytes from the given pad @data.
836  *
837  * This function should be called with @pads LOCK held, such as in the callback.
838  *
839  * Free-function: gst_buffer_unref
840  *
841  * Returns: (transfer full): a #GstBuffer. The size of the buffer can be less
842  *     that requested. A return of NULL signals that the pad is end-of-stream.
843  *     Unref the buffer with gst_buffer_unref() after use.
844  *
845  * MT safe.
846  *
847  * Since: 0.10.18
848  */
849 GstBuffer *
850 gst_collect_pads_read_buffer (GstCollectPads * pads, GstCollectData * data,
851     guint size)
852 {
853   guint readsize, bufsize;
854   GstBuffer *buffer;
855
856   g_return_val_if_fail (pads != NULL, NULL);
857   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
858   g_return_val_if_fail (data != NULL, NULL);
859
860   /* no buffer, must be EOS */
861   if ((buffer = data->buffer) == NULL)
862     return NULL;
863
864   bufsize = gst_buffer_get_size (buffer);
865
866   readsize = MIN (size, bufsize - data->pos);
867
868   if (data->pos == 0 && readsize == bufsize)
869     return gst_buffer_ref (buffer);
870   else
871     return gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, data->pos,
872         readsize);
873 }
874
875 /**
876  * gst_collect_pads_take_buffer:
877  * @pads: the collectspads to query
878  * @data: the data to use
879  * @size: the number of bytes to read
880  *
881  * Get a buffer of @size bytes from the given pad @data. Flushes the amount
882  * of read bytes.
883  *
884  * This function should be called with @pads LOCK held, such as in the callback.
885  *
886  * Free-function: gst_buffer_unref
887  *
888  * MT safe.
889  *
890  * Returns: (transfer full): a #GstBuffer. The size of the buffer can be less
891  *     that requested. A return of NULL signals that the pad is end-of-stream.
892  *     Unref the buffer after use.
893  *
894  * Since: 0.10.18
895  */
896 GstBuffer *
897 gst_collect_pads_take_buffer (GstCollectPads * pads, GstCollectData * data,
898     guint size)
899 {
900   GstBuffer *buffer = gst_collect_pads_read_buffer (pads, data, size);
901
902   if (buffer) {
903     gst_collect_pads_flush (pads, data, gst_buffer_get_size (buffer));
904   }
905   return buffer;
906 }
907
908 /**
909  * gst_collect_pads_flush:
910  * @pads: the collectspads to query
911  * @data: the data to use
912  * @size: the number of bytes to flush
913  *
914  * Flush @size bytes from the pad @data.
915  *
916  * This function should be called with @pads LOCK held, such as
917  * in the callback.
918  *
919  * MT safe.
920  *
921  * Returns: The number of bytes flushed. This can be less than @size and
922  * is 0 if the pad was end-of-stream.
923  */
924 guint
925 gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data,
926     guint size)
927 {
928   guint flushsize;
929   GstBuffer *buffer;
930   gsize bsize;
931
932   g_return_val_if_fail (pads != NULL, 0);
933   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
934   g_return_val_if_fail (data != NULL, 0);
935
936   /* no buffer, must be EOS */
937   if ((buffer = data->buffer) == NULL)
938     return 0;
939
940   bsize = gst_buffer_get_size (buffer);
941
942   /* this is what we can flush at max */
943   flushsize = MIN (size, bsize - data->pos);
944
945   data->pos += size;
946
947   GST_LOG_OBJECT (pads, "Flushing %d bytes, requested %u", flushsize, size);
948
949   if (data->pos >= bsize)
950     /* _clear will also reset data->pos to 0 */
951     gst_collect_pads_clear (pads, data);
952
953   return flushsize;
954 }
955
956 /* see if pads were added or removed and update our stats. Any pad
957  * added after releasing the PAD_LOCK will get collected in the next
958  * round.
959  *
960  * We can do a quick check by checking the cookies, that get changed
961  * whenever the pad list is updated.
962  *
963  * Must be called with LOCK.
964  */
965 static void
966 gst_collect_pads_check_pads_unlocked (GstCollectPads * pads)
967 {
968   GST_DEBUG ("stored cookie : %d, used_cookie:%d",
969       pads->pad_cookie, pads->cookie);
970   if (G_UNLIKELY (pads->pad_cookie != pads->cookie)) {
971     GSList *collected;
972
973     /* clear list and stats */
974     g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
975     g_slist_free (pads->data);
976     pads->data = NULL;
977     pads->numpads = 0;
978     pads->queuedpads = 0;
979     pads->eospads = 0;
980
981     /* loop over the master pad list */
982     collected = pads->pad_list;
983     for (; collected; collected = g_slist_next (collected)) {
984       GstCollectData *data;
985
986       /* update the stats */
987       pads->numpads++;
988       data = collected->data;
989
990       if (G_LIKELY (!data->flushing)) {
991         if (data->buffer)
992           pads->queuedpads++;
993         if (data->eos)
994           pads->eospads++;
995       }
996
997       /* add to the list of pads to collect */
998       ref_data (data);
999       pads->data = g_slist_prepend (pads->data, data);
1000     }
1001     /* and update the cookie */
1002     pads->cookie = pads->pad_cookie;
1003   }
1004 }
1005
1006 static inline void
1007 gst_collect_pads_check_pads (GstCollectPads * pads)
1008 {
1009   /* the master list and cookie are protected with the PAD_LOCK */
1010   GST_COLLECT_PADS_PAD_LOCK (pads);
1011   gst_collect_pads_check_pads_unlocked (pads);
1012   GST_COLLECT_PADS_PAD_UNLOCK (pads);
1013 }
1014
1015 /* checks if all the pads are collected and call the collectfunction
1016  *
1017  * Should be called with LOCK.
1018  *
1019  * Returns: The #GstFlowReturn of collection.
1020  */
1021 static GstFlowReturn
1022 gst_collect_pads_check_collected (GstCollectPads * pads)
1023 {
1024   GstFlowReturn flow_ret = GST_FLOW_OK;
1025
1026   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
1027   g_return_val_if_fail (pads->func != NULL, GST_FLOW_NOT_SUPPORTED);
1028
1029   /* check for new pads, update stats etc.. */
1030   gst_collect_pads_check_pads (pads);
1031
1032   if (G_UNLIKELY (pads->eospads == pads->numpads)) {
1033     /* If all our pads are EOS just collect once to let the element
1034      * do its final EOS handling. */
1035     GST_DEBUG ("All active pads (%d) are EOS, calling %s",
1036         pads->numpads, GST_DEBUG_FUNCPTR_NAME (pads->func));
1037     flow_ret = pads->func (pads, pads->user_data);
1038   } else {
1039     gboolean collected = FALSE;
1040
1041     /* We call the collected function as long as our condition matches.
1042      * FIXME: should we error out if the collect function did not pop anything ?
1043      * we can get a busy loop here if the element does not pop from the collect
1044      * function
1045      */
1046     while (((pads->queuedpads + pads->eospads) >= pads->numpads)) {
1047       GST_DEBUG ("All active pads (%d + %d >= %d) have data, calling %s",
1048           pads->queuedpads, pads->eospads, pads->numpads,
1049           GST_DEBUG_FUNCPTR_NAME (pads->func));
1050       flow_ret = pads->func (pads, pads->user_data);
1051       collected = TRUE;
1052
1053       /* break on error */
1054       if (flow_ret != GST_FLOW_OK)
1055         break;
1056       /* Don't keep looping after telling the element EOS or flushing */
1057       if (pads->queuedpads == 0)
1058         break;
1059     }
1060     if (!collected)
1061       GST_DEBUG ("Not all active pads (%d) have data, continuing",
1062           pads->numpads);
1063   }
1064   return flow_ret;
1065 }
1066
1067 static gboolean
1068 gst_collect_pads_event (GstPad * pad, GstObject * parent, GstEvent * event)
1069 {
1070   gboolean res;
1071   GstCollectData *data;
1072   GstCollectPads *pads;
1073
1074   /* some magic to get the managing collect_pads */
1075   GST_OBJECT_LOCK (pad);
1076   data = (GstCollectData *) gst_pad_get_element_private (pad);
1077   if (G_UNLIKELY (data == NULL))
1078     goto pad_removed;
1079   ref_data (data);
1080   GST_OBJECT_UNLOCK (pad);
1081
1082   res = TRUE;
1083
1084   pads = data->collect;
1085
1086   GST_DEBUG ("Got %s event on pad %s:%s", GST_EVENT_TYPE_NAME (event),
1087       GST_DEBUG_PAD_NAME (data->pad));
1088
1089   switch (GST_EVENT_TYPE (event)) {
1090     case GST_EVENT_FLUSH_START:
1091     {
1092       /* forward event to unblock check_collected */
1093       gst_pad_event_default (pad, parent, event);
1094
1095       /* now unblock the chain function.
1096        * no cond per pad, so they all unblock,
1097        * non-flushing block again */
1098       GST_OBJECT_LOCK (pads);
1099       data->flushing = TRUE;
1100       gst_collect_pads_clear (pads, data);
1101       GST_OBJECT_UNLOCK (pads);
1102
1103       /* event already cleaned up by forwarding */
1104       goto done;
1105     }
1106     case GST_EVENT_FLUSH_STOP:
1107     {
1108       /* flush the 1 buffer queue */
1109       GST_OBJECT_LOCK (pads);
1110       data->flushing = FALSE;
1111       gst_collect_pads_clear (pads, data);
1112       /* we need new segment info after the flush */
1113       gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
1114       data->new_segment = FALSE;
1115       /* if the pad was EOS, remove the EOS flag and
1116        * decrement the number of eospads */
1117       if (G_UNLIKELY (data->eos == TRUE)) {
1118         pads->eospads--;
1119         data->eos = FALSE;
1120       }
1121
1122       if (!gst_collect_pads_is_flushing (pads)) {
1123         /* forward event if all pads are no longer flushing */
1124         GST_DEBUG ("No more pads are flushing, forwarding FLUSH_STOP");
1125         GST_OBJECT_UNLOCK (pads);
1126         goto forward;
1127       }
1128       gst_event_unref (event);
1129       GST_OBJECT_UNLOCK (pads);
1130       goto done;
1131     }
1132     case GST_EVENT_EOS:
1133     {
1134       GST_OBJECT_LOCK (pads);
1135       /* if the pad was not EOS, make it EOS and so we
1136        * have one more eospad */
1137       if (G_LIKELY (data->eos == FALSE)) {
1138         data->eos = TRUE;
1139         pads->eospads++;
1140       }
1141       /* check if we need collecting anything, we ignore the
1142        * result. */
1143       gst_collect_pads_check_collected (pads);
1144       GST_OBJECT_UNLOCK (pads);
1145
1146       /* We eat this event, element should do something
1147        * in the collected callback. */
1148       gst_event_unref (event);
1149       goto done;
1150     }
1151     case GST_EVENT_SEGMENT:
1152     {
1153       gst_event_copy_segment (event, &data->segment);
1154
1155       GST_DEBUG_OBJECT (data->pad, "got newsegment %" GST_SEGMENT_FORMAT,
1156           &data->segment);
1157       data->new_segment = TRUE;
1158
1159       /* we must not forward this event since multiple segments will be
1160        * accumulated and this is certainly not what we want. */
1161       gst_event_unref (event);
1162       /* FIXME: collect-pads based elements need to create their own newsegment
1163        * event (and only one really)
1164        * (a) make the segment part of the GstCollectData structure of each pad,
1165        * so you can just check that once you have a buffer queued on that pad,
1166        * (b) you can override a pad's event function with your own,
1167        * catch the newsegment event and then pass it on to the original
1168        * gstcollectpads event function
1169        * (that's what avimux does for something IIRC)
1170        * see #340060
1171        */
1172       goto done;
1173     }
1174     default:
1175       /* forward other events */
1176       goto forward;
1177   }
1178
1179 forward:
1180   GST_DEBUG_OBJECT (pads, "forward unhandled event: %s",
1181       GST_EVENT_TYPE_NAME (event));
1182   res = gst_pad_event_default (pad, parent, event);
1183
1184 done:
1185   unref_data (data);
1186   return res;
1187
1188   /* ERRORS */
1189 pad_removed:
1190   {
1191     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
1192     GST_OBJECT_UNLOCK (pad);
1193     return FALSE;
1194   }
1195 }
1196
1197 /* For each buffer we receive we check if our collected condition is reached
1198  * and if so we call the collected function. When this is done we check if
1199  * data has been unqueued. If data is still queued we wait holding the stream
1200  * lock to make sure no EOS event can happen while we are ready to be
1201  * collected
1202  */
1203 static GstFlowReturn
1204 gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
1205 {
1206   GstCollectData *data;
1207   GstCollectPads *pads;
1208   GstCollectPadsPrivate *priv;
1209   GstFlowReturn ret;
1210
1211   GST_DEBUG ("Got buffer for pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1212
1213   /* some magic to get the managing collect_pads */
1214   GST_OBJECT_LOCK (pad);
1215   data = (GstCollectData *) gst_pad_get_element_private (pad);
1216   if (G_UNLIKELY (data == NULL))
1217     goto no_data;
1218   ref_data (data);
1219   GST_OBJECT_UNLOCK (pad);
1220
1221   pads = data->collect;
1222   priv = pads->priv;
1223
1224   GST_OBJECT_LOCK (pads);
1225   /* if not started, bail out */
1226   if (G_UNLIKELY (!pads->started))
1227     goto not_started;
1228   /* check if this pad is flushing */
1229   if (G_UNLIKELY (data->flushing))
1230     goto flushing;
1231   /* pad was EOS, we can refuse this data */
1232   if (G_UNLIKELY (data->eos))
1233     goto unexpected;
1234
1235   /* see if we need to clip */
1236   if (priv->clipfunc) {
1237     buffer = priv->clipfunc (pads, data, buffer, priv->clipfunc_user_data);
1238
1239     if (G_UNLIKELY (buffer == NULL))
1240       goto clipped;
1241   }
1242
1243   GST_DEBUG ("Queuing buffer %p for pad %s:%s", buffer,
1244       GST_DEBUG_PAD_NAME (pad));
1245
1246   /* One more pad has data queued */
1247   pads->queuedpads++;
1248   /* take ownership of the buffer */
1249   if (data->buffer)
1250     gst_buffer_unref (data->buffer);
1251   data->buffer = buffer;
1252   buffer = NULL;
1253
1254   /* update segment last position if in TIME */
1255   if (G_LIKELY (data->segment.format == GST_FORMAT_TIME)) {
1256     GstClockTime timestamp = GST_BUFFER_TIMESTAMP (data->buffer);
1257
1258     if (GST_CLOCK_TIME_IS_VALID (timestamp))
1259       data->segment.position = timestamp;
1260   }
1261
1262   /* While we have data queued on this pad try to collect stuff */
1263   do {
1264     GST_DEBUG ("Pad %s:%s checking", GST_DEBUG_PAD_NAME (pad));
1265     /* Check if our collected condition is matched and call the collected function
1266      * if it is */
1267     ret = gst_collect_pads_check_collected (pads);
1268     /* when an error occurs, we want to report this back to the caller ASAP
1269      * without having to block if the buffer was not popped */
1270     if (G_UNLIKELY (ret != GST_FLOW_OK))
1271       goto error;
1272
1273     /* data was consumed, we can exit and accept new data */
1274     if (data->buffer == NULL)
1275       break;
1276
1277     /* Check if we got removed in the mean time, FIXME, this is racy.
1278      * Between this check and the _WAIT, the pad could be removed which will
1279      * makes us hang in the _WAIT. */
1280     GST_OBJECT_LOCK (pad);
1281     if (G_UNLIKELY (gst_pad_get_element_private (pad) == NULL))
1282       goto pad_removed;
1283     GST_OBJECT_UNLOCK (pad);
1284
1285     GST_DEBUG ("Pad %s:%s has a buffer queued, waiting",
1286         GST_DEBUG_PAD_NAME (pad));
1287
1288     /* wait to be collected, this must happen from another thread triggered
1289      * by the _chain function of another pad. We release the lock so we
1290      * can get stopped or flushed as well. We can however not get EOS
1291      * because we still hold the STREAM_LOCK.
1292      */
1293     GST_COLLECT_PADS_WAIT (pads);
1294
1295     GST_DEBUG ("Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad));
1296
1297     /* after a signal, we could be stopped */
1298     if (G_UNLIKELY (!pads->started))
1299       goto not_started;
1300     /* check if this pad is flushing */
1301     if (G_UNLIKELY (data->flushing))
1302       goto flushing;
1303   }
1304   while (data->buffer != NULL);
1305
1306 unlock_done:
1307   GST_DEBUG ("Pad %s:%s done", GST_DEBUG_PAD_NAME (pad));
1308   GST_OBJECT_UNLOCK (pads);
1309   unref_data (data);
1310   if (buffer)
1311     gst_buffer_unref (buffer);
1312   return ret;
1313
1314 pad_removed:
1315   {
1316     GST_WARNING ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
1317     GST_OBJECT_UNLOCK (pad);
1318     ret = GST_FLOW_NOT_LINKED;
1319     goto unlock_done;
1320   }
1321   /* ERRORS */
1322 no_data:
1323   {
1324     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
1325     GST_OBJECT_UNLOCK (pad);
1326     gst_buffer_unref (buffer);
1327     return GST_FLOW_NOT_LINKED;
1328   }
1329 not_started:
1330   {
1331     GST_DEBUG ("not started");
1332     gst_collect_pads_clear (pads, data);
1333     ret = GST_FLOW_WRONG_STATE;
1334     goto unlock_done;
1335   }
1336 flushing:
1337   {
1338     GST_DEBUG ("pad %s:%s is flushing", GST_DEBUG_PAD_NAME (pad));
1339     gst_collect_pads_clear (pads, data);
1340     ret = GST_FLOW_WRONG_STATE;
1341     goto unlock_done;
1342   }
1343 unexpected:
1344   {
1345     /* we should not post an error for this, just inform upstream that
1346      * we don't expect anything anymore */
1347     GST_DEBUG ("pad %s:%s is eos", GST_DEBUG_PAD_NAME (pad));
1348     ret = GST_FLOW_EOS;
1349     goto unlock_done;
1350   }
1351 clipped:
1352   {
1353     GST_DEBUG ("clipped buffer on pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1354     ret = GST_FLOW_OK;
1355     goto unlock_done;
1356   }
1357 error:
1358   {
1359     /* we print the error, the element should post a reasonable error
1360      * message for fatal errors */
1361     GST_DEBUG ("collect failed, reason %d (%s)", ret, gst_flow_get_name (ret));
1362     gst_collect_pads_clear (pads, data);
1363     goto unlock_done;
1364   }
1365 }