From 11369d38ef6bc6a054b6da521cb9e2dbe6226373 Mon Sep 17 00:00:00 2001 From: =?utf8?q?G=C3=B6ran=20J=C3=B6nsson?= Date: Tue, 1 Apr 2014 13:04:21 +0200 Subject: [PATCH] client: Add drop-backlog property When we have too many messages queued for a client (currently hardcoded to 100) we overflow and drop the messages. Add a drop-backlog property to control this behaviour. Setting this property to FALSE will retry to send the messages to the client by waiting for more room in the backlog. Fixes https://bugzilla.gnome.org/show_bug.cgi?id=725898 --- gst/rtsp-server/rtsp-client.c | 62 ++++++++++++++++++++++++++++++++++++++++--- gst/rtsp-server/rtsp-media.c | 5 ++-- 2 files changed, 60 insertions(+), 7 deletions(-) diff --git a/gst/rtsp-server/rtsp-client.c b/gst/rtsp-server/rtsp-client.c index 8b253fa..f2c146e 100644 --- a/gst/rtsp-server/rtsp-client.c +++ b/gst/rtsp-server/rtsp-client.c @@ -81,6 +81,8 @@ struct _GstRTSPClientPrivate GList *transports; GList *sessions; + + gboolean drop_backlog; }; static GMutex tunnels_lock; @@ -88,12 +90,14 @@ static GHashTable *tunnels; /* protected by tunnels_lock */ #define DEFAULT_SESSION_POOL NULL #define DEFAULT_MOUNT_POINTS NULL +#define DEFAULT_DROP_BACKLOG TRUE enum { PROP_0, PROP_SESSION_POOL, PROP_MOUNT_POINTS, + PROP_DROP_BACKLOG, PROP_LAST }; @@ -174,6 +178,11 @@ gst_rtsp_client_class_init (GstRTSPClientClass * klass) GST_TYPE_RTSP_MOUNT_POINTS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_DROP_BACKLOG, + g_param_spec_boolean ("drop-backlog", "Drop Backlog", + "Drop data when the backlog queue is full", + DEFAULT_DROP_BACKLOG, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + gst_rtsp_client_signals[SIGNAL_CLOSED] = g_signal_new ("closed", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTSPClientClass, closed), NULL, NULL, @@ -255,6 +264,7 @@ gst_rtsp_client_init (GstRTSPClient * client) g_mutex_init (&priv->lock); g_mutex_init (&priv->send_lock); priv->close_seq = 0; + priv->drop_backlog = DEFAULT_DROP_BACKLOG; } static GstRTSPFilterResult @@ -341,6 +351,8 @@ gst_rtsp_client_finalize (GObject * obj) GST_INFO ("finalize client %p", client); + if (priv->watch) + gst_rtsp_watch_set_flushing (priv->watch, TRUE); gst_rtsp_client_set_send_func (client, NULL, NULL, NULL); if (priv->watch) @@ -378,6 +390,7 @@ gst_rtsp_client_get_property (GObject * object, guint propid, GValue * value, GParamSpec * pspec) { GstRTSPClient *client = GST_RTSP_CLIENT (object); + GstRTSPClientPrivate *priv = client->priv; switch (propid) { case PROP_SESSION_POOL: @@ -386,6 +399,9 @@ gst_rtsp_client_get_property (GObject * object, guint propid, case PROP_MOUNT_POINTS: g_value_take_object (value, gst_rtsp_client_get_mount_points (client)); break; + case PROP_DROP_BACKLOG: + g_value_set_boolean (value, priv->drop_backlog); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec); } @@ -396,6 +412,7 @@ gst_rtsp_client_set_property (GObject * object, guint propid, const GValue * value, GParamSpec * pspec) { GstRTSPClient *client = GST_RTSP_CLIENT (object); + GstRTSPClientPrivate *priv = client->priv; switch (propid) { case PROP_SESSION_POOL: @@ -404,6 +421,11 @@ gst_rtsp_client_set_property (GObject * object, guint propid, case PROP_MOUNT_POINTS: gst_rtsp_client_set_mount_points (client, g_value_get_object (value)); break; + case PROP_DROP_BACKLOG: + g_mutex_lock (&priv->lock); + priv->drop_backlog = g_value_get_boolean (value); + g_mutex_unlock (&priv->lock); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec); } @@ -2842,11 +2864,42 @@ do_send_message (GstRTSPClient * client, GstRTSPMessage * message, gboolean close, gpointer user_data) { GstRTSPClientPrivate *priv = client->priv; + GstRTSPResult ret; + GTimeVal time; + + time.tv_sec = 1; + time.tv_usec = 0; + + do { + /* send the response and store the seq number so we can wait until it's + * written to the client to close the connection */ + ret = + gst_rtsp_watch_send_message (priv->watch, message, + close ? &priv->close_seq : NULL); + if (ret == GST_RTSP_OK) + break; - /* send the response and store the seq number so we can wait until it's - * written to the client to close the connection */ - return gst_rtsp_watch_send_message (priv->watch, message, close ? - &priv->close_seq : NULL); + if (ret != GST_RTSP_ENOMEM) + goto error; + + /* drop backlog */ + if (priv->drop_backlog) + break; + + /* queue was full, wait for more space */ + GST_DEBUG_OBJECT (client, "waiting for backlog"); + ret = gst_rtsp_watch_wait_backlog (priv->watch, &time); + GST_DEBUG_OBJECT (client, "Resend due to backlog full"); + } while (ret != GST_RTSP_EINTR); + + return ret; + + /* ERRORS */ +error: + { + GST_DEBUG_OBJECT (client, "got error %d", ret); + return ret; + } } static GstRTSPResult @@ -2886,6 +2939,7 @@ closed (GstRTSPWatch * watch, gpointer user_data) g_mutex_unlock (&tunnels_lock); } + gst_rtsp_watch_set_flushing (watch, TRUE); gst_rtsp_client_set_send_func (client, NULL, NULL, NULL); return GST_RTSP_OK; diff --git a/gst/rtsp-server/rtsp-media.c b/gst/rtsp-server/rtsp-media.c index de47c68..33c4d2c 100644 --- a/gst/rtsp-server/rtsp-media.c +++ b/gst/rtsp-server/rtsp-media.c @@ -1194,7 +1194,6 @@ gst_rtsp_media_collect_streams (GstRTSPMedia * media) name = g_strdup_printf ("dynpay%d", i); if ((elem = gst_bin_get_by_name (GST_BIN (element), name))) { /* a stream that will dynamically create pads to provide RTP packets */ - GST_INFO ("found dynamic element %d, %p", i, elem); g_mutex_lock (&priv->lock); @@ -1936,7 +1935,7 @@ pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media) /* join the element in the PAUSED state because this callback is * called from the streaming thread and it is PAUSED */ if (!gst_rtsp_stream_join_bin (stream, GST_BIN (priv->pipeline), - priv->rtpbin, GST_STATE_PAUSED)) { + priv->rtpbin, GST_STATE_PAUSED)) { GST_WARNING ("failed to join bin element"); } @@ -2092,7 +2091,7 @@ start_prepare (GstRTSPMedia * media) stream = g_ptr_array_index (priv->streams, i); if (!gst_rtsp_stream_join_bin (stream, GST_BIN (priv->pipeline), - priv->rtpbin, GST_STATE_NULL)) { + priv->rtpbin, GST_STATE_NULL)) { goto join_bin_failed; } } -- 2.7.4