rtpmanager: Google Transport-Wide Congestion Control RTP Extension
authorHavard Graff <havard.graff@gmail.com>
Sat, 29 Jun 2019 16:06:11 +0000 (18:06 +0200)
committerGStreamer Merge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Fri, 14 Feb 2020 10:09:02 +0000 (10:09 +0000)
Generating and parsing the RTCP-messages described in:
https://tools.ietf.org/html/draft-holmer-rmcat-transport-wide-cc-extensions-01

gst/rtpmanager/gstrtpsession.c
gst/rtpmanager/meson.build
gst/rtpmanager/rtpsession.c
gst/rtpmanager/rtpsession.h
gst/rtpmanager/rtpsource.c
gst/rtpmanager/rtpstats.c
gst/rtpmanager/rtpstats.h
gst/rtpmanager/rtptwcc.c [new file with mode: 0644]
gst/rtpmanager/rtptwcc.h [new file with mode: 0644]
tests/check/elements/rtpsession.c

index ce5694d..8ce9275 100644 (file)
@@ -239,6 +239,7 @@ enum
   PROP_MAX_DROPOUT_TIME,
   PROP_MAX_MISORDER_TIME,
   PROP_STATS,
+  PROP_TWCC_STATS,
   PROP_RTP_PROFILE,
   PROP_NTP_TIME_SOURCE,
   PROP_RTCP_SYNC_SEND_TIME
@@ -277,6 +278,8 @@ struct _GstRtpSessionPrivate
   guint recv_rtx_req_count;
   guint sent_rtx_req_count;
 
+  GstStructure *last_twcc_stats;
+
   /*
    * This is the list of processed packets in the receive path when upstream
    * pushed a buffer list.
@@ -302,6 +305,8 @@ static GstClockTime gst_rtp_session_request_time (RTPSession * session,
     gpointer user_data);
 static void gst_rtp_session_notify_nack (RTPSession * sess,
     guint16 seqnum, guint16 blp, guint32 ssrc, gpointer user_data);
+static void gst_rtp_session_notify_twcc (RTPSession * sess,
+    GstStructure * twcc_packets, GstStructure * twcc_stats, gpointer user_data);
 static void gst_rtp_session_reconfigure (RTPSession * sess, gpointer user_data);
 static void gst_rtp_session_notify_early_rtcp (RTPSession * sess,
     gpointer user_data);
@@ -326,6 +331,7 @@ static RTPSessionCallbacks callbacks = {
   gst_rtp_session_request_key_unit,
   gst_rtp_session_request_time,
   gst_rtp_session_notify_nack,
+  gst_rtp_session_notify_twcc,
   gst_rtp_session_reconfigure,
   gst_rtp_session_notify_early_rtcp
 };
@@ -754,6 +760,30 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
           "Various statistics", GST_TYPE_STRUCTURE,
           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
 
+  /**
+   * GstRtpSession::twcc-stats:
+   *
+   * Various statistics derived from TWCC. This property returns a GstStructure
+   * with name RTPTWCCStats with the following fields:
+   *
+   *  "bitrate-sent"     G_TYPE_UINT    The actual sent bitrate of TWCC packets
+   *  "bitrate-recv"     G_TYPE_UINT    The estimated bitrate for the receiver.
+   *  "packets-sent"     G_TYPE_UINT    Number of packets sent
+   *  "packets-recv"     G_TYPE_UINT    Number of packets reported recevied
+   *  "packet-loss-pct"  G_TYPE_DOUBLE  Packetloss percentage, based on
+          packets reported as lost from the recevier.
+   *  "avg-delta-of-delta", G_TYPE_INT64 In nanoseconds, a moving window
+          average of the difference in inter-packet spacing between
+          sender and receiver. A sudden increase in this number can indicate
+          network congestion.
+   *
+   * Since: 1.18
+   */
+  g_object_class_install_property (gobject_class, PROP_TWCC_STATS,
+      g_param_spec_boxed ("twcc-stats", "TWCC Statistics",
+          "Various statistics from TWCC", GST_TYPE_STRUCTURE,
+          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
   g_object_class_install_property (gobject_class, PROP_RTP_PROFILE,
       g_param_spec_enum ("rtp-profile", "RTP Profile",
           "RTP profile to use", GST_TYPE_RTP_PROFILE, DEFAULT_RTP_PROFILE,
@@ -880,6 +910,8 @@ gst_rtp_session_finalize (GObject * object)
   g_cond_clear (&rtpsession->priv->cond);
   g_object_unref (rtpsession->priv->sysclock);
   g_object_unref (rtpsession->priv->session);
+  if (rtpsession->priv->last_twcc_stats)
+    gst_structure_free (rtpsession->priv->last_twcc_stats);
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
@@ -1004,6 +1036,11 @@ gst_rtp_session_get_property (GObject * object, guint prop_id,
     case PROP_STATS:
       g_value_take_boxed (value, gst_rtp_session_create_stats (rtpsession));
       break;
+    case PROP_TWCC_STATS:
+      GST_RTP_SESSION_LOCK (rtpsession);
+      g_value_set_boxed (value, priv->last_twcc_stats);
+      GST_RTP_SESSION_UNLOCK (rtpsession);
+      break;
     case PROP_RTP_PROFILE:
       g_object_get_property (G_OBJECT (priv->session), "rtp-profile", value);
       break;
@@ -1563,12 +1600,15 @@ gst_rtp_session_cache_caps (GstRtpSession * rtpsession, GstCaps * caps)
   GST_DEBUG_OBJECT (rtpsession, "parsing caps");
 
   s = gst_caps_get_structure (caps, 0);
+
   if (!gst_structure_get_int (s, "payload", &payload))
     return;
 
   if (g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (payload)))
     return;
 
+  rtp_session_update_recv_caps_structure (rtpsession->priv->session, s);
+
   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (payload),
       gst_caps_ref (caps));
 }
@@ -2802,6 +2842,31 @@ gst_rtp_session_notify_nack (RTPSession * sess, guint16 seqnum,
 }
 
 static void
+gst_rtp_session_notify_twcc (RTPSession * sess,
+    GstStructure * twcc_packets, GstStructure * twcc_stats, gpointer user_data)
+{
+  GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
+  GstEvent *event;
+  GstPad *send_rtp_sink;
+
+  GST_RTP_SESSION_LOCK (rtpsession);
+  if ((send_rtp_sink = rtpsession->send_rtp_sink))
+    gst_object_ref (send_rtp_sink);
+  if (rtpsession->priv->last_twcc_stats)
+    gst_structure_free (rtpsession->priv->last_twcc_stats);
+  rtpsession->priv->last_twcc_stats = twcc_stats;
+  GST_RTP_SESSION_UNLOCK (rtpsession);
+
+  if (send_rtp_sink) {
+    event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, twcc_packets);
+    gst_pad_push_event (send_rtp_sink, event);
+    gst_object_unref (send_rtp_sink);
+  }
+
+  g_object_notify (G_OBJECT (rtpsession), "twcc-stats");
+}
+
+static void
 gst_rtp_session_reconfigure (RTPSession * sess, gpointer user_data)
 {
   GstRtpSession *rtpsession = GST_RTP_SESSION (user_data);
index 221c8e3..118a1e1 100644 (file)
@@ -14,6 +14,7 @@ rtpmanager_sources = [
   'rtpsource.c',
   'rtpstats.c',
   'rtptimerqueue.c',
+  'rtptwcc.c',
   'gstrtpsession.c',
   'gstrtpfunnel.c',
 ]
index bfd96ca..b96d3cd 100644 (file)
@@ -22,6 +22,7 @@
 #define GLIB_DISABLE_DEPRECATION_WARNINGS
 
 #include <string.h>
+#include <stdlib.h>
 
 #include <gst/rtp/gstrtpbuffer.h>
 #include <gst/rtp/gstrtcpbuffer.h>
@@ -30,7 +31,7 @@
 
 #include "rtpsession.h"
 
-GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
+GST_DEBUG_CATEGORY (rtp_session_debug);
 #define GST_CAT_DEFAULT rtp_session_debug
 
 /* signals and args */
@@ -115,6 +116,8 @@ enum
    (avg) = ((val) + (15 * (avg))) >> 4;
 
 
+#define TWCC_EXTMAP_STR "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01"
+
 /* GObject vmethods */
 static void rtp_session_finalize (GObject * object);
 static void rtp_session_set_property (GObject * object, guint prop_id,
@@ -706,6 +709,9 @@ rtp_session_init (RTPSession * sess)
   sess->timestamp_sender_reports = !DEFAULT_RTCP_DISABLE_SR_TIMESTAMP;
 
   sess->is_doing_ptp = TRUE;
+
+  sess->twcc = rtp_twcc_manager_new (sess->mtu);
+  sess->twcc_stats = rtp_twcc_stats_new ();
 }
 
 static void
@@ -727,6 +733,9 @@ rtp_session_finalize (GObject * object)
   for (i = 0; i < 1; i++)
     g_hash_table_destroy (sess->ssrcs[i]);
 
+  rtp_twcc_manager_free (sess->twcc);
+  rtp_twcc_stats_free (sess->twcc_stats);
+
   g_mutex_clear (&sess->lock);
 
   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
@@ -847,6 +856,7 @@ rtp_session_set_property (GObject * object, guint prop_id,
       break;
     case PROP_RTCP_MTU:
       sess->mtu = g_value_get_uint (value);
+      rtp_twcc_manager_set_mtu (sess->twcc, sess->mtu);
       break;
     case PROP_SDES:
       rtp_session_set_sdes_struct (sess, g_value_get_boxed (value));
@@ -1206,6 +1216,10 @@ rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
     sess->callbacks.notify_nack = callbacks->notify_nack;
     sess->notify_nack_user_data = user_data;
   }
+  if (callbacks->notify_twcc) {
+    sess->callbacks.notify_twcc = callbacks->notify_twcc;
+    sess->notify_twcc_user_data = user_data;
+  }
   if (callbacks->reconfigure) {
     sess->callbacks.reconfigure = callbacks->reconfigure;
     sess->reconfigure_user_data = user_data;
@@ -2067,10 +2081,15 @@ update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo)
       pinfo->seqnum = gst_rtp_buffer_get_seq (&rtp);
       pinfo->pt = gst_rtp_buffer_get_payload_type (&rtp);
       pinfo->rtptime = gst_rtp_buffer_get_timestamp (&rtp);
+      pinfo->marker = gst_rtp_buffer_get_marker (&rtp);
       /* copy available csrc */
       pinfo->csrc_count = gst_rtp_buffer_get_csrc_count (&rtp);
       for (i = 0; i < pinfo->csrc_count; i++)
         pinfo->csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i);
+
+      /* RTP header extensions */
+      pinfo->header_ext = gst_rtp_buffer_get_extension_bytes (&rtp,
+          &pinfo->header_ext_bit_pattern);
     }
     gst_rtp_buffer_unmap (&rtp);
   }
@@ -2119,6 +2138,7 @@ update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
   pinfo->bytes = 0;
   pinfo->payload_len = 0;
   pinfo->packets = 0;
+  pinfo->marker = FALSE;
 
   if (is_list) {
     GstBufferList *list = GST_BUFFER_LIST_CAST (data);
@@ -2129,6 +2149,7 @@ update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
     GstBuffer *buffer = GST_BUFFER_CAST (data);
     res = update_packet (&buffer, 0, pinfo);
   }
+
   return res;
 }
 
@@ -2141,6 +2162,23 @@ clean_packet_info (RTPPacketInfo * pinfo)
     gst_mini_object_unref (pinfo->data);
     pinfo->data = NULL;
   }
+  if (pinfo->header_ext)
+    g_bytes_unref (pinfo->header_ext);
+}
+
+static gint32
+packet_info_get_twcc_seqnum (RTPPacketInfo * pinfo, guint8 ext_id)
+{
+  gint32 val = -1;
+  gpointer data;
+  guint size;
+
+  if (gst_rtp_buffer_get_extension_onebyte_header_from_bytes (pinfo->header_ext,
+          pinfo->header_ext_bit_pattern, ext_id, 0, &data, &size)) {
+    if (size == 2)
+      val = GST_READ_UINT16_BE (data);
+  }
+  return val;
 }
 
 static gboolean
@@ -2165,6 +2203,30 @@ source_update_active (RTPSession * sess, RTPSource * source,
   return TRUE;
 }
 
+static void
+process_twcc_packet (RTPSession * sess, RTPPacketInfo * pinfo)
+{
+  gint32 twcc_seqnum;
+
+  if (sess->twcc_recv_ext_id == 0)
+    return;
+
+  twcc_seqnum = packet_info_get_twcc_seqnum (pinfo, sess->twcc_recv_ext_id);
+  if (twcc_seqnum == -1)
+    return;
+
+  if (rtp_twcc_manager_recv_packet (sess->twcc, twcc_seqnum, pinfo)) {
+    RTP_SESSION_UNLOCK (sess);
+
+    /* TODO: find a better rational for this number, and possibly tune it based
+       on factors like framerate / bandwidth etc */
+    if (!rtp_session_send_rtcp (sess, 100 * GST_MSECOND)) {
+      GST_INFO ("Could not schedule TWCC straight away");
+    }
+    RTP_SESSION_LOCK (sess);
+  }
+}
+
 static gboolean
 source_update_sender (RTPSession * sess, RTPSource * source,
     gboolean prevsender)
@@ -2244,6 +2306,7 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
 
   /* let source process the packet */
   result = rtp_source_process_rtp (source, &pinfo);
+  process_twcc_packet (sess, &pinfo);
 
   /* source became active */
   if (source_update_active (sess, source, prevactive))
@@ -2802,6 +2865,35 @@ rtp_session_process_nack (RTPSession * sess, guint32 sender_ssrc,
 }
 
 static void
