HTTP3: Fix issue with GOAWAY handling and implement graceful shutdown logic in Http3L...
authorGeoff Kizer <geoffrek@microsoft.com>
Sat, 24 Jul 2021 21:06:10 +0000 (14:06 -0700)
committerGitHub <noreply@github.com>
Sat, 24 Jul 2021 21:06:10 +0000 (14:06 -0700)
* add IsMockProvider/IsMsQuicProvider on QuicTestBase

* add tests for AcceptStreamAsync and Open*Stream when/after the connection is closed/disposed, and related product fixes

* fix issue with GOAWAY handling and add graceful shutdown logic to Http3LoopbackConnection

* PR feedback

Co-authored-by: Geoffrey Kizer <geoffrek@windows.microsoft.com>
src/libraries/Common/tests/System/Net/Http/Http3LoopbackConnection.cs
src/libraries/Common/tests/System/Net/Http/Http3LoopbackStream.cs
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs
src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockConnection.cs
src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockStream.cs
src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs
src/libraries/System.Net.Quic/tests/FunctionalTests/QuicTestBase.cs

index 555a548..ca53bbe 100644 (file)
@@ -189,13 +189,36 @@ namespace System.Net.Test.Common
 
             await stream.SendResponseAsync(statusCode, headers, content).ConfigureAwait(false);
 
-            // closing the connection here causes bytes written to streams to go missing.
-            // Regardless, we told the client we are closing so it shouldn't matter -- they should not use this connection anymore.
-            //await CloseAsync(H3_NO_ERROR).ConfigureAwait(false);
+            await WaitForClientDisconnectAsync();
 
             return request;
         }
 
+        // Wait for the client to close the connection, e.g. after we send a GOAWAY, or after the HttpClient is disposed.
+        public async Task WaitForClientDisconnectAsync()
+        {
+            while (true)
+            {
+                Http3LoopbackStream stream;
+
+                try
+                {
+                    stream = await AcceptRequestStreamAsync().ConfigureAwait(false);
+                }
+                catch (QuicConnectionAbortedException abortException) when (abortException.ErrorCode == H3_NO_ERROR)
+                {
+                    break;
+                }
+
+                using (stream)
+                {
+                    await stream.AbortAndWaitForShutdownAsync(H3_REQUEST_REJECTED);
+                }
+            }
+
+            await CloseAsync(H3_NO_ERROR);
+        }
+
         public override async Task WaitForCancellationAsync(bool ignoreIncomingData = true, int requestId = 0)
         {
             await GetOpenRequest(requestId).WaitForCancellationAsync(ignoreIncomingData).ConfigureAwait(false);
index b93807a..a3d2c9a 100644 (file)
@@ -359,6 +359,13 @@ namespace System.Net.Test.Common
             }
         }
 
