gst/rtpmanager/gstrtpbin.c: Calculate and configure the NTP base time so that we...
authorWim Taymans <wim.taymans@gmail.com>
Wed, 12 Sep 2007 18:04:32 +0000 (18:04 +0000)
committerTim-Philipp Müller <tim.muller@collabora.co.uk>
Tue, 11 Aug 2009 01:30:29 +0000 (02:30 +0100)
Original commit message from CVS:
* gst/rtpmanager/gstrtpbin.c: (calc_ntp_ns_base),
(gst_rtp_bin_change_state), (new_payload_found), (create_send_rtp):
Calculate and configure the NTP base time so that we can generate better
NTP times in SR packets.
Set caps on new ghostpad.
* gst/rtpmanager/gstrtpjitterbuffer.c:
(gst_rtp_jitter_buffer_loop):
Clean debug statement.
* gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_class_init),
(gst_rtp_session_init), (gst_rtp_session_set_property),
(gst_rtp_session_get_property), (get_current_ntp_ns_time),
(rtcp_thread), (gst_rtp_session_event_recv_rtp_sink),
(gst_rtp_session_internal_links), (gst_rtp_session_chain_recv_rtp),
(gst_rtp_session_event_send_rtp_sink),
(gst_rtp_session_chain_send_rtp), (create_recv_rtp_sink),
(create_send_rtp_sink):
* gst/rtpmanager/gstrtpsession.h:
Add ntp-ns-base property to convert running_time to NTP time.
Handle NEWSEGMENT events on send and recv RTP pads so that we can
calculate the running time and thus NTP time of the packets.
Simplify getting the current NTP time using the pipeline clock.
Implement internal links functions.
Use the buffer timestamp to calculate the NTP time instead of the clock.
* gst/rtpmanager/gstrtpssrcdemux.c: (create_demux_pad_for_ssrc),
(gst_rtp_ssrc_demux_init), (gst_rtp_ssrc_demux_sink_event),
(gst_rtp_ssrc_demux_chain), (gst_rtp_ssrc_demux_rtcp_chain),
(gst_rtp_ssrc_demux_internal_links),
(gst_rtp_ssrc_demux_src_query):
* gst/rtpmanager/gstrtpssrcdemux.h:
Implement internal links function.
Calculate the diff between different streams, this might be used later
to get the inter stream latency.
* gst/rtpmanager/rtpsession.c: (rtp_session_send_rtp):
Simple cleanup.
* gst/rtpmanager/rtpsource.c: (rtp_source_init),
(calculate_jitter), (rtp_source_send_rtp), (rtp_source_get_new_sr):
Make the clock skew window a little bigger.
Apply the clock skew to all buffers, not just one with a new timestamp.
Calculate and debug sender clock drift.
Use extended last timestamp to interpollate for SR reports.

gst/rtpmanager/gstrtpbin.c
gst/rtpmanager/gstrtpjitterbuffer.c
gst/rtpmanager/gstrtpsession.c
gst/rtpmanager/gstrtpsession.h
gst/rtpmanager/gstrtpssrcdemux.c
gst/rtpmanager/gstrtpssrcdemux.h
gst/rtpmanager/rtpsession.c
gst/rtpmanager/rtpsource.c

index a4ba67c..7d5fb5d 100644 (file)
@@ -208,6 +208,8 @@ GST_STATIC_PAD_TEMPLATE ("sink_%d",
 struct _GstRtpBinPrivate
 {
   GMutex *bin_lock;
+
+  GstClockTime ntp_ns_base;
 };
 
 /* signals and args */
@@ -1142,6 +1144,30 @@ gst_rtp_bin_provide_clock (GstElement * element)
   return GST_CLOCK_CAST (gst_object_ref (rtpbin->provided_clock));
 }
 
