Skip to content

Commit

Permalink
implemented remotingterminator
Browse files Browse the repository at this point in the history
upgraded to Helios 1.4.1 to avoid crash on TCP shutdown
  • Loading branch information
Aaronontheweb committed Jul 18, 2015
1 parent 27cd38d commit b2bbe59
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 45 deletions.
31 changes: 13 additions & 18 deletions src/Akka.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 14
VisualStudioVersion = 14.0.22823.1
# Visual Studio 2013
VisualStudioVersion = 12.0.30723.0
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Examples", "Examples", "{69279534-1DBA-4115-BF8B-03F77FC8125E}"
EndProject
Expand Down Expand Up @@ -192,8 +192,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Persistence", "Persistence"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.TestKit.Xunit2", "contrib\testkits\Akka.TestKit.Xunit2\Akka.TestKit.Xunit2.csproj", "{7DBD5C17-5E9D-40C4-9201-D092751532A7}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.MultiNodeTests", "core\Akka.MultiNodeTests\Akka.MultiNodeTests.csproj", "{F0781BEA-5BA0-4AF0-BB15-E3F209B681F5}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.Sql.Common", "contrib\persistence\Akka.Persistence.Sql.Common\Akka.Persistence.Sql.Common.csproj", "{3B9E6211-9488-4DB5-B714-24248693B38F}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.DI.StructureMap", "contrib\dependencyInjection\Akka.DI.StructureMap\Akka.DI.StructureMap.csproj", "{34E5B4E5-0ED0-4A27-B53A-BFD812D45E1E}"
Expand All @@ -208,6 +206,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Transports", "Transports",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Remote.AkkaIOTransport", "contrib\transports\Akka.Remote.AkkaIOTransport\Akka.Remote.AkkaIOTransport.csproj", "{45AAAF15-C5D7-4107-94F0-766A32ECB9AE}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.MultiNodeTests", "core\Akka.MultiNodeTests\Akka.MultiNodeTests.csproj", "{F0781BEA-5BA0-4AF0-BB15-E3F209B681F5}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug Mono|Any CPU = Debug Mono|Any CPU
Expand Down Expand Up @@ -709,14 +709,6 @@ Global
{7DBD5C17-5E9D-40C4-9201-D092751532A7}.Release Mono|Any CPU.Build.0 = Release|Any CPU
{7DBD5C17-5E9D-40C4-9201-D092751532A7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7DBD5C17-5E9D-40C4-9201-D092751532A7}.Release|Any CPU.Build.0 = Release|Any CPU
{F0781BEA-5BA0-4AF0-BB15-E3F209B681F5}.Debug Mono|Any CPU.ActiveCfg = Debug|Any CPU
{F0781BEA-5BA0-4AF0-BB15-E3F209B681F5}.Debug Mono|Any CPU.Build.0 = Debug|Any CPU
{F0781BEA-5BA0-4AF0-BB15-E3F209B681F5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F0781BEA-5BA0-4AF0-BB15-E3F209B681F5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F0781BEA-5BA0-4AF0-BB15-E3F209B681F5}.Release Mono|Any CPU.ActiveCfg = Release|Any CPU
{F0781BEA-5BA0-4AF0-BB15-E3F209B681F5}.Release Mono|Any CPU.Build.0 = Release|Any CPU
{F0781BEA-5BA0-4AF0-BB15-E3F209B681F5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F0781BEA-5BA0-4AF0-BB15-E3F209B681F5}.Release|Any CPU.Build.0 = Release|Any CPU
{3B9E6211-9488-4DB5-B714-24248693B38F}.Debug Mono|Any CPU.ActiveCfg = Debug|Any CPU
{3B9E6211-9488-4DB5-B714-24248693B38F}.Debug Mono|Any CPU.Build.0 = Debug|Any CPU
{3B9E6211-9488-4DB5-B714-24248693B38F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
Expand Down Expand Up @@ -757,6 +749,14 @@ Global
{45AAAF15-C5D7-4107-94F0-766A32ECB9AE}.Release Mono|Any CPU.Build.0 = Release|Any CPU
{45AAAF15-C5D7-4107-94F0-766A32ECB9AE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{45AAAF15-C5D7-4107-94F0-766A32ECB9AE}.Release|Any CPU.Build.0 = Release|Any CPU
{F0781BEA-5BA0-4AF0-BB15-E3F209B681F5}.Debug Mono|Any CPU.ActiveCfg = Debug|Any CPU
{F0781BEA-5BA0-4AF0-BB15-E3F209B681F5}.Debug Mono|Any CPU.Build.0 = Debug|Any CPU
{F0781BEA-5BA0-4AF0-BB15-E3F209B681F5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F0781BEA-5BA0-4AF0-BB15-E3F209B681F5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F0781BEA-5BA0-4AF0-BB15-E3F209B681F5}.Release Mono|Any CPU.ActiveCfg = Release|Any CPU
{F0781BEA-5BA0-4AF0-BB15-E3F209B681F5}.Release Mono|Any CPU.Build.0 = Release|Any CPU
{F0781BEA-5BA0-4AF0-BB15-E3F209B681F5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F0781BEA-5BA0-4AF0-BB15-E3F209B681F5}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -842,18 +842,13 @@ Global
{D63223FA-03F5-4B32-A6EC-668F718C0826} = {7625FD95-4B2C-4A5B-BDD5-94B1493FAC8E}
{264C22A4-CAFC-41F6-B82C-4DDC5C196767} = {588C1513-FAB6-42C3-B6FC-3485F13620CF}
{7DBD5C17-5E9D-40C4-9201-D092751532A7} = {7625FD95-4B2C-4A5B-BDD5-94B1493FAC8E}
{F0781BEA-5BA0-4AF0-BB15-E3F209B681F5} = {01167D3C-49C4-4CDE-9787-C176D139ACDD}
{3B9E6211-9488-4DB5-B714-24248693B38F} = {264C22A4-CAFC-41F6-B82C-4DDC5C196767}
{34E5B4E5-0ED0-4A27-B53A-BFD812D45E1E} = {B1D10183-8FAE-4506-B935-403FCED89BDB}
{13BA1CC1-A431-441D-8B11-3969D2C68A6E} = {D1CCD86E-0EF8-473A-979B-25E1235FEA2D}
{7DBD5C17-5E9D-40C4-9201-D092751532A7} = {7625FD95-4B2C-4A5B-BDD5-94B1493FAC8E}
{F0781BEA-5BA0-4AF0-BB15-E3F209B681F5} = {01167D3C-49C4-4CDE-9787-C176D139ACDD}
{3B9E6211-9488-4DB5-B714-24248693B38F} = {264C22A4-CAFC-41F6-B82C-4DDC5C196767}
{34E5B4E5-0ED0-4A27-B53A-BFD812D45E1E} = {B1D10183-8FAE-4506-B935-403FCED89BDB}
{13BA1CC1-A431-441D-8B11-3969D2C68A6E} = {D1CCD86E-0EF8-473A-979B-25E1235FEA2D}
{05FD0A87-1D0C-49CF-91DE-A9605F1C8E95} = {69279534-1DBA-4115-BF8B-03F77FC8125E}
{825196A4-4B08-401F-8994-E2DB7C77A8B7} = {05FD0A87-1D0C-49CF-91DE-A9605F1C8E95}
{04FDBFE1-61B2-42F9-8E71-FAE53B41BE9E} = {588C1513-FAB6-42C3-B6FC-3485F13620CF}
{45AAAF15-C5D7-4107-94F0-766A32ECB9AE} = {04FDBFE1-61B2-42F9-8E71-FAE53B41BE9E}
{F0781BEA-5BA0-4AF0-BB15-E3F209B681F5} = {01167D3C-49C4-4CDE-9787-C176D139ACDD}
EndGlobalSection
EndGlobal
8 changes: 5 additions & 3 deletions src/core/Akka.Remote/Akka.Remote.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,14 @@
<Reference Include="Google.ProtocolBuffers.Serialization">
<HintPath>..\..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.Serialization.dll</HintPath>
</Reference>
<Reference Include="Helios, Version=1.4.1.0, Culture=neutral, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\..\packages\Helios.1.4.1\lib\net45\Helios.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.Web" />
<Reference Include="Helios">
<HintPath>..\..\packages\Helios.1.4.0\lib\net45\Helios.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="..\..\SharedAssemblyInfo.cs">
Expand Down
35 changes: 27 additions & 8 deletions src/core/Akka.Remote/EndpointManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,11 @@ public EndpointManager(Config config, ILoggingAdapter log)
private ILoggingAdapter log;
private EventPublisher eventPublisher;

/// <summary>
/// Used to indicate when an abrupt shutdown occurs
/// </summary>
private bool _normalShutdown = false;

/// <summary>
/// Mapping between transports and the local addresses they listen to
/// </summary>
Expand Down Expand Up @@ -393,6 +398,19 @@ protected override void PostStop()
{
if(PruneTimerCancelleable != null)
_pruneTimeCancelable.Cancel();
foreach(var h in pendingReadHandoffs.Values)
h.Disassociate(DisassociateInfo.Shutdown);

if (!_normalShutdown)
{
// Remaining running endpoints are children, so they will clean up themselves.
// We still need to clean up any remaining transports because handles might be in mailboxes, and for example
// Netty is not part of the actor hierarchy, so its handles will not be cleaned up if no actor is taking
// responsibility of them (because they are sitting in a mailbox).
log.Error("Remoting system has been terminated abrubtly. Attempting to shut down transports");
foreach (var t in _transportMapping.Values)
t.Shutdown();
}
}

protected override void OnReceive(object message)
Expand Down Expand Up @@ -558,30 +576,30 @@ protected void Accepting(object message)

// The construction of the Task for shutdownStatus has to happen after the flushStatus future has been finished
// so that endpoints are shut down before transports.
var shutdownStatus = Task.WhenAll(endpoints.AllEndpoints.Select(
x => x.GracefulStop(settings.FlushWait, EndpointWriter.FlushAndStop.Instance))).ContinueWith(
var flushStatus = Task.WhenAll(_transportMapping.Values.Select(x => x.Shutdown())).ContinueWith(
result =>
{
if (result.IsFaulted)
if (result.IsFaulted || result.IsCanceled)
{
if(result.Exception != null)
if (result.Exception != null)
result.Exception.Handle(e => true);
return false;
}
return result.Result.All(x => x);
}, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.AttachedToParent);
}, TaskContinuationOptions.ExecuteSynchronously);

var flushStatus = Task.WhenAll(_transportMapping.Values.Select(x => x.Shutdown())).ContinueWith(
var shutdownStatus = Task.WhenAll(endpoints.AllEndpoints.Select(
x => x.GracefulStop(settings.FlushWait, EndpointWriter.FlushAndStop.Instance))).ContinueWith(
result =>
{
if (result.IsFaulted)
if (result.IsFaulted || result.IsCanceled)
{
if (result.Exception != null)
result.Exception.Handle(e => true);
return false;
}
return result.Result.All(x => x);
}, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.AttachedToParent);
}, TaskContinuationOptions.ExecuteSynchronously);

