rtsp-server: Add support for buffer lists
authorSebastian Dröge <sebastian@centricular.com>
Wed, 27 Jun 2018 10:17:07 +0000 (12:17 +0200)
committerSebastian Dröge <sebastian@centricular.com>
Wed, 30 Jan 2019 12:39:50 +0000 (14:39 +0200)
This adds new functions for passing buffer lists through the different
layers without breaking API/ABI, and enables the appsink to actually
provide buffer lists.

This should already reduce CPU usage and potentially context switches a
bit by passing a whole buffer list from the appsink instead of
individual buffers. As a next step it would be necessary to
  a) Add support for a vector of data for the GstRTSPMessage body
  b) Add support for sending multiple messages at once to the
    GstRTSPWatch and let it be handled internally
  c) Adding API to GOutputStream that works like writev()

Fixes https://gitlab.freedesktop.org/gstreamer/gst-rtsp-server/issues/29

docs/libs/gst-rtsp-server-sections.txt
gst/rtsp-server/rtsp-client.c
gst/rtsp-server/rtsp-media.c
gst/rtsp-server/rtsp-stream-transport.c
gst/rtsp-server/rtsp-stream-transport.h
gst/rtsp-server/rtsp-stream.c
gst/rtsp-sink/gstrtspclientsink.c

index 08472c8..369a57a 100644 (file)
@@ -680,6 +680,8 @@ gst_rtsp_stream_transport_get_rtpinfo
 
 GstRTSPSendFunc
 gst_rtsp_stream_transport_set_callbacks
+GstRTSPSendListFunc
+gst_rtsp_stream_transport_set_list_callbacks
 
 GstRTSPKeepAliveFunc
 gst_rtsp_stream_transport_set_keepalive
@@ -690,8 +692,10 @@ gst_rtsp_stream_transport_set_active
 gst_rtsp_stream_transport_set_timed_out
 gst_rtsp_stream_transport_is_timed_out
 
-gst_rtsp_stream_transport_send_rtcp
 gst_rtsp_stream_transport_send_rtp
+gst_rtsp_stream_transport_send_rtp_list
+gst_rtsp_stream_transport_send_rtcp
+gst_rtsp_stream_transport_send_rtcp_list
 
 <SUBSECTION Standard>
 GST_RTSP_STREAM_TRANSPORT_CAST
index ce9cbce..cce9222 100644 (file)
@@ -1198,6 +1198,27 @@ do_send_data (GstBuffer * buffer, guint8 channel, GstRTSPClient * client)
   return ret;
 }
 
+static gboolean
+do_send_data_list (GstBufferList * buffer_list, guint8 channel,
+    GstRTSPClient * client)
+{
+  gboolean ret = TRUE;
+  guint i, n = gst_buffer_list_length (buffer_list);
+
+  /* TODO: Needs support for a) queueing up multiple messages on the
+   * GstRTSPWatch in do_send_data() above and b) for one message having a body
+   * consisting of multiple parts here */
+  for (i = 0; i < n; i++) {
+    GstBuffer *buffer = gst_buffer_list_get (buffer_list, i);
+
+    ret = do_send_data (buffer, channel, client);
+    if (!ret)
+      break;
+  }
+
+  return ret;
+}
+
 /**
  * gst_rtsp_client_close:
  * @client: a #GstRTSPClient
@@ -2527,6 +2548,9 @@ handle_setup_request (GstRTSPClient * client, GstRTSPContext * ctx)
     gst_rtsp_stream_transport_set_callbacks (trans,
         (GstRTSPSendFunc) do_send_data,
         (GstRTSPSendFunc) do_send_data, client, NULL);
+    gst_rtsp_stream_transport_set_list_callbacks (trans,
+        (GstRTSPSendListFunc) do_send_data_list,
+        (GstRTSPSendListFunc) do_send_data_list, client, NULL);
 
     g_hash_table_insert (priv->transports,
         GINT_TO_POINTER (ct->interleaved.min), trans);
@@ -4628,7 +4652,8 @@ gst_rtsp_client_attach (GstRTSPClient * client, GMainContext * context)
   /* create watch for the connection and attach */
   priv->watch = gst_rtsp_watch_new (priv->connection, &watch_funcs,
       g_object_ref (client), (GDestroyNotify) client_watch_notify);
