gst/rtpmanager/gstrtpbin.*: Add signal to notify listeners when a sender becomes...
authorWim Taymans <wim.taymans@gmail.com>
Fri, 5 Sep 2008 13:52:34 +0000 (13:52 +0000)
committerTim-Philipp Müller <tim.muller@collabora.co.uk>
Tue, 11 Aug 2009 01:30:37 +0000 (02:30 +0100)
Original commit message from CVS:
* gst/rtpmanager/gstrtpbin.c: (on_sender_timeout),
(create_session), (gst_rtp_bin_associate),
(gst_rtp_bin_sync_chain), (gst_rtp_bin_class_init),
(gst_rtp_bin_request_new_pad):
* gst/rtpmanager/gstrtpbin.h:
Add signal to notify listeners when a sender becomes a receiver.
Tweak lip-sync code, don't store our own copy of the ts-offset of the
jitterbuffer, don't adjust sync if the change is less than 4msec.
Get the RTP timestamp <-> GStreamer timestamp relation directly from
the jitterbuffer instead of our inaccurate version from the source.
* gst/rtpmanager/gstrtpjitterbuffer.c:
(gst_rtp_jitter_buffer_chain), (gst_rtp_jitter_buffer_loop),
(gst_rtp_jitter_buffer_get_sync):
* gst/rtpmanager/gstrtpjitterbuffer.h:
Add G_LIKELY macros, use global defines for max packet reorder and
dropouts.
Reset the jitterbuffer clock skew detection when packets seqnums are
changed unexpectedly.
* gst/rtpmanager/gstrtpsession.c: (on_sender_timeout),
(gst_rtp_session_class_init), (gst_rtp_session_init):
* gst/rtpmanager/gstrtpsession.h:
Add sender timeout signal.
* gst/rtpmanager/rtpjitterbuffer.c: (rtp_jitter_buffer_reset_skew),
(calculate_skew), (rtp_jitter_buffer_insert),
(rtp_jitter_buffer_get_sync):
* gst/rtpmanager/rtpjitterbuffer.h:
Add some G_LIKELY macros.
Keep track of the extended RTP timestamp so that we can report the RTP
timestamp <-> GStreamer timestamp relation for lip-sync.
Remove server timestamp gap detection code, the server can sometimes
make a huge gap in timestamps (talk spurts,...) see #549774.
Detect timetamp weirdness instead by observing the sender/receiver
timestamp relation and resync if it changes more than 1 second.
Add method to report about the current rtp <-> gst timestamp relation
which is needed for lip-sync.
* gst/rtpmanager/rtpsession.c: (rtp_session_class_init),
(on_sender_timeout), (check_collision), (rtp_session_process_sr),
(session_cleanup):
* gst/rtpmanager/rtpsession.h:
Add sender timeout signal.
Remove inaccurate rtp <-> gst timestamp relation code, the
jitterbuffer can now do an accurate reporting about this.
* gst/rtpmanager/rtpsource.c: (rtp_source_init),
(rtp_source_update_caps), (calculate_jitter),
(rtp_source_process_rtp):
* gst/rtpmanager/rtpsource.h:
Remove inaccurate rtp <-> gst timestamp relation code.
* gst/rtpmanager/rtpstats.h:
Define global max-reorder and max-dropout constants for use in various
subsystems.

13 files changed:
gst/rtpmanager/gstrtpbin.c
gst/rtpmanager/gstrtpbin.h
gst/rtpmanager/gstrtpjitterbuffer.c
gst/rtpmanager/gstrtpjitterbuffer.h
gst/rtpmanager/gstrtpsession.c
gst/rtpmanager/gstrtpsession.h
gst/rtpmanager/rtpjitterbuffer.c
gst/rtpmanager/rtpjitterbuffer.h
gst/rtpmanager/rtpsession.c
gst/rtpmanager/rtpsession.h
gst/rtpmanager/rtpsource.c
gst/rtpmanager/rtpsource.h
gst/rtpmanager/rtpstats.h

index 46ef4bb..7f402c3 100644 (file)
 #include "gstrtpbin-marshal.h"
 #include "gstrtpbin.h"
 #include "gstrtpsession.h"
+#include "gstrtpjitterbuffer.h"
 
 GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug);
 #define GST_CAT_DEFAULT gst_rtp_bin_debug
@@ -236,6 +237,7 @@ enum
   SIGNAL_ON_BYE_SSRC,
   SIGNAL_ON_BYE_TIMEOUT,
   SIGNAL_ON_TIMEOUT,
+  SIGNAL_ON_SENDER_TIMEOUT,
   LAST_SIGNAL
 };
 
@@ -323,7 +325,6 @@ struct _GstRtpBinStream
   guint64 clock_base_time;
   gint clock_rate;
   gint64 ts_offset;
