gst/rtpmanager/gstrtpbin.c: Use lock to protect variable.
authorWim Taymans <wim.taymans@gmail.com>
Sun, 16 Sep 2007 19:40:31 +0000 (19:40 +0000)
committerTim-Philipp Müller <tim.muller@collabora.co.uk>
Tue, 11 Aug 2009 01:30:30 +0000 (02:30 +0100)
Original commit message from CVS:
* gst/rtpmanager/gstrtpbin.c: (gst_rtp_bin_set_property),
(gst_rtp_bin_get_property):
Use lock to protect variable.
* gst/rtpmanager/gstrtpjitterbuffer.c:
(gst_rtp_jitter_buffer_class_init),
(gst_jitter_buffer_sink_parse_caps), (gst_rtp_jitter_buffer_chain),
(convert_rtptime_to_gsttime), (gst_rtp_jitter_buffer_loop):
Reconstruct GST timestamp from RTP timestamps based on measured clock
skew and sync offset.
* gst/rtpmanager/rtpjitterbuffer.c: (rtp_jitter_buffer_init),
(rtp_jitter_buffer_set_tail_changed),
(rtp_jitter_buffer_set_clock_rate),
(rtp_jitter_buffer_get_clock_rate), (calculate_skew),
(rtp_jitter_buffer_insert), (rtp_jitter_buffer_peek):
* gst/rtpmanager/rtpjitterbuffer.h:
Measure clock skew.
Add callback to be notfied when a new packet was inserted at the tail.
* gst/rtpmanager/rtpsource.c: (rtp_source_init),
(calculate_jitter), (rtp_source_send_rtp):
* gst/rtpmanager/rtpsource.h:
Remove clock skew detection, it's move to the jitterbuffer now.

gst/rtpmanager/gstrtpbin.c
gst/rtpmanager/gstrtpjitterbuffer.c
gst/rtpmanager/rtpjitterbuffer.c
gst/rtpmanager/rtpjitterbuffer.h
gst/rtpmanager/rtpsource.c
gst/rtpmanager/rtpsource.h

index eb028fb..cdbdaf6 100644 (file)
@@ -1183,7 +1183,9 @@ gst_rtp_bin_set_property (GObject * object, guint prop_id,
 
   switch (prop_id) {
     case PROP_LATENCY:
+      GST_RTP_BIN_LOCK (rtpbin);
       rtpbin->latency = g_value_get_uint (value);
+      GST_RTP_BIN_UNLOCK (rtpbin);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -1201,7 +1203,9 @@ gst_rtp_bin_get_property (GObject * object, guint prop_id,
 
   switch (prop_id) {
     case PROP_LATENCY:
+      GST_RTP_BIN_LOCK (rtpbin);
       g_value_set_uint (value, rtpbin->latency);
+      GST_RTP_BIN_UNLOCK (rtpbin);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
index 327ff0a..57be42e 100644 (file)
@@ -320,7 +320,7 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
 
   GST_DEBUG_CATEGORY_INIT
-      (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
+      (rtpjitterbuffer_debug, "gstrtpjitterbuffer", 0, "RTP Jitter Buffer");
 }
 
 static void
@@ -453,6 +453,8 @@ gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
   if (priv->clock_rate <= 0)
     goto wrong_rate;
 
+  rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate);
+
   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
 
   /* gah, clock-base is uint. If we don't have a base, we will use the first
@@ -794,6 +796,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
   GstRtpJitterBufferPrivate *priv;
   guint16 seqnum;
   GstFlowReturn ret = GST_FLOW_OK;
+  GstClockTime timestamp;
 
   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
 
@@ -811,10 +814,23 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
     gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer, pt);
     if (priv->clock_rate == -1)
       goto not_negotiated;
+
+    rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate);
   }
 
+  /* take the timestamp of the buffer. This is the time when the packet was
+   * received and is used to calculate jitter and clock skew. We will adjust
+   * this timestamp with the smoothed value after processing it in the
+   * jitterbuffer. */
+  timestamp = GST_BUFFER_TIMESTAMP (buffer);
+  /* bring to running time */
+  timestamp = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME,
+      timestamp);
+
   seqnum = gst_rtp_buffer_get_seq (buffer);
-  GST_DEBUG_OBJECT (jitterbuffer, "Received packet #%d", seqnum);
+  GST_DEBUG_OBJECT (jitterbuffer,
+      "Received packet #%d at time %" GST_TIME_FORMAT, seqnum,
+      GST_TIME_ARGS (timestamp));
 
   JBUF_LOCK_CHECK (priv, out_flushing);
   /* don't accept more data on EOS */
@@ -852,7 +868,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
   /* now insert the packet into the queue in sorted order. This function returns
    * FALSE if a packet with the same seqnum was already in the queue, meaning we
    * have a duplicate. */
-  if (!rtp_jitter_buffer_insert (priv->jbuf, buffer))
+  if (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp))
     goto duplicate;
 
   /* signal addition of new buffer */
@@ -926,6 +942,37 @@ duplicate:
   }
 }
 
