SIGNAL_ON_BYE_TIMEOUT,
SIGNAL_ON_TIMEOUT,
SIGNAL_ON_SENDER_TIMEOUT,
+ SIGNAL_ON_SENDING_RTCP,
+ SIGNAL_ON_FEEDBACK_RTCP,
+ SIGNAL_SEND_RTCP,
LAST_SIGNAL
};
#define DEFAULT_NUM_SOURCES 0
#define DEFAULT_NUM_ACTIVE_SOURCES 0
#define DEFAULT_SOURCES NULL
+#define DEFAULT_RTCP_MIN_INTERVAL (RTP_STATS_MIN_INTERVAL * GST_SECOND)
+#define DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW (2 * GST_SECOND)
enum
{
PROP_NUM_ACTIVE_SOURCES,
PROP_SOURCES,
PROP_FAVOR_NEW,
+ PROP_RTCP_MIN_INTERVAL,
+ PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
PROP_LAST
};
static void rtp_session_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
+static gboolean rtp_session_on_sending_rtcp (RTPSession * sess,
+ GstBuffer * buffer, gboolean early);
+static void rtp_session_send_rtcp (RTPSession * sess,
+ GstClockTimeDiff max_delay);
+
+
static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT);
static GstClockTime calculate_rtcp_interval (RTPSession * sess,
gboolean deterministic, gboolean first);
+static gboolean
+accumulate_trues (GSignalInvocationHint * ihint, GValue * return_accu,
+ const GValue * handler_return, gpointer data)
+{
+ if (g_value_get_boolean (handler_return))
+ g_value_set_boolean (return_accu, TRUE);
+
+ return TRUE;
+}
+
+static void
+gst_rtp_bin_marshal_BOOLEAN__MINIOBJECT_BOOLEAN (GClosure * closure,
+ GValue * return_value G_GNUC_UNUSED, guint n_param_values,
+ const GValue * param_values, gpointer invocation_hint G_GNUC_UNUSED,
+ gpointer marshal_data)
+{
+ typedef gboolean (*GMarshalFunc_BOOLEAN__MINIOBJECT_BOOLEAN) (gpointer data1,
+ gpointer arg_1, gboolean arg_2, gpointer data2);
+ register GMarshalFunc_BOOLEAN__MINIOBJECT_BOOLEAN callback;
+ register GCClosure *cc = (GCClosure *) closure;
+ register gpointer data1, data2;
+ gboolean v_return;
+
+ g_return_if_fail (return_value != NULL);
+ g_return_if_fail (n_param_values == 3);
+
+ if (G_CCLOSURE_SWAP_DATA (closure)) {
+ data1 = closure->data;
+ data2 = g_value_peek_pointer (param_values + 0);
+ } else {
+ data1 = g_value_peek_pointer (param_values + 0);
+ data2 = closure->data;
+ }
+ callback =
+ (GMarshalFunc_BOOLEAN__MINIOBJECT_BOOLEAN) (marshal_data ? marshal_data :
+ cc->callback);
+
+ v_return = callback (data1,
+ gst_value_get_mini_object (param_values + 1),
+ g_value_get_boolean (param_values + 2), data2);
+
+ g_value_set_boolean (return_value, v_return);
+}
+
+static void
+gst_rtp_bin_marshal_VOID__UINT_UINT_UINT_UINT_MINIOBJECT (GClosure * closure,
+ GValue * return_value G_GNUC_UNUSED, guint n_param_values,
+ const GValue * param_values, gpointer invocation_hint G_GNUC_UNUSED,
+ gpointer marshal_data)
+{
+ typedef void (*GMarshalFunc_VOID__UINT_UINT_UINT_UINT_MINIOBJECT) (gpointer
+ data1, guint arg_1, guint arg_2, guint arg_3, guint arg_4, gpointer arg_5,
+ gpointer data2);
+ register GMarshalFunc_VOID__UINT_UINT_UINT_UINT_MINIOBJECT callback;
+ register GCClosure *cc = (GCClosure *) closure;
+ register gpointer data1, data2;
+
+ g_return_if_fail (n_param_values == 6);
+
+ if (G_CCLOSURE_SWAP_DATA (closure)) {
+ data1 = closure->data;
+ data2 = g_value_peek_pointer (param_values + 0);
+ } else {
+ data1 = g_value_peek_pointer (param_values + 0);
+ data2 = closure->data;
+ }
+ callback =
+ (GMarshalFunc_VOID__UINT_UINT_UINT_UINT_MINIOBJECT) (marshal_data ?
+ marshal_data : cc->callback);
+
+ callback (data1,
+ g_value_get_uint (param_values + 1),
+ g_value_get_uint (param_values + 2),
+ g_value_get_uint (param_values + 3),
+ g_value_get_uint (param_values + 4),
+ gst_value_get_mini_object (param_values + 5), data2);
+}
+
+
static void
rtp_session_class_init (RTPSessionClass * klass)
{
NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
RTP_TYPE_SOURCE);
+ /**
+ * RTPSession::on-sending-rtcp
+ * @session: the object which received the signal
+ * @buffer: the #GstBuffer containing the RTCP packet about to be sent
+ * @early: %TRUE if the packet is early, %FALSE if it is regular
+ *
+ * This signal is emitted before sending an RTCP packet, it can be used
+ * to add extra RTCP Packets.
+ *
+ * Returns: %TRUE if the RTCP buffer should NOT be suppressed, %FALSE
+ * if suppressing it is acceptable
+ */
+ rtp_session_signals[SIGNAL_ON_SENDING_RTCP] =
+ g_signal_new ("on-sending-rtcp", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_rtcp),
+ accumulate_trues, NULL, gst_rtp_bin_marshal_BOOLEAN__MINIOBJECT_BOOLEAN,
+ G_TYPE_BOOLEAN, 2, GST_TYPE_BUFFER, G_TYPE_BOOLEAN);
+
+ /**
+ * RTPSession::on-feedback-rtcp:
+ * @session: the object which received the signal
+ * @type: Type of RTCP packet, will be %GST_RTCP_TYPE_RTPFB or
+ * %GST_RTCP_TYPE_RTPFB
+ * @fbtype: The type of RTCP FB packet, probably part of #GstRTCPFBType
+ * @sender_ssrc: The SSRC of the sender
+ * @media_ssrc: The SSRC of the media this refers to
+ * @fci: a #GstBuffer with the FCI data from the FB packet or %NULL if
+ * there was no FCI
+ *
+ * Notify that a RTCP feedback packet has been received
+ */
+
+ rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP] =
+ g_signal_new ("on-feedback-rtcp", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_feedback_rtcp),
+ NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT_UINT_UINT_MINIOBJECT,
+ G_TYPE_NONE, 5, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT,
+ GST_TYPE_BUFFER);
+
+ /**
+ * RTPSession::send-rtcp:
+ * @session: the object which received the signal
+ * @max_delay: The maximum delay after which the feedback will not be useful
+ * anymore
+ *
+ * Requests that the #RTPSession initiate a new RTCP packet as soon as
+ * possible within the requested delay.
+ */
+
+ rtp_session_signals[SIGNAL_SEND_RTCP] =
+ g_signal_new ("send-rtcp", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
+ G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
+ gst_rtp_bin_marshal_VOID__UINT64, G_TYPE_NONE, 1, G_TYPE_UINT64);
+
g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
g_param_spec_uint ("internal-ssrc", "Internal SSRC",
"The internal SSRC used for the session",
"Resolve SSRC conflict in favor of new sources", FALSE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_RTCP_MIN_INTERVAL,
+ g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval",
+ "Minimum interval between Regular RTCP packet (in ns)",
+ 0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class,
+ PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
+ g_param_spec_uint64 ("rtcp-feedback-retention-window",
+ "RTCP Feedback retention window",
+ "Duration during which RTCP Feedback packets are retained (in ns)",
+ 0, G_MAXUINT64, DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
klass->get_source_by_ssrc =
GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
+ klass->on_sending_rtcp = GST_DEBUG_FUNCPTR (rtp_session_on_sending_rtcp);
+ klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp);
GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
}
rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_TOOL, "GStreamer");
sess->first_rtcp = TRUE;
+ sess->allow_early = TRUE;
+ sess->rtcp_feedback_retention_window = DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW;
+
+ sess->rtcp_pli_requests = g_array_new (FALSE, FALSE, sizeof (guint32));
GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc);
}
g_hash_table_destroy (sess->cnames);
g_object_unref (sess->source);
+ g_array_free (sess->rtcp_pli_requests, TRUE);
+
G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
}
case PROP_FAVOR_NEW:
sess->favor_new = g_value_get_boolean (value);
break;
+ case PROP_RTCP_MIN_INTERVAL:
+ rtp_stats_set_min_interval (&sess->stats,
+ (gdouble) g_value_get_uint64 (value) / GST_SECOND);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
case PROP_FAVOR_NEW:
g_value_set_boolean (value, sess->favor_new);
break;
+ case PROP_RTCP_MIN_INTERVAL:
+ g_value_set_uint64 (value, sess->stats.min_interval * GST_SECOND);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
sess->callbacks.reconsider = callbacks->reconsider;
sess->reconsider_user_data = user_data;
}
+ if (callbacks->request_key_unit) {
+ sess->callbacks.request_key_unit = callbacks->request_key_unit;
+ sess->request_key_unit_user_data = user_data;
+ }
+ if (callbacks->request_time) {
+ sess->callbacks.request_time = callbacks->request_time;
+ sess->request_time_user_data = user_data;
+ }
}
/**
}
/**
+ * rtp_session_set_request_time_callback:
+ * @sess: an #RTPSession
+ * @callback: callback to set
+ * @user_data: user data passed in the callback
+ *
+ * Configure only the request_time callback
+ */
+void
+rtp_session_set_request_time_callback (RTPSession * sess,
+ RTPSessionRequestTime callback, gpointer user_data)
+{
+ g_return_if_fail (RTP_IS_SESSION (sess));
+
+ sess->callbacks.request_time = callback;
+ sess->request_time_user_data = user_data;
+}
+
+/**
* rtp_session_set_bandwidth:
* @sess: an #RTPSession
* @bandwidth: the bandwidth allocated
static void
update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
gboolean rtp, GstBuffer * buffer, GstClockTime current_time,
- GstClockTime running_time)
+ GstClockTime running_time, guint64 ntpnstime)
{
+ GstMetaNetAddress *meta;
+
/* get time of arrival */
arrival->current_time = current_time;
arrival->running_time = running_time;
+ arrival->ntpnstime = ntpnstime;
/* get packet size including header overhead */
arrival->bytes = GST_BUFFER_SIZE (buffer) + sess->header_len;
}
/* for netbuffer we can store the IP address to check for collisions */
- arrival->have_address = GST_IS_NETBUFFER (buffer);
- if (arrival->have_address) {
- GstNetBuffer *netbuf = (GstNetBuffer *) buffer;
-
- memcpy (&arrival->address, &netbuf->from, sizeof (GstNetAddress));
+ meta = gst_buffer_get_meta_net_address (buffer);
+ if (meta) {
+ arrival->have_address = TRUE;
+ memcpy (&arrival->address, &meta->naddr, sizeof (GstNetAddress));
+ } else {
+ arrival->have_address = FALSE;
}
}
RTP_SESSION_LOCK (sess);
/* update arrival stats */
update_arrival_stats (sess, &arrival, TRUE, buffer, current_time,
- running_time);
+ running_time, -1);
/* ignore more RTP packets when we left the session */
if (sess->source->received_bye)
/* only deal with report blocks for our session, we update the stats of
* the sender of the RTCP message. We could also compare our stats against
* the other sender to see if we are better or worse. */
- rtp_source_process_rb (source, arrival->current_time, fractionlost,
+ rtp_source_process_rb (source, arrival->ntpnstime, fractionlost,
packetslost, exthighestseq, jitter, lsr, dlsr);
-
- on_ssrc_active (sess, source);
}
}
+ on_ssrc_active (sess, source);
}
/* A Sender report contains statistics about how the sender is doing. This
i = 0;
while (more_items) {
guint32 ssrc;
- gboolean changed, created;
+ gboolean changed, created, validated;
RTPSource *source;
GstStructure *sdes;
/* takes ownership of sdes */
changed = rtp_source_set_sdes_struct (source, sdes);
+ validated = !RTP_SOURCE_IS_ACTIVE (source);
source->validated = TRUE;
+ /* source became active */
+ if (validated) {
+ sess->stats.active_sources++;
+ GST_DEBUG ("source: %08x became active, %d active sources", ssrc,
+ sess->stats.active_sources);
+ on_ssrc_validated (sess, source);
+ }
+
if (created)
on_new_ssrc (sess, source);
if (changed)
ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
GST_DEBUG ("SSRC: %08x", ssrc);
+ if (ssrc == sess->source->ssrc)
+ return;
+
/* find src and mark bye, no probation when dealing with RTCP */
source = obtain_source (sess, ssrc, &created, arrival, FALSE);
if (!source)
GST_DEBUG ("received APP");
}
+static void
+rtp_session_process_pli (RTPSession * sess, guint32 sender_ssrc,
+ guint32 media_ssrc, GstClockTime current_time)
+{
+ RTPSource *src;
+ guint32 round_trip = 0;
+
+ if (!sess->callbacks.request_key_unit)
+ return;
+
+ src = g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
+ GINT_TO_POINTER (sender_ssrc));
+
+ if (!src)
+ return;
+
+ if (sess->last_keyframe_request != GST_CLOCK_TIME_NONE &&
+ rtp_source_get_last_rb (src, NULL, NULL, NULL, NULL, NULL, NULL,
+ &round_trip)) {
+ GstClockTime round_trip_in_ns = gst_util_uint64_scale (round_trip,
+ GST_SECOND, 65536);
+
+ if (sess->last_keyframe_request != GST_CLOCK_TIME_NONE &&
+ current_time - sess->last_keyframe_request < round_trip_in_ns) {
+ GST_DEBUG ("Ignoring PLI because one was send without one RTT (%"
+ GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")",
+ GST_TIME_ARGS (current_time - sess->last_keyframe_request),
+ GST_TIME_ARGS (round_trip_in_ns));;
+ return;
+ }
+ }
+
+ sess->last_keyframe_request = current_time;
+
+ GST_LOG ("received PLI from %X %p(%p)", sender_ssrc,
+ sess->callbacks.process_rtp, sess->callbacks.request_key_unit);
+
+ sess->callbacks.request_key_unit (sess, FALSE,
+ sess->request_key_unit_user_data);
+}
+
+static void
+rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
+ RTPArrivalStats * arrival, GstClockTime current_time)
+{
+ GstRTCPType type = gst_rtcp_packet_get_type (packet);
+ GstRTCPFBType fbtype = gst_rtcp_packet_fb_get_type (packet);
+ guint32 sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet);
+ guint32 media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet);
+ guint8 *fci_data = gst_rtcp_packet_fb_get_fci (packet);
+ guint fci_length = 4 * gst_rtcp_packet_fb_get_fci_length (packet);
+
+ GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of "
+ "length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length);
+
+ if (g_signal_has_handler_pending (sess,
+ rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0, TRUE)) {
+ GstBuffer *fci_buffer = NULL;
+
+ if (fci_length > 0) {
+ fci_buffer = gst_buffer_create_sub (packet->buffer,
+ fci_data - GST_BUFFER_DATA (packet->buffer), fci_length);
+ GST_BUFFER_TIMESTAMP (fci_buffer) = arrival->running_time;
+ }
+
+ RTP_SESSION_UNLOCK (sess);
+ g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0,
+ type, fbtype, sender_ssrc, media_ssrc, fci_buffer);
+ RTP_SESSION_LOCK (sess);
+
+ if (fci_buffer)
+ gst_buffer_unref (fci_buffer);
+ }
+
+ if (sess->rtcp_feedback_retention_window) {
+ RTPSource *src = g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
+ GINT_TO_POINTER (media_ssrc));
+
+ if (src)
+ rtp_source_retain_rtcp_packet (src, packet, arrival->running_time);
+ }
+
+ if (rtp_source_get_ssrc (sess->source) == media_ssrc) {
+ switch (type) {
+ case GST_RTCP_TYPE_PSFB:
+ switch (fbtype) {
+ case GST_RTCP_PSFB_TYPE_PLI:
+ rtp_session_process_pli (sess, sender_ssrc, media_ssrc,
+ current_time);
+ break;
+ default:
+ break;
+ }
+ break;
+ case GST_RTCP_TYPE_RTPFB:
+ default:
+ break;
+ }
+ }
+}
+
/**
* rtp_session_process_rtcp:
* @sess: and #RTPSession
* @buffer: an RTCP buffer
* @current_time: the current system time
+ * @ntpnstime: the current NTP time in nanoseconds
*
* Process an RTCP buffer in the session manager. This function takes ownership
* of @buffer.
*/
GstFlowReturn
rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
- GstClockTime current_time)
+ GstClockTime current_time, guint64 ntpnstime)
{
GstRTCPPacket packet;
gboolean more, is_bye = FALSE, do_sync = FALSE;
RTP_SESSION_LOCK (sess);
/* update arrival stats */
- update_arrival_stats (sess, &arrival, FALSE, buffer, current_time, -1);
+ update_arrival_stats (sess, &arrival, FALSE, buffer, current_time, -1,
+ ntpnstime);
if (sess->sent_bye)
goto ignore;
- /* make writable, we might want to change the buffer */
- buffer = gst_buffer_make_metadata_writable (buffer);
-
/* start processing the compound packet */
more = gst_rtcp_buffer_get_first_packet (buffer, &packet);
while (more) {
case GST_RTCP_TYPE_APP:
rtp_session_process_app (sess, &packet, &arrival);
break;
+ case GST_RTCP_TYPE_RTPFB:
+ case GST_RTCP_TYPE_PSFB:
+ rtp_session_process_feedback (sess, &packet, &arrival, current_time);
+ break;
default:
GST_WARNING ("got unknown RTCP packet");
break;
RTP_SESSION_UNLOCK (sess);
/* notify caller of sr packets in the callback */
- if (do_sync && sess->callbacks.sync_rtcp)
+ if (do_sync && sess->callbacks.sync_rtcp) {
+ /* make writable, we might want to change the buffer */
+ buffer = gst_buffer_make_metadata_writable (buffer);
+
result = sess->callbacks.sync_rtcp (sess, sess->source, buffer,
sess->sync_rtcp_user_data);
- else
+ } else
gst_buffer_unref (buffer);
return result;
sess->stats.bye_members = 1;
sess->first_rtcp = TRUE;
sess->sent_bye = FALSE;
+ sess->allow_early = TRUE;
/* reschedule transmission */
sess->last_rtcp_send_time = current_time;
RTP_SESSION_LOCK (sess);
+ if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
+ result = sess->next_early_rtcp_time;
+ goto early_exit;
+ }
+
result = sess->next_rtcp_check_time;
GST_DEBUG ("current time: %" GST_TIME_FORMAT ", next :%" GST_TIME_FORMAT,
sess->next_rtcp_check_time = result;
- GST_DEBUG ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
+early_exit:
+
+ GST_DEBUG ("current time: %" GST_TIME_FORMAT
+ ", next time: %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (current_time), GST_TIME_ARGS (result));
RTP_SESSION_UNLOCK (sess);
return result;
GstRTCPPacket packet;
gboolean is_bye;
gboolean has_sdes;
+ gboolean is_early;
+ gboolean may_suppress;
} ReportData;
static void
/* create a new buffer if needed */
if (data->rtcp == NULL) {
session_start_rtcp (sess, data);
+ } else if (data->is_early) {
+ /* Put a single RR or SR in minimal compound packets */
+ return;
}
if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) {
/* only report about other sender sources */
rtp_source_get_new_rb (source, data->current_time, &fractionlost,
&packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
+ /* store last generated RR packet */
+ source->last_rr.is_valid = TRUE;
+ source->last_rr.fractionlost = fractionlost;
+ source->last_rr.packetslost = packetslost;
+ source->last_rr.exthighestseq = exthighestseq;
+ source->last_rr.jitter = jitter;
+ source->last_rr.lsr = lsr;
+ source->last_rr.dlsr = dlsr;
+
/* packet is not yet filled, add report block for this source. */
gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
exthighestseq, jitter, lsr, dlsr);
continue;
type = gst_rtcp_sdes_name_to_type (field);
+ /* Early packets are minimal and only include the CNAME */
+ if (data->is_early && type != GST_RTCP_SDES_CNAME)
+ continue;
+
if (type > GST_RTCP_SDES_END && type < GST_RTCP_SDES_PRIV) {
gst_rtcp_packet_sdes_add_entry (packet, type, strlen (value),
(const guint8 *) value);
is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data)
{
GstClockTime new_send_time, elapsed;
- gboolean result;
+
+ if (data->is_early && sess->next_early_rtcp_time < current_time)
+ goto early;
/* no need to check yet */
if (sess->next_rtcp_check_time > current_time) {
if (current_time < new_send_time) {
GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
GST_TIME_ARGS (new_send_time));
- result = FALSE;
/* store new check time */
sess->next_rtcp_check_time = new_send_time;
- } else {
- result = TRUE;
- new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE);
+ return FALSE;
+ }
- GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
- GST_TIME_ARGS (new_send_time));
- sess->next_rtcp_check_time = current_time + new_send_time;
+early:
+
+ new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE);
+
+ GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (new_send_time));
+ sess->next_rtcp_check_time = current_time + new_send_time;
+
+ /* Apply the rules from RFC 4585 section 3.5.3 */
+ if (sess->stats.min_interval != 0 && !sess->first_rtcp) {
+ GstClockTimeDiff T_rr_current_interval = g_random_double_range (0.5, 1.5) *
+ sess->stats.min_interval;
+
+ /* This will caused the RTCP to be suppressed if no FB packets are added */
+ if (sess->last_rtcp_send_time + T_rr_current_interval >
+ sess->next_rtcp_check_time) {
+ GST_DEBUG ("RTCP packet could be suppressed min: %" GST_TIME_FORMAT
+ " last: %" GST_TIME_FORMAT
+ " + T_rr_current_interval: %" GST_TIME_FORMAT
+ " > sess->next_rtcp_check_time: %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (sess->stats.min_interval),
+ GST_TIME_ARGS (sess->last_rtcp_send_time),
+ GST_TIME_ARGS (T_rr_current_interval),
+ GST_TIME_ARGS (sess->next_rtcp_check_time));
+ data->may_suppress = TRUE;
+ }
}
- return result;
+
+ return TRUE;
}
static void
data.ntpnstime = ntpnstime;
data.is_bye = FALSE;
data.has_sdes = FALSE;
+ data.may_suppress = FALSE;
data.running_time = running_time;
own = sess->source;
g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
(GHRFunc) remove_closing_sources, NULL);
+ if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
+ data.is_early = TRUE;
+ else
+ data.is_early = FALSE;
+
/* see if we need to generate SR or RR packets */
if (is_rtcp_time (sess, current_time, &data)) {
if (own->received_bye) {
if (data.rtcp) {
/* we keep track of the last report time in order to timeout inactive
* receivers or senders */
- sess->last_rtcp_send_time = data.current_time;
+ if (!data.is_early && !data.may_suppress)
+ sess->last_rtcp_send_time = data.current_time;
sess->first_rtcp = FALSE;
+ sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
/* add SDES for this source when not already added */
if (!data.has_sdes)
/* check for outdated collisions */
GST_DEBUG ("Timing out collisions");
rtp_source_timeout (sess->source, current_time,
- data.interval * RTCP_INTERVAL_COLLISION_TIMEOUT);
+ data.interval * RTCP_INTERVAL_COLLISION_TIMEOUT,
+ running_time - sess->rtcp_feedback_retention_window);
if (sess->change_ssrc) {
GST_DEBUG ("need to change our SSRC (%08x)", own->ssrc);
notify = TRUE;
GST_DEBUG ("changed our SSRC to %08x", own->ssrc);
}
+
+ sess->allow_early = TRUE;
+
RTP_SESSION_UNLOCK (sess);
if (notify)
/* push out the RTCP packet */
if (data.rtcp) {
- /* close the RTCP packet */
- gst_rtcp_buffer_end (data.rtcp);
+ gboolean do_not_suppress;
- if (sess->callbacks.send_rtcp) {
+ /* Give the user a change to add its own packet */
+ g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_RTCP], 0,
+ data.rtcp, data.is_early, &do_not_suppress);
+
+ if (sess->callbacks.send_rtcp && (do_not_suppress || !data.may_suppress)) {
guint packet_size;
+ /* close the RTCP packet */
+ gst_rtcp_buffer_end (data.rtcp);
+
packet_size = GST_BUFFER_SIZE (data.rtcp) + sess->header_len;
UPDATE_AVG (sess->stats.avg_rtcp_packet_size, packet_size);
sess->callbacks.send_rtcp (sess, own, data.rtcp, sess->sent_bye,
sess->send_rtcp_user_data);
} else {
- GST_DEBUG ("freeing packet");
+ GST_DEBUG ("freeing packet callback: %p"
+ " do_not_suppress: %d may_suppress: %d",
+ sess->callbacks.send_rtcp, do_not_suppress, data.may_suppress);
gst_buffer_unref (data.rtcp);
}
}
return result;
}
+
+void
+rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
+ GstClockTimeDiff max_delay)
+{
+ GstClockTime T_dither_max;
+
+ /* Implements the algorithm described in RFC 4585 section 3.5.2 */
+
+ RTP_SESSION_LOCK (sess);
+
+ /* Check if already requested */
+ /* RFC 4585 section 3.5.2 step 2 */
+ if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time))
+ goto dont_send;
+
+ /* Ignore the request a scheduled packet will be in time anyway */
+ if (current_time + max_delay > sess->next_rtcp_check_time)
+ goto dont_send;
+
+ /* RFC 4585 section 3.5.2 step 2b */
+ /* If the total sources is <=2, then there is only us and one peer */
+ if (sess->total_sources <= 2) {
+ T_dither_max = 0;
+ } else {
+ /* Divide by 2 because l = 0.5 */
+ T_dither_max = sess->next_rtcp_check_time - sess->last_rtcp_send_time;
+ T_dither_max /= 2;
+ }
+
+ /* RFC 4585 section 3.5.2 step 3 */
+ if (current_time + T_dither_max > sess->next_rtcp_check_time)
+ goto dont_send;
+
+ /* RFC 4585 section 3.5.2 step 4 */
+ if (sess->allow_early == FALSE)
+ goto dont_send;
+
+ if (T_dither_max) {
+ /* Schedule an early transmission later */
+ sess->next_early_rtcp_time = g_random_double () * T_dither_max +
+ current_time;
+ } else {
+ /* If no dithering, schedule it for NOW */
+ sess->next_early_rtcp_time = current_time;
+ }
+
+ RTP_SESSION_UNLOCK (sess);
+
+ /* notify app of need to send packet early
+ * and therefore of timeout change */
+ if (sess->callbacks.reconsider)
+ sess->callbacks.reconsider (sess, sess->reconsider_user_data);
+
+ return;
+
+dont_send:
+
+ RTP_SESSION_UNLOCK (sess);
+
+}
+
+void
+rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc)
+{
+ guint i;
+
+ for (i = 0; i < sess->rtcp_pli_requests->len; i++)
+ if (ssrc == g_array_index (sess->rtcp_pli_requests, guint32, i))
+ return;
+
+ g_array_append_val (sess->rtcp_pli_requests, ssrc);
+}
+
+static gboolean
+has_pli_compare_func (gconstpointer a, gconstpointer ignored)
+{
+ GstRTCPPacket packet;
+
+ packet.buffer = (GstBuffer *) a;
+ packet.offset = 0;
+
+ if (gst_rtcp_packet_get_type (&packet) == GST_RTCP_TYPE_PSFB &&
+ gst_rtcp_packet_fb_get_type (&packet) == GST_RTCP_PSFB_TYPE_PLI)
+ return TRUE;
+ else
+ return FALSE;
+}
+
+static gboolean
+rtp_session_on_sending_rtcp (RTPSession * sess, GstBuffer * buffer,
+ gboolean early)
+{
+ gboolean ret = FALSE;
+
+ RTP_SESSION_LOCK (sess);
+
+ while (sess->rtcp_pli_requests->len) {
+ GstRTCPPacket rtcppacket;
+ guint media_ssrc = g_array_index (sess->rtcp_pli_requests, guint32, 0);
+ RTPSource *media_src = g_hash_table_lookup (sess->ssrcs[sess->mask_idx],
+ GUINT_TO_POINTER (media_ssrc));
+
+ if (media_src && !rtp_source_has_retained (media_src,
+ has_pli_compare_func, NULL)) {
+ if (gst_rtcp_buffer_add_packet (buffer, GST_RTCP_TYPE_PSFB, &rtcppacket)) {
+ gst_rtcp_packet_fb_set_type (&rtcppacket, GST_RTCP_PSFB_TYPE_PLI);
+ gst_rtcp_packet_fb_set_sender_ssrc (&rtcppacket,
+ rtp_source_get_ssrc (sess->source));
+ gst_rtcp_packet_fb_set_media_ssrc (&rtcppacket, media_ssrc);
+ ret = TRUE;
+ } else {
+ /* Break because the packet is full, will put next request in a
+ * further packet
+ */
+ break;
+ }
+ }
+
+ g_array_remove_index (sess->rtcp_pli_requests, 0);
+ }
+
+ RTP_SESSION_UNLOCK (sess);
+
+ return ret;
+}
+
+static void
+rtp_session_send_rtcp (RTPSession * sess, GstClockTimeDiff max_delay)
+{
+ GstClockTime now;
+
+ if (!sess->callbacks.send_rtcp)
+ return;
+
+ now = sess->callbacks.request_time (sess, sess->request_time_user_data);
+
+ rtp_session_request_early_rtcp (sess, now, max_delay);
+}