rtptwcc: changes to use rtp buffer arrival time and current time.
authorTulio Beloqui <tulio.beloqui@pexip.com>
Tue, 13 Apr 2021 14:19:22 +0000 (16:19 +0200)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Wed, 25 Aug 2021 08:36:06 +0000 (08:36 +0000)
For TWCC we are more interested to track the arrival time (receive side)
and the current time (sender side) of the buffers rather than the
running time.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/-/merge_requests/927>

gst/rtpmanager/rtpsession.c
gst/rtpmanager/rtpstats.h
gst/rtpmanager/rtptwcc.c
tests/check/elements/rtpsession.c

index 9bd537c..84fbfee 100644 (file)
@@ -2172,9 +2172,11 @@ update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
     res =
         gst_buffer_list_foreach (list, (GstBufferListFunc) update_packet,
         pinfo);
+    pinfo->arrival_time = GST_CLOCK_TIME_NONE;
   } else {
     GstBuffer *buffer = GST_BUFFER_CAST (data);
     res = update_packet (&buffer, 0, pinfo);
+    pinfo->arrival_time = GST_BUFFER_DTS (buffer);
   }
 
   return res;
index 1804aa6..4b08e16 100644 (file)
@@ -70,6 +70,7 @@ typedef struct {
  * @address: address of the sender of the packet
  * @current_time: current time according to the system clock
  * @running_time: time of a packet as buffer running_time
+ * @arrival_time: time of arrival of a packet
  * @ntpnstime: time of a packet NTP time in nanoseconds
  * @header_len: number of overhead bytes per packet
  * @bytes: bytes of the packet including lowlevel overhead
@@ -92,6 +93,7 @@ typedef struct {
   GSocketAddress *address;
   GstClockTime  current_time;
   GstClockTime  running_time;
+  GstClockTime  arrival_time;
   guint64       ntpnstime;
   guint         header_len;
   guint         bytes;
index bcb4fc0..691428c 100644 (file)
@@ -159,7 +159,11 @@ recv_packet_init (RecvPacket * packet, guint16 seqnum, RTPPacketInfo * pinfo)
 {
   memset (packet, 0, sizeof (RecvPacket));
   packet->seqnum = seqnum;
-  packet->ts = pinfo->running_time;
+
+  if (GST_CLOCK_TIME_IS_VALID (pinfo->arrival_time))
+    packet->ts = pinfo->arrival_time;
+  else
+    packet->ts = pinfo->current_time;
 }
 
 static guint8
@@ -784,7 +788,7 @@ rtp_twcc_manager_recv_packet (RTPTWCCManager * twcc, RTPPacketInfo * pinfo)
 
   GST_LOG ("Receive: twcc-seqnum: %u, pt: %u, marker: %d, ts: %"
       GST_TIME_FORMAT, seqnum, pinfo->pt, pinfo->marker,
-      GST_TIME_ARGS (pinfo->running_time));
+      GST_TIME_ARGS (pinfo->arrival_time));
 
   if (!pinfo->marker)
     twcc->packet_count_no_marker++;
@@ -841,7 +845,7 @@ static void
 sent_packet_init (SentPacket * packet, guint16 seqnum, RTPPacketInfo * pinfo)
 {
   packet->seqnum = seqnum;
-  packet->ts = pinfo->running_time;
+  packet->ts = pinfo->current_time;
   packet->size = pinfo->payload_len;
   packet->pt = pinfo->pt;
   packet->remote_ts = GST_CLOCK_TIME_NONE;
@@ -864,8 +868,9 @@ rtp_twcc_manager_send_packet (RTPTWCCManager * twcc, RTPPacketInfo * pinfo)
   g_array_append_val (twcc->sent_packets, packet);
 
 
-  GST_LOG ("Send: twcc-seqnum: %u, pt: %u, marker: %d, ts: %" GST_TIME_FORMAT,
-      seqnum, pinfo->pt, pinfo->marker, GST_TIME_ARGS (pinfo->running_time));
+  GST_LOG ("Send: twcc-seqnum: %u, pt: %u, marker: %d, ts: %"
+      GST_TIME_FORMAT, seqnum, pinfo->pt, pinfo->marker,
+      GST_TIME_ARGS (pinfo->current_time));
 }
 
 static void
index b03babc..ccbcd3d 100644 (file)
@@ -55,7 +55,8 @@ generate_caps (void)
 static GstBuffer *
 generate_test_buffer_full (GstClockTime ts,
     guint seqnum, guint32 rtp_ts, guint ssrc,
-    gboolean marker_bit, guint8 twcc_ext_id, guint16 twcc_seqnum)
+    gboolean marker_bit, guint8 payload_type, guint8 twcc_ext_id,
+    guint16 twcc_seqnum)
 {
   GstBuffer *buf;
   guint8 *payload;
@@ -67,7 +68,7 @@ generate_test_buffer_full (GstClockTime ts,
   GST_BUFFER_DTS (buf) = ts;
 
   gst_rtp_buffer_map (buf, GST_MAP_READWRITE, &rtp);
-  gst_rtp_buffer_set_payload_type (&rtp, TEST_BUF_PT);
+  gst_rtp_buffer_set_payload_type (&rtp, payload_type);
   gst_rtp_buffer_set_seq (&rtp, seqnum);
   gst_rtp_buffer_set_timestamp (&rtp, rtp_ts);
   gst_rtp_buffer_set_ssrc (&rtp, ssrc);
@@ -93,7 +94,7 @@ static GstBuffer *
 generate_test_buffer (guint seqnum, guint ssrc)
 {
   return generate_test_buffer_full (seqnum * TEST_BUF_DURATION,
-      seqnum, seqnum * TEST_RTP_TS_DURATION, ssrc, FALSE, 0, 0);
+      seqnum, seqnum * TEST_RTP_TS_DURATION, ssrc, FALSE, TEST_BUF_PT, 0, 0);
 }
 
 static GstBuffer *
@@ -101,18 +102,25 @@ generate_twcc_recv_buffer (guint seqnum,
     GstClockTime arrival_time, gboolean marker_bit)
 {
   return generate_test_buffer_full (arrival_time, seqnum,
-      seqnum * TEST_RTP_TS_DURATION, TEST_BUF_SSRC, marker_bit,
+      seqnum * TEST_RTP_TS_DURATION, TEST_BUF_SSRC, marker_bit, TEST_BUF_PT,
       TEST_TWCC_EXT_ID, seqnum);
 }
 
 static GstBuffer *
-generate_twcc_send_buffer (guint seqnum, gboolean marker_bit)
+generate_twcc_send_buffer_full (guint seqnum, gboolean marker_bit,
+    guint ssrc, guint8 payload_type)
 {
   return generate_test_buffer_full (seqnum * TEST_BUF_DURATION,
-      seqnum, seqnum * TEST_RTP_TS_DURATION, TEST_BUF_SSRC, marker_bit,
-      TEST_TWCC_EXT_ID, seqnum);
+      seqnum, seqnum * TEST_RTP_TS_DURATION, ssrc, marker_bit,
+      payload_type, TEST_TWCC_EXT_ID, seqnum);
 }
 
