1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
5 /*============================================================
11 ** Purpose: Abstract base class for all Streams. Provides
12 ** default implementations of asynchronous reads & writes, in
13 ** terms of the synchronous reads & writes (and vice versa).
16 ===========================================================*/
20 using System.Threading;
21 using System.Threading.Tasks;
23 using System.Runtime.InteropServices;
24 using System.Runtime.CompilerServices;
25 using System.Runtime.ExceptionServices;
26 using System.Security;
27 using System.Diagnostics;
28 using System.Reflection;
32 public abstract class Stream : MarshalByRefObject, IDisposable
// Shared stream instance backed by the NullStream class defined in this file.
34 public static readonly Stream Null = new NullStream();
36 // We pick a value that is the largest multiple of 4096 that is still smaller than the large object heap threshold (85K).
37 // The CopyTo/CopyToAsync buffer is short-lived and is likely to be collected at Gen0, and it offers a significant
38 // improvement in Copy performance.
39 private const int _DefaultCopyBufferSize = 81920;
41 // To implement Async IO operations on streams that don't support async IO
// The operation most recently started by RunReadWriteTask; reset to null by FinishTrackingAsyncOperation.
43 private ReadWriteTask _activeReadWriteTask;
// Created lazily by EnsureAsyncActiveSemaphoreInitialized; released by FinishTrackingAsyncOperation.
44 private SemaphoreSlim _asyncActiveSemaphore;
// Returns the semaphore stored in _asyncActiveSemaphore, creating it on first use
// with one available slot out of a maximum of one. The SemaphoreSlim's WaitHandle
// is never accessed, so there is no need to dispose the instance.
internal SemaphoreSlim EnsureAsyncActiveSemaphoreInitialized() =>
    LazyInitializer.EnsureInitialized(
        ref _asyncActiveSemaphore,
        () => new SemaphoreSlim(initialCount: 1, maxCount: 1));
// Abstract flag supplied by derived streams. BeginReadInternal throws
// Error.GetReadNotSupported() when this is false.
53 public abstract bool CanRead
// Abstract flag supplied by derived streams.
58 // If CanSeek is false, Position, Seek, Length, and SetLength should throw.
59 public abstract bool CanSeek
// Virtual (overridable) flag.
// NOTE(review): presumably reports whether ReadTimeout/WriteTimeout are usable - confirm against the accessor body.
64 public virtual bool CanTimeout
// Abstract flag supplied by derived streams. BeginWriteInternal throws
// Error.GetWriteNotSupported() when this is false.
72 public abstract bool CanWrite
// Abstract. GetCopyBufferSize reads this together with Position to work out
// how many bytes remain to be copied.
77 public abstract long Length
// Abstract. GetCopyBufferSize subtracts this from Length to work out
// how many bytes remain to be copied.
82 public abstract long Position
// The base implementation has no timeout support: reading or assigning this
// property throws InvalidOperationException. Derived streams may override it.
public virtual int ReadTimeout
{
    get => throw new InvalidOperationException(SR.InvalidOperation_TimeoutsNotSupported);
    set => throw new InvalidOperationException(SR.InvalidOperation_TimeoutsNotSupported);
}
// The base implementation has no timeout support: reading or assigning this
// property throws InvalidOperationException. Derived streams may override it.
public virtual int WriteTimeout
{
    get => throw new InvalidOperationException(SR.InvalidOperation_TimeoutsNotSupported);
    set => throw new InvalidOperationException(SR.InvalidOperation_TimeoutsNotSupported);
}
// Copies this stream to destination using the buffer size chosen by GetCopyBufferSize.
public Task CopyToAsync(Stream destination)
{
    return CopyToAsync(destination, GetCopyBufferSize());
}
// Copies this stream to destination with the given buffer size and no cancellation.
public Task CopyToAsync(Stream destination, Int32 bufferSize) =>
    CopyToAsync(destination, bufferSize, CancellationToken.None);
