From 71b4b91e9c440501e0394a4b281d036a74ab9631 Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Mon, 6 Feb 2023 11:57:06 -0500 Subject: [PATCH] Fix handling of postponed messages in TargetCore (#81011) The TargetCore type represents the bulk of the logic used in dataflow blocks for receiving messages pushed to it. If a block is bounded and the block reaches its bound, subsequent pushes to the block may be postponed, and then when room is available, that TargetCore implementation needs to try to consume messages it previously postponed. That's handled by the TryConsumePostponedMessage method. Inside this method, it pops an element from its postponed messages queue. The first time it does so, it does two or three things, depending on the reason the method is being called. It optimistically increments the bounding count, meaning the number of items it claims to hold, under the assumption it's going to successfully consume a previously postponed message and thus have one more message to count against the established bound. It also optimistically assigns the next available message ID, again assuming it'll successfully consume one. And if the method is being called in order to transfer a postponed message into its input queue (as opposed to processing it immediately), it also tracks that a transfer is in process (in order to then prevent other incoming messages from concurrently going straight into the input messages and violating ordering). Those optimistically changed values need to be undone if the assumption fails. If, for example, we fail to consume any message, we need to decrement the bounding count and note that a transfer is no longer in progress. We also need to burn the message ID that was assigned, in particular if there's a reordering buffer: that reordering buffer may have been waiting for that particular ID in order to release messages it's already stored with higher IDs, and it needs to be told that ID will never arrive. We had two bugs here: 1. When we decremented the outstanding transfers counter, we were doing so without holding the appropriate lock. That could lead to, for example, losing one of the decrements and ending up in a situation where all future messages are forced to be postponed. 2. We were only correctly tracking transfers for the first postponed message we'd try to consume. If there were multiple, we'd end up tracking the first but then not tracking subsequent ones in that batch, leading to other messages arriving concurrently potentially ending up out of order. 3. When this method was being called not for postponement, we might end up not burning the message ID with the reordering buffer. That would result in the reordering buffer not releasing any more messages, and could do so without incurring any exceptions or other indication of data loss. --- .../src/Internal/TargetCore.cs | 54 +++++++++++++--------- .../tests/Dataflow/ConcurrentTests.cs | 41 ++++++++++++++++ 2 files changed, 73 insertions(+), 22 deletions(-) diff --git a/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/TargetCore.cs b/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/TargetCore.cs index c48b0a8..0465688 100644 --- a/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/TargetCore.cs +++ b/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/TargetCore.cs @@ -618,7 +618,7 @@ namespace System.Threading.Tasks.Dataflow.Internal Common.ContractAssertMonitorStatus(IncomingLock, held: false); // Iterate until we either consume a message successfully or there are no more postponed messages. - bool countIncrementedExpectingToGetItem = false; + bool stateOptimisticallyUpdatedForConsumedMessage = false; long messageId = Common.INVALID_REORDERING_ID; while (true) { @@ -632,22 +632,30 @@ namespace System.Threading.Tasks.Dataflow.Internal // In particular, the input queue may have been filled up and messages may have // gotten postponed. If we process such a postponed message, we would mess up the // order. Therefore, we have to double-check the input queue first. - if (!forPostponementTransfer && _messages.TryDequeue(out result)) return true; + if (!forPostponementTransfer && _messages.TryDequeue(out result)) + { + // We got a message. If on a previous iteration of this loop we allocated a + // message ID, we need to inform the reordering buffer (if there is one) that + // the message ID will never used (since the message we got already has its + // own ID assigned). + if (stateOptimisticallyUpdatedForConsumedMessage) + { + _reorderingBuffer?.IgnoreItem(messageId); + } + + return true; + } // We can consume a message to process if there's one to process and also if // if we have logical room within our bound for the message. if (!_boundingState!.CountIsLessThanBound || !_boundingState.PostponedMessages.TryPop(out element)) { - if (countIncrementedExpectingToGetItem) - { - countIncrementedExpectingToGetItem = false; - _boundingState.CurrentCount -= 1; - } break; } - if (!countIncrementedExpectingToGetItem) + + if (!stateOptimisticallyUpdatedForConsumedMessage) { - countIncrementedExpectingToGetItem = true; + stateOptimisticallyUpdatedForConsumedMessage = true; messageId = _nextAvailableInputMessageId.Value++; // optimistically assign an ID Debug.Assert(messageId != Common.INVALID_REORDERING_ID, "The assigned message ID is invalid."); _boundingState.CurrentCount += 1; // optimistically take bounding space @@ -666,24 +674,26 @@ namespace System.Threading.Tasks.Dataflow.Internal result = new KeyValuePair(consumedValue!, messageId); return true; } - else + } + + if (stateOptimisticallyUpdatedForConsumedMessage) + { + // If we optimistically increased the bounding count, allocated a message ID, + // and noted an outstanding transfer, we need to undo those state changes, now + // that we've failed to consume any message. + + _reorderingBuffer?.IgnoreItem(messageId); + + if (forPostponementTransfer) { - if (forPostponementTransfer) + lock (IncomingLock) { - // We didn't consume message so we need to decrement because we haven't consumed the element. - _boundingState.OutstandingTransfers--; + _boundingState!.OutstandingTransfers--; } } - } - // We optimistically acquired a message ID for a message that, in the end, we never got. - // So, we need to let the reordering buffer (if one exists) know that it should not - // expect an item with this ID. Otherwise, it would stall forever. - if (_reorderingBuffer != null && messageId != Common.INVALID_REORDERING_ID) _reorderingBuffer.IgnoreItem(messageId); - - // Similarly, we optimistically increased the bounding count, expecting to get another message in. - // Since we didn't, we need to fix the bounding count back to what it should have been. - if (countIncrementedExpectingToGetItem) ChangeBoundingCount(-1); + ChangeBoundingCount(-1); + } // Inform the caller that no message could be consumed. result = default(KeyValuePair); diff --git a/src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/ConcurrentTests.cs b/src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/ConcurrentTests.cs index bfa70a2..3e3675e 100644 --- a/src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/ConcurrentTests.cs +++ b/src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/ConcurrentTests.cs @@ -11,6 +11,47 @@ namespace System.Threading.Tasks.Dataflow.Tests { public class ConcurrentTests { + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] + [OuterLoop] + public async Task StressTargetCorePostponement() + { + for (int trial = 0; trial < 1000; trial++) + { + var bufferBlock = new BufferBlock(); + + var transformOptions = new ExecutionDataflowBlockOptions() + { + BoundedCapacity = 10, + MaxDegreeOfParallelism = 2, + }; + var transformBlocks = new TransformBlock[] + { + new TransformBlock(x => x, transformOptions), + new TransformBlock(x => x, transformOptions) + }; + + int done = 0; + var actionBlock = new ActionBlock(_ => Interlocked.Increment(ref done)); + + foreach (TransformBlock transformBlock in transformBlocks) + { + bufferBlock.LinkTo(transformBlock, new DataflowLinkOptions { PropagateCompletion = true }); + transformBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = false }); + } + _ = Task.Factory.ContinueWhenAll(transformBlocks.Select(b => b.Completion).ToArray(), _ => actionBlock.Complete()); + + const int ItemCount = 40; + for (int item = 0; item < ItemCount; item++) + { + await bufferBlock.SendAsync(item); + } + bufferBlock.Complete(); + await actionBlock.Completion; + + Assert.Equal(ItemCount, done); + } + } + static readonly int s_dop = Environment.ProcessorCount * 2; const int IterationCount = 10000; -- 2.7.4