identity: Fix the ts-offset property getter
[platform/upstream/gstreamer.git] / plugins / elements / gstidentity.c
index 3bb50a6..bf6ba40 100644 (file)
  *
  * You should have received a copy of the GNU Library General Public
  * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
  */
 /**
  * SECTION:element-identity
+ * @title: identity
  *
  * Dummy element that passes incoming data through unmodified. It has some
  * useful diagnostic functions, such as offset and timestamp checking.
@@ -34,9 +35,9 @@
 #include <stdlib.h>
 #include <string.h>
 
+#include "gstelements_private.h"
 #include "../../gst/gst-i18n-lib.h"
 #include "gstidentity.h"
-#include <gst/gstmarshal.h>
 
 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
     GST_PAD_SINK,
@@ -63,15 +64,18 @@ enum
 #define DEFAULT_DUPLICATE               1
 #define DEFAULT_ERROR_AFTER             -1
 #define DEFAULT_DROP_PROBABILITY        0.0
+#define DEFAULT_DROP_BUFFER_FLAGS       0
 #define DEFAULT_DATARATE                0
 #define DEFAULT_SILENT                  TRUE
 #define DEFAULT_SINGLE_SEGMENT          FALSE
 #define DEFAULT_DUMP                    FALSE
 #define DEFAULT_SYNC                    FALSE
-#define DEFAULT_CHECK_PERFECT           FALSE
 #define DEFAULT_CHECK_IMPERFECT_TIMESTAMP FALSE
 #define DEFAULT_CHECK_IMPERFECT_OFFSET    FALSE
 #define DEFAULT_SIGNAL_HANDOFFS           TRUE
+#define DEFAULT_TS_OFFSET               0
+#define DEFAULT_DROP_ALLOCATION         FALSE
+#define DEFAULT_EOS_AFTER               -1
 
 enum
 {
@@ -79,16 +83,19 @@ enum
   PROP_SLEEP_TIME,
   PROP_ERROR_AFTER,
   PROP_DROP_PROBABILITY,
+  PROP_DROP_BUFFER_FLAGS,
   PROP_DATARATE,
   PROP_SILENT,
   PROP_SINGLE_SEGMENT,
   PROP_LAST_MESSAGE,
   PROP_DUMP,
   PROP_SYNC,
-  PROP_CHECK_PERFECT,
+  PROP_TS_OFFSET,
   PROP_CHECK_IMPERFECT_TIMESTAMP,
   PROP_CHECK_IMPERFECT_OFFSET,
-  PROP_SIGNAL_HANDOFFS
+  PROP_SIGNAL_HANDOFFS,
+  PROP_DROP_ALLOCATION,
+  PROP_EOS_AFTER
 };
 
 
@@ -112,6 +119,10 @@ static gboolean gst_identity_start (GstBaseTransform * trans);
 static gboolean gst_identity_stop (GstBaseTransform * trans);
 static GstStateChangeReturn gst_identity_change_state (GstElement * element,
     GstStateChange transition);
+static gboolean gst_identity_accept_caps (GstBaseTransform * base,
+    GstPadDirection direction, GstCaps * caps);
+static gboolean gst_identity_query (GstBaseTransform * base,
+    GstPadDirection direction, GstQuery * query);
 
 static guint gst_identity_signals[LAST_SIGNAL] = { 0 };
 
@@ -125,6 +136,7 @@ gst_identity_finalize (GObject * object)
   identity = GST_IDENTITY (object);
 
   g_free (identity->last_message);
+  g_cond_clear (&identity->blocked_cond);
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
@@ -149,13 +161,26 @@ gst_identity_class_init (GstIdentityClass * klass)
           DEFAULT_SLEEP_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_ERROR_AFTER,
       g_param_spec_int ("error-after", "Error After", "Error after N buffers",
-          G_MININT, G_MAXINT, DEFAULT_ERROR_AFTER,
+          -1, G_MAXINT, DEFAULT_ERROR_AFTER,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_DROP_PROBABILITY,
       g_param_spec_float ("drop-probability", "Drop Probability",
           "The Probability a buffer is dropped", 0.0, 1.0,
           DEFAULT_DROP_PROBABILITY,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  /**
+   * GstIdentity:drop-buffer-flags:
+   *
+   * Drop buffers with the given flags.
+   *
+   * Since: 1.8
+   **/
+  g_object_class_install_property (gobject_class, PROP_DROP_BUFFER_FLAGS,
+      g_param_spec_flags ("drop-buffer-flags", "Check flags to drop buffers",
+          "Drop buffers with the given flags",
+          GST_TYPE_BUFFER_FLAGS, DEFAULT_DROP_BUFFER_FLAGS,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_DATARATE,
       g_param_spec_int ("datarate", "Datarate",
           "(Re)timestamps buffers with number of bytes per second (0 = inactive)",
@@ -179,12 +204,11 @@ gst_identity_class_init (GstIdentityClass * klass)
       g_param_spec_boolean ("sync", "Synchronize",
           "Synchronize to pipeline clock", DEFAULT_SYNC,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_CHECK_PERFECT,
-      g_param_spec_boolean ("check-perfect", "Check For Perfect Stream",
-          "Verify that the stream is time- and data-contiguous. "
-          "This only logs in the debug log.  This will be deprecated in favor "
-          "of the check-imperfect-timestamp/offset properties.",
-          DEFAULT_CHECK_PERFECT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
+      g_param_spec_int64 ("ts-offset", "Timestamp offset for synchronisation",
+          "Timestamp offset in nanoseconds for synchronisation, negative for earlier sync",
+          G_MININT64, G_MAXINT64, DEFAULT_TS_OFFSET,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class,
       PROP_CHECK_IMPERFECT_TIMESTAMP,
       g_param_spec_boolean ("check-imperfect-timestamp",
@@ -202,16 +226,31 @@ gst_identity_class_init (GstIdentityClass * klass)
   /**
    * GstIdentity:signal-handoffs
    *
-   * If set to #TRUE, the identity will emit a handoff signal when handling a buffer.
-   * When set to #FALSE, no signal will be emited, which might improve performance.
-   *
-   * Since: 0.10.16
+   * If set to %TRUE, the identity will emit a handoff signal when handling a buffer.
+   * When set to %FALSE, no signal will be emitted, which might improve performance.
    */
   g_object_class_install_property (gobject_class, PROP_SIGNAL_HANDOFFS,
       g_param_spec_boolean ("signal-handoffs",
           "Signal handoffs", "Send a signal before pushing the buffer",
           DEFAULT_SIGNAL_HANDOFFS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  g_object_class_install_property (gobject_class, PROP_DROP_ALLOCATION,
+      g_param_spec_boolean ("drop-allocation", "Drop allocation query",
+          "Don't forward allocation queries", DEFAULT_DROP_ALLOCATION,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  /**
+   * GstIdentity:eos-after
+   *
+   * EOS after N buffers.
+   *
+   * Since: 1.16
+   **/
+  g_object_class_install_property (gobject_class, PROP_EOS_AFTER,
+      g_param_spec_int ("eos-after", "EOS After", "EOS after N buffers",
+          -1, G_MAXINT, DEFAULT_EOS_AFTER,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   /**
    * GstIdentity::handoff:
    * @identity: the identity instance
@@ -223,19 +262,17 @@ gst_identity_class_init (GstIdentityClass * klass)
   gst_identity_signals[SIGNAL_HANDOFF] =
       g_signal_new ("handoff", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
       G_STRUCT_OFFSET (GstIdentityClass, handoff), NULL, NULL,
-      g_cclosure_marshal_VOID__BOXED, G_TYPE_NONE, 1,
+      g_cclosure_marshal_generic, G_TYPE_NONE, 1,
       GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
 
   gobject_class->finalize = gst_identity_finalize;
 
-  gst_element_class_set_details_simple (gstelement_class,
+  gst_element_class_set_static_metadata (gstelement_class,
       "Identity",
       "Generic",
       "Pass data without modification", "Erik Walthinsen <omega@cse.ogi.edu>");
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&srctemplate));
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&sinktemplate));
+  gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
+  gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
 
   gstelement_class->change_state =
       GST_DEBUG_FUNCPTR (gst_identity_change_state);
@@ -245,6 +282,9 @@ gst_identity_class_init (GstIdentityClass * klass)
       GST_DEBUG_FUNCPTR (gst_identity_transform_ip);
   gstbasetrans_class->start = GST_DEBUG_FUNCPTR (gst_identity_start);
   gstbasetrans_class->stop = GST_DEBUG_FUNCPTR (gst_identity_stop);
+  gstbasetrans_class->accept_caps =
+      GST_DEBUG_FUNCPTR (gst_identity_accept_caps);
+  gstbasetrans_class->query = gst_identity_query;
 }
 
 static void
@@ -252,17 +292,22 @@ gst_identity_init (GstIdentity * identity)
 {
   identity->sleep_time = DEFAULT_SLEEP_TIME;
   identity->error_after = DEFAULT_ERROR_AFTER;
+  identity->error_after_counter = DEFAULT_ERROR_AFTER;
   identity->drop_probability = DEFAULT_DROP_PROBABILITY;
+  identity->drop_buffer_flags = DEFAULT_DROP_BUFFER_FLAGS;
   identity->datarate = DEFAULT_DATARATE;
   identity->silent = DEFAULT_SILENT;
   identity->single_segment = DEFAULT_SINGLE_SEGMENT;
   identity->sync = DEFAULT_SYNC;
-  identity->check_perfect = DEFAULT_CHECK_PERFECT;
   identity->check_imperfect_timestamp = DEFAULT_CHECK_IMPERFECT_TIMESTAMP;
   identity->check_imperfect_offset = DEFAULT_CHECK_IMPERFECT_OFFSET;
   identity->dump = DEFAULT_DUMP;
   identity->last_message = NULL;
   identity->signal_handoffs = DEFAULT_SIGNAL_HANDOFFS;
+  identity->ts_offset = DEFAULT_TS_OFFSET;
+  g_cond_init (&identity->blocked_cond);
+  identity->eos_after = DEFAULT_EOS_AFTER;
+  identity->eos_after_counter = DEFAULT_EOS_AFTER;
 
   gst_base_transform_set_gap_aware (GST_BASE_TRANSFORM_CAST (identity), TRUE);
 }
@@ -273,6 +318,66 @@ gst_identity_notify_last_message (GstIdentity * identity)
   g_object_notify_by_pspec ((GObject *) identity, pspec_last_message);
 }
 
+static GstFlowReturn
+gst_identity_do_sync (GstIdentity * identity, GstClockTime running_time)
+{
+  GstFlowReturn ret = GST_FLOW_OK;
+
+  if (identity->sync &&
+      GST_BASE_TRANSFORM_CAST (identity)->segment.format == GST_FORMAT_TIME) {
+    GstClock *clock;
+
+    GST_OBJECT_LOCK (identity);
+
+    if (identity->flushing) {
+      GST_OBJECT_UNLOCK (identity);
+      return GST_FLOW_FLUSHING;
+    }
+
+    while (identity->blocked)
+      g_cond_wait (&identity->blocked_cond, GST_OBJECT_GET_LOCK (identity));
+
+    if (identity->flushing) {
+      GST_OBJECT_UNLOCK (identity);
+      return GST_FLOW_FLUSHING;
+    }
+
+    if ((clock = GST_ELEMENT (identity)->clock)) {
+      GstClockReturn cret;
+      GstClockTime timestamp;
+      GstClockTimeDiff ts_offset = identity->ts_offset;
+
+      timestamp = running_time + GST_ELEMENT (identity)->base_time +
+          identity->upstream_latency;
+      if (ts_offset < 0) {
+        ts_offset = -ts_offset;
+        if (ts_offset < timestamp)
+          timestamp -= ts_offset;
+        else
+          timestamp = 0;
+      } else
+        timestamp += ts_offset;
+
+      /* save id if we need to unlock */
+      identity->clock_id = gst_clock_new_single_shot_id (clock, timestamp);
+      GST_OBJECT_UNLOCK (identity);
+
+      cret = gst_clock_id_wait (identity->clock_id, NULL);
+
+      GST_OBJECT_LOCK (identity);
+      if (identity->clock_id) {
+        gst_clock_id_unref (identity->clock_id);
+        identity->clock_id = NULL;
+      }
+      if (cret == GST_CLOCK_UNSCHEDULED || identity->flushing)
+        ret = GST_FLOW_FLUSHING;
+    }
+    GST_OBJECT_UNLOCK (identity);
+  }
+
+  return ret;
+}
+
 static gboolean
 gst_identity_sink_event (GstBaseTransform * trans, GstEvent * event)
 {
@@ -306,7 +411,7 @@ gst_identity_sink_event (GstBaseTransform * trans, GstEvent * event)
   }
 
   if (identity->single_segment && (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT)) {
-    if (trans->have_segment == FALSE) {
+    if (!trans->have_segment) {
       GstEvent *news;
       GstSegment segment;
 
@@ -319,10 +424,32 @@ gst_identity_sink_event (GstBaseTransform * trans, GstEvent * event)
       news = gst_event_new_segment (&segment);
 
       gst_pad_event_default (trans->sinkpad, GST_OBJECT_CAST (trans), news);
+    } else {
+      /* need to track segment for proper running time */
+      gst_event_copy_segment (event, &trans->segment);
+    }
+  }
+
+  if (GST_EVENT_TYPE (event) == GST_EVENT_GAP &&
+      trans->have_segment && trans->segment.format == GST_FORMAT_TIME) {
+    GstClockTime start, dur;
+
+    gst_event_parse_gap (event, &start, &dur);
+    if (GST_CLOCK_TIME_IS_VALID (start)) {
+      start = gst_segment_to_running_time (&trans->segment,
+          GST_FORMAT_TIME, start);
+
+      gst_identity_do_sync (identity, start);
+
+      /* also transform GAP timestamp similar to buffer timestamps */
+      if (identity->single_segment) {
+        gst_event_unref (event);
+        event = gst_event_new_gap (start, dur);
+      }
     }
   }
 
-  /* Reset previous timestamp, duration and offsets on NEWSEGMENT
+  /* Reset previous timestamp, duration and offsets on SEGMENT
    * to prevent false warnings when checking for perfect streams */
   if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
     identity->prev_timestamp = identity->prev_duration = GST_CLOCK_TIME_NONE;
@@ -336,13 +463,16 @@ gst_identity_sink_event (GstBaseTransform * trans, GstEvent * event)
   } else {
     if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_START) {
       GST_OBJECT_LOCK (identity);
+      identity->flushing = TRUE;
       if (identity->clock_id) {
         GST_DEBUG_OBJECT (identity, "unlock clock wait");
         gst_clock_id_unschedule (identity->clock_id);
-        gst_clock_id_unref (identity->clock_id);
-        identity->clock_id = NULL;
       }
       GST_OBJECT_UNLOCK (identity);
+    } else if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP) {
+      GST_OBJECT_LOCK (identity);
+      identity->flushing = FALSE;
+      GST_OBJECT_UNLOCK (identity);
     }
 
     ret = GST_BASE_TRANSFORM_CLASS (parent_class)->sink_event (trans, event);
@@ -352,51 +482,6 @@ gst_identity_sink_event (GstBaseTransform * trans, GstEvent * event)
 }
 
 static void
-gst_identity_check_perfect (GstIdentity * identity, GstBuffer * buf)
-{
-  GstClockTime timestamp;
-
-  timestamp = GST_BUFFER_TIMESTAMP (buf);
-
-  /* see if we need to do perfect stream checking */
-  /* invalid timestamp drops us out of check.  FIXME: maybe warn ? */
-  if (timestamp != GST_CLOCK_TIME_NONE) {
-    /* check if we had a previous buffer to compare to */
-    if (identity->prev_timestamp != GST_CLOCK_TIME_NONE &&
-        identity->prev_duration != GST_CLOCK_TIME_NONE) {
-      guint64 offset, t_expected;
-      gint64 dt;
-
-      t_expected = identity->prev_timestamp + identity->prev_duration;
-      dt = timestamp - t_expected;
-      if (dt != 0) {
-        GST_WARNING_OBJECT (identity,
-            "Buffer not time-contiguous with previous one: " "prev ts %"
-            GST_TIME_FORMAT ", prev dur %" GST_TIME_FORMAT ", new ts %"
-            GST_TIME_FORMAT " (expected ts %" GST_TIME_FORMAT ", delta=%c%"
-            GST_TIME_FORMAT ")", GST_TIME_ARGS (identity->prev_timestamp),
-            GST_TIME_ARGS (identity->prev_duration), GST_TIME_ARGS (timestamp),
-            GST_TIME_ARGS (t_expected), (dt < 0) ? '-' : '+',
-            GST_TIME_ARGS ((dt < 0) ? (GstClockTime) (-dt) : dt));
-      }
-
-      offset = GST_BUFFER_OFFSET (buf);
-      if (identity->prev_offset_end != offset &&
-          identity->prev_offset_end != GST_BUFFER_OFFSET_NONE &&
-          offset != GST_BUFFER_OFFSET_NONE) {
-        GST_WARNING_OBJECT (identity,
-            "Buffer not data-contiguous with previous one: "
-            "prev offset_end %" G_GINT64_FORMAT ", new offset %"
-            G_GINT64_FORMAT, identity->prev_offset_end, offset);
-      }
-    } else {
-      GST_DEBUG_OBJECT (identity, "can't check time-contiguity, no timestamp "
-          "and/or duration were set on previous buffer");
-    }
-  }
-}
-
-static void
 gst_identity_check_imperfect_timestamp (GstIdentity * identity, GstBuffer * buf)
 {
   GstClockTime timestamp = GST_BUFFER_TIMESTAMP (buf);
@@ -415,6 +500,7 @@ gst_identity_check_imperfect_timestamp (GstIdentity * identity, GstBuffer * buf)
         /*
          * "imperfect-timestamp" bus message:
          * @identity:        the identity instance
+         * @delta:           the GST_CLOCK_DIFF to the prev timestamp
          * @prev-timestamp:  the previous buffer timestamp
          * @prev-duration:   the previous buffer duration
          * @prev-offset:     the previous buffer offset
@@ -431,6 +517,7 @@ gst_identity_check_imperfect_timestamp (GstIdentity * identity, GstBuffer * buf)
         gst_element_post_message (GST_ELEMENT (identity),
             gst_message_new_element (GST_OBJECT (identity),
                 gst_structure_new ("imperfect-timestamp",
+                    "delta", G_TYPE_INT64, dt,
                     "prev-timestamp", G_TYPE_UINT64,
                     identity->prev_timestamp, "prev-duration", G_TYPE_UINT64,
                     identity->prev_duration, "prev-offset", G_TYPE_UINT64,
@@ -506,40 +593,25 @@ gst_identity_update_last_message_for_buffer (GstIdentity * identity,
     const gchar * action, GstBuffer * buf, gsize size)
 {
   gchar dts_str[64], pts_str[64], dur_str[64];
-  gchar flag_str[100];
+  gchar *flag_str, *meta_str;
 
   GST_OBJECT_LOCK (identity);
 
-  {
-    const char *flag_list[15] = {
-      "", "", "", "", "live", "decode-only", "discont", "resync", "corrupted",
-      "marker", "header", "gap", "droppable", "delta-unit", "in-caps"
-    };
-    int i;
-    char *end = flag_str;
-    end[0] = '\0';
-    for (i = 0; i < G_N_ELEMENTS (flag_list); i++) {
-      if (GST_MINI_OBJECT_CAST (buf)->flags & (1 << i)) {
-        strcpy (end, flag_list[i]);
-        end += strlen (end);
-        end[0] = ' ';
-        end[1] = '\0';
-        end++;
-      }
-    }
-  }
+  flag_str = gst_buffer_get_flags_string (buf);
+  meta_str = gst_buffer_get_meta_string (buf);
 
   g_free (identity->last_message);
   identity->last_message = g_strdup_printf ("%s   ******* (%s:%s) "
-      "(%" G_GSIZE_FORMAT " bytes, dts: %s, pts:%s, duration: %s, offset: %"
+      "(%" G_GSIZE_FORMAT " bytes, dts: %s, pts: %s, duration: %s, offset: %"
       G_GINT64_FORMAT ", " "offset_end: % " G_GINT64_FORMAT
-      ", flags: %d %s) %p", action,
+      ", flags: %08x %s, meta: %s) %p", action,
       GST_DEBUG_PAD_NAME (GST_BASE_TRANSFORM_CAST (identity)->sinkpad), size,
       print_pretty_time (dts_str, sizeof (dts_str), GST_BUFFER_DTS (buf)),
       print_pretty_time (pts_str, sizeof (pts_str), GST_BUFFER_PTS (buf)),
       print_pretty_time (dur_str, sizeof (dur_str), GST_BUFFER_DURATION (buf)),
       GST_BUFFER_OFFSET (buf), GST_BUFFER_OFFSET_END (buf),
-      GST_BUFFER_FLAGS (buf), flag_str, buf);
+      GST_BUFFER_FLAGS (buf), flag_str, meta_str ? meta_str : "none", buf);
+  g_free (flag_str);
 
   GST_OBJECT_UNLOCK (identity);
 
@@ -551,13 +623,13 @@ gst_identity_transform_ip (GstBaseTransform * trans, GstBuffer * buf)
 {
   GstFlowReturn ret = GST_FLOW_OK;
   GstIdentity *identity = GST_IDENTITY (trans);
-  GstClockTime runtimestamp = G_GINT64_CONSTANT (0);
+  GstClockTime rundts = GST_CLOCK_TIME_NONE;
+  GstClockTime runpts = GST_CLOCK_TIME_NONE;
+  GstClockTime ts, duration, runtimestamp;
   gsize size;
 
   size = gst_buffer_get_size (buf);
 
-  if (identity->check_perfect)
-    gst_identity_check_perfect (identity, buf);
   if (identity->check_imperfect_timestamp)
     gst_identity_check_imperfect_timestamp (identity, buf);
   if (identity->check_imperfect_offset)
@@ -569,23 +641,33 @@ gst_identity_transform_ip (GstBaseTransform * trans, GstBuffer * buf)
   identity->prev_offset_end = GST_BUFFER_OFFSET_END (buf);
   identity->prev_offset = GST_BUFFER_OFFSET (buf);
 
-  if (identity->error_after >= 0) {
-    identity->error_after--;
-    if (identity->error_after == 0)
+  if (identity->error_after_counter >= 0) {
+    identity->error_after_counter--;
+    if (identity->error_after_counter == 0)
       goto error_after;
   }
 
+  if (identity->eos_after_counter >= 0) {
+    identity->eos_after_counter--;
+    if (identity->eos_after_counter == 0)
+      goto eos_after;
+  }
+
   if (identity->drop_probability > 0.0) {
     if ((gfloat) (1.0 * rand () / (RAND_MAX)) < identity->drop_probability)
       goto dropped;
   }
 
+  if (GST_BUFFER_FLAG_IS_SET (buf, identity->drop_buffer_flags))
+    goto dropped;
+
   if (identity->dump) {
     GstMapInfo info;
 
-    gst_buffer_map (buf, &info, GST_MAP_READ);
-    gst_util_dump_mem (info.data, info.size);
-    gst_buffer_unmap (buf, &info);
+    if (gst_buffer_map (buf, &info, GST_MAP_READ)) {
+      gst_util_dump_mem (info.data, info.size);
+      gst_buffer_unmap (buf, &info);
+    }
   }
 
   if (!identity->silent) {
@@ -596,44 +678,28 @@ gst_identity_transform_ip (GstBaseTransform * trans, GstBuffer * buf)
     GstClockTime time = gst_util_uint64_scale_int (identity->offset,
         GST_SECOND, identity->datarate);
 
-    GST_BUFFER_TIMESTAMP (buf) = time;
+    GST_BUFFER_PTS (buf) = GST_BUFFER_DTS (buf) = time;
     GST_BUFFER_DURATION (buf) = size * GST_SECOND / identity->datarate;
   }
 
   if (identity->signal_handoffs)
     g_signal_emit (identity, gst_identity_signals[SIGNAL_HANDOFF], 0, buf);
 
-  if (trans->segment.format == GST_FORMAT_TIME)
-    runtimestamp = gst_segment_to_running_time (&trans->segment,
-        GST_FORMAT_TIME, GST_BUFFER_TIMESTAMP (buf));
-
-  if ((identity->sync) && (trans->segment.format == GST_FORMAT_TIME)) {
-    GstClock *clock;
-
-    GST_OBJECT_LOCK (identity);
-    if ((clock = GST_ELEMENT (identity)->clock)) {
-      GstClockReturn cret;
-      GstClockTime timestamp;
-
-      timestamp = runtimestamp + GST_ELEMENT (identity)->base_time;
-
-      /* save id if we need to unlock */
-      identity->clock_id = gst_clock_new_single_shot_id (clock, timestamp);
-      GST_OBJECT_UNLOCK (identity);
-
-      cret = gst_clock_id_wait (identity->clock_id, NULL);
-
-      GST_OBJECT_LOCK (identity);
-      if (identity->clock_id) {
-        gst_clock_id_unref (identity->clock_id);
-        identity->clock_id = NULL;
-      }
-      if (cret == GST_CLOCK_UNSCHEDULED)
-        ret = GST_FLOW_EOS;
-    }
-    GST_OBJECT_UNLOCK (identity);
+  if (trans->segment.format == GST_FORMAT_TIME) {
+    rundts = gst_segment_to_running_time (&trans->segment,
+        GST_FORMAT_TIME, GST_BUFFER_DTS (buf));
+    runpts = gst_segment_to_running_time (&trans->segment,
+        GST_FORMAT_TIME, GST_BUFFER_PTS (buf));
   }
 
+  if (GST_CLOCK_TIME_IS_VALID (rundts))
+    runtimestamp = rundts;
+  else if (GST_CLOCK_TIME_IS_VALID (runpts))
+    runtimestamp = runpts;
+  else
+    runtimestamp = 0;
+  ret = gst_identity_do_sync (identity, runtimestamp);
+
   identity->offset += size;
 
   if (identity->sleep_time && ret == GST_FLOW_OK)
@@ -641,7 +707,8 @@ gst_identity_transform_ip (GstBaseTransform * trans, GstBuffer * buf)
 
   if (identity->single_segment && (trans->segment.format == GST_FORMAT_TIME)
       && (ret == GST_FLOW_OK)) {
-    GST_BUFFER_TIMESTAMP (buf) = runtimestamp;
+    GST_BUFFER_DTS (buf) = rundts;
+    GST_BUFFER_PTS (buf) = runpts;
     GST_BUFFER_OFFSET (buf) = GST_CLOCK_TIME_NONE;
     GST_BUFFER_OFFSET_END (buf) = GST_CLOCK_TIME_NONE;
   }
@@ -655,12 +722,25 @@ error_after:
         (_("Failed after iterations as requested.")), (NULL));
     return GST_FLOW_ERROR;
   }
+eos_after:
+  {
+    GST_DEBUG_OBJECT (identity, "EOS after iterations as requested.");
+    return GST_FLOW_EOS;
+  }
 dropped:
   {
     if (!identity->silent) {
       gst_identity_update_last_message_for_buffer (identity, "dropping", buf,
           size);
     }
+
+    ts = GST_BUFFER_TIMESTAMP (buf);
+    if (GST_CLOCK_TIME_IS_VALID (ts)) {
+      duration = GST_BUFFER_DURATION (buf);
+      gst_pad_push_event (GST_BASE_TRANSFORM_SRC_PAD (identity),
+          gst_event_new_gap (ts, duration));
+    }
+
     /* return DROPPED to basetransform. */
     return GST_BASE_TRANSFORM_FLOW_DROPPED;
   }
@@ -693,14 +773,17 @@ gst_identity_set_property (GObject * object, guint prop_id,
     case PROP_DROP_PROBABILITY:
       identity->drop_probability = g_value_get_float (value);
       break;
+    case PROP_DROP_BUFFER_FLAGS:
+      identity->drop_buffer_flags = g_value_get_flags (value);
+      break;
     case PROP_DATARATE:
       identity->datarate = g_value_get_int (value);
       break;
     case PROP_SYNC:
       identity->sync = g_value_get_boolean (value);
       break;
-    case PROP_CHECK_PERFECT:
-      identity->check_perfect = g_value_get_boolean (value);
+    case PROP_TS_OFFSET:
+      identity->ts_offset = g_value_get_int64 (value);
       break;
     case PROP_CHECK_IMPERFECT_TIMESTAMP:
       identity->check_imperfect_timestamp = g_value_get_boolean (value);
@@ -711,6 +794,12 @@ gst_identity_set_property (GObject * object, guint prop_id,
     case PROP_SIGNAL_HANDOFFS:
       identity->signal_handoffs = g_value_get_boolean (value);
       break;
+    case PROP_DROP_ALLOCATION:
+      identity->drop_allocation = g_value_get_boolean (value);
+      break;
+    case PROP_EOS_AFTER:
+      identity->eos_after = g_value_get_int (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -739,6 +828,9 @@ gst_identity_get_property (GObject * object, guint prop_id, GValue * value,
     case PROP_DROP_PROBABILITY:
       g_value_set_float (value, identity->drop_probability);
       break;
+    case PROP_DROP_BUFFER_FLAGS:
+      g_value_set_flags (value, identity->drop_buffer_flags);
+      break;
     case PROP_DATARATE:
       g_value_set_int (value, identity->datarate);
       break;
@@ -759,8 +851,8 @@ gst_identity_get_property (GObject * object, guint prop_id, GValue * value,
     case PROP_SYNC:
       g_value_set_boolean (value, identity->sync);
       break;
-    case PROP_CHECK_PERFECT:
-      g_value_set_boolean (value, identity->check_perfect);
+    case PROP_TS_OFFSET:
+      g_value_set_int64 (value, identity->ts_offset);
       break;
     case PROP_CHECK_IMPERFECT_TIMESTAMP:
       g_value_set_boolean (value, identity->check_imperfect_timestamp);
@@ -771,6 +863,12 @@ gst_identity_get_property (GObject * object, guint prop_id, GValue * value,
     case PROP_SIGNAL_HANDOFFS:
       g_value_set_boolean (value, identity->signal_handoffs);
       break;
+    case PROP_DROP_ALLOCATION:
+      g_value_set_boolean (value, identity->drop_allocation);
+      break;
+    case PROP_EOS_AFTER:
+      g_value_set_int (value, identity->eos_after);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -784,13 +882,27 @@ gst_identity_start (GstBaseTransform * trans)
 
   identity = GST_IDENTITY (trans);
 
+  if (identity->eos_after != DEFAULT_EOS_AFTER
+      && identity->error_after != DEFAULT_ERROR_AFTER)
+    goto both_afters_defined;
+
   identity->offset = 0;
   identity->prev_timestamp = GST_CLOCK_TIME_NONE;
   identity->prev_duration = GST_CLOCK_TIME_NONE;
   identity->prev_offset_end = GST_BUFFER_OFFSET_NONE;
   identity->prev_offset = GST_BUFFER_OFFSET_NONE;
+  identity->error_after_counter = identity->error_after;
+  identity->eos_after_counter = identity->eos_after;
 
   return TRUE;
+
+  /* ERROR */
+both_afters_defined:
+  {
+    GST_ELEMENT_ERROR (identity, CORE, FAILED,
+        (_("eos-after and error-after can't both be defined.")), (NULL));
+    return FALSE;
+  }
 }
 
 static gboolean
@@ -808,27 +920,105 @@ gst_identity_stop (GstBaseTransform * trans)
   return TRUE;
 }
 
+static gboolean
+gst_identity_accept_caps (GstBaseTransform * base,
+    GstPadDirection direction, GstCaps * caps)
+{
+  gboolean ret;
+  GstPad *pad;
+
+  /* Proxy accept-caps */
+
+  if (direction == GST_PAD_SRC)
+    pad = GST_BASE_TRANSFORM_SINK_PAD (base);
+  else
+    pad = GST_BASE_TRANSFORM_SRC_PAD (base);
+
+  ret = gst_pad_peer_query_accept_caps (pad, caps);
+
+  return ret;
+}
+
+static gboolean
+gst_identity_query (GstBaseTransform * base, GstPadDirection direction,
+    GstQuery * query)
+{
+  GstIdentity *identity;
+  gboolean ret;
+
+  identity = GST_IDENTITY (base);
+
+  if (GST_QUERY_TYPE (query) == GST_QUERY_ALLOCATION &&
+      identity->drop_allocation) {
+    GST_DEBUG_OBJECT (identity, "Dropping allocation query.");
+    return FALSE;
+  }
+
+  ret = GST_BASE_TRANSFORM_CLASS (parent_class)->query (base, direction, query);
+
+  if (GST_QUERY_TYPE (query) == GST_QUERY_LATENCY) {
+    gboolean live = FALSE;
+    GstClockTime min = 0, max = 0;
+
+    if (ret) {
+      gst_query_parse_latency (query, &live, &min, &max);
+
+      if (identity->sync && max < min) {
+        GST_ELEMENT_WARNING (base, CORE, CLOCK, (NULL),
+            ("Impossible to configure latency before identity sync=true:"
+                " max %" GST_TIME_FORMAT " < min %"
+                GST_TIME_FORMAT ". Add queues or other buffering elements.",
+                GST_TIME_ARGS (max), GST_TIME_ARGS (min)));
+      }
+    }
+
+    /* Ignore the upstream latency if it is not live */
+    GST_OBJECT_LOCK (identity);
+    if (live)
+      identity->upstream_latency = min;
+    else
+      identity->upstream_latency = 0;
+    GST_OBJECT_UNLOCK (identity);
+
+    gst_query_set_latency (query, live || identity->sync, min, max);
+    ret = TRUE;
+  }
+  return ret;
+}
+
 static GstStateChangeReturn
 gst_identity_change_state (GstElement * element, GstStateChange transition)
 {
   GstStateChangeReturn ret;
   GstIdentity *identity = GST_IDENTITY (element);
+  gboolean no_preroll = FALSE;
 
   switch (transition) {
     case GST_STATE_CHANGE_NULL_TO_READY:
       break;
     case GST_STATE_CHANGE_READY_TO_PAUSED:
+      GST_OBJECT_LOCK (identity);
+      identity->flushing = FALSE;
+      identity->blocked = TRUE;
+      GST_OBJECT_UNLOCK (identity);
+      if (identity->sync)
+        no_preroll = TRUE;
       break;
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+      GST_OBJECT_LOCK (identity);
+      identity->blocked = FALSE;
+      g_cond_broadcast (&identity->blocked_cond);
+      GST_OBJECT_UNLOCK (identity);
       break;
     case GST_STATE_CHANGE_PAUSED_TO_READY:
       GST_OBJECT_LOCK (identity);
+      identity->flushing = TRUE;
       if (identity->clock_id) {
         GST_DEBUG_OBJECT (identity, "unlock clock wait");
         gst_clock_id_unschedule (identity->clock_id);
-        gst_clock_id_unref (identity->clock_id);
-        identity->clock_id = NULL;
       }
+      identity->blocked = FALSE;
+      g_cond_broadcast (&identity->blocked_cond);
       GST_OBJECT_UNLOCK (identity);
       break;
     default:
@@ -839,6 +1029,12 @@ gst_identity_change_state (GstElement * element, GstStateChange transition)
 
   switch (transition) {
     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+      GST_OBJECT_LOCK (identity);
+      identity->upstream_latency = 0;
+      identity->blocked = TRUE;
+      GST_OBJECT_UNLOCK (identity);
+      if (identity->sync)
+        no_preroll = TRUE;
       break;
     case GST_STATE_CHANGE_PAUSED_TO_READY:
       break;
@@ -848,5 +1044,8 @@ gst_identity_change_state (GstElement * element, GstStateChange transition)
       break;
   }
 
+  if (no_preroll && ret == GST_STATE_CHANGE_SUCCESS)
+    ret = GST_STATE_CHANGE_NO_PREROLL;
+
   return ret;
 }