+static void
+calc_ntp_ns_base (GstRtpBin * bin)
+{
+  GstClockTime now;
+  GTimeVal current;
+  GSList *walk;
+
+  /* get the current time and convert it to NTP time in nanoseconds */
+  g_get_current_time (&current);
+  now = GST_TIMEVAL_TO_TIME (current);
+  now += (2208988800LL * GST_SECOND);
+
+  GST_RTP_BIN_LOCK (bin);
+  bin->priv->ntp_ns_base = now;
+  for (walk = bin->sessions; walk; walk = g_slist_next (walk)) {
+    GstRtpBinSession *session = (GstRtpBinSession *) walk->data;
+
+    g_object_set (session->session, "ntp-ns-base", now, NULL);
+  }
+  GST_RTP_BIN_UNLOCK (bin);
+
+  return;
+}
+
 static GstStateChangeReturn
 gst_rtp_bin_change_state (GstElement * element, GstStateChange transition)
 {
@@ -1156,6 +1182,7 @@ gst_rtp_bin_change_state (GstElement * element, GstStateChange transition)
     case GST_STATE_CHANGE_READY_TO_PAUSED:
       break;
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+      calc_ntp_ns_base (rtpbin);
       break;
     default:
       break;
@@ -1199,6 +1226,7 @@ new_payload_found (GstElement * element, guint pt, GstPad * pad,
   gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
   g_free (padname);
 
+  gst_pad_set_caps (gpad, GST_PAD_CAPS (pad));
   gst_pad_set_active (gpad, TRUE);
   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
 }
@@ -1553,9 +1581,6 @@ create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
   if (session->send_rtp_sink == NULL)
     goto pad_failed;
 
-  g_signal_connect (session->send_rtp_sink, "notify::caps",
-      (GCallback) caps_changed, session);
-
   result =
       gst_ghost_pad_new_from_template (name, session->send_rtp_sink, templ);
   gst_pad_set_active (result, TRUE);
index a23fbb8..08a55f2 100644 (file)
@@ -977,7 +977,7 @@ again:
 
   GST_DEBUG_OBJECT (jitterbuffer,
       "Popped buffer #%d, rtptime %u, exttime %" G_GUINT64_FORMAT
-      ",now %d left", seqnum, rtp_time, exttimestamp,
+      ", now %d left", seqnum, rtp_time, exttimestamp,
       rtp_jitter_buffer_num_packets (priv->jbuf));
 
   /* If we don't know what the next seqnum should be (== -1) we have to wait
index 87948a4..7083365 100644 (file)
@@ -214,6 +214,8 @@ enum
   LAST_SIGNAL
 };
 
+#define DEFAULT_NTP_NS_BASE 0
+
 enum
 {
   PROP_0,
@@ -462,6 +464,11 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_timeout),
       NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
 
+  g_object_class_install_property (gobject_class, PROP_NTP_NS_BASE,
+      g_param_spec_uint64 ("ntp-ns-base", "NTP base time",
+          "The NTP base time corresponding to running_time 0", 0,
+          G_MAXUINT64, DEFAULT_NTP_NS_BASE, G_PARAM_READWRITE));
+
   gstelement_class->change_state =
       GST_DEBUG_FUNCPTR (gst_rtp_session_change_state);
   gstelement_class->request_new_pad =
@@ -497,6 +504,9 @@ gst_rtp_session_init (GstRtpSession * rtpsession, GstRtpSessionClass * klass)
   g_signal_connect (rtpsession->priv->session, "on-timeout",
       (GCallback) on_timeout, rtpsession);
   rtpsession->priv->ptmap = g_hash_table_new (NULL, NULL);
+
+  gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
+  gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
 }
 
 static void
@@ -521,7 +531,11 @@ gst_rtp_session_set_property (GObject * object, guint prop_id,
 
   switch (prop_id) {
     case PROP_NTP_NS_BASE:
+      GST_OBJECT_LOCK (rtpsession);
       rtpsession->priv->ntpnsbase = g_value_get_uint64 (value);
+      GST_DEBUG_OBJECT (rtpsession, "setting NTP base to %" GST_TIME_FORMAT,
+          GST_TIME_ARGS (rtpsession->priv->ntpnsbase));
+      GST_OBJECT_UNLOCK (rtpsession);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -539,7 +553,9 @@ gst_rtp_session_get_property (GObject * object, guint prop_id,
 
   switch (prop_id) {
     case PROP_NTP_NS_BASE:
+      GST_OBJECT_LOCK (rtpsession);
       g_value_set_uint64 (value, rtpsession->priv->ntpnsbase);
+      GST_OBJECT_UNLOCK (rtpsession);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -548,19 +564,31 @@ gst_rtp_session_get_property (GObject * object, guint prop_id,
 }
 
 static guint64
-get_current_ntp_ns_time (GstRtpSession * rtpsession, GstClock * clock)
+get_current_ntp_ns_time (GstRtpSession * rtpsession)
 {
   guint64 ntpnstime;
+  GstClock *clock;
+  GstClockTime base_time, ntpnsbase;
+
+  GST_OBJECT_LOCK (rtpsession);
+  if ((clock = GST_ELEMENT_CLOCK (rtpsession))) {
+    base_time = GST_ELEMENT_CAST (rtpsession)->base_time;
+    ntpnsbase = rtpsession->priv->ntpnsbase;
+    gst_object_ref (clock);
+    GST_OBJECT_UNLOCK (rtpsession);
 
-  if (clock) {
     /* get current NTP time */
     ntpnstime = gst_clock_get_time (clock);
     /* convert to running time */
-    ntpnstime -= gst_element_get_base_time (GST_ELEMENT_CAST (rtpsession));
+    ntpnstime -= base_time;
     /* add NTP base offset */
-    ntpnstime += rtpsession->priv->ntpnsbase;
-  } else
+    ntpnstime += ntpnsbase;
+
+    gst_object_unref (clock);
+  } else {
+    GST_OBJECT_UNLOCK (rtpsession);
     ntpnstime = -1;
+  }
 
   return ntpnstime;
 }
