gst/rtpmanager/gstrtpbin.*: Add signal to notify listeners when a sender becomes...
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
index e654064..d48bc40 100644 (file)
@@ -4,7 +4,7 @@
  *  Copyright 2007 Collabora Ltd, 
  *  Copyright 2007 Nokia Corporation
  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
- *  Copyright 2007 Wim Taymans <wim@fluendo.com>
+ *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Library General Public
  */
 
 /**
- * SECTION:element-rtpjitterbuffer
- * @short_description: buffer, reorder and remove duplicate RTP packets to
- * compensate for network oddities.
+ * SECTION:element-gstrtpjitterbuffer
  *
- * <refsect2>
- * <para>
  * This element reorders and removes duplicate RTP packets as they are received
  * from a network source. It will also wait for missing packets up to a
- * configurable time limit using the ::latency property. Packets arriving too
- * late are considered as lost packets.
- * </para>
- * <para>
- * This element acts as a live element and so adds ::latency to the pipeline.
- * </para>
+ * configurable time limit using the #GstRtpJitterBuffer:latency property.
+ * Packets arriving too late are considered to be lost packets.
+ * 
+ * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
+ * to the pipeline.
+ * 
+ * The element needs the clock-rate of the RTP payload in order to estimate the
+ * delay. This information is obtained either from the caps on the sink pad or,
+ * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
+ * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
+ * 
+ * This element will automatically be used inside gstrtpbin.
+ * 
+ * <refsect2>
  * <title>Example pipelines</title>
- * <para>
- * <programlisting>
- * gst-launch rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! rtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
- * </programlisting>
- * Connect to a streaming server and decode the MPEG video. The jitterbuffer is
+ * |[
+ * gst-launch rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! gstrtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
+ * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
  * inserted into the pipeline to smooth out network jitter and to reorder the
  * out-of-order RTP packets.
- * </para>
  * </refsect2>
  *
- * Last reviewed on 2007-03-27 (0.10.13)
+ * Last reviewed on 2007-05-28 (0.10.5)
  */
 
 #ifdef HAVE_CONFIG_H
 #include "config.h"
 #endif
 
+#include <stdlib.h>
 #include <string.h>
 #include <gst/rtp/gstrtpbuffer.h>
 
 #include "gstrtpbin-marshal.h"
 
 #include "gstrtpjitterbuffer.h"
-#include "async_jitter_queue.h"
+#include "rtpjitterbuffer.h"
+#include "rtpstats.h"
 
 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
@@ -74,55 +77,95 @@ GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
 /* elementfactory information */
 static const GstElementDetails gst_rtp_jitter_buffer_details =
 GST_ELEMENT_DETAILS ("RTP packet jitter-buffer",
-    "Filter/Network",
+    "Filter/Network/RTP",
     "A buffer that deals with network jitter and other transmission faults",
     "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
-    "Wim Taymans <wim@fluendo.com>");
+    "Wim Taymans <wim.taymans@gmail.com>");
 
 /* RTPJitterBuffer signals and args */
 enum
 {
-  /* FILL ME */
   SIGNAL_REQUEST_PT_MAP,
+  SIGNAL_CLEAR_PT_MAP,
   LAST_SIGNAL
 };
 
 #define DEFAULT_LATENCY_MS      200
 #define DEFAULT_DROP_ON_LATENCY FALSE
+#define DEFAULT_TS_OFFSET       0
+#define DEFAULT_DO_LOST         FALSE
 
 enum
 {
   PROP_0,
   PROP_LATENCY,
-  PROP_DROP_ON_LATENCY
+  PROP_DROP_ON_LATENCY,
+  PROP_TS_OFFSET,
+  PROP_DO_LOST,
+  PROP_LAST
 };
 
-struct _GstRTPJitterBufferPrivate
+#define JBUF_LOCK(priv)   (g_mutex_lock ((priv)->jbuf_lock))
+
+#define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
+  JBUF_LOCK (priv);                                   \
+  if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
+    goto label;                                       \
+} G_STMT_END
+
+#define JBUF_UNLOCK(priv) (g_mutex_unlock ((priv)->jbuf_lock))
+#define JBUF_WAIT(priv)   (g_cond_wait ((priv)->jbuf_cond, (priv)->jbuf_lock))
+
+#define JBUF_WAIT_CHECK(priv,label) G_STMT_START {    \
+  JBUF_WAIT(priv);                                    \
+  if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
+    goto label;                                       \
+} G_STMT_END
+
+#define JBUF_SIGNAL(priv) (g_cond_signal ((priv)->jbuf_cond))
+
+struct _GstRtpJitterBufferPrivate
 {
   GstPad *sinkpad, *srcpad;
 
-  AsyncJitterQueue *queue;
+  RTPJitterBuffer *jbuf;
+  GMutex *jbuf_lock;
+  GCond *jbuf_cond;
+  gboolean waiting;
+  gboolean discont;
 
   /* properties */
   guint latency_ms;
   gboolean drop_on_latency;
+  gint64 ts_offset;
+  gboolean do_lost;
 
   /* the last seqnum we pushed out */
   guint32 last_popped_seqnum;
   /* the next expected seqnum */
   guint32 next_seqnum;
+  /* last output time */
+  GstClockTime last_out_time;
+
+  /* state */
+  gboolean eos;
 
   /* clock rate and rtp timestamp offset */
+  gint last_pt;
   gint32 clock_rate;
   gint64 clock_base;
+  gint64 prev_ts_offset;
 
   /* when we are shutting down */
   GstFlowReturn srcresult;
+  gboolean blocked;
 
   /* for sync */
   GstSegment segment;
   GstClockID clock_id;
-  guint32 waiting_seqnum;
+  /* the latency of the upstream peer, we have to take this into account when
+   * synchronizing the buffers. */
+  GstClockTime peer_latency;
 
   /* some accounting */
   guint64 num_late;
@@ -131,7 +174,7 @@ struct _GstRTPJitterBufferPrivate
 
 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
-                                GstRTPJitterBufferPrivate))
+                                GstRtpJitterBufferPrivate))
 
 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
 GST_STATIC_PAD_TEMPLATE ("sink",
@@ -157,7 +200,7 @@ GST_STATIC_PAD_TEMPLATE ("src",
 
 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
 
-GST_BOILERPLATE (GstRTPJitterBuffer, gst_rtp_jitter_buffer, GstElement,
+GST_BOILERPLATE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GstElement,
     GST_TYPE_ELEMENT);
 
 /* object overrides */
@@ -165,7 +208,7 @@ static void gst_rtp_jitter_buffer_set_property (GObject * object,
     guint prop_id, const GValue * value, GParamSpec * pspec);
 static void gst_rtp_jitter_buffer_get_property (GObject * object,
     guint prop_id, GValue * value, GParamSpec * pspec);
-static void gst_rtp_jitter_buffer_dispose (GObject * object);
+static void gst_rtp_jitter_buffer_finalize (GObject * object);
 
 /* element overrides */
 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
@@ -176,6 +219,8 @@ static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad);
 
 /* sinkpad overrides */
 static gboolean gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps);
+static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
+    GstEvent * event);
 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
     GstEvent * event);
 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
