Made a few tweaks to pipelines (dotnet/corefx#27158)
authorDavid Fowler <davidfowl@gmail.com>
Thu, 15 Feb 2018 18:03:02 +0000 (10:03 -0800)
committerGitHub <noreply@github.com>
Thu, 15 Feb 2018 18:03:02 +0000 (10:03 -0800)
* Made a few tweaks to pipelines
- Renamed the file containing the class IDuplexPipe
- Changed the default scheduler to use the ThreadPool if none was specified

Commit migrated from https://github.com/dotnet/corefx/commit/c2b5a8e3d6339a2c863b1cd48b8663273574fe48

12 files changed:
src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/IDuplexPipe.cs [moved from src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/IPipeConnection.cs with 100% similarity]
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs
src/libraries/System.IO.Pipelines/tests/BackpressureTests.cs
src/libraries/System.IO.Pipelines/tests/PipeCompletionCallbacksTests.cs
src/libraries/System.IO.Pipelines/tests/PipeLengthTests.cs
src/libraries/System.IO.Pipelines/tests/PipePoolTests.cs
src/libraries/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs
src/libraries/System.IO.Pipelines/tests/PipeResetTests.cs
src/libraries/System.IO.Pipelines/tests/PipeTest.cs
src/libraries/System.IO.Pipelines/tests/SchedulerFacts.cs

index 61a378d..3e89668 100644 (file)
@@ -11,7 +11,7 @@
     <Compile Include="System\IO\Pipelines\FlushResult.cs" />
     <Compile Include="System\IO\Pipelines\InlineScheduler.cs" />
     <Compile Include="System\IO\Pipelines\IPipeAwaiter.cs" />
-    <Compile Include="System\IO\Pipelines\IPipeConnection.cs" />
+    <Compile Include="System\IO\Pipelines\IDuplexPipe.cs" />
     <Compile Include="System\IO\Pipelines\Pipe.DefaultPipeReader.cs" />
     <Compile Include="System\IO\Pipelines\Pipe.DefaultPipeWriter.cs" />
     <Compile Include="System\IO\Pipelines\Pipe.cs" />
index 31f598a..cc06c46 100644 (file)
@@ -94,8 +94,8 @@ namespace System.IO.Pipelines
             _minimumSegmentSize = options.MinimumSegmentSize;
             _pauseWriterThreshold = options.PauseWriterThreshold;
             _resumeWriterThreshold = options.ResumeWriterThreshold;
-            _readerScheduler = options.ReaderScheduler ?? PipeScheduler.Inline;
-            _writerScheduler = options.WriterScheduler ?? PipeScheduler.Inline;
+            _readerScheduler = options.ReaderScheduler;
+            _writerScheduler = options.WriterScheduler;
             _readerAwaitable = new PipeAwaitable(completed: false);
             _writerAwaitable = new PipeAwaitable(completed: true);
             _reader = new DefaultPipeReader(this);
index 5ee5a80..dbc143a 100644 (file)
@@ -38,8 +38,8 @@ namespace System.IO.Pipelines
             }
 
             Pool = pool ?? MemoryPool<byte>.Shared;
-            ReaderScheduler = readerScheduler;
-            WriterScheduler = writerScheduler;
+            ReaderScheduler = readerScheduler ?? PipeScheduler.ThreadPool;
+            WriterScheduler = writerScheduler ?? PipeScheduler.ThreadPool;
             PauseWriterThreshold = pauseWriterThreshold;
             ResumeWriterThreshold = resumeWriterThreshold;
             MinimumSegmentSize = minimumSegmentSize;
