X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst%2Frtpmanager%2Frtpsession.c;h=a2601866a5c42c9b98b4dec48ad8777bb06059eb;hb=7596ed91b844057f57566c09727339dea99043f7;hp=8eb131f69cb18fede1aecaab33e0ce184d076f2b;hpb=5fe18ee43237e08534b9538c26cb4008aff74610;p=platform%2Fupstream%2Fgst-plugins-good.git diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c index 8eb131f..a260186 100644 --- a/gst/rtpmanager/rtpsession.c +++ b/gst/rtpmanager/rtpsession.c @@ -49,12 +49,14 @@ enum SIGNAL_ON_SENDING_RTCP, SIGNAL_ON_FEEDBACK_RTCP, SIGNAL_SEND_RTCP, + SIGNAL_SEND_RTCP_FULL, + SIGNAL_ON_RECEIVING_RTCP, LAST_SIGNAL }; #define DEFAULT_INTERNAL_SOURCE NULL -#define DEFAULT_BANDWIDTH RTP_STATS_BANDWIDTH -#define DEFAULT_RTCP_FRACTION (RTP_STATS_RTCP_FRACTION * RTP_STATS_BANDWIDTH) +#define DEFAULT_BANDWIDTH 0.0 +#define DEFAULT_RTCP_FRACTION RTP_STATS_RTCP_FRACTION #define DEFAULT_RTCP_RR_BANDWIDTH -1 #define DEFAULT_RTCP_RS_BANDWIDTH -1 #define DEFAULT_RTCP_MTU 1400 @@ -86,7 +88,7 @@ enum PROP_RTCP_FEEDBACK_RETENTION_WINDOW, PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD, PROP_PROBATION, - PROP_LAST + PROP_STATS }; /* update average packet size */ @@ -99,11 +101,6 @@ enum (avg) = ((val) + (15 * (avg))) >> 4; -/* The number RTCP intervals after which to timeout entries in the - * collision table - */ -#define RTCP_INTERVAL_COLLISION_TIMEOUT 10 - /* GObject vmethods */ static void rtp_session_finalize (GObject * object); static void rtp_session_set_property (GObject * object, guint prop_id, @@ -111,7 +108,8 @@ static void rtp_session_set_property (GObject * object, guint prop_id, static void rtp_session_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); -static void rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay); +static gboolean rtp_session_send_rtcp (RTPSession * sess, + GstClockTime max_delay); static guint rtp_session_signals[LAST_SIGNAL] = { 0 }; @@ -119,9 +117,9 @@ G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT); static guint32 rtp_session_create_new_ssrc (RTPSession * sess); static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc, - gboolean * created, RTPArrivalStats * arrival, gboolean rtp); + gboolean * created, RTPPacketInfo * pinfo, gboolean rtp); static RTPSource *obtain_internal_source (RTPSession * sess, - guint32 ssrc, gboolean * created); + guint32 ssrc, gboolean * created, GstClockTime current_time); static GstFlowReturn rtp_session_schedule_bye_locked (RTPSession * sess, GstClockTime current_time); static GstClockTime calculate_rtcp_interval (RTPSession * sess, @@ -322,6 +320,42 @@ rtp_session_class_init (RTPSessionClass * klass) G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_UINT64); + /** + * RTPSession::send-rtcp-full: + * @session: the object which received the signal + * @max_delay: The maximum delay after which the feedback will not be useful + * anymore + * + * Requests that the #RTPSession initiate a new RTCP packet as soon as + * possible within the requested delay. + * + * Returns: TRUE if the new RTCP packet could be scheduled within the + * requested delay, FALSE otherwise. + * + * Since: 1.6 + */ + rtp_session_signals[SIGNAL_SEND_RTCP_FULL] = + g_signal_new ("send-rtcp-full", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, + G_STRUCT_OFFSET (RTPSessionClass, send_rtcp), NULL, NULL, + g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64); + + /** + * RTPSession::on-receiving-rtcp + * @session: the object which received the signal + * @buffer: the #GstBuffer containing the RTCP packet that was received + * + * This signal is emitted when receiving an RTCP packet before it is handled + * by the session. It can be used to extract custom information from RTCP packets. + * + * Since: 1.6 + */ + rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP] = + g_signal_new ("on-receiving-rtcp", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_receiving_rtcp), + NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1, + GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE); + g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC, g_param_spec_uint ("internal-ssrc", "Internal SSRC", "The internal SSRC used for the session (deprecated)", @@ -432,9 +466,9 @@ rtp_session_class_init (RTPSessionClass * klass) g_param_spec_uint ("rtcp-immediate-feedback-threshold", "RTCP Immediate Feedback threshold", "The maximum number of members of a RTP session for which immediate" - " feedback is used", + " feedback is used (DEPRECATED: has no effect and is not needed)", 0, G_MAXUINT, DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED)); g_object_class_install_property (gobject_class, PROP_PROBATION, g_param_spec_uint ("probation", "Number of probations", @@ -442,6 +476,24 @@ rtp_session_class_init (RTPSessionClass * klass) 0, G_MAXUINT, DEFAULT_PROBATION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** + * RTPSession::stats: + * + * Various session statistics. This property returns a GstStructure + * with name application/x-rtp-session-stats with the following fields: + * + * "rtx-drop-count" G_TYPE_UINT The number of retransmission events + * dropped (due to bandwidth constraints) + * "sent-nack-count" G_TYPE_UINT Number of NACKs sent + * "recv-nack-count" G_TYPE_UINT Number of NACKs received + * + * Since: 1.4 + */ + g_object_class_install_property (gobject_class, PROP_STATS, + g_param_spec_boxed ("stats", "Statistics", + "Various statistics", GST_TYPE_STRUCTURE, + G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + klass->get_source_by_ssrc = GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc); klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp); @@ -460,7 +512,11 @@ rtp_session_init (RTPSession * sess) sess->mask_idx = 0; sess->mask = 0; - for (i = 0; i < 32; i++) { + /* TODO: We currently only use the first hash table but this is the + * beginning of an implementation for RFC2762 + for (i = 0; i < 32; i++) { + */ + for (i = 0; i < 1; i++) { sess->ssrcs[i] = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify) g_object_unref); @@ -505,6 +561,7 @@ rtp_session_init (RTPSession * sess) sess->first_rtcp = TRUE; sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE; + sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE; sess->allow_early = TRUE; sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE; @@ -513,6 +570,8 @@ rtp_session_init (RTPSession * sess) DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD; sess->last_keyframe_request = GST_CLOCK_TIME_NONE; + + sess->is_doing_ptp = TRUE; } static void @@ -525,7 +584,13 @@ rtp_session_finalize (GObject * object) gst_structure_free (sess->sdes); - for (i = 0; i < 32; i++) + g_list_free_full (sess->conflicting_addresses, + (GDestroyNotify) rtp_conflicting_address_free); + + /* TODO: Change this again when implementing RFC 2762 + * for (i = 0; i < 32; i++) + */ + for (i = 0; i < 1; i++) g_hash_table_destroy (sess->ssrcs[i]); g_mutex_clear (&sess->lock); @@ -563,6 +628,19 @@ rtp_session_create_sources (RTPSession * sess) return res; } +static GstStructure * +rtp_session_create_stats (RTPSession * sess) +{ + GstStructure *s; + + s = gst_structure_new ("application/x-rtp-session-stats", + "rtx-drop-count", G_TYPE_UINT, sess->stats.nacks_dropped, + "sent-nack-count", G_TYPE_UINT, sess->stats.nacks_sent, + "recv-nack-count", G_TYPE_UINT, sess->stats.nacks_received, NULL); + + return s; +} + static void rtp_session_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) @@ -573,6 +651,11 @@ rtp_session_set_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_INTERNAL_SSRC: + RTP_SESSION_LOCK (sess); + sess->suggested_ssrc = g_value_get_uint (value); + RTP_SESSION_UNLOCK (sess); + if (sess->callbacks.reconfigure) + sess->callbacks.reconfigure (sess, sess->reconfigure_user_data); break; case PROP_BANDWIDTH: RTP_SESSION_LOCK (sess); @@ -684,6 +767,9 @@ rtp_session_get_property (GObject * object, guint prop_id, case PROP_PROBATION: g_value_set_uint (value, sess->probation); break; + case PROP_STATS: + g_value_take_boxed (value, rtp_session_create_stats (sess)); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -851,6 +937,10 @@ rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks, sess->callbacks.notify_nack = callbacks->notify_nack; sess->notify_nack_user_data = user_data; } + if (callbacks->reconfigure) { + sess->callbacks.reconfigure = callbacks->reconfigure; + sess->reconfigure_user_data = user_data; + } } /** @@ -1161,14 +1251,51 @@ static RTPSourceCallbacks callbacks = { (RTPSourceClockRate) source_clock_rate, }; + +/** + * rtp_session_find_conflicting_address: + * @session: The session the packet came in + * @address: address to check for + * @time: The time when the packet that is possibly in conflict arrived + * + * Checks if an address which has a conflict is already known. If it is + * a known conflict, remember the time + * + * Returns: TRUE if it was a known conflict, FALSE otherwise + */ +static gboolean +rtp_session_find_conflicting_address (RTPSession * session, + GSocketAddress * address, GstClockTime time) +{ + return find_conflicting_address (session->conflicting_addresses, address, + time); +} + +/** + * rtp_session_add_conflicting_address: + * @session: The session the packet came in + * @address: address to remember + * @time: The time when the packet that is in conflict arrived + * + * Adds a new conflict address + */ +static void +rtp_session_add_conflicting_address (RTPSession * sess, + GSocketAddress * address, GstClockTime time) +{ + sess->conflicting_addresses = + add_conflicting_address (sess->conflicting_addresses, address, time); +} + + static gboolean check_collision (RTPSession * sess, RTPSource * source, - RTPArrivalStats * arrival, gboolean rtp) + RTPPacketInfo * pinfo, gboolean rtp) { guint32 ssrc; - /* If we have no arrival address, we can't do collision checking */ - if (!arrival->address) + /* If we have no pinfo address, we can't do collision checking */ + if (!pinfo->address) return FALSE; ssrc = rtp_source_get_ssrc (source); @@ -1185,17 +1312,17 @@ check_collision (RTPSession * sess, RTPSource * source, } if (from) { - if (__g_socket_address_equal (from, arrival->address)) { + if (__g_socket_address_equal (from, pinfo->address)) { /* Address is the same */ return FALSE; } else { GST_LOG ("we have a third-party collision or loop ssrc:%x", ssrc); if (sess->favor_new) { if (rtp_source_find_conflicting_address (source, - arrival->address, arrival->current_time)) { + pinfo->address, pinfo->current_time)) { gchar *buf1; - buf1 = __g_socket_address_to_string (arrival->address); + buf1 = __g_socket_address_to_string (pinfo->address); GST_LOG ("Known conflict on %x for %s, dropping packet", ssrc, buf1); g_free (buf1); @@ -1208,18 +1335,18 @@ check_collision (RTPSession * sess, RTPSource * source, * a new source. Save old address in possible conflict list */ rtp_source_add_conflicting_address (source, from, - arrival->current_time); + pinfo->current_time); buf1 = __g_socket_address_to_string (from); - buf2 = __g_socket_address_to_string (arrival->address); + buf2 = __g_socket_address_to_string (pinfo->address); GST_DEBUG ("New conflict for ssrc %x, replacing %s with %s," " saving old as known conflict", ssrc, buf1, buf2); if (rtp) - rtp_source_set_rtp_from (source, arrival->address); + rtp_source_set_rtp_from (source, pinfo->address); else - rtp_source_set_rtcp_from (source, arrival->address); + rtp_source_set_rtcp_from (source, pinfo->address); g_free (buf1); g_free (buf2); @@ -1234,9 +1361,9 @@ check_collision (RTPSession * sess, RTPSource * source, } else { /* We don't already have a from address for RTP, just set it */ if (rtp) - rtp_source_set_rtp_from (source, arrival->address); + rtp_source_set_rtp_from (source, pinfo->address); else - rtp_source_set_rtcp_from (source, arrival->address); + rtp_source_set_rtcp_from (source, pinfo->address); return FALSE; } @@ -1246,16 +1373,16 @@ check_collision (RTPSession * sess, RTPSource * source, */ } else { /* This is sending with our ssrc, is it an address we already know */ - if (rtp_source_find_conflicting_address (source, arrival->address, - arrival->current_time)) { + if (rtp_session_find_conflicting_address (sess, pinfo->address, + pinfo->current_time)) { /* Its a known conflict, its probably a loop, not a collision * lets just drop the incoming packet */ GST_DEBUG ("Our packets are being looped back to us, dropping"); } else { /* Its a new collision, lets change our SSRC */ - rtp_source_add_conflicting_address (source, arrival->address, - arrival->current_time); + rtp_session_add_conflicting_address (sess, pinfo->address, + pinfo->current_time); GST_DEBUG ("Collision for SSRC %x", ssrc); /* mark the source BYE */ @@ -1266,18 +1393,95 @@ check_collision (RTPSession * sess, RTPSource * source, on_ssrc_collision (sess, source); - rtp_session_schedule_bye_locked (sess, arrival->current_time); + rtp_session_schedule_bye_locked (sess, pinfo->current_time); } } return TRUE; } -static RTPSource * -find_source (RTPSession * sess, guint32 ssrc) +typedef struct { - return g_hash_table_lookup (sess->ssrcs[sess->mask_idx], - GINT_TO_POINTER (ssrc)); + gboolean is_doing_ptp; + GSocketAddress *new_addr; +} CompareAddrData; + +/* check if the two given ip addr are the same (do not care about the port) */ +static gboolean +ip_addr_equal (GSocketAddress * a, GSocketAddress * b) +{ + return + g_inet_address_equal (g_inet_socket_address_get_address + (G_INET_SOCKET_ADDRESS (a)), + g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (b))); +} + +static void +compare_rtp_source_addr (const gchar * key, RTPSource * source, + CompareAddrData * data) +{ + /* only compare ip addr of remote sources which are also not closing */ + if (!source->internal && !source->closing && source->rtp_from) { + /* look for the first rtp source */ + if (!data->new_addr) + data->new_addr = source->rtp_from; + /* compare current ip addr with the first one */ + else + data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtp_from); + } +} + +static void +compare_rtcp_source_addr (const gchar * key, RTPSource * source, + CompareAddrData * data) +{ + /* only compare ip addr of remote sources which are also not closing */ + if (!source->internal && !source->closing && source->rtcp_from) { + /* look for the first rtcp source */ + if (!data->new_addr) + data->new_addr = source->rtcp_from; + else + /* compare current ip addr with the first one */ + data->is_doing_ptp &= ip_addr_equal (data->new_addr, source->rtcp_from); + } +} + +/* loop over our non-internal source to know if the session + * is doing point-to-point */ +static void +session_update_ptp (RTPSession * sess) +{ + /* to know if the session is doing point to point, the ip addr + * of each non-internal (=remotes) source have to be compared + * to each other. + */ + gboolean is_doing_rtp_ptp; + gboolean is_doing_rtcp_ptp; + CompareAddrData data; + + /* compare the first remote source's ip addr that receive rtp packets + * with other remote rtp source. + * it's enough because the session just needs to know if they are all + * equals or not + */ + data.is_doing_ptp = TRUE; + data.new_addr = NULL; + g_hash_table_foreach (sess->ssrcs[sess->mask_idx], + (GHFunc) compare_rtp_source_addr, (gpointer) & data); + is_doing_rtp_ptp = data.is_doing_ptp; + + /* same but about rtcp */ + data.is_doing_ptp = TRUE; + data.new_addr = NULL; + g_hash_table_foreach (sess->ssrcs[sess->mask_idx], + (GHFunc) compare_rtcp_source_addr, (gpointer) & data); + is_doing_rtcp_ptp = data.is_doing_ptp; + + /* the session is doing point-to-point if all rtp remote have the same + * ip addr and if all rtcp remote sources have the same ip addr */ + sess->is_doing_ptp = is_doing_rtp_ptp && is_doing_rtcp_ptp; + + GST_DEBUG ("doing point-to-point: %d", sess->is_doing_ptp); } static void @@ -1296,13 +1500,24 @@ add_source (RTPSession * sess, RTPSource * src) if (sess->suggested_ssrc != src->ssrc) sess->suggested_ssrc = src->ssrc; } + + /* update point-to-point status */ + if (!src->internal) + session_update_ptp (sess); +} + +static RTPSource * +find_source (RTPSession * sess, guint32 ssrc) +{ + return g_hash_table_lookup (sess->ssrcs[sess->mask_idx], + GINT_TO_POINTER (ssrc)); } /* must be called with the session lock, the returned source needs to be * unreffed after usage. */ static RTPSource * obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created, - RTPArrivalStats * arrival, gboolean rtp) + RTPPacketInfo * pinfo, gboolean rtp) { RTPSource *source; @@ -1322,11 +1537,11 @@ obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created, g_object_set (source, "probation", 0, NULL); /* store from address, if any */ - if (arrival->address) { + if (pinfo->address) { if (rtp) - rtp_source_set_rtp_from (source, arrival->address); + rtp_source_set_rtp_from (source, pinfo->address); else - rtp_source_set_rtcp_from (source, arrival->address); + rtp_source_set_rtcp_from (source, pinfo->address); } /* configure a callback on the source */ @@ -1337,7 +1552,7 @@ obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created, } else { *created = FALSE; /* check for collision, this updates the address when not previously set */ - if (check_collision (sess, source, arrival, rtp)) { + if (check_collision (sess, source, pinfo, rtp)) { return NULL; } /* Receiving RTCP packets of an SSRC is a strong indication that we @@ -1346,9 +1561,9 @@ obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created, g_object_set (source, "probation", 0, NULL); } /* update last activity */ - source->last_activity = arrival->current_time; + source->last_activity = pinfo->current_time; if (rtp) - source->last_rtp_activity = arrival->current_time; + source->last_rtp_activity = pinfo->current_time; g_object_ref (source); return source; @@ -1357,7 +1572,8 @@ obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created, /* must be called with the session lock, the returned source needs to be * unreffed after usage. */ static RTPSource * -obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created) +obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created, + GstClockTime current_time) { RTPSource *source; @@ -1370,6 +1586,7 @@ obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created) source->validated = TRUE; source->internal = TRUE; + source->probation = FALSE; rtp_source_set_sdes_struct (source, gst_structure_copy (sess->sdes)); rtp_source_set_callbacks (source, &callbacks, sess); @@ -1378,6 +1595,11 @@ obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created) } else { *created = FALSE; } + /* update last activity */ + if (current_time != GST_CLOCK_TIME_NONE) { + source->last_activity = current_time; + source->last_rtp_activity = current_time; + } g_object_ref (source); return source; @@ -1499,7 +1721,7 @@ rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc) RTP_SESSION_LOCK (sess); result = find_source (sess, ssrc); - if (result) + if (result != NULL) g_object_ref (result); RTP_SESSION_UNLOCK (sess); @@ -1550,51 +1772,104 @@ rtp_session_create_source (RTPSession * sess) return source; } -/* update the RTPArrivalStats structure with the current time and other bits - * about the current buffer we are handling. - * This function is typically called when a validated packet is received. - * This function should be called with the SESSION_LOCK - */ -static void -update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival, - gboolean rtp, GstBuffer * buffer, GstClockTime current_time, - GstClockTime running_time, guint64 ntpnstime) +static gboolean +update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo) { GstNetAddressMeta *meta; - GstRTPBuffer rtpb = { NULL }; - - /* get time of arrival */ - arrival->current_time = current_time; - arrival->running_time = running_time; - arrival->ntpnstime = ntpnstime; /* get packet size including header overhead */ - arrival->bytes = gst_buffer_get_size (buffer) + sess->header_len; + pinfo->bytes += gst_buffer_get_size (*buffer) + pinfo->header_len; + pinfo->packets++; + + if (pinfo->rtp) { + GstRTPBuffer rtp = { NULL }; + + if (!gst_rtp_buffer_map (*buffer, GST_MAP_READ, &rtp)) + goto invalid_packet; + + pinfo->payload_len += gst_rtp_buffer_get_payload_len (&rtp); + if (idx == 0) { + gint i; + + /* only keep info for first buffer */ + pinfo->ssrc = gst_rtp_buffer_get_ssrc (&rtp); + pinfo->seqnum = gst_rtp_buffer_get_seq (&rtp); + pinfo->pt = gst_rtp_buffer_get_payload_type (&rtp); + pinfo->rtptime = gst_rtp_buffer_get_timestamp (&rtp); + /* copy available csrc */ + pinfo->csrc_count = gst_rtp_buffer_get_csrc_count (&rtp); + for (i = 0; i < pinfo->csrc_count; i++) + pinfo->csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i); + } + gst_rtp_buffer_unmap (&rtp); + } - if (rtp) { - gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpb); - arrival->payload_len = gst_rtp_buffer_get_payload_len (&rtpb); - gst_rtp_buffer_unmap (&rtpb); - } else { - arrival->payload_len = 0; + if (idx == 0) { + /* for netbuffer we can store the IP address to check for collisions */ + meta = gst_buffer_get_net_address_meta (*buffer); + if (pinfo->address) + g_object_unref (pinfo->address); + if (meta) { + pinfo->address = G_SOCKET_ADDRESS (g_object_ref (meta->addr)); + } else { + pinfo->address = NULL; + } } + return TRUE; - /* for netbuffer we can store the IP address to check for collisions */ - meta = gst_buffer_get_net_address_meta (buffer); - if (arrival->address) - g_object_unref (arrival->address); - if (meta) { - arrival->address = G_SOCKET_ADDRESS (g_object_ref (meta->addr)); + /* ERRORS */ +invalid_packet: + { + GST_DEBUG ("invalid RTP packet received"); + return FALSE; + } +} + +/* update the RTPPacketInfo structure with the current time and other bits + * about the current buffer we are handling. + * This function is typically called when a validated packet is received. + * This function should be called with the SESSION_LOCK + */ +static gboolean +update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo, + gboolean send, gboolean rtp, gboolean is_list, gpointer data, + GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime) +{ + gboolean res; + + pinfo->send = send; + pinfo->rtp = rtp; + pinfo->is_list = is_list; + pinfo->data = data; + pinfo->current_time = current_time; + pinfo->running_time = running_time; + pinfo->ntpnstime = ntpnstime; + pinfo->header_len = sess->header_len; + pinfo->bytes = 0; + pinfo->payload_len = 0; + pinfo->packets = 0; + + if (is_list) { + GstBufferList *list = GST_BUFFER_LIST_CAST (data); + res = + gst_buffer_list_foreach (list, (GstBufferListFunc) update_packet, + pinfo); } else { - arrival->address = NULL; + GstBuffer *buffer = GST_BUFFER_CAST (data); + res = update_packet (&buffer, 0, pinfo); } + return res; } static void -clean_arrival_stats (RTPArrivalStats * arrival) +clean_packet_info (RTPPacketInfo * pinfo) { - if (arrival->address) - g_object_unref (arrival->address); + if (pinfo->address) + g_object_unref (pinfo->address); + if (pinfo->data) { + gst_mini_object_unref (pinfo->data); + pinfo->data = NULL; + } } static gboolean @@ -1659,45 +1934,32 @@ source_update_sender (RTPSession * sess, RTPSource * source, */ GstFlowReturn rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer, - GstClockTime current_time, GstClockTime running_time) + GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime) { GstFlowReturn result; guint32 ssrc; RTPSource *source; gboolean created; gboolean prevsender, prevactive; - RTPArrivalStats arrival = { NULL, }; - guint32 csrcs[16]; - guint8 i, count; + RTPPacketInfo pinfo = { 0, }; guint64 oldrate; - GstRTPBuffer rtp = { NULL }; g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); - if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)) - goto invalid_packet; - - /* get SSRC to look up in session database */ - ssrc = gst_rtp_buffer_get_ssrc (&rtp); - /* copy available csrc for later */ - count = gst_rtp_buffer_get_csrc_count (&rtp); - /* make sure to not overflow our array. An RTP buffer can maximally contain - * 16 CSRCs */ - count = MIN (count, 16); - - for (i = 0; i < count; i++) - csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i); - - gst_rtp_buffer_unmap (&rtp); - RTP_SESSION_LOCK (sess); - /* update arrival stats */ - update_arrival_stats (sess, &arrival, TRUE, buffer, current_time, - running_time, -1); + /* update pinfo stats */ + if (!update_packet_info (sess, &pinfo, FALSE, TRUE, FALSE, buffer, + current_time, running_time, ntpnstime)) { + GST_DEBUG ("invalid RTP packet received"); + RTP_SESSION_UNLOCK (sess); + return rtp_session_process_rtcp (sess, buffer, current_time, ntpnstime); + } + + ssrc = pinfo.ssrc; - source = obtain_source (sess, ssrc, &created, &arrival, TRUE); + source = obtain_source (sess, ssrc, &created, &pinfo, TRUE); if (!source) goto collision; @@ -1706,7 +1968,7 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer, oldrate = source->bitrate; /* let source process the packet */ - result = rtp_source_process_rtp (source, buffer, &arrival); + result = rtp_source_process_rtp (source, &pinfo); /* source became active */ if (source_update_active (sess, source, prevactive)) @@ -1722,16 +1984,17 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer, if (source->validated) { gboolean created; + gint i; /* for validated sources, we add the CSRCs as well */ - for (i = 0; i < count; i++) { + for (i = 0; i < pinfo.csrc_count; i++) { guint32 csrc; RTPSource *csrc_src; - csrc = csrcs[i]; + csrc = pinfo.csrcs[i]; /* get source */ - csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE); + csrc_src = obtain_source (sess, csrc, &created, &pinfo, TRUE); if (!csrc_src) continue; @@ -1748,22 +2011,15 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer, RTP_SESSION_UNLOCK (sess); - clean_arrival_stats (&arrival); + clean_packet_info (&pinfo); return result; /* ERRORS */ -invalid_packet: - { - gst_buffer_unref (buffer); - GST_DEBUG ("invalid RTP packet received"); - return GST_FLOW_OK; - } collision: { RTP_SESSION_UNLOCK (sess); - gst_buffer_unref (buffer); - clean_arrival_stats (&arrival); + clean_packet_info (&pinfo); GST_DEBUG ("ignoring packet because its collisioning"); return GST_FLOW_OK; } @@ -1771,7 +2027,7 @@ collision: static void rtp_session_process_rb (RTPSession * sess, RTPSource * source, - GstRTCPPacket * packet, RTPArrivalStats * arrival) + GstRTCPPacket * packet, RTPPacketInfo * pinfo) { guint count, i; @@ -1797,7 +2053,7 @@ rtp_session_process_rb (RTPSession * sess, RTPSource * source, * the sender of the RTCP message. We could also compare our stats against * the other sender to see if we are better or worse. */ /* FIXME, need to keep track who the RB block is from */ - rtp_source_process_rb (source, arrival->ntpnstime, fractionlost, + rtp_source_process_rb (source, pinfo->ntpnstime, fractionlost, packetslost, exthighestseq, jitter, lsr, dlsr); } } @@ -1815,7 +2071,7 @@ rtp_session_process_rb (RTPSession * sess, RTPSource * source, */ static void rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet, - RTPArrivalStats * arrival, gboolean * do_sync) + RTPPacketInfo * pinfo, gboolean * do_sync) { guint32 senderssrc, rtptime, packet_count, octet_count; guint64 ntptime; @@ -1826,12 +2082,16 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet, &packet_count, &octet_count); GST_DEBUG ("got SR packet: SSRC %08x, time %" GST_TIME_FORMAT, - senderssrc, GST_TIME_ARGS (arrival->current_time)); + senderssrc, GST_TIME_ARGS (pinfo->current_time)); - source = obtain_source (sess, senderssrc, &created, arrival, FALSE); + source = obtain_source (sess, senderssrc, &created, pinfo, FALSE); if (!source) return; + /* skip non-bye packets for sources that are marked BYE */ + if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source)) + goto out; + /* don't try to do lip-sync for sources that sent a BYE */ if (RTP_SOURCE_IS_MARKED_BYE (source)) *do_sync = FALSE; @@ -1841,7 +2101,7 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet, prevsender = RTP_SOURCE_IS_SENDER (source); /* first update the source */ - rtp_source_process_sr (source, arrival->current_time, ntptime, rtptime, + rtp_source_process_sr (source, pinfo->current_time, ntptime, rtptime, packet_count, octet_count); source_update_sender (sess, source, prevsender); @@ -1849,7 +2109,9 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet, if (created) on_new_ssrc (sess, source); - rtp_session_process_rb (sess, source, packet, arrival); + rtp_session_process_rb (sess, source, packet, pinfo); + +out: g_object_unref (source); } @@ -1861,7 +2123,7 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet, */ static void rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet, - RTPArrivalStats * arrival) + RTPPacketInfo * pinfo) { guint32 senderssrc; RTPSource *source; @@ -1871,21 +2133,27 @@ rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet, GST_DEBUG ("got RR packet: SSRC %08x", senderssrc); - source = obtain_source (sess, senderssrc, &created, arrival, FALSE); + source = obtain_source (sess, senderssrc, &created, pinfo, FALSE); if (!source) return; + /* skip non-bye packets for sources that are marked BYE */ + if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source)) + goto out; + if (created) on_new_ssrc (sess, source); - rtp_session_process_rb (sess, source, packet, arrival); + rtp_session_process_rb (sess, source, packet, pinfo); + +out: g_object_unref (source); } /* Get SDES items and store them in the SSRC */ static void rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet, - RTPArrivalStats * arrival) + RTPPacketInfo * pinfo) { guint items, i, j; gboolean more_items, more_entries; @@ -1908,10 +2176,14 @@ rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet, changed = FALSE; /* find src, no probation when dealing with RTCP */ - source = obtain_source (sess, ssrc, &created, arrival, FALSE); + source = obtain_source (sess, ssrc, &created, pinfo, FALSE); if (!source) return; + /* skip non-bye packets for sources that are marked BYE */ + if (sess->scheduled_bye && RTP_SOURCE_IS_MARKED_BYE (source)) + goto next; + sdes = gst_structure_new_empty ("application/x-rtp-source-sdes"); more_entries = gst_rtcp_packet_sdes_first_entry (packet); @@ -1963,6 +2235,7 @@ rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet, if (changed) on_ssrc_sdes (sess, source); + next: g_object_unref (source); more_items = gst_rtcp_packet_sdes_next_item (packet); @@ -1974,7 +2247,7 @@ rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet, */ static void rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet, - RTPArrivalStats * arrival) + RTPPacketInfo * pinfo) { guint count, i; gchar *reason; @@ -1994,7 +2267,7 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet, GST_DEBUG ("SSRC: %08x", ssrc); /* find src and mark bye, no probation when dealing with RTCP */ - source = obtain_source (sess, ssrc, &created, arrival, FALSE); + source = obtain_source (sess, ssrc, &created, pinfo, FALSE); if (!source) return; @@ -2005,7 +2278,7 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet, } /* store time for when we need to time out this source */ - source->bye_time = arrival->current_time; + source->bye_time = pinfo->current_time; prevactive = RTP_SOURCE_IS_ACTIVE (source); prevsender = RTP_SOURCE_IS_SENDER (source); @@ -2025,17 +2298,17 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet, * Perform reverse reconsideration but only when we are not scheduling a * BYE ourselves. */ if (sess->next_rtcp_check_time != GST_CLOCK_TIME_NONE && - arrival->current_time < sess->next_rtcp_check_time) { + pinfo->current_time < sess->next_rtcp_check_time) { GstClockTime time_remaining; - time_remaining = sess->next_rtcp_check_time - arrival->current_time; + time_remaining = sess->next_rtcp_check_time - pinfo->current_time; sess->next_rtcp_check_time = gst_util_uint64_scale (time_remaining, members, pmembers); GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_rtcp_check_time)); - sess->next_rtcp_check_time += arrival->current_time; + sess->next_rtcp_check_time += pinfo->current_time; /* mark pending reconsider. We only want to signal the reconsideration * once after we handled all the source in the bye packet */ @@ -2062,7 +2335,7 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet, static void rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet, - RTPArrivalStats * arrival) + RTPPacketInfo * pinfo) { GST_DEBUG ("received APP"); } @@ -2084,7 +2357,7 @@ rtp_session_request_local_key_unit (RTPSession * sess, RTPSource * src, "RTT (%" GST_TIME_FORMAT " < %" GST_TIME_FORMAT ")", fir ? "FIR" : "PLI", GST_TIME_ARGS (current_time - sess->last_keyframe_request), - GST_TIME_ARGS (round_trip_in_ns));; + GST_TIME_ARGS (round_trip_in_ns)); return FALSE; } } @@ -2113,10 +2386,12 @@ rtp_session_process_pli (RTPSession * sess, guint32 sender_ssrc, return; src = find_source (sess, sender_ssrc); - if (!src) + if (src == NULL) return; rtp_session_request_local_key_unit (sess, src, FALSE, current_time); + + src->stats.recv_pli_count++; } static void @@ -2161,6 +2436,9 @@ rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc, ssrc = GST_READ_UINT32_BE (data); own = find_source (sess, ssrc); + if (own == NULL) + continue; + if (own->internal) { our_request = TRUE; break; @@ -2170,6 +2448,7 @@ rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc, return; rtp_session_request_local_key_unit (sess, src, TRUE, current_time); + src->stats.recv_fir_count++; } static void @@ -2177,6 +2456,8 @@ rtp_session_process_nack (RTPSession * sess, guint32 sender_ssrc, guint32 media_ssrc, guint8 * fci_data, guint fci_length, GstClockTime current_time) { + sess->stats.nacks_received++; + if (!sess->callbacks.notify_nack) return; @@ -2186,10 +2467,10 @@ rtp_session_process_nack (RTPSession * sess, guint32 sender_ssrc, seqnum = GST_READ_UINT16_BE (fci_data); blp = GST_READ_UINT16_BE (fci_data + 2); - GST_DEBUG ("NACK #%u, blp %04x", seqnum, blp); + GST_DEBUG ("NACK #%u, blp %04x, SSRC 0x%08x", seqnum, blp, media_ssrc); RTP_SESSION_UNLOCK (sess); - sess->callbacks.notify_nack (sess, seqnum, blp, + sess->callbacks.notify_nack (sess, seqnum, blp, media_ssrc, sess->notify_nack_user_data); RTP_SESSION_LOCK (sess); @@ -2200,7 +2481,7 @@ rtp_session_process_nack (RTPSession * sess, guint32 sender_ssrc, static void rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, - RTPArrivalStats * arrival, GstClockTime current_time) + RTPPacketInfo * pinfo, GstClockTime current_time) { GstRTCPType type = gst_rtcp_packet_get_type (packet); GstRTCPFBType fbtype = gst_rtcp_packet_fb_get_type (packet); @@ -2210,6 +2491,12 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, guint fci_length = 4 * gst_rtcp_packet_fb_get_fci_length (packet); RTPSource *src; + src = find_source (sess, media_ssrc); + + /* skip non-bye packets for sources that are marked BYE */ + if (sess->scheduled_bye && src && RTP_SOURCE_IS_MARKED_BYE (src)) + return; + GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of " "length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length); @@ -2221,7 +2508,7 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, fci_buffer = gst_buffer_copy_region (packet->rtcp->buffer, GST_BUFFER_COPY_MEMORY, fci_data - packet->rtcp->map.data, fci_length); - GST_BUFFER_TIMESTAMP (fci_buffer) = arrival->running_time; + GST_BUFFER_TIMESTAMP (fci_buffer) = pinfo->running_time; } RTP_SESSION_UNLOCK (sess); @@ -2233,15 +2520,11 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, gst_buffer_unref (fci_buffer); } - src = find_source (sess, media_ssrc); - if (!src) - return; - - if (sess->rtcp_feedback_retention_window) { - rtp_source_retain_rtcp_packet (src, packet, arrival->running_time); + if (src && sess->rtcp_feedback_retention_window) { + rtp_source_retain_rtcp_packet (src, packet, pinfo->running_time); } - if (src->internal || + if ((src && src->internal) || /* PSFB FIR puts the media ssrc inside the FCI */ (type == GST_RTCP_TYPE_PSFB && fbtype == GST_RTCP_PSFB_TYPE_FIR)) { switch (type) { @@ -2292,7 +2575,7 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer, { GstRTCPPacket packet; gboolean more, is_bye = FALSE, do_sync = FALSE; - RTPArrivalStats arrival = { NULL, }; + RTPPacketInfo pinfo = { 0, }; GstFlowReturn result = GST_FLOW_OK; GstRTCPBuffer rtcp = { NULL, }; @@ -2304,10 +2587,13 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer, GST_DEBUG ("received RTCP packet"); + g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_RECEIVING_RTCP], 0, + buffer); + RTP_SESSION_LOCK (sess); - /* update arrival stats */ - update_arrival_stats (sess, &arrival, FALSE, buffer, current_time, -1, - ntpnstime); + /* update pinfo stats */ + update_packet_info (sess, &pinfo, FALSE, FALSE, FALSE, buffer, current_time, + -1, ntpnstime); /* start processing the compound packet */ gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp); @@ -2317,40 +2603,33 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer, type = gst_rtcp_packet_get_type (&packet); - /* when we are leaving the session, we should ignore all non-BYE messages */ - if (sess->scheduled_bye && type != GST_RTCP_TYPE_BYE) { - GST_DEBUG ("ignoring non-BYE RTCP packet because we are leaving"); - goto next; - } - switch (type) { case GST_RTCP_TYPE_SR: - rtp_session_process_sr (sess, &packet, &arrival, &do_sync); + rtp_session_process_sr (sess, &packet, &pinfo, &do_sync); break; case GST_RTCP_TYPE_RR: - rtp_session_process_rr (sess, &packet, &arrival); + rtp_session_process_rr (sess, &packet, &pinfo); break; case GST_RTCP_TYPE_SDES: - rtp_session_process_sdes (sess, &packet, &arrival); + rtp_session_process_sdes (sess, &packet, &pinfo); break; case GST_RTCP_TYPE_BYE: is_bye = TRUE; /* don't try to attempt lip-sync anymore for streams with a BYE */ do_sync = FALSE; - rtp_session_process_bye (sess, &packet, &arrival); + rtp_session_process_bye (sess, &packet, &pinfo); break; case GST_RTCP_TYPE_APP: - rtp_session_process_app (sess, &packet, &arrival); + rtp_session_process_app (sess, &packet, &pinfo); break; case GST_RTCP_TYPE_RTPFB: case GST_RTCP_TYPE_PSFB: - rtp_session_process_feedback (sess, &packet, &arrival, current_time); + rtp_session_process_feedback (sess, &packet, &pinfo, current_time); break; default: GST_WARNING ("got unknown RTCP packet"); break; } - next: more = gst_rtcp_packet_move_to_next (&packet); } @@ -2358,20 +2637,20 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer, /* if we are scheduling a BYE, we only want to count bye packets, else we * count everything */ - if (sess->scheduled_bye) { - if (is_bye) { - sess->stats.bye_members++; - UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes); - } - } else { - /* keep track of average packet size */ - UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes); + if (sess->scheduled_bye && is_bye) { + sess->bye_stats.bye_members++; + UPDATE_AVG (sess->bye_stats.avg_rtcp_packet_size, pinfo.bytes); } + + /* keep track of average packet size */ + UPDATE_AVG (sess->stats.avg_rtcp_packet_size, pinfo.bytes); + GST_DEBUG ("%p, received RTCP packet, avg size %u, %u", &sess->stats, - sess->stats.avg_rtcp_packet_size, arrival.bytes); + sess->stats.avg_rtcp_packet_size, pinfo.bytes); RTP_SESSION_UNLOCK (sess); - clean_arrival_stats (&arrival); + pinfo.data = NULL; + clean_packet_info (&pinfo); /* notify caller of sr packets in the callback */ if (do_sync && sess->callbacks.sync_rtcp) { @@ -2416,11 +2695,20 @@ rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps) gboolean created; RTP_SESSION_LOCK (sess); - source = obtain_internal_source (sess, ssrc, &created); + source = obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE); if (source) { rtp_source_update_caps (source, caps); g_object_unref (source); } + + if (gst_structure_get_uint (s, "rtx-ssrc", &ssrc)) { + source = + obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE); + if (source) { + rtp_source_update_caps (source, caps); + g_object_unref (source); + } + } RTP_SESSION_UNLOCK (sess); } } @@ -2446,9 +2734,7 @@ rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list, RTPSource *source; gboolean prevsender; guint64 oldrate; - GstBuffer *buffer; - GstRTPBuffer rtp = { NULL }; - guint32 ssrc; + RTPPacketInfo pinfo = { 0, }; gboolean created; g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); @@ -2456,35 +2742,18 @@ rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list, GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet"); - if (is_list) { - GstBufferList *list = GST_BUFFER_LIST_CAST (data); - - buffer = gst_buffer_list_get (list, 0); - if (!buffer) - goto no_buffer; - } else { - buffer = GST_BUFFER_CAST (data); - } - - if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)) - goto invalid_packet; - - /* get SSRC and look up in session database */ - ssrc = gst_rtp_buffer_get_ssrc (&rtp); - - gst_rtp_buffer_unmap (&rtp); - RTP_SESSION_LOCK (sess); - source = obtain_internal_source (sess, ssrc, &created); + if (!update_packet_info (sess, &pinfo, TRUE, TRUE, is_list, data, + current_time, running_time, -1)) + goto invalid_packet; - /* update last activity */ - source->last_rtp_activity = current_time; + source = obtain_internal_source (sess, pinfo.ssrc, &created, current_time); prevsender = RTP_SOURCE_IS_SENDER (source); oldrate = source->bitrate; /* we use our own source to send */ - result = rtp_source_send_rtp (source, data, is_list, running_time); + result = rtp_source_send_rtp (source, &pinfo); source_update_sender (sess, source, prevsender); @@ -2493,21 +2762,17 @@ rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list, RTP_SESSION_UNLOCK (sess); g_object_unref (source); + clean_packet_info (&pinfo); return result; invalid_packet: { gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); + RTP_SESSION_UNLOCK (sess); GST_DEBUG ("invalid RTP packet received"); return GST_FLOW_OK; } -no_buffer: - { - gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); - GST_DEBUG ("no buffer in list"); - return GST_FLOW_OK; - } } static void @@ -2522,6 +2787,7 @@ calculate_rtcp_interval (RTPSession * sess, gboolean deterministic, gboolean first) { GstClockTime result; + RTPSessionStats *stats; /* recalculate bandwidth when it changed */ if (sess->recalc_bandwidth) { @@ -2535,9 +2801,8 @@ calculate_rtcp_interval (RTPSession * sess, gboolean deterministic, g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) add_bitrates, &bandwidth); - bandwidth /= 8.0; } - if (bandwidth < 8000) + if (bandwidth < RTP_STATS_BANDWIDTH) bandwidth = RTP_STATS_BANDWIDTH; rtp_stats_set_bandwidths (&sess->stats, bandwidth, @@ -2547,17 +2812,19 @@ calculate_rtcp_interval (RTPSession * sess, gboolean deterministic, } if (sess->scheduled_bye) { - result = rtp_stats_calculate_bye_interval (&sess->stats); + stats = &sess->bye_stats; + result = rtp_stats_calculate_bye_interval (stats); } else { - result = rtp_stats_calculate_rtcp_interval (&sess->stats, - sess->stats.internal_sender_sources > 0, first); + stats = &sess->stats; + result = rtp_stats_calculate_rtcp_interval (stats, + stats->internal_sender_sources > 0, first); } GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT ", first %d", GST_TIME_ARGS (result), first); if (!deterministic && result != GST_CLOCK_TIME_NONE) - result = rtp_stats_add_rtcp_jitter (&sess->stats, result); + result = rtp_stats_add_rtcp_jitter (stats, result); GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result)); @@ -2605,8 +2872,9 @@ rtp_session_schedule_bye_locked (RTPSession * sess, GstClockTime current_time) /* we schedule BYE now */ sess->scheduled_bye = TRUE; /* at least one member wants to send a BYE */ - INIT_AVG (sess->stats.avg_rtcp_packet_size, 100); - sess->stats.bye_members = 1; + memcpy (&sess->bye_stats, &sess->stats, sizeof (RTPSessionStats)); + INIT_AVG (sess->bye_stats.avg_rtcp_packet_size, 100); + sess->bye_stats.bye_members = 1; sess->first_rtcp = TRUE; sess->allow_early = TRUE; @@ -2644,7 +2912,7 @@ done: GstFlowReturn rtp_session_schedule_bye (RTPSession * sess, GstClockTime current_time) { - GstFlowReturn result = GST_FLOW_OK; + GstFlowReturn result; g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); @@ -2675,6 +2943,7 @@ rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time) RTP_SESSION_LOCK (sess); if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) { + GST_DEBUG ("have early rtcp time"); result = sess->next_early_rtcp_time; goto early_exit; } @@ -2693,7 +2962,7 @@ rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time) } if (sess->scheduled_bye) { - if (sess->stats.active_sources >= 50) { + if (sess->bye_stats.active_sources >= 50) { GST_DEBUG ("reconsider BYE, more than 50 sources"); /* reconsider BYE if members >= 50 */ interval = calculate_rtcp_interval (sess, FALSE, TRUE); @@ -2753,6 +3022,7 @@ typedef struct gboolean is_early; gboolean may_suppress; GQueue output; + guint nacked_seqnums; } ReportData; static void @@ -2812,15 +3082,21 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data) return; } - /* only report about other sender */ - if (source == data->source) - goto reported; + if (g_hash_table_contains (source->reported_in_sr_of, + GUINT_TO_POINTER (data->source->ssrc))) { + GST_DEBUG ("source %08x already reported in this generation", source->ssrc); + return; + } if (gst_rtcp_packet_get_rb_count (packet) == GST_RTCP_MAX_RB_COUNT) { GST_DEBUG ("max RB count reached"); return; } + /* only report about other sender */ + if (source == data->source) + goto reported; + if (!RTP_SOURCE_IS_SENDER (source)) { GST_DEBUG ("source %08x not sender", source->ssrc); goto reported; @@ -2846,14 +3122,8 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data) exthighestseq, jitter, lsr, dlsr); reported: - /* source is reported, move to next generation */ - source->generation = sess->generation + 1; - - /* if we reported all sources in this generation, move to next */ - if (--data->num_to_report == 0) { - sess->generation++; - GST_DEBUG ("all reported, generation now %u", sess->generation); - } + g_hash_table_add (source->reported_in_sr_of, + GUINT_TO_POINTER (data->source->ssrc)); } /* construct FIR */ @@ -2881,6 +3151,7 @@ session_add_fir (const gchar * key, RTPSource * source, ReportData * data) fci_data[1] = fci_data[2] = fci_data[3] = 0; source->send_fir = FALSE; + source->stats.sent_fir_count++; } static void @@ -2949,6 +3220,8 @@ session_pli (const gchar * key, RTPSource * source, ReportData * data) source->send_pli = FALSE; data->may_suppress = FALSE; + + source->stats.sent_pli_count++; } /* construct NACK */ @@ -2982,6 +3255,7 @@ session_nack (const gchar * key, RTPSource * source, ReportData * data) for (i = 0; i < n_nacks; i++) { GST_WRITE_UINT32_BE (fci_data, nacks[i]); fci_data += 4; + data->nacked_seqnums++; } rtp_source_clear_nacks (source); @@ -3006,8 +3280,6 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data) if (source->internal) { GST_DEBUG ("Timing out collisions for %x", source->ssrc); rtp_source_timeout (source, data->current_time, - /* "a relatively long time" -- RFC 3550 section 8.2 */ - RTP_STATS_MIN_INTERVAL * GST_SECOND * 10, data->running_time - sess->rtcp_feedback_retention_window); } @@ -3041,26 +3313,41 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data) GST_LOG ("timeout base interval %" GST_TIME_FORMAT, GST_TIME_ARGS (binterval)); - if (!source->internal) { - if (source->marked_bye) { - /* if we received a BYE from the source, remove the source after some - * time. */ - if (data->current_time > source->bye_time && - data->current_time - source->bye_time > sess->stats.bye_timeout) { - GST_DEBUG ("removing BYE source %08x", source->ssrc); - remove = TRUE; - byetimeout = TRUE; - } + if (!source->internal && source->marked_bye) { + /* if we received a BYE from the source, remove the source after some + * time. */ + if (data->current_time > source->bye_time && + data->current_time - source->bye_time > sess->stats.bye_timeout) { + GST_DEBUG ("removing BYE source %08x", source->ssrc); + remove = TRUE; + byetimeout = TRUE; } - /* sources that were inactive for more than 5 times the deterministic reporting - * interval get timed out. the min timeout is 5 seconds. */ - /* mind old time that might pre-date last time going to PLAYING */ - btime = MAX (source->last_activity, sess->start_time); - if (data->current_time > btime) { - interval = MAX (binterval * 5, 5 * GST_SECOND); - if (data->current_time - btime > interval) { - GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT, - source->ssrc, GST_TIME_ARGS (btime)); + } + + if (source->internal && source->sent_bye) { + GST_DEBUG ("removing internal source that has sent BYE %08x", source->ssrc); + remove = TRUE; + } + + /* sources that were inactive for more than 5 times the deterministic reporting + * interval get timed out. the min timeout is 5 seconds. */ + /* mind old time that might pre-date last time going to PLAYING */ + btime = MAX (source->last_activity, sess->start_time); + if (data->current_time > btime) { + interval = MAX (binterval * 5, 5 * GST_SECOND); + if (data->current_time - btime > interval) { + GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT, + source->ssrc, GST_TIME_ARGS (btime)); + if (source->internal) { + /* this is an internal source that is not using our suggested ssrc. + * since there must be another source using this ssrc, we can remove + * this one instead of making it a receiver forever */ + if (source->ssrc != sess->suggested_ssrc) { + rtp_source_mark_bye (source, "timed out"); + /* do not schedule bye here, since we are inside the RTCP timeout + * processing and scheduling bye will interfere with SR/RR sending */ + } + } else { remove = TRUE; } } @@ -3074,16 +3361,9 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data) if (data->current_time > btime) { interval = MAX (binterval * 2, 5 * GST_SECOND); if (data->current_time - btime > interval) { - if (source->internal && source->sent_bye) { - /* an internal source is BYE and stopped sending RTP, remove */ - GST_DEBUG ("internal BYE source %08x timed out, last %" - GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime)); - remove = TRUE; - } else { - GST_DEBUG ("sender source %08x timed out and became receiver, last %" - GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime)); - sendertimeout = TRUE; - } + GST_DEBUG ("sender source %08x timed out and became receiver, last %" + GST_TIME_FORMAT, source->ssrc, GST_TIME_ARGS (btime)); + sendertimeout = TRUE; } } } @@ -3208,15 +3488,26 @@ make_source_bye (RTPSession * sess, RTPSource * source, ReportData * data) static gboolean is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data) { - GstClockTime new_send_time, elapsed; + GstClockTime new_send_time; + GstClockTime interval; + RTPSessionStats *stats; + + if (sess->scheduled_bye) + stats = &sess->bye_stats; + else + stats = &sess->stats; if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) data->is_early = TRUE; else data->is_early = FALSE; - if (data->is_early && sess->next_early_rtcp_time < current_time) + if (data->is_early && sess->next_early_rtcp_time < current_time) { + GST_DEBUG ("early feedback %" GST_TIME_FORMAT " < now %" + GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_early_rtcp_time), + GST_TIME_ARGS (current_time)); goto early; + } /* no need to check yet */ if (sess->next_rtcp_check_time == GST_CLOCK_TIME_NONE || @@ -3227,62 +3518,70 @@ is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data) return FALSE; } - /* get elapsed time since we last reported */ - elapsed = current_time - sess->last_rtcp_send_time; +early: - new_send_time = data->interval; - /* perform forward reconsideration */ - if (new_send_time != GST_CLOCK_TIME_NONE) { - new_send_time = rtp_stats_add_rtcp_jitter (&sess->stats, new_send_time); + /* take interval and add jitter */ + interval = data->interval; + if (interval != GST_CLOCK_TIME_NONE) + interval = rtp_stats_add_rtcp_jitter (stats, interval); - GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %" - GST_TIME_FORMAT, GST_TIME_ARGS (new_send_time), - GST_TIME_ARGS (elapsed)); + if (sess->last_rtcp_send_time != GST_CLOCK_TIME_NONE) { + /* perform forward reconsideration */ + if (interval != GST_CLOCK_TIME_NONE) { + GstClockTime elapsed; - new_send_time += sess->last_rtcp_send_time; - } + /* get elapsed time since we last reported */ + elapsed = current_time - sess->last_rtcp_send_time; - /* check if reconsideration */ - if (new_send_time == GST_CLOCK_TIME_NONE || current_time < new_send_time) { - GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT, - GST_TIME_ARGS (new_send_time)); - /* store new check time */ - sess->next_rtcp_check_time = new_send_time; - return FALSE; + GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT ", elapsed %" + GST_TIME_FORMAT, GST_TIME_ARGS (interval), GST_TIME_ARGS (elapsed)); + new_send_time = interval + sess->last_rtcp_send_time; + } else { + new_send_time = sess->last_rtcp_send_time; + } + } else { + /* If this is the first RTCP packet, we can reconsider anything based + * on the last RTCP send time because there was none. + */ + g_warn_if_fail (!data->is_early); + data->is_early = FALSE; + new_send_time = current_time; } -early: - - new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE); - - GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT, - GST_TIME_ARGS (new_send_time)); - - sess->next_rtcp_check_time = new_send_time; - if (new_send_time != GST_CLOCK_TIME_NONE) { - sess->next_rtcp_check_time += current_time; - + if (!data->is_early) { + /* check if reconsideration */ + if (new_send_time == GST_CLOCK_TIME_NONE || current_time < new_send_time) { + GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT, + GST_TIME_ARGS (new_send_time)); + /* store new check time */ + sess->next_rtcp_check_time = new_send_time; + return FALSE; + } + sess->next_rtcp_check_time = current_time + interval; + } else if (interval != GST_CLOCK_TIME_NONE) { /* Apply the rules from RFC 4585 section 3.5.3 */ - if (sess->stats.min_interval != 0 && !sess->first_rtcp) { + if (stats->min_interval != 0 && !sess->first_rtcp) { GstClockTime T_rr_current_interval = - g_random_double_range (0.5, 1.5) * sess->stats.min_interval; + g_random_double_range (0.5, 1.5) * stats->min_interval; /* This will caused the RTCP to be suppressed if no FB packets are added */ - if (sess->last_rtcp_send_time + T_rr_current_interval > - sess->next_rtcp_check_time) { + if (sess->last_rtcp_send_time + T_rr_current_interval > new_send_time) { GST_DEBUG ("RTCP packet could be suppressed min: %" GST_TIME_FORMAT " last: %" GST_TIME_FORMAT " + T_rr_current_interval: %" GST_TIME_FORMAT - " > sess->next_rtcp_check_time: %" GST_TIME_FORMAT, - GST_TIME_ARGS (sess->stats.min_interval), + " > new_send_time: %" GST_TIME_FORMAT, + GST_TIME_ARGS (stats->min_interval), GST_TIME_ARGS (sess->last_rtcp_send_time), GST_TIME_ARGS (T_rr_current_interval), - GST_TIME_ARGS (sess->next_rtcp_check_time)); + GST_TIME_ARGS (new_send_time)); data->may_suppress = TRUE; } } } + GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT, + GST_TIME_ARGS (new_send_time)); + return TRUE; } @@ -3320,6 +3619,10 @@ generate_rtcp (const gchar * key, RTPSource * source, ReportData * data) if (!source->internal || source->sent_bye) return; + /* ignore other sources when we do the timeout after a scheduled BYE */ + if (sess->scheduled_bye && !source->marked_bye) + return; + data->source = source; /* open packet */ @@ -3359,6 +3662,28 @@ generate_rtcp (const gchar * key, RTPSource * source, ReportData * data) g_queue_push_tail (&data->output, output); } +static void +update_generation (const gchar * key, RTPSource * source, ReportData * data) +{ + RTPSession *sess = data->sess; + + if (g_hash_table_size (source->reported_in_sr_of) >= + sess->stats.internal_sources) { + /* source is reported, move to next generation */ + source->generation = sess->generation + 1; + g_hash_table_remove_all (source->reported_in_sr_of); + + GST_LOG ("reported source %x, new generation: %d", source->ssrc, + source->generation); + + /* if we reported all sources in this generation, move to next */ + if (--data->num_to_report == 0) { + sess->generation++; + GST_DEBUG ("all reported, generation now %u", sess->generation); + } + } +} + /** * rtp_session_on_timeout: * @sess: an #RTPSession @@ -3398,21 +3723,28 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time, data.running_time = running_time; data.num_to_report = 0; data.may_suppress = FALSE; + data.nacked_seqnums = 0; g_queue_init (&data.output); RTP_SESSION_LOCK (sess); /* get a new interval, we need this for various cleanups etc */ data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp); + GST_DEBUG ("interval %" GST_TIME_FORMAT, GST_TIME_ARGS (data.interval)); + /* we need an internal source now */ if (sess->stats.internal_sources == 0) { RTPSource *source; gboolean created; - source = obtain_internal_source (sess, sess->suggested_ssrc, &created); + source = obtain_internal_source (sess, sess->suggested_ssrc, &created, + current_time); g_object_unref (source); } + sess->conflicting_addresses = + timeout_conflicting_addresses (sess->conflicting_addresses, current_time); + /* Make a local copy of the hashtable. We need to do this because the * cleanup stage below releases the session lock. */ table_copy = g_hash_table_new_full (NULL, NULL, NULL, @@ -3429,23 +3761,36 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time, g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx], (GHRFunc) remove_closing_sources, &data); + /* update point-to-point status */ + session_update_ptp (sess); + /* see if we need to generate SR or RR packets */ if (!is_rtcp_time (sess, current_time, &data)) goto done; - GST_DEBUG ("doing RTCP generation %u for %u sources", sess->generation, - data.num_to_report); + GST_DEBUG ("doing RTCP generation %u for %u sources, early %d", + sess->generation, data.num_to_report, data.is_early); /* generate RTCP for all internal sources */ g_hash_table_foreach (sess->ssrcs[sess->mask_idx], (GHFunc) generate_rtcp, &data); + /* update the generation for all the sources that have been reported */ + g_hash_table_foreach (sess->ssrcs[sess->mask_idx], + (GHFunc) update_generation, &data); + /* we keep track of the last report time in order to timeout inactive * receivers or senders */ if (!data.is_early && !data.may_suppress) sess->last_rtcp_send_time = data.current_time; sess->first_rtcp = FALSE; sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE; + sess->scheduled_bye = FALSE; + + /* RFC 4585 section 3.5.2 step 6 */ + if (!data.is_early) { + sess->allow_early = TRUE; + } done: RTP_SESSION_UNLOCK (sess); @@ -3471,10 +3816,12 @@ done: result = sess->callbacks.send_rtcp (sess, source, buffer, output->is_bye, sess->send_rtcp_user_data); + sess->stats.nacks_sent += data.nacked_seqnums; } else { GST_DEBUG ("freeing packet callback: %p" " do_not_suppress: %d may_suppress: %d", sess->callbacks.send_rtcp, do_not_suppress, data.may_suppress); + sess->stats.nacks_dropped += data.nacked_seqnums; gst_buffer_unref (buffer); } g_object_unref (source); @@ -3490,12 +3837,15 @@ done: * @max_delay: maximum delay * * Request transmission of early RTCP + * + * Returns: %TRUE if the related RTCP can be scheduled. */ -void +gboolean rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time, GstClockTime max_delay) { - GstClockTime T_dither_max; + GstClockTime T_dither_max, T_rr; + gboolean ret; /* Implements the algorithm described in RFC 4585 section 3.5.2 */ @@ -3503,39 +3853,92 @@ rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time, /* Check if already requested */ /* RFC 4585 section 3.5.2 step 2 */ - if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) - goto dont_send; + if (GST_CLOCK_TIME_IS_VALID (sess->next_early_rtcp_time)) { + GST_LOG_OBJECT (sess, "already have next early rtcp time"); + ret = TRUE; + goto end; + } + + if (!GST_CLOCK_TIME_IS_VALID (sess->next_rtcp_check_time)) { + GST_LOG_OBJECT (sess, "no next RTCP check time"); + ret = FALSE; + goto end; + } - if (!GST_CLOCK_TIME_IS_VALID (sess->next_rtcp_check_time)) - goto dont_send; + /* RFC 4585 section 3.5.3 step 1 + * If no regular RTCP packet has been sent before, then a regular + * RTCP packet has to be scheduled first and FB messages might be + * included there + */ + if (!GST_CLOCK_TIME_IS_VALID (sess->last_rtcp_send_time)) { + GST_LOG_OBJECT (sess, "no RTCP sent yet"); + + if (current_time + max_delay > sess->next_rtcp_check_time) { + GST_LOG_OBJECT (sess, + "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT + " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time), + GST_TIME_ARGS (max_delay), + GST_TIME_ARGS (sess->next_rtcp_check_time)); + ret = TRUE; + } else { + GST_LOG_OBJECT (sess, + "can't allow early feedback, next scheduled time is too late %" + GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT, + GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay), + GST_TIME_ARGS (sess->next_rtcp_check_time)); + ret = FALSE; + } + goto end; + } - /* Ignore the request a scheduled packet will be in time anyway */ - if (current_time + max_delay > sess->next_rtcp_check_time) - goto dont_send; + T_rr = sess->next_rtcp_check_time - sess->last_rtcp_send_time; /* RFC 4585 section 3.5.2 step 2b */ /* If the total sources is <=2, then there is only us and one peer */ - if (sess->total_sources <= 2) { + /* When there is one auxiliary stream the session can still do point + * to point. + */ + if (sess->is_doing_ptp) { T_dither_max = 0; } else { /* Divide by 2 because l = 0.5 */ - T_dither_max = sess->next_rtcp_check_time - sess->last_rtcp_send_time; + T_dither_max = T_rr; T_dither_max /= 2; } /* RFC 4585 section 3.5.2 step 3 */ - if (current_time + T_dither_max > sess->next_rtcp_check_time) - goto dont_send; - - /* RFC 4585 section 3.5.2 step 4 - * Don't send if allow_early is FALSE, but not if we are in - * immediate mode, meaning we are part of a group of at most the - * application-specific threshold. - */ - if (sess->total_sources > sess->rtcp_immediate_feedback_threshold && - sess->allow_early == FALSE) - goto dont_send; + if (current_time + T_dither_max > sess->next_rtcp_check_time) { + GST_LOG_OBJECT (sess, + "don't send because of dither, next scheduled time is soon %" + GST_TIME_FORMAT " + %" GST_TIME_FORMAT " > %" GST_TIME_FORMAT, + GST_TIME_ARGS (current_time), GST_TIME_ARGS (T_dither_max), + GST_TIME_ARGS (sess->next_rtcp_check_time)); + ret = TRUE; + goto end; + } + + /* RFC 4585 section 3.5.2 step 4a */ + if (sess->allow_early == FALSE) { + /* Ignore the request a scheduled packet will be in time anyway */ + if (current_time + max_delay > sess->next_rtcp_check_time) { + GST_LOG_OBJECT (sess, + "next scheduled time is soon %" GST_TIME_FORMAT " + %" GST_TIME_FORMAT + " > %" GST_TIME_FORMAT, GST_TIME_ARGS (current_time), + GST_TIME_ARGS (max_delay), + GST_TIME_ARGS (sess->next_rtcp_check_time)); + ret = TRUE; + } else { + GST_LOG_OBJECT (sess, + "can't allow early feedback, next scheduled time is too late %" + GST_TIME_FORMAT " + %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT, + GST_TIME_ARGS (current_time), GST_TIME_ARGS (max_delay), + GST_TIME_ARGS (sess->next_rtcp_check_time)); + ret = FALSE; + } + goto end; + } + /* RFC 4585 section 3.5.2 step 4b */ if (T_dither_max) { /* Schedule an early transmission later */ sess->next_early_rtcp_time = g_random_double () * T_dither_max + @@ -3545,6 +3948,18 @@ rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time, sess->next_early_rtcp_time = current_time; } + /* RFC 4585 section 3.5.2 step 6 */ + sess->allow_early = FALSE; + /* Delay next regular RTCP packet to not exceed the short-term + * RTCP bandwidth when using early feedback as compared to + * without */ + sess->next_rtcp_check_time = sess->last_rtcp_send_time + 2 * T_rr; + sess->last_rtcp_send_time += T_rr; + + GST_LOG_OBJECT (sess, "next early RTCP time %" GST_TIME_FORMAT + ", next regular RTCP time %" GST_TIME_FORMAT, + GST_TIME_ARGS (sess->next_early_rtcp_time), + GST_TIME_ARGS (sess->next_rtcp_check_time)); RTP_SESSION_UNLOCK (sess); /* notify app of need to send packet early @@ -3552,34 +3967,43 @@ rtp_session_request_early_rtcp (RTPSession * sess, GstClockTime current_time, if (sess->callbacks.reconsider) sess->callbacks.reconsider (sess, sess->reconsider_user_data); - return; + return TRUE; -dont_send: +end: RTP_SESSION_UNLOCK (sess); + + return ret; } -static void +static gboolean rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay) { GstClockTime now; if (!sess->callbacks.send_rtcp) - return; + return FALSE; now = sess->callbacks.request_time (sess, sess->request_time_user_data); - rtp_session_request_early_rtcp (sess, now, max_delay); + return rtp_session_request_early_rtcp (sess, now, max_delay); } gboolean rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc, gboolean fir, gint count) { - RTPSource *src = find_source (sess, ssrc); + RTPSource *src; - if (!src) + if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) { + GST_DEBUG ("FIR/PLI not sent"); return FALSE; + } + + RTP_SESSION_LOCK (sess); + src = find_source (sess, ssrc); + if (src == NULL) + goto no_source; if (fir) { src->send_pli = FALSE; @@ -3591,10 +4015,16 @@ rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc, } else if (!src->send_fir) { src->send_pli = TRUE; } - - rtp_session_send_rtcp (sess, 200 * GST_MSECOND); + RTP_SESSION_UNLOCK (sess); return TRUE; + + /* ERRORS */ +no_source: + { + RTP_SESSION_UNLOCK (sess); + return FALSE; + } } /** @@ -3612,15 +4042,28 @@ gboolean rtp_session_request_nack (RTPSession * sess, guint32 ssrc, guint16 seqnum, GstClockTime max_delay) { - RTPSource *source = find_source (sess, ssrc); + RTPSource *source; - if (source == NULL) + if (!rtp_session_send_rtcp (sess, max_delay)) { + GST_DEBUG ("NACK not sent"); return FALSE; + } + + RTP_SESSION_LOCK (sess); + source = find_source (sess, ssrc); + if (source == NULL) + goto no_source; GST_DEBUG ("request NACK for %08x, #%u", ssrc, seqnum); rtp_source_register_nack (source, seqnum); - - rtp_session_send_rtcp (sess, max_delay); + RTP_SESSION_UNLOCK (sess); return TRUE; + + /* ERRORS */ +no_source: + { + RTP_SESSION_UNLOCK (sess); + return FALSE; + } }