@@ -568,7 +596,7 @@ get_current_ntp_ns_time (GstRtpSession * rtpsession, GstClock * clock)
 static void
 rtcp_thread (GstRtpSession * rtpsession)
 {
-  GstClock *sysclock, *clock;
+  GstClock *sysclock;
   GstClockID id;
   GstClockTime current_time;
   GstClockTime next_timeout;
@@ -579,9 +607,6 @@ rtcp_thread (GstRtpSession * rtpsession)
   if (sysclock == NULL)
     goto no_sysclock;
 
-  /* to get the current NTP time, we use the pipeline clock */
-  clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession));
-
   current_time = gst_clock_get_time (sysclock);
 
   GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread");
@@ -619,7 +644,7 @@ rtcp_thread (GstRtpSession * rtpsession)
     current_time = gst_clock_get_time (sysclock);
 
     /* get current NTP time */
-    ntpnstime = get_current_ntp_ns_time (rtpsession, clock);
+    ntpnstime = get_current_ntp_ns_time (rtpsession);
 
     /* we get unlocked because we need to perform reconsideration, don't perform
      * the timeout but get a new reporting estimate. */
@@ -969,6 +994,41 @@ gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event)
       GST_EVENT_TYPE_NAME (event));
 
   switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_FLUSH_STOP:
+      gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
+      break;
+    case GST_EVENT_NEWSEGMENT:
+    {
+      gboolean update;
+      gdouble rate, arate;
+      GstFormat format;
+      gint64 start, stop, time;
+      GstSegment *segment;
+
+      segment = &rtpsession->recv_rtp_seg;
+
+      /* the newsegment event is needed to convert the RTP timestamp to
+       * running_time, which is needed to generate a mapping from RTP to NTP
+       * timestamps in SR reports */
+      gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
+          &start, &stop, &time);
+
+      GST_DEBUG_OBJECT (rtpsession,
+          "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
+          "format GST_FORMAT_TIME, "
+          "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
+          ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
+          update, rate, arate, GST_TIME_ARGS (segment->start),
+          GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
+          GST_TIME_ARGS (segment->accum));
+
+      gst_segment_set_newsegment_full (segment, update, rate,
+          arate, format, start, stop, time);
+
+      /* push event forward */
+      ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
+      break;
+    }
     default:
       ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
       break;
