using System.Buffers;
using System.Diagnostics;
+using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
namespace System.Net
public ArrayBuffer(int initialSize, bool usePool = false)
{
+ Debug.Assert(initialSize > 0 || usePool);
+
_usePool = usePool;
- _bytes = usePool ? ArrayPool<byte>.Shared.Rent(initialSize) : new byte[initialSize];
+ _bytes = initialSize == 0
+ ? Array.Empty<byte>()
+ : usePool ? ArrayPool<byte>.Shared.Rent(initialSize) : new byte[initialSize];
_activeStart = 0;
_availableStart = 0;
}
byte[] array = _bytes;
_bytes = null!;
- if (_usePool && array != null)
+ if (array is not null)
{
- ArrayPool<byte>.Shared.Return(array);
+ ReturnBufferIfPooled(array);
}
}
+ // This is different from Dispose as the instance remains usable afterwards (_bytes will not be null).
+ public void ClearAndReturnBuffer()
+ {
+ Debug.Assert(_usePool);
+ Debug.Assert(_bytes is not null);
+
+ _activeStart = 0;
+ _availableStart = 0;
+
+ byte[] bufferToReturn = _bytes;
+ _bytes = Array.Empty<byte>();
+ ReturnBufferIfPooled(bufferToReturn);
+ }
+
public int ActiveLength => _availableStart - _activeStart;
public Span<byte> ActiveSpan => new Span<byte>(_bytes, _activeStart, _availableStart - _activeStart);
public ReadOnlySpan<byte> ActiveReadOnlySpan => new ReadOnlySpan<byte>(_bytes, _activeStart, _availableStart - _activeStart);
}
// Ensure at least [byteCount] bytes to write to.
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
public void EnsureAvailableSpace(int byteCount)
{
- if (byteCount <= AvailableLength)
+ if (byteCount > AvailableLength)
{
+ EnsureAvailableSpaceCore(byteCount);
+ }
+ }
+
+ private void EnsureAvailableSpaceCore(int byteCount)
+ {
+ Debug.Assert(AvailableLength < byteCount);
+
+ if (_bytes.Length == 0)
+ {
+ Debug.Assert(_usePool && _activeStart == 0 && _availableStart == 0);
+ _bytes = ArrayPool<byte>.Shared.Rent(byteCount);
return;
}
_activeStart = 0;
_bytes = newBytes;
- if (_usePool)
- {
- ArrayPool<byte>.Shared.Return(oldBytes);
- }
+ ReturnBufferIfPooled(oldBytes);
Debug.Assert(byteCount <= AvailableLength);
}
- // Ensure at least [byteCount] bytes to write to, up to the specified limit
- public void TryEnsureAvailableSpaceUpToLimit(int byteCount, int limit)
+ public void Grow()
{
- if (byteCount <= AvailableLength)
- {
- return;
- }
-
- int totalFree = _activeStart + AvailableLength;
- if (byteCount <= totalFree)
- {
- // We can free up enough space by just shifting the bytes down, so do so.
- Buffer.BlockCopy(_bytes, _activeStart, _bytes, 0, ActiveLength);
- _availableStart = ActiveLength;
- _activeStart = 0;
- Debug.Assert(byteCount <= AvailableLength);
- return;
- }
-
- if (_bytes.Length >= limit)
- {
- // Already at limit, can't grow further.
- return;
- }
-
- // Double the size of the buffer until we have enough space, or we hit the limit
- int desiredSize = Math.Min(ActiveLength + byteCount, limit);
- int newSize = _bytes.Length;
- do
- {
- newSize = Math.Min(newSize * 2, limit);
- } while (newSize < desiredSize);
-
- byte[] newBytes = _usePool ?
- ArrayPool<byte>.Shared.Rent(newSize) :
- new byte[newSize];
- byte[] oldBytes = _bytes;
-
- if (ActiveLength != 0)
- {
- Buffer.BlockCopy(oldBytes, _activeStart, newBytes, 0, ActiveLength);
- }
-
- _availableStart = ActiveLength;
- _activeStart = 0;
-
- _bytes = newBytes;
- if (_usePool)
- {
- ArrayPool<byte>.Shared.Return(oldBytes);
- }
-
- Debug.Assert(byteCount <= AvailableLength || desiredSize == limit);
+ EnsureAvailableSpaceCore(AvailableLength + 1);
}
- public void Grow()
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private void ReturnBufferIfPooled(byte[] buffer)
{
- EnsureAvailableSpace(AvailableLength + 1);
+ // The buffer may be Array.Empty<byte>()
+ if (_usePool && buffer.Length > 0)
+ {
+ ArrayPool<byte>.Shared.Return(buffer);
+ }
}
}
}
private readonly Stream _stream;
// NOTE: These are mutable structs; do not make these readonly.
+ // ProcessIncomingFramesAsync and ProcessOutgoingFramesAsync are responsible for disposing/returning their respective buffers.
private ArrayBuffer _incomingBuffer;
private ArrayBuffer _outgoingBuffer;
#if DEBUG
// In debug builds, start with a very small buffer to induce buffer growing logic.
- private const int InitialConnectionBufferSize = 4;
+ private const int InitialConnectionBufferSize = FrameHeader.Size;
#else
- private const int InitialConnectionBufferSize = 4096;
+ // Rent enough space to receive a full data frame in one read call.
+ private const int InitialConnectionBufferSize = FrameHeader.Size + FrameHeader.MaxPayloadLength;
#endif
+
// The default initial window size for streams and connections according to the RFC:
// https://datatracker.ietf.org/doc/html/rfc7540#section-5.2.1
// Unlike HttpHandlerDefaults.DefaultInitialHttp2StreamWindowSize, this value should never be changed.
_pool = pool;
_stream = stream;
- _incomingBuffer = new ArrayBuffer(InitialConnectionBufferSize);
- _outgoingBuffer = new ArrayBuffer(InitialConnectionBufferSize);
+ _incomingBuffer = new ArrayBuffer(initialSize: 0, usePool: true);
+ _outgoingBuffer = new ArrayBuffer(initialSize: 0, usePool: true);
_hpackDecoder = new HPackDecoder(maxHeadersLength: pool.Settings.MaxResponseHeadersByteLength);
_ = ProcessIncomingFramesAsync();
await _stream.WriteAsync(_outgoingBuffer.ActiveMemory, cancellationToken).ConfigureAwait(false);
_rttEstimator.OnInitialSettingsSent();
- _outgoingBuffer.Discard(_outgoingBuffer.ActiveLength);
-
+ _outgoingBuffer.ClearAndReturnBuffer();
}
catch (Exception e)
{
+ // ProcessIncomingFramesAsync and ProcessOutgoingFramesAsync are responsible for disposing/returning their respective buffers.
+ // SetupAsync is the exception as it's responsible for starting the ProcessOutgoingFramesAsync loop.
+ // As we're about to throw and ProcessOutgoingFramesAsync will never be called, we must return the buffer here.
+ _outgoingBuffer.Dispose();
+
Dispose();
if (e is OperationCanceledException oce && oce.CancellationToken == cancellationToken)
// Ensure we've read enough data for the frame header.
if (_incomingBuffer.ActiveLength < FrameHeader.Size)
{
- _incomingBuffer.EnsureAvailableSpace(FrameHeader.Size - _incomingBuffer.ActiveLength);
do
{
+ // Issue a zero-byte read to avoid potentially pinning the buffer while waiting for more data.
+ await _stream.ReadAsync(Memory<byte>.Empty).ConfigureAwait(false);
+
+ _incomingBuffer.EnsureAvailableSpace(FrameHeader.Size);
+
int bytesRead = await _stream.ReadAsync(_incomingBuffer.AvailableMemory).ConfigureAwait(false);
_incomingBuffer.Commit(bytesRead);
if (bytesRead == 0)
_incomingBuffer.EnsureAvailableSpace(frameHeader.PayloadLength - _incomingBuffer.ActiveLength);
do
{
+ // Issue a zero-byte read to avoid potentially pinning the buffer while waiting for more data.
+ await _stream.ReadAsync(Memory<byte>.Empty).ConfigureAwait(false);
+
int bytesRead = await _stream.ReadAsync(_incomingBuffer.AvailableMemory).ConfigureAwait(false);
_incomingBuffer.Commit(bytesRead);
if (bytesRead == 0) ThrowPrematureEOF(frameHeader.PayloadLength);
// the entire frame's needs (not just the header).
if (_incomingBuffer.ActiveLength < FrameHeader.Size)
{
- _incomingBuffer.EnsureAvailableSpace(FrameHeader.Size - _incomingBuffer.ActiveLength);
do
{
+ // Issue a zero-byte read to avoid potentially pinning the buffer while waiting for more data.
+ ValueTask<int> zeroByteReadTask = _stream.ReadAsync(Memory<byte>.Empty);
+ if (!zeroByteReadTask.IsCompletedSuccessfully && _incomingBuffer.ActiveLength == 0)
+ {
+ // No data is available yet. Return the receive buffer back to the pool while we wait.
+ _incomingBuffer.ClearAndReturnBuffer();
+ }
+ await zeroByteReadTask.ConfigureAwait(false);
+
+ // While we only need FrameHeader.Size bytes to complete this read, it's better if we rent more
+ // to avoid multiple ReadAsync calls and resizes once we start copying the content.
+ _incomingBuffer.EnsureAvailableSpace(InitialConnectionBufferSize);
+
int bytesRead = await _stream.ReadAsync(_incomingBuffer.AvailableMemory).ConfigureAwait(false);
Debug.Assert(bytesRead >= 0);
_incomingBuffer.Commit(bytesRead);
Abort(e);
}
+ finally
+ {
+ _incomingBuffer.Dispose();
+ }
}
// Note, this will return null for a streamId that's no longer in use.
{
await FlushOutgoingBytesAsync().ConfigureAwait(false);
}
+
+ if (_outgoingBuffer.ActiveLength == 0)
+ {
+ _outgoingBuffer.ClearAndReturnBuffer();
+ }
}
}
catch (Exception e)
Debug.Fail($"Unexpected exception in {nameof(ProcessOutgoingFramesAsync)}: {e}");
}
+ finally
+ {
+ _outgoingBuffer.Dispose();
+ }
}
private Task SendSettingsAckAsync() =>
int bytesWritten;
while (!HPackEncoder.EncodeIndexedHeaderField(index, headerBuffer.AvailableSpan, out bytesWritten))
{
- headerBuffer.EnsureAvailableSpace(headerBuffer.AvailableLength + 1);
+ headerBuffer.Grow();
}
headerBuffer.Commit(bytesWritten);
int bytesWritten;
while (!HPackEncoder.EncodeLiteralHeaderFieldWithoutIndexing(index, value, valueEncoding: null, headerBuffer.AvailableSpan, out bytesWritten))
{
- headerBuffer.EnsureAvailableSpace(headerBuffer.AvailableLength + 1);
+ headerBuffer.Grow();
}
headerBuffer.Commit(bytesWritten);
int bytesWritten;
while (!HPackEncoder.EncodeLiteralHeaderFieldWithoutIndexingNewName(name, values, HttpHeaderParser.DefaultSeparator, valueEncoding, headerBuffer.AvailableSpan, out bytesWritten))
{
- headerBuffer.EnsureAvailableSpace(headerBuffer.AvailableLength + 1);
+ headerBuffer.Grow();
}
headerBuffer.Commit(bytesWritten);
int bytesWritten;
while (!HPackEncoder.EncodeStringLiterals(values, separator, valueEncoding, headerBuffer.AvailableSpan, out bytesWritten))
{
- headerBuffer.EnsureAvailableSpace(headerBuffer.AvailableLength + 1);
+ headerBuffer.Grow();
}
headerBuffer.Commit(bytesWritten);
int bytesWritten;
while (!HPackEncoder.EncodeStringLiteral(value, valueEncoding, headerBuffer.AvailableSpan, out bytesWritten))
{
- headerBuffer.EnsureAvailableSpace(headerBuffer.AvailableLength + 1);
+ headerBuffer.Grow();
}
headerBuffer.Commit(bytesWritten);
{
if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(bytes.Length)}={bytes.Length}");
- if (bytes.Length > headerBuffer.AvailableLength)
- {
- headerBuffer.EnsureAvailableSpace(bytes.Length);
- }
-
+ headerBuffer.EnsureAvailableSpace(bytes.Length);
bytes.CopyTo(headerBuffer.AvailableSpan);
headerBuffer.Commit(bytes.Length);
}
_connectionWindow.Dispose();
_writeChannel.Writer.Complete();
+ // We're not disposing the _incomingBuffer and _outgoingBuffer here as they may still be in use by
+ // ProcessIncomingFramesAsync and ProcessOutgoingFramesAsync respectively, and those methods are
+ // responsible for returning the buffers.
+
if (HttpTelemetry.Log.IsEnabled())
{
if (Interlocked.Exchange(ref _markedByTelemetryStatus, TelemetryStatus_Closed) == TelemetryStatus_Opened)
server.Dispose();
}
}
-
- private sealed class ReadInterceptStream : DelegatingStream
- {
- private readonly Action<int> _readCallback;
-
- public ReadInterceptStream(Stream innerStream, Action<int> readCallback)
- : base(innerStream)
- {
- _readCallback = readCallback;
- }
-
- public override int Read(Span<byte> buffer)
- {
- _readCallback(buffer.Length);
- return base.Read(buffer);
- }
-
- public override int Read(byte[] buffer, int offset, int count)
- {
- _readCallback(count);
- return base.Read(buffer, offset, count);
- }
-
- public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
- {
- _readCallback(buffer.Length);
- return base.ReadAsync(buffer, cancellationToken);
- }
- }
}
public sealed class Http1ResponseStreamZeroByteReadTest : ResponseStreamZeroByteReadTestBase
}
}
}
+
+ [ConditionalClass(typeof(PlatformDetection), nameof(PlatformDetection.SupportsAlpn))]
+ public sealed class Http2ConnectionZeroByteReadTest : HttpClientHandlerTestBase
+ {
+ public Http2ConnectionZeroByteReadTest(ITestOutputHelper output) : base(output) { }
+
+ protected override Version UseVersion => HttpVersion.Version20;
+
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ public async Task ConnectionIssuesZeroByteReadsOnUnderlyingStream(bool useSsl)
+ {
+ await Http2LoopbackServer.CreateClientAndServerAsync(async uri =>
+ {
+ using HttpClientHandler handler = CreateHttpClientHandler();
+
+ int zeroByteReads = 0;
+ GetUnderlyingSocketsHttpHandler(handler).PlaintextStreamFilter = (context, _) =>
+ {
+ return new ValueTask<Stream>(new ReadInterceptStream(context.PlaintextStream, read =>
+ {
+ if (read == 0)
+ {
+ zeroByteReads++;
+ }
+ }));
+ };
+
+ using HttpClient client = CreateHttpClient(handler);
+ client.DefaultVersionPolicy = HttpVersionPolicy.RequestVersionExact;
+
+ Assert.Equal("Foo", await client.GetStringAsync(uri));
+
+ Assert.NotEqual(0, zeroByteReads);
+ },
+ async server =>
+ {
+ await server.HandleRequestAsync(content: "Foo");
+ }, http2Options: new Http2Options { UseSsl = useSsl });
+ }
+ }
+
+ file sealed class ReadInterceptStream : DelegatingStream
+ {
+ private readonly Action<int> _readCallback;
+
+ public ReadInterceptStream(Stream innerStream, Action<int> readCallback)
+ : base(innerStream)
+ {
+ _readCallback = readCallback;
+ }
+
+ public override int Read(Span<byte> buffer)
+ {
+ _readCallback(buffer.Length);
+ return base.Read(buffer);
+ }
+
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ _readCallback(count);
+ return base.Read(buffer, offset, count);
+ }
+
+ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
+ {
+ _readCallback(buffer.Length);
+ return base.ReadAsync(buffer, cancellationToken);
+ }
+ }
}
void WriteBytes(ReadOnlySpan<byte> bytes)
{
- if (bytes.Length > buffer.AvailableLength)
- {
- buffer.EnsureAvailableSpace(bytes.Length);
- FillAvailableSpaceWithOnes(buffer);
- }
+ buffer.EnsureAvailableSpace(bytes.Length);
+ FillAvailableSpaceWithOnes(buffer);
bytes.CopyTo(buffer.AvailableSpan);
buffer.Commit(bytes.Length);
int bytesWritten;
while (!HPackEncoder.EncodeStringLiterals(values, separator, valueEncoding, buffer.AvailableSpan, out bytesWritten))
{
- buffer.EnsureAvailableSpace(buffer.AvailableLength + 1);
+ buffer.Grow();
FillAvailableSpaceWithOnes(buffer);
}
int bytesWritten;
while (!HPackEncoder.EncodeLiteralHeaderFieldWithoutIndexingNewName(name, values, HttpHeaderParser.DefaultSeparator, valueEncoding, buffer.AvailableSpan, out bytesWritten))
{
- buffer.EnsureAvailableSpace(buffer.AvailableLength + 1);
+ buffer.Grow();
FillAvailableSpaceWithOnes(buffer);
}
private const int InitialHandshakeBufferSize = 4096 + FrameOverhead; // try to fit at least 4K ServerCertificate
private const int ReadBufferSize = 4096 * 4 + FrameOverhead; // We read in 16K chunks + headers.
- private SslBuffer _buffer;
+ private SslBuffer _buffer = new();
// internal buffer for storing incoming data. Wrapper around ArrayBuffer which adds
// separation between decrypted and still encrypted part of the active region.
// padding between decrypted part of the active memory and following undecrypted TLS frame.
private int _decryptedPadding;
+ // Indicates whether the _buffer currently holds a rented buffer.
private bool _isValid;
- public SslBuffer(int initialSize)
+ public SslBuffer()
{
- _buffer = new ArrayBuffer(initialSize, true);
+ _buffer = new ArrayBuffer(initialSize: 0, usePool: true);
_decryptedLength = 0;
_decryptedPadding = 0;
- _isValid = true;
+ _isValid = false;
}
public bool IsValid => _isValid;
public void EnsureAvailableSpace(int byteCount)
{
- if (_isValid)
- {
- _buffer.EnsureAvailableSpace(byteCount);
- }
- else
- {
- _isValid = true;
- _buffer = new ArrayBuffer(byteCount, true);
- }
+ _isValid = true;
+ _buffer.EnsureAvailableSpace(byteCount);
}
public void Discard(int byteCount)
public void ReturnBuffer()
{
- _buffer.Dispose();
+ _buffer.ClearAndReturnBuffer();
_decryptedLength = 0;
_decryptedPadding = 0;
_isValid = false;