Merge remote-tracking branch 'origin/master' into 0.11
[platform/upstream/gstreamer.git] / plugins / elements / gsttee.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2001,2002,2003,2004,2005 Wim Taymans <wim@fluendo.com>
4  *               2007 Wim Taymans <wim.taymans@gmail.com>
5  *
6  * gsttee.c: Tee element, one in N out
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
21  * Boston, MA 02111-1307, USA.
22  */
23
24 /**
25  * SECTION:element-tee
26  * @see_also: #GstIdentity
27  *
28  * Split data to multiple pads. Branching the data flow is useful when e.g.
29  * capturing a video where the video is shown on the screen and also encoded and
30  * written to a file. Another example is playing music and hooking up a
31  * visualisation module.
32  *
33  * One needs to use separate queue elements (or a multiqueue) in each branch to
34  * provide separate threads for each branch. Otherwise a blocked dataflow in one
35  * branch would stall the other branches.
36  *
37  * <refsect2>
38  * <title>Example launch line</title>
39  * |[
40  * gst-launch filesrc location=song.ogg ! decodebin2 ! tee name=t ! queue ! autoaudiosink t. ! queue ! audioconvert ! goom ! ffmpegcolorspace ! autovideosink
41  * ]| Play a song.ogg from local dir and render visualisations using the goom
42  * element.
43  * </refsect2>
44  */
45
46 #ifdef HAVE_CONFIG_H
47 #  include "config.h"
48 #endif
49
50 #include "gsttee.h"
51 #include "gst/glib-compat-private.h"
52
53 #include <string.h>
54
55 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
56     GST_PAD_SINK,
57     GST_PAD_ALWAYS,
58     GST_STATIC_CAPS_ANY);
59
60 GST_DEBUG_CATEGORY_STATIC (gst_tee_debug);
61 #define GST_CAT_DEFAULT gst_tee_debug
62
63 #define GST_TYPE_TEE_PULL_MODE (gst_tee_pull_mode_get_type())
64 static GType
65 gst_tee_pull_mode_get_type (void)
66 {
67   static GType type = 0;
68   static const GEnumValue data[] = {
69     {GST_TEE_PULL_MODE_NEVER, "Never activate in pull mode", "never"},
70     {GST_TEE_PULL_MODE_SINGLE, "Only one src pad can be active in pull mode",
71         "single"},
72     {0, NULL, NULL},
73   };
74
75   if (!type) {
76     type = g_enum_register_static ("GstTeePullMode", data);
77   }
78   return type;
79 }
80
81 /* lock to protect request pads from being removed while downstream */
82 #define GST_TEE_DYN_LOCK(tee) g_mutex_lock ((tee)->dyn_lock)
83 #define GST_TEE_DYN_UNLOCK(tee) g_mutex_unlock ((tee)->dyn_lock)
84
85 #define DEFAULT_PROP_NUM_SRC_PADS       0
86 #define DEFAULT_PROP_HAS_SINK_LOOP      FALSE
87 #define DEFAULT_PROP_HAS_CHAIN          TRUE
88 #define DEFAULT_PROP_SILENT             TRUE
89 #define DEFAULT_PROP_LAST_MESSAGE       NULL
90 #define DEFAULT_PULL_MODE               GST_TEE_PULL_MODE_NEVER
91
92 enum
93 {
94   PROP_0,
95   PROP_NUM_SRC_PADS,
96   PROP_HAS_SINK_LOOP,
97   PROP_HAS_CHAIN,
98   PROP_SILENT,
99   PROP_LAST_MESSAGE,
100   PROP_PULL_MODE,
101   PROP_ALLOC_PAD,
102 };
103
104 static GstStaticPadTemplate tee_src_template =
105 GST_STATIC_PAD_TEMPLATE ("src_%u",
106     GST_PAD_SRC,
107     GST_PAD_REQUEST,
108     GST_STATIC_CAPS_ANY);
109
110 /* structure and quark to keep track of which pads have been pushed */
111 static GQuark push_data;
112
113 #define _do_init \
114     GST_DEBUG_CATEGORY_INIT (gst_tee_debug, "tee", 0, "tee element"); \
115     push_data = g_quark_from_static_string ("tee-push-data");
116 #define gst_tee_parent_class parent_class
117 G_DEFINE_TYPE_WITH_CODE (GstTee, gst_tee, GST_TYPE_ELEMENT, _do_init);
118
119 static GParamSpec *pspec_last_message = NULL;
120 static GParamSpec *pspec_alloc_pad = NULL;
121
122 typedef struct
123 {
124   gboolean pushed;
125   GstFlowReturn result;
126   gboolean removed;
127 } PushData;
128
129 static GstPad *gst_tee_request_new_pad (GstElement * element,
130     GstPadTemplate * temp, const gchar * unused, const GstCaps * caps);
131 static void gst_tee_release_pad (GstElement * element, GstPad * pad);
132
133 static void gst_tee_finalize (GObject * object);
134 static void gst_tee_set_property (GObject * object, guint prop_id,
135     const GValue * value, GParamSpec * pspec);
136 static void gst_tee_get_property (GObject * object, guint prop_id,
137     GValue * value, GParamSpec * pspec);
138 static void gst_tee_dispose (GObject * object);
139
140 static GstFlowReturn gst_tee_chain (GstPad * pad, GstObject * parent,
141     GstBuffer * buffer);
142 static GstFlowReturn gst_tee_chain_list (GstPad * pad, GstObject * parent,
143     GstBufferList * list);
144 static gboolean gst_tee_sink_event (GstPad * pad, GstObject * parent,
145     GstEvent * event);
146 static gboolean gst_tee_sink_query (GstPad * pad, GstObject * parent,
147     GstQuery * query);
148 static gboolean gst_tee_sink_activate_mode (GstPad * pad, GstObject * parent,
149     GstPadMode mode, gboolean active);
150 static gboolean gst_tee_src_query (GstPad * pad, GstObject * parent,
151     GstQuery * query);
152 static gboolean gst_tee_src_activate_mode (GstPad * pad, GstObject * parent,
153     GstPadMode mode, gboolean active);
154 static GstFlowReturn gst_tee_src_get_range (GstPad * pad, GstObject * parent,
155     guint64 offset, guint length, GstBuffer ** buf);
156
157 static void
158 gst_tee_dispose (GObject * object)
159 {
160   GList *item;
161
162 restart:
163   for (item = GST_ELEMENT_PADS (object); item; item = g_list_next (item)) {
164     GstPad *pad = GST_PAD (item->data);
165     if (GST_PAD_IS_SRC (pad)) {
166       gst_element_release_request_pad (GST_ELEMENT (object), pad);
167       goto restart;
168     }
169   }
170
171   G_OBJECT_CLASS (parent_class)->dispose (object);
172 }
173
174 static void
175 gst_tee_finalize (GObject * object)
176 {
177   GstTee *tee;
178
179   tee = GST_TEE (object);
180
181   g_free (tee->last_message);
182
183   g_mutex_free (tee->dyn_lock);
184
185   G_OBJECT_CLASS (parent_class)->finalize (object);
186 }
187
188 static void
189 gst_tee_class_init (GstTeeClass * klass)
190 {
191   GObjectClass *gobject_class;
192   GstElementClass *gstelement_class;
193
194   gobject_class = G_OBJECT_CLASS (klass);
195   gstelement_class = GST_ELEMENT_CLASS (klass);
196
197   gobject_class->finalize = gst_tee_finalize;
198   gobject_class->set_property = gst_tee_set_property;
199   gobject_class->get_property = gst_tee_get_property;
200   gobject_class->dispose = gst_tee_dispose;
201
202   g_object_class_install_property (gobject_class, PROP_NUM_SRC_PADS,
203       g_param_spec_int ("num-src-pads", "Num Src Pads",
204           "The number of source pads", 0, G_MAXINT, DEFAULT_PROP_NUM_SRC_PADS,
205           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
206   g_object_class_install_property (gobject_class, PROP_HAS_SINK_LOOP,
207       g_param_spec_boolean ("has-sink-loop", "Has Sink Loop",
208           "If the element should spawn a thread (unimplemented and deprecated)",
209           DEFAULT_PROP_HAS_SINK_LOOP,
210           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
211   g_object_class_install_property (gobject_class, PROP_HAS_CHAIN,
212       g_param_spec_boolean ("has-chain", "Has Chain",
213           "If the element can operate in push mode", DEFAULT_PROP_HAS_CHAIN,
214           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
215   g_object_class_install_property (gobject_class, PROP_SILENT,
216       g_param_spec_boolean ("silent", "Silent",
217           "Don't produce last_message events", DEFAULT_PROP_SILENT,
218           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
219   pspec_last_message = g_param_spec_string ("last-message", "Last Message",
220       "The message describing current status", DEFAULT_PROP_LAST_MESSAGE,
221       G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
222   g_object_class_install_property (gobject_class, PROP_LAST_MESSAGE,
223       pspec_last_message);
224   g_object_class_install_property (gobject_class, PROP_PULL_MODE,
225       g_param_spec_enum ("pull-mode", "Pull mode",
226           "Behavior of tee in pull mode", GST_TYPE_TEE_PULL_MODE,
227           DEFAULT_PULL_MODE,
228           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
229   pspec_alloc_pad = g_param_spec_object ("alloc-pad", "Allocation Src Pad",
230       "The pad used for gst_pad_alloc_buffer", GST_TYPE_PAD,
231       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
232   g_object_class_install_property (gobject_class, PROP_ALLOC_PAD,
233       pspec_alloc_pad);
234
235   gst_element_class_set_details_simple (gstelement_class,
236       "Tee pipe fitting",
237       "Generic",
238       "1-to-N pipe fitting",
239       "Erik Walthinsen <omega@cse.ogi.edu>, " "Wim Taymans <wim@fluendo.com>");
240   gst_element_class_add_pad_template (gstelement_class,
241       gst_static_pad_template_get (&sinktemplate));
242   gst_element_class_add_pad_template (gstelement_class,
243       gst_static_pad_template_get (&tee_src_template));
244
245   gstelement_class->request_new_pad =
246       GST_DEBUG_FUNCPTR (gst_tee_request_new_pad);
247   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_tee_release_pad);
248 }
249
250 static void
251 gst_tee_init (GstTee * tee)
252 {
253   tee->dyn_lock = g_mutex_new ();
254
255   tee->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
256   tee->sink_mode = GST_PAD_MODE_NONE;
257
258   gst_pad_set_event_function (tee->sinkpad,
259       GST_DEBUG_FUNCPTR (gst_tee_sink_event));
260   gst_pad_set_query_function (tee->sinkpad,
261       GST_DEBUG_FUNCPTR (gst_tee_sink_query));
262   gst_pad_set_activatemode_function (tee->sinkpad,
263       GST_DEBUG_FUNCPTR (gst_tee_sink_activate_mode));
264   gst_pad_set_chain_function (tee->sinkpad, GST_DEBUG_FUNCPTR (gst_tee_chain));
265   gst_pad_set_chain_list_function (tee->sinkpad,
266       GST_DEBUG_FUNCPTR (gst_tee_chain_list));
267   GST_OBJECT_FLAG_SET (tee->sinkpad, GST_PAD_FLAG_PROXY_CAPS);
268   gst_element_add_pad (GST_ELEMENT (tee), tee->sinkpad);
269
270   tee->last_message = NULL;
271 }
272
273 static void
274 gst_tee_notify_alloc_pad (GstTee * tee)
275 {
276 #if !GLIB_CHECK_VERSION(2,26,0)
277   g_object_notify ((GObject *) tee, "alloc-pad");
278 #else
279   g_object_notify_by_pspec ((GObject *) tee, pspec_alloc_pad);
280 #endif
281 }
282
283 static gboolean
284 forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
285 {
286   GstPad *srcpad = GST_PAD_CAST (user_data);
287
288   gst_pad_push_event (srcpad, gst_event_ref (*event));
289
290   return TRUE;
291 }
292
293 static GstPad *
294 gst_tee_request_new_pad (GstElement * element, GstPadTemplate * templ,
295     const gchar * unused, const GstCaps * caps)
296 {
297   gchar *name;
298   GstPad *srcpad;
299   GstTee *tee;
300   GstPadMode mode;
301   gboolean res;
302   PushData *data;
303
304   tee = GST_TEE (element);
305
306   GST_DEBUG_OBJECT (tee, "requesting pad");
307
308   GST_OBJECT_LOCK (tee);
309   name = g_strdup_printf ("src_%u", tee->pad_counter++);
310
311   srcpad = gst_pad_new_from_template (templ, name);
312   g_free (name);
313
314   mode = tee->sink_mode;
315
316   /* install the data, we automatically free it when the pad is disposed because
317    * of _release_pad or when the element goes away. */
318   data = g_new0 (PushData, 1);
319   data->pushed = FALSE;
320   data->result = GST_FLOW_NOT_LINKED;
321   data->removed = FALSE;
322   g_object_set_qdata_full (G_OBJECT (srcpad), push_data, data, g_free);
323
324   GST_OBJECT_UNLOCK (tee);
325
326   switch (mode) {
327     case GST_PAD_MODE_PULL:
328       /* we already have a src pad in pull mode, and our pull mode can only be
329          SINGLE, so fall through to activate this new pad in push mode */
330     case GST_PAD_MODE_PUSH:
331       res = gst_pad_activate_mode (srcpad, GST_PAD_MODE_PUSH, TRUE);
332       break;
333     default:
334       res = TRUE;
335       break;
336   }
337
338   if (!res)
339     goto activate_failed;
340
341   gst_pad_set_activatemode_function (srcpad,
342       GST_DEBUG_FUNCPTR (gst_tee_src_activate_mode));
343   gst_pad_set_query_function (srcpad, GST_DEBUG_FUNCPTR (gst_tee_src_query));
344   gst_pad_set_getrange_function (srcpad,
345       GST_DEBUG_FUNCPTR (gst_tee_src_get_range));
346   /* Forward sticky events to the new srcpad */
347   gst_pad_sticky_events_foreach (tee->sinkpad, forward_sticky_events, srcpad);
348   GST_OBJECT_FLAG_SET (srcpad, GST_PAD_FLAG_PROXY_CAPS);
349   gst_element_add_pad (GST_ELEMENT_CAST (tee), srcpad);
350
351   return srcpad;
352
353   /* ERRORS */
354 activate_failed:
355   {
356     gboolean changed = FALSE;
357
358     GST_OBJECT_LOCK (tee);
359     GST_DEBUG_OBJECT (tee, "warning failed to activate request pad");
360     if (tee->allocpad == srcpad) {
361       tee->allocpad = NULL;
362       changed = TRUE;
363     }
364     GST_OBJECT_UNLOCK (tee);
365     gst_object_unref (srcpad);
366     if (changed) {
367       gst_tee_notify_alloc_pad (tee);
368     }
369     return NULL;
370   }
371 }
372
373 static void
374 gst_tee_release_pad (GstElement * element, GstPad * pad)
375 {
376   GstTee *tee;
377   PushData *data;
378   gboolean changed = FALSE;
379
380   tee = GST_TEE (element);
381
382   GST_DEBUG_OBJECT (tee, "releasing pad");
383
384   /* wait for pending pad_alloc to finish */
385   GST_TEE_DYN_LOCK (tee);
386   data = g_object_get_qdata (G_OBJECT (pad), push_data);
387
388   GST_OBJECT_LOCK (tee);
389   /* mark the pad as removed so that future pad_alloc fails with NOT_LINKED. */
390   data->removed = TRUE;
391   if (tee->allocpad == pad) {
392     tee->allocpad = NULL;
393     changed = TRUE;
394   }
395   GST_OBJECT_UNLOCK (tee);
396
397   gst_object_ref (pad);
398   gst_element_remove_pad (GST_ELEMENT_CAST (tee), pad);
399
400   gst_pad_set_active (pad, FALSE);
401   GST_TEE_DYN_UNLOCK (tee);
402
403   gst_object_unref (pad);
404
405   if (changed) {
406     gst_tee_notify_alloc_pad (tee);
407   }
408 }
409
410 static void
411 gst_tee_set_property (GObject * object, guint prop_id, const GValue * value,
412     GParamSpec * pspec)
413 {
414   GstTee *tee = GST_TEE (object);
415
416   GST_OBJECT_LOCK (tee);
417   switch (prop_id) {
418     case PROP_HAS_SINK_LOOP:
419       tee->has_sink_loop = g_value_get_boolean (value);
420       if (tee->has_sink_loop) {
421         g_warning ("tee will never implement has-sink-loop==TRUE");
422       }
423       break;
424     case PROP_HAS_CHAIN:
425       tee->has_chain = g_value_get_boolean (value);
426       break;
427     case PROP_SILENT:
428       tee->silent = g_value_get_boolean (value);
429       break;
430     case PROP_PULL_MODE:
431       tee->pull_mode = g_value_get_enum (value);
432       break;
433     case PROP_ALLOC_PAD:
434     {
435       GstPad *pad = g_value_get_object (value);
436       GST_OBJECT_LOCK (pad);
437       if (GST_OBJECT_PARENT (pad) == GST_OBJECT_CAST (object))
438         tee->allocpad = pad;
439       else
440         GST_WARNING_OBJECT (object, "Tried to set alloc pad %s which"
441             " is not my pad", GST_OBJECT_NAME (pad));
442       GST_OBJECT_UNLOCK (pad);
443       break;
444     }
445     default:
446       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
447       break;
448   }
449   GST_OBJECT_UNLOCK (tee);
450 }
451
452 static void
453 gst_tee_get_property (GObject * object, guint prop_id, GValue * value,
454     GParamSpec * pspec)
455 {
456   GstTee *tee = GST_TEE (object);
457
458   GST_OBJECT_LOCK (tee);
459   switch (prop_id) {
460     case PROP_NUM_SRC_PADS:
461       g_value_set_int (value, GST_ELEMENT (tee)->numsrcpads);
462       break;
463     case PROP_HAS_SINK_LOOP:
464       g_value_set_boolean (value, tee->has_sink_loop);
465       break;
466     case PROP_HAS_CHAIN:
467       g_value_set_boolean (value, tee->has_chain);
468       break;
469     case PROP_SILENT:
470       g_value_set_boolean (value, tee->silent);
471       break;
472     case PROP_LAST_MESSAGE:
473       g_value_set_string (value, tee->last_message);
474       break;
475     case PROP_PULL_MODE:
476       g_value_set_enum (value, tee->pull_mode);
477       break;
478     case PROP_ALLOC_PAD:
479       g_value_set_object (value, tee->allocpad);
480       break;
481     default:
482       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
483       break;
484   }
485   GST_OBJECT_UNLOCK (tee);
486 }
487
488 static gboolean
489 gst_tee_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
490 {
491   gboolean res;
492
493   switch (GST_EVENT_TYPE (event)) {
494     default:
495       res = gst_pad_event_default (pad, parent, event);
496       break;
497   }
498
499   return res;
500 }
501
502 static gboolean
503 gst_tee_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
504 {
505   gboolean res;
506
507   switch (GST_QUERY_TYPE (query)) {
508     default:
509       res = gst_pad_query_default (pad, parent, query);
510       break;
511   }
512   return res;
513 }
514
515 static void
516 gst_tee_do_message (GstTee * tee, GstPad * pad, gpointer data, gboolean is_list)
517 {
518   GST_OBJECT_LOCK (tee);
519   g_free (tee->last_message);
520   if (is_list) {
521     tee->last_message =
522         g_strdup_printf ("chain-list   ******* (%s:%s)t %p",
523         GST_DEBUG_PAD_NAME (pad), data);
524   } else {
525     tee->last_message =
526         g_strdup_printf ("chain        ******* (%s:%s)t (%" G_GSIZE_FORMAT
527         " bytes, %" G_GUINT64_FORMAT ") %p", GST_DEBUG_PAD_NAME (pad),
528         gst_buffer_get_size (data), GST_BUFFER_TIMESTAMP (data), data);
529   }
530   GST_OBJECT_UNLOCK (tee);
531
532 #if !GLIB_CHECK_VERSION(2,26,0)
533   g_object_notify ((GObject *) tee, "last-message");
534 #else
535   g_object_notify_by_pspec ((GObject *) tee, pspec_last_message);
536 #endif
537 }
538
539 static GstFlowReturn
540 gst_tee_do_push (GstTee * tee, GstPad * pad, gpointer data, gboolean is_list)
541 {
542   GstFlowReturn res;
543
544   /* Push */
545   if (pad == tee->pull_pad) {
546     /* don't push on the pad we're pulling from */
547     res = GST_FLOW_OK;
548   } else if (is_list) {
549     res =
550         gst_pad_push_list (pad,
551         gst_buffer_list_ref (GST_BUFFER_LIST_CAST (data)));
552   } else {
553     res = gst_pad_push (pad, gst_buffer_ref (GST_BUFFER_CAST (data)));
554   }
555   return res;
556 }
557
558 static void
559 clear_pads (GstPad * pad, GstTee * tee)
560 {
561   PushData *data;
562
563   data = g_object_get_qdata ((GObject *) pad, push_data);
564
565   /* the data must be there or we have a screwed up internal state */
566   g_assert (data != NULL);
567
568   data->pushed = FALSE;
569   data->result = GST_FLOW_NOT_LINKED;
570 }
571
572 static GstFlowReturn
573 gst_tee_handle_data (GstTee * tee, gpointer data, gboolean is_list)
574 {
575   GList *pads;
576   guint32 cookie;
577   GstFlowReturn ret, cret;
578
579   if (G_UNLIKELY (!tee->silent))
580     gst_tee_do_message (tee, tee->sinkpad, data, is_list);
581
582   GST_OBJECT_LOCK (tee);
583   pads = GST_ELEMENT_CAST (tee)->srcpads;
584
585   /* special case for zero pads */
586   if (G_UNLIKELY (!pads))
587     goto no_pads;
588
589   /* special case for just one pad that avoids reffing the buffer */
590   if (!pads->next) {
591     GstPad *pad = GST_PAD_CAST (pads->data);
592
593     GST_OBJECT_UNLOCK (tee);
594
595     if (pad == tee->pull_pad) {
596       ret = GST_FLOW_OK;
597     } else if (is_list) {
598       ret = gst_pad_push_list (pad, GST_BUFFER_LIST_CAST (data));
599     } else {
600       ret = gst_pad_push (pad, GST_BUFFER_CAST (data));
601     }
602     return ret;
603   }
604
605   /* mark all pads as 'not pushed on yet' */
606   g_list_foreach (pads, (GFunc) clear_pads, tee);
607
608 restart:
609   cret = GST_FLOW_NOT_LINKED;
610   pads = GST_ELEMENT_CAST (tee)->srcpads;
611   cookie = GST_ELEMENT_CAST (tee)->pads_cookie;
612
613   while (pads) {
614     GstPad *pad;
615     PushData *pdata;
616
617     pad = GST_PAD_CAST (pads->data);
618
619     /* get the private data, something is really wrong with the internal state
620      * when it is not there */
621     pdata = g_object_get_qdata ((GObject *) pad, push_data);
622     g_assert (pdata != NULL);
623
624     if (G_LIKELY (!pdata->pushed)) {
625       /* not yet pushed, release lock and start pushing */
626       gst_object_ref (pad);
627       GST_OBJECT_UNLOCK (tee);
628
629       GST_LOG_OBJECT (tee, "Starting to push %s %p",
630           is_list ? "list" : "buffer", data);
631
632       ret = gst_tee_do_push (tee, pad, data, is_list);
633
634       GST_LOG_OBJECT (tee, "Pushing item %p yielded result %s", data,
635           gst_flow_get_name (ret));
636
637       GST_OBJECT_LOCK (tee);
638       /* keep track of which pad we pushed and the result value. We need to do
639        * this before we release the refcount on the pad, the PushData is
640        * destroyed when the last ref of the pad goes away. */
641       pdata->pushed = TRUE;
642       pdata->result = ret;
643       gst_object_unref (pad);
644     } else {
645       /* already pushed, use previous return value */
646       ret = pdata->result;
647       GST_LOG_OBJECT (tee, "pad already pushed with %s",
648           gst_flow_get_name (ret));
649     }
650
651     /* before we go combining the return value, check if the pad list is still
652      * the same. It could be possible that the pad we just pushed was removed
653      * and the return value it not valid anymore */
654     if (G_UNLIKELY (GST_ELEMENT_CAST (tee)->pads_cookie != cookie)) {
655       GST_LOG_OBJECT (tee, "pad list changed");
656       /* the list of pads changed, restart iteration. Pads that we already
657        * pushed on and are still in the new list, will not be pushed on
658        * again. */
659       goto restart;
660     }
661
662     /* stop pushing more buffers when we have a fatal error */
663     if (G_UNLIKELY (ret != GST_FLOW_OK && ret != GST_FLOW_NOT_LINKED))
664       goto error;
665
666     /* keep all other return values, overwriting the previous one. */
667     if (G_LIKELY (ret != GST_FLOW_NOT_LINKED)) {
668       GST_LOG_OBJECT (tee, "Replacing ret val %d with %d", cret, ret);
669       cret = ret;
670     }
671     pads = g_list_next (pads);
672   }
673   GST_OBJECT_UNLOCK (tee);
674
675   gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
676
677   /* no need to unset gvalue */
678   return cret;
679
680   /* ERRORS */
681 no_pads:
682   {
683     GST_DEBUG_OBJECT (tee, "there are no pads, return not-linked");
684     ret = GST_FLOW_NOT_LINKED;
685     goto error;
686   }
687 error:
688   {
689     GST_DEBUG_OBJECT (tee, "received error %s", gst_flow_get_name (ret));
690     GST_OBJECT_UNLOCK (tee);
691     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
692     return ret;
693   }
694 }
695
696 static GstFlowReturn
697 gst_tee_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
698 {
699   GstFlowReturn res;
700   GstTee *tee;
701
702   tee = GST_TEE_CAST (parent);
703
704   GST_DEBUG_OBJECT (tee, "received buffer %p", buffer);
705
706   res = gst_tee_handle_data (tee, buffer, FALSE);
707
708   GST_DEBUG_OBJECT (tee, "handled buffer %s", gst_flow_get_name (res));
709
710   return res;
711 }
712
713 static GstFlowReturn
714 gst_tee_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list)
715 {
716   GstFlowReturn res;
717   GstTee *tee;
718
719   tee = GST_TEE_CAST (parent);
720
721   GST_DEBUG_OBJECT (tee, "received list %p", list);
722
723   res = gst_tee_handle_data (tee, list, TRUE);
724
725   GST_DEBUG_OBJECT (tee, "handled list %s", gst_flow_get_name (res));
726
727   return res;
728 }
729
730 static gboolean
731 gst_tee_sink_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
732     gboolean active)
733 {
734   gboolean res;
735   GstTee *tee;
736
737   tee = GST_TEE (parent);
738
739   switch (mode) {
740     case GST_PAD_MODE_PUSH:
741     {
742       GST_OBJECT_LOCK (tee);
743       tee->sink_mode = active ? mode : GST_PAD_MODE_NONE;
744
745       if (active && !tee->has_chain)
746         goto no_chain;
747       GST_OBJECT_UNLOCK (tee);
748       res = TRUE;
749       break;
750     }
751     default:
752       res = FALSE;
753       break;
754   }
755   return res;
756
757   /* ERRORS */
758 no_chain:
759   {
760     GST_OBJECT_UNLOCK (tee);
761     GST_INFO_OBJECT (tee,
762         "Tee cannot operate in push mode with has-chain==FALSE");
763     return FALSE;
764   }
765 }
766
767 static gboolean
768 gst_tee_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
769     gboolean active)
770 {
771   GstTee *tee;
772   gboolean res;
773   GstPad *sinkpad;
774
775   tee = GST_TEE (parent);
776
777   switch (mode) {
778     case GST_PAD_MODE_PULL:
779     {
780       GST_OBJECT_LOCK (tee);
781
782       if (tee->pull_mode == GST_TEE_PULL_MODE_NEVER)
783         goto cannot_pull;
784
785       if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE && active && tee->pull_pad)
786         goto cannot_pull_multiple_srcs;
787
788       sinkpad = gst_object_ref (tee->sinkpad);
789
790       GST_OBJECT_UNLOCK (tee);
791
792       res = gst_pad_activate_mode (sinkpad, GST_PAD_MODE_PULL, active);
793       gst_object_unref (sinkpad);
794
795       if (!res)
796         goto sink_activate_failed;
797
798       GST_OBJECT_LOCK (tee);
799       if (active) {
800         if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE)
801           tee->pull_pad = pad;
802       } else {
803         if (pad == tee->pull_pad)
804           tee->pull_pad = NULL;
805       }
806       tee->sink_mode = active & GST_PAD_MODE_PULL;
807       GST_OBJECT_UNLOCK (tee);
808       break;
809     }
810     default:
811       res = TRUE;
812       break;
813   }
814   return res;
815
816   /* ERRORS */
817 cannot_pull:
818   {
819     GST_OBJECT_UNLOCK (tee);
820     GST_INFO_OBJECT (tee, "Cannot activate in pull mode, pull-mode "
821         "set to NEVER");
822     return FALSE;
823   }
824 cannot_pull_multiple_srcs:
825   {
826     GST_OBJECT_UNLOCK (tee);
827     GST_INFO_OBJECT (tee, "Cannot activate multiple src pads in pull mode, "
828         "pull-mode set to SINGLE");
829     return FALSE;
830   }
831 sink_activate_failed:
832   {
833     GST_INFO_OBJECT (tee, "Failed to %sactivate sink pad in pull mode",
834         active ? "" : "de");
835     return FALSE;
836   }
837 }
838
839 static gboolean
840 gst_tee_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
841 {
842   GstTee *tee;
843   gboolean res;
844   GstPad *sinkpad;
845
846   tee = GST_TEE (parent);
847
848   switch (GST_QUERY_TYPE (query)) {
849     case GST_QUERY_SCHEDULING:
850     {
851       gboolean pull_mode;
852
853       GST_OBJECT_LOCK (tee);
854       pull_mode = TRUE;
855       if (tee->pull_mode == GST_TEE_PULL_MODE_NEVER) {
856         GST_INFO_OBJECT (tee, "Cannot activate in pull mode, pull-mode "
857             "set to NEVER");
858         pull_mode = FALSE;
859       } else if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE && tee->pull_pad) {
860         GST_INFO_OBJECT (tee, "Cannot activate multiple src pads in pull mode, "
861             "pull-mode set to SINGLE");
862         pull_mode = FALSE;
863       }
864
865       sinkpad = gst_object_ref (tee->sinkpad);
866       GST_OBJECT_UNLOCK (tee);
867
868       if (pull_mode) {
869         /* ask peer if we can operate in pull mode */
870         res = gst_pad_peer_query (sinkpad, query);
871       } else {
872         res = TRUE;
873       }
874       gst_object_unref (sinkpad);
875       break;
876     }
877     default:
878       res = gst_pad_query_default (pad, parent, query);
879       break;
880   }
881
882   return res;
883 }
884
885 static void
886 gst_tee_push_eos (const GValue * vpad, GstTee * tee)
887 {
888   GstPad *pad = g_value_get_object (vpad);
889
890   if (pad != tee->pull_pad)
891     gst_pad_push_event (pad, gst_event_new_eos ());
892 }
893
894 static void
895 gst_tee_pull_eos (GstTee * tee)
896 {
897   GstIterator *iter;
898
899   iter = gst_element_iterate_src_pads (GST_ELEMENT (tee));
900   gst_iterator_foreach (iter, (GstIteratorForeachFunction) gst_tee_push_eos,
901       tee);
902   gst_iterator_free (iter);
903 }
904
905 static GstFlowReturn
906 gst_tee_src_get_range (GstPad * pad, GstObject * parent, guint64 offset,
907     guint length, GstBuffer ** buf)
908 {
909   GstTee *tee;
910   GstFlowReturn ret;
911
912   tee = GST_TEE (parent);
913
914   ret = gst_pad_pull_range (tee->sinkpad, offset, length, buf);
915
916   if (ret == GST_FLOW_OK)
917     ret = gst_tee_handle_data (tee, gst_buffer_ref (*buf), FALSE);
918   else if (ret == GST_FLOW_EOS)
919     gst_tee_pull_eos (tee);
920
921   return ret;
922 }