@@ -184,10 +229,13 @@ static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
 /* srcpad overrides */
 static gboolean
 gst_rtp_jitter_buffer_src_activate_push (GstPad * pad, gboolean active);
-static void gst_rtp_jitter_buffer_loop (GstRTPJitterBuffer * jitterbuffer);
+static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
 static gboolean gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query);
 
 static void
+gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
+
+static void
 gst_rtp_jitter_buffer_base_init (gpointer klass)
 {
   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
@@ -200,7 +248,7 @@ gst_rtp_jitter_buffer_base_init (gpointer klass)
 }
 
 static void
-gst_rtp_jitter_buffer_class_init (GstRTPJitterBufferClass * klass)
+gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
 {
   GObjectClass *gobject_class;
   GstElementClass *gstelement_class;
@@ -208,26 +256,57 @@ gst_rtp_jitter_buffer_class_init (GstRTPJitterBufferClass * klass)
   gobject_class = (GObjectClass *) klass;
   gstelement_class = (GstElementClass *) klass;
 
-  g_type_class_add_private (klass, sizeof (GstRTPJitterBufferPrivate));
+  g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
 
-  gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_dispose);
+  gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_finalize);
 
   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
 
+  /**
+   * GstRtpJitterBuffer::latency:
+   * 
+   * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
+   * for at most this time.
+   */
   g_object_class_install_property (gobject_class, PROP_LATENCY,
       g_param_spec_uint ("latency", "Buffer latency in ms",
           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
           G_PARAM_READWRITE));
-
+  /**
+   * GstRtpJitterBuffer::drop-on-latency:
+   * 
+   * Drop oldest buffers when the queue is completely filled. 
+   */
   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
-      g_param_spec_boolean ("drop_on_latency",
+      g_param_spec_boolean ("drop-on-latency",
           "Drop buffers when maximum latency is reached",
           "Tells the jitterbuffer to never exceed the given latency in size",
           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE));
+  /**
+   * GstRtpJitterBuffer::ts-offset:
+   * 
+   * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
+   * This is mainly used to ensure interstream synchronisation.
+   */
+  g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
+      g_param_spec_int64 ("ts-offset", "Timestamp Offset",
+          "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
+          G_MAXINT64, DEFAULT_TS_OFFSET,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   /**
-   * GstRTPJitterBuffer::request-pt-map:
+   * GstRtpJitterBuffer::do-lost:
+   * 
+   * Send out a GstRTPPacketLost event downstream when a packet is considered
+   * lost.
+   */
+  g_object_class_install_property (gobject_class, PROP_DO_LOST,
+      g_param_spec_boolean ("do-lost", "Do Lost",
+          "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  /**
+   * GstRtpJitterBuffer::request-pt-map:
    * @buffer: the object which received the signal
    * @pt: the pt
    *
@@ -235,33 +314,46 @@ gst_rtp_jitter_buffer_class_init (GstRTPJitterBufferClass * klass)
    */
   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
-      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPJitterBufferClass,
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
           request_pt_map), NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT,
       GST_TYPE_CAPS, 1, G_TYPE_UINT);
+  /**
+   * GstRtpJitterBuffer::clear-pt-map:
+   * @buffer: the object which received the signal
+   *
+   * Invalidate the clock-rate as obtained with the
+   * #GstRtpJitterBuffer::request-pt-map signal.
+   */
+  gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
+      g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
+          clear_pt_map), NULL, NULL, g_cclosure_marshal_VOID__VOID,
+      G_TYPE_NONE, 0, G_TYPE_NONE);
 
   gstelement_class->change_state = gst_rtp_jitter_buffer_change_state;
 
+  klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
+
   GST_DEBUG_CATEGORY_INIT
-      (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
+      (rtpjitterbuffer_debug, "gstrtpjitterbuffer", 0, "RTP Jitter Buffer");
 }
 
 static void
-gst_rtp_jitter_buffer_init (GstRTPJitterBuffer * jitterbuffer,
-    GstRTPJitterBufferClass * klass)
+gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer,
+    GstRtpJitterBufferClass * klass)
 {
-  GstRTPJitterBufferPrivate *priv;
+  GstRtpJitterBufferPrivate *priv;
 
   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
   jitterbuffer->priv = priv;
 
   priv->latency_ms = DEFAULT_LATENCY_MS;
   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
+  priv->do_lost = DEFAULT_DO_LOST;
 
-  priv->queue = async_jitter_queue_new ();
-  async_jitter_queue_set_low_threshold (priv->queue, LOW_THRESHOLD);
-  async_jitter_queue_set_high_threshold (priv->queue, HIGH_THRESHOLD);
-
-  priv->waiting_seqnum = -1;
+  priv->jbuf = rtp_jitter_buffer_new ();
+  priv->jbuf_lock = g_mutex_new ();
+  priv->jbuf_cond = g_cond_new ();
 
   priv->srcpad =
       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
@@ -273,6 +365,8 @@ gst_rtp_jitter_buffer_init (GstRTPJitterBuffer * jitterbuffer,
       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_query));
   gst_pad_set_getcaps_function (priv->srcpad,
       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_getcaps));
+  gst_pad_set_event_function (priv->srcpad,
+      GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
 
   priv->sinkpad =
       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
@@ -292,24 +386,36 @@ gst_rtp_jitter_buffer_init (GstRTPJitterBuffer * jitterbuffer,
 }
 
 static void
-gst_rtp_jitter_buffer_dispose (GObject * object)
+gst_rtp_jitter_buffer_finalize (GObject * object)
 {
-  GstRTPJitterBuffer *jitterbuffer;
+  GstRtpJitterBuffer *jitterbuffer;
 
   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
-  if (jitterbuffer->priv->queue) {
-    async_jitter_queue_unref (jitterbuffer->priv->queue);
-    jitterbuffer->priv->queue = NULL;
-  }
 
-  G_OBJECT_CLASS (parent_class)->dispose (object);
+  g_mutex_free (jitterbuffer->priv->jbuf_lock);
+  g_cond_free (jitterbuffer->priv->jbuf_cond);
+
+  g_object_unref (jitterbuffer->priv->jbuf);
+
+  G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+static void
+gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
+{
+  GstRtpJitterBufferPrivate *priv;
+
+  priv = jitterbuffer->priv;
+
+  /* this will trigger a new pt-map request signal, FIXME, do something better. */
+  priv->clock_rate = -1;
 }
 
 static GstCaps *
 gst_rtp_jitter_buffer_getcaps (GstPad * pad)
 {
-  GstRTPJitterBuffer *jitterbuffer;
-  GstRTPJitterBufferPrivate *priv;
+  GstRtpJitterBuffer *jitterbuffer;
+  GstRtpJitterBufferPrivate *priv;
   GstPad *other;
   GstCaps *caps;
   const GstCaps *templ;
@@ -341,12 +447,12 @@ gst_rtp_jitter_buffer_getcaps (GstPad * pad)
 }
 
 static gboolean
-gst_jitter_buffer_sink_parse_caps (GstRTPJitterBuffer * jitterbuffer,
+gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
     GstCaps * caps)
 {
-  GstRTPJitterBufferPrivate *priv;
+  GstRtpJitterBufferPrivate *priv;
   GstStructure *caps_struct;
-  const GValue *value;
+  guint val;
 
   priv = jitterbuffer->priv;
 
@@ -368,24 +474,21 @@ gst_jitter_buffer_sink_parse_caps (GstRTPJitterBuffer * jitterbuffer,
   /* gah, clock-base is uint. If we don't have a base, we will use the first
    * buffer timestamp as the base time. This will screw up sync but it's better
    * than nothing. */
-  value = gst_structure_get_value (caps_struct, "clock-base");
-  if (value && G_VALUE_HOLDS_UINT (value)) {
-    priv->clock_base = g_value_get_uint (value);
-    GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
-        priv->clock_base);
-  } else
+  if (gst_structure_get_uint (caps_struct, "clock-base", &val))
+    priv->clock_base = val;
+  else
     priv->clock_base = -1;
 
+  GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
+      priv->clock_base);
+
   /* first expected seqnum */
-  value = gst_structure_get_value (caps_struct, "seqnum-base");
-  if (value && G_VALUE_HOLDS_UINT (value)) {
-    priv->next_seqnum = g_value_get_uint (value);
-    GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_seqnum);
-  } else
+  if (gst_structure_get_uint (caps_struct, "seqnum-base", &val))
+    priv->next_seqnum = val;
+  else
     priv->next_seqnum = -1;
 
-  async_jitter_queue_set_max_queue_length (priv->queue,
-      priv->latency_ms * priv->clock_rate / 1000);
+  GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_seqnum);
 
   return TRUE;
 
