From 592c3f222f0947596b2a81bcbe1cba448925bae8 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 19 Nov 2008 09:06:29 +0000 Subject: [PATCH] gst/rtpmanager/gstrtpbin.c: Remove internal sync pad, use signals instead to get lip-sync notifications. Original commit message from CVS: * gst/rtpmanager/gstrtpbin.c: (gst_rtp_bin_associate), (gst_rtp_bin_handle_sync), (create_stream), (free_stream), (new_ssrc_pad_found): Remove internal sync pad, use signals instead to get lip-sync notifications. * gst/rtpmanager/gstrtpjitterbuffer.c: (gst_rtp_jitter_buffer_base_init), (gst_rtp_jitter_buffer_class_init), (gst_rtp_jitter_buffer_internal_links), (create_rtcp_sink), (remove_rtcp_sink), (gst_rtp_jitter_buffer_request_new_pad), (gst_rtp_jitter_buffer_release_pad), (gst_rtp_jitter_buffer_sink_rtcp_event), (gst_rtp_jitter_buffer_chain_rtcp), (gst_rtp_jitter_buffer_get_property): * gst/rtpmanager/gstrtpjitterbuffer.h: Make it possible to send SR packets to the jitterbuffer. Check if the SR timestamps are valid by comparing them to the RTP timestamps. Signal the SR packet and the timing information to listeners. * gst/rtpmanager/gstrtpssrcdemux.c: (create_demux_pad_for_ssrc), (gst_rtp_ssrc_demux_rtcp_chain), (gst_rtp_ssrc_demux_src_query): Remove some unused code. * gst/rtpmanager/rtpjitterbuffer.c: (rtp_jitter_buffer_reset_skew), (calculate_skew), (rtp_jitter_buffer_get_sync): * gst/rtpmanager/rtpjitterbuffer.h: Keep track of the last seen RTP timestamp so that we can filter out invalid SR packets. --- gst/rtpmanager/gstrtpbin.c | 131 +++-------- gst/rtpmanager/gstrtpjitterbuffer.c | 343 ++++++++++++++++++++++++++-- gst/rtpmanager/gstrtpjitterbuffer.h | 7 +- gst/rtpmanager/gstrtpssrcdemux.c | 9 +- gst/rtpmanager/rtpjitterbuffer.c | 27 ++- gst/rtpmanager/rtpjitterbuffer.h | 4 +- 6 files changed, 382 insertions(+), 139 deletions(-) diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index 197be5283..11d14d516 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -176,14 +176,6 @@ GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%d", GST_STATIC_CAPS ("application/x-rtp") ); -/* padtemplate for the internal pad */ -static GstStaticPadTemplate rtpbin_sync_sink_template = -GST_STATIC_PAD_TEMPLATE ("sink_%d", - GST_PAD_SINK, - GST_PAD_SOMETIMES, - GST_STATIC_CAPS ("application/x-rtcp") - ); - #define GST_RTP_BIN_GET_PRIVATE(obj) \ (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRtpBinPrivate)) @@ -310,8 +302,7 @@ struct _GstRtpBinStream gulong demux_ptreq_sig; gulong demux_pt_change_sig; - /* the internal pad we use to get RTCP sync messages */ - GstPad *sync_pad; + /* data for the RTCP sync signal */ gboolean have_sync; guint64 last_unix; guint64 last_extrtptime; @@ -818,7 +809,8 @@ free_client (GstRtpBinClient * client) } /* associate a stream to the given CNAME. This will make sure all streams for - * that CNAME are synchronized together. */ + * that CNAME are synchronized together. + * Must be called with GST_RTP_BIN_LOCK */ static void gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, guint8 * data) @@ -828,7 +820,6 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, GSList *walk; /* first find or create the CNAME */ - GST_RTP_BIN_LOCK (bin); client = get_client (bin, len, data, &created); /* find stream in the client */ @@ -851,13 +842,6 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, stream->ssrc, client, client->cname); } - /* we can only continue if we know the local clock-base and clock-rate */ - if (stream->clock_base == -1) - goto no_clock_base; - - if (stream->clock_rate <= 0) - goto no_clock_rate; - /* take the extended rtptime we found in the SR packet and map it to the * local rtptime. The local rtp time is used to construct timestamps on the * buffers. */ @@ -897,7 +881,7 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, * offsets to streams, delaying their playback instead of trying to speed up * other streams (which might be imposible when we have to create negative * latencies). - * The stream that has the smalest diff is selected as the reference stream, + * The stream that has the smallest diff is selected as the reference stream, * all other streams will have a positive offset to this difference. */ min = G_MAXINT64; for (walk = client->streams; walk; walk = g_slist_next (walk)) { @@ -955,22 +939,7 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, ostream->ssrc, ostream->ts_offset); } } - GST_RTP_BIN_UNLOCK (bin); - return; - -no_clock_base: - { - GST_WARNING_OBJECT (bin, "we have no clock-base"); - GST_RTP_BIN_UNLOCK (bin); - return; - } -no_clock_rate: - { - GST_WARNING_OBJECT (bin, "we have no clock-rate"); - GST_RTP_BIN_UNLOCK (bin); - return; - } } #define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \ @@ -985,44 +954,37 @@ no_clock_rate: for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \ (b) = gst_rtcp_packet_sdes_next_entry ((packet))) -static GstFlowReturn -gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer) +static void +gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s, + GstRtpBinStream * stream) { - GstFlowReturn ret = GST_FLOW_OK; - GstRtpBinStream *stream; GstRtpBin *bin; GstRTCPPacket packet; guint32 ssrc; guint64 ntptime; - guint32 rtptime; gboolean have_sr, have_sdes; gboolean more; guint64 clock_base; guint64 clock_base_time; guint clock_rate; + guint64 extrtptime; + GstBuffer *buffer; - stream = gst_pad_get_element_private (pad); bin = stream->bin; - GST_DEBUG_OBJECT (bin, "received sync packet"); - - if (!gst_rtcp_buffer_validate (buffer)) - goto invalid_rtcp; + GST_DEBUG_OBJECT (bin, "sync handler called"); /* get the last relation between the rtp timestamps and the gstreamer * timestamps. We get this info directly from the jitterbuffer which * constructs gstreamer timestamps from rtp timestamps and so it know exactly * what the current situation is. */ - gst_rtp_jitter_buffer_get_sync (GST_RTP_JITTER_BUFFER (stream->buffer), - &clock_base, &clock_base_time, &clock_rate); - - /* clock base changes when there is a huge gap in the timestamps or seqnum. - * When this happens we don't want to calculate the extended timestamp based - * on the previous one but reset the calculation. */ - if (stream->last_clock_base != clock_base) { - stream->last_extrtptime = -1; - stream->last_clock_base = clock_base; - } + clock_base = g_value_get_uint64 (gst_structure_get_value (s, "base-rtptime")); + clock_base_time = + g_value_get_uint64 (gst_structure_get_value (s, "base-time")); + clock_rate = g_value_get_uint (gst_structure_get_value (s, "clock-rate")); + extrtptime = + g_value_get_uint64 (gst_structure_get_value (s, "sr-ext-rtptime")); + buffer = gst_value_get_buffer (gst_structure_get_value (s, "sr-buffer")); have_sr = FALSE; have_sdes = FALSE; @@ -1035,7 +997,7 @@ gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer) if (have_sr) break; /* get NTP and RTP times */ - gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, &rtptime, + gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, NULL, NULL, NULL); GST_DEBUG_OBJECT (bin, "received sync packet from SSRC %08x", ssrc); @@ -1044,12 +1006,6 @@ gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer) continue; have_sr = TRUE; - - /* store values in the stream */ - stream->have_sync = TRUE; - stream->last_unix = gst_rtcp_ntp_to_unix (ntptime); - /* use extended timestamp */ - gst_rtp_buffer_ext_timestamp (&stream->last_extrtptime, rtptime); break; case GST_RTCP_TYPE_SDES: { @@ -1075,11 +1031,17 @@ gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer) gst_rtcp_packet_sdes_get_entry (&packet, &type, &len, &data); if (type == GST_RTCP_SDES_CNAME) { + GST_RTP_BIN_LOCK (bin); + /* store values in the stream */ + stream->have_sync = TRUE; + stream->last_unix = gst_rtcp_ntp_to_unix (ntptime); + stream->last_extrtptime = extrtptime; stream->clock_base = clock_base; stream->clock_base_time = clock_base_time; stream->clock_rate = clock_rate; /* associate the stream to CNAME */ gst_rtp_bin_associate (bin, stream, len, data); + GST_RTP_BIN_UNLOCK (bin); } } } @@ -1091,20 +1053,6 @@ gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer) break; } } - - gst_buffer_unref (buffer); - - return ret; - - /* ERRORS */ -invalid_rtcp: - { - /* this is fatal and should be filtered earlier */ - GST_ELEMENT_ERROR (bin, STREAM, DECODE, (NULL), - ("invalid RTCP packet received")); - gst_buffer_unref (buffer); - return GST_FLOW_ERROR; - } } /* create a new stream with @ssrc in @session. Must be called with @@ -1114,8 +1062,6 @@ create_stream (GstRtpBinSession * session, guint32 ssrc) { GstElement *buffer, *demux; GstRtpBinStream *stream; - GstPadTemplate *templ; - gchar *padname; if (!(buffer = gst_element_factory_make ("gstrtpjitterbuffer", NULL))) goto no_jitterbuffer; @@ -1134,19 +1080,6 @@ create_stream (GstRtpBinSession * session, guint32 ssrc) stream->have_sync = FALSE; session->streams = g_slist_prepend (session->streams, stream); - /* make an internal sinkpad for RTCP sync packets. Take ownership of the - * pad. We will link this pad later. */ - padname = g_strdup_printf ("sync_%d", ssrc); - templ = gst_static_pad_template_get (&rtpbin_sync_sink_template); - stream->sync_pad = gst_pad_new_from_template (templ, padname); - gst_object_unref (templ); - g_free (padname); - gst_object_ref (stream->sync_pad); - gst_object_sink (stream->sync_pad); - gst_pad_set_element_private (stream->sync_pad, stream); - gst_pad_set_chain_function (stream->sync_pad, gst_rtp_bin_sync_chain); - gst_pad_set_active (stream->sync_pad, TRUE); - /* provide clock_rate to the jitterbuffer when needed */ g_signal_connect (buffer, "request-pt-map", (GCallback) pt_map_requested, session); @@ -1192,8 +1125,6 @@ free_stream (GstRtpBinStream * stream) gst_bin_remove (GST_BIN_CAST (session->bin), stream->buffer); gst_bin_remove (GST_BIN_CAST (session->bin), stream->demux); - gst_object_unref (stream->sync_pad); - session->streams = g_slist_remove (session->streams, stream); g_free (stream); @@ -1985,7 +1916,7 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad, } /* get pad and link */ - GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer"); + GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTP"); padname = g_strdup_printf ("src_%d", ssrc); srcpad = gst_element_get_static_pad (element, padname); g_free (padname); @@ -1994,14 +1925,20 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad, gst_object_unref (sinkpad); gst_object_unref (srcpad); - /* get the RTCP sync pad */ - GST_DEBUG_OBJECT (rtpbin, "linking sync pad"); + GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTCP"); padname = g_strdup_printf ("rtcp_src_%d", ssrc); srcpad = gst_element_get_static_pad (element, padname); g_free (padname); - gst_pad_link (srcpad, stream->sync_pad); + sinkpad = gst_element_get_request_pad (stream->buffer, "sink_rtcp"); + gst_pad_link (srcpad, sinkpad); + gst_object_unref (sinkpad); gst_object_unref (srcpad); + /* connect to the RTCP sync signal from the jitterbuffer */ + GST_DEBUG_OBJECT (rtpbin, "connecting sync signal"); + g_signal_connect (stream->buffer, + "handle-sync", (GCallback) gst_rtp_bin_handle_sync, stream); + /* connect to the new-pad signal of the payload demuxer, this will expose the * new pad by ghosting it. */ stream->demux_newpad_sig = g_signal_connect (stream->demux, diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index bd47bde43..e9c2f7949 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -87,6 +87,7 @@ enum { SIGNAL_REQUEST_PT_MAP, SIGNAL_CLEAR_PT_MAP, + SIGNAL_HANDLE_SYNC, LAST_SIGNAL }; @@ -127,6 +128,7 @@ enum struct _GstRtpJitterBufferPrivate { GstPad *sinkpad, *srcpad; + GstPad *rtcpsinkpad; RTPJitterBuffer *jbuf; GMutex *jbuf_lock; @@ -190,6 +192,13 @@ GST_STATIC_PAD_TEMPLATE ("sink", */ ) ); +static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template = +GST_STATIC_PAD_TEMPLATE ("sink_rtcp", + GST_PAD_SINK, + GST_PAD_REQUEST, + GST_STATIC_CAPS ("application/x-rtcp") + ); + static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template = GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, @@ -216,20 +225,30 @@ static void gst_rtp_jitter_buffer_finalize (GObject * object); /* element overrides */ static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement * element, GstStateChange transition); +static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element, + GstPadTemplate * templ, const gchar * name); +static void gst_rtp_jitter_buffer_release_pad (GstElement * element, + GstPad * pad); /* pad overrides */ static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad); +static GList *gst_rtp_jitter_buffer_internal_links (GstPad * pad); /* sinkpad overrides */ static gboolean gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps); -static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad, - GstEvent * event); static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstEvent * event); static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer); +static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, + GstEvent * event); +static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, + GstBuffer * buffer); + /* srcpad overrides */ +static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad, + GstEvent * event); static gboolean gst_rtp_jitter_buffer_src_activate_push (GstPad * pad, gboolean active); static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer); @@ -247,6 +266,9 @@ gst_rtp_jitter_buffer_base_init (gpointer klass) gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template)); gst_element_class_add_pad_template (element_class, gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template)); + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template)); + gst_element_class_set_details (element_class, &gst_rtp_jitter_buffer_details); } @@ -320,6 +342,19 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass) G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass, request_pt_map), NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT, GST_TYPE_CAPS, 1, G_TYPE_UINT); + /** + * GstRtpJitterBuffer::handle-sync: + * @buffer: the object which received the signal + * @struct: a GstStructure containing sync values. + * + * Be notified of new sync values. + */ + gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] = + g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass, + handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED, + G_TYPE_NONE, 1, GST_TYPE_STRUCTURE); + /** * GstRtpJitterBuffer::clear-pt-map: * @buffer: the object which received the signal @@ -329,11 +364,16 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass) */ gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] = g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass, - clear_pt_map), NULL, NULL, g_cclosure_marshal_VOID__VOID, - G_TYPE_NONE, 0, G_TYPE_NONE); + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, + G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL, + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE); - gstelement_class->change_state = gst_rtp_jitter_buffer_change_state; + gstelement_class->change_state = + GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state); + gstelement_class->request_new_pad = + GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad); + gstelement_class->release_pad = + GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad); klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map); @@ -403,6 +443,139 @@ gst_rtp_jitter_buffer_finalize (GObject * object) G_OBJECT_CLASS (parent_class)->finalize (object); } +static GList * +gst_rtp_jitter_buffer_internal_links (GstPad * pad) +{ + GstRtpJitterBuffer *jitterbuffer; + GstRtpJitterBufferPrivate *priv; + GList *res = NULL; + + jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad)); + priv = jitterbuffer->priv; + + if (pad == priv->sinkpad) { + res = g_list_prepend (res, priv->srcpad); + } else if (pad == priv->srcpad) { + res = g_list_prepend (res, priv->sinkpad); + } else if (pad == priv->rtcpsinkpad) { + res = NULL; + } + + gst_object_unref (jitterbuffer); + + return res; +} + +static GstPad * +create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer) +{ + GstRtpJitterBufferPrivate *priv; + + priv = jitterbuffer->priv; + + GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad"); + + priv->rtcpsinkpad = + gst_pad_new_from_static_template + (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp"); + gst_pad_set_chain_function (priv->rtcpsinkpad, + gst_rtp_jitter_buffer_chain_rtcp); + gst_pad_set_event_function (priv->rtcpsinkpad, + (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event); + gst_pad_set_internal_link_function (priv->rtcpsinkpad, + gst_rtp_jitter_buffer_internal_links); + gst_pad_set_active (priv->rtcpsinkpad, TRUE); + gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad); + + return priv->rtcpsinkpad; +} + +static void +remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer) +{ + GstRtpJitterBufferPrivate *priv; + + priv = jitterbuffer->priv; + + GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad"); + + gst_pad_set_active (priv->rtcpsinkpad, FALSE); + + gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad); + priv->rtcpsinkpad = NULL; +} + +static GstPad * +gst_rtp_jitter_buffer_request_new_pad (GstElement * element, + GstPadTemplate * templ, const gchar * name) +{ + GstRtpJitterBuffer *jitterbuffer; + GstElementClass *klass; + GstPad *result; + GstRtpJitterBufferPrivate *priv; + + g_return_val_if_fail (templ != NULL, NULL); + g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL); + + jitterbuffer = GST_RTP_JITTER_BUFFER (element); + priv = jitterbuffer->priv; + klass = GST_ELEMENT_GET_CLASS (element); + + GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name)); + + /* figure out the template */ + if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) { + if (priv->rtcpsinkpad != NULL) + goto exists; + + result = create_rtcp_sink (jitterbuffer); + } else + goto wrong_template; + + return result; + + /* ERRORS */ +wrong_template: + { + g_warning ("gstrtpjitterbuffer: this is not our template"); + return NULL; + } +exists: + { + g_warning ("gstrtpjitterbuffer: pad already requested"); + return NULL; + } +} + +static void +gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad) +{ + GstRtpJitterBuffer *jitterbuffer; + GstRtpJitterBufferPrivate *priv; + + g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element)); + g_return_if_fail (GST_IS_PAD (pad)); + + jitterbuffer = GST_RTP_JITTER_BUFFER (element); + priv = jitterbuffer->priv; + + GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad)); + + if (priv->rtcpsinkpad == pad) { + remove_rtcp_sink (jitterbuffer); + } else + goto wrong_pad; + + return; + + /* ERRORS */ +wrong_pad: + { + g_warning ("gstjitterbuffer: asked to release an unknown pad"); + return; + } +} + static void gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer) { @@ -786,6 +959,31 @@ newseg_wrong_format: } } +static gboolean +gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstEvent * event) +{ + GstRtpJitterBuffer *jitterbuffer; + GstRtpJitterBufferPrivate *priv; + + jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad)); + priv = jitterbuffer->priv; + + GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event)); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_FLUSH_START: + break; + case GST_EVENT_FLUSH_STOP: + break; + default: + break; + } + gst_event_unref (event); + gst_object_unref (jitterbuffer); + + return TRUE; +} + static gboolean gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer, guint8 pt) @@ -1335,6 +1533,124 @@ pause: } } +static GstFlowReturn +gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstBuffer * buffer) +{ + GstRtpJitterBuffer *jitterbuffer; + GstRtpJitterBufferPrivate *priv; + GstFlowReturn ret; + guint64 base_rtptime, timestamp; + guint32 clock_rate; + guint64 last_rtptime; + guint32 ssrc; + GstRTCPPacket packet; + guint64 ext_rtptime, diff; + guint32 rtptime; + gboolean drop = FALSE; + + jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad)); + + if (G_UNLIKELY (!gst_rtcp_buffer_validate (buffer))) + goto invalid_buffer; + + priv = jitterbuffer->priv; + + if (!gst_rtcp_buffer_get_first_packet (buffer, &packet)) + goto invalid_buffer; + + /* first packet must be SR or RR or else the validate would have failed */ + switch (gst_rtcp_packet_get_type (&packet)) { + case GST_RTCP_TYPE_SR: + gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime, + NULL, NULL); + break; + default: + goto ignore_buffer; + } + + GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc); + + JBUF_LOCK (priv); + /* convert the RTP timestamp to our extended timestamp, using the same offset + * we used in the jitterbuffer */ + ext_rtptime = priv->jbuf->ext_rtptime; + ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime); + + /* get the last values from the jitterbuffer */ + rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, ×tamp, + &clock_rate, &last_rtptime); + + GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %" + G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT, + ext_rtptime, base_rtptime, clock_rate); + + if (base_rtptime == -1 || clock_rate == -1 || timestamp == -1) { + GST_DEBUG_OBJECT (jitterbuffer, "dropping, no RTP values"); + drop = TRUE; + } else { + /* we can't accept anything that happened before we did the last resync */ + if (base_rtptime > ext_rtptime) { + GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time"); + drop = TRUE; + } else { + /* the SR RTP timestamp must be something close to what we last observed + * in the jitterbuffer */ + if (ext_rtptime > last_rtptime) { + /* check how far ahead it is to our RTP timestamps */ + diff = ext_rtptime - last_rtptime; + /* if bigger than 1 second, we drop it */ + if (diff > clock_rate) { + GST_DEBUG_OBJECT (jitterbuffer, "dropping, too far ahead"); + drop = TRUE; + } + GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %" + G_GUINT64_FORMAT, last_rtptime, diff); + } + } + } + JBUF_UNLOCK (priv); + + if (!drop) { + GstStructure *s; + + s = gst_structure_new ("application/x-rtp-sync", + "base-rtptime", G_TYPE_UINT64, base_rtptime, + "base-time", G_TYPE_UINT64, timestamp, + "clock-rate", G_TYPE_UINT, clock_rate, + "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime, + "sr-buffer", GST_TYPE_BUFFER, buffer, NULL); + + GST_DEBUG_OBJECT (jitterbuffer, "signaling sync"); + g_signal_emit (jitterbuffer, + gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s); + gst_structure_free (s); + } else { + GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet"); + ret = GST_FLOW_OK; + } + +done: + gst_buffer_unref (buffer); + gst_object_unref (jitterbuffer); + + return ret; + +invalid_buffer: + { + /* this is not fatal but should be filtered earlier */ + GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL), + ("Received invalid RTCP payload, dropping")); + ret = GST_FLOW_OK; + goto done; + } +ignore_buffer: + { + GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet"); + ret = GST_FLOW_OK; + goto done; + } +} + static gboolean gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query) { @@ -1485,18 +1801,3 @@ gst_rtp_jitter_buffer_get_property (GObject * object, break; } } - -void -gst_rtp_jitter_buffer_get_sync (GstRtpJitterBuffer * buffer, guint64 * rtptime, - guint64 * timestamp, guint32 * clock_rate) -{ - GstRtpJitterBufferPrivate *priv; - - g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (buffer)); - - priv = buffer->priv; - - JBUF_LOCK (priv); - rtp_jitter_buffer_get_sync (priv->jbuf, rtptime, timestamp, clock_rate); - JBUF_UNLOCK (priv); -} diff --git a/gst/rtpmanager/gstrtpjitterbuffer.h b/gst/rtpmanager/gstrtpjitterbuffer.h index 40908eabf..45e689793 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.h +++ b/gst/rtpmanager/gstrtpjitterbuffer.h @@ -71,6 +71,9 @@ struct _GstRtpJitterBufferClass /* signals */ GstCaps* (*request_pt_map) (GstRtpJitterBuffer *buffer, guint pt); + void (*handle_sync) (GstRtpJitterBuffer *buffer, GstStructure *s); + + /* actions */ void (*clear_pt_map) (GstRtpJitterBuffer *buffer); /*< private > */ @@ -79,10 +82,6 @@ struct _GstRtpJitterBufferClass GType gst_rtp_jitter_buffer_get_type (void); -void gst_rtp_jitter_buffer_get_sync (GstRtpJitterBuffer *buffer, - guint64 *rtptime, guint64 *timestamp, - guint32 *clock_rate); - G_END_DECLS #endif /* __GST_RTP_JITTER_BUFFER_H__ */ diff --git a/gst/rtpmanager/gstrtpssrcdemux.c b/gst/rtpmanager/gstrtpssrcdemux.c index e5b989f7f..64394c45d 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.c +++ b/gst/rtpmanager/gstrtpssrcdemux.c @@ -137,7 +137,6 @@ struct _GstRtpSsrcDemuxPad GstPad *rtp_pad; GstCaps *caps; GstPad *rtcp_pad; - GstClockTime first_ts; }; /* find a src pad for a given SSRC, returns NULL if the SSRC was not found @@ -190,7 +189,6 @@ create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, demuxpad->ssrc = ssrc; demuxpad->rtp_pad = rtp_pad; demuxpad->rtcp_pad = rtcp_pad; - demuxpad->first_ts = timestamp; GST_DEBUG_OBJECT (demux, "first timestamp %" GST_TIME_FORMAT, GST_TIME_ARGS (timestamp)); @@ -484,9 +482,6 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstBuffer * buf) gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, NULL, NULL, NULL); break; - case GST_RTCP_TYPE_RR: - ssrc = gst_rtcp_packet_rr_get_ssrc (&packet); - break; default: goto invalid_rtcp; } @@ -599,9 +594,7 @@ gst_rtp_ssrc_demux_src_query (GstPad * pad, GstQuery * query) GST_DEBUG_OBJECT (demux, "peer min latency %" GST_TIME_FORMAT, GST_TIME_ARGS (min_latency)); - GST_DEBUG_OBJECT (demux, - "latency for SSRC %08x, latency %" GST_TIME_FORMAT, demuxpad->ssrc, - GST_TIME_ARGS (demuxpad->first_ts)); + GST_DEBUG_OBJECT (demux, "latency for SSRC %08x", demuxpad->ssrc); gst_query_set_latency (query, live, min_latency, max_latency); } diff --git a/gst/rtpmanager/rtpjitterbuffer.c b/gst/rtpmanager/rtpjitterbuffer.c index 6cfbab619..d027dc2e9 100644 --- a/gst/rtpmanager/rtpjitterbuffer.c +++ b/gst/rtpmanager/rtpjitterbuffer.c @@ -107,6 +107,7 @@ rtp_jitter_buffer_reset_skew (RTPJitterBuffer * jbuf) jbuf->base_extrtp = -1; jbuf->clock_rate = -1; jbuf->ext_rtptime = -1; + jbuf->last_rtptime = -1; jbuf->window_pos = 0; jbuf->window_filling = TRUE; jbuf->window_min = 0; @@ -188,11 +189,15 @@ calculate_skew (RTPJitterBuffer * jbuf, guint32 rtptime, GstClockTime time, gstrtptime = gst_util_uint64_scale_int (ext_rtptime, GST_SECOND, clock_rate); - if (jbuf->clock_rate != -1 && jbuf->clock_rate != clock_rate) { - GST_DEBUG ("Clock rate changed from %" G_GUINT32_FORMAT " to %" + /* keep track of the last extended rtptime */ + jbuf->last_rtptime = ext_rtptime; + + if (jbuf->clock_rate != clock_rate) { + GST_WARNING ("Clock rate changed from %" G_GUINT32_FORMAT " to %" G_GUINT32_FORMAT, jbuf->clock_rate, clock_rate); jbuf->base_time = -1; jbuf->base_rtptime = -1; + jbuf->clock_rate = clock_rate; } /* first time, lock on to time and gstrtptime */ @@ -202,7 +207,6 @@ calculate_skew (RTPJitterBuffer * jbuf, guint32 rtptime, GstClockTime time, } if (G_UNLIKELY (jbuf->base_rtptime == -1)) { jbuf->base_rtptime = gstrtptime; - jbuf->clock_rate = clock_rate; jbuf->base_extrtp = ext_rtptime; GST_DEBUG ("Taking new base rtptime %" GST_TIME_FORMAT, GST_TIME_ARGS (gstrtptime)); @@ -213,10 +217,9 @@ calculate_skew (RTPJitterBuffer * jbuf, guint32 rtptime, GstClockTime time, else { /* elapsed time at sender, timestamps can go backwards and thus be smaller * than our base time, take a new base time in that case. */ - GST_DEBUG ("backward timestamps at server, taking new base time"); + GST_WARNING ("backward timestamps at server, taking new base time"); jbuf->base_time = time; jbuf->base_rtptime = gstrtptime; - jbuf->clock_rate = clock_rate; jbuf->base_extrtp = ext_rtptime; send_diff = 0; } @@ -245,12 +248,11 @@ calculate_skew (RTPJitterBuffer * jbuf, guint32 rtptime, GstClockTime time, * changed too quickly we have to resync because the server likely restarted * its timestamps. */ if (ABS (delta - jbuf->skew) > GST_SECOND) { - GST_DEBUG ("delta %" GST_TIME_FORMAT " too big, reset skew", + GST_WARNING ("delta %" GST_TIME_FORMAT " too big, reset skew", GST_TIME_ARGS (delta - jbuf->skew)); jbuf->base_time = time; jbuf->base_rtptime = gstrtptime; jbuf->base_extrtp = ext_rtptime; - jbuf->clock_rate = clock_rate; send_diff = 0; delta = 0; } @@ -536,13 +538,20 @@ rtp_jitter_buffer_get_ts_diff (RTPJitterBuffer * jbuf) * @rtptime: result RTP time * @timestamp: result GStreamer timestamp * @clock_rate: clock-rate of @rtptime + * @last_rtptime: last seen rtptime. * * Returns the relation between the RTP timestamp and the GStreamer timestamp * used for constructing timestamps. + * + * For extended RTP timestamp @rtptime with a clock-rate of @clock_rate, + * the GStreamer timestamp is currently @timestamp. + * + * The last seen extended RTP timestamp with clock-rate @clock-rate is returned in + * @last_rtptime. */ void rtp_jitter_buffer_get_sync (RTPJitterBuffer * jbuf, guint64 * rtptime, - guint64 * timestamp, guint32 * clock_rate) + guint64 * timestamp, guint32 * clock_rate, guint64 * last_rtptime) { if (rtptime) *rtptime = jbuf->base_extrtp; @@ -550,4 +559,6 @@ rtp_jitter_buffer_get_sync (RTPJitterBuffer * jbuf, guint64 * rtptime, *timestamp = jbuf->base_time + jbuf->skew; if (clock_rate) *clock_rate = jbuf->clock_rate; + if (last_rtptime) + *last_rtptime = jbuf->last_rtptime; } diff --git a/gst/rtpmanager/rtpjitterbuffer.h b/gst/rtpmanager/rtpjitterbuffer.h index 325f8f7ba..aa0091904 100644 --- a/gst/rtpmanager/rtpjitterbuffer.h +++ b/gst/rtpmanager/rtpjitterbuffer.h @@ -59,6 +59,7 @@ struct _RTPJitterBuffer { guint32 clock_rate; GstClockTime base_extrtp; guint64 ext_rtptime; + guint64 last_rtptime; gint64 window[RTP_JITTER_BUFFER_MAX_WINDOW]; guint window_pos; guint window_size; @@ -92,7 +93,8 @@ guint rtp_jitter_buffer_num_packets (RTPJitterBuffer *jbuf) guint32 rtp_jitter_buffer_get_ts_diff (RTPJitterBuffer *jbuf); void rtp_jitter_buffer_get_sync (RTPJitterBuffer *jbuf, guint64 *rtptime, - guint64 *timestamp, guint32 *clock_rate); + guint64 *timestamp, guint32 *clock_rate, + guint64 *last_rtptime); #endif /* __RTP_JITTER_BUFFER_H__ */ -- 2.34.1