From 8a1b877b6d676fb7e18a19b62c019ee27b6c61da Mon Sep 17 00:00:00 2001 From: Michael Staib Date: Mon, 1 May 2023 20:29:05 +0200 Subject: [PATCH] Fixed Request Executor Event Memory Leak (#6106) --- .../Warmup/ExecutorWarmupService.cs | 32 ++++- .../src/Execution/IRequestExecutorResolver.cs | 7 ++ .../src/Execution/RequestExecutorEvent.cs | 46 +++++++ .../Execution/RequestExecutorEventObserver.cs | 59 +++++++++ .../src/Execution/RequestExecutorEventType.cs | 17 +++ .../RequestExecutorEvictedEventArgs.cs | 18 +++ .../src/Execution/RequestExecutorProxy.cs | 33 ++++- .../src/Execution/RequestExecutorResolver.cs | 115 ++++++++++++++++++ .../Types.Mutations/Errors/ErrorMiddleware.cs | 2 +- .../ErrorMiddlewareTests.cs | 3 +- .../Integration/FederatedRedisSchemaTests.cs | 32 ++--- 11 files changed, 344 insertions(+), 20 deletions(-) create mode 100644 src/HotChocolate/Core/src/Execution/RequestExecutorEvent.cs create mode 100644 src/HotChocolate/Core/src/Execution/RequestExecutorEventObserver.cs create mode 100644 src/HotChocolate/Core/src/Execution/RequestExecutorEventType.cs diff --git a/src/HotChocolate/AspNetCore/src/AspNetCore/Warmup/ExecutorWarmupService.cs b/src/HotChocolate/AspNetCore/src/AspNetCore/Warmup/ExecutorWarmupService.cs index 5261442c95b..5b211f8782f 100644 --- a/src/HotChocolate/AspNetCore/src/AspNetCore/Warmup/ExecutorWarmupService.cs +++ b/src/HotChocolate/AspNetCore/src/AspNetCore/Warmup/ExecutorWarmupService.cs @@ -6,6 +6,7 @@ internal class ExecutorWarmupService : BackgroundService { private readonly IRequestExecutorResolver _executorResolver; private readonly Dictionary _tasks; + private IDisposable? _eventSubscription; private CancellationToken _stopping; public ExecutorWarmupService( @@ -25,7 +26,8 @@ public ExecutorWarmupService( protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _stopping = stoppingToken; - _executorResolver.RequestExecutorEvicted += (_, args) => BeginWarmup(args.Name); + _eventSubscription = _executorResolver.Events.Subscribe( + new WarmupObserver(name => BeginWarmup(name))); foreach (var task in _tasks) { @@ -62,4 +64,32 @@ private async Task WarmupAsync( await warmup.ExecuteAsync(executor, ct); } } + + public override void Dispose() + { + _eventSubscription?.Dispose(); + base.Dispose(); + } + + private sealed class WarmupObserver : IObserver + { + public WarmupObserver(Action onEvicted) + { + OnEvicted = onEvicted; + } + + public Action OnEvicted { get; } + + public void OnNext(RequestExecutorEvent value) + { + if (value.Type is RequestExecutorEventType.Evicted) + { + OnEvicted(value.Name); + } + } + + public void OnError(Exception error) { } + + public void OnCompleted() { } + } } diff --git a/src/HotChocolate/Core/src/Execution/IRequestExecutorResolver.cs b/src/HotChocolate/Core/src/Execution/IRequestExecutorResolver.cs index ca8e9fe6087..b452b466f76 100644 --- a/src/HotChocolate/Core/src/Execution/IRequestExecutorResolver.cs +++ b/src/HotChocolate/Core/src/Execution/IRequestExecutorResolver.cs @@ -14,8 +14,14 @@ public interface IRequestExecutorResolver /// The consumers of a request executor shall subscribe to this event /// in order to release once this event is triggered. /// + [Obsolete("Use the events property instead.")] event EventHandler? RequestExecutorEvicted; + /// + /// An event that is raised when a request executor is created or evicted. + /// + IObservable Events { get; } + /// /// Gets or creates the request executor that is associated with the /// given configuration . @@ -46,3 +52,4 @@ ValueTask GetRequestExecutorAsync( /// void EvictRequestExecutor(string? schemaName = default); } + diff --git a/src/HotChocolate/Core/src/Execution/RequestExecutorEvent.cs b/src/HotChocolate/Core/src/Execution/RequestExecutorEvent.cs new file mode 100644 index 00000000000..e1ec5801baa --- /dev/null +++ b/src/HotChocolate/Core/src/Execution/RequestExecutorEvent.cs @@ -0,0 +1,46 @@ +using System; + +namespace HotChocolate.Execution; + +/// +/// Represents the event arguments of a request executor evicted event. +/// +public sealed class RequestExecutorEvent : EventArgs +{ + /// + /// Initializes a new instance of . + /// + /// + /// The type of the event. + /// + /// + /// The name of the request executor that is being evicted. + /// + /// + /// The request executor that is being evicted. + /// + internal RequestExecutorEvent( + RequestExecutorEventType type, + string name, + IRequestExecutor executor) + { + Type = type; + Name = name; + Executor = executor; + } + + /// + /// Gets the type of the event. + /// + public RequestExecutorEventType Type { get; } + + /// + /// Gets the name of the request executor that is being evicted. + /// + public string Name { get; } + + /// + /// Gets the request executor that is being evicted. + /// + public IRequestExecutor Executor { get; } +} diff --git a/src/HotChocolate/Core/src/Execution/RequestExecutorEventObserver.cs b/src/HotChocolate/Core/src/Execution/RequestExecutorEventObserver.cs new file mode 100644 index 00000000000..ec5eb5130ee --- /dev/null +++ b/src/HotChocolate/Core/src/Execution/RequestExecutorEventObserver.cs @@ -0,0 +1,59 @@ +using System; + +namespace HotChocolate.Execution; + +/// +/// Represents an observer that can be used to subscribe to the request executor events. +/// +public sealed class RequestExecutorEventObserver : IObserver +{ + private readonly Action? _onNext; + private readonly Action? _onError; + private readonly Action? _onCompleted; + + /// + /// Initializes a new instance of . + /// + /// + /// The action that is invoked when a new event is received. + /// + /// + /// The action that is invoked when an error occurs. + /// + /// + /// The action that is invoked when the observer is completed. + /// + public RequestExecutorEventObserver( + Action? onNext = null, + Action? onError = null, + Action? onCompleted = null) + { + _onNext = onNext; + _onError = onError; + _onCompleted = onCompleted; + } + + /// + /// Invoked when a new event is received. + /// + /// + /// The event that was received. + /// + public void OnNext(RequestExecutorEvent value) + => _onNext?.Invoke(value); + + /// + /// Invoked when an error occurs. + /// + /// + /// The error that occurred. + /// + public void OnError(Exception error) + => _onError?.Invoke(error); + + /// + /// Invoked when the observer is completed. + /// + public void OnCompleted() + => _onCompleted?.Invoke(); +} diff --git a/src/HotChocolate/Core/src/Execution/RequestExecutorEventType.cs b/src/HotChocolate/Core/src/Execution/RequestExecutorEventType.cs new file mode 100644 index 00000000000..90f4ef87658 --- /dev/null +++ b/src/HotChocolate/Core/src/Execution/RequestExecutorEventType.cs @@ -0,0 +1,17 @@ +namespace HotChocolate.Execution; + +/// +/// Defines the possible event types of a request executor. +/// +public enum RequestExecutorEventType +{ + /// + /// A request executor was created. + /// + Created, + + /// + /// A request executor was evicted. + /// + Evicted +} diff --git a/src/HotChocolate/Core/src/Execution/RequestExecutorEvictedEventArgs.cs b/src/HotChocolate/Core/src/Execution/RequestExecutorEvictedEventArgs.cs index c55c0a3590e..973e0642eee 100644 --- a/src/HotChocolate/Core/src/Execution/RequestExecutorEvictedEventArgs.cs +++ b/src/HotChocolate/Core/src/Execution/RequestExecutorEvictedEventArgs.cs @@ -2,15 +2,33 @@ namespace HotChocolate.Execution; +/// +/// Represents the event arguments of a request executor evicted event. +/// public sealed class RequestExecutorEvictedEventArgs : EventArgs { + /// + /// Initializes a new instance of . + /// + /// + /// The name of the request executor that is being evicted. + /// + /// + /// The request executor that is being evicted. + /// public RequestExecutorEvictedEventArgs(string name, IRequestExecutor evictedExecutor) { Name = name; EvictedExecutor = evictedExecutor; } + /// + /// Gets the name of the request executor that is being evicted. + /// public string Name { get; } + /// + /// Gets the request executor that is being evicted. + /// public IRequestExecutor EvictedExecutor { get; } } diff --git a/src/HotChocolate/Core/src/Execution/RequestExecutorProxy.cs b/src/HotChocolate/Core/src/Execution/RequestExecutorProxy.cs index 4bd5eda8129..bed80514cd7 100644 --- a/src/HotChocolate/Core/src/Execution/RequestExecutorProxy.cs +++ b/src/HotChocolate/Core/src/Execution/RequestExecutorProxy.cs @@ -15,6 +15,7 @@ public sealed class RequestExecutorProxy : IDisposable private readonly IRequestExecutorResolver _executorResolver; private readonly string _schemaName; private IRequestExecutor? _executor; + private IDisposable? _eventSubscription; private bool _disposed; public event EventHandler? ExecutorUpdated; @@ -31,7 +32,9 @@ public RequestExecutorProxy(IRequestExecutorResolver executorResolver, string sc _executorResolver = executorResolver ?? throw new ArgumentNullException(nameof(executorResolver)); _schemaName = schemaName; - _executorResolver.RequestExecutorEvicted += EvictRequestExecutor; + _eventSubscription = + _executorResolver.Events.Subscribe( + new ExecutorObserver(name => EvictRequestExecutor(name))); } /// @@ -176,11 +179,12 @@ public async ValueTask GetRequestExecutorAsync( return executor; } - private void EvictRequestExecutor(object? sender, RequestExecutorEvictedEventArgs args) + private void EvictRequestExecutor(string schemaName) { - if (!_disposed && args.Name.Equals(_schemaName)) + if (!_disposed && schemaName.Equals(_schemaName)) { _semaphore.Wait(); + try { _executor = null; @@ -198,8 +202,31 @@ public void Dispose() if (!_disposed) { _executor = null; + _eventSubscription?.Dispose(); _semaphore.Dispose(); _disposed = true; } } + + private sealed class ExecutorObserver : IObserver + { + public ExecutorObserver(Action evicted) + { + Evicted = evicted; + } + + public Action Evicted { get; } + + public void OnNext(RequestExecutorEvent value) + { + if (value.Type is RequestExecutorEventType.Evicted) + { + Evicted(value.Name); + } + } + + public void OnError(Exception error) { } + + public void OnCompleted() { } + } } diff --git a/src/HotChocolate/Core/src/Execution/RequestExecutorResolver.cs b/src/HotChocolate/Core/src/Execution/RequestExecutorResolver.cs index f330e736593..bc8d677075e 100644 --- a/src/HotChocolate/Core/src/Execution/RequestExecutorResolver.cs +++ b/src/HotChocolate/Core/src/Execution/RequestExecutorResolver.cs @@ -41,9 +41,11 @@ internal sealed partial class RequestExecutorResolver private readonly ConcurrentDictionary _executors = new(); private readonly IRequestExecutorOptionsMonitor _optionsMonitor; private readonly IServiceProvider _applicationServices; + private readonly EventObservable _events = new(); private ulong _version; private bool _disposed; + [Obsolete("Use the events property instead.")] public event EventHandler? RequestExecutorEvicted; public RequestExecutorResolver( @@ -63,6 +65,8 @@ public RequestExecutorResolver( #endif } + public IObservable Events => _events; + public async ValueTask GetRequestExecutorAsync( string? schemaName = default, CancellationToken cancellationToken = default) @@ -124,6 +128,12 @@ await OnRequestExecutorCreatedAsync(context, executor, setup, cancellationToken) schemaName, registeredExecutor.Executor); _executors.TryAdd(schemaName, registeredExecutor); + + _events.RaiseEvent( + new RequestExecutorEvent( + RequestExecutorEventType.Created, + schemaName, + registeredExecutor.Executor)); } return registeredExecutor.Executor; @@ -142,6 +152,11 @@ public void EvictRequestExecutor(string? schemaName = default) RequestExecutorEvicted?.Invoke( this, new RequestExecutorEvictedEventArgs(schemaName, re.Executor)); + _events.RaiseEvent( + new RequestExecutorEvent( + RequestExecutorEventType.Evicted, + schemaName, + re.Executor)); } finally { @@ -164,6 +179,11 @@ private void EvictAllRequestExecutors() RequestExecutorEvicted?.Invoke( this, new RequestExecutorEvictedEventArgs(key, re.Executor)); + _events.RaiseEvent( + new RequestExecutorEvent( + RequestExecutorEventType.Evicted, + key, + re.Executor)); } finally { @@ -407,6 +427,7 @@ public void Dispose() { if (!_disposed) { + _events.Dispose(); _executors.Clear(); _semaphore.Dispose(); _disposed = true; @@ -584,6 +605,100 @@ public override bool Return(RequestContext obj) } } + private sealed class EventObservable : IObservable, IDisposable + { + private readonly object _sync = new(); + private readonly List _subscriptions = new(); + private bool _disposed; + + public IDisposable Subscribe(IObserver observer) + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(EventObservable)); + } + + if (observer is null) + { + throw new ArgumentNullException(nameof(observer)); + } + + var subscription = new Subscription(this, observer); + + lock (_sync) + { + _subscriptions.Add(subscription); + } + + return subscription; + } + + public void RaiseEvent(RequestExecutorEvent eventMessage) + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(EventObservable)); + } + + lock (_sync) + { + foreach (var subscription in _subscriptions) + { + subscription.Observer.OnNext(eventMessage); + } + } + } + + private void Unsubscribe(Subscription subscription) + { + lock (_sync) + { + _subscriptions.Remove(subscription); + } + } + + public void Dispose() + { + if (!_disposed) + { + lock (_sync) + { + foreach (var subscription in _subscriptions) + { + subscription.Observer.OnCompleted(); + } + + _subscriptions.Clear(); + } + + _disposed = true; + } + } + + private sealed class Subscription : IDisposable + { + private readonly EventObservable _parent; + private bool _disposed; + + public Subscription(EventObservable parent, IObserver observer) + { + _parent = parent; + Observer = observer; + } + + public IObserver Observer { get; } + + public void Dispose() + { + if (!_disposed) + { + _parent.Unsubscribe(this); + _disposed = true; + } + } + } + } + #if NET6_0_OR_GREATER /// /// A helper calls that receives hot reload update events from the runtime and triggers diff --git a/src/HotChocolate/Core/src/Types.Mutations/Errors/ErrorMiddleware.cs b/src/HotChocolate/Core/src/Types.Mutations/Errors/ErrorMiddleware.cs index edb4888b019..bdd2a55ee3f 100644 --- a/src/HotChocolate/Core/src/Types.Mutations/Errors/ErrorMiddleware.cs +++ b/src/HotChocolate/Core/src/Types.Mutations/Errors/ErrorMiddleware.cs @@ -56,7 +56,7 @@ public async ValueTask InvokeAsync(IMiddlewareContext context) // if we have some errors that we could not handle // we will report them as GraphQL errors. - if(unhandledErrors.Count > 0) + if(unhandledErrors?.Count > 0) { foreach (var unhandledError in unhandledErrors) { diff --git a/src/HotChocolate/Core/test/Types.Mutations.Tests/ErrorMiddlewareTests.cs b/src/HotChocolate/Core/test/Types.Mutations.Tests/ErrorMiddlewareTests.cs index c11c4b60a23..97c7b776550 100644 --- a/src/HotChocolate/Core/test/Types.Mutations.Tests/ErrorMiddlewareTests.cs +++ b/src/HotChocolate/Core/test/Types.Mutations.Tests/ErrorMiddlewareTests.cs @@ -88,7 +88,8 @@ await BuildSchemaAsync( new InvalidOperationException(), new NullReferenceException(), new ArgumentException()), - field => field.Error() + field => field + .Error() .Error() .Error()); diff --git a/src/HotChocolate/Stitching/test/Stitching.Tests/Integration/FederatedRedisSchemaTests.cs b/src/HotChocolate/Stitching/test/Stitching.Tests/Integration/FederatedRedisSchemaTests.cs index 015a340f23a..d60491c8b7e 100644 --- a/src/HotChocolate/Stitching/test/Stitching.Tests/Integration/FederatedRedisSchemaTests.cs +++ b/src/HotChocolate/Stitching/test/Stitching.Tests/Integration/FederatedRedisSchemaTests.cs @@ -102,13 +102,15 @@ public async Task AutoMerge_HotReload_Schema() await executorResolver.GetRequestExecutorAsync(cancellationToken: cts.Token); var raised = false; - executorResolver.RequestExecutorEvicted += (_, args) => - { - if (args.Name.Equals(Schema.DefaultName)) - { - raised = true; - } - }; + using var session = executorResolver.Events.Subscribe( + new RequestExecutorEventObserver( + args => + { + if (args.Name.Equals(Schema.DefaultName)) + { + raised = true; + } + })); // act Assert.False(raised, "eviction was raised before act."); @@ -174,13 +176,15 @@ public async Task AutoMerge_HotReload_ClearOperationCaches() await executorResolver.GetRequestExecutorAsync(cancellationToken: cts.Token); var raised = false; - executorResolver.RequestExecutorEvicted += (_, args) => - { - if (args.Name.Equals(Schema.DefaultName)) - { - raised = true; - } - }; + using var session = executorResolver.Events.Subscribe( + new RequestExecutorEventObserver( + args => + { + if (args.Name.Equals(Schema.DefaultName)) + { + raised = true; + } + })); Assert.False(documentCache.TryGetDocument(queryHash, out _)); Assert.False(preparedOperationCache.TryGetOperation(queryHash, out _));