1 /* GStreamer input selector
2 * Copyright (C) 2003 Julien Moutte <julien@moutte.net>
3 * Copyright (C) 2005 Ronald S. Bultje <rbultje@ronald.bitfreak.net>
4 * Copyright (C) 2005 Jan Schmidt <thaytan@mad.scientist.com>
5 * Copyright (C) 2007 Wim Taymans <wim.taymans@gmail.com>
6 * Copyright (C) 2007 Andy Wingo <wingo@pobox.com>
7 * Copyright (C) 2008 Nokia Corporation. (contact <stefan.kost@nokia.com>)
8 * Copyright (C) 2011 Sebastian Dröge <sebastian.droege@collabora.co.uk>
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Library General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Library General Public License for more details.
20 * You should have received a copy of the GNU Library General Public
21 * License along with this library; if not, write to the
22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23 * Boston, MA 02111-1307, USA.
27 * SECTION:element-input-selector
28 * @see_also: #GstOutputSelector
30 * Direct one out of N input streams to the output pad.
32 * The input pads are from a GstPad subclass and have additional
33 * properties, which users may find useful, namely:
37 * "running-time": Running time of stream on pad (#gint64)
40 * "tags": The currently active tags on the pad (#GstTagList, boxed type)
43 * "active": If the pad is currently active (#gboolean)
46 * "always-ok" : Make an inactive pad return #GST_FLOW_OK instead of
47 * #GST_FLOW_NOT_LINKED
60 #include "gstinputselector.h"
/* File-local debug category; defining GST_CAT_DEFAULT makes it the category
 * used by the GST_DEBUG_OBJECT / GST_LOG_OBJECT calls in this file. */
62 GST_DEBUG_CATEGORY_STATIC (input_selector_debug);
63 #define GST_CAT_DEFAULT input_selector_debug
/* Lock macros wrapped around the g_object_notify() calls on the selector.
 * With GLib >= 2.26 they expand to nothing; the second pair of definitions
 * uses a static recursive mutex instead (presumably the branch for older
 * GLib versions).
 * NOTE(review): the #else / #endif lines of this conditional are not visible
 * in this listing -- confirm against the full source. */
65 #if GLIB_CHECK_VERSION(2, 26, 0)
66 #define NOTIFY_MUTEX_LOCK()
67 #define NOTIFY_MUTEX_UNLOCK()
69 static GStaticRecMutex notify_mutex = G_STATIC_REC_MUTEX_INIT;
70 #define NOTIFY_MUTEX_LOCK() g_static_rec_mutex_lock (&notify_mutex)
71 #define NOTIFY_MUTEX_UNLOCK() g_static_rec_mutex_unlock (&notify_mutex)
/* Convenience macros around the selector's mutex (->lock) and condition
 * variable (->cond).  WAIT calls g_cond_wait() with the selector lock, so it
 * has to be used with that lock held; BROADCAST wakes up all threads waiting
 * on the condition. */
74 #define GST_INPUT_SELECTOR_GET_LOCK(sel) (((GstInputSelector*)(sel))->lock)
75 #define GST_INPUT_SELECTOR_GET_COND(sel) (((GstInputSelector*)(sel))->cond)
76 #define GST_INPUT_SELECTOR_LOCK(sel) (g_mutex_lock (GST_INPUT_SELECTOR_GET_LOCK(sel)))
77 #define GST_INPUT_SELECTOR_UNLOCK(sel) (g_mutex_unlock (GST_INPUT_SELECTOR_GET_LOCK(sel)))
78 #define GST_INPUT_SELECTOR_WAIT(sel) (g_cond_wait (GST_INPUT_SELECTOR_GET_COND(sel), \
79 GST_INPUT_SELECTOR_GET_LOCK(sel)))
80 #define GST_INPUT_SELECTOR_BROADCAST(sel) (g_cond_broadcast (GST_INPUT_SELECTOR_GET_COND(sel)))
/* Static pad templates for the numbered sink pads ("sink_%u") and for the
 * "src" pad.
 * NOTE(review): the remaining template arguments (direction, presence, caps)
 * are not visible in this listing. */
82 static GstStaticPadTemplate gst_input_selector_sink_factory =
83 GST_STATIC_PAD_TEMPLATE ("sink_%u",
88 static GstStaticPadTemplate gst_input_selector_src_factory =
89 GST_STATIC_PAD_TEMPLATE ("src",
/* Property defaults, one property id and the table of signal ids.
 * NOTE(review): the enum declarations these ids belong to are not visible in
 * this listing. */
102 #define DEFAULT_SYNC_STREAMS FALSE
104 #define DEFAULT_PAD_ALWAYS_OK TRUE
109 PROP_PAD_RUNNING_TIME,
122 static guint gst_input_selector_signals[LAST_SIGNAL] = { 0 };
/* Forward declarations of selector helpers that the pad code below calls.
 * NOTE(review): the parameter lists of the first two are cut off in this
 * listing. */
124 static inline gboolean gst_input_selector_is_active_sinkpad (GstInputSelector *
126 static GstPad *gst_input_selector_activate_sinkpad (GstInputSelector * sel,
128 static GstPad *gst_input_selector_get_linked_pad (GstInputSelector * sel,
129 GstPad * pad, gboolean strict);
/* GObject type macros for GstSelectorPad: type id, checked instance/class
 * casts and type tests.  GST_SELECTOR_PAD_CAST is a plain C cast without a
 * run-time type check. */
131 #define GST_TYPE_SELECTOR_PAD \
132 (gst_selector_pad_get_type())
133 #define GST_SELECTOR_PAD(obj) \
134 (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SELECTOR_PAD, GstSelectorPad))
135 #define GST_SELECTOR_PAD_CLASS(klass) \
136 (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_SELECTOR_PAD, GstSelectorPadClass))
137 #define GST_IS_SELECTOR_PAD(obj) \
138 (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_SELECTOR_PAD))
139 #define GST_IS_SELECTOR_PAD_CLASS(klass) \
140 (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_SELECTOR_PAD))
141 #define GST_SELECTOR_PAD_CAST(obj) \
142 ((GstSelectorPad *)(obj))
/* Instance and class structure names for the selector pad type. */
144 typedef struct _GstSelectorPad GstSelectorPad;
145 typedef struct _GstSelectorPadClass GstSelectorPadClass;
/* Per-sinkpad state kept by the input selector.
 * NOTE(review): some members (e.g. the parent instance and the always_ok
 * flag used elsewhere in this file) are not visible in this listing. */
147 struct _GstSelectorPad
151 gboolean active; /* when buffers have passed the pad */
152 gboolean pushed; /* when a buffer was pushed downstream since activation */
153 gboolean eos; /* when EOS has been received */
154 gboolean eos_sent; /* when EOS was sent downstream */
155 gboolean discont; /* after switching we create a discont */
156 gboolean flushing; /* set after flush-start and before flush-stop */
158 GstTagList *tags; /* last tags received on the pad */
160 GstClockTime position; /* the current position in the segment */
161 GstSegment segment; /* the current segment on the pad */
162 guint32 segment_seqnum; /* sequence number of the current segment */
/* set when a SEGMENT event still has to be pushed downstream for this pad;
 * cleared by the chain function once that event has been created */
164 gboolean segment_pending;
/* Class structure of the selector pad type. */
167 struct _GstSelectorPadClass
/* Forward declarations for the GstSelectorPad implementation, followed by
 * the GObject type definition (GstSelectorPad derives from GstPad). */
