rtp: make rtp packet probation configurable (bug #682512)
authorAleix Conchillo Flaque <aleix@oblong.com>
Wed, 22 Aug 2012 23:36:21 +0000 (16:36 -0700)
committerWim Taymans <wim.taymans@collabora.co.uk>
Thu, 30 Aug 2012 19:49:57 +0000 (21:49 +0200)
gst/rtpmanager/gstrtpsession.c
gst/rtpmanager/rtpsession.c
gst/rtpmanager/rtpsession.h
gst/rtpmanager/rtpsource.c
gst/rtpmanager/rtpsource.h
gst/rtsp/gstrtspsrc.c
gst/rtsp/gstrtspsrc.h

index 42fe09e..5863650 100644 (file)
@@ -201,6 +201,7 @@ enum
 #define DEFAULT_NUM_ACTIVE_SOURCES   0
 #define DEFAULT_USE_PIPELINE_CLOCK   FALSE
 #define DEFAULT_RTCP_MIN_INTERVAL    (RTP_STATS_MIN_INTERVAL * GST_SECOND)
+#define DEFAULT_PROBATION            RTP_DEFAULT_PROBATION
 
 enum
 {
@@ -215,6 +216,7 @@ enum
   PROP_INTERNAL_SESSION,
   PROP_USE_PIPELINE_CLOCK,
   PROP_RTCP_MIN_INTERVAL,
+  PROP_PROBATION,
   PROP_LAST
 };
 
@@ -572,6 +574,12 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
           0, G_MAXUINT64, DEFAULT_RTCP_MIN_INTERVAL,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  g_object_class_install_property (gobject_class, PROP_PROBATION,
+      g_param_spec_uint ("probation", "Number of probations",
+          "Consecutive packet sequence numbers to accept the source",
+          0, G_MAXUINT, DEFAULT_PROBATION,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   gstelement_class->change_state =
       GST_DEBUG_FUNCPTR (gst_rtp_session_change_state);
   gstelement_class->request_new_pad =
@@ -696,6 +704,9 @@ gst_rtp_session_set_property (GObject * object, guint prop_id,
       g_object_set_property (G_OBJECT (priv->session), "rtcp-min-interval",
           value);
       break;
+    case PROP_PROBATION:
+      g_object_set_property (G_OBJECT (priv->session), "probation", value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -747,6 +758,9 @@ gst_rtp_session_get_property (GObject * object, guint prop_id,
       g_object_get_property (G_OBJECT (priv->session), "rtcp-min-interval",
           value);
       break;
+    case PROP_PROBATION:
+      g_object_get_property (G_OBJECT (priv->session), "probation", value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
index fa55c16..f008ef7 100644 (file)
@@ -66,6 +66,7 @@ enum
 #define DEFAULT_RTCP_MIN_INTERVAL    (RTP_STATS_MIN_INTERVAL * GST_SECOND)
 #define DEFAULT_RTCP_FEEDBACK_RETENTION_WINDOW (2 * GST_SECOND)
 #define DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD (3)
+#define DEFAULT_PROBATION            RTP_DEFAULT_PROBATION
 
 enum
 {
@@ -85,6 +86,7 @@ enum
   PROP_RTCP_MIN_INTERVAL,
   PROP_RTCP_FEEDBACK_RETENTION_WINDOW,
   PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
+  PROP_PROBATION,
   PROP_LAST
 };
 
@@ -439,6 +441,12 @@ rtp_session_class_init (RTPSessionClass * klass)
           0, G_MAXUINT, DEFAULT_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  g_object_class_install_property (gobject_class, PROP_PROBATION,
+      g_param_spec_uint ("probation", "Number of probations",
+          "Consecutive packet sequence numbers to accept the source",
+          0, G_MAXUINT, DEFAULT_PROBATION,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   klass->get_source_by_ssrc =
       GST_DEBUG_FUNCPTR (rtp_session_get_source_by_ssrc);
   klass->on_sending_rtcp = GST_DEBUG_FUNCPTR (rtp_session_on_sending_rtcp);
@@ -489,6 +497,8 @@ rtp_session_init (RTPSession * sess)
   sess->header_len = 28;
   sess->mtu = DEFAULT_RTCP_MTU;
 
+  sess->probation = DEFAULT_PROBATION;
+
   /* some default SDES entries */
 
   /* we do not want to leak details like the username or hostname here */
@@ -616,6 +626,10 @@ rtp_session_set_property (GObject * object, guint prop_id,
     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
       sess->rtcp_immediate_feedback_threshold = g_value_get_uint (value);
       break;
+    case PROP_PROBATION:
+      sess->probation = g_value_get_uint (value);
+      g_object_set_property (G_OBJECT (sess->source), "probation", value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -673,6 +687,10 @@ rtp_session_get_property (GObject * object, guint prop_id,
     case PROP_RTCP_IMMEDIATE_FEEDBACK_THRESHOLD:
       g_value_set_uint (value, sess->rtcp_immediate_feedback_threshold);
       break;
+    case PROP_PROBATION:
+      g_value_set_uint (value, sess->probation);
+      g_object_get_property (G_OBJECT (sess->source), "probation", value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -1344,9 +1362,9 @@ obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
      * packets of an SSRC, on the other hand, is a strong indication that we
      * are dealing with a valid source. */
     if (rtp)
-      source->probation = RTP_DEFAULT_PROBATION;
+      g_object_set (source, "probation", sess->probation, NULL);
     else
-      source->probation = 0;
+      g_object_set (source, "probation", 0, NULL);
 
     /* store from address, if any */
     if (arrival->address) {
index 8ca9458..d1cdcdc 100644 (file)
@@ -79,7 +79,7 @@ typedef GstFlowReturn (*RTPSessionSendRTP) (RTPSession *sess, RTPSource *src, gp
  *
  * Returns: a #GstFlowReturn.
  */
-typedef GstFlowReturn (*RTPSessionSendRTCP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, 
+typedef GstFlowReturn (*RTPSessionSendRTCP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer,
     gboolean eos, gpointer user_data);
 
 /**
@@ -113,7 +113,7 @@ typedef gint (*RTPSessionClockRate) (RTPSession *sess, guint8 payload, gpointer
  * @sess: an #RTPSession
  * @user_data: user data specified when registering
  *
- * This callback will be called when @sess needs to cancel the current timeout. 
+ * This callback will be called when @sess needs to cancel the current timeout.
  * The currently running timeout should be canceled and a new reporting interval
  * should be requested from @sess.
  */
@@ -189,6 +189,8 @@ struct _RTPSession {
   guint         header_len;
   guint         mtu;
 
+  guint         probation;
+
   /* bandwidths */
   gboolean     recalc_bandwidth;
   guint        bandwidth;
index 611cca7..a42f98f 100644 (file)
@@ -39,6 +39,7 @@ enum
 #define DEFAULT_IS_VALIDATED         FALSE
 #define DEFAULT_IS_SENDER            FALSE
 #define DEFAULT_SDES                 NULL
+#define DEFAULT_PROBATION            RTP_DEFAULT_PROBATION
 
 enum
 {
@@ -49,6 +50,7 @@ enum
   PROP_IS_SENDER,
   PROP_SDES,
   PROP_STATS,
+  PROP_PROBATION,
   PROP_LAST
 };
 
@@ -199,6 +201,12 @@ rtp_source_class_init (RTPSourceClass * klass)
           "The stats of this source", GST_TYPE_STRUCTURE,
           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
 
+  g_object_class_install_property (gobject_class, PROP_PROBATION,
+      g_param_spec_uint ("probation", "Number of probations",
+          "Consecutive packet sequence numbers to accept the source",
+          0, G_MAXUINT, DEFAULT_PROBATION,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   GST_DEBUG_CATEGORY_INIT (rtp_source_debug, "rtpsource", 0, "RTP Source");
 }
 
@@ -227,7 +235,8 @@ rtp_source_init (RTPSource * src)
    * packets or a valid RTCP packet */
   src->validated = FALSE;
   src->internal = FALSE;
-  src->probation = RTP_DEFAULT_PROBATION;
+  src->probation = DEFAULT_PROBATION;
+  src->curr_probation = src->probation;
   src->closing = FALSE;
 
   src->sdes = gst_structure_new_empty ("application/x-rtp-source-sdes");
@@ -461,6 +470,9 @@ rtp_source_set_property (GObject * object, guint prop_id,
     case PROP_SSRC:
       src->ssrc = g_value_get_uint (value);
       break;
+    case PROP_PROBATION:
+      src->probation = g_value_get_uint (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -494,6 +506,9 @@ rtp_source_get_property (GObject * object, guint prop_id,
     case PROP_STATS:
       g_value_take_boxed (value, rtp_source_create_stats (src));
       break;
+    case PROP_PROBATION:
+      g_value_set_uint (value, src->probation);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -1057,28 +1072,28 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer,
     /* first time we heard of this source */
     init_seq (src, seqnr);
     src->stats.max_seq = seqnr - 1;
-    src->probation = RTP_DEFAULT_PROBATION;
+    src->curr_probation = src->probation;
   }
 
   udelta = seqnr - stats->max_seq;
 
   /* if we are still on probation, check seqnum */
-  if (src->probation) {
+  if (src->curr_probation) {
     expected = src->stats.max_seq + 1;
 
     /* when in probation, we require consecutive seqnums */
     if (seqnr == expected) {
       /* expected packet */
       GST_DEBUG ("probation: seqnr %d == expected %d", seqnr, expected);
-      src->probation--;
+      src->curr_probation--;
       src->stats.max_seq = seqnr;
-      if (src->probation == 0) {
+      if (src->curr_probation == 0) {
         GST_DEBUG ("probation done!");
         init_seq (src, seqnr);
       } else {
         GstBuffer *q;
 
-        GST_DEBUG ("probation %d: queue buffer", src->probation);
+        GST_DEBUG ("probation %d: queue buffer", src->curr_probation);
         /* when still in probation, keep packets in a list. */
         g_queue_push_tail (src->packets, buffer);
         /* remove packets from queue if there are too many */
@@ -1113,7 +1128,7 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer,
     }
   } else {
     /* duplicate or reordered packet, will be filtered by jitterbuffer. */
-    GST_WARNING ("duplicate or reordered packet");
+    GST_WARNING ("duplicate or reordered packet (seqnr %d)", seqnr);
   }
 
   src->stats.octets_received += arrival->payload_len;
@@ -1149,7 +1164,7 @@ bad_sequence:
 probation_seqnum:
   {
     GST_WARNING ("probation: seqnr %d != expected %d", seqnr, expected);
-    src->probation = RTP_DEFAULT_PROBATION;
+    src->curr_probation = src->probation;
     src->stats.max_seq = seqnr;
     gst_buffer_unref (buffer);
     return GST_FLOW_OK;
index 4adbf70..7414f94 100644 (file)
@@ -72,7 +72,7 @@ typedef struct _RTPSourceClass RTPSourceClass;
  *
  * Returns: a #GstFlowReturn.
  */
-typedef GstFlowReturn (*RTPSourcePushRTP) (RTPSource *src, GstBuffer *buffer, 
+typedef GstFlowReturn (*RTPSourcePushRTP) (RTPSource *src, GstBuffer *buffer,
        gpointer user_data);
 
 /**
@@ -127,7 +127,8 @@ struct _RTPSource {
   /*< private >*/
   guint32       ssrc;
 
-  gint          probation;
+  guint         probation;
+  guint         curr_probation;
   gboolean      validated;
   gboolean      internal;
   gboolean      is_csrc;
index 868f85f..e592850 100644 (file)
@@ -179,6 +179,7 @@ gst_rtsp_src_buffer_mode_get_type (void)
 #define DEFAULT_BUFFER_MODE      BUFFER_MODE_AUTO
 #define DEFAULT_PORT_RANGE       NULL
 #define DEFAULT_SHORT_HEADER     FALSE
+#define DEFAULT_PROBATION        2
 
 enum
 {
@@ -203,6 +204,7 @@ enum
   PROP_PORT_RANGE,
   PROP_UDP_BUFFER_SIZE,
   PROP_SHORT_HEADER,
+  PROP_PROBATION,
   PROP_LAST
 };
 
@@ -482,6 +484,12 @@ gst_rtspsrc_class_init (GstRTSPSrcClass * klass)
           "Only send the basic RTSP headers for broken encoders",
           DEFAULT_SHORT_HEADER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  g_object_class_install_property (gobject_class, PROP_PROBATION,
+      g_param_spec_uint ("probation", "Number of probations",
+          "Consecutive packet sequence numbers to accept the source",
+          0, G_MAXUINT, DEFAULT_PROBATION,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   gstelement_class->send_event = gst_rtspsrc_send_event;
   gstelement_class->change_state = gst_rtspsrc_change_state;
 
@@ -525,6 +533,7 @@ gst_rtspsrc_init (GstRTSPSrc * src)
   src->client_port_range.max = 0;
   src->udp_buffer_size = DEFAULT_UDP_BUFFER_SIZE;
   src->short_header = DEFAULT_SHORT_HEADER;
+  src->probation = DEFAULT_PROBATION;
 
   /* get a list of all extensions */
   src->extensions = gst_rtsp_ext_list_get ();
@@ -719,6 +728,9 @@ gst_rtspsrc_set_property (GObject * object, guint prop_id, const GValue * value,
     case PROP_SHORT_HEADER:
       rtspsrc->short_header = g_value_get_boolean (value);
       break;
+    case PROP_PROBATION:
+      rtspsrc->probation = g_value_get_uint (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -820,6 +832,9 @@ gst_rtspsrc_get_property (GObject * object, guint prop_id, GValue * value,
     case PROP_SHORT_HEADER:
       g_value_set_boolean (value, rtspsrc->short_header);
       break;
+    case PROP_PROBATION:
+      g_value_set_uint (value, rtspsrc->probation);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -2475,6 +2490,9 @@ gst_rtspsrc_stream_configure_manager (GstRTSPSrc * src, GstRTSPStream * stream,
           g_object_set (rtpsession, "rtcp-rs-bandwidth", stream->rs_bandwidth,
               NULL);
         }
+
+        g_object_set (rtpsession, "probation", src->probation, NULL);
+
         g_signal_connect (rtpsession, "on-bye-ssrc", (GCallback) on_bye_ssrc,
             stream);
         g_signal_connect (rtpsession, "on-bye-timeout", (GCallback) on_timeout,
index e35cac5..1f8ee22 100644 (file)
@@ -217,6 +217,7 @@ struct _GstRTSPSrc {
   GstRTSPRange      client_port_range;
   gint              udp_buffer_size;
   gboolean          short_header;
+  guint             probation;
 
   /* state */
   GstRTSPState       state;