// Copies this stream to destination using the buffer size chosen by
// GetCopyBufferSize, observing the supplied cancellation token.
public Task CopyToAsync(Stream destination, CancellationToken cancellationToken)
{
    return CopyToAsync(destination, GetCopyBufferSize(), cancellationToken);
}
// Overridable core of the CopyToAsync family. The arguments are validated
// synchronously by StreamHelpers.ValidateCopyToArgs before the asynchronous
// copy loop is started.
public virtual Task CopyToAsync(Stream destination, Int32 bufferSize, CancellationToken cancellationToken)
{
    StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize);

    Task copyOperation = CopyToAsyncInternal(destination, bufferSize, cancellationToken);
    return copyOperation;
}
// Asynchronous copy loop: reads from this stream into a pooled buffer and writes
// each chunk to destination until a read returns 0 bytes. Callers are expected
// to have validated the arguments already (asserted below).
private async Task CopyToAsyncInternal(Stream destination, Int32 bufferSize, CancellationToken cancellationToken)
{
    Debug.Assert(destination != null);
    Debug.Assert(bufferSize > 0);
    Debug.Assert(CanRead);
    Debug.Assert(destination.CanWrite);

    byte[] rented = ArrayPool<byte>.Shared.Rent(bufferSize);

    // From here on the bufferSize parameter is repurposed as the high-water mark
    // of bytes actually used, so the state machine does not need another field.
    bufferSize = 0;
    try
    {
        int chunk;
        while ((chunk = await ReadAsync(rented, 0, rented.Length, cancellationToken).ConfigureAwait(false)) != 0)
        {
            if (bufferSize < chunk)
            {
                bufferSize = chunk;
            }

            await destination.WriteAsync(rented, 0, chunk, cancellationToken).ConfigureAwait(false);
        }
    }
    finally
    {
        // Wipe only the portion that was ever written to, then hand the array back
        // without asking the pool to clear it a second time.
        Array.Clear(rented, 0, bufferSize);
        ArrayPool<byte>.Shared.Return(rented, clearArray: false);
    }
}
164 // Reads the bytes from the current stream and writes the bytes to
165 // the destination stream until all bytes are read, starting at
166 // the current position.
// Uses the buffer size chosen by GetCopyBufferSize.
public void CopyTo(Stream destination)
{
    CopyTo(destination, GetCopyBufferSize());
}
// Synchronous copy loop: validates the arguments, then reads from this stream into
// a pooled buffer and writes each chunk to destination until a read returns 0 bytes.
public virtual void CopyTo(Stream destination, int bufferSize)
{
    StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize);

    byte[] rented = ArrayPool<byte>.Shared.Rent(bufferSize);
    int largestChunk = 0; // most bytes any single read placed in the buffer
    try
    {
        for (;;)
        {
            int chunk = Read(rented, 0, rented.Length);
            if (chunk == 0)
            {
                break;
            }

            if (largestChunk < chunk)
            {
                largestChunk = chunk;
            }

            destination.Write(rented, 0, chunk);
        }
    }
    finally
    {
        // Wipe only the portion that was ever written to, then hand the array back
        // without asking the pool to clear it a second time.
        Array.Clear(rented, 0, largestChunk);
        ArrayPool<byte>.Shared.Return(rented, clearArray: false);
    }
}
// Chooses the buffer size for the CopyTo/CopyToAsync overloads that take no explicit size.
// Starts from _DefaultCopyBufferSize and, based on Length and Position, narrows it to the
// number of bytes remaining when that is smaller than the default.
196 private int GetCopyBufferSize()
198 int bufferSize = _DefaultCopyBufferSize;
202 long length = Length;
203 long position = Position;
204 if (length <= position) // Handles negative overflows
206 // There are no bytes left in the stream to copy.
207 // However, because CopyTo{Async} is virtual, we need to
208 // ensure that any override is still invoked to provide its
209 // own validation, so we use the smallest legal buffer size here.
214 long remaining = length - position;
217 // In the case of a positive overflow, stick to the default size
// Math.Min runs on long values; the result is at most bufferSize, so the cast to int cannot overflow.
218 bufferSize = (int)Math.Min(bufferSize, remaining);
226 // Stream used to require that all cleanup logic went into Close(),
227 // which was thought up before we invented IDisposable. However, we
228 // need to follow the IDisposable pattern so that users can write
229 // sensible subclasses without needing to inspect all their base
230 // classes, and without worrying about version brittleness, from a
231 // base class switching to the Dispose pattern. We're moving
232 // Stream to the Dispose(bool) pattern - that's where all subclasses
233 // should put their cleanup starting in V2.
// Virtual cleanup entry point. The visible body ends by telling the GC that
// this instance no longer needs finalization.
234 public virtual void Close()
236 // Ideally we would assert CanRead == CanWrite == CanSeek = false,
237 // but we'd have to fix PipeStream & NetworkStream very carefully.
240 GC.SuppressFinalize(this);
// Public, non-virtual parameterless Dispose (Stream is declared as IDisposable).
// Subclass cleanup belongs in the Dispose(bool) overload.
243 public void Dispose()
245 // Ideally we would assert CanRead == CanWrite == CanSeek = false,
246 // but we'd have to fix PipeStream & NetworkStream very carefully.
// Overridable cleanup hook for subclasses; no base-class statements are visible in this body.
252 protected virtual void Dispose(bool disposing)
254 // Note: Never change this to call other virtual methods on Stream
255 // like Write, since the state on subclasses has already been
256 // torn down. This is the last code to run on cleanup for a stream.
// Abstract; implemented by derived streams. The default FlushAsync(CancellationToken)
// runs this method on a task scheduled to TaskScheduler.Default.
259 public abstract void Flush();
// Convenience overload: flushes without a cancellation token.
public Task FlushAsync() => FlushAsync(CancellationToken.None);
// Default asynchronous flush: queues a task on the default scheduler that calls
// the synchronous Flush. The stream is passed as task state, so the lambda
// captures nothing.
public virtual Task FlushAsync(CancellationToken cancellationToken)
{
    return Task.Factory.StartNew(
        self => ((Stream)self).Flush(),
        this,
        cancellationToken,
        TaskCreationOptions.DenyChildAttach,
        TaskScheduler.Default);
}
// Obsolete factory that hands back a new, non-signaled ManualResetEvent.
[Obsolete("CreateWaitHandle will be removed eventually. Please use \"new ManualResetEvent(false)\" instead.")]
protected virtual WaitHandle CreateWaitHandle() => new ManualResetEvent(initialState: false);
// APM entry point for reads. Delegates to BeginReadInternal in Begin/End mode
// (apm: true) without asynchronous serialization.
public virtual IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) =>
    BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: false, apm: true);
