Skip to content

Commit

Permalink
Merge pull request #5984 from 317787106/feature/test_isolated3
Browse files Browse the repository at this point in the history
feat(net): prefer to disconnect from broadcast nodes
  • Loading branch information
lvs007 authored Sep 5, 2024
2 parents ea7ef8e + 59e92c8 commit 1996f7b
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package org.tron.core.net.service.effective;

import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -29,6 +31,7 @@ public class ResilienceService {
//when node is isolated, retention percent peers will not be disconnected
public static final double retentionPercent = 0.8;
private static final int initialDelay = 300;
public static final int minBroadcastPeerSize = 3;
private static final String esName = "resilience-service";
private final ScheduledExecutorService executor = ExecutorServiceManager
.newSingleThreadScheduledExecutor(esName);
Expand All @@ -47,7 +50,7 @@ public void init() {
} catch (Exception e) {
logger.error("DisconnectRandom node failed", e);
}
}, initialDelay, 60, TimeUnit.SECONDS);
}, initialDelay, 30, TimeUnit.SECONDS);
} else {
logger.info("OpenFullTcpDisconnect is disabled");
}
Expand All @@ -71,21 +74,50 @@ public void init() {

private void disconnectRandom() {
int peerSize = tronNetDelegate.getActivePeer().size();
if (peerSize >= CommonParameter.getInstance().getMaxConnections()) {
if (peerSize < CommonParameter.getInstance().getMaxConnections()) {
return;
}
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.getChannel().isTrustPeer())
.filter(peer -> !peer.isNeedSyncFromUs() && !peer.isNeedSyncFromPeer())
.collect(Collectors.toList());

if (peers.size() >= minBroadcastPeerSize) {
long now = System.currentTimeMillis();
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> now - peer.getLastInteractiveTime() >= inactiveThreshold)
Map<Object, Integer> weights = new HashMap<>();
peers.forEach(peer -> {
int weight = (int) Math.ceil((double) (now - peer.getLastInteractiveTime()) / 500);
weights.put(peer, Math.max(weight, 1));
});
WeightedRandom weightedRandom = new WeightedRandom(weights);
PeerConnection one = (PeerConnection) weightedRandom.next();
disconnectFromPeer(one, ReasonCode.RANDOM_ELIMINATION, DisconnectCause.RANDOM_ELIMINATION);
return;
}

int needSyncFromPeerCount = (int) tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.getChannel().isTrustPeer())
.filter(PeerConnection::isNeedSyncFromPeer)
.count();
if (needSyncFromPeerCount >= 2) {
peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.getChannel().isTrustPeer())
.filter(peer -> !peer.isNeedSyncFromUs() && !peer.isNeedSyncFromPeer())
.filter(peer -> peer.isNeedSyncFromUs() || peer.isNeedSyncFromPeer())
.collect(Collectors.toList());
} else {
peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.getChannel().isTrustPeer())
.filter(PeerConnection::isNeedSyncFromUs)
.collect(Collectors.toList());
if (!peers.isEmpty()) {
int index = new Random().nextInt(peers.size());
disconnectFromPeer(peers.get(index), ReasonCode.RANDOM_ELIMINATION,
DisconnectCause.RANDOM_ELIMINATION);
}
}
if (!peers.isEmpty()) {
int index = new Random().nextInt(peers.size());
disconnectFromPeer(peers.get(index), ReasonCode.RANDOM_ELIMINATION,
DisconnectCause.RANDOM_ELIMINATION);
}
}