-  gint64 prev_ts_offset;
   gint last_pt;
 };
 
@@ -455,6 +456,13 @@ on_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
       sess->id, ssrc);
 }
 
+static void
+on_sender_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
+{
+  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
+      sess->id, ssrc);
+}
+
 /* create a session with the given id.  Must be called with RTP_BIN_LOCK */
 static GstRtpBinSession *
 create_session (GstRtpBin * rtpbin, gint id)
@@ -507,6 +515,8 @@ create_session (GstRtpBin * rtpbin, gint id)
   g_signal_connect (sess->session, "on-bye-timeout",
       (GCallback) on_bye_timeout, sess);
   g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess);
+  g_signal_connect (sess->session, "on-sender-timeout",
+      (GCallback) on_sender_timeout, sess);
 
   /* FIXME, change state only to what's needed */
   gst_bin_add (GST_BIN_CAST (rtpbin), session);
@@ -863,32 +873,31 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
     /* calculate offsets for each stream */
     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
-
-      if (ostream->unix_delta == 0)
-        continue;
+      gint64 prev_ts_offset;
 
       ostream->ts_offset = ostream->unix_delta - min;
 
+      g_object_get (ostream->buffer, "ts-offset", &prev_ts_offset, NULL);
+
       /* delta changed, see how much */
-      if (ostream->prev_ts_offset != ostream->ts_offset) {
+      if (prev_ts_offset != ostream->ts_offset) {
         gint64 diff;
 
-        if (ostream->prev_ts_offset > ostream->ts_offset)
-          diff = ostream->prev_ts_offset - ostream->ts_offset;
+        if (prev_ts_offset > ostream->ts_offset)
+          diff = prev_ts_offset - ostream->ts_offset;
         else
-          diff = ostream->ts_offset - ostream->prev_ts_offset;
+          diff = ostream->ts_offset - prev_ts_offset;
 
         GST_DEBUG_OBJECT (bin,
             "ts-offset %" G_GUINT64_FORMAT ", prev %" G_GUINT64_FORMAT
-            ", diff: %" G_GINT64_FORMAT, ostream->ts_offset,
-            ostream->prev_ts_offset, diff);
+            ", diff: %" G_GINT64_FORMAT, ostream->ts_offset, prev_ts_offset,
+            diff);
 
-        /* only change diff when it changed more than 1 millisecond. This
+        /* only change diff when it changed more than 4 milliseconds. This
          * compensates for rounding errors in NTP to RTP timestamp
          * conversions */
-        if (diff > GST_MSECOND && diff < (3 * GST_SECOND)) {
+        if (diff > 4 * GST_MSECOND && diff < (3 * GST_SECOND)) {
           g_object_set (ostream->buffer, "ts-offset", ostream->ts_offset, NULL);
-          ostream->prev_ts_offset = ostream->ts_offset;
         }
       }
       GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
@@ -937,8 +946,7 @@ gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer)
   gboolean have_sr, have_sdes;
   gboolean more;
   guint64 clock_base;
-
-  clock_base = GST_BUFFER_OFFSET (buffer);
+  guint64 clock_base_time;
 
   stream = gst_pad_get_element_private (pad);
   bin = stream->bin;
@@ -948,6 +956,12 @@ gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer)
   if (!gst_rtcp_buffer_validate (buffer))
     goto invalid_rtcp;
 
+  /* get the last relation between the rtp timestamps and the gstreamer
+   * timestamps. We get this info directly from the jitterbuffer which
+   * constructs gstreamer timestamps from rtp timestamps */
+  gst_rtp_jitter_buffer_get_sync (GST_RTP_JITTER_BUFFER (stream->buffer),
+      &clock_base, &clock_base_time);
+
   /* clock base changes when there is a huge gap in the timestamps or seqnum.
    * When this happens we don't want to calculate the extended timestamp based
    * on the previous one but reset the calculation. */
@@ -1008,7 +1022,7 @@ gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer)
 
             if (type == GST_RTCP_SDES_CNAME) {
               stream->clock_base = clock_base;
-              stream->clock_base_time = GST_BUFFER_OFFSET_END (buffer);
+              stream->clock_base_time = clock_base_time;
               /* associate the stream to CNAME */
               gst_rtp_bin_associate (bin, stream, len, data);
             }
@@ -1328,6 +1342,19 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass)
       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_timeout),
       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
       G_TYPE_UINT, G_TYPE_UINT);
