Use IValueTaskSource in PipeStream on Windows (#52695)
authorEmmanuel André <2341261+manandre@users.noreply.github.com>
Thu, 12 Aug 2021 00:59:40 +0000 (02:59 +0200)
committerGitHub <noreply@github.com>
Thu, 12 Aug 2021 00:59:40 +0000 (20:59 -0400)
* Use IValueTaskSource in PipeStream on Windows

* Revise implementation of IValueTaskSources

Better match implementation in RandomAccess

Co-authored-by: Stephen Toub <stoub@microsoft.com>
src/libraries/System.IO.Pipes/src/System.IO.Pipes.csproj
src/libraries/System.IO.Pipes/src/System/IO/Pipes/ConnectionCompletionSource.cs [deleted file]
src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeServerStream.Windows.cs
src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeCompletionSource.cs [deleted file]
src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.ValueTaskSource.cs [new file with mode: 0644]
src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Windows.cs
src/libraries/System.IO.Pipes/src/System/IO/Pipes/ReadWriteCompletionSource.cs [deleted file]
src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.OverlappedValueTaskSource.Windows.cs

index d92b043..3113dd3 100644 (file)
     <Compile Include="Microsoft\Win32\SafeHandles\SafePipeHandle.Windows.cs" />
     <Compile Include="System\IO\Pipes\AnonymousPipeServerStreamAcl.cs" />
     <Compile Include="System\IO\Pipes\AnonymousPipeServerStream.Windows.cs" />
-    <Compile Include="System\IO\Pipes\ConnectionCompletionSource.cs" />
     <Compile Include="System\IO\Pipes\NamedPipeServerStreamAcl.cs" />
     <Compile Include="System\IO\Pipes\NamedPipeClientStream.Windows.cs" />
     <Compile Include="System\IO\Pipes\NamedPipeServerStream.Windows.cs" />
     <Compile Include="System\IO\Pipes\PipeAccessRights.cs" />
     <Compile Include="System\IO\Pipes\PipeAccessRule.cs" />
     <Compile Include="System\IO\Pipes\PipeAuditRule.cs" />
-    <Compile Include="System\IO\Pipes\PipeCompletionSource.cs" />
     <Compile Include="System\IO\Pipes\PipesAclExtensions.cs" />
     <Compile Include="System\IO\Pipes\PipeSecurity.cs" />
+    <Compile Include="System\IO\Pipes\PipeStream.ValueTaskSource.cs" />
     <Compile Include="System\IO\Pipes\PipeStream.Windows.cs" />
-    <Compile Include="System\IO\Pipes\ReadWriteCompletionSource.cs" />
   </ItemGroup>
   <!-- Windows : Win32 only -->
   <ItemGroup Condition="'$(TargetsWindows)' == 'true'">
diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/ConnectionCompletionSource.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/ConnectionCompletionSource.cs
deleted file mode 100644 (file)
index 96d94e0..0000000
+++ /dev/null
@@ -1,45 +0,0 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
-using System.Runtime.ExceptionServices;
-using System.Threading;
-
-namespace System.IO.Pipes
-{
-    internal sealed class ConnectionCompletionSource : PipeCompletionSource<VoidResult>
-    {
-        private readonly NamedPipeServerStream _serverStream;
-
-        // Using RunContinuationsAsynchronously for compat reasons (old API used ThreadPool.QueueUserWorkItem for continuations)
-        internal ConnectionCompletionSource(NamedPipeServerStream server)
-            : base(server._threadPoolBinding!, ReadOnlyMemory<byte>.Empty)
-        {
-            _serverStream = server;
-        }
-
-        internal override void SetCompletedSynchronously()
-        {
-            _serverStream.State = PipeState.Connected;
-            TrySetResult(default(VoidResult));
-        }
-
-        protected override void AsyncCallback(uint errorCode, uint numBytes)
-        {
-            // Special case for when the client has already connected to us.
-            if (errorCode == Interop.Errors.ERROR_PIPE_CONNECTED)
-            {
-                errorCode = 0;
-            }
-
-            base.AsyncCallback(errorCode, numBytes);
-        }
-
-        protected override void HandleError(int errorCode) =>
-            TrySetException(ExceptionDispatchInfo.SetCurrentStackTrace(Win32Marshal.GetExceptionForWin32Error(errorCode)));
-
-        protected override void HandleUnexpectedCancellation() =>
-            TrySetException(ExceptionDispatchInfo.SetCurrentStackTrace(Error.GetOperationAborted()));
-    }
-
-    internal struct VoidResult { }
-}
index 3abd2fe..625d2e3 100644 (file)
@@ -2,8 +2,7 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 
 using System.Diagnostics;
-using System.Diagnostics.CodeAnalysis;
-using System.Runtime.CompilerServices;
+using System.Runtime.ExceptionServices;
 using System.Runtime.InteropServices;
 using System.Security.AccessControl;
 using System.Security.Principal;
@@ -18,6 +17,8 @@ namespace System.IO.Pipes
     /// </summary>
     public sealed partial class NamedPipeServerStream : PipeStream
     {
+        private ConnectionValueTaskSource? _reusableConnectionValueTaskSource; // reusable ConnectionValueTaskSource that is currently NOT being used
+
         internal NamedPipeServerStream(
             string pipeName,
             PipeDirection direction,
@@ -41,6 +42,31 @@ namespace System.IO.Pipes
             Create(pipeName, direction, maxNumberOfServerInstances, transmissionMode, options, inBufferSize, outBufferSize, pipeSecurity, inheritability, additionalAccessRights);
         }
 
+        protected override void Dispose(bool disposing)
+        {
+            try
+            {
+                Interlocked.Exchange(ref _reusableConnectionValueTaskSource, null)?.Dispose();
+            }
+            finally
+            {
+                base.Dispose(disposing);
+            }
+        }
+
+        internal override void TryToReuse(PipeValueTaskSource source)
+        {
+            base.TryToReuse(source);
+
+            if (source is ConnectionValueTaskSource connectionSource)
+            {
+                if (Interlocked.CompareExchange(ref _reusableConnectionValueTaskSource, connectionSource, null) is not null)
+                {
+                    source._preallocatedOverlapped.Dispose();
+                }
+            }
+        }
+
         private void Create(string pipeName, PipeDirection direction, int maxNumberOfServerInstances,
                 PipeTransmissionMode transmissionMode, PipeOptions options, int inBufferSize, int outBufferSize,
                 HandleInheritability inheritability)
@@ -140,7 +166,8 @@ namespace System.IO.Pipes
 
             if (IsAsync)
             {
-                WaitForConnectionCoreAsync(CancellationToken.None).GetAwaiter().GetResult();
+                ValueTask vt = WaitForConnectionCoreAsync(CancellationToken.None);
+                vt.AsTask().GetAwaiter().GetResult();
             }
             else
             {
@@ -180,7 +207,7 @@ namespace System.IO.Pipes
                     this, cancellationToken, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
             }
 
-            return WaitForConnectionCoreAsync(cancellationToken);
+            return WaitForConnectionCoreAsync(cancellationToken).AsTask();
         }
 
         public void Disconnect()
@@ -293,50 +320,52 @@ namespace System.IO.Pipes
         }
 
         // Async version of WaitForConnection.  See the comments above for more info.