@@ -405,8 +508,8 @@ wrong_rate:
 static gboolean
 gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps)
 {
-  GstRTPJitterBuffer *jitterbuffer;
-  GstRTPJitterBufferPrivate *priv;
+  GstRtpJitterBuffer *jitterbuffer;
+  GstRtpJitterBufferPrivate *priv;
   gboolean res;
 
   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
@@ -424,60 +527,52 @@ gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps)
 }
 
 static void
-free_func (gpointer data, GstRTPJitterBuffer * user_data)
+gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
 {
-  if (GST_IS_BUFFER (data))
-    gst_buffer_unref (GST_BUFFER_CAST (data));
-  else
-    gst_event_unref (GST_EVENT_CAST (data));
-}
-
-static void
-gst_rtp_jitter_buffer_flush_start (GstRTPJitterBuffer * jitterbuffer)
-{
-  GstRTPJitterBufferPrivate *priv;
+  GstRtpJitterBufferPrivate *priv;
 
   priv = jitterbuffer->priv;
 
-  async_jitter_queue_lock (priv->queue);
+  JBUF_LOCK (priv);
   /* mark ourselves as flushing */
   priv->srcresult = GST_FLOW_WRONG_STATE;
   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
   /* this unblocks any waiting pops on the src pad task */
-  async_jitter_queue_set_flushing_unlocked (jitterbuffer->priv->queue,
-      (GFunc) free_func, jitterbuffer);
+  JBUF_SIGNAL (priv);
   /* unlock clock, we just unschedule, the entry will be released by the 
    * locking streaming thread. */
   if (priv->clock_id)
     gst_clock_id_unschedule (priv->clock_id);
-
-  async_jitter_queue_unlock (priv->queue);
+  JBUF_UNLOCK (priv);
 }
 
 static void
-gst_rtp_jitter_buffer_flush_stop (GstRTPJitterBuffer * jitterbuffer)
+gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
 {
-  GstRTPJitterBufferPrivate *priv;
+  GstRtpJitterBufferPrivate *priv;
 
   priv = jitterbuffer->priv;
 
-  async_jitter_queue_lock (priv->queue);
+  JBUF_LOCK (priv);
   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
   /* Mark as non flushing */
   priv->srcresult = GST_FLOW_OK;
   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
   priv->last_popped_seqnum = -1;
+  priv->last_out_time = -1;
   priv->next_seqnum = -1;
-  /* allow pops from the src pad task */
-  async_jitter_queue_unset_flushing_unlocked (jitterbuffer->priv->queue);
-  async_jitter_queue_unlock (priv->queue);
+  priv->clock_rate = -1;
+  priv->eos = FALSE;
+  rtp_jitter_buffer_flush (priv->jbuf);
+  rtp_jitter_buffer_reset_skew (priv->jbuf);
+  JBUF_UNLOCK (priv);
 }
 
 static gboolean
 gst_rtp_jitter_buffer_src_activate_push (GstPad * pad, gboolean active)
 {
   gboolean result = TRUE;
-  GstRTPJitterBuffer *jitterbuffer = NULL;
+  GstRtpJitterBuffer *jitterbuffer = NULL;
 
   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
 
@@ -508,8 +603,8 @@ static GstStateChangeReturn
 gst_rtp_jitter_buffer_change_state (GstElement * element,
     GstStateChange transition)
 {
-  GstRTPJitterBuffer *jitterbuffer;
-  GstRTPJitterBufferPrivate *priv;
+  GstRtpJitterBuffer *jitterbuffer;
+  GstRtpJitterBufferPrivate *priv;
   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
 
   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
@@ -519,21 +614,24 @@ gst_rtp_jitter_buffer_change_state (GstElement * element,
     case GST_STATE_CHANGE_NULL_TO_READY:
       break;
     case GST_STATE_CHANGE_READY_TO_PAUSED:
-      async_jitter_queue_lock (priv->queue);
+      JBUF_LOCK (priv);
       /* reset negotiated values */
       priv->clock_rate = -1;
       priv->clock_base = -1;
+      priv->peer_latency = 0;
+      priv->last_pt = -1;
       /* block until we go to PLAYING */
-      async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue,
-          TRUE);
-      async_jitter_queue_unlock (priv->queue);
+      priv->blocked = TRUE;
+      /* reset skew detection initialy */
+      rtp_jitter_buffer_reset_skew (priv->jbuf);
+      JBUF_UNLOCK (priv);
       break;
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
-      async_jitter_queue_lock (priv->queue);
+      JBUF_LOCK (priv);
       /* unblock to allow streaming in PLAYING */
-      async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue,
-          FALSE);
-      async_jitter_queue_unlock (priv->queue);
+      priv->blocked = FALSE;
+      JBUF_SIGNAL (priv);
+      JBUF_UNLOCK (priv);
       break;
     default:
       break;
@@ -549,11 +647,10 @@ gst_rtp_jitter_buffer_change_state (GstElement * element,
         ret = GST_STATE_CHANGE_NO_PREROLL;
       break;
     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
-      async_jitter_queue_lock (priv->queue);
+      JBUF_LOCK (priv);
       /* block to stop streaming when PAUSED */
-      async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue,
-          TRUE);
-      async_jitter_queue_unlock (priv->queue);
+      priv->blocked = TRUE;
+      JBUF_UNLOCK (priv);
       if (ret != GST_STATE_CHANGE_FAILURE)
         ret = GST_STATE_CHANGE_NO_PREROLL;
       break;
@@ -568,42 +665,25 @@ gst_rtp_jitter_buffer_change_state (GstElement * element,
   return ret;
 }
 
-/**
- * Performs comparison 'b - a' with check for overflows.
- */
-static inline gint
-priv_compare_rtp_seq_lt (guint16 a, guint16 b)
+static gboolean
+gst_rtp_jitter_buffer_src_event (GstPad * pad, GstEvent * event)
 {
-  /* check if diff more than half of the 16bit range */
-  if (abs (b - a) > (1 << 15)) {
-    /* one of a/b has wrapped */
-    return a - b;
-  } else {
-    return b - a;
-  }
-}
+  gboolean ret = TRUE;
+  GstRtpJitterBuffer *jitterbuffer;
+  GstRtpJitterBufferPrivate *priv;
 
-/**
- * gets the seqnum from the buffers and compare them 
- */
-static gint
-compare_rtp_buffers_seq_num (GstBuffer * a, GstBuffer * b)
-{
-  gint ret;
+  jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
+  priv = jitterbuffer->priv;
 
-  if (GST_IS_BUFFER (a) && GST_IS_BUFFER (b)) {
-    /* two buffers */
-    ret = priv_compare_rtp_seq_lt
-        (gst_rtp_buffer_get_seq (GST_BUFFER_CAST (a)),
-        gst_rtp_buffer_get_seq (GST_BUFFER_CAST (b)));
-  } else {
-    /* one of them is an event, the event always goes before the other element
-     * so we return -1. */
-    if (GST_IS_EVENT (a))
-      ret = -1;
-    else
-      ret = 1;
+  GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
+
+  switch (GST_EVENT_TYPE (event)) {
+    default:
+      ret = gst_pad_push_event (priv->sinkpad, event);
+      break;
   }
+  gst_object_unref (jitterbuffer);
+
   return ret;
 }
 
@@ -611,12 +691,14 @@ static gboolean
 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstEvent * event)
 {
   gboolean ret = TRUE;
-  GstRTPJitterBuffer *jitterbuffer;
-  GstRTPJitterBufferPrivate *priv;
+  GstRtpJitterBuffer *jitterbuffer;
+  GstRtpJitterBufferPrivate *priv;
 
   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
   priv = jitterbuffer->priv;
 
+  GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
+
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_NEWSEGMENT:
     {
@@ -649,22 +731,31 @@ gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstEvent * event)
     }
     case GST_EVENT_FLUSH_START:
       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
+      ret = gst_pad_push_event (priv->srcpad, event);
       break;
     case GST_EVENT_FLUSH_STOP:
-      gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
+      ret = gst_pad_push_event (priv->srcpad, event);
+      ret = gst_rtp_jitter_buffer_src_activate_push (priv->srcpad, TRUE);
       break;
     case GST_EVENT_EOS:
     {
       /* push EOS in queue. We always push it at the head */
-      async_jitter_queue_lock (priv->queue);
+      JBUF_LOCK (priv);
       /* check for flushing, we need to discard the event and return FALSE when
        * we are flushing */
       ret = priv->srcresult == GST_FLOW_OK;
-      if (ret)
-        async_jitter_queue_push_unlocked (priv->queue, event);
-      else
-        gst_event_unref (event);
-      async_jitter_queue_unlock (priv->queue);
+      if (ret && !priv->eos) {
+        GST_DEBUG_OBJECT (jitterbuffer, "queuing EOS");
+        priv->eos = TRUE;
+        JBUF_SIGNAL (priv);
+      } else if (priv->eos) {
+        GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS");
+      } else {
+        GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s",
+            gst_flow_get_name (priv->srcresult));
+      }
+      JBUF_UNLOCK (priv);
+      gst_event_unref (event);
       break;
     }
     default:
