rtspconnection: add limit to queued messages
authorWim Taymans <wim.taymans@collabora.co.uk>
Fri, 14 Dec 2012 10:36:58 +0000 (11:36 +0100)
committerWim Taymans <wim.taymans@collabora.co.uk>
Fri, 14 Dec 2012 10:36:58 +0000 (11:36 +0100)
Add a limit to the amount of queued bytes or messages we allow on the watch.

API: GstRTSPConnection::gst_rtsp_watch_set_send_backlog()
API: GstRTSPConnection::gst_rtsp_watch_get_send_backlog()

gst-libs/gst/rtsp/gstrtspconnection.c
gst-libs/gst/rtsp/gstrtspconnection.h

index f501974..aed3b26 100644 (file)
@@ -2873,10 +2873,13 @@ struct _GstRTSPWatch
   guint id;
   GMutex mutex;
   GQueue *messages;
+  gsize messages_bytes;
   guint8 *write_data;
   guint write_off;
   guint write_size;
   guint write_id;
+  gsize max_bytes;
+  guint max_messages;
 
   GstRTSPWatchFuncs funcs;
 
@@ -3029,6 +3032,8 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
         if (rec == NULL)
           break;
 
+        watch->messages_bytes -= rec->size;
+
         watch->write_off = 0;
         watch->write_data = rec->data;
         watch->write_size = rec->size;
@@ -3134,6 +3139,7 @@ gst_rtsp_source_finalize (GSource * source)
   g_queue_foreach (watch->messages, (GFunc) gst_rtsp_rec_free, NULL);
   g_queue_free (watch->messages);
   watch->messages = NULL;
+  watch->messages_bytes = 0;
   g_free (watch->write_data);
 
   g_mutex_clear (&watch->mutex);
@@ -3263,9 +3269,63 @@ gst_rtsp_watch_unref (GstRTSPWatch * watch)
 }
 
 /**
+ * gst_rtsp_watch_set_send_backlog:
+ * @watch: a #GstRTSPWatch
+ * @bytes: maximum bytes
+ * @messages: maximum messages
+ *
+ * Set the maximum amount of bytes and messages that will be queued in @watch.
+ * When the maximum amounts are exceeded, gst_rtsp_watch_write_data() and
+ * gst_rtsp_watch_send_message() will return #GST_RTSP_ENOMEM.
+ *
+ * A value of 0 for @bytes or @messages means no limits.
+ *
+ * Since: 1.1.1
+ */
+void
+gst_rtsp_watch_set_send_backlog (GstRTSPWatch * watch,
+    gsize bytes, guint messages)
+{
+  g_return_if_fail (watch != NULL);
+
+  g_mutex_lock (&watch->mutex);
+  watch->max_bytes = bytes;
+  watch->max_messages = messages;
+  g_mutex_unlock (&watch->mutex);
+
+  GST_DEBUG ("set backlog to bytes %" G_GSIZE_FORMAT ", messages %u",
+      bytes, messages);
+}
+
+/**
+ * gst_rtsp_watch_get_send_backlog:
+ * @watch: a #GstRTSPWatch
+ * @bytes: (out) (allow-none): maximum bytes
+ * @messages: (out) (allow-none): maximum messages
+ *
+ * Get the maximum amount of bytes and messages that will be queued in @watch.
+ * See gst_rtsp_watch_set_send_backlog().
+ *
+ * Since: 1.1.1
+ */
+void
+gst_rtsp_watch_get_send_backlog (GstRTSPWatch * watch,
+    gsize * bytes, guint * messages)
+{
+  g_return_if_fail (watch != NULL);
+
+  g_mutex_lock (&watch->mutex);
+  if (bytes)
+    *bytes = watch->max_bytes;
+  if (messages)
+    *messages = watch->max_messages;
+  g_mutex_unlock (&watch->mutex);
+}
+
+/**
  * gst_rtsp_watch_write_data:
  * @watch: a #GstRTSPWatch
- * @data: the data to queue
+ * @data: (array length=size) (transfer full): the data to queue
  * @size: the size of @data
  * @id: (out) (allow-none): location for a message ID or %NULL
  *
@@ -3278,7 +3338,12 @@ gst_rtsp_watch_unref (GstRTSPWatch * watch)
  *
  * This function will take ownership of @data and g_free() it after use.
  *
- * Returns: #GST_RTSP_OK on success.
+ * If the amount of queued data exceeds the limits set with
+ * gst_rtsp_watch_set_send_backlog(), this function will return
+ * #GST_RTSP_ENOMEM.
+ *
+ * Returns: #GST_RTSP_OK on success. #GST_RTSP_ENOMEM when the backlog limits
+ * are reached.
  */
 GstRTSPResult
 gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
@@ -3308,6 +3373,12 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
     }
   }
 
+  /* check limits */
+  if ((watch->max_bytes != 0 && watch->messages_bytes >= watch->max_bytes) ||
+      (watch->max_messages != 0
+          && watch->messages->length >= watch->max_messages))
+    goto too_much_backlog;
+
   /* make a record with the data and id for sending async */
   rec = g_slice_new (GstRTSPRec);
   if (off == 0) {
@@ -3324,8 +3395,9 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
     rec->id = ++watch->id;
   } while (G_UNLIKELY (rec->id == 0));
 
-  /* add the record to a queue. FIXME we would like to have an upper limit here */
+  /* add the record to a queue. */
   g_queue_push_head (watch->messages, rec);
+  watch->messages_bytes += rec->size;
 
   /* make sure the main context will now also check for writability on the
    * socket */
@@ -3345,6 +3417,17 @@ done:
     g_main_context_wakeup (context);
 
   return res;
+
+  /* ERRORS */
+too_much_backlog:
+  {
+    GST_WARNING ("too much backlog: max_bytes %" G_GSIZE_FORMAT ", current %"
+        G_GSIZE_FORMAT ", max_messages %u, current %u", watch->max_bytes,
+        watch->messages_bytes, watch->max_messages, watch->messages->length);
+    g_mutex_unlock (&watch->mutex);
+    g_free ((gpointer) data);
+    return GST_RTSP_ENOMEM;
+  }
 }
 
 /**
index 935e2d1..ab81de9 100644 (file)
@@ -187,6 +187,11 @@ void               gst_rtsp_watch_unref              (GstRTSPWatch *watch);
 guint              gst_rtsp_watch_attach             (GstRTSPWatch *watch,
                                                       GMainContext *context);
 
+void               gst_rtsp_watch_set_send_backlog  (GstRTSPWatch *watch,
+                                                     gsize bytes, guint messages);
+void               gst_rtsp_watch_get_send_backlog  (GstRTSPWatch *watch,
+                                                     gsize *bytes, guint *messages);
+
 GstRTSPResult      gst_rtsp_watch_write_data         (GstRTSPWatch *watch,
                                                       const guint8 *data,
                                                       guint size, guint *id);