+rtp_session_process_twcc (RTPSession * sess, guint32 sender_ssrc,
+    guint32 media_ssrc, guint8 * fci_data, guint fci_length)
+{
+  GArray *twcc_packets;
+  GstStructure *twcc_packets_s;
+  GstStructure *twcc_stats_s;
+
+  twcc_packets = rtp_twcc_manager_parse_fci (sess->twcc,
+      fci_data, fci_length * sizeof (guint32));
+  if (twcc_packets == NULL)
+    return;
+
+  twcc_packets_s = rtp_twcc_stats_get_packets_structure (twcc_packets);
+  twcc_stats_s =
+      rtp_twcc_stats_process_packets (sess->twcc_stats, twcc_packets);
+
+  GST_DEBUG_OBJECT (sess, "Parsed TWCC: %" GST_PTR_FORMAT, twcc_packets_s);
+  GST_INFO_OBJECT (sess, "Current TWCC stats %" GST_PTR_FORMAT, twcc_stats_s);
+
+  g_array_unref (twcc_packets);
+
+  RTP_SESSION_UNLOCK (sess);
+  if (sess->callbacks.notify_twcc)
+    sess->callbacks.notify_twcc (sess, twcc_packets_s, twcc_stats_s,
+        sess->notify_twcc_user_data);
+  RTP_SESSION_LOCK (sess);
+}
+
+static void
 rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
     RTPPacketInfo * pinfo, GstClockTime current_time)
 {
@@ -2862,7 +2954,9 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
 
   if ((src && src->internal) ||
       /* PSFB FIR puts the media ssrc inside the FCI */
-      (type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR)) {
+      (type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR) ||
+      /* TWCC is for all sources, so a single media-ssrc is not enough */
+      (type == GST_RTCP_TYPE_RTPFB && fbtype == GST_RTCP_RTPFB_TYPE_TWCC)) {
     switch (type) {
       case GST_RTCP_TYPE_PSFB:
         switch (fbtype) {
@@ -2890,6 +2984,10 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
             rtp_session_process_nack (sess, sender_ssrc, media_ssrc,
                 fci_data, fci_length, current_time);
             break;
+          case GST_RTCP_RTPFB_TYPE_TWCC:
+            rtp_session_process_twcc (sess, sender_ssrc, media_ssrc,
+                fci_data, fci_length);
+            break;
           default:
             break;
         }
@@ -3021,6 +3119,29 @@ invalid_packet:
   }
 }
 
+static guint8
+_get_extmap_id_for_attribute (const GstStructure * s, const gchar * ext_name)
+{
+  guint i;
+  guint8 extmap_id = 0;
+  guint n_fields = gst_structure_n_fields (s);
+
+  for (i = 0; i < n_fields; i++) {
+    const gchar *field_name = gst_structure_nth_field_name (s, i);
+    if (g_str_has_prefix (field_name, "extmap-")) {
+      const gchar *str = gst_structure_get_string (s, field_name);
+      if (str && g_strcmp0 (str, ext_name) == 0) {
+        gint64 id = g_ascii_strtoll (field_name + 7, NULL, 10);
+        if (id > 0 && id < 15) {
+          extmap_id = id;
+          break;
+        }
+      }
+    }
+  }
+  return extmap_id;
+}
+
 /**
  * rtp_session_update_send_caps:
  * @sess: an #RTPSession
@@ -3075,8 +3196,30 @@ rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps)
   } else {
     sess->internal_ssrc_from_caps_or_property = FALSE;
   }
+
+  sess->twcc_send_ext_id = _get_extmap_id_for_attribute (s, TWCC_EXTMAP_STR);
+  if (sess->twcc_send_ext_id > 0) {
+    GST_INFO ("TWCC enabled for send using extension id: %u",
+        sess->twcc_send_ext_id);
+  }
+}
+
+static void
+send_twcc_packet (RTPSession * sess, RTPPacketInfo * pinfo)
+{
+  gint32 twcc_seqnum;
+
+  if (sess->twcc_send_ext_id == 0)
+    return;
+
+  twcc_seqnum = packet_info_get_twcc_seqnum (pinfo, sess->twcc_send_ext_id);
+  if (twcc_seqnum == -1)
+    return;
+
+  rtp_twcc_manager_send_packet (sess->twcc, twcc_seqnum, pinfo);
 }
 
+
 /**
  * rtp_session_send_rtp:
  * @sess: an #RTPSession
@@ -3111,6 +3254,8 @@ rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
           current_time, running_time, -1))
     goto invalid_packet;
 
+  send_twcc_packet (sess, &pinfo);
+
   source = obtain_internal_source (sess, pinfo.ssrc, &created, current_time);
   if (created)
     on_new_sender_ssrc (sess, source);
@@ -3168,7 +3313,7 @@ invalid_packet:
 collision:
   {
     g_object_unref (source);
-    gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
+    clean_packet_info (&pinfo);
     RTP_SESSION_UNLOCK (sess);
     GST_WARNING ("non-internal source with same ssrc %08x, drop packet",
         pinfo.ssrc);
@@ -4115,6 +4260,37 @@ remove_closing_sources (const gchar * key, RTPSource * source,
 }
 
 static void
+generate_twcc (const gchar * key, RTPSource * source, ReportData * data)
+{
+  RTPSession *sess = data->sess;
+  GstBuffer *buf;
+
+  /* only generate RTCP for active internal sources */
+  if (!source->internal || source->sent_bye)
+    return;
+
+  /* ignore other sources when we do the timeout after a scheduled BYE */
+  if (sess->scheduled_bye && !source->marked_bye)
+    return;
+
+  /* skip if RTCP is disabled */
+  if (source->disable_rtcp) {
+    GST_DEBUG ("source %08x has RTCP disabled", source->ssrc);
+    return;
+  }
+
+  while ((buf = rtp_twcc_manager_get_feedback (sess->twcc, source->ssrc))) {
+    ReportOutput *output = g_slice_new (ReportOutput);
+    output->source = g_object_ref (source);
+    output->is_bye = FALSE;
+    output->buffer = buf;
+    /* queue the RTCP packet to push later */
+    g_queue_push_tail (&data->output, output);
+  }
+}
+
+
+static void
 generate_rtcp (const gchar * key, RTPSource * source, ReportData * data)
 {
   RTPSession *sess = data->sess;
@@ -4338,6 +4514,9 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
       (GHFunc) generate_rtcp, &data);
 
+  g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+      (GHFunc) generate_twcc, &data);
+
   /* update the generation for all the sources that have been reported */
   g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
       (GHFunc) update_generation, &data);
@@ -4721,3 +4900,22 @@ no_source:
     return FALSE;
   }
 }
+
+/**
+ * rtp_session_update_recv_caps_structure:
+ * @sess: an #RTPSession
+ * @s: a #GstStructure from a #GstCaps
+ *
+ * Update the caps of the receiver in the rtp session.
+ */
+void
+rtp_session_update_recv_caps_structure (RTPSession * sess,
+    const GstStructure * s)
+{
+  guint8 ext_id = _get_extmap_id_for_attribute (s, TWCC_EXTMAP_STR);
+  if (ext_id > 0) {
+    sess->twcc_recv_ext_id = ext_id;
+    GST_INFO ("TWCC enabled for recv using extension id: %u",
+        sess->twcc_recv_ext_id);
+  }
+}
index 6aa28ca..949fcc4 100644 (file)
@@ -23,6 +23,7 @@
 #include <gst/gst.h>
 
 #include "rtpsource.h"
+#include "rtptwcc.h"
 
 typedef struct _RTPSession RTPSession;
 typedef struct _RTPSessionClass RTPSessionClass;
@@ -157,6 +158,15 @@ typedef void (*RTPSessionNotifyNACK) (RTPSession *sess,
     guint16 seqnum, guint16 blp, guint32 ssrc, gpointer user_data);
 
 /**
+ * RTPSessionNotifyTWCC:
+ * @user_data: user data specified when registering
+ *
+ * Notifies of Transport-wide congestion control packets and stats.
+ */
+typedef void (*RTPSessionNotifyTWCC) (RTPSession *sess,
+    GstStructure * twcc_packets, GstStructure * twcc_stats, gpointer user_data);
+
+/**
  * RTPSessionReconfigure:
  * @sess: an #RTPSession
  * @user_data: user data specified when registering
@@ -186,6 +196,7 @@ typedef void (*RTPSessionNotifyEarlyRTCP) (RTPSession *sess,
  * @RTPSessionRequestKeyUnit: callback for requesting a new key unit
  * @RTPSessionRequestTime: callback for requesting the current time
  * @RTPSessionNotifyNACK: callback for notifying NACK
+ * @RTPSessionNotifyTWCC: callback for notifying TWCC
  * @RTPSessionReconfigure: callback for requesting reconfiguration
  * @RTPSessionNotifyEarlyRTCP: callback for notifying early RTCP
  *
@@ -203,6 +214,7 @@ typedef struct {
   RTPSessionRequestKeyUnit request_key_unit;
   RTPSessionRequestTime request_time;
   RTPSessionNotifyNACK  notify_nack;
+  RTPSessionNotifyTWCC  notify_twcc;
   RTPSessionReconfigure reconfigure;
   RTPSessionNotifyEarlyRTCP notify_early_rtcp;
 } RTPSessionCallbacks;
@@ -280,6 +292,7 @@ struct _RTPSession {
   gpointer              request_key_unit_user_data;
   gpointer              request_time_user_data;
   gpointer              notify_nack_user_data;
+  gpointer              notify_twcc_user_data;
   gpointer              reconfigure_user_data;
   gpointer              notify_early_rtcp_user_data;
 
@@ -295,6 +308,12 @@ struct _RTPSession {
   GList         *conflicting_addresses;
 
   gboolean timestamp_sender_reports;
+
+  /* Transport-wide cc-extension */
+  RTPTWCCManager *twcc;
+  RTPTWCCStats *twcc_stats;
+  guint8 twcc_recv_ext_id;
+  guint8 twcc_send_ext_id;
 };
 
 /**
@@ -418,5 +437,7 @@ gboolean        rtp_session_request_nack           (RTPSession * sess,
                                                     guint16 seqnum,
                                                     GstClockTime max_delay);
 
+void            rtp_session_update_recv_caps_structure (RTPSession * sess, const GstStructure * s);
+
 
 #endif /* __RTP_SESSION_H__ */
index 6937c3a..a25875a 100644 (file)
@@ -920,8 +920,8 @@ push_packet (RTPSource * src, GstBuffer * buffer)
   return ret;
 }
 
-static gint
-get_clock_rate (RTPSource * src, guint8 payload)
+static void
+fetch_clock_rate_from_payload (RTPSource * src, guint8 payload)
 {
   if (src->payload == -1) {
     /* first payload received, nothing was in the caps, lock on to this payload */
@@ -946,7 +946,6 @@ get_clock_rate (RTPSource * src, guint8 payload)
     src->clock_rate = clock_rate;
     gst_rtp_packet_rate_ctx_reset (&src->packet_rate_ctx, clock_rate);
   }
-  return src->clock_rate;
 }
 
 /* Jitter is the variation in the delay of received packets in a flow. It is
@@ -960,26 +959,23 @@ calculate_jitter (RTPSource * src, RTPPacketInfo * pinfo)
   GstClockTime running_time;
   guint32 rtparrival, transit, rtptime;
   gint32 diff;
-  gint clock_rate;
-  guint8 pt;
 
   /* get arrival time */
   if ((running_time = pinfo->running_time) == GST_CLOCK_TIME_NONE)
     goto no_time;
 
-  pt = pinfo->pt;
+  GST_LOG ("SSRC %08x got payload %d", src->ssrc, pinfo->pt);
 
-  GST_LOG ("SSRC %08x got payload %d", src->ssrc, pt);
-
-  /* get clockrate */
-  if ((clock_rate = get_clock_rate (src, pt)) == -1)
+  /* check if clock-rate is valid */
+  if (src->clock_rate == -1)
     goto no_clock_rate;
 
   rtptime = pinfo->rtptime;
 
   /* convert arrival time to RTP timestamp units, truncate to 32 bits, we don't
    * care about the absolute value, just the difference. */
-  rtparrival = gst_util_uint64_scale_int (running_time, clock_rate, GST_SECOND);
+  rtparrival =
+      gst_util_uint64_scale_int (running_time, src->clock_rate, GST_SECOND);
 
   /* transit time is difference with RTP timestamp */
   transit = rtparrival - rtptime;
@@ -1002,7 +998,7 @@ calculate_jitter (RTPSource * src, RTPPacketInfo * pinfo)
   src->stats.last_rtptime = rtparrival;
 
   GST_LOG ("rtparrival %u, rtptime %u, clock-rate %d, diff %d, jitter: %f",
-      rtparrival, rtptime, clock_rate, diff, (src->stats.jitter) / 16.0);
+      rtparrival, rtptime, src->clock_rate, diff, (src->stats.jitter) / 16.0);
 
   return;
 
@@ -1014,7 +1010,7 @@ no_time:
   }
 no_clock_rate:
   {
-    GST_WARNING ("cannot get clock-rate for pt %d", pt);
+    GST_WARNING ("cannot get clock-rate for pt %d", pinfo->pt);
     return;
   }
 }
@@ -1265,6 +1261,8 @@ rtp_source_process_rtp (RTPSource * src, RTPPacketInfo * pinfo)
   g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR);
   g_return_val_if_fail (pinfo != NULL, GST_FLOW_ERROR);
 
+  fetch_clock_rate_from_payload (src, pinfo->pt);
+
   if (!update_receiver_stats (src, pinfo, TRUE))
     return GST_FLOW_OK;
 