@@ -976,6 +1036,31 @@ gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event)
   gst_object_unref (rtpsession);
 
   return ret;
+
+}
+static GList *
+gst_rtp_session_internal_links (GstPad * pad)
+{
+  GstRtpSession *rtpsession;
+  GstRtpSessionPrivate *priv;
+  GList *res = NULL;
+
+  rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+  priv = rtpsession->priv;
+
+  if (pad == rtpsession->recv_rtp_src) {
+    res = g_list_prepend (res, rtpsession->recv_rtp_sink);
+  } else if (pad == rtpsession->recv_rtp_sink) {
+    res = g_list_prepend (res, rtpsession->recv_rtp_src);
+  } else if (pad == rtpsession->send_rtp_src) {
+    res = g_list_prepend (res, rtpsession->send_rtp_sink);
+  } else if (pad == rtpsession->send_rtp_sink) {
+    res = g_list_prepend (res, rtpsession->send_rtp_src);
+  }
+
+  gst_object_unref (rtpsession);
+
+  return res;
 }
 
 static gboolean
@@ -1006,14 +1091,25 @@ gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer)
   GstRtpSessionPrivate *priv;
   GstFlowReturn ret;
   guint64 ntpnstime;
+  GstClockTime timestamp;
 
   rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
   priv = rtpsession->priv;
 
   GST_DEBUG_OBJECT (rtpsession, "received RTP packet");
 
-  ntpnstime =
-      get_current_ntp_ns_time (rtpsession, GST_ELEMENT_CLOCK (rtpsession));
+  /* get NTP time when this packet was captured, this depends on the timestamp. */
+  timestamp = GST_BUFFER_TIMESTAMP (buffer);
+  if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
+    /* convert to running time using the segment values */
+    ntpnstime =
+        gst_segment_to_running_time (&rtpsession->recv_rtp_seg, GST_FORMAT_TIME,
+        timestamp);
+    /* add constant to convert running time to NTP time */
+    ntpnstime += priv->ntpnsbase;
+  } else {
+    ntpnstime = get_current_ntp_ns_time (rtpsession);
+  }
 
   ret = rtp_session_process_rtp (priv->session, buffer, ntpnstime);
 
@@ -1084,6 +1180,9 @@ gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstEvent * event)
   GST_DEBUG_OBJECT (rtpsession, "received event");
 
   switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_FLUSH_STOP:
+      gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
+      break;
     case GST_EVENT_NEWSEGMENT:
     {
       gboolean update;
@@ -1146,7 +1245,10 @@ gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer)
   timestamp = GST_BUFFER_TIMESTAMP (buffer);
   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
     /* convert to running time using the segment start value. */
-    ntpnstime = timestamp - rtpsession->send_rtp_seg.start;
+    ntpnstime =
+        gst_segment_to_running_time (&rtpsession->send_rtp_seg, GST_FORMAT_TIME,
+        timestamp);
+    /* convert to NTP time by adding the NTP base */
     ntpnstime += priv->ntpnsbase;
   } else
     ntpnstime = -1;
@@ -1175,6 +1277,8 @@ create_recv_rtp_sink (GstRtpSession * rtpsession)
       gst_rtp_session_event_recv_rtp_sink);
   gst_pad_set_setcaps_function (rtpsession->recv_rtp_sink,
       gst_rtp_session_sink_setcaps);
+  gst_pad_set_internal_link_function (rtpsession->recv_rtp_sink,
+      gst_rtp_session_internal_links);
   gst_pad_set_active (rtpsession->recv_rtp_sink, TRUE);
   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
       rtpsession->recv_rtp_sink);
@@ -1183,6 +1287,8 @@ create_recv_rtp_sink (GstRtpSession * rtpsession)
   rtpsession->recv_rtp_src =
       gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template,
       "recv_rtp_src");
+  gst_pad_set_internal_link_function (rtpsession->recv_rtp_src,
+      gst_rtp_session_internal_links);
   gst_pad_use_fixed_caps (rtpsession->recv_rtp_src);
   gst_pad_set_active (rtpsession->recv_rtp_src, TRUE);
   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_src);
