#define GLIB_DISABLE_DEPRECATION_WARNINGS
#include <string.h>
+#include <stdlib.h>
#include <gst/rtp/gstrtpbuffer.h>
#include <gst/rtp/gstrtcpbuffer.h>
#include "rtpsession.h"
-GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
+GST_DEBUG_CATEGORY (rtp_session_debug);
#define GST_CAT_DEFAULT rtp_session_debug
/* signals and args */
SIGNAL_ON_TIMEOUT,
SIGNAL_ON_SENDER_TIMEOUT,
SIGNAL_ON_SENDING_RTCP,
+ SIGNAL_ON_APP_RTCP,
SIGNAL_ON_FEEDBACK_RTCP,
SIGNAL_SEND_RTCP,
SIGNAL_SEND_RTCP_FULL,
SIGNAL_ON_RECEIVING_RTCP,
+ SIGNAL_ON_NEW_SENDER_SSRC,
+ SIGNAL_ON_SENDER_SSRC_ACTIVE,
+ SIGNAL_ON_SENDING_NACKS,
LAST_SIGNAL
};
#define DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW (2 * GST_SECOND)
#define DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD (3)
#define DEFAULT_PROBATION RTP_DEFAULT_PROBATION
+#define DEFAULT_MAX_DROPOUT_TIME 60000
+#define DEFAULT_MAX_MISORDER_TIME 2000
+#define DEFAULT_RTP_PROFILE GST_RTP_PROFILE_AVP
+#define DEFAULT_RTCP_REDUCED_SIZE FALSE
+#define DEFAULT_RTCP_DISABLE_SR_TIMESTAMP FALSE
enum
{
PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
PROP_PROBATION,
- PROP_STATS
+ PROP_MAX_DROPOUT_TIME,
+ PROP_MAX_MISORDER_TIME,
+ PROP_STATS,
+ PROP_RTP_PROFILE,
+ PROP_RTCP_REDUCED_SIZE,
+ PROP_RTCP_DISABLE_SR_TIMESTAMP
};
/* update average packet size */
(avg) = ((val) + (15 * (avg))) >> 4;
+#define TWCC_EXTMAP_STR "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01"
+
/* GObject vmethods */
static void rtp_session_finalize (GObject * object);
static void rtp_session_set_property (GObject * object, guint prop_id,
static gboolean rtp_session_send_rtcp (RTPSession * sess,
GstClockTime max_delay);
+static gboolean rtp_session_send_rtcp_with_deadline (RTPSession * sess,
+ GstClockTime deadline);
static guint rtp_session_signals[LAST_SIGNAL] = { 0 };
rtp_session_signals[SIGNAL_GET_SOURCE_BY_SSRC] =
g_signal_new ("get-source-by-ssrc", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (RTPSessionClass,
- get_source_by_ssrc), NULL, NULL, g_cclosure_marshal_generic,
+ get_source_by_ssrc), NULL, NULL, NULL,
RTP_TYPE_SOURCE, 1, G_TYPE_UINT);
/**
rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc),
- NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
- RTP_TYPE_SOURCE);
+ NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
/**
* RTPSession::on-ssrc-collision:
* @session: the object which received the signal
rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision),
- NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
- RTP_TYPE_SOURCE);
+ NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
/**
* RTPSession::on-ssrc-validated:
* @session: the object which received the signal
rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated),
- NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
- RTP_TYPE_SOURCE);
+ NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
/**
* RTPSession::on-ssrc-active:
* @session: the object which received the signal
rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] =
g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_active),
- NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
- RTP_TYPE_SOURCE);
+ NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
/**
* RTPSession::on-ssrc-sdes:
* @session: the object which received the signal
rtp_session_signals[SIGNAL_ON_SSRC_SDES] =
g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_sdes),
- NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
- RTP_TYPE_SOURCE);
+ NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
/**
* RTPSession::on-bye-ssrc:
* @session: the object which received the signal
rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
- NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
- RTP_TYPE_SOURCE);
+ NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
/**
* RTPSession::on-bye-timeout:
* @session: the object which received the signal
rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
- NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
- RTP_TYPE_SOURCE);
+ NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
/**
* RTPSession::on-timeout:
* @session: the object which received the signal
rtp_session_signals[SIGNAL_ON_TIMEOUT] =
g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
- NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
- RTP_TYPE_SOURCE);
+ NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
/**
* RTPSession::on-sender-timeout:
* @session: the object which received the signal
rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] =
g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sender_timeout),
- NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
- RTP_TYPE_SOURCE);
+ NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
/**
* RTPSession::on-sending-rtcp
rtp_session_signals[SIGNAL_ON_SENDING_RTCP] =
g_signal_new ("on-sending-rtcp", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_rtcp),
- accumulate_trues, NULL, g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 2,
+ accumulate_trues, NULL, NULL, G_TYPE_BOOLEAN, 2,
GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE, G_TYPE_BOOLEAN);
/**
+ * RTPSession::on-app-rtcp:
+ * @session: the object which received the signal
+ * @subtype: The subtype of the packet
+ * @ssrc: The SSRC/CSRC of the packet
+ * @name: The name of the packet
+ * @data: a #GstBuffer with the application-dependant data or %NULL if
+ * there was no data
+ *
+ * Notify that a RTCP APP packet has been received
+ */
+ rtp_session_signals[SIGNAL_ON_APP_RTCP] =
+ g_signal_new ("on-app-rtcp", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_app_rtcp),
+ NULL, NULL, NULL, G_TYPE_NONE, 4, G_TYPE_UINT, G_TYPE_UINT,
+ G_TYPE_STRING, GST_TYPE_BUFFER);
+
+ /**
* RTPSession::on-feedback-rtcp:
* @session: the object which received the signal
* @type: Type of RTCP packet, will be %GST_RTCP_TYPE_RTPFB or
rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP] =
g_signal_new ("on-feedback-rtcp", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_feedback_rtcp),
- NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 5, G_TYPE_UINT,
- G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT, GST_TYPE_BUFFER);
+ NULL, NULL, NULL, G_TYPE_NONE, 5, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT,
+ G_TYPE_UINT, GST_TYPE_BUFFER);
/**
* RTPSession::send-rtcp:
*
* Requests that the #RTPSession initiate a new RTCP packet as soon as
* possible within the requested delay.
+ *
+ * This sets feedback to %TRUE if not already done before.
*/
rtp_session_signals[SIGNAL_SEND_RTCP] =
g_signal_new ("send-rtcp", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
- g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_UINT64);
+ NULL, G_TYPE_NONE, 1, G_TYPE_UINT64);
/**
* RTPSession::send-rtcp-full:
* Requests that the #RTPSession initiate a new RTCP packet as soon as
* possible within the requested delay.
*
+ * This sets feedback to %TRUE if not already done before.
+ *
* Returns: TRUE if the new RTCP packet could be scheduled within the
* requested delay, FALSE otherwise.
*
g_signal_new ("send-rtcp-full", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL,
- g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);
+ NULL, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);
/**
* RTPSession::on-receiving-rtcp
rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP] =
g_signal_new ("on-receiving-rtcp", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_receiving_rtcp),
- NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1,
+ NULL, NULL, NULL, G_TYPE_NONE, 1,
+ GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
+
+ /**
+ * RTPSession::on-new-sender-ssrc:
+ * @session: the object which received the signal
+ * @src: the new sender RTPSource
+ *
+ * Notify of a new sender SSRC that entered @session.
+ *
+ * Since: 1.8
+ */
+ rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC] =
+ g_signal_new ("on-new-sender-ssrc", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_sender_ssrc),
+ NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
+
+ /**
+ * RTPSession::on-sender-ssrc-active:
+ * @session: the object which received the signal
+ * @src: the active sender RTPSource
+ *
+ * Notify of a sender SSRC that is active, i.e., sending RTCP.
+ *
+ * Since: 1.8
+ */
+ rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE] =
+ g_signal_new ("on-sender-ssrc-active", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass,
+ on_sender_ssrc_active), NULL, NULL, NULL,
+ G_TYPE_NONE, 1, RTP_TYPE_SOURCE);
+
+ /**
+ * RTPSession::on-sending-nack
+ * @session: the object which received the signal
+ * @sender_ssrc: the sender ssrc
+ * @media_ssrc: the media ssrc
+ * @nacks: (element-type guint16): the list of seqnum to be nacked
+ * @buffer: the #GstBuffer containing the RTCP packet about to be sent
+ *
+ * This signal is emitted before NACK packets are added into the RTCP
+ * packet. This signal can be used to override the conversion of the NACK
+ * seqnum array into packets. This can be used if your protocol uses
+ * different type of NACK (e.g. based on RTCP APP).
+ *
+ * The handler should transform the seqnum from @nacks array into packets.
+ * @nacks seqnum must be consumed from the start. The remaining will be
+ * rescheduled for later base on bandwidth. Only one handler will be
+ * signalled.
+ *
+ * A handler may return 0 to signal that generic NACKs should be created
+ * for this set. This can be useful if the signal is used for other purpose
+ * or if the other type of NACK would use more space.
+ *
+ * Returns: the number of NACK seqnum that was consumed from @nacks.
+ *
+ * Since: 1.16
+ */
+ rtp_session_signals[SIGNAL_ON_SENDING_NACKS] =
+ g_signal_new ("on-sending-nacks", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_nacks),
+ g_signal_accumulator_first_wins, NULL, NULL,
+ G_TYPE_UINT, 4, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_ARRAY,
GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC,
g_param_spec_uint ("internal-ssrc", "Internal SSRC",
"The internal SSRC used for the session (deprecated)",
- 0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ 0, G_MAXUINT, 0,
+#ifndef TIZEN_FEATURE_GST_UPSTREAM_AVOID_BUILD_BREAK
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
+ GST_PARAM_DOC_SHOW_DEFAULT));
+#else
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+#endif
g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE,
g_param_spec_object ("internal-source", "Internal Source",
g_object_class_install_property (gobject_class, PROP_BANDWIDTH,
g_param_spec_double ("bandwidth", "Bandwidth",
- "The bandwidth of the session (0 for auto-discover)",
+ "The bandwidth of the session in bits per second (0 for auto-discover)",
0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION,
g_param_spec_double ("rtcp-fraction", "RTCP Fraction",
- "The fraction of the bandwidth used for RTCP (or as a real fraction of the RTP bandwidth if < 1)",
+ "The fraction of the bandwidth used for RTCP in bits per second (or as a real fraction of the RTP bandwidth if < 1)",
0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_RTCP_RR_BANDWIDTH,
g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth",
- "The RTCP bandwidth used for receivers in bytes per second (-1 = default)",
+ "The RTCP bandwidth used for receivers in bits per second (-1 = default)",
-1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_RTCP_RS_BANDWIDTH,
g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth",
- "The RTCP bandwidth used for senders in bytes per second (-1 = default)",
+ "The RTCP bandwidth used for senders in bits per second (-1 = default)",
-1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_SDES,
g_param_spec_boxed ("sdes", "SDES",
"The SDES items of this session",
+#ifndef TIZEN_FEATURE_GST_UPSTREAM_AVOID_BUILD_BREAK
+ GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS
+ | GST_PARAM_DOC_SHOW_DEFAULT));
+#else
GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+#endif
g_object_class_install_property (gobject_class, PROP_NUM_SOURCES,
g_param_spec_uint ("num-sources", "Num Sources",
DEFAULT_NUM_ACTIVE_SOURCES,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
/**
- * RTPSource::sources
+ * RTPSource:sources
*
* Get a GValue Array of all sources in the session.
*
- * <example>
- * <title>Getting the #RTPSources of a session
- * <programlisting>
+ * ## Getting the #RTPSources of a session
+ *
+ * ``` C
* {
* GValueArray *arr;
* GValue *val;
* }
* g_value_array_free (arr);
* }
- * </programlisting>
- * </example>
+ * ```
*/
g_object_class_install_property (gobject_class, PROP_SOURCES,
g_param_spec_boxed ("sources", "Sources",
0, G_MAXUINT, DEFAULT_PROBATION,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
+ g_param_spec_uint ("max-dropout-time", "Max dropout time",
+ "The maximum time (milliseconds) of missing packets tolerated.",
+ 0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
+ g_param_spec_uint ("max-misorder-time", "Max misorder time",
+ "The maximum time (milliseconds) of misordered packets tolerated.",
+ 0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
/**
- * RTPSession::stats:
+ * RTPSession:stats:
*
* Various session statistics. This property returns a GstStructure
* with name application/x-rtp-session-stats with the following fields:
*
- * "rtx-drop-count" G_TYPE_UINT The number of retransmission events
+ * * "rtx-drop-count" G_TYPE_UINT The number of retransmission events
* dropped (due to bandwidth constraints)
- * "sent-nack-count" G_TYPE_UINT Number of NACKs sent
- * "recv-nack-count" G_TYPE_UINT Number of NACKs received
+ * * "sent-nack-count" G_TYPE_UINT Number of NACKs sent
+ * * "recv-nack-count" G_TYPE_UINT Number of NACKs received
+ * * "source-stats" G_TYPE_BOXED GValueArray of #RTPSource:stats for all
+ * RTP sources (Since 1.8)
*
* Since: 1.4
*/
"Various statistics", GST_TYPE_STRUCTURE,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_RTP_PROFILE,
+ g_param_spec_enum ("rtp-profile", "RTP Profile",
+ "RTP profile to use for this session", GST_TYPE_RTP_PROFILE,
+ DEFAULT_RTP_PROFILE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_RTCP_REDUCED_SIZE,
+ g_param_spec_boolean ("rtcp-reduced-size", "RTCP Reduced Size",
+ "Use Reduced Size RTCP for feedback packets",
+ DEFAULT_RTCP_REDUCED_SIZE,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ /**
+ * RTPSession:disable-sr-timestamp:
+ *
+ * Whether sender reports should be timestamped.
+ *
+ * Since: 1.16
+ */
+ g_object_class_install_property (gobject_class,
+ PROP_RTCP_DISABLE_SR_TIMESTAMP,
+ g_param_spec_boolean ("disable-sr-timestamp",
+ "Disable Sender Report Timestamp",
+ "Whether sender reports should be timestamped",
+ DEFAULT_RTCP_DISABLE_SR_TIMESTAMP,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
klass->get_source_by_ssrc =
GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp);
sess->rtcp_rs_bandwidth = DEFAULT_RTCP_RS_BANDWIDTH;
/* default UDP header length */
- sess->header_len = 28;
+ sess->header_len = UDP_IP_HEADER_OVERHEAD;
sess->mtu = DEFAULT_RTCP_MTU;
sess->probation = DEFAULT_PROBATION;
+ sess->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
+ sess->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
/* some default SDES entries */
sess->sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
/* this is the SSRC we suggest */
sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
+ sess->internal_ssrc_set = FALSE;
sess->first_rtcp = TRUE;
sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
+ sess->last_rtcp_check_time = GST_CLOCK_TIME_NONE;
sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE;
+ sess->last_rtcp_interval = GST_CLOCK_TIME_NONE;
- sess->allow_early = TRUE;
sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
sess->rtcp_feedback_retention_window = DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW;
sess->rtcp_immediate_feedback_threshold =
DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD;
-
- sess->last_keyframe_request = GST_CLOCK_TIME_NONE;
+ sess->rtp_profile = DEFAULT_RTP_PROFILE;
+ sess->reduced_size_rtcp = DEFAULT_RTCP_REDUCED_SIZE;
+ sess->timestamp_sender_reports = !DEFAULT_RTCP_DISABLE_SR_TIMESTAMP;
sess->is_doing_ptp = TRUE;
+
+ sess->twcc = rtp_twcc_manager_new (sess->mtu);
+ sess->twcc_stats = rtp_twcc_stats_new ();
}
static void
for (i = 0; i < 1; i++)
g_hash_table_destroy (sess->ssrcs[i]);
+ g_object_unref (sess->twcc);
+ rtp_twcc_stats_free (sess->twcc_stats);
+
g_mutex_clear (&sess->lock);
G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
return res;
}
+static void
+create_source_stats (gpointer key, RTPSource * source, GValueArray * arr)
+{
+ GValue *value;
+ GstStructure *s;
+
+ g_object_get (source, "stats", &s, NULL);
+
+ g_value_array_append (arr, NULL);
+ value = g_value_array_get_nth (arr, arr->n_values - 1);
+ g_value_init (value, GST_TYPE_STRUCTURE);
+ g_value_take_boxed (value, s);
+}
+
static GstStructure *
rtp_session_create_stats (RTPSession * sess)
{
GstStructure *s;
+ GValueArray *source_stats;
+ GValue source_stats_v = G_VALUE_INIT;
+ guint size;
+ RTP_SESSION_LOCK (sess);
s = gst_structure_new ("application/x-rtp-session-stats",
"rtx-drop-count", G_TYPE_UINT, sess->stats.nacks_dropped,
"sent-nack-count", G_TYPE_UINT, sess->stats.nacks_sent,
"recv-nack-count", G_TYPE_UINT, sess->stats.nacks_received, NULL);
+ size = g_hash_table_size (sess->ssrcs[sess->mask_idx]);
+ source_stats = g_value_array_new (size);
+ g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+ (GHFunc) create_source_stats, source_stats);
+ RTP_SESSION_UNLOCK (sess);
+
+ g_value_init (&source_stats_v, G_TYPE_VALUE_ARRAY);
+ g_value_take_boxed (&source_stats_v, source_stats);
+ gst_structure_take_value (s, "source-stats", &source_stats_v);
+
return s;
}
case PROP_INTERNAL_SSRC:
RTP_SESSION_LOCK (sess);
sess->suggested_ssrc = g_value_get_uint (value);
+ sess->internal_ssrc_set = TRUE;
+ sess->internal_ssrc_from_caps_or_property = TRUE;
RTP_SESSION_UNLOCK (sess);
if (sess->callbacks.reconfigure)
sess->callbacks.reconfigure (sess, sess->reconfigure_user_data);
break;
case PROP_RTCP_MTU:
sess->mtu = g_value_get_uint (value);
+ rtp_twcc_manager_set_mtu (sess->twcc, sess->mtu);
break;
case PROP_SDES:
rtp_session_set_sdes_struct (sess, g_value_get_boxed (value));
if (sess->callbacks.reconsider)
sess->callbacks.reconsider (sess, sess->reconsider_user_data);
break;
+ case PROP_RTCP_FEEDBACK_RETENTION_WINDOW:
+ sess->rtcp_feedback_retention_window = g_value_get_uint64 (value);
+ break;
case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
sess->rtcp_immediate_feedback_threshold = g_value_get_uint (value);
break;
case PROP_PROBATION:
sess->probation = g_value_get_uint (value);
break;
+ case PROP_MAX_DROPOUT_TIME:
+ sess->max_dropout_time = g_value_get_uint (value);
+ break;
+ case PROP_MAX_MISORDER_TIME:
+ sess->max_misorder_time = g_value_get_uint (value);
+ break;
+ case PROP_RTP_PROFILE:
+ sess->rtp_profile = g_value_get_enum (value);
+ /* trigger reconsideration */
+ RTP_SESSION_LOCK (sess);
+ sess->next_rtcp_check_time = 0;
+ RTP_SESSION_UNLOCK (sess);
+ if (sess->callbacks.reconsider)
+ sess->callbacks.reconsider (sess, sess->reconsider_user_data);
+ break;
+ case PROP_RTCP_REDUCED_SIZE:
+ sess->reduced_size_rtcp = g_value_get_boolean (value);
+ break;
+ case PROP_RTCP_DISABLE_SR_TIMESTAMP:
+ sess->timestamp_sender_reports = !g_value_get_boolean (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
switch (prop_id) {
case PROP_INTERNAL_SSRC:
- g_value_set_uint (value, rtp_session_suggest_ssrc (sess));
+ g_value_set_uint (value, rtp_session_suggest_ssrc (sess, NULL));
break;
case PROP_INTERNAL_SOURCE:
/* FIXME, return a random source */
case PROP_RTCP_MIN_INTERVAL:
g_value_set_uint64 (value, sess->stats.min_interval * GST_SECOND);
break;
+ case PROP_RTCP_FEEDBACK_RETENTION_WINDOW:
+ g_value_set_uint64 (value, sess->rtcp_feedback_retention_window);
+ break;
case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
g_value_set_uint (value, sess->rtcp_immediate_feedback_threshold);
break;
case PROP_PROBATION:
g_value_set_uint (value, sess->probation);
break;
+ case PROP_MAX_DROPOUT_TIME:
+ g_value_set_uint (value, sess->max_dropout_time);
+ break;
+ case PROP_MAX_MISORDER_TIME:
+ g_value_set_uint (value, sess->max_misorder_time);
+ break;
case PROP_STATS:
g_value_take_boxed (value, rtp_session_create_stats (sess));
break;
+ case PROP_RTP_PROFILE:
+ g_value_set_enum (value, sess->rtp_profile);
+ break;
+ case PROP_RTCP_REDUCED_SIZE:
+ g_value_set_boolean (value, sess->reduced_size_rtcp);
+ break;
+ case PROP_RTCP_DISABLE_SR_TIMESTAMP:
+ g_value_set_boolean (value, !sess->timestamp_sender_reports);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
g_object_unref (source);
}
+static void
+on_new_sender_ssrc (RTPSession * sess, RTPSource * source)
+{
+ g_object_ref (source);
+ RTP_SESSION_UNLOCK (sess);
+ g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC], 0,
+ source);
+ RTP_SESSION_LOCK (sess);
+ g_object_unref (source);
+}
+
+static void
+on_sender_ssrc_active (RTPSession * sess, RTPSource * source)
+{
+ g_object_ref (source);
+ RTP_SESSION_UNLOCK (sess);
+ g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE], 0,
+ source);
+ RTP_SESSION_LOCK (sess);
+ g_object_unref (source);
+}
+
/**
* rtp_session_new:
*
}
/**
+ * rtp_session_reset:
+ * @sess: an #RTPSession
+ *
+ * Reset the sources of @sess.
+ */
+void
+rtp_session_reset (RTPSession * sess)
+{
+ g_return_if_fail (RTP_IS_SESSION (sess));
+
+ /* remove all sources */
+ g_hash_table_remove_all (sess->ssrcs[sess->mask_idx]);
+ sess->total_sources = 0;
+ sess->stats.sender_sources = 0;
+ sess->stats.internal_sender_sources = 0;
+ sess->stats.internal_sources = 0;
+ sess->stats.active_sources = 0;
+
+ sess->generation = 0;
+ sess->first_rtcp = TRUE;
+ sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
+ sess->last_rtcp_check_time = GST_CLOCK_TIME_NONE;
+ sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE;
+ sess->last_rtcp_interval = GST_CLOCK_TIME_NONE;
+ sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
+ sess->scheduled_bye = FALSE;
+
+ /* reset session stats */
+ sess->stats.bye_members = 0;
+ sess->stats.nacks_dropped = 0;
+ sess->stats.nacks_sent = 0;
+ sess->stats.nacks_received = 0;
+
+ sess->is_doing_ptp = TRUE;
+
+ g_list_free_full (sess->conflicting_addresses,
+ (GDestroyNotify) rtp_conflicting_address_free);
+ sess->conflicting_addresses = NULL;
+}
+
+/**
* rtp_session_set_callbacks:
* @sess: an #RTPSession
* @callbacks: callbacks to configure
sess->callbacks.notify_nack = callbacks->notify_nack;
sess->notify_nack_user_data = user_data;
}
+ if (callbacks->notify_twcc) {
+ sess->callbacks.notify_twcc = callbacks->notify_twcc;
+ sess->notify_twcc_user_data = user_data;
+ }
if (callbacks->reconfigure) {
sess->callbacks.reconfigure = callbacks->reconfigure;
sess->reconfigure_user_data = user_data;
}
+ if (callbacks->notify_early_rtcp) {
+ sess->callbacks.notify_early_rtcp = callbacks->notify_early_rtcp;
+ sess->notify_early_rtcp_user_data = user_data;
+ }
}
/**
* @sess: an #RTPSession
* @bandwidth: the bandwidth allocated
*
- * Set the session bandwidth in bytes per second.
+ * Set the session bandwidth in bits per second.
*/
void
rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth)
* @sess: an #RTPSession
* @bandwidth: the RTCP bandwidth
*
- * Set the bandwidth in bytes per second that should be used for RTCP
+ * Set the bandwidth in bits per second that should be used for RTCP
* messages.
*/
void
return result;
}
+static void
+source_set_sdes (const gchar * key, RTPSource * source, GstStructure * sdes)
+{
+ rtp_source_set_sdes_struct (source, gst_structure_copy (sdes));
+}
+
/**
* rtp_session_set_sdes_struct:
* @sess: an #RTSPSession
if (sess->sdes)
gst_structure_free (sess->sdes);
sess->sdes = gst_structure_copy (sdes);
+
+ g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+ (GHFunc) source_set_sdes, sess->sdes);
RTP_SESSION_UNLOCK (sess);
}
add_conflicting_address (sess->conflicting_addresses, address, time);
}
+static void
+rtp_session_have_conflict (RTPSession * sess, RTPSource * source,
+ GSocketAddress * address, GstClockTime current_time)
+{
+ guint32 ssrc = rtp_source_get_ssrc (source);
+
+ /* Its a new collision, lets change our SSRC */
+ rtp_session_add_conflicting_address (sess, address, current_time);
+
+ /* mark the source BYE */
+ rtp_source_mark_bye (source, "SSRC Collision");
+ /* if we were suggesting this SSRC, change to something else */
+ if (sess->suggested_ssrc == ssrc) {
+ sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
+ sess->internal_ssrc_set = TRUE;
+ }
+
+ on_ssrc_collision (sess, source);
+
+ rtp_session_schedule_bye_locked (sess, current_time);
+}
static gboolean
check_collision (RTPSession * sess, RTPSource * source,
*/
GST_DEBUG ("Our packets are being looped back to us, dropping");
} else {
- /* Its a new collision, lets change our SSRC */
- rtp_session_add_conflicting_address (sess, pinfo->address,
- pinfo->current_time);
-
- GST_DEBUG ("Collision for SSRC %x", ssrc);
- /* mark the source BYE */
- rtp_source_mark_bye (source, "SSRC Collision");
- /* if we were suggesting this SSRC, change to something else */
- if (sess->suggested_ssrc == ssrc)
- sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
+ GST_DEBUG ("Collision for SSRC %x from new incoming packet,"
+ " change our sender ssrc", ssrc);
- on_ssrc_collision (sess, source);
-
- rtp_session_schedule_bye_locked (sess, pinfo->current_time);
+ rtp_session_have_conflict (sess, source, pinfo->address,
+ pinfo->current_time);
}
}
sess->stats.active_sources++;
if (src->internal) {
sess->stats.internal_sources++;
- if (sess->suggested_ssrc != src->ssrc)
+ if (!sess->internal_ssrc_from_caps_or_property
+ && sess->suggested_ssrc != src->ssrc) {
sess->suggested_ssrc = src->ssrc;
+ sess->internal_ssrc_set = TRUE;
+ }
}
/* update point-to-point status */
/* for RTP packets we need to set the source in probation. Receiving RTCP
* packets of an SSRC, on the other hand, is a strong indication that we
* are dealing with a valid source. */
- if (rtp)
- g_object_set (source, "probation", sess->probation, NULL);
- else
- g_object_set (source, "probation", 0, NULL);
+ g_object_set (source, "probation", rtp ? sess->probation : 0,
+ "max-dropout-time", sess->max_dropout_time, "max-misorder-time",
+ sess->max_misorder_time, NULL);
/* store from address, if any */
if (pinfo->address) {
/**
* rtp_session_suggest_ssrc:
* @sess: a #RTPSession
+ * @is_random: if the suggested ssrc is random
*
* Suggest an unused SSRC in @sess.
*
* Returns: a free unused SSRC
*/
guint32
-rtp_session_suggest_ssrc (RTPSession * sess)
+rtp_session_suggest_ssrc (RTPSession * sess, gboolean * is_random)
{
guint32 result;
RTP_SESSION_LOCK (sess);
result = sess->suggested_ssrc;
+ if (is_random)
+ *is_random = !sess->internal_ssrc_set;
RTP_SESSION_UNLOCK (sess);
return result;
return ssrc;
}
-
-/**
- * rtp_session_create_source:
- * @sess: an #RTPSession
- *
- * Create an #RTPSource for use in @sess. This function will create a source
- * with an ssrc that is currently not used by any participants in the session.
- *
- * Returns: an #RTPSource.
- */
-RTPSource *
-rtp_session_create_source (RTPSession * sess)
-{
- guint32 ssrc;
- RTPSource *source;
-
- RTP_SESSION_LOCK (sess);
- ssrc = rtp_session_create_new_ssrc (sess);
- source = rtp_source_new (ssrc);
- rtp_source_set_callbacks (source, &callbacks, sess);
- /* we need an additional ref for the source in the hashtable */
- g_object_ref (source);
- add_source (sess, source);
- RTP_SESSION_UNLOCK (sess);
-
- return source;
-}
-
static gboolean
update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo)
{
pinfo->seqnum = gst_rtp_buffer_get_seq (&rtp);
pinfo->pt = gst_rtp_buffer_get_payload_type (&rtp);
pinfo->rtptime = gst_rtp_buffer_get_timestamp (&rtp);
+ pinfo->marker = gst_rtp_buffer_get_marker (&rtp);
/* copy available csrc */
pinfo->csrc_count = gst_rtp_buffer_get_csrc_count (&rtp);
for (i = 0; i < pinfo->csrc_count; i++)
pinfo->csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i);
+
+ /* RTP header extensions */
+ pinfo->header_ext = gst_rtp_buffer_get_extension_bytes (&rtp,
+ &pinfo->header_ext_bit_pattern);
}
gst_rtp_buffer_unmap (&rtp);
}
/* update the RTPPacketInfo structure with the current time and other bits
* about the current buffer we are handling.
* This function is typically called when a validated packet is received.
- * This function should be called with the SESSION_LOCK
+ * This function should be called with the RTP_SESSION_LOCK
*/
static gboolean
update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
pinfo->bytes = 0;
pinfo->payload_len = 0;
pinfo->packets = 0;
+ pinfo->marker = FALSE;
if (is_list) {
GstBufferList *list = GST_BUFFER_LIST_CAST (data);
GstBuffer *buffer = GST_BUFFER_CAST (data);
res = update_packet (&buffer, 0, pinfo);
}
+
return res;
}
gst_mini_object_unref (pinfo->data);
pinfo->data = NULL;
}
+ if (pinfo->header_ext)
+ g_bytes_unref (pinfo->header_ext);
+}
+
+static gint32
+packet_info_get_twcc_seqnum (RTPPacketInfo * pinfo, guint8 ext_id)
+{
+ gint32 val = -1;
+ gpointer data;
+ guint size;
+
+ if (pinfo->header_ext &&
+ gst_rtp_buffer_get_extension_onebyte_header_from_bytes (pinfo->header_ext,
+ pinfo->header_ext_bit_pattern, ext_id, 0, &data, &size)) {
+ if (size == 2)
+ val = GST_READ_UINT16_BE (data);
+ }
+ return val;
}
static gboolean
return TRUE;
}
+static void
+process_twcc_packet (RTPSession * sess, RTPPacketInfo * pinfo)
+{
+ gint32 twcc_seqnum;
+
+ if (sess->twcc_recv_ext_id == 0)
+ return;
+
+ twcc_seqnum = packet_info_get_twcc_seqnum (pinfo, sess->twcc_recv_ext_id);
+ if (twcc_seqnum == -1)
+ return;
+
+ if (rtp_twcc_manager_recv_packet (sess->twcc, twcc_seqnum, pinfo)) {
+ RTP_SESSION_UNLOCK (sess);
+
+ /* TODO: find a better rational for this number, and possibly tune it based
+ on factors like framerate / bandwidth etc */
+ if (!rtp_session_send_rtcp (sess, 100 * GST_MSECOND)) {
+ GST_INFO ("Could not schedule TWCC straight away");
+ }
+ RTP_SESSION_LOCK (sess);
+ }
+}
+
static gboolean
source_update_sender (RTPSession * sess, RTPSource * source,
gboolean prevsender)
current_time, running_time, ntpnstime)) {
GST_DEBUG ("invalid RTP packet received");
RTP_SESSION_UNLOCK (sess);
- return rtp_session_process_rtcp (sess, buffer, current_time, ntpnstime);
+ return rtp_session_process_rtcp (sess, buffer, current_time, running_time,
+ ntpnstime);
}
ssrc = pinfo.ssrc;
prevactive = RTP_SOURCE_IS_ACTIVE (source);
oldrate = source->bitrate;
+ if (created)
+ on_new_ssrc (sess, source);
+
/* let source process the packet */
result = rtp_source_process_rtp (source, &pinfo);
+ process_twcc_packet (sess, &pinfo);
/* source became active */
if (source_update_active (sess, source, prevactive))
if (oldrate != source->bitrate)
sess->recalc_bandwidth = TRUE;
- if (created)
- on_new_ssrc (sess, source);
if (source->validated) {
gboolean created;
value = g_strndup ((const gchar *) data, len);
- gst_structure_set (sdes, name, G_TYPE_STRING, value, NULL);
+ if (g_utf8_validate (value, -1, NULL)) {
+ gst_structure_set (sdes, name, G_TYPE_STRING, value, NULL);
+ } else {
+ GST_WARNING ("ignore SDES field %s with non-utf8 data %s", name, value);
+ }
g_free (name);
g_free (value);
for (i = 0; i < count; i++) {
guint32 ssrc;
RTPSource *source;
- gboolean created, prevactive, prevsender;
+ gboolean prevactive, prevsender;
guint pmembers, members;
ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
GST_DEBUG ("SSRC: %08x", ssrc);
/* find src and mark bye, no probation when dealing with RTCP */
- source = obtain_source (sess, ssrc, &created, pinfo, FALSE);
- if (!source)
- return;
-
- if (source->internal) {
- /* our own source, something weird with this packet */
- g_object_unref (source);
- continue;
+ source = find_source (sess, ssrc);
+ if (!source || source->internal) {
+ GST_DEBUG ("Ignoring suspicious BYE packet (reason: %s)",
+ !source ? "can't find source" : "has internal source SSRC");
+ break;
}
/* store time for when we need to time out this source */
pinfo->current_time < sess->next_rtcp_check_time) {
GstClockTime time_remaining;
- time_remaining = sess->next_rtcp_check_time - pinfo->current_time;
- sess->next_rtcp_check_time =
- gst_util_uint64_scale (time_remaining, members, pmembers);
+ /* Scale our next RTCP check time according to the change of numbers
+ * of members. But only if a) this is the first RTCP, or b) this is not
+ * a feedback session, or c) this is a feedback session but we schedule
+ * for every RTCP interval (aka no t-rr-interval set).
+ *
+ * FIXME: a) and b) are not great as we will possibly go below Tmin
+ * for non-feedback profiles and in case of a) below
+ * Tmin/t-rr-interval in any case.
+ */
+ if (sess->last_rtcp_send_time == GST_CLOCK_TIME_NONE ||
+ !(sess->rtp_profile == GST_RTP_PROFILE_AVPF
+ || sess->rtp_profile == GST_RTP_PROFILE_SAVPF) ||
+ sess->next_rtcp_check_time - sess->last_rtcp_send_time ==
+ sess->last_rtcp_interval) {
+ time_remaining = sess->next_rtcp_check_time - pinfo->current_time;
+ sess->next_rtcp_check_time =
+ gst_util_uint64_scale (time_remaining, members, pmembers);
+ sess->next_rtcp_check_time += pinfo->current_time;
+ }
+ sess->last_rtcp_interval =
+ gst_util_uint64_scale (sess->last_rtcp_interval, members, pmembers);
GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
GST_TIME_ARGS (sess->next_rtcp_check_time));
- sess->next_rtcp_check_time += pinfo->current_time;
-
/* mark pending reconsider. We only want to signal the reconsideration
* once after we handled all the source in the bye packet */
reconsider = TRUE;
}
}
- if (created)
- on_new_ssrc (sess, source);
-
on_bye_ssrc (sess, source);
-
- g_object_unref (source);
}
if (reconsider) {
RTP_SESSION_UNLOCK (sess);
sess->callbacks.reconsider (sess, sess->reconsider_user_data);
RTP_SESSION_LOCK (sess);
}
+
g_free (reason);
}
RTPPacketInfo * pinfo)
{
GST_DEBUG ("received APP");
+
+ if (g_signal_has_handler_pending (sess,
+ rtp_session_signals[SIGNAL_ON_APP_RTCP], 0, TRUE)) {
+ GstBuffer *data_buffer = NULL;
+ guint16 data_length;
+ gchar name[5];
+
+ data_length = gst_rtcp_packet_app_get_data_length (packet) * 4;
+ if (data_length > 0) {
+ guint8 *data = gst_rtcp_packet_app_get_data (packet);
+ data_buffer = gst_buffer_copy_region (packet->rtcp->buffer,
+ GST_BUFFER_COPY_MEMORY, data - packet->rtcp->map.data, data_length);
+ GST_BUFFER_PTS (data_buffer) = pinfo->running_time;
+ }
+
+ memcpy (name, gst_rtcp_packet_app_get_name (packet), 4);
+ name[4] = '\0';
+
+ RTP_SESSION_UNLOCK (sess);
+ g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_APP_RTCP], 0,
+ gst_rtcp_packet_app_get_subtype (packet),
+ gst_rtcp_packet_app_get_ssrc (packet), name, data_buffer);
+ RTP_SESSION_LOCK (sess);
+
+ if (data_buffer)
+ gst_buffer_unref (data_buffer);
+ }
}
static gboolean
rtp_session_request_local_key_unit (RTPSession * sess, RTPSource * src,
- gboolean fir, GstClockTime current_time)
+ guint32 media_ssrc, gboolean fir, GstClockTime current_time)
{
guint32 round_trip = 0;
rtp_source_get_last_rb (src, NULL, NULL, NULL, NULL, NULL, NULL, &round_trip);
- if (sess->last_keyframe_request != GST_CLOCK_TIME_NONE && round_trip) {
+ if (src->last_keyframe_request != GST_CLOCK_TIME_NONE && round_trip) {
GstClockTime round_trip_in_ns = gst_util_uint64_scale (round_trip,
GST_SECOND, 65536);
- if (current_time - sess->last_keyframe_request < 2 * round_trip_in_ns) {
- GST_DEBUG ("Ignoring %s request because one was send without one "
+ /* Sanity check to avoid always ignoring PLI/FIR if we receive RTCP
+ * packets with erroneous values resulting in crazy high RTT. */
+ if (round_trip_in_ns > 5 * GST_SECOND)
+ round_trip_in_ns = GST_SECOND / 2;
+
+ if (current_time - src->last_keyframe_request < 2 * round_trip_in_ns) {
+ GST_DEBUG ("Ignoring %s request from %X because one was send without one "
"RTT (%" GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")",
- fir ? "FIR" : "PLI",
- GST_TIME_ARGS (current_time - sess->last_keyframe_request),
+ fir ? "FIR" : "PLI", rtp_source_get_ssrc (src),
+ GST_TIME_ARGS (current_time - src->last_keyframe_request),
GST_TIME_ARGS (round_trip_in_ns));
return FALSE;
}
}
- sess->last_keyframe_request = current_time;
+ src->last_keyframe_request = current_time;
- GST_LOG ("received %s request from %X %p(%p)", fir ? "FIR" : "PLI",
- rtp_source_get_ssrc (src), sess->callbacks.process_rtp,
+ GST_LOG ("received %s request from %X about %X %p(%p)", fir ? "FIR" : "PLI",
+ rtp_source_get_ssrc (src), media_ssrc, sess->callbacks.process_rtp,
sess->callbacks.request_key_unit);
RTP_SESSION_UNLOCK (sess);
- sess->callbacks.request_key_unit (sess, fir,
+ sess->callbacks.request_key_unit (sess, media_ssrc, fir,
sess->request_key_unit_user_data);
RTP_SESSION_LOCK (sess);
return;
src = find_source (sess, sender_ssrc);
- if (src == NULL)
- return;
+ if (src == NULL) {
+ /* try to find a src with media_ssrc instead */
+ src = find_source (sess, media_ssrc);
+ if (src == NULL)
+ return;
+ }
- rtp_session_request_local_key_unit (sess, src, FALSE, current_time);
+ rtp_session_request_local_key_unit (sess, src, media_ssrc, FALSE,
+ current_time);
}
static void
rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc,
- guint8 * fci_data, guint fci_length, GstClockTime current_time)
+ guint32 media_ssrc, guint8 * fci_data, guint fci_length,
+ GstClockTime current_time)
{
RTPSource *src;
guint32 ssrc;
if (!our_request)
return;
- rtp_session_request_local_key_unit (sess, src, TRUE, current_time);
+ rtp_session_request_local_key_unit (sess, src, media_ssrc, TRUE,
+ current_time);
}
static void
}
static void
+rtp_session_process_twcc (RTPSession * sess, guint32 sender_ssrc,
+ guint32 media_ssrc, guint8 * fci_data, guint fci_length)
+{
+ GArray *twcc_packets;
+ GstStructure *twcc_packets_s;
+ GstStructure *twcc_stats_s;
+
+ twcc_packets = rtp_twcc_manager_parse_fci (sess->twcc,
+ fci_data, fci_length * sizeof (guint32));
+ if (twcc_packets == NULL)
+ return;
+
+ twcc_packets_s = rtp_twcc_stats_get_packets_structure (twcc_packets);
+ twcc_stats_s =
+ rtp_twcc_stats_process_packets (sess->twcc_stats, twcc_packets);
+
+ GST_DEBUG_OBJECT (sess, "Parsed TWCC: %" GST_PTR_FORMAT, twcc_packets_s);
+ GST_INFO_OBJECT (sess, "Current TWCC stats %" GST_PTR_FORMAT, twcc_stats_s);
+
+ g_array_unref (twcc_packets);
+
+ RTP_SESSION_UNLOCK (sess);
+ if (sess->callbacks.notify_twcc)
+ sess->callbacks.notify_twcc (sess, twcc_packets_s, twcc_stats_s,
+ sess->notify_twcc_user_data);
+ RTP_SESSION_LOCK (sess);
+}
+
+static void
rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet,
RTPPacketInfo * pinfo, GstClockTime current_time)
{
- GstRTCPType type = gst_rtcp_packet_get_type (packet);
- GstRTCPFBType fbtype = gst_rtcp_packet_fb_get_type (packet);
- guint32 sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet);
- guint32 media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet);
- guint8 *fci_data = gst_rtcp_packet_fb_get_fci (packet);
- guint fci_length = 4 * gst_rtcp_packet_fb_get_fci_length (packet);
+ GstRTCPType type;
+ GstRTCPFBType fbtype;
+ guint32 sender_ssrc, media_ssrc;
+ guint8 *fci_data;
+ guint fci_length;
RTPSource *src;
+ /* The feedback packet must include both sender SSRC and media SSRC */
+ if (packet->length < 2)
+ return;
+
+ type = gst_rtcp_packet_get_type (packet);
+ fbtype = gst_rtcp_packet_fb_get_type (packet);
+ sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet);
+ media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet);
+
src = find_source (sess, media_ssrc);
/* skip non-bye packets for sources that are marked BYE */
if (sess->scheduled_bye && src && RTP_SOURCE_IS_MARKED_BYE (src))
return;
+ if (src)
+ g_object_ref (src);
+
+ fci_data = gst_rtcp_packet_fb_get_fci (packet);
+ fci_length = gst_rtcp_packet_fb_get_fci_length (packet) * sizeof (guint32);
+
GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of "
"length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length);
fci_buffer = gst_buffer_copy_region (packet->rtcp->buffer,
GST_BUFFER_COPY_MEMORY, fci_data - packet->rtcp->map.data,
fci_length);
- GST_BUFFER_TIMESTAMP (fci_buffer) = pinfo->running_time;
+ GST_BUFFER_PTS (fci_buffer) = pinfo->running_time;
}
RTP_SESSION_UNLOCK (sess);
gst_buffer_unref (fci_buffer);
}
- if (src && sess->rtcp_feedback_retention_window) {
+ if (src && sess->rtcp_feedback_retention_window != GST_CLOCK_TIME_NONE) {
rtp_source_retain_rtcp_packet (src, packet, pinfo->running_time);
}
if ((src && src->internal) ||
/* PSFB FIR puts the media ssrc inside the FCI */
- (type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR)) {
+ (type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR) ||
+ /* TWCC is for all sources, so a single media-ssrc is not enough */
+ (type == GST_RTCP_TYPE_RTPFB && fbtype == GST_RTCP_RTPFB_TYPE_TWCC)) {
switch (type) {
case GST_RTCP_TYPE_PSFB:
switch (fbtype) {
case GST_RTCP_PSFB_TYPE_PLI:
- src->stats.recv_pli_count++;
+ if (src)
+ src->stats.recv_pli_count++;
rtp_session_process_pli (sess, sender_ssrc, media_ssrc,
current_time);
break;
case GST_RTCP_PSFB_TYPE_FIR:
- src->stats.recv_fir_count++;
- rtp_session_process_fir (sess, sender_ssrc, fci_data, fci_length,
- current_time);
+ if (src)
+ src->stats.recv_fir_count++;
+ rtp_session_process_fir (sess, sender_ssrc, media_ssrc, fci_data,
+ fci_length, current_time);
break;
default:
break;
case GST_RTCP_TYPE_RTPFB:
switch (fbtype) {
case GST_RTCP_RTPFB_TYPE_NACK:
+ if (src)
+ src->stats.recv_nack_count++;
rtp_session_process_nack (sess, sender_ssrc, media_ssrc,
fci_data, fci_length, current_time);
break;
+ case GST_RTCP_RTPFB_TYPE_TWCC:
+ rtp_session_process_twcc (sess, sender_ssrc, media_ssrc,
+ fci_data, fci_length);
+ break;
default:
break;
}
break;
}
}
+
+ if (src)
+ g_object_unref (src);
}
/**
*/
GstFlowReturn
rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
- GstClockTime current_time, guint64 ntpnstime)
+ GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
{
GstRTCPPacket packet;
gboolean more, is_bye = FALSE, do_sync = FALSE;
g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
- if (!gst_rtcp_buffer_validate (buffer))
+ if (!gst_rtcp_buffer_validate_reduced (buffer))
goto invalid_packet;
GST_DEBUG ("received RTCP packet");
RTP_SESSION_LOCK (sess);
/* update pinfo stats */
update_packet_info (sess, &pinfo, FALSE, FALSE, FALSE, buffer, current_time,
- -1, ntpnstime);
+ running_time, ntpnstime);
/* start processing the compound packet */
gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
case GST_RTCP_TYPE_PSFB:
rtp_session_process_feedback (sess, &packet, &pinfo, current_time);
break;
+ case GST_RTCP_TYPE_XR:
+ /* FIXME: This block is added to downgrade warning level.
+ * Once the parser is implemented, it should be replaced with
+ * a proper process function. */
+ GST_DEBUG ("got RTCP XR packet, but ignored");
+ break;
default:
- GST_WARNING ("got unknown RTCP packet");
+ GST_WARNING ("got unknown RTCP packet type: %d", type);
break;
}
more = gst_rtcp_packet_move_to_next (&packet);
}
}
+static guint8
+_get_extmap_id_for_attribute (const GstStructure * s, const gchar * ext_name)
+{
+ guint i;
+ guint8 extmap_id = 0;
+ guint n_fields = gst_structure_n_fields (s);
+
+ for (i = 0; i < n_fields; i++) {
+ const gchar *field_name = gst_structure_nth_field_name (s, i);
+ if (g_str_has_prefix (field_name, "extmap-")) {
+ const gchar *str = gst_structure_get_string (s, field_name);
+ if (str && g_strcmp0 (str, ext_name) == 0) {
+ gint64 id = g_ascii_strtoll (field_name + 7, NULL, 10);
+ if (id > 0 && id < 15) {
+ extmap_id = id;
+ break;
+ }
+ }
+ }
+ }
+ return extmap_id;
+}
+
/**
* rtp_session_update_send_caps:
* @sess: an #RTPSession
RTP_SESSION_LOCK (sess);
source = obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
+ sess->suggested_ssrc = ssrc;
+ sess->internal_ssrc_set = TRUE;
+ sess->internal_ssrc_from_caps_or_property = TRUE;
if (source) {
rtp_source_update_caps (source, caps);
+
+ if (created)
+ on_new_sender_ssrc (sess, source);
+
g_object_unref (source);
}
obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE);
if (source) {
rtp_source_update_caps (source, caps);
+
+ if (created)
+ on_new_sender_ssrc (sess, source);
+
g_object_unref (source);
}
}
RTP_SESSION_UNLOCK (sess);
+ } else {
+ sess->internal_ssrc_from_caps_or_property = FALSE;
+ }
+
+ sess->twcc_send_ext_id = _get_extmap_id_for_attribute (s, TWCC_EXTMAP_STR);
+ if (sess->twcc_send_ext_id > 0) {
+ GST_INFO ("TWCC enabled for send using extension id: %u",
+ sess->twcc_send_ext_id);
}
}
+static void
+send_twcc_packet (RTPSession * sess, RTPPacketInfo * pinfo)
+{
+ gint32 twcc_seqnum;
+
+ if (sess->twcc_send_ext_id == 0)
+ return;
+
+ twcc_seqnum = packet_info_get_twcc_seqnum (pinfo, sess->twcc_send_ext_id);
+ if (twcc_seqnum == -1)
+ return;
+
+ rtp_twcc_manager_send_packet (sess->twcc, twcc_seqnum, pinfo);
+}
+
+
/**
* rtp_session_send_rtp:
* @sess: an #RTPSession
* @current_time: the current system time
* @running_time: the running time of @data
*
- * Send the RTP buffer in the session manager. This function takes ownership of
- * @buffer.
+ * Send the RTP data (a buffer or buffer list) in the session manager. This
+ * function takes ownership of @data.
*
* Returns: a #GstFlowReturn.
*/
current_time, running_time, -1))
goto invalid_packet;
+ send_twcc_packet (sess, &pinfo);
+
source = obtain_internal_source (sess, pinfo.ssrc, &created, current_time);
+ if (created)
+ on_new_sender_ssrc (sess, source);
+
+ if (!source->internal) {
+ GSocketAddress *from;
+
+ if (source->rtp_from)
+ from = source->rtp_from;
+ else
+ from = source->rtcp_from;
+ if (from) {
+ if (rtp_session_find_conflicting_address (sess, from, current_time)) {
+ /* Its a known conflict, its probably a loop, not a collision
+ * lets just drop the incoming packet
+ */
+ GST_LOG ("Our packets are being looped back to us, ignoring collision");
+ } else {
+ GST_DEBUG ("Collision for SSRC %x, change our sender ssrc", pinfo.ssrc);
+
+ rtp_session_have_conflict (sess, source, from, current_time);
+
+ goto collision;
+ }
+ } else {
+ GST_LOG ("Ignoring collision on sent SSRC %x because remote source"
+ " doesn't have an address", pinfo.ssrc);
+ }
+ }
prevsender = RTP_SOURCE_IS_SENDER (source);
oldrate = source->bitrate;
GST_DEBUG ("invalid RTP packet received");
return GST_FLOW_OK;
}
+collision:
+ {
+ g_object_unref (source);
+ clean_packet_info (&pinfo);
+ RTP_SESSION_UNLOCK (sess);
+ GST_WARNING ("non-internal source with same ssrc %08x, drop packet",
+ pinfo.ssrc);
+ return GST_FLOW_OK;
+ }
}
static void
stats = &sess->bye_stats;
result = rtp_stats_calculate_bye_interval (stats);
} else {
+ session_update_ptp (sess);
+
stats = &sess->stats;
result = rtp_stats_calculate_rtcp_interval (stats,
- stats->internal_sender_sources > 0, first);
+ stats->internal_sender_sources > 0, sess->rtp_profile,
+ sess->is_doing_ptp, first);
}
GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d",
INIT_AVG (sess->bye_stats.avg_rtcp_packet_size, 100);
sess->bye_stats.bye_members = 1;
sess->first_rtcp = TRUE;
- sess->allow_early = TRUE;
/* reschedule transmission */
sess->last_rtcp_send_time = current_time;
+ sess->last_rtcp_check_time = current_time;
interval = calculate_rtcp_interval (sess, FALSE, TRUE);
if (interval != GST_CLOCK_TIME_NONE)
sess->next_rtcp_check_time = current_time + interval;
else
sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
+ sess->last_rtcp_interval = interval;
GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
GST_DEBUG ("reconsider BYE, more than 50 sources");
/* reconsider BYE if members >= 50 */
interval = calculate_rtcp_interval (sess, FALSE, TRUE);
+ sess->last_rtcp_interval = interval;
}
} else {
if (sess->first_rtcp) {
GST_DEBUG ("first RTCP packet");
/* we are called for the first time */
interval = calculate_rtcp_interval (sess, FALSE, TRUE);
+ sess->last_rtcp_interval = interval;
} else if (sess->next_rtcp_check_time < current_time) {
GST_DEBUG ("old check time expired, getting new timeout");
/* get a new timeout when we need to */
interval = calculate_rtcp_interval (sess, FALSE, FALSE);
+ sess->last_rtcp_interval = interval;
+
+ if ((sess->rtp_profile == GST_RTP_PROFILE_AVPF
+ || sess->rtp_profile == GST_RTP_PROFILE_SAVPF)
+ && interval != GST_CLOCK_TIME_NONE) {
+ /* Apply the rules from RFC 4585 section 3.5.3 */
+ if (sess->stats.min_interval != 0) {
+ GstClockTime T_rr_current_interval = g_random_double_range (0.5,
+ 1.5) * sess->stats.min_interval * GST_SECOND;
+
+ if (T_rr_current_interval > interval) {
+ GST_DEBUG ("Adjusting interval for t-rr-interval: %" GST_TIME_FORMAT
+ " > %" GST_TIME_FORMAT, GST_TIME_ARGS (T_rr_current_interval),
+ GST_TIME_ARGS (interval));
+ interval = T_rr_current_interval;
+ }
+ }
+ }
}
}
gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp);
+ if (data->is_early && sess->reduced_size_rtcp)
+ return;
+
if (RTP_SOURCE_IS_SENDER (own)) {
guint64 ntptime;
guint32 rtptime;
/* fill in sender report info */
gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
- ntptime, rtptime, packet_count, octet_count);
+ sess->timestamp_sender_reports ? ntptime : 0,
+ sess->timestamp_sender_reports ? rtptime : 0,
+ packet_count, octet_count);
} else {
/* we are only receiver, create RR */
GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
return;
}
- /* only report about other sender */
- if (source == data->source)
+ /* only report about remote sources */
+ if (source->internal)
goto reported;
if (!RTP_SOURCE_IS_SENDER (source)) {
goto reported;
}
+ if (source->disable_rtcp) {
+ GST_DEBUG ("source %08x has RTCP disabled", source->ssrc);
+ goto reported;
+ }
+
GST_DEBUG ("create RB for SSRC %08x", source->ssrc);
/* get new stats */
static void
session_nack (const gchar * key, RTPSource * source, ReportData * data)
{
+ RTPSession *sess = data->sess;
GstRTCPBuffer *rtcp = &data->rtcpbuf;
GstRTCPPacket *packet = &data->packet;
- guint32 *nacks;
- guint n_nacks, i;
+ guint16 *nacks;
+ GstClockTime *nack_deadlines;
+ guint n_nacks, i = 0;
+ guint nacked_seqnums = 0;
+ guint16 n_fb_nacks = 0;
guint8 *fci_data;
if (!source->send_nack)
return;
+ nacks = rtp_source_get_nacks (source, &n_nacks);
+ nack_deadlines = rtp_source_get_nack_deadlines (source, NULL);
+ GST_DEBUG ("%u NACKs current time %" GST_TIME_FORMAT, n_nacks,
+ GST_TIME_ARGS (data->current_time));
+
+ /* cleanup expired nacks */
+ for (i = 0; i < n_nacks; i++) {
+ GST_DEBUG ("#%u deadline %" GST_TIME_FORMAT, nacks[i],
+ GST_TIME_ARGS (nack_deadlines[i]));
+ if (nack_deadlines[i] >= data->current_time)
+ break;
+ }
+
+ if (data->is_early) {
+ /* don't remove them all if this is an early RTCP packet. It may happen
+ * that the NACKs are late due to high RTT, not sending NACKs at all would
+ * keep the RTX RTT stats high and maintain a dropping state. */
+ i = MIN (n_nacks - 1, i);
+ }
+
+ if (i) {
+ GST_WARNING ("Removing %u expired NACKS", i);
+ rtp_source_clear_nacks (source, i);
+ n_nacks -= i;
+ if (n_nacks == 0)
+ return;
+ }
+
+ /* allow overriding NACK to packet conversion */
+ if (g_signal_has_handler_pending (sess,
+ rtp_session_signals[SIGNAL_ON_SENDING_NACKS], 0, TRUE)) {
+ /* this is needed as it will actually resize the buffer */
+ gst_rtcp_buffer_unmap (rtcp);
+
+ g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_NACKS], 0,
+ data->source->ssrc, source->ssrc, source->nacks, data->rtcp,
+ &nacked_seqnums);
+
+ /* and now remap for the remaining work */
+ gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp);
+
+ if (nacked_seqnums > 0)
+ goto done;
+ }
+
if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RTPFB, packet))
/* exit because the packet is full, will put next request in a
* further packet */
gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc);
gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc);
- nacks = rtp_source_get_nacks (source, &n_nacks);
- GST_DEBUG ("%u NACKs", n_nacks);
- if (!gst_rtcp_packet_fb_set_fci_length (packet, n_nacks))
+ if (!gst_rtcp_packet_fb_set_fci_length (packet, 1)) {
+ gst_rtcp_packet_remove (packet);
+ GST_WARNING ("no nacks fit in the packet");
return;
+ }
fci_data = gst_rtcp_packet_fb_get_fci (packet);
- for (i = 0; i < n_nacks; i++) {
- GST_WRITE_UINT32_BE (fci_data, nacks[i]);
+ for (i = 0; i < n_nacks; i = nacked_seqnums) {
+ guint16 seqnum = nacks[i];
+ guint16 blp = 0;
+ guint j;
+
+ if (!gst_rtcp_packet_fb_set_fci_length (packet, n_fb_nacks + 1))
+ break;
+
+ n_fb_nacks++;
+ nacked_seqnums++;
+
+ for (j = i + 1; j < n_nacks; j++) {
+ gint diff;
+
+ diff = gst_rtp_buffer_compare_seqnum (seqnum, nacks[j]);
+ GST_TRACE ("[%u][%u] %u %u diff %i", i, j, seqnum, nacks[j], diff);
+ if (diff > 16)
+ break;
+
+ blp |= 1 << (diff - 1);
+ nacked_seqnums++;
+ }
+
+ GST_WRITE_UINT32_BE (fci_data, seqnum << 16 | blp);
fci_data += 4;
- data->nacked_seqnums++;
}
- rtp_source_clear_nacks (source);
+ GST_DEBUG ("Sent %u seqnums into %u FB NACKs", nacked_seqnums, n_fb_nacks);
+ source->stats.sent_nack_count += n_fb_nacks;
+
+done:
+ data->nacked_seqnums += nacked_seqnums;
+ rtp_source_clear_nacks (source, nacked_seqnums);
data->may_suppress = FALSE;
}
/* check for outdated collisions */
if (source->internal) {
GST_DEBUG ("Timing out collisions for %x", source->ssrc);
- rtp_source_timeout (source, data->current_time,
- data->running_time - sess->rtcp_feedback_retention_window);
+ rtp_source_timeout (source, data->current_time, data->running_time,
+ sess->rtcp_feedback_retention_window);
}
/* nothing else to do when without RTCP */
else
data->is_early = FALSE;
- if (data->is_early && sess->next_early_rtcp_time < current_time) {
- GST_DEBUG ("early feedback %" GST_TIME_FORMAT " < now %"
+ if (data->is_early && sess->next_early_rtcp_time <= current_time) {
+ GST_DEBUG ("early feedback %" GST_TIME_FORMAT " <= now %"
GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_early_rtcp_time),
GST_TIME_ARGS (current_time));
- goto early;
- }
-
- /* no need to check yet */
- if (sess->next_rtcp_check_time == GST_CLOCK_TIME_NONE ||
+ } else if (sess->next_rtcp_check_time == GST_CLOCK_TIME_NONE ||
sess->next_rtcp_check_time > current_time) {
GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %"
GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time),
return FALSE;
}
-early:
-
/* take interval and add jitter */
interval = data->interval;
if (interval != GST_CLOCK_TIME_NONE)
interval = rtp_stats_add_rtcp_jitter (stats, interval);
- if (sess->last_rtcp_send_time != GST_CLOCK_TIME_NONE) {
+ if (sess->last_rtcp_check_time != GST_CLOCK_TIME_NONE) {
/* perform forward reconsideration */
if (interval != GST_CLOCK_TIME_NONE) {
GstClockTime elapsed;
/* get elapsed time since we last reported */
- elapsed = current_time - sess->last_rtcp_send_time;
+ elapsed = current_time - sess->last_rtcp_check_time;
GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %"
GST_TIME_FORMAT, GST_TIME_ARGS (interval), GST_TIME_ARGS (elapsed));
- new_send_time = interval + sess->last_rtcp_send_time;
+ new_send_time = interval + sess->last_rtcp_check_time;
} else {
- new_send_time = sess->last_rtcp_send_time;
+ new_send_time = sess->last_rtcp_check_time;
}
} else {
/* If this is the first RTCP packet, we can reconsider anything based
GST_TIME_ARGS (new_send_time));
/* store new check time */
sess->next_rtcp_check_time = new_send_time;
+ sess->last_rtcp_interval = interval;
return FALSE;
}
- sess->next_rtcp_check_time = current_time + interval;
- } else if (interval != GST_CLOCK_TIME_NONE) {
- /* Apply the rules from RFC 4585 section 3.5.3 */
- if (stats->min_interval != 0 && !sess->first_rtcp) {
- GstClockTime T_rr_current_interval =
- g_random_double_range (0.5, 1.5) * stats->min_interval * GST_SECOND;
-
- /* This will caused the RTCP to be suppressed if no FB packets are added */
- if (sess->last_rtcp_send_time + T_rr_current_interval > new_send_time) {
- GST_DEBUG ("RTCP packet could be suppressed min: %" GST_TIME_FORMAT
- " last: %" GST_TIME_FORMAT
- " + T_rr_current_interval: %" GST_TIME_FORMAT
- " > new_send_time: %" GST_TIME_FORMAT,
- GST_TIME_ARGS (stats->min_interval),
- GST_TIME_ARGS (sess->last_rtcp_send_time),
- GST_TIME_ARGS (T_rr_current_interval),
- GST_TIME_ARGS (new_send_time));
- data->may_suppress = TRUE;
+
+ sess->last_rtcp_interval = interval;
+ if ((sess->rtp_profile == GST_RTP_PROFILE_AVPF
+ || sess->rtp_profile == GST_RTP_PROFILE_SAVPF)
+ && interval != GST_CLOCK_TIME_NONE) {
+ /* Apply the rules from RFC 4585 section 3.5.3 */
+ if (stats->min_interval != 0 && !sess->first_rtcp) {
+ GstClockTime T_rr_current_interval =
+ g_random_double_range (0.5, 1.5) * stats->min_interval * GST_SECOND;
+
+ if (T_rr_current_interval > interval) {
+ GST_DEBUG ("Adjusting interval for t-rr-interval: %" GST_TIME_FORMAT
+ " > %" GST_TIME_FORMAT, GST_TIME_ARGS (T_rr_current_interval),
+ GST_TIME_ARGS (interval));
+ interval = T_rr_current_interval;
+ }
}
}
+ sess->next_rtcp_check_time = current_time + interval;
}
- GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
- GST_TIME_ARGS (new_send_time));
+
+ GST_DEBUG ("can send RTCP now, next %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (sess->next_rtcp_check_time));
return TRUE;
}
}
static void
+generate_twcc (const gchar * key, RTPSource * source, ReportData * data)
+{
+ RTPSession *sess = data->sess;
+ GstBuffer *buf;
+
+ /* only generate RTCP for active internal sources */
+ if (!source->internal || source->sent_bye)
+ return;
+
+ /* ignore other sources when we do the timeout after a scheduled BYE */
+ if (sess->scheduled_bye && !source->marked_bye)
+ return;
+
+ /* skip if RTCP is disabled */
+ if (source->disable_rtcp) {
+ GST_DEBUG ("source %08x has RTCP disabled", source->ssrc);
+ return;
+ }
+
+ while ((buf = rtp_twcc_manager_get_feedback (sess->twcc, source->ssrc))) {
+ ReportOutput *output = g_slice_new (ReportOutput);
+ output->source = g_object_ref (source);
+ output->is_bye = FALSE;
+ output->buffer = buf;
+ /* queue the RTCP packet to push later */
+ g_queue_push_tail (&data->output, output);
+ }
+}
+
+
+static void
generate_rtcp (const gchar * key, RTPSource * source, ReportData * data)
{
RTPSession *sess = data->sess;
if (sess->scheduled_bye && !source->marked_bye)
return;
+ /* skip if RTCP is disabled */
+ if (source->disable_rtcp) {
+ GST_DEBUG ("source %08x has RTCP disabled", source->ssrc);
+ return;
+ }
+
data->source = source;
/* open packet */
g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
(GHFunc) session_report_blocks, data);
}
- if (!data->has_sdes)
+ if (!data->has_sdes && (!data->is_early || !sess->reduced_size_rtcp))
session_sdes (sess, data);
if (data->have_fir)
}
}
+static void
+schedule_remaining_nacks (const gchar * key, RTPSource * source,
+ ReportData * data)
+{
+ RTPSession *sess = data->sess;
+ GstClockTime *nack_deadlines;
+ GstClockTime deadline;
+ guint n_nacks;
+
+ if (!source->send_nack)
+ return;
+
+ /* the scheduling is entirely based on available bandwidth, just take the
+ * biggest seqnum, which will have the largest deadline to request early
+ * RTCP. */
+ nack_deadlines = rtp_source_get_nack_deadlines (source, &n_nacks);
+ deadline = nack_deadlines[n_nacks - 1];
+ RTP_SESSION_UNLOCK (sess);
+ rtp_session_send_rtcp_with_deadline (sess, deadline);
+ RTP_SESSION_LOCK (sess);
+}
+
+static gboolean
+rtp_session_are_all_sources_bye (RTPSession * sess)
+{
+ GHashTableIter iter;
+ RTPSource *src;
+
+ RTP_SESSION_LOCK (sess);
+ g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
+ while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
+ if (src->internal && !src->sent_bye) {
+ RTP_SESSION_UNLOCK (sess);
+ return FALSE;
+ }
+ }
+ RTP_SESSION_UNLOCK (sess);
+
+ return TRUE;
+}
+
/**
* rtp_session_on_timeout:
* @sess: an #RTPSession
ReportData data = { GST_RTCP_BUFFER_INIT };
GHashTable *table_copy;
ReportOutput *output;
+ gboolean all_empty = FALSE;
g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
source = obtain_internal_source (sess, sess->suggested_ssrc, &created,
current_time);
+ sess->internal_ssrc_set = TRUE;
+
+ if (created)
+ on_new_sender_ssrc (sess, source);
+
g_object_unref (source);
}
if (!is_rtcp_time (sess, current_time, &data))
goto done;
- GST_DEBUG ("doing RTCP generation %u for %u sources, early %d",
- sess->generation, data.num_to_report, data.is_early);
+ /* check if all the buffers are empty after generation */
+ all_empty = TRUE;
+
+ GST_DEBUG
+ ("doing RTCP generation %u for %u sources, early %d, may suppress %d",
+ sess->generation, data.num_to_report, data.is_early, data.may_suppress);
/* generate RTCP for all internal sources */
g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
(GHFunc) generate_rtcp, &data);
+ g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+ (GHFunc) generate_twcc, &data);
+
/* update the generation for all the sources that have been reported */
g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
(GHFunc) update_generation, &data);
/* we keep track of the last report time in order to timeout inactive
* receivers or senders */
- if (!data.is_early && !data.may_suppress)
+ if (!data.is_early) {
+ GST_DEBUG ("Time since last regular RTCP: %" GST_TIME_FORMAT " - %"
+ GST_TIME_FORMAT " = %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (data.current_time),
+ GST_TIME_ARGS (sess->last_rtcp_send_time),
+ GST_TIME_ARGS (data.current_time - sess->last_rtcp_send_time));
sess->last_rtcp_send_time = data.current_time;
+ }
+
+ GST_DEBUG ("Time since last RTCP: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT
+ " = %" GST_TIME_FORMAT, GST_TIME_ARGS (data.current_time),
+ GST_TIME_ARGS (sess->last_rtcp_check_time),
+ GST_TIME_ARGS (data.current_time - sess->last_rtcp_check_time));
+ sess->last_rtcp_check_time = data.current_time;
sess->first_rtcp = FALSE;
sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
sess->scheduled_bye = FALSE;
- /* RFC 4585 section 3.5.2 step 6 */
- if (!data.is_early) {
- sess->allow_early = TRUE;
- }
-
done:
RTP_SESSION_UNLOCK (sess);
+ /* notify about updated statistics */
+ g_object_notify (G_OBJECT (sess), "stats");
+
/* push out the RTCP packets */
while ((output = g_queue_pop_head (&data.output))) {
- gboolean do_not_suppress;
+ gboolean do_not_suppress, empty_buffer;
GstBuffer *buffer = output->buffer;
RTPSource *source = output->source;
g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_RTCP], 0,
buffer, data.is_early, &do_not_suppress);
- if (sess->callbacks.send_rtcp && (do_not_suppress || !data.may_suppress)) {
+ empty_buffer = gst_buffer_get_size (buffer) == 0;
+
+ if (!empty_buffer)
+ all_empty = FALSE;
+
+ if (sess->callbacks.send_rtcp &&
+ !empty_buffer && (do_not_suppress || !data.may_suppress)) {
guint packet_size;
packet_size = gst_buffer_get_size (buffer) + sess->header_len;
GST_DEBUG ("%p, sending RTCP packet, avg size %u, %u", &sess->stats,
sess->stats.avg_rtcp_packet_size, packet_size);
result =
- sess->callbacks.send_rtcp (sess, source, buffer, output->is_bye,
- sess->send_rtcp_user_data);
+ sess->callbacks.send_rtcp (sess, source, buffer,
+ rtp_session_are_all_sources_bye (sess), sess->send_rtcp_user_data);
+
+ RTP_SESSION_LOCK (sess);
sess->stats.nacks_sent += data.nacked_seqnums;
+ on_sender_ssrc_active (sess, source);
+ RTP_SESSION_UNLOCK (sess);
} else {
GST_DEBUG ("freeing packet callback: %p"
- " do_not_suppress: %d may_suppress: %d",
- sess->callbacks.send_rtcp, do_not_suppress, data.may_suppress);
- sess->stats.nacks_dropped += data.nacked_seqnums;
+ " empty_buffer: %d, "
+ " do_not_suppress: %d may_suppress: %d", sess->callbacks.send_rtcp,
+ empty_buffer, do_not_suppress, data.may_suppress);
+ if (!empty_buffer) {
+ RTP_SESSION_LOCK (sess);
+ sess->stats.nacks_dropped += data.nacked_seqnums;
+ RTP_SESSION_UNLOCK (sess);
+ }
gst_buffer_unref (buffer);
}
g_object_unref (source);
g_slice_free (ReportOutput, output);
}
+
+ if (all_empty)
+ GST_ERROR ("generated empty RTCP messages for all the sources");
+
+ /* schedule remaining nacks */
+ RTP_SESSION_LOCK (sess);
+ g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+ (GHFunc) schedule_remaining_nacks, &data);
+ RTP_SESSION_UNLOCK (sess);
+
return result;
}
rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time,
GstClockTime max_delay)
{
- GstClockTime T_dither_max, T_rr;
+ GstClockTime T_dither_max, T_rr, offset = 0;
gboolean ret;
+ gboolean allow_early;
/* Implements the algorithm described in RFC 4585 section 3.5.2 */
RTP_SESSION_LOCK (sess);
+ /* We assume a feedback profile if something is requesting RTCP
+ * to be sent */
+ sess->rtp_profile = GST_RTP_PROFILE_AVPF;
+
/* Check if already requested */
/* RFC 4585 section 3.5.2 step 2 */
if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) {
goto end;
}
- T_rr = sess->next_rtcp_check_time - sess->last_rtcp_send_time;
+ T_rr = sess->last_rtcp_interval;
/* RFC 4585 section 3.5.2 step 2b */
/* If the total sources is <=2, then there is only us and one peer */
/* RFC 4585 section 3.5.2 step 3 */
if (current_time + T_dither_max > sess->next_rtcp_check_time) {
GST_LOG_OBJECT (sess,
- "don't send because of dither, next scheduled time is soon %"
+ "don't send because of dither, next scheduled time is too soon %"
GST_TIME_FORMAT " + %" GST_TIME_FORMAT " > %" GST_TIME_FORMAT,
GST_TIME_ARGS (current_time), GST_TIME_ARGS (T_dither_max),
GST_TIME_ARGS (sess->next_rtcp_check_time));
- ret = TRUE;
+ ret = T_dither_max <= max_delay;
goto end;
}
- /* RFC 4585 section 3.5.2 step 4a */
- if (sess->allow_early == FALSE) {
+ /* RFC 4585 section 3.5.2 step 4a and
+ * RFC 4585 section 3.5.2 step 6 */
+ allow_early = FALSE;
+ if (sess->last_rtcp_check_time == sess->last_rtcp_send_time) {
+ /* Last time we sent a full RTCP packet, we can now immediately
+ * send an early one as allow_early was reset to TRUE */
+ allow_early = TRUE;
+ } else if (sess->last_rtcp_check_time + T_rr <= current_time + max_delay) {
+ /* Last packet we sent was an early RTCP packet and more than
+ * T_rr has passed since then, meaning we would have suppressed
+ * a regular RTCP packet already and reset allow_early to TRUE */
+ allow_early = TRUE;
+
+ /* We have to offset a bit as T_rr has not passed yet, but will before
+ * max_delay */
+ if (sess->last_rtcp_check_time + T_rr > current_time)
+ offset = (sess->last_rtcp_check_time + T_rr) - current_time;
+ } else {
+ GST_DEBUG_OBJECT (sess,
+ "can't allow early RTCP yet: last regular %" GST_TIME_FORMAT ", %"
+ GST_TIME_FORMAT " + %" GST_TIME_FORMAT " > %" GST_TIME_FORMAT " + %"
+ GST_TIME_FORMAT, GST_TIME_ARGS (sess->last_rtcp_send_time),
+ GST_TIME_ARGS (sess->last_rtcp_check_time), GST_TIME_ARGS (T_rr),
+ GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay));
+ }
+
+ if (!allow_early) {
/* Ignore the request a scheduled packet will be in time anyway */
if (current_time + max_delay > sess->next_rtcp_check_time) {
GST_LOG_OBJECT (sess,
ret = TRUE;
} else {
GST_LOG_OBJECT (sess,
- "can't allow early feedback, next scheduled time is too late %"
+ "can't allow early feedback and next scheduled time is too late %"
GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT,
GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay),
GST_TIME_ARGS (sess->next_rtcp_check_time));
if (T_dither_max) {
/* Schedule an early transmission later */
sess->next_early_rtcp_time = g_random_double () * T_dither_max +
- current_time;
+ current_time + offset;
} else {
/* If no dithering, schedule it for NOW */
- sess->next_early_rtcp_time = current_time;
+ sess->next_early_rtcp_time = current_time + offset;
}
- /* RFC 4585 section 3.5.2 step 6 */
- sess->allow_early = FALSE;
- /* Delay next regular RTCP packet to not exceed the short-term
- * RTCP bandwidth when using early feedback as compared to
- * without */
- sess->next_rtcp_check_time = sess->last_rtcp_send_time + 2 * T_rr;
- sess->last_rtcp_send_time += T_rr;
-
GST_LOG_OBJECT (sess, "next early RTCP time %" GST_TIME_FORMAT
", next regular RTCP time %" GST_TIME_FORMAT,
GST_TIME_ARGS (sess->next_early_rtcp_time),
}
static gboolean
+rtp_session_send_rtcp_internal (RTPSession * sess, GstClockTime now,
+ GstClockTime max_delay)
+{
+ /* notify the application that we intend to send early RTCP */
+ if (sess->callbacks.notify_early_rtcp)
+ sess->callbacks.notify_early_rtcp (sess, sess->notify_early_rtcp_user_data);
+
+ return rtp_session_request_early_rtcp (sess, now, max_delay);
+}
+
+static gboolean
+rtp_session_send_rtcp_with_deadline (RTPSession * sess, GstClockTime deadline)
+{
+ GstClockTime now, max_delay;
+
+ if (!sess->callbacks.send_rtcp)
+ return FALSE;
+
+ now = sess->callbacks.request_time (sess, sess->request_time_user_data);
+
+ if (deadline < now)
+ return FALSE;
+
+ max_delay = deadline - now;
+
+ return rtp_session_send_rtcp_internal (sess, now, max_delay);
+}
+
+static gboolean
rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay)
{
GstClockTime now;
now = sess->callbacks.request_time (sess, sess->request_time_user_data);
- return rtp_session_request_early_rtcp (sess, now, max_delay);
+ return rtp_session_send_rtcp_internal (sess, now, max_delay);
}
gboolean
{
RTPSource *src;
- if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) {
- GST_DEBUG ("FIR/PLI not sent");
- return FALSE;
- }
-
RTP_SESSION_LOCK (sess);
src = find_source (sess, ssrc);
if (src == NULL)
}
RTP_SESSION_UNLOCK (sess);
+ if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) {
+ GST_DEBUG ("FIR/PLI not sent early, sending with next regular RTCP");
+ }
+
return TRUE;
/* ERRORS */
GstClockTime max_delay)
{
RTPSource *source;
+ GstClockTime now;
- if (!rtp_session_send_rtcp (sess, max_delay)) {
- GST_DEBUG ("NACK not sent");
+ if (!sess->callbacks.send_rtcp)
return FALSE;
- }
+
+ now = sess->callbacks.request_time (sess, sess->request_time_user_data);
RTP_SESSION_LOCK (sess);
source = find_source (sess, ssrc);
if (source == NULL)
goto no_source;
- GST_DEBUG ("request NACK for %08x, #%u", ssrc, seqnum);
- rtp_source_register_nack (source, seqnum);
+ GST_DEBUG ("request NACK for SSRC %08x, #%u, deadline %" GST_TIME_FORMAT,
+ ssrc, seqnum, GST_TIME_ARGS (now + max_delay));
+ rtp_source_register_nack (source, seqnum, now + max_delay);
RTP_SESSION_UNLOCK (sess);
+ if (!rtp_session_send_rtcp_internal (sess, now, max_delay)) {
+ GST_DEBUG ("NACK not sent early, sending with next regular RTCP");
+ }
+
return TRUE;
/* ERRORS */
return FALSE;
}
}
+
+/**
+ * rtp_session_update_recv_caps_structure:
+ * @sess: an #RTPSession
+ * @s: a #GstStructure from a #GstCaps
+ *
+ * Update the caps of the receiver in the rtp session.
+ */
+void
+rtp_session_update_recv_caps_structure (RTPSession * sess,
+ const GstStructure * s)
+{
+ guint8 ext_id = _get_extmap_id_for_attribute (s, TWCC_EXTMAP_STR);
+ if (ext_id > 0) {
+ sess->twcc_recv_ext_id = ext_id;
+ GST_INFO ("TWCC enabled for recv using extension id: %u",
+ sess->twcc_recv_ext_id);
+ }
+}