X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst%2Frtpmanager%2Frtpsession.c;h=fefa259d4f22ac250263381682e6475ec5f7d697;hb=67b5edfebbc46893eae578ace518abfdaac8e3c7;hp=3364af23293e289ddb6841087122809a91a5154e;hpb=53ec444963e273376c7a96865dd6b32e1e8fc65e;p=platform%2Fupstream%2Fgst-plugins-good.git diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c index 3364af2..fefa259 100644 --- a/gst/rtpmanager/rtpsession.c +++ b/gst/rtpmanager/rtpsession.c @@ -54,6 +54,7 @@ enum SIGNAL_ON_RECEIVING_RTCP, SIGNAL_ON_NEW_SENDER_SSRC, SIGNAL_ON_SENDER_SSRC_ACTIVE, + SIGNAL_ON_SENDING_NACKS, LAST_SIGNAL }; @@ -75,6 +76,7 @@ enum #define DEFAULT_MAX_MISORDER_TIME 2000 #define DEFAULT_RTP_PROFILE GST_RTP_PROFILE_AVP #define DEFAULT_RTCP_REDUCED_SIZE FALSE +#define DEFAULT_RTCP_DISABLE_SR_TIMESTAMP FALSE enum { @@ -99,7 +101,8 @@ enum PROP_MAX_MISORDER_TIME, PROP_STATS, PROP_RTP_PROFILE, - PROP_RTCP_REDUCED_SIZE + PROP_RTCP_REDUCED_SIZE, + PROP_RTCP_DISABLE_SR_TIMESTAMP }; /* update average packet size */ @@ -121,6 +124,8 @@ static void rtp_session_get_property (GObject * object, guint prop_id, static gboolean rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay); +static gboolean rtp_session_send_rtcp_with_deadline (RTPSession * sess, + GstClockTime deadline); static guint rtp_session_signals[LAST_SIGNAL] = { 0 }; @@ -418,6 +423,39 @@ rtp_session_class_init (RTPSessionClass * klass) on_sender_ssrc_active), NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, RTP_TYPE_SOURCE); + /** + * RTPSession::on-sending-nack + * @session: the object which received the signal + * @sender_ssrc: the sender ssrc + * @media_ssrc: the media ssrc + * @nacks: (element-type guint16): the list of seqnum to be nacked + * @buffer: the #GstBuffer containing the RTCP packet about to be sent + * + * This signal is emitted before NACK packets are added into the RTCP + * packet. This signal can be used to override the conversion of the NACK + * seqnum array into packets. This can be used if your protocol uses + * different type of NACK (e.g. based on RTCP APP). + * + * The handler should transform the seqnum from @nacks array into packets. + * @nacks seqnum must be consumed from the start. The remaining will be + * rescheduled for later base on bandwidth. Only one handler will be + * signalled. + * + * A handler may return 0 to signal that generic NACKs should be created + * for this set. This can be useful if the signal is used for other purpose + * or if the other type of NACK would use more space. + * + * Returns: the number of NACK seqnum that was consumed from @nacks. + * + * Since: 1.16 + */ + rtp_session_signals[SIGNAL_ON_SENDING_NACKS] = + g_signal_new ("on-sending-nacks", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_sending_nacks), + g_signal_accumulator_first_wins, NULL, g_cclosure_marshal_generic, + G_TYPE_UINT, 4, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_ARRAY, + GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE); + g_object_class_install_property (gobject_class, PROP_INTERNAL_SSRC, g_param_spec_uint ("internal-ssrc", "Internal SSRC", "The internal SSRC used for the session (deprecated)", @@ -581,6 +619,21 @@ rtp_session_class_init (RTPSessionClass * klass) DEFAULT_RTCP_REDUCED_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** + * RTPSession::disable-sr-timestamp: + * + * Whether sender reports should be timestamped. + * + * Since: 1.16 + */ + g_object_class_install_property (gobject_class, + PROP_RTCP_DISABLE_SR_TIMESTAMP, + g_param_spec_boolean ("disable-sr-timestamp", + "Disable Sender Report Timestamp", + "Whether sender reports should be timestamped", + DEFAULT_RTCP_DISABLE_SR_TIMESTAMP, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + klass->get_source_by_ssrc = GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc); klass->send_rtcp = GST_DEBUG_FUNCPTR (rtp_session_send_rtcp); @@ -661,6 +714,7 @@ rtp_session_init (RTPSession * sess) DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD; sess->rtp_profile = DEFAULT_RTP_PROFILE; sess->reduced_size_rtcp = DEFAULT_RTCP_REDUCED_SIZE; + sess->timestamp_sender_reports = !DEFAULT_RTCP_DISABLE_SR_TIMESTAMP; sess->is_doing_ptp = TRUE; } @@ -722,16 +776,15 @@ rtp_session_create_sources (RTPSession * sess) static void create_source_stats (gpointer key, RTPSource * source, GValueArray * arr) { - GValue value = G_VALUE_INIT; + GValue *value; GstStructure *s; g_object_get (source, "stats", &s, NULL); - g_value_init (&value, GST_TYPE_STRUCTURE); - gst_value_set_structure (&value, s); - g_value_array_append (arr, &value); - gst_structure_free (s); - g_value_unset (&value); + g_value_array_append (arr, NULL); + value = g_value_array_get_nth (arr, arr->n_values - 1); + g_value_init (value, GST_TYPE_STRUCTURE); + g_value_take_boxed (value, s); } static GstStructure * @@ -822,6 +875,9 @@ rtp_session_set_property (GObject * object, guint prop_id, if (sess->callbacks.reconsider) sess->callbacks.reconsider (sess, sess->reconsider_user_data); break; + case PROP_RTCP_FEEDBACK_RETENTION_WINDOW: + sess->rtcp_feedback_retention_window = g_value_get_uint64 (value); + break; case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD: sess->rtcp_immediate_feedback_threshold = g_value_get_uint (value); break; @@ -846,6 +902,9 @@ rtp_session_set_property (GObject * object, guint prop_id, case PROP_RTCP_REDUCED_SIZE: sess->reduced_size_rtcp = g_value_get_boolean (value); break; + case PROP_RTCP_DISABLE_SR_TIMESTAMP: + sess->timestamp_sender_reports = !g_value_get_boolean (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -901,6 +960,9 @@ rtp_session_get_property (GObject * object, guint prop_id, case PROP_RTCP_MIN_INTERVAL: g_value_set_uint64 (value, sess->stats.min_interval * GST_SECOND); break; + case PROP_RTCP_FEEDBACK_RETENTION_WINDOW: + g_value_set_uint64 (value, sess->rtcp_feedback_retention_window); + break; case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD: g_value_set_uint (value, sess->rtcp_immediate_feedback_threshold); break; @@ -922,6 +984,9 @@ rtp_session_get_property (GObject * object, guint prop_id, case PROP_RTCP_REDUCED_SIZE: g_value_set_boolean (value, sess->reduced_size_rtcp); break; + case PROP_RTCP_DISABLE_SR_TIMESTAMP: + g_value_set_boolean (value, !sess->timestamp_sender_reports); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -1062,6 +1127,47 @@ rtp_session_new (void) } /** + * rtp_session_reset: + * @sess: an #RTPSession + * + * Reset the sources of @sess. + */ +void +rtp_session_reset (RTPSession * sess) +{ + g_return_if_fail (RTP_IS_SESSION (sess)); + + /* remove all sources */ + g_hash_table_remove_all (sess->ssrcs[sess->mask_idx]); + sess->total_sources = 0; + sess->stats.sender_sources = 0; + sess->stats.internal_sender_sources = 0; + sess->stats.internal_sources = 0; + sess->stats.active_sources = 0; + + sess->generation = 0; + sess->first_rtcp = TRUE; + sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE; + sess->last_rtcp_check_time = GST_CLOCK_TIME_NONE; + sess->last_rtcp_send_time = GST_CLOCK_TIME_NONE; + sess->last_rtcp_interval = GST_CLOCK_TIME_NONE; + sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE; + sess->scheduled_bye = FALSE; + + /* reset session stats */ + sess->stats.bye_members = 0; + sess->stats.nacks_dropped = 0; + sess->stats.nacks_sent = 0; + sess->stats.nacks_received = 0; + + sess->is_doing_ptp = TRUE; + + g_list_free_full (sess->conflicting_addresses, + (GDestroyNotify) rtp_conflicting_address_free); + sess->conflicting_addresses = NULL; +} + +/** * rtp_session_set_callbacks: * @sess: an #RTPSession * @callbacks: callbacks to configure @@ -1115,6 +1221,10 @@ rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks, sess->callbacks.reconfigure = callbacks->reconfigure; sess->reconfigure_user_data = user_data; } + if (callbacks->notify_early_rtcp) { + sess->callbacks.notify_early_rtcp = callbacks->notify_early_rtcp; + sess->notify_early_rtcp_user_data = user_data; + } } /** @@ -1346,6 +1456,12 @@ rtp_session_get_sdes_struct (RTPSession * sess) return result; } +static void +source_set_sdes (const gchar * key, RTPSource * source, GstStructure * sdes) +{ + rtp_source_set_sdes_struct (source, gst_structure_copy (sdes)); +} + /** * rtp_session_set_sdes_struct: * @sess: an #RTSPSession @@ -1363,6 +1479,9 @@ rtp_session_set_sdes_struct (RTPSession * sess, const GstStructure * sdes) if (sess->sdes) gst_structure_free (sess->sdes); sess->sdes = gst_structure_copy (sdes); + + g_hash_table_foreach (sess->ssrcs[sess->mask_idx], + (GHFunc) source_set_sdes, sess->sdes); RTP_SESSION_UNLOCK (sess); } @@ -1925,34 +2044,6 @@ rtp_session_create_new_ssrc (RTPSession * sess) return ssrc; } - -/** - * rtp_session_create_source: - * @sess: an #RTPSession - * - * Create an #RTPSource for use in @sess. This function will create a source - * with an ssrc that is currently not used by any participants in the session. - * - * Returns: an #RTPSource. - */ -RTPSource * -rtp_session_create_source (RTPSession * sess) -{ - guint32 ssrc; - RTPSource *source; - - RTP_SESSION_LOCK (sess); - ssrc = rtp_session_create_new_ssrc (sess); - source = rtp_source_new (ssrc); - rtp_source_set_callbacks (source, &callbacks, sess); - /* we need an additional ref for the source in the hashtable */ - g_object_ref (source); - add_source (sess, source); - RTP_SESSION_UNLOCK (sess); - - return source; -} - static gboolean update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo) { @@ -2009,7 +2100,7 @@ invalid_packet: /* update the RTPPacketInfo structure with the current time and other bits * about the current buffer we are handling. * This function is typically called when a validated packet is received. - * This function should be called with the SESSION_LOCK + * This function should be called with the RTP_SESSION_LOCK */ static gboolean update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo, @@ -2135,7 +2226,8 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer, current_time, running_time, ntpnstime)) { GST_DEBUG ("invalid RTP packet received"); RTP_SESSION_UNLOCK (sess); - return rtp_session_process_rtcp (sess, buffer, current_time, ntpnstime); + return rtp_session_process_rtcp (sess, buffer, current_time, running_time, + ntpnstime); } ssrc = pinfo.ssrc; @@ -2148,6 +2240,9 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer, prevactive = RTP_SOURCE_IS_ACTIVE (source); oldrate = source->bitrate; + if (created) + on_new_ssrc (sess, source); + /* let source process the packet */ result = rtp_source_process_rtp (source, &pinfo); @@ -2160,8 +2255,6 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer, if (oldrate != source->bitrate) sess->recalc_bandwidth = TRUE; - if (created) - on_new_ssrc (sess, source); if (source->validated) { gboolean created; @@ -2612,8 +2705,12 @@ rtp_session_process_pli (RTPSession * sess, guint32 sender_ssrc, return; src = find_source (sess, sender_ssrc); - if (src == NULL) - return; + if (src == NULL) { + /* try to find a src with media_ssrc instead */ + src = find_source (sess, media_ssrc); + if (src == NULL) + return; + } rtp_session_request_local_key_unit (sess, src, media_ssrc, FALSE, current_time); @@ -2709,20 +2806,34 @@ static void rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, RTPPacketInfo * pinfo, GstClockTime current_time) { - GstRTCPType type = gst_rtcp_packet_get_type (packet); - GstRTCPFBType fbtype = gst_rtcp_packet_fb_get_type (packet); - guint32 sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet); - guint32 media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet); - guint8 *fci_data = gst_rtcp_packet_fb_get_fci (packet); - guint fci_length = 4 * gst_rtcp_packet_fb_get_fci_length (packet); + GstRTCPType type; + GstRTCPFBType fbtype; + guint32 sender_ssrc, media_ssrc; + guint8 *fci_data; + guint fci_length; RTPSource *src; + /* The feedback packet must include both sender SSRC and media SSRC */ + if (packet->length < 2) + return; + + type = gst_rtcp_packet_get_type (packet); + fbtype = gst_rtcp_packet_fb_get_type (packet); + sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (packet); + media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (packet); + src = find_source (sess, media_ssrc); /* skip non-bye packets for sources that are marked BYE */ if (sess->scheduled_bye && src && RTP_SOURCE_IS_MARKED_BYE (src)) return; + if (src) + g_object_ref (src); + + fci_data = gst_rtcp_packet_fb_get_fci (packet); + fci_length = gst_rtcp_packet_fb_get_fci_length (packet) * sizeof (guint32); + GST_DEBUG ("received feedback %d:%d from %08X about %08X with FCI of " "length %d", type, fbtype, sender_ssrc, media_ssrc, fci_length); @@ -2746,7 +2857,7 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, gst_buffer_unref (fci_buffer); } - if (src && sess->rtcp_feedback_retention_window) { + if (src && sess->rtcp_feedback_retention_window != GST_CLOCK_TIME_NONE) { rtp_source_retain_rtcp_packet (src, packet, pinfo->running_time); } @@ -2787,6 +2898,9 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, break; } } + + if (src) + g_object_unref (src); } /** @@ -2803,7 +2917,7 @@ rtp_session_process_feedback (RTPSession * sess, GstRTCPPacket * packet, */ GstFlowReturn rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer, - GstClockTime current_time, guint64 ntpnstime) + GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime) { GstRTCPPacket packet; gboolean more, is_bye = FALSE, do_sync = FALSE; @@ -2825,7 +2939,7 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer, RTP_SESSION_LOCK (sess); /* update pinfo stats */ update_packet_info (sess, &pinfo, FALSE, FALSE, FALSE, buffer, current_time, - -1, ntpnstime); + running_time, ntpnstime); /* start processing the compound packet */ gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp); @@ -2858,6 +2972,12 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer, case GST_RTCP_TYPE_PSFB: rtp_session_process_feedback (sess, &packet, &pinfo, current_time); break; + case GST_RTCP_TYPE_XR: + /* FIXME: This block is added to downgrade warning level. + * Once the parser is implemented, it should be replaced with + * a proper process function. */ + GST_DEBUG ("got RTCP XR packet, but ignored"); + break; default: GST_WARNING ("got unknown RTCP packet type: %d", type); break; @@ -2945,6 +3065,10 @@ rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps) obtain_internal_source (sess, ssrc, &created, GST_CLOCK_TIME_NONE); if (source) { rtp_source_update_caps (source, caps); + + if (created) + on_new_sender_ssrc (sess, source); + g_object_unref (source); } } @@ -2962,8 +3086,8 @@ rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps) * @current_time: the current system time * @running_time: the running time of @data * - * Send the RTP buffer in the session manager. This function takes ownership of - * @buffer. + * Send the RTP data (a buffer or buffer list) in the session manager. This + * function takes ownership of @data. * * Returns: a #GstFlowReturn. */ @@ -2992,6 +3116,10 @@ rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list, if (created) on_new_sender_ssrc (sess, source); + if (!source->internal) + /* FIXME: Send GstRTPCollision upstream */ + goto collision; + prevsender = RTP_SOURCE_IS_SENDER (source); oldrate = source->bitrate; @@ -3016,6 +3144,15 @@ invalid_packet: GST_DEBUG ("invalid RTP packet received"); return GST_FLOW_OK; } +collision: + { + g_object_unref (source); + gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); + RTP_SESSION_UNLOCK (sess); + GST_WARNING ("non-internal source with same ssrc %08x, drop packet", + pinfo.ssrc); + return GST_FLOW_OK; + } } static void @@ -3325,7 +3462,9 @@ session_start_rtcp (RTPSession * sess, ReportData * data) /* fill in sender report info */ gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc, - ntptime, rtptime, packet_count, octet_count); + sess->timestamp_sender_reports ? ntptime : 0, + sess->timestamp_sender_reports ? rtptime : 0, + packet_count, octet_count); } else { /* we are only receiver, create RR */ GST_DEBUG ("create RR for SSRC %08x", own->ssrc); @@ -3363,8 +3502,8 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data) return; } - /* only report about other sender */ - if (source == data->source) + /* only report about remote sources */ + if (source->internal) goto reported; if (!RTP_SOURCE_IS_SENDER (source)) { @@ -3372,6 +3511,11 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data) goto reported; } + if (source->disable_rtcp) { + GST_DEBUG ("source %08x has RTCP disabled", source->ssrc); + goto reported; + } + GST_DEBUG ("create RB for SSRC %08x", source->ssrc); /* get new stats */ @@ -3498,15 +3642,56 @@ session_pli (const gchar * key, RTPSource * source, ReportData * data) static void session_nack (const gchar * key, RTPSource * source, ReportData * data) { + RTPSession *sess = data->sess; GstRTCPBuffer *rtcp = &data->rtcpbuf; GstRTCPPacket *packet = &data->packet; - guint32 *nacks; - guint n_nacks, i; + guint16 *nacks; + GstClockTime *nack_deadlines; + guint n_nacks, i = 0; + guint nacked_seqnums = 0; + guint16 n_fb_nacks = 0; guint8 *fci_data; if (!source->send_nack) return; + nacks = rtp_source_get_nacks (source, &n_nacks); + nack_deadlines = rtp_source_get_nack_deadlines (source, NULL); + GST_DEBUG ("%u NACKs current time %" GST_TIME_FORMAT, n_nacks, + GST_TIME_ARGS (data->current_time)); + + /* cleanup expired nacks */ + for (i = 0; i < n_nacks; i++) { + GST_DEBUG ("#%u deadline %" GST_TIME_FORMAT, nacks[i], + GST_TIME_ARGS (nack_deadlines[i])); + if (nack_deadlines[i] >= data->current_time) + break; + } + if (i) { + GST_WARNING ("Removing %u expired NACKS", i); + rtp_source_clear_nacks (source, i); + n_nacks -= i; + if (n_nacks == 0) + return; + } + + /* allow overriding NACK to packet conversion */ + if (g_signal_has_handler_pending (sess, + rtp_session_signals[SIGNAL_ON_SENDING_NACKS], 0, TRUE)) { + /* this is needed as it will actually resize the buffer */ + gst_rtcp_buffer_unmap (rtcp); + + g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_NACKS], 0, + data->source->ssrc, source->ssrc, source->nacks, data->rtcp, + &nacked_seqnums); + + /* and now remap for the remaining work */ + gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp); + + if (nacked_seqnums > 0) + goto done; + } + if (!gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_RTPFB, packet)) /* exit because the packet is full, will put next request in a * further packet */ @@ -3516,21 +3701,47 @@ session_nack (const gchar * key, RTPSource * source, ReportData * data) gst_rtcp_packet_fb_set_sender_ssrc (packet, data->source->ssrc); gst_rtcp_packet_fb_set_media_ssrc (packet, source->ssrc); - nacks = rtp_source_get_nacks (source, &n_nacks); - GST_DEBUG ("%u NACKs", n_nacks); - if (!gst_rtcp_packet_fb_set_fci_length (packet, n_nacks)) + if (!gst_rtcp_packet_fb_set_fci_length (packet, 1)) { + gst_rtcp_packet_remove (packet); + GST_WARNING ("no nacks fit in the packet"); return; + } fci_data = gst_rtcp_packet_fb_get_fci (packet); - for (i = 0; i < n_nacks; i++) { - GST_WRITE_UINT32_BE (fci_data, nacks[i]); + for (i = 0; i < n_nacks; i = nacked_seqnums) { + guint16 seqnum = nacks[i]; + guint16 blp = 0; + guint j; + + if (!gst_rtcp_packet_fb_set_fci_length (packet, n_fb_nacks + 1)) + break; + + n_fb_nacks++; + nacked_seqnums++; + + for (j = i + 1; j < n_nacks; j++) { + gint diff; + + diff = gst_rtp_buffer_compare_seqnum (seqnum, nacks[j]); + GST_TRACE ("[%u][%u] %u %u diff %i", i, j, seqnum, nacks[j], diff); + if (diff > 16) + break; + + blp |= 1 << (diff - 1); + nacked_seqnums++; + } + + GST_WRITE_UINT32_BE (fci_data, seqnum << 16 | blp); fci_data += 4; - data->nacked_seqnums++; } - rtp_source_clear_nacks (source); + GST_DEBUG ("Sent %u seqnums into %u FB NACKs", nacked_seqnums, n_fb_nacks); + source->stats.sent_nack_count += n_fb_nacks; + +done: + data->nacked_seqnums += nacked_seqnums; + rtp_source_clear_nacks (source, nacked_seqnums); data->may_suppress = FALSE; - source->stats.sent_nack_count += n_nacks; } /* perform cleanup of sources that timed out */ @@ -3550,8 +3761,8 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data) /* check for outdated collisions */ if (source->internal) { GST_DEBUG ("Timing out collisions for %x", source->ssrc); - rtp_source_timeout (source, data->current_time, - data->running_time - sess->rtcp_feedback_retention_window); + rtp_source_timeout (source, data->current_time, data->running_time, + sess->rtcp_feedback_retention_window); } /* nothing else to do when without RTCP */ @@ -3773,8 +3984,8 @@ is_rtcp_time (RTPSession * sess, GstClockTime current_time, ReportData * data) else data->is_early = FALSE; - if (data->is_early && sess->next_early_rtcp_time < current_time) { - GST_DEBUG ("early feedback %" GST_TIME_FORMAT " < now %" + if (data->is_early && sess->next_early_rtcp_time <= current_time) { + GST_DEBUG ("early feedback %" GST_TIME_FORMAT " <= now %" GST_TIME_FORMAT, GST_TIME_ARGS (sess->next_early_rtcp_time), GST_TIME_ARGS (current_time)); } else if (sess->next_rtcp_check_time == GST_CLOCK_TIME_NONE || @@ -3889,6 +4100,12 @@ generate_rtcp (const gchar * key, RTPSource * source, ReportData * data) if (sess->scheduled_bye && !source->marked_bye) return; + /* skip if RTCP is disabled */ + if (source->disable_rtcp) { + GST_DEBUG ("source %08x has RTCP disabled", source->ssrc); + return; + } + data->source = source; /* open packet */ @@ -3950,17 +4167,43 @@ update_generation (const gchar * key, RTPSource * source, ReportData * data) } } +static void +schedule_remaining_nacks (const gchar * key, RTPSource * source, + ReportData * data) +{ + RTPSession *sess = data->sess; + GstClockTime *nack_deadlines; + GstClockTime deadline; + guint n_nacks; + + if (!source->send_nack) + return; + + /* the scheduling is entirely based on available bandwidth, just take the + * biggest seqnum, which will have the largest deadline to request early + * RTCP. */ + nack_deadlines = rtp_source_get_nack_deadlines (source, &n_nacks); + deadline = nack_deadlines[n_nacks - 1]; + RTP_SESSION_UNLOCK (sess); + rtp_session_send_rtcp_with_deadline (sess, deadline); + RTP_SESSION_LOCK (sess); +} + static gboolean rtp_session_are_all_sources_bye (RTPSession * sess) { GHashTableIter iter; RTPSource *src; + RTP_SESSION_LOCK (sess); g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]); while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) { - if (src->internal && !src->sent_bye) + if (src->internal && !src->sent_bye) { + RTP_SESSION_UNLOCK (sess); return FALSE; + } } + RTP_SESSION_UNLOCK (sess); return TRUE; } @@ -4147,6 +4390,12 @@ done: if (all_empty) GST_ERROR ("generated empty RTCP messages for all the sources"); + /* schedule remaining nacks */ + RTP_SESSION_LOCK (sess); + g_hash_table_foreach (sess->ssrcs[sess->mask_idx], + (GHFunc) schedule_remaining_nacks, &data); + RTP_SESSION_UNLOCK (sess); + return result; } @@ -4319,6 +4568,35 @@ end: } static gboolean +rtp_session_send_rtcp_internal (RTPSession * sess, GstClockTime now, + GstClockTime max_delay) +{ + /* notify the application that we intend to send early RTCP */ + if (sess->callbacks.notify_early_rtcp) + sess->callbacks.notify_early_rtcp (sess, sess->notify_early_rtcp_user_data); + + return rtp_session_request_early_rtcp (sess, now, max_delay); +} + +static gboolean +rtp_session_send_rtcp_with_deadline (RTPSession * sess, GstClockTime deadline) +{ + GstClockTime now, max_delay; + + if (!sess->callbacks.send_rtcp) + return FALSE; + + now = sess->callbacks.request_time (sess, sess->request_time_user_data); + + if (deadline < now) + return FALSE; + + max_delay = deadline - now; + + return rtp_session_send_rtcp_internal (sess, now, max_delay); +} + +static gboolean rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay) { GstClockTime now; @@ -4328,7 +4606,7 @@ rtp_session_send_rtcp (RTPSession * sess, GstClockTime max_delay) now = sess->callbacks.request_time (sess, sess->request_time_user_data); - return rtp_session_request_early_rtcp (sess, now, max_delay); + return rtp_session_send_rtcp_internal (sess, now, max_delay); } gboolean @@ -4337,11 +4615,6 @@ rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc, { RTPSource *src; - if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) { - GST_DEBUG ("FIR/PLI not sent"); - return FALSE; - } - RTP_SESSION_LOCK (sess); src = find_source (sess, ssrc); if (src == NULL) @@ -4359,6 +4632,10 @@ rtp_session_request_key_unit (RTPSession * sess, guint32 ssrc, } RTP_SESSION_UNLOCK (sess); + if (!rtp_session_send_rtcp (sess, 5 * GST_SECOND)) { + GST_DEBUG ("FIR/PLI not sent early, sending with next regular RTCP"); + } + return TRUE; /* ERRORS */ @@ -4385,21 +4662,27 @@ rtp_session_request_nack (RTPSession * sess, guint32 ssrc, guint16 seqnum, GstClockTime max_delay) { RTPSource *source; + GstClockTime now; - if (!rtp_session_send_rtcp (sess, max_delay)) { - GST_DEBUG ("NACK not sent"); + if (!sess->callbacks.send_rtcp) return FALSE; - } + + now = sess->callbacks.request_time (sess, sess->request_time_user_data); RTP_SESSION_LOCK (sess); source = find_source (sess, ssrc); if (source == NULL) goto no_source; - GST_DEBUG ("request NACK for %08x, #%u", ssrc, seqnum); - rtp_source_register_nack (source, seqnum); + GST_DEBUG ("request NACK for SSRC %08x, #%u, deadline %" GST_TIME_FORMAT, + ssrc, seqnum, GST_TIME_ARGS (now + max_delay)); + rtp_source_register_nack (source, seqnum, now + max_delay); RTP_SESSION_UNLOCK (sess); + if (!rtp_session_send_rtcp_internal (sess, now, max_delay)) { + GST_DEBUG ("NACK not sent early, sending with next regular RTCP"); + } + return TRUE; /* ERRORS */