-  gst_rtsp_client_set_send_func (client, do_send_message, priv->watch,
+  gst_rtsp_client_set_send_func (client, do_send_message,
+      g_source_ref ((GSource *) priv->watch),
       (GDestroyNotify) gst_rtsp_watch_unref);
 
   gst_rtsp_watch_set_send_backlog (priv->watch, 0, WATCH_BACKLOG_SIZE);
index e95b0de..bfcba34 100644 (file)
@@ -2260,8 +2260,9 @@ gst_rtsp_media_create_stream (GstRTSPMedia * media, GstElement * payloader,
     }
 
     g_object_set (appsrc, "block", TRUE, "format", GST_FORMAT_TIME, "is-live",
-        TRUE, NULL);
-    g_object_set (appsink, "sync", FALSE, "async", FALSE, NULL);
+        TRUE, "emit-signals", FALSE, NULL);
+    g_object_set (appsink, "sync", FALSE, "async", FALSE, "emit-signals",
+        FALSE, "buffer-list", TRUE, NULL);
 
     data = g_new0 (AppSinkSrcData, 1);
     data->appsink = appsink;
index a258025..7c439e2 100644 (file)
@@ -58,6 +58,11 @@ struct _GstRTSPStreamTransportPrivate
   gpointer user_data;
   GDestroyNotify notify;
 
+  GstRTSPSendListFunc send_rtp_list;
+  GstRTSPSendListFunc send_rtcp_list;
+  gpointer list_user_data;
+  GDestroyNotify list_notify;
+
   GstRTSPKeepAliveFunc keep_alive;
   gpointer ka_user_data;
   GDestroyNotify ka_notify;
@@ -208,6 +213,38 @@ gst_rtsp_stream_transport_set_callbacks (GstRTSPStreamTransport * trans,
 }
 
 /**
+ * gst_rtsp_stream_transport_set_list_callbacks:
+ * @trans: a #GstRTSPStreamTransport
+ * @send_rtp_list: (scope notified): a callback called when RTP should be sent
+ * @send_rtcp_list: (scope notified): a callback called when RTCP should be sent
+ * @user_data: (closure): user data passed to callbacks
+ * @notify: (allow-none): called with the user_data when no longer needed.
+ *
+ * Install callbacks that will be called when data for a stream should be sent
+ * to a client. This is usually used when sending RTP/RTCP over TCP.
+ *
+ * Since: 1.16
+ */
+void
+gst_rtsp_stream_transport_set_list_callbacks (GstRTSPStreamTransport * trans,
+    GstRTSPSendListFunc send_rtp_list, GstRTSPSendListFunc send_rtcp_list,
+    gpointer user_data, GDestroyNotify notify)
+{
+  GstRTSPStreamTransportPrivate *priv;
+
+  g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans));
+
+  priv = trans->priv;
+
+  priv->send_rtp_list = send_rtp_list;
+  priv->send_rtcp_list = send_rtcp_list;
+  if (priv->list_notify)
+    priv->list_notify (priv->list_user_data);
+  priv->list_user_data = user_data;
+  priv->list_notify = notify;
+}
+
+/**
  * gst_rtsp_stream_transport_set_keepalive:
  * @trans: a #GstRTSPStreamTransport
  * @keep_alive: (scope notified): a callback called when the receiver is active
@@ -532,6 +569,98 @@ gst_rtsp_stream_transport_send_rtcp (GstRTSPStreamTransport * trans,
 }
 
 /**
+ * gst_rtsp_stream_transport_send_rtp_list:
+ * @trans: a #GstRTSPStreamTransport
+ * @buffer_list: (transfer none): a #GstBufferList
+ *
+ * Send @buffer_list to the installed RTP callback for @trans.
+ *
+ * Returns: %TRUE on success
+ *
+ * Since: 1.16
+ */
+gboolean
+gst_rtsp_stream_transport_send_rtp_list (GstRTSPStreamTransport * trans,
+    GstBufferList * buffer_list)
+{
+  GstRTSPStreamTransportPrivate *priv;
+  gboolean res = FALSE;
+
+  g_return_val_if_fail (GST_IS_BUFFER_LIST (buffer_list), FALSE);
+
+  priv = trans->priv;
+
+  if (priv->send_rtp_list) {
+    res =
+        priv->send_rtp_list (buffer_list, priv->transport->interleaved.min,
+        priv->list_user_data);
+  } else if (priv->send_rtp) {
+    guint n = gst_buffer_list_length (buffer_list), i;
+
+    for (i = 0; i < n; i++) {
+      GstBuffer *buffer = gst_buffer_list_get (buffer_list, i);
+
+      res =
+          priv->send_rtp (buffer, priv->transport->interleaved.min,
+          priv->user_data);
+      if (!res)
+        break;
+    }
+  }
+
+  if (res)
+    gst_rtsp_stream_transport_keep_alive (trans);
+
+  return res;
+}
+
+/**
+ * gst_rtsp_stream_transport_send_rtcp_list:
+ * @trans: a #GstRTSPStreamTransport
+ * @buffer_list: (transfer none): a #GstBuffer
+ *
+ * Send @buffer_list to the installed RTCP callback for @trans.
+ *
+ * Returns: %TRUE on success
+ *
+ * Since: 1.16
+ */
+gboolean
+gst_rtsp_stream_transport_send_rtcp_list (GstRTSPStreamTransport * trans,
+    GstBufferList * buffer_list)
+{
+  GstRTSPStreamTransportPrivate *priv;
+  gboolean res = FALSE;
+
+  g_return_val_if_fail (GST_IS_BUFFER_LIST (buffer_list), FALSE);
+
+  priv = trans->priv;
+
+  if (priv->send_rtcp_list) {
+    res =
+        priv->send_rtcp_list (buffer_list, priv->transport->interleaved.max,
+        priv->list_user_data);
+  } else if (priv->send_rtcp) {
+    guint n = gst_buffer_list_length (buffer_list), i;
+
+    for (i = 0; i < n; i++) {
+      GstBuffer *buffer = gst_buffer_list_get (buffer_list, i);
+
+      res =
+          priv->send_rtcp (buffer, priv->transport->interleaved.max,
+          priv->user_data);
+      if (!res)
+        break;
+    }
+  }
+
+  if (res)
+    gst_rtsp_stream_transport_keep_alive (trans);
+
+  return res;
+}
+
+/**
  * gst_rtsp_stream_transport_keep_alive:
  * @trans: a #GstRTSPStreamTransport
  *
index 48c565a..2b507b0 100644 (file)
@@ -56,6 +56,22 @@ typedef struct _GstRTSPStreamTransportPrivate GstRTSPStreamTransportPrivate;
  * Returns: %TRUE on success
  */
 typedef gboolean (*GstRTSPSendFunc)      (GstBuffer *buffer, guint8 channel, gpointer user_data);
