ristrtxsend: Store sent packets with extended seqnum
authorOlivier Crête <olivier.crete@collabora.com>
Fri, 26 Jul 2019 20:50:21 +0000 (16:50 -0400)
committerOlivier Crête <olivier.crete@ocrete.ca>
Thu, 30 Apr 2020 18:31:31 +0000 (18:31 +0000)
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/merge_requests/1153>

gst/rist/gstristrtxsend.c

index 1846c38..0a28fb4 100644 (file)
@@ -125,7 +125,7 @@ G_DEFINE_TYPE_WITH_CODE (GstRistRtxSend, gst_rist_rtx_send, GST_TYPE_ELEMENT,
 
 typedef struct
 {
-  guint16 seqnum;
+  guint32 extseqnum;
   guint32 timestamp;
   GstBuffer *buffer;
 } BufferQueueItem;
@@ -145,6 +145,7 @@ typedef struct
 
   /* history of rtp packets */
   GSequence *queue;
+  guint32 max_extseqnum;
 } SSRCRtxData;
 
 static SSRCRtxData *
@@ -155,6 +156,7 @@ ssrc_rtx_data_new (guint32 rtx_ssrc)
   data->rtx_ssrc = rtx_ssrc;
   data->next_seqnum = data->seqnum_base = g_random_int_range (0, G_MAXUINT16);
   data->queue = g_sequence_new ((GDestroyNotify) buffer_queue_item_free);
+  data->max_extseqnum = -1;
 
   return data;
 }
@@ -368,7 +370,7 @@ buffer_queue_items_cmp (BufferQueueItem * a, BufferQueueItem * b,
   /* gst_rtp_buffer_compare_seqnum returns the opposite of what we want,
    * it returns negative when seqnum1 > seqnum2 and we want negative
    * when b > a, i.e. a is smaller, so it comes first in the sequence */
-  return gst_rtp_buffer_compare_seqnum (b->seqnum, a->seqnum);
+  return a->extseqnum - b->extseqnum;
 }
 
 static gboolean
@@ -404,18 +406,25 @@ gst_rist_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
           SSRCRtxData *data;
           GSequenceIter *iter;
           BufferQueueItem search_item;
+          guint32 extseqnum;
+          guint32 max_extseqnum;
 
           /* update statistics */
           ++rtx->num_rtx_requests;
 
           data = gst_rist_rtx_send_get_ssrc_data (rtx, ssrc);
 
-          search_item.seqnum = seqnum;
+
+          max_extseqnum = data->max_extseqnum;
+          extseqnum = gst_rist_rtp_ext_seq (&max_extseqnum, seqnum);
+
+          search_item.extseqnum = extseqnum;
           iter = g_sequence_lookup (data->queue, &search_item,
               (GCompareDataFunc) buffer_queue_items_cmp, NULL);
           if (iter) {
             BufferQueueItem *item = g_sequence_get (iter);
-            GST_LOG_OBJECT (rtx, "found %u", item->seqnum);
+            GST_LOG_OBJECT (rtx, "found %u (%u:%u)", item->extseqnum,
+                item->extseqnum >> 16, item->extseqnum & 0xFFFF);
             rtx_buf = gst_rtp_rist_buffer_new (rtx, item->buffer, ssrc);
           }
 #ifndef GST_DISABLE_DEBUG
@@ -426,10 +435,10 @@ gst_rist_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
             if (!g_sequence_iter_is_end (iter))
               item = g_sequence_get (iter);
 
-            if (item && seqnum < item->seqnum) {
+            if (item && extseqnum < item->extseqnum) {
               GST_DEBUG_OBJECT (rtx, "requested seqnum %u has already been "
                   "removed from the rtx queue; the first available is %u",
-                  seqnum, item->seqnum);
+                  seqnum, item->extseqnum);
             } else {
               GST_WARNING_OBJECT (rtx, "requested seqnum %u has not been "
                   "transmitted yet in the original stream; either the remote end "
@@ -566,12 +575,25 @@ process_buffer (GstRistRtxSend * rtx, GstBuffer * buffer)
   SSRCRtxData *data;
   guint16 seqnum;
   guint32 ssrc, rtptime;
+  guint16 bits;
+  gpointer extdata;
+  guint extlen;
+  gboolean has_seqnum_ext = FALSE;
+  guint32 extseqnum;
 
   /* read the information we want from the buffer */
   gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
   seqnum = gst_rtp_buffer_get_seq (&rtp);
   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
+  if (gst_rtp_buffer_get_extension_data (&rtp, &bits, &extdata, &extlen)) {
+    /* Has header extension */
+    has_seqnum_ext = (bits >> 14) & 1;  /* E */
+    if (extlen != 1)
+      has_seqnum_ext = FALSE;
+    if (has_seqnum_ext)
+      extseqnum = GST_READ_UINT16_BE (extdata) << 16 | seqnum;
+  }
   gst_rtp_buffer_unmap (&rtp);
 
   GST_TRACE_OBJECT (rtx, "Processing buffer seqnum: %u, ssrc: %X", seqnum,
@@ -579,9 +601,14 @@ process_buffer (GstRistRtxSend * rtx, GstBuffer * buffer)
 
   data = gst_rist_rtx_send_get_ssrc_data (rtx, ssrc);
 
+  if (has_seqnum_ext)
+    data->max_extseqnum = MAX (data->max_extseqnum, extseqnum);
+  else
+    extseqnum = gst_rist_rtp_ext_seq (&data->max_extseqnum, seqnum);
+
   /* add current rtp buffer to queue history */
   item = g_slice_new0 (BufferQueueItem);
-  item->seqnum = seqnum;
+  item->extseqnum = extseqnum;
   item->timestamp = rtptime;
   item->buffer = gst_buffer_ref (buffer);
   g_sequence_append (data->queue, item);