Nullable annotation for System.IO.Pipelines (dotnet/corefx#41407)
authorbuyaa-n <bunamnan@microsoft.com>
Mon, 14 Oct 2019 16:52:26 +0000 (09:52 -0700)
committerGitHub <noreply@github.com>
Mon, 14 Oct 2019 16:52:26 +0000 (09:52 -0700)
* Nullable annotation for System.IO.Pipelines

Commit migrated from https://github.com/dotnet/corefx/commit/48363ac826ccf66fbe31a5dcb1dc2aab9a7dd768

26 files changed:
src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs
src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.csproj
src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegment.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegmentStack.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/CompletionData.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/InlineScheduler.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.DefaultPipeReader.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.DefaultPipeWriter.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeAwaitable.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeCompletion.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeCompletionCallback.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeCompletionCallbacks.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReaderStream.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeScheduler.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriter.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriterStream.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReaderOptions.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriterOptions.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/ThreadPoolScheduler.netcoreapp.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/ThrowHelper.cs

index c90b090..2cc01c3 100644 (file)
@@ -29,7 +29,7 @@ namespace System.IO.Pipelines
     }
     public partial class PipeOptions
     {
-        public PipeOptions(System.Buffers.MemoryPool<byte> pool = null, System.IO.Pipelines.PipeScheduler readerScheduler = null, System.IO.Pipelines.PipeScheduler writerScheduler = null, long pauseWriterThreshold = (long)-1, long resumeWriterThreshold = (long)-1, int minimumSegmentSize = -1, bool useSynchronizationContext = true) { }
+        public PipeOptions(System.Buffers.MemoryPool<byte>? pool = null, System.IO.Pipelines.PipeScheduler? readerScheduler = null, System.IO.Pipelines.PipeScheduler? writerScheduler = null, long pauseWriterThreshold = (long)-1, long resumeWriterThreshold = (long)-1, int minimumSegmentSize = -1, bool useSynchronizationContext = true) { }
         public static System.IO.Pipelines.PipeOptions Default { get { throw null; } }
         public int MinimumSegmentSize { get { throw null; } }
         public long PauseWriterThreshold { get { throw null; } }
@@ -46,13 +46,13 @@ namespace System.IO.Pipelines
         public abstract void AdvanceTo(System.SequencePosition consumed, System.SequencePosition examined);
         public virtual System.IO.Stream AsStream(bool leaveOpen = false) { throw null; }
         public abstract void CancelPendingRead();
-        public abstract void Complete(System.Exception exception = null);
-        public virtual System.Threading.Tasks.ValueTask CompleteAsync(System.Exception exception = null) { throw null; }
+        public abstract void Complete(System.Exception? exception = null);
+        public virtual System.Threading.Tasks.ValueTask CompleteAsync(System.Exception? exception = null) { throw null; }
         public virtual System.Threading.Tasks.Task CopyToAsync(System.IO.Pipelines.PipeWriter destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
         public virtual System.Threading.Tasks.Task CopyToAsync(System.IO.Stream destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
-        public static System.IO.Pipelines.PipeReader Create(System.IO.Stream stream, System.IO.Pipelines.StreamPipeReaderOptions readerOptions = null) { throw null; }
+        public static System.IO.Pipelines.PipeReader Create(System.IO.Stream stream, System.IO.Pipelines.StreamPipeReaderOptions? readerOptions = null) { throw null; }
         [System.ObsoleteAttribute("OnWriterCompleted may not be invoked on all implementations of PipeReader. This will be removed in a future release.")]
-        public virtual void OnWriterCompleted(System.Action<System.Exception, object> callback, object state) { }
+        public virtual void OnWriterCompleted(System.Action<System.Exception?, object> callback, object state) { }
         public abstract System.Threading.Tasks.ValueTask<System.IO.Pipelines.ReadResult> ReadAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
         public abstract bool TryRead(out System.IO.Pipelines.ReadResult result);
     }
@@ -61,7 +61,7 @@ namespace System.IO.Pipelines
         protected PipeScheduler() { }
         public static System.IO.Pipelines.PipeScheduler Inline { get { throw null; } }
         public static System.IO.Pipelines.PipeScheduler ThreadPool { get { throw null; } }
-        public abstract void Schedule(System.Action<object> action, object state);
+        public abstract void Schedule(System.Action<object?> action, object? state);
     }
     public abstract partial class PipeWriter : System.Buffers.IBufferWriter<byte>
     {
@@ -69,15 +69,15 @@ namespace System.IO.Pipelines
         public abstract void Advance(int bytes);
         public virtual System.IO.Stream AsStream(bool leaveOpen = false) { throw null; }
         public abstract void CancelPendingFlush();
-        public abstract void Complete(System.Exception exception = null);
-        public virtual System.Threading.Tasks.ValueTask CompleteAsync(System.Exception exception = null) { throw null; }
+        public abstract void Complete(System.Exception? exception = null);
+        public virtual System.Threading.Tasks.ValueTask CompleteAsync(System.Exception? exception = null) { throw null; }
         protected internal virtual System.Threading.Tasks.Task CopyFromAsync(System.IO.Stream source, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
-        public static System.IO.Pipelines.PipeWriter Create(System.IO.Stream stream, System.IO.Pipelines.StreamPipeWriterOptions writerOptions = null) { throw null; }
+        public static System.IO.Pipelines.PipeWriter Create(System.IO.Stream stream, System.IO.Pipelines.StreamPipeWriterOptions? writerOptions = null) { throw null; }
         public abstract System.Threading.Tasks.ValueTask<System.IO.Pipelines.FlushResult> FlushAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
         public abstract System.Memory<byte> GetMemory(int sizeHint = 0);
         public abstract System.Span<byte> GetSpan(int sizeHint = 0);
         [System.ObsoleteAttribute("OnReaderCompleted may not be invoked on all implementations of PipeWriter. This will be removed in a future release.")]
-        public virtual void OnReaderCompleted(System.Action<System.Exception, object> callback, object state) { }
+        public virtual void OnReaderCompleted(System.Action<System.Exception?, object> callback, object state) { }
         public virtual System.Threading.Tasks.ValueTask<System.IO.Pipelines.FlushResult> WriteAsync(System.ReadOnlyMemory<byte> source, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
     }
     public readonly partial struct ReadResult
@@ -95,7 +95,7 @@ namespace System.IO.Pipelines
     }
     public partial class StreamPipeReaderOptions
     {
-        public StreamPipeReaderOptions(System.Buffers.MemoryPool<byte> pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false) { }
+        public StreamPipeReaderOptions(System.Buffers.MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false) { }
         public int BufferSize { get { throw null; } }
         public bool LeaveOpen { get { throw null; } }
         public int MinimumReadSize { get { throw null; } }
@@ -103,7 +103,7 @@ namespace System.IO.Pipelines
     }
     public partial class StreamPipeWriterOptions
     {
-        public StreamPipeWriterOptions(System.Buffers.MemoryPool<byte> pool = null, int minimumBufferSize = -1, bool leaveOpen = false) { }
+        public StreamPipeWriterOptions(System.Buffers.MemoryPool<byte>? pool = null, int minimumBufferSize = -1, bool leaveOpen = false) { }
         public bool LeaveOpen { get { throw null; } }
         public int MinimumBufferSize { get { throw null; } }
         public System.Buffers.MemoryPool<byte> Pool { get { throw null; } }
index 9cd199e..670a905 100644 (file)
@@ -1,6 +1,7 @@
 <Project Sdk="Microsoft.NET.Sdk">
   <PropertyGroup>
     <Configurations>netstandard-Debug;netstandard-Release</Configurations>
+    <Nullable>enable</Nullable>
     <!-- We only plan to use this ref in netcoreapp. For all other netstandard compatible frameworks we should use the lib
     asset instead. -->
     <PackageTargetFramework Condition="'$(TargetGroup)' == 'netstandard'">netcoreapp2.0</PackageTargetFramework>
index 5df4663..eb00266 100644 (file)
@@ -1,6 +1,8 @@
 <Project Sdk="Microsoft.NET.Sdk">
   <PropertyGroup>
     <Configurations>netcoreapp-Debug;netcoreapp-Release;netcoreapp3.0-Debug;netcoreapp3.0-Release;netstandard-Debug;netstandard-Release</Configurations>
+    <DefineConstants Condition="'$(TargetGroup)'=='netstandard'">$(DefineConstants);INTERNAL_NULLABLE_ATTRIBUTES</DefineConstants>
+    <Nullable>enable</Nullable>
   </PropertyGroup>
   <ItemGroup>
     <Compile Include="$(CommonPath)\CoreLib\System\Threading\Tasks\TaskToApm.cs">
@@ -43,6 +45,7 @@
     <Compile Include="System\IO\Pipelines\StreamExtensions.netstandard.cs" />
     <Compile Include="System\IO\Pipelines\ThreadPoolScheduler.netstandard.cs" />
     <Compile Include="System\IO\Pipelines\CancellationTokenExtensions.netstandard.cs" />
+    <Compile Include="$(CommonPath)\CoreLib\System\Diagnostics\CodeAnalysis\NullableAttributes.cs" Link="System\Diagnostics\CodeAnalysis\NullableAttributes.cs" />
   </ItemGroup>
   <ItemGroup>
     <Reference Include="System.Buffers" />
index 7d0a297..ffc2705 100644 (file)
@@ -10,8 +10,8 @@ namespace System.IO.Pipelines
 {
     internal sealed class BufferSegment : ReadOnlySequenceSegment<byte>
     {
-        private object _memoryOwner;
-        private BufferSegment _next;
+        private object? _memoryOwner;
+        private BufferSegment? _next;
         private int _end;
 
         /// <summary>
@@ -37,7 +37,7 @@ namespace System.IO.Pipelines
         /// working memory. The "active" memory is grown when bytes are copied in, End is increased, and Next is assigned. The "active"
         /// memory is shrunk when bytes are consumed, Start is increased, and blocks are returned to the pool.
         /// </summary>
-        public BufferSegment NextSegment
+        public BufferSegment? NextSegment
         {
             get => _next;
             set
@@ -67,6 +67,7 @@ namespace System.IO.Pipelines
             }
             else
             {
+                Debug.Assert(_memoryOwner is byte[]);
                 byte[] poolArray = (byte[])_memoryOwner;
                 ArrayPool<byte>.Shared.Return(poolArray);
             }
@@ -83,7 +84,7 @@ namespace System.IO.Pipelines
         }
 
         // Exposed for testing
-        internal object MemoryOwner => _memoryOwner;
+        internal object? MemoryOwner => _memoryOwner;
 
         public Memory<byte> AvailableMemory { get; private set; }
 
@@ -106,6 +107,7 @@ namespace System.IO.Pipelines
 
             while (segment.Next != null)
             {
+                Debug.Assert(segment.NextSegment != null);
                 segment.NextSegment.RunningIndex = segment.RunningIndex + segment.Length;
                 segment = segment.NextSegment;
             }
index a2f09fe..51b8573 100644 (file)
@@ -6,6 +6,7 @@ using System;
 using System.Collections.Generic;
 using System.Runtime.CompilerServices;
 using System.Text;
+using System.Diagnostics.CodeAnalysis;
 
 namespace System.IO.Pipelines
 {
@@ -22,7 +23,7 @@ namespace System.IO.Pipelines
 
         public int Count => _size;
 
-        public bool TryPop(out BufferSegment result)
+        public bool TryPop([NotNullWhen(true)] out BufferSegment? result)
         {
             int size = _size - 1;
             SegmentAsValueType[] array = _array;
index 9420ecc..a953563 100644 (file)
@@ -8,12 +8,12 @@ namespace System.IO.Pipelines
 {
     internal readonly struct CompletionData
     {
-        public Action<object> Completion { get; }
-        public object CompletionState { get; }
-        public ExecutionContext ExecutionContext { get; }
-        public SynchronizationContext SynchronizationContext { get; }
+        public Action<object?> Completion { get; }
+        public object? CompletionState { get; }
+        public ExecutionContext? ExecutionContext { get; }
+        public SynchronizationContext? SynchronizationContext { get; }
 
-        public CompletionData(Action<object> completion, object completionState, ExecutionContext executionContext, SynchronizationContext synchronizationContext)
+        public CompletionData(Action<object?> completion, object? completionState, ExecutionContext? executionContext, SynchronizationContext? synchronizationContext)
         {
             Completion = completion;
             CompletionState = completionState;
index e19ca0e..a503ba2 100644 (file)
@@ -6,12 +6,12 @@ namespace System.IO.Pipelines
 {
     internal sealed class InlineScheduler : PipeScheduler
     {
-        public override void Schedule(Action<object> action, object state)
+        public override void Schedule(Action<object?> action, object? state)
         {
             action(state);
         }
 
-        internal override void UnsafeSchedule(Action<object> action, object state)
+        internal override void UnsafeSchedule(Action<object?> action, object? state)
         {
             action(state);
         }
index 2579397..eab23c2 100644 (file)
@@ -32,17 +32,17 @@ namespace System.IO.Pipelines
 
             public override void CancelPendingRead() => _pipe.CancelPendingRead();
 
-            public override void Complete(Exception exception = null) => _pipe.CompleteReader(exception);
+            public override void Complete(Exception? exception = null) => _pipe.CompleteReader(exception);
 
 #pragma warning disable CS0672 // Member overrides obsolete member
-            public override void OnWriterCompleted(Action<Exception, object> callback, object state) => _pipe.OnWriterCompleted(callback, state);
+            public override void OnWriterCompleted(Action<Exception?, object?> callback, object? state) => _pipe.OnWriterCompleted(callback, state);
 #pragma warning restore CS0672 // Member overrides obsolete member
 
             public ValueTaskSourceStatus GetStatus(short token) => _pipe.GetReadAsyncStatus();
 
             public ReadResult GetResult(short token) => _pipe.GetReadAsyncResult();
 
-            public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) => _pipe.OnReadAsyncCompleted(continuation, state, flags);
+            public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _pipe.OnReadAsyncCompleted(continuation, state, flags);
         }
     }
 }
index 94077bb..8e5cf95 100644 (file)
@@ -22,12 +22,12 @@ namespace System.IO.Pipelines
                 _pipe = pipe;
             }
 
-            public override void Complete(Exception exception = null) => _pipe.CompleteWriter(exception);
+            public override void Complete(Exception? exception = null) => _pipe.CompleteWriter(exception);
 
             public override void CancelPendingFlush() => _pipe.CancelPendingFlush();
 
 #pragma warning disable CS0672 // Member overrides obsolete member
-            public override void OnReaderCompleted(Action<Exception, object> callback, object state) => _pipe.OnReaderCompleted(callback, state);
+            public override void OnReaderCompleted(Action<Exception?, object?> callback, object? state) => _pipe.OnReaderCompleted(callback, state);
 #pragma warning restore CS0672 // Member overrides obsolete member
 
             public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default) => _pipe.FlushAsync(cancellationToken);
@@ -42,7 +42,7 @@ namespace System.IO.Pipelines
 
             public FlushResult GetResult(short token) => _pipe.GetFlushAsyncResult();
 
-            public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) => _pipe.OnFlushAsyncCompleted(continuation, state, flags);
+            public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _pipe.OnFlushAsyncCompleted(continuation, state, flags);
 
             public override ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
             {
index 70cd65d..563dd97 100644 (file)
@@ -20,20 +20,20 @@ namespace System.IO.Pipelines
         internal const int InitialSegmentPoolSize = 16; // 65K
         internal const int MaxSegmentPoolSize = 256; // 1MB
 
-        private static readonly Action<object> s_signalReaderAwaitable = state => ((Pipe)state).ReaderCancellationRequested();
-        private static readonly Action<object> s_signalWriterAwaitable = state => ((Pipe)state).WriterCancellationRequested();
-        private static readonly Action<object> s_invokeCompletionCallbacks = state => ((PipeCompletionCallbacks)state).Execute();
+        private static readonly Action<object?> s_signalReaderAwaitable = state => ((Pipe)state!).ReaderCancellationRequested();
+        private static readonly Action<object?> s_signalWriterAwaitable = state => ((Pipe)state!).WriterCancellationRequested();
+        private static readonly Action<object?> s_invokeCompletionCallbacks = state => ((PipeCompletionCallbacks)state!).Execute();
 
         // These callbacks all point to the same methods but are different delegate types
-        private static readonly ContextCallback s_executionContextRawCallback = ExecuteWithoutExecutionContext;
-        private static readonly SendOrPostCallback s_syncContextExecutionContextCallback = ExecuteWithExecutionContext;
-        private static readonly SendOrPostCallback s_syncContextExecuteWithoutExecutionContextCallback = ExecuteWithoutExecutionContext;
-        private static readonly Action<object> s_scheduleWithExecutionContextCallback = ExecuteWithExecutionContext;
+        private static readonly ContextCallback s_executionContextRawCallback = ExecuteWithoutExecutionContext!;
+        private static readonly SendOrPostCallback s_syncContextExecutionContextCallback = ExecuteWithExecutionContext!;
+        private static readonly SendOrPostCallback s_syncContextExecuteWithoutExecutionContextCallback = ExecuteWithoutExecutionContext!;
+        private static readonly Action<object?> s_scheduleWithExecutionContextCallback = ExecuteWithExecutionContext!;
 
         // This sync objects protects the shared state between the writer and reader (most of this class)
         private readonly object _sync = new object();
 
-        private readonly MemoryPool<byte> _pool;
+        private readonly MemoryPool<byte>? _pool;
         private readonly int _minimumSegmentSize;
         private readonly long _pauseWriterThreshold;
         private readonly long _resumeWriterThreshold;
@@ -66,15 +66,15 @@ namespace System.IO.Pipelines
         private long _lastExaminedIndex = -1;
 
         // The read head which is the start of the PipeReader's consumed bytes
-        private BufferSegment _readHead;
+        private BufferSegment? _readHead;
         private int _readHeadIndex;
 
         // The extent of the bytes available to the PipeReader to consume
-        private BufferSegment _readTail;
+        private BufferSegment? _readTail;
         private int _readTailIndex;
 
         // The write head which is the extent of the PipeWriter's written bytes
-        private BufferSegment _writingHead;
+        private BufferSegment? _writingHead;
         private Memory<byte> _writingHeadMemory;
         private int _writingHeadBytesBuffered;
 
@@ -251,7 +251,7 @@ namespace System.IO.Pipelines
 
         private BufferSegment CreateSegmentUnsynchronized()
         {
-            if (_bufferSegmentPool.TryPop(out BufferSegment segment))
+            if (_bufferSegmentPool.TryPop(out BufferSegment? segment))
             {
                 return segment;
             }
@@ -282,6 +282,7 @@ namespace System.IO.Pipelines
             }
 
             // Update the writing head
+            Debug.Assert(_writingHead != null);
             _writingHead.End += _writingHeadBytesBuffered;
 
             // Always move the read tail to the write head
@@ -378,10 +379,10 @@ namespace System.IO.Pipelines
             Debug.Assert(_writerAwaitable.IsCompleted || _readerAwaitable.IsCompleted);
         }
 
-        internal void CompleteWriter(Exception exception)
+        internal void CompleteWriter(Exception? exception)
         {
             CompletionData completionData;
-            PipeCompletionCallbacks completionCallbacks;
+            PipeCompletionCallbacks? completionCallbacks;
             bool readerCompleted;
 
             lock (_sync)
@@ -422,10 +423,10 @@ namespace System.IO.Pipelines
 
             // TODO: Use new SequenceMarshal.TryGetReadOnlySequenceSegment to get the correct data
             // directly casting only works because the type value in ReadOnlySequenceSegment is 0
-            AdvanceReader((BufferSegment)consumed.GetObject(), consumed.GetInteger(), (BufferSegment)examined.GetObject(), examined.GetInteger());
+            AdvanceReader((BufferSegment?)consumed.GetObject(), consumed.GetInteger(), (BufferSegment?)examined.GetObject(), examined.GetInteger());
         }
 
-        private void AdvanceReader(BufferSegment consumedSegment, int consumedIndex, BufferSegment examinedSegment, int examinedIndex)
+        private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, BufferSegment? examinedSegment, int examinedIndex)
         {
             // Throw if examined < consumed
             if (consumedSegment != null && examinedSegment != null && BufferSegment.GetLength(consumedSegment, consumedIndex, examinedSegment, examinedIndex) < 0)
@@ -433,8 +434,8 @@ namespace System.IO.Pipelines
                 ThrowHelper.ThrowInvalidOperationException_InvalidExaminedOrConsumedPosition();
             }
 
-            BufferSegment returnStart = null;
-            BufferSegment returnEnd = null;
+            BufferSegment? returnStart = null;
+            BufferSegment? returnEnd = null;
 
             CompletionData completionData = default;
 
@@ -483,7 +484,7 @@ namespace System.IO.Pipelines
 
                     void MoveReturnEndToNextBlock()
                     {
-                        BufferSegment nextBlock = returnEnd.NextSegment;
+                        BufferSegment? nextBlock = returnEnd!.NextSegment;
                         if (_readTail == returnEnd)
                         {
                             _readTail = nextBlock;
@@ -538,7 +539,7 @@ namespace System.IO.Pipelines
 
                 while (returnStart != null && returnStart != returnEnd)
                 {
-                    BufferSegment next = returnStart.NextSegment;
+                    BufferSegment? next = returnStart.NextSegment;
                     returnStart.ResetMemory();
                     ReturnSegmentUnsynchronized(returnStart);
                     returnStart = next;
@@ -550,9 +551,9 @@ namespace System.IO.Pipelines
             TrySchedule(_writerScheduler, completionData);
         }
 
-        internal void CompleteReader(Exception exception)
+        internal void CompleteReader(Exception? exception)
         {
-            PipeCompletionCallbacks completionCallbacks;
+            PipeCompletionCallbacks? completionCallbacks;
             CompletionData completionData;
             bool writerCompleted;
 
@@ -585,14 +586,14 @@ namespace System.IO.Pipelines
             TrySchedule(_writerScheduler, completionData);
         }
 
-        internal void OnWriterCompleted(Action<Exception, object> callback, object state)
+        internal void OnWriterCompleted(Action<Exception?, object?> callback, object? state)
         {
             if (callback is null)
             {
                 ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callback);
             }
 
-            PipeCompletionCallbacks completionCallbacks;
+            PipeCompletionCallbacks? completionCallbacks;
             lock (_sync)
             {
                 completionCallbacks = _writerCompletion.AddCallback(callback, state);
@@ -624,14 +625,14 @@ namespace System.IO.Pipelines
             TrySchedule(_writerScheduler, completionData);
         }
 
-        internal void OnReaderCompleted(Action<Exception, object> callback, object state)
+        internal void OnReaderCompleted(Action<Exception?, object?> callback, object? state)
         {
             if (callback is null)
             {
                 ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callback);
             }
 
-            PipeCompletionCallbacks completionCallbacks;
+            PipeCompletionCallbacks? completionCallbacks;
             lock (_sync)
             {
                 completionCallbacks = _readerCompletion.AddCallback(callback, state);
@@ -707,7 +708,7 @@ namespace System.IO.Pipelines
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
         private static void TrySchedule(PipeScheduler scheduler, in CompletionData completionData)
         {
-            Action<object> completion = completionData.Completion;
+            Action<object?> completion = completionData.Completion;
             // Nothing to do
             if (completion is null)
             {
@@ -782,7 +783,7 @@ namespace System.IO.Pipelines
                 // Return all segments
                 // if _readHead is null we need to try return _commitHead
                 // because there might be a block allocated for writing
-                BufferSegment segment = _readHead ?? _readTail;
+                BufferSegment? segment = _readHead ?? _readTail;
                 while (segment != null)
                 {
                     BufferSegment returnSegment = segment;
@@ -812,7 +813,7 @@ namespace System.IO.Pipelines
             return ValueTaskSourceStatus.Pending;
         }
 
-        internal void OnReadAsyncCompleted(Action<object> continuation, object state, ValueTaskSourceOnCompletedFlags flags)
+        internal void OnReadAsyncCompleted(Action<object?> continuation, object? state, ValueTaskSourceOnCompletedFlags flags)
         {
             CompletionData completionData;
             bool doubleCompletion;
@@ -860,9 +861,10 @@ namespace System.IO.Pipelines
             bool isCanceled = _readerAwaitable.ObserveCancellation();
 
             // No need to read end if there is no head
-            BufferSegment head = _readHead;
+            BufferSegment? head = _readHead;
             if (head != null)
             {
+                Debug.Assert(_readTail != null);
                 // Reading commit head shared with writer
                 var readOnlySequence = new ReadOnlySequence<byte>(head, _readHeadIndex, _readTail, _readTailIndex);
                 result = new ReadResult(readOnlySequence, isCanceled, isCompleted);
@@ -975,6 +977,7 @@ namespace System.IO.Pipelines
 
         private void WriteMultiSegment(ReadOnlySpan<byte> source)
         {
+            Debug.Assert(_writingHead != null);
             Span<byte> destination = _writingHeadMemory.Span;
 
             while (true)
@@ -1004,7 +1007,7 @@ namespace System.IO.Pipelines
             }
         }
 
-        internal void OnFlushAsyncCompleted(Action<object> continuation, object state, ValueTaskSourceOnCompletedFlags flags)
+        internal void OnFlushAsyncCompleted(Action<object?> continuation, object? state, ValueTaskSourceOnCompletedFlags flags)
         {
             CompletionData completionData;
             bool doubleCompletion;
index 18ccf01..0d0b5ee 100644 (file)
@@ -13,11 +13,11 @@ namespace System.IO.Pipelines
     internal struct PipeAwaitable
     {
         private AwaitableState _awaitableState;
-        private Action<object> _completion;
-        private object _completionState;
+        private Action<object?>? _completion;
+        private object? _completionState;
         private CancellationTokenRegistration _cancellationTokenRegistration;
-        private SynchronizationContext _synchronizationContext;
-        private ExecutionContext _executionContext;
+        private SynchronizationContext? _synchronizationContext;
+        private ExecutionContext? _executionContext;
 
 #if !netstandard
         private CancellationToken CancellationToken => _cancellationTokenRegistration.Token;
@@ -45,7 +45,7 @@ namespace System.IO.Pipelines
         public bool IsRunning => (_awaitableState & AwaitableState.Running) != 0;
 
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
-        public void BeginOperation(CancellationToken cancellationToken, Action<object> callback, object state)
+        public void BeginOperation(CancellationToken cancellationToken, Action<object?> callback, object? state)
         {
             cancellationToken.ThrowIfCancellationRequested();
 
@@ -72,10 +72,10 @@ namespace System.IO.Pipelines
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
         private void ExtractCompletion(out CompletionData completionData)
         {
-            Action<object> currentCompletion = _completion;
-            object currentState = _completionState;
-            ExecutionContext executionContext = _executionContext;
-            SynchronizationContext synchronizationContext = _synchronizationContext;
+            Action<object?>? currentCompletion = _completion;
+            object? currentState = _completionState;
+            ExecutionContext? executionContext = _executionContext;
+            SynchronizationContext? synchronizationContext = _synchronizationContext;
 
             _completion = null;
             _completionState = null;
@@ -98,7 +98,7 @@ namespace System.IO.Pipelines
             _awaitableState &= ~AwaitableState.Completed;
         }
 
-        public void OnCompleted(Action<object> continuation, object state, ValueTaskSourceOnCompletedFlags flags, out CompletionData completionData, out bool doubleCompletion)
+        public void OnCompleted(Action<object?> continuation, object? state, ValueTaskSourceOnCompletedFlags flags, out CompletionData completionData, out bool doubleCompletion)
         {
             completionData = default;
             doubleCompletion = !ReferenceEquals(_completion, null);
@@ -116,7 +116,7 @@ namespace System.IO.Pipelines
             if ((_awaitableState & AwaitableState.UseSynchronizationContext) != 0 &&
                 (flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0)
             {
-                SynchronizationContext sc = SynchronizationContext.Current;
+                SynchronizationContext? sc = SynchronizationContext.Current;
                 if (sc != null && sc.GetType() != typeof(SynchronizationContext))
                 {
                     _synchronizationContext = sc;
index ea3dfbe..3baff04 100644 (file)
@@ -17,17 +17,17 @@ namespace System.IO.Pipelines
         private const int InitialCallbacksSize = 1;
 
         private bool _isCompleted;
-        private ExceptionDispatchInfo _exceptionInfo;
+        private ExceptionDispatchInfo? _exceptionInfo;
 
         private PipeCompletionCallback _firstCallback;
-        private PipeCompletionCallback[] _callbacks;
+        private PipeCompletionCallback[]? _callbacks;
         private int _callbackCount;
 
         public bool IsCompleted => _isCompleted;
 
         public bool IsFaulted => _exceptionInfo != null;
 
-        public PipeCompletionCallbacks TryComplete(Exception exception = null)
+        public PipeCompletionCallbacks? TryComplete(Exception? exception = null)
         {
             if (!_isCompleted)
             {
@@ -40,7 +40,7 @@ namespace System.IO.Pipelines
             return GetCallbacks();
         }
 
-        public PipeCompletionCallbacks AddCallback(Action<Exception, object> callback, object state)
+        public PipeCompletionCallbacks? AddCallback(Action<Exception?, object?> callback, object? state)
         {
             if (_callbackCount == 0)
             {
@@ -54,6 +54,7 @@ namespace System.IO.Pipelines
                 // -1 to adjust for _firstCallback
                 var callbackIndex = _callbackCount - 1;
                 _callbackCount++;
+                Debug.Assert(_callbacks != null);
                 _callbacks[callbackIndex] = new PipeCompletionCallback(callback, state);
             }
 
@@ -99,7 +100,7 @@ namespace System.IO.Pipelines
             return true;
         }
 
-        private PipeCompletionCallbacks GetCallbacks()
+        private PipeCompletionCallbacks? GetCallbacks()
         {
             Debug.Assert(IsCompleted);
             if (_callbackCount == 0)
@@ -130,6 +131,7 @@ namespace System.IO.Pipelines
         [MethodImpl(MethodImplOptions.NoInlining)]
         private void ThrowLatchedException()
         {
+            Debug.Assert(_exceptionInfo != null);
             _exceptionInfo.Throw();
         }
 
index 47c7d7d..6c0da43 100644 (file)
@@ -6,10 +6,10 @@ namespace System.IO.Pipelines
 {
     internal struct PipeCompletionCallback
     {
-        public Action<Exception, object> Callback;
-        public object State;
+        public Action<Exception?, object?> Callback;
+        public object? State;
 
-        public PipeCompletionCallback(Action<Exception, object> callback, object state)
+        public PipeCompletionCallback(Action<Exception?, object?> callback, object? state)
         {
             Callback = callback;
             State = state;
index 25ac895..d6638f6 100644 (file)
@@ -11,11 +11,11 @@ namespace System.IO.Pipelines
     {
         private readonly ArrayPool<PipeCompletionCallback> _pool;
         private readonly int _count;
-        private readonly Exception _exception;
+        private readonly Exception? _exception;
         private readonly PipeCompletionCallback _firstCallback;
-        private readonly PipeCompletionCallback[] _callbacks;
+        private readonly PipeCompletionCallback[]? _callbacks;
 
-        public PipeCompletionCallbacks(ArrayPool<PipeCompletionCallback> pool, int count, Exception exception, PipeCompletionCallback firstCallback, PipeCompletionCallback[] callbacks)
+        public PipeCompletionCallbacks(ArrayPool<PipeCompletionCallback> pool, int count, Exception? exception, PipeCompletionCallback firstCallback, PipeCompletionCallback[]? callbacks)
         {
             _pool = pool;
             _count = count;
@@ -31,7 +31,7 @@ namespace System.IO.Pipelines
                 return;
             }
 
-            List<Exception> exceptions = null;
+            List<Exception>? exceptions = null;
 
             Execute(_firstCallback, ref exceptions);
 
@@ -57,7 +57,7 @@ namespace System.IO.Pipelines
             }
         }
 
-        private void Execute(PipeCompletionCallback callback, ref List<Exception> exceptions)
+        private void Execute(PipeCompletionCallback callback, ref List<Exception>? exceptions)
         {
             try
             {
index 1a1cf03..b6d5cdc 100644 (file)
@@ -27,9 +27,9 @@ namespace System.IO.Pipelines
         /// Creates a new instance of <see cref="PipeOptions"/>
         /// </summary>
         public PipeOptions(
-            MemoryPool<byte> pool = null,
-            PipeScheduler readerScheduler = null,
-            PipeScheduler writerScheduler = null,
+            MemoryPool<byte>? pool = null,
+            PipeScheduler? readerScheduler = null,
+            PipeScheduler? writerScheduler = null,
             long pauseWriterThreshold = -1,
             long resumeWriterThreshold = -1,
             int minimumSegmentSize = -1,
index 6dd5be2..40df398 100644 (file)
@@ -13,7 +13,7 @@ namespace System.IO.Pipelines
     /// </summary>
     public abstract partial class PipeReader
     {
-        private PipeReaderStream _stream;
+        private PipeReaderStream? _stream;
 
         /// <summary>
         /// Attempt to synchronously read data the <see cref="PipeReader"/>.
@@ -78,13 +78,13 @@ namespace System.IO.Pipelines
         /// Marks the <see cref="PipeReader"/> as being complete, meaning no more data will be read from it.
         /// </summary>
         /// <param name="exception">Optional <see cref="Exception"/> indicating a failure that's causing the reader to complete.</param>
-        public abstract void Complete(Exception exception = null);
+        public abstract void Complete(Exception? exception = null);
 
         /// <summary>
         /// Marks the <see cref="PipeReader"/> as being complete, meaning no more data will be read from it.
         /// </summary>
         /// <param name="exception">Optional <see cref="Exception"/> indicating a failure that's causing the reader to complete.</param>
-        public virtual ValueTask CompleteAsync(Exception exception = null)
+        public virtual ValueTask CompleteAsync(Exception? exception = null)
         {
             try
             {
@@ -101,7 +101,7 @@ namespace System.IO.Pipelines
         /// Registers a callback that gets executed when the <see cref="PipeWriter"/> side of the pipe is completed
         /// </summary>
         [Obsolete("OnWriterCompleted may not be invoked on all implementations of PipeReader. This will be removed in a future release.")]
-        public virtual void OnWriterCompleted(Action<Exception, object> callback, object state)
+        public virtual void OnWriterCompleted(Action<Exception?, object?> callback, object? state)
         {
 
         }
@@ -113,7 +113,7 @@ namespace System.IO.Pipelines
         /// <param name="stream">The stream.</param>
         /// <param name="readerOptions">The options.</param>
         /// <returns>A <see cref="PipeReader"/> that wraps the <see cref="Stream"/>.</returns>
-        public static PipeReader Create(Stream stream, StreamPipeReaderOptions readerOptions = null)
+        public static PipeReader Create(Stream stream, StreamPipeReaderOptions? readerOptions = null)
         {
             return new StreamPipeReader(stream, readerOptions ?? StreamPipeReaderOptions.s_default);
         }
index 6c5aaac..ab1025f 100644 (file)
@@ -57,7 +57,7 @@ namespace System.IO.Pipelines
 
         public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
 
-        public sealed override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) =>
+        public sealed override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
             TaskToApm.Begin(ReadAsync(buffer, offset, count, default), callback, state);
 
         public sealed override int EndRead(IAsyncResult asyncResult) =>
index f5217a5..ff0b1c9 100644 (file)
@@ -27,9 +27,9 @@ namespace System.IO.Pipelines
         /// <summary>
         /// Requests <paramref name="action"/> to be run on scheduler with <paramref name="state"/> being passed in
         /// </summary>
-        public abstract void Schedule(Action<object> action, object state);
+        public abstract void Schedule(Action<object?> action, object? state);
 
-        internal virtual void UnsafeSchedule(Action<object> action, object state)
+        internal virtual void UnsafeSchedule(Action<object?> action, object? state)
             => Schedule(action, state);
     }
 }
index 960e257..427bd2e 100644 (file)
@@ -13,19 +13,19 @@ namespace System.IO.Pipelines
     /// </summary>
     public abstract partial class PipeWriter : IBufferWriter<byte>
     {
-        private PipeWriterStream _stream;
+        private PipeWriterStream? _stream;
 
         /// <summary>
         /// Marks the <see cref="PipeWriter"/> as being complete, meaning no more data will be written to it.
         /// </summary>
         /// <param name="exception">Optional <see cref="Exception"/> indicating a failure that's causing the pipeline to complete.</param>
-        public abstract void Complete(Exception exception = null);
+        public abstract void Complete(Exception? exception = null);
 
         /// <summary>
         /// Marks the <see cref="PipeWriter"/> as being complete, meaning no more data will be written to it.
         /// </summary>
         /// <param name="exception">Optional <see cref="Exception"/> indicating a failure that's causing the pipeline to complete.</param>
-        public virtual ValueTask CompleteAsync(Exception exception = null)
+        public virtual ValueTask CompleteAsync(Exception? exception = null)
         {
             try
             {
@@ -47,7 +47,7 @@ namespace System.IO.Pipelines
         /// Registers a callback that gets executed when the <see cref="PipeReader"/> side of the pipe is completed
         /// </summary>
         [Obsolete("OnReaderCompleted may not be invoked on all implementations of PipeWriter. This will be removed in a future release.")]
-        public virtual void OnReaderCompleted(Action<Exception, object> callback, object state)
+        public virtual void OnReaderCompleted(Action<Exception?, object?> callback, object? state)
         {
 
         }
@@ -91,7 +91,7 @@ namespace System.IO.Pipelines
         /// <param name="stream">The stream.</param>
         /// <param name="writerOptions">The options.</param>
         /// <returns>A <see cref="PipeWriter"/> that wraps the <see cref="Stream"/>.</returns>
-        public static PipeWriter Create(Stream stream, StreamPipeWriterOptions writerOptions = null)
+        public static PipeWriter Create(Stream stream, StreamPipeWriterOptions? writerOptions = null)
         {
             return new StreamPipeWriter(stream, writerOptions ?? StreamPipeWriterOptions.s_default);
         }
index 7fe15e7..56a3126 100644 (file)
@@ -51,7 +51,7 @@ namespace System.IO.Pipelines
 
         public override void SetLength(long value) => throw new NotSupportedException();
 
-        public sealed override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) =>
+        public sealed override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
             TaskToApm.Begin(WriteAsync(buffer, offset, count, default), callback, state);
 
         public sealed override void EndWrite(IAsyncResult asyncResult) =>
index 63c158c..cf5cc25 100644 (file)
@@ -16,16 +16,16 @@ namespace System.IO.Pipelines
 
         private readonly int _bufferSize;
         private readonly int _minimumReadThreshold;
-        private readonly MemoryPool<byte> _pool;
+        private readonly MemoryPool<byte>? _pool;
 
-        private CancellationTokenSource _internalTokenSource;
+        private CancellationTokenSource? _internalTokenSource;
         private bool _isReaderCompleted;
         private bool _isStreamCompleted;
 
-        private BufferSegment _readHead;
+        private BufferSegment? _readHead;
         private int _readIndex;
 
-        private BufferSegment _readTail;
+        private BufferSegment? _readTail;
         private long _bufferedBytes;
         private bool _examinedEverything;
         private readonly object _lock = new object();
@@ -86,10 +86,10 @@ namespace System.IO.Pipelines
         {
             ThrowIfCompleted();
 
-            AdvanceTo((BufferSegment)consumed.GetObject(), consumed.GetInteger(), (BufferSegment)examined.GetObject(), examined.GetInteger());
+            AdvanceTo((BufferSegment?)consumed.GetObject(), consumed.GetInteger(), (BufferSegment?)examined.GetObject(), examined.GetInteger());
         }
 
-        private void AdvanceTo(BufferSegment consumedSegment, int consumedIndex, BufferSegment examinedSegment, int examinedIndex)
+        private void AdvanceTo(BufferSegment? consumedSegment, int consumedIndex, BufferSegment? examinedSegment, int examinedIndex)
         {
             if (consumedSegment == null || examinedSegment == null)
             {
@@ -102,7 +102,7 @@ namespace System.IO.Pipelines
             }
 
             BufferSegment returnStart = _readHead;
-            BufferSegment returnEnd = consumedSegment;
+            BufferSegment? returnEnd = consumedSegment;
 
             long consumedBytes = BufferSegment.GetLength(returnStart, _readIndex, consumedSegment, consumedIndex);
 
@@ -135,7 +135,7 @@ namespace System.IO.Pipelines
             }
             else if (consumedIndex == returnEnd.Length)
             {
-                BufferSegment nextBlock = returnEnd.NextSegment;
+                BufferSegment? nextBlock = returnEnd.NextSegment;
                 _readHead = nextBlock;
                 _readIndex = 0;
                 returnEnd = nextBlock;
@@ -149,7 +149,7 @@ namespace System.IO.Pipelines
             // Remove all blocks that are freed (except the last one)
             while (returnStart != returnEnd)
             {
-                BufferSegment next = returnStart.NextSegment;
+                BufferSegment next = returnStart.NextSegment!;
                 returnStart.ResetMemory();
                 ReturnSegmentUnsynchronized(returnStart);
                 returnStart = next;
@@ -163,7 +163,7 @@ namespace System.IO.Pipelines
         }
 
         /// <inheritdoc />
-        public override void Complete(Exception exception = null)
+        public override void Complete(Exception? exception = null)
         {
             if (_isReaderCompleted)
             {
@@ -172,7 +172,7 @@ namespace System.IO.Pipelines
 
             _isReaderCompleted = true;
 
-            BufferSegment segment = _readHead;
+            BufferSegment? segment = _readHead;
             while (segment != null)
             {
                 BufferSegment returnSegment = segment;
@@ -208,7 +208,7 @@ namespace System.IO.Pipelines
             var reg = new CancellationTokenRegistration();
             if (cancellationToken.CanBeCanceled)
             {
-                reg = cancellationToken.UnsafeRegister(state => ((StreamPipeReader)state).Cancel(), this);
+                reg = cancellationToken.UnsafeRegister(state => ((StreamPipeReader)state!).Cancel(), this);
             }
 
             using (reg)
@@ -218,7 +218,7 @@ namespace System.IO.Pipelines
                 {
                     AllocateReadTail();
 
-                    Memory<byte> buffer = _readTail.AvailableMemory.Slice(_readTail.End);
+                    Memory<byte> buffer = _readTail!.AvailableMemory.Slice(_readTail.End);
 
                     int length = await InnerStream.ReadAsync(buffer, tokenSource.Token).ConfigureAwait(false);
 
@@ -297,6 +297,7 @@ namespace System.IO.Pipelines
 
         private ReadOnlySequence<byte> GetCurrentReadOnlySequence()
         {
+            Debug.Assert(_readHead != null &&_readTail != null);
             return new ReadOnlySequence<byte>(_readHead, _readIndex, _readTail, _readTail.End);
         }
 
@@ -308,11 +309,15 @@ namespace System.IO.Pipelines
                 _readHead = AllocateSegment();
                 _readTail = _readHead;
             }
-            else if (_readTail.WritableBytes < _minimumReadThreshold)
+            else
             {
-                BufferSegment nextSegment = AllocateSegment();
-                _readTail.SetNext(nextSegment);
-                _readTail = nextSegment;
+                Debug.Assert(_readTail != null);
+                if (_readTail.WritableBytes < _minimumReadThreshold)
+                {
+                    BufferSegment nextSegment = AllocateSegment();
+                    _readTail.SetNext(nextSegment);
+                    _readTail = nextSegment;
+                }
             }
         }
 
@@ -334,7 +339,7 @@ namespace System.IO.Pipelines
 
         private BufferSegment CreateSegmentUnsynchronized()
         {
-            if (_bufferSegmentPool.TryPop(out BufferSegment segment))
+            if (_bufferSegmentPool.TryPop(out BufferSegment? segment))
             {
                 return segment;
             }
index e554c55..7112c96 100644 (file)
@@ -19,7 +19,7 @@ namespace System.IO.Pipelines
         /// <summary>
         /// Creates a new instance of <see cref="StreamPipeReaderOptions"/>.
         /// </summary>
-        public StreamPipeReaderOptions(MemoryPool<byte> pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false)
+        public StreamPipeReaderOptions(MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false)
         {
             Pool = pool ?? MemoryPool<byte>.Shared;
 
index 8af1c16..dcd2aa0 100644 (file)
@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information.
 
 using System.Buffers;
+using System.Diagnostics;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -15,15 +16,15 @@ namespace System.IO.Pipelines
 
         private readonly int _minimumBufferSize;
 
-        private BufferSegment _head;
-        private BufferSegment _tail;
+        private BufferSegment? _head;
+        private BufferSegment? _tail;
         private Memory<byte> _tailMemory;
         private int _tailBytesBuffered;
         private int _bytesBuffered;
 
-        private readonly MemoryPool<byte> _pool;
+        private readonly MemoryPool<byte>? _pool;
 
-        private CancellationTokenSource _internalTokenSource;
+        private CancellationTokenSource? _internalTokenSource;
         private bool _isCompleted;
         private readonly object _lockObject = new object();
 
@@ -127,6 +128,7 @@ namespace System.IO.Pipelines
             }
             else
             {
+                Debug.Assert(_tail != null);
                 int bytesLeftInBuffer = _tailMemory.Length;
 
                 if (bytesLeftInBuffer == 0 || bytesLeftInBuffer < sizeHint)
@@ -178,7 +180,7 @@ namespace System.IO.Pipelines
 
         private BufferSegment CreateSegmentUnsynchronized()
         {
-            if (_bufferSegmentPool.TryPop(out BufferSegment segment))
+            if (_bufferSegmentPool.TryPop(out BufferSegment? segment))
             {
                 return segment;
             }
@@ -201,7 +203,7 @@ namespace System.IO.Pipelines
         }
 
         /// <inheritdoc />
-        public override void Complete(Exception exception = null)
+        public override void Complete(Exception? exception = null)
         {
             if (_isCompleted)
             {
@@ -220,7 +222,7 @@ namespace System.IO.Pipelines
             }
         }
 
-        public override async ValueTask CompleteAsync(Exception exception = null)
+        public override async ValueTask CompleteAsync(Exception? exception = null)
         {
             if (_isCompleted)
             {
@@ -266,11 +268,13 @@ namespace System.IO.Pipelines
             var reg = new CancellationTokenRegistration();
             if (cancellationToken.CanBeCanceled)
             {
-                reg = cancellationToken.UnsafeRegister(state => ((StreamPipeWriter)state).Cancel(), this);
+                reg = cancellationToken.UnsafeRegister(state => ((StreamPipeWriter)state!).Cancel(), this);
             }
 
             if (_tailBytesBuffered > 0)
             {
+                Debug.Assert(_tail != null);
+
                 // Update any buffered data
                 _tail.End += _tailBytesBuffered;
                 _tailBytesBuffered = 0;
@@ -281,7 +285,7 @@ namespace System.IO.Pipelines
                 CancellationToken localToken = InternalTokenSource.Token;
                 try
                 {
-                    BufferSegment segment = _head;
+                    BufferSegment? segment = _head;
                     while (segment != null)
                     {
                         BufferSegment returnSegment = segment;
@@ -337,12 +341,14 @@ namespace System.IO.Pipelines
             // and flush the result.
             if (_tailBytesBuffered > 0)
             {
+                Debug.Assert(_tail != null);
+
                 // Update any buffered data
                 _tail.End += _tailBytesBuffered;
                 _tailBytesBuffered = 0;
             }
 
-            BufferSegment segment = _head;
+            BufferSegment? segment = _head;
             while (segment != null)
             {
                 BufferSegment returnSegment = segment;
index 347b009..ffe6674 100644 (file)
@@ -18,7 +18,7 @@ namespace System.IO.Pipelines
         /// <summary>
         /// Creates a new instance of <see cref="StreamPipeWriterOptions"/>.
         /// </summary>
-        public StreamPipeWriterOptions(MemoryPool<byte> pool = null, int minimumBufferSize = -1, bool leaveOpen = false)
+        public StreamPipeWriterOptions(MemoryPool<byte>? pool = null, int minimumBufferSize = -1, bool leaveOpen = false)
         {
             Pool = pool ?? MemoryPool<byte>.Shared;
 
index 9466757..4c424f1 100644 (file)
@@ -9,12 +9,12 @@ namespace System.IO.Pipelines
 {
     internal sealed class ThreadPoolScheduler : PipeScheduler
     {
-        public override void Schedule(Action<object> action, object state)
+        public override void Schedule(Action<object?> action, object? state)
         {
             System.Threading.ThreadPool.QueueUserWorkItem(action, state, preferLocal: false);
         }
 
-        internal override void UnsafeSchedule(Action<object> action, object state)
+        internal override void UnsafeSchedule(Action<object?> action, object? state)
         {
             System.Threading.ThreadPool.UnsafeQueueUserWorkItem(action, state, preferLocal: false);
         }
index 3617bba..6a124e5 100644 (file)
@@ -3,68 +3,84 @@
 // See the LICENSE file in the project root for more information.
 
 using System.Runtime.CompilerServices;
+using System.Diagnostics.CodeAnalysis;
 
 namespace System.IO.Pipelines
 {
     internal static class ThrowHelper
     {
+        [DoesNotReturn]
         internal static void ThrowArgumentOutOfRangeException(ExceptionArgument argument) => throw CreateArgumentOutOfRangeException(argument);
         [MethodImpl(MethodImplOptions.NoInlining)]
         private static Exception CreateArgumentOutOfRangeException(ExceptionArgument argument) => new ArgumentOutOfRangeException(argument.ToString());
 
+        [DoesNotReturn]
         internal static void ThrowArgumentNullException(ExceptionArgument argument) => throw CreateArgumentNullException(argument);
         [MethodImpl(MethodImplOptions.NoInlining)]
         private static Exception CreateArgumentNullException(ExceptionArgument argument) => new ArgumentNullException(argument.ToString());
 
+        [DoesNotReturn]
         public static void ThrowInvalidOperationException_AlreadyReading() => throw CreateInvalidOperationException_AlreadyReading();
         [MethodImpl(MethodImplOptions.NoInlining)]
         public static Exception CreateInvalidOperationException_AlreadyReading() => new InvalidOperationException(SR.ReadingIsInProgress);
 
+        [DoesNotReturn]
         public static void ThrowInvalidOperationException_NoReadToComplete() => throw CreateInvalidOperationException_NoReadToComplete();
         [MethodImpl(MethodImplOptions.NoInlining)]
         public static Exception CreateInvalidOperationException_NoReadToComplete() => new InvalidOperationException(SR.NoReadingOperationToComplete);
 
+        [DoesNotReturn]
         public static void ThrowInvalidOperationException_NoConcurrentOperation() => throw CreateInvalidOperationException_NoConcurrentOperation();
         [MethodImpl(MethodImplOptions.NoInlining)]
         public static Exception CreateInvalidOperationException_NoConcurrentOperation() => new InvalidOperationException(SR.ConcurrentOperationsNotSupported);
 
+        [DoesNotReturn]
         public static void ThrowInvalidOperationException_GetResultNotCompleted() => throw CreateInvalidOperationException_GetResultNotCompleted();
         [MethodImpl(MethodImplOptions.NoInlining)]
         public static Exception CreateInvalidOperationException_GetResultNotCompleted() => new InvalidOperationException(SR.GetResultBeforeCompleted);
 
+        [DoesNotReturn]
         public static void ThrowInvalidOperationException_NoWritingAllowed() => throw CreateInvalidOperationException_NoWritingAllowed();
 
         [MethodImpl(MethodImplOptions.NoInlining)]
         public static Exception CreateInvalidOperationException_NoWritingAllowed() => new InvalidOperationException(SR.WritingAfterCompleted);
 
+        [DoesNotReturn]
         public static void ThrowInvalidOperationException_NoReadingAllowed() => throw CreateInvalidOperationException_NoReadingAllowed();
         [MethodImpl(MethodImplOptions.NoInlining)]
         public static Exception CreateInvalidOperationException_NoReadingAllowed() => new InvalidOperationException(SR.ReadingAfterCompleted);
 
+        [DoesNotReturn]
         public static void ThrowInvalidOperationException_InvalidExaminedPosition() => throw CreateInvalidOperationException_InvalidExaminedPosition();
         [MethodImpl(MethodImplOptions.NoInlining)]
         public static Exception CreateInvalidOperationException_InvalidExaminedPosition() => new InvalidOperationException(SR.InvalidExaminedPosition);
 
+        [DoesNotReturn]
         public static void ThrowInvalidOperationException_InvalidExaminedOrConsumedPosition() => throw CreateInvalidOperationException_InvalidExaminedOrConsumedPosition();
         [MethodImpl(MethodImplOptions.NoInlining)]
         public static Exception CreateInvalidOperationException_InvalidExaminedOrConsumedPosition() => new InvalidOperationException(SR.InvalidExaminedOrConsumedPosition);
 
+        [DoesNotReturn]
         public static void ThrowInvalidOperationException_AdvanceToInvalidCursor() => throw CreateInvalidOperationException_AdvanceToInvalidCursor();
         [MethodImpl(MethodImplOptions.NoInlining)]
         public static Exception CreateInvalidOperationException_AdvanceToInvalidCursor() => new InvalidOperationException(SR.AdvanceToInvalidCursor);
 
+        [DoesNotReturn]
         public static void ThrowInvalidOperationException_ResetIncompleteReaderWriter() => throw CreateInvalidOperationException_ResetIncompleteReaderWriter();
         [MethodImpl(MethodImplOptions.NoInlining)]
         public static Exception CreateInvalidOperationException_ResetIncompleteReaderWriter() => new InvalidOperationException(SR.ReaderAndWriterHasToBeCompleted);
 
+        [DoesNotReturn]
         public static void ThrowOperationCanceledException_ReadCanceled() => throw CreateOperationCanceledException_ReadCanceled();
         [MethodImpl(MethodImplOptions.NoInlining)]
         public static Exception CreateOperationCanceledException_ReadCanceled() => new OperationCanceledException(SR.ReadCanceledOnPipeReader);
 
+        [DoesNotReturn]
         public static void ThrowOperationCanceledException_FlushCanceled() => throw CreateOperationCanceledException_FlushCanceled();
         [MethodImpl(MethodImplOptions.NoInlining)]
         public static Exception CreateOperationCanceledException_FlushCanceled() => new OperationCanceledException(SR.FlushCanceledOnPipeWriter);
 
+        [DoesNotReturn]
         public static void ThrowInvalidOperationException_InvalidZeroByteRead() => throw CreateInvalidOperationException_InvalidZeroByteRead();
         [MethodImpl(MethodImplOptions.NoInlining)]
         public static Exception CreateInvalidOperationException_InvalidZeroByteRead() => new InvalidOperationException(SR.InvalidZeroByteRead);