Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #4630] Fix concurrency problem and split task handle threadpool #4679

Merged
merged 3 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -222,17 +222,21 @@
ListenProcessor listenProcessor = new ListenProcessor(this);
registerProcessor(Command.LISTEN_REQUEST, listenProcessor, taskHandleExecutorService);

ThreadPoolExecutor sendExecutorService = super.getTcpThreadPoolGroup().getSendExecutorService();

Check warning on line 225 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java#L225

Added line #L225 was not covered by tests
MessageTransferProcessor messageTransferProcessor = new MessageTransferProcessor(this);
registerProcessor(Command.REQUEST_TO_SERVER, messageTransferProcessor, taskHandleExecutorService);
registerProcessor(Command.RESPONSE_TO_SERVER, messageTransferProcessor, taskHandleExecutorService);
registerProcessor(Command.ASYNC_MESSAGE_TO_SERVER, messageTransferProcessor, taskHandleExecutorService);
registerProcessor(Command.BROADCAST_MESSAGE_TO_SERVER, messageTransferProcessor, taskHandleExecutorService);
registerProcessor(Command.REQUEST_TO_SERVER, messageTransferProcessor, sendExecutorService);
registerProcessor(Command.ASYNC_MESSAGE_TO_SERVER, messageTransferProcessor, sendExecutorService);
registerProcessor(Command.BROADCAST_MESSAGE_TO_SERVER, messageTransferProcessor, sendExecutorService);

Check warning on line 229 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java#L227-L229

Added lines #L227 - L229 were not covered by tests

ThreadPoolExecutor replyExecutorService = super.getTcpThreadPoolGroup().getReplyExecutorService();
registerProcessor(Command.RESPONSE_TO_SERVER, messageTransferProcessor, replyExecutorService);

Check warning on line 232 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java#L231-L232

Added lines #L231 - L232 were not covered by tests

ThreadPoolExecutor ackExecutorService = super.getTcpThreadPoolGroup().getAckExecutorService();

Check warning on line 234 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java#L234

Added line #L234 was not covered by tests
MessageAckProcessor messageAckProcessor = new MessageAckProcessor(this);
registerProcessor(Command.RESPONSE_TO_CLIENT_ACK, messageAckProcessor, taskHandleExecutorService);
registerProcessor(Command.ASYNC_MESSAGE_TO_CLIENT_ACK, messageAckProcessor, taskHandleExecutorService);
registerProcessor(Command.BROADCAST_MESSAGE_TO_CLIENT_ACK, messageAckProcessor, taskHandleExecutorService);
registerProcessor(Command.REQUEST_TO_CLIENT_ACK, messageAckProcessor, taskHandleExecutorService);
registerProcessor(Command.RESPONSE_TO_CLIENT_ACK, messageAckProcessor, ackExecutorService);
registerProcessor(Command.ASYNC_MESSAGE_TO_CLIENT_ACK, messageAckProcessor, ackExecutorService);
registerProcessor(Command.BROADCAST_MESSAGE_TO_CLIENT_ACK, messageAckProcessor, ackExecutorService);
registerProcessor(Command.REQUEST_TO_CLIENT_ACK, messageAckProcessor, ackExecutorService);

Check warning on line 239 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java#L236-L239

Added lines #L236 - L239 were not covered by tests
}

public EventMeshServer getEventMeshServer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
private final EventMeshTCPConfiguration eventMeshTCPConfiguration;
private ScheduledExecutorService scheduler;
private ThreadPoolExecutor taskHandleExecutorService;
private ThreadPoolExecutor sendExecutorService;
private ThreadPoolExecutor ackExecutorService;
private ThreadPoolExecutor replyExecutorService;
private ThreadPoolExecutor broadcastMsgDownstreamExecutorService;

public TCPThreadPoolGroup(EventMeshTCPConfiguration eventMeshTCPConfiguration) {
Expand All @@ -45,9 +48,27 @@
taskHandleExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshTCPConfiguration.getEventMeshTcpTaskHandleExecutorPoolSize(),
eventMeshTCPConfiguration.getEventMeshTcpTaskHandleExecutorPoolSize(),
new LinkedBlockingQueue<>(10_000),
new LinkedBlockingQueue<>(eventMeshTCPConfiguration.getEventMeshTcpTaskHandleExecutorQueueSize()),

Check warning on line 51 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TCPThreadPoolGroup.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TCPThreadPoolGroup.java#L51

Added line #L51 was not covered by tests
new EventMeshThreadFactory("eventMesh-tcp-task-handle", true));

sendExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshTCPConfiguration.getEventMeshTcpMsgSendExecutorPoolSize(),
eventMeshTCPConfiguration.getEventMeshTcpMsgSendExecutorPoolSize(),
new LinkedBlockingQueue<>(eventMeshTCPConfiguration.getEventMeshTcpMsgSendExecutorQueueSize()),

