[ThreadStatic]
private static string[]? t_headerValues;
- private ValueTask<int>? _readAheadTask;
- private int _readAheadTaskLock; // 0 == free, 1 == held
+ private const int ReadAheadTask_NotStarted = 0;
+ private const int ReadAheadTask_Started = 1;
+ private const int ReadAheadTask_CompletionReserved = 2;
+ private int _readAheadTaskStatus;
+ private ValueTask<int> _readAheadTask;
private ArrayBuffer _readBuffer;
private long _idleSinceTickCount;
{
GC.SuppressFinalize(this);
_stream.Dispose();
-
- // Eat any exceptions from the read-ahead task. We don't need to log, as we expect
- // failures from this task due to closing the connection while a read is in progress.
- ValueTask<int>? readAheadTask = ConsumeReadAheadTask();
- if (readAheadTask != null)
- {
- IgnoreExceptions(readAheadTask.GetValueOrDefault());
- }
}
}
}
- /// <summary>Prepare an idle connection to be used for a new request.</summary>
+ /// <summary>Prepare an idle connection to be used for a new request.
+ /// The caller MUST call SendAsync afterwards if this method returns true.</summary>
/// <param name="async">Indicates whether the coming request will be sync or async.</param>
/// <returns>True if connection can be used, false if it is invalid due to a timeout or receiving EOF or unexpected data.</returns>
public bool PrepareForReuse(bool async)
// We may already have a read-ahead task if we did a previous scavenge and haven't used the connection since.
// If the read-ahead task is completed, then we've received either EOF or erroneous data the connection, so it's not usable.
- if (_readAheadTask is not null)
+ if (ReadAheadTaskHasStarted)
{
- return !_readAheadTask.Value.IsCompleted;
+ return TryOwnReadAheadTaskCompletion();
}
// Check to see if we've received anything on the connection; if we have, that's
}
else
{
+ Debug.Assert(_readAheadTaskStatus == ReadAheadTask_NotStarted);
+ _readAheadTaskStatus = ReadAheadTask_CompletionReserved;
+
// Perform an async read on the stream, since we're going to need to read from it
// anyway, and in doing so we can avoid the extra syscall.
try
#pragma warning disable CA2012 // we're very careful to ensure the ValueTask is only consumed once, even though it's stored into a field
_readAheadTask = _stream.ReadAsync(_readBuffer.AvailableMemory);
#pragma warning restore CA2012
- return !_readAheadTask.Value.IsCompleted;
+
+ return !_readAheadTask.IsCompleted;
}
catch (Exception error)
{
}
// We may already have a read-ahead task if we did a previous scavenge and haven't used the connection since.
-#pragma warning disable CA2012 // we're very careful to ensure the ValueTask is only consumed once, even though it's stored into a field
- _readAheadTask ??= ReadAheadWithZeroByteReadAsync();
-#pragma warning restore CA2012
+ EnsureReadAheadTaskHasStarted();
// If the read-ahead task is completed, then we've received either EOF or erroneous data the connection, so it's not usable.
- return !_readAheadTask.Value.IsCompleted;
+ return !_readAheadTask.IsCompleted;
+ }
+
+ private bool ReadAheadTaskHasStarted =>
+ _readAheadTaskStatus != ReadAheadTask_NotStarted;
+
+ private bool TryOwnReadAheadTaskCompletion() =>
+ Interlocked.CompareExchange(ref _readAheadTaskStatus, ReadAheadTask_CompletionReserved, ReadAheadTask_Started) == ReadAheadTask_Started;
+
+ private void EnsureReadAheadTaskHasStarted()
+ {
+ if (_readAheadTaskStatus == ReadAheadTask_NotStarted)
+ {
+ Debug.Assert(_readAheadTask == default);
+
+ _readAheadTaskStatus = ReadAheadTask_Started;
+
+#pragma warning disable CA2012 // we're very careful to ensure the ValueTask is only consumed once, even though it's stored into a field
+ _readAheadTask = ReadAheadWithZeroByteReadAsync();
+#pragma warning restore CA2012
+ }
async ValueTask<int> ReadAheadWithZeroByteReadAsync()
{
- Debug.Assert(_readAheadTask is null);
+ Debug.Assert(_readAheadTask == default);
Debug.Assert(_readBuffer.ActiveLength == 0);
- // Issue a zero-byte read.
- // If the underlying stream supports it, this will not complete until the stream has data available,
- // which will avoid pinning the connection's read buffer (and possibly allow us to release it to the buffer pool in the future, if desired).
- // If not, it will complete immediately.
- await _stream.ReadAsync(Memory<byte>.Empty).ConfigureAwait(false);
+ try
+ {
+ // Issue a zero-byte read.
+ // If the underlying stream supports it, this will not complete until the stream has data available,
+ // which will avoid pinning the connection's read buffer (and possibly allow us to release it to the buffer pool in the future, if desired).
+ // If not, it will complete immediately.
+ await _stream.ReadAsync(Memory<byte>.Empty).ConfigureAwait(false);
- // We don't know for sure that the stream actually has data available, so we need to issue a real read now.
- return await _stream.ReadAsync(_readBuffer.AvailableMemory).ConfigureAwait(false);
+ // We don't know for sure that the stream actually has data available, so we need to issue a real read now.
+ int read = await _stream.ReadAsync(_readBuffer.AvailableMemory).ConfigureAwait(false);
+
+ // PrepareForReuse will check TryOwnReadAheadTaskCompletion before calling into SendAsync.
+ // If we can own the completion from within the read-ahead task, it means that PrepareForReuse hasn't been called yet.
+ // In that case we've received EOF/erroneous data before we sent the request headers, and the connection can't be reused.
+ if (TryOwnReadAheadTaskCompletion())
+ {
+ if (NetEventSource.Log.IsEnabled()) Trace("Read-ahead task observed data before the request was sent.");
+ }
+
+ return read;
+ }
+ catch (Exception error) when (TryOwnReadAheadTaskCompletion())
+ {
+ if (NetEventSource.Log.IsEnabled()) Trace($"Error performing read ahead: {error}");
+
+ return 0;
+ }
}
}
GetIdleTicks(Environment.TickCount64) >= _keepAliveTimeoutSeconds * 1000;
}
- private ValueTask<int>? ConsumeReadAheadTask()
- {
- if (Interlocked.CompareExchange(ref _readAheadTaskLock, 1, 0) == 0)
- {
- ValueTask<int>? t = _readAheadTask;
- _readAheadTask = null;
- Volatile.Write(ref _readAheadTaskLock, 0);
- return t;
- }
-
- // We couldn't get the lock, which means it must already be held
- // by someone else who will consume the task.
- return null;
- }
-
public override long GetIdleTicks(long nowTicks) => nowTicks - _idleSinceTickCount;
public TransportContext? TransportContext => _transportContext;
throw new HttpRequestException(SR.net_http_request_invalid_char_encoding);
}
- public async Task<HttpResponseMessage> SendAsyncCore(HttpRequestMessage request, bool async, CancellationToken cancellationToken)
+ public async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, bool async, CancellationToken cancellationToken)
{
Debug.Assert(_currentRequest == null, $"Expected null {nameof(_currentRequest)}.");
Debug.Assert(_readBuffer.ActiveLength == 0, "Unexpected data in read buffer");
+ Debug.Assert(_readAheadTaskStatus != ReadAheadTask_Started);
TaskCompletionSource<bool>? allowExpect100ToContinue = null;
Task? sendRequestContentTask = null;
// When the connection was taken out of the pool, a pre-emptive read was performed
// into the read buffer. We need to consume that read prior to issuing another read.
- ValueTask<int>? t = ConsumeReadAheadTask();
- if (t != null)
+ if (ReadAheadTaskHasStarted)
{
+ // If the read-ahead task completed synchronously, it would have claimed ownership of its completion,
+ // meaning that PrepareForReuse would have failed, and we wouldn't have called SendAsync.
+ // The task therefore shouldn't be 'default', as it's representing an async operation that had to yield at some point.
+ Debug.Assert(_readAheadTask != default);
+ Debug.Assert(_readAheadTaskStatus == ReadAheadTask_CompletionReserved);
+
// Handle the pre-emptive read. For the async==false case, hopefully the read has
// already completed and this will be a nop, but if it hasn't, the caller will be forced to block
// waiting for the async operation to complete. We will only hit this case for proxied HTTPS
// requests that use a pooled connection, as in that case we don't have a Socket we
// can poll and are forced to issue an async read.
- ValueTask<int> vt = t.GetValueOrDefault();
+ ValueTask<int> vt = _readAheadTask;
+ _readAheadTask = default;
+
int bytesRead;
if (vt.IsCompleted)
{
_readBuffer.Commit(bytesRead);
if (NetEventSource.Log.IsEnabled()) Trace($"Received {bytesRead} bytes.");
+
+ _readAheadTaskStatus = ReadAheadTask_NotStarted;
}
else
{
// Make sure to complete the allowExpect100ToContinue task if it exists.
allowExpect100ToContinue?.TrySetResult(false);
+ if (_readAheadTask != default)
+ {
+ Debug.Assert(_readAheadTaskStatus == ReadAheadTask_CompletionReserved);
+
+ LogExceptions(_readAheadTask.AsTask());
+ }
+
if (NetEventSource.Log.IsEnabled()) Trace($"Error sending request: {error}");
// In the rare case where Expect: 100-continue was used and then processing
}
}
- public Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, bool async, CancellationToken cancellationToken) =>
- SendAsyncCore(request, async, cancellationToken);
-
private bool MapSendException(Exception exception, CancellationToken cancellationToken, out Exception mappedException)
{
if (CancellationHelper.ShouldWrapInOperationCanceledException(exception, cancellationToken))
// Does not throw on EOF. Also assumes there is no buffered data.
private async ValueTask InitialFillAsync(bool async)
{
- Debug.Assert(_readAheadTask == null);
+ Debug.Assert(!ReadAheadTaskHasStarted);
Debug.Assert(_readBuffer.AvailableLength == _readBuffer.Capacity);
Debug.Assert(_readBuffer.AvailableLength >= InitialReadBufferSize);
// Throws IOException on EOF. This is only called when we expect more data.
private async ValueTask FillAsync(bool async)
{
- Debug.Assert(_readAheadTask == null);
+ Debug.Assert(_readAheadTask == default);
_readBuffer.EnsureAvailableSpace(1);
// No data in read buffer.
// Do an unbuffered read directly against the underlying stream.
- Debug.Assert(_readAheadTask == null, "Read ahead task should have been consumed as part of the headers.");
+ Debug.Assert(_readAheadTask == default, "Read ahead task should have been consumed as part of the headers.");
int count = _stream.Read(destination);
if (NetEventSource.Log.IsEnabled()) Trace($"Received {count} bytes.");
return count;
// No data in read buffer.
// Do an unbuffered read directly against the underlying stream.
- Debug.Assert(_readAheadTask == null, "Read ahead task should have been consumed as part of the headers.");
+ Debug.Assert(_readAheadTask == default, "Read ahead task should have been consumed as part of the headers.");
int count = await _stream.ReadAsync(destination).ConfigureAwait(false);
if (NetEventSource.Log.IsEnabled()) Trace($"Received {count} bytes.");
return count;
if (_readBuffer.ActiveLength == 0)
{
// Do a buffered read directly against the underlying stream.
- Debug.Assert(_readAheadTask == null, "Read ahead task should have been consumed as part of the headers.");
+ Debug.Assert(_readAheadTask == default, "Read ahead task should have been consumed as part of the headers.");
if (destination.Length == 0)
{
if (_readBuffer.ActiveLength == 0)
{
// Do a buffered read directly against the underlying stream.
- Debug.Assert(_readAheadTask == null, "Read ahead task should have been consumed as part of the headers.");
+ Debug.Assert(_readAheadTask == default, "Read ahead task should have been consumed as part of the headers.");
Debug.Assert(_readBuffer.AvailableLength == _readBuffer.Capacity);
int bytesRead = await _stream.ReadAsync(_readBuffer.AvailableMemory).ConfigureAwait(false);
private void ReturnConnectionToPool()
{
Debug.Assert(_currentRequest == null, "Connection should no longer be associated with a request.");
- Debug.Assert(_readAheadTask == null, "Expected a previous initial read to already be consumed.");
+ Debug.Assert(_readAheadTask == default, "Expected a previous initial read to already be consumed.");
+ Debug.Assert(_readAheadTaskStatus == ReadAheadTask_NotStarted, "Expected SendAsync to reset the read-ahead task status.");
Debug.Assert(_readBuffer.ActiveLength == 0, "Unexpected data in connection read buffer.");
// If we decided not to reuse the connection (either because the server sent Connection: close,
{
public SocketsHttpHandler_HttpClientHandler_Asynchrony_Test(ITestOutputHelper output) : base(output) { }
+ [OuterLoop("Relies on finalization")]
+ [Fact]
+ public async Task ReadAheadTaskOnScavenge_ExceptionsAreObserved()
+ {
+ bool seenUnobservedExceptions = false;
+
+ EventHandler<UnobservedTaskExceptionEventArgs> eventHandler = (_, e) =>
+ {
+ if (e.Exception.InnerException?.Message == nameof(ReadAheadTaskOnScavenge_ExceptionsAreObserved))
+ {
+ seenUnobservedExceptions = true;
+ }
+ };
+
+ TaskScheduler.UnobservedTaskException += eventHandler;
+ try
+ {
+ for (int i = 0; i < 3; i++)
+ {
+ await MakeARequestWithoutDisposingTheHandlerAsync();
+ GC.Collect();
+ GC.WaitForPendingFinalizers();
+ await Task.Delay(1000);
+ }
+ }
+ finally
+ {
+ TaskScheduler.UnobservedTaskException -= eventHandler;
+ }
+
+ Assert.False(seenUnobservedExceptions);
+
+ static async Task MakeARequestWithoutDisposingTheHandlerAsync()
+ {
+ var cts = new CancellationTokenSource();
+ var requestCompleted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ var handler = new SocketsHttpHandler();
+ handler.ConnectCallback = async (_, _) =>
+ {
+ cts.Cancel();
+ await requestCompleted.Task;
+
+ Task completedWhenFinalized = new SetOnFinalized().CompletedWhenFinalized.Task;
+
+ return new DelegateDelegatingStream(Stream.Null)
+ {
+ ReadAsyncMemoryFunc = async (_, _) =>
+ {
+ await completedWhenFinalized.WaitAsync(TestHelper.PassingTestTimeout);
+
+ throw new Exception(nameof(ReadAheadTaskOnScavenge_ExceptionsAreObserved));
+ }
+ };
+ };
+
+ handler.PooledConnectionIdleTimeout = TimeSpan.FromSeconds(1);
+
+ var client = new HttpClient(handler);
+
+ await Assert.ThrowsAsync<TaskCanceledException>(() => client.GetStringAsync("http://foo", cts.Token));
+
+ requestCompleted.SetResult();
+ }
+ }
+
[Fact]
public async Task ExecutionContext_Suppressed_Success()
{
[MethodImpl(MethodImplOptions.NoInlining)] // avoid JIT extending lifetime of the finalizable object
private static (Task completedOnFinalized, Task getRequest) MakeHttpRequestWithTcsSetOnFinalizationInAsyncLocal(HttpClient client, Uri uri)
{
- var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
-
// Put something in ExecutionContext, start the HTTP request, then undo the EC change.
- var al = new AsyncLocal<object>() { Value = new SetOnFinalized() { _completedWhenFinalized = tcs } };
+ var al = new AsyncLocal<SetOnFinalized>() { Value = new SetOnFinalized() };
+ TaskCompletionSource tcs = al.Value.CompletedWhenFinalized;
Task t = client.GetStringAsync(uri);
al.Value = null;
private sealed class SetOnFinalized
{
- internal TaskCompletionSource _completedWhenFinalized;
- ~SetOnFinalized() => _completedWhenFinalized.SetResult();
+ public readonly TaskCompletionSource CompletedWhenFinalized = new(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ ~SetOnFinalized() => CompletedWhenFinalized.SetResult();
}
}