gst/rtpmanager/gstrtpsession.c: Move reconsideration code to the rtpsession object.
authorWim Taymans <wim.taymans@gmail.com>
Fri, 27 Apr 2007 15:09:12 +0000 (15:09 +0000)
committerTim-Philipp Müller <tim.muller@collabora.co.uk>
Tue, 11 Aug 2009 01:30:26 +0000 (02:30 +0100)
Original commit message from CVS:
* gst/rtpmanager/gstrtpsession.c: (rtcp_thread),
(gst_rtp_session_send_rtcp), (gst_rtp_session_reconsider):
Move reconsideration code to the rtpsession object.
Simplify timout handling and add reconsideration.
* gst/rtpmanager/rtpsession.c: (rtp_session_class_init),
(rtp_session_init), (rtp_session_finalize), (on_bye_ssrc),
(on_bye_timeout), (on_timeout), (rtp_session_set_callbacks),
(obtain_source), (rtp_session_create_source),
(update_arrival_stats), (rtp_session_process_rtp),
(rtp_session_process_sr), (rtp_session_process_rr),
(rtp_session_process_bye), (rtp_session_process_rtcp),
(calculate_rtcp_interval), (rtp_session_send_bye),
(rtp_session_next_timeout), (session_start_rtcp),
(session_report_blocks), (session_cleanup), (session_sdes),
(session_bye), (is_rtcp_time), (rtp_session_on_timeout):
* gst/rtpmanager/rtpsession.h:
Handle timeout of inactive sources and senders.
Implement BYE scheduling.
* gst/rtpmanager/rtpsource.c: (calculate_jitter),
(rtp_source_process_sr), (rtp_source_get_last_sr),
(rtp_source_get_last_rb):
* gst/rtpmanager/rtpsource.h:
Add members to check for timeouts.
* gst/rtpmanager/rtpstats.c: (rtp_stats_init_defaults),
(rtp_stats_calculate_rtcp_interval), (rtp_stats_add_rtcp_jitter),
(rtp_stats_calculate_bye_interval):
* gst/rtpmanager/rtpstats.h:
Use RFC algorithm for calculating the reporting interval.

gst/rtpmanager/gstrtpsession.c
gst/rtpmanager/rtpsession.c
gst/rtpmanager/rtpsession.h
gst/rtpmanager/rtpsource.h
gst/rtpmanager/rtpstats.c
gst/rtpmanager/rtpstats.h

index bc29723..9545a92 100644 (file)
@@ -144,13 +144,15 @@ static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
     gpointer user_data);
 static GstClockTime gst_rtp_session_get_time (RTPSession * sess,
     gpointer user_data);
+static void gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data);
 
 static RTPSessionCallbacks callbacks = {
   gst_rtp_session_process_rtp,
   gst_rtp_session_send_rtp,
   gst_rtp_session_send_rtcp,
   gst_rtp_session_clock_rate,
-  gst_rtp_session_get_time
+  gst_rtp_session_get_time,
+  gst_rtp_session_reconsider
 };
 
 /* GObject vmethods */
@@ -293,44 +295,39 @@ rtcp_thread (GstRTPSession * rtpsession)
 {
   GstClock *clock;
   GstClockID id;
-  gdouble interval;
   GstClockTime current_time;
-  GstClockTime next_rtcp_check_time;
-  GstClockTime new_rtcp_send_time;
-  GstClockTime last_rtcp_send_time;
-  GstClockTimeDiff jitter;
-  guint members, prev_members;
+  GstClockTime next_timeout;
 
   clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession));
   if (clock == NULL)
     return;
 
+  current_time = gst_clock_get_time (clock);
+
   GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread");
 
   GST_RTP_SESSION_LOCK (rtpsession);
 
-  /* get initial estimate */
-  interval = rtp_session_get_reporting_interval (rtpsession->priv->session);
-  current_time = gst_clock_get_time (clock);
-  last_rtcp_send_time = current_time;
-  next_rtcp_check_time = current_time + (GST_SECOND * interval);
-  /* we keep track of members before and after the timeout to do reverse
-   * reconsideration. */
-  prev_members = rtp_session_get_num_active_sources (rtpsession->priv->session);
-
-  GST_DEBUG_OBJECT (rtpsession, "first RTCP interval: %lf seconds", interval);
-
   while (!rtpsession->priv->stop_thread) {
     GstClockReturn res;
 
+    /* get initial estimate */
+    next_timeout =
+        rtp_session_next_timeout (rtpsession->priv->session, current_time);
+
+
     GST_DEBUG_OBJECT (rtpsession, "next check time %" GST_TIME_FORMAT,
-        GST_TIME_ARGS (next_rtcp_check_time));
+        GST_TIME_ARGS (next_timeout));
+
+    /* leave if no more timeouts, the session ended */
+    if (next_timeout == GST_CLOCK_TIME_NONE)
+      break;
 
     id = rtpsession->priv->id =
-        gst_clock_new_single_shot_id (clock, next_rtcp_check_time);
+        gst_clock_new_single_shot_id (clock, next_timeout);
     GST_RTP_SESSION_UNLOCK (rtpsession);
 
-    res = gst_clock_id_wait (id, &jitter);
+    res = gst_clock_id_wait (id, NULL);
 
     GST_RTP_SESSION_LOCK (rtpsession);
     gst_clock_id_unref (id);
@@ -339,52 +336,16 @@ rtcp_thread (GstRTPSession * rtpsession)
     if (rtpsession->priv->stop_thread)
       break;
 
