diff --git a/ChangeLog.txt b/ChangeLog.txt index 8de9e3d4a..e3a5196b2 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,6 +1,7 @@ Version 0.18-SNAPSHOT: [feature] subscription identifiers: (issue #801) - Implements the validation of subscription identifier properties in SUBSCRIBE. (#803) + - Store and retrieve the subscription identifier into the subscriptions directory. (#804) [feature] shared subscriptions: - Initial implementation of shared subscription subscribe and publish part. (#796) - Added unsubscribe of shared subscriptions. (#799) diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java b/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java index cbc3be50f..db6705359 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java @@ -112,7 +112,7 @@ private List sharedSubscriptions() { // select a subscription randomly int randIdx = SECURE_RANDOM.nextInt(list.size()); SharedSubscription sub = list.get(randIdx); - selectedSubscriptions.add(new Subscription(sub.clientId(), sub.topicFilter(), sub.requestedQoS(), shareName)); + selectedSubscriptions.add(sub.createSubscription()); } return selectedSubscriptions; } @@ -144,17 +144,33 @@ CNode addSubscription(SubscriptionRequest request) { if (idx >= 0) { // Subscription already exists final Subscription existing = subscriptions.get(idx); - if (existing.getRequestedQos().value() < newSubscription.getRequestedQos().value()) { + if (needsToUpdateExistingSubscription(newSubscription, existing)) { subscriptions.set(idx, newSubscription); } } else { // insert into the expected index so that the sorting is maintained - this.subscriptions.add(-1 - idx, new Subscription(newSubscription)); + this.subscriptions.add(-1 - idx, newSubscription); } } return this; } + private static boolean needsToUpdateExistingSubscription(Subscription newSubscription, Subscription existing) { + if ((newSubscription.hasSubscriptionIdentifier() && existing.hasSubscriptionIdentifier()) && + newSubscription.getSubscriptionIdentifier().equals(existing.getSubscriptionIdentifier()) + ) { + // if subscription identifier hasn't changed, + // then check QoS but don't lower the requested QoS level + return existing.getRequestedQos().value() < newSubscription.getRequestedQos().value(); + } + + // subscription identifier changed + // TODO need to understand if requestedQoS has to be also replaced or not, if not + // the existing QoS has to be copied. This to avoid that a subscription identifier + // change silently break the rule of existing qos never lowered. + return true; + } + /** * @return true iff the subscriptions contained in this node are owned by clientId * AND at least one subscription is actually present for that clientId diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java b/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java index 2675806c3..191e5d577 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java @@ -5,7 +5,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Optional; +import java.util.function.Supplier; public class CTrie { @@ -20,11 +22,20 @@ public final static class SubscriptionRequest { private boolean shared = false; private ShareName shareName; + private Optional subscriptionIdOpt; + + private SubscriptionRequest(String clientId, Topic topicFilter, MqttQoS requestedQoS, SubscriptionIdentifier subscriptionId) { + this.topicFilter = topicFilter; + this.clientId = clientId; + this.requestedQoS = requestedQoS; + this.subscriptionIdOpt = Optional.of(subscriptionId); + } private SubscriptionRequest(String clientId, Topic topicFilter, MqttQoS requestedQoS) { this.topicFilter = topicFilter; this.clientId = clientId; this.requestedQoS = requestedQoS; + this.subscriptionIdOpt = Optional.empty(); } public static SubscriptionRequest buildNonShared(Subscription subscription) { @@ -35,12 +46,29 @@ public static SubscriptionRequest buildNonShared(String clientId, Topic topicFil return new SubscriptionRequest(clientId, topicFilter, requestedQoS); } + public static SubscriptionRequest buildNonShared(String clientId, Topic topicFilter, MqttQoS requestedQoS, + SubscriptionIdentifier subscriptionId) { + Objects.requireNonNull(subscriptionId, "SubscriptionId param can't be null"); + return new SubscriptionRequest(clientId, topicFilter, requestedQoS, subscriptionId); + } + + public static SubscriptionRequest buildShared(ShareName shareName, Topic topicFilter, String clientId, + MqttQoS requestedQoS, SubscriptionIdentifier subscriptionId) { + Objects.requireNonNull(subscriptionId, "SubscriptionId param can't be null"); + return buildSharedHelper(shareName, topicFilter, + () -> new SubscriptionRequest(clientId, topicFilter, requestedQoS, subscriptionId)); + } + public static SubscriptionRequest buildShared(ShareName shareName, Topic topicFilter, String clientId, MqttQoS requestedQoS) { + return buildSharedHelper(shareName, topicFilter, + () -> new SubscriptionRequest(clientId, topicFilter, requestedQoS)); + } + + private static SubscriptionRequest buildSharedHelper(ShareName shareName, Topic topicFilter, Supplier instantiator) { if (topicFilter.headToken().name().startsWith("$share")) { throw new IllegalArgumentException("Topic filter of a shared subscription can't contains $share and share name"); } - - SubscriptionRequest request = new SubscriptionRequest(clientId, topicFilter, requestedQoS); + SubscriptionRequest request = instantiator.get(); request.shared = true; request.shareName = shareName; return request; @@ -50,12 +78,20 @@ public Topic getTopicFilter() { return topicFilter; } + public MqttQoS getRequestedQoS() { + return requestedQoS; + } + public Subscription subscription() { - return new Subscription(clientId, topicFilter, requestedQoS); + return subscriptionIdOpt + .map(subscriptionIdentifier -> new Subscription(clientId, topicFilter, requestedQoS, subscriptionIdentifier)) + .orElseGet(() -> new Subscription(clientId, topicFilter, requestedQoS)); } public SharedSubscription sharedSubscription() { - return new SharedSubscription(shareName, topicFilter, clientId, requestedQoS); + return subscriptionIdOpt + .map(subId -> new SharedSubscription(shareName, topicFilter, clientId, requestedQoS, subId)) + .orElseGet(() -> new SharedSubscription(shareName, topicFilter, clientId, requestedQoS)); } public boolean isShared() { diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java b/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java index 17dd1db68..8a2f026f8 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java @@ -103,31 +103,41 @@ public List matchQosSharpening(Topic topicName) { @Override public void add(String clientId, Topic filter, MqttQoS requestedQoS) { SubscriptionRequest subRequest = SubscriptionRequest.buildNonShared(clientId, filter, requestedQoS); - ctrie.addToTree(subRequest); - subscriptionsRepository.addNewSubscription(subRequest.subscription()); + addNonSharedSubscriptionRequest(subRequest); } @Override public void add(String clientId, Topic filter, MqttQoS requestedQoS, SubscriptionIdentifier subscriptionId) { - // TODO implement, save the subscription Id into the ctrie - throw new IllegalStateException("Implement this"); + SubscriptionRequest subRequest = SubscriptionRequest.buildNonShared(clientId, filter, requestedQoS, subscriptionId); + addNonSharedSubscriptionRequest(subRequest); + } + + private void addNonSharedSubscriptionRequest(SubscriptionRequest subRequest) { + ctrie.addToTree(subRequest); + subscriptionsRepository.addNewSubscription(subRequest.subscription()); } @Override public void addShared(String clientId, ShareName name, Topic topicFilter, MqttQoS requestedQoS) { SubscriptionRequest shareSubRequest = SubscriptionRequest.buildShared(name, topicFilter, clientId, requestedQoS); + addSharedSubscriptionRequest(shareSubRequest); + } + + private void addSharedSubscriptionRequest(SubscriptionRequest shareSubRequest) { ctrie.addToTree(shareSubRequest); - subscriptionsRepository.addNewSharedSubscription(clientId, name, topicFilter, requestedQoS); + subscriptionsRepository.addNewSharedSubscription(shareSubRequest.getClientId(), shareSubRequest.getSharedName(), + shareSubRequest.getTopicFilter(), shareSubRequest.getRequestedQoS()); - List sharedSubscriptions = clientSharedSubscriptions.computeIfAbsent(clientId, unused -> new ArrayList<>()); + List sharedSubscriptions = clientSharedSubscriptions.computeIfAbsent(shareSubRequest.getClientId(), unused -> new ArrayList<>()); sharedSubscriptions.add(shareSubRequest.sharedSubscription()); } @Override - public void addShared(String clientId, ShareName name, Topic topicFilter, MqttQoS requestedQoS, SubscriptionIdentifier subscriptionId) { - // TODO implement, save the subscription Id into the ctrie - throw new IllegalStateException("Implement this"); + public void addShared(String clientId, ShareName name, Topic topicFilter, MqttQoS requestedQoS, + SubscriptionIdentifier subscriptionId) { + SubscriptionRequest shareSubRequest = SubscriptionRequest.buildShared(name, topicFilter, clientId, requestedQoS, subscriptionId); + addSharedSubscriptionRequest(shareSubRequest); } /** diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/SharedSubscription.java b/broker/src/main/java/io/moquette/broker/subscriptions/SharedSubscription.java index fb93ee409..ed5227a0d 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/SharedSubscription.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/SharedSubscription.java @@ -18,6 +18,7 @@ import io.netty.handler.codec.mqtt.MqttQoS; import java.util.Objects; +import java.util.Optional; /** * Shared subscription data class. @@ -27,6 +28,7 @@ public final class SharedSubscription implements Comparable private final Topic topicFilter; private final String clientId; private final MqttQoS requestedQoS; + private final Optional subscriptionId; public SharedSubscription(ShareName shareName, Topic topicFilter, String clientId, MqttQoS requestedQoS) { Objects.requireNonNull(requestedQoS, "qos parameter can't be null"); @@ -34,6 +36,17 @@ public SharedSubscription(ShareName shareName, Topic topicFilter, String clientI this.topicFilter = topicFilter; this.clientId = clientId; this.requestedQoS = requestedQoS; + this.subscriptionId = Optional.empty(); + } + + public SharedSubscription(ShareName shareName, Topic topicFilter, String clientId, MqttQoS requestedQoS + , SubscriptionIdentifier subscriptionId) { + Objects.requireNonNull(requestedQoS, "qos parameter can't be null"); + this.shareName = shareName; + this.topicFilter = topicFilter; + this.clientId = clientId; + this.requestedQoS = requestedQoS; + this.subscriptionId = Optional.of(subscriptionId); } public String clientId() { @@ -52,6 +65,18 @@ public ShareName getShareName() { return shareName; } + /** + * Create a new Subscription instance from the data present in SharedSubscription + * */ + Subscription createSubscription() { + if (subscriptionId.isPresent()) { + return new Subscription(clientId, topicFilter, requestedQoS, shareName.getShareName(), subscriptionId.get()); + } else { + return new Subscription(clientId, topicFilter, requestedQoS, shareName.getShareName()); + } + } + + @Override public int compareTo(SharedSubscription o) { return this.clientId.compareTo(o.clientId); diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/Subscription.java b/broker/src/main/java/io/moquette/broker/subscriptions/Subscription.java index 072c3ef61..195de3d69 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/Subscription.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/Subscription.java @@ -20,6 +20,7 @@ import java.io.Serializable; import java.util.Objects; +import java.util.Optional; /** * Maintain the information about which Topic a certain ClientID is subscribed and at which QoS @@ -32,15 +33,28 @@ public final class Subscription implements Serializable, Comparable subscriptionId; + public Subscription(String clientId, Topic topicFilter, MqttQoS requestedQos) { - this(clientId, topicFilter, requestedQos, ""); + this(clientId, topicFilter, requestedQos, "", null); + } + + public Subscription(String clientId, Topic topicFilter, MqttQoS requestedQos, SubscriptionIdentifier subscriptionId) { + this(clientId, topicFilter, requestedQos, "", subscriptionId); } public Subscription(String clientId, Topic topicFilter, MqttQoS requestedQos, String shareName) { + this(clientId, topicFilter, requestedQos, shareName, null); + } + + public Subscription(String clientId, Topic topicFilter, MqttQoS requestedQos, String shareName, + SubscriptionIdentifier subscriptionId) { this.requestedQos = requestedQos; this.clientId = clientId; this.topicFilter = topicFilter; this.shareName = shareName; + this.subscriptionId = Optional.ofNullable(subscriptionId); } public Subscription(Subscription orig) { @@ -48,6 +62,7 @@ public Subscription(Subscription orig) { this.clientId = orig.clientId; this.topicFilter = orig.topicFilter; this.shareName = orig.shareName; + this.subscriptionId = orig.subscriptionId; } public String getClientId() { @@ -66,6 +81,14 @@ public boolean qosLessThan(Subscription sub) { return requestedQos.value() < sub.requestedQos.value(); } + public boolean hasSubscriptionIdentifier() { + return subscriptionId.isPresent(); + } + + public SubscriptionIdentifier getSubscriptionIdentifier() { + return subscriptionId.get(); + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionIdentifier.java b/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionIdentifier.java index f0210adf0..c58bc1f15 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionIdentifier.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionIdentifier.java @@ -1,5 +1,7 @@ package io.moquette.broker.subscriptions; +import java.util.Objects; + /** * Models the subscription identifier for MQTT5 Subscription. * */ @@ -17,5 +19,21 @@ public int value() { return subscriptionId; } + @Override + public String toString() { + return "SubscriptionIdentifier: " + subscriptionId; + } + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SubscriptionIdentifier that = (SubscriptionIdentifier) o; + return subscriptionId == that.subscriptionId; + } + + @Override + public int hashCode() { + return Objects.hash(subscriptionId); + } } diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSharedSubscriptionDirectoryMatchingTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSharedSubscriptionDirectoryMatchingTest.java index 8ea8252b2..3c9a8eb0b 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSharedSubscriptionDirectoryMatchingTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSharedSubscriptionDirectoryMatchingTest.java @@ -20,6 +20,10 @@ import java.util.List; import static io.moquette.broker.subscriptions.Topic.asTopic; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public class CTrieSharedSubscriptionDirectoryMatchingTest extends CTrieSubscriptionDirectMatchingCommon { @@ -65,4 +69,53 @@ public void givenSessionHasMultipleSharedSubscriptionWhenTheClientIsRemovedThenN List matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("/livingroom")); assertThat(matchingSubscriptions).isEmpty(); } + + @Test + public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionIsProcessedThenSubscriptionIdIsUpdated() { + // subscribe a client on topic with subscription identifier + sut.addShared("client", new ShareName("share_temp"), asTopic("client/test/b"), MqttQoS.AT_MOST_ONCE, + new SubscriptionIdentifier(1)); + + // verify it contains the subscription identifier + final List matchingSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); + verifySubscriptionIdentifierIsPresent(matchingSubscriptions, new SubscriptionIdentifier(1), "share_temp"); + + // update the subscription of same clientId on same topic filter but with different subscription identifier + sut.addShared("client", new ShareName("share_temp"), asTopic("client/test/b"), MqttQoS.AT_MOST_ONCE, + new SubscriptionIdentifier(123)); + + // verify the subscription identifier is updated + final List reloadedSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); + verifySubscriptionIdentifierIsPresent(reloadedSubscriptions, new SubscriptionIdentifier(123), "share_temp"); + } + + private static void verifySubscriptionIdentifierIsPresent(List matchingSubscriptions, SubscriptionIdentifier subscriptionIdentifier, String expectedShareName) { + assertAll("subscription contains the subscription identifier", + () -> assertEquals(1, matchingSubscriptions.size()), + () -> assertEquals(expectedShareName, matchingSubscriptions.iterator().next().shareName), + () -> assertTrue(matchingSubscriptions.iterator().next().hasSubscriptionIdentifier()), + () -> assertEquals(subscriptionIdentifier, matchingSubscriptions.iterator().next().getSubscriptionIdentifier()) + ); + } + + @Test + public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionWithoutSubscriptionIdIsProcessedThenSubscriptionIdIsWiped() { + // subscribe a client on topic with subscription identifier + sut.addShared("client", new ShareName("share_temp"), asTopic("client/test/b"), MqttQoS.AT_MOST_ONCE, + new SubscriptionIdentifier(1)); + + // verify it contains the subscription identifier + SubscriptionIdentifier expectedSubscriptionId = new SubscriptionIdentifier(1); + verifySubscriptionIdentifierIsPresent(sut.matchQosSharpening(asTopic("client/test/b")), expectedSubscriptionId, "share_temp"); + + // update the subscription of same clientId on same topic filter but removing subscription identifier + sut.addShared("client", new ShareName("share_temp"), asTopic("client/test/b"), MqttQoS.AT_MOST_ONCE); + + // verify the subscription identifier is removed + final List reloadedSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); + assertAll("subscription doesn't contain subscription identifier", + () -> assertEquals(1, reloadedSubscriptions.size()), + () -> assertFalse(reloadedSubscriptions.iterator().next().hasSubscriptionIdentifier()) + ); + } } diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java index 54eb0109a..5c3d79bca 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java @@ -19,7 +19,6 @@ import io.moquette.broker.ISubscriptionsRepository; import io.moquette.persistence.MemorySubscriptionsRepository; import io.netty.handler.codec.mqtt.MqttQoS; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.List; @@ -27,8 +26,7 @@ import static io.moquette.broker.subscriptions.Topic.asTopic; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; public class CTrieSubscriptionDirectoryMatchingTest extends CTrieSubscriptionDirectMatchingCommon { @@ -260,4 +258,49 @@ public void duplicatedSubscriptionsWithDifferentQos() { // client1SubQoS2 should override client1SubQoS0 assertThat(client1Sub.getRequestedQos()).isEqualTo(client1SubQoS2.getRequestedQos()); } + + @Test + public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionIsProcessedThenSubscriptionIdIsUpdated() { + // subscribe a client on topic with subscription identifier + sut.add("client", asTopic("client/test/b"), MqttQoS.AT_MOST_ONCE, new SubscriptionIdentifier(1)); + + // verify it contains the subscription identifier + final List matchingSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); + verifySubscriptionIdentifierIsPresent(matchingSubscriptions, new SubscriptionIdentifier(1)); + + // update the subscription of same clientId on same topic filter but with different subscription identifier + sut.add("client", asTopic("client/test/b"), MqttQoS.AT_MOST_ONCE, new SubscriptionIdentifier(123)); + + // verify the subscription identifier is updated + final List reloadedSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); + verifySubscriptionIdentifierIsPresent(reloadedSubscriptions, new SubscriptionIdentifier(123)); + } + + private static void verifySubscriptionIdentifierIsPresent(List matchingSubscriptions, SubscriptionIdentifier subscriptionIdentifier) { + assertAll("subscription contains the subscription identifier", + () -> assertEquals(1, matchingSubscriptions.size()), + () -> assertTrue(matchingSubscriptions.iterator().next().hasSubscriptionIdentifier()), + () -> assertEquals(subscriptionIdentifier, matchingSubscriptions.iterator().next().getSubscriptionIdentifier()) + ); + } + + @Test + public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionWithoutSubscriptionIdIsProcessedThenSubscriptionIdIsWiped() { + // subscribe a client on topic with subscription identifier + sut.add("client", asTopic("client/test/b"), MqttQoS.AT_MOST_ONCE, new SubscriptionIdentifier(1)); + + // verify it contains the subscription identifier + SubscriptionIdentifier expectedSubscriptionId = new SubscriptionIdentifier(1); + verifySubscriptionIdentifierIsPresent(sut.matchQosSharpening(asTopic("client/test/b")), expectedSubscriptionId); + + // update the subscription of same clientId on same topic filter but removing subscription identifier + sut.add("client", asTopic("client/test/b"), MqttQoS.AT_MOST_ONCE); + + // verify the subscription identifier is removed + final List reloadedSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); + assertAll("subscription doesn't contain subscription identifier", + () -> assertEquals(1, reloadedSubscriptions.size()), + () -> assertFalse(reloadedSubscriptions.iterator().next().hasSubscriptionIdentifier()) + ); + } }