Allow bounded channel to be created with drop delegate (#50331)
authorIvan Beňovic <Ivan.Benovic2@gmail.com>
Wed, 31 Mar 2021 11:13:59 +0000 (13:13 +0200)
committerGitHub <noreply@github.com>
Wed, 31 Mar 2021 11:13:59 +0000 (07:13 -0400)
* Bounded channel can be created with drop delegate.

- Add additional CreateBounded overload with delegate parameter that will be called when item is being dropped from channel
- Added unit tests

* Fix typo in comment.

* Apply suggestions from code review

Co-authored-by: Stephen Toub <stoub@microsoft.com>
* Call drop delegate outside of lock statement.

* Use overload of CreateBounded method instead of calling ctor directly.

* Code review suggestions refactor.

* Move Monitor.Enter before try and use local scoped parent variable everywhere.

* Drop delegate should not be called while sync lock is held.

Enqueuing of new item should be done while sync lock is being held.
Added additional tests.

* Rerun gitlab CI.

* Do not run deadlock test for bounded channels if platform do not support threading.

* Apply suggestions from code review

Co-authored-by: Ivan Benovic <ivan.benovic@innovatrics.com>
Co-authored-by: Stephen Toub <stoub@microsoft.com>
src/libraries/System.Threading.Channels/ref/System.Threading.Channels.cs
src/libraries/System.Threading.Channels/src/System/Threading/Channels/BoundedChannel.cs
src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs
src/libraries/System.Threading.Channels/tests/BoundedChannelTests.cs

index 6422594..8c09368 100644 (file)
@@ -23,6 +23,7 @@ namespace System.Threading.Channels
     {
         public static System.Threading.Channels.Channel<T> CreateBounded<T>(int capacity) { throw null; }
         public static System.Threading.Channels.Channel<T> CreateBounded<T>(System.Threading.Channels.BoundedChannelOptions options) { throw null; }
+        public static System.Threading.Channels.Channel<T> CreateBounded<T>(BoundedChannelOptions options, Action<T>? itemDropped) { throw null; }
         public static System.Threading.Channels.Channel<T> CreateUnbounded<T>() { throw null; }
         public static System.Threading.Channels.Channel<T> CreateUnbounded<T>(System.Threading.Channels.UnboundedChannelOptions options) { throw null; }
     }
index c8e0746..bc7b18f 100644 (file)
@@ -15,6 +15,8 @@ namespace System.Threading.Channels
     {
         /// <summary>The mode used when the channel hits its bound.</summary>
         private readonly BoundedChannelFullMode _mode;
+        /// <summary>The delegate that will be invoked when the channel hits its bound and an item is dropped from the channel.</summary>
+        private readonly Action<T>? _itemDropped;
         /// <summary>Task signaled when the channel has completed.</summary>
         private readonly TaskCompletionSource _completion;
         /// <summary>The maximum capacity of the channel.</summary>
@@ -40,12 +42,14 @@ namespace System.Threading.Channels
         /// <param name="bufferedCapacity">The positive bounded capacity for the channel.</param>
         /// <param name="mode">The mode used when writing to a full channel.</param>
         /// <param name="runContinuationsAsynchronously">Whether to force continuations to be executed asynchronously.</param>
-        internal BoundedChannel(int bufferedCapacity, BoundedChannelFullMode mode, bool runContinuationsAsynchronously)
+        /// <param name="itemDropped">Delegate that will be invoked when an item is dropped from the channel. See <see cref="BoundedChannelFullMode"/>.</param>
+        internal BoundedChannel(int bufferedCapacity, BoundedChannelFullMode mode, bool runContinuationsAsynchronously, Action<T>? itemDropped)
         {
             Debug.Assert(bufferedCapacity > 0);
             _bufferedCapacity = bufferedCapacity;
             _mode = mode;
             _runContinuationsAsynchronously = runContinuationsAsynchronously;
+            _itemDropped = itemDropped;
             _completion = new TaskCompletionSource(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None);
             Reader = new BoundedChannelReader(this);
             Writer = new BoundedChannelWriter(this);
@@ -332,8 +336,12 @@ namespace System.Threading.Channels
                 AsyncOperation<bool>? waitingReadersTail = null;
 
                 BoundedChannel<T> parent = _parent;
-                lock (parent.SyncObj)
+
+                bool releaseLock = false;
+                try
                 {
+                    Monitor.Enter(parent.SyncObj, ref releaseLock);
+
                     parent.AssertInvariants();
 
                     // If we're done writing, nothing more to do.
@@ -393,24 +401,35 @@ namespace System.Threading.Channels
                     {
                         // The channel is full.  Just ignore the item being added
                         // but say we added it.
+                        Monitor.Exit(parent.SyncObj);
+                        releaseLock = false;
+                        parent._itemDropped?.Invoke(item);
                         return true;
                     }
                     else
                     {
                         // The channel is full, and we're in a dropping mode.
-                        // Drop either the oldest or the newest and write the new item.
-                        if (parent._mode == BoundedChannelFullMode.DropNewest)
-                        {
-                            parent._items.DequeueTail();
-                        }
-                        else
-                        {
+                        // Drop either the oldest or the newest
+                        T droppedItem = parent._mode == BoundedChannelFullMode.DropNewest ?
+                            parent._items.DequeueTail() :
                             parent._items.DequeueHead();
-                        }
+
                         parent._items.EnqueueTail(item);
+
+                        Monitor.Exit(parent.SyncObj);
+                        releaseLock = false;
+                        parent._itemDropped?.Invoke(droppedItem);
+
                         return true;
                     }
                 }
+                finally
+                {
+                    if (releaseLock)
+                    {
+                        Monitor.Exit(parent.SyncObj);
+                    }
+                }
 
                 // We either wrote the item already, or we're transferring it to the blocked reader we grabbed.
                 if (blockedReader != null)
@@ -492,8 +511,12 @@ namespace System.Threading.Channels
                 AsyncOperation<bool>? waitingReadersTail = null;
 
                 BoundedChannel<T> parent = _parent;
-                lock (parent.SyncObj)
+
+                bool releaseLock = false;
+                try
                 {
+                    Monitor.Enter(parent.SyncObj, ref releaseLock);
+
                     parent.AssertInvariants();
 
                     // If we're done writing, trying to write is an error.
@@ -569,24 +592,35 @@ namespace System.Threading.Channels
                     {
                         // The channel is full and we're in ignore mode.
                         // Ignore the item but say we accepted it.
+                        Monitor.Exit(parent.SyncObj);
+                        releaseLock = false;
+                        parent._itemDropped?.Invoke(item);
                         return default;
                     }
                     else
                     {
                         // The channel is full, and we're in a dropping mode.
                         // Drop either the oldest or the newest and write the new item.
-                        if (parent._mode == BoundedChannelFullMode.DropNewest)
-                        {
-                            parent._items.DequeueTail();
-                        }
-                        else
-                        {
+                        T droppedItem = parent._mode == BoundedChannelFullMode.DropNewest ?
+                            parent._items.DequeueTail() :
                             parent._items.DequeueHead();
-                        }
+
                         parent._items.EnqueueTail(item);
+
+                        Monitor.Exit(parent.SyncObj);
+                        releaseLock = false;
+                        parent._itemDropped?.Invoke(droppedItem);
+
                         return default;
                     }
                 }
+                finally
+                {
+                    if (releaseLock)
+                    {
+                        Monitor.Exit(parent.SyncObj);
+                    }
+                }
 
                 // We either wrote the item already, or we're transfering it to the blocked reader we grabbed.
                 if (blockedReader != null)
index f377993..1564892 100644 (file)
@@ -35,21 +35,31 @@ namespace System.Threading.Channels
                 throw new ArgumentOutOfRangeException(nameof(capacity));
             }
 
-            return new BoundedChannel<T>(capacity, BoundedChannelFullMode.Wait, runContinuationsAsynchronously: true);
+            return new BoundedChannel<T>(capacity, BoundedChannelFullMode.Wait, runContinuationsAsynchronously: true, itemDropped: null);
         }
 
-        /// <summary>Creates a channel with the specified maximum capacity.</summary>
+        /// <summary>Creates a channel subject to the provided options.</summary>
         /// <typeparam name="T">Specifies the type of data in the channel.</typeparam>
         /// <param name="options">Options that guide the behavior of the channel.</param>
         /// <returns>The created channel.</returns>
         public static Channel<T> CreateBounded<T>(BoundedChannelOptions options)
         {
+            return CreateBounded<T>(options, itemDropped: null);
+        }
+
+        /// <summary>Creates a channel subject to the provided options.</summary>
+        /// <typeparam name="T">Specifies the type of data in the channel.</typeparam>
+        /// <param name="options">Options that guide the behavior of the channel.</param>
+        /// <param name="itemDropped">Delegate that will be called when item is being dropped from channel. See <see cref="BoundedChannelFullMode"/>.</param>
+        /// <returns>The created channel.</returns>
+        public static Channel<T> CreateBounded<T>(BoundedChannelOptions options, Action<T>? itemDropped)
+        {
             if (options == null)
             {
                 throw new ArgumentNullException(nameof(options));
             }
 
-            return new BoundedChannel<T>(options.Capacity, options.FullMode, !options.AllowSynchronousContinuations);
+            return new BoundedChannel<T>(options.Capacity, options.FullMode, !options.AllowSynchronousContinuations, itemDropped);
         }
     }
 }
