rtsp-connection: Make use of new GstRTSPMessage API for directly storing a body buffe...
authorSebastian Dröge <sebastian@centricular.com>
Mon, 17 Sep 2018 14:03:45 +0000 (17:03 +0300)
committerSebastian Dröge <sebastian@centricular.com>
Tue, 29 Jan 2019 12:17:23 +0000 (14:17 +0200)
By doing so we can send a whole GstBufferList and each memory in the
contained buffers without copying into a single memory area and with a
single writev() call. This improves performance considerably for
high-packet-rate streams.

This depends on https://gitlab.gnome.org/GNOME/glib/merge_requests/333
to be efficient, otherwise each chunk of memory is a separate write()
call.

https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/issues/370

docs/libs/gst-plugins-base-libs-sections.txt
gst-libs/gst/rtsp/gstrtspconnection.c
gst-libs/gst/rtsp/gstrtspconnection.h

index 46fb30999b27558ce0ef5beebb5cd882616c3dc0..6bbad3b481aac9710f0391c561b0b24364fa790e 100644 (file)
@@ -1840,6 +1840,7 @@ gst_rtsp_connection_write
 gst_rtsp_connection_poll
 
 gst_rtsp_connection_send
+gst_rtsp_connection_send_messages
 gst_rtsp_connection_receive
 
 gst_rtsp_connection_next_timeout
@@ -1892,6 +1893,7 @@ gst_rtsp_watch_unref
 gst_rtsp_watch_attach
 gst_rtsp_watch_reset
 gst_rtsp_watch_send_message
+gst_rtsp_watch_send_messages
 gst_rtsp_watch_write_data
 gst_rtsp_watch_get_send_backlog
 gst_rtsp_watch_set_send_backlog
index 48e17d65e6e5a3aca7afceea29f2dbe1ac1321fd..322e77b8b745af7c852c5ec81e0f8df9eb52debf 100644 (file)
@@ -89,6 +89,46 @@ typedef struct
   guint coutl;
 } DecodeCtx;
 
+typedef struct
+{
+  /* If %TRUE we only own data and none of the
+   * other fields
+   */
+  gboolean borrowed;
+
+  /* Header or full message */
+  guint8 *data;
+  guint data_size;
+  gboolean data_is_data_header;
+
+  /* Payload following data, if any */
+  guint8 *body_data;
+  guint body_data_size;
+  /* or */
+  GstBuffer *body_buffer;
+
+  /* DATA packet header statically allocated for above */
+  guint8 data_header[4];
+
+  /* all below only for async writing */
+
+  guint data_offset;            /* == data_size when done */
+  guint body_offset;            /* into body_data or the buffer */
+
+  /* ID of the message for notification */
+  guint id;
+} GstRTSPSerializedMessage;
+
+static void
+gst_rtsp_serialized_message_clear (GstRTSPSerializedMessage * msg)
+{
+  if (!msg->borrowed) {
+    g_free (msg->body_data);
+    gst_buffer_replace (&msg->body_buffer, NULL);
+  }
+  g_free (msg->data);
+}
+
 #ifdef MSG_NOSIGNAL
 #define SEND_FLAGS MSG_NOSIGNAL
 #else
@@ -1205,6 +1245,100 @@ error:
   }
 }
 
+/* NOTE: This changes the values of vectors if multiple iterations are needed!
+ *
+ * Depends on https://gitlab.gnome.org/GNOME/glib/merge_requests/333
+ */
+#if GLIB_CHECK_VERSION(2, 59, 0)
+static GstRTSPResult
+writev_bytes (GOutputStream * stream, GOutputVector * vectors, gint n_vectors,
+    gsize * bytes_written, gboolean block, GCancellable * cancellable)
+{
+  gsize _bytes_written = 0;
+  gsize written;
+  GError *err = NULL;
+
+  while (n_vectors > 0) {
+    gboolean res;
+
+    if (block)
+      res = g_output_stream_writev (stream, vectors, n_vectors, &written,
+          cancellable, &err);
+    else
+      res =
+          g_pollable_output_stream_writev_nonblocking (G_POLLABLE_OUTPUT_STREAM
+          (stream), vectors, n_vectors, &written, cancellable, &err);
+    _bytes_written += written;
+    if (G_UNLIKELY (!res))
+      goto error;
+
+    /* skip vectors that have been written in full */
+    while (written >= vectors[0].size) {
+      written -= vectors[0].size;
+      ++vectors;
+      --n_vectors;
+    }
+
+    /* skip partially written vector data */
+    if (written > 0) {
+      vectors[0].size -= written;
+      vectors[0].buffer = ((guint8 *) vectors[0].buffer) + written;
+    }
+  }
+
+  *bytes_written = _bytes_written;
+
+  return GST_RTSP_OK;
+
+  /* ERRORS */
+error:
+  {
+    *bytes_written = _bytes_written;
+
+    if (G_UNLIKELY (written == 0))
+      return GST_RTSP_EEOF;
+
+    GST_DEBUG ("%s", err->message);
+    if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+      g_clear_error (&err);
+      return GST_RTSP_EINTR;
+    } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+      g_clear_error (&err);
+      return GST_RTSP_EINTR;
+    } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
+      g_clear_error (&err);
+      return GST_RTSP_ETIMEOUT;
+    }
+    g_clear_error (&err);
+    return GST_RTSP_ESYS;
+  }
+}
+#else
+static GstRTSPResult
+writev_bytes (GOutputStream * stream, GOutputVector * vectors, gint n_vectors,
+    gsize * bytes_written, gboolean block, GCancellable * cancellable)
+{
+  gsize _bytes_written = 0;
+  guint written;
+  gint i;
+  GstRTSPResult res;
+
+  for (i = 0; i < n_vectors; i++) {
+    written = 0;
+    res =
+        write_bytes (stream, vectors[i].buffer, &written, vectors[i].size,
+        block, cancellable);
+    _bytes_written += written;
+    if (G_UNLIKELY (res != GST_RTSP_OK))
+      break;
+  }
+
+  *bytes_written = _bytes_written;
+
+  return res;
+}
+#endif
+
 static gint
 fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
     gboolean block, GError ** err)
