{
if (initialFrame && NetEventSource.IsEnabled)
{
- string response = System.Text.Encoding.ASCII.GetString(_incomingBuffer.ActiveSpan.Slice(0, Math.Min(20, _incomingBuffer.ActiveSpan.Length)));
+ string response = Encoding.ASCII.GetString(_incomingBuffer.ActiveSpan.Slice(0, Math.Min(20, _incomingBuffer.ActiveSpan.Length)));
Trace($"HTTP/2 handshake failed. Server returned {response}");
}
}
catch (Exception e)
{
- if (NetEventSource.IsEnabled) Trace($"ProcessIncomingFramesAsync: {e.Message}");
+ if (NetEventSource.IsEnabled) Trace($"{nameof(ProcessIncomingFramesAsync)}: {e.Message}");
if (!_disposed)
{
}
else
{
- // Send request body, if any
- Task bodyTask = http2Stream.SendRequestBodyAsync(cancellationToken);
- // read response headers.
+ // Send request body, if any, and read response headers.
+ Task requestBodyTask = http2Stream.SendRequestBodyAsync(cancellationToken);
Task responseHeadersTask = http2Stream.ReadResponseHeadersAsync(cancellationToken);
- if (bodyTask == await Task.WhenAny(bodyTask, responseHeadersTask).ConfigureAwait(false) ||
- bodyTask.IsCompleted)
+ // Wait for either task to complete. The best and most common case is when the request body completes
+ // before the response headers, in which case we can fully process the sending of the request and then
+ // fully process the sending of the response. WhenAny is not free, so we do a fast-path check to see
+ // if the request body completed synchronously, only progressing to do the WhenAny if it didn't. Then
+ // if the WhenAny completes and either the WhenAny indicated that the request body completed or
+ // both tasks completed, we can proceed to handle the request body as if it completed first.
+ if (requestBodyTask.IsCompleted ||
+ requestBodyTask == await Task.WhenAny(requestBodyTask, responseHeadersTask).ConfigureAwait(false) ||
+ requestBodyTask.IsCompleted)
{
// The sending of the request body completed before receiving all of the request headers.
- Task t = bodyTask;
- bodyTask = null;
+ // This is the common and desirable case.
try
{
- await t.ConfigureAwait(false);
+ await requestBodyTask.ConfigureAwait(false);
}
catch (Exception e)
{
- if (NetEventSource.IsEnabled) Trace($"SendRequestBody Task failed. {e}");
- // Observe exception (if any) on responseHeadersTask.
- LogExceptions(responseHeadersTask);
+ if (NetEventSource.IsEnabled) Trace($"{nameof(http2Stream.SendRequestBodyAsync)} failed. {e}");
+ LogExceptions(responseHeadersTask); // Observe exception (if any) on responseHeadersTask.
throw;
}
-
- await responseHeadersTask.ConfigureAwait(false);
}
else
{
- // We received the response headers but the request body hasn't yet finished.
- // If the connection is aborted or if we get RST or GOAWAY from server, exception will be
- // stored in stream._abortException and propagated to up to caller if possible while processing response.
- LogExceptions(bodyTask);
- bodyTask = null;
- // Pick up any exceptions from the header Task.
- await responseHeadersTask.ConfigureAwait(false);
+ // We received the response headers but the request body hasn't yet finished; this most commonly happens
+ // when the protocol is being used to enable duplex communication. If the connection is aborted or if we
+ // get RST or GOAWAY from server, exception will be stored in stream._abortException and propagated up
+ // to caller if possible while processing response, but make sure that we log any exceptions from this task
+ // completing asynchronously).
+ LogExceptions(requestBodyTask);
}
+
+ // Wait for the response headers to complete if they haven't already, propagating any exceptions.
+ await responseHeadersTask.ConfigureAwait(false);
}
}
catch (Exception e)
http2Stream.Cancel();
}
- if (oce.CancellationToken != cancellationToken)
+ if (cancellationToken.IsCancellationRequested && oce.CancellationToken != cancellationToken)
{
replacementException = new OperationCanceledException(oce.Message, oce, cancellationToken);
}
{
private sealed class Http2Stream : IValueTaskSource, IDisposable
{
- private enum StreamState : byte
- {
- ExpectingStatus,
- ExpectingIgnoredHeaders,
- ExpectingHeaders,
- ExpectingData,
- ExpectingTrailingHeaders,
- Complete,
- Aborted
- }
-
private const int InitialStreamBufferSize =
#if DEBUG
10;
private bool _disposed;
private Exception _abortException;
- /// <summary>The core logic for the IValueTaskSource implementation.</summary>
+ /// <summary>
+ /// The core logic for the IValueTaskSource implementation.
+ ///
+ /// Thread-safety:
+ /// _waitSource is used to coordinate between a producer indicating that something is available to process (either the connection's event loop
+ /// or a cancellation request) and a consumer doing that processing. There must only ever be a single consumer, namely this stream reading
+ /// data associated with the response. Because there is only ever at most one consumer, producers can trust that if _hasWaiter is true,
+ /// until the _waitSource is then set, no consumer will attempt to reset the _waitSource. A producer must still take SyncObj in order to
+ /// coordinate with other producers (e.g. a race between data arriving from the event loop and cancellation being requested), but while holding
+ /// the lock it can check whether _hasWaiter is true, and if it is, set _hasWaiter to false, exit the lock, and then set the _waitSource. Another
+ /// producer coming along will then see _hasWaiter as false and will not attempt to concurrently set _waitSource (which would violate _waitSource's
+ /// thread-safety), and no other consumer could come along in the interim, because _hasWaiter being true means that a consumer is already waiting
+ /// for _waitSource to be set, and legally there can only be one consumer. Once this producer sets _waitSource, the consumer could quickly loop
+ /// around to wait again, but invariants have all been maintained in the interim, and the consumer would need to take the SyncObj lock in order to
+ /// Reset _waitSource.
+ /// </summary>
private ManualResetValueTaskSourceCore<bool> _waitSource = new ManualResetValueTaskSourceCore<bool> { RunContinuationsAsynchronously = true }; // mutable struct, do not make this readonly
/// <summary>
/// Whether code has requested or is about to request a wait be performed and thus requires a call to SetResult to complete it.
- /// This is read and written while holding the lock so that most operations on _waitSourceCore don't need to be.
+ /// This is read and written while holding the lock so that most operations on _waitSource don't need to be.
/// </summary>
private bool _hasWaiter;
{
using (Http2WriteStream writeStream = new Http2WriteStream(this))
{
- // TODO: until #9071 is fixed, cancellation on content.CopyToAsync does not work.
- // To work around it, register delegate and set _abortException as needed.
- using (cancellationToken.UnsafeRegister(stream => { if (((Http2Stream)stream)._abortException == null) ((Http2Stream)stream)._abortException = new OperationCanceledException(); }, this))
+ // TODO: until #9071 is fixed, cancellation on content.CopyToAsync does not apply for most content types,
+ // because most content types aren't passed the token given to this internal overload of CopyToAsync.
+ // To work around it, we register to set _abortException as needed; this won't preempt reads issued to
+ // the source content, but it will at least enable the writes then performed on our write stream to see
+ // that cancellation was requested and abort, rather than waiting for the whole copy to complete.
+ using (cancellationToken.UnsafeRegister(stream =>
+ {
+ var thisRef = (Http2Stream)stream;
+ if (thisRef._abortException == null)
+ {
+ Interlocked.CompareExchange(ref thisRef._abortException, new OperationCanceledException(), null);
+ }
+ }, this))
{
await _request.Content.CopyToAsync(writeStream, null, cancellationToken).ConfigureAwait(false);
}
if (_abortException == null)
{
- // If we are still the response after receiving response headers, this will give us a chance to propagate exception up.
- // Since we failed while Copying stream, wrap it as IOException if needed.
- _abortException = e;
+ // If we are still processing the response after receiving response headers,
+ // this will give us a chance to propagate exception up.
+ Interlocked.CompareExchange(ref _abortException, e, null);
}
throw;
Task response = ReadResponseHeadersAsync(cancellationToken);
using (var expect100Timer = new Timer(
- s => ((TaskCompletionSource<bool>)s).TrySetResult(true),
- allowExpect100ToContinue, _connection._pool.Settings._expect100ContinueTimeout, Timeout.InfiniteTimeSpan))
+ s => ((TaskCompletionSource<bool>)s).TrySetResult(true),
+ allowExpect100ToContinue, _connection._pool.Settings._expect100ContinueTimeout, Timeout.InfiniteTimeSpan))
{
// By now, either we got response from server or timer expired.
sendRequestContent = await allowExpect100ToContinue.Task.ConfigureAwait(false);
}
else
{
- if (NetEventSource.IsEnabled) _connection.Trace("Invalid response pseudo-header '{System.Text.Encoding.ASCII.GetString(name)}'.");
+ if (NetEventSource.IsEnabled) _connection.Trace($"Invalid response pseudo-header '{Encoding.ASCII.GetString(name)}'.");
throw new Http2ProtocolException(SR.net_http_invalid_response);
}
}
if (_state != StreamState.ExpectingHeaders && _state != StreamState.ExpectingTrailingHeaders)
{
- if (NetEventSource.IsEnabled) _connection.Trace($"Received header before status.");
+ if (NetEventSource.IsEnabled) _connection.Trace("Received header before status.");
throw new Http2ProtocolException(SR.net_http_invalid_response);
}
return;
}
- _abortException = abortException;
+ Interlocked.CompareExchange(ref _abortException, abortException, null);
_state = StreamState.Aborted;
signalWaiter = _hasWaiter;
(wait, emptyResponse) = TryEnsureHeaders();
if (wait)
{
+ Debug.Assert(_hasWaiter, $"{nameof(TryEnsureHeaders)} should have set _hasWaiter to true.");
await GetWaiterTask(cancellationToken).ConfigureAwait(false);
(wait, emptyResponse) = TryEnsureHeaders();
{
// Synchronously block waiting for data to be produced.
Debug.Assert(bytesRead == 0);
- GetWaiterTask(cancellationToken).GetAwaiter().GetResult();
+ Debug.Assert(_hasWaiter, $"{nameof(TryReadFromBuffer)} should have set _hasWaiter to true.");
+ GetWaiterTask(cancellationToken).AsTask().GetAwaiter().GetResult();
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
(wait, bytesRead) = TryReadFromBuffer(buffer);
Debug.Assert(!wait);
if (wait)
{
Debug.Assert(bytesRead == 0);
+ Debug.Assert(_hasWaiter, $"{nameof(TryReadFromBuffer)} should have set _hasWaiter to true.");
await GetWaiterTask(cancellationToken).ConfigureAwait(false);
(wait, bytesRead) = TryReadFromBuffer(buffer.Span);
Debug.Assert(!wait);
lock (SyncObject)
{
IgnoreExceptions(_connection.SendRstStreamAsync(_streamId, Http2ProtocolErrorCode.Cancel));
- _abortException = new OperationCanceledException();
+ Interlocked.CompareExchange(ref _abortException, new OperationCanceledException(), null);
_state = StreamState.Aborted;
signalWaiter = _hasWaiter;
ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => _waitSource.GetStatus(token);
void IValueTaskSource.OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) => _waitSource.OnCompleted(continuation, state, token, flags);
void IValueTaskSource.GetResult(short token) => _waitSource.GetResult(token);
- private async Task GetWaiterTask(CancellationToken cancellationToken)
+ private ValueTask GetWaiterTask(CancellationToken cancellationToken)
{
- var vt = new ValueTask(this, _waitSource.Version).AsTask();
- using (cancellationToken.Register(s =>
- {
- Http2Stream stream = (Http2Stream)s;
- bool signalWaiter;
- lock (stream.SyncObject)
+ // No locking is required here to access _waitSource. To be here, we've already updated _hasWaiter (while holding the lock)
+ // to indicate that we would be creating this waiter, and at that point the only code that could be await'ing _waitSource or
+ // Reset'ing it is this code here. It's possible for this to race with the _waitSource being completed, but that's ok and is
+ // handled by _waitSource as one of its primary purposes.
+ Debug.Assert(_hasWaiter, $"This should only be called after we've transitioned _hasWaiter to true to enable this {nameof(GetWaiterTask)} call.");
+
+ // With HttpClient, the supplied cancellation token will always be cancelable, as HttpClient supplies a token that
+ // will have cancellation requested if CancelPendingRequests is called (or when a non-infinite Timeout expires).
+ // However, this could still be non-cancelable if HttpMessageInvoker was used, at which point this will only be
+ // cancelable if the caller's token was cancelable. To avoid the extra allocation here in such a case, we make
+ // this pay-for-play: if the token isn't cancelable, return a ValueTask wrapping this object directly, and only
+ // if it is cancelable, then register for the cancellation callback, allocate a task for the asynchronously
+ // completing case, etc.
+ return cancellationToken.CanBeCanceled ?
+ new ValueTask(GetWaiterTaskCore()) :
+ new ValueTask(this, _waitSource.Version);
+
+ async Task GetWaiterTaskCore()
+ {
+ using (cancellationToken.UnsafeRegister(s =>
+ {
+ var thisRef = (Http2Stream)s;
+
+ bool signalWaiter;
+ lock (thisRef.SyncObject)
+ {
+ signalWaiter = thisRef._hasWaiter;
+ thisRef._hasWaiter = false;
+ }
+
+ if (signalWaiter)
+ {
+ // Wake up the wait. It will then immediately check whether cancellation was requested and throw if it was.
+ thisRef._waitSource.SetResult(true);
+ }
+ }, this))
{
- signalWaiter = stream._hasWaiter;
- stream._hasWaiter = false;
+ await new ValueTask(this, _waitSource.Version).ConfigureAwait(false);
}
- if (signalWaiter) stream._waitSource.SetException(new OperationCanceledException());
- }, this))
- {
- await vt.ConfigureAwait(false);
+ CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
}
+ }
- CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
+ private enum StreamState : byte
+ {
+ ExpectingStatus,
+ ExpectingIgnoredHeaders,
+ ExpectingHeaders,
+ ExpectingData,
+ ExpectingTrailingHeaders,
+ Complete,
+ Aborted
}
private sealed class Http2ReadStream : HttpBaseStream
Http2Stream http2Stream = _http2Stream ?? throw new ObjectDisposedException(nameof(Http2ReadStream));
if (http2Stream._abortException != null)
{
- ExceptionDispatchInfo.Throw(new IOException(SR.net_http_client_execution_error, http2Stream._abortException));
+ throw new IOException(SR.net_http_client_execution_error, http2Stream._abortException);
}
return http2Stream.ReadData(destination, CancellationToken.None);