rtmp2: Add gst_rtmp_output_stream_write_all_buffer_async
authorJan Alexander Steffens (heftig) <jsteffens@make.tv>
Wed, 12 Feb 2020 15:43:30 +0000 (16:43 +0100)
committerGStreamer Merge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Fri, 21 Feb 2020 15:20:41 +0000 (15:20 +0000)
Similar to gst_rtmp_output_stream_write_all_bytes_async, but takes a
GstBuffer instead of a GBytes. It can also return the number of bytes
written, which might be lower in case of an error.

gst/rtmp2/rtmp/rtmputils.c
gst/rtmp2/rtmp/rtmputils.h

index c9bbc3a..9ad2ea5 100644 (file)
@@ -30,6 +30,8 @@ static void read_all_bytes_done (GObject * source, GAsyncResult * result,
     gpointer user_data);
 static void write_all_bytes_done (GObject * source, GAsyncResult * result,
     gpointer user_data);
+static void write_all_buffer_done (GObject * source, GAsyncResult * result,
+    gpointer user_data);
 
 void
 gst_rtmp_byte_array_append_bytes (GByteArray * bytearray, GBytes * bytes)
@@ -154,6 +156,108 @@ gst_rtmp_output_stream_write_all_bytes_finish (GOutputStream * stream,
   return g_task_propagate_boolean (G_TASK (result), error);
 }
 
+typedef struct
+{
+  GstBuffer *buffer;
+  GstMapInfo map;
+  gboolean mapped;
+  gsize bytes_written;
+} WriteAllBufferData;
+
+static WriteAllBufferData *
+write_all_buffer_data_new (GstBuffer * buffer)
+{
+  WriteAllBufferData *data = g_slice_new0 (WriteAllBufferData);
+  data->buffer = gst_buffer_ref (buffer);
+  return data;
+}
+
+static void
+write_all_buffer_data_free (gpointer ptr)
+{
+  WriteAllBufferData *data = ptr;
+  if (data->mapped) {
+    gst_buffer_unmap (data->buffer, &data->map);
+  }
+  g_clear_pointer (&data->buffer, gst_buffer_unref);
+  g_slice_free (WriteAllBufferData, data);
+}
+
+void
+gst_rtmp_output_stream_write_all_buffer_async (GOutputStream * stream,
+    GstBuffer * buffer, int io_priority, GCancellable * cancellable,
+    GAsyncReadyCallback callback, gpointer user_data)
+{
+  GTask *task;
+  WriteAllBufferData *data;
+
+  g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
+  g_return_if_fail (GST_IS_BUFFER (buffer));
+
+  task = g_task_new (stream, cancellable, callback, user_data);
+
+  data = write_all_buffer_data_new (buffer);
+  g_task_set_task_data (task, data, write_all_buffer_data_free);
+
+  if (!gst_buffer_map (buffer, &data->map, GST_MAP_READ)) {
+    GST_ERROR ("Failed to map %" GST_PTR_FORMAT, buffer);
+    g_task_return_new_error (task, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ,
+        "Failed to map buffer for reading");
+    g_object_unref (task);
+    return;
+  }
+
+  data->mapped = TRUE;
+
+  g_output_stream_write_all_async (stream, data->map.data, data->map.size,
+      io_priority, cancellable, write_all_buffer_done, task);
+}
+
+static void
+write_all_buffer_done (GObject * source, GAsyncResult * result,
+    gpointer user_data)
+{
+  GOutputStream *os = G_OUTPUT_STREAM (source);
+  GTask *task = user_data;
+  WriteAllBufferData *data = g_task_get_task_data (task);
+  GError *error = NULL;
+  gboolean res;
+
+  res = g_output_stream_write_all_finish (os, result, &data->bytes_written,
+      &error);
+
+  gst_buffer_unmap (data->buffer, &data->map);
+  data->mapped = FALSE;
+
+  if (!res) {
+    g_task_return_error (task, error);
+    g_object_unref (task);
+    return;
+  }
+
+  g_task_return_boolean (task, TRUE);
+  g_object_unref (task);
+}
+
+
+gboolean
+gst_rtmp_output_stream_write_all_buffer_finish (GOutputStream * stream,
+    GAsyncResult * result, gsize * bytes_written, GError ** error)
+{
+  WriteAllBufferData *data;
+  GTask *task;
+
+  g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
+  task = G_TASK (result);
+
+  data = g_task_get_task_data (task);
+  if (bytes_written) {
+    *bytes_written = data->bytes_written;
+  }
+
+  return g_task_propagate_boolean (task, error);
+}
+
 static const gchar ascii_table[128] = {
   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
   0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
index b427f37..07c437f 100644 (file)
@@ -20,6 +20,7 @@
 #ifndef _GST_RTMP_UTILS_H_
 #define _GST_RTMP_UTILS_H_
 
+#include <gst/gst.h>
 #include <gio/gio.h>
 
 G_BEGIN_DECLS
@@ -38,7 +39,13 @@ void gst_rtmp_output_stream_write_all_bytes_async (GOutputStream * stream,
 gboolean gst_rtmp_output_stream_write_all_bytes_finish (GOutputStream * stream,
     GAsyncResult * result, GError ** error);
 
-void gst_rtmp_string_print_escaped (GString * string, const gchar *data,
+void gst_rtmp_output_stream_write_all_buffer_async (GOutputStream * stream,
+    GstBuffer * buffer, int io_priority, GCancellable * cancellable,
+    GAsyncReadyCallback callback, gpointer user_data);
+gboolean gst_rtmp_output_stream_write_all_buffer_finish (GOutputStream * stream,
+    GAsyncResult * result, gsize * bytes_written, GError ** error);
+
+void gst_rtmp_string_print_escaped (GString * string, const gchar * data,
     gssize size);
 
 G_END_DECLS