}
public partial class PipeOptions
{
- public PipeOptions(System.Buffers.MemoryPool<byte> pool = null, System.IO.Pipelines.PipeScheduler readerScheduler = null, System.IO.Pipelines.PipeScheduler writerScheduler = null, long pauseWriterThreshold = (long)-1, long resumeWriterThreshold = (long)-1, int minimumSegmentSize = -1, bool useSynchronizationContext = true) { }
+ public PipeOptions(System.Buffers.MemoryPool<byte>? pool = null, System.IO.Pipelines.PipeScheduler? readerScheduler = null, System.IO.Pipelines.PipeScheduler? writerScheduler = null, long pauseWriterThreshold = (long)-1, long resumeWriterThreshold = (long)-1, int minimumSegmentSize = -1, bool useSynchronizationContext = true) { }
public static System.IO.Pipelines.PipeOptions Default { get { throw null; } }
public int MinimumSegmentSize { get { throw null; } }
public long PauseWriterThreshold { get { throw null; } }
public abstract void AdvanceTo(System.SequencePosition consumed, System.SequencePosition examined);
public virtual System.IO.Stream AsStream(bool leaveOpen = false) { throw null; }
public abstract void CancelPendingRead();
- public abstract void Complete(System.Exception exception = null);
- public virtual System.Threading.Tasks.ValueTask CompleteAsync(System.Exception exception = null) { throw null; }
+ public abstract void Complete(System.Exception? exception = null);
+ public virtual System.Threading.Tasks.ValueTask CompleteAsync(System.Exception? exception = null) { throw null; }
public virtual System.Threading.Tasks.Task CopyToAsync(System.IO.Pipelines.PipeWriter destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task CopyToAsync(System.IO.Stream destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
- public static System.IO.Pipelines.PipeReader Create(System.IO.Stream stream, System.IO.Pipelines.StreamPipeReaderOptions readerOptions = null) { throw null; }
+ public static System.IO.Pipelines.PipeReader Create(System.IO.Stream stream, System.IO.Pipelines.StreamPipeReaderOptions? readerOptions = null) { throw null; }
[System.ObsoleteAttribute("OnWriterCompleted may not be invoked on all implementations of PipeReader. This will be removed in a future release.")]
- public virtual void OnWriterCompleted(System.Action<System.Exception, object> callback, object state) { }
+ public virtual void OnWriterCompleted(System.Action<System.Exception?, object> callback, object state) { }
public abstract System.Threading.Tasks.ValueTask<System.IO.Pipelines.ReadResult> ReadAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
public abstract bool TryRead(out System.IO.Pipelines.ReadResult result);
}
protected PipeScheduler() { }
public static System.IO.Pipelines.PipeScheduler Inline { get { throw null; } }
public static System.IO.Pipelines.PipeScheduler ThreadPool { get { throw null; } }
- public abstract void Schedule(System.Action<object> action, object state);
+ public abstract void Schedule(System.Action<object?> action, object? state);
}
public abstract partial class PipeWriter : System.Buffers.IBufferWriter<byte>
{
public abstract void Advance(int bytes);
public virtual System.IO.Stream AsStream(bool leaveOpen = false) { throw null; }
public abstract void CancelPendingFlush();
- public abstract void Complete(System.Exception exception = null);
- public virtual System.Threading.Tasks.ValueTask CompleteAsync(System.Exception exception = null) { throw null; }
+ public abstract void Complete(System.Exception? exception = null);
+ public virtual System.Threading.Tasks.ValueTask CompleteAsync(System.Exception? exception = null) { throw null; }
protected internal virtual System.Threading.Tasks.Task CopyFromAsync(System.IO.Stream source, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
- public static System.IO.Pipelines.PipeWriter Create(System.IO.Stream stream, System.IO.Pipelines.StreamPipeWriterOptions writerOptions = null) { throw null; }
+ public static System.IO.Pipelines.PipeWriter Create(System.IO.Stream stream, System.IO.Pipelines.StreamPipeWriterOptions? writerOptions = null) { throw null; }
public abstract System.Threading.Tasks.ValueTask<System.IO.Pipelines.FlushResult> FlushAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
public abstract System.Memory<byte> GetMemory(int sizeHint = 0);
public abstract System.Span<byte> GetSpan(int sizeHint = 0);
[System.ObsoleteAttribute("OnReaderCompleted may not be invoked on all implementations of PipeWriter. This will be removed in a future release.")]
- public virtual void OnReaderCompleted(System.Action<System.Exception, object> callback, object state) { }
+ public virtual void OnReaderCompleted(System.Action<System.Exception?, object> callback, object state) { }
public virtual System.Threading.Tasks.ValueTask<System.IO.Pipelines.FlushResult> WriteAsync(System.ReadOnlyMemory<byte> source, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
}
public readonly partial struct ReadResult
}
public partial class StreamPipeReaderOptions
{
- public StreamPipeReaderOptions(System.Buffers.MemoryPool<byte> pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false) { }
+ public StreamPipeReaderOptions(System.Buffers.MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false) { }
public int BufferSize { get { throw null; } }
public bool LeaveOpen { get { throw null; } }
public int MinimumReadSize { get { throw null; } }
}
public partial class StreamPipeWriterOptions
{
- public StreamPipeWriterOptions(System.Buffers.MemoryPool<byte> pool = null, int minimumBufferSize = -1, bool leaveOpen = false) { }
+ public StreamPipeWriterOptions(System.Buffers.MemoryPool<byte>? pool = null, int minimumBufferSize = -1, bool leaveOpen = false) { }
public bool LeaveOpen { get { throw null; } }
public int MinimumBufferSize { get { throw null; } }
public System.Buffers.MemoryPool<byte> Pool { get { throw null; } }
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<Configurations>netstandard-Debug;netstandard-Release</Configurations>
+ <Nullable>enable</Nullable>
<!-- We only plan to use this ref in netcoreapp. For all other netstandard compatible frameworks we should use the lib
asset instead. -->
<PackageTargetFramework Condition="'$(TargetGroup)' == 'netstandard'">netcoreapp2.0</PackageTargetFramework>
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<Configurations>netcoreapp-Debug;netcoreapp-Release;netcoreapp3.0-Debug;netcoreapp3.0-Release;netstandard-Debug;netstandard-Release</Configurations>
+ <DefineConstants Condition="'$(TargetGroup)'=='netstandard'">$(DefineConstants);INTERNAL_NULLABLE_ATTRIBUTES</DefineConstants>
+ <Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<Compile Include="$(CommonPath)\CoreLib\System\Threading\Tasks\TaskToApm.cs">
<Compile Include="System\IO\Pipelines\StreamExtensions.netstandard.cs" />
<Compile Include="System\IO\Pipelines\ThreadPoolScheduler.netstandard.cs" />
<Compile Include="System\IO\Pipelines\CancellationTokenExtensions.netstandard.cs" />
+ <Compile Include="$(CommonPath)\CoreLib\System\Diagnostics\CodeAnalysis\NullableAttributes.cs" Link="System\Diagnostics\CodeAnalysis\NullableAttributes.cs" />
</ItemGroup>
<ItemGroup>
<Reference Include="System.Buffers" />
{
internal sealed class BufferSegment : ReadOnlySequenceSegment<byte>
{
- private object _memoryOwner;
- private BufferSegment _next;
+ private object? _memoryOwner;
+ private BufferSegment? _next;
private int _end;
/// <summary>
/// working memory. The "active" memory is grown when bytes are copied in, End is increased, and Next is assigned. The "active"
/// memory is shrunk when bytes are consumed, Start is increased, and blocks are returned to the pool.
/// </summary>
- public BufferSegment NextSegment
+ public BufferSegment? NextSegment
{
get => _next;
set
}
else
{
+ Debug.Assert(_memoryOwner is byte[]);
byte[] poolArray = (byte[])_memoryOwner;
ArrayPool<byte>.Shared.Return(poolArray);
}
}
// Exposed for testing
- internal object MemoryOwner => _memoryOwner;
+ internal object? MemoryOwner => _memoryOwner;
public Memory<byte> AvailableMemory { get; private set; }
while (segment.Next != null)
{
+ Debug.Assert(segment.NextSegment != null);
segment.NextSegment.RunningIndex = segment.RunningIndex + segment.Length;
segment = segment.NextSegment;
}
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Text;
+using System.Diagnostics.CodeAnalysis;
namespace System.IO.Pipelines
{
public int Count => _size;
- public bool TryPop(out BufferSegment result)
+ public bool TryPop([NotNullWhen(true)] out BufferSegment? result)
{
int size = _size - 1;
SegmentAsValueType[] array = _array;
{
internal readonly struct CompletionData
{
- public Action<object> Completion { get; }
- public object CompletionState { get; }
- public ExecutionContext ExecutionContext { get; }
- public SynchronizationContext SynchronizationContext { get; }
+ public Action<object?> Completion { get; }
+ public object? CompletionState { get; }
+ public ExecutionContext? ExecutionContext { get; }
+ public SynchronizationContext? SynchronizationContext { get; }
- public CompletionData(Action<object> completion, object completionState, ExecutionContext executionContext, SynchronizationContext synchronizationContext)
+ public CompletionData(Action<object?> completion, object? completionState, ExecutionContext? executionContext, SynchronizationContext? synchronizationContext)
{
Completion = completion;
CompletionState = completionState;
{
internal sealed class InlineScheduler : PipeScheduler
{
- public override void Schedule(Action<object> action, object state)
+ public override void Schedule(Action<object?> action, object? state)
{
action(state);
}
- internal override void UnsafeSchedule(Action<object> action, object state)
+ internal override void UnsafeSchedule(Action<object?> action, object? state)
{
action(state);
}
public override void CancelPendingRead() => _pipe.CancelPendingRead();
- public override void Complete(Exception exception = null) => _pipe.CompleteReader(exception);
+ public override void Complete(Exception? exception = null) => _pipe.CompleteReader(exception);
#pragma warning disable CS0672 // Member overrides obsolete member
- public override void OnWriterCompleted(Action<Exception, object> callback, object state) => _pipe.OnWriterCompleted(callback, state);
+ public override void OnWriterCompleted(Action<Exception?, object?> callback, object? state) => _pipe.OnWriterCompleted(callback, state);
#pragma warning restore CS0672 // Member overrides obsolete member
public ValueTaskSourceStatus GetStatus(short token) => _pipe.GetReadAsyncStatus();
public ReadResult GetResult(short token) => _pipe.GetReadAsyncResult();
- public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) => _pipe.OnReadAsyncCompleted(continuation, state, flags);
+ public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _pipe.OnReadAsyncCompleted(continuation, state, flags);
}
}
}
_pipe = pipe;
}
- public override void Complete(Exception exception = null) => _pipe.CompleteWriter(exception);
+ public override void Complete(Exception? exception = null) => _pipe.CompleteWriter(exception);
public override void CancelPendingFlush() => _pipe.CancelPendingFlush();
#pragma warning disable CS0672 // Member overrides obsolete member
- public override void OnReaderCompleted(Action<Exception, object> callback, object state) => _pipe.OnReaderCompleted(callback, state);
+ public override void OnReaderCompleted(Action<Exception?, object?> callback, object? state) => _pipe.OnReaderCompleted(callback, state);
#pragma warning restore CS0672 // Member overrides obsolete member
public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default) => _pipe.FlushAsync(cancellationToken);
public FlushResult GetResult(short token) => _pipe.GetFlushAsyncResult();
- public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) => _pipe.OnFlushAsyncCompleted(continuation, state, flags);
+ public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _pipe.OnFlushAsyncCompleted(continuation, state, flags);
public override ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
{
internal const int InitialSegmentPoolSize = 16; // 65K
internal const int MaxSegmentPoolSize = 256; // 1MB
- private static readonly Action<object> s_signalReaderAwaitable = state => ((Pipe)state).ReaderCancellationRequested();
- private static readonly Action<object> s_signalWriterAwaitable = state => ((Pipe)state).WriterCancellationRequested();
- private static readonly Action<object> s_invokeCompletionCallbacks = state => ((PipeCompletionCallbacks)state).Execute();
+ private static readonly Action<object?> s_signalReaderAwaitable = state => ((Pipe)state!).ReaderCancellationRequested();
+ private static readonly Action<object?> s_signalWriterAwaitable = state => ((Pipe)state!).WriterCancellationRequested();
+ private static readonly Action<object?> s_invokeCompletionCallbacks = state => ((PipeCompletionCallbacks)state!).Execute();
// These callbacks all point to the same methods but are different delegate types
- private static readonly ContextCallback s_executionContextRawCallback = ExecuteWithoutExecutionContext;
- private static readonly SendOrPostCallback s_syncContextExecutionContextCallback = ExecuteWithExecutionContext;
- private static readonly SendOrPostCallback s_syncContextExecuteWithoutExecutionContextCallback = ExecuteWithoutExecutionContext;
- private static readonly Action<object> s_scheduleWithExecutionContextCallback = ExecuteWithExecutionContext;
+ private static readonly ContextCallback s_executionContextRawCallback = ExecuteWithoutExecutionContext!;
+ private static readonly SendOrPostCallback s_syncContextExecutionContextCallback = ExecuteWithExecutionContext!;
+ private static readonly SendOrPostCallback s_syncContextExecuteWithoutExecutionContextCallback = ExecuteWithoutExecutionContext!;
+ private static readonly Action<object?> s_scheduleWithExecutionContextCallback = ExecuteWithExecutionContext!;
// This sync objects protects the shared state between the writer and reader (most of this class)
private readonly object _sync = new object();
- private readonly MemoryPool<byte> _pool;
+ private readonly MemoryPool<byte>? _pool;
private readonly int _minimumSegmentSize;
private readonly long _pauseWriterThreshold;
private readonly long _resumeWriterThreshold;
private long _lastExaminedIndex = -1;
// The read head which is the start of the PipeReader's consumed bytes
- private BufferSegment _readHead;
+ private BufferSegment? _readHead;
private int _readHeadIndex;
// The extent of the bytes available to the PipeReader to consume
- private BufferSegment _readTail;
+ private BufferSegment? _readTail;
private int _readTailIndex;
// The write head which is the extent of the PipeWriter's written bytes
- private BufferSegment _writingHead;
+ private BufferSegment? _writingHead;
private Memory<byte> _writingHeadMemory;
private int _writingHeadBytesBuffered;
private BufferSegment CreateSegmentUnsynchronized()
{
- if (_bufferSegmentPool.TryPop(out BufferSegment segment))
+ if (_bufferSegmentPool.TryPop(out BufferSegment? segment))
{
return segment;
}
}
// Update the writing head
+ Debug.Assert(_writingHead != null);
_writingHead.End += _writingHeadBytesBuffered;
// Always move the read tail to the write head
Debug.Assert(_writerAwaitable.IsCompleted || _readerAwaitable.IsCompleted);
}
- internal void CompleteWriter(Exception exception)
+ internal void CompleteWriter(Exception? exception)
{
CompletionData completionData;
- PipeCompletionCallbacks completionCallbacks;
+ PipeCompletionCallbacks? completionCallbacks;
bool readerCompleted;
lock (_sync)
// TODO: Use new SequenceMarshal.TryGetReadOnlySequenceSegment to get the correct data
// directly casting only works because the type value in ReadOnlySequenceSegment is 0
- AdvanceReader((BufferSegment)consumed.GetObject(), consumed.GetInteger(), (BufferSegment)examined.GetObject(), examined.GetInteger());
+ AdvanceReader((BufferSegment?)consumed.GetObject(), consumed.GetInteger(), (BufferSegment?)examined.GetObject(), examined.GetInteger());
}
- private void AdvanceReader(BufferSegment consumedSegment, int consumedIndex, BufferSegment examinedSegment, int examinedIndex)
+ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, BufferSegment? examinedSegment, int examinedIndex)
{
// Throw if examined < consumed
if (consumedSegment != null && examinedSegment != null && BufferSegment.GetLength(consumedSegment, consumedIndex, examinedSegment, examinedIndex) < 0)
ThrowHelper.ThrowInvalidOperationException_InvalidExaminedOrConsumedPosition();
}
- BufferSegment returnStart = null;
- BufferSegment returnEnd = null;
+ BufferSegment? returnStart = null;
+ BufferSegment? returnEnd = null;
CompletionData completionData = default;
void MoveReturnEndToNextBlock()
{
- BufferSegment nextBlock = returnEnd.NextSegment;
+ BufferSegment? nextBlock = returnEnd!.NextSegment;
if (_readTail == returnEnd)
{
_readTail = nextBlock;
while (returnStart != null && returnStart != returnEnd)
{
- BufferSegment next = returnStart.NextSegment;
+ BufferSegment? next = returnStart.NextSegment;
returnStart.ResetMemory();
ReturnSegmentUnsynchronized(returnStart);
returnStart = next;
TrySchedule(_writerScheduler, completionData);
}
- internal void CompleteReader(Exception exception)
+ internal void CompleteReader(Exception? exception)
{
- PipeCompletionCallbacks completionCallbacks;
+ PipeCompletionCallbacks? completionCallbacks;
CompletionData completionData;
bool writerCompleted;
TrySchedule(_writerScheduler, completionData);
}
- internal void OnWriterCompleted(Action<Exception, object> callback, object state)
+ internal void OnWriterCompleted(Action<Exception?, object?> callback, object? state)
{
if (callback is null)
{
ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callback);
}
- PipeCompletionCallbacks completionCallbacks;
+ PipeCompletionCallbacks? completionCallbacks;
lock (_sync)
{
completionCallbacks = _writerCompletion.AddCallback(callback, state);
TrySchedule(_writerScheduler, completionData);
}
- internal void OnReaderCompleted(Action<Exception, object> callback, object state)
+ internal void OnReaderCompleted(Action<Exception?, object?> callback, object? state)
{
if (callback is null)
{
ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callback);
}
- PipeCompletionCallbacks completionCallbacks;
+ PipeCompletionCallbacks? completionCallbacks;
lock (_sync)
{
completionCallbacks = _readerCompletion.AddCallback(callback, state);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void TrySchedule(PipeScheduler scheduler, in CompletionData completionData)
{
- Action<object> completion = completionData.Completion;
+ Action<object?> completion = completionData.Completion;
// Nothing to do
if (completion is null)
{
// Return all segments
// if _readHead is null we need to try return _commitHead
// because there might be a block allocated for writing
- BufferSegment segment = _readHead ?? _readTail;
+ BufferSegment? segment = _readHead ?? _readTail;
while (segment != null)
{
BufferSegment returnSegment = segment;
return ValueTaskSourceStatus.Pending;
}
- internal void OnReadAsyncCompleted(Action<object> continuation, object state, ValueTaskSourceOnCompletedFlags flags)
+ internal void OnReadAsyncCompleted(Action<object?> continuation, object? state, ValueTaskSourceOnCompletedFlags flags)
{
CompletionData completionData;
bool doubleCompletion;
bool isCanceled = _readerAwaitable.ObserveCancellation();
// No need to read end if there is no head
- BufferSegment head = _readHead;
+ BufferSegment? head = _readHead;
if (head != null)
{
+ Debug.Assert(_readTail != null);
// Reading commit head shared with writer
var readOnlySequence = new ReadOnlySequence<byte>(head, _readHeadIndex, _readTail, _readTailIndex);
result = new ReadResult(readOnlySequence, isCanceled, isCompleted);
private void WriteMultiSegment(ReadOnlySpan<byte> source)
{
+ Debug.Assert(_writingHead != null);
Span<byte> destination = _writingHeadMemory.Span;
while (true)
}
}
- internal void OnFlushAsyncCompleted(Action<object> continuation, object state, ValueTaskSourceOnCompletedFlags flags)
+ internal void OnFlushAsyncCompleted(Action<object?> continuation, object? state, ValueTaskSourceOnCompletedFlags flags)
{
CompletionData completionData;
bool doubleCompletion;
internal struct PipeAwaitable
{
private AwaitableState _awaitableState;
- private Action<object> _completion;
- private object _completionState;
+ private Action<object?>? _completion;
+ private object? _completionState;
private CancellationTokenRegistration _cancellationTokenRegistration;
- private SynchronizationContext _synchronizationContext;
- private ExecutionContext _executionContext;
+ private SynchronizationContext? _synchronizationContext;
+ private ExecutionContext? _executionContext;
#if !netstandard
private CancellationToken CancellationToken => _cancellationTokenRegistration.Token;
public bool IsRunning => (_awaitableState & AwaitableState.Running) != 0;
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- public void BeginOperation(CancellationToken cancellationToken, Action<object> callback, object state)
+ public void BeginOperation(CancellationToken cancellationToken, Action<object?> callback, object? state)
{
cancellationToken.ThrowIfCancellationRequested();
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ExtractCompletion(out CompletionData completionData)
{
- Action<object> currentCompletion = _completion;
- object currentState = _completionState;
- ExecutionContext executionContext = _executionContext;
- SynchronizationContext synchronizationContext = _synchronizationContext;
+ Action<object?>? currentCompletion = _completion;
+ object? currentState = _completionState;
+ ExecutionContext? executionContext = _executionContext;
+ SynchronizationContext? synchronizationContext = _synchronizationContext;
_completion = null;
_completionState = null;
_awaitableState &= ~AwaitableState.Completed;
}
- public void OnCompleted(Action<object> continuation, object state, ValueTaskSourceOnCompletedFlags flags, out CompletionData completionData, out bool doubleCompletion)
+ public void OnCompleted(Action<object?> continuation, object? state, ValueTaskSourceOnCompletedFlags flags, out CompletionData completionData, out bool doubleCompletion)
{
completionData = default;
doubleCompletion = !ReferenceEquals(_completion, null);
if ((_awaitableState & AwaitableState.UseSynchronizationContext) != 0 &&
(flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0)
{
- SynchronizationContext sc = SynchronizationContext.Current;
+ SynchronizationContext? sc = SynchronizationContext.Current;
if (sc != null && sc.GetType() != typeof(SynchronizationContext))
{
_synchronizationContext = sc;
private const int InitialCallbacksSize = 1;
private bool _isCompleted;
- private ExceptionDispatchInfo _exceptionInfo;
+ private ExceptionDispatchInfo? _exceptionInfo;
private PipeCompletionCallback _firstCallback;
- private PipeCompletionCallback[] _callbacks;
+ private PipeCompletionCallback[]? _callbacks;
private int _callbackCount;
public bool IsCompleted => _isCompleted;
public bool IsFaulted => _exceptionInfo != null;
- public PipeCompletionCallbacks TryComplete(Exception exception = null)
+ public PipeCompletionCallbacks? TryComplete(Exception? exception = null)
{
if (!_isCompleted)
{
return GetCallbacks();
}
- public PipeCompletionCallbacks AddCallback(Action<Exception, object> callback, object state)
+ public PipeCompletionCallbacks? AddCallback(Action<Exception?, object?> callback, object? state)
{
if (_callbackCount == 0)
{
// -1 to adjust for _firstCallback
var callbackIndex = _callbackCount - 1;
_callbackCount++;
+ Debug.Assert(_callbacks != null);
_callbacks[callbackIndex] = new PipeCompletionCallback(callback, state);
}
return true;
}
- private PipeCompletionCallbacks GetCallbacks()
+ private PipeCompletionCallbacks? GetCallbacks()
{
Debug.Assert(IsCompleted);
if (_callbackCount == 0)
[MethodImpl(MethodImplOptions.NoInlining)]
private void ThrowLatchedException()
{
+ Debug.Assert(_exceptionInfo != null);
_exceptionInfo.Throw();
}
{
internal struct PipeCompletionCallback
{
- public Action<Exception, object> Callback;
- public object State;
+ public Action<Exception?, object?> Callback;
+ public object? State;
- public PipeCompletionCallback(Action<Exception, object> callback, object state)
+ public PipeCompletionCallback(Action<Exception?, object?> callback, object? state)
{
Callback = callback;
State = state;
{
private readonly ArrayPool<PipeCompletionCallback> _pool;
private readonly int _count;
- private readonly Exception _exception;
+ private readonly Exception? _exception;
private readonly PipeCompletionCallback _firstCallback;
- private readonly PipeCompletionCallback[] _callbacks;
+ private readonly PipeCompletionCallback[]? _callbacks;
- public PipeCompletionCallbacks(ArrayPool<PipeCompletionCallback> pool, int count, Exception exception, PipeCompletionCallback firstCallback, PipeCompletionCallback[] callbacks)
+ public PipeCompletionCallbacks(ArrayPool<PipeCompletionCallback> pool, int count, Exception? exception, PipeCompletionCallback firstCallback, PipeCompletionCallback[]? callbacks)
{
_pool = pool;
_count = count;
return;
}
- List<Exception> exceptions = null;
+ List<Exception>? exceptions = null;
Execute(_firstCallback, ref exceptions);
}
}
- private void Execute(PipeCompletionCallback callback, ref List<Exception> exceptions)
+ private void Execute(PipeCompletionCallback callback, ref List<Exception>? exceptions)
{
try
{
/// Creates a new instance of <see cref="PipeOptions"/>
/// </summary>
public PipeOptions(
- MemoryPool<byte> pool = null,
- PipeScheduler readerScheduler = null,
- PipeScheduler writerScheduler = null,
+ MemoryPool<byte>? pool = null,
+ PipeScheduler? readerScheduler = null,
+ PipeScheduler? writerScheduler = null,
long pauseWriterThreshold = -1,
long resumeWriterThreshold = -1,
int minimumSegmentSize = -1,
/// </summary>
public abstract partial class PipeReader
{
- private PipeReaderStream _stream;
+ private PipeReaderStream? _stream;
/// <summary>
/// Attempt to synchronously read data the <see cref="PipeReader"/>.
/// Marks the <see cref="PipeReader"/> as being complete, meaning no more data will be read from it.
/// </summary>
/// <param name="exception">Optional <see cref="Exception"/> indicating a failure that's causing the reader to complete.</param>
- public abstract void Complete(Exception exception = null);
+ public abstract void Complete(Exception? exception = null);
/// <summary>
/// Marks the <see cref="PipeReader"/> as being complete, meaning no more data will be read from it.
/// </summary>
/// <param name="exception">Optional <see cref="Exception"/> indicating a failure that's causing the reader to complete.</param>
- public virtual ValueTask CompleteAsync(Exception exception = null)
+ public virtual ValueTask CompleteAsync(Exception? exception = null)
{
try
{
/// Registers a callback that gets executed when the <see cref="PipeWriter"/> side of the pipe is completed
/// </summary>
[Obsolete("OnWriterCompleted may not be invoked on all implementations of PipeReader. This will be removed in a future release.")]
- public virtual void OnWriterCompleted(Action<Exception, object> callback, object state)
+ public virtual void OnWriterCompleted(Action<Exception?, object?> callback, object? state)
{
}
/// <param name="stream">The stream.</param>
/// <param name="readerOptions">The options.</param>
/// <returns>A <see cref="PipeReader"/> that wraps the <see cref="Stream"/>.</returns>
- public static PipeReader Create(Stream stream, StreamPipeReaderOptions readerOptions = null)
+ public static PipeReader Create(Stream stream, StreamPipeReaderOptions? readerOptions = null)
{
return new StreamPipeReader(stream, readerOptions ?? StreamPipeReaderOptions.s_default);
}
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
- public sealed override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) =>
+ public sealed override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
TaskToApm.Begin(ReadAsync(buffer, offset, count, default), callback, state);
public sealed override int EndRead(IAsyncResult asyncResult) =>
/// <summary>
/// Requests <paramref name="action"/> to be run on scheduler with <paramref name="state"/> being passed in
/// </summary>
- public abstract void Schedule(Action<object> action, object state);
+ public abstract void Schedule(Action<object?> action, object? state);
- internal virtual void UnsafeSchedule(Action<object> action, object state)
+ internal virtual void UnsafeSchedule(Action<object?> action, object? state)
=> Schedule(action, state);
}
}
/// </summary>
public abstract partial class PipeWriter : IBufferWriter<byte>
{
- private PipeWriterStream _stream;
+ private PipeWriterStream? _stream;
/// <summary>
/// Marks the <see cref="PipeWriter"/> as being complete, meaning no more data will be written to it.
/// </summary>
/// <param name="exception">Optional <see cref="Exception"/> indicating a failure that's causing the pipeline to complete.</param>
- public abstract void Complete(Exception exception = null);
+ public abstract void Complete(Exception? exception = null);
/// <summary>
/// Marks the <see cref="PipeWriter"/> as being complete, meaning no more data will be written to it.
/// </summary>
/// <param name="exception">Optional <see cref="Exception"/> indicating a failure that's causing the pipeline to complete.</param>
- public virtual ValueTask CompleteAsync(Exception exception = null)
+ public virtual ValueTask CompleteAsync(Exception? exception = null)
{
try
{
/// Registers a callback that gets executed when the <see cref="PipeReader"/> side of the pipe is completed
/// </summary>
[Obsolete("OnReaderCompleted may not be invoked on all implementations of PipeWriter. This will be removed in a future release.")]
- public virtual void OnReaderCompleted(Action<Exception, object> callback, object state)
+ public virtual void OnReaderCompleted(Action<Exception?, object?> callback, object? state)
{
}
/// <param name="stream">The stream.</param>
/// <param name="writerOptions">The options.</param>
/// <returns>A <see cref="PipeWriter"/> that wraps the <see cref="Stream"/>.</returns>
- public static PipeWriter Create(Stream stream, StreamPipeWriterOptions writerOptions = null)
+ public static PipeWriter Create(Stream stream, StreamPipeWriterOptions? writerOptions = null)
{
return new StreamPipeWriter(stream, writerOptions ?? StreamPipeWriterOptions.s_default);
}
public override void SetLength(long value) => throw new NotSupportedException();
- public sealed override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) =>
+ public sealed override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
TaskToApm.Begin(WriteAsync(buffer, offset, count, default), callback, state);
public sealed override void EndWrite(IAsyncResult asyncResult) =>
private readonly int _bufferSize;
private readonly int _minimumReadThreshold;
- private readonly MemoryPool<byte> _pool;
+ private readonly MemoryPool<byte>? _pool;
- private CancellationTokenSource _internalTokenSource;
+ private CancellationTokenSource? _internalTokenSource;
private bool _isReaderCompleted;
private bool _isStreamCompleted;
- private BufferSegment _readHead;
+ private BufferSegment? _readHead;
private int _readIndex;
- private BufferSegment _readTail;
+ private BufferSegment? _readTail;
private long _bufferedBytes;
private bool _examinedEverything;
private readonly object _lock = new object();
{
ThrowIfCompleted();
- AdvanceTo((BufferSegment)consumed.GetObject(), consumed.GetInteger(), (BufferSegment)examined.GetObject(), examined.GetInteger());
+ AdvanceTo((BufferSegment?)consumed.GetObject(), consumed.GetInteger(), (BufferSegment?)examined.GetObject(), examined.GetInteger());
}
- private void AdvanceTo(BufferSegment consumedSegment, int consumedIndex, BufferSegment examinedSegment, int examinedIndex)
+ private void AdvanceTo(BufferSegment? consumedSegment, int consumedIndex, BufferSegment? examinedSegment, int examinedIndex)
{
if (consumedSegment == null || examinedSegment == null)
{
}
BufferSegment returnStart = _readHead;
- BufferSegment returnEnd = consumedSegment;
+ BufferSegment? returnEnd = consumedSegment;
long consumedBytes = BufferSegment.GetLength(returnStart, _readIndex, consumedSegment, consumedIndex);
}
else if (consumedIndex == returnEnd.Length)
{
- BufferSegment nextBlock = returnEnd.NextSegment;
+ BufferSegment? nextBlock = returnEnd.NextSegment;
_readHead = nextBlock;
_readIndex = 0;
returnEnd = nextBlock;
// Remove all blocks that are freed (except the last one)
while (returnStart != returnEnd)
{
- BufferSegment next = returnStart.NextSegment;
+ BufferSegment next = returnStart.NextSegment!;
returnStart.ResetMemory();
ReturnSegmentUnsynchronized(returnStart);
returnStart = next;
}
/// <inheritdoc />
- public override void Complete(Exception exception = null)
+ public override void Complete(Exception? exception = null)
{
if (_isReaderCompleted)
{
_isReaderCompleted = true;
- BufferSegment segment = _readHead;
+ BufferSegment? segment = _readHead;
while (segment != null)
{
BufferSegment returnSegment = segment;
var reg = new CancellationTokenRegistration();
if (cancellationToken.CanBeCanceled)
{
- reg = cancellationToken.UnsafeRegister(state => ((StreamPipeReader)state).Cancel(), this);
+ reg = cancellationToken.UnsafeRegister(state => ((StreamPipeReader)state!).Cancel(), this);
}
using (reg)
{
AllocateReadTail();
- Memory<byte> buffer = _readTail.AvailableMemory.Slice(_readTail.End);
+ Memory<byte> buffer = _readTail!.AvailableMemory.Slice(_readTail.End);
int length = await InnerStream.ReadAsync(buffer, tokenSource.Token).ConfigureAwait(false);
private ReadOnlySequence<byte> GetCurrentReadOnlySequence()
{
+ Debug.Assert(_readHead != null &&_readTail != null);
return new ReadOnlySequence<byte>(_readHead, _readIndex, _readTail, _readTail.End);
}
_readHead = AllocateSegment();
_readTail = _readHead;
}
- else if (_readTail.WritableBytes < _minimumReadThreshold)
+ else
{
- BufferSegment nextSegment = AllocateSegment();
- _readTail.SetNext(nextSegment);
- _readTail = nextSegment;
+ Debug.Assert(_readTail != null);
+ if (_readTail.WritableBytes < _minimumReadThreshold)
+ {
+ BufferSegment nextSegment = AllocateSegment();
+ _readTail.SetNext(nextSegment);
+ _readTail = nextSegment;
+ }
}
}
private BufferSegment CreateSegmentUnsynchronized()
{
- if (_bufferSegmentPool.TryPop(out BufferSegment segment))
+ if (_bufferSegmentPool.TryPop(out BufferSegment? segment))
{
return segment;
}
/// <summary>
/// Creates a new instance of <see cref="StreamPipeReaderOptions"/>.
/// </summary>
- public StreamPipeReaderOptions(MemoryPool<byte> pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false)
+ public StreamPipeReaderOptions(MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false)
{
Pool = pool ?? MemoryPool<byte>.Shared;
// See the LICENSE file in the project root for more information.
using System.Buffers;
+using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
private readonly int _minimumBufferSize;
- private BufferSegment _head;
- private BufferSegment _tail;
+ private BufferSegment? _head;
+ private BufferSegment? _tail;
private Memory<byte> _tailMemory;
private int _tailBytesBuffered;
private int _bytesBuffered;
- private readonly MemoryPool<byte> _pool;
+ private readonly MemoryPool<byte>? _pool;
- private CancellationTokenSource _internalTokenSource;
+ private CancellationTokenSource? _internalTokenSource;
private bool _isCompleted;
private readonly object _lockObject = new object();
}
else
{
+ Debug.Assert(_tail != null);
int bytesLeftInBuffer = _tailMemory.Length;
if (bytesLeftInBuffer == 0 || bytesLeftInBuffer < sizeHint)
private BufferSegment CreateSegmentUnsynchronized()
{
- if (_bufferSegmentPool.TryPop(out BufferSegment segment))
+ if (_bufferSegmentPool.TryPop(out BufferSegment? segment))
{
return segment;
}
}
/// <inheritdoc />
- public override void Complete(Exception exception = null)
+ public override void Complete(Exception? exception = null)
{
if (_isCompleted)
{
}
}
- public override async ValueTask CompleteAsync(Exception exception = null)
+ public override async ValueTask CompleteAsync(Exception? exception = null)
{
if (_isCompleted)
{
var reg = new CancellationTokenRegistration();
if (cancellationToken.CanBeCanceled)
{
- reg = cancellationToken.UnsafeRegister(state => ((StreamPipeWriter)state).Cancel(), this);
+ reg = cancellationToken.UnsafeRegister(state => ((StreamPipeWriter)state!).Cancel(), this);
}
if (_tailBytesBuffered > 0)
{
+ Debug.Assert(_tail != null);
+
// Update any buffered data
_tail.End += _tailBytesBuffered;
_tailBytesBuffered = 0;
CancellationToken localToken = InternalTokenSource.Token;
try
{
- BufferSegment segment = _head;
+ BufferSegment? segment = _head;
while (segment != null)
{
BufferSegment returnSegment = segment;
// and flush the result.
if (_tailBytesBuffered > 0)
{
+ Debug.Assert(_tail != null);
+
// Update any buffered data
_tail.End += _tailBytesBuffered;
_tailBytesBuffered = 0;
}
- BufferSegment segment = _head;
+ BufferSegment? segment = _head;
while (segment != null)
{
BufferSegment returnSegment = segment;
/// <summary>
/// Creates a new instance of <see cref="StreamPipeWriterOptions"/>.
/// </summary>
- public StreamPipeWriterOptions(MemoryPool<byte> pool = null, int minimumBufferSize = -1, bool leaveOpen = false)
+ public StreamPipeWriterOptions(MemoryPool<byte>? pool = null, int minimumBufferSize = -1, bool leaveOpen = false)
{
Pool = pool ?? MemoryPool<byte>.Shared;
{
internal sealed class ThreadPoolScheduler : PipeScheduler
{
- public override void Schedule(Action<object> action, object state)
+ public override void Schedule(Action<object?> action, object? state)
{
System.Threading.ThreadPool.QueueUserWorkItem(action, state, preferLocal: false);
}
- internal override void UnsafeSchedule(Action<object> action, object state)
+ internal override void UnsafeSchedule(Action<object?> action, object? state)
{
System.Threading.ThreadPool.UnsafeQueueUserWorkItem(action, state, preferLocal: false);
}
// See the LICENSE file in the project root for more information.
using System.Runtime.CompilerServices;
+using System.Diagnostics.CodeAnalysis;
namespace System.IO.Pipelines
{
internal static class ThrowHelper
{
+ [DoesNotReturn]
internal static void ThrowArgumentOutOfRangeException(ExceptionArgument argument) => throw CreateArgumentOutOfRangeException(argument);
[MethodImpl(MethodImplOptions.NoInlining)]
private static Exception CreateArgumentOutOfRangeException(ExceptionArgument argument) => new ArgumentOutOfRangeException(argument.ToString());
+ [DoesNotReturn]
internal static void ThrowArgumentNullException(ExceptionArgument argument) => throw CreateArgumentNullException(argument);
[MethodImpl(MethodImplOptions.NoInlining)]
private static Exception CreateArgumentNullException(ExceptionArgument argument) => new ArgumentNullException(argument.ToString());
+ [DoesNotReturn]
public static void ThrowInvalidOperationException_AlreadyReading() => throw CreateInvalidOperationException_AlreadyReading();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_AlreadyReading() => new InvalidOperationException(SR.ReadingIsInProgress);
+ [DoesNotReturn]
public static void ThrowInvalidOperationException_NoReadToComplete() => throw CreateInvalidOperationException_NoReadToComplete();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_NoReadToComplete() => new InvalidOperationException(SR.NoReadingOperationToComplete);
+ [DoesNotReturn]
public static void ThrowInvalidOperationException_NoConcurrentOperation() => throw CreateInvalidOperationException_NoConcurrentOperation();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_NoConcurrentOperation() => new InvalidOperationException(SR.ConcurrentOperationsNotSupported);
+ [DoesNotReturn]
public static void ThrowInvalidOperationException_GetResultNotCompleted() => throw CreateInvalidOperationException_GetResultNotCompleted();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_GetResultNotCompleted() => new InvalidOperationException(SR.GetResultBeforeCompleted);
+ [DoesNotReturn]
public static void ThrowInvalidOperationException_NoWritingAllowed() => throw CreateInvalidOperationException_NoWritingAllowed();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_NoWritingAllowed() => new InvalidOperationException(SR.WritingAfterCompleted);
+ [DoesNotReturn]
public static void ThrowInvalidOperationException_NoReadingAllowed() => throw CreateInvalidOperationException_NoReadingAllowed();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_NoReadingAllowed() => new InvalidOperationException(SR.ReadingAfterCompleted);
+ [DoesNotReturn]
public static void ThrowInvalidOperationException_InvalidExaminedPosition() => throw CreateInvalidOperationException_InvalidExaminedPosition();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_InvalidExaminedPosition() => new InvalidOperationException(SR.InvalidExaminedPosition);
+ [DoesNotReturn]
public static void ThrowInvalidOperationException_InvalidExaminedOrConsumedPosition() => throw CreateInvalidOperationException_InvalidExaminedOrConsumedPosition();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_InvalidExaminedOrConsumedPosition() => new InvalidOperationException(SR.InvalidExaminedOrConsumedPosition);
+ [DoesNotReturn]
public static void ThrowInvalidOperationException_AdvanceToInvalidCursor() => throw CreateInvalidOperationException_AdvanceToInvalidCursor();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_AdvanceToInvalidCursor() => new InvalidOperationException(SR.AdvanceToInvalidCursor);
+ [DoesNotReturn]
public static void ThrowInvalidOperationException_ResetIncompleteReaderWriter() => throw CreateInvalidOperationException_ResetIncompleteReaderWriter();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_ResetIncompleteReaderWriter() => new InvalidOperationException(SR.ReaderAndWriterHasToBeCompleted);
+ [DoesNotReturn]
public static void ThrowOperationCanceledException_ReadCanceled() => throw CreateOperationCanceledException_ReadCanceled();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateOperationCanceledException_ReadCanceled() => new OperationCanceledException(SR.ReadCanceledOnPipeReader);
+ [DoesNotReturn]
public static void ThrowOperationCanceledException_FlushCanceled() => throw CreateOperationCanceledException_FlushCanceled();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateOperationCanceledException_FlushCanceled() => new OperationCanceledException(SR.FlushCanceledOnPipeWriter);
+ [DoesNotReturn]
public static void ThrowInvalidOperationException_InvalidZeroByteRead() => throw CreateInvalidOperationException_InvalidZeroByteRead();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_InvalidZeroByteRead() => new InvalidOperationException(SR.InvalidZeroByteRead);