Merge branch 'master' into 0.11-fdo
[platform/upstream/gstreamer.git] / plugins / elements / gstinputselector.c
index 51a33e1..2d31d89 100644 (file)
@@ -1,10 +1,11 @@
-/* GStreamer
+/* GStreamer input selector
  * Copyright (C) 2003 Julien Moutte <julien@moutte.net>
  * Copyright (C) 2005 Ronald S. Bultje <rbultje@ronald.bitfreak.net>
  * Copyright (C) 2005 Jan Schmidt <thaytan@mad.scientist.com>
  * Copyright (C) 2007 Wim Taymans <wim.taymans@gmail.com>
  * Copyright (C) 2007 Andy Wingo <wingo@pobox.com>
  * Copyright (C) 2008 Nokia Corporation. (contact <stefan.kost@nokia.com>)
+ * Copyright (C) 2011 Sebastian Dröge <sebastian.droege@collabora.co.uk>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Library General Public
  * @see_also: #GstOutputSelector
  *
  * Direct one out of N input streams to the output pad.
+ *
+ * The input pads are from a GstPad subclass and have additional
+ * properties, which users may find useful, namely:
+ *
+ * <itemizedlist>
+ * <listitem>
+ * "running-time": Running time of stream on pad (#gint64)
+ * </listitem>
+ * <listitem>
+ * "tags": The currently active tags on the pad (#GstTagList, boxed type)
+ * </listitem>
+ * <listitem>
+ * "active": If the pad is currently active (#gboolean)
+ * </listitem>
+ * <listitem>
+ * "always-ok" : Make an inactive pads return #GST_FLOW_OK instead of
+ * #GST_FLOW_NOT_LINKED
+ * </listitem>
+ * </itemizedlist>
+ *
+ * Since: 0.10.32
  */
 
 #ifdef HAVE_CONFIG_H
 #include <string.h>
 
 #include "gstinputselector.h"
-#include "gstselector-marshal.h"
 
 GST_DEBUG_CATEGORY_STATIC (input_selector_debug);
 #define GST_CAT_DEFAULT input_selector_debug
 