@@ -687,7 +778,7 @@ newseg_wrong_format:
 }
 
 static gboolean
-gst_rtp_jitter_buffer_get_clock_rate (GstRTPJitterBuffer * jitterbuffer,
+gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
     guint8 pt)
 {
   GValue ret = { 0 };
@@ -706,12 +797,17 @@ gst_rtp_jitter_buffer_get_clock_rate (GstRTPJitterBuffer * jitterbuffer,
   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
       &ret);
 
-  caps = (GstCaps *) g_value_get_boxed (&ret);
+  g_value_unset (&args[0]);
+  g_value_unset (&args[1]);
+  caps = (GstCaps *) g_value_dup_boxed (&ret);
+  g_value_unset (&ret);
   if (!caps)
     goto no_caps;
 
   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
 
+  gst_caps_unref (caps);
+
   return res;
 
   /* ERRORS */
@@ -725,79 +821,148 @@ no_caps:
 static GstFlowReturn
 gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
 {
-  GstRTPJitterBuffer *jitterbuffer;
-  GstRTPJitterBufferPrivate *priv;
+  GstRtpJitterBuffer *jitterbuffer;
+  GstRtpJitterBufferPrivate *priv;
   guint16 seqnum;
-  GstFlowReturn ret;
+  GstFlowReturn ret = GST_FLOW_OK;
+  GstClockTime timestamp;
+  guint64 latency_ts;
+  gboolean tail;
 
   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
 
-  if (!gst_rtp_buffer_validate (buffer))
+  if (G_UNLIKELY (!gst_rtp_buffer_validate (buffer)))
     goto invalid_buffer;
 
   priv = jitterbuffer->priv;
 
-  if (priv->clock_rate == -1) {
+  if (G_UNLIKELY (priv->last_pt != gst_rtp_buffer_get_payload_type (buffer))) {
+    GstCaps *caps;
+
+    priv->last_pt = gst_rtp_buffer_get_payload_type (buffer);
+    /* reset clock-rate so that we get a new one */
+    priv->clock_rate = -1;
+    /* Try to get the clock-rate from the caps first if we can. If there are no
+     * caps we must fire the signal to get the clock-rate. */
+    if ((caps = GST_BUFFER_CAPS (buffer))) {
+      gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
+    }
+  }
+
+  if (G_UNLIKELY (priv->clock_rate == -1)) {
     guint8 pt;
 
     /* no clock rate given on the caps, try to get one with the signal */
     pt = gst_rtp_buffer_get_payload_type (buffer);
 
     gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer, pt);
-    if (priv->clock_rate == -1)
+    if (G_UNLIKELY (priv->clock_rate == -1))
       goto not_negotiated;
   }
 
+  /* take the timestamp of the buffer. This is the time when the packet was
+   * received and is used to calculate jitter and clock skew. We will adjust
+   * this timestamp with the smoothed value after processing it in the
+   * jitterbuffer. */
+  timestamp = GST_BUFFER_TIMESTAMP (buffer);
+  /* bring to running time */
+  timestamp = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME,
+      timestamp);
+
   seqnum = gst_rtp_buffer_get_seq (buffer);
-  GST_DEBUG_OBJECT (jitterbuffer, "Received packet #%d", seqnum);
-
-  async_jitter_queue_lock (priv->queue);
-  ret = priv->srcresult;
-  if (ret != GST_FLOW_OK)
-    goto out_flushing;
-
-  /* let's check if this buffer is too late, we cannot accept packets with
-   * bigger seqnum than the one we already pushed. */
-  if (priv->last_popped_seqnum != -1) {
-    if (priv_compare_rtp_seq_lt (priv->last_popped_seqnum, seqnum) < 0)
-      goto too_late;
+  GST_DEBUG_OBJECT (jitterbuffer,
+      "Received packet #%d at time %" GST_TIME_FORMAT, seqnum,
+      GST_TIME_ARGS (timestamp));
+
+  JBUF_LOCK_CHECK (priv, out_flushing);
+  /* don't accept more data on EOS */
+  if (G_UNLIKELY (priv->eos))
+    goto have_eos;
+
+  /* let's check if this buffer is too late, we can only accept packets with
+   * bigger seqnum than the one we last pushed. */
+  if (G_LIKELY (priv->last_popped_seqnum != -1)) {
+    gint gap;
+    gboolean reset = FALSE;
+
+    gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
+
+    if (G_UNLIKELY (gap <= 0)) {
+      /* priv->last_popped_seqnum >= seqnum, this packet is too late or the
+       * sender might have been restarted with different seqnum. */
+      if (gap < -RTP_MAX_MISORDER) {
+        GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d", gap);
+        reset = TRUE;
+      } else {
+        goto too_late;
+      }
+    } else {
+      /* priv->last_popped_seqnum < seqnum, this is a new packet */
+      if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
+        GST_DEBUG_OBJECT (jitterbuffer, "reset: too many dropped packets %d",
+            gap);
+        reset = TRUE;
+      } else {
+        GST_DEBUG_OBJECT (jitterbuffer, "dropped packets %d but <= %d", gap,
+            RTP_MAX_DROPOUT);
+      }
+    }
+    if (G_UNLIKELY (reset)) {
+      priv->last_popped_seqnum = -1;
+      priv->next_seqnum = -1;
+      rtp_jitter_buffer_reset_skew (priv->jbuf);
+    }
   }
 
   /* let's drop oldest packet if the queue is already full and drop-on-latency
-   * is set. */
-  if (priv->drop_on_latency) {
-    if (async_jitter_queue_length_ts_units_unlocked (priv->queue) >=
-        priv->latency_ms * priv->clock_rate / 1000) {
-      GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet #%d",
-          seqnum);
+   * is set. We can only do this when there actually is a latency. When no
+   * latency is set, we just pump it in the queue and let the other end push it
+   * out as fast as possible. */
+  if (priv->latency_ms && priv->drop_on_latency) {
+
+    latency_ts =
+        gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
+
+    if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
       GstBuffer *old_buf;
 
-      old_buf = async_jitter_queue_pop_unlocked (priv->queue);
+      old_buf = rtp_jitter_buffer_pop (priv->jbuf);
+
+      GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet #%d",
+          gst_rtp_buffer_get_seq (old_buf));
+
       gst_buffer_unref (old_buf);
     }
   }
 
