X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst%2Frtpmanager%2Frtpsession.c;h=77c5594f280686526853878ce83416e17dac0b04;hb=cc65bff7c1d89d569a0476f8d67cbe687fc5a9fc;hp=4e8a732d40ba1e2b17c56cfb85942f6b4ddfda39;hpb=55bb4d5c955d37c229353896ac92a66d84093bd8;p=platform%2Fupstream%2Fgstreamer.git diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c index 4e8a732..77c5594 100644 --- a/gst/rtpmanager/rtpsession.c +++ b/gst/rtpmanager/rtpsession.c @@ -42,50 +42,55 @@ enum SIGNAL_ON_BYE_TIMEOUT, SIGNAL_ON_TIMEOUT, SIGNAL_ON_SENDER_TIMEOUT, + SIGNAL_ON_SENDING_RTCP, + SIGNAL_ON_FEEDBACK_RTCP, + SIGNAL_SEND_RTCP, LAST_SIGNAL }; #define DEFAULT_INTERNAL_SOURCE NULL #define DEFAULT_BANDWIDTH RTP_STATS_BANDWIDTH -#define DEFAULT_RTCP_FRACTION RTP_STATS_RTCP_BANDWIDTH -#define DEFAULT_SDES_CNAME NULL -#define DEFAULT_SDES_NAME NULL -#define DEFAULT_SDES_EMAIL NULL -#define DEFAULT_SDES_PHONE NULL -#define DEFAULT_SDES_LOCATION NULL -#define DEFAULT_SDES_TOOL NULL -#define DEFAULT_SDES_NOTE NULL +#define DEFAULT_RTCP_FRACTION (RTP_STATS_RTCP_FRACTION * RTP_STATS_BANDWIDTH) +#define DEFAULT_RTCP_RR_BANDWIDTH -1 +#define DEFAULT_RTCP_RS_BANDWIDTH -1 +#define DEFAULT_RTCP_MTU 1400 +#define DEFAULT_SDES NULL #define DEFAULT_NUM_SOURCES 0 #define DEFAULT_NUM_ACTIVE_SOURCES 0 #define DEFAULT_SOURCES NULL +#define DEFAULT_RTCP_MIN_INTERVAL (RTP_STATS_MIN_INTERVAL * GST_SECOND) +#define DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW (2 * GST_SECOND) enum { PROP_0, + PROP_INTERNAL_SSRC, PROP_INTERNAL_SOURCE, PROP_BANDWIDTH, PROP_RTCP_FRACTION, - PROP_SDES_CNAME, - PROP_SDES_NAME, - PROP_SDES_EMAIL, - PROP_SDES_PHONE, - PROP_SDES_LOCATION, - PROP_SDES_TOOL, - PROP_SDES_NOTE, + PROP_RTCP_RR_BANDWIDTH, + PROP_RTCP_RS_BANDWIDTH, + PROP_RTCP_MTU, + PROP_SDES, PROP_NUM_SOURCES, PROP_NUM_ACTIVE_SOURCES, PROP_SOURCES, + PROP_FAVOR_NEW, + PROP_RTCP_MIN_INTERVAL, + PROP_RTCP_FEEDBACK_RETENTION_WINDOW, PROP_LAST }; -/* update average packet size, we keep this scaled by 16 to keep enough - * precision. */ -#define UPDATE_AVG(avg, val) \ - if ((avg) == 0) \ - (avg) = (val) << 4; \ - else \ +/* update average packet size */ +#define INIT_AVG(avg, val) \ + (avg) = (val); +#define UPDATE_AVG(avg, val) \ + if ((avg) == 0) \ + (avg) = (val); \ + else \ (avg) = ((val) + (15 * (avg))) >> 4; + /* The number RTCP intervals after which to timeout entries in the * collision table */ @@ -98,17 +103,102 @@ static void rtp_session_set_property (GObject * object, guint prop_id, static void rtp_session_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); +static gboolean rtp_session_on_sending_rtcp (RTPSession * sess, + GstBuffer * buffer, gboolean early); +static void rtp_session_send_rtcp (RTPSession * sess, + GstClockTimeDiff max_delay); + + static guint rtp_session_signals[LAST_SIGNAL] = { 0 }; G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT); static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created, RTPArrivalStats * arrival, gboolean rtp); -static GstFlowReturn rtp_session_send_bye_locked (RTPSession * sess, +static GstFlowReturn rtp_session_schedule_bye_locked (RTPSession * sess, const gchar * reason, GstClockTime current_time); static GstClockTime calculate_rtcp_interval (RTPSession * sess, gboolean deterministic, gboolean first); +static gboolean +accumulate_trues (GSignalInvocationHint * ihint, GValue * return_accu, + const GValue * handler_return, gpointer data) +{ + if (g_value_get_boolean (handler_return)) + g_value_set_boolean (return_accu, TRUE); + + return TRUE; +} + +static void +gst_rtp_bin_marshal_BOOLEAN__MINIOBJECT_BOOLEAN (GClosure * closure, + GValue * return_value G_GNUC_UNUSED, guint n_param_values, + const GValue * param_values, gpointer invocation_hint G_GNUC_UNUSED, + gpointer marshal_data) +{ + typedef gboolean (*GMarshalFunc_BOOLEAN__MINIOBJECT_BOOLEAN) (gpointer data1, + gpointer arg_1, gboolean arg_2, gpointer data2); + register GMarshalFunc_BOOLEAN__MINIOBJECT_BOOLEAN callback; + register GCClosure *cc = (GCClosure *) closure; + register gpointer data1, data2; + gboolean v_return; + + g_return_if_fail (return_value != NULL); + g_return_if_fail (n_param_values == 3); + + if (G_CCLOSURE_SWAP_DATA (closure)) { + data1 = closure->data; + data2 = g_value_peek_pointer (param_values + 0); + } else { + data1 = g_value_peek_pointer (param_values + 0); + data2 = closure->data; + } + callback = + (GMarshalFunc_BOOLEAN__MINIOBJECT_BOOLEAN) (marshal_data ? marshal_data : + cc->callback); + + v_return = callback (data1, + gst_value_get_mini_object (param_values + 1), + g_value_get_boolean (param_values + 2), data2); + + g_value_set_boolean (return_value, v_return); +} + +static void +gst_rtp_bin_marshal_VOID__UINT_UINT_UINT_UINT_MINIOBJECT (GClosure * closure, + GValue * return_value G_GNUC_UNUSED, guint n_param_values, + const GValue * param_values, gpointer invocation_hint G_GNUC_UNUSED, + gpointer marshal_data) +{ + typedef void (*GMarshalFunc_VOID__UINT_UINT_UINT_UINT_MINIOBJECT) (gpointer + data1, guint arg_1, guint arg_2, guint arg_3, guint arg_4, gpointer arg_5, + gpointer data2); + register GMarshalFunc_VOID__UINT_UINT_UINT_UINT_MINIOBJECT callback; + register GCClosure *cc = (GCClosure *) closure; + register gpointer data1, data2; + + g_return_if_fail (n_param_values == 6); + + if (G_CCLOSURE_SWAP_DATA (closure)) { + data1 = closure->data; + data2 = g_value_peek_pointer (param_values + 0); + } else { + data1 = g_value_peek_pointer (param_values + 0); + data2 = closure->data; + } + callback = + (GMarshalFunc_VOID__UINT_UINT_UINT_UINT_MINIOBJECT) (marshal_data ? + marshal_data : cc->callback); + + callback (data1, + g_value_get_uint (param_values + 1), + g_value_get_uint (param_values + 2), + g_value_get_uint (param_values + 3), + g_value_get_uint (param_values + 4), + gst_value_get_mini_object (param_values + 5), data2); +} + + static void rtp_session_class_init (RTPSessionClass * klass) { @@ -242,6 +332,66 @@ rtp_session_class_init (RTPSessionClass * klass) NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, RTP_TYPE_SOURCE); + /** + * RTPSession::on-sending-rtcp + * @session: the object which received the signal + * @buffer: the #GstBuffer containing the RTCP packet about to be sent + * @early: %TRUE if the packet is early, %FALSE if it is regular + * + * This signal is emitted before sending an RTCP packet, it can be used + * to add extra RTCP Packets. + * + * Returns: %TRUE if the RTCP buffer should NOT be suppressed, %FALSE + * if suppressing it is acceptable + */ + rtp_session_signals[SIGNAL_ON_SENDING_RTCP] = + g_signal_new ("on-sending-rtcp", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_rtcp), + accumulate_trues, NULL, gst_rtp_bin_marshal_BOOLEAN__MINIOBJECT_BOOLEAN, + G_TYPE_BOOLEAN, 2, GST_TYPE_BUFFER, G_TYPE_BOOLEAN); + + /** + * RTPSession::on-feedback-rtcp: + * @session: the object which received the signal + * @type: Type of RTCP packet, will be %GST_RTCP_TYPE_RTPFB or + * %GST_RTCP_TYPE_RTPFB + * @fbtype: The type of RTCP FB packet, probably part of #GstRTCPFBType + * @sender_ssrc: The SSRC of the sender + * @media_ssrc: The SSRC of the media this refers to + * @fci: a #GstBuffer with the FCI data from the FB packet or %NULL if + * there was no FCI + * + * Notify that a RTCP feedback packet has been received + */ + + rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP] = + g_signal_new ("on-feedback-rtcp", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_feedback_rtcp), + NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT_UINT_UINT_MINIOBJECT, + G_TYPE_NONE, 5, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT, + GST_TYPE_BUFFER); + + /** + * RTPSession::send-rtcp: + * @session: the object which received the signal + * @max_delay: The maximum delay after which the feedback will not be useful + * anymore + * + * Requests that the #RTPSession initiate a new RTCP packet as soon as + * possible within the requested delay. + */ + + rtp_session_signals[SIGNAL_SEND_RTCP] = + g_signal_new ("send-rtcp", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, + G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL, + gst_rtp_bin_marshal_VOID__UINT64, G_TYPE_NONE, 1, G_TYPE_UINT64); + + g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC, + g_param_spec_uint ("internal-ssrc", "Internal SSRC", + "The internal SSRC used for the session", + 0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_INTERNAL_SOURCE, g_param_spec_object ("internal-source", "Internal Source", "The internal source element of the session", @@ -249,50 +399,38 @@ rtp_session_class_init (RTPSessionClass * klass) g_object_class_install_property (gobject_class, PROP_BANDWIDTH, g_param_spec_double ("bandwidth", "Bandwidth", - "The bandwidth of the session", + "The bandwidth of the session (0 for auto-discover)", 0.0, G_MAXDOUBLE, DEFAULT_BANDWIDTH, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_RTCP_FRACTION, g_param_spec_double ("rtcp-fraction", "RTCP Fraction", - "The fraction of the bandwidth used for RTCP", + "The fraction of the bandwidth used for RTCP (or as a real fraction of the RTP bandwidth if < 1)", 0.0, G_MAXDOUBLE, DEFAULT_RTCP_FRACTION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_SDES_CNAME, - g_param_spec_string ("sdes-cname", "SDES CNAME", - "The CNAME to put in SDES messages of this session", - DEFAULT_SDES_CNAME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, PROP_SDES_NAME, - g_param_spec_string ("sdes-name", "SDES NAME", - "The NAME to put in SDES messages of this session", - DEFAULT_SDES_NAME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, PROP_SDES_EMAIL, - g_param_spec_string ("sdes-email", "SDES EMAIL", - "The EMAIL to put in SDES messages of this session", - DEFAULT_SDES_EMAIL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, PROP_SDES_PHONE, - g_param_spec_string ("sdes-phone", "SDES PHONE", - "The PHONE to put in SDES messages of this session", - DEFAULT_SDES_PHONE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, PROP_SDES_LOCATION, - g_param_spec_string ("sdes-location", "SDES LOCATION", - "The LOCATION to put in SDES messages of this session", - DEFAULT_SDES_LOCATION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, PROP_SDES_TOOL, - g_param_spec_string ("sdes-tool", "SDES TOOL", - "The TOOL to put in SDES messages of this session", - DEFAULT_SDES_TOOL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, PROP_SDES_NOTE, - g_param_spec_string ("sdes-note", "SDES NOTE", - "The NOTE to put in SDES messages of this session", - DEFAULT_SDES_NOTE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_RTCP_RR_BANDWIDTH, + g_param_spec_int ("rtcp-rr-bandwidth", "RTCP RR bandwidth", + "The RTCP bandwidth used for receivers in bytes per second (-1 = default)", + -1, G_MAXINT, DEFAULT_RTCP_RR_BANDWIDTH, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_RTCP_RS_BANDWIDTH, + g_param_spec_int ("rtcp-rs-bandwidth", "RTCP RS bandwidth", + "The RTCP bandwidth used for senders in bytes per second (-1 = default)", + -1, G_MAXINT, DEFAULT_RTCP_RS_BANDWIDTH, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_RTCP_MTU, + g_param_spec_uint ("rtcp-mtu", "RTCP MTU", + "The maximum size of the RTCP packets", + 16, G_MAXINT16, DEFAULT_RTCP_MTU, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_SDES, + g_param_spec_boxed ("sdes", "SDES", + "The SDES items of this session", + GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_NUM_SOURCES, g_param_spec_uint ("num-sources", "Num Sources", @@ -335,8 +473,30 @@ rtp_session_class_init (RTPSessionClass * klass) "An array of all known sources in the session", G_TYPE_VALUE_ARRAY, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_FAVOR_NEW, + g_param_spec_boolean ("favor-new", "Favor new sources", + "Resolve SSRC conflict in favor of new sources", FALSE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_RTCP_MIN_INTERVAL, + g_param_spec_uint64 ("rtcp-min-interval", "Minimum RTCP interval", + "Minimum interval between Regular RTCP packet (in ns)", + 0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_RTCP_FEEDBACK_RETENTION_WINDOW, + g_param_spec_uint64 ("rtcp-feedback-retention-window", + "RTCP Feedback retention window", + "Duration during which RTCP Feedback packets are retained (in ns)", + 0, G_MAXUINT64, DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + klass->get_source_by_ssrc = GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc); + klass->on_sending_rtcp = GST_DEBUG_FUNCPTR (rtp_session_on_sending_rtcp); + klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp); GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session"); } @@ -361,15 +521,22 @@ rtp_session_init (RTPSession * sess) rtp_stats_init_defaults (&sess->stats); + sess->recalc_bandwidth = TRUE; + sess->bandwidth = DEFAULT_BANDWIDTH; + sess->rtcp_bandwidth = DEFAULT_RTCP_FRACTION; + sess->rtcp_rr_bandwidth = DEFAULT_RTCP_RR_BANDWIDTH; + sess->rtcp_rs_bandwidth = DEFAULT_RTCP_RS_BANDWIDTH; + /* create an active SSRC for this session manager */ sess->source = rtp_session_create_source (sess); sess->source->validated = TRUE; sess->source->internal = TRUE; sess->stats.active_sources++; + INIT_AVG (sess->stats.avg_rtcp_packet_size, 100); /* default UDP header length */ sess->header_len = 28; - sess->mtu = 1400; + sess->mtu = DEFAULT_RTCP_MTU; /* some default SDES entries */ str = g_strdup_printf ("%s@%s", g_get_user_name (), g_get_host_name ()); @@ -381,6 +548,10 @@ rtp_session_init (RTPSession * sess) rtp_source_set_sdes_string (sess->source, GST_RTCP_SDES_TOOL, "GStreamer"); sess->first_rtcp = TRUE; + sess->allow_early = TRUE; + sess->rtcp_feedback_retention_window = DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW; + + sess->rtcp_pli_requests = g_array_new (FALSE, FALSE, sizeof (guint32)); GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc); } @@ -402,6 +573,8 @@ rtp_session_finalize (GObject * object) g_hash_table_destroy (sess->cnames); g_object_unref (sess->source); + g_array_free (sess->rtcp_pli_requests, TRUE); + G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object); } @@ -412,6 +585,7 @@ copy_source (gpointer key, RTPSource * source, GValueArray * arr) g_value_init (&value, RTP_TYPE_SOURCE); g_value_take_object (&value, source); + /* copies the value */ g_value_array_append (arr, &value); } @@ -443,39 +617,37 @@ rtp_session_set_property (GObject * object, guint prop_id, sess = RTP_SESSION (object); switch (prop_id) { + case PROP_INTERNAL_SSRC: + rtp_session_set_internal_ssrc (sess, g_value_get_uint (value)); + break; case PROP_BANDWIDTH: - rtp_session_set_bandwidth (sess, g_value_get_double (value)); + sess->bandwidth = g_value_get_double (value); + sess->recalc_bandwidth = TRUE; break; case PROP_RTCP_FRACTION: - rtp_session_set_rtcp_fraction (sess, g_value_get_double (value)); - break; - case PROP_SDES_CNAME: - rtp_session_set_sdes_string (sess, GST_RTCP_SDES_CNAME, - g_value_get_string (value)); + sess->rtcp_bandwidth = g_value_get_double (value); + sess->recalc_bandwidth = TRUE; break; - case PROP_SDES_NAME: - rtp_session_set_sdes_string (sess, GST_RTCP_SDES_NAME, - g_value_get_string (value)); + case PROP_RTCP_RR_BANDWIDTH: + sess->rtcp_rr_bandwidth = g_value_get_int (value); + sess->recalc_bandwidth = TRUE; break; - case PROP_SDES_EMAIL: - rtp_session_set_sdes_string (sess, GST_RTCP_SDES_EMAIL, - g_value_get_string (value)); + case PROP_RTCP_RS_BANDWIDTH: + sess->rtcp_rs_bandwidth = g_value_get_int (value); + sess->recalc_bandwidth = TRUE; break; - case PROP_SDES_PHONE: - rtp_session_set_sdes_string (sess, GST_RTCP_SDES_PHONE, - g_value_get_string (value)); + case PROP_RTCP_MTU: + sess->mtu = g_value_get_uint (value); break; - case PROP_SDES_LOCATION: - rtp_session_set_sdes_string (sess, GST_RTCP_SDES_LOC, - g_value_get_string (value)); + case PROP_SDES: + rtp_session_set_sdes_struct (sess, g_value_get_boxed (value)); break; - case PROP_SDES_TOOL: - rtp_session_set_sdes_string (sess, GST_RTCP_SDES_TOOL, - g_value_get_string (value)); + case PROP_FAVOR_NEW: + sess->favor_new = g_value_get_boolean (value); break; - case PROP_SDES_NOTE: - rtp_session_set_sdes_string (sess, GST_RTCP_SDES_NOTE, - g_value_get_string (value)); + case PROP_RTCP_MIN_INTERVAL: + rtp_stats_set_min_interval (&sess->stats, + (gdouble) g_value_get_uint64 (value) / GST_SECOND); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -492,42 +664,29 @@ rtp_session_get_property (GObject * object, guint prop_id, sess = RTP_SESSION (object); switch (prop_id) { + case PROP_INTERNAL_SSRC: + g_value_set_uint (value, rtp_session_get_internal_ssrc (sess)); + break; case PROP_INTERNAL_SOURCE: g_value_take_object (value, rtp_session_get_internal_source (sess)); break; case PROP_BANDWIDTH: - g_value_set_double (value, rtp_session_get_bandwidth (sess)); + g_value_set_double (value, sess->bandwidth); break; case PROP_RTCP_FRACTION: - g_value_set_double (value, rtp_session_get_rtcp_fraction (sess)); - break; - case PROP_SDES_CNAME: - g_value_take_string (value, rtp_session_get_sdes_string (sess, - GST_RTCP_SDES_CNAME)); - break; - case PROP_SDES_NAME: - g_value_take_string (value, rtp_session_get_sdes_string (sess, - GST_RTCP_SDES_NAME)); + g_value_set_double (value, sess->rtcp_bandwidth); break; - case PROP_SDES_EMAIL: - g_value_take_string (value, rtp_session_get_sdes_string (sess, - GST_RTCP_SDES_EMAIL)); + case PROP_RTCP_RR_BANDWIDTH: + g_value_set_int (value, sess->rtcp_rr_bandwidth); break; - case PROP_SDES_PHONE: - g_value_take_string (value, rtp_session_get_sdes_string (sess, - GST_RTCP_SDES_PHONE)); + case PROP_RTCP_RS_BANDWIDTH: + g_value_set_int (value, sess->rtcp_rs_bandwidth); break; - case PROP_SDES_LOCATION: - g_value_take_string (value, rtp_session_get_sdes_string (sess, - GST_RTCP_SDES_LOC)); + case PROP_RTCP_MTU: + g_value_set_uint (value, sess->mtu); break; - case PROP_SDES_TOOL: - g_value_take_string (value, rtp_session_get_sdes_string (sess, - GST_RTCP_SDES_TOOL)); - break; - case PROP_SDES_NOTE: - g_value_take_string (value, rtp_session_get_sdes_string (sess, - GST_RTCP_SDES_NOTE)); + case PROP_SDES: + g_value_take_boxed (value, rtp_session_get_sdes_struct (sess)); break; case PROP_NUM_SOURCES: g_value_set_uint (value, rtp_session_get_num_sources (sess)); @@ -538,6 +697,12 @@ rtp_session_get_property (GObject * object, guint prop_id, case PROP_SOURCES: g_value_take_boxed (value, rtp_session_create_sources (sess)); break; + case PROP_FAVOR_NEW: + g_value_set_boolean (value, sess->favor_new); + break; + case PROP_RTCP_MIN_INTERVAL: + g_value_set_uint64 (value, sess->stats.min_interval * GST_SECOND); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -693,6 +858,14 @@ rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks, sess->callbacks.reconsider = callbacks->reconsider; sess->reconsider_user_data = user_data; } + if (callbacks->request_key_unit) { + sess->callbacks.request_key_unit = callbacks->request_key_unit; + sess->request_key_unit_user_data = user_data; + } + if (callbacks->request_time) { + sess->callbacks.request_time = callbacks->request_time; + sess->request_time_user_data = user_data; + } } /** @@ -804,6 +977,24 @@ rtp_session_set_reconsider_callback (RTPSession * sess, } /** + * rtp_session_set_request_time_callback: + * @sess: an #RTPSession + * @callback: callback to set + * @user_data: user data passed in the callback + * + * Configure only the request_time callback + */ +void +rtp_session_set_request_time_callback (RTPSession * sess, + RTPSessionRequestTime callback, gpointer user_data) +{ + g_return_if_fail (RTP_IS_SESSION (sess)); + + sess->callbacks.request_time = callback; + sess->request_time_user_data = user_data; +} + +/** * rtp_session_set_bandwidth: * @sess: an #RTPSession * @bandwidth: the bandwidth allocated @@ -847,7 +1038,7 @@ rtp_session_get_bandwidth (RTPSession * sess) * @sess: an #RTPSession * @bandwidth: the RTCP bandwidth * - * Set the bandwidth that should be used for RTCP + * Set the bandwidth in bytes per second that should be used for RTCP * messages. */ void @@ -886,9 +1077,9 @@ rtp_session_get_rtcp_fraction (RTPSession * sess) * rtp_session_set_sdes_string: * @sess: an #RTPSession * @type: the type of the SDES item - * @item: a null-terminated string to set. + * @item: a null-terminated string to set. * - * Store an SDES item of @type in @sess. + * Store an SDES item of @type in @sess. * * Returns: %FALSE if the data was unchanged @type is invalid. */ @@ -912,7 +1103,7 @@ rtp_session_set_sdes_string (RTPSession * sess, GstRTCPSDESType type, * @sess: an #RTPSession * @type: the type of the SDES item * - * Get the SDES item of @type from @sess. + * Get the SDES item of @type from @sess. * * Returns: a null-terminated copy of the SDES item or NULL when @type was not * valid. g_free() after usage. @@ -931,8 +1122,52 @@ rtp_session_get_sdes_string (RTPSession * sess, GstRTCPSDESType type) return result; } +/** + * rtp_session_get_sdes_struct: + * @sess: an #RTSPSession + * + * Get the SDES data as a #GstStructure + * + * Returns: a GstStructure with SDES items for @sess. This function returns a + * copy of the SDES structure, use gst_structure_free() after usage. + */ +GstStructure * +rtp_session_get_sdes_struct (RTPSession * sess) +{ + const GstStructure *sdes; + GstStructure *result = NULL; + + g_return_val_if_fail (RTP_IS_SESSION (sess), NULL); + + RTP_SESSION_LOCK (sess); + sdes = rtp_source_get_sdes_struct (sess->source); + if (sdes) + result = gst_structure_copy (sdes); + RTP_SESSION_UNLOCK (sess); + + return result; +} + +/** + * rtp_session_set_sdes_struct: + * @sess: an #RTSPSession + * @sdes: a #GstStructure + * + * Set the SDES data as a #GstStructure. This function makes a copy of @sdes. + */ +void +rtp_session_set_sdes_struct (RTPSession * sess, const GstStructure * sdes) +{ + g_return_if_fail (sdes); + g_return_if_fail (RTP_IS_SESSION (sess)); + + RTP_SESSION_LOCK (sess); + rtp_source_set_sdes_struct (sess->source, gst_structure_copy (sdes)); + RTP_SESSION_UNLOCK (sess); +} + static GstFlowReturn -source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session) +source_push_rtp (RTPSource * source, gpointer data, RTPSession * session) { GstFlowReturn result = GST_FLOW_OK; @@ -943,21 +1178,21 @@ source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session) if (session->callbacks.send_rtp) result = - session->callbacks.send_rtp (session, source, buffer, + session->callbacks.send_rtp (session, source, data, session->send_rtp_user_data); - else - gst_buffer_unref (buffer); - + else { + gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); + } } else { GST_LOG ("source %08x pushed receiver RTP packet", source->ssrc); RTP_SESSION_UNLOCK (session); if (session->callbacks.process_rtp) result = - session->callbacks.process_rtp (session, source, buffer, - session->process_rtp_user_data); + session->callbacks.process_rtp (session, source, + GST_BUFFER_CAST (data), session->process_rtp_user_data); else - gst_buffer_unref (buffer); + gst_buffer_unref (GST_BUFFER_CAST (data)); } RTP_SESSION_LOCK (session); @@ -990,44 +1225,6 @@ static RTPSourceCallbacks callbacks = { (RTPSourceClockRate) source_clock_rate, }; -/** - * find_add_conflicting_addresses: - * @sess: The session to check in - * @arrival: The arrival stats for the buffer - * - * Checks if an address which has a conflict is already known, - * otherwise remembers it to prevent loops. - * - * Returns: TRUE if it was a known conflict, FALSE otherwise - */ - -static gboolean -find_add_conflicting_addresses (RTPSession * sess, RTPArrivalStats * arrival) -{ - GList *item; - RTPConflictingAddress *new_conflict; - - for (item = g_list_first (sess->conflicting_addresses); - item; item = g_list_next (item)) { - RTPConflictingAddress *known_conflict = item->data; - - if (gst_netaddress_equal (&arrival->address, &known_conflict->address)) { - known_conflict->time = arrival->time; - return TRUE; - } - } - - new_conflict = g_new0 (RTPConflictingAddress, 1); - - memcpy (&new_conflict->address, &arrival->address, sizeof (GstNetAddress)); - new_conflict->time = arrival->time; - - sess->conflicting_addresses = g_list_prepend (sess->conflicting_addresses, - new_conflict); - - return FALSE; -} - static gboolean check_collision (RTPSession * sess, RTPSource * source, RTPArrivalStats * arrival, gboolean rtp) @@ -1037,42 +1234,99 @@ check_collision (RTPSession * sess, RTPSource * source, return FALSE; if (sess->source != source) { + GstNetAddress *from; + gboolean have_from; + /* This is not our local source, but lets check if two remote * source collide */ + if (rtp) { - if (source->have_rtp_from) { - if (gst_netaddress_equal (&source->rtp_from, &arrival->address)) - /* Address is the same */ - return FALSE; - } else { - /* We don't already have a from address for RTP, just set it */ - rtp_source_set_rtp_from (source, &arrival->address); + from = &source->rtp_from; + have_from = source->have_rtp_from; + } else { + from = &source->rtcp_from; + have_from = source->have_rtcp_from; + } + + if (have_from) { + if (gst_netaddress_equal (from, &arrival->address)) { + /* Address is the same */ return FALSE; + } else { + GST_LOG ("we have a third-party collision or loop ssrc:%x", + rtp_source_get_ssrc (source)); + if (sess->favor_new) { + if (rtp_source_find_conflicting_address (source, + &arrival->address, arrival->current_time)) { + gchar buf1[40]; + gst_netaddress_to_string (&arrival->address, buf1, 40); + GST_LOG ("Known conflict on %x for %s, dropping packet", + rtp_source_get_ssrc (source), buf1); + return TRUE; + } else { + gchar buf1[40], buf2[40]; + + /* Current address is not a known conflict, lets assume this is + * a new source. Save old address in possible conflict list + */ + rtp_source_add_conflicting_address (source, from, + arrival->current_time); + + gst_netaddress_to_string (from, buf1, 40); + gst_netaddress_to_string (&arrival->address, buf2, 40); + GST_DEBUG ("New conflict for ssrc %x, replacing %s with %s," + " saving old as known conflict", + rtp_source_get_ssrc (source), buf1, buf2); + + if (rtp) + rtp_source_set_rtp_from (source, &arrival->address); + else + rtp_source_set_rtcp_from (source, &arrival->address); + return FALSE; + } + } else { + /* Don't need to save old addresses, we ignore new sources */ + return TRUE; + } } } else { - if (source->have_rtcp_from) { - if (gst_netaddress_equal (&source->rtcp_from, &arrival->address)) - /* Address is the same */ - return FALSE; - } else { - /* We don't already have a from address for RTCP, just set it */ + /* We don't already have a from address for RTP, just set it */ + if (rtp) + rtp_source_set_rtp_from (source, &arrival->address); + else rtp_source_set_rtcp_from (source, &arrival->address); - return FALSE; - } + return FALSE; } - /* We received RTP or RTCP from this source before but the network address - * changed. In this case, we have third-party collision or loop */ - GST_DEBUG ("we have a third-party collision or loop"); /* FIXME: Log 3rd party collision somehow * Maybe should be done in upper layer, only the SDES can tell us * if its a collision or a loop */ + + /* If the source has been inactive for some time, we assume that it has + * simply changed its transport source address. Hence, there is no true + * third-party collision - only a simulated one. */ + if (arrival->current_time > source->last_activity) { + GstClockTime inactivity_period = + arrival->current_time - source->last_activity; + if (inactivity_period > 1 * GST_SECOND) { + /* Use new network address */ + if (rtp) { + g_assert (source->have_rtp_from); + rtp_source_set_rtp_from (source, &arrival->address); + } else { + g_assert (source->have_rtcp_from); + rtp_source_set_rtcp_from (source, &arrival->address); + } + return FALSE; + } + } } else { /* This is sending with our ssrc, is it an address we already know */ - if (find_add_conflicting_addresses (sess, arrival)) { + if (rtp_source_find_conflicting_address (source, &arrival->address, + arrival->current_time)) { /* Its a known conflict, its probably a loop, not a collision * lets just drop the incoming packet */ @@ -1080,10 +1334,14 @@ check_collision (RTPSession * sess, RTPSource * source, } else { /* Its a new collision, lets change our SSRC */ + rtp_source_add_conflicting_address (source, &arrival->address, + arrival->current_time); + GST_DEBUG ("Collision for SSRC %x", rtp_source_get_ssrc (source)); on_ssrc_collision (sess, source); - rtp_session_send_bye_locked (sess, "SSRC Collision", arrival->time); + rtp_session_schedule_bye_locked (sess, "SSRC Collision", + arrival->current_time); sess->change_ssrc = TRUE; } @@ -1093,7 +1351,8 @@ check_collision (RTPSession * sess, RTPSource * source, } -/* must be called with the session lock */ +/* must be called with the session lock, the returned source needs to be + * unreffed after usage. */ static RTPSource * obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created, RTPArrivalStats * arrival, gboolean rtp) @@ -1139,9 +1398,10 @@ obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created, } } /* update last activity */ - source->last_activity = arrival->time; + source->last_activity = arrival->current_time; if (rtp) - source->last_rtp_activity = arrival->time; + source->last_rtp_activity = arrival->current_time; + g_object_ref (source); return source; } @@ -1177,15 +1437,24 @@ void rtp_session_set_internal_ssrc (RTPSession * sess, guint32 ssrc) { RTP_SESSION_LOCK (sess); - g_hash_table_steal (sess->ssrcs[sess->mask_idx], - GINT_TO_POINTER (sess->source->ssrc)); + if (ssrc != sess->source->ssrc) { + g_hash_table_steal (sess->ssrcs[sess->mask_idx], + GINT_TO_POINTER (sess->source->ssrc)); - sess->source->ssrc = ssrc; - rtp_source_reset (sess->source); + GST_DEBUG ("setting internal SSRC to %08x", ssrc); + /* After this call, any receiver of the old SSRC either in RTP or RTCP + * packets will timeout on the old SSRC, we could potentially schedule a + * BYE RTCP for the old SSRC... */ + sess->source->ssrc = ssrc; + rtp_source_reset (sess->source); - g_hash_table_insert (sess->ssrcs[sess->mask_idx], - GINT_TO_POINTER (sess->source->ssrc), sess->source); + /* rehash with the new SSRC */ + g_hash_table_insert (sess->ssrcs[sess->mask_idx], + GINT_TO_POINTER (sess->source->ssrc), sess->source); + } RTP_SESSION_UNLOCK (sess); + + g_object_notify (G_OBJECT (sess), "internal-ssrc"); } /** @@ -1194,7 +1463,7 @@ rtp_session_set_internal_ssrc (RTPSession * sess, guint32 ssrc) * * Get the internal SSRC of @sess. * - * Returns: The SSRC of the session. + * Returns: The SSRC of the session. */ guint32 rtp_session_get_internal_ssrc (RTPSession * sess) @@ -1342,6 +1611,7 @@ rtp_session_get_source_by_cname (RTPSession * sess, const gchar * cname) return result; } +/* should be called with the SESSION lock */ static guint32 rtp_session_create_new_ssrc (RTPSession * sess) { @@ -1355,7 +1625,6 @@ rtp_session_create_new_ssrc (RTPSession * sess) GINT_TO_POINTER (ssrc)) == NULL) break; } - return ssrc; } @@ -1378,8 +1647,9 @@ rtp_session_create_source (RTPSession * sess) RTP_SESSION_LOCK (sess); ssrc = rtp_session_create_new_ssrc (sess); source = rtp_source_new (ssrc); - g_object_ref (source); rtp_source_set_callbacks (source, &callbacks, sess); + /* we need an additional ref for the source in the hashtable */ + g_object_ref (source); g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc), source); /* we have one more source now */ @@ -1399,8 +1669,10 @@ update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival, gboolean rtp, GstBuffer * buffer, GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime) { + GstMetaNetAddress *meta; + /* get time of arrival */ - arrival->time = current_time; + arrival->current_time = current_time; arrival->running_time = running_time; arrival->ntpnstime = ntpnstime; @@ -1414,11 +1686,12 @@ update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival, } /* for netbuffer we can store the IP address to check for collisions */ - arrival->have_address = GST_IS_NETBUFFER (buffer); - if (arrival->have_address) { - GstNetBuffer *netbuf = (GstNetBuffer *) buffer; - - memcpy (&arrival->address, &netbuf->from, sizeof (GstNetAddress)); + meta = gst_buffer_get_meta_net_address (buffer); + if (meta) { + arrival->have_address = TRUE; + memcpy (&arrival->address, &meta->naddr, sizeof (GstNetAddress)); + } else { + arrival->have_address = FALSE; } } @@ -1427,7 +1700,7 @@ update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival, * @sess: and #RTPSession * @buffer: an RTP buffer * @current_time: the current system time - * @ntpnstime: the NTP arrival time in nanoseconds + * @running_time: the running_time of @buffer * * Process an RTP buffer in the session manager. This function takes ownership * of @buffer. @@ -1436,7 +1709,7 @@ update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival, */ GstFlowReturn rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer, - GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime) + GstClockTime current_time, GstClockTime running_time) { GstFlowReturn result; guint32 ssrc; @@ -1444,6 +1717,9 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer, gboolean created; gboolean prevsender, prevactive; RTPArrivalStats arrival; + guint32 csrcs[16]; + guint8 i, count; + guint64 oldrate; g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); @@ -1454,7 +1730,7 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer, RTP_SESSION_LOCK (sess); /* update arrival stats */ update_arrival_stats (sess, &arrival, TRUE, buffer, current_time, - running_time, ntpnstime); + running_time, -1); /* ignore more RTP packets when we left the session */ if (sess->source->received_bye) @@ -1463,15 +1739,21 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer, /* get SSRC and look up in session database */ ssrc = gst_rtp_buffer_get_ssrc (buffer); source = obtain_source (sess, ssrc, &created, &arrival, TRUE); - if (!source) goto collision; prevsender = RTP_SOURCE_IS_SENDER (source); prevactive = RTP_SOURCE_IS_ACTIVE (source); + oldrate = source->bitrate; + + /* copy available csrc for later */ + count = gst_rtp_buffer_get_csrc_count (buffer); + /* make sure to not overflow our array. An RTP buffer can maximally contain + * 16 CSRCs */ + count = MIN (count, 16); - /* we need to ref so that we can process the CSRCs later */ - gst_buffer_ref (buffer); + for (i = 0; i < count; i++) + csrcs[i] = gst_rtp_buffer_get_csrc (buffer, i); /* let source process the packet */ result = rtp_source_process_rtp (source, buffer, &arrival); @@ -1488,36 +1770,38 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer, GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc, sess->stats.sender_sources); } + if (oldrate != source->bitrate) + sess->recalc_bandwidth = TRUE; if (created) on_new_ssrc (sess, source); if (source->validated) { - guint8 i, count; gboolean created; /* for validated sources, we add the CSRCs as well */ - count = gst_rtp_buffer_get_csrc_count (buffer); - for (i = 0; i < count; i++) { guint32 csrc; RTPSource *csrc_src; - csrc = gst_rtp_buffer_get_csrc (buffer, i); + csrc = csrcs[i]; /* get source */ csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE); + if (!csrc_src) + continue; if (created) { GST_DEBUG ("created new CSRC: %08x", csrc); rtp_source_set_as_csrc (csrc_src); if (RTP_SOURCE_IS_ACTIVE (csrc_src)) sess->stats.active_sources++; - on_new_ssrc (sess, source); + on_new_ssrc (sess, csrc_src); } + g_object_unref (csrc_src); } } - gst_buffer_unref (buffer); + g_object_unref (source); RTP_SESSION_UNLOCK (sess); @@ -1567,12 +1851,11 @@ rtp_session_process_rb (RTPSession * sess, RTPSource * source, /* only deal with report blocks for our session, we update the stats of * the sender of the RTCP message. We could also compare our stats against * the other sender to see if we are better or worse. */ - rtp_source_process_rb (source, arrival->time, fractionlost, packetslost, - exthighestseq, jitter, lsr, dlsr); - - on_ssrc_active (sess, source); + rtp_source_process_rb (source, arrival->ntpnstime, fractionlost, + packetslost, exthighestseq, jitter, lsr, dlsr); } } + on_ssrc_active (sess, source); } /* A Sender report contains statistics about how the sender is doing. This @@ -1586,7 +1869,7 @@ rtp_session_process_rb (RTPSession * sess, RTPSource * source, */ static void rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet, - RTPArrivalStats * arrival) + RTPArrivalStats * arrival, gboolean * do_sync) { guint32 senderssrc, rtptime, packet_count, octet_count; guint64 ntptime; @@ -1597,18 +1880,23 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet, &packet_count, &octet_count); GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT, - senderssrc, GST_TIME_ARGS (arrival->time)); + senderssrc, GST_TIME_ARGS (arrival->current_time)); source = obtain_source (sess, senderssrc, &created, arrival, FALSE); - if (!source) return; + /* don't try to do lip-sync for sources that sent a BYE */ + if (rtp_source_received_bye (source)) + *do_sync = FALSE; + else + *do_sync = TRUE; + prevsender = RTP_SOURCE_IS_SENDER (source); /* first update the source */ - rtp_source_process_sr (source, arrival->time, ntptime, rtptime, packet_count, - octet_count); + rtp_source_process_sr (source, arrival->current_time, ntptime, rtptime, + packet_count, octet_count); if (prevsender != RTP_SOURCE_IS_SENDER (source)) { sess->stats.sender_sources++; @@ -1620,6 +1908,7 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet, on_new_ssrc (sess, source); rtp_session_process_rb (sess, source, packet, arrival); + g_object_unref (source); } /* A receiver report contains statistics about how a receiver is doing. It @@ -1641,7 +1930,6 @@ rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet, GST_DEBUG ("got RR packet: SSRC %08x", senderssrc); source = obtain_source (sess, senderssrc, &created, arrival, FALSE); - if (!source) return; @@ -1649,6 +1937,7 @@ rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet, on_new_ssrc (sess, source); rtp_session_process_rb (sess, source, packet, arrival); + g_object_unref (source); } /* Get SDES items and store them in the SSRC */ @@ -1666,45 +1955,77 @@ rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet, i = 0; while (more_items) { guint32 ssrc; - gboolean changed, created; + gboolean changed, created, validated; RTPSource *source; + GstStructure *sdes; ssrc = gst_rtcp_packet_sdes_get_ssrc (packet); GST_DEBUG ("item %d, SSRC %08x", i, ssrc); - /* find src, no probation when dealing with RTCP */ - source = obtain_source (sess, ssrc, &created, arrival, FALSE); changed = FALSE; + /* find src, no probation when dealing with RTCP */ + source = obtain_source (sess, ssrc, &created, arrival, FALSE); if (!source) return; + sdes = gst_structure_new ("application/x-rtp-source-sdes", NULL); + more_entries = gst_rtcp_packet_sdes_first_entry (packet); j = 0; while (more_entries) { GstRTCPSDESType type; guint8 len; guint8 *data; + gchar *name; + gchar *value; gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data); GST_DEBUG ("entry %d, type %d, len %d, data %.*s", j, type, len, len, data); - changed |= rtp_source_set_sdes (source, type, data, len); + if (type == GST_RTCP_SDES_PRIV) { + name = g_strndup ((const gchar *) &data[1], data[0]); + len -= data[0] + 1; + data += data[0] + 1; + } else { + name = g_strdup (gst_rtcp_sdes_type_to_name (type)); + } + + value = g_strndup ((const gchar *) data, len); + + gst_structure_set (sdes, name, G_TYPE_STRING, value, NULL); + + g_free (name); + g_free (value); more_entries = gst_rtcp_packet_sdes_next_entry (packet); j++; } + /* takes ownership of sdes */ + changed = rtp_source_set_sdes_struct (source, sdes); + + validated = !RTP_SOURCE_IS_ACTIVE (source); source->validated = TRUE; + /* source became active */ + if (validated) { + sess->stats.active_sources++; + GST_DEBUG ("source: %08x became active, %d active sources", ssrc, + sess->stats.active_sources); + on_ssrc_validated (sess, source); + } + if (created) on_new_ssrc (sess, source); if (changed) on_ssrc_sdes (sess, source); + g_object_unref (source); + more_items = gst_rtcp_packet_sdes_next_item (packet); i++; } @@ -1718,6 +2039,7 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet, { guint count, i; gchar *reason; + gboolean reconsider = FALSE; reason = gst_rtcp_packet_bye_get_reason (packet); GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason)); @@ -1732,14 +2054,16 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet, ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i); GST_DEBUG ("SSRC: %08x", ssrc); + if (ssrc == sess->source->ssrc) + return; + /* find src and mark bye, no probation when dealing with RTCP */ source = obtain_source (sess, ssrc, &created, arrival, FALSE); - if (!source) return; /* store time for when we need to time out this source */ - source->bye_time = arrival->time; + source->bye_time = arrival->current_time; prevactive = RTP_SOURCE_IS_ACTIVE (source); prevsender = RTP_SOURCE_IS_SENDER (source); @@ -1765,23 +2089,21 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet, /* some members went away since the previous timeout estimate. * Perform reverse reconsideration but only when we are not scheduling a * BYE ourselves. */ - if (arrival->time < sess->next_rtcp_check_time) { + if (arrival->current_time < sess->next_rtcp_check_time) { GstClockTime time_remaining; - time_remaining = sess->next_rtcp_check_time - arrival->time; + time_remaining = sess->next_rtcp_check_time - arrival->current_time; sess->next_rtcp_check_time = gst_util_uint64_scale (time_remaining, members, pmembers); GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time)); - sess->next_rtcp_check_time += arrival->time; + sess->next_rtcp_check_time += arrival->current_time; - RTP_SESSION_UNLOCK (sess); - /* notify app of reconsideration */ - if (sess->callbacks.reconsider) - sess->callbacks.reconsider (sess, sess->reconsider_user_data); - RTP_SESSION_LOCK (sess); + /* mark pending reconsider. We only want to signal the reconsideration + * once after we handled all the source in the bye packet */ + reconsider = TRUE; } } @@ -1789,6 +2111,15 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet, on_new_ssrc (sess, source); on_bye_ssrc (sess, source); + + g_object_unref (source); + } + if (reconsider) { + RTP_SESSION_UNLOCK (sess); + /* notify app of reconsideration */ + if (sess->callbacks.reconsider) + sess->callbacks.reconsider (sess, sess->reconsider_user_data); + RTP_SESSION_LOCK (sess); } g_free (reason); } @@ -1800,11 +2131,113 @@ rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet, GST_DEBUG ("received APP"); } +static void +rtp_session_process_pli (RTPSession * sess, guint32 sender_ssrc, + guint32 media_ssrc, GstClockTime current_time) +{ + RTPSource *src; + guint32 round_trip = 0; + + if (!sess->callbacks.request_key_unit) + return; + + src = g_hash_table_lookup (sess->ssrcs[sess->mask_idx], + GINT_TO_POINTER (sender_ssrc)); + + if (!src) + return; + + if (sess->last_keyframe_request != GST_CLOCK_TIME_NONE && + rtp_source_get_last_rb (src, NULL, NULL, NULL, NULL, NULL, NULL, + &round_trip)) { + GstClockTime round_trip_in_ns = gst_util_uint64_scale (round_trip, + GST_SECOND, 65536); + + if (sess->last_keyframe_request != GST_CLOCK_TIME_NONE && + current_time - sess->last_keyframe_request < round_trip_in_ns) { + GST_DEBUG ("Ignoring PLI because one was send without one RTT (%" + GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")", + GST_TIME_ARGS (current_time - sess->last_keyframe_request), + GST_TIME_ARGS (round_trip_in_ns));; + return; + } + } + + sess->last_keyframe_request = current_time; + + GST_LOG ("received PLI from %X %p(%p)", sender_ssrc, + sess->callbacks.process_rtp, sess->callbacks.request_key_unit); + + sess->callbacks.request_key_unit (sess, FALSE, + sess->request_key_unit_user_data); +} + +static void +rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, + RTPArrivalStats * arrival, GstClockTime current_time) +{ + GstRTCPType type = gst_rtcp_packet_get_type (packet); + GstRTCPFBType fbtype = gst_rtcp_packet_fb_get_type (packet); + guint32 sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet); + guint32 media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet); + guint8 *fci_data = gst_rtcp_packet_fb_get_fci (packet); + guint fci_length = 4 * gst_rtcp_packet_fb_get_fci_length (packet); + + GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of " + "length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length); + + if (g_signal_has_handler_pending (sess, + rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0, TRUE)) { + GstBuffer *fci_buffer = NULL; + + if (fci_length > 0) { + fci_buffer = gst_buffer_create_sub (packet->buffer, + fci_data - GST_BUFFER_DATA (packet->buffer), fci_length); + GST_BUFFER_TIMESTAMP (fci_buffer) = arrival->running_time; + } + + RTP_SESSION_UNLOCK (sess); + g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_FEEDBACK_RTCP], 0, + type, fbtype, sender_ssrc, media_ssrc, fci_buffer); + RTP_SESSION_LOCK (sess); + + if (fci_buffer) + gst_buffer_unref (fci_buffer); + } + + if (sess->rtcp_feedback_retention_window) { + RTPSource *src = g_hash_table_lookup (sess->ssrcs[sess->mask_idx], + GINT_TO_POINTER (media_ssrc)); + + if (src) + rtp_source_retain_rtcp_packet (src, packet, arrival->running_time); + } + + if (rtp_source_get_ssrc (sess->source) == media_ssrc) { + switch (type) { + case GST_RTCP_TYPE_PSFB: + switch (fbtype) { + case GST_RTCP_PSFB_TYPE_PLI: + rtp_session_process_pli (sess, sender_ssrc, media_ssrc, + current_time); + break; + default: + break; + } + break; + case GST_RTCP_TYPE_RTPFB: + default: + break; + } + } +} + /** * rtp_session_process_rtcp: * @sess: and #RTPSession * @buffer: an RTCP buffer * @current_time: the current system time + * @ntpnstime: the current NTP time in nanoseconds * * Process an RTCP buffer in the session manager. This function takes ownership * of @buffer. @@ -1813,10 +2246,10 @@ rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet, */ GstFlowReturn rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer, - GstClockTime current_time) + GstClockTime current_time, guint64 ntpnstime) { GstRTCPPacket packet; - gboolean more, is_bye = FALSE, is_sr = FALSE; + gboolean more, is_bye = FALSE, do_sync = FALSE; RTPArrivalStats arrival; GstFlowReturn result = GST_FLOW_OK; @@ -1830,14 +2263,12 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer, RTP_SESSION_LOCK (sess); /* update arrival stats */ - update_arrival_stats (sess, &arrival, FALSE, buffer, current_time, -1, -1); + update_arrival_stats (sess, &arrival, FALSE, buffer, current_time, -1, + ntpnstime); if (sess->sent_bye) goto ignore; - /* make writable, we might want to change the buffer */ - buffer = gst_buffer_make_metadata_writable (buffer); - /* start processing the compound packet */ more = gst_rtcp_buffer_get_first_packet (buffer, &packet); while (more) { @@ -1853,8 +2284,7 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer, switch (type) { case GST_RTCP_TYPE_SR: - rtp_session_process_sr (sess, &packet, &arrival); - is_sr = TRUE; + rtp_session_process_sr (sess, &packet, &arrival, &do_sync); break; case GST_RTCP_TYPE_RR: rtp_session_process_rr (sess, &packet, &arrival); @@ -1864,11 +2294,17 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer, break; case GST_RTCP_TYPE_BYE: is_bye = TRUE; + /* don't try to attempt lip-sync anymore for streams with a BYE */ + do_sync = FALSE; rtp_session_process_bye (sess, &packet, &arrival); break; case GST_RTCP_TYPE_APP: rtp_session_process_app (sess, &packet, &arrival); break; + case GST_RTCP_TYPE_RTPFB: + case GST_RTCP_TYPE_PSFB: + rtp_session_process_feedback (sess, &packet, &arrival, current_time); + break; default: GST_WARNING ("got unknown RTCP packet"); break; @@ -1888,13 +2324,18 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer, /* keep track of average packet size */ UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes); } + GST_DEBUG ("%p, received RTCP packet, avg size %u, %u", &sess->stats, + sess->stats.avg_rtcp_packet_size, arrival.bytes); RTP_SESSION_UNLOCK (sess); /* notify caller of sr packets in the callback */ - if (is_sr && sess->callbacks.sync_rtcp) + if (do_sync && sess->callbacks.sync_rtcp) { + /* make writable, we might want to change the buffer */ + buffer = gst_buffer_make_metadata_writable (buffer); + result = sess->callbacks.sync_rtcp (sess, sess->source, buffer, sess->sync_rtcp_user_data); - else + } else gst_buffer_unref (buffer); return result; @@ -1918,10 +2359,10 @@ ignore: /** * rtp_session_send_rtp: * @sess: an #RTPSession - * @buffer: an RTP buffer + * @data: pointer to either an RTP buffer or a list of RTP buffers + * @is_list: TRUE when @data is a buffer list * @current_time: the current system time - * @ntpnstime: the NTP time in nanoseconds of when this buffer was captured. - * This is the buffer timestamp converted to NTP time. + * @running_time: the running time of @data * * Send the RTP buffer in the session manager. This function takes ownership of * @buffer. @@ -1929,20 +2370,28 @@ ignore: * Returns: a #GstFlowReturn. */ GstFlowReturn -rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer, - GstClockTime current_time, guint64 ntpnstime) +rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list, + GstClockTime current_time, GstClockTime running_time) { GstFlowReturn result; RTPSource *source; gboolean prevsender; + gboolean valid_packet; + guint64 oldrate; g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); - g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); + g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR); - if (!gst_rtp_buffer_validate (buffer)) + if (is_list) { + valid_packet = gst_rtp_buffer_list_validate (GST_BUFFER_LIST_CAST (data)); + } else { + valid_packet = gst_rtp_buffer_validate (GST_BUFFER_CAST (data)); + } + + if (!valid_packet) goto invalid_packet; - GST_LOG ("received RTP packet for sending"); + GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet"); RTP_SESSION_LOCK (sess); source = sess->source; @@ -1951,12 +2400,15 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer, source->last_rtp_activity = current_time; prevsender = RTP_SOURCE_IS_SENDER (source); + oldrate = source->bitrate; /* we use our own source to send */ - result = rtp_source_send_rtp (source, buffer, ntpnstime); + result = rtp_source_send_rtp (source, data, is_list, running_time); if (RTP_SOURCE_IS_SENDER (source) && !prevsender) sess->stats.sender_sources++; + if (oldrate != source->bitrate) + sess->recalc_bandwidth = TRUE; RTP_SESSION_UNLOCK (sess); return result; @@ -1964,18 +2416,46 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer, /* ERRORS */ invalid_packet: { - gst_buffer_unref (buffer); + gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); GST_DEBUG ("invalid RTP packet received"); return GST_FLOW_OK; } } +static void +add_bitrates (gpointer key, RTPSource * source, gdouble * bandwidth) +{ + *bandwidth += source->bitrate; +} + static GstClockTime calculate_rtcp_interval (RTPSession * sess, gboolean deterministic, gboolean first) { GstClockTime result; + /* recalculate bandwidth when it changed */ + if (sess->recalc_bandwidth) { + gdouble bandwidth; + + if (sess->bandwidth > 0) + bandwidth = sess->bandwidth; + else { + /* If it is <= 0, then try to estimate the actual bandwidth */ + bandwidth = sess->source->bitrate; + + g_hash_table_foreach (sess->cnames, (GHFunc) add_bitrates, &bandwidth); + bandwidth /= 8.0; + } + if (bandwidth == 0) + bandwidth = RTP_STATS_BANDWIDTH; + + rtp_stats_set_bandwidths (&sess->stats, bandwidth, + sess->rtcp_bandwidth, sess->rtcp_rs_bandwidth, sess->rtcp_rr_bandwidth); + + sess->recalc_bandwidth = FALSE; + } + if (sess->source->received_bye) { result = rtp_stats_calculate_bye_interval (&sess->stats); } else { @@ -1986,7 +2466,7 @@ calculate_rtcp_interval (RTPSession * sess, gboolean deterministic, GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d", GST_TIME_ARGS (result), first); - if (!deterministic) + if (!deterministic && result != GST_CLOCK_TIME_NONE) result = rtp_stats_add_rtcp_jitter (&sess->stats, result); GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result)); @@ -1994,19 +2474,11 @@ calculate_rtcp_interval (RTPSession * sess, gboolean deterministic, return result; } -/** - * rtp_session_send_bye_locked: - * @sess: an #RTPSession - * @reason: a reason or NULL - * - * Stop the current @sess and schedule a BYE message for the other members. - * +/* Stop the current @sess and schedule a BYE message for the other members. * One must have the session lock to call this function - * - * Returns: a #GstFlowReturn. */ static GstFlowReturn -rtp_session_send_bye_locked (RTPSession * sess, const gchar * reason, +rtp_session_schedule_bye_locked (RTPSession * sess, const gchar * reason, GstClockTime current_time) { GstFlowReturn result = GST_FLOW_OK; @@ -2026,10 +2498,11 @@ rtp_session_send_bye_locked (RTPSession * sess, const gchar * reason, /* at least one member wants to send a BYE */ g_free (sess->bye_reason); sess->bye_reason = g_strdup (reason); - sess->stats.avg_rtcp_packet_size = 100; + INIT_AVG (sess->stats.avg_rtcp_packet_size, 100); sess->stats.bye_members = 1; sess->first_rtcp = TRUE; sess->sent_bye = FALSE; + sess->allow_early = TRUE; /* reschedule transmission */ sess->last_rtcp_send_time = current_time; @@ -2050,19 +2523,17 @@ done: } /** - * rtp_session_send_bye: + * rtp_session_schedule_bye: * @sess: an #RTPSession * @reason: a reason or NULL * @current_time: the current system time * * Stop the current @sess and schedule a BYE message for the other members. * - * One must have the session lock to call this function - * * Returns: a #GstFlowReturn. */ GstFlowReturn -rtp_session_send_bye (RTPSession * sess, const gchar * reason, +rtp_session_schedule_bye (RTPSession * sess, const gchar * reason, GstClockTime current_time) { GstFlowReturn result = GST_FLOW_OK; @@ -2070,7 +2541,7 @@ rtp_session_send_bye (RTPSession * sess, const gchar * reason, g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); RTP_SESSION_LOCK (sess); - result = rtp_session_send_bye_locked (sess, reason, current_time); + result = rtp_session_schedule_bye_locked (sess, reason, current_time); RTP_SESSION_UNLOCK (sess); return result; @@ -2089,12 +2560,17 @@ rtp_session_send_bye (RTPSession * sess, const gchar * reason, GstClockTime rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time) { - GstClockTime result; + GstClockTime result, interval = 0; - g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); + g_return_val_if_fail (RTP_IS_SESSION (sess), GST_CLOCK_TIME_NONE); RTP_SESSION_LOCK (sess); + if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) { + result = sess->next_early_rtcp_time; + goto early_exit; + } + result = sess->next_rtcp_check_time; GST_DEBUG ("current time: %" GST_TIME_FORMAT ", next :%" GST_TIME_FORMAT, @@ -2110,26 +2586,36 @@ rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time) if (sess->source->received_bye) { if (sess->sent_bye) { GST_DEBUG ("we sent BYE already"); - result = GST_CLOCK_TIME_NONE; + interval = GST_CLOCK_TIME_NONE; } else if (sess->stats.active_sources >= 50) { GST_DEBUG ("reconsider BYE, more than 50 sources"); /* reconsider BYE if members >= 50 */ - result += calculate_rtcp_interval (sess, FALSE, TRUE); + interval = calculate_rtcp_interval (sess, FALSE, TRUE); } } else { if (sess->first_rtcp) { GST_DEBUG ("first RTCP packet"); /* we are called for the first time */ - result += calculate_rtcp_interval (sess, FALSE, TRUE); + interval = calculate_rtcp_interval (sess, FALSE, TRUE); } else if (sess->next_rtcp_check_time < current_time) { GST_DEBUG ("old check time expired, getting new timeout"); /* get a new timeout when we need to */ - result += calculate_rtcp_interval (sess, FALSE, FALSE); + interval = calculate_rtcp_interval (sess, FALSE, FALSE); } } + + if (interval != GST_CLOCK_TIME_NONE) + result += interval; + else + result = GST_CLOCK_TIME_NONE; + sess->next_rtcp_check_time = result; - GST_DEBUG ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (result)); +early_exit: + + GST_DEBUG ("current time: %" GST_TIME_FORMAT + ", next time: %" GST_TIME_FORMAT, + GST_TIME_ARGS (current_time), GST_TIME_ARGS (result)); RTP_SESSION_UNLOCK (sess); return result; @@ -2141,10 +2627,13 @@ typedef struct GstBuffer *rtcp; GstClockTime current_time; guint64 ntpnstime; + GstClockTime running_time; GstClockTime interval; GstRTCPPacket packet; gboolean is_bye; gboolean has_sdes; + gboolean is_early; + gboolean may_suppress; } ReportData; static void @@ -2165,8 +2654,8 @@ session_start_rtcp (RTPSession * sess, ReportData * data) gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet); /* get latest stats */ - rtp_source_get_new_sr (own, data->ntpnstime, &ntptime, &rtptime, - &packet_count, &octet_count); + rtp_source_get_new_sr (own, data->ntpnstime, data->running_time, + &ntptime, &rtptime, &packet_count, &octet_count); /* store stats */ rtp_source_process_sr (own, data->current_time, ntptime, rtptime, packet_count, octet_count); @@ -2192,6 +2681,9 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data) /* create a new buffer if needed */ if (data->rtcp == NULL) { session_start_rtcp (sess, data); + } else if (data->is_early) { + /* Put a single RR or SR in minimal compound packets */ + return; } if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) { /* only report about other sender sources */ @@ -2205,6 +2697,15 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data) rtp_source_get_new_rb (source, data->current_time, &fractionlost, &packetslost, &exthighestseq, &jitter, &lsr, &dlsr); + /* store last generated RR packet */ + source->last_rr.is_valid = TRUE; + source->last_rr.fractionlost = fractionlost; + source->last_rr.packetslost = packetslost; + source->last_rr.exthighestseq = exthighestseq; + source->last_rr.jitter = jitter; + source->last_rr.lsr = lsr; + source->last_rr.dlsr = dlsr; + /* packet is not yet filled, add report block for this source. */ gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost, exthighestseq, jitter, lsr, dlsr); @@ -2213,7 +2714,7 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data) } /* perform cleanup of sources that timed out */ -static gboolean +static void session_cleanup (const gchar * key, RTPSource * source, ReportData * data) { gboolean remove = FALSE; @@ -2281,34 +2782,70 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data) if (sendertimeout) on_sender_timeout (sess, source); } - return remove; + + source->closing = remove; } static void session_sdes (RTPSession * sess, ReportData * data) { GstRTCPPacket *packet = &data->packet; - guint8 *sdes_data; - guint sdes_len; + const GstStructure *sdes; + gint i, n_fields; /* add SDES packet */ gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SDES, packet); gst_rtcp_packet_sdes_add_item (packet, sess->source->ssrc); - rtp_source_get_sdes (sess->source, GST_RTCP_SDES_CNAME, &sdes_data, - &sdes_len); - gst_rtcp_packet_sdes_add_entry (packet, GST_RTCP_SDES_CNAME, sdes_len, - sdes_data); - - /* other SDES items must only be added at regular intervals and only when the - * user requests to since it might be a privacy problem */ -#if 0 - gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_NAME, - strlen (sess->name), (guint8 *) sess->name); - gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_TOOL, - strlen (sess->tool), (guint8 *) sess->tool); -#endif + sdes = rtp_source_get_sdes_struct (sess->source); + + /* add all fields in the structure, the order is not important. */ + n_fields = gst_structure_n_fields (sdes); + for (i = 0; i < n_fields; ++i) { + const gchar *field; + const gchar *value; + GstRTCPSDESType type; + + field = gst_structure_nth_field_name (sdes, i); + if (field == NULL) + continue; + value = gst_structure_get_string (sdes, field); + if (value == NULL) + continue; + type = gst_rtcp_sdes_name_to_type (field); + + /* Early packets are minimal and only include the CNAME */ + if (data->is_early && type != GST_RTCP_SDES_CNAME) + continue; + + if (type > GST_RTCP_SDES_END && type < GST_RTCP_SDES_PRIV) { + gst_rtcp_packet_sdes_add_entry (packet, type, strlen (value), + (const guint8 *) value); + } else if (type == GST_RTCP_SDES_PRIV) { + gsize prefix_len; + gsize value_len; + gsize data_len; + guint8 data[256]; + + /* don't accept entries that are too big */ + prefix_len = strlen (field); + if (prefix_len > 255) + continue; + value_len = strlen (value); + if (value_len > 255) + continue; + data_len = 1 + prefix_len + value_len; + if (data_len > 255) + continue; + + data[0] = prefix_len; + memcpy (&data[1], field, prefix_len); + memcpy (&data[1 + prefix_len], value, value_len); + + gst_rtcp_packet_sdes_add_entry (packet, type, data_len, data); + } + } data->has_sdes = TRUE; } @@ -2339,7 +2876,9 @@ static gboolean is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data) { GstClockTime new_send_time, elapsed; - gboolean result; + + if (data->is_early && sess->next_early_rtcp_time < current_time) + goto early; /* no need to check yet */ if (sess->next_rtcp_check_time > current_time) { @@ -2364,18 +2903,52 @@ is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data) if (current_time < new_send_time) { GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT, GST_TIME_ARGS (new_send_time)); - result = FALSE; /* store new check time */ sess->next_rtcp_check_time = new_send_time; - } else { - result = TRUE; - new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE); + return FALSE; + } - GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT, - GST_TIME_ARGS (new_send_time)); - sess->next_rtcp_check_time = current_time + new_send_time; +early: + + new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE); + + GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT, + GST_TIME_ARGS (new_send_time)); + sess->next_rtcp_check_time = current_time + new_send_time; + + /* Apply the rules from RFC 4585 section 3.5.3 */ + if (sess->stats.min_interval != 0 && !sess->first_rtcp) { + GstClockTimeDiff T_rr_current_interval = g_random_double_range (0.5, 1.5) * + sess->stats.min_interval; + + /* This will caused the RTCP to be suppressed if no FB packets are added */ + if (sess->last_rtcp_send_time + T_rr_current_interval > + sess->next_rtcp_check_time) { + GST_DEBUG ("RTCP packet could be suppressed min: %" GST_TIME_FORMAT + " last: %" GST_TIME_FORMAT + " + T_rr_current_interval: %" GST_TIME_FORMAT + " > sess->next_rtcp_check_time: %" GST_TIME_FORMAT, + GST_TIME_ARGS (sess->stats.min_interval), + GST_TIME_ARGS (sess->last_rtcp_send_time), + GST_TIME_ARGS (T_rr_current_interval), + GST_TIME_ARGS (sess->next_rtcp_check_time)); + data->may_suppress = TRUE; + } } - return result; + + return TRUE; +} + +static void +clone_ssrcs_hashtable (gchar * key, RTPSource * source, GHashTable * hash_table) +{ + g_hash_table_insert (hash_table, key, g_object_ref (source)); +} + +static gboolean +remove_closing_sources (const gchar * key, RTPSource * source, gpointer * data) +{ + return source->closing; } /** @@ -2383,6 +2956,7 @@ is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data) * @sess: an #RTPSession * @current_time: the current system time * @ntpnstime: the current NTP time in nanoseconds + * @running_time: the current running_time of the pipeline * * Perform maintenance actions after the timeout obtained with * rtp_session_next_timeout() expired. @@ -2397,12 +2971,13 @@ is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data) */ GstFlowReturn rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time, - guint64 ntpnstime) + guint64 ntpnstime, GstClockTime running_time) { GstFlowReturn result = GST_FLOW_OK; - GList *item; ReportData data; RTPSource *own; + GHashTable *table_copy; + gboolean notify = FALSE; g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); @@ -2415,6 +2990,8 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time, data.ntpnstime = ntpnstime; data.is_bye = FALSE; data.has_sdes = FALSE; + data.may_suppress = FALSE; + data.running_time = running_time; own = sess->source; @@ -2422,9 +2999,26 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time, /* get a new interval, we need this for various cleanups etc */ data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp); - /* first perform cleanups */ + /* Make a local copy of the hashtable. We need to do this because the + * cleanup stage below releases the session lock. */ + table_copy = g_hash_table_new_full (NULL, NULL, NULL, + (GDestroyNotify) g_object_unref); + g_hash_table_foreach (sess->ssrcs[sess->mask_idx], + (GHFunc) clone_ssrcs_hashtable, table_copy); + + /* Clean up the session, mark the source for removing, this might release the + * session lock. */ + g_hash_table_foreach (table_copy, (GHFunc) session_cleanup, &data); + g_hash_table_destroy (table_copy); + + /* Now remove the marked sources */ g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx], - (GHRFunc) session_cleanup, &data); + (GHRFunc) remove_closing_sources, NULL); + + if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) + data.is_early = TRUE; + else + data.is_early = FALSE; /* see if we need to generate SR or RR packets */ if (is_rtcp_time (sess, current_time, &data)) { @@ -2441,38 +3035,23 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time, } if (data.rtcp) { - guint size; - /* we keep track of the last report time in order to timeout inactive * receivers or senders */ - sess->last_rtcp_send_time = data.current_time; + if (!data.is_early && !data.may_suppress) + sess->last_rtcp_send_time = data.current_time; sess->first_rtcp = FALSE; + sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE; /* add SDES for this source when not already added */ if (!data.has_sdes) session_sdes (sess, &data); - - /* update average RTCP size before sending */ - size = GST_BUFFER_SIZE (data.rtcp) + sess->header_len; - UPDATE_AVG (sess->stats.avg_rtcp_packet_size, size); } /* check for outdated collisions */ - GST_DEBUG ("checking collision list"); - item = g_list_first (sess->conflicting_addresses); - while (item) { - RTPConflictingAddress *known_conflict = item->data; - GList *next_item = g_list_next (item); - - if (known_conflict->time < current_time - (data.interval * - RTCP_INTERVAL_COLLISION_TIMEOUT)) { - sess->conflicting_addresses = - g_list_delete_link (sess->conflicting_addresses, item); - GST_DEBUG ("collision %p timed out", known_conflict); - g_free (known_conflict); - } - item = next_item; - } + GST_DEBUG ("Timing out collisions"); + rtp_source_timeout (sess->source, current_time, + data.interval * RTCP_INTERVAL_COLLISION_TIMEOUT, + running_time - sess->rtcp_feedback_retention_window); if (sess->change_ssrc) { GST_DEBUG ("need to change our SSRC (%08x)", own->ssrc); @@ -2489,24 +3068,185 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time, sess->bye_reason = NULL; sess->sent_bye = FALSE; sess->change_ssrc = FALSE; + notify = TRUE; GST_DEBUG ("changed our SSRC to %08x", own->ssrc); } + + sess->allow_early = TRUE; + RTP_SESSION_UNLOCK (sess); + if (notify) + g_object_notify (G_OBJECT (sess), "internal-ssrc"); + /* push out the RTCP packet */ if (data.rtcp) { - /* close the RTCP packet */ - gst_rtcp_buffer_end (data.rtcp); + gboolean do_not_suppress; - GST_DEBUG ("sending packet"); - if (sess->callbacks.send_rtcp) - result = sess->callbacks.send_rtcp (sess, own, data.rtcp, - sess->sent_bye, sess->send_rtcp_user_data); - else { - GST_DEBUG ("freeing packet"); + /* Give the user a change to add its own packet */ + g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_RTCP], 0, + data.rtcp, data.is_early, &do_not_suppress); + + if (sess->callbacks.send_rtcp && (do_not_suppress || !data.may_suppress)) { + guint packet_size; + + /* close the RTCP packet */ + gst_rtcp_buffer_end (data.rtcp); + + packet_size = GST_BUFFER_SIZE (data.rtcp) + sess->header_len; + + UPDATE_AVG (sess->stats.avg_rtcp_packet_size, packet_size); + GST_DEBUG ("%p, sending RTCP packet, avg size %u, %u", &sess->stats, + sess->stats.avg_rtcp_packet_size, packet_size); + result = + sess->callbacks.send_rtcp (sess, own, data.rtcp, sess->sent_bye, + sess->send_rtcp_user_data); + } else { + GST_DEBUG ("freeing packet callback: %p" + " do_not_suppress: %d may_suppress: %d", + sess->callbacks.send_rtcp, do_not_suppress, data.may_suppress); gst_buffer_unref (data.rtcp); } } return result; } + +void +rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time, + GstClockTimeDiff max_delay) +{ + GstClockTime T_dither_max; + + /* Implements the algorithm described in RFC 4585 section 3.5.2 */ + + RTP_SESSION_LOCK (sess); + + /* Check if already requested */ + /* RFC 4585 section 3.5.2 step 2 */ + if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) + goto dont_send; + + /* Ignore the request a scheduled packet will be in time anyway */ + if (current_time + max_delay > sess->next_rtcp_check_time) + goto dont_send; + + /* RFC 4585 section 3.5.2 step 2b */ + /* If the total sources is <=2, then there is only us and one peer */ + if (sess->total_sources <= 2) { + T_dither_max = 0; + } else { + /* Divide by 2 because l = 0.5 */ + T_dither_max = sess->next_rtcp_check_time - sess->last_rtcp_send_time; + T_dither_max /= 2; + } + + /* RFC 4585 section 3.5.2 step 3 */ + if (current_time + T_dither_max > sess->next_rtcp_check_time) + goto dont_send; + + /* RFC 4585 section 3.5.2 step 4 */ + if (sess->allow_early == FALSE) + goto dont_send; + + if (T_dither_max) { + /* Schedule an early transmission later */ + sess->next_early_rtcp_time = g_random_double () * T_dither_max + + current_time; + } else { + /* If no dithering, schedule it for NOW */ + sess->next_early_rtcp_time = current_time; + } + + RTP_SESSION_UNLOCK (sess); + + /* notify app of need to send packet early + * and therefore of timeout change */ + if (sess->callbacks.reconsider) + sess->callbacks.reconsider (sess, sess->reconsider_user_data); + + return; + +dont_send: + + RTP_SESSION_UNLOCK (sess); + +} + +void +rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc) +{ + guint i; + + for (i = 0; i < sess->rtcp_pli_requests->len; i++) + if (ssrc == g_array_index (sess->rtcp_pli_requests, guint32, i)) + return; + + g_array_append_val (sess->rtcp_pli_requests, ssrc); +} + +static gboolean +has_pli_compare_func (gconstpointer a, gconstpointer ignored) +{ + GstRTCPPacket packet; + + packet.buffer = (GstBuffer *) a; + packet.offset = 0; + + if (gst_rtcp_packet_get_type (&packet) == GST_RTCP_TYPE_PSFB && + gst_rtcp_packet_fb_get_type (&packet) == GST_RTCP_PSFB_TYPE_PLI) + return TRUE; + else + return FALSE; +} + +static gboolean +rtp_session_on_sending_rtcp (RTPSession * sess, GstBuffer * buffer, + gboolean early) +{ + gboolean ret = FALSE; + + RTP_SESSION_LOCK (sess); + + while (sess->rtcp_pli_requests->len) { + GstRTCPPacket rtcppacket; + guint media_ssrc = g_array_index (sess->rtcp_pli_requests, guint32, 0); + RTPSource *media_src = g_hash_table_lookup (sess->ssrcs[sess->mask_idx], + GUINT_TO_POINTER (media_ssrc)); + + if (media_src && !rtp_source_has_retained (media_src, + has_pli_compare_func, NULL)) { + if (gst_rtcp_buffer_add_packet (buffer, GST_RTCP_TYPE_PSFB, &rtcppacket)) { + gst_rtcp_packet_fb_set_type (&rtcppacket, GST_RTCP_PSFB_TYPE_PLI); + gst_rtcp_packet_fb_set_sender_ssrc (&rtcppacket, + rtp_source_get_ssrc (sess->source)); + gst_rtcp_packet_fb_set_media_ssrc (&rtcppacket, media_ssrc); + ret = TRUE; + } else { + /* Break because the packet is full, will put next request in a + * further packet + */ + break; + } + } + + g_array_remove_index (sess->rtcp_pli_requests, 0); + } + + RTP_SESSION_UNLOCK (sess); + + return ret; +} + +static void +rtp_session_send_rtcp (RTPSession * sess, GstClockTimeDiff max_delay) +{ + GstClockTime now; + + if (!sess->callbacks.send_rtcp) + return; + + now = sess->callbacks.request_time (sess, sess->request_time_user_data); + + rtp_session_request_early_rtcp (sess, now, max_delay); +}