+
+/**
+ * GstRTSPSendListFunc:
+ * @buffer_list: a #GstBufferList
+ * @channel: a channel
+ * @user_data: user data
+ *
+ * Function registered with gst_rtsp_stream_transport_set_callbacks() and
+ * called when @buffer_list must be sent on @channel.
+ *
+ * Returns: %TRUE on success
+ *
+ * Since: 1.16
+ */
+typedef gboolean (*GstRTSPSendListFunc)      (GstBufferList *buffer_list, guint8 channel, gpointer user_data);
+
 /**
  * GstRTSPKeepAliveFunc:
  * @user_data: user data
@@ -132,6 +148,13 @@ void                     gst_rtsp_stream_transport_set_callbacks (GstRTSPStreamT
                                                                   GDestroyNotify  notify);
 
 GST_RTSP_SERVER_API
+void                     gst_rtsp_stream_transport_set_list_callbacks (GstRTSPStreamTransport *trans,
+                                                                       GstRTSPSendListFunc send_rtp_list,
+                                                                       GstRTSPSendListFunc send_rtcp_list,
+                                                                       gpointer user_data,
+                                                                       GDestroyNotify  notify);
+
+GST_RTSP_SERVER_API
 void                     gst_rtsp_stream_transport_set_keepalive (GstRTSPStreamTransport *trans,
                                                                   GstRTSPKeepAliveFunc keep_alive,
                                                                   gpointer user_data,
@@ -171,6 +194,14 @@ gboolean                 gst_rtsp_stream_transport_send_rtcp     (GstRTSPStreamT
                                                                   GstBuffer *buffer);
 
 GST_RTSP_SERVER_API
+gboolean                 gst_rtsp_stream_transport_send_rtp_list (GstRTSPStreamTransport *trans,
+                                                                  GstBufferList *buffer_list);
+
+GST_RTSP_SERVER_API
+gboolean                 gst_rtsp_stream_transport_send_rtcp_list(GstRTSPStreamTransport *trans,
+                                                                  GstBufferList *buffer_list);
+
+GST_RTSP_SERVER_API
 GstFlowReturn            gst_rtsp_stream_transport_recv_data     (GstRTSPStreamTransport *trans,
                                                                   guint channel, GstBuffer *buffer);
 
index 8484ef9..b9d2e7f 100644 (file)
@@ -2456,6 +2456,8 @@ send_tcp_message (GstRTSPStream * stream, gint idx)
   GList *walk;
   GstSample *sample;
   GstBuffer *buffer;
+  GstBufferList *buffer_list;
+  guint n_messages = 0;
   gboolean is_rtp;
 
   if (priv->n_outstanding > 0 || !priv->have_buffer[idx]) {
@@ -2476,6 +2478,14 @@ send_tcp_message (GstRTSPStream * stream, gint idx)
   }
 
   buffer = gst_sample_get_buffer (sample);
+  buffer_list = gst_sample_get_buffer_list (sample);
+
+  /* We will get one message-sent notification per message,
+   * i.e. per buffer that is actually sent out */
+  if (buffer)
+    n_messages += 1;
+  if (buffer_list)
+    n_messages += gst_buffer_list_length (buffer_list);
 
   is_rtp = (idx == 0);
 
