Update position before ReadAsync starts, but fix it after incomplete read (#56531)
authorAdam Sitnik <adam.sitnik@gmail.com>
Sat, 31 Jul 2021 23:37:23 +0000 (01:37 +0200)
committerGitHub <noreply@github.com>
Sat, 31 Jul 2021 23:37:23 +0000 (19:37 -0400)
* move CanRead and CanWrite checks to FileStream

* don't check IsClosed twice_(FileHandle.CanSeek already contains a IsClosed check)

* add a failing test

* handle incomplete async reads

* don't try to cache file length when file is opened for writing, as updating file position before performing async write can lead to invalid cached length value

* maybe Win 7 & 8 fix

src/libraries/System.IO.FileSystem/tests/FileStream/ReadAsync.cs
src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.OverlappedValueTaskSource.Windows.cs
src/libraries/System.Private.CoreLib/src/System/IO/FileStream.cs
src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Windows.cs
src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.cs
src/libraries/System.Private.CoreLib/src/System/IO/Strategies/OSFileStreamStrategy.cs
src/libraries/System.Private.CoreLib/src/System/IO/Strategies/UnixFileStreamStrategy.cs

index da429bb..e5b8ecd 100644 (file)
@@ -1,6 +1,8 @@
 // Licensed to the .NET Foundation under one or more agreements.
 // The .NET Foundation licenses this file to you under the MIT license.
 
+using System.Linq;
+using System.Security.Cryptography;
 using System.Threading;
 using System.Threading.Tasks;
 using Xunit;
@@ -92,6 +94,38 @@ namespace System.IO.Tests
                 }
             }
         }
+
+        [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+        [InlineData(FileShare.None, FileOptions.Asynchronous)] // FileShare.None: exclusive access
+        [InlineData(FileShare.ReadWrite, FileOptions.Asynchronous)] // FileShare.ReadWrite: others can write to the file, the length can't be cached
+        [InlineData(FileShare.None, FileOptions.None)]
+        [InlineData(FileShare.ReadWrite, FileOptions.None)]
+        public async Task IncompleteReadCantSetPositionBeyondEndOfFile(FileShare fileShare, FileOptions options)
+        {
+            const int fileSize = 10_000;
+            string filePath = GetTestFilePath();
+            byte[] content = RandomNumberGenerator.GetBytes(fileSize);
+            File.WriteAllBytes(filePath, content);
+
+            byte[][] buffers = Enumerable.Repeat(Enumerable.Repeat(byte.MaxValue, fileSize * 2).ToArray(), 10).ToArray();
+
+            using (FileStream fs = new FileStream(filePath, FileMode.Open, FileAccess.Read, fileShare, bufferSize: 0, options))
+            {
+                Task<int>[] reads = buffers.Select(buffer => fs.ReadAsync(buffer, 0, buffer.Length)).ToArray();
+
+                // the reads were not awaited, it's an anti-pattern and Position can be (0, buffersLength) now:
+                Assert.InRange(fs.Position, 0, buffers.Sum(buffer => buffer.Length));
+
+                await Task.WhenAll(reads);
+                // but when they are finished, the first buffer should contain valid data:
+                Assert.Equal(fileSize, reads.First().Result);
+                AssertExtensions.SequenceEqual(content, buffers.First().AsSpan(0, fileSize));
+                // and other reads should return 0:
+                Assert.All(reads.Skip(1), read => Assert.Equal(0, read.Result));
+                // and the Position must be correct:
+                Assert.Equal(fileSize, fs.Position);
+            }
+        }
     }
 
     [ActiveIssue("https://github.com/dotnet/runtime/issues/34582", TestPlatforms.Windows, TargetFrameworkMonikers.Netcoreapp, TestRuntimes.Mono)]
index 367184e..26e17f1 100644 (file)
@@ -5,6 +5,7 @@ using System;
 using System.Buffers;
 using System.Diagnostics;
 using System.IO;
+using System.IO.Strategies;
 using System.Threading;
 using System.Threading.Tasks.Sources;
 
@@ -45,7 +46,9 @@ namespace Microsoft.Win32.SafeHandles
 
             internal readonly PreAllocatedOverlapped _preallocatedOverlapped;
             internal readonly SafeFileHandle _fileHandle;
+            private AsyncWindowsFileStreamStrategy? _strategy;
             internal MemoryHandle _memoryHandle;
+            private int _bufferSize;
             internal ManualResetValueTaskSourceCore<int> _source; // mutable struct; do not make this readonly
             private NativeOverlapped* _overlapped;
             private CancellationTokenRegistration _cancellationRegistration;