-        private unsafe Task WaitForConnectionCoreAsync(CancellationToken cancellationToken)
+        private unsafe ValueTask WaitForConnectionCoreAsync(CancellationToken cancellationToken)
         {
             CheckConnectOperationsServerWithHandle();
+            Debug.Assert(IsAsync);
 
-            if (!IsAsync)
-            {
-                throw new InvalidOperationException(SR.InvalidOperation_PipeNotAsync);
-            }
-
-            var completionSource = new ConnectionCompletionSource(this);
-
-            if (!Interop.Kernel32.ConnectNamedPipe(InternalHandle!, completionSource.Overlapped))
+            ConnectionValueTaskSource? vts = Interlocked.Exchange(ref _reusableConnectionValueTaskSource, null) ?? new ConnectionValueTaskSource(this);
+            try
             {
-                int errorCode = Marshal.GetLastPInvokeError();
-
-                switch (errorCode)
+                vts.PrepareForOperation();
+                if (!Interop.Kernel32.ConnectNamedPipe(InternalHandle!, vts._overlapped))
                 {
-                    case Interop.Errors.ERROR_IO_PENDING:
-                        break;
-
-                    // If we are here then the pipe is already connected, or there was an error
-                    // so we should unpin and free the overlapped.
-                    case Interop.Errors.ERROR_PIPE_CONNECTED:
-                        // IOCompletitionCallback will not be called because we completed synchronously.
-                        completionSource.ReleaseResources();
-                        if (State == PipeState.Connected)
-                        {
-                            throw new InvalidOperationException(SR.InvalidOperation_PipeAlreadyConnected);
-                        }
-                        completionSource.SetCompletedSynchronously();
-
-                        // We return a cached task instead of TaskCompletionSource's Task allowing the GC to collect it.
-                        return Task.CompletedTask;
-
-                    default:
-                        completionSource.ReleaseResources();
-                        throw Win32Marshal.GetExceptionForWin32Error(errorCode);
+                    int errorCode = Marshal.GetLastPInvokeError();
+                    switch (errorCode)
+                    {
+                        case Interop.Errors.ERROR_IO_PENDING:
+                            // Common case: IO was initiated, completion will be handled by callback.
+                            // Register for cancellation now that the operation has been initiated.
+                            vts.RegisterForCancellation(cancellationToken);
+                            break;
+
+                        case Interop.Errors.ERROR_PIPE_CONNECTED:
+                            // If we are here then the pipe is already connected.
+                            // IOCompletitionCallback will not be called because we completed synchronously.
+                            vts.Dispose();
+                            if (State == PipeState.Connected)
+                            {
+                                return ValueTask.FromException(ExceptionDispatchInfo.SetCurrentStackTrace(new InvalidOperationException(SR.InvalidOperation_PipeAlreadyConnected)));
+                            }
+                            State = PipeState.Connected;
+                            return ValueTask.CompletedTask;
+
+                        default:
+                            vts.Dispose();
+                            return ValueTask.FromException(ExceptionDispatchInfo.SetCurrentStackTrace(Win32Marshal.GetExceptionForWin32Error(errorCode)));
+                    }
                 }
             }
+            catch
+            {
+                vts.Dispose();
+                throw;
+            }
 
-            // If we are here then connection is pending.
-            completionSource.RegisterForCancellation(cancellationToken);
-
-            return completionSource.Task;
+            // Completion handled by callback.
+            vts.FinishedScheduling();
+            return new ValueTask(vts, vts.Version);
         }
 
         private void CheckConnectOperationsServerWithHandle()
diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeCompletionSource.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeCompletionSource.cs
deleted file mode 100644 (file)
index dc2180b..0000000
+++ /dev/null
@@ -1,189 +0,0 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
-using System.Buffers;
-using System.Diagnostics;
-using System.Runtime.InteropServices;
-using System.Security;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace System.IO.Pipes
-{
-    internal abstract unsafe class PipeCompletionSource<TResult> : TaskCompletionSource<TResult>
-    {
-        private const int NoResult = 0;
-        private const int ResultSuccess = 1;
-        private const int ResultError = 2;
-        private const int RegisteringCancellation = 4;
-        private const int CompletedCallback = 8;
-
-        private readonly ThreadPoolBoundHandle _threadPoolBinding;
-
-        private CancellationTokenRegistration _cancellationRegistration;
-        private int _errorCode;
-        private NativeOverlapped* _overlapped;
-        private MemoryHandle _pinnedMemory;
-        private int _state;
-
-#if DEBUG
-        private bool _cancellationHasBeenRegistered;
-#endif
-
-        // Using RunContinuationsAsynchronously for compat reasons (old API used ThreadPool.QueueUserWorkItem for continuations)
-        protected PipeCompletionSource(ThreadPoolBoundHandle handle, ReadOnlyMemory<byte> bufferToPin)
-            : base(TaskCreationOptions.RunContinuationsAsynchronously)
-        {
-            Debug.Assert(handle != null, "handle is null");
-
-            _threadPoolBinding = handle;
-            _state = NoResult;
-
-            _pinnedMemory = bufferToPin.Pin();
-            _overlapped = _threadPoolBinding.AllocateNativeOverlapped((errorCode, numBytes, pOverlapped) =>
-            {
-                var completionSource = (PipeCompletionSource<TResult>)ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped)!;
-                Debug.Assert(completionSource.Overlapped == pOverlapped);
-
-                completionSource.AsyncCallback(errorCode, numBytes);
-            }, this, null);
-        }
-
-        internal NativeOverlapped* Overlapped
-        {
-            get { return _overlapped; }
-        }
-
-        internal void RegisterForCancellation(CancellationToken cancellationToken)
-        {
-#if DEBUG
-            Debug.Assert(!_cancellationHasBeenRegistered, "Cannot register for cancellation twice");
-            _cancellationHasBeenRegistered = true;
-#endif
-
-            // Quick check to make sure that the cancellation token supports cancellation, and that the IO hasn't completed
-            if (cancellationToken.CanBeCanceled && Overlapped != null)
-            {
-                // Register the cancellation only if the IO hasn't completed
-                int state = Interlocked.CompareExchange(ref _state, RegisteringCancellation, NoResult);
-                if (state == NoResult)
-                {
-                    // Register the cancellation
-                    _cancellationRegistration = cancellationToken.UnsafeRegister(thisRef => ((PipeCompletionSource<TResult>)thisRef!).Cancel(), this);
-
-                    // Grab the state for case if IO completed while we were setting the registration.
-                    state = Interlocked.Exchange(ref _state, NoResult);
-                }
-                else if (state != CompletedCallback)
-                {
-                    // IO already completed and we have grabbed result state.
-                    // Set NoResult to prevent invocation of CompleteCallback(result state) from AsyncCallback(...)
-                    state = Interlocked.Exchange(ref _state, NoResult);
-                }
-
-                // If we have the result state of completed IO call CompleteCallback(result).
-                // Otherwise IO not completed.
-                if ((state & (ResultSuccess | ResultError)) != 0)
-                {
-                    CompleteCallback(state);
-                }
-            }
-        }
-
-        internal void ReleaseResources()
-        {
-            _cancellationRegistration.Dispose();
-
-            // NOTE: The cancellation must *NOT* be running at this point, or it may observe freed memory
-            // (this is why we disposed the registration above)
-            if (_overlapped != null)
-            {
-                _threadPoolBinding.FreeNativeOverlapped(Overlapped);
-                _overlapped = null;
-            }
-
-            _pinnedMemory.Dispose();
-        }
-
-        internal abstract void SetCompletedSynchronously();
-
-        protected virtual void AsyncCallback(uint errorCode, uint numBytes)
-        {
-            int resultState;
-            if (errorCode == 0)
-            {
-                resultState = ResultSuccess;
-            }
-            else
-            {
-                resultState = ResultError;
-                _errorCode = (int)errorCode;
-            }
-
-            // Store the result so that other threads can observe it
-            // and if no other thread is registering cancellation, continue.
-            // Otherwise CompleteCallback(resultState) will be invoked by RegisterForCancellation().
-            if (Interlocked.Exchange(ref _state, resultState) == NoResult)
-            {
-                // Now try to prevent invocation of CompleteCallback(resultState) from RegisterForCancellation().
-                // Otherwise, thread responsible for registering cancellation stole the result and it will invoke CompleteCallback(resultState).
-                if (Interlocked.Exchange(ref _state, CompletedCallback) != NoResult)
-                {
-                    CompleteCallback(resultState);
-                }
-            }
-        }
-
-        protected abstract void HandleError(int errorCode);
-
-        private void Cancel()
-        {
-            SafeHandle handle = _threadPoolBinding.Handle;
-            NativeOverlapped* overlapped = Overlapped;
-
-            // If the handle is still valid, attempt to cancel the IO
-            if (!handle.IsInvalid && !Interop.Kernel32.CancelIoEx(handle, overlapped))
-            {
-                // This case should not have any consequences although
-                // it will be easier to debug if there exists any special case
-                // we are not aware of.
-                int errorCode = Marshal.GetLastPInvokeError();
-                Debug.WriteLine("CancelIoEx finished with error code {0}.", errorCode);
-            }
-        }
-
-        protected virtual void HandleUnexpectedCancellation() => TrySetCanceled();
-
-        private void CompleteCallback(int resultState)
-        {
-            Debug.Assert(resultState == ResultSuccess || resultState == ResultError, $"Unexpected result state {resultState}");
-            CancellationToken cancellationToken = _cancellationRegistration.Token;
-
-            ReleaseResources();
-
-            if (resultState == ResultError)
-            {
-                if (_errorCode == Interop.Errors.ERROR_OPERATION_ABORTED)
-                {
-                    if (cancellationToken.CanBeCanceled && !cancellationToken.IsCancellationRequested)
-                    {
-                        HandleUnexpectedCancellation();
-                    }
-                    else
-                    {
-                        // otherwise set canceled
-                        TrySetCanceled(cancellationToken);
-                    }
-                }
-                else
-                {
-                    HandleError(_errorCode);
-                }
-            }
-            else
-            {
-                SetCompletedSynchronously();
-            }
-        }
-    }
-}
diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.ValueTaskSource.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.ValueTaskSource.cs
new file mode 100644 (file)
index 0000000..9d1430a
--- /dev/null
@@ -0,0 +1,238 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Buffers;
+using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks.Sources;
+
+namespace System.IO.Pipes
+{
+    public abstract partial class PipeStream : Stream
+    {
+        internal abstract unsafe class PipeValueTaskSource : IValueTaskSource<int>, IValueTaskSource
+        {
+            internal static readonly IOCompletionCallback s_ioCallback = IOCallback;
+
+            internal readonly PreAllocatedOverlapped _preallocatedOverlapped;
+            internal readonly PipeStream _pipeStream;
+            internal MemoryHandle _memoryHandle;
+            internal ManualResetValueTaskSourceCore<int> _source; // mutable struct; do not make this readonly
+            internal NativeOverlapped* _overlapped;
+            internal CancellationTokenRegistration _cancellationRegistration;
+            /// <summary>
+            /// 0 when the operation hasn't been scheduled, non-zero when either the operation has completed,
+            /// in which case its value is a packed combination of the error code and number of bytes, or when
+            /// the read/write call has finished scheduling the async operation.
+            /// </summary>
+            internal ulong _result;
+
+            protected PipeValueTaskSource(PipeStream pipeStream)
+            {
+                _pipeStream = pipeStream;
+                _source.RunContinuationsAsynchronously = true;
+                _preallocatedOverlapped = new PreAllocatedOverlapped(s_ioCallback, this, null);
+            }
+
+            internal void Dispose()
+            {
+                ReleaseResources();
+                _preallocatedOverlapped.Dispose();
+            }
+
+            internal void PrepareForOperation(ReadOnlyMemory<byte> memory = default)
+            {
+                _result = 0;
+                _memoryHandle = memory.Pin();
+                _overlapped = _pipeStream._threadPoolBinding!.AllocateNativeOverlapped(_preallocatedOverlapped);
+            }
+
+            public ValueTaskSourceStatus GetStatus(short token) => _source.GetStatus(token);
+            public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _source.OnCompleted(continuation, state, token, flags);
+            void IValueTaskSource.GetResult(short token) => GetResult(token);
+            public int GetResult(short token)
+            {
+                try
+                {
+                    return _source.GetResult(token);
+                }
+                finally
+                {
+                    // The instance is ready to be reused
+                    _pipeStream.TryToReuse(this);
+                }
+            }
+
+            internal short Version => _source.Version;
+
+            internal void RegisterForCancellation(CancellationToken cancellationToken)
+            {
+                Debug.Assert(_overlapped != null);
+                if (cancellationToken.CanBeCanceled)
+                {
+                    try
+                    {
+                        _cancellationRegistration = cancellationToken.UnsafeRegister(static (s, token) =>
+                        {
+                            PipeValueTaskSource vts = (PipeValueTaskSource)s!;
+                            if (!vts._pipeStream.SafePipeHandle.IsInvalid)
+                            {
+                                try
+                                {
+                                    Interop.Kernel32.CancelIoEx(vts._pipeStream.SafePipeHandle, vts._overlapped);
+                                    // Ignore all failures: no matter whether it succeeds or fails, completion is handled via the IOCallback.
+                                }
+                                catch (ObjectDisposedException) { } // in case the SafeHandle is (erroneously) closed concurrently
+                            }
+                        }, this);
+                    }
+                    catch (OutOfMemoryException)
+                    {
+                        // Just in case trying to register OOMs, we ignore it in order to
+                        // protect the higher-level calling code that would proceed to unpin
+                        // memory that might be actively used by an in-flight async operation.
+                    }
+                }
+            }
+
+            internal void ReleaseResources()
+            {
+                // Ensure that any cancellation callback has either completed or will never run, so that
+                // we don't try to access an overlapped for this operation after it's already been freed.
+                _cancellationRegistration.Dispose();
+
+                // Unpin any pinned buffer.
+                _memoryHandle.Dispose();
+
+                // Free the overlapped.
+                if (_overlapped != null)
+                {
+                    _pipeStream._threadPoolBinding!.FreeNativeOverlapped(_overlapped);
+                    _overlapped = null;
+                }
+            }
+
+            // After calling Read/WriteFile to start the asynchronous operation, the caller may configure cancellation,
+            // and only after that should we allow for completing the operation, as completion needs to factor in work
+            // done by that cancellation registration, e.g. unregistering.  As such, we use _result to both track who's
+            // responsible for calling Complete and for passing the necessary data between parties.
+
+            /// <summary>Invoked when the async operation finished being scheduled.</summary>
+            internal void FinishedScheduling()
+            {
+                // Set the value to 1.  If it was already non-0, then the asynchronous operation already completed but
+                // didn't call Complete, so we call Complete here.  The read result value is the data (packed) necessary
+                // to make the call.
+                ulong result = Interlocked.Exchange(ref _result, 1);
+                if (result != 0)
+                {
+                    Complete(errorCode: (uint)result, numBytes: (uint)(result >> 32) & 0x7FFFFFFF);
+                }
+            }
+
+            /// <summary>Invoked when the asynchronous operation has completed asynchronously.</summary>
+            private static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* pOverlapped)
+            {
+                PipeValueTaskSource? vts = (PipeValueTaskSource?)ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped);
+                Debug.Assert(vts is not null);
+                Debug.Assert(vts._overlapped == pOverlapped, "Overlaps don't match");
+
+                // Set the value to a packed combination of the error code and number of bytes (plus a high-bit 1
+                // to ensure the value we're setting is non-zero).  If it was already non-0 (the common case), then
+                // the call site already finished scheduling the async operation, in which case we're ready to complete.
+                Debug.Assert(numBytes < int.MaxValue);
+                if (Interlocked.Exchange(ref vts._result, (1ul << 63) | ((ulong)numBytes << 32) | errorCode) != 0)
+                {
+                    vts.Complete(errorCode, numBytes);
+                }
+            }
+
+            private void Complete(uint errorCode, uint numBytes)
+            {
+                ReleaseResources();
+                CompleteCore(errorCode, numBytes);
+            }
+
+            private protected abstract void CompleteCore(uint errorCode, uint numBytes);
+        }
+
+        internal sealed class ReadWriteValueTaskSource : PipeValueTaskSource
+        {
+            internal readonly bool _isWrite;
+
+            internal ReadWriteValueTaskSource(PipeStream stream, bool isWrite) : base(stream) => _isWrite = isWrite;
+
+            private protected override void CompleteCore(uint errorCode, uint numBytes)
+            {
+                if (!_isWrite)
+                {
+                    bool messageCompletion = true;
+
+                    switch (errorCode)
+                    {
+                        case Interop.Errors.ERROR_BROKEN_PIPE:
+                        case Interop.Errors.ERROR_PIPE_NOT_CONNECTED:
+                        case Interop.Errors.ERROR_NO_DATA:
+                            errorCode = 0;
+                            break;
+
+                        case Interop.Errors.ERROR_MORE_DATA:
+                            errorCode = 0;
+                            messageCompletion = false;
+                            break;
+                    }
+
+                    _pipeStream.UpdateMessageCompletion(messageCompletion);
+                }
+
+                switch (errorCode)
+                {
+                    case 0:
+                        // Success
+                        _source.SetResult((int)numBytes);
+                        break;
+
+                    case Interop.Errors.ERROR_OPERATION_ABORTED:
+                        // Cancellation
+                        CancellationToken ct = _cancellationRegistration.Token;
+                        _source.SetException(ct.IsCancellationRequested ? new OperationCanceledException(ct) : new OperationCanceledException());
+                        break;
+
+                    default:
+                        // Failure
+                        _source.SetException(_pipeStream.WinIOError((int)errorCode));
+                        break;
+                }
+            }
+        }
+
+        internal sealed class ConnectionValueTaskSource : PipeValueTaskSource
+        {
+            internal ConnectionValueTaskSource(NamedPipeServerStream server) : base(server) { }
+
+            private protected override void CompleteCore(uint errorCode, uint numBytes)
+            {
+                switch (errorCode)
+                {
+                    case 0:
+                    case Interop.Errors.ERROR_PIPE_CONNECTED: // special case for when the client has already connected to us
+                        // Success
+                        _pipeStream.State = PipeState.Connected;
+                        _source.SetResult((int)numBytes);
+                        break;
+
+                    case Interop.Errors.ERROR_OPERATION_ABORTED:
+                        // Cancellation
+                        CancellationToken ct = _cancellationRegistration.Token;
+                        _source.SetException(ct.CanBeCanceled && !ct.IsCancellationRequested ? Error.GetOperationAborted() : new OperationCanceledException(ct));
+                        break;
+
+                    default:
+                        // Failure
+                        _source.SetException(Win32Marshal.GetExceptionForWin32Error((int)errorCode));
+                        break;
+                }
+            }
+        }
+    }
+}
index d8c4bf2..1a1134c 100644 (file)
@@ -2,12 +2,12 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 
 using System.Diagnostics;
