rtmp2: Handle outgoing set chunk/window size properly
authorJan Alexander Steffens (heftig) <jsteffens@make.tv>
Mon, 27 Jan 2020 15:22:20 +0000 (16:22 +0100)
committerGStreamer Merge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Fri, 21 Feb 2020 15:20:41 +0000 (15:20 +0000)
Apply outgoing sizes only after writing the chunk to the peer. This is
important particularly for the set chunk size and allows exposing it
without threading issues.

gst/rtmp2/rtmp/rtmpconnection.c

index 7d78377..02d3e80 100644 (file)
@@ -75,9 +75,9 @@ struct _GstRtmpConnection
 
   /* RTMP configuration */
   guint32 in_chunk_size;
-  guint32 out_chunk_size;
+  guint32 out_chunk_size, out_chunk_size_pending;
   guint32 in_window_ack_size;
-  guint32 out_window_ack_size;
+  guint32 out_window_ack_size, out_window_ack_size_pending;
 
   guint64 in_bytes_total;
   guint64 in_bytes_acked;
@@ -121,6 +121,12 @@ static void
 gst_rtmp_connection_send_ping_response (GstRtmpConnection * connection,
     guint32 event_data);
 
+static gboolean
+gst_rtmp_connection_prepare_protocol_control (GstRtmpConnection * self,
+    GstBuffer * buffer);
+static void
+gst_rtmp_connection_apply_protocol_control (GstRtmpConnection * self);
+
 typedef struct
 {
   gdouble transaction_id;
@@ -483,6 +489,14 @@ gst_rtmp_connection_start_write (GstRtmpConnection * self)
     goto out;
   }
 
+  if (gst_rtmp_message_is_protocol_control (message)) {
+    if (!gst_rtmp_connection_prepare_protocol_control (self, message)) {
+      GST_ERROR_OBJECT (self,
+          "Failed to prepare protocol control %" GST_PTR_FORMAT, message);
+      goto out;
+    }
+  }
+
   cstream = gst_rtmp_chunk_streams_get (self->output_streams, meta->cstream);
   if (!cstream) {
     GST_ERROR_OBJECT (self, "Failed to get chunk stream for %" GST_PTR_FORMAT,
@@ -555,6 +569,8 @@ gst_rtmp_connection_write_buffer_done (GObject * obj,
   }
 
   GST_LOG_OBJECT (self, "write completed");
+
+  gst_rtmp_connection_apply_protocol_control (self);
   gst_rtmp_connection_start_write (self);
   g_object_unref (self);
 }
@@ -1059,11 +1075,6 @@ gst_rtmp_connection_request_window_size (GstRtmpConnection * connection,
 
   g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
 
-  if (connection->out_window_ack_size == window_ack_size)
-    return;
-
-  connection->out_window_ack_size = window_ack_size;
-
   gst_rtmp_connection_queue_message (connection,
       gst_rtmp_message_new_protocol_control (&pc));
 }
@@ -1078,3 +1089,88 @@ gst_rtmp_connection_set_data_frame (GstRtmpConnection * connection,
   gst_buffer_prepend_memory (buffer, gst_memory_ref (set_data_frame_value));
   gst_rtmp_connection_queue_message (connection, buffer);
 }
+
+static gboolean
+gst_rtmp_connection_prepare_protocol_control (GstRtmpConnection * self,
+    GstBuffer * buffer)
+{
+  GstRtmpProtocolControl pc;
+
+  if (!gst_rtmp_message_parse_protocol_control (buffer, &pc)) {
+    GST_ERROR_OBJECT (self, "can't parse protocol control message");
+    return FALSE;
+  }
+
+  switch (pc.type) {
+    case GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE:{
+      guint32 chunk_size = pc.param;
+
+      GST_INFO_OBJECT (self, "pending chunk size %" G_GUINT32_FORMAT,
+          chunk_size);
+
+      if (chunk_size < GST_RTMP_MINIMUM_CHUNK_SIZE) {
+        GST_ERROR_OBJECT (self,
+            "requested chunk size %" G_GUINT32_FORMAT " is too small",
+            chunk_size);
+        return FALSE;
+      }
+
+      if (chunk_size > GST_RTMP_MAXIMUM_CHUNK_SIZE) {
+        GST_ERROR_OBJECT (self,
+            "requested chunk size %" G_GUINT32_FORMAT " is too large",
+            chunk_size);
+        return FALSE;
+      }
+
+      if (chunk_size < GST_RTMP_DEFAULT_CHUNK_SIZE) {
+        GST_WARNING_OBJECT (self,
+            "requesting small chunk size %" G_GUINT32_FORMAT, chunk_size);
+      }
+
+      self->out_chunk_size_pending = pc.param;
+      break;
+    }
+
+    case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE:{
+      guint32 window_ack_size = pc.param;
+
+      GST_INFO_OBJECT (self, "pending window ack size: %" G_GUINT32_FORMAT,
+          window_ack_size);
+
+      if (window_ack_size < GST_RTMP_DEFAULT_WINDOW_ACK_SIZE) {
+        GST_WARNING_OBJECT (self,
+            "requesting small window ack size %" G_GUINT32_FORMAT,
+            window_ack_size);
+      }
+
+      self->out_window_ack_size_pending = window_ack_size;
+      break;
+    }
+
+    default:
+      break;
+  }
+
+  return TRUE;
+}
+
+static void
+gst_rtmp_connection_apply_protocol_control (GstRtmpConnection * self)
+{
+  guint32 chunk_size, window_ack_size;
+
+  chunk_size = self->out_chunk_size_pending;
+  if (chunk_size) {
+    self->out_chunk_size = chunk_size;
+    self->out_chunk_size_pending = 0;
+    GST_INFO_OBJECT (self, "applied chunk size %" G_GUINT32_FORMAT, chunk_size);
+  }
+
+  window_ack_size = self->out_window_ack_size_pending;
+  if (window_ack_size) {
+    self->out_window_ack_size = window_ack_size;
+    self->out_window_ack_size_pending = 0;
+    GST_INFO_OBJECT (self, "applied window ack size %" G_GUINT32_FORMAT,
+        window_ack_size);
+  }
+}