Skip to content

Commit

Permalink
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 52 deletions.
22 changes: 0 additions & 22 deletions src/core/Akka.MultiNodeTestRunner/Akka.MultiNodeTestRunner.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -94,28 +94,6 @@
<Error Condition="!Exists('$(SolutionDir)\.nuget\NuGet.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\.nuget\NuGet.targets'))" />
<Error Condition="!Exists('..\..\packages\xunit.core.2.0.0\build\portable-net45+win+wpa81+wp80+monotouch+monoandroid+Xamarin.iOS\xunit.core.props')" Text="$([System.String]::Format('$(ErrorText)', '..\..\packages\xunit.core.2.0.0\build\portable-net45+win+wpa81+wp80+monotouch+monoandroid+Xamarin.iOS\xunit.core.props'))" />
</Target>
<PropertyGroup>
<PreBuildEvent>echo PREBUILDSTEP for $(ProjectName)

echo Copying files from $(SolutionDir)core\Akka.Remote.Tests.MultiNode\$(OutDir) to $(ProjectDir)bin\$(Configuration)\Akka.Remote

if not exist "$(ProjectDir)bin\$(Configuration)\Akka.Remote" mkdir "$(ProjectDir)bin\$(Configuration)\Akka.Remote"

xcopy "$(SolutionDir)core\Akka.Remote.Tests.MultiNode\$(OutDir)*.dll" "$(ProjectDir)bin\$(Configuration)\Akka.Remote" /i /d /y
if errorlevel 1 goto BuildEventFailed

if errorlevel 1 goto BuildEventFailed

REM Exit properly because the build will not fail
REM unless the final step exits with an error code

goto BuildEventOK
:BuildEventFailed
echo PREBUILDSTEP for $(ProjectName) FAILED
exit 1
:BuildEventOK
echo PREBUILDSTEP for $(ProjectName) COMPLETED OK</PreBuildEvent>
</PropertyGroup>
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
<Target Name="BeforeBuild">
Expand Down
34 changes: 29 additions & 5 deletions src/core/Akka.Remote.Tests/RemotingSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private static string GetConfig()
applied-adapters = []
registry-key = aX33k0jWKg
local-address = ""test://RemotingSpec@localhost:12345""
maximum-payload-bytes = 32000 bytes
maximum-payload-bytes = 32000b
scheme-identifier = test
}
}
Expand Down Expand Up @@ -117,7 +117,7 @@ protected string GetOtherRemoteSysConfig()
applied-adapters = []
registry-key = aX33k0jWKg
local-address = ""test://remote-sys@localhost:12346""
maximum-payload-bytes = 48000 bytes
maximum-payload-bytes = 128000b
scheme-identifier = test
}
}
Expand Down Expand Up @@ -166,7 +166,6 @@ public async Task Remoting_must_support_Ask()
Assert.IsType<FutureActorRef>(msg.Item2);
}


[Fact]
public void Remoting_must_create_and_supervise_children_on_remote_Node()
{
Expand Down Expand Up @@ -230,6 +229,31 @@ public async Task Bug_884_Remoting_must_support_reply_to_child_of_Routee()
Assert.Equal(reporter, msg.Item2);
}

[Fact]
public void Drop_sent_messages_over_payload_size()
{
var oversized = ByteStringOfSize(MaxPayloadBytes + 1);
EventFilter.Exception<OversizedPayloadException>(start: "Discarding oversized payload sent to ").ExpectOne(() =>
{
VerifySend(oversized, () =>
{
ExpectNoMsg();
});
});
}

[Fact]
public void Drop_received_messages_over_payload_size()
{
EventFilter.Exception<OversizedPayloadException>(start: "Discarding oversized payload received").ExpectOne(() =>
{
VerifySend(MaxPayloadBytes + 1, () =>
{
ExpectNoMsg();
});
});
}

#endregion

#region Internal Methods
Expand All @@ -239,7 +263,7 @@ private int MaxPayloadBytes
get
{
var byteSize = Sys.Settings.Config.GetByteSize("akka.remote.test.maximum-payload-bytes");
if (byteSize != null)
if (byteSize != null)
return (int)byteSize.Value;
return 0;
}
Expand Down Expand Up @@ -290,7 +314,7 @@ private void VerifySend(object msg, Action afterSend)
{
bigBounceHere.Tell(msg, TestActor);
afterSend();
ExpectNoMsg(TimeSpan.FromMilliseconds(500));
ExpectNoMsg();
}
finally
{
Expand Down
67 changes: 45 additions & 22 deletions src/core/Akka.Remote/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1234,17 +1234,28 @@ private bool WriteSend(EndpointManager.Send send)

//todo: RemoteMetrics https://github.com/akka/akka/blob/dc0547dd73b54b5de9c3e0b45a21aa865c5db8e2/akka-remote/src/main/scala/akka/remote/Endpoint.scala#L742

//todo: max payload size validation

var ok = _handle.Write(pdu);

if (ok)
if (pdu.Length > Transport.MaximumPayloadBytes)
{
_ackDeadline = NewAckDeadline();
_lastAck = null;
var reason = new OversizedPayloadException(
string.Format("Discarding oversized payload sent to {0}: max allowed size {1} bytes, actual size of encoded {2} was {3} bytes.",
send.Recipient,
Transport.MaximumPayloadBytes,
send.Message.GetType(),
pdu.Length));
_log.Error(reason, "Transient association error (association remains live)");
return true;
}
else
{
var ok = _handle.Write(pdu);

if (ok)
{
_ackDeadline = NewAckDeadline();
_lastAck = null;
return true;
}
}
return false;
}
catch (SerializationException ex)
Expand Down Expand Up @@ -1519,6 +1530,7 @@ public EndpointReader(
_provider = RARP.For(Context.System).Provider;
}

