gst/rtpmanager/gstrtpbin.*: Add signal to notify listeners when a sender becomes...
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
index 08a55f2..d48bc40 100644 (file)
 
 /**
  * SECTION:element-gstrtpjitterbuffer
- * @short_description: buffer, reorder and remove duplicate RTP packets to
- * compensate for network oddities.
  *
- * <refsect2>
- * <para>
  * This element reorders and removes duplicate RTP packets as they are received
  * from a network source. It will also wait for missing packets up to a
- * configurable time limit using the ::latency property. Packets arriving too
- * late are considered to be lost packets.
- * </para>
- * <para>
- * This element acts as a live element and so adds ::latency to the pipeline.
- * </para>
- * <para>
+ * configurable time limit using the #GstRtpJitterBuffer:latency property.
+ * Packets arriving too late are considered to be lost packets.
+ * 
+ * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
+ * to the pipeline.
+ * 
  * The element needs the clock-rate of the RTP payload in order to estimate the
  * delay. This information is obtained either from the caps on the sink pad or,
- * when no caps are present, from the ::request-pt-map signal. To clear the
- * previous pt-map use the ::clear-pt-map signal.
- * </para>
- * <para>
+ * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
+ * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
+ * 
  * This element will automatically be used inside gstrtpbin.
- * </para>
+ * 
+ * <refsect2>
  * <title>Example pipelines</title>
- * <para>
- * <programlisting>
+ * |[
  * gst-launch rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! gstrtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
- * </programlisting>
- * Connect to a streaming server and decode the MPEG video. The jitterbuffer is
+ * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
  * inserted into the pipeline to smooth out network jitter and to reorder the
  * out-of-order RTP packets.
- * </para>
  * </refsect2>
  *
  * Last reviewed on 2007-05-28 (0.10.5)
@@ -73,6 +65,7 @@
 
 #include "gstrtpjitterbuffer.h"
 #include "rtpjitterbuffer.h"
+#include "rtpstats.h"
 
 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
@@ -100,20 +93,23 @@ enum
 #define DEFAULT_LATENCY_MS      200
 #define DEFAULT_DROP_ON_LATENCY FALSE
 #define DEFAULT_TS_OFFSET       0
+#define DEFAULT_DO_LOST         FALSE
 
 enum
 {
   PROP_0,
   PROP_LATENCY,
   PROP_DROP_ON_LATENCY,
-  PROP_TS_OFFSET
+  PROP_TS_OFFSET,
+  PROP_DO_LOST,
+  PROP_LAST
 };
 
 #define JBUF_LOCK(priv)   (g_mutex_lock ((priv)->jbuf_lock))
 
 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
   JBUF_LOCK (priv);                                   \
-  if (priv->srcresult != GST_FLOW_OK)                 \
+  if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
     goto label;                                       \
 } G_STMT_END
 
@@ -122,7 +118,7 @@ enum
 
 #define JBUF_WAIT_CHECK(priv,label) G_STMT_START {    \
   JBUF_WAIT(priv);                                    \
-  if (priv->srcresult != GST_FLOW_OK)                 \
+  if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
     goto label;                                       \
 } G_STMT_END
 
@@ -135,24 +131,29 @@ struct _GstRtpJitterBufferPrivate
   RTPJitterBuffer *jbuf;
   GMutex *jbuf_lock;
   GCond *jbuf_cond;
+  gboolean waiting;
+  gboolean discont;
 
   /* properties */
   guint latency_ms;
   gboolean drop_on_latency;
   gint64 ts_offset;
+  gboolean do_lost;
 
   /* the last seqnum we pushed out */
   guint32 last_popped_seqnum;
   /* the next expected seqnum */
   guint32 next_seqnum;
+  /* last output time */
+  GstClockTime last_out_time;
 
   /* state */
   gboolean eos;
 
   /* clock rate and rtp timestamp offset */
+  gint last_pt;
   gint32 clock_rate;
   gint64 clock_base;
-  guint64 exttimestamp;
   gint64 prev_ts_offset;
 
   /* when we are shutting down */
@@ -162,7 +163,6 @@ struct _GstRtpJitterBufferPrivate
   /* for sync */
   GstSegment segment;
   GstClockID clock_id;
-  guint32 waiting_seqnum;
   /* the latency of the upstream peer, we have to take this into account when
    * synchronizing the buffers. */
   GstClockTime peer_latency;
@@ -208,7 +208,7 @@ static void gst_rtp_jitter_buffer_set_property (GObject * object,
     guint prop_id, const GValue * value, GParamSpec * pspec);
 static void gst_rtp_jitter_buffer_get_property (GObject * object,
     guint prop_id, GValue * value, GParamSpec * pspec);
-static void gst_rtp_jitter_buffer_dispose (GObject * object);
+static void gst_rtp_jitter_buffer_finalize (GObject * object);
 
 /* element overrides */
 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