+  /* we need to make the metadata writable before pushing it in the jitterbuffer
+   * because the jitterbuffer will update the timestamp */
+  buffer = gst_buffer_make_metadata_writable (buffer);
+
   /* now insert the packet into the queue in sorted order. This function returns
    * FALSE if a packet with the same seqnum was already in the queue, meaning we
    * have a duplicate. */
-  if (!async_jitter_queue_push_sorted_unlocked (priv->queue, buffer,
-          (GCompareDataFunc) compare_rtp_buffers_seq_num, NULL))
+  if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp,
+              priv->clock_rate, &tail)))
     goto duplicate;
 
+  /* signal addition of new buffer when the _loop is waiting. */
+  if (priv->waiting)
+    JBUF_SIGNAL (priv);
+
   /* let's unschedule and unblock any waiting buffers. We only want to do this
-   * if there is a currently waiting newer (> seqnum) buffer  */
-  if (priv->clock_id) {
-    if (priv->waiting_seqnum > seqnum) {
-      gst_clock_id_unschedule (priv->clock_id);
-      GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting buffer");
-    }
+   * when the tail buffer changed */
+  if (G_UNLIKELY (priv->clock_id && tail)) {
+    GST_DEBUG_OBJECT (jitterbuffer,
+        "Unscheduling waiting buffer, new tail buffer");
+    gst_clock_id_unschedule (priv->clock_id);
   }
 
-  GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d on queue %d",
-      seqnum, async_jitter_queue_length_unlocked (priv->queue));
+  GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets",
+      seqnum, rtp_jitter_buffer_num_packets (priv->jbuf));
 
 finished:
-  async_jitter_queue_unlock (priv->queue);
+  JBUF_UNLOCK (priv);
 
   gst_object_unref (jitterbuffer);
 
@@ -806,29 +971,37 @@ finished:
   /* ERRORS */
 invalid_buffer:
   {
-    /* this is fatal and should be filtered earlier */
-    GST_ELEMENT_ERROR (jitterbuffer, STREAM, DECODE, (NULL),
-        ("Received invalid RTP payload"));
+    /* this is not fatal but should be filtered earlier */
+    GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
+        ("Received invalid RTP payload, dropping"));
     gst_buffer_unref (buffer);
     gst_object_unref (jitterbuffer);
-    return GST_FLOW_ERROR;
+    return GST_FLOW_OK;
   }
 not_negotiated:
   {
-    GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
+    GST_WARNING_OBJECT (jitterbuffer, "No clock-rate in caps!");
     gst_buffer_unref (buffer);
     gst_object_unref (jitterbuffer);
-    return GST_FLOW_NOT_NEGOTIATED;
+    return GST_FLOW_OK;
   }
 out_flushing:
   {
+    ret = priv->srcresult;
     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
     gst_buffer_unref (buffer);
     goto finished;
   }
+have_eos:
+  {
+    ret = GST_FLOW_UNEXPECTED;
+    GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
+    gst_buffer_unref (buffer);
+    goto finished;
+  }
 too_late:
   {
-    GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
+    GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
         " popped, dropping", seqnum, priv->last_popped_seqnum);
     priv->num_late++;
     gst_buffer_unref (buffer);
@@ -836,7 +1009,7 @@ too_late:
   }
 duplicate:
   {
-    GST_DEBUG_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
+    GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
         seqnum);
     priv->num_duplicates++;
     gst_buffer_unref (buffer);
@@ -844,65 +1017,104 @@ duplicate:
   }
 }
 
+static GstClockTime
+apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
+{
+  GstRtpJitterBufferPrivate *priv;
+
+  priv = jitterbuffer->priv;
+
+  if (timestamp == -1)
+    return -1;
+
+  /* apply the timestamp offset */
+  timestamp += priv->ts_offset;
+
+  return timestamp;
+}
+
 /**
  * This funcion will push out buffers on the source pad.
  *
  * For each pushed buffer, the seqnum is recorded, if the next buffer B has a
  * different seqnum (missing packets before B), this function will wait for the
- * missing packet to arrive up to the rtp timestamp of buffer B.
+ * missing packet to arrive up to the timestamp of buffer B.
  */
 static void