index b0acf91..8812b66 100644 (file)
@@ -12,7 +12,7 @@ namespace System.IO.Pipelines.Tests
         public BackpressureTests()
         {
             _pool = new TestMemoryPool();
-            _pipe = new Pipe(new PipeOptions(_pool, resumeWriterThreshold: 32, pauseWriterThreshold: 64));
+            _pipe = new Pipe(new PipeOptions(_pool, resumeWriterThreshold: 32, pauseWriterThreshold: 64, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
         }
 
         public void Dispose()
index a9ab310..b81f052 100644 (file)
@@ -49,7 +49,7 @@ namespace System.IO.Pipelines.Tests
         public void CompletingReaderFromWriterCallbackWorks()
         {
             var callbackRan = false;
-            var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5));
+            var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
 
             pipe.Writer.OnReaderCompleted((exception, state) => { pipe.Writer.Complete(); }, null);
 
@@ -63,7 +63,7 @@ namespace System.IO.Pipelines.Tests
         public void CompletingWriterFromReaderCallbackWorks()
         {
             var callbackRan = false;
-            var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5));
+            var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
 
             pipe.Reader.OnWriterCompleted((exception, state) => { pipe.Reader.Complete(); }, null);
 
@@ -83,9 +83,10 @@ namespace System.IO.Pipelines.Tests
 
             var counter = 0;
 
