Merge remote-tracking branch 'origin/master' into 0.11
[platform/upstream/gstreamer.git] / plugins / elements / gsttee.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2001,2002,2003,2004,2005 Wim Taymans <wim@fluendo.com>
4  *               2007 Wim Taymans <wim.taymans@gmail.com>
5  *
6  * gsttee.c: Tee element, one in N out
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
21  * Boston, MA 02111-1307, USA.
22  */
23
24 /**
25  * SECTION:element-tee
26  * @see_also: #GstIdentity
27  *
28  * Split data to multiple pads. Branching the data flow is useful when e.g.
29  * capturing a video where the video is shown on the screen and also encoded and
30  * written to a file. Another example is playing music and hooking up a
31  * visualisation module.
32  *
33  * One needs to use separate queue elements (or a multiqueue) in each branch to
34  * provide separate threads for each branch. Otherwise a blocked dataflow in one
35  * branch would stall the other branches.
36  *
37  * <refsect2>
38  * <title>Example launch line</title>
39  * |[
40  * gst-launch filesrc location=song.ogg ! decodebin2 ! tee name=t ! queue ! autoaudiosink t. ! queue ! audioconvert ! goom ! ffmpegcolorspace ! autovideosink
41  * ]| Play a song.ogg from local dir and render visualisations using the goom
42  * element.
43  * </refsect2>
44  */
45
46 #ifdef HAVE_CONFIG_H
47 #  include "config.h"
48 #endif
49
50 #include "gsttee.h"
51
52 #include <string.h>
53
54 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
55     GST_PAD_SINK,
56     GST_PAD_ALWAYS,
57     GST_STATIC_CAPS_ANY);
58
59 GST_DEBUG_CATEGORY_STATIC (gst_tee_debug);
60 #define GST_CAT_DEFAULT gst_tee_debug
61
62 #define GST_TYPE_TEE_PULL_MODE (gst_tee_pull_mode_get_type())
63 static GType
64 gst_tee_pull_mode_get_type (void)
65 {
66   static GType type = 0;
67   static const GEnumValue data[] = {
68     {GST_TEE_PULL_MODE_NEVER, "Never activate in pull mode", "never"},
69     {GST_TEE_PULL_MODE_SINGLE, "Only one src pad can be active in pull mode",
70         "single"},
71     {0, NULL, NULL},
72   };
73
74   if (!type) {
75     type = g_enum_register_static ("GstTeePullMode", data);
76   }
77   return type;
78 }
79
80 /* lock to protect request pads from being removed while downstream */
81 #define GST_TEE_DYN_LOCK(tee) g_mutex_lock ((tee)->dyn_lock)
82 #define GST_TEE_DYN_UNLOCK(tee) g_mutex_unlock ((tee)->dyn_lock)
83
84 #define DEFAULT_PROP_NUM_SRC_PADS       0
85 #define DEFAULT_PROP_HAS_SINK_LOOP      FALSE
86 #define DEFAULT_PROP_HAS_CHAIN          TRUE
87 #define DEFAULT_PROP_SILENT             TRUE
88 #define DEFAULT_PROP_LAST_MESSAGE       NULL
89 #define DEFAULT_PULL_MODE               GST_TEE_PULL_MODE_NEVER
90
91 enum
92 {
93   PROP_0,
94   PROP_NUM_SRC_PADS,
95   PROP_HAS_SINK_LOOP,
96   PROP_HAS_CHAIN,
97   PROP_SILENT,
98   PROP_LAST_MESSAGE,
99   PROP_PULL_MODE,
100   PROP_ALLOC_PAD,
101 };
102
103 static GstStaticPadTemplate tee_src_template =
104 GST_STATIC_PAD_TEMPLATE ("src_%u",
105     GST_PAD_SRC,
106     GST_PAD_REQUEST,
107     GST_STATIC_CAPS_ANY);
108
109 /* structure and quark to keep track of which pads have been pushed */
110 static GQuark push_data;
111
112 #define _do_init \
113     GST_DEBUG_CATEGORY_INIT (gst_tee_debug, "tee", 0, "tee element"); \
114     push_data = g_quark_from_static_string ("tee-push-data");
115 #define gst_tee_parent_class parent_class
116 G_DEFINE_TYPE_WITH_CODE (GstTee, gst_tee, GST_TYPE_ELEMENT, _do_init);
117
118 static GParamSpec *pspec_last_message = NULL;
119 static GParamSpec *pspec_alloc_pad = NULL;
120
121 typedef struct
122 {
123   gboolean pushed;
124   GstFlowReturn result;
125   gboolean removed;
126 } PushData;
127
128 static GstPad *gst_tee_request_new_pad (GstElement * element,
129     GstPadTemplate * temp, const gchar * unused, const GstCaps * caps);
130 static void gst_tee_release_pad (GstElement * element, GstPad * pad);
131
132 static void gst_tee_finalize (GObject * object);
133 static void gst_tee_set_property (GObject * object, guint prop_id,
134     const GValue * value, GParamSpec * pspec);
135 static void gst_tee_get_property (GObject * object, guint prop_id,
136     GValue * value, GParamSpec * pspec);
137 static void gst_tee_dispose (GObject * object);
138
139 static GstFlowReturn gst_tee_chain (GstPad * pad, GstObject * parent,
140     GstBuffer * buffer);
141 static GstFlowReturn gst_tee_chain_list (GstPad * pad, GstObject * parent,
142     GstBufferList * list);
143 static gboolean gst_tee_sink_event (GstPad * pad, GstObject * parent,
144     GstEvent * event);
145 static gboolean gst_tee_sink_query (GstPad * pad, GstObject * parent,
146     GstQuery * query);
147 static gboolean gst_tee_sink_activate_mode (GstPad * pad, GstObject * parent,
148     GstPadMode mode, gboolean active);
149 static gboolean gst_tee_src_query (GstPad * pad, GstObject * parent,
150     GstQuery * query);
151 static gboolean gst_tee_src_activate_mode (GstPad * pad, GstObject * parent,
152     GstPadMode mode, gboolean active);
153 static GstFlowReturn gst_tee_src_get_range (GstPad * pad, GstObject * parent,
154     guint64 offset, guint length, GstBuffer ** buf);
155
156 static void
157 gst_tee_dispose (GObject * object)
158 {
159   GList *item;
160
161 restart:
162   for (item = GST_ELEMENT_PADS (object); item; item = g_list_next (item)) {
163     GstPad *pad = GST_PAD (item->data);
164     if (GST_PAD_IS_SRC (pad)) {
165       gst_element_release_request_pad (GST_ELEMENT (object), pad);
166       goto restart;
167     }
168   }
169
170   G_OBJECT_CLASS (parent_class)->dispose (object);
171 }
172
173 static void
174 gst_tee_finalize (GObject * object)
175 {
176   GstTee *tee;
177
178   tee = GST_TEE (object);
179
180   g_free (tee->last_message);
181
182   g_mutex_free (tee->dyn_lock);
183
184   G_OBJECT_CLASS (parent_class)->finalize (object);
185 }
186
187 static void
188 gst_tee_class_init (GstTeeClass * klass)
189 {
190   GObjectClass *gobject_class;
191   GstElementClass *gstelement_class;
192
193   gobject_class = G_OBJECT_CLASS (klass);
194   gstelement_class = GST_ELEMENT_CLASS (klass);
195
196   gobject_class->finalize = gst_tee_finalize;
197   gobject_class->set_property = gst_tee_set_property;
198   gobject_class->get_property = gst_tee_get_property;
199   gobject_class->dispose = gst_tee_dispose;
200
201   g_object_class_install_property (gobject_class, PROP_NUM_SRC_PADS,
202       g_param_spec_int ("num-src-pads", "Num Src Pads",
203           "The number of source pads", 0, G_MAXINT, DEFAULT_PROP_NUM_SRC_PADS,
204           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
205   g_object_class_install_property (gobject_class, PROP_HAS_SINK_LOOP,
206       g_param_spec_boolean ("has-sink-loop", "Has Sink Loop",
207           "If the element should spawn a thread (unimplemented and deprecated)",
208           DEFAULT_PROP_HAS_SINK_LOOP,
209           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
210   g_object_class_install_property (gobject_class, PROP_HAS_CHAIN,
211       g_param_spec_boolean ("has-chain", "Has Chain",
212           "If the element can operate in push mode", DEFAULT_PROP_HAS_CHAIN,
213           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
214   g_object_class_install_property (gobject_class, PROP_SILENT,
215       g_param_spec_boolean ("silent", "Silent",
216           "Don't produce last_message events", DEFAULT_PROP_SILENT,
217           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
218   pspec_last_message = g_param_spec_string ("last-message", "Last Message",
219       "The message describing current status", DEFAULT_PROP_LAST_MESSAGE,
220       G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
221   g_object_class_install_property (gobject_class, PROP_LAST_MESSAGE,
222       pspec_last_message);
223   g_object_class_install_property (gobject_class, PROP_PULL_MODE,
224       g_param_spec_enum ("pull-mode", "Pull mode",
225           "Behavior of tee in pull mode", GST_TYPE_TEE_PULL_MODE,
226           DEFAULT_PULL_MODE,
227           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
228   pspec_alloc_pad = g_param_spec_object ("alloc-pad", "Allocation Src Pad",
229       "The pad used for gst_pad_alloc_buffer", GST_TYPE_PAD,
230       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
231   g_object_class_install_property (gobject_class, PROP_ALLOC_PAD,
232       pspec_alloc_pad);
233
234   gst_element_class_set_details_simple (gstelement_class,
235       "Tee pipe fitting",
236       "Generic",
237       "1-to-N pipe fitting",
238       "Erik Walthinsen <omega@cse.ogi.edu>, " "Wim Taymans <wim@fluendo.com>");
239   gst_element_class_add_pad_template (gstelement_class,
240       gst_static_pad_template_get (&sinktemplate));
241   gst_element_class_add_pad_template (gstelement_class,
242       gst_static_pad_template_get (&tee_src_template));
243
244   gstelement_class->request_new_pad =
245       GST_DEBUG_FUNCPTR (gst_tee_request_new_pad);
246   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_tee_release_pad);
247 }
248
249 static void
250 gst_tee_init (GstTee * tee)
251 {
252   tee->dyn_lock = g_mutex_new ();
253
254   tee->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
255   tee->sink_mode = GST_PAD_MODE_NONE;
256
257   gst_pad_set_event_function (tee->sinkpad,
258       GST_DEBUG_FUNCPTR (gst_tee_sink_event));
259   gst_pad_set_query_function (tee->sinkpad,
260       GST_DEBUG_FUNCPTR (gst_tee_sink_query));
261   gst_pad_set_activatemode_function (tee->sinkpad,
262       GST_DEBUG_FUNCPTR (gst_tee_sink_activate_mode));
263   gst_pad_set_chain_function (tee->sinkpad, GST_DEBUG_FUNCPTR (gst_tee_chain));
264   gst_pad_set_chain_list_function (tee->sinkpad,
265       GST_DEBUG_FUNCPTR (gst_tee_chain_list));
266   GST_OBJECT_FLAG_SET (tee->sinkpad, GST_PAD_FLAG_PROXY_CAPS);
267   gst_element_add_pad (GST_ELEMENT (tee), tee->sinkpad);
268
269   tee->last_message = NULL;
270 }
271
272 static void
273 gst_tee_notify_alloc_pad (GstTee * tee)
274 {
275 #if !GLIB_CHECK_VERSION(2,26,0)
276   g_object_notify ((GObject *) tee, "alloc-pad");
277 #else
278   g_object_notify_by_pspec ((GObject *) tee, pspec_alloc_pad);
279 #endif
280 }
281
282 static gboolean
283 forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
284 {
285   GstPad *srcpad = GST_PAD_CAST (user_data);
286
287   gst_pad_push_event (srcpad, gst_event_ref (*event));
288
289   return TRUE;
290 }
291
292 static GstPad *
293 gst_tee_request_new_pad (GstElement * element, GstPadTemplate * templ,
294     const gchar * unused, const GstCaps * caps)
295 {
296   gchar *name;
297   GstPad *srcpad;
298   GstTee *tee;
299   GstPadMode mode;
300   gboolean res;
301   PushData *data;
302
303   tee = GST_TEE (element);
304
305   GST_DEBUG_OBJECT (tee, "requesting pad");
306
307   GST_OBJECT_LOCK (tee);
308   name = g_strdup_printf ("src_%u", tee->pad_counter++);
309
310   srcpad = gst_pad_new_from_template (templ, name);
311   g_free (name);
312
313   mode = tee->sink_mode;
314
315   /* install the data, we automatically free it when the pad is disposed because
316    * of _release_pad or when the element goes away. */
317   data = g_new0 (PushData, 1);
318   data->pushed = FALSE;
319   data->result = GST_FLOW_NOT_LINKED;
320   data->removed = FALSE;
321   g_object_set_qdata_full (G_OBJECT (srcpad), push_data, data, g_free);
322
323   GST_OBJECT_UNLOCK (tee);
324
325   switch (mode) {
326     case GST_PAD_MODE_PULL:
327       /* we already have a src pad in pull mode, and our pull mode can only be
328          SINGLE, so fall through to activate this new pad in push mode */
329     case GST_PAD_MODE_PUSH:
330       res = gst_pad_activate_mode (srcpad, GST_PAD_MODE_PUSH, TRUE);
331       break;
332     default:
333       res = TRUE;
334       break;
335   }
336
337   if (!res)
338     goto activate_failed;
339
340   gst_pad_set_activatemode_function (srcpad,
341       GST_DEBUG_FUNCPTR (gst_tee_src_activate_mode));
342   gst_pad_set_query_function (srcpad, GST_DEBUG_FUNCPTR (gst_tee_src_query));
343   gst_pad_set_getrange_function (srcpad,
344       GST_DEBUG_FUNCPTR (gst_tee_src_get_range));
345   /* Forward sticky events to the new srcpad */
346   gst_pad_sticky_events_foreach (tee->sinkpad, forward_sticky_events, srcpad);
347   GST_OBJECT_FLAG_SET (srcpad, GST_PAD_FLAG_PROXY_CAPS);
348   gst_element_add_pad (GST_ELEMENT_CAST (tee), srcpad);
349
350   return srcpad;
351
352   /* ERRORS */
353 activate_failed:
354   {
355     gboolean changed = FALSE;
356
357     GST_OBJECT_LOCK (tee);
358     GST_DEBUG_OBJECT (tee, "warning failed to activate request pad");
359     if (tee->allocpad == srcpad) {
360       tee->allocpad = NULL;
361       changed = TRUE;
362     }
363     GST_OBJECT_UNLOCK (tee);
364     gst_object_unref (srcpad);
365     if (changed) {
366       gst_tee_notify_alloc_pad (tee);
367     }
368     return NULL;
369   }
370 }
371
372 static void
373 gst_tee_release_pad (GstElement * element, GstPad * pad)
374 {
375   GstTee *tee;
376   PushData *data;
377   gboolean changed = FALSE;
378
379   tee = GST_TEE (element);
380
381   GST_DEBUG_OBJECT (tee, "releasing pad");
382
383   /* wait for pending pad_alloc to finish */
384   GST_TEE_DYN_LOCK (tee);
385   data = g_object_get_qdata (G_OBJECT (pad), push_data);
386
387   GST_OBJECT_LOCK (tee);
388   /* mark the pad as removed so that future pad_alloc fails with NOT_LINKED. */
389   data->removed = TRUE;
390   if (tee->allocpad == pad) {
391     tee->allocpad = NULL;
392     changed = TRUE;
393   }
394   GST_OBJECT_UNLOCK (tee);
395
396   gst_object_ref (pad);
397   gst_element_remove_pad (GST_ELEMENT_CAST (tee), pad);
398
399   gst_pad_set_active (pad, FALSE);
400   GST_TEE_DYN_UNLOCK (tee);
401
402   gst_object_unref (pad);
403
404   if (changed) {
405     gst_tee_notify_alloc_pad (tee);
406   }
407 }
408
409 static void
410 gst_tee_set_property (GObject * object, guint prop_id, const GValue * value,
411     GParamSpec * pspec)
412 {
413   GstTee *tee = GST_TEE (object);
414
415   GST_OBJECT_LOCK (tee);
416   switch (prop_id) {
417     case PROP_HAS_SINK_LOOP:
418       tee->has_sink_loop = g_value_get_boolean (value);
419       if (tee->has_sink_loop) {
420         g_warning ("tee will never implement has-sink-loop==TRUE");
421       }
422       break;
423     case PROP_HAS_CHAIN:
424       tee->has_chain = g_value_get_boolean (value);
425       break;
426     case PROP_SILENT:
427       tee->silent = g_value_get_boolean (value);
428       break;
429     case PROP_PULL_MODE:
430       tee->pull_mode = g_value_get_enum (value);
431       break;
432     case PROP_ALLOC_PAD:
433     {
434       GstPad *pad = g_value_get_object (value);
435       GST_OBJECT_LOCK (pad);
436       if (GST_OBJECT_PARENT (pad) == GST_OBJECT_CAST (object))
437         tee->allocpad = pad;
438       else
439         GST_WARNING_OBJECT (object, "Tried to set alloc pad %s which"
440             " is not my pad", GST_OBJECT_NAME (pad));
441       GST_OBJECT_UNLOCK (pad);
442       break;
443     }
444     default:
445       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
446       break;
447   }
448   GST_OBJECT_UNLOCK (tee);
449 }
450
451 static void
452 gst_tee_get_property (GObject * object, guint prop_id, GValue * value,
453     GParamSpec * pspec)
454 {
455   GstTee *tee = GST_TEE (object);
456
457   GST_OBJECT_LOCK (tee);
458   switch (prop_id) {
459     case PROP_NUM_SRC_PADS:
460       g_value_set_int (value, GST_ELEMENT (tee)->numsrcpads);
461       break;
462     case PROP_HAS_SINK_LOOP:
463       g_value_set_boolean (value, tee->has_sink_loop);
464       break;
465     case PROP_HAS_CHAIN:
466       g_value_set_boolean (value, tee->has_chain);
467       break;
468     case PROP_SILENT:
469       g_value_set_boolean (value, tee->silent);
470       break;
471     case PROP_LAST_MESSAGE:
472       g_value_set_string (value, tee->last_message);
473       break;
474     case PROP_PULL_MODE:
475       g_value_set_enum (value, tee->pull_mode);
476       break;
477     case PROP_ALLOC_PAD:
478       g_value_set_object (value, tee->allocpad);
479       break;
480     default:
481       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
482       break;
483   }
484   GST_OBJECT_UNLOCK (tee);
485 }
486
487 static gboolean
488 gst_tee_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
489 {
490   gboolean res;
491
492   switch (GST_EVENT_TYPE (event)) {
493     default:
494       res = gst_pad_event_default (pad, parent, event);
495       break;
496   }
497
498   return res;
499 }
500
501 static gboolean
502 gst_tee_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
503 {
504   gboolean res;
505
506   switch (GST_QUERY_TYPE (query)) {
507     default:
508       res = gst_pad_query_default (pad, parent, query);
509       break;
510   }
511   return res;
512 }
513
514 static void
515 gst_tee_do_message (GstTee * tee, GstPad * pad, gpointer data, gboolean is_list)
516 {
517   GST_OBJECT_LOCK (tee);
518   g_free (tee->last_message);
519   if (is_list) {
520     tee->last_message =
521         g_strdup_printf ("chain-list   ******* (%s:%s)t %p",
522         GST_DEBUG_PAD_NAME (pad), data);
523   } else {
524     tee->last_message =
525         g_strdup_printf ("chain        ******* (%s:%s)t (%" G_GSIZE_FORMAT
526         " bytes, %" G_GUINT64_FORMAT ") %p", GST_DEBUG_PAD_NAME (pad),
527         gst_buffer_get_size (data), GST_BUFFER_TIMESTAMP (data), data);
528   }
529   GST_OBJECT_UNLOCK (tee);
530
531 #if !GLIB_CHECK_VERSION(2,26,0)
532   g_object_notify ((GObject *) tee, "last-message");
533 #else
534   g_object_notify_by_pspec ((GObject *) tee, pspec_last_message);
535 #endif
536 }
537
538 static GstFlowReturn
539 gst_tee_do_push (GstTee * tee, GstPad * pad, gpointer data, gboolean is_list)
540 {
541   GstFlowReturn res;
542
543   /* Push */
544   if (pad == tee->pull_pad) {
545     /* don't push on the pad we're pulling from */
546     res = GST_FLOW_OK;
547   } else if (is_list) {
548     res =
549         gst_pad_push_list (pad,
550         gst_buffer_list_ref (GST_BUFFER_LIST_CAST (data)));
551   } else {
552     res = gst_pad_push (pad, gst_buffer_ref (GST_BUFFER_CAST (data)));
553   }
554   return res;
555 }
556
557 static void
558 clear_pads (GstPad * pad, GstTee * tee)
559 {
560   PushData *data;
561
562   data = g_object_get_qdata ((GObject *) pad, push_data);
563
564   /* the data must be there or we have a screwed up internal state */
565   g_assert (data != NULL);
566
567   data->pushed = FALSE;
568   data->result = GST_FLOW_NOT_LINKED;
569 }
570
571 static GstFlowReturn
572 gst_tee_handle_data (GstTee * tee, gpointer data, gboolean is_list)
573 {
574   GList *pads;
575   guint32 cookie;
576   GstFlowReturn ret, cret;
577
578   if (G_UNLIKELY (!tee->silent))
579     gst_tee_do_message (tee, tee->sinkpad, data, is_list);
580
581   GST_OBJECT_LOCK (tee);
582   pads = GST_ELEMENT_CAST (tee)->srcpads;
583
584   /* special case for zero pads */
585   if (G_UNLIKELY (!pads))
586     goto no_pads;
587
588   /* special case for just one pad that avoids reffing the buffer */
589   if (!pads->next) {
590     GstPad *pad = GST_PAD_CAST (pads->data);
591
592     GST_OBJECT_UNLOCK (tee);
593
594     if (pad == tee->pull_pad) {
595       ret = GST_FLOW_OK;
596     } else if (is_list) {
597       ret = gst_pad_push_list (pad, GST_BUFFER_LIST_CAST (data));
598     } else {
599       ret = gst_pad_push (pad, GST_BUFFER_CAST (data));
600     }
601     return ret;
602   }
603
604   /* mark all pads as 'not pushed on yet' */
605   g_list_foreach (pads, (GFunc) clear_pads, tee);
606
607 restart:
608   cret = GST_FLOW_NOT_LINKED;
609   pads = GST_ELEMENT_CAST (tee)->srcpads;
610   cookie = GST_ELEMENT_CAST (tee)->pads_cookie;
611
612   while (pads) {
613     GstPad *pad;
614     PushData *pdata;
615
616     pad = GST_PAD_CAST (pads->data);
617
618     /* get the private data, something is really wrong with the internal state
619      * when it is not there */
620     pdata = g_object_get_qdata ((GObject *) pad, push_data);
621     g_assert (pdata != NULL);
622
623     if (G_LIKELY (!pdata->pushed)) {
624       /* not yet pushed, release lock and start pushing */
625       gst_object_ref (pad);
626       GST_OBJECT_UNLOCK (tee);
627
628       GST_LOG_OBJECT (tee, "Starting to push %s %p",
629           is_list ? "list" : "buffer", data);
630
631       ret = gst_tee_do_push (tee, pad, data, is_list);
632
633       GST_LOG_OBJECT (tee, "Pushing item %p yielded result %s", data,
634           gst_flow_get_name (ret));
635
636       GST_OBJECT_LOCK (tee);
637       /* keep track of which pad we pushed and the result value. We need to do
638        * this before we release the refcount on the pad, the PushData is
639        * destroyed when the last ref of the pad goes away. */
640       pdata->pushed = TRUE;
641       pdata->result = ret;
642       gst_object_unref (pad);
643     } else {
644       /* already pushed, use previous return value */
645       ret = pdata->result;
646       GST_LOG_OBJECT (tee, "pad already pushed with %s",
647           gst_flow_get_name (ret));
648     }
649
650     /* before we go combining the return value, check if the pad list is still
651      * the same. It could be possible that the pad we just pushed was removed
652      * and the return value it not valid anymore */
653     if (G_UNLIKELY (GST_ELEMENT_CAST (tee)->pads_cookie != cookie)) {
654       GST_LOG_OBJECT (tee, "pad list changed");
655       /* the list of pads changed, restart iteration. Pads that we already
656        * pushed on and are still in the new list, will not be pushed on
657        * again. */
658       goto restart;
659     }
660
661     /* stop pushing more buffers when we have a fatal error */
662     if (G_UNLIKELY (ret != GST_FLOW_OK && ret != GST_FLOW_NOT_LINKED))
663       goto error;
664
665     /* keep all other return values, overwriting the previous one. */
666     if (G_LIKELY (ret != GST_FLOW_NOT_LINKED)) {
667       GST_LOG_OBJECT (tee, "Replacing ret val %d with %d", cret, ret);
668       cret = ret;
669     }
670     pads = g_list_next (pads);
671   }
672   GST_OBJECT_UNLOCK (tee);
673
674   gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
675
676   /* no need to unset gvalue */
677   return cret;
678
679   /* ERRORS */
680 no_pads:
681   {
682     GST_DEBUG_OBJECT (tee, "there are no pads, return not-linked");
683     ret = GST_FLOW_NOT_LINKED;
684     goto error;
685   }
686 error:
687   {
688     GST_DEBUG_OBJECT (tee, "received error %s", gst_flow_get_name (ret));
689     GST_OBJECT_UNLOCK (tee);
690     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
691     return ret;
692   }
693 }
694
695 static GstFlowReturn
696 gst_tee_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
697 {
698   GstFlowReturn res;
699   GstTee *tee;
700
701   tee = GST_TEE_CAST (parent);
702
703   GST_DEBUG_OBJECT (tee, "received buffer %p", buffer);
704
705   res = gst_tee_handle_data (tee, buffer, FALSE);
706
707   GST_DEBUG_OBJECT (tee, "handled buffer %s", gst_flow_get_name (res));
708
709   return res;
710 }
711
712 static GstFlowReturn
713 gst_tee_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list)
714 {
715   GstFlowReturn res;
716   GstTee *tee;
717
718   tee = GST_TEE_CAST (parent);
719
720   GST_DEBUG_OBJECT (tee, "received list %p", list);
721
722   res = gst_tee_handle_data (tee, list, TRUE);
723
724   GST_DEBUG_OBJECT (tee, "handled list %s", gst_flow_get_name (res));
725
726   return res;
727 }
728
729 static gboolean
730 gst_tee_sink_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
731     gboolean active)
732 {
733   gboolean res;
734   GstTee *tee;
735
736   tee = GST_TEE (parent);
737
738   switch (mode) {
739     case GST_PAD_MODE_PUSH:
740     {
741       GST_OBJECT_LOCK (tee);
742       tee->sink_mode = active ? mode : GST_PAD_MODE_NONE;
743
744       if (active && !tee->has_chain)
745         goto no_chain;
746       GST_OBJECT_UNLOCK (tee);
747       res = TRUE;
748       break;
749     }
750     default:
751       res = FALSE;
752       break;
753   }
754   return res;
755
756   /* ERRORS */
757 no_chain:
758   {
759     GST_OBJECT_UNLOCK (tee);
760     GST_INFO_OBJECT (tee,
761         "Tee cannot operate in push mode with has-chain==FALSE");
762     return FALSE;
763   }
764 }
765
766 static gboolean
767 gst_tee_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
768     gboolean active)
769 {
770   GstTee *tee;
771   gboolean res;
772   GstPad *sinkpad;
773
774   tee = GST_TEE (parent);
775
776   switch (mode) {
777     case GST_PAD_MODE_PULL:
778     {
779       GST_OBJECT_LOCK (tee);
780
781       if (tee->pull_mode == GST_TEE_PULL_MODE_NEVER)
782         goto cannot_pull;
783
784       if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE && active && tee->pull_pad)
785         goto cannot_pull_multiple_srcs;
786
787       sinkpad = gst_object_ref (tee->sinkpad);
788
789       GST_OBJECT_UNLOCK (tee);
790
791       res = gst_pad_activate_mode (sinkpad, GST_PAD_MODE_PULL, active);
792       gst_object_unref (sinkpad);
793
794       if (!res)
795         goto sink_activate_failed;
796
797       GST_OBJECT_LOCK (tee);
798       if (active) {
799         if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE)
800           tee->pull_pad = pad;
801       } else {
802         if (pad == tee->pull_pad)
803           tee->pull_pad = NULL;
804       }
805       tee->sink_mode = active & GST_PAD_MODE_PULL;
806       GST_OBJECT_UNLOCK (tee);
807       break;
808     }
809     default:
810       res = TRUE;
811       break;
812   }
813   return res;
814
815   /* ERRORS */
816 cannot_pull:
817   {
818     GST_OBJECT_UNLOCK (tee);
819     GST_INFO_OBJECT (tee, "Cannot activate in pull mode, pull-mode "
820         "set to NEVER");
821     return FALSE;
822   }
823 cannot_pull_multiple_srcs:
824   {
825     GST_OBJECT_UNLOCK (tee);
826     GST_INFO_OBJECT (tee, "Cannot activate multiple src pads in pull mode, "
827         "pull-mode set to SINGLE");
828     return FALSE;
829   }
830 sink_activate_failed:
831   {
832     GST_INFO_OBJECT (tee, "Failed to %sactivate sink pad in pull mode",
833         active ? "" : "de");
834     return FALSE;
835   }
836 }
837
838 static gboolean
839 gst_tee_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
840 {
841   GstTee *tee;
842   gboolean res;
843   GstPad *sinkpad;
844
845   tee = GST_TEE (parent);
846
847   switch (GST_QUERY_TYPE (query)) {
848     case GST_QUERY_SCHEDULING:
849     {
850       gboolean pull_mode;
851
852       GST_OBJECT_LOCK (tee);
853       pull_mode = TRUE;
854       if (tee->pull_mode == GST_TEE_PULL_MODE_NEVER) {
855         GST_INFO_OBJECT (tee, "Cannot activate in pull mode, pull-mode "
856             "set to NEVER");
857         pull_mode = FALSE;
858       } else if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE && tee->pull_pad) {
859         GST_INFO_OBJECT (tee, "Cannot activate multiple src pads in pull mode, "
860             "pull-mode set to SINGLE");
861         pull_mode = FALSE;
862       }
863
864       sinkpad = gst_object_ref (tee->sinkpad);
865       GST_OBJECT_UNLOCK (tee);
866
867       if (pull_mode) {
868         /* ask peer if we can operate in pull mode */
869         res = gst_pad_peer_query (sinkpad, query);
870       } else {
871         res = TRUE;
872       }
873       gst_object_unref (sinkpad);
874       break;
875     }
876     default:
877       res = gst_pad_query_default (pad, parent, query);
878       break;
879   }
880
881   return res;
882 }
883
884 static void
885 gst_tee_push_eos (const GValue * vpad, GstTee * tee)
886 {
887   GstPad *pad = g_value_get_object (vpad);
888
889   if (pad != tee->pull_pad)
890     gst_pad_push_event (pad, gst_event_new_eos ());
891 }
892
893 static void
894 gst_tee_pull_eos (GstTee * tee)
895 {
896   GstIterator *iter;
897
898   iter = gst_element_iterate_src_pads (GST_ELEMENT (tee));
899   gst_iterator_foreach (iter, (GstIteratorForeachFunction) gst_tee_push_eos,
900       tee);
901   gst_iterator_free (iter);
902 }
903
904 static GstFlowReturn
905 gst_tee_src_get_range (GstPad * pad, GstObject * parent, guint64 offset,
906     guint length, GstBuffer ** buf)
907 {
908   GstTee *tee;
909   GstFlowReturn ret;
910
911   tee = GST_TEE (parent);
912
913   ret = gst_pad_pull_range (tee->sinkpad, offset, length, buf);
914
915   if (ret == GST_FLOW_OK)
916     ret = gst_tee_handle_data (tee, gst_buffer_ref (*buf), FALSE);
917   else if (ret == GST_FLOW_EOS)
918     gst_tee_pull_eos (tee);
919
920   return ret;
921 }