X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst%2Frtpmanager%2Frtpsession.c;h=7aa50a657e68f0a6104110952a5ca4523df53871;hb=deeb3be3ec26feef48f277d85bf55e816a228d4e;hp=c8c6908a489d0831f76b4561cddbaae589aee338;hpb=6f830e5bd5c45ea4e778002ea46964901a6f5ae0;p=platform%2Fupstream%2Fgst-plugins-good.git diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c index c8c6908..7aa50a6 100644 --- a/gst/rtpmanager/rtpsession.c +++ b/gst/rtpmanager/rtpsession.c @@ -22,6 +22,7 @@ #define GLIB_DISABLE_DEPRECATION_WARNINGS #include +#include #include #include @@ -30,7 +31,7 @@ #include "rtpsession.h" -GST_DEBUG_CATEGORY_STATIC (rtp_session_debug); +GST_DEBUG_CATEGORY (rtp_session_debug); #define GST_CAT_DEFAULT rtp_session_debug /* signals and args */ @@ -47,10 +48,14 @@ enum SIGNAL_ON_TIMEOUT, SIGNAL_ON_SENDER_TIMEOUT, SIGNAL_ON_SENDING_RTCP, + SIGNAL_ON_APP_RTCP, SIGNAL_ON_FEEDBACK_RTCP, SIGNAL_SEND_RTCP, SIGNAL_SEND_RTCP_FULL, SIGNAL_ON_RECEIVING_RTCP, + SIGNAL_ON_NEW_SENDER_SSRC, + SIGNAL_ON_SENDER_SSRC_ACTIVE, + SIGNAL_ON_SENDING_NACKS, LAST_SIGNAL }; @@ -68,7 +73,11 @@ enum #define DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW (2 * GST_SECOND) #define DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD (3) #define DEFAULT_PROBATION RTP_DEFAULT_PROBATION +#define DEFAULT_MAX_DROPOUT_TIME 60000 +#define DEFAULT_MAX_MISORDER_TIME 2000 #define DEFAULT_RTP_PROFILE GST_RTP_PROFILE_AVP +#define DEFAULT_RTCP_REDUCED_SIZE FALSE +#define DEFAULT_RTCP_DISABLE_SR_TIMESTAMP FALSE enum { @@ -89,8 +98,12 @@ enum PROP_RTCP_FEEDBACK_RETENTION_WINDOW, PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD, PROP_PROBATION, + PROP_MAX_DROPOUT_TIME, + PROP_MAX_MISORDER_TIME, PROP_STATS, - PROP_RTP_PROFILE + PROP_RTP_PROFILE, + PROP_RTCP_REDUCED_SIZE, + PROP_RTCP_DISABLE_SR_TIMESTAMP }; /* update average packet size */ @@ -103,6 +116,8 @@ enum (avg) = ((val) + (15 * (avg))) >> 4; +#define TWCC_EXTMAP_STR "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01" + /* GObject vmethods */ static void rtp_session_finalize (GObject * object); static void rtp_session_set_property (GObject * object, guint prop_id, @@ -112,6 +127,8 @@ static void rtp_session_get_property (GObject * object, guint prop_id, static gboolean rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay); +static gboolean rtp_session_send_rtcp_with_deadline (RTPSession * sess, + GstClockTime deadline); static guint rtp_session_signals[LAST_SIGNAL] = { 0 }; @@ -158,7 +175,7 @@ rtp_session_class_init (RTPSessionClass * klass) rtp_session_signals[SIGNAL_GET_SOURCE_BY_SSRC] = g_signal_new ("get-source-by-ssrc", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (RTPSessionClass, - get_source_by_ssrc), NULL, NULL, g_cclosure_marshal_generic, + get_source_by_ssrc), NULL, NULL, NULL, RTP_TYPE_SOURCE, 1, G_TYPE_UINT); /** @@ -171,8 +188,7 @@ rtp_session_class_init (RTPSessionClass * klass) rtp_session_signals[SIGNAL_ON_NEW_SSRC] = g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc), - NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, - RTP_TYPE_SOURCE); + NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE); /** * RTPSession::on-ssrc-collision: * @session: the object which received the signal @@ -183,8 +199,7 @@ rtp_session_class_init (RTPSessionClass * klass) rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] = g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision), - NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, - RTP_TYPE_SOURCE); + NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE); /** * RTPSession::on-ssrc-validated: * @session: the object which received the signal @@ -195,8 +210,7 @@ rtp_session_class_init (RTPSessionClass * klass) rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] = g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated), - NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, - RTP_TYPE_SOURCE); + NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE); /** * RTPSession::on-ssrc-active: * @session: the object which received the signal @@ -207,8 +221,7 @@ rtp_session_class_init (RTPSessionClass * klass) rtp_session_signals[SIGNAL_ON_SSRC_ACTIVE] = g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_active), - NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, - RTP_TYPE_SOURCE); + NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE); /** * RTPSession::on-ssrc-sdes: * @session: the object which received the signal @@ -219,8 +232,7 @@ rtp_session_class_init (RTPSessionClass * klass) rtp_session_signals[SIGNAL_ON_SSRC_SDES] = g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_sdes), - NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, - RTP_TYPE_SOURCE); + NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE); /** * RTPSession::on-bye-ssrc: * @session: the object which received the signal @@ -231,8 +243,7 @@ rtp_session_class_init (RTPSessionClass * klass) rtp_session_signals[SIGNAL_ON_BYE_SSRC] = g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc), - NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, - RTP_TYPE_SOURCE); + NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE); /** * RTPSession::on-bye-timeout: * @session: the object which received the signal @@ -243,8 +254,7 @@ rtp_session_class_init (RTPSessionClass * klass) rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] = g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout), - NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, - RTP_TYPE_SOURCE); + NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE); /** * RTPSession::on-timeout: * @session: the object which received the signal @@ -255,8 +265,7 @@ rtp_session_class_init (RTPSessionClass * klass) rtp_session_signals[SIGNAL_ON_TIMEOUT] = g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout), - NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, - RTP_TYPE_SOURCE); + NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE); /** * RTPSession::on-sender-timeout: * @session: the object which received the signal @@ -267,8 +276,7 @@ rtp_session_class_init (RTPSessionClass * klass) rtp_session_signals[SIGNAL_ON_SENDER_TIMEOUT] = g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sender_timeout), - NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, - RTP_TYPE_SOURCE); + NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE); /** * RTPSession::on-sending-rtcp @@ -285,10 +293,27 @@ rtp_session_class_init (RTPSessionClass * klass) rtp_session_signals[SIGNAL_ON_SENDING_RTCP] = g_signal_new ("on-sending-rtcp", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_rtcp), - accumulate_trues, NULL, g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 2, + accumulate_trues, NULL, NULL, G_TYPE_BOOLEAN, 2, GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE, G_TYPE_BOOLEAN); /** + * RTPSession::on-app-rtcp: + * @session: the object which received the signal + * @subtype: The subtype of the packet + * @ssrc: The SSRC/CSRC of the packet + * @name: The name of the packet + * @data: a #GstBuffer with the application-dependant data or %NULL if + * there was no data + * + * Notify that a RTCP APP packet has been received + */ + rtp_session_signals[SIGNAL_ON_APP_RTCP] = + g_signal_new ("on-app-rtcp", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_app_rtcp), + NULL, NULL, NULL, G_TYPE_NONE, 4, G_TYPE_UINT, G_TYPE_UINT, + G_TYPE_STRING, GST_TYPE_BUFFER); + + /** * RTPSession::on-feedback-rtcp: * @session: the object which received the signal * @type: Type of RTCP packet, will be %GST_RTCP_TYPE_RTPFB or @@ -304,8 +329,8 @@ rtp_session_class_init (RTPSessionClass * klass) rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP] = g_signal_new ("on-feedback-rtcp", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_feedback_rtcp), - NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 5, G_TYPE_UINT, - G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT, GST_TYPE_BUFFER); + NULL, NULL, NULL, G_TYPE_NONE, 5, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT, + G_TYPE_UINT, GST_TYPE_BUFFER); /** * RTPSession::send-rtcp: @@ -322,7 +347,7 @@ rtp_session_class_init (RTPSessionClass * klass) g_signal_new ("send-rtcp", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL, - g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_UINT64); + NULL, G_TYPE_NONE, 1, G_TYPE_UINT64); /** * RTPSession::send-rtcp-full: @@ -344,7 +369,7 @@ rtp_session_class_init (RTPSessionClass * klass) g_signal_new ("send-rtcp-full", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL, - g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64); + NULL, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64); /** * RTPSession::on-receiving-rtcp @@ -359,13 +384,81 @@ rtp_session_class_init (RTPSessionClass * klass) rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP] = g_signal_new ("on-receiving-rtcp", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_receiving_rtcp), - NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1, + NULL, NULL, NULL, G_TYPE_NONE, 1, + GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE); + + /** + * RTPSession::on-new-sender-ssrc: + * @session: the object which received the signal + * @src: the new sender RTPSource + * + * Notify of a new sender SSRC that entered @session. + * + * Since: 1.8 + */ + rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC] = + g_signal_new ("on-new-sender-ssrc", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_sender_ssrc), + NULL, NULL, NULL, G_TYPE_NONE, 1, RTP_TYPE_SOURCE); + + /** + * RTPSession::on-sender-ssrc-active: + * @session: the object which received the signal + * @src: the active sender RTPSource + * + * Notify of a sender SSRC that is active, i.e., sending RTCP. + * + * Since: 1.8 + */ + rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE] = + g_signal_new ("on-sender-ssrc-active", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, + on_sender_ssrc_active), NULL, NULL, NULL, + G_TYPE_NONE, 1, RTP_TYPE_SOURCE); + + /** + * RTPSession::on-sending-nack + * @session: the object which received the signal + * @sender_ssrc: the sender ssrc + * @media_ssrc: the media ssrc + * @nacks: (element-type guint16): the list of seqnum to be nacked + * @buffer: the #GstBuffer containing the RTCP packet about to be sent + * + * This signal is emitted before NACK packets are added into the RTCP + * packet. This signal can be used to override the conversion of the NACK + * seqnum array into packets. This can be used if your protocol uses + * different type of NACK (e.g. based on RTCP APP). + * + * The handler should transform the seqnum from @nacks array into packets. + * @nacks seqnum must be consumed from the start. The remaining will be + * rescheduled for later base on bandwidth. Only one handler will be + * signalled. + * + * A handler may return 0 to signal that generic NACKs should be created + * for this set. This can be useful if the signal is used for other purpose + * or if the other type of NACK would use more space. + * + * Returns: the number of NACK seqnum that was consumed from @nacks. + * + * Since: 1.16 + */ + rtp_session_signals[SIGNAL_ON_SENDING_NACKS] = + g_signal_new ("on-sending-nacks", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_nacks), + g_signal_accumulator_first_wins, NULL, NULL, + G_TYPE_UINT, 4, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_ARRAY, GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE); g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC, g_param_spec_uint ("internal-ssrc", "Internal SSRC", "The internal SSRC used for the session (deprecated)", - 0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + 0, G_MAXUINT, 0, +#ifndef TIZEN_FEATURE_GST_UPSTREAM_AVOID_BUILD_BREAK + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | + GST_PARAM_DOC_SHOW_DEFAULT)); +#else + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); +#endif g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE, g_param_spec_object ("internal-source", "Internal Source", @@ -374,25 +467,25 @@ rtp_session_class_init (RTPSessionClass * klass) g_object_class_install_property (gobject_class, PROP_BANDWIDTH, g_param_spec_double ("bandwidth", "Bandwidth", - "The bandwidth of the session (0 for auto-discover)", + "The bandwidth of the session in bits per second (0 for auto-discover)", 0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION, g_param_spec_double ("rtcp-fraction", "RTCP Fraction", - "The fraction of the bandwidth used for RTCP (or as a real fraction of the RTP bandwidth if < 1)", + "The fraction of the bandwidth used for RTCP in bits per second (or as a real fraction of the RTP bandwidth if < 1)", 0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_RTCP_RR_BANDWIDTH, g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth", - "The RTCP bandwidth used for receivers in bytes per second (-1 = default)", + "The RTCP bandwidth used for receivers in bits per second (-1 = default)", -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_RTCP_RS_BANDWIDTH, g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth", - "The RTCP bandwidth used for senders in bytes per second (-1 = default)", + "The RTCP bandwidth used for senders in bits per second (-1 = default)", -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); @@ -405,7 +498,12 @@ rtp_session_class_init (RTPSessionClass * klass) g_object_class_install_property (gobject_class, PROP_SDES, g_param_spec_boxed ("sdes", "SDES", "The SDES items of this session", +#ifndef TIZEN_FEATURE_GST_UPSTREAM_AVOID_BUILD_BREAK + GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS + | GST_PARAM_DOC_SHOW_DEFAULT)); +#else GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); +#endif g_object_class_install_property (gobject_class, PROP_NUM_SOURCES, g_param_spec_uint ("num-sources", "Num Sources", @@ -418,13 +516,13 @@ rtp_session_class_init (RTPSessionClass * klass) DEFAULT_NUM_ACTIVE_SOURCES, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); /** - * RTPSource::sources + * RTPSource:sources * * Get a GValue Array of all sources in the session. * - * - * Getting the #RTPSources of a session - * <programlisting> + * ## Getting the #RTPSources of a session + * + * ``` C * { * GValueArray *arr; * GValue *val; @@ -440,8 +538,7 @@ rtp_session_class_init (RTPSessionClass * klass) * } * g_value_array_free (arr); * } - * </programlisting> - * </example> + * ``` */ g_object_class_install_property (gobject_class, PROP_SOURCES, g_param_spec_boxed ("sources", "Sources", @@ -482,16 +579,30 @@ rtp_session_class_init (RTPSessionClass * klass) 0, G_MAXUINT, DEFAULT_PROBATION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME, + g_param_spec_uint ("max-dropout-time", "Max dropout time", + "The maximum time (milliseconds) of missing packets tolerated.", + 0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME, + g_param_spec_uint ("max-misorder-time", "Max misorder time", + "The maximum time (milliseconds) of misordered packets tolerated.", + 0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** - * RTPSession::stats: + * RTPSession:stats: * * Various session statistics. This property returns a GstStructure * with name application/x-rtp-session-stats with the following fields: * - * "rtx-drop-count" G_TYPE_UINT The number of retransmission events + * * "rtx-drop-count" G_TYPE_UINT The number of retransmission events * dropped (due to bandwidth constraints) - * "sent-nack-count" G_TYPE_UINT Number of NACKs sent - * "recv-nack-count" G_TYPE_UINT Number of NACKs received + * * "sent-nack-count" G_TYPE_UINT Number of NACKs sent + * * "recv-nack-count" G_TYPE_UINT Number of NACKs received + * * "source-stats" G_TYPE_BOXED GValueArray of #RTPSource:stats for all + * RTP sources (Since 1.8) * * Since: 1.4 */ @@ -505,6 +616,27 @@ rtp_session_class_init (RTPSessionClass * klass) "RTP profile to use for this session", GST_TYPE_RTP_PROFILE, DEFAULT_RTP_PROFILE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_RTCP_REDUCED_SIZE, + g_param_spec_boolean ("rtcp-reduced-size", "RTCP Reduced Size", + "Use Reduced Size RTCP for feedback packets", + DEFAULT_RTCP_REDUCED_SIZE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + /** + * RTPSession:disable-sr-timestamp: + * + * Whether sender reports should be timestamped. + * + * Since: 1.16 + */ + g_object_class_install_property (gobject_class, + PROP_RTCP_DISABLE_SR_TIMESTAMP, + g_param_spec_boolean ("disable-sr-timestamp", + "Disable Sender Report Timestamp", + "Whether sender reports should be timestamped", + DEFAULT_RTCP_DISABLE_SR_TIMESTAMP, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + klass->get_source_by_ssrc = GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc); klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp); @@ -545,10 +677,12 @@ rtp_session_init (RTPSession * sess) sess->rtcp_rs_bandwidth = DEFAULT_RTCP_RS_BANDWIDTH; /* default UDP header length */ - sess->header_len = 28; + sess->header_len = UDP_IP_HEADER_OVERHEAD; sess->mtu = DEFAULT_RTCP_MTU; sess->probation = DEFAULT_PROBATION; + sess->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME; + sess->max_misorder_time = DEFAULT_MAX_MISORDER_TIME; /* some default SDES entries */ sess->sdes = gst_structure_new_empty ("application/x-rtp-source-sdes"); @@ -569,22 +703,26 @@ rtp_session_init (RTPSession * sess) /* this is the SSRC we suggest */ sess->suggested_ssrc = rtp_session_create_new_ssrc (sess); + sess->internal_ssrc_set = FALSE; sess->first_rtcp = TRUE; sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE; sess->last_rtcp_check_time = GST_CLOCK_TIME_NONE; sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE; + sess->last_rtcp_interval = GST_CLOCK_TIME_NONE; - sess->allow_early = TRUE; sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE; sess->rtcp_feedback_retention_window = DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW; sess->rtcp_immediate_feedback_threshold = DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD; sess->rtp_profile = DEFAULT_RTP_PROFILE; - - sess->last_keyframe_request = GST_CLOCK_TIME_NONE; + sess->reduced_size_rtcp = DEFAULT_RTCP_REDUCED_SIZE; + sess->timestamp_sender_reports = !DEFAULT_RTCP_DISABLE_SR_TIMESTAMP; sess->is_doing_ptp = TRUE; + + sess->twcc = rtp_twcc_manager_new (sess->mtu); + sess->twcc_stats = rtp_twcc_stats_new (); } static void @@ -606,6 +744,9 @@ rtp_session_finalize (GObject * object) for (i = 0; i < 1; i++) g_hash_table_destroy (sess->ssrcs[i]); + g_object_unref (sess->twcc); + rtp_twcc_stats_free (sess->twcc_stats); + g_mutex_clear (&sess->lock); G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object); @@ -641,16 +782,44 @@ rtp_session_create_sources (RTPSession * sess) return res; } +static void +create_source_stats (gpointer key, RTPSource * source, GValueArray * arr) +{ + GValue *value; + GstStructure *s; + + g_object_get (source, "stats", &s, NULL); + + g_value_array_append (arr, NULL); + value = g_value_array_get_nth (arr, arr->n_values - 1); + g_value_init (value, GST_TYPE_STRUCTURE); + g_value_take_boxed (value, s); +} + static GstStructure * rtp_session_create_stats (RTPSession * sess) { GstStructure *s; + GValueArray *source_stats; + GValue source_stats_v = G_VALUE_INIT; + guint size; + RTP_SESSION_LOCK (sess); s = gst_structure_new ("application/x-rtp-session-stats", "rtx-drop-count", G_TYPE_UINT, sess->stats.nacks_dropped, "sent-nack-count", G_TYPE_UINT, sess->stats.nacks_sent, "recv-nack-count", G_TYPE_UINT, sess->stats.nacks_received, NULL); + size = g_hash_table_size (sess->ssrcs[sess->mask_idx]); + source_stats = g_value_array_new (size); + g_hash_table_foreach (sess->ssrcs[sess->mask_idx], + (GHFunc) create_source_stats, source_stats); + RTP_SESSION_UNLOCK (sess); + + g_value_init (&source_stats_v, G_TYPE_VALUE_ARRAY); + g_value_take_boxed (&source_stats_v, source_stats); + gst_structure_take_value (s, "source-stats", &source_stats_v); + return s; } @@ -666,6 +835,8 @@ rtp_session_set_property (GObject * object, guint prop_id, case PROP_INTERNAL_SSRC: RTP_SESSION_LOCK (sess); sess->suggested_ssrc = g_value_get_uint (value); + sess->internal_ssrc_set = TRUE; + sess->internal_ssrc_from_caps_or_property = TRUE; RTP_SESSION_UNLOCK (sess); if (sess->callbacks.reconfigure) sess->callbacks.reconfigure (sess, sess->reconfigure_user_data); @@ -696,6 +867,7 @@ rtp_session_set_property (GObject * object, guint prop_id, break; case PROP_RTCP_MTU: sess->mtu = g_value_get_uint (value); + rtp_twcc_manager_set_mtu (sess->twcc, sess->mtu); break; case PROP_SDES: rtp_session_set_sdes_struct (sess, g_value_get_boxed (value)); @@ -713,12 +885,21 @@ rtp_session_set_property (GObject * object, guint prop_id, if (sess->callbacks.reconsider) sess->callbacks.reconsider (sess, sess->reconsider_user_data); break; + case PROP_RTCP_FEEDBACK_RETENTION_WINDOW: + sess->rtcp_feedback_retention_window = g_value_get_uint64 (value); + break; case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD: sess->rtcp_immediate_feedback_threshold = g_value_get_uint (value); break; case PROP_PROBATION: sess->probation = g_value_get_uint (value); break; + case PROP_MAX_DROPOUT_TIME: + sess->max_dropout_time = g_value_get_uint (value); + break; + case PROP_MAX_MISORDER_TIME: + sess->max_misorder_time = g_value_get_uint (value); + break; case PROP_RTP_PROFILE: sess->rtp_profile = g_value_get_enum (value); /* trigger reconsideration */ @@ -728,6 +909,12 @@ rtp_session_set_property (GObject * object, guint prop_id, if (sess->callbacks.reconsider) sess->callbacks.reconsider (sess, sess->reconsider_user_data); break; + case PROP_RTCP_REDUCED_SIZE: + sess->reduced_size_rtcp = g_value_get_boolean (value); + break; + case PROP_RTCP_DISABLE_SR_TIMESTAMP: + sess->timestamp_sender_reports = !g_value_get_boolean (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -744,7 +931,7 @@ rtp_session_get_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_INTERNAL_SSRC: - g_value_set_uint (value, rtp_session_suggest_ssrc (sess)); + g_value_set_uint (value, rtp_session_suggest_ssrc (sess, NULL)); break; case PROP_INTERNAL_SOURCE: /* FIXME, return a random source */ @@ -783,18 +970,33 @@ rtp_session_get_property (GObject * object, guint prop_id, case PROP_RTCP_MIN_INTERVAL: g_value_set_uint64 (value, sess->stats.min_interval * GST_SECOND); break; + case PROP_RTCP_FEEDBACK_RETENTION_WINDOW: + g_value_set_uint64 (value, sess->rtcp_feedback_retention_window); + break; case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD: g_value_set_uint (value, sess->rtcp_immediate_feedback_threshold); break; case PROP_PROBATION: g_value_set_uint (value, sess->probation); break; + case PROP_MAX_DROPOUT_TIME: + g_value_set_uint (value, sess->max_dropout_time); + break; + case PROP_MAX_MISORDER_TIME: + g_value_set_uint (value, sess->max_misorder_time); + break; case PROP_STATS: g_value_take_boxed (value, rtp_session_create_stats (sess)); break; case PROP_RTP_PROFILE: g_value_set_enum (value, sess->rtp_profile); break; + case PROP_RTCP_REDUCED_SIZE: + g_value_set_boolean (value, sess->reduced_size_rtcp); + break; + case PROP_RTCP_DISABLE_SR_TIMESTAMP: + g_value_set_boolean (value, !sess->timestamp_sender_reports); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -895,6 +1097,28 @@ on_sender_timeout (RTPSession * sess, RTPSource * source) g_object_unref (source); } +static void +on_new_sender_ssrc (RTPSession * sess, RTPSource * source) +{ + g_object_ref (source); + RTP_SESSION_UNLOCK (sess); + g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC], 0, + source); + RTP_SESSION_LOCK (sess); + g_object_unref (source); +} + +static void +on_sender_ssrc_active (RTPSession * sess, RTPSource * source) +{ + g_object_ref (source); + RTP_SESSION_UNLOCK (sess); + g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE], 0, + source); + RTP_SESSION_LOCK (sess); + g_object_unref (source); +} + /** * rtp_session_new: * @@ -913,6 +1137,47 @@ rtp_session_new (void) } /** + * rtp_session_reset: + * @sess: an #RTPSession + * + * Reset the sources of @sess. + */ +void +rtp_session_reset (RTPSession * sess) +{ + g_return_if_fail (RTP_IS_SESSION (sess)); + + /* remove all sources */ + g_hash_table_remove_all (sess->ssrcs[sess->mask_idx]); + sess->total_sources = 0; + sess->stats.sender_sources = 0; + sess->stats.internal_sender_sources = 0; + sess->stats.internal_sources = 0; + sess->stats.active_sources = 0; + + sess->generation = 0; + sess->first_rtcp = TRUE; + sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE; + sess->last_rtcp_check_time = GST_CLOCK_TIME_NONE; + sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE; + sess->last_rtcp_interval = GST_CLOCK_TIME_NONE; + sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE; + sess->scheduled_bye = FALSE; + + /* reset session stats */ + sess->stats.bye_members = 0; + sess->stats.nacks_dropped = 0; + sess->stats.nacks_sent = 0; + sess->stats.nacks_received = 0; + + sess->is_doing_ptp = TRUE; + + g_list_free_full (sess->conflicting_addresses, + (GDestroyNotify) rtp_conflicting_address_free); + sess->conflicting_addresses = NULL; +} + +/** * rtp_session_set_callbacks: * @sess: an #RTPSession * @callbacks: callbacks to configure @@ -962,10 +1227,18 @@ rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks, sess->callbacks.notify_nack = callbacks->notify_nack; sess->notify_nack_user_data = user_data; } + if (callbacks->notify_twcc) { + sess->callbacks.notify_twcc = callbacks->notify_twcc; + sess->notify_twcc_user_data = user_data; + } if (callbacks->reconfigure) { sess->callbacks.reconfigure = callbacks->reconfigure; sess->reconfigure_user_data = user_data; } + if (callbacks->notify_early_rtcp) { + sess->callbacks.notify_early_rtcp = callbacks->notify_early_rtcp; + sess->notify_early_rtcp_user_data = user_data; + } } /** @@ -1099,7 +1372,7 @@ rtp_session_set_request_time_callback (RTPSession * sess, * @sess: an #RTPSession * @bandwidth: the bandwidth allocated * - * Set the session bandwidth in bytes per second. + * Set the session bandwidth in bits per second. */ void rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth) @@ -1138,7 +1411,7 @@ rtp_session_get_bandwidth (RTPSession * sess) * @sess: an #RTPSession * @bandwidth: the RTCP bandwidth * - * Set the bandwidth in bytes per second that should be used for RTCP + * Set the bandwidth in bits per second that should be used for RTCP * messages. */ void @@ -1197,6 +1470,12 @@ rtp_session_get_sdes_struct (RTPSession * sess) return result; } +static void +source_set_sdes (const gchar * key, RTPSource * source, GstStructure * sdes) +{ + rtp_source_set_sdes_struct (source, gst_structure_copy (sdes)); +} + /** * rtp_session_set_sdes_struct: * @sess: an #RTSPSession @@ -1214,6 +1493,9 @@ rtp_session_set_sdes_struct (RTPSession * sess, const GstStructure * sdes) if (sess->sdes) gst_structure_free (sess->sdes); sess->sdes = gst_structure_copy (sdes); + + g_hash_table_foreach (sess->ssrcs[sess->mask_idx], + (GHFunc) source_set_sdes, sess->sdes); RTP_SESSION_UNLOCK (sess); } @@ -1312,6 +1594,27 @@ rtp_session_add_conflicting_address (RTPSession * sess, add_conflicting_address (sess->conflicting_addresses, address, time); } +static void +rtp_session_have_conflict (RTPSession * sess, RTPSource * source, + GSocketAddress * address, GstClockTime current_time) +{ + guint32 ssrc = rtp_source_get_ssrc (source); + + /* Its a new collision, lets change our SSRC */ + rtp_session_add_conflicting_address (sess, address, current_time); + + /* mark the source BYE */ + rtp_source_mark_bye (source, "SSRC Collision"); + /* if we were suggesting this SSRC, change to something else */ + if (sess->suggested_ssrc == ssrc) { + sess->suggested_ssrc = rtp_session_create_new_ssrc (sess); + sess->internal_ssrc_set = TRUE; + } + + on_ssrc_collision (sess, source); + + rtp_session_schedule_bye_locked (sess, current_time); +} static gboolean check_collision (RTPSession * sess, RTPSource * source, @@ -1405,20 +1708,11 @@ check_collision (RTPSession * sess, RTPSource * source, */ GST_DEBUG ("Our packets are being looped back to us, dropping"); } else { - /* Its a new collision, lets change our SSRC */ - rtp_session_add_conflicting_address (sess, pinfo->address, - pinfo->current_time); + GST_DEBUG ("Collision for SSRC %x from new incoming packet," + " change our sender ssrc", ssrc); - GST_DEBUG ("Collision for SSRC %x", ssrc); - /* mark the source BYE */ - rtp_source_mark_bye (source, "SSRC Collision"); - /* if we were suggesting this SSRC, change to something else */ - if (sess->suggested_ssrc == ssrc) - sess->suggested_ssrc = rtp_session_create_new_ssrc (sess); - - on_ssrc_collision (sess, source); - - rtp_session_schedule_bye_locked (sess, pinfo->current_time); + rtp_session_have_conflict (sess, source, pinfo->address, + pinfo->current_time); } } @@ -1522,8 +1816,11 @@ add_source (RTPSession * sess, RTPSource * src) sess->stats.active_sources++; if (src->internal) { sess->stats.internal_sources++; - if (sess->suggested_ssrc != src->ssrc) + if (!sess->internal_ssrc_from_caps_or_property + && sess->suggested_ssrc != src->ssrc) { sess->suggested_ssrc = src->ssrc; + sess->internal_ssrc_set = TRUE; + } } /* update point-to-point status */ @@ -1556,10 +1853,9 @@ obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created, /* for RTP packets we need to set the source in probation. Receiving RTCP * packets of an SSRC, on the other hand, is a strong indication that we * are dealing with a valid source. */ - if (rtp) - g_object_set (source, "probation", sess->probation, NULL); - else - g_object_set (source, "probation", 0, NULL); + g_object_set (source, "probation", rtp ? sess->probation : 0, + "max-dropout-time", sess->max_dropout_time, "max-misorder-time", + sess->max_misorder_time, NULL); /* store from address, if any */ if (pinfo->address) { @@ -1633,13 +1929,14 @@ obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created, /** * rtp_session_suggest_ssrc: * @sess: a #RTPSession + * @is_random: if the suggested ssrc is random * * Suggest an unused SSRC in @sess. * * Returns: a free unused SSRC */ guint32 -rtp_session_suggest_ssrc (RTPSession * sess) +rtp_session_suggest_ssrc (RTPSession * sess, gboolean * is_random) { guint32 result; @@ -1647,6 +1944,8 @@ rtp_session_suggest_ssrc (RTPSession * sess) RTP_SESSION_LOCK (sess); result = sess->suggested_ssrc; + if (is_random) + *is_random = !sess->internal_ssrc_set; RTP_SESSION_UNLOCK (sess); return result; @@ -1769,34 +2068,6 @@ rtp_session_create_new_ssrc (RTPSession * sess) return ssrc; } - -/** - * rtp_session_create_source: - * @sess: an #RTPSession - * - * Create an #RTPSource for use in @sess. This function will create a source - * with an ssrc that is currently not used by any participants in the session. - * - * Returns: an #RTPSource. - */ -RTPSource * -rtp_session_create_source (RTPSession * sess) -{ - guint32 ssrc; - RTPSource *source; - - RTP_SESSION_LOCK (sess); - ssrc = rtp_session_create_new_ssrc (sess); - source = rtp_source_new (ssrc); - rtp_source_set_callbacks (source, &callbacks, sess); - /* we need an additional ref for the source in the hashtable */ - g_object_ref (source); - add_source (sess, source); - RTP_SESSION_UNLOCK (sess); - - return source; -} - static gboolean update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo) { @@ -1821,10 +2092,15 @@ update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo) pinfo->seqnum = gst_rtp_buffer_get_seq (&rtp); pinfo->pt = gst_rtp_buffer_get_payload_type (&rtp); pinfo->rtptime = gst_rtp_buffer_get_timestamp (&rtp); + pinfo->marker = gst_rtp_buffer_get_marker (&rtp); /* copy available csrc */ pinfo->csrc_count = gst_rtp_buffer_get_csrc_count (&rtp); for (i = 0; i < pinfo->csrc_count; i++) pinfo->csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i); + + /* RTP header extensions */ + pinfo->header_ext = gst_rtp_buffer_get_extension_bytes (&rtp, + &pinfo->header_ext_bit_pattern); } gst_rtp_buffer_unmap (&rtp); } @@ -1853,7 +2129,7 @@ invalid_packet: /* update the RTPPacketInfo structure with the current time and other bits * about the current buffer we are handling. * This function is typically called when a validated packet is received. - * This function should be called with the SESSION_LOCK + * This function should be called with the RTP_SESSION_LOCK */ static gboolean update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo, @@ -1873,6 +2149,7 @@ update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo, pinfo->bytes = 0; pinfo->payload_len = 0; pinfo->packets = 0; + pinfo->marker = FALSE; if (is_list) { GstBufferList *list = GST_BUFFER_LIST_CAST (data); @@ -1883,6 +2160,7 @@ update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo, GstBuffer *buffer = GST_BUFFER_CAST (data); res = update_packet (&buffer, 0, pinfo); } + return res; } @@ -1895,6 +2173,24 @@ clean_packet_info (RTPPacketInfo * pinfo) gst_mini_object_unref (pinfo->data); pinfo->data = NULL; } + if (pinfo->header_ext) + g_bytes_unref (pinfo->header_ext); +} + +static gint32 +packet_info_get_twcc_seqnum (RTPPacketInfo * pinfo, guint8 ext_id) +{ + gint32 val = -1; + gpointer data; + guint size; + + if (pinfo->header_ext && + gst_rtp_buffer_get_extension_onebyte_header_from_bytes (pinfo->header_ext, + pinfo->header_ext_bit_pattern, ext_id, 0, &data, &size)) { + if (size == 2) + val = GST_READ_UINT16_BE (data); + } + return val; } static gboolean @@ -1919,6 +2215,30 @@ source_update_active (RTPSession * sess, RTPSource * source, return TRUE; } +static void +process_twcc_packet (RTPSession * sess, RTPPacketInfo * pinfo) +{ + gint32 twcc_seqnum; + + if (sess->twcc_recv_ext_id == 0) + return; + + twcc_seqnum = packet_info_get_twcc_seqnum (pinfo, sess->twcc_recv_ext_id); + if (twcc_seqnum == -1) + return; + + if (rtp_twcc_manager_recv_packet (sess->twcc, twcc_seqnum, pinfo)) { + RTP_SESSION_UNLOCK (sess); + + /* TODO: find a better rational for this number, and possibly tune it based + on factors like framerate / bandwidth etc */ + if (!rtp_session_send_rtcp (sess, 100 * GST_MSECOND)) { + GST_INFO ("Could not schedule TWCC straight away"); + } + RTP_SESSION_LOCK (sess); + } +} + static gboolean source_update_sender (RTPSession * sess, RTPSource * source, gboolean prevsender) @@ -1979,7 +2299,8 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer, current_time, running_time, ntpnstime)) { GST_DEBUG ("invalid RTP packet received"); RTP_SESSION_UNLOCK (sess); - return rtp_session_process_rtcp (sess, buffer, current_time, ntpnstime); + return rtp_session_process_rtcp (sess, buffer, current_time, running_time, + ntpnstime); } ssrc = pinfo.ssrc; @@ -1992,8 +2313,12 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer, prevactive = RTP_SOURCE_IS_ACTIVE (source); oldrate = source->bitrate; + if (created) + on_new_ssrc (sess, source); + /* let source process the packet */ result = rtp_source_process_rtp (source, &pinfo); + process_twcc_packet (sess, &pinfo); /* source became active */ if (source_update_active (sess, source, prevactive)) @@ -2004,8 +2329,6 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer, if (oldrate != source->bitrate) sess->recalc_bandwidth = TRUE; - if (created) - on_new_ssrc (sess, source); if (source->validated) { gboolean created; @@ -2235,7 +2558,11 @@ rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet, value = g_strndup ((const gchar *) data, len); - gst_structure_set (sdes, name, G_TYPE_STRING, value, NULL); + if (g_utf8_validate (value, -1, NULL)) { + gst_structure_set (sdes, name, G_TYPE_STRING, value, NULL); + } else { + GST_WARNING ("ignore SDES field %s with non-utf8 data %s", name, value); + } g_free (name); g_free (value); @@ -2285,21 +2612,18 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet, for (i = 0; i < count; i++) { guint32 ssrc; RTPSource *source; - gboolean created, prevactive, prevsender; + gboolean prevactive, prevsender; guint pmembers, members; ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i); GST_DEBUG ("SSRC: %08x", ssrc); /* find src and mark bye, no probation when dealing with RTCP */ - source = obtain_source (sess, ssrc, &created, pinfo, FALSE); - if (!source) - return; - - if (source->internal) { - /* our own source, something weird with this packet */ - g_object_unref (source); - continue; + source = find_source (sess, ssrc); + if (!source || source->internal) { + GST_DEBUG ("Ignoring suspicious BYE packet (reason: %s)", + !source ? "can't find source" : "has internal source SSRC"); + break; } /* store time for when we need to time out this source */ @@ -2326,27 +2650,38 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet, pinfo->current_time < sess->next_rtcp_check_time) { GstClockTime time_remaining; - time_remaining = sess->next_rtcp_check_time - pinfo->current_time; - sess->next_rtcp_check_time = - gst_util_uint64_scale (time_remaining, members, pmembers); + /* Scale our next RTCP check time according to the change of numbers + * of members. But only if a) this is the first RTCP, or b) this is not + * a feedback session, or c) this is a feedback session but we schedule + * for every RTCP interval (aka no t-rr-interval set). + * + * FIXME: a) and b) are not great as we will possibly go below Tmin + * for non-feedback profiles and in case of a) below + * Tmin/t-rr-interval in any case. + */ + if (sess->last_rtcp_send_time == GST_CLOCK_TIME_NONE || + !(sess->rtp_profile == GST_RTP_PROFILE_AVPF + || sess->rtp_profile == GST_RTP_PROFILE_SAVPF) || + sess->next_rtcp_check_time - sess->last_rtcp_send_time == + sess->last_rtcp_interval) { + time_remaining = sess->next_rtcp_check_time - pinfo->current_time; + sess->next_rtcp_check_time = + gst_util_uint64_scale (time_remaining, members, pmembers); + sess->next_rtcp_check_time += pinfo->current_time; + } + sess->last_rtcp_interval = + gst_util_uint64_scale (sess->last_rtcp_interval, members, pmembers); GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time)); - sess->next_rtcp_check_time += pinfo->current_time; - /* mark pending reconsider. We only want to signal the reconsideration * once after we handled all the source in the bye packet */ reconsider = TRUE; } } - if (created) - on_new_ssrc (sess, source); - on_bye_ssrc (sess, source); - - g_object_unref (source); } if (reconsider) { RTP_SESSION_UNLOCK (sess); @@ -2355,6 +2690,7 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet, sess->callbacks.reconsider (sess, sess->reconsider_user_data); RTP_SESSION_LOCK (sess); } + g_free (reason); } @@ -2363,38 +2699,70 @@ rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet, RTPPacketInfo * pinfo) { GST_DEBUG ("received APP"); + + if (g_signal_has_handler_pending (sess, + rtp_session_signals[SIGNAL_ON_APP_RTCP], 0, TRUE)) { + GstBuffer *data_buffer = NULL; + guint16 data_length; + gchar name[5]; + + data_length = gst_rtcp_packet_app_get_data_length (packet) * 4; + if (data_length > 0) { + guint8 *data = gst_rtcp_packet_app_get_data (packet); + data_buffer = gst_buffer_copy_region (packet->rtcp->buffer, + GST_BUFFER_COPY_MEMORY, data - packet->rtcp->map.data, data_length); + GST_BUFFER_PTS (data_buffer) = pinfo->running_time; + } + + memcpy (name, gst_rtcp_packet_app_get_name (packet), 4); + name[4] = '\0'; + + RTP_SESSION_UNLOCK (sess); + g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_APP_RTCP], 0, + gst_rtcp_packet_app_get_subtype (packet), + gst_rtcp_packet_app_get_ssrc (packet), name, data_buffer); + RTP_SESSION_LOCK (sess); + + if (data_buffer) + gst_buffer_unref (data_buffer); + } } static gboolean rtp_session_request_local_key_unit (RTPSession * sess, RTPSource * src, - gboolean fir, GstClockTime current_time) + guint32 media_ssrc, gboolean fir, GstClockTime current_time) { guint32 round_trip = 0; rtp_source_get_last_rb (src, NULL, NULL, NULL, NULL, NULL, NULL, &round_trip); - if (sess->last_keyframe_request != GST_CLOCK_TIME_NONE && round_trip) { + if (src->last_keyframe_request != GST_CLOCK_TIME_NONE && round_trip) { GstClockTime round_trip_in_ns = gst_util_uint64_scale (round_trip, GST_SECOND, 65536); - if (current_time - sess->last_keyframe_request < 2 * round_trip_in_ns) { - GST_DEBUG ("Ignoring %s request because one was send without one " + /* Sanity check to avoid always ignoring PLI/FIR if we receive RTCP + * packets with erroneous values resulting in crazy high RTT. */ + if (round_trip_in_ns > 5 * GST_SECOND) + round_trip_in_ns = GST_SECOND / 2; + + if (current_time - src->last_keyframe_request < 2 * round_trip_in_ns) { + GST_DEBUG ("Ignoring %s request from %X because one was send without one " "RTT (%" GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")", - fir ? "FIR" : "PLI", - GST_TIME_ARGS (current_time - sess->last_keyframe_request), + fir ? "FIR" : "PLI", rtp_source_get_ssrc (src), + GST_TIME_ARGS (current_time - src->last_keyframe_request), GST_TIME_ARGS (round_trip_in_ns)); return FALSE; } } - sess->last_keyframe_request = current_time; + src->last_keyframe_request = current_time; - GST_LOG ("received %s request from %X %p(%p)", fir ? "FIR" : "PLI", - rtp_source_get_ssrc (src), sess->callbacks.process_rtp, + GST_LOG ("received %s request from %X about %X %p(%p)", fir ? "FIR" : "PLI", + rtp_source_get_ssrc (src), media_ssrc, sess->callbacks.process_rtp, sess->callbacks.request_key_unit); RTP_SESSION_UNLOCK (sess); - sess->callbacks.request_key_unit (sess, fir, + sess->callbacks.request_key_unit (sess, media_ssrc, fir, sess->request_key_unit_user_data); RTP_SESSION_LOCK (sess); @@ -2411,15 +2779,21 @@ rtp_session_process_pli (RTPSession * sess, guint32 sender_ssrc, return; src = find_source (sess, sender_ssrc); - if (src == NULL) - return; + if (src == NULL) { + /* try to find a src with media_ssrc instead */ + src = find_source (sess, media_ssrc); + if (src == NULL) + return; + } - rtp_session_request_local_key_unit (sess, src, FALSE, current_time); + rtp_session_request_local_key_unit (sess, src, media_ssrc, FALSE, + current_time); } static void rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc, - guint8 * fci_data, guint fci_length, GstClockTime current_time) + guint32 media_ssrc, guint8 * fci_data, guint fci_length, + GstClockTime current_time) { RTPSource *src; guint32 ssrc; @@ -2470,7 +2844,8 @@ rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc, if (!our_request) return; - rtp_session_request_local_key_unit (sess, src, TRUE, current_time); + rtp_session_request_local_key_unit (sess, src, media_ssrc, TRUE, + current_time); } static void @@ -2502,23 +2877,66 @@ rtp_session_process_nack (RTPSession * sess, guint32 sender_ssrc, } static void +rtp_session_process_twcc (RTPSession * sess, guint32 sender_ssrc, + guint32 media_ssrc, guint8 * fci_data, guint fci_length) +{ + GArray *twcc_packets; + GstStructure *twcc_packets_s; + GstStructure *twcc_stats_s; + + twcc_packets = rtp_twcc_manager_parse_fci (sess->twcc, + fci_data, fci_length * sizeof (guint32)); + if (twcc_packets == NULL) + return; + + twcc_packets_s = rtp_twcc_stats_get_packets_structure (twcc_packets); + twcc_stats_s = + rtp_twcc_stats_process_packets (sess->twcc_stats, twcc_packets); + + GST_DEBUG_OBJECT (sess, "Parsed TWCC: %" GST_PTR_FORMAT, twcc_packets_s); + GST_INFO_OBJECT (sess, "Current TWCC stats %" GST_PTR_FORMAT, twcc_stats_s); + + g_array_unref (twcc_packets); + + RTP_SESSION_UNLOCK (sess); + if (sess->callbacks.notify_twcc) + sess->callbacks.notify_twcc (sess, twcc_packets_s, twcc_stats_s, + sess->notify_twcc_user_data); + RTP_SESSION_LOCK (sess); +} + +static void rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, RTPPacketInfo * pinfo, GstClockTime current_time) { - GstRTCPType type = gst_rtcp_packet_get_type (packet); - GstRTCPFBType fbtype = gst_rtcp_packet_fb_get_type (packet); - guint32 sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet); - guint32 media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet); - guint8 *fci_data = gst_rtcp_packet_fb_get_fci (packet); - guint fci_length = 4 * gst_rtcp_packet_fb_get_fci_length (packet); + GstRTCPType type; + GstRTCPFBType fbtype; + guint32 sender_ssrc, media_ssrc; + guint8 *fci_data; + guint fci_length; RTPSource *src; + /* The feedback packet must include both sender SSRC and media SSRC */ + if (packet->length < 2) + return; + + type = gst_rtcp_packet_get_type (packet); + fbtype = gst_rtcp_packet_fb_get_type (packet); + sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet); + media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet); + src = find_source (sess, media_ssrc); /* skip non-bye packets for sources that are marked BYE */ if (sess->scheduled_bye && src && RTP_SOURCE_IS_MARKED_BYE (src)) return; + if (src) + g_object_ref (src); + + fci_data = gst_rtcp_packet_fb_get_fci (packet); + fci_length = gst_rtcp_packet_fb_get_fci_length (packet) * sizeof (guint32); + GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of " "length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length); @@ -2530,7 +2948,7 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, fci_buffer = gst_buffer_copy_region (packet->rtcp->buffer, GST_BUFFER_COPY_MEMORY, fci_data - packet->rtcp->map.data, fci_length); - GST_BUFFER_TIMESTAMP (fci_buffer) = pinfo->running_time; + GST_BUFFER_PTS (fci_buffer) = pinfo->running_time; } RTP_SESSION_UNLOCK (sess); @@ -2542,13 +2960,15 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, gst_buffer_unref (fci_buffer); } - if (src && sess->rtcp_feedback_retention_window) { + if (src && sess->rtcp_feedback_retention_window != GST_CLOCK_TIME_NONE) { rtp_source_retain_rtcp_packet (src, packet, pinfo->running_time); } if ((src && src->internal) || /* PSFB FIR puts the media ssrc inside the FCI */ - (type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR)) { + (type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR) || + /* TWCC is for all sources, so a single media-ssrc is not enough */ + (type == GST_RTCP_TYPE_RTPFB && fbtype == GST_RTCP_RTPFB_TYPE_TWCC)) { switch (type) { case GST_RTCP_TYPE_PSFB: switch (fbtype) { @@ -2561,8 +2981,8 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, case GST_RTCP_PSFB_TYPE_FIR: if (src) src->stats.recv_fir_count++; - rtp_session_process_fir (sess, sender_ssrc, fci_data, fci_length, - current_time); + rtp_session_process_fir (sess, sender_ssrc, media_ssrc, fci_data, + fci_length, current_time); break; default: break; @@ -2571,9 +2991,15 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, case GST_RTCP_TYPE_RTPFB: switch (fbtype) { case GST_RTCP_RTPFB_TYPE_NACK: + if (src) + src->stats.recv_nack_count++; rtp_session_process_nack (sess, sender_ssrc, media_ssrc, fci_data, fci_length, current_time); break; + case GST_RTCP_RTPFB_TYPE_TWCC: + rtp_session_process_twcc (sess, sender_ssrc, media_ssrc, + fci_data, fci_length); + break; default: break; } @@ -2581,6 +3007,9 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, break; } } + + if (src) + g_object_unref (src); } /** @@ -2597,7 +3026,7 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, */ GstFlowReturn rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer, - GstClockTime current_time, guint64 ntpnstime) + GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime) { GstRTCPPacket packet; gboolean more, is_bye = FALSE, do_sync = FALSE; @@ -2608,7 +3037,7 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer, g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); - if (!gst_rtcp_buffer_validate (buffer)) + if (!gst_rtcp_buffer_validate_reduced (buffer)) goto invalid_packet; GST_DEBUG ("received RTCP packet"); @@ -2619,7 +3048,7 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer, RTP_SESSION_LOCK (sess); /* update pinfo stats */ update_packet_info (sess, &pinfo, FALSE, FALSE, FALSE, buffer, current_time, - -1, ntpnstime); + running_time, ntpnstime); /* start processing the compound packet */ gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp); @@ -2652,8 +3081,14 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer, case GST_RTCP_TYPE_PSFB: rtp_session_process_feedback (sess, &packet, &pinfo, current_time); break; + case GST_RTCP_TYPE_XR: + /* FIXME: This block is added to downgrade warning level. + * Once the parser is implemented, it should be replaced with + * a proper process function. */ + GST_DEBUG ("got RTCP XR packet, but ignored"); + break; default: - GST_WARNING ("got unknown RTCP packet"); + GST_WARNING ("got unknown RTCP packet type: %d", type); break; } more = gst_rtcp_packet_move_to_next (&packet); @@ -2696,6 +3131,29 @@ invalid_packet: } } +static guint8 +_get_extmap_id_for_attribute (const GstStructure * s, const gchar * ext_name) +{ + guint i; + guint8 extmap_id = 0; + guint n_fields = gst_structure_n_fields (s); + + for (i = 0; i < n_fields; i++) { + const gchar *field_name = gst_structure_nth_field_name (s, i); + if (g_str_has_prefix (field_name, "extmap-")) { + const gchar *str = gst_structure_get_string (s, field_name); + if (str && g_strcmp0 (str, ext_name) == 0) { + gint64 id = g_ascii_strtoll (field_name + 7, NULL, 10); + if (id > 0 && id < 15) { + extmap_id = id; + break; + } + } + } + } + return extmap_id; +} + /** * rtp_session_update_send_caps: * @sess: an #RTPSession @@ -2722,8 +3180,15 @@ rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps) RTP_SESSION_LOCK (sess); source = obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE); + sess->suggested_ssrc = ssrc; + sess->internal_ssrc_set = TRUE; + sess->internal_ssrc_from_caps_or_property = TRUE; if (source) { rtp_source_update_caps (source, caps); + + if (created) + on_new_sender_ssrc (sess, source); + g_object_unref (source); } @@ -2732,13 +3197,41 @@ rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps) obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE); if (source) { rtp_source_update_caps (source, caps); + + if (created) + on_new_sender_ssrc (sess, source); + g_object_unref (source); } } RTP_SESSION_UNLOCK (sess); + } else { + sess->internal_ssrc_from_caps_or_property = FALSE; } + + sess->twcc_send_ext_id = _get_extmap_id_for_attribute (s, TWCC_EXTMAP_STR); + if (sess->twcc_send_ext_id > 0) { + GST_INFO ("TWCC enabled for send using extension id: %u", + sess->twcc_send_ext_id); + } +} + +static void +send_twcc_packet (RTPSession * sess, RTPPacketInfo * pinfo) +{ + gint32 twcc_seqnum; + + if (sess->twcc_send_ext_id == 0) + return; + + twcc_seqnum = packet_info_get_twcc_seqnum (pinfo, sess->twcc_send_ext_id); + if (twcc_seqnum == -1) + return; + + rtp_twcc_manager_send_packet (sess->twcc, twcc_seqnum, pinfo); } + /** * rtp_session_send_rtp: * @sess: an #RTPSession @@ -2747,8 +3240,8 @@ rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps) * @current_time: the current system time * @running_time: the running time of @data * - * Send the RTP buffer in the session manager. This function takes ownership of - * @buffer. + * Send the RTP data (a buffer or buffer list) in the session manager. This + * function takes ownership of @data. * * Returns: a #GstFlowReturn. */ @@ -2773,7 +3266,37 @@ rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list, current_time, running_time, -1)) goto invalid_packet; + send_twcc_packet (sess, &pinfo); + source = obtain_internal_source (sess, pinfo.ssrc, &created, current_time); + if (created) + on_new_sender_ssrc (sess, source); + + if (!source->internal) { + GSocketAddress *from; + + if (source->rtp_from) + from = source->rtp_from; + else + from = source->rtcp_from; + if (from) { + if (rtp_session_find_conflicting_address (sess, from, current_time)) { + /* Its a known conflict, its probably a loop, not a collision + * lets just drop the incoming packet + */ + GST_LOG ("Our packets are being looped back to us, ignoring collision"); + } else { + GST_DEBUG ("Collision for SSRC %x, change our sender ssrc", pinfo.ssrc); + + rtp_session_have_conflict (sess, source, from, current_time); + + goto collision; + } + } else { + GST_LOG ("Ignoring collision on sent SSRC %x because remote source" + " doesn't have an address", pinfo.ssrc); + } + } prevsender = RTP_SOURCE_IS_SENDER (source); oldrate = source->bitrate; @@ -2799,6 +3322,15 @@ invalid_packet: GST_DEBUG ("invalid RTP packet received"); return GST_FLOW_OK; } +collision: + { + g_object_unref (source); + clean_packet_info (&pinfo); + RTP_SESSION_UNLOCK (sess); + GST_WARNING ("non-internal source with same ssrc %08x, drop packet", + pinfo.ssrc); + return GST_FLOW_OK; + } } static void @@ -2905,7 +3437,6 @@ rtp_session_schedule_bye_locked (RTPSession * sess, GstClockTime current_time) INIT_AVG (sess->bye_stats.avg_rtcp_packet_size, 100); sess->bye_stats.bye_members = 1; sess->first_rtcp = TRUE; - sess->allow_early = TRUE; /* reschedule transmission */ sess->last_rtcp_send_time = current_time; @@ -2916,6 +3447,7 @@ rtp_session_schedule_bye_locked (RTPSession * sess, GstClockTime current_time) sess->next_rtcp_check_time = current_time + interval; else sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE; + sess->last_rtcp_interval = interval; GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT, GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time)); @@ -2996,16 +3528,36 @@ rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time) GST_DEBUG ("reconsider BYE, more than 50 sources"); /* reconsider BYE if members >= 50 */ interval = calculate_rtcp_interval (sess, FALSE, TRUE); + sess->last_rtcp_interval = interval; } } else { if (sess->first_rtcp) { GST_DEBUG ("first RTCP packet"); /* we are called for the first time */ interval = calculate_rtcp_interval (sess, FALSE, TRUE); + sess->last_rtcp_interval = interval; } else if (sess->next_rtcp_check_time < current_time) { GST_DEBUG ("old check time expired, getting new timeout"); /* get a new timeout when we need to */ interval = calculate_rtcp_interval (sess, FALSE, FALSE); + sess->last_rtcp_interval = interval; + + if ((sess->rtp_profile == GST_RTP_PROFILE_AVPF + || sess->rtp_profile == GST_RTP_PROFILE_SAVPF) + && interval != GST_CLOCK_TIME_NONE) { + /* Apply the rules from RFC 4585 section 3.5.3 */ + if (sess->stats.min_interval != 0) { + GstClockTime T_rr_current_interval = g_random_double_range (0.5, + 1.5) * sess->stats.min_interval * GST_SECOND; + + if (T_rr_current_interval > interval) { + GST_DEBUG ("Adjusting interval for t-rr-interval: %" GST_TIME_FORMAT + " > %" GST_TIME_FORMAT, GST_TIME_ARGS (T_rr_current_interval), + GST_TIME_ARGS (interval)); + interval = T_rr_current_interval; + } + } + } } } @@ -3067,6 +3619,9 @@ session_start_rtcp (RTPSession * sess, ReportData * data) gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp); + if (data->is_early && sess->reduced_size_rtcp) + return; + if (RTP_SOURCE_IS_SENDER (own)) { guint64 ntptime; guint32 rtptime; @@ -3085,7 +3640,9 @@ session_start_rtcp (RTPSession * sess, ReportData * data) /* fill in sender report info */ gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc, - ntptime, rtptime, packet_count, octet_count); + sess->timestamp_sender_reports ? ntptime : 0, + sess->timestamp_sender_reports ? rtptime : 0, + packet_count, octet_count); } else { /* we are only receiver, create RR */ GST_DEBUG ("create RR for SSRC %08x", own->ssrc); @@ -3123,8 +3680,8 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data) return; } - /* only report about other sender */ - if (source == data->source) + /* only report about remote sources */ + if (source->internal) goto reported; if (!RTP_SOURCE_IS_SENDER (source)) { @@ -3132,6 +3689,11 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data) goto reported; } + if (source->disable_rtcp) { + GST_DEBUG ("source %08x has RTCP disabled", source->ssrc); + goto reported; + } + GST_DEBUG ("create RB for SSRC %08x", source->ssrc); /* get new stats */ @@ -3258,15 +3820,64 @@ session_pli (const gchar * key, RTPSource * source, ReportData * data) static void session_nack (const gchar * key, RTPSource * source, ReportData * data) { + RTPSession *sess = data->sess; GstRTCPBuffer *rtcp = &data->rtcpbuf; GstRTCPPacket *packet = &data->packet; - guint32 *nacks; - guint n_nacks, i; + guint16 *nacks; + GstClockTime *nack_deadlines; + guint n_nacks, i = 0; + guint nacked_seqnums = 0; + guint16 n_fb_nacks = 0; guint8 *fci_data; if (!source->send_nack) return; + nacks = rtp_source_get_nacks (source, &n_nacks); + nack_deadlines = rtp_source_get_nack_deadlines (source, NULL); + GST_DEBUG ("%u NACKs current time %" GST_TIME_FORMAT, n_nacks, + GST_TIME_ARGS (data->current_time)); + + /* cleanup expired nacks */ + for (i = 0; i < n_nacks; i++) { + GST_DEBUG ("#%u deadline %" GST_TIME_FORMAT, nacks[i], + GST_TIME_ARGS (nack_deadlines[i])); + if (nack_deadlines[i] >= data->current_time) + break; + } + + if (data->is_early) { + /* don't remove them all if this is an early RTCP packet. It may happen + * that the NACKs are late due to high RTT, not sending NACKs at all would + * keep the RTX RTT stats high and maintain a dropping state. */ + i = MIN (n_nacks - 1, i); + } + + if (i) { + GST_WARNING ("Removing %u expired NACKS", i); + rtp_source_clear_nacks (source, i); + n_nacks -= i; + if (n_nacks == 0) + return; + } + + /* allow overriding NACK to packet conversion */ + if (g_signal_has_handler_pending (sess, + rtp_session_signals[SIGNAL_ON_SENDING_NACKS], 0, TRUE)) { + /* this is needed as it will actually resize the buffer */ + gst_rtcp_buffer_unmap (rtcp); + + g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_NACKS], 0, + data->source->ssrc, source->ssrc, source->nacks, data->rtcp, + &nacked_seqnums); + + /* and now remap for the remaining work */ + gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp); + + if (nacked_seqnums > 0) + goto done; + } + if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RTPFB, packet)) /* exit because the packet is full, will put next request in a * further packet */ @@ -3276,19 +3887,46 @@ session_nack (const gchar * key, RTPSource * source, ReportData * data) gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc); gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc); - nacks = rtp_source_get_nacks (source, &n_nacks); - GST_DEBUG ("%u NACKs", n_nacks); - if (!gst_rtcp_packet_fb_set_fci_length (packet, n_nacks)) + if (!gst_rtcp_packet_fb_set_fci_length (packet, 1)) { + gst_rtcp_packet_remove (packet); + GST_WARNING ("no nacks fit in the packet"); return; + } fci_data = gst_rtcp_packet_fb_get_fci (packet); - for (i = 0; i < n_nacks; i++) { - GST_WRITE_UINT32_BE (fci_data, nacks[i]); + for (i = 0; i < n_nacks; i = nacked_seqnums) { + guint16 seqnum = nacks[i]; + guint16 blp = 0; + guint j; + + if (!gst_rtcp_packet_fb_set_fci_length (packet, n_fb_nacks + 1)) + break; + + n_fb_nacks++; + nacked_seqnums++; + + for (j = i + 1; j < n_nacks; j++) { + gint diff; + + diff = gst_rtp_buffer_compare_seqnum (seqnum, nacks[j]); + GST_TRACE ("[%u][%u] %u %u diff %i", i, j, seqnum, nacks[j], diff); + if (diff > 16) + break; + + blp |= 1 << (diff - 1); + nacked_seqnums++; + } + + GST_WRITE_UINT32_BE (fci_data, seqnum << 16 | blp); fci_data += 4; - data->nacked_seqnums++; } - rtp_source_clear_nacks (source); + GST_DEBUG ("Sent %u seqnums into %u FB NACKs", nacked_seqnums, n_fb_nacks); + source->stats.sent_nack_count += n_fb_nacks; + +done: + data->nacked_seqnums += nacked_seqnums; + rtp_source_clear_nacks (source, nacked_seqnums); data->may_suppress = FALSE; } @@ -3309,8 +3947,8 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data) /* check for outdated collisions */ if (source->internal) { GST_DEBUG ("Timing out collisions for %x", source->ssrc); - rtp_source_timeout (source, data->current_time, - data->running_time - sess->rtcp_feedback_retention_window); + rtp_source_timeout (source, data->current_time, data->running_time, + sess->rtcp_feedback_retention_window); } /* nothing else to do when without RTCP */ @@ -3532,15 +4170,11 @@ is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data) else data->is_early = FALSE; - if (data->is_early && sess->next_early_rtcp_time < current_time) { - GST_DEBUG ("early feedback %" GST_TIME_FORMAT " < now %" + if (data->is_early && sess->next_early_rtcp_time <= current_time) { + GST_DEBUG ("early feedback %" GST_TIME_FORMAT " <= now %" GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_early_rtcp_time), GST_TIME_ARGS (current_time)); - goto early; - } - - /* no need to check yet */ - if (sess->next_rtcp_check_time == GST_CLOCK_TIME_NONE || + } else if (sess->next_rtcp_check_time == GST_CLOCK_TIME_NONE || sess->next_rtcp_check_time > current_time) { GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %" GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time), @@ -3548,8 +4182,6 @@ is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data) return FALSE; } -early: - /* take interval and add jitter */ interval = data->interval; if (interval != GST_CLOCK_TIME_NONE) @@ -3585,36 +4217,33 @@ early: GST_TIME_ARGS (new_send_time)); /* store new check time */ sess->next_rtcp_check_time = new_send_time; + sess->last_rtcp_interval = interval; return FALSE; } - sess->next_rtcp_check_time = current_time + interval; - } - if ((sess->rtp_profile == GST_RTP_PROFILE_AVPF - || sess->rtp_profile == GST_RTP_PROFILE_SAVPF) - && interval != GST_CLOCK_TIME_NONE) { - /* Apply the rules from RFC 4585 section 3.5.3 */ - if (stats->min_interval != 0 && !sess->first_rtcp) { - GstClockTime T_rr_current_interval = - g_random_double_range (0.5, 1.5) * stats->min_interval * GST_SECOND; - - /* This will caused the RTCP to be suppressed if no FB packets are added */ - if (sess->last_rtcp_send_time + T_rr_current_interval > new_send_time) { - GST_DEBUG ("RTCP packet could be suppressed min: %" GST_TIME_FORMAT - " last: %" GST_TIME_FORMAT - " + T_rr_current_interval: %" GST_TIME_FORMAT - " > new_send_time: %" GST_TIME_FORMAT, - GST_TIME_ARGS (stats->min_interval), - GST_TIME_ARGS (sess->last_rtcp_send_time), - GST_TIME_ARGS (T_rr_current_interval), - GST_TIME_ARGS (new_send_time)); - data->may_suppress = TRUE; + sess->last_rtcp_interval = interval; + if ((sess->rtp_profile == GST_RTP_PROFILE_AVPF + || sess->rtp_profile == GST_RTP_PROFILE_SAVPF) + && interval != GST_CLOCK_TIME_NONE) { + /* Apply the rules from RFC 4585 section 3.5.3 */ + if (stats->min_interval != 0 && !sess->first_rtcp) { + GstClockTime T_rr_current_interval = + g_random_double_range (0.5, 1.5) * stats->min_interval * GST_SECOND; + + if (T_rr_current_interval > interval) { + GST_DEBUG ("Adjusting interval for t-rr-interval: %" GST_TIME_FORMAT + " > %" GST_TIME_FORMAT, GST_TIME_ARGS (T_rr_current_interval), + GST_TIME_ARGS (interval)); + interval = T_rr_current_interval; + } } } + sess->next_rtcp_check_time = current_time + interval; } - GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT, - GST_TIME_ARGS (new_send_time)); + + GST_DEBUG ("can send RTCP now, next %" GST_TIME_FORMAT, + GST_TIME_ARGS (sess->next_rtcp_check_time)); return TRUE; } @@ -3643,6 +4272,37 @@ remove_closing_sources (const gchar * key, RTPSource * source, } static void +generate_twcc (const gchar * key, RTPSource * source, ReportData * data) +{ + RTPSession *sess = data->sess; + GstBuffer *buf; + + /* only generate RTCP for active internal sources */ + if (!source->internal || source->sent_bye) + return; + + /* ignore other sources when we do the timeout after a scheduled BYE */ + if (sess->scheduled_bye && !source->marked_bye) + return; + + /* skip if RTCP is disabled */ + if (source->disable_rtcp) { + GST_DEBUG ("source %08x has RTCP disabled", source->ssrc); + return; + } + + while ((buf = rtp_twcc_manager_get_feedback (sess->twcc, source->ssrc))) { + ReportOutput *output = g_slice_new (ReportOutput); + output->source = g_object_ref (source); + output->is_bye = FALSE; + output->buffer = buf; + /* queue the RTCP packet to push later */ + g_queue_push_tail (&data->output, output); + } +} + + +static void generate_rtcp (const gchar * key, RTPSource * source, ReportData * data) { RTPSession *sess = data->sess; @@ -3657,6 +4317,12 @@ generate_rtcp (const gchar * key, RTPSource * source, ReportData * data) if (sess->scheduled_bye && !source->marked_bye) return; + /* skip if RTCP is disabled */ + if (source->disable_rtcp) { + GST_DEBUG ("source %08x has RTCP disabled", source->ssrc); + return; + } + data->source = source; /* open packet */ @@ -3672,7 +4338,7 @@ generate_rtcp (const gchar * key, RTPSource * source, ReportData * data) g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) session_report_blocks, data); } - if (!data->has_sdes) + if (!data->has_sdes && (!data->is_early || !sess->reduced_size_rtcp)) session_sdes (sess, data); if (data->have_fir) @@ -3718,6 +4384,47 @@ update_generation (const gchar * key, RTPSource * source, ReportData * data) } } +static void +schedule_remaining_nacks (const gchar * key, RTPSource * source, + ReportData * data) +{ + RTPSession *sess = data->sess; + GstClockTime *nack_deadlines; + GstClockTime deadline; + guint n_nacks; + + if (!source->send_nack) + return; + + /* the scheduling is entirely based on available bandwidth, just take the + * biggest seqnum, which will have the largest deadline to request early + * RTCP. */ + nack_deadlines = rtp_source_get_nack_deadlines (source, &n_nacks); + deadline = nack_deadlines[n_nacks - 1]; + RTP_SESSION_UNLOCK (sess); + rtp_session_send_rtcp_with_deadline (sess, deadline); + RTP_SESSION_LOCK (sess); +} + +static gboolean +rtp_session_are_all_sources_bye (RTPSession * sess) +{ + GHashTableIter iter; + RTPSource *src; + + RTP_SESSION_LOCK (sess); + g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]); + while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) { + if (src->internal && !src->sent_bye) { + RTP_SESSION_UNLOCK (sess); + return FALSE; + } + } + RTP_SESSION_UNLOCK (sess); + + return TRUE; +} + /** * rtp_session_on_timeout: * @sess: an #RTPSession @@ -3744,6 +4451,7 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time, ReportData data = { GST_RTCP_BUFFER_INIT }; GHashTable *table_copy; ReportOutput *output; + gboolean all_empty = FALSE; g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); @@ -3773,6 +4481,11 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time, source = obtain_internal_source (sess, sess->suggested_ssrc, &created, current_time); + sess->internal_ssrc_set = TRUE; + + if (created) + on_new_sender_ssrc (sess, source); + g_object_unref (source); } @@ -3802,37 +4515,53 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time, if (!is_rtcp_time (sess, current_time, &data)) goto done; - GST_DEBUG ("doing RTCP generation %u for %u sources, early %d", - sess->generation, data.num_to_report, data.is_early); + /* check if all the buffers are empty after generation */ + all_empty = TRUE; + + GST_DEBUG + ("doing RTCP generation %u for %u sources, early %d, may suppress %d", + sess->generation, data.num_to_report, data.is_early, data.may_suppress); /* generate RTCP for all internal sources */ g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) generate_rtcp, &data); + g_hash_table_foreach (sess->ssrcs[sess->mask_idx], + (GHFunc) generate_twcc, &data); + /* update the generation for all the sources that have been reported */ g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) update_generation, &data); /* we keep track of the last report time in order to timeout inactive * receivers or senders */ - if (!data.is_early && !data.may_suppress) + if (!data.is_early) { + GST_DEBUG ("Time since last regular RTCP: %" GST_TIME_FORMAT " - %" + GST_TIME_FORMAT " = %" GST_TIME_FORMAT, + GST_TIME_ARGS (data.current_time), + GST_TIME_ARGS (sess->last_rtcp_send_time), + GST_TIME_ARGS (data.current_time - sess->last_rtcp_send_time)); sess->last_rtcp_send_time = data.current_time; + } + + GST_DEBUG ("Time since last RTCP: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT + " = %" GST_TIME_FORMAT, GST_TIME_ARGS (data.current_time), + GST_TIME_ARGS (sess->last_rtcp_check_time), + GST_TIME_ARGS (data.current_time - sess->last_rtcp_check_time)); sess->last_rtcp_check_time = data.current_time; sess->first_rtcp = FALSE; sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE; sess->scheduled_bye = FALSE; - /* RFC 4585 section 3.5.2 step 6 */ - if (!data.is_early) { - sess->allow_early = TRUE; - } - done: RTP_SESSION_UNLOCK (sess); + /* notify about updated statistics */ + g_object_notify (G_OBJECT (sess), "stats"); + /* push out the RTCP packets */ while ((output = g_queue_pop_head (&data.output))) { - gboolean do_not_suppress; + gboolean do_not_suppress, empty_buffer; GstBuffer *buffer = output->buffer; RTPSource *source = output->source; @@ -3840,7 +4569,13 @@ done: g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_RTCP], 0, buffer, data.is_early, &do_not_suppress); - if (sess->callbacks.send_rtcp && (do_not_suppress || !data.may_suppress)) { + empty_buffer = gst_buffer_get_size (buffer) == 0; + + if (!empty_buffer) + all_empty = FALSE; + + if (sess->callbacks.send_rtcp && + !empty_buffer && (do_not_suppress || !data.may_suppress)) { guint packet_size; packet_size = gst_buffer_get_size (buffer) + sess->header_len; @@ -3849,19 +4584,38 @@ done: GST_DEBUG ("%p, sending RTCP packet, avg size %u, %u", &sess->stats, sess->stats.avg_rtcp_packet_size, packet_size); result = - sess->callbacks.send_rtcp (sess, source, buffer, output->is_bye, - sess->send_rtcp_user_data); + sess->callbacks.send_rtcp (sess, source, buffer, + rtp_session_are_all_sources_bye (sess), sess->send_rtcp_user_data); + + RTP_SESSION_LOCK (sess); sess->stats.nacks_sent += data.nacked_seqnums; + on_sender_ssrc_active (sess, source); + RTP_SESSION_UNLOCK (sess); } else { GST_DEBUG ("freeing packet callback: %p" - " do_not_suppress: %d may_suppress: %d", - sess->callbacks.send_rtcp, do_not_suppress, data.may_suppress); - sess->stats.nacks_dropped += data.nacked_seqnums; + " empty_buffer: %d, " + " do_not_suppress: %d may_suppress: %d", sess->callbacks.send_rtcp, + empty_buffer, do_not_suppress, data.may_suppress); + if (!empty_buffer) { + RTP_SESSION_LOCK (sess); + sess->stats.nacks_dropped += data.nacked_seqnums; + RTP_SESSION_UNLOCK (sess); + } gst_buffer_unref (buffer); } g_object_unref (source); g_slice_free (ReportOutput, output); } + + if (all_empty) + GST_ERROR ("generated empty RTCP messages for all the sources"); + + /* schedule remaining nacks */ + RTP_SESSION_LOCK (sess); + g_hash_table_foreach (sess->ssrcs[sess->mask_idx], + (GHFunc) schedule_remaining_nacks, &data); + RTP_SESSION_UNLOCK (sess); + return result; } @@ -3879,8 +4633,9 @@ gboolean rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time, GstClockTime max_delay) { - GstClockTime T_dither_max, T_rr; + GstClockTime T_dither_max, T_rr, offset = 0; gboolean ret; + gboolean allow_early; /* Implements the algorithm described in RFC 4585 section 3.5.2 */ @@ -3930,7 +4685,7 @@ rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time, goto end; } - T_rr = sess->next_rtcp_check_time - sess->last_rtcp_check_time; + T_rr = sess->last_rtcp_interval; /* RFC 4585 section 3.5.2 step 2b */ /* If the total sources is <=2, then there is only us and one peer */ @@ -3948,16 +4703,41 @@ rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time, /* RFC 4585 section 3.5.2 step 3 */ if (current_time + T_dither_max > sess->next_rtcp_check_time) { GST_LOG_OBJECT (sess, - "don't send because of dither, next scheduled time is soon %" + "don't send because of dither, next scheduled time is too soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time), GST_TIME_ARGS (T_dither_max), GST_TIME_ARGS (sess->next_rtcp_check_time)); - ret = TRUE; + ret = T_dither_max <= max_delay; goto end; } - /* RFC 4585 section 3.5.2 step 4a */ - if (!sess->allow_early) { + /* RFC 4585 section 3.5.2 step 4a and + * RFC 4585 section 3.5.2 step 6 */ + allow_early = FALSE; + if (sess->last_rtcp_check_time == sess->last_rtcp_send_time) { + /* Last time we sent a full RTCP packet, we can now immediately + * send an early one as allow_early was reset to TRUE */ + allow_early = TRUE; + } else if (sess->last_rtcp_check_time + T_rr <= current_time + max_delay) { + /* Last packet we sent was an early RTCP packet and more than + * T_rr has passed since then, meaning we would have suppressed + * a regular RTCP packet already and reset allow_early to TRUE */ + allow_early = TRUE; + + /* We have to offset a bit as T_rr has not passed yet, but will before + * max_delay */ + if (sess->last_rtcp_check_time + T_rr > current_time) + offset = (sess->last_rtcp_check_time + T_rr) - current_time; + } else { + GST_DEBUG_OBJECT (sess, + "can't allow early RTCP yet: last regular %" GST_TIME_FORMAT ", %" + GST_TIME_FORMAT " + %" GST_TIME_FORMAT " > %" GST_TIME_FORMAT " + %" + GST_TIME_FORMAT, GST_TIME_ARGS (sess->last_rtcp_send_time), + GST_TIME_ARGS (sess->last_rtcp_check_time), GST_TIME_ARGS (T_rr), + GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay)); + } + + if (!allow_early) { /* Ignore the request a scheduled packet will be in time anyway */ if (current_time + max_delay > sess->next_rtcp_check_time) { GST_LOG_OBJECT (sess, @@ -3981,20 +4761,12 @@ rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time, if (T_dither_max) { /* Schedule an early transmission later */ sess->next_early_rtcp_time = g_random_double () * T_dither_max + - current_time; + current_time + offset; } else { /* If no dithering, schedule it for NOW */ - sess->next_early_rtcp_time = current_time; + sess->next_early_rtcp_time = current_time + offset; } - /* RFC 4585 section 3.5.2 step 6 */ - sess->allow_early = FALSE; - /* Delay next regular RTCP packet to not exceed the short-term - * RTCP bandwidth when using early feedback as compared to - * without */ - sess->next_rtcp_check_time = sess->last_rtcp_check_time + 2 * T_rr; - sess->last_rtcp_check_time += T_rr; - GST_LOG_OBJECT (sess, "next early RTCP time %" GST_TIME_FORMAT ", next regular RTCP time %" GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_early_rtcp_time), @@ -4016,6 +4788,35 @@ end: } static gboolean +rtp_session_send_rtcp_internal (RTPSession * sess, GstClockTime now, + GstClockTime max_delay) +{ + /* notify the application that we intend to send early RTCP */ + if (sess->callbacks.notify_early_rtcp) + sess->callbacks.notify_early_rtcp (sess, sess->notify_early_rtcp_user_data); + + return rtp_session_request_early_rtcp (sess, now, max_delay); +} + +static gboolean +rtp_session_send_rtcp_with_deadline (RTPSession * sess, GstClockTime deadline) +{ + GstClockTime now, max_delay; + + if (!sess->callbacks.send_rtcp) + return FALSE; + + now = sess->callbacks.request_time (sess, sess->request_time_user_data); + + if (deadline < now) + return FALSE; + + max_delay = deadline - now; + + return rtp_session_send_rtcp_internal (sess, now, max_delay); +} + +static gboolean rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay) { GstClockTime now; @@ -4025,7 +4826,7 @@ rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay) now = sess->callbacks.request_time (sess, sess->request_time_user_data); - return rtp_session_request_early_rtcp (sess, now, max_delay); + return rtp_session_send_rtcp_internal (sess, now, max_delay); } gboolean @@ -4034,11 +4835,6 @@ rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc, { RTPSource *src; - if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) { - GST_DEBUG ("FIR/PLI not sent"); - return FALSE; - } - RTP_SESSION_LOCK (sess); src = find_source (sess, ssrc); if (src == NULL) @@ -4056,6 +4852,10 @@ rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc, } RTP_SESSION_UNLOCK (sess); + if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) { + GST_DEBUG ("FIR/PLI not sent early, sending with next regular RTCP"); + } + return TRUE; /* ERRORS */ @@ -4082,21 +4882,27 @@ rtp_session_request_nack (RTPSession * sess, guint32 ssrc, guint16 seqnum, GstClockTime max_delay) { RTPSource *source; + GstClockTime now; - if (!rtp_session_send_rtcp (sess, max_delay)) { - GST_DEBUG ("NACK not sent"); + if (!sess->callbacks.send_rtcp) return FALSE; - } + + now = sess->callbacks.request_time (sess, sess->request_time_user_data); RTP_SESSION_LOCK (sess); source = find_source (sess, ssrc); if (source == NULL) goto no_source; - GST_DEBUG ("request NACK for %08x, #%u", ssrc, seqnum); - rtp_source_register_nack (source, seqnum); + GST_DEBUG ("request NACK for SSRC %08x, #%u, deadline %" GST_TIME_FORMAT, + ssrc, seqnum, GST_TIME_ARGS (now + max_delay)); + rtp_source_register_nack (source, seqnum, now + max_delay); RTP_SESSION_UNLOCK (sess); + if (!rtp_session_send_rtcp_internal (sess, now, max_delay)) { + GST_DEBUG ("NACK not sent early, sending with next regular RTCP"); + } + return TRUE; /* ERRORS */ @@ -4106,3 +4912,22 @@ no_source: return FALSE; } } + +/** + * rtp_session_update_recv_caps_structure: + * @sess: an #RTPSession + * @s: a #GstStructure from a #GstCaps + * + * Update the caps of the receiver in the rtp session. + */ +void +rtp_session_update_recv_caps_structure (RTPSession * sess, + const GstStructure * s) +{ + guint8 ext_id = _get_extmap_id_for_attribute (s, TWCC_EXTMAP_STR); + if (ext_id > 0) { + sess->twcc_recv_ext_id = ext_id; + GST_INFO ("TWCC enabled for recv using extension id: %u", + sess->twcc_recv_ext_id); + } +}