// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
+using System.Linq;
+using System.Security.Cryptography;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
}
}
}
+
+ [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+ [InlineData(FileShare.None, FileOptions.Asynchronous)] // FileShare.None: exclusive access
+ [InlineData(FileShare.ReadWrite, FileOptions.Asynchronous)] // FileShare.ReadWrite: others can write to the file, the length can't be cached
+ [InlineData(FileShare.None, FileOptions.None)]
+ [InlineData(FileShare.ReadWrite, FileOptions.None)]
+ public async Task IncompleteReadCantSetPositionBeyondEndOfFile(FileShare fileShare, FileOptions options)
+ {
+ const int fileSize = 10_000;
+ string filePath = GetTestFilePath();
+ byte[] content = RandomNumberGenerator.GetBytes(fileSize);
+ File.WriteAllBytes(filePath, content);
+
+ byte[][] buffers = Enumerable.Repeat(Enumerable.Repeat(byte.MaxValue, fileSize * 2).ToArray(), 10).ToArray();
+
+ using (FileStream fs = new FileStream(filePath, FileMode.Open, FileAccess.Read, fileShare, bufferSize: 0, options))
+ {
+ Task<int>[] reads = buffers.Select(buffer => fs.ReadAsync(buffer, 0, buffer.Length)).ToArray();
+
+ // the reads were not awaited, it's an anti-pattern and Position can be (0, buffersLength) now:
+ Assert.InRange(fs.Position, 0, buffers.Sum(buffer => buffer.Length));
+
+ await Task.WhenAll(reads);
+ // but when they are finished, the first buffer should contain valid data:
+ Assert.Equal(fileSize, reads.First().Result);
+ AssertExtensions.SequenceEqual(content, buffers.First().AsSpan(0, fileSize));
+ // and other reads should return 0:
+ Assert.All(reads.Skip(1), read => Assert.Equal(0, read.Result));
+ // and the Position must be correct:
+ Assert.Equal(fileSize, fs.Position);
+ }
+ }
}
[ActiveIssue("https://github.com/dotnet/runtime/issues/34582", TestPlatforms.Windows, TargetFrameworkMonikers.Netcoreapp, TestRuntimes.Mono)]
using System.Buffers;
using System.Diagnostics;
using System.IO;
+using System.IO.Strategies;
using System.Threading;
using System.Threading.Tasks.Sources;
internal readonly PreAllocatedOverlapped _preallocatedOverlapped;
internal readonly SafeFileHandle _fileHandle;
+ private AsyncWindowsFileStreamStrategy? _strategy;
internal MemoryHandle _memoryHandle;
+ private int _bufferSize;
internal ManualResetValueTaskSourceCore<int> _source; // mutable struct; do not make this readonly
private NativeOverlapped* _overlapped;
private CancellationTokenRegistration _cancellationRegistration;
? ThrowHelper.CreateEndOfFileException()
: Win32Marshal.GetExceptionForWin32Error(errorCode, path);
- internal NativeOverlapped* PrepareForOperation(ReadOnlyMemory<byte> memory, long fileOffset)
+ internal NativeOverlapped* PrepareForOperation(ReadOnlyMemory<byte> memory, long fileOffset, AsyncWindowsFileStreamStrategy? strategy = null)
{
_result = 0;
+ _strategy = strategy;
+ _bufferSize = memory.Length;
_memoryHandle = memory.Pin();
_overlapped = _fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(_preallocatedOverlapped);
_overlapped->OffsetLow = (int)fileOffset;
}
}
- internal void ReleaseResources()
+ private void ReleaseResources()
{
+ _strategy = null;
// Unpin any pinned buffer.
_memoryHandle.Dispose();
internal void Complete(uint errorCode, uint numBytes)
{
+ Debug.Assert(errorCode == Interop.Errors.ERROR_SUCCESS || numBytes == 0, $"Callback returned {errorCode} error and {numBytes} bytes");
+
+ AsyncWindowsFileStreamStrategy? strategy = _strategy;
ReleaseResources();
+ if (strategy is not null && _bufferSize != numBytes) // true only for incomplete reads
+ {
+ strategy.OnIncompleteRead(_bufferSize, (int)numBytes);
+ }
+
switch (errorCode)
{
- case 0:
+ case Interop.Errors.ERROR_SUCCESS:
case Interop.Errors.ERROR_BROKEN_PIPE:
case Interop.Errors.ERROR_NO_DATA:
case Interop.Errors.ERROR_HANDLE_EOF: // logically success with 0 bytes read (read at end of file)
{
return Task.FromCanceled<int>(cancellationToken);
}
- else if (_strategy.IsClosed)
+ else if (!_strategy.CanRead)
{
- ThrowHelper.ThrowObjectDisposedException_FileClosed();
+ if (_strategy.IsClosed)
+ {
+ ThrowHelper.ThrowObjectDisposedException_FileClosed();
+ }
+
+ ThrowHelper.ThrowNotSupportedException_UnreadableStream();
}
return _strategy.ReadAsync(buffer, offset, count, cancellationToken);
{
return ValueTask.FromCanceled<int>(cancellationToken);
}
- else if (_strategy.IsClosed)
+ else if (!_strategy.CanRead)
{
- ThrowHelper.ThrowObjectDisposedException_FileClosed();
+ if (_strategy.IsClosed)
+ {
+ ThrowHelper.ThrowObjectDisposedException_FileClosed();
+ }
+
+ ThrowHelper.ThrowNotSupportedException_UnreadableStream();
}
return _strategy.ReadAsync(buffer, cancellationToken);
{
return Task.FromCanceled(cancellationToken);
}
- else if (_strategy.IsClosed)
+ else if (!_strategy.CanWrite)
{
- ThrowHelper.ThrowObjectDisposedException_FileClosed();
+ if (_strategy.IsClosed)
+ {
+ ThrowHelper.ThrowObjectDisposedException_FileClosed();
+ }
+
+ ThrowHelper.ThrowNotSupportedException_UnwritableStream();
}
return _strategy.WriteAsync(buffer, offset, count, cancellationToken);
{
return ValueTask.FromCanceled(cancellationToken);
}
- else if (_strategy.IsClosed)
+ else if (!_strategy.CanWrite)
{
- ThrowHelper.ThrowObjectDisposedException_FileClosed();
+ if (_strategy.IsClosed)
+ {
+ ThrowHelper.ThrowObjectDisposedException_FileClosed();
+ }
+
+ ThrowHelper.ThrowNotSupportedException_UnwritableStream();
}
return _strategy.WriteAsync(buffer, cancellationToken);
return ScheduleSyncReadAtOffsetAsync(handle, buffer, fileOffset, cancellationToken);
}
- internal static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) QueueAsyncReadFile(SafeFileHandle handle, Memory<byte> buffer, long fileOffset, CancellationToken cancellationToken)
+ internal static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) QueueAsyncReadFile(SafeFileHandle handle, Memory<byte> buffer, long fileOffset,
+ CancellationToken cancellationToken, AsyncWindowsFileStreamStrategy? strategy = null)
{
handle.EnsureThreadPoolBindingInitialized();
SafeFileHandle.OverlappedValueTaskSource vts = handle.GetOverlappedValueTaskSource();
+ int errorCode = 0;
try
{
- NativeOverlapped* nativeOverlapped = vts.PrepareForOperation(buffer, fileOffset);
+ NativeOverlapped* nativeOverlapped = vts.PrepareForOperation(buffer, fileOffset, strategy);
Debug.Assert(vts._memoryHandle.Pointer != null);
// Queue an async ReadFile operation.
if (Interop.Kernel32.ReadFile(handle, (byte*)vts._memoryHandle.Pointer, buffer.Length, IntPtr.Zero, nativeOverlapped) == 0)
{
// The operation failed, or it's pending.
- int errorCode = FileStreamHelpers.GetLastWin32ErrorAndDisposeHandleIfInvalid(handle);
+ errorCode = FileStreamHelpers.GetLastWin32ErrorAndDisposeHandleIfInvalid(handle);
switch (errorCode)
{
case Interop.Errors.ERROR_IO_PENDING:
vts.Dispose();
throw;
}
+ finally
+ {
+ if (errorCode != Interop.Errors.ERROR_IO_PENDING && errorCode != Interop.Errors.ERROR_SUCCESS)
+ {
+ strategy?.OnIncompleteRead(buffer.Length, 0);
+ }
+ }
// Completion handled by callback.
vts.FinishedScheduling();
public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
=> ReadAsyncInternal(destination, cancellationToken);
- private unsafe ValueTask<int> ReadAsyncInternal(Memory<byte> destination, CancellationToken cancellationToken)
+ private ValueTask<int> ReadAsyncInternal(Memory<byte> destination, CancellationToken cancellationToken)
{
- if (!CanRead)
+ if (!CanSeek)
{
- ThrowHelper.ThrowNotSupportedException_UnreadableStream();
+ return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, fileOffset: -1, cancellationToken);
}
- long positionBefore = _filePosition;
- if (CanSeek)
+ if (LengthCachingSupported && _length >= 0 && Volatile.Read(ref _filePosition) >= _length)
{
- long len = Length;
- if (positionBefore + destination.Length > len)
- {
- destination = positionBefore <= len ?
- destination.Slice(0, (int)(len - positionBefore)) :
- default;
- }
-
- // When using overlapped IO, the OS is not supposed to
- // touch the file pointer location at all. We will adjust it
- // ourselves, but only in memory. This isn't threadsafe.
- _filePosition += destination.Length;
-
- // We know for sure that there is nothing to read, so we just return here and avoid a sys-call.
- if (destination.IsEmpty && LengthCachingSupported)
- {
- return ValueTask.FromResult(0);
- }
+ // We know for sure that the file length can be safely cached and it has already been obtained.
+ // If we have reached EOF we just return here and avoid a sys-call.
+ return ValueTask.FromResult(0);
}
- (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncReadFile(_fileHandle, destination, positionBefore, cancellationToken);
+ // This implementation updates the file position before the operation starts and updates it after incomplete read.
+ // This is done to keep backward compatibility for concurrent reads.
+ // It uses Interlocked as there can be multiple concurrent incomplete reads updating position at the same time.
+ long readOffset = Interlocked.Add(ref _filePosition, destination.Length) - destination.Length;
+
+ (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncReadFile(_fileHandle, destination, readOffset, cancellationToken, this);
return vts != null
? new ValueTask<int>(vts, vts.Version)
- : (errorCode == 0) ? ValueTask.FromResult(0) : ValueTask.FromException<int>(HandleIOError(positionBefore, errorCode));
+ : (errorCode == 0) ? ValueTask.FromResult(0) : ValueTask.FromException<int>(HandleIOError(readOffset, errorCode));
}
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
=> WriteAsyncInternal(buffer, cancellationToken);
- private unsafe ValueTask WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
+ private ValueTask WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
- if (!CanWrite)
- {
- ThrowHelper.ThrowNotSupportedException_UnwritableStream();
- }
-
- long positionBefore = _filePosition;
- if (CanSeek)
- {
- // When using overlapped IO, the OS is not supposed to
- // touch the file pointer location at all. We will adjust it
- // ourselves, but only in memory. This isn't threadsafe.
- _filePosition += source.Length;
- UpdateLengthOnChangePosition();
- }
+ long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition, source.Length) - source.Length : -1;
- (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncWriteFile(_fileHandle, source, positionBefore, cancellationToken);
+ (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncWriteFile(_fileHandle, source, writeOffset, cancellationToken);
return vts != null
? new ValueTask(vts, vts.Version)
- : (errorCode == 0) ? ValueTask.CompletedTask : ValueTask.FromException(HandleIOError(positionBefore, errorCode));
+ : (errorCode == 0) ? ValueTask.CompletedTask : ValueTask.FromException(HandleIOError(writeOffset, errorCode));
}
private Exception HandleIOError(long positionBefore, int errorCode)
{
- if (!_fileHandle.IsClosed && CanSeek)
+ if (_fileHandle.CanSeek)
{
// Update Position... it could be anywhere.
- _filePosition = positionBefore;
+ Interlocked.Exchange(ref _filePosition, positionBefore);
}
return SafeFileHandle.OverlappedValueTaskSource.GetIOError(errorCode, _fileHandle.Path);
private readonly FileAccess _access; // What file was opened for.
protected long _filePosition;
+ protected long _length = -1; // negative means that hasn't been fetched.
private long _appendStart; // When appending, prevent overwriting file.
- private long _length = -1; // When the file is locked for writes on Windows ((share & FileShare.Write) == 0) cache file length in-memory, negative means that hasn't been fetched.
- private bool _lengthCanBeCached; // SafeFileHandle hasn't been exposed and FileShare.Write was not specified when the handle was opened.
+ private bool _lengthCanBeCached; // SafeFileHandle hasn't been exposed, file has been opened for reading and not shared for writing.
internal OSFileStreamStrategy(SafeFileHandle handle, FileAccess access)
{
string fullPath = Path.GetFullPath(path);
_access = access;
- _lengthCanBeCached = (share & FileShare.Write) == 0;
+ _lengthCanBeCached = (share & FileShare.Write) == 0 && (access & FileAccess.Write) == 0;
_fileHandle = SafeFileHandle.Open(fullPath, mode, access, share, options, preallocationSize);
}
}
- protected void UpdateLengthOnChangePosition()
- {
- // Do not update the cached length if the file is not locked
- // or if the length hasn't been fetched.
- if (!LengthCachingSupported || _length < 0)
- {
- Debug.Assert(_length < 0);
- return;
- }
-
- if (_filePosition > _length)
- {
- _length = _filePosition;
- }
- }
+ // in case of concurrent incomplete reads, there can be multiple threads trying to update the position
+ // at the same time. That is why we are using Interlocked here.
+ internal void OnIncompleteRead(int expectedBytesRead, int actualBytesRead) => Interlocked.Add(ref _filePosition, actualBytesRead - expectedBytesRead);
protected bool LengthCachingSupported => OperatingSystem.IsWindows() && _lengthCanBeCached;
ThrowHelper.ThrowNotSupportedException_UnwritableStream();
}
- try
- {
- RandomAccess.WriteAtOffset(_fileHandle, buffer, _filePosition);
- }
- catch
- {
- _length = -1; // invalidate cached length
- throw;
- }
-
+ RandomAccess.WriteAtOffset(_fileHandle, buffer, _filePosition);
_filePosition += buffer.Length;
- UpdateLengthOnChangePosition();
}
}
}
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
+using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken)
{
- if (!CanRead)
+ if (!CanSeek)
{
- ThrowHelper.ThrowNotSupportedException_UnreadableStream();
+ return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, fileOffset: -1, cancellationToken);
}
- if (CanSeek)
- {
- // This implementation updates the file position after the operation completes, rather than before.
- // Also, unlike the Net5CompatFileStreamStrategy implementation, this implementation doesn't serialize operations.
- ReadAsyncTaskSource rats = Interlocked.Exchange(ref _readAsyncTaskSource, null) ?? new ReadAsyncTaskSource(this);
- return rats.QueueRead(destination, cancellationToken);
- }
-
- return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, fileOffset: -1, cancellationToken);
+ // This implementation updates the file position before the operation starts and updates it after incomplete read.
+ // Also, unlike the Net5CompatFileStreamStrategy implementation, this implementation doesn't serialize operations.
+ long readOffset = Interlocked.Add(ref _filePosition, destination.Length) - destination.Length;
+ ReadAsyncTaskSource rats = Interlocked.Exchange(ref _readAsyncTaskSource, null) ?? new ReadAsyncTaskSource(this);
+ return rats.QueueRead(destination, readOffset, cancellationToken);
}
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
- if (!CanWrite)
- {
- ThrowHelper.ThrowNotSupportedException_UnwritableStream();
- }
-
- long filePositionBefore = -1;
- if (CanSeek)
- {
- filePositionBefore = _filePosition;
- _filePosition += source.Length;
- }
-
- return RandomAccess.WriteAtOffsetAsync(_fileHandle, source, filePositionBefore, cancellationToken);
+ long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition, source.Length) - source.Length : -1;
+ return RandomAccess.WriteAtOffsetAsync(_fileHandle, source, writeOffset, cancellationToken);
}
/// <summary>Provides a reusable ValueTask-backing object for implementing ReadAsync.</summary>
private ManualResetValueTaskSourceCore<int> _source;
private Memory<byte> _destination;
+ private long _readOffset;
private ExecutionContext? _context;
private CancellationToken _cancellationToken;
public ReadAsyncTaskSource(UnixFileStreamStrategy stream) => _stream = stream;
- public ValueTask<int> QueueRead(Memory<byte> destination, CancellationToken cancellationToken)
+ public ValueTask<int> QueueRead(Memory<byte> destination, long readOffset, CancellationToken cancellationToken)
{
_destination = destination;
+ _readOffset = readOffset;
_cancellationToken = cancellationToken;
_context = ExecutionContext.Capture();
}
else
{
- result = _stream.Read(_destination.Span);
+ result = RandomAccess.ReadAtOffset(_stream._fileHandle, _destination.Span, _readOffset);
}
}
catch (Exception e)
}
finally
{
+ // if the read was incomplete, we need to update the file position:
+ if (result != _destination.Length)
+ {
+ _stream.OnIncompleteRead(_destination.Length, result);
+ }
+
_destination = default;
+ _readOffset = -1;
_cancellationToken = default;
_context = null;
}