index 4c8989d..6eb1d3f 100644 (file)
@@ -1,6 +1,8 @@
 // Licensed to the .NET Foundation under one or more agreements.
 // The .NET Foundation licenses this file to you under the MIT license.
 
+using System.Collections.Generic;
+using System.Linq;
 using System.Threading.Tasks;
 using Microsoft.DotNet.XUnitExtensions;
 using Xunit;
@@ -17,6 +19,17 @@ namespace System.Threading.Channels.Tests
             return c;
         }
 
+        public static IEnumerable<object[]> ChannelDropModes()
+        {
+            foreach (BoundedChannelFullMode mode in Enum.GetValues(typeof(BoundedChannelFullMode)))
+            {
+                if (mode != BoundedChannelFullMode.Wait)
+                {
+                    yield return new object[] { mode };
+                }
+            }
+        }
+
         [Fact]
         public void Count_IncrementsDecrementsAsExpected()
         {
@@ -250,6 +263,216 @@ namespace System.Threading.Channels.Tests
         }
 
         [Fact]
+        public void DroppedDelegateNotCalledOnWaitMode_SyncWrites()
+        {
+            bool dropDelegateCalled = false;
+
+            Channel<int> c = Channel.CreateBounded<int>(new BoundedChannelOptions(1) { FullMode = BoundedChannelFullMode.Wait },
+                item =>
+                {
+                    dropDelegateCalled = true;
+                });
+
+            Assert.True(c.Writer.TryWrite(1));
+            Assert.False(c.Writer.TryWrite(1));
+
+            Assert.False(dropDelegateCalled);
+        }
+
+        [Fact]
+        public async Task DroppedDelegateNotCalledOnWaitMode_AsyncWrites()
+        {
+            bool dropDelegateCalled = false;
+
+            Channel<int> c = Channel.CreateBounded<int>(new BoundedChannelOptions(1) { FullMode = BoundedChannelFullMode.Wait },
+                item =>
+                {
+                    dropDelegateCalled = true;
+                });
+
+            // First async write should pass
+            await c.Writer.WriteAsync(1);
+
+            // Second write should wait
+            var secondWriteTask = c.Writer.WriteAsync(2);
+            Assert.False(secondWriteTask.IsCompleted);
+
+            // Read from channel to free up space
+            var readItem = await c.Reader.ReadAsync();
+            // Second write should complete
+            await secondWriteTask;
+
+            // No dropped delegate should be called
+            Assert.False(dropDelegateCalled);
+        }
+
+        [Theory]
+        [MemberData(nameof(ChannelDropModes))]
+        public void DroppedDelegateIsNull_SyncWrites(BoundedChannelFullMode boundedChannelFullMode)
+        {
+            Channel<int> c = Channel.CreateBounded<int>(new BoundedChannelOptions(1) { FullMode = boundedChannelFullMode }, itemDropped: null);
+
+            Assert.True(c.Writer.TryWrite(5));
+            Assert.True(c.Writer.TryWrite(5));
+        }
+
+        [Theory]
+        [MemberData(nameof(ChannelDropModes))]
+        public async Task DroppedDelegateIsNull_AsyncWrites(BoundedChannelFullMode boundedChannelFullMode)
+        {
+            Channel<int> c = Channel.CreateBounded<int>(new BoundedChannelOptions(1) { FullMode = boundedChannelFullMode }, itemDropped: null);
+
+            await c.Writer.WriteAsync(5);
+            await c.Writer.WriteAsync(5);
+        }
+
+        [Theory]
+        [MemberData(nameof(ChannelDropModes))]
+        public void DroppedDelegateCalledOnChannelFull_SyncWrites(BoundedChannelFullMode boundedChannelFullMode)
+        {
+            var droppedItems = new HashSet<int>();
+
+            void AddDroppedItem(int itemDropped)
+            {
+                Assert.True(droppedItems.Add(itemDropped));
+            }
+
+            const int channelCapacity = 10;
+            var c = Channel.CreateBounded<int>(new BoundedChannelOptions(channelCapacity)
+            {
+                FullMode = boundedChannelFullMode
+            }, AddDroppedItem);
+
+            for (int i = 0; i < channelCapacity; i++)
+            {
+                Assert.True(c.Writer.TryWrite(i));
+            }
+
+            // No dropped delegate should be called while channel is not full
+            Assert.Empty(droppedItems);
+
+            for (int i = channelCapacity; i < channelCapacity + 10; i++)
+            {
+                Assert.True(c.Writer.TryWrite(i));
+            }
+
+            // Assert expected number of dropped items delegate calls
+            Assert.Equal(10, droppedItems.Count);
+        }
+
+        [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+        [MemberData(nameof(ChannelDropModes))]        
+        public void DroppedDelegateCalledAfterLockReleased_SyncWrites(BoundedChannelFullMode boundedChannelFullMode)
+        {
+            Channel<int> c = null;
+            bool dropDelegateCalled = false;
+            
+            c = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
+            {
+                FullMode = boundedChannelFullMode
+            }, (droppedItem) =>
+            {
+                if (dropDelegateCalled)
+                {
+                    // Prevent infinite callbacks being called
+                    return;
+                }
+
+                dropDelegateCalled = true;
+
+                // Dropped delegate should not be called while holding the channel lock.
+                // Verify this by trying to write into the channel from different thread.
+                // If lock is held during callback, this should effecitvely cause deadlock.
+                var mres = new ManualResetEventSlim();
+                ThreadPool.QueueUserWorkItem(delegate
+                {
+                    c.Writer.TryWrite(3);
+                    mres.Set();
+                });
+
+                mres.Wait();
+            });
+
+            Assert.True(c.Writer.TryWrite(1));
+            Assert.True(c.Writer.TryWrite(2));
+
+            Assert.True(dropDelegateCalled);
+        }
+
+        [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+        [MemberData(nameof(ChannelDropModes))]        
+        public async Task DroppedDelegateCalledAfterLockReleased_AsyncWrites(BoundedChannelFullMode boundedChannelFullMode)
+        {
+            Channel<int> c = null;
+            bool dropDelegateCalled = false;
+
+            c = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
+            {
+                FullMode = boundedChannelFullMode
+            }, (droppedItem) =>
+            {
+                if (dropDelegateCalled)
+                {
+                    // Prevent infinite callbacks being called
+                    return;
+                }
+
+                dropDelegateCalled = true;
+
+                // Dropped delegate should not be called while holding the channel synchronisation lock.
+                // Verify this by trying to write into the channel from different thread.
+                // If lock is held during callback, this should effecitvely cause deadlock.
+                var mres = new ManualResetEventSlim();
+                ThreadPool.QueueUserWorkItem(delegate
+                {
+                    c.Writer.TryWrite(11);
+                    mres.Set();
+                });
+
+                mres.Wait();
+            });
+
+            await c.Writer.WriteAsync(1);
+            await c.Writer.WriteAsync(2);
+
+            Assert.True(dropDelegateCalled);
+        }
+
+        [Theory]
+        [MemberData(nameof(ChannelDropModes))]
+        public async Task DroppedDelegateCalledOnChannelFull_AsyncWrites(BoundedChannelFullMode boundedChannelFullMode)
+        {
+            var droppedItems = new HashSet<int>();
+
+            void AddDroppedItem(int itemDropped)
+            {
+                Assert.True(droppedItems.Add(itemDropped));
+            }
+
+            const int channelCapacity = 10;
+            var c = Channel.CreateBounded<int>(new BoundedChannelOptions(channelCapacity)
+            {
+                FullMode = boundedChannelFullMode
+            }, AddDroppedItem);
+
+            for (int i = 0; i < channelCapacity; i++)
+            {
+                await c.Writer.WriteAsync(i);
+            }
+
+            // No dropped delegate should be called while channel is not full
+            Assert.Empty(droppedItems);
+
+            for (int i = channelCapacity; i < channelCapacity + 10; i++)
+            {
+                await c.Writer.WriteAsync(i);
+            }
+
+            // Assert expected number of dropped items delegate calls
+            Assert.Equal(10, droppedItems.Count);
+        }
+
+        [Fact]
         public async Task CancelPendingWrite_Reading_DataTransferredFromCorrectWriter()
         {
             var c = Channel.CreateBounded<int>(1);