ValueTask<FlushResult> result;
lock (_sync)
{
- var wasEmpty = CommitUnsynchronized();
+ PrepareFlush(out completionData, out result, cancellationToken);
+ }
- // AttachToken before completing reader awaiter in case cancellationToken is already completed
- _writerAwaitable.BeginOperation(cancellationToken, s_signalWriterAwaitable, this);
+ TrySchedule(_readerScheduler, completionData);
- // If the writer is completed (which it will be most of the time) then return a completed ValueTask
- if (_writerAwaitable.IsCompleted)
- {
- var flushResult = new FlushResult();
- GetFlushResult(ref flushResult);
- result = new ValueTask<FlushResult>(flushResult);
- }
- else
- {
- // Otherwise it's async
- result = new ValueTask<FlushResult>(_writer, token: 0);
- }
+ return result;
+ }
- // Complete reader only if new data was pushed into the pipe
- // Avoid throwing in between completing the reader and scheduling the callback
- // if the intent is to allow pipe to continue reading the data
- if (!wasEmpty)
- {
- _readerAwaitable.Complete(out completionData);
- }
- else
- {
- completionData = default;
- }
+ private void PrepareFlush(out CompletionData completionData, out ValueTask<FlushResult> result, CancellationToken cancellationToken)
+ {
+ var wasEmpty = CommitUnsynchronized();
- // I couldn't find a way for flush to induce backpressure deadlock
- // if it always adds new data to pipe and wakes up the reader but assert anyway
- Debug.Assert(_writerAwaitable.IsCompleted || _readerAwaitable.IsCompleted);
+ // AttachToken before completing reader awaiter in case cancellationToken is already completed
+ _writerAwaitable.BeginOperation(cancellationToken, s_signalWriterAwaitable, this);
+
+ // If the writer is completed (which it will be most of the time) then return a completed ValueTask
+ if (_writerAwaitable.IsCompleted)
+ {
+ var flushResult = new FlushResult();
+ GetFlushResult(ref flushResult);
+ result = new ValueTask<FlushResult>(flushResult);
+ }
+ else
+ {
+ // Otherwise it's async
+ result = new ValueTask<FlushResult>(_writer, token: 0);
}
- TrySchedule(_readerScheduler, completionData);
+ // Complete reader only if new data was pushed into the pipe
+ // Avoid throwing in between completing the reader and scheduling the callback
+ // if the intent is to allow pipe to continue reading the data
+ if (!wasEmpty)
+ {
+ _readerAwaitable.Complete(out completionData);
+ }
+ else
+ {
+ completionData = default;
+ }
- return result;
+ // I couldn't find a way for flush to induce backpressure deadlock
+ // if it always adds new data to pipe and wakes up the reader but assert anyway
+ Debug.Assert(_writerAwaitable.IsCompleted || _readerAwaitable.IsCompleted);
}
internal void CompleteWriter(Exception exception)
ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
}
- // Allocate whatever the pool gives us so we can write, this also marks the
- // state as writing
- AllocateWriteHeadIfNeeded(0);
+ CompletionData completionData;
+ ValueTask<FlushResult> result;
lock (_sync)
{
+ // Allocate whatever the pool gives us so we can write, this also marks the
+ // state as writing
+ AllocateWriteHeadIfNeeded(0);
+
if (source.Length <= _writingHeadMemory.Length)
{
source.CopyTo(_writingHeadMemory);
// This is the multi segment copy
WriteMultiSegment(source.Span);
}
+
+ PrepareFlush(out completionData, out result, cancellationToken);
}
- return FlushAsync(cancellationToken);
+ TrySchedule(_readerScheduler, completionData);
+ return result;
}
private void WriteMultiSegment(ReadOnlySpan<byte> source)