{
/// <summary>The mode used when the channel hits its bound.</summary>
private readonly BoundedChannelFullMode _mode;
+ /// <summary>The delegate that will be invoked when the channel hits its bound and an item is dropped from the channel.</summary>
+ private readonly Action<T>? _itemDropped;
/// <summary>Task signaled when the channel has completed.</summary>
private readonly TaskCompletionSource _completion;
/// <summary>The maximum capacity of the channel.</summary>
/// <param name="bufferedCapacity">The positive bounded capacity for the channel.</param>
/// <param name="mode">The mode used when writing to a full channel.</param>
/// <param name="runContinuationsAsynchronously">Whether to force continuations to be executed asynchronously.</param>
- internal BoundedChannel(int bufferedCapacity, BoundedChannelFullMode mode, bool runContinuationsAsynchronously)
+ /// <param name="itemDropped">Delegate that will be invoked when an item is dropped from the channel. See <see cref="BoundedChannelFullMode"/>.</param>
+ internal BoundedChannel(int bufferedCapacity, BoundedChannelFullMode mode, bool runContinuationsAsynchronously, Action<T>? itemDropped)
{
Debug.Assert(bufferedCapacity > 0);
_bufferedCapacity = bufferedCapacity;
_mode = mode;
_runContinuationsAsynchronously = runContinuationsAsynchronously;
+ _itemDropped = itemDropped;
_completion = new TaskCompletionSource(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None);
Reader = new BoundedChannelReader(this);
Writer = new BoundedChannelWriter(this);
AsyncOperation<bool>? waitingReadersTail = null;
BoundedChannel<T> parent = _parent;
- lock (parent.SyncObj)
+
+ bool releaseLock = false;
+ try
{
+ Monitor.Enter(parent.SyncObj, ref releaseLock);
+
parent.AssertInvariants();
// If we're done writing, nothing more to do.
{
// The channel is full. Just ignore the item being added
// but say we added it.
+ Monitor.Exit(parent.SyncObj);
+ releaseLock = false;
+ parent._itemDropped?.Invoke(item);
return true;
}
else
{
// The channel is full, and we're in a dropping mode.
- // Drop either the oldest or the newest and write the new item.
- if (parent._mode == BoundedChannelFullMode.DropNewest)
- {
- parent._items.DequeueTail();
- }
- else
- {
+ // Drop either the oldest or the newest
+ T droppedItem = parent._mode == BoundedChannelFullMode.DropNewest ?
+ parent._items.DequeueTail() :
parent._items.DequeueHead();
- }
+
parent._items.EnqueueTail(item);
+
+ Monitor.Exit(parent.SyncObj);
+ releaseLock = false;
+ parent._itemDropped?.Invoke(droppedItem);
+
return true;
}
}
+ finally
+ {
+ if (releaseLock)
+ {
+ Monitor.Exit(parent.SyncObj);
+ }
+ }
// We either wrote the item already, or we're transferring it to the blocked reader we grabbed.
if (blockedReader != null)
AsyncOperation<bool>? waitingReadersTail = null;
BoundedChannel<T> parent = _parent;
- lock (parent.SyncObj)
+
+ bool releaseLock = false;
+ try
{
+ Monitor.Enter(parent.SyncObj, ref releaseLock);
+
parent.AssertInvariants();
// If we're done writing, trying to write is an error.
{
// The channel is full and we're in ignore mode.
// Ignore the item but say we accepted it.
+ Monitor.Exit(parent.SyncObj);
+ releaseLock = false;
+ parent._itemDropped?.Invoke(item);
return default;
}
else
{
// The channel is full, and we're in a dropping mode.
// Drop either the oldest or the newest and write the new item.
- if (parent._mode == BoundedChannelFullMode.DropNewest)
- {
- parent._items.DequeueTail();
- }
- else
- {
+ T droppedItem = parent._mode == BoundedChannelFullMode.DropNewest ?
+ parent._items.DequeueTail() :
parent._items.DequeueHead();
- }
+
parent._items.EnqueueTail(item);
+
+ Monitor.Exit(parent.SyncObj);
+ releaseLock = false;
+ parent._itemDropped?.Invoke(droppedItem);
+
return default;
}
}
+ finally
+ {
+ if (releaseLock)
+ {
+ Monitor.Exit(parent.SyncObj);
+ }
+ }
// We either wrote the item already, or we're transfering it to the blocked reader we grabbed.
if (blockedReader != null)
throw new ArgumentOutOfRangeException(nameof(capacity));
}
- return new BoundedChannel<T>(capacity, BoundedChannelFullMode.Wait, runContinuationsAsynchronously: true);
+ return new BoundedChannel<T>(capacity, BoundedChannelFullMode.Wait, runContinuationsAsynchronously: true, itemDropped: null);
}
- /// <summary>Creates a channel with the specified maximum capacity.</summary>
+ /// <summary>Creates a channel subject to the provided options.</summary>
/// <typeparam name="T">Specifies the type of data in the channel.</typeparam>
/// <param name="options">Options that guide the behavior of the channel.</param>
/// <returns>The created channel.</returns>
public static Channel<T> CreateBounded<T>(BoundedChannelOptions options)
{
+ return CreateBounded<T>(options, itemDropped: null);
+ }
+
+ /// <summary>Creates a channel subject to the provided options.</summary>
+ /// <typeparam name="T">Specifies the type of data in the channel.</typeparam>
+ /// <param name="options">Options that guide the behavior of the channel.</param>
+ /// <param name="itemDropped">Delegate that will be called when item is being dropped from channel. See <see cref="BoundedChannelFullMode"/>.</param>
+ /// <returns>The created channel.</returns>
+ public static Channel<T> CreateBounded<T>(BoundedChannelOptions options, Action<T>? itemDropped)
+ {
if (options == null)
{
throw new ArgumentNullException(nameof(options));
}
- return new BoundedChannel<T>(options.Capacity, options.FullMode, !options.AllowSynchronousContinuations);
+ return new BoundedChannel<T>(options.Capacity, options.FullMode, !options.AllowSynchronousContinuations, itemDropped);
}
}
}
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
+using System.Collections.Generic;
+using System.Linq;
using System.Threading.Tasks;
using Microsoft.DotNet.XUnitExtensions;
using Xunit;
return c;
}
+ public static IEnumerable<object[]> ChannelDropModes()
+ {
+ foreach (BoundedChannelFullMode mode in Enum.GetValues(typeof(BoundedChannelFullMode)))
+ {
+ if (mode != BoundedChannelFullMode.Wait)
+ {
+ yield return new object[] { mode };
+ }
+ }
+ }
+
[Fact]
public void Count_IncrementsDecrementsAsExpected()
{
}
[Fact]
+ public void DroppedDelegateNotCalledOnWaitMode_SyncWrites()
+ {
+ bool dropDelegateCalled = false;
+
+ Channel<int> c = Channel.CreateBounded<int>(new BoundedChannelOptions(1) { FullMode = BoundedChannelFullMode.Wait },
+ item =>
+ {
+ dropDelegateCalled = true;
+ });
+
+ Assert.True(c.Writer.TryWrite(1));
+ Assert.False(c.Writer.TryWrite(1));
+
+ Assert.False(dropDelegateCalled);
+ }
+
+ [Fact]
+ public async Task DroppedDelegateNotCalledOnWaitMode_AsyncWrites()
+ {
+ bool dropDelegateCalled = false;
+
+ Channel<int> c = Channel.CreateBounded<int>(new BoundedChannelOptions(1) { FullMode = BoundedChannelFullMode.Wait },
+ item =>
+ {
+ dropDelegateCalled = true;
+ });
+
+ // First async write should pass
+ await c.Writer.WriteAsync(1);
+
+ // Second write should wait
+ var secondWriteTask = c.Writer.WriteAsync(2);
+ Assert.False(secondWriteTask.IsCompleted);
+
+ // Read from channel to free up space
+ var readItem = await c.Reader.ReadAsync();
+ // Second write should complete
+ await secondWriteTask;
+
+ // No dropped delegate should be called
+ Assert.False(dropDelegateCalled);
+ }
+
+ [Theory]
+ [MemberData(nameof(ChannelDropModes))]
+ public void DroppedDelegateIsNull_SyncWrites(BoundedChannelFullMode boundedChannelFullMode)
+ {
+ Channel<int> c = Channel.CreateBounded<int>(new BoundedChannelOptions(1) { FullMode = boundedChannelFullMode }, itemDropped: null);
+
+ Assert.True(c.Writer.TryWrite(5));
+ Assert.True(c.Writer.TryWrite(5));
+ }
+
+ [Theory]
+ [MemberData(nameof(ChannelDropModes))]
+ public async Task DroppedDelegateIsNull_AsyncWrites(BoundedChannelFullMode boundedChannelFullMode)
+ {
+ Channel<int> c = Channel.CreateBounded<int>(new BoundedChannelOptions(1) { FullMode = boundedChannelFullMode }, itemDropped: null);
+
+ await c.Writer.WriteAsync(5);
+ await c.Writer.WriteAsync(5);
+ }
+
+ [Theory]
+ [MemberData(nameof(ChannelDropModes))]
+ public void DroppedDelegateCalledOnChannelFull_SyncWrites(BoundedChannelFullMode boundedChannelFullMode)
+ {
+ var droppedItems = new HashSet<int>();
+
+ void AddDroppedItem(int itemDropped)
+ {
+ Assert.True(droppedItems.Add(itemDropped));
+ }
+
+ const int channelCapacity = 10;
+ var c = Channel.CreateBounded<int>(new BoundedChannelOptions(channelCapacity)
+ {
+ FullMode = boundedChannelFullMode
+ }, AddDroppedItem);
+
+ for (int i = 0; i < channelCapacity; i++)
+ {
+ Assert.True(c.Writer.TryWrite(i));
+ }
+
+ // No dropped delegate should be called while channel is not full
+ Assert.Empty(droppedItems);
+
+ for (int i = channelCapacity; i < channelCapacity + 10; i++)
+ {
+ Assert.True(c.Writer.TryWrite(i));
+ }
+
+ // Assert expected number of dropped items delegate calls
+ Assert.Equal(10, droppedItems.Count);
+ }
+
+ [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+ [MemberData(nameof(ChannelDropModes))]
+ public void DroppedDelegateCalledAfterLockReleased_SyncWrites(BoundedChannelFullMode boundedChannelFullMode)
+ {
+ Channel<int> c = null;
+ bool dropDelegateCalled = false;
+
+ c = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
+ {
+ FullMode = boundedChannelFullMode
+ }, (droppedItem) =>
+ {
+ if (dropDelegateCalled)
+ {
+ // Prevent infinite callbacks being called
+ return;
+ }
+
+ dropDelegateCalled = true;
+
+ // Dropped delegate should not be called while holding the channel lock.
+ // Verify this by trying to write into the channel from different thread.
+ // If lock is held during callback, this should effecitvely cause deadlock.
+ var mres = new ManualResetEventSlim();
+ ThreadPool.QueueUserWorkItem(delegate
+ {
+ c.Writer.TryWrite(3);
+ mres.Set();
+ });
+
+ mres.Wait();
+ });
+
+ Assert.True(c.Writer.TryWrite(1));
+ Assert.True(c.Writer.TryWrite(2));
+
+ Assert.True(dropDelegateCalled);
+ }
+
+ [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+ [MemberData(nameof(ChannelDropModes))]
+ public async Task DroppedDelegateCalledAfterLockReleased_AsyncWrites(BoundedChannelFullMode boundedChannelFullMode)
+ {
+ Channel<int> c = null;
+ bool dropDelegateCalled = false;
+
+ c = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
+ {
+ FullMode = boundedChannelFullMode
+ }, (droppedItem) =>
+ {
+ if (dropDelegateCalled)
+ {
+ // Prevent infinite callbacks being called
+ return;
+ }
+
+ dropDelegateCalled = true;
+
+ // Dropped delegate should not be called while holding the channel synchronisation lock.
+ // Verify this by trying to write into the channel from different thread.
+ // If lock is held during callback, this should effecitvely cause deadlock.
+ var mres = new ManualResetEventSlim();
+ ThreadPool.QueueUserWorkItem(delegate
+ {
+ c.Writer.TryWrite(11);
+ mres.Set();
+ });
+
+ mres.Wait();
+ });
+
+ await c.Writer.WriteAsync(1);
+ await c.Writer.WriteAsync(2);
+
+ Assert.True(dropDelegateCalled);
+ }
+
+ [Theory]
+ [MemberData(nameof(ChannelDropModes))]
+ public async Task DroppedDelegateCalledOnChannelFull_AsyncWrites(BoundedChannelFullMode boundedChannelFullMode)
+ {
+ var droppedItems = new HashSet<int>();
+
+ void AddDroppedItem(int itemDropped)
+ {
+ Assert.True(droppedItems.Add(itemDropped));
+ }
+
+ const int channelCapacity = 10;
+ var c = Channel.CreateBounded<int>(new BoundedChannelOptions(channelCapacity)
+ {
+ FullMode = boundedChannelFullMode
+ }, AddDroppedItem);
+
+ for (int i = 0; i < channelCapacity; i++)
+ {
+ await c.Writer.WriteAsync(i);
+ }
+
+ // No dropped delegate should be called while channel is not full
+ Assert.Empty(droppedItems);
+
+ for (int i = channelCapacity; i < channelCapacity + 10; i++)
+ {
+ await c.Writer.WriteAsync(i);
+ }
+
+ // Assert expected number of dropped items delegate calls
+ Assert.Equal(10, droppedItems.Count);
+ }
+
+ [Fact]
public async Task CancelPendingWrite_Reading_DataTransferredFromCorrectWriter()
{
var c = Channel.CreateBounded<int>(1);