-gst_rtp_jitter_buffer_loop (GstRTPJitterBuffer * jitterbuffer)
+gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
 {
-  GstRTPJitterBufferPrivate *priv;
-  gpointer elem;
+  GstRtpJitterBufferPrivate *priv;
   GstBuffer *outbuf;
   GstFlowReturn result;
   guint16 seqnum;
-  guint32 rtp_time;
-  GstClockTime timestamp;
-  gint64 running_time;
+  guint32 next_seqnum;
+  GstClockTime timestamp, out_time;
+  gboolean discont = FALSE;
+  gint gap;
 
   priv = jitterbuffer->priv;
 
-  async_jitter_queue_lock (priv->queue);
+  JBUF_LOCK_CHECK (priv, flushing);
 again:
-  GST_DEBUG_OBJECT (jitterbuffer, "Popping item");
-  /* pop a buffer, we will get NULL if the queue was shut down */
-  elem = async_jitter_queue_pop_unlocked (priv->queue);
-  if (!elem)
-    goto no_elem;
-
-  /* special code for events */
-  if (G_UNLIKELY (GST_IS_EVENT (elem))) {
-    GstEvent *event = GST_EVENT_CAST (elem);
-
-    switch (GST_EVENT_TYPE (event)) {
-      case GST_EVENT_EOS:
-        GST_DEBUG_OBJECT (jitterbuffer, "Popped EOS from queue");
-        /* we don't expect more data now, makes upstream perform EOS actions */
-        priv->srcresult = GST_FLOW_UNEXPECTED;
-        break;
-      default:
-        GST_DEBUG_OBJECT (jitterbuffer, "Popped event %s from queue",
-            GST_EVENT_TYPE_NAME (event));
+  GST_DEBUG_OBJECT (jitterbuffer, "Peeking item");
+  while (TRUE) {
+    /* always wait if we are blocked */
+    if (G_LIKELY (!priv->blocked)) {
+      /* if we have a packet, we can exit the loop and grab it */
+      if (rtp_jitter_buffer_num_packets (priv->jbuf) > 0)
         break;
+      /* no packets but we are EOS, do eos logic */
+      if (G_UNLIKELY (priv->eos))
+        goto do_eos;
     }
-    async_jitter_queue_unlock (priv->queue);
-
-    /* push event */
-    gst_pad_push_event (priv->srcpad, event);
-    return;
+    /* underrun, wait for packets or flushing now */
+    priv->waiting = TRUE;
+    JBUF_WAIT_CHECK (priv, flushing);
+    priv->waiting = FALSE;
   }
 
-  /* we know it's a buffer now */
-  outbuf = GST_BUFFER_CAST (elem);
+  /* peek a buffer, we're just looking at the timestamp and the sequence number.
+   * If all is fine, we'll pop and push it. If the sequence number is wrong we
+   * wait on the timestamp. In the chain function we will unlock the wait when a
+   * new buffer is available. The peeked buffer is valid for as long as we hold
+   * the jitterbuffer lock. */
+  outbuf = rtp_jitter_buffer_peek (priv->jbuf);
 
+  /* get the seqnum and the next expected seqnum */
   seqnum = gst_rtp_buffer_get_seq (outbuf);
-
-  GST_DEBUG_OBJECT (jitterbuffer, "Popped buffer #%d from queue %d",
-      gst_rtp_buffer_get_seq (outbuf),
-      async_jitter_queue_length_unlocked (priv->queue));
+  next_seqnum = priv->next_seqnum;
+
+  /* get the timestamp, this is already corrected for clock skew by the
+   * jitterbuffer */
+  timestamp = GST_BUFFER_TIMESTAMP (outbuf);
+
+  GST_DEBUG_OBJECT (jitterbuffer,
+      "Peeked buffer #%d, expect #%d, timestamp %" GST_TIME_FORMAT
+      ", now %d left", seqnum, next_seqnum, GST_TIME_ARGS (timestamp),
+      rtp_jitter_buffer_num_packets (priv->jbuf));
+
+  /* apply our timestamp offset to the incomming buffer, this will be our output
+   * timestamp. */
+  out_time = apply_offset (jitterbuffer, timestamp);
+
+  /* get the gap between this and the previous packet. If we don't know the
+   * previous packet seqnum assume no gap. */
+  if (G_LIKELY (next_seqnum != -1)) {
+    gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
+
+    /* if we have a packet that we already pushed or considered dropped, pop it
+     * off and get the next packet */
+    if (G_UNLIKELY (gap < 0)) {
+      GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
+          seqnum, next_seqnum);
+      outbuf = rtp_jitter_buffer_pop (priv->jbuf);
+      gst_buffer_unref (outbuf);
+      goto again;
+    }
+  } else {
+    GST_DEBUG_OBJECT (jitterbuffer, "no next seqnum known, first packet");
+    gap = -1;
+  }
 
   /* If we don't know what the next seqnum should be (== -1) we have to wait
    * because it might be possible that we are not receiving this buffer in-order,
@@ -912,17 +1124,37 @@ again:
    * determine if we have missing a packet. If we have a missing packet (which
    * must be before this packet) we can wait for it until the deadline for this
    * packet expires. */
-  if (priv->next_seqnum == -1 || priv->next_seqnum != seqnum) {
+  if (G_UNLIKELY (gap != 0 && out_time != -1)) {
     GstClockID id;
-    GstClockTimeDiff jitter;
+    GstClockTime sync_time;
     GstClockReturn ret;
     GstClock *clock;
+    GstClockTime duration = GST_CLOCK_TIME_NONE;
 
-    if (priv->next_seqnum != -1) {
-      /* we expected next_seqnum but received something else, that's a gap */
-      GST_DEBUG_OBJECT (jitterbuffer,
-          "Sequence number GAP detected -> %d instead of %d", priv->next_seqnum,
-          seqnum);
+    if (gap > 0) {
+      /* we have a gap */
+      GST_WARNING_OBJECT (jitterbuffer,
+          "Sequence number GAP detected: expected %d instead of %d (%d missing)",
+          next_seqnum, seqnum, gap);
+
+      if (priv->last_out_time != -1) {
+        GST_DEBUG_OBJECT (jitterbuffer,
+            "out_time %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
+            GST_TIME_ARGS (out_time), GST_TIME_ARGS (priv->last_out_time));
+        /* interpolate between the current time and the last time based on
+         * number of packets we are missing, this is the estimated duration
+         * for the missing packet based on equidistant packet spacing. Also make
+         * sure we never go negative. */
+        if (out_time > priv->last_out_time)
+          duration = (out_time - priv->last_out_time) / (gap + 1);
+        else
+          goto lost;
+
+        GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
+            GST_TIME_ARGS (duration));
+        /* add this duration to the timestamp of the last packet we pushed */
+        out_time = (priv->last_out_time + duration);
+      }
     } else {
       /* we don't know what the next_seqnum should be, wait for the last
        * possible moment to push this buffer, maybe we get an earlier seqnum
@@ -930,31 +1162,6 @@ again:
       GST_DEBUG_OBJECT (jitterbuffer, "First buffer %d, do sync", seqnum);
     }
 
-    /* get the max deadline to wait for the missing packets, this is the time
-     * of the currently popped packet */
-    rtp_time = gst_rtp_buffer_get_timestamp (outbuf);
-
-    GST_DEBUG_OBJECT (jitterbuffer, "rtp_time %u, base %" G_GINT64_FORMAT,
-        rtp_time, priv->clock_base);
-
-    /* if no clock_base was given, take first ts as base */
-    if (priv->clock_base == -1)
-      priv->clock_base = rtp_time;
-
-    /* take rtp timestamp offset into account, this can wrap around */
-    rtp_time -= priv->clock_base;
-
-    /* bring timestamp to gst time */
-    timestamp = gst_util_uint64_scale (GST_SECOND, rtp_time, priv->clock_rate);
-
-    GST_DEBUG_OBJECT (jitterbuffer, "rtptime %u, timestamp %" GST_TIME_FORMAT,
-        rtp_time, GST_TIME_ARGS (timestamp));
-
-    /* bring to running time */
-    running_time = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME,
-        timestamp);
-
-    /* correct for sync against the gstreamer clock, add latency */
     GST_OBJECT_LOCK (jitterbuffer);
     clock = GST_ELEMENT_CLOCK (jitterbuffer);
     if (!clock) {
@@ -963,101 +1170,127 @@ again:
       goto push_buffer;
     }
 
-    /* add latency */
-    running_time += (priv->latency_ms * GST_MSECOND);
-
-    GST_DEBUG_OBJECT (jitterbuffer, "sync to running_time %" GST_TIME_FORMAT,
-        GST_TIME_ARGS (running_time));
+    GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (out_time));
 
     /* prepare for sync against clock */
-    running_time += GST_ELEMENT_CAST (jitterbuffer)->base_time;
+    sync_time = out_time + GST_ELEMENT_CAST (jitterbuffer)->base_time;
+    /* add latency, this includes our own latency and the peer latency. */
+    sync_time += (priv->latency_ms * GST_MSECOND);
+    sync_time += priv->peer_latency;
 
     /* create an entry for the clock */
-    id = priv->clock_id = gst_clock_new_single_shot_id (clock, running_time);
-    priv->waiting_seqnum = seqnum;
+    id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
     GST_OBJECT_UNLOCK (jitterbuffer);
 
     /* release the lock so that the other end can push stuff or unlock */
-    async_jitter_queue_unlock (priv->queue);
+    JBUF_UNLOCK (priv);
 
-    ret = gst_clock_id_wait (id, &jitter);
+    ret = gst_clock_id_wait (id, NULL);
 
-    async_jitter_queue_lock (priv->queue);
+    JBUF_LOCK (priv);
     /* and free the entry */
     gst_clock_id_unref (id);
     priv->clock_id = NULL;
-    priv->waiting_seqnum = -1;
 
     /* at this point, the clock could have been unlocked by a timeout, a new
      * tail element was added to the queue or because we are shutting down. Check
      * for shutdown first. */
-    if (priv->srcresult != GST_FLOW_OK)
-      goto flushing;
+    if G_UNLIKELY
+      ((priv->srcresult != GST_FLOW_OK))
+          goto flushing;
 
     /* if we got unscheduled and we are not flushing, it's because a new tail
      * element became available in the queue. Grab it and try to push or sync. */
     if (ret == GST_CLOCK_UNSCHEDULED) {
       GST_DEBUG_OBJECT (jitterbuffer,
           "Wait got unscheduled, will retry to push with new buffer");
-      /* reinserting popped buffer into queue */
-      if (!async_jitter_queue_push_sorted_unlocked (priv->queue, outbuf,
-              (GCompareDataFunc) compare_rtp_buffers_seq_num, NULL)) {
-        GST_DEBUG_OBJECT (jitterbuffer,
-            "Duplicate packet #%d detected, dropping", seqnum);
-        priv->num_duplicates++;
-        gst_buffer_unref (outbuf);
+      goto again;
+    }
+
+  lost:
+    /* we now timed out, this means we lost a packet or finished synchronizing
+     * on the first buffer. */
+    if (gap > 0) {
+      GstEvent *event;
+
+      /* we had a gap and thus we lost a packet. Create an event for this.  */
+      GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", next_seqnum);
+      priv->num_late++;
+      discont = TRUE;
+
+      if (priv->do_lost) {
+        /* create paket lost event */
+        event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
+            gst_structure_new ("GstRTPPacketLost",
+                "seqnum", G_TYPE_UINT, (guint) next_seqnum,
+                "timestamp", G_TYPE_UINT64, out_time,
+                "duration", G_TYPE_UINT64, duration, NULL));
+        gst_pad_push_event (priv->srcpad, event);
       }
+
+      /* update our expected next packet */
+      priv->last_popped_seqnum = next_seqnum;
+      priv->last_out_time = out_time;
+      priv->next_seqnum = (next_seqnum + 1) & 0xffff;
+      /* look for next packet */
       goto again;
     }
-  }
-push_buffer:
-  /* check if we are pushing something unexpected */
-  if (priv->next_seqnum != -1 && priv->next_seqnum != seqnum) {
-    gint dropped;
 
-    /* calc number of missing packets, careful for wraparounds */
-    dropped = priv_compare_rtp_seq_lt (priv->next_seqnum, seqnum);
+    /* there was no known gap,just the first packet, exit the loop and push */
+    GST_DEBUG_OBJECT (jitterbuffer, "First packet #%d synced", seqnum);
 
-    GST_DEBUG_OBJECT (jitterbuffer,
-        "Pushing DISCONT after dropping %d (%d to %d)", dropped,
-        priv->next_seqnum, seqnum);
+    /* get new timestamp, latency might have changed */
+    out_time = apply_offset (jitterbuffer, timestamp);
+  }
+push_buffer:
 
-    /* update stats */
-    priv->num_late += dropped;
+  /* when we get here we are ready to pop and push the buffer */
+  outbuf = rtp_jitter_buffer_pop (priv->jbuf);
 
-    /* set DISCONT flag */
-    outbuf = gst_buffer_make_metadata_writable (outbuf);
+  if (G_UNLIKELY (discont || priv->discont)) {
+    /* set DISCONT flag when we missed a packet. We pushed the buffer writable
+     * into the jitterbuffer so we can modify now. */
     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
+    priv->discont = FALSE;
   }
+
+  /* apply timestamp with offset to buffer now */
+  GST_BUFFER_TIMESTAMP (outbuf) = out_time;
+
   /* now we are ready to push the buffer. Save the seqnum and release the lock
    * so the other end can push stuff in the queue again. */
   priv->last_popped_seqnum = seqnum;
+  priv->last_out_time = out_time;
   priv->next_seqnum = (seqnum + 1) & 0xffff;
-  async_jitter_queue_unlock (priv->queue);
+  JBUF_UNLOCK (priv);
 
   /* push buffer */
-  GST_DEBUG_OBJECT (jitterbuffer, "Pushing buffer %d", seqnum);
+  GST_DEBUG_OBJECT (jitterbuffer,
+      "Pushing buffer %d, timestamp %" GST_TIME_FORMAT, seqnum,
+      GST_TIME_ARGS (out_time));
   result = gst_pad_push (priv->srcpad, outbuf);
-  if (result != GST_FLOW_OK)
+  if (G_UNLIKELY (result != GST_FLOW_OK))
     goto pause;
 
   return;
 
   /* ERRORS */
-no_elem:
+do_eos:
   {
     /* store result, we are flushing now */
-    GST_DEBUG_OBJECT (jitterbuffer, "Pop returned NULL, we're flushing");
-    priv->srcresult = GST_FLOW_WRONG_STATE;
+    GST_DEBUG_OBJECT (jitterbuffer, "We are EOS, pushing EOS downstream");
+    priv->srcresult = GST_FLOW_UNEXPECTED;
     gst_pad_pause_task (priv->srcpad);
-    async_jitter_queue_unlock (priv->queue);
+    gst_pad_push_event (priv->srcpad, gst_event_new_eos ());
+    JBUF_UNLOCK (priv);
     return;
   }
 flushing:
   {
     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
-    gst_buffer_unref (outbuf);
-    async_jitter_queue_unlock (priv->queue);
+    gst_pad_pause_task (priv->srcpad);
+    JBUF_UNLOCK (priv);
     return;
   }
 pause:
@@ -1066,13 +1299,13 @@ pause:
 
     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason);
 
-    async_jitter_queue_lock (priv->queue);
+    JBUF_LOCK (priv);
     /* store result */
     priv->srcresult = result;
     /* we don't post errors or anything because upstream will do that for us
      * when we pass the return value upstream. */
     gst_pad_pause_task (priv->srcpad);
-    async_jitter_queue_unlock (priv->queue);
+    JBUF_UNLOCK (priv);
     return;
   }
 }