-            var pipe = new Pipe(new PipeOptions(_pool));
+            var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
             pipe.Writer.OnReaderCompleted(
-                (exception, state) => {
+                (exception, state) =>
+                {
                     Assert.Equal(callbackState1, state);
                     Assert.Equal(0, counter);
                     counter++;
@@ -93,7 +94,8 @@ namespace System.IO.Pipelines.Tests
                 }, callbackState1);
 
             pipe.Writer.OnReaderCompleted(
-                (exception, state) => {
+                (exception, state) =>
+                {
                     Assert.Equal(callbackState2, state);
                     Assert.Equal(1, counter);
                     counter++;
@@ -111,7 +113,7 @@ namespace System.IO.Pipelines.Tests
         {
             var exception = new Exception();
             var scheduler = new TestScheduler();
-            var pipe = new Pipe(new PipeOptions(_pool, writerScheduler: scheduler));
+            var pipe = new Pipe(new PipeOptions(_pool, writerScheduler: scheduler, readerScheduler: PipeScheduler.Inline));
             pipe.Writer.OnReaderCompleted((e, state) => throw exception, null);
             pipe.Reader.Complete();
 
@@ -125,11 +127,12 @@ namespace System.IO.Pipelines.Tests
         {
             var callbackRan = false;
             var scheduler = new TestScheduler();
-            var pipe = new Pipe(new PipeOptions(_pool, writerScheduler: scheduler));
+            var pipe = new Pipe(new PipeOptions(_pool, writerScheduler: scheduler, readerScheduler: PipeScheduler.Inline));
             pipe.Reader.Complete();
 
             pipe.Writer.OnReaderCompleted(
-                (exception, state) => {
+                (exception, state) =>
+                {
                     Assert.Null(exception);
                     callbackRan = true;
                 }, null);
@@ -143,7 +146,7 @@ namespace System.IO.Pipelines.Tests
         {
             var callbackRan = false;
             var scheduler = new TestScheduler();
-            var pipe = new Pipe(new PipeOptions(_pool, writerScheduler: scheduler));
+            var pipe = new Pipe(new PipeOptions(_pool, writerScheduler: scheduler, readerScheduler: PipeScheduler.Inline));
             pipe.Writer.OnReaderCompleted((exception, state) => { callbackRan = true; }, null);
 
             pipe.Reader.Complete();
@@ -164,11 +167,12 @@ namespace System.IO.Pipelines.Tests
         public void OnReaderCompletedPassesException()
         {
             var callbackRan = false;
-            var pipe = new Pipe(new PipeOptions(_pool));
+            var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
             var readerException = new Exception();
 
             pipe.Writer.OnReaderCompleted(
-                (exception, state) => {
+                (exception, state) =>
+                {
                     callbackRan = true;
                     Assert.Same(readerException, exception);
                 }, null);
@@ -182,9 +186,10 @@ namespace System.IO.Pipelines.Tests
         {
             var callbackRan = false;
             var callbackState = new object();
-            var pipe = new Pipe(new PipeOptions(_pool));
+            var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
             pipe.Writer.OnReaderCompleted(
-                (exception, state) => {
+                (exception, state) =>
+                {
                     Assert.Equal(callbackState, state);
                     callbackRan = true;
                 }, callbackState);
@@ -198,10 +203,11 @@ namespace System.IO.Pipelines.Tests
         {
             var callbackRan = false;
             var continuationRan = false;
-            var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5));
+            var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
 
             pipe.Writer.OnReaderCompleted(
-                (exception, state) => {
+                (exception, state) =>
+                {
                     Assert.False(continuationRan);
                     callbackRan = true;
                 }, null);
@@ -225,23 +231,26 @@ namespace System.IO.Pipelines.Tests
             var callbackState3 = new object();
             var counter = 0;
 
-            var pipe = new Pipe(new PipeOptions(_pool));
+            var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
             pipe.Writer.OnReaderCompleted(
-                (exception, state) => {
+                (exception, state) =>
+                {
                     Assert.Equal(callbackState1, state);
                     Assert.Equal(0, counter);
                     counter++;
                 }, callbackState1);
 
             pipe.Writer.OnReaderCompleted(
-                (exception, state) => {
+                (exception, state) =>
+                {
                     Assert.Equal(callbackState2, state);
                     Assert.Equal(1, counter);
                     counter++;
                 }, callbackState2);
 
             pipe.Writer.OnReaderCompleted(
-                (exception, state) => {
+                (exception, state) =>
+                {
                     Assert.Equal(callbackState3, state);
                     Assert.Equal(2, counter);
                     counter++;
@@ -255,7 +264,7 @@ namespace System.IO.Pipelines.Tests
         [Fact]
         public void OnReaderCompletedThrowsWithNullCallback()
         {
-            var pipe = new Pipe(new PipeOptions(_pool));
+            var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
 
             Assert.Throws<ArgumentNullException>(() => pipe.Writer.OnReaderCompleted(null, null));
         }
@@ -265,7 +274,7 @@ namespace System.IO.Pipelines.Tests
         {
             var callbackRan = false;
             var scheduler = new TestScheduler();
-            var pipe = new Pipe(new PipeOptions(_pool, writerScheduler: scheduler));
+            var pipe = new Pipe(new PipeOptions(_pool, writerScheduler: scheduler, readerScheduler: PipeScheduler.Inline));
             pipe.Writer.OnReaderCompleted((exception, state) => { callbackRan = true; }, null);
             pipe.Reader.Complete();
 
@@ -283,9 +292,10 @@ namespace System.IO.Pipelines.Tests
 
             var counter = 0;
 
-            var pipe = new Pipe(new PipeOptions(_pool));
+            var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
             pipe.Reader.OnWriterCompleted(
-                (exception, state) => {
+                (exception, state) =>
+                {
                     Assert.Equal(callbackState1, state);
                     Assert.Equal(0, counter);
                     counter++;
@@ -293,7 +303,8 @@ namespace System.IO.Pipelines.Tests
                 }, callbackState1);
 
             pipe.Reader.OnWriterCompleted(
-                (exception, state) => {
+                (exception, state) =>
+                {
                     Assert.Equal(callbackState2, state);
                     Assert.Equal(1, counter);
                     counter++;
@@ -311,7 +322,7 @@ namespace System.IO.Pipelines.Tests
         {
             var exception = new Exception();
             var scheduler = new TestScheduler();
-            var pipe = new Pipe(new PipeOptions(_pool, scheduler));
+            var pipe = new Pipe(new PipeOptions(_pool, scheduler, PipeScheduler.Inline));
             pipe.Reader.OnWriterCompleted((e, state) => throw exception, null);
             pipe.Writer.Complete();
 
@@ -325,11 +336,12 @@ namespace System.IO.Pipelines.Tests
         {
             var callbackRan = false;
             var scheduler = new TestScheduler();
-            var pipe = new Pipe(new PipeOptions(_pool, scheduler));
+            var pipe = new Pipe(new PipeOptions(_pool, scheduler, PipeScheduler.Inline));
             pipe.Writer.Complete();
 
             pipe.Reader.OnWriterCompleted(
-                (exception, state) => {
+                (exception, state) =>
+                {
                     Assert.Null(exception);
                     callbackRan = true;
                 }, null);
@@ -343,7 +355,7 @@ namespace System.IO.Pipelines.Tests
         {
             var callbackRan = false;
             var scheduler = new TestScheduler();
-            var pipe = new Pipe(new PipeOptions(_pool, scheduler));
+            var pipe = new Pipe(new PipeOptions(_pool, scheduler, PipeScheduler.Inline));
             pipe.Reader.OnWriterCompleted((exception, state) => { callbackRan = true; }, null);
             pipe.Reader.Complete();
             pipe.Writer.Complete();
@@ -363,11 +375,12 @@ namespace System.IO.Pipelines.Tests
         public void OnWriterCompletedPassesException()
         {
             var callbackRan = false;
-            var pipe = new Pipe(new PipeOptions(_pool));
+            var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
             var readerException = new Exception();
 
             pipe.Reader.OnWriterCompleted(
-                (exception, state) => {
+                (exception, state) =>
+                {
                     callbackRan = true;
                     Assert.Same(readerException, exception);
                 }, null);
@@ -381,9 +394,10 @@ namespace System.IO.Pipelines.Tests
         {
             var callbackRan = false;
             var callbackState = new object();
-            var pipe = new Pipe(new PipeOptions(_pool));
+            var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
             pipe.Reader.OnWriterCompleted(
-                (exception, state) => {
+                (exception, state) =>
+                {
                     Assert.Equal(callbackState, state);
                     callbackRan = true;
                 }, callbackState);
@@ -397,10 +411,11 @@ namespace System.IO.Pipelines.Tests
         {
             var callbackRan = false;
             var continuationRan = false;
-            var pipe = new Pipe(new PipeOptions(_pool));
+            var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
 
             pipe.Reader.OnWriterCompleted(
-                (exception, state) => {
+                (exception, state) =>
+                {
                     callbackRan = true;
                     Assert.False(continuationRan);
                 }, null);
@@ -421,23 +436,26 @@ namespace System.IO.Pipelines.Tests
             var callbackState3 = new object();
             var counter = 0;
 
-            var pipe = new Pipe(new PipeOptions(_pool));
+            var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
             pipe.Reader.OnWriterCompleted(
-                (exception, state) => {
+                (exception, state) =>
+                {
                     Assert.Equal(callbackState1, state);
                     Assert.Equal(0, counter);
                     counter++;
                 }, callbackState1);
 
             pipe.Reader.OnWriterCompleted(
-                (exception, state) => {
+                (exception, state) =>
+                {
                     Assert.Equal(callbackState2, state);
                     Assert.Equal(1, counter);
                     counter++;
                 }, callbackState2);
 
             pipe.Reader.OnWriterCompleted(
-                (exception, state) => {
+                (exception, state) =>
+                {
                     Assert.Equal(callbackState3, state);
                     Assert.Equal(2, counter);
                     counter++;
@@ -451,7 +469,7 @@ namespace System.IO.Pipelines.Tests
         [Fact]
         public void OnWriterCompletedThrowsWithNullCallback()
         {
-            var pipe = new Pipe(new PipeOptions(_pool));
+            var pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
 
             Assert.Throws<ArgumentNullException>(() => pipe.Reader.OnWriterCompleted(null, null));
         }
@@ -461,7 +479,7 @@ namespace System.IO.Pipelines.Tests
         {
             var callbackRan = false;
             var scheduler = new TestScheduler();
-            var pipe = new Pipe(new PipeOptions(_pool, scheduler));
+            var pipe = new Pipe(new PipeOptions(_pool, scheduler, PipeScheduler.Inline));
             pipe.Reader.OnWriterCompleted((exception, state) => { callbackRan = true; }, null);
             pipe.Writer.Complete();
 
index fe413da..d6fe0cb 100644 (file)
@@ -11,7 +11,7 @@ namespace System.IO.Pipelines.Tests
         public PipeLengthTests()
         {
             _pool = new TestMemoryPool();
-            _pipe = new Pipe(new PipeOptions(_pool));
+            _pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
         }
 
         public void Dispose()
index 3a9d2b1..66b9b57 100644 (file)
@@ -91,7 +91,7 @@ namespace System.IO.Pipelines.Tests
 
             var writeSize = 512;
 
-            var pipe = new Pipe(new PipeOptions(pool));
+            var pipe = new Pipe(new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
             while (pool.CurrentlyRentedBlocks != 3)
             {
                 PipeWriter writableBuffer = pipe.Writer.WriteEmpty(writeSize);
@@ -111,7 +111,7 @@ namespace System.IO.Pipelines.Tests
 
             var writeSize = 512;
 
-            var pipe = new Pipe(new PipeOptions(pool));
+            var pipe = new Pipe(new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
 
             // Write two blocks
             Memory<byte> buffer = pipe.Writer.GetMemory(writeSize);
@@ -135,7 +135,7 @@ namespace System.IO.Pipelines.Tests
         {
             var pool = new DisposeTrackingBufferPool();
 
-            var readerWriter = new Pipe(new PipeOptions(pool));
+            var readerWriter = new Pipe(new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
             await readerWriter.Writer.WriteAsync(new byte[] { 1 });
 
             readerWriter.Writer.Complete();
@@ -153,7 +153,7 @@ namespace System.IO.Pipelines.Tests
             var pool = new DisposeTrackingBufferPool();
             var writeSize = 512;
 
-            var pipe = new Pipe(new PipeOptions(pool, minimumSegmentSize: 2020));
+            var pipe = new Pipe(new PipeOptions(pool, minimumSegmentSize: 2020, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
 
             Memory<byte> buffer = pipe.Writer.GetMemory(writeSize);
             int allocatedSize = buffer.Length;
@@ -173,7 +173,7 @@ namespace System.IO.Pipelines.Tests
         public void ReturnsWriteHeadOnComplete()
         {
             var pool = new DisposeTrackingBufferPool();
-            var pipe = new Pipe(new PipeOptions(pool));
+            var pipe = new Pipe(new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
             var memory = pipe.Writer.GetMemory(512);
 
             pipe.Reader.Complete();
@@ -185,7 +185,7 @@ namespace System.IO.Pipelines.Tests
         public void ReturnsWriteHeadWhenRequestingLargerBlock()
         {
             var pool = new DisposeTrackingBufferPool();
-            var pipe = new Pipe(new PipeOptions(pool));
+            var pipe = new Pipe(new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
             var memory = pipe.Writer.GetMemory(512);
             pipe.Writer.GetMemory(4096);
 
@@ -201,7 +201,7 @@ namespace System.IO.Pipelines.Tests
 
             var writeSize = 512;
 
-            var pipe = new Pipe(new PipeOptions(pool));
+            var pipe = new Pipe(new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
             await pipe.Writer.WriteAsync(new byte[writeSize]);
 
             pipe.Writer.GetMemory(writeSize);
index 78645ec..8e6c291 100644 (file)
@@ -17,7 +17,7 @@ namespace System.IO.Pipelines.Tests
         public PipelineReaderWriterFacts()
         {
             _pool = new TestMemoryPool();
-            _pipe = new Pipe(new PipeOptions(_pool));
+            _pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
         }
 
         public void Dispose()
@@ -391,7 +391,7 @@ namespace System.IO.Pipelines.Tests
             // Write Hello to another pipeline and get the buffer
             byte[] bytes = Encoding.ASCII.GetBytes("Hello");
 
-            var c2 = new Pipe(new PipeOptions(_pool));
+            var c2 = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
             await c2.Writer.WriteAsync(bytes);
             ReadResult result = await c2.Reader.ReadAsync();
             ReadOnlySequence<byte> c2Buffer = result.Buffer;
index 40659e3..eb9c6ca 100644 (file)
@@ -13,7 +13,7 @@ namespace System.IO.Pipelines.Tests
         public PipeResetTests()
         {
             _pool = new TestMemoryPool();
-            _pipe = new Pipe(new PipeOptions(_pool));
+            _pipe = new Pipe(new PipeOptions(_pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
         }
 
         public void Dispose()
index 425d652..b907f75 100644 (file)
@@ -21,7 +21,9 @@ namespace System.IO.Pipelines.Tests
                 new PipeOptions(
                     _pool,
                     pauseWriterThreshold: pauseWriterThreshold,
-                    resumeWriterThreshold: resumeWriterThreshold
+                    resumeWriterThreshold: resumeWriterThreshold,
+                    readerScheduler: PipeScheduler.Inline,
+                    writerScheduler: PipeScheduler.Inline
                 ));
         }
 
index 6cc3507..81dd012 100644 (file)
@@ -50,16 +50,17 @@ namespace System.IO.Pipelines.Tests
         }
 
         [Fact]
-        public async Task DefaultReaderSchedulerRunsInline()
+        public async Task DefaultReaderSchedulerRunsOnThreadPool()
         {
             var pipe = new Pipe();
 
             var id = 0;
 
-            Func<Task> doRead = async () => {
+            Func<Task> doRead = async () =>
+            {
                 ReadResult result = await pipe.Reader.ReadAsync();
 
-                Assert.Equal(Thread.CurrentThread.ManagedThreadId, id);
+                Assert.True(Thread.CurrentThread.IsThreadPoolThread);
 
                 pipe.Reader.AdvanceTo(result.Buffer.End, result.Buffer.End);
 
@@ -80,7 +81,7 @@ namespace System.IO.Pipelines.Tests
         }
 
         [Fact]
-        public async Task DefaultWriterSchedulerRunsInline()
+        public async Task DefaultWriterSchedulerRunsOnThreadPool()
         {
             using (var pool = new TestMemoryPool())
             {
@@ -98,12 +99,13 @@ namespace System.IO.Pipelines.Tests
 
                 var id = 0;
 
-                Func<Task> doWrite = async () => {
+                Func<Task> doWrite = async () =>
+                {
                     await flushAsync;
 
                     pipe.Writer.Complete();
 
-                    Assert.Equal(Thread.CurrentThread.ManagedThreadId, id);
+                    Assert.True(Thread.CurrentThread.IsThreadPoolThread);
                 };
 
                 Task writing = doWrite();
@@ -132,6 +134,7 @@ namespace System.IO.Pipelines.Tests
                             pool,
                             resumeWriterThreshold: 32,
                             pauseWriterThreshold: 64,
+                            readerScheduler: PipeScheduler.Inline,
                             writerScheduler: scheduler));
 
                     PipeWriter writableBuffer = pipe.Writer.WriteEmpty(64);
@@ -139,7 +142,8 @@ namespace System.IO.Pipelines.Tests
 
                     Assert.False(flushAsync.IsCompleted);
 
-                    Func<Task> doWrite = async () => {
+                    Func<Task> doWrite = async () =>
+                    {
                         int oid = Thread.CurrentThread.ManagedThreadId;
 
                         await flushAsync;
@@ -171,9 +175,10 @@ namespace System.IO.Pipelines.Tests
             {
                 using (var scheduler = new ThreadScheduler())
                 {
-                    var pipe = new Pipe(new PipeOptions(pool, scheduler));
+                    var pipe = new Pipe(new PipeOptions(pool, scheduler, writerScheduler: PipeScheduler.Inline));
 
-                    Func<Task> doRead = async () => {
+                    Func<Task> doRead = async () =>
+                    {
                         int oid = Thread.CurrentThread.ManagedThreadId;
 
                         ReadResult result = await pipe.Reader.ReadAsync();
@@ -201,7 +206,7 @@ namespace System.IO.Pipelines.Tests
         [Fact]
         public async Task ThreadPoolScheduler_SchedulesOnThreadPool()
         {
-            var pipe = new Pipe(new PipeOptions(readerScheduler: PipeScheduler.ThreadPool));
+            var pipe = new Pipe(new PipeOptions(readerScheduler: PipeScheduler.ThreadPool, writerScheduler: PipeScheduler.Inline));
 
             async Task DoRead()
             {