public void CompletingReaderFromWriterCallbackWorks()
{
var callbackRan = false;
- var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5));
+ var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
pipe.Writer.OnReaderCompleted((exception, state) => { pipe.Writer.Complete(); }, null);
public void CompletingWriterFromReaderCallbackWorks()
{
var callbackRan = false;
- var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5));
+ var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
pipe.Reader.OnWriterCompleted((exception, state) => { pipe.Reader.Complete(); }, null);
var counter = 0;
- var pipe = new Pipe(new PipeOptions(_pool));
+ var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
pipe.Writer.OnReaderCompleted(
- (exception, state) => {
+ (exception, state) =>
+ {
Assert.Equal(callbackState1, state);
Assert.Equal(0, counter);
counter++;
}, callbackState1);
pipe.Writer.OnReaderCompleted(
- (exception, state) => {
+ (exception, state) =>
+ {
Assert.Equal(callbackState2, state);
Assert.Equal(1, counter);
counter++;
{
var exception = new Exception();
var scheduler = new TestScheduler();
- var pipe = new Pipe(new PipeOptions(_pool, writerScheduler: scheduler));
+ var pipe = new Pipe(new PipeOptions(_pool, writerScheduler: scheduler, readerScheduler: PipeScheduler.Inline));
pipe.Writer.OnReaderCompleted((e, state) => throw exception, null);
pipe.Reader.Complete();
{
var callbackRan = false;
var scheduler = new TestScheduler();
- var pipe = new Pipe(new PipeOptions(_pool, writerScheduler: scheduler));
+ var pipe = new Pipe(new PipeOptions(_pool, writerScheduler: scheduler, readerScheduler: PipeScheduler.Inline));
pipe.Reader.Complete();
pipe.Writer.OnReaderCompleted(
- (exception, state) => {
+ (exception, state) =>
+ {
Assert.Null(exception);
callbackRan = true;
}, null);
{
var callbackRan = false;
var scheduler = new TestScheduler();
- var pipe = new Pipe(new PipeOptions(_pool, writerScheduler: scheduler));
+ var pipe = new Pipe(new PipeOptions(_pool, writerScheduler: scheduler, readerScheduler: PipeScheduler.Inline));
pipe.Writer.OnReaderCompleted((exception, state) => { callbackRan = true; }, null);
pipe.Reader.Complete();
public void OnReaderCompletedPassesException()
{
var callbackRan = false;
- var pipe = new Pipe(new PipeOptions(_pool));
+ var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
var readerException = new Exception();
pipe.Writer.OnReaderCompleted(
- (exception, state) => {
+ (exception, state) =>
+ {
callbackRan = true;
Assert.Same(readerException, exception);
}, null);
{
var callbackRan = false;
var callbackState = new object();
- var pipe = new Pipe(new PipeOptions(_pool));
+ var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
pipe.Writer.OnReaderCompleted(
- (exception, state) => {
+ (exception, state) =>
+ {
Assert.Equal(callbackState, state);
callbackRan = true;
}, callbackState);
{
var callbackRan = false;
var continuationRan = false;
- var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5));
+ var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
pipe.Writer.OnReaderCompleted(
- (exception, state) => {
+ (exception, state) =>
+ {
Assert.False(continuationRan);
callbackRan = true;
}, null);
var callbackState3 = new object();
var counter = 0;
- var pipe = new Pipe(new PipeOptions(_pool));
+ var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
pipe.Writer.OnReaderCompleted(
- (exception, state) => {
+ (exception, state) =>
+ {
Assert.Equal(callbackState1, state);
Assert.Equal(0, counter);
counter++;
}, callbackState1);
pipe.Writer.OnReaderCompleted(
- (exception, state) => {
+ (exception, state) =>
+ {
Assert.Equal(callbackState2, state);
Assert.Equal(1, counter);
counter++;
}, callbackState2);
pipe.Writer.OnReaderCompleted(
- (exception, state) => {
+ (exception, state) =>
+ {
Assert.Equal(callbackState3, state);
Assert.Equal(2, counter);
counter++;
[Fact]
public void OnReaderCompletedThrowsWithNullCallback()
{
- var pipe = new Pipe(new PipeOptions(_pool));
+ var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
Assert.Throws<ArgumentNullException>(() => pipe.Writer.OnReaderCompleted(null, null));
}
{
var callbackRan = false;
var scheduler = new TestScheduler();
- var pipe = new Pipe(new PipeOptions(_pool, writerScheduler: scheduler));
+ var pipe = new Pipe(new PipeOptions(_pool, writerScheduler: scheduler, readerScheduler: PipeScheduler.Inline));
pipe.Writer.OnReaderCompleted((exception, state) => { callbackRan = true; }, null);
pipe.Reader.Complete();
var counter = 0;
- var pipe = new Pipe(new PipeOptions(_pool));
+ var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
pipe.Reader.OnWriterCompleted(
- (exception, state) => {
+ (exception, state) =>
+ {
Assert.Equal(callbackState1, state);
Assert.Equal(0, counter);
counter++;
}, callbackState1);
pipe.Reader.OnWriterCompleted(
- (exception, state) => {
+ (exception, state) =>
+ {
Assert.Equal(callbackState2, state);
Assert.Equal(1, counter);
counter++;
{
var exception = new Exception();
var scheduler = new TestScheduler();
- var pipe = new Pipe(new PipeOptions(_pool, scheduler));
+ var pipe = new Pipe(new PipeOptions(_pool, scheduler, PipeScheduler.Inline));
pipe.Reader.OnWriterCompleted((e, state) => throw exception, null);
pipe.Writer.Complete();
{
var callbackRan = false;
var scheduler = new TestScheduler();
- var pipe = new Pipe(new PipeOptions(_pool, scheduler));
+ var pipe = new Pipe(new PipeOptions(_pool, scheduler, PipeScheduler.Inline));
pipe.Writer.Complete();
pipe.Reader.OnWriterCompleted(
- (exception, state) => {
+ (exception, state) =>
+ {
Assert.Null(exception);
callbackRan = true;
}, null);
{
var callbackRan = false;
var scheduler = new TestScheduler();
- var pipe = new Pipe(new PipeOptions(_pool, scheduler));
+ var pipe = new Pipe(new PipeOptions(_pool, scheduler, PipeScheduler.Inline));
pipe.Reader.OnWriterCompleted((exception, state) => { callbackRan = true; }, null);
pipe.Reader.Complete();
pipe.Writer.Complete();
public void OnWriterCompletedPassesException()
{
var callbackRan = false;
- var pipe = new Pipe(new PipeOptions(_pool));
+ var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
var readerException = new Exception();
pipe.Reader.OnWriterCompleted(
- (exception, state) => {
+ (exception, state) =>
+ {
callbackRan = true;
Assert.Same(readerException, exception);
}, null);
{
var callbackRan = false;
var callbackState = new object();
- var pipe = new Pipe(new PipeOptions(_pool));
+ var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
pipe.Reader.OnWriterCompleted(
- (exception, state) => {
+ (exception, state) =>
+ {
Assert.Equal(callbackState, state);
callbackRan = true;
}, callbackState);
{
var callbackRan = false;
var continuationRan = false;
- var pipe = new Pipe(new PipeOptions(_pool));
+ var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
pipe.Reader.OnWriterCompleted(
- (exception, state) => {
+ (exception, state) =>
+ {
callbackRan = true;
Assert.False(continuationRan);
}, null);
var callbackState3 = new object();
var counter = 0;
- var pipe = new Pipe(new PipeOptions(_pool));
+ var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
pipe.Reader.OnWriterCompleted(
- (exception, state) => {
+ (exception, state) =>
+ {
Assert.Equal(callbackState1, state);
Assert.Equal(0, counter);
counter++;
}, callbackState1);
pipe.Reader.OnWriterCompleted(
- (exception, state) => {
+ (exception, state) =>
+ {
Assert.Equal(callbackState2, state);
Assert.Equal(1, counter);
counter++;
}, callbackState2);
pipe.Reader.OnWriterCompleted(
- (exception, state) => {
+ (exception, state) =>
+ {
Assert.Equal(callbackState3, state);
Assert.Equal(2, counter);
counter++;
[Fact]
public void OnWriterCompletedThrowsWithNullCallback()
{
- var pipe = new Pipe(new PipeOptions(_pool));
+ var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
Assert.Throws<ArgumentNullException>(() => pipe.Reader.OnWriterCompleted(null, null));
}
{
var callbackRan = false;
var scheduler = new TestScheduler();
- var pipe = new Pipe(new PipeOptions(_pool, scheduler));
+ var pipe = new Pipe(new PipeOptions(_pool, scheduler, PipeScheduler.Inline));
pipe.Reader.OnWriterCompleted((exception, state) => { callbackRan = true; }, null);
pipe.Writer.Complete();
var writeSize = 512;
- var pipe = new Pipe(new PipeOptions(pool));
+ var pipe = new Pipe(new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
while (pool.CurrentlyRentedBlocks != 3)
{
PipeWriter writableBuffer = pipe.Writer.WriteEmpty(writeSize);
var writeSize = 512;
- var pipe = new Pipe(new PipeOptions(pool));
+ var pipe = new Pipe(new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
// Write two blocks
Memory<byte> buffer = pipe.Writer.GetMemory(writeSize);
{
var pool = new DisposeTrackingBufferPool();
- var readerWriter = new Pipe(new PipeOptions(pool));
+ var readerWriter = new Pipe(new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
await readerWriter.Writer.WriteAsync(new byte[] { 1 });
readerWriter.Writer.Complete();
var pool = new DisposeTrackingBufferPool();
var writeSize = 512;
- var pipe = new Pipe(new PipeOptions(pool, minimumSegmentSize: 2020));
+ var pipe = new Pipe(new PipeOptions(pool, minimumSegmentSize: 2020, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
Memory<byte> buffer = pipe.Writer.GetMemory(writeSize);
int allocatedSize = buffer.Length;
public void ReturnsWriteHeadOnComplete()
{
var pool = new DisposeTrackingBufferPool();
- var pipe = new Pipe(new PipeOptions(pool));
+ var pipe = new Pipe(new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
var memory = pipe.Writer.GetMemory(512);
pipe.Reader.Complete();
public void ReturnsWriteHeadWhenRequestingLargerBlock()
{
var pool = new DisposeTrackingBufferPool();
- var pipe = new Pipe(new PipeOptions(pool));
+ var pipe = new Pipe(new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
var memory = pipe.Writer.GetMemory(512);
pipe.Writer.GetMemory(4096);
var writeSize = 512;
- var pipe = new Pipe(new PipeOptions(pool));
+ var pipe = new Pipe(new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
await pipe.Writer.WriteAsync(new byte[writeSize]);
pipe.Writer.GetMemory(writeSize);
}
[Fact]
- public async Task DefaultReaderSchedulerRunsInline()
+ public async Task DefaultReaderSchedulerRunsOnThreadPool()
{
var pipe = new Pipe();
var id = 0;
- Func<Task> doRead = async () => {
+ Func<Task> doRead = async () =>
+ {
ReadResult result = await pipe.Reader.ReadAsync();
- Assert.Equal(Thread.CurrentThread.ManagedThreadId, id);
+ Assert.True(Thread.CurrentThread.IsThreadPoolThread);
pipe.Reader.AdvanceTo(result.Buffer.End, result.Buffer.End);
}
[Fact]
- public async Task DefaultWriterSchedulerRunsInline()
+ public async Task DefaultWriterSchedulerRunsOnThreadPool()
{
using (var pool = new TestMemoryPool())
{
var id = 0;
- Func<Task> doWrite = async () => {
+ Func<Task> doWrite = async () =>
+ {
await flushAsync;
pipe.Writer.Complete();
- Assert.Equal(Thread.CurrentThread.ManagedThreadId, id);
+ Assert.True(Thread.CurrentThread.IsThreadPoolThread);
};
Task writing = doWrite();
pool,
resumeWriterThreshold: 32,
pauseWriterThreshold: 64,
+ readerScheduler: PipeScheduler.Inline,
writerScheduler: scheduler));
PipeWriter writableBuffer = pipe.Writer.WriteEmpty(64);
Assert.False(flushAsync.IsCompleted);
- Func<Task> doWrite = async () => {
+ Func<Task> doWrite = async () =>
+ {
int oid = Thread.CurrentThread.ManagedThreadId;
await flushAsync;
{
using (var scheduler = new ThreadScheduler())
{
- var pipe = new Pipe(new PipeOptions(pool, scheduler));
+ var pipe = new Pipe(new PipeOptions(pool, scheduler, writerScheduler: PipeScheduler.Inline));
- Func<Task> doRead = async () => {
+ Func<Task> doRead = async () =>
+ {
int oid = Thread.CurrentThread.ManagedThreadId;
ReadResult result = await pipe.Reader.ReadAsync();
[Fact]
public async Task ThreadPoolScheduler_SchedulesOnThreadPool()
{
- var pipe = new Pipe(new PipeOptions(readerScheduler: PipeScheduler.ThreadPool));
+ var pipe = new Pipe(new PipeOptions(readerScheduler: PipeScheduler.ThreadPool, writerScheduler: PipeScheduler.Inline));
async Task DoRead()
{