}
}
- [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
- public async Task ReadAsyncCanceledFile()
+ [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+ [InlineData(0, true)] // 0 == no buffering
+ [InlineData(4096, true)] // 4096 == default buffer size
+ [InlineData(0, false)]
+ [InlineData(4096, false)]
+ public async Task ReadAsyncCanceledFile(int bufferSize, bool isAsync)
{
string fileName = GetTestFilePath();
using (FileStream fs = new FileStream(fileName, FileMode.Create))
fs.Write(TestBuffer, 0, TestBuffer.Length);
}
- using (FileStream fs = new FileStream(fileName, FileMode.Open))
+ using (FileStream fs = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.None, bufferSize, isAsync))
{
byte[] buffer = new byte[fs.Length];
CancellationTokenSource cts = new CancellationTokenSource();
// Ideally we'd be doing an Assert.Throws<OperationCanceledException>
// but since cancellation is a race condition we accept either outcome
Assert.Equal(cts.Token, oce.CancellationToken);
+
+ Assert.Equal(0, fs.Position); // if read was cancelled, the Position should remain unchanged
}
}
}
}
}
- [Fact]
- public async Task WriteAsyncCancelledFile()
+ [Theory]
+ [InlineData(0, true)] // 0 == no buffering
+ [InlineData(4096, true)] // 4096 == default buffer size
+ [InlineData(0, false)]
+ [InlineData(4096, false)]
+ public async Task WriteAsyncCancelledFile(int bufferSize, bool isAsync)
{
const int writeSize = 1024 * 1024;
- using (FileStream fs = new FileStream(GetTestFilePath(), FileMode.Create))
+ using (FileStream fs = new FileStream(GetTestFilePath(), FileMode.CreateNew, FileAccess.Write, FileShare.None, bufferSize, isAsync))
{
byte[] buffer = new byte[writeSize];
CancellationTokenSource cts = new CancellationTokenSource();
// Ideally we'd be doing an Assert.Throws<OperationCanceledException>
// but since cancellation is a race condition we accept either outcome
Assert.Equal(cts.Token, oce.CancellationToken);
+
+ Assert.Equal(0, fs.Length); // if write was cancelled, the file should be empty
+ Assert.Equal(0, fs.Position); // if write was cancelled, the Position should remain unchanged
}
}
}
internal readonly PreAllocatedOverlapped _preallocatedOverlapped;
internal readonly SafeFileHandle _fileHandle;
- private AsyncWindowsFileStreamStrategy? _strategy;
+ private OSFileStreamStrategy? _strategy;
internal MemoryHandle _memoryHandle;
private int _bufferSize;
internal ManualResetValueTaskSourceCore<int> _source; // mutable struct; do not make this readonly
? ThrowHelper.CreateEndOfFileException()
: Win32Marshal.GetExceptionForWin32Error(errorCode, path);
- internal NativeOverlapped* PrepareForOperation(ReadOnlyMemory<byte> memory, long fileOffset, AsyncWindowsFileStreamStrategy? strategy = null)
+ internal NativeOverlapped* PrepareForOperation(ReadOnlyMemory<byte> memory, long fileOffset, OSFileStreamStrategy? strategy = null)
{
+ Debug.Assert(strategy is null || strategy is AsyncWindowsFileStreamStrategy, $"Strategy was expected to be null or async, got {strategy}.");
+
_result = 0;
_strategy = strategy;
_bufferSize = memory.Length;
{
Debug.Assert(errorCode == Interop.Errors.ERROR_SUCCESS || numBytes == 0, $"Callback returned {errorCode} error and {numBytes} bytes");
- AsyncWindowsFileStreamStrategy? strategy = _strategy;
+ OSFileStreamStrategy? strategy = _strategy;
ReleaseResources();
- if (strategy is not null && _bufferSize != numBytes) // true only for incomplete reads
+ if (strategy is not null && _bufferSize != numBytes) // true only for incomplete operations
{
- strategy.OnIncompleteRead(_bufferSize, (int)numBytes);
+ strategy.OnIncompleteOperation(_bufferSize, (int)numBytes);
}
switch (errorCode)
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
+using System.IO.Strategies;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
private ManualResetValueTaskSourceCore<long> _source;
private Operation _operation = Operation.None;
private ExecutionContext? _context;
+ private OSFileStreamStrategy? _strategy;
// These fields store the parameters for the operation.
// The first two are common for all kinds of operations.
}
finally
{
+ if (_strategy is not null)
+ {
+ // WriteAtOffset returns void, so we need to fix position only in case of an exception
+ if (exception is not null)
+ {
+ _strategy.OnIncompleteOperation(_singleSegment.Length, 0);
+ }
+ else if (_operation == Operation.Read && result != _singleSegment.Length)
+ {
+ _strategy.OnIncompleteOperation(_singleSegment.Length, (int)result);
+ }
+ }
+
_operation = Operation.None;
_context = null;
+ _strategy = null;
_cancellationToken = default;
_singleSegment = default;
_readScatterBuffers = null;
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: true);
}
- public ValueTask<int> QueueRead(Memory<byte> buffer, long fileOffset, CancellationToken cancellationToken)
+ public ValueTask<int> QueueRead(Memory<byte> buffer, long fileOffset, CancellationToken cancellationToken, OSFileStreamStrategy? strategy)
{
ValidateInvariants();
_singleSegment = buffer;
_fileOffset = fileOffset;
_cancellationToken = cancellationToken;
+ _strategy = strategy;
QueueToThreadPool();
return new ValueTask<int>(this, _source.Version);
}
- public ValueTask QueueWrite(ReadOnlyMemory<byte> buffer, long fileOffset, CancellationToken cancellationToken)
+ public ValueTask QueueWrite(ReadOnlyMemory<byte> buffer, long fileOffset, CancellationToken cancellationToken, OSFileStreamStrategy? strategy)
{
ValidateInvariants();
_singleSegment = buffer;
_fileOffset = fileOffset;
_cancellationToken = cancellationToken;
+ _strategy = strategy;
QueueToThreadPool();
return new ValueTask(this, _source.Version);
return FileStreamHelpers.CheckFileCall(result, handle.Path);
}
- internal static ValueTask<int> ReadAtOffsetAsync(SafeFileHandle handle, Memory<byte> buffer, long fileOffset, CancellationToken cancellationToken)
- => ScheduleSyncReadAtOffsetAsync(handle, buffer, fileOffset, cancellationToken);
+ internal static ValueTask<int> ReadAtOffsetAsync(SafeFileHandle handle, Memory<byte> buffer, long fileOffset, CancellationToken cancellationToken, OSFileStreamStrategy? strategy = null)
+ => ScheduleSyncReadAtOffsetAsync(handle, buffer, fileOffset, cancellationToken, strategy);
private static ValueTask<long> ReadScatterAtOffsetAsync(SafeFileHandle handle, IReadOnlyList<Memory<byte>> buffers,
long fileOffset, CancellationToken cancellationToken)
}
}
- internal static ValueTask WriteAtOffsetAsync(SafeFileHandle handle, ReadOnlyMemory<byte> buffer, long fileOffset, CancellationToken cancellationToken)
- => ScheduleSyncWriteAtOffsetAsync(handle, buffer, fileOffset, cancellationToken);
+ internal static ValueTask WriteAtOffsetAsync(SafeFileHandle handle, ReadOnlyMemory<byte> buffer, long fileOffset, CancellationToken cancellationToken, OSFileStreamStrategy? strategy = null)
+ => ScheduleSyncWriteAtOffsetAsync(handle, buffer, fileOffset, cancellationToken, strategy);
private static ValueTask WriteGatherAtOffsetAsync(SafeFileHandle handle, IReadOnlyList<ReadOnlyMemory<byte>> buffers,
long fileOffset, CancellationToken cancellationToken)
}
}
- internal static ValueTask<int> ReadAtOffsetAsync(SafeFileHandle handle, Memory<byte> buffer, long fileOffset, CancellationToken cancellationToken)
+ internal static ValueTask<int> ReadAtOffsetAsync(SafeFileHandle handle, Memory<byte> buffer, long fileOffset,
+ CancellationToken cancellationToken, OSFileStreamStrategy? strategy = null)
{
if (handle.IsAsync)
{
- (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = QueueAsyncReadFile(handle, buffer, fileOffset, cancellationToken);
+ (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = QueueAsyncReadFile(handle, buffer, fileOffset, cancellationToken, strategy);
if (vts is not null)
{
return ValueTask.FromResult(0);
}
- return ValueTask.FromException<int>(Win32Marshal.GetExceptionForWin32Error(errorCode));
+ return ValueTask.FromException<int>(Win32Marshal.GetExceptionForWin32Error(errorCode, handle.Path));
}
- return ScheduleSyncReadAtOffsetAsync(handle, buffer, fileOffset, cancellationToken);
+ return ScheduleSyncReadAtOffsetAsync(handle, buffer, fileOffset, cancellationToken, strategy);
}
- internal static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) QueueAsyncReadFile(SafeFileHandle handle, Memory<byte> buffer, long fileOffset,
- CancellationToken cancellationToken, AsyncWindowsFileStreamStrategy? strategy = null)
+ private static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) QueueAsyncReadFile(SafeFileHandle handle, Memory<byte> buffer, long fileOffset,
+ CancellationToken cancellationToken, OSFileStreamStrategy? strategy)
{
handle.EnsureThreadPoolBindingInitialized();
SafeFileHandle.OverlappedValueTaskSource vts = handle.GetOverlappedValueTaskSource();
- int errorCode = 0;
+ int errorCode = Interop.Errors.ERROR_SUCCESS;
try
{
NativeOverlapped* nativeOverlapped = vts.PrepareForOperation(buffer, fileOffset, strategy);
{
if (errorCode != Interop.Errors.ERROR_IO_PENDING && errorCode != Interop.Errors.ERROR_SUCCESS)
{
- strategy?.OnIncompleteRead(buffer.Length, 0);
+ strategy?.OnIncompleteOperation(buffer.Length, 0);
}
}
return (vts, -1);
}
- internal static ValueTask WriteAtOffsetAsync(SafeFileHandle handle, ReadOnlyMemory<byte> buffer, long fileOffset, CancellationToken cancellationToken)
+ internal static ValueTask WriteAtOffsetAsync(SafeFileHandle handle, ReadOnlyMemory<byte> buffer, long fileOffset,
+ CancellationToken cancellationToken, OSFileStreamStrategy? strategy = null)
{
if (handle.IsAsync)
{
- (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = QueueAsyncWriteFile(handle, buffer, fileOffset, cancellationToken);
+ (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = QueueAsyncWriteFile(handle, buffer, fileOffset, cancellationToken, strategy);
if (vts is not null)
{
return ValueTask.CompletedTask;
}
- return ValueTask.FromException(Win32Marshal.GetExceptionForWin32Error(errorCode));
+ return ValueTask.FromException(Win32Marshal.GetExceptionForWin32Error(errorCode, handle.Path));
}
- return ScheduleSyncWriteAtOffsetAsync(handle, buffer, fileOffset, cancellationToken);
+ return ScheduleSyncWriteAtOffsetAsync(handle, buffer, fileOffset, cancellationToken, strategy);
}
- internal static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) QueueAsyncWriteFile(SafeFileHandle handle, ReadOnlyMemory<byte> buffer, long fileOffset, CancellationToken cancellationToken)
+ private static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) QueueAsyncWriteFile(SafeFileHandle handle, ReadOnlyMemory<byte> buffer, long fileOffset,
+ CancellationToken cancellationToken, OSFileStreamStrategy? strategy)
{
handle.EnsureThreadPoolBindingInitialized();
SafeFileHandle.OverlappedValueTaskSource vts = handle.GetOverlappedValueTaskSource();
+ int errorCode = Interop.Errors.ERROR_SUCCESS;
try
{
- NativeOverlapped* nativeOverlapped = vts.PrepareForOperation(buffer, fileOffset);
+ NativeOverlapped* nativeOverlapped = vts.PrepareForOperation(buffer, fileOffset, strategy);
Debug.Assert(vts._memoryHandle.Pointer != null);
// Queue an async WriteFile operation.
if (Interop.Kernel32.WriteFile(handle, (byte*)vts._memoryHandle.Pointer, buffer.Length, IntPtr.Zero, nativeOverlapped) == 0)
{
// The operation failed, or it's pending.
- int errorCode = FileStreamHelpers.GetLastWin32ErrorAndDisposeHandleIfInvalid(handle);
+ errorCode = FileStreamHelpers.GetLastWin32ErrorAndDisposeHandleIfInvalid(handle);
switch (errorCode)
{
case Interop.Errors.ERROR_IO_PENDING:
vts.Dispose();
throw;
}
+ finally
+ {
+ if (errorCode != Interop.Errors.ERROR_IO_PENDING && errorCode != Interop.Errors.ERROR_SUCCESS)
+ {
+ strategy?.OnIncompleteOperation(buffer.Length, 0);
+ }
+ }
// Completion handled by callback.
vts.FinishedScheduling();
// The .NET Foundation licenses this file to you under the MIT license.
using System.Collections.Generic;
+using System.IO.Strategies;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
}
private static ValueTask<int> ScheduleSyncReadAtOffsetAsync(SafeFileHandle handle, Memory<byte> buffer,
- long fileOffset, CancellationToken cancellationToken)
+ long fileOffset, CancellationToken cancellationToken, OSFileStreamStrategy? strategy)
{
- return handle.GetThreadPoolValueTaskSource().QueueRead(buffer, fileOffset, cancellationToken);
+ return handle.GetThreadPoolValueTaskSource().QueueRead(buffer, fileOffset, cancellationToken, strategy);
}
private static ValueTask<long> ScheduleSyncReadScatterAtOffsetAsync(SafeFileHandle handle, IReadOnlyList<Memory<byte>> buffers,
}
private static ValueTask ScheduleSyncWriteAtOffsetAsync(SafeFileHandle handle, ReadOnlyMemory<byte> buffer,
- long fileOffset, CancellationToken cancellationToken)
+ long fileOffset, CancellationToken cancellationToken, OSFileStreamStrategy? strategy)
{
- return handle.GetThreadPoolValueTaskSource().QueueWrite(buffer, fileOffset, cancellationToken);
+ return handle.GetThreadPoolValueTaskSource().QueueWrite(buffer, fileOffset, cancellationToken, strategy);
}
private static ValueTask ScheduleSyncWriteGatherAtOffsetAsync(SafeFileHandle handle, IReadOnlyList<ReadOnlyMemory<byte>> buffers,
internal override bool IsAsync => true;
- public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
- {
- if (!CanSeek)
- {
- return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, fileOffset: -1, cancellationToken);
- }
-
- if (LengthCachingSupported && _length >= 0 && Volatile.Read(ref _filePosition) >= _length)
- {
- // We know for sure that the file length can be safely cached and it has already been obtained.
- // If we have reached EOF we just return here and avoid a sys-call.
- return ValueTask.FromResult(0);
- }
-
- // This implementation updates the file position before the operation starts and updates it after incomplete read.
- // This is done to keep backward compatibility for concurrent reads.
- // It uses Interlocked as there can be multiple concurrent incomplete reads updating position at the same time.
- long readOffset = Interlocked.Add(ref _filePosition, destination.Length) - destination.Length;
-
- (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncReadFile(_fileHandle, destination, readOffset, cancellationToken, this);
- return vts != null
- ? new ValueTask<int>(vts, vts.Version)
- : (errorCode == 0) ? ValueTask.FromResult(0) : ValueTask.FromException<int>(HandleIOError(readOffset, errorCode));
- }
-
- public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
- {
- long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition, buffer.Length) - buffer.Length : -1;
-
- (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncWriteFile(_fileHandle, buffer, writeOffset, cancellationToken);
- return vts != null
- ? new ValueTask(vts, vts.Version)
- : (errorCode == 0) ? ValueTask.CompletedTask : ValueTask.FromException(HandleIOError(writeOffset, errorCode));
- }
-
- private Exception HandleIOError(long positionBefore, int errorCode)
- {
- if (_fileHandle.CanSeek)
- {
- // Update Position... it could be anywhere.
- Interlocked.Exchange(ref _filePosition, positionBefore);
- }
-
- return SafeFileHandle.OverlappedValueTaskSource.GetIOError(errorCode, _fileHandle.Path);
- }
-
public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
// Fail if the file was closed
{
protected readonly SafeFileHandle _fileHandle; // only ever null if ctor throws
private readonly FileAccess _access; // What file was opened for.
- private ReadAsyncTaskSource? _readAsyncTaskSource; // Cached IValueTaskSource used for async-over-sync reads
protected long _filePosition;
- protected long _length = -1; // negative means that hasn't been fetched.
+ private long _length = -1; // negative means that hasn't been fetched.
private long _appendStart; // When appending, prevent overwriting file.
private bool _lengthCanBeCached; // SafeFileHandle hasn't been exposed, file has been opened for reading and not shared for writing.
// in case of concurrent incomplete reads, there can be multiple threads trying to update the position
// at the same time. That is why we are using Interlocked here.
- internal void OnIncompleteRead(int expectedBytesRead, int actualBytesRead) => Interlocked.Add(ref _filePosition, actualBytesRead - expectedBytesRead);
+ internal void OnIncompleteOperation(int expectedBytesTransferred, int actualBytesTransferred)
+ => Interlocked.Add(ref _filePosition, actualBytesTransferred - expectedBytesTransferred);
- protected bool LengthCachingSupported => OperatingSystem.IsWindows() && _lengthCanBeCached;
+ private bool LengthCachingSupported => OperatingSystem.IsWindows() && _lengthCanBeCached;
/// <summary>Gets or sets the position within the current stream</summary>
public sealed override long Position
public sealed override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
WriteAsync(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken).AsTask();
- public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
+ public sealed override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition, source.Length) - source.Length : -1;
- return RandomAccess.WriteAtOffsetAsync(_fileHandle, source, writeOffset, cancellationToken);
+ return RandomAccess.WriteAtOffsetAsync(_fileHandle, source, writeOffset, cancellationToken, this);
}
public sealed override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
public sealed override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
ReadAsync(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
- public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken)
+ public sealed override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken)
{
if (!CanSeek)
{
return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, fileOffset: -1, cancellationToken);
}
- // This implementation updates the file position before the operation starts and updates it after incomplete read.
- // Also, unlike the Net5CompatFileStreamStrategy implementation, this implementation doesn't serialize operations.
- long readOffset = Interlocked.Add(ref _filePosition, destination.Length) - destination.Length;
- ReadAsyncTaskSource rats = Interlocked.Exchange(ref _readAsyncTaskSource, null) ?? new ReadAsyncTaskSource(this);
- return rats.QueueRead(destination, readOffset, cancellationToken);
- }
-
- /// <summary>Provides a reusable ValueTask-backing object for implementing ReadAsync.</summary>
- private sealed class ReadAsyncTaskSource : IValueTaskSource<int>, IThreadPoolWorkItem
- {
- private readonly OSFileStreamStrategy _stream;
- private ManualResetValueTaskSourceCore<int> _source;
-
- private Memory<byte> _destination;
- private long _readOffset;
- private ExecutionContext? _context;
- private CancellationToken _cancellationToken;
-
- public ReadAsyncTaskSource(OSFileStreamStrategy stream) => _stream = stream;
-
- public ValueTask<int> QueueRead(Memory<byte> destination, long readOffset, CancellationToken cancellationToken)
- {
- _destination = destination;
- _readOffset = readOffset;
- _cancellationToken = cancellationToken;
- _context = ExecutionContext.Capture();
-
- ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: true);
- return new ValueTask<int>(this, _source.Version);
- }
-
- void IThreadPoolWorkItem.Execute()
+ if (LengthCachingSupported && _length >= 0 && Volatile.Read(ref _filePosition) >= _length)
{
- if (_context is null || _context.IsDefault)
- {
- Read();
- }
- else
- {
- ExecutionContext.RunForThreadPoolUnsafe(_context, static x => x.Read(), this);
- }
+ // We know for sure that the file length can be safely cached and it has already been obtained.
+ // If we have reached EOF we just return here and avoid a sys-call.
+ return ValueTask.FromResult(0);
}
- private void Read()
- {
- Exception? error = null;
- int result = 0;
-
- try
- {
- if (_cancellationToken.IsCancellationRequested)
- {
- error = new OperationCanceledException(_cancellationToken);
- }
- else
- {
- result = RandomAccess.ReadAtOffset(_stream._fileHandle, _destination.Span, _readOffset);
- }
- }
- catch (Exception e)
- {
- error = e;
- }
- finally
- {
- // if the read was incomplete, we need to update the file position:
- if (result != _destination.Length)
- {
- _stream.OnIncompleteRead(_destination.Length, result);
- }
-
- _destination = default;
- _readOffset = -1;
- _cancellationToken = default;
- _context = null;
- }
-
- if (error is not null)
- {
- _source.SetException(error);
- }
- else
- {
- _source.SetResult(result);
- }
- }
-
- int IValueTaskSource<int>.GetResult(short token)
- {
- try
- {
- return _source.GetResult(token);
- }
- finally
- {
- _source.Reset();
-#pragma warning disable CS0197
- Volatile.Write(ref _stream._readAsyncTaskSource, this);
-#pragma warning restore CS0197
- }
- }
-
- ValueTaskSourceStatus IValueTaskSource<int>.GetStatus(short token) =>
- _source.GetStatus(token);
-
- void IValueTaskSource<int>.OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) =>
- _source.OnCompleted(continuation, state, token, flags);
+ // This implementation updates the file position before the operation starts and updates it after incomplete read.
+ // This is done to keep backward compatibility for concurrent reads.
+ // It uses Interlocked as there can be multiple concurrent incomplete reads updating position at the same time.
+ long readOffset = Interlocked.Add(ref _filePosition, destination.Length) - destination.Length;
+ return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, readOffset, cancellationToken, this);
}
}
}