Skip to content

Commit

Permalink
Implements serialization and deserialization for Subscription in H2 (m…
Browse files Browse the repository at this point in the history
…oquette-io#805)

Update H2 storage to persist and reload subscription identifiers for shared and non-shared subscriptions.

Adds subscription identifier to SubscriptionRequest, SharedSubscription classes.
Updates H2SubscriptionsRepository to persists and reload subscription identifier for shared and non-shared subscriptions.
Updates CTrieSubscriptionDirectory to store subscription identifier contained in SharedSubscription into the H2 storage.
  • Loading branch information
andsel authored Jan 6, 2024
1 parent 8480e22 commit 6a8f174
Show file tree
Hide file tree
Showing 9 changed files with 220 additions and 37 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ Version 0.18-SNAPSHOT:
[feature] subscription identifiers: (issue #801)
- Implements the validation of subscription identifier properties in SUBSCRIBE. (#803)
- Store and retrieve the subscription identifier into the subscriptions directory. (#804)
- Update H2 storage to persist and reload subscription identifiers for shared and non-shared subscriptions. (#805)
[feature] shared subscriptions:
- Initial implementation of shared subscription subscribe and publish part. (#796)
- Added unsubscribe of shared subscriptions. (#799)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@
*/
package io.moquette.broker;

import io.moquette.broker.subscriptions.ShareName;
import io.moquette.broker.subscriptions.SharedSubscription;
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.broker.subscriptions.Topic;
import io.moquette.broker.subscriptions.*;
import io.netty.handler.codec.mqtt.MqttQoS;

import java.util.Collection;
Expand All @@ -43,10 +40,16 @@ public interface ISubscriptionsRepository {
void removeSharedSubscription(String clientId, ShareName share, Topic topicFilter);

/**
* Add shared subscription from Storage.
* Add shared subscription to storage.
* */
void addNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, MqttQoS requestedQoS);

/**
* Add shared subscription with subscription identifier to storage.
* */
void addNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, MqttQoS requestedQoS,
SubscriptionIdentifier subscriptionIdentifier);

/**
* List all shared subscriptions to re-add to the tree during a restart.
* */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ public String getClientId() {
return clientId;
}

public boolean hasSubscriptionIdentifier() {
return subscriptionIdOpt.isPresent();
}

public SubscriptionIdentifier getSubscriptionIdentifier() {
return subscriptionIdOpt.get();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,13 @@ public void addShared(String clientId, ShareName name, Topic topicFilter, MqttQo
private void addSharedSubscriptionRequest(SubscriptionRequest shareSubRequest) {
ctrie.addToTree(shareSubRequest);

subscriptionsRepository.addNewSharedSubscription(shareSubRequest.getClientId(), shareSubRequest.getSharedName(),
shareSubRequest.getTopicFilter(), shareSubRequest.getRequestedQoS());

if (shareSubRequest.hasSubscriptionIdentifier()) {
subscriptionsRepository.addNewSharedSubscription(shareSubRequest.getClientId(), shareSubRequest.getSharedName(),
shareSubRequest.getTopicFilter(), shareSubRequest.getRequestedQoS(), shareSubRequest.getSubscriptionIdentifier());
} else {
subscriptionsRepository.addNewSharedSubscription(shareSubRequest.getClientId(), shareSubRequest.getSharedName(),
shareSubRequest.getTopicFilter(), shareSubRequest.getRequestedQoS());
}
List<SharedSubscription> sharedSubscriptions = clientSharedSubscriptions.computeIfAbsent(shareSubRequest.getClientId(), unused -> new ArrayList<>());
sharedSubscriptions.add(shareSubRequest.sharedSubscription());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ public SharedSubscription(ShareName shareName, Topic topicFilter, String clientI
this.subscriptionId = Optional.empty();
}

public SharedSubscription(ShareName shareName, Topic topicFilter, String clientId, MqttQoS requestedQoS
, SubscriptionIdentifier subscriptionId) {
public SharedSubscription(ShareName shareName, Topic topicFilter, String clientId, MqttQoS requestedQoS,
SubscriptionIdentifier subscriptionId) {
Objects.requireNonNull(requestedQoS, "qos parameter can't be null");
this.shareName = shareName;
this.topicFilter = topicFilter;
Expand Down Expand Up @@ -76,6 +76,13 @@ Subscription createSubscription() {
}
}

public boolean hasSubscriptionIdentifier() {
return subscriptionId.isPresent();
}

public SubscriptionIdentifier getSubscriptionIdentifier() {
return subscriptionId.get();
}

@Override
public int compareTo(SharedSubscription o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ public final class Subscription implements Serializable, Comparable<Subscription
final Topic topicFilter;
final String shareName;

// TODO remove transient when the subscription identifier has to be persisted
private transient final Optional<SubscriptionIdentifier> subscriptionId;
private final Optional<SubscriptionIdentifier> subscriptionId;

public Subscription(String clientId, Topic topicFilter, MqttQoS requestedQos) {
this(clientId, topicFilter, requestedQos, "", null);
Expand Down Expand Up @@ -135,4 +134,12 @@ public int compareTo(Subscription o) {
public String clientAndShareName() {
return clientId + (shareName.isEmpty() ? "" : "-" + shareName);
}

public boolean hasShareName() {
return shareName != null;
}

public String getShareName() {
return shareName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.moquette.broker.subscriptions.ShareName;
import io.moquette.broker.subscriptions.SharedSubscription;
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.broker.subscriptions.SubscriptionIdentifier;
import io.moquette.broker.subscriptions.Topic;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.h2.mvstore.Cursor;
Expand All @@ -15,28 +16,37 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class H2SubscriptionsRepository implements ISubscriptionsRepository {

private static final Logger LOG = LoggerFactory.getLogger(H2SubscriptionsRepository.class);
private static final String SUBSCRIPTIONS_MAP = "subscriptions";
private static final String SHARED_SUBSCRIPTIONS_MAP = "shared_subscriptions";
private final MVStore mvStore;
private final MVMap.Builder<Couple<ShareName, Topic>, Integer> submapBuilder;
private final MVMap.Builder<Couple<ShareName, Topic>, QoSSubscriptionId> submapBuilder;

private MVMap<String, Subscription> subscriptions;
// clientId -> shared subscription map name
private MVMap<String, String> sharedSubscriptions;
private final MVMap.Builder<String, Subscription> subscriptionBuilder = new MVMap.Builder<String, Subscription>()
.valueType(new SubscriptionValueType());

H2SubscriptionsRepository(MVStore mvStore) {
this.mvStore = mvStore;

submapBuilder = new MVMap.Builder<Couple<ShareName, Topic>, Integer>()
submapBuilder = new MVMap.Builder<Couple<ShareName, Topic>, QoSSubscriptionId>()
.keyType(new CoupleValueType());

this.subscriptions = mvStore.openMap(SUBSCRIPTIONS_MAP);
this.subscriptions = mvStore.openMap(SUBSCRIPTIONS_MAP, subscriptionBuilder);
sharedSubscriptions = mvStore.openMap(SHARED_SUBSCRIPTIONS_MAP);
}

Expand Down Expand Up @@ -86,7 +96,7 @@ public void removeSharedSubscription(String clientId, ShareName share, Topic top
LOG.info("Removing a non existing shared subscription for client: {}", clientId);
return;
}
MVMap<Couple<ShareName, Topic>, Integer> subMap = mvStore.openMap(sharedSubsMapName, submapBuilder);
MVMap<Couple<ShareName, Topic>, QoSSubscriptionId> subMap = mvStore.openMap(sharedSubsMapName, submapBuilder);
Couple<ShareName, Topic> sharedSubKey = Couple.of(share, topicFilter);

// remove from submap, null means the key didn't exist
Expand All @@ -101,14 +111,41 @@ public void removeSharedSubscription(String clientId, ShareName share, Topic top
}
}

private static class QoSSubscriptionId implements Serializable {
final MqttQoS qos;
final Integer subscriptionIdentifier;

public QoSSubscriptionId(MqttQoS qos, int subscriptionIdentifier) {
this.qos = qos;
this.subscriptionIdentifier = subscriptionIdentifier;
}

public QoSSubscriptionId(MqttQoS qos) {
this.qos = qos;
this.subscriptionIdentifier = null;
}
}

@Override
public void addNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, MqttQoS requestedQoS) {
QoSSubscriptionId qosPart = new QoSSubscriptionId(requestedQoS);
storeNewSharedSubscription(clientId, share, topicFilter, qosPart);
}

private void storeNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, QoSSubscriptionId value) {
String sharedSubsMapName = sharedSubscriptions.computeIfAbsent(clientId,
H2SubscriptionsRepository::computeShareSubscriptionSubMap);

// maps the couple (share name, topic) to requested qos
MVMap<Couple<ShareName, Topic>, Integer> subMap = mvStore.openMap(sharedSubsMapName, submapBuilder);
subMap.put(Couple.of(share, topicFilter), requestedQoS.value());
MVMap<Couple<ShareName, Topic>, QoSSubscriptionId> subMap = mvStore.openMap(sharedSubsMapName, submapBuilder);
subMap.put(Couple.of(share, topicFilter), value);
}

@Override
public void addNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, MqttQoS requestedQoS,
SubscriptionIdentifier subscriptionIdentifier) {
QoSSubscriptionId qosAndSubscriptionIdPart = new QoSSubscriptionId(requestedQoS, subscriptionIdentifier.value());
storeNewSharedSubscription(clientId, share, topicFilter, qosAndSubscriptionIdPart);
}

@Override
Expand All @@ -119,12 +156,21 @@ public Collection<SharedSubscription> listAllSharedSubscription() {
String clientId = entry.getKey();
String sharedSubsMapName = entry.getValue();

MVMap<Couple<ShareName, Topic>, Integer> subMap = mvStore.openMap(sharedSubsMapName, submapBuilder);
for (Map.Entry<Couple<ShareName, Topic>, Integer> subEntry : subMap.entrySet()) {
MVMap<Couple<ShareName, Topic>, QoSSubscriptionId> subMap = mvStore.openMap(sharedSubsMapName, submapBuilder);
for (Map.Entry<Couple<ShareName, Topic>, QoSSubscriptionId> subEntry : subMap.entrySet()) {
final ShareName shareName = subEntry.getKey().v1;
final Topic topicFilter = subEntry.getKey().v2;
final MqttQoS qos = MqttQoS.valueOf(subEntry.getValue());
result.add(new SharedSubscription(shareName, topicFilter, clientId, qos));
final MqttQoS qos = subEntry.getValue().qos;
SharedSubscription subscription;
if (subEntry.getValue().subscriptionIdentifier == null) {
// without subscription identifier
subscription = new SharedSubscription(shareName, topicFilter, clientId, qos);
} else {
// with subscription identifier
SubscriptionIdentifier subscriptionId = new SubscriptionIdentifier(subEntry.getValue().subscriptionIdentifier);
subscription = new SharedSubscription(shareName, topicFilter, clientId, qos, subscriptionId);
}
result.add(subscription);
}
}

Expand Down Expand Up @@ -170,4 +216,66 @@ public Couple<ShareName, Topic>[] createStorage(int i) {
return new Couple[i];
}
}


private static final class SubscriptionValueType extends BasicDataType<Subscription> {

@Override
public int getMemory(Subscription sub) {
return StringDataType.INSTANCE.getMemory(sub.getClientId()) +
StringDataType.INSTANCE.getMemory(sub.getTopicFilter().toString()) +
1 + // qos
1 + // flag to say if share name is present and/or subscription identifier
(sub.hasShareName() ? StringDataType.INSTANCE.getMemory(sub.getShareName()) : 0) +
(sub.hasSubscriptionIdentifier() ? 4 : 0);
}

@Override
public void write(WriteBuffer buff, Subscription sub) {
StringDataType.INSTANCE.write(buff, sub.getClientId());
StringDataType.INSTANCE.write(buff, sub.getTopicFilter().toString());
buff.put((byte) sub.getRequestedQos().value());
final byte flag = (byte) ((sub.hasShareName() ? 0x1 : 0x0) |
(sub.hasSubscriptionIdentifier() ? 0x2 : 0x0));
buff.put(flag);
if (sub.hasShareName()) {
StringDataType.INSTANCE.write(buff, sub.getShareName());
}
if (sub.hasSubscriptionIdentifier()) {
buff.putInt(sub.getSubscriptionIdentifier().value());
}
}

@Override
public Subscription read(ByteBuffer buff) {
final String clientId = StringDataType.INSTANCE.read(buff);
final String topicFilter = StringDataType.INSTANCE.read(buff);
MqttQoS requestedQoS = MqttQoS.valueOf(buff.get());
byte flag = buff.get();
boolean hasShareName = (flag & (byte) 0x1) > 0;
boolean hasSubscriptionIdentifier = (flag & (byte) 0x2) > 0;

if (hasShareName) {
String shareName = StringDataType.INSTANCE.read(buff);
if (hasSubscriptionIdentifier) {
SubscriptionIdentifier subId = new SubscriptionIdentifier(buff.getInt());
return new Subscription(clientId, Topic.asTopic(topicFilter),requestedQoS, shareName, subId);
} else {
return new Subscription(clientId, Topic.asTopic(topicFilter),requestedQoS, shareName);
}
} else {
if (hasSubscriptionIdentifier) {
SubscriptionIdentifier subId = new SubscriptionIdentifier(buff.getInt());
return new Subscription(clientId, Topic.asTopic(topicFilter),requestedQoS, subId);
} else {
return new Subscription(clientId, Topic.asTopic(topicFilter),requestedQoS);
}
}
}

@Override
public Subscription[] createStorage(int size) {
return new Subscription[size];
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,26 @@
import io.moquette.broker.subscriptions.ShareName;
import io.moquette.broker.subscriptions.SharedSubscription;
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.broker.subscriptions.SubscriptionIdentifier;
import io.moquette.broker.subscriptions.Topic;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;

public class MemorySubscriptionsRepository implements ISubscriptionsRepository {

private static final Logger LOG = LoggerFactory.getLogger(MemorySubscriptionsRepository.class);
private final Set<Subscription> subscriptions = new ConcurrentSkipListSet<>();
private final Map<String, Map<Couple<ShareName, Topic>, MqttQoS>> sharedSubscriptions = new HashMap<>();
private final Map<String, Map<Couple<ShareName, Topic>, SharedSubscription>> sharedSubscriptions = new HashMap<>();

@Override
public Set<Subscription> listAllSubscriptions() {
Expand All @@ -58,7 +65,7 @@ public void removeAllSharedSubscriptions(String clientId) {

@Override
public void removeSharedSubscription(String clientId, ShareName share, Topic topicFilter) {
Map<Couple<ShareName, Topic>, MqttQoS> subsMap = sharedSubscriptions.get(clientId);
Map<Couple<ShareName, Topic>, SharedSubscription> subsMap = sharedSubscriptions.get(clientId);
if (subsMap == null) {
LOG.info("Removing a non existing shared subscription for client: {}", clientId);
return;
Expand All @@ -72,20 +79,28 @@ public void removeSharedSubscription(String clientId, ShareName share, Topic top

@Override
public void addNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, MqttQoS requestedQoS) {
Map<Couple<ShareName, Topic>, MqttQoS> subsMap = sharedSubscriptions.computeIfAbsent(clientId, unused -> new HashMap<>());
subsMap.put(Couple.of(share, topicFilter), requestedQoS);
SharedSubscription sharedSub = new SharedSubscription(share, topicFilter, clientId, requestedQoS);
storeNewSharedSubscription(clientId, share, topicFilter, sharedSub);
}

private void storeNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, SharedSubscription sharedSub) {
Map<Couple<ShareName, Topic>, SharedSubscription> subsMap = sharedSubscriptions.computeIfAbsent(clientId, unused -> new HashMap<>());
subsMap.put(Couple.of(share, topicFilter), sharedSub);
}

@Override
public void addNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, MqttQoS requestedQoS,
SubscriptionIdentifier subscriptionIdentifier) {
SharedSubscription sharedSub = new SharedSubscription(share, topicFilter, clientId, requestedQoS, subscriptionIdentifier);
storeNewSharedSubscription(clientId, share, topicFilter, sharedSub);
}

@Override
public Collection<SharedSubscription> listAllSharedSubscription() {
final List<SharedSubscription> result = new ArrayList<>();
for (Map.Entry<String, Map<Couple<ShareName, Topic>, MqttQoS>> entry : sharedSubscriptions.entrySet()) {
final String clientId = entry.getKey();
for (Map.Entry<Couple<ShareName, Topic>, MqttQoS> nestedEntry : entry.getValue().entrySet()) {
final ShareName share = nestedEntry.getKey().v1;
final Topic filter = nestedEntry.getKey().v2;
final MqttQoS qos = nestedEntry.getValue();
result.add(new SharedSubscription(share, filter, clientId, qos));
for (Map.Entry<String, Map<Couple<ShareName, Topic>, SharedSubscription>> entry : sharedSubscriptions.entrySet()) {
for (Map.Entry<Couple<ShareName, Topic>, SharedSubscription> nestedEntry : entry.getValue().entrySet()) {
result.add(nestedEntry.getValue());
}
}
return result;
Expand Down
Loading

0 comments on commit 6a8f174

Please sign in to comment.