-    if (res != GST_CLOCK_UNSCHEDULED)
-      if (jitter < 0)
-        current_time = next_rtcp_check_time;
-      else
-        current_time = next_rtcp_check_time - jitter;
-    else
-      current_time = gst_clock_get_time (clock);
-
-    GST_DEBUG_OBJECT (rtpsession, "unlocked %d, jitter %" G_GINT64_FORMAT
-        ", current %" GST_TIME_FORMAT, res, jitter,
-        GST_TIME_ARGS (current_time));
-
-    members = rtp_session_get_num_active_sources (rtpsession->priv->session);
-
-    if (members < prev_members) {
-      GstClockTime time_remaining;
-
-      /* some members went away */
-      GST_DEBUG_OBJECT (rtpsession, "reverse reconsideration");
-      time_remaining = next_rtcp_check_time - current_time;
-      new_rtcp_send_time =
-          current_time + (time_remaining * members / prev_members);
-    } else {
-      interval = rtp_session_get_reporting_interval (rtpsession->priv->session);
-      GST_DEBUG_OBJECT (rtpsession, "forward reconsideration: %lf seconds",
-          interval);
-      new_rtcp_send_time = (interval * GST_SECOND) + last_rtcp_send_time;
-    }
-    prev_members = members;
-
-    if (current_time >= new_rtcp_send_time) {
-      GST_DEBUG_OBJECT (rtpsession, "sending RTCP now");
-
-      /* make the session manager produce RTCP, we ignore the result. */
-      rtp_session_perform_reporting (rtpsession->priv->session);
-
-      interval = rtp_session_get_reporting_interval (rtpsession->priv->session);
-
-      GST_DEBUG_OBJECT (rtpsession, "next RTCP interval: %lf seconds",
-          interval);
-      next_rtcp_check_time = (interval * GST_SECOND) + current_time;
-      last_rtcp_send_time = current_time;
-    } else {
-      GST_DEBUG_OBJECT (rtpsession, "reconsider RTCP");
-      next_rtcp_check_time = new_rtcp_send_time;
-    }
+    /* update current time */
+    current_time = gst_clock_get_time (clock);
+
+    /* we get unlocked because we need to perform reconsideration, don't perform
+     * the timeout but get a new reporting estimate. */
+    GST_DEBUG_OBJECT (rtpsession, "unlocked %d, current %" GST_TIME_FORMAT,
+        res, GST_TIME_ARGS (current_time));
+
+    /* perform actions, we ignore result. */
+    rtp_session_on_timeout (rtpsession->priv->session, current_time);
   }
   GST_RTP_SESSION_UNLOCK (rtpsession);
 
@@ -536,6 +497,8 @@ gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src,
 
   GST_DEBUG_OBJECT (rtpsession, "sending RTCP");
 
+  gst_util_dump_mem (GST_BUFFER_DATA (buffer), GST_BUFFER_SIZE (buffer));
+
   if (rtpsession->send_rtcp_src) {
     result = gst_pad_push (rtpsession->send_rtcp_src, buffer);
   } else {
@@ -616,6 +579,21 @@ gst_rtp_session_get_time (RTPSession * sess, gpointer user_data)
   return result;
 }
 
+/* called when the session manager asks us to reconsider the timeout */
+static void
+gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data)
+{
+  GstRTPSession *rtpsession;
+
+  rtpsession = GST_RTP_SESSION_CAST (user_data);
+
+  GST_RTP_SESSION_LOCK (rtpsession);
+  GST_DEBUG_OBJECT (rtpsession, "unlock timer for reconsideration");
+  if (rtpsession->priv->id)
+    gst_clock_id_unschedule (rtpsession->priv->id);
+  GST_RTP_SESSION_UNLOCK (rtpsession);
+}
+
 static GstFlowReturn
 gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event)
 {
index 1f6e1eb..7244f5f 100644 (file)
@@ -35,6 +35,8 @@ enum
   SIGNAL_ON_SSRC_COLLISION,
   SIGNAL_ON_SSRC_VALIDATED,
   SIGNAL_ON_BYE_SSRC,
+  SIGNAL_ON_BYE_TIMEOUT,
+  SIGNAL_ON_TIMEOUT,
   LAST_SIGNAL
 };
 
@@ -46,6 +48,14 @@ enum
   PROP_0
 };
 
+/* update average packet size, we keep this scaled by 16 to keep enough
+ * precision. */
+#define UPDATE_AVG(avg, val)           \
+  if ((avg) == 0)                      \
+   (avg) = (val) << 4;                 \
+  else                                         \
+   (avg) = ((val) + (15 * (avg))) >> 4;
+
 /* GObject vmethods */
 static void rtp_session_finalize (GObject * object);
 static void rtp_session_set_property (GObject * object, guint prop_id,
@@ -119,6 +129,30 @@ rtp_session_class_init (RTPSessionClass * klass)
       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
       NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
       G_TYPE_OBJECT);
+  /**
+   * RTPSession::on-bye-timeout:
+   * @session: the object which received the signal
+   * @src: the RTPSource that timed out
+   *
+   * Notify of an SSRC that has timed out because of BYE
+   */
+  rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
+      g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
+      NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
+      G_TYPE_OBJECT);
+  /**
+   * RTPSession::on-timeout:
+   * @session: the object which received the signal
+   * @src: the RTPSource that timed out
+   *
+   * Notify of an SSRC that has timed out
+   */
+  rtp_session_signals[SIGNAL_ON_TIMEOUT] =
+      g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
+      NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
+      G_TYPE_OBJECT);
 
   GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
 }
@@ -144,6 +178,7 @@ rtp_session_init (RTPSession * sess)
 
   /* create an active SSRC for this session manager */
   sess->source = rtp_session_create_source (sess);
+  sess->source->validated = TRUE;
   sess->stats.active_sources++;
 
   /* default UDP header length */
@@ -156,6 +191,8 @@ rtp_session_init (RTPSession * sess)
   sess->name = g_strdup (g_get_real_name ());
   sess->tool = g_strdup ("GStreamer");
 
+  sess->first_rtcp = TRUE;
+
   GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc);
 }
 
@@ -176,6 +213,7 @@ rtp_session_finalize (GObject * object)
 
   g_free (sess->cname);
   g_free (sess->tool);
+  g_free (sess->bye_reason);
 
   G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
 }
@@ -233,9 +271,22 @@ on_ssrc_validated (RTPSession * sess, RTPSource * source)
 static void
 on_bye_ssrc (RTPSession * sess, RTPSource * source)
 {
+  /* notify app that reconsideration should be performed */
   g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
 }
 
