SIGNAL_ON_SENDING_RTCP,
SIGNAL_ON_FEEDBACK_RTCP,
SIGNAL_SEND_RTCP,
+ SIGNAL_SEND_RTCP_FULL,
+ SIGNAL_ON_RECEIVING_RTCP,
LAST_SIGNAL
};
#define DEFAULT_INTERNAL_SOURCE NULL
-#define DEFAULT_BANDWIDTH RTP_STATS_BANDWIDTH
-#define DEFAULT_RTCP_FRACTION (RTP_STATS_RTCP_FRACTION * RTP_STATS_BANDWIDTH)
+#define DEFAULT_BANDWIDTH 0.0
+#define DEFAULT_RTCP_FRACTION RTP_STATS_RTCP_FRACTION
#define DEFAULT_RTCP_RR_BANDWIDTH -1
#define DEFAULT_RTCP_RS_BANDWIDTH -1
#define DEFAULT_RTCP_MTU 1400
PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
PROP_PROBATION,
- PROP_STATS,
- PROP_LAST
+ PROP_STATS
};
/* update average packet size */
(avg) = ((val) + (15 * (avg))) >> 4;
-/* The number RTCP intervals after which to timeout entries in the
- * collision table
- */
-#define RTCP_INTERVAL_COLLISION_TIMEOUT 10
-
/* GObject vmethods */
static void rtp_session_finalize (GObject * object);
static void rtp_session_set_property (GObject * object, guint prop_id,
static void rtp_session_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
-static void rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay);
+static gboolean rtp_session_send_rtcp (RTPSession * sess,
+ GstClockTime max_delay);
static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc,
gboolean * created, RTPPacketInfo * pinfo, gboolean rtp);
static RTPSource *obtain_internal_source (RTPSession * sess,
- guint32 ssrc, gboolean * created);
+ guint32 ssrc, gboolean * created, GstClockTime current_time);
static GstFlowReturn rtp_session_schedule_bye_locked (RTPSession * sess,
GstClockTime current_time);
static GstClockTime calculate_rtcp_interval (RTPSession * sess,
G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_UINT64);
+ /**
+ * RTPSession::send-rtcp-full:
+ * @session: the object which received the signal
+ * @max_delay: The maximum delay after which the feedback will not be useful
+ * anymore
+ *
+ * Requests that the #RTPSession initiate a new RTCP packet as soon as
+ * possible within the requested delay.
+ *
+ * Returns: TRUE if the new RTCP packet could be scheduled within the
+ * requested delay, FALSE otherwise.
+ *
+ * Since: 1.6
+ */
+ rtp_session_signals[SIGNAL_SEND_RTCP_FULL] =
+ g_signal_new ("send-rtcp-full", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
+ G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
+ g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);
+
+ /**
+ * RTPSession::on-receiving-rtcp
+ * @session: the object which received the signal
+ * @buffer: the #GstBuffer containing the RTCP packet that was received
+ *
+ * This signal is emitted when receiving an RTCP packet before it is handled
+ * by the session. It can be used to extract custom information from RTCP packets.
+ *
+ * Since: 1.6
+ */
+ rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP] =
+ g_signal_new ("on-receiving-rtcp", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_receiving_rtcp),
+ NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1,
+ GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
+
g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
g_param_spec_uint ("internal-ssrc", "Internal SSRC",
"The internal SSRC used for the session (deprecated)",
g_param_spec_uint ("rtcp-immediate-feedback-threshold",
"RTCP Immediate Feedback threshold",
"The maximum number of members of a RTP session for which immediate"
- " feedback is used",
+ " feedback is used (DEPRECATED: has no effect and is not needed)",
0, G_MAXUINT, DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
g_object_class_install_property (gobject_class, PROP_PROBATION,
g_param_spec_uint ("probation", "Number of probations",
sess->mask_idx = 0;
sess->mask = 0;
- for (i = 0; i < 32; i++) {
+ /* TODO: We currently only use the first hash table but this is the
+ * beginning of an implementation for RFC2762
+ for (i = 0; i < 32; i++) {
+ */
+ for (i = 0; i < 1; i++) {
sess->ssrcs[i] =
g_hash_table_new_full (NULL, NULL, NULL,
(GDestroyNotify) g_object_unref);
sess->first_rtcp = TRUE;
sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
+ sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE;
sess->allow_early = TRUE;
sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD;
sess->last_keyframe_request = GST_CLOCK_TIME_NONE;
+
+ sess->is_doing_ptp = TRUE;
}
static void
gst_structure_free (sess->sdes);
- for (i = 0; i < 32; i++)
+ g_list_free_full (sess->conflicting_addresses,
+ (GDestroyNotify) rtp_conflicting_address_free);
+
+ /* TODO: Change this again when implementing RFC 2762
+ * for (i = 0; i < 32; i++)
+ */
+ for (i = 0; i < 1; i++)
g_hash_table_destroy (sess->ssrcs[i]);
g_mutex_clear (&sess->lock);
switch (prop_id) {
case PROP_INTERNAL_SSRC:
+ RTP_SESSION_LOCK (sess);
+ sess->suggested_ssrc = g_value_get_uint (value);
+ RTP_SESSION_UNLOCK (sess);
+ if (sess->callbacks.reconfigure)
+ sess->callbacks.reconfigure (sess, sess->reconfigure_user_data);
break;
case PROP_BANDWIDTH:
RTP_SESSION_LOCK (sess);
sess->callbacks.notify_nack = callbacks->notify_nack;
sess->notify_nack_user_data = user_data;
}
+ if (callbacks->reconfigure) {
+ sess->callbacks.reconfigure = callbacks->reconfigure;
+ sess->reconfigure_user_data = user_data;
+ }
}
/**
(RTPSourceClockRate) source_clock_rate,
};
+
+/**
+ * rtp_session_find_conflicting_address:
+ * @session: The session the packet came in
+ * @address: address to check for
+ * @time: The time when the packet that is possibly in conflict arrived
+ *
+ * Checks if an address which has a conflict is already known. If it is
+ * a known conflict, remember the time
+ *
+ * Returns: TRUE if it was a known conflict, FALSE otherwise
+ */
+static gboolean
+rtp_session_find_conflicting_address (RTPSession * session,
+ GSocketAddress * address, GstClockTime time)
+{
+ return find_conflicting_address (session->conflicting_addresses, address,
+ time);
+}
+
+/**
+ * rtp_session_add_conflicting_address:
+ * @session: The session the packet came in
+ * @address: address to remember
+ * @time: The time when the packet that is in conflict arrived
+ *
+ * Adds a new conflict address
+ */
+static void
+rtp_session_add_conflicting_address (RTPSession * sess,
+ GSocketAddress * address, GstClockTime time)
+{
+ sess->conflicting_addresses =
+ add_conflicting_address (sess->conflicting_addresses, address, time);
+}
+
+
static gboolean
check_collision (RTPSession * sess, RTPSource * source,
RTPPacketInfo * pinfo, gboolean rtp)
*/
} else {
/* This is sending with our ssrc, is it an address we already know */
- if (rtp_source_find_conflicting_address (source, pinfo->address,
+ if (rtp_session_find_conflicting_address (sess, pinfo->address,
pinfo->current_time)) {
/* Its a known conflict, its probably a loop, not a collision
* lets just drop the incoming packet
GST_DEBUG ("Our packets are being looped back to us, dropping");
} else {
/* Its a new collision, lets change our SSRC */
- rtp_source_add_conflicting_address (source, pinfo->address,
+ rtp_session_add_conflicting_address (sess, pinfo->address,
pinfo->current_time);
GST_DEBUG ("Collision for SSRC %x", ssrc);
return TRUE;
}
-static RTPSource *
-find_source (RTPSession * sess, guint32 ssrc)
+typedef struct
{
- return g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
- GINT_TO_POINTER (ssrc));
+ gboolean is_doing_ptp;
+ GSocketAddress *new_addr;
+} CompareAddrData;
+
+/* check if the two given ip addr are the same (do not care about the port) */
+static gboolean
+ip_addr_equal (GSocketAddress * a, GSocketAddress * b)
+{
+ return
+ g_inet_address_equal (g_inet_socket_address_get_address
+ (G_INET_SOCKET_ADDRESS (a)),
+ g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (b)));
+}
+
+static void
+compare_rtp_source_addr (const gchar * key, RTPSource * source,
+ CompareAddrData * data)
+{
+ /* only compare ip addr of remote sources which are also not closing */
+ if (!source->internal && !source->closing && source->rtp_from) {
+ /* look for the first rtp source */
+ if (!data->new_addr)
+ data->new_addr = source->rtp_from;
+ /* compare current ip addr with the first one */
+ else
+ data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtp_from);
+ }
+}
+
+static void
+compare_rtcp_source_addr (const gchar * key, RTPSource * source,
+ CompareAddrData * data)
+{
+ /* only compare ip addr of remote sources which are also not closing */
+ if (!source->internal && !source->closing && source->rtcp_from) {
+ /* look for the first rtcp source */
+ if (!data->new_addr)
+ data->new_addr = source->rtcp_from;
+ else
+ /* compare current ip addr with the first one */
+ data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtcp_from);
+ }
+}
+
+/* loop over our non-internal source to know if the session
+ * is doing point-to-point */
+static void
+session_update_ptp (RTPSession * sess)
+{
+ /* to know if the session is doing point to point, the ip addr
+ * of each non-internal (=remotes) source have to be compared
+ * to each other.
+ */
+ gboolean is_doing_rtp_ptp;
+ gboolean is_doing_rtcp_ptp;
+ CompareAddrData data;
+
+ /* compare the first remote source's ip addr that receive rtp packets
+ * with other remote rtp source.
+ * it's enough because the session just needs to know if they are all
+ * equals or not
+ */
+ data.is_doing_ptp = TRUE;
+ data.new_addr = NULL;
+ g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+ (GHFunc) compare_rtp_source_addr, (gpointer) & data);
+ is_doing_rtp_ptp = data.is_doing_ptp;
+
+ /* same but about rtcp */
+ data.is_doing_ptp = TRUE;
+ data.new_addr = NULL;
+ g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+ (GHFunc) compare_rtcp_source_addr, (gpointer) & data);
+ is_doing_rtcp_ptp = data.is_doing_ptp;
+
+ /* the session is doing point-to-point if all rtp remote have the same
+ * ip addr and if all rtcp remote sources have the same ip addr */
+ sess->is_doing_ptp = is_doing_rtp_ptp && is_doing_rtcp_ptp;
+
+ GST_DEBUG ("doing point-to-point: %d", sess->is_doing_ptp);
}
static void
if (sess->suggested_ssrc != src->ssrc)
sess->suggested_ssrc = src->ssrc;
}
+
+ /* update point-to-point status */
+ if (!src->internal)
+ session_update_ptp (sess);
+}
+
+static RTPSource *
+find_source (RTPSession * sess, guint32 ssrc)
+{
+ return g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
+ GINT_TO_POINTER (ssrc));
}
/* must be called with the session lock, the returned source needs to be
/* must be called with the session lock, the returned source needs to be
* unreffed after usage. */
static RTPSource *
-obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created)
+obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created,
+ GstClockTime current_time)
{
RTPSource *source;
} else {
*created = FALSE;
}
+ /* update last activity */
+ if (current_time != GST_CLOCK_TIME_NONE) {
+ source->last_activity = current_time;
+ source->last_rtp_activity = current_time;
+ }
g_object_ref (source);
return source;
RTP_SESSION_LOCK (sess);
result = find_source (sess, ssrc);
- if (result)
+ if (result != NULL)
g_object_ref (result);
RTP_SESSION_UNLOCK (sess);
if (!source)
return;
+ /* skip non-bye packets for sources that are marked BYE */
+ if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
+ goto out;
+
/* don't try to do lip-sync for sources that sent a BYE */
if (RTP_SOURCE_IS_MARKED_BYE (source))
*do_sync = FALSE;
on_new_ssrc (sess, source);
rtp_session_process_rb (sess, source, packet, pinfo);
+
+out:
g_object_unref (source);
}
if (!source)
return;
+ /* skip non-bye packets for sources that are marked BYE */
+ if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
+ goto out;
+
if (created)
on_new_ssrc (sess, source);
rtp_session_process_rb (sess, source, packet, pinfo);
+
+out:
g_object_unref (source);
}
if (!source)
return;
+ /* skip non-bye packets for sources that are marked BYE */
+ if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source))
+ goto next;
+
sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
more_entries = gst_rtcp_packet_sdes_first_entry (packet);
if (changed)
on_ssrc_sdes (sess, source);
+ next:
g_object_unref (source);
more_items = gst_rtcp_packet_sdes_next_item (packet);
"RTT (%" GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")",
fir ? "FIR" : "PLI",
GST_TIME_ARGS (current_time - sess->last_keyframe_request),
- GST_TIME_ARGS (round_trip_in_ns));;
+ GST_TIME_ARGS (round_trip_in_ns));
return FALSE;
}
}
return;
src = find_source (sess, sender_ssrc);
- if (!src)
+ if (src == NULL)
return;
rtp_session_request_local_key_unit (sess, src, FALSE, current_time);
+
+ src->stats.recv_pli_count++;
}
static void
ssrc = GST_READ_UINT32_BE (data);
own = find_source (sess, ssrc);
+ if (own == NULL)
+ continue;
+
if (own->internal) {
our_request = TRUE;
break;
return;
rtp_session_request_local_key_unit (sess, src, TRUE, current_time);
+ src->stats.recv_fir_count++;
}
static void
guint32 media_ssrc, guint8 * fci_data, guint fci_length,
GstClockTime current_time)
{
+ sess->stats.nacks_received++;
+
if (!sess->callbacks.notify_nack)
return;
seqnum = GST_READ_UINT16_BE (fci_data);
blp = GST_READ_UINT16_BE (fci_data + 2);
- GST_DEBUG ("NACK #%u, blp %04x", seqnum, blp);
+ GST_DEBUG ("NACK #%u, blp %04x, SSRC 0x%08x", seqnum, blp, media_ssrc);
RTP_SESSION_UNLOCK (sess);
- sess->callbacks.notify_nack (sess, seqnum, blp,
+ sess->callbacks.notify_nack (sess, seqnum, blp, media_ssrc,
sess->notify_nack_user_data);
RTP_SESSION_LOCK (sess);
guint fci_length = 4 * gst_rtcp_packet_fb_get_fci_length (packet);
RTPSource *src;
+ src = find_source (sess, media_ssrc);
+
+ /* skip non-bye packets for sources that are marked BYE */
+ if (sess->scheduled_bye && src && RTP_SOURCE_IS_MARKED_BYE (src))
+ return;
+
GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of "
"length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length);
GST_BUFFER_TIMESTAMP (fci_buffer) = pinfo->running_time;
}
- sess->stats.nacks_received++;
-
RTP_SESSION_UNLOCK (sess);
g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0,
type, fbtype, sender_ssrc, media_ssrc, fci_buffer);
gst_buffer_unref (fci_buffer);
}
- src = find_source (sess, media_ssrc);
- if (!src)
- return;
-
- if (sess->rtcp_feedback_retention_window) {
+ if (src && sess->rtcp_feedback_retention_window) {
rtp_source_retain_rtcp_packet (src, packet, pinfo->running_time);
}
- if (src->internal ||
+ if ((src && src->internal) ||
/* PSFB FIR puts the media ssrc inside the FCI */
(type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR)) {
switch (type) {
GST_DEBUG ("received RTCP packet");
+ g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP], 0,
+ buffer);
+
RTP_SESSION_LOCK (sess);
/* update pinfo stats */
update_packet_info (sess, &pinfo, FALSE, FALSE, FALSE, buffer, current_time,
type = gst_rtcp_packet_get_type (&packet);
- /* when we are leaving the session, we should ignore all non-BYE messages */
- if (sess->scheduled_bye && type != GST_RTCP_TYPE_BYE) {
- GST_DEBUG ("ignoring non-BYE RTCP packet because we are leaving");
- goto next;
- }
-
switch (type) {
case GST_RTCP_TYPE_SR:
rtp_session_process_sr (sess, &packet, &pinfo, &do_sync);
GST_WARNING ("got unknown RTCP packet");
break;
}
- next:
more = gst_rtcp_packet_move_to_next (&packet);
}
/* if we are scheduling a BYE, we only want to count bye packets, else we
* count everything */
- if (sess->scheduled_bye) {
- if (is_bye) {
- sess->stats.bye_members++;
- UPDATE_AVG (sess->stats.avg_rtcp_packet_size, pinfo.bytes);
- }
- } else {
- /* keep track of average packet size */
- UPDATE_AVG (sess->stats.avg_rtcp_packet_size, pinfo.bytes);
+ if (sess->scheduled_bye && is_bye) {
+ sess->bye_stats.bye_members++;
+ UPDATE_AVG (sess->bye_stats.avg_rtcp_packet_size, pinfo.bytes);
}
+
+ /* keep track of average packet size */
+ UPDATE_AVG (sess->stats.avg_rtcp_packet_size, pinfo.bytes);
+
GST_DEBUG ("%p, received RTCP packet, avg size %u, %u", &sess->stats,
sess->stats.avg_rtcp_packet_size, pinfo.bytes);
RTP_SESSION_UNLOCK (sess);
gboolean created;
RTP_SESSION_LOCK (sess);
- source = obtain_internal_source (sess, ssrc, &created);
+ source = obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
if (source) {
rtp_source_update_caps (source, caps);
g_object_unref (source);
}
+
+ if (gst_structure_get_uint (s, "rtx-ssrc", &ssrc)) {
+ source =
+ obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
+ if (source) {
+ rtp_source_update_caps (source, caps);
+ g_object_unref (source);
+ }
+ }
RTP_SESSION_UNLOCK (sess);
}
}
current_time, running_time, -1))
goto invalid_packet;
- source = obtain_internal_source (sess, pinfo.ssrc, &created);
-
- /* update last activity */
- source->last_rtp_activity = current_time;
+ source = obtain_internal_source (sess, pinfo.ssrc, &created, current_time);
prevsender = RTP_SOURCE_IS_SENDER (source);
oldrate = source->bitrate;
gboolean first)
{
GstClockTime result;
+ RTPSessionStats *stats;
/* recalculate bandwidth when it changed */
if (sess->recalc_bandwidth) {
g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
(GHFunc) add_bitrates, &bandwidth);
- bandwidth /= 8.0;
}
- if (bandwidth < 8000)
+ if (bandwidth < RTP_STATS_BANDWIDTH)
bandwidth = RTP_STATS_BANDWIDTH;
rtp_stats_set_bandwidths (&sess->stats, bandwidth,
}
if (sess->scheduled_bye) {
- result = rtp_stats_calculate_bye_interval (&sess->stats);
+ stats = &sess->bye_stats;
+ result = rtp_stats_calculate_bye_interval (stats);
} else {
- result = rtp_stats_calculate_rtcp_interval (&sess->stats,
- sess->stats.internal_sender_sources > 0, first);
+ stats = &sess->stats;
+ result = rtp_stats_calculate_rtcp_interval (stats,
+ stats->internal_sender_sources > 0, first);
}
GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
GST_TIME_ARGS (result), first);
if (!deterministic && result != GST_CLOCK_TIME_NONE)
- result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
+ result = rtp_stats_add_rtcp_jitter (stats, result);
GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
/* we schedule BYE now */
sess->scheduled_bye = TRUE;
/* at least one member wants to send a BYE */
- INIT_AVG (sess->stats.avg_rtcp_packet_size, 100);
- sess->stats.bye_members = 1;
+ memcpy (&sess->bye_stats, &sess->stats, sizeof (RTPSessionStats));
+ INIT_AVG (sess->bye_stats.avg_rtcp_packet_size, 100);
+ sess->bye_stats.bye_members = 1;
sess->first_rtcp = TRUE;
sess->allow_early = TRUE;
GstFlowReturn
rtp_session_schedule_bye (RTPSession * sess, GstClockTime current_time)
{
- GstFlowReturn result = GST_FLOW_OK;
+ GstFlowReturn result;
g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
}
if (sess->scheduled_bye) {
- if (sess->stats.active_sources >= 50) {
+ if (sess->bye_stats.active_sources >= 50) {
GST_DEBUG ("reconsider BYE, more than 50 sources");
/* reconsider BYE if members >= 50 */
interval = calculate_rtcp_interval (sess, FALSE, TRUE);
return;
}
- /* only report about other sender */
- if (source == data->source)
- goto reported;
+ if (g_hash_table_contains (source->reported_in_sr_of,
+ GUINT_TO_POINTER (data->source->ssrc))) {
+ GST_DEBUG ("source %08x already reported in this generation", source->ssrc);
+ return;
+ }
if (gst_rtcp_packet_get_rb_count (packet) == GST_RTCP_MAX_RB_COUNT) {
GST_DEBUG ("max RB count reached");
return;
}
+ /* only report about other sender */
+ if (source == data->source)
+ goto reported;
+
if (!RTP_SOURCE_IS_SENDER (source)) {
GST_DEBUG ("source %08x not sender", source->ssrc);
goto reported;
exthighestseq, jitter, lsr, dlsr);
reported:
- /* source is reported, move to next generation */
- source->generation = sess->generation + 1;
-
- /* if we reported all sources in this generation, move to next */
- if (--data->num_to_report == 0) {
- sess->generation++;
- GST_DEBUG ("all reported, generation now %u", sess->generation);
- }
+ g_hash_table_add (source->reported_in_sr_of,
+ GUINT_TO_POINTER (data->source->ssrc));
}
/* construct FIR */
fci_data[1] = fci_data[2] = fci_data[3] = 0;
source->send_fir = FALSE;
+ source->stats.sent_fir_count++;
}
static void
source->send_pli = FALSE;
data->may_suppress = FALSE;
+
+ source->stats.sent_pli_count++;
}
/* construct NACK */
if (source->internal) {
GST_DEBUG ("Timing out collisions for %x", source->ssrc);
rtp_source_timeout (source, data->current_time,
- /* "a relatively long time" -- RFC 3550 section 8.2 */
- RTP_STATS_MIN_INTERVAL * GST_SECOND * 10,
data->running_time - sess->rtcp_feedback_retention_window);
}
GST_LOG ("timeout base interval %" GST_TIME_FORMAT,
GST_TIME_ARGS (binterval));
- if (!source->internal) {
- if (source->marked_bye) {
- /* if we received a BYE from the source, remove the source after some
- * time. */
- if (data->current_time > source->bye_time &&
- data->current_time - source->bye_time > sess->stats.bye_timeout) {
- GST_DEBUG ("removing BYE source %08x", source->ssrc);
- remove = TRUE;
- byetimeout = TRUE;
- }
+ if (!source->internal && source->marked_bye) {
+ /* if we received a BYE from the source, remove the source after some
+ * time. */
+ if (data->current_time > source->bye_time &&
+ data->current_time - source->bye_time > sess->stats.bye_timeout) {
+ GST_DEBUG ("removing BYE source %08x", source->ssrc);
+ remove = TRUE;
+ byetimeout = TRUE;
}
- /* sources that were inactive for more than 5 times the deterministic reporting
- * interval get timed out. the min timeout is 5 seconds. */
- /* mind old time that might pre-date last time going to PLAYING */
- btime = MAX (source->last_activity, sess->start_time);
- if (data->current_time > btime) {
- interval = MAX (binterval * 5, 5 * GST_SECOND);
- if (data->current_time - btime > interval) {
- GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
- source->ssrc, GST_TIME_ARGS (btime));
+ }
+
+ if (source->internal && source->sent_bye) {
+ GST_DEBUG ("removing internal source that has sent BYE %08x", source->ssrc);
+ remove = TRUE;
+ }
+
+ /* sources that were inactive for more than 5 times the deterministic reporting
+ * interval get timed out. the min timeout is 5 seconds. */
+ /* mind old time that might pre-date last time going to PLAYING */
+ btime = MAX (source->last_activity, sess->start_time);
+ if (data->current_time > btime) {
+ interval = MAX (binterval * 5, 5 * GST_SECOND);
+ if (data->current_time - btime > interval) {
+ GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
+ source->ssrc, GST_TIME_ARGS (btime));
+ if (source->internal) {
+ /* this is an internal source that is not using our suggested ssrc.
+ * since there must be another source using this ssrc, we can remove
+ * this one instead of making it a receiver forever */
+ if (source->ssrc != sess->suggested_ssrc) {
+ rtp_source_mark_bye (source, "timed out");
+ /* do not schedule bye here, since we are inside the RTCP timeout
+ * processing and scheduling bye will interfere with SR/RR sending */
+ }
+ } else {
remove = TRUE;
}
}
if (data->current_time > btime) {
interval = MAX (binterval * 2, 5 * GST_SECOND);
if (data->current_time - btime > interval) {
- if (source->internal && source->sent_bye) {
- /* an internal source is BYE and stopped sending RTP, remove */
- GST_DEBUG ("internal BYE source %08x timed out, last %"
- GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
- remove = TRUE;
- } else {
- GST_DEBUG ("sender source %08x timed out and became receiver, last %"
- GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
- sendertimeout = TRUE;
- }
+ GST_DEBUG ("sender source %08x timed out and became receiver, last %"
+ GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime));
+ sendertimeout = TRUE;
}
}
}
static gboolean
is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
{
- GstClockTime new_send_time, elapsed;
+ GstClockTime new_send_time;
GstClockTime interval;
+ RTPSessionStats *stats;
+
+ if (sess->scheduled_bye)
+ stats = &sess->bye_stats;
+ else
+ stats = &sess->stats;
if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
data->is_early = TRUE;
}
early:
- /* get elapsed time since we last reported */
- elapsed = current_time - sess->last_rtcp_send_time;
/* take interval and add jitter */
interval = data->interval;
if (interval != GST_CLOCK_TIME_NONE)
- interval = rtp_stats_add_rtcp_jitter (&sess->stats, interval);
+ interval = rtp_stats_add_rtcp_jitter (stats, interval);
+
+ if (sess->last_rtcp_send_time != GST_CLOCK_TIME_NONE) {
+ /* perform forward reconsideration */
+ if (interval != GST_CLOCK_TIME_NONE) {
+ GstClockTime elapsed;
- /* perform forward reconsideration */
- if (interval != GST_CLOCK_TIME_NONE) {
- GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
- GST_TIME_FORMAT, GST_TIME_ARGS (interval), GST_TIME_ARGS (elapsed));
- new_send_time = interval + sess->last_rtcp_send_time;
+ /* get elapsed time since we last reported */
+ elapsed = current_time - sess->last_rtcp_send_time;
+
+ GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
+ GST_TIME_FORMAT, GST_TIME_ARGS (interval), GST_TIME_ARGS (elapsed));
+ new_send_time = interval + sess->last_rtcp_send_time;
+ } else {
+ new_send_time = sess->last_rtcp_send_time;
+ }
} else {
- new_send_time = sess->last_rtcp_send_time;
+ /* If this is the first RTCP packet, we can reconsider anything based
+ * on the last RTCP send time because there was none.
+ */
+ g_warn_if_fail (!data->is_early);
+ data->is_early = FALSE;
+ new_send_time = current_time;
}
if (!data->is_early) {
sess->next_rtcp_check_time = current_time + interval;
} else if (interval != GST_CLOCK_TIME_NONE) {
/* Apply the rules from RFC 4585 section 3.5.3 */
- if (sess->stats.min_interval != 0 && !sess->first_rtcp) {
+ if (stats->min_interval != 0 && !sess->first_rtcp) {
GstClockTime T_rr_current_interval =
- g_random_double_range (0.5, 1.5) * sess->stats.min_interval;
+ g_random_double_range (0.5, 1.5) * stats->min_interval;
/* This will caused the RTCP to be suppressed if no FB packets are added */
if (sess->last_rtcp_send_time + T_rr_current_interval > new_send_time) {
" last: %" GST_TIME_FORMAT
" + T_rr_current_interval: %" GST_TIME_FORMAT
" > new_send_time: %" GST_TIME_FORMAT,
- GST_TIME_ARGS (sess->stats.min_interval),
+ GST_TIME_ARGS (stats->min_interval),
GST_TIME_ARGS (sess->last_rtcp_send_time),
GST_TIME_ARGS (T_rr_current_interval),
GST_TIME_ARGS (new_send_time));
if (!source->internal || source->sent_bye)
return;
+ /* ignore other sources when we do the timeout after a scheduled BYE */
+ if (sess->scheduled_bye && !source->marked_bye)
+ return;
+
data->source = source;
/* open packet */
g_queue_push_tail (&data->output, output);
}
+static void
+update_generation (const gchar * key, RTPSource * source, ReportData * data)
+{
+ RTPSession *sess = data->sess;
+
+ if (g_hash_table_size (source->reported_in_sr_of) >=
+ sess->stats.internal_sources) {
+ /* source is reported, move to next generation */
+ source->generation = sess->generation + 1;
+ g_hash_table_remove_all (source->reported_in_sr_of);
+
+ GST_LOG ("reported source %x, new generation: %d", source->ssrc,
+ source->generation);
+
+ /* if we reported all sources in this generation, move to next */
+ if (--data->num_to_report == 0) {
+ sess->generation++;
+ GST_DEBUG ("all reported, generation now %u", sess->generation);
+ }
+ }
+}
+
/**
* rtp_session_on_timeout:
* @sess: an #RTPSession
RTPSource *source;
gboolean created;
- source = obtain_internal_source (sess, sess->suggested_ssrc, &created);
+ source = obtain_internal_source (sess, sess->suggested_ssrc, &created,
+ current_time);
g_object_unref (source);
}
+ sess->conflicting_addresses =
+ timeout_conflicting_addresses (sess->conflicting_addresses, current_time);
+
/* Make a local copy of the hashtable. We need to do this because the
* cleanup stage below releases the session lock. */
table_copy = g_hash_table_new_full (NULL, NULL, NULL,
g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
(GHRFunc) remove_closing_sources, &data);
+ /* update point-to-point status */
+ session_update_ptp (sess);
+
/* see if we need to generate SR or RR packets */
if (!is_rtcp_time (sess, current_time, &data))
goto done;
g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
(GHFunc) generate_rtcp, &data);
+ /* update the generation for all the sources that have been reported */
+ g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+ (GHFunc) update_generation, &data);
+
/* we keep track of the last report time in order to timeout inactive
* receivers or senders */
if (!data.is_early && !data.may_suppress)
sess->last_rtcp_send_time = data.current_time;
sess->first_rtcp = FALSE;
sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
+ sess->scheduled_bye = FALSE;
+
+ /* RFC 4585 section 3.5.2 step 6 */
+ if (!data.is_early) {
+ sess->allow_early = TRUE;
+ }
done:
RTP_SESSION_UNLOCK (sess);
* @max_delay: maximum delay
*
* Request transmission of early RTCP
+ *
+ * Returns: %TRUE if the related RTCP can be scheduled.
*/
-void
+gboolean
rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
GstClockTime max_delay)
{
- GstClockTime T_dither_max;
+ GstClockTime T_dither_max, T_rr;
+ gboolean ret;
/* Implements the algorithm described in RFC 4585 section 3.5.2 */
/* RFC 4585 section 3.5.2 step 2 */
if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
GST_LOG_OBJECT (sess, "already have next early rtcp time");
- goto dont_send;
+ ret = TRUE;
+ goto end;
}
if (!GST_CLOCK_TIME_IS_VALID (sess->next_rtcp_check_time)) {
GST_LOG_OBJECT (sess, "no next RTCP check time");
- goto dont_send;
+ ret = FALSE;
+ goto end;
}
- /* Ignore the request a scheduled packet will be in time anyway */
- if (current_time + max_delay > sess->next_rtcp_check_time) {
- GST_LOG_OBJECT (sess, "next scheduled time is soon %" GST_TIME_FORMAT " + %"
- GST_TIME_FORMAT " > %" GST_TIME_FORMAT,
- GST_TIME_ARGS (current_time),
- GST_TIME_ARGS (max_delay), GST_TIME_ARGS (sess->next_rtcp_check_time));
- goto dont_send;
+ /* RFC 4585 section 3.5.3 step 1
+ * If no regular RTCP packet has been sent before, then a regular
+ * RTCP packet has to be scheduled first and FB messages might be
+ * included there
+ */
+ if (!GST_CLOCK_TIME_IS_VALID (sess->last_rtcp_send_time)) {
+ GST_LOG_OBJECT (sess, "no RTCP sent yet");
+
+ if (current_time + max_delay > sess->next_rtcp_check_time) {
+ GST_LOG_OBJECT (sess,
+ "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
+ " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
+ GST_TIME_ARGS (max_delay),
+ GST_TIME_ARGS (sess->next_rtcp_check_time));
+ ret = TRUE;
+ } else {
+ GST_LOG_OBJECT (sess,
+ "can't allow early feedback, next scheduled time is too late %"
+ GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay),
+ GST_TIME_ARGS (sess->next_rtcp_check_time));
+ ret = FALSE;
+ }
+ goto end;
}
+ T_rr = sess->next_rtcp_check_time - sess->last_rtcp_send_time;
+
/* RFC 4585 section 3.5.2 step 2b */
/* If the total sources is <=2, then there is only us and one peer */
- if (sess->total_sources <= 2) {
+ /* When there is one auxiliary stream the session can still do point
+ * to point.
+ */
+ if (sess->is_doing_ptp) {
T_dither_max = 0;
} else {
/* Divide by 2 because l = 0.5 */
- T_dither_max = sess->next_rtcp_check_time - sess->last_rtcp_send_time;
+ T_dither_max = T_rr;
T_dither_max /= 2;
}
/* RFC 4585 section 3.5.2 step 3 */
if (current_time + T_dither_max > sess->next_rtcp_check_time) {
- GST_LOG_OBJECT (sess, "don't send because of dither");
- goto dont_send;
- }
-
- /* RFC 4585 section 3.5.2 step 4
- * Don't send if allow_early is FALSE, but not if we are in
- * immediate mode, meaning we are part of a group of at most the
- * application-specific threshold.
- */
- if (sess->total_sources > sess->rtcp_immediate_feedback_threshold &&
- sess->allow_early == FALSE) {
- GST_LOG_OBJECT (sess, "can't allow early feedback");
- goto dont_send;
+ GST_LOG_OBJECT (sess,
+ "don't send because of dither, next scheduled time is soon %"
+ GST_TIME_FORMAT " + %" GST_TIME_FORMAT " > %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (current_time), GST_TIME_ARGS (T_dither_max),
+ GST_TIME_ARGS (sess->next_rtcp_check_time));
+ ret = TRUE;
+ goto end;
+ }
+
+ /* RFC 4585 section 3.5.2 step 4a */
+ if (sess->allow_early == FALSE) {
+ /* Ignore the request a scheduled packet will be in time anyway */
+ if (current_time + max_delay > sess->next_rtcp_check_time) {
+ GST_LOG_OBJECT (sess,
+ "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT
+ " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time),
+ GST_TIME_ARGS (max_delay),
+ GST_TIME_ARGS (sess->next_rtcp_check_time));
+ ret = TRUE;
+ } else {
+ GST_LOG_OBJECT (sess,
+ "can't allow early feedback, next scheduled time is too late %"
+ GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay),
+ GST_TIME_ARGS (sess->next_rtcp_check_time));
+ ret = FALSE;
+ }
+ goto end;
}
+ /* RFC 4585 section 3.5.2 step 4b */
if (T_dither_max) {
/* Schedule an early transmission later */
sess->next_early_rtcp_time = g_random_double () * T_dither_max +
sess->next_early_rtcp_time = current_time;
}
- GST_LOG_OBJECT (sess, "next early RTCP time %" GST_TIME_FORMAT,
- GST_TIME_ARGS (sess->next_early_rtcp_time));
+ /* RFC 4585 section 3.5.2 step 6 */
+ sess->allow_early = FALSE;
+ /* Delay next regular RTCP packet to not exceed the short-term
+ * RTCP bandwidth when using early feedback as compared to
+ * without */
+ sess->next_rtcp_check_time = sess->last_rtcp_send_time + 2 * T_rr;
+ sess->last_rtcp_send_time += T_rr;
+
+ GST_LOG_OBJECT (sess, "next early RTCP time %" GST_TIME_FORMAT
+ ", next regular RTCP time %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (sess->next_early_rtcp_time),
+ GST_TIME_ARGS (sess->next_rtcp_check_time));
RTP_SESSION_UNLOCK (sess);
/* notify app of need to send packet early
if (sess->callbacks.reconsider)
sess->callbacks.reconsider (sess, sess->reconsider_user_data);
- return;
+ return TRUE;
-dont_send:
+end:
RTP_SESSION_UNLOCK (sess);
+
+ return ret;
}
-static void
+static gboolean
rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay)
{
GstClockTime now;
if (!sess->callbacks.send_rtcp)
- return;
+ return FALSE;
now = sess->callbacks.request_time (sess, sess->request_time_user_data);
- rtp_session_request_early_rtcp (sess, now, max_delay);
+ return rtp_session_request_early_rtcp (sess, now, max_delay);
}
gboolean
{
RTPSource *src;
+ if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) {
+ GST_DEBUG ("FIR/PLI not sent");
+ return FALSE;
+ }
+
RTP_SESSION_LOCK (sess);
src = find_source (sess, ssrc);
- if (!src)
+ if (src == NULL)
goto no_source;
if (fir) {
}
RTP_SESSION_UNLOCK (sess);
- rtp_session_send_rtcp (sess, 200 * GST_MSECOND);
-
return TRUE;
/* ERRORS */
{
RTPSource *source;
+ if (!rtp_session_send_rtcp (sess, max_delay)) {
+ GST_DEBUG ("NACK not sent");
+ return FALSE;
+ }
+
RTP_SESSION_LOCK (sess);
source = find_source (sess, ssrc);
if (source == NULL)
rtp_source_register_nack (source, seqnum);
RTP_SESSION_UNLOCK (sess);
- rtp_session_send_rtcp (sess, max_delay);
-
return TRUE;
/* ERRORS */