Use UnixFileStream's ReadAsync implementation on Windows as well (#56682)
authorStephen Toub <stoub@microsoft.com>
Mon, 2 Aug 2021 09:03:53 +0000 (05:03 -0400)
committerGitHub <noreply@github.com>
Mon, 2 Aug 2021 09:03:53 +0000 (11:03 +0200)
UnixFileStream's ReadAsync implementation uses a reusable IValueTaskSource implementation to avoid allocating a new work item on every read.  We can push that implementation down to OSFileStreamStrategy, and then use it for the Windows implementation of ReadAsync as well when IsAsync==false, rather than delegating to the base Stream implementation.

This PR almost entirely just moves code around.  The only change to logic is in RandomAccess.Windows.cs, to only set an offset into the NativeOverlapped if the SafeFileHandle is seekable; otherwise, it fails when used with pipes.

src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Windows.cs
src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.cs
src/libraries/System.Private.CoreLib/src/System/IO/Strategies/OSFileStreamStrategy.cs
src/libraries/System.Private.CoreLib/src/System/IO/Strategies/SyncWindowsFileStreamStrategy.cs
src/libraries/System.Private.CoreLib/src/System/IO/Strategies/UnixFileStreamStrategy.cs

index 2bb8d46..09f7a69 100644 (file)
@@ -695,8 +695,11 @@ namespace System.IO
             Debug.Assert(!handle.IsAsync);
 
             NativeOverlapped result = default;
-            result.OffsetLow = unchecked((int)fileOffset);
-            result.OffsetHigh = (int)(fileOffset >> 32);
+            if (handle.CanSeek)
+            {
+                result.OffsetLow = unchecked((int)fileOffset);
+                result.OffsetHigh = (int)(fileOffset >> 32);
+            }
             return result;
         }
 
index 9bc4965..12af993 100644 (file)
@@ -21,13 +21,7 @@ namespace System.IO.Strategies
 
         internal override bool IsAsync => true;
 
-        public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
-            => ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
-
         public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
-            => ReadAsyncInternal(destination, cancellationToken);
-
-        private ValueTask<int> ReadAsyncInternal(Memory<byte> destination, CancellationToken cancellationToken)
         {
             if (!CanSeek)
             {
@@ -52,17 +46,11 @@ namespace System.IO.Strategies
                 : (errorCode == 0) ? ValueTask.FromResult(0) : ValueTask.FromException<int>(HandleIOError(readOffset, errorCode));
         }
 
-        public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
-            => WriteAsyncInternal(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken).AsTask();
-
         public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
-            => WriteAsyncInternal(buffer, cancellationToken);
-
-        private ValueTask WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
         {
-            long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition, source.Length) - source.Length : -1;
+            long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition, buffer.Length) - buffer.Length : -1;
 
-            (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncWriteFile(_fileHandle, source, writeOffset, cancellationToken);
+            (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncWriteFile(_fileHandle, buffer, writeOffset, cancellationToken);
             return vts != null
                 ? new ValueTask(vts, vts.Version)
                 : (errorCode == 0) ? ValueTask.CompletedTask : ValueTask.FromException(HandleIOError(writeOffset, errorCode));
@@ -120,15 +108,5 @@ namespace System.IO.Strategies
                 }
             }
         }
-
-        public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
-            TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state);
-
-        public override int EndRead(IAsyncResult asyncResult) => TaskToApm.End<int>(asyncResult);
-
-        public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
-            TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state);
-
-        public override void EndWrite(IAsyncResult asyncResult) => TaskToApm.End(asyncResult);
     }
 }
index 2faf4c2..260624b 100644 (file)
@@ -4,6 +4,7 @@
 using System.Diagnostics;
 using System.Threading;
 using System.Threading.Tasks;
+using System.Threading.Tasks.Sources;
 using Microsoft.Win32.SafeHandles;
 
 namespace System.IO.Strategies
@@ -13,6 +14,7 @@ namespace System.IO.Strategies
     {
         protected readonly SafeFileHandle _fileHandle; // only ever null if ctor throws
         private readonly FileAccess _access; // What file was opened for.
+        private ReadAsyncTaskSource? _readAsyncTaskSource; // Cached IValueTaskSource used for async-over-sync reads
 
         protected long _filePosition;
         protected long _length = -1; // negative means that hasn't been fetched.
@@ -69,6 +71,8 @@ namespace System.IO.Strategies
             }
         }
 