@@ -219,6 +219,8 @@ static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad);
 
 /* sinkpad overrides */
 static gboolean gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps);
+static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
+    GstEvent * event);
 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
     GstEvent * event);
 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
@@ -256,7 +258,7 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
 
   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
 
-  gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_dispose);
+  gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_finalize);
 
   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
@@ -284,13 +286,25 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
   /**
    * GstRtpJitterBuffer::ts-offset:
    * 
-   * Adjust RTP timestamps in the jitterbuffer with offset.
+   * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
+   * This is mainly used to ensure interstream synchronisation.
    */
   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
-      g_param_spec_int64 ("ts-offset",
-          "Timestamp Offset",
-          "Adjust buffer RTP timestamps with offset in nanoseconds", G_MININT64,
-          G_MAXINT64, DEFAULT_TS_OFFSET, G_PARAM_READWRITE));
+      g_param_spec_int64 ("ts-offset", "Timestamp Offset",
+          "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
+          G_MAXINT64, DEFAULT_TS_OFFSET,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  /**
+   * GstRtpJitterBuffer::do-lost:
+   * 
+   * Send out a GstRTPPacketLost event downstream when a packet is considered
+   * lost.
+   */
+  g_object_class_install_property (gobject_class, PROP_DO_LOST,
+      g_param_spec_boolean ("do-lost", "Do Lost",
+          "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   /**
    * GstRtpJitterBuffer::request-pt-map:
    * @buffer: the object which received the signal
@@ -307,7 +321,8 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
    * GstRtpJitterBuffer::clear-pt-map:
    * @buffer: the object which received the signal
    *
-   * Invalidate the clock-rate as obtained with the ::request-pt-map signal.
+   * Invalidate the clock-rate as obtained with the
+   * #GstRtpJitterBuffer::request-pt-map signal.
    */
   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
@@ -320,7 +335,7 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
 
   GST_DEBUG_CATEGORY_INIT
-      (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
+      (rtpjitterbuffer_debug, "gstrtpjitterbuffer", 0, "RTP Jitter Buffer");
 }
 
 static void
@@ -334,13 +349,12 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer,
 
   priv->latency_ms = DEFAULT_LATENCY_MS;
   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
+  priv->do_lost = DEFAULT_DO_LOST;
 
   priv->jbuf = rtp_jitter_buffer_new ();
   priv->jbuf_lock = g_mutex_new ();
   priv->jbuf_cond = g_cond_new ();
 
-  priv->waiting_seqnum = -1;
-
   priv->srcpad =
       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
       "src");
@@ -351,6 +365,8 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer,
       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_query));
   gst_pad_set_getcaps_function (priv->srcpad,
       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_getcaps));
+  gst_pad_set_event_function (priv->srcpad,
+      GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
 
   priv->sinkpad =
       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
@@ -370,17 +386,18 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer,
 }
 
 static void
-gst_rtp_jitter_buffer_dispose (GObject * object)
+gst_rtp_jitter_buffer_finalize (GObject * object)
 {
   GstRtpJitterBuffer *jitterbuffer;
 
   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
-  if (jitterbuffer->priv->jbuf) {
-    g_object_unref (jitterbuffer->priv->jbuf);
-    jitterbuffer->priv->jbuf = NULL;
-  }
 
-  G_OBJECT_CLASS (parent_class)->dispose (object);
+  g_mutex_free (jitterbuffer->priv->jbuf_lock);
+  g_cond_free (jitterbuffer->priv->jbuf_cond);
+
+  g_object_unref (jitterbuffer->priv->jbuf);
+
+  G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
 static void
@@ -522,7 +539,6 @@ gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
   /* this unblocks any waiting pops on the src pad task */
   JBUF_SIGNAL (priv);
-  rtp_jitter_buffer_flush (priv->jbuf);
   /* unlock clock, we just unschedule, the entry will be released by the 
    * locking streaming thread. */
   if (priv->clock_id)
@@ -543,10 +559,12 @@ gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
   priv->srcresult = GST_FLOW_OK;
   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
   priv->last_popped_seqnum = -1;
+  priv->last_out_time = -1;
   priv->next_seqnum = -1;
   priv->clock_rate = -1;
   priv->eos = FALSE;
-  priv->exttimestamp = -1;
+  rtp_jitter_buffer_flush (priv->jbuf);
+  rtp_jitter_buffer_reset_skew (priv->jbuf);
   JBUF_UNLOCK (priv);
 }
 