-using System.Diagnostics.CodeAnalysis;
+using System.Runtime.ExceptionServices;
 using System.Runtime.InteropServices;
+using System.Runtime.Versioning;
 using System.Threading;
 using System.Threading.Tasks;
 using Microsoft.Win32.SafeHandles;
-using System.Runtime.Versioning;
 
 namespace System.IO.Pipes
 {
@@ -15,6 +15,8 @@ namespace System.IO.Pipes
     {
         internal const bool CheckOperationsRequiresSetHandle = true;
         internal ThreadPoolBoundHandle? _threadPoolBinding;
+        private ReadWriteValueTaskSource? _reusableReadValueTaskSource; // reusable ReadWriteValueTaskSource for read operations, that is currently NOT being used
+        private ReadWriteValueTaskSource? _reusableWriteValueTaskSource; // reusable ReadWriteValueTaskSource for write operations, that is currently NOT being used
 
         public override int Read(byte[] buffer, int offset, int count)
         {
@@ -182,7 +184,7 @@ namespace System.IO.Pipes
                 return Task.CompletedTask;
             }
 
-            return WriteAsyncCore(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken);
+            return WriteAsyncCore(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken).AsTask();
         }
 
         public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default(CancellationToken))
@@ -209,7 +211,7 @@ namespace System.IO.Pipes
                 return default;
             }
 