+  /**
+   * GstRtpBin::on-sender-timeout:
+   * @rtpbin: the object which received the signal
+   * @session: the session
+   * @ssrc: the SSRC 
+   *
+   * Notify of a sender SSRC that has timed out and became a receiver
+   */
+  gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT] =
+      g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_sender_timeout),
+      NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
+      G_TYPE_UINT, G_TYPE_UINT);
 
   g_object_class_install_property (gobject_class, PROP_SDES_CNAME,
       g_param_spec_string ("sdes-cname", "SDES CNAME",
@@ -2332,6 +2359,7 @@ gst_rtp_bin_request_new_pad (GstElement * element,
   GstRtpBin *rtpbin;
   GstElementClass *klass;
   GstPad *result;
+
   gchar *pad_name = NULL;
 
   g_return_val_if_fail (templ != NULL, NULL);
index 898b6db..7ef605d 100644 (file)
@@ -74,6 +74,7 @@ struct _GstRtpBinClass {
   void     (*on_bye_ssrc)       (GstRtpBin *rtpbin, guint session, guint32 ssrc);
   void     (*on_bye_timeout)    (GstRtpBin *rtpbin, guint session, guint32 ssrc);
   void     (*on_timeout)        (GstRtpBin *rtpbin, guint session, guint32 ssrc);
+  void     (*on_sender_timeout) (GstRtpBin *rtpbin, guint session, guint32 ssrc);
 };
 
 GType gst_rtp_bin_get_type (void);
index b9b1569..d48bc40 100644 (file)
@@ -65,6 +65,7 @@
 
 #include "gstrtpjitterbuffer.h"
 #include "rtpjitterbuffer.h"
+#include "rtpstats.h"
 
 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
@@ -108,7 +109,7 @@ enum
 
 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
   JBUF_LOCK (priv);                                   \
-  if (priv->srcresult != GST_FLOW_OK)                 \
+  if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
     goto label;                                       \
 } G_STMT_END
 
@@ -117,7 +118,7 @@ enum
 
 #define JBUF_WAIT_CHECK(priv,label) G_STMT_START {    \
   JBUF_WAIT(priv);                                    \
-  if (priv->srcresult != GST_FLOW_OK)                 \
+  if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
     goto label;                                       \
 } G_STMT_END
 
@@ -830,12 +831,12 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
 
   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
 
-  if (!gst_rtp_buffer_validate (buffer))
+  if (G_UNLIKELY (!gst_rtp_buffer_validate (buffer)))
     goto invalid_buffer;
 
   priv = jitterbuffer->priv;
 
-  if (priv->last_pt != gst_rtp_buffer_get_payload_type (buffer)) {
+  if (G_UNLIKELY (priv->last_pt != gst_rtp_buffer_get_payload_type (buffer))) {
     GstCaps *caps;
 
     priv->last_pt = gst_rtp_buffer_get_payload_type (buffer);
@@ -848,14 +849,14 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
     }
   }
 
-  if (priv->clock_rate == -1) {
+  if (G_UNLIKELY (priv->clock_rate == -1)) {
     guint8 pt;
 
     /* no clock rate given on the caps, try to get one with the signal */
     pt = gst_rtp_buffer_get_payload_type (buffer);
 
     gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer, pt);
-    if (priv->clock_rate == -1)
+    if (G_UNLIKELY (priv->clock_rate == -1))
       goto not_negotiated;
   }
 
@@ -875,35 +876,42 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
 
   JBUF_LOCK_CHECK (priv, out_flushing);
   /* don't accept more data on EOS */
-  if (priv->eos)
+  if (G_UNLIKELY (priv->eos))
     goto have_eos;
 
   /* let's check if this buffer is too late, we can only accept packets with
    * bigger seqnum than the one we last pushed. */
-  if (priv->last_popped_seqnum != -1) {
+  if (G_LIKELY (priv->last_popped_seqnum != -1)) {
     gint gap;
+    gboolean reset = FALSE;
 
     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
 
-    if (gap <= 0) {
+    if (G_UNLIKELY (gap <= 0)) {
       /* priv->last_popped_seqnum >= seqnum, this packet is too late or the
        * sender might have been restarted with different seqnum. */
-      if (gap < -100) {
+      if (gap < -RTP_MAX_MISORDER) {
         GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d", gap);
-        priv->last_popped_seqnum = -1;
-        priv->next_seqnum = -1;
+        reset = TRUE;
       } else {
         goto too_late;
       }
     } else {
       /* priv->last_popped_seqnum < seqnum, this is a new packet */
-      if (gap > 3000) {
+      if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
         GST_DEBUG_OBJECT (jitterbuffer, "reset: too many dropped packets %d",
             gap);
-        priv->last_popped_seqnum = -1;
-        priv->next_seqnum = -1;
+        reset = TRUE;
+      } else {
+        GST_DEBUG_OBJECT (jitterbuffer, "dropped packets %d but <= %d", gap,
+            RTP_MAX_DROPOUT);
       }
     }
+    if (G_UNLIKELY (reset)) {
+      priv->last_popped_seqnum = -1;
+      priv->next_seqnum = -1;
+      rtp_jitter_buffer_reset_skew (priv->jbuf);
+    }
   }
 
   /* let's drop oldest packet if the queue is already full and drop-on-latency
@@ -915,7 +923,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
     latency_ts =
         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
 
-    if (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts) {
+    if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
       GstBuffer *old_buf;
 
       old_buf = rtp_jitter_buffer_pop (priv->jbuf);
@@ -934,8 +942,8 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
   /* now insert the packet into the queue in sorted order. This function returns
    * FALSE if a packet with the same seqnum was already in the queue, meaning we
    * have a duplicate. */
-  if (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp,
-          priv->clock_rate, &tail))
+  if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp,
+              priv->clock_rate, &tail)))
     goto duplicate;
 
   /* signal addition of new buffer when the _loop is waiting. */
