Skip to content

Commit

Permalink
fix(datastore): improve performances on messages store
Browse files Browse the repository at this point in the history
Signed-off-by: riccardomodanese <riccardo.modanese@eurotech.com>
  • Loading branch information
riccardomodanese committed Dec 5, 2023
1 parent 5c0ea6c commit 7cfbf78
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.eclipse.kapua.service.datastore.internal.model.DataIndexBy;
import org.eclipse.kapua.service.datastore.internal.model.DatastoreMessageImpl;
import org.eclipse.kapua.service.datastore.internal.model.MessageListResultImpl;
import org.eclipse.kapua.service.datastore.internal.model.MessageUniquenessCheck;
import org.eclipse.kapua.service.datastore.internal.model.query.ChannelInfoQueryImpl;
import org.eclipse.kapua.service.datastore.internal.model.query.ClientInfoQueryImpl;
import org.eclipse.kapua.service.datastore.internal.model.query.MessageQueryImpl;
Expand Down Expand Up @@ -148,8 +149,10 @@ public StorableId store(KapuaMessage<?, ?> message, String messageId, boolean ne
String indexName = schemaMetadata.getDataIndexName();
TypeDescriptor typeDescriptor = new TypeDescriptor(indexName, MessageSchema.MESSAGE_TYPE_NAME);

if (!newInsert) {
DatastoreMessage datastoreMessage = find(message.getScopeId(), STORABLE_ID_FACTORY.newStorableId(messageId), StorableFetchStyle.SOURCE_SELECT);
if (!newInsert && !MessageUniquenessCheck.NONE.equals(accountServicePlan.getMessageUniquenessCheck())) {
DatastoreMessage datastoreMessage = MessageUniquenessCheck.FULL.equals(accountServicePlan.getMessageUniquenessCheck()) ?
find(message.getScopeId(), STORABLE_ID_FACTORY.newStorableId(messageId), StorableFetchStyle.SOURCE_SELECT) :
find(message.getScopeId(), indexName, STORABLE_ID_FACTORY.newStorableId(messageId), StorableFetchStyle.SOURCE_SELECT);

Check warning on line 155 in service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/MessageStoreFacade.java

View check run for this annotation

Codecov / codecov/patch

service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/MessageStoreFacade.java#L154-L155

Added lines #L154 - L155 were not covered by tests
if (datastoreMessage != null) {
LOG.debug("Message with datatstore id '{}' already found", messageId);
metrics.getAlreadyInTheDatastore().inc();
Expand Down Expand Up @@ -243,7 +246,23 @@ public void delete(KapuaId scopeId, StorableId id)
* @throws QueryMappingException
* @throws ClientException
*/
public DatastoreMessage find(KapuaId scopeId, StorableId id, StorableFetchStyle fetchStyle)
public DatastoreMessage find(KapuaId scopeId, StorableId id, StorableFetchStyle fetchStyle) throws KapuaIllegalArgumentException, ClientException {
ArgumentValidator.notNull(scopeId, SCOPE_ID);
return find(scopeId, SchemaUtil.getDataIndexName(scopeId), id, fetchStyle);

Check warning on line 251 in service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/MessageStoreFacade.java

View check run for this annotation

Codecov / codecov/patch

service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/MessageStoreFacade.java#L250-L251

Added lines #L250 - L251 were not covered by tests
}

/**
* Find message by identifier
*
* @param scopeId
* @param id
* @param fetchStyle
* @return
* @throws KapuaIllegalArgumentException
* @throws QueryMappingException
* @throws ClientException
*/
public DatastoreMessage find(KapuaId scopeId, String indexName, StorableId id, StorableFetchStyle fetchStyle)
throws KapuaIllegalArgumentException, ClientException {

ArgumentValidator.notNull(scopeId, SCOPE_ID);
Expand All @@ -257,7 +276,7 @@ public DatastoreMessage find(KapuaId scopeId, StorableId id, StorableFetchStyle
idsPredicate.addId(id);
idsQuery.setPredicate(idsPredicate);

String indexName = SchemaUtil.getDataIndexName(scopeId);
// String indexName = SchemaUtil.getDataIndexName(scopeId);
TypeDescriptor typeDescriptor = new TypeDescriptor(indexName, MessageSchema.MESSAGE_TYPE_NAME);
return getElasticsearchClient().find(typeDescriptor, idsQuery, DatastoreMessage.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import org.eclipse.kapua.commons.util.KapuaDateUtils;
import org.eclipse.kapua.service.datastore.internal.model.DataIndexBy;
import org.eclipse.kapua.service.datastore.internal.model.MessageUniquenessCheck;
import org.eclipse.kapua.service.datastore.internal.model.metric.MetricsIndexBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -69,6 +70,12 @@ public class MessageStoreConfiguration {
*/
public static final String CONFIGURATION_METRICS_INDEX_BY_KEY = "metricsIndexBy";

/**
* Message Uniqueness Check key (available options are in MessageUniquenessCheck enumeration).<br>
* <b>The key must be aligned with the key used in org.eclipse.kapua.service.datastore.MessageStoreService.xml meta data configuration file).</b>
*/
public static final String CONFIGURATION_MESSAGE_UNIQUENESS_CHECK = "messageUniquenessCheck";

/**
* Defines a value in service plan as unlimited resource
*/
Expand All @@ -87,6 +94,7 @@ public class MessageStoreConfiguration {
private long rxByteLimit = 1000000;
private DataIndexBy dataIndexBy = DataIndexBy.SERVER_TIMESTAMP;
private MetricsIndexBy metricsIndexBy = MetricsIndexBy.TIMESTAMP;
private MessageUniquenessCheck messageUniquenessCheck;

private Map<String, Object> values;

Expand Down Expand Up @@ -120,6 +128,9 @@ public MessageStoreConfiguration(Map<String, Object> values) {
if (this.values.get(CONFIGURATION_METRICS_INDEX_BY_KEY) != null) {
setMetricsIndexBy(MetricsIndexBy.valueOf((String) this.values.get(CONFIGURATION_METRICS_INDEX_BY_KEY)));
}
if (this.values.get(CONFIGURATION_MESSAGE_UNIQUENESS_CHECK) != null) {
setMessageUniquenessCheck(MessageUniquenessCheck.valueOf((String) this.values.get(CONFIGURATION_MESSAGE_UNIQUENESS_CHECK)));

Check warning on line 132 in service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/mediator/MessageStoreConfiguration.java

View check run for this annotation

Codecov / codecov/patch

service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/mediator/MessageStoreConfiguration.java#L132

Added line #L132 was not covered by tests
}
}
}

Expand Down Expand Up @@ -231,4 +242,20 @@ public MetricsIndexBy getMetricsIndexBy() {
public void setMetricsIndexBy(MetricsIndexBy metricsIndexBy) {
this.metricsIndexBy = metricsIndexBy;
}

/**
* Get the message uniqueness check parameter ({@link MessageStoreConfiguration#CONFIGURATION_MESSAGE_UNIQUENESS_CHECK}
* @return
*/
public MessageUniquenessCheck getMessageUniquenessCheck() {
return messageUniquenessCheck;

Check warning on line 251 in service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/mediator/MessageStoreConfiguration.java

View check run for this annotation

Codecov / codecov/patch

service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/mediator/MessageStoreConfiguration.java#L251

Added line #L251 was not covered by tests
}

/**
* Set the message uniqueness check parameter ({@link MessageStoreConfiguration#CONFIGURATION_MESSAGE_UNIQUENESS_CHECK}
*/
public void setMessageUniquenessCheck(MessageUniquenessCheck messageUniquenessCheck) {
this.messageUniquenessCheck = messageUniquenessCheck;
}

Check warning on line 259 in service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/mediator/MessageStoreConfiguration.java

View check run for this annotation

Codecov / codecov/patch

service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/mediator/MessageStoreConfiguration.java#L258-L259

Added lines #L258 - L259 were not covered by tests

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*******************************************************************************
* Copyright (c) 2022 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Eurotech - initial API and implementation
*******************************************************************************/
package org.eclipse.kapua.service.datastore.internal.model;

/**
* Once a message is going to be stored to datastore the store call can terminate with error on client side (due to timeout for example) but performed on server side.
* The store call is then retried and the message could be inserted twice.
* To avoid that, the current implementation does a query looking for a message with a specific id (the one from the message) in all the indexes belonging to the account.
* This is safer since changes in the message indexing by or the settings of the datastore (index by week/day/hour) can affect the index where the message should be stored to and then the effectivness of the check.
* But this has a performance drawback. The number of queries to be performed are linear with the indexes available so, if there are a lot of indexes, the query will need more time and resources to be executed.
* This enum define a new parameter to change the search behavior by account.
*/
public enum MessageUniquenessCheck {

Check warning on line 23 in service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/model/MessageUniquenessCheck.java

View check run for this annotation

Codecov / codecov/patch

service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/model/MessageUniquenessCheck.java#L23

Added line #L23 was not covered by tests

/**
* No check
*/
NONE,

Check warning on line 28 in service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/model/MessageUniquenessCheck.java

View check run for this annotation

Codecov / codecov/patch

service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/model/MessageUniquenessCheck.java#L28

Added line #L28 was not covered by tests
/**
* The search is done only to the index where the message is expected to be, based on current configuration.
*/
BOUND,

Check warning on line 32 in service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/model/MessageUniquenessCheck.java

View check run for this annotation

Codecov / codecov/patch

service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/model/MessageUniquenessCheck.java#L32

Added line #L32 was not covered by tests
/**
* Will check in all the indexes defined for the account
*/
FULL

Check warning on line 36 in service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/model/MessageUniquenessCheck.java

View check run for this annotation

Codecov / codecov/patch

service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/model/MessageUniquenessCheck.java#L36

Added line #L36 was not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@
<Option label="SERVER_TIMESTAMP" value="SERVER_TIMESTAMP" />
</AD>

<AD id="messageUniquenessCheck"
name="messageUniquenessCheck"
type="String"
cardinality="0"
required="true"
default="STRONG"
description="Message uniqueness check type (on telemetry message datastore insert)">
<Option label="NONE" value="NONE" />
<Option label="BOUND" value="BOUND" />
<Option label="FULL" value="FULL" />
</AD>

</OCD>

<Designate pid="org.eclipse.kapua.service.datastore.MessageStoreService">
Expand Down

0 comments on commit 7cfbf78

Please sign in to comment.