rtpsource: Add more information to probation warning
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / rtpsource.c
index 06c869d..7079e16 100644 (file)
@@ -44,6 +44,7 @@ enum
 #define DEFAULT_PROBATION            RTP_DEFAULT_PROBATION
 #define DEFAULT_MAX_DROPOUT_TIME     60000
 #define DEFAULT_MAX_MISORDER_TIME    2000
+#define DEFAULT_DISABLE_RTCP         FALSE
 
 enum
 {
@@ -56,7 +57,8 @@ enum
   PROP_STATS,
   PROP_PROBATION,
   PROP_MAX_DROPOUT_TIME,
-  PROP_MAX_MISORDER_TIME
+  PROP_MAX_MISORDER_TIME,
+  PROP_DISABLE_RTCP
 };
 
 /* GObject vmethods */
@@ -108,7 +110,7 @@ rtp_source_class_init (RTPSourceClass * klass)
    * The current SDES items of the source. Returns a structure with name
    * application/x-rtp-source-sdes and may contain the following fields:
    *
-   *  'cname'       G_TYPE_STRING  : The canonical name
+   *  'cname'       G_TYPE_STRING  : The canonical name in the form user@host
    *  'name'        G_TYPE_STRING  : The user name
    *  'email'       G_TYPE_STRING  : The user's electronic mail address
    *  'phone'       G_TYPE_STRING  : The user's phone number
@@ -188,30 +190,30 @@ rtp_source_class_init (RTPSourceClass * klass)
    * These values are only updated when the source is sending.
    *
    *  "sent-rb"               G_TYPE_BOOLEAN  we have sent an RB
-   *  "sent-rb-fractionlost"  G_TYPE_UINT     calculated lost fraction
+   *  "sent-rb-fractionlost"  G_TYPE_UINT     calculated lost 8-bit fraction
    *  "sent-rb-packetslost"   G_TYPE_INT      lost packets
    *  "sent-rb-exthighestseq" G_TYPE_UINT     last seen seqnum
    *  "sent-rb-jitter"        G_TYPE_UINT     jitter (in clock rate units)
-   *  "sent-rb-lsr"           G_TYPE_UINT     last SR time (in NTP Short Format, 16.16 fixed point)
-   *  "sent-rb-dlsr"          G_TYPE_UINT     delay since last SR (in NTP Short Format, 16.16 fixed point)
+   *  "sent-rb-lsr"           G_TYPE_UINT     last SR time (seconds in NTP Short Format, 16.16 fixed point)
+   *  "sent-rb-dlsr"          G_TYPE_UINT     delay since last SR (seconds in NTP Short Format, 16.16 fixed point)
    *
    * The following fields are only present for non-internal sources and
    * represents the last RB that this source sent. This is only updated
    * when the source is receiving data and sending RB blocks.
    *
    *  "have-rb"          G_TYPE_BOOLEAN  the source has sent RB
-   *  "rb-fractionlost"  G_TYPE_UINT     lost fraction
+   *  "rb-fractionlost"  G_TYPE_UINT     lost 8-bit fraction
    *  "rb-packetslost"   G_TYPE_INT      lost packets
    *  "rb-exthighestseq" G_TYPE_UINT     highest received seqnum
    *  "rb-jitter"        G_TYPE_UINT     reception jitter (in clock rate units)
-   *  "rb-lsr"           G_TYPE_UINT     last SR time (in NTP Short Format, 16.16 fixed point)
-   *  "rb-dlsr"          G_TYPE_UINT     delay since last SR (in NTP Short Format, 16.16 fixed point)
+   *  "rb-lsr"           G_TYPE_UINT     last SR time (seconds in NTP Short Format, 16.16 fixed point)
+   *  "rb-dlsr"          G_TYPE_UINT     delay since last SR (seconds in NTP Short Format, 16.16 fixed point)
    *
    * The round trip of this source is calculated from the last RB
    * values and the reception time of the last RB packet. It is only present for
    * non-internal sources.
    *
