}
}
- protected async Task AssertCanceledAsync(CancellationToken cancellationToken, Func<Task> testCode)
- {
- OperationCanceledException oce = await Assert.ThrowsAnyAsync<OperationCanceledException>(testCode);
- if (cancellationToken.CanBeCanceled)
- {
- Assert.Equal(cancellationToken, oce.CancellationToken);
- }
- }
-
protected async Task ValidatePrecanceledOperations_ThrowsCancellationException(Stream stream)
{
var cts = new CancellationTokenSource();
if (stream.CanRead)
{
- await AssertCanceledAsync(cts.Token, () => stream.ReadAsync(new byte[1], 0, 1, cts.Token));
- await AssertCanceledAsync(cts.Token, async () => { await stream.ReadAsync(new Memory<byte>(new byte[1]), cts.Token); });
+ await AssertExtensions.CanceledAsync(cts.Token, stream.ReadAsync(new byte[1], 0, 1, cts.Token));
+ await AssertExtensions.CanceledAsync(cts.Token, async () => { await stream.ReadAsync(new Memory<byte>(new byte[1]), cts.Token); });
}
if (stream.CanWrite)
{
- await AssertCanceledAsync(cts.Token, () => stream.WriteAsync(new byte[1], 0, 1, cts.Token));
- await AssertCanceledAsync(cts.Token, async () => { await stream.WriteAsync(new ReadOnlyMemory<byte>(new byte[1]), cts.Token); });
+ await AssertExtensions.CanceledAsync(cts.Token, stream.WriteAsync(new byte[1], 0, 1, cts.Token));
+ await AssertExtensions.CanceledAsync(cts.Token, async () => { await stream.WriteAsync(new ReadOnlyMemory<byte>(new byte[1]), cts.Token); });
}
Exception e = await Record.ExceptionAsync(() => stream.FlushAsync(cts.Token));
Task<int> t = stream.ReadAsync(new byte[1], 0, 1, cts.Token);
cts.CancelAfter(cancellationDelay);
- await AssertCanceledAsync(cts.Token, () => t);
+ await AssertExtensions.CanceledAsync(cts.Token, t);
}
protected async Task ValidateCancelableReadAsyncValueTask_AfterInvocation_ThrowsCancellationException(Stream stream, int cancellationDelay)
Task<int> t = stream.ReadAsync(new byte[1], cts.Token).AsTask();
cts.CancelAfter(cancellationDelay);
- await AssertCanceledAsync(cts.Token, () => t);
+ await AssertExtensions.CanceledAsync(cts.Token, t);
}
protected async Task WhenAllOrAnyFailed(Task task1, Task task2)
cts = new CancellationTokenSource();
cts.Cancel();
- await AssertCanceledAsync(cts.Token, () => readable.ReadAsync(new byte[1], 0, 1, cts.Token));
- await AssertCanceledAsync(cts.Token, async () => { await readable.ReadAsync(new Memory<byte>(new byte[1]), cts.Token); });
+ await AssertExtensions.CanceledAsync(cts.Token, readable.ReadAsync(new byte[1], 0, 1, cts.Token));
+ await AssertExtensions.CanceledAsync(cts.Token, async () => { await readable.ReadAsync(new Memory<byte>(new byte[1]), cts.Token); });
cts = new CancellationTokenSource();
Task<int> t = readable.ReadAsync(new byte[1], 0, 1, cts.Token);
cts.Cancel();
- await AssertCanceledAsync(cts.Token, () => t);
+ await AssertExtensions.CanceledAsync(cts.Token, t);
cts = new CancellationTokenSource();
ValueTask<int> vt = readable.ReadAsync(new Memory<byte>(new byte[1]), cts.Token);
cts.Cancel();
- await AssertCanceledAsync(cts.Token, async () => await vt);
+ await AssertExtensions.CanceledAsync(cts.Token, vt.AsTask());
byte[] buffer = new byte[1];
vt = readable.ReadAsync(new Memory<byte>(buffer));
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
+using System.Threading;
using System.Threading.Tasks;
using Xunit;
using Xunit.Sdk;
}
}
+ public static void Canceled(CancellationToken cancellationToken, Action testCode)
+ {
+ OperationCanceledException oce = Assert.ThrowsAny<OperationCanceledException>(testCode);
+ if (cancellationToken.CanBeCanceled)
+ {
+ Assert.Equal(cancellationToken, oce.CancellationToken);
+ }
+ }
+
+ public static Task CanceledAsync(CancellationToken cancellationToken, Task task)
+ {
+ Assert.NotNull(task);
+ return CanceledAsync(cancellationToken, () => task);
+ }
+
+ public static async Task CanceledAsync(CancellationToken cancellationToken, Func<Task> testCode)
+ {
+ OperationCanceledException oce = await Assert.ThrowsAnyAsync<OperationCanceledException>(testCode);
+ if (cancellationToken.CanBeCanceled)
+ {
+ Assert.Equal(cancellationToken, oce.CancellationToken);
+ }
+ }
+
private static string AddOptionalUserMessage(string message, string userMessage)
{
if (userMessage == null)
internal ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken)
{
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return new ValueTask<FlushResult>(Task.FromCanceled<FlushResult>(cancellationToken));
+ }
+
CompletionData completionData;
ValueTask<FlushResult> result;
lock (SyncObj)
return new ValueTask<FlushResult>(new FlushResult(isCanceled: false, isCompleted: true));
}
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return new ValueTask<FlushResult>(Task.FromCanceled<FlushResult>(cancellationToken));
+ }
+
CompletionData completionData;
ValueTask<FlushResult> result;
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void BeginOperation(CancellationToken cancellationToken, Action<object?> callback, object? state)
{
- cancellationToken.ThrowIfCancellationRequested();
-
// Don't register if already completed, we would immediately unregistered in ObserveCancellation
if (cancellationToken.CanBeCanceled && !IsCompleted)
{
}
[Fact]
- public void FlushAsyncThrowsIfPassedCanceledCancellationToken()
+ public async Task FlushAsyncThrowsIfPassedCanceledCancellationToken()
{
var cancellationTokenSource = new CancellationTokenSource();
cancellationTokenSource.Cancel();
- Assert.Throws<OperationCanceledException>(() => Pipe.Writer.FlushAsync(cancellationTokenSource.Token));
+ ValueTask<FlushResult> task = Pipe.Writer.FlushAsync(cancellationTokenSource.Token);
+ Assert.True(task.IsCanceled);
+ await AssertExtensions.CanceledAsync(cancellationTokenSource.Token, async () => await task);
}
[Fact]
// and not only setting IsCompleted flag
var task = Pipe.Reader.ReadAsync().AsTask();
- await Assert.ThrowsAsync<OperationCanceledException>(async () => await Pipe.Writer.FlushAsync(cancellationTokenSource.Token));
+ await AssertExtensions.CanceledAsync(cancellationTokenSource.Token, async () => await Pipe.Writer.FlushAsync(cancellationTokenSource.Token));
Pipe.Writer.Complete();
NetEventSource.Info(this, $"{this} Stream writing memory of '{buffer.Length}' bytes while {(completeWrites ? "completing" : "not completing")} writes.");
}
- if (_sendTcs.IsCompleted)
+ if (_sendTcs.IsCompleted && cancellationToken.IsCancellationRequested)
{
// Special case exception type for pre-canceled token while we've already transitioned to a final state and don't need to abort write.
// It must happen before we try to get the value task, since the task source is versioned and each instance must be awaited.
- cancellationToken.ThrowIfCancellationRequested();
+ return ValueTask.FromCanceled(cancellationToken);
}
// Concurrent call, this one lost the race.
var cts = new CancellationTokenSource();
- Task write1 = c.Writer.WriteAsync(43, cts.Token).AsTask();
- Assert.Equal(TaskStatus.WaitingForActivation, write1.Status);
+ ValueTask write1 = c.Writer.WriteAsync(43, cts.Token);
+ Assert.False(write1.IsCompleted);
cts.Cancel();
Assert.Equal(42, await c.Reader.ReadAsync());
Assert.Equal(44, await c.Reader.ReadAsync());
- await AssertCanceled(write1, cts.Token);
+ await AssertExtensions.CanceledAsync(cts.Token, async () => await write1);
await write2;
}
var cts = new CancellationTokenSource();
cts.Cancel();
Assert.True(c.Writer.TryComplete(new OperationCanceledException(cts.Token)));
- await AssertCanceled(c.Reader.Completion, cts.Token);
+ await AssertExtensions.CanceledAsync(cts.Token, c.Reader.Completion);
}
[Fact]
catch (Exception e) { exc = e; }
c.Writer.Complete(exc);
- await AssertCanceled(c.Reader.Completion, cts.Token);
+ await AssertExtensions.CanceledAsync(cts.Token, c.Reader.Completion);
}
[Fact]
cts.Cancel();
- await AssertCanceled(r.AsTask(), cts.Token);
+ await AssertExtensions.CanceledAsync(cts.Token, async () => await r);
if (c.Writer.TryWrite(42))
{
var cts = new CancellationTokenSource();
ValueTask<int> r = c.Reader.ReadAsync(cts.Token);
cts.Cancel();
- await AssertCanceled(r.AsTask(), cts.Token);
+ await AssertExtensions.CanceledAsync(cts.Token, async () => await r);
}
for (int i = 0; i < 7; i++)
}
}
- protected async Task AssertCanceled(Task task, CancellationToken token)
- {
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => task);
- AssertSynchronouslyCanceled(task, token);
- }
-
protected void AssertSynchronousSuccess<T>(ValueTask<T> task) => Assert.True(task.IsCompletedSuccessfully);
protected void AssertSynchronousSuccess(ValueTask task) => Assert.True(task.IsCompletedSuccessfully);
protected void AssertSynchronousSuccess(Task task) => Assert.Equal(TaskStatus.RanToCompletion, task.Status);
{
RunCompletionAction(state =>
{
- try { ((SendAsyncSource<TOutput>)state!).TrySetCanceled(); }
+ SendAsyncSource<TOutput> source = (SendAsyncSource<TOutput>)state!;
+ try
+ {
+ source.TrySetCanceled(source._cancellationToken);
+ }
catch (ObjectDisposedException) { }
}, this, runAsync);
}
}
/// <summary>Cancels a CancellationTokenSource passed as the object state argument.</summary>
- private static readonly Action<object?> _cancelCts = state => ((CancellationTokenSource)state!).Cancel();
+ private static readonly Action<object?> _cancelCts = static state => ((CancellationTokenSource)state!).Cancel();
/// <summary>Receives an item from the source by linking a temporary target from it.</summary>
/// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
{
// Task final state: RanToCompletion
case ReceiveCoreByLinkingCleanupReason.Success:
- System.Threading.Tasks.Task.Factory.StartNew(state =>
+ System.Threading.Tasks.Task.Factory.StartNew(static state =>
{
// Complete with the received value
var target = (ReceiveTarget<T>)state!;
// Task final state: Canceled
case ReceiveCoreByLinkingCleanupReason.Cancellation:
- System.Threading.Tasks.Task.Factory.StartNew(state =>
+ System.Threading.Tasks.Task.Factory.StartNew(static state =>
{
// Complete as canceled
var target = (ReceiveTarget<T>)state!;
if (cancellationToken.CanBeCanceled)
{
// When cancellation is requested, unlink the target from the source and cancel the target.
- target._ctr = cancellationToken.Register(OutputAvailableAsyncTarget<TOutput>.s_cancelAndUnlink, target);
+ target._ctr = cancellationToken.Register(
+#if NET6_0_OR_GREATER
+ OutputAvailableAsyncTarget<TOutput>.CancelAndUnlink,
+#else
+ static state => OutputAvailableAsyncTarget<TOutput>.CancelAndUnlink(state, default),
+#endif
+ target);
}
- // We can't return the task directly, as the source block will be completing the task synchronously,
- // and thus any synchronous continuations would run as part of the source block's call. We don't have to worry
- // about cancellation, as we've coded cancellation to complete the task asynchronously, and with the continuation
- // set as NotOnCanceled, so the continuation will be canceled immediately when the antecedent is canceled, which
- // will thus be asynchronously from the cancellation token source's cancellation call.
- return target.Task.ContinueWith(
- OutputAvailableAsyncTarget<TOutput>.s_handleCompletion, target,
- CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.NotOnCanceled, TaskScheduler.Default);
+ return target.Task;
}
catch (Exception exc)
{
[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
private sealed class OutputAvailableAsyncTarget<T> : TaskCompletionSource<bool>, ITargetBlock<T>, IDebuggerDisplay
{
+ public OutputAvailableAsyncTarget() :
+ base(TaskCreationOptions.RunContinuationsAsynchronously)
+ {
+ }
+
/// <summary>
/// Cached continuation delegate that unregisters from cancellation and
/// marshals the antecedent's result to the return value.
return antecedent.GetAwaiter().GetResult();
};
- /// <summary>
- /// Cached delegate that cancels the target and unlinks the target from the source.
- /// Expects an OutputAvailableAsyncTarget as the state argument.
- /// </summary>
- internal static readonly Action<object?> s_cancelAndUnlink = CancelAndUnlink;
-
/// <summary>Cancels the target and unlinks the target from the source.</summary>
/// <param name="state">An OutputAvailableAsyncTarget.</param>
- private static void CancelAndUnlink(object? state)
+ /// <param name="cancellationToken">The token that triggered cancellation</param>
+ internal static void CancelAndUnlink(object? state, CancellationToken cancellationToken)
{
var target = state as OutputAvailableAsyncTarget<T>;
Debug.Assert(target != null, "Expected a non-null target");
- // Cancel asynchronously so that we're not completing the task as part of the cts.Cancel() call,
- // since synchronous continuations off that task would then run as part of Cancel.
- // Take advantage of this task and unlink from there to avoid doing the interlocked operation synchronously.
- System.Threading.Tasks.Task.Factory.StartNew(tgt =>
- {
- var thisTarget = (OutputAvailableAsyncTarget<T>)tgt!;
- thisTarget.TrySetCanceled();
- thisTarget.AttemptThreadSafeUnlink();
- },
- target, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
+ target.TrySetCanceled(cancellationToken);
+ target.AttemptThreadSafeUnlink();
}
/// <summary>Disposes of _unlinker if the target has been linked.</summary>
/// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
}
- #endregion
+#endregion
#region Encapsulate
/// <summary>Encapsulates a target and a source into a single propagator.</summary>
}
else
{
- result.TrySetCanceled();
+ result.TrySetCanceled(dataflowBlockOptions.CancellationToken);
}
// By now we know that all of the tasks have completed, so there
// Handle async cancellation by canceling the target without storing it into _completed.
// _completed must only be set to a RanToCompletion task for a successful branch.
Common.WireCancellationToComplete(cancellationToken, base.Task,
- state =>
+ static (state, cancellationToken) =>
{
var thisChooseTarget = (ChooseTarget<T>)state!;
- lock (thisChooseTarget._completed) thisChooseTarget.TrySetCanceled();
+ lock (thisChooseTarget._completed) thisChooseTarget.TrySetCanceled(cancellationToken);
}, this);
}
internal static IObservable<TOutput> From(ISourceBlock<TOutput> source)
{
Debug.Assert(source != null, "Requires a source for which to retrieve the observable.");
- return _table.GetValue(source, s => new SourceObservable<TOutput>(s));
+ return _table.GetValue(source, static s => new SourceObservable<TOutput>(s));
}
/// <summary>Object used to synchronize all subscriptions, unsubscriptions, and propagations.</summary>
// Return a disposable that will unlink this observer, and if it's the last
// observer for the source, shut off the pipe to observers.
- return Disposables.Create((s, o) => s.Unsubscribe(o), this, observer);
+ return Disposables.Create(static (s, o) => s.Unsubscribe(o), this, observer);
}
}
// If the target block fails due to an unexpected exception (e.g. it calls back to the source and the source throws an error),
// we fault currently registered observers and reset the observable.
Target.Completion.ContinueWith(
- (t, state) => ((ObserversState)state!).NotifyObserversOfCompletion(t.Exception!), this,
+ static (t, state) => ((ObserversState)state!).NotifyObserversOfCompletion(t.Exception!), this,
CancellationToken.None,
Common.GetContinuationOptions(TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously),
TaskScheduler.Default);
// When the source completes, complete the target. Then when the target completes,
// send completion messages to any observers still registered.
Task? sourceCompletionTask = Common.GetPotentiallyNotSupportedCompletionTask(Observable._source);
- sourceCompletionTask?.ContinueWith((_1, state1) =>
+ sourceCompletionTask?.ContinueWith(static (_1, state1) =>
{
var ti = (ObserversState)state1!;
ti.Target.Complete();
ti.Target.Completion.ContinueWith(
- (_2, state2) => ((ObserversState)state2!).NotifyObserversOfCompletion(), state1,
+ static (_2, state2) => ((ObserversState)state2!).NotifyObserversOfCompletion(), state1,
CancellationToken.None,
Common.GetContinuationOptions(TaskContinuationOptions.NotOnFaulted | TaskContinuationOptions.ExecuteSynchronously),
TaskScheduler.Default);
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
Task IDataflowBlock.Completion
{
- get { return LazyInitializer.EnsureInitialized(ref _completion, () => new TaskCompletionSource<VoidResult>().Task); }
+ get { return LazyInitializer.EnsureInitialized(ref _completion, static () => new TaskCompletionSource<VoidResult>().Task); }
}
}
#endregion
// Handle async cancellation requests by declining on the target
Common.WireCancellationToComplete(
- dataflowBlockOptions.CancellationToken, Completion, state => ((TargetCore<TInput>)state!).Complete(exception: null, dropPendingMessages: true), _defaultTarget);
+ dataflowBlockOptions.CancellationToken, Completion, static (state, _) => ((TargetCore<TInput>)state!).Complete(exception: null, dropPendingMessages: true), _defaultTarget);
}
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
else
{
// Otherwise, join with the asynchronous operation when it completes.
- task.ContinueWith((completed, state) =>
+ task.ContinueWith(static (completed, state) =>
{
((ActionBlock<TInput>)state!).AsyncCompleteProcessMessageWithTask(completed);
}, this, CancellationToken.None, Common.GetContinuationOptions(TaskContinuationOptions.ExecuteSynchronously), TaskScheduler.Default);
Func<ISourceBlock<T[]>, T[], IList<T[]>?, int>? itemCountingFunc = null;
if (dataflowBlockOptions.BoundedCapacity > 0)
{
- onItemsRemoved = (owningSource, count) => ((BatchBlock<T>)owningSource)._target.OnItemsRemoved(count);
- itemCountingFunc = (owningSource, singleOutputItem, multipleOutputItems) => BatchBlockTargetCore.CountItems(singleOutputItem, multipleOutputItems);
+ onItemsRemoved = static (owningSource, count) => ((BatchBlock<T>)owningSource)._target.OnItemsRemoved(count);
+ itemCountingFunc = static (owningSource, singleOutputItem, multipleOutputItems) => BatchBlockTargetCore.CountItems(singleOutputItem, multipleOutputItems);
}
// Initialize source
_source = new SourceCore<T[]>(this, dataflowBlockOptions,
- owningSource => ((BatchBlock<T>)owningSource)._target.Complete(exception: null, dropPendingMessages: true, releaseReservedMessages: false),
+ static owningSource => ((BatchBlock<T>)owningSource)._target.Complete(exception: null, dropPendingMessages: true, releaseReservedMessages: false),
onItemsRemoved, itemCountingFunc);
// Initialize target
// In those cases we need to fault the target half to drop its buffered messages and to release its
// reservations. This should not create an infinite loop, because all our implementations are designed
// to handle multiple completion requests and to carry over only one.
- _source.Completion.ContinueWith((completed, state) =>
+ _source.Completion.ContinueWith(static (completed, state) =>
{
var thisBlock = ((BatchBlock<T>)state!) as IDataflowBlock;
Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
// Handle async cancellation requests by declining on the target
Common.WireCancellationToComplete(
- dataflowBlockOptions.CancellationToken, _source.Completion, state => ((BatchBlockTargetCore)state!).Complete(exception: null, dropPendingMessages: true, releaseReservedMessages: false), _target);
+ dataflowBlockOptions.CancellationToken, _source.Completion, static (state, _) => ((BatchBlockTargetCore)state!).Complete(exception: null, dropPendingMessages: true, releaseReservedMessages: false), _target);
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
{
// Create task and store into _taskForInputProcessing prior to scheduling the task
// so that _taskForInputProcessing will be visibly set in the task loop.
- _nonGreedyState!.TaskForInputProcessing = new Task(thisBatchTarget => ((BatchBlockTargetCore)thisBatchTarget!).ProcessMessagesLoopCore(), this,
+ _nonGreedyState!.TaskForInputProcessing = new Task(static thisBatchTarget => ((BatchBlockTargetCore)thisBatchTarget!).ProcessMessagesLoopCore(), this,
Common.GetCreationOptionsForTask(isReplacementReplica));
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
// Configure the source
_source = new SourceCore<Tuple<IList<T1>, IList<T2>>>(
- this, dataflowBlockOptions, owningSource => ((BatchedJoinBlock<T1, T2>)owningSource).CompleteEachTarget());
+ this, dataflowBlockOptions, static owningSource => ((BatchedJoinBlock<T1, T2>)owningSource).CompleteEachTarget());
// The action to run when a batch should be created. This is typically called
// when we have a full batch, but it will also be called when we're done receiving
// In those cases we need to fault the target half to drop its buffered messages and to release its
// reservations. This should not create an infinite loop, because all our implementations are designed
// to handle multiple completion requests and to carry over only one.
- _source.Completion.ContinueWith((completed, state) =>
+ _source.Completion.ContinueWith(static (completed, state) =>
{
var thisBlock = ((BatchedJoinBlock<T1, T2>)state!) as IDataflowBlock;
Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
// Handle async cancellation requests by declining on the target
Common.WireCancellationToComplete(
- dataflowBlockOptions.CancellationToken, _source.Completion, state => ((BatchedJoinBlock<T1, T2>)state!).CompleteEachTarget(), this);
+ dataflowBlockOptions.CancellationToken, _source.Completion, static (state, _) => ((BatchedJoinBlock<T1, T2>)state!).CompleteEachTarget(), this);
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
{
// Configure the source
_source = new SourceCore<Tuple<IList<T1>, IList<T2>, IList<T3>>>(
- this, dataflowBlockOptions, owningSource => ((BatchedJoinBlock<T1, T2, T3>)owningSource).CompleteEachTarget());
+ this, dataflowBlockOptions, static owningSource => ((BatchedJoinBlock<T1, T2, T3>)owningSource).CompleteEachTarget());
// The action to run when a batch should be created. This is typically called
// when we have a full batch, but it will also be called when we're done receiving
// In those cases we need to fault the target half to drop its buffered messages and to release its
// reservations. This should not create an infinite loop, because all our implementations are designed
// to handle multiple completion requests and to carry over only one.
- _source.Completion.ContinueWith((completed, state) =>
+ _source.Completion.ContinueWith(static (completed, state) =>
{
var thisBlock = ((BatchedJoinBlock<T1, T2, T3>)state!) as IDataflowBlock;
Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
// Handle async cancellation requests by declining on the target
Common.WireCancellationToComplete(
- dataflowBlockOptions.CancellationToken, _source.Completion, state => ((BatchedJoinBlock<T1, T2, T3>)state!).CompleteEachTarget(), this);
+ dataflowBlockOptions.CancellationToken, _source.Completion, static (state, _) => ((BatchedJoinBlock<T1, T2, T3>)state!).CompleteEachTarget(), this);
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
{
// In those cases we need to fault the target half to drop its buffered messages and to release its
// reservations. This should not create an infinite loop, because all our implementations are designed
// to handle multiple completion requests and to carry over only one.
- _source.Completion.ContinueWith((completed, state) =>
+ _source.Completion.ContinueWith(static (completed, state) =>
{
var thisBlock = ((BroadcastBlock<T>)state!) as IDataflowBlock;
Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
// Handle async cancellation requests by declining on the target
Common.WireCancellationToComplete(
- dataflowBlockOptions.CancellationToken, _source.Completion, state => ((BroadcastBlock<T>)state!).Complete(), this);
+ dataflowBlockOptions.CancellationToken, _source.Completion, static (state, _) => ((BroadcastBlock<T>)state!).Complete(), this);
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
{
// Create task and store into _taskForInputProcessing prior to scheduling the task
// so that _taskForInputProcessing will be visibly set in the task loop.
_boundingState.TaskForInputProcessing =
- new Task(state => ((BroadcastBlock<T>)state!).ConsumeMessagesLoopCore(), this,
+ new Task(static state => ((BroadcastBlock<T>)state!).ConsumeMessagesLoopCore(), this,
Common.GetCreationOptionsForTask(isReplacementReplica));
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
// which means calling back to the source, which means we need to escape the incoming lock.
if (_boundingState != null && _boundingState.PostponedMessages.Count > 0)
{
- Task.Factory.StartNew(state =>
+ Task.Factory.StartNew(static state =>
{
var thisBroadcastBlock = (BroadcastBlock<T>)state!;
// However, now that _decliningPermanently has been set, the timing of
// CompleteBlockIfPossible doesn't matter, so we schedule it to run asynchronously
// and take the necessary locks in a situation where we're sure it won't cause a problem.
- Task.Factory.StartNew(state =>
+ Task.Factory.StartNew(static state =>
{
var thisSourceCore = (BroadcastingSourceCore<TOutput>)state!;
lock (thisSourceCore.OutgoingLock)
{
// Create task and store into _taskForOutputProcessing prior to scheduling the task
// so that _taskForOutputProcessing will be visibly set in the task loop.
- _taskForOutputProcessing = new Task(thisSourceCore => ((BroadcastingSourceCore<TOutput>)thisSourceCore!).OfferMessagesLoopCore(), this,
+ _taskForOutputProcessing = new Task(static thisSourceCore => ((BroadcastingSourceCore<TOutput>)thisSourceCore!).OfferMessagesLoopCore(), this,
Common.GetCreationOptionsForTask(isReplacementReplica));
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
// Get out from under currently held locks - ValueLock is taken, but OutgoingLock may not be.
// Re-take the locks on a separate thread.
- Task.Factory.StartNew(state =>
+ Task.Factory.StartNew(static state =>
{
var thisSourceCore = (BroadcastingSourceCore<TOutput>)state!;
lock (thisSourceCore.OutgoingLock)
_completionReserved = true;
// Run asynchronously to get out of the currently held locks
- Task.Factory.StartNew(thisSourceCore => ((BroadcastingSourceCore<TOutput>)thisSourceCore!).CompleteBlockOncePossible(),
+ Task.Factory.StartNew(static thisSourceCore => ((BroadcastingSourceCore<TOutput>)thisSourceCore!).CompleteBlockOncePossible(),
this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
}
// It's due to cancellation, finish in a canceled state
else if (_dataflowBlockOptions.CancellationToken.IsCancellationRequested)
{
- _completionTask.TrySetCanceled();
+ _completionTask.TrySetCanceled(_dataflowBlockOptions.CancellationToken);
}
// Otherwise, finish in a successful state.
else
Action<ISourceBlock<T>, int>? onItemsRemoved = null;
if (dataflowBlockOptions.BoundedCapacity > 0)
{
- onItemsRemoved = (owningSource, count) => ((BufferBlock<T>)owningSource).OnItemsRemoved(count);
+ onItemsRemoved = static (owningSource, count) => ((BufferBlock<T>)owningSource).OnItemsRemoved(count);
_boundingState = new BoundingStateWithPostponedAndTask<T>(dataflowBlockOptions.BoundedCapacity);
}
// Initialize the source state
_source = new SourceCore<T>(this, dataflowBlockOptions,
- owningSource => ((BufferBlock<T>)owningSource).Complete(),
+ static owningSource => ((BufferBlock<T>)owningSource).Complete(),
onItemsRemoved);
// It is possible that the source half may fault on its own, e.g. due to a task scheduler exception.
// In those cases we need to fault the target half to drop its buffered messages and to release its
// reservations. This should not create an infinite loop, because all our implementations are designed
// to handle multiple completion requests and to carry over only one.
- _source.Completion.ContinueWith((completed, state) =>
+ _source.Completion.ContinueWith(static (completed, state) =>
{
var thisBlock = ((BufferBlock<T>)state!) as IDataflowBlock;
Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
// Handle async cancellation requests by declining on the target
Common.WireCancellationToComplete(
- dataflowBlockOptions.CancellationToken, _source.Completion, owningSource => ((BufferBlock<T>)owningSource!).Complete(), this);
+ dataflowBlockOptions.CancellationToken, _source.Completion, static (owningSource, _) => ((BufferBlock<T>)owningSource!).Complete(), this);
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
{
// Create task and store into _taskForInputProcessing prior to scheduling the task
// so that _taskForInputProcessing will be visibly set in the task loop.
_boundingState.TaskForInputProcessing =
- new Task(state => ((BufferBlock<T>)state!).ConsumeMessagesLoopCore(), this,
+ new Task(static state => ((BufferBlock<T>)state!).ConsumeMessagesLoopCore(), this,
Common.GetCreationOptionsForTask(isReplacementReplica));
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
// which means calling back to the source, which means we need to escape the incoming lock.
if (_boundingState != null && _boundingState.PostponedMessages.Count > 0)
{
- Task.Factory.StartNew(state =>
+ Task.Factory.StartNew(static state =>
{
var thisBufferBlock = (BufferBlock<T>)state!;
// Initialize bounding state if necessary
Action<ISourceBlock<Tuple<T1, T2>>, int>? onItemsRemoved = null;
- if (dataflowBlockOptions.BoundedCapacity > 0) onItemsRemoved = (owningSource, count) => ((JoinBlock<T1, T2>)owningSource)._sharedResources.OnItemsRemoved(count);
+ if (dataflowBlockOptions.BoundedCapacity > 0) onItemsRemoved = static (owningSource, count) => ((JoinBlock<T1, T2>)owningSource)._sharedResources.OnItemsRemoved(count);
// Configure the source
_source = new SourceCore<Tuple<T1, T2>>(this, dataflowBlockOptions,
- owningSource => ((JoinBlock<T1, T2>)owningSource)._sharedResources.CompleteEachTarget(),
+ static owningSource => ((JoinBlock<T1, T2>)owningSource)._sharedResources.CompleteEachTarget(),
onItemsRemoved);
// Configure targets
// In those cases we need to fault the target half to drop its buffered messages and to release its
// reservations. This should not create an infinite loop, because all our implementations are designed
// to handle multiple completion requests and to carry over only one.
- _source.Completion.ContinueWith((completed, state) =>
+ _source.Completion.ContinueWith(static (completed, state) =>
{
var thisBlock = ((JoinBlock<T1, T2>)state!) as IDataflowBlock;
Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
// Handle async cancellation requests by declining on the target
Common.WireCancellationToComplete(
- dataflowBlockOptions.CancellationToken, _source.Completion, state => ((JoinBlock<T1, T2>)state!)._sharedResources.CompleteEachTarget(), this);
+ dataflowBlockOptions.CancellationToken, _source.Completion, static (state, _) => ((JoinBlock<T1, T2>)state!)._sharedResources.CompleteEachTarget(), this);
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
{
// Initialize bounding state if necessary
Action<ISourceBlock<Tuple<T1, T2, T3>>, int>? onItemsRemoved = null;
- if (dataflowBlockOptions.BoundedCapacity > 0) onItemsRemoved = (owningSource, count) => ((JoinBlock<T1, T2, T3>)owningSource)._sharedResources.OnItemsRemoved(count);
+ if (dataflowBlockOptions.BoundedCapacity > 0) onItemsRemoved = static (owningSource, count) => ((JoinBlock<T1, T2, T3>)owningSource)._sharedResources.OnItemsRemoved(count);
// Configure the source
_source = new SourceCore<Tuple<T1, T2, T3>>(this, dataflowBlockOptions,
- owningSource => ((JoinBlock<T1, T2, T3>)owningSource)._sharedResources.CompleteEachTarget(),
+ static owningSource => ((JoinBlock<T1, T2, T3>)owningSource)._sharedResources.CompleteEachTarget(),
onItemsRemoved);
// Configure the targets
// In those cases we need to fault the target half to drop its buffered messages and to release its
// reservations. This should not create an infinite loop, because all our implementations are designed
// to handle multiple completion requests and to carry over only one.
- _source.Completion.ContinueWith((completed, state) =>
+ _source.Completion.ContinueWith(static (completed, state) =>
{
var thisBlock = ((JoinBlock<T1, T2, T3>)state!) as IDataflowBlock;
Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
// Handle async cancellation requests by declining on the target
Common.WireCancellationToComplete(
- dataflowBlockOptions.CancellationToken, _source.Completion, state => ((JoinBlock<T1, T2, T3>)state!)._sharedResources.CompleteEachTarget(), this);
+ dataflowBlockOptions.CancellationToken, _source.Completion, static (state, _) => ((JoinBlock<T1, T2, T3>)state!)._sharedResources.CompleteEachTarget(), this);
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
{
// Create task and store into _taskForInputProcessing prior to scheduling the task
// so that _taskForInputProcessing will be visibly set in the task loop.
- _taskForInputProcessing = new Task(thisSharedResources => ((JoinBlockTargetSharedResources)thisSharedResources!).ProcessMessagesLoopCore(), this,
+ _taskForInputProcessing = new Task(static thisSharedResources => ((JoinBlockTargetSharedResources)thisSharedResources!).ProcessMessagesLoopCore(), this,
Common.GetCreationOptionsForTask(isReplacementReplica));
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
{
etwLog.TaskLaunchedForMessageHandling(
_ownerJoin, _taskForInputProcessing, DataflowEtwProvider.TaskLaunchedReason.ProcessingInputMessages,
- _targets.Max(t => t.NumberOfMessagesAvailableOrPostponed));
+ _targets.Max(static t => t.NumberOfMessagesAvailableOrPostponed));
}
// Start the task handling scheduling exceptions
_decliningPermanently = true;
// Complete each target asynchronously so as not to invoke synchronous continuations under a lock
- Task.Factory.StartNew(state =>
+ Task.Factory.StartNew(static state =>
{
var sharedResources = (JoinBlockTargetSharedResources)state!;
foreach (JoinBlockTargetBase target in sharedResources._targets) target.CompleteOncePossible();
// Initialize onItemsRemoved delegate if necessary
Action<ISourceBlock<TOutput>, int>? onItemsRemoved = null;
if (dataflowBlockOptions.BoundedCapacity > 0)
- onItemsRemoved = (owningSource, count) => ((TransformBlock<TInput, TOutput>)owningSource)._target.ChangeBoundingCount(-count);
+ onItemsRemoved = static (owningSource, count) => ((TransformBlock<TInput, TOutput>)owningSource)._target.ChangeBoundingCount(-count);
// Initialize source component.
_source = new SourceCore<TOutput>(this, dataflowBlockOptions,
- owningSource => ((TransformBlock<TInput, TOutput>)owningSource)._target.Complete(exception: null, dropPendingMessages: true),
+ static owningSource => ((TransformBlock<TInput, TOutput>)owningSource)._target.Complete(exception: null, dropPendingMessages: true),
onItemsRemoved);
// If parallelism is employed, we will need to support reordering messages that complete out-of-order.
// However, a developer can override this with EnsureOrdered == false.
if (dataflowBlockOptions.SupportsParallelExecution && dataflowBlockOptions.EnsureOrdered)
{
- _reorderingBuffer = new ReorderingBuffer<TOutput>(this, (owningSource, message) => ((TransformBlock<TInput, TOutput>)owningSource)._source.AddMessage(message));
+ _reorderingBuffer = new ReorderingBuffer<TOutput>(this, static (owningSource, message) => ((TransformBlock<TInput, TOutput>)owningSource)._source.AddMessage(message));
}
// Create the underlying target
// As the target has completed, and as the target synchronously pushes work
// through the reordering buffer when async processing completes,
// we know for certain that no more messages will need to be sent to the source.
- _target.Completion.ContinueWith((completed, state) =>
+ _target.Completion.ContinueWith(static (completed, state) =>
{
var sourceCore = (SourceCore<TOutput>)state!;
if (completed.IsFaulted) sourceCore.AddAndUnwrapAggregateException(completed.Exception!);
// In those cases we need to fault the target half to drop its buffered messages and to release its
// reservations. This should not create an infinite loop, because all our implementations are designed
// to handle multiple completion requests and to carry over only one.
- _source.Completion.ContinueWith((completed, state) =>
+ _source.Completion.ContinueWith(static (completed, state) =>
{
var thisBlock = ((TransformBlock<TInput, TOutput>)state!) as IDataflowBlock;
Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
// Handle async cancellation requests by declining on the target
Common.WireCancellationToComplete(
- dataflowBlockOptions.CancellationToken, Completion, state => ((TargetCore<TInput>)state!).Complete(exception: null, dropPendingMessages: true), _target);
+ dataflowBlockOptions.CancellationToken, Completion, static (state, _) => ((TargetCore<TInput>)state!).Complete(exception: null, dropPendingMessages: true), _target);
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
{
}
// Otherwise, join with the asynchronous operation when it completes.
- task.ContinueWith((completed, state) =>
+ task.ContinueWith(static (completed, state) =>
{
var tuple = (Tuple<TransformBlock<TInput, TOutput>, KeyValuePair<TInput, long>>)state!;
tuple.Item1.AsyncCompleteProcessMessageWithTask(completed, tuple.Item2);
#if DEBUG
// Task returned from ProcessMessageAsync is explicitly ignored.
// That function handles all exceptions.
- t.ContinueWith(t => Debug.Assert(t.IsCompletedSuccessfully), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
+ t.ContinueWith(static t => Debug.Assert(t.IsCompletedSuccessfully), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
#endif
}, dataflowBlockOptions, ref _source, ref _target, ref _reorderingBuffer, TargetCoreOptions.UsesAsyncCompletion);
}
Action<ISourceBlock<TOutput>, int>? onItemsRemoved = null;
if (dataflowBlockOptions.BoundedCapacity > 0)
{
- onItemsRemoved = (owningSource, count) => ((TransformManyBlock<TInput, TOutput>)owningSource)._target.ChangeBoundingCount(-count);
+ onItemsRemoved = static (owningSource, count) => ((TransformManyBlock<TInput, TOutput>)owningSource)._target.ChangeBoundingCount(-count);
}
// Initialize source component
source = new SourceCore<TOutput>(this, dataflowBlockOptions,
- owningSource => ((TransformManyBlock<TInput, TOutput>)owningSource)._target.Complete(exception: null, dropPendingMessages: true),
+ static owningSource => ((TransformManyBlock<TInput, TOutput>)owningSource)._target.Complete(exception: null, dropPendingMessages: true),
onItemsRemoved);
// If parallelism is employed, we will need to support reordering messages that complete out-of-order.
if (dataflowBlockOptions.SupportsParallelExecution && dataflowBlockOptions.EnsureOrdered)
{
reorderingBuffer = new ReorderingBuffer<IEnumerable<TOutput>>(
- this, (source, messages) => ((TransformManyBlock<TInput, TOutput>)source)._source.AddMessages(messages));
+ this, static (source, messages) => ((TransformManyBlock<TInput, TOutput>)source)._source.AddMessages(messages));
}
// Create the underlying target and source
// As the target has completed, and as the target synchronously pushes work
// through the reordering buffer when async processing completes,
// we know for certain that no more messages will need to be sent to the source.
- target.Completion.ContinueWith((completed, state) =>
+ target.Completion.ContinueWith(static (completed, state) =>
{
var sourceCore = (SourceCore<TOutput>)state!;
if (completed.IsFaulted) sourceCore.AddAndUnwrapAggregateException(completed.Exception!);
// In those cases we need to fault the target half to drop its buffered messages and to release its
// reservations. This should not create an infinite loop, because all our implementations are designed
// to handle multiple completion requests and to carry over only one.
- source.Completion.ContinueWith((completed, state) =>
+ source.Completion.ContinueWith(static (completed, state) =>
{
var thisBlock = ((TransformManyBlock<TInput, TOutput>)state!) as IDataflowBlock;
Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
// Handle async cancellation requests by declining on the target
Common.WireCancellationToComplete(
- dataflowBlockOptions.CancellationToken, Completion, state => ((TargetCore<TInput>)state!).Complete(exception: null, dropPendingMessages: true), target);
+ dataflowBlockOptions.CancellationToken, Completion, static (state, _) => ((TargetCore<TInput>)state!).Complete(exception: null, dropPendingMessages: true), target);
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
{
// We got back a task. Now wait for it to complete and store its results.
// Unlike with TransformBlock and ActionBlock, We run the continuation on the user-provided
// scheduler as we'll be running user code through enumerating the returned enumerable.
- task.ContinueWith((completed, state) =>
+ task.ContinueWith(static (completed, state) =>
{
var tuple = (Tuple<TransformManyBlock<TInput, TOutput>, KeyValuePair<TInput, long>>)state!;
tuple.Item1.AsyncCompleteProcessMessageWithTask(completed, tuple.Item2);
_completionReserved = _decliningPermanently = true;
// Cancel the completion task's TCS
- _lazyCompletionTaskSource.SetCanceled();
+ _lazyCompletionTaskSource.TrySetCanceled(dataflowBlockOptions.CancellationToken);
}
else
{
// Handle async cancellation requests by declining on the target
Common.WireCancellationToComplete(
- dataflowBlockOptions.CancellationToken, _lazyCompletionTaskSource.Task, state => ((WriteOnceBlock<T>)state!).Complete(), this);
+ dataflowBlockOptions.CancellationToken, _lazyCompletionTaskSource.Task, static (state, _) => ((WriteOnceBlock<T>)state!).Complete(), this);
}
}
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (exceptions == null)
{
// Offer the message to any linked targets and complete the block asynchronously to avoid blocking the caller
- var taskForOutputProcessing = new Task(state => ((WriteOnceBlock<T>)state!).OfferToTargetsAndCompleteBlock(), this,
+ var taskForOutputProcessing = new Task(static state => ((WriteOnceBlock<T>)state!).OfferToTargetsAndCompleteBlock(), this,
Common.GetCreationOptionsForTask());
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
else
{
// Complete the block asynchronously to avoid blocking the caller
- Task.Factory.StartNew(state =>
+ Task.Factory.StartNew(static state =>
{
Tuple<WriteOnceBlock<T>, IList<Exception>> blockAndList = (Tuple<WriteOnceBlock<T>, IList<Exception>>)state!;
blockAndList.Item1.CompleteBlock(blockAndList.Item2);
}
else if (_dataflowBlockOptions.CancellationToken.IsCancellationRequested)
{
- CompletionTaskSource.TrySetCanceled();
+ CompletionTaskSource.TrySetCanceled(_dataflowBlockOptions.CancellationToken);
}
else
{
/// <param name="completeAction">An action that will decline permanently on the state passed to it.</param>
/// <param name="completeState">The block on which to decline permanently.</param>
internal static void WireCancellationToComplete(
- CancellationToken cancellationToken, Task completionTask, Action<object?> completeAction, object completeState)
+ CancellationToken cancellationToken, Task completionTask, Action<object?, CancellationToken> completeAction, object completeState)
{
Debug.Assert(completionTask != null, "A task to wire up for completion is needed.");
Debug.Assert(completeAction != null, "An action to invoke upon cancellation is required.");
- // If a cancellation request has already occurred, just invoke the declining action synchronously.
- // CancellationToken would do this anyway but we can short-circuit it further and avoid a bunch of unnecessary checks.
if (cancellationToken.IsCancellationRequested)
{
- completeAction(completeState);
+ // If a cancellation request has already occurred, just invoke the declining action synchronously.
+ // CancellationToken would do this anyway but we can short-circuit it further and avoid a bunch of unnecessary checks.
+ completeAction(completeState, cancellationToken);
}
- // Otherwise, if a cancellation request occurs, we want to prevent the block from accepting additional
- // data, and we also want to dispose of that registration when we complete so that we don't
- // leak into a long-living cancellation token.
else if (cancellationToken.CanBeCanceled)
{
- CancellationTokenRegistration reg = cancellationToken.Register(completeAction, completeState);
- completionTask.ContinueWith((completed, state) => ((CancellationTokenRegistration)state!).Dispose(),
+ // Otherwise, if a cancellation request occurs, we want to prevent the block from accepting additional
+ // data, and we also want to dispose of that registration when we complete so that we don't
+ // leak into a long-living cancellation token.
+ CancellationTokenRegistration reg = cancellationToken.Register(
+#if NET6_0_OR_GREATER
+ completeAction, completeState
+#else
+ state =>
+ {
+ var tuple = (Tuple<Action<object?, CancellationToken>, object, CancellationToken>)state!;
+ tuple.Item1(tuple.Item2, tuple.Item3);
+ },
+ Tuple.Create(completeAction, completeState, cancellationToken)
+#endif
+ );
+ completionTask.ContinueWith(static (completed, state) => ((CancellationTokenRegistration)state!).Dispose(),
reg, cancellationToken, Common.GetContinuationOptions(), TaskScheduler.Default);
}
}
internal static void ThrowAsync(Exception error)
{
ExceptionDispatchInfo edi = ExceptionDispatchInfo.Capture(error);
- ThreadPool.QueueUserWorkItem(state => { ((ExceptionDispatchInfo)state!).Throw(); }, edi);
+ ThreadPool.QueueUserWorkItem(static state => { ((ExceptionDispatchInfo)state!).Throw(); }, edi);
}
/// <summary>Adds the exception to the list, first initializing the list if the list is null.</summary>
{
Debug.Assert(sourceCompletionTask != null, "sourceCompletionTask may not be null.");
Debug.Assert(target != null, "The target where completion is to be propagated may not be null.");
- sourceCompletionTask.ContinueWith((task, state) => Common.PropagateCompletion(task, (IDataflowBlock)state!, AsyncExceptionHandler),
+ sourceCompletionTask.ContinueWith(static (task, state) => Common.PropagateCompletion(task, (IDataflowBlock)state!, AsyncExceptionHandler),
target, CancellationToken.None, Common.GetContinuationOptions(), TaskScheduler.Default);
}
private static class CachedGenericDelegates<T>
{
/// <summary>A function that returns the default value of T.</summary>
- internal static readonly Func<T> DefaultTResultFunc = () => default(T)!;
+ internal static readonly Func<T> DefaultTResultFunc = static () => default(T)!;
/// <summary>
/// A function to use as the body of ActionOnDispose in CreateUnlinkerShim.
/// Passed a tuple of the sync obj, the target registry, and the target block as the state parameter.
if (completionTask.IsFaulted)
{
- try { exceptionData = string.Join(Environment.NewLine, completionTask.Exception!.InnerExceptions.Select(e => e.ToString())); }
+ try { exceptionData = string.Join(Environment.NewLine, completionTask.Exception!.InnerExceptions.Select(static e => e.ToString())); }
catch { }
}
// However, we know that _decliningPermanently has been set, and thus the timing of
// CompleteBlockIfPossible doesn't matter, so we schedule it to run asynchronously
// and take the necessary locks in a situation where we're sure it won't cause a problem.
- Task.Factory.StartNew(state =>
+ Task.Factory.StartNew(static state =>
{
var thisSourceCore = (SourceCore<TOutput>)state!;
lock (thisSourceCore.OutgoingLock)
{
// Create task and store into _taskForOutputProcessing prior to scheduling the task
// so that _taskForOutputProcessing will be visibly set in the task loop.
- _taskForOutputProcessing = new Task(thisSourceCore => ((SourceCore<TOutput>)thisSourceCore!).OfferMessagesLoopCore(), this,
+ _taskForOutputProcessing = new Task(static thisSourceCore => ((SourceCore<TOutput>)thisSourceCore!).OfferMessagesLoopCore(), this,
Common.GetCreationOptionsForTask(isReplacementReplica));
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
// Get out from under currently held locks - ValueLock is taken, but OutgoingLock may not be.
// Re-take the locks on a separate thread.
- Task.Factory.StartNew(state =>
+ Task.Factory.StartNew(static state =>
{
var thisSourceCore = (SourceCore<TOutput>)state!;
lock (thisSourceCore.OutgoingLock)
// Get out from under currently held locks. This is to avoid
// invoking synchronous continuations off of _completionTask.Task
// while holding a lock.
- Task.Factory.StartNew(state => ((SourceCore<TOutput>)state!).CompleteBlockOncePossible(),
+ Task.Factory.StartNew(static state => ((SourceCore<TOutput>)state!).CompleteBlockOncePossible(),
this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
}
}
// If it's due to cancellation, finish in a canceled state
else if (_dataflowBlockOptions.CancellationToken.IsCancellationRequested)
{
- _completionTask.TrySetCanceled();
+ _completionTask.TrySetCanceled(_dataflowBlockOptions.CancellationToken);
}
// Otherwise, finish in a successful state.
else
{
// Create a new consumption task and try to set it as current as long as there's still no other task
var newConsumer = new Task(
- state => ((SpscTargetCore<TInput>)state!).ProcessMessagesLoopCore(),
+ static state => ((SpscTargetCore<TInput>)state!).ProcessMessagesLoopCore(),
this, CancellationToken.None, Common.GetCreationOptionsForTask(isReplica));
if (Interlocked.CompareExchange(ref _activeConsumer, newConsumer, null) == null)
{
// by the producer and consumer, a producer calling Fault and the
// processing task processing the user delegate which might throw.
#pragma warning disable 0420
- lock (LazyInitializer.EnsureInitialized(ref _exceptions, () => new List<Exception>()))
+ lock (LazyInitializer.EnsureInitialized(ref _exceptions, static () => new List<Exception>()))
#pragma warning restore 0420
{
_exceptions.Add(exception);
/// <summary>Gets the lazily-initialized completion source.</summary>
private TaskCompletionSource<VoidResult> CompletionSource
{
- get { return LazyInitializer.EnsureInitialized(ref _completionTask, () => new TaskCompletionSource<VoidResult>()); }
+ get { return LazyInitializer.EnsureInitialized(ref _completionTask, static () => new TaskCompletionSource<VoidResult>()); }
}
/// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
_numberOfOutstandingOperations++;
if (UsesAsyncCompletion) _numberOfOutstandingServiceTasks++;
- var taskForInputProcessing = new Task(thisTargetCore => ((TargetCore<TInput>)thisTargetCore!).ProcessMessagesLoopCore(), this,
+ var taskForInputProcessing = new Task(static thisTargetCore => ((TargetCore<TInput>)thisTargetCore!).ProcessMessagesLoopCore(), this,
Common.GetCreationOptionsForTask(repeat));
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
// Get out from under currently held locks. This is to avoid
// invoking synchronous continuations off of _completionSource.Task
// while holding a lock.
- Task.Factory.StartNew(state => ((TargetCore<TInput>)state!).CompleteBlockOncePossible(),
+ Task.Factory.StartNew(static state => ((TargetCore<TInput>)state!).CompleteBlockOncePossible(),
this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
}
}
// If we completed with cancellation, finish in a canceled state
else if (_dataflowBlockOptions.CancellationToken.IsCancellationRequested)
{
- _completionSource.TrySetCanceled();
+ _completionSource.TrySetCanceled(_dataflowBlockOptions.CancellationToken);
}
// Otherwise, finish in a successful state.
else
/// <summary>Gets the number of messages waiting to be processed.</summary>
internal int InputCount { get { return _target._messages.Count; } }
/// <summary>Gets the messages waiting to be processed.</summary>
- internal IEnumerable<TInput> InputQueue { get { return _target._messages.Select(kvp => kvp.Key).ToList(); } }
+ internal IEnumerable<TInput> InputQueue { get { return _target._messages.Select(static kvp => kvp.Key).ToList(); } }
/// <summary>Gets any postponed messages.</summary>
internal QueuedMap<ISourceBlock<TInput>, DataflowMessageHeader>? PostponedMessages
[Fact]
public async Task TestPrecanceledToken()
{
- var options = new ExecutionDataflowBlockOptions { CancellationToken = new CancellationToken(true) };
+ var cts = new CancellationTokenSource();
+ cts.Cancel();
+
+ var options = new ExecutionDataflowBlockOptions { CancellationToken = cts.Token };
var blocks = new []
{
new ActionBlock<int>(i => { }, options),
ab.Complete();
((IDataflowBlock)ab).Fault(new Exception());
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => ab.Completion);
+ await AssertExtensions.CanceledAsync(cts.Token, ab.Completion);
}
}
[Fact]
public async Task TestPrecancellation()
{
+ var cts = new CancellationTokenSource();
+ cts.Cancel();
+
var b = new BatchBlock<int>(42, new GroupingDataflowBlockOptions {
- CancellationToken = new CancellationToken(canceled: true), MaxNumberOfGroups = 1
+ CancellationToken = cts.Token, MaxNumberOfGroups = 1
});
Assert.Equal(expected: 42, actual: b.BatchSize);
Assert.NotNull(b.Completion);
b.Complete(); // verify doesn't throw
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => b.Completion);
+ await AssertExtensions.CanceledAsync(cts.Token, b.Completion);
}
[Fact]
else
{
cts.Cancel();
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => bb.Completion);
+ await AssertExtensions.CanceledAsync(cts.Token, bb.Completion);
}
Assert.Equal(expected: 0, actual: bb.OutputCount);
[Fact]
public async Task TestPrecanceled2()
{
+ var cts = new CancellationTokenSource();
+ cts.Cancel();
+
var b = new BatchedJoinBlock<int, int>(42,
- new GroupingDataflowBlockOptions { CancellationToken = new CancellationToken(canceled: true), MaxNumberOfGroups = 1 });
+ new GroupingDataflowBlockOptions { CancellationToken = cts.Token, MaxNumberOfGroups = 1 });
Tuple<IList<int>, IList<int>> ignoredValue;
IList<Tuple<IList<int>, IList<int>>> ignoredValues;
b.Target1.Complete();
b.Target2.Complete();
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => b.Completion);
+ await AssertExtensions.CanceledAsync(cts.Token, b.Completion);
}
[Fact]
public async Task TestPrecanceled3()
{
+ var cts = new CancellationTokenSource();
+ cts.Cancel();
+
var b = new BatchedJoinBlock<int, int, int>(42,
- new GroupingDataflowBlockOptions { CancellationToken = new CancellationToken(canceled: true), MaxNumberOfGroups = 1 });
+ new GroupingDataflowBlockOptions { CancellationToken = cts.Token, MaxNumberOfGroups = 1 });
Tuple<IList<int>, IList<int>, IList<int>> ignoredValue;
IList<Tuple<IList<int>, IList<int>, IList<int>>> ignoredValues;
b.Target1.Complete();
b.Target2.Complete();
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => b.Completion);
+ await AssertExtensions.CanceledAsync(cts.Token, b.Completion);
}
[Fact]
[Fact]
public async Task TestPrecancellation()
{
- var b = new BroadcastBlock<int>(null, new DataflowBlockOptions { CancellationToken = new CancellationToken(canceled: true) });
+ var cts = new CancellationTokenSource();
+ cts.Cancel();
+
+ var b = new BroadcastBlock<int>(null, new DataflowBlockOptions { CancellationToken = cts.Token });
Assert.NotNull(b.LinkTo(DataflowBlock.NullTarget<int>()));
Assert.False(b.Post(42));
Assert.NotNull(b.Completion);
b.Complete(); // verify doesn't throw
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => b.Completion);
+ await AssertExtensions.CanceledAsync(cts.Token, b.Completion);
}
[Fact]
else
{
cts.Cancel();
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => bb.Completion);
+ await AssertExtensions.CanceledAsync(cts.Token, bb.Completion);
}
await Task.WhenAll(sends);
var buffer = new BufferBlock<int>(new DataflowBlockOptions() { CancellationToken = cts.Token });
buffer.Post(1);
cts.Cancel();
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => buffer.Completion);
+ await AssertExtensions.CanceledAsync(cts.Token, buffer.Completion);
Assert.Equal(expected: 0, actual: buffer.Count);
cts = new CancellationTokenSource();
[Fact]
public async Task TestPrecanceled()
{
+ var cts = new CancellationTokenSource();
+ cts.Cancel();
+
var bb = new BufferBlock<int>(
- new DataflowBlockOptions { CancellationToken = new CancellationToken(canceled: true) });
+ new DataflowBlockOptions { CancellationToken = cts.Token });
int ignoredValue;
IList<int> ignoredValues;
Assert.False(bb.TryReceiveAll(out ignoredValues));
Assert.False(bb.TryReceive(out ignoredValue));
- Assert.NotNull(bb.Completion);
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => bb.Completion);
+ await AssertExtensions.CanceledAsync(cts.Token, bb.Completion);
bb.Complete(); // just make sure it doesn't throw
}
else
{
cts.Cancel();
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => bb.Completion);
+ await AssertExtensions.CanceledAsync(cts.Token, bb.Completion);
}
await Task.WhenAll(sends);
ValueTask<bool> vt = e.MoveNextAsync();
Assert.True(vt.IsCompleted);
Assert.False(vt.IsCompletedSuccessfully);
- OperationCanceledException oce = await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await vt);
- Assert.Equal(cts.Token, oce.CancellationToken);
+ await AssertExtensions.CanceledAsync(cts.Token, vt.AsTask());
}
[Fact]
Assert.False(vt.IsCompleted);
cts.Cancel();
- OperationCanceledException oce = await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await vt);
+ await AssertExtensions.CanceledAsync(cts.Token, vt.AsTask());
vt = e.MoveNextAsync();
Assert.True(vt.IsCompletedSuccessfully);
using System.Linq;
using System.Runtime.CompilerServices;
using Xunit;
+using Xunit.Sdk;
namespace System.Threading.Tasks.Dataflow.Tests
{
t = bb.SendAsync(3, cts.Token);
Assert.False(t.IsCompleted);
cts.Cancel();
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => t);
+ await AssertExtensions.CanceledAsync(cts.Token, t);
Assert.Equal(expected: 2, actual: await bb.ReceiveAsync());
bb.Complete();
if (withCancellation)
{
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => target.SendAsync(42, cts.Token));
+ await AssertExtensions.CanceledAsync(cts.Token, () => target.SendAsync(42, cts.Token));
}
else
{
}
};
Task<bool> send = target.SendAsync(42, cts.Token);
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => send);
+ await AssertExtensions.CanceledAsync(cts.Token, send);
}
[Fact]
var bb = new BufferBlock<int>();
// Cancel before Receive/ReceiveAsync
- Assert.ThrowsAny<OperationCanceledException>(() => bb.Receive(new CancellationToken(canceled: true)));
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => bb.ReceiveAsync(new CancellationToken(canceled: true)));
+ {
+ var cts = new CancellationTokenSource();
+ cts.Cancel();
+ AssertExtensions.Canceled(cts.Token, () => bb.Receive(cts.Token));
+ await AssertExtensions.CanceledAsync(cts.Token, bb.ReceiveAsync(cts.Token));
+ }
// Cancel after Receive/ReceiveAsync but before data
{
var cts = new CancellationTokenSource();
var t = bb.ReceiveAsync(cts.Token);
cts.Cancel();
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => bb.ReceiveAsync(cts.Token));
+ await AssertExtensions.CanceledAsync(cts.Token, bb.ReceiveAsync(cts.Token));
}
// Cancel after data received
TaskScheduler usedScheduler = null, requestedScheduler = new ConcurrentExclusiveSchedulerPair().ConcurrentScheduler;
var cts = new CancellationTokenSource();
- var t = chooseTestCase < 3 ?
+ var t = chooseTestCase < 2 ?
DataflowBlock.Choose(
source1, i => intValue = i,
source2, s => stringValue = s) :
Assert.Equal(expected: 1, actual: source2.Count);
break;
+ // >= 2 TEST USING DATAFLOW BLOCK OPTIONS
+
case 2: // Test no data on either source
source1.Complete();
source2.Complete();
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => t);
+ await AssertExtensions.CanceledAsync(cts.Token, t);
Assert.Equal(expected: 0, actual: intValue);
Assert.Null(stringValue);
break;
- // >= 3 TEST USING DATAFLOW BLOCK OPTIONS
-
case 3: // Test correct TaskScheduler is used
source1.Post(42);
await t;
cts.Cancel();
source1.Post(42);
source2.Post("43");
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => t);
+ await AssertExtensions.CanceledAsync(cts.Token, t);
Assert.Equal(expected: 1, actual: source1.Count);
Assert.Equal(expected: 1, actual: source2.Count);
break;
TaskScheduler usedScheduler = null, requestedScheduler = new ConcurrentExclusiveSchedulerPair().ConcurrentScheduler;
var cts = new CancellationTokenSource();
- var t = chooseTestCase < 7 ?
+ var t = chooseTestCase < 6 ?
DataflowBlock.Choose(
source1, i => intValue = i,
source2, s => stringValue = s,
break;
+ // >= 6 TEST USING DATAFLOW BLOCK OPTIONS
+
case 6: // Test all sources complete
source1.Complete();
source2.Complete();
source3.Complete();
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => t);
+ await AssertExtensions.CanceledAsync(cts.Token, t);
Assert.Equal(expected: 0, actual: intValue);
Assert.Null(stringValue);
Assert.Equal(expected: 0, actual: doubleValue);
break;
- // >= 7 TEST USING DATAFLOW BLOCK OPTIONS
-
case 7: // Test correct TaskScheduler is used
source3.Post(42);
await t;
source1.Post(42);
source2.Post("43");
source3.Post(44.0);
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => t);
+ await AssertExtensions.CanceledAsync(cts.Token, t);
Assert.Equal(expected: 1, actual: source1.Count);
Assert.Equal(expected: 1, actual: source2.Count);
Assert.Equal(expected: 1, actual: source3.Count);
if (!cancelBeforeChoose)
cts.Cancel();
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => choose);
+ await AssertExtensions.CanceledAsync(cts.Token, choose);
int expectedLinkCount = cancelBeforeChoose ? 0 : 1;
Assert.All(linkCounts, i => Assert.Equal(expected: expectedLinkCount, actual: i));
var t = buffer.OutputAvailableAsync(cts.Token);
Assert.False(t.IsCompleted);
cts.Cancel();
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => t);
+ await AssertExtensions.CanceledAsync(
+ PlatformDetection.IsNetFramework ? default : cts.Token, // token doesn't currently flow in netstandard2.0 build due to lack of necessary API
+ t);
cts = new CancellationTokenSource();
t = buffer.OutputAvailableAsync(cts.Token);
}
return DataflowBlock.Encapsulate(transforms[0], transforms[transforms.Length - 1]);
}
-
}
}
[Fact]
public async Task TestPrecancellation2()
{
+ var cts = new CancellationTokenSource();
+ cts.Cancel();
+
var b = new JoinBlock<int, int>(new GroupingDataflowBlockOptions {
- CancellationToken = new CancellationToken(canceled: true), MaxNumberOfGroups = 1
+ CancellationToken = cts.Token, MaxNumberOfGroups = 1
});
Assert.NotNull(b.LinkTo(DataflowBlock.NullTarget<Tuple<int, int>>()));
Assert.NotNull(b.Completion);
b.Complete();
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => b.Completion);
+ await AssertExtensions.CanceledAsync(cts.Token, b.Completion);
}
[Fact]
public async Task TestPrecancellation3()
{
+ var cts = new CancellationTokenSource();
+ cts.Cancel();
+
var b = new JoinBlock<int, int, int>(new GroupingDataflowBlockOptions
{
- CancellationToken = new CancellationToken(canceled: true),
+ CancellationToken = cts.Token,
MaxNumberOfGroups = 1
});
Assert.NotNull(b.Completion);
b.Complete();
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => b.Completion);
+ await AssertExtensions.CanceledAsync(cts.Token, b.Completion);
}
[Fact]
cts.Cancel();
foreach (Task<bool> send in sends)
{
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => send);
+ await AssertExtensions.CanceledAsync(cts.Token, send);
}
joinBlock.Target2.Post(1);
[Fact]
public async Task TestCtor()
{
+ var cts = new CancellationTokenSource();
+ cts.Cancel();
+
var blocks = new[] {
new TransformBlock<int, string>(i => i.ToString()),
new TransformBlock<int, string>(i => i.ToString(), new ExecutionDataflowBlockOptions { MaxMessagesPerTask = 1 }),
blocks = new[] {
new TransformBlock<int, string>(i => i.ToString(),
- new ExecutionDataflowBlockOptions { CancellationToken = new CancellationToken(true) }),
+ new ExecutionDataflowBlockOptions { CancellationToken = cts.Token }),
new TransformBlock<int, string>(i => Task.Run(() => i.ToString()),
- new ExecutionDataflowBlockOptions { CancellationToken = new CancellationToken(true) })
+ new ExecutionDataflowBlockOptions { CancellationToken = cts.Token })
};
foreach (var block in blocks)
{
Assert.Equal(expected: 0, actual: block.InputCount);
Assert.Equal(expected: 0, actual: block.OutputCount);
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => block.Completion);
+ await AssertExtensions.CanceledAsync(cts.Token, block.Completion);
}
}
var tb = new TransformBlock<int, int>(i => i, new ExecutionDataflowBlockOptions() { CancellationToken = cts.Token });
tb.Post(1);
cts.Cancel();
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => tb.Completion);
+ await AssertExtensions.CanceledAsync(cts.Token, tb.Completion);
Assert.Equal(expected: 0, actual: tb.InputCount);
Assert.Equal(expected: 0, actual: tb.OutputCount);
[Fact]
public async Task TestPrecanceled()
{
+ var cts = new CancellationTokenSource();
+ cts.Cancel();
+
var bb = new TransformBlock<int, int>(i => i,
- new ExecutionDataflowBlockOptions { CancellationToken = new CancellationToken(canceled: true) });
+ new ExecutionDataflowBlockOptions { CancellationToken = cts.Token });
int ignoredValue;
IList<int> ignoredValues;
Assert.False(bb.TryReceive(out ignoredValue));
Assert.NotNull(bb.Completion);
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => bb.Completion);
+ await AssertExtensions.CanceledAsync(cts.Token, bb.Completion);
bb.Complete(); // just make sure it doesn't throw
}
else
{
cts.Cancel();
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => tb.Completion);
+ await AssertExtensions.CanceledAsync(cts.Token, tb.Completion);
}
Assert.Equal(expected: 0, actual: tb.InputCount);
[Fact]
public async Task TestCtorAsyncEnumerable()
{
+ var cts = new CancellationTokenSource();
+ cts.Cancel();
+
var blocks = new[] {
new TransformManyBlock<int, int>(DataflowTestHelpers.ToAsyncEnumerable),
new TransformManyBlock<int, int>(DataflowTestHelpers.ToAsyncEnumerable, new ExecutionDataflowBlockOptions { MaxMessagesPerTask = 1 })
var canceledBlock = new TransformManyBlock<int, int>(
DataflowTestHelpers.ToAsyncEnumerable,
- new ExecutionDataflowBlockOptions { CancellationToken = new CancellationToken(true) });
+ new ExecutionDataflowBlockOptions { CancellationToken = cts.Token });
Assert.Equal(expected: 0, actual: canceledBlock.InputCount);
Assert.Equal(expected: 0, actual: canceledBlock.OutputCount);
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => canceledBlock.Completion);
+ await AssertExtensions.CanceledAsync(cts.Token, canceledBlock.Completion);
}
[Fact]
var tb = new TransformManyBlock<int, int>(DataflowTestHelpers.ToAsyncEnumerable, new ExecutionDataflowBlockOptions() { CancellationToken = cts.Token });
tb.Post(1);
cts.Cancel();
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => tb.Completion);
+ await AssertExtensions.CanceledAsync(cts.Token, tb.Completion);
Assert.Equal(expected: 0, actual: tb.InputCount);
Assert.Equal(expected: 0, actual: tb.OutputCount);
[Fact]
public async Task TestPrecanceledAsyncEnumerable()
{
+ var cts = new CancellationTokenSource();
+ cts.Cancel();
+
var bb = new TransformManyBlock<int, int>(DataflowTestHelpers.ToAsyncEnumerable,
- new ExecutionDataflowBlockOptions { CancellationToken = new CancellationToken(canceled: true) });
+ new ExecutionDataflowBlockOptions { CancellationToken = cts.Token });
IDisposable link = bb.LinkTo(DataflowBlock.NullTarget<int>());
Assert.NotNull(link);
Assert.False(bb.TryReceive(out _));
Assert.NotNull(bb.Completion);
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => bb.Completion);
+ await AssertExtensions.CanceledAsync(cts.Token, bb.Completion);
bb.Complete(); // just make sure it doesn't throw
}
else
{
cts.Cancel();
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => tb.Completion);
+ await AssertExtensions.CanceledAsync(cts.Token, tb.Completion);
}
Assert.Equal(expected: 0, actual: tb.InputCount);
[Fact]
public async Task TestCtor()
{
+ var cts = new CancellationTokenSource();
+ cts.Cancel();
+
var blocks = new[] {
new TransformManyBlock<int, int>(DataflowTestHelpers.ToEnumerable),
new TransformManyBlock<int, int>(DataflowTestHelpers.ToEnumerable, new ExecutionDataflowBlockOptions { MaxMessagesPerTask = 1 }),
blocks = new[] {
new TransformManyBlock<int, int>(DataflowTestHelpers.ToEnumerable,
- new ExecutionDataflowBlockOptions { CancellationToken = new CancellationToken(true) }),
+ new ExecutionDataflowBlockOptions { CancellationToken = cts.Token }),
new TransformManyBlock<int, int>(i => Task.Run(() => DataflowTestHelpers.ToEnumerable(i)),
- new ExecutionDataflowBlockOptions { CancellationToken = new CancellationToken(true) })
+ new ExecutionDataflowBlockOptions { CancellationToken = cts.Token })
};
foreach (var block in blocks)
{
Assert.Equal(expected: 0, actual: block.InputCount);
Assert.Equal(expected: 0, actual: block.OutputCount);
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => block.Completion);
+ await AssertExtensions.CanceledAsync(cts.Token, block.Completion);
}
}
var tb = new TransformManyBlock<int, int>(DataflowTestHelpers.ToEnumerable, new ExecutionDataflowBlockOptions() { CancellationToken = cts.Token });
tb.Post(1);
cts.Cancel();
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => tb.Completion);
+ await AssertExtensions.CanceledAsync(cts.Token, tb.Completion);
Assert.Equal(expected: 0, actual: tb.InputCount);
Assert.Equal(expected: 0, actual: tb.OutputCount);
[Fact]
public async Task TestPrecanceled()
{
+ var cts = new CancellationTokenSource();
+ cts.Cancel();
+
var bb = new TransformManyBlock<int, int>(DataflowTestHelpers.ToEnumerable,
- new ExecutionDataflowBlockOptions { CancellationToken = new CancellationToken(canceled: true) });
+ new ExecutionDataflowBlockOptions { CancellationToken = cts.Token });
int ignoredValue;
IList<int> ignoredValues;
Assert.False(bb.TryReceive(out ignoredValue));
Assert.NotNull(bb.Completion);
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => bb.Completion);
+ await AssertExtensions.CanceledAsync(cts.Token, bb.Completion);
bb.Complete(); // just make sure it doesn't throw
}
else
{
cts.Cancel();
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => tb.Completion);
+ await AssertExtensions.CanceledAsync(cts.Token, tb.Completion);
}
Assert.Equal(expected: 0, actual: tb.InputCount);
Assert.False(((IReceivableSourceBlock<int>)wob).TryReceiveAll(out ignoredValues));
Assert.NotNull(wob.Completion);
- await Assert.ThrowsAnyAsync<OperationCanceledException>(() => wob.Completion);
+ await AssertExtensions.CanceledAsync(cts.Token, wob.Completion);
}
}
void AssertCanceled(Task t)
{
Assert.True(t.IsCanceled);
- var oce = Assert.ThrowsAny<OperationCanceledException>(() => t.GetAwaiter().GetResult());
- Assert.Equal(cts.Token, oce.CancellationToken);
+ AssertExtensions.CanceledAsync(cts.Token, t).GetAwaiter().GetResult();
}
Func<int, CancellationToken, ValueTask> body = (item, cancellationToken) =>