Check warning on line 57 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TCPThreadPoolGroup.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TCPThreadPoolGroup.java#L54-L57

Added lines #L54 - L57 were not covered by tests
new EventMeshThreadFactory("eventMesh-tcp-msg-send", true));

replyExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshTCPConfiguration.getEventMeshTcpMsgReplyExecutorPoolSize(),
eventMeshTCPConfiguration.getEventMeshTcpMsgReplyExecutorPoolSize(),
new LinkedBlockingQueue<>(eventMeshTCPConfiguration.getEventMeshTcpMsgReplyExecutorQueueSize()),

Check warning on line 63 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TCPThreadPoolGroup.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TCPThreadPoolGroup.java#L60-L63

Added lines #L60 - L63 were not covered by tests
new EventMeshThreadFactory("eventMesh-tcp-msg-reply", true));

ackExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshTCPConfiguration.getEventMeshTcpMsgAckExecutorPoolSize(),
eventMeshTCPConfiguration.getEventMeshTcpMsgAckExecutorPoolSize(),
new LinkedBlockingQueue<>(eventMeshTCPConfiguration.getEventMeshTcpMsgAckExecutorQueueSize()),

Check warning on line 69 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TCPThreadPoolGroup.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TCPThreadPoolGroup.java#L66-L69

Added lines #L66 - L69 were not covered by tests
new EventMeshThreadFactory("eventMesh-tcp-msg-ack", true));

broadcastMsgDownstreamExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we change the queue length from a hard-coded value to a configurable value for this ExecutorService, like that of ExecutorServices above?

eventMeshTCPConfiguration.getEventMeshTcpMsgDownStreamExecutorPoolSize(),
eventMeshTCPConfiguration.getEventMeshTcpMsgDownStreamExecutorPoolSize(),
Expand All @@ -59,6 +80,9 @@
public void shutdownThreadPool() {
scheduler.shutdown();
taskHandleExecutorService.shutdown();
sendExecutorService.shutdown();;
replyExecutorService.shutdown();
ackExecutorService.shutdown();

Check warning on line 85 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TCPThreadPoolGroup.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TCPThreadPoolGroup.java#L83-L85

Added lines #L83 - L85 were not covered by tests
broadcastMsgDownstreamExecutorService.shutdown();
}

Expand All @@ -73,4 +97,16 @@
public ThreadPoolExecutor getBroadcastMsgDownstreamExecutorService() {
return broadcastMsgDownstreamExecutorService;
}

public ThreadPoolExecutor getSendExecutorService() {
return sendExecutorService;

Check warning on line 102 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TCPThreadPoolGroup.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TCPThreadPoolGroup.java#L102

Added line #L102 was not covered by tests
}

public ThreadPoolExecutor getAckExecutorService() {
return ackExecutorService;

Check warning on line 106 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TCPThreadPoolGroup.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TCPThreadPoolGroup.java#L106

Added line #L106 was not covered by tests
}

