Fix remaining races in WriteAsync (dotnet/corefx#37414)
author    David Fowler <davidfowl@gmail.com>
          Mon, 6 May 2019 18:10:04 +0000 (11:10 -0700)
committer GitHub <noreply@github.com>
          Mon, 6 May 2019 18:10:04 +0000 (11:10 -0700)
* Fix remaining races in WriteAsync
- Lock everything; this ensures Complete won't interfere with the copying or flushing (see the sketch below)

* Re-enable the CompleteWithLargeWriteThrows test (an illustrative sketch follows the diff)

Commit migrated from https://github.com/dotnet/corefx/commit/5921c1076078e90c3998f41890dddeeb704b790c
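
For context, the heart of the fix is a scope change: WriteAsync used to allocate the write head outside the lock and then defer to FlushAsync, which re-acquired the lock, leaving windows in which Complete could interfere; now allocation, copy, and flush preparation all run under a single acquisition of _sync, and only reader scheduling happens afterwards. Below is a minimal, self-contained sketch of that pattern, not Pipe's actual code: TinyPipe and all of its members are invented for illustration.

using System;
using System.Threading;
using System.Threading.Tasks;

// Toy type illustrating the locking shape WriteAsync moves to in this commit:
// the completion check, buffer allocation, copy, and flush preparation all run
// under the same lock that Complete() takes, and any scheduling work runs only
// after the lock is released. Invented for illustration; not Pipe internals.
internal sealed class TinyPipe
{
    private readonly object _sync = new object();
    private byte[] _buffer = Array.Empty<byte>();
    private int _written;
    private bool _writerCompleted;

    public ValueTask<bool> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();
        bool wakeReader;

        lock (_sync)
        {
            if (_writerCompleted)
            {
                // Complete() also takes _sync, so this check and the copy below
                // can never interleave with it -- the race the commit closes.
                throw new InvalidOperationException("No writing allowed after Complete().");
            }

            // "Allocate the write head" and copy, all under the lock
            // (exact-size growth here; a toy strategy only).
            if (_buffer.Length - _written < source.Length)
            {
                Array.Resize(ref _buffer, _written + source.Length);
            }
            source.Span.CopyTo(_buffer.AsSpan(_written));
            _written += source.Length;

            // "PrepareFlush": decide, still under the lock, whether the reader needs waking.
            wakeReader = source.Length > 0;
        }

        // Like Pipe's TrySchedule call, reader wake-up belongs outside the lock so
        // user continuations never run while _sync is held. Actual signalling is
        // elided; the flag just carries what was computed under the lock.
        return new ValueTask<bool>(wakeReader);
    }

    public void Complete()
    {
        lock (_sync)
        {
            _writerCompleted = true;
        }
    }
}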

src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
src/libraries/System.IO.Pipelines/tests/PipeWriterTests.cs

diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
index 69925aa..dbd5a2d 100644
--- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
+++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
@@ -334,44 +334,49 @@ namespace System.IO.Pipelines
             ValueTask<FlushResult> result;
             lock (_sync)
             {
-                var wasEmpty = CommitUnsynchronized();
+                PrepareFlush(out completionData, out result, cancellationToken);
+            }
 
-                // AttachToken before completing reader awaiter in case cancellationToken is already completed
-                _writerAwaitable.BeginOperation(cancellationToken, s_signalWriterAwaitable, this);
+            TrySchedule(_readerScheduler, completionData);
 
-                // If the writer is completed (which it will be most of the time) then return a completed ValueTask
-                if (_writerAwaitable.IsCompleted)
-                {
-                    var flushResult = new FlushResult();
-                    GetFlushResult(ref flushResult);
-                    result = new ValueTask<FlushResult>(flushResult);
-                }
-                else
-                {
-                    // Otherwise it's async
-                    result = new ValueTask<FlushResult>(_writer, token: 0);
-                }
+            return result;
+        }
 
-                // Complete reader only if new data was pushed into the pipe
-                // Avoid throwing in between completing the reader and scheduling the callback
-                // if the intent is to allow pipe to continue reading the data
-                if (!wasEmpty)
-                {
-                    _readerAwaitable.Complete(out completionData);
-                }
-                else
-                {
-                    completionData = default;
-                }
+        private void PrepareFlush(out CompletionData completionData, out ValueTask<FlushResult> result, CancellationToken cancellationToken)
+        {
+            var wasEmpty = CommitUnsynchronized();
 
-                // I couldn't find a way for flush to induce backpressure deadlock
-                // if it always adds new data to pipe and wakes up the reader but assert anyway
-                Debug.Assert(_writerAwaitable.IsCompleted || _readerAwaitable.IsCompleted);
+            // AttachToken before completing reader awaiter in case cancellationToken is already completed
+            _writerAwaitable.BeginOperation(cancellationToken, s_signalWriterAwaitable, this);
+
+            // If the writer is completed (which it will be most of the time) then return a completed ValueTask
+            if (_writerAwaitable.IsCompleted)
+            {
+                var flushResult = new FlushResult();
+                GetFlushResult(ref flushResult);
+                result = new ValueTask<FlushResult>(flushResult);
+            }
+            else
+            {
+                // Otherwise it's async
+                result = new ValueTask<FlushResult>(_writer, token: 0);
             }
 
-            TrySchedule(_readerScheduler, completionData);
+            // Complete reader only if new data was pushed into the pipe
+            // Avoid throwing in between completing the reader and scheduling the callback
+            // if the intent is to allow pipe to continue reading the data
+            if (!wasEmpty)
+            {
+                _readerAwaitable.Complete(out completionData);
+            }
+            else
+            {
+                completionData = default;
+            }
 
-            return result;
+            // I couldn't find a way for flush to induce backpressure deadlock
+            // if it always adds new data to pipe and wakes up the reader but assert anyway
+            Debug.Assert(_writerAwaitable.IsCompleted || _readerAwaitable.IsCompleted);
         }
 
         internal void CompleteWriter(Exception exception)
@@ -930,12 +935,15 @@ namespace System.IO.Pipelines
                 ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
             }
 
-            // Allocate whatever the pool gives us so we can write, this also marks the
-            // state as writing
-            AllocateWriteHeadIfNeeded(0);
+            CompletionData completionData;
+            ValueTask<FlushResult> result;
 
             lock (_sync)
             {
+                // Allocate whatever the pool gives us so we can write, this also marks the
+                // state as writing
+                AllocateWriteHeadIfNeeded(0);
+
                 if (source.Length <= _writingHeadMemory.Length)
                 {
                     source.CopyTo(_writingHeadMemory);
@@ -947,9 +955,12 @@ namespace System.IO.Pipelines
                     // This is the multi segment copy
                     WriteMultiSegment(source.Span);
                 }
+
+                PrepareFlush(out completionData, out result, cancellationToken);
             }
 
-            return FlushAsync(cancellationToken);
+            TrySchedule(_readerScheduler, completionData);
+            return result;
         }
 
         private void WriteMultiSegment(ReadOnlySpan<byte> source)
diff --git a/src/libraries/System.IO.Pipelines/tests/PipeWriterTests.cs b/src/libraries/System.IO.Pipelines/tests/PipeWriterTests.cs
index 68ad54f..0ae5110 100644
--- a/src/libraries/System.IO.Pipelines/tests/PipeWriterTests.cs
+++ b/src/libraries/System.IO.Pipelines/tests/PipeWriterTests.cs
@@ -233,7 +233,6 @@ namespace System.IO.Pipelines.Tests
             pipe.Reader.Complete();
         }
 
-        [ActiveIssue(37239)]
         [Fact]
         public async Task CompleteWithLargeWriteThrows()
         {
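
The body of the re-enabled CompleteWithLargeWriteThrows test is not shown in the hunk above. As a rough illustration of the kind of race it guards against (a reconstruction for explanation only, not the actual corefx test; the method name, buffer size, and structure are assumptions), a regression test in this spirit races Writer.Complete() against a large WriteAsync and accepts either a clean write or an InvalidOperationException, never a torn copy. It assumes the usual PipeWriterTests context (xunit, System.IO.Pipelines, System.Threading.Tasks):

// Illustrative only -- not the actual CompleteWithLargeWriteThrows body.
[Fact]
public async Task CompletingWriterWhileLargeWriteIsInFlightEitherSucceedsOrThrows()
{
    var pipe = new Pipe();
    pipe.Reader.Complete();

    // Race Complete() against a write much larger than a single pool segment,
    // which exercises the multi-segment copy path now covered by the lock.
    Task completion = Task.Run(() => pipe.Writer.Complete());

    try
    {
        await pipe.Writer.WriteAsync(new byte[1_000_000]);
    }
    catch (InvalidOperationException)
    {
        // Expected when Complete() wins the race: writing after completion throws.
    }

    await completion;
}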