From: David Fowler Date: Thu, 15 Feb 2018 18:03:02 +0000 (-0800) Subject: Made a few tweaks to pipelines (dotnet/corefx#27158) X-Git-Tag: submit/tizen/20210909.063632~11031^2~5493 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=087cad9b55535544f8b46c6175647bde2ccc2fb2;p=platform%2Fupstream%2Fdotnet%2Fruntime.git Made a few tweaks to pipelines (dotnet/corefx#27158) * Made a few tweaks to pipelines - Renamed the file containing the class IDuplexPipe - Changed the default scheduler to use the ThreadPool if none was specified Commit migrated from https://github.com/dotnet/corefx/commit/c2b5a8e3d6339a2c863b1cd48b8663273574fe48 --- diff --git a/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj b/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj index 61a378d..3e89668 100644 --- a/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj +++ b/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj @@ -11,7 +11,7 @@ - + diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/IPipeConnection.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/IDuplexPipe.cs similarity index 100% rename from src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/IPipeConnection.cs rename to src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/IDuplexPipe.cs diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs index 31f598a..cc06c46 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs @@ -94,8 +94,8 @@ namespace System.IO.Pipelines _minimumSegmentSize = options.MinimumSegmentSize; _pauseWriterThreshold = options.PauseWriterThreshold; _resumeWriterThreshold = options.ResumeWriterThreshold; - _readerScheduler = options.ReaderScheduler ?? PipeScheduler.Inline; - _writerScheduler = options.WriterScheduler ?? PipeScheduler.Inline; + _readerScheduler = options.ReaderScheduler; + _writerScheduler = options.WriterScheduler; _readerAwaitable = new PipeAwaitable(completed: false); _writerAwaitable = new PipeAwaitable(completed: true); _reader = new DefaultPipeReader(this); diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs index 5ee5a80..dbc143a 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs @@ -38,8 +38,8 @@ namespace System.IO.Pipelines } Pool = pool ?? MemoryPool.Shared; - ReaderScheduler = readerScheduler; - WriterScheduler = writerScheduler; + ReaderScheduler = readerScheduler ?? PipeScheduler.ThreadPool; + WriterScheduler = writerScheduler ?? PipeScheduler.ThreadPool; PauseWriterThreshold = pauseWriterThreshold; ResumeWriterThreshold = resumeWriterThreshold; MinimumSegmentSize = minimumSegmentSize; diff --git a/src/libraries/System.IO.Pipelines/tests/BackpressureTests.cs b/src/libraries/System.IO.Pipelines/tests/BackpressureTests.cs index b0acf91..8812b66 100644 --- a/src/libraries/System.IO.Pipelines/tests/BackpressureTests.cs +++ b/src/libraries/System.IO.Pipelines/tests/BackpressureTests.cs @@ -12,7 +12,7 @@ namespace System.IO.Pipelines.Tests public BackpressureTests() { _pool = new TestMemoryPool(); - _pipe = new Pipe(new PipeOptions(_pool, resumeWriterThreshold: 32, pauseWriterThreshold: 64)); + _pipe = new Pipe(new PipeOptions(_pool, resumeWriterThreshold: 32, pauseWriterThreshold: 64, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline)); } public void Dispose() diff --git a/src/libraries/System.IO.Pipelines/tests/PipeCompletionCallbacksTests.cs b/src/libraries/System.IO.Pipelines/tests/PipeCompletionCallbacksTests.cs index a9ab310..b81f052 100644 --- a/src/libraries/System.IO.Pipelines/tests/PipeCompletionCallbacksTests.cs +++ b/src/libraries/System.IO.Pipelines/tests/PipeCompletionCallbacksTests.cs @@ -49,7 +49,7 @@ namespace System.IO.Pipelines.Tests public void CompletingReaderFromWriterCallbackWorks() { var callbackRan = false; - var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5)); + var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline)); pipe.Writer.OnReaderCompleted((exception, state) => { pipe.Writer.Complete(); }, null); @@ -63,7 +63,7 @@ namespace System.IO.Pipelines.Tests public void CompletingWriterFromReaderCallbackWorks() { var callbackRan = false; - var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5)); + var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline)); pipe.Reader.OnWriterCompleted((exception, state) => { pipe.Reader.Complete(); }, null); @@ -83,9 +83,10 @@ namespace System.IO.Pipelines.Tests var counter = 0; - var pipe = new Pipe(new PipeOptions(_pool)); + var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline)); pipe.Writer.OnReaderCompleted( - (exception, state) => { + (exception, state) => + { Assert.Equal(callbackState1, state); Assert.Equal(0, counter); counter++; @@ -93,7 +94,8 @@ namespace System.IO.Pipelines.Tests }, callbackState1); pipe.Writer.OnReaderCompleted( - (exception, state) => { + (exception, state) => + { Assert.Equal(callbackState2, state); Assert.Equal(1, counter); counter++; @@ -111,7 +113,7 @@ namespace System.IO.Pipelines.Tests { var exception = new Exception(); var scheduler = new TestScheduler(); - var pipe = new Pipe(new PipeOptions(_pool, writerScheduler: scheduler)); + var pipe = new Pipe(new PipeOptions(_pool, writerScheduler: scheduler, readerScheduler: PipeScheduler.Inline)); pipe.Writer.OnReaderCompleted((e, state) => throw exception, null); pipe.Reader.Complete(); @@ -125,11 +127,12 @@ namespace System.IO.Pipelines.Tests { var callbackRan = false; var scheduler = new TestScheduler(); - var pipe = new Pipe(new PipeOptions(_pool, writerScheduler: scheduler)); + var pipe = new Pipe(new PipeOptions(_pool, writerScheduler: scheduler, readerScheduler: PipeScheduler.Inline)); pipe.Reader.Complete(); pipe.Writer.OnReaderCompleted( - (exception, state) => { + (exception, state) => + { Assert.Null(exception); callbackRan = true; }, null); @@ -143,7 +146,7 @@ namespace System.IO.Pipelines.Tests { var callbackRan = false; var scheduler = new TestScheduler(); - var pipe = new Pipe(new PipeOptions(_pool, writerScheduler: scheduler)); + var pipe = new Pipe(new PipeOptions(_pool, writerScheduler: scheduler, readerScheduler: PipeScheduler.Inline)); pipe.Writer.OnReaderCompleted((exception, state) => { callbackRan = true; }, null); pipe.Reader.Complete(); @@ -164,11 +167,12 @@ namespace System.IO.Pipelines.Tests public void OnReaderCompletedPassesException() { var callbackRan = false; - var pipe = new Pipe(new PipeOptions(_pool)); + var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline)); var readerException = new Exception(); pipe.Writer.OnReaderCompleted( - (exception, state) => { + (exception, state) => + { callbackRan = true; Assert.Same(readerException, exception); }, null); @@ -182,9 +186,10 @@ namespace System.IO.Pipelines.Tests { var callbackRan = false; var callbackState = new object(); - var pipe = new Pipe(new PipeOptions(_pool)); + var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline)); pipe.Writer.OnReaderCompleted( - (exception, state) => { + (exception, state) => + { Assert.Equal(callbackState, state); callbackRan = true; }, callbackState); @@ -198,10 +203,11 @@ namespace System.IO.Pipelines.Tests { var callbackRan = false; var continuationRan = false; - var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5)); + var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline)); pipe.Writer.OnReaderCompleted( - (exception, state) => { + (exception, state) => + { Assert.False(continuationRan); callbackRan = true; }, null); @@ -225,23 +231,26 @@ namespace System.IO.Pipelines.Tests var callbackState3 = new object(); var counter = 0; - var pipe = new Pipe(new PipeOptions(_pool)); + var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline)); pipe.Writer.OnReaderCompleted( - (exception, state) => { + (exception, state) => + { Assert.Equal(callbackState1, state); Assert.Equal(0, counter); counter++; }, callbackState1); pipe.Writer.OnReaderCompleted( - (exception, state) => { + (exception, state) => + { Assert.Equal(callbackState2, state); Assert.Equal(1, counter); counter++; }, callbackState2); pipe.Writer.OnReaderCompleted( - (exception, state) => { + (exception, state) => + { Assert.Equal(callbackState3, state); Assert.Equal(2, counter); counter++; @@ -255,7 +264,7 @@ namespace System.IO.Pipelines.Tests [Fact] public void OnReaderCompletedThrowsWithNullCallback() { - var pipe = new Pipe(new PipeOptions(_pool)); + var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline)); Assert.Throws(() => pipe.Writer.OnReaderCompleted(null, null)); } @@ -265,7 +274,7 @@ namespace System.IO.Pipelines.Tests { var callbackRan = false; var scheduler = new TestScheduler(); - var pipe = new Pipe(new PipeOptions(_pool, writerScheduler: scheduler)); + var pipe = new Pipe(new PipeOptions(_pool, writerScheduler: scheduler, readerScheduler: PipeScheduler.Inline)); pipe.Writer.OnReaderCompleted((exception, state) => { callbackRan = true; }, null); pipe.Reader.Complete(); @@ -283,9 +292,10 @@ namespace System.IO.Pipelines.Tests var counter = 0; - var pipe = new Pipe(new PipeOptions(_pool)); + var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline)); pipe.Reader.OnWriterCompleted( - (exception, state) => { + (exception, state) => + { Assert.Equal(callbackState1, state); Assert.Equal(0, counter); counter++; @@ -293,7 +303,8 @@ namespace System.IO.Pipelines.Tests }, callbackState1); pipe.Reader.OnWriterCompleted( - (exception, state) => { + (exception, state) => + { Assert.Equal(callbackState2, state); Assert.Equal(1, counter); counter++; @@ -311,7 +322,7 @@ namespace System.IO.Pipelines.Tests { var exception = new Exception(); var scheduler = new TestScheduler(); - var pipe = new Pipe(new PipeOptions(_pool, scheduler)); + var pipe = new Pipe(new PipeOptions(_pool, scheduler, PipeScheduler.Inline)); pipe.Reader.OnWriterCompleted((e, state) => throw exception, null); pipe.Writer.Complete(); @@ -325,11 +336,12 @@ namespace System.IO.Pipelines.Tests { var callbackRan = false; var scheduler = new TestScheduler(); - var pipe = new Pipe(new PipeOptions(_pool, scheduler)); + var pipe = new Pipe(new PipeOptions(_pool, scheduler, PipeScheduler.Inline)); pipe.Writer.Complete(); pipe.Reader.OnWriterCompleted( - (exception, state) => { + (exception, state) => + { Assert.Null(exception); callbackRan = true; }, null); @@ -343,7 +355,7 @@ namespace System.IO.Pipelines.Tests { var callbackRan = false; var scheduler = new TestScheduler(); - var pipe = new Pipe(new PipeOptions(_pool, scheduler)); + var pipe = new Pipe(new PipeOptions(_pool, scheduler, PipeScheduler.Inline)); pipe.Reader.OnWriterCompleted((exception, state) => { callbackRan = true; }, null); pipe.Reader.Complete(); pipe.Writer.Complete(); @@ -363,11 +375,12 @@ namespace System.IO.Pipelines.Tests public void OnWriterCompletedPassesException() { var callbackRan = false; - var pipe = new Pipe(new PipeOptions(_pool)); + var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline)); var readerException = new Exception(); pipe.Reader.OnWriterCompleted( - (exception, state) => { + (exception, state) => + { callbackRan = true; Assert.Same(readerException, exception); }, null); @@ -381,9 +394,10 @@ namespace System.IO.Pipelines.Tests { var callbackRan = false; var callbackState = new object(); - var pipe = new Pipe(new PipeOptions(_pool)); + var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline)); pipe.Reader.OnWriterCompleted( - (exception, state) => { + (exception, state) => + { Assert.Equal(callbackState, state); callbackRan = true; }, callbackState); @@ -397,10 +411,11 @@ namespace System.IO.Pipelines.Tests { var callbackRan = false; var continuationRan = false; - var pipe = new Pipe(new PipeOptions(_pool)); + var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline)); pipe.Reader.OnWriterCompleted( - (exception, state) => { + (exception, state) => + { callbackRan = true; Assert.False(continuationRan); }, null); @@ -421,23 +436,26 @@ namespace System.IO.Pipelines.Tests var callbackState3 = new object(); var counter = 0; - var pipe = new Pipe(new PipeOptions(_pool)); + var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline)); pipe.Reader.OnWriterCompleted( - (exception, state) => { + (exception, state) => + { Assert.Equal(callbackState1, state); Assert.Equal(0, counter); counter++; }, callbackState1); pipe.Reader.OnWriterCompleted( - (exception, state) => { + (exception, state) => + { Assert.Equal(callbackState2, state); Assert.Equal(1, counter); counter++; }, callbackState2); pipe.Reader.OnWriterCompleted( - (exception, state) => { + (exception, state) => + { Assert.Equal(callbackState3, state); Assert.Equal(2, counter); counter++; @@ -451,7 +469,7 @@ namespace System.IO.Pipelines.Tests [Fact] public void OnWriterCompletedThrowsWithNullCallback() { - var pipe = new Pipe(new PipeOptions(_pool)); + var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline)); Assert.Throws(() => pipe.Reader.OnWriterCompleted(null, null)); } @@ -461,7 +479,7 @@ namespace System.IO.Pipelines.Tests { var callbackRan = false; var scheduler = new TestScheduler(); - var pipe = new Pipe(new PipeOptions(_pool, scheduler)); + var pipe = new Pipe(new PipeOptions(_pool, scheduler, PipeScheduler.Inline)); pipe.Reader.OnWriterCompleted((exception, state) => { callbackRan = true; }, null); pipe.Writer.Complete(); diff --git a/src/libraries/System.IO.Pipelines/tests/PipeLengthTests.cs b/src/libraries/System.IO.Pipelines/tests/PipeLengthTests.cs index fe413da..d6fe0cb 100644 --- a/src/libraries/System.IO.Pipelines/tests/PipeLengthTests.cs +++ b/src/libraries/System.IO.Pipelines/tests/PipeLengthTests.cs @@ -11,7 +11,7 @@ namespace System.IO.Pipelines.Tests public PipeLengthTests() { _pool = new TestMemoryPool(); - _pipe = new Pipe(new PipeOptions(_pool)); + _pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline)); } public void Dispose() diff --git a/src/libraries/System.IO.Pipelines/tests/PipePoolTests.cs b/src/libraries/System.IO.Pipelines/tests/PipePoolTests.cs index 3a9d2b1..66b9b57 100644 --- a/src/libraries/System.IO.Pipelines/tests/PipePoolTests.cs +++ b/src/libraries/System.IO.Pipelines/tests/PipePoolTests.cs @@ -91,7 +91,7 @@ namespace System.IO.Pipelines.Tests var writeSize = 512; - var pipe = new Pipe(new PipeOptions(pool)); + var pipe = new Pipe(new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline)); while (pool.CurrentlyRentedBlocks != 3) { PipeWriter writableBuffer = pipe.Writer.WriteEmpty(writeSize); @@ -111,7 +111,7 @@ namespace System.IO.Pipelines.Tests var writeSize = 512; - var pipe = new Pipe(new PipeOptions(pool)); + var pipe = new Pipe(new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline)); // Write two blocks Memory buffer = pipe.Writer.GetMemory(writeSize); @@ -135,7 +135,7 @@ namespace System.IO.Pipelines.Tests { var pool = new DisposeTrackingBufferPool(); - var readerWriter = new Pipe(new PipeOptions(pool)); + var readerWriter = new Pipe(new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline)); await readerWriter.Writer.WriteAsync(new byte[] { 1 }); readerWriter.Writer.Complete(); @@ -153,7 +153,7 @@ namespace System.IO.Pipelines.Tests var pool = new DisposeTrackingBufferPool(); var writeSize = 512; - var pipe = new Pipe(new PipeOptions(pool, minimumSegmentSize: 2020)); + var pipe = new Pipe(new PipeOptions(pool, minimumSegmentSize: 2020, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline)); Memory buffer = pipe.Writer.GetMemory(writeSize); int allocatedSize = buffer.Length; @@ -173,7 +173,7 @@ namespace System.IO.Pipelines.Tests public void ReturnsWriteHeadOnComplete() { var pool = new DisposeTrackingBufferPool(); - var pipe = new Pipe(new PipeOptions(pool)); + var pipe = new Pipe(new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline)); var memory = pipe.Writer.GetMemory(512); pipe.Reader.Complete(); @@ -185,7 +185,7 @@ namespace System.IO.Pipelines.Tests public void ReturnsWriteHeadWhenRequestingLargerBlock() { var pool = new DisposeTrackingBufferPool(); - var pipe = new Pipe(new PipeOptions(pool)); + var pipe = new Pipe(new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline)); var memory = pipe.Writer.GetMemory(512); pipe.Writer.GetMemory(4096); @@ -201,7 +201,7 @@ namespace System.IO.Pipelines.Tests var writeSize = 512; - var pipe = new Pipe(new PipeOptions(pool)); + var pipe = new Pipe(new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline)); await pipe.Writer.WriteAsync(new byte[writeSize]); pipe.Writer.GetMemory(writeSize); diff --git a/src/libraries/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs b/src/libraries/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs index 78645ec..8e6c291 100644 --- a/src/libraries/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs +++ b/src/libraries/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs @@ -17,7 +17,7 @@ namespace System.IO.Pipelines.Tests public PipelineReaderWriterFacts() { _pool = new TestMemoryPool(); - _pipe = new Pipe(new PipeOptions(_pool)); + _pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline)); } public void Dispose() @@ -391,7 +391,7 @@ namespace System.IO.Pipelines.Tests // Write Hello to another pipeline and get the buffer byte[] bytes = Encoding.ASCII.GetBytes("Hello"); - var c2 = new Pipe(new PipeOptions(_pool)); + var c2 = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline)); await c2.Writer.WriteAsync(bytes); ReadResult result = await c2.Reader.ReadAsync(); ReadOnlySequence c2Buffer = result.Buffer; diff --git a/src/libraries/System.IO.Pipelines/tests/PipeResetTests.cs b/src/libraries/System.IO.Pipelines/tests/PipeResetTests.cs index 40659e3..eb9c6ca 100644 --- a/src/libraries/System.IO.Pipelines/tests/PipeResetTests.cs +++ b/src/libraries/System.IO.Pipelines/tests/PipeResetTests.cs @@ -13,7 +13,7 @@ namespace System.IO.Pipelines.Tests public PipeResetTests() { _pool = new TestMemoryPool(); - _pipe = new Pipe(new PipeOptions(_pool)); + _pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline)); } public void Dispose() diff --git a/src/libraries/System.IO.Pipelines/tests/PipeTest.cs b/src/libraries/System.IO.Pipelines/tests/PipeTest.cs index 425d652..b907f75 100644 --- a/src/libraries/System.IO.Pipelines/tests/PipeTest.cs +++ b/src/libraries/System.IO.Pipelines/tests/PipeTest.cs @@ -21,7 +21,9 @@ namespace System.IO.Pipelines.Tests new PipeOptions( _pool, pauseWriterThreshold: pauseWriterThreshold, - resumeWriterThreshold: resumeWriterThreshold + resumeWriterThreshold: resumeWriterThreshold, + readerScheduler: PipeScheduler.Inline, + writerScheduler: PipeScheduler.Inline )); } diff --git a/src/libraries/System.IO.Pipelines/tests/SchedulerFacts.cs b/src/libraries/System.IO.Pipelines/tests/SchedulerFacts.cs index 6cc3507..81dd012 100644 --- a/src/libraries/System.IO.Pipelines/tests/SchedulerFacts.cs +++ b/src/libraries/System.IO.Pipelines/tests/SchedulerFacts.cs @@ -50,16 +50,17 @@ namespace System.IO.Pipelines.Tests } [Fact] - public async Task DefaultReaderSchedulerRunsInline() + public async Task DefaultReaderSchedulerRunsOnThreadPool() { var pipe = new Pipe(); var id = 0; - Func doRead = async () => { + Func doRead = async () => + { ReadResult result = await pipe.Reader.ReadAsync(); - Assert.Equal(Thread.CurrentThread.ManagedThreadId, id); + Assert.True(Thread.CurrentThread.IsThreadPoolThread); pipe.Reader.AdvanceTo(result.Buffer.End, result.Buffer.End); @@ -80,7 +81,7 @@ namespace System.IO.Pipelines.Tests } [Fact] - public async Task DefaultWriterSchedulerRunsInline() + public async Task DefaultWriterSchedulerRunsOnThreadPool() { using (var pool = new TestMemoryPool()) { @@ -98,12 +99,13 @@ namespace System.IO.Pipelines.Tests var id = 0; - Func doWrite = async () => { + Func doWrite = async () => + { await flushAsync; pipe.Writer.Complete(); - Assert.Equal(Thread.CurrentThread.ManagedThreadId, id); + Assert.True(Thread.CurrentThread.IsThreadPoolThread); }; Task writing = doWrite(); @@ -132,6 +134,7 @@ namespace System.IO.Pipelines.Tests pool, resumeWriterThreshold: 32, pauseWriterThreshold: 64, + readerScheduler: PipeScheduler.Inline, writerScheduler: scheduler)); PipeWriter writableBuffer = pipe.Writer.WriteEmpty(64); @@ -139,7 +142,8 @@ namespace System.IO.Pipelines.Tests Assert.False(flushAsync.IsCompleted); - Func doWrite = async () => { + Func doWrite = async () => + { int oid = Thread.CurrentThread.ManagedThreadId; await flushAsync; @@ -171,9 +175,10 @@ namespace System.IO.Pipelines.Tests { using (var scheduler = new ThreadScheduler()) { - var pipe = new Pipe(new PipeOptions(pool, scheduler)); + var pipe = new Pipe(new PipeOptions(pool, scheduler, writerScheduler: PipeScheduler.Inline)); - Func doRead = async () => { + Func doRead = async () => + { int oid = Thread.CurrentThread.ManagedThreadId; ReadResult result = await pipe.Reader.ReadAsync(); @@ -201,7 +206,7 @@ namespace System.IO.Pipelines.Tests [Fact] public async Task ThreadPoolScheduler_SchedulesOnThreadPool() { - var pipe = new Pipe(new PipeOptions(readerScheduler: PipeScheduler.ThreadPool)); + var pipe = new Pipe(new PipeOptions(readerScheduler: PipeScheduler.ThreadPool, writerScheduler: PipeScheduler.Inline)); async Task DoRead() {