+static GstBuffer *
+generate_twcc_send_buffer (guint seqnum, gboolean marker_bit)
+{
+  return generate_twcc_send_buffer_full (seqnum, marker_bit, TEST_BUF_SSRC,
+      TEST_BUF_PT);
+}
 
 typedef struct
 {
@@ -2569,7 +2577,8 @@ generate_stepped_ts_buffer (guint i, gboolean stepped)
       GST_TIME_ARGS (gst_util_uint64_scale_int (GST_SECOND, ts,
               TEST_BUF_CLOCK_RATE)), i);
 
-  buf = generate_test_buffer_full (i * GST_MSECOND, i, ts, 0xAAAA, FALSE, 0, 0);
+  buf = generate_test_buffer_full (i * GST_MSECOND, i, ts, 0xAAAA, FALSE,
+      TEST_BUF_PT, 0, 0);
   return buf;
 }
 
@@ -2716,8 +2725,12 @@ GST_START_TEST (test_twcc_header_and_run_length)
   for (i = 0; i < td->num_packets; i++) {
     gboolean last_packet = i == (td->num_packets - 1);
 
-    buf = generate_twcc_recv_buffer (i + td->base_seqnum,
-        td->base_time + i * td->duration, last_packet);
+    GstClockTime now = gst_clock_get_time (GST_CLOCK_CAST (h->testclock));
+    GstClockTime ts = td->base_time + i * td->duration;
+    if (ts > now)
+      gst_test_clock_set_time (h->testclock, ts);
+
+    buf = generate_twcc_recv_buffer (i + td->base_seqnum, ts, last_packet);
     res = session_harness_recv_rtp (h, buf);
     fail_unless_equals_int (GST_FLOW_OK, res);
   }
