Skip to content

Commit

Permalink
Move from qos to subscription option (moquette-io#810)
Browse files Browse the repository at this point in the history
Move from qos to subscription option implementing the persistence of SubscriptionOption to/from storage.


- Modified all places where MqttQos was used in subscription and shared subscription to use the broader SubscriptionOption.
- Updated serialization for H2 to serialize all the fields of MqttSubscriptionOption.
- Adapted all the tests.
  • Loading branch information
andsel authored Jan 22, 2024
1 parent cfca15e commit 1cc5db6
Show file tree
Hide file tree
Showing 28 changed files with 318 additions and 203 deletions.
3 changes: 3 additions & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
Version 0.18-SNAPSHOT:
[feature] subscription option handling: (issue #808)
- Move from qos to subscription option implementing the persistence of SubscriptionOption to/from storage. (#810)

[feature] subscription identifiers: (issue #801)
- Implements the validation of subscription identifier properties in SUBSCRIBE. (#803)
- Store and retrieve the subscription identifier into the subscriptions directory. (#804)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.moquette.broker.subscriptions.*;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscriptionOption;

import java.util.Collection;
import java.util.Set;
Expand All @@ -42,12 +43,12 @@ public interface ISubscriptionsRepository {
/**
* Add shared subscription to storage.
* */
void addNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, MqttQoS requestedQoS);
void addNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, MqttSubscriptionOption option);

/**
* Add shared subscription with subscription identifier to storage.
* */
void addNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, MqttQoS requestedQoS,
void addNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, MqttSubscriptionOption option,
SubscriptionIdentifier subscriptionIdentifier);

/**
Expand Down
43 changes: 22 additions & 21 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -260,20 +260,20 @@ public void wipeExistingScheduledWill(String clientId) {
private static final class SharedSubscriptionData {
final ShareName name;
final Topic topicFilter;
final MqttQoS requestedQoS;
final MqttSubscriptionOption option;

private SharedSubscriptionData(ShareName name, Topic topicFilter, MqttQoS requestedQoS) {
private SharedSubscriptionData(ShareName name, Topic topicFilter, MqttSubscriptionOption option) {
Objects.requireNonNull(name);
Objects.requireNonNull(topicFilter);
Objects.requireNonNull(requestedQoS);
Objects.requireNonNull(option);
this.name = name;
this.topicFilter = topicFilter;
this.requestedQoS = requestedQoS;
this.option = option;
}

static SharedSubscriptionData fromMqttSubscription(MqttTopicSubscription sub) {
return new SharedSubscriptionData(new ShareName(SharedSubscriptionUtils.extractShareName(sub.topicName())),
Topic.asTopic(SharedSubscriptionUtils.extractFilterFromShared(sub.topicName())), sub.qualityOfService());
Topic.asTopic(SharedSubscriptionUtils.extractFilterFromShared(sub.topicName())), sub.option());
}
}

Expand All @@ -284,7 +284,7 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S
final Session session = sessionRegistry.retrieve(clientID);

final List<SharedSubscriptionData> sharedSubscriptions;
final Optional<SubscriptionIdentifier> subscritionIdOpt;
final Optional<SubscriptionIdentifier> subscriptionIdOpt;

if (mqttConnection.isProtocolVersion5()) {
sharedSubscriptions = msg.payload().topicSubscriptions().stream()
Expand All @@ -303,14 +303,14 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S
}

try {
subscritionIdOpt = verifyAndExtractMessageIdentifier(msg);
subscriptionIdOpt = verifyAndExtractMessageIdentifier(msg);
} catch (IllegalArgumentException ex) {
session.disconnectFromBroker();
return;
}
} else {
sharedSubscriptions = Collections.emptyList();
subscritionIdOpt = Optional.empty();
subscriptionIdOpt = Optional.empty();
}

List<MqttTopicSubscription> ackTopics;
Expand All @@ -327,28 +327,29 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S
.filter(sub -> !SharedSubscriptionUtils.isSharedSubscription(sub.topicName()))
.map(sub -> {
final Topic topic = new Topic(sub.topicName());
if (subscritionIdOpt.isPresent()) {
return new Subscription(clientID, topic, sub.qualityOfService(), subscritionIdOpt.get());
MqttSubscriptionOption option = MqttSubscriptionOption.onlyFromQos(sub.qualityOfService());
if (subscriptionIdOpt.isPresent()) {
return new Subscription(clientID, topic, option, subscriptionIdOpt.get());
} else {
return new Subscription(clientID, topic, sub.qualityOfService());
return new Subscription(clientID, topic, option);
}
}).collect(Collectors.toList());

for (Subscription subscription : newSubscriptions) {
if (subscritionIdOpt.isPresent()) {
subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subscription.getRequestedQos(),
subscritionIdOpt.get());
if (subscriptionIdOpt.isPresent()) {
subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subscription.option(),
subscriptionIdOpt.get());
} else {
subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subscription.getRequestedQos());
subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subscription.option());
}
}

for (SharedSubscriptionData sharedSubData : sharedSubscriptions) {
if (subscritionIdOpt.isPresent()) {
subscriptions.addShared(clientID, sharedSubData.name, sharedSubData.topicFilter, sharedSubData.requestedQoS,
subscritionIdOpt.get());
if (subscriptionIdOpt.isPresent()) {
subscriptions.addShared(clientID, sharedSubData.name, sharedSubData.topicFilter, sharedSubData.option,
subscriptionIdOpt.get());
} else {
subscriptions.addShared(clientID, sharedSubData.name, sharedSubData.topicFilter, sharedSubData.requestedQoS);
subscriptions.addShared(clientID, sharedSubData.name, sharedSubData.topicFilter, sharedSubData.option);
}
}

Expand Down Expand Up @@ -745,8 +746,8 @@ RoutingResults receivedPublishQos2(MQTTConnection connection, MqttPublishMessage
}

static MqttQoS lowerQosToTheSubscriptionDesired(Subscription sub, MqttQoS qos) {
if (qos.value() > sub.getRequestedQos().value()) {
qos = sub.getRequestedQos();
if (qos.value() > sub.option().qos().value()) {
qos = sub.option().qos();
}
return qos;
}
Expand Down
2 changes: 1 addition & 1 deletion broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public void addSubscriptions(List<Subscription> newSubscriptions) {
}

public void removeSubscription(Topic topic) {
subscriptions.remove(new Subscription(data.clientId(), topic, MqttQoS.EXACTLY_ONCE));
subscriptions.remove(new Subscription(data.clientId(), topic, MqttSubscriptionOption.onlyFromQos(MqttQoS.EXACTLY_ONCE)));
}

public boolean hasWill() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.moquette.broker.subscriptions.CTrie.SubscriptionRequest;
import io.moquette.broker.subscriptions.CTrie.UnsubscribeRequest;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscriptionOption;

import java.security.SecureRandom;
import java.util.ArrayList;
Expand Down Expand Up @@ -161,7 +162,7 @@ private static boolean needsToUpdateExistingSubscription(Subscription newSubscri
) {
// if subscription identifier hasn't changed,
// then check QoS but don't lower the requested QoS level
return existing.getRequestedQos().value() < newSubscription.getRequestedQos().value();
return existing.option().qos().value() < newSubscription.option().qos().value();
}

// subscription identifier changed
Expand Down Expand Up @@ -201,7 +202,7 @@ private boolean containsSharedSubscriptionsForClient(String clientId) {

private static SharedSubscription wrapKey(String clientId) {
MqttQoS unusedQoS = MqttQoS.AT_MOST_ONCE;
return new SharedSubscription(null, Topic.asTopic("unused"), clientId, unusedQoS);
return new SharedSubscription(null, Topic.asTopic("unused"), clientId, MqttSubscriptionOption.onlyFromQos(unusedQoS));
}

//TODO this is equivalent to negate(containsOnly(clientId))
Expand Down
45 changes: 22 additions & 23 deletions broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.moquette.broker.subscriptions;

import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscriptionOption;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -18,50 +18,49 @@ public final static class SubscriptionRequest {

private final Topic topicFilter;
private final String clientId;
private final MqttQoS requestedQoS;

private final MqttSubscriptionOption option;
private boolean shared = false;
private ShareName shareName;
private Optional<SubscriptionIdentifier> subscriptionIdOpt;

private SubscriptionRequest(String clientId, Topic topicFilter, MqttQoS requestedQoS, SubscriptionIdentifier subscriptionId) {
private SubscriptionRequest(String clientId, Topic topicFilter, MqttSubscriptionOption option, SubscriptionIdentifier subscriptionId) {
this.topicFilter = topicFilter;
this.clientId = clientId;
this.requestedQoS = requestedQoS;
this.option = option;
this.subscriptionIdOpt = Optional.of(subscriptionId);
}

private SubscriptionRequest(String clientId, Topic topicFilter, MqttQoS requestedQoS) {
private SubscriptionRequest(String clientId, Topic topicFilter, MqttSubscriptionOption option) {
this.topicFilter = topicFilter;
this.clientId = clientId;
this.requestedQoS = requestedQoS;
this.option = option;
this.subscriptionIdOpt = Optional.empty();
}

public static SubscriptionRequest buildNonShared(Subscription subscription) {
return buildNonShared(subscription.clientId, subscription.topicFilter, subscription.getRequestedQos());
return buildNonShared(subscription.clientId, subscription.topicFilter, subscription.option());
}

public static SubscriptionRequest buildNonShared(String clientId, Topic topicFilter, MqttQoS requestedQoS) {
return new SubscriptionRequest(clientId, topicFilter, requestedQoS);
public static SubscriptionRequest buildNonShared(String clientId, Topic topicFilter, MqttSubscriptionOption option) {
return new SubscriptionRequest(clientId, topicFilter, option);
}

public static SubscriptionRequest buildNonShared(String clientId, Topic topicFilter, MqttQoS requestedQoS,
SubscriptionIdentifier subscriptionId) {
public static SubscriptionRequest buildNonShared(String clientId, Topic topicFilter,
MqttSubscriptionOption option, SubscriptionIdentifier subscriptionId) {
Objects.requireNonNull(subscriptionId, "SubscriptionId param can't be null");
return new SubscriptionRequest(clientId, topicFilter, requestedQoS, subscriptionId);
return new SubscriptionRequest(clientId, topicFilter, option, subscriptionId);
}

public static SubscriptionRequest buildShared(ShareName shareName, Topic topicFilter, String clientId,
MqttQoS requestedQoS, SubscriptionIdentifier subscriptionId) {
MqttSubscriptionOption option, SubscriptionIdentifier subscriptionId) {
Objects.requireNonNull(subscriptionId, "SubscriptionId param can't be null");
return buildSharedHelper(shareName, topicFilter,
() -> new SubscriptionRequest(clientId, topicFilter, requestedQoS, subscriptionId));
() -> new SubscriptionRequest(clientId, topicFilter, option, subscriptionId));
}

public static SubscriptionRequest buildShared(ShareName shareName, Topic topicFilter, String clientId, MqttQoS requestedQoS) {
public static SubscriptionRequest buildShared(ShareName shareName, Topic topicFilter, String clientId, MqttSubscriptionOption option) {
return buildSharedHelper(shareName, topicFilter,
() -> new SubscriptionRequest(clientId, topicFilter, requestedQoS));
() -> buildNonShared(clientId, topicFilter, option));
}

private static SubscriptionRequest buildSharedHelper(ShareName shareName, Topic topicFilter, Supplier<SubscriptionRequest> instantiator) {
Expand All @@ -78,20 +77,20 @@ public Topic getTopicFilter() {
return topicFilter;
}

public MqttQoS getRequestedQoS() {
return requestedQoS;
public MqttSubscriptionOption getOption() {
return option;
}

public Subscription subscription() {
return subscriptionIdOpt
.map(subscriptionIdentifier -> new Subscription(clientId, topicFilter, requestedQoS, subscriptionIdentifier))
.orElseGet(() -> new Subscription(clientId, topicFilter, requestedQoS));
.map(subscriptionIdentifier -> new Subscription(clientId, topicFilter, option, subscriptionIdentifier))
.orElseGet(() -> new Subscription(clientId, topicFilter, option));
}

public SharedSubscription sharedSubscription() {
return subscriptionIdOpt
.map(subId -> new SharedSubscription(shareName, topicFilter, clientId, requestedQoS, subId))
.orElseGet(() -> new SharedSubscription(shareName, topicFilter, clientId, requestedQoS));
.map(subId -> new SharedSubscription(shareName, topicFilter, clientId, option, subId))
.orElseGet(() -> new SharedSubscription(shareName, topicFilter, clientId, option));
}

public boolean isShared() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.moquette.broker.subscriptions.CTrie.SubscriptionRequest;
import io.moquette.broker.subscriptions.CTrie.UnsubscribeRequest;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscriptionOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -59,7 +60,7 @@ public void init(ISubscriptionsRepository subscriptionsRepository) {
for (SharedSubscription shared : subscriptionsRepository.listAllSharedSubscription()) {
LOG.debug("Re-subscribing shared {}", shared);
ctrie.addToTree(SubscriptionRequest.buildShared(shared.getShareName(), shared.topicFilter(),
shared.clientId(), shared.requestedQoS()));
shared.clientId(), MqttSubscriptionOption.onlyFromQos(shared.requestedQoS())));
}

if (LOG.isTraceEnabled()) {
Expand Down Expand Up @@ -101,14 +102,14 @@ public List<Subscription> matchQosSharpening(Topic topicName) {
}

@Override
public void add(String clientId, Topic filter, MqttQoS requestedQoS) {
SubscriptionRequest subRequest = SubscriptionRequest.buildNonShared(clientId, filter, requestedQoS);
public void add(String clientId, Topic filter, MqttSubscriptionOption option) {
SubscriptionRequest subRequest = SubscriptionRequest.buildNonShared(clientId, filter, option);
addNonSharedSubscriptionRequest(subRequest);
}

@Override
public void add(String clientId, Topic filter, MqttQoS requestedQoS, SubscriptionIdentifier subscriptionId) {
SubscriptionRequest subRequest = SubscriptionRequest.buildNonShared(clientId, filter, requestedQoS, subscriptionId);
public void add(String clientId, Topic filter, MqttSubscriptionOption option, SubscriptionIdentifier subscriptionId) {
SubscriptionRequest subRequest = SubscriptionRequest.buildNonShared(clientId, filter, option, subscriptionId);
addNonSharedSubscriptionRequest(subRequest);
}

Expand All @@ -118,8 +119,8 @@ private void addNonSharedSubscriptionRequest(SubscriptionRequest subRequest) {
}

@Override
public void addShared(String clientId, ShareName name, Topic topicFilter, MqttQoS requestedQoS) {
SubscriptionRequest shareSubRequest = SubscriptionRequest.buildShared(name, topicFilter, clientId, requestedQoS);
public void addShared(String clientId, ShareName name, Topic topicFilter, MqttSubscriptionOption option) {
SubscriptionRequest shareSubRequest = SubscriptionRequest.buildShared(name, topicFilter, clientId, option);
addSharedSubscriptionRequest(shareSubRequest);
}

Expand All @@ -128,19 +129,19 @@ private void addSharedSubscriptionRequest(SubscriptionRequest shareSubRequest) {

if (shareSubRequest.hasSubscriptionIdentifier()) {
subscriptionsRepository.addNewSharedSubscription(shareSubRequest.getClientId(), shareSubRequest.getSharedName(),
shareSubRequest.getTopicFilter(), shareSubRequest.getRequestedQoS(), shareSubRequest.getSubscriptionIdentifier());
shareSubRequest.getTopicFilter(), shareSubRequest.getOption(), shareSubRequest.getSubscriptionIdentifier());
} else {
subscriptionsRepository.addNewSharedSubscription(shareSubRequest.getClientId(), shareSubRequest.getSharedName(),
shareSubRequest.getTopicFilter(), shareSubRequest.getRequestedQoS());
shareSubRequest.getTopicFilter(), shareSubRequest.getOption());
}
List<SharedSubscription> sharedSubscriptions = clientSharedSubscriptions.computeIfAbsent(shareSubRequest.getClientId(), unused -> new ArrayList<>());
sharedSubscriptions.add(shareSubRequest.sharedSubscription());
}

@Override
public void addShared(String clientId, ShareName name, Topic topicFilter, MqttQoS requestedQoS,
public void addShared(String clientId, ShareName name, Topic topicFilter, MqttSubscriptionOption option,
SubscriptionIdentifier subscriptionId) {
SubscriptionRequest shareSubRequest = SubscriptionRequest.buildShared(name, topicFilter, clientId, requestedQoS, subscriptionId);
SubscriptionRequest shareSubRequest = SubscriptionRequest.buildShared(name, topicFilter, clientId, option, subscriptionId);
addSharedSubscriptionRequest(shareSubRequest);
}

Expand All @@ -165,7 +166,7 @@ public void removeSharedSubscription(ShareName name, Topic topicFilter, String c

subscriptionsRepository.removeSharedSubscription(clientId, name, topicFilter);

SharedSubscription sharedSubscription = new SharedSubscription(name, topicFilter, clientId, MqttQoS.AT_MOST_ONCE /* UNUSED in compare */);
SharedSubscription sharedSubscription = new SharedSubscription(name, topicFilter, clientId, MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE) /* UNUSED in compare */);
List<SharedSubscription> sharedSubscriptions = clientSharedSubscriptions.get(clientId);
if (sharedSubscriptions != null && !sharedSubscriptions.isEmpty()) {
sharedSubscriptions.remove(sharedSubscription);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ private String prettySubscriptions(CNode node) {
for (Subscription couple : node.subscriptions()) {
subScriptionsStr
.append("{filter=").append(couple.topicFilter).append(", ")
.append("qos=").append(couple.getRequestedQos()).append(", ")
.append("option=").append(couple.option()).append(", ")
.append("client='").append(couple.clientId).append("'}");
counter++;
if (counter < node.subscriptions().size()) {
Expand Down
Loading

0 comments on commit 1cc5db6

Please sign in to comment.