@@ -74,9 +77,11 @@ namespace Microsoft.Win32.SafeHandles
                     ? ThrowHelper.CreateEndOfFileException()
                     : Win32Marshal.GetExceptionForWin32Error(errorCode, path);
 
-            internal NativeOverlapped* PrepareForOperation(ReadOnlyMemory<byte> memory, long fileOffset)
+            internal NativeOverlapped* PrepareForOperation(ReadOnlyMemory<byte> memory, long fileOffset, AsyncWindowsFileStreamStrategy? strategy = null)
             {
                 _result = 0;
+                _strategy = strategy;
+                _bufferSize = memory.Length;
                 _memoryHandle = memory.Pin();
                 _overlapped = _fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(_preallocatedOverlapped);
                 _overlapped->OffsetLow = (int)fileOffset;
@@ -132,8 +137,9 @@ namespace Microsoft.Win32.SafeHandles
                 }
             }
 
-            internal void ReleaseResources()
+            private void ReleaseResources()
             {
+                _strategy = null;
                 // Unpin any pinned buffer.
                 _memoryHandle.Dispose();
 
@@ -187,11 +193,19 @@ namespace Microsoft.Win32.SafeHandles
 
             internal void Complete(uint errorCode, uint numBytes)
             {
+                Debug.Assert(errorCode == Interop.Errors.ERROR_SUCCESS || numBytes == 0, $"Callback returned {errorCode} error and {numBytes} bytes");
+
+                AsyncWindowsFileStreamStrategy? strategy = _strategy;
                 ReleaseResources();
 
+                if (strategy is not null && _bufferSize != numBytes) // true only for incomplete reads
+                {
+                    strategy.OnIncompleteRead(_bufferSize, (int)numBytes);
+                }
+
                 switch (errorCode)
                 {
-                    case 0:
+                    case Interop.Errors.ERROR_SUCCESS:
                     case Interop.Errors.ERROR_BROKEN_PIPE:
                     case Interop.Errors.ERROR_NO_DATA:
                     case Interop.Errors.ERROR_HANDLE_EOF: // logically success with 0 bytes read (read at end of file)
index a2be26f..6a94ad2 100644 (file)
@@ -280,9 +280,14 @@ namespace System.IO
             {
                 return Task.FromCanceled<int>(cancellationToken);
             }
-            else if (_strategy.IsClosed)
+            else if (!_strategy.CanRead)
             {
-                ThrowHelper.ThrowObjectDisposedException_FileClosed();
+                if (_strategy.IsClosed)
+                {
+                    ThrowHelper.ThrowObjectDisposedException_FileClosed();
+                }
+
+                ThrowHelper.ThrowNotSupportedException_UnreadableStream();
             }
 
             return _strategy.ReadAsync(buffer, offset, count, cancellationToken);
@@ -294,9 +299,14 @@ namespace System.IO
             {
                 return ValueTask.FromCanceled<int>(cancellationToken);
             }
-            else if (_strategy.IsClosed)
+            else if (!_strategy.CanRead)
             {
-                ThrowHelper.ThrowObjectDisposedException_FileClosed();
+                if (_strategy.IsClosed)
+                {
+                    ThrowHelper.ThrowObjectDisposedException_FileClosed();
+                }
+
+                ThrowHelper.ThrowNotSupportedException_UnreadableStream();
             }
 
             return _strategy.ReadAsync(buffer, cancellationToken);
@@ -319,9 +329,14 @@ namespace System.IO
             {
                 return Task.FromCanceled(cancellationToken);
             }
-            else if (_strategy.IsClosed)
+            else if (!_strategy.CanWrite)
             {
-                ThrowHelper.ThrowObjectDisposedException_FileClosed();
+                if (_strategy.IsClosed)
+                {
+                    ThrowHelper.ThrowObjectDisposedException_FileClosed();
+                }
+
+                ThrowHelper.ThrowNotSupportedException_UnwritableStream();
             }
 
             return _strategy.WriteAsync(buffer, offset, count, cancellationToken);
@@ -333,9 +348,14 @@ namespace System.IO
             {
                 return ValueTask.FromCanceled(cancellationToken);
             }
-            else if (_strategy.IsClosed)
+            else if (!_strategy.CanWrite)
             {
-                ThrowHelper.ThrowObjectDisposedException_FileClosed();
+                if (_strategy.IsClosed)
+                {
+                    ThrowHelper.ThrowObjectDisposedException_FileClosed();
+                }
+
+                ThrowHelper.ThrowNotSupportedException_UnwritableStream();
             }
 
             return _strategy.WriteAsync(buffer, cancellationToken);
index 8711ce2..2bb8d46 100644 (file)
@@ -242,21 +242,23 @@ namespace System.IO
             return ScheduleSyncReadAtOffsetAsync(handle, buffer, fileOffset, cancellationToken);
         }
 
-        internal static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) QueueAsyncReadFile(SafeFileHandle handle, Memory<byte> buffer, long fileOffset, CancellationToken cancellationToken)
+        internal static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) QueueAsyncReadFile(SafeFileHandle handle, Memory<byte> buffer, long fileOffset,
+            CancellationToken cancellationToken, AsyncWindowsFileStreamStrategy? strategy = null)
         {
             handle.EnsureThreadPoolBindingInitialized();
 
             SafeFileHandle.OverlappedValueTaskSource vts = handle.GetOverlappedValueTaskSource();
+            int errorCode = 0;
             try
             {
-                NativeOverlapped* nativeOverlapped = vts.PrepareForOperation(buffer, fileOffset);
+                NativeOverlapped* nativeOverlapped = vts.PrepareForOperation(buffer, fileOffset, strategy);
                 Debug.Assert(vts._memoryHandle.Pointer != null);
 
                 // Queue an async ReadFile operation.
                 if (Interop.Kernel32.ReadFile(handle, (byte*)vts._memoryHandle.Pointer, buffer.Length, IntPtr.Zero, nativeOverlapped) == 0)
                 {
                     // The operation failed, or it's pending.
-                    int errorCode = FileStreamHelpers.GetLastWin32ErrorAndDisposeHandleIfInvalid(handle);
+                    errorCode = FileStreamHelpers.GetLastWin32ErrorAndDisposeHandleIfInvalid(handle);
                     switch (errorCode)
                     {
                         case Interop.Errors.ERROR_IO_PENDING:
@@ -286,6 +288,13 @@ namespace System.IO
                 vts.Dispose();
                 throw;
             }
+            finally
+            {
+                if (errorCode != Interop.Errors.ERROR_IO_PENDING && errorCode != Interop.Errors.ERROR_SUCCESS)
+                {
+                    strategy?.OnIncompleteRead(buffer.Length, 0);
+                }
+            }
 
             // Completion handled by callback.
             vts.FinishedScheduling();
index 913ce59..9bc4965 100644 (file)
@@ -27,40 +27,29 @@ namespace System.IO.Strategies
         public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
             => ReadAsyncInternal(destination, cancellationToken);
 
-        private unsafe ValueTask<int> ReadAsyncInternal(Memory<byte> destination, CancellationToken cancellationToken)
+        private ValueTask<int> ReadAsyncInternal(Memory<byte> destination, CancellationToken cancellationToken)
         {
-            if (!CanRead)
+            if (!CanSeek)
             {
-                ThrowHelper.ThrowNotSupportedException_UnreadableStream();
+                return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, fileOffset: -1, cancellationToken);
             }
 
-            long positionBefore = _filePosition;
-            if (CanSeek)
+            if (LengthCachingSupported && _length >= 0 && Volatile.Read(ref _filePosition) >= _length)
             {
-                long len = Length;
-                if (positionBefore + destination.Length > len)
-                {
-                    destination = positionBefore <= len ?
-                        destination.Slice(0, (int)(len - positionBefore)) :
-                        default;
-                }
-
-                // When using overlapped IO, the OS is not supposed to
-                // touch the file pointer location at all.  We will adjust it
-                // ourselves, but only in memory. This isn't threadsafe.
-                _filePosition += destination.Length;
-
-                // We know for sure that there is nothing to read, so we just return here and avoid a sys-call.
-                if (destination.IsEmpty && LengthCachingSupported)
-                {
-                    return ValueTask.FromResult(0);
-                }
+                // We know for sure that the file length can be safely cached and it has already been obtained.
+                // If we have reached EOF we just return here and avoid a sys-call.
+                return ValueTask.FromResult(0);
             }
 
-            (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncReadFile(_fileHandle, destination, positionBefore, cancellationToken);
+            // This implementation updates the file position before the operation starts and updates it after incomplete read.
+            // This is done to keep backward compatibility for concurrent reads.
+            // It uses Interlocked as there can be multiple concurrent incomplete reads updating position at the same time.
+            long readOffset = Interlocked.Add(ref _filePosition, destination.Length) - destination.Length;
+
+            (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncReadFile(_fileHandle, destination, readOffset, cancellationToken, this);
             return vts != null
                 ? new ValueTask<int>(vts, vts.Version)
-                : (errorCode == 0) ? ValueTask.FromResult(0) : ValueTask.FromException<int>(HandleIOError(positionBefore, errorCode));
+                : (errorCode == 0) ? ValueTask.FromResult(0) : ValueTask.FromException<int>(HandleIOError(readOffset, errorCode));
         }
 
         public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
@@ -69,35 +58,22 @@ namespace System.IO.Strategies
         public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
             => WriteAsyncInternal(buffer, cancellationToken);
 
-        private unsafe ValueTask WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
+        private ValueTask WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
         {
-            if (!CanWrite)
-            {
-                ThrowHelper.ThrowNotSupportedException_UnwritableStream();
-            }
-
-            long positionBefore = _filePosition;
-            if (CanSeek)
-            {
-                // When using overlapped IO, the OS is not supposed to
-                // touch the file pointer location at all.  We will adjust it
-                // ourselves, but only in memory.  This isn't threadsafe.
-                _filePosition += source.Length;
-                UpdateLengthOnChangePosition();
-            }
+            long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition, source.Length) - source.Length : -1;
 
-            (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncWriteFile(_fileHandle, source, positionBefore, cancellationToken);
+            (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncWriteFile(_fileHandle, source, writeOffset, cancellationToken);
             return vts != null
                 ? new ValueTask(vts, vts.Version)
-                : (errorCode == 0) ? ValueTask.CompletedTask : ValueTask.FromException(HandleIOError(positionBefore, errorCode));
+                : (errorCode == 0) ? ValueTask.CompletedTask : ValueTask.FromException(HandleIOError(writeOffset, errorCode));
         }
 
         private Exception HandleIOError(long positionBefore, int errorCode)
         {
-            if (!_fileHandle.IsClosed && CanSeek)
+            if (_fileHandle.CanSeek)
             {
                 // Update Position... it could be anywhere.
-                _filePosition = positionBefore;
+                Interlocked.Exchange(ref _filePosition, positionBefore);
             }
 
             return SafeFileHandle.OverlappedValueTaskSource.GetIOError(errorCode, _fileHandle.Path);
index 226765f..2faf4c2 100644 (file)
@@ -15,9 +15,9 @@ namespace System.IO.Strategies
         private readonly FileAccess _access; // What file was opened for.
 
         protected long _filePosition;
+        protected long _length = -1; // negative means that hasn't been fetched.
         private long _appendStart; // When appending, prevent overwriting file.
-        private long _length = -1; // When the file is locked for writes on Windows ((share & FileShare.Write) == 0) cache file length in-memory, negative means that hasn't been fetched.
-        private bool _lengthCanBeCached; // SafeFileHandle hasn't been exposed and FileShare.Write was not specified when the handle was opened.
+        private bool _lengthCanBeCached; // SafeFileHandle hasn't been exposed, file has been opened for reading and not shared for writing.
 
         internal OSFileStreamStrategy(SafeFileHandle handle, FileAccess access)
         {
@@ -44,7 +44,7 @@ namespace System.IO.Strategies
             string fullPath = Path.GetFullPath(path);
 
             _access = access;
-            _lengthCanBeCached = (share & FileShare.Write) == 0;
+            _lengthCanBeCached = (share & FileShare.Write) == 0 && (access & FileAccess.Write) == 0;
 
             _fileHandle = SafeFileHandle.Open(fullPath, mode, access, share, options, preallocationSize);
 
@@ -96,21 +96,9 @@ namespace System.IO.Strategies
             }
         }
 
-        protected void UpdateLengthOnChangePosition()
-        {
-            // Do not update the cached length if the file is not locked
-            // or if the length hasn't been fetched.
-            if (!LengthCachingSupported || _length < 0)
-            {
-                Debug.Assert(_length < 0);
-                return;
-            }
-
-            if (_filePosition > _length)
-            {
-                _length = _filePosition;
-            }
-        }
+        // in case of concurrent incomplete reads, there can be multiple threads trying to update the position
+        // at the same time. That is why we are using Interlocked here.
+        internal void OnIncompleteRead(int expectedBytesRead, int actualBytesRead) => Interlocked.Add(ref _filePosition, actualBytesRead - expectedBytesRead);
 
         protected bool LengthCachingSupported => OperatingSystem.IsWindows() && _lengthCanBeCached;
 
@@ -287,18 +275,8 @@ namespace System.IO.Strategies
                 ThrowHelper.ThrowNotSupportedException_UnwritableStream();
             }
 
-            try
-            {
-                RandomAccess.WriteAtOffset(_fileHandle, buffer, _filePosition);
-            }
-            catch
-            {
-                _length = -1; // invalidate cached length
-                throw;
-            }
-
+            RandomAccess.WriteAtOffset(_fileHandle, buffer, _filePosition);
             _filePosition += buffer.Length;
-            UpdateLengthOnChangePosition();
         }
     }
 }
index e9f00f2..e1a64a6 100644 (file)
@@ -1,6 +1,7 @@
 // Licensed to the .NET Foundation under one or more agreements.
 // The .NET Foundation licenses this file to you under the MIT license.
 
+using System.Diagnostics;
 using System.Threading;
 using System.Threading.Tasks;
 using System.Threading.Tasks.Sources;
@@ -34,20 +35,16 @@ namespace System.IO.Strategies
 
         public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken)
         {
-            if (!CanRead)
+            if (!CanSeek)
             {
-                ThrowHelper.ThrowNotSupportedException_UnreadableStream();
+                return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, fileOffset: -1, cancellationToken);
             }
 
