client: Add drop-backlog property
authorGöran Jönsson <goranjn@axis.com>
Tue, 1 Apr 2014 11:04:21 +0000 (13:04 +0200)
committerWim Taymans <wtaymans@redhat.com>
Thu, 10 Apr 2014 14:08:06 +0000 (16:08 +0200)
When we have too many messages queued for a client (currently hardcoded
to 100) we overflow and drop the messages. Add a drop-backlog property
to control this behaviour. Setting this property to FALSE will retry
to send the messages to the client by waiting for more room in the
backlog.

Fixes https://bugzilla.gnome.org/show_bug.cgi?id=725898

gst/rtsp-server/rtsp-client.c
gst/rtsp-server/rtsp-media.c

index 8b253fa..f2c146e 100644 (file)
@@ -81,6 +81,8 @@ struct _GstRTSPClientPrivate
 
   GList *transports;
   GList *sessions;
+
+  gboolean drop_backlog;
 };
 
 static GMutex tunnels_lock;
@@ -88,12 +90,14 @@ static GHashTable *tunnels;     /* protected by tunnels_lock */
 
 #define DEFAULT_SESSION_POOL            NULL
 #define DEFAULT_MOUNT_POINTS            NULL
+#define DEFAULT_DROP_BACKLOG            TRUE
 
 enum
 {
   PROP_0,
   PROP_SESSION_POOL,
   PROP_MOUNT_POINTS,
+  PROP_DROP_BACKLOG,
   PROP_LAST
 };
 
@@ -174,6 +178,11 @@ gst_rtsp_client_class_init (GstRTSPClientClass * klass)
           GST_TYPE_RTSP_MOUNT_POINTS,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  g_object_class_install_property (gobject_class, PROP_DROP_BACKLOG,
+      g_param_spec_boolean ("drop-backlog", "Drop Backlog",
+          "Drop data when the backlog queue is full",
+          DEFAULT_DROP_BACKLOG, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   gst_rtsp_client_signals[SIGNAL_CLOSED] =
       g_signal_new ("closed", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
       G_STRUCT_OFFSET (GstRTSPClientClass, closed), NULL, NULL,
@@ -255,6 +264,7 @@ gst_rtsp_client_init (GstRTSPClient * client)
   g_mutex_init (&priv->lock);
   g_mutex_init (&priv->send_lock);
   priv->close_seq = 0;
+  priv->drop_backlog = DEFAULT_DROP_BACKLOG;
 }
 
 static GstRTSPFilterResult
@@ -341,6 +351,8 @@ gst_rtsp_client_finalize (GObject * obj)
 
   GST_INFO ("finalize client %p", client);
 
+  if (priv->watch)
+    gst_rtsp_watch_set_flushing (priv->watch, TRUE);
   gst_rtsp_client_set_send_func (client, NULL, NULL, NULL);
 
   if (priv->watch)
@@ -378,6 +390,7 @@ gst_rtsp_client_get_property (GObject * object, guint propid,
     GValue * value, GParamSpec * pspec)
 {
   GstRTSPClient *client = GST_RTSP_CLIENT (object);
+  GstRTSPClientPrivate *priv = client->priv;
 
   switch (propid) {
     case PROP_SESSION_POOL:
@@ -386,6 +399,9 @@ gst_rtsp_client_get_property (GObject * object, guint propid,
     case PROP_MOUNT_POINTS:
       g_value_take_object (value, gst_rtsp_client_get_mount_points (client));
       break;
+    case PROP_DROP_BACKLOG:
+      g_value_set_boolean (value, priv->drop_backlog);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
   }
@@ -396,6 +412,7 @@ gst_rtsp_client_set_property (GObject * object, guint propid,
     const GValue * value, GParamSpec * pspec)
 {
   GstRTSPClient *client = GST_RTSP_CLIENT (object);
+  GstRTSPClientPrivate *priv = client->priv;
 
   switch (propid) {
     case PROP_SESSION_POOL:
@@ -404,6 +421,11 @@ gst_rtsp_client_set_property (GObject * object, guint propid,
     case PROP_MOUNT_POINTS:
       gst_rtsp_client_set_mount_points (client, g_value_get_object (value));
       break;
+    case PROP_DROP_BACKLOG:
+      g_mutex_lock (&priv->lock);
+      priv->drop_backlog = g_value_get_boolean (value);
+      g_mutex_unlock (&priv->lock);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
   }
@@ -2842,11 +2864,42 @@ do_send_message (GstRTSPClient * client, GstRTSPMessage * message,
     gboolean close, gpointer user_data)
 {
   GstRTSPClientPrivate *priv = client->priv;
+  GstRTSPResult ret;
+  GTimeVal time;
+
+  time.tv_sec = 1;
+  time.tv_usec = 0;
+
+  do {
+    /* send the response and store the seq number so we can wait until it's
+     * written to the client to close the connection */
+    ret =
+        gst_rtsp_watch_send_message (priv->watch, message,
+        close ? &priv->close_seq : NULL);
+    if (ret == GST_RTSP_OK)
+      break;
 
-  /* send the response and store the seq number so we can wait until it's
-   * written to the client to close the connection */
-  return gst_rtsp_watch_send_message (priv->watch, message, close ?
-      &priv->close_seq : NULL);
+    if (ret != GST_RTSP_ENOMEM)
+      goto error;
+
+    /* drop backlog */
+    if (priv->drop_backlog)
+      break;
+
+    /* queue was full, wait for more space */
+    GST_DEBUG_OBJECT (client, "waiting for backlog");
+    ret = gst_rtsp_watch_wait_backlog (priv->watch, &time);
+    GST_DEBUG_OBJECT (client, "Resend due to backlog full");
+  } while (ret != GST_RTSP_EINTR);
+
+  return ret;
+
+  /* ERRORS */
+error:
+  {
+    GST_DEBUG_OBJECT (client, "got error %d", ret);
+    return ret;
+  }
 }
 
 static GstRTSPResult
@@ -2886,6 +2939,7 @@ closed (GstRTSPWatch * watch, gpointer user_data)
     g_mutex_unlock (&tunnels_lock);
   }
 
+  gst_rtsp_watch_set_flushing (watch, TRUE);
   gst_rtsp_client_set_send_func (client, NULL, NULL, NULL);
 
   return GST_RTSP_OK;
index de47c68..33c4d2c 100644 (file)
@@ -1194,7 +1194,6 @@ gst_rtsp_media_collect_streams (GstRTSPMedia * media)
     name = g_strdup_printf ("dynpay%d", i);
     if ((elem = gst_bin_get_by_name (GST_BIN (element), name))) {
       /* a stream that will dynamically create pads to provide RTP packets */
-
       GST_INFO ("found dynamic element %d, %p", i, elem);
 
       g_mutex_lock (&priv->lock);
@@ -1936,7 +1935,7 @@ pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
   /* join the element in the PAUSED state because this callback is
    * called from the streaming thread and it is PAUSED */
   if (!gst_rtsp_stream_join_bin (stream, GST_BIN (priv->pipeline),
-      priv->rtpbin, GST_STATE_PAUSED)) {
+          priv->rtpbin, GST_STATE_PAUSED)) {
     GST_WARNING ("failed to join bin element");
   }
 
@@ -2092,7 +2091,7 @@ start_prepare (GstRTSPMedia * media)
     stream = g_ptr_array_index (priv->streams, i);
 
     if (!gst_rtsp_stream_join_bin (stream, GST_BIN (priv->pipeline),
-        priv->rtpbin, GST_STATE_NULL)) {
+            priv->rtpbin, GST_STATE_NULL)) {
       goto join_bin_failed;
     }
   }