+static void
+on_bye_timeout (RTPSession * sess, RTPSource * source)
+{
+  g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
+}
+
+static void
+on_timeout (RTPSession * sess, RTPSource * source)
+{
+  g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
+}
+
 /**
  * rtp_session_new:
  *
@@ -272,6 +323,7 @@ rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
   sess->callbacks.send_rtcp = callbacks->send_rtcp;
   sess->callbacks.clock_rate = callbacks->clock_rate;
   sess->callbacks.get_time = callbacks->get_time;
+  sess->callbacks.reconsider = callbacks->reconsider;
   sess->user_data = user_data;
 }
 
@@ -657,6 +709,11 @@ obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
     if (check_collision (sess, source, arrival))
       on_ssrc_collision (sess, source);
   }
+  /* update last activity */
+  source->last_activity = arrival->time;
+  if (rtp)
+    source->last_rtp_activity = arrival->time;
+
   return source;
 }
 
@@ -819,6 +876,7 @@ rtp_session_create_source (RTPSession * sess)
       break;
   }
   source = rtp_source_new (ssrc);
+  g_object_ref (source);
   g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
       source);
   /* we have one more source now */
@@ -831,6 +889,7 @@ rtp_session_create_source (RTPSession * sess)
 /* update the RTPArrivalStats structure with the current time and other bits
  * about the current buffer we are handling.
  * This function is typically called when a validated packet is received.
+ * This function should be called with the SESSION_LOCK
  */
 static void
 update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
@@ -842,9 +901,14 @@ update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
   else
     arrival->time = GST_CLOCK_TIME_NONE;
 
-  /* update sizes */
-  arrival->bytes = GST_BUFFER_SIZE (buffer) + 28;
-  arrival->payload_len = (rtp ? gst_rtp_buffer_get_payload_len (buffer) : 0);
+  /* get packet size including header overhead */
+  arrival->bytes = GST_BUFFER_SIZE (buffer) + sess->header_len;
+
+  if (rtp) {
+    arrival->payload_len = gst_rtp_buffer_get_payload_len (buffer);
+  } else {
+    arrival->payload_len = 0;
+  }
 
   /* for netbuffer we can store the IP address to check for collisions */
   arrival->have_address = GST_IS_NETBUFFER (buffer);
@@ -881,13 +945,16 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer)
   if (!gst_rtp_buffer_validate (buffer))
     goto invalid_packet;
 
+  RTP_SESSION_LOCK (sess);
   /* update arrival stats */
   update_arrival_stats (sess, &arrival, TRUE, buffer);
 
+  /* ignore more RTP packets when we left the session */
+  if (sess->source->received_bye)
+    goto ignore;
+
   /* get SSRC and look up in session database */
   ssrc = gst_rtp_buffer_get_ssrc (buffer);
-
-  RTP_SESSION_LOCK (sess);
   source = obtain_source (sess, ssrc, &created, &arrival, TRUE);
 
   prevsender = RTP_SOURCE_IS_SENDER (source);
@@ -930,6 +997,7 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer)
 
       /* get source */
       csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE);
+
       if (created) {
         GST_DEBUG ("created new CSRC: %08x", csrc);
         rtp_source_set_as_csrc (csrc_src);
@@ -948,9 +1016,17 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer)
   /* ERRORS */
 invalid_packet:
   {
+    gst_buffer_unref (buffer);
     GST_DEBUG ("invalid RTP packet received");
     return GST_FLOW_OK;
   }
+ignore:
+  {
+    gst_buffer_unref (buffer);
+    RTP_SESSION_UNLOCK (sess);
+    GST_DEBUG ("ignoring RTP packet because we are leaving");
+    return GST_FLOW_OK;
+  }
 }
 
 /* A Sender report contains statistics about how the sender is doing. This
@@ -977,7 +1053,6 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
 
   GST_DEBUG ("got SR packet: SSRC %08x", senderssrc);
 
-  RTP_SESSION_LOCK (sess);
   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
 
   prevsender = RTP_SOURCE_IS_SENDER (source);
@@ -1012,7 +1087,6 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
           exthighestseq, jitter, lsr, dlsr);
     }
   }
-  RTP_SESSION_UNLOCK (sess);
 }
 
 /* A receiver report contains statistics about how a receiver is doing. It
@@ -1034,7 +1108,6 @@ rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
 
   GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
 
-  RTP_SESSION_LOCK (sess);
   source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
 
   if (created)
@@ -1054,7 +1127,6 @@ rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
           exthighestseq, jitter, lsr, dlsr);
     }
   }
-  RTP_SESSION_UNLOCK (sess);
 }
 
 /* FIXME, we're just printing this for now... */
@@ -1113,20 +1185,25 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
     guint32 ssrc;
     RTPSource *source;
     gboolean created, prevactive, prevsender;
+    guint pmembers, members;
 
     ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
     GST_DEBUG ("SSRC: %08x", ssrc);
 
     /* find src and mark bye, no probation when dealing with RTCP */
-    RTP_SESSION_LOCK (sess);
     source = obtain_source (sess, ssrc, &created, arrival, FALSE);
 
+    /* store time for when we need to time out this source */
+    source->bye_time = arrival->time;
+
     prevactive = RTP_SOURCE_IS_ACTIVE (source);
     prevsender = RTP_SOURCE_IS_SENDER (source);
 
     /* let the source handle the rest */
     rtp_source_process_bye (source, reason);
 
+    pmembers = sess->stats.active_sources;
+
     if (prevactive && !RTP_SOURCE_IS_ACTIVE (source)) {
       sess->stats.active_sources--;
       GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
@@ -1137,12 +1214,34 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
       GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
           sess->stats.sender_sources);
     }
+    members = sess->stats.active_sources;
+
+    if (!sess->source->received_bye && members < pmembers) {
+      /* some members went away since the previous timeout estimate. 
+       * Perform reverse reconsideration but only when we are not scheduling a
+       * BYE ourselves. */
+      if (arrival->time < sess->next_rtcp_check_time) {
+        GstClockTime time_remaining;
+
+        time_remaining = sess->next_rtcp_check_time - arrival->time;
+        sess->next_rtcp_check_time =
+            gst_util_uint64_scale (time_remaining, members, pmembers);
+
+        GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
+            GST_TIME_ARGS (sess->next_rtcp_check_time));
+
+        sess->next_rtcp_check_time += arrival->time;
+
+        /* notify app of reconsideration */
+        if (sess->callbacks.reconsider)
+          sess->callbacks.reconsider (sess, sess->user_data);
+      }
+    }
 
     if (created)
       on_new_ssrc (sess, source);
 
     on_bye_ssrc (sess, source);