@@ -2513,17 +2523,24 @@ send_tcp_message (GstRTSPStream * stream, gint idx)
     }
   }
 
-  priv->n_outstanding += priv->n_tcp_transports;
+  priv->n_outstanding += n_messages * priv->n_tcp_transports;
 
   g_mutex_unlock (&priv->lock);
 
   if (is_rtp) {
     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
-      if (!gst_rtsp_stream_transport_send_rtp (tr, buffer)) {
+      gboolean send_ret = TRUE;
+
+      if (buffer)
+        send_ret = gst_rtsp_stream_transport_send_rtp (tr, buffer);
+      if (buffer_list)
+        send_ret = gst_rtsp_stream_transport_send_rtp_list (tr, buffer_list);
+
+      if (!send_ret) {
         /* remove transport on send error */
         g_mutex_lock (&priv->lock);
-        priv->n_outstanding--;
+        priv->n_outstanding -= n_messages;
         update_transport (stream, tr, FALSE);
         g_mutex_unlock (&priv->lock);
       }
@@ -2531,10 +2548,17 @@ send_tcp_message (GstRTSPStream * stream, gint idx)
   } else {
     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
-      if (!gst_rtsp_stream_transport_send_rtcp (tr, buffer)) {
+      gboolean send_ret = TRUE;
+
+      if (buffer)
+        send_ret = gst_rtsp_stream_transport_send_rtcp (tr, buffer);
+      if (buffer_list)
+        send_ret = gst_rtsp_stream_transport_send_rtcp_list (tr, buffer_list);
+
+      if (!send_ret) {
         /* remove transport on send error */
         g_mutex_lock (&priv->lock);
-        priv->n_outstanding--;
+        priv->n_outstanding -= n_messages;
         update_transport (stream, tr, FALSE);
         g_mutex_unlock (&priv->lock);
       }
@@ -3369,8 +3393,8 @@ create_sender_part (GstRTSPStream * stream, const GstRTSPTransport * transport)
     } else if (is_tcp && !priv->appsink[i]) {
       /* make appsink */
       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
-      g_object_set (priv->appsink[i], "emit-signals", FALSE, "max-buffers", 1,
-          NULL);
+      g_object_set (priv->appsink[i], "emit-signals", FALSE, "buffer-list",
+          TRUE, "max-buffers", 1, NULL);
 
       /* we need to set sync and preroll to FALSE for the sink to avoid
        * deadlock. This is only needed for sink sending RTCP data. */
index 175b4ea..3e309c1 100644 (file)
@@ -3850,6 +3850,27 @@ do_send_data (GstBuffer * buffer, guint8 channel,
   return res == GST_RTSP_OK;
 }
 
+static gboolean
+do_send_data_list (GstBufferList * buffer_list, guint8 channel,
+    GstRTSPStreamContext * context)
+{
+  gboolean ret = TRUE;
+  guint i, n = gst_buffer_list_length (buffer_list);
+
+  /* TODO: Needs support for a) queueing up multiple messages on the
+   * GstRTSPWatch in do_send_data() above and b) for one message having a body
+   * consisting of multiple parts here */
+  for (i = 0; i < n; i++) {
+    GstBuffer *buffer = gst_buffer_list_get (buffer_list, i);
+
+    ret = do_send_data (buffer, channel, context);
+    if (!ret)
+      break;
+  }
+
+  return ret;
+}
+
 static GstRTSPResult
 gst_rtsp_client_sink_setup_streams (GstRTSPClientSink * sink, gboolean async)
 {
@@ -4147,6 +4168,10 @@ gst_rtsp_client_sink_setup_streams (GstRTSPClientSink * sink, gboolean async)
           gst_rtsp_stream_transport_set_callbacks (context->stream_transport,
               (GstRTSPSendFunc) do_send_data,
               (GstRTSPSendFunc) do_send_data, context, NULL);
+          gst_rtsp_stream_transport_set_list_callbacks
+              (context->stream_transport,
+              (GstRTSPSendListFunc) do_send_data_list,
+              (GstRTSPSendListFunc) do_send_data_list, context, NULL);
         }
 
         /* The stream_transport now owns the transport */