From 23b1102714887a8c005bc9e7d8d4bc7ff94b1786 Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Tue, 11 Jun 2024 21:31:42 -0400 Subject: [PATCH 1/2] Revert "Dedup UnboundedChannel and UnboundedPriorityChannel (#101396)" This reverts commit b4e0169bfe10cfe69f6e7a8952b8f80fdfe9e31e. --- .../src/System.Threading.Channels.csproj | 2 +- .../src/System/Threading/Channels/Channel.cs | 35 +- .../Threading/Channels/Channel.netcoreapp.cs | 44 +-- .../Threading/Channels/IDebugEnumerator.cs | 4 +- .../Channels/IUnboundedChannelQueue.cs | 35 -- .../Threading/Channels/UnboundedChannel.cs | 92 ++--- .../Channels/UnboundedPriorityChannel.cs | 369 ++++++++++++++++++ 7 files changed, 402 insertions(+), 179 deletions(-) delete mode 100644 src/libraries/System.Threading.Channels/src/System/Threading/Channels/IUnboundedChannelQueue.cs create mode 100644 src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedPriorityChannel.cs diff --git a/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj b/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj index a5030bcee6561..c058b0b216b17 100644 --- a/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj +++ b/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj @@ -25,7 +25,6 @@ System.Threading.Channel<T> - @@ -45,6 +44,7 @@ System.Threading.Channel<T> + diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs index 834d8ad88ed1d..317636579a05f 100644 --- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs @@ -1,10 +1,6 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Diagnostics.CodeAnalysis; - namespace System.Threading.Channels { /// Provides static methods for creating channels. @@ -13,7 +9,7 @@ public static partial class Channel /// Creates an unbounded channel usable by any number of readers and writers concurrently. /// The created channel. public static Channel CreateUnbounded() => - new UnboundedChannel>(new(new()), runContinuationsAsynchronously: true); + new UnboundedChannel(runContinuationsAsynchronously: true); /// Creates an unbounded channel subject to the provided options. /// Specifies the type of data in the channel. @@ -31,7 +27,7 @@ public static Channel CreateUnbounded(UnboundedChannelOptions options) return new SingleConsumerUnboundedChannel(!options.AllowSynchronousContinuations); } - return new UnboundedChannel>(new(new()), !options.AllowSynchronousContinuations); + return new UnboundedChannel(!options.AllowSynchronousContinuations); } /// Creates a channel with the specified maximum capacity. @@ -75,32 +71,5 @@ public static Channel CreateBounded(BoundedChannelOptions options, Action< return new BoundedChannel(options.Capacity, options.FullMode, !options.AllowSynchronousContinuations, itemDropped); } - - /// Provides an for a . - private readonly struct UnboundedChannelConcurrentQueue(ConcurrentQueue queue) : IUnboundedChannelQueue - { - private readonly ConcurrentQueue _queue = queue; - - /// - public bool IsThreadSafe => true; - - /// - public void Enqueue(T item) => _queue.Enqueue(item); - - /// - public bool TryDequeue([MaybeNullWhen(false)] out T item) => _queue.TryDequeue(out item); - - /// - public bool TryPeek([MaybeNullWhen(false)] out T item) => _queue.TryPeek(out item); - - /// - public int Count => _queue.Count; - - /// - public bool IsEmpty => _queue.IsEmpty; - - /// - public IEnumerator GetEnumerator() => _queue.GetEnumerator(); - } } } diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.netcoreapp.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.netcoreapp.cs index c8fc9baebae7d..6c24b3e41ec7b 100644 --- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.netcoreapp.cs +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.netcoreapp.cs @@ -2,7 +2,6 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Collections.Generic; -using System.Diagnostics.CodeAnalysis; namespace System.Threading.Channels { @@ -10,14 +9,13 @@ namespace System.Threading.Channels public static partial class Channel { /// Creates an unbounded prioritized channel usable by any number of readers and writers concurrently. - /// Specifies the type of data in the channel. /// The created channel. /// /// is used to determine priority of elements. /// The next item read from the channel will be the element available in the channel with the lowest priority value. /// public static Channel CreateUnboundedPrioritized() => - new UnboundedChannel>(new(new()), runContinuationsAsynchronously: true); + new UnboundedPrioritizedChannel(runContinuationsAsynchronously: true, comparer: null); /// Creates an unbounded prioritized channel subject to the provided options. /// Specifies the type of data in the channel. @@ -32,45 +30,7 @@ public static Channel CreateUnboundedPrioritized(UnboundedPrioritizedChann { ArgumentNullException.ThrowIfNull(options); - return new UnboundedChannel>(new(new(options.Comparer)), !options.AllowSynchronousContinuations); - } - - /// Provides an for a . - private readonly struct UnboundedChannelPriorityQueue(PriorityQueue queue) : IUnboundedChannelQueue - { - private readonly PriorityQueue _queue = queue; - - /// - public bool IsThreadSafe => false; - - /// - public void Enqueue(T item) => _queue.Enqueue(true, item); - - /// - public bool TryDequeue([MaybeNullWhen(false)] out T item) => _queue.TryDequeue(out _, out item); - - /// - public bool TryPeek([MaybeNullWhen(false)] out T item) => _queue.TryPeek(out _, out item); - - /// - public int Count => _queue.Count; - - /// - public bool IsEmpty => _queue.Count == 0; - - /// - public IEnumerator GetEnumerator() - { - List list = []; - foreach ((bool _, T Priority) item in _queue.UnorderedItems) - { - list.Add(item.Priority); - } - - list.Sort(_queue.Comparer); - - return list.GetEnumerator(); - } + return new UnboundedPrioritizedChannel(!options.AllowSynchronousContinuations, options.Comparer); } } } diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IDebugEnumerator.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IDebugEnumerator.cs index af2a77bb1bf77..a3d072ee9f7cb 100644 --- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IDebugEnumerator.cs +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IDebugEnumerator.cs @@ -11,7 +11,7 @@ internal interface IDebugEnumerable IEnumerator GetEnumerator(); } - internal class DebugEnumeratorDebugView + internal sealed class DebugEnumeratorDebugView { public DebugEnumeratorDebugView(IDebugEnumerable enumerable) { @@ -26,6 +26,4 @@ public DebugEnumeratorDebugView(IDebugEnumerable enumerable) [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)] public T[] Items { get; } } - - internal sealed class DebugEnumeratorDebugView(IDebugEnumerable enumerable) : DebugEnumeratorDebugView(enumerable); } diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IUnboundedChannelQueue.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IUnboundedChannelQueue.cs deleted file mode 100644 index b1b65a1dffeb1..0000000000000 --- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IUnboundedChannelQueue.cs +++ /dev/null @@ -1,35 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -using System.Collections.Generic; -using System.Diagnostics.CodeAnalysis; - -namespace System.Threading.Channels -{ - /// Representation of the queue data structure used by . - internal interface IUnboundedChannelQueue : IDebugEnumerable - { - /// Gets whether the other members are safe to use concurrently with each other and themselves. - bool IsThreadSafe { get; } - - /// Enqueues an item into the queue. - /// The item to enqueue. - void Enqueue(T item); - - /// Dequeues an item from the queue, if possible. - /// The dequeued item, or default if the queue was empty. - /// Whether an item was dequeued. - bool TryDequeue([MaybeNullWhen(false)] out T item); - - /// Peeks at the next item from the queue that would be dequeued, if possible. - /// The peeked item, or default if the queue was empty. - /// Whether an item was peeked. - bool TryPeek([MaybeNullWhen(false)] out T item); - - /// Gets the number of elements in the queue. - int Count { get; } - - /// Gets whether the queue is empty. - bool IsEmpty { get; } - } -} diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedChannel.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedChannel.cs index ad7ee0e3608d3..fb3facf83dc47 100644 --- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedChannel.cs +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedChannel.cs @@ -5,20 +5,19 @@ using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; -using System.Runtime.CompilerServices; using System.Threading.Tasks; namespace System.Threading.Channels { /// Provides a buffered channel of unbounded capacity. [DebuggerDisplay("Items = {ItemsCountForDebugger}, Closed = {ChannelIsClosedForDebugger}")] - [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<,>))] - internal sealed class UnboundedChannel : Channel, IDebugEnumerable where TQueue : struct, IUnboundedChannelQueue + [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] + internal sealed class UnboundedChannel : Channel, IDebugEnumerable { /// Task that indicates the channel has completed. private readonly TaskCompletionSource _completion; /// The items in the channel. - private readonly TQueue _items; + private readonly ConcurrentQueue _items = new ConcurrentQueue(); /// Readers blocked reading from the channel. private readonly Deque> _blockedReaders = new Deque>(); /// Whether to force continuations to be executed asynchronously from producer writes. @@ -30,9 +29,8 @@ internal sealed class UnboundedChannel : Channel, IDebugEnumerable private Exception? _doneWriting; /// Initialize the channel. - internal UnboundedChannel(TQueue items, bool runContinuationsAsynchronously) + internal UnboundedChannel(bool runContinuationsAsynchronously) { - _items = items; _runContinuationsAsynchronously = runContinuationsAsynchronously; _completion = new TaskCompletionSource(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None); Reader = new UnboundedChannelReader(this); @@ -40,14 +38,14 @@ internal UnboundedChannel(TQueue items, bool runContinuationsAsynchronously) } [DebuggerDisplay("Items = {Count}")] - [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<,>))] + [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] private sealed class UnboundedChannelReader : ChannelReader, IDebugEnumerable { - internal readonly UnboundedChannel _parent; + internal readonly UnboundedChannel _parent; private readonly AsyncOperation _readerSingleton; private readonly AsyncOperation _waiterSingleton; - internal UnboundedChannelReader(UnboundedChannel parent) + internal UnboundedChannelReader(UnboundedChannel parent) { _parent = parent; _readerSingleton = new AsyncOperation(parent._runContinuationsAsynchronously, pooled: true); @@ -70,8 +68,8 @@ public override ValueTask ReadAsync(CancellationToken cancellationToken) } // Dequeue an item if we can. - UnboundedChannel parent = _parent; - if (parent._items.IsThreadSafe && parent._items.TryDequeue(out T? item)) + UnboundedChannel parent = _parent; + if (parent._items.TryDequeue(out T? item)) { CompleteIfDone(parent); return new ValueTask(item); @@ -114,60 +112,24 @@ public override ValueTask ReadAsync(CancellationToken cancellationToken) public override bool TryRead([MaybeNullWhen(false)] out T item) { - UnboundedChannel parent = _parent; - return parent._items.IsThreadSafe ? - LockFree(parent, out item) : - Locked(parent, out item); + UnboundedChannel parent = _parent; - static bool LockFree(UnboundedChannel parent, [MaybeNullWhen(false)] out T item) + // Dequeue an item if we can + if (parent._items.TryDequeue(out item)) { - if (parent._items.TryDequeue(out item)) - { - CompleteIfDone(parent); - return true; - } - - item = default; - return false; + CompleteIfDone(parent); + return true; } - static bool Locked(UnboundedChannel parent, [MaybeNullWhen(false)] out T item) - { - lock (parent.SyncObj) - { - if (parent._items.TryDequeue(out item)) - { - CompleteIfDone(parent); - return true; - } - } - - item = default; - return false; - } + item = default; + return false; } - public override bool TryPeek([MaybeNullWhen(false)] out T item) - { - UnboundedChannel parent = _parent; - return parent._items.IsThreadSafe ? - parent._items.TryPeek(out item) : - Locked(parent, out item); - - // Separated out to keep the try/finally from preventing TryPeek from being inlined - static bool Locked(UnboundedChannel parent, [MaybeNullWhen(false)] out T item) - { - lock (parent.SyncObj) - { - return parent._items.TryPeek(out item); - } - } - } + public override bool TryPeek([MaybeNullWhen(false)] out T item) => + _parent._items.TryPeek(out item); - private static void CompleteIfDone(UnboundedChannel parent) + private static void CompleteIfDone(UnboundedChannel parent) { - Debug.Assert(parent._items.IsThreadSafe || Monitor.IsEntered(parent.SyncObj)); - if (parent._doneWriting != null && parent._items.IsEmpty) { // If we've now emptied the items queue and we're not getting any more, complete. @@ -182,12 +144,12 @@ public override ValueTask WaitToReadAsync(CancellationToken cancellationTo return new ValueTask(Task.FromCanceled(cancellationToken)); } - if (_parent._items.IsThreadSafe && !_parent._items.IsEmpty) + if (!_parent._items.IsEmpty) { return new ValueTask(true); } - UnboundedChannel parent = _parent; + UnboundedChannel parent = _parent; lock (parent.SyncObj) { @@ -230,15 +192,15 @@ public override ValueTask WaitToReadAsync(CancellationToken cancellationTo } [DebuggerDisplay("Items = {ItemsCountForDebugger}")] - [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<,>))] + [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] private sealed class UnboundedChannelWriter : ChannelWriter, IDebugEnumerable { - internal readonly UnboundedChannel _parent; - internal UnboundedChannelWriter(UnboundedChannel parent) => _parent = parent; + internal readonly UnboundedChannel _parent; + internal UnboundedChannelWriter(UnboundedChannel parent) => _parent = parent; public override bool TryComplete(Exception? error) { - UnboundedChannel parent = _parent; + UnboundedChannel parent = _parent; bool completeTask; lock (parent.SyncObj) @@ -278,7 +240,7 @@ public override bool TryComplete(Exception? error) public override bool TryWrite(T item) { - UnboundedChannel parent = _parent; + UnboundedChannel parent = _parent; while (true) { AsyncOperation? blockedReader = null; @@ -359,7 +321,7 @@ public override ValueTask WriteAsync(T item, CancellationToken cancellationToken } /// Gets the object used to synchronize access to all state on this instance. - private object SyncObj => _blockedReaders; + private object SyncObj => _items; [Conditional("DEBUG")] private void AssertInvariants() diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedPriorityChannel.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedPriorityChannel.cs new file mode 100644 index 0000000000000..7af18b9413ee2 --- /dev/null +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedPriorityChannel.cs @@ -0,0 +1,369 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Threading.Tasks; + +// This file is primarily a copy of UnboundedChannel, subsequently tweaked to account for differences +// between ConcurrentQueue and PriorityQueue, e.g. that PQ isn't thread safe and so fast +// paths outside of locks need to be removed, that Enqueue/Dequeue methods take priorities, etc. Any +// changes made to this or that file should largely be kept in sync. + +namespace System.Threading.Channels +{ + /// Provides a buffered channel of unbounded capacity. + [DebuggerDisplay("Items = {ItemsCountForDebugger}, Closed = {ChannelIsClosedForDebugger}")] + [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] + internal sealed class UnboundedPrioritizedChannel : Channel, IDebugEnumerable + { + /// Task that indicates the channel has completed. + private readonly TaskCompletionSource _completion; + /// The items in the channel. + /// To avoid double storing of a potentially large struct T, the priority doubles as the element and the element is ignored. + private readonly PriorityQueue _items; + /// Readers blocked reading from the channel. + private readonly Deque> _blockedReaders = new Deque>(); + /// Whether to force continuations to be executed asynchronously from producer writes. + private readonly bool _runContinuationsAsynchronously; + + /// Readers waiting for a notification that data is available. + private AsyncOperation? _waitingReadersTail; + /// Set to non-null once Complete has been called. + private Exception? _doneWriting; + + /// Initialize the channel. + internal UnboundedPrioritizedChannel(bool runContinuationsAsynchronously, IComparer? comparer) + { + _runContinuationsAsynchronously = runContinuationsAsynchronously; + _completion = new TaskCompletionSource(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None); + Reader = new UnboundedPrioritizedChannelReader(this); + Writer = new UnboundedPrioritizedChannelWriter(this); + _items = new PriorityQueue(comparer); + } + + [DebuggerDisplay("Items = {Count}")] + [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] + private sealed class UnboundedPrioritizedChannelReader : ChannelReader, IDebugEnumerable + { + internal readonly UnboundedPrioritizedChannel _parent; + private readonly AsyncOperation _readerSingleton; + private readonly AsyncOperation _waiterSingleton; + + internal UnboundedPrioritizedChannelReader(UnboundedPrioritizedChannel parent) + { + _parent = parent; + _readerSingleton = new AsyncOperation(parent._runContinuationsAsynchronously, pooled: true); + _waiterSingleton = new AsyncOperation(parent._runContinuationsAsynchronously, pooled: true); + } + + public override Task Completion => _parent._completion.Task; + + public override bool CanCount => true; + + public override bool CanPeek => true; + + public override int Count => _parent._items.Count; + + public override ValueTask ReadAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return ValueTask.FromCanceled(cancellationToken); + } + + // Dequeue an item if we can. + UnboundedPrioritizedChannel parent = _parent; + lock (parent.SyncObj) + { + parent.AssertInvariants(); + + // Try to dequeue again, now that we hold the lock. + if (parent._items.TryDequeue(out _, out T? item)) + { + CompleteIfDone(parent); + return new ValueTask(item); + } + + // There are no items, so if we're done writing, fail. + if (parent._doneWriting != null) + { + return ChannelUtilities.GetInvalidCompletionValueTask(parent._doneWriting); + } + + // If we're able to use the singleton reader, do so. + if (!cancellationToken.CanBeCanceled) + { + AsyncOperation singleton = _readerSingleton; + if (singleton.TryOwnAndReset()) + { + parent._blockedReaders.EnqueueTail(singleton); + return singleton.ValueTaskOfT; + } + } + + // Otherwise, create and queue a reader. + var reader = new AsyncOperation(parent._runContinuationsAsynchronously, cancellationToken); + parent._blockedReaders.EnqueueTail(reader); + return reader.ValueTaskOfT; + } + } + + public override bool TryRead([MaybeNullWhen(false)] out T item) + { + UnboundedPrioritizedChannel parent = _parent; + lock (parent.SyncObj) + { + // Dequeue an item if we can + if (parent._items.TryDequeue(out _, out item)) + { + CompleteIfDone(parent); + return true; + } + + item = default; + return false; + } + } + + public override bool TryPeek([MaybeNullWhen(false)] out T item) => + _parent._items.TryPeek(out _, out item); + + private static void CompleteIfDone(UnboundedPrioritizedChannel parent) + { + Debug.Assert(Monitor.IsEntered(parent.SyncObj)); + + if (parent._doneWriting != null && parent._items.Count == 0) + { + // If we've now emptied the items queue and we're not getting any more, complete. + ChannelUtilities.Complete(parent._completion, parent._doneWriting); + } + } + + public override ValueTask WaitToReadAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return ValueTask.FromCanceled(cancellationToken); + } + + UnboundedPrioritizedChannel parent = _parent; + lock (parent.SyncObj) + { + parent.AssertInvariants(); + + // Try again to read now that we're synchronized with writers. + if (parent._items.Count != 0) + { + return new ValueTask(true); + } + + // There are no items, so if we're done writing, there's never going to be data available. + if (parent._doneWriting != null) + { + return parent._doneWriting != ChannelUtilities.s_doneWritingSentinel ? + ValueTask.FromException(parent._doneWriting) : + default; + } + + // If we're able to use the singleton waiter, do so. + if (!cancellationToken.CanBeCanceled) + { + AsyncOperation singleton = _waiterSingleton; + if (singleton.TryOwnAndReset()) + { + ChannelUtilities.QueueWaiter(ref parent._waitingReadersTail, singleton); + return singleton.ValueTaskOfT; + } + } + + // Otherwise, create and queue a waiter. + var waiter = new AsyncOperation(parent._runContinuationsAsynchronously, cancellationToken); + ChannelUtilities.QueueWaiter(ref parent._waitingReadersTail, waiter); + return waiter.ValueTaskOfT; + } + } + + /// Gets an enumerator the debugger can use to show the contents of the channel. + IEnumerator IDebugEnumerable.GetEnumerator() => _parent.GetEnumerator(); + } + + [DebuggerDisplay("Items = {ItemsCountForDebugger}")] + [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] + private sealed class UnboundedPrioritizedChannelWriter : ChannelWriter, IDebugEnumerable + { + internal readonly UnboundedPrioritizedChannel _parent; + internal UnboundedPrioritizedChannelWriter(UnboundedPrioritizedChannel parent) => _parent = parent; + + public override bool TryComplete(Exception? error) + { + UnboundedPrioritizedChannel parent = _parent; + bool completeTask; + + lock (parent.SyncObj) + { + parent.AssertInvariants(); + + // If we've already marked the channel as completed, bail. + if (parent._doneWriting != null) + { + return false; + } + + // Mark that we're done writing. + parent._doneWriting = error ?? ChannelUtilities.s_doneWritingSentinel; + completeTask = parent._items.Count == 0; + } + + // If there are no items in the queue, complete the channel's task, + // as no more data can possibly arrive at this point. We do this outside + // of the lock in case we'll be running synchronous completions, and we + // do it before completing blocked/waiting readers, so that when they + // wake up they'll see the task as being completed. + if (completeTask) + { + ChannelUtilities.Complete(parent._completion, error); + } + + // At this point, _blockedReaders and _waitingReaders will not be mutated: + // they're only mutated by readers while holding the lock, and only if _doneWriting is null. + // freely manipulate _blockedReaders and _waitingReaders without any concurrency concerns. + ChannelUtilities.FailOperations, T>(parent._blockedReaders, ChannelUtilities.CreateInvalidCompletionException(error)); + ChannelUtilities.WakeUpWaiters(ref parent._waitingReadersTail, result: false, error: error); + + // Successfully transitioned to completed. + return true; + } + + public override bool TryWrite(T item) + { + UnboundedPrioritizedChannel parent = _parent; + while (true) + { + AsyncOperation? blockedReader = null; + AsyncOperation? waitingReadersTail = null; + lock (parent.SyncObj) + { + // If writing has already been marked as done, fail the write. + parent.AssertInvariants(); + if (parent._doneWriting != null) + { + return false; + } + + // If there aren't any blocked readers, just add the data to the queue, + // and let any waiting readers know that they should try to read it. + // We can only complete such waiters here under the lock if they run + // continuations asynchronously (otherwise the synchronous continuations + // could be invoked under the lock). If we don't complete them here, we + // need to do so outside of the lock. + if (parent._blockedReaders.IsEmpty) + { + parent._items.Enqueue(true, item); + waitingReadersTail = parent._waitingReadersTail; + if (waitingReadersTail == null) + { + return true; + } + parent._waitingReadersTail = null; + } + else + { + // There were blocked readers. Grab one, and then complete it outside of the lock. + blockedReader = parent._blockedReaders.DequeueHead(); + } + } + + if (blockedReader != null) + { + // Complete the reader. It's possible the reader was canceled, in which + // case we loop around to try everything again. + if (blockedReader.TrySetResult(item)) + { + return true; + } + } + else + { + // Wake up all of the waiters. Since we've released the lock, it's possible + // we could cause some spurious wake-ups here, if we tell a waiter there's + // something available but all data has already been removed. It's a benign + // race condition, though, as consumers already need to account for such things. + ChannelUtilities.WakeUpWaiters(ref waitingReadersTail, result: true); + return true; + } + } + } + + public override ValueTask WaitToWriteAsync(CancellationToken cancellationToken) + { + Exception? doneWriting = _parent._doneWriting; + return + cancellationToken.IsCancellationRequested ? ValueTask.FromCanceled(cancellationToken) : + doneWriting == null ? new ValueTask(true) : // unbounded writing can always be done if we haven't completed + doneWriting != ChannelUtilities.s_doneWritingSentinel ? ValueTask.FromException(doneWriting) : + default; + } + + public override ValueTask WriteAsync(T item, CancellationToken cancellationToken) => + cancellationToken.IsCancellationRequested ? ValueTask.FromCanceled(cancellationToken) : + TryWrite(item) ? default : + ValueTask.FromException(ChannelUtilities.CreateInvalidCompletionException(_parent._doneWriting)); + + /// Gets the number of items in the channel. This should only be used by the debugger. + private int ItemsCountForDebugger => _parent._items.Count; + + /// Gets an enumerator the debugger can use to show the contents of the channel. + IEnumerator IDebugEnumerable.GetEnumerator() => _parent.GetEnumerator(); + } + + /// Gets the object used to synchronize access to all state on this instance. + private object SyncObj => _items; + + [Conditional("DEBUG")] + private void AssertInvariants() + { + Debug.Assert(SyncObj != null, "The sync obj must not be null."); + Debug.Assert(Monitor.IsEntered(SyncObj), "Invariants can only be validated while holding the lock."); + + if (_items.Count != 0) + { + if (_runContinuationsAsynchronously) + { + Debug.Assert(_blockedReaders.IsEmpty, "There's data available, so there shouldn't be any blocked readers."); + Debug.Assert(_waitingReadersTail == null, "There's data available, so there shouldn't be any waiting readers."); + } + Debug.Assert(!_completion.Task.IsCompleted, "We still have data available, so shouldn't be completed."); + } + if ((!_blockedReaders.IsEmpty || _waitingReadersTail != null) && _runContinuationsAsynchronously) + { + Debug.Assert(_items.Count == 0, "There are blocked/waiting readers, so there shouldn't be any data available."); + } + if (_completion.Task.IsCompleted) + { + Debug.Assert(_doneWriting != null, "We're completed, so we must be done writing."); + } + } + + /// Gets the number of items in the channel. This should only be used by the debugger. + private int ItemsCountForDebugger => _items.Count; + + /// Report if the channel is closed or not. This should only be used by the debugger. + private bool ChannelIsClosedForDebugger => _doneWriting != null; + + /// Gets an enumerator the debugger can use to show the contents of the channel. + public IEnumerator GetEnumerator() + { + List list = []; + foreach ((bool _, T Priority) item in _items.UnorderedItems) + { + list.Add(item.Priority); + } + + list.Sort(_items.Comparer); + + return list.GetEnumerator(); + } + } +} From b0addff57218fadbfb1e966b7f5af907db9d1bb3 Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Tue, 11 Jun 2024 21:37:03 -0400 Subject: [PATCH 2/2] Put back lock This fix has been part of the previous deduping. --- .../Threading/Channels/UnboundedPriorityChannel.cs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedPriorityChannel.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedPriorityChannel.cs index 7af18b9413ee2..a6eaba556c1f3 100644 --- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedPriorityChannel.cs +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedPriorityChannel.cs @@ -127,8 +127,14 @@ public override bool TryRead([MaybeNullWhen(false)] out T item) } } - public override bool TryPeek([MaybeNullWhen(false)] out T item) => - _parent._items.TryPeek(out _, out item); + public override bool TryPeek([MaybeNullWhen(false)] out T item) + { + UnboundedPrioritizedChannel parent = _parent; + lock (parent.SyncObj) + { + return parent._items.TryPeek(out _, out item); + } + } private static void CompleteIfDone(UnboundedPrioritizedChannel parent) { @@ -194,6 +200,7 @@ public override ValueTask WaitToReadAsync(CancellationToken cancellationTo private sealed class UnboundedPrioritizedChannelWriter : ChannelWriter, IDebugEnumerable { internal readonly UnboundedPrioritizedChannel _parent; + internal UnboundedPrioritizedChannelWriter(UnboundedPrioritizedChannel parent) => _parent = parent; public override bool TryComplete(Exception? error)