@@ -2869,6 +2882,32 @@ G_STMT_START {                                                                 \
   twcc_verify_packets_to_event (packets, event);                               \
 } G_STMT_END
 
+#define twcc_verify_stats(h, bitrate_sent, bitrate_recv, pkts_sent, pkts_recv, loss_pct, avg_dod)  \
+G_STMT_START {                                                                                     \
+  GstStructure *twcc_stats;                                                                        \
+  guint stats_bitrate_sent;                                                                        \
+  guint stats_bitrate_recv;                                                                        \
+  guint stats_packets_sent;                                                                        \
+  guint stats_packets_recv;                                                                        \
+  gdouble stats_loss_pct;                                                                          \
+  GstClockTimeDiff stats_avg_dod;                                                                  \
+  twcc_stats = session_harness_get_last_twcc_stats (h);                                            \
+  fail_unless (gst_structure_get (twcc_stats,                                                      \
+          "bitrate-sent", G_TYPE_UINT, &stats_bitrate_sent,                                        \
+          "bitrate-recv", G_TYPE_UINT, &stats_bitrate_recv,                                        \
+          "packets-sent", G_TYPE_UINT, &stats_packets_sent,                                        \
+          "packets-recv", G_TYPE_UINT, &stats_packets_recv,                                        \
+          "packet-loss-pct", G_TYPE_DOUBLE, &stats_loss_pct,                                       \
+          "avg-delta-of-delta", G_TYPE_INT64, &stats_avg_dod, NULL));                              \
+  fail_unless_equals_int (bitrate_sent, stats_bitrate_sent);                                       \
+  fail_unless_equals_int (bitrate_recv, stats_bitrate_recv);                                       \
+  fail_unless_equals_int (pkts_sent, stats_packets_sent);                                          \
+  fail_unless_equals_int (pkts_recv, stats_packets_recv);                                          \
+  fail_unless_equals_float (loss_pct, stats_loss_pct);                                             \
+  fail_unless_equals_int64 (avg_dod, stats_avg_dod);                                               \
+  gst_structure_free (twcc_stats);                                                                 \
+} G_STMT_END
+
 GST_START_TEST (test_twcc_1_bit_status_vector)
 {
   SessionHarness *h0 = session_harness_new ();
@@ -3776,7 +3815,7 @@ GST_START_TEST (test_twcc_recv_late_packet_fb_pkt_count_wrap)
   guint8 exp_fci0[] = {
     0x01, 0x00,                 /* base sequence number: 256 */
     0x00, 0x01,                 /* packet status count: 1 */
-    0x00, 0x00, 0x01,           /* reference time: 0 */
+    0x00, 0x00, 0x01,           /* reference time: 1 */
     0x00,                       /* feedback packet count: 00 */
     /* packet chunks: */
     0x20, 0x01,                 /* 0 0 1 0 0 0 0 0 | 0 0 0 0 0 0 0 1 */
@@ -3787,8 +3826,8 @@ GST_START_TEST (test_twcc_recv_late_packet_fb_pkt_count_wrap)
   guint8 exp_fci1[] = {
     0x01, 0x01,                 /* base sequence number: 257 */
     0x00, 0x01,                 /* packet status count: 1 */
-    0x00, 0x00, 0x01,           /* reference time: 0 */
-    0x01,                       /* feedback packet count: 0 */
+    0x00, 0x00, 0x01,           /* reference time: 1 */
+    0x01,                       /* feedback packet count: 1 */
     /* packet chunks: */
     0x20, 0x01,                 /* 0 0 1 0 0 0 0 0 | 0 0 0 0 0 0 0 1 */
     0x01,                       /* 1 recv-delta */
@@ -3799,15 +3838,15 @@ GST_START_TEST (test_twcc_recv_late_packet_fb_pkt_count_wrap)
 
   /* Push packets to get the feedback packet count wrap limit */
   for (i = 0; i < 255; i++) {
+    GstClockTime ts = i * 250 * GST_USECOND;
+    gst_test_clock_set_time (h->testclock, ts);
     fail_unless_equals_int (GST_FLOW_OK,
         session_harness_recv_rtp ((h),
-            generate_twcc_recv_buffer (i, i * 250 * GST_USECOND, TRUE)));
-
-    /* ignore the twcc for these ones */
-    gst_buffer_unref (session_harness_produce_twcc (h));
+            generate_twcc_recv_buffer (i, ts, TRUE)));
   }
 
   /* push pkt #256 to jump ahead and force the overflow */
+  gst_test_clock_set_time (h->testclock, 256 * 250 * GST_USECOND);
   fail_unless_equals_int (GST_FLOW_OK,
       session_harness_recv_rtp ((h),
           generate_twcc_recv_buffer (256, 256 * 250 * GST_USECOND, TRUE)));
@@ -3817,11 +3856,16 @@ GST_START_TEST (test_twcc_recv_late_packet_fb_pkt_count_wrap)
       session_harness_recv_rtp ((h),
           generate_twcc_recv_buffer (255, 255 * 250 * GST_USECOND, TRUE)));
 
