Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds debug-enabled check for sessions loop assignments #714

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Version 0.17-SNAPSHOT:
[feature] add `moquette.session_loop.debug` property to enable session loop checking assignments (#714).
[break] deprecate `persistent_store` to separate the enablement of persistence with `persistence_enabled` and the path `data_path` (#706).
[enhancement] introduced new queue implementation based on segments in memory mapped files. The type of queue implementation
could be selected by setting `persistent_queue_type` (#691, #704).
Expand Down
37 changes: 33 additions & 4 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ final class MQTTConnection {

private static final Logger LOG = LoggerFactory.getLogger(MQTTConnection.class);

static final boolean sessionLoopDebug = Boolean.parseBoolean(System.getProperty("moquette.session_loop.debug", "false"));

final Channel channel;
private final BrokerConfiguration brokerConfig;
private final IAuthenticator authenticator;
Expand Down Expand Up @@ -108,15 +110,19 @@ void handleMessage(MqttMessage msg) {

private void processPubComp(MqttMessage msg) {
final int messageID = ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId();
this.postOffice.routeCommand(bindedSession.getClientID(), "PUBCOMP", () -> {
final String clientID = bindedSession.getClientID();
this.postOffice.routeCommand(clientID, "PUBCOMP", () -> {
checkMatchSessionLoop(clientID);
bindedSession.processPubComp(messageID);
return bindedSession.getClientID();
return clientID;
});
}

private void processPubRec(MqttMessage msg) {
final int messageID = ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId();
this.postOffice.routeCommand(bindedSession.getClientID(), "PUBREC", () -> {
final String clientID = bindedSession.getClientID();
this.postOffice.routeCommand(clientID, "PUBREC", () -> {
checkMatchSessionLoop(clientID);
bindedSession.processPubRec(messageID);
return null;
});
Expand All @@ -131,6 +137,7 @@ private void processPubAck(MqttMessage msg) {
final int messageID = ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId();
final String clientId = getClientId();
this.postOffice.routeCommand(clientId, "PUB ACK", () -> {
checkMatchSessionLoop(clientId);
bindedSession.pubAckReceived(messageID);
return null;
});
Expand Down Expand Up @@ -174,11 +181,23 @@ PostOffice.RouteResult processConnect(MqttConnectMessage msg) {

final String sessionId = clientId;
return postOffice.routeCommand(clientId, "CONN", () -> {
checkMatchSessionLoop(sessionId);
executeConnect(msg, sessionId);
return null;
});
}

private void checkMatchSessionLoop(String clientId) {
if (!sessionLoopDebug) {
return;
}
final String currentThreadName = Thread.currentThread().getName();
final String expectedThreadName = postOffice.sessionLoopThreadName(clientId);
if (!expectedThreadName.equals(currentThreadName)) {
throw new IllegalStateException("Expected to be executed on thread " + expectedThreadName + " but running on " + currentThreadName + ". This means a programming error");
}
}

/**
* Invoked by the Session's event loop.
* */
Expand Down Expand Up @@ -304,6 +323,7 @@ void handleConnectionLost() {
// this must not be done on the netty thread
LOG.debug("Notifying connection lost event");
postOffice.routeCommand(clientID, "CONN LOST", () -> {
checkMatchSessionLoop(clientID);
if (isBoundToSession() || isSessionUnbound()) {
LOG.debug("Cleaning {}", clientID);
processConnectionLost(clientID);
Expand All @@ -314,6 +334,7 @@ void handleConnectionLost() {
});
}

// Invoked when a TCP connection drops and not when a client send DISCONNECT and close.
private void processConnectionLost(String clientID) {
if (bindedSession.hasWill()) {
postOffice.fireWill(bindedSession.getWill());
Expand Down Expand Up @@ -348,6 +369,7 @@ PostOffice.RouteResult processDisconnect(MqttMessage msg) {
}

return this.postOffice.routeCommand(clientID, "DISCONN", () -> {
checkMatchSessionLoop(clientID);
if (!isBoundToSession()) {
LOG.debug("NOT processing disconnect {}, not bound.", clientID);
return null;
Expand All @@ -371,6 +393,7 @@ PostOffice.RouteResult processSubscribe(MqttSubscribeMessage msg) {
}
final String username = NettyUtils.userName(channel);
return postOffice.routeCommand(clientID, "SUB", () -> {
checkMatchSessionLoop(clientID);
if (isBoundToSession())
postOffice.subscribeClientToTopics(msg, clientID, username, this);
return null;
Expand All @@ -388,6 +411,7 @@ private void processUnsubscribe(MqttUnsubscribeMessage msg) {
final int messageId = msg.variableHeader().messageId();

postOffice.routeCommand(clientID, "UNSUB", () -> {
checkMatchSessionLoop(clientID);
if (!isBoundToSession())
return null;
LOG.trace("Processing UNSUBSCRIBE message. topics: {}", topics);
Expand Down Expand Up @@ -425,20 +449,23 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
switch (qos) {
case AT_MOST_ONCE:
return postOffice.routeCommand(clientId, "PUB QoS0", () -> {
checkMatchSessionLoop(clientId);
if (!isBoundToSession())
return null;
postOffice.receivedPublishQos0(topic, username, clientId, msg);
return null;
}).ifFailed(msg::release);
case AT_LEAST_ONCE:
return postOffice.routeCommand(clientId, "PUB QoS1", () -> {
checkMatchSessionLoop(clientId);
if (!isBoundToSession())
return null;
postOffice.receivedPublishQos1(this, topic, username, messageID, msg);
return null;
}).ifFailed(msg::release);
case EXACTLY_ONCE: {
final PostOffice.RouteResult firstStepResult = postOffice.routeCommand(clientId, "PUB QoS2", () -> {
checkMatchSessionLoop(clientId);
if (!isBoundToSession())
return null;
bindedSession.receivedPublishQos2(messageID, msg);
Expand Down Expand Up @@ -470,7 +497,9 @@ void sendPubRec(int messageID) {

private void processPubRel(MqttMessage msg) {
final int messageID = ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId();
postOffice.routeCommand(bindedSession.getClientID(), "PUBREL", () -> {
final String clientID = bindedSession.getClientID();
postOffice.routeCommand(clientID, "PUBREL", () -> {
checkMatchSessionLoop(clientID);
executePubRel(messageID);
return null;
});
Expand Down
17 changes: 15 additions & 2 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,15 @@ public RouteResult ifFailed(Runnable action) {
this.sessionExecutors = new Thread[eventLoops];
for (int i = 0; i < eventLoops; i++) {
this.sessionExecutors[i] = new Thread(new SessionEventLoop(this.sessionQueues[i]));
this.sessionExecutors[i].setName("Session Executor " + i);
this.sessionExecutors[i].setName(sessionLoopName(i));
this.sessionExecutors[i].start();
}
}

private String sessionLoopName(int i) {
return "Session Executor " + i;
}

public void init(SessionRegistry sessionRegistry) {
this.sessionRegistry = sessionRegistry;
}
Expand Down Expand Up @@ -624,12 +628,21 @@ void dispatchConnectionLost(String clientId,String userName) {
interceptor.notifyClientConnectionLost(clientId, userName);
}

String sessionLoopThreadName(String clientId) {
final int targetQueueId = targetQueueOrdinal(clientId);
return sessionLoopName(targetQueueId);
}

private int targetQueueOrdinal(String clientId) {
return Math.abs(clientId.hashCode()) % this.eventLoops;
}

/**
* Route the command to the owning SessionEventLoop
* */
public RouteResult routeCommand(String clientId, String actionDescription, Callable<String> action) {
SessionCommand cmd = new SessionCommand(clientId, action);
final int targetQueueId = Math.abs(cmd.getSessionId().hashCode()) % this.eventLoops;
final int targetQueueId = targetQueueOrdinal(cmd.getSessionId());
LOG.debug("Routing cmd [{}] for session [{}] to event processor {}", actionDescription, cmd.getSessionId(), targetQueueId);
final FutureTask<String> task = new FutureTask<>(() -> {
cmd.execute();
Expand Down