await stream.SendResponseAsync(statusCode, headers, content).ConfigureAwait(false);
- // closing the connection here causes bytes written to streams to go missing.
- // Regardless, we told the client we are closing so it shouldn't matter -- they should not use this connection anymore.
- //await CloseAsync(H3_NO_ERROR).ConfigureAwait(false);
+ await WaitForClientDisconnectAsync();
return request;
}
+ // Wait for the client to close the connection, e.g. after we send a GOAWAY, or after the HttpClient is disposed.
+ public async Task WaitForClientDisconnectAsync()
+ {
+ while (true)
+ {
+ Http3LoopbackStream stream;
+
+ try
+ {
+ stream = await AcceptRequestStreamAsync().ConfigureAwait(false);
+ }
+ catch (QuicConnectionAbortedException abortException) when (abortException.ErrorCode == H3_NO_ERROR)
+ {
+ break;
+ }
+
+ using (stream)
+ {
+ await stream.AbortAndWaitForShutdownAsync(H3_REQUEST_REJECTED);
+ }
+ }
+
+ await CloseAsync(H3_NO_ERROR);
+ }
+
public override async Task WaitForCancellationAsync(bool ignoreIncomingData = true, int requestId = 0)
{
await GetOpenRequest(requestId).WaitForCancellationAsync(ignoreIncomingData).ConfigureAwait(false);
internal MockStream OpenStream(long streamId, bool bidirectional)
{
+ CheckDisposed();
+
ConnectionState? state = _state;
if (state is null)
{
catch (ChannelClosedException)
{
long errorCode = _isClient ? state._serverErrorCode : state._clientErrorCode;
- throw new QuicConnectionAbortedException(errorCode);
+ throw (errorCode == -1) ? new QuicOperationAbortedException() : new QuicConnectionAbortedException(errorCode);
}
}
internal override ValueTask CloseAsync(long errorCode, CancellationToken cancellationToken = default)
{
+ // TODO: We should abort local streams (and signal the peer to do likewise)
+ // Currently, we are not tracking the streams associated with this connection.
+
ConnectionState? state = _state;
if (state is not null)
{
if (_isClient)
{
state._clientErrorCode = errorCode;
+ DrainAcceptQueue(-1, errorCode);
}
else
{
state._serverErrorCode = errorCode;
+ DrainAcceptQueue(errorCode, -1);
}
}
}
}
+ private void DrainAcceptQueue(long outboundErrorCode, long inboundErrorCode)
+ {
+ ConnectionState? state = _state;
+ if (state is not null)
+ {
+ // TODO: We really only need to do the complete and drain once, but it doesn't really hurt to do it twice.
+ state._clientInitiatedStreamChannel.Writer.TryComplete();
+ while (state._clientInitiatedStreamChannel.Reader.TryRead(out MockStream.StreamState? streamState))
+ {
+ streamState._outboundReadErrorCode = streamState._outboundWriteErrorCode = outboundErrorCode;
+ streamState._inboundStreamBuffer?.AbortRead();
+ streamState._outboundStreamBuffer?.EndWrite();
+ }
+
+ state._serverInitiatedStreamChannel.Writer.TryComplete();
+ while (state._serverInitiatedStreamChannel.Reader.TryRead(out MockStream.StreamState? streamState))
+ {
+ streamState._inboundReadErrorCode = streamState._inboundWriteErrorCode = inboundErrorCode;
+ streamState._outboundStreamBuffer?.AbortRead();
+ streamState._inboundStreamBuffer?.EndWrite();
+ }
+ }
+ }
+
private void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
- ConnectionState? state = _state;
- if (state is not null)
- {
- Channel<MockStream.StreamState> streamChannel = _isClient ? state._clientInitiatedStreamChannel : state._serverInitiatedStreamChannel;
- streamChannel.Writer.Complete();
- }
-
+ DrainAcceptQueue(-1, -1);
PeerStreamLimit? streamLimit = LocalStreamLimit;
if (streamLimit is not null)
_applicationProtocol = applicationProtocol;
_clientInitiatedStreamChannel = Channel.CreateUnbounded<MockStream.StreamState>();
_serverInitiatedStreamChannel = Channel.CreateUnbounded<MockStream.StreamState>();
+ _clientErrorCode = _serverErrorCode = -1;
}
}
}
Assert.Equal(ApplicationProtocol.ToString(), serverConnection.NegotiatedApplicationProtocol.ToString());
}
+ private static async Task<QuicStream> OpenAndUseStreamAsync(QuicConnection c)
+ {
+ QuicStream s = c.OpenBidirectionalStream();
+
+ // This will pend
+ await s.ReadAsync(new byte[1]);
+
+ return s;
+ }
+
[Fact]
[ActiveIssue("https://github.com/dotnet/runtime/issues/55242", TestPlatforms.Linux)]
- public async Task AcceptStream_ConnectionAborted_ByClient_Throws()
+ public async Task CloseAsync_WithPendingAcceptAndConnect_PendingAndSubsequentThrowOperationAbortedException()
{
using var sync = new SemaphoreSlim(0);
await RunClientServer(
async clientConnection =>
{
- await clientConnection.CloseAsync(ExpectedErrorCode);
+ await sync.WaitAsync();
+ },
+ async serverConnection =>
+ {
+ // Pend operations before the client closes.
+ Task<QuicStream> acceptTask = serverConnection.AcceptStreamAsync().AsTask();
+ Assert.False(acceptTask.IsCompleted);
+ Task<QuicStream> connectTask = OpenAndUseStreamAsync(serverConnection);
+ Assert.False(connectTask.IsCompleted);
+
+ await serverConnection.CloseAsync(ExpectedErrorCode);
+
sync.Release();
+
+ // Pending ops should fail
+ await Assert.ThrowsAsync<QuicOperationAbortedException>(() => acceptTask);
+ await Assert.ThrowsAsync<QuicOperationAbortedException>(() => connectTask);
+
+ // Subsequent attempts should fail
+ // TODO: Which exception is correct?
+ if (IsMockProvider)
+ {
+ await Assert.ThrowsAsync<ObjectDisposedException>(async () => await serverConnection.AcceptStreamAsync());
+ await Assert.ThrowsAsync<ObjectDisposedException>(async () => await OpenAndUseStreamAsync(serverConnection));
+ }
+ else
+ {
+ await Assert.ThrowsAsync<QuicOperationAbortedException>(async () => await serverConnection.AcceptStreamAsync());
+
+ // TODO: ActiveIssue https://github.com/dotnet/runtime/issues/56133
+ // MsQuic fails with System.Net.Quic.QuicException: Failed to open stream to peer. Error Code: INVALID_STATE
+ //await Assert.ThrowsAsync<QuicOperationAbortedException>(async () => await OpenAndUseStreamAsync(serverConnection));
+ await Assert.ThrowsAsync<QuicException>(() => OpenAndUseStreamAsync(serverConnection));
+ }
+ });
+ }
+
+ [Fact]
+ [ActiveIssue("https://github.com/dotnet/runtime/issues/55242", TestPlatforms.Linux)]
+ public async Task Dispose_WithPendingAcceptAndConnect_PendingAndSubsequentThrowOperationAbortedException()
+ {
+ using var sync = new SemaphoreSlim(0);
+
+ await RunClientServer(
+ async clientConnection =>
+ {
+ await sync.WaitAsync();
},
async serverConnection =>
{
+ // Pend operations before the client closes.
+ Task<QuicStream> acceptTask = serverConnection.AcceptStreamAsync().AsTask();
+ Assert.False(acceptTask.IsCompleted);
+ Task<QuicStream> connectTask = OpenAndUseStreamAsync(serverConnection);
+ Assert.False(connectTask.IsCompleted);
+
+ serverConnection.Dispose();
+
+ sync.Release();
+
+ // Pending ops should fail
+ await Assert.ThrowsAsync<QuicOperationAbortedException>(() => acceptTask);
+ await Assert.ThrowsAsync<QuicOperationAbortedException>(() => connectTask);
+
+ // Subsequent attempts should fail
+ // TODO: Should these be QuicOperationAbortedException, to match above? Or vice-versa?
+ await Assert.ThrowsAsync<ObjectDisposedException>(async () => await serverConnection.AcceptStreamAsync());
+ await Assert.ThrowsAsync<ObjectDisposedException>(async () => await OpenAndUseStreamAsync(serverConnection));
+ });
+ }
+
+ [Fact]
+ [ActiveIssue("https://github.com/dotnet/runtime/issues/55242", TestPlatforms.Linux)]
+ public async Task ConnectionClosedByPeer_WithPendingAcceptAndConnect_PendingAndSubsequentThrowConnectionAbortedException()
+ {
+ if (IsMockProvider)
+ {
+ return;
+ }
+
+ using var sync = new SemaphoreSlim(0);
+
+ await RunClientServer(
+ async clientConnection =>
+ {
await sync.WaitAsync();
- QuicConnectionAbortedException ex = await Assert.ThrowsAsync<QuicConnectionAbortedException>(() => serverConnection.AcceptStreamAsync().AsTask());
+
+ await clientConnection.CloseAsync(ExpectedErrorCode);
+ },
+ async serverConnection =>
+ {
+ // Pend operations before the client closes.
+ Task<QuicStream> acceptTask = serverConnection.AcceptStreamAsync().AsTask();
+ Assert.False(acceptTask.IsCompleted);
+ Task<QuicStream> connectTask = OpenAndUseStreamAsync(serverConnection);
+ Assert.False(connectTask.IsCompleted);
+
+ sync.Release();
+
+ // Pending ops should fail
+ QuicConnectionAbortedException ex;
+
+ ex = await Assert.ThrowsAsync<QuicConnectionAbortedException>(() => acceptTask);
+ Assert.Equal(ExpectedErrorCode, ex.ErrorCode);
+ ex = await Assert.ThrowsAsync<QuicConnectionAbortedException>(() => connectTask);
+ Assert.Equal(ExpectedErrorCode, ex.ErrorCode);
+
+ // Subsequent attempts should fail
+ ex = await Assert.ThrowsAsync<QuicConnectionAbortedException>(() => serverConnection.AcceptStreamAsync().AsTask());
Assert.Equal(ExpectedErrorCode, ex.ErrorCode);
+ // TODO: ActiveIssue https://github.com/dotnet/runtime/issues/56133
+ // MsQuic fails with System.Net.Quic.QuicException: Failed to open stream to peer. Error Code: INVALID_STATE
+ if (IsMsQuicProvider)
+ {
+ await Assert.ThrowsAsync<QuicException>(() => OpenAndUseStreamAsync(serverConnection));
+ }
+ else
+ {
+ ex = await Assert.ThrowsAsync<QuicConnectionAbortedException>(() => OpenAndUseStreamAsync(serverConnection));
+ Assert.Equal(ExpectedErrorCode, ex.ErrorCode);
+ }
});
}
[InlineData(10)]
public async Task CloseAsync_WithOpenStream_LocalAndPeerStreamsFailWithQuicOperationAbortedException(int writesBeforeClose)
{
- if (typeof(T) == typeof(MockProviderFactory))
+ if (IsMockProvider)
{
return;
}
[InlineData(10)]
public async Task Dispose_WithOpenLocalStream_LocalStreamFailsWithQuicOperationAbortedException(int writesBeforeClose)
{
- if (typeof(T) == typeof(MockProviderFactory))
+ if (IsMockProvider)
{
return;
}