+
   /* push pkt #257 to verify fci is correct */
+  gst_test_clock_set_time (h->testclock, 257 * 250 * GST_USECOND);
   fail_unless_equals_int (GST_FLOW_OK,
       session_harness_recv_rtp ((h),
           generate_twcc_recv_buffer (257, 257 * 250 * GST_USECOND, TRUE)));
 
+  /* ignore the twcc for the first 255 packets  */
+  for (i = 0; i < 255; i++)
+    gst_buffer_unref (session_harness_produce_twcc (h));
 
   /* we expect a fci for pkt #256 */
   buf = session_harness_produce_twcc (h);
@@ -3943,6 +3987,7 @@ GST_START_TEST (test_twcc_send_and_recv)
       buf = generate_twcc_send_buffer (seq, slice == num_slices - 1);
       res = session_harness_send_rtp (h_send, buf);
       fail_unless_equals_int (GST_FLOW_OK, res);
+      session_harness_advance_and_crank (h_send, TEST_BUF_DURATION);
 
       /* get the buffer ready for the network */
       buf = session_harness_pull_send_rtp (h_send);
@@ -3958,30 +4003,9 @@ GST_START_TEST (test_twcc_send_and_recv)
     /* sender receives the TWCC packet */
     session_harness_recv_rtcp (h_send, buf);
 