172 GType gst_selector_pad_get_type (void);
173 static void gst_selector_pad_finalize (GObject * object);
174 static void gst_selector_pad_get_property (GObject * object,
175 guint prop_id, GValue * value, GParamSpec * pspec);
176 static void gst_selector_pad_set_property (GObject * object,
177 guint prop_id, const GValue * value, GParamSpec * pspec);
179 static gint64 gst_selector_pad_get_running_time (GstSelectorPad * pad);
180 static void gst_selector_pad_reset (GstSelectorPad * pad);
181 static gboolean gst_selector_pad_event (GstPad * pad, GstEvent * event);
182 static GstCaps *gst_selector_pad_getcaps (GstPad * pad, GstCaps * filter);
183 static gboolean gst_selector_pad_acceptcaps (GstPad * pad, GstCaps * caps);
184 static GstIterator *gst_selector_pad_iterate_linked_pads (GstPad * pad);
185 static GstFlowReturn gst_selector_pad_chain (GstPad * pad, GstBuffer * buf);
187 G_DEFINE_TYPE (GstSelectorPad, gst_selector_pad, GST_TYPE_PAD);
/* Class initializer for GstSelectorPad: hooks up finalize and the property
 * accessors and installs the pad properties "running-time", "tags" and
 * "active" (read-only) and "always-ok" (read/write, default
 * DEFAULT_PAD_ALWAYS_OK). */
