Skip to content

Commit

Permalink
Update subscription option handling to account no local (moquette-io#814
Browse files Browse the repository at this point in the history
)

Implements handling of `noLocal` subscription option on MQTT5 connections.


- Updates the Callable signature used by routing to return Void instead of String.
- Updates the publishing of messages to subscribers to exclude or not from the target the clientId of the sender, depending on the status of noLocal in the target subscription.
- Added integration test to proof the change
  • Loading branch information
andsel authored Feb 3, 2024
1 parent 0ac5221 commit e0d0609
Show file tree
Hide file tree
Showing 9 changed files with 180 additions and 25 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ Version 0.18-SNAPSHOT:
[feature] subscription option handling: (issue #808)
- Move from qos to subscription option implementing the persistence of SubscriptionOption to/from storage. (#810)
- Exposed the maximum granted QoS by the server with the config setting 'max_server_granted_qos'. (#811)
- Implements handling of noLocal subscription option on MQTT5 connections. (#814)
[feature] subscription identifiers: (issue #801)
- Implements the validation of subscription identifier properties in SUBSCRIBE. (#803)
- Store and retrieve the subscription identifier into the subscriptions directory. (#804)
Expand Down
8 changes: 8 additions & 0 deletions broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
https://github.com/netty/netty/blob/netty-4.1.100.Final/pom.xml#L650 -->
<netty.tcnative.version>2.0.61.Final</netty.tcnative.version>
<paho.version>1.2.5</paho.version>
<paho.mttt5.version>1.2.5</paho.mttt5.version>
<hivemqclient.version>1.3.3</hivemqclient.version>
<h2.version>2.1.212</h2.version>
</properties>
Expand Down Expand Up @@ -147,6 +148,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.mqttv5.client</artifactId>
<version>${paho.mttt5.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client</artifactId>
Expand Down
4 changes: 3 additions & 1 deletion broker/src/main/java/io/moquette/broker/Authorizator.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.moquette.broker.security.IAuthorizatorPolicy;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttSubscriptionOption;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -74,7 +75,8 @@ private List<MqttTopicSubscription> verifyTopicsReadAccessWithTopicExtractor(Str
Topic topic = topicExtractor.apply(req.topicName());
final MqttQoS qos = getQoSCheckingAlsoPermissionsOnTopic(clientID, username, messageId, topic,
req.qualityOfService());
ackTopics.add(new MqttTopicSubscription(req.topicName(), qos));
MqttSubscriptionOption option = PostOffice.optionWithQos(qos, req.option());
ackTopics.add(new MqttTopicSubscription(req.topicName(), option));
}
return ackTopics;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ private void processPubComp(MqttMessage msg) {
this.postOffice.routeCommand(clientID, "PUBCOMP", () -> {
checkMatchSessionLoop(clientID);
bindedSession.processPubComp(messageID);
return clientID;
return null;
});
}

Expand Down
57 changes: 38 additions & 19 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@

class PostOffice {

private static final String WILL_PUBLISHER = "will_publisher";
private static final String INTERNAL_PUBLISHER = "internal_publisher";

/**
* Maps the failed packetID per clientId (id client source, id_packet) -> [id client target]
* */
Expand Down Expand Up @@ -271,7 +274,7 @@ private void trackWillSpecificationForFutureFire(Session bindedSession, ISession
}

private void publishWill(ISessionsRepository.Will will) {
publish2Subscribers(Unpooled.copiedBuffer(will.payload), new Topic(will.topic), will.qos);
publish2Subscribers(WILL_PUBLISHER, Unpooled.copiedBuffer(will.payload), new Topic(will.topic), will.qos);
}

/**
Expand Down Expand Up @@ -359,7 +362,7 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S
.filter(sub -> !SharedSubscriptionUtils.isSharedSubscription(sub.topicName()))
.map(sub -> {
final Topic topic = new Topic(sub.topicName());
MqttSubscriptionOption option = MqttSubscriptionOption.onlyFromQos(sub.qualityOfService());
MqttSubscriptionOption option = sub.option();//MqttSubscriptionOption.onlyFromQos(sub.qualityOfService());
if (subscriptionIdOpt.isPresent()) {
return new Subscription(clientID, topic, option, subscriptionIdOpt.get());
} else {
Expand Down Expand Up @@ -400,10 +403,20 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S

private List<MqttTopicSubscription> updateWithMaximumSupportedQoS(List<MqttTopicSubscription> subscriptions) {
return subscriptions.stream()
.map(s -> new MqttTopicSubscription(s.topicName(), minQos(s.qualityOfService(), maxServerGrantedQos)))
.map(this::updateWithMaximumSupportedQoS)
.collect(Collectors.toList());
}

private MqttTopicSubscription updateWithMaximumSupportedQoS(MqttTopicSubscription s) {
MqttQoS grantedQos = minQos(s.qualityOfService(), maxServerGrantedQos);
MqttSubscriptionOption option = optionWithQos(grantedQos, s.option());
return new MqttTopicSubscription(s.topicName(), option);
}

static MqttSubscriptionOption optionWithQos(MqttQoS grantedQos, MqttSubscriptionOption option) {
return new MqttSubscriptionOption(grantedQos, option.isNoLocal(), option.isRetainAsPublished(), option.retainHandling());
}

private static MqttQoS minQos(MqttQoS q1, MqttQoS q2) {
if (q1 == FAILURE || q2 == FAILURE) {
return FAILURE;
Expand Down Expand Up @@ -515,7 +528,7 @@ CompletableFuture<Void> receivedPublishQos0(Topic topic, String username, String
ReferenceCountUtil.release(msg);
return CompletableFuture.completedFuture(null);
}
final RoutingResults publishResult = publish2Subscribers(msg.payload(), topic, AT_MOST_ONCE);
final RoutingResults publishResult = publish2Subscribers(clientID, msg.payload(), topic, AT_MOST_ONCE);
if (publishResult.isAllFailed()) {
LOG.info("No one publish was successfully enqueued to session loops");
ReferenceCountUtil.release(msg);
Expand All @@ -534,7 +547,7 @@ CompletableFuture<Void> receivedPublishQos0(Topic topic, String username, String
}

RoutingResults receivedPublishQos1(MQTTConnection connection, Topic topic, String username, int messageID,
MqttPublishMessage msg) {
MqttPublishMessage msg) {
// verify if topic can be written
topic.getTokens();
if (!topic.isValid()) {
Expand All @@ -554,9 +567,9 @@ RoutingResults receivedPublishQos1(MQTTConnection connection, Topic topic, Strin
final RoutingResults routes;
if (msg.fixedHeader().isDup()) {
final Set<String> failedClients = failedPublishes.listFailed(clientId, messageID);
routes = publish2Subscribers(payload, topic, AT_LEAST_ONCE, failedClients);
routes = publish2Subscribers(clientId, payload, topic, AT_LEAST_ONCE, failedClients);
} else {
routes = publish2Subscribers(payload, topic, AT_LEAST_ONCE);
routes = publish2Subscribers(clientId, payload, topic, AT_LEAST_ONCE);
}
if (LOG.isTraceEnabled()) {
LOG.trace("subscriber routes: {}", routes);
Expand Down Expand Up @@ -589,8 +602,8 @@ private void manageRetain(Topic topic, MqttPublishMessage msg) {
}
}

private RoutingResults publish2Subscribers(ByteBuf payload, Topic topic, MqttQoS publishingQos) {
return publish2Subscribers(payload, topic, publishingQos, NO_FILTER);
private RoutingResults publish2Subscribers(String publisherClientId, ByteBuf payload, Topic topic, MqttQoS publishingQos) {
return publish2Subscribers(publisherClientId, payload, topic, publishingQos, NO_FILTER);
}

private class BatchingPublishesCollector {
Expand Down Expand Up @@ -656,7 +669,7 @@ public int countBatches() {
}
}

private RoutingResults publish2Subscribers(ByteBuf payload, Topic topic, MqttQoS publishingQos,
private RoutingResults publish2Subscribers(String publisherClientId, ByteBuf payload, Topic topic, MqttQoS publishingQos,
Set<String> filterTargetClients) {
List<Subscription> topicMatchingSubscriptions = subscriptions.matchQosSharpening(topic);
if (topicMatchingSubscriptions.isEmpty()) {
Expand All @@ -669,7 +682,15 @@ private RoutingResults publish2Subscribers(ByteBuf payload, Topic topic, MqttQoS

for (final Subscription sub : topicMatchingSubscriptions) {
if (filterTargetClients == NO_FILTER || filterTargetClients.contains(sub.getClientId())) {
collector.add(sub);
if (sub.option().isNoLocal()) {
if (publisherClientId.equals(sub.getClientId())) {
// if noLocal do not publish to the publisher
continue;
}
collector.add(sub);
} else {
collector.add(sub);
}
}
}

Expand Down Expand Up @@ -769,9 +790,9 @@ RoutingResults receivedPublishQos2(MQTTConnection connection, MqttPublishMessage
final RoutingResults publishRoutings;
if (msg.fixedHeader().isDup()) {
final Set<String> failedClients = failedPublishes.listFailed(clientId, messageID);
publishRoutings = publish2Subscribers(payload, topic, EXACTLY_ONCE, failedClients);
publishRoutings = publish2Subscribers(clientId, payload, topic, EXACTLY_ONCE, failedClients);
} else {
publishRoutings = publish2Subscribers(payload, topic, EXACTLY_ONCE);
publishRoutings = publish2Subscribers(clientId, payload, topic, EXACTLY_ONCE);
}
if (publishRoutings.isAllSuccess()) {
// QoS2 PUB message was enqueued successfully to every event loop
Expand Down Expand Up @@ -804,18 +825,16 @@ static MqttQoS lowerQosToTheSubscriptionDesired(Subscription sub, MqttQoS qos) {
* also doesn't notifyTopicPublished because using internally the owner should already know
* where it's publishing.
*
* @param msg
* the message to publish
* @return
* the result of the enqueuing operation to session loops.
* @param msg the message to publish
* @return the result of the enqueuing operation to session loops.
*/
public RoutingResults internalPublish(MqttPublishMessage msg) {
final MqttQoS qos = msg.fixedHeader().qosLevel();
final Topic topic = new Topic(msg.variableHeader().topicName());
final ByteBuf payload = msg.payload();
LOG.info("Sending internal PUBLISH message Topic={}, qos={}", topic, qos);

final RoutingResults publishResult = publish2Subscribers(payload, topic, qos);
final RoutingResults publishResult = publish2Subscribers(INTERNAL_PUBLISHER, payload, topic, qos);
LOG.trace("after routed publishes: {}", publishResult);

if (!msg.fixedHeader().isRetain()) {
Expand Down Expand Up @@ -853,7 +872,7 @@ String sessionLoopThreadName(String clientId) {
/**
* Route the command to the owning SessionEventLoop
* */
public RouteResult routeCommand(String clientId, String actionDescription, Callable<String> action) {
public RouteResult routeCommand(String clientId, String actionDescription, Callable<Void> action) {
return sessionLoops.routeCommand(clientId, actionDescription, action);
}

Expand Down
1 change: 0 additions & 1 deletion broker/src/main/java/io/moquette/broker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@

import static io.moquette.broker.Session.INFINITE_EXPIRY;
import static io.moquette.logging.LoggingUtils.getInterceptorIds;
import static io.netty.handler.codec.mqtt.MqttQoS.EXACTLY_ONCE;

public class Server {

Expand Down
4 changes: 2 additions & 2 deletions broker/src/main/java/io/moquette/broker/SessionCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
final class SessionCommand {

private final String sessionId;
private final Callable<String> action;
private final Callable<Void> action;
private final CompletableFuture<String> task;

public SessionCommand(String sessionId, Callable<String> action) {
public SessionCommand(String sessionId, Callable<Void> action) {
this.sessionId = sessionId;
this.action = action;
this.task = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ String sessionLoopThreadName(String clientId) {
/**
* Route the command to the owning SessionEventLoop
*/
public PostOffice.RouteResult routeCommand(String clientId, String actionDescription, Callable<String> action) {
public PostOffice.RouteResult routeCommand(String clientId, String actionDescription, Callable<Void> action) {
SessionCommand cmd = new SessionCommand(clientId, action);

if (clientId == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
*
* * Copyright (c) 2012-2024 The original author or authors
* * ------------------------------------------------------
* * All rights reserved. This program and the accompanying materials
* * are made available under the terms of the Eclipse Public License v1.0
* * and Apache License v2.0 which accompanies this distribution.
* *
* * The Eclipse Public License is available at
* * http://www.eclipse.org/legal/epl-v10.html
* *
* * The Apache License v2.0 is available at
* * http://www.opensource.org/licenses/apache2.0.php
* *
* * You may elect to redistribute this code under either of these licenses.
*
*/

package io.moquette.integration.mqtt5;

import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode;
import org.eclipse.paho.mqttv5.client.IMqttMessageListener;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.junit.jupiter.api.Test;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class SubscriptionOptionsTest extends AbstractSubscriptionIntegrationTest {

@Override
public String clientName() {
return "client";
}

static class PublishCollector implements IMqttMessageListener {
private final CountDownLatch latch = new CountDownLatch(1);
private String receivedTopic;
private MqttMessage receivedMessage;

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
latch.countDown();
receivedTopic = topic;
receivedMessage = message;
}

public String receivedPayload() {
return new String(receivedMessage.getPayload(), StandardCharsets.UTF_8);
}

public void assertReceivedMessageIn(int time, TimeUnit unit) {
try {
assertTrue(latch.await(time, unit), "Publish is received");
} catch (InterruptedException e) {
fail("Wait for message was interrupted");
}
}

public void assertNotReceivedMessageIn(int time, TimeUnit unit) {
try {
assertFalse(latch.await(time, unit), "Publish MUSTN'T be received");
} catch (InterruptedException e) {
fail("Wait for message was interrupted");
}
}
}

@Test
public void givenSubscriptionWithNoLocalEnabledWhenTopicMatchPublishByItselfThenNoPublishAreSentBackToSubscriber() throws MqttException {
MqttClient client = new MqttClient("tcp://localhost:1883", "subscriber", new MemoryPersistence());
client.connect();
MqttSubscription subscription = new MqttSubscription("/metering/temp", 1);
subscription.setNoLocal(true);

PublishCollector publishCollector = new PublishCollector();
IMqttToken subscribeToken = client.subscribe(new MqttSubscription[]{subscription},
new IMqttMessageListener[] {publishCollector});
verifySubscribedSuccessfully(subscribeToken);

// publish a message on same topic the client subscribed
client.publish("/metering/temp", new MqttMessage("18".getBytes(StandardCharsets.UTF_8), 1, false, null));

// Verify no message is reflected back to the sender
publishCollector.assertNotReceivedMessageIn(2, TimeUnit.SECONDS);
}

@Test
public void givenSubscriptionWithNoLocalDisabledWhenTopicMatchPublishByItselfThenAPublishAreSentBackToSubscriber() throws MqttException {
MqttClient client = new MqttClient("tcp://localhost:1883", "subscriber", new MemoryPersistence());
client.connect();
MqttSubscription subscription = new MqttSubscription("/metering/temp", 1);
// subscription.setNoLocal(false);
PublishCollector publishCollector = new PublishCollector();
IMqttToken subscribeToken = client.subscribe(new MqttSubscription[]{subscription},
new IMqttMessageListener[] {publishCollector});
verifySubscribedSuccessfully(subscribeToken);

// publish a message on same topic the client subscribed
client.publish("/metering/temp", new MqttMessage("18".getBytes(StandardCharsets.UTF_8), 1, false, null));

// Verify the message is also reflected back to the sender
publishCollector.assertReceivedMessageIn(2, TimeUnit.SECONDS);
assertEquals("/metering/temp", publishCollector.receivedTopic);
assertEquals("18", publishCollector.receivedPayload(), "Payload published on topic should match");
assertEquals(MqttQos.AT_LEAST_ONCE.getCode(), publishCollector.receivedMessage.getQos());
}

private static void verifySubscribedSuccessfully(IMqttToken subscribeToken) {
assertEquals(1, subscribeToken.getReasonCodes().length);
assertEquals(Mqtt5SubAckReasonCode.GRANTED_QOS_1.getCode(), subscribeToken.getReasonCodes()[0],
"Client is subscribed to the topic");
}
}

0 comments on commit e0d0609

Please sign in to comment.