diff --git a/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/MessageStoreFacade.java b/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/MessageStoreFacade.java
index 7b1c2ea6b6b..862878d7af2 100644
--- a/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/MessageStoreFacade.java
+++ b/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/MessageStoreFacade.java
@@ -34,6 +34,7 @@
import org.eclipse.kapua.service.datastore.internal.model.DataIndexBy;
import org.eclipse.kapua.service.datastore.internal.model.DatastoreMessageImpl;
import org.eclipse.kapua.service.datastore.internal.model.MessageListResultImpl;
+import org.eclipse.kapua.service.datastore.internal.model.MessageUniquenessCheck;
import org.eclipse.kapua.service.datastore.internal.model.query.ChannelInfoQueryImpl;
import org.eclipse.kapua.service.datastore.internal.model.query.ClientInfoQueryImpl;
import org.eclipse.kapua.service.datastore.internal.model.query.MessageQueryImpl;
@@ -148,8 +149,10 @@ public StorableId store(KapuaMessage, ?> message, String messageId, boolean ne
String indexName = schemaMetadata.getDataIndexName();
TypeDescriptor typeDescriptor = new TypeDescriptor(indexName, MessageSchema.MESSAGE_TYPE_NAME);
- if (!newInsert) {
- DatastoreMessage datastoreMessage = find(message.getScopeId(), STORABLE_ID_FACTORY.newStorableId(messageId), StorableFetchStyle.SOURCE_SELECT);
+ if (!newInsert && !MessageUniquenessCheck.NONE.equals(accountServicePlan.getMessageUniquenessCheck())) {
+ DatastoreMessage datastoreMessage = MessageUniquenessCheck.FULL.equals(accountServicePlan.getMessageUniquenessCheck()) ?
+ find(message.getScopeId(), STORABLE_ID_FACTORY.newStorableId(messageId), StorableFetchStyle.SOURCE_SELECT) :
+ find(message.getScopeId(), indexName, STORABLE_ID_FACTORY.newStorableId(messageId), StorableFetchStyle.SOURCE_SELECT);
if (datastoreMessage != null) {
LOG.debug("Message with datatstore id '{}' already found", messageId);
metrics.getAlreadyInTheDatastore().inc();
@@ -243,7 +246,23 @@ public void delete(KapuaId scopeId, StorableId id)
* @throws QueryMappingException
* @throws ClientException
*/
- public DatastoreMessage find(KapuaId scopeId, StorableId id, StorableFetchStyle fetchStyle)
+ public DatastoreMessage find(KapuaId scopeId, StorableId id, StorableFetchStyle fetchStyle) throws KapuaIllegalArgumentException, ClientException {
+ ArgumentValidator.notNull(scopeId, SCOPE_ID);
+ return find(scopeId, SchemaUtil.getDataIndexName(scopeId), id, fetchStyle);
+ }
+
+ /**
+ * Find message by identifier
+ *
+ * @param scopeId
+ * @param id
+ * @param fetchStyle
+ * @return
+ * @throws KapuaIllegalArgumentException
+ * @throws QueryMappingException
+ * @throws ClientException
+ */
+ public DatastoreMessage find(KapuaId scopeId, String indexName, StorableId id, StorableFetchStyle fetchStyle)
throws KapuaIllegalArgumentException, ClientException {
ArgumentValidator.notNull(scopeId, SCOPE_ID);
@@ -257,7 +276,7 @@ public DatastoreMessage find(KapuaId scopeId, StorableId id, StorableFetchStyle
idsPredicate.addId(id);
idsQuery.setPredicate(idsPredicate);
- String indexName = SchemaUtil.getDataIndexName(scopeId);
+// String indexName = SchemaUtil.getDataIndexName(scopeId);
TypeDescriptor typeDescriptor = new TypeDescriptor(indexName, MessageSchema.MESSAGE_TYPE_NAME);
return getElasticsearchClient().find(typeDescriptor, idsQuery, DatastoreMessage.class);
}
diff --git a/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/mediator/MessageStoreConfiguration.java b/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/mediator/MessageStoreConfiguration.java
index 88464823ac4..08ddcf2409c 100644
--- a/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/mediator/MessageStoreConfiguration.java
+++ b/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/mediator/MessageStoreConfiguration.java
@@ -15,6 +15,7 @@
import org.eclipse.kapua.commons.util.KapuaDateUtils;
import org.eclipse.kapua.service.datastore.internal.model.DataIndexBy;
+import org.eclipse.kapua.service.datastore.internal.model.MessageUniquenessCheck;
import org.eclipse.kapua.service.datastore.internal.model.metric.MetricsIndexBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,6 +70,12 @@ public class MessageStoreConfiguration {
*/
public static final String CONFIGURATION_METRICS_INDEX_BY_KEY = "metricsIndexBy";
+ /**
+ * Message Uniqueness Check key (available options are in MessageUniquenessCheck enumeration).
+ * The key must be aligned with the key used in org.eclipse.kapua.service.datastore.MessageStoreService.xml meta data configuration file).
+ */
+ public static final String CONFIGURATION_MESSAGE_UNIQUENESS_CHECK = "messageUniquenessCheck";
+
/**
* Defines a value in service plan as unlimited resource
*/
@@ -87,6 +94,7 @@ public class MessageStoreConfiguration {
private long rxByteLimit = 1000000;
private DataIndexBy dataIndexBy = DataIndexBy.SERVER_TIMESTAMP;
private MetricsIndexBy metricsIndexBy = MetricsIndexBy.TIMESTAMP;
+ private MessageUniquenessCheck messageUniquenessCheck;
private Map values;
@@ -120,6 +128,9 @@ public MessageStoreConfiguration(Map values) {
if (this.values.get(CONFIGURATION_METRICS_INDEX_BY_KEY) != null) {
setMetricsIndexBy(MetricsIndexBy.valueOf((String) this.values.get(CONFIGURATION_METRICS_INDEX_BY_KEY)));
}
+ if (this.values.get(CONFIGURATION_MESSAGE_UNIQUENESS_CHECK) != null) {
+ setMessageUniquenessCheck(MessageUniquenessCheck.valueOf((String) this.values.get(CONFIGURATION_MESSAGE_UNIQUENESS_CHECK)));
+ }
}
}
@@ -231,4 +242,20 @@ public MetricsIndexBy getMetricsIndexBy() {
public void setMetricsIndexBy(MetricsIndexBy metricsIndexBy) {
this.metricsIndexBy = metricsIndexBy;
}
+
+ /**
+ * Get the message uniqueness check parameter ({@link MessageStoreConfiguration#CONFIGURATION_MESSAGE_UNIQUENESS_CHECK}
+ * @return
+ */
+ public MessageUniquenessCheck getMessageUniquenessCheck() {
+ return messageUniquenessCheck;
+ }
+
+ /**
+ * Set the message uniqueness check parameter ({@link MessageStoreConfiguration#CONFIGURATION_MESSAGE_UNIQUENESS_CHECK}
+ */
+ public void setMessageUniquenessCheck(MessageUniquenessCheck messageUniquenessCheck) {
+ this.messageUniquenessCheck = messageUniquenessCheck;
+ }
+
}
diff --git a/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/model/MessageUniquenessCheck.java b/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/model/MessageUniquenessCheck.java
new file mode 100644
index 00000000000..1580658aaa9
--- /dev/null
+++ b/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/model/MessageUniquenessCheck.java
@@ -0,0 +1,37 @@
+/*******************************************************************************
+ * Copyright (c) 2022 Eurotech and/or its affiliates and others
+ *
+ * This program and the accompanying materials are made
+ * available under the terms of the Eclipse Public License 2.0
+ * which is available at https://www.eclipse.org/legal/epl-2.0/
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ *
+ * Contributors:
+ * Eurotech - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.kapua.service.datastore.internal.model;
+
+/**
+ * Once a message is going to be stored to datastore the store call can terminate with error on client side (due to timeout for example) but performed on server side.
+ * The store call is then retried and the message could be inserted twice.
+ * To avoid that, the current implementation does a query looking for a message with a specific id (the one from the message) in all the indexes belonging to the account.
+ * This is safer since changes in the message indexing by or the settings of the datastore (index by week/day/hour) can affect the index where the message should be stored to and then the effectivness of the check.
+ * But this has a performance drawback. The number of queries to be performed are linear with the indexes available so, if there are a lot of indexes, the query will need more time and resources to be executed.
+ * This enum define a new parameter to change the search behavior by account.
+ */
+public enum MessageUniquenessCheck {
+
+ /**
+ * No check
+ */
+ NONE,
+ /**
+ * The search is done only to the index where the message is expected to be, based on current configuration.
+ */
+ BOUND,
+ /**
+ * Will check in all the indexes defined for the account
+ */
+ FULL
+}
\ No newline at end of file
diff --git a/service/datastore/internal/src/main/resources/META-INF/metatypes/org.eclipse.kapua.service.datastore.MessageStoreService.xml b/service/datastore/internal/src/main/resources/META-INF/metatypes/org.eclipse.kapua.service.datastore.MessageStoreService.xml
index 3c69fa0a0af..2c2093a11df 100644
--- a/service/datastore/internal/src/main/resources/META-INF/metatypes/org.eclipse.kapua.service.datastore.MessageStoreService.xml
+++ b/service/datastore/internal/src/main/resources/META-INF/metatypes/org.eclipse.kapua.service.datastore.MessageStoreService.xml
@@ -56,6 +56,18 @@
+
+
+
+
+
+