inputselector: Properly sync when changing streams
authorAndre Moreira Magalhaes (andrunko) <andre.magalhaes@collabora.co.uk>
Mon, 28 May 2012 17:29:00 +0000 (14:29 -0300)
committerSebastian Dröge <sebastian.droege@collabora.co.uk>
Thu, 31 May 2012 10:38:20 +0000 (12:38 +0200)
This adds properties to use the clock time for deciding when
to drop buffers for inactive pads and a property to buffer all
not rendered buffers for the active pad to allow pad switching
without losing any buffers at all.

Conflicts:

plugins/elements/gstinputselector.c

plugins/elements/gstinputselector.c
plugins/elements/gstinputselector.h

index 62a5aaa..9802b4d 100644 (file)
 
 #include "gst/glib-compat-private.h"
 
+#define DEBUG_CACHED_BUFFERS 0
+
 GST_DEBUG_CATEGORY_STATIC (input_selector_debug);
 #define GST_CAT_DEFAULT input_selector_debug
 
+#define GST_TYPE_INPUT_SELECTOR_SYNC_MODE (gst_input_selector_sync_mode_get_type())
+static GType
+gst_input_selector_sync_mode_get_type (void)
+{
+  static GType type = 0;
+  static const GEnumValue data[] = {
+    {GST_INPUT_SELECTOR_SYNC_MODE_ACTIVE_SEGMENT,
+          "Sync using the current active segment",
+        "active-segment"},
+    {GST_INPUT_SELECTOR_SYNC_MODE_CLOCK, "Sync using the clock", "clock"},
+    {0, NULL, NULL},
+  };
+
+  if (!type) {
+    type = g_enum_register_static ("GstInputSelectorSyncMode", data);
+  }
+  return type;
+}
+
+#if GLIB_CHECK_VERSION(2, 26, 0)
+#define NOTIFY_MUTEX_LOCK()
+#define NOTIFY_MUTEX_UNLOCK()
+#else
+static GStaticRecMutex notify_mutex = G_STATIC_REC_MUTEX_INIT;
+#define NOTIFY_MUTEX_LOCK() g_static_rec_mutex_lock (&notify_mutex)
+#define NOTIFY_MUTEX_UNLOCK() g_static_rec_mutex_unlock (&notify_mutex)
+#endif
+
+#define GST_INPUT_SELECTOR_GET_LOCK(sel) (&((GstInputSelector*)(sel))->lock)
+#define GST_INPUT_SELECTOR_GET_COND(sel) (&((GstInputSelector*)(sel))->cond)
+#define GST_INPUT_SELECTOR_LOCK(sel) (g_mutex_lock (GST_INPUT_SELECTOR_GET_LOCK(sel)))
+#define GST_INPUT_SELECTOR_UNLOCK(sel) (g_mutex_unlock (GST_INPUT_SELECTOR_GET_LOCK(sel)))
+#define GST_INPUT_SELECTOR_WAIT(sel) (g_cond_wait (GST_INPUT_SELECTOR_GET_COND(sel), \
+                       GST_INPUT_SELECTOR_GET_LOCK(sel)))
+#define GST_INPUT_SELECTOR_BROADCAST(sel) (g_cond_broadcast (GST_INPUT_SELECTOR_GET_COND(sel)))
+
 static GstStaticPadTemplate gst_input_selector_sink_factory =
 GST_STATIC_PAD_TEMPLATE ("sink_%u",
     GST_PAD_SINK,
@@ -81,11 +119,14 @@ enum
   PROP_0,
   PROP_N_PADS,
   PROP_ACTIVE_PAD,
-  PROP_SYNC_STREAMS
+  PROP_SYNC_STREAMS,
+  PROP_SYNC_MODE,
+  PROP_CACHE_BUFFERS
 };
 
 #define DEFAULT_SYNC_STREAMS TRUE
-
+#define DEFAULT_SYNC_MODE GST_INPUT_SELECTOR_SYNC_MODE_ACTIVE_SEGMENT
+#define DEFAULT_CACHE_BUFFERS FALSE
 #define DEFAULT_PAD_ALWAYS_OK TRUE
 
 enum
@@ -106,6 +147,8 @@ enum
 };
 static guint gst_input_selector_signals[LAST_SIGNAL] = { 0 };
 