-            return new ValueTask(WriteAsyncCore(buffer, cancellationToken));
+            return WriteAsyncCore(buffer, cancellationToken);
         }
 
         public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
@@ -258,136 +260,182 @@ namespace System.IO.Pipes
             _threadPoolBinding = ThreadPoolBoundHandle.BindHandle(handle);
         }
 
+        internal virtual void TryToReuse(PipeValueTaskSource source)
+        {
+            source._source.Reset();
+
+            if (source is ReadWriteValueTaskSource readWriteSource)
+            {
+                ref ReadWriteValueTaskSource? field = ref readWriteSource._isWrite ? ref _reusableWriteValueTaskSource : ref _reusableReadValueTaskSource;
+                if (Interlocked.CompareExchange(ref field, readWriteSource, null) is not null)
+                {
+                    source._preallocatedOverlapped.Dispose();
+                }
+            }
+        }
+
         private void DisposeCore(bool disposing)
         {
             if (disposing)
             {
                 _threadPoolBinding?.Dispose();
+                Interlocked.Exchange(ref _reusableReadValueTaskSource, null)?.Dispose();
+                Interlocked.Exchange(ref _reusableWriteValueTaskSource, null)?.Dispose();
             }
         }
 
         private unsafe int ReadCore(Span<byte> buffer)
         {
-            int errorCode = 0;
-            int r = ReadFileNative(_handle!, buffer, null, out errorCode);
+            DebugAssertHandleValid(_handle!);
+            Debug.Assert(!_isAsync);
 
-            if (r == -1)
+            if (buffer.Length == 0)
             {
-                // If the other side has broken the connection, set state to Broken and return 0
-                if (errorCode == Interop.Errors.ERROR_BROKEN_PIPE ||
-                    errorCode == Interop.Errors.ERROR_PIPE_NOT_CONNECTED)
+                return 0;
+            }
+
+            fixed (byte* p = &MemoryMarshal.GetReference(buffer))
+            {
+                int bytesRead = 0;
+                if (Interop.Kernel32.ReadFile(_handle!, p, buffer.Length, out bytesRead, IntPtr.Zero) != 0)
                 {
-                    State = PipeState.Broken;
-                    r = 0;
+                    _isMessageComplete = true;
+                    return bytesRead;
                 }
                 else
                 {
-                    throw Win32Marshal.GetExceptionForWin32Error(errorCode, string.Empty);
-                }
-            }
-            _isMessageComplete = (errorCode != Interop.Errors.ERROR_MORE_DATA);
+                    int errorCode = Marshal.GetLastPInvokeError();
+                    _isMessageComplete = errorCode != Interop.Errors.ERROR_MORE_DATA;
+                    switch (errorCode)
+                    {
+                        case Interop.Errors.ERROR_MORE_DATA:
+                            return bytesRead;
 
-            Debug.Assert(r >= 0, "PipeStream's ReadCore is likely broken.");
+                        case Interop.Errors.ERROR_BROKEN_PIPE:
+                        case Interop.Errors.ERROR_PIPE_NOT_CONNECTED:
+                            State = PipeState.Broken;
+                            return 0;
 
-            return r;
+                        default:
+                            throw Win32Marshal.GetExceptionForWin32Error(errorCode, string.Empty);
+                    }
+                }
+            }
         }
 
