Skip to content

Commit

Permalink
Fixed major bug with EndpointWriter adaptive backoff
Browse files Browse the repository at this point in the history
Fixed issue with PlayerFSM that broke Throttle in MNTK; Added TestConductorSpec
cleaned up referential equality inside RemoteWatcher
  • Loading branch information
Aaronontheweb committed Mar 11, 2016
1 parent 228131a commit 1e97890
Show file tree
Hide file tree
Showing 20 changed files with 229 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1928,6 +1928,7 @@ namespace Akka.Configuration
{
public class Config
{
public static readonly Akka.Configuration.Config Empty;
public Config() { }
public Config(Akka.Configuration.Hocon.HoconRoot root) { }
public Config(Akka.Configuration.Config source, Akka.Configuration.Config fallback) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@
<Error Condition="!Exists('$(SolutionDir)\.nuget\NuGet.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\.nuget\NuGet.targets'))" />
<Error Condition="!Exists('..\..\packages\xunit.core.2.0.0\build\portable-net45+win+wpa81+wp80+monotouch+monoandroid+Xamarin.iOS\xunit.core.props')" Text="$([System.String]::Format('$(ErrorText)', '..\..\packages\xunit.core.2.0.0\build\portable-net45+win+wpa81+wp80+monotouch+monoandroid+Xamarin.iOS\xunit.core.props'))" />
</Target>
<PropertyGroup>
<PreBuildEvent>
</PreBuildEvent>
</PropertyGroup>
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
<Target Name="BeforeBuild">
Expand Down
5 changes: 5 additions & 0 deletions src/core/Akka.MultiNodeTestRunner/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ static void Main(string[] args)
//Block until all Sinks have been terminated.
TestRunSystem.WhenTerminated.Wait(TimeSpan.FromMinutes(1));

if (Debugger.IsAttached)
{
Console.ReadLine(); //block when debugging
}

//Return the proper exit code
Environment.Exit(ExitCodeContainer.ExitCode);
}
Expand Down
35 changes: 22 additions & 13 deletions src/core/Akka.Remote.TestKit/Conductor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
Expand Down Expand Up @@ -65,17 +66,23 @@ public IActorRef Controller
public async Task<INode> StartController(int participants, RoleName name, INode controllerPort)
{
if(_controller != null) throw new IllegalStateException("TestConductorServer was already started");
_controller = _system.ActorOf(new Props(typeof (Controller), new object[] {participants, controllerPort}),
"controller");
_controller = _system.ActorOf(Props.Create(() => new Controller(participants, controllerPort)),
"controller");
//TODO: Need to review this async stuff
var node = await _controller.Ask<INode>(TestKit.Controller.GetSockAddr.Instance).ConfigureAwait(false);
var node = await _controller.Ask<INode>(TestKit.Controller.GetSockAddr.Instance, Settings.QueryTimeout).ConfigureAwait(false);
await StartClient(name, node).ConfigureAwait(false);
return node;
}

/// <summary>
/// Obtain the port to which the controller’s socket is actually bound. This
/// will deviate from the configuration in `akka.testconductor.port` in case
/// that was given as zero.
/// </summary>
/// <returns>The address of the controller's socket endpoint</returns>
public Task<INode> SockAddr()
{
return _controller.Ask<INode>(TestKit.Controller.GetSockAddr.Instance);
return Controller.Ask<INode>(TestKit.Controller.GetSockAddr.Instance, Settings.QueryTimeout);
}

/// <summary>
Expand Down Expand Up @@ -104,7 +111,7 @@ public Task<Done> Throttle(RoleName node, RoleName target, ThrottleTransportAdap
float rateMBit)
{
RequireTestConductorTransport();
return Controller.Ask<Done>(new Throttle(node, target, direction, rateMBit));
return Controller.Ask<Done>(new Throttle(node, target, direction, rateMBit), Settings.QueryTimeout);
}

/// <summary>
Expand All @@ -128,9 +135,10 @@ public Task<Done> Blackhole(RoleName node, RoleName target, ThrottleTransportAda

private void RequireTestConductorTransport()
{
// Verifies that the Throttle and FailureInjector TransportAdapters are active
if(!Transport.DefaultAddress.Protocol.Contains(".trttl.gremlin."))
throw new ConfigurationException("To use this feature you must activate the failure injector adapters " +
"(trttl, gremlin) by specifying `testTransport(on = true)` in your MultiNodeConfig.");
"(trttl, gremlin) by specifying `TestTransport(on = true)` in your MultiNodeConfig.");
}

/// <summary>
Expand Down Expand Up @@ -160,7 +168,7 @@ public Task<Done> PassThrough(RoleName node, RoleName target, ThrottleTransportA
/// <returns></returns>
public Task<Done> Disconnect(RoleName node, RoleName target)
{
return Controller.Ask<Done>(new Disconnect(node, target, false));
return Controller.Ask<Done>(new Disconnect(node, target, false), Settings.QueryTimeout);
}

/// <summary>
Expand All @@ -173,7 +181,7 @@ public Task<Done> Disconnect(RoleName node, RoleName target)
/// <returns></returns>
public Task<Done> Abort(RoleName node, RoleName target)
{
return Controller.Ask<Done>(new Disconnect(node, target, true));
return Controller.Ask<Done>(new Disconnect(node, target, true), Settings.QueryTimeout);
}

/// <summary>
Expand All @@ -187,7 +195,7 @@ public Task<Done> Exit(RoleName node, int exitValue)
{
// the recover is needed to handle ClientDisconnectedException exception,
// which is normal during shutdown
return Controller.Ask(new Terminate(node, new Right<bool, int>(exitValue))).ContinueWith(t =>
return Controller.Ask(new Terminate(node, new Right<bool, int>(exitValue)), Settings.QueryTimeout).ContinueWith(t =>
{
if(t.Result is Done) return Done.Instance;
var failure = t.Result as FSMBase.Failure;
Expand All @@ -209,7 +217,7 @@ public Task<Done> Shutdown(RoleName node, bool abort = false)
{
// the recover is needed to handle ClientDisconnectedException exception,
// which is normal during shutdown
return Controller.Ask(new Terminate(node, new Left<bool, int>(abort))).ContinueWith(t =>
return Controller.Ask(new Terminate(node, new Left<bool, int>(abort)), Settings.QueryTimeout).ContinueWith(t =>
{
if (t.Result is Done) return Done.Instance;
var failure = t.Result as FSMBase.Failure;
Expand All @@ -224,7 +232,7 @@ public Task<Done> Shutdown(RoleName node, bool abort = false)
/// </summary>
public Task<IEnumerable<RoleName>> GetNodes()
{
return Controller.Ask<IEnumerable<RoleName>>(TestKit.Controller.GetNodes.Instance);
return Controller.Ask<IEnumerable<RoleName>>(TestKit.Controller.GetNodes.Instance, Settings.QueryTimeout);
}

/// <summary>
Expand All @@ -237,7 +245,7 @@ public Task<IEnumerable<RoleName>> GetNodes()
/// <returns></returns>
public Task<Done> RemoveNode(RoleName node)
{
return Controller.Ask<Done>(new Remove(node));
return Controller.Ask<Done>(new Remove(node), Settings.QueryTimeout);
}
}

Expand All @@ -262,7 +270,8 @@ public ConductorHandler(IActorRef controller, ILoggingAdapter log)
public async void OnConnect(INode remoteAddress, IConnection responseChannel)
{
_log.Debug("connection from {0}", responseChannel.RemoteHost);
//TODO: Seems wrong to create new RemoteConnection here

// Duration of this Ask operation needs to be infinite
var fsm = await _controller.Ask<IActorRef>(new Controller.CreateServerFSM(new RemoteConnection(responseChannel, this)), TimeSpan.FromMilliseconds(Int32.MaxValue));
_clients.AddOrUpdate(responseChannel, fsm, (connection, @ref) => fsm);
}
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Remote.TestKit/Internals/Reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ akka {
pool-size-factor = 1.0

# Max number of threads to cap factor-based number to
pool-size-max = 2
pool-size-max = 1
}

# (I&O) Used to configure the number of I/O worker threads on client sockets
Expand All @@ -58,7 +58,7 @@ akka {
pool-size-factor = 1.0

# Max number of threads to cap factor-based number to
pool-size-max = 2
pool-size-max = 1
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/core/Akka.Remote.TestKit/MsgEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,15 @@ public void Encode(IConnection connection, object message, out List<IByteBuf> en
.SetOp(BarrierOp.Fail)))
.With<ThrottleMsg>(
throttle =>
{
w.SetFailure(
InjectFailure.CreateBuilder()
.SetFailure(TCP.FailType.Throttle)
.SetAddress(Address2Proto(throttle.Target))
.SetFailure(TCP.FailType.Throttle)
.SetDirection(Direction2Proto(throttle.Direction))
.SetRateMBit(throttle.RateMBit)))
.SetRateMBit(throttle.RateMBit));
})
.With<DisconnectMsg>(
disconnect =>
w.SetFailure(
Expand Down
6 changes: 5 additions & 1 deletion src/core/Akka.Remote.TestKit/MultiNodeSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ namespace Akka.Remote.TestKit
/// </summary>
public abstract class MultiNodeConfig
{
Config _commonConf = null;
// allows us to avoid NullReferenceExceptions if we make this empty rather than null
// so that way if a MultiNodeConfig doesn't explicitly set CommonConfig to some value
// it will remain safe by defaut
Config _commonConf = Akka.Configuration.Config.Empty;

ImmutableDictionary<RoleName, Config> _nodeConf = ImmutableDictionary.Create<RoleName, Config>();
ImmutableList<RoleName> _roles = ImmutableList.Create<RoleName>();
ImmutableDictionary<RoleName, ImmutableList<string>> _deployments = ImmutableDictionary.Create<RoleName, ImmutableList<string>>();
Expand Down
21 changes: 12 additions & 9 deletions src/core/Akka.Remote.TestKit/Player.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,9 @@ public IActorRef Client
public Task<Done> StartClient(RoleName name, INode controllerAddr)
{
if(_client != null) throw new IllegalStateException("TestConductorClient already started");
_client =
_system.ActorOf(new Props(typeof (ClientFSM),
new object[] {name, controllerAddr}), "TestConductorClient");

_client =
_system.ActorOf(Props.Create(() => new ClientFSM(name, controllerAddr)), "TestConductorClient");

//TODO: IRequiresMessageQueue
var a = _system.ActorOf(Props.Create<WaitForClientFSMToConnect>());

Expand Down Expand Up @@ -132,22 +131,26 @@ public void Enter(TimeSpan timeout, ImmutableList<string> names)
try
{
var askTimeout = barrierTimeout + Settings.QueryTimeout;
//TODO: Wait?
// Need to force barrier to wait here, so we can pass along a "fail barrier" message in the event
// of a failed operation
_client.Ask(new ToServer<EnterBarrier>(new EnterBarrier(name, barrierTimeout)), askTimeout).Wait();
}
catch (OperationCanceledException)
catch (AggregateException)
{
_client.Tell(new ToServer<FailBarrier>(new FailBarrier(name)));
throw new TimeoutException("Client timed out while waiting for barrier " + name);
}
catch (OperationCanceledException)
{
_system.Log.Debug("OperationCanceledException was thrown instead of AggregateException");
}
_system.Log.Debug("passed barrier {0}", name);
}
}

public Task<Address> GetAddressFor(RoleName name)
{
//TODO: QueryTimeout implicit?
return _client.Ask<Address>(new ToServer<GetAddress>(new GetAddress(name)));
return _client.Ask<Address>(new ToServer<GetAddress>(new GetAddress(name)), Settings.QueryTimeout);
}
}

Expand Down Expand Up @@ -363,7 +366,7 @@ public void InitFSM()
_log.Info("disconnected from TestConductor");
throw new ConnectionFailure("disconnect");
}
if(@event.FsmEvent is ToServer<Done> && @event.StateData.Channel != null && @event.StateData.RunningOp == null)
if(@event.FsmEvent is ToServer<Done> && @event.StateData.Channel != null)
{
@event.StateData.Channel.Write(Done.Instance);
return Stay();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
<Compile Include="RemoteNodeShutdownAndComesBackSpec.cs" />
<Compile Include="RemoteRoundRobinSpec.cs" />
<Compile Include="RemoteRandomSpec.cs" />
<Compile Include="TestConductor\TestConductorSpec.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\contrib\testkits\Akka.TestKit.Xunit2\Akka.TestKit.Xunit2.csproj">
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Remote.Tests.MultiNode/RemoteRandomSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ protected override void OnReceive(object message)
}
}

[MultiNodeFact]
[MultiNodeFact()]
public void RemoteRandomSpecs()
{
A_remote_random_pool_must_be_locally_instantiated_on_a_remote_node_and_be_able_to_communicate_through_its_remote_actor_ref();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public override int Resize(IEnumerable<Routee> currentRoutees)
}
}

[MultiNodeFact]
[MultiNodeFact()]
public void RemoteRoundRobinSpecs()
{
A_remote_round_robin_must_be_locally_instantiated_on_a_remote_node_and_be_able_to_communicate_through_its_remote_actor_ref();
Expand Down
Loading

0 comments on commit 1e97890

Please sign in to comment.