@@ -1235,8 +1341,8 @@ create_send_rtp_sink (GstRtpSession * rtpsession)
       gst_rtp_session_chain_send_rtp);
   gst_pad_set_event_function (rtpsession->send_rtp_sink,
       gst_rtp_session_event_send_rtp_sink);
-  gst_pad_set_setcaps_function (rtpsession->send_rtp_sink,
-      gst_rtp_session_sink_setcaps);
+  gst_pad_set_internal_link_function (rtpsession->send_rtp_sink,
+      gst_rtp_session_internal_links);
   gst_pad_set_active (rtpsession->send_rtp_sink, TRUE);
   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
       rtpsession->send_rtp_sink);
@@ -1245,6 +1351,8 @@ create_send_rtp_sink (GstRtpSession * rtpsession)
       gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template,
       "send_rtp_src");
   gst_pad_use_fixed_caps (rtpsession->send_rtp_src);
+  gst_pad_set_internal_link_function (rtpsession->send_rtp_src,
+      gst_rtp_session_internal_links);
   gst_pad_set_active (rtpsession->send_rtp_src, TRUE);
   gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src);
 
index 09565ac..3fffb06 100644 (file)
@@ -43,6 +43,7 @@ struct _GstRtpSession {
 
   /*< private >*/
   GstPad        *recv_rtp_sink;
+  GstSegment     recv_rtp_seg;
   GstPad        *recv_rtcp_sink;
   GstPad        *send_rtp_sink;
   GstSegment     send_rtp_seg;
index 5457bc3..c728a6d 100644 (file)
@@ -125,6 +125,8 @@ static gboolean gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad,
 
 /* srcpad stuff */
 static gboolean gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event);
+static GList *gst_rtp_ssrc_demux_internal_links (GstPad * pad);
+static gboolean gst_rtp_ssrc_demux_src_query (GstPad * pad, GstQuery * query);
 
 static guint gst_rtp_ssrc_demux_signals[LAST_SIGNAL] = { 0 };
 
@@ -137,6 +139,7 @@ struct _GstRtpSsrcDemuxPad
   GstPad *rtp_pad;
   GstCaps *caps;
   GstPad *rtcp_pad;
+  GstClockTime first_ts;
 };
 
 /* find a src pad for a given SSRC, returns NULL if the SSRC was not found
@@ -156,7 +159,8 @@ find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
 }
 
 static GstRtpSsrcDemuxPad *
-create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
+create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
+    GstClockTime timestamp)
 {
   GstPad *rtp_pad, *rtcp_pad;
   GstElementClass *klass;
@@ -177,13 +181,27 @@ create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
   rtcp_pad = gst_pad_new_from_template (templ, padname);
   g_free (padname);
 
+  /* we use the first timestamp received to calculate the difference between
+   * timestamps on all streams */
+  GST_DEBUG_OBJECT (demux, "SSRC %08x, first timestamp %" GST_TIME_FORMAT,
+      ssrc, GST_TIME_ARGS (timestamp));
+
   /* wrap in structure and add to list */
   demuxpad = g_new0 (GstRtpSsrcDemuxPad, 1);
   demuxpad->ssrc = ssrc;
   demuxpad->rtp_pad = rtp_pad;
   demuxpad->rtcp_pad = rtcp_pad;
+  demuxpad->first_ts = timestamp;
+
+  GST_DEBUG_OBJECT (demux, "first timestamp %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (timestamp));
+
+  gst_pad_set_element_private (rtp_pad, demuxpad);
+  gst_pad_set_element_private (rtcp_pad, demuxpad);
 
   demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad);
+
+  /* unlock to perform the remainder and to fire our signal */
   GST_OBJECT_UNLOCK (demux);
 
   /* copy caps from input */
@@ -193,7 +211,13 @@ create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
   gst_pad_use_fixed_caps (rtcp_pad);
 
   gst_pad_set_event_function (rtp_pad, gst_rtp_ssrc_demux_src_event);
