#include <gst/glib-compat-private.h>
-#include "gstrtpbin-marshal.h"
#include "rtpsession.h"
GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
SIGNAL_ON_SENDING_RTCP,
SIGNAL_ON_FEEDBACK_RTCP,
SIGNAL_SEND_RTCP,
+ SIGNAL_SEND_RTCP_FULL,
+ SIGNAL_ON_RECEIVING_RTCP,
LAST_SIGNAL
};
#define DEFAULT_INTERNAL_SOURCE NULL
-#define DEFAULT_BANDWIDTH RTP_STATS_BANDWIDTH
-#define DEFAULT_RTCP_FRACTION (RTP_STATS_RTCP_FRACTION * RTP_STATS_BANDWIDTH)
+#define DEFAULT_BANDWIDTH 0.0
+#define DEFAULT_RTCP_FRACTION RTP_STATS_RTCP_FRACTION
#define DEFAULT_RTCP_RR_BANDWIDTH -1
#define DEFAULT_RTCP_RS_BANDWIDTH -1
#define DEFAULT_RTCP_MTU 1400
PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
PROP_PROBATION,
- PROP_LAST
+ PROP_STATS
};
/* update average packet size */
(avg) = ((val) + (15 * (avg))) >> 4;
-/* The number RTCP intervals after which to timeout entries in the
- * collision table
- */
-#define RTCP_INTERVAL_COLLISION_TIMEOUT 10
-
/* GObject vmethods */
static void rtp_session_finalize (GObject * object);
static void rtp_session_set_property (GObject * object, guint prop_id,
static void rtp_session_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
-static gboolean rtp_session_on_sending_rtcp (RTPSession * sess,
- GstBuffer * buffer, gboolean early);
-static void rtp_session_send_rtcp (RTPSession * sess,
- GstClockTimeDiff max_delay);
-
+static gboolean rtp_session_send_rtcp (RTPSession * sess,
+ GstClockTime max_delay);
static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
+static guint32 rtp_session_create_new_ssrc (RTPSession * sess);
static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
- gboolean * created, RTPArrivalStats * arrival, gboolean rtp);
+ gboolean * created, RTPPacketInfo * pinfo, gboolean rtp);
+static RTPSource *obtain_internal_source (RTPSession * sess,
+ guint32 ssrc, gboolean * created, GstClockTime current_time);
static GstFlowReturn rtp_session_schedule_bye_locked (RTPSession * sess,
- const gchar * reason, GstClockTime current_time);
+ GstClockTime current_time);
static GstClockTime calculate_rtcp_interval (RTPSession * sess,
gboolean deterministic, gboolean first);
rtp_session_signals[SIGNAL_GET_SOURCE_BY_SSRC] =
g_signal_new ("get-source-by-ssrc", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (RTPSessionClass,
- get_source_by_ssrc), NULL, NULL, gst_rtp_bin_marshal_OBJECT__UINT,
+ get_source_by_ssrc), NULL, NULL, g_cclosure_marshal_generic,
RTP_TYPE_SOURCE, 1, G_TYPE_UINT);
/**
rtp_session_signals[SIGNAL_ON_SENDING_RTCP] =
g_signal_new ("on-sending-rtcp", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_rtcp),
- accumulate_trues, NULL, gst_rtp_bin_marshal_BOOLEAN__BOXED_BOOLEAN,
- G_TYPE_BOOLEAN, 2, GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE,
- G_TYPE_BOOLEAN);
+ accumulate_trues, NULL, g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 2,
+ GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE, G_TYPE_BOOLEAN);
/**
* RTPSession::on-feedback-rtcp:
rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP] =
g_signal_new ("on-feedback-rtcp", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_feedback_rtcp),
- NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT_UINT_UINT_BOXED,
- G_TYPE_NONE, 5, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT,
- GST_TYPE_BUFFER);
+ NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 5, G_TYPE_UINT,
+ G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT, GST_TYPE_BUFFER);
/**
* RTPSession::send-rtcp:
* Requests that the #RTPSession initiate a new RTCP packet as soon as
* possible within the requested delay.
*/
-
rtp_session_signals[SIGNAL_SEND_RTCP] =
g_signal_new ("send-rtcp", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
- gst_rtp_bin_marshal_VOID__UINT64, G_TYPE_NONE, 1, G_TYPE_UINT64);
+ g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_UINT64);
+
+ /**
+ * RTPSession::send-rtcp-full:
+ * @session: the object which received the signal
+ * @max_delay: The maximum delay after which the feedback will not be useful
+ * anymore
+ *
+ * Requests that the #RTPSession initiate a new RTCP packet as soon as
+ * possible within the requested delay.
+ *
+ * Returns: TRUE if the new RTCP packet could be scheduled within the
+ * requested delay, FALSE otherwise.
+ *
+ * Since: 1.6
+ */
+ rtp_session_signals[SIGNAL_SEND_RTCP_FULL] =
+ g_signal_new ("send-rtcp-full", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
+ G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
+ g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);
+
+ /**
+ * RTPSession::on-receiving-rtcp
+ * @session: the object which received the signal
+ * @buffer: the #GstBuffer containing the RTCP packet that was received
+ *
+ * This signal is emitted when receiving an RTCP packet before it is handled
+ * by the session. It can be used to extract custom information from RTCP packets.
+ *
+ * Since: 1.6
+ */
+ rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP] =
+ g_signal_new ("on-receiving-rtcp", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_receiving_rtcp),
+ NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1,
+ GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
g_param_spec_uint ("internal-ssrc", "Internal SSRC",
- "The internal SSRC used for the session",
+ "The internal SSRC used for the session (deprecated)",
0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE,
g_param_spec_object ("internal-source", "Internal Source",
- "The internal source element of the session",
+ "The internal source element of the session (deprecated)",
RTP_TYPE_SOURCE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
g_param_spec_uint ("rtcp-immediate-feedback-threshold",
"RTCP Immediate Feedback threshold",
"The maximum number of members of a RTP session for which immediate"
- " feedback is used",
+ " feedback is used (DEPRECATED: has no effect and is not needed)",
0, G_MAXUINT, DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
g_object_class_install_property (gobject_class, PROP_PROBATION,
g_param_spec_uint ("probation", "Number of probations",
0, G_MAXUINT, DEFAULT_PROBATION,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ /**
+ * RTPSession::stats:
+ *
+ * Various session statistics. This property returns a GstStructure
+ * with name application/x-rtp-session-stats with the following fields:
+ *
+ * "rtx-drop-count" G_TYPE_UINT The number of retransmission events
+ * dropped (due to bandwidth constraints)
+ * "sent-nack-count" G_TYPE_UINT Number of NACKs sent
+ * "recv-nack-count" G_TYPE_UINT Number of NACKs received
+ *
+ * Since: 1.4
+ */
+ g_object_class_install_property (gobject_class, PROP_STATS,
+ g_param_spec_boxed ("stats", "Statistics",
+ "Various statistics", GST_TYPE_STRUCTURE,
+ G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
klass->get_source_by_ssrc =
GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
- klass->on_sending_rtcp = GST_DEBUG_FUNCPTR (rtp_session_on_sending_rtcp);
klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp);
GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
{
gint i;
gchar *str;
- GstStructure *sdes;
g_mutex_init (&sess->lock);
sess->key = g_random_int ();
sess->mask_idx = 0;
sess->mask = 0;
- for (i = 0; i < 32; i++) {
+ /* TODO: We currently only use the first hash table but this is the
+ * beginning of an implementation for RFC2762
+ for (i = 0; i < 32; i++) {
+ */
+ for (i = 0; i < 1; i++) {
sess->ssrcs[i] =
g_hash_table_new_full (NULL, NULL, NULL,
(GDestroyNotify) g_object_unref);
}
rtp_stats_init_defaults (&sess->stats);
+ INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
+ rtp_stats_set_min_interval (&sess->stats,
+ (gdouble) DEFAULT_RTCP_MIN_INTERVAL / GST_SECOND);
sess->recalc_bandwidth = TRUE;
sess->bandwidth = DEFAULT_BANDWIDTH;
sess->rtcp_rr_bandwidth = DEFAULT_RTCP_RR_BANDWIDTH;
sess->rtcp_rs_bandwidth = DEFAULT_RTCP_RS_BANDWIDTH;
- /* create an active SSRC for this session manager */
- sess->source = rtp_session_create_source (sess);
- sess->source->validated = TRUE;
- sess->source->internal = TRUE;
- sess->stats.active_sources++;
- INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
- sess->source->stats.prev_rtcptime = 0;
- sess->source->stats.last_rtcptime = 1;
-
- rtp_stats_set_min_interval (&sess->stats,
- (gdouble) DEFAULT_RTCP_MIN_INTERVAL / GST_SECOND);
-
/* default UDP header length */
sess->header_len = 28;
sess->mtu = DEFAULT_RTCP_MTU;
sess->probation = DEFAULT_PROBATION;
/* some default SDES entries */
- sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
+ sess->sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
/* we do not want to leak details like the username or hostname here */
str = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
- gst_structure_set (sdes, "cname", G_TYPE_STRING, str, NULL);
+ gst_structure_set (sess->sdes, "cname", G_TYPE_STRING, str, NULL);
g_free (str);
#if 0
g_free (str);
#endif
- gst_structure_set (sdes, "tool", G_TYPE_STRING, "GStreamer", NULL);
+ gst_structure_set (sess->sdes, "tool", G_TYPE_STRING, "GStreamer", NULL);
- /* and configure in the source */
- rtp_source_set_sdes_struct (sess->source, sdes);
+ /* this is the SSRC we suggest */
+ sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
sess->first_rtcp = TRUE;
sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
+ sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE;
sess->allow_early = TRUE;
sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
sess->last_keyframe_request = GST_CLOCK_TIME_NONE;
- GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc);
+ sess->is_doing_ptp = TRUE;
}
static void
sess = RTP_SESSION_CAST (object);
- g_mutex_clear (&sess->lock);
+ gst_structure_free (sess->sdes);
+
+ g_list_free_full (sess->conflicting_addresses,
+ (GDestroyNotify) rtp_conflicting_address_free);
- for (i = 0; i < 32; i++)
+ /* TODO: Change this again when implementing RFC 2762
+ * for (i = 0; i < 32; i++)
+ */
+ for (i = 0; i < 1; i++)
g_hash_table_destroy (sess->ssrcs[i]);
- g_object_unref (sess->source);
+ g_mutex_clear (&sess->lock);
G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
}
return res;
}
+static GstStructure *
+rtp_session_create_stats (RTPSession * sess)
+{
+ GstStructure *s;
+
+ s = gst_structure_new ("application/x-rtp-session-stats",
+ "rtx-drop-count", G_TYPE_UINT, sess->stats.nacks_dropped,
+ "sent-nack-count", G_TYPE_UINT, sess->stats.nacks_sent,
+ "recv-nack-count", G_TYPE_UINT, sess->stats.nacks_received, NULL);
+
+ return s;
+}
+
static void
rtp_session_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
switch (prop_id) {
case PROP_INTERNAL_SSRC:
- rtp_session_set_internal_ssrc (sess, g_value_get_uint (value));
+ RTP_SESSION_LOCK (sess);
+ sess->suggested_ssrc = g_value_get_uint (value);
+ RTP_SESSION_UNLOCK (sess);
+ if (sess->callbacks.reconfigure)
+ sess->callbacks.reconfigure (sess, sess->reconfigure_user_data);
break;
case PROP_BANDWIDTH:
RTP_SESSION_LOCK (sess);
break;
case PROP_PROBATION:
sess->probation = g_value_get_uint (value);
- g_object_set_property (G_OBJECT (sess->source), "probation", value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
switch (prop_id) {
case PROP_INTERNAL_SSRC:
- g_value_set_uint (value, rtp_session_get_internal_ssrc (sess));
+ g_value_set_uint (value, rtp_session_suggest_ssrc (sess));
break;
case PROP_INTERNAL_SOURCE:
- g_value_take_object (value, rtp_session_get_internal_source (sess));
+ /* FIXME, return a random source */
+ g_value_set_object (value, NULL);
break;
case PROP_BANDWIDTH:
g_value_set_double (value, sess->bandwidth);
break;
case PROP_PROBATION:
g_value_set_uint (value, sess->probation);
- g_object_get_property (G_OBJECT (sess->source), "probation", value);
+ break;
+ case PROP_STATS:
+ g_value_take_boxed (value, rtp_session_create_stats (sess));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
sess->callbacks.request_time = callbacks->request_time;
sess->request_time_user_data = user_data;
}
+ if (callbacks->notify_nack) {
+ sess->callbacks.notify_nack = callbacks->notify_nack;
+ sess->notify_nack_user_data = user_data;
+ }
+ if (callbacks->reconfigure) {
+ sess->callbacks.reconfigure = callbacks->reconfigure;
+ sess->reconfigure_user_data = user_data;
+ }
}
/**
GstStructure *
rtp_session_get_sdes_struct (RTPSession * sess)
{
- const GstStructure *sdes;
GstStructure *result = NULL;
g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
RTP_SESSION_LOCK (sess);
- sdes = rtp_source_get_sdes_struct (sess->source);
- if (sdes)
- result = gst_structure_copy (sdes);
+ if (sess->sdes)
+ result = gst_structure_copy (sess->sdes);
RTP_SESSION_UNLOCK (sess);
return result;
g_return_if_fail (RTP_IS_SESSION (sess));
RTP_SESSION_LOCK (sess);
- rtp_source_set_sdes_struct (sess->source, gst_structure_copy (sdes));
+ if (sess->sdes)
+ gst_structure_free (sess->sdes);
+ sess->sdes = gst_structure_copy (sdes);
RTP_SESSION_UNLOCK (sess);
}
{
GstFlowReturn result = GST_FLOW_OK;
- if (source == session->source) {
+ if (source->internal) {
GST_LOG ("source %08x pushed sender RTP packet", source->ssrc);
RTP_SESSION_UNLOCK (session);
(RTPSourceClockRate) source_clock_rate,
};
+
+/**
+ * rtp_session_find_conflicting_address:
+ * @session: The session the packet came in
+ * @address: address to check for
+ * @time: The time when the packet that is possibly in conflict arrived
+ *
+ * Checks if an address which has a conflict is already known. If it is
+ * a known conflict, remember the time
+ *
+ * Returns: TRUE if it was a known conflict, FALSE otherwise
+ */
+static gboolean
+rtp_session_find_conflicting_address (RTPSession * session,
+ GSocketAddress * address, GstClockTime time)
+{
+ return find_conflicting_address (session->conflicting_addresses, address,
+ time);
+}
+
+/**
+ * rtp_session_add_conflicting_address:
+ * @session: The session the packet came in
+ * @address: address to remember
+ * @time: The time when the packet that is in conflict arrived
+ *
+ * Adds a new conflict address
+ */
+static void
+rtp_session_add_conflicting_address (RTPSession * sess,
+ GSocketAddress * address, GstClockTime time)
+{
+ sess->conflicting_addresses =
+ add_conflicting_address (sess->conflicting_addresses, address, time);
+}
+
+
static gboolean
check_collision (RTPSession * sess, RTPSource * source,
- RTPArrivalStats * arrival, gboolean rtp)
+ RTPPacketInfo * pinfo, gboolean rtp)
{
- /* If we have no arrival address, we can't do collision checking */
- if (!arrival->address)
+ guint32 ssrc;
+
+ /* If we have no pinfo address, we can't do collision checking */
+ if (!pinfo->address)
return FALSE;
- if (sess->source != source) {
+ ssrc = rtp_source_get_ssrc (source);
+
+ if (!source->internal) {
GSocketAddress *from;
/* This is not our local source, but lets check if two remote
}
if (from) {
- if (__g_socket_address_equal (from, arrival->address)) {
+ if (__g_socket_address_equal (from, pinfo->address)) {
/* Address is the same */
return FALSE;
} else {
- GST_LOG ("we have a third-party collision or loop ssrc:%x",
- rtp_source_get_ssrc (source));
+ GST_LOG ("we have a third-party collision or loop ssrc:%x", ssrc);
if (sess->favor_new) {
if (rtp_source_find_conflicting_address (source,
- arrival->address, arrival->current_time)) {
+ pinfo->address, pinfo->current_time)) {
gchar *buf1;
- buf1 = __g_socket_address_to_string (arrival->address);
- GST_LOG ("Known conflict on %x for %s, dropping packet",
- rtp_source_get_ssrc (source), buf1);
+ buf1 = __g_socket_address_to_string (pinfo->address);
+ GST_LOG ("Known conflict on %x for %s, dropping packet", ssrc,
+ buf1);
g_free (buf1);
return TRUE;
* a new source. Save old address in possible conflict list
*/
rtp_source_add_conflicting_address (source, from,
- arrival->current_time);
+ pinfo->current_time);
buf1 = __g_socket_address_to_string (from);
- buf2 = __g_socket_address_to_string (arrival->address);
+ buf2 = __g_socket_address_to_string (pinfo->address);
GST_DEBUG ("New conflict for ssrc %x, replacing %s with %s,"
- " saving old as known conflict",
- rtp_source_get_ssrc (source), buf1, buf2);
+ " saving old as known conflict", ssrc, buf1, buf2);
if (rtp)
- rtp_source_set_rtp_from (source, arrival->address);
+ rtp_source_set_rtp_from (source, pinfo->address);
else
- rtp_source_set_rtcp_from (source, arrival->address);
+ rtp_source_set_rtcp_from (source, pinfo->address);
g_free (buf1);
g_free (buf2);
} else {
/* We don't already have a from address for RTP, just set it */
if (rtp)
- rtp_source_set_rtp_from (source, arrival->address);
+ rtp_source_set_rtp_from (source, pinfo->address);
else
- rtp_source_set_rtcp_from (source, arrival->address);
+ rtp_source_set_rtcp_from (source, pinfo->address);
return FALSE;
}
*/
} else {
/* This is sending with our ssrc, is it an address we already know */
-
- if (rtp_source_find_conflicting_address (source, arrival->address,
- arrival->current_time)) {
+ if (rtp_session_find_conflicting_address (sess, pinfo->address,
+ pinfo->current_time)) {
/* Its a known conflict, its probably a loop, not a collision
* lets just drop the incoming packet
*/
GST_DEBUG ("Our packets are being looped back to us, dropping");
} else {
/* Its a new collision, lets change our SSRC */
+ rtp_session_add_conflicting_address (sess, pinfo->address,
+ pinfo->current_time);
- rtp_source_add_conflicting_address (source, arrival->address,
- arrival->current_time);
+ GST_DEBUG ("Collision for SSRC %x", ssrc);
+ /* mark the source BYE */
+ rtp_source_mark_bye (source, "SSRC Collision");
+ /* if we were suggesting this SSRC, change to something else */
+ if (sess->suggested_ssrc == ssrc)
+ sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
- GST_DEBUG ("Collision for SSRC %x", rtp_source_get_ssrc (source));
on_ssrc_collision (sess, source);
- sess->change_ssrc = TRUE;
-
- rtp_session_schedule_bye_locked (sess, "SSRC Collision",
- arrival->current_time);
+ rtp_session_schedule_bye_locked (sess, pinfo->current_time);
}
}
return TRUE;
}
-static RTPSource *
-find_source (RTPSession * sess, guint32 ssrc)
+typedef struct
{
- return g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
- GINT_TO_POINTER (ssrc));
+ gboolean is_doing_ptp;
+ GSocketAddress *new_addr;
+} CompareAddrData;
+
+/* check if the two given ip addr are the same (do not care about the port) */
+static gboolean
+ip_addr_equal (GSocketAddress * a, GSocketAddress * b)
+{
+ return
+ g_inet_address_equal (g_inet_socket_address_get_address
+ (G_INET_SOCKET_ADDRESS (a)),
+ g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (b)));
+}
+
+static void
+compare_rtp_source_addr (const gchar * key, RTPSource * source,
+ CompareAddrData * data)
+{
+ /* only compare ip addr of remote sources which are also not closing */
+ if (!source->internal && !source->closing && source->rtp_from) {
+ /* look for the first rtp source */
+ if (!data->new_addr)
+ data->new_addr = source->rtp_from;
+ /* compare current ip addr with the first one */
+ else
+ data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtp_from);
+ }
+}
+
+static void
+compare_rtcp_source_addr (const gchar * key, RTPSource * source,
+ CompareAddrData * data)
+{
+ /* only compare ip addr of remote sources which are also not closing */
+ if (!source->internal && !source->closing && source->rtcp_from) {
+ /* look for the first rtcp source */
+ if (!data->new_addr)
+ data->new_addr = source->rtcp_from;
+ else
+ /* compare current ip addr with the first one */
+ data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtcp_from);
+ }
+}
+
+/* loop over our non-internal source to know if the session
+ * is doing point-to-point */
+static void
+session_update_ptp (RTPSession * sess)
+{
+ /* to know if the session is doing point to point, the ip addr
+ * of each non-internal (=remotes) source have to be compared
+ * to each other.
+ */
+ gboolean is_doing_rtp_ptp;
+ gboolean is_doing_rtcp_ptp;
+ CompareAddrData data;
+
+ /* compare the first remote source's ip addr that receive rtp packets
+ * with other remote rtp source.
+ * it's enough because the session just needs to know if they are all
+ * equals or not
+ */
+ data.is_doing_ptp = TRUE;
+ data.new_addr = NULL;
+ g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+ (GHFunc) compare_rtp_source_addr, (gpointer) & data);
+ is_doing_rtp_ptp = data.is_doing_ptp;
+
+ /* same but about rtcp */
+ data.is_doing_ptp = TRUE;
+ data.new_addr = NULL;
+ g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+ (GHFunc) compare_rtcp_source_addr, (gpointer) & data);
+ is_doing_rtcp_ptp = data.is_doing_ptp;
+
+ /* the session is doing point-to-point if all rtp remote have the same
+ * ip addr and if all rtcp remote sources have the same ip addr */
+ sess->is_doing_ptp = is_doing_rtp_ptp && is_doing_rtcp_ptp;
+
+ GST_DEBUG ("doing point-to-point: %d", sess->is_doing_ptp);
}
static void
{
g_hash_table_insert (sess->ssrcs[sess->mask_idx],
GINT_TO_POINTER (src->ssrc), src);
+ /* report the new source ASAP */
+ src->generation = sess->generation;
/* we have one more source now */
sess->total_sources++;
+ if (RTP_SOURCE_IS_ACTIVE (src))
+ sess->stats.active_sources++;
+ if (src->internal) {
+ sess->stats.internal_sources++;
+ if (sess->suggested_ssrc != src->ssrc)
+ sess->suggested_ssrc = src->ssrc;
+ }
+
+ /* update point-to-point status */
+ if (!src->internal)
+ session_update_ptp (sess);
+}
+
+static RTPSource *
+find_source (RTPSession * sess, guint32 ssrc)
+{
+ return g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
+ GINT_TO_POINTER (ssrc));
}
/* must be called with the session lock, the returned source needs to be
* unreffed after usage. */
static RTPSource *
obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
- RTPArrivalStats * arrival, gboolean rtp)
+ RTPPacketInfo * pinfo, gboolean rtp)
{
RTPSource *source;
g_object_set (source, "probation", 0, NULL);
/* store from address, if any */
- if (arrival->address) {
+ if (pinfo->address) {
if (rtp)
- rtp_source_set_rtp_from (source, arrival->address);
+ rtp_source_set_rtp_from (source, pinfo->address);
else
- rtp_source_set_rtcp_from (source, arrival->address);
+ rtp_source_set_rtcp_from (source, pinfo->address);
}
/* configure a callback on the source */
} else {
*created = FALSE;
/* check for collision, this updates the address when not previously set */
- if (check_collision (sess, source, arrival, rtp)) {
+ if (check_collision (sess, source, pinfo, rtp)) {
return NULL;
}
/* Receiving RTCP packets of an SSRC is a strong indication that we
g_object_set (source, "probation", 0, NULL);
}
/* update last activity */
- source->last_activity = arrival->current_time;
+ source->last_activity = pinfo->current_time;
if (rtp)
- source->last_rtp_activity = arrival->current_time;
+ source->last_rtp_activity = pinfo->current_time;
g_object_ref (source);
return source;
}
-/**
- * rtp_session_get_internal_source:
- * @sess: a #RTPSession
- *
- * Get the internal #RTPSource of @sess.
- *
- * Returns: The internal #RTPSource. g_object_unref() after usage.
- */
-RTPSource *
-rtp_session_get_internal_source (RTPSession * sess)
+/* must be called with the session lock, the returned source needs to be
+ * unreffed after usage. */
+static RTPSource *
+obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created,
+ GstClockTime current_time)
{
- RTPSource *result;
-
- g_return_val_if_fail (RTP_IS_SESSION (sess), NULL);
-
- result = g_object_ref (sess->source);
+ RTPSource *source;
- return result;
-}
+ source = find_source (sess, ssrc);
+ if (source == NULL) {
+ /* make new internal Source and insert */
+ source = rtp_source_new (ssrc);
-/**
- * rtp_session_set_internal_ssrc:
- * @sess: a #RTPSession
- * @ssrc: an SSRC
- *
- * Set the SSRC of @sess to @ssrc.
- */
-void
-rtp_session_set_internal_ssrc (RTPSession * sess, guint32 ssrc)
-{
- RTP_SESSION_LOCK (sess);
- if (ssrc != sess->source->ssrc) {
- g_hash_table_steal (sess->ssrcs[sess->mask_idx],
- GINT_TO_POINTER (sess->source->ssrc));
+ GST_DEBUG ("creating new internal source %08x %p", ssrc, source);
- GST_DEBUG ("setting internal SSRC to %08x", ssrc);
- /* After this call, any receiver of the old SSRC either in RTP or RTCP
- * packets will timeout on the old SSRC, we could potentially schedule a
- * BYE RTCP for the old SSRC... */
- sess->source->ssrc = ssrc;
- rtp_source_reset (sess->source);
+ source->validated = TRUE;
+ source->internal = TRUE;
+ source->probation = FALSE;
+ rtp_source_set_sdes_struct (source, gst_structure_copy (sess->sdes));
+ rtp_source_set_callbacks (source, &callbacks, sess);
- /* rehash with the new SSRC */
- g_hash_table_insert (sess->ssrcs[sess->mask_idx],
- GINT_TO_POINTER (sess->source->ssrc), sess->source);
+ add_source (sess, source);
+ *created = TRUE;
+ } else {
+ *created = FALSE;
}
- RTP_SESSION_UNLOCK (sess);
+ /* update last activity */
+ if (current_time != GST_CLOCK_TIME_NONE) {
+ source->last_activity = current_time;
+ source->last_rtp_activity = current_time;
+ }
+ g_object_ref (source);
- g_object_notify (G_OBJECT (sess), "internal-ssrc");
+ return source;
}
/**
- * rtp_session_get_internal_ssrc:
+ * rtp_session_suggest_ssrc:
* @sess: a #RTPSession
*
- * Get the internal SSRC of @sess.
+ * Suggest an unused SSRC in @sess.
*
- * Returns: The SSRC of the session.
+ * Returns: a free unused SSRC
*/
guint32
-rtp_session_get_internal_ssrc (RTPSession * sess)
+rtp_session_suggest_ssrc (RTPSession * sess)
{
- guint32 ssrc;
+ guint32 result;
+
+ g_return_val_if_fail (RTP_IS_SESSION (sess), 0);
RTP_SESSION_LOCK (sess);
- ssrc = sess->source->ssrc;
+ result = sess->suggested_ssrc;
RTP_SESSION_UNLOCK (sess);
- return ssrc;
+ return result;
}
/**
RTP_SESSION_LOCK (sess);
result = find_source (sess, ssrc);
- if (result)
+ if (result != NULL)
g_object_ref (result);
RTP_SESSION_UNLOCK (sess);
return source;
}
-/* update the RTPArrivalStats structure with the current time and other bits
+static gboolean
+update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo)
+{
+ GstNetAddressMeta *meta;
+
+ /* get packet size including header overhead */
+ pinfo->bytes += gst_buffer_get_size (*buffer) + pinfo->header_len;
+ pinfo->packets++;
+
+ if (pinfo->rtp) {
+ GstRTPBuffer rtp = { NULL };
+
+ if (!gst_rtp_buffer_map (*buffer, GST_MAP_READ, &rtp))
+ goto invalid_packet;
+
+ pinfo->payload_len += gst_rtp_buffer_get_payload_len (&rtp);
+ if (idx == 0) {
+ gint i;
+
+ /* only keep info for first buffer */
+ pinfo->ssrc = gst_rtp_buffer_get_ssrc (&rtp);
+ pinfo->seqnum = gst_rtp_buffer_get_seq (&rtp);
+ pinfo->pt = gst_rtp_buffer_get_payload_type (&rtp);
+ pinfo->rtptime = gst_rtp_buffer_get_timestamp (&rtp);
+ /* copy available csrc */
+ pinfo->csrc_count = gst_rtp_buffer_get_csrc_count (&rtp);
+ for (i = 0; i < pinfo->csrc_count; i++)
+ pinfo->csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i);
+ }
+ gst_rtp_buffer_unmap (&rtp);
+ }
+
+ if (idx == 0) {
+ /* for netbuffer we can store the IP address to check for collisions */
+ meta = gst_buffer_get_net_address_meta (*buffer);
+ if (pinfo->address)
+ g_object_unref (pinfo->address);
+ if (meta) {
+ pinfo->address = G_SOCKET_ADDRESS (g_object_ref (meta->addr));
+ } else {
+ pinfo->address = NULL;
+ }
+ }
+ return TRUE;
+
+ /* ERRORS */
+invalid_packet:
+ {
+ GST_DEBUG ("invalid RTP packet received");
+ return FALSE;
+ }
+}
+
+/* update the RTPPacketInfo structure with the current time and other bits
* about the current buffer we are handling.
* This function is typically called when a validated packet is received.
* This function should be called with the SESSION_LOCK
*/
+static gboolean
+update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
+ gboolean send, gboolean rtp, gboolean is_list, gpointer data,
+ GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
+{
+ gboolean res;
+
+ pinfo->send = send;
+ pinfo->rtp = rtp;
+ pinfo->is_list = is_list;
+ pinfo->data = data;
+ pinfo->current_time = current_time;
+ pinfo->running_time = running_time;
+ pinfo->ntpnstime = ntpnstime;
+ pinfo->header_len = sess->header_len;
+ pinfo->bytes = 0;
+ pinfo->payload_len = 0;
+ pinfo->packets = 0;
+
+ if (is_list) {
+ GstBufferList *list = GST_BUFFER_LIST_CAST (data);
+ res =
+ gst_buffer_list_foreach (list, (GstBufferListFunc) update_packet,
+ pinfo);
+ } else {
+ GstBuffer *buffer = GST_BUFFER_CAST (data);
+ res = update_packet (&buffer, 0, pinfo);
+ }
+ return res;
+}
+
static void
-update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
- gboolean rtp, GstBuffer * buffer, GstClockTime current_time,
- GstClockTime running_time, guint64 ntpnstime)
+clean_packet_info (RTPPacketInfo * pinfo)
{
- GstNetAddressMeta *meta;
- GstRTPBuffer rtpb = { NULL };
-
- /* get time of arrival */
- arrival->current_time = current_time;
- arrival->running_time = running_time;
- arrival->ntpnstime = ntpnstime;
+ if (pinfo->address)
+ g_object_unref (pinfo->address);
+ if (pinfo->data) {
+ gst_mini_object_unref (pinfo->data);
+ pinfo->data = NULL;
+ }
+}
- /* get packet size including header overhead */
- arrival->bytes = gst_buffer_get_size (buffer) + sess->header_len;
+static gboolean
+source_update_active (RTPSession * sess, RTPSource * source,
+ gboolean prevactive)
+{
+ gboolean active = RTP_SOURCE_IS_ACTIVE (source);
+ guint32 ssrc = source->ssrc;
- if (rtp) {
- gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpb);
- arrival->payload_len = gst_rtp_buffer_get_payload_len (&rtpb);
- gst_rtp_buffer_unmap (&rtpb);
- } else {
- arrival->payload_len = 0;
- }
+ if (prevactive == active)
+ return FALSE;
- /* for netbuffer we can store the IP address to check for collisions */
- meta = gst_buffer_get_net_address_meta (buffer);
- if (arrival->address)
- g_object_unref (arrival->address);
- if (meta) {
- arrival->address = G_SOCKET_ADDRESS (g_object_ref (meta->addr));
+ if (active) {
+ sess->stats.active_sources++;
+ GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
+ sess->stats.active_sources);
} else {
- arrival->address = NULL;
+ sess->stats.active_sources--;
+ GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
+ sess->stats.active_sources);
}
+ return TRUE;
}
-static void
-clean_arrival_stats (RTPArrivalStats * arrival)
+static gboolean
+source_update_sender (RTPSession * sess, RTPSource * source,
+ gboolean prevsender)
{
- if (arrival->address)
- g_object_unref (arrival->address);
+ gboolean sender = RTP_SOURCE_IS_SENDER (source);
+ guint32 ssrc = source->ssrc;
+
+ if (prevsender == sender)
+ return FALSE;
+
+ if (sender) {
+ sess->stats.sender_sources++;
+ if (source->internal)
+ sess->stats.internal_sender_sources++;
+ GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
+ sess->stats.sender_sources);
+ } else {
+ sess->stats.sender_sources--;
+ if (source->internal)
+ sess->stats.internal_sender_sources--;
+ GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
+ sess->stats.sender_sources);
+ }
+ return TRUE;
}
/**
*/
GstFlowReturn
rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
- GstClockTime current_time, GstClockTime running_time)
+ GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
{
GstFlowReturn result;
guint32 ssrc;
RTPSource *source;
gboolean created;
gboolean prevsender, prevactive;
- RTPArrivalStats arrival = { NULL, };
- guint32 csrcs[16];
- guint8 i, count;
+ RTPPacketInfo pinfo = { 0, };
guint64 oldrate;
- GstRTPBuffer rtp = { NULL };
g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
- if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp))
- goto invalid_packet;
-
RTP_SESSION_LOCK (sess);
- /* ignore more RTP packets when we left the session */
- if (sess->source->marked_bye)
- goto ignore;
- /* update arrival stats */
- update_arrival_stats (sess, &arrival, TRUE, buffer, current_time,
- running_time, -1);
+ /* update pinfo stats */
+ if (!update_packet_info (sess, &pinfo, FALSE, TRUE, FALSE, buffer,
+ current_time, running_time, ntpnstime)) {
+ GST_DEBUG ("invalid RTP packet received");
+ RTP_SESSION_UNLOCK (sess);
+ return rtp_session_process_rtcp (sess, buffer, current_time, ntpnstime);
+ }
+
+ ssrc = pinfo.ssrc;
- /* get SSRC and look up in session database */
- ssrc = gst_rtp_buffer_get_ssrc (&rtp);
- source = obtain_source (sess, ssrc, &created, &arrival, TRUE);
+ source = obtain_source (sess, ssrc, &created, &pinfo, TRUE);
if (!source)
goto collision;
- /* copy available csrc for later */
- count = gst_rtp_buffer_get_csrc_count (&rtp);
- /* make sure to not overflow our array. An RTP buffer can maximally contain
- * 16 CSRCs */
- count = MIN (count, 16);
-
- for (i = 0; i < count; i++)
- csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i);
-
- gst_rtp_buffer_unmap (&rtp);
-
prevsender = RTP_SOURCE_IS_SENDER (source);
prevactive = RTP_SOURCE_IS_ACTIVE (source);
oldrate = source->bitrate;
/* let source process the packet */
- result = rtp_source_process_rtp (source, buffer, &arrival);
+ result = rtp_source_process_rtp (source, &pinfo);
/* source became active */
- if (prevactive != RTP_SOURCE_IS_ACTIVE (source)) {
- sess->stats.active_sources++;
- GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
- sess->stats.active_sources);
+ if (source_update_active (sess, source, prevactive))
on_ssrc_validated (sess, source);
- }
- if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
- sess->stats.sender_sources++;
- GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc,
- sess->stats.sender_sources);
- }
+
+ source_update_sender (sess, source, prevsender);
+
if (oldrate != source->bitrate)
sess->recalc_bandwidth = TRUE;
if (source->validated) {
gboolean created;
+ gint i;
/* for validated sources, we add the CSRCs as well */
- for (i = 0; i < count; i++) {
+ for (i = 0; i < pinfo.csrc_count; i++) {
guint32 csrc;
RTPSource *csrc_src;
- csrc = csrcs[i];
+ csrc = pinfo.csrcs[i];
/* get source */
- csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE);
+ csrc_src = obtain_source (sess, csrc, &created, &pinfo, TRUE);
if (!csrc_src)
continue;
if (created) {
GST_DEBUG ("created new CSRC: %08x", csrc);
rtp_source_set_as_csrc (csrc_src);
- if (RTP_SOURCE_IS_ACTIVE (csrc_src))
- sess->stats.active_sources++;
+ source_update_active (sess, csrc_src, FALSE);
on_new_ssrc (sess, csrc_src);
}
g_object_unref (csrc_src);
RTP_SESSION_UNLOCK (sess);
- clean_arrival_stats (&arrival);
+ clean_packet_info (&pinfo);
return result;
/* ERRORS */
-invalid_packet:
- {
- gst_buffer_unref (buffer);
- GST_DEBUG ("invalid RTP packet received");
- return GST_FLOW_OK;
- }
-ignore:
- {
- RTP_SESSION_UNLOCK (sess);
- gst_rtp_buffer_unmap (&rtp);
- gst_buffer_unref (buffer);
- GST_DEBUG ("ignoring RTP packet because we are leaving");
- return GST_FLOW_OK;
- }
collision:
{
RTP_SESSION_UNLOCK (sess);
- gst_rtp_buffer_unmap (&rtp);
- gst_buffer_unref (buffer);
- clean_arrival_stats (&arrival);
+ clean_packet_info (&pinfo);
GST_DEBUG ("ignoring packet because its collisioning");
return GST_FLOW_OK;
}
static void
rtp_session_process_rb (RTPSession * sess, RTPSource * source,
- GstRTCPPacket * packet, RTPArrivalStats * arrival)
+ GstRTCPPacket * packet, RTPPacketInfo * pinfo)
{
guint count, i;
guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
guint8 fractionlost;
gint32 packetslost;
+ RTPSource *src;
gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
&packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
- if (ssrc == sess->source->ssrc) {
+ /* find our own source */
+ src = find_source (sess, ssrc);
+ if (src == NULL)
+ continue;
+
+ if (src->internal && RTP_SOURCE_IS_ACTIVE (src)) {
/* only deal with report blocks for our session, we update the stats of
* the sender of the RTCP message. We could also compare our stats against
* the other sender to see if we are better or worse. */
- rtp_source_process_rb (source, arrival->ntpnstime, fractionlost,
+ /* FIXME, need to keep track who the RB block is from */
+ rtp_source_process_rb (source, pinfo->ntpnstime, fractionlost,
packetslost, exthighestseq, jitter, lsr, dlsr);
}
}
*/
static void
rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
- RTPArrivalStats * arrival, gboolean * do_sync)
+ RTPPacketInfo * pinfo, gboolean * do_sync)
{
guint32 senderssrc, rtptime, packet_count, octet_count;
guint64 ntptime;
&packet_count, &octet_count);
GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT,
- senderssrc, GST_TIME_ARGS (arrival->current_time));
+ senderssrc, GST_TIME_ARGS (pinfo->current_time));
- source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
+ source = obtain_source (sess, senderssrc, &created, pinfo, FALSE);
if (!source)
return;
+ /* skip non-bye packets for sources that are marked BYE */
+ if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
+ goto out;
+
/* don't try to do lip-sync for sources that sent a BYE */
if (RTP_SOURCE_IS_MARKED_BYE (source))
*do_sync = FALSE;
prevsender = RTP_SOURCE_IS_SENDER (source);
/* first update the source */
- rtp_source_process_sr (source, arrival->current_time, ntptime, rtptime,
+ rtp_source_process_sr (source, pinfo->current_time, ntptime, rtptime,
packet_count, octet_count);
- if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
- sess->stats.sender_sources++;
- GST_DEBUG ("source: %08x became sender, %d sender sources", senderssrc,
- sess->stats.sender_sources);
- }
+ source_update_sender (sess, source, prevsender);
if (created)
on_new_ssrc (sess, source);
- rtp_session_process_rb (sess, source, packet, arrival);
+ rtp_session_process_rb (sess, source, packet, pinfo);
+
+out:
g_object_unref (source);
}
*/
static void
rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
- RTPArrivalStats * arrival)
+ RTPPacketInfo * pinfo)
{
guint32 senderssrc;
RTPSource *source;
GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
- source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
+ source = obtain_source (sess, senderssrc, &created, pinfo, FALSE);
if (!source)
return;
+ /* skip non-bye packets for sources that are marked BYE */
+ if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
+ goto out;
+
if (created)
on_new_ssrc (sess, source);
- rtp_session_process_rb (sess, source, packet, arrival);
+ rtp_session_process_rb (sess, source, packet, pinfo);
+
+out:
g_object_unref (source);
}
/* Get SDES items and store them in the SSRC */
static void
rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet,
- RTPArrivalStats * arrival)
+ RTPPacketInfo * pinfo)
{
guint items, i, j;
gboolean more_items, more_entries;
i = 0;
while (more_items) {
guint32 ssrc;
- gboolean changed, created, validated;
+ gboolean changed, created, prevactive;
RTPSource *source;
GstStructure *sdes;
changed = FALSE;
/* find src, no probation when dealing with RTCP */
- source = obtain_source (sess, ssrc, &created, arrival, FALSE);
+ source = obtain_source (sess, ssrc, &created, pinfo, FALSE);
if (!source)
return;
+ /* skip non-bye packets for sources that are marked BYE */
+ if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
+ goto next;
+
sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
more_entries = gst_rtcp_packet_sdes_first_entry (packet);
/* takes ownership of sdes */
changed = rtp_source_set_sdes_struct (source, sdes);
- validated = !RTP_SOURCE_IS_ACTIVE (source);
+ prevactive = RTP_SOURCE_IS_ACTIVE (source);
source->validated = TRUE;
if (created)
on_new_ssrc (sess, source);
/* source became active */
- if (validated) {
- sess->stats.active_sources++;
- GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
- sess->stats.active_sources);
+ if (source_update_active (sess, source, prevactive))
on_ssrc_validated (sess, source);
- }
if (changed)
on_ssrc_sdes (sess, source);
+ next:
g_object_unref (source);
more_items = gst_rtcp_packet_sdes_next_item (packet);
*/
static void
rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
- RTPArrivalStats * arrival)
+ RTPPacketInfo * pinfo)
{
guint count, i;
gchar *reason;
ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
GST_DEBUG ("SSRC: %08x", ssrc);
- if (ssrc == sess->source->ssrc)
- return;
-
/* find src and mark bye, no probation when dealing with RTCP */
- source = obtain_source (sess, ssrc, &created, arrival, FALSE);
+ source = obtain_source (sess, ssrc, &created, pinfo, FALSE);
if (!source)
return;
+ if (source->internal) {
+ /* our own source, something weird with this packet */
+ g_object_unref (source);
+ continue;
+ }
+
/* store time for when we need to time out this source */
- source->bye_time = arrival->current_time;
+ source->bye_time = pinfo->current_time;
prevactive = RTP_SOURCE_IS_ACTIVE (source);
prevsender = RTP_SOURCE_IS_SENDER (source);
pmembers = sess->stats.active_sources;
- if (prevactive && !RTP_SOURCE_IS_ACTIVE (source)) {
- sess->stats.active_sources--;
- GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
- sess->stats.active_sources);
- }
- if (prevsender && !RTP_SOURCE_IS_SENDER (source)) {
- sess->stats.sender_sources--;
- GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
- sess->stats.sender_sources);
- }
+ source_update_active (sess, source, prevactive);
+ source_update_sender (sess, source, prevsender);
+
members = sess->stats.active_sources;
- if (!sess->source->marked_bye && members < pmembers) {
+ if (!sess->scheduled_bye && members < pmembers) {
/* some members went away since the previous timeout estimate.
* Perform reverse reconsideration but only when we are not scheduling a
* BYE ourselves. */
if (sess->next_rtcp_check_time != GST_CLOCK_TIME_NONE &&
- arrival->current_time < sess->next_rtcp_check_time) {
+ pinfo->current_time < sess->next_rtcp_check_time) {
GstClockTime time_remaining;
- time_remaining = sess->next_rtcp_check_time - arrival->current_time;
+ time_remaining = sess->next_rtcp_check_time - pinfo->current_time;
sess->next_rtcp_check_time =
gst_util_uint64_scale (time_remaining, members, pmembers);
GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
GST_TIME_ARGS (sess->next_rtcp_check_time));
- sess->next_rtcp_check_time += arrival->current_time;
+ sess->next_rtcp_check_time += pinfo->current_time;
/* mark pending reconsider. We only want to signal the reconsideration
* once after we handled all the source in the bye packet */
static void
rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
- RTPArrivalStats * arrival)
+ RTPPacketInfo * pinfo)
{
GST_DEBUG ("received APP");
}
GstClockTime round_trip_in_ns = gst_util_uint64_scale (round_trip,
GST_SECOND, 65536);
- if (sess->last_keyframe_request != GST_CLOCK_TIME_NONE &&
- current_time - sess->last_keyframe_request < 2 * round_trip_in_ns) {
+ if (current_time - sess->last_keyframe_request < 2 * round_trip_in_ns) {
GST_DEBUG ("Ignoring %s request because one was send without one "
"RTT (%" GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")",
fir ? "FIR" : "PLI",
GST_TIME_ARGS (current_time - sess->last_keyframe_request),
- GST_TIME_ARGS (round_trip_in_ns));;
+ GST_TIME_ARGS (round_trip_in_ns));
return FALSE;
}
}
return;
src = find_source (sess, sender_ssrc);
- if (!src)
+ if (src == NULL)
return;
rtp_session_request_local_key_unit (sess, src, FALSE, current_time);
+
+ src->stats.recv_pli_count++;
}
static void
if (!src && sender_ssrc == 1) {
GHashTableIter iter;
- if (sess->stats.sender_sources >
- RTP_SOURCE_IS_SENDER (sess->source) ? 2 : 1)
+ /* we can't find the source if there are multiple */
+ if (sess->stats.sender_sources > sess->stats.internal_sender_sources + 1)
return;
g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
-
while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
- if (src != sess->source && rtp_source_is_sender (src))
+ if (!src->internal && rtp_source_is_sender (src))
break;
src = NULL;
}
}
-
if (!src)
return;
for (position = 0; position < fci_length; position += 8) {
guint8 *data = fci_data + position;
+ RTPSource *own;
ssrc = GST_READ_UINT32_BE (data);
- if (ssrc == rtp_source_get_ssrc (sess->source)) {
+ own = find_source (sess, ssrc);
+ if (own == NULL)
+ continue;
+
+ if (own->internal) {
our_request = TRUE;
break;
}
return;
rtp_session_request_local_key_unit (sess, src, TRUE, current_time);
+ src->stats.recv_fir_count++;
+}
+
+static void
+rtp_session_process_nack (RTPSession * sess, guint32 sender_ssrc,
+ guint32 media_ssrc, guint8 * fci_data, guint fci_length,
+ GstClockTime current_time)
+{
+ sess->stats.nacks_received++;
+
+ if (!sess->callbacks.notify_nack)
+ return;
+
+ while (fci_length > 0) {
+ guint16 seqnum, blp;
+
+ seqnum = GST_READ_UINT16_BE (fci_data);
+ blp = GST_READ_UINT16_BE (fci_data + 2);
+
+ GST_DEBUG ("NACK #%u, blp %04x, SSRC 0x%08x", seqnum, blp, media_ssrc);
+
+ RTP_SESSION_UNLOCK (sess);
+ sess->callbacks.notify_nack (sess, seqnum, blp, media_ssrc,
+ sess->notify_nack_user_data);
+ RTP_SESSION_LOCK (sess);
+
+ fci_data += 4;
+ fci_length -= 4;
+ }
}
static void
rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
- RTPArrivalStats * arrival, GstClockTime current_time)
+ RTPPacketInfo * pinfo, GstClockTime current_time)
{
GstRTCPType type = gst_rtcp_packet_get_type (packet);
GstRTCPFBType fbtype = gst_rtcp_packet_fb_get_type (packet);
guint32 media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet);
guint8 *fci_data = gst_rtcp_packet_fb_get_fci (packet);
guint fci_length = 4 * gst_rtcp_packet_fb_get_fci_length (packet);
+ RTPSource *src;
+
+ src = find_source (sess, media_ssrc);
+
+ /* skip non-bye packets for sources that are marked BYE */
+ if (sess->scheduled_bye && src && RTP_SOURCE_IS_MARKED_BYE (src))
+ return;
GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of "
"length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length);
fci_buffer = gst_buffer_copy_region (packet->rtcp->buffer,
GST_BUFFER_COPY_MEMORY, fci_data - packet->rtcp->map.data,
fci_length);
- GST_BUFFER_TIMESTAMP (fci_buffer) = arrival->running_time;
+ GST_BUFFER_TIMESTAMP (fci_buffer) = pinfo->running_time;
}
RTP_SESSION_UNLOCK (sess);
gst_buffer_unref (fci_buffer);
}
- if (sess->rtcp_feedback_retention_window) {
- RTPSource *src = find_source (sess, media_ssrc);
-
- if (src)
- rtp_source_retain_rtcp_packet (src, packet, arrival->running_time);
+ if (src && sess->rtcp_feedback_retention_window) {
+ rtp_source_retain_rtcp_packet (src, packet, pinfo->running_time);
}
- if (rtp_source_get_ssrc (sess->source) == media_ssrc ||
+ if ((src && src->internal) ||
/* PSFB FIR puts the media ssrc inside the FCI */
(type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR)) {
switch (type) {
}
break;
case GST_RTCP_TYPE_RTPFB:
+ switch (fbtype) {
+ case GST_RTCP_RTPFB_TYPE_NACK:
+ rtp_session_process_nack (sess, sender_ssrc, media_ssrc,
+ fci_data, fci_length, current_time);
+ break;
+ default:
+ break;
+ }
default:
break;
}
{
GstRTCPPacket packet;
gboolean more, is_bye = FALSE, do_sync = FALSE;
- RTPArrivalStats arrival = { NULL, };
+ RTPPacketInfo pinfo = { 0, };
GstFlowReturn result = GST_FLOW_OK;
GstRTCPBuffer rtcp = { NULL, };
GST_DEBUG ("received RTCP packet");
- RTP_SESSION_LOCK (sess);
- /* update arrival stats */
- update_arrival_stats (sess, &arrival, FALSE, buffer, current_time, -1,
- ntpnstime);
+ g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP], 0,
+ buffer);
- if (sess->source->sent_bye)
- goto ignore;
+ RTP_SESSION_LOCK (sess);
+ /* update pinfo stats */
+ update_packet_info (sess, &pinfo, FALSE, FALSE, FALSE, buffer, current_time,
+ -1, ntpnstime);
/* start processing the compound packet */
gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
type = gst_rtcp_packet_get_type (&packet);
- /* when we are leaving the session, we should ignore all non-BYE messages */
- if (sess->source->marked_bye && type != GST_RTCP_TYPE_BYE) {
- GST_DEBUG ("ignoring non-BYE RTCP packet because we are leaving");
- goto next;
- }
-
switch (type) {
case GST_RTCP_TYPE_SR:
- rtp_session_process_sr (sess, &packet, &arrival, &do_sync);
+ rtp_session_process_sr (sess, &packet, &pinfo, &do_sync);
break;
case GST_RTCP_TYPE_RR:
- rtp_session_process_rr (sess, &packet, &arrival);
+ rtp_session_process_rr (sess, &packet, &pinfo);
break;
case GST_RTCP_TYPE_SDES:
- rtp_session_process_sdes (sess, &packet, &arrival);
+ rtp_session_process_sdes (sess, &packet, &pinfo);
break;
case GST_RTCP_TYPE_BYE:
is_bye = TRUE;
/* don't try to attempt lip-sync anymore for streams with a BYE */
do_sync = FALSE;
- rtp_session_process_bye (sess, &packet, &arrival);
+ rtp_session_process_bye (sess, &packet, &pinfo);
break;
case GST_RTCP_TYPE_APP:
- rtp_session_process_app (sess, &packet, &arrival);
+ rtp_session_process_app (sess, &packet, &pinfo);
break;
case GST_RTCP_TYPE_RTPFB:
case GST_RTCP_TYPE_PSFB:
- rtp_session_process_feedback (sess, &packet, &arrival, current_time);
+ rtp_session_process_feedback (sess, &packet, &pinfo, current_time);
break;
default:
GST_WARNING ("got unknown RTCP packet");
break;
}
- next:
more = gst_rtcp_packet_move_to_next (&packet);
}
/* if we are scheduling a BYE, we only want to count bye packets, else we
* count everything */
- if (sess->source->marked_bye) {
- if (is_bye) {
- sess->stats.bye_members++;
- UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
- }
- } else {
- /* keep track of average packet size */
- UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
+ if (sess->scheduled_bye && is_bye) {
+ sess->bye_stats.bye_members++;
+ UPDATE_AVG (sess->bye_stats.avg_rtcp_packet_size, pinfo.bytes);
}
+
+ /* keep track of average packet size */
+ UPDATE_AVG (sess->stats.avg_rtcp_packet_size, pinfo.bytes);
+
GST_DEBUG ("%p, received RTCP packet, avg size %u, %u", &sess->stats,
- sess->stats.avg_rtcp_packet_size, arrival.bytes);
+ sess->stats.avg_rtcp_packet_size, pinfo.bytes);
RTP_SESSION_UNLOCK (sess);
- clean_arrival_stats (&arrival);
+ pinfo.data = NULL;
+ clean_packet_info (&pinfo);
/* notify caller of sr packets in the callback */
if (do_sync && sess->callbacks.sync_rtcp) {
- /* make writable, we might want to change the buffer */
- buffer = gst_buffer_make_writable (buffer);
-
result = sess->callbacks.sync_rtcp (sess, buffer,
sess->sync_rtcp_user_data);
} else
gst_buffer_unref (buffer);
return GST_FLOW_OK;
}
-ignore:
- {
- RTP_SESSION_UNLOCK (sess);
- gst_buffer_unref (buffer);
- clean_arrival_stats (&arrival);
- GST_DEBUG ("ignoring RTCP packet because we left");
- return GST_FLOW_OK;
- }
}
/**
void
rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps)
{
+ GstStructure *s;
+ guint ssrc;
+
g_return_if_fail (RTP_IS_SESSION (sess));
g_return_if_fail (GST_IS_CAPS (caps));
GST_LOG ("received caps %" GST_PTR_FORMAT, caps);
- RTP_SESSION_LOCK (sess);
- rtp_source_update_caps (sess->source, caps);
- RTP_SESSION_UNLOCK (sess);
+ s = gst_caps_get_structure (caps, 0);
+
+ if (gst_structure_get_uint (s, "ssrc", &ssrc)) {
+ RTPSource *source;
+ gboolean created;
+
+ RTP_SESSION_LOCK (sess);
+ source = obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
+ if (source) {
+ rtp_source_update_caps (source, caps);
+ g_object_unref (source);
+ }
+
+ if (gst_structure_get_uint (s, "rtx-ssrc", &ssrc)) {
+ source =
+ obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
+ if (source) {
+ rtp_source_update_caps (source, caps);
+ g_object_unref (source);
+ }
+ }
+ RTP_SESSION_UNLOCK (sess);
+ }
}
/**
RTPSource *source;
gboolean prevsender;
guint64 oldrate;
+ RTPPacketInfo pinfo = { 0, };
+ gboolean created;
g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR);
GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet");
RTP_SESSION_LOCK (sess);
- source = sess->source;
+ if (!update_packet_info (sess, &pinfo, TRUE, TRUE, is_list, data,
+ current_time, running_time, -1))
+ goto invalid_packet;
- /* update last activity */
- source->last_rtp_activity = current_time;
+ source = obtain_internal_source (sess, pinfo.ssrc, &created, current_time);
prevsender = RTP_SOURCE_IS_SENDER (source);
oldrate = source->bitrate;
/* we use our own source to send */
- result = rtp_source_send_rtp (source, data, is_list, running_time);
+ result = rtp_source_send_rtp (source, &pinfo);
+
+ source_update_sender (sess, source, prevsender);
- if (RTP_SOURCE_IS_SENDER (source) && !prevsender)
- sess->stats.sender_sources++;
if (oldrate != source->bitrate)
sess->recalc_bandwidth = TRUE;
RTP_SESSION_UNLOCK (sess);
+ g_object_unref (source);
+ clean_packet_info (&pinfo);
+
return result;
+
+invalid_packet:
+ {
+ gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
+ RTP_SESSION_UNLOCK (sess);
+ GST_DEBUG ("invalid RTP packet received");
+ return GST_FLOW_OK;
+ }
}
static void
gboolean first)
{
GstClockTime result;
+ RTPSessionStats *stats;
/* recalculate bandwidth when it changed */
if (sess->recalc_bandwidth) {
bandwidth = sess->bandwidth;
else {
/* If it is <= 0, then try to estimate the actual bandwidth */
- bandwidth = sess->source->bitrate;
+ bandwidth = 0;
g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
(GHFunc) add_bitrates, &bandwidth);
- bandwidth /= 8.0;
}
- if (bandwidth < 8000)
+ if (bandwidth < RTP_STATS_BANDWIDTH)
bandwidth = RTP_STATS_BANDWIDTH;
rtp_stats_set_bandwidths (&sess->stats, bandwidth,
sess->recalc_bandwidth = FALSE;
}
- if (sess->source->marked_bye) {
- result = rtp_stats_calculate_bye_interval (&sess->stats);
+ if (sess->scheduled_bye) {
+ stats = &sess->bye_stats;
+ result = rtp_stats_calculate_bye_interval (stats);
} else {
- result = rtp_stats_calculate_rtcp_interval (&sess->stats,
- RTP_SOURCE_IS_SENDER (sess->source), first);
+ stats = &sess->stats;
+ result = rtp_stats_calculate_rtcp_interval (stats,
+ stats->internal_sender_sources > 0, first);
}
GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
GST_TIME_ARGS (result), first);
if (!deterministic && result != GST_CLOCK_TIME_NONE)
- result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
+ result = rtp_stats_add_rtcp_jitter (stats, result);
GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
return result;
}
+static void
+source_mark_bye (const gchar * key, RTPSource * source, const gchar * reason)
+{
+ if (source->internal)
+ rtp_source_mark_bye (source, reason);
+}
+
+/**
+ * rtp_session_mark_all_bye:
+ * @sess: an #RTPSession
+ * @reason: a reason
+ *
+ * Mark all internal sources of the session as BYE with @reason.
+ */
+void
+rtp_session_mark_all_bye (RTPSession * sess, const gchar * reason)
+{
+ g_return_if_fail (RTP_IS_SESSION (sess));
+
+ RTP_SESSION_LOCK (sess);
+ g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+ (GHFunc) source_mark_bye, (gpointer) reason);
+ RTP_SESSION_UNLOCK (sess);
+}
+
/* Stop the current @sess and schedule a BYE message for the other members.
* One must have the session lock to call this function
*/
static GstFlowReturn
-rtp_session_schedule_bye_locked (RTPSession * sess, const gchar * reason,
- GstClockTime current_time)
+rtp_session_schedule_bye_locked (RTPSession * sess, GstClockTime current_time)
{
GstFlowReturn result = GST_FLOW_OK;
- RTPSource *source;
GstClockTime interval;
- g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
-
- source = sess->source;
-
- /* ignore more BYEs */
- if (source->marked_bye)
+ /* nothing to do it we already scheduled bye */
+ if (sess->scheduled_bye)
goto done;
- /* we have BYE now */
- rtp_source_mark_bye (source, reason);
- INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
- sess->stats.bye_members = 1;
+ /* we schedule BYE now */
+ sess->scheduled_bye = TRUE;
+ /* at least one member wants to send a BYE */
+ memcpy (&sess->bye_stats, &sess->stats, sizeof (RTPSessionStats));
+ INIT_AVG (sess->bye_stats.avg_rtcp_packet_size, 100);
+ sess->bye_stats.bye_members = 1;
sess->first_rtcp = TRUE;
sess->allow_early = TRUE;
/**
* rtp_session_schedule_bye:
* @sess: an #RTPSession
- * @reason: a reason or NULL
* @current_time: the current system time
*
- * Stop the current @sess and schedule a BYE message for the other members.
+ * Schedule a BYE message for all sources marked as BYE in @sess.
*
* Returns: a #GstFlowReturn.
*/
GstFlowReturn
-rtp_session_schedule_bye (RTPSession * sess, const gchar * reason,
- GstClockTime current_time)
+rtp_session_schedule_bye (RTPSession * sess, GstClockTime current_time)
{
- GstFlowReturn result = GST_FLOW_OK;
+ GstFlowReturn result;
g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
RTP_SESSION_LOCK (sess);
- result = rtp_session_schedule_bye_locked (sess, reason, current_time);
+ result = rtp_session_schedule_bye_locked (sess, current_time);
RTP_SESSION_UNLOCK (sess);
return result;
RTP_SESSION_LOCK (sess);
if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
+ GST_DEBUG ("have early rtcp time");
result = sess->next_early_rtcp_time;
goto early_exit;
}
result = current_time;
}
- if (sess->source->marked_bye) {
- if (sess->source->sent_bye) {
- GST_DEBUG ("we sent BYE already");
- interval = GST_CLOCK_TIME_NONE;
- } else if (sess->stats.active_sources >= 50) {
+ if (sess->scheduled_bye) {
+ if (sess->bye_stats.active_sources >= 50) {
GST_DEBUG ("reconsider BYE, more than 50 sources");
/* reconsider BYE if members >= 50 */
interval = calculate_rtcp_interval (sess, FALSE, TRUE);
typedef struct
{
+ RTPSource *source;
+ gboolean is_bye;
+ GstBuffer *buffer;
+} ReportOutput;
+
+typedef struct
+{
GstRTCPBuffer rtcpbuf;
RTPSession *sess;
+ RTPSource *source;
+ guint num_to_report;
+ gboolean have_fir;
+ gboolean have_pli;
+ gboolean have_nack;
GstBuffer *rtcp;
GstClockTime current_time;
guint64 ntpnstime;
GstClockTime running_time;
GstClockTime interval;
GstRTCPPacket packet;
- gboolean is_bye;
gboolean has_sdes;
gboolean is_early;
gboolean may_suppress;
+ GQueue output;
+ guint nacked_seqnums;
} ReportData;
static void
session_start_rtcp (RTPSession * sess, ReportData * data)
{
GstRTCPPacket *packet = &data->packet;
- RTPSource *own = sess->source;
+ RTPSource *own = data->source;
GstRTCPBuffer *rtcp = &data->rtcpbuf;
data->rtcp = gst_rtcp_buffer_new (sess->mtu);
+ data->has_sdes = FALSE;
gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp);
{
RTPSession *sess = data->sess;
GstRTCPPacket *packet = &data->packet;
+ guint8 fractionlost;
+ gint32 packetslost;
+ guint32 exthighestseq, jitter;
+ guint32 lsr, dlsr;
+
+ /* don't report for sources in future generations */
+ if (((gint16) (source->generation - sess->generation)) > 0) {
+ GST_DEBUG ("source %08x generation %u > %u", source->ssrc,
+ source->generation, sess->generation);
+ return;
+ }
- /* create a new buffer if needed */
- if (data->rtcp == NULL) {
- session_start_rtcp (sess, data);
- } else if (data->is_early) {
- /* Put a single RR or SR in minimal compound packets */
+ if (g_hash_table_contains (source->reported_in_sr_of,
+ GUINT_TO_POINTER (data->source->ssrc))) {
+ GST_DEBUG ("source %08x already reported in this generation", source->ssrc);
return;
}
- if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) {
- /* only report about other sender sources */
- if (source != sess->source && RTP_SOURCE_IS_SENDER (source)) {
- guint8 fractionlost;
- gint32 packetslost;
- guint32 exthighestseq, jitter;
- guint32 lsr, dlsr;
-
- /* get new stats */
- rtp_source_get_new_rb (source, data->current_time, &fractionlost,
- &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
-
- /* store last generated RR packet */
- source->last_rr.is_valid = TRUE;
- source->last_rr.fractionlost = fractionlost;
- source->last_rr.packetslost = packetslost;
- source->last_rr.exthighestseq = exthighestseq;
- source->last_rr.jitter = jitter;
- source->last_rr.lsr = lsr;
- source->last_rr.dlsr = dlsr;
-
- /* packet is not yet filled, add report block for this source. */
- gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
- exthighestseq, jitter, lsr, dlsr);
- }
+
+ if (gst_rtcp_packet_get_rb_count (packet) == GST_RTCP_MAX_RB_COUNT) {
+ GST_DEBUG ("max RB count reached");
+ return;
+ }
+
+ /* only report about other sender */
+ if (source == data->source)
+ goto reported;
+
+ if (!RTP_SOURCE_IS_SENDER (source)) {
+ GST_DEBUG ("source %08x not sender", source->ssrc);
+ goto reported;
+ }
+
+ GST_DEBUG ("create RB for SSRC %08x", source->ssrc);
+
+ /* get new stats */
+ rtp_source_get_new_rb (source, data->current_time, &fractionlost,
+ &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
+
+ /* store last generated RR packet */
+ source->last_rr.is_valid = TRUE;
+ source->last_rr.fractionlost = fractionlost;
+ source->last_rr.packetslost = packetslost;
+ source->last_rr.exthighestseq = exthighestseq;
+ source->last_rr.jitter = jitter;
+ source->last_rr.lsr = lsr;
+ source->last_rr.dlsr = dlsr;
+
+ /* packet is not yet filled, add report block for this source. */
+ gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
+ exthighestseq, jitter, lsr, dlsr);
+
+reported:
+ g_hash_table_add (source->reported_in_sr_of,
+ GUINT_TO_POINTER (data->source->ssrc));
+}
+
+/* construct FIR */
+static void
+session_add_fir (const gchar * key, RTPSource * source, ReportData * data)
+{
+ GstRTCPPacket *packet = &data->packet;
+ guint16 len;
+ guint8 *fci_data;
+
+ if (!source->send_fir)
+ return;
+
+ len = gst_rtcp_packet_fb_get_fci_length (packet);
+ if (!gst_rtcp_packet_fb_set_fci_length (packet, len + 2))
+ /* exit because the packet is full, will put next request in a
+ * further packet */
+ return;
+
+ fci_data = gst_rtcp_packet_fb_get_fci (packet) + (len * 4);
+
+ GST_WRITE_UINT32_BE (fci_data, source->ssrc);
+ fci_data += 4;
+ fci_data[0] = source->current_send_fir_seqnum;
+ fci_data[1] = fci_data[2] = fci_data[3] = 0;
+
+ source->send_fir = FALSE;
+ source->stats.sent_fir_count++;
+}
+
+static void
+session_fir (RTPSession * sess, ReportData * data)
+{
+ GstRTCPBuffer *rtcp = &data->rtcpbuf;
+ GstRTCPPacket *packet = &data->packet;
+
+ if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_PSFB, packet))
+ return;
+
+ gst_rtcp_packet_fb_set_type (packet, GST_RTCP_PSFB_TYPE_FIR);
+ gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
+ gst_rtcp_packet_fb_set_media_ssrc (packet, 0);
+
+ g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+ (GHFunc) session_add_fir, data);
+
+ if (gst_rtcp_packet_fb_get_fci_length (packet) == 0)
+ gst_rtcp_packet_remove (packet);
+ else
+ data->may_suppress = FALSE;
+}
+
+static gboolean
+has_pli_compare_func (gconstpointer a, gconstpointer ignored)
+{
+ GstRTCPPacket packet;
+ GstRTCPBuffer rtcp = { NULL, };
+ gboolean ret = FALSE;
+
+ gst_rtcp_buffer_map ((GstBuffer *) a, GST_MAP_READ, &rtcp);
+
+ if (gst_rtcp_buffer_get_first_packet (&rtcp, &packet)) {
+ if (gst_rtcp_packet_get_type (&packet) == GST_RTCP_TYPE_PSFB &&
+ gst_rtcp_packet_fb_get_type (&packet) == GST_RTCP_PSFB_TYPE_PLI)
+ ret = TRUE;
+ }
+
+ gst_rtcp_buffer_unmap (&rtcp);
+
+ return ret;
+}
+
+/* construct PLI */
+static void
+session_pli (const gchar * key, RTPSource * source, ReportData * data)
+{
+ GstRTCPBuffer *rtcp = &data->rtcpbuf;
+ GstRTCPPacket *packet = &data->packet;
+
+ if (!source->send_pli)
+ return;
+
+ if (rtp_source_has_retained (source, has_pli_compare_func, NULL))
+ return;
+
+ if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_PSFB, packet))
+ /* exit because the packet is full, will put next request in a
+ * further packet */
+ return;
+
+ gst_rtcp_packet_fb_set_type (packet, GST_RTCP_PSFB_TYPE_PLI);
+ gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
+ gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
+
+ source->send_pli = FALSE;
+ data->may_suppress = FALSE;
+
+ source->stats.sent_pli_count++;
+}
+
+/* construct NACK */
+static void
+session_nack (const gchar * key, RTPSource * source, ReportData * data)
+{
+ GstRTCPBuffer *rtcp = &data->rtcpbuf;
+ GstRTCPPacket *packet = &data->packet;
+ guint32 *nacks;
+ guint n_nacks, i;
+ guint8 *fci_data;
+
+ if (!source->send_nack)
+ return;
+
+ if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RTPFB, packet))
+ /* exit because the packet is full, will put next request in a
+ * further packet */
+ return;
+
+ gst_rtcp_packet_fb_set_type (packet, GST_RTCP_RTPFB_TYPE_NACK);
+ gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
+ gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
+
+ nacks = rtp_source_get_nacks (source, &n_nacks);
+ GST_DEBUG ("%u NACKs", n_nacks);
+ if (!gst_rtcp_packet_fb_set_fci_length (packet, n_nacks))
+ return;
+
+ fci_data = gst_rtcp_packet_fb_get_fci (packet);
+ for (i = 0; i < n_nacks; i++) {
+ GST_WRITE_UINT32_BE (fci_data, nacks[i]);
+ fci_data += 4;
+ data->nacked_seqnums++;
}
+
+ rtp_source_clear_nacks (source);
+ data->may_suppress = FALSE;
}
/* perform cleanup of sources that timed out */
GstClockTime interval, binterval;
GstClockTime btime;
- is_sender = RTP_SOURCE_IS_SENDER (source);
- is_active = RTP_SOURCE_IS_ACTIVE (source);
+ GST_DEBUG ("look at %08x, generation %u", source->ssrc, source->generation);
+
+ /* check for outdated collisions */
+ if (source->internal) {
+ GST_DEBUG ("Timing out collisions for %x", source->ssrc);
+ rtp_source_timeout (source, data->current_time,
+ data->running_time - sess->rtcp_feedback_retention_window);
+ }
- /* nothing to do when without RTCP */
+ /* nothing else to do when without RTCP */
if (data->interval == GST_CLOCK_TIME_NONE)
return;
+ is_sender = RTP_SOURCE_IS_SENDER (source);
+ is_active = RTP_SOURCE_IS_ACTIVE (source);
+
/* our own rtcp interval may have been forced low by secondary configuration,
* while sender side may still operate with higher interval,
* so do not just take our interval to decide on timing out sender,
* interval = CLAMP (sender_interval, data->interval, 5 * GST_SECOND)
* where sender_interval is difference between last 2 received RTCP reports
*/
- if (data->interval >= 5 * GST_SECOND || (source == sess->source)) {
+ if (data->interval >= 5 * GST_SECOND || source->internal) {
binterval = data->interval;
} else {
GST_LOG ("prev_rtcp %" GST_TIME_FORMAT ", last_rtcp %" GST_TIME_FORMAT,
GST_LOG ("timeout base interval %" GST_TIME_FORMAT,
GST_TIME_ARGS (binterval));
- /* check for our own source, we don't want to delete our own source. */
- if (!(source == sess->source)) {
- if (source->marked_bye) {
- /* if we received a BYE from the source, remove the source after some
- * time. */
- if (data->current_time > source->bye_time &&
- data->current_time - source->bye_time > sess->stats.bye_timeout) {
- GST_DEBUG ("removing BYE source %08x", source->ssrc);
- remove = TRUE;
- byetimeout = TRUE;
- }
+ if (!source->internal && source->marked_bye) {
+ /* if we received a BYE from the source, remove the source after some
+ * time. */
+ if (data->current_time > source->bye_time &&
+ data->current_time - source->bye_time > sess->stats.bye_timeout) {
+ GST_DEBUG ("removing BYE source %08x", source->ssrc);
+ remove = TRUE;
+ byetimeout = TRUE;
}
- /* sources that were inactive for more than 5 times the deterministic reporting
- * interval get timed out. the min timeout is 5 seconds. */
- /* mind old time that might pre-date last time going to PLAYING */
- btime = MAX (source->last_activity, sess->start_time);
- if (data->current_time > btime) {
- interval = MAX (binterval * 5, 5 * GST_SECOND);
- if (data->current_time - btime > interval) {
- GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
- source->ssrc, GST_TIME_ARGS (btime));
+ }
+
+ if (source->internal && source->sent_bye) {
+ GST_DEBUG ("removing internal source that has sent BYE %08x", source->ssrc);
+ remove = TRUE;
+ }
+
+ /* sources that were inactive for more than 5 times the deterministic reporting
+ * interval get timed out. the min timeout is 5 seconds. */
+ /* mind old time that might pre-date last time going to PLAYING */
+ btime = MAX (source->last_activity, sess->start_time);
+ if (data->current_time > btime) {
+ interval = MAX (binterval * 5, 5 * GST_SECOND);
+ if (data->current_time - btime > interval) {
+ GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
+ source->ssrc, GST_TIME_ARGS (btime));
+ if (source->internal) {
+ /* this is an internal source that is not using our suggested ssrc.
+ * since there must be another source using this ssrc, we can remove
+ * this one instead of making it a receiver forever */
+ if (source->ssrc != sess->suggested_ssrc) {
+ rtp_source_mark_bye (source, "timed out");
+ /* do not schedule bye here, since we are inside the RTCP timeout
+ * processing and scheduling bye will interfere with SR/RR sending */
+ }
+ } else {
remove = TRUE;
}
}
}
/* senders that did not send for a long time become a receiver, this also
- * holds for our own source. */
+ * holds for our own sources. */
if (is_sender) {
/* mind old time that might pre-date last time going to PLAYING */
btime = MAX (source->last_rtp_activity, sess->start_time);
if (data->current_time - btime > interval) {
GST_DEBUG ("sender source %08x timed out and became receiver, last %"
GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
- source->is_sender = FALSE;
- sess->stats.sender_sources--;
sendertimeout = TRUE;
}
}
if (remove) {
sess->total_sources--;
- if (is_sender)
+ if (is_sender) {
sess->stats.sender_sources--;
+ if (source->internal)
+ sess->stats.internal_sender_sources--;
+ }
if (is_active)
sess->stats.active_sources--;
+ if (source->internal)
+ sess->stats.internal_sources--;
+
if (byetimeout)
on_bye_timeout (sess, source);
else
on_timeout (sess, source);
} else {
- if (sendertimeout)
+ if (sendertimeout) {
+ source->is_sender = FALSE;
+ sess->stats.sender_sources--;
+ if (source->internal)
+ sess->stats.internal_sender_sources--;
+
on_sender_timeout (sess, source);
+ }
+ /* count how many source to report in this generation */
+ if (((gint16) (source->generation - sess->generation)) <= 0)
+ data->num_to_report++;
}
-
source->closing = remove;
}
/* add SDES packet */
gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_SDES, packet);
- gst_rtcp_packet_sdes_add_item (packet, sess->source->ssrc);
+ gst_rtcp_packet_sdes_add_item (packet, data->source->ssrc);
- sdes = rtp_source_get_sdes_struct (sess->source);
+ sdes = rtp_source_get_sdes_struct (data->source);
/* add all fields in the structure, the order is not important. */
n_fields = gst_structure_n_fields (sdes);
/* schedule a BYE packet */
static void
-session_bye (RTPSession * sess, ReportData * data)
+make_source_bye (RTPSession * sess, RTPSource * source, ReportData * data)
{
GstRTCPPacket *packet = &data->packet;
GstRTCPBuffer *rtcp = &data->rtcpbuf;
- RTPSource *source = sess->source;
-
- /* open packet */
- session_start_rtcp (sess, data);
/* add SDES */
session_sdes (sess, data);
-
/* add a BYE packet */
gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_BYE, packet);
gst_rtcp_packet_bye_add_ssrc (packet, source->ssrc);
gst_rtcp_packet_bye_set_reason (packet, source->bye_reason);
/* we have a BYE packet now */
- data->is_bye = TRUE;
+ source->sent_bye = TRUE;
}
static gboolean
is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
{
- GstClockTime new_send_time, elapsed;
+ GstClockTime new_send_time;
+ GstClockTime interval;
+ RTPSessionStats *stats;
+
+ if (sess->scheduled_bye)
+ stats = &sess->bye_stats;
+ else
+ stats = &sess->stats;
+
+ if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
+ data->is_early = TRUE;
+ else
+ data->is_early = FALSE;
- if (data->is_early && sess->next_early_rtcp_time < current_time)
+ if (data->is_early && sess->next_early_rtcp_time < current_time) {
+ GST_DEBUG ("early feedback %" GST_TIME_FORMAT " < now %"
+ GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_early_rtcp_time),
+ GST_TIME_ARGS (current_time));
goto early;
+ }
/* no need to check yet */
if (sess->next_rtcp_check_time == GST_CLOCK_TIME_NONE ||
return FALSE;
}
- /* get elapsed time since we last reported */
- elapsed = current_time - sess->last_rtcp_send_time;
-
- new_send_time = data->interval;
- /* perform forward reconsideration */
- if (new_send_time != GST_CLOCK_TIME_NONE) {
- new_send_time = rtp_stats_add_rtcp_jitter (&sess->stats, new_send_time);
-
- GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
- GST_TIME_FORMAT, GST_TIME_ARGS (new_send_time),
- GST_TIME_ARGS (elapsed));
-
- new_send_time += sess->last_rtcp_send_time;
- }
-
- /* check if reconsideration */
- if (new_send_time == GST_CLOCK_TIME_NONE || current_time < new_send_time) {
- GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
- GST_TIME_ARGS (new_send_time));
- /* store new check time */
- sess->next_rtcp_check_time = new_send_time;
- return FALSE;
- }
-
early:
- new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE);
+ /* take interval and add jitter */
+ interval = data->interval;
+ if (interval != GST_CLOCK_TIME_NONE)
+ interval = rtp_stats_add_rtcp_jitter (stats, interval);
- GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
- GST_TIME_ARGS (new_send_time));
+ if (sess->last_rtcp_send_time != GST_CLOCK_TIME_NONE) {
+ /* perform forward reconsideration */
+ if (interval != GST_CLOCK_TIME_NONE) {
+ GstClockTime elapsed;
- sess->next_rtcp_check_time = new_send_time;
- if (new_send_time != GST_CLOCK_TIME_NONE) {
- sess->next_rtcp_check_time += current_time;
+ /* get elapsed time since we last reported */
+ elapsed = current_time - sess->last_rtcp_send_time;
+ GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
+ GST_TIME_FORMAT, GST_TIME_ARGS (interval), GST_TIME_ARGS (elapsed));
+ new_send_time = interval + sess->last_rtcp_send_time;
+ } else {
+ new_send_time = sess->last_rtcp_send_time;
+ }
+ } else {
+ /* If this is the first RTCP packet, we can reconsider anything based
+ * on the last RTCP send time because there was none.
+ */
+ g_warn_if_fail (!data->is_early);
+ data->is_early = FALSE;
+ new_send_time = current_time;
+ }
+
+ if (!data->is_early) {
+ /* check if reconsideration */
+ if (new_send_time == GST_CLOCK_TIME_NONE || current_time < new_send_time) {
+ GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (new_send_time));
+ /* store new check time */
+ sess->next_rtcp_check_time = new_send_time;
+ return FALSE;
+ }
+ sess->next_rtcp_check_time = current_time + interval;
+ } else if (interval != GST_CLOCK_TIME_NONE) {
/* Apply the rules from RFC 4585 section 3.5.3 */
- if (sess->stats.min_interval != 0 && !sess->first_rtcp) {
- GstClockTimeDiff T_rr_current_interval =
- g_random_double_range (0.5, 1.5) * sess->stats.min_interval;
+ if (stats->min_interval != 0 && !sess->first_rtcp) {
+ GstClockTime T_rr_current_interval =
+ g_random_double_range (0.5, 1.5) * stats->min_interval;
/* This will caused the RTCP to be suppressed if no FB packets are added */
- if (sess->last_rtcp_send_time + T_rr_current_interval >
- sess->next_rtcp_check_time) {
+ if (sess->last_rtcp_send_time + T_rr_current_interval > new_send_time) {
GST_DEBUG ("RTCP packet could be suppressed min: %" GST_TIME_FORMAT
" last: %" GST_TIME_FORMAT
" + T_rr_current_interval: %" GST_TIME_FORMAT
- " > sess->next_rtcp_check_time: %" GST_TIME_FORMAT,
- GST_TIME_ARGS (sess->stats.min_interval),
+ " > new_send_time: %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (stats->min_interval),
GST_TIME_ARGS (sess->last_rtcp_send_time),
GST_TIME_ARGS (T_rr_current_interval),
- GST_TIME_ARGS (sess->next_rtcp_check_time));
+ GST_TIME_ARGS (new_send_time));
data->may_suppress = TRUE;
}
}
}
+ GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (new_send_time));
+
return TRUE;
}
}
static gboolean
-remove_closing_sources (const gchar * key, RTPSource * source, gpointer * data)
+remove_closing_sources (const gchar * key, RTPSource * source,
+ ReportData * data)
+{
+ if (source->closing)
+ return TRUE;
+
+ if (source->send_fir)
+ data->have_fir = TRUE;
+ if (source->send_pli)
+ data->have_pli = TRUE;
+ if (source->send_nack)
+ data->have_nack = TRUE;
+
+ return FALSE;
+}
+
+static void
+generate_rtcp (const gchar * key, RTPSource * source, ReportData * data)
+{
+ RTPSession *sess = data->sess;
+ gboolean is_bye = FALSE;
+ ReportOutput *output;
+
+ /* only generate RTCP for active internal sources */
+ if (!source->internal || source->sent_bye)
+ return;
+
+ /* ignore other sources when we do the timeout after a scheduled BYE */
+ if (sess->scheduled_bye && !source->marked_bye)
+ return;
+
+ data->source = source;
+
+ /* open packet */
+ session_start_rtcp (sess, data);
+
+ if (source->marked_bye) {
+ /* send BYE */
+ make_source_bye (sess, source, data);
+ is_bye = TRUE;
+ } else if (!data->is_early) {
+ /* loop over all known sources and add report blocks. If we are early, we
+ * just make a minimal RTCP packet and skip this step */
+ g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+ (GHFunc) session_report_blocks, data);
+ }
+ if (!data->has_sdes)
+ session_sdes (sess, data);
+
+ if (data->have_fir)
+ session_fir (sess, data);
+
+ if (data->have_pli)
+ g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+ (GHFunc) session_pli, data);
+
+ if (data->have_nack)
+ g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+ (GHFunc) session_nack, data);
+
+ gst_rtcp_buffer_unmap (&data->rtcpbuf);
+
+ output = g_slice_new (ReportOutput);
+ output->source = g_object_ref (source);
+ output->is_bye = is_bye;
+ output->buffer = data->rtcp;
+ /* queue the RTCP packet to push later */
+ g_queue_push_tail (&data->output, output);
+}
+
+static void
+update_generation (const gchar * key, RTPSource * source, ReportData * data)
{
- return source->closing;
+ RTPSession *sess = data->sess;
+
+ if (g_hash_table_size (source->reported_in_sr_of) >=
+ sess->stats.internal_sources) {
+ /* source is reported, move to next generation */
+ source->generation = sess->generation + 1;
+ g_hash_table_remove_all (source->reported_in_sr_of);
+
+ GST_LOG ("reported source %x, new generation: %d", source->ssrc,
+ source->generation);
+
+ /* if we reported all sources in this generation, move to next */
+ if (--data->num_to_report == 0) {
+ sess->generation++;
+ GST_DEBUG ("all reported, generation now %u", sess->generation);
+ }
+ }
}
/**
{
GstFlowReturn result = GST_FLOW_OK;
ReportData data = { GST_RTCP_BUFFER_INIT };
- RTPSource *own;
GHashTable *table_copy;
- gboolean notify = FALSE;
+ ReportOutput *output;
g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
GST_TIME_ARGS (ntpnstime), GST_TIME_ARGS (running_time));
data.sess = sess;
- data.rtcp = NULL;
data.current_time = current_time;
data.ntpnstime = ntpnstime;
- data.is_bye = FALSE;
- data.has_sdes = FALSE;
- data.may_suppress = FALSE;
data.running_time = running_time;
-
- own = sess->source;
+ data.num_to_report = 0;
+ data.may_suppress = FALSE;
+ data.nacked_seqnums = 0;
+ g_queue_init (&data.output);
RTP_SESSION_LOCK (sess);
/* get a new interval, we need this for various cleanups etc */
data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
+ GST_DEBUG ("interval %" GST_TIME_FORMAT, GST_TIME_ARGS (data.interval));
+
+ /* we need an internal source now */
+ if (sess->stats.internal_sources == 0) {
+ RTPSource *source;
+ gboolean created;
+
+ source = obtain_internal_source (sess, sess->suggested_ssrc, &created,
+ current_time);
+ g_object_unref (source);
+ }
+
+ sess->conflicting_addresses =
+ timeout_conflicting_addresses (sess->conflicting_addresses, current_time);
+
/* Make a local copy of the hashtable. We need to do this because the
* cleanup stage below releases the session lock. */
table_copy = g_hash_table_new_full (NULL, NULL, NULL,
/* Now remove the marked sources */
g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
- (GHRFunc) remove_closing_sources, NULL);
+ (GHRFunc) remove_closing_sources, &data);
- if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
- data.is_early = TRUE;
- else
- data.is_early = FALSE;
+ /* update point-to-point status */
+ session_update_ptp (sess);
/* see if we need to generate SR or RR packets */
- if (is_rtcp_time (sess, current_time, &data)) {
- if (own->marked_bye) {
- /* generate BYE instead */
- GST_DEBUG ("generating BYE message");
- session_bye (sess, &data);
- own->sent_bye = TRUE;
- } else {
- /* loop over all known sources and do something */
- g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
- (GHFunc) session_report_blocks, &data);
- }
- }
-
- if (data.rtcp) {
- /* we keep track of the last report time in order to timeout inactive
- * receivers or senders */
- if (!data.is_early && !data.may_suppress)
- sess->last_rtcp_send_time = data.current_time;
- sess->first_rtcp = FALSE;
- sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
-
- /* add SDES for this source when not already added */
- if (!data.has_sdes)
- session_sdes (sess, &data);
- }
+ if (!is_rtcp_time (sess, current_time, &data))
+ goto done;
- /* check for outdated collisions */
- GST_DEBUG ("Timing out collisions");
- rtp_source_timeout (sess->source, current_time,
- /* "a relatively long time" -- RFC 3550 section 8.2 */
- RTP_STATS_MIN_INTERVAL * GST_SECOND * 10,
- running_time - sess->rtcp_feedback_retention_window);
+ GST_DEBUG ("doing RTCP generation %u for %u sources, early %d",
+ sess->generation, data.num_to_report, data.is_early);
- if (sess->change_ssrc) {
- GST_DEBUG ("need to change our SSRC (%08x)", own->ssrc);
- g_hash_table_steal (sess->ssrcs[sess->mask_idx],
- GINT_TO_POINTER (own->ssrc));
+ /* generate RTCP for all internal sources */
+ g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+ (GHFunc) generate_rtcp, &data);
- own->ssrc = rtp_session_create_new_ssrc (sess);
- rtp_source_reset (own);
+ /* update the generation for all the sources that have been reported */
+ g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+ (GHFunc) update_generation, &data);
- g_hash_table_insert (sess->ssrcs[sess->mask_idx],
- GINT_TO_POINTER (own->ssrc), own);
+ /* we keep track of the last report time in order to timeout inactive
+ * receivers or senders */
+ if (!data.is_early && !data.may_suppress)
+ sess->last_rtcp_send_time = data.current_time;
+ sess->first_rtcp = FALSE;
+ sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
+ sess->scheduled_bye = FALSE;
- sess->change_ssrc = FALSE;
- notify = TRUE;
- GST_DEBUG ("changed our SSRC to %08x", own->ssrc);
+ /* RFC 4585 section 3.5.2 step 6 */
+ if (!data.is_early) {
+ sess->allow_early = TRUE;
}
- sess->allow_early = TRUE;
-
+done:
RTP_SESSION_UNLOCK (sess);
- if (notify)
- g_object_notify (G_OBJECT (sess), "internal-ssrc");
-
- /* push out the RTCP packet */
- if (data.rtcp) {
+ /* push out the RTCP packets */
+ while ((output = g_queue_pop_head (&data.output))) {
gboolean do_not_suppress;
-
- gst_rtcp_buffer_unmap (&data.rtcpbuf);
+ GstBuffer *buffer = output->buffer;
+ RTPSource *source = output->source;
/* Give the user a change to add its own packet */
g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_RTCP], 0,
- data.rtcp, data.is_early, &do_not_suppress);
+ buffer, data.is_early, &do_not_suppress);
if (sess->callbacks.send_rtcp && (do_not_suppress || !data.may_suppress)) {
guint packet_size;
- packet_size = gst_buffer_get_size (data.rtcp) + sess->header_len;
+ packet_size = gst_buffer_get_size (buffer) + sess->header_len;
UPDATE_AVG (sess->stats.avg_rtcp_packet_size, packet_size);
GST_DEBUG ("%p, sending RTCP packet, avg size %u, %u", &sess->stats,
sess->stats.avg_rtcp_packet_size, packet_size);
result =
- sess->callbacks.send_rtcp (sess, own, data.rtcp, own->sent_bye,
+ sess->callbacks.send_rtcp (sess, source, buffer, output->is_bye,
sess->send_rtcp_user_data);
+ sess->stats.nacks_sent += data.nacked_seqnums;
} else {
GST_DEBUG ("freeing packet callback: %p"
" do_not_suppress: %d may_suppress: %d",
sess->callbacks.send_rtcp, do_not_suppress, data.may_suppress);
- gst_buffer_unref (data.rtcp);
+ sess->stats.nacks_dropped += data.nacked_seqnums;
+ gst_buffer_unref (buffer);
}
+ g_object_unref (source);
+ g_slice_free (ReportOutput, output);
}
-
return result;
}
-void
+/**
+ * rtp_session_request_early_rtcp:
+ * @sess: an #RTPSession
+ * @current_time: the current system time
+ * @max_delay: maximum delay
+ *
+ * Request transmission of early RTCP
+ *
+ * Returns: %TRUE if the related RTCP can be scheduled.
+ */
+gboolean
rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
- GstClockTimeDiff max_delay)
+ GstClockTime max_delay)
{
- GstClockTime T_dither_max;
+ GstClockTime T_dither_max, T_rr;
+ gboolean ret;
/* Implements the algorithm described in RFC 4585 section 3.5.2 */
/* Check if already requested */
/* RFC 4585 section 3.5.2 step 2 */
- if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
- goto dont_send;
+ if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
+ GST_LOG_OBJECT (sess, "already have next early rtcp time");
+ ret = TRUE;
+ goto end;
+ }
- if (!GST_CLOCK_TIME_IS_VALID (sess->next_rtcp_check_time))
- goto dont_send;
+ if (!GST_CLOCK_TIME_IS_VALID (sess->next_rtcp_check_time)) {
+ GST_LOG_OBJECT (sess, "no next RTCP check time");
+ ret = FALSE;
+ goto end;
+ }
+
+ /* RFC 4585 section 3.5.3 step 1
+ * If no regular RTCP packet has been sent before, then a regular
+ * RTCP packet has to be scheduled first and FB messages might be
+ * included there
+ */
+ if (!GST_CLOCK_TIME_IS_VALID (sess->last_rtcp_send_time)) {
+ GST_LOG_OBJECT (sess, "no RTCP sent yet");
+
+ if (current_time + max_delay > sess->next_rtcp_check_time) {
+ GST_LOG_OBJECT (sess,
+ "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
+ " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
+ GST_TIME_ARGS (max_delay),
+ GST_TIME_ARGS (sess->next_rtcp_check_time));
+ ret = TRUE;
+ } else {
+ GST_LOG_OBJECT (sess,
+ "can't allow early feedback, next scheduled time is too late %"
+ GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay),
+ GST_TIME_ARGS (sess->next_rtcp_check_time));
+ ret = FALSE;
+ }
+ goto end;
+ }
- /* Ignore the request a scheduled packet will be in time anyway */
- if (current_time + max_delay > sess->next_rtcp_check_time)
- goto dont_send;
+ T_rr = sess->next_rtcp_check_time - sess->last_rtcp_send_time;
/* RFC 4585 section 3.5.2 step 2b */
/* If the total sources is <=2, then there is only us and one peer */
- if (sess->total_sources <= 2) {
+ /* When there is one auxiliary stream the session can still do point
+ * to point.
+ */
+ if (sess->is_doing_ptp) {
T_dither_max = 0;
} else {
/* Divide by 2 because l = 0.5 */
- T_dither_max = sess->next_rtcp_check_time - sess->last_rtcp_send_time;
+ T_dither_max = T_rr;
T_dither_max /= 2;
}
/* RFC 4585 section 3.5.2 step 3 */
- if (current_time + T_dither_max > sess->next_rtcp_check_time)
- goto dont_send;
-
- /* RFC 4585 section 3.5.2 step 4
- * Don't send if allow_early is FALSE, but not if we are in
- * immediate mode, meaning we are part of a group of at most the
- * application-specific threshold.
- */
- if (sess->total_sources > sess->rtcp_immediate_feedback_threshold &&
- sess->allow_early == FALSE)
- goto dont_send;
+ if (current_time + T_dither_max > sess->next_rtcp_check_time) {
+ GST_LOG_OBJECT (sess,
+ "don't send because of dither, next scheduled time is soon %"
+ GST_TIME_FORMAT " + %" GST_TIME_FORMAT " > %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (current_time), GST_TIME_ARGS (T_dither_max),
+ GST_TIME_ARGS (sess->next_rtcp_check_time));
+ ret = TRUE;
+ goto end;
+ }
+
+ /* RFC 4585 section 3.5.2 step 4a */
+ if (sess->allow_early == FALSE) {
+ /* Ignore the request a scheduled packet will be in time anyway */
+ if (current_time + max_delay > sess->next_rtcp_check_time) {
+ GST_LOG_OBJECT (sess,
+ "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
+ " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
+ GST_TIME_ARGS (max_delay),
+ GST_TIME_ARGS (sess->next_rtcp_check_time));
+ ret = TRUE;
+ } else {
+ GST_LOG_OBJECT (sess,
+ "can't allow early feedback, next scheduled time is too late %"
+ GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay),
+ GST_TIME_ARGS (sess->next_rtcp_check_time));
+ ret = FALSE;
+ }
+ goto end;
+ }
+ /* RFC 4585 section 3.5.2 step 4b */
if (T_dither_max) {
/* Schedule an early transmission later */
sess->next_early_rtcp_time = g_random_double () * T_dither_max +
sess->next_early_rtcp_time = current_time;
}
+ /* RFC 4585 section 3.5.2 step 6 */
+ sess->allow_early = FALSE;
+ /* Delay next regular RTCP packet to not exceed the short-term
+ * RTCP bandwidth when using early feedback as compared to
+ * without */
+ sess->next_rtcp_check_time = sess->last_rtcp_send_time + 2 * T_rr;
+ sess->last_rtcp_send_time += T_rr;
+
+ GST_LOG_OBJECT (sess, "next early RTCP time %" GST_TIME_FORMAT
+ ", next regular RTCP time %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (sess->next_early_rtcp_time),
+ GST_TIME_ARGS (sess->next_rtcp_check_time));
RTP_SESSION_UNLOCK (sess);
/* notify app of need to send packet early
if (sess->callbacks.reconsider)
sess->callbacks.reconsider (sess, sess->reconsider_user_data);
- return;
+ return TRUE;
-dont_send:
+end:
RTP_SESSION_UNLOCK (sess);
+
+ return ret;
+}
+
+static gboolean
+rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay)
+{
+ GstClockTime now;
+
+ if (!sess->callbacks.send_rtcp)
+ return FALSE;
+
+ now = sess->callbacks.request_time (sess, sess->request_time_user_data);
+
+ return rtp_session_request_early_rtcp (sess, now, max_delay);
}
gboolean
-rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc, GstClockTime now,
+rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc,
gboolean fir, gint count)
{
- RTPSource *src = find_source (sess, ssrc);
+ RTPSource *src;
- if (!src)
+ if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) {
+ GST_DEBUG ("FIR/PLI not sent");
return FALSE;
+ }
+
+ RTP_SESSION_LOCK (sess);
+ src = find_source (sess, ssrc);
+ if (src == NULL)
+ goto no_source;
if (fir) {
src->send_pli = FALSE;
} else if (!src->send_fir) {
src->send_pli = TRUE;
}
-
- rtp_session_request_early_rtcp (sess, now, 200 * GST_MSECOND);
+ RTP_SESSION_UNLOCK (sess);
return TRUE;
-}
-
-static gboolean
-has_pli_compare_func (gconstpointer a, gconstpointer ignored)
-{
- GstRTCPPacket packet;
- GstRTCPBuffer rtcp = { NULL, };
- gboolean ret = FALSE;
-
- gst_rtcp_buffer_map ((GstBuffer *) a, GST_MAP_READ, &rtcp);
- if (gst_rtcp_buffer_get_first_packet (&rtcp, &packet)) {
- if (gst_rtcp_packet_get_type (&packet) == GST_RTCP_TYPE_PSFB &&
- gst_rtcp_packet_fb_get_type (&packet) == GST_RTCP_PSFB_TYPE_PLI)
- ret = TRUE;
+ /* ERRORS */
+no_source:
+ {
+ RTP_SESSION_UNLOCK (sess);
+ return FALSE;
}
-
- gst_rtcp_buffer_unmap (&rtcp);
-
- return ret;
}
-static gboolean
-rtp_session_on_sending_rtcp (RTPSession * sess, GstBuffer * buffer,
- gboolean early)
+/**
+ * rtp_session_request_nack:
+ * @sess: a #RTPSession
+ * @ssrc: the SSRC
+ * @seqnum: the missing seqnum
+ * @max_delay: max delay to request NACK
+ *
+ * Request scheduling of a NACK feedback packet for @seqnum in @ssrc.
+ *
+ * Returns: %TRUE if the NACK feedback could be scheduled
+ */
+gboolean
+rtp_session_request_nack (RTPSession * sess, guint32 ssrc, guint16 seqnum,
+ GstClockTime max_delay)
{
- gboolean ret = FALSE;
- GHashTableIter iter;
- gpointer key, value;
- gboolean started_fir = FALSE;
- GstRTCPPacket fir_rtcppacket;
- GstRTCPBuffer rtcp = { NULL, };
-
- RTP_SESSION_LOCK (sess);
-
- gst_rtcp_buffer_map (buffer, GST_MAP_READWRITE, &rtcp);
-
- g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
- while (g_hash_table_iter_next (&iter, &key, &value)) {
- guint media_ssrc = GPOINTER_TO_UINT (key);
- RTPSource *media_src = value;
- guint8 *fci_data;
-
- if (media_src->send_fir) {
- if (!started_fir) {
- if (!gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_PSFB,
- &fir_rtcppacket))
- break;
- gst_rtcp_packet_fb_set_type (&fir_rtcppacket, GST_RTCP_PSFB_TYPE_FIR);
- gst_rtcp_packet_fb_set_sender_ssrc (&fir_rtcppacket,
- rtp_source_get_ssrc (sess->source));
- gst_rtcp_packet_fb_set_media_ssrc (&fir_rtcppacket, 0);
-
- if (!gst_rtcp_packet_fb_set_fci_length (&fir_rtcppacket, 2)) {
- gst_rtcp_packet_remove (&fir_rtcppacket);
- break;
- }
- ret = TRUE;
- started_fir = TRUE;
- } else {
- if (!gst_rtcp_packet_fb_set_fci_length (&fir_rtcppacket,
- !gst_rtcp_packet_fb_get_fci_length (&fir_rtcppacket) + 2))
- break;
- }
-
- fci_data = gst_rtcp_packet_fb_get_fci (&fir_rtcppacket) -
- ((gst_rtcp_packet_fb_get_fci_length (&fir_rtcppacket) - 2) * 4);
+ RTPSource *source;
- GST_WRITE_UINT32_BE (fci_data, media_ssrc);
- fci_data += 4;
- fci_data[0] = media_src->current_send_fir_seqnum;
- fci_data[1] = fci_data[2] = fci_data[3] = 0;
- media_src->send_fir = FALSE;
- }
+ if (!rtp_session_send_rtcp (sess, max_delay)) {
+ GST_DEBUG ("NACK not sent");
+ return FALSE;
}
- g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
- while (g_hash_table_iter_next (&iter, &key, &value)) {
- guint media_ssrc = GPOINTER_TO_UINT (key);
- RTPSource *media_src = value;
- GstRTCPPacket pli_rtcppacket;
-
- if (media_src->send_pli && !rtp_source_has_retained (media_src,
- has_pli_compare_func, NULL)) {
- if (!gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_PSFB,
- &pli_rtcppacket))
- /* Break because the packet is full, will put next request in a
- * further packet */
- break;
- gst_rtcp_packet_fb_set_type (&pli_rtcppacket, GST_RTCP_PSFB_TYPE_PLI);
- gst_rtcp_packet_fb_set_sender_ssrc (&pli_rtcppacket,
- rtp_source_get_ssrc (sess->source));
- gst_rtcp_packet_fb_set_media_ssrc (&pli_rtcppacket, media_ssrc);
- ret = TRUE;
- }
- media_src->send_pli = FALSE;
- }
- gst_rtcp_buffer_unmap (&rtcp);
+ RTP_SESSION_LOCK (sess);
+ source = find_source (sess, ssrc);
+ if (source == NULL)
+ goto no_source;
+ GST_DEBUG ("request NACK for %08x, #%u", ssrc, seqnum);
+ rtp_source_register_nack (source, seqnum);
RTP_SESSION_UNLOCK (sess);
- return ret;
-}
-
-static void
-rtp_session_send_rtcp (RTPSession * sess, GstClockTimeDiff max_delay)
-{
- GstClockTime now;
-
- if (!sess->callbacks.send_rtcp)
- return;
-
- now = sess->callbacks.request_time (sess, sess->request_time_user_data);
+ return TRUE;
- rtp_session_request_early_rtcp (sess, now, max_delay);
+ /* ERRORS */
+no_source:
+ {
+ RTP_SESSION_UNLOCK (sess);
+ return FALSE;
+ }
}