return new ManualResetEvent(false);
}
- [HostProtection(ExternalThreading=true)]
+ internal virtual bool OverridesBeginEnd
+ {
+ get
+ {
+#if FEATURE_CORECLR
+ return false; // methods aren't exposed outside of mscorlib
+#else
+ return true; // have to assume they are overridden as they're exposed
+#endif
+ }
+ }
+
+ [HostProtection(ExternalThreading=true)]
public virtual IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
{
Contract.Ensures(Contract.Result<IAsyncResult>() != null);
- return BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: false);
+ return BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: false, apm: true);
}
[HostProtection(ExternalThreading = true)]
- internal IAsyncResult BeginReadInternal(byte[] buffer, int offset, int count, AsyncCallback callback, Object state, bool serializeAsynchronously)
+ internal IAsyncResult BeginReadInternal(
+ byte[] buffer, int offset, int count, AsyncCallback callback, Object state,
+ bool serializeAsynchronously, bool apm)
{
Contract.Ensures(Contract.Result<IAsyncResult>() != null);
if (!CanRead) __Error.ReadNotSupported();
// Create the task to asynchronously do a Read. This task serves both
// as the asynchronous work item and as the IAsyncResult returned to the user.
- var asyncResult = new ReadWriteTask(true /*isRead*/, delegate
+ var asyncResult = new ReadWriteTask(true /*isRead*/, apm, delegate
{
// The ReadWriteTask stores all of the parameters to pass to Read.
// As we're currently inside of it, we can get the current task
var thisTask = Task.InternalCurrent as ReadWriteTask;
Contract.Assert(thisTask != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");
- // Do the Read and return the number of bytes read
- var bytesRead = thisTask._stream.Read(thisTask._buffer, thisTask._offset, thisTask._count);
- thisTask.ClearBeginState(); // just to help alleviate some memory pressure
- return bytesRead;
+ try
+ {
+ // Do the Read and return the number of bytes read
+ return thisTask._stream.Read(thisTask._buffer, thisTask._offset, thisTask._count);
+ }
+ finally
+ {
+ // If this implementation is part of Begin/EndXx, then the EndXx method will handle
+ // finishing the async operation. However, if this is part of XxAsync, then there won't
+ // be an end method, and this task is responsible for cleaning up.
+ if (!thisTask._apm)
+ {
+ thisTask._stream.FinishTrackingAsyncOperation();
+ }
+
+ thisTask.ClearBeginState(); // just to help alleviate some memory pressure
+ }
}, state, this, buffer, offset, count, callback);
// Schedule it
}
finally
{
- _activeReadWriteTask = null;
- Contract.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
- _asyncActiveSemaphore.Release();
+ FinishTrackingAsyncOperation();
}
#endif
}
}
private Task<Int32> BeginEndReadAsync(Byte[] buffer, Int32 offset, Int32 count)
- {
+ {
+ if (!OverridesBeginEnd)
+ {
+ return (Task<Int32>)BeginReadInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
+ }
+
return TaskFactory<Int32>.FromAsyncTrim(
this, new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count },
(stream, args, callback, state) => stream.BeginRead(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
public virtual IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
{
Contract.Ensures(Contract.Result<IAsyncResult>() != null);
- return BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: false);
+ return BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: false, apm: true);
}
[HostProtection(ExternalThreading = true)]
- internal IAsyncResult BeginWriteInternal(byte[] buffer, int offset, int count, AsyncCallback callback, Object state, bool serializeAsynchronously)
+ internal IAsyncResult BeginWriteInternal(
+ byte[] buffer, int offset, int count, AsyncCallback callback, Object state,
+ bool serializeAsynchronously, bool apm)
{
Contract.Ensures(Contract.Result<IAsyncResult>() != null);
if (!CanWrite) __Error.WriteNotSupported();
// Create the task to asynchronously do a Write. This task serves both
// as the asynchronous work item and as the IAsyncResult returned to the user.
- var asyncResult = new ReadWriteTask(false /*isRead*/, delegate
+ var asyncResult = new ReadWriteTask(false /*isRead*/, apm, delegate
{
// The ReadWriteTask stores all of the parameters to pass to Write.
// As we're currently inside of it, we can get the current task
var thisTask = Task.InternalCurrent as ReadWriteTask;
Contract.Assert(thisTask != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");
- // Do the Write
- thisTask._stream.Write(thisTask._buffer, thisTask._offset, thisTask._count);
- thisTask.ClearBeginState(); // just to help alleviate some memory pressure
- return 0; // not used, but signature requires a value be returned
+ try
+ {
+ // Do the Write
+ thisTask._stream.Write(thisTask._buffer, thisTask._offset, thisTask._count);
+ return 0; // not used, but signature requires a value be returned
+ }
+ finally
+ {
+ // If this implementation is part of Begin/EndXx, then the EndXx method will handle
+ // finishing the async operation. However, if this is part of XxAsync, then there won't
+ // be an end method, and this task is responsible for cleaning up.
+ if (!thisTask._apm)
+ {
+ thisTask._stream.FinishTrackingAsyncOperation();
+ }
+
+ thisTask.ClearBeginState(); // just to help alleviate some memory pressure
+ }
}, state, this, buffer, offset, count, callback);
// Schedule it
readWriteTask.m_taskScheduler = TaskScheduler.Default;
readWriteTask.ScheduleAndStart(needsProtection: false);
}
+
+ private void FinishTrackingAsyncOperation()
+ {
+ _activeReadWriteTask = null;
+ Contract.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
+ _asyncActiveSemaphore.Release();
+ }
#endif
public virtual void EndWrite(IAsyncResult asyncResult)
}
finally
{
- _activeReadWriteTask = null;
- Contract.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
- _asyncActiveSemaphore.Release();
+ FinishTrackingAsyncOperation();
}
#endif
}
// with a single allocation.
private sealed class ReadWriteTask : Task<int>, ITaskCompletionAction
{
- internal readonly bool _isRead;
+ internal readonly bool _isRead;
+ internal readonly bool _apm; // true if this is from Begin/EndXx; false if it's from XxAsync
internal Stream _stream;
internal byte [] _buffer;
internal int _offset;
[MethodImpl(MethodImplOptions.NoInlining)]
public ReadWriteTask(
bool isRead,
+ bool apm,
Func<object,int> function, object state,
Stream stream, byte[] buffer, int offset, int count, AsyncCallback callback) :
base(function, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach)
// Store the arguments
_isRead = isRead;
+ _apm = apm;
_stream = stream;
_buffer = buffer;
_offset = offset;
private Task BeginEndWriteAsync(Byte[] buffer, Int32 offset, Int32 count)
- {
+ {
+ if (!OverridesBeginEnd)
+ {
+ return (Task)BeginWriteInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
+ }
+
return TaskFactory<VoidTaskResult>.FromAsyncTrim(
this, new ReadWriteParameters { Buffer=buffer, Offset=offset, Count=count },
(stream, args, callback, state) => stream.BeginWrite(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
// _stream due to this call blocked while holding the lock.
return _overridesBeginRead.Value ?
_stream.BeginRead(buffer, offset, count, callback, state) :
- _stream.BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: true);
+ _stream.BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: true, apm: true);
}
}
// _stream due to this call blocked while holding the lock.
return _overridesBeginWrite.Value ?
_stream.BeginWrite(buffer, offset, count, callback, state) :
- _stream.BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: true);
+ _stream.BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: true, apm: true);
}
}