/* get a client with the given SDES name. Must be called with RTP_BIN_LOCK */
static GstRtpBinClient *
-get_client (GstRtpBin * bin, guint8 len, guint8 * data, gboolean * created)
+get_client (GstRtpBin * bin, guint8 len, const guint8 * data,
+ gboolean * created)
{
GstRtpBinClient *result = NULL;
GSList *walk;
* Must be called with GST_RTP_BIN_LOCK */
static void
gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
- guint8 * data, guint64 ntptime, guint64 last_extrtptime,
+ const guint8 * data, guint64 ntpnstime, guint64 last_extrtptime,
guint64 base_rtptime, guint64 base_time, guint clock_rate,
gint64 rtp_clock_base)
{
gboolean created;
GSList *walk;
GstClockTime running_time, running_time_rtp;
- guint64 ntpnstime;
/* first find or create the CNAME */
client = get_client (bin, len, data, &created);
gst_util_uint64_scale_int (running_time_rtp, GST_SECOND, clock_rate);
running_time += base_time;
- /* convert ntptime to nanoseconds */
- ntpnstime = gst_util_uint64_scale (ntptime, GST_SECOND,
- (G_GINT64_CONSTANT (1) << 32));
-
stream->have_sync = TRUE;
GST_DEBUG_OBJECT (bin,
GstRtpBin *bin;
GstRTCPPacket packet;
guint32 ssrc;
- guint64 ntptime;
+ guint64 ntpnstime, inband_ntpnstime;
gboolean have_sr, have_sdes;
gboolean more;
guint64 base_rtptime;
guint64 base_time;
guint clock_rate;
guint64 clock_base;
- guint64 extrtptime;
+ guint64 extrtptime, inband_ext_rtptime;
GstBuffer *buffer;
+ const gchar *cname;
GstRTCPBuffer rtcp = { NULL, };
bin = stream->bin;
* timestamps. We get this info directly from the jitterbuffer which
* constructs gstreamer timestamps from rtp timestamps and so it know exactly
* what the current situation is. */
- base_rtptime =
- g_value_get_uint64 (gst_structure_get_value (s, "base-rtptime"));
- base_time = g_value_get_uint64 (gst_structure_get_value (s, "base-time"));
- clock_rate = g_value_get_uint (gst_structure_get_value (s, "clock-rate"));
- clock_base = g_value_get_uint64 (gst_structure_get_value (s, "clock-base"));
- extrtptime =
- g_value_get_uint64 (gst_structure_get_value (s, "sr-ext-rtptime"));
+ if (!gst_structure_get_uint64 (s, "base-rtptime", &base_rtptime) ||
+ !gst_structure_get_uint64 (s, "base-time", &base_time) ||
+ !gst_structure_get_uint (s, "clock-rate", &clock_rate) ||
+ !gst_structure_get_uint64 (s, "clock-base", &clock_base)) {
+ /* invalid structure */
+ return;
+ }
+
+ /* if the jitterbuffer directly got the NTP timestamp then don't work
+ * through the RTCP SR, otherwise extract it from there */
+ if (gst_structure_get_uint64 (s, "inband-ntpnstime", &inband_ntpnstime)
+ && gst_structure_get_uint64 (s, "inband-ext-rtptime", &inband_ext_rtptime)
+ && (cname = gst_structure_get_string (s, "cname"))
+ && gst_structure_get_uint (s, "ssrc", &ssrc)) {
+ GST_DEBUG_OBJECT (bin,
+ "handle sync from inband NTP-64 information for SSRC %08x", ssrc);
+
+ if (ssrc != stream->ssrc)
+ return;
+
+ GST_RTP_BIN_LOCK (bin);
+ gst_rtp_bin_associate (bin, stream, strlen (cname), (const guint8 *) cname,
+ inband_ntpnstime, inband_ext_rtptime, base_rtptime, base_time,
+ clock_rate, clock_base);
+ GST_RTP_BIN_UNLOCK (bin);
+ return;
+ }
+
+ if (!gst_structure_get_uint64 (s, "sr-ext-rtptime", &extrtptime)
+ || !gst_structure_has_field_typed (s, "sr-buffer", GST_TYPE_BUFFER)) {
+ /* invalid structure */
+ return;
+ }
+
+ GST_DEBUG_OBJECT (bin, "handle sync from RTCP SR information");
+
buffer = gst_value_get_buffer (gst_structure_get_value (s, "sr-buffer"));
have_sr = FALSE;
if (have_sr)
break;
/* get NTP and RTP times */
- gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, NULL,
+ gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntpnstime, NULL,
NULL, NULL);
+ /* convert ntptime to nanoseconds */
+ ntpnstime = gst_util_uint64_scale (ntpnstime, GST_SECOND,
+ (G_GINT64_CONSTANT (1) << 32));
+
GST_DEBUG_OBJECT (bin, "received sync packet from SSRC %08x", ssrc);
/* ignore SR that is not ours */
if (ssrc != stream->ssrc)
GST_RTP_BIN_LOCK (bin);
/* associate the stream to CNAME */
gst_rtp_bin_associate (bin, stream, len, data,
- ntptime, extrtptime, base_rtptime, base_time, clock_rate,
+ ntpnstime, extrtptime, base_rtptime, base_time, clock_rate,
clock_base);
GST_RTP_BIN_UNLOCK (bin);
}
#include <stdio.h>
#include <string.h>
#include <gst/rtp/gstrtpbuffer.h>
+#include <gst/rtp/gstrtcpbuffer.h>
#include <gst/net/net.h>
#include "gstrtpjitterbuffer.h"
/* Reference for GstReferenceTimestampMeta */
GstCaps *rfc7273_ref;
+ /* RTP header extension ID for RFC6051 64-bit NTP timestamps */
+ guint8 ntp64_ext_id;
+
+ /* Known CNAME / SSRC mappings */
+ GList *cname_ssrc_mappings;
+
/* the last seqnum we pushed out */
guint32 last_popped_seqnum;
/* the next expected seqnum we push */
/* clock rate and rtp timestamp offset */
gint last_pt;
+ guint32 last_ssrc;
gint32 clock_rate;
gint64 clock_base;
gint64 ts_offset_remainder;
REASON_DROP_ON_LATENCY
} DropMessageReason;
+typedef struct
+{
+ gchar *cname;
+ guint32 ssrc;
+} CNameSSRCMapping;
+
+static void
+cname_ssrc_mapping_free (CNameSSRCMapping * mapping)
+{
+ g_free (mapping->cname);
+ g_free (mapping);
+}
+
+static void
+insert_cname_ssrc_mapping (GstRtpJitterBuffer * jbuf, const gchar * cname,
+ guint32 ssrc)
+{
+ CNameSSRCMapping *map;
+ GList *l;
+
+ GST_DEBUG_OBJECT (jbuf, "Adding SSRC %08x to CNAME %s", ssrc, cname);
+
+ for (l = jbuf->priv->cname_ssrc_mappings; l; l = l->next) {
+ map = l->data;
+
+ if (map->ssrc == ssrc) {
+ if (strcmp (cname, map->cname) != 0) {
+ g_free (map->cname);
+ map->cname = g_strdup (cname);
+ }
+ return;
+ }
+ }
+
+ map = g_new0 (CNameSSRCMapping, 1);
+ map->cname = g_strdup (cname);
+ map->ssrc = ssrc;
+ jbuf->priv->cname_ssrc_mappings =
+ g_list_prepend (jbuf->priv->cname_ssrc_mappings, map);
+}
+
static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
gboolean active, guint64 base_time);
static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
+static void do_handle_sync_inband (GstRtpJitterBuffer * jitterbuffer,
+ guint64 ntpnstime);
static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
return caps;
}
+static void
+_get_cname_ssrc_mappings (GstRtpJitterBuffer * jitterbuffer,
+ const GstStructure * s)
+{
+ guint i;
+ guint n_fields = gst_structure_n_fields (s);
+
+ for (i = 0; i < n_fields; i++) {
+ const gchar *field_name = gst_structure_nth_field_name (s, i);
+ if (g_str_has_prefix (field_name, "ssrc-")
+ && g_str_has_suffix (field_name, "-cname")) {
+ const gchar *str = gst_structure_get_string (s, field_name);
+ gchar *endptr;
+ guint32 ssrc = g_ascii_strtoll (field_name + 5, &endptr, 10);
+
+ if (!endptr || *endptr != '-')
+ continue;
+
+ insert_cname_ssrc_mapping (jitterbuffer, str, ssrc);
+ }
+ }
+}
+
+static guint8
+_get_extmap_id_for_attribute (const GstStructure * s, const gchar * ext_name)
+{
+ guint i;
+ guint8 extmap_id = 0;
+ guint n_fields = gst_structure_n_fields (s);
+
+ for (i = 0; i < n_fields; i++) {
+ const gchar *field_name = gst_structure_nth_field_name (s, i);
+ if (g_str_has_prefix (field_name, "extmap-")) {
+ const gchar *str = gst_structure_get_string (s, field_name);
+ if (str && g_strcmp0 (str, ext_name) == 0) {
+ gint64 id = g_ascii_strtoll (field_name + 7, NULL, 10);
+ if (id > 0 && id < 15) {
+ extmap_id = id;
+ break;
+ }
+ }
+ }
+ }
+ return extmap_id;
+}
+
/*
* Must be called with JBUF_LOCK held
*/
}
gst_caps_take (&priv->rfc7273_ref, ts_meta_ref);
+
+ _get_cname_ssrc_mappings (jitterbuffer, caps_struct);
+ priv->ntp64_ext_id =
+ _get_extmap_id_for_attribute (caps_struct,
+ GST_RTP_HDREXT_BASE GST_RTP_HDREXT_NTP_64);
+
return TRUE;
/* ERRORS */
priv->packet_spacing = 0;
priv->next_in_seqnum = -1;
priv->clock_rate = -1;
+ priv->ntp64_ext_id = 0;
priv->last_pt = -1;
+ priv->last_ssrc = -1;
priv->eos = FALSE;
priv->estimated_eos = -1;
priv->last_elapsed = 0;
priv->last_drop_msg_timestamp = GST_CLOCK_TIME_NONE;
priv->num_too_late = 0;
priv->num_drop_on_latency = 0;
+ g_list_free_full (priv->cname_ssrc_mappings,
+ (GDestroyNotify) cname_ssrc_mapping_free);
+ priv->cname_ssrc_mappings = NULL;
GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
rtp_jitter_buffer_flush (priv->jbuf, NULL, NULL);
rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
priv->clock_base = -1;
priv->peer_latency = 0;
priv->last_pt = -1;
+ priv->last_ssrc = -1;
+ priv->ntp64_ext_id = 0;
+ g_list_free_full (priv->cname_ssrc_mappings,
+ (GDestroyNotify) cname_ssrc_mapping_free);
+ priv->cname_ssrc_mappings = NULL;
/* block until we go to PLAYING */
priv->blocked = TRUE;
priv->timer_running = TRUE;
return FALSE;
}
+static GstClockTime
+_get_inband_ntp_time (GstRtpJitterBuffer * jitterbuffer, GstRTPBuffer * rtp)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ guint8 *data;
+ guint size;
+ guint64 ntptime;
+ GstClockTime ntpnstime;
+
+ if (priv->ntp64_ext_id == 0)
+ return GST_CLOCK_TIME_NONE;
+
+ if (!gst_rtp_buffer_get_extension_onebyte_header (rtp, priv->ntp64_ext_id, 0,
+ (gpointer *) & data, &size)
+ && !gst_rtp_buffer_get_extension_twobytes_header (rtp, NULL,
+ priv->ntp64_ext_id, 0, (gpointer *) & data, &size))
+ return GST_CLOCK_TIME_NONE;
+
+ if (size != 8)
+ return GST_CLOCK_TIME_NONE;
+
+ ntptime = GST_READ_UINT64_BE (data);
+ ntpnstime =
+ gst_util_uint64_scale (ntptime, GST_SECOND, G_GUINT64_CONSTANT (1) << 32);
+
+ return ntpnstime;
+}
+
static GstFlowReturn
gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
GstBuffer * buffer)
GstClockTime now;
GstClockTime dts, pts;
GstClockTime ntp_time;
+ GstClockTime inband_ntp_time;
guint64 latency_ts;
gboolean head;
gboolean duplicate;
gint percent = -1;
guint8 pt;
+ guint32 ssrc;
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
gboolean do_next_seqnum = FALSE;
GstMessage *msg = NULL;
pt = gst_rtp_buffer_get_payload_type (&rtp);
seqnum = gst_rtp_buffer_get_seq (&rtp);
rtptime = gst_rtp_buffer_get_timestamp (&rtp);
+ inband_ntp_time = _get_inband_ntp_time (jitterbuffer, &rtp);
+ ssrc = gst_rtp_buffer_get_ssrc (&rtp);
gst_rtp_buffer_unmap (&rtp);
is_rtx = GST_BUFFER_IS_RETRANSMISSION (buffer);
}
GST_DEBUG_OBJECT (jitterbuffer,
- "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d, rtx %d",
- seqnum, GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer), is_rtx);
+ "Received packet #%d at time %" GST_TIME_FORMAT
+ ", discont %d, rtx %d, inband NTP time %" GST_TIME_FORMAT, seqnum,
+ GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer), is_rtx,
+ GST_TIME_ARGS (inband_ntp_time));
JBUF_LOCK_CHECK (priv, out_flushing);
gst_rtp_packet_rate_ctx_reset (&priv->packet_rate_ctx, priv->clock_rate);
}
+ if (G_UNLIKELY (priv->last_ssrc != ssrc)) {
+ GST_DEBUG_OBJECT (jitterbuffer, "SSRC changed from %u to %u",
+ priv->last_ssrc, ssrc);
+ priv->last_ssrc = ssrc;
+ }
+
/* don't accept more data on EOS */
if (G_UNLIKELY (priv->eos))
goto have_eos;
if (priv->last_sr)
do_handle_sync (jitterbuffer);
+ if (inband_ntp_time != GST_CLOCK_TIME_NONE)
+ do_handle_sync_inband (jitterbuffer, inband_ntp_time);
+
if (G_UNLIKELY (head)) {
/* signal addition of new buffer when the _loop is waiting. */
if (G_LIKELY (priv->active))
}
}
+static void
+do_handle_sync_inband (GstRtpJitterBuffer * jitterbuffer, guint64 ntpnstime)
+{
+ GstRtpJitterBufferPrivate *priv;
+ GstStructure *s;
+ guint64 base_rtptime, base_time;
+ guint32 clock_rate;
+ guint64 last_rtptime;
+ const gchar *cname = NULL;
+ GList *l;
+
+ priv = jitterbuffer->priv;
+
+ /* get the last values from the jitterbuffer */
+ rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
+ &clock_rate, &last_rtptime);
+
+ for (l = priv->cname_ssrc_mappings; l; l = l->next) {
+ const CNameSSRCMapping *map = l->data;
+
+ if (map->ssrc == priv->last_ssrc) {
+ cname = map->cname;
+ break;
+ }
+ }
+
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "inband NTP-64 %" GST_TIME_FORMAT " rtptime %" G_GUINT64_FORMAT ", base %"
+ G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT ", clock-base %"
+ G_GUINT64_FORMAT ", CNAME %s", GST_TIME_ARGS (ntpnstime), last_rtptime,
+ base_rtptime, clock_rate, priv->clock_base, GST_STR_NULL (cname));
+
+ /* no CNAME known yet for this ssrc */
+ if (cname == NULL) {
+ GST_DEBUG_OBJECT (jitterbuffer, "no CNAME for this packet known yet");
+ return;
+ }
+
+ s = gst_structure_new ("application/x-rtp-sync",
+ "base-rtptime", G_TYPE_UINT64, base_rtptime,
+ "base-time", G_TYPE_UINT64, base_time,
+ "clock-rate", G_TYPE_UINT, clock_rate,
+ "clock-base", G_TYPE_UINT64, priv->clock_base,
+ "cname", G_TYPE_STRING, cname,
+ "ssrc", G_TYPE_UINT, priv->last_ssrc,
+ "inband-ext-rtptime", G_TYPE_UINT64, last_rtptime,
+ "inband-ntpnstime", G_TYPE_UINT64, ntpnstime, NULL);
+
+ GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
+ JBUF_UNLOCK (priv);
+ g_signal_emit (jitterbuffer,
+ gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
+ JBUF_LOCK (priv);
+ gst_structure_free (s);
+}
+
/* collect the info from the latest RTCP packet and the jitterbuffer sync, do
* some sanity checks and then emit the handle-sync signal with the parameters.
* This function must be called with the LOCK */
}
}
+#define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \
+ for ((b) = gst_rtcp_buffer_get_first_packet ((buffer), (packet)); (b); \
+ (b) = gst_rtcp_packet_move_to_next ((packet)))
+
+#define GST_RTCP_SDES_FOR_ITEMS(b,packet) \
+ for ((b) = gst_rtcp_packet_sdes_first_item ((packet)); (b); \
+ (b) = gst_rtcp_packet_sdes_next_item ((packet)))
+
+#define GST_RTCP_SDES_FOR_ENTRIES(b,packet) \
+ for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \
+ (b) = gst_rtcp_packet_sdes_next_entry ((packet)))
+
static GstFlowReturn
gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
GstBuffer * buffer)
guint64 ext_rtptime;
guint32 rtptime;
GstRTCPBuffer rtcp = { NULL, };
+ gchar *cname = NULL;
+ gboolean have_sr, have_sdes;
+ gboolean more;
jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
- if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
- goto empty_buffer;
-
- /* first packet must be SR or RR or else the validate would have failed */
- switch (gst_rtcp_packet_get_type (&packet)) {
- case GST_RTCP_TYPE_SR:
- gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
- NULL, NULL);
- break;
- default:
- goto ignore_buffer;
+ GST_RTCP_BUFFER_FOR_PACKETS (more, &rtcp, &packet) {
+ /* first packet must be SR or RR or else the validate would have failed */
+ switch (gst_rtcp_packet_get_type (&packet)) {
+ case GST_RTCP_TYPE_SR:
+ gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
+ NULL, NULL);
+ have_sr = TRUE;
+ break;
+ case GST_RTCP_TYPE_SDES:
+ {
+ gboolean more_items, more_entries;
+
+ /* only deal with first SDES, there is only supposed to be one SDES in
+ * the RTCP packet but we deal with bad packets gracefully. Also bail
+ * out if we have not seen an SR item yet. */
+ if (have_sdes || !have_sr)
+ break;
+
+ GST_RTCP_SDES_FOR_ITEMS (more_items, &packet) {
+ /* skip items that are not about the SSRC of the sender */
+ if (gst_rtcp_packet_sdes_get_ssrc (&packet) != ssrc)
+ continue;
+
+ /* find the CNAME entry */
+ GST_RTCP_SDES_FOR_ENTRIES (more_entries, &packet) {
+ GstRTCPSDESType type;
+ guint8 len;
+ guint8 *data;
+
+ gst_rtcp_packet_sdes_get_entry (&packet, &type, &len, &data);
+
+ if (type == GST_RTCP_SDES_CNAME) {
+ cname = g_strndup ((const gchar *) data, len);
+ }
+ }
+ }
+ have_sdes = TRUE;
+ break;
+ }
+ default:
+ goto ignore_buffer;
+ }
}
gst_rtcp_buffer_unmap (&rtcp);
- GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
+ GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x from CNAME %s",
+ ssrc, GST_STR_NULL (cname));
+
+ if (!have_sr)
+ goto empty_buffer;
JBUF_LOCK (priv);
+ if (cname)
+ insert_cname_ssrc_mapping (jitterbuffer, cname, ssrc);
+
/* convert the RTP timestamp to our extended timestamp, using the same offset
* we used in the jitterbuffer */
ext_rtptime = priv->jbuf->ext_rtptime;
gst_buffer_replace (&priv->last_sr, buffer);
do_handle_sync (jitterbuffer);
+
JBUF_UNLOCK (priv);
done:
+ g_free (cname);
gst_buffer_unref (buffer);
return ret;