private void disconnectLan() {
if (!isLanNode()) {
return;
Expand Down Expand Up @@ -198,6 +230,32 @@ private enum DisconnectCause {
ISOLATE2_PASSIVE,
}

static class WeightedRandom {

private final Map<Object, Integer> weights;
private final Random random;

public WeightedRandom(Map<Object, Integer> weights) {
this.weights = weights;
this.random = new Random();
}

public Object next() {
int totalWeight = 0;
for (int weight : weights.values()) {
totalWeight += weight;
}
int randomNum = random.nextInt(totalWeight);
for (Object key : weights.keySet()) {
randomNum -= weights.get(key);
if (randomNum < 0) {
return key;
}
}
throw new IllegalStateException("Sum of weights should not be negative.");
}
}

public void close() {
ExecutorServiceManager.shutdownAndAwaitTermination(executor, esName);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package org.tron.core.net.peer;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.spy;

import io.netty.channel.ChannelHandlerContext;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import org.tron.common.application.TronApplicationContext;
import org.tron.common.utils.ReflectUtils;
import org.tron.core.Constant;
import org.tron.core.capsule.BlockCapsule.BlockId;
import org.tron.core.config.DefaultConfig;
import org.tron.core.config.Parameter.NetConstants;
import org.tron.core.config.args.Args;
import org.tron.p2p.connection.Channel;


public class PeerStatusCheckTest {

protected TronApplicationContext context;
private PeerStatusCheck service;
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();

@Before
public void init() throws IOException {
Args.setParam(new String[] {"--output-directory",
temporaryFolder.newFolder().toString(), "--debug"}, Constant.TEST_CONF);
context = new TronApplicationContext(DefaultConfig.class);
service = context.getBean(PeerStatusCheck.class);
}

/**
* destroy.
*/
@After
public void destroy() {
Args.clearParam();
context.destroy();
}

@Test
public void testCheck() {
int maxConnection = 30;
Assert.assertEquals(maxConnection, Args.getInstance().getMaxConnections());
Assert.assertEquals(0, PeerManager.getPeers().size());

for (int i = 0; i < maxConnection; i++) {
InetSocketAddress inetSocketAddress = new InetSocketAddress("201.0.0." + i, 10001);
Channel c1 = spy(Channel.class);
ReflectUtils.setFieldValue(c1, "inetSocketAddress", inetSocketAddress);
ReflectUtils.setFieldValue(c1, "inetAddress", inetSocketAddress.getAddress());
ReflectUtils.setFieldValue(c1, "ctx", spy(ChannelHandlerContext.class));
Mockito.doNothing().when(c1).send((byte[]) any());

PeerManager.add(context, c1);
}

PeerManager.getPeers().get(0).getSyncBlockRequested()
.put(new BlockId(), System.currentTimeMillis() - NetConstants.SYNC_TIME_OUT - 1000);
ReflectUtils.invokeMethod(service, "statusCheck");

Assert.assertEquals(maxConnection - 1L, PeerManager.getPeers().size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void testDisconnectRandom() {
clearPeers();
Assert.assertEquals(0, PeerManager.getPeers().size());

for (int i = 0; i < maxConnection; i++) {
for (int i = 0; i < maxConnection + 1; i++) {
InetSocketAddress inetSocketAddress = new InetSocketAddress("201.0.0." + i, 10001);
Channel c1 = spy(Channel.class);
ReflectUtils.setFieldValue(c1, "inetSocketAddress", inetSocketAddress);
Expand All @@ -61,21 +61,37 @@ public void testDisconnectRandom() {

PeerManager.add(context, c1);
}
for (PeerConnection peer : PeerManager.getPeers()) {
for (PeerConnection peer : PeerManager.getPeers()
.subList(0, ResilienceService.minBroadcastPeerSize)) {
peer.setNeedSyncFromPeer(false);
peer.setNeedSyncFromUs(false);
peer.setLastInteractiveTime(System.currentTimeMillis() - 1000);
}
for (PeerConnection peer : PeerManager.getPeers()
.subList(ResilienceService.minBroadcastPeerSize, maxConnection + 1)) {
peer.setNeedSyncFromPeer(false);
peer.setNeedSyncFromUs(true);
}
int size1 = (int) PeerManager.getPeers().stream()
.filter(peer -> !peer.isNeedSyncFromUs() && !peer.isNeedSyncFromPeer())
.count();
Assert.assertEquals(ResilienceService.minBroadcastPeerSize, size1);
Assert.assertEquals(maxConnection + 1, PeerManager.getPeers().size());

//disconnect from broadcasting peer
ReflectUtils.invokeMethod(service, "disconnectRandom");
size1 = (int) PeerManager.getPeers().stream()
.filter(peer -> !peer.isNeedSyncFromUs() && !peer.isNeedSyncFromPeer())
.count();
Assert.assertEquals(ResilienceService.minBroadcastPeerSize - 1, size1);
Assert.assertEquals(maxConnection, PeerManager.getPeers().size());

PeerConnection p1 = PeerManager.getPeers().get(1);
p1.setLastInteractiveTime(
System.currentTimeMillis() - Args.getInstance().inactiveThreshold * 1000L - 1000);
PeerConnection p2 = PeerManager.getPeers().get(10);
p2.setLastInteractiveTime(
System.currentTimeMillis() - Args.getInstance().inactiveThreshold * 1000L - 2000);

//disconnect from syncing peer
ReflectUtils.invokeMethod(service, "disconnectRandom");
size1 = (int) PeerManager.getPeers().stream()
.filter(peer -> !peer.isNeedSyncFromUs() && !peer.isNeedSyncFromPeer())
.count();
Assert.assertEquals(ResilienceService.minBroadcastPeerSize - 1, size1);
Assert.assertEquals(maxConnection - 1, PeerManager.getPeers().size());
}

Expand Down

0 comments on commit 1996f7b

Please sign in to comment.