-    RTP_SESSION_UNLOCK (sess);
   }
   g_free (reason);
 }
@@ -1167,9 +1266,8 @@ GstFlowReturn
 rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer)
 {
   GstRTCPPacket packet;
-  gboolean more;
+  gboolean more, is_bye = FALSE;
   RTPArrivalStats arrival;
-  guint size;
 
   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
@@ -1177,27 +1275,29 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer)
   if (!gst_rtcp_buffer_validate (buffer))
     goto invalid_packet;
 
-  /* update arrival stats */
-  update_arrival_stats (sess, &arrival, FALSE, buffer);
-
   GST_DEBUG ("received RTCP packet");
 
-  /* get packet size including header overhead */
   RTP_SESSION_LOCK (sess);
-  size = GST_BUFFER_SIZE (buffer) + sess->header_len;
+  /* update arrival stats */
+  update_arrival_stats (sess, &arrival, FALSE, buffer);
 
-  /* update average RTCP packet size */
-  if (sess->stats.avg_rtcp_packet_size == 0)
-    sess->stats.avg_rtcp_packet_size = size;
-  else
-    sess->stats.avg_rtcp_packet_size =
-        (size + (15 * sess->stats.avg_rtcp_packet_size)) >> 4;
-  RTP_SESSION_UNLOCK (sess);
+  if (sess->sent_bye)
+    goto ignore;
 
   /* start processing the compound packet */
   more = gst_rtcp_buffer_get_first_packet (buffer, &packet);
   while (more) {
-    switch (gst_rtcp_packet_get_type (&packet)) {
+    GstRTCPType type;
+
+    type = gst_rtcp_packet_get_type (&packet);
+
+    /* when we are leaving the session, we should ignore all non-BYE messages */
+    if (sess->source->received_bye && type != GST_RTCP_TYPE_BYE) {
+      GST_DEBUG ("ignoring non-BYE RTCP packet because we are leaving");
+      goto next;
+    }
+
+    switch (type) {
       case GST_RTCP_TYPE_SR:
         rtp_session_process_sr (sess, &packet, &arrival);
         break;
@@ -1208,6 +1308,7 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer)
         rtp_session_process_sdes (sess, &packet, &arrival);
         break;
       case GST_RTCP_TYPE_BYE:
+        is_bye = TRUE;
         rtp_session_process_bye (sess, &packet, &arrival);
         break;
       case GST_RTCP_TYPE_APP:
@@ -1217,9 +1318,23 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer)
         GST_WARNING ("got unknown RTCP packet");
         break;
     }
+  next:
     more = gst_rtcp_packet_move_to_next (&packet);
   }
 
+  /* if we are scheduling a BYE, we only want to count bye packets, else we
+   * count everything */
+  if (sess->source->received_bye) {
+    if (is_bye) {
+      sess->stats.bye_members++;
+      UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
+    }
+  } else {
+    /* keep track of average packet size */
+    UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
+  }
+  RTP_SESSION_UNLOCK (sess);
+
   gst_buffer_unref (buffer);
 
   return GST_FLOW_OK;
@@ -1230,11 +1345,18 @@ invalid_packet:
     GST_DEBUG ("invalid RTCP packet received");
     return GST_FLOW_OK;
   }
+ignore:
+  {
+    gst_buffer_unref (buffer);
+    RTP_SESSION_UNLOCK (sess);
+    GST_DEBUG ("ignoring RTP packet because we left");
+    return GST_FLOW_OK;
+  }
 }
 
 /**
  * rtp_session_send_rtp:
- * @sess: and #RTPSession
+ * @sess: an #RTPSession
  * @buffer: an RTP buffer
  *
  * Send the RTP buffer in the session manager.
@@ -1266,25 +1388,125 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer)
   return result;
 }
 
+static GstClockTime
+calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
+    gboolean first)
+{
+  GstClockTime result;
+
+  if (sess->source->received_bye) {
+    result = rtp_stats_calculate_rtcp_interval (&sess->stats,
+        RTP_SOURCE_IS_SENDER (sess->source), first);
+  } else {
+    result = rtp_stats_calculate_bye_interval (&sess->stats);
+  }
+
+  GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (result));
+
+  if (!deterministic)
+    result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
+
+  GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
+
+  return result;
+}
+
 /**
- * rtp_session_get_reporting_interval:
+ * rtp_session_send_bye:
  * @sess: an #RTPSession
+ * @reason: a reason or NULL
  *
- * Get the interval for sending out the next RTCP packet and doing general
- * maintenance tasks.
+ * Stop the current @sess and schedule a BYE message for the other members.
  *
- * Returns: an interval in seconds.
+ * Returns: a #GstFlowReturn.
  */