-   *  "rb-round-trip"    G_TYPE_UINT     the round-trip time (in NTP Short Format, 16.16 fixed point)
+   *  "rb-round-trip"    G_TYPE_UINT     the round-trip time (seconds in NTP Short Format, 16.16 fixed point)
    *
    */
   g_object_class_install_property (gobject_class, PROP_STATS,
@@ -237,6 +239,16 @@ rtp_source_class_init (RTPSourceClass * klass)
           0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  /**
+   * RTPSession::disable-rtcp:
+   *
+   * Allow disabling the sending of RTCP packets for this source.
+   */
+  g_object_class_install_property (gobject_class, PROP_DISABLE_RTCP,
+      g_param_spec_boolean ("disable-rtcp", "Disable RTCP",
+          "Disable sending RTCP packets for this source",
+          DEFAULT_DISABLE_RTCP, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   GST_DEBUG_CATEGORY_INIT (rtp_source_debug, "rtpsource", 0, "RTP Source");
 }
 
@@ -255,6 +267,9 @@ rtp_source_reset (RTPSource * src)
   src->bye_reason = NULL;
   src->sent_bye = FALSE;
   g_hash_table_remove_all (src->reported_in_sr_of);
+  g_queue_foreach (src->retained_feedback, (GFunc) gst_buffer_unref, NULL);
+  g_queue_clear (src->retained_feedback);
+  src->last_rtptime = -1;
 
   src->stats.cycles = -1;
   src->stats.jitter = 0;
@@ -271,12 +286,14 @@ rtp_source_reset (RTPSource * src)
 
   src->stats.sent_pli_count = 0;
   src->stats.sent_fir_count = 0;
+  src->stats.sent_nack_count = 0;
+  src->stats.recv_nack_count = 0;
 }
 
 static void
 rtp_source_init (RTPSource * src)
 {
-  /* sources are initialy on probation until we receive enough valid RTP
+  /* sources are initially on probation until we receive enough valid RTP
    * packets or a valid RTCP packet */
   src->validated = FALSE;
   src->internal = FALSE;
@@ -292,14 +309,18 @@ rtp_source_init (RTPSource * src)
   src->clock_rate = -1;
   src->packets = g_queue_new ();
   src->seqnum_offset = -1;
-  src->last_rtptime = -1;
 
   src->retained_feedback = g_queue_new ();
-  src->nacks = g_array_new (FALSE, FALSE, sizeof (guint32));
+  src->nacks = g_array_new (FALSE, FALSE, sizeof (guint16));
+  src->nack_deadlines = g_array_new (FALSE, FALSE, sizeof (GstClockTime));
 
   src->reported_in_sr_of = g_hash_table_new (g_direct_hash, g_direct_equal);
 
+  src->last_keyframe_request = GST_CLOCK_TIME_NONE;
+
   rtp_source_reset (src);
+
+  src->pt_set = FALSE;
 }
 
 void
@@ -331,6 +352,7 @@ rtp_source_finalize (GObject * object)
   g_queue_free (src->retained_feedback);
 
   g_array_free (src->nacks, TRUE);
+  g_array_free (src->nack_deadlines, TRUE);
 
   if (src->rtp_from)
     g_object_unref (src->rtp_from);
@@ -400,7 +422,9 @@ rtp_source_create_stats (RTPSource * src)
       "sent-pli-count", G_TYPE_UINT, src->stats.sent_pli_count,
       "recv-pli-count", G_TYPE_UINT, src->stats.recv_pli_count,
       "sent-fir-count", G_TYPE_UINT, src->stats.sent_fir_count,
-      "recv-fir-count", G_TYPE_UINT, src->stats.recv_fir_count, NULL);
+      "recv-fir-count", G_TYPE_UINT, src->stats.recv_fir_count,
+      "sent-nack-count", G_TYPE_UINT, src->stats.sent_nack_count,
+      "recv-nack-count", G_TYPE_UINT, src->stats.recv_nack_count, NULL);
 
   /* get the last SR. */
   have_sr = rtp_source_get_last_sr (src, &time, &ntptime, &rtptime,
@@ -531,6 +555,9 @@ rtp_source_set_property (GObject * object, guint prop_id,
     case PROP_MAX_MISORDER_TIME:
       src->max_misorder_time = g_value_get_uint (value);
       break;
+    case PROP_DISABLE_RTCP:
+      src->disable_rtcp = g_value_get_boolean (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -573,6 +600,9 @@ rtp_source_get_property (GObject * object, guint prop_id,
     case PROP_MAX_MISORDER_TIME:
       g_value_set_uint (value, src->max_misorder_time);
       break;
+    case PROP_DISABLE_RTCP:
+      g_value_set_boolean (value, src->disable_rtcp);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -1105,7 +1135,14 @@ update_receiver_stats (RTPSource * src, RTPPacketInfo * pinfo,
           goto done;
         }
       } else {
-        /* unexpected seqnum in probation */
+        /* unexpected seqnum in probation
+         *
+         * There is no need to clean the queue at this point because the
+         * invalid packets in the queue are not going to be pushed as we are
+         * still in probation, and some cleanup will be performed at future
+         * probation attempts anyway if there are too many old packets in the
+         * queue.
+         */
         goto probation_seqnum;
       }
     } else if (delta >= 0 && delta < max_dropout) {
@@ -1143,14 +1180,14 @@ update_receiver_stats (RTPSource * src, RTPPacketInfo * pinfo,
       g_queue_clear (src->packets);
 
       /* duplicate or reordered packet, will be filtered by jitterbuffer. */
-      GST_WARNING ("duplicate or reordered packet (seqnr %u, expected %u)",
+      GST_INFO ("duplicate or reordered packet (seqnr %u, expected %u)",
           seqnr, expected);
     }
   }
 
   src->stats.octets_received += pinfo->payload_len;
   src->stats.bytes_received += pinfo->bytes;
-  src->stats.packets_received++;
+  src->stats.packets_received += pinfo->packets;
   /* for the bitrate estimation */
   src->bytes_received += pinfo->payload_len;
 
@@ -1173,7 +1210,9 @@ bad_sequence:
   }
 probation_seqnum:
   {
-    GST_WARNING ("probation: seqnr %d != expected %d", seqnr, expected);
+    GST_WARNING ("probation: seqnr %d != expected %d "
+        "(SSRC %u curr_probation %i probation %i)", seqnr, expected, src->ssrc,
+        src->curr_probation, src->probation);
     src->curr_probation = src->probation;
     src->stats.max_seq = seqnr;
     return FALSE;
@@ -1185,7 +1224,7 @@ probation_seqnum:
  * @src: an #RTPSource
  * @pinfo: an #RTPPacketInfo
  *
- * Let @src handle the incomming RTP packet described in @pinfo.
+ * Let @src handle the incoming RTP packet described in @pinfo.
  *
  * Returns: a #GstFlowReturn.
  */
@@ -1243,12 +1282,10 @@ rtp_source_mark_bye (RTPSource * src, const gchar * reason)
 /**
  * rtp_source_send_rtp:
  * @src: an #RTPSource
- * @data: an RTP buffer or a list of RTP buffers
- * @is_list: if @data is a buffer or list
- * @running_time: the running time of @data
+ * @pinfo: an #RTPPacketInfo
  *
- * Send @data (an RTP buffer or list of buffers) originating from @src.
- * This will make @src a sender. This function takes ownership of @data and
+ * Send data (an RTP buffer or buffer list from @pinfo) originating from @src.
+ * This will make @src a sender. This function takes ownership of the data and
  * modifies the SSRC in the RTP packet to that of @src when needed.
  *
  * Returns: a #GstFlowReturn.
@@ -1271,6 +1308,14 @@ rtp_source_send_rtp (RTPSource * src, RTPPacketInfo * pinfo)
   if (!update_receiver_stats (src, pinfo, FALSE))
     return GST_FLOW_OK;
 
+  if (src->pt_set && src->pt != pinfo->pt) {
+    GST_WARNING ("Changing pt from %u to %u for SSRC %u", src->pt, pinfo->pt,
+        src->ssrc);
+  }
+
+  src->pt = pinfo->pt;
+  src->pt_set = TRUE;
+
   /* update stats for the SR */
   src->stats.packets_sent += pinfo->packets;
   src->stats.octets_sent += pinfo->payload_len;
@@ -1476,6 +1521,12 @@ rtp_source_get_new_sr (RTPSource * src, guint64 ntpnstime,
   GST_DEBUG ("last_rtime %" GST_TIME_FORMAT ", last_rtptime %"
       G_GUINT64_FORMAT, GST_TIME_ARGS (src->last_rtime), t_rtp);
 
+  if (src->clock_rate == -1 && src->pt_set) {
+    GST_INFO ("no clock-rate, getting for pt %u and SSRC %u", src->pt,
+        src->ssrc);
+    get_clock_rate (src, src->pt);
+  }
+
   if (src->clock_rate != -1) {
     /* get the diff between the clock running_time and the buffer running_time.
      * This is the elapsed time, as measured against the pipeline clock, between
@@ -1496,7 +1547,8 @@ rtp_source_get_new_sr (RTPSource * src, guint64 ntpnstime,
       t_rtp -= gst_util_uint64_scale_int (diff, src->clock_rate, GST_SECOND);
     }
   } else {
-    GST_WARNING ("no clock-rate, cannot interpolate rtp time");
+    GST_WARNING ("no clock-rate, cannot interpolate rtp time for SSRC %u",
+        src->ssrc);
   }
 
   /* convert the NTP time in nanoseconds to 32.32 fixed point */
@@ -1806,17 +1858,33 @@ rtp_source_add_conflicting_address (RTPSource * src,
  */
 void
 rtp_source_timeout (RTPSource * src, GstClockTime current_time,
-    GstClockTime feedback_retention_window)
+    GstClockTime running_time, GstClockTime feedback_retention_window)
 {
   GstRTCPPacket *pkt;
+  GstClockTime max_pts_window;
+  guint pruned = 0;
 
   src->conflicting_addresses =
       timeout_conflicting_addresses (src->conflicting_addresses, current_time);
 
+  if (feedback_retention_window == GST_CLOCK_TIME_NONE ||
+      running_time < feedback_retention_window) {
+    return;
+  }
+
+  max_pts_window = running_time - feedback_retention_window;
+
   /* Time out AVPF packets that are older than the desired length */
-  while ((pkt = g_queue_peek_tail (src->retained_feedback)) &&
-      GST_BUFFER_PTS (pkt) < feedback_retention_window)
-    gst_buffer_unref (g_queue_pop_tail (src->retained_feedback));
+  while ((pkt = g_queue_peek_head (src->retained_feedback)) &&
+      GST_BUFFER_PTS (pkt) < max_pts_window) {
+    gst_buffer_unref (g_queue_pop_head (src->retained_feedback));
+    pruned++;
+  }
+
+  GST_LOG_OBJECT (src,
+      "%u RTCP packets pruned with PTS less than %" GST_TIME_FORMAT
+      ", queue len: %u", pruned, GST_TIME_ARGS (max_pts_window),
+      g_queue_get_length (src->retained_feedback));
 }
 
 static gint
@@ -1825,7 +1893,16 @@ compare_buffers (gconstpointer a, gconstpointer b, gpointer user_data)
   const GstBuffer *bufa = a;
   const GstBuffer *bufb = b;
 
-  return GST_BUFFER_PTS (bufa) - GST_BUFFER_PTS (bufb);
+  g_return_val_if_fail (GST_BUFFER_PTS (bufa) != GST_CLOCK_TIME_NONE, -1);
+  g_return_val_if_fail (GST_BUFFER_PTS (bufb) != GST_CLOCK_TIME_NONE, 1);
+
+  if (GST_BUFFER_PTS (bufa) < GST_BUFFER_PTS (bufb)) {
+    return -1;
+  } else if (GST_BUFFER_PTS (bufa) > GST_BUFFER_PTS (bufb)) {
+    return 1;
+  }
+
+  return 0;
 }
 
 void
@@ -1834,12 +1911,17 @@ rtp_source_retain_rtcp_packet (RTPSource * src, GstRTCPPacket * packet,
 {
   GstBuffer *buffer;
 
+  g_return_if_fail (running_time != GST_CLOCK_TIME_NONE);
+
   buffer = gst_buffer_copy_region (packet->rtcp->buffer, GST_BUFFER_COPY_MEMORY,
       packet->offset, (gst_rtcp_packet_get_length (packet) + 1) * 4);
 
   GST_BUFFER_PTS (buffer) = running_time;
 
   g_queue_insert_sorted (src->retained_feedback, buffer, compare_buffers, NULL);
+
+  GST_LOG_OBJECT (src, "RTCP packet retained with PTS: %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (running_time));
 }
 
 gboolean
@@ -1855,47 +1937,46 @@ rtp_source_has_retained (RTPSource * src, GCompareFunc func, gconstpointer data)
  * rtp_source_register_nack:
  * @src: The #RTPSource
  * @seqnum: a seqnum
+ * @deadline: the deadline before which RTX is still possible
  *
  * Register that @seqnum has not been received from @src.
  */
 void
-rtp_source_register_nack (RTPSource * src, guint16 seqnum)
+rtp_source_register_nack (RTPSource * src, guint16 seqnum,
+    GstClockTime deadline)
 {
-  guint i, len;
-  guint32 dword = seqnum << 16;
-  gint diff = 16;
+  gint i;
+  guint len;
+  gint diff = -1;
+  guint16 tseq;
 
   len = src->nacks->len;
-  for (i = 0; i < len; i++) {
-    guint32 tdword;
-    guint16 tseq;
+  for (i = len - 1; i >= 0; i--) {
+    tseq = g_array_index (src->nacks, guint16, i);
+    diff = gst_rtp_buffer_compare_seqnum (tseq, seqnum);
 
-    tdword = g_array_index (src->nacks, guint32, i);
-    tseq = tdword >> 16;
+    GST_TRACE ("[%u] %u %u diff %i len %u", i, tseq, seqnum, diff, len);
 
-    diff = gst_rtp_buffer_compare_seqnum (tseq, seqnum);
-    if (diff < 16)
+    if (diff >= 0)
       break;
   }
-  /* we already have this seqnum */
-  if (diff == 0)
-    return;
-  /* it comes before the recorded seqnum, FIXME, we could merge it
-   * if not to far away */
-  if (diff < 0) {
-    GST_DEBUG ("insert NACK #%u at %u", seqnum, i);
-    g_array_insert_val (src->nacks, i, dword);
-  } else if (diff < 16) {
-    /* we can merge it */
-    dword = g_array_index (src->nacks, guint32, i);
-    dword |= 1 << (diff - 1);
-    GST_DEBUG ("merge NACK #%u at %u with NACK #%u -> 0x%08x", seqnum, i,
-        dword >> 16, dword);
-    g_array_index (src->nacks, guint32, i) = dword;
+
+  if (diff == 0) {
+    GST_DEBUG ("update NACK #%u deadline to %" GST_TIME_FORMAT, seqnum,
+        GST_TIME_ARGS (deadline));
+    g_array_index (src->nack_deadlines, GstClockTime, i) = deadline;
+  } else if (i == len - 1) {
+    GST_DEBUG ("append NACK #%u with deadline %" GST_TIME_FORMAT, seqnum,
+        GST_TIME_ARGS (deadline));
+    g_array_append_val (src->nacks, seqnum);
+    g_array_append_val (src->nack_deadlines, deadline);
   } else {
-    GST_DEBUG ("append NACK #%u", seqnum);
-    g_array_append_val (src->nacks, dword);
+    GST_DEBUG ("insert NACK #%u with deadline %" GST_TIME_FORMAT, seqnum,
+        GST_TIME_ARGS (deadline));
+    g_array_insert_val (src->nacks, i + 1, seqnum);
+    g_array_insert_val (src->nack_deadlines, i + 1, deadline);
   }
+
   src->send_nack = TRUE;
 }
 
@@ -1908,18 +1989,51 @@ rtp_source_register_nack (RTPSource * src, guint16 seqnum)
  *
  * Returns: an array of @n_nacks seqnum values.
  */
-guint32 *
+guint16 *
 rtp_source_get_nacks (RTPSource * src, guint * n_nacks)
 {
   if (n_nacks)
     *n_nacks = src->nacks->len;
 
-  return (guint32 *) src->nacks->data;
+  return (guint16 *) src->nacks->data;
+}
+
+/**
+ * rtp_source_get_nacks:
+ * @src: The #RTPSource
+ * @n_nacks: result number of nacks
+ *
+ * Get the registered NACKS deadlines.
+ *
+ * Returns: an array of @n_nacks deadline values.
+ */
+GstClockTime *
+rtp_source_get_nack_deadlines (RTPSource * src, guint * n_nacks)
+{
+  if (n_nacks)
+    *n_nacks = src->nack_deadlines->len;
+
+  return (GstClockTime *) src->nack_deadlines->data;
 }
 
+/**
+ * rtp_source_clear_nacks:
+ * @src: The #RTPSource
+ * @n_nacks: number of nacks
+ *
+ * Remove @n_nacks oldest NACKS form array.
+ */
 void
-rtp_source_clear_nacks (RTPSource * src)
+rtp_source_clear_nacks (RTPSource * src, guint n_nacks)
 {
-  g_array_set_size (src->nacks, 0);
-  src->send_nack = FALSE;
+  g_return_if_fail (n_nacks <= src->nacks->len);
+
+  if (src->nacks->len == n_nacks) {
+    g_array_set_size (src->nacks, 0);
+    g_array_set_size (src->nack_deadlines, 0);
+    src->send_nack = FALSE;
+  } else {
+    g_array_remove_range (src->nacks, 0, n_nacks);
+    g_array_remove_range (src->nack_deadlines, 0, n_nacks);
+  }
 }