private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly AkkaPduCodec _codec;
private readonly IActorRef _reliableDeliverySupervisor;
private readonly ConcurrentDictionary<EndpointManager.Link, EndpointManager.ResendState> _receiveBuffers;
Expand Down Expand Up @@ -1553,23 +1565,34 @@ protected override void OnReceive(object message)
}
else if (message is InboundPayload)
{
var payload = message as InboundPayload;
var ackAndMessage = TryDecodeMessageAndAck(payload.Payload);
if (ackAndMessage.AckOption != null && _reliableDeliverySupervisor != null)
_reliableDeliverySupervisor.Tell(ackAndMessage.AckOption);
if (ackAndMessage.MessageOption != null)
var payload = ((InboundPayload)message).Payload;
if (payload.Length > Transport.MaximumPayloadBytes)
{
if (ackAndMessage.MessageOption.ReliableDeliveryEnabled)
{
_ackedReceiveBuffer = _ackedReceiveBuffer.Receive(ackAndMessage.MessageOption);
DeliverAndAck();
}
else
var reason = new OversizedPayloadException(
string.Format("Discarding oversized payload received: max allowed size {0} bytes, actual size {1} bytes.",
Transport.MaximumPayloadBytes,
payload.Length));
_log.Error(reason, "Transient error while reading from association (association remains live)");
}
else
{
var ackAndMessage = TryDecodeMessageAndAck(payload);
if (ackAndMessage.AckOption != null && _reliableDeliverySupervisor != null)
_reliableDeliverySupervisor.Tell(ackAndMessage.AckOption);
if (ackAndMessage.MessageOption != null)
{
_msgDispatch.Dispatch(ackAndMessage.MessageOption.Recipient,
ackAndMessage.MessageOption.RecipientAddress,
ackAndMessage.MessageOption.SerializedMessage,
ackAndMessage.MessageOption.SenderOptional);
if (ackAndMessage.MessageOption.ReliableDeliveryEnabled)
{
_ackedReceiveBuffer = _ackedReceiveBuffer.Receive(ackAndMessage.MessageOption);
DeliverAndAck();
}
else
{
_msgDispatch.Dispatch(ackAndMessage.MessageOption.Recipient,
ackAndMessage.MessageOption.RecipientAddress,
ackAndMessage.MessageOption.SerializedMessage,
ackAndMessage.MessageOption.SenderOptional);
}
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/core/Akka.Remote/Transport/Helios/HeliosTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,14 @@ public override string SchemeIdentifier
}
}

public override long MaximumPayloadBytes
{
get
{
return Settings.MaxFrameSize;
}
}

protected ILoggingAdapter Log;

/// <summary>
Expand Down
10 changes: 7 additions & 3 deletions src/core/Akka.Remote/Transport/TestTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,11 @@ private TaskCompletionSource<IAssociationEventListener> _associationListenerProm

public TestTransport(ActorSystem system, Config conf)
: this(
Address.Parse(GetConfigString(conf, "local-address")), AssociationRegistry.Get(GetConfigString(conf,"registry-key")),
GetConfigString(conf,"scheme-identifier")) { }
Address.Parse(GetConfigString(conf, "local-address")),
AssociationRegistry.Get(GetConfigString(conf,"registry-key")),
conf.GetByteSize("maximum-payload-bytes") ?? 32000,
GetConfigString(conf,"scheme-identifier")
) { }

private static string GetConfigString(Config conf, string name)
{
Expand All @@ -51,10 +54,11 @@ private static string GetConfigString(Config conf, string name)
return value;
}

public TestTransport(Address localAddress, AssociationRegistry registry, string schemeIdentifier = "test")
public TestTransport(Address localAddress, AssociationRegistry registry, long maximumPayloadBytes = 32000, string schemeIdentifier = "test")
{
LocalAddress = localAddress;
_registry = registry;
MaximumPayloadBytes = maximumPayloadBytes;
SchemeIdentifier = schemeIdentifier;
ListenBehavior =
new SwitchableLoggedBehavior<bool, Tuple<Address, TaskCompletionSource<IAssociationEventListener>>>(
Expand Down
1 change: 1 addition & 0 deletions src/core/Akka.Remote/Transport/Transport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public abstract class Transport
public ActorSystem System { get; protected set; }

public virtual string SchemeIdentifier { get; protected set; }
public virtual long MaximumPayloadBytes { get; protected set; }
public abstract Task<Tuple<Address, TaskCompletionSource<IAssociationEventListener>>> Listen();

public abstract bool IsResponsibleFor(Address remote);
Expand Down
8 changes: 8 additions & 0 deletions src/core/Akka.Remote/Transport/TransportAdapters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,14 @@ public override string SchemeIdentifier
}
}

public override long MaximumPayloadBytes
{
get
{
return WrappedTransport.MaximumPayloadBytes;
}
}

protected abstract Task<IAssociationEventListener> InterceptListen(Address listenAddress,
Task<IAssociationEventListener> listenerTask);

Expand Down

0 comments on commit 05f57b9

Please sign in to comment.