@@ -601,9 +619,11 @@ gst_rtp_jitter_buffer_change_state (GstElement * element,
       priv->clock_rate = -1;
       priv->clock_base = -1;
       priv->peer_latency = 0;
+      priv->last_pt = -1;
       /* block until we go to PLAYING */
       priv->blocked = TRUE;
-      priv->exttimestamp = -1;
+      /* reset skew detection initialy */
+      rtp_jitter_buffer_reset_skew (priv->jbuf);
       JBUF_UNLOCK (priv);
       break;
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
@@ -645,19 +665,26 @@ gst_rtp_jitter_buffer_change_state (GstElement * element,
   return ret;
 }
 
-/**
- * Performs comparison 'b - a' with check for overflows.
- */
-static inline gint
-priv_compare_rtp_seq_lt (guint16 a, guint16 b)
+static gboolean
+gst_rtp_jitter_buffer_src_event (GstPad * pad, GstEvent * event)
 {
-  /* check if diff more than half of the 16bit range */
-  if (abs (b - a) > (1 << 15)) {
-    /* one of a/b has wrapped */
-    return a - b;
-  } else {
-    return b - a;
+  gboolean ret = TRUE;
+  GstRtpJitterBuffer *jitterbuffer;
+  GstRtpJitterBufferPrivate *priv;
+
+  jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
+  priv = jitterbuffer->priv;
+
+  GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
+
+  switch (GST_EVENT_TYPE (event)) {
+    default:
+      ret = gst_pad_push_event (priv->sinkpad, event);
+      break;
   }
+  gst_object_unref (jitterbuffer);
+
+  return ret;
 }
 
 static gboolean
@@ -770,12 +797,17 @@ gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
       &ret);
 
-  caps = (GstCaps *) g_value_get_boxed (&ret);
+  g_value_unset (&args[0]);
+  g_value_unset (&args[1]);
+  caps = (GstCaps *) g_value_dup_boxed (&ret);
+  g_value_unset (&ret);
   if (!caps)
     goto no_caps;
 
   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
 
+  gst_caps_unref (caps);
+
   return res;
 
   /* ERRORS */
@@ -793,38 +825,93 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
   GstRtpJitterBufferPrivate *priv;
   guint16 seqnum;
   GstFlowReturn ret = GST_FLOW_OK;
+  GstClockTime timestamp;
+  guint64 latency_ts;
+  gboolean tail;
 
   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
 
-  if (!gst_rtp_buffer_validate (buffer))
+  if (G_UNLIKELY (!gst_rtp_buffer_validate (buffer)))
     goto invalid_buffer;
 
   priv = jitterbuffer->priv;
 
-  if (priv->clock_rate == -1) {
+  if (G_UNLIKELY (priv->last_pt != gst_rtp_buffer_get_payload_type (buffer))) {
+    GstCaps *caps;
+
+    priv->last_pt = gst_rtp_buffer_get_payload_type (buffer);
+    /* reset clock-rate so that we get a new one */
+    priv->clock_rate = -1;
+    /* Try to get the clock-rate from the caps first if we can. If there are no
+     * caps we must fire the signal to get the clock-rate. */
+    if ((caps = GST_BUFFER_CAPS (buffer))) {
+      gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
+    }
+  }
+
+  if (G_UNLIKELY (priv->clock_rate == -1)) {
     guint8 pt;
 
     /* no clock rate given on the caps, try to get one with the signal */
     pt = gst_rtp_buffer_get_payload_type (buffer);
 
     gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer, pt);
-    if (priv->clock_rate == -1)
+    if (G_UNLIKELY (priv->clock_rate == -1))
       goto not_negotiated;
   }
 
+  /* take the timestamp of the buffer. This is the time when the packet was
+   * received and is used to calculate jitter and clock skew. We will adjust
+   * this timestamp with the smoothed value after processing it in the
+   * jitterbuffer. */
+  timestamp = GST_BUFFER_TIMESTAMP (buffer);
+  /* bring to running time */
+  timestamp = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME,
+      timestamp);
+
   seqnum = gst_rtp_buffer_get_seq (buffer);
-  GST_DEBUG_OBJECT (jitterbuffer, "Received packet #%d", seqnum);
+  GST_DEBUG_OBJECT (jitterbuffer,
+      "Received packet #%d at time %" GST_TIME_FORMAT, seqnum,
+      GST_TIME_ARGS (timestamp));
 
   JBUF_LOCK_CHECK (priv, out_flushing);
   /* don't accept more data on EOS */
-  if (priv->eos)
+  if (G_UNLIKELY (priv->eos))
     goto have_eos;
 
