rtmp2: Chunk messages as buffers in loop thread
authorJan Alexander Steffens (heftig) <jsteffens@make.tv>
Mon, 27 Jan 2020 13:05:31 +0000 (14:05 +0100)
committerGStreamer Merge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Fri, 21 Feb 2020 15:20:41 +0000 (15:20 +0000)
Move output chunking from gst_rtmp_connection_queue_message into
gst_rtmp_connection_start_write, which effectively moves it from the
streaming thread into the loop thread.

This allows us to handle the outgoing chunk-size message (which is
generated by changing the future chunk-size property) properly, which
could come from any other thread.

gst/rtmp2/rtmp/rtmpchunkstream.c
gst/rtmp2/rtmp/rtmpconnection.c

index cca8848..1cdd68b 100644 (file)
@@ -643,7 +643,7 @@ gst_rtmp_chunk_stream_serialize_start (GstRtmpChunkStream * cstream,
   gst_rtmp_buffer_dump (buffer, ">>> message");
 
   chunk_stream_clear (cstream);
-  chunk_stream_take_buffer (cstream, buffer);
+  chunk_stream_take_buffer (cstream, gst_buffer_ref (buffer));
 
   return serialize_next (cstream, chunk_size, type);
 }
index 0733244..7d78377 100644 (file)
@@ -464,17 +464,39 @@ static void
 gst_rtmp_connection_start_write (GstRtmpConnection * self)
 {
   GOutputStream *os;
-  GstBuffer *chunks;
+  GstBuffer *message, *chunks;
+  GstRtmpMeta *meta;
+  GstRtmpChunkStream *cstream;
 
   if (self->writing) {
     return;
   }
 
-  chunks = g_async_queue_try_pop (self->output_queue);
-  if (!chunks) {
+  message = g_async_queue_try_pop (self->output_queue);
+  if (!message) {
     return;
   }
 
+  meta = gst_buffer_get_rtmp_meta (message);
+  if (!meta) {
+    GST_ERROR_OBJECT (self, "No RTMP meta on %" GST_PTR_FORMAT, message);
+    goto out;
+  }
+
+  cstream = gst_rtmp_chunk_streams_get (self->output_streams, meta->cstream);
+  if (!cstream) {
+    GST_ERROR_OBJECT (self, "Failed to get chunk stream for %" GST_PTR_FORMAT,
+        message);
+    goto out;
+  }
+
+  chunks = gst_rtmp_chunk_stream_serialize_all (cstream, message,
+      self->out_chunk_size);
+  if (!chunks) {
+    GST_ERROR_OBJECT (self, "Failed to serialize %" GST_PTR_FORMAT, message);
+    goto out;
+  }
+
   self->writing = TRUE;
   if (self->output_handler) {
     self->output_handler (self, self->output_handler_user_data);
@@ -486,6 +508,9 @@ gst_rtmp_connection_start_write (GstRtmpConnection * self)
       g_object_ref (self));
 
   gst_buffer_unref (chunks);
+
+out:
+  gst_buffer_unref (message);
 }
 
 static void
@@ -910,24 +935,10 @@ start_write (gpointer user_data)
 void
 gst_rtmp_connection_queue_message (GstRtmpConnection * self, GstBuffer * buffer)
 {
-  GstRtmpMeta *meta;
-  GstRtmpChunkStream *cstream;
-  GstBuffer *chunks;
-
   g_return_if_fail (GST_IS_RTMP_CONNECTION (self));
   g_return_if_fail (GST_IS_BUFFER (buffer));
 
-  meta = gst_buffer_get_rtmp_meta (buffer);
-  g_return_if_fail (meta);
-
-  cstream = gst_rtmp_chunk_streams_get (self->output_streams, meta->cstream);
-  g_return_if_fail (cstream);
-
-  chunks = gst_rtmp_chunk_stream_serialize_all (cstream, buffer,
-      self->out_chunk_size);
-  g_return_if_fail (chunks);
-
-  g_async_queue_push (self->output_queue, chunks);
+  g_async_queue_push (self->output_queue, buffer);
   g_main_context_invoke_full (self->main_context, G_PRIORITY_DEFAULT,
       start_write, g_object_ref (self), g_object_unref);
 }