pad: make an ACCEPT_CAPS query
[platform/upstream/gstreamer.git] / plugins / elements / gsttee.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2001,2002,2003,2004,2005 Wim Taymans <wim@fluendo.com>
4  *               2007 Wim Taymans <wim.taymans@gmail.com>
5  *
6  * gsttee.c: Tee element, one in N out
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
21  * Boston, MA 02111-1307, USA.
22  */
23
24 /**
25  * SECTION:element-tee
26  * @see_also: #GstIdentity
27  *
28  * Split data to multiple pads. Branching the data flow is useful when e.g.
29  * capturing a video where the video is shown on the screen and also encoded and
30  * written to a file. Another example is playing music and hooking up a
31  * visualisation module.
32  *
33  * One needs to use separate queue elements (or a multiqueue) in each branch to
34  * provide separate threads for each branch. Otherwise a blocked dataflow in one
35  * branch would stall the other branches.
36  *
37  * <refsect2>
38  * <title>Example launch line</title>
39  * |[
40  * gst-launch filesrc location=song.ogg ! decodebin2 ! tee name=t ! queue ! autoaudiosink t. ! queue ! audioconvert ! goom ! ffmpegcolorspace ! autovideosink
41  * ]| Play a song.ogg from local dir and render visualisations using the goom
42  * element.
43  * </refsect2>
44  */
45
46 #ifdef HAVE_CONFIG_H
47 #  include "config.h"
48 #endif
49
50 #include "gsttee.h"
51
52 #include <string.h>
53
54 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
55     GST_PAD_SINK,
56     GST_PAD_ALWAYS,
57     GST_STATIC_CAPS_ANY);
58
59 GST_DEBUG_CATEGORY_STATIC (gst_tee_debug);
60 #define GST_CAT_DEFAULT gst_tee_debug
61
62 #define GST_TYPE_TEE_PULL_MODE (gst_tee_pull_mode_get_type())
63 static GType
64 gst_tee_pull_mode_get_type (void)
65 {
66   static GType type = 0;
67   static const GEnumValue data[] = {
68     {GST_TEE_PULL_MODE_NEVER, "Never activate in pull mode", "never"},
69     {GST_TEE_PULL_MODE_SINGLE, "Only one src pad can be active in pull mode",
70         "single"},
71     {0, NULL, NULL},
72   };
73
74   if (!type) {
75     type = g_enum_register_static ("GstTeePullMode", data);
76   }
77   return type;
78 }
79
80 /* lock to protect request pads from being removed while downstream */
81 #define GST_TEE_DYN_LOCK(tee) g_mutex_lock ((tee)->dyn_lock)
82 #define GST_TEE_DYN_UNLOCK(tee) g_mutex_unlock ((tee)->dyn_lock)
83
84 #define DEFAULT_PROP_NUM_SRC_PADS       0
85 #define DEFAULT_PROP_HAS_SINK_LOOP      FALSE
86 #define DEFAULT_PROP_HAS_CHAIN          TRUE
87 #define DEFAULT_PROP_SILENT             TRUE
88 #define DEFAULT_PROP_LAST_MESSAGE       NULL
89 #define DEFAULT_PULL_MODE               GST_TEE_PULL_MODE_NEVER
90
91 enum
92 {
93   PROP_0,
94   PROP_NUM_SRC_PADS,
95   PROP_HAS_SINK_LOOP,
96   PROP_HAS_CHAIN,
97   PROP_SILENT,
98   PROP_LAST_MESSAGE,
99   PROP_PULL_MODE,
100   PROP_ALLOC_PAD,
101 };
102
103 static GstStaticPadTemplate tee_src_template =
104 GST_STATIC_PAD_TEMPLATE ("src_%u",
105     GST_PAD_SRC,
106     GST_PAD_REQUEST,
107     GST_STATIC_CAPS_ANY);
108
109 /* structure and quark to keep track of which pads have been pushed */
110 static GQuark push_data;
111
112 #define _do_init \
113     GST_DEBUG_CATEGORY_INIT (gst_tee_debug, "tee", 0, "tee element"); \
114     push_data = g_quark_from_static_string ("tee-push-data");
115 #define gst_tee_parent_class parent_class
116 G_DEFINE_TYPE_WITH_CODE (GstTee, gst_tee, GST_TYPE_ELEMENT, _do_init);
117
118 static GParamSpec *pspec_last_message = NULL;
119 static GParamSpec *pspec_alloc_pad = NULL;
120
121 typedef struct
122 {
123   gboolean pushed;
124   GstFlowReturn result;
125   gboolean removed;
126 } PushData;
127
128 static GstPad *gst_tee_request_new_pad (GstElement * element,
129     GstPadTemplate * temp, const gchar * unused, const GstCaps * caps);
130 static void gst_tee_release_pad (GstElement * element, GstPad * pad);
131
132 static void gst_tee_finalize (GObject * object);
133 static void gst_tee_set_property (GObject * object, guint prop_id,
134     const GValue * value, GParamSpec * pspec);
135 static void gst_tee_get_property (GObject * object, guint prop_id,
136     GValue * value, GParamSpec * pspec);
137 static void gst_tee_dispose (GObject * object);
138
139 static GstFlowReturn gst_tee_chain (GstPad * pad, GstBuffer * buffer);
140 static GstFlowReturn gst_tee_chain_list (GstPad * pad, GstBufferList * list);
141 static gboolean gst_tee_sink_event (GstPad * pad, GstEvent * event);
142 static gboolean gst_tee_sink_query (GstPad * pad, GstQuery * query);
143 static gboolean gst_tee_sink_acceptcaps (GstPad * pad, GstCaps * caps);
144 static gboolean gst_tee_sink_activate_push (GstPad * pad, gboolean active);
145 static gboolean gst_tee_src_query (GstPad * pad, GstQuery * query);
146 static gboolean gst_tee_src_activate_pull (GstPad * pad, gboolean active);
147 static GstFlowReturn gst_tee_src_get_range (GstPad * pad, guint64 offset,
148     guint length, GstBuffer ** buf);
149
150 static void
151 gst_tee_dispose (GObject * object)
152 {
153   GList *item;
154
155 restart:
156   for (item = GST_ELEMENT_PADS (object); item; item = g_list_next (item)) {
157     GstPad *pad = GST_PAD (item->data);
158     if (GST_PAD_IS_SRC (pad)) {
159       gst_element_release_request_pad (GST_ELEMENT (object), pad);
160       goto restart;
161     }
162   }
163
164   G_OBJECT_CLASS (parent_class)->dispose (object);
165 }
166
167 static void
168 gst_tee_finalize (GObject * object)
169 {
170   GstTee *tee;
171
172   tee = GST_TEE (object);
173
174   g_free (tee->last_message);
175
176   g_mutex_free (tee->dyn_lock);
177
178   G_OBJECT_CLASS (parent_class)->finalize (object);
179 }
180
181 static void
182 gst_tee_class_init (GstTeeClass * klass)
183 {
184   GObjectClass *gobject_class;
185   GstElementClass *gstelement_class;
186
187   gobject_class = G_OBJECT_CLASS (klass);
188   gstelement_class = GST_ELEMENT_CLASS (klass);
189
190   gobject_class->finalize = gst_tee_finalize;
191   gobject_class->set_property = gst_tee_set_property;
192   gobject_class->get_property = gst_tee_get_property;
193   gobject_class->dispose = gst_tee_dispose;
194
195   g_object_class_install_property (gobject_class, PROP_NUM_SRC_PADS,
196       g_param_spec_int ("num-src-pads", "Num Src Pads",
197           "The number of source pads", 0, G_MAXINT, DEFAULT_PROP_NUM_SRC_PADS,
198           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
199   g_object_class_install_property (gobject_class, PROP_HAS_SINK_LOOP,
200       g_param_spec_boolean ("has-sink-loop", "Has Sink Loop",
201           "If the element should spawn a thread (unimplemented and deprecated)",
202           DEFAULT_PROP_HAS_SINK_LOOP,
203           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
204   g_object_class_install_property (gobject_class, PROP_HAS_CHAIN,
205       g_param_spec_boolean ("has-chain", "Has Chain",
206           "If the element can operate in push mode", DEFAULT_PROP_HAS_CHAIN,
207           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
208   g_object_class_install_property (gobject_class, PROP_SILENT,
209       g_param_spec_boolean ("silent", "Silent",
210           "Don't produce last_message events", DEFAULT_PROP_SILENT,
211           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
212   pspec_last_message = g_param_spec_string ("last-message", "Last Message",
213       "The message describing current status", DEFAULT_PROP_LAST_MESSAGE,
214       G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
215   g_object_class_install_property (gobject_class, PROP_LAST_MESSAGE,
216       pspec_last_message);
217   g_object_class_install_property (gobject_class, PROP_PULL_MODE,
218       g_param_spec_enum ("pull-mode", "Pull mode",
219           "Behavior of tee in pull mode", GST_TYPE_TEE_PULL_MODE,
220           DEFAULT_PULL_MODE,
221           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
222   pspec_alloc_pad = g_param_spec_object ("alloc-pad", "Allocation Src Pad",
223       "The pad used for gst_pad_alloc_buffer", GST_TYPE_PAD,
224       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
225   g_object_class_install_property (gobject_class, PROP_ALLOC_PAD,
226       pspec_alloc_pad);
227
228   gst_element_class_set_details_simple (gstelement_class,
229       "Tee pipe fitting",
230       "Generic",
231       "1-to-N pipe fitting",
232       "Erik Walthinsen <omega@cse.ogi.edu>, " "Wim Taymans <wim@fluendo.com>");
233   gst_element_class_add_pad_template (gstelement_class,
234       gst_static_pad_template_get (&sinktemplate));
235   gst_element_class_add_pad_template (gstelement_class,
236       gst_static_pad_template_get (&tee_src_template));
237
238   gstelement_class->request_new_pad =
239       GST_DEBUG_FUNCPTR (gst_tee_request_new_pad);
240   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_tee_release_pad);
241 }
242
243 static void
244 gst_tee_init (GstTee * tee)
245 {
246   tee->dyn_lock = g_mutex_new ();
247
248   tee->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
249   tee->sink_mode = GST_PAD_ACTIVATE_NONE;
250
251   gst_pad_set_event_function (tee->sinkpad,
252       GST_DEBUG_FUNCPTR (gst_tee_sink_event));
253   gst_pad_set_query_function (tee->sinkpad,
254       GST_DEBUG_FUNCPTR (gst_tee_sink_query));
255   gst_pad_set_getcaps_function (tee->sinkpad,
256       GST_DEBUG_FUNCPTR (gst_pad_proxy_getcaps));
257   gst_pad_set_activatepush_function (tee->sinkpad,
258       GST_DEBUG_FUNCPTR (gst_tee_sink_activate_push));
259   gst_pad_set_chain_function (tee->sinkpad, GST_DEBUG_FUNCPTR (gst_tee_chain));
260   gst_pad_set_chain_list_function (tee->sinkpad,
261       GST_DEBUG_FUNCPTR (gst_tee_chain_list));
262   gst_element_add_pad (GST_ELEMENT (tee), tee->sinkpad);
263
264   tee->last_message = NULL;
265 }
266
267 static void
268 gst_tee_notify_alloc_pad (GstTee * tee)
269 {
270 #if !GLIB_CHECK_VERSION(2,26,0)
271   g_object_notify ((GObject *) tee, "alloc-pad");
272 #else
273   g_object_notify_by_pspec ((GObject *) tee, pspec_alloc_pad);
274 #endif
275 }
276
277 static GstFlowReturn
278 forward_sticky_events (GstPad * pad, GstEvent * event, gpointer user_data)
279 {
280   GstPad *srcpad = GST_PAD_CAST (user_data);
281
282   gst_pad_push_event (srcpad, gst_event_ref (event));
283
284   return GST_FLOW_OK;
285 }
286
287 static GstPad *
288 gst_tee_request_new_pad (GstElement * element, GstPadTemplate * templ,
289     const gchar * unused, const GstCaps * caps)
290 {
291   gchar *name;
292   GstPad *srcpad;
293   GstTee *tee;
294   GstPadActivateMode mode;
295   gboolean res;
296   PushData *data;
297
298   tee = GST_TEE (element);
299
300   GST_DEBUG_OBJECT (tee, "requesting pad");
301
302   GST_OBJECT_LOCK (tee);
303   name = g_strdup_printf ("src_%u", tee->pad_counter++);
304
305   srcpad = gst_pad_new_from_template (templ, name);
306   g_free (name);
307
308   mode = tee->sink_mode;
309
310   /* install the data, we automatically free it when the pad is disposed because
311    * of _release_pad or when the element goes away. */
312   data = g_new0 (PushData, 1);
313   data->pushed = FALSE;
314   data->result = GST_FLOW_NOT_LINKED;
315   data->removed = FALSE;
316   g_object_set_qdata_full (G_OBJECT (srcpad), push_data, data, g_free);
317
318   GST_OBJECT_UNLOCK (tee);
319
320   switch (mode) {
321     case GST_PAD_ACTIVATE_PULL:
322       /* we already have a src pad in pull mode, and our pull mode can only be
323          SINGLE, so fall through to activate this new pad in push mode */
324     case GST_PAD_ACTIVATE_PUSH:
325       res = gst_pad_activate_push (srcpad, TRUE);
326       break;
327     default:
328       res = TRUE;
329       break;
330   }
331
332   if (!res)
333     goto activate_failed;
334
335   gst_pad_set_getcaps_function (srcpad,
336       GST_DEBUG_FUNCPTR (gst_pad_proxy_getcaps));
337   gst_pad_set_activatepull_function (srcpad,
338       GST_DEBUG_FUNCPTR (gst_tee_src_activate_pull));
339   gst_pad_set_query_function (srcpad, GST_DEBUG_FUNCPTR (gst_tee_src_query));
340   gst_pad_set_getrange_function (srcpad,
341       GST_DEBUG_FUNCPTR (gst_tee_src_get_range));
342   /* Forward sticky events to the new srcpad */
343   gst_pad_sticky_events_foreach (tee->sinkpad, forward_sticky_events, srcpad);
344   gst_element_add_pad (GST_ELEMENT_CAST (tee), srcpad);
345
346   return srcpad;
347
348   /* ERRORS */
349 activate_failed:
350   {
351     gboolean changed = FALSE;
352
353     GST_OBJECT_LOCK (tee);
354     GST_DEBUG_OBJECT (tee, "warning failed to activate request pad");
355     if (tee->allocpad == srcpad) {
356       tee->allocpad = NULL;
357       changed = TRUE;
358     }
359     GST_OBJECT_UNLOCK (tee);
360     gst_object_unref (srcpad);
361     if (changed) {
362       gst_tee_notify_alloc_pad (tee);
363     }
364     return NULL;
365   }
366 }
367
368 static void
369 gst_tee_release_pad (GstElement * element, GstPad * pad)
370 {
371   GstTee *tee;
372   PushData *data;
373   gboolean changed = FALSE;
374
375   tee = GST_TEE (element);
376
377   GST_DEBUG_OBJECT (tee, "releasing pad");
378
379   /* wait for pending pad_alloc to finish */
380   GST_TEE_DYN_LOCK (tee);
381   data = g_object_get_qdata (G_OBJECT (pad), push_data);
382
383   GST_OBJECT_LOCK (tee);
384   /* mark the pad as removed so that future pad_alloc fails with NOT_LINKED. */
385   data->removed = TRUE;
386   if (tee->allocpad == pad) {
387     tee->allocpad = NULL;
388     changed = TRUE;
389   }
390   GST_OBJECT_UNLOCK (tee);
391
392   gst_object_ref (pad);
393   gst_element_remove_pad (GST_ELEMENT_CAST (tee), pad);
394
395   gst_pad_set_active (pad, FALSE);
396   GST_TEE_DYN_UNLOCK (tee);
397
398   gst_object_unref (pad);
399
400   if (changed) {
401     gst_tee_notify_alloc_pad (tee);
402   }
403 }
404
405 static void
406 gst_tee_set_property (GObject * object, guint prop_id, const GValue * value,
407     GParamSpec * pspec)
408 {
409   GstTee *tee = GST_TEE (object);
410
411   GST_OBJECT_LOCK (tee);
412   switch (prop_id) {
413     case PROP_HAS_SINK_LOOP:
414       tee->has_sink_loop = g_value_get_boolean (value);
415       if (tee->has_sink_loop) {
416         g_warning ("tee will never implement has-sink-loop==TRUE");
417       }
418       break;
419     case PROP_HAS_CHAIN:
420       tee->has_chain = g_value_get_boolean (value);
421       break;
422     case PROP_SILENT:
423       tee->silent = g_value_get_boolean (value);
424       break;
425     case PROP_PULL_MODE:
426       tee->pull_mode = g_value_get_enum (value);
427       break;
428     case PROP_ALLOC_PAD:
429     {
430       GstPad *pad = g_value_get_object (value);
431       GST_OBJECT_LOCK (pad);
432       if (GST_OBJECT_PARENT (pad) == GST_OBJECT_CAST (object))
433         tee->allocpad = pad;
434       else
435         GST_WARNING_OBJECT (object, "Tried to set alloc pad %s which"
436             " is not my pad", GST_OBJECT_NAME (pad));
437       GST_OBJECT_UNLOCK (pad);
438       break;
439     }
440     default:
441       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
442       break;
443   }
444   GST_OBJECT_UNLOCK (tee);
445 }
446
447 static void
448 gst_tee_get_property (GObject * object, guint prop_id, GValue * value,
449     GParamSpec * pspec)
450 {
451   GstTee *tee = GST_TEE (object);
452
453   GST_OBJECT_LOCK (tee);
454   switch (prop_id) {
455     case PROP_NUM_SRC_PADS:
456       g_value_set_int (value, GST_ELEMENT (tee)->numsrcpads);
457       break;
458     case PROP_HAS_SINK_LOOP:
459       g_value_set_boolean (value, tee->has_sink_loop);
460       break;
461     case PROP_HAS_CHAIN:
462       g_value_set_boolean (value, tee->has_chain);
463       break;
464     case PROP_SILENT:
465       g_value_set_boolean (value, tee->silent);
466       break;
467     case PROP_LAST_MESSAGE:
468       g_value_set_string (value, tee->last_message);
469       break;
470     case PROP_PULL_MODE:
471       g_value_set_enum (value, tee->pull_mode);
472       break;
473     case PROP_ALLOC_PAD:
474       g_value_set_object (value, tee->allocpad);
475       break;
476     default:
477       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
478       break;
479   }
480   GST_OBJECT_UNLOCK (tee);
481 }
482
483 static gboolean
484 gst_tee_sink_event (GstPad * pad, GstEvent * event)
485 {
486   gboolean res;
487
488   switch (GST_EVENT_TYPE (event)) {
489     default:
490       res = gst_pad_event_default (pad, event);
491       break;
492   }
493
494   return res;
495 }
496
497 /* on the sink we accept caps that are acceptable to all srcpads */
498 static gboolean
499 gst_tee_sink_acceptcaps (GstPad * pad, GstCaps * caps)
500 {
501   GstTee *tee;
502   gboolean res, done;
503   GstIterator *it;
504   GValue item = { 0, };
505
506   tee = GST_TEE_CAST (GST_PAD_PARENT (pad));
507
508   it = gst_element_iterate_src_pads (GST_ELEMENT_CAST (tee));
509
510   res = TRUE;
511   done = FALSE;
512   while (!done && res) {
513     switch (gst_iterator_next (it, &item)) {
514       case GST_ITERATOR_OK:
515         res &= gst_pad_peer_accept_caps (g_value_get_object (&item), caps);
516         g_value_reset (&item);
517         break;
518       case GST_ITERATOR_RESYNC:
519         res = TRUE;
520         gst_iterator_resync (it);
521         break;
522       case GST_ITERATOR_ERROR:
523         res = FALSE;
524         done = TRUE;
525         break;
526       case GST_ITERATOR_DONE:
527         done = TRUE;
528         break;
529     }
530   }
531   g_value_unset (&item);
532   gst_iterator_free (it);
533
534   return res;
535 }
536
537 static gboolean
538 gst_tee_sink_query (GstPad * pad, GstQuery * query)
539 {
540   gboolean res;
541
542   switch (GST_QUERY_TYPE (query)) {
543     case GST_QUERY_ACCEPT_CAPS:
544     {
545       GstCaps *caps;
546
547       gst_query_parse_accept_caps (query, &caps);
548       res = gst_tee_sink_acceptcaps (pad, caps);
549       gst_query_set_accept_caps_result (query, res);
550       res = TRUE;
551       break;
552     }
553     default:
554       res = gst_pad_query_default (pad, query);
555       break;
556   }
557
558   return res;
559 }
560
561 static void
562 gst_tee_do_message (GstTee * tee, GstPad * pad, gpointer data, gboolean is_list)
563 {
564   GST_OBJECT_LOCK (tee);
565   g_free (tee->last_message);
566   if (is_list) {
567     tee->last_message =
568         g_strdup_printf ("chain-list   ******* (%s:%s)t %p",
569         GST_DEBUG_PAD_NAME (pad), data);
570   } else {
571     tee->last_message =
572         g_strdup_printf ("chain        ******* (%s:%s)t (%" G_GSIZE_FORMAT
573         " bytes, %" G_GUINT64_FORMAT ") %p", GST_DEBUG_PAD_NAME (pad),
574         gst_buffer_get_size (data), GST_BUFFER_TIMESTAMP (data), data);
575   }
576   GST_OBJECT_UNLOCK (tee);
577
578 #if !GLIB_CHECK_VERSION(2,26,0)
579   g_object_notify ((GObject *) tee, "last-message");
580 #else
581   g_object_notify_by_pspec ((GObject *) tee, pspec_last_message);
582 #endif
583 }
584
585 static GstFlowReturn
586 gst_tee_do_push (GstTee * tee, GstPad * pad, gpointer data, gboolean is_list)
587 {
588   GstFlowReturn res;
589
590   /* Push */
591   if (pad == tee->pull_pad) {
592     /* don't push on the pad we're pulling from */
593     res = GST_FLOW_OK;
594   } else if (is_list) {
595     res =
596         gst_pad_push_list (pad,
597         gst_buffer_list_ref (GST_BUFFER_LIST_CAST (data)));
598   } else {
599     res = gst_pad_push (pad, gst_buffer_ref (GST_BUFFER_CAST (data)));
600   }
601   return res;
602 }
603
604 static void
605 clear_pads (GstPad * pad, GstTee * tee)
606 {
607   PushData *data;
608
609   data = g_object_get_qdata ((GObject *) pad, push_data);
610
611   /* the data must be there or we have a screwed up internal state */
612   g_assert (data != NULL);
613
614   data->pushed = FALSE;
615   data->result = GST_FLOW_NOT_LINKED;
616 }
617
618 static GstFlowReturn
619 gst_tee_handle_data (GstTee * tee, gpointer data, gboolean is_list)
620 {
621   GList *pads;
622   guint32 cookie;
623   GstFlowReturn ret, cret;
624
625   if (G_UNLIKELY (!tee->silent))
626     gst_tee_do_message (tee, tee->sinkpad, data, is_list);
627
628   GST_OBJECT_LOCK (tee);
629   pads = GST_ELEMENT_CAST (tee)->srcpads;
630
631   /* special case for zero pads */
632   if (G_UNLIKELY (!pads))
633     goto no_pads;
634
635   /* special case for just one pad that avoids reffing the buffer */
636   if (!pads->next) {
637     GstPad *pad = GST_PAD_CAST (pads->data);
638
639     GST_OBJECT_UNLOCK (tee);
640
641     if (pad == tee->pull_pad) {
642       ret = GST_FLOW_OK;
643     } else if (is_list) {
644       ret = gst_pad_push_list (pad, GST_BUFFER_LIST_CAST (data));
645     } else {
646       ret = gst_pad_push (pad, GST_BUFFER_CAST (data));
647     }
648     return ret;
649   }
650
651   /* mark all pads as 'not pushed on yet' */
652   g_list_foreach (pads, (GFunc) clear_pads, tee);
653
654 restart:
655   cret = GST_FLOW_NOT_LINKED;
656   pads = GST_ELEMENT_CAST (tee)->srcpads;
657   cookie = GST_ELEMENT_CAST (tee)->pads_cookie;
658
659   while (pads) {
660     GstPad *pad;
661     PushData *pdata;
662
663     pad = GST_PAD_CAST (pads->data);
664
665     /* get the private data, something is really wrong with the internal state
666      * when it is not there */
667     pdata = g_object_get_qdata ((GObject *) pad, push_data);
668     g_assert (pdata != NULL);
669
670     if (G_LIKELY (!pdata->pushed)) {
671       /* not yet pushed, release lock and start pushing */
672       gst_object_ref (pad);
673       GST_OBJECT_UNLOCK (tee);
674
675       GST_LOG_OBJECT (tee, "Starting to push %s %p",
676           is_list ? "list" : "buffer", data);
677
678       ret = gst_tee_do_push (tee, pad, data, is_list);
679
680       GST_LOG_OBJECT (tee, "Pushing item %p yielded result %s", data,
681           gst_flow_get_name (ret));
682
683       GST_OBJECT_LOCK (tee);
684       /* keep track of which pad we pushed and the result value. We need to do
685        * this before we release the refcount on the pad, the PushData is
686        * destroyed when the last ref of the pad goes away. */
687       pdata->pushed = TRUE;
688       pdata->result = ret;
689       gst_object_unref (pad);
690     } else {
691       /* already pushed, use previous return value */
692       ret = pdata->result;
693       GST_LOG_OBJECT (tee, "pad already pushed with %s",
694           gst_flow_get_name (ret));
695     }
696
697     /* before we go combining the return value, check if the pad list is still
698      * the same. It could be possible that the pad we just pushed was removed
699      * and the return value it not valid anymore */
700     if (G_UNLIKELY (GST_ELEMENT_CAST (tee)->pads_cookie != cookie)) {
701       GST_LOG_OBJECT (tee, "pad list changed");
702       /* the list of pads changed, restart iteration. Pads that we already
703        * pushed on and are still in the new list, will not be pushed on
704        * again. */
705       goto restart;
706     }
707
708     /* stop pushing more buffers when we have a fatal error */
709     if (G_UNLIKELY (ret != GST_FLOW_OK && ret != GST_FLOW_NOT_LINKED))
710       goto error;
711
712     /* keep all other return values, overwriting the previous one. */
713     if (G_LIKELY (ret != GST_FLOW_NOT_LINKED)) {
714       GST_LOG_OBJECT (tee, "Replacing ret val %d with %d", cret, ret);
715       cret = ret;
716     }
717     pads = g_list_next (pads);
718   }
719   GST_OBJECT_UNLOCK (tee);
720
721   gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
722
723   /* no need to unset gvalue */
724   return cret;
725
726   /* ERRORS */
727 no_pads:
728   {
729     GST_DEBUG_OBJECT (tee, "there are no pads, return not-linked");
730     ret = GST_FLOW_NOT_LINKED;
731     goto error;
732   }
733 error:
734   {
735     GST_DEBUG_OBJECT (tee, "received error %s", gst_flow_get_name (ret));
736     GST_OBJECT_UNLOCK (tee);
737     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
738     return ret;
739   }
740 }
741
742 static GstFlowReturn
743 gst_tee_chain (GstPad * pad, GstBuffer * buffer)
744 {
745   GstFlowReturn res;
746   GstTee *tee;
747
748   tee = GST_TEE_CAST (GST_OBJECT_PARENT (pad));
749
750   GST_DEBUG_OBJECT (tee, "received buffer %p", buffer);
751
752   res = gst_tee_handle_data (tee, buffer, FALSE);
753
754   GST_DEBUG_OBJECT (tee, "handled buffer %s", gst_flow_get_name (res));
755
756   return res;
757 }
758
759 static GstFlowReturn
760 gst_tee_chain_list (GstPad * pad, GstBufferList * list)
761 {
762   GstFlowReturn res;
763   GstTee *tee;
764
765   tee = GST_TEE_CAST (gst_pad_get_parent (pad));
766
767   GST_DEBUG_OBJECT (tee, "received list %p", list);
768
769   res = gst_tee_handle_data (tee, list, TRUE);
770
771   GST_DEBUG_OBJECT (tee, "handled list %s", gst_flow_get_name (res));
772
773   gst_object_unref (tee);
774
775   return res;
776 }
777
778 static gboolean
779 gst_tee_sink_activate_push (GstPad * pad, gboolean active)
780 {
781   GstTee *tee;
782
783   tee = GST_TEE (GST_OBJECT_PARENT (pad));
784
785   GST_OBJECT_LOCK (tee);
786   tee->sink_mode = active && GST_PAD_ACTIVATE_PUSH;
787
788   if (active && !tee->has_chain)
789     goto no_chain;
790   GST_OBJECT_UNLOCK (tee);
791
792   return TRUE;
793
794   /* ERRORS */
795 no_chain:
796   {
797     GST_OBJECT_UNLOCK (tee);
798     GST_INFO_OBJECT (tee,
799         "Tee cannot operate in push mode with has-chain==FALSE");
800     return FALSE;
801   }
802 }
803
804 static gboolean
805 gst_tee_src_activate_pull (GstPad * pad, gboolean active)
806 {
807   GstTee *tee;
808   gboolean res;
809   GstPad *sinkpad;
810
811   tee = GST_TEE (gst_pad_get_parent (pad));
812
813   GST_OBJECT_LOCK (tee);
814
815   if (tee->pull_mode == GST_TEE_PULL_MODE_NEVER)
816     goto cannot_pull;
817
818   if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE && active && tee->pull_pad)
819     goto cannot_pull_multiple_srcs;
820
821   sinkpad = gst_object_ref (tee->sinkpad);
822
823   GST_OBJECT_UNLOCK (tee);
824
825   res = gst_pad_activate_pull (sinkpad, active);
826   gst_object_unref (sinkpad);
827
828   if (!res)
829     goto sink_activate_failed;
830
831   GST_OBJECT_LOCK (tee);
832   if (active) {
833     if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE)
834       tee->pull_pad = pad;
835   } else {
836     if (pad == tee->pull_pad)
837       tee->pull_pad = NULL;
838   }
839   tee->sink_mode = active & GST_PAD_ACTIVATE_PULL;
840   GST_OBJECT_UNLOCK (tee);
841
842   gst_object_unref (tee);
843
844   return res;
845
846   /* ERRORS */
847 cannot_pull:
848   {
849     GST_OBJECT_UNLOCK (tee);
850     GST_INFO_OBJECT (tee, "Cannot activate in pull mode, pull-mode "
851         "set to NEVER");
852     gst_object_unref (tee);
853     return FALSE;
854   }
855 cannot_pull_multiple_srcs:
856   {
857     GST_OBJECT_UNLOCK (tee);
858     GST_INFO_OBJECT (tee, "Cannot activate multiple src pads in pull mode, "
859         "pull-mode set to SINGLE");
860     gst_object_unref (tee);
861     return FALSE;
862   }
863 sink_activate_failed:
864   {
865     GST_INFO_OBJECT (tee, "Failed to %sactivate sink pad in pull mode",
866         active ? "" : "de");
867     gst_object_unref (tee);
868     return FALSE;
869   }
870 }
871
872 static gboolean
873 gst_tee_src_query (GstPad * pad, GstQuery * query)
874 {
875   GstTee *tee;
876   gboolean res;
877   GstPad *sinkpad;
878
879   tee = GST_TEE (gst_pad_get_parent (pad));
880
881   switch (GST_QUERY_TYPE (query)) {
882     case GST_QUERY_SCHEDULING:
883     {
884       gboolean pull_mode;
885
886       GST_OBJECT_LOCK (tee);
887       pull_mode = TRUE;
888       if (tee->pull_mode == GST_TEE_PULL_MODE_NEVER) {
889         GST_INFO_OBJECT (tee, "Cannot activate in pull mode, pull-mode "
890             "set to NEVER");
891         pull_mode = FALSE;
892       } else if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE && tee->pull_pad) {
893         GST_INFO_OBJECT (tee, "Cannot activate multiple src pads in pull mode, "
894             "pull-mode set to SINGLE");
895         pull_mode = FALSE;
896       }
897
898       sinkpad = gst_object_ref (tee->sinkpad);
899       GST_OBJECT_UNLOCK (tee);
900
901       if (pull_mode) {
902         /* ask peer if we can operate in pull mode */
903         res = gst_pad_peer_query (sinkpad, query);
904       } else {
905         res = TRUE;
906       }
907       gst_object_unref (sinkpad);
908       break;
909     }
910     default:
911       res = gst_pad_query_default (pad, query);
912       break;
913   }
914
915   gst_object_unref (tee);
916
917   return res;
918 }
919
920 static void
921 gst_tee_push_eos (const GValue * vpad, GstTee * tee)
922 {
923   GstPad *pad = g_value_get_object (vpad);
924
925   if (pad != tee->pull_pad)
926     gst_pad_push_event (pad, gst_event_new_eos ());
927 }
928
929 static void
930 gst_tee_pull_eos (GstTee * tee)
931 {
932   GstIterator *iter;
933
934   iter = gst_element_iterate_src_pads (GST_ELEMENT (tee));
935   gst_iterator_foreach (iter, (GstIteratorForeachFunction) gst_tee_push_eos,
936       tee);
937   gst_iterator_free (iter);
938 }
939
940 static GstFlowReturn
941 gst_tee_src_get_range (GstPad * pad, guint64 offset, guint length,
942     GstBuffer ** buf)
943 {
944   GstTee *tee;
945   GstFlowReturn ret;
946
947   tee = GST_TEE (gst_pad_get_parent (pad));
948
949   ret = gst_pad_pull_range (tee->sinkpad, offset, length, buf);
950
951   if (ret == GST_FLOW_OK)
952     ret = gst_tee_handle_data (tee, gst_buffer_ref (*buf), FALSE);
953   else if (ret == GST_FLOW_EOS)
954     gst_tee_pull_eos (tee);
955
956   gst_object_unref (tee);
957
958   return ret;
959 }