+static GstClockTime
+convert_rtptime_to_gsttime (GstRtpJitterBuffer * jitterbuffer,
+    guint64 exttimestamp)
+{
+  GstClockTime timestamp;
+  GstRtpJitterBufferPrivate *priv;
+
+  priv = jitterbuffer->priv;
+
+  /* construct a timestamp from the RTP timestamp now. We don't apply this
+   * timestamp to the outgoing buffer yet as the popped buffer might not be the
+   * one we need to push out right now. */
+  timestamp =
+      gst_util_uint64_scale_int (exttimestamp, GST_SECOND, priv->clock_rate);
+
+  /* apply first observed timestamp */
+  timestamp += priv->jbuf->base_time;
+
+  /* apply the current clock skew */
+  timestamp += priv->jbuf->skew;
+
+  /* apply the timestamp offset */
+  timestamp += priv->ts_offset;
+
+  /* add latency, this includes our own latency and the peer latency. */
+  timestamp += (priv->latency_ms * GST_MSECOND);
+  timestamp += priv->peer_latency;
+
+  return timestamp;
+}
+
 /**
  * This funcion will push out buffers on the source pad.
  *
@@ -942,9 +989,7 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
   guint16 seqnum;
   guint32 rtp_time;
   GstClockTime timestamp;
-  gint64 running_time;
   guint64 exttimestamp;
-  gint ts_offset_rtp;
 
   priv = jitterbuffer->priv;
 
@@ -968,19 +1013,29 @@ again:
 
   /* pop a buffer, we must have a buffer now */
   outbuf = rtp_jitter_buffer_pop (priv->jbuf);
-
   seqnum = gst_rtp_buffer_get_seq (outbuf);
 
-  /* get the max deadline to wait for the missing packets, this is the time
-   * of the currently popped packet */
+  /* construct extended RTP timestamp from packet */
   rtp_time = gst_rtp_buffer_get_timestamp (outbuf);
   exttimestamp = gst_rtp_buffer_ext_timestamp (&priv->exttimestamp, rtp_time);
 
+  /* if no clock_base was given, take first ts as base */
+  if (priv->clock_base == -1) {
+    GST_DEBUG_OBJECT (jitterbuffer,
+        "no clock base, using exttimestamp %" G_GUINT64_FORMAT, exttimestamp);
+    priv->clock_base = exttimestamp;
+  }
+  /* subtract the base clock time so that we start counting from 0 */
+  exttimestamp -= priv->clock_base;
+
   GST_DEBUG_OBJECT (jitterbuffer,
       "Popped buffer #%d, rtptime %u, exttime %" G_GUINT64_FORMAT
       ", now %d left", seqnum, rtp_time, exttimestamp,
       rtp_jitter_buffer_num_packets (priv->jbuf));
 
