defer sending WINDOW_UDPATE frames until a minimum threshold has been met
authorGeoff Kizer <geoffrek>
Tue, 19 Feb 2019 09:43:27 +0000 (01:43 -0800)
committerGeoff Kizer <geoffrek>
Wed, 20 Feb 2019 00:02:55 +0000 (16:02 -0800)
Commit migrated from https://github.com/dotnet/corefx/commit/99f8d99738f78b6791e2b30b9fababa9f8daa643

src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Stream.cs

index d2bd8ea..8e51085 100644 (file)
@@ -39,6 +39,7 @@ namespace System.Net.Http
         private bool _expectingSettingsAck;
         private int _initialWindowSize;
         private int _maxConcurrentStreams;
+        private int _pendingWindowUpdate;
         private int _idleSinceTickCount;
 
         private bool _disposed;
@@ -55,6 +56,7 @@ namespace System.Net.Http
         // We limit it per stream, and the user controls how many streams are created.
         // So set the connection window size to a large value.
         private const int ConnectionWindowSize = 64 * 1024 * 1024;
+        private const int ConnectionWindowThreshold = ConnectionWindowSize / 8;
 
         public Http2Connection(HttpConnectionPool pool, SslStream stream)
         {
@@ -75,6 +77,7 @@ namespace System.Net.Http
             _nextStream = 1;
             _initialWindowSize = DefaultInitialWindowSize;
             _maxConcurrentStreams = int.MaxValue;
+            _pendingWindowUpdate = 0;
         }
 
         private object SyncObject => _httpStreams;
@@ -975,12 +978,7 @@ namespace System.Net.Http
             await _writerLock.WaitAsync().ConfigureAwait(false);
             try
             {
-                // We update both the connection-level and stream-level windows at the same time
-                _outgoingBuffer.EnsureAvailableSpace((FrameHeader.Size + FrameHeader.WindowUpdateLength) * 2);
-
-                WriteFrameHeader(new FrameHeader(FrameHeader.WindowUpdateLength, FrameType.WindowUpdate, FrameFlags.None, 0));
-                BinaryPrimitives.WriteInt32BigEndian(_outgoingBuffer.AvailableSpan, amount);
-                _outgoingBuffer.Commit(FrameHeader.WindowUpdateLength);
+                _outgoingBuffer.EnsureAvailableSpace(FrameHeader.Size + FrameHeader.WindowUpdateLength);
 
                 WriteFrameHeader(new FrameHeader(FrameHeader.WindowUpdateLength, FrameType.WindowUpdate, FrameFlags.None, streamId));
                 BinaryPrimitives.WriteInt32BigEndian(_outgoingBuffer.AvailableSpan, amount);
@@ -994,6 +992,28 @@ namespace System.Net.Http
             }
         }
 
+        private void ExtendWindow(int amount)
+        {
+            Debug.Assert(amount > 0);
+
+            int windowUpdateSize;
+            lock (SyncObject)
+            {
+                Debug.Assert(_pendingWindowUpdate < ConnectionWindowThreshold);
+
+                _pendingWindowUpdate += amount;
+                if (_pendingWindowUpdate < ConnectionWindowThreshold)
+                {
+                    return;
+                }
+
+                windowUpdateSize = _pendingWindowUpdate;
+                _pendingWindowUpdate = 0;
+            }
+
+            ValueTask ignored = SendWindowUpdateAsync(0, windowUpdateSize);
+        }
+
         private void WriteFrameHeader(FrameHeader frameHeader)
         {
             Debug.Assert(_outgoingBuffer.AvailableMemory.Length >= FrameHeader.Size);
index ece587a..c4a747c 100644 (file)
@@ -30,11 +30,15 @@ namespace System.Net.Http
             private readonly TaskCompletionSource<bool> _responseHeadersAvailable;
 
             private ArrayBuffer _responseBuffer; // mutable struct, do not make this readonly
+            private int _pendingWindowUpdate;
             private TaskCompletionSource<bool> _responseDataAvailable;
             private bool _responseComplete;
             private bool _responseAborted;
             private bool _disposed;
 
+            private const int StreamWindowSize = DefaultInitialWindowSize;
+            private const int StreamWindowThreshold = StreamWindowSize / 8;
+
             public Http2Stream(HttpRequestMessage request, Http2Connection connection, int streamId, int initialWindowSize)
             {
                 _connection = connection;
@@ -52,6 +56,8 @@ namespace System.Net.Http
 
                 _responseBuffer = new ArrayBuffer(InitialStreamBufferSize, usePool: true);
 
+                _pendingWindowUpdate = 0;
+
                 _streamWindow = new CreditManager(initialWindowSize);
 
                 _responseHeadersAvailable = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
@@ -171,7 +177,7 @@ namespace System.Net.Http
 
                     Debug.Assert(!_responseComplete);
 
-                    if (_responseBuffer.ActiveSpan.Length + buffer.Length > DefaultInitialWindowSize)
+                    if (_responseBuffer.ActiveSpan.Length + buffer.Length > StreamWindowSize)
                     {
                         // Window size exceeded.
                         throw new Http2ProtocolException(Http2ProtocolErrorCode.FlowControlError);
@@ -232,78 +238,92 @@ namespace System.Net.Http
                 }
             }
 
-            private int ReadFromBuffer(Span<byte> buffer)
+            private void ExtendWindow(int amount)
             {
-                Debug.Assert(_responseBuffer.ActiveSpan.Length > 0);
-                Debug.Assert(buffer.Length > 0);
+                Debug.Assert(amount > 0);
+                Debug.Assert(_pendingWindowUpdate < StreamWindowThreshold);
 
-                int bytesToRead = Math.Min(buffer.Length, _responseBuffer.ActiveSpan.Length);
-                _responseBuffer.ActiveSpan.Slice(0, bytesToRead).CopyTo(buffer);
-                _responseBuffer.Discard(bytesToRead);
+                if (_responseComplete)
+                {
+                    // We have already read to the end of the response, so there's no need to send
+                    // WINDOW_UPDATEs any more.
+                    return;
+                }
 
-                // Send a window update to the peer.
-                // Don't wait for completion, which could happen asynchronously.
-                ValueTask ignored = _connection.SendWindowUpdateAsync(_streamId, bytesToRead);
+                _pendingWindowUpdate += amount;
+                if (_pendingWindowUpdate < StreamWindowThreshold)
+                {
+                    return;
+                }
 
-                return bytesToRead;
-            }
+                int windowUpdateSize = _pendingWindowUpdate;
+                _pendingWindowUpdate = 0;
 
-            // TODO: ISSUE 31310: Cancellation support
+                ValueTask ignored = _connection.SendWindowUpdateAsync(_streamId, windowUpdateSize);
+            }
 
