From 6fc8441311120c47c96b2605bf3a0d186f82454a Mon Sep 17 00:00:00 2001 From: Adam Sitnik Date: Thu, 5 Aug 2021 07:56:29 +0200 Subject: [PATCH] Ensure FileStream.Position is correct after a failed|cancelled WriteAsync attempt (#56716) --- .../tests/FileStream/ReadAsync.cs | 12 +- .../tests/FileStream/WriteAsync.cs | 13 ++- ...FileHandle.OverlappedValueTaskSource.Windows.cs | 12 +- .../SafeFileHandle.ThreadPoolValueTaskSource.cs | 22 +++- .../src/System/IO/RandomAccess.Unix.cs | 8 +- .../src/System/IO/RandomAccess.Windows.cs | 41 ++++--- .../src/System/IO/RandomAccess.cs | 9 +- .../Strategies/AsyncWindowsFileStreamStrategy.cs | 46 -------- .../System/IO/Strategies/OSFileStreamStrategy.cs | 127 +++------------------ 9 files changed, 97 insertions(+), 193 deletions(-) diff --git a/src/libraries/System.IO.FileSystem/tests/FileStream/ReadAsync.cs b/src/libraries/System.IO.FileSystem/tests/FileStream/ReadAsync.cs index e5b8ecd..9985119 100644 --- a/src/libraries/System.IO.FileSystem/tests/FileStream/ReadAsync.cs +++ b/src/libraries/System.IO.FileSystem/tests/FileStream/ReadAsync.cs @@ -65,8 +65,12 @@ namespace System.IO.Tests } } - [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] - public async Task ReadAsyncCanceledFile() + [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] + [InlineData(0, true)] // 0 == no buffering + [InlineData(4096, true)] // 4096 == default buffer size + [InlineData(0, false)] + [InlineData(4096, false)] + public async Task ReadAsyncCanceledFile(int bufferSize, bool isAsync) { string fileName = GetTestFilePath(); using (FileStream fs = new FileStream(fileName, FileMode.Create)) @@ -75,7 +79,7 @@ namespace System.IO.Tests fs.Write(TestBuffer, 0, TestBuffer.Length); } - using (FileStream fs = new FileStream(fileName, FileMode.Open)) + using (FileStream fs = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.None, bufferSize, isAsync)) { byte[] buffer = new byte[fs.Length]; CancellationTokenSource cts = new CancellationTokenSource(); @@ -91,6 +95,8 @@ namespace System.IO.Tests // Ideally we'd be doing an Assert.Throws // but since cancellation is a race condition we accept either outcome Assert.Equal(cts.Token, oce.CancellationToken); + + Assert.Equal(0, fs.Position); // if read was cancelled, the Position should remain unchanged } } } diff --git a/src/libraries/System.IO.FileSystem/tests/FileStream/WriteAsync.cs b/src/libraries/System.IO.FileSystem/tests/FileStream/WriteAsync.cs index 99f5162..06a9422 100644 --- a/src/libraries/System.IO.FileSystem/tests/FileStream/WriteAsync.cs +++ b/src/libraries/System.IO.FileSystem/tests/FileStream/WriteAsync.cs @@ -100,11 +100,15 @@ namespace System.IO.Tests } } - [Fact] - public async Task WriteAsyncCancelledFile() + [Theory] + [InlineData(0, true)] // 0 == no buffering + [InlineData(4096, true)] // 4096 == default buffer size + [InlineData(0, false)] + [InlineData(4096, false)] + public async Task WriteAsyncCancelledFile(int bufferSize, bool isAsync) { const int writeSize = 1024 * 1024; - using (FileStream fs = new FileStream(GetTestFilePath(), FileMode.Create)) + using (FileStream fs = new FileStream(GetTestFilePath(), FileMode.CreateNew, FileAccess.Write, FileShare.None, bufferSize, isAsync)) { byte[] buffer = new byte[writeSize]; CancellationTokenSource cts = new CancellationTokenSource(); @@ -119,6 +123,9 @@ namespace System.IO.Tests // Ideally we'd be doing an Assert.Throws // but since cancellation is a race condition we accept either outcome Assert.Equal(cts.Token, oce.CancellationToken); + + Assert.Equal(0, fs.Length); // if write was cancelled, the file should be empty + Assert.Equal(0, fs.Position); // if write was cancelled, the Position should remain unchanged } } } diff --git a/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.OverlappedValueTaskSource.Windows.cs b/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.OverlappedValueTaskSource.Windows.cs index 26e17f1..1ba3516 100644 --- a/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.OverlappedValueTaskSource.Windows.cs +++ b/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.OverlappedValueTaskSource.Windows.cs @@ -46,7 +46,7 @@ namespace Microsoft.Win32.SafeHandles internal readonly PreAllocatedOverlapped _preallocatedOverlapped; internal readonly SafeFileHandle _fileHandle; - private AsyncWindowsFileStreamStrategy? _strategy; + private OSFileStreamStrategy? _strategy; internal MemoryHandle _memoryHandle; private int _bufferSize; internal ManualResetValueTaskSourceCore _source; // mutable struct; do not make this readonly @@ -77,8 +77,10 @@ namespace Microsoft.Win32.SafeHandles ? ThrowHelper.CreateEndOfFileException() : Win32Marshal.GetExceptionForWin32Error(errorCode, path); - internal NativeOverlapped* PrepareForOperation(ReadOnlyMemory memory, long fileOffset, AsyncWindowsFileStreamStrategy? strategy = null) + internal NativeOverlapped* PrepareForOperation(ReadOnlyMemory memory, long fileOffset, OSFileStreamStrategy? strategy = null) { + Debug.Assert(strategy is null || strategy is AsyncWindowsFileStreamStrategy, $"Strategy was expected to be null or async, got {strategy}."); + _result = 0; _strategy = strategy; _bufferSize = memory.Length; @@ -195,12 +197,12 @@ namespace Microsoft.Win32.SafeHandles { Debug.Assert(errorCode == Interop.Errors.ERROR_SUCCESS || numBytes == 0, $"Callback returned {errorCode} error and {numBytes} bytes"); - AsyncWindowsFileStreamStrategy? strategy = _strategy; + OSFileStreamStrategy? strategy = _strategy; ReleaseResources(); - if (strategy is not null && _bufferSize != numBytes) // true only for incomplete reads + if (strategy is not null && _bufferSize != numBytes) // true only for incomplete operations { - strategy.OnIncompleteRead(_bufferSize, (int)numBytes); + strategy.OnIncompleteOperation(_bufferSize, (int)numBytes); } switch (errorCode) diff --git a/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.ThreadPoolValueTaskSource.cs b/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.ThreadPoolValueTaskSource.cs index abb6238..9fff830 100644 --- a/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.ThreadPoolValueTaskSource.cs +++ b/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.ThreadPoolValueTaskSource.cs @@ -5,6 +5,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; using System.IO; +using System.IO.Strategies; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; @@ -32,6 +33,7 @@ namespace Microsoft.Win32.SafeHandles private ManualResetValueTaskSourceCore _source; private Operation _operation = Operation.None; private ExecutionContext? _context; + private OSFileStreamStrategy? _strategy; // These fields store the parameters for the operation. // The first two are common for all kinds of operations. @@ -116,8 +118,22 @@ namespace Microsoft.Win32.SafeHandles } finally { + if (_strategy is not null) + { + // WriteAtOffset returns void, so we need to fix position only in case of an exception + if (exception is not null) + { + _strategy.OnIncompleteOperation(_singleSegment.Length, 0); + } + else if (_operation == Operation.Read && result != _singleSegment.Length) + { + _strategy.OnIncompleteOperation(_singleSegment.Length, (int)result); + } + } + _operation = Operation.None; _context = null; + _strategy = null; _cancellationToken = default; _singleSegment = default; _readScatterBuffers = null; @@ -152,7 +168,7 @@ namespace Microsoft.Win32.SafeHandles ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: true); } - public ValueTask QueueRead(Memory buffer, long fileOffset, CancellationToken cancellationToken) + public ValueTask QueueRead(Memory buffer, long fileOffset, CancellationToken cancellationToken, OSFileStreamStrategy? strategy) { ValidateInvariants(); @@ -160,12 +176,13 @@ namespace Microsoft.Win32.SafeHandles _singleSegment = buffer; _fileOffset = fileOffset; _cancellationToken = cancellationToken; + _strategy = strategy; QueueToThreadPool(); return new ValueTask(this, _source.Version); } - public ValueTask QueueWrite(ReadOnlyMemory buffer, long fileOffset, CancellationToken cancellationToken) + public ValueTask QueueWrite(ReadOnlyMemory buffer, long fileOffset, CancellationToken cancellationToken, OSFileStreamStrategy? strategy) { ValidateInvariants(); @@ -173,6 +190,7 @@ namespace Microsoft.Win32.SafeHandles _singleSegment = buffer; _fileOffset = fileOffset; _cancellationToken = cancellationToken; + _strategy = strategy; QueueToThreadPool(); return new ValueTask(this, _source.Version); diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Unix.cs b/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Unix.cs index f98e987..992dcc5 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Unix.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Unix.cs @@ -74,8 +74,8 @@ namespace System.IO return FileStreamHelpers.CheckFileCall(result, handle.Path); } - internal static ValueTask ReadAtOffsetAsync(SafeFileHandle handle, Memory buffer, long fileOffset, CancellationToken cancellationToken) - => ScheduleSyncReadAtOffsetAsync(handle, buffer, fileOffset, cancellationToken); + internal static ValueTask ReadAtOffsetAsync(SafeFileHandle handle, Memory buffer, long fileOffset, CancellationToken cancellationToken, OSFileStreamStrategy? strategy = null) + => ScheduleSyncReadAtOffsetAsync(handle, buffer, fileOffset, cancellationToken, strategy); private static ValueTask ReadScatterAtOffsetAsync(SafeFileHandle handle, IReadOnlyList> buffers, long fileOffset, CancellationToken cancellationToken) @@ -202,8 +202,8 @@ namespace System.IO } } - internal static ValueTask WriteAtOffsetAsync(SafeFileHandle handle, ReadOnlyMemory buffer, long fileOffset, CancellationToken cancellationToken) - => ScheduleSyncWriteAtOffsetAsync(handle, buffer, fileOffset, cancellationToken); + internal static ValueTask WriteAtOffsetAsync(SafeFileHandle handle, ReadOnlyMemory buffer, long fileOffset, CancellationToken cancellationToken, OSFileStreamStrategy? strategy = null) + => ScheduleSyncWriteAtOffsetAsync(handle, buffer, fileOffset, cancellationToken, strategy); private static ValueTask WriteGatherAtOffsetAsync(SafeFileHandle handle, IReadOnlyList> buffers, long fileOffset, CancellationToken cancellationToken) diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Windows.cs b/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Windows.cs index 09f7a69..d367abb 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Windows.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Windows.cs @@ -220,11 +220,12 @@ namespace System.IO } } - internal static ValueTask ReadAtOffsetAsync(SafeFileHandle handle, Memory buffer, long fileOffset, CancellationToken cancellationToken) + internal static ValueTask ReadAtOffsetAsync(SafeFileHandle handle, Memory buffer, long fileOffset, + CancellationToken cancellationToken, OSFileStreamStrategy? strategy = null) { if (handle.IsAsync) { - (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = QueueAsyncReadFile(handle, buffer, fileOffset, cancellationToken); + (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = QueueAsyncReadFile(handle, buffer, fileOffset, cancellationToken, strategy); if (vts is not null) { @@ -236,19 +237,19 @@ namespace System.IO return ValueTask.FromResult(0); } - return ValueTask.FromException(Win32Marshal.GetExceptionForWin32Error(errorCode)); + return ValueTask.FromException(Win32Marshal.GetExceptionForWin32Error(errorCode, handle.Path)); } - return ScheduleSyncReadAtOffsetAsync(handle, buffer, fileOffset, cancellationToken); + return ScheduleSyncReadAtOffsetAsync(handle, buffer, fileOffset, cancellationToken, strategy); } - internal static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) QueueAsyncReadFile(SafeFileHandle handle, Memory buffer, long fileOffset, - CancellationToken cancellationToken, AsyncWindowsFileStreamStrategy? strategy = null) + private static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) QueueAsyncReadFile(SafeFileHandle handle, Memory buffer, long fileOffset, + CancellationToken cancellationToken, OSFileStreamStrategy? strategy) { handle.EnsureThreadPoolBindingInitialized(); SafeFileHandle.OverlappedValueTaskSource vts = handle.GetOverlappedValueTaskSource(); - int errorCode = 0; + int errorCode = Interop.Errors.ERROR_SUCCESS; try { NativeOverlapped* nativeOverlapped = vts.PrepareForOperation(buffer, fileOffset, strategy); @@ -292,7 +293,7 @@ namespace System.IO { if (errorCode != Interop.Errors.ERROR_IO_PENDING && errorCode != Interop.Errors.ERROR_SUCCESS) { - strategy?.OnIncompleteRead(buffer.Length, 0); + strategy?.OnIncompleteOperation(buffer.Length, 0); } } @@ -301,11 +302,12 @@ namespace System.IO return (vts, -1); } - internal static ValueTask WriteAtOffsetAsync(SafeFileHandle handle, ReadOnlyMemory buffer, long fileOffset, CancellationToken cancellationToken) + internal static ValueTask WriteAtOffsetAsync(SafeFileHandle handle, ReadOnlyMemory buffer, long fileOffset, + CancellationToken cancellationToken, OSFileStreamStrategy? strategy = null) { if (handle.IsAsync) { - (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = QueueAsyncWriteFile(handle, buffer, fileOffset, cancellationToken); + (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = QueueAsyncWriteFile(handle, buffer, fileOffset, cancellationToken, strategy); if (vts is not null) { @@ -317,27 +319,29 @@ namespace System.IO return ValueTask.CompletedTask; } - return ValueTask.FromException(Win32Marshal.GetExceptionForWin32Error(errorCode)); + return ValueTask.FromException(Win32Marshal.GetExceptionForWin32Error(errorCode, handle.Path)); } - return ScheduleSyncWriteAtOffsetAsync(handle, buffer, fileOffset, cancellationToken); + return ScheduleSyncWriteAtOffsetAsync(handle, buffer, fileOffset, cancellationToken, strategy); } - internal static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) QueueAsyncWriteFile(SafeFileHandle handle, ReadOnlyMemory buffer, long fileOffset, CancellationToken cancellationToken) + private static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) QueueAsyncWriteFile(SafeFileHandle handle, ReadOnlyMemory buffer, long fileOffset, + CancellationToken cancellationToken, OSFileStreamStrategy? strategy) { handle.EnsureThreadPoolBindingInitialized(); SafeFileHandle.OverlappedValueTaskSource vts = handle.GetOverlappedValueTaskSource(); + int errorCode = Interop.Errors.ERROR_SUCCESS; try { - NativeOverlapped* nativeOverlapped = vts.PrepareForOperation(buffer, fileOffset); + NativeOverlapped* nativeOverlapped = vts.PrepareForOperation(buffer, fileOffset, strategy); Debug.Assert(vts._memoryHandle.Pointer != null); // Queue an async WriteFile operation. if (Interop.Kernel32.WriteFile(handle, (byte*)vts._memoryHandle.Pointer, buffer.Length, IntPtr.Zero, nativeOverlapped) == 0) { // The operation failed, or it's pending. - int errorCode = FileStreamHelpers.GetLastWin32ErrorAndDisposeHandleIfInvalid(handle); + errorCode = FileStreamHelpers.GetLastWin32ErrorAndDisposeHandleIfInvalid(handle); switch (errorCode) { case Interop.Errors.ERROR_IO_PENDING: @@ -360,6 +364,13 @@ namespace System.IO vts.Dispose(); throw; } + finally + { + if (errorCode != Interop.Errors.ERROR_IO_PENDING && errorCode != Interop.Errors.ERROR_SUCCESS) + { + strategy?.OnIncompleteOperation(buffer.Length, 0); + } + } // Completion handled by callback. vts.FinishedScheduling(); diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.cs b/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.cs index ad190fe..04b59ae 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Collections.Generic; +using System.IO.Strategies; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -264,9 +265,9 @@ namespace System.IO } private static ValueTask ScheduleSyncReadAtOffsetAsync(SafeFileHandle handle, Memory buffer, - long fileOffset, CancellationToken cancellationToken) + long fileOffset, CancellationToken cancellationToken, OSFileStreamStrategy? strategy) { - return handle.GetThreadPoolValueTaskSource().QueueRead(buffer, fileOffset, cancellationToken); + return handle.GetThreadPoolValueTaskSource().QueueRead(buffer, fileOffset, cancellationToken, strategy); } private static ValueTask ScheduleSyncReadScatterAtOffsetAsync(SafeFileHandle handle, IReadOnlyList> buffers, @@ -276,9 +277,9 @@ namespace System.IO } private static ValueTask ScheduleSyncWriteAtOffsetAsync(SafeFileHandle handle, ReadOnlyMemory buffer, - long fileOffset, CancellationToken cancellationToken) + long fileOffset, CancellationToken cancellationToken, OSFileStreamStrategy? strategy) { - return handle.GetThreadPoolValueTaskSource().QueueWrite(buffer, fileOffset, cancellationToken); + return handle.GetThreadPoolValueTaskSource().QueueWrite(buffer, fileOffset, cancellationToken, strategy); } private static ValueTask ScheduleSyncWriteGatherAtOffsetAsync(SafeFileHandle handle, IReadOnlyList> buffers, diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.cs b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.cs index 12af993..82ed765 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.cs @@ -21,52 +21,6 @@ namespace System.IO.Strategies internal override bool IsAsync => true; - public override ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken = default) - { - if (!CanSeek) - { - return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, fileOffset: -1, cancellationToken); - } - - if (LengthCachingSupported && _length >= 0 && Volatile.Read(ref _filePosition) >= _length) - { - // We know for sure that the file length can be safely cached and it has already been obtained. - // If we have reached EOF we just return here and avoid a sys-call. - return ValueTask.FromResult(0); - } - - // This implementation updates the file position before the operation starts and updates it after incomplete read. - // This is done to keep backward compatibility for concurrent reads. - // It uses Interlocked as there can be multiple concurrent incomplete reads updating position at the same time. - long readOffset = Interlocked.Add(ref _filePosition, destination.Length) - destination.Length; - - (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncReadFile(_fileHandle, destination, readOffset, cancellationToken, this); - return vts != null - ? new ValueTask(vts, vts.Version) - : (errorCode == 0) ? ValueTask.FromResult(0) : ValueTask.FromException(HandleIOError(readOffset, errorCode)); - } - - public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) - { - long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition, buffer.Length) - buffer.Length : -1; - - (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncWriteFile(_fileHandle, buffer, writeOffset, cancellationToken); - return vts != null - ? new ValueTask(vts, vts.Version) - : (errorCode == 0) ? ValueTask.CompletedTask : ValueTask.FromException(HandleIOError(writeOffset, errorCode)); - } - - private Exception HandleIOError(long positionBefore, int errorCode) - { - if (_fileHandle.CanSeek) - { - // Update Position... it could be anywhere. - Interlocked.Exchange(ref _filePosition, positionBefore); - } - - return SafeFileHandle.OverlappedValueTaskSource.GetIOError(errorCode, _fileHandle.Path); - } - public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) { // Fail if the file was closed diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/OSFileStreamStrategy.cs b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/OSFileStreamStrategy.cs index 260624b..0386dfa 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/OSFileStreamStrategy.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/OSFileStreamStrategy.cs @@ -14,10 +14,9 @@ namespace System.IO.Strategies { protected readonly SafeFileHandle _fileHandle; // only ever null if ctor throws private readonly FileAccess _access; // What file was opened for. - private ReadAsyncTaskSource? _readAsyncTaskSource; // Cached IValueTaskSource used for async-over-sync reads protected long _filePosition; - protected long _length = -1; // negative means that hasn't been fetched. + private long _length = -1; // negative means that hasn't been fetched. private long _appendStart; // When appending, prevent overwriting file. private bool _lengthCanBeCached; // SafeFileHandle hasn't been exposed, file has been opened for reading and not shared for writing. @@ -102,9 +101,10 @@ namespace System.IO.Strategies // in case of concurrent incomplete reads, there can be multiple threads trying to update the position // at the same time. That is why we are using Interlocked here. - internal void OnIncompleteRead(int expectedBytesRead, int actualBytesRead) => Interlocked.Add(ref _filePosition, actualBytesRead - expectedBytesRead); + internal void OnIncompleteOperation(int expectedBytesTransferred, int actualBytesTransferred) + => Interlocked.Add(ref _filePosition, actualBytesTransferred - expectedBytesTransferred); - protected bool LengthCachingSupported => OperatingSystem.IsWindows() && _lengthCanBeCached; + private bool LengthCachingSupported => OperatingSystem.IsWindows() && _lengthCanBeCached; /// Gets or sets the position within the current stream public sealed override long Position @@ -292,10 +292,10 @@ namespace System.IO.Strategies public sealed override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => WriteAsync(new ReadOnlyMemory(buffer, offset, count), cancellationToken).AsTask(); - public override ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken) + public sealed override ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken) { long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition, source.Length) - source.Length : -1; - return RandomAccess.WriteAtOffsetAsync(_fileHandle, source, writeOffset, cancellationToken); + return RandomAccess.WriteAtOffsetAsync(_fileHandle, source, writeOffset, cancellationToken, this); } public sealed override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) => @@ -307,120 +307,25 @@ namespace System.IO.Strategies public sealed override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => ReadAsync(new Memory(buffer, offset, count), cancellationToken).AsTask(); - public override ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken) + public sealed override ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken) { if (!CanSeek) { return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, fileOffset: -1, cancellationToken); } - // This implementation updates the file position before the operation starts and updates it after incomplete read. - // Also, unlike the Net5CompatFileStreamStrategy implementation, this implementation doesn't serialize operations. - long readOffset = Interlocked.Add(ref _filePosition, destination.Length) - destination.Length; - ReadAsyncTaskSource rats = Interlocked.Exchange(ref _readAsyncTaskSource, null) ?? new ReadAsyncTaskSource(this); - return rats.QueueRead(destination, readOffset, cancellationToken); - } - - /// Provides a reusable ValueTask-backing object for implementing ReadAsync. - private sealed class ReadAsyncTaskSource : IValueTaskSource, IThreadPoolWorkItem - { - private readonly OSFileStreamStrategy _stream; - private ManualResetValueTaskSourceCore _source; - - private Memory _destination; - private long _readOffset; - private ExecutionContext? _context; - private CancellationToken _cancellationToken; - - public ReadAsyncTaskSource(OSFileStreamStrategy stream) => _stream = stream; - - public ValueTask QueueRead(Memory destination, long readOffset, CancellationToken cancellationToken) - { - _destination = destination; - _readOffset = readOffset; - _cancellationToken = cancellationToken; - _context = ExecutionContext.Capture(); - - ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: true); - return new ValueTask(this, _source.Version); - } - - void IThreadPoolWorkItem.Execute() + if (LengthCachingSupported && _length >= 0 && Volatile.Read(ref _filePosition) >= _length) { - if (_context is null || _context.IsDefault) - { - Read(); - } - else - { - ExecutionContext.RunForThreadPoolUnsafe(_context, static x => x.Read(), this); - } + // We know for sure that the file length can be safely cached and it has already been obtained. + // If we have reached EOF we just return here and avoid a sys-call. + return ValueTask.FromResult(0); } - private void Read() - { - Exception? error = null; - int result = 0; - - try - { - if (_cancellationToken.IsCancellationRequested) - { - error = new OperationCanceledException(_cancellationToken); - } - else - { - result = RandomAccess.ReadAtOffset(_stream._fileHandle, _destination.Span, _readOffset); - } - } - catch (Exception e) - { - error = e; - } - finally - { - // if the read was incomplete, we need to update the file position: - if (result != _destination.Length) - { - _stream.OnIncompleteRead(_destination.Length, result); - } - - _destination = default; - _readOffset = -1; - _cancellationToken = default; - _context = null; - } - - if (error is not null) - { - _source.SetException(error); - } - else - { - _source.SetResult(result); - } - } - - int IValueTaskSource.GetResult(short token) - { - try - { - return _source.GetResult(token); - } - finally - { - _source.Reset(); -#pragma warning disable CS0197 - Volatile.Write(ref _stream._readAsyncTaskSource, this); -#pragma warning restore CS0197 - } - } - - ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => - _source.GetStatus(token); - - void IValueTaskSource.OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => - _source.OnCompleted(continuation, state, token, flags); + // This implementation updates the file position before the operation starts and updates it after incomplete read. + // This is done to keep backward compatibility for concurrent reads. + // It uses Interlocked as there can be multiple concurrent incomplete reads updating position at the same time. + long readOffset = Interlocked.Add(ref _filePosition, destination.Length) - destination.Length; + return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, readOffset, cancellationToken, this); } } } -- 2.7.4