Skip to content

Commit

Permalink
Fixed Request Executor Event Memory Leak (#6106)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelstaib authored May 1, 2023
1 parent 9f9bc75 commit 8a1b877
Show file tree
Hide file tree
Showing 11 changed files with 344 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ internal class ExecutorWarmupService : BackgroundService
{
private readonly IRequestExecutorResolver _executorResolver;
private readonly Dictionary<string, WarmupSchemaTask[]> _tasks;
private IDisposable? _eventSubscription;
private CancellationToken _stopping;

public ExecutorWarmupService(
Expand All @@ -25,7 +26,8 @@ public ExecutorWarmupService(
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_stopping = stoppingToken;
_executorResolver.RequestExecutorEvicted += (_, args) => BeginWarmup(args.Name);
_eventSubscription = _executorResolver.Events.Subscribe(
new WarmupObserver(name => BeginWarmup(name)));

foreach (var task in _tasks)
{
Expand Down Expand Up @@ -62,4 +64,32 @@ private async Task WarmupAsync(
await warmup.ExecuteAsync(executor, ct);
}
}

public override void Dispose()
{
_eventSubscription?.Dispose();
base.Dispose();
}

private sealed class WarmupObserver : IObserver<RequestExecutorEvent>
{
public WarmupObserver(Action<string> onEvicted)
{
OnEvicted = onEvicted;
}

public Action<string> OnEvicted { get; }

public void OnNext(RequestExecutorEvent value)
{
if (value.Type is RequestExecutorEventType.Evicted)
{
OnEvicted(value.Name);
}
}

public void OnError(Exception error) { }

public void OnCompleted() { }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,14 @@ public interface IRequestExecutorResolver
/// The consumers of a request executor shall subscribe to this event
/// in order to release once this event is triggered.
/// </summary>
[Obsolete("Use the events property instead.")]
event EventHandler<RequestExecutorEvictedEventArgs>? RequestExecutorEvicted;

/// <summary>
/// An event that is raised when a request executor is created or evicted.
/// </summary>
IObservable<RequestExecutorEvent> Events { get; }

/// <summary>
/// Gets or creates the request executor that is associated with the
/// given configuration <paramref name="schemaName" />.
Expand Down Expand Up @@ -46,3 +52,4 @@ ValueTask<IRequestExecutor> GetRequestExecutorAsync(
/// </param>
void EvictRequestExecutor(string? schemaName = default);
}

46 changes: 46 additions & 0 deletions src/HotChocolate/Core/src/Execution/RequestExecutorEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System;

namespace HotChocolate.Execution;

/// <summary>
/// Represents the event arguments of a request executor evicted event.
/// </summary>
public sealed class RequestExecutorEvent : EventArgs
{
/// <summary>
/// Initializes a new instance of <see cref="RequestExecutorEvent" />.
/// </summary>
/// <param name="type">
/// The type of the event.
/// </param>
/// <param name="name">
/// The name of the request executor that is being evicted.
/// </param>
/// <param name="executor">
/// The request executor that is being evicted.
/// </param>
internal RequestExecutorEvent(
RequestExecutorEventType type,
string name,
IRequestExecutor executor)
{
Type = type;
Name = name;
Executor = executor;
}

/// <summary>
/// Gets the type of the event.
/// </summary>
public RequestExecutorEventType Type { get; }

/// <summary>
/// Gets the name of the request executor that is being evicted.
/// </summary>
public string Name { get; }

/// <summary>
/// Gets the request executor that is being evicted.
/// </summary>
public IRequestExecutor Executor { get; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
using System;

namespace HotChocolate.Execution;

/// <summary>
/// Represents an observer that can be used to subscribe to the request executor events.
/// </summary>
public sealed class RequestExecutorEventObserver : IObserver<RequestExecutorEvent>
{
private readonly Action<RequestExecutorEvent>? _onNext;
private readonly Action<Exception>? _onError;
private readonly Action? _onCompleted;

/// <summary>
/// Initializes a new instance of <see cref="RequestExecutorEventObserver" />.
/// </summary>
/// <param name="onNext">
/// The action that is invoked when a new event is received.
/// </param>
/// <param name="onError">
/// The action that is invoked when an error occurs.
/// </param>
/// <param name="onCompleted">
/// The action that is invoked when the observer is completed.
/// </param>
public RequestExecutorEventObserver(
Action<RequestExecutorEvent>? onNext = null,
Action<Exception>? onError = null,
Action? onCompleted = null)
{
_onNext = onNext;
_onError = onError;
_onCompleted = onCompleted;
}

/// <summary>
/// Invoked when a new event is received.
/// </summary>
/// <param name="value">
/// The event that was received.
/// </param>
public void OnNext(RequestExecutorEvent value)
=> _onNext?.Invoke(value);

/// <summary>
/// Invoked when an error occurs.
/// </summary>
/// <param name="error">
/// The error that occurred.
/// </param>
public void OnError(Exception error)
=> _onError?.Invoke(error);

/// <summary>
/// Invoked when the observer is completed.
/// </summary>
public void OnCompleted()
=> _onCompleted?.Invoke();
}
17 changes: 17 additions & 0 deletions src/HotChocolate/Core/src/Execution/RequestExecutorEventType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace HotChocolate.Execution;

/// <summary>
/// Defines the possible event types of a request executor.
/// </summary>
public enum RequestExecutorEventType
{
/// <summary>
/// A request executor was created.
/// </summary>
Created,

/// <summary>
/// A request executor was evicted.
/// </summary>
Evicted
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,33 @@

namespace HotChocolate.Execution;

/// <summary>
/// Represents the event arguments of a request executor evicted event.
/// </summary>
public sealed class RequestExecutorEvictedEventArgs : EventArgs
{
/// <summary>
/// Initializes a new instance of <see cref="RequestExecutorEvictedEventArgs" />.
/// </summary>
/// <param name="name">
/// The name of the request executor that is being evicted.
/// </param>
/// <param name="evictedExecutor">
/// The request executor that is being evicted.
/// </param>
public RequestExecutorEvictedEventArgs(string name, IRequestExecutor evictedExecutor)
{
Name = name;
EvictedExecutor = evictedExecutor;
}

/// <summary>
/// Gets the name of the request executor that is being evicted.
/// </summary>
public string Name { get; }

/// <summary>
/// Gets the request executor that is being evicted.
/// </summary>
public IRequestExecutor EvictedExecutor { get; }
}
33 changes: 30 additions & 3 deletions src/HotChocolate/Core/src/Execution/RequestExecutorProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public sealed class RequestExecutorProxy : IDisposable
private readonly IRequestExecutorResolver _executorResolver;
private readonly string _schemaName;
private IRequestExecutor? _executor;
private IDisposable? _eventSubscription;
private bool _disposed;

public event EventHandler<RequestExecutorUpdatedEventArgs>? ExecutorUpdated;
Expand All @@ -31,7 +32,9 @@ public RequestExecutorProxy(IRequestExecutorResolver executorResolver, string sc
_executorResolver = executorResolver ??
throw new ArgumentNullException(nameof(executorResolver));
_schemaName = schemaName;
_executorResolver.RequestExecutorEvicted += EvictRequestExecutor;
_eventSubscription =
_executorResolver.Events.Subscribe(
new ExecutorObserver(name => EvictRequestExecutor(name)));
}

/// <summary>
Expand Down Expand Up @@ -176,11 +179,12 @@ public async ValueTask<IRequestExecutor> GetRequestExecutorAsync(
return executor;
}

private void EvictRequestExecutor(object? sender, RequestExecutorEvictedEventArgs args)
private void EvictRequestExecutor(string schemaName)
{
if (!_disposed && args.Name.Equals(_schemaName))
if (!_disposed && schemaName.Equals(_schemaName))
{
_semaphore.Wait();

try
{
_executor = null;
Expand All @@ -198,8 +202,31 @@ public void Dispose()
if (!_disposed)
{
_executor = null;
_eventSubscription?.Dispose();
_semaphore.Dispose();
_disposed = true;
}
}

private sealed class ExecutorObserver : IObserver<RequestExecutorEvent>
{
public ExecutorObserver(Action<string> evicted)
{
Evicted = evicted;
}

public Action<string> Evicted { get; }

public void OnNext(RequestExecutorEvent value)
{
if (value.Type is RequestExecutorEventType.Evicted)
{
Evicted(value.Name);
}
}

public void OnError(Exception error) { }

public void OnCompleted() { }
}
}
Loading

0 comments on commit 8a1b877

Please sign in to comment.