+static void gst_input_selector_active_pad_changed (GstInputSelector * sel,
+    GParamSpec * pspec, gpointer user_data);
 static inline gboolean gst_input_selector_is_active_sinkpad (GstInputSelector *
     sel, GstPad * pad);
 static GstPad *gst_input_selector_activate_sinkpad (GstInputSelector * sel,
@@ -128,6 +171,7 @@ static GstPad *gst_input_selector_get_linked_pad (GstInputSelector * sel,
 
 typedef struct _GstSelectorPad GstSelectorPad;
 typedef struct _GstSelectorPadClass GstSelectorPadClass;
+typedef struct _GstSelectorPadCachedBuffer GstSelectorPadCachedBuffer;
 
 struct _GstSelectorPad
 {
@@ -147,6 +191,15 @@ struct _GstSelectorPad
   guint32 segment_seqnum;       /* sequence number of the current segment */
 
   gboolean events_pending;      /* TRUE if sticky events need to be updated */
+
+  gboolean sending_cached_buffers;
+  GQueue *cached_buffers;
+};
+
+struct _GstSelectorPadCachedBuffer
+{
+  GstBuffer *buffer;
+  GstSegment segment;
 };
 
 struct _GstSelectorPadClass
@@ -171,6 +224,9 @@ static GstIterator *gst_selector_pad_iterate_linked_pads (GstPad * pad,
     GstObject * parent);
 static GstFlowReturn gst_selector_pad_chain (GstPad * pad, GstObject * parent,
     GstBuffer * buf);
+static void gst_selector_pad_cache_buffer (GstSelectorPad * selpad,
+    GstBuffer * buffer);
+static void gst_selector_pad_free_cached_buffers (GstSelectorPad * selpad);
 
 G_DEFINE_TYPE (GstSelectorPad, gst_selector_pad, GST_TYPE_PAD);
 
@@ -221,6 +277,7 @@ gst_selector_pad_finalize (GObject * object)
 
   if (pad->tags)
     gst_tag_list_unref (pad->tags);
+  gst_selector_pad_free_cached_buffers (pad);
 
   G_OBJECT_CLASS (gst_selector_pad_parent_class)->finalize (object);
 }
@@ -293,12 +350,13 @@ gst_selector_pad_get_running_time (GstSelectorPad * pad)
   }
   GST_OBJECT_UNLOCK (pad);
 
-  GST_DEBUG_OBJECT (pad, "running time: %" GST_TIME_FORMAT,
-      GST_TIME_ARGS (ret));
+  GST_DEBUG_OBJECT (pad, "running time: %" GST_TIME_FORMAT
+      " segment: %" GST_SEGMENT_FORMAT, GST_TIME_ARGS (ret), &pad->segment);
 
   return ret;
 }
 
+/* must be called with the SELECTOR_LOCK */
 static void
 gst_selector_pad_reset (GstSelectorPad * pad)
 {
@@ -312,9 +370,55 @@ gst_selector_pad_reset (GstSelectorPad * pad)
   pad->flushing = FALSE;
   pad->position = GST_CLOCK_TIME_NONE;
   gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
+  pad->sending_cached_buffers = FALSE;
+  gst_selector_pad_free_cached_buffers (pad);
   GST_OBJECT_UNLOCK (pad);
 }
 
+static GstSelectorPadCachedBuffer *
+gst_selector_pad_new_cached_buffer (GstSelectorPad * selpad, GstBuffer * buffer)
+{
+  GstSelectorPadCachedBuffer *cached_buffer =
+      g_slice_new (GstSelectorPadCachedBuffer);
+  cached_buffer->buffer = buffer;
+  cached_buffer->segment = selpad->segment;
+  return cached_buffer;
+}
+
+static void
+gst_selector_pad_free_cached_buffer (GstSelectorPadCachedBuffer * cached_buffer)
+{
+  gst_buffer_unref (cached_buffer->buffer);
+  g_slice_free (GstSelectorPadCachedBuffer, cached_buffer);
+}
+
+/* must be called with the SELECTOR_LOCK */
+static void
+gst_selector_pad_cache_buffer (GstSelectorPad * selpad, GstBuffer * buffer)
+{
+  GST_DEBUG_OBJECT (selpad, "Caching buffer %p", buffer);
+  if (!selpad->cached_buffers)
+    selpad->cached_buffers = g_queue_new ();
+  g_queue_push_tail (selpad->cached_buffers,
+      gst_selector_pad_new_cached_buffer (selpad, buffer));
+}
+
+/* must be called with the SELECTOR_LOCK */
+static void
+gst_selector_pad_free_cached_buffers (GstSelectorPad * selpad)
+{
+  GstSelectorPadCachedBuffer *cached_buffer;
+
+  if (!selpad->cached_buffers)
+    return;
+
+  GST_DEBUG_OBJECT (selpad, "Freeing cached buffers");
+  while ((cached_buffer = g_queue_pop_head (selpad->cached_buffers)))
+    gst_selector_pad_free_cached_buffer (cached_buffer);
+  g_queue_free (selpad->cached_buffers);
+  selpad->cached_buffers = NULL;
+}
+
 /* strictly get the linked pad from the sinkpad. If the pad is active we return
  * the srcpad else we return NULL */
 static GstIterator *
@@ -351,36 +455,35 @@ gst_selector_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
 
   sel = GST_INPUT_SELECTOR (parent);
   selpad = GST_SELECTOR_PAD_CAST (pad);
+  GST_DEBUG_OBJECT (selpad, "received event %" GST_PTR_FORMAT, event);
 
   GST_INPUT_SELECTOR_LOCK (sel);
   prev_active_sinkpad = sel->active_sinkpad;
   active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
-
-  /* only forward if we are dealing with the active sinkpad */
-  forward = (pad == active_sinkpad);
   GST_INPUT_SELECTOR_UNLOCK (sel);
 
   if (prev_active_sinkpad != active_sinkpad && pad == active_sinkpad) {
     g_object_notify (G_OBJECT (sel), "active-pad");
   }
 
+  GST_INPUT_SELECTOR_LOCK (sel);
+  active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
+
+  /* only forward if we are dealing with the active sinkpad */
+  forward = (pad == active_sinkpad);
+
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
       /* Unblock the pad if it's waiting */