-        private ValueTask<int> ReadAsyncCore(Memory<byte> buffer, CancellationToken cancellationToken)
+        private unsafe ValueTask<int> ReadAsyncCore(Memory<byte> buffer, CancellationToken cancellationToken)
         {
-            var completionSource = new ReadWriteCompletionSource(this, buffer, isWrite: false);
+            Debug.Assert(_isAsync);
 
-            // Queue an async ReadFile operation and pass in a packed overlapped
-            int errorCode = 0;
-            int r;
-            unsafe
+            ReadWriteValueTaskSource vts = Interlocked.Exchange(ref _reusableReadValueTaskSource, null) ?? new ReadWriteValueTaskSource(this, isWrite: false);
+            try
             {
-                r = ReadFileNative(_handle!, buffer.Span, completionSource.Overlapped, out errorCode);
-            }
+                vts.PrepareForOperation(buffer);
+                Debug.Assert(vts._memoryHandle.Pointer != null);
 
-            // ReadFile, the OS version, will return 0 on failure, but this ReadFileNative wrapper
-            // returns -1. This will return the following:
-            // - On error, r==-1.
-            // - On async requests that are still pending, r==-1 w/ hr==ERROR_IO_PENDING
-            // - On async requests that completed sequentially, r==0
-            //
-            // You will NEVER RELIABLY be able to get the number of buffer read back from this call
-            // when using overlapped structures!  You must not pass in a non-null lpNumBytesRead to
-            // ReadFile when using overlapped structures!  This is by design NT behavior.
-            if (r == -1)
-            {
-                switch (errorCode)
+                // Queue an async ReadFile operation.
+                if (Interop.Kernel32.ReadFile(_handle!, (byte*)vts._memoryHandle.Pointer, buffer.Length, IntPtr.Zero, vts._overlapped) == 0)
                 {
-                    // One side has closed its handle or server disconnected.
-                    // Set the state to Broken and do some cleanup work
-                    case Interop.Errors.ERROR_BROKEN_PIPE:
-                    case Interop.Errors.ERROR_PIPE_NOT_CONNECTED:
-                        State = PipeState.Broken;
-
-                        unsafe
-                        {
-                            // Clear the overlapped status bit for this special case. Failure to do so looks
-                            // like we are freeing a pending overlapped.
-                            completionSource.Overlapped->InternalLow = IntPtr.Zero;
-                        }
-
-                        completionSource.ReleaseResources();
-                        UpdateMessageCompletion(true);
-                        return new ValueTask<int>(0);
-
-                    case Interop.Errors.ERROR_IO_PENDING:
-                        break;
-
-                    default:
-                        throw Win32Marshal.GetExceptionForWin32Error(errorCode);
+                    // The operation failed, or it's pending.
+                    int errorCode = Marshal.GetLastPInvokeError();
+                    switch (errorCode)
+                    {
+                        case Interop.Errors.ERROR_IO_PENDING:
+                            // Common case: IO was initiated, completion will be handled by callback.
+                            // Register for cancellation now that the operation has been initiated.
+                            vts.RegisterForCancellation(cancellationToken);
+                            break;
+
+                        case Interop.Errors.ERROR_MORE_DATA:
+                            // The operation is completing asynchronously but there's nothing to cancel.
+                            break;
+
+                        // One side has closed its handle or server disconnected.
+                        // Set the state to Broken and do some cleanup work
+                        case Interop.Errors.ERROR_BROKEN_PIPE:
+                        case Interop.Errors.ERROR_PIPE_NOT_CONNECTED:
+                            State = PipeState.Broken;
+                            vts._overlapped->InternalLow = IntPtr.Zero;
+                            vts.Dispose();
+                            UpdateMessageCompletion(true);
+                            return new ValueTask<int>(0);
+
+                        default:
+                            // Error. Callback will not be called.
+                            vts.Dispose();
+                            return ValueTask.FromException<int>(Win32Marshal.GetExceptionForWin32Error(errorCode));
+                    }
                 }
             }
+            catch
+            {
+                vts.Dispose();
+                throw;
+            }
 
-            completionSource.RegisterForCancellation(cancellationToken);
-            return new ValueTask<int>(completionSource.Task);
+            vts.FinishedScheduling();
+            return new ValueTask<int>(vts, vts.Version);
         }
 
         private unsafe void WriteCore(ReadOnlySpan<byte> buffer)
         {
-            int errorCode = 0;
-            int r = WriteFileNative(_handle!, buffer, null, out errorCode);
+            DebugAssertHandleValid(_handle!);
+            Debug.Assert(!_isAsync);
+
+            if (buffer.Length == 0)
+            {
+                return;
+            }
 
-            if (r == -1)
+            fixed (byte* p = &MemoryMarshal.GetReference(buffer))
             {
-                throw WinIOError(errorCode);
+                int bytesWritten = 0;
+                if (Interop.Kernel32.WriteFile(_handle!, p, buffer.Length, out bytesWritten, IntPtr.Zero) == 0)
+                {
+                    throw WinIOError(Marshal.GetLastPInvokeError());
+                }
             }
-            Debug.Assert(r >= 0, "PipeStream's WriteCore is likely broken.");
         }
 
