--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace System.IO.Tests
+{
+ public partial class StreamReaderTests
+ {
+ [Theory]
+ [InlineData(0)]
+ [InlineData(10)]
+ public async Task Read_EmptySpan_ReadsNothing(int length)
+ {
+ using (var r = new StreamReader(new MemoryStream(Enumerable.Repeat((byte)'s', length).ToArray())))
+ {
+ Assert.Equal(0, r.Read(Span<char>.Empty));
+ Assert.Equal(0, r.ReadBlock(Span<char>.Empty));
+ Assert.Equal(0, await r.ReadAsync(Memory<char>.Empty));
+ Assert.Equal(0, await r.ReadBlockAsync(Memory<char>.Empty));
+ }
+ }
+
+ [Theory]
+ [InlineData(1, 100, 1)]
+ [InlineData(1, 100, 101)]
+ [InlineData(100, 50, 1)]
+ [InlineData(100, 50, 101)]
+ public void Read_ReadsExpectedData(int readLength, int totalLength, int bufferSize)
+ {
+ var data = new char[totalLength];
+ var r = new Random(42);
+ for (int i = 0; i < data.Length; i++)
+ {
+ data[i] = (char)('a' + r.Next(0, 26));
+ }
+
+ var result = new char[data.Length];
+ Span<char> dst = result;
+
+ using (var sr = new StreamReader(new MemoryStream(data.Select(i => (byte)i).ToArray()), Encoding.ASCII, false, bufferSize))
+ {
+ while (dst.Length > 0)
+ {
+ int read = sr.Read(dst);
+ Assert.InRange(read, 1, dst.Length);
+ dst = dst.Slice(read);
+ }
+ }
+
+ Assert.Equal<char>(data, result);
+ }
+
+ [Theory]
+ [InlineData(1, 100, 1)]
+ [InlineData(1, 100, 101)]
+ [InlineData(100, 50, 1)]
+ [InlineData(100, 50, 101)]
+ public void ReadBlock_ReadsExpectedData(int readLength, int totalLength, int bufferSize)
+ {
+ var data = new char[totalLength];
+ var r = new Random(42);
+ for (int i = 0; i < data.Length; i++)
+ {
+ data[i] = (char)('a' + r.Next(0, 26));
+ }
+
+ var result = new char[data.Length];
+ Span<char> dst = result;
+
+ using (var sr = new StreamReader(new MemoryStream(data.Select(i => (byte)i).ToArray()), Encoding.ASCII, false, bufferSize))
+ {
+ while (dst.Length > 0)
+ {
+ int read = sr.ReadBlock(dst);
+ Assert.InRange(read, 1, dst.Length);
+ dst = dst.Slice(read);
+ }
+ }
+
+ Assert.Equal<char>(data, result);
+ }
+
+ [Theory]
+ [InlineData(1, 100, 1)]
+ [InlineData(1, 100, 101)]
+ [InlineData(100, 50, 1)]
+ [InlineData(100, 50, 101)]
+ public async Task ReadAsync_ReadsExpectedData(int readLength, int totalLength, int bufferSize)
+ {
+ var data = new char[totalLength];
+ var r = new Random(42);
+ for (int i = 0; i < data.Length; i++)
+ {
+ data[i] = (char)('a' + r.Next(0, 26));
+ }
+
+ var result = new char[data.Length];
+ Memory<char> dst = result;
+
+ using (var sr = new StreamReader(new MemoryStream(data.Select(i => (byte)i).ToArray()), Encoding.ASCII, false, bufferSize))
+ {
+ while (dst.Length > 0)
+ {
+ int read = await sr.ReadAsync(dst);
+ Assert.InRange(read, 1, dst.Length);
+ dst = dst.Slice(read);
+ }
+ }
+
+ Assert.Equal<char>(data, result);
+ }
+
+ [Theory]
+ [InlineData(1, 100, 1)]
+ [InlineData(1, 100, 101)]
+ [InlineData(100, 50, 1)]
+ [InlineData(100, 50, 101)]
+ public async Task ReadBlockAsync_ReadsExpectedData(int readLength, int totalLength, int bufferSize)
+ {
+ var data = new char[totalLength];
+ var r = new Random(42);
+ for (int i = 0; i < data.Length; i++)
+ {
+ data[i] = (char)('a' + r.Next(0, 26));
+ }
+
+ var result = new char[data.Length];
+ Memory<char> dst = result;
+
+ using (var sr = new StreamReader(new MemoryStream(data.Select(i => (byte)i).ToArray()), Encoding.ASCII, false, bufferSize))
+ {
+ while (dst.Length > 0)
+ {
+ int read = await sr.ReadBlockAsync(dst);
+ Assert.InRange(read, 1, dst.Length);
+ dst = dst.Slice(read);
+ }
+ }
+
+ Assert.Equal<char>(data, result);
+ }
+
+ [Fact]
+ public void ReadBlock_RepeatsReadsUntilReadDesiredAmount()
+ {
+ char[] data = "hello world".ToCharArray();
+ var ms = new MemoryStream(Encoding.UTF8.GetBytes(data));
+ var s = new DelegateStream(
+ canReadFunc: () => true,
+ readFunc: (buffer, offset, count) => ms.Read(buffer, offset, 1)); // do actual reads a byte at a time
+ using (var r = new StreamReader(s, Encoding.UTF8, false, 2))
+ {
+ var result = new char[data.Length];
+ Assert.Equal(data.Length, r.ReadBlock((Span<char>)result));
+ Assert.Equal<char>(data, result);
+ }
+ }
+
+ [Fact]
+ public async Task ReadBlockAsync_RepeatsReadsUntilReadDesiredAmount()
+ {
+ char[] data = "hello world".ToCharArray();
+ var ms = new MemoryStream(Encoding.UTF8.GetBytes(data));
+ var s = new DelegateStream(
+ canReadFunc: () => true,
+ readAsyncFunc: (buffer, offset, count, cancellationToken) => ms.ReadAsync(buffer, offset, 1)); // do actual reads a byte at a time
+ using (var r = new StreamReader(s, Encoding.UTF8, false, 2))
+ {
+ var result = new char[data.Length];
+ Assert.Equal(data.Length, await r.ReadBlockAsync((Memory<char>)result));
+ Assert.Equal<char>(data, result);
+ }
+ }
+
+ [Fact]
+ public async Task ReadAsync_Precanceled_ThrowsException()
+ {
+ using (var sr = new StreamReader(new MemoryStream()))
+ {
+ await Assert.ThrowsAnyAsync<OperationCanceledException>(() => sr.ReadAsync(Memory<char>.Empty, new CancellationToken(true)).AsTask());
+ await Assert.ThrowsAnyAsync<OperationCanceledException>(() => sr.ReadBlockAsync(Memory<char>.Empty, new CancellationToken(true)).AsTask());
+ }
+ }
+
+ [Fact]
+ public async Task Read_SpanMemory_DisposedStream_ThrowsException()
+ {
+ var sr = new StreamReader(new MemoryStream());
+ sr.Dispose();
+
+ Assert.Throws<ObjectDisposedException>(() => sr.Read(Span<char>.Empty));
+ Assert.Throws<ObjectDisposedException>(() => sr.ReadBlock(Span<char>.Empty));
+ await Assert.ThrowsAsync<ObjectDisposedException>(() => sr.ReadAsync(Memory<char>.Empty).AsTask());
+ await Assert.ThrowsAsync<ObjectDisposedException>(() => sr.ReadBlockAsync(Memory<char>.Empty).AsTask());
+ }
+ }
+}
<Compile Include="MemoryStream\MemoryStreamTests.netcoreapp.cs" Condition="'$(TargetGroup)' == 'netcoreapp'" />
<Compile Include="StreamReader\StreamReader.CtorTests.cs" />
<Compile Include="StreamReader\StreamReaderTests.cs" />
+ <Compile Include="StreamReader\StreamReaderTests.netcoreapp.cs" Condition="'$(TargetGroup)' == 'netcoreapp'" />
<Compile Include="StreamWriter\StreamWriter.BaseStream.cs" />
<Compile Include="StreamWriter\StreamWriter.CloseTests.cs" />
<Compile Include="StreamWriter\StreamWriter.CtorTests.cs" />
public override int Peek() { throw null; }
public override int Read() { throw null; }
public override int Read(char[] buffer, int index, int count) { throw null; }
+ public override int Read(System.Span<char> buffer) { throw null; }
+ public override int ReadBlock(System.Span<char> buffer) { throw null; }
public override System.Threading.Tasks.Task<int> ReadAsync(char[] buffer, int index, int count) { throw null; }
+ public override System.Threading.Tasks.ValueTask<int> ReadAsync(System.Memory<char> buffer, System.Threading.CancellationToken cancellationToken = default) { throw null; }
public override int ReadBlock(char[] buffer, int index, int count) { throw null; }
public override System.Threading.Tasks.Task<int> ReadBlockAsync(char[] buffer, int index, int count) { throw null; }
+ public override System.Threading.Tasks.ValueTask<int> ReadBlockAsync(System.Memory<char> buffer, System.Threading.CancellationToken cancellationToken = default) { throw null; }
public override string ReadLine() { throw null; }
public override System.Threading.Tasks.Task<string> ReadLineAsync() { throw null; }
public override string ReadToEnd() { throw null; }
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
-using System.Text;
-using System.Diagnostics.CodeAnalysis;
-using System.Threading.Tasks;
using System.Diagnostics;
+using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Contracts;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
namespace System.IO
{
throw new ArgumentException(SR.Argument_InvalidOffLen);
}
+ return ReadSpan(new Span<char>(buffer, index, count));
+ }
+
+ public override int Read(Span<char> buffer) =>
+ GetType() == typeof(StreamReader) ? ReadSpan(buffer) :
+ base.Read(buffer); // Defer to Read(char[], ...) if a derived type may have previously overridden it
+
+ private int ReadSpan(Span<char> buffer)
+ {
if (_stream == null)
{
throw new ObjectDisposedException(null, SR.ObjectDisposed_ReaderClosed);
// As a perf optimization, if we had exactly one buffer's worth of
// data read in, let's try writing directly to the user's buffer.
bool readToUserBuffer = false;
+ int count = buffer.Length;
while (count > 0)
{
int n = _charLen - _charPos;
if (n == 0)
{
- n = ReadBuffer(buffer, index + charsRead, count, out readToUserBuffer);
+ n = ReadBuffer(buffer.Slice(charsRead), out readToUserBuffer);
}
if (n == 0)
{
}
if (!readToUserBuffer)
{
- Buffer.BlockCopy(_charBuffer, _charPos * 2, buffer, (index + charsRead) * 2, n * 2);
+ new Span<char>(_charBuffer, _charPos, n).CopyTo(buffer.Slice(charsRead));
_charPos += n;
}
return base.ReadBlock(buffer, index, count);
}
+ public override int ReadBlock(Span<char> buffer)
+ {
+ if (GetType() != typeof(StreamReader))
+ {
+ // Defer to Read(char[], ...) if a derived type may have previously overridden it.
+ return base.ReadBlock(buffer);
+ }
+
+ int i, n = 0;
+ do
+ {
+ i = ReadSpan(buffer.Slice(n));
+ n += i;
+ } while (i > 0 && n < buffer.Length);
+ return n;
+ }
+
// Trims n bytes from the front of the buffer.
private void CompressBuffer(int n)
{
// buffer's worth of bytes could produce.
// This optimization, if run, will break SwitchEncoding, so we must not do
// this on the first call to ReadBuffer.
- private int ReadBuffer(char[] userBuffer, int userOffset, int desiredChars, out bool readToUserBuffer)
+ private int ReadBuffer(Span<char> userBuffer, out bool readToUserBuffer)
{
_charLen = 0;
_charPos = 0;
// buffer optimization. This affects reads where the end of the
// Stream comes in the middle somewhere, and when you ask for
// fewer chars than your buffer could produce.
- readToUserBuffer = desiredChars >= _maxCharsPerBuffer;
+ readToUserBuffer = userBuffer.Length >= _maxCharsPerBuffer;
do
{
{
if (readToUserBuffer)
{
- charsRead = _decoder.GetChars(_byteBuffer, 0, _byteLen, userBuffer, userOffset + charsRead);
+ charsRead = _decoder.GetChars(new ReadOnlySpan<byte>(_byteBuffer, 0, _byteLen), userBuffer.Slice(charsRead), flush: false);
_charLen = 0; // StreamReader's buffer is empty.
}
else
{
DetectEncoding();
// DetectEncoding changes some buffer state. Recompute this.
- readToUserBuffer = desiredChars >= _maxCharsPerBuffer;
+ readToUserBuffer = userBuffer.Length >= _maxCharsPerBuffer;
}
_charPos = 0;
if (readToUserBuffer)
{
- charsRead += _decoder.GetChars(_byteBuffer, 0, _byteLen, userBuffer, userOffset + charsRead);
+ charsRead += _decoder.GetChars(new ReadOnlySpan<byte>(_byteBuffer, 0, _byteLen), userBuffer.Slice(charsRead), flush:false);
_charLen = 0; // StreamReader's buffer is empty.
}
else
}
} while (charsRead == 0);
- _isBlocked &= charsRead < desiredChars;
+ _isBlocked &= charsRead < userBuffer.Length;
//Console.WriteLine("ReadBuffer: charsRead: "+charsRead+" readToUserBuffer: "+readToUserBuffer);
return charsRead;
CheckAsyncTaskInProgress();
- Task<int> task = ReadAsyncInternal(buffer, index, count);
+ Task<int> task = ReadAsyncInternal(new Memory<char>(buffer, index, count), default).AsTask();
_asyncReadTask = task;
return task;
}
- internal override async Task<int> ReadAsyncInternal(char[] buffer, int index, int count)
+ public override ValueTask<int> ReadAsync(Memory<char> buffer, CancellationToken cancellationToken = default)
+ {
+ if (GetType() != typeof(StreamReader))
+ {
+ // Ensure we use existing overrides if a class already overrode existing overloads.
+ return base.ReadAsync(buffer, cancellationToken);
+ }
+
+ if (_stream == null)
+ {
+ throw new ObjectDisposedException(null, SR.ObjectDisposed_ReaderClosed);
+ }
+
+ CheckAsyncTaskInProgress();
+
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return new ValueTask<int>(Task.FromCanceled<int>(cancellationToken));
+ }
+
+ return ReadAsyncInternal(buffer, cancellationToken);
+ }
+
+ internal override async ValueTask<int> ReadAsyncInternal(Memory<char> buffer, CancellationToken cancellationToken)
{
if (_charPos == _charLen && (await ReadBufferAsync().ConfigureAwait(false)) == 0)
{
Byte[] tmpByteBuffer = _byteBuffer;
Stream tmpStream = _stream;
+ int count = buffer.Length;
while (count > 0)
{
// n is the characters available in _charBuffer
{
Debug.Assert(_bytePos <= _encoding.Preamble.Length, "possible bug in _compressPreamble. Are two threads using this StreamReader at the same time?");
int tmpBytePos = _bytePos;
- int len = await tmpStream.ReadAsync(tmpByteBuffer, tmpBytePos, tmpByteBuffer.Length - tmpBytePos).ConfigureAwait(false);
+ int len = await tmpStream.ReadAsync(tmpByteBuffer, tmpBytePos, tmpByteBuffer.Length - tmpBytePos, cancellationToken).ConfigureAwait(false);
Debug.Assert(len >= 0, "Stream.Read returned a negative number! This is a bug in your stream class.");
if (len == 0)
{
if (readToUserBuffer)
{
- n = _decoder.GetChars(tmpByteBuffer, 0, _byteLen, buffer, index + charsRead);
+ n = _decoder.GetChars(new ReadOnlySpan<byte>(tmpByteBuffer, 0, _byteLen), buffer.Slice(charsRead).Span, flush: false);
_charLen = 0; // StreamReader's buffer is empty.
}
else
{
Debug.Assert(_bytePos == 0, "_bytePos can be non zero only when we are trying to _checkPreamble. Are two threads using this StreamReader at the same time?");
- _byteLen = await tmpStream.ReadAsync(tmpByteBuffer, 0, tmpByteBuffer.Length).ConfigureAwait(false);
+ _byteLen = await tmpStream.ReadAsync(tmpByteBuffer, 0, tmpByteBuffer.Length, cancellationToken).ConfigureAwait(false);
Debug.Assert(_byteLen >= 0, "Stream.Read returned a negative number! This is a bug in your stream class.");
_charPos = 0;
if (readToUserBuffer)
{
- n += _decoder.GetChars(tmpByteBuffer, 0, _byteLen, buffer, index + charsRead);
+ n += _decoder.GetChars(new ReadOnlySpan<byte>(tmpByteBuffer, 0, _byteLen), buffer.Slice(charsRead).Span, flush: false);
// Why did the bytes yield no chars?
Debug.Assert(n > 0);
if (!readToUserBuffer)
{
- Buffer.BlockCopy(_charBuffer, _charPos * 2, buffer, (index + charsRead) * 2, n * 2);
+ new Span<char>(_charBuffer, _charPos, n).CopyTo(buffer.Slice(charsRead).Span);
_charPos += n;
}
return task;
}
+ public override ValueTask<int> ReadBlockAsync(Memory<char> buffer, CancellationToken cancellationToken = default)
+ {
+ if (GetType() != typeof(StreamReader))
+ {
+ // If a derived type may have overridden ReadBlockAsync(char[], ...) before this overload
+ // was introduced, defer to it.
+ return base.ReadBlockAsync(buffer, cancellationToken);
+ }
+
+ if (_stream == null)
+ {
+ throw new ObjectDisposedException(null, SR.ObjectDisposed_ReaderClosed);
+ }
+
+ CheckAsyncTaskInProgress();
+
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return new ValueTask<int>(Task.FromCanceled<int>(cancellationToken));
+ }
+
+ ValueTask<int> vt = ReadBlockAsyncInternal(buffer, cancellationToken);
+ if (vt.IsCompletedSuccessfully)
+ {
+ return vt;
+ }
+
+ Task<int> t = vt.AsTask();
+ _asyncReadTask = t;
+ return new ValueTask<int>(t);
+ }
+
private async Task<int> ReadBufferAsync()
{
_charLen = 0;
char[] charBuffer, int charPos, int charLen, char[] coreNewLine,
bool autoFlush, bool appendNewLine, CancellationToken cancellationToken)
{
- while (source.Length > 0)
+ int copied = 0;
+ while (copied < source.Length)
{
if (charPos == charLen)
{
charPos = 0;
}
- int n = Math.Min(charLen - charPos, source.Length);
+ int n = Math.Min(charLen - charPos, source.Length - copied);
Debug.Assert(n > 0, "StreamWriter::Write(char[], int, int) isn't making progress! This is most likely a race condition in user code.");
- source.Span.Slice(0, n).CopyTo(new Span<char>(charBuffer, charPos, n));
-
- source = source.Slice(n);
+ source.Span.Slice(copied, n).CopyTo(new Span<char>(charBuffer, charPos, n));
charPos += n;
+ copied += n;
}
if (appendNewLine)
public async virtual Task<string> ReadToEndAsync()
{
- char[] chars = new char[4096];
- int len;
- StringBuilder sb = new StringBuilder(4096);
- while ((len = await ReadAsyncInternal(chars, 0, chars.Length).ConfigureAwait(false)) != 0)
+ var sb = new StringBuilder(4096);
+ char[] chars = ArrayPool<char>.Shared.Rent(4096);
+ try
{
- sb.Append(chars, 0, len);
+ int len;
+ while ((len = await ReadAsyncInternal(chars, default).ConfigureAwait(false)) != 0)
+ {
+ sb.Append(chars, 0, len);
+ }
+ }
+ finally
+ {
+ ArrayPool<char>.Shared.Return(chars);
}
return sb.ToString();
}
throw new ArgumentException(SR.Argument_InvalidOffLen);
}
- return ReadAsyncInternal(buffer, index, count);
+ return ReadAsyncInternal(new Memory<char>(buffer, index, count), default).AsTask();
}
public virtual ValueTask<int> ReadAsync(Memory<char> buffer, CancellationToken cancellationToken = default(CancellationToken)) =>
return t.Item1.Read(t.Item2.Span);
}, Tuple.Create(this, buffer), cancellationToken, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default));
- internal virtual Task<int> ReadAsyncInternal(char[] buffer, int index, int count)
+ internal virtual ValueTask<int> ReadAsyncInternal(Memory<char> buffer, CancellationToken cancellationToken)
{
- Debug.Assert(buffer != null);
- Debug.Assert(index >= 0);
- Debug.Assert(count >= 0);
- Debug.Assert(buffer.Length - index >= count);
-
- var tuple = new Tuple<TextReader, char[], int, int>(this, buffer, index, count);
- return Task<int>.Factory.StartNew(state =>
+ var tuple = new Tuple<TextReader, Memory<char>>(this, buffer);
+ return new ValueTask<int>(Task<int>.Factory.StartNew(state =>
{
- var t = (Tuple<TextReader, char[], int, int>)state;
- return t.Item1.Read(t.Item2, t.Item3, t.Item4);
+ var t = (Tuple<TextReader, Memory<char>>)state;
+ return t.Item1.Read(t.Item2.Span);
},
- tuple, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
+ tuple, cancellationToken, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default));
}
public virtual Task<int> ReadBlockAsync(char[] buffer, int index, int count)
throw new ArgumentException(SR.Argument_InvalidOffLen);
}
- return ReadBlockAsyncInternal(buffer, index, count);
+ return ReadBlockAsyncInternal(new Memory<char>(buffer, index, count), default).AsTask();
}
public virtual ValueTask<int> ReadBlockAsync(Memory<char> buffer, CancellationToken cancellationToken = default(CancellationToken)) =>
return t.Item1.ReadBlock(t.Item2.Span);
}, Tuple.Create(this, buffer), cancellationToken, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default));
- private async Task<int> ReadBlockAsyncInternal(char[] buffer, int index, int count)
+ internal async ValueTask<int> ReadBlockAsyncInternal(Memory<char> buffer, CancellationToken cancellationToken)
{
- Debug.Assert(buffer != null);
- Debug.Assert(index >= 0);
- Debug.Assert(count >= 0);
- Debug.Assert(buffer.Length - index >= count);
-
- int i, n = 0;
+ int n = 0, i;
do
{
- i = await ReadAsyncInternal(buffer, index + n, count - n).ConfigureAwait(false);
+ i = await ReadAsyncInternal(buffer.Slice(n), cancellationToken).ConfigureAwait(false);
n += i;
- } while (i > 0 && n < count);
+ } while (i > 0 && n < buffer.Length);
return n;
}