-            if (CanSeek)
-            {
-                // This implementation updates the file position after the operation completes, rather than before.
-                // Also, unlike the Net5CompatFileStreamStrategy implementation, this implementation doesn't serialize operations.
-                ReadAsyncTaskSource rats = Interlocked.Exchange(ref _readAsyncTaskSource, null) ?? new ReadAsyncTaskSource(this);
-                return rats.QueueRead(destination, cancellationToken);
-            }
-
-            return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, fileOffset: -1, cancellationToken);
+            // This implementation updates the file position before the operation starts and updates it after incomplete read.
+            // Also, unlike the Net5CompatFileStreamStrategy implementation, this implementation doesn't serialize operations.
+            long readOffset = Interlocked.Add(ref _filePosition, destination.Length) - destination.Length;
+            ReadAsyncTaskSource rats = Interlocked.Exchange(ref _readAsyncTaskSource, null) ?? new ReadAsyncTaskSource(this);
+            return rats.QueueRead(destination, readOffset, cancellationToken);
         }
 
         public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
@@ -61,19 +58,8 @@ namespace System.IO.Strategies
 
         public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
         {
-            if (!CanWrite)
-            {
-                ThrowHelper.ThrowNotSupportedException_UnwritableStream();
-            }
-
-            long filePositionBefore = -1;
-            if (CanSeek)
-            {
-                filePositionBefore = _filePosition;
-                _filePosition += source.Length;
-            }
-
-            return RandomAccess.WriteAtOffsetAsync(_fileHandle, source, filePositionBefore, cancellationToken);
+            long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition, source.Length) - source.Length : -1;
+            return RandomAccess.WriteAtOffsetAsync(_fileHandle, source, writeOffset, cancellationToken);
         }
 
         /// <summary>Provides a reusable ValueTask-backing object for implementing ReadAsync.</summary>
