Skip to content

Commit

Permalink
Merge pull request #4310 from Aaronontheweb/v1.3.18-backport
Browse files Browse the repository at this point in the history
V1.3.18 backport
  • Loading branch information
Aaronontheweb authored Mar 9, 2020
2 parents da53e08 + c8cc24f commit be94d55
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 29 deletions.
5 changes: 0 additions & 5 deletions src/core/Akka.Remote/Configuration/Remote.conf
Original file line number Diff line number Diff line change
Expand Up @@ -432,11 +432,6 @@ akka {
# i.e. how long a connect may take until it is timed out
connection-timeout = 15 s

# Toggles buffer pooling on and off inside DotNetty.
# Only intended to be a work-around for users who are still running on DotNetty v0.4.6-v0.4.7
# for the following bug: https://github.com/akkadotnet/akka.net/issues/3370
enable-pooling = true

# If set to "<id.of.dispatcher>" then the specified dispatcher
# will be used to accept inbound connections, and perform IO. If "" then
# dedicated threads will be used.
Expand Down
42 changes: 22 additions & 20 deletions src/core/Akka.Remote/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1981,31 +1981,33 @@ private void NotReading()

private void SaveState()
{
var key = new EndpointManager.Link(LocalAddress, RemoteAddress);
_receiveBuffers.TryGetValue(key, out var previousValue);
UpdateSavedState(key, previousValue);
}

private EndpointManager.ResendState Merge(EndpointManager.ResendState current,
EndpointManager.ResendState oldState)
{
if (current.Uid == oldState.Uid) return new EndpointManager.ResendState(_uid, oldState.Buffer.MergeFrom(current.Buffer));
return current;
}
EndpointManager.ResendState Merge(EndpointManager.ResendState current,
EndpointManager.ResendState oldState)
{
if (current.Uid == oldState.Uid) return new EndpointManager.ResendState(_uid, oldState.Buffer.MergeFrom(current.Buffer));
return current;
}

private void UpdateSavedState(EndpointManager.Link key, EndpointManager.ResendState expectedState)
{
if (expectedState == null)
void UpdateSavedState(EndpointManager.Link key, EndpointManager.ResendState expectedState)
{
if (!_receiveBuffers.TryAdd(key, new EndpointManager.ResendState(_uid, _ackedReceiveBuffer)))
if (expectedState == null)
{
UpdateSavedState(key, _receiveBuffers[key]);
if (!_receiveBuffers.TryAdd(key, new EndpointManager.ResendState(_uid, _ackedReceiveBuffer)))
{
_receiveBuffers.TryGetValue(key, out var prevValue);
UpdateSavedState(key, prevValue);
}
}
else if (!_receiveBuffers.TryUpdate(key,
Merge(new EndpointManager.ResendState(_uid, _ackedReceiveBuffer), expectedState), expectedState))
{
_receiveBuffers.TryGetValue(key, out var prevValue);
UpdateSavedState(key, prevValue);
}
} else if (!_receiveBuffers.TryUpdate(key,
Merge(new EndpointManager.ResendState(_uid, _ackedReceiveBuffer), expectedState), expectedState))
{
UpdateSavedState(key, _receiveBuffers[key]);
}

var k = new EndpointManager.Link(LocalAddress, RemoteAddress);
UpdateSavedState(k, !_receiveBuffers.TryGetValue(k, out var previousValue) ? null : previousValue);
}

private void HandleDisassociated(DisassociateInfo info)
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ protected Bootstrap ClientFactory(Address remoteAddress)
.Option(ChannelOption.TcpNodelay, Settings.TcpNoDelay)
.Option(ChannelOption.ConnectTimeout, Settings.ConnectTimeout)
.Option(ChannelOption.AutoRead, false)
.Option(ChannelOption.Allocator, Settings.EnableBufferPooling ? (IByteBufferAllocator)PooledByteBufferAllocator.Default : UnpooledByteBufferAllocator.Default)
.Option(ChannelOption.Allocator, UnpooledByteBufferAllocator.Default)
.ChannelFactory(() => Settings.EnforceIpFamily
? new TcpSocketChannel(addressFamily)
: new TcpSocketChannel())
Expand Down Expand Up @@ -384,7 +384,7 @@ private ServerBootstrap ServerFactory()
.Option(ChannelOption.TcpNodelay, Settings.TcpNoDelay)
.Option(ChannelOption.AutoRead, false)
.Option(ChannelOption.SoBacklog, Settings.Backlog)
.Option(ChannelOption.Allocator, Settings.EnableBufferPooling ? (IByteBufferAllocator)PooledByteBufferAllocator.Default : UnpooledByteBufferAllocator.Default)
.Option(ChannelOption.Allocator, UnpooledByteBufferAllocator.Default)
.ChannelFactory(() => Settings.EnforceIpFamily
? new TcpServerSocketChannel(addressFamily)
: new TcpServerSocketChannel())
Expand Down
2 changes: 0 additions & 2 deletions src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,6 @@ public override bool Write(ByteString payload)

private IByteBuffer ToByteBuffer(ByteString payload)
{
//TODO: optimize DotNetty byte buffer usage
// (maybe custom IByteBuffer working directly on ByteString?)
var buffer = Unpooled.WrappedBuffer(payload.ToByteArray());
return buffer;
}
Expand Down

0 comments on commit be94d55

Please sign in to comment.