-        private Task WriteAsyncCore(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
+        private unsafe ValueTask WriteAsyncCore(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
         {
-            var completionSource = new ReadWriteCompletionSource(this, buffer, isWrite: true);
-            int errorCode = 0;
+            Debug.Assert(_isAsync);
 
-            // Queue an async WriteFile operation and pass in a packed overlapped
-            int r;
-            unsafe
+            ReadWriteValueTaskSource vts = Interlocked.Exchange(ref _reusableWriteValueTaskSource, null) ?? new ReadWriteValueTaskSource(this, isWrite: true);
+            try
             {
-                r = WriteFileNative(_handle!, buffer.Span, completionSource.Overlapped, out errorCode);
-            }
+                vts.PrepareForOperation(buffer);
+                Debug.Assert(vts._memoryHandle.Pointer != null);
 
-            // WriteFile, the OS version, will return 0 on failure, but this WriteFileNative
-            // wrapper returns -1. This will return the following:
-            // - On error, r==-1.
-            // - On async requests that are still pending, r==-1 w/ hr==ERROR_IO_PENDING
-            // - On async requests that completed sequentially, r==0
-            //
-            // You will NEVER RELIABLY be able to get the number of buffer written back from this
-            // call when using overlapped structures!  You must not pass in a non-null
-            // lpNumBytesWritten to WriteFile when using overlapped structures!  This is by design
-            // NT behavior.
-            if (r == -1 && errorCode != Interop.Errors.ERROR_IO_PENDING)
+                // Queue an async WriteFile operation.
+                if (Interop.Kernel32.WriteFile(_handle!, (byte*)vts._memoryHandle.Pointer, buffer.Length, IntPtr.Zero, vts._overlapped) == 0)
+                {
+                    // The operation failed, or it's pending.
+                    int errorCode = Marshal.GetLastPInvokeError();
+                    switch (errorCode)
+                    {
+                        case Interop.Errors.ERROR_IO_PENDING:
+                            // Common case: IO was initiated, completion will be handled by callback.
+                            // Register for cancellation now that the operation has been initiated.
+                            vts.RegisterForCancellation(cancellationToken);
+                            break;
+
+                        default:
+                            // Error. Callback will not be invoked.
+                            vts.Dispose();
+                            return ValueTask.FromException(ExceptionDispatchInfo.SetCurrentStackTrace(WinIOError(errorCode)));
+                    }
+                }
+            }
+            catch
             {
-                completionSource.ReleaseResources();
-                throw WinIOError(errorCode);
+                vts.Dispose();
+                throw;
             }
 
-            completionSource.RegisterForCancellation(cancellationToken);
-            return completionSource.Task;
+            // Completion handled by callback.
+            vts.FinishedScheduling();
+            return new ValueTask(vts, vts.Version);
         }
 
         // Blocks until the other end of the pipe has read in all written buffer.
@@ -529,77 +577,6 @@ namespace System.IO.Pipes
             }
         }
 
