src->seqnum_offset = -1;
src->retained_feedback = g_queue_new ();
- src->nacks = g_array_new (FALSE, FALSE, sizeof (guint32));
+ src->nacks = g_array_new (FALSE, FALSE, sizeof (guint16));
+ src->nack_deadlines = g_array_new (FALSE, FALSE, sizeof (GstClockTime));
src->reported_in_sr_of = g_hash_table_new (g_direct_hash, g_direct_equal);
g_queue_free (src->retained_feedback);
g_array_free (src->nacks, TRUE);
+ g_array_free (src->nack_deadlines, TRUE);
if (src->rtp_from)
g_object_unref (src->rtp_from);
goto done;
}
} else {
- /* unexpected seqnum in probation */
+ /* unexpected seqnum in probation
+ *
+ * There is no need to clean the queue at this point because the
+ * invalid packets in the queue are not going to be pushed as we are
+ * still in probation, and some cleanup will be performed at future
+ * probation attempts anyway if there are too many old packets in the
+ * queue.
+ */
goto probation_seqnum;
}
} else if (delta >= 0 && delta < max_dropout) {
src->stats.octets_received += pinfo->payload_len;
src->stats.bytes_received += pinfo->bytes;
- src->stats.packets_received++;
+ src->stats.packets_received += pinfo->packets;
/* for the bitrate estimation */
src->bytes_received += pinfo->payload_len;
}
probation_seqnum:
{
- GST_WARNING ("probation: seqnr %d != expected %d", seqnr, expected);
+ GST_WARNING ("probation: seqnr %d != expected %d "
+ "(SSRC %u curr_probation %i probation %i)", seqnr, expected, src->ssrc,
+ src->curr_probation, src->probation);
src->curr_probation = src->probation;
src->stats.max_seq = seqnr;
return FALSE;
* @src: an #RTPSource
* @pinfo: an #RTPPacketInfo
*
- * Let @src handle the incomming RTP packet described in @pinfo.
+ * Let @src handle the incoming RTP packet described in @pinfo.
*
* Returns: a #GstFlowReturn.
*/
* rtp_source_register_nack:
* @src: The #RTPSource
* @seqnum: a seqnum
+ * @deadline: the deadline before which RTX is still possible
*
* Register that @seqnum has not been received from @src.
*/
void
-rtp_source_register_nack (RTPSource * src, guint16 seqnum)
+rtp_source_register_nack (RTPSource * src, guint16 seqnum,
+ GstClockTime deadline)
{
- guint i, len;
- guint32 dword = seqnum << 16;
- gint diff = 16;
+ gint i;
+ guint len;
+ gint diff = -1;
+ guint16 tseq;
len = src->nacks->len;
- for (i = 0; i < len; i++) {
- guint32 tdword;
- guint16 tseq;
+ for (i = len - 1; i >= 0; i--) {
+ tseq = g_array_index (src->nacks, guint16, i);
+ diff = gst_rtp_buffer_compare_seqnum (tseq, seqnum);
- tdword = g_array_index (src->nacks, guint32, i);
- tseq = tdword >> 16;
+ GST_TRACE ("[%u] %u %u diff %i len %u", i, tseq, seqnum, diff, len);
- diff = gst_rtp_buffer_compare_seqnum (tseq, seqnum);
- if (diff < 16)
+ if (diff >= 0)
break;
}
- /* we already have this seqnum */
- if (diff == 0)
- return;
- /* it comes before the recorded seqnum, FIXME, we could merge it
- * if not to far away */
- if (diff < 0) {
- GST_DEBUG ("insert NACK #%u at %u", seqnum, i);
- g_array_insert_val (src->nacks, i, dword);
- } else if (diff < 16) {
- /* we can merge it */
- dword = g_array_index (src->nacks, guint32, i);
- dword |= 1 << (diff - 1);
- GST_DEBUG ("merge NACK #%u at %u with NACK #%u -> 0x%08x", seqnum, i,
- dword >> 16, dword);
- g_array_index (src->nacks, guint32, i) = dword;
+
+ if (diff == 0) {
+ GST_DEBUG ("update NACK #%u deadline to %" GST_TIME_FORMAT, seqnum,
+ GST_TIME_ARGS (deadline));
+ g_array_index (src->nack_deadlines, GstClockTime, i) = deadline;
+ } else if (i == len - 1) {
+ GST_DEBUG ("append NACK #%u with deadline %" GST_TIME_FORMAT, seqnum,
+ GST_TIME_ARGS (deadline));
+ g_array_append_val (src->nacks, seqnum);
+ g_array_append_val (src->nack_deadlines, deadline);
} else {
- GST_DEBUG ("append NACK #%u", seqnum);
- g_array_append_val (src->nacks, dword);
+ GST_DEBUG ("insert NACK #%u with deadline %" GST_TIME_FORMAT, seqnum,
+ GST_TIME_ARGS (deadline));
+ g_array_insert_val (src->nacks, i + 1, seqnum);
+ g_array_insert_val (src->nack_deadlines, i + 1, deadline);
}
+
src->send_nack = TRUE;
}
*
* Returns: an array of @n_nacks seqnum values.
*/
-guint32 *
+guint16 *
rtp_source_get_nacks (RTPSource * src, guint * n_nacks)
{
if (n_nacks)
*n_nacks = src->nacks->len;
- return (guint32 *) src->nacks->data;
+ return (guint16 *) src->nacks->data;
}
+/**
+ * rtp_source_get_nacks:
+ * @src: The #RTPSource
+ * @n_nacks: result number of nacks
+ *
+ * Get the registered NACKS deadlines.
+ *
+ * Returns: an array of @n_nacks deadline values.
+ */
+GstClockTime *
+rtp_source_get_nack_deadlines (RTPSource * src, guint * n_nacks)
+{
+ if (n_nacks)
+ *n_nacks = src->nack_deadlines->len;
+
+ return (GstClockTime *) src->nack_deadlines->data;
+}
+
+/**
+ * rtp_source_clear_nacks:
+ * @src: The #RTPSource
+ * @n_nacks: number of nacks
+ *
+ * Remove @n_nacks oldest NACKS form array.
+ */
void
-rtp_source_clear_nacks (RTPSource * src)
+rtp_source_clear_nacks (RTPSource * src, guint n_nacks)
{
- g_array_set_size (src->nacks, 0);
- src->send_nack = FALSE;
+ g_return_if_fail (n_nacks <= src->nacks->len);
+
+ if (src->nacks->len == n_nacks) {
+ g_array_set_size (src->nacks, 0);
+ g_array_set_size (src->nack_deadlines, 0);
+ src->send_nack = FALSE;
+ } else {
+ g_array_remove_range (src->nacks, 0, n_nacks);
+ g_array_remove_range (src->nack_deadlines, 0, n_nacks);
+ }
}