-gdouble
-rtp_session_get_reporting_interval (RTPSession * sess)
+GstFlowReturn
+rtp_session_send_bye (RTPSession * sess, const gchar * reason)
+{
+  GstFlowReturn result = GST_FLOW_OK;
+  RTPSource *source;
+  GstClockTime current, interval;
+
+  g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
+
+  RTP_SESSION_LOCK (sess);
+  source = sess->source;
+
+  /* ignore more BYEs */
+  if (source->received_bye)
+    goto done;
+
+  /* we have BYE now */
+  source->received_bye = TRUE;
+  /* at least one member wants to send a BYE */
+  sess->bye_reason = g_strdup (reason);
+  sess->stats.avg_rtcp_packet_size = 100;
+  sess->stats.bye_members = 1;
+  sess->first_rtcp = TRUE;
+  sess->sent_bye = FALSE;
+
+  /* get current time */
+  if (sess->callbacks.get_time)
+    current = sess->callbacks.get_time (sess, sess->user_data);
+  else
+    current = 0;
+
+  /* reschedule transmission */
+  sess->last_rtcp_send_time = current;
+  interval = calculate_rtcp_interval (sess, FALSE, TRUE);
+  sess->next_rtcp_check_time = current + interval;
+
+  GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
+
+  /* notify app of reconsideration */
+  if (sess->callbacks.reconsider)
+    sess->callbacks.reconsider (sess, sess->user_data);
+done:
+  RTP_SESSION_UNLOCK (sess);
+
+  return result;
+}
+
+/**
+ * rtp_session_next_timeout:
+ * @sess: an #RTPSession
+ * @time: the current time
+ *
+ * Get the next time we should perform session maintenance tasks.
+ *
+ * Returns: a time when rtp_session_on_timeout() should be called with the
+ * current time.
+ */
+GstClockTime
+rtp_session_next_timeout (RTPSession * sess, GstClockTime time)
 {
-  gdouble result;
+  GstClockTime result;
 
   g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
 
   RTP_SESSION_LOCK (sess);
-  result = rtp_stats_calculate_rtcp_interval (&sess->stats, FALSE);
-  result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
+
+  result = sess->next_rtcp_check_time;
+
+  if (sess->source->received_bye) {
+    if (sess->sent_bye)
+      result = GST_CLOCK_TIME_NONE;
+    else if (sess->stats.active_sources >= 50)
+      /* reconsider BYE if members >= 50 */
+      result = time + calculate_rtcp_interval (sess, FALSE, TRUE);;
+  } else {
+    if (sess->first_rtcp)
+      /* we are called for the first time */
+      result = time + calculate_rtcp_interval (sess, FALSE, TRUE);
+    else if (sess->next_rtcp_check_time < time)
+      /* get a new timeout when we need to */
+      result = time + calculate_rtcp_interval (sess, FALSE, FALSE);
+  }
+  sess->next_rtcp_check_time = result;
+
+  GST_DEBUG ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
   RTP_SESSION_UNLOCK (sess);
 
   return result;
@@ -1295,34 +1517,46 @@ typedef struct
   RTPSession *sess;
   GstBuffer *rtcp;
   GstClockTime time;
+  GstClockTime interval;
   GstRTCPPacket packet;
+  gboolean is_bye;
+  gboolean has_sdes;
 } ReportData;
 
 static void
+session_start_rtcp (RTPSession * sess, ReportData * data)
+{
+  GstRTCPPacket *packet = &data->packet;
+  RTPSource *own = sess->source;
+
+  data->rtcp = gst_rtcp_buffer_new (sess->mtu);
+
+  if (RTP_SOURCE_IS_SENDER (own)) {
+    /* we are a sender, create SR */
+    GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
+    gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet);
+
+    /* fill in sender report info, FIXME NTP and RTP timestamps missing */
+    gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
+        0, 0, own->stats.packets_sent, own->stats.octets_sent);
+  } else {
+    /* we are only receiver, create RR */
+    GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
+    gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_RR, packet);
+    gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
+  }
+}
+
+/* construct a Sender or Receiver Report */
+static void
 session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
 {
   RTPSession *sess = data->sess;
-  RTPSource *own = sess->source;
   GstRTCPPacket *packet = &data->packet;
 
   /* create a new buffer if needed */
   if (data->rtcp == NULL) {
-    data->rtcp = gst_rtcp_buffer_new (sess->mtu);
-
-    if (RTP_SOURCE_IS_SENDER (own)) {
-      /* we are a sender, create SR */
-      GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
-      gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet);
-
-      /* fill in sender report info */
-      gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
-          0, 0, own->stats.packets_sent, own->stats.octets_sent);
-    } else {
-      /* we are only receiver, create RR */
-      GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
-      gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_RR, packet);
-      gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
-    }
+    session_start_rtcp (sess, data);
   }
   if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) {
     /* only report about other sender sources */
@@ -1381,16 +1615,85 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
   }
 }
 
+/* perform cleanup of sources that timed out */
+static gboolean
+session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
+{
+  gboolean remove = FALSE;
+  gboolean byetimeout = FALSE;
+  gboolean is_sender, is_active;
+  RTPSession *sess = data->sess;
+  GstClockTime interval;
+
+  is_sender = RTP_SOURCE_IS_SENDER (source);
+  is_active = RTP_SOURCE_IS_ACTIVE (source);
+
+  /* check for our own source, we don't want to delete our own source. */
+  if (!(source == sess->source)) {
+    if (source->received_bye) {
+      /* if we received a BYE from the source, remove the source after some
+       * time. */
+      if (data->time > source->bye_time &&
+          data->time - source->bye_time > sess->stats.bye_timeout) {
+        GST_DEBUG ("removing BYE source %08x", source->ssrc);
+        remove = TRUE;
+        byetimeout = TRUE;
+      }
+    }
+    /* sources that were inactive for more than 5 times the deterministic reporting
+     * interval get timed out. the min timeout is 5 seconds. */
+    if (data->time > source->last_activity) {
+      interval = MAX (data->interval * 5, 5 * GST_SECOND);
+      if (data->time - source->last_activity > interval) {
+        GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
+            source->ssrc, GST_TIME_ARGS (source->last_activity));
+        remove = TRUE;
+      }
+    }
+  }
+
+  /* senders that did not send for a long time become a receiver, this also
+   * holds for our own source. */
+  if (is_sender) {
+    if (data->time > source->last_rtp_activity) {
+      interval = MAX (data->interval * 2, 5 * GST_SECOND);
+
+      if (data->time - source->last_rtp_activity > interval) {
+        GST_DEBUG ("sender source %08x timed out and became receiver, last %"
+            GST_TIME_FORMAT, source->ssrc,
+            GST_TIME_ARGS (source->last_rtp_activity));
+        source->is_sender = FALSE;
+        sess->stats.sender_sources--;
+      }
+    }
+  }
+
+  if (remove) {
+    sess->total_sources--;
+    if (is_sender)
+      sess->stats.sender_sources--;
+    if (is_active)
+      sess->stats.active_sources--;
+
+    if (byetimeout)
+      on_bye_timeout (sess, source);
+    else
+      on_timeout (sess, source);
+
+  }
+  return remove;
+}
+
 static void
