private bool _expectingSettingsAck;
private int _initialWindowSize;
private int _maxConcurrentStreams;
+ private int _pendingWindowUpdate;
private int _idleSinceTickCount;
private bool _disposed;
// We limit it per stream, and the user controls how many streams are created.
// So set the connection window size to a large value.
private const int ConnectionWindowSize = 64 * 1024 * 1024;
+ private const int ConnectionWindowThreshold = ConnectionWindowSize / 8;
public Http2Connection(HttpConnectionPool pool, SslStream stream)
{
_nextStream = 1;
_initialWindowSize = DefaultInitialWindowSize;
_maxConcurrentStreams = int.MaxValue;
+ _pendingWindowUpdate = 0;
}
private object SyncObject => _httpStreams;
await _writerLock.WaitAsync().ConfigureAwait(false);
try
{
- // We update both the connection-level and stream-level windows at the same time
- _outgoingBuffer.EnsureAvailableSpace((FrameHeader.Size + FrameHeader.WindowUpdateLength) * 2);
-
- WriteFrameHeader(new FrameHeader(FrameHeader.WindowUpdateLength, FrameType.WindowUpdate, FrameFlags.None, 0));
- BinaryPrimitives.WriteInt32BigEndian(_outgoingBuffer.AvailableSpan, amount);
- _outgoingBuffer.Commit(FrameHeader.WindowUpdateLength);
+ _outgoingBuffer.EnsureAvailableSpace(FrameHeader.Size + FrameHeader.WindowUpdateLength);
WriteFrameHeader(new FrameHeader(FrameHeader.WindowUpdateLength, FrameType.WindowUpdate, FrameFlags.None, streamId));
BinaryPrimitives.WriteInt32BigEndian(_outgoingBuffer.AvailableSpan, amount);
}
}
+ private void ExtendWindow(int amount)
+ {
+ Debug.Assert(amount > 0);
+
+ int windowUpdateSize;
+ lock (SyncObject)
+ {
+ Debug.Assert(_pendingWindowUpdate < ConnectionWindowThreshold);
+
+ _pendingWindowUpdate += amount;
+ if (_pendingWindowUpdate < ConnectionWindowThreshold)
+ {
+ return;
+ }
+
+ windowUpdateSize = _pendingWindowUpdate;
+ _pendingWindowUpdate = 0;
+ }
+
+ ValueTask ignored = SendWindowUpdateAsync(0, windowUpdateSize);
+ }
+
private void WriteFrameHeader(FrameHeader frameHeader)
{
Debug.Assert(_outgoingBuffer.AvailableMemory.Length >= FrameHeader.Size);
private readonly TaskCompletionSource<bool> _responseHeadersAvailable;
private ArrayBuffer _responseBuffer; // mutable struct, do not make this readonly
+ private int _pendingWindowUpdate;
private TaskCompletionSource<bool> _responseDataAvailable;
private bool _responseComplete;
private bool _responseAborted;
private bool _disposed;
+ private const int StreamWindowSize = DefaultInitialWindowSize;
+ private const int StreamWindowThreshold = StreamWindowSize / 8;
+
public Http2Stream(HttpRequestMessage request, Http2Connection connection, int streamId, int initialWindowSize)
{
_connection = connection;
_responseBuffer = new ArrayBuffer(InitialStreamBufferSize, usePool: true);
+ _pendingWindowUpdate = 0;
+
_streamWindow = new CreditManager(initialWindowSize);
_responseHeadersAvailable = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
Debug.Assert(!_responseComplete);
- if (_responseBuffer.ActiveSpan.Length + buffer.Length > DefaultInitialWindowSize)
+ if (_responseBuffer.ActiveSpan.Length + buffer.Length > StreamWindowSize)
{
// Window size exceeded.
throw new Http2ProtocolException(Http2ProtocolErrorCode.FlowControlError);
}
}
- private int ReadFromBuffer(Span<byte> buffer)
+ private void ExtendWindow(int amount)
{
- Debug.Assert(_responseBuffer.ActiveSpan.Length > 0);
- Debug.Assert(buffer.Length > 0);
+ Debug.Assert(amount > 0);
+ Debug.Assert(_pendingWindowUpdate < StreamWindowThreshold);
- int bytesToRead = Math.Min(buffer.Length, _responseBuffer.ActiveSpan.Length);
- _responseBuffer.ActiveSpan.Slice(0, bytesToRead).CopyTo(buffer);
- _responseBuffer.Discard(bytesToRead);
+ if (_responseComplete)
+ {
+ // We have already read to the end of the response, so there's no need to send
+ // WINDOW_UPDATEs any more.
+ return;
+ }
- // Send a window update to the peer.
- // Don't wait for completion, which could happen asynchronously.
- ValueTask ignored = _connection.SendWindowUpdateAsync(_streamId, bytesToRead);
+ _pendingWindowUpdate += amount;
+ if (_pendingWindowUpdate < StreamWindowThreshold)
+ {
+ return;
+ }
- return bytesToRead;
- }
+ int windowUpdateSize = _pendingWindowUpdate;
+ _pendingWindowUpdate = 0;
- // TODO: ISSUE 31310: Cancellation support
+ ValueTask ignored = _connection.SendWindowUpdateAsync(_streamId, windowUpdateSize);
+ }
- public ValueTask<int> ReadDataAsync(Memory<byte> buffer, CancellationToken cancellationToken)
+ private (Task waitForData, int bytesRead) TryReadFromBuffer(Span<byte> buffer)
{
- if (buffer.Length == 0)
- {
- return new ValueTask<int>(0);
- }
+ Debug.Assert(buffer.Length > 0);
- Task onDataAvailable;
lock (SyncObject)
{
if (_disposed)
{
- return new ValueTask<int>(Task.FromException<int>(new ObjectDisposedException(nameof(Http2Stream))));
+ throw new ObjectDisposedException(nameof(Http2Stream));
}
if (_responseBuffer.ActiveSpan.Length > 0)
{
- return new ValueTask<int>(ReadFromBuffer(buffer.Span));
- }
+ int bytesRead = Math.Min(buffer.Length, _responseBuffer.ActiveSpan.Length);
+ _responseBuffer.ActiveSpan.Slice(0, bytesRead).CopyTo(buffer);
+ _responseBuffer.Discard(bytesRead);
- if (_responseComplete)
+ return (null, bytesRead);
+ }
+ else if (_responseComplete)
{
if (_responseAborted)
{
- return new ValueTask<int>(Task.FromException<int>(new IOException(SR.net_http_invalid_response)));
+ throw new IOException(SR.net_http_invalid_response);
}
- return new ValueTask<int>(0);
+ return (null, 0);
}
Debug.Assert(_responseDataAvailable == null);
Debug.Assert(!_responseAborted);
+
_responseDataAvailable = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
- onDataAvailable = _responseDataAvailable.Task;
+ return (_responseDataAvailable.Task, 0);
}
+ }
- return ReadDataAsyncCore(onDataAvailable, buffer);
-
- async ValueTask<int> ReadDataAsyncCore(Task onDataAvailable, Memory<byte> buffer)
+ public async ValueTask<int> ReadDataAsync(Memory<byte> buffer, CancellationToken cancellationToken)
+ {
+ if (buffer.Length == 0)
{
- await onDataAvailable.ConfigureAwait(false);
+ return 0;
+ }
- lock (SyncObject)
- {
- if (!_disposed && _responseBuffer.ActiveSpan.Length > 0)
- {
- return ReadFromBuffer(buffer.Span);
- }
+ Task waitForData;
+ int bytesRead;
- // If no data was made available, we must be at the end of the stream
- Debug.Assert(_responseComplete);
- return 0;
- }
+ (waitForData, bytesRead) = TryReadFromBuffer(buffer.Span);
+ if (waitForData != null)
+ {
+ await waitForData;
+ (waitForData, bytesRead) = TryReadFromBuffer(buffer.Span);
+ Debug.Assert(waitForData == null);
}
+
+ if (bytesRead != 0)
+ {
+ ExtendWindow(bytesRead);
+ _connection.ExtendWindow(bytesRead);
+ }
+
+ return bytesRead;
}
private async ValueTask SendDataAsync(ReadOnlyMemory<byte> buffer)