@@ -944,7 +952,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
 
   /* let's unschedule and unblock any waiting buffers. We only want to do this
    * when the tail buffer changed */
-  if (priv->clock_id && tail) {
+  if (G_UNLIKELY (priv->clock_id && tail)) {
     GST_DEBUG_OBJECT (jitterbuffer,
         "Unscheduling waiting buffer, new tail buffer");
     gst_clock_id_unschedule (priv->clock_id);
@@ -1051,12 +1059,12 @@ again:
   GST_DEBUG_OBJECT (jitterbuffer, "Peeking item");
   while (TRUE) {
     /* always wait if we are blocked */
-    if (!priv->blocked) {
+    if (G_LIKELY (!priv->blocked)) {
       /* if we have a packet, we can exit the loop and grab it */
       if (rtp_jitter_buffer_num_packets (priv->jbuf) > 0)
         break;
       /* no packets but we are EOS, do eos logic */
-      if (priv->eos)
+      if (G_UNLIKELY (priv->eos))
         goto do_eos;
     }
     /* underrun, wait for packets or flushing now */
@@ -1091,12 +1099,12 @@ again:
 
   /* get the gap between this and the previous packet. If we don't know the
    * previous packet seqnum assume no gap. */
-  if (next_seqnum != -1) {
+  if (G_LIKELY (next_seqnum != -1)) {
     gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
 
     /* if we have a packet that we already pushed or considered dropped, pop it
      * off and get the next packet */
-    if (gap < 0) {
+    if (G_UNLIKELY (gap < 0)) {
       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
           seqnum, next_seqnum);
       outbuf = rtp_jitter_buffer_pop (priv->jbuf);
@@ -1116,7 +1124,7 @@ again:
    * determine if we have missing a packet. If we have a missing packet (which
    * must be before this packet) we can wait for it until the deadline for this
    * packet expires. */
-  if (gap != 0 && out_time != -1) {
+  if (G_UNLIKELY (gap != 0 && out_time != -1)) {
     GstClockID id;
     GstClockTime sync_time;
     GstClockReturn ret;
@@ -1188,8 +1196,9 @@ again:
     /* at this point, the clock could have been unlocked by a timeout, a new
      * tail element was added to the queue or because we are shutting down. Check
      * for shutdown first. */
-    if (priv->srcresult != GST_FLOW_OK)
-      goto flushing;
+    if G_UNLIKELY
+      ((priv->srcresult != GST_FLOW_OK))
+          goto flushing;
 
     /* if we got unscheduled and we are not flushing, it's because a new tail
      * element became available in the queue. Grab it and try to push or sync. */
@@ -1239,7 +1248,7 @@ push_buffer:
   /* when we get here we are ready to pop and push the buffer */
   outbuf = rtp_jitter_buffer_pop (priv->jbuf);
 
-  if (discont || priv->discont) {
+  if (G_UNLIKELY (discont || priv->discont)) {
     /* set DISCONT flag when we missed a packet. We pushed the buffer writable
      * into the jitterbuffer so we can modify now. */
     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
@@ -1261,7 +1270,7 @@ push_buffer:
       "Pushing buffer %d, timestamp %" GST_TIME_FORMAT, seqnum,
       GST_TIME_ARGS (out_time));
   result = gst_pad_push (priv->srcpad, outbuf);
-  if (result != GST_FLOW_OK)
+  if (G_UNLIKELY (result != GST_FLOW_OK))
     goto pause;
 
   return;
@@ -1451,3 +1460,18 @@ gst_rtp_jitter_buffer_get_property (GObject * object,
       break;
   }
 }
+
+void
+gst_rtp_jitter_buffer_get_sync (GstRtpJitterBuffer * buffer, guint64 * rtptime,
+    guint64 * timestamp)
+{
+  GstRtpJitterBufferPrivate *priv;
+
+  g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (buffer));
+
+  priv = buffer->priv;
+
+  JBUF_LOCK (priv);
+  rtp_jitter_buffer_get_sync (priv->jbuf, rtptime, timestamp);
+  JBUF_UNLOCK (priv);
+}
index 290aee0..15185a2 100644 (file)
@@ -79,6 +79,9 @@ struct _GstRtpJitterBufferClass
 
 GType gst_rtp_jitter_buffer_get_type (void);
 
+void        gst_rtp_jitter_buffer_get_sync            (GstRtpJitterBuffer *buffer,
+                                                       guint64 *rtptime, guint64 *timestamp);
+
 G_END_DECLS
 
 #endif /* __GST_RTP_JITTER_BUFFER_H__ */
index cc794b6..e78e972 100644 (file)
@@ -193,6 +193,7 @@ enum
   SIGNAL_ON_BYE_SSRC,
   SIGNAL_ON_BYE_TIMEOUT,
   SIGNAL_ON_TIMEOUT,
+  SIGNAL_ON_SENDER_TIMEOUT,
   LAST_SIGNAL
 };
 
@@ -416,6 +417,13 @@ on_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
       src->ssrc);
 }
 
+static void
+on_sender_timeout (RTPSession * session, RTPSource * src, GstRtpSession * sess)
+{
+  g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
+      src->ssrc);
+}
+
 GST_BOILERPLATE (GstRtpSession, gst_rtp_session, GstElement, GST_TYPE_ELEMENT);
 
 static void
@@ -574,6 +582,18 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_timeout),
       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
+  /**
+   * GstRtpSession::on-sender-timeout:
+   * @sess: the object which received the signal
+   * @ssrc: the SSRC 
+   *
+   * Notify of a sender SSRC that has timed out and became a receiver
+   */
+  gst_rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
+      g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass,
+          on_sender_timeout), NULL, NULL, g_cclosure_marshal_VOID__UINT,
+      G_TYPE_NONE, 1, G_TYPE_UINT);
 
   g_object_class_install_property (gobject_class, PROP_NTP_NS_BASE,
       g_param_spec_uint64 ("ntp-ns-base", "NTP base time",
@@ -655,6 +675,7 @@ gst_rtp_session_init (GstRtpSession * rtpsession, GstRtpSessionClass * klass)
   rtpsession->priv->lock = g_mutex_new ();
   rtpsession->priv->sysclock = gst_system_clock_obtain ();
   rtpsession->priv->session = rtp_session_new ();
+
   /* configure callbacks */
   rtp_session_set_callbacks (rtpsession->priv->session, &callbacks, rtpsession);
   /* configure signals */
@@ -674,6 +695,8 @@ gst_rtp_session_init (GstRtpSession * rtpsession, GstRtpSessionClass * klass)
       (GCallback) on_bye_timeout, rtpsession);
   g_signal_connect (rtpsession->priv->session, "on-timeout",
       (GCallback) on_timeout, rtpsession);
+  g_signal_connect (rtpsession->priv->session, "on-sender-timeout",
+      (GCallback) on_sender_timeout, rtpsession);
   rtpsession->priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
       (GDestroyNotify) gst_caps_unref);
 
index 5bbf377..9481a1c 100644 (file)
@@ -71,6 +71,7 @@ struct _GstRtpSessionClass {
   void     (*on_bye_ssrc)       (GstRtpSession *sess, guint32 ssrc);
   void     (*on_bye_timeout)    (GstRtpSession *sess, guint32 ssrc);
   void     (*on_timeout)        (GstRtpSession *sess, guint32 ssrc);
+  void     (*on_sender_timeout) (GstRtpSession *sess, guint32 ssrc);
 };
 
 GType gst_rtp_session_get_type (void);
index 70a49c1..050adef 100644 (file)
@@ -104,6 +104,7 @@ rtp_jitter_buffer_reset_skew (RTPJitterBuffer * jbuf)
 {
   jbuf->base_time = -1;
   jbuf->base_rtptime = -1;
+  jbuf->base_extrtp = -1;
   jbuf->ext_rtptime = -1;
   jbuf->window_pos = 0;
   jbuf->window_filling = TRUE;
@@ -185,21 +186,23 @@ calculate_skew (RTPJitterBuffer * jbuf, guint32 rtptime, GstClockTime time,
 
   gstrtptime = gst_util_uint64_scale_int (ext_rtptime, GST_SECOND, clock_rate);
 
-again:
   /* first time, lock on to time and gstrtptime */
-  if (jbuf->base_time == -1)
+  if (G_UNLIKELY (jbuf->base_time == -1))
     jbuf->base_time = time;
-  if (jbuf->base_rtptime == -1)
+  if (G_UNLIKELY (jbuf->base_rtptime == -1)) {
     jbuf->base_rtptime = gstrtptime;
+    jbuf->base_extrtp = ext_rtptime;
+  }
 
-  if (gstrtptime >= jbuf->base_rtptime)
+  if (G_LIKELY (gstrtptime >= jbuf->base_rtptime))
     send_diff = gstrtptime - jbuf->base_rtptime;
   else {
     /* elapsed time at sender, timestamps can go backwards and thus be smaller
      * than our base time, take a new base time in that case. */
     GST_DEBUG ("backward timestamps at server, taking new base time");
-    jbuf->base_rtptime = gstrtptime;
     jbuf->base_time = time;
+    jbuf->base_rtptime = gstrtptime;
+    jbuf->base_extrtp = ext_rtptime;
     send_diff = 0;
   }
 
@@ -208,27 +211,6 @@ again:
       GST_TIME_ARGS (gstrtptime), GST_TIME_ARGS (jbuf->base_rtptime),
       GST_TIME_ARGS (send_diff));
 
-  if (jbuf->prev_send_diff != -1 && time != -1) {
-    gint64 delta_diff;
-
-    if (send_diff > jbuf->prev_send_diff)
-      delta_diff = send_diff - jbuf->prev_send_diff;
-    else
-      delta_diff = jbuf->prev_send_diff - send_diff;
-
-    /* server changed rtp timestamps too quickly, reset skew detection and start
-     * again. This value is sortof arbitrary and can be a bad measurement up if
-     * there are many packets missing because then we get a big gap that is
-     * unrelated to a timestamp switch. */
-    if (delta_diff > GST_SECOND) {
-      GST_DEBUG ("delta changed too quickly %" GST_TIME_FORMAT " reset skew",
-          GST_TIME_ARGS (delta_diff));
-      rtp_jitter_buffer_reset_skew (jbuf);
-      goto again;
-    }
-  }
-  jbuf->prev_send_diff = send_diff;
-
   /* we don't have an arrival timestamp so we can't do skew detection. we
    * should still apply a timestamp based on RTP timestamp and base_time */
   if (time == -1)
@@ -244,17 +226,30 @@ again:
   /* measure the diff */
   delta = ((gint64) recv_diff) - ((gint64) send_diff);
 
+  /* if the difference between the sender timeline and the receiver timeline
+   * changed too quickly we have to resync because the server likely restarted
+   * its timestamps. */
+  if (ABS (delta - jbuf->skew) > GST_SECOND) {
+    GST_DEBUG ("delta %" GST_TIME_FORMAT " too big, reset skew",
+        delta - jbuf->skew);
+    jbuf->base_time = time;
+    jbuf->base_rtptime = gstrtptime;
+    jbuf->base_extrtp = ext_rtptime;
+    send_diff = 0;
+    delta = 0;
+  }
+
   pos = jbuf->window_pos;
 
-  if (jbuf->window_filling) {
+  if (G_UNLIKELY (jbuf->window_filling)) {
     /* we are filling the window */
     GST_DEBUG ("filling %d, delta %" G_GINT64_FORMAT, pos, delta);
     jbuf->window[pos++] = delta;
     /* calc the min delta we observed */
-    if (pos == 1 || delta < jbuf->window_min)
+    if (G_UNLIKELY (pos == 1 || delta < jbuf->window_min))
       jbuf->window_min = delta;
 
-    if (send_diff >= MAX_TIME || pos >= MAX_WINDOW) {
+    if (G_UNLIKELY (send_diff >= MAX_TIME || pos >= MAX_WINDOW)) {
       jbuf->window_size = pos;
 
       /* window filled */
@@ -288,11 +283,11 @@ again:
     old = jbuf->window[pos];
     jbuf->window[pos++] = delta;
 
-    if (delta <= jbuf->window_min) {
+    if (G_UNLIKELY (delta <= jbuf->window_min)) {
       /* if the new value we inserted is smaller or equal to the current min,
        * it becomes the new min */
       jbuf->window_min = delta;
-    } else if (old == jbuf->window_min) {
+    } else if (G_UNLIKELY (old == jbuf->window_min)) {
       gint64 min = G_MAXINT64;
 
       /* if we removed the old min, we have to find a new min */
@@ -313,7 +308,7 @@ again:
         delta, jbuf->window_min);
   }
   /* wrap around in the window */
-  if (pos >= jbuf->window_size)
+  if (G_UNLIKELY (pos >= jbuf->window_size))
     pos = 0;
   jbuf->window_pos = pos;
 
@@ -382,14 +377,14 @@ rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf,
   time = calculate_skew (jbuf, rtptime, time, clock_rate);
   GST_BUFFER_TIMESTAMP (buf) = time;
 
-  if (list)
+  if (G_LIKELY (list))
     g_queue_insert_before (jbuf->packets, list, buf);
   else
     g_queue_push_tail (jbuf->packets, buf);
 
   /* tail was changed when we did not find a previous packet, we set the return
    * flag when requested. */
-  if (tail)
+  if (G_UNLIKELY (tail))
     *tail = (list == NULL);
 
   return TRUE;
@@ -514,3 +509,22 @@ rtp_jitter_buffer_get_ts_diff (RTPJitterBuffer * jbuf)
   }
   return result;
 }
+
+/**
+ * rtp_jitter_buffer_get_sync:
+ * @jbuf: an #RTPJitterBuffer
+ * @rtptime: result RTP time
+ * @timestamp: result GStreamer timestamp
+ *
+ * Returns the relation between the RTP timestamp and the GStreamer timestamp
+ * used for constructing timestamps.
+ */
+void
+rtp_jitter_buffer_get_sync (RTPJitterBuffer * jbuf, guint64 * rtptime,
+    guint64 * timestamp)
+{
+  if (rtptime)
+    *rtptime = jbuf->base_extrtp;
+  if (timestamp)
+    *timestamp = jbuf->base_time + jbuf->skew;
+}
index ffd73ff..62f3f47 100644 (file)
@@ -22,7 +22,6 @@
 
 #include <gst/gst.h>
 #include <gst/rtp/gstrtcpbuffer.h>
-#include <gst/netbuffer/gstnetbuffer.h>
 
 typedef struct _RTPJitterBuffer RTPJitterBuffer;
 typedef struct _RTPJitterBufferClass RTPJitterBufferClass;
@@ -57,6 +56,7 @@ struct _RTPJitterBuffer {
   /* for calculating skew */
   GstClockTime   base_time;
   GstClockTime   base_rtptime;
+  GstClockTime   base_extrtp;
   guint64        ext_rtptime;
   gint64         window[RTP_JITTER_BUFFER_MAX_WINDOW];
   guint          window_pos;
@@ -90,4 +90,8 @@ void                  rtp_jitter_buffer_flush            (RTPJitterBuffer *jbuf)
 guint                 rtp_jitter_buffer_num_packets      (RTPJitterBuffer *jbuf);
 guint32               rtp_jitter_buffer_get_ts_diff      (RTPJitterBuffer *jbuf);
 
+void                  rtp_jitter_buffer_get_sync         (RTPJitterBuffer *jbuf, guint64 *rtptime,
+                                                          guint64 *timestamp);
+
+
 #endif /* __RTP_JITTER_BUFFER_H__ */
index 947fef7..428181f 100644 (file)
@@ -40,6 +40,7 @@ enum
   SIGNAL_ON_BYE_SSRC,
   SIGNAL_ON_BYE_TIMEOUT,
   SIGNAL_ON_TIMEOUT,
+  SIGNAL_ON_SENDER_TIMEOUT,
   LAST_SIGNAL
 };
 
@@ -212,6 +213,18 @@ rtp_session_class_init (RTPSessionClass * klass)
       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
       RTP_TYPE_SOURCE);
+  /**
+   * RTPSession::on-sender-timeout:
+   * @session: the object which received the signal
+   * @src: the RTPSource that timed out
+   *
+   * Notify of an SSRC that was a sender but timed out and became a receiver.
+   */
+  rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
+      g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sender_timeout),
+      NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
+      RTP_TYPE_SOURCE);
 
   g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE,
       g_param_spec_object ("internal-source", "Internal Source",
@@ -513,6 +526,15 @@ on_timeout (RTPSession * sess, RTPSource * source)
   RTP_SESSION_LOCK (sess);
 }
 
+static void
+on_sender_timeout (RTPSession * sess, RTPSource * source)
+{
+  RTP_SESSION_UNLOCK (sess);
+  g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
+      source);
+  RTP_SESSION_LOCK (sess);
+}
+
 /**
  * rtp_session_new:
  *
@@ -908,9 +930,8 @@ check_collision (RTPSession * sess, RTPSource * source,
     RTPArrivalStats * arrival, gboolean rtp)
 {
   /* If we have not arrival address, we can't do collision checking */