+        internal override bool IsAsync => _fileHandle.IsAsync;
+
         public sealed override bool CanSeek => _fileHandle.CanSeek;
 
         public sealed override bool CanRead => !_fileHandle.IsClosed && (_access & FileAccess.Read) != 0;
@@ -278,5 +282,145 @@ namespace System.IO.Strategies
             RandomAccess.WriteAtOffset(_fileHandle, buffer, _filePosition);
             _filePosition += buffer.Length;
         }
+
+        public sealed override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
+            TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state);
+
+        public sealed override void EndWrite(IAsyncResult asyncResult) =>
+            TaskToApm.End(asyncResult);
+
+        public sealed override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
+            WriteAsync(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken).AsTask();
+
+        public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
+        {
+            long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition, source.Length) - source.Length : -1;
+            return RandomAccess.WriteAtOffsetAsync(_fileHandle, source, writeOffset, cancellationToken);
+        }
+
+        public sealed override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
+            TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state);
+
+        public sealed override int EndRead(IAsyncResult asyncResult) =>
+            TaskToApm.End<int>(asyncResult);
+
+        public sealed override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
+            ReadAsync(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
+
+        public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken)
+        {
+            if (!CanSeek)
+            {
+                return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, fileOffset: -1, cancellationToken);
+            }
+
+            // This implementation updates the file position before the operation starts and updates it after incomplete read.
+            // Also, unlike the Net5CompatFileStreamStrategy implementation, this implementation doesn't serialize operations.
+            long readOffset = Interlocked.Add(ref _filePosition, destination.Length) - destination.Length;
+            ReadAsyncTaskSource rats = Interlocked.Exchange(ref _readAsyncTaskSource, null) ?? new ReadAsyncTaskSource(this);
+            return rats.QueueRead(destination, readOffset, cancellationToken);
+        }
+
+        /// <summary>Provides a reusable ValueTask-backing object for implementing ReadAsync.</summary>
+        private sealed class ReadAsyncTaskSource : IValueTaskSource<int>, IThreadPoolWorkItem
+        {
+            private readonly OSFileStreamStrategy _stream;
+            private ManualResetValueTaskSourceCore<int> _source;
+
+            private Memory<byte> _destination;
+            private long _readOffset;
+            private ExecutionContext? _context;
+            private CancellationToken _cancellationToken;
+
+            public ReadAsyncTaskSource(OSFileStreamStrategy stream) => _stream = stream;
+
+            public ValueTask<int> QueueRead(Memory<byte> destination, long readOffset, CancellationToken cancellationToken)
+            {
+                _destination = destination;
+                _readOffset = readOffset;
+                _cancellationToken = cancellationToken;
+                _context = ExecutionContext.Capture();
+
+                ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: true);
+                return new ValueTask<int>(this, _source.Version);
+            }
+
+            void IThreadPoolWorkItem.Execute()
+            {
+                if (_context is null || _context.IsDefault)
+                {
+                    Read();
+                }
+                else
+                {
+                    ExecutionContext.RunForThreadPoolUnsafe(_context, static x => x.Read(), this);
+                }
+            }
+
+            private void Read()
+            {
+                Exception? error = null;
+                int result = 0;
+
+                try
+                {
+                    if (_cancellationToken.IsCancellationRequested)
+                    {
+                        error = new OperationCanceledException(_cancellationToken);
+                    }
+                    else
+                    {
+                        result = RandomAccess.ReadAtOffset(_stream._fileHandle, _destination.Span, _readOffset);
+                    }
+                }
+                catch (Exception e)
+                {
+                    error = e;
+                }
+                finally
+                {
+                    // if the read was incomplete, we need to update the file position:
+                    if (result != _destination.Length)
+                    {
+                        _stream.OnIncompleteRead(_destination.Length, result);
+                    }
+
+                    _destination = default;
+                    _readOffset = -1;
+                    _cancellationToken = default;
+                    _context = null;
+                }
+
+                if (error is not null)
+                {
+                    _source.SetException(error);
+                }
+                else
+                {
+                    _source.SetResult(result);
+                }
+            }
+
+            int IValueTaskSource<int>.GetResult(short token)
+            {
+                try
+                {
+                    return _source.GetResult(token);
+                }
+                finally
+                {
+                    _source.Reset();
+#pragma warning disable CS0197
+                    Volatile.Write(ref _stream._readAsyncTaskSource, this);
+#pragma warning restore CS0197
+                }
+            }
+
+            ValueTaskSourceStatus IValueTaskSource<int>.GetStatus(short token) =>
+                _source.GetStatus(token);
+
+            void IValueTaskSource<int>.OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) =>
+                _source.OnCompleted(continuation, state, token, flags);
+        }
     }
 }