-  /* let's check if this buffer is too late, we cannot accept packets with
-   * bigger seqnum than the one we already pushed. */
-  if (priv->last_popped_seqnum != -1) {
-    if (priv_compare_rtp_seq_lt (priv->last_popped_seqnum, seqnum) < 0)
-      goto too_late;
+  /* let's check if this buffer is too late, we can only accept packets with
+   * bigger seqnum than the one we last pushed. */
+  if (G_LIKELY (priv->last_popped_seqnum != -1)) {
+    gint gap;
+    gboolean reset = FALSE;
+
+    gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
+
+    if (G_UNLIKELY (gap <= 0)) {
+      /* priv->last_popped_seqnum >= seqnum, this packet is too late or the
+       * sender might have been restarted with different seqnum. */
+      if (gap < -RTP_MAX_MISORDER) {
+        GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d", gap);
+        reset = TRUE;
+      } else {
+        goto too_late;
+      }
+    } else {
+      /* priv->last_popped_seqnum < seqnum, this is a new packet */
+      if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
+        GST_DEBUG_OBJECT (jitterbuffer, "reset: too many dropped packets %d",
+            gap);
+        reset = TRUE;
+      } else {
+        GST_DEBUG_OBJECT (jitterbuffer, "dropped packets %d but <= %d", gap,
+            RTP_MAX_DROPOUT);
+      }
+    }
+    if (G_UNLIKELY (reset)) {
+      priv->last_popped_seqnum = -1;
+      priv->next_seqnum = -1;
+      rtp_jitter_buffer_reset_skew (priv->jbuf);
+    }
   }
 
   /* let's drop oldest packet if the queue is already full and drop-on-latency
@@ -832,38 +919,43 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
    * latency is set, we just pump it in the queue and let the other end push it
    * out as fast as possible. */
   if (priv->latency_ms && priv->drop_on_latency) {
-    guint64 latency_ts;
 
     latency_ts =
         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
 
-    if (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts) {
+    if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
       GstBuffer *old_buf;
 
+      old_buf = rtp_jitter_buffer_pop (priv->jbuf);
+
       GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet #%d",
-          seqnum);
+          gst_rtp_buffer_get_seq (old_buf));
 
-      old_buf = rtp_jitter_buffer_pop (priv->jbuf);
       gst_buffer_unref (old_buf);
     }
   }
 
+  /* we need to make the metadata writable before pushing it in the jitterbuffer
+   * because the jitterbuffer will update the timestamp */
+  buffer = gst_buffer_make_metadata_writable (buffer);
+
   /* now insert the packet into the queue in sorted order. This function returns
    * FALSE if a packet with the same seqnum was already in the queue, meaning we
    * have a duplicate. */
-  if (!rtp_jitter_buffer_insert (priv->jbuf, buffer))
+  if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp,
+              priv->clock_rate, &tail)))
     goto duplicate;
 
-  /* signal addition of new buffer */
-  JBUF_SIGNAL (priv);
+  /* signal addition of new buffer when the _loop is waiting. */
+  if (priv->waiting)
+    JBUF_SIGNAL (priv);
 
   /* let's unschedule and unblock any waiting buffers. We only want to do this
-   * if there is a currently waiting newer (> seqnum) buffer  */
-  if (priv->clock_id) {
-    if (priv->waiting_seqnum > seqnum) {
-      gst_clock_id_unschedule (priv->clock_id);
-      GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting buffer");
-    }
+   * when the tail buffer changed */
+  if (G_UNLIKELY (priv->clock_id && tail)) {
+    GST_DEBUG_OBJECT (jitterbuffer,
+        "Unscheduling waiting buffer, new tail buffer");
+    gst_clock_id_unschedule (priv->clock_id);
   }
 
   GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets",
@@ -879,19 +971,19 @@ finished:
   /* ERRORS */
 invalid_buffer:
   {
-    /* this is fatal and should be filtered earlier */
-    GST_ELEMENT_ERROR (jitterbuffer, STREAM, DECODE, (NULL),
-        ("Received invalid RTP payload"));
+    /* this is not fatal but should be filtered earlier */
+    GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
+        ("Received invalid RTP payload, dropping"));
     gst_buffer_unref (buffer);
     gst_object_unref (jitterbuffer);
-    return GST_FLOW_ERROR;
+    return GST_FLOW_OK;
   }
 not_negotiated:
   {
     GST_WARNING_OBJECT (jitterbuffer, "No clock-rate in caps!");
     gst_buffer_unref (buffer);
     gst_object_unref (jitterbuffer);
-    return GST_FLOW_NOT_NEGOTIATED;
+    return GST_FLOW_OK;
   }
 out_flushing:
   {
@@ -925,61 +1017,105 @@ duplicate:
   }
 }
 
