tee: Keep another ref to our one and only srcpad around while pushing
[platform/upstream/gstreamer.git] / plugins / elements / gsttee.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2001,2002,2003,2004,2005 Wim Taymans <wim@fluendo.com>
4  *               2007 Wim Taymans <wim.taymans@gmail.com>
5  *
6  * gsttee.c: Tee element, one in N out
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23
24 /**
25  * SECTION:element-tee
26  * @see_also: #GstIdentity
27  *
28  * Split data to multiple pads. Branching the data flow is useful when e.g.
29  * capturing a video where the video is shown on the screen and also encoded and
30  * written to a file. Another example is playing music and hooking up a
31  * visualisation module.
32  *
33  * One needs to use separate queue elements (or a multiqueue) in each branch to
34  * provide separate threads for each branch. Otherwise a blocked dataflow in one
35  * branch would stall the other branches.
36  *
37  * <refsect2>
38  * <title>Example launch line</title>
39  * |[
40  * gst-launch filesrc location=song.ogg ! decodebin2 ! tee name=t ! queue ! autoaudiosink t. ! queue ! audioconvert ! goom ! videoconvert ! autovideosink
41  * ]| Play a song.ogg from local dir and render visualisations using the goom
42  * element.
43  * </refsect2>
44  */
45
46 #ifdef HAVE_CONFIG_H
47 #  include "config.h"
48 #endif
49
50 #include "gsttee.h"
51 #include "gst/glib-compat-private.h"
52
53 #include <string.h>
54
55 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
56     GST_PAD_SINK,
57     GST_PAD_ALWAYS,
58     GST_STATIC_CAPS_ANY);
59
60 GST_DEBUG_CATEGORY_STATIC (gst_tee_debug);
61 #define GST_CAT_DEFAULT gst_tee_debug
62
63 #define GST_TYPE_TEE_PULL_MODE (gst_tee_pull_mode_get_type())
64 static GType
65 gst_tee_pull_mode_get_type (void)
66 {
67   static GType type = 0;
68   static const GEnumValue data[] = {
69     {GST_TEE_PULL_MODE_NEVER, "Never activate in pull mode", "never"},
70     {GST_TEE_PULL_MODE_SINGLE, "Only one src pad can be active in pull mode",
71         "single"},
72     {0, NULL, NULL},
73   };
74
75   if (!type) {
76     type = g_enum_register_static ("GstTeePullMode", data);
77   }
78   return type;
79 }
80
81 /* lock to protect request pads from being removed while downstream */
82 #define GST_TEE_DYN_LOCK(tee) g_mutex_lock (&(tee)->dyn_lock)
83 #define GST_TEE_DYN_UNLOCK(tee) g_mutex_unlock (&(tee)->dyn_lock)
84
85 #define DEFAULT_PROP_NUM_SRC_PADS       0
86 #define DEFAULT_PROP_HAS_CHAIN          TRUE
87 #define DEFAULT_PROP_SILENT             TRUE
88 #define DEFAULT_PROP_LAST_MESSAGE       NULL
89 #define DEFAULT_PULL_MODE               GST_TEE_PULL_MODE_NEVER
90
91 enum
92 {
93   PROP_0,
94   PROP_NUM_SRC_PADS,
95   PROP_HAS_CHAIN,
96   PROP_SILENT,
97   PROP_LAST_MESSAGE,
98   PROP_PULL_MODE,
99   PROP_ALLOC_PAD,
100 };
101
102 static GstStaticPadTemplate tee_src_template =
103 GST_STATIC_PAD_TEMPLATE ("src_%u",
104     GST_PAD_SRC,
105     GST_PAD_REQUEST,
106     GST_STATIC_CAPS_ANY);
107
108 #define _do_init \
109     GST_DEBUG_CATEGORY_INIT (gst_tee_debug, "tee", 0, "tee element");
110 #define gst_tee_parent_class parent_class
111 G_DEFINE_TYPE_WITH_CODE (GstTee, gst_tee, GST_TYPE_ELEMENT, _do_init);
112
113 static GParamSpec *pspec_last_message = NULL;
114 static GParamSpec *pspec_alloc_pad = NULL;
115
116 GType gst_tee_pad_get_type (void);
117
118 #define GST_TYPE_TEE_PAD \
119   (gst_tee_pad_get_type())
120 #define GST_TEE_PAD(obj) \
121   (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_TEE_PAD, GstTeePad))
122 #define GST_TEE_PAD_CLASS(klass) \
123   (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_TEE_PAD, GstTeePadClass))
124 #define GST_IS_TEE_PAD(obj) \
125   (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_TEE_PAD))
126 #define GST_IS_TEE_PAD_CLASS(klass) \
127   (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_TEE_PAD))
128 #define GST_TEE_PAD_CAST(obj) \
129   ((GstTeePad *)(obj))
130
131 typedef struct _GstTeePad GstTeePad;
132 typedef struct _GstTeePadClass GstTeePadClass;
133
134 struct _GstTeePad
135 {
136   GstPad parent;
137
138   gboolean pushed;
139   GstFlowReturn result;
140   gboolean removed;
141 };
142
143 struct _GstTeePadClass
144 {
145   GstPadClass parent;
146 };
147
148 G_DEFINE_TYPE (GstTeePad, gst_tee_pad, GST_TYPE_PAD);
149
150 static void
151 gst_tee_pad_class_init (GstTeePadClass * klass)
152 {
153 }
154
155 static void
156 gst_tee_pad_reset (GstTeePad * pad)
157 {
158   pad->pushed = FALSE;
159   pad->result = GST_FLOW_NOT_LINKED;
160   pad->removed = FALSE;
161 }
162
163 static void
164 gst_tee_pad_init (GstTeePad * pad)
165 {
166   gst_tee_pad_reset (pad);
167 }
168
169 static GstPad *gst_tee_request_new_pad (GstElement * element,
170     GstPadTemplate * temp, const gchar * unused, const GstCaps * caps);
171 static void gst_tee_release_pad (GstElement * element, GstPad * pad);
172
173 static void gst_tee_finalize (GObject * object);
174 static void gst_tee_set_property (GObject * object, guint prop_id,
175     const GValue * value, GParamSpec * pspec);
176 static void gst_tee_get_property (GObject * object, guint prop_id,
177     GValue * value, GParamSpec * pspec);
178 static void gst_tee_dispose (GObject * object);
179
180 static GstFlowReturn gst_tee_chain (GstPad * pad, GstObject * parent,
181     GstBuffer * buffer);
182 static GstFlowReturn gst_tee_chain_list (GstPad * pad, GstObject * parent,
183     GstBufferList * list);
184 static gboolean gst_tee_sink_event (GstPad * pad, GstObject * parent,
185     GstEvent * event);
186 static gboolean gst_tee_sink_query (GstPad * pad, GstObject * parent,
187     GstQuery * query);
188 static gboolean gst_tee_sink_activate_mode (GstPad * pad, GstObject * parent,
189     GstPadMode mode, gboolean active);
190 static gboolean gst_tee_src_query (GstPad * pad, GstObject * parent,
191     GstQuery * query);
192 static gboolean gst_tee_src_activate_mode (GstPad * pad, GstObject * parent,
193     GstPadMode mode, gboolean active);
194 static GstFlowReturn gst_tee_src_get_range (GstPad * pad, GstObject * parent,
195     guint64 offset, guint length, GstBuffer ** buf);
196
197 static void
198 gst_tee_dispose (GObject * object)
199 {
200   GList *item;
201
202 restart:
203   for (item = GST_ELEMENT_PADS (object); item; item = g_list_next (item)) {
204     GstPad *pad = GST_PAD (item->data);
205     if (GST_PAD_IS_SRC (pad)) {
206       gst_element_release_request_pad (GST_ELEMENT (object), pad);
207       goto restart;
208     }
209   }
210
211   G_OBJECT_CLASS (parent_class)->dispose (object);
212 }
213
214 static void
215 gst_tee_finalize (GObject * object)
216 {
217   GstTee *tee;
218
219   tee = GST_TEE (object);
220
221   g_free (tee->last_message);
222
223   g_mutex_clear (&tee->dyn_lock);
224
225   G_OBJECT_CLASS (parent_class)->finalize (object);
226 }
227
228 static void
229 gst_tee_class_init (GstTeeClass * klass)
230 {
231   GObjectClass *gobject_class;
232   GstElementClass *gstelement_class;
233
234   gobject_class = G_OBJECT_CLASS (klass);
235   gstelement_class = GST_ELEMENT_CLASS (klass);
236
237   gobject_class->finalize = gst_tee_finalize;
238   gobject_class->set_property = gst_tee_set_property;
239   gobject_class->get_property = gst_tee_get_property;
240   gobject_class->dispose = gst_tee_dispose;
241
242   g_object_class_install_property (gobject_class, PROP_NUM_SRC_PADS,
243       g_param_spec_int ("num-src-pads", "Num Src Pads",
244           "The number of source pads", 0, G_MAXINT, DEFAULT_PROP_NUM_SRC_PADS,
245           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
246   g_object_class_install_property (gobject_class, PROP_HAS_CHAIN,
247       g_param_spec_boolean ("has-chain", "Has Chain",
248           "If the element can operate in push mode", DEFAULT_PROP_HAS_CHAIN,
249           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
250   g_object_class_install_property (gobject_class, PROP_SILENT,
251       g_param_spec_boolean ("silent", "Silent",
252           "Don't produce last_message events", DEFAULT_PROP_SILENT,
253           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
254   pspec_last_message = g_param_spec_string ("last-message", "Last Message",
255       "The message describing current status", DEFAULT_PROP_LAST_MESSAGE,
256       G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
257   g_object_class_install_property (gobject_class, PROP_LAST_MESSAGE,
258       pspec_last_message);
259   g_object_class_install_property (gobject_class, PROP_PULL_MODE,
260       g_param_spec_enum ("pull-mode", "Pull mode",
261           "Behavior of tee in pull mode", GST_TYPE_TEE_PULL_MODE,
262           DEFAULT_PULL_MODE,
263           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
264   pspec_alloc_pad = g_param_spec_object ("alloc-pad", "Allocation Src Pad",
265       "The pad ALLOCATION queries will be proxied to (unused)", GST_TYPE_PAD,
266       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
267   g_object_class_install_property (gobject_class, PROP_ALLOC_PAD,
268       pspec_alloc_pad);
269
270   gst_element_class_set_static_metadata (gstelement_class,
271       "Tee pipe fitting",
272       "Generic",
273       "1-to-N pipe fitting",
274       "Erik Walthinsen <omega@cse.ogi.edu>, " "Wim Taymans <wim@fluendo.com>");
275   gst_element_class_add_pad_template (gstelement_class,
276       gst_static_pad_template_get (&sinktemplate));
277   gst_element_class_add_pad_template (gstelement_class,
278       gst_static_pad_template_get (&tee_src_template));
279
280   gstelement_class->request_new_pad =
281       GST_DEBUG_FUNCPTR (gst_tee_request_new_pad);
282   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_tee_release_pad);
283 }
284
285 static void
286 gst_tee_init (GstTee * tee)
287 {
288   g_mutex_init (&tee->dyn_lock);
289
290   tee->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
291   tee->sink_mode = GST_PAD_MODE_NONE;
292
293   gst_pad_set_event_function (tee->sinkpad,
294       GST_DEBUG_FUNCPTR (gst_tee_sink_event));
295   gst_pad_set_query_function (tee->sinkpad,
296       GST_DEBUG_FUNCPTR (gst_tee_sink_query));
297   gst_pad_set_activatemode_function (tee->sinkpad,
298       GST_DEBUG_FUNCPTR (gst_tee_sink_activate_mode));
299   gst_pad_set_chain_function (tee->sinkpad, GST_DEBUG_FUNCPTR (gst_tee_chain));
300   gst_pad_set_chain_list_function (tee->sinkpad,
301       GST_DEBUG_FUNCPTR (gst_tee_chain_list));
302   GST_OBJECT_FLAG_SET (tee->sinkpad, GST_PAD_FLAG_PROXY_CAPS);
303   gst_element_add_pad (GST_ELEMENT (tee), tee->sinkpad);
304
305   tee->last_message = NULL;
306 }
307
308 static void
309 gst_tee_notify_alloc_pad (GstTee * tee)
310 {
311   g_object_notify_by_pspec ((GObject *) tee, pspec_alloc_pad);
312 }
313
314 static gboolean
315 forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
316 {
317   GstPad *srcpad = GST_PAD_CAST (user_data);
318
319   gst_pad_push_event (srcpad, gst_event_ref (*event));
320
321   return TRUE;
322 }
323
324 static GstPad *
325 gst_tee_request_new_pad (GstElement * element, GstPadTemplate * templ,
326     const gchar * unused, const GstCaps * caps)
327 {
328   gchar *name;
329   GstPad *srcpad;
330   GstTee *tee;
331   GstPadMode mode;
332   gboolean res;
333
334   tee = GST_TEE (element);
335
336   GST_DEBUG_OBJECT (tee, "requesting pad");
337
338   GST_OBJECT_LOCK (tee);
339   name = g_strdup_printf ("src_%u", tee->pad_counter++);
340
341   srcpad = GST_PAD_CAST (g_object_new (GST_TYPE_TEE_PAD,
342           "name", name, "direction", templ->direction, "template", templ,
343           NULL));
344   g_free (name);
345
346   mode = tee->sink_mode;
347
348   GST_OBJECT_UNLOCK (tee);
349
350   switch (mode) {
351     case GST_PAD_MODE_PULL:
352       /* we already have a src pad in pull mode, and our pull mode can only be
353          SINGLE, so fall through to activate this new pad in push mode */
354     case GST_PAD_MODE_PUSH:
355       res = gst_pad_activate_mode (srcpad, GST_PAD_MODE_PUSH, TRUE);
356       break;
357     default:
358       res = TRUE;
359       break;
360   }
361
362   if (!res)
363     goto activate_failed;
364
365   gst_pad_set_activatemode_function (srcpad,
366       GST_DEBUG_FUNCPTR (gst_tee_src_activate_mode));
367   gst_pad_set_query_function (srcpad, GST_DEBUG_FUNCPTR (gst_tee_src_query));
368   gst_pad_set_getrange_function (srcpad,
369       GST_DEBUG_FUNCPTR (gst_tee_src_get_range));
370   /* Forward sticky events to the new srcpad */
371   gst_pad_sticky_events_foreach (tee->sinkpad, forward_sticky_events, srcpad);
372   GST_OBJECT_FLAG_SET (srcpad, GST_PAD_FLAG_PROXY_CAPS);
373   gst_element_add_pad (GST_ELEMENT_CAST (tee), srcpad);
374
375   return srcpad;
376
377   /* ERRORS */
378 activate_failed:
379   {
380     gboolean changed = FALSE;
381
382     GST_OBJECT_LOCK (tee);
383     GST_DEBUG_OBJECT (tee, "warning failed to activate request pad");
384     if (tee->allocpad == srcpad) {
385       tee->allocpad = NULL;
386       changed = TRUE;
387     }
388     GST_OBJECT_UNLOCK (tee);
389     gst_object_unref (srcpad);
390     if (changed) {
391       gst_tee_notify_alloc_pad (tee);
392     }
393     return NULL;
394   }
395 }
396
397 static void
398 gst_tee_release_pad (GstElement * element, GstPad * pad)
399 {
400   GstTee *tee;
401   gboolean changed = FALSE;
402
403   tee = GST_TEE (element);
404
405   GST_DEBUG_OBJECT (tee, "releasing pad");
406
407   /* wait for pending pad_alloc to finish */
408   GST_TEE_DYN_LOCK (tee);
409   GST_OBJECT_LOCK (tee);
410   /* mark the pad as removed so that future pad_alloc fails with NOT_LINKED. */
411   GST_TEE_PAD_CAST (pad)->removed = TRUE;
412   if (tee->allocpad == pad) {
413     tee->allocpad = NULL;
414     changed = TRUE;
415   }
416   GST_OBJECT_UNLOCK (tee);
417
418   gst_object_ref (pad);
419   gst_element_remove_pad (GST_ELEMENT_CAST (tee), pad);
420
421   gst_pad_set_active (pad, FALSE);
422   GST_TEE_DYN_UNLOCK (tee);
423
424   gst_object_unref (pad);
425
426   if (changed) {
427     gst_tee_notify_alloc_pad (tee);
428   }
429 }
430
431 static void
432 gst_tee_set_property (GObject * object, guint prop_id, const GValue * value,
433     GParamSpec * pspec)
434 {
435   GstTee *tee = GST_TEE (object);
436
437   GST_OBJECT_LOCK (tee);
438   switch (prop_id) {
439     case PROP_HAS_CHAIN:
440       tee->has_chain = g_value_get_boolean (value);
441       break;
442     case PROP_SILENT:
443       tee->silent = g_value_get_boolean (value);
444       break;
445     case PROP_PULL_MODE:
446       tee->pull_mode = (GstTeePullMode) g_value_get_enum (value);
447       break;
448     case PROP_ALLOC_PAD:
449     {
450       GstPad *pad = g_value_get_object (value);
451       GST_OBJECT_LOCK (pad);
452       if (GST_OBJECT_PARENT (pad) == GST_OBJECT_CAST (object))
453         tee->allocpad = pad;
454       else
455         GST_WARNING_OBJECT (object, "Tried to set alloc pad %s which"
456             " is not my pad", GST_OBJECT_NAME (pad));
457       GST_OBJECT_UNLOCK (pad);
458       break;
459     }
460     default:
461       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
462       break;
463   }
464   GST_OBJECT_UNLOCK (tee);
465 }
466
467 static void
468 gst_tee_get_property (GObject * object, guint prop_id, GValue * value,
469     GParamSpec * pspec)
470 {
471   GstTee *tee = GST_TEE (object);
472
473   GST_OBJECT_LOCK (tee);
474   switch (prop_id) {
475     case PROP_NUM_SRC_PADS:
476       g_value_set_int (value, GST_ELEMENT (tee)->numsrcpads);
477       break;
478     case PROP_HAS_CHAIN:
479       g_value_set_boolean (value, tee->has_chain);
480       break;
481     case PROP_SILENT:
482       g_value_set_boolean (value, tee->silent);
483       break;
484     case PROP_LAST_MESSAGE:
485       g_value_set_string (value, tee->last_message);
486       break;
487     case PROP_PULL_MODE:
488       g_value_set_enum (value, tee->pull_mode);
489       break;
490     case PROP_ALLOC_PAD:
491       g_value_set_object (value, tee->allocpad);
492       break;
493     default:
494       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
495       break;
496   }
497   GST_OBJECT_UNLOCK (tee);
498 }
499
500 static gboolean
501 gst_tee_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
502 {
503   gboolean res;
504
505   switch (GST_EVENT_TYPE (event)) {
506     default:
507       res = gst_pad_event_default (pad, parent, event);
508       break;
509   }
510
511   return res;
512 }
513
514 static gboolean
515 gst_tee_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
516 {
517   gboolean res;
518
519   switch (GST_QUERY_TYPE (query)) {
520     default:
521       res = gst_pad_query_default (pad, parent, query);
522       break;
523   }
524   return res;
525 }
526
527 static void
528 gst_tee_do_message (GstTee * tee, GstPad * pad, gpointer data, gboolean is_list)
529 {
530   GST_OBJECT_LOCK (tee);
531   g_free (tee->last_message);
532   if (is_list) {
533     tee->last_message =
534         g_strdup_printf ("chain-list   ******* (%s:%s)t %p",
535         GST_DEBUG_PAD_NAME (pad), data);
536   } else {
537     tee->last_message =
538         g_strdup_printf ("chain        ******* (%s:%s)t (%" G_GSIZE_FORMAT
539         " bytes, %" G_GUINT64_FORMAT ") %p", GST_DEBUG_PAD_NAME (pad),
540         gst_buffer_get_size (data), GST_BUFFER_TIMESTAMP (data), data);
541   }
542   GST_OBJECT_UNLOCK (tee);
543
544   g_object_notify_by_pspec ((GObject *) tee, pspec_last_message);
545 }
546
547 static GstFlowReturn
548 gst_tee_do_push (GstTee * tee, GstPad * pad, gpointer data, gboolean is_list)
549 {
550   GstFlowReturn res;
551
552   /* Push */
553   if (pad == tee->pull_pad) {
554     /* don't push on the pad we're pulling from */
555     res = GST_FLOW_OK;
556   } else if (is_list) {
557     res =
558         gst_pad_push_list (pad,
559         gst_buffer_list_ref (GST_BUFFER_LIST_CAST (data)));
560   } else {
561     res = gst_pad_push (pad, gst_buffer_ref (GST_BUFFER_CAST (data)));
562   }
563   return res;
564 }
565
566 static void
567 clear_pads (GstPad * pad, GstTee * tee)
568 {
569   GST_TEE_PAD_CAST (pad)->pushed = FALSE;
570   GST_TEE_PAD_CAST (pad)->result = GST_FLOW_NOT_LINKED;
571 }
572
573 static GstFlowReturn
574 gst_tee_handle_data (GstTee * tee, gpointer data, gboolean is_list)
575 {
576   GList *pads;
577   guint32 cookie;
578   GstFlowReturn ret, cret;
579
580   if (G_UNLIKELY (!tee->silent))
581     gst_tee_do_message (tee, tee->sinkpad, data, is_list);
582
583   GST_OBJECT_LOCK (tee);
584   pads = GST_ELEMENT_CAST (tee)->srcpads;
585
586   /* special case for zero pads */
587   if (G_UNLIKELY (!pads))
588     goto no_pads;
589
590   /* special case for just one pad that avoids reffing the buffer */
591   if (!pads->next) {
592     GstPad *pad = GST_PAD_CAST (pads->data);
593
594     /* Keep another ref around, a pad probe
595      * might release and destroy the pad */
596     gst_object_ref (pad);
597     GST_OBJECT_UNLOCK (tee);
598
599     if (pad == tee->pull_pad) {
600       ret = GST_FLOW_OK;
601     } else if (is_list) {
602       ret = gst_pad_push_list (pad, GST_BUFFER_LIST_CAST (data));
603     } else {
604       ret = gst_pad_push (pad, GST_BUFFER_CAST (data));
605     }
606
607     gst_object_unref (pad);
608
609     return ret;
610   }
611
612   /* mark all pads as 'not pushed on yet' */
613   g_list_foreach (pads, (GFunc) clear_pads, tee);
614
615 restart:
616   cret = GST_FLOW_NOT_LINKED;
617   pads = GST_ELEMENT_CAST (tee)->srcpads;
618   cookie = GST_ELEMENT_CAST (tee)->pads_cookie;
619
620   while (pads) {
621     GstPad *pad;
622
623     pad = GST_PAD_CAST (pads->data);
624
625     if (G_LIKELY (!GST_TEE_PAD_CAST (pad)->pushed)) {
626       /* not yet pushed, release lock and start pushing */
627       gst_object_ref (pad);
628       GST_OBJECT_UNLOCK (tee);
629
630       GST_LOG_OBJECT (tee, "Starting to push %s %p",
631           is_list ? "list" : "buffer", data);
632
633       ret = gst_tee_do_push (tee, pad, data, is_list);
634
635       GST_LOG_OBJECT (tee, "Pushing item %p yielded result %s", data,
636           gst_flow_get_name (ret));
637
638       GST_OBJECT_LOCK (tee);
639       /* keep track of which pad we pushed and the result value */
640       GST_TEE_PAD_CAST (pad)->pushed = TRUE;
641       GST_TEE_PAD_CAST (pad)->result = ret;
642       gst_object_unref (pad);
643     } else {
644       /* already pushed, use previous return value */
645       ret = GST_TEE_PAD_CAST (pad)->result;
646       GST_LOG_OBJECT (tee, "pad already pushed with %s",
647           gst_flow_get_name (ret));
648     }
649
650     /* before we go combining the return value, check if the pad list is still
651      * the same. It could be possible that the pad we just pushed was removed
652      * and the return value it not valid anymore */
653     if (G_UNLIKELY (GST_ELEMENT_CAST (tee)->pads_cookie != cookie)) {
654       GST_LOG_OBJECT (tee, "pad list changed");
655       /* the list of pads changed, restart iteration. Pads that we already
656        * pushed on and are still in the new list, will not be pushed on
657        * again. */
658       goto restart;
659     }
660
661     /* stop pushing more buffers when we have a fatal error */
662     if (G_UNLIKELY (ret != GST_FLOW_OK && ret != GST_FLOW_NOT_LINKED))
663       goto error;
664
665     /* keep all other return values, overwriting the previous one. */
666     if (G_LIKELY (ret != GST_FLOW_NOT_LINKED)) {
667       GST_LOG_OBJECT (tee, "Replacing ret val %d with %d", cret, ret);
668       cret = ret;
669     }
670     pads = g_list_next (pads);
671   }
672   GST_OBJECT_UNLOCK (tee);
673
674   gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
675
676   /* no need to unset gvalue */
677   return cret;
678
679   /* ERRORS */
680 no_pads:
681   {
682     GST_DEBUG_OBJECT (tee, "there are no pads, return not-linked");
683     ret = GST_FLOW_NOT_LINKED;
684     goto error;
685   }
686 error:
687   {
688     GST_DEBUG_OBJECT (tee, "received error %s", gst_flow_get_name (ret));
689     GST_OBJECT_UNLOCK (tee);
690     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
691     return ret;
692   }
693 }
694
695 static GstFlowReturn
696 gst_tee_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
697 {
698   GstFlowReturn res;
699   GstTee *tee;
700
701   tee = GST_TEE_CAST (parent);
702
703   GST_DEBUG_OBJECT (tee, "received buffer %p", buffer);
704
705   res = gst_tee_handle_data (tee, buffer, FALSE);
706
707   GST_DEBUG_OBJECT (tee, "handled buffer %s", gst_flow_get_name (res));
708
709   return res;
710 }
711
712 static GstFlowReturn
713 gst_tee_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list)
714 {
715   GstFlowReturn res;
716   GstTee *tee;
717
718   tee = GST_TEE_CAST (parent);
719
720   GST_DEBUG_OBJECT (tee, "received list %p", list);
721
722   res = gst_tee_handle_data (tee, list, TRUE);
723
724   GST_DEBUG_OBJECT (tee, "handled list %s", gst_flow_get_name (res));
725
726   return res;
727 }
728
729 static gboolean
730 gst_tee_sink_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
731     gboolean active)
732 {
733   gboolean res;
734   GstTee *tee;
735
736   tee = GST_TEE (parent);
737
738   switch (mode) {
739     case GST_PAD_MODE_PUSH:
740     {
741       GST_OBJECT_LOCK (tee);
742       tee->sink_mode = active ? mode : GST_PAD_MODE_NONE;
743
744       if (active && !tee->has_chain)
745         goto no_chain;
746       GST_OBJECT_UNLOCK (tee);
747       res = TRUE;
748       break;
749     }
750     default:
751       res = FALSE;
752       break;
753   }
754   return res;
755
756   /* ERRORS */
757 no_chain:
758   {
759     GST_OBJECT_UNLOCK (tee);
760     GST_INFO_OBJECT (tee,
761         "Tee cannot operate in push mode with has-chain==FALSE");
762     return FALSE;
763   }
764 }
765
766 static gboolean
767 gst_tee_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
768     gboolean active)
769 {
770   GstTee *tee;
771   gboolean res;
772   GstPad *sinkpad;
773
774   tee = GST_TEE (parent);
775
776   switch (mode) {
777     case GST_PAD_MODE_PULL:
778     {
779       GST_OBJECT_LOCK (tee);
780
781       if (tee->pull_mode == GST_TEE_PULL_MODE_NEVER)
782         goto cannot_pull;
783
784       if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE && active && tee->pull_pad)
785         goto cannot_pull_multiple_srcs;
786
787       sinkpad = gst_object_ref (tee->sinkpad);
788
789       GST_OBJECT_UNLOCK (tee);
790
791       res = gst_pad_activate_mode (sinkpad, GST_PAD_MODE_PULL, active);
792       gst_object_unref (sinkpad);
793
794       if (!res)
795         goto sink_activate_failed;
796
797       GST_OBJECT_LOCK (tee);
798       if (active) {
799         if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE)
800           tee->pull_pad = pad;
801       } else {
802         if (pad == tee->pull_pad)
803           tee->pull_pad = NULL;
804       }
805       tee->sink_mode = (active ? GST_PAD_MODE_PULL : GST_PAD_MODE_NONE);
806       GST_OBJECT_UNLOCK (tee);
807       break;
808     }
809     default:
810       res = TRUE;
811       break;
812   }
813
814   return res;
815
816   /* ERRORS */
817 cannot_pull:
818   {
819     GST_OBJECT_UNLOCK (tee);
820     GST_INFO_OBJECT (tee, "Cannot activate in pull mode, pull-mode "
821         "set to NEVER");
822     return FALSE;
823   }
824 cannot_pull_multiple_srcs:
825   {
826     GST_OBJECT_UNLOCK (tee);
827     GST_INFO_OBJECT (tee, "Cannot activate multiple src pads in pull mode, "
828         "pull-mode set to SINGLE");
829     return FALSE;
830   }
831 sink_activate_failed:
832   {
833     GST_INFO_OBJECT (tee, "Failed to %sactivate sink pad in pull mode",
834         active ? "" : "de");
835     return FALSE;
836   }
837 }
838
839 static gboolean
840 gst_tee_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
841 {
842   GstTee *tee;
843   gboolean res;
844   GstPad *sinkpad;
845
846   tee = GST_TEE (parent);
847
848   switch (GST_QUERY_TYPE (query)) {
849     case GST_QUERY_SCHEDULING:
850     {
851       gboolean pull_mode;
852
853       GST_OBJECT_LOCK (tee);
854       pull_mode = TRUE;
855       if (tee->pull_mode == GST_TEE_PULL_MODE_NEVER) {
856         GST_INFO_OBJECT (tee, "Cannot activate in pull mode, pull-mode "
857             "set to NEVER");
858         pull_mode = FALSE;
859       } else if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE && tee->pull_pad) {
860         GST_INFO_OBJECT (tee, "Cannot activate multiple src pads in pull mode, "
861             "pull-mode set to SINGLE");
862         pull_mode = FALSE;
863       }
864
865       sinkpad = gst_object_ref (tee->sinkpad);
866       GST_OBJECT_UNLOCK (tee);
867
868       if (pull_mode) {
869         /* ask peer if we can operate in pull mode */
870         res = gst_pad_peer_query (sinkpad, query);
871       } else {
872         res = TRUE;
873       }
874       gst_object_unref (sinkpad);
875       break;
876     }
877     default:
878       res = gst_pad_query_default (pad, parent, query);
879       break;
880   }
881
882   return res;
883 }
884
885 static void
886 gst_tee_push_eos (const GValue * vpad, GstTee * tee)
887 {
888   GstPad *pad = g_value_get_object (vpad);
889
890   if (pad != tee->pull_pad)
891     gst_pad_push_event (pad, gst_event_new_eos ());
892 }
893
894 static void
895 gst_tee_pull_eos (GstTee * tee)
896 {
897   GstIterator *iter;
898
899   iter = gst_element_iterate_src_pads (GST_ELEMENT (tee));
900   gst_iterator_foreach (iter, (GstIteratorForeachFunction) gst_tee_push_eos,
901       tee);
902   gst_iterator_free (iter);
903 }
904
905 static GstFlowReturn
906 gst_tee_src_get_range (GstPad * pad, GstObject * parent, guint64 offset,
907     guint length, GstBuffer ** buf)
908 {
909   GstTee *tee;
910   GstFlowReturn ret;
911
912   tee = GST_TEE (parent);
913
914   ret = gst_pad_pull_range (tee->sinkpad, offset, length, buf);
915
916   if (ret == GST_FLOW_OK)
917     ret = gst_tee_handle_data (tee, gst_buffer_ref (*buf), FALSE);
918   else if (ret == GST_FLOW_EOS)
919     gst_tee_pull_eos (tee);
920
921   return ret;
922 }