Skip to content

Commit

Permalink
Fixed issues ChilliCream#2740 and ChilliCream#3595 (memory leaks in s…
Browse files Browse the repository at this point in the history
…ubscriptions)
  • Loading branch information
Sergey Kolpachev committed Apr 27, 2021
1 parent 79c372d commit 074e48b
Showing 8 changed files with 145 additions and 59 deletions.
Original file line number Diff line number Diff line change
@@ -51,7 +51,7 @@ await _connection.SendAsync(
}
}
}
catch (TaskCanceledException)
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// the message processing was canceled.
}
Original file line number Diff line number Diff line change
@@ -34,38 +34,45 @@ public void Begin(CancellationToken cancellationToken)
private async Task ProcessMessagesAsync(
CancellationToken cancellationToken)
{
while (true)
try
{
SequencePosition? position;
ReadResult result = await _reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;

do
while (true)
{
position = buffer.PositionOf(Subscription.Delimiter);
SequencePosition? position;
ReadResult result = await _reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;

if (position is not null)
do
{
await _pipeline.ProcessAsync(
_connection,
buffer.Slice(0, position.Value),
cancellationToken);
position = buffer.PositionOf(Subscription.Delimiter);

if (position is not null)
{
await _pipeline.ProcessAsync(
_connection,
buffer.Slice(0, position.Value),
cancellationToken);

// Skip the message which was read.
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
// Skip the message which was read.
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
}
}
}
while (position != null);
while (position != null);

_reader.AdvanceTo(buffer.Start, buffer.End);
_reader.AdvanceTo(buffer.Start, buffer.End);

if (result.IsCompleted)
{
break;
if (result.IsCompleted)
{
break;
}
}
}

await _reader.CompleteAsync();
catch(OperationCanceledException) when (cancellationToken.IsCancellationRequested) {}
finally
{
// reader should be completed always, so that related pipe writer can stop write new messages
await _reader.CompleteAsync();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -18,13 +18,21 @@ public MessageReceiver(ISocketConnection connection, PipeWriter writer)

public async Task ReceiveAsync(CancellationToken cancellationToken)
{
while (!_connection.Closed &&
!cancellationToken.IsCancellationRequested)
try
{
await _connection.ReceiveAsync(_writer, cancellationToken);
await WriteMessageDelimiterAsync(cancellationToken);
while (!_connection.Closed &&
!cancellationToken.IsCancellationRequested)
{
await _connection.ReceiveAsync(_writer, cancellationToken);
await WriteMessageDelimiterAsync(cancellationToken);
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { }
finally
{
// writer should be always completed
await _writer.CompleteAsync();
}
await _writer.CompleteAsync();
}

private async Task WriteMessageDelimiterAsync(
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ namespace HotChocolate.AspNetCore.Subscriptions
internal sealed class Subscription : ISubscription
{
internal const byte Delimiter = 0x07;
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private readonly CancellationTokenSource _cts;
private readonly ISocketConnection _connection;
private readonly IResponseStream _responseStream;
private bool _disposed;
@@ -28,9 +28,11 @@ public Subscription(
Id = id ??
throw new ArgumentNullException(nameof(id));

_cts = CancellationTokenSource.CreateLinkedTokenSource(_connection.RequestAborted);

Task.Factory.StartNew(
SendResultsAsync,
CancellationToken.None,
_cts.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}
@@ -39,51 +41,64 @@ public Subscription(

private async Task SendResultsAsync()
{
CancellationToken cancellationToken = _cts.Token;

try
{
await foreach (IQueryResult result in
_responseStream.ReadResultsAsync().WithCancellation(_cts.Token))
_responseStream.ReadResultsAsync().WithCancellation(cancellationToken))
{
using (result)
{
await _connection.SendAsync(new DataResultMessage(Id, result), _cts.Token);
if (!_connection.Closed)
{
await _connection.SendAsync(new DataResultMessage(Id, result), cancellationToken);
}
}
}

if (!_cts.IsCancellationRequested)
if (!cancellationToken.IsCancellationRequested && !_connection.Closed)
{
await _connection.SendAsync(new DataCompleteMessage(Id), _cts.Token);
Completed?.Invoke(this, EventArgs.Empty);
await _connection.SendAsync(new DataCompleteMessage(Id), cancellationToken);
}
}
catch (OperationCanceledException) { }
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { }
catch (ObjectDisposedException) { }
catch (Exception ex)
catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
{
if (!_cts.IsCancellationRequested)
if (!_connection.Closed)
{
IError error =
ErrorBuilder
.New()
.SetException(ex)
.SetCode(ErrorCodes.Execution.TaskProcessingError)
.SetMessage("Unexpected Execution Error")
.Build();

IQueryResult result = QueryResultBuilder.CreateError(error);
try
{
await _connection.SendAsync(new DataResultMessage(Id, result), _cts.Token);
}
finally
{
await _connection.SendAsync(new DataCompleteMessage(Id), _cts.Token);
Completed?.Invoke(this, EventArgs.Empty);
IError error =
ErrorBuilder
.New()
.SetException(ex)
.SetCode(ErrorCodes.Execution.TaskProcessingError)
.SetMessage("Unexpected Execution Error")
.Build();

IQueryResult result = QueryResultBuilder.CreateError(error);
try
{
await _connection.SendAsync(new DataResultMessage(Id, result), cancellationToken);
}
finally
{
await _connection.SendAsync(new DataCompleteMessage(Id), cancellationToken);
}

}
catch { } // suppress all errors, so original exception can be rethrown
}

// original exception should be propagated to upper level in order to be logged correctly at least
throw;
}
finally
{
// completed should be always invoked to be ensure that disposed subscription is removed from subscription manager
Completed?.Invoke(this, EventArgs.Empty);
Dispose();
}
}
Original file line number Diff line number Diff line change
@@ -41,19 +41,27 @@ public async Task HandleAsync(CancellationToken cancellationToken)
_messageProcessor.Begin(cts.Token);
await _messageReceiver.ReceiveAsync(cts.Token);
}
catch(OperationCanceledException)
catch(OperationCanceledException) when (cts.Token.IsCancellationRequested)
{
// OperationCanceledException are catched and will not
// bubble further. We will just close the current subscription
// context.
}
finally
{
cts.Cancel();
await _connection.CloseAsync(
try
{
if (!cts.IsCancellationRequested)
{
cts.Cancel();
}

await _connection.CloseAsync(
"Session ended.",
SocketCloseStatus.NormalClosure,
CancellationToken.None);
}
catch {} // original exception must not be lost if new exception occurs during closing session
}
}
}
Original file line number Diff line number Diff line change
@@ -127,6 +127,9 @@ private async Task<IQueryResult> OnEvent(object payload)
return await _queryExecutor
.ExecuteAsync(operationContext, scopedContext)
.ConfigureAwait(false);

// I believe operation context should be marked as completed here?
//((IExecutionTaskContext)operationContext).Completed();
}
finally
{
@@ -202,6 +205,9 @@ private async ValueTask<ISourceStream> SubscribeAsync()
await rootSelection.Field.SubscribeResolver!.Invoke(middlewareContext)
.ConfigureAwait(false);

// mark operation context as Completed in order to free all associated resources and allow to return it to pool
((IExecutionTaskContext)operationContext).Completed();

if (operationContext.Result.Errors.Count > 0)
{
// again if we have any errors we will just throw them and not opening
Original file line number Diff line number Diff line change
@@ -123,10 +123,13 @@ private async Task ProcessMessages()
ImmutableHashSet<Channel<TMessage>> closedChannel =
ImmutableHashSet<Channel<TMessage>>.Empty;

for (var i = 0; i < _outgoing.Count; i++)
var outgoingCount = _outgoing.Count;

for (var i = 0; i < outgoingCount; i++)
{
Channel<TMessage> channel = _outgoing[i];

// close outgoing channel if related subscription is completed (no reader available)
if (!channel.Writer.TryWrite(message)
&& channel.Reader.Completion.IsCompleted)
{
@@ -139,7 +142,8 @@ private async Task ProcessMessages()
_outgoing.RemoveAll(c => closedChannel.Contains(c));
}

if (_outgoing.Count == 0)
// raises unsubscribed event only once when all outgoing channels (subscriptions) are removed
if (_outgoing.Count == 0 && outgoingCount > 0)
{
RaiseUnsubscribedEvent();
}
Original file line number Diff line number Diff line change
@@ -58,7 +58,13 @@ public EnumerateMessages(Channel<T> channel)
_channel = channel;
}

public async IAsyncEnumerator<T> GetAsyncEnumerator(
public IAsyncEnumerator<T> GetAsyncEnumerator(
CancellationToken cancellationToken = default)
{
return new WrappedEnumerator<T>(GetAsyncEnumeratorInternally(cancellationToken), CompleteChannel);
}

public async IAsyncEnumerator<T> GetAsyncEnumeratorInternally(
CancellationToken cancellationToken = default)
{
while (await _channel.Reader.WaitToReadAsync(cancellationToken))
@@ -72,6 +78,13 @@ public async IAsyncEnumerator<T> GetAsyncEnumerator(
.ConfigureAwait(false);
}
}

private ValueTask CompleteChannel()
{
// no more readers, outgoing channel should be closed
_channel.Writer.TryComplete();
return default;
}
}

private class EnumerateMessages: IAsyncEnumerable<object>
@@ -92,5 +105,30 @@ public async IAsyncEnumerator<object> GetAsyncEnumerator(
}
}
}

private class WrappedEnumerator<T> : IAsyncEnumerator<T>
{
private readonly IAsyncEnumerator<T> _enumerator;
private readonly Func<ValueTask> _disposingAction;

public WrappedEnumerator(IAsyncEnumerator<T> enumerator, Func<ValueTask> disposingAction)
{
_enumerator = enumerator;
_disposingAction = disposingAction;
}

public async ValueTask DisposeAsync()
{
await _enumerator.DisposeAsync();
await _disposingAction();
}

public ValueTask<bool> MoveNextAsync()
{
return _enumerator.MoveNextAsync();
}

public T Current => _enumerator.Current;
}
}
}

0 comments on commit 074e48b

Please sign in to comment.