var reg = new CancellationTokenRegistration();
if (cancellationToken.CanBeCanceled)
{
- reg = cancellationToken.Register(state => ((StreamPipeReader)state).Cancel(), this);
+ reg = cancellationToken.UnsafeRegister(state => ((StreamPipeReader)state).Cancel(), this);
}
using (reg)
Memory<byte> buffer = _readTail.AvailableMemory.Slice(_readTail.End);
- int length = await InnerStream.ReadAsync(buffer, tokenSource.Token);
+ int length = await InnerStream.ReadAsync(buffer, tokenSource.Token).ConfigureAwait(false);
Debug.Assert(length + _readTail.End <= _readTail.AvailableMemory.Length);
{
public class CopyToAsyncTests
{
- private static readonly PipeOptions s_testOptions = new PipeOptions(readerScheduler: PipeScheduler.Inline);
+ private static readonly PipeOptions s_testOptions = new PipeOptions(readerScheduler: PipeScheduler.Inline, useSynchronizationContext: false);
[Fact]
public async Task CopyToAsyncThrowsArgumentNullExceptionForNullDestination()
{
using (var pool = new TestMemoryPool())
{
- var pipe = new Pipe(new PipeOptions(pool: pool, readerScheduler: PipeScheduler.Inline));
+ var pipe = new Pipe(s_testOptions);
pipe.Writer.WriteEmpty(4096);
pipe.Writer.WriteEmpty(4096);
pipe.Writer.WriteEmpty(4096);
[Fact]
public async Task CanReadMultipleTimes()
{
- static async Task DoAsyncRead(PipeReader reader, int[] bufferSizes)
+ // This needs to run inline to synchronize the reader and writer
+ TaskCompletionSource<object> waitForRead = null;
+
+ async Task DoAsyncRead(PipeReader reader, int[] bufferSizes)
{
var index = 0;
while (true)
{
- ReadResult readResult = await reader.ReadAsync();
+ ReadResult readResult = await reader.ReadAsync().ConfigureAwait(false);
if (readResult.IsCompleted)
{
Assert.Equal(bufferSizes[index], readResult.Buffer.Length);
reader.AdvanceTo(readResult.Buffer.End);
index++;
+ waitForRead?.TrySetResult(null);
}
reader.Complete();
}
- static async Task DoAsyncWrites(PipeWriter writer, int[] bufferSizes)
+ async Task DoAsyncWrites(PipeWriter writer, int[] bufferSizes)
{
for (int i = 0; i < bufferSizes.Length; i++)
{
writer.WriteEmpty(bufferSizes[i]);
- await writer.FlushAsync();
+ waitForRead = new TaskCompletionSource<object>();
+ await writer.FlushAsync().ConfigureAwait(false);
+ await waitForRead.Task;
}
writer.Complete();
}
// We're using the pipe here as a way to pump bytes into the reader asynchronously
- var pipe = new Pipe(new PipeOptions(readerScheduler: PipeScheduler.Inline));
+ var pipe = new Pipe();
var options = new StreamPipeReaderOptions(bufferSize: 4096);
PipeReader reader = PipeReader.Create(pipe.Reader.AsStream(), options);
await readingTask;
await writingTask;
+
+ pipe.Reader.Complete();
}
[Theory]
reader.AdvanceTo(readResult.Buffer.End);
reader.Complete();
+
+ pipe.Writer.Complete();
}
[Fact]