client: Add drop-backlog property
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-client.c
index 8b253fa..f2c146e 100644 (file)
@@ -81,6 +81,8 @@ struct _GstRTSPClientPrivate
 
   GList *transports;
   GList *sessions;
+
+  gboolean drop_backlog;
 };
 
 static GMutex tunnels_lock;
@@ -88,12 +90,14 @@ static GHashTable *tunnels;     /* protected by tunnels_lock */
 
 #define DEFAULT_SESSION_POOL            NULL
 #define DEFAULT_MOUNT_POINTS            NULL
+#define DEFAULT_DROP_BACKLOG            TRUE
 
 enum
 {
   PROP_0,
   PROP_SESSION_POOL,
   PROP_MOUNT_POINTS,
+  PROP_DROP_BACKLOG,
   PROP_LAST
 };
 
@@ -174,6 +178,11 @@ gst_rtsp_client_class_init (GstRTSPClientClass * klass)
           GST_TYPE_RTSP_MOUNT_POINTS,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  g_object_class_install_property (gobject_class, PROP_DROP_BACKLOG,
+      g_param_spec_boolean ("drop-backlog", "Drop Backlog",
+          "Drop data when the backlog queue is full",
+          DEFAULT_DROP_BACKLOG, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   gst_rtsp_client_signals[SIGNAL_CLOSED] =
       g_signal_new ("closed", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
       G_STRUCT_OFFSET (GstRTSPClientClass, closed), NULL, NULL,
@@ -255,6 +264,7 @@ gst_rtsp_client_init (GstRTSPClient * client)
   g_mutex_init (&priv->lock);
   g_mutex_init (&priv->send_lock);
   priv->close_seq = 0;
+  priv->drop_backlog = DEFAULT_DROP_BACKLOG;
 }
 
 static GstRTSPFilterResult
@@ -341,6 +351,8 @@ gst_rtsp_client_finalize (GObject * obj)
 
   GST_INFO ("finalize client %p", client);
 
+  if (priv->watch)
+    gst_rtsp_watch_set_flushing (priv->watch, TRUE);
   gst_rtsp_client_set_send_func (client, NULL, NULL, NULL);
 
   if (priv->watch)
@@ -378,6 +390,7 @@ gst_rtsp_client_get_property (GObject * object, guint propid,
     GValue * value, GParamSpec * pspec)
 {
   GstRTSPClient *client = GST_RTSP_CLIENT (object);
+  GstRTSPClientPrivate *priv = client->priv;
 
   switch (propid) {
     case PROP_SESSION_POOL:
@@ -386,6 +399,9 @@ gst_rtsp_client_get_property (GObject * object, guint propid,
     case PROP_MOUNT_POINTS:
       g_value_take_object (value, gst_rtsp_client_get_mount_points (client));
       break;
+    case PROP_DROP_BACKLOG:
+      g_value_set_boolean (value, priv->drop_backlog);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
   }
@@ -396,6 +412,7 @@ gst_rtsp_client_set_property (GObject * object, guint propid,
     const GValue * value, GParamSpec * pspec)
 {
   GstRTSPClient *client = GST_RTSP_CLIENT (object);
+  GstRTSPClientPrivate *priv = client->priv;
 
   switch (propid) {
     case PROP_SESSION_POOL:
@@ -404,6 +421,11 @@ gst_rtsp_client_set_property (GObject * object, guint propid,
     case PROP_MOUNT_POINTS:
       gst_rtsp_client_set_mount_points (client, g_value_get_object (value));
       break;
+    case PROP_DROP_BACKLOG:
+      g_mutex_lock (&priv->lock);
+      priv->drop_backlog = g_value_get_boolean (value);
+      g_mutex_unlock (&priv->lock);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
   }
@@ -2842,11 +2864,42 @@ do_send_message (GstRTSPClient * client, GstRTSPMessage * message,
     gboolean close, gpointer user_data)
 {
   GstRTSPClientPrivate *priv = client->priv;
+  GstRTSPResult ret;
+  GTimeVal time;
+
+  time.tv_sec = 1;
+  time.tv_usec = 0;
+
+  do {
+    /* send the response and store the seq number so we can wait until it's
+     * written to the client to close the connection */
+    ret =
+        gst_rtsp_watch_send_message (priv->watch, message,
+        close ? &priv->close_seq : NULL);
+    if (ret == GST_RTSP_OK)
+      break;
 
-  /* send the response and store the seq number so we can wait until it's
-   * written to the client to close the connection */
-  return gst_rtsp_watch_send_message (priv->watch, message, close ?
-      &priv->close_seq : NULL);
+    if (ret != GST_RTSP_ENOMEM)
+      goto error;
+
+    /* drop backlog */
+    if (priv->drop_backlog)
+      break;
+
+    /* queue was full, wait for more space */
+    GST_DEBUG_OBJECT (client, "waiting for backlog");
+    ret = gst_rtsp_watch_wait_backlog (priv->watch, &time);
+    GST_DEBUG_OBJECT (client, "Resend due to backlog full");
+  } while (ret != GST_RTSP_EINTR);
+
+  return ret;
+
+  /* ERRORS */
+error:
+  {
+    GST_DEBUG_OBJECT (client, "got error %d", ret);
+    return ret;
+  }
 }
 
 static GstRTSPResult
@@ -2886,6 +2939,7 @@ closed (GstRTSPWatch * watch, gpointer user_data)
     g_mutex_unlock (&tunnels_lock);
   }
 
+  gst_rtsp_watch_set_flushing (watch, TRUE);
   gst_rtsp_client_set_send_func (client, NULL, NULL, NULL);
 
   return GST_RTSP_OK;