From: Ivan Beňovic Date: Wed, 31 Mar 2021 11:13:59 +0000 (+0200) Subject: Allow bounded channel to be created with drop delegate (#50331) X-Git-Tag: submit/tizen/20210909.063632~2313 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=b47094da840f8b775fbadea5c5739022a951ac84;p=platform%2Fupstream%2Fdotnet%2Fruntime.git Allow bounded channel to be created with drop delegate (#50331) * Bounded channel can be created with drop delegate. - Add additional CreateBounded overload with delegate parameter that will be called when item is being dropped from channel - Added unit tests * Fix typo in comment. * Apply suggestions from code review Co-authored-by: Stephen Toub * Call drop delegate outside of lock statement. * Use overload of CreateBounded method instead of calling ctor directly. * Code review suggestions refactor. * Move Monitor.Enter before try and use local scoped parent variable everywhere. * Drop delegate should not be called while sync lock is held. Enqueuing of new item should be done while sync lock is being held. Added additional tests. * Rerun gitlab CI. * Do not run deadlock test for bounded channels if platform do not support threading. * Apply suggestions from code review Co-authored-by: Ivan Benovic Co-authored-by: Stephen Toub --- diff --git a/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.cs b/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.cs index 6422594..8c09368 100644 --- a/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.cs +++ b/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.cs @@ -23,6 +23,7 @@ namespace System.Threading.Channels { public static System.Threading.Channels.Channel CreateBounded(int capacity) { throw null; } public static System.Threading.Channels.Channel CreateBounded(System.Threading.Channels.BoundedChannelOptions options) { throw null; } + public static System.Threading.Channels.Channel CreateBounded(BoundedChannelOptions options, Action? itemDropped) { throw null; } public static System.Threading.Channels.Channel CreateUnbounded() { throw null; } public static System.Threading.Channels.Channel CreateUnbounded(System.Threading.Channels.UnboundedChannelOptions options) { throw null; } } diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/BoundedChannel.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/BoundedChannel.cs index c8e0746..bc7b18fd 100644 --- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/BoundedChannel.cs +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/BoundedChannel.cs @@ -15,6 +15,8 @@ namespace System.Threading.Channels { /// The mode used when the channel hits its bound. private readonly BoundedChannelFullMode _mode; + /// The delegate that will be invoked when the channel hits its bound and an item is dropped from the channel. + private readonly Action? _itemDropped; /// Task signaled when the channel has completed. private readonly TaskCompletionSource _completion; /// The maximum capacity of the channel. @@ -40,12 +42,14 @@ namespace System.Threading.Channels /// The positive bounded capacity for the channel. /// The mode used when writing to a full channel. /// Whether to force continuations to be executed asynchronously. - internal BoundedChannel(int bufferedCapacity, BoundedChannelFullMode mode, bool runContinuationsAsynchronously) + /// Delegate that will be invoked when an item is dropped from the channel. See . + internal BoundedChannel(int bufferedCapacity, BoundedChannelFullMode mode, bool runContinuationsAsynchronously, Action? itemDropped) { Debug.Assert(bufferedCapacity > 0); _bufferedCapacity = bufferedCapacity; _mode = mode; _runContinuationsAsynchronously = runContinuationsAsynchronously; + _itemDropped = itemDropped; _completion = new TaskCompletionSource(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None); Reader = new BoundedChannelReader(this); Writer = new BoundedChannelWriter(this); @@ -332,8 +336,12 @@ namespace System.Threading.Channels AsyncOperation? waitingReadersTail = null; BoundedChannel parent = _parent; - lock (parent.SyncObj) + + bool releaseLock = false; + try { + Monitor.Enter(parent.SyncObj, ref releaseLock); + parent.AssertInvariants(); // If we're done writing, nothing more to do. @@ -393,24 +401,35 @@ namespace System.Threading.Channels { // The channel is full. Just ignore the item being added // but say we added it. + Monitor.Exit(parent.SyncObj); + releaseLock = false; + parent._itemDropped?.Invoke(item); return true; } else { // The channel is full, and we're in a dropping mode. - // Drop either the oldest or the newest and write the new item. - if (parent._mode == BoundedChannelFullMode.DropNewest) - { - parent._items.DequeueTail(); - } - else - { + // Drop either the oldest or the newest + T droppedItem = parent._mode == BoundedChannelFullMode.DropNewest ? + parent._items.DequeueTail() : parent._items.DequeueHead(); - } + parent._items.EnqueueTail(item); + + Monitor.Exit(parent.SyncObj); + releaseLock = false; + parent._itemDropped?.Invoke(droppedItem); + return true; } } + finally + { + if (releaseLock) + { + Monitor.Exit(parent.SyncObj); + } + } // We either wrote the item already, or we're transferring it to the blocked reader we grabbed. if (blockedReader != null) @@ -492,8 +511,12 @@ namespace System.Threading.Channels AsyncOperation? waitingReadersTail = null; BoundedChannel parent = _parent; - lock (parent.SyncObj) + + bool releaseLock = false; + try { + Monitor.Enter(parent.SyncObj, ref releaseLock); + parent.AssertInvariants(); // If we're done writing, trying to write is an error. @@ -569,24 +592,35 @@ namespace System.Threading.Channels { // The channel is full and we're in ignore mode. // Ignore the item but say we accepted it. + Monitor.Exit(parent.SyncObj); + releaseLock = false; + parent._itemDropped?.Invoke(item); return default; } else { // The channel is full, and we're in a dropping mode. // Drop either the oldest or the newest and write the new item. - if (parent._mode == BoundedChannelFullMode.DropNewest) - { - parent._items.DequeueTail(); - } - else - { + T droppedItem = parent._mode == BoundedChannelFullMode.DropNewest ? + parent._items.DequeueTail() : parent._items.DequeueHead(); - } + parent._items.EnqueueTail(item); + + Monitor.Exit(parent.SyncObj); + releaseLock = false; + parent._itemDropped?.Invoke(droppedItem); + return default; } } + finally + { + if (releaseLock) + { + Monitor.Exit(parent.SyncObj); + } + } // We either wrote the item already, or we're transfering it to the blocked reader we grabbed. if (blockedReader != null) diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs index f377993..1564892 100644 --- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs @@ -35,21 +35,31 @@ namespace System.Threading.Channels throw new ArgumentOutOfRangeException(nameof(capacity)); } - return new BoundedChannel(capacity, BoundedChannelFullMode.Wait, runContinuationsAsynchronously: true); + return new BoundedChannel(capacity, BoundedChannelFullMode.Wait, runContinuationsAsynchronously: true, itemDropped: null); } - /// Creates a channel with the specified maximum capacity. + /// Creates a channel subject to the provided options. /// Specifies the type of data in the channel. /// Options that guide the behavior of the channel. /// The created channel. public static Channel CreateBounded(BoundedChannelOptions options) { + return CreateBounded(options, itemDropped: null); + } + + /// Creates a channel subject to the provided options. + /// Specifies the type of data in the channel. + /// Options that guide the behavior of the channel. + /// Delegate that will be called when item is being dropped from channel. See . + /// The created channel. + public static Channel CreateBounded(BoundedChannelOptions options, Action? itemDropped) + { if (options == null) { throw new ArgumentNullException(nameof(options)); } - return new BoundedChannel(options.Capacity, options.FullMode, !options.AllowSynchronousContinuations); + return new BoundedChannel(options.Capacity, options.FullMode, !options.AllowSynchronousContinuations, itemDropped); } } } diff --git a/src/libraries/System.Threading.Channels/tests/BoundedChannelTests.cs b/src/libraries/System.Threading.Channels/tests/BoundedChannelTests.cs index 4c8989d..6eb1d3f 100644 --- a/src/libraries/System.Threading.Channels/tests/BoundedChannelTests.cs +++ b/src/libraries/System.Threading.Channels/tests/BoundedChannelTests.cs @@ -1,6 +1,8 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. +using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; using Microsoft.DotNet.XUnitExtensions; using Xunit; @@ -17,6 +19,17 @@ namespace System.Threading.Channels.Tests return c; } + public static IEnumerable ChannelDropModes() + { + foreach (BoundedChannelFullMode mode in Enum.GetValues(typeof(BoundedChannelFullMode))) + { + if (mode != BoundedChannelFullMode.Wait) + { + yield return new object[] { mode }; + } + } + } + [Fact] public void Count_IncrementsDecrementsAsExpected() { @@ -250,6 +263,216 @@ namespace System.Threading.Channels.Tests } [Fact] + public void DroppedDelegateNotCalledOnWaitMode_SyncWrites() + { + bool dropDelegateCalled = false; + + Channel c = Channel.CreateBounded(new BoundedChannelOptions(1) { FullMode = BoundedChannelFullMode.Wait }, + item => + { + dropDelegateCalled = true; + }); + + Assert.True(c.Writer.TryWrite(1)); + Assert.False(c.Writer.TryWrite(1)); + + Assert.False(dropDelegateCalled); + } + + [Fact] + public async Task DroppedDelegateNotCalledOnWaitMode_AsyncWrites() + { + bool dropDelegateCalled = false; + + Channel c = Channel.CreateBounded(new BoundedChannelOptions(1) { FullMode = BoundedChannelFullMode.Wait }, + item => + { + dropDelegateCalled = true; + }); + + // First async write should pass + await c.Writer.WriteAsync(1); + + // Second write should wait + var secondWriteTask = c.Writer.WriteAsync(2); + Assert.False(secondWriteTask.IsCompleted); + + // Read from channel to free up space + var readItem = await c.Reader.ReadAsync(); + // Second write should complete + await secondWriteTask; + + // No dropped delegate should be called + Assert.False(dropDelegateCalled); + } + + [Theory] + [MemberData(nameof(ChannelDropModes))] + public void DroppedDelegateIsNull_SyncWrites(BoundedChannelFullMode boundedChannelFullMode) + { + Channel c = Channel.CreateBounded(new BoundedChannelOptions(1) { FullMode = boundedChannelFullMode }, itemDropped: null); + + Assert.True(c.Writer.TryWrite(5)); + Assert.True(c.Writer.TryWrite(5)); + } + + [Theory] + [MemberData(nameof(ChannelDropModes))] + public async Task DroppedDelegateIsNull_AsyncWrites(BoundedChannelFullMode boundedChannelFullMode) + { + Channel c = Channel.CreateBounded(new BoundedChannelOptions(1) { FullMode = boundedChannelFullMode }, itemDropped: null); + + await c.Writer.WriteAsync(5); + await c.Writer.WriteAsync(5); + } + + [Theory] + [MemberData(nameof(ChannelDropModes))] + public void DroppedDelegateCalledOnChannelFull_SyncWrites(BoundedChannelFullMode boundedChannelFullMode) + { + var droppedItems = new HashSet(); + + void AddDroppedItem(int itemDropped) + { + Assert.True(droppedItems.Add(itemDropped)); + } + + const int channelCapacity = 10; + var c = Channel.CreateBounded(new BoundedChannelOptions(channelCapacity) + { + FullMode = boundedChannelFullMode + }, AddDroppedItem); + + for (int i = 0; i < channelCapacity; i++) + { + Assert.True(c.Writer.TryWrite(i)); + } + + // No dropped delegate should be called while channel is not full + Assert.Empty(droppedItems); + + for (int i = channelCapacity; i < channelCapacity + 10; i++) + { + Assert.True(c.Writer.TryWrite(i)); + } + + // Assert expected number of dropped items delegate calls + Assert.Equal(10, droppedItems.Count); + } + + [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] + [MemberData(nameof(ChannelDropModes))] + public void DroppedDelegateCalledAfterLockReleased_SyncWrites(BoundedChannelFullMode boundedChannelFullMode) + { + Channel c = null; + bool dropDelegateCalled = false; + + c = Channel.CreateBounded(new BoundedChannelOptions(1) + { + FullMode = boundedChannelFullMode + }, (droppedItem) => + { + if (dropDelegateCalled) + { + // Prevent infinite callbacks being called + return; + } + + dropDelegateCalled = true; + + // Dropped delegate should not be called while holding the channel lock. + // Verify this by trying to write into the channel from different thread. + // If lock is held during callback, this should effecitvely cause deadlock. + var mres = new ManualResetEventSlim(); + ThreadPool.QueueUserWorkItem(delegate + { + c.Writer.TryWrite(3); + mres.Set(); + }); + + mres.Wait(); + }); + + Assert.True(c.Writer.TryWrite(1)); + Assert.True(c.Writer.TryWrite(2)); + + Assert.True(dropDelegateCalled); + } + + [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] + [MemberData(nameof(ChannelDropModes))] + public async Task DroppedDelegateCalledAfterLockReleased_AsyncWrites(BoundedChannelFullMode boundedChannelFullMode) + { + Channel c = null; + bool dropDelegateCalled = false; + + c = Channel.CreateBounded(new BoundedChannelOptions(1) + { + FullMode = boundedChannelFullMode + }, (droppedItem) => + { + if (dropDelegateCalled) + { + // Prevent infinite callbacks being called + return; + } + + dropDelegateCalled = true; + + // Dropped delegate should not be called while holding the channel synchronisation lock. + // Verify this by trying to write into the channel from different thread. + // If lock is held during callback, this should effecitvely cause deadlock. + var mres = new ManualResetEventSlim(); + ThreadPool.QueueUserWorkItem(delegate + { + c.Writer.TryWrite(11); + mres.Set(); + }); + + mres.Wait(); + }); + + await c.Writer.WriteAsync(1); + await c.Writer.WriteAsync(2); + + Assert.True(dropDelegateCalled); + } + + [Theory] + [MemberData(nameof(ChannelDropModes))] + public async Task DroppedDelegateCalledOnChannelFull_AsyncWrites(BoundedChannelFullMode boundedChannelFullMode) + { + var droppedItems = new HashSet(); + + void AddDroppedItem(int itemDropped) + { + Assert.True(droppedItems.Add(itemDropped)); + } + + const int channelCapacity = 10; + var c = Channel.CreateBounded(new BoundedChannelOptions(channelCapacity) + { + FullMode = boundedChannelFullMode + }, AddDroppedItem); + + for (int i = 0; i < channelCapacity; i++) + { + await c.Writer.WriteAsync(i); + } + + // No dropped delegate should be called while channel is not full + Assert.Empty(droppedItems); + + for (int i = channelCapacity; i < channelCapacity + 10; i++) + { + await c.Writer.WriteAsync(i); + } + + // Assert expected number of dropped items delegate calls + Assert.Equal(10, droppedItems.Count); + } + + [Fact] public async Task CancelPendingWrite_Reading_DataTransferredFromCorrectWriter() { var c = Channel.CreateBounded(1);