@@ -1080,8 +1313,8 @@ pause:
 static gboolean
 gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query)
 {
-  GstRTPJitterBuffer *jitterbuffer;
-  GstRTPJitterBufferPrivate *priv;
+  GstRtpJitterBuffer *jitterbuffer;
+  GstRtpJitterBufferPrivate *priv;
   gboolean res = FALSE;
 
   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
@@ -1094,32 +1327,43 @@ gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query)
        * own */
       GstClockTime min_latency, max_latency;
       gboolean us_live;
-      GstPad *peer;
+      GstClockTime our_latency;
+
+      if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
+        gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
 
-      if ((peer = gst_pad_get_peer (priv->sinkpad))) {
-        if ((res = gst_pad_query (peer, query))) {
-          gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
+        GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
+            GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
+            GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
 
-          GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
-              GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
-              GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
+        /* store this so that we can safely sync on the peer buffers. */
+        JBUF_LOCK (priv);
+        priv->peer_latency = min_latency;
+        our_latency = ((guint64) priv->latency_ms) * GST_MSECOND;
+        JBUF_UNLOCK (priv);
 
-          min_latency += priv->latency_ms * GST_MSECOND;
-          max_latency += priv->latency_ms * GST_MSECOND;
+        GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
+            GST_TIME_ARGS (our_latency));
 
-          GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
-              GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
-              GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
+        /* we add some latency but can buffer an infinite amount of time */
+        min_latency += our_latency;
+        max_latency = -1;
 
-          gst_query_set_latency (query, TRUE, min_latency, max_latency);
-        }
-        gst_object_unref (peer);
+        GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
+            GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
+            GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
+
+        gst_query_set_latency (query, TRUE, min_latency, max_latency);
       }
       break;
     }
     default:
