From 800061b0d5f1a9fdd2af31aa6af7060b01a22c90 Mon Sep 17 00:00:00 2001 From: skwasjer <11424653+skwasjer@users.noreply.github.com> Date: Mon, 9 Sep 2024 18:22:48 +0200 Subject: [PATCH] chore: replace our own TaskHelpers with a more robust impl., adapted from RestSharp/Rebus. We cannot really avoid doing sync over async in some areas, specifically in older .NET SDK's which do not have sync HttpClient API's. --- .../Extensions/ResponseBuilderExtensions.cs | 2 +- src/MockHttp/MockHttpHandler.cs | 2 +- src/MockHttp/Threading/AsyncHelpers.cs | 170 ++++++++++++++++++ src/MockHttp/Threading/TaskHelpers.cs | 58 ------ 4 files changed, 172 insertions(+), 60 deletions(-) create mode 100644 src/MockHttp/Threading/AsyncHelpers.cs delete mode 100644 src/MockHttp/Threading/TaskHelpers.cs diff --git a/src/MockHttp/Extensions/ResponseBuilderExtensions.cs b/src/MockHttp/Extensions/ResponseBuilderExtensions.cs index 3bcff83f..8ae76e82 100644 --- a/src/MockHttp/Extensions/ResponseBuilderExtensions.cs +++ b/src/MockHttp/Extensions/ResponseBuilderExtensions.cs @@ -54,7 +54,7 @@ public static IWithContentResult Body(this IWithContent builder, HttpContent htt #if NET6_0_OR_GREATER using Stream stream = httpContent.ReadAsStream(); #else - using Stream stream = Threading.TaskHelpers.RunSync(httpContent.ReadAsStreamAsync, TimeSpan.FromMinutes(1)); + using Stream stream = Threading.AsyncHelpers.RunSync(httpContent.ReadAsStreamAsync); #endif return (IWithContentResult)BufferedStreamBody(builder, stream) .Headers(httpContent.Headers.Select(kvp => new KeyValuePair>(kvp.Key, kvp.Value))); diff --git a/src/MockHttp/MockHttpHandler.cs b/src/MockHttp/MockHttpHandler.cs index baf1575a..1aa48c41 100644 --- a/src/MockHttp/MockHttpHandler.cs +++ b/src/MockHttp/MockHttpHandler.cs @@ -147,7 +147,7 @@ public void Verify(Action matching, Func times, string? /// public void Verify(Action matching, IsSent times, string? because = null) { - TaskHelpers.RunSync(() => VerifyAsync(matching, times, because), TimeSpan.FromSeconds(30)); + AsyncHelpers.RunSync(() => VerifyAsync(matching, times, because)); } /// diff --git a/src/MockHttp/Threading/AsyncHelpers.cs b/src/MockHttp/Threading/AsyncHelpers.cs new file mode 100644 index 00000000..c79cf020 --- /dev/null +++ b/src/MockHttp/Threading/AsyncHelpers.cs @@ -0,0 +1,170 @@ +// Copyright (c) .NET Foundation and Contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from Rebus +// Adapted from RestSharp (sha: 159c8a79963b): +// - dispose ManualResetEvent +// - added DebuggerStepThroughAttribute +// - suppress SonarCloud lint S927 + +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Runtime.ExceptionServices; + +namespace MockHttp.Threading; + +#pragma warning disable S927 +[DebuggerStepThrough] +internal static class AsyncHelpers +{ + /// + /// Executes a task synchronously on the calling thread by installing a temporary synchronization context that queues continuations + /// + /// Callback for asynchronous task to run + public static void RunSync(Func task) + { + SynchronizationContext? currentContext = SynchronizationContext.Current; + using var customContext = new CustomSynchronizationContext(task); + + try + { + SynchronizationContext.SetSynchronizationContext(customContext); + customContext.Run(); + } + finally + { + SynchronizationContext.SetSynchronizationContext(currentContext); + } + } + + /// + /// Executes a task synchronously on the calling thread by installing a temporary synchronization context that queues continuations + /// + /// Callback for asynchronous task to run + /// Return type for the task + /// Return value from the task + public static T RunSync(Func> task) + { + T result = default!; +#pragma warning disable CA2007 + RunSync(async () => { result = await task(); }); +#pragma warning restore CA2007 + return result; + } + + /// + /// Synchronization context that can be "pumped" in order to have it execute continuations posted back to it + /// + private sealed class CustomSynchronizationContext : SynchronizationContext, IDisposable + { + private readonly ConcurrentQueue> _items = new(); + private readonly Func _task; + private readonly AutoResetEvent _workItemsWaiting = new(false); + private ExceptionDispatchInfo? _caughtException; + private bool _done; + + /// + /// Constructor for the custom context + /// + /// Task to execute + public CustomSynchronizationContext(Func task) + { + _task = task ?? throw new ArgumentNullException(nameof(task), "Please remember to pass a Task to be executed"); + } + + public void Dispose() + { + _workItemsWaiting.Dispose(); + } + + /// + /// When overridden in a derived class, dispatches an asynchronous message to a synchronization context. + /// + /// Callback function + /// Callback state + public override void Post(SendOrPostCallback function, object? state) + { + _items.Enqueue(Tuple.Create(function, state)); + _workItemsWaiting.Set(); + } + + /// + /// Enqueues the function to be executed and executes all resulting continuations until it is completely done + /// + public void Run() + { + Post(PostCallback, null); + + while (!_done) + { + if (_items.TryDequeue(out Tuple? task)) + { + task.Item1(task.Item2); + if (_caughtException is null) + { + continue; + } + + _caughtException.Throw(); + } + else + { + _workItemsWaiting.WaitOne(); + } + } + + return; + + async void PostCallback(object? _) + { + try + { + await _task().ConfigureAwait(false); + } + catch (Exception exception) + { + _caughtException = ExceptionDispatchInfo.Capture(exception); + throw; + } + finally + { + Post(_ => _done = true, null); + } + } + } + + /// + /// When overridden in a derived class, dispatches a synchronous message to a synchronization context. + /// + /// Callback function + /// Callback state + [ExcludeFromCodeCoverage] + public override void Send(SendOrPostCallback function, object? state) + { + throw new NotSupportedException("Cannot send to same thread"); + } + + /// + /// When overridden in a derived class, creates a copy of the synchronization context. Not needed, so just return ourselves. + /// + /// Copy of the context + [ExcludeFromCodeCoverage] + public override SynchronizationContext CreateCopy() + { + return this; + } + } +} +#pragma warning restore S927 diff --git a/src/MockHttp/Threading/TaskHelpers.cs b/src/MockHttp/Threading/TaskHelpers.cs deleted file mode 100644 index db0d4c58..00000000 --- a/src/MockHttp/Threading/TaskHelpers.cs +++ /dev/null @@ -1,58 +0,0 @@ -using System.Runtime.ExceptionServices; - -namespace MockHttp.Threading; - -internal static class TaskHelpers -{ - public static T RunSync(Func> action, TimeSpan timeout) - { - Task? task = null; - - RunSync(() => - { - task = action(); - return (Task)task; - }, - timeout); - - return task is null ? default! : task.Result; - } - - public static void RunSync(Func action, TimeSpan timeout) - { - if (SynchronizationContext.Current is null) - { - RunSyncAndWait(action, timeout); - } - else - { - RunSyncAndWait(() => Task.Factory.StartNew(action, - CancellationToken.None, - TaskCreationOptions.None, - TaskScheduler.Default - ) - .Unwrap(), - timeout); - } - } - - private static void RunSyncAndWait(Func action, TimeSpan timeout) - { - try - { - action().Wait(timeout); - } - catch (AggregateException ex) - { - AggregateException flattened = ex.Flatten(); - if (flattened.InnerExceptions.Count == 1) - { - ExceptionDispatchInfo.Capture(ex.InnerException!).Throw(); - } - else - { - throw; - } - } - } -}