rtmp2: Consistently use GstBuffer for RTMP chunks
authorJan Alexander Steffens (heftig) <jsteffens@make.tv>
Wed, 12 Feb 2020 15:55:15 +0000 (16:55 +0100)
committerGStreamer Merge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Fri, 21 Feb 2020 15:20:41 +0000 (15:20 +0000)
gst/rtmp2/rtmp/rtmpconnection.c

index c27bdfe..0733244 100644 (file)
@@ -97,7 +97,7 @@ static void gst_rtmp_connection_emit_error (GstRtmpConnection * self);
 static gboolean gst_rtmp_connection_input_ready (GInputStream * is,
     gpointer user_data);
 static void gst_rtmp_connection_start_write (GstRtmpConnection * self);
-static void gst_rtmp_connection_write_bytes_done (GObject * obj,
+static void gst_rtmp_connection_write_buffer_done (GObject * obj,
     GAsyncResult * result, gpointer user_data);
 static void gst_rtmp_connection_start_read (GstRtmpConnection * sc,
     guint needed_bytes);
@@ -232,7 +232,7 @@ gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection)
 {
   rtmpconnection->cancellable = g_cancellable_new ();
   rtmpconnection->output_queue =
-      g_async_queue_new_full ((GDestroyNotify) g_bytes_unref);
+      g_async_queue_new_full ((GDestroyNotify) gst_buffer_unref);
   rtmpconnection->input_streams = gst_rtmp_chunk_streams_new ();
   rtmpconnection->output_streams = gst_rtmp_chunk_streams_new ();
 
@@ -464,14 +464,14 @@ static void
 gst_rtmp_connection_start_write (GstRtmpConnection * self)
 {
   GOutputStream *os;
-  GBytes *bytes;
+  GstBuffer *chunks;
 
   if (self->writing) {
     return;
   }
 
-  bytes = g_async_queue_try_pop (self->output_queue);
-  if (!bytes) {
+  chunks = g_async_queue_try_pop (self->output_queue);
+  if (!chunks) {
     return;
   }
 
@@ -481,10 +481,11 @@ gst_rtmp_connection_start_write (GstRtmpConnection * self)
   }
 
   os = g_io_stream_get_output_stream (G_IO_STREAM (self->connection));
-  gst_rtmp_output_stream_write_all_bytes_async (os, bytes,
-      G_PRIORITY_DEFAULT, self->cancellable,
-      gst_rtmp_connection_write_bytes_done, g_object_ref (self));
-  g_bytes_unref (bytes);
+  gst_rtmp_output_stream_write_all_buffer_async (os, chunks, G_PRIORITY_DEFAULT,
+      self->cancellable, gst_rtmp_connection_write_buffer_done,
+      g_object_ref (self));
+
+  gst_buffer_unref (chunks);
 }
 
 static void
@@ -503,7 +504,7 @@ gst_rtmp_connection_emit_error (GstRtmpConnection * self)
 }
 
 static void
-gst_rtmp_connection_write_bytes_done (GObject * obj,
+gst_rtmp_connection_write_buffer_done (GObject * obj,
     GAsyncResult * result, gpointer user_data)
 {
   GOutputStream *os = G_OUTPUT_STREAM (obj);
@@ -513,7 +514,9 @@ gst_rtmp_connection_write_bytes_done (GObject * obj,
 
   self->writing = FALSE;
 
-  res = gst_rtmp_output_stream_write_all_bytes_finish (os, result, &error);
+  res = gst_rtmp_output_stream_write_all_buffer_finish (os, result, NULL,
+      &error);
+
   if (!res) {
     if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
       GST_INFO_OBJECT (self, "write cancelled");
@@ -904,26 +907,12 @@ start_write (gpointer user_data)
   return G_SOURCE_REMOVE;
 }
 
-static void
-byte_array_take_buffer (GByteArray * byte_array, GstBuffer * buffer)
-{
-  GstMapInfo map;
-  gboolean ret;
-  ret = gst_buffer_map (buffer, &map, GST_MAP_READ);
-  g_assert (ret);
-  g_assert (byte_array->len + map.size <= (guint64) G_MAXUINT);
-  g_byte_array_append (byte_array, map.data, map.size);
-  gst_buffer_unmap (buffer, &map);
-  gst_buffer_unref (buffer);
-}
-
 void
 gst_rtmp_connection_queue_message (GstRtmpConnection * self, GstBuffer * buffer)
 {
   GstRtmpMeta *meta;
   GstRtmpChunkStream *cstream;
-  GstBuffer *out_buffer;
-  GByteArray *out_ba;
+  GstBuffer *chunks;
 
   g_return_if_fail (GST_IS_RTMP_CONNECTION (self));
   g_return_if_fail (GST_IS_BUFFER (buffer));
@@ -934,20 +923,11 @@ gst_rtmp_connection_queue_message (GstRtmpConnection * self, GstBuffer * buffer)
   cstream = gst_rtmp_chunk_streams_get (self->output_streams, meta->cstream);
   g_return_if_fail (cstream);
 
-  out_buffer = gst_rtmp_chunk_stream_serialize_start (cstream, buffer,
+  chunks = gst_rtmp_chunk_stream_serialize_all (cstream, buffer,
       self->out_chunk_size);
-  g_return_if_fail (out_buffer);
-
-  out_ba = g_byte_array_new ();
-
-  while (out_buffer) {
-    byte_array_take_buffer (out_ba, out_buffer);
-
-    out_buffer = gst_rtmp_chunk_stream_serialize_next (cstream,
-        self->out_chunk_size);
-  }
+  g_return_if_fail (chunks);
 
-  g_async_queue_push (self->output_queue, g_byte_array_free_to_bytes (out_ba));
+  g_async_queue_push (self->output_queue, chunks);
   g_main_context_invoke_full (self->main_context, G_PRIORITY_DEFAULT,
       start_write, g_object_ref (self), g_object_unref);
 }