+  /* convert the RTP timestamp to a gstreamer timestamp. */
+  timestamp = convert_rtptime_to_gsttime (jitterbuffer, exttimestamp);
+
   /* If we don't know what the next seqnum should be (== -1) we have to wait
    * because it might be possible that we are not receiving this buffer in-order,
    * a buffer with a lower seqnum could arrive later and we want to push that
@@ -991,7 +1046,7 @@ again:
    * packet expires. */
   if (priv->next_seqnum == -1 || priv->next_seqnum != seqnum) {
     GstClockID id;
-    GstClockTimeDiff jitter;
+    GstClockTime sync_time;
     GstClockReturn ret;
     GstClock *clock;
 
@@ -1007,34 +1062,6 @@ again:
       GST_DEBUG_OBJECT (jitterbuffer, "First buffer %d, do sync", seqnum);
     }
 
-    GST_DEBUG_OBJECT (jitterbuffer,
-        "exttimestamp %" G_GUINT64_FORMAT ", base %" G_GINT64_FORMAT,
-        exttimestamp, priv->clock_base);
-
-    /* if no clock_base was given, take first ts as base */
-    if (priv->clock_base == -1) {
-      GST_DEBUG_OBJECT (jitterbuffer,
-          "no clock base, using exttimestamp %" G_GUINT64_FORMAT, exttimestamp);
-      priv->clock_base = exttimestamp;
-    }
-
-    /* take rtp timestamp offset into account, this should not wrap around since
-     * we are dealing with the extended timestamp here. */
-    exttimestamp -= priv->clock_base;
-
-    /* bring timestamp to gst time */
-    timestamp =
-        gst_util_uint64_scale_int (exttimestamp, GST_SECOND, priv->clock_rate);
-
-    GST_DEBUG_OBJECT (jitterbuffer,
-        "exttimestamp %" G_GUINT64_FORMAT ", clock-rate %u, timestamp %"
-        GST_TIME_FORMAT, exttimestamp, priv->clock_rate,
-        GST_TIME_ARGS (timestamp));
-
-    /* bring to running time */
-    running_time = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME,
-        timestamp);
-
     GST_OBJECT_LOCK (jitterbuffer);
     clock = GST_ELEMENT_CLOCK (jitterbuffer);
     if (!clock) {
@@ -1043,25 +1070,21 @@ again:
       goto push_buffer;
     }
 
-    /* add latency, this includes our own latency and the peer latency. */
-    running_time += (priv->latency_ms * GST_MSECOND);
-    running_time += priv->peer_latency;
-
-    GST_DEBUG_OBJECT (jitterbuffer, "sync to running_time %" GST_TIME_FORMAT,
-        GST_TIME_ARGS (running_time));
+    GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (timestamp));
 
     /* prepare for sync against clock */
-    running_time += GST_ELEMENT_CAST (jitterbuffer)->base_time;
+    sync_time = timestamp + GST_ELEMENT_CAST (jitterbuffer)->base_time;
 
     /* create an entry for the clock */
-    id = priv->clock_id = gst_clock_new_single_shot_id (clock, running_time);
+    id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
     priv->waiting_seqnum = seqnum;
     GST_OBJECT_UNLOCK (jitterbuffer);
 
     /* release the lock so that the other end can push stuff or unlock */
     JBUF_UNLOCK (priv);
 
-    ret = gst_clock_id_wait (id, &jitter);
+    ret = gst_clock_id_wait (id, NULL);
 
     JBUF_LOCK (priv);
     /* and free the entry */
@@ -1080,8 +1103,9 @@ again:
     if (ret == GST_CLOCK_UNSCHEDULED) {
       GST_DEBUG_OBJECT (jitterbuffer,
           "Wait got unscheduled, will retry to push with new buffer");
-      /* reinsert popped buffer into queue */
-      if (!rtp_jitter_buffer_insert (priv->jbuf, outbuf)) {
+      /* reinsert popped buffer into queue, no need to recalculate skew, we do
+       * that when inserting the buffer in the chain function */
+      if (!rtp_jitter_buffer_insert (priv->jbuf, outbuf, -1)) {
         GST_DEBUG_OBJECT (jitterbuffer,
             "Duplicate packet #%d detected, dropping", seqnum);
         priv->num_duplicates++;
@@ -1089,6 +1113,9 @@ again:
       }
       goto again;
     }
+    /* After waiting, we might have a better estimate of skew, generate a new
+     * timestamp before pushing out the buffer */
+    timestamp = convert_rtptime_to_gsttime (jitterbuffer, exttimestamp);
   }
 push_buffer:
   /* check if we are pushing something unexpected */
