2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2000,2001,2002,2003,2004,2005 Wim Taymans <wim@fluendo.com>
4 * 2007 Wim Taymans <wim.taymans@gmail.com>
6 * gsttee.c: Tee element, one in N out
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Library General Public
10 * License as published by the Free Software Foundation; either
11 * version 2 of the License, or (at your option) any later version.
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Library General Public License for more details.
18 * You should have received a copy of the GNU Library General Public
19 * License along with this library; if not, write to the
20 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21 * Boston, MA 02110-1301, USA.
27 * @see_also: #GstIdentity
29 * Split data to multiple pads. Branching the data flow is useful when e.g.
30 * capturing a video where the video is shown on the screen and also encoded and
31 * written to a file. Another example is playing music and hooking up a
32 * visualisation module.
34 * One needs to use separate queue elements (or a multiqueue) in each branch to
35 * provide separate threads for each branch. Otherwise a blocked dataflow in one
36 * branch would stall the other branches.
38 * ## Example launch line
40 * gst-launch-1.0 filesrc location=song.ogg ! decodebin ! tee name=t ! queue ! audioconvert ! audioresample ! autoaudiosink t. ! queue ! audioconvert ! goom ! videoconvert ! autovideosink
43 * Play song.ogg audio file which must be in the current working directory
44 * and render visualisations using the goom element (this can be easier done
45 * using the playbin element, this is just an example pipeline).
53 #include "gst/glib-compat-private.h"
58 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
63 GST_DEBUG_CATEGORY_STATIC (gst_tee_debug);
64 #define GST_CAT_DEFAULT gst_tee_debug
66 #define GST_TYPE_TEE_PULL_MODE (gst_tee_pull_mode_get_type())
68 gst_tee_pull_mode_get_type (void)
70 static GType type = 0;
71 static const GEnumValue data[] = {
72 {GST_TEE_PULL_MODE_NEVER, "Never activate in pull mode", "never"},
73 {GST_TEE_PULL_MODE_SINGLE, "Only one src pad can be active in pull mode",
79 type = g_enum_register_static ("GstTeePullMode", data);
84 #define DEFAULT_PROP_NUM_SRC_PADS 0
85 #define DEFAULT_PROP_HAS_CHAIN TRUE
86 #define DEFAULT_PROP_SILENT TRUE
87 #define DEFAULT_PROP_LAST_MESSAGE NULL
88 #define DEFAULT_PULL_MODE GST_TEE_PULL_MODE_NEVER
89 #define DEFAULT_PROP_ALLOW_NOT_LINKED FALSE
100 PROP_ALLOW_NOT_LINKED,
103 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u",
106 GST_STATIC_CAPS_ANY);
109 GST_DEBUG_CATEGORY_INIT (gst_tee_debug, "tee", 0, "tee element");
110 #define gst_tee_parent_class parent_class
111 G_DEFINE_TYPE_WITH_CODE (GstTee, gst_tee, GST_TYPE_ELEMENT, _do_init);
113 static GParamSpec *pspec_last_message = NULL;
114 static GParamSpec *pspec_alloc_pad = NULL;
116 GType gst_tee_pad_get_type (void);
118 #define GST_TYPE_TEE_PAD \
119 (gst_tee_pad_get_type())
120 #define GST_TEE_PAD(obj) \
121 (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_TEE_PAD, GstTeePad))
122 #define GST_TEE_PAD_CLASS(klass) \
123 (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_TEE_PAD, GstTeePadClass))
124 #define GST_IS_TEE_PAD(obj) \
125 (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_TEE_PAD))
126 #define GST_IS_TEE_PAD_CLASS(klass) \
127 (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_TEE_PAD))
128 #define GST_TEE_PAD_CAST(obj) \
131 typedef struct _GstTeePad GstTeePad;
132 typedef struct _GstTeePadClass GstTeePadClass;
140 GstFlowReturn result;
144 struct _GstTeePadClass
149 G_DEFINE_TYPE (GstTeePad, gst_tee_pad, GST_TYPE_PAD);
152 gst_tee_pad_class_init (GstTeePadClass * klass)
157 gst_tee_pad_reset (GstTeePad * pad)
160 pad->result = GST_FLOW_NOT_LINKED;
161 pad->removed = FALSE;
165 gst_tee_pad_init (GstTeePad * pad)
167 gst_tee_pad_reset (pad);
170 static GstPad *gst_tee_request_new_pad (GstElement * element,
171 GstPadTemplate * temp, const gchar * unused, const GstCaps * caps);
172 static void gst_tee_release_pad (GstElement * element, GstPad * pad);
174 static void gst_tee_finalize (GObject * object);
175 static void gst_tee_set_property (GObject * object, guint prop_id,
176 const GValue * value, GParamSpec * pspec);
177 static void gst_tee_get_property (GObject * object, guint prop_id,
178 GValue * value, GParamSpec * pspec);
179 static void gst_tee_dispose (GObject * object);
181 static GstFlowReturn gst_tee_chain (GstPad * pad, GstObject * parent,
183 static GstFlowReturn gst_tee_chain_list (GstPad * pad, GstObject * parent,
184 GstBufferList * list);
185 static gboolean gst_tee_sink_event (GstPad * pad, GstObject * parent,
187 static gboolean gst_tee_sink_query (GstPad * pad, GstObject * parent,
189 static gboolean gst_tee_sink_activate_mode (GstPad * pad, GstObject * parent,
190 GstPadMode mode, gboolean active);
191 static gboolean gst_tee_src_query (GstPad * pad, GstObject * parent,
193 static gboolean gst_tee_src_activate_mode (GstPad * pad, GstObject * parent,
194 GstPadMode mode, gboolean active);
195 static GstFlowReturn gst_tee_src_get_range (GstPad * pad, GstObject * parent,
196 guint64 offset, guint length, GstBuffer ** buf);
199 gst_tee_dispose (GObject * object)
204 for (item = GST_ELEMENT_PADS (object); item; item = g_list_next (item)) {
205 GstPad *pad = GST_PAD (item->data);
206 if (GST_PAD_IS_SRC (pad)) {
207 gst_element_release_request_pad (GST_ELEMENT (object), pad);
212 G_OBJECT_CLASS (parent_class)->dispose (object);
216 gst_tee_finalize (GObject * object)
220 tee = GST_TEE (object);
222 g_hash_table_unref (tee->pad_indexes);
224 g_free (tee->last_message);
226 G_OBJECT_CLASS (parent_class)->finalize (object);
230 gst_tee_class_init (GstTeeClass * klass)
232 GObjectClass *gobject_class;
233 GstElementClass *gstelement_class;
235 gobject_class = G_OBJECT_CLASS (klass);
236 gstelement_class = GST_ELEMENT_CLASS (klass);
238 gobject_class->finalize = gst_tee_finalize;
239 gobject_class->set_property = gst_tee_set_property;
240 gobject_class->get_property = gst_tee_get_property;
241 gobject_class->dispose = gst_tee_dispose;
243 g_object_class_install_property (gobject_class, PROP_NUM_SRC_PADS,
244 g_param_spec_int ("num-src-pads", "Num Src Pads",
245 "The number of source pads", 0, G_MAXINT, DEFAULT_PROP_NUM_SRC_PADS,
246 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
247 g_object_class_install_property (gobject_class, PROP_HAS_CHAIN,
248 g_param_spec_boolean ("has-chain", "Has Chain",
249 "If the element can operate in push mode", DEFAULT_PROP_HAS_CHAIN,
250 G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
251 g_object_class_install_property (gobject_class, PROP_SILENT,
252 g_param_spec_boolean ("silent", "Silent",
253 "Don't produce last_message events", DEFAULT_PROP_SILENT,
254 G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
255 pspec_last_message = g_param_spec_string ("last-message", "Last Message",
256 "The message describing current status", DEFAULT_PROP_LAST_MESSAGE,
257 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
258 g_object_class_install_property (gobject_class, PROP_LAST_MESSAGE,
260 g_object_class_install_property (gobject_class, PROP_PULL_MODE,
261 g_param_spec_enum ("pull-mode", "Pull mode",
262 "Behavior of tee in pull mode", GST_TYPE_TEE_PULL_MODE,
264 G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
265 pspec_alloc_pad = g_param_spec_object ("alloc-pad", "Allocation Src Pad",
266 "The pad ALLOCATION queries will be proxied to (DEPRECATED, has no effect)",
268 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED);
269 g_object_class_install_property (gobject_class, PROP_ALLOC_PAD,
273 * GstTee:allow-not-linked
275 * This property makes sink pad return GST_FLOW_OK even if there are no
276 * source pads or any of them is linked.
278 * This is useful to avoid errors when you have a dynamic pipeline and during
279 * a reconnection you can have all the pads unlinked or removed.
283 g_object_class_install_property (gobject_class, PROP_ALLOW_NOT_LINKED,
284 g_param_spec_boolean ("allow-not-linked", "Allow not linked",
285 "Return GST_FLOW_OK even if there are no source pads or they are "
286 "all unlinked", DEFAULT_PROP_ALLOW_NOT_LINKED,
287 G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
289 gst_element_class_set_static_metadata (gstelement_class,
292 "1-to-N pipe fitting",
293 "Erik Walthinsen <omega@cse.ogi.edu>, " "Wim Taymans <wim@fluendo.com>");
294 gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
295 gst_element_class_add_static_pad_template (gstelement_class, &src_template);
297 gstelement_class->request_new_pad =
298 GST_DEBUG_FUNCPTR (gst_tee_request_new_pad);
299 gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_tee_release_pad);
303 gst_tee_init (GstTee * tee)
305 tee->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
306 tee->sink_mode = GST_PAD_MODE_NONE;
308 gst_pad_set_event_function (tee->sinkpad,
309 GST_DEBUG_FUNCPTR (gst_tee_sink_event));
310 gst_pad_set_query_function (tee->sinkpad,
311 GST_DEBUG_FUNCPTR (gst_tee_sink_query));
312 gst_pad_set_activatemode_function (tee->sinkpad,
313 GST_DEBUG_FUNCPTR (gst_tee_sink_activate_mode));
314 gst_pad_set_chain_function (tee->sinkpad, GST_DEBUG_FUNCPTR (gst_tee_chain));
315 gst_pad_set_chain_list_function (tee->sinkpad,
316 GST_DEBUG_FUNCPTR (gst_tee_chain_list));
317 GST_OBJECT_FLAG_SET (tee->sinkpad, GST_PAD_FLAG_PROXY_CAPS);
318 gst_element_add_pad (GST_ELEMENT (tee), tee->sinkpad);
320 tee->pad_indexes = g_hash_table_new (NULL, NULL);
322 tee->last_message = NULL;
326 gst_tee_notify_alloc_pad (GstTee * tee)
328 g_object_notify_by_pspec ((GObject *) tee, pspec_alloc_pad);
332 forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
334 GstPad *srcpad = GST_PAD_CAST (user_data);
337 ret = gst_pad_store_sticky_event (srcpad, *event);
338 if (ret != GST_FLOW_OK) {
339 GST_DEBUG_OBJECT (srcpad, "storing sticky event %p (%s) failed: %s", *event,
340 GST_EVENT_TYPE_NAME (*event), gst_flow_get_name (ret));
347 gst_tee_request_new_pad (GstElement * element, GstPadTemplate * templ,
348 const gchar * name_templ, const GstCaps * caps)
357 tee = GST_TEE (element);
359 GST_DEBUG_OBJECT (tee, "requesting pad");
361 GST_OBJECT_LOCK (tee);
363 if (name_templ && sscanf (name_templ, "src_%u", &index) == 1) {
364 GST_LOG_OBJECT (element, "name: %s (index %d)", name_templ, index);
365 if (g_hash_table_contains (tee->pad_indexes, GUINT_TO_POINTER (index))) {
366 GST_ERROR_OBJECT (element, "pad name %s is not unique", name_templ);
367 GST_OBJECT_UNLOCK (tee);
370 if (index >= tee->next_pad_index)
371 tee->next_pad_index = index + 1;
373 index = tee->next_pad_index;
375 while (g_hash_table_contains (tee->pad_indexes, GUINT_TO_POINTER (index)))
378 tee->next_pad_index = index + 1;
381 g_hash_table_insert (tee->pad_indexes, GUINT_TO_POINTER (index), NULL);
383 name = g_strdup_printf ("src_%u", index);
385 srcpad = GST_PAD_CAST (g_object_new (GST_TYPE_TEE_PAD,
386 "name", name, "direction", templ->direction, "template", templ,
388 GST_TEE_PAD_CAST (srcpad)->index = index;
391 mode = tee->sink_mode;
393 GST_OBJECT_UNLOCK (tee);
396 case GST_PAD_MODE_PULL:
397 /* we already have a src pad in pull mode, and our pull mode can only be
398 SINGLE, so fall through to activate this new pad in push mode */
399 case GST_PAD_MODE_PUSH:
400 res = gst_pad_activate_mode (srcpad, GST_PAD_MODE_PUSH, TRUE);
408 goto activate_failed;
410 gst_pad_set_activatemode_function (srcpad,
411 GST_DEBUG_FUNCPTR (gst_tee_src_activate_mode));
412 gst_pad_set_query_function (srcpad, GST_DEBUG_FUNCPTR (gst_tee_src_query));
413 gst_pad_set_getrange_function (srcpad,
414 GST_DEBUG_FUNCPTR (gst_tee_src_get_range));
415 GST_OBJECT_FLAG_SET (srcpad, GST_PAD_FLAG_PROXY_CAPS);
416 /* Forward sticky events to the new srcpad */
417 gst_pad_sticky_events_foreach (tee->sinkpad, forward_sticky_events, srcpad);
418 gst_element_add_pad (GST_ELEMENT_CAST (tee), srcpad);
425 gboolean changed = FALSE;
427 GST_OBJECT_LOCK (tee);
428 GST_DEBUG_OBJECT (tee, "warning failed to activate request pad");
429 if (tee->allocpad == srcpad) {
430 tee->allocpad = NULL;
433 GST_OBJECT_UNLOCK (tee);
434 gst_object_unref (srcpad);
436 gst_tee_notify_alloc_pad (tee);
443 gst_tee_release_pad (GstElement * element, GstPad * pad)
446 gboolean changed = FALSE;
449 tee = GST_TEE (element);
451 GST_DEBUG_OBJECT (tee, "releasing pad");
453 GST_OBJECT_LOCK (tee);
454 index = GST_TEE_PAD_CAST (pad)->index;
455 /* mark the pad as removed so that future pad_alloc fails with NOT_LINKED. */
456 GST_TEE_PAD_CAST (pad)->removed = TRUE;
457 if (tee->allocpad == pad) {
458 tee->allocpad = NULL;
461 GST_OBJECT_UNLOCK (tee);
463 gst_object_ref (pad);
464 gst_element_remove_pad (GST_ELEMENT_CAST (tee), pad);
466 gst_pad_set_active (pad, FALSE);
468 gst_object_unref (pad);
471 gst_tee_notify_alloc_pad (tee);
474 GST_OBJECT_LOCK (tee);
475 g_hash_table_remove (tee->pad_indexes, GUINT_TO_POINTER (index));
476 GST_OBJECT_UNLOCK (tee);
480 gst_tee_set_property (GObject * object, guint prop_id, const GValue * value,
483 GstTee *tee = GST_TEE (object);
485 GST_OBJECT_LOCK (tee);
488 tee->has_chain = g_value_get_boolean (value);
491 tee->silent = g_value_get_boolean (value);
494 tee->pull_mode = (GstTeePullMode) g_value_get_enum (value);
498 GstPad *pad = g_value_get_object (value);
499 GST_OBJECT_LOCK (pad);
500 if (GST_OBJECT_PARENT (pad) == GST_OBJECT_CAST (object))
503 GST_WARNING_OBJECT (object, "Tried to set alloc pad %s which"
504 " is not my pad", GST_OBJECT_NAME (pad));
505 GST_OBJECT_UNLOCK (pad);
508 case PROP_ALLOW_NOT_LINKED:
509 tee->allow_not_linked = g_value_get_boolean (value);
512 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
515 GST_OBJECT_UNLOCK (tee);
519 gst_tee_get_property (GObject * object, guint prop_id, GValue * value,
522 GstTee *tee = GST_TEE (object);
524 GST_OBJECT_LOCK (tee);
526 case PROP_NUM_SRC_PADS:
527 g_value_set_int (value, GST_ELEMENT (tee)->numsrcpads);
530 g_value_set_boolean (value, tee->has_chain);
533 g_value_set_boolean (value, tee->silent);
535 case PROP_LAST_MESSAGE:
536 g_value_set_string (value, tee->last_message);
539 g_value_set_enum (value, tee->pull_mode);
542 g_value_set_object (value, tee->allocpad);
544 case PROP_ALLOW_NOT_LINKED:
545 g_value_set_boolean (value, tee->allow_not_linked);
548 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
551 GST_OBJECT_UNLOCK (tee);
555 gst_tee_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
559 switch (GST_EVENT_TYPE (event)) {
561 res = gst_pad_event_default (pad, parent, event);
572 GstAllocationParams params;
575 gboolean first_query;
579 /* This function will aggregate some of the allocation query information with
580 * the strategy to force upstream allocation. Depending on downstream
581 * allocation would otherwise make dynamic pipelines much more complicated as
582 * application would need to now drain buffer in certain cases before getting
583 * rid of a tee branch. */
585 gst_tee_query_allocation (const GValue * item, GValue * ret, gpointer user_data)
587 struct AllocQueryCtx *ctx = user_data;
588 GstPad *src_pad = g_value_get_object (item);
592 guint count, i, size, min;
594 GST_DEBUG_OBJECT (ctx->tee, "Aggregating allocation from pad %s:%s",
595 GST_DEBUG_PAD_NAME (src_pad));
597 peer_pad = gst_pad_get_peer (src_pad);
599 if (ctx->tee->allow_not_linked) {
600 GST_DEBUG_OBJECT (ctx->tee, "Pad %s:%s has no peer, but allowed.",
601 GST_DEBUG_PAD_NAME (src_pad));
604 GST_DEBUG_OBJECT (ctx->tee, "Pad %s:%s has no peer, ignoring allocation.",
605 GST_DEBUG_PAD_NAME (src_pad));
606 g_value_set_boolean (ret, FALSE);
611 gst_query_parse_allocation (ctx->query, &caps, NULL);
613 query = gst_query_new_allocation (caps, FALSE);
614 if (!gst_pad_query (peer_pad, query)) {
615 GST_DEBUG_OBJECT (ctx->tee,
616 "Allocation query failed on pad %s, ignoring allocation",
617 GST_PAD_NAME (src_pad));
618 g_value_set_boolean (ret, FALSE);
619 gst_query_unref (query);
620 gst_object_unref (peer_pad);
624 gst_object_unref (peer_pad);
626 /* Allocation Params:
627 * store the maximum alignment, prefix and pading, but ignore the
628 * allocators and the flags which are tied to downstream allocation */
629 count = gst_query_get_n_allocation_params (query);
630 for (i = 0; i < count; i++) {
631 GstAllocationParams params = { 0, };
633 gst_query_parse_nth_allocation_param (query, i, NULL, ¶ms);
635 GST_DEBUG_OBJECT (ctx->tee, "Aggregating AllocationParams align=%"
636 G_GSIZE_FORMAT " prefix=%" G_GSIZE_FORMAT " padding=%"
637 G_GSIZE_FORMAT, params.align, params.prefix, params.padding);
639 if (ctx->params.align < params.align)
640 ctx->params.align = params.align;
642 if (ctx->params.prefix < params.prefix)
643 ctx->params.prefix = params.prefix;
645 if (ctx->params.padding < params.padding)
646 ctx->params.padding = params.padding;
650 * We want to keep the biggest size and biggest minimum number of buffers to
651 * make sure downstream requirement can be satisfied. We don't really care
652 * about the maximum, as this is a parameter of the downstream provided
653 * pool. We only read the first allocation pool as the minimum number of
654 * buffers is normally constant regardless of the pool being used. */
655 if (gst_query_get_n_allocation_pools (query) > 0) {
656 gst_query_parse_nth_allocation_pool (query, 0, NULL, &size, &min, NULL);
658 GST_DEBUG_OBJECT (ctx->tee,
659 "Aggregating allocation pool size=%u min_buffers=%u", size, min);
661 if (ctx->size < size)
664 if (ctx->min_buffers < min)
665 ctx->min_buffers = min;
669 * For allocation meta, we'll need to aggregate the argument using the new
670 * GstMetaInfo::agggregate_func */
671 count = gst_query_get_n_allocation_metas (query);
672 for (i = 0; i < count; i++) {
675 const GstStructure *param;
677 api = gst_query_parse_nth_allocation_meta (query, i, ¶m);
679 /* For the first query, copy all metas */
680 if (ctx->first_query) {
681 gst_query_add_allocation_meta (ctx->query, api, param);
685 /* Afterward, aggregate the common params */
686 if (gst_query_find_allocation_meta (ctx->query, api, &ctx_index)) {
687 const GstStructure *ctx_param;
689 gst_query_parse_nth_allocation_meta (ctx->query, ctx_index, &ctx_param);
691 /* Keep meta which has no params */
692 if (ctx_param == NULL && param == NULL)
695 GST_DEBUG_OBJECT (ctx->tee, "Dropping allocation meta %s",
697 gst_query_remove_nth_allocation_meta (ctx->query, ctx_index);
701 /* Finally, cleanup metas from the stored query that aren't support on this
703 count = gst_query_get_n_allocation_metas (ctx->query);
704 for (i = 0; i < count;) {
705 GType api = gst_query_parse_nth_allocation_meta (ctx->query, i, NULL);
707 if (!gst_query_find_allocation_meta (query, api, NULL)) {
708 GST_DEBUG_OBJECT (ctx->tee, "Dropping allocation meta %s",
710 gst_query_remove_nth_allocation_meta (ctx->query, i);
718 ctx->first_query = FALSE;
720 gst_query_unref (query);
727 gst_tee_clear_query_allocation_meta (GstQuery * query)
729 guint count = gst_query_get_n_allocation_metas (query);
732 for (i = 1; i <= count; i++)
733 gst_query_remove_nth_allocation_meta (query, count - i);
737 gst_tee_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
739 GstTee *tee = GST_TEE (parent);
742 switch (GST_QUERY_TYPE (query)) {
743 case GST_QUERY_ALLOCATION:
746 GValue ret = G_VALUE_INIT;
747 struct AllocQueryCtx ctx = { tee, query, };
749 g_value_init (&ret, G_TYPE_BOOLEAN);
750 g_value_set_boolean (&ret, TRUE);
752 ctx.first_query = TRUE;
753 gst_allocation_params_init (&ctx.params);
755 iter = gst_element_iterate_src_pads (GST_ELEMENT (tee));
756 while (GST_ITERATOR_RESYNC ==
757 gst_iterator_fold (iter, gst_tee_query_allocation, &ret, &ctx)) {
758 gst_iterator_resync (iter);
759 ctx.first_query = TRUE;
760 gst_allocation_params_init (&ctx.params);
764 gst_tee_clear_query_allocation_meta (query);
767 gst_iterator_free (iter);
768 res = g_value_get_boolean (&ret);
769 g_value_unset (&ret);
772 GST_DEBUG_OBJECT (tee, "Aggregated AllocationParams to align=%"
773 G_GSIZE_FORMAT " prefix=%" G_GSIZE_FORMAT " padding=%"
774 G_GSIZE_FORMAT, ctx.params.align, ctx.params.prefix,
777 GST_DEBUG_OBJECT (tee,
778 "Aggregated allocation pools size=%u min_buffers=%u", ctx.size,
781 #ifndef GST_DISABLE_GST_DEBUG
783 guint count = gst_query_get_n_allocation_metas (query);
786 GST_DEBUG_OBJECT (tee, "Aggregated %u allocation meta:", count);
788 for (i = 0; i < count; i++)
789 GST_DEBUG_OBJECT (tee, " + aggregated allocation meta %s",
790 g_type_name (gst_query_parse_nth_allocation_meta (ctx.query, i,
795 /* Allocate one more buffers when multiplexing so we don't starve the
796 * downstream threads. */
797 if (ctx.num_pads > 1)
800 /* Check that we actually have parameters besides the defaults. */
801 if (ctx.params.align || ctx.params.prefix || ctx.params.padding) {
802 gst_query_add_allocation_param (ctx.query, NULL, &ctx.params);
804 /* When size == 0, buffers created from this pool would have no memory
807 gst_query_add_allocation_pool (ctx.query, NULL, ctx.size,
811 gst_tee_clear_query_allocation_meta (query);
816 res = gst_pad_query_default (pad, parent, query);
823 gst_tee_do_message (GstTee * tee, GstPad * pad, gpointer data, gboolean is_list)
825 GST_OBJECT_LOCK (tee);
826 g_free (tee->last_message);
829 g_strdup_printf ("chain-list ******* (%s:%s)t %p",
830 GST_DEBUG_PAD_NAME (pad), data);
833 g_strdup_printf ("chain ******* (%s:%s)t (%" G_GSIZE_FORMAT
834 " bytes, %" G_GUINT64_FORMAT ") %p", GST_DEBUG_PAD_NAME (pad),
835 gst_buffer_get_size (data), GST_BUFFER_TIMESTAMP (data), data);
837 GST_OBJECT_UNLOCK (tee);
839 g_object_notify_by_pspec ((GObject *) tee, pspec_last_message);
843 gst_tee_do_push (GstTee * tee, GstPad * pad, gpointer data, gboolean is_list)
848 if (pad == tee->pull_pad) {
849 /* don't push on the pad we're pulling from */
851 } else if (is_list) {
853 gst_pad_push_list (pad,
854 gst_buffer_list_ref (GST_BUFFER_LIST_CAST (data)));
856 res = gst_pad_push (pad, gst_buffer_ref (GST_BUFFER_CAST (data)));
862 clear_pads (GstPad * pad, GstTee * tee)
864 GST_TEE_PAD_CAST (pad)->pushed = FALSE;
865 GST_TEE_PAD_CAST (pad)->result = GST_FLOW_NOT_LINKED;
869 gst_tee_handle_data (GstTee * tee, gpointer data, gboolean is_list)
873 GstFlowReturn ret, cret;
875 if (G_UNLIKELY (!tee->silent))
876 gst_tee_do_message (tee, tee->sinkpad, data, is_list);
878 GST_OBJECT_LOCK (tee);
879 pads = GST_ELEMENT_CAST (tee)->srcpads;
881 /* special case for zero pads */
882 if (G_UNLIKELY (!pads))
885 /* special case for just one pad that avoids reffing the buffer */
887 GstPad *pad = GST_PAD_CAST (pads->data);
889 /* Keep another ref around, a pad probe
890 * might release and destroy the pad */
891 gst_object_ref (pad);
892 GST_OBJECT_UNLOCK (tee);
894 if (pad == tee->pull_pad) {
896 } else if (is_list) {
897 ret = gst_pad_push_list (pad, GST_BUFFER_LIST_CAST (data));
899 ret = gst_pad_push (pad, GST_BUFFER_CAST (data));
902 if (GST_TEE_PAD_CAST (pad)->removed)
903 ret = GST_FLOW_NOT_LINKED;
905 if (ret == GST_FLOW_NOT_LINKED && tee->allow_not_linked) {
909 gst_object_unref (pad);
914 /* mark all pads as 'not pushed on yet' */
915 g_list_foreach (pads, (GFunc) clear_pads, tee);
918 if (tee->allow_not_linked) {
921 cret = GST_FLOW_NOT_LINKED;
923 pads = GST_ELEMENT_CAST (tee)->srcpads;
924 cookie = GST_ELEMENT_CAST (tee)->pads_cookie;
929 pad = GST_PAD_CAST (pads->data);
931 if (G_LIKELY (!GST_TEE_PAD_CAST (pad)->pushed)) {
932 /* not yet pushed, release lock and start pushing */
933 gst_object_ref (pad);
934 GST_OBJECT_UNLOCK (tee);
936 GST_LOG_OBJECT (pad, "Starting to push %s %p",
937 is_list ? "list" : "buffer", data);
939 ret = gst_tee_do_push (tee, pad, data, is_list);
941 GST_LOG_OBJECT (pad, "Pushing item %p yielded result %s", data,
942 gst_flow_get_name (ret));
944 GST_OBJECT_LOCK (tee);
945 /* keep track of which pad we pushed and the result value */
946 GST_TEE_PAD_CAST (pad)->pushed = TRUE;
947 GST_TEE_PAD_CAST (pad)->result = ret;
948 gst_object_unref (pad);
951 /* already pushed, use previous return value */
952 ret = GST_TEE_PAD_CAST (pad)->result;
953 GST_LOG_OBJECT (pad, "pad already pushed with %s",
954 gst_flow_get_name (ret));
957 /* before we go combining the return value, check if the pad list is still
958 * the same. It could be possible that the pad we just pushed was removed
959 * and the return value it not valid anymore */
960 if (G_UNLIKELY (GST_ELEMENT_CAST (tee)->pads_cookie != cookie)) {
961 GST_LOG_OBJECT (tee, "pad list changed");
962 /* the list of pads changed, restart iteration. Pads that we already
963 * pushed on and are still in the new list, will not be pushed on
968 /* stop pushing more buffers when we have a fatal error */
969 if (G_UNLIKELY (ret != GST_FLOW_OK && ret != GST_FLOW_NOT_LINKED))
972 /* keep all other return values, overwriting the previous one. */
973 if (G_LIKELY (ret != GST_FLOW_NOT_LINKED)) {
974 GST_LOG_OBJECT (tee, "Replacing ret val %d with %d", cret, ret);
977 pads = g_list_next (pads);
979 GST_OBJECT_UNLOCK (tee);
981 gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
983 /* no need to unset gvalue */
989 if (tee->allow_not_linked) {
990 GST_DEBUG_OBJECT (tee, "there are no pads, dropping %s",
991 is_list ? "buffer-list" : "buffer");
994 GST_DEBUG_OBJECT (tee, "there are no pads, return not-linked");
995 ret = GST_FLOW_NOT_LINKED;
1001 GST_DEBUG_OBJECT (tee, "received error %s", gst_flow_get_name (ret));
1006 GST_OBJECT_UNLOCK (tee);
1007 gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1012 static GstFlowReturn
1013 gst_tee_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
1018 tee = GST_TEE_CAST (parent);
1020 GST_DEBUG_OBJECT (tee, "received buffer %p", buffer);
1022 res = gst_tee_handle_data (tee, buffer, FALSE);
1024 GST_DEBUG_OBJECT (tee, "handled buffer %s", gst_flow_get_name (res));
1029 static GstFlowReturn
1030 gst_tee_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list)
1035 tee = GST_TEE_CAST (parent);
1037 GST_DEBUG_OBJECT (tee, "received list %p", list);
1039 res = gst_tee_handle_data (tee, list, TRUE);
1041 GST_DEBUG_OBJECT (tee, "handled list %s", gst_flow_get_name (res));
1047 gst_tee_sink_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
1053 tee = GST_TEE (parent);
1056 case GST_PAD_MODE_PUSH:
1058 GST_OBJECT_LOCK (tee);
1059 tee->sink_mode = active ? mode : GST_PAD_MODE_NONE;
1061 if (active && !tee->has_chain)
1063 GST_OBJECT_UNLOCK (tee);
1076 GST_OBJECT_UNLOCK (tee);
1077 GST_INFO_OBJECT (tee,
1078 "Tee cannot operate in push mode with has-chain==FALSE");
1084 gst_tee_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
1091 tee = GST_TEE (parent);
1094 case GST_PAD_MODE_PULL:
1096 GST_OBJECT_LOCK (tee);
1098 if (tee->pull_mode == GST_TEE_PULL_MODE_NEVER)
1101 if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE && active && tee->pull_pad)
1102 goto cannot_pull_multiple_srcs;
1104 sinkpad = gst_object_ref (tee->sinkpad);
1106 GST_OBJECT_UNLOCK (tee);
1108 res = gst_pad_activate_mode (sinkpad, GST_PAD_MODE_PULL, active);
1109 gst_object_unref (sinkpad);
1112 goto sink_activate_failed;
1114 GST_OBJECT_LOCK (tee);
1116 if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE)
1117 tee->pull_pad = pad;
1119 if (pad == tee->pull_pad)
1120 tee->pull_pad = NULL;
1122 tee->sink_mode = (active ? GST_PAD_MODE_PULL : GST_PAD_MODE_NONE);
1123 GST_OBJECT_UNLOCK (tee);
1136 GST_OBJECT_UNLOCK (tee);
1137 GST_INFO_OBJECT (tee, "Cannot activate in pull mode, pull-mode "
1141 cannot_pull_multiple_srcs:
1143 GST_OBJECT_UNLOCK (tee);
1144 GST_INFO_OBJECT (tee, "Cannot activate multiple src pads in pull mode, "
1145 "pull-mode set to SINGLE");
1148 sink_activate_failed:
1150 GST_INFO_OBJECT (tee, "Failed to %sactivate sink pad in pull mode",
1151 active ? "" : "de");
1157 gst_tee_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
1163 tee = GST_TEE (parent);
1165 switch (GST_QUERY_TYPE (query)) {
1166 case GST_QUERY_SCHEDULING:
1170 GST_OBJECT_LOCK (tee);
1172 if (tee->pull_mode == GST_TEE_PULL_MODE_NEVER) {
1173 GST_INFO_OBJECT (tee, "Cannot activate in pull mode, pull-mode "
1176 } else if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE && tee->pull_pad) {
1177 GST_INFO_OBJECT (tee, "Cannot activate multiple src pads in pull mode, "
1178 "pull-mode set to SINGLE");
1182 sinkpad = gst_object_ref (tee->sinkpad);
1183 GST_OBJECT_UNLOCK (tee);
1186 /* ask peer if we can operate in pull mode */
1187 res = gst_pad_peer_query (sinkpad, query);
1191 gst_object_unref (sinkpad);
1195 res = gst_pad_query_default (pad, parent, query);
1203 gst_tee_push_eos (const GValue * vpad, GstTee * tee)
1205 GstPad *pad = g_value_get_object (vpad);
1207 if (pad != tee->pull_pad)
1208 gst_pad_push_event (pad, gst_event_new_eos ());
1212 gst_tee_pull_eos (GstTee * tee)
1216 iter = gst_element_iterate_src_pads (GST_ELEMENT (tee));
1217 while (gst_iterator_foreach (iter,
1218 (GstIteratorForeachFunction) gst_tee_push_eos,
1219 tee) == GST_ITERATOR_RESYNC)
1220 gst_iterator_resync (iter);
1221 gst_iterator_free (iter);
1224 static GstFlowReturn
1225 gst_tee_src_get_range (GstPad * pad, GstObject * parent, guint64 offset,
1226 guint length, GstBuffer ** buf)
1231 tee = GST_TEE (parent);
1233 ret = gst_pad_pull_range (tee->sinkpad, offset, length, buf);
1235 if (ret == GST_FLOW_OK)
1236 ret = gst_tee_handle_data (tee, gst_buffer_ref (*buf), FALSE);
1237 else if (ret == GST_FLOW_EOS)
1238 gst_tee_pull_eos (tee);