pad: add more queries
[platform/upstream/gstreamer.git] / plugins / elements / gstinputselector.c
1 /* GStreamer input selector
2  * Copyright (C) 2003 Julien Moutte <julien@moutte.net>
3  * Copyright (C) 2005 Ronald S. Bultje <rbultje@ronald.bitfreak.net>
4  * Copyright (C) 2005 Jan Schmidt <thaytan@mad.scientist.com>
5  * Copyright (C) 2007 Wim Taymans <wim.taymans@gmail.com>
6  * Copyright (C) 2007 Andy Wingo <wingo@pobox.com>
7  * Copyright (C) 2008 Nokia Corporation. (contact <stefan.kost@nokia.com>)
8  * Copyright (C) 2011 Sebastian Dröge <sebastian.droege@collabora.co.uk>
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  */
25
26 /**
27  * SECTION:element-input-selector
28  * @see_also: #GstOutputSelector
29  *
30  * Direct one out of N input streams to the output pad.
31  *
32  * The input pads are from a GstPad subclass and have additional
33  * properties, which users may find useful, namely:
34  *
35  * <itemizedlist>
36  * <listitem>
37  * "running-time": Running time of stream on pad (#gint64)
38  * </listitem>
39  * <listitem>
40  * "tags": The currently active tags on the pad (#GstTagList, boxed type)
41  * </listitem>
42  * <listitem>
43  * "active": If the pad is currently active (#gboolean)
44  * </listitem>
45  * <listitem>
46  * "always-ok" : Make an inactive pads return #GST_FLOW_OK instead of
47  * #GST_FLOW_NOT_LINKED
48  * </listitem>
49  * </itemizedlist>
50  *
51  * Since: 0.10.32
52  */
53
54 #ifdef HAVE_CONFIG_H
55 #include "config.h"
56 #endif
57
58 #include <string.h>
59
60 #include "gstinputselector.h"
61
62 GST_DEBUG_CATEGORY_STATIC (input_selector_debug);
63 #define GST_CAT_DEFAULT input_selector_debug
64
65 #if GLIB_CHECK_VERSION(2, 26, 0)
66 #define NOTIFY_MUTEX_LOCK()
67 #define NOTIFY_MUTEX_UNLOCK()
68 #else
69 static GStaticRecMutex notify_mutex = G_STATIC_REC_MUTEX_INIT;
70 #define NOTIFY_MUTEX_LOCK() g_static_rec_mutex_lock (&notify_mutex)
71 #define NOTIFY_MUTEX_UNLOCK() g_static_rec_mutex_unlock (&notify_mutex)
72 #endif
73
74 #define GST_INPUT_SELECTOR_GET_LOCK(sel) (((GstInputSelector*)(sel))->lock)
75 #define GST_INPUT_SELECTOR_GET_COND(sel) (((GstInputSelector*)(sel))->cond)
76 #define GST_INPUT_SELECTOR_LOCK(sel) (g_mutex_lock (GST_INPUT_SELECTOR_GET_LOCK(sel)))
77 #define GST_INPUT_SELECTOR_UNLOCK(sel) (g_mutex_unlock (GST_INPUT_SELECTOR_GET_LOCK(sel)))
78 #define GST_INPUT_SELECTOR_WAIT(sel) (g_cond_wait (GST_INPUT_SELECTOR_GET_COND(sel), \
79                         GST_INPUT_SELECTOR_GET_LOCK(sel)))
80 #define GST_INPUT_SELECTOR_BROADCAST(sel) (g_cond_broadcast (GST_INPUT_SELECTOR_GET_COND(sel)))
81
82 static GstStaticPadTemplate gst_input_selector_sink_factory =
83 GST_STATIC_PAD_TEMPLATE ("sink_%u",
84     GST_PAD_SINK,
85     GST_PAD_REQUEST,
86     GST_STATIC_CAPS_ANY);
87
88 static GstStaticPadTemplate gst_input_selector_src_factory =
89 GST_STATIC_PAD_TEMPLATE ("src",
90     GST_PAD_SRC,
91     GST_PAD_ALWAYS,
92     GST_STATIC_CAPS_ANY);
93
94 enum
95 {
96   PROP_0,
97   PROP_N_PADS,
98   PROP_ACTIVE_PAD,
99   PROP_SYNC_STREAMS
100 };
101
102 #define DEFAULT_SYNC_STREAMS FALSE
103
104 #define DEFAULT_PAD_ALWAYS_OK TRUE
105
106 enum
107 {
108   PROP_PAD_0,
109   PROP_PAD_RUNNING_TIME,
110   PROP_PAD_TAGS,
111   PROP_PAD_ACTIVE,
112   PROP_PAD_ALWAYS_OK
113 };
114
115 enum
116 {
117   /* methods */
118   SIGNAL_BLOCK,
119   SIGNAL_SWITCH,
120   LAST_SIGNAL
121 };
122 static guint gst_input_selector_signals[LAST_SIGNAL] = { 0 };
123
124 static inline gboolean gst_input_selector_is_active_sinkpad (GstInputSelector *
125     sel, GstPad * pad);
126 static GstPad *gst_input_selector_activate_sinkpad (GstInputSelector * sel,
127     GstPad * pad);
128 static GstPad *gst_input_selector_get_linked_pad (GstInputSelector * sel,
129     GstPad * pad, gboolean strict);
130
131 #define GST_TYPE_SELECTOR_PAD \
132   (gst_selector_pad_get_type())
133 #define GST_SELECTOR_PAD(obj) \
134   (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SELECTOR_PAD, GstSelectorPad))
135 #define GST_SELECTOR_PAD_CLASS(klass) \
136   (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_SELECTOR_PAD, GstSelectorPadClass))
137 #define GST_IS_SELECTOR_PAD(obj) \
138   (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_SELECTOR_PAD))
139 #define GST_IS_SELECTOR_PAD_CLASS(klass) \
140   (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_SELECTOR_PAD))
141 #define GST_SELECTOR_PAD_CAST(obj) \
142   ((GstSelectorPad *)(obj))
143
144 typedef struct _GstSelectorPad GstSelectorPad;
145 typedef struct _GstSelectorPadClass GstSelectorPadClass;
146
147 struct _GstSelectorPad
148 {
149   GstPad parent;
150
151   gboolean active;              /* when buffer have passed the pad */
152   gboolean pushed;              /* when buffer was pushed downstream since activation */
153   gboolean eos;                 /* when EOS has been received */
154   gboolean eos_sent;            /* when EOS was sent downstream */
155   gboolean discont;             /* after switching we create a discont */
156   gboolean flushing;            /* set after flush-start and before flush-stop */
157   gboolean always_ok;
158   GstTagList *tags;             /* last tags received on the pad */
159
160   GstClockTime position;        /* the current position in the segment */
161   GstSegment segment;           /* the current segment on the pad */
162   guint32 segment_seqnum;       /* sequence number of the current segment */
163
164   gboolean segment_pending;
165 };
166
167 struct _GstSelectorPadClass
168 {
169   GstPadClass parent;
170 };
171
172 GType gst_selector_pad_get_type (void);
173 static void gst_selector_pad_finalize (GObject * object);
174 static void gst_selector_pad_get_property (GObject * object,
175     guint prop_id, GValue * value, GParamSpec * pspec);
176 static void gst_selector_pad_set_property (GObject * object,
177     guint prop_id, const GValue * value, GParamSpec * pspec);
178
179 static gint64 gst_selector_pad_get_running_time (GstSelectorPad * pad);
180 static void gst_selector_pad_reset (GstSelectorPad * pad);
181 static gboolean gst_selector_pad_event (GstPad * pad, GstEvent * event);
182 static GstCaps *gst_selector_pad_getcaps (GstPad * pad, GstCaps * filter);
183 static gboolean gst_selector_pad_acceptcaps (GstPad * pad, GstCaps * caps);
184 static gboolean gst_selector_pad_query (GstPad * pad, GstQuery * query);
185 static GstIterator *gst_selector_pad_iterate_linked_pads (GstPad * pad);
186 static GstFlowReturn gst_selector_pad_chain (GstPad * pad, GstBuffer * buf);
187
188 G_DEFINE_TYPE (GstSelectorPad, gst_selector_pad, GST_TYPE_PAD);
189
190 static void
191 gst_selector_pad_class_init (GstSelectorPadClass * klass)
192 {
193   GObjectClass *gobject_class;
194
195   gobject_class = (GObjectClass *) klass;
196
197   gobject_class->finalize = gst_selector_pad_finalize;
198
199   gobject_class->get_property = gst_selector_pad_get_property;
200   gobject_class->set_property = gst_selector_pad_set_property;
201
202   g_object_class_install_property (gobject_class, PROP_PAD_RUNNING_TIME,
203       g_param_spec_int64 ("running-time", "Running time",
204           "Running time of stream on pad", 0, G_MAXINT64, 0,
205           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
206   g_object_class_install_property (gobject_class, PROP_PAD_TAGS,
207       g_param_spec_boxed ("tags", "Tags",
208           "The currently active tags on the pad", GST_TYPE_TAG_LIST,
209           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
210   g_object_class_install_property (gobject_class, PROP_PAD_ACTIVE,
211       g_param_spec_boolean ("active", "Active",
212           "If the pad is currently active", FALSE,
213           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
214   /* FIXME: better property name? */
215   g_object_class_install_property (gobject_class, PROP_PAD_ALWAYS_OK,
216       g_param_spec_boolean ("always-ok", "Always OK",
217           "Make an inactive pad return OK instead of NOT_LINKED",
218           DEFAULT_PAD_ALWAYS_OK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
219 }
220
221 static void
222 gst_selector_pad_init (GstSelectorPad * pad)
223 {
224   pad->always_ok = DEFAULT_PAD_ALWAYS_OK;
225   gst_selector_pad_reset (pad);
226 }
227
228 static void
229 gst_selector_pad_finalize (GObject * object)
230 {
231   GstSelectorPad *pad;
232
233   pad = GST_SELECTOR_PAD_CAST (object);
234
235   if (pad->tags)
236     gst_tag_list_free (pad->tags);
237
238   G_OBJECT_CLASS (gst_selector_pad_parent_class)->finalize (object);
239 }
240
241 static void
242 gst_selector_pad_set_property (GObject * object, guint prop_id,
243     const GValue * value, GParamSpec * pspec)
244 {
245   GstSelectorPad *spad = GST_SELECTOR_PAD_CAST (object);
246
247   switch (prop_id) {
248     case PROP_PAD_ALWAYS_OK:
249       GST_OBJECT_LOCK (object);
250       spad->always_ok = g_value_get_boolean (value);
251       GST_OBJECT_UNLOCK (object);
252       break;
253     default:
254       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
255       break;
256   }
257 }
258
259 static void
260 gst_selector_pad_get_property (GObject * object, guint prop_id,
261     GValue * value, GParamSpec * pspec)
262 {
263   GstSelectorPad *spad = GST_SELECTOR_PAD_CAST (object);
264
265   switch (prop_id) {
266     case PROP_PAD_RUNNING_TIME:
267       g_value_set_int64 (value, gst_selector_pad_get_running_time (spad));
268       break;
269     case PROP_PAD_TAGS:
270       GST_OBJECT_LOCK (object);
271       g_value_set_boxed (value, spad->tags);
272       GST_OBJECT_UNLOCK (object);
273       break;
274     case PROP_PAD_ACTIVE:
275     {
276       GstInputSelector *sel;
277
278       sel = GST_INPUT_SELECTOR (gst_pad_get_parent (spad));
279       g_value_set_boolean (value, gst_input_selector_is_active_sinkpad (sel,
280               GST_PAD_CAST (spad)));
281       gst_object_unref (sel);
282       break;
283     }
284     case PROP_PAD_ALWAYS_OK:
285       GST_OBJECT_LOCK (object);
286       g_value_set_boolean (value, spad->always_ok);
287       GST_OBJECT_UNLOCK (object);
288       break;
289     default:
290       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
291       break;
292   }
293 }
294
295 static gint64
296 gst_selector_pad_get_running_time (GstSelectorPad * pad)
297 {
298   gint64 ret = 0;
299
300   GST_OBJECT_LOCK (pad);
301   if (pad->active) {
302     guint64 position = pad->position;
303     GstFormat format = pad->segment.format;
304
305     ret = gst_segment_to_running_time (&pad->segment, format, position);
306   }
307   GST_OBJECT_UNLOCK (pad);
308
309   GST_DEBUG_OBJECT (pad, "running time: %" GST_TIME_FORMAT,
310       GST_TIME_ARGS (ret));
311
312   return ret;
313 }
314
315 static void
316 gst_selector_pad_reset (GstSelectorPad * pad)
317 {
318   GST_OBJECT_LOCK (pad);
319   pad->active = FALSE;
320   pad->pushed = FALSE;
321   pad->eos = FALSE;
322   pad->eos_sent = FALSE;
323   pad->segment_pending = FALSE;
324   pad->discont = FALSE;
325   pad->flushing = FALSE;
326   pad->position = GST_CLOCK_TIME_NONE;
327   gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
328   GST_OBJECT_UNLOCK (pad);
329 }
330
331 /* strictly get the linked pad from the sinkpad. If the pad is active we return
332  * the srcpad else we return NULL */
333 static GstIterator *
334 gst_selector_pad_iterate_linked_pads (GstPad * pad)
335 {
336   GstInputSelector *sel;
337   GstPad *otherpad;
338   GstIterator *it = NULL;
339   GValue val = { 0, };
340
341   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
342   if (G_UNLIKELY (sel == NULL))
343     return NULL;
344
345   otherpad = gst_input_selector_get_linked_pad (sel, pad, TRUE);
346   if (otherpad) {
347     g_value_init (&val, GST_TYPE_PAD);
348     g_value_set_object (&val, otherpad);
349     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
350     g_value_unset (&val);
351     gst_object_unref (otherpad);
352   }
353   gst_object_unref (sel);
354
355   return it;
356 }
357
358 static gboolean
359 gst_selector_pad_event (GstPad * pad, GstEvent * event)
360 {
361   gboolean res = TRUE;
362   gboolean forward;
363   GstInputSelector *sel;
364   GstSelectorPad *selpad;
365   GstPad *prev_active_sinkpad;
366   GstPad *active_sinkpad;
367
368   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
369   if (G_UNLIKELY (sel == NULL)) {
370     gst_event_unref (event);
371     return FALSE;
372   }
373   selpad = GST_SELECTOR_PAD_CAST (pad);
374
375   GST_INPUT_SELECTOR_LOCK (sel);
376   prev_active_sinkpad = sel->active_sinkpad;
377   active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
378
379   /* only forward if we are dealing with the active sinkpad */
380   forward = (pad == active_sinkpad);
381   GST_INPUT_SELECTOR_UNLOCK (sel);
382
383   if (prev_active_sinkpad != active_sinkpad && pad == active_sinkpad) {
384     NOTIFY_MUTEX_LOCK ();
385     g_object_notify (G_OBJECT (sel), "active-pad");
386     NOTIFY_MUTEX_UNLOCK ();
387   }
388
389   switch (GST_EVENT_TYPE (event)) {
390     case GST_EVENT_FLUSH_START:
391       /* Unblock the pad if it's waiting */
392       GST_INPUT_SELECTOR_LOCK (sel);
393       selpad->flushing = TRUE;
394       GST_INPUT_SELECTOR_BROADCAST (sel);
395       GST_INPUT_SELECTOR_UNLOCK (sel);
396       break;
397     case GST_EVENT_FLUSH_STOP:
398       GST_INPUT_SELECTOR_LOCK (sel);
399       gst_selector_pad_reset (selpad);
400       GST_INPUT_SELECTOR_UNLOCK (sel);
401       break;
402     case GST_EVENT_SEGMENT:
403     {
404       GST_INPUT_SELECTOR_LOCK (sel);
405       GST_OBJECT_LOCK (selpad);
406       gst_event_copy_segment (event, &selpad->segment);
407       selpad->segment_seqnum = gst_event_get_seqnum (event);
408
409       /* Update the position */
410       if (selpad->position == GST_CLOCK_TIME_NONE
411           || selpad->segment.position > selpad->position) {
412         selpad->position = selpad->segment.position;
413       } else if (selpad->position != GST_CLOCK_TIME_NONE
414           && selpad->position > selpad->segment.position) {
415         selpad->segment.position = selpad->position;
416
417         if (forward) {
418           gst_event_unref (event);
419           event = gst_event_new_segment (&selpad->segment);
420           gst_event_set_seqnum (event, selpad->segment_seqnum);
421         }
422       }
423       GST_DEBUG_OBJECT (pad, "configured SEGMENT %" GST_SEGMENT_FORMAT,
424           &selpad->segment);
425
426       /* If we aren't forwarding the event because the pad is not the
427        * active_sinkpad, then set the flag on the pad
428        * that says a segment needs sending if/when that pad is activated.
429        * For all other cases, we send the event immediately, which makes
430        * sparse streams and other segment updates work correctly downstream.
431        */
432       if (!forward)
433         selpad->segment_pending = TRUE;
434
435       GST_OBJECT_UNLOCK (selpad);
436       GST_INPUT_SELECTOR_UNLOCK (sel);
437       break;
438     }
439     case GST_EVENT_TAG:
440     {
441       GstTagList *tags, *oldtags, *newtags;
442
443       gst_event_parse_tag (event, &tags);
444
445       GST_OBJECT_LOCK (selpad);
446       oldtags = selpad->tags;
447
448       newtags = gst_tag_list_merge (oldtags, tags, GST_TAG_MERGE_REPLACE);
449       selpad->tags = newtags;
450       if (oldtags)
451         gst_tag_list_free (oldtags);
452       GST_DEBUG_OBJECT (pad, "received tags %" GST_PTR_FORMAT, newtags);
453       GST_OBJECT_UNLOCK (selpad);
454
455       g_object_notify (G_OBJECT (selpad), "tags");
456       break;
457     }
458     case GST_EVENT_EOS:
459       selpad->eos = TRUE;
460
461       if (forward) {
462         selpad->eos_sent = TRUE;
463       } else {
464         GstSelectorPad *tmp;
465
466         /* If the active sinkpad is in EOS state but EOS
467          * was not sent downstream this means that the pad
468          * got EOS before it was set as active pad and that
469          * the previously active pad got EOS after it was
470          * active
471          */
472         GST_INPUT_SELECTOR_LOCK (sel);
473         active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
474         tmp = GST_SELECTOR_PAD (active_sinkpad);
475         forward = (tmp->eos && !tmp->eos_sent);
476         tmp->eos_sent = TRUE;
477         GST_INPUT_SELECTOR_UNLOCK (sel);
478       }
479       GST_DEBUG_OBJECT (pad, "received EOS");
480       break;
481     default:
482       break;
483   }
484   if (forward) {
485     GST_DEBUG_OBJECT (pad, "forwarding event");
486     res = gst_pad_push_event (sel->srcpad, event);
487   } else
488     gst_event_unref (event);
489
490   gst_object_unref (sel);
491
492   return res;
493 }
494
495 static gboolean
496 gst_selector_pad_query (GstPad * pad, GstQuery * query)
497 {
498   gboolean res = FALSE;
499   GstInputSelector *sel;
500   GstPad *otherpad;
501
502   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
503   if (G_UNLIKELY (sel == NULL))
504     return FALSE;
505
506   otherpad = gst_input_selector_get_linked_pad (sel, pad, TRUE);
507
508   switch (GST_QUERY_TYPE (query)) {
509     default:
510       if (otherpad)
511         res = gst_pad_peer_query (otherpad, query);
512       break;
513   }
514   if (otherpad)
515     gst_object_unref (otherpad);
516
517   gst_object_unref (sel);
518
519   return res;
520 }
521
522 static GstCaps *
523 gst_selector_pad_getcaps (GstPad * pad, GstCaps * filter)
524 {
525   GstInputSelector *sel;
526   GstCaps *caps;
527
528   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
529   if (G_UNLIKELY (sel == NULL))
530     return (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
531
532   GST_DEBUG_OBJECT (sel, "Getting caps of srcpad peer");
533   caps = gst_pad_peer_get_caps (sel->srcpad, filter);
534   if (caps == NULL)
535     caps = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
536
537   gst_object_unref (sel);
538
539   return caps;
540 }
541
542 static gboolean
543 gst_selector_pad_acceptcaps (GstPad * pad, GstCaps * caps)
544 {
545   GstInputSelector *sel;
546   gboolean res;
547
548   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
549   if (G_UNLIKELY (sel == NULL))
550     return FALSE;
551
552   GST_DEBUG_OBJECT (sel, "Checking acceptcaps of srcpad peer");
553   res = gst_pad_peer_accept_caps (sel->srcpad, caps);
554   gst_object_unref (sel);
555
556   return res;
557 }
558
559 /* must be called with the SELECTOR_LOCK, will block while the pad is blocked 
560  * or return TRUE when flushing */
561 static gboolean
562 gst_input_selector_wait (GstInputSelector * self, GstSelectorPad * pad)
563 {
564   while (self->blocked && !self->flushing && !pad->flushing) {
565     /* we can be unlocked here when we are shutting down (flushing) or when we
566      * get unblocked */
567     GST_INPUT_SELECTOR_WAIT (self);
568   }
569   return self->flushing;
570 }
571
572 /* must be called with the SELECTOR_LOCK, will block until the running time
573  * of the active pad is after this pad or return TRUE when flushing */
574 static gboolean
575 gst_input_selector_wait_running_time (GstInputSelector * sel,
576     GstSelectorPad * pad, GstBuffer * buf)
577 {
578   GstPad *active_sinkpad;
579   GstSelectorPad *active_selpad;
580   GstSegment *seg, *active_seg;
581   GstClockTime running_time, active_running_time = -1;
582
583   seg = &pad->segment;
584
585   active_sinkpad =
586       gst_input_selector_activate_sinkpad (sel, GST_PAD_CAST (pad));
587   active_selpad = GST_SELECTOR_PAD_CAST (active_sinkpad);
588   active_seg = &active_selpad->segment;
589
590   /* We can only sync if the segments are in time format or
591    * if the active pad had no newsegment event yet */
592   if (seg->format != GST_FORMAT_TIME ||
593       (active_seg->format != GST_FORMAT_TIME
594           && active_seg->format != GST_FORMAT_UNDEFINED))
595     return FALSE;
596
597   /* If we have no valid timestamp we can't sync this buffer */
598   if (!GST_BUFFER_TIMESTAMP_IS_VALID (buf))
599     return FALSE;
600
601   running_time = GST_BUFFER_TIMESTAMP (buf);
602   /* If possible try to get the running time at the end of the buffer */
603   if (GST_BUFFER_DURATION_IS_VALID (buf))
604     running_time += GST_BUFFER_DURATION (buf);
605   if (running_time > seg->stop)
606     running_time = seg->stop;
607   running_time =
608       gst_segment_to_running_time (seg, GST_FORMAT_TIME, running_time);
609   /* If this is outside the segment don't sync */
610   if (running_time == -1)
611     return FALSE;
612
613   /* Get active pad's running time, if no configured segment yet keep at -1 */
614   if (active_seg->format == GST_FORMAT_TIME)
615     active_running_time =
616         gst_segment_to_running_time (active_seg, GST_FORMAT_TIME,
617         active_selpad->position);
618
619   /* Wait until
620    *   a) this is the active pad
621    *   b) the pad or the selector is flushing
622    *   c) the selector is not blocked
623    *   d) the active pad has no running time or the active
624    *      pad's running time is before this running time
625    *   e) the active pad has a non-time segment
626    */
627   while (pad != active_selpad && !sel->flushing && !pad->flushing &&
628       (sel->blocked || active_running_time == -1
629           || running_time >= active_running_time)) {
630     if (!sel->blocked)
631       GST_DEBUG_OBJECT (pad,
632           "Waiting for active streams to advance. %" GST_TIME_FORMAT " >= %"
633           GST_TIME_FORMAT, GST_TIME_ARGS (running_time),
634           GST_TIME_ARGS (active_running_time));
635
636     GST_INPUT_SELECTOR_WAIT (sel);
637
638     /* Get new active pad, it might have changed */
639     active_sinkpad =
640         gst_input_selector_activate_sinkpad (sel, GST_PAD_CAST (pad));
641     active_selpad = GST_SELECTOR_PAD_CAST (active_sinkpad);
642     active_seg = &active_selpad->segment;
643
644     /* If the active segment is configured but not to time format
645      * we can't do any syncing at all */
646     if (active_seg->format != GST_FORMAT_TIME
647         && active_seg->format != GST_FORMAT_UNDEFINED)
648       break;
649
650     /* Get the new active pad running time */
651     if (active_seg->format == GST_FORMAT_TIME)
652       active_running_time =
653           gst_segment_to_running_time (active_seg, GST_FORMAT_TIME,
654           active_selpad->position);
655     else
656       active_running_time = -1;
657
658     if (!sel->blocked)
659       GST_DEBUG_OBJECT (pad,
660           "Waited for active streams to advance. %" GST_TIME_FORMAT " >= %"
661           GST_TIME_FORMAT, GST_TIME_ARGS (running_time),
662           GST_TIME_ARGS (active_running_time));
663
664   }
665
666   /* Return TRUE if the selector or the pad is flushing */
667   return (sel->flushing || pad->flushing);
668 }
669
670
671 static GstFlowReturn
672 gst_selector_pad_chain (GstPad * pad, GstBuffer * buf)
673 {
674   GstInputSelector *sel;
675   GstFlowReturn res;
676   GstPad *active_sinkpad;
677   GstPad *prev_active_sinkpad;
678   GstSelectorPad *selpad;
679   GstClockTime start_time;
680   GstSegment *seg;
681   GstEvent *start_event = NULL;
682
683   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
684   selpad = GST_SELECTOR_PAD_CAST (pad);
685   seg = &selpad->segment;
686
687   GST_INPUT_SELECTOR_LOCK (sel);
688   /* wait or check for flushing */
689   if (gst_input_selector_wait (sel, selpad))
690     goto flushing;
691
692   GST_LOG_OBJECT (pad, "getting active pad");
693
694   prev_active_sinkpad = sel->active_sinkpad;
695   active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
696
697   /* In sync mode wait until the active pad has advanced
698    * after the running time of the current buffer */
699   if (sel->sync_streams && active_sinkpad != pad) {
700     if (gst_input_selector_wait_running_time (sel, selpad, buf))
701       goto flushing;
702   }
703
704   /* Might have changed while waiting */
705   active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
706
707   /* update the segment on the srcpad */
708   start_time = GST_BUFFER_TIMESTAMP (buf);
709   if (GST_CLOCK_TIME_IS_VALID (start_time)) {
710     GST_LOG_OBJECT (pad, "received start time %" GST_TIME_FORMAT,
711         GST_TIME_ARGS (start_time));
712     if (GST_BUFFER_DURATION_IS_VALID (buf))
713       GST_LOG_OBJECT (pad, "received end time %" GST_TIME_FORMAT,
714           GST_TIME_ARGS (start_time + GST_BUFFER_DURATION (buf)));
715
716     GST_OBJECT_LOCK (pad);
717     selpad->position = start_time;
718     GST_OBJECT_UNLOCK (pad);
719   }
720
721   /* Ignore buffers from pads except the selected one */
722   if (pad != active_sinkpad)
723     goto ignore;
724
725   /* Tell all non-active pads that we advanced the running time */
726   if (sel->sync_streams)
727     GST_INPUT_SELECTOR_BROADCAST (sel);
728
729   /* if we have a pending segment, push it out now */
730   if (G_UNLIKELY (prev_active_sinkpad != active_sinkpad
731           || selpad->segment_pending)) {
732     GST_DEBUG_OBJECT (pad,
733         "pushing pending NEWSEGMENT update %d, rate %lf, applied rate %lf, "
734         "format %d, " "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
735         G_GINT64_FORMAT, FALSE, seg->rate, seg->applied_rate, seg->format,
736         seg->start, seg->stop, seg->time);
737
738     start_event = gst_event_new_segment (seg);
739     gst_event_set_seqnum (start_event, selpad->segment_seqnum);
740
741     selpad->segment_pending = FALSE;
742   }
743   GST_INPUT_SELECTOR_UNLOCK (sel);
744
745   if (prev_active_sinkpad != active_sinkpad && pad == active_sinkpad) {
746     NOTIFY_MUTEX_LOCK ();
747     g_object_notify (G_OBJECT (sel), "active-pad");
748     NOTIFY_MUTEX_UNLOCK ();
749   }
750
751   if (start_event)
752     gst_pad_push_event (sel->srcpad, start_event);
753
754   if (selpad->discont) {
755     buf = gst_buffer_make_writable (buf);
756
757     GST_DEBUG_OBJECT (pad, "Marking discont buffer %p", buf);
758     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
759     selpad->discont = FALSE;
760   }
761
762   /* forward */
763   GST_LOG_OBJECT (pad, "Forwarding buffer %p", buf);
764
765   res = gst_pad_push (sel->srcpad, buf);
766   selpad->pushed = TRUE;
767
768 done:
769   gst_object_unref (sel);
770   return res;
771
772   /* dropped buffers */
773 ignore:
774   {
775     gboolean active_pad_pushed = GST_SELECTOR_PAD_CAST (active_sinkpad)->pushed;
776
777     GST_DEBUG_OBJECT (pad, "Pad not active, discard buffer %p", buf);
778     /* when we drop a buffer, we're creating a discont on this pad */
779     selpad->discont = TRUE;
780     GST_INPUT_SELECTOR_UNLOCK (sel);
781     gst_buffer_unref (buf);
782
783     /* figure out what to return upstream */
784     GST_OBJECT_LOCK (selpad);
785     if (selpad->always_ok || !active_pad_pushed)
786       res = GST_FLOW_OK;
787     else
788       res = GST_FLOW_NOT_LINKED;
789     GST_OBJECT_UNLOCK (selpad);
790
791     goto done;
792   }
793 flushing:
794   {
795     GST_DEBUG_OBJECT (pad, "We are flushing, discard buffer %p", buf);
796     GST_INPUT_SELECTOR_UNLOCK (sel);
797     gst_buffer_unref (buf);
798     res = GST_FLOW_WRONG_STATE;
799     goto done;
800   }
801 }
802
803 static void gst_input_selector_dispose (GObject * object);
804
805 static void gst_input_selector_set_property (GObject * object,
806     guint prop_id, const GValue * value, GParamSpec * pspec);
807 static void gst_input_selector_get_property (GObject * object,
808     guint prop_id, GValue * value, GParamSpec * pspec);
809
810 static GstPad *gst_input_selector_request_new_pad (GstElement * element,
811     GstPadTemplate * templ, const gchar * unused, const GstCaps * caps);
812 static void gst_input_selector_release_pad (GstElement * element, GstPad * pad);
813
814 static GstStateChangeReturn gst_input_selector_change_state (GstElement *
815     element, GstStateChange transition);
816
817 static GstCaps *gst_input_selector_getcaps (GstPad * pad, GstCaps * filter);
818 static gboolean gst_input_selector_event (GstPad * pad, GstEvent * event);
819 static gboolean gst_input_selector_query (GstPad * pad, GstQuery * query);
820 static gint64 gst_input_selector_block (GstInputSelector * self);
821
822 /* FIXME: create these marshallers using glib-genmarshal */
823 static void
824 gst_input_selector_marshal_INT64__VOID (GClosure * closure,
825     GValue * return_value G_GNUC_UNUSED,
826     guint n_param_values,
827     const GValue * param_values,
828     gpointer invocation_hint G_GNUC_UNUSED, gpointer marshal_data)
829 {
830   typedef gint64 (*GMarshalFunc_INT64__VOID) (gpointer data1, gpointer data2);
831   register GMarshalFunc_INT64__VOID callback;
832   register GCClosure *cc = (GCClosure *) closure;
833   register gpointer data1, data2;
834   gint64 v_return;
835
836   g_return_if_fail (return_value != NULL);
837   g_return_if_fail (n_param_values == 1);
838
839   if (G_CCLOSURE_SWAP_DATA (closure)) {
840     data1 = closure->data;
841     data2 = g_value_peek_pointer (param_values + 0);
842   } else {
843     data1 = g_value_peek_pointer (param_values + 0);
844     data2 = closure->data;
845   }
846   callback =
847       (GMarshalFunc_INT64__VOID) (marshal_data ? marshal_data : cc->callback);
848
849   v_return = callback (data1, data2);
850
851   g_value_set_int64 (return_value, v_return);
852 }
853
854 #define _do_init \
855     GST_DEBUG_CATEGORY_INIT (input_selector_debug, \
856         "input-selector", 0, "An input stream selector element");
857 #define gst_input_selector_parent_class parent_class
858 G_DEFINE_TYPE_WITH_CODE (GstInputSelector, gst_input_selector, GST_TYPE_ELEMENT,
859     _do_init);
860
861 static void
862 gst_input_selector_class_init (GstInputSelectorClass * klass)
863 {
864   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
865   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
866
867   gobject_class->dispose = gst_input_selector_dispose;
868
869   gobject_class->set_property = gst_input_selector_set_property;
870   gobject_class->get_property = gst_input_selector_get_property;
871
872   g_object_class_install_property (gobject_class, PROP_N_PADS,
873       g_param_spec_uint ("n-pads", "Number of Pads",
874           "The number of sink pads", 0, G_MAXUINT, 0,
875           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
876
877   g_object_class_install_property (gobject_class, PROP_ACTIVE_PAD,
878       g_param_spec_object ("active-pad", "Active pad",
879           "The currently active sink pad", GST_TYPE_PAD,
880           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
881
882   /**
883    * GstInputSelector:sync-streams
884    *
885    * If set to %TRUE all inactive streams will be synced to the
886    * running time of the active stream. This makes sure that no
887    * buffers are dropped by input-selector that might be needed
888    * when switching the active pad.
889    *
890    * Since: 0.10.36
891    */
892   g_object_class_install_property (gobject_class, PROP_SYNC_STREAMS,
893       g_param_spec_boolean ("sync-streams", "Sync Streams",
894           "Synchronize inactive streams to the running time of the active stream",
895           DEFAULT_SYNC_STREAMS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
896
897   /**
898    * GstInputSelector::block:
899    * @inputselector: the #GstInputSelector
900    *
901    * Block all sink pads in preparation for a switch. Returns the stop time of
902    * the current switch segment, as a running time, or 0 if there is no current
903    * active pad or the current active pad never received data.
904    */
905   gst_input_selector_signals[SIGNAL_BLOCK] =
906       g_signal_new ("block", G_TYPE_FROM_CLASS (klass),
907       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
908       G_STRUCT_OFFSET (GstInputSelectorClass, block), NULL, NULL,
909       gst_input_selector_marshal_INT64__VOID, G_TYPE_INT64, 0);
910
911   gst_element_class_set_details_simple (gstelement_class, "Input selector",
912       "Generic", "N-to-1 input stream selector",
913       "Julien Moutte <julien@moutte.net>, "
914       "Jan Schmidt <thaytan@mad.scientist.com>, "
915       "Wim Taymans <wim.taymans@gmail.com>");
916   gst_element_class_add_pad_template (gstelement_class,
917       gst_static_pad_template_get (&gst_input_selector_sink_factory));
918   gst_element_class_add_pad_template (gstelement_class,
919       gst_static_pad_template_get (&gst_input_selector_src_factory));
920
921   gstelement_class->request_new_pad = gst_input_selector_request_new_pad;
922   gstelement_class->release_pad = gst_input_selector_release_pad;
923   gstelement_class->change_state = gst_input_selector_change_state;
924
925   klass->block = GST_DEBUG_FUNCPTR (gst_input_selector_block);
926 }
927
928 static void
929 gst_input_selector_init (GstInputSelector * sel)
930 {
931   sel->srcpad = gst_pad_new ("src", GST_PAD_SRC);
932   gst_pad_set_iterate_internal_links_function (sel->srcpad,
933       GST_DEBUG_FUNCPTR (gst_selector_pad_iterate_linked_pads));
934   gst_pad_set_getcaps_function (sel->srcpad,
935       GST_DEBUG_FUNCPTR (gst_input_selector_getcaps));
936   gst_pad_set_query_function (sel->srcpad,
937       GST_DEBUG_FUNCPTR (gst_input_selector_query));
938   gst_pad_set_event_function (sel->srcpad,
939       GST_DEBUG_FUNCPTR (gst_input_selector_event));
940   gst_element_add_pad (GST_ELEMENT (sel), sel->srcpad);
941   /* sinkpad management */
942   sel->active_sinkpad = NULL;
943   sel->padcount = 0;
944   sel->sync_streams = DEFAULT_SYNC_STREAMS;
945
946   sel->lock = g_mutex_new ();
947   sel->cond = g_cond_new ();
948   sel->blocked = FALSE;
949 }
950
951 static void
952 gst_input_selector_dispose (GObject * object)
953 {
954   GstInputSelector *sel = GST_INPUT_SELECTOR (object);
955
956   if (sel->active_sinkpad) {
957     gst_object_unref (sel->active_sinkpad);
958     sel->active_sinkpad = NULL;
959   }
960   if (sel->lock) {
961     g_mutex_free (sel->lock);
962     sel->lock = NULL;
963   }
964   if (sel->cond) {
965     g_cond_free (sel->cond);
966     sel->cond = NULL;
967   }
968
969   G_OBJECT_CLASS (parent_class)->dispose (object);
970 }
971
972 /* this function must be called with the SELECTOR_LOCK. It returns TRUE when the
973  * active pad changed. */
974 static gboolean
975 gst_input_selector_set_active_pad (GstInputSelector * self, GstPad * pad)
976 {
977   GstSelectorPad *old, *new;
978   GstPad **active_pad_p;
979
980   if (pad == self->active_sinkpad)
981     return FALSE;
982
983   old = GST_SELECTOR_PAD_CAST (self->active_sinkpad);
984   new = GST_SELECTOR_PAD_CAST (pad);
985
986   GST_DEBUG_OBJECT (self, "setting active pad to %s:%s",
987       GST_DEBUG_PAD_NAME (new));
988
989   if (old)
990     old->pushed = FALSE;
991   if (new)
992     new->pushed = FALSE;
993
994   /* Send a new SEGMENT event on the new pad next */
995   if (old != new && new)
996     new->segment_pending = TRUE;
997
998   active_pad_p = &self->active_sinkpad;
999   gst_object_replace ((GstObject **) active_pad_p, GST_OBJECT_CAST (pad));
1000
1001   /* Wake up all non-active pads in sync mode, they might be
1002    * the active pad now */
1003   if (self->sync_streams)
1004     GST_INPUT_SELECTOR_BROADCAST (self);
1005
1006   GST_DEBUG_OBJECT (self, "New active pad is %" GST_PTR_FORMAT,
1007       self->active_sinkpad);
1008
1009   return TRUE;
1010 }
1011
1012 static void
1013 gst_input_selector_set_property (GObject * object, guint prop_id,
1014     const GValue * value, GParamSpec * pspec)
1015 {
1016   GstInputSelector *sel = GST_INPUT_SELECTOR (object);
1017
1018   switch (prop_id) {
1019     case PROP_ACTIVE_PAD:
1020     {
1021       GstPad *pad;
1022
1023       pad = g_value_get_object (value);
1024
1025       GST_INPUT_SELECTOR_LOCK (sel);
1026       gst_input_selector_set_active_pad (sel, pad);
1027       GST_INPUT_SELECTOR_UNLOCK (sel);
1028       break;
1029     }
1030     case PROP_SYNC_STREAMS:
1031     {
1032       GST_INPUT_SELECTOR_LOCK (sel);
1033       sel->sync_streams = g_value_get_boolean (value);
1034       GST_INPUT_SELECTOR_UNLOCK (sel);
1035       break;
1036     }
1037     default:
1038       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1039       break;
1040   }
1041 }
1042
1043 static void
1044 gst_input_selector_get_property (GObject * object, guint prop_id,
1045     GValue * value, GParamSpec * pspec)
1046 {
1047   GstInputSelector *sel = GST_INPUT_SELECTOR (object);
1048
1049   switch (prop_id) {
1050     case PROP_N_PADS:
1051       GST_INPUT_SELECTOR_LOCK (object);
1052       g_value_set_uint (value, sel->n_pads);
1053       GST_INPUT_SELECTOR_UNLOCK (object);
1054       break;
1055     case PROP_ACTIVE_PAD:
1056       GST_INPUT_SELECTOR_LOCK (object);
1057       g_value_set_object (value, sel->active_sinkpad);
1058       GST_INPUT_SELECTOR_UNLOCK (object);
1059       break;
1060     case PROP_SYNC_STREAMS:
1061       GST_INPUT_SELECTOR_LOCK (object);
1062       g_value_set_boolean (value, sel->sync_streams);
1063       GST_INPUT_SELECTOR_UNLOCK (object);
1064       break;
1065     default:
1066       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1067       break;
1068   }
1069 }
1070
1071 static GstPad *
1072 gst_input_selector_get_linked_pad (GstInputSelector * sel, GstPad * pad,
1073     gboolean strict)
1074 {
1075   GstPad *otherpad = NULL;
1076
1077   GST_INPUT_SELECTOR_LOCK (sel);
1078   if (pad == sel->srcpad)
1079     otherpad = sel->active_sinkpad;
1080   else if (pad == sel->active_sinkpad || !strict)
1081     otherpad = sel->srcpad;
1082   if (otherpad)
1083     gst_object_ref (otherpad);
1084   GST_INPUT_SELECTOR_UNLOCK (sel);
1085
1086   return otherpad;
1087 }
1088
1089 static gboolean
1090 gst_input_selector_event (GstPad * pad, GstEvent * event)
1091 {
1092   GstInputSelector *sel;
1093   gboolean result = FALSE;
1094   GstIterator *iter;
1095   gboolean done = FALSE;
1096   GValue item = { 0, };
1097   GstPad *eventpad;
1098   GList *pushed_pads = NULL;
1099
1100   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
1101   if (G_UNLIKELY (sel == NULL)) {
1102     gst_event_unref (event);
1103     return FALSE;
1104   }
1105
1106   /* Send upstream events to all sinkpads */
1107   iter = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (sel));
1108
1109   /* This is now essentially a copy of gst_pad_event_default_dispatch
1110    * with a different iterator */
1111   while (!done) {
1112     switch (gst_iterator_next (iter, &item)) {
1113       case GST_ITERATOR_OK:
1114         eventpad = g_value_get_object (&item);
1115
1116         /* if already pushed,  skip */
1117         if (g_list_find (pushed_pads, eventpad)) {
1118           g_value_reset (&item);
1119           break;
1120         }
1121
1122         gst_event_ref (event);
1123         result |= gst_pad_push_event (eventpad, event);
1124
1125         g_value_reset (&item);
1126         break;
1127       case GST_ITERATOR_RESYNC:
1128         /* We don't reset the result here because we don't push the event
1129          * again on pads that got the event already and because we need
1130          * to consider the result of the previous pushes */
1131         gst_iterator_resync (iter);
1132         break;
1133       case GST_ITERATOR_ERROR:
1134         GST_ERROR_OBJECT (pad, "Could not iterate over sinkpads");
1135         done = TRUE;
1136         break;
1137       case GST_ITERATOR_DONE:
1138         done = TRUE;
1139         break;
1140     }
1141   }
1142   g_value_unset (&item);
1143   gst_iterator_free (iter);
1144
1145   g_list_free (pushed_pads);
1146
1147   gst_event_unref (event);
1148
1149   return result;
1150 }
1151
1152 /* query on the srcpad. We override this function because by default it will
1153  * only forward the query to one random sinkpad */
1154 static gboolean
1155 gst_input_selector_query (GstPad * pad, GstQuery * query)
1156 {
1157   gboolean res = FALSE;
1158   GstInputSelector *sel;
1159   GstPad *otherpad;
1160
1161   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
1162   if (G_UNLIKELY (sel == NULL))
1163     return FALSE;
1164
1165   otherpad = gst_input_selector_get_linked_pad (sel, pad, TRUE);
1166
1167   switch (GST_QUERY_TYPE (query)) {
1168     case GST_QUERY_LATENCY:
1169     {
1170       GList *walk;
1171       GstClockTime resmin, resmax;
1172       gboolean reslive;
1173
1174       resmin = 0;
1175       resmax = -1;
1176       reslive = FALSE;
1177
1178       /* perform the query on all sinkpads and combine the results. We take the
1179        * max of min and the min of max for the result latency. */
1180       GST_INPUT_SELECTOR_LOCK (sel);
1181       for (walk = GST_ELEMENT_CAST (sel)->sinkpads; walk;
1182           walk = g_list_next (walk)) {
1183         GstPad *sinkpad = GST_PAD_CAST (walk->data);
1184
1185         if (gst_pad_peer_query (sinkpad, query)) {
1186           GstClockTime min, max;
1187           gboolean live;
1188
1189           /* one query succeeded, we succeed too */
1190           res = TRUE;
1191
1192           gst_query_parse_latency (query, &live, &min, &max);
1193
1194           GST_DEBUG_OBJECT (sinkpad,
1195               "peer latency min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT
1196               ", live %d", GST_TIME_ARGS (min), GST_TIME_ARGS (max), live);
1197
1198           if (live) {
1199             if (min > resmin)
1200               resmin = min;
1201             if (resmax == -1)
1202               resmax = max;
1203             else if (max < resmax)
1204               resmax = max;
1205             if (reslive == FALSE)
1206               reslive = live;
1207           }
1208         }
1209       }
1210       GST_INPUT_SELECTOR_UNLOCK (sel);
1211       if (res) {
1212         gst_query_set_latency (query, reslive, resmin, resmax);
1213
1214         GST_DEBUG_OBJECT (sel,
1215             "total latency min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT
1216             ", live %d", GST_TIME_ARGS (resmin), GST_TIME_ARGS (resmax),
1217             reslive);
1218       }
1219
1220       break;
1221     }
1222     default:
1223       if (otherpad)
1224         res = gst_pad_peer_query (otherpad, query);
1225       break;
1226   }
1227   if (otherpad)
1228     gst_object_unref (otherpad);
1229
1230   gst_object_unref (sel);
1231
1232   return res;
1233 }
1234
1235 static GstCaps *
1236 gst_input_selector_getcaps (GstPad * pad, GstCaps * filter)
1237 {
1238   GstPad *otherpad;
1239   GstInputSelector *sel;
1240   GstCaps *caps;
1241
1242   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
1243   if (G_UNLIKELY (sel == NULL))
1244     return (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
1245
1246   otherpad = gst_input_selector_get_linked_pad (sel, pad, FALSE);
1247
1248   if (!otherpad) {
1249     GST_DEBUG_OBJECT (pad, "Pad not linked, returning ANY");
1250     caps = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
1251   } else {
1252     GST_DEBUG_OBJECT (pad, "Pad is linked (to %s:%s), returning peer caps",
1253         GST_DEBUG_PAD_NAME (otherpad));
1254     /* if the peer has caps, use those. If the pad is not linked, this function
1255      * returns NULL and we return ANY */
1256     if (!(caps = gst_pad_peer_get_caps (otherpad, filter)))
1257       caps = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
1258     gst_object_unref (otherpad);
1259   }
1260
1261   gst_object_unref (sel);
1262   return caps;
1263 }
1264
1265 /* check if the pad is the active sinkpad */
1266 static inline gboolean
1267 gst_input_selector_is_active_sinkpad (GstInputSelector * sel, GstPad * pad)
1268 {
1269   gboolean res;
1270
1271   GST_INPUT_SELECTOR_LOCK (sel);
1272   res = (pad == sel->active_sinkpad);
1273   GST_INPUT_SELECTOR_UNLOCK (sel);
1274
1275   return res;
1276 }
1277
1278 /* Get or create the active sinkpad, must be called with SELECTOR_LOCK */
1279 static GstPad *
1280 gst_input_selector_activate_sinkpad (GstInputSelector * sel, GstPad * pad)
1281 {
1282   GstPad *active_sinkpad;
1283   GstSelectorPad *selpad;
1284
1285   selpad = GST_SELECTOR_PAD_CAST (pad);
1286
1287   selpad->active = TRUE;
1288   active_sinkpad = sel->active_sinkpad;
1289   if (active_sinkpad == NULL) {
1290     /* first pad we get activity on becomes the activated pad by default */
1291     if (sel->active_sinkpad)
1292       gst_object_unref (sel->active_sinkpad);
1293     active_sinkpad = sel->active_sinkpad = gst_object_ref (pad);
1294     GST_DEBUG_OBJECT (sel, "Activating pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1295   }
1296
1297   return active_sinkpad;
1298 }
1299
1300 static GstPad *
1301 gst_input_selector_request_new_pad (GstElement * element,
1302     GstPadTemplate * templ, const gchar * unused, const GstCaps * caps)
1303 {
1304   GstInputSelector *sel;
1305   gchar *name = NULL;
1306   GstPad *sinkpad = NULL;
1307
1308   g_return_val_if_fail (templ->direction == GST_PAD_SINK, NULL);
1309
1310   sel = GST_INPUT_SELECTOR (element);
1311
1312   GST_INPUT_SELECTOR_LOCK (sel);
1313
1314   GST_LOG_OBJECT (sel, "Creating new pad %d", sel->padcount);
1315   name = g_strdup_printf ("sink_%u", sel->padcount++);
1316   sinkpad = g_object_new (GST_TYPE_SELECTOR_PAD,
1317       "name", name, "direction", templ->direction, "template", templ, NULL);
1318   g_free (name);
1319
1320   sel->n_pads++;
1321
1322   gst_pad_set_event_function (sinkpad,
1323       GST_DEBUG_FUNCPTR (gst_selector_pad_event));
1324   gst_pad_set_getcaps_function (sinkpad,
1325       GST_DEBUG_FUNCPTR (gst_selector_pad_getcaps));
1326   gst_pad_set_acceptcaps_function (sinkpad,
1327       GST_DEBUG_FUNCPTR (gst_selector_pad_acceptcaps));
1328   gst_pad_set_query_function (sinkpad,
1329       GST_DEBUG_FUNCPTR (gst_selector_pad_query));
1330   gst_pad_set_chain_function (sinkpad,
1331       GST_DEBUG_FUNCPTR (gst_selector_pad_chain));
1332   gst_pad_set_iterate_internal_links_function (sinkpad,
1333       GST_DEBUG_FUNCPTR (gst_selector_pad_iterate_linked_pads));
1334
1335   gst_pad_set_active (sinkpad, TRUE);
1336   gst_element_add_pad (GST_ELEMENT (sel), sinkpad);
1337   GST_INPUT_SELECTOR_UNLOCK (sel);
1338
1339   return sinkpad;
1340 }
1341
1342 static void
1343 gst_input_selector_release_pad (GstElement * element, GstPad * pad)
1344 {
1345   GstInputSelector *sel;
1346
1347   sel = GST_INPUT_SELECTOR (element);
1348   GST_LOG_OBJECT (sel, "Releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1349
1350   GST_INPUT_SELECTOR_LOCK (sel);
1351   /* if the pad was the active pad, makes us select a new one */
1352   if (sel->active_sinkpad == pad) {
1353     GST_DEBUG_OBJECT (sel, "Deactivating pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1354     gst_object_unref (sel->active_sinkpad);
1355     sel->active_sinkpad = NULL;
1356   }
1357   sel->n_pads--;
1358
1359   gst_pad_set_active (pad, FALSE);
1360   gst_element_remove_pad (GST_ELEMENT (sel), pad);
1361   GST_INPUT_SELECTOR_UNLOCK (sel);
1362 }
1363
1364 static void
1365 gst_input_selector_reset (GstInputSelector * sel)
1366 {
1367   GList *walk;
1368
1369   GST_INPUT_SELECTOR_LOCK (sel);
1370   /* clear active pad */
1371   if (sel->active_sinkpad) {
1372     gst_object_unref (sel->active_sinkpad);
1373     sel->active_sinkpad = NULL;
1374   }
1375   /* reset each of our sinkpads state */
1376   for (walk = GST_ELEMENT_CAST (sel)->sinkpads; walk; walk = g_list_next (walk)) {
1377     GstSelectorPad *selpad = GST_SELECTOR_PAD_CAST (walk->data);
1378
1379     gst_selector_pad_reset (selpad);
1380
1381     if (selpad->tags) {
1382       gst_tag_list_free (selpad->tags);
1383       selpad->tags = NULL;
1384     }
1385   }
1386   GST_INPUT_SELECTOR_UNLOCK (sel);
1387 }
1388
1389 static GstStateChangeReturn
1390 gst_input_selector_change_state (GstElement * element,
1391     GstStateChange transition)
1392 {
1393   GstInputSelector *self = GST_INPUT_SELECTOR (element);
1394   GstStateChangeReturn result;
1395
1396   switch (transition) {
1397     case GST_STATE_CHANGE_READY_TO_PAUSED:
1398       GST_INPUT_SELECTOR_LOCK (self);
1399       self->blocked = FALSE;
1400       self->flushing = FALSE;
1401       GST_INPUT_SELECTOR_UNLOCK (self);
1402       break;
1403     case GST_STATE_CHANGE_PAUSED_TO_READY:
1404       /* first unlock before we call the parent state change function, which
1405        * tries to acquire the stream lock when going to ready. */
1406       GST_INPUT_SELECTOR_LOCK (self);
1407       self->blocked = FALSE;
1408       self->flushing = TRUE;
1409       GST_INPUT_SELECTOR_BROADCAST (self);
1410       GST_INPUT_SELECTOR_UNLOCK (self);
1411       break;
1412     default:
1413       break;
1414   }
1415
1416   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1417
1418   switch (transition) {
1419     case GST_STATE_CHANGE_PAUSED_TO_READY:
1420       gst_input_selector_reset (self);
1421       break;
1422     default:
1423       break;
1424   }
1425
1426   return result;
1427 }
1428
1429 static gint64
1430 gst_input_selector_block (GstInputSelector * self)
1431 {
1432   gint64 ret = 0;
1433   GstSelectorPad *spad;
1434
1435   GST_INPUT_SELECTOR_LOCK (self);
1436
1437   if (self->blocked)
1438     GST_WARNING_OBJECT (self, "switch already blocked");
1439
1440   self->blocked = TRUE;
1441   spad = GST_SELECTOR_PAD_CAST (self->active_sinkpad);
1442
1443   if (spad)
1444     ret = gst_selector_pad_get_running_time (spad);
1445   else
1446     GST_DEBUG_OBJECT (self, "no active pad while blocking");
1447
1448   GST_INPUT_SELECTOR_UNLOCK (self);
1449
1450   return ret;
1451 }