public ThreadPoolExecutor getReplyExecutorService() {
return replyExecutorService;

Check warning on line 110 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TCPThreadPoolGroup.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TCPThreadPoolGroup.java#L110

Added line #L110 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,28 @@
private int eventMeshTcpGlobalScheduler = 5;

@ConfigFiled(field = "tcp.taskHandleExecutorPoolSize")
private int eventMeshTcpTaskHandleExecutorPoolSize = Runtime.getRuntime().availableProcessors();
private int eventMeshTcpTaskHandleExecutorPoolSize = 2 * Runtime.getRuntime().availableProcessors();

@ConfigFiled(field = "tcp.sendExecutorPoolSize")
private int eventMeshTcpMsgSendExecutorPoolSize = 2 * Runtime.getRuntime().availableProcessors();

@ConfigFiled(field = "tcp.replyExecutorPoolSize")
private int eventMeshTcpMsgReplyExecutorPoolSize = 2 * Runtime.getRuntime().availableProcessors();

@ConfigFiled(field = "tcp.ackExecutorPoolSize")
private int eventMeshTcpMsgAckExecutorPoolSize = 2 * Runtime.getRuntime().availableProcessors();

Comment on lines 61 to +72
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although this is an IO-intensive scenario, with the original single thread pool being split into four, and each thread pool having twice the coreSize as before, the number of threads is 8 times the original. Considering that the 1-RTT time in EventMesh is relatively short, I suggest setting the coreSize of each thread pool to the number of physical CPU cores and the maxSize to double that. What do you think?

@ConfigFiled(field = "tcp.taskHandleExecutorQueueSize")
private int eventMeshTcpTaskHandleExecutorQueueSize = 10000;

Check warning on line 74 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java#L74

Added line #L74 was not covered by tests

@ConfigFiled(field = "tcp.sendExecutorQueueSize")
private int eventMeshTcpMsgSendExecutorQueueSize = 10000;

Check warning on line 77 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java#L77

Added line #L77 was not covered by tests

@ConfigFiled(field = "tcp.replyExecutorQueueSize")
private int eventMeshTcpMsgReplyExecutorQueueSize = 10000;

Check warning on line 80 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java#L80

Added line #L80 was not covered by tests

@ConfigFiled(field = "tcp.ackExecutorQueueSize")
private int eventMeshTcpMsgAckExecutorQueueSize = 10000;

Check warning on line 83 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java#L83

Added line #L83 was not covered by tests

@ConfigFiled(field = "tcp.msgDownStreamExecutorPoolSize")
private int eventMeshTcpMsgDownStreamExecutorPoolSize = Math.max(Runtime.getRuntime().availableProcessors(), 8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,20 @@

session.setSessionState(SessionState.CLOSED);

if (EventMeshConstants.PURPOSE_SUB.equals(session.getClient().getPurpose())) {
cleanClientGroupWrapperByCloseSub(session);
} else if (EventMeshConstants.PURPOSE_PUB.equals(session.getClient().getPurpose())) {
cleanClientGroupWrapperByClosePub(session);
} else {
log.error("client purpose config is error:{}", session.getClient().getPurpose());
final String clientGroup = session.getClient().getGroup();

Check warning on line 170 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java#L170

Added line #L170 was not covered by tests
if (!lockMap.containsKey(clientGroup)) {
lockMap.putIfAbsent(clientGroup, new Object());

Check warning on line 172 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java#L172

Added line #L172 was not covered by tests
}
synchronized (lockMap.get(clientGroup)) {

Check warning on line 174 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java#L174

Added line #L174 was not covered by tests
if (EventMeshConstants.PURPOSE_SUB.equals(session.getClient().getPurpose())) {
cleanClientGroupWrapperByCloseSub(session);

Check warning on line 176 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java#L176

Added line #L176 was not covered by tests
} else if (EventMeshConstants.PURPOSE_PUB.equals(
session.getClient().getPurpose())) {
cleanClientGroupWrapperByClosePub(session);

Check warning on line 179 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java#L178-L179

Added lines #L178 - L179 were not covered by tests
} else {
log.error("client purpose config is error:{}",
session.getClient().getPurpose());

Check warning on line 182 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java#L181-L182

Added lines #L181 - L182 were not covered by tests
}
}

if (session.getContext() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.util.EventMeshUtil;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -67,7 +66,7 @@
log.info("rebalance service shutdown......");
}

public void printRebalanceThreadPoolState() {
EventMeshUtil.printState((ThreadPoolExecutor) serviceRebalanceScheduler);
public int getRebalanceThreadPoolQueueSize() {
return ((ThreadPoolExecutor) serviceRebalanceScheduler).getQueue().size();

Check warning on line 70 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventMeshRebalanceService.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventMeshRebalanceService.java#L70

Added line #L70 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -126,8 +127,16 @@

}), delay, period, TimeUnit.MILLISECONDS);

monitorThreadPoolTask = eventMeshTCPServer.getTcpThreadPoolGroup().getScheduler().scheduleAtFixedRate(() -> {
eventMeshTCPServer.getEventMeshRebalanceService().printRebalanceThreadPoolState();
monitorThreadPoolTask = eventMeshTCPServer.getTcpThreadPoolGroup().getScheduler().scheduleAtFixedRate(() -> {
appLogger.info("{TaskHandle:{},Send:{},Ack:{},Reply:{},Push:{},Scheduler:{},Rebalance:{}}",
eventMeshTCPServer.getTcpThreadPoolGroup().getTaskHandleExecutorService().getQueue().size(),
eventMeshTCPServer.getTcpThreadPoolGroup().getSendExecutorService().getQueue().size(),
eventMeshTCPServer.getTcpThreadPoolGroup().getAckExecutorService().getQueue().size(),
eventMeshTCPServer.getTcpThreadPoolGroup().getReplyExecutorService().getQueue().size(),
eventMeshTCPServer.getTcpThreadPoolGroup().getBroadcastMsgDownstreamExecutorService().getQueue().size(),
((ThreadPoolExecutor) eventMeshTCPServer.getTcpThreadPoolGroup().getScheduler()).getQueue().size(),
eventMeshTCPServer.getEventMeshRebalanceService().getRebalanceThreadPoolQueueSize());

Check warning on line 138 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java#L130-L138

Added lines #L130 - L138 were not covered by tests

eventMeshTCPServer.getTcpRetryer().printState();

// monitor retry queue size
Expand All @@ -137,7 +146,6 @@
EventMeshConstants.PROTOCOL_TCP,
MonitorMetricConstants.RETRY_QUEUE_SIZE,
tcpSummaryMetrics.getRetrySize());

}, 10, PRINT_THREADPOOLSTATE_INTERVAL, TimeUnit.SECONDS);
log.info("EventMeshTcpMonitor started......");
}
Expand Down
Loading