+#if GLIB_CHECK_VERSION(2, 26, 0)
+#define NOTIFY_MUTEX_LOCK()
+#define NOTIFY_MUTEX_UNLOCK()
+#else
+static GStaticRecMutex notify_mutex = G_STATIC_REC_MUTEX_INIT;
+#define NOTIFY_MUTEX_LOCK() g_static_rec_mutex_lock (&notify_mutex)
+#define NOTIFY_MUTEX_UNLOCK() g_static_rec_mutex_unlock (&notify_mutex)
+#endif
+
+#define GST_INPUT_SELECTOR_GET_LOCK(sel) (((GstInputSelector*)(sel))->lock)
+#define GST_INPUT_SELECTOR_GET_COND(sel) (((GstInputSelector*)(sel))->cond)
+#define GST_INPUT_SELECTOR_LOCK(sel) (g_mutex_lock (GST_INPUT_SELECTOR_GET_LOCK(sel)))
+#define GST_INPUT_SELECTOR_UNLOCK(sel) (g_mutex_unlock (GST_INPUT_SELECTOR_GET_LOCK(sel)))
+#define GST_INPUT_SELECTOR_WAIT(sel) (g_cond_wait (GST_INPUT_SELECTOR_GET_COND(sel), \
+                       GST_INPUT_SELECTOR_GET_LOCK(sel)))
+#define GST_INPUT_SELECTOR_BROADCAST(sel) (g_cond_broadcast (GST_INPUT_SELECTOR_GET_COND(sel)))
+
 static GstStaticPadTemplate gst_input_selector_sink_factory =
 GST_STATIC_PAD_TEMPLATE ("sink%d",
     GST_PAD_SINK,
@@ -57,12 +95,10 @@ enum
 {
   PROP_0,
   PROP_N_PADS,
-  PROP_ACTIVE_PAD,
-  PROP_SELECT_ALL,
-  PROP_LAST
+  PROP_ACTIVE_PAD
 };
 
-#define DEFAULT_PAD_ALWAYS_OK  TRUE
+#define DEFAULT_PAD_ALWAYS_OK TRUE
 
 enum
 {
@@ -70,8 +106,7 @@ enum
   PROP_PAD_RUNNING_TIME,
   PROP_PAD_TAGS,
   PROP_PAD_ACTIVE,
-  PROP_PAD_ALWAYS_OK,
-  PROP_PAD_LAST
+  PROP_PAD_ALWAYS_OK
 };
 
 enum
@@ -89,7 +124,6 @@ static GstPad *gst_input_selector_activate_sinkpad (GstInputSelector * sel,
     GstPad * pad);
 static GstPad *gst_input_selector_get_linked_pad (GstPad * pad,
     gboolean strict);
-static gboolean gst_input_selector_check_eos (GstElement * selector);
 
 #define GST_TYPE_SELECTOR_PAD \
   (gst_selector_pad_get_type())
@@ -112,8 +146,11 @@ struct _GstSelectorPad
   GstPad parent;
 
   gboolean active;              /* when buffer have passed the pad */
+  gboolean pushed;              /* when buffer was pushed downstream since activation */
   gboolean eos;                 /* when EOS has been received */
+  gboolean eos_sent;            /* when EOS was sent downstream */
   gboolean discont;             /* after switching we create a discont */
+  gboolean flushing;            /* set after flush-start and before flush-stop */
   gboolean always_ok;
   GstSegment segment;           /* the current segment on the pad */
   GstTagList *tags;             /* last tags received on the pad */
@@ -149,26 +186,26 @@ static GstFlowReturn gst_selector_pad_bufferalloc (GstPad * pad,
 static GType
 gst_selector_pad_get_type (void)
 {
-  static GType selector_pad_type = 0;
-
-  if (!selector_pad_type) {
-    static const GTypeInfo selector_pad_info = {
-      sizeof (GstSelectorPadClass),
-      NULL,
-      NULL,
-      (GClassInitFunc) gst_selector_pad_class_init,
-      NULL,
-      NULL,
-      sizeof (GstSelectorPad),
-      0,
-      (GInstanceInitFunc) gst_selector_pad_init,
-    };
-
-    selector_pad_type =
-        g_type_register_static (GST_TYPE_PAD, "GstSelectorPad",
+  static volatile gsize selector_pad_type = 0;
+  static const GTypeInfo selector_pad_info = {
+    sizeof (GstSelectorPadClass),
+    NULL,
+    NULL,
+    (GClassInitFunc) gst_selector_pad_class_init,
+    NULL,
+    NULL,
+    sizeof (GstSelectorPad),
+    0,
+    (GInstanceInitFunc) gst_selector_pad_init,
+  };
+
+  if (g_once_init_enter (&selector_pad_type)) {
+    GType tmp = g_type_register_static (GST_TYPE_PAD, "GstSelectorPad",
         &selector_pad_info, 0);
+    g_once_init_leave (&selector_pad_type, tmp);
   }
-  return selector_pad_type;
+
+  return (GType) selector_pad_type;
 }
 
 static void
@@ -187,18 +224,21 @@ gst_selector_pad_class_init (GstSelectorPadClass * klass)
 
   g_object_class_install_property (gobject_class, PROP_PAD_RUNNING_TIME,
       g_param_spec_int64 ("running-time", "Running time",
-          "Running time of stream on pad", 0, G_MAXINT64, 0, G_PARAM_READABLE));
+          "Running time of stream on pad", 0, G_MAXINT64, 0,
+          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_PAD_TAGS,
       g_param_spec_boxed ("tags", "Tags",
           "The currently active tags on the pad", GST_TYPE_TAG_LIST,
-          G_PARAM_READABLE));
+          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_PAD_ACTIVE,
       g_param_spec_boolean ("active", "Active",
-          "If the pad is currently active", FALSE, G_PARAM_READABLE));
+          "If the pad is currently active", FALSE,
+          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+  /* FIXME: better property name? */
   g_object_class_install_property (gobject_class, PROP_PAD_ALWAYS_OK,
       g_param_spec_boolean ("always-ok", "Always OK",
           "Make an inactive pad return OK instead of NOT_LINKED",
-          DEFAULT_PAD_ALWAYS_OK, G_PARAM_READWRITE));
+          DEFAULT_PAD_ALWAYS_OK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 }
 
 static void
@@ -301,9 +341,12 @@ gst_selector_pad_reset (GstSelectorPad * pad)
 {
   GST_OBJECT_LOCK (pad);
   pad->active = FALSE;
+  pad->pushed = FALSE;
   pad->eos = FALSE;
+  pad->eos_sent = FALSE;
   pad->segment_pending = FALSE;
   pad->discont = FALSE;
+  pad->flushing = FALSE;
   gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
   GST_OBJECT_UNLOCK (pad);
 }
@@ -332,7 +375,7 @@ static gboolean
 gst_selector_pad_event (GstPad * pad, GstEvent * event)
 {
   gboolean res = TRUE;
-  gboolean forward = TRUE;
+  gboolean forward;
   GstInputSelector *sel;
   GstSelectorPad *selpad;
   GstPad *prev_active_sinkpad;
@@ -345,18 +388,23 @@ gst_selector_pad_event (GstPad * pad, GstEvent * event)
   prev_active_sinkpad = sel->active_sinkpad;
   active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
 
-  /* only forward if we are dealing with the active sinkpad or if select_all
-   * is enabled */
-  if (pad != active_sinkpad && !sel->select_all)
-    forward = FALSE;
+  /* only forward if we are dealing with the active sinkpad */
+  forward = (pad == active_sinkpad);
   GST_INPUT_SELECTOR_UNLOCK (sel);
 
-  if (prev_active_sinkpad != active_sinkpad && pad == active_sinkpad)
+  if (prev_active_sinkpad != active_sinkpad && pad == active_sinkpad) {
+    NOTIFY_MUTEX_LOCK ();
     g_object_notify (G_OBJECT (sel), "active-pad");
+    NOTIFY_MUTEX_UNLOCK ();
+  }
 
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
-      /* FIXME, flush out the waiter */
+      /* Unblock the pad if it's waiting */
+      GST_INPUT_SELECTOR_LOCK (sel);
+      selpad->flushing = TRUE;
+      GST_INPUT_SELECTOR_BROADCAST (sel);
+      GST_INPUT_SELECTOR_UNLOCK (sel);
       break;
     case GST_EVENT_FLUSH_STOP:
       GST_INPUT_SELECTOR_LOCK (sel);
@@ -386,8 +434,8 @@ gst_selector_pad_event (GstPad * pad, GstEvent * event)
           rate, arate, format, start, stop, time);
       GST_OBJECT_UNLOCK (selpad);
 
-      /* If we aren't forwarding the event (because the pad is not the
-       * active_sinkpad, and select_all is not set, then set the flag on the
+      /* If we aren't forwarding the event because the pad is not the
+       * active_sinkpad, then set the flag on the pad
        * that says a segment needs sending if/when that pad is activated.
        * For all other cases, we send the event immediately, which makes
        * sparse streams and other segment updates work correctly downstream.
@@ -419,11 +467,26 @@ gst_selector_pad_event (GstPad * pad, GstEvent * event)
     }
     case GST_EVENT_EOS:
       selpad->eos = TRUE;
-      GST_DEBUG_OBJECT (pad, "received EOS");
-      /* don't forward eos in select_all mode until all sink pads have eos */
-      if (sel->select_all && !gst_input_selector_check_eos (GST_ELEMENT (sel))) {
-        forward = FALSE;
+
+      if (forward) {
+        selpad->eos_sent = TRUE;
+      } else {
+        GstSelectorPad *tmp;
+
+        /* If the active sinkpad is in EOS state but EOS
+         * was not sent downstream this means that the pad
+         * got EOS before it was set as active pad and that
+         * the previously active pad got EOS after it was
+         * active
+         */
+        GST_INPUT_SELECTOR_LOCK (sel);
+        active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
+        tmp = GST_SELECTOR_PAD (active_sinkpad);
+        forward = (tmp->eos && !tmp->eos_sent);
+        tmp->eos_sent = TRUE;
+        GST_INPUT_SELECTOR_UNLOCK (sel);
       }
+      GST_DEBUG_OBJECT (pad, "received EOS");
       break;
     default:
       break;
@@ -448,7 +511,7 @@ gst_selector_pad_getcaps (GstPad * pad)
   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
 
   GST_DEBUG_OBJECT (sel, "Getting caps of srcpad peer");
-  caps = gst_pad_peer_get_caps (sel->srcpad);
+  caps = gst_pad_peer_get_caps_reffed (sel->srcpad);
   if (caps == NULL)
     caps = gst_caps_new_any ();
 
@@ -485,7 +548,7 @@ gst_selector_pad_bufferalloc (GstPad * pad, guint64 offset,
   sel = GST_INPUT_SELECTOR (gst_pad_get_parent (pad));
   selpad = GST_SELECTOR_PAD_CAST (pad);
 
-  GST_DEBUG_OBJECT (pad, "received alloc");
+  GST_LOG_OBJECT (pad, "received alloc");
 
   GST_INPUT_SELECTOR_LOCK (sel);
   prev_active_sinkpad = sel->active_sinkpad;
@@ -496,8 +559,11 @@ gst_selector_pad_bufferalloc (GstPad * pad, guint64 offset,
 
   GST_INPUT_SELECTOR_UNLOCK (sel);
 
-  if (prev_active_sinkpad != active_sinkpad && pad == active_sinkpad)
+  if (prev_active_sinkpad != active_sinkpad && pad == active_sinkpad) {
+    NOTIFY_MUTEX_LOCK ();
     g_object_notify (G_OBJECT (sel), "active-pad");
+    NOTIFY_MUTEX_UNLOCK ();
+  }
 
   result = gst_pad_alloc_buffer (sel->srcpad, offset, size, caps, buf);
 
@@ -509,12 +575,14 @@ done:
   /* ERRORS */
 not_active:
   {
+    gboolean active_pad_pushed = GST_SELECTOR_PAD_CAST (active_sinkpad)->pushed;
+
     GST_INPUT_SELECTOR_UNLOCK (sel);
 
     /* unselected pad, perform fallback alloc or return unlinked when
      * asked */
     GST_OBJECT_LOCK (selpad);
-    if (selpad->always_ok) {
+    if (selpad->always_ok || !active_pad_pushed) {
       GST_DEBUG_OBJECT (pad, "Not selected, performing fallback allocation");
       *buf = NULL;
       result = GST_FLOW_OK;
@@ -531,9 +599,9 @@ not_active:
 /* must be called with the SELECTOR_LOCK, will block while the pad is blocked 
  * or return TRUE when flushing */
 static gboolean
-gst_input_selector_wait (GstInputSelector * self, GstPad * pad)
+gst_input_selector_wait (GstInputSelector * self, GstSelectorPad * pad)
 {
-  while (self->blocked && !self->flushing) {
+  while (self->blocked && !self->flushing && !pad->flushing) {
     /* we can be unlocked here when we are shutting down (flushing) or when we
      * get unblocked */
     GST_INPUT_SELECTOR_WAIT (self);
@@ -560,10 +628,10 @@ gst_selector_pad_chain (GstPad * pad, GstBuffer * buf)
 
   GST_INPUT_SELECTOR_LOCK (sel);
   /* wait or check for flushing */
-  if (gst_input_selector_wait (sel, pad))
+  if (gst_input_selector_wait (sel, selpad))
     goto flushing;
 
-  GST_DEBUG_OBJECT (pad, "getting active pad");
+  GST_LOG_OBJECT (pad, "getting active pad");
 
   prev_active_sinkpad = sel->active_sinkpad;
   active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
@@ -571,10 +639,10 @@ gst_selector_pad_chain (GstPad * pad, GstBuffer * buf)
   /* update the segment on the srcpad */
   start_time = GST_BUFFER_TIMESTAMP (buf);
   if (GST_CLOCK_TIME_IS_VALID (start_time)) {
-    GST_DEBUG_OBJECT (pad, "received start time %" GST_TIME_FORMAT,
+    GST_LOG_OBJECT (pad, "received start time %" GST_TIME_FORMAT,
         GST_TIME_ARGS (start_time));
     if (GST_BUFFER_DURATION_IS_VALID (buf))
-      GST_DEBUG_OBJECT (pad, "received end time %" GST_TIME_FORMAT,
+      GST_LOG_OBJECT (pad, "received end time %" GST_TIME_FORMAT,
           GST_TIME_ARGS (start_time + GST_BUFFER_DURATION (buf)));
 
     GST_OBJECT_LOCK (pad);
@@ -618,8 +686,11 @@ gst_selector_pad_chain (GstPad * pad, GstBuffer * buf)
   }
   GST_INPUT_SELECTOR_UNLOCK (sel);
 
-  if (prev_active_sinkpad != active_sinkpad && pad == active_sinkpad)
+  if (prev_active_sinkpad != active_sinkpad && pad == active_sinkpad) {
+    NOTIFY_MUTEX_LOCK ();
     g_object_notify (G_OBJECT (sel), "active-pad");
+    NOTIFY_MUTEX_UNLOCK ();
+  }
 
   if (close_event)
     gst_pad_push_event (sel->srcpad, close_event);
@@ -628,7 +699,7 @@ gst_selector_pad_chain (GstPad * pad, GstBuffer * buf)
     gst_pad_push_event (sel->srcpad, start_event);
 
   if (selpad->discont) {
-    buf = gst_buffer_make_metadata_writable (buf);
+    buf = gst_buffer_make_writable (buf);
 
     GST_DEBUG_OBJECT (pad, "Marking discont buffer %p", buf);
     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
@@ -636,8 +707,7 @@ gst_selector_pad_chain (GstPad * pad, GstBuffer * buf)
   }
 
   /* forward */
-  GST_DEBUG_OBJECT (pad, "Forwarding buffer %p from pad %s:%s", buf,
-      GST_DEBUG_PAD_NAME (pad));
+  GST_LOG_OBJECT (pad, "Forwarding buffer %p", buf);
 
   if ((caps = GST_BUFFER_CAPS (buf))) {
     if (GST_PAD_CAPS (sel->srcpad) != caps)
@@ -645,6 +715,7 @@ gst_selector_pad_chain (GstPad * pad, GstBuffer * buf)
   }
 
   res = gst_pad_push (sel->srcpad, buf);
+  selpad->pushed = TRUE;
 
 done:
   gst_object_unref (sel);
@@ -653,6 +724,8 @@ done:
   /* dropped buffers */
 ignore:
   {
+    gboolean active_pad_pushed = GST_SELECTOR_PAD_CAST (active_sinkpad)->pushed;
+
     GST_DEBUG_OBJECT (pad, "Pad not active, discard buffer %p", buf);
     /* when we drop a buffer, we're creating a discont on this pad */
     selpad->discont = TRUE;
@@ -661,7 +734,7 @@ ignore:
 
     /* figure out what to return upstream */
     GST_OBJECT_LOCK (selpad);
-    if (selpad->always_ok)
+    if (selpad->always_ok || !active_pad_pushed)
       res = GST_FLOW_OK;
     else
       res = GST_FLOW_NOT_LINKED;
@@ -700,6 +773,73 @@ static gint64 gst_input_selector_block (GstInputSelector * self);
 static void gst_input_selector_switch (GstInputSelector * self,
     GstPad * pad, gint64 stop_time, gint64 start_time);
 
+/* FIXME: create these marshallers using glib-genmarshal */
+#define g_marshal_value_peek_object(v)   (v)->data[0].v_pointer
+#define g_marshal_value_peek_int64(v)    (v)->data[0].v_int64
+
+static void
+gst_input_selector_marshal_INT64__VOID (GClosure * closure,
+    GValue * return_value G_GNUC_UNUSED,
+    guint n_param_values,
+    const GValue * param_values,
+    gpointer invocation_hint G_GNUC_UNUSED, gpointer marshal_data)
+{
+  typedef gint64 (*GMarshalFunc_INT64__VOID) (gpointer data1, gpointer data2);
+  register GMarshalFunc_INT64__VOID callback;
+  register GCClosure *cc = (GCClosure *) closure;
+  register gpointer data1, data2;
+  gint64 v_return;
+
+  g_return_if_fail (return_value != NULL);
+  g_return_if_fail (n_param_values == 1);
+
+  if (G_CCLOSURE_SWAP_DATA (closure)) {
+    data1 = closure->data;
+    data2 = g_value_peek_pointer (param_values + 0);
+  } else {
+    data1 = g_value_peek_pointer (param_values + 0);
+    data2 = closure->data;
+  }
+  callback =
+      (GMarshalFunc_INT64__VOID) (marshal_data ? marshal_data : cc->callback);
+
+  v_return = callback (data1, data2);
+
+  g_value_set_int64 (return_value, v_return);
+}
+
+static void
+gst_input_selector_marshal_VOID__OBJECT_INT64_INT64 (GClosure * closure,
+    GValue * return_value G_GNUC_UNUSED,
+    guint n_param_values,
+    const GValue * param_values,
+    gpointer invocation_hint G_GNUC_UNUSED, gpointer marshal_data)
+{
+  typedef void (*GMarshalFunc_VOID__OBJECT_INT64_INT64) (gpointer data1,
+      gpointer arg_1, gint64 arg_2, gint64 arg_3, gpointer data2);
+  register GMarshalFunc_VOID__OBJECT_INT64_INT64 callback;
+  register GCClosure *cc = (GCClosure *) closure;
+  register gpointer data1, data2;
+
+  g_return_if_fail (n_param_values == 4);
+
+  if (G_CCLOSURE_SWAP_DATA (closure)) {
+    data1 = closure->data;
+    data2 = g_value_peek_pointer (param_values + 0);
+  } else {
+    data1 = g_value_peek_pointer (param_values + 0);
+    data2 = closure->data;
+  }
+  callback =
+      (GMarshalFunc_VOID__OBJECT_INT64_INT64) (marshal_data ? marshal_data :
+      cc->callback);
+
+  callback (data1,
+      g_marshal_value_peek_object (param_values + 1),
+      g_marshal_value_peek_int64 (param_values + 2),
+      g_marshal_value_peek_int64 (param_values + 3), data2);
+}
+
 #define _do_init(bla) \
     GST_DEBUG_CATEGORY_INIT (input_selector_debug, \
         "input-selector", 0, "An input stream selector element");
@@ -713,7 +853,7 @@ gst_input_selector_base_init (gpointer g_class)
   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
 
   gst_element_class_set_details_simple (element_class, "Input selector",
-      "Generic", "N-to-1 input stream selectoring",
+      "Generic", "N-to-1 input stream selector",
       "Julien Moutte <julien@moutte.net>, "
       "Jan Schmidt <thaytan@mad.scientist.com>, "
       "Wim Taymans <wim.taymans@gmail.com>");
@@ -729,8 +869,6 @@ gst_input_selector_class_init (GstInputSelectorClass * klass)
   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
 
-  parent_class = g_type_class_peek_parent (klass);
-
   gobject_class->dispose = gst_input_selector_dispose;
 
   gobject_class->set_property = gst_input_selector_set_property;
@@ -738,15 +876,13 @@ gst_input_selector_class_init (GstInputSelectorClass * klass)
 
   g_object_class_install_property (gobject_class, PROP_N_PADS,
       g_param_spec_uint ("n-pads", "Number of Pads",
-          "The number of sink pads", 0, G_MAXUINT, 0, G_PARAM_READABLE));
+          "The number of sink pads", 0, G_MAXUINT, 0,
+          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (gobject_class, PROP_ACTIVE_PAD,
       g_param_spec_object ("active-pad", "Active pad",
-          "The currently active sink pad", GST_TYPE_PAD, G_PARAM_READWRITE));
-
-  g_object_class_install_property (gobject_class, PROP_SELECT_ALL,
-      g_param_spec_boolean ("select-all", "Select all mode",
-          "Forwards data from all input pads", FALSE, G_PARAM_READWRITE));
+          "The currently active sink pad", GST_TYPE_PAD,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   /**
    * GstInputSelector::block:
@@ -760,7 +896,7 @@ gst_input_selector_class_init (GstInputSelectorClass * klass)
       g_signal_new ("block", G_TYPE_FROM_CLASS (klass),
       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
       G_STRUCT_OFFSET (GstInputSelectorClass, block), NULL, NULL,
-      gst_selector_marshal_INT64__VOID, G_TYPE_INT64, 0);
+      gst_input_selector_marshal_INT64__VOID, G_TYPE_INT64, 0);
   /**
    * GstInputSelector::switch:
    * @inputselector: the #GstInputSelector
@@ -810,7 +946,7 @@ gst_input_selector_class_init (GstInputSelectorClass * klass)
   gst_input_selector_signals[SIGNAL_SWITCH] =
       g_signal_new ("switch", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
       G_STRUCT_OFFSET (GstInputSelectorClass, switch_),
-      NULL, NULL, gst_selector_marshal_VOID__OBJECT_INT64_INT64,
+      NULL, NULL, gst_input_selector_marshal_VOID__OBJECT_INT64_INT64,
       G_TYPE_NONE, 3, GST_TYPE_PAD, G_TYPE_INT64, G_TYPE_INT64);
 
   gstelement_class->request_new_pad = gst_input_selector_request_new_pad;
@@ -844,8 +980,6 @@ gst_input_selector_init (GstInputSelector * sel,
   sel->lock = g_mutex_new ();
   sel->cond = g_cond_new ();
   sel->blocked = FALSE;
-
-  sel->select_all = FALSE;
 }
 
 static void
@@ -923,12 +1057,12 @@ gst_input_selector_set_active_pad (GstInputSelector * self,
   GST_DEBUG_OBJECT (self, "setting active pad to %s:%s",
       GST_DEBUG_PAD_NAME (new));
 
-  if (stop_time == -1 && old) {
+  if (!GST_CLOCK_TIME_IS_VALID (stop_time) && old) {
     /* no stop time given, get the latest running_time on the active pad to 
      * close and open the new segment */
     stop_time = start_time = gst_selector_pad_get_running_time (old);
-    GST_DEBUG_OBJECT (self, "using start/stop of %" G_GINT64_FORMAT,
-        start_time);
+    GST_DEBUG_OBJECT (self, "using start/stop of %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (start_time));
   }
 
   if (old && old->active && !self->pending_close && stop_time >= 0) {
@@ -936,19 +1070,23 @@ gst_input_selector_set_active_pad (GstInputSelector * self,
        segment has been pushed before. */
     memcpy (&self->segment, &old->segment, sizeof (self->segment));
 
-    GST_DEBUG_OBJECT (self, "setting stop_time to %" G_GINT64_FORMAT,
-        stop_time);
+    GST_DEBUG_OBJECT (self, "setting stop_time to %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (stop_time));
     gst_segment_set_stop (&self->segment, stop_time);
     self->pending_close = TRUE;
   }
+  if (old)
+    old->pushed = FALSE;
 
   if (new && new->active && start_time >= 0) {
-    GST_DEBUG_OBJECT (self, "setting start_time to %" G_GINT64_FORMAT,
-        start_time);
+    GST_DEBUG_OBJECT (self, "setting start_time to %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (start_time));
     /* schedule a new segment push */
     gst_segment_set_start (&new->segment, start_time);
     new->segment_pending = TRUE;
   }
+  if (new)
+    new->pushed = FALSE;
 
   active_pad_p = &self->active_sinkpad;
   gst_object_replace ((GstObject **) active_pad_p, GST_OBJECT_CAST (pad));
@@ -977,11 +1115,6 @@ gst_input_selector_set_property (GObject * object, guint prop_id,
       GST_INPUT_SELECTOR_UNLOCK (sel);
       break;
     }
-    case PROP_SELECT_ALL:
-      GST_INPUT_SELECTOR_LOCK (object);
-      sel->select_all = g_value_get_boolean (value);
-      GST_INPUT_SELECTOR_UNLOCK (object);
-      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -1005,11 +1138,6 @@ gst_input_selector_get_property (GObject * object, guint prop_id,
       g_value_set_object (value, sel->active_sinkpad);
       GST_INPUT_SELECTOR_UNLOCK (object);
       break;
-    case PROP_SELECT_ALL:
-      GST_INPUT_SELECTOR_LOCK (object);
-      g_value_set_boolean (value, sel->select_all);
-      GST_INPUT_SELECTOR_UNLOCK (object);
-      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -1150,23 +1278,14 @@ gst_input_selector_getcaps (GstPad * pad)
   otherpad = gst_input_selector_get_linked_pad (pad, FALSE);
 
   if (!otherpad) {
-    if (GST_INPUT_SELECTOR (parent)->select_all) {
-      GST_DEBUG_OBJECT (parent,
-          "Pad %s:%s not linked, returning merge of caps",
-          GST_DEBUG_PAD_NAME (pad));
-      caps = gst_pad_proxy_getcaps (pad);
-    } else {
-      GST_DEBUG_OBJECT (parent,
-          "Pad %s:%s not linked, returning ANY", GST_DEBUG_PAD_NAME (pad));
-      caps = gst_caps_new_any ();
-    }
+    GST_DEBUG_OBJECT (pad, "Pad not linked, returning ANY");
+    caps = gst_caps_new_any ();
   } else {
-    GST_DEBUG_OBJECT (parent,
-        "Pad %s:%s is linked (to %s:%s), returning peer caps",
-        GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (otherpad));
+    GST_DEBUG_OBJECT (pad, "Pad is linked (to %s:%s), returning peer caps",
+        GST_DEBUG_PAD_NAME (otherpad));
     /* if the peer has caps, use those. If the pad is not linked, this function
      * returns NULL and we return ANY */
-    if (!(caps = gst_pad_peer_get_caps (otherpad)))
+    if (!(caps = gst_pad_peer_get_caps_reffed (otherpad)))
       caps = gst_caps_new_any ();
     gst_object_unref (otherpad);
   }
@@ -1199,9 +1318,8 @@ gst_input_selector_activate_sinkpad (GstInputSelector * sel, GstPad * pad)
 
   selpad->active = TRUE;
   active_sinkpad = sel->active_sinkpad;
-  if (active_sinkpad == NULL || sel->select_all) {
-    /* first pad we get activity on becomes the activated pad by default, if we
-     * select all, we also remember the last used pad. */
+  if (active_sinkpad == NULL) {
+    /* first pad we get activity on becomes the activated pad by default */
     if (sel->active_sinkpad)
       gst_object_unref (sel->active_sinkpad);
     active_sinkpad = sel->active_sinkpad = gst_object_ref (pad);
@@ -1384,43 +1502,9 @@ gst_input_selector_switch (GstInputSelector * self, GstPad * pad,
   GST_INPUT_SELECTOR_BROADCAST (self);
   GST_INPUT_SELECTOR_UNLOCK (self);
 
-  if (changed)
+  if (changed) {
+    NOTIFY_MUTEX_LOCK ();
     g_object_notify (G_OBJECT (self), "active-pad");
-}
-
-static gboolean
-gst_input_selector_check_eos (GstElement * selector)
-{
-  GstIterator *it = gst_element_iterate_sink_pads (selector);
-  GstIteratorResult ires;
-  gpointer item;
-  gboolean done = FALSE, is_eos = FALSE;
-  GstSelectorPad *pad;
-
-  while (!done) {
-    ires = gst_iterator_next (it, &item);
-    switch (ires) {
-      case GST_ITERATOR_DONE:
-        GST_INFO_OBJECT (selector, "all sink pads have eos");
-        done = TRUE;
-        is_eos = TRUE;
-        break;
-      case GST_ITERATOR_OK:
-        pad = GST_SELECTOR_PAD_CAST (item);
-        if (!pad->eos) {
-          done = TRUE;
-        }
-        gst_object_unref (pad);
-        break;
-      case GST_ITERATOR_RESYNC:
-        gst_iterator_resync (it);
-        break;
-      default:
-        done = TRUE;
-        break;
-    }
+    NOTIFY_MUTEX_UNLOCK ();
   }
-  gst_iterator_free (it);
-
-  return is_eos;
 }