SIGNAL_ON_SENDING_RTCP,
SIGNAL_ON_FEEDBACK_RTCP,
SIGNAL_SEND_RTCP,
+ SIGNAL_SEND_RTCP_FULL,
+ SIGNAL_ON_RECEIVING_RTCP,
LAST_SIGNAL
};
#define DEFAULT_INTERNAL_SOURCE NULL
-#define DEFAULT_BANDWIDTH RTP_STATS_BANDWIDTH
-#define DEFAULT_RTCP_FRACTION (RTP_STATS_RTCP_FRACTION * RTP_STATS_BANDWIDTH)
+#define DEFAULT_BANDWIDTH 0.0
+#define DEFAULT_RTCP_FRACTION RTP_STATS_RTCP_FRACTION
#define DEFAULT_RTCP_RR_BANDWIDTH -1
#define DEFAULT_RTCP_RS_BANDWIDTH -1
#define DEFAULT_RTCP_MTU 1400
PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
PROP_PROBATION,
- PROP_LAST
+ PROP_STATS
};
/* update average packet size */
(avg) = ((val) + (15 * (avg))) >> 4;
-/* The number RTCP intervals after which to timeout entries in the
- * collision table
- */
-#define RTCP_INTERVAL_COLLISION_TIMEOUT 10
-
/* GObject vmethods */
static void rtp_session_finalize (GObject * object);
static void rtp_session_set_property (GObject * object, guint prop_id,
static void rtp_session_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
-static void rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay);
+static gboolean rtp_session_send_rtcp (RTPSession * sess,
+ GstClockTime max_delay);
static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
static guint32 rtp_session_create_new_ssrc (RTPSession * sess);
static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
- gboolean * created, RTPArrivalStats * arrival, gboolean rtp);
+ gboolean * created, RTPPacketInfo * pinfo, gboolean rtp);
static RTPSource *obtain_internal_source (RTPSession * sess,
- guint32 ssrc, gboolean * created);
+ guint32 ssrc, gboolean * created, GstClockTime current_time);
static GstFlowReturn rtp_session_schedule_bye_locked (RTPSession * sess,
GstClockTime current_time);
static GstClockTime calculate_rtcp_interval (RTPSession * sess,
G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_UINT64);
+ /**
+ * RTPSession::send-rtcp-full:
+ * @session: the object which received the signal
+ * @max_delay: The maximum delay after which the feedback will not be useful
+ * anymore
+ *
+ * Requests that the #RTPSession initiate a new RTCP packet as soon as
+ * possible within the requested delay.
+ *
+ * Returns: TRUE if the new RTCP packet could be scheduled within the
+ * requested delay, FALSE otherwise.
+ *
+ * Since: 1.6
+ */
+ rtp_session_signals[SIGNAL_SEND_RTCP_FULL] =
+ g_signal_new ("send-rtcp-full", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
+ G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
+ g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);
+
+ /**
+ * RTPSession::on-receiving-rtcp
+ * @session: the object which received the signal
+ * @buffer: the #GstBuffer containing the RTCP packet that was received
+ *
+ * This signal is emitted when receiving an RTCP packet before it is handled
+ * by the session. It can be used to extract custom information from RTCP packets.
+ *
+ * Since: 1.6
+ */
+ rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP] =
+ g_signal_new ("on-receiving-rtcp", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_receiving_rtcp),
+ NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1,
+ GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
+
g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
g_param_spec_uint ("internal-ssrc", "Internal SSRC",
"The internal SSRC used for the session (deprecated)",
g_param_spec_uint ("rtcp-immediate-feedback-threshold",
"RTCP Immediate Feedback threshold",
"The maximum number of members of a RTP session for which immediate"
- " feedback is used",
+ " feedback is used (DEPRECATED: has no effect and is not needed)",
0, G_MAXUINT, DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
g_object_class_install_property (gobject_class, PROP_PROBATION,
g_param_spec_uint ("probation", "Number of probations",
0, G_MAXUINT, DEFAULT_PROBATION,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ /**
+ * RTPSession::stats:
+ *
+ * Various session statistics. This property returns a GstStructure
+ * with name application/x-rtp-session-stats with the following fields:
+ *
+ * "rtx-drop-count" G_TYPE_UINT The number of retransmission events
+ * dropped (due to bandwidth constraints)
+ * "sent-nack-count" G_TYPE_UINT Number of NACKs sent
+ * "recv-nack-count" G_TYPE_UINT Number of NACKs received
+ *
+ * Since: 1.4
+ */
+ g_object_class_install_property (gobject_class, PROP_STATS,
+ g_param_spec_boxed ("stats", "Statistics",
+ "Various statistics", GST_TYPE_STRUCTURE,
+ G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
klass->get_source_by_ssrc =
GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp);
sess->mask_idx = 0;
sess->mask = 0;
- for (i = 0; i < 32; i++) {
+ /* TODO: We currently only use the first hash table but this is the
+ * beginning of an implementation for RFC2762
+ for (i = 0; i < 32; i++) {
+ */
+ for (i = 0; i < 1; i++) {
sess->ssrcs[i] =
g_hash_table_new_full (NULL, NULL, NULL,
(GDestroyNotify) g_object_unref);
sess->first_rtcp = TRUE;
sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
+ sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE;
sess->allow_early = TRUE;
sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD;
sess->last_keyframe_request = GST_CLOCK_TIME_NONE;
+
+ sess->is_doing_ptp = TRUE;
}
static void
gst_structure_free (sess->sdes);
- for (i = 0; i < 32; i++)
+ g_list_free_full (sess->conflicting_addresses,
+ (GDestroyNotify) rtp_conflicting_address_free);
+
+ /* TODO: Change this again when implementing RFC 2762
+ * for (i = 0; i < 32; i++)
+ */
+ for (i = 0; i < 1; i++)
g_hash_table_destroy (sess->ssrcs[i]);
g_mutex_clear (&sess->lock);
return res;
}
+static GstStructure *
+rtp_session_create_stats (RTPSession * sess)
+{
+ GstStructure *s;
+
+ s = gst_structure_new ("application/x-rtp-session-stats",
+ "rtx-drop-count", G_TYPE_UINT, sess->stats.nacks_dropped,
+ "sent-nack-count", G_TYPE_UINT, sess->stats.nacks_sent,
+ "recv-nack-count", G_TYPE_UINT, sess->stats.nacks_received, NULL);
+
+ return s;
+}
+
static void
rtp_session_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
switch (prop_id) {
case PROP_INTERNAL_SSRC:
+ RTP_SESSION_LOCK (sess);
+ sess->suggested_ssrc = g_value_get_uint (value);
+ RTP_SESSION_UNLOCK (sess);
+ if (sess->callbacks.reconfigure)
+ sess->callbacks.reconfigure (sess, sess->reconfigure_user_data);
break;
case PROP_BANDWIDTH:
RTP_SESSION_LOCK (sess);
case PROP_PROBATION:
g_value_set_uint (value, sess->probation);
break;
+ case PROP_STATS:
+ g_value_take_boxed (value, rtp_session_create_stats (sess));
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
sess->callbacks.notify_nack = callbacks->notify_nack;
sess->notify_nack_user_data = user_data;
}
+ if (callbacks->reconfigure) {
+ sess->callbacks.reconfigure = callbacks->reconfigure;
+ sess->reconfigure_user_data = user_data;
+ }
}
/**
(RTPSourceClockRate) source_clock_rate,
};
+
+/**
+ * rtp_session_find_conflicting_address:
+ * @session: The session the packet came in
+ * @address: address to check for
+ * @time: The time when the packet that is possibly in conflict arrived
+ *
+ * Checks if an address which has a conflict is already known. If it is
+ * a known conflict, remember the time
+ *
+ * Returns: TRUE if it was a known conflict, FALSE otherwise
+ */
+static gboolean
+rtp_session_find_conflicting_address (RTPSession * session,
+ GSocketAddress * address, GstClockTime time)
+{
+ return find_conflicting_address (session->conflicting_addresses, address,
+ time);
+}
+
+/**
+ * rtp_session_add_conflicting_address:
+ * @session: The session the packet came in
+ * @address: address to remember
+ * @time: The time when the packet that is in conflict arrived
+ *
+ * Adds a new conflict address
+ */
+static void
+rtp_session_add_conflicting_address (RTPSession * sess,
+ GSocketAddress * address, GstClockTime time)
+{
+ sess->conflicting_addresses =
+ add_conflicting_address (sess->conflicting_addresses, address, time);
+}
+
+
static gboolean
check_collision (RTPSession * sess, RTPSource * source,
- RTPArrivalStats * arrival, gboolean rtp)
+ RTPPacketInfo * pinfo, gboolean rtp)
{
guint32 ssrc;
- /* If we have no arrival address, we can't do collision checking */
- if (!arrival->address)
+ /* If we have no pinfo address, we can't do collision checking */
+ if (!pinfo->address)
return FALSE;
ssrc = rtp_source_get_ssrc (source);
}
if (from) {
- if (__g_socket_address_equal (from, arrival->address)) {
+ if (__g_socket_address_equal (from, pinfo->address)) {
/* Address is the same */
return FALSE;
} else {
GST_LOG ("we have a third-party collision or loop ssrc:%x", ssrc);
if (sess->favor_new) {
if (rtp_source_find_conflicting_address (source,
- arrival->address, arrival->current_time)) {
+ pinfo->address, pinfo->current_time)) {
gchar *buf1;
- buf1 = __g_socket_address_to_string (arrival->address);
+ buf1 = __g_socket_address_to_string (pinfo->address);
GST_LOG ("Known conflict on %x for %s, dropping packet", ssrc,
buf1);
g_free (buf1);
* a new source. Save old address in possible conflict list
*/
rtp_source_add_conflicting_address (source, from,
- arrival->current_time);
+ pinfo->current_time);
buf1 = __g_socket_address_to_string (from);
- buf2 = __g_socket_address_to_string (arrival->address);
+ buf2 = __g_socket_address_to_string (pinfo->address);
GST_DEBUG ("New conflict for ssrc %x, replacing %s with %s,"
" saving old as known conflict", ssrc, buf1, buf2);
if (rtp)
- rtp_source_set_rtp_from (source, arrival->address);
+ rtp_source_set_rtp_from (source, pinfo->address);
else
- rtp_source_set_rtcp_from (source, arrival->address);
+ rtp_source_set_rtcp_from (source, pinfo->address);
g_free (buf1);
g_free (buf2);
} else {
/* We don't already have a from address for RTP, just set it */
if (rtp)
- rtp_source_set_rtp_from (source, arrival->address);
+ rtp_source_set_rtp_from (source, pinfo->address);
else
- rtp_source_set_rtcp_from (source, arrival->address);
+ rtp_source_set_rtcp_from (source, pinfo->address);
return FALSE;
}
*/
} else {
/* This is sending with our ssrc, is it an address we already know */
- if (rtp_source_find_conflicting_address (source, arrival->address,
- arrival->current_time)) {
+ if (rtp_session_find_conflicting_address (sess, pinfo->address,
+ pinfo->current_time)) {
/* Its a known conflict, its probably a loop, not a collision
* lets just drop the incoming packet
*/
GST_DEBUG ("Our packets are being looped back to us, dropping");
} else {
/* Its a new collision, lets change our SSRC */
- rtp_source_add_conflicting_address (source, arrival->address,
- arrival->current_time);
+ rtp_session_add_conflicting_address (sess, pinfo->address,
+ pinfo->current_time);
GST_DEBUG ("Collision for SSRC %x", ssrc);
/* mark the source BYE */
on_ssrc_collision (sess, source);
- rtp_session_schedule_bye_locked (sess, arrival->current_time);
+ rtp_session_schedule_bye_locked (sess, pinfo->current_time);
}
}
return TRUE;
}
-static RTPSource *
-find_source (RTPSession * sess, guint32 ssrc)
+typedef struct
{
- return g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
- GINT_TO_POINTER (ssrc));
+ gboolean is_doing_ptp;
+ GSocketAddress *new_addr;
+} CompareAddrData;
+
+/* check if the two given ip addr are the same (do not care about the port) */
+static gboolean
+ip_addr_equal (GSocketAddress * a, GSocketAddress * b)
+{
+ return
+ g_inet_address_equal (g_inet_socket_address_get_address
+ (G_INET_SOCKET_ADDRESS (a)),
+ g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (b)));
+}
+
+static void
+compare_rtp_source_addr (const gchar * key, RTPSource * source,
+ CompareAddrData * data)
+{
+ /* only compare ip addr of remote sources which are also not closing */
+ if (!source->internal && !source->closing && source->rtp_from) {
+ /* look for the first rtp source */
+ if (!data->new_addr)
+ data->new_addr = source->rtp_from;
+ /* compare current ip addr with the first one */
+ else
+ data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtp_from);
+ }
+}
+
+static void
+compare_rtcp_source_addr (const gchar * key, RTPSource * source,
+ CompareAddrData * data)
+{
+ /* only compare ip addr of remote sources which are also not closing */
+ if (!source->internal && !source->closing && source->rtcp_from) {
+ /* look for the first rtcp source */
+ if (!data->new_addr)
+ data->new_addr = source->rtcp_from;
+ else
+ /* compare current ip addr with the first one */
+ data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtcp_from);
+ }
+}
+
+/* loop over our non-internal source to know if the session
+ * is doing point-to-point */
+static void
+session_update_ptp (RTPSession * sess)
+{
+ /* to know if the session is doing point to point, the ip addr
+ * of each non-internal (=remotes) source have to be compared
+ * to each other.
+ */
+ gboolean is_doing_rtp_ptp;
+ gboolean is_doing_rtcp_ptp;
+ CompareAddrData data;
+
+ /* compare the first remote source's ip addr that receive rtp packets
+ * with other remote rtp source.
+ * it's enough because the session just needs to know if they are all
+ * equals or not
+ */
+ data.is_doing_ptp = TRUE;
+ data.new_addr = NULL;
+ g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+ (GHFunc) compare_rtp_source_addr, (gpointer) & data);
+ is_doing_rtp_ptp = data.is_doing_ptp;
+
+ /* same but about rtcp */
+ data.is_doing_ptp = TRUE;
+ data.new_addr = NULL;
+ g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+ (GHFunc) compare_rtcp_source_addr, (gpointer) & data);
+ is_doing_rtcp_ptp = data.is_doing_ptp;
+
+ /* the session is doing point-to-point if all rtp remote have the same
+ * ip addr and if all rtcp remote sources have the same ip addr */
+ sess->is_doing_ptp = is_doing_rtp_ptp && is_doing_rtcp_ptp;
+
+ GST_DEBUG ("doing point-to-point: %d", sess->is_doing_ptp);
}
static void
if (sess->suggested_ssrc != src->ssrc)
sess->suggested_ssrc = src->ssrc;
}
+
+ /* update point-to-point status */
+ if (!src->internal)
+ session_update_ptp (sess);
+}
+
+static RTPSource *
+find_source (RTPSession * sess, guint32 ssrc)
+{
+ return g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
+ GINT_TO_POINTER (ssrc));
}
/* must be called with the session lock, the returned source needs to be
* unreffed after usage. */
static RTPSource *
obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
- RTPArrivalStats * arrival, gboolean rtp)
+ RTPPacketInfo * pinfo, gboolean rtp)
{
RTPSource *source;
g_object_set (source, "probation", 0, NULL);
/* store from address, if any */
- if (arrival->address) {
+ if (pinfo->address) {
if (rtp)
- rtp_source_set_rtp_from (source, arrival->address);
+ rtp_source_set_rtp_from (source, pinfo->address);
else
- rtp_source_set_rtcp_from (source, arrival->address);
+ rtp_source_set_rtcp_from (source, pinfo->address);
}
/* configure a callback on the source */
} else {
*created = FALSE;
/* check for collision, this updates the address when not previously set */
- if (check_collision (sess, source, arrival, rtp)) {
+ if (check_collision (sess, source, pinfo, rtp)) {
return NULL;
}
/* Receiving RTCP packets of an SSRC is a strong indication that we
g_object_set (source, "probation", 0, NULL);
}
/* update last activity */
- source->last_activity = arrival->current_time;
+ source->last_activity = pinfo->current_time;
if (rtp)
- source->last_rtp_activity = arrival->current_time;
+ source->last_rtp_activity = pinfo->current_time;
g_object_ref (source);
return source;
/* must be called with the session lock, the returned source needs to be
* unreffed after usage. */
static RTPSource *
-obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created)
+obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created,
+ GstClockTime current_time)
{
RTPSource *source;
source->validated = TRUE;
source->internal = TRUE;
+ source->probation = FALSE;
rtp_source_set_sdes_struct (source, gst_structure_copy (sess->sdes));
rtp_source_set_callbacks (source, &callbacks, sess);
} else {
*created = FALSE;
}
+ /* update last activity */
+ if (current_time != GST_CLOCK_TIME_NONE) {
+ source->last_activity = current_time;
+ source->last_rtp_activity = current_time;
+ }
g_object_ref (source);
return source;
RTP_SESSION_LOCK (sess);
result = find_source (sess, ssrc);
- if (result)
+ if (result != NULL)
g_object_ref (result);
RTP_SESSION_UNLOCK (sess);
return source;
}
-/* update the RTPArrivalStats structure with the current time and other bits
- * about the current buffer we are handling.
- * This function is typically called when a validated packet is received.
- * This function should be called with the SESSION_LOCK
- */
-static void
-update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
- gboolean rtp, GstBuffer * buffer, GstClockTime current_time,
- GstClockTime running_time, guint64 ntpnstime)
+static gboolean
+update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo)
{
GstNetAddressMeta *meta;
- GstRTPBuffer rtpb = { NULL };
-
- /* get time of arrival */
- arrival->current_time = current_time;
- arrival->running_time = running_time;
- arrival->ntpnstime = ntpnstime;
/* get packet size including header overhead */
- arrival->bytes = gst_buffer_get_size (buffer) + sess->header_len;
+ pinfo->bytes += gst_buffer_get_size (*buffer) + pinfo->header_len;
+ pinfo->packets++;
+
+ if (pinfo->rtp) {
+ GstRTPBuffer rtp = { NULL };
+
+ if (!gst_rtp_buffer_map (*buffer, GST_MAP_READ, &rtp))
+ goto invalid_packet;
+
+ pinfo->payload_len += gst_rtp_buffer_get_payload_len (&rtp);
+ if (idx == 0) {
+ gint i;
+
+ /* only keep info for first buffer */
+ pinfo->ssrc = gst_rtp_buffer_get_ssrc (&rtp);
+ pinfo->seqnum = gst_rtp_buffer_get_seq (&rtp);
+ pinfo->pt = gst_rtp_buffer_get_payload_type (&rtp);
+ pinfo->rtptime = gst_rtp_buffer_get_timestamp (&rtp);
+ /* copy available csrc */
+ pinfo->csrc_count = gst_rtp_buffer_get_csrc_count (&rtp);
+ for (i = 0; i < pinfo->csrc_count; i++)
+ pinfo->csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i);
+ }
+ gst_rtp_buffer_unmap (&rtp);
+ }
- if (rtp) {
- gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpb);
- arrival->payload_len = gst_rtp_buffer_get_payload_len (&rtpb);
- gst_rtp_buffer_unmap (&rtpb);
- } else {
- arrival->payload_len = 0;
+ if (idx == 0) {
+ /* for netbuffer we can store the IP address to check for collisions */
+ meta = gst_buffer_get_net_address_meta (*buffer);
+ if (pinfo->address)
+ g_object_unref (pinfo->address);
+ if (meta) {
+ pinfo->address = G_SOCKET_ADDRESS (g_object_ref (meta->addr));
+ } else {
+ pinfo->address = NULL;
+ }
}
+ return TRUE;
- /* for netbuffer we can store the IP address to check for collisions */
- meta = gst_buffer_get_net_address_meta (buffer);
- if (arrival->address)
- g_object_unref (arrival->address);
- if (meta) {
- arrival->address = G_SOCKET_ADDRESS (g_object_ref (meta->addr));
+ /* ERRORS */
+invalid_packet:
+ {
+ GST_DEBUG ("invalid RTP packet received");
+ return FALSE;
+ }
+}
+
+/* update the RTPPacketInfo structure with the current time and other bits
+ * about the current buffer we are handling.
+ * This function is typically called when a validated packet is received.
+ * This function should be called with the SESSION_LOCK
+ */
+static gboolean
+update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
+ gboolean send, gboolean rtp, gboolean is_list, gpointer data,
+ GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
+{
+ gboolean res;
+
+ pinfo->send = send;
+ pinfo->rtp = rtp;
+ pinfo->is_list = is_list;
+ pinfo->data = data;
+ pinfo->current_time = current_time;
+ pinfo->running_time = running_time;
+ pinfo->ntpnstime = ntpnstime;
+ pinfo->header_len = sess->header_len;
+ pinfo->bytes = 0;
+ pinfo->payload_len = 0;
+ pinfo->packets = 0;
+
+ if (is_list) {
+ GstBufferList *list = GST_BUFFER_LIST_CAST (data);
+ res =
+ gst_buffer_list_foreach (list, (GstBufferListFunc) update_packet,
+ pinfo);
} else {
- arrival->address = NULL;
+ GstBuffer *buffer = GST_BUFFER_CAST (data);
+ res = update_packet (&buffer, 0, pinfo);
}
+ return res;
}
static void
-clean_arrival_stats (RTPArrivalStats * arrival)
+clean_packet_info (RTPPacketInfo * pinfo)
{
- if (arrival->address)
- g_object_unref (arrival->address);
+ if (pinfo->address)
+ g_object_unref (pinfo->address);
+ if (pinfo->data) {
+ gst_mini_object_unref (pinfo->data);
+ pinfo->data = NULL;
+ }
}
static gboolean
*/
GstFlowReturn
rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
- GstClockTime current_time, GstClockTime running_time)
+ GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
{
GstFlowReturn result;
guint32 ssrc;
RTPSource *source;
gboolean created;
gboolean prevsender, prevactive;
- RTPArrivalStats arrival = { NULL, };
- guint32 csrcs[16];
- guint8 i, count;
+ RTPPacketInfo pinfo = { 0, };
guint64 oldrate;
- GstRTPBuffer rtp = { NULL };
g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
- if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp))
- goto invalid_packet;
-
- /* get SSRC to look up in session database */
- ssrc = gst_rtp_buffer_get_ssrc (&rtp);
- /* copy available csrc for later */
- count = gst_rtp_buffer_get_csrc_count (&rtp);
- /* make sure to not overflow our array. An RTP buffer can maximally contain
- * 16 CSRCs */
- count = MIN (count, 16);
-
- for (i = 0; i < count; i++)
- csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i);
-
- gst_rtp_buffer_unmap (&rtp);
-
RTP_SESSION_LOCK (sess);
- /* update arrival stats */
- update_arrival_stats (sess, &arrival, TRUE, buffer, current_time,
- running_time, -1);
+ /* update pinfo stats */
+ if (!update_packet_info (sess, &pinfo, FALSE, TRUE, FALSE, buffer,
+ current_time, running_time, ntpnstime)) {
+ GST_DEBUG ("invalid RTP packet received");
+ RTP_SESSION_UNLOCK (sess);
+ return rtp_session_process_rtcp (sess, buffer, current_time, ntpnstime);
+ }
+
+ ssrc = pinfo.ssrc;
- source = obtain_source (sess, ssrc, &created, &arrival, TRUE);
+ source = obtain_source (sess, ssrc, &created, &pinfo, TRUE);
if (!source)
goto collision;
oldrate = source->bitrate;
/* let source process the packet */
- result = rtp_source_process_rtp (source, buffer, &arrival);
+ result = rtp_source_process_rtp (source, &pinfo);
/* source became active */
if (source_update_active (sess, source, prevactive))
if (source->validated) {
gboolean created;
+ gint i;
/* for validated sources, we add the CSRCs as well */
- for (i = 0; i < count; i++) {
+ for (i = 0; i < pinfo.csrc_count; i++) {
guint32 csrc;
RTPSource *csrc_src;
- csrc = csrcs[i];
+ csrc = pinfo.csrcs[i];
/* get source */
- csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE);
+ csrc_src = obtain_source (sess, csrc, &created, &pinfo, TRUE);
if (!csrc_src)
continue;
RTP_SESSION_UNLOCK (sess);
- clean_arrival_stats (&arrival);
+ clean_packet_info (&pinfo);
return result;
/* ERRORS */
-invalid_packet:
- {
- gst_buffer_unref (buffer);
- GST_DEBUG ("invalid RTP packet received");
- return GST_FLOW_OK;
- }
collision:
{
RTP_SESSION_UNLOCK (sess);
- gst_buffer_unref (buffer);
- clean_arrival_stats (&arrival);
+ clean_packet_info (&pinfo);
GST_DEBUG ("ignoring packet because its collisioning");
return GST_FLOW_OK;
}
static void
rtp_session_process_rb (RTPSession * sess, RTPSource * source,
- GstRTCPPacket * packet, RTPArrivalStats * arrival)
+ GstRTCPPacket * packet, RTPPacketInfo * pinfo)
{
guint count, i;
* the sender of the RTCP message. We could also compare our stats against
* the other sender to see if we are better or worse. */
/* FIXME, need to keep track who the RB block is from */
- rtp_source_process_rb (source, arrival->ntpnstime, fractionlost,
+ rtp_source_process_rb (source, pinfo->ntpnstime, fractionlost,
packetslost, exthighestseq, jitter, lsr, dlsr);
}
}
*/
static void
rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
- RTPArrivalStats * arrival, gboolean * do_sync)
+ RTPPacketInfo * pinfo, gboolean * do_sync)
{
guint32 senderssrc, rtptime, packet_count, octet_count;
guint64 ntptime;
&packet_count, &octet_count);
GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
- senderssrc, GST_TIME_ARGS (arrival->current_time));
+ senderssrc, GST_TIME_ARGS (pinfo->current_time));
- source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
+ source = obtain_source (sess, senderssrc, &created, pinfo, FALSE);
if (!source)
return;
+ /* skip non-bye packets for sources that are marked BYE */
+ if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
+ goto out;
+
/* don't try to do lip-sync for sources that sent a BYE */
if (RTP_SOURCE_IS_MARKED_BYE (source))
*do_sync = FALSE;
prevsender = RTP_SOURCE_IS_SENDER (source);
/* first update the source */
- rtp_source_process_sr (source, arrival->current_time, ntptime, rtptime,
+ rtp_source_process_sr (source, pinfo->current_time, ntptime, rtptime,
packet_count, octet_count);
source_update_sender (sess, source, prevsender);
if (created)
on_new_ssrc (sess, source);
- rtp_session_process_rb (sess, source, packet, arrival);
+ rtp_session_process_rb (sess, source, packet, pinfo);
+
+out:
g_object_unref (source);
}
*/
static void
rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
- RTPArrivalStats * arrival)
+ RTPPacketInfo * pinfo)
{
guint32 senderssrc;
RTPSource *source;
GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
- source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
+ source = obtain_source (sess, senderssrc, &created, pinfo, FALSE);
if (!source)
return;
+ /* skip non-bye packets for sources that are marked BYE */
+ if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
+ goto out;
+
if (created)
on_new_ssrc (sess, source);
- rtp_session_process_rb (sess, source, packet, arrival);
+ rtp_session_process_rb (sess, source, packet, pinfo);
+
+out:
g_object_unref (source);
}
/* Get SDES items and store them in the SSRC */
static void
rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
- RTPArrivalStats * arrival)
+ RTPPacketInfo * pinfo)
{
guint items, i, j;
gboolean more_items, more_entries;
changed = FALSE;
/* find src, no probation when dealing with RTCP */
- source = obtain_source (sess, ssrc, &created, arrival, FALSE);
+ source = obtain_source (sess, ssrc, &created, pinfo, FALSE);
if (!source)
return;
+ /* skip non-bye packets for sources that are marked BYE */
+ if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
+ goto next;
+
sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
more_entries = gst_rtcp_packet_sdes_first_entry (packet);
if (changed)
on_ssrc_sdes (sess, source);
+ next:
g_object_unref (source);
more_items = gst_rtcp_packet_sdes_next_item (packet);
*/
static void
rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
- RTPArrivalStats * arrival)
+ RTPPacketInfo * pinfo)
{
guint count, i;
gchar *reason;
GST_DEBUG ("SSRC: %08x", ssrc);
/* find src and mark bye, no probation when dealing with RTCP */
- source = obtain_source (sess, ssrc, &created, arrival, FALSE);
+ source = obtain_source (sess, ssrc, &created, pinfo, FALSE);
if (!source)
return;
}
/* store time for when we need to time out this source */
- source->bye_time = arrival->current_time;
+ source->bye_time = pinfo->current_time;
prevactive = RTP_SOURCE_IS_ACTIVE (source);
prevsender = RTP_SOURCE_IS_SENDER (source);
* Perform reverse reconsideration but only when we are not scheduling a
* BYE ourselves. */
if (sess->next_rtcp_check_time != GST_CLOCK_TIME_NONE &&
- arrival->current_time < sess->next_rtcp_check_time) {
+ pinfo->current_time < sess->next_rtcp_check_time) {
GstClockTime time_remaining;
- time_remaining = sess->next_rtcp_check_time - arrival->current_time;
+ time_remaining = sess->next_rtcp_check_time - pinfo->current_time;
sess->next_rtcp_check_time =
gst_util_uint64_scale (time_remaining, members, pmembers);
GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
GST_TIME_ARGS (sess->next_rtcp_check_time));
- sess->next_rtcp_check_time += arrival->current_time;
+ sess->next_rtcp_check_time += pinfo->current_time;
/* mark pending reconsider. We only want to signal the reconsideration
* once after we handled all the source in the bye packet */
static void
rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
- RTPArrivalStats * arrival)
+ RTPPacketInfo * pinfo)
{
GST_DEBUG ("received APP");
}
"RTT (%" GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")",
fir ? "FIR" : "PLI",
GST_TIME_ARGS (current_time - sess->last_keyframe_request),
- GST_TIME_ARGS (round_trip_in_ns));;
+ GST_TIME_ARGS (round_trip_in_ns));
return FALSE;
}
}
return;
src = find_source (sess, sender_ssrc);
- if (!src)
+ if (src == NULL)
return;
rtp_session_request_local_key_unit (sess, src, FALSE, current_time);
+
+ src->stats.recv_pli_count++;
}
static void
ssrc = GST_READ_UINT32_BE (data);
own = find_source (sess, ssrc);
+ if (own == NULL)
+ continue;
+
if (own->internal) {
our_request = TRUE;
break;
return;
rtp_session_request_local_key_unit (sess, src, TRUE, current_time);
+ src->stats.recv_fir_count++;
}
static void
guint32 media_ssrc, guint8 * fci_data, guint fci_length,
GstClockTime current_time)
{
+ sess->stats.nacks_received++;
+
if (!sess->callbacks.notify_nack)
return;
seqnum = GST_READ_UINT16_BE (fci_data);
blp = GST_READ_UINT16_BE (fci_data + 2);
- GST_DEBUG ("NACK #%u, blp %04x", seqnum, blp);
+ GST_DEBUG ("NACK #%u, blp %04x, SSRC 0x%08x", seqnum, blp, media_ssrc);
RTP_SESSION_UNLOCK (sess);
- sess->callbacks.notify_nack (sess, seqnum, blp,
+ sess->callbacks.notify_nack (sess, seqnum, blp, media_ssrc,
sess->notify_nack_user_data);
RTP_SESSION_LOCK (sess);
static void
rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
- RTPArrivalStats * arrival, GstClockTime current_time)
+ RTPPacketInfo * pinfo, GstClockTime current_time)
{
GstRTCPType type = gst_rtcp_packet_get_type (packet);
GstRTCPFBType fbtype = gst_rtcp_packet_fb_get_type (packet);
guint fci_length = 4 * gst_rtcp_packet_fb_get_fci_length (packet);
RTPSource *src;
+ src = find_source (sess, media_ssrc);
+
+ /* skip non-bye packets for sources that are marked BYE */
+ if (sess->scheduled_bye && src && RTP_SOURCE_IS_MARKED_BYE (src))
+ return;
+
GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of "
"length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length);
fci_buffer = gst_buffer_copy_region (packet->rtcp->buffer,
GST_BUFFER_COPY_MEMORY, fci_data - packet->rtcp->map.data,
fci_length);
- GST_BUFFER_TIMESTAMP (fci_buffer) = arrival->running_time;
+ GST_BUFFER_TIMESTAMP (fci_buffer) = pinfo->running_time;
}
RTP_SESSION_UNLOCK (sess);
gst_buffer_unref (fci_buffer);
}
- src = find_source (sess, media_ssrc);
- if (!src)
- return;
-
- if (sess->rtcp_feedback_retention_window) {
- rtp_source_retain_rtcp_packet (src, packet, arrival->running_time);
+ if (src && sess->rtcp_feedback_retention_window) {
+ rtp_source_retain_rtcp_packet (src, packet, pinfo->running_time);
}
- if (src->internal ||
+ if ((src && src->internal) ||
/* PSFB FIR puts the media ssrc inside the FCI */
(type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR)) {
switch (type) {
{
GstRTCPPacket packet;
gboolean more, is_bye = FALSE, do_sync = FALSE;
- RTPArrivalStats arrival = { NULL, };
+ RTPPacketInfo pinfo = { 0, };
GstFlowReturn result = GST_FLOW_OK;
GstRTCPBuffer rtcp = { NULL, };
GST_DEBUG ("received RTCP packet");
+ g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP], 0,
+ buffer);
+
RTP_SESSION_LOCK (sess);
- /* update arrival stats */
- update_arrival_stats (sess, &arrival, FALSE, buffer, current_time, -1,
- ntpnstime);
+ /* update pinfo stats */
+ update_packet_info (sess, &pinfo, FALSE, FALSE, FALSE, buffer, current_time,
+ -1, ntpnstime);
/* start processing the compound packet */
gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
type = gst_rtcp_packet_get_type (&packet);
- /* when we are leaving the session, we should ignore all non-BYE messages */
- if (sess->scheduled_bye && type != GST_RTCP_TYPE_BYE) {
- GST_DEBUG ("ignoring non-BYE RTCP packet because we are leaving");
- goto next;
- }
-
switch (type) {
case GST_RTCP_TYPE_SR:
- rtp_session_process_sr (sess, &packet, &arrival, &do_sync);
+ rtp_session_process_sr (sess, &packet, &pinfo, &do_sync);
break;
case GST_RTCP_TYPE_RR:
- rtp_session_process_rr (sess, &packet, &arrival);
+ rtp_session_process_rr (sess, &packet, &pinfo);
break;
case GST_RTCP_TYPE_SDES:
- rtp_session_process_sdes (sess, &packet, &arrival);
+ rtp_session_process_sdes (sess, &packet, &pinfo);
break;
case GST_RTCP_TYPE_BYE:
is_bye = TRUE;
/* don't try to attempt lip-sync anymore for streams with a BYE */
do_sync = FALSE;
- rtp_session_process_bye (sess, &packet, &arrival);
+ rtp_session_process_bye (sess, &packet, &pinfo);
break;
case GST_RTCP_TYPE_APP:
- rtp_session_process_app (sess, &packet, &arrival);
+ rtp_session_process_app (sess, &packet, &pinfo);
break;
case GST_RTCP_TYPE_RTPFB:
case GST_RTCP_TYPE_PSFB:
- rtp_session_process_feedback (sess, &packet, &arrival, current_time);
+ rtp_session_process_feedback (sess, &packet, &pinfo, current_time);
break;
default:
GST_WARNING ("got unknown RTCP packet");
break;
}
- next:
more = gst_rtcp_packet_move_to_next (&packet);
}
/* if we are scheduling a BYE, we only want to count bye packets, else we
* count everything */
- if (sess->scheduled_bye) {
- if (is_bye) {
- sess->stats.bye_members++;
- UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
- }
- } else {
- /* keep track of average packet size */
- UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
+ if (sess->scheduled_bye && is_bye) {
+ sess->bye_stats.bye_members++;
+ UPDATE_AVG (sess->bye_stats.avg_rtcp_packet_size, pinfo.bytes);
}
+
+ /* keep track of average packet size */
+ UPDATE_AVG (sess->stats.avg_rtcp_packet_size, pinfo.bytes);
+
GST_DEBUG ("%p, received RTCP packet, avg size %u, %u", &sess->stats,
- sess->stats.avg_rtcp_packet_size, arrival.bytes);
+ sess->stats.avg_rtcp_packet_size, pinfo.bytes);
RTP_SESSION_UNLOCK (sess);
- clean_arrival_stats (&arrival);
+ pinfo.data = NULL;
+ clean_packet_info (&pinfo);
/* notify caller of sr packets in the callback */
if (do_sync && sess->callbacks.sync_rtcp) {
gboolean created;
RTP_SESSION_LOCK (sess);
- source = obtain_internal_source (sess, ssrc, &created);
+ source = obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
if (source) {
rtp_source_update_caps (source, caps);
g_object_unref (source);
}
+
+ if (gst_structure_get_uint (s, "rtx-ssrc", &ssrc)) {
+ source =
+ obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
+ if (source) {
+ rtp_source_update_caps (source, caps);
+ g_object_unref (source);
+ }
+ }
RTP_SESSION_UNLOCK (sess);
}
}
RTPSource *source;
gboolean prevsender;
guint64 oldrate;
- GstBuffer *buffer;
- GstRTPBuffer rtp = { NULL };
- guint32 ssrc;
+ RTPPacketInfo pinfo = { 0, };
gboolean created;
g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet");
- if (is_list) {
- GstBufferList *list = GST_BUFFER_LIST_CAST (data);
-
- buffer = gst_buffer_list_get (list, 0);
- if (!buffer)
- goto no_buffer;
- } else {
- buffer = GST_BUFFER_CAST (data);
- }
-
- if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp))
- goto invalid_packet;
-
- /* get SSRC and look up in session database */
- ssrc = gst_rtp_buffer_get_ssrc (&rtp);
-
- gst_rtp_buffer_unmap (&rtp);
-
RTP_SESSION_LOCK (sess);
- source = obtain_internal_source (sess, ssrc, &created);
+ if (!update_packet_info (sess, &pinfo, TRUE, TRUE, is_list, data,
+ current_time, running_time, -1))
+ goto invalid_packet;
- /* update last activity */
- source->last_rtp_activity = current_time;
+ source = obtain_internal_source (sess, pinfo.ssrc, &created, current_time);
prevsender = RTP_SOURCE_IS_SENDER (source);
oldrate = source->bitrate;
/* we use our own source to send */
- result = rtp_source_send_rtp (source, data, is_list, running_time);
+ result = rtp_source_send_rtp (source, &pinfo);
source_update_sender (sess, source, prevsender);
RTP_SESSION_UNLOCK (sess);
g_object_unref (source);
+ clean_packet_info (&pinfo);
return result;
invalid_packet:
{
gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
+ RTP_SESSION_UNLOCK (sess);
GST_DEBUG ("invalid RTP packet received");
return GST_FLOW_OK;
}
-no_buffer:
- {
- gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
- GST_DEBUG ("no buffer in list");
- return GST_FLOW_OK;
- }
}
static void
gboolean first)
{
GstClockTime result;
+ RTPSessionStats *stats;
/* recalculate bandwidth when it changed */
if (sess->recalc_bandwidth) {
g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
(GHFunc) add_bitrates, &bandwidth);
- bandwidth /= 8.0;
}
- if (bandwidth < 8000)
+ if (bandwidth < RTP_STATS_BANDWIDTH)
bandwidth = RTP_STATS_BANDWIDTH;
rtp_stats_set_bandwidths (&sess->stats, bandwidth,
}
if (sess->scheduled_bye) {
- result = rtp_stats_calculate_bye_interval (&sess->stats);
+ stats = &sess->bye_stats;
+ result = rtp_stats_calculate_bye_interval (stats);
} else {
- result = rtp_stats_calculate_rtcp_interval (&sess->stats,
- sess->stats.internal_sender_sources > 0, first);
+ stats = &sess->stats;
+ result = rtp_stats_calculate_rtcp_interval (stats,
+ stats->internal_sender_sources > 0, first);
}
GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
GST_TIME_ARGS (result), first);
if (!deterministic && result != GST_CLOCK_TIME_NONE)
- result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
+ result = rtp_stats_add_rtcp_jitter (stats, result);
GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
/* we schedule BYE now */
sess->scheduled_bye = TRUE;
/* at least one member wants to send a BYE */
- INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
- sess->stats.bye_members = 1;
+ memcpy (&sess->bye_stats, &sess->stats, sizeof (RTPSessionStats));
+ INIT_AVG (sess->bye_stats.avg_rtcp_packet_size, 100);
+ sess->bye_stats.bye_members = 1;
sess->first_rtcp = TRUE;
sess->allow_early = TRUE;
GstFlowReturn
rtp_session_schedule_bye (RTPSession * sess, GstClockTime current_time)
{
- GstFlowReturn result = GST_FLOW_OK;
+ GstFlowReturn result;
g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
RTP_SESSION_LOCK (sess);
if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
+ GST_DEBUG ("have early rtcp time");
result = sess->next_early_rtcp_time;
goto early_exit;
}
}
if (sess->scheduled_bye) {
- if (sess->stats.active_sources >= 50) {
+ if (sess->bye_stats.active_sources >= 50) {
GST_DEBUG ("reconsider BYE, more than 50 sources");
/* reconsider BYE if members >= 50 */
interval = calculate_rtcp_interval (sess, FALSE, TRUE);
gboolean is_early;
gboolean may_suppress;
GQueue output;
+ guint nacked_seqnums;
} ReportData;
static void
return;
}
- /* only report about other sender */
- if (source == data->source)
- goto reported;
+ if (g_hash_table_contains (source->reported_in_sr_of,
+ GUINT_TO_POINTER (data->source->ssrc))) {
+ GST_DEBUG ("source %08x already reported in this generation", source->ssrc);
+ return;
+ }
if (gst_rtcp_packet_get_rb_count (packet) == GST_RTCP_MAX_RB_COUNT) {
GST_DEBUG ("max RB count reached");
return;
}
+ /* only report about other sender */
+ if (source == data->source)
+ goto reported;
+
if (!RTP_SOURCE_IS_SENDER (source)) {
GST_DEBUG ("source %08x not sender", source->ssrc);
goto reported;
exthighestseq, jitter, lsr, dlsr);
reported:
- /* source is reported, move to next generation */
- source->generation = sess->generation + 1;
-
- /* if we reported all sources in this generation, move to next */
- if (--data->num_to_report == 0) {
- sess->generation++;
- GST_DEBUG ("all reported, generation now %u", sess->generation);
- }
+ g_hash_table_add (source->reported_in_sr_of,
+ GUINT_TO_POINTER (data->source->ssrc));
}
/* construct FIR */
fci_data[1] = fci_data[2] = fci_data[3] = 0;
source->send_fir = FALSE;
+ source->stats.sent_fir_count++;
}
static void
source->send_pli = FALSE;
data->may_suppress = FALSE;
+
+ source->stats.sent_pli_count++;
}
/* construct NACK */
for (i = 0; i < n_nacks; i++) {
GST_WRITE_UINT32_BE (fci_data, nacks[i]);
fci_data += 4;
+ data->nacked_seqnums++;
}
rtp_source_clear_nacks (source);
if (source->internal) {
GST_DEBUG ("Timing out collisions for %x", source->ssrc);
rtp_source_timeout (source, data->current_time,
- /* "a relatively long time" -- RFC 3550 section 8.2 */
- RTP_STATS_MIN_INTERVAL * GST_SECOND * 10,
data->running_time - sess->rtcp_feedback_retention_window);
}
GST_LOG ("timeout base interval %" GST_TIME_FORMAT,
GST_TIME_ARGS (binterval));
- if (!source->internal) {
- if (source->marked_bye) {
- /* if we received a BYE from the source, remove the source after some
- * time. */
- if (data->current_time > source->bye_time &&
- data->current_time - source->bye_time > sess->stats.bye_timeout) {
- GST_DEBUG ("removing BYE source %08x", source->ssrc);
- remove = TRUE;
- byetimeout = TRUE;
- }
+ if (!source->internal && source->marked_bye) {
+ /* if we received a BYE from the source, remove the source after some
+ * time. */
+ if (data->current_time > source->bye_time &&
+ data->current_time - source->bye_time > sess->stats.bye_timeout) {
+ GST_DEBUG ("removing BYE source %08x", source->ssrc);
+ remove = TRUE;
+ byetimeout = TRUE;
}
- /* sources that were inactive for more than 5 times the deterministic reporting
- * interval get timed out. the min timeout is 5 seconds. */
- /* mind old time that might pre-date last time going to PLAYING */
- btime = MAX (source->last_activity, sess->start_time);
- if (data->current_time > btime) {
- interval = MAX (binterval * 5, 5 * GST_SECOND);
- if (data->current_time - btime > interval) {
- GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
- source->ssrc, GST_TIME_ARGS (btime));
+ }
+
+ if (source->internal && source->sent_bye) {
+ GST_DEBUG ("removing internal source that has sent BYE %08x", source->ssrc);
+ remove = TRUE;
+ }
+
+ /* sources that were inactive for more than 5 times the deterministic reporting
+ * interval get timed out. the min timeout is 5 seconds. */
+ /* mind old time that might pre-date last time going to PLAYING */
+ btime = MAX (source->last_activity, sess->start_time);
+ if (data->current_time > btime) {
+ interval = MAX (binterval * 5, 5 * GST_SECOND);
+ if (data->current_time - btime > interval) {
+ GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
+ source->ssrc, GST_TIME_ARGS (btime));
+ if (source->internal) {
+ /* this is an internal source that is not using our suggested ssrc.
+ * since there must be another source using this ssrc, we can remove
+ * this one instead of making it a receiver forever */
+ if (source->ssrc != sess->suggested_ssrc) {
+ rtp_source_mark_bye (source, "timed out");
+ /* do not schedule bye here, since we are inside the RTCP timeout
+ * processing and scheduling bye will interfere with SR/RR sending */
+ }
+ } else {
remove = TRUE;
}
}
if (data->current_time > btime) {
interval = MAX (binterval * 2, 5 * GST_SECOND);
if (data->current_time - btime > interval) {
- if (source->internal && source->sent_bye) {
- /* an internal source is BYE and stopped sending RTP, remove */
- GST_DEBUG ("internal BYE source %08x timed out, last %"
- GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
- remove = TRUE;
- } else {
- GST_DEBUG ("sender source %08x timed out and became receiver, last %"
- GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
- sendertimeout = TRUE;
- }
+ GST_DEBUG ("sender source %08x timed out and became receiver, last %"
+ GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
+ sendertimeout = TRUE;
}
}
}
static gboolean
is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
{
- GstClockTime new_send_time, elapsed;
+ GstClockTime new_send_time;
+ GstClockTime interval;
+ RTPSessionStats *stats;
+
+ if (sess->scheduled_bye)
+ stats = &sess->bye_stats;
+ else
+ stats = &sess->stats;
if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
data->is_early = TRUE;
else
data->is_early = FALSE;
- if (data->is_early && sess->next_early_rtcp_time < current_time)
+ if (data->is_early && sess->next_early_rtcp_time < current_time) {
+ GST_DEBUG ("early feedback %" GST_TIME_FORMAT " < now %"
+ GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_early_rtcp_time),
+ GST_TIME_ARGS (current_time));
goto early;
+ }
/* no need to check yet */
if (sess->next_rtcp_check_time == GST_CLOCK_TIME_NONE ||
return FALSE;
}
- /* get elapsed time since we last reported */
- elapsed = current_time - sess->last_rtcp_send_time;
+early:
- new_send_time = data->interval;
- /* perform forward reconsideration */
- if (new_send_time != GST_CLOCK_TIME_NONE) {
- new_send_time = rtp_stats_add_rtcp_jitter (&sess->stats, new_send_time);
+ /* take interval and add jitter */
+ interval = data->interval;
+ if (interval != GST_CLOCK_TIME_NONE)
+ interval = rtp_stats_add_rtcp_jitter (stats, interval);
- GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
- GST_TIME_FORMAT, GST_TIME_ARGS (new_send_time),
- GST_TIME_ARGS (elapsed));
+ if (sess->last_rtcp_send_time != GST_CLOCK_TIME_NONE) {
+ /* perform forward reconsideration */
+ if (interval != GST_CLOCK_TIME_NONE) {
+ GstClockTime elapsed;
- new_send_time += sess->last_rtcp_send_time;
- }
+ /* get elapsed time since we last reported */
+ elapsed = current_time - sess->last_rtcp_send_time;
- /* check if reconsideration */
- if (new_send_time == GST_CLOCK_TIME_NONE || current_time < new_send_time) {
- GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
- GST_TIME_ARGS (new_send_time));
- /* store new check time */
- sess->next_rtcp_check_time = new_send_time;
- return FALSE;
+ GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
+ GST_TIME_FORMAT, GST_TIME_ARGS (interval), GST_TIME_ARGS (elapsed));
+ new_send_time = interval + sess->last_rtcp_send_time;
+ } else {
+ new_send_time = sess->last_rtcp_send_time;
+ }
+ } else {
+ /* If this is the first RTCP packet, we can reconsider anything based
+ * on the last RTCP send time because there was none.
+ */
+ g_warn_if_fail (!data->is_early);
+ data->is_early = FALSE;
+ new_send_time = current_time;
}
-early:
-
- new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE);
-
- GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
- GST_TIME_ARGS (new_send_time));
-
- sess->next_rtcp_check_time = new_send_time;
- if (new_send_time != GST_CLOCK_TIME_NONE) {
- sess->next_rtcp_check_time += current_time;
-
+ if (!data->is_early) {
+ /* check if reconsideration */
+ if (new_send_time == GST_CLOCK_TIME_NONE || current_time < new_send_time) {
+ GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (new_send_time));
+ /* store new check time */
+ sess->next_rtcp_check_time = new_send_time;
+ return FALSE;
+ }
+ sess->next_rtcp_check_time = current_time + interval;
+ } else if (interval != GST_CLOCK_TIME_NONE) {
/* Apply the rules from RFC 4585 section 3.5.3 */
- if (sess->stats.min_interval != 0 && !sess->first_rtcp) {
+ if (stats->min_interval != 0 && !sess->first_rtcp) {
GstClockTime T_rr_current_interval =
- g_random_double_range (0.5, 1.5) * sess->stats.min_interval;
+ g_random_double_range (0.5, 1.5) * stats->min_interval;
/* This will caused the RTCP to be suppressed if no FB packets are added */
- if (sess->last_rtcp_send_time + T_rr_current_interval >
- sess->next_rtcp_check_time) {
+ if (sess->last_rtcp_send_time + T_rr_current_interval > new_send_time) {
GST_DEBUG ("RTCP packet could be suppressed min: %" GST_TIME_FORMAT
" last: %" GST_TIME_FORMAT
" + T_rr_current_interval: %" GST_TIME_FORMAT
- " > sess->next_rtcp_check_time: %" GST_TIME_FORMAT,
- GST_TIME_ARGS (sess->stats.min_interval),
+ " > new_send_time: %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (stats->min_interval),
GST_TIME_ARGS (sess->last_rtcp_send_time),
GST_TIME_ARGS (T_rr_current_interval),
- GST_TIME_ARGS (sess->next_rtcp_check_time));
+ GST_TIME_ARGS (new_send_time));
data->may_suppress = TRUE;
}
}
}
+ GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (new_send_time));
+
return TRUE;
}
if (!source->internal || source->sent_bye)
return;
+ /* ignore other sources when we do the timeout after a scheduled BYE */
+ if (sess->scheduled_bye && !source->marked_bye)
+ return;
+
data->source = source;
/* open packet */
g_queue_push_tail (&data->output, output);
}
+static void
+update_generation (const gchar * key, RTPSource * source, ReportData * data)
+{
+ RTPSession *sess = data->sess;
+
+ if (g_hash_table_size (source->reported_in_sr_of) >=
+ sess->stats.internal_sources) {
+ /* source is reported, move to next generation */
+ source->generation = sess->generation + 1;
+ g_hash_table_remove_all (source->reported_in_sr_of);
+
+ GST_LOG ("reported source %x, new generation: %d", source->ssrc,
+ source->generation);
+
+ /* if we reported all sources in this generation, move to next */
+ if (--data->num_to_report == 0) {
+ sess->generation++;
+ GST_DEBUG ("all reported, generation now %u", sess->generation);
+ }
+ }
+}
+
/**
* rtp_session_on_timeout:
* @sess: an #RTPSession
data.running_time = running_time;
data.num_to_report = 0;
data.may_suppress = FALSE;
+ data.nacked_seqnums = 0;
g_queue_init (&data.output);
RTP_SESSION_LOCK (sess);
/* get a new interval, we need this for various cleanups etc */
data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
+ GST_DEBUG ("interval %" GST_TIME_FORMAT, GST_TIME_ARGS (data.interval));
+
/* we need an internal source now */
if (sess->stats.internal_sources == 0) {
RTPSource *source;
gboolean created;
- source = obtain_internal_source (sess, sess->suggested_ssrc, &created);
+ source = obtain_internal_source (sess, sess->suggested_ssrc, &created,
+ current_time);
g_object_unref (source);
}
+ sess->conflicting_addresses =
+ timeout_conflicting_addresses (sess->conflicting_addresses, current_time);
+
/* Make a local copy of the hashtable. We need to do this because the
* cleanup stage below releases the session lock. */
table_copy = g_hash_table_new_full (NULL, NULL, NULL,
g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
(GHRFunc) remove_closing_sources, &data);
+ /* update point-to-point status */
+ session_update_ptp (sess);
+
/* see if we need to generate SR or RR packets */
if (!is_rtcp_time (sess, current_time, &data))
goto done;
- GST_DEBUG ("doing RTCP generation %u for %u sources", sess->generation,
- data.num_to_report);
+ GST_DEBUG ("doing RTCP generation %u for %u sources, early %d",
+ sess->generation, data.num_to_report, data.is_early);
/* generate RTCP for all internal sources */
g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
(GHFunc) generate_rtcp, &data);
+ /* update the generation for all the sources that have been reported */
+ g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+ (GHFunc) update_generation, &data);
+
/* we keep track of the last report time in order to timeout inactive
* receivers or senders */
if (!data.is_early && !data.may_suppress)
sess->last_rtcp_send_time = data.current_time;
sess->first_rtcp = FALSE;
sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
+ sess->scheduled_bye = FALSE;
+
+ /* RFC 4585 section 3.5.2 step 6 */
+ if (!data.is_early) {
+ sess->allow_early = TRUE;
+ }
done:
RTP_SESSION_UNLOCK (sess);
result =
sess->callbacks.send_rtcp (sess, source, buffer, output->is_bye,
sess->send_rtcp_user_data);
+ sess->stats.nacks_sent += data.nacked_seqnums;
} else {
GST_DEBUG ("freeing packet callback: %p"
" do_not_suppress: %d may_suppress: %d",
sess->callbacks.send_rtcp, do_not_suppress, data.may_suppress);
+ sess->stats.nacks_dropped += data.nacked_seqnums;
gst_buffer_unref (buffer);
}
g_object_unref (source);
* @max_delay: maximum delay
*
* Request transmission of early RTCP
+ *
+ * Returns: %TRUE if the related RTCP can be scheduled.
*/
-void
+gboolean
rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
GstClockTime max_delay)
{
- GstClockTime T_dither_max;
+ GstClockTime T_dither_max, T_rr;
+ gboolean ret;
/* Implements the algorithm described in RFC 4585 section 3.5.2 */
/* Check if already requested */
/* RFC 4585 section 3.5.2 step 2 */
- if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
- goto dont_send;
+ if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
+ GST_LOG_OBJECT (sess, "already have next early rtcp time");
+ ret = TRUE;
+ goto end;
+ }
+
+ if (!GST_CLOCK_TIME_IS_VALID (sess->next_rtcp_check_time)) {
+ GST_LOG_OBJECT (sess, "no next RTCP check time");
+ ret = FALSE;
+ goto end;
+ }
- if (!GST_CLOCK_TIME_IS_VALID (sess->next_rtcp_check_time))
- goto dont_send;
+ /* RFC 4585 section 3.5.3 step 1
+ * If no regular RTCP packet has been sent before, then a regular
+ * RTCP packet has to be scheduled first and FB messages might be
+ * included there
+ */
+ if (!GST_CLOCK_TIME_IS_VALID (sess->last_rtcp_send_time)) {
+ GST_LOG_OBJECT (sess, "no RTCP sent yet");
+
+ if (current_time + max_delay > sess->next_rtcp_check_time) {
+ GST_LOG_OBJECT (sess,
+ "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
+ " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
+ GST_TIME_ARGS (max_delay),
+ GST_TIME_ARGS (sess->next_rtcp_check_time));
+ ret = TRUE;
+ } else {
+ GST_LOG_OBJECT (sess,
+ "can't allow early feedback, next scheduled time is too late %"
+ GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay),
+ GST_TIME_ARGS (sess->next_rtcp_check_time));
+ ret = FALSE;
+ }
+ goto end;
+ }
- /* Ignore the request a scheduled packet will be in time anyway */
- if (current_time + max_delay > sess->next_rtcp_check_time)
- goto dont_send;
+ T_rr = sess->next_rtcp_check_time - sess->last_rtcp_send_time;
/* RFC 4585 section 3.5.2 step 2b */
/* If the total sources is <=2, then there is only us and one peer */
- if (sess->total_sources <= 2) {
+ /* When there is one auxiliary stream the session can still do point
+ * to point.
+ */
+ if (sess->is_doing_ptp) {
T_dither_max = 0;
} else {
/* Divide by 2 because l = 0.5 */
- T_dither_max = sess->next_rtcp_check_time - sess->last_rtcp_send_time;
+ T_dither_max = T_rr;
T_dither_max /= 2;
}
/* RFC 4585 section 3.5.2 step 3 */
- if (current_time + T_dither_max > sess->next_rtcp_check_time)
- goto dont_send;
-
- /* RFC 4585 section 3.5.2 step 4
- * Don't send if allow_early is FALSE, but not if we are in
- * immediate mode, meaning we are part of a group of at most the
- * application-specific threshold.
- */
- if (sess->total_sources > sess->rtcp_immediate_feedback_threshold &&
- sess->allow_early == FALSE)
- goto dont_send;
+ if (current_time + T_dither_max > sess->next_rtcp_check_time) {
+ GST_LOG_OBJECT (sess,
+ "don't send because of dither, next scheduled time is soon %"
+ GST_TIME_FORMAT " + %" GST_TIME_FORMAT " > %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (current_time), GST_TIME_ARGS (T_dither_max),
+ GST_TIME_ARGS (sess->next_rtcp_check_time));
+ ret = TRUE;
+ goto end;
+ }
+
+ /* RFC 4585 section 3.5.2 step 4a */
+ if (sess->allow_early == FALSE) {
+ /* Ignore the request a scheduled packet will be in time anyway */
+ if (current_time + max_delay > sess->next_rtcp_check_time) {
+ GST_LOG_OBJECT (sess,
+ "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
+ " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
+ GST_TIME_ARGS (max_delay),
+ GST_TIME_ARGS (sess->next_rtcp_check_time));
+ ret = TRUE;
+ } else {
+ GST_LOG_OBJECT (sess,
+ "can't allow early feedback, next scheduled time is too late %"
+ GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay),
+ GST_TIME_ARGS (sess->next_rtcp_check_time));
+ ret = FALSE;
+ }
+ goto end;
+ }
+ /* RFC 4585 section 3.5.2 step 4b */
if (T_dither_max) {
/* Schedule an early transmission later */
sess->next_early_rtcp_time = g_random_double () * T_dither_max +
sess->next_early_rtcp_time = current_time;
}
+ /* RFC 4585 section 3.5.2 step 6 */
+ sess->allow_early = FALSE;
+ /* Delay next regular RTCP packet to not exceed the short-term
+ * RTCP bandwidth when using early feedback as compared to
+ * without */
+ sess->next_rtcp_check_time = sess->last_rtcp_send_time + 2 * T_rr;
+ sess->last_rtcp_send_time += T_rr;
+
+ GST_LOG_OBJECT (sess, "next early RTCP time %" GST_TIME_FORMAT
+ ", next regular RTCP time %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (sess->next_early_rtcp_time),
+ GST_TIME_ARGS (sess->next_rtcp_check_time));
RTP_SESSION_UNLOCK (sess);
/* notify app of need to send packet early
if (sess->callbacks.reconsider)
sess->callbacks.reconsider (sess, sess->reconsider_user_data);
- return;
+ return TRUE;
-dont_send:
+end:
RTP_SESSION_UNLOCK (sess);
+
+ return ret;
}
-static void
+static gboolean
rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay)
{
GstClockTime now;
if (!sess->callbacks.send_rtcp)
- return;
+ return FALSE;
now = sess->callbacks.request_time (sess, sess->request_time_user_data);
- rtp_session_request_early_rtcp (sess, now, max_delay);
+ return rtp_session_request_early_rtcp (sess, now, max_delay);
}
gboolean
rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc,
gboolean fir, gint count)
{
- RTPSource *src = find_source (sess, ssrc);
+ RTPSource *src;
- if (!src)
+ if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) {
+ GST_DEBUG ("FIR/PLI not sent");
return FALSE;
+ }
+
+ RTP_SESSION_LOCK (sess);
+ src = find_source (sess, ssrc);
+ if (src == NULL)
+ goto no_source;
if (fir) {
src->send_pli = FALSE;
} else if (!src->send_fir) {
src->send_pli = TRUE;
}
-
- rtp_session_send_rtcp (sess, 200 * GST_MSECOND);
+ RTP_SESSION_UNLOCK (sess);
return TRUE;
+
+ /* ERRORS */
+no_source:
+ {
+ RTP_SESSION_UNLOCK (sess);
+ return FALSE;
+ }
}
/**
rtp_session_request_nack (RTPSession * sess, guint32 ssrc, guint16 seqnum,
GstClockTime max_delay)
{
- RTPSource *source = find_source (sess, ssrc);
+ RTPSource *source;
- if (source == NULL)
+ if (!rtp_session_send_rtcp (sess, max_delay)) {
+ GST_DEBUG ("NACK not sent");
return FALSE;
+ }
+
+ RTP_SESSION_LOCK (sess);
+ source = find_source (sess, ssrc);
+ if (source == NULL)
+ goto no_source;
GST_DEBUG ("request NACK for %08x, #%u", ssrc, seqnum);
rtp_source_register_nack (source, seqnum);
-
- rtp_session_send_rtcp (sess, max_delay);
+ RTP_SESSION_UNLOCK (sess);
return TRUE;
+
+ /* ERRORS */
+no_source:
+ {
+ RTP_SESSION_UNLOCK (sess);
+ return FALSE;
+ }
}