Propagate cancellation tokens to TrySetCanceled in Dataflow (#80978)
authorStephen Toub <stoub@microsoft.com>
Sat, 28 Jan 2023 16:04:49 +0000 (11:04 -0500)
committerGitHub <noreply@github.com>
Sat, 28 Jan 2023 16:04:49 +0000 (11:04 -0500)
* Propagate cancellation tokens to TrySetCanceled in Dataflow

When the System.Threading.Tasks.Dataflow library was originally written, CancellationTokenSource's TrySetCanceled didn't have an overload that allowed passing in the CancellationToken that was the cause of the cancellation. Now it does, and we no longer build for target platforms that lack the needed overload.  Thus we can update the library to propagate it everywhere that's relevant.  In some cases, to do this well we do need to rely on a newer CancellationToken.Register overload that accepts a delegate which accepts a token, so there's a little bit of ifdef'ing involved still.

While doing this, I also took the opportunity to sprinkle some `static`s onto lambdas, since I was already doing so for some lambdas as part of this fix.

* Fix pipelines handling of cancellation token

Flush operations were synchronously throwing an exception if cancellation was requested prior to the operation.  Cancellation exceptions should always be propagated out through the returned task.

* Fix pre-cancellation in QuicStream.WriteAsync

Cancellation exceptions should flow out through the returned task, not synchronously.

39 files changed:
src/libraries/Common/tests/StreamConformanceTests/System/IO/StreamConformanceTests.cs
src/libraries/Common/tests/TestUtilities/System/AssertExtensions.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeAwaitable.cs
src/libraries/System.IO.Pipelines/tests/FlushAsyncCancellationTests.cs
src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs
src/libraries/System.Threading.Channels/tests/BoundedChannelTests.cs
src/libraries/System.Threading.Channels/tests/ChannelTestBase.cs
src/libraries/System.Threading.Channels/tests/TestBase.cs
src/libraries/System.Threading.Tasks.Dataflow/src/Base/DataflowBlock.cs
src/libraries/System.Threading.Tasks.Dataflow/src/Blocks/ActionBlock.cs
src/libraries/System.Threading.Tasks.Dataflow/src/Blocks/BatchBlock.cs
src/libraries/System.Threading.Tasks.Dataflow/src/Blocks/BatchedJoinBlock.cs
src/libraries/System.Threading.Tasks.Dataflow/src/Blocks/BroadcastBlock.cs
src/libraries/System.Threading.Tasks.Dataflow/src/Blocks/BufferBlock.cs
src/libraries/System.Threading.Tasks.Dataflow/src/Blocks/JoinBlock.cs
src/libraries/System.Threading.Tasks.Dataflow/src/Blocks/TransformBlock.cs
src/libraries/System.Threading.Tasks.Dataflow/src/Blocks/TransformManyBlock.IAsyncEnumerable.cs
src/libraries/System.Threading.Tasks.Dataflow/src/Blocks/TransformManyBlock.cs
src/libraries/System.Threading.Tasks.Dataflow/src/Blocks/WriteOnceBlock.cs
src/libraries/System.Threading.Tasks.Dataflow/src/Internal/Common.cs
src/libraries/System.Threading.Tasks.Dataflow/src/Internal/DataflowEtwProvider.cs
src/libraries/System.Threading.Tasks.Dataflow/src/Internal/SourceCore.cs
src/libraries/System.Threading.Tasks.Dataflow/src/Internal/SpscTargetCore.cs
src/libraries/System.Threading.Tasks.Dataflow/src/Internal/TargetCore.cs
src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/ActionBlockTests.cs
src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/BatchBlockTests.cs
src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/BatchedJoinBlockTests.cs
src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/BroadcastBlockTests.cs
src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/BufferBlockTests.cs
src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/DataflowBlockExtensionTests.IAsyncEnumerable.cs
src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/DataflowBlockExtensionTests.cs
src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/DataflowTestHelper.cs
src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/JoinBlockTests.cs
src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/TransformBlockTests.cs
src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/TransformManyBlockTests.IAsyncEnumerable.cs
src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/TransformManyBlockTests.cs
src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/WriteOnceBlockTests.cs
src/libraries/System.Threading.Tasks.Parallel/tests/ParallelForEachAsyncTests.cs

index 52be9c0..2c7627d 100644 (file)
@@ -495,15 +495,6 @@ namespace System.IO.Tests
             }
         }
 
-        protected async Task AssertCanceledAsync(CancellationToken cancellationToken, Func<Task> testCode)
-        {
-            OperationCanceledException oce = await Assert.ThrowsAnyAsync<OperationCanceledException>(testCode);
-            if (cancellationToken.CanBeCanceled)
-            {
-                Assert.Equal(cancellationToken, oce.CancellationToken);
-            }
-        }
-
         protected async Task ValidatePrecanceledOperations_ThrowsCancellationException(Stream stream)
         {
             var cts = new CancellationTokenSource();
@@ -511,14 +502,14 @@ namespace System.IO.Tests
 
             if (stream.CanRead)
             {
-                await AssertCanceledAsync(cts.Token, () => stream.ReadAsync(new byte[1], 0, 1, cts.Token));
-                await AssertCanceledAsync(cts.Token, async () => { await stream.ReadAsync(new Memory<byte>(new byte[1]), cts.Token); });
+                await AssertExtensions.CanceledAsync(cts.Token, stream.ReadAsync(new byte[1], 0, 1, cts.Token));
+                await AssertExtensions.CanceledAsync(cts.Token, async () => { await stream.ReadAsync(new Memory<byte>(new byte[1]), cts.Token); });
             }
 
             if (stream.CanWrite)
             {
-                await AssertCanceledAsync(cts.Token, () => stream.WriteAsync(new byte[1], 0, 1, cts.Token));
-                await AssertCanceledAsync(cts.Token, async () => { await stream.WriteAsync(new ReadOnlyMemory<byte>(new byte[1]), cts.Token); });
+                await AssertExtensions.CanceledAsync(cts.Token, stream.WriteAsync(new byte[1], 0, 1, cts.Token));
+                await AssertExtensions.CanceledAsync(cts.Token, async () => { await stream.WriteAsync(new ReadOnlyMemory<byte>(new byte[1]), cts.Token); });
             }
 
             Exception e = await Record.ExceptionAsync(() => stream.FlushAsync(cts.Token));
@@ -540,7 +531,7 @@ namespace System.IO.Tests
             Task<int> t = stream.ReadAsync(new byte[1], 0, 1, cts.Token);
 
             cts.CancelAfter(cancellationDelay);
-            await AssertCanceledAsync(cts.Token, () => t);
+            await AssertExtensions.CanceledAsync(cts.Token, t);
         }
 
         protected async Task ValidateCancelableReadAsyncValueTask_AfterInvocation_ThrowsCancellationException(Stream stream, int cancellationDelay)
@@ -555,7 +546,7 @@ namespace System.IO.Tests
             Task<int> t = stream.ReadAsync(new byte[1], cts.Token).AsTask();
 
             cts.CancelAfter(cancellationDelay);
-            await AssertCanceledAsync(cts.Token, () => t);
+            await AssertExtensions.CanceledAsync(cts.Token, t);
         }
 
         protected async Task WhenAllOrAnyFailed(Task task1, Task task2)
@@ -2584,18 +2575,18 @@ namespace System.IO.Tests
 
                 cts = new CancellationTokenSource();
                 cts.Cancel();
-                await AssertCanceledAsync(cts.Token, () => readable.ReadAsync(new byte[1], 0, 1, cts.Token));
-                await AssertCanceledAsync(cts.Token, async () => { await readable.ReadAsync(new Memory<byte>(new byte[1]), cts.Token); });
+                await AssertExtensions.CanceledAsync(cts.Token, readable.ReadAsync(new byte[1], 0, 1, cts.Token));
+                await AssertExtensions.CanceledAsync(cts.Token, async () => { await readable.ReadAsync(new Memory<byte>(new byte[1]), cts.Token); });
 
                 cts = new CancellationTokenSource();
                 Task<int> t = readable.ReadAsync(new byte[1], 0, 1, cts.Token);
                 cts.Cancel();
-                await AssertCanceledAsync(cts.Token, () => t);
+                await AssertExtensions.CanceledAsync(cts.Token, t);
 
                 cts = new CancellationTokenSource();
                 ValueTask<int> vt = readable.ReadAsync(new Memory<byte>(new byte[1]), cts.Token);
                 cts.Cancel();
-                await AssertCanceledAsync(cts.Token, async () => await vt);
+                await AssertExtensions.CanceledAsync(cts.Token, vt.AsTask());
 
                 byte[] buffer = new byte[1];
                 vt = readable.ReadAsync(new Memory<byte>(buffer));
index da079a6..479d2ae 100644 (file)
@@ -4,6 +4,7 @@
 using System.Collections.Generic;
 using System.Linq;
 using System.Runtime.InteropServices;
+using System.Threading;
 using System.Threading.Tasks;
 using Xunit;
 using Xunit.Sdk;
@@ -243,6 +244,30 @@ namespace System
             }
         }
 
+        public static void Canceled(CancellationToken cancellationToken, Action testCode)
+        {
+            OperationCanceledException oce = Assert.ThrowsAny<OperationCanceledException>(testCode);
+            if (cancellationToken.CanBeCanceled)
+            {
+                Assert.Equal(cancellationToken, oce.CancellationToken);
+            }
+        }
+
+        public static Task CanceledAsync(CancellationToken cancellationToken, Task task)
+        {
+            Assert.NotNull(task);
+            return CanceledAsync(cancellationToken, () => task);
+        }
+
+        public static async Task CanceledAsync(CancellationToken cancellationToken, Func<Task> testCode)
+        {
+            OperationCanceledException oce = await Assert.ThrowsAnyAsync<OperationCanceledException>(testCode);
+            if (cancellationToken.CanBeCanceled)
+            {
+                Assert.Equal(cancellationToken, oce.CancellationToken);
+            }
+        }
+
         private static string AddOptionalUserMessage(string message, string userMessage)
         {
             if (userMessage == null)
index 85dbc33..07868d6 100644 (file)
@@ -362,6 +362,11 @@ namespace System.IO.Pipelines
 
         internal ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken)
         {
+            if (cancellationToken.IsCancellationRequested)
+            {
+                return new ValueTask<FlushResult>(Task.FromCanceled<FlushResult>(cancellationToken));
+            }
+
             CompletionData completionData;
             ValueTask<FlushResult> result;
             lock (SyncObj)
@@ -1058,6 +1063,11 @@ namespace System.IO.Pipelines
                 return new ValueTask<FlushResult>(new FlushResult(isCanceled: false, isCompleted: true));
             }
 
+            if (cancellationToken.IsCancellationRequested)
+            {
+                return new ValueTask<FlushResult>(Task.FromCanceled<FlushResult>(cancellationToken));
+            }
+
             CompletionData completionData;
             ValueTask<FlushResult> result;
 
index aba52f1..b6159a9 100644 (file)
@@ -45,8 +45,6 @@ namespace System.IO.Pipelines
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public void BeginOperation(CancellationToken cancellationToken, Action<object?> callback, object? state)
         {
-            cancellationToken.ThrowIfCancellationRequested();
-
             // Don't register if already completed, we would immediately unregistered in ObserveCancellation
             if (cancellationToken.CanBeCanceled && !IsCompleted)
             {
index 8b49b88..a5a6157 100644 (file)
@@ -217,12 +217,14 @@ namespace System.IO.Pipelines.Tests
         }
 
         [Fact]
-        public void FlushAsyncThrowsIfPassedCanceledCancellationToken()
+        public async Task FlushAsyncThrowsIfPassedCanceledCancellationToken()
         {
             var cancellationTokenSource = new CancellationTokenSource();
             cancellationTokenSource.Cancel();
 
-            Assert.Throws<OperationCanceledException>(() => Pipe.Writer.FlushAsync(cancellationTokenSource.Token));
+            ValueTask<FlushResult> task = Pipe.Writer.FlushAsync(cancellationTokenSource.Token);
+            Assert.True(task.IsCanceled);
+            await AssertExtensions.CanceledAsync(cancellationTokenSource.Token, async () => await task);
         }
 
         [Fact]
@@ -318,7 +320,7 @@ namespace System.IO.Pipelines.Tests
             // and not only setting IsCompleted flag
             var task = Pipe.Reader.ReadAsync().AsTask();
 
-            await Assert.ThrowsAsync<OperationCanceledException>(async () => await Pipe.Writer.FlushAsync(cancellationTokenSource.Token));
+            await AssertExtensions.CanceledAsync(cancellationTokenSource.Token, async () => await Pipe.Writer.FlushAsync(cancellationTokenSource.Token));
 
             Pipe.Writer.Complete();
 
index 4a33824..4de2f95 100644 (file)
@@ -349,11 +349,11 @@ public sealed partial class QuicStream
             NetEventSource.Info(this, $"{this} Stream writing memory of '{buffer.Length}' bytes while {(completeWrites ? "completing" : "not completing")} writes.");
         }
 