-        private unsafe int ReadFileNative(SafePipeHandle handle, Span<byte> buffer, NativeOverlapped* overlapped, out int errorCode)
-        {
-            DebugAssertHandleValid(handle);
-            Debug.Assert((_isAsync && overlapped != null) || (!_isAsync && overlapped == null), "Async IO parameter screwup in call to ReadFileNative.");
-
-            // Note that async callers check to avoid calling this first, so they can call user's callback.
-            if (buffer.Length == 0)
-            {
-                errorCode = 0;
-                return 0;
-            }
-
-            int r = 0;
-            int numBytesRead = 0;
-
-            fixed (byte* p = &MemoryMarshal.GetReference(buffer))
-            {
-                r = _isAsync ?
-                    Interop.Kernel32.ReadFile(handle, p, buffer.Length, IntPtr.Zero, overlapped) :
-                    Interop.Kernel32.ReadFile(handle, p, buffer.Length, out numBytesRead, IntPtr.Zero);
-            }
-
-            if (r == 0)
-            {
-                // In message mode, the ReadFile can inform us that there is more data to come.
-                errorCode = Marshal.GetLastPInvokeError();
-                return errorCode == Interop.Errors.ERROR_MORE_DATA ?
-                    numBytesRead :
-                    -1;
-            }
-            else
-            {
-                errorCode = 0;
-                return numBytesRead;
-            }
-        }
-
-        private unsafe int WriteFileNative(SafePipeHandle handle, ReadOnlySpan<byte> buffer, NativeOverlapped* overlapped, out int errorCode)
-        {
-            DebugAssertHandleValid(handle);
-            Debug.Assert((_isAsync && overlapped != null) || (!_isAsync && overlapped == null), "Async IO parameter screwup in call to WriteFileNative.");
-
-            // Note that async callers check to avoid calling this first, so they can call user's callback.
-            if (buffer.Length == 0)
-            {
-                errorCode = 0;
-                return 0;
-            }
-
-            int r = 0;
-            int numBytesWritten = 0;
-
-            fixed (byte* p = &MemoryMarshal.GetReference(buffer))
-            {
-                r = _isAsync ?
-                    Interop.Kernel32.WriteFile(handle, p, buffer.Length, IntPtr.Zero, overlapped) :
-                    Interop.Kernel32.WriteFile(handle, p, buffer.Length, out numBytesWritten, IntPtr.Zero);
-            }
-
-            if (r == 0)
-            {
-                errorCode = Marshal.GetLastPInvokeError();
-                return -1;
-            }
-            else
-            {
-                errorCode = 0;
-                return numBytesWritten;
-            }
-        }
-
         internal static unsafe Interop.Kernel32.SECURITY_ATTRIBUTES GetSecAttrs(HandleInheritability inheritability)
         {
             Interop.Kernel32.SECURITY_ATTRIBUTES secAttrs = new Interop.Kernel32.SECURITY_ATTRIBUTES
diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/ReadWriteCompletionSource.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/ReadWriteCompletionSource.cs
deleted file mode 100644 (file)
index af6ad5a..0000000
+++ /dev/null
@@ -1,69 +0,0 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
-using System.Runtime.ExceptionServices;
-using System.Threading;
-
-namespace System.IO.Pipes
-{
-    internal sealed class ReadWriteCompletionSource : PipeCompletionSource<int>
-    {
-        private readonly bool _isWrite;
-        private readonly PipeStream _pipeStream;
-
-        private bool _isMessageComplete;
-        private int _numBytes; // number of buffer read OR written
-
-        internal ReadWriteCompletionSource(PipeStream stream, ReadOnlyMemory<byte> bufferToPin, bool isWrite)
-            : base(stream._threadPoolBinding!, bufferToPin)
-        {
-            _pipeStream = stream;
-            _isWrite = isWrite;
-            _isMessageComplete = true;
-        }
-
-        internal override void SetCompletedSynchronously()
-        {
-            if (!_isWrite)
-            {
-                _pipeStream.UpdateMessageCompletion(_isMessageComplete);
-            }
-
-            TrySetResult(_numBytes);
-        }
-
-        protected override void AsyncCallback(uint errorCode, uint numBytes)
-        {
-            _numBytes = (int)numBytes;
-
-            // Allow async read to finish
-            if (!_isWrite)
-            {
-                switch (errorCode)
-                {
-                    case Interop.Errors.ERROR_BROKEN_PIPE:
-                    case Interop.Errors.ERROR_PIPE_NOT_CONNECTED:
-                    case Interop.Errors.ERROR_NO_DATA:
-                        errorCode = 0;
-                        break;
-                }
-            }
-
-            // For message type buffer.
-            if (errorCode == Interop.Errors.ERROR_MORE_DATA)
-            {
-                errorCode = 0;
-                _isMessageComplete = false;
-            }
-            else
-            {
-                _isMessageComplete = true;
-            }
-
-            base.AsyncCallback(errorCode, numBytes);
-        }
-
-        protected override void HandleError(int errorCode) =>
-            TrySetException(ExceptionDispatchInfo.SetCurrentStackTrace(_pipeStream.WinIOError(errorCode)));
-    }
-}
index 12a9fe2..9f689b1 100644 (file)
@@ -142,14 +142,15 @@ namespace Microsoft.Win32.SafeHandles
             private void ReleaseResources()
             {
                 _strategy = null;
-                // Unpin any pinned buffer.
-                _memoryHandle.Dispose();
 
                 // Ensure that any cancellation callback has either completed or will never run,
                 // so that we don't try to access an overlapped for this operation after it's already
                 // been freed.
                 _cancellationRegistration.Dispose();
 
+                // Unpin any pinned buffer.
+                _memoryHandle.Dispose();
+
                 // Free the overlapped.
                 if (_overlapped != null)
                 {