Skip to content

Commit

Permalink
Allow bounded channel to be created with drop delegate (#50331)
Browse files Browse the repository at this point in the history
* Bounded channel can be created with drop delegate.

- Add additional CreateBounded overload with delegate parameter that will be called when item is being dropped from channel
- Added unit tests

* Fix typo in comment.

* Apply suggestions from code review

Co-authored-by: Stephen Toub <[email protected]>

* Call drop delegate outside of lock statement.

* Use overload of CreateBounded method instead of calling ctor directly.

* Code review suggestions refactor.

* Move Monitor.Enter before try and use local scoped parent variable everywhere.

* Drop delegate should not be called while sync lock is held.

Enqueuing of new item should be done while sync lock is being held.
Added additional tests.

* Rerun gitlab CI.

* Do not run deadlock test for bounded channels if platform do not support threading.

* Apply suggestions from code review

Co-authored-by: Ivan Benovic <[email protected]>
Co-authored-by: Stephen Toub <[email protected]>
  • Loading branch information
3 people committed Mar 31, 2021
1 parent a913740 commit b47094d
Show file tree
Hide file tree
Showing 4 changed files with 289 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public static partial class Channel
{
public static System.Threading.Channels.Channel<T> CreateBounded<T>(int capacity) { throw null; }
public static System.Threading.Channels.Channel<T> CreateBounded<T>(System.Threading.Channels.BoundedChannelOptions options) { throw null; }
public static System.Threading.Channels.Channel<T> CreateBounded<T>(BoundedChannelOptions options, Action<T>? itemDropped) { throw null; }
public static System.Threading.Channels.Channel<T> CreateUnbounded<T>() { throw null; }
public static System.Threading.Channels.Channel<T> CreateUnbounded<T>(System.Threading.Channels.UnboundedChannelOptions options) { throw null; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ internal sealed class BoundedChannel<T> : Channel<T>, IDebugEnumerable<T>
{
/// <summary>The mode used when the channel hits its bound.</summary>
private readonly BoundedChannelFullMode _mode;
/// <summary>The delegate that will be invoked when the channel hits its bound and an item is dropped from the channel.</summary>
private readonly Action<T>? _itemDropped;
/// <summary>Task signaled when the channel has completed.</summary>
private readonly TaskCompletionSource _completion;
/// <summary>The maximum capacity of the channel.</summary>
Expand All @@ -40,12 +42,14 @@ internal sealed class BoundedChannel<T> : Channel<T>, IDebugEnumerable<T>
/// <param name="bufferedCapacity">The positive bounded capacity for the channel.</param>
/// <param name="mode">The mode used when writing to a full channel.</param>
/// <param name="runContinuationsAsynchronously">Whether to force continuations to be executed asynchronously.</param>
internal BoundedChannel(int bufferedCapacity, BoundedChannelFullMode mode, bool runContinuationsAsynchronously)
/// <param name="itemDropped">Delegate that will be invoked when an item is dropped from the channel. See <see cref="BoundedChannelFullMode"/>.</param>
internal BoundedChannel(int bufferedCapacity, BoundedChannelFullMode mode, bool runContinuationsAsynchronously, Action<T>? itemDropped)
{
Debug.Assert(bufferedCapacity > 0);
_bufferedCapacity = bufferedCapacity;
_mode = mode;
_runContinuationsAsynchronously = runContinuationsAsynchronously;
_itemDropped = itemDropped;
_completion = new TaskCompletionSource(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None);
Reader = new BoundedChannelReader(this);
Writer = new BoundedChannelWriter(this);
Expand Down Expand Up @@ -332,8 +336,12 @@ public override bool TryWrite(T item)
AsyncOperation<bool>? waitingReadersTail = null;

BoundedChannel<T> parent = _parent;
lock (parent.SyncObj)

bool releaseLock = false;
try
{
Monitor.Enter(parent.SyncObj, ref releaseLock);

parent.AssertInvariants();

// If we're done writing, nothing more to do.
Expand Down Expand Up @@ -393,24 +401,35 @@ public override bool TryWrite(T item)
{
// The channel is full. Just ignore the item being added
// but say we added it.
Monitor.Exit(parent.SyncObj);
releaseLock = false;
parent._itemDropped?.Invoke(item);
return true;
}
else
{
// The channel is full, and we're in a dropping mode.
// Drop either the oldest or the newest and write the new item.
if (parent._mode == BoundedChannelFullMode.DropNewest)
{
parent._items.DequeueTail();
}
else
{
// Drop either the oldest or the newest
T droppedItem = parent._mode == BoundedChannelFullMode.DropNewest ?
parent._items.DequeueTail() :
parent._items.DequeueHead();
}

parent._items.EnqueueTail(item);

Monitor.Exit(parent.SyncObj);
releaseLock = false;
parent._itemDropped?.Invoke(droppedItem);

return true;
}
}
finally
{
if (releaseLock)
{
Monitor.Exit(parent.SyncObj);
}
}

// We either wrote the item already, or we're transferring it to the blocked reader we grabbed.
if (blockedReader != null)
Expand Down Expand Up @@ -492,8 +511,12 @@ public override ValueTask WriteAsync(T item, CancellationToken cancellationToken
AsyncOperation<bool>? waitingReadersTail = null;

BoundedChannel<T> parent = _parent;
lock (parent.SyncObj)

bool releaseLock = false;
try
{
Monitor.Enter(parent.SyncObj, ref releaseLock);

parent.AssertInvariants();

// If we're done writing, trying to write is an error.
Expand Down Expand Up @@ -569,24 +592,35 @@ public override ValueTask WriteAsync(T item, CancellationToken cancellationToken
{
// The channel is full and we're in ignore mode.
// Ignore the item but say we accepted it.
Monitor.Exit(parent.SyncObj);
releaseLock = false;
parent._itemDropped?.Invoke(item);
return default;
}
else
{
// The channel is full, and we're in a dropping mode.
// Drop either the oldest or the newest and write the new item.
if (parent._mode == BoundedChannelFullMode.DropNewest)
{
parent._items.DequeueTail();
}
else
{
T droppedItem = parent._mode == BoundedChannelFullMode.DropNewest ?
parent._items.DequeueTail() :
parent._items.DequeueHead();
}

parent._items.EnqueueTail(item);

Monitor.Exit(parent.SyncObj);
releaseLock = false;
parent._itemDropped?.Invoke(droppedItem);

return default;
}
}
finally
{
if (releaseLock)
{
Monitor.Exit(parent.SyncObj);
}
}

// We either wrote the item already, or we're transfering it to the blocked reader we grabbed.
if (blockedReader != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,31 @@ public static Channel<T> CreateBounded<T>(int capacity)
throw new ArgumentOutOfRangeException(nameof(capacity));
}

return new BoundedChannel<T>(capacity, BoundedChannelFullMode.Wait, runContinuationsAsynchronously: true);
return new BoundedChannel<T>(capacity, BoundedChannelFullMode.Wait, runContinuationsAsynchronously: true, itemDropped: null);
}

/// <summary>Creates a channel with the specified maximum capacity.</summary>
/// <summary>Creates a channel subject to the provided options.</summary>
/// <typeparam name="T">Specifies the type of data in the channel.</typeparam>
/// <param name="options">Options that guide the behavior of the channel.</param>
/// <returns>The created channel.</returns>
public static Channel<T> CreateBounded<T>(BoundedChannelOptions options)
{
return CreateBounded<T>(options, itemDropped: null);
}

/// <summary>Creates a channel subject to the provided options.</summary>
/// <typeparam name="T">Specifies the type of data in the channel.</typeparam>
/// <param name="options">Options that guide the behavior of the channel.</param>
/// <param name="itemDropped">Delegate that will be called when item is being dropped from channel. See <see cref="BoundedChannelFullMode"/>.</param>
/// <returns>The created channel.</returns>
public static Channel<T> CreateBounded<T>(BoundedChannelOptions options, Action<T>? itemDropped)
{
if (options == null)
{
throw new ArgumentNullException(nameof(options));
}

return new BoundedChannel<T>(options.Capacity, options.FullMode, !options.AllowSynchronousContinuations);
return new BoundedChannel<T>(options.Capacity, options.FullMode, !options.AllowSynchronousContinuations, itemDropped);
}
}
}
Loading

0 comments on commit b47094d

Please sign in to comment.