+static GstClockTime
+apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
+{
+  GstRtpJitterBufferPrivate *priv;
+
+  priv = jitterbuffer->priv;
+
+  if (timestamp == -1)
+    return -1;
+
+  /* apply the timestamp offset */
+  timestamp += priv->ts_offset;
+
+  return timestamp;
+}
+
 /**
  * This funcion will push out buffers on the source pad.
  *
  * For each pushed buffer, the seqnum is recorded, if the next buffer B has a
  * different seqnum (missing packets before B), this function will wait for the
- * missing packet to arrive up to the rtp timestamp of buffer B.
+ * missing packet to arrive up to the timestamp of buffer B.
  */
 static void
 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
 {
   GstRtpJitterBufferPrivate *priv;
-  GstBuffer *outbuf = NULL;
+  GstBuffer *outbuf;
   GstFlowReturn result;
   guint16 seqnum;
-  guint32 rtp_time;
-  GstClockTime timestamp;
-  gint64 running_time;
-  guint64 exttimestamp;
-  gint ts_offset_rtp;
+  guint32 next_seqnum;
+  GstClockTime timestamp, out_time;
+  gboolean discont = FALSE;
+  gint gap;
 
   priv = jitterbuffer->priv;
 
   JBUF_LOCK_CHECK (priv, flushing);
 again:
-  GST_DEBUG_OBJECT (jitterbuffer, "Popping item");
+  GST_DEBUG_OBJECT (jitterbuffer, "Peeking item");
   while (TRUE) {
-
     /* always wait if we are blocked */
-    if (!priv->blocked) {
-      /* if we have a packet, we can grab it */
+    if (G_LIKELY (!priv->blocked)) {
+      /* if we have a packet, we can exit the loop and grab it */
       if (rtp_jitter_buffer_num_packets (priv->jbuf) > 0)
         break;
       /* no packets but we are EOS, do eos logic */
-      if (priv->eos)
+      if (G_UNLIKELY (priv->eos))
         goto do_eos;
     }
-    /* wait for packets or flushing now */
+    /* underrun, wait for packets or flushing now */
+    priv->waiting = TRUE;
     JBUF_WAIT_CHECK (priv, flushing);
+    priv->waiting = FALSE;
   }
 
-  /* pop a buffer, we must have a buffer now */
-  outbuf = rtp_jitter_buffer_pop (priv->jbuf);
+  /* peek a buffer, we're just looking at the timestamp and the sequence number.
+   * If all is fine, we'll pop and push it. If the sequence number is wrong we
+   * wait on the timestamp. In the chain function we will unlock the wait when a
+   * new buffer is available. The peeked buffer is valid for as long as we hold
+   * the jitterbuffer lock. */
+  outbuf = rtp_jitter_buffer_peek (priv->jbuf);
 
+  /* get the seqnum and the next expected seqnum */
   seqnum = gst_rtp_buffer_get_seq (outbuf);
+  next_seqnum = priv->next_seqnum;
 
-  /* get the max deadline to wait for the missing packets, this is the time
-   * of the currently popped packet */
-  rtp_time = gst_rtp_buffer_get_timestamp (outbuf);
-  exttimestamp = gst_rtp_buffer_ext_timestamp (&priv->exttimestamp, rtp_time);
+  /* get the timestamp, this is already corrected for clock skew by the
+   * jitterbuffer */
+  timestamp = GST_BUFFER_TIMESTAMP (outbuf);
 
   GST_DEBUG_OBJECT (jitterbuffer,
-      "Popped buffer #%d, rtptime %u, exttime %" G_GUINT64_FORMAT
-      ", now %d left", seqnum, rtp_time, exttimestamp,
+      "Peeked buffer #%d, expect #%d, timestamp %" GST_TIME_FORMAT
+      ", now %d left", seqnum, next_seqnum, GST_TIME_ARGS (timestamp),
       rtp_jitter_buffer_num_packets (priv->jbuf));
 
+  /* apply our timestamp offset to the incomming buffer, this will be our output
+   * timestamp. */
+  out_time = apply_offset (jitterbuffer, timestamp);
+
+  /* get the gap between this and the previous packet. If we don't know the
+   * previous packet seqnum assume no gap. */
+  if (G_LIKELY (next_seqnum != -1)) {
+    gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
+
+    /* if we have a packet that we already pushed or considered dropped, pop it
+     * off and get the next packet */
+    if (G_UNLIKELY (gap < 0)) {
+      GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
+          seqnum, next_seqnum);
+      outbuf = rtp_jitter_buffer_pop (priv->jbuf);
+      gst_buffer_unref (outbuf);
+      goto again;
+    }
+  } else {
+    GST_DEBUG_OBJECT (jitterbuffer, "no next seqnum known, first packet");
+    gap = -1;
+  }
+
   /* If we don't know what the next seqnum should be (== -1) we have to wait
    * because it might be possible that we are not receiving this buffer in-order,
    * a buffer with a lower seqnum could arrive later and we want to push that
@@ -988,17 +1124,37 @@ again:
    * determine if we have missing a packet. If we have a missing packet (which
    * must be before this packet) we can wait for it until the deadline for this
    * packet expires. */
-  if (priv->next_seqnum == -1 || priv->next_seqnum != seqnum) {
+  if (G_UNLIKELY (gap != 0 && out_time != -1)) {
     GstClockID id;
-    GstClockTimeDiff jitter;
+    GstClockTime sync_time;
     GstClockReturn ret;
     GstClock *clock;
+    GstClockTime duration = GST_CLOCK_TIME_NONE;
 
-    if (priv->next_seqnum != -1) {
-      /* we expected next_seqnum but received something else, that's a gap */
+    if (gap > 0) {
+      /* we have a gap */
       GST_WARNING_OBJECT (jitterbuffer,
-          "Sequence number GAP detected -> %d instead of %d", priv->next_seqnum,
-          seqnum);
+          "Sequence number GAP detected: expected %d instead of %d (%d missing)",
+          next_seqnum, seqnum, gap);
+
+      if (priv->last_out_time != -1) {
+        GST_DEBUG_OBJECT (jitterbuffer,
+            "out_time %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
+            GST_TIME_ARGS (out_time), GST_TIME_ARGS (priv->last_out_time));
+        /* interpolate between the current time and the last time based on
+         * number of packets we are missing, this is the estimated duration
+         * for the missing packet based on equidistant packet spacing. Also make
+         * sure we never go negative. */
+        if (out_time > priv->last_out_time)
+          duration = (out_time - priv->last_out_time) / (gap + 1);
+        else
+          goto lost;
+
+        GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
+            GST_TIME_ARGS (duration));
+        /* add this duration to the timestamp of the last packet we pushed */
+        out_time = (priv->last_out_time + duration);
+      }
     } else {
       /* we don't know what the next_seqnum should be, wait for the last
        * possible moment to push this buffer, maybe we get an earlier seqnum
@@ -1006,33 +1162,6 @@ again:
       GST_DEBUG_OBJECT (jitterbuffer, "First buffer %d, do sync", seqnum);
     }
 
-    GST_DEBUG_OBJECT (jitterbuffer,
-        "exttimestamp %" G_GUINT64_FORMAT ", base %" G_GINT64_FORMAT,
-        exttimestamp, priv->clock_base);
-
-    /* if no clock_base was given, take first ts as base */
-    if (priv->clock_base == -1) {
-      GST_DEBUG_OBJECT (jitterbuffer,
-          "no clock base, using exttimestamp %" G_GUINT64_FORMAT, exttimestamp);
-      priv->clock_base = exttimestamp;
-    }
-
-    /* take rtp timestamp offset into account, this can wrap around */
-    exttimestamp -= priv->clock_base;
-
-    /* bring timestamp to gst time */
-    timestamp =
-        gst_util_uint64_scale_int (exttimestamp, GST_SECOND, priv->clock_rate);
-
-    GST_DEBUG_OBJECT (jitterbuffer,
-        "exttimestamp %" G_GUINT64_FORMAT ", clock-rate %u, timestamp %"
-        GST_TIME_FORMAT, exttimestamp, priv->clock_rate,
-        GST_TIME_ARGS (timestamp));
-
-    /* bring to running time */
-    running_time = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME,
-        timestamp);
-
     GST_OBJECT_LOCK (jitterbuffer);
     clock = GST_ELEMENT_CLOCK (jitterbuffer);
     if (!clock) {
@@ -1041,110 +1170,107 @@ again:
       goto push_buffer;
     }
 
-    /* add latency, this includes our own latency and the peer latency. */
-    running_time += (priv->latency_ms * GST_MSECOND);
-    running_time += priv->peer_latency;
-
-    GST_DEBUG_OBJECT (jitterbuffer, "sync to running_time %" GST_TIME_FORMAT,
-        GST_TIME_ARGS (running_time));
+    GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (out_time));
 
     /* prepare for sync against clock */
-    running_time += GST_ELEMENT_CAST (jitterbuffer)->base_time;
+    sync_time = out_time + GST_ELEMENT_CAST (jitterbuffer)->base_time;
+    /* add latency, this includes our own latency and the peer latency. */
+    sync_time += (priv->latency_ms * GST_MSECOND);
+    sync_time += priv->peer_latency;
 
     /* create an entry for the clock */
-    id = priv->clock_id = gst_clock_new_single_shot_id (clock, running_time);
-    priv->waiting_seqnum = seqnum;
+    id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
     GST_OBJECT_UNLOCK (jitterbuffer);
 
     /* release the lock so that the other end can push stuff or unlock */
     JBUF_UNLOCK (priv);
 
-    ret = gst_clock_id_wait (id, &jitter);
+    ret = gst_clock_id_wait (id, NULL);
 
     JBUF_LOCK (priv);
     /* and free the entry */
     gst_clock_id_unref (id);
     priv->clock_id = NULL;
-    priv->waiting_seqnum = -1;
 
     /* at this point, the clock could have been unlocked by a timeout, a new
      * tail element was added to the queue or because we are shutting down. Check
      * for shutdown first. */
-    if (priv->srcresult != GST_FLOW_OK)
-      goto flushing;
+    if G_UNLIKELY
+      ((priv->srcresult != GST_FLOW_OK))
+          goto flushing;
 
     /* if we got unscheduled and we are not flushing, it's because a new tail
      * element became available in the queue. Grab it and try to push or sync. */
     if (ret == GST_CLOCK_UNSCHEDULED) {
       GST_DEBUG_OBJECT (jitterbuffer,
           "Wait got unscheduled, will retry to push with new buffer");
-      /* reinsert popped buffer into queue */
-      if (!rtp_jitter_buffer_insert (priv->jbuf, outbuf)) {
-        GST_DEBUG_OBJECT (jitterbuffer,
-            "Duplicate packet #%d detected, dropping", seqnum);
-        priv->num_duplicates++;
-        gst_buffer_unref (outbuf);
-      }
       goto again;
     }
-  }
-push_buffer:
-  /* check if we are pushing something unexpected */
-  if (priv->next_seqnum != -1 && priv->next_seqnum != seqnum) {
-    gint dropped;
 
-    /* calc number of missing packets, careful for wraparounds */
-    dropped = priv_compare_rtp_seq_lt (priv->next_seqnum, seqnum);
+  lost:
+    /* we now timed out, this means we lost a packet or finished synchronizing
+     * on the first buffer. */
+    if (gap > 0) {
+      GstEvent *event;
+
+      /* we had a gap and thus we lost a packet. Create an event for this.  */
+      GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", next_seqnum);
+      priv->num_late++;
+      discont = TRUE;
+
+      if (priv->do_lost) {
+        /* create paket lost event */
+        event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
+            gst_structure_new ("GstRTPPacketLost",
+                "seqnum", G_TYPE_UINT, (guint) next_seqnum,
+                "timestamp", G_TYPE_UINT64, out_time,
+                "duration", G_TYPE_UINT64, duration, NULL));
+        gst_pad_push_event (priv->srcpad, event);
+      }
 
-    GST_DEBUG_OBJECT (jitterbuffer,
-        "Pushing DISCONT after dropping %d (%d to %d)", dropped,
-        priv->next_seqnum, seqnum);
+      /* update our expected next packet */
+      priv->last_popped_seqnum = next_seqnum;
+      priv->last_out_time = out_time;
+      priv->next_seqnum = (next_seqnum + 1) & 0xffff;
+      /* look for next packet */
+      goto again;
+    }
 
