SIGNAL_ON_RECEIVING_RTCP,
SIGNAL_ON_NEW_SENDER_SSRC,
SIGNAL_ON_SENDER_SSRC_ACTIVE,
+ SIGNAL_ON_SENDING_NACKS,
LAST_SIGNAL
};
#define DEFAULT_MAX_MISORDER_TIME 2000
#define DEFAULT_RTP_PROFILE GST_RTP_PROFILE_AVP
#define DEFAULT_RTCP_REDUCED_SIZE FALSE
+#define DEFAULT_RTCP_DISABLE_SR_TIMESTAMP FALSE
enum
{
PROP_MAX_MISORDER_TIME,
PROP_STATS,
PROP_RTP_PROFILE,
- PROP_RTCP_REDUCED_SIZE
+ PROP_RTCP_REDUCED_SIZE,
+ PROP_RTCP_DISABLE_SR_TIMESTAMP
};
/* update average packet size */
static gboolean rtp_session_send_rtcp (RTPSession * sess,
GstClockTime max_delay);
+static gboolean rtp_session_send_rtcp_with_deadline (RTPSession * sess,
+ GstClockTime deadline);
static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
on_sender_ssrc_active), NULL, NULL, g_cclosure_marshal_VOID__OBJECT,
G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
+ /**
+ * RTPSession::on-sending-nack
+ * @session: the object which received the signal
+ * @sender_ssrc: the sender ssrc
+ * @media_ssrc: the media ssrc
+ * @nacks: (element-type guint16): the list of seqnum to be nacked
+ * @buffer: the #GstBuffer containing the RTCP packet about to be sent
+ *
+ * This signal is emitted before NACK packets are added into the RTCP
+ * packet. This signal can be used to override the conversion of the NACK
+ * seqnum array into packets. This can be used if your protocol uses
+ * different type of NACK (e.g. based on RTCP APP).
+ *
+ * The handler should transform the seqnum from @nacks array into packets.
+ * @nacks seqnum must be consumed from the start. The remaining will be
+ * rescheduled for later base on bandwidth. Only one handler will be
+ * signalled.
+ *
+ * A handler may return 0 to signal that generic NACKs should be created
+ * for this set. This can be useful if the signal is used for other purpose
+ * or if the other type of NACK would use more space.
+ *
+ * Returns: the number of NACK seqnum that was consumed from @nacks.
+ *
+ * Since: 1.16
+ */
+ rtp_session_signals[SIGNAL_ON_SENDING_NACKS] =
+ g_signal_new ("on-sending-nacks", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_nacks),
+ g_signal_accumulator_first_wins, NULL, g_cclosure_marshal_generic,
+ G_TYPE_UINT, 4, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_ARRAY,
+ GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
+
g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
g_param_spec_uint ("internal-ssrc", "Internal SSRC",
"The internal SSRC used for the session (deprecated)",
DEFAULT_RTCP_REDUCED_SIZE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ /**
+ * RTPSession::disable-sr-timestamp:
+ *
+ * Whether sender reports should be timestamped.
+ *
+ * Since: 1.16
+ */
+ g_object_class_install_property (gobject_class,
+ PROP_RTCP_DISABLE_SR_TIMESTAMP,
+ g_param_spec_boolean ("disable-sr-timestamp",
+ "Disable Sender Report Timestamp",
+ "Whether sender reports should be timestamped",
+ DEFAULT_RTCP_DISABLE_SR_TIMESTAMP,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
klass->get_source_by_ssrc =
GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp);
DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD;
sess->rtp_profile = DEFAULT_RTP_PROFILE;
sess->reduced_size_rtcp = DEFAULT_RTCP_REDUCED_SIZE;
+ sess->timestamp_sender_reports = !DEFAULT_RTCP_DISABLE_SR_TIMESTAMP;
sess->is_doing_ptp = TRUE;
}
if (sess->callbacks.reconsider)
sess->callbacks.reconsider (sess, sess->reconsider_user_data);
break;
+ case PROP_RTCP_FEEDBACK_RETENTION_WINDOW:
+ sess->rtcp_feedback_retention_window = g_value_get_uint64 (value);
+ break;
case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
sess->rtcp_immediate_feedback_threshold = g_value_get_uint (value);
break;
case PROP_RTCP_REDUCED_SIZE:
sess->reduced_size_rtcp = g_value_get_boolean (value);
break;
+ case PROP_RTCP_DISABLE_SR_TIMESTAMP:
+ sess->timestamp_sender_reports = !g_value_get_boolean (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
case PROP_RTCP_MIN_INTERVAL:
g_value_set_uint64 (value, sess->stats.min_interval * GST_SECOND);
break;
+ case PROP_RTCP_FEEDBACK_RETENTION_WINDOW:
+ g_value_set_uint64 (value, sess->rtcp_feedback_retention_window);
+ break;
case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
g_value_set_uint (value, sess->rtcp_immediate_feedback_threshold);
break;
case PROP_RTCP_REDUCED_SIZE:
g_value_set_boolean (value, sess->reduced_size_rtcp);
break;
+ case PROP_RTCP_DISABLE_SR_TIMESTAMP:
+ g_value_set_boolean (value, !sess->timestamp_sender_reports);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
return ssrc;
}
-
-/**
- * rtp_session_create_source:
- * @sess: an #RTPSession
- *
- * Create an #RTPSource for use in @sess. This function will create a source
- * with an ssrc that is currently not used by any participants in the session.
- *
- * Returns: an #RTPSource.
- */
-RTPSource *
-rtp_session_create_source (RTPSession * sess)
-{
- guint32 ssrc;
- RTPSource *source;
-
- RTP_SESSION_LOCK (sess);
- ssrc = rtp_session_create_new_ssrc (sess);
- source = rtp_source_new (ssrc);
- rtp_source_set_callbacks (source, &callbacks, sess);
- /* we need an additional ref for the source in the hashtable */
- g_object_ref (source);
- add_source (sess, source);
- RTP_SESSION_UNLOCK (sess);
-
- return source;
-}
-
static gboolean
update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo)
{
/* update the RTPPacketInfo structure with the current time and other bits
* about the current buffer we are handling.
* This function is typically called when a validated packet is received.
- * This function should be called with the SESSION_LOCK
+ * This function should be called with the RTP_SESSION_LOCK
*/
static gboolean
update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
prevactive = RTP_SOURCE_IS_ACTIVE (source);
oldrate = source->bitrate;
+ if (created)
+ on_new_ssrc (sess, source);
+
/* let source process the packet */
result = rtp_source_process_rtp (source, &pinfo);
if (oldrate != source->bitrate)
sess->recalc_bandwidth = TRUE;
- if (created)
- on_new_ssrc (sess, source);
if (source->validated) {
gboolean created;
gst_buffer_unref (fci_buffer);
}
- if (src && sess->rtcp_feedback_retention_window) {
+ if (src && sess->rtcp_feedback_retention_window != GST_CLOCK_TIME_NONE) {
rtp_source_retain_rtcp_packet (src, packet, pinfo->running_time);
}
obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
if (source) {
rtp_source_update_caps (source, caps);
+
+ if (created)
+ on_new_sender_ssrc (sess, source);
+
g_object_unref (source);
}
}
* @current_time: the current system time
* @running_time: the running time of @data
*
- * Send the RTP buffer in the session manager. This function takes ownership of
- * @buffer.
+ * Send the RTP data (a buffer or buffer list) in the session manager. This
+ * function takes ownership of @data.
*
* Returns: a #GstFlowReturn.
*/
/* fill in sender report info */
gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
- ntptime, rtptime, packet_count, octet_count);
+ sess->timestamp_sender_reports ? ntptime : 0,
+ sess->timestamp_sender_reports ? rtptime : 0,
+ packet_count, octet_count);
} else {
/* we are only receiver, create RR */
GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
goto reported;
}
+ if (source->disable_rtcp) {
+ GST_DEBUG ("source %08x has RTCP disabled", source->ssrc);
+ goto reported;
+ }
+
GST_DEBUG ("create RB for SSRC %08x", source->ssrc);
/* get new stats */
static void
session_nack (const gchar * key, RTPSource * source, ReportData * data)
{
+ RTPSession *sess = data->sess;
GstRTCPBuffer *rtcp = &data->rtcpbuf;
GstRTCPPacket *packet = &data->packet;
- guint32 *nacks;
- guint n_nacks, i;
+ guint16 *nacks;
+ GstClockTime *nack_deadlines;
+ guint n_nacks, i = 0;
+ guint nacked_seqnums = 0;
+ guint16 n_fb_nacks = 0;
guint8 *fci_data;
if (!source->send_nack)
return;
+ nacks = rtp_source_get_nacks (source, &n_nacks);
+ nack_deadlines = rtp_source_get_nack_deadlines (source, NULL);
+ GST_DEBUG ("%u NACKs current time %" GST_TIME_FORMAT, n_nacks,
+ GST_TIME_ARGS (data->current_time));
+
+ /* cleanup expired nacks */
+ for (i = 0; i < n_nacks; i++) {
+ GST_DEBUG ("#%u deadline %" GST_TIME_FORMAT, nacks[i],
+ GST_TIME_ARGS (nack_deadlines[i]));
+ if (nack_deadlines[i] >= data->current_time)
+ break;
+ }
+ if (i) {
+ GST_WARNING ("Removing %u expired NACKS", i);
+ rtp_source_clear_nacks (source, i);
+ n_nacks -= i;
+ if (n_nacks == 0)
+ return;
+ }
+
+ /* allow overriding NACK to packet conversion */
+ if (g_signal_has_handler_pending (sess,
+ rtp_session_signals[SIGNAL_ON_SENDING_NACKS], 0, TRUE)) {
+ /* this is needed as it will actually resize the buffer */
+ gst_rtcp_buffer_unmap (rtcp);
+
+ g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_NACKS], 0,
+ data->source->ssrc, source->ssrc, source->nacks, data->rtcp,
+ &nacked_seqnums);
+
+ /* and now remap for the remaining work */
+ gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp);
+
+ if (nacked_seqnums > 0)
+ goto done;
+ }
+
if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RTPFB, packet))
/* exit because the packet is full, will put next request in a
* further packet */
gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
- nacks = rtp_source_get_nacks (source, &n_nacks);
- GST_DEBUG ("%u NACKs", n_nacks);
- if (!gst_rtcp_packet_fb_set_fci_length (packet, n_nacks))
+ if (!gst_rtcp_packet_fb_set_fci_length (packet, 1)) {
+ gst_rtcp_packet_remove (packet);
+ GST_WARNING ("no nacks fit in the packet");
return;
+ }
fci_data = gst_rtcp_packet_fb_get_fci (packet);
- for (i = 0; i < n_nacks; i++) {
- GST_WRITE_UINT32_BE (fci_data, nacks[i]);
+ for (i = 0; i < n_nacks; i = nacked_seqnums) {
+ guint16 seqnum = nacks[i];
+ guint16 blp = 0;
+ guint j;
+
+ if (!gst_rtcp_packet_fb_set_fci_length (packet, n_fb_nacks + 1))
+ break;
+
+ n_fb_nacks++;
+ nacked_seqnums++;
+
+ for (j = i + 1; j < n_nacks; j++) {
+ gint diff;
+
+ diff = gst_rtp_buffer_compare_seqnum (seqnum, nacks[j]);
+ GST_TRACE ("[%u][%u] %u %u diff %i", i, j, seqnum, nacks[j], diff);
+ if (diff > 16)
+ break;
+
+ blp |= 1 << (diff - 1);
+ nacked_seqnums++;
+ }
+
+ GST_WRITE_UINT32_BE (fci_data, seqnum << 16 | blp);
fci_data += 4;
- data->nacked_seqnums++;
}
- rtp_source_clear_nacks (source);
+ GST_DEBUG ("Sent %u seqnums into %u FB NACKs", nacked_seqnums, n_fb_nacks);
+ source->stats.sent_nack_count += n_fb_nacks;
+
+done:
+ data->nacked_seqnums += nacked_seqnums;
+ rtp_source_clear_nacks (source, nacked_seqnums);
data->may_suppress = FALSE;
- source->stats.sent_nack_count += n_nacks;
}
/* perform cleanup of sources that timed out */
else
data->is_early = FALSE;
- if (data->is_early && sess->next_early_rtcp_time < current_time) {
- GST_DEBUG ("early feedback %" GST_TIME_FORMAT " < now %"
+ if (data->is_early && sess->next_early_rtcp_time <= current_time) {
+ GST_DEBUG ("early feedback %" GST_TIME_FORMAT " <= now %"
GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_early_rtcp_time),
GST_TIME_ARGS (current_time));
} else if (sess->next_rtcp_check_time == GST_CLOCK_TIME_NONE ||
if (sess->scheduled_bye && !source->marked_bye)
return;
+ /* skip if RTCP is disabled */
+ if (source->disable_rtcp) {
+ GST_DEBUG ("source %08x has RTCP disabled", source->ssrc);
+ return;
+ }
+
data->source = source;
/* open packet */
}
}
+static void
+schedule_remaining_nacks (const gchar * key, RTPSource * source,
+ ReportData * data)
+{
+ RTPSession *sess = data->sess;
+ GstClockTime *nack_deadlines;
+ GstClockTime deadline;
+ guint n_nacks;
+
+ if (!source->send_nack)
+ return;
+
+ /* the scheduling is entirely based on available bandwidth, just take the
+ * biggest seqnum, which will have the largest deadline to request early
+ * RTCP. */
+ nack_deadlines = rtp_source_get_nack_deadlines (source, &n_nacks);
+ deadline = nack_deadlines[n_nacks - 1];
+ RTP_SESSION_UNLOCK (sess);
+ rtp_session_send_rtcp_with_deadline (sess, deadline);
+ RTP_SESSION_LOCK (sess);
+}
+
static gboolean
rtp_session_are_all_sources_bye (RTPSession * sess)
{
if (all_empty)
GST_ERROR ("generated empty RTCP messages for all the sources");
+ /* schedule remaining nacks */
+ RTP_SESSION_LOCK (sess);
+ g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+ (GHFunc) schedule_remaining_nacks, &data);
+ RTP_SESSION_UNLOCK (sess);
+
return result;
}
}
static gboolean
+rtp_session_send_rtcp_internal (RTPSession * sess, GstClockTime now,
+ GstClockTime max_delay)
+{
+ /* notify the application that we intend to send early RTCP */
+ if (sess->callbacks.notify_early_rtcp)
+ sess->callbacks.notify_early_rtcp (sess, sess->notify_early_rtcp_user_data);
+
+ return rtp_session_request_early_rtcp (sess, now, max_delay);
+}
+
+static gboolean
+rtp_session_send_rtcp_with_deadline (RTPSession * sess, GstClockTime deadline)
+{
+ GstClockTime now, max_delay;
+
+ if (!sess->callbacks.send_rtcp)
+ return FALSE;
+
+ now = sess->callbacks.request_time (sess, sess->request_time_user_data);
+
+ if (deadline < now)
+ return FALSE;
+
+ max_delay = deadline - now;
+
+ return rtp_session_send_rtcp_internal (sess, now, max_delay);
+}
+
+static gboolean
rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay)
{
GstClockTime now;
now = sess->callbacks.request_time (sess, sess->request_time_user_data);
- /* notify the application that we intend to send early RTCP */
- if (sess->callbacks.notify_early_rtcp)
- sess->callbacks.notify_early_rtcp (sess, sess->notify_early_rtcp_user_data);
-
- return rtp_session_request_early_rtcp (sess, now, max_delay);
+ return rtp_session_send_rtcp_internal (sess, now, max_delay);
}
gboolean
{
RTPSource *src;
- if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) {
- GST_DEBUG ("FIR/PLI not sent");
- return FALSE;
- }
-
RTP_SESSION_LOCK (sess);
src = find_source (sess, ssrc);
if (src == NULL)
}
RTP_SESSION_UNLOCK (sess);
+ if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) {
+ GST_DEBUG ("FIR/PLI not sent early, sending with next regular RTCP");
+ }
+
return TRUE;
/* ERRORS */
GstClockTime max_delay)
{
RTPSource *source;
+ GstClockTime now;
- if (!rtp_session_send_rtcp (sess, max_delay)) {
- GST_DEBUG ("NACK not sent");
+ if (!sess->callbacks.send_rtcp)
return FALSE;
- }
+
+ now = sess->callbacks.request_time (sess, sess->request_time_user_data);
RTP_SESSION_LOCK (sess);
source = find_source (sess, ssrc);
if (source == NULL)
goto no_source;
- GST_DEBUG ("request NACK for %08x, #%u", ssrc, seqnum);
- rtp_source_register_nack (source, seqnum);
+ GST_DEBUG ("request NACK for SSRC %08x, #%u, deadline %" GST_TIME_FORMAT,
+ ssrc, seqnum, GST_TIME_ARGS (now + max_delay));
+ rtp_source_register_nack (source, seqnum, now + max_delay);
RTP_SESSION_UNLOCK (sess);
+ if (!rtp_session_send_rtcp_internal (sess, now, max_delay)) {
+ GST_DEBUG ("NACK not sent early, sending with next regular RTCP");
+ }
+
return TRUE;
/* ERRORS */