+      res = gst_pad_query_default (pad, query);
       break;
   }
+
+  gst_object_unref (jitterbuffer);
+
   return res;
 }
 
@@ -1127,36 +1371,53 @@ static void
 gst_rtp_jitter_buffer_set_property (GObject * object,
     guint prop_id, const GValue * value, GParamSpec * pspec)
 {
-  GstRTPJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (object);
+  GstRtpJitterBuffer *jitterbuffer;
+  GstRtpJitterBufferPrivate *priv;
+
+  jitterbuffer = GST_RTP_JITTER_BUFFER (object);
+  priv = jitterbuffer->priv;
 
   switch (prop_id) {
     case PROP_LATENCY:
     {
       guint new_latency, old_latency;
 
-      /* FIXME, not threadsafe */
       new_latency = g_value_get_uint (value);
-      old_latency = jitterbuffer->priv->latency_ms;
 
-      jitterbuffer->priv->latency_ms = new_latency;
-      if (jitterbuffer->priv->clock_rate != -1) {
-        async_jitter_queue_set_max_queue_length (jitterbuffer->priv->queue,
-            gst_util_uint64_scale_int (new_latency,
-                jitterbuffer->priv->clock_rate, 1000));
-      }
-      /* post message if latency changed, this will infor the parent pipeline
-       * that a latency reconfiguration is possible. */
+      JBUF_LOCK (priv);
+      old_latency = priv->latency_ms;
+      priv->latency_ms = new_latency;
+      JBUF_UNLOCK (priv);
+
+      /* post message if latency changed, this will inform the parent pipeline
+       * that a latency reconfiguration is possible/needed. */
       if (new_latency != old_latency) {
+        GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
+            GST_TIME_ARGS (new_latency * GST_MSECOND));
+
         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
       }
       break;
     }
     case PROP_DROP_ON_LATENCY:
-    {
-      jitterbuffer->priv->drop_on_latency = g_value_get_boolean (value);
+      JBUF_LOCK (priv);
+      priv->drop_on_latency = g_value_get_boolean (value);
+      JBUF_UNLOCK (priv);
+      break;
+    case PROP_TS_OFFSET:
+      JBUF_LOCK (priv);
+      priv->ts_offset = g_value_get_int64 (value);
+      /* FIXME, we don't really have a method for signaling a timestamp
+       * DISCONT without also making this a data discont. */
+      /* priv->discont = TRUE; */
+      JBUF_UNLOCK (priv);
+      break;
+    case PROP_DO_LOST:
+      JBUF_LOCK (priv);
+      priv->do_lost = g_value_get_boolean (value);
+      JBUF_UNLOCK (priv);
       break;
-    }
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -1167,17 +1428,50 @@ static void
 gst_rtp_jitter_buffer_get_property (GObject * object,
     guint prop_id, GValue * value, GParamSpec * pspec)
 {
-  GstRTPJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (object);
+  GstRtpJitterBuffer *jitterbuffer;
+  GstRtpJitterBufferPrivate *priv;
+
+  jitterbuffer = GST_RTP_JITTER_BUFFER (object);
+  priv = jitterbuffer->priv;
 
   switch (prop_id) {
     case PROP_LATENCY:
-      g_value_set_uint (value, jitterbuffer->priv->latency_ms);
+      JBUF_LOCK (priv);
+      g_value_set_uint (value, priv->latency_ms);
+      JBUF_UNLOCK (priv);
       break;
     case PROP_DROP_ON_LATENCY:
-      g_value_set_boolean (value, jitterbuffer->priv->drop_on_latency);
+      JBUF_LOCK (priv);
+      g_value_set_boolean (value, priv->drop_on_latency);
+      JBUF_UNLOCK (priv);
+      break;
+    case PROP_TS_OFFSET:
+      JBUF_LOCK (priv);
+      g_value_set_int64 (value, priv->ts_offset);
+      JBUF_UNLOCK (priv);
+      break;
+    case PROP_DO_LOST:
+      JBUF_LOCK (priv);
+      g_value_set_boolean (value, priv->do_lost);
+      JBUF_UNLOCK (priv);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
   }
 }
+
+void
+gst_rtp_jitter_buffer_get_sync (GstRtpJitterBuffer * buffer, guint64 * rtptime,
+    guint64 * timestamp)
+{
+  GstRtpJitterBufferPrivate *priv;
+
+  g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (buffer));
+
+  priv = buffer->priv;
+
+  JBUF_LOCK (priv);
+  rtp_jitter_buffer_get_sync (priv->jbuf, rtptime, timestamp);
+  JBUF_UNLOCK (priv);
+}