Task.WhenAll(shutdownStatus, flushStatus)
.ContinueWith(x => x.Result.All(y => y),
Expand All @@ -594,6 +612,7 @@ protected void Accepting(object message)
}

//Ignore all other writes
_normalShutdown = true;
Context.Become(Flushing);
});
}
Expand Down
86 changes: 78 additions & 8 deletions src/core/Akka.Remote/RemoteActorRefProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public RemoteActorRefProvider(string systemName, Settings settings, EventStream
}

private readonly LocalActorRefProvider _local;
private Internals _internals;
private volatile Internals _internals;
private ActorSystemImpl _system;

private Internals RemoteInternals
Expand All @@ -50,7 +50,7 @@ private Internals RemoteInternals
return _internals ??
(_internals =
new Internals(new Remoting(_system, this), _system.Serialization,
new RemoteSystemDaemon(_system, RootPath / "remote", SystemGuardian,this.DeadLetters /* TODO: should be RemoteTerminator*/, _log)));
new RemoteSystemDaemon(_system, RootPath / "remote", SystemGuardian, _remotingTerminator, _log)));
}
}

Expand Down Expand Up @@ -93,16 +93,21 @@ public void UnregisterTempActor(ActorPath path)
_local.UnregisterTempActor(path);
}

//TODO: Why volatile?
private IActorRef _remoteWatcher;
private volatile IActorRef _remotingTerminator;
private volatile IActorRef _remoteWatcher;