index d25fcd6..5e25132 100644 (file)
@@ -1,9 +1,6 @@
 // Licensed to the .NET Foundation under one or more agreements.
 // The .NET Foundation licenses this file to you under the MIT license.
 
-using System.Runtime.InteropServices;
-using System.Threading;
-using System.Threading.Tasks;
 using Microsoft.Win32.SafeHandles;
 
 namespace System.IO.Strategies
@@ -20,45 +17,5 @@ namespace System.IO.Strategies
         }
 
         internal override bool IsAsync => false;
-
-        public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
-        {
-            // If we weren't opened for asynchronous I/O, we still call to the base implementation so that
-            // Read is invoked asynchronously.  But we can do so using the base Stream's internal helper
-            // that bypasses delegating to BeginRead, since we already know this is FileStream rather
-            // than something derived from it and what our BeginRead implementation is going to do.
-            return BeginReadInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
-        }
-
-        public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
-        {
-            // If we weren't opened for asynchronous I/O, we still call to the base implementation so that
-            // Read is invoked asynchronously.  But if we have a byte[], we can do so using the base Stream's
-            // internal helper that bypasses delegating to BeginRead, since we already know this is FileStream
-            // rather than something derived from it and what our BeginRead implementation is going to do.
-            return MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> segment) ?
-                new ValueTask<int>(BeginReadInternal(segment.Array!, segment.Offset, segment.Count, null, null, serializeAsynchronously: true, apm: false)) :
-                base.ReadAsync(buffer, cancellationToken);
-        }
-
-        public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
-        {
-            // If we weren't opened for asynchronous I/O, we still call to the base implementation so that
-            // Write is invoked asynchronously.  But we can do so using the base Stream's internal helper
-            // that bypasses delegating to BeginWrite, since we already know this is FileStream rather
-            // than something derived from it and what our BeginWrite implementation is going to do.
-            return BeginWriteInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
-        }
-
-        public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
-        {
-            // If we weren't opened for asynchronous I/O, we still call to the base implementation so that
-            // Write is invoked asynchronously.  But if we have a byte[], we can do so using the base Stream's
-            // internal helper that bypasses delegating to BeginWrite, since we already know this is FileStream
-            // rather than something derived from it and what our BeginWrite implementation is going to do.
-            return MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> segment) ?
-                new ValueTask(BeginWriteInternal(segment.Array!, segment.Offset, segment.Count, null, null, serializeAsynchronously: true, apm: false)) :
-                base.WriteAsync(buffer, cancellationToken);
-        }
     }
 }
index e1a64a6..3c54f1b 100644 (file)
@@ -1,18 +1,12 @@
 // Licensed to the .NET Foundation under one or more agreements.
 // The .NET Foundation licenses this file to you under the MIT license.
 