-session_sdes (RTPSession * sess, GstBuffer * buffer)
+session_sdes (RTPSession * sess, ReportData * data)
 {
-  GstRTCPPacket packet;
+  GstRTCPPacket *packet = &data->packet;
 
   /* add SDES packet */
-  gst_rtcp_buffer_add_packet (buffer, GST_RTCP_TYPE_SDES, &packet);
+  gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SDES, packet);
 
-  gst_rtcp_packet_sdes_add_item (&packet, sess->source->ssrc);
-  gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_CNAME,
+  gst_rtcp_packet_sdes_add_item (packet, sess->source->ssrc);
+  gst_rtcp_packet_sdes_add_entry (packet, GST_RTCP_SDES_CNAME,
       strlen (sess->cname), (guint8 *) sess->cname);
 
   /* other SDES items must only be added at regular intervals and only when the
@@ -1401,20 +1704,87 @@ session_sdes (RTPSession * sess, GstBuffer * buffer)
   gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_TOOL,
       strlen (sess->tool), (guint8 *) sess->tool);
 #endif
+
+  data->has_sdes = TRUE;
+}
+
+/* schedule a BYE packet */
+static void
+session_bye (RTPSession * sess, ReportData * data)
+{
+  GstRTCPPacket *packet = &data->packet;
+
+  /* open packet */
+  session_start_rtcp (sess, data);
+
+  /* add SDES */
+  session_sdes (sess, data);
+
+  /* add a BYE packet */
+  gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_BYE, packet);
+  gst_rtcp_packet_bye_add_ssrc (packet, sess->source->ssrc);
+  if (sess->bye_reason)
+    gst_rtcp_packet_bye_set_reason (packet, sess->bye_reason);
+
+  /* we have a BYE packet now */
+  data->is_bye = TRUE;
+}
+
+static gboolean
+is_rtcp_time (RTPSession * sess, GstClockTime time, ReportData * data)
+{
+  GstClockTime new_send_time;
+  gboolean result;
+
+  /* no need to check yet */
+  if (sess->next_rtcp_check_time > time) {
+    GST_DEBUG ("no check time yet");
+    return FALSE;
+  }
+
+  /* perform forward reconsideration */
+  new_send_time = rtp_stats_add_rtcp_jitter (&sess->stats, data->interval);
+
+  GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (new_send_time));
+
+  new_send_time += sess->last_rtcp_send_time;
+
+  /* check if reconsideration */
+  if (time < new_send_time) {
+    GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (new_send_time));
+    result = FALSE;
+    /* store new check time */
+    sess->next_rtcp_check_time = new_send_time;
+  } else {
+    result = TRUE;
+    new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE);
+
+    GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (new_send_time));
+    sess->next_rtcp_check_time = time + new_send_time;
+  }
+  return result;
 }
 
 /**
- * rtp_session_perform_reporting:
+ * rtp_session_on_timeout:
  * @sess: an #RTPSession
  *
- * Instruct the session manager to generate RTCP packets with current stats.
- * This function will call the #RTPSessionSendRTCP callback, possibly multiple
+ * Perform maintenance actions after the timeout obtained with
+ * rtp_session_next_timeout() expired.
+ *
+ * This function will perform timeouts of receivers and senders, send a BYE
+ * packet or generate RTCP packets with current session stats.
+ *
+ * This function can call the #RTPSessionSendRTCP callback, possibly multiple
  * times, for each packet that should be processed.
  *
  * Returns: a #GstFlowReturn.
  */
 GstFlowReturn
-rtp_session_perform_reporting (RTPSession * sess)
+rtp_session_on_timeout (RTPSession * sess, GstClockTime time)
 {
   GstFlowReturn result = GST_FLOW_OK;
   ReportData data;
@@ -1423,21 +1793,49 @@ rtp_session_perform_reporting (RTPSession * sess)
 
   data.sess = sess;
   data.rtcp = NULL;
+  data.time = time;
+  data.is_bye = FALSE;
+  data.has_sdes = FALSE;
 
-  /* get time so it can be used later */
-  data.time = sess->callbacks.get_time (sess, sess->user_data);
+  GST_DEBUG ("reporting at %" GST_TIME_FORMAT, GST_TIME_ARGS (time));
 
   RTP_SESSION_LOCK (sess);
-  /* loop over all known sources and do something */
-  g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
-      (GHFunc) session_report_blocks, &data);
+  /* get a new interval, we need this for various cleanups etc */
+  data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
+
+  /* first perform cleanups */
+  g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
+      (GHRFunc) session_cleanup, &data);
+
+  /* see if we need to generate SR or RR packets */
+  if (is_rtcp_time (sess, time, &data)) {
+    if (sess->source->received_bye) {
+      /* generate BYE instead */
+      session_bye (sess, &data);
+      sess->sent_bye = TRUE;
+    } else {
+      /* loop over all known sources and do something */
+      g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+          (GHFunc) session_report_blocks, &data);
+    }
+  }
 
-  /* add SDES for this source */
   if (data.rtcp) {
-    session_sdes (sess, data.rtcp);
-    sess->stats.sent_rtcp = TRUE;
-  }
+    guint size;
+
+    /* we keep track of the last report time in order to timeout inactive
+     * receivers or senders */
+    sess->last_rtcp_send_time = data.time;
+    sess->first_rtcp = FALSE;
 
+    /* add SDES for this source when not already added */
+    if (!data.has_sdes)
+      session_sdes (sess, &data);
+
+    /* update average RTCP size before sending */
+    size = GST_BUFFER_SIZE (data.rtcp) + sess->header_len;
+    UPDATE_AVG (sess->stats.avg_rtcp_packet_size, size);
+  }
   RTP_SESSION_UNLOCK (sess);
 
   /* push out the RTCP packet */
@@ -1451,5 +1849,6 @@ rtp_session_perform_reporting (RTPSession * sess)
     else
       gst_buffer_unref (data.rtcp);
   }
+
   return result;
 }