-  if (!arrival->have_address) {
+  if (!arrival->have_address)
     return FALSE;
-  }
 
   if (sess->source != source) {
     /* This is not our local source, but lets check if two remote
@@ -1479,12 +1500,6 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
   if (!source)
     return;
 
-  /* we somehow need to transfer the clock_base and the base time to the next
-   * element, we use the offset and offset_end fields in the buffer for this
-   * hack */
-  GST_BUFFER_OFFSET (packet->buffer) = source->clock_base;
-  GST_BUFFER_OFFSET_END (packet->buffer) = source->clock_base_time;
-
   prevsender = RTP_SOURCE_IS_SENDER (source);
 
   /* first update the source */
@@ -2096,6 +2111,7 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
 {
   gboolean remove = FALSE;
   gboolean byetimeout = FALSE;
+  gboolean sendertimeout = FALSE;
   gboolean is_sender, is_active;
   RTPSession *sess = data->sess;
   GstClockTime interval;
@@ -2138,6 +2154,7 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
             GST_TIME_ARGS (source->last_rtp_activity));
         source->is_sender = FALSE;
         sess->stats.sender_sources--;
+        sendertimeout = TRUE;
       }
     }
   }
@@ -2153,6 +2170,9 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
       on_bye_timeout (sess, source);
     else
       on_timeout (sess, source);
