guint id;
GMutex mutex;
GQueue *messages;
+ gsize messages_bytes;
guint8 *write_data;
guint write_off;
guint write_size;
guint write_id;
+ gsize max_bytes;
+ guint max_messages;
GstRTSPWatchFuncs funcs;
if (rec == NULL)
break;
+ watch->messages_bytes -= rec->size;
+
watch->write_off = 0;
watch->write_data = rec->data;
watch->write_size = rec->size;
g_queue_foreach (watch->messages, (GFunc) gst_rtsp_rec_free, NULL);
g_queue_free (watch->messages);
watch->messages = NULL;
+ watch->messages_bytes = 0;
g_free (watch->write_data);
g_mutex_clear (&watch->mutex);
}
/**
+ * gst_rtsp_watch_set_send_backlog:
+ * @watch: a #GstRTSPWatch
+ * @bytes: maximum bytes
+ * @messages: maximum messages
+ *
+ * Set the maximum amount of bytes and messages that will be queued in @watch.
+ * When the maximum amounts are exceeded, gst_rtsp_watch_write_data() and
+ * gst_rtsp_watch_send_message() will return #GST_RTSP_ENOMEM.
+ *
+ * A value of 0 for @bytes or @messages means no limits.
+ *
+ * Since: 1.1.1
+ */
+void
+gst_rtsp_watch_set_send_backlog (GstRTSPWatch * watch,
+ gsize bytes, guint messages)
+{
+ g_return_if_fail (watch != NULL);
+
+ g_mutex_lock (&watch->mutex);
+ watch->max_bytes = bytes;
+ watch->max_messages = messages;
+ g_mutex_unlock (&watch->mutex);
+
+ GST_DEBUG ("set backlog to bytes %" G_GSIZE_FORMAT ", messages %u",
+ bytes, messages);
+}
+
+/**
+ * gst_rtsp_watch_get_send_backlog:
+ * @watch: a #GstRTSPWatch
+ * @bytes: (out) (allow-none): maximum bytes
+ * @messages: (out) (allow-none): maximum messages
+ *
+ * Get the maximum amount of bytes and messages that will be queued in @watch.
+ * See gst_rtsp_watch_set_send_backlog().
+ *
+ * Since: 1.1.1
+ */
+void
+gst_rtsp_watch_get_send_backlog (GstRTSPWatch * watch,
+ gsize * bytes, guint * messages)
+{
+ g_return_if_fail (watch != NULL);
+
+ g_mutex_lock (&watch->mutex);
+ if (bytes)
+ *bytes = watch->max_bytes;
+ if (messages)
+ *messages = watch->max_messages;
+ g_mutex_unlock (&watch->mutex);
+}
+
+/**
* gst_rtsp_watch_write_data:
* @watch: a #GstRTSPWatch
- * @data: the data to queue
+ * @data: (array length=size) (transfer full): the data to queue
* @size: the size of @data
* @id: (out) (allow-none): location for a message ID or %NULL
*
*
* This function will take ownership of @data and g_free() it after use.
*
- * Returns: #GST_RTSP_OK on success.
+ * If the amount of queued data exceeds the limits set with
+ * gst_rtsp_watch_set_send_backlog(), this function will return
+ * #GST_RTSP_ENOMEM.
+ *
+ * Returns: #GST_RTSP_OK on success. #GST_RTSP_ENOMEM when the backlog limits
+ * are reached.
*/
GstRTSPResult
gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
}
}
+ /* check limits */
+ if ((watch->max_bytes != 0 && watch->messages_bytes >= watch->max_bytes) ||
+ (watch->max_messages != 0
+ && watch->messages->length >= watch->max_messages))
+ goto too_much_backlog;
+
/* make a record with the data and id for sending async */
rec = g_slice_new (GstRTSPRec);
if (off == 0) {
rec->id = ++watch->id;
} while (G_UNLIKELY (rec->id == 0));
- /* add the record to a queue. FIXME we would like to have an upper limit here */
+ /* add the record to a queue. */
g_queue_push_head (watch->messages, rec);
+ watch->messages_bytes += rec->size;
/* make sure the main context will now also check for writability on the
* socket */
g_main_context_wakeup (context);
return res;
+
+ /* ERRORS */
+too_much_backlog:
+ {
+ GST_WARNING ("too much backlog: max_bytes %" G_GSIZE_FORMAT ", current %"
+ G_GSIZE_FORMAT ", max_messages %u, current %u", watch->max_bytes,
+ watch->messages_bytes, watch->max_messages, watch->messages->length);
+ g_mutex_unlock (&watch->mutex);
+ g_free ((gpointer) data);
+ return GST_RTSP_ENOMEM;
+ }
}
/**