#define DEFAULT_SDES NULL
#define DEFAULT_DO_LOST FALSE
#define DEFAULT_IGNORE_PT FALSE
+#define DEFAULT_NTP_SYNC FALSE
#define DEFAULT_AUTOREMOVE FALSE
#define DEFAULT_BUFFER_MODE RTP_JITTER_BUFFER_MODE_SLAVE
#define DEFAULT_USE_PIPELINE_CLOCK FALSE
PROP_SDES,
PROP_DO_LOST,
PROP_IGNORE_PT,
+ PROP_NTP_SYNC,
PROP_AUTOREMOVE,
PROP_BUFFER_MODE,
PROP_USE_PIPELINE_CLOCK,
gulong demux_ptreq_sig;
gulong demux_ptchange_sig;
- /* if we have calculated a valid unix_delta for this stream */
+ /* if we have calculated a valid rt_delta for this stream */
gboolean have_sync;
/* mapping to local RTP and NTP time */
- gint64 unix_delta;
+ gint64 rt_delta;
};
#define GST_RTP_SESSION_LOCK(sess) g_mutex_lock ((sess)->lock)
/* make use require a new SR packet for this stream before we attempt new
* lip-sync */
stream->have_sync = FALSE;
- stream->unix_delta = 0;
+ stream->rt_delta = 0;
}
}
GST_RTP_BIN_UNLOCK (rtpbin);
g_free (client);
}
+static void
+get_current_times (GstRtpBin * bin, GstClockTime * running_time,
+ guint64 * ntpnstime)
+{
+ guint64 ntpns;
+ GstClock *clock;
+ GstClockTime base_time, rt, clock_time;
+
+ GST_OBJECT_LOCK (bin);
+ if ((clock = GST_ELEMENT_CLOCK (bin))) {
+ base_time = GST_ELEMENT_CAST (bin)->base_time;
+ gst_object_ref (clock);
+ GST_OBJECT_UNLOCK (bin);
+
+ clock_time = gst_clock_get_time (clock);
+
+ if (bin->use_pipeline_clock) {
+ ntpns = clock_time;
+ } else {
+ GTimeVal current;
+
+ /* get current NTP time */
+ g_get_current_time (¤t);
+ ntpns = GST_TIMEVAL_TO_TIME (current);
+ }
+
+ /* add constant to convert from 1970 based time to 1900 based time */
+ ntpns += (2208988800LL * GST_SECOND);
+
+ /* get current clock time and convert to running time */
+ rt = clock_time - base_time;
+
+ gst_object_unref (clock);
+ } else {
+ GST_OBJECT_UNLOCK (bin);
+ rt = -1;
+ ntpns = -1;
+ }
+ if (running_time)
+ *running_time = rt;
+ if (ntpnstime)
+ *ntpnstime = ntpns;
+}
+
+static void
+stream_set_ts_offset (GstRtpBin * bin, GstRtpBinStream * stream,
+ gint64 ts_offset)
+{
+ gint64 prev_ts_offset;
+
+ g_object_get (stream->buffer, "ts-offset", &prev_ts_offset, NULL);
+
+ /* delta changed, see how much */
+ if (prev_ts_offset != ts_offset) {
+ gint64 diff;
+
+ diff = prev_ts_offset - ts_offset;
+
+ GST_DEBUG_OBJECT (bin,
+ "ts-offset %" G_GINT64_FORMAT ", prev %" G_GINT64_FORMAT
+ ", diff: %" G_GINT64_FORMAT, ts_offset, prev_ts_offset, diff);
+
+ /* only change diff when it changed more than 4 milliseconds. This
+ * compensates for rounding errors in NTP to RTP timestamp
+ * conversions */
+ if (ABS (diff) > 4 * GST_MSECOND) {
+ if (ABS (diff) < (3 * GST_SECOND)) {
+ g_object_set (stream->buffer, "ts-offset", ts_offset, NULL);
+ } else {
+ GST_WARNING_OBJECT (bin, "offset unusually large, ignoring");
+ }
+ } else {
+ GST_DEBUG_OBJECT (bin, "offset too small, ignoring");
+ }
+ }
+ GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
+ stream->ssrc, ts_offset);
+}
+
/* associate a stream to the given CNAME. This will make sure all streams for
* that CNAME are synchronized together.
* Must be called with GST_RTP_BIN_LOCK */
static void
gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
- guint8 * data, guint64 last_unix, guint64 last_extrtptime,
+ guint8 * data, guint64 ntptime, guint64 last_extrtptime,
guint64 base_rtptime, guint64 base_time, guint clock_rate)
{
GstRtpBinClient *client;
gboolean created;
GSList *walk;
- guint64 local_unix;
+ guint64 local_rt;
guint64 local_rtp;
+ GstClockTime running_time;
+ guint64 ntpnstime;
+ gint64 ntpdiff, rtdiff;
+ guint64 last_unix;
/* first find or create the CNAME */
client = get_client (bin, len, data, &created);
stream->ssrc, client, client->cname);
}
- /* take the extended rtptime we found in the SR packet and map it to the
+ /* Take the extended rtptime we found in the SR packet and map it to the
* local rtptime. The local rtp time is used to construct timestamps on the
- * buffers. */
+ * buffers so we will calculate what running_time corresponds to the RTP
+ * timestamp in the SR packet. */
local_rtp = last_extrtptime - base_rtptime;
GST_DEBUG_OBJECT (bin,
", local RTP %" G_GUINT64_FORMAT ", clock-rate %d", base_rtptime,
last_extrtptime, local_rtp, clock_rate);
- /* calculate local NTP time in gstreamer timestamp, we essentially perform the
+ /* calculate local RTP time in gstreamer timestamp, we essentially perform the
* same conversion that a jitterbuffer would use to convert an rtp timestamp
- * into a corresponding gstreamer timestamp. */
- local_unix = gst_util_uint64_scale_int (local_rtp, GST_SECOND, clock_rate);
- local_unix += base_time;
-
- /* calculate delta between server and receiver. last_unix is created by
- * converting the ntptime in the last SR packet to a gstreamer timestamp. This
- * delta expresses the difference to our timeline and the server timeline. */
- stream->unix_delta = last_unix - local_unix;
+ * into a corresponding gstreamer timestamp. Note that the base_time also
+ * contains the drift between sender and receiver. */
+ local_rt = gst_util_uint64_scale_int (local_rtp, GST_SECOND, clock_rate);
+ local_rt += base_time;
+
+ /* convert ntptime to unix time since 1900 */
+ last_unix = gst_util_uint64_scale (ntptime, GST_SECOND,
+ (G_GINT64_CONSTANT (1) << 32));
+
stream->have_sync = TRUE;
GST_DEBUG_OBJECT (bin,
- "local UNIX %" G_GUINT64_FORMAT ", remote UNIX %" G_GUINT64_FORMAT
- ", delta %" G_GINT64_FORMAT, local_unix, last_unix, stream->unix_delta);
+ "local UNIX %" G_GUINT64_FORMAT ", remote UNIX %" G_GUINT64_FORMAT,
+ local_rt, last_unix);
/* recalc inter stream playout offset, but only if there is more than one
- * stream. */
- if (client->nstreams > 1) {
+ * stream or we're doing NTP sync. */
+ if (bin->ntp_sync) {
+ /* For NTP sync we need to first get a snapshot of running_time and NTP
+ * time. We know at what running_time we play a certain RTP time, we also
+ * calculated when we would play the RTP time in the SR packet. Now we need
+ * to know how the running_time and the NTP time relate to eachother. */
+ get_current_times (bin, &running_time, &ntpnstime);
+
+ /* see how far away the NTP time is. This is the difference between the
+ * current NTP time and the NTP time in the last SR packet. */
+ ntpdiff = ntpnstime - last_unix;
+ /* see how far away the running_time is. This is the difference between the
+ * current running_time and the running_time of the RTP timestamp in the
+ * last SR packet. */
+ rtdiff = running_time - local_rt;
+
+ GST_DEBUG_OBJECT (bin,
+ "NTP time %" G_GUINT64_FORMAT ", last unix %" G_GUINT64_FORMAT,
+ ntpnstime, last_unix);
+ GST_DEBUG_OBJECT (bin,
+ "NTP diff %" G_GINT64_FORMAT ", RT diff %" G_GINT64_FORMAT, ntpdiff,
+ rtdiff);
+
+ /* combine to get the final diff to apply to the running_time */
+ stream->rt_delta = rtdiff - ntpdiff;
+
+ stream_set_ts_offset (bin, stream, stream->rt_delta);
+ } else if (client->nstreams > 1) {
gint64 min;
+ /* calculate delta between server and receiver. last_unix is created by
+ * converting the ntptime in the last SR packet to a gstreamer timestamp. This
+ * delta expresses the difference to our timeline and the server timeline. The
+ * difference in itself doesn't mean much but we can combine the delta of
+ * multiple streams to create a stream specific offset. */
+ stream->rt_delta = last_unix - local_rt;
+
/* calculate the min of all deltas, ignoring streams that did not yet have a
- * valid unix_delta because we did not yet receive an SR packet for those
+ * valid rt_delta because we did not yet receive an SR packet for those
* streams.
* We calculate the mininum because we would like to only apply positive
* offsets to streams, delaying their playback instead of trying to speed up
if (!ostream->have_sync)
continue;
- if (ostream->unix_delta < min)
- min = ostream->unix_delta;
+ if (ostream->rt_delta < min)
+ min = ostream->rt_delta;
}
GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT, client,
/* calculate offsets for each stream */
for (walk = client->streams; walk; walk = g_slist_next (walk)) {
GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
- gint64 ts_offset, prev_ts_offset;
+ gint64 ts_offset;
/* ignore streams for which we didn't receive an SR packet yet, we
* can't synchronize them yet. We can however sync other streams just
/* calculate offset to our reference stream, this should always give a
* positive number. */
- ts_offset = ostream->unix_delta - min;
-
- g_object_get (ostream->buffer, "ts-offset", &prev_ts_offset, NULL);
-
- /* delta changed, see how much */
- if (prev_ts_offset != ts_offset) {
- gint64 diff;
+ ts_offset = ostream->rt_delta - min;
- if (prev_ts_offset > ts_offset)
- diff = prev_ts_offset - ts_offset;
- else
- diff = ts_offset - prev_ts_offset;
-
- GST_DEBUG_OBJECT (bin,
- "ts-offset %" G_GUINT64_FORMAT ", prev %" G_GUINT64_FORMAT
- ", diff: %" G_GINT64_FORMAT, ts_offset, prev_ts_offset, diff);
-
- /* only change diff when it changed more than 4 milliseconds. This
- * compensates for rounding errors in NTP to RTP timestamp
- * conversions */
- if (diff > 4 * GST_MSECOND && diff < (3 * GST_SECOND)) {
- g_object_set (ostream->buffer, "ts-offset", ts_offset, NULL);
- }
- }
- GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
- ostream->ssrc, ts_offset);
+ stream_set_ts_offset (bin, ostream, ts_offset);
}
}
return;
GST_RTP_BIN_LOCK (bin);
/* associate the stream to CNAME */
gst_rtp_bin_associate (bin, stream, len, data,
- gst_rtcp_ntp_to_unix (ntptime), extrtptime,
- base_rtptime, base_time, clock_rate);
+ ntptime, extrtptime, base_rtptime, base_time, clock_rate);
GST_RTP_BIN_UNLOCK (bin);
}
}
stream->demux = demux;
stream->have_sync = FALSE;
- stream->unix_delta = 0;
+ stream->rt_delta = 0;
stream->percent = 100;
session->streams = g_slist_prepend (session->streams, stream);
g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock",
"Use the pipeline clock to set the NTP time in the RTCP SR messages",
DEFAULT_AUTOREMOVE, G_PARAM_READWRITE));
-
/**
* GstRtpBin::buffer-mode:
*
g_param_spec_enum ("buffer-mode", "Buffer Mode",
"Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
DEFAULT_BUFFER_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ /**
+ * GstRtpBin::ntp-sync:
+ *
+ * Synchronize received streams to the NTP clock. When the NTP clock is shared
+ * between the receivers and the senders (such as when using ntpd) this option
+ * can be used to synchronize receivers on multiple machines.
+ *
+ * Since: 0.10.21
+ */
+ g_object_class_install_property (gobject_class, PROP_NTP_SYNC,
+ g_param_spec_boolean ("ntp-sync", "Sync on NTP clock",
+ "Synchronize received streams to the NTP clock", DEFAULT_NTP_SYNC,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state);
gstelement_class->request_new_pad =
rtpbin->latency_ns = DEFAULT_LATENCY_MS * GST_MSECOND;
rtpbin->do_lost = DEFAULT_DO_LOST;
rtpbin->ignore_pt = DEFAULT_IGNORE_PT;
+ rtpbin->ntp_sync = DEFAULT_NTP_SYNC;
rtpbin->priv->autoremove = DEFAULT_AUTOREMOVE;
rtpbin->buffer_mode = DEFAULT_BUFFER_MODE;
rtpbin->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK;
GST_RTP_BIN_UNLOCK (rtpbin);
gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "do-lost", value);
break;
+ case PROP_NTP_SYNC:
+ rtpbin->ntp_sync = g_value_get_boolean (value);
+ break;
case PROP_IGNORE_PT:
rtpbin->ignore_pt = g_value_get_boolean (value);
break;
case PROP_IGNORE_PT:
g_value_set_boolean (value, rtpbin->ignore_pt);
break;
+ case PROP_NTP_SYNC:
+ g_value_set_boolean (value, rtpbin->ntp_sync);
+ break;
case PROP_AUTOREMOVE:
g_value_set_boolean (value, rtpbin->priv->autoremove);
break;