From 074e48b066b903c7fab97543b36e1d26e431b8b7 Mon Sep 17 00:00:00 2001 From: Sergey Kolpachev Date: Tue, 27 Apr 2021 20:09:23 +0300 Subject: [PATCH] Fixed issues #2740 and #3595 (memory leaks in subscriptions) --- .../Subscriptions/KeepConnectionAliveJob.cs | 2 +- .../Subscriptions/MessageProcessor.cs | 51 ++++++++------- .../Subscriptions/MessageReceiver.cs | 18 +++-- .../AspNetCore/Subscriptions/Subscription.cs | 65 ++++++++++++------- .../Subscriptions/WebSocketSession.cs | 14 +++- .../SubscriptionExecutor.Subscription.cs | 6 ++ .../src/Subscriptions.InMemory/EventTopic.cs | 8 ++- .../InMemorySourceStream.cs | 40 +++++++++++- 8 files changed, 145 insertions(+), 59 deletions(-) diff --git a/src/HotChocolate/AspNetCore/src/AspNetCore/Subscriptions/KeepConnectionAliveJob.cs b/src/HotChocolate/AspNetCore/src/AspNetCore/Subscriptions/KeepConnectionAliveJob.cs index 58ed6ca61c4..c35f9c403b7 100644 --- a/src/HotChocolate/AspNetCore/src/AspNetCore/Subscriptions/KeepConnectionAliveJob.cs +++ b/src/HotChocolate/AspNetCore/src/AspNetCore/Subscriptions/KeepConnectionAliveJob.cs @@ -51,7 +51,7 @@ await _connection.SendAsync( } } } - catch (TaskCanceledException) + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { // the message processing was canceled. } diff --git a/src/HotChocolate/AspNetCore/src/AspNetCore/Subscriptions/MessageProcessor.cs b/src/HotChocolate/AspNetCore/src/AspNetCore/Subscriptions/MessageProcessor.cs index ef86a26d163..cff1e15d50d 100644 --- a/src/HotChocolate/AspNetCore/src/AspNetCore/Subscriptions/MessageProcessor.cs +++ b/src/HotChocolate/AspNetCore/src/AspNetCore/Subscriptions/MessageProcessor.cs @@ -34,38 +34,45 @@ public void Begin(CancellationToken cancellationToken) private async Task ProcessMessagesAsync( CancellationToken cancellationToken) { - while (true) + try { - SequencePosition? position; - ReadResult result = await _reader.ReadAsync(cancellationToken); - ReadOnlySequence buffer = result.Buffer; - - do + while (true) { - position = buffer.PositionOf(Subscription.Delimiter); + SequencePosition? position; + ReadResult result = await _reader.ReadAsync(cancellationToken); + ReadOnlySequence buffer = result.Buffer; - if (position is not null) + do { - await _pipeline.ProcessAsync( - _connection, - buffer.Slice(0, position.Value), - cancellationToken); + position = buffer.PositionOf(Subscription.Delimiter); + + if (position is not null) + { + await _pipeline.ProcessAsync( + _connection, + buffer.Slice(0, position.Value), + cancellationToken); - // Skip the message which was read. - buffer = buffer.Slice(buffer.GetPosition(1, position.Value)); + // Skip the message which was read. + buffer = buffer.Slice(buffer.GetPosition(1, position.Value)); + } } - } - while (position != null); + while (position != null); - _reader.AdvanceTo(buffer.Start, buffer.End); + _reader.AdvanceTo(buffer.Start, buffer.End); - if (result.IsCompleted) - { - break; + if (result.IsCompleted) + { + break; + } } } - - await _reader.CompleteAsync(); + catch(OperationCanceledException) when (cancellationToken.IsCancellationRequested) {} + finally + { + // reader should be completed always, so that related pipe writer can stop write new messages + await _reader.CompleteAsync(); + } } } } diff --git a/src/HotChocolate/AspNetCore/src/AspNetCore/Subscriptions/MessageReceiver.cs b/src/HotChocolate/AspNetCore/src/AspNetCore/Subscriptions/MessageReceiver.cs index 46b84aa71a8..9b183150e48 100644 --- a/src/HotChocolate/AspNetCore/src/AspNetCore/Subscriptions/MessageReceiver.cs +++ b/src/HotChocolate/AspNetCore/src/AspNetCore/Subscriptions/MessageReceiver.cs @@ -18,13 +18,21 @@ public MessageReceiver(ISocketConnection connection, PipeWriter writer) public async Task ReceiveAsync(CancellationToken cancellationToken) { - while (!_connection.Closed && - !cancellationToken.IsCancellationRequested) + try { - await _connection.ReceiveAsync(_writer, cancellationToken); - await WriteMessageDelimiterAsync(cancellationToken); + while (!_connection.Closed && + !cancellationToken.IsCancellationRequested) + { + await _connection.ReceiveAsync(_writer, cancellationToken); + await WriteMessageDelimiterAsync(cancellationToken); + } + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { } + finally + { + // writer should be always completed + await _writer.CompleteAsync(); } - await _writer.CompleteAsync(); } private async Task WriteMessageDelimiterAsync( diff --git a/src/HotChocolate/AspNetCore/src/AspNetCore/Subscriptions/Subscription.cs b/src/HotChocolate/AspNetCore/src/AspNetCore/Subscriptions/Subscription.cs index 6554fec57ae..7b9fc06b384 100644 --- a/src/HotChocolate/AspNetCore/src/AspNetCore/Subscriptions/Subscription.cs +++ b/src/HotChocolate/AspNetCore/src/AspNetCore/Subscriptions/Subscription.cs @@ -9,7 +9,7 @@ namespace HotChocolate.AspNetCore.Subscriptions internal sealed class Subscription : ISubscription { internal const byte Delimiter = 0x07; - private readonly CancellationTokenSource _cts = new CancellationTokenSource(); + private readonly CancellationTokenSource _cts; private readonly ISocketConnection _connection; private readonly IResponseStream _responseStream; private bool _disposed; @@ -28,9 +28,11 @@ public Subscription( Id = id ?? throw new ArgumentNullException(nameof(id)); + _cts = CancellationTokenSource.CreateLinkedTokenSource(_connection.RequestAborted); + Task.Factory.StartNew( SendResultsAsync, - CancellationToken.None, + _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); } @@ -39,51 +41,64 @@ public Subscription( private async Task SendResultsAsync() { + CancellationToken cancellationToken = _cts.Token; + try { await foreach (IQueryResult result in - _responseStream.ReadResultsAsync().WithCancellation(_cts.Token)) + _responseStream.ReadResultsAsync().WithCancellation(cancellationToken)) { using (result) { - await _connection.SendAsync(new DataResultMessage(Id, result), _cts.Token); + if (!_connection.Closed) + { + await _connection.SendAsync(new DataResultMessage(Id, result), cancellationToken); + } } } - if (!_cts.IsCancellationRequested) + if (!cancellationToken.IsCancellationRequested && !_connection.Closed) { - await _connection.SendAsync(new DataCompleteMessage(Id), _cts.Token); - Completed?.Invoke(this, EventArgs.Empty); + await _connection.SendAsync(new DataCompleteMessage(Id), cancellationToken); } } - catch (OperationCanceledException) { } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { } catch (ObjectDisposedException) { } - catch (Exception ex) + catch (Exception ex) when (!cancellationToken.IsCancellationRequested) { - if (!_cts.IsCancellationRequested) + if (!_connection.Closed) { - IError error = - ErrorBuilder - .New() - .SetException(ex) - .SetCode(ErrorCodes.Execution.TaskProcessingError) - .SetMessage("Unexpected Execution Error") - .Build(); - - IQueryResult result = QueryResultBuilder.CreateError(error); try { - await _connection.SendAsync(new DataResultMessage(Id, result), _cts.Token); - } - finally - { - await _connection.SendAsync(new DataCompleteMessage(Id), _cts.Token); - Completed?.Invoke(this, EventArgs.Empty); + IError error = + ErrorBuilder + .New() + .SetException(ex) + .SetCode(ErrorCodes.Execution.TaskProcessingError) + .SetMessage("Unexpected Execution Error") + .Build(); + + IQueryResult result = QueryResultBuilder.CreateError(error); + try + { + await _connection.SendAsync(new DataResultMessage(Id, result), cancellationToken); + } + finally + { + await _connection.SendAsync(new DataCompleteMessage(Id), cancellationToken); + } + } + catch { } // suppress all errors, so original exception can be rethrown } + + // original exception should be propagated to upper level in order to be logged correctly at least + throw; } finally { + // completed should be always invoked to be ensure that disposed subscription is removed from subscription manager + Completed?.Invoke(this, EventArgs.Empty); Dispose(); } } diff --git a/src/HotChocolate/AspNetCore/src/AspNetCore/Subscriptions/WebSocketSession.cs b/src/HotChocolate/AspNetCore/src/AspNetCore/Subscriptions/WebSocketSession.cs index 569b7e9a42c..8132a8d688b 100644 --- a/src/HotChocolate/AspNetCore/src/AspNetCore/Subscriptions/WebSocketSession.cs +++ b/src/HotChocolate/AspNetCore/src/AspNetCore/Subscriptions/WebSocketSession.cs @@ -41,7 +41,7 @@ public async Task HandleAsync(CancellationToken cancellationToken) _messageProcessor.Begin(cts.Token); await _messageReceiver.ReceiveAsync(cts.Token); } - catch(OperationCanceledException) + catch(OperationCanceledException) when (cts.Token.IsCancellationRequested) { // OperationCanceledException are catched and will not // bubble further. We will just close the current subscription @@ -49,11 +49,19 @@ public async Task HandleAsync(CancellationToken cancellationToken) } finally { - cts.Cancel(); - await _connection.CloseAsync( + try + { + if (!cts.IsCancellationRequested) + { + cts.Cancel(); + } + + await _connection.CloseAsync( "Session ended.", SocketCloseStatus.NormalClosure, CancellationToken.None); + } + catch {} // original exception must not be lost if new exception occurs during closing session } } } diff --git a/src/HotChocolate/Core/src/Execution/Processing/SubscriptionExecutor.Subscription.cs b/src/HotChocolate/Core/src/Execution/Processing/SubscriptionExecutor.Subscription.cs index 6f80d143f6f..fbea791ef86 100644 --- a/src/HotChocolate/Core/src/Execution/Processing/SubscriptionExecutor.Subscription.cs +++ b/src/HotChocolate/Core/src/Execution/Processing/SubscriptionExecutor.Subscription.cs @@ -127,6 +127,9 @@ private async Task OnEvent(object payload) return await _queryExecutor .ExecuteAsync(operationContext, scopedContext) .ConfigureAwait(false); + + // I believe operation context should be marked as completed here? + //((IExecutionTaskContext)operationContext).Completed(); } finally { @@ -202,6 +205,9 @@ private async ValueTask SubscribeAsync() await rootSelection.Field.SubscribeResolver!.Invoke(middlewareContext) .ConfigureAwait(false); + // mark operation context as Completed in order to free all associated resources and allow to return it to pool + ((IExecutionTaskContext)operationContext).Completed(); + if (operationContext.Result.Errors.Count > 0) { // again if we have any errors we will just throw them and not opening diff --git a/src/HotChocolate/Core/src/Subscriptions.InMemory/EventTopic.cs b/src/HotChocolate/Core/src/Subscriptions.InMemory/EventTopic.cs index d8514fe5a33..f8e196dfc79 100644 --- a/src/HotChocolate/Core/src/Subscriptions.InMemory/EventTopic.cs +++ b/src/HotChocolate/Core/src/Subscriptions.InMemory/EventTopic.cs @@ -123,10 +123,13 @@ private async Task ProcessMessages() ImmutableHashSet> closedChannel = ImmutableHashSet>.Empty; - for (var i = 0; i < _outgoing.Count; i++) + var outgoingCount = _outgoing.Count; + + for (var i = 0; i < outgoingCount; i++) { Channel channel = _outgoing[i]; + // close outgoing channel if related subscription is completed (no reader available) if (!channel.Writer.TryWrite(message) && channel.Reader.Completion.IsCompleted) { @@ -139,7 +142,8 @@ private async Task ProcessMessages() _outgoing.RemoveAll(c => closedChannel.Contains(c)); } - if (_outgoing.Count == 0) + // raises unsubscribed event only once when all outgoing channels (subscriptions) are removed + if (_outgoing.Count == 0 && outgoingCount > 0) { RaiseUnsubscribedEvent(); } diff --git a/src/HotChocolate/Core/src/Subscriptions.InMemory/InMemorySourceStream.cs b/src/HotChocolate/Core/src/Subscriptions.InMemory/InMemorySourceStream.cs index 02eac83ca7d..5b39f8eeee8 100644 --- a/src/HotChocolate/Core/src/Subscriptions.InMemory/InMemorySourceStream.cs +++ b/src/HotChocolate/Core/src/Subscriptions.InMemory/InMemorySourceStream.cs @@ -58,7 +58,13 @@ public EnumerateMessages(Channel channel) _channel = channel; } - public async IAsyncEnumerator GetAsyncEnumerator( + public IAsyncEnumerator GetAsyncEnumerator( + CancellationToken cancellationToken = default) + { + return new WrappedEnumerator(GetAsyncEnumeratorInternally(cancellationToken), CompleteChannel); + } + + public async IAsyncEnumerator GetAsyncEnumeratorInternally( CancellationToken cancellationToken = default) { while (await _channel.Reader.WaitToReadAsync(cancellationToken)) @@ -72,6 +78,13 @@ public async IAsyncEnumerator GetAsyncEnumerator( .ConfigureAwait(false); } } + + private ValueTask CompleteChannel() + { + // no more readers, outgoing channel should be closed + _channel.Writer.TryComplete(); + return default; + } } private class EnumerateMessages: IAsyncEnumerable @@ -92,5 +105,30 @@ public async IAsyncEnumerator GetAsyncEnumerator( } } } + + private class WrappedEnumerator : IAsyncEnumerator + { + private readonly IAsyncEnumerator _enumerator; + private readonly Func _disposingAction; + + public WrappedEnumerator(IAsyncEnumerator enumerator, Func disposingAction) + { + _enumerator = enumerator; + _disposingAction = disposingAction; + } + + public async ValueTask DisposeAsync() + { + await _enumerator.DisposeAsync(); + await _disposingAction(); + } + + public ValueTask MoveNextAsync() + { + return _enumerator.MoveNextAsync(); + } + + public T Current => _enumerator.Current; + } } }