// Shared implementation behind BeginRead and the Task-based read path.
//   serializeAsynchronously: when true, the semaphore is acquired with WaitAsync and the
//                            read task is started through RunReadWriteTaskWhenReady.
//   apm: passed to the ReadWriteTask; true for Begin/EndXx callers, false for XxAsync callers.
// Throws Error.GetReadNotSupported() when CanRead is false.
// Returns the ReadWriteTask, which doubles as the IAsyncResult.
283 internal IAsyncResult BeginReadInternal(
284 byte[] buffer, int offset, int count, AsyncCallback callback, Object state,
285 bool serializeAsynchronously, bool apm)
287 if (!CanRead) throw Error.GetReadNotSupported();
289 // To avoid a race with a stream's position pointer & generating race conditions
290 // with internal buffer indexes in our own streams that
291 // don't natively support async IO operations when there are multiple
292 // async requests outstanding, we will block the application's main
293 // thread if it does a second IO request until the first one completes.
294 var semaphore = EnsureAsyncActiveSemaphoreInitialized();
295 Task semaphoreTask = null;
296 if (serializeAsynchronously)
// Start the asynchronous wait without blocking the caller.
298 semaphoreTask = semaphore.WaitAsync();
305 // Create the task to asynchronously do a Read. This task serves both
306 // as the asynchronous work item and as the IAsyncResult returned to the user.
307 var asyncResult = new ReadWriteTask(true /*isRead*/, apm, delegate
309 // The ReadWriteTask stores all of the parameters to pass to Read.
310 // As we're currently inside of it, we can get the current task
311 // and grab the parameters from it.
312 var thisTask = Task.InternalCurrent as ReadWriteTask;
313 Debug.Assert(thisTask != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");
317 // Do the Read and return the number of bytes read
318 return thisTask._stream.Read(thisTask._buffer, thisTask._offset, thisTask._count);
322 // If this implementation is part of Begin/EndXx, then the EndXx method will handle
323 // finishing the async operation. However, if this is part of XxAsync, then there won't
324 // be an end method, and this task is responsible for cleaning up.
327 thisTask._stream.FinishTrackingAsyncOperation();
330 thisTask.ClearBeginState(); // just to help alleviate some memory pressure
332 }, state, this, buffer, offset, count, callback);
// An asynchronous semaphore wait was started: run the task once it completes.
335 if (semaphoreTask != null)
336 RunReadWriteTaskWhenReady(semaphoreTask, asyncResult);
338 RunReadWriteTask(asyncResult);
341 return asyncResult; // return it
// Completes a read started by BeginRead. Verifies that asyncResult is the currently
// tracked read operation, blocks until it finishes, and returns the number of bytes
// read (or rethrows the operation's exception). Tracking is always released afterwards.
public virtual int EndRead(IAsyncResult asyncResult)
{
    if (asyncResult == null)
    {
        throw new ArgumentNullException(nameof(asyncResult));
    }

    ReadWriteTask pending = _activeReadWriteTask;

    // Each guard throws, so these are equivalent to an if / else-if chain.
    if (pending == null)
    {
        throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple);
    }

    if (pending != asyncResult)
    {
        throw new InvalidOperationException(SR.InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple);
    }

    if (!pending._isRead)
    {
        throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple);
    }

    try
    {
        // Blocks until completion, then yields the result or propagates the exception.
        return pending.GetAwaiter().GetResult();
    }
    finally
    {
        FinishTrackingAsyncOperation();
    }
}
// Convenience overload: reads without a cancellation token.
public Task<int> ReadAsync(Byte[] buffer, int offset, int count) =>
    ReadAsync(buffer, offset, count, CancellationToken.None);
// Overridable Task-based read. The token is only checked up front: an already
// cancelled token produces a cancelled task, otherwise the read is routed through
// the Begin/End machinery.
public virtual Task<int> ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return Task.FromCanceled<int>(cancellationToken);
    }

    return BeginEndReadAsync(buffer, offset, count);
}
// Memory-based read. When the memory is backed by an array, the array overload is
// used directly. Otherwise the read goes into a pooled array, the bytes are copied
// into destination once the read completes, and the array is returned to the pool.
public virtual ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default(CancellationToken))
{
    if (destination.TryGetArray(out ArraySegment<byte> segment))
    {
        return new ValueTask<int>(ReadAsync(segment.Array, segment.Offset, segment.Count, cancellationToken));
    }

    byte[] rented = ArrayPool<byte>.Shared.Rent(destination.Length);
    Task<int> pendingRead = ReadAsync(rented, 0, destination.Length, cancellationToken);
    return CopyBackWhenDoneAsync(pendingRead, rented, destination);

    async ValueTask<int> CopyBackWhenDoneAsync(Task<int> readTask, byte[] localBuffer, Memory<byte> localDestination)
    {
        try
        {
            int bytesRead = await readTask.ConfigureAwait(false);
            new Span<byte>(localBuffer, 0, bytesRead).CopyTo(localDestination.Span);
            return bytesRead;
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(localBuffer);
        }
    }
}
// Implemented inside the runtime (InternalCall). BeginEndReadAsync uses the result to
// choose between calling BeginReadInternal directly and wrapping BeginRead/EndRead.
415 [MethodImplAttribute(MethodImplOptions.InternalCall)]
416 private extern bool HasOverriddenBeginEndRead();
// Produces the Task for ReadAsync, using either the derived type's BeginRead/EndRead
// pair or, when those are not overridden, the internal single-task path.
private Task<Int32> BeginEndReadAsync(Byte[] buffer, Int32 offset, Int32 count)
{
    if (HasOverriddenBeginEndRead())
    {
        // The derived type customizes Begin/EndRead, so wrap those calls to make sure
        // its functionality is used.
        return TaskFactory<Int32>.FromAsyncTrim(
            this, new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count },
            (stream, args, callback, state) => stream.BeginRead(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
            (stream, asyncResult) => stream.EndRead(asyncResult)); // cached by compiler
    }

    // Begin/EndRead are not overridden: take the optimized path that skips an extra
    // layer of tasks / IAsyncResults.
    return (Task<Int32>)BeginReadInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
}
// Argument bundle handed to TaskFactory.FromAsyncTrim by BeginEndReadAsync and
// BeginEndWriteAsync, which initialize Buffer, Offset and Count.
434 private struct ReadWriteParameters // struct for arguments to Read and Write calls
436 internal byte[] Buffer;
// APM entry point for writes. Delegates to BeginWriteInternal in Begin/End mode
// (apm: true) without asynchronous serialization.
public virtual IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) =>
    BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: false, apm: true);