-        if (_sendTcs.IsCompleted)
+        if (_sendTcs.IsCompleted && cancellationToken.IsCancellationRequested)
         {
             // Special case exception type for pre-canceled token while we've already transitioned to a final state and don't need to abort write.
             // It must happen before we try to get the value task, since the task source is versioned and each instance must be awaited.
-            cancellationToken.ThrowIfCancellationRequested();
+            return ValueTask.FromCanceled(cancellationToken);
         }
 
         // Concurrent call, this one lost the race.
index d5b6c09..62e5643 100644 (file)
@@ -480,8 +480,8 @@ namespace System.Threading.Channels.Tests
 
             var cts = new CancellationTokenSource();
 
-            Task write1 = c.Writer.WriteAsync(43, cts.Token).AsTask();
-            Assert.Equal(TaskStatus.WaitingForActivation, write1.Status);
+            ValueTask write1 = c.Writer.WriteAsync(43, cts.Token);
+            Assert.False(write1.IsCompleted);
 
             cts.Cancel();
 
@@ -490,7 +490,7 @@ namespace System.Threading.Channels.Tests
             Assert.Equal(42, await c.Reader.ReadAsync());
             Assert.Equal(44, await c.Reader.ReadAsync());
 
-            await AssertCanceled(write1, cts.Token);
+            await AssertExtensions.CanceledAsync(cts.Token, async () => await write1);
             await write2;
         }
 
index f746b48..25c23c9 100644 (file)
@@ -131,7 +131,7 @@ namespace System.Threading.Channels.Tests
             var cts = new CancellationTokenSource();
             cts.Cancel();
             Assert.True(c.Writer.TryComplete(new OperationCanceledException(cts.Token)));
-            await AssertCanceled(c.Reader.Completion, cts.Token);
+            await AssertExtensions.CanceledAsync(cts.Token, c.Reader.Completion);
         }
 
         [Fact]
@@ -450,7 +450,7 @@ namespace System.Threading.Channels.Tests
             catch (Exception e) { exc = e; }
 
             c.Writer.Complete(exc);
-            await AssertCanceled(c.Reader.Completion, cts.Token);
+            await AssertExtensions.CanceledAsync(cts.Token, c.Reader.Completion);
         }
 
         [Fact]
@@ -653,7 +653,7 @@ namespace System.Threading.Channels.Tests
 
             cts.Cancel();
 
-            await AssertCanceled(r.AsTask(), cts.Token);
+            await AssertExtensions.CanceledAsync(cts.Token, async () => await r);
 
             if (c.Writer.TryWrite(42))
             {
@@ -760,7 +760,7 @@ namespace System.Threading.Channels.Tests
                 var cts = new CancellationTokenSource();
                 ValueTask<int> r = c.Reader.ReadAsync(cts.Token);
                 cts.Cancel();
-                await AssertCanceled(r.AsTask(), cts.Token);
+                await AssertExtensions.CanceledAsync(cts.Token, async () => await r);
             }
 
             for (int i = 0; i < 7; i++)
index 7783341..1787344 100644 (file)
@@ -21,12 +21,6 @@ namespace System.Threading.Channels.Tests
             }
         }
 
-        protected async Task AssertCanceled(Task task, CancellationToken token)
-        {
-            await Assert.ThrowsAnyAsync<OperationCanceledException>(() => task);
-            AssertSynchronouslyCanceled(task, token);
-        }
-
         protected void AssertSynchronousSuccess<T>(ValueTask<T> task) => Assert.True(task.IsCompletedSuccessfully);
         protected void AssertSynchronousSuccess(ValueTask task) => Assert.True(task.IsCompletedSuccessfully);
         protected void AssertSynchronousSuccess(Task task) => Assert.Equal(TaskStatus.RanToCompletion, task.Status);
index fc3ea8c..18b7d40 100644 (file)
@@ -495,7 +495,11 @@ namespace System.Threading.Tasks.Dataflow
             {
                 RunCompletionAction(state =>
                 {
-                    try { ((SendAsyncSource<TOutput>)state!).TrySetCanceled(); }
+                    SendAsyncSource<TOutput> source = (SendAsyncSource<TOutput>)state!;
+                    try
+                    {
+                        source.TrySetCanceled(source._cancellationToken);
+                    }
                     catch (ObjectDisposedException) { }
                 }, this, runAsync);
             }
@@ -1056,7 +1060,7 @@ namespace System.Threading.Tasks.Dataflow
         }
 
         /// <summary>Cancels a CancellationTokenSource passed as the object state argument.</summary>
-        private static readonly Action<object?> _cancelCts = state => ((CancellationTokenSource)state!).Cancel();
+        private static readonly Action<object?> _cancelCts = static state => ((CancellationTokenSource)state!).Cancel();
 
         /// <summary>Receives an item from the source by linking a temporary target from it.</summary>
         /// <typeparam name="TOutput">Specifies the type of data contained in the source.</typeparam>