-    /* update stats */
-    priv->num_late += dropped;
+    /* there was no known gap,just the first packet, exit the loop and push */
+    GST_DEBUG_OBJECT (jitterbuffer, "First packet #%d synced", seqnum);
 
-    /* set DISCONT flag */
-    outbuf = gst_buffer_make_metadata_writable (outbuf);
-    GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
+    /* get new timestamp, latency might have changed */
+    out_time = apply_offset (jitterbuffer, timestamp);
   }
+push_buffer:
 
-  /* apply the timestamp offset */
-  if (priv->ts_offset > 0)
-    ts_offset_rtp =
-        gst_util_uint64_scale_int (priv->ts_offset, priv->clock_rate,
-        GST_SECOND);
-  else if (priv->ts_offset < 0)
-    ts_offset_rtp =
-        -gst_util_uint64_scale_int (-priv->ts_offset, priv->clock_rate,
-        GST_SECOND);
-  else
-    ts_offset_rtp = 0;
-
-  if (ts_offset_rtp != 0) {
-    guint32 timestamp;
-
-    /* if the offset changed, mark with discont */
-    if (priv->ts_offset != priv->prev_ts_offset) {
-      GST_DEBUG_OBJECT (jitterbuffer, "changing offset to %d", ts_offset_rtp);
-      GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
-      priv->prev_ts_offset = priv->ts_offset;
-    }
+  /* when we get here we are ready to pop and push the buffer */
+  outbuf = rtp_jitter_buffer_pop (priv->jbuf);
 
-    timestamp = gst_rtp_buffer_get_timestamp (outbuf);
-    timestamp += ts_offset_rtp;
-    gst_rtp_buffer_set_timestamp (outbuf, timestamp);
+  if (G_UNLIKELY (discont || priv->discont)) {
+    /* set DISCONT flag when we missed a packet. We pushed the buffer writable
+     * into the jitterbuffer so we can modify now. */
+    GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
+    priv->discont = FALSE;
   }
 
+  /* apply timestamp with offset to buffer now */
+  GST_BUFFER_TIMESTAMP (outbuf) = out_time;
+
   /* now we are ready to push the buffer. Save the seqnum and release the lock
    * so the other end can push stuff in the queue again. */
   priv->last_popped_seqnum = seqnum;
+  priv->last_out_time = out_time;
   priv->next_seqnum = (seqnum + 1) & 0xffff;
   JBUF_UNLOCK (priv);
 
   /* push buffer */
-  GST_DEBUG_OBJECT (jitterbuffer, "Pushing buffer %d", seqnum);
+  GST_DEBUG_OBJECT (jitterbuffer,
+      "Pushing buffer %d, timestamp %" GST_TIME_FORMAT, seqnum,
+      GST_TIME_ARGS (out_time));
   result = gst_pad_push (priv->srcpad, outbuf);
-  if (result != GST_FLOW_OK)
+  if (G_UNLIKELY (result != GST_FLOW_OK))
     goto pause;
 
   return;
@@ -1164,8 +1290,6 @@ flushing:
   {
     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
     gst_pad_pause_task (priv->srcpad);
-    if (outbuf)
-      gst_buffer_unref (outbuf);
     JBUF_UNLOCK (priv);
     return;
   }
@@ -1203,40 +1327,33 @@ gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query)
        * own */
       GstClockTime min_latency, max_latency;
       gboolean us_live;