+  } else {
+    if (sendertimeout)
+      on_sender_timeout (sess, source);
   }
   return remove;
 }
index a32f911..dd3fbc1 100644 (file)
@@ -228,6 +228,7 @@ struct _RTPSessionClass {
   void (*on_bye_ssrc)       (RTPSession *sess, RTPSource *source);
   void (*on_bye_timeout)    (RTPSession *sess, RTPSource *source);
   void (*on_timeout)        (RTPSession *sess, RTPSource *source);
+  void (*on_sender_timeout) (RTPSession *sess, RTPSource *source);
 };
 
 GType rtp_session_get_type (void);
index ddbf733..8d9d6ec 100644 (file)
@@ -170,8 +170,6 @@ rtp_source_init (RTPSource * src)
 
   src->payload = 0;
   src->clock_rate = -1;
-  src->clock_base = -1;
-  src->clock_base_time = -1;
   src->packets = g_queue_new ();
   src->seqnum_base = -1;
   src->last_rtptime = -1;
@@ -527,10 +525,6 @@ rtp_source_update_caps (RTPSource * src, GstCaps * caps)
   gst_structure_get_int (s, "clock-rate", &src->clock_rate);
   GST_DEBUG ("got clock-rate %d", src->clock_rate);
 
-  if (gst_structure_get_uint (s, "clock-base", &val))
-    src->clock_base = val;
-  GST_DEBUG ("got clock-base %" G_GINT64_FORMAT, src->clock_base);
-
   if (gst_structure_get_uint (s, "seqnum-base", &val))
     src->seqnum_base = val;
   GST_DEBUG ("got seqnum-base %" G_GINT32_FORMAT, src->seqnum_base);
