d9ed08f737e01ebe5638b5445afac3de1235075e
[platform/upstream/coreclr.git] / src / mscorlib / src / System / IO / Stream.cs
1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
4
5 /*============================================================
6 **
7 ** 
8 ** 
9 **
10 **
11 ** Purpose: Abstract base class for all Streams.  Provides
12 ** default implementations of asynchronous reads & writes, in
13 ** terms of the synchronous reads & writes (and vice versa).
14 **
15 **
16 ===========================================================*/
17
18 using System;
19 using System.Buffers;
20 using System.Threading;
21 using System.Threading.Tasks;
22 using System.Runtime;
23 using System.Runtime.InteropServices;
24 using System.Runtime.CompilerServices;
25 using System.Runtime.ExceptionServices;
26 using System.Security;
27 using System.Diagnostics;
28 using System.Reflection;
29
30 namespace System.IO
31 {
32     public abstract class Stream : MarshalByRefObject, IDisposable
33     {
34         public static readonly Stream Null = new NullStream();
35
36         //We pick a value that is the largest multiple of 4096 that is still smaller than the large object heap threshold (85K).
37         // The CopyTo/CopyToAsync buffer is short-lived and is likely to be collected at Gen0, and it offers a significant
38         // improvement in Copy performance.
39         private const int _DefaultCopyBufferSize = 81920;
40
41         // To implement Async IO operations on streams that don't support async IO
42
43         private ReadWriteTask _activeReadWriteTask;
44         private SemaphoreSlim _asyncActiveSemaphore;
45
46         internal SemaphoreSlim EnsureAsyncActiveSemaphoreInitialized()
47         {
48             // Lazily-initialize _asyncActiveSemaphore.  As we're never accessing the SemaphoreSlim's
49             // WaitHandle, we don't need to worry about Disposing it.
50             return LazyInitializer.EnsureInitialized(ref _asyncActiveSemaphore, () => new SemaphoreSlim(1, 1));
51         }
52
53         public abstract bool CanRead
54         {
55             get;
56         }
57
58         // If CanSeek is false, Position, Seek, Length, and SetLength should throw.
59         public abstract bool CanSeek
60         {
61             get;
62         }
63
64         public virtual bool CanTimeout
65         {
66             get
67             {
68                 return false;
69             }
70         }
71
72         public abstract bool CanWrite
73         {
74             get;
75         }
76
77         public abstract long Length
78         {
79             get;
80         }
81
82         public abstract long Position
83         {
84             get;
85             set;
86         }
87
88         public virtual int ReadTimeout
89         {
90             get
91             {
92                 throw new InvalidOperationException(SR.InvalidOperation_TimeoutsNotSupported);
93             }
94             set
95             {
96                 throw new InvalidOperationException(SR.InvalidOperation_TimeoutsNotSupported);
97             }
98         }
99
100         public virtual int WriteTimeout
101         {
102             get
103             {
104                 throw new InvalidOperationException(SR.InvalidOperation_TimeoutsNotSupported);
105             }
106             set
107             {
108                 throw new InvalidOperationException(SR.InvalidOperation_TimeoutsNotSupported);
109             }
110         }
111
112         public Task CopyToAsync(Stream destination)
113         {
114             int bufferSize = GetCopyBufferSize();
115
116             return CopyToAsync(destination, bufferSize);
117         }
118
119         public Task CopyToAsync(Stream destination, Int32 bufferSize)
120         {
121             return CopyToAsync(destination, bufferSize, CancellationToken.None);
122         }
123
124         public Task CopyToAsync(Stream destination, CancellationToken cancellationToken)
125         {
126             int bufferSize = GetCopyBufferSize();
127
128             return CopyToAsync(destination, bufferSize, cancellationToken);
129         }
130
131         public virtual Task CopyToAsync(Stream destination, Int32 bufferSize, CancellationToken cancellationToken)
132         {
133             StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize);
134
135             return CopyToAsyncInternal(destination, bufferSize, cancellationToken);
136         }
137
138         private async Task CopyToAsyncInternal(Stream destination, Int32 bufferSize, CancellationToken cancellationToken)
139         {
140             Debug.Assert(destination != null);
141             Debug.Assert(bufferSize > 0);
142             Debug.Assert(CanRead);
143             Debug.Assert(destination.CanWrite);
144
145             byte[] buffer = ArrayPool<byte>.Shared.Rent(bufferSize);
146             bufferSize = 0; // reuse same field for high water mark to avoid needing another field in the state machine
147             try
148             {
149                 while (true)
150                 {
151                     int bytesRead = await ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
152                     if (bytesRead == 0) break;
153                     if (bytesRead > bufferSize) bufferSize = bytesRead;
154                     await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false);
155                 }
156             }
157             finally
158             {
159                 Array.Clear(buffer, 0, bufferSize); // clear only the most we used
160                 ArrayPool<byte>.Shared.Return(buffer, clearArray: false);
161             }
162         }
163
164         // Reads the bytes from the current stream and writes the bytes to
165         // the destination stream until all bytes are read, starting at
166         // the current position.
167         public void CopyTo(Stream destination)
168         {
169             int bufferSize = GetCopyBufferSize();
170
171             CopyTo(destination, bufferSize);
172         }
173
174         public virtual void CopyTo(Stream destination, int bufferSize)
175         {
176             StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize);
177
178             byte[] buffer = ArrayPool<byte>.Shared.Rent(bufferSize);
179             int highwaterMark = 0;
180             try
181             {
182                 int read;
183                 while ((read = Read(buffer, 0, buffer.Length)) != 0)
184                 {
185                     if (read > highwaterMark) highwaterMark = read;
186                     destination.Write(buffer, 0, read);
187                 }
188             }
189             finally
190             {
191                 Array.Clear(buffer, 0, highwaterMark); // clear only the most we used
192                 ArrayPool<byte>.Shared.Return(buffer, clearArray: false);
193             }
194         }
195
196         private int GetCopyBufferSize()
197         {
198             int bufferSize = _DefaultCopyBufferSize;
199
200             if (CanSeek)
201             {
202                 long length = Length;
203                 long position = Position;
204                 if (length <= position) // Handles negative overflows
205                 {
206                     // There are no bytes left in the stream to copy.
207                     // However, because CopyTo{Async} is virtual, we need to
208                     // ensure that any override is still invoked to provide its
209                     // own validation, so we use the smallest legal buffer size here.
210                     bufferSize = 1;
211                 }
212                 else
213                 {
214                     long remaining = length - position;
215                     if (remaining > 0)
216                     {
217                         // In the case of a positive overflow, stick to the default size
218                         bufferSize = (int)Math.Min(bufferSize, remaining);
219                     }
220                 }
221             }
222
223             return bufferSize;
224         }
225
226         // Stream used to require that all cleanup logic went into Close(),
227         // which was thought up before we invented IDisposable.  However, we
228         // need to follow the IDisposable pattern so that users can write 
229         // sensible subclasses without needing to inspect all their base 
230         // classes, and without worrying about version brittleness, from a
231         // base class switching to the Dispose pattern.  We're moving
232         // Stream to the Dispose(bool) pattern - that's where all subclasses 
233         // should put their cleanup starting in V2.
234         public virtual void Close()
235         {
236             // Ideally we would assert CanRead == CanWrite == CanSeek = false, 
237             // but we'd have to fix PipeStream & NetworkStream very carefully.
238
239             Dispose(true);
240             GC.SuppressFinalize(this);
241         }
242
243         public void Dispose()
244         {
245             // Ideally we would assert CanRead == CanWrite == CanSeek = false, 
246             // but we'd have to fix PipeStream & NetworkStream very carefully.
247
248             Close();
249         }
250
251
252         protected virtual void Dispose(bool disposing)
253         {
254             // Note: Never change this to call other virtual methods on Stream
255             // like Write, since the state on subclasses has already been 
256             // torn down.  This is the last code to run on cleanup for a stream.
257         }
258
259         public abstract void Flush();
260
261         public Task FlushAsync()
262         {
263             return FlushAsync(CancellationToken.None);
264         }
265
266         public virtual Task FlushAsync(CancellationToken cancellationToken)
267         {
268             return Task.Factory.StartNew(state => ((Stream)state).Flush(), this,
269                 cancellationToken, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
270         }
271
272         [Obsolete("CreateWaitHandle will be removed eventually.  Please use \"new ManualResetEvent(false)\" instead.")]
273         protected virtual WaitHandle CreateWaitHandle()
274         {
275             return new ManualResetEvent(false);
276         }
277
278         public virtual IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
279         {
280             return BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: false, apm: true);
281         }
282
283         internal IAsyncResult BeginReadInternal(
284             byte[] buffer, int offset, int count, AsyncCallback callback, Object state,
285             bool serializeAsynchronously, bool apm)
286         {
287             if (!CanRead) throw Error.GetReadNotSupported();
288
289             // To avoid a race with a stream's position pointer & generating race conditions 
290             // with internal buffer indexes in our own streams that 
291             // don't natively support async IO operations when there are multiple 
292             // async requests outstanding, we will block the application's main
293             // thread if it does a second IO request until the first one completes.
294             var semaphore = EnsureAsyncActiveSemaphoreInitialized();
295             Task semaphoreTask = null;
296             if (serializeAsynchronously)
297             {
298                 semaphoreTask = semaphore.WaitAsync();
299             }
300             else
301             {
302                 semaphore.Wait();
303             }
304
305             // Create the task to asynchronously do a Read.  This task serves both
306             // as the asynchronous work item and as the IAsyncResult returned to the user.
307             var asyncResult = new ReadWriteTask(true /*isRead*/, apm, delegate
308             {
309                 // The ReadWriteTask stores all of the parameters to pass to Read.
310                 // As we're currently inside of it, we can get the current task
311                 // and grab the parameters from it.
312                 var thisTask = Task.InternalCurrent as ReadWriteTask;
313                 Debug.Assert(thisTask != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");
314
315                 try
316                 {
317                     // Do the Read and return the number of bytes read
318                     return thisTask._stream.Read(thisTask._buffer, thisTask._offset, thisTask._count);
319                 }
320                 finally
321                 {
322                     // If this implementation is part of Begin/EndXx, then the EndXx method will handle
323                     // finishing the async operation.  However, if this is part of XxAsync, then there won't
324                     // be an end method, and this task is responsible for cleaning up.
325                     if (!thisTask._apm)
326                     {
327                         thisTask._stream.FinishTrackingAsyncOperation();
328                     }
329
330                     thisTask.ClearBeginState(); // just to help alleviate some memory pressure
331                 }
332             }, state, this, buffer, offset, count, callback);
333
334             // Schedule it
335             if (semaphoreTask != null)
336                 RunReadWriteTaskWhenReady(semaphoreTask, asyncResult);
337             else
338                 RunReadWriteTask(asyncResult);
339
340
341             return asyncResult; // return it
342         }
343
344         public virtual int EndRead(IAsyncResult asyncResult)
345         {
346             if (asyncResult == null)
347                 throw new ArgumentNullException(nameof(asyncResult));
348
349             var readTask = _activeReadWriteTask;
350
351             if (readTask == null)
352             {
353                 throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple);
354             }
355             else if (readTask != asyncResult)
356             {
357                 throw new InvalidOperationException(SR.InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple);
358             }
359             else if (!readTask._isRead)
360             {
361                 throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple);
362             }
363
364             try
365             {
366                 return readTask.GetAwaiter().GetResult(); // block until completion, then get result / propagate any exception
367             }
368             finally
369             {
370                 FinishTrackingAsyncOperation();
371             }
372         }
373
374         public Task<int> ReadAsync(Byte[] buffer, int offset, int count)
375         {
376             return ReadAsync(buffer, offset, count, CancellationToken.None);
377         }
378
379         public virtual Task<int> ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
380         {
381             // If cancellation was requested, bail early with an already completed task.
382             // Otherwise, return a task that represents the Begin/End methods.
383             return cancellationToken.IsCancellationRequested
384                         ? Task.FromCanceled<int>(cancellationToken)
385                         : BeginEndReadAsync(buffer, offset, count);
386         }
387
388         public virtual ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default(CancellationToken))
389         {
390             if (destination.TryGetArray(out ArraySegment<byte> array))
391             {
392                 return new ValueTask<int>(ReadAsync(array.Array, array.Offset, array.Count, cancellationToken));
393             }
394             else
395             {
396                 byte[] buffer = ArrayPool<byte>.Shared.Rent(destination.Length);
397                 return FinishReadAsync(ReadAsync(buffer, 0, destination.Length, cancellationToken), buffer, destination);
398
399                 async ValueTask<int> FinishReadAsync(Task<int> readTask, byte[] localBuffer, Memory<byte> localDestination)
400                 {
401                     try
402                     {
403                         int result = await readTask.ConfigureAwait(false);
404                         new Span<byte>(localBuffer, 0, result).CopyTo(localDestination.Span);
405                         return result;
406                     }
407                     finally
408                     {
409                         ArrayPool<byte>.Shared.Return(localBuffer);
410                     }
411                 }
412             }
413         }
414
415         [MethodImplAttribute(MethodImplOptions.InternalCall)]
416         private extern bool HasOverriddenBeginEndRead();
417
418         private Task<Int32> BeginEndReadAsync(Byte[] buffer, Int32 offset, Int32 count)
419         {
420             if (!HasOverriddenBeginEndRead())
421             {
422                 // If the Stream does not override Begin/EndRead, then we can take an optimized path
423                 // that skips an extra layer of tasks / IAsyncResults.
424                 return (Task<Int32>)BeginReadInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
425             }
426
427             // Otherwise, we need to wrap calls to Begin/EndWrite to ensure we use the derived type's functionality.
428             return TaskFactory<Int32>.FromAsyncTrim(
429                         this, new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count },
430                         (stream, args, callback, state) => stream.BeginRead(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
431                         (stream, asyncResult) => stream.EndRead(asyncResult)); // cached by compiler
432         }
433
434         private struct ReadWriteParameters // struct for arguments to Read and Write calls
435         {
436             internal byte[] Buffer;
437             internal int Offset;
438             internal int Count;
439         }
440
441
442
443         public virtual IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
444         {
445             return BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: false, apm: true);
446         }
447
448         internal IAsyncResult BeginWriteInternal(
449             byte[] buffer, int offset, int count, AsyncCallback callback, Object state,
450             bool serializeAsynchronously, bool apm)
451         {
452             if (!CanWrite) throw Error.GetWriteNotSupported();
453
454             // To avoid a race condition with a stream's position pointer & generating conditions 
455             // with internal buffer indexes in our own streams that 
456             // don't natively support async IO operations when there are multiple 
457             // async requests outstanding, we will block the application's main
458             // thread if it does a second IO request until the first one completes.
459             var semaphore = EnsureAsyncActiveSemaphoreInitialized();
460             Task semaphoreTask = null;
461             if (serializeAsynchronously)
462             {
463                 semaphoreTask = semaphore.WaitAsync(); // kick off the asynchronous wait, but don't block
464             }
465             else
466             {
467                 semaphore.Wait(); // synchronously wait here
468             }
469
470             // Create the task to asynchronously do a Write.  This task serves both
471             // as the asynchronous work item and as the IAsyncResult returned to the user.
472             var asyncResult = new ReadWriteTask(false /*isRead*/, apm, delegate
473             {
474                 // The ReadWriteTask stores all of the parameters to pass to Write.
475                 // As we're currently inside of it, we can get the current task
476                 // and grab the parameters from it.
477                 var thisTask = Task.InternalCurrent as ReadWriteTask;
478                 Debug.Assert(thisTask != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");
479
480                 try
481                 {
482                     // Do the Write
483                     thisTask._stream.Write(thisTask._buffer, thisTask._offset, thisTask._count);
484                     return 0; // not used, but signature requires a value be returned
485                 }
486                 finally
487                 {
488                     // If this implementation is part of Begin/EndXx, then the EndXx method will handle
489                     // finishing the async operation.  However, if this is part of XxAsync, then there won't
490                     // be an end method, and this task is responsible for cleaning up.
491                     if (!thisTask._apm)
492                     {
493                         thisTask._stream.FinishTrackingAsyncOperation();
494                     }
495
496                     thisTask.ClearBeginState(); // just to help alleviate some memory pressure
497                 }
498             }, state, this, buffer, offset, count, callback);
499
500             // Schedule it
501             if (semaphoreTask != null)
502                 RunReadWriteTaskWhenReady(semaphoreTask, asyncResult);
503             else
504                 RunReadWriteTask(asyncResult);
505
506             return asyncResult; // return it
507         }
508
509         private void RunReadWriteTaskWhenReady(Task asyncWaiter, ReadWriteTask readWriteTask)
510         {
511             Debug.Assert(readWriteTask != null);
512             Debug.Assert(asyncWaiter != null);
513
514             // If the wait has already completed, run the task.
515             if (asyncWaiter.IsCompleted)
516             {
517                 Debug.Assert(asyncWaiter.IsCompletedSuccessfully, "The semaphore wait should always complete successfully.");
518                 RunReadWriteTask(readWriteTask);
519             }
520             else  // Otherwise, wait for our turn, and then run the task.
521             {
522                 asyncWaiter.ContinueWith((t, state) =>
523                 {
524                     Debug.Assert(t.IsCompletedSuccessfully, "The semaphore wait should always complete successfully.");
525                     var rwt = (ReadWriteTask)state;
526                     rwt._stream.RunReadWriteTask(rwt); // RunReadWriteTask(readWriteTask);
527                 }, readWriteTask, default(CancellationToken), TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
528             }
529         }
530
531         private void RunReadWriteTask(ReadWriteTask readWriteTask)
532         {
533             Debug.Assert(readWriteTask != null);
534             Debug.Assert(_activeReadWriteTask == null, "Expected no other readers or writers");
535
536             // Schedule the task.  ScheduleAndStart must happen after the write to _activeReadWriteTask to avoid a race.
537             // Internally, we're able to directly call ScheduleAndStart rather than Start, avoiding
538             // two interlocked operations.  However, if ReadWriteTask is ever changed to use
539             // a cancellation token, this should be changed to use Start.
540             _activeReadWriteTask = readWriteTask; // store the task so that EndXx can validate it's given the right one
541             readWriteTask.m_taskScheduler = TaskScheduler.Default;
542             readWriteTask.ScheduleAndStart(needsProtection: false);
543         }
544
545         private void FinishTrackingAsyncOperation()
546         {
547             _activeReadWriteTask = null;
548             Debug.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
549             _asyncActiveSemaphore.Release();
550         }
551
552         public virtual void EndWrite(IAsyncResult asyncResult)
553         {
554             if (asyncResult == null)
555                 throw new ArgumentNullException(nameof(asyncResult));
556
557             var writeTask = _activeReadWriteTask;
558             if (writeTask == null)
559             {
560                 throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple);
561             }
562             else if (writeTask != asyncResult)
563             {
564                 throw new InvalidOperationException(SR.InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple);
565             }
566             else if (writeTask._isRead)
567             {
568                 throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple);
569             }
570
571             try
572             {
573                 writeTask.GetAwaiter().GetResult(); // block until completion, then propagate any exceptions
574                 Debug.Assert(writeTask.Status == TaskStatus.RanToCompletion);
575             }
576             finally
577             {
578                 FinishTrackingAsyncOperation();
579             }
580         }
581
582         // Task used by BeginRead / BeginWrite to do Read / Write asynchronously.
583         // A single instance of this task serves four purposes:
584         // 1. The work item scheduled to run the Read / Write operation
585         // 2. The state holding the arguments to be passed to Read / Write
586         // 3. The IAsyncResult returned from BeginRead / BeginWrite
587         // 4. The completion action that runs to invoke the user-provided callback.
588         // This last item is a bit tricky.  Before the AsyncCallback is invoked, the
589         // IAsyncResult must have completed, so we can't just invoke the handler
590         // from within the task, since it is the IAsyncResult, and thus it's not
591         // yet completed.  Instead, we use AddCompletionAction to install this
592         // task as its own completion handler.  That saves the need to allocate
593         // a separate completion handler, it guarantees that the task will
594         // have completed by the time the handler is invoked, and it allows
595         // the handler to be invoked synchronously upon the completion of the
596         // task.  This all enables BeginRead / BeginWrite to be implemented
597         // with a single allocation.
598         private sealed class ReadWriteTask : Task<int>, ITaskCompletionAction
599         {
600             internal readonly bool _isRead;
601             internal readonly bool _apm; // true if this is from Begin/EndXx; false if it's from XxAsync
602             internal Stream _stream;
603             internal byte[] _buffer;
604             internal readonly int _offset;
605             internal readonly int _count;
606             private AsyncCallback _callback;
607             private ExecutionContext _context;
608
609             internal void ClearBeginState() // Used to allow the args to Read/Write to be made available for GC
610             {
611                 _stream = null;
612                 _buffer = null;
613             }
614
615             public ReadWriteTask(
616                 bool isRead,
617                 bool apm,
618                 Func<object, int> function, object state,
619                 Stream stream, byte[] buffer, int offset, int count, AsyncCallback callback) :
620                 base(function, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach)
621             {
622                 Debug.Assert(function != null);
623                 Debug.Assert(stream != null);
624                 Debug.Assert(buffer != null);
625
626                 // Store the arguments
627                 _isRead = isRead;
628                 _apm = apm;
629                 _stream = stream;
630                 _buffer = buffer;
631                 _offset = offset;
632                 _count = count;
633
634                 // If a callback was provided, we need to:
635                 // - Store the user-provided handler
636                 // - Capture an ExecutionContext under which to invoke the handler
637                 // - Add this task as its own completion handler so that the Invoke method
638                 //   will run the callback when this task completes.
639                 if (callback != null)
640                 {
641                     _callback = callback;
642                     _context = ExecutionContext.Capture();
643                     base.AddCompletionAction(this);
644                 }
645             }
646
647             private static void InvokeAsyncCallback(object completedTask)
648             {
649                 var rwc = (ReadWriteTask)completedTask;
650                 var callback = rwc._callback;
651                 rwc._callback = null;
652                 callback(rwc);
653             }
654
655             private static ContextCallback s_invokeAsyncCallback;
656
657             void ITaskCompletionAction.Invoke(Task completingTask)
658             {
659                 // Get the ExecutionContext.  If there is none, just run the callback
660                 // directly, passing in the completed task as the IAsyncResult.
661                 // If there is one, process it with ExecutionContext.Run.
662                 var context = _context;
663                 if (context == null)
664                 {
665                     var callback = _callback;
666                     _callback = null;
667                     callback(completingTask);
668                 }
669                 else
670                 {
671                     _context = null;
672
673                     var invokeAsyncCallback = s_invokeAsyncCallback;
674                     if (invokeAsyncCallback == null) s_invokeAsyncCallback = invokeAsyncCallback = InvokeAsyncCallback; // benign race condition
675
676                     ExecutionContext.Run(context, invokeAsyncCallback, this);
677                 }
678             }
679
680             bool ITaskCompletionAction.InvokeMayRunArbitraryCode { get { return true; } }
681         }
682
683         public Task WriteAsync(Byte[] buffer, int offset, int count)
684         {
685             return WriteAsync(buffer, offset, count, CancellationToken.None);
686         }
687
688         public virtual Task WriteAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
689         {
690             // If cancellation was requested, bail early with an already completed task.
691             // Otherwise, return a task that represents the Begin/End methods.
692             return cancellationToken.IsCancellationRequested
693                         ? Task.FromCanceled(cancellationToken)
694                         : BeginEndWriteAsync(buffer, offset, count);
695         }
696
697         public virtual Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
698         {
699             if (MemoryMarshal.TryGetArray(source, out ArraySegment<byte> array))
700             {
701                 return WriteAsync(array.Array, array.Offset, array.Count, cancellationToken);
702             }
703             else
704             {
705                 byte[] buffer = ArrayPool<byte>.Shared.Rent(source.Length);
706                 source.Span.CopyTo(buffer);
707                 return FinishWriteAsync(WriteAsync(buffer, 0, source.Length, cancellationToken), buffer);
708
709                 async Task FinishWriteAsync(Task writeTask, byte[] localBuffer)
710                 {
711                     try
712                     {
713                         await writeTask.ConfigureAwait(false);
714                     }
715                     finally
716                     {
717                         ArrayPool<byte>.Shared.Return(localBuffer);
718                     }
719                 }
720             }
721         }
722
723         [MethodImplAttribute(MethodImplOptions.InternalCall)]
724         private extern bool HasOverriddenBeginEndWrite();
725
726         private Task BeginEndWriteAsync(Byte[] buffer, Int32 offset, Int32 count)
727         {
728             if (!HasOverriddenBeginEndWrite())
729             {
730                 // If the Stream does not override Begin/EndWrite, then we can take an optimized path
731                 // that skips an extra layer of tasks / IAsyncResults.
732                 return (Task)BeginWriteInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
733             }
734
735             // Otherwise, we need to wrap calls to Begin/EndWrite to ensure we use the derived type's functionality.
736             return TaskFactory<VoidTaskResult>.FromAsyncTrim(
737                         this, new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count },
738                         (stream, args, callback, state) => stream.BeginWrite(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
739                         (stream, asyncResult) => // cached by compiler
740                         {
741                             stream.EndWrite(asyncResult);
742                             return default(VoidTaskResult);
743                         });
744         }
745
746         public abstract long Seek(long offset, SeekOrigin origin);
747
748         public abstract void SetLength(long value);
749
750         public abstract int Read(byte[] buffer, int offset, int count);
751
752         public virtual int Read(Span<byte> destination)
753         {
754             byte[] buffer = ArrayPool<byte>.Shared.Rent(destination.Length);
755             try
756             {
757                 int numRead = Read(buffer, 0, destination.Length);
758                 if ((uint)numRead > destination.Length)
759                 {
760                     throw new IOException(SR.IO_StreamTooLong);
761                 }
762                 new Span<byte>(buffer, 0, numRead).CopyTo(destination);
763                 return numRead;
764             }
765             finally { ArrayPool<byte>.Shared.Return(buffer); }
766         }
767
768         // Reads one byte from the stream by calling Read(byte[], int, int). 
769         // Will return an unsigned byte cast to an int or -1 on end of stream.
770         // This implementation does not perform well because it allocates a new
771         // byte[] each time you call it, and should be overridden by any 
772         // subclass that maintains an internal buffer.  Then, it can help perf
773         // significantly for people who are reading one byte at a time.
774         public virtual int ReadByte()
775         {
776             byte[] oneByteArray = new byte[1];
777             int r = Read(oneByteArray, 0, 1);
778             if (r == 0)
779                 return -1;
780             return oneByteArray[0];
781         }
782
783         public abstract void Write(byte[] buffer, int offset, int count);
784
785         public virtual void Write(ReadOnlySpan<byte> source)
786         {
787             byte[] buffer = ArrayPool<byte>.Shared.Rent(source.Length);
788             try
789             {
790                 source.CopyTo(buffer);
791                 Write(buffer, 0, source.Length);
792             }
793             finally { ArrayPool<byte>.Shared.Return(buffer); }
794         }
795
796         // Writes one byte from the stream by calling Write(byte[], int, int).
797         // This implementation does not perform well because it allocates a new
798         // byte[] each time you call it, and should be overridden by any 
799         // subclass that maintains an internal buffer.  Then, it can help perf
800         // significantly for people who are writing one byte at a time.
801         public virtual void WriteByte(byte value)
802         {
803             byte[] oneByteArray = new byte[1];
804             oneByteArray[0] = value;
805             Write(oneByteArray, 0, 1);
806         }
807
808         public static Stream Synchronized(Stream stream)
809         {
810             if (stream == null)
811                 throw new ArgumentNullException(nameof(stream));
812             if (stream is SyncStream)
813                 return stream;
814
815             return new SyncStream(stream);
816         }
817
818         [Obsolete("Do not call or override this method.")]
819         protected virtual void ObjectInvariant()
820         {
821         }
822
823         internal IAsyncResult BlockingBeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
824         {
825             // To avoid a race with a stream's position pointer & generating conditions
826             // with internal buffer indexes in our own streams that 
827             // don't natively support async IO operations when there are multiple 
828             // async requests outstanding, we will block the application's main
829             // thread and do the IO synchronously.  
830             // This can't perform well - use a different approach.
831             SynchronousAsyncResult asyncResult;
832             try
833             {
834                 int numRead = Read(buffer, offset, count);
835                 asyncResult = new SynchronousAsyncResult(numRead, state);
836             }
837             catch (IOException ex)
838             {
839                 asyncResult = new SynchronousAsyncResult(ex, state, isWrite: false);
840             }
841
842             if (callback != null)
843             {
844                 callback(asyncResult);
845             }
846
847             return asyncResult;
848         }
849
850         internal static int BlockingEndRead(IAsyncResult asyncResult)
851         {
852             return SynchronousAsyncResult.EndRead(asyncResult);
853         }
854
855         internal IAsyncResult BlockingBeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
856         {
857             // To avoid a race condition with a stream's position pointer & generating conditions 
858             // with internal buffer indexes in our own streams that 
859             // don't natively support async IO operations when there are multiple 
860             // async requests outstanding, we will block the application's main
861             // thread and do the IO synchronously.  
862             // This can't perform well - use a different approach.
863             SynchronousAsyncResult asyncResult;
864             try
865             {
866                 Write(buffer, offset, count);
867                 asyncResult = new SynchronousAsyncResult(state);
868             }
869             catch (IOException ex)
870             {
871                 asyncResult = new SynchronousAsyncResult(ex, state, isWrite: true);
872             }
873
874             if (callback != null)
875             {
876                 callback(asyncResult);
877             }
878
879             return asyncResult;
880         }
881
882         internal static void BlockingEndWrite(IAsyncResult asyncResult)
883         {
884             SynchronousAsyncResult.EndWrite(asyncResult);
885         }
886
887         private sealed class NullStream : Stream
888         {
889             internal NullStream() { }
890
891             public override bool CanRead
892             {
893                 get { return true; }
894             }
895
896             public override bool CanWrite
897             {
898                 get { return true; }
899             }
900
901             public override bool CanSeek
902             {
903                 get { return true; }
904             }
905
906             public override long Length
907             {
908                 get { return 0; }
909             }
910
911             public override long Position
912             {
913                 get { return 0; }
914                 set { }
915             }
916
917             public override void CopyTo(Stream destination, int bufferSize)
918             {
919                 StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize);
920
921                 // After we validate arguments this is a nop.
922             }
923
924             public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
925             {
926                 // Validate arguments here for compat, since previously this method
927                 // was inherited from Stream (which did check its arguments).
928                 StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize);
929
930                 return cancellationToken.IsCancellationRequested ?
931                     Task.FromCanceled(cancellationToken) :
932                     Task.CompletedTask;
933             }
934
935             protected override void Dispose(bool disposing)
936             {
937                 // Do nothing - we don't want NullStream singleton (static) to be closable
938             }
939
940             public override void Flush()
941             {
942             }
943
944             public override Task FlushAsync(CancellationToken cancellationToken)
945             {
946                 return cancellationToken.IsCancellationRequested ?
947                     Task.FromCanceled(cancellationToken) :
948                     Task.CompletedTask;
949             }
950
951             public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
952             {
953                 if (!CanRead) throw Error.GetReadNotSupported();
954
955                 return BlockingBeginRead(buffer, offset, count, callback, state);
956             }
957
958             public override int EndRead(IAsyncResult asyncResult)
959             {
960                 if (asyncResult == null)
961                     throw new ArgumentNullException(nameof(asyncResult));
962
963                 return BlockingEndRead(asyncResult);
964             }
965
966             public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
967             {
968                 if (!CanWrite) throw Error.GetWriteNotSupported();
969
970                 return BlockingBeginWrite(buffer, offset, count, callback, state);
971             }
972
973             public override void EndWrite(IAsyncResult asyncResult)
974             {
975                 if (asyncResult == null)
976                     throw new ArgumentNullException(nameof(asyncResult));
977
978                 BlockingEndWrite(asyncResult);
979             }
980
981             public override int Read(byte[] buffer, int offset, int count)
982             {
983                 return 0;
984             }
985
986             public override int Read(Span<byte> destination)
987             {
988                 return 0;
989             }
990
991             public override Task<int> ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
992             {
993                 return AsyncTaskMethodBuilder<int>.s_defaultResultTask;
994             }
995
996             public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default(CancellationToken))
997             {
998                 return new ValueTask<int>(0);
999             }
1000
1001             public override int ReadByte()
1002             {
1003                 return -1;
1004             }
1005
1006             public override void Write(byte[] buffer, int offset, int count)
1007             {
1008             }
1009
1010             public override void Write(ReadOnlySpan<byte> source)
1011             {
1012             }
1013
1014             public override Task WriteAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
1015             {
1016                 return cancellationToken.IsCancellationRequested ?
1017                     Task.FromCanceled(cancellationToken) :
1018                     Task.CompletedTask;
1019             }
1020
1021             public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
1022             {
1023                 return cancellationToken.IsCancellationRequested ?
1024                     Task.FromCanceled(cancellationToken) :
1025                     Task.CompletedTask;
1026             }
1027
1028             public override void WriteByte(byte value)
1029             {
1030             }
1031
1032             public override long Seek(long offset, SeekOrigin origin)
1033             {
1034                 return 0;
1035             }
1036
1037             public override void SetLength(long length)
1038             {
1039             }
1040         }
1041
1042
1043         /// <summary>Used as the IAsyncResult object when using asynchronous IO methods on the base Stream class.</summary>
1044         internal sealed class SynchronousAsyncResult : IAsyncResult
1045         {
1046             private readonly Object _stateObject;
1047             private readonly bool _isWrite;
1048             private ManualResetEvent _waitHandle;
1049             private ExceptionDispatchInfo _exceptionInfo;
1050
1051             private bool _endXxxCalled;
1052             private Int32 _bytesRead;
1053
1054             internal SynchronousAsyncResult(Int32 bytesRead, Object asyncStateObject)
1055             {
1056                 _bytesRead = bytesRead;
1057                 _stateObject = asyncStateObject;
1058                 //_isWrite = false;
1059             }
1060
1061             internal SynchronousAsyncResult(Object asyncStateObject)
1062             {
1063                 _stateObject = asyncStateObject;
1064                 _isWrite = true;
1065             }
1066
1067             internal SynchronousAsyncResult(Exception ex, Object asyncStateObject, bool isWrite)
1068             {
1069                 _exceptionInfo = ExceptionDispatchInfo.Capture(ex);
1070                 _stateObject = asyncStateObject;
1071                 _isWrite = isWrite;
1072             }
1073
1074             public bool IsCompleted
1075             {
1076                 // We never hand out objects of this type to the user before the synchronous IO completed:
1077                 get { return true; }
1078             }
1079
1080             public WaitHandle AsyncWaitHandle
1081             {
1082                 get
1083                 {
1084                     return LazyInitializer.EnsureInitialized(ref _waitHandle, () => new ManualResetEvent(true));
1085                 }
1086             }
1087
1088             public Object AsyncState
1089             {
1090                 get { return _stateObject; }
1091             }
1092
1093             public bool CompletedSynchronously
1094             {
1095                 get { return true; }
1096             }
1097
1098             internal void ThrowIfError()
1099             {
1100                 if (_exceptionInfo != null)
1101                     _exceptionInfo.Throw();
1102             }
1103
1104             internal static Int32 EndRead(IAsyncResult asyncResult)
1105             {
1106                 SynchronousAsyncResult ar = asyncResult as SynchronousAsyncResult;
1107                 if (ar == null || ar._isWrite)
1108                     throw new ArgumentException(SR.Arg_WrongAsyncResult);
1109
1110                 if (ar._endXxxCalled)
1111                     throw new ArgumentException(SR.InvalidOperation_EndReadCalledMultiple);
1112
1113                 ar._endXxxCalled = true;
1114
1115                 ar.ThrowIfError();
1116                 return ar._bytesRead;
1117             }
1118
1119             internal static void EndWrite(IAsyncResult asyncResult)
1120             {
1121                 SynchronousAsyncResult ar = asyncResult as SynchronousAsyncResult;
1122                 if (ar == null || !ar._isWrite)
1123                     throw new ArgumentException(SR.Arg_WrongAsyncResult);
1124
1125                 if (ar._endXxxCalled)
1126                     throw new ArgumentException(SR.InvalidOperation_EndWriteCalledMultiple);
1127
1128                 ar._endXxxCalled = true;
1129
1130                 ar.ThrowIfError();
1131             }
1132         }   // class SynchronousAsyncResult
1133
1134
1135         // SyncStream is a wrapper around a stream that takes 
1136         // a lock for every operation making it thread safe.
1137         internal sealed class SyncStream : Stream, IDisposable
1138         {
1139             private Stream _stream;
1140
1141             internal SyncStream(Stream stream)
1142             {
1143                 if (stream == null)
1144                     throw new ArgumentNullException(nameof(stream));
1145                 _stream = stream;
1146             }
1147
1148             public override bool CanRead
1149             {
1150                 get { return _stream.CanRead; }
1151             }
1152
1153             public override bool CanWrite
1154             {
1155                 get { return _stream.CanWrite; }
1156             }
1157
1158             public override bool CanSeek
1159             {
1160                 get { return _stream.CanSeek; }
1161             }
1162
1163             public override bool CanTimeout
1164             {
1165                 get
1166                 {
1167                     return _stream.CanTimeout;
1168                 }
1169             }
1170
1171             public override long Length
1172             {
1173                 get
1174                 {
1175                     lock (_stream)
1176                     {
1177                         return _stream.Length;
1178                     }
1179                 }
1180             }
1181
1182             public override long Position
1183             {
1184                 get
1185                 {
1186                     lock (_stream)
1187                     {
1188                         return _stream.Position;
1189                     }
1190                 }
1191                 set
1192                 {
1193                     lock (_stream)
1194                     {
1195                         _stream.Position = value;
1196                     }
1197                 }
1198             }
1199
1200             public override int ReadTimeout
1201             {
1202                 get
1203                 {
1204                     return _stream.ReadTimeout;
1205                 }
1206                 set
1207                 {
1208                     _stream.ReadTimeout = value;
1209                 }
1210             }
1211
1212             public override int WriteTimeout
1213             {
1214                 get
1215                 {
1216                     return _stream.WriteTimeout;
1217                 }
1218                 set
1219                 {
1220                     _stream.WriteTimeout = value;
1221                 }
1222             }
1223
1224             // In the off chance that some wrapped stream has different 
1225             // semantics for Close vs. Dispose, let's preserve that.
1226             public override void Close()
1227             {
1228                 lock (_stream)
1229                 {
1230                     try
1231                     {
1232                         _stream.Close();
1233                     }
1234                     finally
1235                     {
1236                         base.Dispose(true);
1237                     }
1238                 }
1239             }
1240
1241             protected override void Dispose(bool disposing)
1242             {
1243                 lock (_stream)
1244                 {
1245                     try
1246                     {
1247                         // Explicitly pick up a potentially methodimpl'ed Dispose
1248                         if (disposing)
1249                             ((IDisposable)_stream).Dispose();
1250                     }
1251                     finally
1252                     {
1253                         base.Dispose(disposing);
1254                     }
1255                 }
1256             }
1257
1258             public override void Flush()
1259             {
1260                 lock (_stream)
1261                     _stream.Flush();
1262             }
1263
1264             public override int Read(byte[] bytes, int offset, int count)
1265             {
1266                 lock (_stream)
1267                     return _stream.Read(bytes, offset, count);
1268             }
1269
1270             public override int Read(Span<byte> destination)
1271             {
1272                 lock (_stream)
1273                     return _stream.Read(destination);
1274             }
1275
1276             public override int ReadByte()
1277             {
1278                 lock (_stream)
1279                     return _stream.ReadByte();
1280             }
1281
1282             public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
1283             {
1284                 bool overridesBeginRead = _stream.HasOverriddenBeginEndRead();
1285
1286                 lock (_stream)
1287                 {
1288                     // If the Stream does have its own BeginRead implementation, then we must use that override.
1289                     // If it doesn't, then we'll use the base implementation, but we'll make sure that the logic
1290                     // which ensures only one asynchronous operation does so with an asynchronous wait rather
1291                     // than a synchronous wait.  A synchronous wait will result in a deadlock condition, because
1292                     // the EndXx method for the outstanding async operation won't be able to acquire the lock on
1293                     // _stream due to this call blocked while holding the lock.
1294                     return overridesBeginRead ?
1295                         _stream.BeginRead(buffer, offset, count, callback, state) :
1296                         _stream.BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: true, apm: true);
1297                 }
1298             }
1299
1300             public override int EndRead(IAsyncResult asyncResult)
1301             {
1302                 if (asyncResult == null)
1303                     throw new ArgumentNullException(nameof(asyncResult));
1304
1305                 lock (_stream)
1306                     return _stream.EndRead(asyncResult);
1307             }
1308
1309             public override long Seek(long offset, SeekOrigin origin)
1310             {
1311                 lock (_stream)
1312                     return _stream.Seek(offset, origin);
1313             }
1314
1315             public override void SetLength(long length)
1316             {
1317                 lock (_stream)
1318                     _stream.SetLength(length);
1319             }
1320
1321             public override void Write(byte[] bytes, int offset, int count)
1322             {
1323                 lock (_stream)
1324                     _stream.Write(bytes, offset, count);
1325             }
1326
1327             public override void Write(ReadOnlySpan<byte> source)
1328             {
1329                 lock (_stream)
1330                     _stream.Write(source);
1331             }
1332
1333             public override void WriteByte(byte b)
1334             {
1335                 lock (_stream)
1336                     _stream.WriteByte(b);
1337             }
1338
1339             public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
1340             {
1341                 bool overridesBeginWrite = _stream.HasOverriddenBeginEndWrite();
1342
1343                 lock (_stream)
1344                 {
1345                     // If the Stream does have its own BeginWrite implementation, then we must use that override.
1346                     // If it doesn't, then we'll use the base implementation, but we'll make sure that the logic
1347                     // which ensures only one asynchronous operation does so with an asynchronous wait rather
1348                     // than a synchronous wait.  A synchronous wait will result in a deadlock condition, because
1349                     // the EndXx method for the outstanding async operation won't be able to acquire the lock on
1350                     // _stream due to this call blocked while holding the lock.
1351                     return overridesBeginWrite ?
1352                         _stream.BeginWrite(buffer, offset, count, callback, state) :
1353                         _stream.BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: true, apm: true);
1354                 }
1355             }
1356
1357             public override void EndWrite(IAsyncResult asyncResult)
1358             {
1359                 if (asyncResult == null)
1360                     throw new ArgumentNullException(nameof(asyncResult));
1361
1362                 lock (_stream)
1363                     _stream.EndWrite(asyncResult);
1364             }
1365         }
1366     }
1367 }