@@ -83,14 +69,16 @@ namespace System.IO.Strategies
             private ManualResetValueTaskSourceCore<int> _source;
 
             private Memory<byte> _destination;
+            private long _readOffset;
             private ExecutionContext? _context;
             private CancellationToken _cancellationToken;
 
             public ReadAsyncTaskSource(UnixFileStreamStrategy stream) => _stream = stream;
 
-            public ValueTask<int> QueueRead(Memory<byte> destination, CancellationToken cancellationToken)
+            public ValueTask<int> QueueRead(Memory<byte> destination, long readOffset, CancellationToken cancellationToken)
             {
                 _destination = destination;
+                _readOffset = readOffset;
                 _cancellationToken = cancellationToken;
                 _context = ExecutionContext.Capture();
 
@@ -123,7 +111,7 @@ namespace System.IO.Strategies
                     }
                     else
                     {
-                        result = _stream.Read(_destination.Span);
+                        result = RandomAccess.ReadAtOffset(_stream._fileHandle, _destination.Span, _readOffset);
                     }
                 }
                 catch (Exception e)
@@ -132,7 +120,14 @@ namespace System.IO.Strategies
                 }
                 finally
                 {
+                    // if the read was incomplete, we need to update the file position:
+                    if (result != _destination.Length)
+                    {
+                        _stream.OnIncompleteRead(_destination.Length, result);
+                    }
+
                     _destination = default;
+                    _readOffset = -1;
                     _cancellationToken = default;
                     _context = null;
                 }