@@ -771,13 +765,6 @@ calculate_jitter (RTPSource * src, GstBuffer * buffer,
 
   rtptime = gst_rtp_buffer_get_timestamp (buffer);
 
-  /* no clock-base, take first rtptime as base */
-  if (src->clock_base == -1) {
-    GST_DEBUG ("using clock-base of %" G_GUINT32_FORMAT, rtptime);
-    src->clock_base = rtptime;
-    src->clock_base_time = arrival->timestamp;
-  }
-
   /* convert arrival time to RTP timestamp units, truncate to 32 bits, we don't
    * care about the absolute value, just the difference. */
   rtparrival = gst_util_uint64_scale_int (ntpnstime, clock_rate, GST_SECOND);
@@ -923,13 +910,11 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer,
     } else {
       /* unacceptable jump */
       stats->bad_seq = (seqnr + 1) & (RTP_SEQ_MOD - 1);
-      src->clock_base = -1;
       goto bad_sequence;
     }
   } else {
     /* duplicate or reordered packet, will be filtered by jitterbuffer. */
     GST_WARNING ("duplicate or reordered packet");
-    src->clock_base = -1;
   }
 
   src->stats.octets_received += arrival->payload_len;
index a2ba2d6..c4c23a8 100644 (file)
@@ -32,8 +32,6 @@
 #define RTP_DEFAULT_PROBATION   2
 
 #define RTP_SEQ_MOD          (1 << 16)