@@ -1105,37 +1132,13 @@ push_buffer:
     /* update stats */
     priv->num_late += dropped;
 
-    /* set DISCONT flag */
+    /* set DISCONT flag when we missed a packet. */
     outbuf = gst_buffer_make_metadata_writable (outbuf);
     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
   }
 
-  /* apply the timestamp offset */
-  if (priv->ts_offset > 0)
-    ts_offset_rtp =
-        gst_util_uint64_scale_int (priv->ts_offset, priv->clock_rate,
-        GST_SECOND);
-  else if (priv->ts_offset < 0)
-    ts_offset_rtp =
-        -gst_util_uint64_scale_int (-priv->ts_offset, priv->clock_rate,
-        GST_SECOND);
-  else
-    ts_offset_rtp = 0;
-
-  if (ts_offset_rtp != 0) {
-    guint32 timestamp;
-
-    /* if the offset changed, mark with discont */
-    if (priv->ts_offset != priv->prev_ts_offset) {
-      GST_DEBUG_OBJECT (jitterbuffer, "changing offset to %d", ts_offset_rtp);
-      GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
-      priv->prev_ts_offset = priv->ts_offset;
-    }
-
-    timestamp = gst_rtp_buffer_get_timestamp (outbuf);
-    timestamp += ts_offset_rtp;
-    gst_rtp_buffer_set_timestamp (outbuf, timestamp);
-  }
+  /* apply timestamp to buffer now */
+  GST_BUFFER_TIMESTAMP (outbuf) = timestamp;
 
   /* now we are ready to push the buffer. Save the seqnum and release the lock
    * so the other end can push stuff in the queue again. */
index c36a25c..7260e9e 100644 (file)
@@ -61,7 +61,20 @@ rtp_jitter_buffer_class_init (RTPJitterBufferClass * klass)
 static void
 rtp_jitter_buffer_init (RTPJitterBuffer * jbuf)
 {
+  gint i;
+
   jbuf->packets = g_queue_new ();
+  jbuf->base_time = -1;
+  jbuf->base_rtptime = -1;
+  jbuf->ext_rtptime = -1;
+
+  for (i = 0; i < 100; i++) {
+    jbuf->window[i] = 0;
+  }
+  jbuf->window_pos = 0;
+  jbuf->window_filling = TRUE;
+  jbuf->window_min = 0;
+  jbuf->skew = 0;
 }
 
 static void
@@ -94,6 +107,168 @@ rtp_jitter_buffer_new (void)
   return jbuf;
 }
 