// Shared implementation behind BeginWrite and the Task-based write path.
//   serializeAsynchronously: when true, the semaphore is acquired with WaitAsync and the
//                            write task is started through RunReadWriteTaskWhenReady;
//                            otherwise the caller blocks in semaphore.Wait().
//   apm: passed to the ReadWriteTask; true for Begin/EndXx callers, false for XxAsync callers.
// Throws Error.GetWriteNotSupported() when CanWrite is false.
// Returns the ReadWriteTask, which doubles as the IAsyncResult.
448 internal IAsyncResult BeginWriteInternal(
449 byte[] buffer, int offset, int count, AsyncCallback callback, Object state,
450 bool serializeAsynchronously, bool apm)
452 if (!CanWrite) throw Error.GetWriteNotSupported();
454 // To avoid a race condition with a stream's position pointer & generating race conditions
455 // with internal buffer indexes in our own streams that
456 // don't natively support async IO operations when there are multiple
457 // async requests outstanding, we will block the application's main
458 // thread if it does a second IO request until the first one completes.
459 var semaphore = EnsureAsyncActiveSemaphoreInitialized();
460 Task semaphoreTask = null;
461 if (serializeAsynchronously)
463 semaphoreTask = semaphore.WaitAsync(); // kick off the asynchronous wait, but don't block
467 semaphore.Wait(); // synchronously wait here
470 // Create the task to asynchronously do a Write. This task serves both
471 // as the asynchronous work item and as the IAsyncResult returned to the user.
472 var asyncResult = new ReadWriteTask(false /*isRead*/, apm, delegate
474 // The ReadWriteTask stores all of the parameters to pass to Write.
475 // As we're currently inside of it, we can get the current task
476 // and grab the parameters from it.
477 var thisTask = Task.InternalCurrent as ReadWriteTask;
478 Debug.Assert(thisTask != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");
483 thisTask._stream.Write(thisTask._buffer, thisTask._offset, thisTask._count);
484 return 0; // not used, but signature requires a value be returned
488 // If this implementation is part of Begin/EndXx, then the EndXx method will handle
489 // finishing the async operation. However, if this is part of XxAsync, then there won't
490 // be an end method, and this task is responsible for cleaning up.
493 thisTask._stream.FinishTrackingAsyncOperation();
496 thisTask.ClearBeginState(); // just to help alleviate some memory pressure
498 }, state, this, buffer, offset, count, callback);
// An asynchronous semaphore wait was started: run the task once it completes.
501 if (semaphoreTask != null)
502 RunReadWriteTaskWhenReady(semaphoreTask, asyncResult);
504 RunReadWriteTask(asyncResult);
506 return asyncResult; // return it
// Starts readWriteTask once the semaphore wait represented by asyncWaiter has finished:
// immediately if it is already complete, otherwise from a synchronous continuation.
private void RunReadWriteTaskWhenReady(Task asyncWaiter, ReadWriteTask readWriteTask)
{
    Debug.Assert(readWriteTask != null);
    Debug.Assert(asyncWaiter != null);

    if (!asyncWaiter.IsCompleted)
    {
        // Not our turn yet. The task travels as continuation state, so the lambda
        // captures nothing and reaches the stream through the task itself.
        asyncWaiter.ContinueWith((antecedent, boxedTask) =>
        {
            Debug.Assert(antecedent.IsCompletedSuccessfully, "The semaphore wait should always complete successfully.");
            var pending = (ReadWriteTask)boxedTask;
            pending._stream.RunReadWriteTask(pending);
        }, readWriteTask, default(CancellationToken), TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
        return;
    }

    // The wait has already completed, so the task can run right away.
    Debug.Assert(asyncWaiter.IsCompletedSuccessfully, "The semaphore wait should always complete successfully.");
    RunReadWriteTask(readWriteTask);
}
// Records readWriteTask as the active operation and then schedules it on
// TaskScheduler.Default. The order of the three statements at the end is significant
// (see the comment below).
531 private void RunReadWriteTask(ReadWriteTask readWriteTask)
533 Debug.Assert(readWriteTask != null);
534 Debug.Assert(_activeReadWriteTask == null, "Expected no other readers or writers");
536 // Schedule the task. ScheduleAndStart must happen after the write to _activeReadWriteTask to avoid a race.
537 // Internally, we're able to directly call ScheduleAndStart rather than Start, avoiding
538 // two interlocked operations. However, if ReadWriteTask is ever changed to use
539 // a cancellation token, this should be changed to use Start.
540 _activeReadWriteTask = readWriteTask; // store the task so that EndXx can validate it's given the right one
541 readWriteTask.m_taskScheduler = TaskScheduler.Default;
542 readWriteTask.ScheduleAndStart(needsProtection: false);
// Ends tracking of the current operation: clears _activeReadWriteTask first, then
// releases the semaphore so a waiting operation can start.
545 private void FinishTrackingAsyncOperation()
547 _activeReadWriteTask = null;
548 Debug.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
549 _asyncActiveSemaphore.Release();
// Completes a write started by BeginWrite. Verifies that asyncResult is the currently
// tracked write operation, blocks until it finishes, and rethrows any exception it
// produced. Tracking is always released afterwards.
public virtual void EndWrite(IAsyncResult asyncResult)
{
    if (asyncResult == null)
    {
        throw new ArgumentNullException(nameof(asyncResult));
    }

    ReadWriteTask pending = _activeReadWriteTask;

    // Each guard throws, so these are equivalent to an if / else-if chain.
    if (pending == null)
    {
        throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple);
    }

    if (pending != asyncResult)
    {
        throw new InvalidOperationException(SR.InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple);
    }

    if (pending._isRead)
    {
        throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple);
    }

    try
    {
        // Blocks until completion, then propagates any exception.
        pending.GetAwaiter().GetResult();
        Debug.Assert(pending.Status == TaskStatus.RanToCompletion);
    }
    finally
    {
        FinishTrackingAsyncOperation();
    }
}
582 // Task used by BeginRead / BeginWrite to do Read / Write asynchronously.
583 // A single instance of this task serves four purposes:
584 // 1. The work item scheduled to run the Read / Write operation
585 // 2. The state holding the arguments to be passed to Read / Write
586 // 3. The IAsyncResult returned from BeginRead / BeginWrite
587 // 4. The completion action that runs to invoke the user-provided callback.
588 // This last item is a bit tricky. Before the AsyncCallback is invoked, the
589 // IAsyncResult must have completed, so we can't just invoke the handler
590 // from within the task, since it is the IAsyncResult, and thus it's not
591 // yet completed. Instead, we use AddCompletionAction to install this
592 // task as its own completion handler. That saves the need to allocate
593 // a separate completion handler, it guarantees that the task will
594 // have completed by the time the handler is invoked, and it allows
595 // the handler to be invoked synchronously upon the completion of the
596 // task. This all enables BeginRead / BeginWrite to be implemented
597 // with a single allocation.
// Task<int> subclass that also implements ITaskCompletionAction, so that it can register
// itself (via AddCompletionAction) to invoke the user callback after it completes.
598 private sealed class ReadWriteTask : Task<int>, ITaskCompletionAction
// true for a read operation, false for a write; EndRead/EndWrite check this flag.
600 internal readonly bool _isRead;
601 internal readonly bool _apm; // true if this is from Begin/EndXx; false if it's from XxAsync
// Stream and arguments that the task's delegate passes to Read / Write.
602 internal Stream _stream;
603 internal byte[] _buffer;
604 internal readonly int _offset;
605 internal readonly int _count;
// User-supplied completion callback and the ExecutionContext captured for invoking it.
606 private AsyncCallback _callback;
607 private ExecutionContext _context;
609 internal void ClearBeginState() // Used to allow the args to Read/Write to be made available for GC
// Builds the task around 'function' with DenyChildAttach and no cancellation token.
615 public ReadWriteTask(
618 Func<object, int> function, object state,
619 Stream stream, byte[] buffer, int offset, int count, AsyncCallback callback) :
620 base(function, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach)
622 Debug.Assert(function != null);
623 Debug.Assert(stream != null);
624 Debug.Assert(buffer != null);
626 // Store the arguments
634 // If a callback was provided, we need to:
635 // - Store the user-provided handler
636 // - Capture an ExecutionContext under which to invoke the handler
637 // - Add this task as its own completion handler so that the Invoke method
638 // will run the callback when this task completes.
639 if (callback != null)
641 _callback = callback;
642 _context = ExecutionContext.Capture();
643 base.AddCompletionAction(this);
// ContextCallback target used with ExecutionContext.Run: takes the completed task as
// state, reads its callback and clears the field.
647 private static void InvokeAsyncCallback(object completedTask)
649 var rwc = (ReadWriteTask)completedTask;
650 var callback = rwc._callback;
651 rwc._callback = null;
// Cached delegate for InvokeAsyncCallback, created lazily in Invoke.
655 private static ContextCallback s_invokeAsyncCallback;
// Completion action registered by the constructor when a callback was supplied.
657 void ITaskCompletionAction.Invoke(Task completingTask)
659 // Get the ExecutionContext. If there is none, just run the callback
660 // directly, passing in the completed task as the IAsyncResult.
661 // If there is one, process it with ExecutionContext.Run.
662 var context = _context;
665 var callback = _callback;
667 callback(completingTask);
673 var invokeAsyncCallback = s_invokeAsyncCallback;
674 if (invokeAsyncCallback == null) s_invokeAsyncCallback = invokeAsyncCallback = InvokeAsyncCallback; // benign race condition
676 ExecutionContext.Run(context, invokeAsyncCallback, this);
// The user callback is arbitrary code, so report that to the task infrastructure.
680 bool ITaskCompletionAction.InvokeMayRunArbitraryCode { get { return true; } }
// Convenience overload: writes without a cancellation token.
public Task WriteAsync(Byte[] buffer, int offset, int count) =>
    WriteAsync(buffer, offset, count, CancellationToken.None);