@@ -1316,7 +1320,7 @@ namespace System.Threading.Tasks.Dataflow
                 {
                     // Task final state: RanToCompletion
                     case ReceiveCoreByLinkingCleanupReason.Success:
-                        System.Threading.Tasks.Task.Factory.StartNew(state =>
+                        System.Threading.Tasks.Task.Factory.StartNew(static state =>
                         {
                             // Complete with the received value
                             var target = (ReceiveTarget<T>)state!;
@@ -1327,7 +1331,7 @@ namespace System.Threading.Tasks.Dataflow
 
                     // Task final state: Canceled
                     case ReceiveCoreByLinkingCleanupReason.Cancellation:
-                        System.Threading.Tasks.Task.Factory.StartNew(state =>
+                        System.Threading.Tasks.Task.Factory.StartNew(static state =>
                         {
                             // Complete as canceled
                             var target = (ReceiveTarget<T>)state!;
@@ -1465,17 +1469,16 @@ namespace System.Threading.Tasks.Dataflow
                 if (cancellationToken.CanBeCanceled)
                 {
                     // When cancellation is requested, unlink the target from the source and cancel the target.
-                    target._ctr = cancellationToken.Register(OutputAvailableAsyncTarget<TOutput>.s_cancelAndUnlink, target);
+                    target._ctr = cancellationToken.Register(
+#if NET6_0_OR_GREATER
+                        OutputAvailableAsyncTarget<TOutput>.CancelAndUnlink,
+#else
+                        static state => OutputAvailableAsyncTarget<TOutput>.CancelAndUnlink(state, default),
+#endif
+                        target);
                 }
 
-                // We can't return the task directly, as the source block will be completing the task synchronously,
-                // and thus any synchronous continuations would run as part of the source block's call.  We don't have to worry
-                // about cancellation, as we've coded cancellation to complete the task asynchronously, and with the continuation
-                // set as NotOnCanceled, so the continuation will be canceled immediately when the antecedent is canceled, which
-                // will thus be asynchronously from the cancellation token source's cancellation call.
-                return target.Task.ContinueWith(
-                    OutputAvailableAsyncTarget<TOutput>.s_handleCompletion, target,
-                    CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.NotOnCanceled, TaskScheduler.Default);
+                return target.Task;
             }
             catch (Exception exc)
             {
@@ -1496,6 +1499,11 @@ namespace System.Threading.Tasks.Dataflow
         [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
         private sealed class OutputAvailableAsyncTarget<T> : TaskCompletionSource<bool>, ITargetBlock<T>, IDebuggerDisplay
         {
+            public OutputAvailableAsyncTarget() :
+                base(TaskCreationOptions.RunContinuationsAsynchronously)
+            {
+            }
+
             /// <summary>
             /// Cached continuation delegate that unregisters from cancellation and
             /// marshals the antecedent's result to the return value.
@@ -1508,29 +1516,16 @@ namespace System.Threading.Tasks.Dataflow
                 return antecedent.GetAwaiter().GetResult();
             };
 
-            /// <summary>
-            /// Cached delegate that cancels the target and unlinks the target from the source.
-            /// Expects an OutputAvailableAsyncTarget as the state argument.
-            /// </summary>
-            internal static readonly Action<object?> s_cancelAndUnlink = CancelAndUnlink;
-
             /// <summary>Cancels the target and unlinks the target from the source.</summary>
             /// <param name="state">An OutputAvailableAsyncTarget.</param>
-            private static void CancelAndUnlink(object? state)
+            /// <param name="cancellationToken">The token that triggered cancellation</param>
+            internal static void CancelAndUnlink(object? state, CancellationToken cancellationToken)
             {
                 var target = state as OutputAvailableAsyncTarget<T>;
                 Debug.Assert(target != null, "Expected a non-null target");
 
-                // Cancel asynchronously so that we're not completing the task as part of the cts.Cancel() call,
-                // since synchronous continuations off that task would then run as part of Cancel.
-                // Take advantage of this task and unlink from there to avoid doing the interlocked operation synchronously.
-                System.Threading.Tasks.Task.Factory.StartNew(tgt =>
-                                                            {
-                                                                var thisTarget = (OutputAvailableAsyncTarget<T>)tgt!;
-                                                                thisTarget.TrySetCanceled();
-                                                                thisTarget.AttemptThreadSafeUnlink();
-                                                            },
-                    target, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
+                target.TrySetCanceled(cancellationToken);
+                target.AttemptThreadSafeUnlink();
             }
 
             /// <summary>Disposes of _unlinker if the target has been linked.</summary>
@@ -1585,7 +1580,7 @@ namespace System.Threading.Tasks.Dataflow
             /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
             object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
         }
-        #endregion
+#endregion
 
         #region Encapsulate
         /// <summary>Encapsulates a target and a source into a single propagator.</summary>
@@ -2120,7 +2115,7 @@ namespace System.Threading.Tasks.Dataflow
                 }
                 else
                 {
-                    result.TrySetCanceled();
+                    result.TrySetCanceled(dataflowBlockOptions.CancellationToken);
                 }
 
                 // By now we know that all of the tasks have completed, so there
@@ -2231,10 +2226,10 @@ namespace System.Threading.Tasks.Dataflow
                 // Handle async cancellation by canceling the target without storing it into _completed.
                 // _completed must only be set to a RanToCompletion task for a successful branch.
                 Common.WireCancellationToComplete(cancellationToken, base.Task,
-                    state =>
+                    static (state, cancellationToken) =>
                     {
                         var thisChooseTarget = (ChooseTarget<T>)state!;
-                        lock (thisChooseTarget._completed) thisChooseTarget.TrySetCanceled();
+                        lock (thisChooseTarget._completed) thisChooseTarget.TrySetCanceled(cancellationToken);
                     }, this);
             }
 
@@ -2326,7 +2321,7 @@ namespace System.Threading.Tasks.Dataflow
             internal static IObservable<TOutput> From(ISourceBlock<TOutput> source)
             {
                 Debug.Assert(source != null, "Requires a source for which to retrieve the observable.");
-                return _table.GetValue(source, s => new SourceObservable<TOutput>(s));
+                return _table.GetValue(source, static s => new SourceObservable<TOutput>(s));
             }
 
             /// <summary>Object used to synchronize all subscriptions, unsubscriptions, and propagations.</summary>
@@ -2403,7 +2398,7 @@ namespace System.Threading.Tasks.Dataflow
 
                         // Return a disposable that will unlink this observer, and if it's the last
                         // observer for the source, shut off the pipe to observers.
-                        return Disposables.Create((s, o) => s.Unsubscribe(o), this, observer);
+                        return Disposables.Create(static (s, o) => s.Unsubscribe(o), this, observer);
                     }
                 }
 
@@ -2529,7 +2524,7 @@ namespace System.Threading.Tasks.Dataflow
                     // If the target block fails due to an unexpected exception (e.g. it calls back to the source and the source throws an error),
                     // we fault currently registered observers and reset the observable.
                     Target.Completion.ContinueWith(
-                        (t, state) => ((ObserversState)state!).NotifyObserversOfCompletion(t.Exception!), this,
+                        static (t, state) => ((ObserversState)state!).NotifyObserversOfCompletion(t.Exception!), this,
                         CancellationToken.None,
                         Common.GetContinuationOptions(TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously),
                         TaskScheduler.Default);
@@ -2537,12 +2532,12 @@ namespace System.Threading.Tasks.Dataflow
                     // When the source completes, complete the target. Then when the target completes,
                     // send completion messages to any observers still registered.
                     Task? sourceCompletionTask = Common.GetPotentiallyNotSupportedCompletionTask(Observable._source);
-                    sourceCompletionTask?.ContinueWith((_1, state1) =>
+                    sourceCompletionTask?.ContinueWith(static (_1, state1) =>
                     {
                         var ti = (ObserversState)state1!;
                         ti.Target.Complete();
                         ti.Target.Completion.ContinueWith(
-                            (_2, state2) => ((ObserversState)state2!).NotifyObserversOfCompletion(), state1,
+                            static (_2, state2) => ((ObserversState)state2!).NotifyObserversOfCompletion(), state1,
                             CancellationToken.None,
                             Common.GetContinuationOptions(TaskContinuationOptions.NotOnFaulted | TaskContinuationOptions.ExecuteSynchronously),
                             TaskScheduler.Default);
@@ -2782,7 +2777,7 @@ namespace System.Threading.Tasks.Dataflow
             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
             Task IDataflowBlock.Completion
             {
-                get { return LazyInitializer.EnsureInitialized(ref _completion, () => new TaskCompletionSource<VoidResult>().Task); }
+                get { return LazyInitializer.EnsureInitialized(ref _completion, static () => new TaskCompletionSource<VoidResult>().Task); }
             }
         }
         #endregion
index b41661a..53b6641 100644 (file)
@@ -117,7 +117,7 @@ namespace System.Threading.Tasks.Dataflow
 
                 // Handle async cancellation requests by declining on the target
                 Common.WireCancellationToComplete(
-                    dataflowBlockOptions.CancellationToken, Completion, state => ((TargetCore<TInput>)state!).Complete(exception: null, dropPendingMessages: true), _defaultTarget);
+                    dataflowBlockOptions.CancellationToken, Completion, static (state, _) => ((TargetCore<TInput>)state!).Complete(exception: null, dropPendingMessages: true), _defaultTarget);
             }
             DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
             if (etwLog.IsEnabled())
@@ -190,7 +190,7 @@ namespace System.Threading.Tasks.Dataflow
             else
             {
                 // Otherwise, join with the asynchronous operation when it completes.
-                task.ContinueWith((completed, state) =>
+                task.ContinueWith(static (completed, state) =>
                 {
                     ((ActionBlock<TInput>)state!).AsyncCompleteProcessMessageWithTask(completed);
                 }, this, CancellationToken.None, Common.GetContinuationOptions(TaskContinuationOptions.ExecuteSynchronously), TaskScheduler.Default);
index 05ca506..6f26215 100644 (file)
@@ -58,13 +58,13 @@ namespace System.Threading.Tasks.Dataflow
             Func<ISourceBlock<T[]>, T[], IList<T[]>?, int>? itemCountingFunc = null;
             if (dataflowBlockOptions.BoundedCapacity > 0)
             {
-                onItemsRemoved = (owningSource, count) => ((BatchBlock<T>)owningSource)._target.OnItemsRemoved(count);
-                itemCountingFunc = (owningSource, singleOutputItem, multipleOutputItems) => BatchBlockTargetCore.CountItems(singleOutputItem, multipleOutputItems);
+                onItemsRemoved = static (owningSource, count) => ((BatchBlock<T>)owningSource)._target.OnItemsRemoved(count);
+                itemCountingFunc = static (owningSource, singleOutputItem, multipleOutputItems) => BatchBlockTargetCore.CountItems(singleOutputItem, multipleOutputItems);
             }
 
             // Initialize source
             _source = new SourceCore<T[]>(this, dataflowBlockOptions,
-                owningSource => ((BatchBlock<T>)owningSource)._target.Complete(exception: null, dropPendingMessages: true, releaseReservedMessages: false),
+                static owningSource => ((BatchBlock<T>)owningSource)._target.Complete(exception: null, dropPendingMessages: true, releaseReservedMessages: false),
                 onItemsRemoved, itemCountingFunc);
 
             // Initialize target
@@ -78,7 +78,7 @@ namespace System.Threading.Tasks.Dataflow
             // In those cases we need to fault the target half to drop its buffered messages and to release its
             // reservations. This should not create an infinite loop, because all our implementations are designed
             // to handle multiple completion requests and to carry over only one.
-            _source.Completion.ContinueWith((completed, state) =>
+            _source.Completion.ContinueWith(static (completed, state) =>
             {
                 var thisBlock = ((BatchBlock<T>)state!) as IDataflowBlock;
                 Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
@@ -87,7 +87,7 @@ namespace System.Threading.Tasks.Dataflow
 
             // Handle async cancellation requests by declining on the target
             Common.WireCancellationToComplete(
-                dataflowBlockOptions.CancellationToken, _source.Completion, state => ((BatchBlockTargetCore)state!).Complete(exception: null, dropPendingMessages: true, releaseReservedMessages: false), _target);
+                dataflowBlockOptions.CancellationToken, _source.Completion, static (state, _) => ((BatchBlockTargetCore)state!).Complete(exception: null, dropPendingMessages: true, releaseReservedMessages: false), _target);
             DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
             if (etwLog.IsEnabled())
             {
@@ -638,7 +638,7 @@ namespace System.Threading.Tasks.Dataflow
 
                 // Create task and store into _taskForInputProcessing prior to scheduling the task
                 // so that _taskForInputProcessing will be visibly set in the task loop.
-                _nonGreedyState!.TaskForInputProcessing = new Task(thisBatchTarget => ((BatchBlockTargetCore)thisBatchTarget!).ProcessMessagesLoopCore(), this,
+                _nonGreedyState!.TaskForInputProcessing = new Task(static thisBatchTarget => ((BatchBlockTargetCore)thisBatchTarget!).ProcessMessagesLoopCore(), this,
                                                     Common.GetCreationOptionsForTask(isReplacementReplica));
 
                 DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
index 708e4da..891e632 100644 (file)
@@ -65,7 +65,7 @@ namespace System.Threading.Tasks.Dataflow
 
             // Configure the source
             _source = new SourceCore<Tuple<IList<T1>, IList<T2>>>(
-                this, dataflowBlockOptions, owningSource => ((BatchedJoinBlock<T1, T2>)owningSource).CompleteEachTarget());
+                this, dataflowBlockOptions, static owningSource => ((BatchedJoinBlock<T1, T2>)owningSource).CompleteEachTarget());
 
             // The action to run when a batch should be created.  This is typically called
             // when we have a full batch, but it will also be called when we're done receiving
@@ -96,7 +96,7 @@ namespace System.Threading.Tasks.Dataflow
             // In those cases we need to fault the target half to drop its buffered messages and to release its
             // reservations. This should not create an infinite loop, because all our implementations are designed
             // to handle multiple completion requests and to carry over only one.
-            _source.Completion.ContinueWith((completed, state) =>
+            _source.Completion.ContinueWith(static (completed, state) =>
             {
                 var thisBlock = ((BatchedJoinBlock<T1, T2>)state!) as IDataflowBlock;
                 Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
@@ -105,7 +105,7 @@ namespace System.Threading.Tasks.Dataflow
 
             // Handle async cancellation requests by declining on the target
             Common.WireCancellationToComplete(
-                dataflowBlockOptions.CancellationToken, _source.Completion, state => ((BatchedJoinBlock<T1, T2>)state!).CompleteEachTarget(), this);
+                dataflowBlockOptions.CancellationToken, _source.Completion, static (state, _) => ((BatchedJoinBlock<T1, T2>)state!).CompleteEachTarget(), this);
             DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
             if (etwLog.IsEnabled())
             {
@@ -316,7 +316,7 @@ namespace System.Threading.Tasks.Dataflow
 
             // Configure the source
             _source = new SourceCore<Tuple<IList<T1>, IList<T2>, IList<T3>>>(
-                this, dataflowBlockOptions, owningSource => ((BatchedJoinBlock<T1, T2, T3>)owningSource).CompleteEachTarget());
+                this, dataflowBlockOptions, static owningSource => ((BatchedJoinBlock<T1, T2, T3>)owningSource).CompleteEachTarget());
 
             // The action to run when a batch should be created.  This is typically called
             // when we have a full batch, but it will also be called when we're done receiving
@@ -348,7 +348,7 @@ namespace System.Threading.Tasks.Dataflow
             // In those cases we need to fault the target half to drop its buffered messages and to release its
             // reservations. This should not create an infinite loop, because all our implementations are designed
             // to handle multiple completion requests and to carry over only one.
-            _source.Completion.ContinueWith((completed, state) =>
+            _source.Completion.ContinueWith(static (completed, state) =>
             {
                 var thisBlock = ((BatchedJoinBlock<T1, T2, T3>)state!) as IDataflowBlock;
                 Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
@@ -357,7 +357,7 @@ namespace System.Threading.Tasks.Dataflow
 
             // Handle async cancellation requests by declining on the target
             Common.WireCancellationToComplete(
-                dataflowBlockOptions.CancellationToken, _source.Completion, state => ((BatchedJoinBlock<T1, T2, T3>)state!).CompleteEachTarget(), this);
+                dataflowBlockOptions.CancellationToken, _source.Completion, static (state, _) => ((BatchedJoinBlock<T1, T2, T3>)state!).CompleteEachTarget(), this);
             DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
             if (etwLog.IsEnabled())
             {
index f4f8d33..ae52fe0 100644 (file)
@@ -88,7 +88,7 @@ namespace System.Threading.Tasks.Dataflow
             // In those cases we need to fault the target half to drop its buffered messages and to release its
             // reservations. This should not create an infinite loop, because all our implementations are designed
             // to handle multiple completion requests and to carry over only one.
-            _source.Completion.ContinueWith((completed, state) =>
+            _source.Completion.ContinueWith(static (completed, state) =>
             {
                 var thisBlock = ((BroadcastBlock<T>)state!) as IDataflowBlock;
                 Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
@@ -97,7 +97,7 @@ namespace System.Threading.Tasks.Dataflow
 
             // Handle async cancellation requests by declining on the target
             Common.WireCancellationToComplete(
-                dataflowBlockOptions.CancellationToken, _source.Completion, state => ((BroadcastBlock<T>)state!).Complete(), this);
+                dataflowBlockOptions.CancellationToken, _source.Completion, static (state, _) => ((BroadcastBlock<T>)state!).Complete(), this);
             DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
             if (etwLog.IsEnabled())
             {
@@ -257,7 +257,7 @@ namespace System.Threading.Tasks.Dataflow
                 // Create task and store into _taskForInputProcessing prior to scheduling the task
                 // so that _taskForInputProcessing will be visibly set in the task loop.
                 _boundingState.TaskForInputProcessing =
-                    new Task(state => ((BroadcastBlock<T>)state!).ConsumeMessagesLoopCore(), this,
+                    new Task(static state => ((BroadcastBlock<T>)state!).ConsumeMessagesLoopCore(), this,
                         Common.GetCreationOptionsForTask(isReplacementReplica));
 
                 DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
@@ -385,7 +385,7 @@ namespace System.Threading.Tasks.Dataflow
                 // which means calling back to the source, which means we need to escape the incoming lock.
                 if (_boundingState != null && _boundingState.PostponedMessages.Count > 0)
                 {
-                    Task.Factory.StartNew(state =>
+                    Task.Factory.StartNew(static state =>
                     {
                         var thisBroadcastBlock = (BroadcastBlock<T>)state!;
 
@@ -643,7 +643,7 @@ namespace System.Threading.Tasks.Dataflow
                     // However, now that _decliningPermanently has been set, the timing of
                     // CompleteBlockIfPossible doesn't matter, so we schedule it to run asynchronously
                     // and take the necessary locks in a situation where we're sure it won't cause a problem.
-                    Task.Factory.StartNew(state =>
+                    Task.Factory.StartNew(static state =>
                     {
                         var thisSourceCore = (BroadcastingSourceCore<TOutput>)state!;
                         lock (thisSourceCore.OutgoingLock)
@@ -834,7 +834,7 @@ namespace System.Threading.Tasks.Dataflow
                 {
                     // Create task and store into _taskForOutputProcessing prior to scheduling the task
                     // so that _taskForOutputProcessing will be visibly set in the task loop.
-                    _taskForOutputProcessing = new Task(thisSourceCore => ((BroadcastingSourceCore<TOutput>)thisSourceCore!).OfferMessagesLoopCore(), this,
+                    _taskForOutputProcessing = new Task(static thisSourceCore => ((BroadcastingSourceCore<TOutput>)thisSourceCore!).OfferMessagesLoopCore(), this,
                                                         Common.GetCreationOptionsForTask(isReplacementReplica));
 
                     DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
@@ -857,7 +857,7 @@ namespace System.Threading.Tasks.Dataflow
 
                         // Get out from under currently held locks - ValueLock is taken, but OutgoingLock may not be.
                         // Re-take the locks on a separate thread.
-                        Task.Factory.StartNew(state =>
+                        Task.Factory.StartNew(static state =>
                         {
                             var thisSourceCore = (BroadcastingSourceCore<TOutput>)state!;
                             lock (thisSourceCore.OutgoingLock)
@@ -949,7 +949,7 @@ namespace System.Threading.Tasks.Dataflow
                 _completionReserved = true;
 
                 // Run asynchronously to get out of the currently held locks
-                Task.Factory.StartNew(thisSourceCore => ((BroadcastingSourceCore<TOutput>)thisSourceCore!).CompleteBlockOncePossible(),
+                Task.Factory.StartNew(static thisSourceCore => ((BroadcastingSourceCore<TOutput>)thisSourceCore!).CompleteBlockOncePossible(),
                     this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
             }
 
@@ -988,7 +988,7 @@ namespace System.Threading.Tasks.Dataflow
                 // It's due to cancellation, finish in a canceled state
                 else if (_dataflowBlockOptions.CancellationToken.IsCancellationRequested)
                 {
-                    _completionTask.TrySetCanceled();
+                    _completionTask.TrySetCanceled(_dataflowBlockOptions.CancellationToken);
                 }
                 // Otherwise, finish in a successful state.
                 else
index 2b10be0..73c0f4a 100644 (file)
@@ -57,20 +57,20 @@ namespace System.Threading.Tasks.Dataflow
             Action<ISourceBlock<T>, int>? onItemsRemoved = null;
             if (dataflowBlockOptions.BoundedCapacity > 0)
             {
-                onItemsRemoved = (owningSource, count) => ((BufferBlock<T>)owningSource).OnItemsRemoved(count);
+                onItemsRemoved = static (owningSource, count) => ((BufferBlock<T>)owningSource).OnItemsRemoved(count);
                 _boundingState = new BoundingStateWithPostponedAndTask<T>(dataflowBlockOptions.BoundedCapacity);
             }
 
             // Initialize the source state
             _source = new SourceCore<T>(this, dataflowBlockOptions,
-                owningSource => ((BufferBlock<T>)owningSource).Complete(),
+                static owningSource => ((BufferBlock<T>)owningSource).Complete(),
                 onItemsRemoved);
 
             // It is possible that the source half may fault on its own, e.g. due to a task scheduler exception.
             // In those cases we need to fault the target half to drop its buffered messages and to release its
             // reservations. This should not create an infinite loop, because all our implementations are designed
             // to handle multiple completion requests and to carry over only one.
-            _source.Completion.ContinueWith((completed, state) =>
+            _source.Completion.ContinueWith(static (completed, state) =>
             {
                 var thisBlock = ((BufferBlock<T>)state!) as IDataflowBlock;
                 Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
@@ -79,7 +79,7 @@ namespace System.Threading.Tasks.Dataflow
 
             // Handle async cancellation requests by declining on the target
             Common.WireCancellationToComplete(
-                dataflowBlockOptions.CancellationToken, _source.Completion, owningSource => ((BufferBlock<T>)owningSource!).Complete(), this);
+                dataflowBlockOptions.CancellationToken, _source.Completion, static (owningSource, _) => ((BufferBlock<T>)owningSource!).Complete(), this);
             DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
             if (etwLog.IsEnabled())
             {
@@ -258,7 +258,7 @@ namespace System.Threading.Tasks.Dataflow
                 // Create task and store into _taskForInputProcessing prior to scheduling the task
                 // so that _taskForInputProcessing will be visibly set in the task loop.
                 _boundingState.TaskForInputProcessing =
-                    new Task(state => ((BufferBlock<T>)state!).ConsumeMessagesLoopCore(), this,
+                    new Task(static state => ((BufferBlock<T>)state!).ConsumeMessagesLoopCore(), this,
                         Common.GetCreationOptionsForTask(isReplacementReplica));
 
                 DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
@@ -388,7 +388,7 @@ namespace System.Threading.Tasks.Dataflow
                 // which means calling back to the source, which means we need to escape the incoming lock.
                 if (_boundingState != null && _boundingState.PostponedMessages.Count > 0)
                 {
-                    Task.Factory.StartNew(state =>
+                    Task.Factory.StartNew(static state =>
                     {
                         var thisBufferBlock = (BufferBlock<T>)state!;
 
index 7cb8ff8..d281ab7 100644 (file)
@@ -59,11 +59,11 @@ namespace System.Threading.Tasks.Dataflow
 
             // Initialize bounding state if necessary
             Action<ISourceBlock<Tuple<T1, T2>>, int>? onItemsRemoved = null;
-            if (dataflowBlockOptions.BoundedCapacity > 0) onItemsRemoved = (owningSource, count) => ((JoinBlock<T1, T2>)owningSource)._sharedResources.OnItemsRemoved(count);
+            if (dataflowBlockOptions.BoundedCapacity > 0) onItemsRemoved = static (owningSource, count) => ((JoinBlock<T1, T2>)owningSource)._sharedResources.OnItemsRemoved(count);
 
             // Configure the source
             _source = new SourceCore<Tuple<T1, T2>>(this, dataflowBlockOptions,
-                owningSource => ((JoinBlock<T1, T2>)owningSource)._sharedResources.CompleteEachTarget(),
+                static owningSource => ((JoinBlock<T1, T2>)owningSource)._sharedResources.CompleteEachTarget(),
                 onItemsRemoved);
 
             // Configure targets
@@ -92,7 +92,7 @@ namespace System.Threading.Tasks.Dataflow
             // In those cases we need to fault the target half to drop its buffered messages and to release its
             // reservations. This should not create an infinite loop, because all our implementations are designed
             // to handle multiple completion requests and to carry over only one.
-            _source.Completion.ContinueWith((completed, state) =>
+            _source.Completion.ContinueWith(static (completed, state) =>
             {
                 var thisBlock = ((JoinBlock<T1, T2>)state!) as IDataflowBlock;
                 Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
@@ -101,7 +101,7 @@ namespace System.Threading.Tasks.Dataflow
 
             // Handle async cancellation requests by declining on the target
             Common.WireCancellationToComplete(
-                dataflowBlockOptions.CancellationToken, _source.Completion, state => ((JoinBlock<T1, T2>)state!)._sharedResources.CompleteEachTarget(), this);
+                dataflowBlockOptions.CancellationToken, _source.Completion, static (state, _) => ((JoinBlock<T1, T2>)state!)._sharedResources.CompleteEachTarget(), this);
             DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
             if (etwLog.IsEnabled())
             {
@@ -286,11 +286,11 @@ namespace System.Threading.Tasks.Dataflow
 
             // Initialize bounding state if necessary
             Action<ISourceBlock<Tuple<T1, T2, T3>>, int>? onItemsRemoved = null;
-            if (dataflowBlockOptions.BoundedCapacity > 0) onItemsRemoved = (owningSource, count) => ((JoinBlock<T1, T2, T3>)owningSource)._sharedResources.OnItemsRemoved(count);
+            if (dataflowBlockOptions.BoundedCapacity > 0) onItemsRemoved = static (owningSource, count) => ((JoinBlock<T1, T2, T3>)owningSource)._sharedResources.OnItemsRemoved(count);
 
             // Configure the source
             _source = new SourceCore<Tuple<T1, T2, T3>>(this, dataflowBlockOptions,
-                owningSource => ((JoinBlock<T1, T2, T3>)owningSource)._sharedResources.CompleteEachTarget(),
+                static owningSource => ((JoinBlock<T1, T2, T3>)owningSource)._sharedResources.CompleteEachTarget(),
                 onItemsRemoved);
 
             // Configure the targets
@@ -317,7 +317,7 @@ namespace System.Threading.Tasks.Dataflow
             // In those cases we need to fault the target half to drop its buffered messages and to release its
             // reservations. This should not create an infinite loop, because all our implementations are designed
             // to handle multiple completion requests and to carry over only one.
-            _source.Completion.ContinueWith((completed, state) =>
+            _source.Completion.ContinueWith(static (completed, state) =>
             {
                 var thisBlock = ((JoinBlock<T1, T2, T3>)state!) as IDataflowBlock;
                 Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
@@ -326,7 +326,7 @@ namespace System.Threading.Tasks.Dataflow
 
             // Handle async cancellation requests by declining on the target
             Common.WireCancellationToComplete(
-                dataflowBlockOptions.CancellationToken, _source.Completion, state => ((JoinBlock<T1, T2, T3>)state!)._sharedResources.CompleteEachTarget(), this);
+                dataflowBlockOptions.CancellationToken, _source.Completion, static (state, _) => ((JoinBlock<T1, T2, T3>)state!)._sharedResources.CompleteEachTarget(), this);
             DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
             if (etwLog.IsEnabled())
             {
@@ -1279,7 +1279,7 @@ namespace System.Threading.Tasks.Dataflow.Internal
 
             // Create task and store into _taskForInputProcessing prior to scheduling the task
             // so that _taskForInputProcessing will be visibly set in the task loop.
-            _taskForInputProcessing = new Task(thisSharedResources => ((JoinBlockTargetSharedResources)thisSharedResources!).ProcessMessagesLoopCore(), this,
+            _taskForInputProcessing = new Task(static thisSharedResources => ((JoinBlockTargetSharedResources)thisSharedResources!).ProcessMessagesLoopCore(), this,
                                                 Common.GetCreationOptionsForTask(isReplacementReplica));
 
             DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
@@ -1287,7 +1287,7 @@ namespace System.Threading.Tasks.Dataflow.Internal
             {
                 etwLog.TaskLaunchedForMessageHandling(
                     _ownerJoin, _taskForInputProcessing, DataflowEtwProvider.TaskLaunchedReason.ProcessingInputMessages,
-                    _targets.Max(t => t.NumberOfMessagesAvailableOrPostponed));
+                    _targets.Max(static t => t.NumberOfMessagesAvailableOrPostponed));
             }
 
             // Start the task handling scheduling exceptions
@@ -1343,7 +1343,7 @@ namespace System.Threading.Tasks.Dataflow.Internal
                     _decliningPermanently = true;
 
                     // Complete each target asynchronously so as not to invoke synchronous continuations under a lock
-                    Task.Factory.StartNew(state =>
+                    Task.Factory.StartNew(static state =>
                     {
                         var sharedResources = (JoinBlockTargetSharedResources)state!;
                         foreach (JoinBlockTargetBase target in sharedResources._targets) target.CompleteOncePossible();
index bdb9c44..da0d0da 100644 (file)
@@ -106,18 +106,18 @@ namespace System.Threading.Tasks.Dataflow
             // Initialize onItemsRemoved delegate if necessary
             Action<ISourceBlock<TOutput>, int>? onItemsRemoved = null;
             if (dataflowBlockOptions.BoundedCapacity > 0)
-                onItemsRemoved = (owningSource, count) => ((TransformBlock<TInput, TOutput>)owningSource)._target.ChangeBoundingCount(-count);
+                onItemsRemoved = static (owningSource, count) => ((TransformBlock<TInput, TOutput>)owningSource)._target.ChangeBoundingCount(-count);
 
             // Initialize source component.
             _source = new SourceCore<TOutput>(this, dataflowBlockOptions,
-                owningSource => ((TransformBlock<TInput, TOutput>)owningSource)._target.Complete(exception: null, dropPendingMessages: true),
+                static owningSource => ((TransformBlock<TInput, TOutput>)owningSource)._target.Complete(exception: null, dropPendingMessages: true),
                 onItemsRemoved);
 
             // If parallelism is employed, we will need to support reordering messages that complete out-of-order.
             // However, a developer can override this with EnsureOrdered == false.
             if (dataflowBlockOptions.SupportsParallelExecution && dataflowBlockOptions.EnsureOrdered)
             {
-                _reorderingBuffer = new ReorderingBuffer<TOutput>(this, (owningSource, message) => ((TransformBlock<TInput, TOutput>)owningSource)._source.AddMessage(message));
+                _reorderingBuffer = new ReorderingBuffer<TOutput>(this, static (owningSource, message) => ((TransformBlock<TInput, TOutput>)owningSource)._source.AddMessage(message));
             }
 
             // Create the underlying target
@@ -140,7 +140,7 @@ namespace System.Threading.Tasks.Dataflow
             // As the target has completed, and as the target synchronously pushes work
             // through the reordering buffer when async processing completes,
             // we know for certain that no more messages will need to be sent to the source.
-            _target.Completion.ContinueWith((completed, state) =>
+            _target.Completion.ContinueWith(static (completed, state) =>
             {
                 var sourceCore = (SourceCore<TOutput>)state!;
                 if (completed.IsFaulted) sourceCore.AddAndUnwrapAggregateException(completed.Exception!);
@@ -151,7 +151,7 @@ namespace System.Threading.Tasks.Dataflow
             // In those cases we need to fault the target half to drop its buffered messages and to release its
             // reservations. This should not create an infinite loop, because all our implementations are designed
             // to handle multiple completion requests and to carry over only one.
-            _source.Completion.ContinueWith((completed, state) =>
+            _source.Completion.ContinueWith(static (completed, state) =>
             {
                 var thisBlock = ((TransformBlock<TInput, TOutput>)state!) as IDataflowBlock;
                 Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
@@ -160,7 +160,7 @@ namespace System.Threading.Tasks.Dataflow
 
             // Handle async cancellation requests by declining on the target
             Common.WireCancellationToComplete(
-                dataflowBlockOptions.CancellationToken, Completion, state => ((TargetCore<TInput>)state!).Complete(exception: null, dropPendingMessages: true), _target);
+                dataflowBlockOptions.CancellationToken, Completion, static (state, _) => ((TargetCore<TInput>)state!).Complete(exception: null, dropPendingMessages: true), _target);
             DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
             if (etwLog.IsEnabled())
             {
@@ -257,7 +257,7 @@ namespace System.Threading.Tasks.Dataflow
             }
 
             // Otherwise, join with the asynchronous operation when it completes.
-            task.ContinueWith((completed, state) =>
+            task.ContinueWith(static (completed, state) =>
             {
                 var tuple = (Tuple<TransformBlock<TInput, TOutput>, KeyValuePair<TInput, long>>)state!;
                 tuple.Item1.AsyncCompleteProcessMessageWithTask(completed, tuple.Item2);
index a60ae98..d667d9a 100644 (file)
@@ -40,7 +40,7 @@ namespace System.Threading.Tasks.Dataflow
 #if DEBUG
                 // Task returned from ProcessMessageAsync is explicitly ignored.
                 // That function handles all exceptions.
-                t.ContinueWith(t => Debug.Assert(t.IsCompletedSuccessfully), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
+                t.ContinueWith(static t => Debug.Assert(t.IsCompletedSuccessfully), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
 #endif
             }, dataflowBlockOptions, ref _source, ref _target, ref _reorderingBuffer, TargetCoreOptions.UsesAsyncCompletion);
         }
index 927e971..f84c6fb 100644 (file)
@@ -112,12 +112,12 @@ namespace System.Threading.Tasks.Dataflow
             Action<ISourceBlock<TOutput>, int>? onItemsRemoved = null;
             if (dataflowBlockOptions.BoundedCapacity > 0)
             {
-                onItemsRemoved = (owningSource, count) => ((TransformManyBlock<TInput, TOutput>)owningSource)._target.ChangeBoundingCount(-count);
+                onItemsRemoved = static (owningSource, count) => ((TransformManyBlock<TInput, TOutput>)owningSource)._target.ChangeBoundingCount(-count);
             }
 
             // Initialize source component
             source = new SourceCore<TOutput>(this, dataflowBlockOptions,
-                owningSource => ((TransformManyBlock<TInput, TOutput>)owningSource)._target.Complete(exception: null, dropPendingMessages: true),
+                static owningSource => ((TransformManyBlock<TInput, TOutput>)owningSource)._target.Complete(exception: null, dropPendingMessages: true),
                 onItemsRemoved);
 
             // If parallelism is employed, we will need to support reordering messages that complete out-of-order.
@@ -125,7 +125,7 @@ namespace System.Threading.Tasks.Dataflow
             if (dataflowBlockOptions.SupportsParallelExecution && dataflowBlockOptions.EnsureOrdered)
             {
                 reorderingBuffer = new ReorderingBuffer<IEnumerable<TOutput>>(
-                    this, (source, messages) => ((TransformManyBlock<TInput, TOutput>)source)._source.AddMessages(messages));
+                    this, static (source, messages) => ((TransformManyBlock<TInput, TOutput>)source)._source.AddMessages(messages));
             }
 
             // Create the underlying target and source
@@ -136,7 +136,7 @@ namespace System.Threading.Tasks.Dataflow
             // As the target has completed, and as the target synchronously pushes work
             // through the reordering buffer when async processing completes,
             // we know for certain that no more messages will need to be sent to the source.
-            target.Completion.ContinueWith((completed, state) =>
+            target.Completion.ContinueWith(static (completed, state) =>
             {
                 var sourceCore = (SourceCore<TOutput>)state!;
                 if (completed.IsFaulted) sourceCore.AddAndUnwrapAggregateException(completed.Exception!);
@@ -147,7 +147,7 @@ namespace System.Threading.Tasks.Dataflow
             // In those cases we need to fault the target half to drop its buffered messages and to release its
             // reservations. This should not create an infinite loop, because all our implementations are designed
             // to handle multiple completion requests and to carry over only one.
-            source.Completion.ContinueWith((completed, state) =>
+            source.Completion.ContinueWith(static (completed, state) =>
             {
                 var thisBlock = ((TransformManyBlock<TInput, TOutput>)state!) as IDataflowBlock;
                 Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
@@ -156,7 +156,7 @@ namespace System.Threading.Tasks.Dataflow
 
             // Handle async cancellation requests by declining on the target
             Common.WireCancellationToComplete(
-                dataflowBlockOptions.CancellationToken, Completion, state => ((TargetCore<TInput>)state!).Complete(exception: null, dropPendingMessages: true), target);
+                dataflowBlockOptions.CancellationToken, Completion, static (state, _) => ((TargetCore<TInput>)state!).Complete(exception: null, dropPendingMessages: true), target);
             DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
             if (etwLog.IsEnabled())
             {
@@ -237,7 +237,7 @@ namespace System.Threading.Tasks.Dataflow
             // We got back a task.  Now wait for it to complete and store its results.
             // Unlike with TransformBlock and ActionBlock, We run the continuation on the user-provided
             // scheduler as we'll be running user code through enumerating the returned enumerable.
-            task.ContinueWith((completed, state) =>
+            task.ContinueWith(static (completed, state) =>
             {
                 var tuple = (Tuple<TransformManyBlock<TInput, TOutput>, KeyValuePair<TInput, long>>)state!;
                 tuple.Item1.AsyncCompleteProcessMessageWithTask(completed, tuple.Item2);
index 0b0d50d..ea9ddd6 100644 (file)
@@ -89,13 +89,13 @@ namespace System.Threading.Tasks.Dataflow
                     _completionReserved = _decliningPermanently = true;
 
                     // Cancel the completion task's TCS
-                    _lazyCompletionTaskSource.SetCanceled();
+                    _lazyCompletionTaskSource.TrySetCanceled(dataflowBlockOptions.CancellationToken);
                 }
                 else
                 {
                     // Handle async cancellation requests by declining on the target
                     Common.WireCancellationToComplete(
-                        dataflowBlockOptions.CancellationToken, _lazyCompletionTaskSource.Task, state => ((WriteOnceBlock<T>)state!).Complete(), this);
+                        dataflowBlockOptions.CancellationToken, _lazyCompletionTaskSource.Task, static (state, _) => ((WriteOnceBlock<T>)state!).Complete(), this);
                 }
             }
             DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
@@ -120,7 +120,7 @@ namespace System.Threading.Tasks.Dataflow
             if (exceptions == null)
             {
                 // Offer the message to any linked targets and complete the block asynchronously to avoid blocking the caller
-                var taskForOutputProcessing = new Task(state => ((WriteOnceBlock<T>)state!).OfferToTargetsAndCompleteBlock(), this,
+                var taskForOutputProcessing = new Task(static state => ((WriteOnceBlock<T>)state!).OfferToTargetsAndCompleteBlock(), this,
                                                         Common.GetCreationOptionsForTask());
 
                 DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
@@ -137,7 +137,7 @@ namespace System.Threading.Tasks.Dataflow
             else
             {
                 // Complete the block asynchronously to avoid blocking the caller
-                Task.Factory.StartNew(state =>
+                Task.Factory.StartNew(static state =>
                 {
                     Tuple<WriteOnceBlock<T>, IList<Exception>> blockAndList = (Tuple<WriteOnceBlock<T>, IList<Exception>>)state!;
                     blockAndList.Item1.CompleteBlock(blockAndList.Item2);
@@ -182,7 +182,7 @@ namespace System.Threading.Tasks.Dataflow
             }
             else if (_dataflowBlockOptions.CancellationToken.IsCancellationRequested)
             {
-                CompletionTaskSource.TrySetCanceled();
+                CompletionTaskSource.TrySetCanceled(_dataflowBlockOptions.CancellationToken);
             }
             else
             {
index 5ef1bd2..a64a8ef 100644 (file)
@@ -173,24 +173,35 @@ namespace System.Threading.Tasks.Dataflow.Internal
         /// <param name="completeAction">An action that will decline permanently on the state passed to it.</param>
         /// <param name="completeState">The block on which to decline permanently.</param>
         internal static void WireCancellationToComplete(
-            CancellationToken cancellationToken, Task completionTask, Action<object?> completeAction, object completeState)
+            CancellationToken cancellationToken, Task completionTask, Action<object?, CancellationToken> completeAction, object completeState)
         {
             Debug.Assert(completionTask != null, "A task to wire up for completion is needed.");
             Debug.Assert(completeAction != null, "An action to invoke upon cancellation is required.");
 
-            // If a cancellation request has already occurred, just invoke the declining action synchronously.
-            // CancellationToken would do this anyway but we can short-circuit it further and avoid a bunch of unnecessary checks.
             if (cancellationToken.IsCancellationRequested)
             {
-                completeAction(completeState);
+                // If a cancellation request has already occurred, just invoke the declining action synchronously.
+                // CancellationToken would do this anyway but we can short-circuit it further and avoid a bunch of unnecessary checks.
+                completeAction(completeState, cancellationToken);
             }
-            // Otherwise, if a cancellation request occurs, we want to prevent the block from accepting additional
-            // data, and we also want to dispose of that registration when we complete so that we don't
-            // leak into a long-living cancellation token.
             else if (cancellationToken.CanBeCanceled)
             {
-                CancellationTokenRegistration reg = cancellationToken.Register(completeAction, completeState);
-                completionTask.ContinueWith((completed, state) => ((CancellationTokenRegistration)state!).Dispose(),
+                // Otherwise, if a cancellation request occurs, we want to prevent the block from accepting additional
+                // data, and we also want to dispose of that registration when we complete so that we don't
+                // leak into a long-living cancellation token.
+                CancellationTokenRegistration reg = cancellationToken.Register(
+#if NET6_0_OR_GREATER
+                    completeAction, completeState
+#else
+                    state =>
+                    {
+                        var tuple = (Tuple<Action<object?, CancellationToken>, object, CancellationToken>)state!;
+                        tuple.Item1(tuple.Item2, tuple.Item3);
+                    },
+                    Tuple.Create(completeAction, completeState, cancellationToken)
+#endif
+                    );
+                completionTask.ContinueWith(static (completed, state) => ((CancellationTokenRegistration)state!).Dispose(),
                     reg, cancellationToken, Common.GetContinuationOptions(), TaskScheduler.Default);
             }
         }
@@ -289,7 +300,7 @@ namespace System.Threading.Tasks.Dataflow.Internal
         internal static void ThrowAsync(Exception error)
         {
             ExceptionDispatchInfo edi = ExceptionDispatchInfo.Capture(error);
-            ThreadPool.QueueUserWorkItem(state => { ((ExceptionDispatchInfo)state!).Throw(); }, edi);
+            ThreadPool.QueueUserWorkItem(static state => { ((ExceptionDispatchInfo)state!).Throw(); }, edi);
         }
 
         /// <summary>Adds the exception to the list, first initializing the list if the list is null.</summary>
@@ -559,7 +570,7 @@ namespace System.Threading.Tasks.Dataflow.Internal
         {
             Debug.Assert(sourceCompletionTask != null, "sourceCompletionTask may not be null.");
             Debug.Assert(target != null, "The target where completion is to be propagated may not be null.");
-            sourceCompletionTask.ContinueWith((task, state) => Common.PropagateCompletion(task, (IDataflowBlock)state!, AsyncExceptionHandler),
+            sourceCompletionTask.ContinueWith(static (task, state) => Common.PropagateCompletion(task, (IDataflowBlock)state!, AsyncExceptionHandler),
                 target, CancellationToken.None, Common.GetContinuationOptions(), TaskScheduler.Default);
         }
 
@@ -582,7 +593,7 @@ namespace System.Threading.Tasks.Dataflow.Internal
         private static class CachedGenericDelegates<T>
         {
             /// <summary>A function that returns the default value of T.</summary>
-            internal static readonly Func<T> DefaultTResultFunc = () => default(T)!;
+            internal static readonly Func<T> DefaultTResultFunc = static () => default(T)!;
             /// <summary>
             /// A function to use as the body of ActionOnDispose in CreateUnlinkerShim.
             /// Passed a tuple of the sync obj, the target registry, and the target block as the state parameter.
index f36786a..51560f2 100644 (file)
@@ -139,7 +139,7 @@ namespace System.Threading.Tasks.Dataflow.Internal
 
                     if (completionTask.IsFaulted)
                     {
-                        try { exceptionData = string.Join(Environment.NewLine, completionTask.Exception!.InnerExceptions.Select(e => e.ToString())); }
+                        try { exceptionData = string.Join(Environment.NewLine, completionTask.Exception!.InnerExceptions.Select(static e => e.ToString())); }
                         catch { }
                     }
 
index 0da08f9..e029b93 100644 (file)
@@ -516,7 +516,7 @@ namespace System.Threading.Tasks.Dataflow.Internal
                 // However, we know that _decliningPermanently has been set, and thus the timing of
                 // CompleteBlockIfPossible doesn't matter, so we schedule it to run asynchronously
                 // and take the necessary locks in a situation where we're sure it won't cause a problem.
-                Task.Factory.StartNew(state =>
+                Task.Factory.StartNew(static state =>
                 {
                     var thisSourceCore = (SourceCore<TOutput>)state!;
                     lock (thisSourceCore.OutgoingLock)
@@ -756,7 +756,7 @@ namespace System.Threading.Tasks.Dataflow.Internal
             {
                 // Create task and store into _taskForOutputProcessing prior to scheduling the task
                 // so that _taskForOutputProcessing will be visibly set in the task loop.
-                _taskForOutputProcessing = new Task(thisSourceCore => ((SourceCore<TOutput>)thisSourceCore!).OfferMessagesLoopCore(), this,
+                _taskForOutputProcessing = new Task(static thisSourceCore => ((SourceCore<TOutput>)thisSourceCore!).OfferMessagesLoopCore(), this,
                                                      Common.GetCreationOptionsForTask(isReplacementReplica));
 
                 DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
@@ -779,7 +779,7 @@ namespace System.Threading.Tasks.Dataflow.Internal
 
                     // Get out from under currently held locks - ValueLock is taken, but OutgoingLock may not be.
                     // Re-take the locks on a separate thread.
-                    Task.Factory.StartNew(state =>
+                    Task.Factory.StartNew(static state =>
                     {
                         var thisSourceCore = (SourceCore<TOutput>)state!;
                         lock (thisSourceCore.OutgoingLock)
@@ -922,7 +922,7 @@ namespace System.Threading.Tasks.Dataflow.Internal
                 // Get out from under currently held locks.  This is to avoid
                 // invoking synchronous continuations off of _completionTask.Task
                 // while holding a lock.
-                Task.Factory.StartNew(state => ((SourceCore<TOutput>)state!).CompleteBlockOncePossible(),
+                Task.Factory.StartNew(static state => ((SourceCore<TOutput>)state!).CompleteBlockOncePossible(),
                     this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
             }
         }
@@ -962,7 +962,7 @@ namespace System.Threading.Tasks.Dataflow.Internal
             // If it's due to cancellation, finish in a canceled state
             else if (_dataflowBlockOptions.CancellationToken.IsCancellationRequested)
             {
-                _completionTask.TrySetCanceled();
+                _completionTask.TrySetCanceled(_dataflowBlockOptions.CancellationToken);
             }
             // Otherwise, finish in a successful state.
             else
index d610978..a9107ee 100644 (file)
@@ -164,7 +164,7 @@ namespace System.Threading.Tasks.Dataflow.Internal
             {
                 // Create a new consumption task and try to set it as current as long as there's still no other task
                 var newConsumer = new Task(
-                    state => ((SpscTargetCore<TInput>)state!).ProcessMessagesLoopCore(),
+                    static state => ((SpscTargetCore<TInput>)state!).ProcessMessagesLoopCore(),
                     this, CancellationToken.None, Common.GetCreationOptionsForTask(isReplica));
                 if (Interlocked.CompareExchange(ref _activeConsumer, newConsumer, null) == null)
                 {
@@ -309,7 +309,7 @@ namespace System.Threading.Tasks.Dataflow.Internal
             // by the producer and consumer, a producer calling Fault and the
             // processing task processing the user delegate which might throw.
 #pragma warning disable 0420
-            lock (LazyInitializer.EnsureInitialized(ref _exceptions, () => new List<Exception>()))
+            lock (LazyInitializer.EnsureInitialized(ref _exceptions, static () => new List<Exception>()))
 #pragma warning restore 0420
             {
                 _exceptions.Add(exception);
@@ -357,7 +357,7 @@ namespace System.Threading.Tasks.Dataflow.Internal
         /// <summary>Gets the lazily-initialized completion source.</summary>
         private TaskCompletionSource<VoidResult> CompletionSource
         {
-            get { return LazyInitializer.EnsureInitialized(ref _completionTask, () => new TaskCompletionSource<VoidResult>()); }
+            get { return LazyInitializer.EnsureInitialized(ref _completionTask, static () => new TaskCompletionSource<VoidResult>()); }
         }
 
         /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
index 393c9ca..c48b0a8 100644 (file)
@@ -369,7 +369,7 @@ namespace System.Threading.Tasks.Dataflow.Internal
                 _numberOfOutstandingOperations++;
                 if (UsesAsyncCompletion) _numberOfOutstandingServiceTasks++;
 
-                var taskForInputProcessing = new Task(thisTargetCore => ((TargetCore<TInput>)thisTargetCore!).ProcessMessagesLoopCore(), this,
+                var taskForInputProcessing = new Task(static thisTargetCore => ((TargetCore<TInput>)thisTargetCore!).ProcessMessagesLoopCore(), this,
                                                       Common.GetCreationOptionsForTask(repeat));
 
                 DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
@@ -732,7 +732,7 @@ namespace System.Threading.Tasks.Dataflow.Internal
                 // Get out from under currently held locks.  This is to avoid
                 // invoking synchronous continuations off of _completionSource.Task
                 // while holding a lock.
-                Task.Factory.StartNew(state => ((TargetCore<TInput>)state!).CompleteBlockOncePossible(),
+                Task.Factory.StartNew(static state => ((TargetCore<TInput>)state!).CompleteBlockOncePossible(),
                     this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
             }
         }
@@ -773,7 +773,7 @@ namespace System.Threading.Tasks.Dataflow.Internal
             // If we completed with cancellation, finish in a canceled state
             else if (_dataflowBlockOptions.CancellationToken.IsCancellationRequested)
             {
-                _completionSource.TrySetCanceled();
+                _completionSource.TrySetCanceled(_dataflowBlockOptions.CancellationToken);
             }
             // Otherwise, finish in a successful state.
             else
@@ -843,7 +843,7 @@ namespace System.Threading.Tasks.Dataflow.Internal
             /// <summary>Gets the number of messages waiting to be processed.</summary>
             internal int InputCount { get { return _target._messages.Count; } }
             /// <summary>Gets the messages waiting to be processed.</summary>
-            internal IEnumerable<TInput> InputQueue { get { return _target._messages.Select(kvp => kvp.Key).ToList(); } }
+            internal IEnumerable<TInput> InputQueue { get { return _target._messages.Select(static kvp => kvp.Key).ToList(); } }
 
             /// <summary>Gets any postponed messages.</summary>
             internal QueuedMap<ISourceBlock<TInput>, DataflowMessageHeader>? PostponedMessages
index 55f7c4f..4b7dea5 100644 (file)
@@ -332,7 +332,10 @@ namespace System.Threading.Tasks.Dataflow.Tests
         [Fact]
         public async Task TestPrecanceledToken()
         {
-            var options = new ExecutionDataflowBlockOptions { CancellationToken = new CancellationToken(true) };
+            var cts = new CancellationTokenSource();
+            cts.Cancel();
+
+            var options = new ExecutionDataflowBlockOptions { CancellationToken = cts.Token };
             var blocks = new []
             {
                 new ActionBlock<int>(i => { }, options),
@@ -348,7 +351,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
                 ab.Complete();
                 ((IDataflowBlock)ab).Fault(new Exception());
 
-                await Assert.ThrowsAnyAsync<OperationCanceledException>(() => ab.Completion);
+                await AssertExtensions.CanceledAsync(cts.Token, ab.Completion);
             }
         }
 
index 5727ff6..a4a47d2 100644 (file)
@@ -486,8 +486,11 @@ namespace System.Threading.Tasks.Dataflow.Tests
         [Fact]
         public async Task TestPrecancellation()
         {
+            var cts = new CancellationTokenSource();
+            cts.Cancel();
+
             var b = new BatchBlock<int>(42, new GroupingDataflowBlockOptions {
-                CancellationToken = new CancellationToken(canceled: true), MaxNumberOfGroups = 1
+                CancellationToken = cts.Token, MaxNumberOfGroups = 1
             });
 
             Assert.Equal(expected: 42, actual: b.BatchSize);
@@ -504,7 +507,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
             Assert.NotNull(b.Completion);
             b.Complete(); // verify doesn't throw
 
-            await Assert.ThrowsAnyAsync<OperationCanceledException>(() => b.Completion);
+            await AssertExtensions.CanceledAsync(cts.Token, b.Completion);
         }
 
         [Fact]
@@ -527,7 +530,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
                 else
                 {
                     cts.Cancel();
-                    await Assert.ThrowsAnyAsync<OperationCanceledException>(() => bb.Completion);
+                    await AssertExtensions.CanceledAsync(cts.Token, bb.Completion);
                 }
 
                 Assert.Equal(expected: 0, actual: bb.OutputCount);
index c01b7fe..64ab5de 100644 (file)
@@ -276,8 +276,11 @@ namespace System.Threading.Tasks.Dataflow.Tests
         [Fact]
         public async Task TestPrecanceled2()
         {
+            var cts = new CancellationTokenSource();
+            cts.Cancel();
+
             var b = new BatchedJoinBlock<int, int>(42,
-                new GroupingDataflowBlockOptions { CancellationToken = new CancellationToken(canceled: true), MaxNumberOfGroups = 1 });
+                new GroupingDataflowBlockOptions { CancellationToken = cts.Token, MaxNumberOfGroups = 1 });
 
             Tuple<IList<int>, IList<int>> ignoredValue;
             IList<Tuple<IList<int>, IList<int>>> ignoredValues;
@@ -300,14 +303,17 @@ namespace System.Threading.Tasks.Dataflow.Tests
             b.Target1.Complete();
             b.Target2.Complete();
 
-            await Assert.ThrowsAnyAsync<OperationCanceledException>(() => b.Completion);
+            await AssertExtensions.CanceledAsync(cts.Token, b.Completion);
         }
 
         [Fact]
         public async Task TestPrecanceled3()
         {
+            var cts = new CancellationTokenSource();
+            cts.Cancel();
+
             var b = new BatchedJoinBlock<int, int, int>(42,
-                new GroupingDataflowBlockOptions { CancellationToken = new CancellationToken(canceled: true), MaxNumberOfGroups = 1 });
+                new GroupingDataflowBlockOptions { CancellationToken = cts.Token, MaxNumberOfGroups = 1 });
 
             Tuple<IList<int>, IList<int>, IList<int>> ignoredValue;
             IList<Tuple<IList<int>, IList<int>, IList<int>>> ignoredValues;
@@ -330,7 +336,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
             b.Target1.Complete();
             b.Target2.Complete();
 
-            await Assert.ThrowsAnyAsync<OperationCanceledException>(() => b.Completion);
+            await AssertExtensions.CanceledAsync(cts.Token, b.Completion);
         }
 
         [Fact]
index 7d4c2fd..7be2ea4 100644 (file)
@@ -209,7 +209,10 @@ namespace System.Threading.Tasks.Dataflow.Tests
         [Fact]
         public async Task TestPrecancellation()
         {
-            var b = new BroadcastBlock<int>(null, new DataflowBlockOptions { CancellationToken = new CancellationToken(canceled: true) });
+            var cts = new CancellationTokenSource();
+            cts.Cancel();
+
+            var b = new BroadcastBlock<int>(null, new DataflowBlockOptions { CancellationToken = cts.Token });
 
             Assert.NotNull(b.LinkTo(DataflowBlock.NullTarget<int>()));
             Assert.False(b.Post(42));
@@ -223,7 +226,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
             Assert.NotNull(b.Completion);
             b.Complete(); // verify doesn't throw
 
-            await Assert.ThrowsAnyAsync<OperationCanceledException>(() => b.Completion);
+            await AssertExtensions.CanceledAsync(cts.Token, b.Completion);
         }
 
         [Fact]
@@ -270,7 +273,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
                 else
                 {
                     cts.Cancel();
-                    await Assert.ThrowsAnyAsync<OperationCanceledException>(() => bb.Completion);
+                    await AssertExtensions.CanceledAsync(cts.Token, bb.Completion);
                 }
 
                 await Task.WhenAll(sends);
index c3d6d9b..3b107b4 100644 (file)
@@ -341,7 +341,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
             var buffer = new BufferBlock<int>(new DataflowBlockOptions() { CancellationToken = cts.Token });
             buffer.Post(1);
             cts.Cancel();
-            await Assert.ThrowsAnyAsync<OperationCanceledException>(() => buffer.Completion);
+            await AssertExtensions.CanceledAsync(cts.Token, buffer.Completion);
             Assert.Equal(expected: 0, actual: buffer.Count);
 
             cts = new CancellationTokenSource();
@@ -419,8 +419,11 @@ namespace System.Threading.Tasks.Dataflow.Tests
         [Fact]
         public async Task TestPrecanceled()
         {
+            var cts = new CancellationTokenSource();
+            cts.Cancel();
+
             var bb = new BufferBlock<int>(
-                new DataflowBlockOptions { CancellationToken = new CancellationToken(canceled: true) });
+                new DataflowBlockOptions { CancellationToken = cts.Token });
 
             int ignoredValue;
             IList<int> ignoredValues;
@@ -438,8 +441,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
             Assert.False(bb.TryReceiveAll(out ignoredValues));
             Assert.False(bb.TryReceive(out ignoredValue));
 
-            Assert.NotNull(bb.Completion);
-            await Assert.ThrowsAnyAsync<OperationCanceledException>(() => bb.Completion);
+            await AssertExtensions.CanceledAsync(cts.Token, bb.Completion);
             bb.Complete(); // just make sure it doesn't throw
         }
 
@@ -465,7 +467,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
                 else
                 {
                     cts.Cancel();
-                    await Assert.ThrowsAnyAsync<OperationCanceledException>(() => bb.Completion);
+                    await AssertExtensions.CanceledAsync(cts.Token, bb.Completion);
                 }
 
                 await Task.WhenAll(sends);
index 2990371..b8f589f 100644 (file)
@@ -239,8 +239,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
             ValueTask<bool> vt = e.MoveNextAsync();
             Assert.True(vt.IsCompleted);
             Assert.False(vt.IsCompletedSuccessfully);
-            OperationCanceledException oce = await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await vt);
-            Assert.Equal(cts.Token, oce.CancellationToken);
+            await AssertExtensions.CanceledAsync(cts.Token, vt.AsTask());
         }
 
         [Fact]
@@ -254,7 +253,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
             Assert.False(vt.IsCompleted);
 
             cts.Cancel();
-            OperationCanceledException oce = await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await vt);
+            await AssertExtensions.CanceledAsync(cts.Token, vt.AsTask());
 
             vt = e.MoveNextAsync();
             Assert.True(vt.IsCompletedSuccessfully);
index 7b9e79c..c5f3d60 100644 (file)
@@ -5,6 +5,7 @@ using System.Collections.Generic;
 using System.Linq;
 using System.Runtime.CompilerServices;
 using Xunit;
+using Xunit.Sdk;
 
 namespace System.Threading.Tasks.Dataflow.Tests
 {
@@ -783,7 +784,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
             t = bb.SendAsync(3, cts.Token);
             Assert.False(t.IsCompleted);
             cts.Cancel();
-            await Assert.ThrowsAnyAsync<OperationCanceledException>(() => t);
+            await AssertExtensions.CanceledAsync(cts.Token, t);
 
             Assert.Equal(expected: 2, actual: await bb.ReceiveAsync());
             bb.Complete();
@@ -854,7 +855,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
 
                 if (withCancellation)
                 {
-                    await Assert.ThrowsAnyAsync<OperationCanceledException>(() => target.SendAsync(42, cts.Token));
+                    await AssertExtensions.CanceledAsync(cts.Token, () => target.SendAsync(42, cts.Token));
                 }
                 else
                 {
@@ -927,7 +928,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
                 }
             };
             Task<bool> send = target.SendAsync(42, cts.Token);
-            await Assert.ThrowsAnyAsync<OperationCanceledException>(() => send);
+            await AssertExtensions.CanceledAsync(cts.Token, send);
         }
 
         [Fact]
@@ -1033,8 +1034,12 @@ namespace System.Threading.Tasks.Dataflow.Tests
             var bb = new BufferBlock<int>();
 
             // Cancel before Receive/ReceiveAsync
-            Assert.ThrowsAny<OperationCanceledException>(() => bb.Receive(new CancellationToken(canceled: true)));
-            await Assert.ThrowsAnyAsync<OperationCanceledException>(() => bb.ReceiveAsync(new CancellationToken(canceled: true)));
+            {
+                var cts = new CancellationTokenSource();
+                cts.Cancel();
+                AssertExtensions.Canceled(cts.Token, () => bb.Receive(cts.Token));
+                await AssertExtensions.CanceledAsync(cts.Token, bb.ReceiveAsync(cts.Token));
+            }
 
             // Cancel after Receive/ReceiveAsync but before data
             {
@@ -1046,7 +1051,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
                 var cts = new CancellationTokenSource();
                 var t = bb.ReceiveAsync(cts.Token);
                 cts.Cancel();
-                await Assert.ThrowsAnyAsync<OperationCanceledException>(() => bb.ReceiveAsync(cts.Token));
+                await AssertExtensions.CanceledAsync(cts.Token, bb.ReceiveAsync(cts.Token));
             }
 
             // Cancel after data received
@@ -1234,7 +1239,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
                 TaskScheduler usedScheduler = null, requestedScheduler = new ConcurrentExclusiveSchedulerPair().ConcurrentScheduler;
                 var cts = new CancellationTokenSource();
 
-                var t = chooseTestCase < 3 ?
+                var t = chooseTestCase < 2 ?
                     DataflowBlock.Choose(
                         source1, i => intValue = i,
                         source2, s => stringValue = s) :
@@ -1261,16 +1266,16 @@ namespace System.Threading.Tasks.Dataflow.Tests
                         Assert.Equal(expected: 1, actual: source2.Count);
                         break;
 
+                    // >= 2 TEST USING DATAFLOW BLOCK OPTIONS
+
                     case 2: // Test no data on either source
                         source1.Complete();
                         source2.Complete();
-                        await Assert.ThrowsAnyAsync<OperationCanceledException>(() => t);
+                        await AssertExtensions.CanceledAsync(cts.Token, t);
                         Assert.Equal(expected: 0, actual: intValue);
                         Assert.Null(stringValue);
                         break;
 
-                    // >= 3 TEST USING DATAFLOW BLOCK OPTIONS
-
                     case 3: // Test correct TaskScheduler is used
                         source1.Post(42);
                         await t;
@@ -1281,7 +1286,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
                         cts.Cancel();
                         source1.Post(42);
                         source2.Post("43");
-                        await Assert.ThrowsAnyAsync<OperationCanceledException>(() => t);
+                        await AssertExtensions.CanceledAsync(cts.Token, t);
                         Assert.Equal(expected: 1, actual: source1.Count);
                         Assert.Equal(expected: 1, actual: source2.Count);
                         break;
@@ -1443,7 +1448,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
 
                 TaskScheduler usedScheduler = null, requestedScheduler = new ConcurrentExclusiveSchedulerPair().ConcurrentScheduler;
                 var cts = new CancellationTokenSource();
-                var t = chooseTestCase < 7 ?
+                var t = chooseTestCase < 6 ?
                     DataflowBlock.Choose(
                         source1, i => intValue = i,
                         source2, s => stringValue = s,
@@ -1514,18 +1519,18 @@ namespace System.Threading.Tasks.Dataflow.Tests
 
                         break;
 
+                    // >= 6 TEST USING DATAFLOW BLOCK OPTIONS
+
                     case 6: // Test all sources complete
                         source1.Complete();
                         source2.Complete();
                         source3.Complete();
-                        await Assert.ThrowsAnyAsync<OperationCanceledException>(() => t);
+                        await AssertExtensions.CanceledAsync(cts.Token, t);
                         Assert.Equal(expected: 0, actual: intValue);
                         Assert.Null(stringValue);
                         Assert.Equal(expected: 0, actual: doubleValue);
                         break;
 
-                    // >= 7 TEST USING DATAFLOW BLOCK OPTIONS
-
                     case 7: // Test correct TaskScheduler is used
                         source3.Post(42);
                         await t;
@@ -1537,7 +1542,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
                         source1.Post(42);
                         source2.Post("43");
                         source3.Post(44.0);
-                        await Assert.ThrowsAnyAsync<OperationCanceledException>(() => t);
+                        await AssertExtensions.CanceledAsync(cts.Token, t);
                         Assert.Equal(expected: 1, actual: source1.Count);
                         Assert.Equal(expected: 1, actual: source2.Count);
                         Assert.Equal(expected: 1, actual: source3.Count);
@@ -1625,7 +1630,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
                     if (!cancelBeforeChoose)
                         cts.Cancel();
 
-                    await Assert.ThrowsAnyAsync<OperationCanceledException>(() => choose);
+                    await AssertExtensions.CanceledAsync(cts.Token, choose);
 
                     int expectedLinkCount = cancelBeforeChoose ? 0 : 1;
                     Assert.All(linkCounts, i => Assert.Equal(expected: expectedLinkCount, actual: i));
@@ -1949,7 +1954,9 @@ namespace System.Threading.Tasks.Dataflow.Tests
             var t = buffer.OutputAvailableAsync(cts.Token);
             Assert.False(t.IsCompleted);
             cts.Cancel();
-            await Assert.ThrowsAnyAsync<OperationCanceledException>(() => t);
+            await AssertExtensions.CanceledAsync(
+                PlatformDetection.IsNetFramework ? default : cts.Token, // token doesn't currently flow in netstandard2.0 build due to lack of necessary API
+                t);
 
             cts = new CancellationTokenSource();
             t = buffer.OutputAvailableAsync(cts.Token);
index 0e82c13..3dbabb2 100644 (file)
@@ -249,6 +249,5 @@ namespace System.Threading.Tasks.Dataflow.Tests
             }
             return DataflowBlock.Encapsulate(transforms[0], transforms[transforms.Length - 1]);
         }
-
     }
 }
index 463d351..9d1907d 100644 (file)
@@ -196,8 +196,11 @@ namespace System.Threading.Tasks.Dataflow.Tests
         [Fact]
         public async Task TestPrecancellation2()
         {
+            var cts = new CancellationTokenSource();
+            cts.Cancel();
+
             var b = new JoinBlock<int, int>(new GroupingDataflowBlockOptions {
-                CancellationToken = new CancellationToken(canceled: true), MaxNumberOfGroups = 1
+                CancellationToken = cts.Token, MaxNumberOfGroups = 1
             });
 
             Assert.NotNull(b.LinkTo(DataflowBlock.NullTarget<Tuple<int, int>>()));
@@ -219,15 +222,18 @@ namespace System.Threading.Tasks.Dataflow.Tests
             Assert.NotNull(b.Completion);
             b.Complete();
 
-            await Assert.ThrowsAnyAsync<OperationCanceledException>(() => b.Completion);
+            await AssertExtensions.CanceledAsync(cts.Token, b.Completion);
         }
 
         [Fact]
         public async Task TestPrecancellation3()
         {
+            var cts = new CancellationTokenSource();
+            cts.Cancel();
+
             var b = new JoinBlock<int, int, int>(new GroupingDataflowBlockOptions
             {
-                CancellationToken = new CancellationToken(canceled: true),
+                CancellationToken = cts.Token,
                 MaxNumberOfGroups = 1
             });
 
@@ -254,7 +260,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
             Assert.NotNull(b.Completion);
             b.Complete();
 
-            await Assert.ThrowsAnyAsync<OperationCanceledException>(() => b.Completion);
+            await AssertExtensions.CanceledAsync(cts.Token, b.Completion);
         }
 
         [Fact]
@@ -424,7 +430,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
             cts.Cancel();
             foreach (Task<bool> send in sends)
             {
-                await Assert.ThrowsAnyAsync<OperationCanceledException>(() => send);
+                await AssertExtensions.CanceledAsync(cts.Token, send);
             }
 
             joinBlock.Target2.Post(1);
index a294870..f65cbde 100644 (file)
@@ -12,6 +12,9 @@ namespace System.Threading.Tasks.Dataflow.Tests
         [Fact]
         public async Task TestCtor()
         {
+            var cts = new CancellationTokenSource();
+            cts.Cancel();
+
             var blocks = new[] {
                 new TransformBlock<int, string>(i => i.ToString()),
                 new TransformBlock<int, string>(i => i.ToString(), new ExecutionDataflowBlockOptions { MaxMessagesPerTask = 1 }),
@@ -26,15 +29,15 @@ namespace System.Threading.Tasks.Dataflow.Tests
 
             blocks = new[] {
                 new TransformBlock<int, string>(i => i.ToString(),
-                    new ExecutionDataflowBlockOptions { CancellationToken = new CancellationToken(true) }),
+                    new ExecutionDataflowBlockOptions { CancellationToken = cts.Token }),
                 new TransformBlock<int, string>(i => Task.Run(() => i.ToString()),
-                    new ExecutionDataflowBlockOptions { CancellationToken = new CancellationToken(true) })
+                    new ExecutionDataflowBlockOptions { CancellationToken = cts.Token })
             };
             foreach (var block in blocks)
             {
                 Assert.Equal(expected: 0, actual: block.InputCount);
                 Assert.Equal(expected: 0, actual: block.OutputCount);
-                await Assert.ThrowsAnyAsync<OperationCanceledException>(() => block.Completion);
+                await AssertExtensions.CanceledAsync(cts.Token, block.Completion);
             }
         }
 
@@ -269,7 +272,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
             var tb = new TransformBlock<int, int>(i => i, new ExecutionDataflowBlockOptions() { CancellationToken = cts.Token });
             tb.Post(1);
             cts.Cancel();
-            await Assert.ThrowsAnyAsync<OperationCanceledException>(() => tb.Completion);
+            await AssertExtensions.CanceledAsync(cts.Token, tb.Completion);
             Assert.Equal(expected: 0, actual: tb.InputCount);
             Assert.Equal(expected: 0, actual: tb.OutputCount);
 
@@ -388,8 +391,11 @@ namespace System.Threading.Tasks.Dataflow.Tests
         [Fact]
         public async Task TestPrecanceled()
         {
+            var cts = new CancellationTokenSource();
+            cts.Cancel();
+
             var bb = new TransformBlock<int, int>(i => i,
-                new ExecutionDataflowBlockOptions { CancellationToken = new CancellationToken(canceled: true) });
+                new ExecutionDataflowBlockOptions { CancellationToken = cts.Token });
 
             int ignoredValue;
             IList<int> ignoredValues;
@@ -407,7 +413,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
             Assert.False(bb.TryReceive(out ignoredValue));
 
             Assert.NotNull(bb.Completion);
-            await Assert.ThrowsAnyAsync<OperationCanceledException>(() => bb.Completion);
+            await AssertExtensions.CanceledAsync(cts.Token, bb.Completion);
             bb.Complete(); // just make sure it doesn't throw
         }
 
@@ -452,7 +458,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
                 else
                 {
                     cts.Cancel();
-                    await Assert.ThrowsAnyAsync<OperationCanceledException>(() => tb.Completion);
+                    await AssertExtensions.CanceledAsync(cts.Token, tb.Completion);
                 }
 
                 Assert.Equal(expected: 0, actual: tb.InputCount);
index 5fcc146..eb73a6a 100644 (file)
@@ -14,6 +14,9 @@ namespace System.Threading.Tasks.Dataflow.Tests
         [Fact]
         public async Task TestCtorAsyncEnumerable()
         {
+            var cts = new CancellationTokenSource();
+            cts.Cancel();
+
             var blocks = new[] {
                 new TransformManyBlock<int, int>(DataflowTestHelpers.ToAsyncEnumerable),
                 new TransformManyBlock<int, int>(DataflowTestHelpers.ToAsyncEnumerable, new ExecutionDataflowBlockOptions { MaxMessagesPerTask = 1 })
@@ -27,11 +30,11 @@ namespace System.Threading.Tasks.Dataflow.Tests
 
             var canceledBlock = new TransformManyBlock<int, int>(
                 DataflowTestHelpers.ToAsyncEnumerable,
-                new ExecutionDataflowBlockOptions { CancellationToken = new CancellationToken(true) });
+                new ExecutionDataflowBlockOptions { CancellationToken = cts.Token });
 
             Assert.Equal(expected: 0, actual: canceledBlock.InputCount);
             Assert.Equal(expected: 0, actual: canceledBlock.OutputCount);
-            await Assert.ThrowsAnyAsync<OperationCanceledException>(() => canceledBlock.Completion);
+            await AssertExtensions.CanceledAsync(cts.Token, canceledBlock.Completion);
         }
 
         [Fact]
@@ -290,7 +293,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
             var tb = new TransformManyBlock<int, int>(DataflowTestHelpers.ToAsyncEnumerable, new ExecutionDataflowBlockOptions() { CancellationToken = cts.Token });
             tb.Post(1);
             cts.Cancel();
-            await Assert.ThrowsAnyAsync<OperationCanceledException>(() => tb.Completion);
+            await AssertExtensions.CanceledAsync(cts.Token, tb.Completion);
             Assert.Equal(expected: 0, actual: tb.InputCount);
             Assert.Equal(expected: 0, actual: tb.OutputCount);
 
@@ -398,8 +401,11 @@ namespace System.Threading.Tasks.Dataflow.Tests
         [Fact]
         public async Task TestPrecanceledAsyncEnumerable()
         {
+            var cts = new CancellationTokenSource();
+            cts.Cancel();
+
             var bb = new TransformManyBlock<int, int>(DataflowTestHelpers.ToAsyncEnumerable,
-                new ExecutionDataflowBlockOptions { CancellationToken = new CancellationToken(canceled: true) });
+                new ExecutionDataflowBlockOptions { CancellationToken = cts.Token });
 
             IDisposable link = bb.LinkTo(DataflowBlock.NullTarget<int>());
             Assert.NotNull(link);
@@ -414,7 +420,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
             Assert.False(bb.TryReceive(out _));
 
             Assert.NotNull(bb.Completion);
-            await Assert.ThrowsAnyAsync<OperationCanceledException>(() => bb.Completion);
+            await AssertExtensions.CanceledAsync(cts.Token, bb.Completion);
             bb.Complete(); // just make sure it doesn't throw
         }
 
@@ -466,7 +472,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
                 else
                 {
                     cts.Cancel();
-                    await Assert.ThrowsAnyAsync<OperationCanceledException>(() => tb.Completion);
+                    await AssertExtensions.CanceledAsync(cts.Token, tb.Completion);
                 }
 
                 Assert.Equal(expected: 0, actual: tb.InputCount);
index 8ecc9c5..0e02327 100644 (file)
@@ -13,6 +13,9 @@ namespace System.Threading.Tasks.Dataflow.Tests
         [Fact]
         public async Task TestCtor()
         {
+            var cts = new CancellationTokenSource();
+            cts.Cancel();
+
             var blocks = new[] {
                 new TransformManyBlock<int, int>(DataflowTestHelpers.ToEnumerable),
                 new TransformManyBlock<int, int>(DataflowTestHelpers.ToEnumerable, new ExecutionDataflowBlockOptions { MaxMessagesPerTask = 1 }),
@@ -27,15 +30,15 @@ namespace System.Threading.Tasks.Dataflow.Tests
 
             blocks = new[] {
                 new TransformManyBlock<int, int>(DataflowTestHelpers.ToEnumerable,
-                    new ExecutionDataflowBlockOptions { CancellationToken = new CancellationToken(true) }),
+                    new ExecutionDataflowBlockOptions { CancellationToken = cts.Token }),
                 new TransformManyBlock<int, int>(i => Task.Run(() => DataflowTestHelpers.ToEnumerable(i)),
-                    new ExecutionDataflowBlockOptions { CancellationToken = new CancellationToken(true) })
+                    new ExecutionDataflowBlockOptions { CancellationToken = cts.Token })
             };
             foreach (var block in blocks)
             {
                 Assert.Equal(expected: 0, actual: block.InputCount);
                 Assert.Equal(expected: 0, actual: block.OutputCount);
-                await Assert.ThrowsAnyAsync<OperationCanceledException>(() => block.Completion);
+                await AssertExtensions.CanceledAsync(cts.Token, block.Completion);
             }
         }
 
@@ -297,7 +300,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
             var tb = new TransformManyBlock<int, int>(DataflowTestHelpers.ToEnumerable, new ExecutionDataflowBlockOptions() { CancellationToken = cts.Token });
             tb.Post(1);
             cts.Cancel();
-            await Assert.ThrowsAnyAsync<OperationCanceledException>(() => tb.Completion);
+            await AssertExtensions.CanceledAsync(cts.Token, tb.Completion);
             Assert.Equal(expected: 0, actual: tb.InputCount);
             Assert.Equal(expected: 0, actual: tb.OutputCount);
 
@@ -417,8 +420,11 @@ namespace System.Threading.Tasks.Dataflow.Tests
         [Fact]
         public async Task TestPrecanceled()
         {
+            var cts = new CancellationTokenSource();
+            cts.Cancel();
+
             var bb = new TransformManyBlock<int, int>(DataflowTestHelpers.ToEnumerable,
-                new ExecutionDataflowBlockOptions { CancellationToken = new CancellationToken(canceled: true) });
+                new ExecutionDataflowBlockOptions { CancellationToken = cts.Token });
 
             int ignoredValue;
             IList<int> ignoredValues;
@@ -436,7 +442,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
             Assert.False(bb.TryReceive(out ignoredValue));
 
             Assert.NotNull(bb.Completion);
-            await Assert.ThrowsAnyAsync<OperationCanceledException>(() => bb.Completion);
+            await AssertExtensions.CanceledAsync(cts.Token, bb.Completion);
             bb.Complete(); // just make sure it doesn't throw
         }
 
@@ -496,7 +502,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
                 else
                 {
                     cts.Cancel();
-                    await Assert.ThrowsAnyAsync<OperationCanceledException>(() => tb.Completion);
+                    await AssertExtensions.CanceledAsync(cts.Token, tb.Completion);
                 }
 
                 Assert.Equal(expected: 0, actual: tb.InputCount);
index 39a23cd..d6ff711 100644 (file)
@@ -250,7 +250,7 @@ namespace System.Threading.Tasks.Dataflow.Tests
                 Assert.False(((IReceivableSourceBlock<int>)wob).TryReceiveAll(out ignoredValues));
                 Assert.NotNull(wob.Completion);
 
-                await Assert.ThrowsAnyAsync<OperationCanceledException>(() => wob.Completion);
+                await AssertExtensions.CanceledAsync(cts.Token, wob.Completion);
             }
         }
 
index 661df3a..9f02443 100644 (file)
@@ -45,8 +45,7 @@ namespace System.Threading.Tasks.Tests
             void AssertCanceled(Task t)
             {
                 Assert.True(t.IsCanceled);
-                var oce = Assert.ThrowsAny<OperationCanceledException>(() => t.GetAwaiter().GetResult());
-                Assert.Equal(cts.Token, oce.CancellationToken);
+                AssertExtensions.CanceledAsync(cts.Token, t).GetAwaiter().GetResult();
             }
 
             Func<int, CancellationToken, ValueTask> body = (item, cancellationToken) =>