inputselector: Add sync mode that syncs inactive pads to the running time of the...
authorSebastian Dröge <sebastian.droege@collabora.co.uk>
Sat, 19 Mar 2011 09:28:49 +0000 (10:28 +0100)
committerSebastian Dröge <sebastian.droege@collabora.co.uk>
Sat, 14 May 2011 09:39:35 +0000 (11:39 +0200)
Fixes bug #645017.

plugins/elements/gstinputselector.c
plugins/elements/gstinputselector.h

index 32a4aa9..4e4075a 100644 (file)
@@ -95,9 +95,12 @@ enum
 {
   PROP_0,
   PROP_N_PADS,
-  PROP_ACTIVE_PAD
+  PROP_ACTIVE_PAD,
+  PROP_SYNC_STREAMS
 };
 
+#define DEFAULT_SYNC_STREAMS FALSE
+
 #define DEFAULT_PAD_ALWAYS_OK TRUE
 
 enum
@@ -624,6 +627,105 @@ gst_input_selector_wait (GstInputSelector * self, GstSelectorPad * pad)
   return self->flushing;
 }
 
+/* must be called with the SELECTOR_LOCK, will block until the running time
+ * of the active pad is after this pad or return TRUE when flushing */
+static gboolean
+gst_input_selector_wait_running_time (GstInputSelector * sel,
+    GstSelectorPad * pad, GstBuffer * buf)
+{
+  GstPad *active_sinkpad;
+  GstSelectorPad *active_selpad;
+  GstSegment *seg, *active_seg;
+  GstClockTime running_time, active_running_time = -1;
+
+  seg = &pad->segment;
+
+  active_sinkpad =
+      gst_input_selector_activate_sinkpad (sel, GST_PAD_CAST (pad));
+  active_selpad = GST_SELECTOR_PAD_CAST (active_sinkpad);
+  active_seg = &active_selpad->segment;
+
+  /* We can only sync if the segments are in time format or
+   * if the active pad had no newsegment event yet */
+  if (seg->format != GST_FORMAT_TIME ||
+      (active_seg->format != GST_FORMAT_TIME
+          && active_seg->format != GST_FORMAT_UNDEFINED))
+    return FALSE;
+
+  /* If we have no valid timestamp we can't sync this buffer */
+  if (!GST_BUFFER_TIMESTAMP_IS_VALID (buf))
+    return FALSE;
+
+  running_time = GST_BUFFER_TIMESTAMP (buf);
+  /* If possible try to get the running time at the end of the buffer */
+  if (GST_BUFFER_DURATION_IS_VALID (buf))
+    running_time += GST_BUFFER_DURATION (buf);
+  if (running_time > seg->stop)
+    running_time = seg->stop;
+  running_time =
+      gst_segment_to_running_time (seg, GST_FORMAT_TIME, running_time);
+  /* If this is outside the segment don't sync */
+  if (running_time == -1)
+    return FALSE;
+
+  /* Get active pad's running time, if no configured segment yet keep at -1 */
+  if (active_seg->format == GST_FORMAT_TIME)
+    active_running_time =
+        gst_segment_to_running_time (active_seg, GST_FORMAT_TIME,
+        active_seg->last_stop);
+
+  /* Wait until
+   *   a) this is the active pad
+   *   b) the pad or the selector is flushing
+   *   c) the selector is not blocked
+   *   d) the active pad has no running time or the active
+   *      pad's running time is before this running time
+   *   e) the active pad has a non-time segment
+   */
+  while (pad != active_selpad && !sel->flushing && !pad->flushing &&
+      (sel->blocked || active_running_time == -1
+          || running_time >= active_running_time)) {
+    if (!sel->blocked)
+      GST_DEBUG_OBJECT (pad,
+          "Waiting for active streams to advance. %" GST_TIME_FORMAT " >= %"
+          GST_TIME_FORMAT, GST_TIME_ARGS (running_time),
+          GST_TIME_ARGS (active_running_time));
+
+    GST_INPUT_SELECTOR_WAIT (sel);
+
+    /* Get new active pad, it might have changed */
+    active_sinkpad =
+        gst_input_selector_activate_sinkpad (sel, GST_PAD_CAST (pad));
+    active_selpad = GST_SELECTOR_PAD_CAST (active_sinkpad);
+    active_seg = &active_selpad->segment;
+
+    /* If the active segment is configured but not to time format
+     * we can't do any syncing at all */
+    if (active_seg->format != GST_FORMAT_TIME
+        && active_seg->format != GST_FORMAT_UNDEFINED)
+      break;
+
+    /* Get the new active pad running time */
+    if (active_seg->format == GST_FORMAT_TIME)
+      active_running_time =
+          gst_segment_to_running_time (active_seg, GST_FORMAT_TIME,
+          active_seg->last_stop);
+    else
+      active_running_time = -1;
+
+    if (!sel->blocked)
+      GST_DEBUG_OBJECT (pad,
+          "Waited for active streams to advance. %" GST_TIME_FORMAT " >= %"
+          GST_TIME_FORMAT, GST_TIME_ARGS (running_time),
+          GST_TIME_ARGS (active_running_time));
+
+  }
+
+  /* Return TRUE if the selector or the pad is flushing */
+  return (sel->flushing || pad->flushing);
+}
+
+
 static GstFlowReturn
 gst_selector_pad_chain (GstPad * pad, GstBuffer * buf)
 {
@@ -651,6 +753,16 @@ gst_selector_pad_chain (GstPad * pad, GstBuffer * buf)
   prev_active_sinkpad = sel->active_sinkpad;
   active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
 
+  /* In sync mode wait until the active pad has advanced
+   * after the running time of the current buffer */
+  if (sel->sync_streams && active_sinkpad != pad) {
+    if (gst_input_selector_wait_running_time (sel, selpad, buf))
+      goto flushing;
+  }
+
+  /* Might have changed while waiting */
+  active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
+
   /* update the segment on the srcpad */
   start_time = GST_BUFFER_TIMESTAMP (buf);
   if (GST_CLOCK_TIME_IS_VALID (start_time)) {
@@ -669,6 +781,10 @@ gst_selector_pad_chain (GstPad * pad, GstBuffer * buf)
   if (pad != active_sinkpad)
     goto ignore;
 
+  /* Tell all non-active pads that we advanced the running time */
+  if (sel->sync_streams)
+    GST_INPUT_SELECTOR_BROADCAST (sel);
+
   if (G_UNLIKELY (sel->pending_close)) {
     GstSegment *cseg = &sel->segment;
 
@@ -900,6 +1016,21 @@ gst_input_selector_class_init (GstInputSelectorClass * klass)
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   /**
+   * GstInputSelector:sync-streams
+   *
+   * If set to %TRUE all inactive streams will be synced to the
+   * running time of the active stream. This makes sure that no
+   * buffers are dropped by input-selector that might be needed
+   * when switching the active pad.
+   *
+   * Since: 0.10.33
+   */
+  g_object_class_install_property (gobject_class, PROP_SYNC_STREAMS,
+      g_param_spec_boolean ("sync-streams", "Sync Streams",
+          "Synchronize inactive streams to the running time of the active stream",
+          DEFAULT_SYNC_STREAMS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  /**
    * GstInputSelector::block:
    * @inputselector: the #GstInputSelector
    *
@@ -991,6 +1122,7 @@ gst_input_selector_init (GstInputSelector * sel,
   sel->active_sinkpad = NULL;
   sel->padcount = 0;
   gst_segment_init (&sel->segment, GST_FORMAT_UNDEFINED);
+  sel->sync_streams = DEFAULT_SYNC_STREAMS;
 
   sel->lock = g_mutex_new ();
   sel->cond = g_cond_new ();
@@ -1105,6 +1237,12 @@ gst_input_selector_set_active_pad (GstInputSelector * self,
 
   active_pad_p = &self->active_sinkpad;
   gst_object_replace ((GstObject **) active_pad_p, GST_OBJECT_CAST (pad));
+
+  /* Wake up all non-active pads in sync mode, they might be
+   * the active pad now */
+  if (self->sync_streams)
+    GST_INPUT_SELECTOR_BROADCAST (self);
+
   GST_DEBUG_OBJECT (self, "New active pad is %" GST_PTR_FORMAT,
       self->active_sinkpad);
 
@@ -1130,6 +1268,13 @@ gst_input_selector_set_property (GObject * object, guint prop_id,
       GST_INPUT_SELECTOR_UNLOCK (sel);
       break;
     }
+    case PROP_SYNC_STREAMS:
+    {
+      GST_INPUT_SELECTOR_LOCK (sel);
+      sel->sync_streams = g_value_get_boolean (value);
+      GST_INPUT_SELECTOR_UNLOCK (sel);
+      break;
+    }
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -1153,6 +1298,11 @@ gst_input_selector_get_property (GObject * object, guint prop_id,
       g_value_set_object (value, sel->active_sinkpad);
       GST_INPUT_SELECTOR_UNLOCK (object);
       break;
+    case PROP_SYNC_STREAMS:
+      GST_INPUT_SELECTOR_LOCK (object);
+      g_value_set_boolean (value, sel->sync_streams);
+      GST_INPUT_SELECTOR_UNLOCK (object);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
index 0ed7a4b..f370999 100644 (file)
@@ -56,6 +56,7 @@ struct _GstInputSelector {
   GstPad *active_sinkpad;
   guint n_pads;
   guint padcount;
+  gboolean sync_streams;
 
   GstSegment segment;      /* the output segment */
   gboolean pending_close;  /* if we should push a close first */