Merge branch 'master' into 0.11
[platform/upstream/gstreamer.git] / plugins / elements / gstinputselector.c
1 /* GStreamer input selector
2  * Copyright (C) 2003 Julien Moutte <julien@moutte.net>
3  * Copyright (C) 2005 Ronald S. Bultje <rbultje@ronald.bitfreak.net>
4  * Copyright (C) 2005 Jan Schmidt <thaytan@mad.scientist.com>
5  * Copyright (C) 2007 Wim Taymans <wim.taymans@gmail.com>
6  * Copyright (C) 2007 Andy Wingo <wingo@pobox.com>
7  * Copyright (C) 2008 Nokia Corporation. (contact <stefan.kost@nokia.com>)
8  * Copyright (C) 2011 Sebastian Dröge <sebastian.droege@collabora.co.uk>
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  */
25
26 /**
27  * SECTION:element-input-selector
28  * @see_also: #GstOutputSelector
29  *
30  * Direct one out of N input streams to the output pad.
31  *
32  * The input pads are from a GstPad subclass and have additional
33  * properties, which users may find useful, namely:
34  *
35  * <itemizedlist>
36  * <listitem>
37  * "running-time": Running time of stream on pad (#gint64)
38  * </listitem>
39  * <listitem>
40  * "tags": The currently active tags on the pad (#GstTagList, boxed type)
41  * </listitem>
42  * <listitem>
43  * "active": If the pad is currently active (#gboolean)
44  * </listitem>
45  * <listitem>
46  * "always-ok" : Make an inactive pads return #GST_FLOW_OK instead of
47  * #GST_FLOW_NOT_LINKED
48  * </listitem>
49  * </itemizedlist>
50  *
51  * Since: 0.10.32
52  */
53
54 #ifdef HAVE_CONFIG_H
55 #include "config.h"
56 #endif
57
58 #include <string.h>
59
60 #include "gstinputselector.h"
61
62 GST_DEBUG_CATEGORY_STATIC (input_selector_debug);
63 #define GST_CAT_DEFAULT input_selector_debug
64
65 #if GLIB_CHECK_VERSION(2, 26, 0)
66 #define NOTIFY_MUTEX_LOCK()
67 #define NOTIFY_MUTEX_UNLOCK()
68 #else
69 static GStaticRecMutex notify_mutex = G_STATIC_REC_MUTEX_INIT;
70 #define NOTIFY_MUTEX_LOCK() g_static_rec_mutex_lock (&notify_mutex)
71 #define NOTIFY_MUTEX_UNLOCK() g_static_rec_mutex_unlock (&notify_mutex)
72 #endif
73
74 #define GST_INPUT_SELECTOR_GET_LOCK(sel) (((GstInputSelector*)(sel))->lock)
75 #define GST_INPUT_SELECTOR_GET_COND(sel) (((GstInputSelector*)(sel))->cond)
76 #define GST_INPUT_SELECTOR_LOCK(sel) (g_mutex_lock (GST_INPUT_SELECTOR_GET_LOCK(sel)))
77 #define GST_INPUT_SELECTOR_UNLOCK(sel) (g_mutex_unlock (GST_INPUT_SELECTOR_GET_LOCK(sel)))
78 #define GST_INPUT_SELECTOR_WAIT(sel) (g_cond_wait (GST_INPUT_SELECTOR_GET_COND(sel), \
79                         GST_INPUT_SELECTOR_GET_LOCK(sel)))
80 #define GST_INPUT_SELECTOR_BROADCAST(sel) (g_cond_broadcast (GST_INPUT_SELECTOR_GET_COND(sel)))
81
82 static GstStaticPadTemplate gst_input_selector_sink_factory =
83 GST_STATIC_PAD_TEMPLATE ("sink%d",
84     GST_PAD_SINK,
85     GST_PAD_REQUEST,
86     GST_STATIC_CAPS_ANY);
87
88 static GstStaticPadTemplate gst_input_selector_src_factory =
89 GST_STATIC_PAD_TEMPLATE ("src",
90     GST_PAD_SRC,
91     GST_PAD_ALWAYS,
92     GST_STATIC_CAPS_ANY);
93
94 enum
95 {
96   PROP_0,
97   PROP_N_PADS,
98   PROP_ACTIVE_PAD,
99   PROP_SYNC_STREAMS
100 };
101
102 #define DEFAULT_SYNC_STREAMS FALSE
103
104 #define DEFAULT_PAD_ALWAYS_OK TRUE
105
106 enum
107 {
108   PROP_PAD_0,
109   PROP_PAD_RUNNING_TIME,
110   PROP_PAD_TAGS,
111   PROP_PAD_ACTIVE,
112   PROP_PAD_ALWAYS_OK
113 };
114
115 enum
116 {
117   /* methods */
118   SIGNAL_BLOCK,
119   SIGNAL_SWITCH,
120   LAST_SIGNAL
121 };
122 static guint gst_input_selector_signals[LAST_SIGNAL] = { 0 };
123
124 static inline gboolean gst_input_selector_is_active_sinkpad (GstInputSelector *
125     sel, GstPad * pad);
126 static GstPad *gst_input_selector_activate_sinkpad (GstInputSelector * sel,
127     GstPad * pad);
128 static GstPad *gst_input_selector_get_linked_pad (GstInputSelector * sel,
129     GstPad * pad, gboolean strict);
130
131 #define GST_TYPE_SELECTOR_PAD \
132   (gst_selector_pad_get_type())
133 #define GST_SELECTOR_PAD(obj) \
134   (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SELECTOR_PAD, GstSelectorPad))
135 #define GST_SELECTOR_PAD_CLASS(klass) \
136   (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_SELECTOR_PAD, GstSelectorPadClass))
137 #define GST_IS_SELECTOR_PAD(obj) \
138   (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_SELECTOR_PAD))
139 #define GST_IS_SELECTOR_PAD_CLASS(klass) \
140   (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_SELECTOR_PAD))
141 #define GST_SELECTOR_PAD_CAST(obj) \
142   ((GstSelectorPad *)(obj))
143
144 typedef struct _GstSelectorPad GstSelectorPad;
145 typedef struct _GstSelectorPadClass GstSelectorPadClass;
146
147 struct _GstSelectorPad
148 {
149   GstPad parent;
150
151   gboolean active;              /* when buffer have passed the pad */
152   gboolean pushed;              /* when buffer was pushed downstream since activation */
153   gboolean eos;                 /* when EOS has been received */
154   gboolean eos_sent;            /* when EOS was sent downstream */
155   gboolean discont;             /* after switching we create a discont */
156   gboolean flushing;            /* set after flush-start and before flush-stop */
157   gboolean always_ok;
158   GstSegment segment;           /* the current segment on the pad */
159   GstTagList *tags;             /* last tags received on the pad */
160
161   gboolean segment_pending;
162 };
163
164 struct _GstSelectorPadClass
165 {
166   GstPadClass parent;
167 };
168
169 GType gst_selector_pad_get_type (void);
170 static void gst_selector_pad_finalize (GObject * object);
171 static void gst_selector_pad_get_property (GObject * object,
172     guint prop_id, GValue * value, GParamSpec * pspec);
173 static void gst_selector_pad_set_property (GObject * object,
174     guint prop_id, const GValue * value, GParamSpec * pspec);
175
176 static gint64 gst_selector_pad_get_running_time (GstSelectorPad * pad);
177 static void gst_selector_pad_reset (GstSelectorPad * pad);
178 static gboolean gst_selector_pad_event (GstPad * pad, GstEvent * event);
179 static GstCaps *gst_selector_pad_getcaps (GstPad * pad, GstCaps * filter);
180 static gboolean gst_selector_pad_acceptcaps (GstPad * pad, GstCaps * caps);
181 static GstIterator *gst_selector_pad_iterate_linked_pads (GstPad * pad);
182 static GstFlowReturn gst_selector_pad_chain (GstPad * pad, GstBuffer * buf);
183
184 G_DEFINE_TYPE (GstSelectorPad, gst_selector_pad, GST_TYPE_PAD);
185
186 static void
187 gst_selector_pad_class_init (GstSelectorPadClass * klass)
188 {
189   GObjectClass *gobject_class;
190
191   gobject_class = (GObjectClass *) klass;
192
193   gobject_class->finalize = gst_selector_pad_finalize;
194
195   gobject_class->get_property = gst_selector_pad_get_property;
196   gobject_class->set_property = gst_selector_pad_set_property;
197
198   g_object_class_install_property (gobject_class, PROP_PAD_RUNNING_TIME,
199       g_param_spec_int64 ("running-time", "Running time",
200           "Running time of stream on pad", 0, G_MAXINT64, 0,
201           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
202   g_object_class_install_property (gobject_class, PROP_PAD_TAGS,
203       g_param_spec_boxed ("tags", "Tags",
204           "The currently active tags on the pad", GST_TYPE_TAG_LIST,
205           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
206   g_object_class_install_property (gobject_class, PROP_PAD_ACTIVE,
207       g_param_spec_boolean ("active", "Active",
208           "If the pad is currently active", FALSE,
209           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
210   /* FIXME: better property name? */
211   g_object_class_install_property (gobject_class, PROP_PAD_ALWAYS_OK,
212       g_param_spec_boolean ("always-ok", "Always OK",
213           "Make an inactive pad return OK instead of NOT_LINKED",
214           DEFAULT_PAD_ALWAYS_OK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
215 }
216
217 static void
218 gst_selector_pad_init (GstSelectorPad * pad)
219 {
220   pad->always_ok = DEFAULT_PAD_ALWAYS_OK;
221   gst_selector_pad_reset (pad);
222 }
223
224 static void
225 gst_selector_pad_finalize (GObject * object)
226 {
227   GstSelectorPad *pad;
228
229   pad = GST_SELECTOR_PAD_CAST (object);
230
231   if (pad->tags)
232     gst_tag_list_free (pad->tags);
233
234   G_OBJECT_CLASS (gst_selector_pad_parent_class)->finalize (object);
235 }
236
237 static void
238 gst_selector_pad_set_property (GObject * object, guint prop_id,
239     const GValue * value, GParamSpec * pspec)
240 {
241   GstSelectorPad *spad = GST_SELECTOR_PAD_CAST (object);
242
243   switch (prop_id) {
244     case PROP_PAD_ALWAYS_OK:
245       GST_OBJECT_LOCK (object);
246       spad->always_ok = g_value_get_boolean (value);
247       GST_OBJECT_UNLOCK (object);
248       break;
249     default:
250       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
251       break;
252   }
253 }
254
255 static void
256 gst_selector_pad_get_property (GObject * object, guint prop_id,
257     GValue * value, GParamSpec * pspec)
258 {
259   GstSelectorPad *spad = GST_SELECTOR_PAD_CAST (object);
260
261   switch (prop_id) {
262     case PROP_PAD_RUNNING_TIME:
263       g_value_set_int64 (value, gst_selector_pad_get_running_time (spad));
264       break;
265     case PROP_PAD_TAGS:
266       GST_OBJECT_LOCK (object);
267       g_value_set_boxed (value, spad->tags);
268       GST_OBJECT_UNLOCK (object);
269       break;
270     case PROP_PAD_ACTIVE:
271     {
272       GstInputSelector *sel;
273
274       sel = GST_INPUT_SELECTOR (gst_pad_get_parent (spad));
275       g_value_set_boolean (value, gst_input_selector_is_active_sinkpad (sel,
276               GST_PAD_CAST (spad)));
277       gst_object_unref (sel);
278       break;
279     }
280     case PROP_PAD_ALWAYS_OK:
281       GST_OBJECT_LOCK (object);
282       g_value_set_boolean (value, spad->always_ok);
283       GST_OBJECT_UNLOCK (object);
284       break;
285     default:
286       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
287       break;
288   }
289 }
290
291 static gint64
292 gst_selector_pad_get_running_time (GstSelectorPad * pad)
293 {
294   gint64 ret = 0;
295
296   GST_OBJECT_LOCK (pad);
297   if (pad->active) {
298     gint64 position = pad->segment.position;
299
300     if (position >= 0)
301       ret = gst_segment_to_running_time (&pad->segment, GST_FORMAT_TIME,
302           position);
303   }
304   GST_OBJECT_UNLOCK (pad);
305
306   GST_DEBUG_OBJECT (pad, "running time: %" GST_TIME_FORMAT,
307       GST_TIME_ARGS (ret));
308
309   return ret;
310 }
311
312 static void
313 gst_selector_pad_reset (GstSelectorPad * pad)
314 {
315   GST_OBJECT_LOCK (pad);
316   pad->active = FALSE;
317   pad->pushed = FALSE;
318   pad->eos = FALSE;
319   pad->eos_sent = FALSE;
320   pad->segment_pending = FALSE;
321   pad->discont = FALSE;
322   pad->flushing = FALSE;
323   gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
324   GST_OBJECT_UNLOCK (pad);
325 }
326
327 /* strictly get the linked pad from the sinkpad. If the pad is active we return
328  * the srcpad else we return NULL */
329 static GstIterator *
330 gst_selector_pad_iterate_linked_pads (GstPad * pad)
331 {
332   GstInputSelector *sel;
333   GstPad *otherpad;
334   GstIterator *it;
335   GValue val = { 0, };
336
337   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
338   if (G_UNLIKELY (sel == NULL))
339     return NULL;
340
341   otherpad = gst_input_selector_get_linked_pad (sel, pad, TRUE);
342   g_value_init (&val, GST_TYPE_PAD);
343   g_value_set_object (&val, otherpad);
344   it = gst_iterator_new_single (GST_TYPE_PAD, &val);
345   g_value_unset (&val);
346
347   if (otherpad)
348     gst_object_unref (otherpad);
349   gst_object_unref (sel);
350
351   return it;
352 }
353
354 static gboolean
355 gst_selector_pad_event (GstPad * pad, GstEvent * event)
356 {
357   gboolean res = TRUE;
358   gboolean forward;
359   GstInputSelector *sel;
360   GstSelectorPad *selpad;
361   GstPad *prev_active_sinkpad;
362   GstPad *active_sinkpad;
363
364   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
365   if (G_UNLIKELY (sel == NULL)) {
366     gst_event_unref (event);
367     return FALSE;
368   }
369   selpad = GST_SELECTOR_PAD_CAST (pad);
370
371   GST_INPUT_SELECTOR_LOCK (sel);
372   prev_active_sinkpad = sel->active_sinkpad;
373   active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
374
375   /* only forward if we are dealing with the active sinkpad */
376   forward = (pad == active_sinkpad);
377   GST_INPUT_SELECTOR_UNLOCK (sel);
378
379   if (prev_active_sinkpad != active_sinkpad && pad == active_sinkpad) {
380     NOTIFY_MUTEX_LOCK ();
381     g_object_notify (G_OBJECT (sel), "active-pad");
382     NOTIFY_MUTEX_UNLOCK ();
383   }
384
385   switch (GST_EVENT_TYPE (event)) {
386     case GST_EVENT_FLUSH_START:
387       /* Unblock the pad if it's waiting */
388       GST_INPUT_SELECTOR_LOCK (sel);
389       selpad->flushing = TRUE;
390       GST_INPUT_SELECTOR_BROADCAST (sel);
391       GST_INPUT_SELECTOR_UNLOCK (sel);
392       break;
393     case GST_EVENT_FLUSH_STOP:
394       GST_INPUT_SELECTOR_LOCK (sel);
395       gst_selector_pad_reset (selpad);
396       sel->pending_close = FALSE;
397       GST_INPUT_SELECTOR_UNLOCK (sel);
398       break;
399     case GST_EVENT_SEGMENT:
400     {
401       GST_INPUT_SELECTOR_LOCK (sel);
402       GST_OBJECT_LOCK (selpad);
403       gst_event_parse_segment (event, &selpad->segment);
404       GST_DEBUG_OBJECT (pad, "configured SEGMENT %" GST_SEGMENT_FORMAT,
405           &selpad->segment);
406       GST_OBJECT_UNLOCK (selpad);
407
408       /* If we aren't forwarding the event because the pad is not the
409        * active_sinkpad, then set the flag on the pad
410        * that says a segment needs sending if/when that pad is activated.
411        * For all other cases, we send the event immediately, which makes
412        * sparse streams and other segment updates work correctly downstream.
413        */
414       if (!forward)
415         selpad->segment_pending = TRUE;
416
417       GST_INPUT_SELECTOR_UNLOCK (sel);
418       break;
419     }
420     case GST_EVENT_TAG:
421     {
422       GstTagList *tags, *oldtags, *newtags;
423
424       gst_event_parse_tag (event, &tags);
425
426       GST_OBJECT_LOCK (selpad);
427       oldtags = selpad->tags;
428
429       newtags = gst_tag_list_merge (oldtags, tags, GST_TAG_MERGE_REPLACE);
430       selpad->tags = newtags;
431       if (oldtags)
432         gst_tag_list_free (oldtags);
433       GST_DEBUG_OBJECT (pad, "received tags %" GST_PTR_FORMAT, newtags);
434       GST_OBJECT_UNLOCK (selpad);
435
436       g_object_notify (G_OBJECT (selpad), "tags");
437       break;
438     }
439     case GST_EVENT_EOS:
440       selpad->eos = TRUE;
441
442       if (forward) {
443         selpad->eos_sent = TRUE;
444       } else {
445         GstSelectorPad *tmp;
446
447         /* If the active sinkpad is in EOS state but EOS
448          * was not sent downstream this means that the pad
449          * got EOS before it was set as active pad and that
450          * the previously active pad got EOS after it was
451          * active
452          */
453         GST_INPUT_SELECTOR_LOCK (sel);
454         active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
455         tmp = GST_SELECTOR_PAD (active_sinkpad);
456         forward = (tmp->eos && !tmp->eos_sent);
457         tmp->eos_sent = TRUE;
458         GST_INPUT_SELECTOR_UNLOCK (sel);
459       }
460       GST_DEBUG_OBJECT (pad, "received EOS");
461       break;
462     default:
463       break;
464   }
465   if (forward) {
466     GST_DEBUG_OBJECT (pad, "forwarding event");
467     res = gst_pad_push_event (sel->srcpad, event);
468   } else
469     gst_event_unref (event);
470
471   gst_object_unref (sel);
472
473   return res;
474 }
475
476 static GstCaps *
477 gst_selector_pad_getcaps (GstPad * pad, GstCaps * filter)
478 {
479   GstInputSelector *sel;
480   GstCaps *caps;
481
482   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
483   if (G_UNLIKELY (sel == NULL))
484     return (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
485
486   GST_DEBUG_OBJECT (sel, "Getting caps of srcpad peer");
487   caps = gst_pad_peer_get_caps (sel->srcpad, filter);
488   if (caps == NULL)
489     caps = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
490
491   gst_object_unref (sel);
492
493   return caps;
494 }
495
496 static gboolean
497 gst_selector_pad_acceptcaps (GstPad * pad, GstCaps * caps)
498 {
499   GstInputSelector *sel;
500   gboolean res;
501
502   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
503   if (G_UNLIKELY (sel == NULL))
504     return FALSE;
505
506   GST_DEBUG_OBJECT (sel, "Checking acceptcaps of srcpad peer");
507   res = gst_pad_peer_accept_caps (sel->srcpad, caps);
508   gst_object_unref (sel);
509
510   return res;
511 }
512
513 /* must be called with the SELECTOR_LOCK, will block while the pad is blocked 
514  * or return TRUE when flushing */
515 static gboolean
516 gst_input_selector_wait (GstInputSelector * self, GstSelectorPad * pad)
517 {
518   while (self->blocked && !self->flushing && !pad->flushing) {
519     /* we can be unlocked here when we are shutting down (flushing) or when we
520      * get unblocked */
521     GST_INPUT_SELECTOR_WAIT (self);
522   }
523   return self->flushing;
524 }
525
526 /* must be called with the SELECTOR_LOCK, will block until the running time
527  * of the active pad is after this pad or return TRUE when flushing */
528 static gboolean
529 gst_input_selector_wait_running_time (GstInputSelector * sel,
530     GstSelectorPad * pad, GstBuffer * buf)
531 {
532   GstPad *active_sinkpad;
533   GstSelectorPad *active_selpad;
534   GstSegment *seg, *active_seg;
535   GstClockTime running_time, active_running_time = -1;
536
537   seg = &pad->segment;
538
539   active_sinkpad =
540       gst_input_selector_activate_sinkpad (sel, GST_PAD_CAST (pad));
541   active_selpad = GST_SELECTOR_PAD_CAST (active_sinkpad);
542   active_seg = &active_selpad->segment;
543
544   /* We can only sync if the segments are in time format or
545    * if the active pad had no newsegment event yet */
546   if (seg->format != GST_FORMAT_TIME ||
547       (active_seg->format != GST_FORMAT_TIME
548           && active_seg->format != GST_FORMAT_UNDEFINED))
549     return FALSE;
550
551   /* If we have no valid timestamp we can't sync this buffer */
552   if (!GST_BUFFER_TIMESTAMP_IS_VALID (buf))
553     return FALSE;
554
555   running_time = GST_BUFFER_TIMESTAMP (buf);
556   /* If possible try to get the running time at the end of the buffer */
557   if (GST_BUFFER_DURATION_IS_VALID (buf))
558     running_time += GST_BUFFER_DURATION (buf);
559   if (running_time > seg->stop)
560     running_time = seg->stop;
561   running_time =
562       gst_segment_to_running_time (seg, GST_FORMAT_TIME, running_time);
563   /* If this is outside the segment don't sync */
564   if (running_time == -1)
565     return FALSE;
566
567   /* Get active pad's running time, if no configured segment yet keep at -1 */
568   if (active_seg->format == GST_FORMAT_TIME)
569     active_running_time =
570         gst_segment_to_running_time (active_seg, GST_FORMAT_TIME,
571         active_seg->position);
572
573   /* Wait until
574    *   a) this is the active pad
575    *   b) the pad or the selector is flushing
576    *   c) the selector is not blocked
577    *   d) the active pad has no running time or the active
578    *      pad's running time is before this running time
579    *   e) the active pad has a non-time segment
580    */
581   while (pad != active_selpad && !sel->flushing && !pad->flushing &&
582       (sel->blocked || active_running_time == -1
583           || running_time >= active_running_time)) {
584     if (!sel->blocked)
585       GST_DEBUG_OBJECT (pad,
586           "Waiting for active streams to advance. %" GST_TIME_FORMAT " >= %"
587           GST_TIME_FORMAT, GST_TIME_ARGS (running_time),
588           GST_TIME_ARGS (active_running_time));
589
590     GST_INPUT_SELECTOR_WAIT (sel);
591
592     /* Get new active pad, it might have changed */
593     active_sinkpad =
594         gst_input_selector_activate_sinkpad (sel, GST_PAD_CAST (pad));
595     active_selpad = GST_SELECTOR_PAD_CAST (active_sinkpad);
596     active_seg = &active_selpad->segment;
597
598     /* If the active segment is configured but not to time format
599      * we can't do any syncing at all */
600     if (active_seg->format != GST_FORMAT_TIME
601         && active_seg->format != GST_FORMAT_UNDEFINED)
602       break;
603
604     /* Get the new active pad running time */
605     if (active_seg->format == GST_FORMAT_TIME)
606       active_running_time =
607           gst_segment_to_running_time (active_seg, GST_FORMAT_TIME,
608           active_seg->position);
609     else
610       active_running_time = -1;
611
612     if (!sel->blocked)
613       GST_DEBUG_OBJECT (pad,
614           "Waited for active streams to advance. %" GST_TIME_FORMAT " >= %"
615           GST_TIME_FORMAT, GST_TIME_ARGS (running_time),
616           GST_TIME_ARGS (active_running_time));
617
618   }
619
620   /* Return TRUE if the selector or the pad is flushing */
621   return (sel->flushing || pad->flushing);
622 }
623
624
625 static GstFlowReturn
626 gst_selector_pad_chain (GstPad * pad, GstBuffer * buf)
627 {
628   GstInputSelector *sel;
629   GstFlowReturn res;
630   GstPad *active_sinkpad;
631   GstPad *prev_active_sinkpad;
632   GstSelectorPad *selpad;
633   GstClockTime start_time;
634   GstSegment *seg;
635   GstEvent *start_event = NULL;
636 #if 0
637   GstCaps *caps;
638 #endif
639
640   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
641   selpad = GST_SELECTOR_PAD_CAST (pad);
642   seg = &selpad->segment;
643
644   GST_INPUT_SELECTOR_LOCK (sel);
645   /* wait or check for flushing */
646   if (gst_input_selector_wait (sel, selpad))
647     goto flushing;
648
649   GST_LOG_OBJECT (pad, "getting active pad");
650
651   prev_active_sinkpad = sel->active_sinkpad;
652   active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
653
654   /* In sync mode wait until the active pad has advanced
655    * after the running time of the current buffer */
656   if (sel->sync_streams && active_sinkpad != pad) {
657     if (gst_input_selector_wait_running_time (sel, selpad, buf))
658       goto flushing;
659   }
660
661   /* Might have changed while waiting */
662   active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
663
664   /* update the segment on the srcpad */
665   start_time = GST_BUFFER_TIMESTAMP (buf);
666   if (GST_CLOCK_TIME_IS_VALID (start_time)) {
667     GST_LOG_OBJECT (pad, "received start time %" GST_TIME_FORMAT,
668         GST_TIME_ARGS (start_time));
669     if (GST_BUFFER_DURATION_IS_VALID (buf))
670       GST_LOG_OBJECT (pad, "received end time %" GST_TIME_FORMAT,
671           GST_TIME_ARGS (start_time + GST_BUFFER_DURATION (buf)));
672
673     GST_OBJECT_LOCK (pad);
674     seg->position = start_time;
675     GST_OBJECT_UNLOCK (pad);
676   }
677
678   /* Ignore buffers from pads except the selected one */
679   if (pad != active_sinkpad)
680     goto ignore;
681
682   /* Tell all non-active pads that we advanced the running time */
683   if (sel->sync_streams)
684     GST_INPUT_SELECTOR_BROADCAST (sel);
685
686   /* if we have a pending segment, push it out now */
687   if (G_UNLIKELY (selpad->segment_pending)) {
688     GST_DEBUG_OBJECT (pad,
689         "pushing pending NEWSEGMENT update %d, rate %lf, applied rate %lf, "
690         "format %d, "
691         "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
692         G_GINT64_FORMAT, FALSE, seg->rate, seg->applied_rate, seg->format,
693         seg->start, seg->stop, seg->time);
694
695     start_event = gst_event_new_segment (seg);
696
697     selpad->segment_pending = FALSE;
698   }
699   GST_INPUT_SELECTOR_UNLOCK (sel);
700
701   if (prev_active_sinkpad != active_sinkpad && pad == active_sinkpad) {
702     NOTIFY_MUTEX_LOCK ();
703     g_object_notify (G_OBJECT (sel), "active-pad");
704     NOTIFY_MUTEX_UNLOCK ();
705   }
706
707   if (start_event)
708     gst_pad_push_event (sel->srcpad, start_event);
709
710   if (selpad->discont) {
711     buf = gst_buffer_make_writable (buf);
712
713     GST_DEBUG_OBJECT (pad, "Marking discont buffer %p", buf);
714     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
715     selpad->discont = FALSE;
716   }
717
718   /* forward */
719   GST_LOG_OBJECT (pad, "Forwarding buffer %p", buf);
720
721 #if 0
722   if ((caps = GST_BUFFER_CAPS (buf))) {
723     if (GST_PAD_CAPS (sel->srcpad) != caps)
724       gst_pad_set_caps (sel->srcpad, caps);
725   }
726 #endif
727
728   res = gst_pad_push (sel->srcpad, buf);
729   selpad->pushed = TRUE;
730
731 done:
732   gst_object_unref (sel);
733   return res;
734
735   /* dropped buffers */
736 ignore:
737   {
738     gboolean active_pad_pushed = GST_SELECTOR_PAD_CAST (active_sinkpad)->pushed;
739
740     GST_DEBUG_OBJECT (pad, "Pad not active, discard buffer %p", buf);
741     /* when we drop a buffer, we're creating a discont on this pad */
742     selpad->discont = TRUE;
743     GST_INPUT_SELECTOR_UNLOCK (sel);
744     gst_buffer_unref (buf);
745
746     /* figure out what to return upstream */
747     GST_OBJECT_LOCK (selpad);
748     if (selpad->always_ok || !active_pad_pushed)
749       res = GST_FLOW_OK;
750     else
751       res = GST_FLOW_NOT_LINKED;
752     GST_OBJECT_UNLOCK (selpad);
753
754     goto done;
755   }
756 flushing:
757   {
758     GST_DEBUG_OBJECT (pad, "We are flushing, discard buffer %p", buf);
759     GST_INPUT_SELECTOR_UNLOCK (sel);
760     gst_buffer_unref (buf);
761     res = GST_FLOW_WRONG_STATE;
762     goto done;
763   }
764 }
765
766 static void gst_input_selector_dispose (GObject * object);
767
768 static void gst_input_selector_set_property (GObject * object,
769     guint prop_id, const GValue * value, GParamSpec * pspec);
770 static void gst_input_selector_get_property (GObject * object,
771     guint prop_id, GValue * value, GParamSpec * pspec);
772
773 static GstPad *gst_input_selector_request_new_pad (GstElement * element,
774     GstPadTemplate * templ, const gchar * unused, const GstCaps * caps);
775 static void gst_input_selector_release_pad (GstElement * element, GstPad * pad);
776
777 static GstStateChangeReturn gst_input_selector_change_state (GstElement *
778     element, GstStateChange transition);
779
780 static GstCaps *gst_input_selector_getcaps (GstPad * pad, GstCaps * filter);
781 static gboolean gst_input_selector_event (GstPad * pad, GstEvent * event);
782 static gboolean gst_input_selector_query (GstPad * pad, GstQuery ** query);
783 static gint64 gst_input_selector_block (GstInputSelector * self);
784 static void gst_input_selector_switch (GstInputSelector * self,
785     GstPad * pad, gint64 stop_time, gint64 start_time);
786
787 /* FIXME: create these marshallers using glib-genmarshal */
788 #define g_marshal_value_peek_object(v)   (v)->data[0].v_pointer
789 #define g_marshal_value_peek_int64(v)    (v)->data[0].v_int64
790
791 static void
792 gst_input_selector_marshal_INT64__VOID (GClosure * closure,
793     GValue * return_value G_GNUC_UNUSED,
794     guint n_param_values,
795     const GValue * param_values,
796     gpointer invocation_hint G_GNUC_UNUSED, gpointer marshal_data)
797 {
798   typedef gint64 (*GMarshalFunc_INT64__VOID) (gpointer data1, gpointer data2);
799   register GMarshalFunc_INT64__VOID callback;
800   register GCClosure *cc = (GCClosure *) closure;
801   register gpointer data1, data2;
802   gint64 v_return;
803
804   g_return_if_fail (return_value != NULL);
805   g_return_if_fail (n_param_values == 1);
806
807   if (G_CCLOSURE_SWAP_DATA (closure)) {
808     data1 = closure->data;
809     data2 = g_value_peek_pointer (param_values + 0);
810   } else {
811     data1 = g_value_peek_pointer (param_values + 0);
812     data2 = closure->data;
813   }
814   callback =
815       (GMarshalFunc_INT64__VOID) (marshal_data ? marshal_data : cc->callback);
816
817   v_return = callback (data1, data2);
818
819   g_value_set_int64 (return_value, v_return);
820 }
821
822 static void
823 gst_input_selector_marshal_VOID__OBJECT_INT64_INT64 (GClosure * closure,
824     GValue * return_value G_GNUC_UNUSED,
825     guint n_param_values,
826     const GValue * param_values,
827     gpointer invocation_hint G_GNUC_UNUSED, gpointer marshal_data)
828 {
829   typedef void (*GMarshalFunc_VOID__OBJECT_INT64_INT64) (gpointer data1,
830       gpointer arg_1, gint64 arg_2, gint64 arg_3, gpointer data2);
831   register GMarshalFunc_VOID__OBJECT_INT64_INT64 callback;
832   register GCClosure *cc = (GCClosure *) closure;
833   register gpointer data1, data2;
834
835   g_return_if_fail (n_param_values == 4);
836
837   if (G_CCLOSURE_SWAP_DATA (closure)) {
838     data1 = closure->data;
839     data2 = g_value_peek_pointer (param_values + 0);
840   } else {
841     data1 = g_value_peek_pointer (param_values + 0);
842     data2 = closure->data;
843   }
844   callback =
845       (GMarshalFunc_VOID__OBJECT_INT64_INT64) (marshal_data ? marshal_data :
846       cc->callback);
847
848   callback (data1,
849       g_marshal_value_peek_object (param_values + 1),
850       g_marshal_value_peek_int64 (param_values + 2),
851       g_marshal_value_peek_int64 (param_values + 3), data2);
852 }
853
854 #define _do_init \
855     GST_DEBUG_CATEGORY_INIT (input_selector_debug, \
856         "input-selector", 0, "An input stream selector element");
857 #define gst_input_selector_parent_class parent_class
858 G_DEFINE_TYPE_WITH_CODE (GstInputSelector, gst_input_selector, GST_TYPE_ELEMENT,
859     _do_init);
860
861 static void
862 gst_input_selector_class_init (GstInputSelectorClass * klass)
863 {
864   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
865   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
866
867   gobject_class->dispose = gst_input_selector_dispose;
868
869   gobject_class->set_property = gst_input_selector_set_property;
870   gobject_class->get_property = gst_input_selector_get_property;
871
872   g_object_class_install_property (gobject_class, PROP_N_PADS,
873       g_param_spec_uint ("n-pads", "Number of Pads",
874           "The number of sink pads", 0, G_MAXUINT, 0,
875           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
876
877   g_object_class_install_property (gobject_class, PROP_ACTIVE_PAD,
878       g_param_spec_object ("active-pad", "Active pad",
879           "The currently active sink pad", GST_TYPE_PAD,
880           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
881
882   /**
883    * GstInputSelector:sync-streams
884    *
885    * If set to %TRUE all inactive streams will be synced to the
886    * running time of the active stream. This makes sure that no
887    * buffers are dropped by input-selector that might be needed
888    * when switching the active pad.
889    *
890    * Since: 0.10.35
891    */
892   g_object_class_install_property (gobject_class, PROP_SYNC_STREAMS,
893       g_param_spec_boolean ("sync-streams", "Sync Streams",
894           "Synchronize inactive streams to the running time of the active stream",
895           DEFAULT_SYNC_STREAMS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
896
897   /**
898    * GstInputSelector::block:
899    * @inputselector: the #GstInputSelector
900    *
901    * Block all sink pads in preparation for a switch. Returns the stop time of
902    * the current switch segment, as a running time, or 0 if there is no current
903    * active pad or the current active pad never received data.
904    */
905   gst_input_selector_signals[SIGNAL_BLOCK] =
906       g_signal_new ("block", G_TYPE_FROM_CLASS (klass),
907       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
908       G_STRUCT_OFFSET (GstInputSelectorClass, block), NULL, NULL,
909       gst_input_selector_marshal_INT64__VOID, G_TYPE_INT64, 0);
910   /**
911    * GstInputSelector::switch:
912    * @inputselector: the #GstInputSelector
913    * @pad:            the pad to switch to
914    * @stop_time:      running time at which to close the previous segment, or -1
915    *                  to use the running time of the previously active sink pad
916    * @start_time:     running time at which to start the new segment, or -1 to
917    *                  use the running time of the newly active sink pad
918    *
919    * Switch to a new feed. The segment opened by the previously active pad, if
920    * any, will be closed, and a new segment opened before data flows again.
921    *
922    * This signal must be emitted when the element has been blocked via the <link
923    * linkend="GstInputSelector-block">block</link> signal.
924    *
925    * If you have a stream with only one switch element, such as an audio-only
926    * stream, a stream switch should be performed by first emitting the block
927    * signal, and then emitting the switch signal with -1 for the stop and start
928    * time values.
929    *
930    * The intention of the @stop_time and @start_time arguments is to allow
931    * multiple switch elements to switch and maintain stream synchronization.
932    * When switching a stream with multiple feeds, you will need as many switch
933    * elements as you have feeds. For example, a feed with audio and video will
934    * have one switch element between the audio feeds and one for video.
935    *
936    * A switch over multiple switch elements should be performed as follows:
937    * First, emit the <link linkend="GstInputSelector-block">block</link>
938    * signal, collecting the returned values. The maximum running time returned
939    * by block should then be used as the time at which to close the previous
940    * segment.
941    *
942    * Then, query the running times of the new audio and video pads that you will
943    * switch to. Naturally, these pads are on separate switch elements. Take the
944    * minimum running time for those streams and use it for the time at which to
945    * open the new segment.
946    *
947    * If @pad is the same as the current active pad, the element will cancel any
948    * previous block without adjusting segments.
949    *
950    * <note><simpara>
951    * the signal changed from accepting the pad name to the pad object.
952    * </simpara></note>
953    *
954    * Since: 0.10.7
955    */
956   gst_input_selector_signals[SIGNAL_SWITCH] =
957       g_signal_new ("switch", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
958       G_STRUCT_OFFSET (GstInputSelectorClass, switch_),
959       NULL, NULL, gst_input_selector_marshal_VOID__OBJECT_INT64_INT64,
960       G_TYPE_NONE, 3, GST_TYPE_PAD, G_TYPE_INT64, G_TYPE_INT64);
961
962   gst_element_class_set_details_simple (gstelement_class, "Input selector",
963       "Generic", "N-to-1 input stream selector",
964       "Julien Moutte <julien@moutte.net>, "
965       "Jan Schmidt <thaytan@mad.scientist.com>, "
966       "Wim Taymans <wim.taymans@gmail.com>");
967   gst_element_class_add_pad_template (gstelement_class,
968       gst_static_pad_template_get (&gst_input_selector_sink_factory));
969   gst_element_class_add_pad_template (gstelement_class,
970       gst_static_pad_template_get (&gst_input_selector_src_factory));
971
972   gstelement_class->request_new_pad = gst_input_selector_request_new_pad;
973   gstelement_class->release_pad = gst_input_selector_release_pad;
974   gstelement_class->change_state = gst_input_selector_change_state;
975
976   klass->block = GST_DEBUG_FUNCPTR (gst_input_selector_block);
977   /* note the underscore because switch is a keyword otherwise */
978   klass->switch_ = GST_DEBUG_FUNCPTR (gst_input_selector_switch);
979 }
980
981 static void
982 gst_input_selector_init (GstInputSelector * sel)
983 {
984   sel->srcpad = gst_pad_new ("src", GST_PAD_SRC);
985   gst_pad_set_iterate_internal_links_function (sel->srcpad,
986       GST_DEBUG_FUNCPTR (gst_selector_pad_iterate_linked_pads));
987   gst_pad_set_getcaps_function (sel->srcpad,
988       GST_DEBUG_FUNCPTR (gst_input_selector_getcaps));
989   gst_pad_set_query_function (sel->srcpad,
990       GST_DEBUG_FUNCPTR (gst_input_selector_query));
991   gst_pad_set_event_function (sel->srcpad,
992       GST_DEBUG_FUNCPTR (gst_input_selector_event));
993   gst_element_add_pad (GST_ELEMENT (sel), sel->srcpad);
994   /* sinkpad management */
995   sel->active_sinkpad = NULL;
996   sel->padcount = 0;
997   gst_segment_init (&sel->segment, GST_FORMAT_UNDEFINED);
998   sel->sync_streams = DEFAULT_SYNC_STREAMS;
999
1000   sel->lock = g_mutex_new ();
1001   sel->cond = g_cond_new ();
1002   sel->blocked = FALSE;
1003 }
1004
1005 static void
1006 gst_input_selector_dispose (GObject * object)
1007 {
1008   GstInputSelector *sel = GST_INPUT_SELECTOR (object);
1009
1010   if (sel->active_sinkpad) {
1011     gst_object_unref (sel->active_sinkpad);
1012     sel->active_sinkpad = NULL;
1013   }
1014   if (sel->lock) {
1015     g_mutex_free (sel->lock);
1016     sel->lock = NULL;
1017   }
1018   if (sel->cond) {
1019     g_cond_free (sel->cond);
1020     sel->cond = NULL;
1021   }
1022
1023   G_OBJECT_CLASS (parent_class)->dispose (object);
1024 }
1025
1026 /* Solve the following equation for B.timestamp, and set that as the segment
1027  * stop:
1028  * B.running_time = (B.timestamp - NS.start) / NS.abs_rate + NS.accum
1029  */
1030 static gint64
1031 gst_segment_get_timestamp (GstSegment * segment, gint64 running_time)
1032 {
1033   if (running_time <= segment->base)
1034     return segment->start;
1035   else
1036     return (running_time - segment->base) * ABS (segment->rate) +
1037         segment->start;
1038 }
1039
1040 static void
1041 gst_segment_set_stop (GstSegment * segment, gint64 running_time)
1042 {
1043   segment->stop = gst_segment_get_timestamp (segment, running_time);
1044   segment->position = -1;
1045 }
1046
1047 static void
1048 gst_segment_set_start (GstSegment * segment, gint64 running_time)
1049 {
1050   gint64 new_start, duration;
1051
1052   new_start = gst_segment_get_timestamp (segment, running_time);
1053
1054   /* this is the duration we skipped */
1055   duration = new_start - segment->start;
1056   /* add the duration to the accumulated segment time */
1057   segment->base += duration;
1058   /* move position in the segment */
1059   segment->time += duration;
1060   segment->start += duration;
1061 }
1062
1063 /* this function must be called with the SELECTOR_LOCK. It returns TRUE when the
1064  * active pad changed. */
1065 static gboolean
1066 gst_input_selector_set_active_pad (GstInputSelector * self,
1067     GstPad * pad, gint64 stop_time, gint64 start_time)
1068 {
1069   GstSelectorPad *old, *new;
1070   GstPad **active_pad_p;
1071
1072   if (pad == self->active_sinkpad)
1073     return FALSE;
1074
1075   old = GST_SELECTOR_PAD_CAST (self->active_sinkpad);
1076   new = GST_SELECTOR_PAD_CAST (pad);
1077
1078   GST_DEBUG_OBJECT (self, "setting active pad to %s:%s",
1079       GST_DEBUG_PAD_NAME (new));
1080
1081   if (!GST_CLOCK_TIME_IS_VALID (stop_time) && old) {
1082     /* no stop time given, get the latest running_time on the active pad to 
1083      * close and open the new segment */
1084     stop_time = start_time = gst_selector_pad_get_running_time (old);
1085     GST_DEBUG_OBJECT (self, "using start/stop of %" GST_TIME_FORMAT,
1086         GST_TIME_ARGS (start_time));
1087   }
1088
1089   if (old && old->active && !self->pending_close && stop_time >= 0) {
1090     /* schedule a last_stop update if one isn't already scheduled, and a
1091        segment has been pushed before. */
1092     memcpy (&self->segment, &old->segment, sizeof (self->segment));
1093
1094     GST_DEBUG_OBJECT (self, "setting stop_time to %" GST_TIME_FORMAT,
1095         GST_TIME_ARGS (stop_time));
1096     gst_segment_set_stop (&self->segment, stop_time);
1097     self->pending_close = TRUE;
1098   }
1099   if (old)
1100     old->pushed = FALSE;
1101
1102   if (new && new->active && start_time >= 0) {
1103     GST_DEBUG_OBJECT (self, "setting start_time to %" GST_TIME_FORMAT,
1104         GST_TIME_ARGS (start_time));
1105     /* schedule a new segment push */
1106     gst_segment_set_start (&new->segment, start_time);
1107     new->segment_pending = TRUE;
1108   }
1109   if (new)
1110     new->pushed = FALSE;
1111
1112   active_pad_p = &self->active_sinkpad;
1113   gst_object_replace ((GstObject **) active_pad_p, GST_OBJECT_CAST (pad));
1114
1115   /* Wake up all non-active pads in sync mode, they might be
1116    * the active pad now */
1117   if (self->sync_streams)
1118     GST_INPUT_SELECTOR_BROADCAST (self);
1119
1120   GST_DEBUG_OBJECT (self, "New active pad is %" GST_PTR_FORMAT,
1121       self->active_sinkpad);
1122
1123   return TRUE;
1124 }
1125
1126 static void
1127 gst_input_selector_set_property (GObject * object, guint prop_id,
1128     const GValue * value, GParamSpec * pspec)
1129 {
1130   GstInputSelector *sel = GST_INPUT_SELECTOR (object);
1131
1132   switch (prop_id) {
1133     case PROP_ACTIVE_PAD:
1134     {
1135       GstPad *pad;
1136
1137       pad = g_value_get_object (value);
1138
1139       GST_INPUT_SELECTOR_LOCK (sel);
1140       gst_input_selector_set_active_pad (sel, pad,
1141           GST_CLOCK_TIME_NONE, GST_CLOCK_TIME_NONE);
1142       GST_INPUT_SELECTOR_UNLOCK (sel);
1143       break;
1144     }
1145     case PROP_SYNC_STREAMS:
1146     {
1147       GST_INPUT_SELECTOR_LOCK (sel);
1148       sel->sync_streams = g_value_get_boolean (value);
1149       GST_INPUT_SELECTOR_UNLOCK (sel);
1150       break;
1151     }
1152     default:
1153       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1154       break;
1155   }
1156 }
1157
1158 static void
1159 gst_input_selector_get_property (GObject * object, guint prop_id,
1160     GValue * value, GParamSpec * pspec)
1161 {
1162   GstInputSelector *sel = GST_INPUT_SELECTOR (object);
1163
1164   switch (prop_id) {
1165     case PROP_N_PADS:
1166       GST_INPUT_SELECTOR_LOCK (object);
1167       g_value_set_uint (value, sel->n_pads);
1168       GST_INPUT_SELECTOR_UNLOCK (object);
1169       break;
1170     case PROP_ACTIVE_PAD:
1171       GST_INPUT_SELECTOR_LOCK (object);
1172       g_value_set_object (value, sel->active_sinkpad);
1173       GST_INPUT_SELECTOR_UNLOCK (object);
1174       break;
1175     case PROP_SYNC_STREAMS:
1176       GST_INPUT_SELECTOR_LOCK (object);
1177       g_value_set_boolean (value, sel->sync_streams);
1178       GST_INPUT_SELECTOR_UNLOCK (object);
1179       break;
1180     default:
1181       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1182       break;
1183   }
1184 }
1185
1186 static GstPad *
1187 gst_input_selector_get_linked_pad (GstInputSelector * sel, GstPad * pad,
1188     gboolean strict)
1189 {
1190   GstPad *otherpad = NULL;
1191
1192   GST_INPUT_SELECTOR_LOCK (sel);
1193   if (pad == sel->srcpad)
1194     otherpad = sel->active_sinkpad;
1195   else if (pad == sel->active_sinkpad || !strict)
1196     otherpad = sel->srcpad;
1197   if (otherpad)
1198     gst_object_ref (otherpad);
1199   GST_INPUT_SELECTOR_UNLOCK (sel);
1200
1201   return otherpad;
1202 }
1203
1204 static gboolean
1205 gst_input_selector_event (GstPad * pad, GstEvent * event)
1206 {
1207   GstInputSelector *sel;
1208   gboolean res = FALSE;
1209   GstPad *otherpad;
1210
1211   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
1212   if (G_UNLIKELY (sel == NULL)) {
1213     gst_event_unref (event);
1214     return FALSE;
1215   }
1216
1217   otherpad = gst_input_selector_get_linked_pad (sel, pad, TRUE);
1218   if (otherpad) {
1219     res = gst_pad_push_event (otherpad, event);
1220
1221     gst_object_unref (otherpad);
1222   } else
1223     gst_event_unref (event);
1224
1225   gst_object_unref (sel);
1226   return res;
1227 }
1228
1229 /* query on the srcpad. We override this function because by default it will
1230  * only forward the query to one random sinkpad */
1231 static gboolean
1232 gst_input_selector_query (GstPad * pad, GstQuery ** query)
1233 {
1234   gboolean res = TRUE;
1235   GstInputSelector *sel;
1236   GstPad *otherpad;
1237
1238   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
1239   if (G_UNLIKELY (sel == NULL))
1240     return FALSE;
1241
1242   otherpad = gst_input_selector_get_linked_pad (sel, pad, TRUE);
1243
1244   switch (GST_QUERY_TYPE (*query)) {
1245     case GST_QUERY_LATENCY:
1246     {
1247       GList *walk;
1248       GstClockTime resmin, resmax;
1249       gboolean reslive;
1250
1251       resmin = 0;
1252       resmax = -1;
1253       reslive = FALSE;
1254
1255       /* assume FALSE, we become TRUE if one query succeeds */
1256       res = FALSE;
1257
1258       /* perform the query on all sinkpads and combine the results. We take the
1259        * max of min and the min of max for the result latency. */
1260       GST_INPUT_SELECTOR_LOCK (sel);
1261       for (walk = GST_ELEMENT_CAST (sel)->sinkpads; walk;
1262           walk = g_list_next (walk)) {
1263         GstPad *sinkpad = GST_PAD_CAST (walk->data);
1264
1265         if (gst_pad_peer_query (sinkpad, query)) {
1266           GstClockTime min, max;
1267           gboolean live;
1268
1269           /* one query succeeded, we succeed too */
1270           res = TRUE;
1271
1272           gst_query_parse_latency (*query, &live, &min, &max);
1273
1274           GST_DEBUG_OBJECT (sinkpad,
1275               "peer latency min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT
1276               ", live %d", GST_TIME_ARGS (min), GST_TIME_ARGS (max), live);
1277
1278           if (live) {
1279             if (min > resmin)
1280               resmin = min;
1281             if (resmax == -1)
1282               resmax = max;
1283             else if (max < resmax)
1284               resmax = max;
1285             if (reslive == FALSE)
1286               reslive = live;
1287           }
1288         }
1289       }
1290       GST_INPUT_SELECTOR_UNLOCK (sel);
1291       if (res) {
1292         gst_query_set_latency (*query, reslive, resmin, resmax);
1293
1294         GST_DEBUG_OBJECT (sel,
1295             "total latency min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT
1296             ", live %d", GST_TIME_ARGS (resmin), GST_TIME_ARGS (resmax),
1297             reslive);
1298       }
1299
1300       break;
1301     }
1302     default:
1303       if (otherpad)
1304         res = gst_pad_peer_query (otherpad, query);
1305       break;
1306   }
1307   if (otherpad)
1308     gst_object_unref (otherpad);
1309   gst_object_unref (sel);
1310
1311   return res;
1312 }
1313
1314 static GstCaps *
1315 gst_input_selector_getcaps (GstPad * pad, GstCaps * filter)
1316 {
1317   GstPad *otherpad;
1318   GstInputSelector *sel;
1319   GstCaps *caps;
1320
1321   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
1322   if (G_UNLIKELY (sel == NULL))
1323     return (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
1324
1325   otherpad = gst_input_selector_get_linked_pad (sel, pad, FALSE);
1326
1327   if (!otherpad) {
1328     GST_DEBUG_OBJECT (pad, "Pad not linked, returning ANY");
1329     caps = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
1330   } else {
1331     GST_DEBUG_OBJECT (pad, "Pad is linked (to %s:%s), returning peer caps",
1332         GST_DEBUG_PAD_NAME (otherpad));
1333     /* if the peer has caps, use those. If the pad is not linked, this function
1334      * returns NULL and we return ANY */
1335     if (!(caps = gst_pad_peer_get_caps (otherpad, filter)))
1336       caps = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
1337     gst_object_unref (otherpad);
1338   }
1339
1340   gst_object_unref (sel);
1341   return caps;
1342 }
1343
1344 /* check if the pad is the active sinkpad */
1345 static inline gboolean
1346 gst_input_selector_is_active_sinkpad (GstInputSelector * sel, GstPad * pad)
1347 {
1348   gboolean res;
1349
1350   GST_INPUT_SELECTOR_LOCK (sel);
1351   res = (pad == sel->active_sinkpad);
1352   GST_INPUT_SELECTOR_UNLOCK (sel);
1353
1354   return res;
1355 }
1356
1357 /* Get or create the active sinkpad, must be called with SELECTOR_LOCK */
1358 static GstPad *
1359 gst_input_selector_activate_sinkpad (GstInputSelector * sel, GstPad * pad)
1360 {
1361   GstPad *active_sinkpad;
1362   GstSelectorPad *selpad;
1363
1364   selpad = GST_SELECTOR_PAD_CAST (pad);
1365
1366   selpad->active = TRUE;
1367   active_sinkpad = sel->active_sinkpad;
1368   if (active_sinkpad == NULL) {
1369     /* first pad we get activity on becomes the activated pad by default */
1370     if (sel->active_sinkpad)
1371       gst_object_unref (sel->active_sinkpad);
1372     active_sinkpad = sel->active_sinkpad = gst_object_ref (pad);
1373     GST_DEBUG_OBJECT (sel, "Activating pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1374   }
1375
1376   return active_sinkpad;
1377 }
1378
1379 static GstPad *
1380 gst_input_selector_request_new_pad (GstElement * element,
1381     GstPadTemplate * templ, const gchar * unused, const GstCaps * caps)
1382 {
1383   GstInputSelector *sel;
1384   gchar *name = NULL;
1385   GstPad *sinkpad = NULL;
1386
1387   g_return_val_if_fail (templ->direction == GST_PAD_SINK, NULL);
1388
1389   sel = GST_INPUT_SELECTOR (element);
1390
1391   GST_INPUT_SELECTOR_LOCK (sel);
1392
1393   GST_LOG_OBJECT (sel, "Creating new pad %d", sel->padcount);
1394   name = g_strdup_printf ("sink%d", sel->padcount++);
1395   sinkpad = g_object_new (GST_TYPE_SELECTOR_PAD,
1396       "name", name, "direction", templ->direction, "template", templ, NULL);
1397   g_free (name);
1398
1399   sel->n_pads++;
1400
1401   gst_pad_set_event_function (sinkpad,
1402       GST_DEBUG_FUNCPTR (gst_selector_pad_event));
1403   gst_pad_set_getcaps_function (sinkpad,
1404       GST_DEBUG_FUNCPTR (gst_selector_pad_getcaps));
1405   gst_pad_set_acceptcaps_function (sinkpad,
1406       GST_DEBUG_FUNCPTR (gst_selector_pad_acceptcaps));
1407   gst_pad_set_chain_function (sinkpad,
1408       GST_DEBUG_FUNCPTR (gst_selector_pad_chain));
1409   gst_pad_set_iterate_internal_links_function (sinkpad,
1410       GST_DEBUG_FUNCPTR (gst_selector_pad_iterate_linked_pads));
1411
1412   gst_pad_set_active (sinkpad, TRUE);
1413   gst_element_add_pad (GST_ELEMENT (sel), sinkpad);
1414   GST_INPUT_SELECTOR_UNLOCK (sel);
1415
1416   return sinkpad;
1417 }
1418
1419 static void
1420 gst_input_selector_release_pad (GstElement * element, GstPad * pad)
1421 {
1422   GstInputSelector *sel;
1423
1424   sel = GST_INPUT_SELECTOR (element);
1425   GST_LOG_OBJECT (sel, "Releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1426
1427   GST_INPUT_SELECTOR_LOCK (sel);
1428   /* if the pad was the active pad, makes us select a new one */
1429   if (sel->active_sinkpad == pad) {
1430     GST_DEBUG_OBJECT (sel, "Deactivating pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1431     gst_object_unref (sel->active_sinkpad);
1432     sel->active_sinkpad = NULL;
1433   }
1434   sel->n_pads--;
1435
1436   gst_pad_set_active (pad, FALSE);
1437   gst_element_remove_pad (GST_ELEMENT (sel), pad);
1438   GST_INPUT_SELECTOR_UNLOCK (sel);
1439 }
1440
1441 static void
1442 gst_input_selector_reset (GstInputSelector * sel)
1443 {
1444   GList *walk;
1445
1446   GST_INPUT_SELECTOR_LOCK (sel);
1447   /* clear active pad */
1448   if (sel->active_sinkpad) {
1449     gst_object_unref (sel->active_sinkpad);
1450     sel->active_sinkpad = NULL;
1451   }
1452   /* reset segment */
1453   gst_segment_init (&sel->segment, GST_FORMAT_UNDEFINED);
1454   sel->pending_close = FALSE;
1455   /* reset each of our sinkpads state */
1456   for (walk = GST_ELEMENT_CAST (sel)->sinkpads; walk; walk = g_list_next (walk)) {
1457     GstSelectorPad *selpad = GST_SELECTOR_PAD_CAST (walk->data);
1458
1459     gst_selector_pad_reset (selpad);
1460
1461     if (selpad->tags) {
1462       gst_tag_list_free (selpad->tags);
1463       selpad->tags = NULL;
1464     }
1465   }
1466   GST_INPUT_SELECTOR_UNLOCK (sel);
1467 }
1468
1469 static GstStateChangeReturn
1470 gst_input_selector_change_state (GstElement * element,
1471     GstStateChange transition)
1472 {
1473   GstInputSelector *self = GST_INPUT_SELECTOR (element);
1474   GstStateChangeReturn result;
1475
1476   switch (transition) {
1477     case GST_STATE_CHANGE_READY_TO_PAUSED:
1478       GST_INPUT_SELECTOR_LOCK (self);
1479       self->blocked = FALSE;
1480       self->flushing = FALSE;
1481       GST_INPUT_SELECTOR_UNLOCK (self);
1482       break;
1483     case GST_STATE_CHANGE_PAUSED_TO_READY:
1484       /* first unlock before we call the parent state change function, which
1485        * tries to acquire the stream lock when going to ready. */
1486       GST_INPUT_SELECTOR_LOCK (self);
1487       self->blocked = FALSE;
1488       self->flushing = TRUE;
1489       GST_INPUT_SELECTOR_BROADCAST (self);
1490       GST_INPUT_SELECTOR_UNLOCK (self);
1491       break;
1492     default:
1493       break;
1494   }
1495
1496   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1497
1498   switch (transition) {
1499     case GST_STATE_CHANGE_PAUSED_TO_READY:
1500       gst_input_selector_reset (self);
1501       break;
1502     default:
1503       break;
1504   }
1505
1506   return result;
1507 }
1508
1509 static gint64
1510 gst_input_selector_block (GstInputSelector * self)
1511 {
1512   gint64 ret = 0;
1513   GstSelectorPad *spad;
1514
1515   GST_INPUT_SELECTOR_LOCK (self);
1516
1517   if (self->blocked)
1518     GST_WARNING_OBJECT (self, "switch already blocked");
1519
1520   self->blocked = TRUE;
1521   spad = GST_SELECTOR_PAD_CAST (self->active_sinkpad);
1522
1523   if (spad)
1524     ret = gst_selector_pad_get_running_time (spad);
1525   else
1526     GST_DEBUG_OBJECT (self, "no active pad while blocking");
1527
1528   GST_INPUT_SELECTOR_UNLOCK (self);
1529
1530   return ret;
1531 }
1532
1533 /* stop_time and start_time are running times */
1534 static void
1535 gst_input_selector_switch (GstInputSelector * self, GstPad * pad,
1536     gint64 stop_time, gint64 start_time)
1537 {
1538   gboolean changed;
1539
1540   g_return_if_fail (self->blocked == TRUE);
1541
1542   GST_INPUT_SELECTOR_LOCK (self);
1543   changed =
1544       gst_input_selector_set_active_pad (self, pad, stop_time, start_time);
1545
1546   self->blocked = FALSE;
1547   GST_INPUT_SELECTOR_BROADCAST (self);
1548   GST_INPUT_SELECTOR_UNLOCK (self);
1549
1550   if (changed) {
1551     NOTIFY_MUTEX_LOCK ();
1552     g_object_notify (G_OBJECT (self), "active-pad");
1553     NOTIFY_MUTEX_UNLOCK ();
1554   }
1555 }