2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2000,2001,2002,2003,2004,2005 Wim Taymans <wim@fluendo.com>
4 * 2007 Wim Taymans <wim.taymans@gmail.com>
6 * gsttee.c: Tee element, one in N out
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Library General Public
10 * License as published by the Free Software Foundation; either
11 * version 2 of the License, or (at your option) any later version.
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Library General Public License for more details.
18 * You should have received a copy of the GNU Library General Public
19 * License along with this library; if not, write to the
20 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21 * Boston, MA 02110-1301, USA.
27 * @see_also: #GstIdentity
29 * Split data to multiple pads. Branching the data flow is useful when e.g.
30 * capturing a video where the video is shown on the screen and also encoded and
31 * written to a file. Another example is playing music and hooking up a
32 * visualisation module.
34 * One needs to use separate queue elements (or a multiqueue) in each branch to
35 * provide separate threads for each branch. Otherwise a blocked dataflow in one
36 * branch would stall the other branches.
38 * ## Example launch line
40 * gst-launch-1.0 filesrc location=song.ogg ! decodebin ! tee name=t ! queue ! audioconvert ! audioresample ! autoaudiosink t. ! queue ! audioconvert ! goom ! videoconvert ! autovideosink
43 * Play song.ogg audio file which must be in the current working directory
44 * and render visualisations using the goom element (this can be easier done
45 * using the playbin element, this is just an example pipeline).
53 #include "gst/glib-compat-private.h"
58 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
63 GST_DEBUG_CATEGORY_STATIC (gst_tee_debug);
64 #define GST_CAT_DEFAULT gst_tee_debug
66 #define GST_TYPE_TEE_PULL_MODE (gst_tee_pull_mode_get_type())
68 gst_tee_pull_mode_get_type (void)
70 static GType type = 0;
71 static const GEnumValue data[] = {
72 {GST_TEE_PULL_MODE_NEVER, "Never activate in pull mode", "never"},
73 {GST_TEE_PULL_MODE_SINGLE, "Only one src pad can be active in pull mode",
79 type = g_enum_register_static ("GstTeePullMode", data);
84 #define DEFAULT_PROP_NUM_SRC_PADS 0
85 #define DEFAULT_PROP_HAS_CHAIN TRUE
86 #define DEFAULT_PROP_SILENT TRUE
87 #define DEFAULT_PROP_LAST_MESSAGE NULL
88 #define DEFAULT_PULL_MODE GST_TEE_PULL_MODE_NEVER
89 #define DEFAULT_PROP_ALLOW_NOT_LINKED FALSE
100 PROP_ALLOW_NOT_LINKED,
103 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u",
106 GST_STATIC_CAPS_ANY);
109 GST_DEBUG_CATEGORY_INIT (gst_tee_debug, "tee", 0, "tee element");
110 #define gst_tee_parent_class parent_class
111 G_DEFINE_TYPE_WITH_CODE (GstTee, gst_tee, GST_TYPE_ELEMENT, _do_init);
113 static GParamSpec *pspec_last_message = NULL;
114 static GParamSpec *pspec_alloc_pad = NULL;
116 GType gst_tee_pad_get_type (void);
118 #define GST_TYPE_TEE_PAD \
119 (gst_tee_pad_get_type())
120 #define GST_TEE_PAD(obj) \
121 (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_TEE_PAD, GstTeePad))
122 #define GST_TEE_PAD_CLASS(klass) \
123 (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_TEE_PAD, GstTeePadClass))
124 #define GST_IS_TEE_PAD(obj) \
125 (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_TEE_PAD))
126 #define GST_IS_TEE_PAD_CLASS(klass) \
127 (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_TEE_PAD))
128 #define GST_TEE_PAD_CAST(obj) \
131 typedef struct _GstTeePad GstTeePad;
132 typedef struct _GstTeePadClass GstTeePadClass;
140 GstFlowReturn result;
144 struct _GstTeePadClass
149 G_DEFINE_TYPE (GstTeePad, gst_tee_pad, GST_TYPE_PAD);
152 gst_tee_pad_class_init (GstTeePadClass * klass)
157 gst_tee_pad_reset (GstTeePad * pad)
160 pad->result = GST_FLOW_NOT_LINKED;
161 pad->removed = FALSE;
165 gst_tee_pad_init (GstTeePad * pad)
167 gst_tee_pad_reset (pad);
170 static GstPad *gst_tee_request_new_pad (GstElement * element,
171 GstPadTemplate * temp, const gchar * unused, const GstCaps * caps);
172 static void gst_tee_release_pad (GstElement * element, GstPad * pad);
174 static void gst_tee_finalize (GObject * object);
175 static void gst_tee_set_property (GObject * object, guint prop_id,
176 const GValue * value, GParamSpec * pspec);
177 static void gst_tee_get_property (GObject * object, guint prop_id,
178 GValue * value, GParamSpec * pspec);
179 static void gst_tee_dispose (GObject * object);
181 static GstFlowReturn gst_tee_chain (GstPad * pad, GstObject * parent,
183 static GstFlowReturn gst_tee_chain_list (GstPad * pad, GstObject * parent,
184 GstBufferList * list);
185 static gboolean gst_tee_sink_event (GstPad * pad, GstObject * parent,
187 static gboolean gst_tee_sink_query (GstPad * pad, GstObject * parent,
189 static gboolean gst_tee_sink_activate_mode (GstPad * pad, GstObject * parent,
190 GstPadMode mode, gboolean active);
191 static gboolean gst_tee_src_query (GstPad * pad, GstObject * parent,
193 static gboolean gst_tee_src_activate_mode (GstPad * pad, GstObject * parent,
194 GstPadMode mode, gboolean active);
195 static GstFlowReturn gst_tee_src_get_range (GstPad * pad, GstObject * parent,
196 guint64 offset, guint length, GstBuffer ** buf);
199 gst_tee_dispose (GObject * object)
204 for (item = GST_ELEMENT_PADS (object); item; item = g_list_next (item)) {
205 GstPad *pad = GST_PAD (item->data);
206 if (GST_PAD_IS_SRC (pad)) {
207 gst_element_release_request_pad (GST_ELEMENT (object), pad);
212 G_OBJECT_CLASS (parent_class)->dispose (object);
216 gst_tee_finalize (GObject * object)
220 tee = GST_TEE (object);
222 g_hash_table_unref (tee->pad_indexes);
224 g_free (tee->last_message);
226 G_OBJECT_CLASS (parent_class)->finalize (object);
230 gst_tee_class_init (GstTeeClass * klass)
232 GObjectClass *gobject_class;
233 GstElementClass *gstelement_class;
235 gobject_class = G_OBJECT_CLASS (klass);
236 gstelement_class = GST_ELEMENT_CLASS (klass);
238 gobject_class->finalize = gst_tee_finalize;
239 gobject_class->set_property = gst_tee_set_property;
240 gobject_class->get_property = gst_tee_get_property;
241 gobject_class->dispose = gst_tee_dispose;
243 g_object_class_install_property (gobject_class, PROP_NUM_SRC_PADS,
244 g_param_spec_int ("num-src-pads", "Num Src Pads",
245 "The number of source pads", 0, G_MAXINT, DEFAULT_PROP_NUM_SRC_PADS,
246 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
247 g_object_class_install_property (gobject_class, PROP_HAS_CHAIN,
248 g_param_spec_boolean ("has-chain", "Has Chain",
249 "If the element can operate in push mode", DEFAULT_PROP_HAS_CHAIN,
250 G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
251 g_object_class_install_property (gobject_class, PROP_SILENT,
252 g_param_spec_boolean ("silent", "Silent",
253 "Don't produce last_message events", DEFAULT_PROP_SILENT,
254 G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
255 pspec_last_message = g_param_spec_string ("last-message", "Last Message",
256 "The message describing current status", DEFAULT_PROP_LAST_MESSAGE,
257 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
258 g_object_class_install_property (gobject_class, PROP_LAST_MESSAGE,
260 g_object_class_install_property (gobject_class, PROP_PULL_MODE,
261 g_param_spec_enum ("pull-mode", "Pull mode",
262 "Behavior of tee in pull mode", GST_TYPE_TEE_PULL_MODE,
264 G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
265 pspec_alloc_pad = g_param_spec_object ("alloc-pad", "Allocation Src Pad",
266 "The pad ALLOCATION queries will be proxied to (DEPRECATED, has no effect)",
268 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED);
269 g_object_class_install_property (gobject_class, PROP_ALLOC_PAD,
273 * GstTee:allow-not-linked
275 * This property makes sink pad return GST_FLOW_OK even if there are no
276 * source pads or any of them is linked.
278 * This is useful to avoid errors when you have a dynamic pipeline and during
279 * a reconnection you can have all the pads unlinked or removed.
283 g_object_class_install_property (gobject_class, PROP_ALLOW_NOT_LINKED,
284 g_param_spec_boolean ("allow-not-linked", "Allow not linked",
285 "Return GST_FLOW_OK even if there are no source pads or they are "
286 "all unlinked", DEFAULT_PROP_ALLOW_NOT_LINKED,
287 G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
289 gst_element_class_set_static_metadata (gstelement_class,
292 "1-to-N pipe fitting",
293 "Erik Walthinsen <omega@cse.ogi.edu>, " "Wim Taymans <wim@fluendo.com>");
294 gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
295 gst_element_class_add_static_pad_template (gstelement_class, &src_template);
297 gstelement_class->request_new_pad =
298 GST_DEBUG_FUNCPTR (gst_tee_request_new_pad);
299 gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_tee_release_pad);
303 gst_tee_init (GstTee * tee)
305 tee->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
306 tee->sink_mode = GST_PAD_MODE_NONE;
308 gst_pad_set_event_function (tee->sinkpad,
309 GST_DEBUG_FUNCPTR (gst_tee_sink_event));
310 gst_pad_set_query_function (tee->sinkpad,
311 GST_DEBUG_FUNCPTR (gst_tee_sink_query));
312 gst_pad_set_activatemode_function (tee->sinkpad,
313 GST_DEBUG_FUNCPTR (gst_tee_sink_activate_mode));
314 gst_pad_set_chain_function (tee->sinkpad, GST_DEBUG_FUNCPTR (gst_tee_chain));
315 gst_pad_set_chain_list_function (tee->sinkpad,
316 GST_DEBUG_FUNCPTR (gst_tee_chain_list));
317 GST_OBJECT_FLAG_SET (tee->sinkpad, GST_PAD_FLAG_PROXY_CAPS);
318 gst_element_add_pad (GST_ELEMENT (tee), tee->sinkpad);
320 tee->pad_indexes = g_hash_table_new (NULL, NULL);
322 tee->last_message = NULL;
326 gst_tee_notify_alloc_pad (GstTee * tee)
328 g_object_notify_by_pspec ((GObject *) tee, pspec_alloc_pad);
332 forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
334 GstPad *srcpad = GST_PAD_CAST (user_data);
337 ret = gst_pad_store_sticky_event (srcpad, *event);
338 if (ret != GST_FLOW_OK) {
339 GST_DEBUG_OBJECT (srcpad, "storing sticky event %p (%s) failed: %s", *event,
340 GST_EVENT_TYPE_NAME (*event), gst_flow_get_name (ret));
347 gst_tee_request_new_pad (GstElement * element, GstPadTemplate * templ,
348 const gchar * name_templ, const GstCaps * caps)
357 tee = GST_TEE (element);
359 GST_DEBUG_OBJECT (tee, "requesting pad");
361 GST_OBJECT_LOCK (tee);
363 if (name_templ && sscanf (name_templ, "src_%u", &index) == 1) {
364 GST_LOG_OBJECT (element, "name: %s (index %d)", name_templ, index);
365 if (g_hash_table_contains (tee->pad_indexes, GUINT_TO_POINTER (index))) {
366 GST_ERROR_OBJECT (element, "pad name %s is not unique", name_templ);
367 GST_OBJECT_UNLOCK (tee);
370 if (index >= tee->next_pad_index)
371 tee->next_pad_index = index + 1;
373 index = tee->next_pad_index;
375 while (g_hash_table_contains (tee->pad_indexes, GUINT_TO_POINTER (index)))
378 tee->next_pad_index = index + 1;
381 g_hash_table_insert (tee->pad_indexes, GUINT_TO_POINTER (index), NULL);
383 name = g_strdup_printf ("src_%u", index);
385 srcpad = GST_PAD_CAST (g_object_new (GST_TYPE_TEE_PAD,
386 "name", name, "direction", templ->direction, "template", templ,
388 GST_TEE_PAD_CAST (srcpad)->index = index;
391 mode = tee->sink_mode;
393 GST_OBJECT_UNLOCK (tee);
396 case GST_PAD_MODE_PULL:
397 /* we already have a src pad in pull mode, and our pull mode can only be
398 SINGLE, so fall through to activate this new pad in push mode */
399 case GST_PAD_MODE_PUSH:
400 res = gst_pad_activate_mode (srcpad, GST_PAD_MODE_PUSH, TRUE);
408 goto activate_failed;
410 gst_pad_set_activatemode_function (srcpad,
411 GST_DEBUG_FUNCPTR (gst_tee_src_activate_mode));
412 gst_pad_set_query_function (srcpad, GST_DEBUG_FUNCPTR (gst_tee_src_query));
413 gst_pad_set_getrange_function (srcpad,
414 GST_DEBUG_FUNCPTR (gst_tee_src_get_range));
415 GST_OBJECT_FLAG_SET (srcpad, GST_PAD_FLAG_PROXY_CAPS);
416 /* Forward sticky events to the new srcpad */
417 gst_pad_sticky_events_foreach (tee->sinkpad, forward_sticky_events, srcpad);
418 gst_element_add_pad (GST_ELEMENT_CAST (tee), srcpad);
425 gboolean changed = FALSE;
427 GST_OBJECT_LOCK (tee);
428 GST_DEBUG_OBJECT (tee, "warning failed to activate request pad");
429 if (tee->allocpad == srcpad) {
430 tee->allocpad = NULL;
433 GST_OBJECT_UNLOCK (tee);
434 gst_object_unref (srcpad);
436 gst_tee_notify_alloc_pad (tee);
443 gst_tee_release_pad (GstElement * element, GstPad * pad)
446 gboolean changed = FALSE;
449 tee = GST_TEE (element);
451 GST_DEBUG_OBJECT (tee, "releasing pad");
453 GST_OBJECT_LOCK (tee);
454 index = GST_TEE_PAD_CAST (pad)->index;
455 /* mark the pad as removed so that future pad_alloc fails with NOT_LINKED. */
456 GST_TEE_PAD_CAST (pad)->removed = TRUE;
457 if (tee->allocpad == pad) {
458 tee->allocpad = NULL;
461 GST_OBJECT_UNLOCK (tee);
463 gst_object_ref (pad);
464 gst_element_remove_pad (GST_ELEMENT_CAST (tee), pad);
466 gst_pad_set_active (pad, FALSE);
468 gst_object_unref (pad);
471 gst_tee_notify_alloc_pad (tee);
474 GST_OBJECT_LOCK (tee);
475 g_hash_table_remove (tee->pad_indexes, GUINT_TO_POINTER (index));
476 GST_OBJECT_UNLOCK (tee);
480 gst_tee_set_property (GObject * object, guint prop_id, const GValue * value,
483 GstTee *tee = GST_TEE (object);
485 GST_OBJECT_LOCK (tee);
488 tee->has_chain = g_value_get_boolean (value);
491 tee->silent = g_value_get_boolean (value);
494 tee->pull_mode = (GstTeePullMode) g_value_get_enum (value);
498 GstPad *pad = g_value_get_object (value);
499 GST_OBJECT_LOCK (pad);
500 if (GST_OBJECT_PARENT (pad) == GST_OBJECT_CAST (object))
503 GST_WARNING_OBJECT (object, "Tried to set alloc pad %s which"
504 " is not my pad", GST_OBJECT_NAME (pad));
505 GST_OBJECT_UNLOCK (pad);
508 case PROP_ALLOW_NOT_LINKED:
509 tee->allow_not_linked = g_value_get_boolean (value);
512 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
515 GST_OBJECT_UNLOCK (tee);
519 gst_tee_get_property (GObject * object, guint prop_id, GValue * value,
522 GstTee *tee = GST_TEE (object);
524 GST_OBJECT_LOCK (tee);
526 case PROP_NUM_SRC_PADS:
527 g_value_set_int (value, GST_ELEMENT (tee)->numsrcpads);
530 g_value_set_boolean (value, tee->has_chain);
533 g_value_set_boolean (value, tee->silent);
535 case PROP_LAST_MESSAGE:
536 g_value_set_string (value, tee->last_message);
539 g_value_set_enum (value, tee->pull_mode);
542 g_value_set_object (value, tee->allocpad);
544 case PROP_ALLOW_NOT_LINKED:
545 g_value_set_boolean (value, tee->allow_not_linked);
548 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
551 GST_OBJECT_UNLOCK (tee);
555 gst_tee_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
559 switch (GST_EVENT_TYPE (event)) {
561 res = gst_pad_event_default (pad, parent, event);
572 GstAllocationParams params;
575 gboolean first_query;
578 /* This function will aggregate some of the allocation query information with
579 * the strategy to force upstream allocation. Depending on downstream
580 * allocation would otherwise make dynamic pipelines much more complicated as
581 * application would need to now drain buffer in certain cases before getting
582 * rid of a tee branch. */
584 gst_tee_query_allocation (const GValue * item, GValue * ret, gpointer user_data)
586 struct AllocQueryCtx *ctx = user_data;
587 GstPad *src_pad = g_value_get_object (item);
591 guint count, i, size, min;
593 GST_DEBUG_OBJECT (ctx->tee, "Aggregating allocation from pad %s:%s",
594 GST_DEBUG_PAD_NAME (src_pad));
596 peer_pad = gst_pad_get_peer (src_pad);
598 if (ctx->tee->allow_not_linked) {
599 GST_DEBUG_OBJECT (ctx->tee, "Pad %s:%s has no peer, but allowed.",
600 GST_DEBUG_PAD_NAME (src_pad));
603 GST_DEBUG_OBJECT (ctx->tee, "Pad %s:%s has no peer, ignoring allocation.",
604 GST_DEBUG_PAD_NAME (src_pad));
605 g_value_set_boolean (ret, FALSE);
610 gst_query_parse_allocation (ctx->query, &caps, NULL);
612 query = gst_query_new_allocation (caps, FALSE);
613 if (!gst_pad_query (peer_pad, query)) {
614 GST_DEBUG_OBJECT (ctx->tee,
615 "Allocation query failed on pad %s, ignoring allocation",
616 GST_PAD_NAME (src_pad));
617 g_value_set_boolean (ret, FALSE);
618 gst_query_unref (query);
619 gst_object_unref (peer_pad);
623 gst_object_unref (peer_pad);
625 /* Allocation Params:
626 * store the maximum alignment, prefix and pading, but ignore the
627 * allocators and the flags which are tied to downstream allocation */
628 count = gst_query_get_n_allocation_params (query);
629 for (i = 0; i < count; i++) {
630 GstAllocationParams params = { 0, };
632 gst_query_parse_nth_allocation_param (query, i, NULL, ¶ms);
634 GST_DEBUG_OBJECT (ctx->tee, "Aggregating AllocationParams align=%"
635 G_GSIZE_FORMAT " prefix=%" G_GSIZE_FORMAT " padding=%"
636 G_GSIZE_FORMAT, params.align, params.prefix, params.padding);
638 if (ctx->params.align < params.align)
639 ctx->params.align = params.align;
641 if (ctx->params.prefix < params.prefix)
642 ctx->params.prefix = params.prefix;
644 if (ctx->params.padding < params.padding)
645 ctx->params.padding = params.padding;
649 * We want to keep the biggest size and biggest minimum number of buffers to
650 * make sure downstream requirement can be satisfied. We don't really care
651 * about the maximum, as this is a parameter of the downstream provided
652 * pool. We only read the first allocation pool as the minimum number of
653 * buffers is normally constant regardless of the pool being used. */
654 if (gst_query_get_n_allocation_pools (query) > 0) {
655 gst_query_parse_nth_allocation_pool (query, 0, NULL, &size, &min, NULL);
657 GST_DEBUG_OBJECT (ctx->tee,
658 "Aggregating allocation pool size=%u min_buffers=%u", size, min);
660 if (ctx->size < size)
663 if (ctx->min_buffers < min)
664 ctx->min_buffers = min;
668 * For allocation meta, we'll need to aggregate the argument using the new
669 * GstMetaInfo::agggregate_func */
670 count = gst_query_get_n_allocation_metas (query);
671 for (i = 0; i < count; i++) {
674 const GstStructure *param;
676 api = gst_query_parse_nth_allocation_meta (query, i, ¶m);
678 /* For the first query, copy all metas */
679 if (ctx->first_query) {
680 gst_query_add_allocation_meta (ctx->query, api, param);
684 /* Afterward, aggregate the common params */
685 if (gst_query_find_allocation_meta (ctx->query, api, &ctx_index)) {
686 const GstStructure *ctx_param;
688 gst_query_parse_nth_allocation_meta (ctx->query, ctx_index, &ctx_param);
690 /* Keep meta which has no params */
691 if (ctx_param == NULL && param == NULL)
694 GST_DEBUG_OBJECT (ctx->tee, "Dropping allocation meta %s",
696 gst_query_remove_nth_allocation_meta (ctx->query, ctx_index);
700 /* Finally, cleanup metas from the stored query that aren't support on this
702 count = gst_query_get_n_allocation_metas (ctx->query);
703 for (i = 0; i < count;) {
704 GType api = gst_query_parse_nth_allocation_meta (ctx->query, i, NULL);
706 if (!gst_query_find_allocation_meta (query, api, NULL)) {
707 GST_DEBUG_OBJECT (ctx->tee, "Dropping allocation meta %s",
709 gst_query_remove_nth_allocation_meta (ctx->query, i);
717 ctx->first_query = FALSE;
718 gst_query_unref (query);
725 gst_tee_clear_query_allocation_meta (GstQuery * query)
727 guint count = gst_query_get_n_allocation_metas (query);
730 for (i = 1; i <= count; i++)
731 gst_query_remove_nth_allocation_meta (query, count - i);
735 gst_tee_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
737 GstTee *tee = GST_TEE (parent);
740 switch (GST_QUERY_TYPE (query)) {
741 case GST_QUERY_ALLOCATION:
744 GValue ret = G_VALUE_INIT;
745 struct AllocQueryCtx ctx = { tee, query, };
747 g_value_init (&ret, G_TYPE_BOOLEAN);
748 g_value_set_boolean (&ret, TRUE);
750 ctx.first_query = TRUE;
751 gst_allocation_params_init (&ctx.params);
753 iter = gst_element_iterate_src_pads (GST_ELEMENT (tee));
754 while (GST_ITERATOR_RESYNC ==
755 gst_iterator_fold (iter, gst_tee_query_allocation, &ret, &ctx)) {
756 gst_iterator_resync (iter);
757 ctx.first_query = TRUE;
758 gst_allocation_params_init (&ctx.params);
761 gst_tee_clear_query_allocation_meta (query);
764 gst_iterator_free (iter);
765 res = g_value_get_boolean (&ret);
766 g_value_unset (&ret);
769 GST_DEBUG_OBJECT (tee, "Aggregated AllocationParams to align=%"
770 G_GSIZE_FORMAT " prefix=%" G_GSIZE_FORMAT " padding=%"
771 G_GSIZE_FORMAT, ctx.params.align, ctx.params.prefix,
774 GST_DEBUG_OBJECT (tee,
775 "Aggregated allocation pools size=%u min_buffers=%u", ctx.size,
778 #ifndef GST_DISABLE_GST_DEBUG
780 guint count = gst_query_get_n_allocation_metas (query);
783 GST_DEBUG_OBJECT (tee, "Aggregated %u allocation meta:", count);
785 for (i = 0; i < count; i++)
786 GST_DEBUG_OBJECT (tee, " + aggregated allocation meta %s",
787 g_type_name (gst_query_parse_nth_allocation_meta (ctx.query, i,
792 gst_query_add_allocation_param (ctx.query, NULL, &ctx.params);
793 gst_query_add_allocation_pool (ctx.query, NULL, ctx.size,
796 gst_tee_clear_query_allocation_meta (query);
801 res = gst_pad_query_default (pad, parent, query);
808 gst_tee_do_message (GstTee * tee, GstPad * pad, gpointer data, gboolean is_list)
810 GST_OBJECT_LOCK (tee);
811 g_free (tee->last_message);
814 g_strdup_printf ("chain-list ******* (%s:%s)t %p",
815 GST_DEBUG_PAD_NAME (pad), data);
818 g_strdup_printf ("chain ******* (%s:%s)t (%" G_GSIZE_FORMAT
819 " bytes, %" G_GUINT64_FORMAT ") %p", GST_DEBUG_PAD_NAME (pad),
820 gst_buffer_get_size (data), GST_BUFFER_TIMESTAMP (data), data);
822 GST_OBJECT_UNLOCK (tee);
824 g_object_notify_by_pspec ((GObject *) tee, pspec_last_message);
828 gst_tee_do_push (GstTee * tee, GstPad * pad, gpointer data, gboolean is_list)
833 if (pad == tee->pull_pad) {
834 /* don't push on the pad we're pulling from */
836 } else if (is_list) {
838 gst_pad_push_list (pad,
839 gst_buffer_list_ref (GST_BUFFER_LIST_CAST (data)));
841 res = gst_pad_push (pad, gst_buffer_ref (GST_BUFFER_CAST (data)));
847 clear_pads (GstPad * pad, GstTee * tee)
849 GST_TEE_PAD_CAST (pad)->pushed = FALSE;
850 GST_TEE_PAD_CAST (pad)->result = GST_FLOW_NOT_LINKED;
854 gst_tee_handle_data (GstTee * tee, gpointer data, gboolean is_list)
858 GstFlowReturn ret, cret;
860 if (G_UNLIKELY (!tee->silent))
861 gst_tee_do_message (tee, tee->sinkpad, data, is_list);
863 GST_OBJECT_LOCK (tee);
864 pads = GST_ELEMENT_CAST (tee)->srcpads;
866 /* special case for zero pads */
867 if (G_UNLIKELY (!pads))
870 /* special case for just one pad that avoids reffing the buffer */
872 GstPad *pad = GST_PAD_CAST (pads->data);
874 /* Keep another ref around, a pad probe
875 * might release and destroy the pad */
876 gst_object_ref (pad);
877 GST_OBJECT_UNLOCK (tee);
879 if (pad == tee->pull_pad) {
881 } else if (is_list) {
882 ret = gst_pad_push_list (pad, GST_BUFFER_LIST_CAST (data));
884 ret = gst_pad_push (pad, GST_BUFFER_CAST (data));
887 if (GST_TEE_PAD_CAST (pad)->removed)
888 ret = GST_FLOW_NOT_LINKED;
890 if (ret == GST_FLOW_NOT_LINKED && tee->allow_not_linked) {
894 gst_object_unref (pad);
899 /* mark all pads as 'not pushed on yet' */
900 g_list_foreach (pads, (GFunc) clear_pads, tee);
903 if (tee->allow_not_linked) {
906 cret = GST_FLOW_NOT_LINKED;
908 pads = GST_ELEMENT_CAST (tee)->srcpads;
909 cookie = GST_ELEMENT_CAST (tee)->pads_cookie;
914 pad = GST_PAD_CAST (pads->data);
916 if (G_LIKELY (!GST_TEE_PAD_CAST (pad)->pushed)) {
917 /* not yet pushed, release lock and start pushing */
918 gst_object_ref (pad);
919 GST_OBJECT_UNLOCK (tee);
921 GST_LOG_OBJECT (pad, "Starting to push %s %p",
922 is_list ? "list" : "buffer", data);
924 ret = gst_tee_do_push (tee, pad, data, is_list);
926 GST_LOG_OBJECT (pad, "Pushing item %p yielded result %s", data,
927 gst_flow_get_name (ret));
929 GST_OBJECT_LOCK (tee);
930 /* keep track of which pad we pushed and the result value */
931 GST_TEE_PAD_CAST (pad)->pushed = TRUE;
932 GST_TEE_PAD_CAST (pad)->result = ret;
933 gst_object_unref (pad);
936 /* already pushed, use previous return value */
937 ret = GST_TEE_PAD_CAST (pad)->result;
938 GST_LOG_OBJECT (pad, "pad already pushed with %s",
939 gst_flow_get_name (ret));
942 /* before we go combining the return value, check if the pad list is still
943 * the same. It could be possible that the pad we just pushed was removed
944 * and the return value it not valid anymore */
945 if (G_UNLIKELY (GST_ELEMENT_CAST (tee)->pads_cookie != cookie)) {
946 GST_LOG_OBJECT (tee, "pad list changed");
947 /* the list of pads changed, restart iteration. Pads that we already
948 * pushed on and are still in the new list, will not be pushed on
953 /* stop pushing more buffers when we have a fatal error */
954 if (G_UNLIKELY (ret != GST_FLOW_OK && ret != GST_FLOW_NOT_LINKED))
957 /* keep all other return values, overwriting the previous one. */
958 if (G_LIKELY (ret != GST_FLOW_NOT_LINKED)) {
959 GST_LOG_OBJECT (tee, "Replacing ret val %d with %d", cret, ret);
962 pads = g_list_next (pads);
964 GST_OBJECT_UNLOCK (tee);
966 gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
968 /* no need to unset gvalue */
974 if (tee->allow_not_linked) {
975 GST_DEBUG_OBJECT (tee, "there are no pads, dropping %s",
976 is_list ? "buffer-list" : "buffer");
979 GST_DEBUG_OBJECT (tee, "there are no pads, return not-linked");
980 ret = GST_FLOW_NOT_LINKED;
986 GST_DEBUG_OBJECT (tee, "received error %s", gst_flow_get_name (ret));
991 GST_OBJECT_UNLOCK (tee);
992 gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
998 gst_tee_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
1003 tee = GST_TEE_CAST (parent);
1005 GST_DEBUG_OBJECT (tee, "received buffer %p", buffer);
1007 res = gst_tee_handle_data (tee, buffer, FALSE);
1009 GST_DEBUG_OBJECT (tee, "handled buffer %s", gst_flow_get_name (res));
1014 static GstFlowReturn
1015 gst_tee_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list)
1020 tee = GST_TEE_CAST (parent);
1022 GST_DEBUG_OBJECT (tee, "received list %p", list);
1024 res = gst_tee_handle_data (tee, list, TRUE);
1026 GST_DEBUG_OBJECT (tee, "handled list %s", gst_flow_get_name (res));
1032 gst_tee_sink_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
1038 tee = GST_TEE (parent);
1041 case GST_PAD_MODE_PUSH:
1043 GST_OBJECT_LOCK (tee);
1044 tee->sink_mode = active ? mode : GST_PAD_MODE_NONE;
1046 if (active && !tee->has_chain)
1048 GST_OBJECT_UNLOCK (tee);
1061 GST_OBJECT_UNLOCK (tee);
1062 GST_INFO_OBJECT (tee,
1063 "Tee cannot operate in push mode with has-chain==FALSE");
1069 gst_tee_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
1076 tee = GST_TEE (parent);
1079 case GST_PAD_MODE_PULL:
1081 GST_OBJECT_LOCK (tee);
1083 if (tee->pull_mode == GST_TEE_PULL_MODE_NEVER)
1086 if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE && active && tee->pull_pad)
1087 goto cannot_pull_multiple_srcs;
1089 sinkpad = gst_object_ref (tee->sinkpad);
1091 GST_OBJECT_UNLOCK (tee);
1093 res = gst_pad_activate_mode (sinkpad, GST_PAD_MODE_PULL, active);
1094 gst_object_unref (sinkpad);
1097 goto sink_activate_failed;
1099 GST_OBJECT_LOCK (tee);
1101 if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE)
1102 tee->pull_pad = pad;
1104 if (pad == tee->pull_pad)
1105 tee->pull_pad = NULL;
1107 tee->sink_mode = (active ? GST_PAD_MODE_PULL : GST_PAD_MODE_NONE);
1108 GST_OBJECT_UNLOCK (tee);
1121 GST_OBJECT_UNLOCK (tee);
1122 GST_INFO_OBJECT (tee, "Cannot activate in pull mode, pull-mode "
1126 cannot_pull_multiple_srcs:
1128 GST_OBJECT_UNLOCK (tee);
1129 GST_INFO_OBJECT (tee, "Cannot activate multiple src pads in pull mode, "
1130 "pull-mode set to SINGLE");
1133 sink_activate_failed:
1135 GST_INFO_OBJECT (tee, "Failed to %sactivate sink pad in pull mode",
1136 active ? "" : "de");
1142 gst_tee_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
1148 tee = GST_TEE (parent);
1150 switch (GST_QUERY_TYPE (query)) {
1151 case GST_QUERY_SCHEDULING:
1155 GST_OBJECT_LOCK (tee);
1157 if (tee->pull_mode == GST_TEE_PULL_MODE_NEVER) {
1158 GST_INFO_OBJECT (tee, "Cannot activate in pull mode, pull-mode "
1161 } else if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE && tee->pull_pad) {
1162 GST_INFO_OBJECT (tee, "Cannot activate multiple src pads in pull mode, "
1163 "pull-mode set to SINGLE");
1167 sinkpad = gst_object_ref (tee->sinkpad);
1168 GST_OBJECT_UNLOCK (tee);
1171 /* ask peer if we can operate in pull mode */
1172 res = gst_pad_peer_query (sinkpad, query);
1176 gst_object_unref (sinkpad);
1180 res = gst_pad_query_default (pad, parent, query);
1188 gst_tee_push_eos (const GValue * vpad, GstTee * tee)
1190 GstPad *pad = g_value_get_object (vpad);
1192 if (pad != tee->pull_pad)
1193 gst_pad_push_event (pad, gst_event_new_eos ());
1197 gst_tee_pull_eos (GstTee * tee)
1201 iter = gst_element_iterate_src_pads (GST_ELEMENT (tee));
1202 while (gst_iterator_foreach (iter,
1203 (GstIteratorForeachFunction) gst_tee_push_eos,
1204 tee) == GST_ITERATOR_RESYNC)
1205 gst_iterator_resync (iter);
1206 gst_iterator_free (iter);
1209 static GstFlowReturn
1210 gst_tee_src_get_range (GstPad * pad, GstObject * parent, guint64 offset,
1211 guint length, GstBuffer ** buf)
1216 tee = GST_TEE (parent);
1218 ret = gst_pad_pull_range (tee->sinkpad, offset, length, buf);
1220 if (ret == GST_FLOW_OK)
1221 ret = gst_tee_handle_data (tee, gst_buffer_ref (*buf), FALSE);
1222 else if (ret == GST_FLOW_EOS)
1223 gst_tee_pull_eos (tee);