+void
+rtp_jitter_buffer_set_tail_changed (RTPJitterBuffer * jbuf, RTPTailChanged func,
+    gpointer user_data)
+{
+  g_return_if_fail (jbuf != NULL);
+
+  jbuf->tail_changed = func;
+  jbuf->user_data = user_data;
+}
+
+void
+rtp_jitter_buffer_set_clock_rate (RTPJitterBuffer * jbuf, gint clock_rate)
+{
+  g_return_if_fail (jbuf != NULL);
+
+  jbuf->clock_rate = clock_rate;
+}
+
+gint
+rtp_jitter_buffer_get_clock_rate (RTPJitterBuffer * jbuf)
+{
+  g_return_val_if_fail (jbuf != NULL, 0);
+
+  return jbuf->clock_rate;
+}
+
+
+/* For the clock skew we use a windowed low point averaging algorithm as can be
+ * found in http://www.grame.fr/pub/TR-050601.pdf. The idea is that the jitter is
+ * composed of:
+ *
+ *  J = N + n
+ *
+ *   N   : a constant network delay.
+ *   n   : random added noise. The noise is concentrated around 0
+ *
+ * In the receiver we can track the elapsed time at the sender with:
+ *
+ *  send_diff(i) = (Tsi - Ts0);
+ *
+ *   Tsi : The time at the sender at packet i
+ *   Ts0 : The time at the sender at the first packet
+ *
+ * This is the difference between the RTP timestamp in the first received packet
+ * and the current packet.
+ *
+ * At the receiver we have to deal with the jitter introduced by the network.
+ *
+ *  recv_diff(i) = (Tri - Tr0)
+ *
+ *   Tri : The time at the receiver at packet i
+ *   Tr0 : The time at the receiver at the first packet
+ *
+ * Both of these values contain a jitter Ji, a jitter for packet i, so we can
+ * write:
+ *
+ *  recv_diff(i) = (Cri + D + ni) - (Cr0 + D + n0))
+ *
+ *    Cri    : The time of the clock at the receiver for packet i
+ *    D + ni : The jitter when receiving packet i
+ *
+ * We see that the network delay is irrelevant here as we can elliminate D:
+ *
+ *  recv_diff(i) = (Cri + ni) - (Cr0 + n0))
+ *
+ * The drift is now expressed as:
+ *
+ *  Drift(i) = recv_diff(i) - send_diff(i);
+ *
+ * We now keep the W latest values of Drift and find the minimum (this is the
+ * one with the lowest network jitter and thus the one which is least affected
+ * by it). We average this lowest value to smooth out the resulting network skew.
+ *
+ * Both the window and the weighting used for averaging influence the accuracy
+ * of the drift estimation. Finding the correct parameters turns out to be a
+ * compromise between accuracy and inertia.
+ */
+static void
+calculate_skew (RTPJitterBuffer * jbuf, guint32 rtptime, GstClockTime time)
+{
+  guint64 ext_rtptime;
+  guint64 send_diff, recv_diff;
+  gint64 delta;
+  gint64 old;
+  gint pos, i;
+  GstClockTime gstrtptime;
+
+  ext_rtptime = gst_rtp_buffer_ext_timestamp (&jbuf->ext_rtptime, rtptime);
+
+  gstrtptime =
+      gst_util_uint64_scale_int (ext_rtptime, GST_SECOND, jbuf->clock_rate);
+
+  /* first time, lock on to time and gstrtptime */
+  if (jbuf->base_time == -1)
+    jbuf->base_time = time;
+  if (jbuf->base_rtptime == -1)
+    jbuf->base_rtptime = gstrtptime;
+
+  /* elapsed time at sender */
+  send_diff = gstrtptime - jbuf->base_rtptime;
+  /* elapsed time at receiver, includes the jitter */
+  recv_diff = time - jbuf->base_time;
+
+  /* measure the diff */
+  delta = ((gint64) recv_diff) - ((gint64) send_diff);
+
+  pos = jbuf->window_pos;
+
+  if (jbuf->window_filling) {
+    /* we are filling the window */
+    GST_DEBUG ("filling %d %" G_GINT64_FORMAT, pos, delta);
+    jbuf->window[pos++] = delta;
+    /* calc the min delta we observed */
+    if (pos == 1 || delta < jbuf->window_min)
+      jbuf->window_min = delta;
+
+    if (pos >= 100) {
+      /* window filled, fill window with min */
+      GST_DEBUG ("min %" G_GINT64_FORMAT, jbuf->window_min);
+      for (i = 0; i < 100; i++)
+        jbuf->window[i] = jbuf->window_min;
+
+      /* the skew is initially the min */
+      jbuf->skew = jbuf->window_min;
+      jbuf->window_filling = FALSE;
+    }
+  } else {
+    /* pick old value and store new value. We keep the previous value in order
+     * to quickly check if the min of the window changed */
+    old = jbuf->window[pos];
+    jbuf->window[pos++] = delta;
+
+    if (delta <= jbuf->window_min) {
+      /* if the new value we inserted is smaller or equal to the current min,
+       * it becomes the new min */
+      jbuf->window_min = delta;
+    } else if (old == jbuf->window_min) {
+      gint64 min = G_MAXINT64;
+
+      /* if we removed the old min, we have to find a new min */
+      for (i = 0; i < 100; i++) {
+        /* we found another value equal to the old min, we can stop searching now */
+        if (jbuf->window[i] == old) {
+          min = old;
+          break;
+        }
+        if (jbuf->window[i] < min)
+          min = jbuf->window[i];
+      }
+      jbuf->window_min = min;
+    }
+    /* average the min values */
+    jbuf->skew = (jbuf->window_min + (15 * jbuf->skew)) / 16;
+    GST_DEBUG ("new min: %" G_GINT64_FORMAT ", skew %" G_GINT64_FORMAT,
+        jbuf->window_min, jbuf->skew);
+  }
+  /* wrap around in the window */
+  if (pos >= 100)
+    pos = 0;
+  jbuf->window_pos = pos;
+}
+
 static gint
 compare_seqnum (GstBuffer * a, GstBuffer * b, RTPJitterBuffer * jbuf)
 {
@@ -115,6 +290,7 @@ compare_seqnum (GstBuffer * a, GstBuffer * b, RTPJitterBuffer * jbuf)
  * rtp_jitter_buffer_insert:
  * @jbuf: an #RTPJitterBuffer
  * @buf: a buffer
+ * @time: a timestamp when this buffer was received in nanoseconds
  *
  * Inserts @buf into the packet queue of @jbuf. The sequence number of the
  * packet will be used to sort the packets. This function takes ownerhip of
@@ -123,10 +299,12 @@ compare_seqnum (GstBuffer * a, GstBuffer * b, RTPJitterBuffer * jbuf)
  * Returns: %FALSE if a packet with the same number already existed.
  */
 gboolean
-rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf)
+rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf,
+    GstClockTime time)
 {
   GList *list;
   gint func_ret = 1;
+  guint32 rtptime;
 
   g_return_val_if_fail (jbuf != NULL, FALSE);
   g_return_val_if_fail (buf != NULL, FALSE);
@@ -142,11 +320,23 @@ rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf)
   if (func_ret == 0)
     return FALSE;
 
