Fix handling of postponed messages in TargetCore (#81011)
authorStephen Toub <stoub@microsoft.com>
Mon, 6 Feb 2023 16:57:06 +0000 (11:57 -0500)
committerGitHub <noreply@github.com>
Mon, 6 Feb 2023 16:57:06 +0000 (11:57 -0500)
commit71b4b91e9c440501e0394a4b281d036a74ab9631
treed685b1386e7da25f474710908bdb626326b99057
parent7941c5459d90f021f1b128f6debc0e7814856e85
Fix handling of postponed messages in TargetCore (#81011)

The TargetCore type represents the bulk of the logic used in dataflow blocks for receiving messages pushed to it.  If a block is bounded and the block reaches its bound, subsequent pushes to the block may be postponed, and then when room is available, that TargetCore implementation needs to try to consume messages it previously postponed.  That's handled by the TryConsumePostponedMessage method.

Inside this method, it pops an element from its postponed messages queue.  The first time it does so, it does two or three things, depending on the reason the method is being called.  It optimistically increments the bounding count, meaning the number of items it claims to hold, under the assumption it's going to successfully consume a previously postponed message and thus have one more message to count against the established bound.  It also optimistically assigns the next available message ID, again assuming it'll successfully consume one.  And if the method is being called in order to transfer a postponed message into its input queue (as opposed to processing it immediately), it also tracks that a transfer is in process (in order to then prevent other incoming messages from concurrently going straight into the input messages and violating ordering).

Those optimistically changed values need to be undone if the assumption fails.  If, for example, we fail to consume any message, we need to decrement the bounding count and note that a transfer is no longer in progress.  We also need to burn the message ID that was assigned, in particular if there's a reordering buffer: that reordering buffer may have been waiting for that particular ID in order to release messages it's already stored with higher IDs, and it needs to be told that ID will never arrive.

We had two bugs here:
1. When we decremented the outstanding transfers counter, we were doing so without holding the appropriate lock.  That could lead to, for example, losing one of the decrements and ending up in a situation where all future messages are forced to be postponed.
2. We were only correctly tracking transfers for the first postponed message we'd try to consume.  If there were multiple, we'd end up tracking the first but then not tracking subsequent ones in that batch, leading to other messages arriving concurrently potentially ending up out of order.
3. When this method was being called not for postponement, we might end up not burning the message ID with the reordering buffer.  That would result in the reordering buffer not releasing any more messages, and could do so without incurring any exceptions or other indication of data loss.
src/libraries/System.Threading.Tasks.Dataflow/src/Internal/TargetCore.cs
src/libraries/System.Threading.Tasks.Dataflow/tests/Dataflow/ConcurrentTests.cs