X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst%2Frtpmanager%2Frtpsession.c;h=fa9dfed71c61c67b4e913388ca48c0d7b7a87098;hb=0d77e9721deea47256cf800db58bf119f3c53786;hp=2c49fb1eea95ae05a6ccdcc44230b9e57cf0485b;hpb=b07b7736b37757e3e271eda27aacb4b0669826fa;p=platform%2Fupstream%2Fgst-plugins-good.git diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c index 2c49fb1..fa9dfed 100644 --- a/gst/rtpmanager/rtpsession.c +++ b/gst/rtpmanager/rtpsession.c @@ -47,15 +47,20 @@ enum SIGNAL_ON_TIMEOUT, SIGNAL_ON_SENDER_TIMEOUT, SIGNAL_ON_SENDING_RTCP, + SIGNAL_ON_APP_RTCP, SIGNAL_ON_FEEDBACK_RTCP, SIGNAL_SEND_RTCP, SIGNAL_SEND_RTCP_FULL, + SIGNAL_ON_RECEIVING_RTCP, + SIGNAL_ON_NEW_SENDER_SSRC, + SIGNAL_ON_SENDER_SSRC_ACTIVE, + SIGNAL_ON_SENDING_NACKS, LAST_SIGNAL }; #define DEFAULT_INTERNAL_SOURCE NULL -#define DEFAULT_BANDWIDTH RTP_STATS_BANDWIDTH -#define DEFAULT_RTCP_FRACTION (RTP_STATS_RTCP_FRACTION * RTP_STATS_BANDWIDTH) +#define DEFAULT_BANDWIDTH 0.0 +#define DEFAULT_RTCP_FRACTION RTP_STATS_RTCP_FRACTION #define DEFAULT_RTCP_RR_BANDWIDTH -1 #define DEFAULT_RTCP_RS_BANDWIDTH -1 #define DEFAULT_RTCP_MTU 1400 @@ -67,6 +72,11 @@ enum #define DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW (2 * GST_SECOND) #define DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD (3) #define DEFAULT_PROBATION RTP_DEFAULT_PROBATION +#define DEFAULT_MAX_DROPOUT_TIME 60000 +#define DEFAULT_MAX_MISORDER_TIME 2000 +#define DEFAULT_RTP_PROFILE GST_RTP_PROFILE_AVP +#define DEFAULT_RTCP_REDUCED_SIZE FALSE +#define DEFAULT_RTCP_DISABLE_SR_TIMESTAMP FALSE enum { @@ -87,8 +97,12 @@ enum PROP_RTCP_FEEDBACK_RETENTION_WINDOW, PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD, PROP_PROBATION, + PROP_MAX_DROPOUT_TIME, + PROP_MAX_MISORDER_TIME, PROP_STATS, - PROP_LAST + PROP_RTP_PROFILE, + PROP_RTCP_REDUCED_SIZE, + PROP_RTCP_DISABLE_SR_TIMESTAMP }; /* update average packet size */ @@ -110,6 +124,8 @@ static void rtp_session_get_property (GObject * object, guint prop_id, static gboolean rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay); +static gboolean rtp_session_send_rtcp_with_deadline (RTPSession * sess, + GstClockTime deadline); static guint rtp_session_signals[LAST_SIGNAL] = { 0 }; @@ -287,6 +303,23 @@ rtp_session_class_init (RTPSessionClass * klass) GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE, G_TYPE_BOOLEAN); /** + * RTPSession::on-app-rtcp: + * @session: the object which received the signal + * @subtype: The subtype of the packet + * @ssrc: The SSRC/CSRC of the packet + * @name: The name of the packet + * @data: a #GstBuffer with the application-dependant data or %NULL if + * there was no data + * + * Notify that a RTCP APP packet has been received + */ + rtp_session_signals[SIGNAL_ON_APP_RTCP] = + g_signal_new ("on-app-rtcp", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_app_rtcp), + NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 4, + G_TYPE_UINT, G_TYPE_UINT, G_TYPE_STRING, GST_TYPE_BUFFER); + + /** * RTPSession::on-feedback-rtcp: * @session: the object which received the signal * @type: Type of RTCP packet, will be %GST_RTCP_TYPE_RTPFB or @@ -313,6 +346,8 @@ rtp_session_class_init (RTPSessionClass * klass) * * Requests that the #RTPSession initiate a new RTCP packet as soon as * possible within the requested delay. + * + * This sets feedback to %TRUE if not already done before. */ rtp_session_signals[SIGNAL_SEND_RTCP] = g_signal_new ("send-rtcp", G_TYPE_FROM_CLASS (klass), @@ -329,6 +364,8 @@ rtp_session_class_init (RTPSessionClass * klass) * Requests that the #RTPSession initiate a new RTCP packet as soon as * possible within the requested delay. * + * This sets feedback to %TRUE if not already done before. + * * Returns: TRUE if the new RTCP packet could be scheduled within the * requested delay, FALSE otherwise. * @@ -340,6 +377,85 @@ rtp_session_class_init (RTPSessionClass * klass) G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL, g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64); + /** + * RTPSession::on-receiving-rtcp + * @session: the object which received the signal + * @buffer: the #GstBuffer containing the RTCP packet that was received + * + * This signal is emitted when receiving an RTCP packet before it is handled + * by the session. It can be used to extract custom information from RTCP packets. + * + * Since: 1.6 + */ + rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP] = + g_signal_new ("on-receiving-rtcp", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_receiving_rtcp), + NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1, + GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE); + + /** + * RTPSession::on-new-sender-ssrc: + * @session: the object which received the signal + * @src: the new sender RTPSource + * + * Notify of a new sender SSRC that entered @session. + * + * Since: 1.8 + */ + rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC] = + g_signal_new ("on-new-sender-ssrc", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_sender_ssrc), + NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, + RTP_TYPE_SOURCE); + + /** + * RTPSession::on-sender-ssrc-active: + * @session: the object which received the signal + * @src: the active sender RTPSource + * + * Notify of a sender SSRC that is active, i.e., sending RTCP. + * + * Since: 1.8 + */ + rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE] = + g_signal_new ("on-sender-ssrc-active", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, + on_sender_ssrc_active), NULL, NULL, g_cclosure_marshal_VOID__OBJECT, + G_TYPE_NONE, 1, RTP_TYPE_SOURCE); + + /** + * RTPSession::on-sending-nack + * @session: the object which received the signal + * @sender_ssrc: the sender ssrc + * @media_ssrc: the media ssrc + * @nacks: (element-type guint16): the list of seqnum to be nacked + * @buffer: the #GstBuffer containing the RTCP packet about to be sent + * + * This signal is emitted before NACK packets are added into the RTCP + * packet. This signal can be used to override the conversion of the NACK + * seqnum array into packets. This can be used if your protocol uses + * different type of NACK (e.g. based on RTCP APP). + * + * The handler should transform the seqnum from @nacks array into packets. + * @nacks seqnum must be consumed from the start. The remaining will be + * rescheduled for later base on bandwidth. Only one handler will be + * signalled. + * + * A handler may return 0 to signal that generic NACKs should be created + * for this set. This can be useful if the signal is used for other purpose + * or if the other type of NACK would use more space. + * + * Returns: the number of NACK seqnum that was consumed from @nacks. + * + * Since: 1.16 + */ + rtp_session_signals[SIGNAL_ON_SENDING_NACKS] = + g_signal_new ("on-sending-nacks", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_nacks), + g_signal_accumulator_first_wins, NULL, g_cclosure_marshal_generic, + G_TYPE_UINT, 4, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_ARRAY, + GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE); + g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC, g_param_spec_uint ("internal-ssrc", "Internal SSRC", "The internal SSRC used for the session (deprecated)", @@ -352,25 +468,25 @@ rtp_session_class_init (RTPSessionClass * klass) g_object_class_install_property (gobject_class, PROP_BANDWIDTH, g_param_spec_double ("bandwidth", "Bandwidth", - "The bandwidth of the session (0 for auto-discover)", + "The bandwidth of the session in bits per second (0 for auto-discover)", 0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION, g_param_spec_double ("rtcp-fraction", "RTCP Fraction", - "The fraction of the bandwidth used for RTCP (or as a real fraction of the RTP bandwidth if < 1)", + "The fraction of the bandwidth used for RTCP in bits per second (or as a real fraction of the RTP bandwidth if < 1)", 0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_RTCP_RR_BANDWIDTH, g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth", - "The RTCP bandwidth used for receivers in bytes per second (-1 = default)", + "The RTCP bandwidth used for receivers in bits per second (-1 = default)", -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_RTCP_RS_BANDWIDTH, g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth", - "The RTCP bandwidth used for senders in bytes per second (-1 = default)", + "The RTCP bandwidth used for senders in bits per second (-1 = default)", -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); @@ -450,9 +566,9 @@ rtp_session_class_init (RTPSessionClass * klass) g_param_spec_uint ("rtcp-immediate-feedback-threshold", "RTCP Immediate Feedback threshold", "The maximum number of members of a RTP session for which immediate" - " feedback is used", + " feedback is used (DEPRECATED: has no effect and is not needed)", 0, G_MAXUINT, DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED)); g_object_class_install_property (gobject_class, PROP_PROBATION, g_param_spec_uint ("probation", "Number of probations", @@ -460,6 +576,18 @@ rtp_session_class_init (RTPSessionClass * klass) 0, G_MAXUINT, DEFAULT_PROBATION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME, + g_param_spec_uint ("max-dropout-time", "Max dropout time", + "The maximum time (milliseconds) of missing packets tolerated.", + 0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME, + g_param_spec_uint ("max-misorder-time", "Max misorder time", + "The maximum time (milliseconds) of misordered packets tolerated.", + 0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** * RTPSession::stats: * @@ -470,6 +598,8 @@ rtp_session_class_init (RTPSessionClass * klass) * dropped (due to bandwidth constraints) * "sent-nack-count" G_TYPE_UINT Number of NACKs sent * "recv-nack-count" G_TYPE_UINT Number of NACKs received + * "source-stats" G_TYPE_BOXED GValueArray of #RTPSource::stats for all + * RTP sources (Since 1.8) * * Since: 1.4 */ @@ -478,6 +608,32 @@ rtp_session_class_init (RTPSessionClass * klass) "Various statistics", GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_RTP_PROFILE, + g_param_spec_enum ("rtp-profile", "RTP Profile", + "RTP profile to use for this session", GST_TYPE_RTP_PROFILE, + DEFAULT_RTP_PROFILE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_RTCP_REDUCED_SIZE, + g_param_spec_boolean ("rtcp-reduced-size", "RTCP Reduced Size", + "Use Reduced Size RTCP for feedback packets", + DEFAULT_RTCP_REDUCED_SIZE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + /** + * RTPSession::disable-sr-timestamp: + * + * Whether sender reports should be timestamped. + * + * Since: 1.16 + */ + g_object_class_install_property (gobject_class, + PROP_RTCP_DISABLE_SR_TIMESTAMP, + g_param_spec_boolean ("disable-sr-timestamp", + "Disable Sender Report Timestamp", + "Whether sender reports should be timestamped", + DEFAULT_RTCP_DISABLE_SR_TIMESTAMP, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + klass->get_source_by_ssrc = GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc); klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp); @@ -496,7 +652,11 @@ rtp_session_init (RTPSession * sess) sess->mask_idx = 0; sess->mask = 0; - for (i = 0; i < 32; i++) { + /* TODO: We currently only use the first hash table but this is the + * beginning of an implementation for RFC2762 + for (i = 0; i < 32; i++) { + */ + for (i = 0; i < 1; i++) { sess->ssrcs[i] = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify) g_object_unref); @@ -518,6 +678,8 @@ rtp_session_init (RTPSession * sess) sess->mtu = DEFAULT_RTCP_MTU; sess->probation = DEFAULT_PROBATION; + sess->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME; + sess->max_misorder_time = DEFAULT_MAX_MISORDER_TIME; /* some default SDES entries */ sess->sdes = gst_structure_new_empty ("application/x-rtp-source-sdes"); @@ -538,17 +700,21 @@ rtp_session_init (RTPSession * sess) /* this is the SSRC we suggest */ sess->suggested_ssrc = rtp_session_create_new_ssrc (sess); + sess->internal_ssrc_set = FALSE; sess->first_rtcp = TRUE; sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE; + sess->last_rtcp_check_time = GST_CLOCK_TIME_NONE; + sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE; + sess->last_rtcp_interval = GST_CLOCK_TIME_NONE; - sess->allow_early = TRUE; sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE; sess->rtcp_feedback_retention_window = DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW; sess->rtcp_immediate_feedback_threshold = DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD; - - sess->last_keyframe_request = GST_CLOCK_TIME_NONE; + sess->rtp_profile = DEFAULT_RTP_PROFILE; + sess->reduced_size_rtcp = DEFAULT_RTCP_REDUCED_SIZE; + sess->timestamp_sender_reports = !DEFAULT_RTCP_DISABLE_SR_TIMESTAMP; sess->is_doing_ptp = TRUE; } @@ -566,7 +732,10 @@ rtp_session_finalize (GObject * object) g_list_free_full (sess->conflicting_addresses, (GDestroyNotify) rtp_conflicting_address_free); - for (i = 0; i < 32; i++) + /* TODO: Change this again when implementing RFC 2762 + * for (i = 0; i < 32; i++) + */ + for (i = 0; i < 1; i++) g_hash_table_destroy (sess->ssrcs[i]); g_mutex_clear (&sess->lock); @@ -604,16 +773,44 @@ rtp_session_create_sources (RTPSession * sess) return res; } +static void +create_source_stats (gpointer key, RTPSource * source, GValueArray * arr) +{ + GValue *value; + GstStructure *s; + + g_object_get (source, "stats", &s, NULL); + + g_value_array_append (arr, NULL); + value = g_value_array_get_nth (arr, arr->n_values - 1); + g_value_init (value, GST_TYPE_STRUCTURE); + g_value_take_boxed (value, s); +} + static GstStructure * rtp_session_create_stats (RTPSession * sess) { GstStructure *s; + GValueArray *source_stats; + GValue source_stats_v = G_VALUE_INIT; + guint size; + RTP_SESSION_LOCK (sess); s = gst_structure_new ("application/x-rtp-session-stats", "rtx-drop-count", G_TYPE_UINT, sess->stats.nacks_dropped, "sent-nack-count", G_TYPE_UINT, sess->stats.nacks_sent, "recv-nack-count", G_TYPE_UINT, sess->stats.nacks_received, NULL); + size = g_hash_table_size (sess->ssrcs[sess->mask_idx]); + source_stats = g_value_array_new (size); + g_hash_table_foreach (sess->ssrcs[sess->mask_idx], + (GHFunc) create_source_stats, source_stats); + RTP_SESSION_UNLOCK (sess); + + g_value_init (&source_stats_v, G_TYPE_VALUE_ARRAY); + g_value_take_boxed (&source_stats_v, source_stats); + gst_structure_take_value (s, "source-stats", &source_stats_v); + return s; } @@ -629,6 +826,8 @@ rtp_session_set_property (GObject * object, guint prop_id, case PROP_INTERNAL_SSRC: RTP_SESSION_LOCK (sess); sess->suggested_ssrc = g_value_get_uint (value); + sess->internal_ssrc_set = TRUE; + sess->internal_ssrc_from_caps_or_property = TRUE; RTP_SESSION_UNLOCK (sess); if (sess->callbacks.reconfigure) sess->callbacks.reconfigure (sess, sess->reconfigure_user_data); @@ -676,12 +875,36 @@ rtp_session_set_property (GObject * object, guint prop_id, if (sess->callbacks.reconsider) sess->callbacks.reconsider (sess, sess->reconsider_user_data); break; + case PROP_RTCP_FEEDBACK_RETENTION_WINDOW: + sess->rtcp_feedback_retention_window = g_value_get_uint64 (value); + break; case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD: sess->rtcp_immediate_feedback_threshold = g_value_get_uint (value); break; case PROP_PROBATION: sess->probation = g_value_get_uint (value); break; + case PROP_MAX_DROPOUT_TIME: + sess->max_dropout_time = g_value_get_uint (value); + break; + case PROP_MAX_MISORDER_TIME: + sess->max_misorder_time = g_value_get_uint (value); + break; + case PROP_RTP_PROFILE: + sess->rtp_profile = g_value_get_enum (value); + /* trigger reconsideration */ + RTP_SESSION_LOCK (sess); + sess->next_rtcp_check_time = 0; + RTP_SESSION_UNLOCK (sess); + if (sess->callbacks.reconsider) + sess->callbacks.reconsider (sess, sess->reconsider_user_data); + break; + case PROP_RTCP_REDUCED_SIZE: + sess->reduced_size_rtcp = g_value_get_boolean (value); + break; + case PROP_RTCP_DISABLE_SR_TIMESTAMP: + sess->timestamp_sender_reports = !g_value_get_boolean (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -698,7 +921,7 @@ rtp_session_get_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_INTERNAL_SSRC: - g_value_set_uint (value, rtp_session_suggest_ssrc (sess)); + g_value_set_uint (value, rtp_session_suggest_ssrc (sess, NULL)); break; case PROP_INTERNAL_SOURCE: /* FIXME, return a random source */ @@ -737,15 +960,33 @@ rtp_session_get_property (GObject * object, guint prop_id, case PROP_RTCP_MIN_INTERVAL: g_value_set_uint64 (value, sess->stats.min_interval * GST_SECOND); break; + case PROP_RTCP_FEEDBACK_RETENTION_WINDOW: + g_value_set_uint64 (value, sess->rtcp_feedback_retention_window); + break; case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD: g_value_set_uint (value, sess->rtcp_immediate_feedback_threshold); break; case PROP_PROBATION: g_value_set_uint (value, sess->probation); break; + case PROP_MAX_DROPOUT_TIME: + g_value_set_uint (value, sess->max_dropout_time); + break; + case PROP_MAX_MISORDER_TIME: + g_value_set_uint (value, sess->max_misorder_time); + break; case PROP_STATS: g_value_take_boxed (value, rtp_session_create_stats (sess)); break; + case PROP_RTP_PROFILE: + g_value_set_enum (value, sess->rtp_profile); + break; + case PROP_RTCP_REDUCED_SIZE: + g_value_set_boolean (value, sess->reduced_size_rtcp); + break; + case PROP_RTCP_DISABLE_SR_TIMESTAMP: + g_value_set_boolean (value, !sess->timestamp_sender_reports); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -846,6 +1087,28 @@ on_sender_timeout (RTPSession * sess, RTPSource * source) g_object_unref (source); } +static void +on_new_sender_ssrc (RTPSession * sess, RTPSource * source) +{ + g_object_ref (source); + RTP_SESSION_UNLOCK (sess); + g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SENDER_SSRC], 0, + source); + RTP_SESSION_LOCK (sess); + g_object_unref (source); +} + +static void +on_sender_ssrc_active (RTPSession * sess, RTPSource * source) +{ + g_object_ref (source); + RTP_SESSION_UNLOCK (sess); + g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE], 0, + source); + RTP_SESSION_LOCK (sess); + g_object_unref (source); +} + /** * rtp_session_new: * @@ -864,6 +1127,47 @@ rtp_session_new (void) } /** + * rtp_session_reset: + * @sess: an #RTPSession + * + * Reset the sources of @sess. + */ +void +rtp_session_reset (RTPSession * sess) +{ + g_return_if_fail (RTP_IS_SESSION (sess)); + + /* remove all sources */ + g_hash_table_remove_all (sess->ssrcs[sess->mask_idx]); + sess->total_sources = 0; + sess->stats.sender_sources = 0; + sess->stats.internal_sender_sources = 0; + sess->stats.internal_sources = 0; + sess->stats.active_sources = 0; + + sess->generation = 0; + sess->first_rtcp = TRUE; + sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE; + sess->last_rtcp_check_time = GST_CLOCK_TIME_NONE; + sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE; + sess->last_rtcp_interval = GST_CLOCK_TIME_NONE; + sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE; + sess->scheduled_bye = FALSE; + + /* reset session stats */ + sess->stats.bye_members = 0; + sess->stats.nacks_dropped = 0; + sess->stats.nacks_sent = 0; + sess->stats.nacks_received = 0; + + sess->is_doing_ptp = TRUE; + + g_list_free_full (sess->conflicting_addresses, + (GDestroyNotify) rtp_conflicting_address_free); + sess->conflicting_addresses = NULL; +} + +/** * rtp_session_set_callbacks: * @sess: an #RTPSession * @callbacks: callbacks to configure @@ -917,6 +1221,10 @@ rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks, sess->callbacks.reconfigure = callbacks->reconfigure; sess->reconfigure_user_data = user_data; } + if (callbacks->notify_early_rtcp) { + sess->callbacks.notify_early_rtcp = callbacks->notify_early_rtcp; + sess->notify_early_rtcp_user_data = user_data; + } } /** @@ -1050,7 +1358,7 @@ rtp_session_set_request_time_callback (RTPSession * sess, * @sess: an #RTPSession * @bandwidth: the bandwidth allocated * - * Set the session bandwidth in bytes per second. + * Set the session bandwidth in bits per second. */ void rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth) @@ -1089,7 +1397,7 @@ rtp_session_get_bandwidth (RTPSession * sess) * @sess: an #RTPSession * @bandwidth: the RTCP bandwidth * - * Set the bandwidth in bytes per second that should be used for RTCP + * Set the bandwidth in bits per second that should be used for RTCP * messages. */ void @@ -1148,6 +1456,12 @@ rtp_session_get_sdes_struct (RTPSession * sess) return result; } +static void +source_set_sdes (const gchar * key, RTPSource * source, GstStructure * sdes) +{ + rtp_source_set_sdes_struct (source, gst_structure_copy (sdes)); +} + /** * rtp_session_set_sdes_struct: * @sess: an #RTSPSession @@ -1165,6 +1479,9 @@ rtp_session_set_sdes_struct (RTPSession * sess, const GstStructure * sdes) if (sess->sdes) gst_structure_free (sess->sdes); sess->sdes = gst_structure_copy (sdes); + + g_hash_table_foreach (sess->ssrcs[sess->mask_idx], + (GHFunc) source_set_sdes, sess->sdes); RTP_SESSION_UNLOCK (sess); } @@ -1364,8 +1681,10 @@ check_collision (RTPSession * sess, RTPSource * source, /* mark the source BYE */ rtp_source_mark_bye (source, "SSRC Collision"); /* if we were suggesting this SSRC, change to something else */ - if (sess->suggested_ssrc == ssrc) + if (sess->suggested_ssrc == ssrc) { sess->suggested_ssrc = rtp_session_create_new_ssrc (sess); + sess->internal_ssrc_set = TRUE; + } on_ssrc_collision (sess, source); @@ -1473,8 +1792,11 @@ add_source (RTPSession * sess, RTPSource * src) sess->stats.active_sources++; if (src->internal) { sess->stats.internal_sources++; - if (sess->suggested_ssrc != src->ssrc) + if (!sess->internal_ssrc_from_caps_or_property + && sess->suggested_ssrc != src->ssrc) { sess->suggested_ssrc = src->ssrc; + sess->internal_ssrc_set = TRUE; + } } /* update point-to-point status */ @@ -1507,10 +1829,9 @@ obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created, /* for RTP packets we need to set the source in probation. Receiving RTCP * packets of an SSRC, on the other hand, is a strong indication that we * are dealing with a valid source. */ - if (rtp) - g_object_set (source, "probation", sess->probation, NULL); - else - g_object_set (source, "probation", 0, NULL); + g_object_set (source, "probation", rtp ? sess->probation : 0, + "max-dropout-time", sess->max_dropout_time, "max-misorder-time", + sess->max_misorder_time, NULL); /* store from address, if any */ if (pinfo->address) { @@ -1584,13 +1905,14 @@ obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created, /** * rtp_session_suggest_ssrc: * @sess: a #RTPSession + * @is_random: if the suggested ssrc is random * * Suggest an unused SSRC in @sess. * * Returns: a free unused SSRC */ guint32 -rtp_session_suggest_ssrc (RTPSession * sess) +rtp_session_suggest_ssrc (RTPSession * sess, gboolean * is_random) { guint32 result; @@ -1598,6 +1920,8 @@ rtp_session_suggest_ssrc (RTPSession * sess) RTP_SESSION_LOCK (sess); result = sess->suggested_ssrc; + if (is_random) + *is_random = !sess->internal_ssrc_set; RTP_SESSION_UNLOCK (sess); return result; @@ -1720,34 +2044,6 @@ rtp_session_create_new_ssrc (RTPSession * sess) return ssrc; } - -/** - * rtp_session_create_source: - * @sess: an #RTPSession - * - * Create an #RTPSource for use in @sess. This function will create a source - * with an ssrc that is currently not used by any participants in the session. - * - * Returns: an #RTPSource. - */ -RTPSource * -rtp_session_create_source (RTPSession * sess) -{ - guint32 ssrc; - RTPSource *source; - - RTP_SESSION_LOCK (sess); - ssrc = rtp_session_create_new_ssrc (sess); - source = rtp_source_new (ssrc); - rtp_source_set_callbacks (source, &callbacks, sess); - /* we need an additional ref for the source in the hashtable */ - g_object_ref (source); - add_source (sess, source); - RTP_SESSION_UNLOCK (sess); - - return source; -} - static gboolean update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo) { @@ -1804,7 +2100,7 @@ invalid_packet: /* update the RTPPacketInfo structure with the current time and other bits * about the current buffer we are handling. * This function is typically called when a validated packet is received. - * This function should be called with the SESSION_LOCK + * This function should be called with the RTP_SESSION_LOCK */ static gboolean update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo, @@ -1930,7 +2226,8 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer, current_time, running_time, ntpnstime)) { GST_DEBUG ("invalid RTP packet received"); RTP_SESSION_UNLOCK (sess); - return rtp_session_process_rtcp (sess, buffer, current_time, ntpnstime); + return rtp_session_process_rtcp (sess, buffer, current_time, running_time, + ntpnstime); } ssrc = pinfo.ssrc; @@ -1943,6 +2240,9 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer, prevactive = RTP_SOURCE_IS_ACTIVE (source); oldrate = source->bitrate; + if (created) + on_new_ssrc (sess, source); + /* let source process the packet */ result = rtp_source_process_rtp (source, &pinfo); @@ -1955,8 +2255,6 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer, if (oldrate != source->bitrate) sess->recalc_bandwidth = TRUE; - if (created) - on_new_ssrc (sess, source); if (source->validated) { gboolean created; @@ -2186,7 +2484,11 @@ rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet, value = g_strndup ((const gchar *) data, len); - gst_structure_set (sdes, name, G_TYPE_STRING, value, NULL); + if (g_utf8_validate (value, -1, NULL)) { + gst_structure_set (sdes, name, G_TYPE_STRING, value, NULL); + } else { + GST_WARNING ("ignore SDES field %s with non-utf8 data %s", name, value); + } g_free (name); g_free (value); @@ -2236,21 +2538,18 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet, for (i = 0; i < count; i++) { guint32 ssrc; RTPSource *source; - gboolean created, prevactive, prevsender; + gboolean prevactive, prevsender; guint pmembers, members; ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i); GST_DEBUG ("SSRC: %08x", ssrc); /* find src and mark bye, no probation when dealing with RTCP */ - source = obtain_source (sess, ssrc, &created, pinfo, FALSE); - if (!source) - return; - - if (source->internal) { - /* our own source, something weird with this packet */ - g_object_unref (source); - continue; + source = find_source (sess, ssrc); + if (!source || source->internal) { + GST_DEBUG ("Ignoring suspicious BYE packet (reason: %s)", + !source ? "can't find source" : "has internal source SSRC"); + break; } /* store time for when we need to time out this source */ @@ -2277,27 +2576,38 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet, pinfo->current_time < sess->next_rtcp_check_time) { GstClockTime time_remaining; - time_remaining = sess->next_rtcp_check_time - pinfo->current_time; - sess->next_rtcp_check_time = - gst_util_uint64_scale (time_remaining, members, pmembers); + /* Scale our next RTCP check time according to the change of numbers + * of members. But only if a) this is the first RTCP, or b) this is not + * a feedback session, or c) this is a feedback session but we schedule + * for every RTCP interval (aka no t-rr-interval set). + * + * FIXME: a) and b) are not great as we will possibly go below Tmin + * for non-feedback profiles and in case of a) below + * Tmin/t-rr-interval in any case. + */ + if (sess->last_rtcp_send_time == GST_CLOCK_TIME_NONE || + !(sess->rtp_profile == GST_RTP_PROFILE_AVPF + || sess->rtp_profile == GST_RTP_PROFILE_SAVPF) || + sess->next_rtcp_check_time - sess->last_rtcp_send_time == + sess->last_rtcp_interval) { + time_remaining = sess->next_rtcp_check_time - pinfo->current_time; + sess->next_rtcp_check_time = + gst_util_uint64_scale (time_remaining, members, pmembers); + sess->next_rtcp_check_time += pinfo->current_time; + } + sess->last_rtcp_interval = + gst_util_uint64_scale (sess->last_rtcp_interval, members, pmembers); GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time)); - sess->next_rtcp_check_time += pinfo->current_time; - /* mark pending reconsider. We only want to signal the reconsideration * once after we handled all the source in the bye packet */ reconsider = TRUE; } } - if (created) - on_new_ssrc (sess, source); - on_bye_ssrc (sess, source); - - g_object_unref (source); } if (reconsider) { RTP_SESSION_UNLOCK (sess); @@ -2306,6 +2616,7 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet, sess->callbacks.reconsider (sess, sess->reconsider_user_data); RTP_SESSION_LOCK (sess); } + g_free (reason); } @@ -2314,38 +2625,70 @@ rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet, RTPPacketInfo * pinfo) { GST_DEBUG ("received APP"); + + if (g_signal_has_handler_pending (sess, + rtp_session_signals[SIGNAL_ON_APP_RTCP], 0, TRUE)) { + GstBuffer *data_buffer = NULL; + guint16 data_length; + gchar name[5]; + + data_length = gst_rtcp_packet_app_get_data_length (packet) * 4; + if (data_length > 0) { + guint8 *data = gst_rtcp_packet_app_get_data (packet); + data_buffer = gst_buffer_copy_region (packet->rtcp->buffer, + GST_BUFFER_COPY_MEMORY, data - packet->rtcp->map.data, data_length); + GST_BUFFER_PTS (data_buffer) = pinfo->running_time; + } + + memcpy (name, gst_rtcp_packet_app_get_name (packet), 4); + name[4] = '\0'; + + RTP_SESSION_UNLOCK (sess); + g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_APP_RTCP], 0, + gst_rtcp_packet_app_get_subtype (packet), + gst_rtcp_packet_app_get_ssrc (packet), name, data_buffer); + RTP_SESSION_LOCK (sess); + + if (data_buffer) + gst_buffer_unref (data_buffer); + } } static gboolean rtp_session_request_local_key_unit (RTPSession * sess, RTPSource * src, - gboolean fir, GstClockTime current_time) + guint32 media_ssrc, gboolean fir, GstClockTime current_time) { guint32 round_trip = 0; rtp_source_get_last_rb (src, NULL, NULL, NULL, NULL, NULL, NULL, &round_trip); - if (sess->last_keyframe_request != GST_CLOCK_TIME_NONE && round_trip) { + if (src->last_keyframe_request != GST_CLOCK_TIME_NONE && round_trip) { GstClockTime round_trip_in_ns = gst_util_uint64_scale (round_trip, GST_SECOND, 65536); - if (current_time - sess->last_keyframe_request < 2 * round_trip_in_ns) { - GST_DEBUG ("Ignoring %s request because one was send without one " + /* Sanity check to avoid always ignoring PLI/FIR if we receive RTCP + * packets with erroneous values resulting in crazy high RTT. */ + if (round_trip_in_ns > 5 * GST_SECOND) + round_trip_in_ns = GST_SECOND / 2; + + if (current_time - src->last_keyframe_request < 2 * round_trip_in_ns) { + GST_DEBUG ("Ignoring %s request from %X because one was send without one " "RTT (%" GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")", - fir ? "FIR" : "PLI", - GST_TIME_ARGS (current_time - sess->last_keyframe_request), - GST_TIME_ARGS (round_trip_in_ns));; + fir ? "FIR" : "PLI", rtp_source_get_ssrc (src), + GST_TIME_ARGS (current_time - src->last_keyframe_request), + GST_TIME_ARGS (round_trip_in_ns)); return FALSE; } } - sess->last_keyframe_request = current_time; + src->last_keyframe_request = current_time; - GST_LOG ("received %s request from %X %p(%p)", fir ? "FIR" : "PLI", - rtp_source_get_ssrc (src), sess->callbacks.process_rtp, + GST_LOG ("received %s request from %X about %X %p(%p)", fir ? "FIR" : "PLI", + rtp_source_get_ssrc (src), media_ssrc, sess->callbacks.process_rtp, sess->callbacks.request_key_unit); RTP_SESSION_UNLOCK (sess); - sess->callbacks.request_key_unit (sess, fir, + sess->callbacks.request_key_unit (sess, media_ssrc, fir, sess->request_key_unit_user_data); RTP_SESSION_LOCK (sess); @@ -2362,15 +2705,21 @@ rtp_session_process_pli (RTPSession * sess, guint32 sender_ssrc, return; src = find_source (sess, sender_ssrc); - if (src == NULL) - return; + if (src == NULL) { + /* try to find a src with media_ssrc instead */ + src = find_source (sess, media_ssrc); + if (src == NULL) + return; + } - rtp_session_request_local_key_unit (sess, src, FALSE, current_time); + rtp_session_request_local_key_unit (sess, src, media_ssrc, FALSE, + current_time); } static void rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc, - guint8 * fci_data, guint fci_length, GstClockTime current_time) + guint32 media_ssrc, guint8 * fci_data, guint fci_length, + GstClockTime current_time) { RTPSource *src; guint32 ssrc; @@ -2421,7 +2770,8 @@ rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc, if (!our_request) return; - rtp_session_request_local_key_unit (sess, src, TRUE, current_time); + rtp_session_request_local_key_unit (sess, src, media_ssrc, TRUE, + current_time); } static void @@ -2456,20 +2806,34 @@ static void rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, RTPPacketInfo * pinfo, GstClockTime current_time) { - GstRTCPType type = gst_rtcp_packet_get_type (packet); - GstRTCPFBType fbtype = gst_rtcp_packet_fb_get_type (packet); - guint32 sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet); - guint32 media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet); - guint8 *fci_data = gst_rtcp_packet_fb_get_fci (packet); - guint fci_length = 4 * gst_rtcp_packet_fb_get_fci_length (packet); + GstRTCPType type; + GstRTCPFBType fbtype; + guint32 sender_ssrc, media_ssrc; + guint8 *fci_data; + guint fci_length; RTPSource *src; + /* The feedback packet must include both sender SSRC and media SSRC */ + if (packet->length < 2) + return; + + type = gst_rtcp_packet_get_type (packet); + fbtype = gst_rtcp_packet_fb_get_type (packet); + sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet); + media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet); + src = find_source (sess, media_ssrc); /* skip non-bye packets for sources that are marked BYE */ if (sess->scheduled_bye && src && RTP_SOURCE_IS_MARKED_BYE (src)) return; + if (src) + g_object_ref (src); + + fci_data = gst_rtcp_packet_fb_get_fci (packet); + fci_length = gst_rtcp_packet_fb_get_fci_length (packet) * sizeof (guint32); + GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of " "length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length); @@ -2481,7 +2845,7 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, fci_buffer = gst_buffer_copy_region (packet->rtcp->buffer, GST_BUFFER_COPY_MEMORY, fci_data - packet->rtcp->map.data, fci_length); - GST_BUFFER_TIMESTAMP (fci_buffer) = pinfo->running_time; + GST_BUFFER_PTS (fci_buffer) = pinfo->running_time; } RTP_SESSION_UNLOCK (sess); @@ -2493,7 +2857,7 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, gst_buffer_unref (fci_buffer); } - if (src && sess->rtcp_feedback_retention_window) { + if (src && sess->rtcp_feedback_retention_window != GST_CLOCK_TIME_NONE) { rtp_source_retain_rtcp_packet (src, packet, pinfo->running_time); } @@ -2504,12 +2868,16 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, case GST_RTCP_TYPE_PSFB: switch (fbtype) { case GST_RTCP_PSFB_TYPE_PLI: + if (src) + src->stats.recv_pli_count++; rtp_session_process_pli (sess, sender_ssrc, media_ssrc, current_time); break; case GST_RTCP_PSFB_TYPE_FIR: - rtp_session_process_fir (sess, sender_ssrc, fci_data, fci_length, - current_time); + if (src) + src->stats.recv_fir_count++; + rtp_session_process_fir (sess, sender_ssrc, media_ssrc, fci_data, + fci_length, current_time); break; default: break; @@ -2518,6 +2886,8 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, case GST_RTCP_TYPE_RTPFB: switch (fbtype) { case GST_RTCP_RTPFB_TYPE_NACK: + if (src) + src->stats.recv_nack_count++; rtp_session_process_nack (sess, sender_ssrc, media_ssrc, fci_data, fci_length, current_time); break; @@ -2528,6 +2898,9 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, break; } } + + if (src) + g_object_unref (src); } /** @@ -2544,7 +2917,7 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, */ GstFlowReturn rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer, - GstClockTime current_time, guint64 ntpnstime) + GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime) { GstRTCPPacket packet; gboolean more, is_bye = FALSE, do_sync = FALSE; @@ -2555,15 +2928,18 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer, g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); - if (!gst_rtcp_buffer_validate (buffer)) + if (!gst_rtcp_buffer_validate_reduced (buffer)) goto invalid_packet; GST_DEBUG ("received RTCP packet"); + g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP], 0, + buffer); + RTP_SESSION_LOCK (sess); /* update pinfo stats */ update_packet_info (sess, &pinfo, FALSE, FALSE, FALSE, buffer, current_time, - -1, ntpnstime); + running_time, ntpnstime); /* start processing the compound packet */ gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp); @@ -2596,8 +2972,14 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer, case GST_RTCP_TYPE_PSFB: rtp_session_process_feedback (sess, &packet, &pinfo, current_time); break; + case GST_RTCP_TYPE_XR: + /* FIXME: This block is added to downgrade warning level. + * Once the parser is implemented, it should be replaced with + * a proper process function. */ + GST_DEBUG ("got RTCP XR packet, but ignored"); + break; default: - GST_WARNING ("got unknown RTCP packet"); + GST_WARNING ("got unknown RTCP packet type: %d", type); break; } more = gst_rtcp_packet_move_to_next (&packet); @@ -2666,11 +3048,33 @@ rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps) RTP_SESSION_LOCK (sess); source = obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE); + sess->suggested_ssrc = ssrc; + sess->internal_ssrc_set = TRUE; + sess->internal_ssrc_from_caps_or_property = TRUE; if (source) { rtp_source_update_caps (source, caps); + + if (created) + on_new_sender_ssrc (sess, source); + g_object_unref (source); } + + if (gst_structure_get_uint (s, "rtx-ssrc", &ssrc)) { + source = + obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE); + if (source) { + rtp_source_update_caps (source, caps); + + if (created) + on_new_sender_ssrc (sess, source); + + g_object_unref (source); + } + } RTP_SESSION_UNLOCK (sess); + } else { + sess->internal_ssrc_from_caps_or_property = FALSE; } } @@ -2682,8 +3086,8 @@ rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps) * @current_time: the current system time * @running_time: the running time of @data * - * Send the RTP buffer in the session manager. This function takes ownership of - * @buffer. + * Send the RTP data (a buffer or buffer list) in the session manager. This + * function takes ownership of @data. * * Returns: a #GstFlowReturn. */ @@ -2709,6 +3113,12 @@ rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list, goto invalid_packet; source = obtain_internal_source (sess, pinfo.ssrc, &created, current_time); + if (created) + on_new_sender_ssrc (sess, source); + + if (!source->internal) + /* FIXME: Send GstRTPCollision upstream */ + goto collision; prevsender = RTP_SOURCE_IS_SENDER (source); oldrate = source->bitrate; @@ -2734,6 +3144,15 @@ invalid_packet: GST_DEBUG ("invalid RTP packet received"); return GST_FLOW_OK; } +collision: + { + g_object_unref (source); + gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); + RTP_SESSION_UNLOCK (sess); + GST_WARNING ("non-internal source with same ssrc %08x, drop packet", + pinfo.ssrc); + return GST_FLOW_OK; + } } static void @@ -2762,9 +3181,8 @@ calculate_rtcp_interval (RTPSession * sess, gboolean deterministic, g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) add_bitrates, &bandwidth); - bandwidth /= 8.0; } - if (bandwidth < 8000) + if (bandwidth < RTP_STATS_BANDWIDTH) bandwidth = RTP_STATS_BANDWIDTH; rtp_stats_set_bandwidths (&sess->stats, bandwidth, @@ -2777,9 +3195,12 @@ calculate_rtcp_interval (RTPSession * sess, gboolean deterministic, stats = &sess->bye_stats; result = rtp_stats_calculate_bye_interval (stats); } else { + session_update_ptp (sess); + stats = &sess->stats; result = rtp_stats_calculate_rtcp_interval (stats, - stats->internal_sender_sources > 0, first); + stats->internal_sender_sources > 0, sess->rtp_profile, + sess->is_doing_ptp, first); } GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d", @@ -2838,16 +3259,17 @@ rtp_session_schedule_bye_locked (RTPSession * sess, GstClockTime current_time) INIT_AVG (sess->bye_stats.avg_rtcp_packet_size, 100); sess->bye_stats.bye_members = 1; sess->first_rtcp = TRUE; - sess->allow_early = TRUE; /* reschedule transmission */ sess->last_rtcp_send_time = current_time; + sess->last_rtcp_check_time = current_time; interval = calculate_rtcp_interval (sess, FALSE, TRUE); if (interval != GST_CLOCK_TIME_NONE) sess->next_rtcp_check_time = current_time + interval; else sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE; + sess->last_rtcp_interval = interval; GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT, GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time)); @@ -2928,16 +3350,36 @@ rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time) GST_DEBUG ("reconsider BYE, more than 50 sources"); /* reconsider BYE if members >= 50 */ interval = calculate_rtcp_interval (sess, FALSE, TRUE); + sess->last_rtcp_interval = interval; } } else { if (sess->first_rtcp) { GST_DEBUG ("first RTCP packet"); /* we are called for the first time */ interval = calculate_rtcp_interval (sess, FALSE, TRUE); + sess->last_rtcp_interval = interval; } else if (sess->next_rtcp_check_time < current_time) { GST_DEBUG ("old check time expired, getting new timeout"); /* get a new timeout when we need to */ interval = calculate_rtcp_interval (sess, FALSE, FALSE); + sess->last_rtcp_interval = interval; + + if ((sess->rtp_profile == GST_RTP_PROFILE_AVPF + || sess->rtp_profile == GST_RTP_PROFILE_SAVPF) + && interval != GST_CLOCK_TIME_NONE) { + /* Apply the rules from RFC 4585 section 3.5.3 */ + if (sess->stats.min_interval != 0) { + GstClockTime T_rr_current_interval = g_random_double_range (0.5, + 1.5) * sess->stats.min_interval * GST_SECOND; + + if (T_rr_current_interval > interval) { + GST_DEBUG ("Adjusting interval for t-rr-interval: %" GST_TIME_FORMAT + " > %" GST_TIME_FORMAT, GST_TIME_ARGS (T_rr_current_interval), + GST_TIME_ARGS (interval)); + interval = T_rr_current_interval; + } + } + } } } @@ -2999,6 +3441,9 @@ session_start_rtcp (RTPSession * sess, ReportData * data) gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp); + if (data->is_early && sess->reduced_size_rtcp) + return; + if (RTP_SOURCE_IS_SENDER (own)) { guint64 ntptime; guint32 rtptime; @@ -3017,7 +3462,9 @@ session_start_rtcp (RTPSession * sess, ReportData * data) /* fill in sender report info */ gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc, - ntptime, rtptime, packet_count, octet_count); + sess->timestamp_sender_reports ? ntptime : 0, + sess->timestamp_sender_reports ? rtptime : 0, + packet_count, octet_count); } else { /* we are only receiver, create RR */ GST_DEBUG ("create RR for SSRC %08x", own->ssrc); @@ -3055,8 +3502,8 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data) return; } - /* only report about other sender */ - if (source == data->source) + /* only report about remote sources */ + if (source->internal) goto reported; if (!RTP_SOURCE_IS_SENDER (source)) { @@ -3064,6 +3511,11 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data) goto reported; } + if (source->disable_rtcp) { + GST_DEBUG ("source %08x has RTCP disabled", source->ssrc); + goto reported; + } + GST_DEBUG ("create RB for SSRC %08x", source->ssrc); /* get new stats */ @@ -3113,6 +3565,7 @@ session_add_fir (const gchar * key, RTPSource * source, ReportData * data) fci_data[1] = fci_data[2] = fci_data[3] = 0; source->send_fir = FALSE; + source->stats.sent_fir_count++; } static void @@ -3181,21 +3634,72 @@ session_pli (const gchar * key, RTPSource * source, ReportData * data) source->send_pli = FALSE; data->may_suppress = FALSE; + + source->stats.sent_pli_count++; } /* construct NACK */ static void session_nack (const gchar * key, RTPSource * source, ReportData * data) { + RTPSession *sess = data->sess; GstRTCPBuffer *rtcp = &data->rtcpbuf; GstRTCPPacket *packet = &data->packet; - guint32 *nacks; - guint n_nacks, i; + guint16 *nacks; + GstClockTime *nack_deadlines; + guint n_nacks, i = 0; + guint nacked_seqnums = 0; + guint16 n_fb_nacks = 0; guint8 *fci_data; if (!source->send_nack) return; + nacks = rtp_source_get_nacks (source, &n_nacks); + nack_deadlines = rtp_source_get_nack_deadlines (source, NULL); + GST_DEBUG ("%u NACKs current time %" GST_TIME_FORMAT, n_nacks, + GST_TIME_ARGS (data->current_time)); + + /* cleanup expired nacks */ + for (i = 0; i < n_nacks; i++) { + GST_DEBUG ("#%u deadline %" GST_TIME_FORMAT, nacks[i], + GST_TIME_ARGS (nack_deadlines[i])); + if (nack_deadlines[i] >= data->current_time) + break; + } + + if (data->is_early) { + /* don't remove them all if this is an early RTCP packet. It may happen + * that the NACKs are late due to high RTT, not sending NACKs at all would + * keep the RTX RTT stats high and maintain a dropping state. */ + i = MIN (n_nacks - 1, i); + } + + if (i) { + GST_WARNING ("Removing %u expired NACKS", i); + rtp_source_clear_nacks (source, i); + n_nacks -= i; + if (n_nacks == 0) + return; + } + + /* allow overriding NACK to packet conversion */ + if (g_signal_has_handler_pending (sess, + rtp_session_signals[SIGNAL_ON_SENDING_NACKS], 0, TRUE)) { + /* this is needed as it will actually resize the buffer */ + gst_rtcp_buffer_unmap (rtcp); + + g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_NACKS], 0, + data->source->ssrc, source->ssrc, source->nacks, data->rtcp, + &nacked_seqnums); + + /* and now remap for the remaining work */ + gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp); + + if (nacked_seqnums > 0) + goto done; + } + if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RTPFB, packet)) /* exit because the packet is full, will put next request in a * further packet */ @@ -3205,19 +3709,46 @@ session_nack (const gchar * key, RTPSource * source, ReportData * data) gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc); gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc); - nacks = rtp_source_get_nacks (source, &n_nacks); - GST_DEBUG ("%u NACKs", n_nacks); - if (!gst_rtcp_packet_fb_set_fci_length (packet, n_nacks)) + if (!gst_rtcp_packet_fb_set_fci_length (packet, 1)) { + gst_rtcp_packet_remove (packet); + GST_WARNING ("no nacks fit in the packet"); return; + } fci_data = gst_rtcp_packet_fb_get_fci (packet); - for (i = 0; i < n_nacks; i++) { - GST_WRITE_UINT32_BE (fci_data, nacks[i]); + for (i = 0; i < n_nacks; i = nacked_seqnums) { + guint16 seqnum = nacks[i]; + guint16 blp = 0; + guint j; + + if (!gst_rtcp_packet_fb_set_fci_length (packet, n_fb_nacks + 1)) + break; + + n_fb_nacks++; + nacked_seqnums++; + + for (j = i + 1; j < n_nacks; j++) { + gint diff; + + diff = gst_rtp_buffer_compare_seqnum (seqnum, nacks[j]); + GST_TRACE ("[%u][%u] %u %u diff %i", i, j, seqnum, nacks[j], diff); + if (diff > 16) + break; + + blp |= 1 << (diff - 1); + nacked_seqnums++; + } + + GST_WRITE_UINT32_BE (fci_data, seqnum << 16 | blp); fci_data += 4; - data->nacked_seqnums++; } - rtp_source_clear_nacks (source); + GST_DEBUG ("Sent %u seqnums into %u FB NACKs", nacked_seqnums, n_fb_nacks); + source->stats.sent_nack_count += n_fb_nacks; + +done: + data->nacked_seqnums += nacked_seqnums; + rtp_source_clear_nacks (source, nacked_seqnums); data->may_suppress = FALSE; } @@ -3238,8 +3769,8 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data) /* check for outdated collisions */ if (source->internal) { GST_DEBUG ("Timing out collisions for %x", source->ssrc); - rtp_source_timeout (source, data->current_time, - data->running_time - sess->rtcp_feedback_retention_window); + rtp_source_timeout (source, data->current_time, data->running_time, + sess->rtcp_feedback_retention_window); } /* nothing else to do when without RTCP */ @@ -3447,7 +3978,7 @@ make_source_bye (RTPSession * sess, RTPSource * source, ReportData * data) static gboolean is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data) { - GstClockTime new_send_time, elapsed; + GstClockTime new_send_time; GstClockTime interval; RTPSessionStats *stats; @@ -3461,15 +3992,11 @@ is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data) else data->is_early = FALSE; - if (data->is_early && sess->next_early_rtcp_time < current_time) { - GST_DEBUG ("early feedback %" GST_TIME_FORMAT " < now %" + if (data->is_early && sess->next_early_rtcp_time <= current_time) { + GST_DEBUG ("early feedback %" GST_TIME_FORMAT " <= now %" GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_early_rtcp_time), GST_TIME_ARGS (current_time)); - goto early; - } - - /* no need to check yet */ - if (sess->next_rtcp_check_time == GST_CLOCK_TIME_NONE || + } else if (sess->next_rtcp_check_time == GST_CLOCK_TIME_NONE || sess->next_rtcp_check_time > current_time) { GST_DEBUG ("no check time yet, next %" GST_TIME_FORMAT " > now %" GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time), @@ -3477,22 +4004,32 @@ is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data) return FALSE; } -early: - /* get elapsed time since we last reported */ - elapsed = current_time - sess->last_rtcp_send_time; - /* take interval and add jitter */ interval = data->interval; if (interval != GST_CLOCK_TIME_NONE) interval = rtp_stats_add_rtcp_jitter (stats, interval); - /* perform forward reconsideration */ - if (interval != GST_CLOCK_TIME_NONE) { - GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %" - GST_TIME_FORMAT, GST_TIME_ARGS (interval), GST_TIME_ARGS (elapsed)); - new_send_time = interval + sess->last_rtcp_send_time; + if (sess->last_rtcp_check_time != GST_CLOCK_TIME_NONE) { + /* perform forward reconsideration */ + if (interval != GST_CLOCK_TIME_NONE) { + GstClockTime elapsed; + + /* get elapsed time since we last reported */ + elapsed = current_time - sess->last_rtcp_check_time; + + GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %" + GST_TIME_FORMAT, GST_TIME_ARGS (interval), GST_TIME_ARGS (elapsed)); + new_send_time = interval + sess->last_rtcp_check_time; + } else { + new_send_time = sess->last_rtcp_check_time; + } } else { - new_send_time = sess->last_rtcp_send_time; + /* If this is the first RTCP packet, we can reconsider anything based + * on the last RTCP send time because there was none. + */ + g_warn_if_fail (!data->is_early); + data->is_early = FALSE; + new_send_time = current_time; } if (!data->is_early) { @@ -3502,32 +4039,33 @@ early: GST_TIME_ARGS (new_send_time)); /* store new check time */ sess->next_rtcp_check_time = new_send_time; + sess->last_rtcp_interval = interval; return FALSE; } - sess->next_rtcp_check_time = current_time + interval; - } else if (interval != GST_CLOCK_TIME_NONE) { - /* Apply the rules from RFC 4585 section 3.5.3 */ - if (stats->min_interval != 0 && !sess->first_rtcp) { - GstClockTime T_rr_current_interval = - g_random_double_range (0.5, 1.5) * stats->min_interval; - - /* This will caused the RTCP to be suppressed if no FB packets are added */ - if (sess->last_rtcp_send_time + T_rr_current_interval > new_send_time) { - GST_DEBUG ("RTCP packet could be suppressed min: %" GST_TIME_FORMAT - " last: %" GST_TIME_FORMAT - " + T_rr_current_interval: %" GST_TIME_FORMAT - " > new_send_time: %" GST_TIME_FORMAT, - GST_TIME_ARGS (stats->min_interval), - GST_TIME_ARGS (sess->last_rtcp_send_time), - GST_TIME_ARGS (T_rr_current_interval), - GST_TIME_ARGS (new_send_time)); - data->may_suppress = TRUE; + + sess->last_rtcp_interval = interval; + if ((sess->rtp_profile == GST_RTP_PROFILE_AVPF + || sess->rtp_profile == GST_RTP_PROFILE_SAVPF) + && interval != GST_CLOCK_TIME_NONE) { + /* Apply the rules from RFC 4585 section 3.5.3 */ + if (stats->min_interval != 0 && !sess->first_rtcp) { + GstClockTime T_rr_current_interval = + g_random_double_range (0.5, 1.5) * stats->min_interval * GST_SECOND; + + if (T_rr_current_interval > interval) { + GST_DEBUG ("Adjusting interval for t-rr-interval: %" GST_TIME_FORMAT + " > %" GST_TIME_FORMAT, GST_TIME_ARGS (T_rr_current_interval), + GST_TIME_ARGS (interval)); + interval = T_rr_current_interval; + } } } + sess->next_rtcp_check_time = current_time + interval; } - GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT, - GST_TIME_ARGS (new_send_time)); + + GST_DEBUG ("can send RTCP now, next %" GST_TIME_FORMAT, + GST_TIME_ARGS (sess->next_rtcp_check_time)); return TRUE; } @@ -3570,6 +4108,12 @@ generate_rtcp (const gchar * key, RTPSource * source, ReportData * data) if (sess->scheduled_bye && !source->marked_bye) return; + /* skip if RTCP is disabled */ + if (source->disable_rtcp) { + GST_DEBUG ("source %08x has RTCP disabled", source->ssrc); + return; + } + data->source = source; /* open packet */ @@ -3585,7 +4129,7 @@ generate_rtcp (const gchar * key, RTPSource * source, ReportData * data) g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) session_report_blocks, data); } - if (!data->has_sdes) + if (!data->has_sdes && (!data->is_early || !sess->reduced_size_rtcp)) session_sdes (sess, data); if (data->have_fir) @@ -3631,6 +4175,47 @@ update_generation (const gchar * key, RTPSource * source, ReportData * data) } } +static void +schedule_remaining_nacks (const gchar * key, RTPSource * source, + ReportData * data) +{ + RTPSession *sess = data->sess; + GstClockTime *nack_deadlines; + GstClockTime deadline; + guint n_nacks; + + if (!source->send_nack) + return; + + /* the scheduling is entirely based on available bandwidth, just take the + * biggest seqnum, which will have the largest deadline to request early + * RTCP. */ + nack_deadlines = rtp_source_get_nack_deadlines (source, &n_nacks); + deadline = nack_deadlines[n_nacks - 1]; + RTP_SESSION_UNLOCK (sess); + rtp_session_send_rtcp_with_deadline (sess, deadline); + RTP_SESSION_LOCK (sess); +} + +static gboolean +rtp_session_are_all_sources_bye (RTPSession * sess) +{ + GHashTableIter iter; + RTPSource *src; + + RTP_SESSION_LOCK (sess); + g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]); + while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) { + if (src->internal && !src->sent_bye) { + RTP_SESSION_UNLOCK (sess); + return FALSE; + } + } + RTP_SESSION_UNLOCK (sess); + + return TRUE; +} + /** * rtp_session_on_timeout: * @sess: an #RTPSession @@ -3657,6 +4242,7 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time, ReportData data = { GST_RTCP_BUFFER_INIT }; GHashTable *table_copy; ReportOutput *output; + gboolean all_empty = FALSE; g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); @@ -3686,6 +4272,11 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time, source = obtain_internal_source (sess, sess->suggested_ssrc, &created, current_time); + sess->internal_ssrc_set = TRUE; + + if (created) + on_new_sender_ssrc (sess, source); + g_object_unref (source); } @@ -3715,8 +4306,12 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time, if (!is_rtcp_time (sess, current_time, &data)) goto done; - GST_DEBUG ("doing RTCP generation %u for %u sources, early %d", - sess->generation, data.num_to_report, data.is_early); + /* check if all the buffers are empty afer generation */ + all_empty = TRUE; + + GST_DEBUG + ("doing RTCP generation %u for %u sources, early %d, may suppress %d", + sess->generation, data.num_to_report, data.is_early, data.may_suppress); /* generate RTCP for all internal sources */ g_hash_table_foreach (sess->ssrcs[sess->mask_idx], @@ -3728,23 +4323,33 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time, /* we keep track of the last report time in order to timeout inactive * receivers or senders */ - if (!data.is_early && !data.may_suppress) + if (!data.is_early) { + GST_DEBUG ("Time since last regular RTCP: %" GST_TIME_FORMAT " - %" + GST_TIME_FORMAT " = %" GST_TIME_FORMAT, + GST_TIME_ARGS (data.current_time), + GST_TIME_ARGS (sess->last_rtcp_send_time), + GST_TIME_ARGS (data.current_time - sess->last_rtcp_send_time)); sess->last_rtcp_send_time = data.current_time; + } + + GST_DEBUG ("Time since last RTCP: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT + " = %" GST_TIME_FORMAT, GST_TIME_ARGS (data.current_time), + GST_TIME_ARGS (sess->last_rtcp_check_time), + GST_TIME_ARGS (data.current_time - sess->last_rtcp_check_time)); + sess->last_rtcp_check_time = data.current_time; sess->first_rtcp = FALSE; sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE; sess->scheduled_bye = FALSE; - /* RFC 4585 section 3.5.2 step 6 */ - if (!data.is_early) { - sess->allow_early = TRUE; - } - done: RTP_SESSION_UNLOCK (sess); + /* notify about updated statistics */ + g_object_notify (G_OBJECT (sess), "stats"); + /* push out the RTCP packets */ while ((output = g_queue_pop_head (&data.output))) { - gboolean do_not_suppress; + gboolean do_not_suppress, empty_buffer; GstBuffer *buffer = output->buffer; RTPSource *source = output->source; @@ -3752,7 +4357,13 @@ done: g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_RTCP], 0, buffer, data.is_early, &do_not_suppress); - if (sess->callbacks.send_rtcp && (do_not_suppress || !data.may_suppress)) { + empty_buffer = gst_buffer_get_size (buffer) == 0; + + if (!empty_buffer) + all_empty = FALSE; + + if (sess->callbacks.send_rtcp && + !empty_buffer && (do_not_suppress || !data.may_suppress)) { guint packet_size; packet_size = gst_buffer_get_size (buffer) + sess->header_len; @@ -3761,19 +4372,38 @@ done: GST_DEBUG ("%p, sending RTCP packet, avg size %u, %u", &sess->stats, sess->stats.avg_rtcp_packet_size, packet_size); result = - sess->callbacks.send_rtcp (sess, source, buffer, output->is_bye, - sess->send_rtcp_user_data); + sess->callbacks.send_rtcp (sess, source, buffer, + rtp_session_are_all_sources_bye (sess), sess->send_rtcp_user_data); + + RTP_SESSION_LOCK (sess); sess->stats.nacks_sent += data.nacked_seqnums; + on_sender_ssrc_active (sess, source); + RTP_SESSION_UNLOCK (sess); } else { GST_DEBUG ("freeing packet callback: %p" - " do_not_suppress: %d may_suppress: %d", - sess->callbacks.send_rtcp, do_not_suppress, data.may_suppress); - sess->stats.nacks_dropped += data.nacked_seqnums; + " empty_buffer: %d, " + " do_not_suppress: %d may_suppress: %d", sess->callbacks.send_rtcp, + empty_buffer, do_not_suppress, data.may_suppress); + if (!empty_buffer) { + RTP_SESSION_LOCK (sess); + sess->stats.nacks_dropped += data.nacked_seqnums; + RTP_SESSION_UNLOCK (sess); + } gst_buffer_unref (buffer); } g_object_unref (source); g_slice_free (ReportOutput, output); } + + if (all_empty) + GST_ERROR ("generated empty RTCP messages for all the sources"); + + /* schedule remaining nacks */ + RTP_SESSION_LOCK (sess); + g_hash_table_foreach (sess->ssrcs[sess->mask_idx], + (GHFunc) schedule_remaining_nacks, &data); + RTP_SESSION_UNLOCK (sess); + return result; } @@ -3791,18 +4421,23 @@ gboolean rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time, GstClockTime max_delay) { - GstClockTime T_dither_max, T_rr; + GstClockTime T_dither_max, T_rr, offset = 0; gboolean ret; + gboolean allow_early; /* Implements the algorithm described in RFC 4585 section 3.5.2 */ RTP_SESSION_LOCK (sess); + /* We assume a feedback profile if something is requesting RTCP + * to be sent */ + sess->rtp_profile = GST_RTP_PROFILE_AVPF; + /* Check if already requested */ /* RFC 4585 section 3.5.2 step 2 */ if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) { GST_LOG_OBJECT (sess, "already have next early rtcp time"); - ret = TRUE; + ret = (current_time + max_delay > sess->next_early_rtcp_time); goto end; } @@ -3812,7 +4447,33 @@ rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time, goto end; } - T_rr = sess->next_rtcp_check_time - sess->last_rtcp_send_time; + /* RFC 4585 section 3.5.3 step 1 + * If no regular RTCP packet has been sent before, then a regular + * RTCP packet has to be scheduled first and FB messages might be + * included there + */ + if (!GST_CLOCK_TIME_IS_VALID (sess->last_rtcp_send_time)) { + GST_LOG_OBJECT (sess, "no RTCP sent yet"); + + if (current_time + max_delay > sess->next_rtcp_check_time) { + GST_LOG_OBJECT (sess, + "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT + " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time), + GST_TIME_ARGS (max_delay), + GST_TIME_ARGS (sess->next_rtcp_check_time)); + ret = TRUE; + } else { + GST_LOG_OBJECT (sess, + "can't allow early feedback, next scheduled time is too late %" + GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT, + GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay), + GST_TIME_ARGS (sess->next_rtcp_check_time)); + ret = FALSE; + } + goto end; + } + + T_rr = sess->last_rtcp_interval; /* RFC 4585 section 3.5.2 step 2b */ /* If the total sources is <=2, then there is only us and one peer */ @@ -3829,13 +4490,42 @@ rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time, /* RFC 4585 section 3.5.2 step 3 */ if (current_time + T_dither_max > sess->next_rtcp_check_time) { - GST_LOG_OBJECT (sess, "don't send because of dither"); - ret = FALSE; + GST_LOG_OBJECT (sess, + "don't send because of dither, next scheduled time is too soon %" + GST_TIME_FORMAT " + %" GST_TIME_FORMAT " > %" GST_TIME_FORMAT, + GST_TIME_ARGS (current_time), GST_TIME_ARGS (T_dither_max), + GST_TIME_ARGS (sess->next_rtcp_check_time)); + ret = T_dither_max <= max_delay; goto end; } - /* RFC 4585 section 3.5.2 step 4a */ - if (sess->allow_early == FALSE) { + /* RFC 4585 section 3.5.2 step 4a and + * RFC 4585 section 3.5.2 step 6 */ + allow_early = FALSE; + if (sess->last_rtcp_check_time == sess->last_rtcp_send_time) { + /* Last time we sent a full RTCP packet, we can now immediately + * send an early one as allow_early was reset to TRUE */ + allow_early = TRUE; + } else if (sess->last_rtcp_check_time + T_rr <= current_time + max_delay) { + /* Last packet we sent was an early RTCP packet and more than + * T_rr has passed since then, meaning we would have suppressed + * a regular RTCP packet already and reset allow_early to TRUE */ + allow_early = TRUE; + + /* We have to offset a bit as T_rr has not passed yet, but will before + * max_delay */ + if (sess->last_rtcp_check_time + T_rr > current_time) + offset = (sess->last_rtcp_check_time + T_rr) - current_time; + } else { + GST_DEBUG_OBJECT (sess, + "can't allow early RTCP yet: last regular %" GST_TIME_FORMAT ", %" + GST_TIME_FORMAT " + %" GST_TIME_FORMAT " > %" GST_TIME_FORMAT " + %" + GST_TIME_FORMAT, GST_TIME_ARGS (sess->last_rtcp_send_time), + GST_TIME_ARGS (sess->last_rtcp_check_time), GST_TIME_ARGS (T_rr), + GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay)); + } + + if (!allow_early) { /* Ignore the request a scheduled packet will be in time anyway */ if (current_time + max_delay > sess->next_rtcp_check_time) { GST_LOG_OBJECT (sess, @@ -3845,7 +4535,11 @@ rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time, GST_TIME_ARGS (sess->next_rtcp_check_time)); ret = TRUE; } else { - GST_LOG_OBJECT (sess, "can't allow early feedback"); + GST_LOG_OBJECT (sess, + "can't allow early feedback and next scheduled time is too late %" + GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT, + GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay), + GST_TIME_ARGS (sess->next_rtcp_check_time)); ret = FALSE; } goto end; @@ -3855,22 +4549,16 @@ rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time, if (T_dither_max) { /* Schedule an early transmission later */ sess->next_early_rtcp_time = g_random_double () * T_dither_max + - current_time; + current_time + offset; } else { /* If no dithering, schedule it for NOW */ - sess->next_early_rtcp_time = current_time; + sess->next_early_rtcp_time = current_time + offset; } - /* RFC 4585 section 3.5.2 step 6 */ - sess->allow_early = FALSE; - /* Delay next regular RTCP packet to not exceed the short-term - * RTCP bandwidth when using early feedback as compared to - * without */ - sess->next_rtcp_check_time = sess->last_rtcp_send_time + 2 * T_rr; - sess->last_rtcp_send_time += T_rr; - - GST_LOG_OBJECT (sess, "next early RTCP time %" GST_TIME_FORMAT, - GST_TIME_ARGS (sess->next_early_rtcp_time)); + GST_LOG_OBJECT (sess, "next early RTCP time %" GST_TIME_FORMAT + ", next regular RTCP time %" GST_TIME_FORMAT, + GST_TIME_ARGS (sess->next_early_rtcp_time), + GST_TIME_ARGS (sess->next_rtcp_check_time)); RTP_SESSION_UNLOCK (sess); /* notify app of need to send packet early @@ -3888,6 +4576,35 @@ end: } static gboolean +rtp_session_send_rtcp_internal (RTPSession * sess, GstClockTime now, + GstClockTime max_delay) +{ + /* notify the application that we intend to send early RTCP */ + if (sess->callbacks.notify_early_rtcp) + sess->callbacks.notify_early_rtcp (sess, sess->notify_early_rtcp_user_data); + + return rtp_session_request_early_rtcp (sess, now, max_delay); +} + +static gboolean +rtp_session_send_rtcp_with_deadline (RTPSession * sess, GstClockTime deadline) +{ + GstClockTime now, max_delay; + + if (!sess->callbacks.send_rtcp) + return FALSE; + + now = sess->callbacks.request_time (sess, sess->request_time_user_data); + + if (deadline < now) + return FALSE; + + max_delay = deadline - now; + + return rtp_session_send_rtcp_internal (sess, now, max_delay); +} + +static gboolean rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay) { GstClockTime now; @@ -3897,7 +4614,7 @@ rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay) now = sess->callbacks.request_time (sess, sess->request_time_user_data); - return rtp_session_request_early_rtcp (sess, now, max_delay); + return rtp_session_send_rtcp_internal (sess, now, max_delay); } gboolean @@ -3906,11 +4623,6 @@ rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc, { RTPSource *src; - if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) { - GST_DEBUG ("FIR/PLI not sent"); - return FALSE; - } - RTP_SESSION_LOCK (sess); src = find_source (sess, ssrc); if (src == NULL) @@ -3928,6 +4640,10 @@ rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc, } RTP_SESSION_UNLOCK (sess); + if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) { + GST_DEBUG ("FIR/PLI not sent early, sending with next regular RTCP"); + } + return TRUE; /* ERRORS */ @@ -3954,21 +4670,27 @@ rtp_session_request_nack (RTPSession * sess, guint32 ssrc, guint16 seqnum, GstClockTime max_delay) { RTPSource *source; + GstClockTime now; - if (!rtp_session_send_rtcp (sess, max_delay)) { - GST_DEBUG ("NACK not sent"); + if (!sess->callbacks.send_rtcp) return FALSE; - } + + now = sess->callbacks.request_time (sess, sess->request_time_user_data); RTP_SESSION_LOCK (sess); source = find_source (sess, ssrc); if (source == NULL) goto no_source; - GST_DEBUG ("request NACK for %08x, #%u", ssrc, seqnum); - rtp_source_register_nack (source, seqnum); + GST_DEBUG ("request NACK for SSRC %08x, #%u, deadline %" GST_TIME_FORMAT, + ssrc, seqnum, GST_TIME_ARGS (now + max_delay)); + rtp_source_register_nack (source, seqnum, now + max_delay); RTP_SESSION_UNLOCK (sess); + if (!rtp_session_send_rtcp_internal (sess, now, max_delay)) { + GST_DEBUG ("NACK not sent early, sending with next regular RTCP"); + } + return TRUE; /* ERRORS */