-      GST_INPUT_SELECTOR_LOCK (sel);
       selpad->flushing = TRUE;
       GST_INPUT_SELECTOR_BROADCAST (sel);
-      GST_INPUT_SELECTOR_UNLOCK (sel);
       break;
     case GST_EVENT_FLUSH_STOP:
-      GST_INPUT_SELECTOR_LOCK (sel);
       gst_selector_pad_reset (selpad);
-      GST_INPUT_SELECTOR_UNLOCK (sel);
+      GST_INPUT_SELECTOR_BROADCAST (sel);
       break;
     case GST_EVENT_SEGMENT:
     {
-      GST_INPUT_SELECTOR_LOCK (sel);
-      GST_OBJECT_LOCK (selpad);
       gst_event_copy_segment (event, &selpad->segment);
       selpad->segment_seqnum = gst_event_get_seqnum (event);
 
@@ -400,9 +503,6 @@ gst_selector_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
       }
       GST_DEBUG_OBJECT (pad, "configured SEGMENT %" GST_SEGMENT_FORMAT,
           &selpad->segment);
-
-      GST_OBJECT_UNLOCK (selpad);
-      GST_INPUT_SELECTOR_UNLOCK (sel);
       break;
     }
     case GST_EVENT_TAG:
@@ -411,7 +511,6 @@ gst_selector_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
 
       gst_event_parse_tag (event, &tags);
 
-      GST_OBJECT_LOCK (selpad);
       oldtags = selpad->tags;
 
       newtags = gst_tag_list_merge (oldtags, tags, GST_TAG_MERGE_REPLACE);
@@ -419,7 +518,6 @@ gst_selector_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
       if (oldtags)
         gst_tag_list_unref (oldtags);
       GST_DEBUG_OBJECT (pad, "received tags %" GST_PTR_FORMAT, newtags);
-      GST_OBJECT_UNLOCK (selpad);
 
       g_object_notify (G_OBJECT (selpad), "tags");
       break;
@@ -430,7 +528,7 @@ gst_selector_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
       if (forward) {
         selpad->eos_sent = TRUE;
       } else {
-        GstSelectorPad *tmp;
+        GstSelectorPad *active_selpad;
 
         /* If the active sinkpad is in EOS state but EOS
          * was not sent downstream this means that the pad
@@ -438,18 +536,16 @@ gst_selector_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
          * the previously active pad got EOS after it was
          * active
          */
-        GST_INPUT_SELECTOR_LOCK (sel);
-        active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
-        tmp = GST_SELECTOR_PAD (active_sinkpad);
-        forward = (tmp->eos && !tmp->eos_sent);
-        tmp->eos_sent = TRUE;
-        GST_INPUT_SELECTOR_UNLOCK (sel);
+        active_selpad = GST_SELECTOR_PAD (active_sinkpad);
+        forward = (active_selpad->eos && !active_selpad->eos_sent);
+        active_selpad->eos_sent = TRUE;
       }
       GST_DEBUG_OBJECT (pad, "received EOS");
       break;
     default:
       break;
   }
+  GST_INPUT_SELECTOR_UNLOCK (sel);
   if (forward) {
     GST_DEBUG_OBJECT (pad, "forwarding event");
     res = gst_pad_push_event (sel->srcpad, event);
@@ -495,103 +591,107 @@ gst_input_selector_wait (GstInputSelector * self, GstSelectorPad * pad)
   return self->flushing;
 }
 