// Overridable Task-based write. The token is only checked up front: an already
// cancelled token produces a cancelled task, otherwise the write is routed through
// the Begin/End machinery.
public virtual Task WriteAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return Task.FromCanceled(cancellationToken);
    }

    return BeginEndWriteAsync(buffer, offset, count);
}
// Memory-based write. When the memory is backed by an array, the array overload is
// used directly. Otherwise the bytes are copied into a pooled array, written from
// there, and the array is returned to the pool once the write completes.
public virtual Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
{
    if (MemoryMarshal.TryGetArray(source, out ArraySegment<byte> segment))
    {
        return WriteAsync(segment.Array, segment.Offset, segment.Count, cancellationToken);
    }

    byte[] rented = ArrayPool<byte>.Shared.Rent(source.Length);
    source.Span.CopyTo(rented);
    Task pendingWrite = WriteAsync(rented, 0, source.Length, cancellationToken);
    return ReturnBufferWhenDoneAsync(pendingWrite, rented);

    async Task ReturnBufferWhenDoneAsync(Task writeTask, byte[] localBuffer)
    {
        try
        {
            await writeTask.ConfigureAwait(false);
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(localBuffer);
        }
    }
}
// Implemented inside the runtime (InternalCall). BeginEndWriteAsync uses the result to
// choose between calling BeginWriteInternal directly and wrapping BeginWrite/EndWrite.
723 [MethodImplAttribute(MethodImplOptions.InternalCall)]
724 private extern bool HasOverriddenBeginEndWrite();
// Produces the Task for WriteAsync, using either the derived type's BeginWrite/EndWrite
// pair or, when those are not overridden, the internal single-task path.
private Task BeginEndWriteAsync(Byte[] buffer, Int32 offset, Int32 count)
{
    if (HasOverriddenBeginEndWrite())
    {
        // The derived type customizes Begin/EndWrite, so wrap those calls to make sure
        // its functionality is used.
        return TaskFactory<VoidTaskResult>.FromAsyncTrim(
            this, new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count },
            (stream, args, callback, state) => stream.BeginWrite(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
            (stream, asyncResult) => // cached by compiler
            {
                stream.EndWrite(asyncResult);
                return default(VoidTaskResult);
            });
    }

    // Begin/EndWrite are not overridden: take the optimized path that skips an extra
    // layer of tasks / IAsyncResults.
    return (Task)BeginWriteInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
}
// Abstract; implemented by derived streams. Per the note on CanSeek, it should throw when CanSeek is false.
746 public abstract long Seek(long offset, SeekOrigin origin);
// Abstract; implemented by derived streams. Per the note on CanSeek, it should throw when CanSeek is false.
748 public abstract void SetLength(long value);
// Abstract core read. The copy loops in this class treat a return value of 0 as
// the end of the data; Read(Span<byte>) and ReadByte are built on top of it.
750 public abstract int Read(byte[] buffer, int offset, int count);
// Span-based read built on the array overload: reads into a pooled array, copies the
// bytes that were read into destination, and always returns the array to the pool.
// Throws IOException if the array overload reports a count outside [0, destination.Length].
public virtual int Read(Span<byte> destination)
{
    byte[] rented = ArrayPool<byte>.Shared.Rent(destination.Length);
    try
    {
        int bytesRead = Read(rented, 0, destination.Length);

        // The unsigned comparison rejects negative counts as well as counts that are too large.
        if ((uint)bytesRead > destination.Length)
        {
            throw new IOException(SR.IO_StreamTooLong);
        }

        new Span<byte>(rented, 0, bytesRead).CopyTo(destination);
        return bytesRead;
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(rented);
    }
}
768 // Reads one byte from the stream by calling Read(byte[], int, int).
769 // Will return an unsigned byte cast to an int or -1 on end of stream.
770 // This implementation does not perform well because it allocates a new
771 // byte[] each time you call it, and should be overridden by any
772 // subclass that maintains an internal buffer. Then, it can help perf
773 // significantly for people who are reading one byte at a time.
774 public virtual int ReadByte()
// A new one-element array is allocated on every call (see the performance note above).
776 byte[] oneByteArray = new byte[1];
// r is the byte count reported by Read for the one-byte request.
777 int r = Read(oneByteArray, 0, 1);
// The byte widens to int without sign extension, so the result is in 0..255.
780 return oneByteArray[0];
// Abstract core write. Write(ReadOnlySpan<byte>) and WriteByte are built on top of it.
783 public abstract void Write(byte[] buffer, int offset, int count);
// Span-based write built on the array overload: copies source into a pooled array,
// writes from there, and always returns the array to the pool.
public virtual void Write(ReadOnlySpan<byte> source)
{
    byte[] rented = ArrayPool<byte>.Shared.Rent(source.Length);
    try
    {
        source.CopyTo(rented);
        Write(rented, 0, source.Length);
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(rented);
    }
}
796 // Writes one byte to the stream by calling Write(byte[], int, int).
797 // This implementation does not perform well because it allocates a new
798 // byte[] each time you call it, and should be overridden by any
799 // subclass that maintains an internal buffer. Then, it can help perf
800 // significantly for people who are writing one byte at a time.
public virtual void WriteByte(byte value)
{
    // A fresh one-element array per call keeps this simple at the cost of an allocation.
    Write(new byte[] { value }, 0, 1);
}
// Returns a SyncStream wrapper around the given stream. A stream that is already
// a SyncStream is returned unchanged rather than wrapped a second time.
// Throws ArgumentNullException when stream is null.
public static Stream Synchronized(Stream stream)
{
    if (stream == null)
    {
        throw new ArgumentNullException(nameof(stream));
    }

    return (stream as SyncStream) ?? new SyncStream(stream);
}
// Obsolete virtual hook; no statements are visible in its body.
818 [Obsolete("Do not call or override this method.")]
819 protected virtual void ObjectInvariant()
// Begin-style read that does the work synchronously on the calling thread. This avoids
// races on the stream's position and internal buffer indexes for streams without native
// async IO, at the cost of blocking the caller.
// An IOException from Read is captured in the result instead of being thrown here; other
// exception types propagate. The callback, if any, runs before this method returns.
internal IAsyncResult BlockingBeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
{
    SynchronousAsyncResult completed;
    try
    {
        completed = new SynchronousAsyncResult(Read(buffer, offset, count), state);
    }
    catch (IOException ioFailure)
    {
        completed = new SynchronousAsyncResult(ioFailure, state, isWrite: false);
    }

    callback?.Invoke(completed);

    return completed;
}
// End counterpart of BlockingBeginRead; forwards to SynchronousAsyncResult.EndRead.
internal static int BlockingEndRead(IAsyncResult asyncResult) =>
    SynchronousAsyncResult.EndRead(asyncResult);