@@ -1465,6 +1599,7 @@ read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size,
  *
  * Returns: #GST_RTSP_OK on success.
  */
+/* FIXME 2.0: This should've been static! */
 GstRTSPResult
 gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data,
     guint size, GTimeVal * timeout)
@@ -1490,15 +1625,22 @@ gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data,
   return res;
 }
 
-static GString *
-message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
+static gboolean
+serialize_message (GstRTSPConnection * conn, GstRTSPMessage * message,
+    GstRTSPSerializedMessage * serialized_message)
 {
   GString *str = NULL;
 
-  str = g_string_new ("");
+  memset (serialized_message, 0, sizeof (*serialized_message));
+
+  /* Initially we borrow the body_data / body_buffer fields from
+   * the message */
+  serialized_message->borrowed = TRUE;
 
   switch (message->type) {
     case GST_RTSP_MESSAGE_REQUEST:
+      str = g_string_new ("");
+
       /* create request string, add CSeq */
       g_string_append_printf (str, "%s %s RTSP/%s\r\n"
           "CSeq: %d\r\n",
@@ -1516,12 +1658,16 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
       add_auth_header (conn, message);
       break;
     case GST_RTSP_MESSAGE_RESPONSE:
+      str = g_string_new ("");
+
       /* create response string */
       g_string_append_printf (str, "RTSP/%s %d %s\r\n",
           gst_rtsp_version_as_text (message->type_data.response.version),
           message->type_data.response.code, message->type_data.response.reason);
       break;
     case GST_RTSP_MESSAGE_HTTP_REQUEST:
+      str = g_string_new ("");
+
       /* create request string */
       g_string_append_printf (str, "%s %s HTTP/%s\r\n",
           gst_rtsp_method_as_text (message->type_data.request.method),
@@ -1531,6 +1677,8 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
       add_auth_header (conn, message);
       break;
     case GST_RTSP_MESSAGE_HTTP_RESPONSE:
+      str = g_string_new ("");
+
       /* create response string */
       g_string_append_printf (str, "HTTP/%s %d %s\r\n",
           gst_rtsp_version_as_text (message->type_data.request.version),
@@ -1538,7 +1686,7 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
       break;
     case GST_RTSP_MESSAGE_DATA:
     {
-      guint8 data_header[4];
+      guint8 *data_header = serialized_message->data_header;
 
       /* prepare data header */
       data_header[0] = '$';
@@ -1546,16 +1694,22 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
       data_header[2] = (message->body_size >> 8) & 0xff;
       data_header[3] = message->body_size & 0xff;
 
-      /* create string with header and data */
-      str = g_string_append_len (str, (gchar *) data_header, 4);
-      str =
-          g_string_append_len (str, (gchar *) message->body,
-          message->body_size);
+      /* create serialized message with header and data */
+      serialized_message->data_is_data_header = TRUE;
+      serialized_message->data_size = 4;
+
+      if (message->body) {
+        serialized_message->body_data = message->body;
+        serialized_message->body_data_size = message->body_size;
+      } else {
+        g_assert (message->body_buffer != NULL);
+        serialized_message->body_buffer = message->body_buffer;
+      }
       break;
     }
     default:
       g_string_free (str, TRUE);
-      g_return_val_if_reached (NULL);
+      g_return_val_if_reached (FALSE);
       break;
   }
 
@@ -1563,6 +1717,8 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
   if (message->type != GST_RTSP_MESSAGE_DATA) {
     gchar date_string[100];
 
+    g_assert (str != NULL);
+
     gen_date_string (date_string, sizeof (date_string));
 
     /* add date header */
@@ -1573,7 +1729,7 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
     gst_rtsp_message_append_headers (message, str);
 
     /* append Content-Length and body if needed */
-    if (message->body != NULL && message->body_size > 0) {
+    if (message->body_size > 0) {
       gchar *len;
 
       len = g_strdup_printf ("%d", message->body_size);
@@ -1582,16 +1738,24 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
       g_free (len);
       /* header ends here */
       g_string_append (str, "\r\n");
-      str =
-          g_string_append_len (str, (gchar *) message->body,
-          message->body_size);
+
+      if (message->body) {
+        serialized_message->body_data = message->body;
+        serialized_message->body_data_size = message->body_size;
+      } else {
+        g_assert (message->body_buffer != NULL);
+        serialized_message->body_buffer = message->body_buffer;
+      }
     } else {
       /* just end headers */
       g_string_append (str, "\r\n");
     }
+
+    serialized_message->data_size = str->len;
+    serialized_message->data = (guint8 *) g_string_free (str, FALSE);
   }
 
-  return str;
+  return TRUE;
 }
 
 /**
@@ -1612,36 +1776,190 @@ GstRTSPResult
 gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message,
     GTimeVal * timeout)
 {
-  GString *string = NULL;
+  g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
+  g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
+
+  return gst_rtsp_connection_send_messages (conn, message, 1, timeout);
+}
+
+/**
+ * gst_rtsp_connection_send_messages:
+ * @conn: a #GstRTSPConnection
+ * @messages: (array length=n_messages): the messages to send
+ * @n_messages: the number of messages to send
+ * @timeout: a timeout value or %NULL
+ *
+ * Attempt to send @messages to the connected @conn, blocking up to
+ * the specified @timeout. @timeout can be %NULL, in which case this function
+ * might block forever.
+ *
+ * This function can be cancelled with gst_rtsp_connection_flush().
+ *
+ * Returns: #GST_RTSP_OK on success.
+ *
+ * Since: 1.16
+ */
+GstRTSPResult
+gst_rtsp_connection_send_messages (GstRTSPConnection * conn,
+    GstRTSPMessage * messages, guint n_messages, GTimeVal * timeout)
+{
+  GstClockTime to;
   GstRTSPResult res;
-  gchar *str;
-  gsize len;
+  GstRTSPSerializedMessage *serialized_messages;
+  GOutputVector *vectors;
+  GstMapInfo *map_infos;
+  guint n_vectors, n_memories;
+  gint i, j, k;
+  gsize bytes_to_write, bytes_written;
 
   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
-  g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
+  g_return_val_if_fail (messages != NULL || n_messages == 0, GST_RTSP_EINVAL);
+
+  serialized_messages = g_newa (GstRTSPSerializedMessage, n_messages);
+  memset (serialized_messages, 0,
+      sizeof (GstRTSPSerializedMessage) * n_messages);
+
+  for (i = 0, n_vectors = 0, n_memories = 0, bytes_to_write = 0; i < n_messages;
+      i++) {
+    if (G_UNLIKELY (!serialize_message (conn, &messages[i],
+                &serialized_messages[i])))
+      goto no_message;
+
+    if (conn->tunneled) {
+      gint state = 0, save = 0;
+      gchar *base64_buffer, *out_buffer;
+      gsize written = 0;
+      gsize in_length;
+
+      in_length = serialized_messages[i].data_size;
+      if (serialized_messages[i].body_data)
+        in_length += serialized_messages[i].body_data_size;
+      else if (serialized_messages[i].body_buffer)
+        in_length += gst_buffer_get_size (serialized_messages[i].body_buffer);
+
+      in_length = (in_length / 3 + 1) * 4 + 4 + 1;
+      base64_buffer = out_buffer = g_malloc0 (in_length);
+
+      written =
+          g_base64_encode_step (serialized_messages[i].data_is_data_header ?
+          serialized_messages[i].data_header : serialized_messages[i].data,
+          serialized_messages[i].data_size, FALSE, out_buffer, &state, &save);
+      out_buffer += written;
+
+      if (serialized_messages[i].body_data) {
+        written =
+            g_base64_encode_step (serialized_messages[i].body_data,
+            serialized_messages[i].body_data_size, FALSE, out_buffer, &state,
+            &save);
+        out_buffer += written;
+      } else if (serialized_messages[i].body_buffer) {
+        guint j, n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
+
+        for (j = 0; j < n; j++) {
+          GstMemory *mem =
+              gst_buffer_peek_memory (serialized_messages[i].body_buffer, j);
+          GstMapInfo map;
+
+          gst_memory_map (mem, &map, GST_MAP_READ);
+
+          written = g_base64_encode_step (map.data, map.size,
+              FALSE, out_buffer, &state, &save);
+          out_buffer += written;
+
+          gst_memory_unmap (mem, &map);
+        }
+      }
 
-  if (G_UNLIKELY (!(string = message_to_string (conn, message))))
-    goto no_message;
+      written = g_base64_encode_close (FALSE, out_buffer, &state, &save);
+      out_buffer += written;
 
-  if (conn->tunneled) {
-    str = g_base64_encode ((const guchar *) string->str, string->len);
-    g_string_free (string, TRUE);
-    len = strlen (str);
-  } else {
-    str = string->str;
-    len = string->len;
-    g_string_free (string, FALSE);
+      gst_rtsp_serialized_message_clear (&serialized_messages[i]);
+      memset (&serialized_messages[i], 0, sizeof (serialized_messages[i]));
+
+      serialized_messages[i].data = (guint8 *) base64_buffer;
+      serialized_messages[i].data_size = (out_buffer - base64_buffer) + 1;
+      n_vectors++;
+    } else {
+      n_vectors++;
+      if (serialized_messages[i].body_data) {
+        n_vectors++;
+      } else if (serialized_messages[i].body_buffer) {
+        n_vectors += gst_buffer_n_memory (serialized_messages[i].body_buffer);
+        n_memories += gst_buffer_n_memory (serialized_messages[i].body_buffer);
+      }
+    }
   }
 
-  /* write request */
-  res = gst_rtsp_connection_write (conn, (guint8 *) str, len, timeout);
+  vectors = g_newa (GOutputVector, n_vectors);
+  map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL;
+
+  for (i = 0, j = 0, k = 0; i < n_messages; i++) {
+    vectors[j].buffer = serialized_messages[i].data_is_data_header ?
+        serialized_messages[i].data_header : serialized_messages[i].data;
+    vectors[j].size = serialized_messages[i].data_size;
+    bytes_to_write += vectors[j].size;
+    j++;
+
+    if (serialized_messages[i].body_data) {
+      vectors[j].buffer = serialized_messages[i].body_data;
+      vectors[j].size = serialized_messages[i].body_data_size;
+      bytes_to_write += vectors[j].size;
+      j++;
+    } else if (serialized_messages[i].body_buffer) {
+      gint l, n;
+
+      n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
+      for (l = 0; l < n; l++) {
+        GstMemory *mem =
+            gst_buffer_peek_memory (serialized_messages[i].body_buffer, l);
+
+        gst_memory_map (mem, &map_infos[k], GST_MAP_READ);
+        vectors[j].buffer = map_infos[k].data;
+        vectors[j].size = map_infos[k].size;
+        bytes_to_write += vectors[j].size;
+
+        k++;
+        j++;
+      }
+    }
+  }
+
+  /* write request: this is synchronous */
+  to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0;
+
+  g_socket_set_timeout (conn->write_socket, (to + GST_SECOND - 1) / GST_SECOND);
+  res =
+      writev_bytes (conn->output_stream, vectors, n_vectors, &bytes_written,
+      TRUE, conn->cancellable);
+  g_socket_set_timeout (conn->write_socket, 0);
+
+  g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
 
-  g_free (str);
+  /* free everything */
+  for (i = 0, k = 0; i < n_messages; i++) {
+    if (serialized_messages[i].body_buffer) {
+      gint l, n;
+
+      n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
+      for (l = 0; l < n; l++) {
+        GstMemory *mem =
+            gst_buffer_peek_memory (serialized_messages[i].body_buffer, l);
+
+        gst_memory_unmap (mem, &map_infos[k]);
+        k++;
+      }
+    }
+
+    g_free (serialized_messages[i].data);
+  }
 
   return res;
 
 no_message:
   {
+    for (i = 0; i < n_messages; i++) {
+      gst_rtsp_serialized_message_clear (&serialized_messages[i]);
+    }
     g_warning ("Wrong message");
     return GST_RTSP_EINVAL;
   }
@@ -3136,13 +3454,6 @@ gst_rtsp_connection_get_remember_session_id (GstRTSPConnection * conn)
 #define WRITE_ERR   (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
 #define WRITE_COND  (G_IO_OUT | WRITE_ERR)
 
-typedef struct
-{
-  guint8 *data;
-  guint size;
-  guint id;
-} GstRTSPRec;
-
 /* async functions */
 struct _GstRTSPWatch
 {
@@ -3164,10 +3475,8 @@ struct _GstRTSPWatch
   GMutex mutex;
   GstQueueArray *messages;
   gsize messages_bytes;
-  guint8 *write_data;
-  guint write_off;
-  guint write_size;
-  guint write_id;
+  guint messages_count;
+
   gsize max_bytes;
   guint max_messages;
   GCond queue_not_full;
@@ -3180,7 +3489,7 @@ struct _GstRTSPWatch
 };
 
 #define IS_BACKLOG_FULL(w) (((w)->max_bytes != 0 && (w)->messages_bytes >= (w)->max_bytes) || \
-      ((w)->max_messages != 0 && gst_queue_array_get_length((w)->messages) >= (w)->max_messages))
+      ((w)->max_messages != 0 && (w)->messages_count >= (w)->max_messages))
 
 static gboolean
 gst_rtsp_source_prepare (GSource * source, gint * timeout)
@@ -3426,71 +3735,251 @@ gst_rtsp_source_dispatch_write (GPollableOutputStream * stream,
   GstRTSPConnection *conn = watch->conn;
 
   /* if this connection was already closed, stop now */
-  if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream)
+  if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream ||
+      !watch->messages)
     goto eof;
 
   g_mutex_lock (&watch->mutex);
   do {
-    if (watch->write_data == NULL) {
-      GstRTSPRec *rec_ptr, rec;
-
-      /* get a new message from the queue */
-      rec_ptr = gst_queue_array_pop_head_struct (watch->messages);
-      if (rec_ptr == NULL) {
-        if (watch->writesrc) {
-          if (!g_source_is_destroyed ((GSource *) watch))
-            g_source_remove_child_source ((GSource *) watch, watch->writesrc);
-          g_source_unref (watch->writesrc);
-          watch->writesrc = NULL;
-          /* we create and add the write source again when we actually have
-           * something to write */
-
-          /* since write source is now removed we add read source on the write
-           * socket instead to be able to detect when client closes get channel
-           * in tunneled mode */
-          if (watch->conn->control_stream) {
-            watch->controlsrc =
-                g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
-                (watch->conn->control_stream), NULL);
-            g_source_set_callback (watch->controlsrc,
-                (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch,
-                NULL);
-            g_source_add_child_source ((GSource *) watch, watch->controlsrc);
-          } else {
-            watch->controlsrc = NULL;
+    guint n_messages = gst_queue_array_get_length (watch->messages);
+    GOutputVector *vectors;
+    GstMapInfo *map_infos;
+    guint *ids;
+    gsize bytes_to_write, bytes_written;
+    guint n_vectors, n_memories, n_ids, drop_messages;
+    gint i, j, k, l;
+    GstRTSPSerializedMessage *msg;
+
+    /* if this connection was already closed, stop now */
+    if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream ||
+        !watch->messages) {
+      g_mutex_unlock (&watch->mutex);
+      goto eof;
+    }
+
+    if (n_messages == 0) {
+      if (watch->writesrc) {
+        if (!g_source_is_destroyed ((GSource *) watch))
+          g_source_remove_child_source ((GSource *) watch, watch->writesrc);
+        g_source_unref (watch->writesrc);
+        watch->writesrc = NULL;
+        /* we create and add the write source again when we actually have
+         * something to write */
+
+        /* since write source is now removed we add read source on the write
+         * socket instead to be able to detect when client closes get channel
+         * in tunneled mode */
+        if (watch->conn->control_stream) {
+          watch->controlsrc =
+              g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
+              (watch->conn->control_stream), NULL);
+          g_source_set_callback (watch->controlsrc,
+              (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch,
+              NULL);
+          g_source_add_child_source ((GSource *) watch, watch->controlsrc);
+        } else {
+          watch->controlsrc = NULL;
+        }
+      }
+      break;
+    }
+
+    for (i = 0, n_vectors = 0, n_memories = 0, n_ids = 0; i < n_messages; i++) {
+      msg = gst_queue_array_peek_nth_struct (watch->messages, i);
+      if (msg->id != 0)
+        n_ids++;
+
+      if (msg->data_offset < msg->data_size)
+        n_vectors++;
+
+      if (msg->body_data && msg->body_offset < msg->body_data_size) {
+        n_vectors++;
+      } else if (msg->body_buffer) {
+        guint m, n;
+        guint offset = 0;
+
+        n = gst_buffer_n_memory (msg->body_buffer);
+        for (m = 0; m < n; m++) {
+          GstMemory *mem = gst_buffer_peek_memory (msg->body_buffer, m);
+
+          /* Skip all memories we already wrote */
+          if (offset + mem->size < msg->body_offset) {
+            offset += mem->size;
+            continue;
           }
+          offset += mem->size;
+
+          n_memories++;
+          n_vectors++;
         }
-        break;
       }
+    }
 
-      rec = *rec_ptr;
-      watch->messages_bytes -= rec.size;
+    vectors = g_newa (GOutputVector, n_vectors);
+    map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL;
+    ids = n_ids ? g_newa (guint, n_ids + 1) : NULL;
+    if (ids)
+      memset (ids, 0, sizeof (guint) * (n_ids + 1));
+
+    for (i = 0, j = 0, k = 0, l = 0, bytes_to_write = 0; i < n_messages; i++) {
+      msg = gst_queue_array_peek_nth_struct (watch->messages, i);
+
+      if (msg->data_offset < msg->data_size) {
+        vectors[j].buffer = (msg->data_is_data_header ?
+            msg->data_header : msg->data) + msg->data_offset;
+        vectors[j].size = msg->data_size - msg->data_offset;
+        bytes_to_write += vectors[j].size;
+        j++;
+      }
+
+      if (msg->body_data) {
+        if (msg->body_offset < msg->body_data_size) {
+          vectors[j].buffer = msg->body_data + msg->body_offset;
+          vectors[j].size = msg->body_data_size - msg->body_offset;
+          bytes_to_write += vectors[j].size;
+          j++;
+        }
+      } else if (msg->body_buffer) {
+        guint m, n;
+        guint offset = 0;
+
+        n = gst_buffer_n_memory (msg->body_buffer);
+        for (m = 0; m < n; m++) {
+          GstMemory *mem = gst_buffer_peek_memory (msg->body_buffer, m);
+          guint off;
+
+          /* Skip all memories we already wrote */
+          if (offset + mem->size < msg->body_offset) {
+            offset += mem->size;
+            continue;
+          }
+
+          if (offset < msg->body_offset) {
+            off = msg->body_offset - offset;
+          } else {
+            offset += mem->size;
+            off = 0;
+          }
+
+          g_assert (off < mem->size);
+
+          gst_memory_map (mem, &map_infos[k], GST_MAP_READ);
+          vectors[j].buffer = map_infos[k].data + off;
+          vectors[j].size = map_infos[k].size - off;
+          bytes_to_write += vectors[j].size;
+
+          k++;
+          j++;
+        }
+      }
+    }
 
-      watch->write_off = 0;
-      watch->write_data = rec.data;
-      watch->write_size = rec.size;
-      watch->write_id = rec.id;
+    res =
+        writev_bytes (watch->conn->output_stream, vectors, n_vectors,
+        &bytes_written, FALSE, watch->conn->cancellable);
+    g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
+
+    /* First unmap all memories here, this simplifies the code below
+     * as we don't have to skip all memories that were already written
+     * before */
+    for (k = 0; k < n_memories; k++) {
+      gst_memory_unmap (map_infos[k].memory, &map_infos[k]);
     }
 
-    res = write_bytes (conn->output_stream, watch->write_data,
-        &watch->write_off, watch->write_size, FALSE, conn->cancellable);
+    if (bytes_written == bytes_to_write) {
+      /* fast path, just unmap all memories, free memory, drop all messages and notify them */
+      l = 0;
+      while ((msg = gst_queue_array_pop_head_struct (watch->messages))) {
+        if (msg->id) {
+          ids[l] = msg->id;
+          l++;
+        }
+
+        gst_rtsp_serialized_message_clear (msg);
+      }
+
+      g_assert (watch->messages_bytes >= bytes_written);
+      watch->messages_bytes -= bytes_written;
+    } else if (bytes_written > 0) {
+      /* not done, let's skip all messages that were sent already and free them */
+      for (i = 0, k = 0, drop_messages = 0; i < n_messages; i++) {
+        msg = gst_queue_array_peek_nth_struct (watch->messages, i);
+
+        if (bytes_written >= 0) {
+          if (bytes_written >= msg->data_size) {
+            guint body_size;
+
+            /* all data of this message is sent, check body and otherwise
+             * skip the whole message for next time */
+            msg->data_offset = msg->data_size;
+            bytes_written -= msg->data_size;
+
+            if (msg->body_data) {
+              body_size = msg->body_data_size;
+
+            } else if (msg->body_buffer) {
+              body_size = gst_buffer_get_size (msg->body_buffer);
+            } else {
+              body_size = 0;
+            }
+
+            if (bytes_written >= body_size) {
+              /* body written, drop this message */
+              msg->body_offset = body_size;
+              bytes_written -= body_size;
+              drop_messages++;
+
+              if (msg->id) {
+                ids[l] = msg->id;
+                l++;
+              }
+
+              gst_rtsp_serialized_message_clear (msg);
+            } else {
+              msg->body_offset = bytes_written;
+              bytes_written = 0;
+            }
+          } else {
+            /* Need to continue sending from the data of this message */
+            msg->data_offset = bytes_written;
+            bytes_written = 0;
+          }
+        }
+      }
+
+      while (drop_messages > 0) {
+        msg = gst_queue_array_pop_head_struct (watch->messages);
+        g_assert (msg);
+        drop_messages--;
+      }
+
+      g_assert (watch->messages_bytes >= bytes_written);
+      watch->messages_bytes -= bytes_written;
+    }
 
     if (!IS_BACKLOG_FULL (watch))
       g_cond_signal (&watch->queue_not_full);
     g_mutex_unlock (&watch->mutex);
 
-    if (res == GST_RTSP_EINTR)
+    /* notify all messages that were successfully written */
+    if (ids) {
+      while (*ids) {
+        /* only decrease the counter for messages that have an id. Only
+         * the last message of a messages chunk is counted */
+        watch->messages_count--;
+
+        if (watch->funcs.message_sent)
+          watch->funcs.message_sent (watch, *ids, watch->user_data);
+        ids++;
+      }
+    }
+
+    if (res == GST_RTSP_EINTR) {
       goto write_blocked;
-    else if (G_LIKELY (res == GST_RTSP_OK)) {
-      if (watch->funcs.message_sent)
-        watch->funcs.message_sent (watch, watch->write_id, watch->user_data);
-    } else {
+    } else if (G_UNLIKELY (res != GST_RTSP_OK)) {
       goto write_error;
     }
     g_mutex_lock (&watch->mutex);
-
-    g_free (watch->write_data);
-    watch->write_data = NULL;
   } while (TRUE);
   g_mutex_unlock (&watch->mutex);
 
@@ -3504,29 +3993,29 @@ eof:
   }
 write_error:
   {
-    if (watch->funcs.error_full)
-      watch->funcs.error_full (watch, res, NULL,
-          watch->write_id, watch->user_data);
-    else if (watch->funcs.error)
+    if (watch->funcs.error_full) {
+      guint i, n_messages;
+
+      n_messages = gst_queue_array_get_length (watch->messages);
+      for (i = 0; i < n_messages; i++) {
+        GstRTSPSerializedMessage *msg =
+            gst_queue_array_peek_nth_struct (watch->messages, i);
+        if (msg->id)
+          watch->funcs.error_full (watch, res, NULL, msg->id, watch->user_data);
+      }
+    } else if (watch->funcs.error) {
       watch->funcs.error (watch, res, watch->user_data);
+    }
 
     return FALSE;
   }
 }
 
-static void
-gst_rtsp_rec_clear (gpointer data)
-{
-  GstRTSPRec *rec = data;
-
-  g_free (rec->data);
-}
-
 static void
 gst_rtsp_source_finalize (GSource * source)
 {
   GstRTSPWatch *watch = (GstRTSPWatch *) source;
-  GstRTSPRec *rec;
+  GstRTSPSerializedMessage *msg;
 
   if (watch->notify)
     watch->notify (watch->user_data);
@@ -3534,13 +4023,14 @@ gst_rtsp_source_finalize (GSource * source)
   build_reset (&watch->builder);
   gst_rtsp_message_unset (&watch->message);
 
-  while ((rec = gst_queue_array_pop_head_struct (watch->messages)))
-    gst_rtsp_rec_clear (rec);
+  while ((msg = gst_queue_array_pop_head_struct (watch->messages))) {
+    gst_rtsp_serialized_message_clear (msg);
+  }
   gst_queue_array_free (watch->messages);
   watch->messages = NULL;
   watch->messages_bytes = 0;
+  watch->messages_count = 0;
 
-  g_free (watch->write_data);
   g_cond_clear (&watch->queue_not_full);
 
   if (watch->readsrc)
@@ -3598,7 +4088,8 @@ gst_rtsp_watch_new (GstRTSPConnection * conn,
   result->builder.state = STATE_START;
 
   g_mutex_init (&result->mutex);
-  result->messages = gst_queue_array_new_for_struct (sizeof (GstRTSPRec), 10);
+  result->messages =
+      gst_queue_array_new_for_struct (sizeof (GstRTSPSerializedMessage), 10);
   g_cond_init (&result->queue_not_full);
 
   gst_rtsp_watch_reset (result);
@@ -3756,83 +4247,190 @@ gst_rtsp_watch_get_send_backlog (GstRTSPWatch * watch,
   g_mutex_unlock (&watch->mutex);
 }
 
-/**
- * gst_rtsp_watch_write_data:
- * @watch: a #GstRTSPWatch
- * @data: (array length=size) (transfer full): the data to queue
- * @size: the size of @data
- * @id: (out) (allow-none): location for a message ID or %NULL
- *
- * Write @data using the connection of the @watch. If it cannot be sent
- * immediately, it will be queued for transmission in @watch. The contents of
- * @message will then be serialized and transmitted when the connection of the
- * @watch becomes writable. In case the @message is queued, the ID returned in
- * @id will be non-zero and used as the ID argument in the message_sent
- * callback.
- *
- * This function will take ownership of @data and g_free() it after use.
- *
- * If the amount of queued data exceeds the limits set with
- * gst_rtsp_watch_set_send_backlog(), this function will return
- * #GST_RTSP_ENOMEM.
- *
- * Returns: #GST_RTSP_OK on success. #GST_RTSP_ENOMEM when the backlog limits
- * are reached. #GST_RTSP_EINTR when @watch was flushing.
- */
-GstRTSPResult
-gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
-    guint size, guint * id)
+static GstRTSPResult
+gst_rtsp_watch_write_serialized_messages (GstRTSPWatch * watch,
+    GstRTSPSerializedMessage * messages, guint n_messages, guint * id)
 {
   GstRTSPResult res;
-  GstRTSPRec rec;
-  guint off = 0;
   GMainContext *context = NULL;
+  gint i;
 
   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
-  g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
-  g_return_val_if_fail (size != 0, GST_RTSP_EINVAL);
+  g_return_val_if_fail (messages != NULL, GST_RTSP_EINVAL);
 
   g_mutex_lock (&watch->mutex);
   if (watch->flushing)
     goto flushing;
 
   /* try to send the message synchronously first */
-  if (gst_queue_array_get_length (watch->messages) == 0
-      && watch->write_data == NULL) {
+  if (gst_queue_array_get_length (watch->messages) == 0) {
+    gint j, k;
+    GOutputVector *vectors;
+    GstMapInfo *map_infos;
+    gsize bytes_to_write, bytes_written;
+    guint n_vectors, n_memories, drop_messages;
+
+    for (i = 0, n_vectors = 0, n_memories = 0; i < n_messages; i++) {
+      n_vectors++;
+      if (messages[i].body_data) {
+        n_vectors++;
+      } else if (messages[i].body_buffer) {
+        n_vectors += gst_buffer_n_memory (messages[i].body_buffer);
+        n_memories += gst_buffer_n_memory (messages[i].body_buffer);
+      }
+    }
+
+    vectors = g_newa (GOutputVector, n_vectors);
+    map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL;
+
+    for (i = 0, j = 0, k = 0, bytes_to_write = 0; i < n_messages; i++) {
+      vectors[j].buffer = messages[i].data_is_data_header ?
+          messages[i].data_header : messages[i].data;
+      vectors[j].size = messages[i].data_size;
+      bytes_to_write += vectors[j].size;
+      j++;
+
+      if (messages[i].body_data) {
+        vectors[j].buffer = messages[i].body_data;
+        vectors[j].size = messages[i].body_data_size;
+        bytes_to_write += vectors[j].size;
+        j++;
+      } else if (messages[i].body_buffer) {
+        gint l, n;
+
+        n = gst_buffer_n_memory (messages[i].body_buffer);
+        for (l = 0; l < n; l++) {
+          GstMemory *mem = gst_buffer_peek_memory (messages[i].body_buffer, l);
+
+          gst_memory_map (mem, &map_infos[k], GST_MAP_READ);
+          vectors[j].buffer = map_infos[k].data;
+          vectors[j].size = map_infos[k].size;
+          bytes_to_write += vectors[j].size;
+
+          k++;
+          j++;
+        }
+      }
+    }
+
     res =
-        write_bytes (watch->conn->output_stream, data, &off, size, FALSE,
-        watch->conn->cancellable);
+        writev_bytes (watch->conn->output_stream, vectors, n_vectors,
+        &bytes_written, FALSE, watch->conn->cancellable);
+    g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
+
+    /* At this point we sent everything we could without blocking or
+     * error and updated the offsets inside the message accordingly */
+
+    /* First of all unmap all memories. This simplifies the code below */
+    for (k = 0; k < n_memories; k++) {
+      gst_memory_unmap (map_infos[k].memory, &map_infos[k]);
+    }
+
     if (res != GST_RTSP_EINTR) {
+      /* actual error or done completely */
       if (id != NULL)
         *id = 0;
-      g_free ((gpointer) data);
+
+      /* free everything */
+      for (i = 0, k = 0; i < n_messages; i++) {
+        gst_rtsp_serialized_message_clear (&messages[i]);
+      }
+
       goto done;
     }
+
+    /* not done, let's skip all messages that were sent already and free them */
+    for (i = 0, k = 0, drop_messages = 0; i < n_messages; i++) {
+      if (bytes_written >= 0) {
+        if (bytes_written >= messages[i].data_size) {
+          guint body_size;
+
+          /* all data of this message is sent, check body and otherwise
+           * skip the whole message for next time */
+          messages[i].data_offset = messages[i].data_size;
+          bytes_written -= messages[i].data_size;
+
+          if (messages[i].body_data) {
+            body_size = messages[i].body_data_size;
+
+          } else if (messages[i].body_buffer) {
+            body_size = gst_buffer_get_size (messages[i].body_buffer);
+          } else {
+            body_size = 0;
+          }
+
+          if (bytes_written >= body_size) {
+            /* body written, drop this message */
+            messages[i].body_offset = body_size;
+            bytes_written -= body_size;
+            drop_messages++;
+
+            gst_rtsp_serialized_message_clear (&messages[i]);
+          } else {
+            messages[i].body_offset = bytes_written;
+            bytes_written = 0;
+          }
+        } else {
+          /* Need to continue sending from the data of this message */
+          messages[i].data_offset = bytes_written;
+          bytes_written = 0;
+        }
+      }
+    }
+
+    g_assert (n_messages > drop_messages);
+
+    messages += drop_messages;
+    n_messages -= drop_messages;
   }
 
   /* check limits */
   if (IS_BACKLOG_FULL (watch))
     goto too_much_backlog;
 
-  /* make a record with the data and id for sending async */
-  memset (&rec, 0, sizeof (rec));
-  if (off == 0) {
-    rec.data = (guint8 *) data;
-    rec.size = size;
-  } else {
-    rec.data = g_memdup (data + off, size - off);
-    rec.size = size - off;
-    g_free ((gpointer) data);
-  }
+  for (i = 0; i < n_messages; i++) {
+    GstRTSPSerializedMessage local_message;
 
-  do {
-    /* make sure rec->id is never 0 */
-    rec.id = ++watch->id;
-  } while (G_UNLIKELY (rec.id == 0));
+    /* make a record with the data and id for sending async */
+    local_message = messages[i];
+
+    /* copy the body data or take an additional reference to the body buffer
+     * we don't own them here */
+    if (local_message.body_data) {
+      local_message.body_data =
+          g_memdup (local_message.body_data, local_message.body_data_size);
+    } else if (local_message.body_buffer) {
+      gst_buffer_ref (local_message.body_buffer);
+    }
+    local_message.borrowed = FALSE;
+
+    /* set an id for the very last message */
+    if (i == n_messages - 1) {
+      do {
+        /* make sure rec->id is never 0 */
+        local_message.id = ++watch->id;
+      } while (G_UNLIKELY (local_message.id == 0));
 
-  /* add the record to a queue. */
-  gst_queue_array_push_tail_struct (watch->messages, &rec);
-  watch->messages_bytes += rec.size;
+      if (id != NULL)
+        *id = local_message.id;
+    } else {
+      local_message.id = 0;
+    }
+
+    /* add the record to a queue. */
+    gst_queue_array_push_tail_struct (watch->messages, &local_message);
+    watch->messages_bytes +=
+        (local_message.data_size - local_message.data_offset);
+    if (local_message.body_data)
+      watch->messages_bytes +=
+          (local_message.body_data_size - local_message.body_offset);
+    else if (local_message.body_buffer)
+      watch->messages_bytes +=
+          (gst_buffer_get_size (local_message.body_buffer) -
+          local_message.body_offset);
+  }
+  /* each message chunks is one unit */
+  watch->messages_count++;
 
   /* make sure the main context will now also check for writability on the
    * socket */
@@ -3853,9 +4451,6 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
         (GSourceFunc) gst_rtsp_source_dispatch_write, watch, NULL);
     g_source_add_child_source ((GSource *) watch, watch->writesrc);
   }
-
-  if (id != NULL)
-    *id = rec.id;
   res = GST_RTSP_OK;
 
 done:
@@ -3871,19 +4466,62 @@ flushing:
   {
     GST_DEBUG ("we are flushing");
     g_mutex_unlock (&watch->mutex);
-    g_free ((gpointer) data);
+    for (i = 0; i < n_messages; i++) {
+      gst_rtsp_serialized_message_clear (&messages[i]);
+    }
     return GST_RTSP_EINTR;
   }
 too_much_backlog:
   {
     GST_WARNING ("too much backlog: max_bytes %" G_GSIZE_FORMAT ", current %"
         G_GSIZE_FORMAT ", max_messages %u, current %u", watch->max_bytes,
-        watch->messages_bytes, watch->max_messages,
-        gst_queue_array_get_length (watch->messages));
+        watch->messages_bytes, watch->max_messages, watch->messages_count);
     g_mutex_unlock (&watch->mutex);
-    g_free ((gpointer) data);
+    for (i = 0; i < n_messages; i++) {
+      gst_rtsp_serialized_message_clear (&messages[i]);
+    }
     return GST_RTSP_ENOMEM;
   }
+
+  return GST_RTSP_OK;
+}
+
+/**
+ * gst_rtsp_watch_write_data:
+ * @watch: a #GstRTSPWatch
+ * @data: (array length=size) (transfer full): the data to queue
+ * @size: the size of @data
+ * @id: (out) (allow-none): location for a message ID or %NULL
+ *
+ * Write @data using the connection of the @watch. If it cannot be sent
+ * immediately, it will be queued for transmission in @watch. The contents of
+ * @message will then be serialized and transmitted when the connection of the
+ * @watch becomes writable. In case the @message is queued, the ID returned in
+ * @id will be non-zero and used as the ID argument in the message_sent
+ * callback.
+ *
+ * This function will take ownership of @data and g_free() it after use.
+ *
+ * If the amount of queued data exceeds the limits set with
+ * gst_rtsp_watch_set_send_backlog(), this function will return
+ * #GST_RTSP_ENOMEM.
+ *
+ * Returns: #GST_RTSP_OK on success. #GST_RTSP_ENOMEM when the backlog limits
+ * are reached. #GST_RTSP_EINTR when @watch was flushing.
+ */
+/* FIXME 2.0: This should've been static! */
+GstRTSPResult
+gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
+    guint size, guint * id)
+{
+  GstRTSPSerializedMessage serialized_message;
+
+  memset (&serialized_message, 0, sizeof (serialized_message));
+  serialized_message.data = (guint8 *) data;
+  serialized_message.data_size = size;
+
+  return gst_rtsp_watch_write_serialized_messages (watch, &serialized_message,
+      1, id);
 }
 
 /**
@@ -3905,17 +4543,59 @@ GstRTSPResult
 gst_rtsp_watch_send_message (GstRTSPWatch * watch, GstRTSPMessage * message,
     guint * id)
 {
-  GString *str;
-  guint size;
-
   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
 
-  /* make a record with the message as a string and id */
-  str = message_to_string (watch->conn, message);
-  size = str->len;
-  return gst_rtsp_watch_write_data (watch,
-      (guint8 *) g_string_free (str, FALSE), size, id);
+  return gst_rtsp_watch_send_messages (watch, message, 1, id);
+}
+
+/**
+ * gst_rtsp_watch_send_messages:
+ * @watch: a #GstRTSPWatch
+ * @messages: (array length=n_messages): the messages to send
+ * @n_messages: the number of messages to send
+ * @id: (out) (allow-none): location for a message ID or %NULL
+ *
+ * Sends @messages using the connection of the @watch. If they cannot be sent
+ * immediately, they will be queued for transmission in @watch. The contents of
+ * @messages will then be serialized and transmitted when the connection of the
+ * @watch becomes writable. In case the @messages are queued, the ID returned in
+ * @id will be non-zero and used as the ID argument in the message_sent
+ * callback once the last message is sent. The callback will only be called
+ * once for the last message.
+ *
+ * Returns: #GST_RTSP_OK on success.
+ *
+ * Since: 1.16
+ */
+GstRTSPResult
+gst_rtsp_watch_send_messages (GstRTSPWatch * watch, GstRTSPMessage * messages,
+    guint n_messages, guint * id)
+{
+  GstRTSPSerializedMessage *serialized_messages;
+  gint i;
+
+  g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
+  g_return_val_if_fail (messages != NULL || n_messages == 0, GST_RTSP_EINVAL);
+
+  serialized_messages = g_newa (GstRTSPSerializedMessage, n_messages);
+  memset (serialized_messages, 0,
+      sizeof (GstRTSPSerializedMessage) * n_messages);
+
+  for (i = 0; i < n_messages; i++) {
+    if (!serialize_message (watch->conn, &messages[i], &serialized_messages[i]))
+      goto error;
+  }
+
+  return gst_rtsp_watch_write_serialized_messages (watch, serialized_messages,
+      n_messages, id);
+
+error:
+  for (i = 0; i < n_messages; i++) {
+    gst_rtsp_serialized_message_clear (&serialized_messages[i]);
+  }
+
+  return GST_RTSP_EINVAL;
 }
 
 /**
@@ -4005,10 +4685,11 @@ gst_rtsp_watch_set_flushing (GstRTSPWatch * watch, gboolean flushing)
   watch->flushing = flushing;
   g_cond_signal (&watch->queue_not_full);
   if (flushing) {
-    GstRTSPRec *rec;
+    GstRTSPSerializedMessage *msg;
 
-    while ((rec = gst_queue_array_pop_head_struct (watch->messages)))
-      gst_rtsp_rec_clear (rec);
+    while ((msg = gst_queue_array_pop_head_struct (watch->messages))) {
+      gst_rtsp_serialized_message_clear (msg);
+    }
   }
   g_mutex_unlock (&watch->mutex);
 }
index adea8af0b748d244fdea7760b6184536318555f4..b27f813105800f0b666c3bd38c51581412baac98 100644 (file)
@@ -136,6 +136,10 @@ GST_RTSP_API
 GstRTSPResult      gst_rtsp_connection_send           (GstRTSPConnection *conn, GstRTSPMessage *message,
                                                        GTimeVal *timeout);
 
+GST_RTSP_API
+GstRTSPResult      gst_rtsp_connection_send_messages  (GstRTSPConnection *conn, GstRTSPMessage *messages, guint n_messages,
+                                                       GTimeVal *timeout);
+
 GST_RTSP_API
 GstRTSPResult      gst_rtsp_connection_receive        (GstRTSPConnection *conn, GstRTSPMessage *message,
                                                        GTimeVal *timeout);
@@ -313,6 +317,12 @@ GstRTSPResult      gst_rtsp_watch_send_message       (GstRTSPWatch *watch,
                                                       GstRTSPMessage *message,
                                                       guint *id);
 
+GST_RTSP_API
+GstRTSPResult      gst_rtsp_watch_send_messages      (GstRTSPWatch *watch,
+                                                      GstRTSPMessage *messages,
+                                                      guint n_messages,
+                                                      guint *id);
+
 GST_RTSP_API
 GstRTSPResult      gst_rtsp_watch_wait_backlog       (GstRTSPWatch * watch,
                                                       GTimeVal *timeout);