-/* must be called with the SELECTOR_LOCK, will block until the running time
+/* must be called without the SELECTOR_LOCK, will wait until the running time
  * of the active pad is after this pad or return TRUE when flushing */
 static gboolean
 gst_input_selector_wait_running_time (GstInputSelector * sel,
-    GstSelectorPad * pad, GstBuffer * buf)
+    GstSelectorPad * selpad, GstBuffer * buf)
 {
-  GstPad *active_sinkpad;
-  GstSelectorPad *active_selpad;
-  GstSegment *seg, *active_seg;
-  GstClockTime running_time, active_running_time = GST_CLOCK_TIME_NONE;
-
-  seg = &pad->segment;
-
-  active_sinkpad =
-      gst_input_selector_activate_sinkpad (sel, GST_PAD_CAST (pad));
-  active_selpad = GST_SELECTOR_PAD_CAST (active_sinkpad);
-  active_seg = &active_selpad->segment;
-
-  /* We can only sync if the segments are in time format or
-   * if the active pad had no newsegment event yet */
-  if (seg->format != GST_FORMAT_TIME ||
-      (active_seg->format != GST_FORMAT_TIME
-          && active_seg->format != GST_FORMAT_UNDEFINED))
-    return FALSE;
+  GstSegment *seg;
 
-  /* If we have no valid timestamp we can't sync this buffer */
-  if (!GST_BUFFER_TIMESTAMP_IS_VALID (buf))
-    return FALSE;
+  GST_DEBUG_OBJECT (selpad, "entering wait for buffer %p", buf);
 
-  running_time = GST_BUFFER_TIMESTAMP (buf);
-  /* If possible try to get the running time at the end of the buffer */
-  if (GST_BUFFER_DURATION_IS_VALID (buf))
-    running_time += GST_BUFFER_DURATION (buf);
-  if (running_time > seg->stop)
-    running_time = seg->stop;
-  running_time =
-      gst_segment_to_running_time (seg, GST_FORMAT_TIME, running_time);
-  /* If this is outside the segment don't sync */
-  if (running_time == -1)
+  /* If we have no valid timestamp we can't sync this buffer */
+  if (!GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
+    GST_DEBUG_OBJECT (selpad, "leaving wait for buffer with "
+        "invalid timestamp");
     return FALSE;
+  }
 
-  /* Get active pad's running time, if no configured segment yet keep at -1 */
-  if (active_seg->format == GST_FORMAT_TIME)
-    active_running_time =
-        gst_segment_to_running_time (active_seg, GST_FORMAT_TIME,
-        active_selpad->position);
+  seg = &selpad->segment;
 
   /* Wait until
    *   a) this is the active pad
    *   b) the pad or the selector is flushing
    *   c) the selector is not blocked
-   *   d) the active pad has no running time or the active
-   *      pad's running time is before this running time
-   *   e) the active pad has a non-time segment
-   *   f) the active pad changed and has not pushed anything
+   *   d) the buffer running time is before the current running time
+   *      (either active-seg or clock, depending on sync-mode)
    */
-  while (pad != active_selpad && !sel->flushing && !pad->flushing
-      && active_selpad->pushed && (sel->blocked || active_running_time == -1
-          || running_time >= active_running_time)) {
-    if (!sel->blocked)
-      GST_DEBUG_OBJECT (pad,
-          "Waiting for active streams to advance. %" GST_TIME_FORMAT " >= %"
-          GST_TIME_FORMAT, GST_TIME_ARGS (running_time),
-          GST_TIME_ARGS (active_running_time));
-
-    GST_INPUT_SELECTOR_WAIT (sel);
-
-    /* Get new active pad, it might have changed */
+
+  GST_INPUT_SELECTOR_LOCK (sel);
+  while (TRUE) {
+    GstPad *active_sinkpad;
+    GstSelectorPad *active_selpad;
+    GstClock *clock;
+    gint64 cur_running_time;
+    GstClockTime running_time;
+
     active_sinkpad =
-        gst_input_selector_activate_sinkpad (sel, GST_PAD_CAST (pad));
+        gst_input_selector_activate_sinkpad (sel, GST_PAD_CAST (selpad));
     active_selpad = GST_SELECTOR_PAD_CAST (active_sinkpad);
-    active_seg = &active_selpad->segment;
 
-    /* If the active segment is configured but not to time format
-     * we can't do any syncing at all */
-    if (active_seg->format != GST_FORMAT_TIME
-        && active_seg->format != GST_FORMAT_UNDEFINED)
-      break;
+    running_time = GST_BUFFER_TIMESTAMP (buf);
+    /* If possible try to get the running time at the end of the buffer */
+    if (GST_BUFFER_DURATION_IS_VALID (buf))
+      running_time += GST_BUFFER_DURATION (buf);
+    /* Only use the segment to convert to running time if the segment is
+     * in TIME format, otherwise do our best to try to sync */
+    if (seg->format == GST_FORMAT_TIME && GST_CLOCK_TIME_IS_VALID (seg->stop)) {
+      if (running_time > seg->stop) {
+        running_time = seg->stop;
+      }
+      running_time =
+          gst_segment_to_running_time (seg, GST_FORMAT_TIME, running_time);
+      /* If this is outside the segment don't sync */
+      if (running_time == -1) {
+        GST_INPUT_SELECTOR_UNLOCK (sel);
+        return FALSE;
+      }
+    }
 
-    /* Get the new active pad running time */
-    if (active_seg->format == GST_FORMAT_TIME)
-      active_running_time =
-          gst_segment_to_running_time (active_seg, GST_FORMAT_TIME,
-          active_selpad->position);
-    else
-      active_running_time = -1;
+    cur_running_time = GST_CLOCK_TIME_NONE;
+    if (sel->sync_mode == GST_INPUT_SELECTOR_SYNC_MODE_CLOCK) {
+      clock = gst_element_get_clock (GST_ELEMENT_CAST (sel));
+      if (clock) {
+        GstClockTime base_time;
+
+        cur_running_time = gst_clock_get_time (clock);
+        base_time = gst_element_get_base_time (GST_ELEMENT_CAST (sel));
+        if (base_time <= cur_running_time)
+          cur_running_time -= base_time;
+        else
+          cur_running_time = 0;
+      }
+    } else {
+      GstSegment *active_seg;
 
-    if (!sel->blocked)
-      GST_DEBUG_OBJECT (pad,
-          "Waited for active streams to advance. %" GST_TIME_FORMAT " >= %"
-          GST_TIME_FORMAT, GST_TIME_ARGS (running_time),
-          GST_TIME_ARGS (active_running_time));
+      active_seg = &active_selpad->segment;
 
+      /* Get active pad's running time, if no configured segment yet keep at -1 */
+      if (active_seg->format == GST_FORMAT_TIME)
+        cur_running_time = gst_segment_to_running_time (active_seg,
+            GST_FORMAT_TIME, active_seg->position);
+    }
+
+    if (selpad != active_selpad && !sel->flushing && !selpad->flushing &&
+        (sel->cache_buffers || active_selpad->pushed) &&
+        (sel->blocked || cur_running_time == -1
+            || running_time >= cur_running_time)) {
+      if (!sel->blocked)
+        GST_DEBUG_OBJECT (selpad,
+            "Waiting for active streams to advance. %" GST_TIME_FORMAT " >= %"
+            GST_TIME_FORMAT, GST_TIME_ARGS (running_time),
+            GST_TIME_ARGS (cur_running_time));
+
+      GST_INPUT_SELECTOR_WAIT (sel);
+    } else {
+      GST_INPUT_SELECTOR_UNLOCK (sel);
+      break;
+    }
   }
 
   /* Return TRUE if the selector or the pad is flushing */
-  return (sel->flushing || pad->flushing);
+  return (sel->flushing || selpad->flushing);
 }
 
 static gboolean
@@ -614,6 +714,165 @@ forward_sticky_events (GstPad * sinkpad, GstEvent ** event, gpointer user_data)
   return TRUE;
 }
 