// Begin-style write that does the work synchronously on the calling thread. This avoids
// races on the stream's position and internal buffer indexes for streams without native
// async IO, at the cost of blocking the caller.
// An IOException from Write is captured in the result instead of being thrown here; other
// exception types propagate. The callback, if any, runs before this method returns.
internal IAsyncResult BlockingBeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
{
    SynchronousAsyncResult completed;
    try
    {
        Write(buffer, offset, count);
        completed = new SynchronousAsyncResult(state);
    }
    catch (IOException ioFailure)
    {
        completed = new SynchronousAsyncResult(ioFailure, state, isWrite: true);
    }

    callback?.Invoke(completed);

    return completed;
}
// End counterpart of BlockingBeginWrite; forwards to SynchronousAsyncResult.EndWrite.
internal static void BlockingEndWrite(IAsyncResult asyncResult) =>
    SynchronousAsyncResult.EndWrite(asyncResult);
// Stream implementation behind the static Stream.Null field. The members visible here
// either do nothing or return an immediately available result.
887 private sealed class NullStream : Stream
889 internal NullStream() { }
891 public override bool CanRead
896 public override bool CanWrite
901 public override bool CanSeek
906 public override long Length
911 public override long Position
917 public override void CopyTo(Stream destination, int bufferSize)
919 StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize);
921 // After we validate arguments this is a nop.
924 public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
926 // Validate arguments here for compat, since previously this method
927 // was inherited from Stream (which did check its arguments).
928 StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize);
// A cancelled token yields a cancelled task.
930 return cancellationToken.IsCancellationRequested ?
931 Task.FromCanceled(cancellationToken) :
935 protected override void Dispose(bool disposing)
937 // Do nothing - we don't want NullStream singleton (static) to be closable
940 public override void Flush()
944 public override Task FlushAsync(CancellationToken cancellationToken)
// A cancelled token yields a cancelled task.
946 return cancellationToken.IsCancellationRequested ?
947 Task.FromCanceled(cancellationToken) :
// The Begin/End overrides below use the synchronous Blocking* helpers of the base class,
// so they produce SynchronousAsyncResult objects rather than ReadWriteTask instances.
951 public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
953 if (!CanRead) throw Error.GetReadNotSupported();
955 return BlockingBeginRead(buffer, offset, count, callback, state);
958 public override int EndRead(IAsyncResult asyncResult)
960 if (asyncResult == null)
961 throw new ArgumentNullException(nameof(asyncResult));
963 return BlockingEndRead(asyncResult);
966 public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
968 if (!CanWrite) throw Error.GetWriteNotSupported();
970 return BlockingBeginWrite(buffer, offset, count, callback, state);
973 public override void EndWrite(IAsyncResult asyncResult)
975 if (asyncResult == null)
976 throw new ArgumentNullException(nameof(asyncResult));
978 BlockingEndWrite(asyncResult);
981 public override int Read(byte[] buffer, int offset, int count)
986 public override int Read(Span<byte> destination)
991 public override Task<int> ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
// Returns a shared task instance instead of allocating one; the cancellation token is not consulted here.
993 return AsyncTaskMethodBuilder<int>.s_defaultResultTask;
996 public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default(CancellationToken))
// Completes synchronously with a result of 0 bytes.
998 return new ValueTask<int>(0);
1001 public override int ReadByte()
1006 public override void Write(byte[] buffer, int offset, int count)
1010 public override void Write(ReadOnlySpan<byte> source)
1014 public override Task WriteAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
// A cancelled token yields a cancelled task.
1016 return cancellationToken.IsCancellationRequested ?
1017 Task.FromCanceled(cancellationToken) :
1021 public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
// A cancelled token yields a cancelled task.
1023 return cancellationToken.IsCancellationRequested ?
1024 Task.FromCanceled(cancellationToken) :
1028 public override void WriteByte(byte value)
1032 public override long Seek(long offset, SeekOrigin origin)
1037 public override void SetLength(long length)
1043 /// <summary>Used as the IAsyncResult object when using asynchronous IO methods on the base Stream class.</summary>
// IAsyncResult that is already complete when it is created. It carries the outcome of
// an operation that ran synchronously: a byte count, or a captured exception.
1044 internal sealed class SynchronousAsyncResult : IAsyncResult
1046 private readonly Object _stateObject;
// Distinguishes write results from read results; checked by EndRead and EndWrite.
1047 private readonly bool _isWrite;
// Created on demand by the AsyncWaitHandle getter.
1048 private ManualResetEvent _waitHandle;
// Set by the exception constructor; rethrown by ThrowIfError.
1049 private ExceptionDispatchInfo _exceptionInfo;
// Set once EndRead/EndWrite has been called, so a second call can be rejected.
1051 private bool _endXxxCalled;
1052 private Int32 _bytesRead;
// Result of a read that produced bytesRead bytes.
1054 internal SynchronousAsyncResult(Int32 bytesRead, Object asyncStateObject)
1056 _bytesRead = bytesRead;
1057 _stateObject = asyncStateObject;
// Result without a byte count (BlockingBeginWrite uses this overload after a successful Write).
1061 internal SynchronousAsyncResult(Object asyncStateObject)
1063 _stateObject = asyncStateObject;
// Result of an operation that failed with ex; the exception is captured so it can be rethrown later.
1067 internal SynchronousAsyncResult(Exception ex, Object asyncStateObject, bool isWrite)
1069 _exceptionInfo = ExceptionDispatchInfo.Capture(ex);
1070 _stateObject = asyncStateObject;
1074 public bool IsCompleted
1076 // We never hand out objects of this type to the user before the synchronous IO completed:
1077 get { return true; }
1080 public WaitHandle AsyncWaitHandle
// The event is created in the signaled state.
1084 return LazyInitializer.EnsureInitialized(ref _waitHandle, () => new ManualResetEvent(true));
1088 public Object AsyncState
1090 get { return _stateObject; }
1093 public bool CompletedSynchronously
1095 get { return true; }
// Rethrows the captured exception, if there is one.
1098 internal void ThrowIfError()
1100 if (_exceptionInfo != null)
1101 _exceptionInfo.Throw();
// Validates that asyncResult is a read result that has not been ended before, marks it
// as ended and returns the byte count. Throws ArgumentException for a wrong or reused result.
1104 internal static Int32 EndRead(IAsyncResult asyncResult)
1106 SynchronousAsyncResult ar = asyncResult as SynchronousAsyncResult;
1107 if (ar == null || ar._isWrite)
1108 throw new ArgumentException(SR.Arg_WrongAsyncResult);
1110 if (ar._endXxxCalled)
1111 throw new ArgumentException(SR.InvalidOperation_EndReadCalledMultiple);
1113 ar._endXxxCalled = true;
1116 return ar._bytesRead;
// Validates that asyncResult is a write result that has not been ended before and marks
// it as ended. Throws ArgumentException for a wrong or reused result.
1119 internal static void EndWrite(IAsyncResult asyncResult)
1121 SynchronousAsyncResult ar = asyncResult as SynchronousAsyncResult;
1122 if (ar == null || !ar._isWrite)
1123 throw new ArgumentException(SR.Arg_WrongAsyncResult);
1125 if (ar._endXxxCalled)
1126 throw new ArgumentException(SR.InvalidOperation_EndWriteCalledMultiple);
1128 ar._endXxxCalled = true;
1132 } // class SynchronousAsyncResult
1135 // SyncStream is a wrapper around a stream that takes
1136 // a lock for every operation making it thread safe.
1137 internal sealed class SyncStream : Stream, IDisposable
// The wrapped stream that the members of this class forward to.
1139 private Stream _stream;
// Wraps the given stream. Throws ArgumentNullException naming the 'stream' parameter
// (NOTE(review): presumably only when stream is null — the guarding condition and the
// assignment to _stream are not visible here; confirm).
1141 internal SyncStream(Stream stream)
1144 throw new ArgumentNullException(nameof(stream));
// Reports the wrapped stream's CanRead value.
1148 public override bool CanRead
1150 get { return _stream.CanRead; }
// Reports the wrapped stream's CanWrite value.
1153 public override bool CanWrite
1155 get { return _stream.CanWrite; }
// Reports the wrapped stream's CanSeek value.
1158 public override bool CanSeek
1160 get { return _stream.CanSeek; }
// Reports the wrapped stream's CanTimeout value.
1163 public override bool CanTimeout
1167 return _stream.CanTimeout;
// Returns the wrapped stream's Length.
// NOTE(review): per the class comment this is presumably done under a lock — the lock
// statement is not visible here; confirm.
1171 public override long Length
1177 return _stream.Length;
// Reads and writes the wrapped stream's Position: the value is returned from, and assigned
// to, _stream.Position.
// NOTE(review): per the class comment both accessors presumably run under a lock — the lock
// statements are not visible here; confirm.
1182 public override long Position
1188 return _stream.Position;
1195 _stream.Position = value;
// Reads and writes the wrapped stream's ReadTimeout.
1200 public override int ReadTimeout
1204 return _stream.ReadTimeout;
1208 _stream.ReadTimeout = value;
// Reads and writes the wrapped stream's WriteTimeout.
1212 public override int WriteTimeout
1216 return _stream.WriteTimeout;
1220 _stream.WriteTimeout = value;
1224 // In the off chance that some wrapped stream has different
1225 // semantics for Close vs. Dispose, let's preserve that.
// NOTE(review): the body is not visible here — confirm that it forwards to the wrapped
// stream's Close rather than its Dispose, as the comment above implies.
1226 public override void Close()
// Disposes the wrapped stream through its IDisposable interface (the cast makes the call go
// through the interface method, per the comment below) and calls the base Dispose(bool)
// with the same 'disposing' flag.
// NOTE(review): the conditions, lock and try/finally surrounding these calls are not visible
// here — confirm under which conditions the wrapped stream is disposed.
1241 protected override void Dispose(bool disposing)
1247 // Explicitly pick up a potentially methodimpl'ed Dispose
1249 ((IDisposable)_stream).Dispose();
1253 base.Dispose(disposing);
// Override of Flush.
// NOTE(review): the body is not visible here — presumably forwards to the wrapped stream's
// Flush like the sibling members; confirm.
1258 public override void Flush()
// Forwards the array-based read to the wrapped stream with the same arguments and returns
// its result.
1264 public override int Read(byte[] bytes, int offset, int count)
1267 return _stream.Read(bytes, offset, count);
// Forwards the span-based read to the wrapped stream and returns its result.
1270 public override int Read(Span<byte> destination)
1273 return _stream.Read(destination);
// Forwards to the wrapped stream's ReadByte and returns its result.
1276 public override int ReadByte()
1279 return _stream.ReadByte();
// Starts an APM-style read on the wrapped stream and returns its IAsyncResult.
// Which entry point is used depends on whether the wrapped stream reports, via
// HasOverriddenBeginEndRead(), that it overrides BeginRead/EndRead: if so its own BeginRead
// is called; otherwise BeginReadInternal is called with serializeAsynchronously: true and
// apm: true. The comment inside the method explains why.
1282 public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
1284 bool overridesBeginRead = _stream.HasOverriddenBeginEndRead();
1288 // If the Stream does have its own BeginRead implementation, then we must use that override.
1289 // If it doesn't, then we'll use the base implementation, but we'll make sure that the logic
1290 // which ensures only one asynchronous operation does so with an asynchronous wait rather
1291 // than a synchronous wait. A synchronous wait will result in a deadlock condition, because
1292 // the EndXx method for the outstanding async operation won't be able to acquire the lock on
1293 // _stream due to this call blocked while holding the lock.
1294 return overridesBeginRead ?
1295 _stream.BeginRead(buffer, offset, count, callback, state) :
1296 _stream.BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: true, apm: true);
// Completes a read started with BeginRead: rejects a null asyncResult with
// ArgumentNullException, then returns the wrapped stream's EndRead result.
1300 public override int EndRead(IAsyncResult asyncResult)
1302 if (asyncResult == null)
1303 throw new ArgumentNullException(nameof(asyncResult));
1306 return _stream.EndRead(asyncResult);
// Forwards to the wrapped stream's Seek with the same arguments and returns its result.
1309 public override long Seek(long offset, SeekOrigin origin)
1312 return _stream.Seek(offset, origin);
// Forwards to the wrapped stream's SetLength with the same length.
1315 public override void SetLength(long length)
1318 _stream.SetLength(length);
// Forwards the array-based write to the wrapped stream with the same arguments.
1321 public override void Write(byte[] bytes, int offset, int count)
1324 _stream.Write(bytes, offset, count);
// Forwards the span-based write to the wrapped stream.
1327 public override void Write(ReadOnlySpan<byte> source)
1330 _stream.Write(source);
// Forwards the single-byte write to the wrapped stream.
1333 public override void WriteByte(byte b)
1336 _stream.WriteByte(b);
// Starts an APM-style write on the wrapped stream and returns its IAsyncResult.
// Which entry point is used depends on whether the wrapped stream reports, via
// HasOverriddenBeginEndWrite(), that it overrides BeginWrite/EndWrite: if so its own
// BeginWrite is called; otherwise BeginWriteInternal is called with
// serializeAsynchronously: true and apm: true. The comment inside the method explains why.
1339 public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
1341 bool overridesBeginWrite = _stream.HasOverriddenBeginEndWrite();
1345 // If the Stream does have its own BeginWrite implementation, then we must use that override.
1346 // If it doesn't, then we'll use the base implementation, but we'll make sure that the logic
1347 // which ensures only one asynchronous operation does so with an asynchronous wait rather
1348 // than a synchronous wait. A synchronous wait will result in a deadlock condition, because
1349 // the EndXx method for the outstanding async operation won't be able to acquire the lock on
1350 // _stream due to this call blocked while holding the lock.
1351 return overridesBeginWrite ?
1352 _stream.BeginWrite(buffer, offset, count, callback, state) :
1353 _stream.BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: true, apm: true);
// Completes a write started with BeginWrite: rejects a null asyncResult with
// ArgumentNullException, then forwards to the wrapped stream's EndWrite.
1357 public override void EndWrite(IAsyncResult asyncResult)
1359 if (asyncResult == null)
1360 throw new ArgumentNullException(nameof(asyncResult));
1363 _stream.EndWrite(asyncResult);