inputselector: Always send a SEGMENT event when the active pad changes
[platform/upstream/gstreamer.git] / plugins / elements / gstinputselector.c
1 /* GStreamer input selector
2  * Copyright (C) 2003 Julien Moutte <julien@moutte.net>
3  * Copyright (C) 2005 Ronald S. Bultje <rbultje@ronald.bitfreak.net>
4  * Copyright (C) 2005 Jan Schmidt <thaytan@mad.scientist.com>
5  * Copyright (C) 2007 Wim Taymans <wim.taymans@gmail.com>
6  * Copyright (C) 2007 Andy Wingo <wingo@pobox.com>
7  * Copyright (C) 2008 Nokia Corporation. (contact <stefan.kost@nokia.com>)
8  * Copyright (C) 2011 Sebastian Dröge <sebastian.droege@collabora.co.uk>
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  */
25
26 /**
27  * SECTION:element-input-selector
28  * @see_also: #GstOutputSelector
29  *
30  * Direct one out of N input streams to the output pad.
31  *
32  * The input pads are from a GstPad subclass and have additional
33  * properties, which users may find useful, namely:
34  *
35  * <itemizedlist>
36  * <listitem>
37  * "running-time": Running time of stream on pad (#gint64)
38  * </listitem>
39  * <listitem>
40  * "tags": The currently active tags on the pad (#GstTagList, boxed type)
41  * </listitem>
42  * <listitem>
43  * "active": If the pad is currently active (#gboolean)
44  * </listitem>
45  * <listitem>
46  * "always-ok" : Make an inactive pads return #GST_FLOW_OK instead of
47  * #GST_FLOW_NOT_LINKED
48  * </listitem>
49  * </itemizedlist>
50  *
51  * Since: 0.10.32
52  */
53
54 #ifdef HAVE_CONFIG_H
55 #include "config.h"
56 #endif
57
58 #include <string.h>
59
60 #include "gstinputselector.h"
61
62 GST_DEBUG_CATEGORY_STATIC (input_selector_debug);
63 #define GST_CAT_DEFAULT input_selector_debug
64
65 #if GLIB_CHECK_VERSION(2, 26, 0)
66 #define NOTIFY_MUTEX_LOCK()
67 #define NOTIFY_MUTEX_UNLOCK()
68 #else
69 static GStaticRecMutex notify_mutex = G_STATIC_REC_MUTEX_INIT;
70 #define NOTIFY_MUTEX_LOCK() g_static_rec_mutex_lock (&notify_mutex)
71 #define NOTIFY_MUTEX_UNLOCK() g_static_rec_mutex_unlock (&notify_mutex)
72 #endif
73
74 #define GST_INPUT_SELECTOR_GET_LOCK(sel) (((GstInputSelector*)(sel))->lock)
75 #define GST_INPUT_SELECTOR_GET_COND(sel) (((GstInputSelector*)(sel))->cond)
76 #define GST_INPUT_SELECTOR_LOCK(sel) (g_mutex_lock (GST_INPUT_SELECTOR_GET_LOCK(sel)))
77 #define GST_INPUT_SELECTOR_UNLOCK(sel) (g_mutex_unlock (GST_INPUT_SELECTOR_GET_LOCK(sel)))
78 #define GST_INPUT_SELECTOR_WAIT(sel) (g_cond_wait (GST_INPUT_SELECTOR_GET_COND(sel), \
79                         GST_INPUT_SELECTOR_GET_LOCK(sel)))
80 #define GST_INPUT_SELECTOR_BROADCAST(sel) (g_cond_broadcast (GST_INPUT_SELECTOR_GET_COND(sel)))
81
82 static GstStaticPadTemplate gst_input_selector_sink_factory =
83 GST_STATIC_PAD_TEMPLATE ("sink%d",
84     GST_PAD_SINK,
85     GST_PAD_REQUEST,
86     GST_STATIC_CAPS_ANY);
87
88 static GstStaticPadTemplate gst_input_selector_src_factory =
89 GST_STATIC_PAD_TEMPLATE ("src",
90     GST_PAD_SRC,
91     GST_PAD_ALWAYS,
92     GST_STATIC_CAPS_ANY);
93
94 enum
95 {
96   PROP_0,
97   PROP_N_PADS,
98   PROP_ACTIVE_PAD,
99   PROP_SYNC_STREAMS
100 };
101
102 #define DEFAULT_SYNC_STREAMS FALSE
103
104 #define DEFAULT_PAD_ALWAYS_OK TRUE
105
106 enum
107 {
108   PROP_PAD_0,
109   PROP_PAD_RUNNING_TIME,
110   PROP_PAD_TAGS,
111   PROP_PAD_ACTIVE,
112   PROP_PAD_ALWAYS_OK
113 };
114
115 enum
116 {
117   /* methods */
118   SIGNAL_BLOCK,
119   SIGNAL_SWITCH,
120   LAST_SIGNAL
121 };
122 static guint gst_input_selector_signals[LAST_SIGNAL] = { 0 };
123
124 static inline gboolean gst_input_selector_is_active_sinkpad (GstInputSelector *
125     sel, GstPad * pad);
126 static GstPad *gst_input_selector_activate_sinkpad (GstInputSelector * sel,
127     GstPad * pad);
128 static GstPad *gst_input_selector_get_linked_pad (GstInputSelector * sel,
129     GstPad * pad, gboolean strict);
130
131 #define GST_TYPE_SELECTOR_PAD \
132   (gst_selector_pad_get_type())
133 #define GST_SELECTOR_PAD(obj) \
134   (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SELECTOR_PAD, GstSelectorPad))
135 #define GST_SELECTOR_PAD_CLASS(klass) \
136   (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_SELECTOR_PAD, GstSelectorPadClass))
137 #define GST_IS_SELECTOR_PAD(obj) \
138   (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_SELECTOR_PAD))
139 #define GST_IS_SELECTOR_PAD_CLASS(klass) \
140   (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_SELECTOR_PAD))
141 #define GST_SELECTOR_PAD_CAST(obj) \
142   ((GstSelectorPad *)(obj))
143
144 typedef struct _GstSelectorPad GstSelectorPad;
145 typedef struct _GstSelectorPadClass GstSelectorPadClass;
146
147 struct _GstSelectorPad
148 {
149   GstPad parent;
150
151   gboolean active;              /* when buffer have passed the pad */
152   gboolean pushed;              /* when buffer was pushed downstream since activation */
153   gboolean eos;                 /* when EOS has been received */
154   gboolean eos_sent;            /* when EOS was sent downstream */
155   gboolean discont;             /* after switching we create a discont */
156   gboolean flushing;            /* set after flush-start and before flush-stop */
157   gboolean always_ok;
158   GstTagList *tags;             /* last tags received on the pad */
159
160   GstClockTime position;        /* the current position in the segment */
161   GstSegment segment;           /* the current segment on the pad */
162   guint32 segment_seqnum;       /* sequence number of the current segment */
163
164   gboolean segment_pending;
165 };
166
167 struct _GstSelectorPadClass
168 {
169   GstPadClass parent;
170 };
171
172 GType gst_selector_pad_get_type (void);
173 static void gst_selector_pad_finalize (GObject * object);
174 static void gst_selector_pad_get_property (GObject * object,
175     guint prop_id, GValue * value, GParamSpec * pspec);
176 static void gst_selector_pad_set_property (GObject * object,
177     guint prop_id, const GValue * value, GParamSpec * pspec);
178
179 static gint64 gst_selector_pad_get_running_time (GstSelectorPad * pad);
180 static void gst_selector_pad_reset (GstSelectorPad * pad);
181 static gboolean gst_selector_pad_event (GstPad * pad, GstEvent * event);
182 static GstCaps *gst_selector_pad_getcaps (GstPad * pad, GstCaps * filter);
183 static gboolean gst_selector_pad_acceptcaps (GstPad * pad, GstCaps * caps);
184 static GstIterator *gst_selector_pad_iterate_linked_pads (GstPad * pad);
185 static GstFlowReturn gst_selector_pad_chain (GstPad * pad, GstBuffer * buf);
186
187 G_DEFINE_TYPE (GstSelectorPad, gst_selector_pad, GST_TYPE_PAD);
188
189 static void
190 gst_selector_pad_class_init (GstSelectorPadClass * klass)
191 {
192   GObjectClass *gobject_class;
193
194   gobject_class = (GObjectClass *) klass;
195
196   gobject_class->finalize = gst_selector_pad_finalize;
197
198   gobject_class->get_property = gst_selector_pad_get_property;
199   gobject_class->set_property = gst_selector_pad_set_property;
200
201   g_object_class_install_property (gobject_class, PROP_PAD_RUNNING_TIME,
202       g_param_spec_int64 ("running-time", "Running time",
203           "Running time of stream on pad", 0, G_MAXINT64, 0,
204           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
205   g_object_class_install_property (gobject_class, PROP_PAD_TAGS,
206       g_param_spec_boxed ("tags", "Tags",
207           "The currently active tags on the pad", GST_TYPE_TAG_LIST,
208           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
209   g_object_class_install_property (gobject_class, PROP_PAD_ACTIVE,
210       g_param_spec_boolean ("active", "Active",
211           "If the pad is currently active", FALSE,
212           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
213   /* FIXME: better property name? */
214   g_object_class_install_property (gobject_class, PROP_PAD_ALWAYS_OK,
215       g_param_spec_boolean ("always-ok", "Always OK",
216           "Make an inactive pad return OK instead of NOT_LINKED",
217           DEFAULT_PAD_ALWAYS_OK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
218 }
219
220 static void
221 gst_selector_pad_init (GstSelectorPad * pad)
222 {
223   pad->always_ok = DEFAULT_PAD_ALWAYS_OK;
224   gst_selector_pad_reset (pad);
225 }
226
227 static void
228 gst_selector_pad_finalize (GObject * object)
229 {
230   GstSelectorPad *pad;
231
232   pad = GST_SELECTOR_PAD_CAST (object);
233
234   if (pad->tags)
235     gst_tag_list_free (pad->tags);
236
237   G_OBJECT_CLASS (gst_selector_pad_parent_class)->finalize (object);
238 }
239
240 static void
241 gst_selector_pad_set_property (GObject * object, guint prop_id,
242     const GValue * value, GParamSpec * pspec)
243 {
244   GstSelectorPad *spad = GST_SELECTOR_PAD_CAST (object);
245
246   switch (prop_id) {
247     case PROP_PAD_ALWAYS_OK:
248       GST_OBJECT_LOCK (object);
249       spad->always_ok = g_value_get_boolean (value);
250       GST_OBJECT_UNLOCK (object);
251       break;
252     default:
253       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
254       break;
255   }
256 }
257
258 static void
259 gst_selector_pad_get_property (GObject * object, guint prop_id,
260     GValue * value, GParamSpec * pspec)
261 {
262   GstSelectorPad *spad = GST_SELECTOR_PAD_CAST (object);
263
264   switch (prop_id) {
265     case PROP_PAD_RUNNING_TIME:
266       g_value_set_int64 (value, gst_selector_pad_get_running_time (spad));
267       break;
268     case PROP_PAD_TAGS:
269       GST_OBJECT_LOCK (object);
270       g_value_set_boxed (value, spad->tags);
271       GST_OBJECT_UNLOCK (object);
272       break;
273     case PROP_PAD_ACTIVE:
274     {
275       GstInputSelector *sel;
276
277       sel = GST_INPUT_SELECTOR (gst_pad_get_parent (spad));
278       g_value_set_boolean (value, gst_input_selector_is_active_sinkpad (sel,
279               GST_PAD_CAST (spad)));
280       gst_object_unref (sel);
281       break;
282     }
283     case PROP_PAD_ALWAYS_OK:
284       GST_OBJECT_LOCK (object);
285       g_value_set_boolean (value, spad->always_ok);
286       GST_OBJECT_UNLOCK (object);
287       break;
288     default:
289       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
290       break;
291   }
292 }
293
294 static gint64
295 gst_selector_pad_get_running_time (GstSelectorPad * pad)
296 {
297   gint64 ret = 0;
298
299   GST_OBJECT_LOCK (pad);
300   if (pad->active) {
301     guint64 position = pad->position;
302     GstFormat format = pad->segment.format;
303
304     ret = gst_segment_to_running_time (&pad->segment, format, position);
305   }
306   GST_OBJECT_UNLOCK (pad);
307
308   GST_DEBUG_OBJECT (pad, "running time: %" GST_TIME_FORMAT,
309       GST_TIME_ARGS (ret));
310
311   return ret;
312 }
313
314 static void
315 gst_selector_pad_reset (GstSelectorPad * pad)
316 {
317   GST_OBJECT_LOCK (pad);
318   pad->active = FALSE;
319   pad->pushed = FALSE;
320   pad->eos = FALSE;
321   pad->eos_sent = FALSE;
322   pad->segment_pending = FALSE;
323   pad->discont = FALSE;
324   pad->flushing = FALSE;
325   pad->position = GST_CLOCK_TIME_NONE;
326   gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
327   GST_OBJECT_UNLOCK (pad);
328 }
329
330 /* strictly get the linked pad from the sinkpad. If the pad is active we return
331  * the srcpad else we return NULL */
332 static GstIterator *
333 gst_selector_pad_iterate_linked_pads (GstPad * pad)
334 {
335   GstInputSelector *sel;
336   GstPad *otherpad;
337   GstIterator *it;
338   GValue val = { 0, };
339
340   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
341   if (G_UNLIKELY (sel == NULL))
342     return NULL;
343
344   otherpad = gst_input_selector_get_linked_pad (sel, pad, TRUE);
345   g_value_init (&val, GST_TYPE_PAD);
346   g_value_set_object (&val, otherpad);
347   it = gst_iterator_new_single (GST_TYPE_PAD, &val);
348   g_value_unset (&val);
349
350   if (otherpad)
351     gst_object_unref (otherpad);
352   gst_object_unref (sel);
353
354   return it;
355 }
356
357 static gboolean
358 gst_selector_pad_event (GstPad * pad, GstEvent * event)
359 {
360   gboolean res = TRUE;
361   gboolean forward;
362   GstInputSelector *sel;
363   GstSelectorPad *selpad;
364   GstPad *prev_active_sinkpad;
365   GstPad *active_sinkpad;
366
367   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
368   if (G_UNLIKELY (sel == NULL)) {
369     gst_event_unref (event);
370     return FALSE;
371   }
372   selpad = GST_SELECTOR_PAD_CAST (pad);
373
374   GST_INPUT_SELECTOR_LOCK (sel);
375   prev_active_sinkpad = sel->active_sinkpad;
376   active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
377
378   /* only forward if we are dealing with the active sinkpad */
379   forward = (pad == active_sinkpad);
380   GST_INPUT_SELECTOR_UNLOCK (sel);
381
382   if (prev_active_sinkpad != active_sinkpad && pad == active_sinkpad) {
383     NOTIFY_MUTEX_LOCK ();
384     g_object_notify (G_OBJECT (sel), "active-pad");
385     NOTIFY_MUTEX_UNLOCK ();
386   }
387
388   switch (GST_EVENT_TYPE (event)) {
389     case GST_EVENT_FLUSH_START:
390       /* Unblock the pad if it's waiting */
391       GST_INPUT_SELECTOR_LOCK (sel);
392       selpad->flushing = TRUE;
393       GST_INPUT_SELECTOR_BROADCAST (sel);
394       GST_INPUT_SELECTOR_UNLOCK (sel);
395       break;
396     case GST_EVENT_FLUSH_STOP:
397       GST_INPUT_SELECTOR_LOCK (sel);
398       gst_selector_pad_reset (selpad);
399       GST_INPUT_SELECTOR_UNLOCK (sel);
400       break;
401     case GST_EVENT_SEGMENT:
402     {
403       GST_INPUT_SELECTOR_LOCK (sel);
404       GST_OBJECT_LOCK (selpad);
405       gst_event_copy_segment (event, &selpad->segment);
406       selpad->segment_seqnum = gst_event_get_seqnum (event);
407
408       /* Update the position */
409       if (selpad->position == GST_CLOCK_TIME_NONE
410           || selpad->segment.position > selpad->position) {
411         selpad->position = selpad->segment.position;
412       } else if (selpad->position != GST_CLOCK_TIME_NONE
413           && selpad->position > selpad->segment.position) {
414         selpad->segment.position = selpad->position;
415
416         if (forward) {
417           gst_event_unref (event);
418           event = gst_event_new_segment (&selpad->segment);
419           gst_event_set_seqnum (event, selpad->segment_seqnum);
420         }
421       }
422       GST_DEBUG_OBJECT (pad, "configured SEGMENT %" GST_SEGMENT_FORMAT,
423           &selpad->segment);
424
425       /* If we aren't forwarding the event because the pad is not the
426        * active_sinkpad, then set the flag on the pad
427        * that says a segment needs sending if/when that pad is activated.
428        * For all other cases, we send the event immediately, which makes
429        * sparse streams and other segment updates work correctly downstream.
430        */
431       if (!forward)
432         selpad->segment_pending = TRUE;
433
434       GST_OBJECT_UNLOCK (selpad);
435       GST_INPUT_SELECTOR_UNLOCK (sel);
436       break;
437     }
438     case GST_EVENT_TAG:
439     {
440       GstTagList *tags, *oldtags, *newtags;
441
442       gst_event_parse_tag (event, &tags);
443
444       GST_OBJECT_LOCK (selpad);
445       oldtags = selpad->tags;
446
447       newtags = gst_tag_list_merge (oldtags, tags, GST_TAG_MERGE_REPLACE);
448       selpad->tags = newtags;
449       if (oldtags)
450         gst_tag_list_free (oldtags);
451       GST_DEBUG_OBJECT (pad, "received tags %" GST_PTR_FORMAT, newtags);
452       GST_OBJECT_UNLOCK (selpad);
453
454       g_object_notify (G_OBJECT (selpad), "tags");
455       break;
456     }
457     case GST_EVENT_EOS:
458       selpad->eos = TRUE;
459
460       if (forward) {
461         selpad->eos_sent = TRUE;
462       } else {
463         GstSelectorPad *tmp;
464
465         /* If the active sinkpad is in EOS state but EOS
466          * was not sent downstream this means that the pad
467          * got EOS before it was set as active pad and that
468          * the previously active pad got EOS after it was
469          * active
470          */
471         GST_INPUT_SELECTOR_LOCK (sel);
472         active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
473         tmp = GST_SELECTOR_PAD (active_sinkpad);
474         forward = (tmp->eos && !tmp->eos_sent);
475         tmp->eos_sent = TRUE;
476         GST_INPUT_SELECTOR_UNLOCK (sel);
477       }
478       GST_DEBUG_OBJECT (pad, "received EOS");
479       break;
480     default:
481       break;
482   }
483   if (forward) {
484     GST_DEBUG_OBJECT (pad, "forwarding event");
485     res = gst_pad_push_event (sel->srcpad, event);
486   } else
487     gst_event_unref (event);
488
489   gst_object_unref (sel);
490
491   return res;
492 }
493
494 static GstCaps *
495 gst_selector_pad_getcaps (GstPad * pad, GstCaps * filter)
496 {
497   GstInputSelector *sel;
498   GstCaps *caps;
499
500   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
501   if (G_UNLIKELY (sel == NULL))
502     return (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
503
504   GST_DEBUG_OBJECT (sel, "Getting caps of srcpad peer");
505   caps = gst_pad_peer_get_caps (sel->srcpad, filter);
506   if (caps == NULL)
507     caps = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
508
509   gst_object_unref (sel);
510
511   return caps;
512 }
513
514 static gboolean
515 gst_selector_pad_acceptcaps (GstPad * pad, GstCaps * caps)
516 {
517   GstInputSelector *sel;
518   gboolean res;
519
520   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
521   if (G_UNLIKELY (sel == NULL))
522     return FALSE;
523
524   GST_DEBUG_OBJECT (sel, "Checking acceptcaps of srcpad peer");
525   res = gst_pad_peer_accept_caps (sel->srcpad, caps);
526   gst_object_unref (sel);
527
528   return res;
529 }
530
531 /* must be called with the SELECTOR_LOCK, will block while the pad is blocked 
532  * or return TRUE when flushing */
533 static gboolean
534 gst_input_selector_wait (GstInputSelector * self, GstSelectorPad * pad)
535 {
536   while (self->blocked && !self->flushing && !pad->flushing) {
537     /* we can be unlocked here when we are shutting down (flushing) or when we
538      * get unblocked */
539     GST_INPUT_SELECTOR_WAIT (self);
540   }
541   return self->flushing;
542 }
543
544 /* must be called with the SELECTOR_LOCK, will block until the running time
545  * of the active pad is after this pad or return TRUE when flushing */
546 static gboolean
547 gst_input_selector_wait_running_time (GstInputSelector * sel,
548     GstSelectorPad * pad, GstBuffer * buf)
549 {
550   GstPad *active_sinkpad;
551   GstSelectorPad *active_selpad;
552   GstSegment *seg, *active_seg;
553   GstClockTime running_time, active_running_time = -1;
554
555   seg = &pad->segment;
556
557   active_sinkpad =
558       gst_input_selector_activate_sinkpad (sel, GST_PAD_CAST (pad));
559   active_selpad = GST_SELECTOR_PAD_CAST (active_sinkpad);
560   active_seg = &active_selpad->segment;
561
562   /* We can only sync if the segments are in time format or
563    * if the active pad had no newsegment event yet */
564   if (seg->format != GST_FORMAT_TIME ||
565       (active_seg->format != GST_FORMAT_TIME
566           && active_seg->format != GST_FORMAT_UNDEFINED))
567     return FALSE;
568
569   /* If we have no valid timestamp we can't sync this buffer */
570   if (!GST_BUFFER_TIMESTAMP_IS_VALID (buf))
571     return FALSE;
572
573   running_time = GST_BUFFER_TIMESTAMP (buf);
574   /* If possible try to get the running time at the end of the buffer */
575   if (GST_BUFFER_DURATION_IS_VALID (buf))
576     running_time += GST_BUFFER_DURATION (buf);
577   if (running_time > seg->stop)
578     running_time = seg->stop;
579   running_time =
580       gst_segment_to_running_time (seg, GST_FORMAT_TIME, running_time);
581   /* If this is outside the segment don't sync */
582   if (running_time == -1)
583     return FALSE;
584
585   /* Get active pad's running time, if no configured segment yet keep at -1 */
586   if (active_seg->format == GST_FORMAT_TIME)
587     active_running_time =
588         gst_segment_to_running_time (active_seg, GST_FORMAT_TIME,
589         active_selpad->position);
590
591   /* Wait until
592    *   a) this is the active pad
593    *   b) the pad or the selector is flushing
594    *   c) the selector is not blocked
595    *   d) the active pad has no running time or the active
596    *      pad's running time is before this running time
597    *   e) the active pad has a non-time segment
598    */
599   while (pad != active_selpad && !sel->flushing && !pad->flushing &&
600       (sel->blocked || active_running_time == -1
601           || running_time >= active_running_time)) {
602     if (!sel->blocked)
603       GST_DEBUG_OBJECT (pad,
604           "Waiting for active streams to advance. %" GST_TIME_FORMAT " >= %"
605           GST_TIME_FORMAT, GST_TIME_ARGS (running_time),
606           GST_TIME_ARGS (active_running_time));
607
608     GST_INPUT_SELECTOR_WAIT (sel);
609
610     /* Get new active pad, it might have changed */
611     active_sinkpad =
612         gst_input_selector_activate_sinkpad (sel, GST_PAD_CAST (pad));
613     active_selpad = GST_SELECTOR_PAD_CAST (active_sinkpad);
614     active_seg = &active_selpad->segment;
615
616     /* If the active segment is configured but not to time format
617      * we can't do any syncing at all */
618     if (active_seg->format != GST_FORMAT_TIME
619         && active_seg->format != GST_FORMAT_UNDEFINED)
620       break;
621
622     /* Get the new active pad running time */
623     if (active_seg->format == GST_FORMAT_TIME)
624       active_running_time =
625           gst_segment_to_running_time (active_seg, GST_FORMAT_TIME,
626           active_selpad->position);
627     else
628       active_running_time = -1;
629
630     if (!sel->blocked)
631       GST_DEBUG_OBJECT (pad,
632           "Waited for active streams to advance. %" GST_TIME_FORMAT " >= %"
633           GST_TIME_FORMAT, GST_TIME_ARGS (running_time),
634           GST_TIME_ARGS (active_running_time));
635
636   }
637
638   /* Return TRUE if the selector or the pad is flushing */
639   return (sel->flushing || pad->flushing);
640 }
641
642
643 static GstFlowReturn
644 gst_selector_pad_chain (GstPad * pad, GstBuffer * buf)
645 {
646   GstInputSelector *sel;
647   GstFlowReturn res;
648   GstPad *active_sinkpad;
649   GstPad *prev_active_sinkpad;
650   GstSelectorPad *selpad;
651   GstClockTime start_time;
652   GstSegment *seg;
653   GstEvent *start_event = NULL;
654
655   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
656   selpad = GST_SELECTOR_PAD_CAST (pad);
657   seg = &selpad->segment;
658
659   GST_INPUT_SELECTOR_LOCK (sel);
660   /* wait or check for flushing */
661   if (gst_input_selector_wait (sel, selpad))
662     goto flushing;
663
664   GST_LOG_OBJECT (pad, "getting active pad");
665
666   prev_active_sinkpad = sel->active_sinkpad;
667   active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
668
669   /* In sync mode wait until the active pad has advanced
670    * after the running time of the current buffer */
671   if (sel->sync_streams && active_sinkpad != pad) {
672     if (gst_input_selector_wait_running_time (sel, selpad, buf))
673       goto flushing;
674   }
675
676   /* Might have changed while waiting */
677   active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
678
679   /* update the segment on the srcpad */
680   start_time = GST_BUFFER_TIMESTAMP (buf);
681   if (GST_CLOCK_TIME_IS_VALID (start_time)) {
682     GST_LOG_OBJECT (pad, "received start time %" GST_TIME_FORMAT,
683         GST_TIME_ARGS (start_time));
684     if (GST_BUFFER_DURATION_IS_VALID (buf))
685       GST_LOG_OBJECT (pad, "received end time %" GST_TIME_FORMAT,
686           GST_TIME_ARGS (start_time + GST_BUFFER_DURATION (buf)));
687
688     GST_OBJECT_LOCK (pad);
689     selpad->position = start_time;
690     GST_OBJECT_UNLOCK (pad);
691   }
692
693   /* Ignore buffers from pads except the selected one */
694   if (pad != active_sinkpad)
695     goto ignore;
696
697   /* Tell all non-active pads that we advanced the running time */
698   if (sel->sync_streams)
699     GST_INPUT_SELECTOR_BROADCAST (sel);
700
701   /* if we have a pending segment, push it out now */
702   if (G_UNLIKELY (prev_active_sinkpad != active_sinkpad
703           || selpad->segment_pending)) {
704     GST_DEBUG_OBJECT (pad,
705         "pushing pending NEWSEGMENT update %d, rate %lf, applied rate %lf, "
706         "format %d, " "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
707         G_GINT64_FORMAT, FALSE, seg->rate, seg->applied_rate, seg->format,
708         seg->start, seg->stop, seg->time);
709
710     start_event = gst_event_new_segment (seg);
711     gst_event_set_seqnum (start_event, selpad->segment_seqnum);
712
713     selpad->segment_pending = FALSE;
714   }
715   GST_INPUT_SELECTOR_UNLOCK (sel);
716
717   if (prev_active_sinkpad != active_sinkpad && pad == active_sinkpad) {
718     NOTIFY_MUTEX_LOCK ();
719     g_object_notify (G_OBJECT (sel), "active-pad");
720     NOTIFY_MUTEX_UNLOCK ();
721   }
722
723   if (start_event)
724     gst_pad_push_event (sel->srcpad, start_event);
725
726   if (selpad->discont) {
727     buf = gst_buffer_make_writable (buf);
728
729     GST_DEBUG_OBJECT (pad, "Marking discont buffer %p", buf);
730     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
731     selpad->discont = FALSE;
732   }
733
734   /* forward */
735   GST_LOG_OBJECT (pad, "Forwarding buffer %p", buf);
736
737   res = gst_pad_push (sel->srcpad, buf);
738   selpad->pushed = TRUE;
739
740 done:
741   gst_object_unref (sel);
742   return res;
743
744   /* dropped buffers */
745 ignore:
746   {
747     gboolean active_pad_pushed = GST_SELECTOR_PAD_CAST (active_sinkpad)->pushed;
748
749     GST_DEBUG_OBJECT (pad, "Pad not active, discard buffer %p", buf);
750     /* when we drop a buffer, we're creating a discont on this pad */
751     selpad->discont = TRUE;
752     GST_INPUT_SELECTOR_UNLOCK (sel);
753     gst_buffer_unref (buf);
754
755     /* figure out what to return upstream */
756     GST_OBJECT_LOCK (selpad);
757     if (selpad->always_ok || !active_pad_pushed)
758       res = GST_FLOW_OK;
759     else
760       res = GST_FLOW_NOT_LINKED;
761     GST_OBJECT_UNLOCK (selpad);
762
763     goto done;
764   }
765 flushing:
766   {
767     GST_DEBUG_OBJECT (pad, "We are flushing, discard buffer %p", buf);
768     GST_INPUT_SELECTOR_UNLOCK (sel);
769     gst_buffer_unref (buf);
770     res = GST_FLOW_WRONG_STATE;
771     goto done;
772   }
773 }
774
775 static void gst_input_selector_dispose (GObject * object);
776
777 static void gst_input_selector_set_property (GObject * object,
778     guint prop_id, const GValue * value, GParamSpec * pspec);
779 static void gst_input_selector_get_property (GObject * object,
780     guint prop_id, GValue * value, GParamSpec * pspec);
781
782 static GstPad *gst_input_selector_request_new_pad (GstElement * element,
783     GstPadTemplate * templ, const gchar * unused, const GstCaps * caps);
784 static void gst_input_selector_release_pad (GstElement * element, GstPad * pad);
785
786 static GstStateChangeReturn gst_input_selector_change_state (GstElement *
787     element, GstStateChange transition);
788
789 static GstCaps *gst_input_selector_getcaps (GstPad * pad, GstCaps * filter);
790 static gboolean gst_input_selector_event (GstPad * pad, GstEvent * event);
791 static gboolean gst_input_selector_query (GstPad * pad, GstQuery * query);
792 static gint64 gst_input_selector_block (GstInputSelector * self);
793
794 /* FIXME: create these marshallers using glib-genmarshal */
795 static void
796 gst_input_selector_marshal_INT64__VOID (GClosure * closure,
797     GValue * return_value G_GNUC_UNUSED,
798     guint n_param_values,
799     const GValue * param_values,
800     gpointer invocation_hint G_GNUC_UNUSED, gpointer marshal_data)
801 {
802   typedef gint64 (*GMarshalFunc_INT64__VOID) (gpointer data1, gpointer data2);
803   register GMarshalFunc_INT64__VOID callback;
804   register GCClosure *cc = (GCClosure *) closure;
805   register gpointer data1, data2;
806   gint64 v_return;
807
808   g_return_if_fail (return_value != NULL);
809   g_return_if_fail (n_param_values == 1);
810
811   if (G_CCLOSURE_SWAP_DATA (closure)) {
812     data1 = closure->data;
813     data2 = g_value_peek_pointer (param_values + 0);
814   } else {
815     data1 = g_value_peek_pointer (param_values + 0);
816     data2 = closure->data;
817   }
818   callback =
819       (GMarshalFunc_INT64__VOID) (marshal_data ? marshal_data : cc->callback);
820
821   v_return = callback (data1, data2);
822
823   g_value_set_int64 (return_value, v_return);
824 }
825
826 #define _do_init \
827     GST_DEBUG_CATEGORY_INIT (input_selector_debug, \
828         "input-selector", 0, "An input stream selector element");
829 #define gst_input_selector_parent_class parent_class
830 G_DEFINE_TYPE_WITH_CODE (GstInputSelector, gst_input_selector, GST_TYPE_ELEMENT,
831     _do_init);
832
833 static void
834 gst_input_selector_class_init (GstInputSelectorClass * klass)
835 {
836   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
837   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
838
839   gobject_class->dispose = gst_input_selector_dispose;
840
841   gobject_class->set_property = gst_input_selector_set_property;
842   gobject_class->get_property = gst_input_selector_get_property;
843
844   g_object_class_install_property (gobject_class, PROP_N_PADS,
845       g_param_spec_uint ("n-pads", "Number of Pads",
846           "The number of sink pads", 0, G_MAXUINT, 0,
847           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
848
849   g_object_class_install_property (gobject_class, PROP_ACTIVE_PAD,
850       g_param_spec_object ("active-pad", "Active pad",
851           "The currently active sink pad", GST_TYPE_PAD,
852           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
853
854   /**
855    * GstInputSelector:sync-streams
856    *
857    * If set to %TRUE all inactive streams will be synced to the
858    * running time of the active stream. This makes sure that no
859    * buffers are dropped by input-selector that might be needed
860    * when switching the active pad.
861    *
862    * Since: 0.10.35
863    */
864   g_object_class_install_property (gobject_class, PROP_SYNC_STREAMS,
865       g_param_spec_boolean ("sync-streams", "Sync Streams",
866           "Synchronize inactive streams to the running time of the active stream",
867           DEFAULT_SYNC_STREAMS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
868
869   /**
870    * GstInputSelector::block:
871    * @inputselector: the #GstInputSelector
872    *
873    * Block all sink pads in preparation for a switch. Returns the stop time of
874    * the current switch segment, as a running time, or 0 if there is no current
875    * active pad or the current active pad never received data.
876    */
877   gst_input_selector_signals[SIGNAL_BLOCK] =
878       g_signal_new ("block", G_TYPE_FROM_CLASS (klass),
879       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
880       G_STRUCT_OFFSET (GstInputSelectorClass, block), NULL, NULL,
881       gst_input_selector_marshal_INT64__VOID, G_TYPE_INT64, 0);
882
883   gst_element_class_set_details_simple (gstelement_class, "Input selector",
884       "Generic", "N-to-1 input stream selector",
885       "Julien Moutte <julien@moutte.net>, "
886       "Jan Schmidt <thaytan@mad.scientist.com>, "
887       "Wim Taymans <wim.taymans@gmail.com>");
888   gst_element_class_add_pad_template (gstelement_class,
889       gst_static_pad_template_get (&gst_input_selector_sink_factory));
890   gst_element_class_add_pad_template (gstelement_class,
891       gst_static_pad_template_get (&gst_input_selector_src_factory));
892
893   gstelement_class->request_new_pad = gst_input_selector_request_new_pad;
894   gstelement_class->release_pad = gst_input_selector_release_pad;
895   gstelement_class->change_state = gst_input_selector_change_state;
896
897   klass->block = GST_DEBUG_FUNCPTR (gst_input_selector_block);
898 }
899
900 static void
901 gst_input_selector_init (GstInputSelector * sel)
902 {
903   sel->srcpad = gst_pad_new ("src", GST_PAD_SRC);
904   gst_pad_set_iterate_internal_links_function (sel->srcpad,
905       GST_DEBUG_FUNCPTR (gst_selector_pad_iterate_linked_pads));
906   gst_pad_set_getcaps_function (sel->srcpad,
907       GST_DEBUG_FUNCPTR (gst_input_selector_getcaps));
908   gst_pad_set_query_function (sel->srcpad,
909       GST_DEBUG_FUNCPTR (gst_input_selector_query));
910   gst_pad_set_event_function (sel->srcpad,
911       GST_DEBUG_FUNCPTR (gst_input_selector_event));
912   gst_element_add_pad (GST_ELEMENT (sel), sel->srcpad);
913   /* sinkpad management */
914   sel->active_sinkpad = NULL;
915   sel->padcount = 0;
916   sel->sync_streams = DEFAULT_SYNC_STREAMS;
917
918   sel->lock = g_mutex_new ();
919   sel->cond = g_cond_new ();
920   sel->blocked = FALSE;
921 }
922
923 static void
924 gst_input_selector_dispose (GObject * object)
925 {
926   GstInputSelector *sel = GST_INPUT_SELECTOR (object);
927
928   if (sel->active_sinkpad) {
929     gst_object_unref (sel->active_sinkpad);
930     sel->active_sinkpad = NULL;
931   }
932   if (sel->lock) {
933     g_mutex_free (sel->lock);
934     sel->lock = NULL;
935   }
936   if (sel->cond) {
937     g_cond_free (sel->cond);
938     sel->cond = NULL;
939   }
940
941   G_OBJECT_CLASS (parent_class)->dispose (object);
942 }
943
944 /* this function must be called with the SELECTOR_LOCK. It returns TRUE when the
945  * active pad changed. */
946 static gboolean
947 gst_input_selector_set_active_pad (GstInputSelector * self, GstPad * pad)
948 {
949   GstSelectorPad *old, *new;
950   GstPad **active_pad_p;
951
952   if (pad == self->active_sinkpad)
953     return FALSE;
954
955   old = GST_SELECTOR_PAD_CAST (self->active_sinkpad);
956   new = GST_SELECTOR_PAD_CAST (pad);
957
958   GST_DEBUG_OBJECT (self, "setting active pad to %s:%s",
959       GST_DEBUG_PAD_NAME (new));
960
961   if (old)
962     old->pushed = FALSE;
963   if (new)
964     new->pushed = FALSE;
965
966   /* Send a new SEGMENT event on the new pad next */
967   if (old != new && new)
968     new->segment_pending = TRUE;
969
970   active_pad_p = &self->active_sinkpad;
971   gst_object_replace ((GstObject **) active_pad_p, GST_OBJECT_CAST (pad));
972
973   /* Wake up all non-active pads in sync mode, they might be
974    * the active pad now */
975   if (self->sync_streams)
976     GST_INPUT_SELECTOR_BROADCAST (self);
977
978   GST_DEBUG_OBJECT (self, "New active pad is %" GST_PTR_FORMAT,
979       self->active_sinkpad);
980
981   return TRUE;
982 }
983
984 static void
985 gst_input_selector_set_property (GObject * object, guint prop_id,
986     const GValue * value, GParamSpec * pspec)
987 {
988   GstInputSelector *sel = GST_INPUT_SELECTOR (object);
989
990   switch (prop_id) {
991     case PROP_ACTIVE_PAD:
992     {
993       GstPad *pad;
994
995       pad = g_value_get_object (value);
996
997       GST_INPUT_SELECTOR_LOCK (sel);
998       gst_input_selector_set_active_pad (sel, pad);
999       GST_INPUT_SELECTOR_UNLOCK (sel);
1000       break;
1001     }
1002     case PROP_SYNC_STREAMS:
1003     {
1004       GST_INPUT_SELECTOR_LOCK (sel);
1005       sel->sync_streams = g_value_get_boolean (value);
1006       GST_INPUT_SELECTOR_UNLOCK (sel);
1007       break;
1008     }
1009     default:
1010       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1011       break;
1012   }
1013 }
1014
1015 static void
1016 gst_input_selector_get_property (GObject * object, guint prop_id,
1017     GValue * value, GParamSpec * pspec)
1018 {
1019   GstInputSelector *sel = GST_INPUT_SELECTOR (object);
1020
1021   switch (prop_id) {
1022     case PROP_N_PADS:
1023       GST_INPUT_SELECTOR_LOCK (object);
1024       g_value_set_uint (value, sel->n_pads);
1025       GST_INPUT_SELECTOR_UNLOCK (object);
1026       break;
1027     case PROP_ACTIVE_PAD:
1028       GST_INPUT_SELECTOR_LOCK (object);
1029       g_value_set_object (value, sel->active_sinkpad);
1030       GST_INPUT_SELECTOR_UNLOCK (object);
1031       break;
1032     case PROP_SYNC_STREAMS:
1033       GST_INPUT_SELECTOR_LOCK (object);
1034       g_value_set_boolean (value, sel->sync_streams);
1035       GST_INPUT_SELECTOR_UNLOCK (object);
1036       break;
1037     default:
1038       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1039       break;
1040   }
1041 }
1042
1043 static GstPad *
1044 gst_input_selector_get_linked_pad (GstInputSelector * sel, GstPad * pad,
1045     gboolean strict)
1046 {
1047   GstPad *otherpad = NULL;
1048
1049   GST_INPUT_SELECTOR_LOCK (sel);
1050   if (pad == sel->srcpad)
1051     otherpad = sel->active_sinkpad;
1052   else if (pad == sel->active_sinkpad || !strict)
1053     otherpad = sel->srcpad;
1054   if (otherpad)
1055     gst_object_ref (otherpad);
1056   GST_INPUT_SELECTOR_UNLOCK (sel);
1057
1058   return otherpad;
1059 }
1060
1061 static gboolean
1062 gst_input_selector_event (GstPad * pad, GstEvent * event)
1063 {
1064   GstInputSelector *sel;
1065   gboolean result = FALSE;
1066   GstIterator *iter;
1067   gboolean done = FALSE;
1068   GValue item = { 0, };
1069   GstPad *eventpad;
1070   GList *pushed_pads = NULL;
1071
1072   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
1073   if (G_UNLIKELY (sel == NULL)) {
1074     gst_event_unref (event);
1075     return FALSE;
1076   }
1077
1078   /* Send upstream events to all sinkpads */
1079   iter = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (sel));
1080
1081   /* This is now essentially a copy of gst_pad_event_default_dispatch
1082    * with a different iterator */
1083   while (!done) {
1084     switch (gst_iterator_next (iter, &item)) {
1085       case GST_ITERATOR_OK:
1086         eventpad = g_value_get_object (&item);
1087
1088         /* if already pushed,  skip */
1089         if (g_list_find (pushed_pads, eventpad)) {
1090           g_value_reset (&item);
1091           break;
1092         }
1093
1094         gst_event_ref (event);
1095         result |= gst_pad_push_event (eventpad, event);
1096
1097         g_value_reset (&item);
1098         break;
1099       case GST_ITERATOR_RESYNC:
1100         /* We don't reset the result here because we don't push the event
1101          * again on pads that got the event already and because we need
1102          * to consider the result of the previous pushes */
1103         gst_iterator_resync (iter);
1104         break;
1105       case GST_ITERATOR_ERROR:
1106         GST_ERROR_OBJECT (pad, "Could not iterate over sinkpads");
1107         done = TRUE;
1108         break;
1109       case GST_ITERATOR_DONE:
1110         done = TRUE;
1111         break;
1112     }
1113   }
1114   g_value_unset (&item);
1115   gst_iterator_free (iter);
1116
1117   g_list_free (pushed_pads);
1118
1119   gst_event_unref (event);
1120
1121   return result;
1122 }
1123
1124 /* query on the srcpad. We override this function because by default it will
1125  * only forward the query to one random sinkpad */
1126 static gboolean
1127 gst_input_selector_query (GstPad * pad, GstQuery * query)
1128 {
1129   gboolean res = TRUE;
1130   GstInputSelector *sel;
1131   GstPad *otherpad;
1132
1133   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
1134   if (G_UNLIKELY (sel == NULL))
1135     return FALSE;
1136
1137   otherpad = gst_input_selector_get_linked_pad (sel, pad, TRUE);
1138
1139   switch (GST_QUERY_TYPE (query)) {
1140     case GST_QUERY_LATENCY:
1141     {
1142       GList *walk;
1143       GstClockTime resmin, resmax;
1144       gboolean reslive;
1145
1146       resmin = 0;
1147       resmax = -1;
1148       reslive = FALSE;
1149
1150       /* assume FALSE, we become TRUE if one query succeeds */
1151       res = FALSE;
1152
1153       /* perform the query on all sinkpads and combine the results. We take the
1154        * max of min and the min of max for the result latency. */
1155       GST_INPUT_SELECTOR_LOCK (sel);
1156       for (walk = GST_ELEMENT_CAST (sel)->sinkpads; walk;
1157           walk = g_list_next (walk)) {
1158         GstPad *sinkpad = GST_PAD_CAST (walk->data);
1159
1160         if (gst_pad_peer_query (sinkpad, query)) {
1161           GstClockTime min, max;
1162           gboolean live;
1163
1164           /* one query succeeded, we succeed too */
1165           res = TRUE;
1166
1167           gst_query_parse_latency (query, &live, &min, &max);
1168
1169           GST_DEBUG_OBJECT (sinkpad,
1170               "peer latency min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT
1171               ", live %d", GST_TIME_ARGS (min), GST_TIME_ARGS (max), live);
1172
1173           if (live) {
1174             if (min > resmin)
1175               resmin = min;
1176             if (resmax == -1)
1177               resmax = max;
1178             else if (max < resmax)
1179               resmax = max;
1180             if (reslive == FALSE)
1181               reslive = live;
1182           }
1183         }
1184       }
1185       GST_INPUT_SELECTOR_UNLOCK (sel);
1186       if (res) {
1187         gst_query_set_latency (query, reslive, resmin, resmax);
1188
1189         GST_DEBUG_OBJECT (sel,
1190             "total latency min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT
1191             ", live %d", GST_TIME_ARGS (resmin), GST_TIME_ARGS (resmax),
1192             reslive);
1193       }
1194
1195       break;
1196     }
1197     default:
1198       if (otherpad)
1199         res = gst_pad_peer_query (otherpad, query);
1200       break;
1201   }
1202   if (otherpad)
1203     gst_object_unref (otherpad);
1204   gst_object_unref (sel);
1205
1206   return res;
1207 }
1208
1209 static GstCaps *
1210 gst_input_selector_getcaps (GstPad * pad, GstCaps * filter)
1211 {
1212   GstPad *otherpad;
1213   GstInputSelector *sel;
1214   GstCaps *caps;
1215
1216   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
1217   if (G_UNLIKELY (sel == NULL))
1218     return (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
1219
1220   otherpad = gst_input_selector_get_linked_pad (sel, pad, FALSE);
1221
1222   if (!otherpad) {
1223     GST_DEBUG_OBJECT (pad, "Pad not linked, returning ANY");
1224     caps = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
1225   } else {
1226     GST_DEBUG_OBJECT (pad, "Pad is linked (to %s:%s), returning peer caps",
1227         GST_DEBUG_PAD_NAME (otherpad));
1228     /* if the peer has caps, use those. If the pad is not linked, this function
1229      * returns NULL and we return ANY */
1230     if (!(caps = gst_pad_peer_get_caps (otherpad, filter)))
1231       caps = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
1232     gst_object_unref (otherpad);
1233   }
1234
1235   gst_object_unref (sel);
1236   return caps;
1237 }
1238
1239 /* check if the pad is the active sinkpad */
1240 static inline gboolean
1241 gst_input_selector_is_active_sinkpad (GstInputSelector * sel, GstPad * pad)
1242 {
1243   gboolean res;
1244
1245   GST_INPUT_SELECTOR_LOCK (sel);
1246   res = (pad == sel->active_sinkpad);
1247   GST_INPUT_SELECTOR_UNLOCK (sel);
1248
1249   return res;
1250 }
1251
1252 /* Get or create the active sinkpad, must be called with SELECTOR_LOCK */
1253 static GstPad *
1254 gst_input_selector_activate_sinkpad (GstInputSelector * sel, GstPad * pad)
1255 {
1256   GstPad *active_sinkpad;
1257   GstSelectorPad *selpad;
1258
1259   selpad = GST_SELECTOR_PAD_CAST (pad);
1260
1261   selpad->active = TRUE;
1262   active_sinkpad = sel->active_sinkpad;
1263   if (active_sinkpad == NULL) {
1264     /* first pad we get activity on becomes the activated pad by default */
1265     if (sel->active_sinkpad)
1266       gst_object_unref (sel->active_sinkpad);
1267     active_sinkpad = sel->active_sinkpad = gst_object_ref (pad);
1268     GST_DEBUG_OBJECT (sel, "Activating pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1269   }
1270
1271   return active_sinkpad;
1272 }
1273
1274 static GstPad *
1275 gst_input_selector_request_new_pad (GstElement * element,
1276     GstPadTemplate * templ, const gchar * unused, const GstCaps * caps)
1277 {
1278   GstInputSelector *sel;
1279   gchar *name = NULL;
1280   GstPad *sinkpad = NULL;
1281
1282   g_return_val_if_fail (templ->direction == GST_PAD_SINK, NULL);
1283
1284   sel = GST_INPUT_SELECTOR (element);
1285
1286   GST_INPUT_SELECTOR_LOCK (sel);
1287
1288   GST_LOG_OBJECT (sel, "Creating new pad %d", sel->padcount);
1289   name = g_strdup_printf ("sink%d", sel->padcount++);
1290   sinkpad = g_object_new (GST_TYPE_SELECTOR_PAD,
1291       "name", name, "direction", templ->direction, "template", templ, NULL);
1292   g_free (name);
1293
1294   sel->n_pads++;
1295
1296   gst_pad_set_event_function (sinkpad,
1297       GST_DEBUG_FUNCPTR (gst_selector_pad_event));
1298   gst_pad_set_getcaps_function (sinkpad,
1299       GST_DEBUG_FUNCPTR (gst_selector_pad_getcaps));
1300   gst_pad_set_acceptcaps_function (sinkpad,
1301       GST_DEBUG_FUNCPTR (gst_selector_pad_acceptcaps));
1302   gst_pad_set_chain_function (sinkpad,
1303       GST_DEBUG_FUNCPTR (gst_selector_pad_chain));
1304   gst_pad_set_iterate_internal_links_function (sinkpad,
1305       GST_DEBUG_FUNCPTR (gst_selector_pad_iterate_linked_pads));
1306
1307   gst_pad_set_active (sinkpad, TRUE);
1308   gst_element_add_pad (GST_ELEMENT (sel), sinkpad);
1309   GST_INPUT_SELECTOR_UNLOCK (sel);
1310
1311   return sinkpad;
1312 }
1313
1314 static void
1315 gst_input_selector_release_pad (GstElement * element, GstPad * pad)
1316 {
1317   GstInputSelector *sel;
1318
1319   sel = GST_INPUT_SELECTOR (element);
1320   GST_LOG_OBJECT (sel, "Releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1321
1322   GST_INPUT_SELECTOR_LOCK (sel);
1323   /* if the pad was the active pad, makes us select a new one */
1324   if (sel->active_sinkpad == pad) {
1325     GST_DEBUG_OBJECT (sel, "Deactivating pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1326     gst_object_unref (sel->active_sinkpad);
1327     sel->active_sinkpad = NULL;
1328   }
1329   sel->n_pads--;
1330
1331   gst_pad_set_active (pad, FALSE);
1332   gst_element_remove_pad (GST_ELEMENT (sel), pad);
1333   GST_INPUT_SELECTOR_UNLOCK (sel);
1334 }
1335
1336 static void
1337 gst_input_selector_reset (GstInputSelector * sel)
1338 {
1339   GList *walk;
1340
1341   GST_INPUT_SELECTOR_LOCK (sel);
1342   /* clear active pad */
1343   if (sel->active_sinkpad) {
1344     gst_object_unref (sel->active_sinkpad);
1345     sel->active_sinkpad = NULL;
1346   }
1347   /* reset each of our sinkpads state */
1348   for (walk = GST_ELEMENT_CAST (sel)->sinkpads; walk; walk = g_list_next (walk)) {
1349     GstSelectorPad *selpad = GST_SELECTOR_PAD_CAST (walk->data);
1350
1351     gst_selector_pad_reset (selpad);
1352
1353     if (selpad->tags) {
1354       gst_tag_list_free (selpad->tags);
1355       selpad->tags = NULL;
1356     }
1357   }
1358   GST_INPUT_SELECTOR_UNLOCK (sel);
1359 }
1360
1361 static GstStateChangeReturn
1362 gst_input_selector_change_state (GstElement * element,
1363     GstStateChange transition)
1364 {
1365   GstInputSelector *self = GST_INPUT_SELECTOR (element);
1366   GstStateChangeReturn result;
1367
1368   switch (transition) {
1369     case GST_STATE_CHANGE_READY_TO_PAUSED:
1370       GST_INPUT_SELECTOR_LOCK (self);
1371       self->blocked = FALSE;
1372       self->flushing = FALSE;
1373       GST_INPUT_SELECTOR_UNLOCK (self);
1374       break;
1375     case GST_STATE_CHANGE_PAUSED_TO_READY:
1376       /* first unlock before we call the parent state change function, which
1377        * tries to acquire the stream lock when going to ready. */
1378       GST_INPUT_SELECTOR_LOCK (self);
1379       self->blocked = FALSE;
1380       self->flushing = TRUE;
1381       GST_INPUT_SELECTOR_BROADCAST (self);
1382       GST_INPUT_SELECTOR_UNLOCK (self);
1383       break;
1384     default:
1385       break;
1386   }
1387
1388   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1389
1390   switch (transition) {
1391     case GST_STATE_CHANGE_PAUSED_TO_READY:
1392       gst_input_selector_reset (self);
1393       break;
1394     default:
1395       break;
1396   }
1397
1398   return result;
1399 }
1400
1401 static gint64
1402 gst_input_selector_block (GstInputSelector * self)
1403 {
1404   gint64 ret = 0;
1405   GstSelectorPad *spad;
1406
1407   GST_INPUT_SELECTOR_LOCK (self);
1408
1409   if (self->blocked)
1410     GST_WARNING_OBJECT (self, "switch already blocked");
1411
1412   self->blocked = TRUE;
1413   spad = GST_SELECTOR_PAD_CAST (self->active_sinkpad);
1414
1415   if (spad)
1416     ret = gst_selector_pad_get_running_time (spad);
1417   else
1418     GST_DEBUG_OBJECT (self, "no active pad while blocking");
1419
1420   GST_INPUT_SELECTOR_UNLOCK (self);
1421
1422   return ret;
1423 }