elements: use new gst_element_class_add_static_pad_template()
[platform/upstream/gstreamer.git] / plugins / elements / gsttee.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2001,2002,2003,2004,2005 Wim Taymans <wim@fluendo.com>
4  *               2007 Wim Taymans <wim.taymans@gmail.com>
5  *
6  * gsttee.c: Tee element, one in N out
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23
24 /**
25  * SECTION:element-tee
26  * @see_also: #GstIdentity
27  *
28  * Split data to multiple pads. Branching the data flow is useful when e.g.
29  * capturing a video where the video is shown on the screen and also encoded and
30  * written to a file. Another example is playing music and hooking up a
31  * visualisation module.
32  *
33  * One needs to use separate queue elements (or a multiqueue) in each branch to
34  * provide separate threads for each branch. Otherwise a blocked dataflow in one
35  * branch would stall the other branches.
36  *
37  * <refsect2>
38  * <title>Example launch line</title>
39  * |[
40  * gst-launch-1.0 filesrc location=song.ogg ! decodebin ! tee name=t ! queue ! audioconvert ! audioresample ! autoaudiosink t. ! queue ! audioconvert ! goom ! videoconvert ! autovideosink
41  * ]| Play song.ogg audio file which must be in the current working directory
42  * and render visualisations using the goom element (this can be easier done
43  * using the playbin element, this is just an example pipeline).
44  * </refsect2>
45  */
46
47 #ifdef HAVE_CONFIG_H
48 #  include "config.h"
49 #endif
50
51 #include "gsttee.h"
52 #include "gst/glib-compat-private.h"
53
54 #include <string.h>
55 #include <stdio.h>
56
57 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
58     GST_PAD_SINK,
59     GST_PAD_ALWAYS,
60     GST_STATIC_CAPS_ANY);
61
62 GST_DEBUG_CATEGORY_STATIC (gst_tee_debug);
63 #define GST_CAT_DEFAULT gst_tee_debug
64
65 #define GST_TYPE_TEE_PULL_MODE (gst_tee_pull_mode_get_type())
66 static GType
67 gst_tee_pull_mode_get_type (void)
68 {
69   static GType type = 0;
70   static const GEnumValue data[] = {
71     {GST_TEE_PULL_MODE_NEVER, "Never activate in pull mode", "never"},
72     {GST_TEE_PULL_MODE_SINGLE, "Only one src pad can be active in pull mode",
73         "single"},
74     {0, NULL, NULL},
75   };
76
77   if (!type) {
78     type = g_enum_register_static ("GstTeePullMode", data);
79   }
80   return type;
81 }
82
83 #define DEFAULT_PROP_NUM_SRC_PADS       0
84 #define DEFAULT_PROP_HAS_CHAIN          TRUE
85 #define DEFAULT_PROP_SILENT             TRUE
86 #define DEFAULT_PROP_LAST_MESSAGE       NULL
87 #define DEFAULT_PULL_MODE               GST_TEE_PULL_MODE_NEVER
88 #define DEFAULT_PROP_ALLOW_NOT_LINKED   FALSE
89
90 enum
91 {
92   PROP_0,
93   PROP_NUM_SRC_PADS,
94   PROP_HAS_CHAIN,
95   PROP_SILENT,
96   PROP_LAST_MESSAGE,
97   PROP_PULL_MODE,
98   PROP_ALLOC_PAD,
99   PROP_ALLOW_NOT_LINKED,
100 };
101
102 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u",
103     GST_PAD_SRC,
104     GST_PAD_REQUEST,
105     GST_STATIC_CAPS_ANY);
106
107 #define _do_init \
108     GST_DEBUG_CATEGORY_INIT (gst_tee_debug, "tee", 0, "tee element");
109 #define gst_tee_parent_class parent_class
110 G_DEFINE_TYPE_WITH_CODE (GstTee, gst_tee, GST_TYPE_ELEMENT, _do_init);
111
112 static GParamSpec *pspec_last_message = NULL;
113 static GParamSpec *pspec_alloc_pad = NULL;
114
115 GType gst_tee_pad_get_type (void);
116
117 #define GST_TYPE_TEE_PAD \
118   (gst_tee_pad_get_type())
119 #define GST_TEE_PAD(obj) \
120   (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_TEE_PAD, GstTeePad))
121 #define GST_TEE_PAD_CLASS(klass) \
122   (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_TEE_PAD, GstTeePadClass))
123 #define GST_IS_TEE_PAD(obj) \
124   (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_TEE_PAD))
125 #define GST_IS_TEE_PAD_CLASS(klass) \
126   (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_TEE_PAD))
127 #define GST_TEE_PAD_CAST(obj) \
128   ((GstTeePad *)(obj))
129
130 typedef struct _GstTeePad GstTeePad;
131 typedef struct _GstTeePadClass GstTeePadClass;
132
133 struct _GstTeePad
134 {
135   GstPad parent;
136
137   guint index;
138   gboolean pushed;
139   GstFlowReturn result;
140   gboolean removed;
141 };
142
143 struct _GstTeePadClass
144 {
145   GstPadClass parent;
146 };
147
148 G_DEFINE_TYPE (GstTeePad, gst_tee_pad, GST_TYPE_PAD);
149
150 static void
151 gst_tee_pad_class_init (GstTeePadClass * klass)
152 {
153 }
154
155 static void
156 gst_tee_pad_reset (GstTeePad * pad)
157 {
158   pad->pushed = FALSE;
159   pad->result = GST_FLOW_NOT_LINKED;
160   pad->removed = FALSE;
161 }
162
163 static void
164 gst_tee_pad_init (GstTeePad * pad)
165 {
166   gst_tee_pad_reset (pad);
167 }
168
169 static GstPad *gst_tee_request_new_pad (GstElement * element,
170     GstPadTemplate * temp, const gchar * unused, const GstCaps * caps);
171 static void gst_tee_release_pad (GstElement * element, GstPad * pad);
172
173 static void gst_tee_finalize (GObject * object);
174 static void gst_tee_set_property (GObject * object, guint prop_id,
175     const GValue * value, GParamSpec * pspec);
176 static void gst_tee_get_property (GObject * object, guint prop_id,
177     GValue * value, GParamSpec * pspec);
178 static void gst_tee_dispose (GObject * object);
179
180 static GstFlowReturn gst_tee_chain (GstPad * pad, GstObject * parent,
181     GstBuffer * buffer);
182 static GstFlowReturn gst_tee_chain_list (GstPad * pad, GstObject * parent,
183     GstBufferList * list);
184 static gboolean gst_tee_sink_event (GstPad * pad, GstObject * parent,
185     GstEvent * event);
186 static gboolean gst_tee_sink_query (GstPad * pad, GstObject * parent,
187     GstQuery * query);
188 static gboolean gst_tee_sink_activate_mode (GstPad * pad, GstObject * parent,
189     GstPadMode mode, gboolean active);
190 static gboolean gst_tee_src_query (GstPad * pad, GstObject * parent,
191     GstQuery * query);
192 static gboolean gst_tee_src_activate_mode (GstPad * pad, GstObject * parent,
193     GstPadMode mode, gboolean active);
194 static GstFlowReturn gst_tee_src_get_range (GstPad * pad, GstObject * parent,
195     guint64 offset, guint length, GstBuffer ** buf);
196
197 static void
198 gst_tee_dispose (GObject * object)
199 {
200   GList *item;
201
202 restart:
203   for (item = GST_ELEMENT_PADS (object); item; item = g_list_next (item)) {
204     GstPad *pad = GST_PAD (item->data);
205     if (GST_PAD_IS_SRC (pad)) {
206       gst_element_release_request_pad (GST_ELEMENT (object), pad);
207       goto restart;
208     }
209   }
210
211   G_OBJECT_CLASS (parent_class)->dispose (object);
212 }
213
214 static void
215 gst_tee_finalize (GObject * object)
216 {
217   GstTee *tee;
218
219   tee = GST_TEE (object);
220
221   g_hash_table_unref (tee->pad_indexes);
222
223   g_free (tee->last_message);
224
225   G_OBJECT_CLASS (parent_class)->finalize (object);
226 }
227
228 static void
229 gst_tee_class_init (GstTeeClass * klass)
230 {
231   GObjectClass *gobject_class;
232   GstElementClass *gstelement_class;
233
234   gobject_class = G_OBJECT_CLASS (klass);
235   gstelement_class = GST_ELEMENT_CLASS (klass);
236
237   gobject_class->finalize = gst_tee_finalize;
238   gobject_class->set_property = gst_tee_set_property;
239   gobject_class->get_property = gst_tee_get_property;
240   gobject_class->dispose = gst_tee_dispose;
241
242   g_object_class_install_property (gobject_class, PROP_NUM_SRC_PADS,
243       g_param_spec_int ("num-src-pads", "Num Src Pads",
244           "The number of source pads", 0, G_MAXINT, DEFAULT_PROP_NUM_SRC_PADS,
245           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
246   g_object_class_install_property (gobject_class, PROP_HAS_CHAIN,
247       g_param_spec_boolean ("has-chain", "Has Chain",
248           "If the element can operate in push mode", DEFAULT_PROP_HAS_CHAIN,
249           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
250   g_object_class_install_property (gobject_class, PROP_SILENT,
251       g_param_spec_boolean ("silent", "Silent",
252           "Don't produce last_message events", DEFAULT_PROP_SILENT,
253           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
254   pspec_last_message = g_param_spec_string ("last-message", "Last Message",
255       "The message describing current status", DEFAULT_PROP_LAST_MESSAGE,
256       G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
257   g_object_class_install_property (gobject_class, PROP_LAST_MESSAGE,
258       pspec_last_message);
259   g_object_class_install_property (gobject_class, PROP_PULL_MODE,
260       g_param_spec_enum ("pull-mode", "Pull mode",
261           "Behavior of tee in pull mode", GST_TYPE_TEE_PULL_MODE,
262           DEFAULT_PULL_MODE,
263           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
264   pspec_alloc_pad = g_param_spec_object ("alloc-pad", "Allocation Src Pad",
265       "The pad ALLOCATION queries will be proxied to (unused)", GST_TYPE_PAD,
266       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
267   g_object_class_install_property (gobject_class, PROP_ALLOC_PAD,
268       pspec_alloc_pad);
269
270   /**
271    * GstTee:allow-not-linked
272    *
273    * This property makes sink pad return GST_FLOW_OK even if there are no
274    * source pads or any of them is linked.
275    *
276    * This is useful to avoid errors when you have a dynamic pipeline and during
277    * a reconnection you can have all the pads unlinked or removed.
278    *
279    * Since: 1.6
280    */
281   g_object_class_install_property (gobject_class, PROP_ALLOW_NOT_LINKED,
282       g_param_spec_boolean ("allow-not-linked", "Allow not linked",
283           "Return GST_FLOW_OK even if there are no source pads or they are "
284           "all unlinked", DEFAULT_PROP_ALLOW_NOT_LINKED,
285           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
286
287   gst_element_class_set_static_metadata (gstelement_class,
288       "Tee pipe fitting",
289       "Generic",
290       "1-to-N pipe fitting",
291       "Erik Walthinsen <omega@cse.ogi.edu>, " "Wim Taymans <wim@fluendo.com>");
292   gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
293   gst_element_class_add_static_pad_template (gstelement_class, &src_template);
294
295   gstelement_class->request_new_pad =
296       GST_DEBUG_FUNCPTR (gst_tee_request_new_pad);
297   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_tee_release_pad);
298 }
299
300 static void
301 gst_tee_init (GstTee * tee)
302 {
303   tee->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
304   tee->sink_mode = GST_PAD_MODE_NONE;
305
306   gst_pad_set_event_function (tee->sinkpad,
307       GST_DEBUG_FUNCPTR (gst_tee_sink_event));
308   gst_pad_set_query_function (tee->sinkpad,
309       GST_DEBUG_FUNCPTR (gst_tee_sink_query));
310   gst_pad_set_activatemode_function (tee->sinkpad,
311       GST_DEBUG_FUNCPTR (gst_tee_sink_activate_mode));
312   gst_pad_set_chain_function (tee->sinkpad, GST_DEBUG_FUNCPTR (gst_tee_chain));
313   gst_pad_set_chain_list_function (tee->sinkpad,
314       GST_DEBUG_FUNCPTR (gst_tee_chain_list));
315   GST_OBJECT_FLAG_SET (tee->sinkpad, GST_PAD_FLAG_PROXY_CAPS);
316   gst_element_add_pad (GST_ELEMENT (tee), tee->sinkpad);
317
318   tee->pad_indexes = g_hash_table_new (NULL, NULL);
319
320   tee->last_message = NULL;
321 }
322
323 static void
324 gst_tee_notify_alloc_pad (GstTee * tee)
325 {
326   g_object_notify_by_pspec ((GObject *) tee, pspec_alloc_pad);
327 }
328
329 static gboolean
330 forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
331 {
332   GstPad *srcpad = GST_PAD_CAST (user_data);
333   GstFlowReturn ret;
334
335   ret = gst_pad_store_sticky_event (srcpad, *event);
336   if (ret != GST_FLOW_OK) {
337     GST_DEBUG_OBJECT (srcpad, "storing sticky event %p (%s) failed: %s", *event,
338         GST_EVENT_TYPE_NAME (*event), gst_flow_get_name (ret));
339   }
340
341   return TRUE;
342 }
343
344 static GstPad *
345 gst_tee_request_new_pad (GstElement * element, GstPadTemplate * templ,
346     const gchar * name_templ, const GstCaps * caps)
347 {
348   gchar *name;
349   GstPad *srcpad;
350   GstTee *tee;
351   GstPadMode mode;
352   gboolean res;
353   guint index = 0;
354
355   tee = GST_TEE (element);
356
357   GST_DEBUG_OBJECT (tee, "requesting pad");
358
359   GST_OBJECT_LOCK (tee);
360
361   if (name_templ && sscanf (name_templ, "src_%u", &index) == 1) {
362     GST_LOG_OBJECT (element, "name: %s (index %d)", name_templ, index);
363     if (g_hash_table_contains (tee->pad_indexes, GUINT_TO_POINTER (index))) {
364       GST_ERROR_OBJECT (element, "pad name %s is not unique", name_templ);
365       GST_OBJECT_UNLOCK (tee);
366       return NULL;
367     }
368     if (index >= tee->next_pad_index)
369       tee->next_pad_index = index + 1;
370   } else {
371     index = tee->next_pad_index;
372
373     while (g_hash_table_contains (tee->pad_indexes, GUINT_TO_POINTER (index)))
374       index++;
375
376     tee->next_pad_index = index + 1;
377   }
378
379   g_hash_table_insert (tee->pad_indexes, GUINT_TO_POINTER (index), NULL);
380
381   name = g_strdup_printf ("src_%u", index);
382
383   srcpad = GST_PAD_CAST (g_object_new (GST_TYPE_TEE_PAD,
384           "name", name, "direction", templ->direction, "template", templ,
385           NULL));
386   GST_TEE_PAD_CAST (srcpad)->index = index;
387   g_free (name);
388
389   mode = tee->sink_mode;
390
391   GST_OBJECT_UNLOCK (tee);
392
393   switch (mode) {
394     case GST_PAD_MODE_PULL:
395       /* we already have a src pad in pull mode, and our pull mode can only be
396          SINGLE, so fall through to activate this new pad in push mode */
397     case GST_PAD_MODE_PUSH:
398       res = gst_pad_activate_mode (srcpad, GST_PAD_MODE_PUSH, TRUE);
399       break;
400     default:
401       res = TRUE;
402       break;
403   }
404
405   if (!res)
406     goto activate_failed;
407
408   gst_pad_set_activatemode_function (srcpad,
409       GST_DEBUG_FUNCPTR (gst_tee_src_activate_mode));
410   gst_pad_set_query_function (srcpad, GST_DEBUG_FUNCPTR (gst_tee_src_query));
411   gst_pad_set_getrange_function (srcpad,
412       GST_DEBUG_FUNCPTR (gst_tee_src_get_range));
413   /* Forward sticky events to the new srcpad */
414   gst_pad_sticky_events_foreach (tee->sinkpad, forward_sticky_events, srcpad);
415   GST_OBJECT_FLAG_SET (srcpad, GST_PAD_FLAG_PROXY_CAPS);
416   gst_element_add_pad (GST_ELEMENT_CAST (tee), srcpad);
417
418   return srcpad;
419
420   /* ERRORS */
421 activate_failed:
422   {
423     gboolean changed = FALSE;
424
425     GST_OBJECT_LOCK (tee);
426     GST_DEBUG_OBJECT (tee, "warning failed to activate request pad");
427     if (tee->allocpad == srcpad) {
428       tee->allocpad = NULL;
429       changed = TRUE;
430     }
431     GST_OBJECT_UNLOCK (tee);
432     gst_object_unref (srcpad);
433     if (changed) {
434       gst_tee_notify_alloc_pad (tee);
435     }
436     return NULL;
437   }
438 }
439
440 static void
441 gst_tee_release_pad (GstElement * element, GstPad * pad)
442 {
443   GstTee *tee;
444   gboolean changed = FALSE;
445   guint index;
446
447   tee = GST_TEE (element);
448
449   GST_DEBUG_OBJECT (tee, "releasing pad");
450
451   GST_OBJECT_LOCK (tee);
452   index = GST_TEE_PAD_CAST (pad)->index;
453   /* mark the pad as removed so that future pad_alloc fails with NOT_LINKED. */
454   GST_TEE_PAD_CAST (pad)->removed = TRUE;
455   if (tee->allocpad == pad) {
456     tee->allocpad = NULL;
457     changed = TRUE;
458   }
459   GST_OBJECT_UNLOCK (tee);
460
461   gst_object_ref (pad);
462   gst_element_remove_pad (GST_ELEMENT_CAST (tee), pad);
463
464   gst_pad_set_active (pad, FALSE);
465
466   gst_object_unref (pad);
467
468   if (changed) {
469     gst_tee_notify_alloc_pad (tee);
470   }
471
472   GST_OBJECT_LOCK (tee);
473   g_hash_table_remove (tee->pad_indexes, GUINT_TO_POINTER (index));
474   GST_OBJECT_UNLOCK (tee);
475 }
476
477 static void
478 gst_tee_set_property (GObject * object, guint prop_id, const GValue * value,
479     GParamSpec * pspec)
480 {
481   GstTee *tee = GST_TEE (object);
482
483   GST_OBJECT_LOCK (tee);
484   switch (prop_id) {
485     case PROP_HAS_CHAIN:
486       tee->has_chain = g_value_get_boolean (value);
487       break;
488     case PROP_SILENT:
489       tee->silent = g_value_get_boolean (value);
490       break;
491     case PROP_PULL_MODE:
492       tee->pull_mode = (GstTeePullMode) g_value_get_enum (value);
493       break;
494     case PROP_ALLOC_PAD:
495     {
496       GstPad *pad = g_value_get_object (value);
497       GST_OBJECT_LOCK (pad);
498       if (GST_OBJECT_PARENT (pad) == GST_OBJECT_CAST (object))
499         tee->allocpad = pad;
500       else
501         GST_WARNING_OBJECT (object, "Tried to set alloc pad %s which"
502             " is not my pad", GST_OBJECT_NAME (pad));
503       GST_OBJECT_UNLOCK (pad);
504       break;
505     }
506     case PROP_ALLOW_NOT_LINKED:
507       tee->allow_not_linked = g_value_get_boolean (value);
508       break;
509     default:
510       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
511       break;
512   }
513   GST_OBJECT_UNLOCK (tee);
514 }
515
516 static void
517 gst_tee_get_property (GObject * object, guint prop_id, GValue * value,
518     GParamSpec * pspec)
519 {
520   GstTee *tee = GST_TEE (object);
521
522   GST_OBJECT_LOCK (tee);
523   switch (prop_id) {
524     case PROP_NUM_SRC_PADS:
525       g_value_set_int (value, GST_ELEMENT (tee)->numsrcpads);
526       break;
527     case PROP_HAS_CHAIN:
528       g_value_set_boolean (value, tee->has_chain);
529       break;
530     case PROP_SILENT:
531       g_value_set_boolean (value, tee->silent);
532       break;
533     case PROP_LAST_MESSAGE:
534       g_value_set_string (value, tee->last_message);
535       break;
536     case PROP_PULL_MODE:
537       g_value_set_enum (value, tee->pull_mode);
538       break;
539     case PROP_ALLOC_PAD:
540       g_value_set_object (value, tee->allocpad);
541       break;
542     case PROP_ALLOW_NOT_LINKED:
543       g_value_set_boolean (value, tee->allow_not_linked);
544       break;
545     default:
546       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
547       break;
548   }
549   GST_OBJECT_UNLOCK (tee);
550 }
551
552 static gboolean
553 gst_tee_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
554 {
555   gboolean res;
556
557   switch (GST_EVENT_TYPE (event)) {
558     default:
559       res = gst_pad_event_default (pad, parent, event);
560       break;
561   }
562
563   return res;
564 }
565
566 static gboolean
567 gst_tee_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
568 {
569   gboolean res;
570
571   switch (GST_QUERY_TYPE (query)) {
572     default:
573       res = gst_pad_query_default (pad, parent, query);
574       break;
575   }
576   return res;
577 }
578
579 static void
580 gst_tee_do_message (GstTee * tee, GstPad * pad, gpointer data, gboolean is_list)
581 {
582   GST_OBJECT_LOCK (tee);
583   g_free (tee->last_message);
584   if (is_list) {
585     tee->last_message =
586         g_strdup_printf ("chain-list   ******* (%s:%s)t %p",
587         GST_DEBUG_PAD_NAME (pad), data);
588   } else {
589     tee->last_message =
590         g_strdup_printf ("chain        ******* (%s:%s)t (%" G_GSIZE_FORMAT
591         " bytes, %" G_GUINT64_FORMAT ") %p", GST_DEBUG_PAD_NAME (pad),
592         gst_buffer_get_size (data), GST_BUFFER_TIMESTAMP (data), data);
593   }
594   GST_OBJECT_UNLOCK (tee);
595
596   g_object_notify_by_pspec ((GObject *) tee, pspec_last_message);
597 }
598
599 static GstFlowReturn
600 gst_tee_do_push (GstTee * tee, GstPad * pad, gpointer data, gboolean is_list)
601 {
602   GstFlowReturn res;
603
604   /* Push */
605   if (pad == tee->pull_pad) {
606     /* don't push on the pad we're pulling from */
607     res = GST_FLOW_OK;
608   } else if (is_list) {
609     res =
610         gst_pad_push_list (pad,
611         gst_buffer_list_ref (GST_BUFFER_LIST_CAST (data)));
612   } else {
613     res = gst_pad_push (pad, gst_buffer_ref (GST_BUFFER_CAST (data)));
614   }
615   return res;
616 }
617
618 static void
619 clear_pads (GstPad * pad, GstTee * tee)
620 {
621   GST_TEE_PAD_CAST (pad)->pushed = FALSE;
622   GST_TEE_PAD_CAST (pad)->result = GST_FLOW_NOT_LINKED;
623 }
624
625 static GstFlowReturn
626 gst_tee_handle_data (GstTee * tee, gpointer data, gboolean is_list)
627 {
628   GList *pads;
629   guint32 cookie;
630   GstFlowReturn ret, cret;
631
632   if (G_UNLIKELY (!tee->silent))
633     gst_tee_do_message (tee, tee->sinkpad, data, is_list);
634
635   GST_OBJECT_LOCK (tee);
636   pads = GST_ELEMENT_CAST (tee)->srcpads;
637
638   /* special case for zero pads */
639   if (G_UNLIKELY (!pads))
640     goto no_pads;
641
642   /* special case for just one pad that avoids reffing the buffer */
643   if (!pads->next) {
644     GstPad *pad = GST_PAD_CAST (pads->data);
645
646     /* Keep another ref around, a pad probe
647      * might release and destroy the pad */
648     gst_object_ref (pad);
649     GST_OBJECT_UNLOCK (tee);
650
651     if (pad == tee->pull_pad) {
652       ret = GST_FLOW_OK;
653     } else if (is_list) {
654       ret = gst_pad_push_list (pad, GST_BUFFER_LIST_CAST (data));
655     } else {
656       ret = gst_pad_push (pad, GST_BUFFER_CAST (data));
657     }
658
659     gst_object_unref (pad);
660
661     if (ret == GST_FLOW_NOT_LINKED && tee->allow_not_linked) {
662       ret = GST_FLOW_OK;
663     }
664
665     return ret;
666   }
667
668   /* mark all pads as 'not pushed on yet' */
669   g_list_foreach (pads, (GFunc) clear_pads, tee);
670
671 restart:
672   if (tee->allow_not_linked) {
673     cret = GST_FLOW_OK;
674   } else {
675     cret = GST_FLOW_NOT_LINKED;
676   }
677   pads = GST_ELEMENT_CAST (tee)->srcpads;
678   cookie = GST_ELEMENT_CAST (tee)->pads_cookie;
679
680   while (pads) {
681     GstPad *pad;
682
683     pad = GST_PAD_CAST (pads->data);
684
685     if (G_LIKELY (!GST_TEE_PAD_CAST (pad)->pushed)) {
686       /* not yet pushed, release lock and start pushing */
687       gst_object_ref (pad);
688       GST_OBJECT_UNLOCK (tee);
689
690       GST_LOG_OBJECT (pad, "Starting to push %s %p",
691           is_list ? "list" : "buffer", data);
692
693       ret = gst_tee_do_push (tee, pad, data, is_list);
694
695       GST_LOG_OBJECT (pad, "Pushing item %p yielded result %s", data,
696           gst_flow_get_name (ret));
697
698       GST_OBJECT_LOCK (tee);
699       /* keep track of which pad we pushed and the result value */
700       GST_TEE_PAD_CAST (pad)->pushed = TRUE;
701       GST_TEE_PAD_CAST (pad)->result = ret;
702       gst_object_unref (pad);
703       pad = NULL;
704     } else {
705       /* already pushed, use previous return value */
706       ret = GST_TEE_PAD_CAST (pad)->result;
707       GST_LOG_OBJECT (pad, "pad already pushed with %s",
708           gst_flow_get_name (ret));
709     }
710
711     /* before we go combining the return value, check if the pad list is still
712      * the same. It could be possible that the pad we just pushed was removed
713      * and the return value it not valid anymore */
714     if (G_UNLIKELY (GST_ELEMENT_CAST (tee)->pads_cookie != cookie)) {
715       GST_LOG_OBJECT (tee, "pad list changed");
716       /* the list of pads changed, restart iteration. Pads that we already
717        * pushed on and are still in the new list, will not be pushed on
718        * again. */
719       goto restart;
720     }
721
722     /* stop pushing more buffers when we have a fatal error */
723     if (G_UNLIKELY (ret != GST_FLOW_OK && ret != GST_FLOW_NOT_LINKED))
724       goto error;
725
726     /* keep all other return values, overwriting the previous one. */
727     if (G_LIKELY (ret != GST_FLOW_NOT_LINKED)) {
728       GST_LOG_OBJECT (tee, "Replacing ret val %d with %d", cret, ret);
729       cret = ret;
730     }
731     pads = g_list_next (pads);
732   }
733   GST_OBJECT_UNLOCK (tee);
734
735   gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
736
737   /* no need to unset gvalue */
738   return cret;
739
740   /* ERRORS */
741 no_pads:
742   {
743     if (tee->allow_not_linked) {
744       GST_DEBUG_OBJECT (tee, "there are no pads, dropping %s",
745           is_list ? "buffer-list" : "buffer");
746       ret = GST_FLOW_OK;
747     } else {
748       GST_DEBUG_OBJECT (tee, "there are no pads, return not-linked");
749       ret = GST_FLOW_NOT_LINKED;
750     }
751     goto end;
752   }
753 error:
754   {
755     GST_DEBUG_OBJECT (tee, "received error %s", gst_flow_get_name (ret));
756     goto end;
757   }
758 end:
759   {
760     GST_OBJECT_UNLOCK (tee);
761     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
762     return ret;
763   }
764 }
765
766 static GstFlowReturn
767 gst_tee_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
768 {
769   GstFlowReturn res;
770   GstTee *tee;
771
772   tee = GST_TEE_CAST (parent);
773
774   GST_DEBUG_OBJECT (tee, "received buffer %p", buffer);
775
776   res = gst_tee_handle_data (tee, buffer, FALSE);
777
778   GST_DEBUG_OBJECT (tee, "handled buffer %s", gst_flow_get_name (res));
779
780   return res;
781 }
782
783 static GstFlowReturn
784 gst_tee_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list)
785 {
786   GstFlowReturn res;
787   GstTee *tee;
788
789   tee = GST_TEE_CAST (parent);
790
791   GST_DEBUG_OBJECT (tee, "received list %p", list);
792
793   res = gst_tee_handle_data (tee, list, TRUE);
794
795   GST_DEBUG_OBJECT (tee, "handled list %s", gst_flow_get_name (res));
796
797   return res;
798 }
799
800 static gboolean
801 gst_tee_sink_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
802     gboolean active)
803 {
804   gboolean res;
805   GstTee *tee;
806
807   tee = GST_TEE (parent);
808
809   switch (mode) {
810     case GST_PAD_MODE_PUSH:
811     {
812       GST_OBJECT_LOCK (tee);
813       tee->sink_mode = active ? mode : GST_PAD_MODE_NONE;
814
815       if (active && !tee->has_chain)
816         goto no_chain;
817       GST_OBJECT_UNLOCK (tee);
818       res = TRUE;
819       break;
820     }
821     default:
822       res = FALSE;
823       break;
824   }
825   return res;
826
827   /* ERRORS */
828 no_chain:
829   {
830     GST_OBJECT_UNLOCK (tee);
831     GST_INFO_OBJECT (tee,
832         "Tee cannot operate in push mode with has-chain==FALSE");
833     return FALSE;
834   }
835 }
836
837 static gboolean
838 gst_tee_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
839     gboolean active)
840 {
841   GstTee *tee;
842   gboolean res;
843   GstPad *sinkpad;
844
845   tee = GST_TEE (parent);
846
847   switch (mode) {
848     case GST_PAD_MODE_PULL:
849     {
850       GST_OBJECT_LOCK (tee);
851
852       if (tee->pull_mode == GST_TEE_PULL_MODE_NEVER)
853         goto cannot_pull;
854
855       if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE && active && tee->pull_pad)
856         goto cannot_pull_multiple_srcs;
857
858       sinkpad = gst_object_ref (tee->sinkpad);
859
860       GST_OBJECT_UNLOCK (tee);
861
862       res = gst_pad_activate_mode (sinkpad, GST_PAD_MODE_PULL, active);
863       gst_object_unref (sinkpad);
864
865       if (!res)
866         goto sink_activate_failed;
867
868       GST_OBJECT_LOCK (tee);
869       if (active) {
870         if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE)
871           tee->pull_pad = pad;
872       } else {
873         if (pad == tee->pull_pad)
874           tee->pull_pad = NULL;
875       }
876       tee->sink_mode = (active ? GST_PAD_MODE_PULL : GST_PAD_MODE_NONE);
877       GST_OBJECT_UNLOCK (tee);
878       break;
879     }
880     default:
881       res = TRUE;
882       break;
883   }
884
885   return res;
886
887   /* ERRORS */
888 cannot_pull:
889   {
890     GST_OBJECT_UNLOCK (tee);
891     GST_INFO_OBJECT (tee, "Cannot activate in pull mode, pull-mode "
892         "set to NEVER");
893     return FALSE;
894   }
895 cannot_pull_multiple_srcs:
896   {
897     GST_OBJECT_UNLOCK (tee);
898     GST_INFO_OBJECT (tee, "Cannot activate multiple src pads in pull mode, "
899         "pull-mode set to SINGLE");
900     return FALSE;
901   }
902 sink_activate_failed:
903   {
904     GST_INFO_OBJECT (tee, "Failed to %sactivate sink pad in pull mode",
905         active ? "" : "de");
906     return FALSE;
907   }
908 }
909
910 static gboolean
911 gst_tee_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
912 {
913   GstTee *tee;
914   gboolean res;
915   GstPad *sinkpad;
916
917   tee = GST_TEE (parent);
918
919   switch (GST_QUERY_TYPE (query)) {
920     case GST_QUERY_SCHEDULING:
921     {
922       gboolean pull_mode;
923
924       GST_OBJECT_LOCK (tee);
925       pull_mode = TRUE;
926       if (tee->pull_mode == GST_TEE_PULL_MODE_NEVER) {
927         GST_INFO_OBJECT (tee, "Cannot activate in pull mode, pull-mode "
928             "set to NEVER");
929         pull_mode = FALSE;
930       } else if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE && tee->pull_pad) {
931         GST_INFO_OBJECT (tee, "Cannot activate multiple src pads in pull mode, "
932             "pull-mode set to SINGLE");
933         pull_mode = FALSE;
934       }
935
936       sinkpad = gst_object_ref (tee->sinkpad);
937       GST_OBJECT_UNLOCK (tee);
938
939       if (pull_mode) {
940         /* ask peer if we can operate in pull mode */
941         res = gst_pad_peer_query (sinkpad, query);
942       } else {
943         res = TRUE;
944       }
945       gst_object_unref (sinkpad);
946       break;
947     }
948     default:
949       res = gst_pad_query_default (pad, parent, query);
950       break;
951   }
952
953   return res;
954 }
955
956 static void
957 gst_tee_push_eos (const GValue * vpad, GstTee * tee)
958 {
959   GstPad *pad = g_value_get_object (vpad);
960
961   if (pad != tee->pull_pad)
962     gst_pad_push_event (pad, gst_event_new_eos ());
963 }
964
965 static void
966 gst_tee_pull_eos (GstTee * tee)
967 {
968   GstIterator *iter;
969
970   iter = gst_element_iterate_src_pads (GST_ELEMENT (tee));
971   gst_iterator_foreach (iter, (GstIteratorForeachFunction) gst_tee_push_eos,
972       tee);
973   gst_iterator_free (iter);
974 }
975
976 static GstFlowReturn
977 gst_tee_src_get_range (GstPad * pad, GstObject * parent, guint64 offset,
978     guint length, GstBuffer ** buf)
979 {
980   GstTee *tee;
981   GstFlowReturn ret;
982
983   tee = GST_TEE (parent);
984
985   ret = gst_pad_pull_range (tee->sinkpad, offset, length, buf);
986
987   if (ret == GST_FLOW_OK)
988     ret = gst_tee_handle_data (tee, gst_buffer_ref (*buf), FALSE);
989   else if (ret == GST_FLOW_EOS)
990     gst_tee_pull_eos (tee);
991
992   return ret;
993 }