190 gst_selector_pad_class_init (GstSelectorPadClass * klass)
192 GObjectClass *gobject_class;
194 gobject_class = (GObjectClass *) klass;
196 gobject_class->finalize = gst_selector_pad_finalize;
198 gobject_class->get_property = gst_selector_pad_get_property;
199 gobject_class->set_property = gst_selector_pad_set_property;
201 g_object_class_install_property (gobject_class, PROP_PAD_RUNNING_TIME,
202 g_param_spec_int64 ("running-time", "Running time",
203 "Running time of stream on pad", 0, G_MAXINT64, 0,
204 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
205 g_object_class_install_property (gobject_class, PROP_PAD_TAGS,
206 g_param_spec_boxed ("tags", "Tags",
207 "The currently active tags on the pad", GST_TYPE_TAG_LIST,
208 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
209 g_object_class_install_property (gobject_class, PROP_PAD_ACTIVE,
210 g_param_spec_boolean ("active", "Active",
211 "If the pad is currently active", FALSE,
212 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
213 /* FIXME: better property name? */
214 g_object_class_install_property (gobject_class, PROP_PAD_ALWAYS_OK,
215 g_param_spec_boolean ("always-ok", "Always OK",
216 "Make an inactive pad return OK instead of NOT_LINKED",
217 DEFAULT_PAD_ALWAYS_OK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* Instance initializer: applies the default for always_ok and puts the
 * per-stream state into its reset condition. */
221 gst_selector_pad_init (GstSelectorPad * pad)
223 pad->always_ok = DEFAULT_PAD_ALWAYS_OK;
224 gst_selector_pad_reset (pad);
/* Finalizer: frees the tag list stored on the pad and chains up to the
 * parent class.
 * NOTE(review): any NULL check around the free is not visible in this
 * listing. */
228 gst_selector_pad_finalize (GObject * object)
232 pad = GST_SELECTOR_PAD_CAST (object);
235 gst_tag_list_free (pad->tags);
237 G_OBJECT_CLASS (gst_selector_pad_parent_class)->finalize (object);
/* GObject property setter for the selector pad.  The only writable property
 * visible here is "always-ok", which is stored under the object lock; any
 * other id triggers the invalid-property warning. */
241 gst_selector_pad_set_property (GObject * object, guint prop_id,
242 const GValue * value, GParamSpec * pspec)
244 GstSelectorPad *spad = GST_SELECTOR_PAD_CAST (object);
247 case PROP_PAD_ALWAYS_OK:
248 GST_OBJECT_LOCK (object);
249 spad->always_ok = g_value_get_boolean (value);
250 GST_OBJECT_UNLOCK (object);
253 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* GObject property getter for the selector pad.
 *  - running-time: computed by gst_selector_pad_get_running_time()
 *  - tags and always-ok: read under the object lock
 *  - active: asks the parent selector whether this pad is its active sinkpad
 * NOTE(review): no NULL check on the parent returned by gst_pad_get_parent()
 * is visible for the "active" case -- confirm against the full source. */
259 gst_selector_pad_get_property (GObject * object, guint prop_id,
260 GValue * value, GParamSpec * pspec)
262 GstSelectorPad *spad = GST_SELECTOR_PAD_CAST (object);
265 case PROP_PAD_RUNNING_TIME:
266 g_value_set_int64 (value, gst_selector_pad_get_running_time (spad));
269 GST_OBJECT_LOCK (object);
270 g_value_set_boxed (value, spad->tags);
271 GST_OBJECT_UNLOCK (object);
273 case PROP_PAD_ACTIVE:
275 GstInputSelector *sel;
/* the parent reference obtained here is dropped again below */
277 sel = GST_INPUT_SELECTOR (gst_pad_get_parent (spad));
278 g_value_set_boolean (value, gst_input_selector_is_active_sinkpad (sel,
279 GST_PAD_CAST (spad)));
280 gst_object_unref (sel);
283 case PROP_PAD_ALWAYS_OK:
284 GST_OBJECT_LOCK (object);
285 g_value_set_boolean (value, spad->always_ok);
286 GST_OBJECT_UNLOCK (object);
289 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* Converts the pad's stored position into running time using the pad's
 * segment (in the segment's own format), under the object lock, and logs the
 * result.
 * NOTE(review): the declaration of ret, any condition around the conversion
 * and the return statement are not visible in this listing. */
295 gst_selector_pad_get_running_time (GstSelectorPad * pad)
299 GST_OBJECT_LOCK (pad);
301 guint64 position = pad->position;
302 GstFormat format = pad->segment.format;
304 ret = gst_segment_to_running_time (&pad->segment, format, position);
306 GST_OBJECT_UNLOCK (pad);
308 GST_DEBUG_OBJECT (pad, "running time: %" GST_TIME_FORMAT,
309 GST_TIME_ARGS (ret));
/* Resets the per-stream state of the pad under the object lock: the flags
 * visible here are cleared, the position becomes GST_CLOCK_TIME_NONE and the
 * segment is re-initialized with GST_FORMAT_UNDEFINED.
 * NOTE(review): some assignments between the lock and eos_sent are missing
 * from this listing. */
315 gst_selector_pad_reset (GstSelectorPad * pad)
317 GST_OBJECT_LOCK (pad);
321 pad->eos_sent = FALSE;
322 pad->segment_pending = FALSE;
323 pad->discont = FALSE;
324 pad->flushing = FALSE;
325 pad->position = GST_CLOCK_TIME_NONE;
326 gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
327 GST_OBJECT_UNLOCK (pad);
/* Internal-links iterator function.  Looks up the linked pad in strict mode
 * and wraps it in a single-element iterator; the references held on the
 * linked pad and on the parent selector are dropped before returning.
 * NOTE(review): the NULL checks and the return statements are not visible in
 * this listing. */
330 /* strictly get the linked pad from the sinkpad. If the pad is active we return
331 * the srcpad else we return NULL */
333 gst_selector_pad_iterate_linked_pads (GstPad * pad)
335 GstInputSelector *sel;
337 GstIterator *it = NULL;
340 sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
341 if (G_UNLIKELY (sel == NULL))
344 otherpad = gst_input_selector_get_linked_pad (sel, pad, TRUE);
346 g_value_init (&val, GST_TYPE_PAD);
347 g_value_set_object (&val, otherpad);
348 it = gst_iterator_new_single (GST_TYPE_PAD, &val);
349 g_value_unset (&val);
350 gst_object_unref (otherpad);
352 gst_object_unref (sel);
/* Event handler for the selector's sink pads.
 *
 * The pad is first passed to gst_input_selector_activate_sinkpad() under the
 * selector lock; by default an event is only forwarded when it arrived on
 * the pad that call reports as active.  If this pad became the active one,
 * "active-pad" is notified on the selector.
 *
 * Per event type, as far as visible here:
 *  - FLUSH_START: mark the pad flushing and wake up all waiters
 *  - FLUSH_STOP: reset the per-stream state of the pad
 *  - SEGMENT: store the segment and its seqnum on the pad, reconcile the
 *    stored position with the segment position and replace the event with
 *    one built from the stored segment
 *  - tags: merge the new tags into the pad's tag list and notify "tags"
 *  - EOS: the forward decision is recomputed from the active pad's
 *    eos / eos_sent flags
 *
 * Forwarded events are pushed on the srcpad.
 *
 * NOTE(review): several lines of this function (case labels, break
 * statements, conditions, comment terminators, the return) are missing from
 * this listing, so the branch some statements belong to cannot be confirmed
 * from here. */
358 gst_selector_pad_event (GstPad * pad, GstEvent * event)
362 GstInputSelector *sel;
363 GstSelectorPad *selpad;
364 GstPad *prev_active_sinkpad;
365 GstPad *active_sinkpad;
367 sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
368 if (G_UNLIKELY (sel == NULL)) {
369 gst_event_unref (event);
372 selpad = GST_SELECTOR_PAD_CAST (pad);
374 GST_INPUT_SELECTOR_LOCK (sel);
375 prev_active_sinkpad = sel->active_sinkpad;
376 active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
378 /* only forward if we are dealing with the active sinkpad */
379 forward = (pad == active_sinkpad);
380 GST_INPUT_SELECTOR_UNLOCK (sel);
/* the active pad changed to this pad: tell property listeners */
382 if (prev_active_sinkpad != active_sinkpad && pad == active_sinkpad) {
383 NOTIFY_MUTEX_LOCK ();
384 g_object_notify (G_OBJECT (sel), "active-pad");
385 NOTIFY_MUTEX_UNLOCK ();
388 switch (GST_EVENT_TYPE (event)) {
389 case GST_EVENT_FLUSH_START:
390 /* Unblock the pad if it's waiting */
391 GST_INPUT_SELECTOR_LOCK (sel);
392 selpad->flushing = TRUE;
393 GST_INPUT_SELECTOR_BROADCAST (sel);
394 GST_INPUT_SELECTOR_UNLOCK (sel);
396 case GST_EVENT_FLUSH_STOP:
397 GST_INPUT_SELECTOR_LOCK (sel);
398 gst_selector_pad_reset (selpad);
399 GST_INPUT_SELECTOR_UNLOCK (sel);
401 case GST_EVENT_SEGMENT:
403 GST_INPUT_SELECTOR_LOCK (sel);
404 GST_OBJECT_LOCK (selpad);
405 gst_event_copy_segment (event, &selpad->segment);
406 selpad->segment_seqnum = gst_event_get_seqnum (event);
408 /* Update the position */
409 if (selpad->position == GST_CLOCK_TIME_NONE
410 || selpad->segment.position > selpad->position) {
411 selpad->position = selpad->segment.position;
412 } else if (selpad->position != GST_CLOCK_TIME_NONE
413 && selpad->position > selpad->segment.position) {
414 selpad->segment.position = selpad->position;
417 gst_event_unref (event);
418 event = gst_event_new_segment (&selpad->segment);
419 gst_event_set_seqnum (event, selpad->segment_seqnum);
422 GST_DEBUG_OBJECT (pad, "configured SEGMENT %" GST_SEGMENT_FORMAT,
425 /* If we aren't forwarding the event because the pad is not the
426 * active_sinkpad, then set the flag on the pad
427 * that says a segment needs sending if/when that pad is activated.
428 * For all other cases, we send the event immediately, which makes
429 * sparse streams and other segment updates work correctly downstream.
432 selpad->segment_pending = TRUE;
434 GST_OBJECT_UNLOCK (selpad);
435 GST_INPUT_SELECTOR_UNLOCK (sel);
440 GstTagList *tags, *oldtags, *newtags;
442 gst_event_parse_tag (event, &tags);
444 GST_OBJECT_LOCK (selpad);
445 oldtags = selpad->tags;
447 newtags = gst_tag_list_merge (oldtags, tags, GST_TAG_MERGE_REPLACE);
448 selpad->tags = newtags;
450 gst_tag_list_free (oldtags);
451 GST_DEBUG_OBJECT (pad, "received tags %" GST_PTR_FORMAT, newtags);
452 GST_OBJECT_UNLOCK (selpad);
454 g_object_notify (G_OBJECT (selpad), "tags");
461 selpad->eos_sent = TRUE;
465 /* If the active sinkpad is in EOS state but EOS
466 * was not sent downstream this means that the pad
467 * got EOS before it was set as active pad and that
468 * the previously active pad got EOS after it was
471 GST_INPUT_SELECTOR_LOCK (sel);
472 active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
473 tmp = GST_SELECTOR_PAD (active_sinkpad);
474 forward = (tmp->eos && !tmp->eos_sent);
475 tmp->eos_sent = TRUE;
476 GST_INPUT_SELECTOR_UNLOCK (sel);
478 GST_DEBUG_OBJECT (pad, "received EOS");
484 GST_DEBUG_OBJECT (pad, "forwarding event");
485 res = gst_pad_push_event (sel->srcpad, event);
487 gst_event_unref (event);
489 gst_object_unref (sel);
/* Caps function for the sink pads: proxies the request to the peer of the
 * selector's srcpad.  When the pad has no parent selector the filter (or ANY
 * caps when no filter is given) is returned; the same fallback appears after
 * the peer call, under a condition that is not visible in this listing. */
495 gst_selector_pad_getcaps (GstPad * pad, GstCaps * filter)
497 GstInputSelector *sel;
500 sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
501 if (G_UNLIKELY (sel == NULL))
502 return (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
504 GST_DEBUG_OBJECT (sel, "Getting caps of srcpad peer");
505 caps = gst_pad_peer_get_caps (sel->srcpad, filter);
507 caps = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
509 gst_object_unref (sel);
/* Accept-caps function for the sink pads: asks the peer of the selector's
 * srcpad whether it accepts @caps and drops the parent reference again.
 * NOTE(review): the value returned when the pad has no parent is not visible
 * in this listing. */
515 gst_selector_pad_acceptcaps (GstPad * pad, GstCaps * caps)
517 GstInputSelector *sel;
520 sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
521 if (G_UNLIKELY (sel == NULL))
524 GST_DEBUG_OBJECT (sel, "Checking acceptcaps of srcpad peer");
525 res = gst_pad_peer_accept_caps (sel->srcpad, caps);
526 gst_object_unref (sel);
/* Waits on the selector's condition variable for as long as the selector is
 * blocked and neither the selector nor the pad is flushing. */
531 /* must be called with the SELECTOR_LOCK, will block while the pad is blocked
532 * or return TRUE when flushing */
534 gst_input_selector_wait (GstInputSelector * self, GstSelectorPad * pad)
536 while (self->blocked && !self->flushing && !pad->flushing) {
537 /* we can be unlocked here when we are shutting down (flushing) or when we
539 GST_INPUT_SELECTOR_WAIT (self);
541 return self->flushing;
/* Sync-mode helper.  The running time of the buffer (taken at its end when
 * the duration is valid, and clamped to the segment stop) is compared with
 * the running time of the active pad's stored position.  The loop waits on
 * the selector's condition variable and re-evaluates the active pad and its
 * running time after every wake-up.
 * NOTE(review): several lines (early exits, braces, some assignments) are
 * missing from this listing. */
544 /* must be called with the SELECTOR_LOCK, will block until the running time
545 * of the active pad is after this pad or return TRUE when flushing */
547 gst_input_selector_wait_running_time (GstInputSelector * sel,
548 GstSelectorPad * pad, GstBuffer * buf)
550 GstPad *active_sinkpad;
551 GstSelectorPad *active_selpad;
552 GstSegment *seg, *active_seg;
553 GstClockTime running_time, active_running_time = -1;
558 gst_input_selector_activate_sinkpad (sel, GST_PAD_CAST (pad));
559 active_selpad = GST_SELECTOR_PAD_CAST (active_sinkpad);
560 active_seg = &active_selpad->segment;
562 /* We can only sync if the segments are in time format or
563 * if the active pad had no newsegment event yet */
564 if (seg->format != GST_FORMAT_TIME ||
565 (active_seg->format != GST_FORMAT_TIME
566 && active_seg->format != GST_FORMAT_UNDEFINED))
569 /* If we have no valid timestamp we can't sync this buffer */
570 if (!GST_BUFFER_TIMESTAMP_IS_VALID (buf))
573 running_time = GST_BUFFER_TIMESTAMP (buf);
574 /* If possible try to get the running time at the end of the buffer */
575 if (GST_BUFFER_DURATION_IS_VALID (buf))
576 running_time += GST_BUFFER_DURATION (buf);
577 if (running_time > seg->stop)
578 running_time = seg->stop;
580 gst_segment_to_running_time (seg, GST_FORMAT_TIME, running_time);
581 /* If this is outside the segment don't sync */
582 if (running_time == -1)
585 /* Get active pad's running time, if no configured segment yet keep at -1 */
586 if (active_seg->format == GST_FORMAT_TIME)
587 active_running_time =
588 gst_segment_to_running_time (active_seg, GST_FORMAT_TIME,
589 active_selpad->position);
592 * a) this is the active pad
593 * b) the pad or the selector is flushing
594 * c) the selector is not blocked
595 * d) the active pad has no running time or the active
596 * pad's running time is before this running time
597 * e) the active pad has a non-time segment
599 while (pad != active_selpad && !sel->flushing && !pad->flushing &&
600 (sel->blocked || active_running_time == -1
601 || running_time >= active_running_time)) {
603 GST_DEBUG_OBJECT (pad,
604 "Waiting for active streams to advance. %" GST_TIME_FORMAT " >= %"
605 GST_TIME_FORMAT, GST_TIME_ARGS (running_time),
606 GST_TIME_ARGS (active_running_time));
608 GST_INPUT_SELECTOR_WAIT (sel);
610 /* Get new active pad, it might have changed */
612 gst_input_selector_activate_sinkpad (sel, GST_PAD_CAST (pad));
613 active_selpad = GST_SELECTOR_PAD_CAST (active_sinkpad);
614 active_seg = &active_selpad->segment;
616 /* If the active segment is configured but not to time format
617 * we can't do any syncing at all */
618 if (active_seg->format != GST_FORMAT_TIME
619 && active_seg->format != GST_FORMAT_UNDEFINED)
622 /* Get the new active pad running time */
623 if (active_seg->format == GST_FORMAT_TIME)
624 active_running_time =
625 gst_segment_to_running_time (active_seg, GST_FORMAT_TIME,
626 active_selpad->position);
628 active_running_time = -1;
631 GST_DEBUG_OBJECT (pad,
632 "Waited for active streams to advance. %" GST_TIME_FORMAT " >= %"
633 GST_TIME_FORMAT, GST_TIME_ARGS (running_time),
634 GST_TIME_ARGS (active_running_time));
638 /* Return TRUE if the selector or the pad is flushing */
639 return (sel->flushing || pad->flushing);
/* Chain function for the selector's sink pads.
 *
 * With the selector lock held it waits while the selector is blocked,
 * resolves the active pad and, in sync mode, lets a non-active pad wait
 * until the active stream has advanced.  A valid buffer timestamp is stored
 * as the pad's position.
 *
 * Buffers from pads other than the active one end up in the "dropped
 * buffers" part near the end: the pad is flagged discont, the buffer is
 * unreffed and the result depends on always_ok and on whether the active pad
 * has pushed anything yet.
 *
 * For the active pad a SEGMENT event is created when the active pad changed
 * or a segment is pending, "active-pad" is notified after the selector lock
 * has been released, the buffer is marked DISCONT if the pad is flagged
 * discont, and the buffer is pushed on the srcpad.
 *
 * When flushing, the buffer is discarded and GST_FLOW_WRONG_STATE is the
 * result.
 *
 * NOTE(review): the jumps/labels connecting these paths and several other
 * lines are missing from this listing.  No NULL check on the parent returned
 * by gst_pad_get_parent() is visible here, unlike in the event handler --
 * confirm against the full source. */
644 gst_selector_pad_chain (GstPad * pad, GstBuffer * buf)
646 GstInputSelector *sel;
648 GstPad *active_sinkpad;
649 GstPad *prev_active_sinkpad;
650 GstSelectorPad *selpad;
651 GstClockTime start_time;
653 GstEvent *start_event = NULL;
655 sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
656 selpad = GST_SELECTOR_PAD_CAST (pad);
657 seg = &selpad->segment;
659 GST_INPUT_SELECTOR_LOCK (sel);
660 /* wait or check for flushing */
661 if (gst_input_selector_wait (sel, selpad))
664 GST_LOG_OBJECT (pad, "getting active pad");
666 prev_active_sinkpad = sel->active_sinkpad;
667 active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
669 /* In sync mode wait until the active pad has advanced
670 * after the running time of the current buffer */
671 if (sel->sync_streams && active_sinkpad != pad) {
672 if (gst_input_selector_wait_running_time (sel, selpad, buf))
676 /* Might have changed while waiting */
677 active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
679 /* update the segment on the srcpad */
680 start_time = GST_BUFFER_TIMESTAMP (buf);
681 if (GST_CLOCK_TIME_IS_VALID (start_time)) {
682 GST_LOG_OBJECT (pad, "received start time %" GST_TIME_FORMAT,
683 GST_TIME_ARGS (start_time));
684 if (GST_BUFFER_DURATION_IS_VALID (buf))
685 GST_LOG_OBJECT (pad, "received end time %" GST_TIME_FORMAT,
686 GST_TIME_ARGS (start_time + GST_BUFFER_DURATION (buf)));
688 GST_OBJECT_LOCK (pad);
689 selpad->position = start_time;
690 GST_OBJECT_UNLOCK (pad);
693 /* Ignore buffers from pads except the selected one */
694 if (pad != active_sinkpad)
697 /* Tell all non-active pads that we advanced the running time */
698 if (sel->sync_streams)
699 GST_INPUT_SELECTOR_BROADCAST (sel);
701 /* if we have a pending segment, push it out now */
702 if (G_UNLIKELY (prev_active_sinkpad != active_sinkpad
703 || selpad->segment_pending)) {
704 GST_DEBUG_OBJECT (pad,
705 "pushing pending NEWSEGMENT update %d, rate %lf, applied rate %lf, "
706 "format %d, " "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
707 G_GINT64_FORMAT, FALSE, seg->rate, seg->applied_rate, seg->format,
708 seg->start, seg->stop, seg->time);
710 start_event = gst_event_new_segment (seg);
711 gst_event_set_seqnum (start_event, selpad->segment_seqnum);
713 selpad->segment_pending = FALSE;
715 GST_INPUT_SELECTOR_UNLOCK (sel);
717 if (prev_active_sinkpad != active_sinkpad && pad == active_sinkpad) {
718 NOTIFY_MUTEX_LOCK ();
719 g_object_notify (G_OBJECT (sel), "active-pad");
720 NOTIFY_MUTEX_UNLOCK ();
724 gst_pad_push_event (sel->srcpad, start_event);
726 if (selpad->discont) {
727 buf = gst_buffer_make_writable (buf);
729 GST_DEBUG_OBJECT (pad, "Marking discont buffer %p", buf);
730 GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
731 selpad->discont = FALSE;
735 GST_LOG_OBJECT (pad, "Forwarding buffer %p", buf);
737 res = gst_pad_push (sel->srcpad, buf);
738 selpad->pushed = TRUE;
741 gst_object_unref (sel);
744 /* dropped buffers */
747 gboolean active_pad_pushed = GST_SELECTOR_PAD_CAST (active_sinkpad)->pushed;
749 GST_DEBUG_OBJECT (pad, "Pad not active, discard buffer %p", buf);
750 /* when we drop a buffer, we're creating a discont on this pad */
751 selpad->discont = TRUE;
752 GST_INPUT_SELECTOR_UNLOCK (sel);
753 gst_buffer_unref (buf);
755 /* figure out what to return upstream */
756 GST_OBJECT_LOCK (selpad);
757 if (selpad->always_ok || !active_pad_pushed)
760 res = GST_FLOW_NOT_LINKED;
761 GST_OBJECT_UNLOCK (selpad);
767 GST_DEBUG_OBJECT (pad, "We are flushing, discard buffer %p", buf);
768 GST_INPUT_SELECTOR_UNLOCK (sel);
769 gst_buffer_unref (buf);
770 res = GST_FLOW_WRONG_STATE;
/* Forward declarations for the GstInputSelector element implementation:
 * GObject vfuncs, request-pad handling, state change, the srcpad functions
 * and the handler of the "block" signal. */
775 static void gst_input_selector_dispose (GObject * object);
777 static void gst_input_selector_set_property (GObject * object,
778 guint prop_id, const GValue * value, GParamSpec * pspec);
779 static void gst_input_selector_get_property (GObject * object,
780 guint prop_id, GValue * value, GParamSpec * pspec);
782 static GstPad *gst_input_selector_request_new_pad (GstElement * element,
783 GstPadTemplate * templ, const gchar * unused, const GstCaps * caps);
784 static void gst_input_selector_release_pad (GstElement * element, GstPad * pad);
786 static GstStateChangeReturn gst_input_selector_change_state (GstElement *
787 element, GstStateChange transition);
789 static GstCaps *gst_input_selector_getcaps (GstPad * pad, GstCaps * filter);
790 static gboolean gst_input_selector_event (GstPad * pad, GstEvent * event);
791 static gboolean gst_input_selector_query (GstPad * pad, GstQuery * query);
792 static gint64 gst_input_selector_block (GstInputSelector * self);
/* Hand-written signal marshaller for callbacks of the form
 * gint64 (*) (gpointer data1, gpointer data2).  The callback is taken from
 * marshal_data when set, otherwise from the C closure; the instance and the
 * closure data are passed in the order selected by G_CCLOSURE_SWAP_DATA, and
 * the gint64 result is stored in return_value.  Exactly one parameter value
 * (the instance) is expected. */
794 /* FIXME: create these marshallers using glib-genmarshal */
796 gst_input_selector_marshal_INT64__VOID (GClosure * closure,
797 GValue * return_value G_GNUC_UNUSED,
798 guint n_param_values,
799 const GValue * param_values,
800 gpointer invocation_hint G_GNUC_UNUSED, gpointer marshal_data)
802 typedef gint64 (*GMarshalFunc_INT64__VOID) (gpointer data1, gpointer data2);
803 register GMarshalFunc_INT64__VOID callback;
804 register GCClosure *cc = (GCClosure *) closure;
805 register gpointer data1, data2;
808 g_return_if_fail (return_value != NULL);
809 g_return_if_fail (n_param_values == 1);
811 if (G_CCLOSURE_SWAP_DATA (closure)) {
812 data1 = closure->data;
813 data2 = g_value_peek_pointer (param_values + 0);
815 data1 = g_value_peek_pointer (param_values + 0);
816 data2 = closure->data;
819 (GMarshalFunc_INT64__VOID) (marshal_data ? marshal_data : cc->callback);
821 v_return = callback (data1, data2);
823 g_value_set_int64 (return_value, v_return);
/* Debug category registration ("input-selector") and the GObject type
 * definition of the element, which derives from GstElement; parent_class is
 * an alias for the generated gst_input_selector_parent_class.
 * NOTE(review): the line that starts the macro containing the category
 * registration and the end of the type definition are not visible in this
 * listing. */
827 GST_DEBUG_CATEGORY_INIT (input_selector_debug, \
828 "input-selector", 0, "An input stream selector element");
829 #define gst_input_selector_parent_class parent_class
830 G_DEFINE_TYPE_WITH_CODE (GstInputSelector, gst_input_selector, GST_TYPE_ELEMENT,
/* Class initializer for the selector element.  Installs the dispose and
 * property vfuncs, the properties "n-pads" (read-only), "active-pad" and
 * "sync-streams" (read/write), the "block" action signal (returns a gint64
 * through the custom marshaller), the element details, both pad templates,
 * and the request/release-pad and change_state vfuncs. */
834 gst_input_selector_class_init (GstInputSelectorClass * klass)
836 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
837 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
839 gobject_class->dispose = gst_input_selector_dispose;
841 gobject_class->set_property = gst_input_selector_set_property;
842 gobject_class->get_property = gst_input_selector_get_property;
844 g_object_class_install_property (gobject_class, PROP_N_PADS,
845 g_param_spec_uint ("n-pads", "Number of Pads",
846 "The number of sink pads", 0, G_MAXUINT, 0,
847 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
849 g_object_class_install_property (gobject_class, PROP_ACTIVE_PAD,
850 g_param_spec_object ("active-pad", "Active pad",
851 "The currently active sink pad", GST_TYPE_PAD,
852 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
855 * GstInputSelector:sync-streams
857 * If set to %TRUE all inactive streams will be synced to the
858 * running time of the active stream. This makes sure that no
859 * buffers are dropped by input-selector that might be needed
860 * when switching the active pad.
864 g_object_class_install_property (gobject_class, PROP_SYNC_STREAMS,
865 g_param_spec_boolean ("sync-streams", "Sync Streams",
866 "Synchronize inactive streams to the running time of the active stream",
867 DEFAULT_SYNC_STREAMS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
870 * GstInputSelector::block:
871 * @inputselector: the #GstInputSelector
873 * Block all sink pads in preparation for a switch. Returns the stop time of
874 * the current switch segment, as a running time, or 0 if there is no current
875 * active pad or the current active pad never received data.
877 gst_input_selector_signals[SIGNAL_BLOCK] =
878 g_signal_new ("block", G_TYPE_FROM_CLASS (klass),
879 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
880 G_STRUCT_OFFSET (GstInputSelectorClass, block), NULL, NULL,
881 gst_input_selector_marshal_INT64__VOID, G_TYPE_INT64, 0);
883 gst_element_class_set_details_simple (gstelement_class, "Input selector",
884 "Generic", "N-to-1 input stream selector",
885 "Julien Moutte <julien@moutte.net>, "
886 "Jan Schmidt <thaytan@mad.scientist.com>, "
887 "Wim Taymans <wim.taymans@gmail.com>");
888 gst_element_class_add_pad_template (gstelement_class,
889 gst_static_pad_template_get (&gst_input_selector_sink_factory));
890 gst_element_class_add_pad_template (gstelement_class,
891 gst_static_pad_template_get (&gst_input_selector_src_factory));
893 gstelement_class->request_new_pad = gst_input_selector_request_new_pad;
894 gstelement_class->release_pad = gst_input_selector_release_pad;
895 gstelement_class->change_state = gst_input_selector_change_state;
897 klass->block = GST_DEBUG_FUNCPTR (gst_input_selector_block);
/* Instance initializer: creates the "src" pad, installs its internal-links,
 * getcaps, query and event functions and adds it to the element, then sets
 * up the initial selector state (no active pad, default sync mode, a new
 * mutex and condition variable, not blocked). */
901 gst_input_selector_init (GstInputSelector * sel)
903 sel->srcpad = gst_pad_new ("src", GST_PAD_SRC);
904 gst_pad_set_iterate_internal_links_function (sel->srcpad,
905 GST_DEBUG_FUNCPTR (gst_selector_pad_iterate_linked_pads));
906 gst_pad_set_getcaps_function (sel->srcpad,
907 GST_DEBUG_FUNCPTR (gst_input_selector_getcaps));
908 gst_pad_set_query_function (sel->srcpad,
909 GST_DEBUG_FUNCPTR (gst_input_selector_query));
910 gst_pad_set_event_function (sel->srcpad,
911 GST_DEBUG_FUNCPTR (gst_input_selector_event));
912 gst_element_add_pad (GST_ELEMENT (sel), sel->srcpad);
913 /* sinkpad management */
914 sel->active_sinkpad = NULL;
916 sel->sync_streams = DEFAULT_SYNC_STREAMS;
918 sel->lock = g_mutex_new ();
919 sel->cond = g_cond_new ();
920 sel->blocked = FALSE;
/* Dispose handler: drops the reference on the active sinkpad, frees the
 * selector's mutex and condition variable and chains up to the parent class.
 * NOTE(review): GObject may call dispose more than once; any guards around
 * the two frees are not visible in this listing -- confirm against the full
 * source. */
924 gst_input_selector_dispose (GObject * object)
926 GstInputSelector *sel = GST_INPUT_SELECTOR (object);
928 if (sel->active_sinkpad) {
929 gst_object_unref (sel->active_sinkpad);
930 sel->active_sinkpad = NULL;
933 g_mutex_free (sel->lock);
937 g_cond_free (sel->cond);
941 G_OBJECT_CLASS (parent_class)->dispose (object);
/* Makes @pad the active sinkpad.  Nothing is changed when @pad already is
 * the active pad.  Otherwise the new pad is flagged so that a SEGMENT event
 * is sent for it next, the stored active_sinkpad reference is swapped with
 * gst_object_replace(), and in sync mode all waiting pads are woken up.
 * NOTE(review): the return statements and a few lines after the first debug
 * message are not visible in this listing. */
944 /* this function must be called with the SELECTOR_LOCK. It returns TRUE when the
945 * active pad changed. */
947 gst_input_selector_set_active_pad (GstInputSelector * self, GstPad * pad)
949 GstSelectorPad *old, *new;
950 GstPad **active_pad_p;
952 if (pad == self->active_sinkpad)
955 old = GST_SELECTOR_PAD_CAST (self->active_sinkpad);
956 new = GST_SELECTOR_PAD_CAST (pad);
958 GST_DEBUG_OBJECT (self, "setting active pad to %s:%s",
959 GST_DEBUG_PAD_NAME (new));
966 /* Send a new SEGMENT event on the new pad next */
967 if (old != new && new)
968 new->segment_pending = TRUE;
970 active_pad_p = &self->active_sinkpad;
971 gst_object_replace ((GstObject **) active_pad_p, GST_OBJECT_CAST (pad));
973 /* Wake up all non-active pads in sync mode, they might be
974 * the active pad now */
975 if (self->sync_streams)
976 GST_INPUT_SELECTOR_BROADCAST (self);
978 GST_DEBUG_OBJECT (self, "New active pad is %" GST_PTR_FORMAT,
979 self->active_sinkpad);
/* GObject property setter for the element.  "active-pad" switches the active
 * sinkpad and "sync-streams" stores the flag; both are done under the
 * selector lock.  Any other id triggers the invalid-property warning. */
985 gst_input_selector_set_property (GObject * object, guint prop_id,
986 const GValue * value, GParamSpec * pspec)
988 GstInputSelector *sel = GST_INPUT_SELECTOR (object);
991 case PROP_ACTIVE_PAD:
995 pad = g_value_get_object (value);
997 GST_INPUT_SELECTOR_LOCK (sel);
998 gst_input_selector_set_active_pad (sel, pad);
999 GST_INPUT_SELECTOR_UNLOCK (sel);
1002 case PROP_SYNC_STREAMS:
1004 GST_INPUT_SELECTOR_LOCK (sel);
1005 sel->sync_streams = g_value_get_boolean (value);
1006 GST_INPUT_SELECTOR_UNLOCK (sel);
1010 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* GObject property getter for the element: the number of pads, the active
 * sinkpad and the sync-streams flag are each read under the selector lock.
 * Any other id triggers the invalid-property warning. */
1016 gst_input_selector_get_property (GObject * object, guint prop_id,
1017 GValue * value, GParamSpec * pspec)
1019 GstInputSelector *sel = GST_INPUT_SELECTOR (object);
1023 GST_INPUT_SELECTOR_LOCK (object);
1024 g_value_set_uint (value, sel->n_pads);
1025 GST_INPUT_SELECTOR_UNLOCK (object);
1027 case PROP_ACTIVE_PAD:
1028 GST_INPUT_SELECTOR_LOCK (object);
1029 g_value_set_object (value, sel->active_sinkpad);
1030 GST_INPUT_SELECTOR_UNLOCK (object);
1032 case PROP_SYNC_STREAMS:
1033 GST_INPUT_SELECTOR_LOCK (object);
1034 g_value_set_boolean (value, sel->sync_streams);
1035 GST_INPUT_SELECTOR_UNLOCK (object);
1038 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* Looks up, under the selector lock, the pad that @pad is linked to inside
 * the selector: the active sinkpad for the srcpad; the srcpad for the active
 * sinkpad, or for any other pad when @strict is FALSE.  A reference is taken
 * on the result with gst_object_ref(), so the caller has to unref it.
 * NOTE(review): the condition guarding gst_object_ref() and the return
 * statement are not visible in this listing. */
1044 gst_input_selector_get_linked_pad (GstInputSelector * sel, GstPad * pad,
1047 GstPad *otherpad = NULL;
1049 GST_INPUT_SELECTOR_LOCK (sel);
1050 if (pad == sel->srcpad)
1051 otherpad = sel->active_sinkpad;
1052 else if (pad == sel->active_sinkpad || !strict)
1053 otherpad = sel->srcpad;
1055 gst_object_ref (otherpad);
1056 GST_INPUT_SELECTOR_UNLOCK (sel);
/* Event handler for the srcpad: sends upstream events to every sink pad by
 * iterating over the element's sink pads.  Pads found in pushed_pads are
 * skipped, each push gets its own reference on the event, the results are
 * OR-ed together, and the caller's reference is released at the end.
 * NOTE(review): the loop header, the line that records a pad in pushed_pads
 * and the return statements are not visible in this listing. */
1062 gst_input_selector_event (GstPad * pad, GstEvent * event)
1064 GstInputSelector *sel;
1065 gboolean result = FALSE;
1067 gboolean done = FALSE;
1068 GValue item = { 0, };
1070 GList *pushed_pads = NULL;
1072 sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
1073 if (G_UNLIKELY (sel == NULL)) {
1074 gst_event_unref (event);
1078 /* Send upstream events to all sinkpads */
1079 iter = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (sel));
1081 /* This is now essentially a copy of gst_pad_event_default_dispatch
1082 * with a different iterator */
1084 switch (gst_iterator_next (iter, &item)) {
1085 case GST_ITERATOR_OK:
1086 eventpad = g_value_get_object (&item);
1088 /* if already pushed, skip */
1089 if (g_list_find (pushed_pads, eventpad)) {
1090 g_value_reset (&item);
1094 gst_event_ref (event);
1095 result |= gst_pad_push_event (eventpad, event);
1097 g_value_reset (&item);
1099 case GST_ITERATOR_RESYNC:
1100 /* We don't reset the result here because we don't push the event
1101 * again on pads that got the event already and because we need
1102 * to consider the result of the previous pushes */
1103 gst_iterator_resync (iter);
1105 case GST_ITERATOR_ERROR:
1106 GST_ERROR_OBJECT (pad, "Could not iterate over sinkpads");
1109 case GST_ITERATOR_DONE:
1114 g_value_unset (&item);
1115 gst_iterator_free (iter);
1117 g_list_free (pushed_pads);
1119 gst_event_unref (event);
/* Latency queries are sent to the peers of all sinkpads and the answers are
 * combined; the other queries visible here go to the peer of the pad linked
 * to the srcpad (strict lookup).
 * NOTE(review): the peer queries of the latency case run while the selector
 * lock is held.  Several lines (declarations, the min/max bookkeeping, case
 * labels, the return) are missing from this listing. */
1124 /* query on the srcpad. We override this function because by default it will
1125 * only forward the query to one random sinkpad */
1127 gst_input_selector_query (GstPad * pad, GstQuery * query)
1129 gboolean res = TRUE;
1130 GstInputSelector *sel;
1133 sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
1134 if (G_UNLIKELY (sel == NULL))
1137 otherpad = gst_input_selector_get_linked_pad (sel, pad, TRUE);
1139 switch (GST_QUERY_TYPE (query)) {
1140 case GST_QUERY_LATENCY:
1143 GstClockTime resmin, resmax;
1150 /* assume FALSE, we become TRUE if one query succeeds */
1153 /* perform the query on all sinkpads and combine the results. We take the
1154 * max of min and the min of max for the result latency. */
1155 GST_INPUT_SELECTOR_LOCK (sel);
1156 for (walk = GST_ELEMENT_CAST (sel)->sinkpads; walk;
1157 walk = g_list_next (walk)) {
1158 GstPad *sinkpad = GST_PAD_CAST (walk->data);
1160 if (gst_pad_peer_query (sinkpad, query)) {
1161 GstClockTime min, max;
1164 /* one query succeeded, we succeed too */
1167 gst_query_parse_latency (query, &live, &min, &max);
1169 GST_DEBUG_OBJECT (sinkpad,
1170 "peer latency min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT
1171 ", live %d", GST_TIME_ARGS (min), GST_TIME_ARGS (max), live);
1178 else if (max < resmax)
1180 if (reslive == FALSE)
1185 GST_INPUT_SELECTOR_UNLOCK (sel);
1187 gst_query_set_latency (query, reslive, resmin, resmax);
1189 GST_DEBUG_OBJECT (sel,
1190 "total latency min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT
1191 ", live %d", GST_TIME_ARGS (resmin), GST_TIME_ARGS (resmax),
1199 res = gst_pad_peer_query (otherpad, query);
1203 gst_object_unref (otherpad);
1204 gst_object_unref (sel);
/* Caps query handler: answers with the caps of the peer of the pad linked to
 * `pad`, falling back to `filter` (or ANY caps when no filter is given).
 *
 * pad:    pad whose caps are requested; its parent is looked up as the
 *         selector.
 * filter: optional caps filter; may be NULL. When used as the fallback
 *         result, a new reference to it is returned.
 */
1210 gst_input_selector_getcaps (GstPad * pad, GstCaps * filter)
1213 GstInputSelector *sel;
/* No parent element: nothing can be asked, answer with the fallback caps. */
1216 sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
1217 if (G_UNLIKELY (sel == NULL))
1218 return (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
/* Look up the linked pad with the third argument FALSE; the query handler
 * in this file passes TRUE for the same lookup. */
1220 otherpad = gst_input_selector_get_linked_pad (sel, pad, FALSE);
1223 GST_DEBUG_OBJECT (pad, "Pad not linked, returning ANY");
1224 caps = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
1226 GST_DEBUG_OBJECT (pad, "Pad is linked (to %s:%s), returning peer caps",
1227 GST_DEBUG_PAD_NAME (otherpad));
1228 /* if the peer has caps, use those. If the pad is not linked, this function
1229 * returns NULL and we return ANY */
1230 if (!(caps = gst_pad_peer_get_caps (otherpad, filter)))
1231 caps = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
/* Release the linked pad obtained from the lookup above. */
1232 gst_object_unref (otherpad);
1235 gst_object_unref (sel);
1239 /* check if the pad is the active sinkpad */
/* sel: selector whose active pad is inspected.
 * pad: pad to compare against sel->active_sinkpad.
 *
 * The comparison is a plain pointer comparison made while holding the
 * selector lock; the lock is taken and released inside this helper. */
1240 static inline gboolean
1241 gst_input_selector_is_active_sinkpad (GstInputSelector * sel, GstPad * pad)
1245 GST_INPUT_SELECTOR_LOCK (sel);
1246 res = (pad == sel->active_sinkpad);
1247 GST_INPUT_SELECTOR_UNLOCK (sel);
1252 /* Get or create the active sinkpad, must be called with SELECTOR_LOCK */
/* sel: selector to operate on.
 * pad: sink pad that saw activity; it is marked active unconditionally and
 *      becomes the selector's active pad only if none is set yet.
 *
 * Returns the selector's active sink pad after the call (the return value
 * itself is not given an extra reference here). */
1254 gst_input_selector_activate_sinkpad (GstInputSelector * sel, GstPad * pad)
1256 GstPad *active_sinkpad;
1257 GstSelectorPad *selpad;
1259 selpad = GST_SELECTOR_PAD_CAST (pad);
1261 selpad->active = TRUE;
1262 active_sinkpad = sel->active_sinkpad;
1263 if (active_sinkpad == NULL) {
1264 /* first pad we get activity on becomes the activated pad by default */
/* NOTE(review): sel->active_sinkpad was just read as NULL above, so this
 * unref cannot run unless the field changes in between; the header comment
 * says the caller holds the selector lock. Looks like dead code. */
1265 if (sel->active_sinkpad)
1266 gst_object_unref (sel->active_sinkpad);
/* The selector keeps its own reference to the newly active pad. */
1267 active_sinkpad = sel->active_sinkpad = gst_object_ref (pad);
1268 GST_DEBUG_OBJECT (sel, "Activating pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1271 return active_sinkpad;
/* Request-pad handler: creates a new selector sink pad named "sink_<n>",
 * installs the selector pad callbacks on it, activates it and adds it to the
 * element.
 *
 * element: the selector the pad is requested on.
 * templ:   pad template; must have sink direction, otherwise NULL is returned.
 * unused:  requested pad name; not referenced in the visible lines (the name
 *          is always generated from the pad counter).
 * caps:    not referenced in the visible lines.
 */
1275 gst_input_selector_request_new_pad (GstElement * element,
1276 GstPadTemplate * templ, const gchar * unused, const GstCaps * caps)
1278 GstInputSelector *sel;
1280 GstPad *sinkpad = NULL;
1282 g_return_val_if_fail (templ->direction == GST_PAD_SINK, NULL);
1284 sel = GST_INPUT_SELECTOR (element);
/* Pad counter, pad creation and the add to the element all happen while the
 * selector lock is held. */
1286 GST_INPUT_SELECTOR_LOCK (sel);
/* NOTE(review): the log below prints sel->padcount with "%d" while the pad
 * name uses "%u"; padcount's declared type is not visible here. Confirm
 * which specifier matches. */
1288 GST_LOG_OBJECT (sel, "Creating new pad %d", sel->padcount);
/* The counter is post-incremented, so each request gets the next number. */
1289 name = g_strdup_printf ("sink_%u", sel->padcount++);
1290 sinkpad = g_object_new (GST_TYPE_SELECTOR_PAD,
1291 "name", name, "direction", templ->direction, "template", templ, NULL);
/* Install the selector-pad implementations of the pad callbacks. */
1296 gst_pad_set_event_function (sinkpad,
1297 GST_DEBUG_FUNCPTR (gst_selector_pad_event));
1298 gst_pad_set_getcaps_function (sinkpad,
1299 GST_DEBUG_FUNCPTR (gst_selector_pad_getcaps));
1300 gst_pad_set_acceptcaps_function (sinkpad,
1301 GST_DEBUG_FUNCPTR (gst_selector_pad_acceptcaps));
1302 gst_pad_set_chain_function (sinkpad,
1303 GST_DEBUG_FUNCPTR (gst_selector_pad_chain));
1304 gst_pad_set_iterate_internal_links_function (sinkpad,
1305 GST_DEBUG_FUNCPTR (gst_selector_pad_iterate_linked_pads));
/* Activate the pad before handing it to the element. */
1307 gst_pad_set_active (sinkpad, TRUE);
1308 gst_element_add_pad (GST_ELEMENT (sel), sinkpad);
1309 GST_INPUT_SELECTOR_UNLOCK (sel);
/* Release-pad handler: deactivates `pad` and removes it from the selector.
 * If the pad is currently the active sink pad, the selector's reference to
 * it is dropped and the active pad is cleared first.
 *
 * element: the selector owning the pad.
 * pad:     the request pad being released.
 */
1315 gst_input_selector_release_pad (GstElement * element, GstPad * pad)
1317 GstInputSelector *sel;
1319 sel = GST_INPUT_SELECTOR (element);
1320 GST_LOG_OBJECT (sel, "Releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
/* Everything below, including the pad removal, runs under the selector
 * lock. */
1322 GST_INPUT_SELECTOR_LOCK (sel);
1323 /* if the pad was the active pad, makes us select a new one */
1324 if (sel->active_sinkpad == pad) {
1325 GST_DEBUG_OBJECT (sel, "Deactivating pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1326 gst_object_unref (sel->active_sinkpad);
1327 sel->active_sinkpad = NULL;
/* Deactivate the pad before taking it out of the element. */
1331 gst_pad_set_active (pad, FALSE);
1332 gst_element_remove_pad (GST_ELEMENT (sel), pad);
1333 GST_INPUT_SELECTOR_UNLOCK (sel);
/* Puts the selector back into its initial state: forgets the active sink pad
 * and resets every sink pad, including its stored tags. The whole operation
 * runs under the selector lock.
 *
 * sel: selector to reset.
 */
1337 gst_input_selector_reset (GstInputSelector * sel)
1341 GST_INPUT_SELECTOR_LOCK (sel);
1342 /* clear active pad */
1343 if (sel->active_sinkpad) {
1344 gst_object_unref (sel->active_sinkpad);
1345 sel->active_sinkpad = NULL;
1347 /* reset each of our sinkpads state */
1348 for (walk = GST_ELEMENT_CAST (sel)->sinkpads; walk; walk = g_list_next (walk)) {
1349 GstSelectorPad *selpad = GST_SELECTOR_PAD_CAST (walk->data);
1351 gst_selector_pad_reset (selpad);
/* Free the pad's tag list and clear the pointer so it is not freed twice
 * (presumably guarded by a NULL check on a line not visible here). */
1354 gst_tag_list_free (selpad->tags);
1355 selpad->tags = NULL;
1358 GST_INPUT_SELECTOR_UNLOCK (sel);
/* State change handler.
 *
 * Before chaining up to the parent class it updates the blocked/flushing
 * flags under the selector lock; after the parent has handled the
 * transition, a PAUSED->READY change additionally resets the selector.
 *
 * element:    the selector changing state.
 * transition: the state transition being performed.
 */
1361 static GstStateChangeReturn
1362 gst_input_selector_change_state (GstElement * element,
1363 GstStateChange transition)
1365 GstInputSelector *self = GST_INPUT_SELECTOR (element);
1366 GstStateChangeReturn result;
/* Work done before the parent class sees the transition. */
1368 switch (transition) {
1369 case GST_STATE_CHANGE_READY_TO_PAUSED:
/* Starting up: neither blocked nor flushing. */
1370 GST_INPUT_SELECTOR_LOCK (self);
1371 self->blocked = FALSE;
1372 self->flushing = FALSE;
1373 GST_INPUT_SELECTOR_UNLOCK (self);
1375 case GST_STATE_CHANGE_PAUSED_TO_READY:
1376 /* first unlock before we call the parent state change function, which
1377 * tries to acquire the stream lock when going to ready. */
1378 GST_INPUT_SELECTOR_LOCK (self);
1379 self->blocked = FALSE;
1380 self->flushing = TRUE;
/* Signal the selector condition after setting the flushing flag (presumably
 * to wake threads waiting on it; the waiters are not visible here). */
1381 GST_INPUT_SELECTOR_BROADCAST (self);
1382 GST_INPUT_SELECTOR_UNLOCK (self);
/* Let the parent class perform the actual state change. */
1388 result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
/* Work done after the parent class has handled the transition. */
1390 switch (transition) {
1391 case GST_STATE_CHANGE_PAUSED_TO_READY:
1392 gst_input_selector_reset (self);
1402 gst_input_selector_block (GstInputSelector * self)
1405 GstSelectorPad *spad;
1407 GST_INPUT_SELECTOR_LOCK (self);
1410 GST_WARNING_OBJECT (self, "switch already blocked");
1412 self->blocked = TRUE;
1413 spad = GST_SELECTOR_PAD_CAST (self->active_sinkpad);
1416 ret = gst_selector_pad_get_running_time (spad);
1418 GST_DEBUG_OBJECT (self, "no active pad while blocking");
1420 GST_INPUT_SELECTOR_UNLOCK (self);