+  gst_pad_set_query_function (rtp_pad, gst_rtp_ssrc_demux_src_query);
+  gst_pad_set_internal_link_function (rtp_pad,
+      gst_rtp_ssrc_demux_internal_links);
   gst_pad_set_active (rtp_pad, TRUE);
+
+  gst_pad_set_internal_link_function (rtcp_pad,
+      gst_rtp_ssrc_demux_internal_links);
   gst_pad_set_active (rtcp_pad, TRUE);
 
   gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad);
@@ -277,6 +301,8 @@ gst_rtp_ssrc_demux_init (GstRtpSsrcDemux * demux,
   gst_pad_set_event_function (demux->rtcp_sink,
       gst_rtp_ssrc_demux_rtcp_sink_event);
   gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink);
+
+  gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
 }
 
 static void
@@ -298,6 +324,9 @@ gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event)
   demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
 
   switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_FLUSH_STOP:
+      gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
+      break;
     case GST_EVENT_NEWSEGMENT:
     default:
     {
@@ -370,7 +399,9 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf)
   GST_OBJECT_LOCK (demux);
   dpad = find_demux_pad_for_ssrc (demux, ssrc);
   if (dpad == NULL) {
-    if (!(dpad = create_demux_pad_for_ssrc (demux, ssrc)))
+    if (!(dpad =
+            create_demux_pad_for_ssrc (demux, ssrc,
+                GST_BUFFER_TIMESTAMP (buf))))
       goto create_failed;
   }
   GST_OBJECT_UNLOCK (demux);
@@ -419,6 +450,7 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstBuffer * buf)
   /* first packet must be SR or RR or else the validate would have failed */
   switch (gst_rtcp_packet_get_type (&packet)) {
     case GST_RTCP_TYPE_SR:
+      /* get the ssrc so that we can route it to the right source pad */
       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, NULL, NULL,
           NULL);
       break;
@@ -435,7 +467,7 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstBuffer * buf)
   dpad = find_demux_pad_for_ssrc (demux, ssrc);
   if (dpad == NULL) {
     GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc);
-    if (!(dpad = create_demux_pad_for_ssrc (demux, ssrc)))
+    if (!(dpad = create_demux_pad_for_ssrc (demux, ssrc, -1)))
       goto create_failed;
   }
   GST_OBJECT_UNLOCK (demux);
@@ -482,6 +514,84 @@ gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event)
   return res;
 }
 
+static GList *
+gst_rtp_ssrc_demux_internal_links (GstPad * pad)
+{
+  GstRtpSsrcDemux *demux;
+  GList *res = NULL;
+  GSList *walk;
+
+  demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
+
+  GST_OBJECT_LOCK (demux);
+  for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
+    GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
+
+    if (pad == demux->rtp_sink) {
+      res = g_list_prepend (res, dpad->rtp_pad);
+    } else if (pad == demux->rtcp_sink) {
+      res = g_list_prepend (res, dpad->rtcp_pad);
+    } else if (pad == dpad->rtp_pad) {
+      res = g_list_prepend (res, demux->rtp_sink);
+      break;
+    } else if (pad == dpad->rtcp_pad) {
+      res = g_list_prepend (res, demux->rtcp_sink);
+      break;
+    }
+  }
+  GST_OBJECT_UNLOCK (demux);
+
+  gst_object_unref (demux);
+  return res;
+}
+
+static gboolean
+gst_rtp_ssrc_demux_src_query (GstPad * pad, GstQuery * query)
+{
+  GstRtpSsrcDemux *demux;
+  gboolean res = FALSE;
+
+  demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
+
+  switch (GST_QUERY_TYPE (query)) {
+    case GST_QUERY_LATENCY:
+    {
+
+      if ((res = gst_pad_peer_query (demux->rtp_sink, query))) {
+        gboolean live;
+        GstClockTime min_latency, max_latency;
+        GstRtpSsrcDemuxPad *demuxpad;
+
+        demuxpad = gst_pad_get_element_private (pad);
+
+        gst_query_parse_latency (query, &live, &min_latency, &max_latency);
+
+        GST_DEBUG_OBJECT (demux, "peer min latency %" GST_TIME_FORMAT,
+            GST_TIME_ARGS (min_latency));
+
+        GST_DEBUG_OBJECT (demux,
+            "latency for SSRC %08x, latency %" GST_TIME_FORMAT, demuxpad->ssrc,
+            GST_TIME_ARGS (demuxpad->first_ts));
+
+#if 0
+        min_latency += demuxpad->first_ts;
+        if (max_latency != -1)
+          max_latency += demuxpad->first_ts;
+#endif
+
+        gst_query_set_latency (query, live, min_latency, max_latency);
+      }
+      break;
+    }
+    default:
+      res = gst_pad_query_default (pad, query);
+      break;
+  }
+  gst_object_unref (demux);
+
+  return res;
+}
+
 static GstStateChangeReturn
 gst_rtp_ssrc_demux_change_state (GstElement * element,
     GstStateChange transition)
