static gboolean rtp_session_send_rtcp (RTPSession * sess,
GstClockTime max_delay);
+static gboolean rtp_session_send_rtcp_with_deadline (RTPSession * sess,
+ GstClockTime deadline);
static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
{
GstRTCPBuffer *rtcp = &data->rtcpbuf;
GstRTCPPacket *packet = &data->packet;
- guint32 *nacks;
- guint n_nacks, i;
+ guint16 *nacks;
+ GstClockTime *nack_deadlines;
+ guint n_nacks, i = 0;
+ guint nacked_seqnums = 0;
+ guint16 n_fb_nacks = 0;
guint8 *fci_data;
if (!source->send_nack)
return;
+ nacks = rtp_source_get_nacks (source, &n_nacks);
+ nack_deadlines = rtp_source_get_nack_deadlines (source, NULL);
+ GST_DEBUG ("%u NACKs current time %" GST_TIME_FORMAT, n_nacks,
+ GST_TIME_ARGS (data->current_time));
+
+ /* cleanup expired nacks */
+ for (i = 0; i < n_nacks; i++) {
+ GST_DEBUG ("#%u deadline %" GST_TIME_FORMAT, nacks[i],
+ GST_TIME_ARGS (nack_deadlines[i]));
+ if (nack_deadlines[i] >= data->current_time)
+ break;
+ }
+ if (i) {
+ GST_WARNING ("Removing %u expired NACKS", i);
+ rtp_source_clear_nacks (source, i);
+ n_nacks -= i;
+ if (n_nacks == 0)
+ return;
+ }
+
if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RTPFB, packet))
/* exit because the packet is full, will put next request in a
* further packet */
gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
- nacks = rtp_source_get_nacks (source, &n_nacks);
- GST_DEBUG ("%u NACKs", n_nacks);
- if (!gst_rtcp_packet_fb_set_fci_length (packet, n_nacks))
+ if (!gst_rtcp_packet_fb_set_fci_length (packet, 1)) {
+ gst_rtcp_packet_remove (packet);
+ GST_WARNING ("no nacks fit in the packet");
return;
+ }
fci_data = gst_rtcp_packet_fb_get_fci (packet);
- for (i = 0; i < n_nacks; i++) {
- GST_WRITE_UINT32_BE (fci_data, nacks[i]);
+ for (i = 0; i < n_nacks; i = nacked_seqnums) {
+ guint16 seqnum = nacks[i];
+ guint16 blp = 0;
+ guint j;
+
+ if (!gst_rtcp_packet_fb_set_fci_length (packet, n_fb_nacks + 1))
+ break;
+
+ n_fb_nacks++;
+ nacked_seqnums++;
+
+ for (j = i + 1; j < n_nacks; j++) {
+ gint diff;
+
+ diff = gst_rtp_buffer_compare_seqnum (seqnum, nacks[j]);
+ GST_TRACE ("[%u][%u] %u %u diff %i", i, j, seqnum, nacks[j], diff);
+ if (diff > 16)
+ break;
+
+ blp |= 1 << (diff - 1);
+ nacked_seqnums++;
+ }
+
+ GST_WRITE_UINT32_BE (fci_data, seqnum << 16 | blp);
fci_data += 4;
- data->nacked_seqnums++;
}
- rtp_source_clear_nacks (source);
+ data->nacked_seqnums += nacked_seqnums;
+ rtp_source_clear_nacks (source, nacked_seqnums);
data->may_suppress = FALSE;
- source->stats.sent_nack_count += n_nacks;
+ source->stats.sent_nack_count += n_fb_nacks;
+
+ GST_DEBUG ("Sent %u seqnums into %u FB NACKs", nacked_seqnums, n_fb_nacks);
}
/* perform cleanup of sources that timed out */
}
}
+static void
+schedule_remaining_nacks (const gchar * key, RTPSource * source,
+ ReportData * data)
+{
+ RTPSession *sess = data->sess;
+ GstClockTime *nack_deadlines;
+ GstClockTime deadline;
+ guint n_nacks;
+
+ if (!source->send_nack)
+ return;
+
+ /* the scheduling is entirely based on available bandwidth, just take the
+ * biggest seqnum, which will have the largest deadline to request early
+ * RTCP. */
+ nack_deadlines = rtp_source_get_nack_deadlines (source, &n_nacks);
+ deadline = nack_deadlines[n_nacks - 1];
+ RTP_SESSION_UNLOCK (sess);
+ rtp_session_send_rtcp_with_deadline (sess, deadline);
+ RTP_SESSION_LOCK (sess);
+}
+
static gboolean
rtp_session_are_all_sources_bye (RTPSession * sess)
{
if (all_empty)
GST_ERROR ("generated empty RTCP messages for all the sources");
+ /* schedule remaining nacks */
+ RTP_SESSION_LOCK (sess);
+ g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+ (GHFunc) schedule_remaining_nacks, &data);
+ RTP_SESSION_UNLOCK (sess);
+
return result;
}
}
static gboolean
+rtp_session_send_rtcp_internal (RTPSession * sess, GstClockTime now,
+ GstClockTime max_delay)
+{
+ /* notify the application that we intend to send early RTCP */
+ if (sess->callbacks.notify_early_rtcp)
+ sess->callbacks.notify_early_rtcp (sess, sess->notify_early_rtcp_user_data);
+
+ return rtp_session_request_early_rtcp (sess, now, max_delay);
+}
+
+static gboolean
+rtp_session_send_rtcp_with_deadline (RTPSession * sess, GstClockTime deadline)
+{
+ GstClockTime now, max_delay;
+
+ if (!sess->callbacks.send_rtcp)
+ return FALSE;
+
+ now = sess->callbacks.request_time (sess, sess->request_time_user_data);
+
+ if (deadline < now)
+ return FALSE;
+
+ max_delay = deadline - now;
+
+ return rtp_session_send_rtcp_internal (sess, now, max_delay);
+}
+
+static gboolean
rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay)
{
GstClockTime now;
now = sess->callbacks.request_time (sess, sess->request_time_user_data);
- /* notify the application that we intend to send early RTCP */
- if (sess->callbacks.notify_early_rtcp)
- sess->callbacks.notify_early_rtcp (sess, sess->notify_early_rtcp_user_data);
-
- return rtp_session_request_early_rtcp (sess, now, max_delay);
+ return rtp_session_send_rtcp_internal (sess, now, max_delay);
}
gboolean
GstClockTime max_delay)
{
RTPSource *source;
+ GstClockTime now;
+
+ if (!sess->callbacks.send_rtcp)
+ return FALSE;
+
+ now = sess->callbacks.request_time (sess, sess->request_time_user_data);
RTP_SESSION_LOCK (sess);
source = find_source (sess, ssrc);
if (source == NULL)
goto no_source;
- GST_DEBUG ("request NACK for %08x, #%u", ssrc, seqnum);
- rtp_source_register_nack (source, seqnum);
+ GST_DEBUG ("request NACK for SSRC %08x, #%u, deadline %" GST_TIME_FORMAT,
+ ssrc, seqnum, GST_TIME_ARGS (now + max_delay));
+ rtp_source_register_nack (source, seqnum, now + max_delay);
RTP_SESSION_UNLOCK (sess);
- if (!rtp_session_send_rtcp (sess, max_delay)) {
+ if (!rtp_session_send_rtcp_internal (sess, now, max_delay)) {
GST_DEBUG ("NACK not sent early, sending with next regular RTCP");
}
src->seqnum_offset = -1;
src->retained_feedback = g_queue_new ();
- src->nacks = g_array_new (FALSE, FALSE, sizeof (guint32));
+ src->nacks = g_array_new (FALSE, FALSE, sizeof (guint16));
+ src->nack_deadlines = g_array_new (FALSE, FALSE, sizeof (GstClockTime));
src->reported_in_sr_of = g_hash_table_new (g_direct_hash, g_direct_equal);
g_queue_free (src->retained_feedback);
g_array_free (src->nacks, TRUE);
+ g_array_free (src->nack_deadlines, TRUE);
if (src->rtp_from)
g_object_unref (src->rtp_from);
* rtp_source_register_nack:
* @src: The #RTPSource
* @seqnum: a seqnum
+ * @deadline: the deadline before which RTX is still possible
*
* Register that @seqnum has not been received from @src.
*/
void
-rtp_source_register_nack (RTPSource * src, guint16 seqnum)
+rtp_source_register_nack (RTPSource * src, guint16 seqnum,
+ GstClockTime deadline)
{
- guint i, len;
- guint32 dword = seqnum << 16;
- gint diff = 16;
+ gint i;
+ guint len;
+ gint diff = -1;
+ guint16 tseq;
len = src->nacks->len;
- for (i = 0; i < len; i++) {
- guint32 tdword;
- guint16 tseq;
+ for (i = len - 1; i >= 0; i--) {
+ tseq = g_array_index (src->nacks, guint16, i);
+ diff = gst_rtp_buffer_compare_seqnum (tseq, seqnum);
- tdword = g_array_index (src->nacks, guint32, i);
- tseq = tdword >> 16;
+ GST_TRACE ("[%u] %u %u diff %i len %u", i, tseq, seqnum, diff, len);
- diff = gst_rtp_buffer_compare_seqnum (tseq, seqnum);
- if (diff < 16)
+ if (diff >= 0)
break;
}
- /* we already have this seqnum */
- if (diff == 0)
- return;
- /* it comes before the recorded seqnum, FIXME, we could merge it
- * if not to far away */
- if (diff < 0) {
- GST_DEBUG ("insert NACK #%u at %u", seqnum, i);
- g_array_insert_val (src->nacks, i, dword);
- } else if (diff < 16) {
- /* we can merge it */
- dword = g_array_index (src->nacks, guint32, i);
- dword |= 1 << (diff - 1);
- GST_DEBUG ("merge NACK #%u at %u with NACK #%u -> 0x%08x", seqnum, i,
- dword >> 16, dword);
- g_array_index (src->nacks, guint32, i) = dword;
+
+ if (diff == 0) {
+ GST_DEBUG ("update NACK #%u deadline to %" GST_TIME_FORMAT, seqnum,
+ GST_TIME_ARGS (deadline));
+ g_array_index (src->nack_deadlines, GstClockTime, i) = deadline;
+ } else if (i == len - 1) {
+ GST_DEBUG ("append NACK #%u with deadline %" GST_TIME_FORMAT, seqnum,
+ GST_TIME_ARGS (deadline));
+ g_array_append_val (src->nacks, seqnum);
+ g_array_append_val (src->nack_deadlines, deadline);
} else {
- GST_DEBUG ("append NACK #%u", seqnum);
- g_array_append_val (src->nacks, dword);
+ GST_DEBUG ("insert NACK #%u with deadline %" GST_TIME_FORMAT, seqnum,
+ GST_TIME_ARGS (deadline));
+ g_array_insert_val (src->nacks, i + 1, seqnum);
+ g_array_insert_val (src->nack_deadlines, i + 1, deadline);
}
+
src->send_nack = TRUE;
}
*
* Returns: an array of @n_nacks seqnum values.
*/
-guint32 *
+guint16 *
rtp_source_get_nacks (RTPSource * src, guint * n_nacks)
{
if (n_nacks)
*n_nacks = src->nacks->len;
- return (guint32 *) src->nacks->data;
+ return (guint16 *) src->nacks->data;
}
+/**
+ * rtp_source_get_nacks:
+ * @src: The #RTPSource
+ * @n_nacks: result number of nacks
+ *
+ * Get the registered NACKS deadlines.
+ *
+ * Returns: an array of @n_nacks deadline values.
+ */
+GstClockTime *
+rtp_source_get_nack_deadlines (RTPSource * src, guint * n_nacks)
+{
+ if (n_nacks)
+ *n_nacks = src->nack_deadlines->len;
+
+ return (GstClockTime *) src->nack_deadlines->data;
+}
+
+/**
+ * rtp_source_clear_nacks:
+ * @src: The #RTPSource
+ * @n_nacks: number of nacks
+ *
+ * Remove @n_nacks oldest NACKS form array.
+ */
void
-rtp_source_clear_nacks (RTPSource * src)
+rtp_source_clear_nacks (RTPSource * src, guint n_nacks)
{
- g_array_set_size (src->nacks, 0);
- src->send_nack = FALSE;
+ g_return_if_fail (n_nacks <= src->nacks->len);
+
+ if (src->nacks->len == n_nacks) {
+ g_array_set_size (src->nacks, 0);
+ g_array_set_size (src->nack_deadlines, 0);
+ src->send_nack = FALSE;
+ } else {
+ g_array_remove_range (src->nacks, 0, n_nacks);
+ g_array_remove_range (src->nack_deadlines, 0, n_nacks);
+ }
}