Skip to content

Commit

Permalink
Merge pull request #705 from alex-bogomaz/Port-GenericTransportSpec-#624
Browse files Browse the repository at this point in the history
#624 port GenericTransportSpec
  • Loading branch information
Aaronontheweb committed Mar 3, 2015
2 parents f319c27 + 59a95ee commit b319b3c
Show file tree
Hide file tree
Showing 2 changed files with 218 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/core/Akka.Remote.Tests/Akka.Remote.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
<Compile Include="Serialization\DaemonMsgCreateSerializerSpec.cs" />
<Compile Include="Transport\AkkaProtocolSpec.cs" />
<Compile Include="Transport\AkkaProtocolStressTest.cs" />
<Compile Include="Transport\GenericTransportSpec.cs" />
<Compile Include="Transport\TestTransportSpec.cs" />
<Compile Include="Transport\ThrottleModeSpec.cs" />
<Compile Include="Transport\ThrottlerTransportAdapterSpec.cs" />
Expand Down
217 changes: 217 additions & 0 deletions src/core/Akka.Remote.Tests/Transport/GenericTransportSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
using Akka.Actor;
using Akka.Remote.Transport;
using Akka.TestKit;
using Google.ProtocolBuffers;
using System;
using System.Linq;
using System.Threading.Tasks;
using Xunit;

namespace Akka.Remote.Tests.Transport
{
public abstract class GenericTransportSpec : AkkaSpec
{
private Address addressATest = new Address("test", "testsytemA", "testhostA", 4321);
private Address addressBTest = new Address("test", "testsytemB", "testhostB", 5432);

private Address addressA;
private Address addressB;
private Address nonExistingAddress;
private bool withAkkaProtocol;

public GenericTransportSpec(bool withAkkaProtocol = false)
: base("akka.actor.provider = \"Akka.Remote.RemoteActorRefProvider, Akka.Remote\" ")
{
this.withAkkaProtocol = withAkkaProtocol;

addressA = addressATest.Copy(protocol: string.Format("{0}.{1}", SchemeIdentifier, addressATest.Protocol));
addressB = addressBTest.Copy(protocol: string.Format("{0}.{1}", SchemeIdentifier, addressBTest.Protocol));
nonExistingAddress = new Address(SchemeIdentifier + ".test", "nosystem", "nohost", 0);
}

private TimeSpan DefaultTimeout { get { return Dilated(TestKitSettings.DefaultTimeout); } }

protected abstract Akka.Remote.Transport.Transport FreshTransport(TestTransport testTransport);

protected abstract string SchemeIdentifier { get; }

private Akka.Remote.Transport.Transport WrapTransport(Akka.Remote.Transport.Transport transport)
{
if (withAkkaProtocol) {
var provider = (RemoteActorRefProvider)((ExtendedActorSystem)Sys).Provider;

return new AkkaProtocolTransport(transport, Sys, new AkkaProtocolSettings(provider.RemoteSettings.Config), new AkkaPduProtobuffCodec());
}

return transport;
}

private Akka.Remote.Transport.Transport NewTransportA(AssociationRegistry registry)
{
return WrapTransport(FreshTransport(new TestTransport(addressATest, registry)));
}

private Akka.Remote.Transport.Transport NewTransportB(AssociationRegistry registry)
{
return WrapTransport(FreshTransport(new TestTransport(addressBTest, registry)));
}

[Fact]
public void Transport_must_return_an_Address_and_promise_when_listen_is_called()
{
var registry = new AssociationRegistry();
var transportA = NewTransportA(registry);

var result = AwaitResult(transportA.Listen());

Assert.Equal(addressA, result.Item1);
Assert.NotNull(result.Item2);

Assert.True(
registry.LogSnapshot().OfType<ListenAttempt>().Any(x => x.BoundAddress == addressATest)
);
}

[Fact]
public void Transport_must_associate_successfully_with_another_transport_of_its_kind()
{
var registry = new AssociationRegistry();
var transportA = NewTransportA(registry);
var transportB = NewTransportB(registry);

// Must complete the returned promise to receive events
AwaitResult(transportA.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor));
AwaitResult(transportB.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor));

AwaitCondition(() => registry.TransportsReady(addressATest, addressBTest));

transportA.Associate(addressB);
ExpectMsgPf(DefaultTimeout, "Expect InboundAssociation from A", o =>
{
var inbound = o as InboundAssociation;

if (inbound != null && inbound.Association.RemoteAddress == addressA)
return inbound.Association;

return null;
});

Assert.True(
registry.LogSnapshot().OfType<AssociateAttempt>().Any(x => x.LocalAddress == addressATest && x.RemoteAddress == addressBTest)
);
AwaitCondition(() => registry.ExistsAssociation(addressATest, addressBTest));
}

