diff --git a/ChangeLog.txt b/ChangeLog.txt index 17ac165a6..36c14ee56 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,4 +1,7 @@ Version 0.18-SNAPSHOT: + [feature] subscription option handling: (issue #808) + - Move from qos to subscription option implementing the persistence of SubscriptionOption to/from storage. (#810) + [feature] subscription identifiers: (issue #801) - Implements the validation of subscription identifier properties in SUBSCRIBE. (#803) - Store and retrieve the subscription identifier into the subscriptions directory. (#804) diff --git a/broker/src/main/java/io/moquette/broker/ISubscriptionsRepository.java b/broker/src/main/java/io/moquette/broker/ISubscriptionsRepository.java index cb284492a..3b344facc 100644 --- a/broker/src/main/java/io/moquette/broker/ISubscriptionsRepository.java +++ b/broker/src/main/java/io/moquette/broker/ISubscriptionsRepository.java @@ -17,6 +17,7 @@ import io.moquette.broker.subscriptions.*; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubscriptionOption; import java.util.Collection; import java.util.Set; @@ -42,12 +43,12 @@ public interface ISubscriptionsRepository { /** * Add shared subscription to storage. * */ - void addNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, MqttQoS requestedQoS); + void addNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, MqttSubscriptionOption option); /** * Add shared subscription with subscription identifier to storage. * */ - void addNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, MqttQoS requestedQoS, + void addNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, MqttSubscriptionOption option, SubscriptionIdentifier subscriptionIdentifier); /** diff --git a/broker/src/main/java/io/moquette/broker/PostOffice.java b/broker/src/main/java/io/moquette/broker/PostOffice.java index 6b4adab85..97a5f2dc3 100644 --- a/broker/src/main/java/io/moquette/broker/PostOffice.java +++ b/broker/src/main/java/io/moquette/broker/PostOffice.java @@ -260,20 +260,20 @@ public void wipeExistingScheduledWill(String clientId) { private static final class SharedSubscriptionData { final ShareName name; final Topic topicFilter; - final MqttQoS requestedQoS; + final MqttSubscriptionOption option; - private SharedSubscriptionData(ShareName name, Topic topicFilter, MqttQoS requestedQoS) { + private SharedSubscriptionData(ShareName name, Topic topicFilter, MqttSubscriptionOption option) { Objects.requireNonNull(name); Objects.requireNonNull(topicFilter); - Objects.requireNonNull(requestedQoS); + Objects.requireNonNull(option); this.name = name; this.topicFilter = topicFilter; - this.requestedQoS = requestedQoS; + this.option = option; } static SharedSubscriptionData fromMqttSubscription(MqttTopicSubscription sub) { return new SharedSubscriptionData(new ShareName(SharedSubscriptionUtils.extractShareName(sub.topicName())), - Topic.asTopic(SharedSubscriptionUtils.extractFilterFromShared(sub.topicName())), sub.qualityOfService()); + Topic.asTopic(SharedSubscriptionUtils.extractFilterFromShared(sub.topicName())), sub.option()); } } @@ -284,7 +284,7 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S final Session session = sessionRegistry.retrieve(clientID); final List sharedSubscriptions; - final Optional subscritionIdOpt; + final Optional subscriptionIdOpt; if (mqttConnection.isProtocolVersion5()) { sharedSubscriptions = msg.payload().topicSubscriptions().stream() @@ -303,14 +303,14 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S } try { - subscritionIdOpt = verifyAndExtractMessageIdentifier(msg); + subscriptionIdOpt = verifyAndExtractMessageIdentifier(msg); } catch (IllegalArgumentException ex) { session.disconnectFromBroker(); return; } } else { sharedSubscriptions = Collections.emptyList(); - subscritionIdOpt = Optional.empty(); + subscriptionIdOpt = Optional.empty(); } List ackTopics; @@ -327,28 +327,29 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S .filter(sub -> !SharedSubscriptionUtils.isSharedSubscription(sub.topicName())) .map(sub -> { final Topic topic = new Topic(sub.topicName()); - if (subscritionIdOpt.isPresent()) { - return new Subscription(clientID, topic, sub.qualityOfService(), subscritionIdOpt.get()); + MqttSubscriptionOption option = MqttSubscriptionOption.onlyFromQos(sub.qualityOfService()); + if (subscriptionIdOpt.isPresent()) { + return new Subscription(clientID, topic, option, subscriptionIdOpt.get()); } else { - return new Subscription(clientID, topic, sub.qualityOfService()); + return new Subscription(clientID, topic, option); } }).collect(Collectors.toList()); for (Subscription subscription : newSubscriptions) { - if (subscritionIdOpt.isPresent()) { - subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subscription.getRequestedQos(), - subscritionIdOpt.get()); + if (subscriptionIdOpt.isPresent()) { + subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subscription.option(), + subscriptionIdOpt.get()); } else { - subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subscription.getRequestedQos()); + subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subscription.option()); } } for (SharedSubscriptionData sharedSubData : sharedSubscriptions) { - if (subscritionIdOpt.isPresent()) { - subscriptions.addShared(clientID, sharedSubData.name, sharedSubData.topicFilter, sharedSubData.requestedQoS, - subscritionIdOpt.get()); + if (subscriptionIdOpt.isPresent()) { + subscriptions.addShared(clientID, sharedSubData.name, sharedSubData.topicFilter, sharedSubData.option, + subscriptionIdOpt.get()); } else { - subscriptions.addShared(clientID, sharedSubData.name, sharedSubData.topicFilter, sharedSubData.requestedQoS); + subscriptions.addShared(clientID, sharedSubData.name, sharedSubData.topicFilter, sharedSubData.option); } } @@ -745,8 +746,8 @@ RoutingResults receivedPublishQos2(MQTTConnection connection, MqttPublishMessage } static MqttQoS lowerQosToTheSubscriptionDesired(Subscription sub, MqttQoS qos) { - if (qos.value() > sub.getRequestedQos().value()) { - qos = sub.getRequestedQos(); + if (qos.value() > sub.option().qos().value()) { + qos = sub.option().qos(); } return qos; } diff --git a/broker/src/main/java/io/moquette/broker/Session.java b/broker/src/main/java/io/moquette/broker/Session.java index 860f615ec..ef9dc90f2 100644 --- a/broker/src/main/java/io/moquette/broker/Session.java +++ b/broker/src/main/java/io/moquette/broker/Session.java @@ -157,7 +157,7 @@ public void addSubscriptions(List newSubscriptions) { } public void removeSubscription(Topic topic) { - subscriptions.remove(new Subscription(data.clientId(), topic, MqttQoS.EXACTLY_ONCE)); + subscriptions.remove(new Subscription(data.clientId(), topic, MqttSubscriptionOption.onlyFromQos(MqttQoS.EXACTLY_ONCE))); } public boolean hasWill() { diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java b/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java index db6705359..00c813636 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java @@ -18,6 +18,7 @@ import io.moquette.broker.subscriptions.CTrie.SubscriptionRequest; import io.moquette.broker.subscriptions.CTrie.UnsubscribeRequest; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubscriptionOption; import java.security.SecureRandom; import java.util.ArrayList; @@ -161,7 +162,7 @@ private static boolean needsToUpdateExistingSubscription(Subscription newSubscri ) { // if subscription identifier hasn't changed, // then check QoS but don't lower the requested QoS level - return existing.getRequestedQos().value() < newSubscription.getRequestedQos().value(); + return existing.option().qos().value() < newSubscription.option().qos().value(); } // subscription identifier changed @@ -201,7 +202,7 @@ private boolean containsSharedSubscriptionsForClient(String clientId) { private static SharedSubscription wrapKey(String clientId) { MqttQoS unusedQoS = MqttQoS.AT_MOST_ONCE; - return new SharedSubscription(null, Topic.asTopic("unused"), clientId, unusedQoS); + return new SharedSubscription(null, Topic.asTopic("unused"), clientId, MqttSubscriptionOption.onlyFromQos(unusedQoS)); } //TODO this is equivalent to negate(containsOnly(clientId)) diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java b/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java index 2ed019ed2..5c1740fd1 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java @@ -1,6 +1,6 @@ package io.moquette.broker.subscriptions; -import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubscriptionOption; import java.util.ArrayList; import java.util.Collections; @@ -18,50 +18,49 @@ public final static class SubscriptionRequest { private final Topic topicFilter; private final String clientId; - private final MqttQoS requestedQoS; - + private final MqttSubscriptionOption option; private boolean shared = false; private ShareName shareName; private Optional subscriptionIdOpt; - private SubscriptionRequest(String clientId, Topic topicFilter, MqttQoS requestedQoS, SubscriptionIdentifier subscriptionId) { + private SubscriptionRequest(String clientId, Topic topicFilter, MqttSubscriptionOption option, SubscriptionIdentifier subscriptionId) { this.topicFilter = topicFilter; this.clientId = clientId; - this.requestedQoS = requestedQoS; + this.option = option; this.subscriptionIdOpt = Optional.of(subscriptionId); } - private SubscriptionRequest(String clientId, Topic topicFilter, MqttQoS requestedQoS) { + private SubscriptionRequest(String clientId, Topic topicFilter, MqttSubscriptionOption option) { this.topicFilter = topicFilter; this.clientId = clientId; - this.requestedQoS = requestedQoS; + this.option = option; this.subscriptionIdOpt = Optional.empty(); } public static SubscriptionRequest buildNonShared(Subscription subscription) { - return buildNonShared(subscription.clientId, subscription.topicFilter, subscription.getRequestedQos()); + return buildNonShared(subscription.clientId, subscription.topicFilter, subscription.option()); } - public static SubscriptionRequest buildNonShared(String clientId, Topic topicFilter, MqttQoS requestedQoS) { - return new SubscriptionRequest(clientId, topicFilter, requestedQoS); + public static SubscriptionRequest buildNonShared(String clientId, Topic topicFilter, MqttSubscriptionOption option) { + return new SubscriptionRequest(clientId, topicFilter, option); } - public static SubscriptionRequest buildNonShared(String clientId, Topic topicFilter, MqttQoS requestedQoS, - SubscriptionIdentifier subscriptionId) { + public static SubscriptionRequest buildNonShared(String clientId, Topic topicFilter, + MqttSubscriptionOption option, SubscriptionIdentifier subscriptionId) { Objects.requireNonNull(subscriptionId, "SubscriptionId param can't be null"); - return new SubscriptionRequest(clientId, topicFilter, requestedQoS, subscriptionId); + return new SubscriptionRequest(clientId, topicFilter, option, subscriptionId); } public static SubscriptionRequest buildShared(ShareName shareName, Topic topicFilter, String clientId, - MqttQoS requestedQoS, SubscriptionIdentifier subscriptionId) { + MqttSubscriptionOption option, SubscriptionIdentifier subscriptionId) { Objects.requireNonNull(subscriptionId, "SubscriptionId param can't be null"); return buildSharedHelper(shareName, topicFilter, - () -> new SubscriptionRequest(clientId, topicFilter, requestedQoS, subscriptionId)); + () -> new SubscriptionRequest(clientId, topicFilter, option, subscriptionId)); } - public static SubscriptionRequest buildShared(ShareName shareName, Topic topicFilter, String clientId, MqttQoS requestedQoS) { + public static SubscriptionRequest buildShared(ShareName shareName, Topic topicFilter, String clientId, MqttSubscriptionOption option) { return buildSharedHelper(shareName, topicFilter, - () -> new SubscriptionRequest(clientId, topicFilter, requestedQoS)); + () -> buildNonShared(clientId, topicFilter, option)); } private static SubscriptionRequest buildSharedHelper(ShareName shareName, Topic topicFilter, Supplier instantiator) { @@ -78,20 +77,20 @@ public Topic getTopicFilter() { return topicFilter; } - public MqttQoS getRequestedQoS() { - return requestedQoS; + public MqttSubscriptionOption getOption() { + return option; } public Subscription subscription() { return subscriptionIdOpt - .map(subscriptionIdentifier -> new Subscription(clientId, topicFilter, requestedQoS, subscriptionIdentifier)) - .orElseGet(() -> new Subscription(clientId, topicFilter, requestedQoS)); + .map(subscriptionIdentifier -> new Subscription(clientId, topicFilter, option, subscriptionIdentifier)) + .orElseGet(() -> new Subscription(clientId, topicFilter, option)); } public SharedSubscription sharedSubscription() { return subscriptionIdOpt - .map(subId -> new SharedSubscription(shareName, topicFilter, clientId, requestedQoS, subId)) - .orElseGet(() -> new SharedSubscription(shareName, topicFilter, clientId, requestedQoS)); + .map(subId -> new SharedSubscription(shareName, topicFilter, clientId, option, subId)) + .orElseGet(() -> new SharedSubscription(shareName, topicFilter, clientId, option)); } public boolean isShared() { diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java b/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java index bbd1be347..ca66754ed 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java @@ -19,6 +19,7 @@ import io.moquette.broker.subscriptions.CTrie.SubscriptionRequest; import io.moquette.broker.subscriptions.CTrie.UnsubscribeRequest; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubscriptionOption; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +60,7 @@ public void init(ISubscriptionsRepository subscriptionsRepository) { for (SharedSubscription shared : subscriptionsRepository.listAllSharedSubscription()) { LOG.debug("Re-subscribing shared {}", shared); ctrie.addToTree(SubscriptionRequest.buildShared(shared.getShareName(), shared.topicFilter(), - shared.clientId(), shared.requestedQoS())); + shared.clientId(), MqttSubscriptionOption.onlyFromQos(shared.requestedQoS()))); } if (LOG.isTraceEnabled()) { @@ -101,14 +102,14 @@ public List matchQosSharpening(Topic topicName) { } @Override - public void add(String clientId, Topic filter, MqttQoS requestedQoS) { - SubscriptionRequest subRequest = SubscriptionRequest.buildNonShared(clientId, filter, requestedQoS); + public void add(String clientId, Topic filter, MqttSubscriptionOption option) { + SubscriptionRequest subRequest = SubscriptionRequest.buildNonShared(clientId, filter, option); addNonSharedSubscriptionRequest(subRequest); } @Override - public void add(String clientId, Topic filter, MqttQoS requestedQoS, SubscriptionIdentifier subscriptionId) { - SubscriptionRequest subRequest = SubscriptionRequest.buildNonShared(clientId, filter, requestedQoS, subscriptionId); + public void add(String clientId, Topic filter, MqttSubscriptionOption option, SubscriptionIdentifier subscriptionId) { + SubscriptionRequest subRequest = SubscriptionRequest.buildNonShared(clientId, filter, option, subscriptionId); addNonSharedSubscriptionRequest(subRequest); } @@ -118,8 +119,8 @@ private void addNonSharedSubscriptionRequest(SubscriptionRequest subRequest) { } @Override - public void addShared(String clientId, ShareName name, Topic topicFilter, MqttQoS requestedQoS) { - SubscriptionRequest shareSubRequest = SubscriptionRequest.buildShared(name, topicFilter, clientId, requestedQoS); + public void addShared(String clientId, ShareName name, Topic topicFilter, MqttSubscriptionOption option) { + SubscriptionRequest shareSubRequest = SubscriptionRequest.buildShared(name, topicFilter, clientId, option); addSharedSubscriptionRequest(shareSubRequest); } @@ -128,19 +129,19 @@ private void addSharedSubscriptionRequest(SubscriptionRequest shareSubRequest) { if (shareSubRequest.hasSubscriptionIdentifier()) { subscriptionsRepository.addNewSharedSubscription(shareSubRequest.getClientId(), shareSubRequest.getSharedName(), - shareSubRequest.getTopicFilter(), shareSubRequest.getRequestedQoS(), shareSubRequest.getSubscriptionIdentifier()); + shareSubRequest.getTopicFilter(), shareSubRequest.getOption(), shareSubRequest.getSubscriptionIdentifier()); } else { subscriptionsRepository.addNewSharedSubscription(shareSubRequest.getClientId(), shareSubRequest.getSharedName(), - shareSubRequest.getTopicFilter(), shareSubRequest.getRequestedQoS()); + shareSubRequest.getTopicFilter(), shareSubRequest.getOption()); } List sharedSubscriptions = clientSharedSubscriptions.computeIfAbsent(shareSubRequest.getClientId(), unused -> new ArrayList<>()); sharedSubscriptions.add(shareSubRequest.sharedSubscription()); } @Override - public void addShared(String clientId, ShareName name, Topic topicFilter, MqttQoS requestedQoS, + public void addShared(String clientId, ShareName name, Topic topicFilter, MqttSubscriptionOption option, SubscriptionIdentifier subscriptionId) { - SubscriptionRequest shareSubRequest = SubscriptionRequest.buildShared(name, topicFilter, clientId, requestedQoS, subscriptionId); + SubscriptionRequest shareSubRequest = SubscriptionRequest.buildShared(name, topicFilter, clientId, option, subscriptionId); addSharedSubscriptionRequest(shareSubRequest); } @@ -165,7 +166,7 @@ public void removeSharedSubscription(ShareName name, Topic topicFilter, String c subscriptionsRepository.removeSharedSubscription(clientId, name, topicFilter); - SharedSubscription sharedSubscription = new SharedSubscription(name, topicFilter, clientId, MqttQoS.AT_MOST_ONCE /* UNUSED in compare */); + SharedSubscription sharedSubscription = new SharedSubscription(name, topicFilter, clientId, MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE) /* UNUSED in compare */); List sharedSubscriptions = clientSharedSubscriptions.get(clientId); if (sharedSubscriptions != null && !sharedSubscriptions.isEmpty()) { sharedSubscriptions.remove(sharedSubscription); diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java b/broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java index aa6a689c8..3647ee0b8 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java @@ -39,7 +39,7 @@ private String prettySubscriptions(CNode node) { for (Subscription couple : node.subscriptions()) { subScriptionsStr .append("{filter=").append(couple.topicFilter).append(", ") - .append("qos=").append(couple.getRequestedQos()).append(", ") + .append("option=").append(couple.option()).append(", ") .append("client='").append(couple.clientId).append("'}"); counter++; if (counter < node.subscriptions().size()) { diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java b/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java index f778a060d..620e74f52 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java @@ -17,6 +17,7 @@ import io.moquette.broker.ISubscriptionsRepository; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubscriptionOption; import java.util.List; import java.util.Set; @@ -32,13 +33,13 @@ public interface ISubscriptionsDirectory { List matchQosSharpening(Topic topic); - void add(String clientId, Topic filter, MqttQoS requestedQoS); + void add(String clientId, Topic filter, MqttSubscriptionOption option); - void add(String clientId, Topic filter, MqttQoS requestedQoS, SubscriptionIdentifier subscriptionId); + void add(String clientId, Topic filter, MqttSubscriptionOption option, SubscriptionIdentifier subscriptionId); - void addShared(String clientId, ShareName name, Topic topicFilter, MqttQoS requestedQoS); + void addShared(String clientId, ShareName name, Topic topicFilter, MqttSubscriptionOption option); - void addShared(String clientId, ShareName name, Topic topicFilter, MqttQoS requestedQoS, SubscriptionIdentifier subscriptionId); + void addShared(String clientId, ShareName name, Topic topicFilter, MqttSubscriptionOption option, SubscriptionIdentifier subscriptionId); void removeSubscription(Topic topic, String clientID); diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/SharedSubscription.java b/broker/src/main/java/io/moquette/broker/subscriptions/SharedSubscription.java index 4ad857451..d1912f27e 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/SharedSubscription.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/SharedSubscription.java @@ -16,6 +16,7 @@ package io.moquette.broker.subscriptions; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubscriptionOption; import java.util.Objects; import java.util.Optional; @@ -27,25 +28,25 @@ public final class SharedSubscription implements Comparable private final ShareName shareName; private final Topic topicFilter; private final String clientId; - private final MqttQoS requestedQoS; + private final MqttSubscriptionOption option; private final Optional subscriptionId; - public SharedSubscription(ShareName shareName, Topic topicFilter, String clientId, MqttQoS requestedQoS) { - Objects.requireNonNull(requestedQoS, "qos parameter can't be null"); + public SharedSubscription(ShareName shareName, Topic topicFilter, String clientId, MqttSubscriptionOption option) { + Objects.requireNonNull(option, "option parameter can't be null"); this.shareName = shareName; this.topicFilter = topicFilter; this.clientId = clientId; - this.requestedQoS = requestedQoS; + this.option = option; this.subscriptionId = Optional.empty(); } - public SharedSubscription(ShareName shareName, Topic topicFilter, String clientId, MqttQoS requestedQoS, - SubscriptionIdentifier subscriptionId) { - Objects.requireNonNull(requestedQoS, "qos parameter can't be null"); + public SharedSubscription(ShareName shareName, Topic topicFilter, String clientId, + MqttSubscriptionOption option, SubscriptionIdentifier subscriptionId) { + Objects.requireNonNull(option, "option parameter can't be null"); this.shareName = shareName; this.topicFilter = topicFilter; this.clientId = clientId; - this.requestedQoS = requestedQoS; + this.option = option; this.subscriptionId = Optional.of(subscriptionId); } @@ -58,7 +59,11 @@ public Topic topicFilter() { } public MqttQoS requestedQoS() { - return requestedQoS; + return option.qos(); + } + + public MqttSubscriptionOption getOption() { + return option; } public ShareName getShareName() { @@ -70,9 +75,9 @@ public ShareName getShareName() { * */ Subscription createSubscription() { if (subscriptionId.isPresent()) { - return new Subscription(clientId, topicFilter, requestedQoS, shareName.getShareName(), subscriptionId.get()); + return new Subscription(clientId, topicFilter, option, shareName.getShareName(), subscriptionId.get()); } else { - return new Subscription(clientId, topicFilter, requestedQoS, shareName.getShareName()); + return new Subscription(clientId, topicFilter, option, shareName.getShareName()); } } @@ -97,11 +102,11 @@ public boolean equals(Object o) { return Objects.equals(shareName, that.shareName) && Objects.equals(topicFilter, that.topicFilter) && Objects.equals(clientId, that.clientId) && - Objects.equals(requestedQoS, that.requestedQoS); + Objects.equals(option, that.option); } @Override public int hashCode() { - return Objects.hash(shareName, topicFilter, clientId, requestedQoS); + return Objects.hash(shareName, topicFilter, clientId, option); } } diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/Subscription.java b/broker/src/main/java/io/moquette/broker/subscriptions/Subscription.java index 02ed44a48..35ecc2b21 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/Subscription.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/Subscription.java @@ -17,6 +17,7 @@ package io.moquette.broker.subscriptions; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubscriptionOption; import java.io.Serializable; import java.util.Objects; @@ -25,59 +26,55 @@ /** * Maintain the information about which Topic a certain ClientID is subscribed and at which QoS */ -public final class Subscription implements Serializable, Comparable { +public final class Subscription implements Serializable, Comparable{ private static final long serialVersionUID = -3383457629635732794L; - private final MqttQoS requestedQos; // max QoS acceptable + private final MqttSubscriptionOption option; final String clientId; final Topic topicFilter; final String shareName; private final Optional subscriptionId; - public Subscription(String clientId, Topic topicFilter, MqttQoS requestedQos) { - this(clientId, topicFilter, requestedQos, "", null); + public Subscription(String clientId, Topic topicFilter, MqttSubscriptionOption options) { + this(clientId, topicFilter, options, ""); } - public Subscription(String clientId, Topic topicFilter, MqttQoS requestedQos, SubscriptionIdentifier subscriptionId) { - this(clientId, topicFilter, requestedQos, "", subscriptionId); + public Subscription(String clientId, Topic topicFilter, MqttSubscriptionOption options, SubscriptionIdentifier subscriptionId) { + this(clientId, topicFilter, options, "", subscriptionId); } - public Subscription(String clientId, Topic topicFilter, MqttQoS requestedQos, String shareName) { - this(clientId, topicFilter, requestedQos, shareName, null); + public Subscription(String clientId, Topic topicFilter, MqttSubscriptionOption options, String shareName) { + this(clientId, topicFilter, options, shareName, null); } - public Subscription(String clientId, Topic topicFilter, MqttQoS requestedQos, String shareName, + public Subscription(String clientId, Topic topicFilter, MqttSubscriptionOption options, String shareName, SubscriptionIdentifier subscriptionId) { - this.requestedQos = requestedQos; this.clientId = clientId; this.topicFilter = topicFilter; this.shareName = shareName; this.subscriptionId = Optional.ofNullable(subscriptionId); + this.option = options; } public Subscription(Subscription orig) { - this.requestedQos = orig.requestedQos; this.clientId = orig.clientId; this.topicFilter = orig.topicFilter; this.shareName = orig.shareName; this.subscriptionId = orig.subscriptionId; + this.option = orig.option; } public String getClientId() { return clientId; } - public MqttQoS getRequestedQos() { - return requestedQos; - } - public Topic getTopicFilter() { return topicFilter; } public boolean qosLessThan(Subscription sub) { - return requestedQos.value() < sub.requestedQos.value(); + return option.qos().value() < sub.option.qos().value(); } public boolean hasSubscriptionIdentifier() { @@ -105,7 +102,7 @@ public int hashCode() { @Override public String toString() { - return String.format("[filter:%s, clientID: %s, qos: %s - shareName: %s]", topicFilter, clientId, requestedQos, shareName); + return String.format("[filter:%s, clientID: %s, options: %s - shareName: %s]", topicFilter, clientId, option, shareName); } @Override @@ -142,4 +139,8 @@ public boolean hasShareName() { public String getShareName() { return shareName; } + + public MqttSubscriptionOption option() { + return option; + } } diff --git a/broker/src/main/java/io/moquette/interception/messages/InterceptSubscribeMessage.java b/broker/src/main/java/io/moquette/interception/messages/InterceptSubscribeMessage.java index a3463d0a3..14cfb4b45 100644 --- a/broker/src/main/java/io/moquette/interception/messages/InterceptSubscribeMessage.java +++ b/broker/src/main/java/io/moquette/interception/messages/InterceptSubscribeMessage.java @@ -18,6 +18,7 @@ import io.moquette.broker.subscriptions.Subscription; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubscriptionOption; public class InterceptSubscribeMessage implements InterceptMessage { @@ -34,7 +35,11 @@ public String getClientID() { } public MqttQoS getRequestedQos() { - return subscription.getRequestedQos(); + return subscription.option().qos(); + } + + public MqttSubscriptionOption getOption() { + return subscription.option(); } public String getTopicFilter() { diff --git a/broker/src/main/java/io/moquette/persistence/H2SubscriptionsRepository.java b/broker/src/main/java/io/moquette/persistence/H2SubscriptionsRepository.java index 85c50002f..d94987bb3 100644 --- a/broker/src/main/java/io/moquette/persistence/H2SubscriptionsRepository.java +++ b/broker/src/main/java/io/moquette/persistence/H2SubscriptionsRepository.java @@ -7,6 +7,7 @@ import io.moquette.broker.subscriptions.SubscriptionIdentifier; import io.moquette.broker.subscriptions.Topic; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubscriptionOption; import org.h2.mvstore.Cursor; import org.h2.mvstore.MVMap; import org.h2.mvstore.MVStore; @@ -32,7 +33,7 @@ public class H2SubscriptionsRepository implements ISubscriptionsRepository { private static final String SUBSCRIPTIONS_MAP = "subscriptions"; private static final String SHARED_SUBSCRIPTIONS_MAP = "shared_subscriptions"; private final MVStore mvStore; - private final MVMap.Builder, QoSSubscriptionId> submapBuilder; + private final MVMap.Builder, SubscriptionOptionAndId> submapBuilder; private MVMap subscriptions; // clientId -> shared subscription map name @@ -43,8 +44,9 @@ public class H2SubscriptionsRepository implements ISubscriptionsRepository { H2SubscriptionsRepository(MVStore mvStore) { this.mvStore = mvStore; - submapBuilder = new MVMap.Builder, QoSSubscriptionId>() - .keyType(new CoupleValueType()); + submapBuilder = new MVMap.Builder, SubscriptionOptionAndId>() + .keyType(new CoupleValueType()) + .valueType(new SubscriptionOptionAndIdValueType()); this.subscriptions = mvStore.openMap(SUBSCRIPTIONS_MAP, subscriptionBuilder); sharedSubscriptions = mvStore.openMap(SHARED_SUBSCRIPTIONS_MAP); @@ -96,7 +98,7 @@ public void removeSharedSubscription(String clientId, ShareName share, Topic top LOG.info("Removing a non existing shared subscription for client: {}", clientId); return; } - MVMap, QoSSubscriptionId> subMap = mvStore.openMap(sharedSubsMapName, submapBuilder); + MVMap, SubscriptionOptionAndId> subMap = mvStore.openMap(sharedSubsMapName, submapBuilder); Couple sharedSubKey = Couple.of(share, topicFilter); // remove from submap, null means the key didn't exist @@ -111,40 +113,40 @@ public void removeSharedSubscription(String clientId, ShareName share, Topic top } } - private static class QoSSubscriptionId implements Serializable { - final MqttQoS qos; + private static class SubscriptionOptionAndId implements Serializable { + final MqttSubscriptionOption option; final Integer subscriptionIdentifier; - public QoSSubscriptionId(MqttQoS qos, int subscriptionIdentifier) { - this.qos = qos; + public SubscriptionOptionAndId(MqttSubscriptionOption option, int subscriptionIdentifier) { + this.option = option; this.subscriptionIdentifier = subscriptionIdentifier; } - public QoSSubscriptionId(MqttQoS qos) { - this.qos = qos; + public SubscriptionOptionAndId(MqttSubscriptionOption option) { + this.option = option; this.subscriptionIdentifier = null; } } @Override - public void addNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, MqttQoS requestedQoS) { - QoSSubscriptionId qosPart = new QoSSubscriptionId(requestedQoS); + public void addNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, MqttSubscriptionOption option) { + SubscriptionOptionAndId qosPart = new SubscriptionOptionAndId(option); storeNewSharedSubscription(clientId, share, topicFilter, qosPart); } - private void storeNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, QoSSubscriptionId value) { + private void storeNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, SubscriptionOptionAndId value) { String sharedSubsMapName = sharedSubscriptions.computeIfAbsent(clientId, H2SubscriptionsRepository::computeShareSubscriptionSubMap); // maps the couple (share name, topic) to requested qos - MVMap, QoSSubscriptionId> subMap = mvStore.openMap(sharedSubsMapName, submapBuilder); + MVMap, SubscriptionOptionAndId> subMap = mvStore.openMap(sharedSubsMapName, submapBuilder); subMap.put(Couple.of(share, topicFilter), value); } @Override - public void addNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, MqttQoS requestedQoS, + public void addNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, MqttSubscriptionOption option, SubscriptionIdentifier subscriptionIdentifier) { - QoSSubscriptionId qosAndSubscriptionIdPart = new QoSSubscriptionId(requestedQoS, subscriptionIdentifier.value()); + SubscriptionOptionAndId qosAndSubscriptionIdPart = new SubscriptionOptionAndId(option, subscriptionIdentifier.value()); storeNewSharedSubscription(clientId, share, topicFilter, qosAndSubscriptionIdPart); } @@ -156,19 +158,19 @@ public Collection listAllSharedSubscription() { String clientId = entry.getKey(); String sharedSubsMapName = entry.getValue(); - MVMap, QoSSubscriptionId> subMap = mvStore.openMap(sharedSubsMapName, submapBuilder); - for (Map.Entry, QoSSubscriptionId> subEntry : subMap.entrySet()) { + MVMap, SubscriptionOptionAndId> subMap = mvStore.openMap(sharedSubsMapName, submapBuilder); + for (Map.Entry, SubscriptionOptionAndId> subEntry : subMap.entrySet()) { final ShareName shareName = subEntry.getKey().v1; final Topic topicFilter = subEntry.getKey().v2; - final MqttQoS qos = subEntry.getValue().qos; + final MqttSubscriptionOption option = subEntry.getValue().option; SharedSubscription subscription; if (subEntry.getValue().subscriptionIdentifier == null) { // without subscription identifier - subscription = new SharedSubscription(shareName, topicFilter, clientId, qos); + subscription = new SharedSubscription(shareName, topicFilter, clientId, option); } else { // with subscription identifier SubscriptionIdentifier subscriptionId = new SubscriptionIdentifier(subEntry.getValue().subscriptionIdentifier); - subscription = new SharedSubscription(shareName, topicFilter, clientId, qos, subscriptionId); + subscription = new SharedSubscription(shareName, topicFilter, clientId, option, subscriptionId); } result.add(subscription); } @@ -217,6 +219,80 @@ public Couple[] createStorage(int i) { } } + private static final class SubscriptionOptionAndIdValueType extends BasicDataType { + + @Override + public int getMemory(SubscriptionOptionAndId obj) { + return 4 + // integer, subscription identifier + SubscriptionOptionValueType.INSTANCE.getMemory(obj.option); + } + + @Override + public void write(WriteBuffer buff, SubscriptionOptionAndId obj) { + if (obj.subscriptionIdentifier != null) { + buff.putInt(obj.subscriptionIdentifier.intValue()); + } else { + buff.putInt(-1); + } + SubscriptionOptionValueType.INSTANCE.write(buff, obj.option); + } + + @Override + public SubscriptionOptionAndId read(ByteBuffer buff) { + int subId = buff.getInt(); + MqttSubscriptionOption option = SubscriptionOptionValueType.INSTANCE.read(buff); + if (subId != -1) { + return new SubscriptionOptionAndId(option, subId); + } else { + return new SubscriptionOptionAndId(option); + } + } + + @Override + public SubscriptionOptionAndId[] createStorage(int size) { + return new SubscriptionOptionAndId[size]; + } + } + + private static final class SubscriptionOptionValueType extends BasicDataType { + public static final SubscriptionOptionValueType INSTANCE = new SubscriptionOptionValueType(); + + @Override + public int getMemory(MqttSubscriptionOption obj) { + return 1; + } + + @Override + public void write(WriteBuffer buff, MqttSubscriptionOption opt) { + // 2 bits for QoS (LSB) + // 1 flag for no local + // 1 flag for retains as published + // 2 bits for retains handling policy (MSB) + byte composed = (byte) (opt.qos().value() & 0x03); // qos + composed = (byte) (composed | ((byte)(opt.isNoLocal() ? 1 : 0) << 2)); // no local + composed = (byte) (composed | ((byte)(opt.isRetainAsPublished() ? 1 : 0) << 3)); // retains as published + composed = (byte) (composed | (byte) (opt.retainHandling().value() << 4)); + buff.put(composed); + } + + @Override + public MqttSubscriptionOption read(ByteBuffer buff) { + byte fields = buff.get(); + final MqttQoS qos = MqttQoS.valueOf(fields & 0x03); + final boolean noLocal = (fields & 0x04) > 0; + final boolean retainAsPublished = (fields & 0x08) > 0; + final MqttSubscriptionOption.RetainedHandlingPolicy retainedHandlingPolicy = + MqttSubscriptionOption.RetainedHandlingPolicy.valueOf((fields & 0x30) >> 4); + + return new MqttSubscriptionOption(qos, noLocal, retainAsPublished, retainedHandlingPolicy); + } + + @Override + public MqttSubscriptionOption[] createStorage(int size) { + return new MqttSubscriptionOption[size]; + } + } + private static final class SubscriptionValueType extends BasicDataType { @@ -224,7 +300,7 @@ private static final class SubscriptionValueType extends BasicDataType 0; boolean hasSubscriptionIdentifier = (flag & (byte) 0x2) > 0; @@ -259,16 +335,16 @@ public Subscription read(ByteBuffer buff) { String shareName = StringDataType.INSTANCE.read(buff); if (hasSubscriptionIdentifier) { SubscriptionIdentifier subId = new SubscriptionIdentifier(buff.getInt()); - return new Subscription(clientId, Topic.asTopic(topicFilter),requestedQoS, shareName, subId); + return new Subscription(clientId, Topic.asTopic(topicFilter), options, shareName, subId); } else { - return new Subscription(clientId, Topic.asTopic(topicFilter),requestedQoS, shareName); + return new Subscription(clientId, Topic.asTopic(topicFilter), options, shareName); } } else { if (hasSubscriptionIdentifier) { SubscriptionIdentifier subId = new SubscriptionIdentifier(buff.getInt()); - return new Subscription(clientId, Topic.asTopic(topicFilter),requestedQoS, subId); + return new Subscription(clientId, Topic.asTopic(topicFilter), options, subId); } else { - return new Subscription(clientId, Topic.asTopic(topicFilter),requestedQoS); + return new Subscription(clientId, Topic.asTopic(topicFilter), options); } } } diff --git a/broker/src/main/java/io/moquette/persistence/MemorySubscriptionsRepository.java b/broker/src/main/java/io/moquette/persistence/MemorySubscriptionsRepository.java index af42abc40..25d6c63bc 100644 --- a/broker/src/main/java/io/moquette/persistence/MemorySubscriptionsRepository.java +++ b/broker/src/main/java/io/moquette/persistence/MemorySubscriptionsRepository.java @@ -22,6 +22,7 @@ import io.moquette.broker.subscriptions.SubscriptionIdentifier; import io.moquette.broker.subscriptions.Topic; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubscriptionOption; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,8 +79,8 @@ public void removeSharedSubscription(String clientId, ShareName share, Topic top } @Override - public void addNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, MqttQoS requestedQoS) { - SharedSubscription sharedSub = new SharedSubscription(share, topicFilter, clientId, requestedQoS); + public void addNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, MqttSubscriptionOption option) { + SharedSubscription sharedSub = new SharedSubscription(share, topicFilter, clientId, option); storeNewSharedSubscription(clientId, share, topicFilter, sharedSub); } @@ -89,9 +90,9 @@ private void storeNewSharedSubscription(String clientId, ShareName share, Topic } @Override - public void addNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, MqttQoS requestedQoS, + public void addNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, MqttSubscriptionOption option, SubscriptionIdentifier subscriptionIdentifier) { - SharedSubscription sharedSub = new SharedSubscription(share, topicFilter, clientId, requestedQoS, subscriptionIdentifier); + SharedSubscription sharedSub = new SharedSubscription(share, topicFilter, clientId, option, subscriptionIdentifier); storeNewSharedSubscription(clientId, share, topicFilter, sharedSub); } diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java index 5b622219c..5c285ad14 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java @@ -29,7 +29,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -337,7 +336,7 @@ protected void subscribe(MQTTConnection connection, String topic, MqttQoS desire assertEquals(desiredQos.value(), (int) subAck.payload().grantedQoSLevels().get(0)); final String clientId = connection.getClientId(); - Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), desiredQos); + Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), MqttSubscriptionOption.onlyFromQos(desiredQos)); final List matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); assertEquals(1, matchedSubscriptions.size()); diff --git a/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java b/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java index 074d804c0..338d504ee 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java @@ -195,7 +195,7 @@ protected void subscribe(MQTTConnection connection, String topic, MqttQoS desire assertEquals(desiredQos.value(), (int) subAck.payload().grantedQoSLevels().get(0)); final String clientId = connection.getClientId(); - Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), desiredQos); + Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), MqttSubscriptionOption.onlyFromQos(desiredQos)); final List matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); assertEquals(1, matchedSubscriptions.size()); diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java index fd9963e33..53d2aa39f 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java @@ -143,7 +143,7 @@ protected void subscribe(EmbeddedChannel channel, String topic, MqttQoS desiredQ assertEquals(desiredQos.value(), (int) subAck.payload().grantedQoSLevels().get(0)); final String clientId = NettyUtils.clientID(channel); - Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), desiredQos); + Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), MqttSubscriptionOption.onlyFromQos(desiredQos)); final List matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); assertEquals(1, matchedSubscriptions.size()); @@ -163,7 +163,7 @@ protected void subscribe(MQTTConnection connection, String topic, MqttQoS desire assertEquals(desiredQos.value(), (int) subAck.payload().grantedQoSLevels().get(0)); final String clientId = connection.getClientId(); - Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), desiredQos); + Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), MqttSubscriptionOption.onlyFromQos(desiredQos)); final List matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); assertEquals(1, matchedSubscriptions.size()); @@ -337,10 +337,10 @@ public void testReceiveRetainedPublishRespectingSubscriptionQoSAndNotPublisher() @Test public void testLowerTheQosToTheRequestedBySubscription() { - Subscription subQos1 = new Subscription("Sub A", new Topic("a/b"), MqttQoS.AT_LEAST_ONCE); + Subscription subQos1 = new Subscription("Sub A", new Topic("a/b"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_LEAST_ONCE)); assertEquals(MqttQoS.AT_LEAST_ONCE, PostOffice.lowerQosToTheSubscriptionDesired(subQos1, EXACTLY_ONCE)); - Subscription subQos2 = new Subscription("Sub B", new Topic("a/+"), EXACTLY_ONCE); + Subscription subQos2 = new Subscription("Sub B", new Topic("a/+"), MqttSubscriptionOption.onlyFromQos(EXACTLY_ONCE)); assertEquals(EXACTLY_ONCE, PostOffice.lowerQosToTheSubscriptionDesired(subQos2, EXACTLY_ONCE)); } diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java index 22acf885b..ca5d4148d 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java @@ -33,7 +33,6 @@ import java.nio.charset.Charset; import java.util.Collections; -import java.util.Set; import java.util.concurrent.*; import static io.moquette.broker.MQTTConnectionPublishTest.memorySessionsRepository; @@ -123,7 +122,7 @@ protected void subscribe(MQTTConnection connection, String topic, MqttQoS desire assertEquals(desiredQos.value(), (int) subAck.payload().grantedQoSLevels().get(0)); final String clientId = connection.getClientId(); - Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), desiredQos); + Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), MqttSubscriptionOption.onlyFromQos(desiredQos)); final List matchedSubscriptions = subscriptions.matchQosSharpening(new Topic(topic)); assertEquals(1, matchedSubscriptions.size()); diff --git a/broker/src/test/java/io/moquette/broker/SessionTest.java b/broker/src/test/java/io/moquette/broker/SessionTest.java index 22f9f272f..31b158935 100644 --- a/broker/src/test/java/io/moquette/broker/SessionTest.java +++ b/broker/src/test/java/io/moquette/broker/SessionTest.java @@ -6,6 +6,7 @@ import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubscriptionOption; import io.netty.handler.codec.mqtt.MqttVersion; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -21,7 +22,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static io.moquette.BrokerConstants.NO_BUFFER_FLUSH; -import static org.junit.jupiter.api.Assertions.*; public class SessionTest { @@ -115,9 +115,9 @@ public void testSecondResendOfANotAckedMessage() throws InterruptedException { @Test public void testRemoveSubscription() { - client.addSubscriptions(Arrays.asList(new Subscription(CLIENT_ID, new Topic("topic/one"), MqttQoS.AT_MOST_ONCE))); + client.addSubscriptions(Arrays.asList(new Subscription(CLIENT_ID, new Topic("topic/one"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)))); Assertions.assertThat(client.getSubscriptions()).hasSize(1); - client.addSubscriptions(Arrays.asList(new Subscription(CLIENT_ID, new Topic("topic/one"), MqttQoS.EXACTLY_ONCE))); + client.addSubscriptions(Arrays.asList(new Subscription(CLIENT_ID, new Topic("topic/one"), MqttSubscriptionOption.onlyFromQos(MqttQoS.EXACTLY_ONCE)))); Assertions.assertThat(client.getSubscriptions()).hasSize(1); client.removeSubscription(new Topic("topic/one")); Assertions.assertThat(client.getSubscriptions()).isEmpty(); diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSharedSubscriptionDirectoryMatchingTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSharedSubscriptionDirectoryMatchingTest.java index 3c9a8eb0b..cb83fcb6f 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSharedSubscriptionDirectoryMatchingTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSharedSubscriptionDirectoryMatchingTest.java @@ -16,6 +16,7 @@ package io.moquette.broker.subscriptions; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubscriptionOption; import org.junit.jupiter.api.Test; import java.util.List; import static io.moquette.broker.subscriptions.Topic.asTopic; @@ -27,18 +28,22 @@ public class CTrieSharedSubscriptionDirectoryMatchingTest extends CTrieSubscriptionDirectMatchingCommon { + static MqttSubscriptionOption asOption(MqttQoS qos) { + return MqttSubscriptionOption.onlyFromQos(qos); + } + @Test public void whenNotMatchingSharedTopicThenNoSubscriptionShouldBeSelected() { - sut.addShared("TempSensor1", new ShareName("temp_sensors"), asTopic("/"), MqttQoS.AT_MOST_ONCE); + sut.addShared("TempSensor1", new ShareName("temp_sensors"), asTopic("/"), asOption(MqttQoS.AT_MOST_ONCE)); assertThat(sut.matchWithoutQosSharpening(asTopic("livingroom"))).isEmpty(); - sut.addShared("TempSensor1", new ShareName("temp_sensors"), asTopic("/livingroom"), MqttQoS.AT_MOST_ONCE); + sut.addShared("TempSensor1", new ShareName("temp_sensors"), asTopic("/livingroom"), asOption(MqttQoS.AT_MOST_ONCE)); assertThat(sut.matchWithoutQosSharpening(asTopic("livingroom"))).isEmpty(); } @Test public void whenMatchingSharedTopicThenOneSubscriptionShouldBeSelected() { - sut.addShared("TempSensor1", new ShareName("temp_sensors"), asTopic("/livingroom"), MqttQoS.AT_MOST_ONCE); + sut.addShared("TempSensor1", new ShareName("temp_sensors"), asTopic("/livingroom"), asOption(MqttQoS.AT_MOST_ONCE)); assertThat(sut.matchWithoutQosSharpening(asTopic("/livingroom"))) .contains(SubscriptionTestUtils.asSubscription("TempSensor1", "/livingroom", "temp_sensors")); @@ -46,8 +51,8 @@ public void whenMatchingSharedTopicThenOneSubscriptionShouldBeSelected() { @Test public void whenManySharedSubscriptionsOfDifferentShareNameMatchATopicThenOneSubscriptionForEachShareNameMustBeSelected() { - sut.addShared("TempSensor1", new ShareName("temp_sensors"), asTopic("/livingroom"), MqttQoS.AT_MOST_ONCE); - sut.addShared("TempSensor1", new ShareName("livingroom_devices"), asTopic("/livingroom"), MqttQoS.AT_MOST_ONCE); + sut.addShared("TempSensor1", new ShareName("temp_sensors"), asTopic("/livingroom"), asOption(MqttQoS.AT_MOST_ONCE)); + sut.addShared("TempSensor1", new ShareName("livingroom_devices"), asTopic("/livingroom"), asOption(MqttQoS.AT_MOST_ONCE)); List matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("/livingroom")); assertThat(matchingSubscriptions) @@ -59,8 +64,8 @@ public void whenManySharedSubscriptionsOfDifferentShareNameMatchATopicThenOneSub @Test public void givenSessionHasMultipleSharedSubscriptionWhenTheClientIsRemovedThenNoMatchingShouldHappen() { String clientId = "TempSensor1"; - sut.addShared(clientId, new ShareName("temp_sensors"), asTopic("/livingroom"), MqttQoS.AT_MOST_ONCE); - sut.addShared(clientId, new ShareName("livingroom_devices"), asTopic("/livingroom"), MqttQoS.AT_MOST_ONCE); + sut.addShared(clientId, new ShareName("temp_sensors"), asTopic("/livingroom"), asOption(MqttQoS.AT_MOST_ONCE)); + sut.addShared(clientId, new ShareName("livingroom_devices"), asTopic("/livingroom"), asOption(MqttQoS.AT_MOST_ONCE)); // Exercise sut.removeSharedSubscriptionsForClient(clientId); @@ -73,7 +78,7 @@ public void givenSessionHasMultipleSharedSubscriptionWhenTheClientIsRemovedThenN @Test public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionIsProcessedThenSubscriptionIdIsUpdated() { // subscribe a client on topic with subscription identifier - sut.addShared("client", new ShareName("share_temp"), asTopic("client/test/b"), MqttQoS.AT_MOST_ONCE, + sut.addShared("client", new ShareName("share_temp"), asTopic("client/test/b"), asOption(MqttQoS.AT_MOST_ONCE), new SubscriptionIdentifier(1)); // verify it contains the subscription identifier @@ -81,7 +86,7 @@ public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionIsProcessedThe verifySubscriptionIdentifierIsPresent(matchingSubscriptions, new SubscriptionIdentifier(1), "share_temp"); // update the subscription of same clientId on same topic filter but with different subscription identifier - sut.addShared("client", new ShareName("share_temp"), asTopic("client/test/b"), MqttQoS.AT_MOST_ONCE, + sut.addShared("client", new ShareName("share_temp"), asTopic("client/test/b"), asOption(MqttQoS.AT_MOST_ONCE), new SubscriptionIdentifier(123)); // verify the subscription identifier is updated @@ -101,7 +106,7 @@ private static void verifySubscriptionIdentifierIsPresent(List mat @Test public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionWithoutSubscriptionIdIsProcessedThenSubscriptionIdIsWiped() { // subscribe a client on topic with subscription identifier - sut.addShared("client", new ShareName("share_temp"), asTopic("client/test/b"), MqttQoS.AT_MOST_ONCE, + sut.addShared("client", new ShareName("share_temp"), asTopic("client/test/b"), asOption(MqttQoS.AT_MOST_ONCE), new SubscriptionIdentifier(1)); // verify it contains the subscription identifier @@ -109,7 +114,7 @@ public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionWithoutSubscri verifySubscriptionIdentifierIsPresent(sut.matchQosSharpening(asTopic("client/test/b")), expectedSubscriptionId, "share_temp"); // update the subscription of same clientId on same topic filter but removing subscription identifier - sut.addShared("client", new ShareName("share_temp"), asTopic("client/test/b"), MqttQoS.AT_MOST_ONCE); + sut.addShared("client", new ShareName("share_temp"), asTopic("client/test/b"), asOption(MqttQoS.AT_MOST_ONCE)); // verify the subscription identifier is removed final List reloadedSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java index a0e010df0..60660cbfd 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java @@ -22,6 +22,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; + +import io.netty.handler.codec.mqtt.MqttSubscriptionOption; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.slf4j.Logger; @@ -36,7 +38,7 @@ public class CTrieSpeedTest { private static final int TOTAL_SUBSCRIPTIONS = 500_000; static SubscriptionRequest clientSubOnTopic(String clientID, String topicName) { - return SubscriptionRequest.buildNonShared(clientID, asTopic(topicName), MqttQoS.AT_MOST_ONCE); + return SubscriptionRequest.buildNonShared(clientID, asTopic(topicName), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); } @Test @@ -88,7 +90,7 @@ public List prepareSubscriptionsManyClientsFewTopic() { List subscriptionList = new ArrayList<>(TOTAL_SUBSCRIPTIONS); for (int i = 0; i < TOTAL_SUBSCRIPTIONS; i++) { Topic topic = asTopic("topic/test/" + new Random().nextInt(1 + i % 10) + "/test"); - subscriptionList.add(SubscriptionRequest.buildNonShared("TestClient-" + i, topic, MqttQoS.AT_LEAST_ONCE)); + subscriptionList.add(SubscriptionRequest.buildNonShared("TestClient-" + i, topic, MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_LEAST_ONCE))); } return subscriptionList; } diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java index 5c3d79bca..7d1438732 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java @@ -19,11 +19,13 @@ import io.moquette.broker.ISubscriptionsRepository; import io.moquette.persistence.MemorySubscriptionsRepository; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubscriptionOption; import org.junit.jupiter.api.Test; import java.util.List; import java.util.Optional; +import static io.moquette.broker.subscriptions.CTrieSharedSubscriptionDirectoryMatchingTest.asOption; import static io.moquette.broker.subscriptions.Topic.asTopic; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.*; @@ -187,13 +189,13 @@ private void assertNotMatch(String topicFilter, String topicName) { @Test public void testOverlappingSubscriptions() { - Subscription genericSub = new Subscription("Sensor1", asTopic("a/+"), MqttQoS.AT_MOST_ONCE); + Subscription genericSub = new Subscription("Sensor1", asTopic("a/+"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); this.sessionsRepository.addNewSubscription(genericSub); - sut.add(genericSub.clientId, genericSub.topicFilter, genericSub.getRequestedQos()); + sut.add(genericSub.clientId, genericSub.topicFilter, genericSub.option()); - Subscription specificSub = new Subscription("Sensor1", asTopic("a/b"), MqttQoS.AT_MOST_ONCE); + Subscription specificSub = new Subscription("Sensor1", asTopic("a/b"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); this.sessionsRepository.addNewSubscription(specificSub); - sut.add(specificSub.clientId, specificSub.topicFilter, specificSub.getRequestedQos()); + sut.add(specificSub.clientId, specificSub.topicFilter, specificSub.option()); //Exercise final List matchingForSpecific = sut.matchQosSharpening(asTopic("a/b")); @@ -233,13 +235,13 @@ public void removeSubscription_sameClients_subscribedSameTopic() { */ @Test public void duplicatedSubscriptionsWithDifferentQos() { - Subscription client2Sub = new Subscription("client2", asTopic("client/test/b"), MqttQoS.AT_MOST_ONCE); - this.sut.add("client2", asTopic("client/test/b"), MqttQoS.AT_MOST_ONCE); - Subscription client1SubQoS0 = new Subscription("client1", asTopic("client/test/b"), MqttQoS.AT_MOST_ONCE); - this.sut.add("client1", asTopic("client/test/b"), MqttQoS.AT_MOST_ONCE); + Subscription client2Sub = new Subscription("client2", asTopic("client/test/b"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); + this.sut.add("client2", asTopic("client/test/b"), asOption(MqttQoS.AT_MOST_ONCE)); + Subscription client1SubQoS0 = new Subscription("client1", asTopic("client/test/b"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); + this.sut.add("client1", asTopic("client/test/b"), asOption(MqttQoS.AT_MOST_ONCE)); - Subscription client1SubQoS2 = new Subscription("client1", asTopic("client/test/b"), MqttQoS.EXACTLY_ONCE); - this.sut.add("client1", asTopic("client/test/b"), MqttQoS.EXACTLY_ONCE); + Subscription client1SubQoS2 = new Subscription("client1", asTopic("client/test/b"), MqttSubscriptionOption.onlyFromQos(MqttQoS.EXACTLY_ONCE)); + this.sut.add("client1", asTopic("client/test/b"), asOption(MqttQoS.EXACTLY_ONCE)); // Verify List subscriptions = this.sut.matchQosSharpening(asTopic("client/test/b")); @@ -253,23 +255,23 @@ public void duplicatedSubscriptionsWithDifferentQos() { assertTrue(matchingClient1Sub.isPresent()); Subscription client1Sub = matchingClient1Sub.get(); - assertThat(client1SubQoS0.getRequestedQos()).isNotEqualTo(client1Sub.getRequestedQos()); + assertThat(client1SubQoS0.option().qos()).isNotEqualTo(client1Sub.option().qos()); // client1SubQoS2 should override client1SubQoS0 - assertThat(client1Sub.getRequestedQos()).isEqualTo(client1SubQoS2.getRequestedQos()); + assertThat(client1Sub.option().qos()).isEqualTo(client1SubQoS2.option().qos()); } @Test public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionIsProcessedThenSubscriptionIdIsUpdated() { // subscribe a client on topic with subscription identifier - sut.add("client", asTopic("client/test/b"), MqttQoS.AT_MOST_ONCE, new SubscriptionIdentifier(1)); + sut.add("client", asTopic("client/test/b"), asOption(MqttQoS.AT_MOST_ONCE), new SubscriptionIdentifier(1)); // verify it contains the subscription identifier final List matchingSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); verifySubscriptionIdentifierIsPresent(matchingSubscriptions, new SubscriptionIdentifier(1)); // update the subscription of same clientId on same topic filter but with different subscription identifier - sut.add("client", asTopic("client/test/b"), MqttQoS.AT_MOST_ONCE, new SubscriptionIdentifier(123)); + sut.add("client", asTopic("client/test/b"), asOption(MqttQoS.AT_MOST_ONCE), new SubscriptionIdentifier(123)); // verify the subscription identifier is updated final List reloadedSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); @@ -287,14 +289,14 @@ private static void verifySubscriptionIdentifierIsPresent(List mat @Test public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionWithoutSubscriptionIdIsProcessedThenSubscriptionIdIsWiped() { // subscribe a client on topic with subscription identifier - sut.add("client", asTopic("client/test/b"), MqttQoS.AT_MOST_ONCE, new SubscriptionIdentifier(1)); + sut.add("client", asTopic("client/test/b"), asOption(MqttQoS.AT_MOST_ONCE), new SubscriptionIdentifier(1)); // verify it contains the subscription identifier SubscriptionIdentifier expectedSubscriptionId = new SubscriptionIdentifier(1); verifySubscriptionIdentifierIsPresent(sut.matchQosSharpening(asTopic("client/test/b")), expectedSubscriptionId); // update the subscription of same clientId on same topic filter but removing subscription identifier - sut.add("client", asTopic("client/test/b"), MqttQoS.AT_MOST_ONCE); + sut.add("client", asTopic("client/test/b"), asOption(MqttQoS.AT_MOST_ONCE)); // verify the subscription identifier is removed final List reloadedSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java index 175f8c6e8..ad85d1c7b 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java @@ -18,6 +18,7 @@ import io.moquette.broker.subscriptions.CTrie.SubscriptionRequest; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubscriptionOption; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -121,7 +122,7 @@ public void testAddNewDeepNodes() { } static SubscriptionRequest clientSubOnTopic(String clientID, String topicFilter) { - return SubscriptionRequest.buildNonShared(clientID, asTopic(topicFilter), null); + return SubscriptionRequest.buildNonShared(clientID, asTopic(topicFilter), MqttSubscriptionOption.onlyFromQos(null)); } @Test @@ -193,7 +194,7 @@ public void givenTreeWithSomeNodeHierarchyWhenRemoveContainedSubscriptionThenNod final List matchingSubs = sut.recursiveMatch(asTopic("/temp/2")); //Verify - final Subscription expectedMatchingsub = new Subscription("TempSensor1", asTopic("/temp/2"), MqttQoS.AT_MOST_ONCE); + final Subscription expectedMatchingsub = new Subscription("TempSensor1", asTopic("/temp/2"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); assertThat(matchingSubs).contains(expectedMatchingsub); } @@ -212,9 +213,9 @@ public void givenTreeWithSomeNodeHierarchWhenRemoveContainedSubscriptionSmallerT //Verify // not clear to me, but I believe /temp unsubscribe should not unsub you from downstream /temp/1 or /temp/2 - final Subscription expectedMatchingsub1 = new Subscription("TempSensor1", asTopic("/temp/1"), MqttQoS.AT_MOST_ONCE); + final Subscription expectedMatchingsub1 = new Subscription("TempSensor1", asTopic("/temp/1"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); assertThat(matchingSubs1).contains(expectedMatchingsub1); - final Subscription expectedMatchingsub2 = new Subscription("TempSensor1", asTopic("/temp/2"), MqttQoS.AT_MOST_ONCE); + final Subscription expectedMatchingsub2 = new Subscription("TempSensor1", asTopic("/temp/2"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); assertThat(matchingSubs2).contains(expectedMatchingsub2); } @@ -239,7 +240,7 @@ public void testMatchSubscriptionNoWildcards() { final List matchingSubs = sut.recursiveMatch(asTopic("/temp")); //Verify - final Subscription expectedMatchingsub = new Subscription("TempSensor1", asTopic("/temp"), MqttQoS.AT_MOST_ONCE); + final Subscription expectedMatchingsub = new Subscription("TempSensor1", asTopic("/temp"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); assertThat(matchingSubs).contains(expectedMatchingsub); } @@ -255,8 +256,8 @@ public void testRemovalInnerTopicOffRootSameClient() { final List matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); //Verify - final Subscription expectedMatchingsub1 = new Subscription("TempSensor1", asTopic("temp"), MqttQoS.AT_MOST_ONCE); - final Subscription expectedMatchingsub2 = new Subscription("TempSensor1", asTopic("temp/1"), MqttQoS.AT_MOST_ONCE); + final Subscription expectedMatchingsub1 = new Subscription("TempSensor1", asTopic("temp"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); + final Subscription expectedMatchingsub2 = new Subscription("TempSensor1", asTopic("temp/1"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); assertThat(matchingSubs1).contains(expectedMatchingsub1); assertThat(matchingSubs2).contains(expectedMatchingsub2); @@ -283,8 +284,8 @@ public void testRemovalInnerTopicOffRootDiffClient() { final List matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); //Verify - final Subscription expectedMatchingsub1 = new Subscription("TempSensor1", asTopic("temp"), MqttQoS.AT_MOST_ONCE); - final Subscription expectedMatchingsub2 = new Subscription("TempSensor2", asTopic("temp/1"), MqttQoS.AT_MOST_ONCE); + final Subscription expectedMatchingsub1 = new Subscription("TempSensor1", asTopic("temp"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); + final Subscription expectedMatchingsub2 = new Subscription("TempSensor2", asTopic("temp/1"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); assertThat(matchingSubs1).contains(expectedMatchingsub1); assertThat(matchingSubs2).contains(expectedMatchingsub2); @@ -311,8 +312,8 @@ public void testRemovalOuterTopicOffRootDiffClient() { final List matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); //Verify - final Subscription expectedMatchingsub1 = new Subscription("TempSensor1", asTopic("temp"), MqttQoS.AT_MOST_ONCE); - final Subscription expectedMatchingsub2 = new Subscription("TempSensor2", asTopic("temp/1"), MqttQoS.AT_MOST_ONCE); + final Subscription expectedMatchingsub1 = new Subscription("TempSensor1", asTopic("temp"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); + final Subscription expectedMatchingsub2 = new Subscription("TempSensor2", asTopic("temp/1"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); assertThat(matchingSubs1).contains(expectedMatchingsub1); assertThat(matchingSubs2).contains(expectedMatchingsub2); diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/SubscriptionTestUtils.java b/broker/src/test/java/io/moquette/broker/subscriptions/SubscriptionTestUtils.java index d6ad415a2..65eea5556 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/SubscriptionTestUtils.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/SubscriptionTestUtils.java @@ -15,6 +15,7 @@ */ package io.moquette.broker.subscriptions; +import io.netty.handler.codec.mqtt.MqttSubscriptionOption; import org.jetbrains.annotations.NotNull; import static io.moquette.broker.subscriptions.Topic.asTopic; @@ -22,11 +23,11 @@ public class SubscriptionTestUtils { @NotNull static Subscription asSubscription(String clientId, String topicFilter) { - return new Subscription(clientId, asTopic(topicFilter), null); + return new Subscription(clientId, asTopic(topicFilter), MqttSubscriptionOption.onlyFromQos(null)); } @NotNull static Subscription asSubscription(String clientId, String topicFilter, String shareName) { - return new Subscription(clientId, asTopic(topicFilter), null, shareName); + return new Subscription(clientId, asTopic(topicFilter), MqttSubscriptionOption.onlyFromQos(null), shareName); } } diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/SubscriptionWithIdentifierTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/SubscriptionWithIdentifierTest.java index 6002516eb..b0baf4350 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/SubscriptionWithIdentifierTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/SubscriptionWithIdentifierTest.java @@ -30,7 +30,7 @@ public void givenNonSharedSubscriptionWithIdentifierWhenPublishMatchedThenReceiv // subscribe with an identifier MqttMessage received = lowLevelClient.subscribeWithIdentifier("/metrics/measures/temp", - MqttQoS.AT_LEAST_ONCE, 123); + MqttQoS.AT_LEAST_ONCE, 123, 400, TimeUnit.MILLISECONDS); verifyOfType(received, MqttMessageType.SUBACK); Mqtt5BlockingClient publisher = createPublisherClient(); diff --git a/broker/src/test/java/io/moquette/interception/BrokerInterceptorTest.java b/broker/src/test/java/io/moquette/interception/BrokerInterceptorTest.java index 48cbf3f23..916f9eefb 100644 --- a/broker/src/test/java/io/moquette/interception/BrokerInterceptorTest.java +++ b/broker/src/test/java/io/moquette/interception/BrokerInterceptorTest.java @@ -23,6 +23,7 @@ import io.netty.buffer.Unpooled; import io.netty.handler.codec.mqtt.MqttMessageBuilders; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubscriptionOption; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -145,7 +146,7 @@ public void testNotifyTopicPublished() throws Exception { @Test public void testNotifyTopicSubscribed() throws Exception { - interceptor.notifyTopicSubscribed(new Subscription("cli1", new Topic("o2"), MqttQoS.AT_MOST_ONCE), "cli1234"); + interceptor.notifyTopicSubscribed(new Subscription("cli1", new Topic("o2"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)), "cli1234"); interval(); assertEquals(70, n.get()); } @@ -165,7 +166,7 @@ public void testAddAndRemoveInterceptHandler() throws Exception { interceptor.addInterceptHandler(interceptHandlerMock1); interceptor.addInterceptHandler(interceptHandlerMock2); - Subscription subscription = new Subscription("cli1", new Topic("o2"), MqttQoS.AT_MOST_ONCE); + Subscription subscription = new Subscription("cli1", new Topic("o2"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); interceptor.notifyTopicSubscribed(subscription, "cli1234"); interval(); diff --git a/broker/src/test/java/io/moquette/persistence/H2SubscriptionsRepositorySharedSubscriptionsTest.java b/broker/src/test/java/io/moquette/persistence/H2SubscriptionsRepositorySharedSubscriptionsTest.java index 11595e7eb..e39ff0e5d 100644 --- a/broker/src/test/java/io/moquette/persistence/H2SubscriptionsRepositorySharedSubscriptionsTest.java +++ b/broker/src/test/java/io/moquette/persistence/H2SubscriptionsRepositorySharedSubscriptionsTest.java @@ -6,6 +6,7 @@ import io.moquette.broker.subscriptions.SubscriptionIdentifier; import io.moquette.broker.subscriptions.Topic; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubscriptionOption; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -30,7 +31,7 @@ public void setUp() { @Test public void givenNewSubscriptionWhenItsStoredThenCanGetRetrieved() { Subscription subscription = new Subscription("subscriber", Topic.asTopic("metering/temperature"), - MqttQoS.AT_MOST_ONCE, new SubscriptionIdentifier(1)); + MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE), new SubscriptionIdentifier(1)); sut.addNewSubscription(subscription); // verify deserialize @@ -42,8 +43,9 @@ public void givenNewSubscriptionWhenItsStoredThenCanGetRetrieved() { @Test public void givenNewSharedSubscriptionWhenItsStoredThenCanGetRetrieved() { + MqttSubscriptionOption option = MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE); sut.addNewSharedSubscription("subscriber", new ShareName("thermometers"), - Topic.asTopic("/first_floor/living/temp"), MqttQoS.AT_MOST_ONCE, new SubscriptionIdentifier(1)); + Topic.asTopic("/first_floor/living/temp"), option, new SubscriptionIdentifier(1)); // verify deserialize Collection subs = sut.listAllSharedSubscription(); @@ -55,8 +57,9 @@ public void givenNewSharedSubscriptionWhenItsStoredThenCanGetRetrieved() { @Test public void givenAPersistedSharedSubscriptionWhenListedThenItAppears() { + MqttSubscriptionOption op = MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE); sut.addNewSharedSubscription("subscriber", new ShareName("thermometers"), - Topic.asTopic("/first_floor/living/temp"), MqttQoS.AT_MOST_ONCE); + Topic.asTopic("/first_floor/living/temp"), op); Collection subscriptions = sut.listAllSharedSubscription(); assertThat(subscriptions).hasSize(1); @@ -68,8 +71,9 @@ public void givenAPersistedSharedSubscriptionWhenListedThenItAppears() { @Test public void givenAPersistedSubscriptionWhenItsDeletedThenItNotAnymoreListed() { + MqttSubscriptionOption option = MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE); sut.addNewSharedSubscription("subscriber", new ShareName("thermometers"), - Topic.asTopic("/first_floor/living/temp"), MqttQoS.AT_MOST_ONCE); + Topic.asTopic("/first_floor/living/temp"), option); assertThat(sut.listAllSharedSubscription()).hasSize(1); // remove the shared subscription @@ -83,12 +87,13 @@ public void givenAPersistedSubscriptionWhenItsDeletedThenItNotAnymoreListed() { @Test public void givenMultipleSharedSubscriptionForSameClientIdWhenTheyAreRemovedInBlockThenArentAnymoreListed() { String clientId = "subscriber"; + MqttSubscriptionOption atMostOnceOption = MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE); sut.addNewSharedSubscription(clientId, new ShareName("thermometers"), - Topic.asTopic("/first_floor/living/temp"), MqttQoS.AT_MOST_ONCE); + Topic.asTopic("/first_floor/living/temp"), atMostOnceOption); sut.addNewSharedSubscription(clientId, new ShareName("anemometers"), - Topic.asTopic("/garden/wind/speed"), MqttQoS.AT_MOST_ONCE); + Topic.asTopic("/garden/wind/speed"), atMostOnceOption); sut.addNewSharedSubscription(clientId, new ShareName("anemometers"), - Topic.asTopic("/garden/wind/direction"), MqttQoS.AT_MOST_ONCE); + Topic.asTopic("/garden/wind/direction"), atMostOnceOption); assertThat(sut.listAllSharedSubscription()).hasSize(3); // remove all shared subscriptions for client diff --git a/broker/src/test/java/io/moquette/testclient/Client.java b/broker/src/test/java/io/moquette/testclient/Client.java index df1e5d02e..7d907d019 100644 --- a/broker/src/test/java/io/moquette/testclient/Client.java +++ b/broker/src/test/java/io/moquette/testclient/Client.java @@ -30,7 +30,6 @@ import java.nio.charset.Charset; import java.time.Duration; import java.util.Optional; -import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; @@ -190,7 +189,7 @@ public MqttSubAckMessage subscribe(String topic1, MqttQoS qos1, String topic2, M .addSubscription(qos2, topic2) .build(); - return doSubscribeWithAckCasting(subscribeMessage); + return doSubscribeWithAckCasting(subscribeMessage, 200, TimeUnit.MILLISECONDS); } public MqttSubAckMessage subscribe(String topic, MqttQoS qos) { @@ -199,10 +198,16 @@ public MqttSubAckMessage subscribe(String topic, MqttQoS qos) { .addSubscription(qos, topic) .build(); - return doSubscribeWithAckCasting(subscribeMessage); + return doSubscribeWithAckCasting(subscribeMessage, 200, TimeUnit.MILLISECONDS); } public MqttSubAckMessage subscribeWithIdentifier(String topic, MqttQoS qos, int subscriptionIdentifier) { + return subscribeWithIdentifier(topic, qos, subscriptionIdentifier, 200, TimeUnit.MILLISECONDS); + } + + @NotNull + public MqttSubAckMessage subscribeWithIdentifier(String topic, MqttQoS qos, int subscriptionIdentifier, + int timeout, TimeUnit timeUnit) { MqttProperties subProps = new MqttProperties(); subProps.add(new MqttProperties.IntegerProperty( MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value(), @@ -214,12 +219,12 @@ public MqttSubAckMessage subscribeWithIdentifier(String topic, MqttQoS qos, int .properties(subProps) .build(); - return doSubscribeWithAckCasting(subscribeMessage); + return doSubscribeWithAckCasting(subscribeMessage, timeout, timeUnit); } @NotNull - private MqttSubAckMessage doSubscribeWithAckCasting(MqttSubscribeMessage subscribeMessage) { - doSubscribe(subscribeMessage); + private MqttSubAckMessage doSubscribeWithAckCasting(MqttSubscribeMessage subscribeMessage, int timeout, TimeUnit timeUnit) { + doSubscribe(subscribeMessage, timeout, timeUnit); final MqttMessage subAckMessage = this.receivedMsg.get(); if (!(subAckMessage instanceof MqttSubAckMessage)) { @@ -229,7 +234,7 @@ private MqttSubAckMessage doSubscribeWithAckCasting(MqttSubscribeMessage subscri return (MqttSubAckMessage) subAckMessage; } - private void doSubscribe(MqttSubscribeMessage subscribeMessage) { + private void doSubscribe(MqttSubscribeMessage subscribeMessage, int timeout, TimeUnit timeUnit) { final CountDownLatch subscribeAckLatch = new CountDownLatch(1); this.setCallback(msg -> { receivedMsg.getAndSet(msg); @@ -246,13 +251,13 @@ private void doSubscribe(MqttSubscribeMessage subscribeMessage) { boolean waitElapsed; try { - waitElapsed = !subscribeAckLatch.await(200, TimeUnit.MILLISECONDS); + waitElapsed = !subscribeAckLatch.await(timeout, timeUnit); } catch (InterruptedException e) { throw new RuntimeException("Interrupted while waiting", e); } if (waitElapsed) { - throw new RuntimeException("Cannot receive SubscribeAck in 200 ms"); + throw new RuntimeException("Cannot receive SubscribeAck in " + timeout + " " + timeUnit); } } @@ -262,7 +267,7 @@ public MqttMessage subscribeWithError(String topic, MqttQoS qos) { .addSubscription(qos, topic) .build(); - doSubscribe(subscribeMessage); + doSubscribe(subscribeMessage, 200, TimeUnit.MILLISECONDS); return this.receivedMsg.get(); }