Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AKI-690: Refactor P2P manager periodic threads to use ExecutorService #1149

Merged
merged 9 commits into from
Apr 22, 2020
33 changes: 0 additions & 33 deletions modP2pImpl/src/org/aion/p2p/impl/TaskRequestActiveNodes.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.aion.p2p.impl1.tasks;
package org.aion.p2p.impl1;

import java.nio.ByteBuffer;
import java.util.HashMap;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.aion.p2p.impl1.tasks;
package org.aion.p2p.impl1;

/** An incoming message. */
public class MsgIn {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.aion.p2p.impl1.tasks;
package org.aion.p2p.impl1;

import org.aion.p2p.Msg;
import org.aion.p2p.impl1.P2pMgr.Dest;
Expand Down
10 changes: 0 additions & 10 deletions modP2pImpl/src/org/aion/p2p/impl1/P2pException.java

This file was deleted.

163 changes: 105 additions & 58 deletions modP2pImpl/src/org/aion/p2p/impl1/P2pMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.aion.p2p.Ctrl;
Expand All @@ -31,28 +31,23 @@
import org.aion.p2p.Msg;
import org.aion.p2p.P2pConstant;
import org.aion.p2p.Ver;
import org.aion.p2p.impl.TaskRequestActiveNodes;
import org.aion.p2p.impl.TaskUPnPManager;
import org.aion.p2p.impl.comm.Node;
import org.aion.p2p.impl.comm.NodeMgr;
import org.aion.p2p.impl.zero.msg.ReqActiveNodes;
import org.aion.p2p.impl.zero.msg.ReqHandshake1;
import org.aion.p2p.impl.zero.msg.ResHandshake1;
import org.aion.p2p.impl1.tasks.MsgIn;
import org.aion.p2p.impl1.tasks.MsgOut;
import org.aion.p2p.impl1.tasks.TaskClear;
import org.aion.p2p.impl1.tasks.TaskConnectPeers;
import org.aion.p2p.impl1.tasks.TaskInbound;
import org.aion.p2p.impl1.tasks.TaskReceive;
import org.aion.p2p.impl1.tasks.TaskSend;
import org.aion.p2p.impl1.tasks.TaskStatus;
import org.apache.commons.collections4.map.LRUMap;
import org.slf4j.Logger;

/** @author Chris p2p://{uuid}@{ip}:{port} */
public final class P2pMgr implements IP2pMgr {
private static final int PERIOD_SHOW_STATUS = 10000;
private static final int PERIOD_REQUEST_ACTIVE_NODES = 1000;
private static final int DELAY_SHOW_P2P_STATUS = 10; // in seconds
private static final int DELAY_CLEAR_PEERS = 10; // in seconds
private static final int DELAY_REQUEST_ACTIVE_NODES = 1; // in seconds
private static final int PERIOD_UPNP_PORT_MAPPING = 3600000;
private static final int DELAY_CONNECT_OUTBOUND = 1; // in seconds
private static final int TIMEOUT_OUTBOUND_CONNECT = 10000; // in milliseconds
private static final int TIMEOUT_MSG_READ = 10000;

private static final int OFFER_TIMEOUT = 100; // in milliseconds
Expand Down Expand Up @@ -96,6 +91,7 @@ public final class P2pMgr implements IP2pMgr {

private static ReqHandshake1 cachedReqHandshake1;
private static ResHandshake1 cachedResHandshake1;
private static final ReqActiveNodes cachedReqActiveNodesMsg = new ReqActiveNodes();

public enum Dest {
INBOUND,
Expand Down Expand Up @@ -171,7 +167,7 @@ public void run() {
try {
selector = Selector.open();

scheduledWorkers = new ScheduledThreadPoolExecutor(2);
scheduledWorkers = Executors.newScheduledThreadPool(5);

tcpServer = ServerSocketChannel.open();
tcpServer.configureBlocking(false);
Expand Down Expand Up @@ -237,26 +233,35 @@ public void run() {
}

if (p2pLOG.isInfoEnabled()) {
Thread threadStatus = new Thread(getStatusInstance(), "p2p-ts");
threadStatus.setPriority(Thread.NORM_PRIORITY);
threadStatus.start();
scheduledWorkers.scheduleWithFixedDelay(
() -> {
Thread.currentThread().setName("p2p-status");
p2pLOG.info(nodeMgr.dumpNodeInfo(selfShortId, p2pLOG.isDebugEnabled()));
p2pLOG.debug("receive queue[{}] send queue[{}]", receiveMsgQue.size(), sendMsgQue.size());
},
DELAY_SHOW_P2P_STATUS, DELAY_SHOW_P2P_STATUS, TimeUnit.SECONDS);
}

if (!syncSeedsOnly) {
scheduledWorkers.scheduleWithFixedDelay(
new TaskRequestActiveNodes(this, p2pLOG),
5000,
PERIOD_REQUEST_ACTIVE_NODES,
TimeUnit.MILLISECONDS);
scheduledWorkers.scheduleWithFixedDelay(() -> {
Thread.currentThread().setName("p2p-reqNodes");
INode node = getRandom();
if (node != null) {
p2pLOG.trace("TaskRequestActiveNodes: {}", node.toString());
send(node.getIdHash(), node.getIdShort(), cachedReqActiveNodesMsg);
}
},
5 * DELAY_REQUEST_ACTIVE_NODES, DELAY_REQUEST_ACTIVE_NODES, TimeUnit.SECONDS);
}

Thread thrdClear = new Thread(getClearInstance(), "p2p-clear");
thrdClear.setPriority(Thread.NORM_PRIORITY);
thrdClear.start();
scheduledWorkers.scheduleWithFixedDelay(
() -> {
Thread.currentThread().setName("p2p-clear");
nodeMgr.timeoutCheck(System.currentTimeMillis());
},
DELAY_CLEAR_PEERS, DELAY_CLEAR_PEERS, TimeUnit.SECONDS);

Thread thrdConn = new Thread(getConnectPeersInstance(), "p2p-conn");
thrdConn.setPriority(Thread.NORM_PRIORITY);
thrdConn.start();
scheduledWorkers.scheduleWithFixedDelay(() -> connectPeers(), DELAY_CONNECT_OUTBOUND, DELAY_CONNECT_OUTBOUND, TimeUnit.SECONDS);
} catch (SocketException e) {
p2pLOG.error("tcp-server-socket-exception.", e);
} catch (IOException e) {
Expand Down Expand Up @@ -285,16 +290,19 @@ public void register(final List<Handler> _cbs) {
}
}

List<Short> supportedVersions = new ArrayList<>(versions);
cachedReqHandshake1 = getReqHandshake1Instance(supportedVersions);
cachedReqHandshake1 = new ReqHandshake1(selfNodeId, selfChainId, selfIp, selfPort, selfRevision.getBytes(), new ArrayList<>(versions));
}

@Override
public void send(int _nodeIdHash, String _nodeIdShort, final Msg _msg) {
public void send(int nodeId, String displayId, final Msg message) {
send(nodeId, displayId, message, Dest.ACTIVE);
}

private void send(int nodeId, String displayId, final Msg message, Dest peerList) {
try {
boolean added = sendMsgQue.offer(new MsgOut(_nodeIdHash, _nodeIdShort, _msg, Dest.ACTIVE), OFFER_TIMEOUT, TimeUnit.MILLISECONDS);
boolean added = sendMsgQue.offer(new MsgOut(nodeId, displayId, message, peerList), OFFER_TIMEOUT, TimeUnit.MILLISECONDS);
if (!added) {
p2pLOG.warn("Message not added to the send queue due to exceeded capacity: msg={} for node={}", _msg, _nodeIdShort);
p2pLOG.warn("Message not added to the send queue due to exceeded capacity: msg={} for node={}", message, displayId);
}
} catch (InterruptedException e) {
p2pLOG.error("Interrupted while attempting to add the message to send to the processing queue:", e);
Expand Down Expand Up @@ -478,7 +486,6 @@ private TaskInbound getInboundInstance() {
this.start,
this.nodeMgr,
this.handlers,
this.sendMsgQue,
cachedResHandshake1,
this.receiveMsgQue);
}
Expand All @@ -487,33 +494,73 @@ private TaskReceive getReceiveInstance() {
return new TaskReceive(p2pLOG, surveyLog, start, receiveMsgQue, handlers);
}

private TaskStatus getStatusInstance() {
return new TaskStatus(p2pLOG, surveyLog, start, nodeMgr, selfShortId, sendMsgQue, receiveMsgQue);
}
private void connectPeers() {
Thread.currentThread().setName("p2p-peer");

private TaskClear getClearInstance() {
return new TaskClear(p2pLOG, nodeMgr, start);
}
INode node;
try {
if (nodeMgr.activeNodesSize() >= maxActiveNodes) {
p2pLOG.warn("tcp-connect-peer pass max-active-nodes.");
return;
}

private TaskConnectPeers getConnectPeersInstance() {
return new TaskConnectPeers(
p2pLOG,
this,
this.start,
this.nodeMgr,
this.maxActiveNodes,
this.selector,
this.sendMsgQue,
cachedReqHandshake1);
}
node = nodeMgr.tempNodesTake();
if (node == null) {
p2pLOG.debug("no temp node can take.");
return;
}

private ReqHandshake1 getReqHandshake1Instance(List<Short> versions) {
return new ReqHandshake1(
selfNodeId,
selfChainId,
this.selfIp,
this.selfPort,
this.selfRevision.getBytes(),
versions);
if (node.getIfFromBootList()) {
nodeMgr.addTempNode(node);
}
} catch (Exception e) {
p2pLOG.debug("tcp-Exception.", e);
return;
}
int nodeIdHash = node.getIdHash();
if (nodeMgr.notAtOutboundList(nodeIdHash) && nodeMgr.notActiveNode(nodeIdHash)) {
int _port = node.getPort();
SocketChannel channel = null;
try {
channel = SocketChannel.open();
channel.socket().connect(new InetSocketAddress(node.getIpStr(), _port), TIMEOUT_OUTBOUND_CONNECT);
configChannel(channel);

if (channel.isConnected()) {
p2pLOG.debug("success-connect node-id={} ip={}", node.getIdShort(), node.getIpStr());

channel.configureBlocking(false);
SelectionKey sk = channel.register(selector, SelectionKey.OP_READ);
ChannelBuffer rb = new ChannelBuffer(p2pLOG);
rb.setDisplayId(node.getIdShort());
rb.setNodeIdHash(nodeIdHash);
sk.attach(rb);

node.refreshTimestamp();
node.setChannel(channel);
nodeMgr.addOutboundNode(node);

p2pLOG.debug("prepare-request-handshake -> id={} ip={}", node.getIdShort(), node.getIpStr());

send(node.getIdHash(), node.getIdShort(), cachedReqHandshake1, Dest.OUTBOUND);
} else {
p2pLOG.debug("fail-connect node-id -> id={} ip={}", node.getIdShort(), node.getIpStr());

channel.close();
// node.peerMetric.incFailedCount();
}
} catch (Exception e) {
p2pLOG.debug("connect-outbound exception -> id=" + node.getIdShort() + " ip=" + node.getIpStr(), e);
p2pLOG.trace("close channel {}", node.toString());

if (channel != null) {
try {
channel.close();
} catch (IOException e1) {
p2pLOG.debug("TaskConnectPeers close exception.", e1);
}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.aion.p2p.impl1.tasks;
package org.aion.p2p.impl1;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -29,7 +29,6 @@
import org.aion.p2p.impl.zero.msg.ResActiveNodes;
import org.aion.p2p.impl.zero.msg.ResHandshake;
import org.aion.p2p.impl.zero.msg.ResHandshake1;
import org.aion.p2p.impl1.P2pException;
import org.aion.p2p.impl1.P2pMgr.Dest;
import org.slf4j.Logger;

Expand All @@ -41,7 +40,6 @@ public class TaskInbound implements Runnable {
private final INodeMgr nodeMgr;
private final Map<Integer, List<Handler>> handlers;
private final AtomicBoolean start;
private final BlockingQueue<MsgOut> sendMsgQue;
private final ResHandshake1 cachedResHandshake1;
private final BlockingQueue<MsgIn> receiveMsgQue;

Expand All @@ -63,7 +61,6 @@ public TaskInbound(
final AtomicBoolean _start,
final INodeMgr _nodeMgr,
final Map<Integer, List<Handler>> _handlers,
final BlockingQueue<MsgOut> _sendMsgQue,
final ResHandshake1 _cachedResHandshake1,
final BlockingQueue<MsgIn> _receiveMsgQue) {

Expand All @@ -74,7 +71,6 @@ public TaskInbound(
this.start = _start;
this.nodeMgr = _nodeMgr;
this.handlers = _handlers;
this.sendMsgQue = _sendMsgQue;
this.cachedResHandshake1 = _cachedResHandshake1;
this.receiveMsgQue = _receiveMsgQue;
}
Expand Down Expand Up @@ -294,7 +290,7 @@ private void readBuffer(
private int readMsg(SelectionKey _sk, ByteBuffer _readBuf, int _cnt) throws IOException {
ChannelBuffer cb = (ChannelBuffer) _sk.attachment();
if (cb == null) {
throw new P2pException("attachment is null");
throw new IOException("attachment is null");
}

int readCnt;
Expand Down Expand Up @@ -456,10 +452,7 @@ private void handleP2pMsg(final SelectionKey _sk, byte _act, final byte[] _msgBy
INode node = nodeMgr.getActiveNode(rb.getNodeIdHash());
if (node != null) {
ResActiveNodes resActiveNodes = new ResActiveNodes(p2pLOG, nodeMgr.getActiveNodesList());
boolean added = sendMsgQue.offer(new MsgOut(node.getIdHash(), node.getIdShort(), resActiveNodes, Dest.ACTIVE));
if (!added) {
p2pLOG.warn("Message not added to the send queue due to exceeded capacity: msg={} for node={}", resActiveNodes, node.getIdShort());
}
mgr.send(node.getIdHash(), node.getIdShort(), resActiveNodes);
}
}
break;
Expand Down Expand Up @@ -537,10 +530,7 @@ private void handleReqHandshake(
binaryVersion = new String(_revision, StandardCharsets.UTF_8);
node.setBinaryVersion(binaryVersion);
nodeMgr.movePeerToActive(_channelHash, "inbound");
boolean added = sendMsgQue.offer(new MsgOut(node.getIdHash(), node.getIdShort(), cachedResHandshake1, Dest.ACTIVE));
if (!added) {
p2pLOG.warn("Message not added to the send queue due to exceeded capacity: msg={} for node={}", cachedResHandshake1, node.getIdShort());
}
mgr.send(node.getIdHash(), node.getIdShort(), cachedResHandshake1);
}

} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.aion.p2p.impl1.tasks;
package org.aion.p2p.impl1;

import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.aion.p2p.impl1.tasks;
package org.aion.p2p.impl1;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down
Loading