From dd32924eb020b01fdec511c9f10a283383312488 Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Tue, 15 Oct 2019 19:08:32 +0200 Subject: [PATCH] stream: refactor TCP backpressure handling The previous implementation stopped sending TCP messages to all clients when a single one stopped consuming them, which obviously created problems for shared media. Instead, we now manage a backlog in stream-transport, and slow clients are removed once this backlog exceeds a maximum duration, currently hardcoded. Fixes #80 --- gst/rtsp-server/rtsp-client.c | 10 ++ gst/rtsp-server/rtsp-server-internal.h | 55 ++++++++ gst/rtsp-server/rtsp-stream-transport.c | 180 ++++++++++++++++++++++++- gst/rtsp-server/rtsp-stream-transport.h | 5 +- gst/rtsp-server/rtsp-stream.c | 230 ++++++++++++++++++++++---------- 5 files changed, 405 insertions(+), 75 deletions(-) create mode 100644 gst/rtsp-server/rtsp-server-internal.h diff --git a/gst/rtsp-server/rtsp-client.c b/gst/rtsp-server/rtsp-client.c index 996af70..727e942 100644 --- a/gst/rtsp-server/rtsp-client.c +++ b/gst/rtsp-server/rtsp-client.c @@ -53,6 +53,7 @@ #include "rtsp-client.h" #include "rtsp-sdp.h" #include "rtsp-params.h" +#include "rtsp-server-internal.h" typedef enum { @@ -1203,6 +1204,12 @@ do_send_data (GstBuffer * buffer, guint8 channel, GstRTSPClient * client) } static gboolean +do_check_back_pressure (guint8 channel, GstRTSPClient * client) +{ + return get_data_seq (client, channel) != 0; +} + +static gboolean do_send_data_list (GstBufferList * buffer_list, guint8 channel, GstRTSPClient * client) { @@ -2854,6 +2861,9 @@ handle_setup_request (GstRTSPClient * client, GstRTSPContext * ctx) (GstRTSPSendListFunc) do_send_data_list, (GstRTSPSendListFunc) do_send_data_list, client, NULL); + gst_rtsp_stream_transport_set_back_pressure_callback (trans, + (GstRTSPBackPressureFunc) do_check_back_pressure, client, NULL); + g_hash_table_insert (priv->transports, GINT_TO_POINTER (ct->interleaved.min), trans); g_object_ref (trans); diff --git a/gst/rtsp-server/rtsp-server-internal.h b/gst/rtsp-server/rtsp-server-internal.h new file mode 100644 index 0000000..6650b87 --- /dev/null +++ b/gst/rtsp-server/rtsp-server-internal.h @@ -0,0 +1,55 @@ +/* GStreamer + * Copyright (C) 2019 Mathieu Duponchelle + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_RTSP_SERVER_INTERNAL_H__ +#define __GST_RTSP_SERVER_INTERNAL_H__ + +#include + +G_BEGIN_DECLS + +#include "rtsp-stream-transport.h" + +/* Internal GstRTSPStreamTransport interface */ + +typedef gboolean (*GstRTSPBackPressureFunc) (guint8 channel, gpointer user_data); + +gboolean gst_rtsp_stream_transport_backlog_push (GstRTSPStreamTransport *trans, + GstBuffer *buffer, + GstBufferList *buffer_list, + gboolean is_rtp); + +gboolean gst_rtsp_stream_transport_backlog_pop (GstRTSPStreamTransport *trans, + GstBuffer **buffer, + GstBufferList **buffer_list, + gboolean *is_rtp); + +gboolean gst_rtsp_stream_transport_backlog_is_empty (GstRTSPStreamTransport *trans); + +void gst_rtsp_stream_transport_set_back_pressure_callback (GstRTSPStreamTransport *trans, + GstRTSPBackPressureFunc back_pressure_func, + gpointer user_data, + GDestroyNotify notify); + +gboolean gst_rtsp_stream_transport_check_back_pressure (GstRTSPStreamTransport *trans, + guint8 channel); + +G_END_DECLS + +#endif /* __GST_RTSP_SERVER_INTERNAL_H__ */ diff --git a/gst/rtsp-server/rtsp-stream-transport.c b/gst/rtsp-server/rtsp-stream-transport.c index 1791034..ff53283 100644 --- a/gst/rtsp-server/rtsp-stream-transport.c +++ b/gst/rtsp-server/rtsp-stream-transport.c @@ -48,6 +48,7 @@ #include #include "rtsp-stream-transport.h" +#include "rtsp-server-internal.h" struct _GstRTSPStreamTransportPrivate { @@ -63,6 +64,10 @@ struct _GstRTSPStreamTransportPrivate gpointer list_user_data; GDestroyNotify list_notify; + GstRTSPBackPressureFunc back_pressure_func; + gpointer back_pressure_func_data; + GDestroyNotify back_pressure_func_notify; + GstRTSPKeepAliveFunc keep_alive; gpointer ka_user_data; GDestroyNotify ka_notify; @@ -77,8 +82,23 @@ struct _GstRTSPStreamTransportPrivate GstRTSPUrl *url; GObject *rtpsource; + + /* TCP backlog */ + GstClockTime first_rtp_timestamp; + GstQueueArray *items; }; +#define MAX_BACKLOG_DURATION (10 * GST_SECOND) +#define MAX_BACKLOG_SIZE 100 + +typedef struct +{ + GstBuffer *buffer; + GstBufferList *buffer_list; + gboolean is_rtp; +} BackLogItem; + + enum { PROP_0, @@ -107,9 +127,20 @@ gst_rtsp_stream_transport_class_init (GstRTSPStreamTransportClass * klass) } static void +clear_backlog_item (BackLogItem * item) +{ + gst_clear_buffer (&item->buffer); + gst_clear_buffer_list (&item->buffer_list); +} + +static void gst_rtsp_stream_transport_init (GstRTSPStreamTransport * trans) { trans->priv = gst_rtsp_stream_transport_get_instance_private (trans); + trans->priv->items = gst_queue_array_new_for_struct (sizeof (BackLogItem), 0); + trans->priv->first_rtp_timestamp = GST_CLOCK_TIME_NONE; + gst_queue_array_set_clear_func (trans->priv->items, + (GDestroyNotify) clear_backlog_item); } static void @@ -135,6 +166,8 @@ gst_rtsp_stream_transport_finalize (GObject * obj) if (priv->url) gst_rtsp_url_free (priv->url); + gst_queue_array_free (priv->items); + G_OBJECT_CLASS (gst_rtsp_stream_transport_parent_class)->finalize (obj); } @@ -244,6 +277,39 @@ gst_rtsp_stream_transport_set_list_callbacks (GstRTSPStreamTransport * trans, priv->list_notify = notify; } +void +gst_rtsp_stream_transport_set_back_pressure_callback (GstRTSPStreamTransport * + trans, GstRTSPBackPressureFunc back_pressure_func, gpointer user_data, + GDestroyNotify notify) +{ + GstRTSPStreamTransportPrivate *priv; + + g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans)); + + priv = trans->priv; + + priv->back_pressure_func = back_pressure_func; + if (priv->back_pressure_func_notify) + priv->back_pressure_func_notify (priv->back_pressure_func_data); + priv->back_pressure_func_data = user_data; + priv->back_pressure_func_notify = notify; +} + +gboolean +gst_rtsp_stream_transport_check_back_pressure (GstRTSPStreamTransport * trans, + guint8 channel) +{ + GstRTSPStreamTransportPrivate *priv; + gboolean ret = FALSE; + + priv = trans->priv; + + if (priv->back_pressure_func) + ret = priv->back_pressure_func (channel, priv->back_pressure_func_data); + + return ret; +} + /** * gst_rtsp_stream_transport_set_keepalive: * @trans: a #GstRTSPStreamTransport @@ -693,7 +759,7 @@ gst_rtsp_stream_transport_message_sent (GstRTSPStreamTransport * trans) priv = trans->priv; if (priv->message_sent) - priv->message_sent (priv->ms_user_data); + priv->message_sent (trans, priv->ms_user_data); } /** @@ -729,3 +795,115 @@ gst_rtsp_stream_transport_recv_data (GstRTSPStreamTransport * trans, } return res; } + +static GstClockTime +get_backlog_item_timestamp (BackLogItem * item) +{ + GstClockTime ret = GST_CLOCK_TIME_NONE; + + if (item->buffer) { + ret = GST_BUFFER_DTS_OR_PTS (item->buffer); + } else if (item->buffer_list) { + g_assert (gst_buffer_list_length (item->buffer_list) > 0); + ret = GST_BUFFER_DTS_OR_PTS (gst_buffer_list_get (item->buffer_list, 0)); + } + + return ret; +} + +static GstClockTime +get_first_backlog_timestamp (GstRTSPStreamTransport * trans) +{ + GstRTSPStreamTransportPrivate *priv = trans->priv; + GstClockTime ret = GST_CLOCK_TIME_NONE; + guint i, l; + + l = gst_queue_array_get_length (priv->items); + + for (i = 0; i < l; i++) { + BackLogItem *item = (BackLogItem *) + gst_queue_array_peek_nth_struct (priv->items, i); + + if (item->is_rtp) { + ret = get_backlog_item_timestamp (item); + break; + } + } + + return ret; +} + +/* Not MT-safe, caller should ensure consistent locking. Ownership + * of @buffer and @buffer_list is transfered to the transport */ +gboolean +gst_rtsp_stream_transport_backlog_push (GstRTSPStreamTransport * trans, + GstBuffer * buffer, GstBufferList * buffer_list, gboolean is_rtp) +{ + gboolean ret = TRUE; + BackLogItem item = { 0, }; + GstClockTime item_timestamp; + GstRTSPStreamTransportPrivate *priv; + + priv = trans->priv; + + if (buffer) + item.buffer = buffer; + if (buffer_list) + item.buffer_list = buffer_list; + item.is_rtp = is_rtp; + + gst_queue_array_push_tail_struct (priv->items, &item); + + item_timestamp = get_backlog_item_timestamp (&item); + + if (is_rtp && priv->first_rtp_timestamp != GST_CLOCK_TIME_NONE) { + GstClockTimeDiff queue_duration; + + g_assert (GST_CLOCK_TIME_IS_VALID (item_timestamp)); + + queue_duration = GST_CLOCK_DIFF (priv->first_rtp_timestamp, item_timestamp); + + g_assert (queue_duration >= 0); + + if (queue_duration > MAX_BACKLOG_DURATION && + gst_queue_array_get_length (priv->items) > MAX_BACKLOG_SIZE) { + ret = FALSE; + } + } else if (is_rtp) { + priv->first_rtp_timestamp = item_timestamp; + } + + return ret; +} + +/* Not MT-safe, caller should ensure consistent locking. Ownership + * of @buffer and @buffer_list is transfered back to the caller */ +gboolean +gst_rtsp_stream_transport_backlog_pop (GstRTSPStreamTransport * trans, + GstBuffer ** buffer, GstBufferList ** buffer_list, gboolean * is_rtp) +{ + BackLogItem *item; + GstRTSPStreamTransportPrivate *priv; + + g_return_val_if_fail (!gst_rtsp_stream_transport_backlog_is_empty (trans), + FALSE); + + priv = trans->priv; + + item = (BackLogItem *) gst_queue_array_pop_head_struct (priv->items); + + priv->first_rtp_timestamp = get_first_backlog_timestamp (trans); + + *buffer = item->buffer; + *buffer_list = item->buffer_list; + *is_rtp = item->is_rtp; + + return TRUE; +} + +/* Not MT-safe, caller should ensure consistent locking. */ +gboolean +gst_rtsp_stream_transport_backlog_is_empty (GstRTSPStreamTransport * trans) +{ + return gst_queue_array_is_empty (trans->priv->items); +} diff --git a/gst/rtsp-server/rtsp-stream-transport.h b/gst/rtsp-server/rtsp-stream-transport.h index 2b507b0..f878858 100644 --- a/gst/rtsp-server/rtsp-stream-transport.h +++ b/gst/rtsp-server/rtsp-stream-transport.h @@ -18,6 +18,7 @@ */ #include +#include #include #include @@ -88,7 +89,7 @@ typedef void (*GstRTSPKeepAliveFunc) (gpointer user_data); * Function registered with gst_rtsp_stream_transport_set_message_sent() * and called when a message has been sent on the transport. */ -typedef void (*GstRTSPMessageSentFunc) (gpointer user_data); +typedef void (*GstRTSPMessageSentFunc) (GstRTSPStreamTransport *trans, gpointer user_data); /** * GstRTSPStreamTransport: @@ -183,8 +184,6 @@ void gst_rtsp_stream_transport_set_timed_out (GstRTSPStreamT GST_RTSP_SERVER_API gboolean gst_rtsp_stream_transport_is_timed_out (GstRTSPStreamTransport *trans); - - GST_RTSP_SERVER_API gboolean gst_rtsp_stream_transport_send_rtp (GstRTSPStreamTransport *trans, GstBuffer *buffer); diff --git a/gst/rtsp-server/rtsp-stream.c b/gst/rtsp-server/rtsp-stream.c index e13bfbd..b00315f 100644 --- a/gst/rtsp-server/rtsp-stream.c +++ b/gst/rtsp-server/rtsp-stream.c @@ -43,7 +43,26 @@ * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove * the destination again. * - * Last reviewed on 2013-07-16 (1.0.0) + * Each #GstRTSPStreamTransport spawns one queue that will serve as a backlog of a + * controllable maximum size when the reflux from the TCP connection's backpressure + * starts spilling all over. + * + * Unlike the backlog in rtspconnection, which we have decided should only contain + * at most one RTP and one RTCP data message in order to allow control messages to + * go through unobstructed, this backlog only consists of data messages, allowing + * us to fill it up without concern. + * + * When multiple TCP transports exist, for example in the context of a shared media, + * we only pop samples from our appsinks when at least one of the transports doesn't + * experience back pressure: this allows us to pace our sample popping to the speed + * of the fastest client. + * + * When a sample is popped, it is either sent directly on transports that don't + * experience backpressure, or queued on the transport's backlog otherwise. Samples + * are then popped from that backlog when the transport reports it has sent the message. + * + * Once the backlog reaches an overly large duration, the transport is dropped as + * the client was deemed too slow. */ #ifdef HAVE_CONFIG_H #include "config.h" @@ -61,6 +80,7 @@ #include #include "rtsp-stream.h" +#include "rtsp-server-internal.h" struct _GstRTSPStreamPrivate { @@ -167,7 +187,6 @@ struct _GstRTSPStreamPrivate guint tr_cache_cookie; guint n_tcp_transports; gboolean have_buffer[2]; - guint n_outstanding; gint dscp_qos; @@ -2450,13 +2469,93 @@ clear_tr_cache (GstRTSPStreamPrivate * priv) priv->tr_cache = NULL; } +/* With lock taken */ +static gboolean +any_transport_ready (GstRTSPStream * stream, guint8 channel) +{ + gboolean ret = TRUE; + GstRTSPStreamPrivate *priv = stream->priv; + GPtrArray *transports; + gint index; + + transports = priv->tr_cache; + + if (!transports) + goto done; + + for (index = 0; index < transports->len; index++) { + GstRTSPStreamTransport *tr = g_ptr_array_index (transports, index); + if (!gst_rtsp_stream_transport_check_back_pressure (tr, channel)) { + ret = TRUE; + break; + } else { + ret = FALSE; + } + } + +done: + return ret; +} + +/* Must be called *without* priv->lock */ +static void +push_data (GstRTSPStream * stream, GstRTSPStreamTransport * trans, + GstBuffer * buffer, GstBufferList * buffer_list, gboolean is_rtp) +{ + GstRTSPStreamPrivate *priv = stream->priv; + gboolean send_ret = TRUE; + + if (is_rtp) { + if (buffer) + send_ret = gst_rtsp_stream_transport_send_rtp (trans, buffer); + if (buffer_list) + send_ret = gst_rtsp_stream_transport_send_rtp_list (trans, buffer_list); + } else { + if (buffer) + send_ret = gst_rtsp_stream_transport_send_rtcp (trans, buffer); + if (buffer_list) + send_ret = gst_rtsp_stream_transport_send_rtcp_list (trans, buffer_list); + } + + if (!send_ret) { + /* remove transport on send error */ + g_mutex_lock (&priv->lock); + update_transport (stream, trans, FALSE); + g_mutex_unlock (&priv->lock); + } +} + +/* With priv->lock */ +static void +ensure_cached_transports (GstRTSPStream * stream) +{ + GstRTSPStreamPrivate *priv = stream->priv; + GList *walk; + + if (priv->tr_cache_cookie != priv->transports_cookie) { + clear_tr_cache (priv); + priv->tr_cache = + g_ptr_array_new_full (priv->n_tcp_transports, g_object_unref); + + for (walk = priv->transports; walk; walk = g_list_next (walk)) { + GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data; + const GstRTSPTransport *t = gst_rtsp_stream_transport_get_transport (tr); + + if (t->lower_transport != GST_RTSP_LOWER_TRANS_TCP) + continue; + + g_ptr_array_add (priv->tr_cache, g_object_ref (tr)); + } + priv->tr_cache_cookie = priv->transports_cookie; + } +} + /* Must be called with priv->lock */ static void send_tcp_message (GstRTSPStream * stream, gint idx) { GstRTSPStreamPrivate *priv = stream->priv; GstAppSink *sink; - GList *walk; GstSample *sample; GstBuffer *buffer; GstBufferList *buffer_list; @@ -2464,9 +2563,13 @@ send_tcp_message (GstRTSPStream * stream, gint idx) gboolean is_rtp; GPtrArray *transports; - if (priv->n_outstanding > 0 || !priv->have_buffer[idx]) { + if (!priv->have_buffer[idx]) + return; + + ensure_cached_transports (stream); + + if (!any_transport_ready (stream, idx)) return; - } priv->have_buffer[idx] = FALSE; @@ -2493,52 +2596,38 @@ send_tcp_message (GstRTSPStream * stream, gint idx) is_rtp = (idx == 0); - if (priv->tr_cache_cookie != priv->transports_cookie) { - clear_tr_cache (priv); - priv->tr_cache = - g_ptr_array_new_full (priv->n_tcp_transports, g_object_unref); - - for (walk = priv->transports; walk; walk = g_list_next (walk)) { - GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data; - const GstRTSPTransport *t = gst_rtsp_stream_transport_get_transport (tr); - - if (t->lower_transport != GST_RTSP_LOWER_TRANS_TCP) - continue; - - g_ptr_array_add (priv->tr_cache, g_object_ref (tr)); - } - priv->tr_cache_cookie = priv->transports_cookie; - } - transports = priv->tr_cache; g_ptr_array_ref (transports); - priv->n_outstanding += n_messages * priv->n_tcp_transports; g_mutex_unlock (&priv->lock); if (transports) { - for (gint index = 0; index < transports->len; index++) { - GstRTSPStreamTransport *tr = - (GstRTSPStreamTransport *) g_ptr_array_index (transports, index); - gboolean send_ret = TRUE; + gint index; - if (is_rtp) { - if (buffer) - send_ret = gst_rtsp_stream_transport_send_rtp (tr, buffer); - if (buffer_list) - send_ret = gst_rtsp_stream_transport_send_rtp_list (tr, buffer_list); + for (index = 0; index < transports->len; index++) { + GstRTSPStreamTransport *tr = g_ptr_array_index (transports, index); + + if (gst_rtsp_stream_transport_backlog_is_empty (tr) + && !gst_rtsp_stream_transport_check_back_pressure (tr, idx)) { + push_data (stream, tr, buffer, buffer_list, is_rtp); } else { + GstBuffer *buf_ref = NULL; + GstBufferList *buflist_ref = NULL; + + g_mutex_lock (&priv->lock); + if (buffer) - send_ret = gst_rtsp_stream_transport_send_rtcp (tr, buffer); + buf_ref = gst_buffer_ref (buffer); if (buffer_list) - send_ret = gst_rtsp_stream_transport_send_rtcp_list (tr, buffer_list); - } + buflist_ref = gst_buffer_list_ref (buffer_list); + + if (!gst_rtsp_stream_transport_backlog_push (tr, + buf_ref, buflist_ref, is_rtp)) { + GST_WARNING_OBJECT (stream, + "Dropping slow transport %" GST_PTR_FORMAT, tr); + update_transport (stream, tr, FALSE); + } - if (!send_ret) { - /* remove transport on send error */ - g_mutex_lock (&priv->lock); - priv->n_outstanding -= n_messages; - update_transport (stream, tr, FALSE); g_mutex_unlock (&priv->lock); } } @@ -2558,6 +2647,7 @@ send_thread_main (gpointer data, gpointer user_data) gint i; g_mutex_lock (&priv->lock); + do { idx = -1; /* iterate from 1 and down, so we prioritize RTCP over RTP */ @@ -2569,9 +2659,9 @@ send_thread_main (gpointer data, gpointer user_data) } } - if (idx != -1 && priv->n_outstanding == 0) + if (idx != -1) send_tcp_message (stream, idx); - } while (idx != -1 && priv->n_outstanding == 0); + } while (idx != -1); GST_DEBUG_OBJECT (stream, "send thread done"); g_mutex_unlock (&priv->lock); @@ -2596,10 +2686,7 @@ handle_new_sample (GstAppSink * sink, gpointer user_data) for (i = 0; i < 2; i++) if (GST_ELEMENT_CAST (sink) == priv->appsink[i]) { priv->have_buffer[i] = TRUE; - if (priv->n_outstanding == 0) { - /* send message */ - idx = i; - } + idx = i; break; } @@ -4459,25 +4546,36 @@ mcast_error: } static void -on_message_sent (gpointer user_data) +on_message_sent (GstRTSPStreamTransport * trans, gpointer user_data) { GstRTSPStream *stream = user_data; GstRTSPStreamPrivate *priv = stream->priv; gint idx = -1; + gint i; GST_DEBUG_OBJECT (stream, "message send complete"); g_mutex_lock (&priv->lock); - g_assert (priv->n_outstanding >= 0); + if (!gst_rtsp_stream_transport_backlog_is_empty (trans)) { + GstBuffer *buffer; + GstBufferList *buffer_list; + gboolean is_rtp; + gboolean popped; + + popped = + gst_rtsp_stream_transport_backlog_pop (trans, &buffer, &buffer_list, + &is_rtp); - if (priv->n_outstanding == 0) - goto no_outstanding; + g_assert (popped == TRUE); - priv->n_outstanding--; - if (priv->n_outstanding == 0) { - gint i; + g_mutex_unlock (&priv->lock); + + push_data (stream, trans, buffer, buffer_list, is_rtp); + gst_clear_buffer (&buffer); + gst_clear_buffer_list (&buffer_list); + } else { /* iterate from 1 and down, so we prioritize RTCP over RTP */ for (i = 1; i >= 0; i--) { if (priv->have_buffer[i]) { @@ -4486,27 +4584,17 @@ on_message_sent (gpointer user_data) break; } } - } - if (idx != -1) { - gint dummy; + if (idx != -1) { + gint dummy; - if (priv->send_pool) { - GST_DEBUG_OBJECT (stream, "start thread"); - g_thread_pool_push (priv->send_pool, &dummy, NULL); + if (priv->send_pool) { + GST_DEBUG_OBJECT (stream, "start thread"); + g_thread_pool_push (priv->send_pool, &dummy, NULL); + } } - } - g_mutex_unlock (&priv->lock); - - return; - - /* ERRORS */ -no_outstanding: - { - GST_INFO ("no outstanding messages"); g_mutex_unlock (&priv->lock); - return; } } @@ -5928,8 +6016,8 @@ gst_rtsp_stream_set_rate_control (GstRTSPStream * stream, gboolean enabled) if (stream->priv->appsink[0]) g_object_set (stream->priv->appsink[0], "sync", enabled, NULL); if (stream->priv->payloader - && g_object_class_find_property (G_OBJECT_GET_CLASS (stream->priv-> - payloader), "onvif-no-rate-control")) + && g_object_class_find_property (G_OBJECT_GET_CLASS (stream-> + priv->payloader), "onvif-no-rate-control")) g_object_set (stream->priv->payloader, "onvif-no-rate-control", !enabled, NULL); if (stream->priv->session) { -- 2.7.4