<Compile Include="Microsoft\Win32\SafeHandles\SafePipeHandle.Windows.cs" />
<Compile Include="System\IO\Pipes\AnonymousPipeServerStreamAcl.cs" />
<Compile Include="System\IO\Pipes\AnonymousPipeServerStream.Windows.cs" />
- <Compile Include="System\IO\Pipes\ConnectionCompletionSource.cs" />
<Compile Include="System\IO\Pipes\NamedPipeServerStreamAcl.cs" />
<Compile Include="System\IO\Pipes\NamedPipeClientStream.Windows.cs" />
<Compile Include="System\IO\Pipes\NamedPipeServerStream.Windows.cs" />
<Compile Include="System\IO\Pipes\PipeAccessRights.cs" />
<Compile Include="System\IO\Pipes\PipeAccessRule.cs" />
<Compile Include="System\IO\Pipes\PipeAuditRule.cs" />
- <Compile Include="System\IO\Pipes\PipeCompletionSource.cs" />
<Compile Include="System\IO\Pipes\PipesAclExtensions.cs" />
<Compile Include="System\IO\Pipes\PipeSecurity.cs" />
+ <Compile Include="System\IO\Pipes\PipeStream.ValueTaskSource.cs" />
<Compile Include="System\IO\Pipes\PipeStream.Windows.cs" />
- <Compile Include="System\IO\Pipes\ReadWriteCompletionSource.cs" />
</ItemGroup>
<!-- Windows : Win32 only -->
<ItemGroup Condition="'$(TargetsWindows)' == 'true'">
+++ /dev/null
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
-using System.Runtime.ExceptionServices;
-using System.Threading;
-
-namespace System.IO.Pipes
-{
- internal sealed class ConnectionCompletionSource : PipeCompletionSource<VoidResult>
- {
- private readonly NamedPipeServerStream _serverStream;
-
- // Using RunContinuationsAsynchronously for compat reasons (old API used ThreadPool.QueueUserWorkItem for continuations)
- internal ConnectionCompletionSource(NamedPipeServerStream server)
- : base(server._threadPoolBinding!, ReadOnlyMemory<byte>.Empty)
- {
- _serverStream = server;
- }
-
- internal override void SetCompletedSynchronously()
- {
- _serverStream.State = PipeState.Connected;
- TrySetResult(default(VoidResult));
- }
-
- protected override void AsyncCallback(uint errorCode, uint numBytes)
- {
- // Special case for when the client has already connected to us.
- if (errorCode == Interop.Errors.ERROR_PIPE_CONNECTED)
- {
- errorCode = 0;
- }
-
- base.AsyncCallback(errorCode, numBytes);
- }
-
- protected override void HandleError(int errorCode) =>
- TrySetException(ExceptionDispatchInfo.SetCurrentStackTrace(Win32Marshal.GetExceptionForWin32Error(errorCode)));
-
- protected override void HandleUnexpectedCancellation() =>
- TrySetException(ExceptionDispatchInfo.SetCurrentStackTrace(Error.GetOperationAborted()));
- }
-
- internal struct VoidResult { }
-}
// The .NET Foundation licenses this file to you under the MIT license.
using System.Diagnostics;
-using System.Diagnostics.CodeAnalysis;
-using System.Runtime.CompilerServices;
+using System.Runtime.ExceptionServices;
using System.Runtime.InteropServices;
using System.Security.AccessControl;
using System.Security.Principal;
/// </summary>
public sealed partial class NamedPipeServerStream : PipeStream
{
+ private ConnectionValueTaskSource? _reusableConnectionValueTaskSource; // reusable ConnectionValueTaskSource that is currently NOT being used
+
internal NamedPipeServerStream(
string pipeName,
PipeDirection direction,
Create(pipeName, direction, maxNumberOfServerInstances, transmissionMode, options, inBufferSize, outBufferSize, pipeSecurity, inheritability, additionalAccessRights);
}
+ protected override void Dispose(bool disposing)
+ {
+ try
+ {
+ Interlocked.Exchange(ref _reusableConnectionValueTaskSource, null)?.Dispose();
+ }
+ finally
+ {
+ base.Dispose(disposing);
+ }
+ }
+
+ internal override void TryToReuse(PipeValueTaskSource source)
+ {
+ base.TryToReuse(source);
+
+ if (source is ConnectionValueTaskSource connectionSource)
+ {
+ if (Interlocked.CompareExchange(ref _reusableConnectionValueTaskSource, connectionSource, null) is not null)
+ {
+ source._preallocatedOverlapped.Dispose();
+ }
+ }
+ }
+
private void Create(string pipeName, PipeDirection direction, int maxNumberOfServerInstances,
PipeTransmissionMode transmissionMode, PipeOptions options, int inBufferSize, int outBufferSize,
HandleInheritability inheritability)
if (IsAsync)
{
- WaitForConnectionCoreAsync(CancellationToken.None).GetAwaiter().GetResult();
+ ValueTask vt = WaitForConnectionCoreAsync(CancellationToken.None);
+ vt.AsTask().GetAwaiter().GetResult();
}
else
{
this, cancellationToken, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
}
- return WaitForConnectionCoreAsync(cancellationToken);
+ return WaitForConnectionCoreAsync(cancellationToken).AsTask();
}
public void Disconnect()
}
// Async version of WaitForConnection. See the comments above for more info.
- private unsafe Task WaitForConnectionCoreAsync(CancellationToken cancellationToken)
+ private unsafe ValueTask WaitForConnectionCoreAsync(CancellationToken cancellationToken)
{
CheckConnectOperationsServerWithHandle();
+ Debug.Assert(IsAsync);
- if (!IsAsync)
- {
- throw new InvalidOperationException(SR.InvalidOperation_PipeNotAsync);
- }
-
- var completionSource = new ConnectionCompletionSource(this);
-
- if (!Interop.Kernel32.ConnectNamedPipe(InternalHandle!, completionSource.Overlapped))
+ ConnectionValueTaskSource? vts = Interlocked.Exchange(ref _reusableConnectionValueTaskSource, null) ?? new ConnectionValueTaskSource(this);
+ try
{
- int errorCode = Marshal.GetLastPInvokeError();
-
- switch (errorCode)
+ vts.PrepareForOperation();
+ if (!Interop.Kernel32.ConnectNamedPipe(InternalHandle!, vts._overlapped))
{
- case Interop.Errors.ERROR_IO_PENDING:
- break;
-
- // If we are here then the pipe is already connected, or there was an error
- // so we should unpin and free the overlapped.
- case Interop.Errors.ERROR_PIPE_CONNECTED:
- // IOCompletitionCallback will not be called because we completed synchronously.
- completionSource.ReleaseResources();
- if (State == PipeState.Connected)
- {
- throw new InvalidOperationException(SR.InvalidOperation_PipeAlreadyConnected);
- }
- completionSource.SetCompletedSynchronously();
-
- // We return a cached task instead of TaskCompletionSource's Task allowing the GC to collect it.
- return Task.CompletedTask;
-
- default:
- completionSource.ReleaseResources();
- throw Win32Marshal.GetExceptionForWin32Error(errorCode);
+ int errorCode = Marshal.GetLastPInvokeError();
+ switch (errorCode)
+ {
+ case Interop.Errors.ERROR_IO_PENDING:
+ // Common case: IO was initiated, completion will be handled by callback.
+ // Register for cancellation now that the operation has been initiated.
+ vts.RegisterForCancellation(cancellationToken);
+ break;
+
+ case Interop.Errors.ERROR_PIPE_CONNECTED:
+ // If we are here then the pipe is already connected.
+ // IOCompletitionCallback will not be called because we completed synchronously.
+ vts.Dispose();
+ if (State == PipeState.Connected)
+ {
+ return ValueTask.FromException(ExceptionDispatchInfo.SetCurrentStackTrace(new InvalidOperationException(SR.InvalidOperation_PipeAlreadyConnected)));
+ }
+ State = PipeState.Connected;
+ return ValueTask.CompletedTask;
+
+ default:
+ vts.Dispose();
+ return ValueTask.FromException(ExceptionDispatchInfo.SetCurrentStackTrace(Win32Marshal.GetExceptionForWin32Error(errorCode)));
+ }
}
}
+ catch
+ {
+ vts.Dispose();
+ throw;
+ }
- // If we are here then connection is pending.
- completionSource.RegisterForCancellation(cancellationToken);
-
- return completionSource.Task;
+ // Completion handled by callback.
+ vts.FinishedScheduling();
+ return new ValueTask(vts, vts.Version);
}
private void CheckConnectOperationsServerWithHandle()
+++ /dev/null
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
-using System.Buffers;
-using System.Diagnostics;
-using System.Runtime.InteropServices;
-using System.Security;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace System.IO.Pipes
-{
- internal abstract unsafe class PipeCompletionSource<TResult> : TaskCompletionSource<TResult>
- {
- private const int NoResult = 0;
- private const int ResultSuccess = 1;
- private const int ResultError = 2;
- private const int RegisteringCancellation = 4;
- private const int CompletedCallback = 8;
-
- private readonly ThreadPoolBoundHandle _threadPoolBinding;
-
- private CancellationTokenRegistration _cancellationRegistration;
- private int _errorCode;
- private NativeOverlapped* _overlapped;
- private MemoryHandle _pinnedMemory;
- private int _state;
-
-#if DEBUG
- private bool _cancellationHasBeenRegistered;
-#endif
-
- // Using RunContinuationsAsynchronously for compat reasons (old API used ThreadPool.QueueUserWorkItem for continuations)
- protected PipeCompletionSource(ThreadPoolBoundHandle handle, ReadOnlyMemory<byte> bufferToPin)
- : base(TaskCreationOptions.RunContinuationsAsynchronously)
- {
- Debug.Assert(handle != null, "handle is null");
-
- _threadPoolBinding = handle;
- _state = NoResult;
-
- _pinnedMemory = bufferToPin.Pin();
- _overlapped = _threadPoolBinding.AllocateNativeOverlapped((errorCode, numBytes, pOverlapped) =>
- {
- var completionSource = (PipeCompletionSource<TResult>)ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped)!;
- Debug.Assert(completionSource.Overlapped == pOverlapped);
-
- completionSource.AsyncCallback(errorCode, numBytes);
- }, this, null);
- }
-
- internal NativeOverlapped* Overlapped
- {
- get { return _overlapped; }
- }
-
- internal void RegisterForCancellation(CancellationToken cancellationToken)
- {
-#if DEBUG
- Debug.Assert(!_cancellationHasBeenRegistered, "Cannot register for cancellation twice");
- _cancellationHasBeenRegistered = true;
-#endif
-
- // Quick check to make sure that the cancellation token supports cancellation, and that the IO hasn't completed
- if (cancellationToken.CanBeCanceled && Overlapped != null)
- {
- // Register the cancellation only if the IO hasn't completed
- int state = Interlocked.CompareExchange(ref _state, RegisteringCancellation, NoResult);
- if (state == NoResult)
- {
- // Register the cancellation
- _cancellationRegistration = cancellationToken.UnsafeRegister(thisRef => ((PipeCompletionSource<TResult>)thisRef!).Cancel(), this);
-
- // Grab the state for case if IO completed while we were setting the registration.
- state = Interlocked.Exchange(ref _state, NoResult);
- }
- else if (state != CompletedCallback)
- {
- // IO already completed and we have grabbed result state.
- // Set NoResult to prevent invocation of CompleteCallback(result state) from AsyncCallback(...)
- state = Interlocked.Exchange(ref _state, NoResult);
- }
-
- // If we have the result state of completed IO call CompleteCallback(result).
- // Otherwise IO not completed.
- if ((state & (ResultSuccess | ResultError)) != 0)
- {
- CompleteCallback(state);
- }
- }
- }
-
- internal void ReleaseResources()
- {
- _cancellationRegistration.Dispose();
-
- // NOTE: The cancellation must *NOT* be running at this point, or it may observe freed memory
- // (this is why we disposed the registration above)
- if (_overlapped != null)
- {
- _threadPoolBinding.FreeNativeOverlapped(Overlapped);
- _overlapped = null;
- }
-
- _pinnedMemory.Dispose();
- }
-
- internal abstract void SetCompletedSynchronously();
-
- protected virtual void AsyncCallback(uint errorCode, uint numBytes)
- {
- int resultState;
- if (errorCode == 0)
- {
- resultState = ResultSuccess;
- }
- else
- {
- resultState = ResultError;
- _errorCode = (int)errorCode;
- }
-
- // Store the result so that other threads can observe it
- // and if no other thread is registering cancellation, continue.
- // Otherwise CompleteCallback(resultState) will be invoked by RegisterForCancellation().
- if (Interlocked.Exchange(ref _state, resultState) == NoResult)
- {
- // Now try to prevent invocation of CompleteCallback(resultState) from RegisterForCancellation().
- // Otherwise, thread responsible for registering cancellation stole the result and it will invoke CompleteCallback(resultState).
- if (Interlocked.Exchange(ref _state, CompletedCallback) != NoResult)
- {
- CompleteCallback(resultState);
- }
- }
- }
-
- protected abstract void HandleError(int errorCode);
-
- private void Cancel()
- {
- SafeHandle handle = _threadPoolBinding.Handle;
- NativeOverlapped* overlapped = Overlapped;
-
- // If the handle is still valid, attempt to cancel the IO
- if (!handle.IsInvalid && !Interop.Kernel32.CancelIoEx(handle, overlapped))
- {
- // This case should not have any consequences although
- // it will be easier to debug if there exists any special case
- // we are not aware of.
- int errorCode = Marshal.GetLastPInvokeError();
- Debug.WriteLine("CancelIoEx finished with error code {0}.", errorCode);
- }
- }
-
- protected virtual void HandleUnexpectedCancellation() => TrySetCanceled();
-
- private void CompleteCallback(int resultState)
- {
- Debug.Assert(resultState == ResultSuccess || resultState == ResultError, $"Unexpected result state {resultState}");
- CancellationToken cancellationToken = _cancellationRegistration.Token;
-
- ReleaseResources();
-
- if (resultState == ResultError)
- {
- if (_errorCode == Interop.Errors.ERROR_OPERATION_ABORTED)
- {
- if (cancellationToken.CanBeCanceled && !cancellationToken.IsCancellationRequested)
- {
- HandleUnexpectedCancellation();
- }
- else
- {
- // otherwise set canceled
- TrySetCanceled(cancellationToken);
- }
- }
- else
- {
- HandleError(_errorCode);
- }
- }
- else
- {
- SetCompletedSynchronously();
- }
- }
- }
-}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Buffers;
+using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks.Sources;
+
+namespace System.IO.Pipes
+{
+ public abstract partial class PipeStream : Stream
+ {
+ internal abstract unsafe class PipeValueTaskSource : IValueTaskSource<int>, IValueTaskSource
+ {
+ internal static readonly IOCompletionCallback s_ioCallback = IOCallback;
+
+ internal readonly PreAllocatedOverlapped _preallocatedOverlapped;
+ internal readonly PipeStream _pipeStream;
+ internal MemoryHandle _memoryHandle;
+ internal ManualResetValueTaskSourceCore<int> _source; // mutable struct; do not make this readonly
+ internal NativeOverlapped* _overlapped;
+ internal CancellationTokenRegistration _cancellationRegistration;
+ /// <summary>
+ /// 0 when the operation hasn't been scheduled, non-zero when either the operation has completed,
+ /// in which case its value is a packed combination of the error code and number of bytes, or when
+ /// the read/write call has finished scheduling the async operation.
+ /// </summary>
+ internal ulong _result;
+
+ protected PipeValueTaskSource(PipeStream pipeStream)
+ {
+ _pipeStream = pipeStream;
+ _source.RunContinuationsAsynchronously = true;
+ _preallocatedOverlapped = new PreAllocatedOverlapped(s_ioCallback, this, null);
+ }
+
+ internal void Dispose()
+ {
+ ReleaseResources();
+ _preallocatedOverlapped.Dispose();
+ }
+
+ internal void PrepareForOperation(ReadOnlyMemory<byte> memory = default)
+ {
+ _result = 0;
+ _memoryHandle = memory.Pin();
+ _overlapped = _pipeStream._threadPoolBinding!.AllocateNativeOverlapped(_preallocatedOverlapped);
+ }
+
+ public ValueTaskSourceStatus GetStatus(short token) => _source.GetStatus(token);
+ public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _source.OnCompleted(continuation, state, token, flags);
+ void IValueTaskSource.GetResult(short token) => GetResult(token);
+ public int GetResult(short token)
+ {
+ try
+ {
+ return _source.GetResult(token);
+ }
+ finally
+ {
+ // The instance is ready to be reused
+ _pipeStream.TryToReuse(this);
+ }
+ }
+
+ internal short Version => _source.Version;
+
+ internal void RegisterForCancellation(CancellationToken cancellationToken)
+ {
+ Debug.Assert(_overlapped != null);
+ if (cancellationToken.CanBeCanceled)
+ {
+ try
+ {
+ _cancellationRegistration = cancellationToken.UnsafeRegister(static (s, token) =>
+ {
+ PipeValueTaskSource vts = (PipeValueTaskSource)s!;
+ if (!vts._pipeStream.SafePipeHandle.IsInvalid)
+ {
+ try
+ {
+ Interop.Kernel32.CancelIoEx(vts._pipeStream.SafePipeHandle, vts._overlapped);
+ // Ignore all failures: no matter whether it succeeds or fails, completion is handled via the IOCallback.
+ }
+ catch (ObjectDisposedException) { } // in case the SafeHandle is (erroneously) closed concurrently
+ }
+ }, this);
+ }
+ catch (OutOfMemoryException)
+ {
+ // Just in case trying to register OOMs, we ignore it in order to
+ // protect the higher-level calling code that would proceed to unpin
+ // memory that might be actively used by an in-flight async operation.
+ }
+ }
+ }
+
+ internal void ReleaseResources()
+ {
+ // Ensure that any cancellation callback has either completed or will never run, so that
+ // we don't try to access an overlapped for this operation after it's already been freed.
+ _cancellationRegistration.Dispose();
+
+ // Unpin any pinned buffer.
+ _memoryHandle.Dispose();
+
+ // Free the overlapped.
+ if (_overlapped != null)
+ {
+ _pipeStream._threadPoolBinding!.FreeNativeOverlapped(_overlapped);
+ _overlapped = null;
+ }
+ }
+
+ // After calling Read/WriteFile to start the asynchronous operation, the caller may configure cancellation,
+ // and only after that should we allow for completing the operation, as completion needs to factor in work
+ // done by that cancellation registration, e.g. unregistering. As such, we use _result to both track who's
+ // responsible for calling Complete and for passing the necessary data between parties.
+
+ /// <summary>Invoked when the async operation finished being scheduled.</summary>
+ internal void FinishedScheduling()
+ {
+ // Set the value to 1. If it was already non-0, then the asynchronous operation already completed but
+ // didn't call Complete, so we call Complete here. The read result value is the data (packed) necessary
+ // to make the call.
+ ulong result = Interlocked.Exchange(ref _result, 1);
+ if (result != 0)
+ {
+ Complete(errorCode: (uint)result, numBytes: (uint)(result >> 32) & 0x7FFFFFFF);
+ }
+ }
+
+ /// <summary>Invoked when the asynchronous operation has completed asynchronously.</summary>
+ private static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* pOverlapped)
+ {
+ PipeValueTaskSource? vts = (PipeValueTaskSource?)ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped);
+ Debug.Assert(vts is not null);
+ Debug.Assert(vts._overlapped == pOverlapped, "Overlaps don't match");
+
+ // Set the value to a packed combination of the error code and number of bytes (plus a high-bit 1
+ // to ensure the value we're setting is non-zero). If it was already non-0 (the common case), then
+ // the call site already finished scheduling the async operation, in which case we're ready to complete.
+ Debug.Assert(numBytes < int.MaxValue);
+ if (Interlocked.Exchange(ref vts._result, (1ul << 63) | ((ulong)numBytes << 32) | errorCode) != 0)
+ {
+ vts.Complete(errorCode, numBytes);
+ }
+ }
+
+ private void Complete(uint errorCode, uint numBytes)
+ {
+ ReleaseResources();
+ CompleteCore(errorCode, numBytes);
+ }
+
+ private protected abstract void CompleteCore(uint errorCode, uint numBytes);
+ }
+
+ internal sealed class ReadWriteValueTaskSource : PipeValueTaskSource
+ {
+ internal readonly bool _isWrite;
+
+ internal ReadWriteValueTaskSource(PipeStream stream, bool isWrite) : base(stream) => _isWrite = isWrite;
+
+ private protected override void CompleteCore(uint errorCode, uint numBytes)
+ {
+ if (!_isWrite)
+ {
+ bool messageCompletion = true;
+
+ switch (errorCode)
+ {
+ case Interop.Errors.ERROR_BROKEN_PIPE:
+ case Interop.Errors.ERROR_PIPE_NOT_CONNECTED:
+ case Interop.Errors.ERROR_NO_DATA:
+ errorCode = 0;
+ break;
+
+ case Interop.Errors.ERROR_MORE_DATA:
+ errorCode = 0;
+ messageCompletion = false;
+ break;
+ }
+
+ _pipeStream.UpdateMessageCompletion(messageCompletion);
+ }
+
+ switch (errorCode)
+ {
+ case 0:
+ // Success
+ _source.SetResult((int)numBytes);
+ break;
+
+ case Interop.Errors.ERROR_OPERATION_ABORTED:
+ // Cancellation
+ CancellationToken ct = _cancellationRegistration.Token;
+ _source.SetException(ct.IsCancellationRequested ? new OperationCanceledException(ct) : new OperationCanceledException());
+ break;
+
+ default:
+ // Failure
+ _source.SetException(_pipeStream.WinIOError((int)errorCode));
+ break;
+ }
+ }
+ }
+
+ internal sealed class ConnectionValueTaskSource : PipeValueTaskSource
+ {
+ internal ConnectionValueTaskSource(NamedPipeServerStream server) : base(server) { }
+
+ private protected override void CompleteCore(uint errorCode, uint numBytes)
+ {
+ switch (errorCode)
+ {
+ case 0:
+ case Interop.Errors.ERROR_PIPE_CONNECTED: // special case for when the client has already connected to us
+ // Success
+ _pipeStream.State = PipeState.Connected;
+ _source.SetResult((int)numBytes);
+ break;
+
+ case Interop.Errors.ERROR_OPERATION_ABORTED:
+ // Cancellation
+ CancellationToken ct = _cancellationRegistration.Token;
+ _source.SetException(ct.CanBeCanceled && !ct.IsCancellationRequested ? Error.GetOperationAborted() : new OperationCanceledException(ct));
+ break;
+
+ default:
+ // Failure
+ _source.SetException(Win32Marshal.GetExceptionForWin32Error((int)errorCode));
+ break;
+ }
+ }
+ }
+ }
+}
// The .NET Foundation licenses this file to you under the MIT license.
using System.Diagnostics;
-using System.Diagnostics.CodeAnalysis;
+using System.Runtime.ExceptionServices;
using System.Runtime.InteropServices;
+using System.Runtime.Versioning;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Win32.SafeHandles;
-using System.Runtime.Versioning;
namespace System.IO.Pipes
{
{
internal const bool CheckOperationsRequiresSetHandle = true;
internal ThreadPoolBoundHandle? _threadPoolBinding;
+ private ReadWriteValueTaskSource? _reusableReadValueTaskSource; // reusable ReadWriteValueTaskSource for read operations, that is currently NOT being used
+ private ReadWriteValueTaskSource? _reusableWriteValueTaskSource; // reusable ReadWriteValueTaskSource for write operations, that is currently NOT being used
public override int Read(byte[] buffer, int offset, int count)
{
return Task.CompletedTask;
}
- return WriteAsyncCore(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken);
+ return WriteAsyncCore(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken).AsTask();
}
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default(CancellationToken))
return default;
}
- return new ValueTask(WriteAsyncCore(buffer, cancellationToken));
+ return WriteAsyncCore(buffer, cancellationToken);
}
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
_threadPoolBinding = ThreadPoolBoundHandle.BindHandle(handle);
}
+ internal virtual void TryToReuse(PipeValueTaskSource source)
+ {
+ source._source.Reset();
+
+ if (source is ReadWriteValueTaskSource readWriteSource)
+ {
+ ref ReadWriteValueTaskSource? field = ref readWriteSource._isWrite ? ref _reusableWriteValueTaskSource : ref _reusableReadValueTaskSource;
+ if (Interlocked.CompareExchange(ref field, readWriteSource, null) is not null)
+ {
+ source._preallocatedOverlapped.Dispose();
+ }
+ }
+ }
+
private void DisposeCore(bool disposing)
{
if (disposing)
{
_threadPoolBinding?.Dispose();
+ Interlocked.Exchange(ref _reusableReadValueTaskSource, null)?.Dispose();
+ Interlocked.Exchange(ref _reusableWriteValueTaskSource, null)?.Dispose();
}
}
private unsafe int ReadCore(Span<byte> buffer)
{
- int errorCode = 0;
- int r = ReadFileNative(_handle!, buffer, null, out errorCode);
+ DebugAssertHandleValid(_handle!);
+ Debug.Assert(!_isAsync);
- if (r == -1)
+ if (buffer.Length == 0)
{
- // If the other side has broken the connection, set state to Broken and return 0
- if (errorCode == Interop.Errors.ERROR_BROKEN_PIPE ||
- errorCode == Interop.Errors.ERROR_PIPE_NOT_CONNECTED)
+ return 0;
+ }
+
+ fixed (byte* p = &MemoryMarshal.GetReference(buffer))
+ {
+ int bytesRead = 0;
+ if (Interop.Kernel32.ReadFile(_handle!, p, buffer.Length, out bytesRead, IntPtr.Zero) != 0)
{
- State = PipeState.Broken;
- r = 0;
+ _isMessageComplete = true;
+ return bytesRead;
}
else
{
- throw Win32Marshal.GetExceptionForWin32Error(errorCode, string.Empty);
- }
- }
- _isMessageComplete = (errorCode != Interop.Errors.ERROR_MORE_DATA);
+ int errorCode = Marshal.GetLastPInvokeError();
+ _isMessageComplete = errorCode != Interop.Errors.ERROR_MORE_DATA;
+ switch (errorCode)
+ {
+ case Interop.Errors.ERROR_MORE_DATA:
+ return bytesRead;
- Debug.Assert(r >= 0, "PipeStream's ReadCore is likely broken.");
+ case Interop.Errors.ERROR_BROKEN_PIPE:
+ case Interop.Errors.ERROR_PIPE_NOT_CONNECTED:
+ State = PipeState.Broken;
+ return 0;
- return r;
+ default:
+ throw Win32Marshal.GetExceptionForWin32Error(errorCode, string.Empty);
+ }
+ }
+ }
}
- private ValueTask<int> ReadAsyncCore(Memory<byte> buffer, CancellationToken cancellationToken)
+ private unsafe ValueTask<int> ReadAsyncCore(Memory<byte> buffer, CancellationToken cancellationToken)
{
- var completionSource = new ReadWriteCompletionSource(this, buffer, isWrite: false);
+ Debug.Assert(_isAsync);
- // Queue an async ReadFile operation and pass in a packed overlapped
- int errorCode = 0;
- int r;
- unsafe
+ ReadWriteValueTaskSource vts = Interlocked.Exchange(ref _reusableReadValueTaskSource, null) ?? new ReadWriteValueTaskSource(this, isWrite: false);
+ try
{
- r = ReadFileNative(_handle!, buffer.Span, completionSource.Overlapped, out errorCode);
- }
+ vts.PrepareForOperation(buffer);
+ Debug.Assert(vts._memoryHandle.Pointer != null);
- // ReadFile, the OS version, will return 0 on failure, but this ReadFileNative wrapper
- // returns -1. This will return the following:
- // - On error, r==-1.
- // - On async requests that are still pending, r==-1 w/ hr==ERROR_IO_PENDING
- // - On async requests that completed sequentially, r==0
- //
- // You will NEVER RELIABLY be able to get the number of buffer read back from this call
- // when using overlapped structures! You must not pass in a non-null lpNumBytesRead to
- // ReadFile when using overlapped structures! This is by design NT behavior.
- if (r == -1)
- {
- switch (errorCode)
+ // Queue an async ReadFile operation.
+ if (Interop.Kernel32.ReadFile(_handle!, (byte*)vts._memoryHandle.Pointer, buffer.Length, IntPtr.Zero, vts._overlapped) == 0)
{
- // One side has closed its handle or server disconnected.
- // Set the state to Broken and do some cleanup work
- case Interop.Errors.ERROR_BROKEN_PIPE:
- case Interop.Errors.ERROR_PIPE_NOT_CONNECTED:
- State = PipeState.Broken;
-
- unsafe
- {
- // Clear the overlapped status bit for this special case. Failure to do so looks
- // like we are freeing a pending overlapped.
- completionSource.Overlapped->InternalLow = IntPtr.Zero;
- }
-
- completionSource.ReleaseResources();
- UpdateMessageCompletion(true);
- return new ValueTask<int>(0);
-
- case Interop.Errors.ERROR_IO_PENDING:
- break;
-
- default:
- throw Win32Marshal.GetExceptionForWin32Error(errorCode);
+ // The operation failed, or it's pending.
+ int errorCode = Marshal.GetLastPInvokeError();
+ switch (errorCode)
+ {
+ case Interop.Errors.ERROR_IO_PENDING:
+ // Common case: IO was initiated, completion will be handled by callback.
+ // Register for cancellation now that the operation has been initiated.
+ vts.RegisterForCancellation(cancellationToken);
+ break;
+
+ case Interop.Errors.ERROR_MORE_DATA:
+ // The operation is completing asynchronously but there's nothing to cancel.
+ break;
+
+ // One side has closed its handle or server disconnected.
+ // Set the state to Broken and do some cleanup work
+ case Interop.Errors.ERROR_BROKEN_PIPE:
+ case Interop.Errors.ERROR_PIPE_NOT_CONNECTED:
+ State = PipeState.Broken;
+ vts._overlapped->InternalLow = IntPtr.Zero;
+ vts.Dispose();
+ UpdateMessageCompletion(true);
+ return new ValueTask<int>(0);
+
+ default:
+ // Error. Callback will not be called.
+ vts.Dispose();
+ return ValueTask.FromException<int>(Win32Marshal.GetExceptionForWin32Error(errorCode));
+ }
}
}
+ catch
+ {
+ vts.Dispose();
+ throw;
+ }
- completionSource.RegisterForCancellation(cancellationToken);
- return new ValueTask<int>(completionSource.Task);
+ vts.FinishedScheduling();
+ return new ValueTask<int>(vts, vts.Version);
}
private unsafe void WriteCore(ReadOnlySpan<byte> buffer)
{
- int errorCode = 0;
- int r = WriteFileNative(_handle!, buffer, null, out errorCode);
+ DebugAssertHandleValid(_handle!);
+ Debug.Assert(!_isAsync);
+
+ if (buffer.Length == 0)
+ {
+ return;
+ }
- if (r == -1)
+ fixed (byte* p = &MemoryMarshal.GetReference(buffer))
{
- throw WinIOError(errorCode);
+ int bytesWritten = 0;
+ if (Interop.Kernel32.WriteFile(_handle!, p, buffer.Length, out bytesWritten, IntPtr.Zero) == 0)
+ {
+ throw WinIOError(Marshal.GetLastPInvokeError());
+ }
}
- Debug.Assert(r >= 0, "PipeStream's WriteCore is likely broken.");
}
- private Task WriteAsyncCore(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
+ private unsafe ValueTask WriteAsyncCore(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
{
- var completionSource = new ReadWriteCompletionSource(this, buffer, isWrite: true);
- int errorCode = 0;
+ Debug.Assert(_isAsync);
- // Queue an async WriteFile operation and pass in a packed overlapped
- int r;
- unsafe
+ ReadWriteValueTaskSource vts = Interlocked.Exchange(ref _reusableWriteValueTaskSource, null) ?? new ReadWriteValueTaskSource(this, isWrite: true);
+ try
{
- r = WriteFileNative(_handle!, buffer.Span, completionSource.Overlapped, out errorCode);
- }
+ vts.PrepareForOperation(buffer);
+ Debug.Assert(vts._memoryHandle.Pointer != null);
- // WriteFile, the OS version, will return 0 on failure, but this WriteFileNative
- // wrapper returns -1. This will return the following:
- // - On error, r==-1.
- // - On async requests that are still pending, r==-1 w/ hr==ERROR_IO_PENDING
- // - On async requests that completed sequentially, r==0
- //
- // You will NEVER RELIABLY be able to get the number of buffer written back from this
- // call when using overlapped structures! You must not pass in a non-null
- // lpNumBytesWritten to WriteFile when using overlapped structures! This is by design
- // NT behavior.
- if (r == -1 && errorCode != Interop.Errors.ERROR_IO_PENDING)
+ // Queue an async WriteFile operation.
+ if (Interop.Kernel32.WriteFile(_handle!, (byte*)vts._memoryHandle.Pointer, buffer.Length, IntPtr.Zero, vts._overlapped) == 0)
+ {
+ // The operation failed, or it's pending.
+ int errorCode = Marshal.GetLastPInvokeError();
+ switch (errorCode)
+ {
+ case Interop.Errors.ERROR_IO_PENDING:
+ // Common case: IO was initiated, completion will be handled by callback.
+ // Register for cancellation now that the operation has been initiated.
+ vts.RegisterForCancellation(cancellationToken);
+ break;
+
+ default:
+ // Error. Callback will not be invoked.
+ vts.Dispose();
+ return ValueTask.FromException(ExceptionDispatchInfo.SetCurrentStackTrace(WinIOError(errorCode)));
+ }
+ }
+ }
+ catch
{
- completionSource.ReleaseResources();
- throw WinIOError(errorCode);
+ vts.Dispose();
+ throw;
}
- completionSource.RegisterForCancellation(cancellationToken);
- return completionSource.Task;
+ // Completion handled by callback.
+ vts.FinishedScheduling();
+ return new ValueTask(vts, vts.Version);
}
// Blocks until the other end of the pipe has read in all written buffer.
}
}
- private unsafe int ReadFileNative(SafePipeHandle handle, Span<byte> buffer, NativeOverlapped* overlapped, out int errorCode)
- {
- DebugAssertHandleValid(handle);
- Debug.Assert((_isAsync && overlapped != null) || (!_isAsync && overlapped == null), "Async IO parameter screwup in call to ReadFileNative.");
-
- // Note that async callers check to avoid calling this first, so they can call user's callback.
- if (buffer.Length == 0)
- {
- errorCode = 0;
- return 0;
- }
-
- int r = 0;
- int numBytesRead = 0;
-
- fixed (byte* p = &MemoryMarshal.GetReference(buffer))
- {
- r = _isAsync ?
- Interop.Kernel32.ReadFile(handle, p, buffer.Length, IntPtr.Zero, overlapped) :
- Interop.Kernel32.ReadFile(handle, p, buffer.Length, out numBytesRead, IntPtr.Zero);
- }
-
- if (r == 0)
- {
- // In message mode, the ReadFile can inform us that there is more data to come.
- errorCode = Marshal.GetLastPInvokeError();
- return errorCode == Interop.Errors.ERROR_MORE_DATA ?
- numBytesRead :
- -1;
- }
- else
- {
- errorCode = 0;
- return numBytesRead;
- }
- }
-
- private unsafe int WriteFileNative(SafePipeHandle handle, ReadOnlySpan<byte> buffer, NativeOverlapped* overlapped, out int errorCode)
- {
- DebugAssertHandleValid(handle);
- Debug.Assert((_isAsync && overlapped != null) || (!_isAsync && overlapped == null), "Async IO parameter screwup in call to WriteFileNative.");
-
- // Note that async callers check to avoid calling this first, so they can call user's callback.
- if (buffer.Length == 0)
- {
- errorCode = 0;
- return 0;
- }
-
- int r = 0;
- int numBytesWritten = 0;
-
- fixed (byte* p = &MemoryMarshal.GetReference(buffer))
- {
- r = _isAsync ?
- Interop.Kernel32.WriteFile(handle, p, buffer.Length, IntPtr.Zero, overlapped) :
- Interop.Kernel32.WriteFile(handle, p, buffer.Length, out numBytesWritten, IntPtr.Zero);
- }
-
- if (r == 0)
- {
- errorCode = Marshal.GetLastPInvokeError();
- return -1;
- }
- else
- {
- errorCode = 0;
- return numBytesWritten;
- }
- }
-
internal static unsafe Interop.Kernel32.SECURITY_ATTRIBUTES GetSecAttrs(HandleInheritability inheritability)
{
Interop.Kernel32.SECURITY_ATTRIBUTES secAttrs = new Interop.Kernel32.SECURITY_ATTRIBUTES
+++ /dev/null
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
-using System.Runtime.ExceptionServices;
-using System.Threading;
-
-namespace System.IO.Pipes
-{
- internal sealed class ReadWriteCompletionSource : PipeCompletionSource<int>
- {
- private readonly bool _isWrite;
- private readonly PipeStream _pipeStream;
-
- private bool _isMessageComplete;
- private int _numBytes; // number of buffer read OR written
-
- internal ReadWriteCompletionSource(PipeStream stream, ReadOnlyMemory<byte> bufferToPin, bool isWrite)
- : base(stream._threadPoolBinding!, bufferToPin)
- {
- _pipeStream = stream;
- _isWrite = isWrite;
- _isMessageComplete = true;
- }
-
- internal override void SetCompletedSynchronously()
- {
- if (!_isWrite)
- {
- _pipeStream.UpdateMessageCompletion(_isMessageComplete);
- }
-
- TrySetResult(_numBytes);
- }
-
- protected override void AsyncCallback(uint errorCode, uint numBytes)
- {
- _numBytes = (int)numBytes;
-
- // Allow async read to finish
- if (!_isWrite)
- {
- switch (errorCode)
- {
- case Interop.Errors.ERROR_BROKEN_PIPE:
- case Interop.Errors.ERROR_PIPE_NOT_CONNECTED:
- case Interop.Errors.ERROR_NO_DATA:
- errorCode = 0;
- break;
- }
- }
-
- // For message type buffer.
- if (errorCode == Interop.Errors.ERROR_MORE_DATA)
- {
- errorCode = 0;
- _isMessageComplete = false;
- }
- else
- {
- _isMessageComplete = true;
- }
-
- base.AsyncCallback(errorCode, numBytes);
- }
-
- protected override void HandleError(int errorCode) =>
- TrySetException(ExceptionDispatchInfo.SetCurrentStackTrace(_pipeStream.WinIOError(errorCode)));
- }
-}
private void ReleaseResources()
{
_strategy = null;
- // Unpin any pinned buffer.
- _memoryHandle.Dispose();
// Ensure that any cancellation callback has either completed or will never run,
// so that we don't try to access an overlapped for this operation after it's already
// been freed.
_cancellationRegistration.Dispose();
+ // Unpin any pinned buffer.
+ _memoryHandle.Dispose();
+
// Free the overlapped.
if (_overlapped != null)
{