make Quic AcceptStreamAsync concurrent safe (#56768)
authorTomas Weinfurt <tweinfurt@yahoo.com>
Wed, 4 Aug 2021 05:06:00 +0000 (22:06 -0700)
committerGitHub <noreply@github.com>
Wed, 4 Aug 2021 05:06:00 +0000 (22:06 -0700)
* make AcceptStreamAsync concurrent safe

* feedback from review

src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs
src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs

index 82638fc..b3cbb93 100644 (file)
@@ -74,7 +74,6 @@ namespace System.Net.Quic.Implementations.MsQuic
             // Backlog limit is managed by MsQuic so it can be unbounded here.
             public readonly Channel<MsQuicStream> AcceptQueue = Channel.CreateUnbounded<MsQuicStream>(new UnboundedChannelOptions()
             {
-                SingleReader = true,
                 SingleWriter = true,
             });
 
index d350c86..438f836 100644 (file)
@@ -160,6 +160,35 @@ namespace System.Net.Quic.Tests
         }
 
         [Fact]
+        public async Task MultipleConcurrentStreamsOnSingleConnection()
+        {
+            const int count = 100;
+            Task[] tasks = new Task[count];
+
+            (QuicConnection clientConnection, QuicConnection serverConnection) = await CreateConnectedQuicConnection();
+            using (clientConnection)
+            using (serverConnection)
+            {
+                for (int i = 0; i < count; i++)
+                {
+                    tasks[i] = MakeStreams(clientConnection, serverConnection);
+                }
+                await tasks.WhenAllOrAnyFailed(PassingTestTimeoutMilliseconds);
+            }
+
+            static async Task MakeStreams(QuicConnection clientConnection, QuicConnection serverConnection)
+            {
+                byte[] buffer = new byte[64];
+                QuicStream clientStream = clientConnection.OpenBidirectionalStream();
+                ValueTask writeTask = clientStream.WriteAsync(Encoding.UTF8.GetBytes("PING"), endStream: true);
+                ValueTask<QuicStream> acceptTask = serverConnection.AcceptStreamAsync();
+                await new Task[] { writeTask.AsTask(), acceptTask.AsTask()}.WhenAllOrAnyFailed(PassingTestTimeoutMilliseconds);
+                QuicStream serverStream = acceptTask.Result;
+                await serverStream.ReadAsync(buffer);
+            }
+        }
+
+        [Fact]
         public async Task GetStreamIdWithoutStartWorks()
         {
             using QuicListener listener = CreateQuicListener();