+  /* do skew calculation by measuring the difference between rtptime and the
+   * receive time */
+  if (time != -1) {
+    rtptime = gst_rtp_buffer_get_timestamp (buf);
+    calculate_skew (jbuf, rtptime, time);
+  }
+
   if (list)
     g_queue_insert_before (jbuf->packets, list, buf);
-  else
+  else {
     g_queue_push_tail (jbuf->packets, buf);
 
+    /* tail buffer changed, signal callback */
+    if (jbuf->tail_changed)
+      jbuf->tail_changed (jbuf, jbuf->user_data);
+  }
+
   return TRUE;
 }
 
@@ -171,6 +361,28 @@ rtp_jitter_buffer_pop (RTPJitterBuffer * jbuf)
 }
 
 /**
+ * rtp_jitter_buffer_peek:
+ * @jbuf: an #RTPJitterBuffer
+ *
+ * Peek the oldest buffer from the packet queue of @jbuf. Register a callback
+ * with rtp_jitter_buffer_set_tail_changed() to be notified when an older packet
+ * was inserted in the queue.
+ *
+ * Returns: a #GstBuffer or %NULL when there was no packet in the queue.
+ */
+GstBuffer *
+rtp_jitter_buffer_peek (RTPJitterBuffer * jbuf)
+{
+  GstBuffer *buf;
+
+  g_return_val_if_fail (jbuf != NULL, FALSE);
+
+  buf = g_queue_peek_tail (jbuf->packets);
+
+  return buf;
+}
+
+/**
  * rtp_jitter_buffer_flush:
  * @jbuf: an #RTPJitterBuffer
  *
index 8bff03c..b67e265 100644 (file)
@@ -35,14 +35,38 @@ typedef struct _RTPJitterBufferClass RTPJitterBufferClass;
 #define RTP_JITTER_BUFFER_CAST(src)        ((RTPJitterBuffer *)(src))
 
 /**
+ * RTPTailChanged:
+ * @jbuf: an #RTPJitterBuffer
+ * @user_data: user data specified when registering
+ *
+ * This callback will be called when the tail buffer of @jbuf changed.
+ */
+typedef void (*RTPTailChanged) (RTPJitterBuffer *jbuf, gpointer user_data);
+
+/**
  * RTPJitterBuffer:
  *
  * A JitterBuffer in the #RTPSession
  */
 struct _RTPJitterBuffer {
-  GObject       object;
+  GObject        object;
+
+  GQueue        *packets;
 
-  GQueue       *packets;
+  gint           clock_rate;
+
+  /* for calculating skew */
+  GstClockTime   base_time;
+  GstClockTime   base_rtptime;
+  guint64        ext_rtptime;
+  gint64         window[100];
+  guint          window_pos;
+  gboolean       window_filling;
+  gint64         window_min;
+  gint64         skew;
+
+  RTPTailChanged tail_changed;
+  gpointer       user_data;
 };
 
 struct _RTPJitterBufferClass {
@@ -52,14 +76,20 @@ struct _RTPJitterBufferClass {
 GType rtp_jitter_buffer_get_type (void);
 
 /* managing lifetime */
-RTPJitterBuffer*      rtp_jitter_buffer_new            (void);
+RTPJitterBuffer*      rtp_jitter_buffer_new              (void);
+
+void                  rtp_jitter_buffer_set_tail_changed (RTPJitterBuffer *jbuf, RTPTailChanged func, 
+                                                          gpointer user_data);
+
+void                  rtp_jitter_buffer_set_clock_rate   (RTPJitterBuffer *jbuf, gint clock_rate);
+gint                  rtp_jitter_buffer_get_clock_rate   (RTPJitterBuffer *jbuf);
 
-gboolean              rtp_jitter_buffer_insert         (RTPJitterBuffer *jbuf, GstBuffer *buf);
-GstBuffer *           rtp_jitter_buffer_pop            (RTPJitterBuffer *jbuf);
+gboolean              rtp_jitter_buffer_insert           (RTPJitterBuffer *jbuf, GstBuffer *buf, GstClockTime time);
+GstBuffer *           rtp_jitter_buffer_pop              (RTPJitterBuffer *jbuf);
 
-void                  rtp_jitter_buffer_flush          (RTPJitterBuffer *jbuf);
+void                  rtp_jitter_buffer_flush            (RTPJitterBuffer *jbuf);
 
-guint                 rtp_jitter_buffer_num_packets    (RTPJitterBuffer *jbuf);
-guint32               rtp_jitter_buffer_get_ts_diff    (RTPJitterBuffer *jbuf);
+guint                 rtp_jitter_buffer_num_packets      (RTPJitterBuffer *jbuf);
+guint32               rtp_jitter_buffer_get_ts_diff      (RTPJitterBuffer *jbuf);
 
 #endif /* __RTP_JITTER_BUFFER_H__ */
index c415247..4ffc6bb 100644 (file)
@@ -69,9 +69,6 @@ rtp_source_init (RTPSource * src)
   src->payload = 0;
   src->clock_rate = -1;
   src->clock_base = -1;
-  src->skew_base_ntpnstime = -1;
-  src->ext_rtptime = -1;
-  src->prev_ext_rtptime = -1;
   src->packets = g_queue_new ();
   src->seqnum_base = -1;
   src->last_rtptime = -1;
@@ -266,18 +263,20 @@ get_clock_rate (RTPSource * src, guint8 payload)
   return src->clock_rate;
 }
 
+/* Jitter is the variation in the delay of received packets in a flow. It is
+ * measured by comparing the interval when RTP packets were sent to the interval
+ * at which they were received. For instance, if packet #1 and packet #2 leave
+ * 50 milliseconds apart and arrive 60 milliseconds apart, then the jitter is 10
+ * milliseconds. */
 static void
 calculate_jitter (RTPSource * src, GstBuffer * buffer,
     RTPArrivalStats * arrival)
 {
   guint64 ntpnstime;
   guint32 rtparrival, transit, rtptime;
-  guint64 ext_rtptime;
   gint32 diff;
   gint clock_rate;
   guint8 pt;
-  guint64 rtpdiff, ntpdiff;
-  gint64 skew;
 
   /* get arrival time */
   if ((ntpnstime = arrival->ntpnstime) == GST_CLOCK_TIME_NONE)
@@ -291,50 +290,12 @@ calculate_jitter (RTPSource * src, GstBuffer * buffer,
 
   rtptime = gst_rtp_buffer_get_timestamp (buffer);
 
-  /* convert to extended timestamp right away */
-  ext_rtptime = gst_rtp_buffer_ext_timestamp (&src->ext_rtptime, rtptime);
-
   /* no clock-base, take first rtptime as base */
   if (src->clock_base == -1) {
     GST_DEBUG ("using clock-base of %" G_GUINT32_FORMAT, rtptime);
     src->clock_base = rtptime;
   }
 
-  if (src->skew_base_ntpnstime == -1) {
-    /* lock on first observed NTP and RTP time, they should increment in-sync or
-     * we have a clock skew. */
-    GST_DEBUG ("using base_ntpnstime of %" GST_TIME_FORMAT,
-        GST_TIME_ARGS (ntpnstime));
-    src->skew_base_ntpnstime = ntpnstime;
-    src->skew_base_rtptime = rtptime;
-    src->prev_ext_rtptime = ext_rtptime;
-    src->avg_skew = 0;
-  } else if (src->prev_ext_rtptime < ext_rtptime) {
-    /* get elapsed rtptime but only when the previous rtptime was stricly smaller
-     * than the new one. */
-    rtpdiff = ext_rtptime - src->skew_base_rtptime;
-    /* get NTP diff and convert to RTP time, this is always positive */
-    ntpdiff = ntpnstime - src->skew_base_ntpnstime;
-    ntpdiff = gst_util_uint64_scale_int (ntpdiff, clock_rate, GST_SECOND);
-
-    /* see how the NTP and RTP relate any deviation from 0 means that they drift
-     * out of sync and we must compensate. */
-    skew = ntpdiff - rtpdiff;
-    /* average out the skew to get a smooth value. */
-    src->avg_skew = (63 * src->avg_skew + skew) / 64;
-
-    GST_DEBUG ("new skew %" G_GINT64_FORMAT ", avg %" G_GINT64_FORMAT, skew,
-        src->avg_skew);
-    /* store previous extended timestamp */
-    src->prev_ext_rtptime = ext_rtptime;
-  }
-  if (src->avg_skew != 0) {
-    /* patch the buffer RTP timestamp with the skew */
-    GST_DEBUG ("skew timestamp RTP %" G_GUINT32_FORMAT " -> %" G_GINT64_FORMAT,
-        rtptime, rtptime + src->avg_skew);
-    gst_rtp_buffer_set_timestamp (buffer, rtptime + src->avg_skew);
-  }
-
   /* convert arrival time to RTP timestamp units, truncate to 32 bits, we don't
    * care about the absolute value, just the difference. */
   rtparrival = gst_util_uint64_scale_int (ntpnstime, clock_rate, GST_SECOND);
@@ -603,7 +564,7 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer, guint64 ntpnstime)
       /* the SSRC of the packet is not correct, make a writable buffer and
        * update the SSRC. This could involve a complete copy of the packet when
        * it is not writable. Usually the payloader will use caps negotiation to
-       * get the correct SSRC. */
+       * get the correct SSRC from the session manager before pushing anything. */
       buffer = gst_buffer_make_writable (buffer);
 
       GST_WARNING ("updating SSRC from %08x to %08x, fix the payloader", ssrc,
@@ -614,7 +575,7 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer, guint64 ntpnstime)
         src->stats.packets_sent);
     result = src->callbacks.push_rtp (src, buffer, src->user_data);
   } else {
-    GST_DEBUG ("no callback installed");
+    GST_WARNING ("no callback installed, dropping packet");
     gst_buffer_unref (buffer);
   }
 
index be79346..1952a4c 100644 (file)
@@ -137,16 +137,8 @@ struct _RTPSource {
   GstCaps      *caps;
   gint          clock_rate;
   gint32        seqnum_base;
-
   gint64        clock_base;
 
-  /* to calculate the clock skew */
-  guint64       skew_base_ntpnstime;
-  guint64       skew_base_rtptime;
-  gint64        avg_skew;
-  guint64       ext_rtptime;
-  guint64       prev_ext_rtptime;
-
   GstClockTime  bye_time;
   GstClockTime  last_activity;
   GstClockTime  last_rtp_activity;