rtsp-client: Add support for sending buffer lists directly
authorSebastian Dröge <sebastian@centricular.com>
Mon, 17 Sep 2018 19:18:46 +0000 (22:18 +0300)
committerSebastian Dröge <sebastian@centricular.com>
Wed, 30 Jan 2019 12:40:09 +0000 (14:40 +0200)
Fixes https://gitlab.freedesktop.org/gstreamer/gst-rtsp-server/issues/29

docs/libs/gst-rtsp-server-sections.txt
gst/rtsp-server/rtsp-client.c
gst/rtsp-server/rtsp-client.h
gst/rtsp-server/rtsp-stream.c

index 369a57a..c1627da 100644 (file)
@@ -118,6 +118,8 @@ gst_rtsp_client_attach
 
 GstRTSPClientSendFunc
 gst_rtsp_client_set_send_func
+GstRTSPClientSendMessagesFunc
+gst_rtsp_client_set_send_messages_func
 
 gst_rtsp_client_handle_message
 gst_rtsp_client_send_message
index cce9222..020f2fa 100644 (file)
@@ -80,6 +80,9 @@ struct _GstRTSPClientPrivate
   GstRTSPClientSendFunc send_func;
   gpointer send_data;
   GDestroyNotify send_notify;
+  GstRTSPClientSendMessagesFunc send_messages_func;
+  gpointer send_messages_data;
+  GDestroyNotify send_messages_notify;
   guint close_seq;
   GArray *data_seqs;
 
@@ -753,6 +756,7 @@ gst_rtsp_client_finalize (GObject * obj)
   if (priv->watch)
     gst_rtsp_watch_set_flushing (priv->watch, TRUE);
   gst_rtsp_client_set_send_func (client, NULL, NULL, NULL);
+  gst_rtsp_client_set_send_messages_func (client, NULL, NULL, NULL);
 
   if (priv->watch)
     g_source_destroy ((GSource *) priv->watch);
@@ -1158,17 +1162,10 @@ do_send_data (GstBuffer * buffer, guint8 channel, GstRTSPClient * client)
   GstRTSPClientPrivate *priv = client->priv;
   GstRTSPMessage message = { 0 };
   gboolean ret = TRUE;
-  GstMapInfo map_info;
-  guint8 *data;
-  guint usize;
 
   gst_rtsp_message_init_data (&message, channel);
 
