Replace ReadAsyncCompletesIfFlushAsyncCanceledMidFlush test (dotnet/corefx#32360)
authorPavel Krymets <pavel@krymets.com>
Thu, 20 Sep 2018 18:14:06 +0000 (11:14 -0700)
committerGitHub <noreply@github.com>
Thu, 20 Sep 2018 18:14:06 +0000 (11:14 -0700)
Commit migrated from https://github.com/dotnet/corefx/commit/3bec0c8759806ffbabe5d415bf2fb8abd6832aa4

src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeAwaitable.cs
src/libraries/System.IO.Pipelines/tests/FlushAsyncCancellationTests.cs

index a3a897c..6f04874 100644 (file)
@@ -350,6 +350,8 @@ namespace System.IO.Pipelines
                 }
 
                 // Complete reader only if new data was pushed into the pipe
+                // Avoid throwing in between completing the reader and scheduling the callback
+                // if the intent is to allow pipe to continue reading the data
                 if (!wasEmpty)
                 {
                     _readerAwaitable.Complete(out completionData);
index ba44118..12a262c 100644 (file)
@@ -67,7 +67,7 @@ namespace System.IO.Pipelines
             _completionState = null;
 
             completionData = default;
-            
+
             if (!ReferenceEquals(currentCompletion, s_awaitableIsCompleted) &&
                 !ReferenceEquals(currentCompletion, s_awaitableIsNotCompleted))
             {
index 04a738f..361a3cb 100644 (file)
@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information.
 
 using System.Buffers;
+using System.Diagnostics;
 using System.Runtime.CompilerServices;
 using System.Threading;
 using System.Threading.Tasks;
@@ -327,82 +328,25 @@ namespace System.IO.Pipelines.Tests
         }
 
         [Fact]
-        public void ReadAsyncCompletesIfFlushAsyncCanceledMidFlush()
+        public async Task ReadAsyncReturnsDataAfterItWasWrittenDuringCancelledRead()
         {
-            // This test tries to get pipe into a state where ReadAsync is being awaited
-            // and FlushAsync is cancelled while the method is running
-            Pipe = new Pipe();
-            var resetEvent = new ManualResetEvent(false);
-            var iterations = 0;
-            var cancellations = 0;
-            var cancellationTokenSource = new CancellationTokenSource();
-            var writer = Task.Run(async () =>
-            {
-                // We are limiting iteration count because on slower machines we are not able to
-                // reproduce race conditions enough times during the test
-                while (cancellations < 20 && iterations < 2_000_000)
-                {
-                    iterations++;
-                    try
-                    {
-                        // We want reader to be awaiting
-                        resetEvent.WaitOne();
-                        Pipe.Writer.WriteEmpty(1);
-
-                        // We want the token to be cancelled during FlushAsync call
-                        // check it it's already cancelled and try a new one
-                        if (cancellationTokenSource.Token.IsCancellationRequested)
-                        {
-                            cancellationTokenSource = new CancellationTokenSource();
-                            continue;
-                        }
-                        await Pipe.Writer.FlushAsync(cancellationTokenSource.Token);
-                    }
-                    catch (OperationCanceledException)
-                    {
-                        cancellations ++;
-                    }
-                }
-
-                Pipe.Writer.Complete();
-                return;
-            });
-
-            var reader = Task.Run(async () =>
-            {
-                while (true)
+            ValueTask<ReadResult> readTask = Pipe.Reader.ReadAsync();
+            ValueTaskAwaiter<ReadResult> awaiter = readTask.GetAwaiter();
+            ReadResult result = default;
+            awaiter.OnCompleted(
+                () =>
                 {
-                    var readTask = Pipe.Reader.ReadAsync();
-
-                    // Signal writer to initiate a flush
-                    if (!readTask.IsCompleted || readTask.Result.IsCompleted)
-                    {
-                        resetEvent.Set();
-                    }
-
-                    var result = await readTask;
-                    if (result.Buffer.IsEmpty)
-                    {
-                        return;
-                    }
-
-                    resetEvent.Reset();
+                    Pipe.Writer.WriteAsync(new byte[] { 1 }).AsTask().Wait();
+                    result = awaiter.GetResult();
+                });
 
-                    Pipe.Reader.AdvanceTo(result.Buffer.End);
-                }
-            });
+            Pipe.Reader.CancelPendingRead();
 
-            var canceller = Task.Run(() =>
-            {
-                while (!writer.IsCompleted)
-                {
-                    resetEvent.WaitOne();
-                    cancellationTokenSource.Cancel();
-                }
-            });
+            Assert.True(result.IsCanceled);
 
-            Assert.True(Task.WaitAll(new [] { writer, reader, canceller }, TimeSpan.FromMinutes(5)), "Reader was not completed in 5 minutes");
-            Assert.True(cancellations > 0);
+            result = await Pipe.Reader.ReadAsync();
+            Assert.False(result.IsCanceled);
+            Assert.Equal(new byte[] { 1 }, result.Buffer.ToArray());
         }
     }