rtmp2: Count outgoing bytes and acked bytes
authorJan Alexander Steffens (heftig) <jsteffens@make.tv>
Fri, 14 Feb 2020 11:34:44 +0000 (12:34 +0100)
committerJan Alexander Steffens (heftig) <jsteffens@make.tv>
Fri, 21 Feb 2020 18:26:33 +0000 (19:26 +0100)
For statistics.

gst/rtmp2/rtmp/rtmpconnection.c

index ec41ea1..396d745 100644 (file)
@@ -80,7 +80,9 @@ struct _GstRtmpConnection
   guint32 out_window_ack_size, out_window_ack_size_pending;
 
   guint64 in_bytes_total;
+  guint64 out_bytes_total;
   guint64 in_bytes_acked;
+  guint64 out_bytes_acked;
 };
 
 
@@ -113,6 +115,8 @@ static void gst_rtmp_connection_handle_message (GstRtmpConnection * sc,
     GstBuffer * buffer);
 static void gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self,
     guint32 in_chunk_size);
+static void gst_rtmp_connection_handle_ack (GstRtmpConnection * self,
+    guint32 bytes);
 static void gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection *
     self, guint32 in_chunk_size);
 
@@ -548,19 +552,25 @@ gst_rtmp_connection_write_buffer_done (GObject * obj,
 {
   GOutputStream *os = G_OUTPUT_STREAM (obj);
   GstRtmpConnection *self = GST_RTMP_CONNECTION (user_data);
+  gsize bytes_written = 0;
   GError *error = NULL;
   gboolean res;
 
   self->writing = FALSE;
 
-  res = gst_rtmp_output_stream_write_all_buffer_finish (os, result, NULL,
-      &error);
+  res = gst_rtmp_output_stream_write_all_buffer_finish (os, result,
+      &bytes_written, &error);
+
+  self->out_bytes_total += bytes_written;
 
   if (!res) {
     if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
-      GST_INFO_OBJECT (self, "write cancelled");
+      GST_INFO_OBJECT (self,
+          "write cancelled (wrote %" G_GSIZE_FORMAT " bytes)", bytes_written);
     } else {
-      GST_ERROR_OBJECT (self, "write error: %s", error->message);
+      GST_ERROR_OBJECT (self,
+          "write error: %s (wrote %" G_GSIZE_FORMAT " bytes)",
+          error->message, bytes_written);
     }
     gst_rtmp_connection_emit_error (self);
     g_error_free (error);
@@ -568,7 +578,8 @@ gst_rtmp_connection_write_buffer_done (GObject * obj,
     return;
   }
 
-  GST_LOG_OBJECT (self, "write completed");
+  GST_LOG_OBJECT (self, "write completed; wrote %" G_GSIZE_FORMAT " bytes",
+      bytes_written);
 
   gst_rtmp_connection_apply_protocol_control (self);
   gst_rtmp_connection_start_write (self);
@@ -717,9 +728,9 @@ gst_rtmp_connection_handle_protocol_control (GstRtmpConnection * connection,
       break;
 
     case GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT:
-      /* We don't really send ack requests that we care about, so ignore */
       GST_DEBUG_OBJECT (connection, "acknowledgement %" G_GUINT32_FORMAT,
           pc.param);
+      gst_rtmp_connection_handle_ack (connection, pc.param);
       break;
 
     case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE:
@@ -827,6 +838,31 @@ gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self,
 }
 
 static void
+gst_rtmp_connection_handle_ack (GstRtmpConnection * self, guint32 bytes)
+{
+  guint64 last_ack, new_ack;
+  guint32 last_ack_low, last_ack_high;
+
+  last_ack = self->out_bytes_acked;
+  last_ack_low = last_ack & G_MAXUINT32;
+  last_ack_high = (last_ack >> 32) & G_MAXUINT32;
+
+  if (bytes < last_ack_low) {
+    GST_WARNING_OBJECT (self,
+        "Acknowledgement bytes regression, assuming rollover: %"
+        G_GUINT32_FORMAT " < %" G_GUINT32_FORMAT, bytes, last_ack_low);
+    last_ack_high += 1;
+  }
+
+  new_ack = (((guint64) last_ack_high) << 32) | bytes;
+
+  GST_LOG_OBJECT (self, "Peer acknowledged %" G_GUINT64_FORMAT " bytes",
+      new_ack - last_ack);
+
+  self->out_bytes_acked = new_ack;
+}
+
+static void
 gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection * self,
     guint32 window_ack_size)
 {