+        public async Task AbortAndWaitForShutdownAsync(long errorCode)
+        {
+            _stream.AbortRead(errorCode);
+            _stream.AbortWrite(errorCode);
+            await _stream.ShutdownCompleted();
+        }
+
         public async Task<(long? frameType, byte[] payload)> ReadFrameAsync()
         {
             long? frameType = await ReadIntegerAsync().ConfigureAwait(false);
index d3e6a90..6b2012d 100644 (file)
@@ -283,7 +283,7 @@ namespace System.Net.Http
 
             lock (SyncObj)
             {
-                if (lastProcessedStreamId > _lastProcessedStreamId)
+                if (_lastProcessedStreamId != -1 && lastProcessedStreamId > _lastProcessedStreamId)
                 {
                     // Server can send multiple GOAWAY frames.
                     // Spec says a server MUST NOT increase the stream ID in subsequent GOAWAYs,
index 3c1f232..1e8fd20 100644 (file)
@@ -219,6 +219,8 @@ namespace System.Net.Quic.Implementations.Mock
 
         internal MockStream OpenStream(long streamId, bool bidirectional)
         {
+            CheckDisposed();
+
             ConnectionState? state = _state;
             if (state is null)
             {
@@ -274,12 +276,15 @@ namespace System.Net.Quic.Implementations.Mock
             catch (ChannelClosedException)
             {
                 long errorCode = _isClient ? state._serverErrorCode : state._clientErrorCode;
-                throw new QuicConnectionAbortedException(errorCode);
+                throw (errorCode == -1) ? new QuicOperationAbortedException() : new QuicConnectionAbortedException(errorCode);
             }
         }
 
         internal override ValueTask CloseAsync(long errorCode, CancellationToken cancellationToken = default)
         {
+            // TODO: We should abort local streams (and signal the peer to do likewise)
+            // Currently, we are not tracking the streams associated with this connection.
+
             ConnectionState? state = _state;
             if (state is not null)
             {
@@ -292,10 +297,12 @@ namespace System.Net.Quic.Implementations.Mock
                 if (_isClient)
                 {
                     state._clientErrorCode = errorCode;
+                    DrainAcceptQueue(-1, errorCode);
                 }
                 else
                 {
                     state._serverErrorCode = errorCode;
+                    DrainAcceptQueue(errorCode, -1);
                 }
             }
 
@@ -312,19 +319,37 @@ namespace System.Net.Quic.Implementations.Mock
             }
         }
 
+        private void DrainAcceptQueue(long outboundErrorCode, long inboundErrorCode)
+        {
+            ConnectionState? state = _state;
+            if (state is not null)
+            {
+                // TODO: We really only need to do the complete and drain once, but it doesn't really hurt to do it twice.
+                state._clientInitiatedStreamChannel.Writer.TryComplete();
+                while (state._clientInitiatedStreamChannel.Reader.TryRead(out MockStream.StreamState? streamState))
+                {
+                    streamState._outboundReadErrorCode = streamState._outboundWriteErrorCode = outboundErrorCode;
+                    streamState._inboundStreamBuffer?.AbortRead();
+                    streamState._outboundStreamBuffer?.EndWrite();
+                }
+
+                state._serverInitiatedStreamChannel.Writer.TryComplete();
+                while (state._serverInitiatedStreamChannel.Reader.TryRead(out MockStream.StreamState? streamState))
+                {
+                    streamState._inboundReadErrorCode = streamState._inboundWriteErrorCode = inboundErrorCode;
+                    streamState._outboundStreamBuffer?.AbortRead();
+                    streamState._inboundStreamBuffer?.EndWrite();
+                }
+            }
+        }
+
         private void Dispose(bool disposing)
         {
             if (!_disposed)
             {
                 if (disposing)
                 {
-                    ConnectionState? state = _state;
-                    if (state is not null)
-                    {
-                        Channel<MockStream.StreamState> streamChannel = _isClient ? state._clientInitiatedStreamChannel : state._serverInitiatedStreamChannel;
-                        streamChannel.Writer.Complete();
-                    }
-
+                    DrainAcceptQueue(-1, -1);
 
                     PeerStreamLimit? streamLimit = LocalStreamLimit;
                     if (streamLimit is not null)
@@ -448,6 +473,7 @@ namespace System.Net.Quic.Implementations.Mock
                 _applicationProtocol = applicationProtocol;
                 _clientInitiatedStreamChannel = Channel.CreateUnbounded<MockStream.StreamState>();
                 _serverInitiatedStreamChannel = Channel.CreateUnbounded<MockStream.StreamState>();
+                _clientErrorCode = _serverErrorCode = -1;
             }
         }
     }
index 313fc1e..fde0eab 100644 (file)
@@ -73,7 +73,7 @@ namespace System.Net.Quic.Implementations.Mock
                 long errorCode = _isInitiator ? _streamState._inboundReadErrorCode : _streamState._outboundReadErrorCode;
                 if (errorCode != 0)
                 {
-                    throw new QuicStreamAbortedException(errorCode);
+                    throw (errorCode == -1) ? new QuicOperationAbortedException() : new QuicStreamAbortedException(errorCode);
                 }
             }
 
index 9988961..b8c91f4 100644 (file)
@@ -37,23 +37,146 @@ namespace System.Net.Quic.Tests
             Assert.Equal(ApplicationProtocol.ToString(), serverConnection.NegotiatedApplicationProtocol.ToString());
         }
 
+        private static async Task<QuicStream> OpenAndUseStreamAsync(QuicConnection c)
+        {
+            QuicStream s = c.OpenBidirectionalStream();
+
+            // This will pend
+            await s.ReadAsync(new byte[1]);
+
+            return s;
+        }
+
         [Fact]
         [ActiveIssue("https://github.com/dotnet/runtime/issues/55242", TestPlatforms.Linux)]
-        public async Task AcceptStream_ConnectionAborted_ByClient_Throws()
+        public async Task CloseAsync_WithPendingAcceptAndConnect_PendingAndSubsequentThrowOperationAbortedException()
         {
             using var sync = new SemaphoreSlim(0);
 
             await RunClientServer(
                 async clientConnection =>
                 {
-                    await clientConnection.CloseAsync(ExpectedErrorCode);
+                    await sync.WaitAsync();
+                },
+                async serverConnection =>
+                {
+                    // Pend operations before the client closes.
+                    Task<QuicStream> acceptTask = serverConnection.AcceptStreamAsync().AsTask();
+                    Assert.False(acceptTask.IsCompleted);
+                    Task<QuicStream> connectTask = OpenAndUseStreamAsync(serverConnection);
+                    Assert.False(connectTask.IsCompleted);
+
+                    await serverConnection.CloseAsync(ExpectedErrorCode);
+
                     sync.Release();
+
+                    // Pending ops should fail
+                    await Assert.ThrowsAsync<QuicOperationAbortedException>(() => acceptTask);
+                    await Assert.ThrowsAsync<QuicOperationAbortedException>(() => connectTask);
+
+                    // Subsequent attempts should fail
+                    // TODO: Which exception is correct?
+                    if (IsMockProvider)
+                    {
+                        await Assert.ThrowsAsync<ObjectDisposedException>(async () => await serverConnection.AcceptStreamAsync());
+                        await Assert.ThrowsAsync<ObjectDisposedException>(async () => await OpenAndUseStreamAsync(serverConnection));
+                    }
+                    else
+                    {
+                        await Assert.ThrowsAsync<QuicOperationAbortedException>(async () => await serverConnection.AcceptStreamAsync());
+
+                        // TODO: ActiveIssue https://github.com/dotnet/runtime/issues/56133
+                        // MsQuic fails with System.Net.Quic.QuicException: Failed to open stream to peer. Error Code: INVALID_STATE
+                        //await Assert.ThrowsAsync<QuicOperationAbortedException>(async () => await OpenAndUseStreamAsync(serverConnection));
+                        await Assert.ThrowsAsync<QuicException>(() => OpenAndUseStreamAsync(serverConnection));
+                    }
+                });
+        }
+
+        [Fact]
+        [ActiveIssue("https://github.com/dotnet/runtime/issues/55242", TestPlatforms.Linux)]
+        public async Task Dispose_WithPendingAcceptAndConnect_PendingAndSubsequentThrowOperationAbortedException()
+        {
+            using var sync = new SemaphoreSlim(0);
+
+            await RunClientServer(
+                async clientConnection =>
+                {
+                    await sync.WaitAsync();
                 },
                 async serverConnection =>
                 {
+                    // Pend operations before the client closes.
+                    Task<QuicStream> acceptTask = serverConnection.AcceptStreamAsync().AsTask();
+                    Assert.False(acceptTask.IsCompleted);
+                    Task<QuicStream> connectTask = OpenAndUseStreamAsync(serverConnection);
+                    Assert.False(connectTask.IsCompleted);
+
+                    serverConnection.Dispose();
+
+                    sync.Release();
+
+                    // Pending ops should fail
+                    await Assert.ThrowsAsync<QuicOperationAbortedException>(() => acceptTask);
+                    await Assert.ThrowsAsync<QuicOperationAbortedException>(() => connectTask);
+
+                    // Subsequent attempts should fail
+                    // TODO: Should these be QuicOperationAbortedException, to match above? Or vice-versa?
+                    await Assert.ThrowsAsync<ObjectDisposedException>(async () => await serverConnection.AcceptStreamAsync());
+                    await Assert.ThrowsAsync<ObjectDisposedException>(async () => await OpenAndUseStreamAsync(serverConnection));
+                });
+        }
+
+        [Fact]
+        [ActiveIssue("https://github.com/dotnet/runtime/issues/55242", TestPlatforms.Linux)]
+        public async Task ConnectionClosedByPeer_WithPendingAcceptAndConnect_PendingAndSubsequentThrowConnectionAbortedException()
+        {
+            if (IsMockProvider)
+            {
+                return;
+            }
+
+            using var sync = new SemaphoreSlim(0);
+
+            await RunClientServer(
+                async clientConnection =>
+                {
                     await sync.WaitAsync();
-                    QuicConnectionAbortedException ex = await Assert.ThrowsAsync<QuicConnectionAbortedException>(() => serverConnection.AcceptStreamAsync().AsTask());
+
+                    await clientConnection.CloseAsync(ExpectedErrorCode);
+                },
+                async serverConnection =>
+                {
+                    // Pend operations before the client closes.
+                    Task<QuicStream> acceptTask = serverConnection.AcceptStreamAsync().AsTask();
+                    Assert.False(acceptTask.IsCompleted);
+                    Task<QuicStream> connectTask = OpenAndUseStreamAsync(serverConnection);
+                    Assert.False(connectTask.IsCompleted);
+
+                    sync.Release();
+
+                    // Pending ops should fail
+                    QuicConnectionAbortedException ex;
+
+                    ex = await Assert.ThrowsAsync<QuicConnectionAbortedException>(() => acceptTask);
+                    Assert.Equal(ExpectedErrorCode, ex.ErrorCode);
+                    ex = await Assert.ThrowsAsync<QuicConnectionAbortedException>(() => connectTask);
+                    Assert.Equal(ExpectedErrorCode, ex.ErrorCode);
+
+                    // Subsequent attempts should fail
+                    ex = await Assert.ThrowsAsync<QuicConnectionAbortedException>(() => serverConnection.AcceptStreamAsync().AsTask());
                     Assert.Equal(ExpectedErrorCode, ex.ErrorCode);
+                    // TODO: ActiveIssue https://github.com/dotnet/runtime/issues/56133
+                    // MsQuic fails with System.Net.Quic.QuicException: Failed to open stream to peer. Error Code: INVALID_STATE
+                    if (IsMsQuicProvider)
+                    {
+                        await Assert.ThrowsAsync<QuicException>(() => OpenAndUseStreamAsync(serverConnection));
+                    }
+                    else
+                    {
+                        ex = await Assert.ThrowsAsync<QuicConnectionAbortedException>(() => OpenAndUseStreamAsync(serverConnection));
+                        Assert.Equal(ExpectedErrorCode, ex.ErrorCode);
+                    }
                 });
         }
 
@@ -79,7 +202,7 @@ namespace System.Net.Quic.Tests
         [InlineData(10)]
         public async Task CloseAsync_WithOpenStream_LocalAndPeerStreamsFailWithQuicOperationAbortedException(int writesBeforeClose)
         {
-            if (typeof(T) == typeof(MockProviderFactory))
+            if (IsMockProvider)
             {
                 return;
             }
@@ -122,7 +245,7 @@ namespace System.Net.Quic.Tests
         [InlineData(10)]
         public async Task Dispose_WithOpenLocalStream_LocalStreamFailsWithQuicOperationAbortedException(int writesBeforeClose)
         {
-            if (typeof(T) == typeof(MockProviderFactory))
+            if (IsMockProvider)
             {
                 return;
             }
index 9a6e43a..24a653c 100644 (file)
@@ -23,6 +23,9 @@ namespace System.Net.Quic.Tests
         public static QuicImplementationProvider ImplementationProvider { get; } = s_factory.GetProvider();
         public static bool IsSupported => ImplementationProvider.IsSupported;
 
+        public static bool IsMockProvider => typeof(T) == typeof(MockProviderFactory);
+        public static bool IsMsQuicProvider => typeof(T) == typeof(MsQuicProviderFactory);
+
         public static SslApplicationProtocol ApplicationProtocol { get; } = new SslApplicationProtocol("quictest");
 
         public X509Certificate2 ServerCertificate = System.Net.Test.Common.Configuration.Certificates.GetServerCertificate();