index bea2769..88aeed8 100644 (file)
@@ -36,6 +36,8 @@ struct _GstRtpSsrcDemux
 {
   GstElement parent;
 
+  GstSegment   segment;
+
   GstPad *rtp_sink;
   GstPad *rtcp_sink;
   GSList *srcpads;
index e7f72b4..724fa24 100644 (file)
@@ -1423,7 +1423,7 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer, guint64 ntptime)
   prevsender = RTP_SOURCE_IS_SENDER (source);
 
   /* we use our own source to send */
-  result = rtp_source_send_rtp (sess->source, buffer, ntptime);
+  result = rtp_source_send_rtp (source, buffer, ntptime);
 
   if (RTP_SOURCE_IS_SENDER (source) && !prevsender)
     sess->stats.sender_sources++;
index 6354335..c415247 100644 (file)
@@ -74,6 +74,7 @@ rtp_source_init (RTPSource * src)
   src->prev_ext_rtptime = -1;
   src->packets = g_queue_new ();
   src->seqnum_base = -1;
+  src->last_rtptime = -1;
 
   src->stats.cycles = -1;
   src->stats.jitter = 0;
@@ -320,22 +321,19 @@ calculate_jitter (RTPSource * src, GstBuffer * buffer,
      * out of sync and we must compensate. */
     skew = ntpdiff - rtpdiff;
     /* average out the skew to get a smooth value. */
-    src->avg_skew = (31 * src->avg_skew + skew) / 32;
+    src->avg_skew = (63 * src->avg_skew + skew) / 64;
 
-    GST_DEBUG ("skew %" G_GINT64_FORMAT ", avg %" G_GINT64_FORMAT, skew,
+    GST_DEBUG ("new skew %" G_GINT64_FORMAT ", avg %" G_GINT64_FORMAT, skew,
         src->avg_skew);
-    if (src->avg_skew != 0) {
-      guint32 timestamp;
-
-      /* patch the buffer RTP timestamp with the skew */
-      GST_DEBUG ("adjusting timestamp %" G_GINT64_FORMAT, src->avg_skew);
-      timestamp = gst_rtp_buffer_get_timestamp (buffer);
-      timestamp += src->avg_skew;
-      gst_rtp_buffer_set_timestamp (buffer, timestamp);
-    }
     /* store previous extended timestamp */
     src->prev_ext_rtptime = ext_rtptime;
   }
+  if (src->avg_skew != 0) {
+    /* patch the buffer RTP timestamp with the skew */
+    GST_DEBUG ("skew timestamp RTP %" G_GUINT32_FORMAT " -> %" G_GINT64_FORMAT,
+        rtptime, rtptime + src->avg_skew);
+    gst_rtp_buffer_set_timestamp (buffer, rtptime + src->avg_skew);
+  }
 
   /* convert arrival time to RTP timestamp units, truncate to 32 bits, we don't
    * care about the absolute value, just the difference. */
@@ -555,6 +553,9 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer, guint64 ntpnstime)
 {
   GstFlowReturn result = GST_FLOW_OK;
   guint len;
+  guint32 rtptime;
+  guint64 ext_rtptime;
+  guint64 ntp_diff, rtp_diff;
 
   g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR);
   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