-    if (frame > 0) {
-      GstStructure *twcc_stats;
-      guint bitrate_sent;
-      guint bitrate_recv;
-      guint packets_sent;
-      guint packets_recv;
-      gdouble packet_loss_pct;
-      GstClockTimeDiff avg_delta_of_delta;
-      twcc_stats = session_harness_get_last_twcc_stats (h_send);
-      fail_unless (gst_structure_get (twcc_stats,
-              "bitrate-sent", G_TYPE_UINT, &bitrate_sent,
-              "bitrate-recv", G_TYPE_UINT, &bitrate_recv,
-              "packets-sent", G_TYPE_UINT, &packets_sent,
-              "packets-recv", G_TYPE_UINT, &packets_recv,
-              "packet-loss-pct", G_TYPE_DOUBLE, &packet_loss_pct,
-              "avg-delta-of-delta", G_TYPE_INT64, &avg_delta_of_delta, NULL));
-      fail_unless_equals_int (TEST_BUF_BPS, bitrate_sent);
-      fail_unless_equals_int (TEST_BUF_BPS, bitrate_recv);
-      fail_unless_equals_int (num_slices, packets_sent);
-      fail_unless_equals_int (num_slices, packets_recv);
-      fail_unless_equals_float (0.0f, packet_loss_pct);
-      fail_unless_equals_int64 (0, avg_delta_of_delta);
-      gst_structure_free (twcc_stats);
-    }
+    if (frame > 0)
+      twcc_verify_stats (h_send, TEST_BUF_BPS, TEST_BUF_BPS, num_slices,
+          num_slices, 0.0f, 0);
   }
 
   session_harness_free (h_send);
@@ -3990,6 +4014,51 @@ GST_START_TEST (test_twcc_send_and_recv)
 
 GST_END_TEST;
 
+GST_START_TEST (test_twcc_multiple_payloads_below_window)
+{
+  SessionHarness *h_send = session_harness_new ();
+  SessionHarness *h_recv = session_harness_new ();
+
+  guint i;
+
+  GstBuffer *buffers[] = {
+    generate_twcc_send_buffer_full (0, FALSE, 0xabc, 98),
+    generate_twcc_send_buffer_full (0, FALSE, 0xdef, 111),
+    generate_twcc_send_buffer_full (1, FALSE, 0xdef, 111),
+    generate_twcc_send_buffer_full (2, FALSE, 0xdef, 111),
+    generate_twcc_send_buffer_full (1, TRUE, 0xabc, 98),
+  };
+
+  /* enable twcc */
+  session_harness_set_twcc_recv_ext_id (h_recv, TEST_TWCC_EXT_ID);
+  session_harness_set_twcc_send_ext_id (h_send, TEST_TWCC_EXT_ID);
+
+  for (i = 0; i < G_N_ELEMENTS (buffers); i++) {
+    GstBuffer *buf = buffers[i];
+    GstFlowReturn res;
+
+    /* from payloder to rtpbin */
+    res = session_harness_send_rtp (h_send, buf);
+    fail_unless_equals_int (GST_FLOW_OK, res);
+
+    buf = session_harness_pull_send_rtp (h_send);
+    session_harness_advance_and_crank (h_send, TEST_BUF_DURATION);
+
+    /* buffer arrives at the receiver */
+    res = session_harness_recv_rtp (h_recv, buf);
+    fail_unless_equals_int (GST_FLOW_OK, res);
+  }
+
+  /* sender receives the TWCC packet from the receiver */
+  session_harness_recv_rtcp (h_send, session_harness_produce_twcc (h_recv));
+  twcc_verify_stats (h_send, 0, 0, 5, 5, 0.0f, GST_CLOCK_STIME_NONE);
+
+  session_harness_free (h_send);
+  session_harness_free (h_recv);
+}
+
+GST_END_TEST;
+
 typedef struct
 {
   GstClockTime interval;
@@ -4281,6 +4350,7 @@ rtpsession_suite (void)
   tcase_add_test (tc_chain, test_twcc_recv_rtcp_reordered);
   tcase_add_test (tc_chain, test_twcc_no_exthdr_in_buffer);
   tcase_add_test (tc_chain, test_twcc_send_and_recv);
+  tcase_add_test (tc_chain, test_twcc_multiple_payloads_below_window);
   tcase_add_loop_test (tc_chain, test_twcc_feedback_interval, 0,
       G_N_ELEMENTS (test_twcc_feedback_interval_ctx));
   tcase_add_test (tc_chain, test_twcc_feedback_count_wrap);