ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes);
}
+ // If the reader is completed we no-op Advance but leave GetMemory and FlushAsync alone
+ if (_readerCompletion.IsCompleted)
+ {
+ return;
+ }
+
AdvanceCore(bytes);
}
}
ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
}
+ if (_readerCompletion.IsCompleted)
+ {
+ return new ValueTask<FlushResult>(new FlushResult(isCanceled: false, isCompleted: true));
+ }
+
CompletionData completionData;
ValueTask<FlushResult> result;
await task;
}
+
+ [Fact]
+ public async Task WriteAsyncWithACompletedReaderNoops()
+ {
+ var pool = new DisposeTrackingBufferPool();
+ var pipe = new Pipe(new PipeOptions(pool));
+ pipe.Reader.Complete();
+
+ byte[] writeBuffer = new byte[100];
+ for (var i = 0; i < 10000; i++)
+ {
+ await pipe.Writer.WriteAsync(writeBuffer);
+ }
+
+ Assert.Equal(0, pool.CurrentlyRentedBlocks);
+ }
+
+ [Fact]
+ public async Task GetMemoryFlushWithACompletedReaderNoops()
+ {
+ var pool = new DisposeTrackingBufferPool();
+ var pipe = new Pipe(new PipeOptions(pool));
+ pipe.Reader.Complete();
+
+ for (var i = 0; i < 10000; i++)
+ {
+ var mem = pipe.Writer.GetMemory();
+ pipe.Writer.Advance(mem.Length);
+ await pipe.Writer.FlushAsync(default);
+ }
+
+ Assert.Equal(1, pool.CurrentlyRentedBlocks);
+ pipe.Writer.Complete();
+ Assert.Equal(0, pool.CurrentlyRentedBlocks);
+ }
}
}