X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst%2Frtpmanager%2Frtpsource.c;h=7079e168a73e0606e979826aa7d97149930db38b;hb=0de3ebc5b26b3e3404386db6e058b42150ad7181;hp=fbe1f4ce37c69efa61a2edf2c919524620a9e632;hpb=439e2f1cfd1d54842998b2f963a13ba498b1cd7a;p=platform%2Fupstream%2Fgst-plugins-good.git diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c index fbe1f4c..7079e16 100644 --- a/gst/rtpmanager/rtpsource.c +++ b/gst/rtpmanager/rtpsource.c @@ -1,5 +1,7 @@ /* GStreamer * Copyright (C) <2007> Wim Taymans + * Copyright (C) 2015 Kurento (http://kurento.org/) + * @author: Miguel París * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -13,8 +15,8 @@ * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. */ #include @@ -39,6 +41,10 @@ enum #define DEFAULT_IS_VALIDATED FALSE #define DEFAULT_IS_SENDER FALSE #define DEFAULT_SDES NULL +#define DEFAULT_PROBATION RTP_DEFAULT_PROBATION +#define DEFAULT_MAX_DROPOUT_TIME 60000 +#define DEFAULT_MAX_MISORDER_TIME 2000 +#define DEFAULT_DISABLE_RTCP FALSE enum { @@ -49,7 +55,10 @@ enum PROP_IS_SENDER, PROP_SDES, PROP_STATS, - PROP_LAST + PROP_PROBATION, + PROP_MAX_DROPOUT_TIME, + PROP_MAX_MISORDER_TIME, + PROP_DISABLE_RTCP }; /* GObject vmethods */ @@ -101,7 +110,7 @@ rtp_source_class_init (RTPSourceClass * klass) * The current SDES items of the source. Returns a structure with name * application/x-rtp-source-sdes and may contain the following fields: * - * 'cname' G_TYPE_STRING : The canonical name + * 'cname' G_TYPE_STRING : The canonical name in the form user@host * 'name' G_TYPE_STRING : The user name * 'email' G_TYPE_STRING : The user's electronic mail address * 'phone' G_TYPE_STRING : The user's phone number @@ -109,7 +118,7 @@ rtp_source_class_init (RTPSourceClass * klass) * 'tool' G_TYPE_STRING : The name of application or tool * 'note' G_TYPE_STRING : A notice about the source * - * other fields may be present and these represent private items in + * Other fields may be present and these represent private items in * the SDES where the field name is the prefix. */ g_object_class_install_property (gobject_class, PROP_SDES, @@ -120,25 +129,37 @@ rtp_source_class_init (RTPSourceClass * klass) /** * RTPSource::stats * - * The statistics of the source. This property returns a GstStructure with - * name application/x-rtp-source-stats with the following fields: + * This property returns a GstStructure named application/x-rtp-source-stats with + * fields useful for statistics and diagnostics. * - * "ssrc" G_TYPE_UINT The SSRC of this source - * "internal" G_TYPE_BOOLEAN If this source is the source of the session - * "validated" G_TYPE_BOOLEAN If the source is validated - * "received-bye" G_TYPE_BOOLEAN If we received a BYE from this source - * "is-csrc" G_TYPE_BOOLEAN If this source was found as CSRC - * "is-sender" G_TYPE_BOOLEAN If this source is a sender + * Take note of each respective field's units: + * + * - NTP times are in the appropriate 32-bit or 64-bit fixed-point format + * starting from January 1, 1970 (except for timespans). + * - RTP times are in clock rate units (i.e. clock rate = 1 second) + * starting at a random offset. + * - For fields indicating packet loss, note that late packets are not considered lost, + * and duplicates are not taken into account. Hence, the loss may be negative + * if there are duplicates. + * + * The following fields are always present. + * + * "ssrc" G_TYPE_UINT the SSRC of this source + * "internal" G_TYPE_BOOLEAN this source is a source of the session + * "validated" G_TYPE_BOOLEAN the source is validated + * "received-bye" G_TYPE_BOOLEAN we received a BYE from this source + * "is-csrc" G_TYPE_BOOLEAN this source was found as CSRC + * "is-sender" G_TYPE_BOOLEAN this source is a sender * "seqnum-base" G_TYPE_INT first seqnum if known * "clock-rate" G_TYPE_INT the clock rate of the media * - * The following two fields are only present when known. + * The following fields are only present when known. * * "rtp-from" G_TYPE_STRING where we received the last RTP packet from * "rtcp-from" G_TYPE_STRING where we received the last RTCP packet from * * The following fields make sense for internal sources and will only increase - * when "is-sender" is TRUE: + * when "is-sender" is TRUE. * * "octets-sent" G_TYPE_UINT64 number of bytes we sent * "packets-sent" G_TYPE_UINT64 number of packets we sent @@ -152,15 +173,15 @@ rtp_source_class_init (RTPSourceClass * klass) * Following fields are updated when "is-sender" is TRUE. * * "bitrate" G_TYPE_UINT64 bitrate in bits per second - * "jitter" G_TYPE_UINT estimated jitter + * "jitter" G_TYPE_UINT estimated jitter (in clock rate units) * "packets-lost" G_TYPE_INT estimated amount of packets lost * * The last SR report this source sent. This only updates when "is-sender" is * TRUE. * * "have-sr" G_TYPE_BOOLEAN the source has sent SR - * "sr-ntptime" G_TYPE_UINT64 ntptime of SR - * "sr-rtptime" G_TYPE_UINT rtptime of SR + * "sr-ntptime" G_TYPE_UINT64 NTP time of SR (in NTP Timestamp Format, 32.32 fixed point) + * "sr-rtptime" G_TYPE_UINT RTP time of SR (in clock rate units) * "sr-octet-count" G_TYPE_UINT the number of bytes in the SR * "sr-packet-count" G_TYPE_UINT the number of packets in the SR * @@ -169,36 +190,65 @@ rtp_source_class_init (RTPSourceClass * klass) * These values are only updated when the source is sending. * * "sent-rb" G_TYPE_BOOLEAN we have sent an RB - * "sent-rb-fractionlost" G_TYPE_UINT calculated lost fraction + * "sent-rb-fractionlost" G_TYPE_UINT calculated lost 8-bit fraction * "sent-rb-packetslost" G_TYPE_INT lost packets * "sent-rb-exthighestseq" G_TYPE_UINT last seen seqnum - * "sent-rb-jitter" G_TYPE_UINT jitter - * "sent-rb-lsr" G_TYPE_UINT last SR time - * "sent-rb-dlsr" G_TYPE_UINT delay since last SR + * "sent-rb-jitter" G_TYPE_UINT jitter (in clock rate units) + * "sent-rb-lsr" G_TYPE_UINT last SR time (seconds in NTP Short Format, 16.16 fixed point) + * "sent-rb-dlsr" G_TYPE_UINT delay since last SR (seconds in NTP Short Format, 16.16 fixed point) * * The following fields are only present for non-internal sources and * represents the last RB that this source sent. This is only updated * when the source is receiving data and sending RB blocks. * * "have-rb" G_TYPE_BOOLEAN the source has sent RB - * "rb-fractionlost" G_TYPE_UINT lost fraction + * "rb-fractionlost" G_TYPE_UINT lost 8-bit fraction * "rb-packetslost" G_TYPE_INT lost packets * "rb-exthighestseq" G_TYPE_UINT highest received seqnum - * "rb-jitter" G_TYPE_UINT reception jitter - * "rb-lsr" G_TYPE_UINT last SR time - * "rb-dlsr" G_TYPE_UINT delay since last SR + * "rb-jitter" G_TYPE_UINT reception jitter (in clock rate units) + * "rb-lsr" G_TYPE_UINT last SR time (seconds in NTP Short Format, 16.16 fixed point) + * "rb-dlsr" G_TYPE_UINT delay since last SR (seconds in NTP Short Format, 16.16 fixed point) * - * The round trip of this source. This is calculated from the last RB - * values and the recption time of the last RB packet. Only present for + * The round trip of this source is calculated from the last RB + * values and the reception time of the last RB packet. It is only present for * non-internal sources. * - * "rb-round-trip" G_TYPE_UINT the round trip time in nanoseconds + * "rb-round-trip" G_TYPE_UINT the round-trip time (seconds in NTP Short Format, 16.16 fixed point) + * */ g_object_class_install_property (gobject_class, PROP_STATS, g_param_spec_boxed ("stats", "Stats", "The stats of this source", GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_PROBATION, + g_param_spec_uint ("probation", "Number of probations", + "Consecutive packet sequence numbers to accept the source", + 0, G_MAXUINT, DEFAULT_PROBATION, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME, + g_param_spec_uint ("max-dropout-time", "Max dropout time", + "The maximum time (milliseconds) of missing packets tolerated.", + 0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME, + g_param_spec_uint ("max-misorder-time", "Max misorder time", + "The maximum time (milliseconds) of misordered packets tolerated.", + 0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + /** + * RTPSession::disable-rtcp: + * + * Allow disabling the sending of RTCP packets for this source. + */ + g_object_class_install_property (gobject_class, PROP_DISABLE_RTCP, + g_param_spec_boolean ("disable-rtcp", "Disable RTCP", + "Disable sending RTCP packets for this source", + DEFAULT_DISABLE_RTCP, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + GST_DEBUG_CATEGORY_INIT (rtp_source_debug, "rtpsource", 0, "RTP Source"); } @@ -211,48 +261,83 @@ rtp_source_class_init (RTPSourceClass * klass) void rtp_source_reset (RTPSource * src) { - src->received_bye = FALSE; + src->marked_bye = FALSE; + if (src->bye_reason) + g_free (src->bye_reason); + src->bye_reason = NULL; + src->sent_bye = FALSE; + g_hash_table_remove_all (src->reported_in_sr_of); + g_queue_foreach (src->retained_feedback, (GFunc) gst_buffer_unref, NULL); + g_queue_clear (src->retained_feedback); + src->last_rtptime = -1; src->stats.cycles = -1; src->stats.jitter = 0; src->stats.transit = -1; src->stats.curr_sr = 0; + src->stats.sr[0].is_valid = FALSE; src->stats.curr_rr = 0; + src->stats.rr[0].is_valid = FALSE; + src->stats.prev_rtptime = GST_CLOCK_TIME_NONE; + src->stats.prev_rtcptime = GST_CLOCK_TIME_NONE; + src->stats.last_rtptime = GST_CLOCK_TIME_NONE; + src->stats.last_rtcptime = GST_CLOCK_TIME_NONE; + g_array_set_size (src->nacks, 0); + + src->stats.sent_pli_count = 0; + src->stats.sent_fir_count = 0; + src->stats.sent_nack_count = 0; + src->stats.recv_nack_count = 0; } static void rtp_source_init (RTPSource * src) { - /* sources are initialy on probation until we receive enough valid RTP + /* sources are initially on probation until we receive enough valid RTP * packets or a valid RTCP packet */ src->validated = FALSE; src->internal = FALSE; - src->probation = RTP_DEFAULT_PROBATION; + src->probation = DEFAULT_PROBATION; + src->curr_probation = src->probation; src->closing = FALSE; + src->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME; + src->max_misorder_time = DEFAULT_MAX_MISORDER_TIME; src->sdes = gst_structure_new_empty ("application/x-rtp-source-sdes"); src->payload = -1; src->clock_rate = -1; src->packets = g_queue_new (); - src->seqnum_base = -1; - src->last_rtptime = -1; + src->seqnum_offset = -1; src->retained_feedback = g_queue_new (); + src->nacks = g_array_new (FALSE, FALSE, sizeof (guint16)); + src->nack_deadlines = g_array_new (FALSE, FALSE, sizeof (GstClockTime)); + + src->reported_in_sr_of = g_hash_table_new (g_direct_hash, g_direct_equal); + + src->last_keyframe_request = GST_CLOCK_TIME_NONE; rtp_source_reset (src); + + src->pt_set = FALSE; +} + +void +rtp_conflicting_address_free (RTPConflictingAddress * addr) +{ + g_object_unref (addr->address); + g_slice_free (RTPConflictingAddress, addr); } static void rtp_source_finalize (GObject * object) { RTPSource *src; - GstBuffer *buffer; src = RTP_SOURCE_CAST (object); - while ((buffer = g_queue_pop_head (src->packets))) - gst_buffer_unref (buffer); + g_queue_foreach (src->packets, (GFunc) gst_buffer_unref, NULL); g_queue_free (src->packets); gst_structure_free (src->sdes); @@ -261,13 +346,21 @@ rtp_source_finalize (GObject * object) gst_caps_replace (&src->caps, NULL); - g_list_foreach (src->conflicting_addresses, (GFunc) g_free, NULL); - g_list_free (src->conflicting_addresses); - - while ((buffer = g_queue_pop_head (src->retained_feedback))) - gst_buffer_unref (buffer); + g_list_free_full (src->conflicting_addresses, + (GDestroyNotify) rtp_conflicting_address_free); + g_queue_foreach (src->retained_feedback, (GFunc) gst_buffer_unref, NULL); g_queue_free (src->retained_feedback); + g_array_free (src->nacks, TRUE); + g_array_free (src->nack_deadlines, TRUE); + + if (src->rtp_from) + g_object_unref (src->rtp_from); + if (src->rtcp_from) + g_object_unref (src->rtcp_from); + + g_hash_table_unref (src->reported_in_sr_of); + G_OBJECT_CLASS (rtp_source_parent_class)->finalize (object); } @@ -277,7 +370,7 @@ rtp_source_create_stats (RTPSource * src) GstStructure *s; gboolean is_sender = src->is_sender; gboolean internal = src->internal; - gchar address_str[GST_NETADDRESS_MAX_LEN]; + gchar *address_str; gboolean have_rb; guint8 fractionlost = 0; gint32 packetslost = 0; @@ -299,22 +392,22 @@ rtp_source_create_stats (RTPSource * src) "ssrc", G_TYPE_UINT, (guint) src->ssrc, "internal", G_TYPE_BOOLEAN, internal, "validated", G_TYPE_BOOLEAN, src->validated, - "received-bye", G_TYPE_BOOLEAN, src->received_bye, + "received-bye", G_TYPE_BOOLEAN, src->marked_bye, "is-csrc", G_TYPE_BOOLEAN, src->is_csrc, "is-sender", G_TYPE_BOOLEAN, is_sender, - "seqnum-base", G_TYPE_INT, src->seqnum_base, + "seqnum-base", G_TYPE_INT, src->seqnum_offset, "clock-rate", G_TYPE_INT, src->clock_rate, NULL); /* add address and port */ - if (src->have_rtp_from) { - gst_net_address_to_string (&src->rtp_from, address_str, - sizeof (address_str)); + if (src->rtp_from) { + address_str = __g_socket_address_to_string (src->rtp_from); gst_structure_set (s, "rtp-from", G_TYPE_STRING, address_str, NULL); + g_free (address_str); } - if (src->have_rtcp_from) { - gst_net_address_to_string (&src->rtcp_from, address_str, - sizeof (address_str)); + if (src->rtcp_from) { + address_str = __g_socket_address_to_string (src->rtcp_from); gst_structure_set (s, "rtcp-from", G_TYPE_STRING, address_str, NULL); + g_free (address_str); } gst_structure_set (s, @@ -325,7 +418,13 @@ rtp_source_create_stats (RTPSource * src) "bitrate", G_TYPE_UINT64, src->bitrate, "packets-lost", G_TYPE_INT, (gint) rtp_stats_get_packets_lost (&src->stats), "jitter", G_TYPE_UINT, - (guint) (src->stats.jitter >> 4), NULL); + (guint) (src->stats.jitter >> 4), + "sent-pli-count", G_TYPE_UINT, src->stats.sent_pli_count, + "recv-pli-count", G_TYPE_UINT, src->stats.recv_pli_count, + "sent-fir-count", G_TYPE_UINT, src->stats.sent_fir_count, + "recv-fir-count", G_TYPE_UINT, src->stats.recv_fir_count, + "sent-nack-count", G_TYPE_UINT, src->stats.sent_nack_count, + "recv-nack-count", G_TYPE_UINT, src->stats.recv_nack_count, NULL); /* get the last SR. */ have_sr = rtp_source_get_last_sr (src, &time, &ntptime, &rtptime, @@ -404,7 +503,7 @@ sdes_struct_compare_func (GQuark field_id, const GValue * value, } /** - * rtp_source_set_sdes: + * rtp_source_set_sdes_struct: * @src: an #RTPSource * @sdes: the SDES structure * @@ -432,7 +531,6 @@ rtp_source_set_sdes_struct (RTPSource * src, GstStructure * sdes) } else { gst_structure_free (sdes); } - return changed; } @@ -448,6 +546,18 @@ rtp_source_set_property (GObject * object, guint prop_id, case PROP_SSRC: src->ssrc = g_value_get_uint (value); break; + case PROP_PROBATION: + src->probation = g_value_get_uint (value); + break; + case PROP_MAX_DROPOUT_TIME: + src->max_dropout_time = g_value_get_uint (value); + break; + case PROP_MAX_MISORDER_TIME: + src->max_misorder_time = g_value_get_uint (value); + break; + case PROP_DISABLE_RTCP: + src->disable_rtcp = g_value_get_boolean (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -481,6 +591,18 @@ rtp_source_get_property (GObject * object, guint prop_id, case PROP_STATS: g_value_take_boxed (value, rtp_source_create_stats (src)); break; + case PROP_PROBATION: + g_value_set_uint (value, src->probation); + break; + case PROP_MAX_DROPOUT_TIME: + g_value_set_uint (value, src->max_dropout_time); + break; + case PROP_MAX_MISORDER_TIME: + g_value_set_uint (value, src->max_misorder_time); + break; + case PROP_DISABLE_RTCP: + g_value_set_boolean (value, src->disable_rtcp); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -642,21 +764,21 @@ rtp_source_is_sender (RTPSource * src) } /** - * rtp_source_received_bye: + * rtp_source_is_marked_bye: * @src: an #RTPSource * - * Check if @src has receoved a BYE packet. + * Check if @src is marked as leaving the session with a BYE packet. * - * Returns: %TRUE if @src has received a BYE packet. + * Returns: %TRUE if @src has been marked BYE. */ gboolean -rtp_source_received_bye (RTPSource * src) +rtp_source_is_marked_bye (RTPSource * src) { gboolean result; g_return_val_if_fail (RTP_IS_SOURCE (src), FALSE); - result = src->received_bye; + result = RTP_SOURCE_IS_MARKED_BYE (src); return result; } @@ -666,11 +788,11 @@ rtp_source_received_bye (RTPSource * src) * rtp_source_get_bye_reason: * @src: an #RTPSource * - * Get the BYE reason for @src. Check if the source receoved a BYE message first - * with rtp_source_received_bye(). + * Get the BYE reason for @src. Check if the source is marked as leaving the + * session with a BYE message first with rtp_source_is_marked_bye(). * - * Returns: The BYE reason or NULL when no reason was given or the source did - * not receive a BYE message yet. g_fee() after usage. + * Returns: The BYE reason or NULL when no reason was given or the source was + * not marked BYE yet. g_free() after usage. */ gchar * rtp_source_get_bye_reason (RTPSource * src) @@ -697,6 +819,7 @@ rtp_source_update_caps (RTPSource * src, GstCaps * caps) GstStructure *s; guint val; gint ival; + gboolean rtx; /* nothing changed, return */ if (caps == NULL || src->caps == caps) @@ -704,11 +827,14 @@ rtp_source_update_caps (RTPSource * src, GstCaps * caps) s = gst_caps_get_structure (caps, 0); - if (gst_structure_get_int (s, "payload", &ival)) + rtx = (gst_structure_get_uint (s, "rtx-ssrc", &val) && val == src->ssrc); + + if (gst_structure_get_int (s, rtx ? "rtx-payload" : "payload", &ival)) src->payload = ival; else src->payload = -1; - GST_DEBUG ("got payload %d", src->payload); + + GST_DEBUG ("got %spayload %d", rtx ? "rtx " : "", src->payload); if (gst_structure_get_int (s, "clock-rate", &ival)) src->clock_rate = ival; @@ -717,86 +843,19 @@ rtp_source_update_caps (RTPSource * src, GstCaps * caps) GST_DEBUG ("got clock-rate %d", src->clock_rate); - if (gst_structure_get_uint (s, "seqnum-base", &val)) - src->seqnum_base = val; + if (gst_structure_get_uint (s, rtx ? "rtx-seqnum-offset" : "seqnum-offset", + &val)) + src->seqnum_offset = val; else - src->seqnum_base = -1; + src->seqnum_offset = -1; - GST_DEBUG ("got seqnum-base %" G_GINT32_FORMAT, src->seqnum_base); + GST_DEBUG ("got %sseqnum-offset %" G_GINT32_FORMAT, rtx ? "rtx " : "", + src->seqnum_offset); gst_caps_replace (&src->caps, caps); } /** - * rtp_source_set_sdes_string: - * @src: an #RTPSource - * @type: the type of the SDES item - * @data: the SDES data - * - * Store an SDES item of @type in @src. - * - * Returns: %FALSE if the SDES item was unchanged or @type is unknown. - */ -gboolean -rtp_source_set_sdes_string (RTPSource * src, GstRTCPSDESType type, - const gchar * data) -{ - const gchar *old; - const gchar *field; - - field = gst_rtcp_sdes_type_to_name (type); - - if (gst_structure_has_field (src->sdes, field)) - old = gst_structure_get_string (src->sdes, field); - else - old = NULL; - - if (old == NULL && data == NULL) - return FALSE; - - if (old != NULL && data != NULL && strcmp (old, data) == 0) - return FALSE; - - if (data == NULL) - gst_structure_remove_field (src->sdes, field); - else - gst_structure_set (src->sdes, field, G_TYPE_STRING, data, NULL); - - return TRUE; -} - -/** - * rtp_source_get_sdes_string: - * @src: an #RTPSource - * @type: the type of the SDES item - * - * Get the SDES item of @type from @src. - * - * Returns: a null-terminated copy of the SDES item or NULL when @type was not - * valid or the SDES item was unset. g_free() after usage. - */ -gchar * -rtp_source_get_sdes_string (RTPSource * src, GstRTCPSDESType type) -{ - gchar *result; - const gchar *type_name; - - g_return_val_if_fail (RTP_IS_SOURCE (src), NULL); - - if (type < 0 || type > GST_RTCP_SDES_PRIV - 1) - return NULL; - - type_name = gst_rtcp_sdes_type_to_name (type); - - if (!gst_structure_has_field (src->sdes, type_name)) - return NULL; - - result = g_strdup (gst_structure_get_string (src->sdes, type_name)); - - return result; -} - -/** * rtp_source_set_rtp_from: * @src: an #RTPSource * @address: the RTP address to set @@ -805,12 +864,13 @@ rtp_source_get_sdes_string (RTPSource * src, GstRTCPSDESType type) * collistion checking. */ void -rtp_source_set_rtp_from (RTPSource * src, GstNetAddress * address) +rtp_source_set_rtp_from (RTPSource * src, GSocketAddress * address) { g_return_if_fail (RTP_IS_SOURCE (src)); - src->have_rtp_from = TRUE; - memcpy (&src->rtp_from, address, sizeof (GstNetAddress)); + if (src->rtp_from) + g_object_unref (src->rtp_from); + src->rtp_from = G_SOCKET_ADDRESS (g_object_ref (address)); } /** @@ -822,12 +882,13 @@ rtp_source_set_rtp_from (RTPSource * src, GstNetAddress * address) * collistion checking. */ void -rtp_source_set_rtcp_from (RTPSource * src, GstNetAddress * address) +rtp_source_set_rtcp_from (RTPSource * src, GSocketAddress * address) { g_return_if_fail (RTP_IS_SOURCE (src)); - src->have_rtcp_from = TRUE; - memcpy (&src->rtcp_from, address, sizeof (GstNetAddress)); + if (src->rtcp_from) + g_object_unref (src->rtcp_from); + src->rtcp_from = G_SOCKET_ADDRESS (g_object_ref (address)); } static GstFlowReturn @@ -879,6 +940,7 @@ get_clock_rate (RTPSource * src, guint8 payload) GST_DEBUG ("got clock-rate %d", clock_rate); src->clock_rate = clock_rate; + gst_rtp_packet_rate_ctx_reset (&src->packet_rate_ctx, clock_rate); } return src->clock_rate; } @@ -889,32 +951,27 @@ get_clock_rate (RTPSource * src, guint8 payload) * 50 milliseconds apart and arrive 60 milliseconds apart, then the jitter is 10 * milliseconds. */ static void -calculate_jitter (RTPSource * src, GstBuffer * buffer, - RTPArrivalStats * arrival) +calculate_jitter (RTPSource * src, RTPPacketInfo * pinfo) { GstClockTime running_time; guint32 rtparrival, transit, rtptime; gint32 diff; gint clock_rate; guint8 pt; - GstRTPBuffer rtp = { NULL }; /* get arrival time */ - if ((running_time = arrival->running_time) == GST_CLOCK_TIME_NONE) + if ((running_time = pinfo->running_time) == GST_CLOCK_TIME_NONE) goto no_time; - gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp); - pt = gst_rtp_buffer_get_payload_type (&rtp); + pt = pinfo->pt; GST_LOG ("SSRC %08x got payload %d", src->ssrc, pt); /* get clockrate */ - if ((clock_rate = get_clock_rate (src, pt)) == -1) { - gst_rtp_buffer_unmap (&rtp); + if ((clock_rate = get_clock_rate (src, pt)) == -1) goto no_clock_rate; - } - rtptime = gst_rtp_buffer_get_timestamp (&rtp); + rtptime = pinfo->rtptime; /* convert arrival time to RTP timestamp units, truncate to 32 bits, we don't * care about the absolute value, just the difference. */ @@ -943,7 +1000,6 @@ calculate_jitter (RTPSource * src, GstBuffer * buffer, GST_LOG ("rtparrival %u, rtptime %u, clock-rate %d, diff %d, jitter: %f", rtparrival, rtptime, clock_rate, diff, (src->stats.jitter) / 16.0); - gst_rtp_buffer_unmap (&rtp); return; /* ERRORS */ @@ -971,6 +1027,8 @@ init_seq (RTPSource * src, guint16 seq) src->stats.bytes_received = 0; src->stats.prev_received = 0; src->stats.prev_expected = 0; + src->stats.recv_pli_count = 0; + src->stats.recv_fir_count = 0; GST_DEBUG ("base_seq %d", seq); } @@ -1009,244 +1067,266 @@ do_bitrate_estimation (RTPSource * src, GstClockTime running_time, } } -/** - * rtp_source_process_rtp: - * @src: an #RTPSource - * @buffer: an RTP buffer - * - * Let @src handle the incomming RTP @buffer. - * - * Returns: a #GstFlowReturn. - */ -GstFlowReturn -rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer, - RTPArrivalStats * arrival) +static gboolean +update_receiver_stats (RTPSource * src, RTPPacketInfo * pinfo, + gboolean is_receive) { - GstFlowReturn result = GST_FLOW_OK; - guint16 seqnr, udelta; + guint16 seqnr, expected; RTPSourceStats *stats; - guint16 expected; - GstRTPBuffer rtp = { NULL }; - - g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR); - g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); + gint16 delta; + gint32 packet_rate, max_dropout, max_misorder; stats = &src->stats; - gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp); - seqnr = gst_rtp_buffer_get_seq (&rtp); - gst_rtp_buffer_unmap (&rtp); + seqnr = pinfo->seqnum; + + packet_rate = + gst_rtp_packet_rate_ctx_update (&src->packet_rate_ctx, pinfo->seqnum, + pinfo->rtptime); + max_dropout = + gst_rtp_packet_rate_ctx_get_max_dropout (&src->packet_rate_ctx, + src->max_dropout_time); + max_misorder = + gst_rtp_packet_rate_ctx_get_max_misorder (&src->packet_rate_ctx, + src->max_misorder_time); + GST_TRACE ("SSRC %08x, packet_rate: %d, max_dropout: %d, max_misorder: %d", + src->ssrc, packet_rate, max_dropout, max_misorder); if (stats->cycles == -1) { - GST_DEBUG ("received first buffer"); + GST_DEBUG ("received first packet"); /* first time we heard of this source */ init_seq (src, seqnr); src->stats.max_seq = seqnr - 1; - src->probation = RTP_DEFAULT_PROBATION; + src->curr_probation = src->probation; } - udelta = seqnr - stats->max_seq; - - /* if we are still on probation, check seqnum */ - if (src->probation) { + if (is_receive) { expected = src->stats.max_seq + 1; - - /* when in probation, we require consecutive seqnums */ - if (seqnr == expected) { - /* expected packet */ - GST_DEBUG ("probation: seqnr %d == expected %d", seqnr, expected); - src->probation--; - src->stats.max_seq = seqnr; - if (src->probation == 0) { - GST_DEBUG ("probation done!"); + delta = gst_rtp_buffer_compare_seqnum (expected, seqnr); + + /* if we are still on probation, check seqnum */ + if (src->curr_probation) { + /* when in probation, we require consecutive seqnums */ + if (delta == 0) { + /* expected packet */ + GST_DEBUG ("probation: seqnr %d == expected %d", seqnr, expected); + src->curr_probation--; + if (seqnr < stats->max_seq) { + /* sequence number wrapped - count another 64K cycle. */ + stats->cycles += RTP_SEQ_MOD; + } + src->stats.max_seq = seqnr; + + if (src->curr_probation == 0) { + GST_DEBUG ("probation done!"); + init_seq (src, seqnr); + } else { + GstBuffer *q; + + GST_DEBUG ("probation %d: queue packet", src->curr_probation); + /* when still in probation, keep packets in a list. */ + g_queue_push_tail (src->packets, pinfo->data); + pinfo->data = NULL; + /* remove packets from queue if there are too many */ + while (g_queue_get_length (src->packets) > RTP_MAX_PROBATION_LEN) { + q = g_queue_pop_head (src->packets); + gst_buffer_unref (q); + } + goto done; + } + } else { + /* unexpected seqnum in probation + * + * There is no need to clean the queue at this point because the + * invalid packets in the queue are not going to be pushed as we are + * still in probation, and some cleanup will be performed at future + * probation attempts anyway if there are too many old packets in the + * queue. + */ + goto probation_seqnum; + } + } else if (delta >= 0 && delta < max_dropout) { + /* Clear bad packets */ + stats->bad_seq = RTP_SEQ_MOD + 1; /* so seq == bad_seq is false */ + g_queue_foreach (src->packets, (GFunc) gst_buffer_unref, NULL); + g_queue_clear (src->packets); + + /* in order, with permissible gap */ + if (seqnr < stats->max_seq) { + /* sequence number wrapped - count another 64K cycle. */ + stats->cycles += RTP_SEQ_MOD; + } + stats->max_seq = seqnr; + } else if (delta < -max_misorder || delta >= max_dropout) { + /* the sequence number made a very large jump */ + if (seqnr == stats->bad_seq && src->packets->head) { + /* two sequential packets -- assume that the other side + * restarted without telling us so just re-sync + * (i.e., pretend this was the first packet). */ init_seq (src, seqnr); } else { - GstBuffer *q; - - GST_DEBUG ("probation %d: queue buffer", src->probation); - /* when still in probation, keep packets in a list. */ - g_queue_push_tail (src->packets, buffer); - /* remove packets from queue if there are too many */ - while (g_queue_get_length (src->packets) > RTP_MAX_PROBATION_LEN) { - q = g_queue_pop_head (src->packets); - gst_buffer_unref (q); - } - goto done; + /* unacceptable jump */ + stats->bad_seq = (seqnr + 1) & (RTP_SEQ_MOD - 1); + g_queue_foreach (src->packets, (GFunc) gst_buffer_unref, NULL); + g_queue_clear (src->packets); + g_queue_push_tail (src->packets, pinfo->data); + pinfo->data = NULL; + goto bad_sequence; } - } else { - /* unexpected seqnum in probation */ - goto probation_seqnum; - } - } else if (udelta < RTP_MAX_DROPOUT) { - /* in order, with permissible gap */ - if (seqnr < stats->max_seq) { - /* sequence number wrapped - count another 64K cycle. */ - stats->cycles += RTP_SEQ_MOD; - } - stats->max_seq = seqnr; - } else if (udelta <= RTP_SEQ_MOD - RTP_MAX_MISORDER) { - /* the sequence number made a very large jump */ - if (seqnr == stats->bad_seq) { - /* two sequential packets -- assume that the other side - * restarted without telling us so just re-sync - * (i.e., pretend this was the first packet). */ - init_seq (src, seqnr); - } else { - /* unacceptable jump */ - stats->bad_seq = (seqnr + 1) & (RTP_SEQ_MOD - 1); - goto bad_sequence; + } else { /* delta < 0 && delta >= -max_misorder */ + /* Clear bad packets */ + stats->bad_seq = RTP_SEQ_MOD + 1; /* so seq == bad_seq is false */ + g_queue_foreach (src->packets, (GFunc) gst_buffer_unref, NULL); + g_queue_clear (src->packets); + + /* duplicate or reordered packet, will be filtered by jitterbuffer. */ + GST_INFO ("duplicate or reordered packet (seqnr %u, expected %u)", + seqnr, expected); } - } else { - /* duplicate or reordered packet, will be filtered by jitterbuffer. */ - GST_WARNING ("duplicate or reordered packet"); } - src->stats.octets_received += arrival->payload_len; - src->stats.bytes_received += arrival->bytes; - src->stats.packets_received++; + src->stats.octets_received += pinfo->payload_len; + src->stats.bytes_received += pinfo->bytes; + src->stats.packets_received += pinfo->packets; /* for the bitrate estimation */ - src->bytes_received += arrival->payload_len; - /* the source that sent the packet must be a sender */ - src->is_sender = TRUE; - src->validated = TRUE; + src->bytes_received += pinfo->payload_len; - do_bitrate_estimation (src, arrival->running_time, &src->bytes_received); - - GST_LOG ("seq %d, PC: %" G_GUINT64_FORMAT ", OC: %" G_GUINT64_FORMAT, + GST_LOG ("seq %u, PC: %" G_GUINT64_FORMAT ", OC: %" G_GUINT64_FORMAT, seqnr, src->stats.packets_received, src->stats.octets_received); - /* calculate jitter for the stats */ - calculate_jitter (src, buffer, arrival); - - /* we're ready to push the RTP packet now */ - result = push_packet (src, buffer); - -done: - return result; + return TRUE; /* ERRORS */ +done: + { + return FALSE; + } bad_sequence: { - GST_WARNING ("unacceptable seqnum received"); - gst_buffer_unref (buffer); - return GST_FLOW_OK; + GST_WARNING + ("unacceptable seqnum received (seqnr %u, delta %d, packet_rate: %d, max_dropout: %d, max_misorder: %d)", + seqnr, delta, packet_rate, max_dropout, max_misorder); + return FALSE; } probation_seqnum: { - GST_WARNING ("probation: seqnr %d != expected %d", seqnr, expected); - src->probation = RTP_DEFAULT_PROBATION; + GST_WARNING ("probation: seqnr %d != expected %d " + "(SSRC %u curr_probation %i probation %i)", seqnr, expected, src->ssrc, + src->curr_probation, src->probation); + src->curr_probation = src->probation; src->stats.max_seq = seqnr; - gst_buffer_unref (buffer); - return GST_FLOW_OK; + return FALSE; } } /** - * rtp_source_process_bye: + * rtp_source_process_rtp: + * @src: an #RTPSource + * @pinfo: an #RTPPacketInfo + * + * Let @src handle the incoming RTP packet described in @pinfo. + * + * Returns: a #GstFlowReturn. + */ +GstFlowReturn +rtp_source_process_rtp (RTPSource * src, RTPPacketInfo * pinfo) +{ + GstFlowReturn result; + + g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR); + g_return_val_if_fail (pinfo != NULL, GST_FLOW_ERROR); + + if (!update_receiver_stats (src, pinfo, TRUE)) + return GST_FLOW_OK; + + /* the source that sent the packet must be a sender */ + src->is_sender = TRUE; + src->validated = TRUE; + + do_bitrate_estimation (src, pinfo->running_time, &src->bytes_received); + + /* calculate jitter for the stats */ + calculate_jitter (src, pinfo); + + /* we're ready to push the RTP packet now */ + result = push_packet (src, pinfo->data); + pinfo->data = NULL; + + return result; +} + +/** + * rtp_source_mark_bye: * @src: an #RTPSource * @reason: the reason for leaving * - * Notify @src that a BYE packet has been received. This will make the source - * inactive. + * Mark @src in the BYE state. This can happen when the source wants to + * leave the sesssion or when a BYE packets has been received. + * + * This will make the source inactive. */ void -rtp_source_process_bye (RTPSource * src, const gchar * reason) +rtp_source_mark_bye (RTPSource * src, const gchar * reason) { g_return_if_fail (RTP_IS_SOURCE (src)); GST_DEBUG ("marking SSRC %08x as BYE, reason: %s", src->ssrc, GST_STR_NULL (reason)); - /* copy the reason and mark as received_bye */ + /* copy the reason and mark as bye */ g_free (src->bye_reason); src->bye_reason = g_strdup (reason); - src->received_bye = TRUE; -} - -static gboolean -set_ssrc (GstBuffer ** buffer, guint idx, RTPSource * src) -{ - GstRTPBuffer rtp = { NULL }; - - *buffer = gst_buffer_make_writable (*buffer); - gst_rtp_buffer_map (*buffer, GST_MAP_WRITE, &rtp); - gst_rtp_buffer_set_ssrc (&rtp, src->ssrc); - gst_rtp_buffer_unmap (&rtp); - return TRUE; + src->marked_bye = TRUE; } /** * rtp_source_send_rtp: * @src: an #RTPSource - * @data: an RTP buffer or a list of RTP buffers - * @is_list: if @data is a buffer or list - * @running_time: the running time of @data + * @pinfo: an #RTPPacketInfo * - * Send @data (an RTP buffer or list of buffers) originating from @src. - * This will make @src a sender. This function takes ownership of @data and + * Send data (an RTP buffer or buffer list from @pinfo) originating from @src. + * This will make @src a sender. This function takes ownership of the data and * modifies the SSRC in the RTP packet to that of @src when needed. * * Returns: a #GstFlowReturn. */ GstFlowReturn -rtp_source_send_rtp (RTPSource * src, gpointer data, gboolean is_list, - GstClockTime running_time) +rtp_source_send_rtp (RTPSource * src, RTPPacketInfo * pinfo) { GstFlowReturn result; - guint len; + GstClockTime running_time; guint32 rtptime; guint64 ext_rtptime; guint64 rt_diff, rtp_diff; - GstBufferList *list = NULL; - GstBuffer *buffer = NULL; - guint packets; - guint32 ssrc; - GstRTPBuffer rtp = { NULL }; g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR); - g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR); - - if (is_list) { - list = GST_BUFFER_LIST_CAST (data); - - /* We can grab the caps from the first group, since all - * groups of a buffer list have same caps. */ - buffer = gst_buffer_list_get (list, 0); - if (!buffer) - goto no_buffer; - } else { - buffer = GST_BUFFER_CAST (data); - } /* we are a sender now */ src->is_sender = TRUE; - if (is_list) { - gint i; + /* we are also a receiver of our packets */ + if (!update_receiver_stats (src, pinfo, FALSE)) + return GST_FLOW_OK; - /* Each group makes up a network packet. */ - packets = gst_buffer_list_length (list); - for (i = 0, len = 0; i < packets; i++) { - gst_rtp_buffer_map (gst_buffer_list_get (list, i), GST_MAP_READ, &rtp); - len += gst_rtp_buffer_get_payload_len (&rtp); - gst_rtp_buffer_unmap (&rtp); - } - /* subsequent info taken from first list member */ - gst_rtp_buffer_map (gst_buffer_list_get (list, 0), GST_MAP_READ, &rtp); - } else { - packets = 1; - gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp); - len = gst_rtp_buffer_get_payload_len (&rtp); + if (src->pt_set && src->pt != pinfo->pt) { + GST_WARNING ("Changing pt from %u to %u for SSRC %u", src->pt, pinfo->pt, + src->ssrc); } + src->pt = pinfo->pt; + src->pt_set = TRUE; + /* update stats for the SR */ - src->stats.packets_sent += packets; - src->stats.octets_sent += len; - src->bytes_sent += len; + src->stats.packets_sent += pinfo->packets; + src->stats.octets_sent += pinfo->payload_len; + src->bytes_sent += pinfo->payload_len; + + running_time = pinfo->running_time; do_bitrate_estimation (src, running_time, &src->bytes_sent); - rtptime = gst_rtp_buffer_get_timestamp (&rtp); + rtptime = pinfo->rtptime; + ext_rtptime = src->last_rtptime; ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime); @@ -1270,50 +1350,21 @@ rtp_source_send_rtp (RTPSource * src, gpointer data, gboolean is_list, src->last_rtptime = ext_rtptime; /* push packet */ - if (!src->callbacks.push_rtp) { - gst_rtp_buffer_unmap (&rtp); + if (!src->callbacks.push_rtp) goto no_callback; - } - - ssrc = gst_rtp_buffer_get_ssrc (&rtp); - gst_rtp_buffer_unmap (&rtp); - - if (ssrc != src->ssrc) { - /* the SSRC of the packet is not correct, make a writable buffer and - * update the SSRC. This could involve a complete copy of the packet when - * it is not writable. Usually the payloader will use caps negotiation to - * get the correct SSRC from the session manager before pushing anything. */ - - /* FIXME, we don't want to warn yet because we can't inform any payloader - * of the changes SSRC yet because we don't implement pad-alloc. */ - GST_LOG ("updating SSRC from %08x to %08x, fix the payloader", ssrc, - src->ssrc); - if (is_list) { - list = gst_buffer_list_make_writable (list); - gst_buffer_list_foreach (list, (GstBufferListFunc) set_ssrc, src); - } else { - set_ssrc (&buffer, 0, src); - } - } - GST_LOG ("pushing RTP %s %" G_GUINT64_FORMAT, is_list ? "list" : "packet", - src->stats.packets_sent); + GST_LOG ("pushing RTP %s %" G_GUINT64_FORMAT, + pinfo->is_list ? "list" : "packet", src->stats.packets_sent); - result = src->callbacks.push_rtp (src, data, src->user_data); + result = src->callbacks.push_rtp (src, pinfo->data, src->user_data); + pinfo->data = NULL; return result; /* ERRORS */ -no_buffer: - { - GST_WARNING ("no buffers in buffer list"); - gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); - return GST_FLOW_OK; - } no_callback: { GST_WARNING ("no callback installed, dropping packet"); - gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); return GST_FLOW_OK; } } @@ -1322,10 +1373,10 @@ no_callback: * rtp_source_process_sr: * @src: an #RTPSource * @time: time of packet arrival - * @ntptime: the NTP time in 32.32 fixed point - * @rtptime: the RTP time + * @ntptime: the NTP time (in NTP Timestamp Format, 32.32 fixed point) + * @rtptime: the RTP time (in clock rate units) * @packet_count: the packet count - * @octet_count: the octect count + * @octet_count: the octet count * * Update the sender report in @src. */ @@ -1369,11 +1420,13 @@ rtp_source_process_sr (RTPSource * src, GstClockTime time, guint64 ntptime, * @src: an #RTPSource * @ntpnstime: the current time in nanoseconds since 1970 * @fractionlost: fraction lost since last SR/RR - * @packetslost: the cumululative number of packets lost + * @packetslost: the cumulative number of packets lost * @exthighestseq: the extended last sequence number received - * @jitter: the interarrival jitter - * @lsr: the last SR packet from this source - * @dlsr: the delay since last SR packet + * @jitter: the interarrival jitter (in clock rate units) + * @lsr: the time of the last SR packet on this source + * (in NTP Short Format, 16.16 fixed point) + * @dlsr: the delay since the last SR packet + * (in NTP Short Format, 16.16 fixed point) * * Update the report block in @src. */ @@ -1429,17 +1482,17 @@ rtp_source_process_rb (RTPSource * src, guint64 ntpnstime, * rtp_source_get_new_sr: * @src: an #RTPSource * @ntpnstime: the current time in nanoseconds since 1970 - * @running_time: the current running_time of the pipeline. - * @ntptime: the NTP time in 32.32 fixed point - * @rtptime: the RTP time corresponding to @ntptime + * @running_time: the current running_time of the pipeline + * @ntptime: the NTP time (in NTP Timestamp Format, 32.32 fixed point) + * @rtptime: the RTP time corresponding to @ntptime (in clock rate units) * @packet_count: the packet count - * @octet_count: the octect count + * @octet_count: the octet count * * Get new values to put into a new SR report from this source. * * @running_time and @ntpnstime are captured at the same time and represent the * running time of the pipeline clock and the absolute current system time in - * nanoseconds respectively. Together with the last running_time and rtp timestamp + * nanoseconds respectively. Together with the last running_time and RTP timestamp * we have observed in the source, we can generate @ntptime and @rtptime for an SR * packet. @ntptime is basically the fixed point representation of @ntpnstime * and @rtptime the associated RTP timestamp. @@ -1468,6 +1521,12 @@ rtp_source_get_new_sr (RTPSource * src, guint64 ntpnstime, GST_DEBUG ("last_rtime %" GST_TIME_FORMAT ", last_rtptime %" G_GUINT64_FORMAT, GST_TIME_ARGS (src->last_rtime), t_rtp); + if (src->clock_rate == -1 && src->pt_set) { + GST_INFO ("no clock-rate, getting for pt %u and SSRC %u", src->pt, + src->ssrc); + get_clock_rate (src, src->pt); + } + if (src->clock_rate != -1) { /* get the diff between the clock running_time and the buffer running_time. * This is the elapsed time, as measured against the pipeline clock, between @@ -1476,21 +1535,20 @@ rtp_source_get_new_sr (RTPSource * src, guint64 ntpnstime, * We need to apply this diff to the RTP timestamp to get the RTP timestamp * for the given ntpnstime. */ diff = GST_CLOCK_DIFF (src->last_rtime, running_time); + GST_DEBUG ("running_time %" GST_TIME_FORMAT ", diff %" GST_STIME_FORMAT, + GST_TIME_ARGS (running_time), GST_STIME_ARGS (diff)); /* now translate the diff to RTP time, handle positive and negative cases. * If there is no diff, we already set rtptime correctly above. */ if (diff > 0) { - GST_DEBUG ("running_time %" GST_TIME_FORMAT ", diff %" GST_TIME_FORMAT, - GST_TIME_ARGS (running_time), GST_TIME_ARGS (diff)); t_rtp += gst_util_uint64_scale_int (diff, src->clock_rate, GST_SECOND); } else { diff = -diff; - GST_DEBUG ("running_time %" GST_TIME_FORMAT ", diff -%" GST_TIME_FORMAT, - GST_TIME_ARGS (running_time), GST_TIME_ARGS (diff)); t_rtp -= gst_util_uint64_scale_int (diff, src->clock_rate, GST_SECOND); } } else { - GST_WARNING ("no clock-rate, cannot interpolate rtp time"); + GST_WARNING ("no clock-rate, cannot interpolate rtp time for SSRC %u", + src->ssrc); } /* convert the NTP time in nanoseconds to 32.32 fixed point */ @@ -1517,11 +1575,13 @@ rtp_source_get_new_sr (RTPSource * src, guint64 ntpnstime, * @src: an #RTPSource * @time: the current time of the system clock * @fractionlost: fraction lost since last SR/RR - * @packetslost: the cumululative number of packets lost + * @packetslost: the cumulative number of packets lost * @exthighestseq: the extended last sequence number received - * @jitter: the interarrival jitter - * @lsr: the last SR packet from this source - * @dlsr: the delay since last SR packet + * @jitter: the interarrival jitter (in clock rate units) + * @lsr: the time of the last SR packet on this source + * (in NTP Short Format, 16.16 fixed point) + * @dlsr: the delay since the last SR packet + * (in NTP Short Format, 16.16 fixed point) * * Get new values to put into a new report block from this source. * @@ -1607,10 +1667,10 @@ rtp_source_get_new_rb (RTPSource * src, GstClockTime time, * rtp_source_get_last_sr: * @src: an #RTPSource * @time: time of packet arrival - * @ntptime: the NTP time in 32.32 fixed point - * @rtptime: the RTP time + * @ntptime: the NTP time (in NTP Timestamp Format, 32.32 fixed point) + * @rtptime: the RTP time (in clock rate units) * @packet_count: the packet count - * @octet_count: the octect count + * @octet_count: the octet count * * Get the values of the last sender report as set with rtp_source_process_sr(). * @@ -1646,12 +1706,15 @@ rtp_source_get_last_sr (RTPSource * src, GstClockTime * time, guint64 * ntptime, * rtp_source_get_last_rb: * @src: an #RTPSource * @fractionlost: fraction lost since last SR/RR - * @packetslost: the cumululative number of packets lost + * @packetslost: the cumulative number of packets lost * @exthighestseq: the extended last sequence number received - * @jitter: the interarrival jitter - * @lsr: the last SR packet from this source - * @dlsr: the delay since last SR packet - * @round_trip: the round trip time + * @jitter: the interarrival jitter (in clock rate units) + * @lsr: the time of the last SR packet on this source + * (in NTP Short Format, 16.16 fixed point) + * @dlsr: the delay since the last SR packet + * (in NTP Short Format, 16.16 fixed point) + * @round_trip: the round-trip time + * (in NTP Short Format, 16.16 fixed point) * * Get the values of the last RB report set with rtp_source_process_rb(). * @@ -1688,6 +1751,67 @@ rtp_source_get_last_rb (RTPSource * src, guint8 * fractionlost, return TRUE; } +gboolean +find_conflicting_address (GList * conflicting_addresses, + GSocketAddress * address, GstClockTime time) +{ + GList *item; + + for (item = conflicting_addresses; item; item = g_list_next (item)) { + RTPConflictingAddress *known_conflict = item->data; + + if (__g_socket_address_equal (address, known_conflict->address)) { + known_conflict->time = time; + return TRUE; + } + } + + return FALSE; +} + +GList * +add_conflicting_address (GList * conflicting_addresses, + GSocketAddress * address, GstClockTime time) +{ + RTPConflictingAddress *new_conflict; + + new_conflict = g_slice_new (RTPConflictingAddress); + + new_conflict->address = G_SOCKET_ADDRESS (g_object_ref (address)); + new_conflict->time = time; + + return g_list_prepend (conflicting_addresses, new_conflict); +} + +GList * +timeout_conflicting_addresses (GList * conflicting_addresses, + GstClockTime current_time) +{ + GList *item; + /* "a relatively long time" -- RFC 3550 section 8.2 */ + const GstClockTime collision_timeout = + RTP_STATS_MIN_INTERVAL * GST_SECOND * 10; + + item = g_list_first (conflicting_addresses); + while (item) { + RTPConflictingAddress *known_conflict = item->data; + GList *next_item = g_list_next (item); + + if (known_conflict->time < current_time - collision_timeout) { + gchar *buf; + + conflicting_addresses = g_list_delete_link (conflicting_addresses, item); + buf = __g_socket_address_to_string (known_conflict->address); + GST_DEBUG ("collision %p timed out: %s", known_conflict, buf); + g_free (buf); + rtp_conflicting_address_free (known_conflict); + } + item = next_item; + } + + return conflicting_addresses; +} + /** * rtp_source_find_conflicting_address: * @src: The source the packet came in @@ -1700,22 +1824,10 @@ rtp_source_get_last_rb (RTPSource * src, guint8 * fractionlost, * Returns: TRUE if it was a known conflict, FALSE otherwise */ gboolean -rtp_source_find_conflicting_address (RTPSource * src, GstNetAddress * address, +rtp_source_find_conflicting_address (RTPSource * src, GSocketAddress * address, GstClockTime time) { - GList *item; - - for (item = g_list_first (src->conflicting_addresses); - item; item = g_list_next (item)) { - RTPConflictingAddress *known_conflict = item->data; - - if (gst_net_address_equal (address, &known_conflict->address)) { - known_conflict->time = time; - return TRUE; - } - } - - return FALSE; + return find_conflicting_address (src->conflicting_addresses, address, time); } /** @@ -1728,24 +1840,16 @@ rtp_source_find_conflicting_address (RTPSource * src, GstNetAddress * address, */ void rtp_source_add_conflicting_address (RTPSource * src, - GstNetAddress * address, GstClockTime time) + GSocketAddress * address, GstClockTime time) { - RTPConflictingAddress *new_conflict; - - new_conflict = g_new0 (RTPConflictingAddress, 1); - - memcpy (&new_conflict->address, address, sizeof (GstNetAddress)); - new_conflict->time = time; - - src->conflicting_addresses = g_list_prepend (src->conflicting_addresses, - new_conflict); + src->conflicting_addresses = + add_conflicting_address (src->conflicting_addresses, address, time); } /** * rtp_source_timeout: * @src: The #RTPSource * @current_time: The current time - * @collision_timeout: The amount of time after which a collision is timed out * @feedback_retention_window: The running time before which retained feedback * packets have to be discarded * @@ -1754,32 +1858,33 @@ rtp_source_add_conflicting_address (RTPSource * src, */ void rtp_source_timeout (RTPSource * src, GstClockTime current_time, - GstClockTime collision_timeout, GstClockTime feedback_retention_window) + GstClockTime running_time, GstClockTime feedback_retention_window) { - GList *item; GstRTCPPacket *pkt; + GstClockTime max_pts_window; + guint pruned = 0; - item = g_list_first (src->conflicting_addresses); - while (item) { - RTPConflictingAddress *known_conflict = item->data; - GList *next_item = g_list_next (item); + src->conflicting_addresses = + timeout_conflicting_addresses (src->conflicting_addresses, current_time); - if (known_conflict->time < current_time - collision_timeout) { - gchar buf[40]; - - src->conflicting_addresses = - g_list_delete_link (src->conflicting_addresses, item); - gst_net_address_to_string (&known_conflict->address, buf, 40); - GST_DEBUG ("collision %p timed out: %s", known_conflict, buf); - g_free (known_conflict); - } - item = next_item; + if (feedback_retention_window == GST_CLOCK_TIME_NONE || + running_time < feedback_retention_window) { + return; } + max_pts_window = running_time - feedback_retention_window; + /* Time out AVPF packets that are older than the desired length */ - while ((pkt = g_queue_peek_tail (src->retained_feedback)) && - GST_BUFFER_TIMESTAMP (pkt) < feedback_retention_window) - gst_buffer_unref (g_queue_pop_tail (src->retained_feedback)); + while ((pkt = g_queue_peek_head (src->retained_feedback)) && + GST_BUFFER_PTS (pkt) < max_pts_window) { + gst_buffer_unref (g_queue_pop_head (src->retained_feedback)); + pruned++; + } + + GST_LOG_OBJECT (src, + "%u RTCP packets pruned with PTS less than %" GST_TIME_FORMAT + ", queue len: %u", pruned, GST_TIME_ARGS (max_pts_window), + g_queue_get_length (src->retained_feedback)); } static gint @@ -1788,7 +1893,16 @@ compare_buffers (gconstpointer a, gconstpointer b, gpointer user_data) const GstBuffer *bufa = a; const GstBuffer *bufb = b; - return GST_BUFFER_TIMESTAMP (bufa) - GST_BUFFER_TIMESTAMP (bufb); + g_return_val_if_fail (GST_BUFFER_PTS (bufa) != GST_CLOCK_TIME_NONE, -1); + g_return_val_if_fail (GST_BUFFER_PTS (bufb) != GST_CLOCK_TIME_NONE, 1); + + if (GST_BUFFER_PTS (bufa) < GST_BUFFER_PTS (bufb)) { + return -1; + } else if (GST_BUFFER_PTS (bufa) > GST_BUFFER_PTS (bufb)) { + return 1; + } + + return 0; } void @@ -1797,12 +1911,17 @@ rtp_source_retain_rtcp_packet (RTPSource * src, GstRTCPPacket * packet, { GstBuffer *buffer; + g_return_if_fail (running_time != GST_CLOCK_TIME_NONE); + buffer = gst_buffer_copy_region (packet->rtcp->buffer, GST_BUFFER_COPY_MEMORY, packet->offset, (gst_rtcp_packet_get_length (packet) + 1) * 4); - GST_BUFFER_TIMESTAMP (buffer) = running_time; + GST_BUFFER_PTS (buffer) = running_time; g_queue_insert_sorted (src->retained_feedback, buffer, compare_buffers, NULL); + + GST_LOG_OBJECT (src, "RTCP packet retained with PTS: %" GST_TIME_FORMAT, + GST_TIME_ARGS (running_time)); } gboolean @@ -1813,3 +1932,108 @@ rtp_source_has_retained (RTPSource * src, GCompareFunc func, gconstpointer data) else return FALSE; } + +/** + * rtp_source_register_nack: + * @src: The #RTPSource + * @seqnum: a seqnum + * @deadline: the deadline before which RTX is still possible + * + * Register that @seqnum has not been received from @src. + */ +void +rtp_source_register_nack (RTPSource * src, guint16 seqnum, + GstClockTime deadline) +{ + gint i; + guint len; + gint diff = -1; + guint16 tseq; + + len = src->nacks->len; + for (i = len - 1; i >= 0; i--) { + tseq = g_array_index (src->nacks, guint16, i); + diff = gst_rtp_buffer_compare_seqnum (tseq, seqnum); + + GST_TRACE ("[%u] %u %u diff %i len %u", i, tseq, seqnum, diff, len); + + if (diff >= 0) + break; + } + + if (diff == 0) { + GST_DEBUG ("update NACK #%u deadline to %" GST_TIME_FORMAT, seqnum, + GST_TIME_ARGS (deadline)); + g_array_index (src->nack_deadlines, GstClockTime, i) = deadline; + } else if (i == len - 1) { + GST_DEBUG ("append NACK #%u with deadline %" GST_TIME_FORMAT, seqnum, + GST_TIME_ARGS (deadline)); + g_array_append_val (src->nacks, seqnum); + g_array_append_val (src->nack_deadlines, deadline); + } else { + GST_DEBUG ("insert NACK #%u with deadline %" GST_TIME_FORMAT, seqnum, + GST_TIME_ARGS (deadline)); + g_array_insert_val (src->nacks, i + 1, seqnum); + g_array_insert_val (src->nack_deadlines, i + 1, deadline); + } + + src->send_nack = TRUE; +} + +/** + * rtp_source_get_nacks: + * @src: The #RTPSource + * @n_nacks: result number of nacks + * + * Get the registered NACKS since the last rtp_source_clear_nacks(). + * + * Returns: an array of @n_nacks seqnum values. + */ +guint16 * +rtp_source_get_nacks (RTPSource * src, guint * n_nacks) +{ + if (n_nacks) + *n_nacks = src->nacks->len; + + return (guint16 *) src->nacks->data; +} + +/** + * rtp_source_get_nacks: + * @src: The #RTPSource + * @n_nacks: result number of nacks + * + * Get the registered NACKS deadlines. + * + * Returns: an array of @n_nacks deadline values. + */ +GstClockTime * +rtp_source_get_nack_deadlines (RTPSource * src, guint * n_nacks) +{ + if (n_nacks) + *n_nacks = src->nack_deadlines->len; + + return (GstClockTime *) src->nack_deadlines->data; +} + +/** + * rtp_source_clear_nacks: + * @src: The #RTPSource + * @n_nacks: number of nacks + * + * Remove @n_nacks oldest NACKS form array. + */ +void +rtp_source_clear_nacks (RTPSource * src, guint n_nacks) +{ + g_return_if_fail (n_nacks <= src->nacks->len); + + if (src->nacks->len == n_nacks) { + g_array_set_size (src->nacks, 0); + g_array_set_size (src->nack_deadlines, 0); + src->send_nack = FALSE; + } else { + g_array_remove_range (src->nacks, 0, n_nacks); + g_array_remove_range (src->nack_deadlines, 0, n_nacks); + } +}