@@ -570,9 +571,27 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer, guint64 ntpnstime)
   src->stats.packets_sent++;
   src->stats.octets_sent += len;
 
+  rtptime = gst_rtp_buffer_get_timestamp (buffer);
+  ext_rtptime = src->last_rtptime;
+  ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
+
+  GST_DEBUG ("SSRC %08x, RTP %" G_GUINT64_FORMAT ", NTP %" GST_TIME_FORMAT,
+      src->ssrc, ext_rtptime, GST_TIME_ARGS (ntpnstime));
+
+  if (ext_rtptime > src->last_rtptime) {
+    rtp_diff = ext_rtptime - src->last_rtptime;
+    ntp_diff = ntpnstime - src->last_ntpnstime;
+
+    /* calc the diff so we can detect drift at the sender. This can also be used
+     * to guestimate the clock rate if the NTP time is locked to the RTP
+     * timestamps (as is the case when the capture device is providing the clock). */
+    GST_DEBUG ("SSRC %08x, diff RTP %" G_GUINT64_FORMAT ", diff NTP %"
+        GST_TIME_FORMAT, src->ssrc, rtp_diff, GST_TIME_ARGS (ntp_diff));
+  }
+
   /* we keep track of the last received RTP timestamp and the corresponding
    * NTP timestamp so that we can use this info when constructing SR reports */
-  src->last_rtptime = gst_rtp_buffer_get_timestamp (buffer);
+  src->last_rtptime = ext_rtptime;
   src->last_ntpnstime = ntpnstime;
 
   /* push packet */
@@ -587,7 +606,8 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer, guint64 ntpnstime)
        * get the correct SSRC. */
       buffer = gst_buffer_make_writable (buffer);
 
-      GST_DEBUG ("updating SSRC from %08x to %08x", ssrc, src->ssrc);
+      GST_WARNING ("updating SSRC from %08x to %08x, fix the payloader", ssrc,
+          src->ssrc);
       gst_rtp_buffer_set_ssrc (buffer, src->ssrc);
     }
     GST_DEBUG ("pushing RTP packet %" G_GUINT64_FORMAT,
@@ -716,7 +736,7 @@ rtp_source_get_new_sr (RTPSource * src, GstClockTime ntpnstime,
     guint64 * ntptime, guint32 * rtptime, guint32 * packet_count,
     guint32 * octet_count)
 {
-  guint32 t_rtp;
+  guint64 t_rtp;
   guint64 t_current_ntp;
   GstClockTimeDiff diff;
 
@@ -730,7 +750,7 @@ rtp_source_get_new_sr (RTPSource * src, GstClockTime ntpnstime,
   t_rtp = src->last_rtptime;
 
   GST_DEBUG ("last_ntpnstime %" GST_TIME_FORMAT ", last_rtptime %"
-      G_GUINT32_FORMAT, GST_TIME_ARGS (src->last_ntpnstime), t_rtp);
+      G_GUINT64_FORMAT, GST_TIME_ARGS (src->last_ntpnstime), t_rtp);
 
   if (src->clock_rate != -1) {
     /* get the diff with the SR time */
@@ -752,11 +772,12 @@ rtp_source_get_new_sr (RTPSource * src, GstClockTime ntpnstime,
     GST_WARNING ("no clock-rate, cannot interpollate rtp time");
   }
 
+  /* convert the NTP time in nanoseconds to 32.32 fixed point */
   t_current_ntp = gst_util_uint64_scale (ntpnstime, (1LL << 32), GST_SECOND);
 
   GST_DEBUG ("NTP %08x:%08x, RTP %" G_GUINT32_FORMAT,
       (guint32) (t_current_ntp >> 32), (guint32) (t_current_ntp & 0xffffffff),
-      t_rtp);
+      (guint32) t_rtp);
 
   if (ntptime)
     *ntptime = t_current_ntp;