@@ -1553,7 +1551,7 @@ rtp_source_get_new_sr (RTPSource * src, guint64 ntpnstime,
   if (src->clock_rate == -1 && src->pt_set) {
     GST_INFO ("no clock-rate, getting for pt %u and SSRC %u", src->pt,
         src->ssrc);
-    get_clock_rate (src, src->pt);
+    fetch_clock_rate_from_payload (src, src->pt);
   }
 
   if (src->clock_rate != -1) {
index c07ac9b..88e5f07 100644 (file)
  * Boston, MA 02110-1301, USA.
  */
 
+#define GLIB_DISABLE_DEPRECATION_WARNINGS
+
 #include "rtpstats.h"
+#include "rtptwcc.h"
 
 void
 gst_rtp_packet_rate_ctx_reset (RTPPacketRateCtx * ctx, gint32 clock_rate)
@@ -445,3 +448,230 @@ __g_socket_address_to_string (GSocketAddress * addr)
 
   return ret;
 }
+
+static void
+_append_structure_to_value_array (GValueArray * array, GstStructure * s)
+{
+  GValue *val;
+  g_value_array_append (array, NULL);
+  val = g_value_array_get_nth (array, array->n_values - 1);
+  g_value_init (val, GST_TYPE_STRUCTURE);
+  g_value_take_boxed (val, s);
+}
+
+static void
+_structure_take_value_array (GstStructure * s,
+    const gchar * field_name, GValueArray * array)
+{
+  GValue value = G_VALUE_INIT;
+  g_value_init (&value, G_TYPE_VALUE_ARRAY);
+  g_value_take_boxed (&value, array);
+  gst_structure_take_value (s, field_name, &value);
+  g_value_unset (&value);
+}
+
+GstStructure *
+rtp_twcc_stats_get_packets_structure (GArray * twcc_packets)
+{
+  GstStructure *ret = gst_structure_new_empty ("RTPTWCCPackets");
+  GValueArray *array = g_value_array_new (0);
+  guint i;
+
+  for (i = 0; i < twcc_packets->len; i++) {
+    RTPTWCCPacket *pkt = &g_array_index (twcc_packets, RTPTWCCPacket, i);
+
+    GstStructure *pkt_s = gst_structure_new ("RTPTWCCPacket",
+        "seqnum", G_TYPE_UINT, pkt->seqnum,
+        "local-ts", G_TYPE_UINT64, pkt->local_ts,
+        "remote-ts", G_TYPE_UINT64, pkt->remote_ts,
+        "size", G_TYPE_UINT, pkt->size,
+        "lost", G_TYPE_BOOLEAN, pkt->status == RTP_TWCC_PACKET_STATUS_NOT_RECV,
+        NULL);
+    _append_structure_to_value_array (array, pkt_s);
+  }
+
+  _structure_take_value_array (ret, "packets", array);
+  return ret;
+}
+
+static void
+rtp_twcc_stats_calculate_stats (RTPTWCCStats * stats, GArray * twcc_packets)
+{
+  guint packets_recv = 0;
+  guint i;
+
+  for (i = 0; i < twcc_packets->len; i++) {
+    RTPTWCCPacket *pkt = &g_array_index (twcc_packets, RTPTWCCPacket, i);
+
+    if (pkt->status != RTP_TWCC_PACKET_STATUS_NOT_RECV)
+      packets_recv++;
+
+    if (GST_CLOCK_TIME_IS_VALID (pkt->local_ts) &&
+        GST_CLOCK_TIME_IS_VALID (stats->last_local_ts)) {
+      pkt->local_delta = GST_CLOCK_DIFF (stats->last_local_ts, pkt->local_ts);
+    }
+
+    if (GST_CLOCK_TIME_IS_VALID (pkt->remote_ts) &&
+        GST_CLOCK_TIME_IS_VALID (stats->last_remote_ts)) {
+      pkt->remote_delta =
+          GST_CLOCK_DIFF (stats->last_remote_ts, pkt->remote_ts);
+    }
+
+    if (GST_CLOCK_STIME_IS_VALID (pkt->local_delta) &&
+        GST_CLOCK_STIME_IS_VALID (pkt->remote_delta)) {
+      pkt->delta_delta = pkt->remote_delta - pkt->local_delta;
+    }
+
+    stats->last_local_ts = pkt->local_ts;
+    stats->last_remote_ts = pkt->remote_ts;
+  }
+
+  stats->packets_sent = twcc_packets->len;
+  stats->packets_recv = packets_recv;
+}
+
+static gint
+_get_window_start_index (RTPTWCCStats * stats, GstClockTime duration,
+    GstClockTime * local_duration, GstClockTime * remote_duration)
+{
+  RTPTWCCPacket *last = NULL;
+  guint i;
+
+  if (stats->packets->len < 2)
+    return -1;
+
+  for (i = 0; i < stats->packets->len; i++) {
+    guint start_index = stats->packets->len - 1 - i;
+    RTPTWCCPacket *pkt =
+        &g_array_index (stats->packets, RTPTWCCPacket, start_index);
+    if (GST_CLOCK_TIME_IS_VALID (pkt->local_ts)
+        && GST_CLOCK_TIME_IS_VALID (pkt->remote_ts)) {
+      /* first find the last valid packet */
+      if (last == NULL) {
+        last = pkt;
+      } else {
+        /* and then get the duration in local ts */
+        GstClockTimeDiff ld = GST_CLOCK_DIFF (pkt->local_ts, last->local_ts);
+        if (ld >= duration) {
+          *local_duration = ld;
+          *remote_duration = GST_CLOCK_DIFF (pkt->remote_ts, last->remote_ts);
+          return start_index;
+        }
+      }
+    }
+  }
+
+  return -1;
+}
+
+static void
+rtp_twcc_stats_calculate_windowed_stats (RTPTWCCStats * stats)
+{
+  guint i;
+  gint start_idx;
+  guint bits_sent = 0;
+  guint bits_recv = 0;
+  guint packets_sent = 0;
+  guint packets_recv = 0;
+  guint packets_lost;
+  GstClockTimeDiff delta_delta_sum = 0;
+  guint delta_delta_count = 0;
+  GstClockTime local_duration;
+  GstClockTime remote_duration;
+
+  start_idx = _get_window_start_index (stats, stats->window_size,
+      &local_duration, &remote_duration);
+  if (start_idx == -1) {
+    return;
+  }
+
+  /* remove the old packets */
+  if (start_idx > 0)
+    g_array_remove_range (stats->packets, 0, start_idx);
+
+  packets_sent = stats->packets->len - 1;
+
+  for (i = 0; i < packets_sent; i++) {
+    RTPTWCCPacket *pkt = &g_array_index (stats->packets, RTPTWCCPacket, i);
+
+    if (GST_CLOCK_TIME_IS_VALID (pkt->local_ts)) {
+      bits_sent += pkt->size * 8;
+    }
+
+    if (GST_CLOCK_TIME_IS_VALID (pkt->remote_ts)) {
+      bits_recv += pkt->size * 8;
+      packets_recv++;
+    }
+
+    if (GST_CLOCK_STIME_IS_VALID (pkt->delta_delta)) {
+      delta_delta_sum += pkt->delta_delta;
+      delta_delta_count++;
+    }
+  }
+
+  packets_lost = packets_sent - packets_recv;
+  stats->packet_loss_pct = (packets_lost * 100) / (gfloat) packets_sent;
+
+  if (delta_delta_count) {
+    GstClockTimeDiff avg_delta_of_delta = delta_delta_sum / delta_delta_count;
+    if (GST_CLOCK_STIME_IS_VALID (stats->avg_delta_of_delta)) {
+      stats->avg_delta_of_delta_change =
+          (avg_delta_of_delta -
+          stats->avg_delta_of_delta) / (250 * GST_USECOND);
+    }
+    stats->avg_delta_of_delta = avg_delta_of_delta;
+  }
+
+  stats->bitrate_sent =
+      gst_util_uint64_scale (bits_sent, GST_SECOND, local_duration);
+  stats->bitrate_recv =
+      gst_util_uint64_scale (bits_recv, GST_SECOND, remote_duration);
+
+  GST_DEBUG ("Got stats: bits_sent: %u, bits_recv: %u, packets_sent = %u, "
+      "packets_recv: %u, packetlost_pct = %f, sent_bitrate = %u, "
+      "recv_bitrate = %u, delta-delta-avg = %" GST_STIME_FORMAT ", "
+      "delta-delta-change: %f", bits_sent, bits_recv, stats->packets_sent,
+      packets_recv, stats->packet_loss_pct, stats->bitrate_sent,
+      stats->bitrate_recv, GST_STIME_ARGS (stats->avg_delta_of_delta),
+      stats->avg_delta_of_delta_change);
+}
+
+RTPTWCCStats *
+rtp_twcc_stats_new (void)
+{
+  RTPTWCCStats *stats = g_new0 (RTPTWCCStats, 1);
+  stats->packets = g_array_new (FALSE, FALSE, sizeof (RTPTWCCPacket));
+  stats->last_local_ts = GST_CLOCK_TIME_NONE;
+  stats->last_remote_ts = GST_CLOCK_TIME_NONE;
+  stats->avg_delta_of_delta = GST_CLOCK_STIME_NONE;
+  stats->window_size = 300 * GST_MSECOND;       /* FIXME: could be configurable? */
+  return stats;
+}
+
+void
+rtp_twcc_stats_free (RTPTWCCStats * stats)
+{
+  g_array_unref (stats->packets);
+  g_free (stats);
+}
+
+static GstStructure *
+rtp_twcc_stats_get_stats_structure (RTPTWCCStats * stats)
+{
+  return gst_structure_new ("RTPTWCCStats",
+      "bitrate-sent", G_TYPE_UINT, stats->bitrate_sent,
+      "bitrate-recv", G_TYPE_UINT, stats->bitrate_recv,
+      "packets-sent", G_TYPE_UINT, stats->packets_sent,
+      "packets-recv", G_TYPE_UINT, stats->packets_recv,
+      "packet-loss-pct", G_TYPE_DOUBLE, stats->packet_loss_pct,
+      "avg-delta-of-delta", G_TYPE_INT64, stats->avg_delta_of_delta, NULL);
+}
+
+GstStructure *
+rtp_twcc_stats_process_packets (RTPTWCCStats * stats, GArray * twcc_packets)
+{
+  rtp_twcc_stats_calculate_stats (stats, twcc_packets);
+  g_array_append_vals (stats->packets, twcc_packets->data, twcc_packets->len);
+  rtp_twcc_stats_calculate_windowed_stats (stats);
+  return rtp_twcc_stats_get_stats_structure (stats);
+}
index bd3a54e..776651f 100644 (file)
@@ -77,6 +77,10 @@ typedef struct {
  * @seqnum: the seqnum of the packet
  * @pt: the payload type of the packet
  * @rtptime: the RTP time of the packet
+ * @marker: the marker bit
+ *
+ * @tw_seqnum_ext_id: the extension-header ID for transport-wide seqnums
+ * @tw_seqnum: the transport-wide seqnum of the packet
  *
  * Structure holding information about the packet.
  */
@@ -97,8 +101,11 @@ typedef struct {
   guint16       seqnum;
   guint8        pt;
   guint32       rtptime;
+  gboolean      marker;
   guint32       csrc_count;
   guint32       csrcs[16];
+  GBytes        *header_ext;
+  guint16       header_ext_bit_pattern;
 } RTPPacketInfo;
 
 /**
@@ -245,6 +252,27 @@ typedef struct {
   guint         nacks_received;
 } RTPSessionStats;
 
+/**
+ * RTPTWCCStats:
+ *
+ * Stats kept for a session and used to produce TWCC stats.
+ */
+typedef struct {
+  GArray       *packets;
+  GstClockTime window_size;
+  GstClockTime  last_local_ts;
+  GstClockTime  last_remote_ts;
+
+  guint bitrate_sent;
+  guint bitrate_recv;
+  guint packets_sent;
+  guint packets_recv;
+  gfloat packet_loss_pct;
+  GstClockTimeDiff avg_delta_of_delta;
+  gfloat avg_delta_of_delta_change;
+} RTPTWCCStats;
+
+
 void           rtp_stats_init_defaults              (RTPSessionStats *stats);
 
 void           rtp_stats_set_bandwidths             (RTPSessionStats *stats,
@@ -264,4 +292,10 @@ void           rtp_stats_set_min_interval           (RTPSessionStats *stats,
 gboolean __g_socket_address_equal (GSocketAddress *a, GSocketAddress *b);
 gchar * __g_socket_address_to_string (GSocketAddress * addr);
 
+RTPTWCCStats * rtp_twcc_stats_new (void);
+void rtp_twcc_stats_free (RTPTWCCStats * stats);
+GstStructure * rtp_twcc_stats_process_packets (RTPTWCCStats * stats,
+    GArray * twcc_packets);
+GstStructure * rtp_twcc_stats_get_packets_structure (GArray * twcc_packets);
+
 #endif /* __RTP_STATS_H__ */
diff --git a/gst/rtpmanager/rtptwcc.c b/gst/rtpmanager/rtptwcc.c
new file mode 100644 (file)
index 0000000..0d7b446
--- /dev/null
@@ -0,0 +1,888 @@
+/* GStreamer
+ * Copyright (C)  2019 Pexip (http://pexip.com/)
+ *   @author: Havard Graff <havard@pexip.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+#include "rtptwcc.h"
+#include <gst/rtp/gstrtcpbuffer.h>
+#include <gst/base/gstbitreader.h>
+#include <gst/base/gstbitwriter.h>
+
+GST_DEBUG_CATEGORY_EXTERN (rtp_session_debug);
+#define GST_CAT_DEFAULT rtp_session_debug
+
+#define REF_TIME_UNIT (64 * GST_MSECOND)
+#define DELTA_UNIT (250 * GST_USECOND)
+#define MAX_TS_DELTA (0xff * DELTA_UNIT)
+
+struct _RTPTWCCManager
+{
+  guint mtu;
+  guint max_packets_per_rtcp;
+  GArray *recv_packets;
+
+  guint8 fb_pkt_count;
+  gint32 last_seqnum;
+
+  GArray *sent_packets;
+  GArray *parsed_packets;
+  GQueue *rtcp_buffers;
+
+  guint64 recv_sender_ssrc;
+  guint64 recv_media_ssrc;
+
+  guint16 expected_recv_seqnum;
+
+  gboolean first_fci_parse;
+  guint16 expected_parsed_seqnum;
+  guint8 expected_parsed_fb_pkt_count;
+};
+
+typedef enum
+{
+  RTP_TWCC_CHUNK_TYPE_RUN_LENGTH = 0,
+  RTP_TWCC_CHUNK_TYPE_STATUS_VECTOR = 1,
+} RTPTWCCChunkType;
+
+typedef struct
+{
+  guint8 base_seqnum[2];
+  guint8 packet_count[2];
+  guint8 base_time[3];
+  guint8 fb_pkt_count[1];
+} RTPTWCCHeader;
+
+typedef struct
+{
+  GstClockTime ts;
+  guint16 seqnum;
+
+  gint64 delta;
+  RTPTWCCPacketStatus status;
+  guint16 missing_run;
+  guint equal_run;
+} RecvPacket;
+
+typedef struct
+{
+  GstClockTime ts;
+  GstClockTime socket_ts;
+  GstClockTime remote_ts;
+  guint16 seqnum;
+  guint size;
+  gboolean lost;
+} SentPacket;
+
+RTPTWCCManager *
+rtp_twcc_manager_new (guint mtu)
+{
+  RTPTWCCManager *twcc = g_new0 (RTPTWCCManager, 1);
+
+  twcc->recv_packets = g_array_new (FALSE, FALSE, sizeof (RecvPacket));
+
+  twcc->sent_packets = g_array_new (FALSE, FALSE, sizeof (SentPacket));
+  twcc->parsed_packets = g_array_new (FALSE, FALSE, sizeof (RecvPacket));
+
+  twcc->rtcp_buffers = g_queue_new ();
+
+  twcc->last_seqnum = -1;
+  twcc->recv_media_ssrc = -1;
+  twcc->recv_sender_ssrc = -1;
+
+  rtp_twcc_manager_set_mtu (twcc, mtu);
+
+  twcc->first_fci_parse = TRUE;
+
+  return twcc;
+}
+
+void
+rtp_twcc_manager_free (RTPTWCCManager * twcc)
+{
+  g_array_unref (twcc->recv_packets);
+  g_array_unref (twcc->sent_packets);
+  g_array_unref (twcc->parsed_packets);
+  g_queue_free_full (twcc->rtcp_buffers, (GDestroyNotify) gst_buffer_unref);
+  g_free (twcc);
+}
+
+static void
+recv_packet_init (RecvPacket * packet, guint16 seqnum, RTPPacketInfo * pinfo)
+{
+  memset (packet, 0, sizeof (RecvPacket));
+  packet->seqnum = seqnum;
+  packet->ts = pinfo->running_time;
+}
+
+void
+rtp_twcc_manager_set_mtu (RTPTWCCManager * twcc, guint mtu)
+{
+  twcc->mtu = mtu;
+
+  /* the absolute worst case is that 7 packets uses
+     header (4 * 4 * 4) 32 bytes) and 
+     packet_chunk 2 bytes +  
+     recv_deltas (2 * 7) 14 bytes */
+  twcc->max_packets_per_rtcp = ((twcc->mtu - 32) * 7) / (2 + 14);
+}
+
+static gint
+_twcc_seqnum_sort (gconstpointer a, gconstpointer b)
+{
+  gint32 seqa = ((RecvPacket *) a)->seqnum;
+  gint32 seqb = ((RecvPacket *) b)->seqnum;
+  gint res = seqa - seqb;
+  if (res < -65000)
+    res = 1;
+  if (res > 65000)
+    res = -1;
+  return res;
+}
+
+static void
+rtp_twcc_write_recv_deltas (guint8 * fci_data, GArray * twcc_packets)
+{
+  guint i;
+  for (i = 0; i < twcc_packets->len; i++) {
+    RecvPacket *pkt = &g_array_index (twcc_packets, RecvPacket, i);
+
+    if (pkt->status == RTP_TWCC_PACKET_STATUS_SMALL_DELTA) {
+      GST_WRITE_UINT8 (fci_data, pkt->delta);
+      fci_data += 1;
+    } else if (pkt->status == RTP_TWCC_PACKET_STATUS_LARGE_NEGATIVE_DELTA) {
+      GST_WRITE_UINT16_BE (fci_data, pkt->delta);
+      fci_data += 2;
+    }
+  }
+}
+
+static void
+rtp_twcc_write_run_length_chunk (GArray * packet_chunks,
+    RTPTWCCPacketStatus status, guint run_length)
+{
+  guint written = 0;
+  while (written < run_length) {
+    GstBitWriter writer;
+    guint16 data = 0;
+    guint len = MIN (run_length - written, 8191);
+
+    GST_LOG ("Writing a run-lenght of %u with status %u", len, status);
+
+    gst_bit_writer_init_with_data (&writer, (guint8 *) & data, 2, FALSE);
+    gst_bit_writer_put_bits_uint8 (&writer, RTP_TWCC_CHUNK_TYPE_RUN_LENGTH, 1);
+    gst_bit_writer_put_bits_uint8 (&writer, status, 2);
+    gst_bit_writer_put_bits_uint16 (&writer, len, 13);
+    g_array_append_val (packet_chunks, data);
+    written += len;
+  }
+}
+
+typedef struct
+{
+  GArray *packet_chunks;
+  GstBitWriter writer;
+  guint16 data;
+  guint symbol_size;
+} ChunkBitWriter;
+
+static void
+chunk_bit_writer_reset (ChunkBitWriter * writer)
+{
+  writer->data = 0;
+  gst_bit_writer_init_with_data (&writer->writer,
+      (guint8 *) & writer->data, 2, FALSE);
+
+  gst_bit_writer_put_bits_uint8 (&writer->writer,
+      RTP_TWCC_CHUNK_TYPE_STATUS_VECTOR, 1);
+  /* 1 for 2-bit symbol-size, 0 for 1-bit */
+  gst_bit_writer_put_bits_uint8 (&writer->writer, writer->symbol_size - 1, 1);
+}
+
+static void
+chunk_bit_writer_configure (ChunkBitWriter * writer, guint symbol_size)
+{
+  writer->symbol_size = symbol_size;
+  chunk_bit_writer_reset (writer);
+}
+
+static gboolean
+chunk_bit_writer_is_empty (ChunkBitWriter * writer)
+{
+  return writer->writer.bit_size == 2;
+}
+
+static gboolean
+chunk_bit_writer_is_full (ChunkBitWriter * writer)
+{
+  return writer->writer.bit_size == 16;
+}
+
+static guint
+chunk_bit_writer_get_available_slots (ChunkBitWriter * writer)
+{
+  return (16 - writer->writer.bit_size) / writer->symbol_size;
+}
+
+static guint
+chunk_bit_writer_get_total_slots (ChunkBitWriter * writer)
+{
+  return 14 / writer->symbol_size;
+}
+
+static void
+chunk_bit_writer_flush (ChunkBitWriter * writer)
+{
+  /* don't append a chunk if no bits have been written */
+  if (!chunk_bit_writer_is_empty (writer)) {
+    g_array_append_val (writer->packet_chunks, writer->data);
+    chunk_bit_writer_reset (writer);
+  }
+}
+
+static void
+chunk_bit_writer_init (ChunkBitWriter * writer,
+    GArray * packet_chunks, guint symbol_size)
+{
+  writer->packet_chunks = packet_chunks;
+  chunk_bit_writer_configure (writer, symbol_size);
+}
+
+static void
+chunk_bit_writer_write (ChunkBitWriter * writer, RTPTWCCPacketStatus status)
+{
+  gst_bit_writer_put_bits_uint8 (&writer->writer, status, writer->symbol_size);
+  if (chunk_bit_writer_is_full (writer)) {
+    chunk_bit_writer_flush (writer);
+  }
+}
+
+static void
+rtp_twcc_write_status_vector_chunk (ChunkBitWriter * writer, RecvPacket * pkt)
+{
+  if (pkt->missing_run > 0) {
+    guint available = chunk_bit_writer_get_available_slots (writer);
+    guint total = chunk_bit_writer_get_total_slots (writer);
+    if (pkt->missing_run > (available + total)) {
+      /* here it is better to finish up the current status-chunk and then
+         go for run-length */
+      for (guint i = 0; i < available; i++) {
+        chunk_bit_writer_write (writer, RTP_TWCC_PACKET_STATUS_NOT_RECV);
+      }
+      rtp_twcc_write_run_length_chunk (writer->packet_chunks,
+          RTP_TWCC_PACKET_STATUS_NOT_RECV, pkt->missing_run - available);
+    } else {
+      for (guint i = 0; i < pkt->missing_run; i++) {
+        chunk_bit_writer_write (writer, RTP_TWCC_PACKET_STATUS_NOT_RECV);
+      }
+    }
+  }
+
+  chunk_bit_writer_write (writer, pkt->status);
+}
+
+typedef struct
+{
+  RecvPacket *equal;
+} RunLengthHelper;
+
+static void
+run_lenght_helper_update (RunLengthHelper * rlh, RecvPacket * pkt)
+{
+  /* for missing packets we reset */
+  if (pkt->missing_run > 0) {
+    rlh->equal = NULL;
+  }
+
+  /* all status equal run */
+  if (rlh->equal == NULL) {
+    rlh->equal = pkt;
+    rlh->equal->equal_run = 0;
+  }
+
+  if (rlh->equal->status == pkt->status) {
+    rlh->equal->equal_run++;
+  } else {
+    rlh->equal = pkt;
+    rlh->equal->equal_run = 1;
+  }
+}
+
+static void
+rtp_twcc_write_chunks (GArray * packet_chunks,
+    GArray * twcc_packets, guint symbol_size)
+{
+  ChunkBitWriter writer;
+  guint i;
+  guint bits_per_chunks = 7 * symbol_size;
+
+  chunk_bit_writer_init (&writer, packet_chunks, symbol_size);
+
+  for (i = 0; i < twcc_packets->len; i++) {
+    RecvPacket *pkt = &g_array_index (twcc_packets, RecvPacket, i);
+    guint remaining_packets = twcc_packets->len - i;
+
+    /* we can only start a run-length chunk if the status-chunk is
+       completed */
+    if (chunk_bit_writer_is_empty (&writer)) {
+      /* first write in any preceeding gaps, we use run-length
+         if it would take up more than one chunk (14/7) */
+      if (pkt->missing_run > bits_per_chunks) {
+        rtp_twcc_write_run_length_chunk (packet_chunks,
+            RTP_TWCC_PACKET_STATUS_NOT_RECV, pkt->missing_run);
+      }
+
+      /* we have a run of the same status, write a run-length chunk and skip
+         to the next point */
+      if (pkt->missing_run == 0 &&
+          (pkt->equal_run > bits_per_chunks ||
+              pkt->equal_run == remaining_packets)) {
+        rtp_twcc_write_run_length_chunk (packet_chunks,
+            pkt->status, pkt->equal_run);
+        i += pkt->equal_run - 1;
+        continue;
+      }
+    }
+
+    GST_LOG ("i=%u: Writing a %u-bit vector of status: %u",
+        i, symbol_size, pkt->status);
+    rtp_twcc_write_status_vector_chunk (&writer, pkt);
+  }
+  chunk_bit_writer_flush (&writer);
+}
+
+static void
+rtp_twcc_manager_add_fci (RTPTWCCManager * twcc, GstRTCPPacket * packet)
+{
+  RecvPacket *first, *last, *prev;
+  guint16 packet_count;
+  GstClockTime base_time;
+  GstClockTime ts_rounded;
+  guint i;
+  GArray *packet_chunks = g_array_new (FALSE, FALSE, 2);
+  RTPTWCCHeader header;
+  guint header_size = sizeof (RTPTWCCHeader);
+  guint packet_chunks_size;
+  guint recv_deltas_size = 0;
+  guint16 fci_length;
+  guint16 fci_chunks;
+  guint8 *fci_data;
+  guint8 *fci_data_ptr;
+  RunLengthHelper rlh = { NULL };
+  guint symbol_size = 1;
+  GstClockTimeDiff delta_ts;
+  gint64 delta_ts_rounded;
+
+  g_array_sort (twcc->recv_packets, _twcc_seqnum_sort);
+
+  /* get first and last packet */
+  first = &g_array_index (twcc->recv_packets, RecvPacket, 0);
+  last =
+      &g_array_index (twcc->recv_packets, RecvPacket,
+      twcc->recv_packets->len - 1);
+
+  packet_count = last->seqnum - first->seqnum + 1;
+  base_time = first->ts / REF_TIME_UNIT;
+
+  GST_WRITE_UINT16_BE (header.base_seqnum, first->seqnum);
+  GST_WRITE_UINT16_BE (header.packet_count, packet_count);
+  GST_WRITE_UINT24_BE (header.base_time, base_time);
+  GST_WRITE_UINT8 (header.fb_pkt_count, twcc->fb_pkt_count);
+
+  base_time *= REF_TIME_UNIT;
+  ts_rounded = base_time;
+
+  GST_DEBUG ("Created TWCC feedback: base_seqnum: #%u, packet_count: %u, "
+      "base_time %" GST_TIME_FORMAT " fb_pkt_count: %u",
+      first->seqnum, packet_count, GST_TIME_ARGS (base_time),
+      twcc->fb_pkt_count);
+
+  twcc->fb_pkt_count++;
+  twcc->expected_recv_seqnum = first->seqnum + packet_count;
+
+  /* calculate all deltas and check for gaps etc */
+  prev = first;
+  for (i = 0; i < twcc->recv_packets->len; i++) {
+    RecvPacket *pkt = &g_array_index (twcc->recv_packets, RecvPacket, i);
+    if (i != 0) {
+      pkt->missing_run = pkt->seqnum - prev->seqnum - 1;
+    }
+
+    delta_ts = GST_CLOCK_DIFF (ts_rounded, pkt->ts);
+    pkt->delta = delta_ts / DELTA_UNIT;
+    delta_ts_rounded = pkt->delta * DELTA_UNIT;
+    ts_rounded += delta_ts_rounded;
+
+    if (delta_ts_rounded < 0 || delta_ts_rounded > MAX_TS_DELTA) {
+      pkt->status = RTP_TWCC_PACKET_STATUS_LARGE_NEGATIVE_DELTA;
+      recv_deltas_size += 2;
+      symbol_size = 2;
+    } else {
+      pkt->status = RTP_TWCC_PACKET_STATUS_SMALL_DELTA;
+      recv_deltas_size += 1;
+    }
+    run_lenght_helper_update (&rlh, pkt);
+
+    GST_LOG ("pkt: #%u, ts: %" GST_TIME_FORMAT
+        " ts_rounded: %" GST_TIME_FORMAT
+        " delta_ts: %" GST_STIME_FORMAT
+        " delta_ts_rounded: %" GST_STIME_FORMAT
+        " missing_run: %u, status: %u", pkt->seqnum,
+        GST_TIME_ARGS (pkt->ts), GST_TIME_ARGS (ts_rounded),
+        GST_STIME_ARGS (delta_ts), GST_STIME_ARGS (delta_ts_rounded),
+        pkt->missing_run, pkt->status);
+    prev = pkt;
+  }
+
+  rtp_twcc_write_chunks (packet_chunks, twcc->recv_packets, symbol_size);
+
+  packet_chunks_size = packet_chunks->len * 2;
+  fci_length = header_size + packet_chunks_size + recv_deltas_size;
+  fci_chunks = (fci_length - 1) / sizeof (guint32) + 1;
+
+  if (!gst_rtcp_packet_fb_set_fci_length (packet, fci_chunks)) {
+    GST_ERROR ("Could not fit: %u packets", packet_count);
+    g_assert_not_reached ();
+  }
+
+  fci_data = gst_rtcp_packet_fb_get_fci (packet);
+  fci_data_ptr = fci_data;
+
+  memcpy (fci_data_ptr, &header, header_size);
+  fci_data_ptr += header_size;
+
+  memcpy (fci_data_ptr, packet_chunks->data, packet_chunks_size);
+  fci_data_ptr += packet_chunks_size;
+
+  rtp_twcc_write_recv_deltas (fci_data_ptr, twcc->recv_packets);
+
+  GST_MEMDUMP ("twcc-header:", (guint8 *) & header, header_size);
+  GST_MEMDUMP ("packet-chunks:", (guint8 *) packet_chunks->data,
+      packet_chunks_size);
+  GST_MEMDUMP ("full fci:", fci_data, fci_length);
+
+  g_array_unref (packet_chunks);
+  g_array_set_size (twcc->recv_packets, 0);
+}
+
+static void
+rtp_twcc_manager_create_feedback (RTPTWCCManager * twcc)
+{
+  GstBuffer *buf;
+  GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
+  GstRTCPPacket packet;
+
+  buf = gst_rtcp_buffer_new (twcc->mtu);
+
+  gst_rtcp_buffer_map (buf, GST_MAP_READWRITE, &rtcp);
+
+  gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_RTPFB, &packet);
+
+  gst_rtcp_packet_fb_set_type (&packet, GST_RTCP_RTPFB_TYPE_TWCC);
+  if (twcc->recv_sender_ssrc != 1)
+    gst_rtcp_packet_fb_set_sender_ssrc (&packet, twcc->recv_sender_ssrc);
+  gst_rtcp_packet_fb_set_media_ssrc (&packet, twcc->recv_media_ssrc);
+
+  rtp_twcc_manager_add_fci (twcc, &packet);
+
+  gst_rtcp_buffer_unmap (&rtcp);
+
+  g_queue_push_tail (twcc->rtcp_buffers, buf);
+}
+
+/* we have calculated a (very pessimistic) max-packets per RTCP feedback,
+   so this is to make sure we don't exceed that */
+static gboolean
+_exceeds_max_packets (RTPTWCCManager * twcc, guint16 seqnum)
+{
+  RecvPacket *first, *last;
+  guint16 packet_count;
+
+  if (twcc->recv_packets->len == 0)
+    return FALSE;
+
+  /* find the delta betwen first stored packet and this seqnum */
+  first = &g_array_index (twcc->recv_packets, RecvPacket, 0);
+  packet_count = seqnum - first->seqnum + 1;
+  if (packet_count > twcc->max_packets_per_rtcp)
+    return TRUE;
+
+  /* then find the delta between last stored packet and this seqnum */
+  last =
+      &g_array_index (twcc->recv_packets, RecvPacket,
+      twcc->recv_packets->len - 1);
+  packet_count = seqnum - (last->seqnum + 1);
+  if (packet_count > twcc->max_packets_per_rtcp)
+    return TRUE;
+
+  return FALSE;
+}
+
+/* in this case we could have lost the packet with the marker bit,
+   so with a large (30) amount of packets, lost packets and still no marker,
+   we send a feedback anyway */
+static gboolean
+_many_packets_some_lost (RTPTWCCManager * twcc, guint16 seqnum)
+{
+  RecvPacket *first;
+  guint16 packet_count;
+  guint received_packets = twcc->recv_packets->len;
+  if (received_packets == 0)
+    return FALSE;
+
+  first = &g_array_index (twcc->recv_packets, RecvPacket, 0);
+  packet_count = seqnum - first->seqnum + 1;
+  /* packet-count larger than recevied-packets means we have lost packets */
+  if (packet_count >= 30 && packet_count > received_packets)
+    return TRUE;
+
+  return FALSE;
+}
+
+gboolean
+rtp_twcc_manager_recv_packet (RTPTWCCManager * twcc,
+    guint16 seqnum, RTPPacketInfo * pinfo)
+{
+  gboolean send_feedback = FALSE;
+  RecvPacket packet;
+  gint32 diff;
+
+  /* if this packet would exceed the capacity of our MTU, we create a feedback
+     with the current packets, and start over with this one */
+  if (_exceeds_max_packets (twcc, seqnum)) {
+    GST_INFO ("twcc-seqnum: %u would overflow max packets: %u, create feedback"
+        " with current packets", seqnum, twcc->max_packets_per_rtcp);
+    rtp_twcc_manager_create_feedback (twcc);
+    send_feedback = TRUE;
+  }
+
+  /* we can have multiple ssrcs here, so just pick the first one */
+  if (twcc->recv_media_ssrc == -1)
+    twcc->recv_media_ssrc = pinfo->ssrc;
+
+  /* check if we are reordered, and treat it as lost if we already sent
+     a feedback msg with a higher seqnum. If the diff is huge, treat
+     it as a restart of a stream */
+  diff = (gint32) seqnum - (gint32) twcc->expected_recv_seqnum;
+  if (twcc->fb_pkt_count > 0 && diff < 0 && diff > -1000) {
+    GST_INFO ("Received out of order packet (%u after %u), treating as lost",
+        seqnum, twcc->expected_recv_seqnum);
+    return FALSE;
+  }
+
+  /* store the packet for Transport-wide RTCP feedback message */
+  recv_packet_init (&packet, seqnum, pinfo);
+  g_array_append_val (twcc->recv_packets, packet);
+  twcc->last_seqnum = seqnum;
+  GST_LOG ("Receive: twcc-seqnum: %u, marker: %d, ts: %" GST_TIME_FORMAT,
+      seqnum, pinfo->marker, GST_TIME_ARGS (pinfo->running_time));
+
+  if (pinfo->marker || _many_packets_some_lost (twcc, seqnum)) {
+    rtp_twcc_manager_create_feedback (twcc);
+    send_feedback = TRUE;
+  }
+
+  return send_feedback;
+}
+
+static void
+_change_rtcp_fb_sender_ssrc (GstBuffer * buf, guint32 sender_ssrc)
+{
+  GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
+  GstRTCPPacket packet;
+  gst_rtcp_buffer_map (buf, GST_MAP_READWRITE, &rtcp);
+  gst_rtcp_buffer_get_first_packet (&rtcp, &packet);
+  gst_rtcp_packet_fb_set_sender_ssrc (&packet, sender_ssrc);
+  gst_rtcp_buffer_unmap (&rtcp);
+}
+
+GstBuffer *
+rtp_twcc_manager_get_feedback (RTPTWCCManager * twcc, guint sender_ssrc)
+{
+  GstBuffer *buf;
+  buf = g_queue_pop_head (twcc->rtcp_buffers);
+
+  if (buf && twcc->recv_sender_ssrc != sender_ssrc) {
+    _change_rtcp_fb_sender_ssrc (buf, sender_ssrc);
+    twcc->recv_sender_ssrc = sender_ssrc;
+  }
+
+  return buf;
+}
+
+static void
+sent_packet_init (SentPacket * packet, guint16 seqnum, RTPPacketInfo * pinfo)
+{
+  packet->seqnum = seqnum;
+  packet->ts = pinfo->running_time;
+  packet->size = pinfo->payload_len;
+  packet->remote_ts = GST_CLOCK_TIME_NONE;
+  packet->socket_ts = GST_CLOCK_TIME_NONE;
+  packet->lost = FALSE;
+}
+
+void
+rtp_twcc_manager_send_packet (RTPTWCCManager * twcc,
+    guint16 seqnum, RTPPacketInfo * pinfo)
+{
+  SentPacket packet;
+  sent_packet_init (&packet, seqnum, pinfo);
+  g_array_append_val (twcc->sent_packets, packet);
+
+  GST_LOG ("Send: twcc-seqnum: %u, marker: %d, ts: %" GST_TIME_FORMAT,
+      seqnum, pinfo->marker, GST_TIME_ARGS (pinfo->running_time));
+}
+
+void
+rtp_twcc_manager_set_send_packet_ts (RTPTWCCManager * twcc,
+    guint packet_id, GstClockTime ts)
+{
+  SentPacket *pkt = NULL;
+  pkt = &g_array_index (twcc->sent_packets, SentPacket, packet_id);
+  if (pkt) {
+    pkt->socket_ts = ts;
+    GST_DEBUG ("assigning: pkt-id: %u to packet: %u", packet_id, pkt->seqnum);
+  }
+}
+
+static void
+_add_twcc_packet (GArray * twcc_packets, guint16 seqnum, guint status)
+{
+  RTPTWCCPacket packet;
+  memset (&packet, 0, sizeof (RTPTWCCPacket));
+  packet.local_ts = GST_CLOCK_TIME_NONE;
+  packet.remote_ts = GST_CLOCK_TIME_NONE;
+  packet.local_delta = GST_CLOCK_STIME_NONE;
+  packet.remote_delta = GST_CLOCK_STIME_NONE;
+  packet.delta_delta = GST_CLOCK_STIME_NONE;
+  packet.seqnum = seqnum;
+  packet.status = status;
+  g_array_append_val (twcc_packets, packet);
+}
+
+static guint
+_parse_run_length_chunk (GstBitReader * reader, GArray * twcc_packets,
+    guint16 seqnum_offset, guint remaining_packets)
+{
+  guint run_length;
+  guint8 status_code;
+
+  gst_bit_reader_get_bits_uint8 (reader, &status_code, 2);
+
+  run_length = *(guint16 *) reader->data & ~0xE0;       /* mask out the 3 last bits */
+  run_length = MIN (remaining_packets, GST_READ_UINT16_BE (&run_length));
+
+  for (guint i = 0; i < run_length; i++) {
+    _add_twcc_packet (twcc_packets, seqnum_offset + i, status_code);
+  }
+
+  return run_length;
+}
+
+static guint
+_parse_status_vector_chunk (GstBitReader * reader, GArray * twcc_packets,
+    guint16 seqnum_offset, guint remaining_packets)
+{
+  guint8 symbol_size;
+  guint num_bits;
+
+  gst_bit_reader_get_bits_uint8 (reader, &symbol_size, 1);
+  symbol_size += 1;
+  num_bits = MIN (remaining_packets, 14 / symbol_size);
+
+  for (guint i = 0; i < num_bits; i++) {
+    guint8 status_code;
+    if (gst_bit_reader_get_bits_uint8 (reader, &status_code, symbol_size))
+      _add_twcc_packet (twcc_packets, seqnum_offset + i, status_code);
+  }
+
+  return num_bits;
+}
+
+/* Remove all locally stored packets that has been reported
+   back to us */
+static void
+_prune_sent_packets (RTPTWCCManager * twcc, GArray * twcc_packets)
+{
+  SentPacket *first;
+  RTPTWCCPacket *last;
+  guint16 last_idx;
+
+  if (twcc_packets->len == 0 || twcc->sent_packets->len == 0)
+    return;
+
+  first = &g_array_index (twcc->sent_packets, SentPacket, 0);
+  last = &g_array_index (twcc_packets, RTPTWCCPacket, twcc_packets->len - 1);
+
+  last_idx = last->seqnum - first->seqnum;
+
+  if (last_idx >= twcc->sent_packets->len)
+    g_array_remove_range (twcc->sent_packets, 0, last_idx);
+}
+
+static void
+_check_for_lost_packets (RTPTWCCManager * twcc, GArray * twcc_packets,
+    guint16 base_seqnum, guint16 packet_count, guint8 fb_pkt_count)
+{
+  guint packets_lost;
+  guint i;
+
+  /* first packet */
+  if (twcc->first_fci_parse) {
+    twcc->first_fci_parse = FALSE;
+    goto done;
+  }
+
+  /* we have gone backwards, don't reset the expectations,
+     but process the packet nonetheless */
+  if (fb_pkt_count < twcc->expected_parsed_fb_pkt_count) {
+    GST_WARNING ("feedback packet count going backwards (%u < %u)",
+        fb_pkt_count, twcc->expected_parsed_fb_pkt_count);
+    return;
+  }
+
+  /* we have jumped forwards, reset expectations, but don't trigger
+     lost packets in case the missing fb-packet(s) arrive later */
+  if (fb_pkt_count > twcc->expected_parsed_fb_pkt_count) {
+    GST_WARNING ("feedback packet count jumped ahead (%u > %u)",
+        fb_pkt_count, twcc->expected_parsed_fb_pkt_count);
+    goto done;
+  }
+
+  packets_lost = base_seqnum - twcc->expected_parsed_seqnum;
+  for (i = 0; i < packets_lost; i++) {
+    _add_twcc_packet (twcc_packets, twcc->expected_parsed_seqnum + i,
+        RTP_TWCC_PACKET_STATUS_NOT_RECV);
+  }
+
+done:
+  twcc->expected_parsed_seqnum = base_seqnum + packet_count;
+  twcc->expected_parsed_fb_pkt_count = fb_pkt_count + 1;
+  return;
+}
+
+GArray *
+rtp_twcc_manager_parse_fci (RTPTWCCManager * twcc,
+    guint8 * fci_data, guint fci_length)
+{
+  GArray *twcc_packets;
+  guint16 base_seqnum;
+  guint16 packet_count;
+  GstClockTime base_time;
+  GstClockTime ts_rounded;
+  guint8 fb_pkt_count;
+  guint packets_parsed = 0;
+  guint fci_parsed;
+  guint i;
+  SentPacket *first_sent_pkt = NULL;
+
+  if (fci_length < 10) {
+    GST_WARNING ("Malformed TWCC RTCP feedback packet");
+    return NULL;
+  }
+
+  base_seqnum = GST_READ_UINT16_BE (&fci_data[0]);
+  packet_count = GST_READ_UINT16_BE (&fci_data[2]);
+  base_time = GST_READ_UINT24_BE (&fci_data[4]) * REF_TIME_UNIT;
+  fb_pkt_count = fci_data[7];
+
+  GST_DEBUG ("Parsed TWCC feedback: base_seqnum: #%u, packet_count: %u, "
+      "base_time %" GST_TIME_FORMAT " fb_pkt_count: %u",
+      base_seqnum, packet_count, GST_TIME_ARGS (base_time), fb_pkt_count);
+
+  twcc_packets = g_array_sized_new (FALSE, FALSE,
+      sizeof (RTPTWCCPacket), packet_count);
+
+  _check_for_lost_packets (twcc, twcc_packets,
+      base_seqnum, packet_count, fb_pkt_count);
+
+  fci_parsed = 8;
+  while (packets_parsed < packet_count && (fci_parsed + 1) < fci_length) {
+    GstBitReader reader = GST_BIT_READER_INIT (&fci_data[fci_parsed], 2);
+    guint8 chunk_type;
+    guint seqnum_offset = base_seqnum + packets_parsed;
+    guint remaining_packets = packet_count - packets_parsed;
+
+    gst_bit_reader_get_bits_uint8 (&reader, &chunk_type, 1);
+
+    if (chunk_type == RTP_TWCC_CHUNK_TYPE_RUN_LENGTH) {
+      packets_parsed += _parse_run_length_chunk (&reader,
+          twcc_packets, seqnum_offset, remaining_packets);
+    } else {
+      packets_parsed += _parse_status_vector_chunk (&reader,
+          twcc_packets, seqnum_offset, remaining_packets);
+    }
+    fci_parsed += 2;
+  }
+
+  if (twcc->sent_packets->len > 0)
+    first_sent_pkt = &g_array_index (twcc->sent_packets, SentPacket, 0);
+
+  ts_rounded = base_time;
+  for (i = 0; i < twcc_packets->len; i++) {
+    RTPTWCCPacket *pkt = &g_array_index (twcc_packets, RTPTWCCPacket, i);
+    gint16 delta = 0;
+    GstClockTimeDiff delta_ts;
+
+    if (pkt->status == RTP_TWCC_PACKET_STATUS_SMALL_DELTA) {
+      delta = fci_data[fci_parsed];
+      fci_parsed += 1;
+    } else if (pkt->status == RTP_TWCC_PACKET_STATUS_LARGE_NEGATIVE_DELTA) {
+      delta = GST_READ_UINT16_BE (&fci_data[fci_parsed]);
+      fci_parsed += 2;
+    }
+
+    if (fci_parsed > fci_length) {
+      GST_WARNING ("Malformed TWCC RTCP feedback packet");
+      g_array_set_size (twcc_packets, 0);
+      break;
+    }
+
+    if (pkt->status != RTP_TWCC_PACKET_STATUS_NOT_RECV) {
+      delta_ts = delta * DELTA_UNIT;
+      ts_rounded += delta_ts;
+      pkt->remote_ts = ts_rounded;
+
+      GST_LOG ("pkt: #%u, remote_ts: %" GST_TIME_FORMAT
+          " delta_ts: %" GST_STIME_FORMAT
+          " status: %u", pkt->seqnum,
+          GST_TIME_ARGS (pkt->remote_ts), GST_STIME_ARGS (delta_ts),
+          pkt->status);
+    }
+
+    if (first_sent_pkt) {
+      SentPacket *found = NULL;
+      guint16 sent_idx = pkt->seqnum - first_sent_pkt->seqnum;
+      if (sent_idx < twcc->sent_packets->len)
+        found = &g_array_index (twcc->sent_packets, SentPacket, sent_idx);
+      if (found && found->seqnum == pkt->seqnum) {
+        if (GST_CLOCK_TIME_IS_VALID (found->socket_ts)) {
+          pkt->local_ts = found->socket_ts;
+        } else {
+          pkt->local_ts = found->ts;
+        }
+        pkt->size = found->size;
+
+        GST_LOG ("matching pkt: #%u with local_ts: %" GST_TIME_FORMAT
+            " size: %u", pkt->seqnum, GST_TIME_ARGS (pkt->local_ts), pkt->size);
+      }
+    }
+  }
+
+  _prune_sent_packets (twcc, twcc_packets);
+
+  return twcc_packets;
+}
diff --git a/gst/rtpmanager/rtptwcc.h b/gst/rtpmanager/rtptwcc.h
new file mode 100644 (file)
index 0000000..39f9d58
--- /dev/null
@@ -0,0 +1,70 @@
+/* GStreamer
+ * Copyright (C) 2019 Pexip (http://pexip.com/)
+ *   @author: Havard Graff <havard@pexip.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __RTP_TWCC_H__
+#define __RTP_TWCC_H__
+
+#include <gst/gst.h>
+#include <gst/rtp/rtp.h>
+#include "rtpstats.h"
+
+typedef struct _RTPTWCCManager RTPTWCCManager;
+typedef struct _RTPTWCCPacket RTPTWCCPacket;
+typedef enum _RTPTWCCPacketStatus RTPTWCCPacketStatus;
+
+enum _RTPTWCCPacketStatus
+{
+  RTP_TWCC_PACKET_STATUS_NOT_RECV = 0,
+  RTP_TWCC_PACKET_STATUS_SMALL_DELTA = 1,
+  RTP_TWCC_PACKET_STATUS_LARGE_NEGATIVE_DELTA = 2,
+};
+
+struct _RTPTWCCPacket
+{
+  GstClockTime local_ts;
+  GstClockTime remote_ts;
+  GstClockTimeDiff local_delta;
+  GstClockTimeDiff remote_delta;
+  GstClockTimeDiff delta_delta;
+  RTPTWCCPacketStatus status;
+  guint16 seqnum;
+  guint size;
+};
+
+RTPTWCCManager * rtp_twcc_manager_new (guint mtu);
+void rtp_twcc_manager_free (RTPTWCCManager * twcc);
+
+void rtp_twcc_manager_set_mtu (RTPTWCCManager * twcc, guint mtu);
+
+gboolean rtp_twcc_manager_recv_packet (RTPTWCCManager * twcc,
+    guint16 seqnum, RTPPacketInfo * pinfo);
+
+void rtp_twcc_manager_send_packet (RTPTWCCManager * twcc,
+    guint16 seqnum, RTPPacketInfo * pinfo);
+void rtp_twcc_manager_set_send_packet_ts (RTPTWCCManager * twcc,
+    guint packet_id, GstClockTime ts);
+
+GstBuffer * rtp_twcc_manager_get_feedback (RTPTWCCManager * twcc,
+    guint32 sender_ssrc);
+
+GArray * rtp_twcc_manager_parse_fci (RTPTWCCManager * twcc,
+    guint8 * fci_data, guint fci_length);
+
+#endif /* __RTP_TWCC_H__ */
index 1f150b5..da1a1fb 100644 (file)
 #define TEST_BUF_SSRC 0x01BADBAD
 #define TEST_BUF_MS  20
 #define TEST_BUF_DURATION (TEST_BUF_MS * GST_MSECOND)
-#define TEST_BUF_SIZE (64000 * TEST_BUF_MS / 1000)
+#define TEST_BUF_BPS 512000
+#define TEST_BUF_SIZE (TEST_BUF_BPS * TEST_BUF_MS / (1000 * 8))
 #define TEST_RTP_TS_DURATION (TEST_BUF_CLOCK_RATE * TEST_BUF_MS / 1000)
 
+#define TEST_TWCC_EXT_ID 5
+#define TWCC_EXTMAP_STR "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01"
+
 static GstCaps *
 generate_caps (void)
 {
@@ -49,8 +53,9 @@ generate_caps (void)
 }
 
 static GstBuffer *
-generate_test_buffer_full (GstClockTime dts,
-    guint seq_num, guint32 rtp_ts, guint ssrc)
+generate_test_buffer_full (GstClockTime ts,
+    guint seqnum, guint32 rtp_ts, guint ssrc,
+    gboolean marker_bit, guint8 twcc_ext_id, guint16 twcc_seqnum)
 {
   GstBuffer *buf;
   guint8 *payload;
@@ -58,30 +63,57 @@ generate_test_buffer_full (GstClockTime dts,
   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
 
   buf = gst_rtp_buffer_new_allocate (TEST_BUF_SIZE, 0, 0);
-  GST_BUFFER_DTS (buf) = dts;
+  GST_BUFFER_PTS (buf) = ts;
+  GST_BUFFER_DTS (buf) = ts;
 
   gst_rtp_buffer_map (buf, GST_MAP_READWRITE, &rtp);
   gst_rtp_buffer_set_payload_type (&rtp, TEST_BUF_PT);
-  gst_rtp_buffer_set_seq (&rtp, seq_num);
+  gst_rtp_buffer_set_seq (&rtp, seqnum);
   gst_rtp_buffer_set_timestamp (&rtp, rtp_ts);
   gst_rtp_buffer_set_ssrc (&rtp, ssrc);
+  gst_rtp_buffer_set_marker (&rtp, marker_bit);
 
   payload = gst_rtp_buffer_get_payload (&rtp);
   for (i = 0; i < TEST_BUF_SIZE; i++)
     payload[i] = 0xff;
 
+  if (twcc_ext_id > 0) {
+    guint8 twcc_seqnum_be[2];
+    GST_WRITE_UINT16_BE (twcc_seqnum_be, twcc_seqnum);
+    gst_rtp_buffer_add_extension_onebyte_header (&rtp, twcc_ext_id,
+        twcc_seqnum_be, sizeof (twcc_seqnum_be));
+  }
+
   gst_rtp_buffer_unmap (&rtp);
 
   return buf;
 }
 
 static GstBuffer *
-generate_test_buffer (guint seq_num, guint ssrc)
+generate_test_buffer (guint seqnum, guint ssrc)
+{
+  return generate_test_buffer_full (seqnum * TEST_BUF_DURATION,
+      seqnum, seqnum * TEST_RTP_TS_DURATION, ssrc, FALSE, 0, 0);
+}
+
+static GstBuffer *
+generate_twcc_recv_buffer (guint seqnum,
+    GstClockTime arrival_time, gboolean marker_bit)
 {
-  return generate_test_buffer_full (seq_num * TEST_BUF_DURATION,
-      seq_num, seq_num * TEST_RTP_TS_DURATION, ssrc);
+  return generate_test_buffer_full (arrival_time, seqnum,
+      seqnum * TEST_RTP_TS_DURATION, TEST_BUF_SSRC, marker_bit,
+      TEST_TWCC_EXT_ID, seqnum);
 }
 
+static GstBuffer *
+generate_twcc_send_buffer (guint seqnum, gboolean marker_bit)
+{
+  return generate_test_buffer_full (seqnum * TEST_BUF_DURATION,
+      seqnum, seqnum * TEST_RTP_TS_DURATION, TEST_BUF_SSRC, marker_bit,
+      TEST_TWCC_EXT_ID, seqnum);
+}
+
+
 typedef struct
 {
   GstHarness *send_rtp_h;
@@ -94,6 +126,8 @@ typedef struct
   GstCaps *caps;
 
   gboolean running;
+  GMutex lock;
+  GstStructure *last_twcc_stats;
 } SessionHarness;
 
 static GstCaps *
@@ -103,11 +137,38 @@ _pt_map_requested (GstElement * element, guint pt, gpointer data)
   return gst_caps_copy (h->caps);
 }
 
+static void
+_notify_twcc_stats (GParamSpec * spec G_GNUC_UNUSED,
+    GObject * object G_GNUC_UNUSED, gpointer data)
+{
+  SessionHarness *h = data;
+  GstStructure *stats;
+  g_object_get (h->session, "twcc-stats", &stats, NULL);
+
+  g_mutex_lock (&h->lock);
+  if (h->last_twcc_stats)
+    gst_structure_free (h->last_twcc_stats);
+  h->last_twcc_stats = stats;
+  g_mutex_unlock (&h->lock);
+}
+
+static GstStructure *
+session_harness_get_last_twcc_stats (SessionHarness * h)
+{
+  GstStructure *ret = NULL;
+  g_mutex_lock (&h->lock);
+  if (h->last_twcc_stats)
+    ret = gst_structure_copy (h->last_twcc_stats);
+  g_mutex_unlock (&h->lock);
+  return ret;
+}
+
 static SessionHarness *
 session_harness_new (void)
 {
   SessionHarness *h = g_new0 (SessionHarness, 1);
   h->caps = generate_caps ();
+  g_mutex_init (&h->lock);
 
   h->testclock = GST_TEST_CLOCK_CAST (gst_test_clock_new ());
   gst_system_clock_set_default (GST_CLOCK_CAST (h->testclock));
@@ -130,6 +191,9 @@ session_harness_new (void)
   g_signal_connect (h->session, "request-pt-map",
       (GCallback) _pt_map_requested, h);
 
+  g_signal_connect (h->session, "notify::twcc-stats",
+      (GCallback) _notify_twcc_stats, h);
+
   g_object_get (h->session, "internal-session", &h->internal_session, NULL);
 
   return h;
@@ -147,6 +211,11 @@ session_harness_free (SessionHarness * h)
   gst_harness_teardown (h->recv_rtp_h);
   gst_harness_teardown (h->send_rtp_h);
 
+  g_mutex_clear (&h->lock);
+
+  if (h->last_twcc_stats)
+    gst_structure_free (h->last_twcc_stats);
+
   g_object_unref (h->internal_session);
   gst_object_unref (h->session);
   g_free (h);
@@ -158,6 +227,12 @@ session_harness_send_rtp (SessionHarness * h, GstBuffer * buf)
   return gst_harness_push (h->send_rtp_h, buf);
 }
 
+static GstBuffer *
+session_harness_pull_send_rtp (SessionHarness * h)
+{
+  return gst_harness_pull (h->send_rtp_h);
+}
+
 static GstFlowReturn
 session_harness_recv_rtp (SessionHarness * h, GstBuffer * buf)
 {
@@ -254,6 +329,29 @@ session_harness_rtp_retransmission_request (SessionHarness * h,
       gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, s));
 }
 
+static void
+_add_twcc_field_to_caps (GstCaps * caps, guint8 ext_id)
+{
+  gchar *name = g_strdup_printf ("extmap-%u", ext_id);
+  gst_caps_set_simple (caps, name, G_TYPE_STRING, TWCC_EXTMAP_STR, NULL);
+  g_free (name);
+}
+
+static void
+session_harness_set_twcc_recv_ext_id (SessionHarness * h, guint8 ext_id)
+{
+  _add_twcc_field_to_caps (h->caps, ext_id);
+  g_signal_emit_by_name (h->session, "clear-pt-map");
+}
+
+static void
+session_harness_set_twcc_send_ext_id (SessionHarness * h, guint8 ext_id)
+{
+  GstCaps *caps = gst_caps_copy (h->caps);
+  _add_twcc_field_to_caps (caps, ext_id);
+  gst_harness_set_src_caps (h->send_rtp_h, caps);
+}
+
 GST_START_TEST (test_multiple_ssrc_rr)
 {
   SessionHarness *h = session_harness_new ();
@@ -2399,7 +2497,7 @@ generate_stepped_ts_buffer (guint i, gboolean stepped)
       GST_TIME_ARGS (gst_util_uint64_scale_int (GST_SECOND, ts,
               TEST_BUF_CLOCK_RATE)), i);
 
-  buf = generate_test_buffer_full (i * GST_MSECOND, i, ts, 0xAAAA);
+  buf = generate_test_buffer_full (i * GST_MSECOND, i, ts, 0xAAAA, FALSE, 0, 0);
   return buf;
 }
 