-      GstPad *peer;
       GstClockTime our_latency;
 
-      if ((peer = gst_pad_get_peer (priv->sinkpad))) {
-        if ((res = gst_pad_query (peer, query))) {
-          gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
-
-          GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
-              GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
-              GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
+      if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
+        gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
 
-          /* store this so that we can safely sync on the peer buffers. */
-          JBUF_LOCK (priv);
-          priv->peer_latency = min_latency;
-          JBUF_UNLOCK (priv);
+        GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
+            GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
+            GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
 
-          our_latency = ((guint64) priv->latency_ms) * GST_MSECOND;
+        /* store this so that we can safely sync on the peer buffers. */
+        JBUF_LOCK (priv);
+        priv->peer_latency = min_latency;
+        our_latency = ((guint64) priv->latency_ms) * GST_MSECOND;
+        JBUF_UNLOCK (priv);
 
-          GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
-              GST_TIME_ARGS (our_latency));
+        GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
+            GST_TIME_ARGS (our_latency));
 
-          min_latency += our_latency;
-          /* max_latency can be -1, meaning there is no upper limit for the
-           * latency. */
-          if (max_latency != -1)
-            max_latency += our_latency * GST_MSECOND;
+        /* we add some latency but can buffer an infinite amount of time */
+        min_latency += our_latency;
+        max_latency = -1;
 