public virtual void Init(ActorSystemImpl system)
{
_system = system;

_local.Init(system);

//TODO: RemotingTerminator
_remotingTerminator =
_system.SystemActorOf(
RemoteSettings.ConfigureDispatcher(Props.Create(() => new RemotingTerminator(_local.SystemGuardian))),
"remoting-terminator");

_remotingTerminator.Tell(RemoteInternals);

Transport.Start();
_remoteWatcher = CreateRemoteWatcher(system);
Expand Down Expand Up @@ -407,6 +412,9 @@ public Internals(RemoteTransport transport, Akka.Serialization.Serialization ser

#region RemotingTerminator

/// <summary>
/// Describes the FSM states of the <see cref="RemotingTerminator"/>
/// </summary>
enum TerminatorState
{
Uninitialized,
Expand All @@ -416,32 +424,94 @@ enum TerminatorState
Finished
}

/// <summary>
/// Responsible for shutting down the <see cref="RemoteDaemon"/> and all transports
/// when the <see cref="ActorSystem"/> is being shutdown.
/// </summary>
private class RemotingTerminator : FSM<TerminatorState, Internals>
{
private readonly IActorRef _systemGuardian;
private readonly ILoggingAdapter _log;

public RemotingTerminator(IActorRef systemGuardian)
{
_systemGuardian = systemGuardian;
_log = Context.GetLogger();
InitFSM();
}

private void InitFSM()
{

When(TerminatorState.Uninitialized, @event =>
{
var internals = @event.StateData;
var internals = @event.FsmEvent as Internals;
if (internals != null)
{
//TODO: add a termination hook to the system guardian
_systemGuardian.Tell(RegisterTerminationHook.Instance);
return GoTo(TerminatorState.Idle).Using(internals);
}
return null;
});

When(TerminatorState.Idle, @event =>
{
if (@event.StateData != null && @event.FsmEvent is TerminationHook)
{
_log.Info("Shutting down remote daemon.");
@event.StateData.RemoteDaemon.Tell(TerminationHook.Instance);
return GoTo(TerminatorState.WaitDaemonShutdown);
}
return null;
});

// TODO: state timeout
When(TerminatorState.WaitDaemonShutdown, @event =>
{
if (@event.StateData != null && @event.FsmEvent is TerminationHookDone)
{
_log.Info("Remote daemon shut down; proceeding with flushing remote transports.");
@event.StateData.Transport.Shutdown()
.ContinueWith(t => TransportShutdown.Instance,
TaskContinuationOptions.ExecuteSynchronously & TaskContinuationOptions.AttachedToParent)
.PipeTo(Self);
return GoTo(TerminatorState.WaitTransportShutdown);
}

return null;
});

When(TerminatorState.WaitTransportShutdown, @event =>
{
if (@event.FsmEvent is TransportShutdown)
{
_log.Info("Remoting shut down.");
_systemGuardian.Tell(TerminationHookDone.Instance);
Stop();
return GoTo(TerminatorState.Finished);
}
return null;
});

StartWith(TerminatorState.Uninitialized, null);
}

public sealed class TransportShutdown
{
private TransportShutdown() { }
private static readonly TransportShutdown _instance = new TransportShutdown();
public static TransportShutdown Instance
{
get
{
return _instance;
}
}

public override string ToString()
{
return "<TransportShutdown>";
}
}
}

#endregion
Expand Down
18 changes: 15 additions & 3 deletions src/core/Akka.Remote/RemoteSystemDaemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ internal class RemoteSystemDaemon : VirtualPathContainer
{
private readonly ActorSystemImpl _system;
private readonly Switch _terminating = new Switch(false);
//private val parent2children = new ConcurrentHashMap[ActorRef, Set[ActorRef]]
private readonly ConcurrentDictionary<IActorRef, IImmutableSet<IActorRef>> _parent2Children = new ConcurrentDictionary<IActorRef, IImmutableSet<IActorRef>>();
private readonly IActorRef _terminator;

Expand All @@ -104,7 +103,7 @@ public RemoteSystemDaemon(ActorSystemImpl system, ActorPath path, IInternalActor
/// Called when [receive].
/// </summary>
/// <param name="message">The message.</param>
protected void OnReceive(object message)
protected void OnReceive(object message, IActorRef sender)
{
//note: RemoteDaemon does not handle ActorSelection messages - those are handled directly by the RemoteActorRefProvider.
if (message is IDaemonMsg)
Expand All @@ -124,6 +123,19 @@ protected void OnReceive(object message)
if(@ref.Parent.Path.Address == addressTerminated.Address) _system.Stop(@ref);
});
}
else if (message is Identify)
{
var identify = message as Identify;
sender.Tell(new ActorIdentity(identify.MessageId, this));
}
else if (message is TerminationHook)
{
_terminating.SwitchOn(() =>
{
TerminationHookDoneWhenNoChildren();
ForEachChild(c => _system.Stop(c));
});
}
else if (message is DeathWatchNotification)
{
var deathWatchNotification = message as DeathWatchNotification;
Expand Down Expand Up @@ -210,7 +222,7 @@ private void TerminationHookDoneWhenNoChildren()
/// <param name="sender">The sender.</param>
protected override void TellInternal(object message, IActorRef sender)
{
OnReceive(message);
OnReceive(message, sender);
}

/// <summary>
Expand Down
7 changes: 4 additions & 3 deletions src/core/Akka.Remote/Remoting.cs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public override Task Shutdown()
}
finalize();
}
}, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.AttachedToParent);
}, TaskContinuationOptions.ExecuteSynchronously);
}
}

Expand Down Expand Up @@ -255,9 +255,10 @@ public override Task<bool> ManagementCommand(object cmd)
Provider.RemoteSettings.CommandAckTimeout)
.ContinueWith(result =>
{
if (result.IsCanceled || result.IsFaulted)
return false;
return result.Result.Status;
},
TaskContinuationOptions.ExecuteSynchronously & TaskContinuationOptions.AttachedToParent);
}, TaskContinuationOptions.ExecuteSynchronously);
}

public override Address LocalAddressForRemote(Address remote)
Expand Down
Loading

0 comments on commit b2bbe59

Please sign in to comment.