From 6bb53e75fb36637131bec823d950ca690e666b9e Mon Sep 17 00:00:00 2001 From: Nicolas Dufresne Date: Mon, 25 Mar 2019 13:42:25 -0400 Subject: [PATCH] rtpsession: Send as many nack seqnum as possible In order to do that, we now split the nacks registration from the actual FB nack packet construction. We then try and add as many FB Nacks as possible into the active packets and leave the remaining seqnums in the RTPSource. In order to avoid sending outdated NACK later on, we save the seqnum calculated deadline and cleanup the outdated seqnums before the next RTCP send. Fixes #583 --- gst/rtpmanager/rtpsession.c | 146 ++++++++++++++++++++++++++++++++++++++------ gst/rtpmanager/rtpsource.c | 102 ++++++++++++++++++++----------- gst/rtpmanager/rtpsource.h | 9 ++- 3 files changed, 202 insertions(+), 55 deletions(-) diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c index 051140f..d43938b 100644 --- a/gst/rtpmanager/rtpsession.c +++ b/gst/rtpmanager/rtpsession.c @@ -121,6 +121,8 @@ static void rtp_session_get_property (GObject * object, guint prop_id, static gboolean rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay); +static gboolean rtp_session_send_rtcp_with_deadline (RTPSession * sess, + GstClockTime deadline); static guint rtp_session_signals[LAST_SIGNAL] = { 0 }; @@ -3581,13 +3583,36 @@ session_nack (const gchar * key, RTPSource * source, ReportData * data) { GstRTCPBuffer *rtcp = &data->rtcpbuf; GstRTCPPacket *packet = &data->packet; - guint32 *nacks; - guint n_nacks, i; + guint16 *nacks; + GstClockTime *nack_deadlines; + guint n_nacks, i = 0; + guint nacked_seqnums = 0; + guint16 n_fb_nacks = 0; guint8 *fci_data; if (!source->send_nack) return; + nacks = rtp_source_get_nacks (source, &n_nacks); + nack_deadlines = rtp_source_get_nack_deadlines (source, NULL); + GST_DEBUG ("%u NACKs current time %" GST_TIME_FORMAT, n_nacks, + GST_TIME_ARGS (data->current_time)); + + /* cleanup expired nacks */ + for (i = 0; i < n_nacks; i++) { + GST_DEBUG ("#%u deadline %" GST_TIME_FORMAT, nacks[i], + GST_TIME_ARGS (nack_deadlines[i])); + if (nack_deadlines[i] >= data->current_time) + break; + } + if (i) { + GST_WARNING ("Removing %u expired NACKS", i); + rtp_source_clear_nacks (source, i); + n_nacks -= i; + if (n_nacks == 0) + return; + } + if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RTPFB, packet)) /* exit because the packet is full, will put next request in a * further packet */ @@ -3597,21 +3622,46 @@ session_nack (const gchar * key, RTPSource * source, ReportData * data) gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc); gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc); - nacks = rtp_source_get_nacks (source, &n_nacks); - GST_DEBUG ("%u NACKs", n_nacks); - if (!gst_rtcp_packet_fb_set_fci_length (packet, n_nacks)) + if (!gst_rtcp_packet_fb_set_fci_length (packet, 1)) { + gst_rtcp_packet_remove (packet); + GST_WARNING ("no nacks fit in the packet"); return; + } fci_data = gst_rtcp_packet_fb_get_fci (packet); - for (i = 0; i < n_nacks; i++) { - GST_WRITE_UINT32_BE (fci_data, nacks[i]); + for (i = 0; i < n_nacks; i = nacked_seqnums) { + guint16 seqnum = nacks[i]; + guint16 blp = 0; + guint j; + + if (!gst_rtcp_packet_fb_set_fci_length (packet, n_fb_nacks + 1)) + break; + + n_fb_nacks++; + nacked_seqnums++; + + for (j = i + 1; j < n_nacks; j++) { + gint diff; + + diff = gst_rtp_buffer_compare_seqnum (seqnum, nacks[j]); + GST_TRACE ("[%u][%u] %u %u diff %i", i, j, seqnum, nacks[j], diff); + if (diff > 16) + break; + + blp |= 1 << (diff - 1); + nacked_seqnums++; + } + + GST_WRITE_UINT32_BE (fci_data, seqnum << 16 | blp); fci_data += 4; - data->nacked_seqnums++; } - rtp_source_clear_nacks (source); + data->nacked_seqnums += nacked_seqnums; + rtp_source_clear_nacks (source, nacked_seqnums); data->may_suppress = FALSE; - source->stats.sent_nack_count += n_nacks; + source->stats.sent_nack_count += n_fb_nacks; + + GST_DEBUG ("Sent %u seqnums into %u FB NACKs", nacked_seqnums, n_fb_nacks); } /* perform cleanup of sources that timed out */ @@ -4037,6 +4087,28 @@ update_generation (const gchar * key, RTPSource * source, ReportData * data) } } +static void +schedule_remaining_nacks (const gchar * key, RTPSource * source, + ReportData * data) +{ + RTPSession *sess = data->sess; + GstClockTime *nack_deadlines; + GstClockTime deadline; + guint n_nacks; + + if (!source->send_nack) + return; + + /* the scheduling is entirely based on available bandwidth, just take the + * biggest seqnum, which will have the largest deadline to request early + * RTCP. */ + nack_deadlines = rtp_source_get_nack_deadlines (source, &n_nacks); + deadline = nack_deadlines[n_nacks - 1]; + RTP_SESSION_UNLOCK (sess); + rtp_session_send_rtcp_with_deadline (sess, deadline); + RTP_SESSION_LOCK (sess); +} + static gboolean rtp_session_are_all_sources_bye (RTPSession * sess) { @@ -4238,6 +4310,12 @@ done: if (all_empty) GST_ERROR ("generated empty RTCP messages for all the sources"); + /* schedule remaining nacks */ + RTP_SESSION_LOCK (sess); + g_hash_table_foreach (sess->ssrcs[sess->mask_idx], + (GHFunc) schedule_remaining_nacks, &data); + RTP_SESSION_UNLOCK (sess); + return result; } @@ -4410,6 +4488,35 @@ end: } static gboolean +rtp_session_send_rtcp_internal (RTPSession * sess, GstClockTime now, + GstClockTime max_delay) +{ + /* notify the application that we intend to send early RTCP */ + if (sess->callbacks.notify_early_rtcp) + sess->callbacks.notify_early_rtcp (sess, sess->notify_early_rtcp_user_data); + + return rtp_session_request_early_rtcp (sess, now, max_delay); +} + +static gboolean +rtp_session_send_rtcp_with_deadline (RTPSession * sess, GstClockTime deadline) +{ + GstClockTime now, max_delay; + + if (!sess->callbacks.send_rtcp) + return FALSE; + + now = sess->callbacks.request_time (sess, sess->request_time_user_data); + + if (deadline < now) + return FALSE; + + max_delay = deadline - now; + + return rtp_session_send_rtcp_internal (sess, now, max_delay); +} + +static gboolean rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay) { GstClockTime now; @@ -4419,11 +4526,7 @@ rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay) now = sess->callbacks.request_time (sess, sess->request_time_user_data); - /* notify the application that we intend to send early RTCP */ - if (sess->callbacks.notify_early_rtcp) - sess->callbacks.notify_early_rtcp (sess, sess->notify_early_rtcp_user_data); - - return rtp_session_request_early_rtcp (sess, now, max_delay); + return rtp_session_send_rtcp_internal (sess, now, max_delay); } gboolean @@ -4479,17 +4582,24 @@ rtp_session_request_nack (RTPSession * sess, guint32 ssrc, guint16 seqnum, GstClockTime max_delay) { RTPSource *source; + GstClockTime now; + + if (!sess->callbacks.send_rtcp) + return FALSE; + + now = sess->callbacks.request_time (sess, sess->request_time_user_data); RTP_SESSION_LOCK (sess); source = find_source (sess, ssrc); if (source == NULL) goto no_source; - GST_DEBUG ("request NACK for %08x, #%u", ssrc, seqnum); - rtp_source_register_nack (source, seqnum); + GST_DEBUG ("request NACK for SSRC %08x, #%u, deadline %" GST_TIME_FORMAT, + ssrc, seqnum, GST_TIME_ARGS (now + max_delay)); + rtp_source_register_nack (source, seqnum, now + max_delay); RTP_SESSION_UNLOCK (sess); - if (!rtp_session_send_rtcp (sess, max_delay)) { + if (!rtp_session_send_rtcp_internal (sess, now, max_delay)) { GST_DEBUG ("NACK not sent early, sending with next regular RTCP"); } diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c index 52fe81d..4d90795 100644 --- a/gst/rtpmanager/rtpsource.c +++ b/gst/rtpmanager/rtpsource.c @@ -311,7 +311,8 @@ rtp_source_init (RTPSource * src) src->seqnum_offset = -1; src->retained_feedback = g_queue_new (); - src->nacks = g_array_new (FALSE, FALSE, sizeof (guint32)); + src->nacks = g_array_new (FALSE, FALSE, sizeof (guint16)); + src->nack_deadlines = g_array_new (FALSE, FALSE, sizeof (GstClockTime)); src->reported_in_sr_of = g_hash_table_new (g_direct_hash, g_direct_equal); @@ -351,6 +352,7 @@ rtp_source_finalize (GObject * object) g_queue_free (src->retained_feedback); g_array_free (src->nacks, TRUE); + g_array_free (src->nack_deadlines, TRUE); if (src->rtp_from) g_object_unref (src->rtp_from); @@ -1933,47 +1935,46 @@ rtp_source_has_retained (RTPSource * src, GCompareFunc func, gconstpointer data) * rtp_source_register_nack: * @src: The #RTPSource * @seqnum: a seqnum + * @deadline: the deadline before which RTX is still possible * * Register that @seqnum has not been received from @src. */ void -rtp_source_register_nack (RTPSource * src, guint16 seqnum) +rtp_source_register_nack (RTPSource * src, guint16 seqnum, + GstClockTime deadline) { - guint i, len; - guint32 dword = seqnum << 16; - gint diff = 16; + gint i; + guint len; + gint diff = -1; + guint16 tseq; len = src->nacks->len; - for (i = 0; i < len; i++) { - guint32 tdword; - guint16 tseq; + for (i = len - 1; i >= 0; i--) { + tseq = g_array_index (src->nacks, guint16, i); + diff = gst_rtp_buffer_compare_seqnum (tseq, seqnum); - tdword = g_array_index (src->nacks, guint32, i); - tseq = tdword >> 16; + GST_TRACE ("[%u] %u %u diff %i len %u", i, tseq, seqnum, diff, len); - diff = gst_rtp_buffer_compare_seqnum (tseq, seqnum); - if (diff < 16) + if (diff >= 0) break; } - /* we already have this seqnum */ - if (diff == 0) - return; - /* it comes before the recorded seqnum, FIXME, we could merge it - * if not to far away */ - if (diff < 0) { - GST_DEBUG ("insert NACK #%u at %u", seqnum, i); - g_array_insert_val (src->nacks, i, dword); - } else if (diff < 16) { - /* we can merge it */ - dword = g_array_index (src->nacks, guint32, i); - dword |= 1 << (diff - 1); - GST_DEBUG ("merge NACK #%u at %u with NACK #%u -> 0x%08x", seqnum, i, - dword >> 16, dword); - g_array_index (src->nacks, guint32, i) = dword; + + if (diff == 0) { + GST_DEBUG ("update NACK #%u deadline to %" GST_TIME_FORMAT, seqnum, + GST_TIME_ARGS (deadline)); + g_array_index (src->nack_deadlines, GstClockTime, i) = deadline; + } else if (i == len - 1) { + GST_DEBUG ("append NACK #%u with deadline %" GST_TIME_FORMAT, seqnum, + GST_TIME_ARGS (deadline)); + g_array_append_val (src->nacks, seqnum); + g_array_append_val (src->nack_deadlines, deadline); } else { - GST_DEBUG ("append NACK #%u", seqnum); - g_array_append_val (src->nacks, dword); + GST_DEBUG ("insert NACK #%u with deadline %" GST_TIME_FORMAT, seqnum, + GST_TIME_ARGS (deadline)); + g_array_insert_val (src->nacks, i + 1, seqnum); + g_array_insert_val (src->nack_deadlines, i + 1, deadline); } + src->send_nack = TRUE; } @@ -1986,18 +1987,51 @@ rtp_source_register_nack (RTPSource * src, guint16 seqnum) * * Returns: an array of @n_nacks seqnum values. */ -guint32 * +guint16 * rtp_source_get_nacks (RTPSource * src, guint * n_nacks) { if (n_nacks) *n_nacks = src->nacks->len; - return (guint32 *) src->nacks->data; + return (guint16 *) src->nacks->data; } +/** + * rtp_source_get_nacks: + * @src: The #RTPSource + * @n_nacks: result number of nacks + * + * Get the registered NACKS deadlines. + * + * Returns: an array of @n_nacks deadline values. + */ +GstClockTime * +rtp_source_get_nack_deadlines (RTPSource * src, guint * n_nacks) +{ + if (n_nacks) + *n_nacks = src->nack_deadlines->len; + + return (GstClockTime *) src->nack_deadlines->data; +} + +/** + * rtp_source_clear_nacks: + * @src: The #RTPSource + * @n_nacks: number of nacks + * + * Remove @n_nacks oldest NACKS form array. + */ void -rtp_source_clear_nacks (RTPSource * src) +rtp_source_clear_nacks (RTPSource * src, guint n_nacks) { - g_array_set_size (src->nacks, 0); - src->send_nack = FALSE; + g_return_if_fail (n_nacks <= src->nacks->len); + + if (src->nacks->len == n_nacks) { + g_array_set_size (src->nacks, 0); + g_array_set_size (src->nack_deadlines, 0); + src->send_nack = FALSE; + } else { + g_array_remove_range (src->nacks, 0, n_nacks); + g_array_remove_range (src->nack_deadlines, 0, n_nacks); + } } diff --git a/gst/rtpmanager/rtpsource.h b/gst/rtpmanager/rtpsource.h index e292074..dff7b31 100644 --- a/gst/rtpmanager/rtpsource.h +++ b/gst/rtpmanager/rtpsource.h @@ -198,6 +198,7 @@ struct _RTPSource { gboolean send_nack; GArray *nacks; + GArray *nack_deadlines; gboolean pt_set; guint8 pt; @@ -301,8 +302,10 @@ gboolean rtp_source_has_retained (RTPSource * src, gconstpointer data); void rtp_source_register_nack (RTPSource * src, - guint16 seqnum); -guint32 * rtp_source_get_nacks (RTPSource * src, guint *n_nacks); -void rtp_source_clear_nacks (RTPSource * src); + guint16 seqnum, + GstClockTime deadline); +guint16 * rtp_source_get_nacks (RTPSource * src, guint *n_nacks); +GstClockTime * rtp_source_get_nack_deadlines (RTPSource * src, guint *n_nacks); +void rtp_source_clear_nacks (RTPSource * src, guint n_nacks); #endif /* __RTP_SOURCE_H__ */ -- 2.7.4