-            public ValueTask<int> ReadDataAsync(Memory<byte> buffer, CancellationToken cancellationToken)
+            private (Task waitForData, int bytesRead) TryReadFromBuffer(Span<byte> buffer)
             {
-                if (buffer.Length == 0)
-                {
-                    return new ValueTask<int>(0);
-                }
+                Debug.Assert(buffer.Length > 0);
 
-                Task onDataAvailable;
                 lock (SyncObject)
                 {
                     if (_disposed)
                     {
-                        return new ValueTask<int>(Task.FromException<int>(new ObjectDisposedException(nameof(Http2Stream))));
+                        throw new ObjectDisposedException(nameof(Http2Stream));
                     }
 
                     if (_responseBuffer.ActiveSpan.Length > 0)
                     {
-                        return new ValueTask<int>(ReadFromBuffer(buffer.Span));
-                    }
+                        int bytesRead = Math.Min(buffer.Length, _responseBuffer.ActiveSpan.Length);
+                        _responseBuffer.ActiveSpan.Slice(0, bytesRead).CopyTo(buffer);
+                        _responseBuffer.Discard(bytesRead);
 
-                    if (_responseComplete)
+                        return (null, bytesRead);
+                    }
+                    else if (_responseComplete)
                     {
                         if (_responseAborted)
                         {
-                            return new ValueTask<int>(Task.FromException<int>(new IOException(SR.net_http_invalid_response)));
+                            throw new IOException(SR.net_http_invalid_response);
                         }
 
-                        return new ValueTask<int>(0);
+                        return (null, 0);
                     }
 
                     Debug.Assert(_responseDataAvailable == null);
                     Debug.Assert(!_responseAborted);
+
                     _responseDataAvailable = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
-                    onDataAvailable = _responseDataAvailable.Task;
+                    return (_responseDataAvailable.Task, 0);
                 }
+            }
 
-                return ReadDataAsyncCore(onDataAvailable, buffer);
-
-                async ValueTask<int> ReadDataAsyncCore(Task onDataAvailable, Memory<byte> buffer)
+            public async ValueTask<int> ReadDataAsync(Memory<byte> buffer, CancellationToken cancellationToken)
+            {
+                if (buffer.Length == 0)
                 {
-                    await onDataAvailable.ConfigureAwait(false);
+                    return 0;
+                }
 
-                    lock (SyncObject)
-                    {
-                        if (!_disposed && _responseBuffer.ActiveSpan.Length > 0)
-                        {
-                            return ReadFromBuffer(buffer.Span);
-                        }
+                Task waitForData;
+                int bytesRead;
 
-                        // If no data was made available, we must be at the end of the stream
-                        Debug.Assert(_responseComplete);
-                        return 0;
-                    }
+                (waitForData, bytesRead) = TryReadFromBuffer(buffer.Span);
+                if (waitForData != null)
+                {
+                    await waitForData;
+                    (waitForData, bytesRead) = TryReadFromBuffer(buffer.Span);
+                    Debug.Assert(waitForData == null);
                 }
+
+                if (bytesRead != 0)
+                {
+                    ExtendWindow(bytesRead);
+                    _connection.ExtendWindow(bytesRead);
+                }
+
+                return bytesRead;
             }
 
             private async ValueTask SendDataAsync(ReadOnlyMemory<byte> buffer)