From: Aleix Conchillo Flaque Date: Wed, 22 Aug 2012 23:36:21 +0000 (-0700) Subject: rtp: make rtp packet probation configurable (bug #682512) X-Git-Tag: 1.19.3~509^2~6744 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=4a200b670f0cf0da01818a70cdaad910f9b53ebf;p=platform%2Fupstream%2Fgstreamer.git rtp: make rtp packet probation configurable (bug #682512) --- diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c index 42fe09e..5863650 100644 --- a/gst/rtpmanager/gstrtpsession.c +++ b/gst/rtpmanager/gstrtpsession.c @@ -201,6 +201,7 @@ enum #define DEFAULT_NUM_ACTIVE_SOURCES 0 #define DEFAULT_USE_PIPELINE_CLOCK FALSE #define DEFAULT_RTCP_MIN_INTERVAL (RTP_STATS_MIN_INTERVAL * GST_SECOND) +#define DEFAULT_PROBATION RTP_DEFAULT_PROBATION enum { @@ -215,6 +216,7 @@ enum PROP_INTERNAL_SESSION, PROP_USE_PIPELINE_CLOCK, PROP_RTCP_MIN_INTERVAL, + PROP_PROBATION, PROP_LAST }; @@ -572,6 +574,12 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass) 0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_PROBATION, + g_param_spec_uint ("probation", "Number of probations", + "Consecutive packet sequence numbers to accept the source", + 0, G_MAXUINT, DEFAULT_PROBATION, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_session_change_state); gstelement_class->request_new_pad = @@ -696,6 +704,9 @@ gst_rtp_session_set_property (GObject * object, guint prop_id, g_object_set_property (G_OBJECT (priv->session), "rtcp-min-interval", value); break; + case PROP_PROBATION: + g_object_set_property (G_OBJECT (priv->session), "probation", value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -747,6 +758,9 @@ gst_rtp_session_get_property (GObject * object, guint prop_id, g_object_get_property (G_OBJECT (priv->session), "rtcp-min-interval", value); break; + case PROP_PROBATION: + g_object_get_property (G_OBJECT (priv->session), "probation", value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c index fa55c16..f008ef7 100644 --- a/gst/rtpmanager/rtpsession.c +++ b/gst/rtpmanager/rtpsession.c @@ -66,6 +66,7 @@ enum #define DEFAULT_RTCP_MIN_INTERVAL (RTP_STATS_MIN_INTERVAL * GST_SECOND) #define DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW (2 * GST_SECOND) #define DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD (3) +#define DEFAULT_PROBATION RTP_DEFAULT_PROBATION enum { @@ -85,6 +86,7 @@ enum PROP_RTCP_MIN_INTERVAL, PROP_RTCP_FEEDBACK_RETENTION_WINDOW, PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD, + PROP_PROBATION, PROP_LAST }; @@ -439,6 +441,12 @@ rtp_session_class_init (RTPSessionClass * klass) 0, G_MAXUINT, DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_PROBATION, + g_param_spec_uint ("probation", "Number of probations", + "Consecutive packet sequence numbers to accept the source", + 0, G_MAXUINT, DEFAULT_PROBATION, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + klass->get_source_by_ssrc = GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc); klass->on_sending_rtcp = GST_DEBUG_FUNCPTR (rtp_session_on_sending_rtcp); @@ -489,6 +497,8 @@ rtp_session_init (RTPSession * sess) sess->header_len = 28; sess->mtu = DEFAULT_RTCP_MTU; + sess->probation = DEFAULT_PROBATION; + /* some default SDES entries */ /* we do not want to leak details like the username or hostname here */ @@ -616,6 +626,10 @@ rtp_session_set_property (GObject * object, guint prop_id, case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD: sess->rtcp_immediate_feedback_threshold = g_value_get_uint (value); break; + case PROP_PROBATION: + sess->probation = g_value_get_uint (value); + g_object_set_property (G_OBJECT (sess->source), "probation", value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -673,6 +687,10 @@ rtp_session_get_property (GObject * object, guint prop_id, case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD: g_value_set_uint (value, sess->rtcp_immediate_feedback_threshold); break; + case PROP_PROBATION: + g_value_set_uint (value, sess->probation); + g_object_get_property (G_OBJECT (sess->source), "probation", value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -1344,9 +1362,9 @@ obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created, * packets of an SSRC, on the other hand, is a strong indication that we * are dealing with a valid source. */ if (rtp) - source->probation = RTP_DEFAULT_PROBATION; + g_object_set (source, "probation", sess->probation, NULL); else - source->probation = 0; + g_object_set (source, "probation", 0, NULL); /* store from address, if any */ if (arrival->address) { diff --git a/gst/rtpmanager/rtpsession.h b/gst/rtpmanager/rtpsession.h index 8ca9458..d1cdcdc 100644 --- a/gst/rtpmanager/rtpsession.h +++ b/gst/rtpmanager/rtpsession.h @@ -79,7 +79,7 @@ typedef GstFlowReturn (*RTPSessionSendRTP) (RTPSession *sess, RTPSource *src, gp * * Returns: a #GstFlowReturn. */ -typedef GstFlowReturn (*RTPSessionSendRTCP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, +typedef GstFlowReturn (*RTPSessionSendRTCP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, gboolean eos, gpointer user_data); /** @@ -113,7 +113,7 @@ typedef gint (*RTPSessionClockRate) (RTPSession *sess, guint8 payload, gpointer * @sess: an #RTPSession * @user_data: user data specified when registering * - * This callback will be called when @sess needs to cancel the current timeout. + * This callback will be called when @sess needs to cancel the current timeout. * The currently running timeout should be canceled and a new reporting interval * should be requested from @sess. */ @@ -189,6 +189,8 @@ struct _RTPSession { guint header_len; guint mtu; + guint probation; + /* bandwidths */ gboolean recalc_bandwidth; guint bandwidth; diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c index 611cca7..a42f98f 100644 --- a/gst/rtpmanager/rtpsource.c +++ b/gst/rtpmanager/rtpsource.c @@ -39,6 +39,7 @@ enum #define DEFAULT_IS_VALIDATED FALSE #define DEFAULT_IS_SENDER FALSE #define DEFAULT_SDES NULL +#define DEFAULT_PROBATION RTP_DEFAULT_PROBATION enum { @@ -49,6 +50,7 @@ enum PROP_IS_SENDER, PROP_SDES, PROP_STATS, + PROP_PROBATION, PROP_LAST }; @@ -199,6 +201,12 @@ rtp_source_class_init (RTPSourceClass * klass) "The stats of this source", GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_PROBATION, + g_param_spec_uint ("probation", "Number of probations", + "Consecutive packet sequence numbers to accept the source", + 0, G_MAXUINT, DEFAULT_PROBATION, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + GST_DEBUG_CATEGORY_INIT (rtp_source_debug, "rtpsource", 0, "RTP Source"); } @@ -227,7 +235,8 @@ rtp_source_init (RTPSource * src) * packets or a valid RTCP packet */ src->validated = FALSE; src->internal = FALSE; - src->probation = RTP_DEFAULT_PROBATION; + src->probation = DEFAULT_PROBATION; + src->curr_probation = src->probation; src->closing = FALSE; src->sdes = gst_structure_new_empty ("application/x-rtp-source-sdes"); @@ -461,6 +470,9 @@ rtp_source_set_property (GObject * object, guint prop_id, case PROP_SSRC: src->ssrc = g_value_get_uint (value); break; + case PROP_PROBATION: + src->probation = g_value_get_uint (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -494,6 +506,9 @@ rtp_source_get_property (GObject * object, guint prop_id, case PROP_STATS: g_value_take_boxed (value, rtp_source_create_stats (src)); break; + case PROP_PROBATION: + g_value_set_uint (value, src->probation); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -1057,28 +1072,28 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer, /* first time we heard of this source */ init_seq (src, seqnr); src->stats.max_seq = seqnr - 1; - src->probation = RTP_DEFAULT_PROBATION; + src->curr_probation = src->probation; } udelta = seqnr - stats->max_seq; /* if we are still on probation, check seqnum */ - if (src->probation) { + if (src->curr_probation) { expected = src->stats.max_seq + 1; /* when in probation, we require consecutive seqnums */ if (seqnr == expected) { /* expected packet */ GST_DEBUG ("probation: seqnr %d == expected %d", seqnr, expected); - src->probation--; + src->curr_probation--; src->stats.max_seq = seqnr; - if (src->probation == 0) { + if (src->curr_probation == 0) { GST_DEBUG ("probation done!"); init_seq (src, seqnr); } else { GstBuffer *q; - GST_DEBUG ("probation %d: queue buffer", src->probation); + GST_DEBUG ("probation %d: queue buffer", src->curr_probation); /* when still in probation, keep packets in a list. */ g_queue_push_tail (src->packets, buffer); /* remove packets from queue if there are too many */ @@ -1113,7 +1128,7 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer, } } else { /* duplicate or reordered packet, will be filtered by jitterbuffer. */ - GST_WARNING ("duplicate or reordered packet"); + GST_WARNING ("duplicate or reordered packet (seqnr %d)", seqnr); } src->stats.octets_received += arrival->payload_len; @@ -1149,7 +1164,7 @@ bad_sequence: probation_seqnum: { GST_WARNING ("probation: seqnr %d != expected %d", seqnr, expected); - src->probation = RTP_DEFAULT_PROBATION; + src->curr_probation = src->probation; src->stats.max_seq = seqnr; gst_buffer_unref (buffer); return GST_FLOW_OK; diff --git a/gst/rtpmanager/rtpsource.h b/gst/rtpmanager/rtpsource.h index 4adbf70..7414f94 100644 --- a/gst/rtpmanager/rtpsource.h +++ b/gst/rtpmanager/rtpsource.h @@ -72,7 +72,7 @@ typedef struct _RTPSourceClass RTPSourceClass; * * Returns: a #GstFlowReturn. */ -typedef GstFlowReturn (*RTPSourcePushRTP) (RTPSource *src, GstBuffer *buffer, +typedef GstFlowReturn (*RTPSourcePushRTP) (RTPSource *src, GstBuffer *buffer, gpointer user_data); /** @@ -127,7 +127,8 @@ struct _RTPSource { /*< private >*/ guint32 ssrc; - gint probation; + guint probation; + guint curr_probation; gboolean validated; gboolean internal; gboolean is_csrc; diff --git a/gst/rtsp/gstrtspsrc.c b/gst/rtsp/gstrtspsrc.c index 868f85f..e592850 100644 --- a/gst/rtsp/gstrtspsrc.c +++ b/gst/rtsp/gstrtspsrc.c @@ -179,6 +179,7 @@ gst_rtsp_src_buffer_mode_get_type (void) #define DEFAULT_BUFFER_MODE BUFFER_MODE_AUTO #define DEFAULT_PORT_RANGE NULL #define DEFAULT_SHORT_HEADER FALSE +#define DEFAULT_PROBATION 2 enum { @@ -203,6 +204,7 @@ enum PROP_PORT_RANGE, PROP_UDP_BUFFER_SIZE, PROP_SHORT_HEADER, + PROP_PROBATION, PROP_LAST }; @@ -482,6 +484,12 @@ gst_rtspsrc_class_init (GstRTSPSrcClass * klass) "Only send the basic RTSP headers for broken encoders", DEFAULT_SHORT_HEADER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_PROBATION, + g_param_spec_uint ("probation", "Number of probations", + "Consecutive packet sequence numbers to accept the source", + 0, G_MAXUINT, DEFAULT_PROBATION, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + gstelement_class->send_event = gst_rtspsrc_send_event; gstelement_class->change_state = gst_rtspsrc_change_state; @@ -525,6 +533,7 @@ gst_rtspsrc_init (GstRTSPSrc * src) src->client_port_range.max = 0; src->udp_buffer_size = DEFAULT_UDP_BUFFER_SIZE; src->short_header = DEFAULT_SHORT_HEADER; + src->probation = DEFAULT_PROBATION; /* get a list of all extensions */ src->extensions = gst_rtsp_ext_list_get (); @@ -719,6 +728,9 @@ gst_rtspsrc_set_property (GObject * object, guint prop_id, const GValue * value, case PROP_SHORT_HEADER: rtspsrc->short_header = g_value_get_boolean (value); break; + case PROP_PROBATION: + rtspsrc->probation = g_value_get_uint (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -820,6 +832,9 @@ gst_rtspsrc_get_property (GObject * object, guint prop_id, GValue * value, case PROP_SHORT_HEADER: g_value_set_boolean (value, rtspsrc->short_header); break; + case PROP_PROBATION: + g_value_set_uint (value, rtspsrc->probation); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -2475,6 +2490,9 @@ gst_rtspsrc_stream_configure_manager (GstRTSPSrc * src, GstRTSPStream * stream, g_object_set (rtpsession, "rtcp-rs-bandwidth", stream->rs_bandwidth, NULL); } + + g_object_set (rtpsession, "probation", src->probation, NULL); + g_signal_connect (rtpsession, "on-bye-ssrc", (GCallback) on_bye_ssrc, stream); g_signal_connect (rtpsession, "on-bye-timeout", (GCallback) on_timeout, diff --git a/gst/rtsp/gstrtspsrc.h b/gst/rtsp/gstrtspsrc.h index e35cac5..1f8ee22 100644 --- a/gst/rtsp/gstrtspsrc.h +++ b/gst/rtsp/gstrtspsrc.h @@ -217,6 +217,7 @@ struct _GstRTSPSrc { GstRTSPRange client_port_range; gint udp_buffer_size; gboolean short_header; + guint probation; /* state */ GstRTSPState state;