tee: Add allow-not-linked property
[platform/upstream/gstreamer.git] / plugins / elements / gsttee.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2001,2002,2003,2004,2005 Wim Taymans <wim@fluendo.com>
4  *               2007 Wim Taymans <wim.taymans@gmail.com>
5  *
6  * gsttee.c: Tee element, one in N out
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23
24 /**
25  * SECTION:element-tee
26  * @see_also: #GstIdentity
27  *
28  * Split data to multiple pads. Branching the data flow is useful when e.g.
29  * capturing a video where the video is shown on the screen and also encoded and
30  * written to a file. Another example is playing music and hooking up a
31  * visualisation module.
32  *
33  * One needs to use separate queue elements (or a multiqueue) in each branch to
34  * provide separate threads for each branch. Otherwise a blocked dataflow in one
35  * branch would stall the other branches.
36  *
37  * <refsect2>
38  * <title>Example launch line</title>
39  * |[
40  * gst-launch filesrc location=song.ogg ! decodebin ! tee name=t ! queue ! autoaudiosink t. ! queue ! audioconvert ! goom ! videoconvert ! autovideosink
41  * ]| Play a song.ogg from local dir and render visualisations using the goom
42  * element.
43  * </refsect2>
44  */
45
46 #ifdef HAVE_CONFIG_H
47 #  include "config.h"
48 #endif
49
50 #include "gsttee.h"
51 #include "gst/glib-compat-private.h"
52
53 #include <string.h>
54 #include <stdio.h>
55
56 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
57     GST_PAD_SINK,
58     GST_PAD_ALWAYS,
59     GST_STATIC_CAPS_ANY);
60
61 GST_DEBUG_CATEGORY_STATIC (gst_tee_debug);
62 #define GST_CAT_DEFAULT gst_tee_debug
63
64 #define GST_TYPE_TEE_PULL_MODE (gst_tee_pull_mode_get_type())
65 static GType
66 gst_tee_pull_mode_get_type (void)
67 {
68   static GType type = 0;
69   static const GEnumValue data[] = {
70     {GST_TEE_PULL_MODE_NEVER, "Never activate in pull mode", "never"},
71     {GST_TEE_PULL_MODE_SINGLE, "Only one src pad can be active in pull mode",
72         "single"},
73     {0, NULL, NULL},
74   };
75
76   if (!type) {
77     type = g_enum_register_static ("GstTeePullMode", data);
78   }
79   return type;
80 }
81
82 #define DEFAULT_PROP_NUM_SRC_PADS       0
83 #define DEFAULT_PROP_HAS_CHAIN          TRUE
84 #define DEFAULT_PROP_SILENT             TRUE
85 #define DEFAULT_PROP_LAST_MESSAGE       NULL
86 #define DEFAULT_PULL_MODE               GST_TEE_PULL_MODE_NEVER
87 #define DEFAULT_PROP_ALLOW_NOT_LINKED   FALSE
88
89 enum
90 {
91   PROP_0,
92   PROP_NUM_SRC_PADS,
93   PROP_HAS_CHAIN,
94   PROP_SILENT,
95   PROP_LAST_MESSAGE,
96   PROP_PULL_MODE,
97   PROP_ALLOC_PAD,
98   PROP_ALLOW_NOT_LINKED,
99 };
100
101 static GstStaticPadTemplate tee_src_template =
102 GST_STATIC_PAD_TEMPLATE ("src_%u",
103     GST_PAD_SRC,
104     GST_PAD_REQUEST,
105     GST_STATIC_CAPS_ANY);
106
107 #define _do_init \
108     GST_DEBUG_CATEGORY_INIT (gst_tee_debug, "tee", 0, "tee element");
109 #define gst_tee_parent_class parent_class
110 G_DEFINE_TYPE_WITH_CODE (GstTee, gst_tee, GST_TYPE_ELEMENT, _do_init);
111
112 static GParamSpec *pspec_last_message = NULL;
113 static GParamSpec *pspec_alloc_pad = NULL;
114
115 GType gst_tee_pad_get_type (void);
116
117 #define GST_TYPE_TEE_PAD \
118   (gst_tee_pad_get_type())
119 #define GST_TEE_PAD(obj) \
120   (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_TEE_PAD, GstTeePad))
121 #define GST_TEE_PAD_CLASS(klass) \
122   (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_TEE_PAD, GstTeePadClass))
123 #define GST_IS_TEE_PAD(obj) \
124   (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_TEE_PAD))
125 #define GST_IS_TEE_PAD_CLASS(klass) \
126   (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_TEE_PAD))
127 #define GST_TEE_PAD_CAST(obj) \
128   ((GstTeePad *)(obj))
129
130 typedef struct _GstTeePad GstTeePad;
131 typedef struct _GstTeePadClass GstTeePadClass;
132
133 struct _GstTeePad
134 {
135   GstPad parent;
136
137   guint index;
138   gboolean pushed;
139   GstFlowReturn result;
140   gboolean removed;
141 };
142
143 struct _GstTeePadClass
144 {
145   GstPadClass parent;
146 };
147
148 G_DEFINE_TYPE (GstTeePad, gst_tee_pad, GST_TYPE_PAD);
149
150 static void
151 gst_tee_pad_class_init (GstTeePadClass * klass)
152 {
153 }
154
155 static void
156 gst_tee_pad_reset (GstTeePad * pad)
157 {
158   pad->pushed = FALSE;
159   pad->result = GST_FLOW_NOT_LINKED;
160   pad->removed = FALSE;
161 }
162
163 static void
164 gst_tee_pad_init (GstTeePad * pad)
165 {
166   gst_tee_pad_reset (pad);
167 }
168
169 static GstPad *gst_tee_request_new_pad (GstElement * element,
170     GstPadTemplate * temp, const gchar * unused, const GstCaps * caps);
171 static void gst_tee_release_pad (GstElement * element, GstPad * pad);
172
173 static void gst_tee_finalize (GObject * object);
174 static void gst_tee_set_property (GObject * object, guint prop_id,
175     const GValue * value, GParamSpec * pspec);
176 static void gst_tee_get_property (GObject * object, guint prop_id,
177     GValue * value, GParamSpec * pspec);
178 static void gst_tee_dispose (GObject * object);
179
180 static GstFlowReturn gst_tee_chain (GstPad * pad, GstObject * parent,
181     GstBuffer * buffer);
182 static GstFlowReturn gst_tee_chain_list (GstPad * pad, GstObject * parent,
183     GstBufferList * list);
184 static gboolean gst_tee_sink_event (GstPad * pad, GstObject * parent,
185     GstEvent * event);
186 static gboolean gst_tee_sink_query (GstPad * pad, GstObject * parent,
187     GstQuery * query);
188 static gboolean gst_tee_sink_activate_mode (GstPad * pad, GstObject * parent,
189     GstPadMode mode, gboolean active);
190 static gboolean gst_tee_src_query (GstPad * pad, GstObject * parent,
191     GstQuery * query);
192 static gboolean gst_tee_src_activate_mode (GstPad * pad, GstObject * parent,
193     GstPadMode mode, gboolean active);
194 static GstFlowReturn gst_tee_src_get_range (GstPad * pad, GstObject * parent,
195     guint64 offset, guint length, GstBuffer ** buf);
196
197 static void
198 gst_tee_dispose (GObject * object)
199 {
200   GList *item;
201
202 restart:
203   for (item = GST_ELEMENT_PADS (object); item; item = g_list_next (item)) {
204     GstPad *pad = GST_PAD (item->data);
205     if (GST_PAD_IS_SRC (pad)) {
206       gst_element_release_request_pad (GST_ELEMENT (object), pad);
207       goto restart;
208     }
209   }
210
211   G_OBJECT_CLASS (parent_class)->dispose (object);
212 }
213
214 static void
215 gst_tee_finalize (GObject * object)
216 {
217   GstTee *tee;
218
219   tee = GST_TEE (object);
220
221   g_hash_table_unref (tee->pad_indexes);
222
223   g_free (tee->last_message);
224
225   G_OBJECT_CLASS (parent_class)->finalize (object);
226 }
227
228 static void
229 gst_tee_class_init (GstTeeClass * klass)
230 {
231   GObjectClass *gobject_class;
232   GstElementClass *gstelement_class;
233
234   gobject_class = G_OBJECT_CLASS (klass);
235   gstelement_class = GST_ELEMENT_CLASS (klass);
236
237   gobject_class->finalize = gst_tee_finalize;
238   gobject_class->set_property = gst_tee_set_property;
239   gobject_class->get_property = gst_tee_get_property;
240   gobject_class->dispose = gst_tee_dispose;
241
242   g_object_class_install_property (gobject_class, PROP_NUM_SRC_PADS,
243       g_param_spec_int ("num-src-pads", "Num Src Pads",
244           "The number of source pads", 0, G_MAXINT, DEFAULT_PROP_NUM_SRC_PADS,
245           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
246   g_object_class_install_property (gobject_class, PROP_HAS_CHAIN,
247       g_param_spec_boolean ("has-chain", "Has Chain",
248           "If the element can operate in push mode", DEFAULT_PROP_HAS_CHAIN,
249           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
250   g_object_class_install_property (gobject_class, PROP_SILENT,
251       g_param_spec_boolean ("silent", "Silent",
252           "Don't produce last_message events", DEFAULT_PROP_SILENT,
253           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
254   pspec_last_message = g_param_spec_string ("last-message", "Last Message",
255       "The message describing current status", DEFAULT_PROP_LAST_MESSAGE,
256       G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
257   g_object_class_install_property (gobject_class, PROP_LAST_MESSAGE,
258       pspec_last_message);
259   g_object_class_install_property (gobject_class, PROP_PULL_MODE,
260       g_param_spec_enum ("pull-mode", "Pull mode",
261           "Behavior of tee in pull mode", GST_TYPE_TEE_PULL_MODE,
262           DEFAULT_PULL_MODE,
263           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
264   pspec_alloc_pad = g_param_spec_object ("alloc-pad", "Allocation Src Pad",
265       "The pad ALLOCATION queries will be proxied to (unused)", GST_TYPE_PAD,
266       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
267   g_object_class_install_property (gobject_class, PROP_ALLOC_PAD,
268       pspec_alloc_pad);
269
270   /**
271    * GstTee:allow-not-linked
272    *
273    * This property makes sink pad return GST_FLOW_OK even if there are no
274    * source pads or any of them is linked.
275    *
276    * This is useful to avoid errors when you have a dynamic pipeline and during
277    * a reconnection you can have all the pads unlinked or removed.
278    *
279    * Since: 1.6
280    */
281   g_object_class_install_property (gobject_class, PROP_ALLOW_NOT_LINKED,
282       g_param_spec_boolean ("allow-not-linked", "Allow not linked",
283           "Return GTS_FLOW_OK even if there are not source pads or all are "
284           "unlinked", DEFAULT_PROP_ALLOW_NOT_LINKED,
285           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
286
287   gst_element_class_set_static_metadata (gstelement_class,
288       "Tee pipe fitting",
289       "Generic",
290       "1-to-N pipe fitting",
291       "Erik Walthinsen <omega@cse.ogi.edu>, " "Wim Taymans <wim@fluendo.com>");
292   gst_element_class_add_pad_template (gstelement_class,
293       gst_static_pad_template_get (&sinktemplate));
294   gst_element_class_add_pad_template (gstelement_class,
295       gst_static_pad_template_get (&tee_src_template));
296
297   gstelement_class->request_new_pad =
298       GST_DEBUG_FUNCPTR (gst_tee_request_new_pad);
299   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_tee_release_pad);
300 }
301
302 static void
303 gst_tee_init (GstTee * tee)
304 {
305   tee->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
306   tee->sink_mode = GST_PAD_MODE_NONE;
307
308   gst_pad_set_event_function (tee->sinkpad,
309       GST_DEBUG_FUNCPTR (gst_tee_sink_event));
310   gst_pad_set_query_function (tee->sinkpad,
311       GST_DEBUG_FUNCPTR (gst_tee_sink_query));
312   gst_pad_set_activatemode_function (tee->sinkpad,
313       GST_DEBUG_FUNCPTR (gst_tee_sink_activate_mode));
314   gst_pad_set_chain_function (tee->sinkpad, GST_DEBUG_FUNCPTR (gst_tee_chain));
315   gst_pad_set_chain_list_function (tee->sinkpad,
316       GST_DEBUG_FUNCPTR (gst_tee_chain_list));
317   GST_OBJECT_FLAG_SET (tee->sinkpad, GST_PAD_FLAG_PROXY_CAPS);
318   gst_element_add_pad (GST_ELEMENT (tee), tee->sinkpad);
319
320   tee->pad_indexes = g_hash_table_new (NULL, NULL);
321
322   tee->last_message = NULL;
323 }
324
325 static void
326 gst_tee_notify_alloc_pad (GstTee * tee)
327 {
328   g_object_notify_by_pspec ((GObject *) tee, pspec_alloc_pad);
329 }
330
331 static gboolean
332 forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
333 {
334   GstPad *srcpad = GST_PAD_CAST (user_data);
335   GstFlowReturn ret;
336
337   ret = gst_pad_store_sticky_event (srcpad, *event);
338   if (ret != GST_FLOW_OK) {
339     GST_DEBUG_OBJECT (srcpad, "storing sticky event %p (%s) failed: %s", *event,
340         GST_EVENT_TYPE_NAME (*event), gst_flow_get_name (ret));
341   }
342
343   return TRUE;
344 }
345
346 static GstPad *
347 gst_tee_request_new_pad (GstElement * element, GstPadTemplate * templ,
348     const gchar * name_templ, const GstCaps * caps)
349 {
350   gchar *name;
351   GstPad *srcpad;
352   GstTee *tee;
353   GstPadMode mode;
354   gboolean res;
355   guint index = 0;
356
357   tee = GST_TEE (element);
358
359   GST_DEBUG_OBJECT (tee, "requesting pad");
360
361   GST_OBJECT_LOCK (tee);
362
363   if (name_templ) {
364     sscanf (name_templ, "src_%u", &index);
365     GST_LOG_OBJECT (element, "name: %s (index %d)", name_templ, index);
366     if (g_hash_table_contains (tee->pad_indexes, GUINT_TO_POINTER (index))) {
367       GST_ERROR_OBJECT (element, "pad name %s is not unique", name_templ);
368       GST_OBJECT_UNLOCK (tee);
369       return NULL;
370     }
371     if (index >= tee->next_pad_index)
372       tee->next_pad_index = index + 1;
373   } else {
374     index = tee->next_pad_index;
375
376     while (g_hash_table_contains (tee->pad_indexes, GUINT_TO_POINTER (index)))
377       index++;
378
379     tee->next_pad_index = index + 1;
380   }
381
382   g_hash_table_insert (tee->pad_indexes, GUINT_TO_POINTER (index), NULL);
383
384   name = g_strdup_printf ("src_%u", index);
385
386   srcpad = GST_PAD_CAST (g_object_new (GST_TYPE_TEE_PAD,
387           "name", name, "direction", templ->direction, "template", templ,
388           NULL));
389   GST_TEE_PAD_CAST (srcpad)->index = index;
390   g_free (name);
391
392   mode = tee->sink_mode;
393
394   GST_OBJECT_UNLOCK (tee);
395
396   switch (mode) {
397     case GST_PAD_MODE_PULL:
398       /* we already have a src pad in pull mode, and our pull mode can only be
399          SINGLE, so fall through to activate this new pad in push mode */
400     case GST_PAD_MODE_PUSH:
401       res = gst_pad_activate_mode (srcpad, GST_PAD_MODE_PUSH, TRUE);
402       break;
403     default:
404       res = TRUE;
405       break;
406   }
407
408   if (!res)
409     goto activate_failed;
410
411   gst_pad_set_activatemode_function (srcpad,
412       GST_DEBUG_FUNCPTR (gst_tee_src_activate_mode));
413   gst_pad_set_query_function (srcpad, GST_DEBUG_FUNCPTR (gst_tee_src_query));
414   gst_pad_set_getrange_function (srcpad,
415       GST_DEBUG_FUNCPTR (gst_tee_src_get_range));
416   /* Forward sticky events to the new srcpad */
417   gst_pad_sticky_events_foreach (tee->sinkpad, forward_sticky_events, srcpad);
418   GST_OBJECT_FLAG_SET (srcpad, GST_PAD_FLAG_PROXY_CAPS);
419   gst_element_add_pad (GST_ELEMENT_CAST (tee), srcpad);
420
421   return srcpad;
422
423   /* ERRORS */
424 activate_failed:
425   {
426     gboolean changed = FALSE;
427
428     GST_OBJECT_LOCK (tee);
429     GST_DEBUG_OBJECT (tee, "warning failed to activate request pad");
430     if (tee->allocpad == srcpad) {
431       tee->allocpad = NULL;
432       changed = TRUE;
433     }
434     GST_OBJECT_UNLOCK (tee);
435     gst_object_unref (srcpad);
436     if (changed) {
437       gst_tee_notify_alloc_pad (tee);
438     }
439     return NULL;
440   }
441 }
442
443 static void
444 gst_tee_release_pad (GstElement * element, GstPad * pad)
445 {
446   GstTee *tee;
447   gboolean changed = FALSE;
448   guint index;
449
450   tee = GST_TEE (element);
451
452   GST_DEBUG_OBJECT (tee, "releasing pad");
453
454   GST_OBJECT_LOCK (tee);
455   index = GST_TEE_PAD_CAST (pad)->index;
456   /* mark the pad as removed so that future pad_alloc fails with NOT_LINKED. */
457   GST_TEE_PAD_CAST (pad)->removed = TRUE;
458   if (tee->allocpad == pad) {
459     tee->allocpad = NULL;
460     changed = TRUE;
461   }
462   GST_OBJECT_UNLOCK (tee);
463
464   gst_object_ref (pad);
465   gst_element_remove_pad (GST_ELEMENT_CAST (tee), pad);
466
467   gst_pad_set_active (pad, FALSE);
468
469   gst_object_unref (pad);
470
471   if (changed) {
472     gst_tee_notify_alloc_pad (tee);
473   }
474
475   GST_OBJECT_LOCK (tee);
476   g_hash_table_remove (tee->pad_indexes, GUINT_TO_POINTER (index));
477   GST_OBJECT_UNLOCK (tee);
478 }
479
480 static void
481 gst_tee_set_property (GObject * object, guint prop_id, const GValue * value,
482     GParamSpec * pspec)
483 {
484   GstTee *tee = GST_TEE (object);
485
486   GST_OBJECT_LOCK (tee);
487   switch (prop_id) {
488     case PROP_HAS_CHAIN:
489       tee->has_chain = g_value_get_boolean (value);
490       break;
491     case PROP_SILENT:
492       tee->silent = g_value_get_boolean (value);
493       break;
494     case PROP_PULL_MODE:
495       tee->pull_mode = (GstTeePullMode) g_value_get_enum (value);
496       break;
497     case PROP_ALLOC_PAD:
498     {
499       GstPad *pad = g_value_get_object (value);
500       GST_OBJECT_LOCK (pad);
501       if (GST_OBJECT_PARENT (pad) == GST_OBJECT_CAST (object))
502         tee->allocpad = pad;
503       else
504         GST_WARNING_OBJECT (object, "Tried to set alloc pad %s which"
505             " is not my pad", GST_OBJECT_NAME (pad));
506       GST_OBJECT_UNLOCK (pad);
507       break;
508     }
509     case PROP_ALLOW_NOT_LINKED:
510       tee->allow_not_linked = g_value_get_boolean (value);
511       break;
512     default:
513       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
514       break;
515   }
516   GST_OBJECT_UNLOCK (tee);
517 }
518
519 static void
520 gst_tee_get_property (GObject * object, guint prop_id, GValue * value,
521     GParamSpec * pspec)
522 {
523   GstTee *tee = GST_TEE (object);
524
525   GST_OBJECT_LOCK (tee);
526   switch (prop_id) {
527     case PROP_NUM_SRC_PADS:
528       g_value_set_int (value, GST_ELEMENT (tee)->numsrcpads);
529       break;
530     case PROP_HAS_CHAIN:
531       g_value_set_boolean (value, tee->has_chain);
532       break;
533     case PROP_SILENT:
534       g_value_set_boolean (value, tee->silent);
535       break;
536     case PROP_LAST_MESSAGE:
537       g_value_set_string (value, tee->last_message);
538       break;
539     case PROP_PULL_MODE:
540       g_value_set_enum (value, tee->pull_mode);
541       break;
542     case PROP_ALLOC_PAD:
543       g_value_set_object (value, tee->allocpad);
544       break;
545     case PROP_ALLOW_NOT_LINKED:
546       g_value_set_boolean (value, tee->allow_not_linked);
547       break;
548     default:
549       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
550       break;
551   }
552   GST_OBJECT_UNLOCK (tee);
553 }
554
555 static gboolean
556 gst_tee_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
557 {
558   gboolean res;
559
560   switch (GST_EVENT_TYPE (event)) {
561     default:
562       res = gst_pad_event_default (pad, parent, event);
563       break;
564   }
565
566   return res;
567 }
568
569 static gboolean
570 gst_tee_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
571 {
572   gboolean res;
573
574   switch (GST_QUERY_TYPE (query)) {
575     default:
576       res = gst_pad_query_default (pad, parent, query);
577       break;
578   }
579   return res;
580 }
581
582 static void
583 gst_tee_do_message (GstTee * tee, GstPad * pad, gpointer data, gboolean is_list)
584 {
585   GST_OBJECT_LOCK (tee);
586   g_free (tee->last_message);
587   if (is_list) {
588     tee->last_message =
589         g_strdup_printf ("chain-list   ******* (%s:%s)t %p",
590         GST_DEBUG_PAD_NAME (pad), data);
591   } else {
592     tee->last_message =
593         g_strdup_printf ("chain        ******* (%s:%s)t (%" G_GSIZE_FORMAT
594         " bytes, %" G_GUINT64_FORMAT ") %p", GST_DEBUG_PAD_NAME (pad),
595         gst_buffer_get_size (data), GST_BUFFER_TIMESTAMP (data), data);
596   }
597   GST_OBJECT_UNLOCK (tee);
598
599   g_object_notify_by_pspec ((GObject *) tee, pspec_last_message);
600 }
601
602 static GstFlowReturn
603 gst_tee_do_push (GstTee * tee, GstPad * pad, gpointer data, gboolean is_list)
604 {
605   GstFlowReturn res;
606
607   /* Push */
608   if (pad == tee->pull_pad) {
609     /* don't push on the pad we're pulling from */
610     res = GST_FLOW_OK;
611   } else if (is_list) {
612     res =
613         gst_pad_push_list (pad,
614         gst_buffer_list_ref (GST_BUFFER_LIST_CAST (data)));
615   } else {
616     res = gst_pad_push (pad, gst_buffer_ref (GST_BUFFER_CAST (data)));
617   }
618   return res;
619 }
620
621 static void
622 clear_pads (GstPad * pad, GstTee * tee)
623 {
624   GST_TEE_PAD_CAST (pad)->pushed = FALSE;
625   GST_TEE_PAD_CAST (pad)->result = GST_FLOW_NOT_LINKED;
626 }
627
628 static GstFlowReturn
629 gst_tee_handle_data (GstTee * tee, gpointer data, gboolean is_list)
630 {
631   GList *pads;
632   guint32 cookie;
633   GstFlowReturn ret, cret;
634
635   if (G_UNLIKELY (!tee->silent))
636     gst_tee_do_message (tee, tee->sinkpad, data, is_list);
637
638   GST_OBJECT_LOCK (tee);
639   pads = GST_ELEMENT_CAST (tee)->srcpads;
640
641   /* special case for zero pads */
642   if (G_UNLIKELY (!pads))
643     goto no_pads;
644
645   /* special case for just one pad that avoids reffing the buffer */
646   if (!pads->next) {
647     GstPad *pad = GST_PAD_CAST (pads->data);
648
649     /* Keep another ref around, a pad probe
650      * might release and destroy the pad */
651     gst_object_ref (pad);
652     GST_OBJECT_UNLOCK (tee);
653
654     if (pad == tee->pull_pad) {
655       ret = GST_FLOW_OK;
656     } else if (is_list) {
657       ret = gst_pad_push_list (pad, GST_BUFFER_LIST_CAST (data));
658     } else {
659       ret = gst_pad_push (pad, GST_BUFFER_CAST (data));
660     }
661
662     gst_object_unref (pad);
663
664     if (ret == GST_FLOW_NOT_LINKED && tee->allow_not_linked) {
665       ret = GST_FLOW_OK;
666     }
667
668     return ret;
669   }
670
671   /* mark all pads as 'not pushed on yet' */
672   g_list_foreach (pads, (GFunc) clear_pads, tee);
673
674 restart:
675   if (tee->allow_not_linked) {
676     cret = GST_FLOW_OK;
677   } else {
678     cret = GST_FLOW_NOT_LINKED;
679   }
680   pads = GST_ELEMENT_CAST (tee)->srcpads;
681   cookie = GST_ELEMENT_CAST (tee)->pads_cookie;
682
683   while (pads) {
684     GstPad *pad;
685
686     pad = GST_PAD_CAST (pads->data);
687
688     if (G_LIKELY (!GST_TEE_PAD_CAST (pad)->pushed)) {
689       /* not yet pushed, release lock and start pushing */
690       gst_object_ref (pad);
691       GST_OBJECT_UNLOCK (tee);
692
693       GST_LOG_OBJECT (pad, "Starting to push %s %p",
694           is_list ? "list" : "buffer", data);
695
696       ret = gst_tee_do_push (tee, pad, data, is_list);
697
698       GST_LOG_OBJECT (pad, "Pushing item %p yielded result %s", data,
699           gst_flow_get_name (ret));
700
701       GST_OBJECT_LOCK (tee);
702       /* keep track of which pad we pushed and the result value */
703       GST_TEE_PAD_CAST (pad)->pushed = TRUE;
704       GST_TEE_PAD_CAST (pad)->result = ret;
705       gst_object_unref (pad);
706     } else {
707       /* already pushed, use previous return value */
708       ret = GST_TEE_PAD_CAST (pad)->result;
709       GST_LOG_OBJECT (pad, "pad already pushed with %s",
710           gst_flow_get_name (ret));
711     }
712
713     /* before we go combining the return value, check if the pad list is still
714      * the same. It could be possible that the pad we just pushed was removed
715      * and the return value it not valid anymore */
716     if (G_UNLIKELY (GST_ELEMENT_CAST (tee)->pads_cookie != cookie)) {
717       GST_LOG_OBJECT (pad, "pad list changed");
718       /* the list of pads changed, restart iteration. Pads that we already
719        * pushed on and are still in the new list, will not be pushed on
720        * again. */
721       goto restart;
722     }
723
724     /* stop pushing more buffers when we have a fatal error */
725     if (G_UNLIKELY (ret != GST_FLOW_OK && ret != GST_FLOW_NOT_LINKED))
726       goto error;
727
728     /* keep all other return values, overwriting the previous one. */
729     if (G_LIKELY (ret != GST_FLOW_NOT_LINKED)) {
730       GST_LOG_OBJECT (pad, "Replacing ret val %d with %d", cret, ret);
731       cret = ret;
732     }
733     pads = g_list_next (pads);
734   }
735   GST_OBJECT_UNLOCK (tee);
736
737   gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
738
739   /* no need to unset gvalue */
740   return cret;
741
742   /* ERRORS */
743 no_pads:
744   {
745     if (tee->allow_not_linked) {
746       GST_DEBUG_OBJECT (tee, "there are no pads, dropping %s",
747           is_list ? "buffer-list" : "buffer");
748       ret = GST_FLOW_OK;
749     } else {
750       GST_DEBUG_OBJECT (tee, "there are no pads, return not-linked");
751       ret = GST_FLOW_NOT_LINKED;
752     }
753     goto end;
754   }
755 error:
756   {
757     GST_DEBUG_OBJECT (tee, "received error %s", gst_flow_get_name (ret));
758     goto end;
759   }
760 end:
761   {
762     GST_OBJECT_UNLOCK (tee);
763     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
764     return ret;
765   }
766 }
767
768 static GstFlowReturn
769 gst_tee_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
770 {
771   GstFlowReturn res;
772   GstTee *tee;
773
774   tee = GST_TEE_CAST (parent);
775
776   GST_DEBUG_OBJECT (tee, "received buffer %p", buffer);
777
778   res = gst_tee_handle_data (tee, buffer, FALSE);
779
780   GST_DEBUG_OBJECT (tee, "handled buffer %s", gst_flow_get_name (res));
781
782   return res;
783 }
784
785 static GstFlowReturn
786 gst_tee_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list)
787 {
788   GstFlowReturn res;
789   GstTee *tee;
790
791   tee = GST_TEE_CAST (parent);
792
793   GST_DEBUG_OBJECT (tee, "received list %p", list);
794
795   res = gst_tee_handle_data (tee, list, TRUE);
796
797   GST_DEBUG_OBJECT (tee, "handled list %s", gst_flow_get_name (res));
798
799   return res;
800 }
801
802 static gboolean
803 gst_tee_sink_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
804     gboolean active)
805 {
806   gboolean res;
807   GstTee *tee;
808
809   tee = GST_TEE (parent);
810
811   switch (mode) {
812     case GST_PAD_MODE_PUSH:
813     {
814       GST_OBJECT_LOCK (tee);
815       tee->sink_mode = active ? mode : GST_PAD_MODE_NONE;
816
817       if (active && !tee->has_chain)
818         goto no_chain;
819       GST_OBJECT_UNLOCK (tee);
820       res = TRUE;
821       break;
822     }
823     default:
824       res = FALSE;
825       break;
826   }
827   return res;
828
829   /* ERRORS */
830 no_chain:
831   {
832     GST_OBJECT_UNLOCK (tee);
833     GST_INFO_OBJECT (tee,
834         "Tee cannot operate in push mode with has-chain==FALSE");
835     return FALSE;
836   }
837 }
838
839 static gboolean
840 gst_tee_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
841     gboolean active)
842 {
843   GstTee *tee;
844   gboolean res;
845   GstPad *sinkpad;
846
847   tee = GST_TEE (parent);
848
849   switch (mode) {
850     case GST_PAD_MODE_PULL:
851     {
852       GST_OBJECT_LOCK (tee);
853
854       if (tee->pull_mode == GST_TEE_PULL_MODE_NEVER)
855         goto cannot_pull;
856
857       if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE && active && tee->pull_pad)
858         goto cannot_pull_multiple_srcs;
859
860       sinkpad = gst_object_ref (tee->sinkpad);
861
862       GST_OBJECT_UNLOCK (tee);
863
864       res = gst_pad_activate_mode (sinkpad, GST_PAD_MODE_PULL, active);
865       gst_object_unref (sinkpad);
866
867       if (!res)
868         goto sink_activate_failed;
869
870       GST_OBJECT_LOCK (tee);
871       if (active) {
872         if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE)
873           tee->pull_pad = pad;
874       } else {
875         if (pad == tee->pull_pad)
876           tee->pull_pad = NULL;
877       }
878       tee->sink_mode = (active ? GST_PAD_MODE_PULL : GST_PAD_MODE_NONE);
879       GST_OBJECT_UNLOCK (tee);
880       break;
881     }
882     default:
883       res = TRUE;
884       break;
885   }
886
887   return res;
888
889   /* ERRORS */
890 cannot_pull:
891   {
892     GST_OBJECT_UNLOCK (tee);
893     GST_INFO_OBJECT (tee, "Cannot activate in pull mode, pull-mode "
894         "set to NEVER");
895     return FALSE;
896   }
897 cannot_pull_multiple_srcs:
898   {
899     GST_OBJECT_UNLOCK (tee);
900     GST_INFO_OBJECT (tee, "Cannot activate multiple src pads in pull mode, "
901         "pull-mode set to SINGLE");
902     return FALSE;
903   }
904 sink_activate_failed:
905   {
906     GST_INFO_OBJECT (tee, "Failed to %sactivate sink pad in pull mode",
907         active ? "" : "de");
908     return FALSE;
909   }
910 }
911
912 static gboolean
913 gst_tee_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
914 {
915   GstTee *tee;
916   gboolean res;
917   GstPad *sinkpad;
918
919   tee = GST_TEE (parent);
920
921   switch (GST_QUERY_TYPE (query)) {
922     case GST_QUERY_SCHEDULING:
923     {
924       gboolean pull_mode;
925
926       GST_OBJECT_LOCK (tee);
927       pull_mode = TRUE;
928       if (tee->pull_mode == GST_TEE_PULL_MODE_NEVER) {
929         GST_INFO_OBJECT (tee, "Cannot activate in pull mode, pull-mode "
930             "set to NEVER");
931         pull_mode = FALSE;
932       } else if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE && tee->pull_pad) {
933         GST_INFO_OBJECT (tee, "Cannot activate multiple src pads in pull mode, "
934             "pull-mode set to SINGLE");
935         pull_mode = FALSE;
936       }
937
938       sinkpad = gst_object_ref (tee->sinkpad);
939       GST_OBJECT_UNLOCK (tee);
940
941       if (pull_mode) {
942         /* ask peer if we can operate in pull mode */
943         res = gst_pad_peer_query (sinkpad, query);
944       } else {
945         res = TRUE;
946       }
947       gst_object_unref (sinkpad);
948       break;
949     }
950     default:
951       res = gst_pad_query_default (pad, parent, query);
952       break;
953   }
954
955   return res;
956 }
957
958 static void
959 gst_tee_push_eos (const GValue * vpad, GstTee * tee)
960 {
961   GstPad *pad = g_value_get_object (vpad);
962
963   if (pad != tee->pull_pad)
964     gst_pad_push_event (pad, gst_event_new_eos ());
965 }
966
967 static void
968 gst_tee_pull_eos (GstTee * tee)
969 {
970   GstIterator *iter;
971
972   iter = gst_element_iterate_src_pads (GST_ELEMENT (tee));
973   gst_iterator_foreach (iter, (GstIteratorForeachFunction) gst_tee_push_eos,
974       tee);
975   gst_iterator_free (iter);
976 }
977
978 static GstFlowReturn
979 gst_tee_src_get_range (GstPad * pad, GstObject * parent, guint64 offset,
980     guint length, GstBuffer ** buf)
981 {
982   GstTee *tee;
983   GstFlowReturn ret;
984
985   tee = GST_TEE (parent);
986
987   ret = gst_pad_pull_range (tee->sinkpad, offset, length, buf);
988
989   if (ret == GST_FLOW_OK)
990     ret = gst_tee_handle_data (tee, gst_buffer_ref (*buf), FALSE);
991   else if (ret == GST_FLOW_EOS)
992     gst_tee_pull_eos (tee);
993
994   return ret;
995 }