Skip to content

Commit

Permalink
Context.WatchWith memory leak fix (#4047)
Browse files Browse the repository at this point in the history
* Added repro project and failing memory leak unit test

* Ported scala's PR to .NET

* Removed temporary project and simplify test

* Compile error fix

* Approved API changes
  • Loading branch information
IgorFedchenko authored and Aaronontheweb committed Nov 20, 2019
1 parent 408b00b commit 42cfae6
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 53 deletions.
11 changes: 7 additions & 4 deletions src/Akka.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.29201.188
# Visual Studio 15
VisualStudioVersion = 15.0.28307.645
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Benchmark", "Benchmark", "{73108242-625A-4D7B-AA09-63375DBAE464}"
EndProject
Expand Down Expand Up @@ -203,11 +203,14 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka.Persistence.TestKit.Xu
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Docs", "Docs", "{C7012F7B-F68E-440D-9265-987266A04302}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Docs.Tests", "core\Akka.Docs.Tests\Akka.Docs.Tests.csproj", "{159F6312-3F12-4A84-9E7C-0A4B4FD72D1E}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka.Docs.Tests", "core\Akka.Docs.Tests\Akka.Docs.Tests.csproj", "{159F6312-3F12-4A84-9E7C-0A4B4FD72D1E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Docs.Tutorials", "core\Akka.Docs.Tutorials\Akka.Docs.Tutorials.csproj", "{9AE636F2-988C-42E4-9B40-FF1F7177DF91}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka.Docs.Tutorials", "core\Akka.Docs.Tutorials\Akka.Docs.Tutorials.csproj", "{9AE636F2-988C-42E4-9B40-FF1F7177DF91}"
EndProject
Global
GlobalSection(Performance) = preSolution
HasPerformanceSessions = true
EndGlobalSection
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Debug|x64 = Debug|x64
Expand Down
3 changes: 2 additions & 1 deletion src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ namespace Akka.Actor
protected void StopFunctionRefs() { }
public void Suspend() { }
protected void TellWatchersWeDied() { }
public void TerminatedQueuedFor(Akka.Actor.IActorRef subject) { }
public void TerminatedQueuedFor(Akka.Actor.IActorRef subject, Akka.Util.Option<object> customMessage) { }
public bool TryGetChildStatsByName(string name, out Akka.Actor.Internal.IChildStats child) { }
protected bool TryGetChildStatsByRef(Akka.Actor.IActorRef actor, out Akka.Actor.Internal.ChildRestartStats child) { }
public bool TryGetSingleChild(string name, out Akka.Actor.IInternalActorRef child) { }
Expand Down Expand Up @@ -4804,6 +4804,7 @@ namespace Akka.Util
public bool Equals(Akka.Util.Option<T> other) { }
public override bool Equals(object obj) { }
public override int GetHashCode() { }
public T GetOrElse(T fallbackValue) { }
public override string ToString() { }
}
public abstract class Resolve : Akka.Actor.IIndirectActorProducer
Expand Down
101 changes: 101 additions & 0 deletions src/core/Akka.Tests/Actor/ContextWatchWithSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// //-----------------------------------------------------------------------
// // <copyright file="ContextWatchWithSpec.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2019 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2019 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.TestKit;
using FluentAssertions;
using Newtonsoft.Json;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Tests.Actor
{
public class ContextWatchWithSpec : AkkaSpec
{
private readonly ITestOutputHelper _outputHelper;

public ContextWatchWithSpec(ITestOutputHelper outputHelper)
{
_outputHelper = outputHelper;
}

[Fact(Skip = "This test is used with Performance Profiler to check memory leaks")]
public void Context_WatchWith_Should_not_have_memory_leak()
{
using (var actorSystem = ActorSystem.Create("repro"))
{
actorSystem.ActorOf(Props.Create<LoadHandler>());

Thread.Sleep(60.Seconds());
}
}

public class LoadHandler : ReceiveActor
{
private readonly List<IActorRef> _subjects;
private readonly ICancelable _cancel;

public LoadHandler()
{
_subjects = new List<IActorRef>();
_cancel = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
initialDelay: TimeSpan.FromSeconds(1),
interval: TimeSpan.FromSeconds(1),
receiver: Self,
message: Iteration.Instance,
sender: ActorRefs.NoSender);

Receive<Iteration>(
_ =>
{
// stop actors created on previous iteration
_subjects.ForEach(Context.Stop);
_subjects.Clear();

// create a set of actors and start watching them
for (var i = 0; i < 10_000; i++)
{
var subject = Context.ActorOf(Props.Create<Subject>());
_subjects.Add(subject);
Context.WatchWith(subject, new Stopped(subject));
}
});

Receive<Stopped>(_ => { });
}

private class Iteration
{
public static readonly Iteration Instance = new Iteration();
private Iteration() { }
}

public class Stopped
{
public IActorRef ActorRef { get; }

public Stopped(IActorRef actorRef)
{
ActorRef = actorRef;
}
}

public class Subject : ReceiveActor
{
// simulate internal state
private byte[] _state = new byte[1000];
}

protected override void PostStop() => _cancel.Cancel();
}
}
}
27 changes: 15 additions & 12 deletions src/core/Akka/Actor/ActorCell.DeathWatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Linq;
using Akka.Dispatch.SysMsg;
using Akka.Event;
using Akka.Util;
using Akka.Util.Internal;

