{
ClearCancellationToken();
- if (cancellationToken.IsCancellationRequested)
+ if (tokenSource.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
+ {
+ // Catch cancellation and translate it into setting isCanceled = true
+ isCanceled = true;
+ }
+ else
{
throw;
}
- isCanceled = true;
}
return new ReadResult(GetCurrentReadOnlySequence(), isCanceled, _isStreamCompleted);
_internalTokenSource = null;
}
- if (cancellationToken.IsCancellationRequested)
+ if (localToken.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
{
- throw;
+ // Catch cancellation and translate it into setting isCanceled = true
+ return new FlushResult(isCanceled: true, isCompleted: false);
}
- // Catch any cancellation and translate it into setting isCanceled = true
- return new FlushResult(isCanceled: true, isCompleted: false);
+ throw;
}
}
}
[Fact]
public async Task ThrowsOnReadAfterCompleteReader()
{
- var reader = PipeReader.Create(Stream.Null);
+ PipeReader reader = PipeReader.Create(Stream.Null);
reader.Complete();
await Assert.ThrowsAsync<InvalidOperationException>(async () => await reader.ReadAsync());
[Fact]
public void TryReadAfterCancelPendingReadReturnsTrue()
{
- var reader = PipeReader.Create(Stream.Null);
+ PipeReader reader = PipeReader.Create(Stream.Null);
reader.CancelPendingRead();
}
[Fact]
- public void OnWriterCompletedThrowsNotSupportedException()
+ public void OnWriterCompletedNoops()
{
bool fired = false;
PipeReader reader = PipeReader.Create(Stream.Null);
public void LeaveUnderlyingStreamOpen()
{
var stream = new MemoryStream();
- var reader = PipeReader.Create(stream, new StreamPipeReaderOptions(leaveOpen: true));
+ PipeReader reader = PipeReader.Create(stream, new StreamPipeReaderOptions(leaveOpen: true));
reader.Complete();
Assert.True(stream.CanRead);
}
+ [Fact]
+ public async Task OperationCancelledExceptionNotSwallowedIfNotThrownFromSpecifiedToken()
+ {
+ PipeReader reader = PipeReader.Create(new ThrowsOperationCanceledExceptionStream());
+
+ await Assert.ThrowsAsync<OperationCanceledException>(async () => await reader.ReadAsync());
+ }
+
private static async Task<string> ReadFromPipeAsString(PipeReader reader)
{
ReadResult readResult = await reader.ReadAsync();
return new object[] { bytesInBuffer, bufferSize, minimumReadSize, readSizes };
}
+ private class ThrowsOperationCanceledExceptionStream : ReadOnlyStream
+ {
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ throw new OperationCanceledException();
+ }
+
+ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ throw new OperationCanceledException();
+ }
+#if netcoreapp
+ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
+ {
+ throw new OperationCanceledException();
+ }
+#endif
+ }
+
private class ThrowAfterZeroByteReadStream : MemoryStream
{
public ThrowAfterZeroByteReadStream()
}
[Fact]
- public void OnReaderCompletedThrowsNotSupported()
+ public void OnReaderCompletedNoops()
{
bool fired = false;
PipeWriter writer = PipeWriter.Create(Stream.Null);
public void LeaveUnderlyingStreamOpen()
{
var stream = new MemoryStream();
- var writer = PipeWriter.Create(stream, new StreamPipeWriterOptions(leaveOpen: true));
+ PipeWriter writer = PipeWriter.Create(stream, new StreamPipeWriterOptions(leaveOpen: true));
writer.Complete();
Assert.True(stream.CanRead);
}
+ [Fact]
+ public async Task OperationCancelledExceptionNotSwallowedIfNotThrownFromSpecifiedToken()
+ {
+ PipeWriter writer = PipeWriter.Create(new ThrowsOperationCanceledExceptionStream());
+
+ await Assert.ThrowsAsync<OperationCanceledException>(async () => await writer.WriteAsync(new byte[1]));
+ await Assert.ThrowsAsync<OperationCanceledException>(async () => await writer.FlushAsync());
+ }
+
+ private class ThrowsOperationCanceledExceptionStream : WriteOnlyStream
+ {
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ throw new OperationCanceledException();
+ }
+
+ public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ throw new OperationCanceledException();
+ }
+
+ public override Task FlushAsync(CancellationToken cancellationToken)
+ {
+ throw new OperationCanceledException();
+ }
+
+#if netcoreapp
+ public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
+ {
+ throw new OperationCanceledException();
+ }
+#endif
+ }
+
private class FlushAsyncAwareStream : WriteOnlyStream
{
public bool FlushAsyncCalled { get; set; }