+#if DEBUG_CACHED_BUFFERS
+static void
+gst_input_selector_debug_cached_buffers (GstInputSelector * sel)
+{
+  GList *walk;
+
+  for (walk = GST_ELEMENT_CAST (sel)->sinkpads; walk; walk = g_list_next (walk)) {
+    GstSelectorPad *selpad;
+    GString *timestamps;
+    gchar *str;
+    int i;
+
+    selpad = GST_SELECTOR_PAD_CAST (walk->data);
+    if (!selpad->cached_buffers) {
+      GST_DEBUG_OBJECT (selpad, "Cached buffers timestamps: <none>");
+      continue;
+    }
+
+    timestamps = g_string_new ("Cached buffers timestamps:");
+    for (i = 0; i < selpad->cached_buffers->length; ++i) {
+      GstSelectorPadCachedBuffer *cached_buffer;
+
+      cached_buffer = g_queue_peek_nth (selpad->cached_buffers, i);
+      g_string_append_printf (timestamps, " %" GST_TIME_FORMAT,
+          GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (cached_buffer->buffer)));
+    }
+    str = g_string_free (timestamps, FALSE);
+    GST_DEBUG_OBJECT (selpad, str);
+    g_free (str);
+  }
+}
+#endif
+
+/* must be called with the SELECTOR_LOCK */
+static void
+gst_input_selector_cleanup_old_cached_buffers (GstInputSelector * sel,
+    GstPad * pad)
+{
+  GstSelectorPad *selpad;
+  GstSegment *seg;
+  GstClock *clock;
+  gint64 cur_running_time;
+  GList *walk;
+
+  selpad = GST_SELECTOR_PAD_CAST (pad);
+  seg = &selpad->segment;
+
+  cur_running_time = GST_CLOCK_TIME_NONE;
+  if (sel->sync_mode == GST_INPUT_SELECTOR_SYNC_MODE_CLOCK) {
+    clock = gst_element_get_clock (GST_ELEMENT_CAST (sel));
+    if (clock) {
+      GstClockTime base_time;
+
+      cur_running_time = gst_clock_get_time (clock);
+      base_time = gst_element_get_base_time (GST_ELEMENT_CAST (sel));
+      if (base_time <= cur_running_time)
+        cur_running_time -= base_time;
+      else
+        cur_running_time = 0;
+    }
+  } else {
+    GstPad *active_sinkpad;
+    GstSelectorPad *active_selpad;
+    GstSegment *active_seg;
+
+    active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
+    active_selpad = GST_SELECTOR_PAD_CAST (active_sinkpad);
+    active_seg = &active_selpad->segment;
+
+    /* Get active pad's running time, if no configured segment yet keep at -1 */
+    if (active_seg->format == GST_FORMAT_TIME)
+      cur_running_time = gst_segment_to_running_time (active_seg,
+          GST_FORMAT_TIME, active_seg->position);
+  }
+
+  if (!GST_CLOCK_TIME_IS_VALID (cur_running_time))
+    return;
+
+  GST_DEBUG_OBJECT (sel, "Cleaning up old cached buffers");
+  for (walk = GST_ELEMENT_CAST (sel)->sinkpads; walk; walk = g_list_next (walk)) {
+    GstSelectorPad *selpad = GST_SELECTOR_PAD_CAST (walk->data);
+    GstSelectorPadCachedBuffer *cached_buffer;
+    GSList *maybe_remove;
+    guint queue_position;
+
+    if (!selpad->cached_buffers)
+      continue;
+
+    maybe_remove = NULL;
+    queue_position = 0;
+    while ((cached_buffer = g_queue_peek_nth (selpad->cached_buffers,
+                queue_position))) {
+      GstBuffer *buffer = cached_buffer->buffer;
+      GstClockTime running_time;
+      GSList *l;
+
+      /* If we have no valid timestamp we can't sync this buffer */
+      if (!GST_BUFFER_TIMESTAMP_IS_VALID (buffer)) {
+        maybe_remove = g_slist_append (maybe_remove, cached_buffer);
+        queue_position = g_slist_length (maybe_remove);
+        continue;
+      }
+
+      /* the buffer is still valid if its duration is valid and the
+       * timestamp + duration is >= time, or if its duration is invalid
+       * and the timestamp is >= time */
+      running_time = GST_BUFFER_TIMESTAMP (buffer);
+      /* If possible try to get the running time at the end of the buffer */
+      if (GST_BUFFER_DURATION_IS_VALID (buffer))
+        running_time += GST_BUFFER_DURATION (buffer);
+      /* Only use the segment to convert to running time if the segment is
+       * in TIME format, otherwise do our best to try to sync */
+      if (seg->format == GST_FORMAT_TIME && GST_CLOCK_TIME_IS_VALID (seg->stop)) {
+        if (running_time > seg->stop) {
+          running_time = seg->stop;
+        }
+        running_time =
+            gst_segment_to_running_time (seg, GST_FORMAT_TIME, running_time);
+      }
+
+      GST_DEBUG_OBJECT (selpad,
+          "checking if buffer %p running time=%" GST_TIME_FORMAT
+          " >= stream time=%" GST_TIME_FORMAT, buffer,
+          GST_TIME_ARGS (running_time), GST_TIME_ARGS (cur_running_time));
+      if (running_time >= cur_running_time) {
+        break;
+      }
+
+      GST_DEBUG_OBJECT (selpad, "Removing old cached buffer %p", buffer);
+      g_queue_pop_nth (selpad->cached_buffers, queue_position);
+      gst_selector_pad_free_cached_buffer (cached_buffer);
+
+      for (l = maybe_remove; l != NULL; l = g_slist_next (l)) {
+        /* A buffer after some invalid buffers was removed, it means the invalid buffers
+         * are old, lets also remove them */
+        cached_buffer = l->data;
+        g_queue_remove (selpad->cached_buffers, cached_buffer);
+        gst_selector_pad_free_cached_buffer (cached_buffer);
+      }
+
+      g_slist_free (maybe_remove);
+      maybe_remove = NULL;
+      queue_position = 0;
+    }
+
+    g_slist_free (maybe_remove);
+    maybe_remove = NULL;
+
+    if (g_queue_is_empty (selpad->cached_buffers)) {
+      g_queue_free (selpad->cached_buffers);
+      selpad->cached_buffers = NULL;
+    }
+  }
+
+#if DEBUG_CACHED_BUFFERS
+  gst_input_selector_debug_cached_buffers (sel);
+#endif
+}
+
 static GstFlowReturn
 gst_selector_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
 {
@@ -627,10 +886,16 @@ gst_selector_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
   sel = GST_INPUT_SELECTOR (parent);
   selpad = GST_SELECTOR_PAD_CAST (pad);
 
+  GST_DEBUG_OBJECT (selpad,
+      "entering chain for buf %p with timestamp %" GST_TIME_FORMAT, buf,
+      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
+
   GST_INPUT_SELECTOR_LOCK (sel);
   /* wait or check for flushing */
-  if (gst_input_selector_wait (sel, selpad))
+  if (gst_input_selector_wait (sel, selpad)) {
+    GST_INPUT_SELECTOR_UNLOCK (sel);
     goto flushing;
+  }
 
   GST_LOG_OBJECT (pad, "getting active pad");
 
@@ -639,13 +904,57 @@ gst_selector_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
 
   /* In sync mode wait until the active pad has advanced
    * after the running time of the current buffer */
-  if (sel->sync_streams && active_sinkpad != pad) {
-    if (gst_input_selector_wait_running_time (sel, selpad, buf))
-      goto flushing;
-  }
+  if (sel->sync_streams) {
+    /* call chain for each cached buffer if we are not the active pad
+     * or if we are the active pad but didn't push anything yet. */
+    if (active_sinkpad != pad || !selpad->pushed) {
+      /* no need to check for sel->cache_buffers as selpad->cached_buffers
+       * will only be valid if cache_buffers is TRUE */
+      if (selpad->cached_buffers && !selpad->sending_cached_buffers) {
+        GstSelectorPadCachedBuffer *cached_buffer;
+        GstSegment saved_segment;
+
+        saved_segment = selpad->segment;
+
+        selpad->sending_cached_buffers = TRUE;
+        while (!sel->flushing && !selpad->flushing &&
+            (cached_buffer = g_queue_pop_head (selpad->cached_buffers))) {
+          GST_DEBUG_OBJECT (pad, "Cached buffers found, "
+              "invoking chain for cached buffer %p", cached_buffer->buffer);
+
+          selpad->segment = cached_buffer->segment;
+          selpad->events_pending = TRUE;
+          GST_INPUT_SELECTOR_UNLOCK (sel);
+          gst_selector_pad_chain (pad, parent, cached_buffer->buffer);
+          GST_INPUT_SELECTOR_LOCK (sel);
+
+          /* we may have cleaned up the queue in the meantime because of
+           * old buffers */
+          if (!selpad->cached_buffers) {
+            break;
+          }
+        }
+        selpad->sending_cached_buffers = FALSE;
 
-  /* Might have changed while waiting */
-  active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
+        /* all cached buffers sent, restore segment for current buffer */
+        selpad->segment = saved_segment;
+        selpad->events_pending = TRUE;
+
+        /* Might have changed while calling chain for cached buffers */
+        active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
+      }
+    }
+
+    if (active_sinkpad != pad) {
+      GST_INPUT_SELECTOR_UNLOCK (sel);
+      if (gst_input_selector_wait_running_time (sel, selpad, buf))
+        goto flushing;
+      GST_INPUT_SELECTOR_LOCK (sel);
+    }
+
+    /* Might have changed while waiting */
+    active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
+  }
 
   /* update the segment on the srcpad */
   start_time = GST_BUFFER_TIMESTAMP (buf);
@@ -693,10 +1002,29 @@ gst_selector_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
   }
 
   /* forward */
-  GST_LOG_OBJECT (pad, "Forwarding buffer %p", buf);
+  GST_LOG_OBJECT (pad, "Forwarding buffer %p with timestamp %" GST_TIME_FORMAT,
+      buf, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
 
-  res = gst_pad_push (sel->srcpad, buf);
-  selpad->pushed = TRUE;
+  res = gst_pad_push (sel->srcpad, gst_buffer_ref (buf));
+  GST_LOG_OBJECT (pad, "Buffer %p forwarded result=%d", buf, res);
+
+  GST_INPUT_SELECTOR_LOCK (sel);
+
+  if (sel->sync_streams && sel->cache_buffers) {
+    /* Might have changed while pushing */
+    active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
+    /* only set pad to pushed if we are still the active pad */
+    if (active_sinkpad == pad)
+      selpad->pushed = TRUE;
+
+    /* cache buffer as we may need it again if we change pads */
+    gst_selector_pad_cache_buffer (selpad, buf);
+    gst_input_selector_cleanup_old_cached_buffers (sel, pad);
+  } else {
+    selpad->pushed = TRUE;
+    gst_buffer_unref (buf);
+  }
+  GST_INPUT_SELECTOR_UNLOCK (sel);
 
 done:
   return res;
@@ -706,7 +1034,8 @@ ignore:
   {
     gboolean active_pad_pushed = GST_SELECTOR_PAD_CAST (active_sinkpad)->pushed;
 
-    GST_DEBUG_OBJECT (pad, "Pad not active, discard buffer %p", buf);
+    GST_DEBUG_OBJECT (pad, "Pad not active or buffer timestamp is invalid, "
+        "discard buffer %p", buf);
     /* when we drop a buffer, we're creating a discont on this pad */
     selpad->discont = TRUE;
     GST_INPUT_SELECTOR_UNLOCK (sel);
@@ -725,7 +1054,6 @@ ignore:
 flushing:
   {
     GST_DEBUG_OBJECT (pad, "We are flushing, discard buffer %p", buf);
-    GST_INPUT_SELECTOR_UNLOCK (sel);
     gst_buffer_unref (buf);
     res = GST_FLOW_FLUSHING;
     goto done;
@@ -818,16 +1146,59 @@ gst_input_selector_class_init (GstInputSelectorClass * klass)
    * GstInputSelector:sync-streams
    *
    * If set to %TRUE all inactive streams will be synced to the
-   * running time of the active stream. This makes sure that no
-   * buffers are dropped by input-selector that might be needed
-   * when switching the active pad.
+   * running time of the active stream or to the current clock.
+   *
+   * To make sure no buffers are dropped by input-selector
+   * that might be needed when switching the active pad,
+   * sync-mode should be set to "clock" and cache-buffers to TRUE.
    *
    * Since: 0.10.36
    */
   g_object_class_install_property (gobject_class, PROP_SYNC_STREAMS,
       g_param_spec_boolean ("sync-streams", "Sync Streams",
-          "Synchronize inactive streams to the running time of the active stream",
-          DEFAULT_SYNC_STREAMS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          "Synchronize inactive streams to the running time of the active "
+          "stream or to the current clock",
+          DEFAULT_SYNC_STREAMS,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
+          GST_PARAM_MUTABLE_READY));
+
+  /**
+   * GstInputSelector:sync-mode
+   *
+   * Select how input-selector will sync buffers when in sync-streams mode.
+   *
+   * Note that when using the "active-segment" mode, the "active-segment" may
+   * be ahead of current clock time when switching the active pad, as the current
+   * active pad may have pushed more buffers than what was displayed/consumed,
+   * which may cause delays and some missing buffers.
+   *
+   * Since: 0.10.36
+   */
+  g_object_class_install_property (gobject_class, PROP_SYNC_MODE,
+      g_param_spec_enum ("sync-mode", "Sync mode",
+          "Behavior in sync-streams mode", GST_TYPE_INPUT_SELECTOR_SYNC_MODE,
+          DEFAULT_SYNC_MODE,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
+          GST_PARAM_MUTABLE_READY));
+
+  /**
+   * GstInputSelector:cache-buffers
+   *
+   * If set to %TRUE and GstInputSelector:sync-streams is also set to %TRUE,
+   * the active pad will cache the buffers still considered valid (after current
+   * running time, see sync-mode) to avoid missing frames if/when the pad is
+   * reactivated.
+   *
+   * The active pad may push more buffers than what is currently displayed/consumed
+   * and when changing pads those buffers will be discarded and the only way to
+   * reactivate that pad without loosing the already consumed buffers is to enable cache.
+   */
+  g_object_class_install_property (gobject_class, PROP_CACHE_BUFFERS,
+      g_param_spec_boolean ("cache-buffers", "Cache Buffers",
+          "Cache buffers for active-pad",
+          DEFAULT_CACHE_BUFFERS,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
+          GST_PARAM_MUTABLE_READY));
 
   /**
    * GstInputSelector::block:
@@ -880,6 +1251,12 @@ gst_input_selector_init (GstInputSelector * sel)
   g_mutex_init (&sel->lock);
   g_cond_init (&sel->cond);
   sel->blocked = FALSE;
+
+  /* lets give a change for downstream to do something on
+   * active-pad change before we start pushing new buffers */
+  g_signal_connect_data (sel, "notify::active-pad",
+      (GCallback) gst_input_selector_active_pad_changed, NULL,
+      NULL, G_CONNECT_AFTER);
 }
 
 static void
@@ -934,13 +1311,6 @@ gst_input_selector_set_active_pad (GstInputSelector * self, GstPad * pad)
   active_pad_p = &self->active_sinkpad;
   gst_object_replace ((GstObject **) active_pad_p, GST_OBJECT_CAST (pad));
 
-  gst_pad_push_event (pad, gst_event_new_reconfigure ());
-
-  /* Wake up all non-active pads in sync mode, they might be
-   * the active pad now */
-  if (self->sync_streams)
-    GST_INPUT_SELECTOR_BROADCAST (self);
-
   GST_DEBUG_OBJECT (self, "New active pad is %" GST_PTR_FORMAT,
       self->active_sinkpad);
 
@@ -961,17 +1331,35 @@ gst_input_selector_set_property (GObject * object, guint prop_id,
       pad = g_value_get_object (value);
 
       GST_INPUT_SELECTOR_LOCK (sel);
+
+#if DEBUG_CACHED_BUFFERS
+      gst_input_selector_debug_cached_buffers (sel);
+#endif
+
       gst_input_selector_set_active_pad (sel, pad);
+
+#if DEBUG_CACHED_BUFFERS
+      gst_input_selector_debug_cached_buffers (sel);
+#endif
+
       GST_INPUT_SELECTOR_UNLOCK (sel);
       break;
     }
     case PROP_SYNC_STREAMS:
-    {
       GST_INPUT_SELECTOR_LOCK (sel);
       sel->sync_streams = g_value_get_boolean (value);
       GST_INPUT_SELECTOR_UNLOCK (sel);
       break;
-    }
+    case PROP_SYNC_MODE:
+      GST_INPUT_SELECTOR_LOCK (sel);
+      sel->sync_mode = g_value_get_enum (value);
+      GST_INPUT_SELECTOR_UNLOCK (sel);
+      break;
+    case PROP_CACHE_BUFFERS:
+      GST_INPUT_SELECTOR_LOCK (object);
+      sel->cache_buffers = g_value_get_boolean (value);
+      GST_INPUT_SELECTOR_UNLOCK (object);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -979,6 +1367,16 @@ gst_input_selector_set_property (GObject * object, guint prop_id,
 }
 
 static void
+gst_input_selector_active_pad_changed (GstInputSelector * sel,
+    GParamSpec * pspec, gpointer user_data)
+{
+  /* Wake up all non-active pads in sync mode, they might be
+   * the active pad now */
+  if (sel->sync_streams)
+    GST_INPUT_SELECTOR_BROADCAST (sel);
+}
+
+static void
 gst_input_selector_get_property (GObject * object, guint prop_id,
     GValue * value, GParamSpec * pspec)
 {
@@ -1000,6 +1398,16 @@ gst_input_selector_get_property (GObject * object, guint prop_id,
       g_value_set_boolean (value, sel->sync_streams);
       GST_INPUT_SELECTOR_UNLOCK (object);
       break;
+    case PROP_SYNC_MODE:
+      GST_INPUT_SELECTOR_LOCK (object);
+      g_value_set_enum (value, sel->sync_mode);
+      GST_INPUT_SELECTOR_UNLOCK (object);
+      break;
+    case PROP_CACHE_BUFFERS:
+      GST_INPUT_SELECTOR_LOCK (object);
+      g_value_set_boolean (value, sel->cache_buffers);
+      GST_INPUT_SELECTOR_UNLOCK (object);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
index 8a5248a..dd48a51 100644 (file)
@@ -48,6 +48,18 @@ typedef struct _GstInputSelectorClass GstInputSelectorClass;
                        GST_INPUT_SELECTOR_GET_LOCK(sel)))
 #define GST_INPUT_SELECTOR_BROADCAST(sel) (g_cond_broadcast (GST_INPUT_SELECTOR_GET_COND(sel)))
 
+/**
+ * GstInputSelectorSyncMode:
+ * @GST_INPUT_SELECTOR_SYNC_MODE_ACTIVE_SEGMENT: Sync using the current active segment.
+ * @GST_INPUT_SELECTOR_SYNC_MODE_CLOCK: Sync using the clock.
+ *
+ * The different ways that input-selector can behave when in sync-streams mode.
+ */
+typedef enum {
+  GST_INPUT_SELECTOR_SYNC_MODE_ACTIVE_SEGMENT,
+  GST_INPUT_SELECTOR_SYNC_MODE_CLOCK
+} GstInputSelectorSyncMode;
+
 struct _GstInputSelector {
   GstElement element;
 
@@ -57,6 +69,8 @@ struct _GstInputSelector {
   guint n_pads;
   guint padcount;
   gboolean sync_streams;
+  GstInputSelectorSyncMode sync_mode;
+  gboolean cache_buffers;
 
   GMutex lock;
   GCond cond;