index 3554016..c9a2114 100644 (file)
@@ -106,6 +106,17 @@ typedef gint (*RTPSessionClockRate) (RTPSession *sess, guint8 payload, gpointer
 typedef GstClockTime (*RTPSessionGetTime) (RTPSession *sess, gpointer user_data);
 
 /**
+ * RTPSessionReconsider:
+ * @sess: an #RTPSession
+ * @user_data: user data specified when registering
+ *
+ * This callback will be called when @sess needs to cancel the previous timeout. 
+ * The currently running timeout should be canceled and a new reporting interval
+ * should be requested from @sess.
+ */
+typedef void (*RTPSessionReconsider) (RTPSession *sess, gpointer user_data);
+
+/**
  * RTPSessionCallbacks:
  * @RTPSessionProcessRTP: callback to process RTP packets
  * @RTPSessionSendRTP: callback for sending RTP packets
@@ -122,6 +133,7 @@ typedef struct {
   RTPSessionSendRTCP    send_rtcp;
   RTPSessionClockRate   clock_rate;
   RTPSessionGetTime     get_time;
+  RTPSessionReconsider  reconsider;
 } RTPSessionCallbacks;
 
 /**
@@ -164,6 +176,14 @@ struct _RTPSession {
   GHashTable   *cnames;
   guint         total_sources;
 
+  GstClockTime  next_rtcp_check_time;
+  GstClockTime  last_rtcp_send_time;
+  gboolean      first_rtcp;
+
+  GstBuffer    *bye_packet;
+  gchar        *bye_reason;
+  gboolean      sent_bye;
+
   RTPSessionCallbacks callbacks;
   gpointer            user_data;
 
@@ -185,6 +205,8 @@ struct _RTPSessionClass {
   void (*on_ssrc_collision) (RTPSession *sess, RTPSource *source);
   void (*on_ssrc_validated) (RTPSession *sess, RTPSource *source);
   void (*on_bye_ssrc)       (RTPSession *sess, RTPSource *source);
+  void (*on_bye_timeout)    (RTPSession *sess, RTPSource *source);
+  void (*on_timeout)        (RTPSession *sess, RTPSource *source);
 };
 
 GType rtp_session_get_type (void);
@@ -229,8 +251,11 @@ GstFlowReturn   rtp_session_process_rtcp           (RTPSession *sess, GstBuffer
 /* processing packets for sending */
 GstFlowReturn   rtp_session_send_rtp               (RTPSession *sess, GstBuffer *buffer);
 
+/* stopping the session */
+GstFlowReturn   rtp_session_send_bye               (RTPSession *sess, const gchar *reason);
+
 /* get interval for next RTCP interval */
-gdouble         rtp_session_get_reporting_interval (RTPSession *sess);
-GstFlowReturn   rtp_session_perform_reporting      (RTPSession *sess);
+GstClockTime    rtp_session_next_timeout          (RTPSession *sess, GstClockTime time);
+GstFlowReturn   rtp_session_on_timeout            (RTPSession *sess, GstClockTime time);
 
 #endif /* __RTP_SESSION_H__ */
index f5ca2a1..0df03f4 100644 (file)
@@ -136,6 +136,10 @@ struct _RTPSource {
   guint8        payload;
   gint          clock_rate;
 
+  GstClockTime  bye_time;
+  GstClockTime  last_activity;
+  GstClockTime  last_rtp_activity;
+
   GQueue       *packets;
 
   RTPSourceCallbacks callbacks;
index 456ed15..1e18f45 100644 (file)
@@ -33,63 +33,77 @@ rtp_stats_init_defaults (RTPSessionStats * stats)
   stats->receiver_fraction = RTP_STATS_RECEIVER_FRACTION;
   stats->rtcp_bandwidth = RTP_STATS_RTCP_BANDWIDTH;
   stats->min_interval = RTP_STATS_MIN_INTERVAL;
+  stats->bye_timeout = RTP_STATS_BYE_TIMEOUT;
 }
 
 /**
  * rtp_stats_calculate_rtcp_interval:
  * @stats: an #RTPSessionStats struct
+ * @sender: if we are a sender
+ * @first: if this is the first time
  * 
  * Calculate the RTCP interval. The result of this function is the amount of
- * time to wait (in seconds) before sender a new RTCP message.
+ * time to wait (in nanoseconds) before sending a new RTCP message.
  *
  * Returns: the RTCP interval.
  */
-gdouble
-rtp_stats_calculate_rtcp_interval (RTPSessionStats * stats, gboolean sender)
+GstClockTime
+rtp_stats_calculate_rtcp_interval (RTPSessionStats * stats, gboolean we_send,
+    gboolean first)
 {
-  gdouble active, senders, receivers, sfraction;
-  gboolean avg_rtcp;
+  gdouble members, senders, n;
+  gdouble avg_rtcp_size, rtcp_bw;
   gdouble interval;
+  gdouble rtcp_min_time;
 
-  active = stats->active_sources;
-  /* Try to avoid division by zero */
-  if (stats->active_sources == 0)
-    active += 1.0;
 
-  senders = (gdouble) stats->sender_sources;
-  receivers = (gdouble) (active - senders);
-  avg_rtcp = (gdouble) stats->avg_rtcp_packet_size;
-
-  sfraction = senders / active;
+  /* Very first call at application start-up uses half the min
+   * delay for quicker notification while still allowing some time
+   * before reporting for randomization and to learn about other
+   * sources so the report interval will converge to the correct
+   * interval more quickly.
+   */
+  rtcp_min_time = stats->min_interval;
+  if (first)
+    rtcp_min_time /= 2.0;
 
-  GST_DEBUG ("senders: %f, receivers %f, avg_rtcp %f, sfraction %f",
-      senders, receivers, avg_rtcp, sfraction);
+  /* Dedicate a fraction of the RTCP bandwidth to senders unless
+   * the number of senders is large enough that their share is
+   * more than that fraction.
+   */
+  n = members = stats->active_sources;
+  senders = (gdouble) stats->sender_sources;
+  rtcp_bw = stats->rtcp_bandwidth;
 
-  if (senders > 0 && sfraction <= stats->sender_fraction) {
-    if (sender) {
-      interval =
-          (avg_rtcp * senders) / (stats->sender_fraction *
-          stats->rtcp_bandwidth);
+  if (senders <= members * RTP_STATS_SENDER_FRACTION) {
+    if (we_send) {
+      rtcp_bw *= RTP_STATS_SENDER_FRACTION;
+      n = senders;
     } else {
-      interval =
-          (avg_rtcp * receivers) / ((1.0 -
-              stats->sender_fraction) * stats->rtcp_bandwidth);
+      rtcp_bw *= RTP_STATS_RECEIVER_FRACTION;
+      n -= senders;
     }
-  } else {
-    interval = (avg_rtcp * active) / stats->rtcp_bandwidth;
   }
 
-  if (interval < stats->min_interval)
-    interval = stats->min_interval;
-
-  if (!stats->sent_rtcp)
-    interval /= 2.0;
+  avg_rtcp_size = stats->avg_rtcp_packet_size / 16.0;
+  /*
+   * The effective number of sites times the average packet size is
+   * the total number of octets sent when each site sends a report.
+   * Dividing this by the effective bandwidth gives the time
+   * interval over which those packets must be sent in order to
+   * meet the bandwidth target, with a minimum enforced.  In that
+   * time interval we send one report so this time is also our
+   * average time between reports.
+   */
+  interval = avg_rtcp_size * n / rtcp_bw;
+  if (interval < rtcp_min_time)
+    interval = rtcp_min_time;
 
-  return interval;
+  return interval * GST_SECOND;
 }
 
 /**
- * rtp_stats_calculate_rtcp_interval:
+ * rtp_stats_add_rtcp_jitter:
  * @stats: an #RTPSessionStats struct
  * @interval: an RTCP interval
  * 
@@ -98,14 +112,62 @@ rtp_stats_calculate_rtcp_interval (RTPSessionStats * stats, gboolean sender)
  *
  * Returns: the new RTCP interval.
  */
-gdouble
-rtp_stats_add_rtcp_jitter (RTPSessionStats * stats, gdouble interval)
+GstClockTime
+rtp_stats_add_rtcp_jitter (RTPSessionStats * stats, GstClockTime interval)
 {
+  gdouble temp;
+
   /* see RFC 3550 p 30 
    * To compensate for "unconditional reconsideration" converging to a
    * value below the intended average.
    */
 #define COMPENSATION  (2.71828 - 1.5);
 
-  return (interval * g_random_double_range (0.5, 1.5)) / COMPENSATION;
+  temp = (interval * g_random_double_range (0.5, 1.5)) / COMPENSATION;
+
+  return (GstClockTime) temp;
+}
+
+
+/**
+ * rtp_stats_calculate_bye_interval:
+ * @stats: an #RTPSessionStats struct
+ * 
+ * Calculate the BYE interval. The result of this function is the amount of
+ * time to wait (in nanoseconds) before sending a BYE message.
+ *
+ * Returns: the BYE interval.
+ */
+GstClockTime
+rtp_stats_calculate_bye_interval (RTPSessionStats * stats)
+{
+  gdouble members;
+  gdouble avg_rtcp_size, rtcp_bw;
+  gdouble interval;
+  gdouble rtcp_min_time;
+
+  rtcp_min_time = (stats->min_interval) / 2.0;
+
+  /* Dedicate a fraction of the RTCP bandwidth to senders unless
+   * the number of senders is large enough that their share is
+   * more than that fraction.
+   */
+  members = stats->bye_members;
+  rtcp_bw = stats->rtcp_bandwidth * RTP_STATS_RECEIVER_FRACTION;
+
+  avg_rtcp_size = stats->avg_rtcp_packet_size / 16.0;
+  /*
+   * The effective number of sites times the average packet size is
+   * the total number of octets sent when each site sends a report.
+   * Dividing this by the effective bandwidth gives the time
+   * interval over which those packets must be sent in order to
+   * meet the bandwidth target, with a minimum enforced.  In that
+   * time interval we send one report so this time is also our
+   * average time between reports.
+   */
+  interval = avg_rtcp_size * members / rtcp_bw;
+  if (interval < rtcp_min_time)
+    interval = rtcp_min_time;
+
+  return interval * GST_SECOND;
 }
index e8ea981..0ee1ed1 100644 (file)
@@ -134,7 +134,7 @@ typedef struct {
  * a network partition.
  */
 #define RTP_STATS_MIN_INTERVAL      5.0
- /*
+/*
  * Fraction of the RTCP bandwidth to be shared among active
  * senders.  (This fraction was chosen so that in a typical
  * session with one or two active senders, the computed report
@@ -145,6 +145,12 @@ typedef struct {
 #define RTP_STATS_SENDER_FRACTION       (0.25)
 #define RTP_STATS_RECEIVER_FRACTION     (1.0 - RTP_STATS_SENDER_FRACTION)
 
+/*
+ * When receiving a BYE from a source, remove the source fomr the database
+ * after this timeout.
+ */
+#define RTP_STATS_BYE_TIMEOUT           (2 * GST_SECOND)
+
 /**
  * RTPSessionStats:
  *
@@ -156,16 +162,17 @@ typedef struct {
   gdouble       receiver_fraction;
   gdouble       rtcp_bandwidth;
   gdouble       min_interval;
+  GstClockTime  bye_timeout;
   guint         sender_sources;
   guint         active_sources;
   guint         avg_rtcp_packet_size;
-  guint         avg_bye_packet_size;
-  gboolean      sent_rtcp;
+  guint         bye_members;
 } RTPSessionStats;
 
 void           rtp_stats_init_defaults               (RTPSessionStats *stats);
 
-gdouble        rtp_stats_calculate_rtcp_interval    (RTPSessionStats *stats, gboolean sender);
-gdouble        rtp_stats_add_rtcp_jitter            (RTPSessionStats *stats, gdouble interval);
+GstClockTime   rtp_stats_calculate_rtcp_interval    (RTPSessionStats *stats, gboolean sender, gboolean first);
+GstClockTime   rtp_stats_add_rtcp_jitter            (RTPSessionStats *stats, GstClockTime interval);
+GstClockTime   rtp_stats_calculate_bye_interval     (RTPSessionStats *stats);
 
 #endif /* __RTP_STATS_H__ */