-#define RTP_MAX_DROPOUT      3000
-#define RTP_MAX_MISORDER     100
 
 typedef struct _RTPSource RTPSource;
 typedef struct _RTPSourceClass RTPSourceClass;
@@ -133,8 +131,6 @@ struct _RTPSource {
   GstCaps      *caps;
   gint          clock_rate;
   gint32        seqnum_base;
-  gint64        clock_base;
-  guint64       clock_base_time;
 
   GstClockTime  bye_time;
   GstClockTime  last_activity;
index f82c958..3408300 100644 (file)
@@ -150,11 +150,22 @@ typedef struct {
 #define RTP_STATS_RECEIVER_FRACTION     (1.0 - RTP_STATS_SENDER_FRACTION)
 
 /*
- * When receiving a BYE from a source, remove the source fomr the database
+ * When receiving a BYE from a source, remove the source from the database
  * after this timeout.
  */
 #define RTP_STATS_BYE_TIMEOUT           (2 * GST_SECOND)
 
+/*
+ * The maximum number of missing packets we tollerate. These are packets with a
+ * sequence number bigger than the last seen packet.
+ */
+#define RTP_MAX_DROPOUT      3000
+/*
+ * The maximum number of misordered packets we tollerate. These are packets with
+ * a sequence number smaller than the last seen packet.
+ */
+#define RTP_MAX_MISORDER     100
+
 /**
  * RTPSessionStats:
  *