[Fact]
public void Transport_must_fail_to_associate_with_nonexisting_address()
{
var registry = new AssociationRegistry();
var transportA = NewTransportA(registry);

AwaitResult(transportA.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor));
AwaitCondition(() => registry.TransportsReady(addressATest));

// Transport throws InvalidAssociationException when trying to associate with non-existing system
XAssert.Throws<InvalidAssociationException>(() =>
AwaitResult(transportA.Associate(nonExistingAddress))
);
}

[Fact]
public void Transport_must_successfully_send_PDUs()
{
var registry = new AssociationRegistry();
var transportA = NewTransportA(registry);
var transportB = NewTransportB(registry);

AwaitResult(transportA.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor));
AwaitResult(transportB.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor));

AwaitCondition(() => registry.TransportsReady(addressATest, addressBTest));

var associate = transportA.Associate(addressB);
var handleB = ExpectMsgPf(DefaultTimeout, "Expect InboundAssociation from A", o =>
{
var handle = o as InboundAssociation;
if (handle != null && handle.Association.RemoteAddress == addressA)
return handle.Association;

return null;
});

var handleA = AwaitResult(associate);

// Initialize handles
handleA.ReadHandlerSource.SetResult(new ActorHandleEventListener(TestActor));
handleB.ReadHandlerSource.SetResult(new ActorHandleEventListener(TestActor));

var payload = ByteString.CopyFromUtf8("PDU");
var pdu = withAkkaProtocol ? new AkkaPduProtobuffCodec().ConstructPayload(payload) : payload;

AwaitCondition(() => registry.ExistsAssociation(addressATest, addressBTest));

handleA.Write(payload);
ExpectMsgPf(DefaultTimeout, "Expect InboundPayload from A", o =>
{
var inboundPayload = o as InboundPayload;

if (inboundPayload != null && inboundPayload.Payload.Equals(pdu))
return inboundPayload.Payload;

return null;
});

Assert.True(
registry.LogSnapshot().OfType<WriteAttempt>().Any(x => x.Sender == addressATest && x.Recipient == addressBTest && x.Payload.Equals(pdu))
);
}

[Fact]
public void Transport_must_successfully_disassociate()
{
var registry = new AssociationRegistry();
var transportA = NewTransportA(registry);
var transportB = NewTransportB(registry);

AwaitResult(transportA.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor));
AwaitResult(transportB.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor));

AwaitCondition(() => registry.TransportsReady(addressATest, addressBTest));

var associate = transportA.Associate(addressB);
var handleB = ExpectMsgPf(DefaultTimeout, "Expect InboundAssociation from A", o =>
{
var handle = o as InboundAssociation;
if (handle != null && handle.Association.RemoteAddress == addressA)
return handle.Association;

return null;
});

var handleA = AwaitResult(associate);

// Initialize handles
handleA.ReadHandlerSource.SetResult(new ActorHandleEventListener(TestActor));
handleB.ReadHandlerSource.SetResult(new ActorHandleEventListener(TestActor));

AwaitCondition(() => registry.ExistsAssociation(addressATest, addressBTest));

handleA.Disassociate();

ExpectMsgPf(DefaultTimeout, "Should receive Disassociated", o => o as Disassociated);

AwaitCondition(() => !registry.ExistsAssociation(addressATest, addressBTest));

AwaitCondition(() =>
registry.LogSnapshot().OfType<DisassociateAttempt>().Any(x => x.Requestor == addressATest && x.Remote == addressBTest)
);
}

private T AwaitResult<T>(Task<T> task)
{
task.Wait(DefaultTimeout);

return task.Result;
}
}
}

2 comments on commit b319b3c

@Petabridge-CI
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TeamCity Akka.NET :: Akka.NET PR Build Build 92 is now running

@Petabridge-CI
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TeamCity Akka.NET :: Akka.NET PR Build Build 92 outcome was FAILURE
Summary: System.Exception: xUnit failed for the following assemblies: D:\BuildAgent\work\49b164d63843fb4\src\core\Akka.Persistence.Tests\bin\Release\Akka.Persistence.Tests.dll at Microsoft.FSharp.Core.Operators.FailWith[T](String message) at Fake.XUnitHel... Build time: 00:09:16

Failed tests

Akka.Persistence.Tests.dll: Akka.Persistence.Tests.PersistentActorSpec.PersistentActor_should_reply_to_the_original_sender_of_a_command_even_on_PersistAsync: <no details avaliable>

Please sign in to comment.