-          GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
-              GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
-              GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
+        GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
+            GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
+            GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
 
-          gst_query_set_latency (query, TRUE, min_latency, max_latency);
-        }
-        gst_object_unref (peer);
+        gst_query_set_latency (query, TRUE, min_latency, max_latency);
       }
       break;
     }
@@ -1244,6 +1361,9 @@ gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query)
       res = gst_pad_query_default (pad, query);
       break;
   }
+
+  gst_object_unref (jitterbuffer);
+
   return res;
 }
 
@@ -1262,11 +1382,12 @@ gst_rtp_jitter_buffer_set_property (GObject * object,
     {
       guint new_latency, old_latency;
 
-      /* FIXME, not threadsafe */
       new_latency = g_value_get_uint (value);
-      old_latency = priv->latency_ms;
 
+      JBUF_LOCK (priv);
+      old_latency = priv->latency_ms;
       priv->latency_ms = new_latency;
+      JBUF_UNLOCK (priv);
 
       /* post message if latency changed, this will inform the parent pipeline
        * that a latency reconfiguration is possible/needed. */
@@ -1280,11 +1401,21 @@ gst_rtp_jitter_buffer_set_property (GObject * object,
       break;
     }
     case PROP_DROP_ON_LATENCY:
+      JBUF_LOCK (priv);
       priv->drop_on_latency = g_value_get_boolean (value);
+      JBUF_UNLOCK (priv);
       break;
     case PROP_TS_OFFSET:
       JBUF_LOCK (priv);
       priv->ts_offset = g_value_get_int64 (value);
+      /* FIXME, we don't really have a method for signaling a timestamp
+       * DISCONT without also making this a data discont. */
+      /* priv->discont = TRUE; */
+      JBUF_UNLOCK (priv);
+      break;
+    case PROP_DO_LOST:
+      JBUF_LOCK (priv);
+      priv->do_lost = g_value_get_boolean (value);
       JBUF_UNLOCK (priv);
       break;
     default:
@@ -1305,18 +1436,42 @@ gst_rtp_jitter_buffer_get_property (GObject * object,
 
   switch (prop_id) {
     case PROP_LATENCY:
+      JBUF_LOCK (priv);
       g_value_set_uint (value, priv->latency_ms);
+      JBUF_UNLOCK (priv);
       break;
     case PROP_DROP_ON_LATENCY:
+      JBUF_LOCK (priv);
       g_value_set_boolean (value, priv->drop_on_latency);
+      JBUF_UNLOCK (priv);
       break;
     case PROP_TS_OFFSET:
       JBUF_LOCK (priv);
       g_value_set_int64 (value, priv->ts_offset);
       JBUF_UNLOCK (priv);
       break;
+    case PROP_DO_LOST:
+      JBUF_LOCK (priv);
+      g_value_set_boolean (value, priv->do_lost);
+      JBUF_UNLOCK (priv);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
   }
 }
+
+void
+gst_rtp_jitter_buffer_get_sync (GstRtpJitterBuffer * buffer, guint64 * rtptime,
+    guint64 * timestamp)
+{
+  GstRtpJitterBufferPrivate *priv;
+
+  g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (buffer));
+
+  priv = buffer->priv;
+
+  JBUF_LOCK (priv);
+  rtp_jitter_buffer_get_sync (priv->jbuf, rtptime, timestamp);
+  JBUF_UNLOCK (priv);
+}