-using System.Diagnostics;
-using System.Threading;
-using System.Threading.Tasks;
-using System.Threading.Tasks.Sources;
 using Microsoft.Win32.SafeHandles;
 
 namespace System.IO.Strategies
 {
     internal sealed partial class UnixFileStreamStrategy : OSFileStreamStrategy
     {
-        private ReadAsyncTaskSource? _readAsyncTaskSource;
-
         internal UnixFileStreamStrategy(SafeFileHandle handle, FileAccess access) : base(handle, access)
         {
         }
@@ -21,147 +15,5 @@ namespace System.IO.Strategies
             base(path, mode, access, share, options, preallocationSize)
         {
         }
-
-        internal override bool IsAsync => _fileHandle.IsAsync;
-
-        public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
-            TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state);
-
-        public override int EndRead(IAsyncResult asyncResult) =>
-            TaskToApm.End<int>(asyncResult);
-
-        public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
-            ReadAsync(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
-
-        public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken)
-        {
-            if (!CanSeek)
-            {
-                return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, fileOffset: -1, cancellationToken);
-            }
-
-            // This implementation updates the file position before the operation starts and updates it after incomplete read.
-            // Also, unlike the Net5CompatFileStreamStrategy implementation, this implementation doesn't serialize operations.
-            long readOffset = Interlocked.Add(ref _filePosition, destination.Length) - destination.Length;
-            ReadAsyncTaskSource rats = Interlocked.Exchange(ref _readAsyncTaskSource, null) ?? new ReadAsyncTaskSource(this);
-            return rats.QueueRead(destination, readOffset, cancellationToken);
-        }
-
-        public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
-            TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state);
-
-        public override void EndWrite(IAsyncResult asyncResult) =>
-            TaskToApm.End(asyncResult);
-
-        public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
-            WriteAsync(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken).AsTask();
-
-        public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
-        {
-            long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition, source.Length) - source.Length : -1;
-            return RandomAccess.WriteAtOffsetAsync(_fileHandle, source, writeOffset, cancellationToken);
-        }
-
-        /// <summary>Provides a reusable ValueTask-backing object for implementing ReadAsync.</summary>
-        private sealed class ReadAsyncTaskSource : IValueTaskSource<int>, IThreadPoolWorkItem
-        {
-            private readonly UnixFileStreamStrategy _stream;
-            private ManualResetValueTaskSourceCore<int> _source;
-
-            private Memory<byte> _destination;
-            private long _readOffset;
-            private ExecutionContext? _context;
-            private CancellationToken _cancellationToken;
-
-            public ReadAsyncTaskSource(UnixFileStreamStrategy stream) => _stream = stream;
-
-            public ValueTask<int> QueueRead(Memory<byte> destination, long readOffset, CancellationToken cancellationToken)
-            {
-                _destination = destination;
-                _readOffset = readOffset;
-                _cancellationToken = cancellationToken;
-                _context = ExecutionContext.Capture();
-
-                ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: true);
-                return new ValueTask<int>(this, _source.Version);
-            }
-
-            void IThreadPoolWorkItem.Execute()
-            {
-                if (_context is null || _context.IsDefault)
-                {
-                    Read();
-                }
-                else
-                {
-                    ExecutionContext.RunForThreadPoolUnsafe(_context, static x => x.Read(), this);
-                }
-            }
-
-            private void Read()
-            {
-                Exception? error = null;
-                int result = 0;
-
-                try
-                {
-                    if (_cancellationToken.IsCancellationRequested)
-                    {
-                        error = new OperationCanceledException(_cancellationToken);
-                    }
-                    else
-                    {
-                        result = RandomAccess.ReadAtOffset(_stream._fileHandle, _destination.Span, _readOffset);
-                    }
-                }
-                catch (Exception e)
-                {
-                    error = e;
-                }
-                finally
-                {
-                    // if the read was incomplete, we need to update the file position:
-                    if (result != _destination.Length)
-                    {
-                        _stream.OnIncompleteRead(_destination.Length, result);
-                    }
-
-                    _destination = default;
-                    _readOffset = -1;
-                    _cancellationToken = default;
-                    _context = null;
-                }
-
-                if (error is not null)
-                {
-                    _source.SetException(error);
-                }
-                else
-                {
-                    _source.SetResult(result);
-                }
-            }
-
-            int IValueTaskSource<int>.GetResult(short token)
-            {
-                try
-                {
-                    return _source.GetResult(token);
-                }
-                finally
-                {
-                    _source.Reset();
-#pragma warning disable CS0197
-                    Volatile.Write(ref _stream._readAsyncTaskSource, this);
-#pragma warning restore CS0197
-                }
-            }
-
-            ValueTaskSourceStatus IValueTaskSource<int>.GetStatus(short token) =>
-                _source.GetStatus(token);
-
-            void IValueTaskSource<int>.OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) =>
-                _source.OnCompleted(continuation, state, token, flags);
-        }
     }
 }