namespace Akka.Actor
Expand All @@ -31,7 +32,7 @@ public IActorRef Watch(IActorRef subject)
MaintainAddressTerminatedSubscription(() =>
{
a.SendSystemMessage(new Watch(a, _self)); // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS
_state = _state.AddWatching(a, null);
_state = _state.AddWatching(a, Option<object>.None);
}, a);
}
return a;
Expand Down Expand Up @@ -77,7 +78,7 @@ public IActorRef Unwatch(IActorRef subject)
_state = _state.RemoveWatching(a);
}, a);
}
_state = _state.RemoveTerminated(a);
(_state, _) = _state.RemoveTerminated(a);
return a;
}

Expand All @@ -90,8 +91,9 @@ protected void ReceivedTerminated(Terminated t)
if (!_state.ContainsTerminated(t.ActorRef))
return;

_state = _state.RemoveTerminated(t.ActorRef); // here we know that it is the SAME ref which was put in
ReceiveMessage(t);
Option<object> customTerminatedMessage;
(_state, customTerminatedMessage) = _state.RemoveTerminated(t.ActorRef); // here we know that it is the SAME ref which was put in
ReceiveMessage(customTerminatedMessage.GetOrElse(t));
}

/// <summary>
Expand All @@ -103,17 +105,17 @@ protected void ReceivedTerminated(Terminated t)
/// <param name="addressTerminated">TBD</param>
protected void WatchedActorTerminated(IActorRef actor, bool existenceConfirmed, bool addressTerminated)
{
object message; // The custom termination message that was requested
if (TryGetWatching(actor, out message))
if (TryGetWatching(actor, out var message)) // message is custom termination message that was requested
{
MaintainAddressTerminatedSubscription(() =>
{
_state = _state.RemoveWatching(actor);
}, actor);
if (!IsTerminating)
{
Self.Tell(message ?? new Terminated(actor, existenceConfirmed, addressTerminated), actor);
TerminatedQueuedFor(actor);
// Unwatch could be called somewhere there inbetween here and the actual delivery of the custom message
Self.Tell(new Terminated(actor, existenceConfirmed, addressTerminated), actor);
TerminatedQueuedFor(actor, message);
}
}
if (ChildrenContainer.Contains(actor))
Expand All @@ -125,18 +127,19 @@ protected void WatchedActorTerminated(IActorRef actor, bool existenceConfirmed,
/// <summary>
/// TBD
/// </summary>
/// <param name="subject">TBD</param>
public void TerminatedQueuedFor(IActorRef subject)
/// <param name="subject">Tracked subject</param>
/// <param name="customMessage">Terminated custom message</param>
public void TerminatedQueuedFor(IActorRef subject, Option<object> customMessage)
{
_state = _state.AddTerminated(subject);
_state = _state.AddTerminated(subject, customMessage);
}

private bool WatchingContains(IActorRef subject)
{
return _state.ContainsWatching(subject);
}

private bool TryGetWatching(IActorRef subject, out object message)
private bool TryGetWatching(IActorRef subject, out Option<object> message)
{
return _state.TryGetWatching(subject, out message);
}
Expand Down
3 changes: 2 additions & 1 deletion src/core/Akka/Actor/Stash/Internal/AbstractStash.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Linq;
using Akka.Dispatch;
using Akka.Dispatch.MessageQueues;
using Akka.Util;
using Akka.Util.Internal;

namespace Akka.Actor.Internal
Expand Down Expand Up @@ -167,7 +168,7 @@ private void EnqueueFirst(Envelope msg)
var terminatedMessage = msg.Message as Terminated;
if(terminatedMessage != null)
{
_actorCell.TerminatedQueuedFor(terminatedMessage.ActorRef);
_actorCell.TerminatedQueuedFor(terminatedMessage.ActorRef, Option<object>.None);
}
}
}
Expand Down
Loading

0 comments on commit 42cfae6

Please sign in to comment.