From 5d720eb59e68b0c70a662fe1686c850763abe859 Mon Sep 17 00:00:00 2001 From: "Jan Alexander Steffens (heftig)" Date: Fri, 14 Feb 2020 12:34:44 +0100 Subject: [PATCH] rtmp2: Count outgoing bytes and acked bytes For statistics. --- gst/rtmp2/rtmp/rtmpconnection.c | 48 +++++++++++++++++++++++++++++++++++------ 1 file changed, 42 insertions(+), 6 deletions(-) diff --git a/gst/rtmp2/rtmp/rtmpconnection.c b/gst/rtmp2/rtmp/rtmpconnection.c index ec41ea1..396d745 100644 --- a/gst/rtmp2/rtmp/rtmpconnection.c +++ b/gst/rtmp2/rtmp/rtmpconnection.c @@ -80,7 +80,9 @@ struct _GstRtmpConnection guint32 out_window_ack_size, out_window_ack_size_pending; guint64 in_bytes_total; + guint64 out_bytes_total; guint64 in_bytes_acked; + guint64 out_bytes_acked; }; @@ -113,6 +115,8 @@ static void gst_rtmp_connection_handle_message (GstRtmpConnection * sc, GstBuffer * buffer); static void gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self, guint32 in_chunk_size); +static void gst_rtmp_connection_handle_ack (GstRtmpConnection * self, + guint32 bytes); static void gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection * self, guint32 in_chunk_size); @@ -548,19 +552,25 @@ gst_rtmp_connection_write_buffer_done (GObject * obj, { GOutputStream *os = G_OUTPUT_STREAM (obj); GstRtmpConnection *self = GST_RTMP_CONNECTION (user_data); + gsize bytes_written = 0; GError *error = NULL; gboolean res; self->writing = FALSE; - res = gst_rtmp_output_stream_write_all_buffer_finish (os, result, NULL, - &error); + res = gst_rtmp_output_stream_write_all_buffer_finish (os, result, + &bytes_written, &error); + + self->out_bytes_total += bytes_written; if (!res) { if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { - GST_INFO_OBJECT (self, "write cancelled"); + GST_INFO_OBJECT (self, + "write cancelled (wrote %" G_GSIZE_FORMAT " bytes)", bytes_written); } else { - GST_ERROR_OBJECT (self, "write error: %s", error->message); + GST_ERROR_OBJECT (self, + "write error: %s (wrote %" G_GSIZE_FORMAT " bytes)", + error->message, bytes_written); } gst_rtmp_connection_emit_error (self); g_error_free (error); @@ -568,7 +578,8 @@ gst_rtmp_connection_write_buffer_done (GObject * obj, return; } - GST_LOG_OBJECT (self, "write completed"); + GST_LOG_OBJECT (self, "write completed; wrote %" G_GSIZE_FORMAT " bytes", + bytes_written); gst_rtmp_connection_apply_protocol_control (self); gst_rtmp_connection_start_write (self); @@ -717,9 +728,9 @@ gst_rtmp_connection_handle_protocol_control (GstRtmpConnection * connection, break; case GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT: - /* We don't really send ack requests that we care about, so ignore */ GST_DEBUG_OBJECT (connection, "acknowledgement %" G_GUINT32_FORMAT, pc.param); + gst_rtmp_connection_handle_ack (connection, pc.param); break; case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE: @@ -827,6 +838,31 @@ gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self, } static void +gst_rtmp_connection_handle_ack (GstRtmpConnection * self, guint32 bytes) +{ + guint64 last_ack, new_ack; + guint32 last_ack_low, last_ack_high; + + last_ack = self->out_bytes_acked; + last_ack_low = last_ack & G_MAXUINT32; + last_ack_high = (last_ack >> 32) & G_MAXUINT32; + + if (bytes < last_ack_low) { + GST_WARNING_OBJECT (self, + "Acknowledgement bytes regression, assuming rollover: %" + G_GUINT32_FORMAT " < %" G_GUINT32_FORMAT, bytes, last_ack_low); + last_ack_high += 1; + } + + new_ack = (((guint64) last_ack_high) << 32) | bytes; + + GST_LOG_OBJECT (self, "Peer acknowledged %" G_GUINT64_FORMAT " bytes", + new_ack - last_ack); + + self->out_bytes_acked = new_ack; +} + +static void gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection * self, guint32 window_ack_size) { -- 2.7.4