@@ -2463,6 +2561,1054 @@ GST_START_TEST (test_stepped_packet_rate)
 
 GST_END_TEST;
 
+
+/********************* TWCC-tests *********************/
+
+static GstRTCPFBType
+_gst_buffer_get_rtcp_fbtype (GstBuffer * buf)
+{
+  GstRTCPFBType ret = GST_RTCP_FB_TYPE_INVALID;
+  GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
+  GstRTCPPacket packet;
+
+  if (!gst_rtcp_buffer_validate_reduced (buf))
+    return ret;
+
+  if (!gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp))
+    return ret;
+
+  if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
+    goto done;
+
+  if (GST_RTCP_TYPE_RTPFB != gst_rtcp_packet_get_type (&packet))
+    goto done;
+
+  ret = gst_rtcp_packet_fb_get_type (&packet);
+
+done:
+  gst_rtcp_buffer_unmap (&rtcp);
+  return ret;
+}
+
+static GstBuffer *
+session_harness_pull_twcc_rtcp (SessionHarness * h)
+{
+  GstBuffer *ret = NULL;
+
+  while (ret == NULL) {
+    GstBuffer *buf = session_harness_pull_rtcp (h);
+    if (GST_RTCP_RTPFB_TYPE_TWCC == _gst_buffer_get_rtcp_fbtype (buf)) {
+      ret = buf;
+    } else {
+      gst_buffer_unref (buf);
+    }
+  }
+  return ret;
+}
+
+typedef struct
+{
+  guint16 base_seqnum;
+  guint16 num_packets;
+  GstClockTime base_time;
+  GstClockTime duration;
+} TWCCTestData;
+
+static TWCCTestData twcc_header_and_run_lenght_test_data[] = {
+  {0, 10, 0, 33 * GST_MSECOND},
+  {65530, 12, 37 * 64 * GST_MSECOND, 10 * GST_MSECOND}, /* seqnum wrap */
+  {99, 200, 1024 * 64 * GST_MSECOND, 10 * GST_MSECOND}, /* many packets */
+  {20000, 23, 0, 250 * GST_USECOND},    /* minimal duration */
+  {56000, 15, 1000 * 64 * GST_MSECOND, 10 * GST_MSECOND},       /* timestamp offset */
+};
+
+GST_START_TEST (test_twcc_header_and_run_length)
+{
+  SessionHarness *h = session_harness_new ();
+  gint i;
+  GstFlowReturn res;
+  GstBuffer *buf;
+  GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
+  GstRTCPPacket packet;
+  guint8 *fci_data;
+  guint16 run_length;
+
+  TWCCTestData *td = &twcc_header_and_run_lenght_test_data[__i__];
+
+  /* enable twcc */
+  session_harness_set_twcc_recv_ext_id (h, TEST_TWCC_EXT_ID);
+
+  /* receive some buffers */
+  for (i = 0; i < td->num_packets; i++) {
+    gboolean last_packet = i == (td->num_packets - 1);
+
+    buf = generate_twcc_recv_buffer (i + td->base_seqnum,
+        td->base_time + i * td->duration, last_packet);
+    res = session_harness_recv_rtp (h, buf);
+    fail_unless_equals_int (GST_FLOW_OK, res);
+  }
+
+  session_harness_produce_rtcp (h, 1);
+  buf = session_harness_pull_twcc_rtcp (h);
+  fail_unless (buf);
+
+  gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp);
+  fail_unless (gst_rtcp_buffer_get_first_packet (&rtcp, &packet));
+
+  fci_data = gst_rtcp_packet_fb_get_fci (&packet);
+
+  /* base seqnum */
+  fail_unless_equals_int (td->base_seqnum, GST_READ_UINT16_BE (&fci_data[0]));
+
+  /*  packet count */
+  fail_unless_equals_int (td->num_packets, GST_READ_UINT16_BE (&fci_data[2]));
+
+  /* reference time (in 64ms units) */
+  fail_unless_equals_int (td->base_time,
+      GST_READ_UINT24_BE (&fci_data[4]) * 64 * GST_MSECOND);
+
+  /* feedback packet number */
+  fail_unless_equals_int (0, fci_data[7]);
+
+  /* run-length coding */
+  fail_unless_equals_int (0, fci_data[8] & 0x80);
+
+  /* status: small-delta */
+  fail_unless_equals_int (0x20, fci_data[8] & 0x60);
+
+  /* packets in run_length */
+  run_length = *((guint16 *) & fci_data[8]);
+  run_length = run_length & ~0xE0;      /* mask out the 3 last bits */
+  fail_unless_equals_int (td->num_packets, GST_READ_UINT16_BE (&run_length));
+
+  /* first recv-delta always 0 */
+  fail_unless_equals_int (0, fci_data[10]);
+
+  /* following recv-delta equal to duration (in 250us units) */
+  fail_unless_equals_clocktime (td->duration, fci_data[11] * 250 * GST_USECOND);
+
+  gst_rtcp_buffer_unmap (&rtcp);
+  gst_buffer_unref (buf);
+
+  session_harness_free (h);
+}
+
+GST_END_TEST;
+
+typedef struct
+{
+  guint16 seqnum;
+  GstClockTime timestamp;
+  gboolean marker;
+} TWCCPacket;
+
+#define twcc_push_packets(h, packets)                                          \
+G_STMT_START {                                                                 \
+  guint i;                                                                     \
+  session_harness_set_twcc_recv_ext_id ((h), TEST_TWCC_EXT_ID);                \
+  for (i = 0; i < G_N_ELEMENTS ((packets)); i++) {                             \
+    TWCCPacket *twcc_pkt = &(packets)[i];                                      \
+    fail_unless_equals_int (GST_FLOW_OK,                                       \
+        session_harness_recv_rtp ((h),                                         \
+            generate_twcc_recv_buffer (twcc_pkt->seqnum,                       \
+                twcc_pkt->timestamp, twcc_pkt->marker)));                      \
+  }                                                                            \
+} G_STMT_END
+
+#define twcc_verify_fci(buf, exp_fci)                                          \
+G_STMT_START {                                                                 \
+  GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;                                   \
+  GstRTCPPacket packet;                                                        \
+  guint8 *fci_data;                                                            \
+  guint16 fci_length;                                                          \
+  fail_unless (gst_rtcp_buffer_validate_reduced (buf));                        \
+  gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp);                              \
+  fail_unless (gst_rtcp_buffer_get_first_packet (&rtcp, &packet));             \
+  fail_unless_equals_int (GST_RTCP_TYPE_RTPFB,                                 \
+      gst_rtcp_packet_get_type (&packet));                                     \
+  fail_unless_equals_int (GST_RTCP_RTPFB_TYPE_TWCC,                            \
+      gst_rtcp_packet_fb_get_type (&packet));                                  \
+  fci_data = gst_rtcp_packet_fb_get_fci (&packet);                             \
+  fci_length = gst_rtcp_packet_fb_get_fci_length (&packet) * sizeof (guint32); \
+  fail_unless_equals_int (fci_length, sizeof (exp_fci));                       \
+  fail_unless_equals_int (0, memcmp (fci_data, (exp_fci), fci_length));        \
+  gst_rtcp_buffer_unmap (&rtcp);                                               \
+} G_STMT_END
+
+#define twcc_verify_packets_to_fci(h, packets, exp_fci)                        \
+G_STMT_START {                                                                 \
+  GstBuffer *buf;                                                              \
+  twcc_push_packets (h, packets);                                              \
+  session_harness_produce_rtcp ((h), 1);                                       \
+  buf = session_harness_pull_twcc_rtcp ((h));                                  \
+  twcc_verify_fci (buf, exp_fci);                                              \
+  gst_buffer_unref (buf);                                                      \
+} G_STMT_END
+
+#define twcc_verify_packets_to_event(packets, event)                           \
+G_STMT_START {                                                                 \
+  guint i;                                                                     \
+  guint j = 0;                                                                 \
+  GValueArray *packets_array = g_value_get_boxed (                             \
+      gst_structure_get_value (gst_event_get_structure ((event)), "packets")); \
+  for (i = 0; i < packets_array->n_values; i++) {                              \
+    TWCCPacket *twcc_pkt;                                                      \
+    GstClockTime ts;                                                           \
+    guint seqnum;                                                              \
+    gboolean lost;                                                             \
+    const GstStructure *pkt_s =                                                \
+        gst_value_get_structure (g_value_array_get_nth (packets_array, i));    \
+    fail_unless (gst_structure_get_boolean (pkt_s, "lost", &lost));            \
+    if (lost)                                                                  \
+      continue;                                                                \
+    fail_unless (gst_structure_get_clock_time (pkt_s, "remote-ts", &ts));      \
+    fail_unless (gst_structure_get_uint (pkt_s, "seqnum", &seqnum));           \
+    twcc_pkt = &(packets)[j++];                                                \
+    fail_unless_equals_int (twcc_pkt->seqnum, seqnum);                         \
+    fail_unless_equals_clocktime (twcc_pkt->timestamp, ts);                    \
+  }                                                                            \
+  gst_event_unref (event);                                                     \
+} G_STMT_END
+
+#define twcc_verify_packets_to_packets(send_h, recv_h, packets)                \
+G_STMT_START {                                                                 \
+  guint i;                                                                     \
+  GstEvent *event;                                                             \
+  twcc_push_packets ((recv_h), packets);                                       \
+  session_harness_produce_rtcp ((recv_h), 1);                                  \
+  session_harness_recv_rtcp ((send_h),                                         \
+      session_harness_pull_twcc_rtcp ((recv_h)));                              \
+  for (i = 0; i < 2; i++)                                                      \
+    gst_event_unref (gst_harness_pull_upstream_event ((send_h)->send_rtp_h));  \
+  event = gst_harness_pull_upstream_event ((send_h)->send_rtp_h);              \
+  twcc_verify_packets_to_event (packets, event);                               \
+} G_STMT_END
+
+GST_START_TEST (test_twcc_1_bit_status_vector)
+{
+  SessionHarness *h0 = session_harness_new ();
+  SessionHarness *h1 = session_harness_new ();
+
+  TWCCPacket packets[] = {
+    {10, 0 * GST_MSECOND, FALSE},
+    {12, 12 * GST_MSECOND, FALSE},
+    {14, 14 * GST_MSECOND, FALSE},
+    {15, 15 * GST_MSECOND, FALSE},
+    {17, 17 * GST_MSECOND, FALSE},
+    {20, 20 * GST_MSECOND, FALSE},
+    {21, 21 * GST_MSECOND, FALSE},
+    {23, 23 * GST_MSECOND, TRUE},
+  };
+
+  guint8 exp_fci[] = {
+    0x00, 0x0a,                 /* base sequence number: 10 */
+    0x00, 0x0e,                 /* packet status count: 14 */
+    0x00, 0x00, 0x00,           /* reference time: 0 */
+    0x00,                       /* feedback packet count: 0 */
+    0xab, 0x4d,                 /* packet chunk: 1 0 1 0 1 0 1 1 | 0 1 0 0 1 1 0 1 */
+    0x00,                       /* recv delta: +0:00:00.000000000 */
+    0x30,                       /* recv delta: +0:00:00.012000000 */
+    0x08,                       /* recv delta: +0:00:00.002000000 */
+    0x04,                       /* recv delta: +0:00:00.001000000 */
+    0x08,                       /* recv delta: +0:00:00.002000000 */
+    0x0c,                       /* recv delta: +0:00:00.003000000 */
+    0x04,                       /* recv delta: +0:00:00.001000000 */
+    0x08,                       /* recv delta: +0:00:00.002000000 */
+    0x00, 0x00,                 /* padding */
+  };
+
+  /* check we get the expected fci */
+  twcc_verify_packets_to_fci (h0, packets, exp_fci);
+
+  /* and check we can parse this back to the original packets */
+  twcc_verify_packets_to_packets (h1, h1, packets);
+
+  session_harness_free (h0);
+  session_harness_free (h1);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_twcc_2_bit_status_vector)
+{
+  SessionHarness *h0 = session_harness_new ();
+  SessionHarness *h1 = session_harness_new ();
+
+  TWCCPacket packets[] = {
+    {5, 5 * 64 * GST_MSECOND, FALSE},
+    {7, 7 * 64 * GST_MSECOND, FALSE},
+    {8, 8 * 64 * GST_MSECOND, FALSE},
+    {11, 12 * 64 * GST_MSECOND, TRUE},
+  };
+
+  guint8 exp_fci[] = {
+    0x00, 0x05,                 /* base sequence number: 5 */
+    0x00, 0x07,                 /* packet status count: 7 */
+    0x00, 0x00, 0x05,           /* reference time: 5 */
+    0x00,                       /* feedback packet count: 0 */
+    0xd2, 0x82,                 /* packet chunk: 1 1 0 1 0 0 1 0 | 1 0 0 0 0 0 1 0 */
+    /* normal, missing, large, large, missing, missing, large */
+    0x00,                       /* recv delta: +0:00:00.000000000 */
+    0x02, 0x00,                 /* recv delta: +0:00:00.128000000 */
+    0x01, 0x00,                 /* recv delta: +0:00:00.064000000 */
+    0x04, 0x00,                 /* recv delta: +0:00:00.256000000 */
+    0x00, 0x00, 0x00,           /* padding */
+  };
+
+  twcc_verify_packets_to_fci (h0, packets, exp_fci);
+
+  twcc_verify_packets_to_packets (h1, h1, packets);
+
+  session_harness_free (h0);
+  session_harness_free (h1);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_twcc_various_gaps)
+{
+  SessionHarness *h = session_harness_new ();
+  guint16 seq = 1 + __i__;
+
+  TWCCPacket packets[] = {
+    {0, 0 * 250 * GST_USECOND, FALSE},
+    {seq, seq * 250 * GST_USECOND, TRUE},
+  };
+
+  twcc_verify_packets_to_packets (h, h, packets);
+
+  session_harness_free (h);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_twcc_negative_delta)
+{
+  SessionHarness *h0 = session_harness_new ();
+  SessionHarness *h1 = session_harness_new ();
+
+  TWCCPacket packets[] = {
+    {0, 0 * 250 * GST_USECOND, FALSE},
+    {1, 2 * 250 * GST_USECOND, FALSE},
+    {2, 1 * 250 * GST_USECOND, FALSE},
+    {3, 3 * 250 * GST_USECOND, TRUE},
+  };
+
+  guint8 exp_fci[] = {
+    0x00, 0x00,                 /* base sequence number: 0 */
+    0x00, 0x04,                 /* packet status count: 4 */
+    0x00, 0x00, 0x00,           /* reference time: 0 */
+    0x00,                       /* feedback packet count: 0 */
+    0xd6, 0x40,                 /* packet chunk: 1 1 0 1 0 1 1 0 | 0 1 0 0 0 0 0 0 */
+    0x00,                       /* recv delta: +0:00:00.000000000 */
+    0x02,                       /* recv delta: +0:00:00.000500000 */
+    0xff, 0xff,                 /* recv delta: -0:00:00.000250000 */
+    0x02,                       /* recv delta: +0:00:00.000500000 */
+    0x00,                       /* padding */
+  };
+
+  twcc_verify_packets_to_fci (h0, packets, exp_fci);
+
+  twcc_verify_packets_to_packets (h1, h1, packets);
+
+  session_harness_free (h0);
+  session_harness_free (h1);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_twcc_seqnum_wrap)
+{
+  SessionHarness *h0 = session_harness_new ();
+  SessionHarness *h1 = session_harness_new ();
+
+  TWCCPacket packets[] = {
+    {65534, 0 * 250 * GST_USECOND, FALSE},
+    {65535, 1 * 250 * GST_USECOND, FALSE},
+    {0, 2 * 250 * GST_USECOND, FALSE},
+    {1, 3 * 250 * GST_USECOND, TRUE},
+  };
+
+  guint8 exp_fci[] = {
+    0xff, 0xfe,                 /* base sequence number: 65534 */
+    0x00, 0x04,                 /* packet status count: 4 */
+    0x00, 0x00, 0x00,           /* reference time: 0 */
+    0x00,                       /* feedback packet count: 0 */
+    0x20, 0x04,                 /* packet chunk: 0 0 1 0 0 0 0 0 | 0 0 0 0 0 1 0 0 */
+    0x00,                       /* recv delta: +0:00:00.000000000 */
+    0x01,                       /* recv delta: +0:00:00.000250000 */
+    0x01,                       /* recv delta: +0:00:00.000250000 */
+    0x01,                       /* recv delta: +0:00:00.000250000 */
+    0x00, 0x00,                 /* padding */
+  };
+
+  twcc_verify_packets_to_fci (h0, packets, exp_fci);
+
+  twcc_verify_packets_to_packets (h1, h1, packets);
+
+  session_harness_free (h0);
+  session_harness_free (h1);
+}
+
+GST_END_TEST;
+
+
+GST_START_TEST (test_twcc_double_packets)
+{
+  SessionHarness *h = session_harness_new ();
+
+  TWCCPacket packets0[] = {
+    {11, 11 * GST_MSECOND, FALSE},
+    {12, 12 * GST_MSECOND, TRUE},
+  };
+
+  TWCCPacket packets1[] = {
+    {13, 13 * GST_MSECOND, FALSE},
+    {14, 14 * GST_MSECOND, FALSE},
+    {15, 15 * GST_MSECOND, TRUE},
+  };
+
+  guint8 exp_fci0[] = {
+    0x00, 0x0b,                 /* base sequence number: 11 */
+    0x00, 0x02,                 /* packet status count: 14 */
+    0x00, 0x00, 0x00,           /* reference time: 0 */
+    0x00,                       /* feedback packet count: 0 */
+    0x20, 0x02,                 /* packet chunk: 0 0 1 0 0 0 0 0 | 0 0 0 0 0 0 1 0 */
+    0x2c, 0x04,                 /* recv deltas */
+  };
+
+  guint8 exp_fci1[] = {
+    0x00, 0x0d,                 /* base sequence number: 13 */
+    0x00, 0x03,                 /* packet status count: 3 */
+    0x00, 0x00, 0x00,           /* reference time: 0 */
+    0x01,                       /* feedback packet count: 1 */
+    0x20, 0x03,                 /* packet chunk: 0 0 1 0 0 0 0 0 | 0 0 0 0 0 0 1 1 */
+    0x34, 0x04, 0x04,           /* recv deltas */
+    0x00, 0x00, 0x00,           /* padding */
+  };
+
+  twcc_verify_packets_to_fci (h, packets0, exp_fci0);
+  twcc_verify_packets_to_fci (h, packets1, exp_fci1);
+
+  session_harness_free (h);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_twcc_huge_seqnum_gap)
+{
+  SessionHarness *h = session_harness_new ();
+  GstBuffer *buf;
+
+  TWCCPacket packets[] = {
+    {9, 4 * 32 * GST_MSECOND, FALSE},
+    {10, 5 * 32 * GST_MSECOND, FALSE},
+    {30011, 6 * 32 * GST_MSECOND, FALSE},
+    {30012, 7 * 32 * GST_MSECOND, FALSE},
+    {30013, 8 * 32 * GST_MSECOND, TRUE},
+  };
+
+  guint8 exp_fci0[] = {
+    0x00, 0x09,                 /* base sequence number: 9 */
+    0x00, 0x02,                 /* packet status count: 2 */
+    0x00, 0x00, 0x02,           /* reference time: 2 * 64ms */
+    0x00,                       /* feedback packet count: 0 */
+    /* packet chunks: */
+    0x20, 0x02,                 /* 0 0 1 0 0 0 0 0 | 0 0 0 0 0 0 1 0  */
+    0x00, 0x80,                 /* recv deltas, +0, +32ms */
+  };
+
+  guint8 exp_fci1[] = {
+    0x75, 0x3b,                 /* base sequence number: 300011 */
+    0x00, 0x03,                 /* packet status count: 3 */
+    0x00, 0x00, 0x03,           /* reference time: 3 * 64ms */
+    0x01,                       /* feedback packet count: 1 */
+    /* packet chunks: */
+    0x20, 0x03,                 /* 0 0 1 0 0 0 0 0 | 0 0 0 0 0 0 1 1 */
+    0x00, 0x80, 0x80,           /* recv deltas, +4, +32ms, +32ms */
+    0x00, 0x00, 0x00,           /* padding */
+  };
+
+  /* The sequence-number does a huge leap. Instead of encoding this as
+     a massive run-lenght sequence, like so */
+#if 0
+  guint8 exp_fci[] = {
+    0x00, 0x09,                 /* base sequence number: 9 */
+    0x75, 0x35,                 /* packet status count: 30005 */
+    0x00, 0x00, 0x02,           /* reference time: 2 */
+    0x00,                       /* feedback packet count: 0 */
+    /* packet chunks: */
+    0xb0, 0x00,                 /* 1 bit 2 there, 12 lost: 1 0 1 1 0 0 0 0 | 0 0 0 0 0 0 0 0 */
+    0x1f, 0xff,                 /* run-length: 8191 lost:  0 0 0 1 1 1 1 1 | 1 1 1 1 1 1 1 1 */
+    0x1f, 0xff,                 /* run-length: 8191 lost:  0 0 0 1 1 1 1 1 | 1 1 1 1 1 1 1 1 */
+    0x1f, 0xff,                 /* run-length: 8191 lost:  0 0 0 1 1 1 1 1 | 1 1 1 1 1 1 1 1 */
+    0x15, 0x27,                 /* run-length: 5415 lost:  0 0 0 1 0 1 0 1 | 0 0 1 0 0 1 1 1 */
+    /* 12 + 8191 + 8191 + 8191 + 5415 = 30000 lost packets */
+    0xb8, 0x00,                 /* 1 bit 3 there         : 1 0 1 1 1 0 0 0 | 0 0 0 0 0 0 0 0 */
+
+    0x00, 0x80, 0x80, 0x80, 0x80,       /* recv deltas */
+    0x00, 0x00, 0x00,           /* padding */
+  };
+#endif
+
+  /* ...just send two feedback-packets, with
+     the second one starting from the new sequence-number. */
+  twcc_push_packets (h, packets);
+
+  session_harness_produce_rtcp (h, 1);
+
+  buf = session_harness_pull_twcc_rtcp (h);
+  twcc_verify_fci (buf, exp_fci0);
+  gst_buffer_unref (buf);
+
+  buf = session_harness_pull_twcc_rtcp (h);
+  twcc_verify_fci (buf, exp_fci1);
+  gst_buffer_unref (buf);
+
+  session_harness_free (h);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_twcc_duplicate_seqnums)
+{
+  SessionHarness *h = session_harness_new ();
+  GstBuffer *buf;
+
+  /* A duplicate seqnum can be interpreted as a gap of 65536 packets.
+     Whatever the cause might be, we will follow the behavior of reordered
+     packets, and drop it */
+  TWCCPacket packets[] = {
+    {1, 4 * 32 * GST_MSECOND, FALSE},
+    {2, 5 * 32 * GST_MSECOND, FALSE},
+    {2, 6 * 32 * GST_MSECOND, FALSE},
+    {3, 7 * 32 * GST_MSECOND, TRUE},
+  };
+
+  guint8 exp_fci0[] = {
+    0x00, 0x01,                 /* base sequence number: 1 */
+    0x00, 0x02,                 /* packet status count: 2 */
+    0x00, 0x00, 0x02,           /* reference time: 2 * 64ms */
+    0x00,                       /* feedback packet count: 0 */
+    /* packet chunks: */
+    0x20, 0x02,                 /* 0 0 1 0 0 0 0 0 | 0 0 0 0 0 0 1 0  */
+    0x00, 0x80,                 /* recv deltas: +0, +32ms */
+  };
+
+  guint8 exp_fci1[] = {
+    0x00, 0x03,                 /* base sequence number: 3 */
+    0x00, 0x01,                 /* packet status count: 1 */
+    0x00, 0x00, 0x03,           /* reference time: 3 * 64ms */
+    0x01,                       /* feedback packet count: 1 */
+    /* packet chunks: */
+    0x20, 0x01,                 /* 0 0 1 0 0 0 0 0 | 0 0 0 0 0 0 0 1  */
+    0x80,                       /* recv deltas: +32ms */
+    0x00,                       /* padding */
+  };
+
+  twcc_push_packets (h, packets);
+
+  session_harness_produce_rtcp (h, 1);
+
+  buf = session_harness_pull_twcc_rtcp (h);
+  twcc_verify_fci (buf, exp_fci0);
+  gst_buffer_unref (buf);
+
+  buf = session_harness_pull_twcc_rtcp (h);
+  twcc_verify_fci (buf, exp_fci1);
+  gst_buffer_unref (buf);
+
+  session_harness_free (h);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_twcc_multiple_markers)
+{
+  SessionHarness *h = session_harness_new ();
+  GstBuffer *buf;
+
+  /* for this test, notice how the first recv-delta should relate back to
+     the reference-time, which is 0 in this case. The packets are incrementing
+     in timestamps equal to the smallest unit for TWCC (250 microseconds) */
+  TWCCPacket packets[] = {
+    {1, 1 * 250 * GST_USECOND, FALSE},
+    {2, 2 * 250 * GST_USECOND, FALSE},
+    {3, 3 * 250 * GST_USECOND, TRUE},
+    {4, 4 * 250 * GST_USECOND, FALSE},
+    {5, 5 * 250 * GST_USECOND, TRUE},
+    {6, 6 * 250 * GST_USECOND, FALSE},
+    {7, 7 * 250 * GST_USECOND, FALSE},
+    {8, 8 * 250 * GST_USECOND, FALSE},
+    {9, 9 * 250 * GST_USECOND, TRUE},
+  };
+
+  guint8 exp_fci0[] = {
+    0x00, 0x01,                 /* base sequence number: 1 */
+    0x00, 0x03,                 /* packet status count: 3 */
+    0x00, 0x00, 0x00,           /* reference time: 0 */
+    0x00,                       /* feedback packet count: 0 */
+    /* packet chunks: */
+    0x20, 0x03,                 /* 0 0 1 0 0 0 0 0 | 0 0 0 0 0 0 1 1  */
+    0x01, 0x01, 0x01,           /* recv deltas, +1, +1, +1 */
+    0x00, 0x00, 0x00,           /* padding */
+  };
+
+  guint8 exp_fci1[] = {
+    0x00, 0x04,                 /* base sequence number: 4 */
+    0x00, 0x02,                 /* packet status count: 2 */
+    0x00, 0x00, 0x00,           /* reference time: 0 */
+    0x01,                       /* feedback packet count: 1 */
+    /* packet chunks: */
+    0x20, 0x02,                 /* 0 0 1 0 0 0 0 0 | 0 0 0 0 0 0 1 0 */
+    0x04, 0x01,                 /* recv deltas, +4, +1, +1 */
+  };
+
+  guint8 exp_fci2[] = {
+    0x00, 0x06,                 /* base sequence number: 6 */
+    0x00, 0x04,                 /* packet status count: 4 */
+    0x00, 0x00, 0x00,           /* reference time: 0 */
+    0x02,                       /* feedback packet count: 2 */
+    /* packet chunks: */
+    0x20, 0x04,                 /* 0 0 1 0 0 0 0 0 | 0 0 0 0 0 1 0 0 */
+    0x06, 0x01, 0x01, 0x01,     /* recv deltas, +6, +1, +1, +1 */
+    0x00, 0x00,
+  };
+
+  twcc_push_packets (h, packets);
+
+  /* we should get 1 SR/RR, and then 3x TWCC packets */
+  session_harness_produce_rtcp (h, 1);
+
+  buf = session_harness_pull_twcc_rtcp (h);
+  twcc_verify_fci (buf, exp_fci0);
+  gst_buffer_unref (buf);
+
+  buf = session_harness_pull_twcc_rtcp (h);
+  twcc_verify_fci (buf, exp_fci1);
+  gst_buffer_unref (buf);
+
+  buf = session_harness_pull_twcc_rtcp (h);
+  twcc_verify_fci (buf, exp_fci2);
+  gst_buffer_unref (buf);
+
+  session_harness_free (h);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_twcc_no_marker_and_gaps)
+{
+  SessionHarness *h = session_harness_new ();
+  guint i;
+
+  g_object_set (h->internal_session, "probation", 1, NULL);
+
+  /* Push packets with gaps and no marker bit. This should not prevent
+     the feedback packets from being sent at all. */
+  for (i = 0; i < 80; i += 10) {
+    TWCCPacket packets[] = { {i, i * 250 * GST_USECOND, FALSE}
+    };
+    twcc_push_packets (h, packets);
+  }
+
+  /* verify we did receive some feedback for these packets */
+  session_harness_produce_rtcp (h, 1);
+  for (i = 0; i < 2; i++) {
+    gst_buffer_unref (session_harness_pull_twcc_rtcp (h));
+  }
+
+  session_harness_free (h);
+}
+
+GST_END_TEST;
+
+static GstBuffer *
+generate_twcc_feedback_rtcp (guint8 * fci_data, guint16 fci_length)
+{
+  GstRTCPPacket packet;
+  GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
+  GstBuffer *buffer = gst_rtcp_buffer_new (1000);
+  guint8 *fci;
+
+  fail_unless (gst_rtcp_buffer_map (buffer, GST_MAP_READWRITE, &rtcp));
+  fail_unless (gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_RTPFB,
+          &packet));
+  gst_rtcp_packet_fb_set_type (&packet, GST_RTCP_RTPFB_TYPE_TWCC);
+  gst_rtcp_packet_fb_set_fci_length (&packet, fci_length);
+  fci = gst_rtcp_packet_fb_get_fci (&packet);
+  memcpy (fci, fci_data, fci_length);
+  gst_rtcp_packet_fb_set_sender_ssrc (&packet, TEST_BUF_SSRC);
+  gst_rtcp_packet_fb_set_media_ssrc (&packet, 0);
+  gst_rtcp_buffer_unmap (&rtcp);
+
+  return buffer;
+}
+
+GST_START_TEST (test_twcc_bad_rtcp)
+{
+  SessionHarness *h = session_harness_new ();
+  guint i;
+  GstBuffer *buf;
+  GstEvent *event;
+  GValueArray *packets_array;
+
+  guint8 fci[] = {
+    0xff, 0xff,                 /* base sequence number: max */
+    0xff, 0xff,                 /* packet status count: max */
+    0xff, 0xff, 0xff,           /* reference time: max */
+    0xff,                       /* feedback packet count: max */
+    0x3f, 0xff,                 /* packet chunk: run-length, max */
+    0x00,                       /* only 1 recv-delta */
+  };
+
+  buf = generate_twcc_feedback_rtcp (fci, sizeof (fci));
+  session_harness_recv_rtcp (h, buf);
+
+  /* two reconfigure events */
+  for (i = 0; i < 2; i++)
+    gst_event_unref (gst_harness_pull_upstream_event (h->send_rtp_h));
+
+  event = gst_harness_pull_upstream_event (h->send_rtp_h);
+  packets_array =
+      g_value_get_boxed (gst_structure_get_value (gst_event_get_structure
+          (event), "packets"));
+
+  /* this ends up with 0 packets, due to completely invalid data */
+  fail_unless_equals_int (packets_array->n_values, 0);
+
+  gst_event_unref (event);
+  session_harness_free (h);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_twcc_delta_ts_rounding)
+{
+  SessionHarness *h = session_harness_new ();
+  guint i, j = 0;
+  GstEvent *event;
+  GstBuffer *buf;
+  GValueArray *packets_array;
+
+  TWCCPacket packets[] = {
+    {2002, 9 * GST_SECOND + 366458177, FALSE}
+    ,
+    {2003, 9 * GST_SECOND + 366497068, FALSE}
+    ,
+    {2017, 9 * GST_SECOND + 366929482, FALSE}
+    ,
+    {2019, 9 * GST_SECOND + 391595309, FALSE}
+    ,
+    {2020, 9 * GST_SECOND + 426883507, FALSE}
+    ,
+    {2025, 9 * GST_SECOND + 427021638, TRUE}
+    ,
+  };
+
+  TWCCPacket exp_packets[] = {
+    {2002, 9 * GST_SECOND + 366250000, FALSE}
+    ,
+    {2003, 9 * GST_SECOND + 366250000, FALSE}
+    ,
+    {2017, 9 * GST_SECOND + 366750000, FALSE}
+    ,
+    {2019, 9 * GST_SECOND + 391500000, FALSE}
+    ,
+    {2020, 9 * GST_SECOND + 426750000, FALSE}
+    ,
+    {2025, 9 * GST_SECOND + 427000000, TRUE}
+    ,
+  };
+
+  guint8 exp_fci[] = {
+    0x07, 0xd2,                 /* base sequence number: 2002 */
+    0x00, 0x18,                 /* packet status count: 24 */
+    0x00, 0x00, 0x92,           /* reference time: 0:00:09.344000000 */
+    0x00,                       /* feedback packet count: 0 */
+    0xb0, 0x00,                 /* packet chunk: 1 0 1 1 0 0 0 0 | 0 0 0 0 0 0 0 0 */
+    0x96, 0x10,                 /* packet chunk: 1 0 0 1 0 1 1 0 | 0 0 0 1 0 0 0 0 */
+    0x59,                       /* recv delta: 0:00:00.022250000 abs: 0:00:09.366250000 */
+    0x00,                       /* recv delta: 0:00:00.000000000 abs: 0:00:09.366250000 */
+    0x02,                       /* recv delta: 0:00:00.000500000 abs: 0:00:09.366750000 */
+    0x63,                       /* recv delta: 0:00:00.024750000 abs: 0:00:09.391500000 */
+    0x8d,                       /* recv delta: 0:00:00.035250000 abs: 0:00:09.426750000 */
+    0x01,                       /* recv delta: 0:00:00.000250000 abs: 0:00:09.427000000 */
+    0x00, 0x00,                 /* padding */
+  };
+
+  twcc_push_packets (h, packets);
+  session_harness_produce_rtcp (h, 1);
+  buf = session_harness_pull_twcc_rtcp (h);
+  twcc_verify_fci (buf, exp_fci);
+
+  session_harness_recv_rtcp (h, buf);
+  for (i = 0; i < 2; i++)
+    gst_event_unref (gst_harness_pull_upstream_event (h->send_rtp_h));
+  event = gst_harness_pull_upstream_event (h->send_rtp_h);
+
+  packets_array =
+      g_value_get_boxed (gst_structure_get_value (gst_event_get_structure
+          (event), "packets"));
+  for (i = 0; i < packets_array->n_values; i++) {
+    TWCCPacket *twcc_pkt;
+    const GstStructure *pkt_s =
+        gst_value_get_structure (g_value_array_get_nth (packets_array, i));
+    GstClockTime ts;
+    guint seqnum;
+    gboolean lost;
+    fail_unless (gst_structure_get_boolean (pkt_s, "lost", &lost));
+    if (lost)
+      continue;
+    twcc_pkt = &exp_packets[j++];
+
+    fail_unless (gst_structure_get_clock_time (pkt_s, "remote-ts", &ts));
+    fail_unless (gst_structure_get_uint (pkt_s, "seqnum", &seqnum));
+
+    fail_unless_equals_int (twcc_pkt->seqnum, seqnum);
+    fail_unless_equals_clocktime (twcc_pkt->timestamp, ts);
+  }
+
+  gst_event_unref (event);
+  session_harness_free (h);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_twcc_double_gap)
+{
+  SessionHarness *h0 = session_harness_new ();
+  SessionHarness *h1 = session_harness_new ();
+
+  TWCCPacket packets[] = {
+    {1202, 5 * GST_SECOND + 717000000, FALSE}
+    ,
+    {1215, 5 * GST_SECOND + 760250000, FALSE}
+    ,
+    {1221, 5 * GST_SECOND + 775500000, TRUE}
+    ,
+  };
+
+  guint8 exp_fci[] = {
+    0x04, 0xb2,                 /* base sequence number: 1202 */
+    0x00, 0x14,                 /* packet status count: 20 */
+    0x00, 0x00, 0x59,           /* reference time: 0:00:05.696000000 */
+    0x00,                       /* feedback packet count: 0 */
+    0xa0, 0x01,                 /* packet chunk: 1 0 1 0 0 0 0 0 | 0 0 0 0 0 0 0 1 */
+    0x81, 0x00,                 /* packet chunk: 1 0 0 0 0 0 0 1 | 0 0 0 0 0 0 0 0 */
+    0x54,                       /* recv delta: +0:00:00.021000000 */
+    0xad,                       /* recv delta: +0:00:00.043250000 */
+    0x3d,                       /* recv delta: +0:00:00.015250000 */
+    0x00,                       /* padding */
+  };
+
+  twcc_verify_packets_to_fci (h0, packets, exp_fci);
+
+  twcc_verify_packets_to_packets (h1, h1, packets);
+
+  session_harness_free (h0);
+  session_harness_free (h1);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_twcc_recv_packets_reordered)
+{
+  SessionHarness *h = session_harness_new ();
+  GstBuffer *buf;
+
+  /* a reordered seqence, with marker-bits for #3 and #4 */
+  TWCCPacket packets[] = {
+    {1, 1 * 250 * GST_USECOND, FALSE}
+    ,
+    {3, 2 * 250 * GST_USECOND, TRUE}
+    ,
+    {2, 3 * 250 * GST_USECOND, FALSE}
+    ,
+    {4, 4 * 250 * GST_USECOND, TRUE}
+    ,
+  };
+
+  /* first we expect #2 to be reported lost */
+  guint8 exp_fci0[] = {
+    0x00, 0x01,                 /* base sequence number: 1 */
+    0x00, 0x03,                 /* packet status count: 3 */
+    0x00, 0x00, 0x00,           /* reference time: 0 */
+    0x00,                       /* feedback packet count: 0 */
+    /* packet chunks: */
+    0xa8, 0x00,                 /* 1 0 1 0 1 0 0 0 | 0 0 0 0 0 0 0 0 */
+    0x01, 0x01,                 /* recv deltas, +1, +1 */
+  };
+
+  /* and then when 2 actually arrives, it is already reported lost,
+     so we will not re-report it, but drop it */
+  guint8 exp_fci1[] = {
+    0x00, 0x04,                 /* base sequence number: 4 */
+    0x00, 0x01,                 /* packet status count: 1 */
+    0x00, 0x00, 0x00,           /* reference time: 0 */
+    0x01,                       /* feedback packet count: 1 */
+    /* packet chunks: */
+    0x20, 0x01,                 /* 0 0 1 0 0 0 0 0 | 0 0 0 0 0 0 0 1 */
+    0x04,                       /* recv deltas, +4 */
+    0x00,                       /* padding */
+  };
+
+  twcc_push_packets (h, packets);
+
+  session_harness_produce_rtcp (h, 1);
+
+  buf = session_harness_pull_twcc_rtcp (h);
+  twcc_verify_fci (buf, exp_fci0);
+  gst_buffer_unref (buf);
+
+  buf = session_harness_pull_twcc_rtcp (h);
+  twcc_verify_fci (buf, exp_fci1);
+  gst_buffer_unref (buf);
+
+  session_harness_free (h);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_twcc_recv_rtcp_reordered)
+{
+  SessionHarness *send_h = session_harness_new ();
+  SessionHarness *recv_h = session_harness_new ();
+  GstBuffer *buf[4];
+  GstEvent *event;
+  guint i;
+
+  /* three frames, two packets each */
+  TWCCPacket packets[] = {
+    {1, 1 * GST_SECOND, FALSE}
+    ,
+    {2, 2 * GST_SECOND, TRUE}
+    ,
+    {3, 3 * GST_SECOND, FALSE}
+    ,
+    {4, 4 * GST_SECOND, TRUE}
+    ,
+    {5, 5 * GST_SECOND, FALSE}
+    ,
+    {6, 6 * GST_SECOND, TRUE}
+    ,
+    {7, 7 * GST_SECOND, FALSE}
+    ,
+    {8, 8 * GST_SECOND, TRUE}
+    ,
+  };
+
+/*
+  TWCCPacket expected_packets0[] = {
+    {1, 1 * 250 * GST_USECOND, FALSE},
+    {2, 2 * 250 * GST_USECOND, TRUE},
+  };
+*/
+  twcc_push_packets (recv_h, packets);
+
+  session_harness_produce_rtcp (recv_h, 1);
+
+  buf[0] = session_harness_pull_twcc_rtcp (recv_h);
+  buf[1] = session_harness_pull_twcc_rtcp (recv_h);
+  buf[2] = session_harness_pull_twcc_rtcp (recv_h);
+  buf[3] = session_harness_pull_twcc_rtcp (recv_h);
+
+  /* reorder the twcc-feedback */
+  session_harness_recv_rtcp (send_h, buf[0]);
+  session_harness_recv_rtcp (send_h, buf[2]);
+  session_harness_recv_rtcp (send_h, buf[1]);
+  session_harness_recv_rtcp (send_h, buf[3]);
+
+  for (i = 0; i < 2; i++)
+    gst_event_unref (gst_harness_pull_upstream_event (send_h->send_rtp_h));
+
+  event = gst_harness_pull_upstream_event (send_h->send_rtp_h);
+  twcc_verify_packets_to_event (&packets[0 * 2], event);
+
+  event = gst_harness_pull_upstream_event (send_h->send_rtp_h);
+  twcc_verify_packets_to_event (&packets[2 * 2], event);
+
+  event = gst_harness_pull_upstream_event (send_h->send_rtp_h);
+  twcc_verify_packets_to_event (&packets[1 * 2], event);
+
+  event = gst_harness_pull_upstream_event (send_h->send_rtp_h);
+  twcc_verify_packets_to_event (&packets[3 * 2], event);
+
+  session_harness_free (send_h);
+  session_harness_free (recv_h);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_twcc_send_and_recv)
+{
+  SessionHarness *h_send = session_harness_new ();
+  SessionHarness *h_recv = session_harness_new ();
+  guint frame;
+  const guint num_frames = 2;
+  const guint num_slices = 15;
+
+  /* enable twcc */
+  session_harness_set_twcc_recv_ext_id (h_recv, TEST_TWCC_EXT_ID);
+  session_harness_set_twcc_send_ext_id (h_send, TEST_TWCC_EXT_ID);
+
+  for (frame = 0; frame < num_frames; frame++) {
+    GstBuffer *buf;
+    for (guint slice = 0; slice < num_slices; slice++) {
+      GstFlowReturn res;
+      guint seq = frame * num_slices + slice;
+
+      /* from payloder to rtpbin */
+      buf = generate_twcc_send_buffer (seq, slice == num_slices - 1);
+      res = session_harness_send_rtp (h_send, buf);
+      fail_unless_equals_int (GST_FLOW_OK, res);
+
+      /* get the buffer ready for the network */
+      buf = session_harness_pull_send_rtp (h_send);
+
+      /* buffer arrives at the receiver */
+      res = session_harness_recv_rtp (h_recv, buf);
+      fail_unless_equals_int (GST_FLOW_OK, res);
+    }
+
+    /* receiver sends a TWCC packet to the sender */
+    session_harness_produce_rtcp (h_recv, 1);
+    buf = session_harness_pull_twcc_rtcp (h_recv);
+    /* sender receives the TWCC packet */
+    session_harness_recv_rtcp (h_send, buf);
+
+    if (frame > 0) {
+      GstStructure *twcc_stats;
+      guint bitrate_sent;
+      guint bitrate_recv;
+      guint packets_sent;
+      guint packets_recv;
+      gdouble packet_loss_pct;
+      GstClockTimeDiff avg_delta_of_delta;
+      twcc_stats = session_harness_get_last_twcc_stats (h_send);
+      fail_unless (gst_structure_get (twcc_stats,
+              "bitrate-sent", G_TYPE_UINT, &bitrate_sent,
+              "bitrate-recv", G_TYPE_UINT, &bitrate_recv,
+              "packets-sent", G_TYPE_UINT, &packets_sent,
+              "packets-recv", G_TYPE_UINT, &packets_recv,
+              "packet-loss-pct", G_TYPE_DOUBLE, &packet_loss_pct,
+              "avg-delta-of-delta", G_TYPE_INT64, &avg_delta_of_delta, NULL));
+      fail_unless_equals_int (TEST_BUF_BPS, bitrate_sent);
+      fail_unless_equals_int (TEST_BUF_BPS, bitrate_recv);
+      fail_unless_equals_int (num_slices, packets_sent);
+      fail_unless_equals_int (num_slices, packets_recv);
+      fail_unless_equals_float (0.0f, packet_loss_pct);
+      fail_unless_equals_int64 (0, avg_delta_of_delta);
+      gst_structure_free (twcc_stats);
+    }
+  }
+
+  session_harness_free (h_send);
+  session_harness_free (h_recv);
+}
+
+GST_END_TEST;
+
 static Suite *
 rtpsession_suite (void)
 {
@@ -2503,6 +3649,26 @@ rtpsession_suite (void)
   tcase_add_test (tc_chain, test_packet_rate);
   tcase_add_test (tc_chain, test_stepped_packet_rate);
 
+  /* twcc */
+  tcase_add_loop_test (tc_chain, test_twcc_header_and_run_length,
+      0, G_N_ELEMENTS (twcc_header_and_run_lenght_test_data));
+  tcase_add_test (tc_chain, test_twcc_1_bit_status_vector);
+  tcase_add_test (tc_chain, test_twcc_2_bit_status_vector);
+  tcase_add_loop_test (tc_chain, test_twcc_various_gaps, 0, 50);
+  tcase_add_test (tc_chain, test_twcc_negative_delta);
+  tcase_add_test (tc_chain, test_twcc_seqnum_wrap);
+  tcase_add_test (tc_chain, test_twcc_huge_seqnum_gap);
+  tcase_add_test (tc_chain, test_twcc_double_packets);
+  tcase_add_test (tc_chain, test_twcc_duplicate_seqnums);
+  tcase_add_test (tc_chain, test_twcc_multiple_markers);
+  tcase_add_test (tc_chain, test_twcc_no_marker_and_gaps);
+  tcase_add_test (tc_chain, test_twcc_bad_rtcp);
+  tcase_add_test (tc_chain, test_twcc_delta_ts_rounding);
+  tcase_add_test (tc_chain, test_twcc_double_gap);
+  tcase_add_test (tc_chain, test_twcc_recv_packets_reordered);
+  tcase_add_test (tc_chain, test_twcc_recv_rtcp_reordered);
+  tcase_add_test (tc_chain, test_twcc_send_and_recv);
+
   return s;
 }