-  /* FIXME, need some sort of iovec RTSPMessage here */
-  if (!gst_buffer_map (buffer, &map_info, GST_MAP_READ))
-    return FALSE;
-
-  gst_rtsp_message_take_body (&message, map_info.data, map_info.size);
+  gst_rtsp_message_set_body_buffer (&message, buffer);
 
   g_mutex_lock (&priv->send_lock);
   if (get_data_seq (client, channel) != 0) {
@@ -1180,9 +1177,6 @@ do_send_data (GstBuffer * buffer, guint8 channel, GstRTSPClient * client)
     ret = priv->send_func (client, &message, FALSE, priv->send_data);
   g_mutex_unlock (&priv->send_lock);
 
-  gst_rtsp_message_steal_body (&message, &data, &usize);
-  gst_buffer_unmap (buffer, &map_info);
-
   gst_rtsp_message_unset (&message);
 
   if (!ret) {
@@ -1202,18 +1196,50 @@ static gboolean
 do_send_data_list (GstBufferList * buffer_list, guint8 channel,
     GstRTSPClient * client)
 {
+  GstRTSPClientPrivate *priv = client->priv;
   gboolean ret = TRUE;
   guint i, n = gst_buffer_list_length (buffer_list);
+  GstRTSPMessage *messages;
+
+  g_mutex_lock (&priv->send_lock);
+  if (get_data_seq (client, channel) != 0) {
+    GST_WARNING ("already a queued data message for channel %d", channel);
+    g_mutex_unlock (&priv->send_lock);
+    return FALSE;
+  }
 
-  /* TODO: Needs support for a) queueing up multiple messages on the
-   * GstRTSPWatch in do_send_data() above and b) for one message having a body
-   * consisting of multiple parts here */
+  messages = g_newa (GstRTSPMessage, n);
+  memset (messages, 0, sizeof (GstRTSPMessage) * n);
   for (i = 0; i < n; i++) {
     GstBuffer *buffer = gst_buffer_list_get (buffer_list, i);
+    gst_rtsp_message_init_data (&messages[i], channel);
+    gst_rtsp_message_set_body_buffer (&messages[i], buffer);
+  }
 
-    ret = do_send_data (buffer, channel, client);
-    if (!ret)
-      break;
+  if (priv->send_messages_func) {
+    ret =
+        priv->send_messages_func (client, messages, n, FALSE, priv->send_data);
+  } else if (priv->send_func) {
+    for (i = 0; i < n; i++) {
+      ret = priv->send_func (client, &messages[i], FALSE, priv->send_data);
+      if (!ret)
+        break;
+    }
+  }
+  g_mutex_unlock (&priv->send_lock);
+
+  for (i = 0; i < n; i++) {
+    gst_rtsp_message_unset (&messages[i]);
+  }
+
+  if (!ret) {
+    GSource *idle_src;
+
+    /* close in watch context */
+    idle_src = g_idle_source_new ();
+    g_source_set_callback (idle_src, do_close, client, NULL);
+    g_source_attach (idle_src, priv->watch_context);
+    g_source_unref (idle_src);
   }
 
   return ret;
@@ -1252,6 +1278,7 @@ gst_rtsp_client_close (GstRTSPClient * client)
     g_source_destroy ((GSource *) priv->watch);
     priv->watch = NULL;
     gst_rtsp_client_set_send_func (client, NULL, NULL, NULL);
+    gst_rtsp_client_set_send_messages_func (client, NULL, NULL, NULL);
     rtsp_ctrl_timeout_remove (priv);
     g_main_context_unref (priv->watch_context);
     priv->watch_context = NULL;
@@ -4185,6 +4212,47 @@ gst_rtsp_client_set_send_func (GstRTSPClient * client,
 }
 
 /**
+ * gst_rtsp_client_set_send_messages_func:
+ * @client: a #GstRTSPClient
+ * @func: (scope notified): a #GstRTSPClientSendMessagesFunc
+ * @user_data: (closure): user data passed to @func
+ * @notify: (allow-none): called when @user_data is no longer in use
+ *
+ * Set @func as the callback that will be called when new messages needs to be
+ * sent to the client. @user_data is passed to @func and @notify is called when
+ * @user_data is no longer in use.
+ *
+ * By default, the client will send the messages on the #GstRTSPConnection that
+ * was configured with gst_rtsp_client_attach() was called.
+ *
+ * Since: 1.16
+ */
+void
+gst_rtsp_client_set_send_messages_func (GstRTSPClient * client,
+    GstRTSPClientSendMessagesFunc func, gpointer user_data,
+    GDestroyNotify notify)
+{
+  GstRTSPClientPrivate *priv;
+  GDestroyNotify old_notify;
+  gpointer old_data;
+
+  g_return_if_fail (GST_IS_RTSP_CLIENT (client));
+
+  priv = client->priv;
+
+  g_mutex_lock (&priv->send_lock);
+  priv->send_messages_func = func;
+  old_notify = priv->send_messages_notify;
+  old_data = priv->send_messages_data;
+  priv->send_messages_notify = notify;
+  priv->send_messages_data = user_data;
+  g_mutex_unlock (&priv->send_lock);
+
+  if (old_notify)
+    old_notify (old_data);
+}
+
+/**
  * gst_rtsp_client_handle_message:
  * @client: a #GstRTSPClient
  * @message: (transfer none): an #GstRTSPMessage
@@ -4317,6 +4385,71 @@ error:
   }
 }
 
+static gboolean
+do_send_messages (GstRTSPClient * client, GstRTSPMessage * messages,
+    guint n_messages, gboolean close, gpointer user_data)
+{
+  GstRTSPClientPrivate *priv = client->priv;
+  guint id = 0;
+  GstRTSPResult ret;
+  guint i;
+
+  /* send the message */
+  ret = gst_rtsp_watch_send_messages (priv->watch, messages, n_messages, &id);
+  if (ret != GST_RTSP_OK)
+    goto error;
+
+  /* if close flag is set, store the seq number so we can wait until it's
+   * written to the client to close the connection */
+  if (close)
+    priv->close_seq = id;
+
+  for (i = 0; i < n_messages; i++) {
+    if (gst_rtsp_message_get_type (&messages[i]) == GST_RTSP_MESSAGE_DATA) {
+      guint8 channel = 0;
+      GstRTSPResult r;
+
+      /* We assume that all data messages in the list are for the
+       * same channel */
+      r = gst_rtsp_message_parse_data (&messages[i], &channel);
+      if (r != GST_RTSP_OK) {
+        ret = r;
+        goto error;
+      }
+
+      /* check if the message has been queued for transmission in watch */
+      if (id) {
+        /* store the seq number so we can wait until it has been sent */
+        GST_DEBUG_OBJECT (client, "wait for message %d, channel %d", id,
+            channel);
+        set_data_seq (client, channel, id);
+      } else {
+        GstRTSPStreamTransport *trans;
+
+        trans =
+            g_hash_table_lookup (priv->transports,
+            GINT_TO_POINTER ((gint) channel));
+        if (trans) {
+          GST_DEBUG_OBJECT (client, "emit 'message-sent' signal");
+          g_mutex_unlock (&priv->send_lock);
+          gst_rtsp_stream_transport_message_sent (trans);
+          g_mutex_lock (&priv->send_lock);
+        }
+      }
+      break;
+    }
+  }
+
+  return ret == GST_RTSP_OK;
+
+  /* ERRORS */
+error:
+  {
+    GST_DEBUG_OBJECT (client, "got error %d", ret);
+    return FALSE;
+  }
+}
+
 static GstRTSPResult
 message_received (GstRTSPWatch * watch, GstRTSPMessage * message,
     gpointer user_data)
@@ -4378,6 +4511,7 @@ closed (GstRTSPWatch * watch, gpointer user_data)
   gst_rtsp_watch_set_flushing (watch, TRUE);
   g_mutex_lock (&priv->watch_lock);
   gst_rtsp_client_set_send_func (client, NULL, NULL, NULL);
+  gst_rtsp_client_set_send_messages_func (client, NULL, NULL, NULL);
   g_mutex_unlock (&priv->watch_lock);
 
   return GST_RTSP_OK;
@@ -4517,6 +4651,7 @@ handle_tunnel (GstRTSPClient * client)
     g_source_destroy ((GSource *) priv->watch);
     priv->watch = NULL;
     gst_rtsp_client_set_send_func (client, NULL, NULL, NULL);
+    gst_rtsp_client_set_send_messages_func (client, NULL, NULL, NULL);
   }
 
   return GST_RTSP_STS_OK;
@@ -4655,6 +4790,8 @@ gst_rtsp_client_attach (GstRTSPClient * client, GMainContext * context)
   gst_rtsp_client_set_send_func (client, do_send_message,
       g_source_ref ((GSource *) priv->watch),
       (GDestroyNotify) gst_rtsp_watch_unref);
+  gst_rtsp_client_set_send_messages_func (client, do_send_messages, priv->watch,
+      (GDestroyNotify) gst_rtsp_watch_unref);
 
   gst_rtsp_watch_set_send_backlog (priv->watch, 0, WATCH_BACKLOG_SIZE);
 
index 01846bc..ebf6aec 100644 (file)
@@ -62,6 +62,27 @@ typedef gboolean (*GstRTSPClientSendFunc)      (GstRTSPClient *client,
                                                 gpointer user_data);
 
 /**
+ * GstRTSPClientSendMessagesFunc:
+ * @client: a #GstRTSPClient
+ * @messages: #GstRTSPMessage
+ * @n_messages: number of messages
+ * @close: close the connection
+ * @user_data: user data when registering the callback
+ *
+ * This callback is called when @client wants to send @messages. When @close is
+ * %TRUE, the connection should be closed when the message has been sent.
+ *
+ * Returns: %TRUE on success.
+ *
+ * Since: 1.16
+ */
+typedef gboolean (*GstRTSPClientSendMessagesFunc)      (GstRTSPClient *client,
+                                                        GstRTSPMessage *messages,
+                                                        guint n_messages,
+                                                        gboolean close,
+                                                        gpointer user_data);
+
+/**
  * GstRTSPClient:
  *
  * The client object represents the connection and its state with a client.
@@ -196,6 +217,12 @@ void                  gst_rtsp_client_set_send_func     (GstRTSPClient *client,
                                                          GDestroyNotify notify);
 
 GST_RTSP_SERVER_API
+void                  gst_rtsp_client_set_send_messages_func (GstRTSPClient *client,
+                                                              GstRTSPClientSendMessagesFunc func,
+                                                              gpointer user_data,
+                                                              GDestroyNotify notify);
+
+GST_RTSP_SERVER_API
 GstRTSPResult         gst_rtsp_client_handle_message    (GstRTSPClient *client,
                                                          GstRTSPMessage *message);
 
index b9d2e7f..1161257 100644 (file)
@@ -2480,12 +2480,12 @@ send_tcp_message (GstRTSPStream * stream, gint idx)
   buffer = gst_sample_get_buffer (sample);
   buffer_list = gst_sample_get_buffer_list (sample);
 
-  /* We will get one message-sent notification per message,
-   * i.e. per buffer that is actually sent out */
+  /* We will get one message-sent notification per buffer or
+   * complete buffer-list. We handle each buffer-list as a unit */
   if (buffer)
     n_messages += 1;
   if (buffer_list)
-    n_messages += gst_buffer_list_length (buffer_list);
+    n_messages += 1;
 
   is_rtp = (idx == 0);