// See the LICENSE file in the project root for more information.
using System.Buffers;
+using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
}
[Fact]
- public void ReadAsyncCompletesIfFlushAsyncCanceledMidFlush()
+ public async Task ReadAsyncReturnsDataAfterItWasWrittenDuringCancelledRead()
{
- // This test tries to get pipe into a state where ReadAsync is being awaited
- // and FlushAsync is cancelled while the method is running
- Pipe = new Pipe();
- var resetEvent = new ManualResetEvent(false);
- var iterations = 0;
- var cancellations = 0;
- var cancellationTokenSource = new CancellationTokenSource();
- var writer = Task.Run(async () =>
- {
- // We are limiting iteration count because on slower machines we are not able to
- // reproduce race conditions enough times during the test
- while (cancellations < 20 && iterations < 2_000_000)
- {
- iterations++;
- try
- {
- // We want reader to be awaiting
- resetEvent.WaitOne();
- Pipe.Writer.WriteEmpty(1);
-
- // We want the token to be cancelled during FlushAsync call
- // check it it's already cancelled and try a new one
- if (cancellationTokenSource.Token.IsCancellationRequested)
- {
- cancellationTokenSource = new CancellationTokenSource();
- continue;
- }
- await Pipe.Writer.FlushAsync(cancellationTokenSource.Token);
- }
- catch (OperationCanceledException)
- {
- cancellations ++;
- }
- }
-
- Pipe.Writer.Complete();
- return;
- });
-
- var reader = Task.Run(async () =>
- {
- while (true)
+ ValueTask<ReadResult> readTask = Pipe.Reader.ReadAsync();
+ ValueTaskAwaiter<ReadResult> awaiter = readTask.GetAwaiter();
+ ReadResult result = default;
+ awaiter.OnCompleted(
+ () =>
{
- var readTask = Pipe.Reader.ReadAsync();
-
- // Signal writer to initiate a flush
- if (!readTask.IsCompleted || readTask.Result.IsCompleted)
- {
- resetEvent.Set();
- }
-
- var result = await readTask;
- if (result.Buffer.IsEmpty)
- {
- return;
- }
-
- resetEvent.Reset();
+ Pipe.Writer.WriteAsync(new byte[] { 1 }).AsTask().Wait();
+ result = awaiter.GetResult();
+ });
- Pipe.Reader.AdvanceTo(result.Buffer.End);
- }
- });
+ Pipe.Reader.CancelPendingRead();
- var canceller = Task.Run(() =>
- {
- while (!writer.IsCompleted)
- {
- resetEvent.WaitOne();
- cancellationTokenSource.Cancel();
- }
- });
+ Assert.True(result.IsCanceled);
- Assert.True(Task.WaitAll(new [] { writer, reader, canceller }, TimeSpan.FromMinutes(5)), "Reader was not completed in 5 minutes");
- Assert.True(cancellations > 0);
+ result = await Pipe.Reader.ReadAsync();
+ Assert.False(result.IsCanceled);
+ Assert.Equal(new byte[] { 1 }, result.Buffer.ToArray());
}
}