Skip to content

Commit

Permalink
Refactor kapua-service-storable-api to improve usability and added a …
Browse files Browse the repository at this point in the history
…bunch of Javadoc to classes

Signed-off-by: coduz <alberto.codutti@eurotech.com>
  • Loading branch information
Coduz committed Oct 5, 2020
1 parent a307d72 commit 88a036f
Show file tree
Hide file tree
Showing 149 changed files with 2,434 additions and 1,700 deletions.
10 changes: 6 additions & 4 deletions assembly/broker/configurations/locator.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@
<api>org.eclipse.kapua.service.certificate.info.CertificateInfoService</api>
<api>org.eclipse.kapua.service.certificate.info.CertificateInfoFactory</api>

<api>org.eclipse.kapua.service.datastore.MessageStoreService</api>
<api>org.eclipse.kapua.service.datastore.ClientInfoRegistryService</api>
<api>org.eclipse.kapua.service.datastore.ClientInfoFactory</api>
<api>org.eclipse.kapua.service.datastore.ChannelInfoRegistryService</api>
<api>org.eclipse.kapua.service.datastore.ChannelInfoFactory</api>
<api>org.eclipse.kapua.service.datastore.MessageStoreService</api>
<api>org.eclipse.kapua.service.datastore.MessageStoreFactory</api>
<api>org.eclipse.kapua.service.datastore.MetricInfoRegistryService</api>

<api>org.eclipse.kapua.service.datastore.DatastoreObjectFactory</api>
<api>org.eclipse.kapua.service.datastore.model.query.DatastorePredicateFactory</api>
<api>org.eclipse.kapua.service.datastore.MetricInfoFactory</api>
<api>org.eclipse.kapua.service.datastore.model.query.predicate.DatastorePredicateFactory</api>

<api>org.eclipse.kapua.service.device.call.DeviceCallFactory</api>
<api>org.eclipse.kapua.service.device.call.DeviceMessageFactory</api>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,13 @@
*******************************************************************************/
package org.eclipse.kapua.broker.core.message;

import java.util.Date;

import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Topic;

import org.apache.activemq.command.ActiveMQMessage;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.kapua.KapuaException;
import org.eclipse.kapua.broker.core.converter.AbstractKapuaConverter;
import org.eclipse.kapua.broker.core.plugin.ConnectorDescriptor;
import org.eclipse.kapua.broker.core.plugin.KapuaSecurityBrokerFilter;
import org.eclipse.kapua.broker.core.plugin.ConnectorDescriptor.MessageType;
import org.eclipse.kapua.broker.core.plugin.KapuaSecurityBrokerFilter;
import org.eclipse.kapua.message.KapuaMessage;
import org.eclipse.kapua.model.id.KapuaId;
import org.eclipse.kapua.service.device.call.message.DeviceMessage;
Expand All @@ -35,6 +29,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Topic;
import java.util.Date;

/**
* Jms message utility class
*
Expand Down Expand Up @@ -109,8 +108,8 @@ public static CamelKapuaMessage<?> convertToKapuaMessage(ConnectorDescriptor con

// TODO check the code with huge messages
private static CamelKapuaMessage<?> convertToKapuaMessage(ConnectorDescriptor connectorDescriptor, Class<? extends DeviceMessage<?, ?>> deviceMessageType,
Class<? extends KapuaMessage<?, ?>> kapuaMessageType, BytesMessage jmsMessage, String jmsTopic,
Date queuedOn, KapuaId connectionId, String clientId)
Class<? extends KapuaMessage<?, ?>> kapuaMessageType, BytesMessage jmsMessage, String jmsTopic,
Date queuedOn, KapuaId connectionId, String clientId)
throws JMSException, KapuaException {
byte[] payload = null;
// TODO JMS message have no size limits!
Expand All @@ -136,7 +135,7 @@ private static CamelKapuaMessage<?> convertToKapuaMessage(ConnectorDescriptor co
* @throws KapuaException
*/
public static CamelKapuaMessage<?> convertToCamelKapuaMessage(ConnectorDescriptor connectorDescriptor, MessageType messageType, byte[] messageBody, String jmsTopic, Date queuedOn,
KapuaId connectionId, String clientId)
KapuaId connectionId, String clientId)
throws KapuaException {
KapuaMessage<?, ?> kapuaMessage = convertToKapuaMessage(connectorDescriptor.getDeviceClass(messageType), connectorDescriptor.getKapuaClass(messageType), messageBody, jmsTopic, queuedOn, clientId);
return new CamelKapuaMessage<>(kapuaMessage, connectionId, connectorDescriptor);
Expand All @@ -154,7 +153,7 @@ public static CamelKapuaMessage<?> convertToCamelKapuaMessage(ConnectorDescripto
* @throws KapuaException
*/
private static KapuaMessage<?, ?> convertToKapuaMessage(Class<? extends DeviceMessage<?, ?>> deviceMessageType, Class<? extends KapuaMessage<?, ?>> kapuaMessageType, byte[] messageBody,
String jmsTopic, Date queuedOn, String clientId)
String jmsTopic, Date queuedOn, String clientId)
throws KapuaException {
// first step... from jms to device dependent protocol level (unknown)
Translator<JmsMessage, DeviceMessage<?, ?>> translatorFromJms = Translator.getTranslatorFor(JmsMessage.class, deviceMessageType);// birth ...
Expand All @@ -178,12 +177,10 @@ public static CamelKapuaMessage<?> convertToCamelKapuaMessage(ConnectorDescripto
* @param kapuaMessage
* @return
* @throws KapuaException
* @throws ClassNotFoundException
*/
public static JmsMessage convertToJmsMessage(ConnectorDescriptor connectorDescriptor, MessageType messageType, KapuaMessage<?, ?> kapuaMessage) throws KapuaException {
// first step... from Kapua to device level dependent protocol (unknown)
Translator<KapuaMessage<?, ?>, DeviceMessage<?, ?>> translatorFromKapua = Translator
.getTranslatorFor(connectorDescriptor.getKapuaClass(messageType), connectorDescriptor.getDeviceClass(messageType));
Translator<KapuaMessage<?, ?>, DeviceMessage<?, ?>> translatorFromKapua = Translator.getTranslatorFor(connectorDescriptor.getKapuaClass(messageType), connectorDescriptor.getDeviceClass(messageType));
DeviceMessage<?, ?> deviceMessage = translatorFromKapua.translate(kapuaMessage);

// second step.... from device level dependent protocol (unknown) to jms
Expand Down Expand Up @@ -219,12 +216,12 @@ public static String convertJmsWildCardToMqtt(String jmsTopic) {

private static char convertWildcardJmsToMqtt(char c) {
switch (c) {
case '.':
return '/';
case '/':
return '.';
default:
return c;
case '.':
return '/';
case '/':
return '.';
default:
return c;
}
}

Expand Down Expand Up @@ -252,12 +249,12 @@ public static String convertMqttWildCardToJms(String mqttTopic) {

private static char convertWildcardMqttToJms(char c) {
switch (c) {
case '.':
return '/';
case '/':
return '.';
default:
return c;
case '.':
return '/';
case '/':
return '.';
default:
return c;
}
}

Expand Down
12 changes: 7 additions & 5 deletions broker/core/src/test/resources/locator.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,17 @@
<api>org.eclipse.kapua.service.certificate.info.CertificateInfoService</api>
<api>org.eclipse.kapua.service.certificate.info.CertificateInfoFactory</api>

<!-- datastore -->
<api>org.eclipse.kapua.service.datastore.MessageStoreService</api>
<api>org.eclipse.kapua.service.datastore.ClientInfoRegistryService</api>
<api>org.eclipse.kapua.service.datastore.ClientInfoFactory</api>
<api>org.eclipse.kapua.service.datastore.ChannelInfoRegistryService</api>
<api>org.eclipse.kapua.service.datastore.ChannelInfoFactory</api>
<api>org.eclipse.kapua.service.datastore.MessageStoreService</api>
<api>org.eclipse.kapua.service.datastore.MessageStoreFactory</api>
<api>org.eclipse.kapua.service.datastore.MetricInfoRegistryService</api>
<!-- datastore scope configuration -->
<api>org.eclipse.kapua.service.datastore.MetricInfoFactory</api>
<api>org.eclipse.kapua.service.datastore.model.query.predicate.DatastorePredicateFactory</api>

<api>org.eclipse.kapua.model.config.metatype.KapuaMetatypeFactory</api>
<api>org.eclipse.kapua.service.datastore.DatastoreObjectFactory</api>
<api>org.eclipse.kapua.service.datastore.model.query.DatastorePredicateFactory</api>

<api>org.eclipse.kapua.service.device.call.DeviceCallFactory</api>
<api>org.eclipse.kapua.service.device.call.DeviceMessageFactory</api>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,20 @@
import org.eclipse.kapua.app.console.module.data.shared.util.KapuaGwtDataModelConverter;
import org.eclipse.kapua.locator.KapuaLocator;
import org.eclipse.kapua.model.id.KapuaId;
import org.eclipse.kapua.service.datastore.ChannelInfoFactory;
import org.eclipse.kapua.service.datastore.ChannelInfoRegistryService;
import org.eclipse.kapua.service.datastore.ClientInfoFactory;
import org.eclipse.kapua.service.datastore.ClientInfoRegistryService;
import org.eclipse.kapua.service.datastore.DatastoreObjectFactory;
import org.eclipse.kapua.service.datastore.MessageStoreFactory;
import org.eclipse.kapua.service.datastore.MessageStoreService;
import org.eclipse.kapua.service.datastore.MetricInfoFactory;
import org.eclipse.kapua.service.datastore.MetricInfoRegistryService;
import org.eclipse.kapua.service.datastore.internal.mediator.ClientInfoField;
import org.eclipse.kapua.service.datastore.internal.mediator.MessageField;
import org.eclipse.kapua.service.datastore.internal.mediator.MetricInfoField;
import org.eclipse.kapua.service.datastore.internal.model.query.ChannelMatchPredicateImpl;
import org.eclipse.kapua.service.datastore.internal.model.query.ClientInfoQueryImpl;
import org.eclipse.kapua.service.datastore.internal.model.query.MessageQueryImpl;
import org.eclipse.kapua.service.datastore.internal.model.query.predicate.ChannelMatchPredicateImpl;
import org.eclipse.kapua.service.datastore.internal.schema.MessageSchema;
import org.eclipse.kapua.service.datastore.model.ChannelInfo;
import org.eclipse.kapua.service.datastore.model.ChannelInfoListResult;
Expand All @@ -58,11 +61,11 @@
import org.eclipse.kapua.service.datastore.model.MetricInfo;
import org.eclipse.kapua.service.datastore.model.MetricInfoListResult;
import org.eclipse.kapua.service.datastore.model.query.ChannelInfoQuery;
import org.eclipse.kapua.service.datastore.model.query.ChannelMatchPredicate;
import org.eclipse.kapua.service.datastore.model.query.ClientInfoQuery;
import org.eclipse.kapua.service.datastore.model.query.DatastorePredicateFactory;
import org.eclipse.kapua.service.datastore.model.query.MessageQuery;
import org.eclipse.kapua.service.datastore.model.query.MetricInfoQuery;
import org.eclipse.kapua.service.datastore.model.query.predicate.ChannelMatchPredicate;
import org.eclipse.kapua.service.datastore.model.query.predicate.DatastorePredicateFactory;
import org.eclipse.kapua.service.device.registry.Device;
import org.eclipse.kapua.service.device.registry.DeviceFactory;
import org.eclipse.kapua.service.device.registry.DeviceListResult;
Expand Down Expand Up @@ -91,15 +94,19 @@ public class GwtDataServiceImpl extends KapuaRemoteServiceServlet implements Gwt

private static final KapuaLocator LOCATOR = KapuaLocator.getInstance();

private static final DatastoreObjectFactory DATASTORE_FACTORY = LOCATOR.getFactory(DatastoreObjectFactory.class);
private static final ChannelInfoFactory CHANNEL_INFO_FACTORY = LOCATOR.getFactory(ChannelInfoFactory.class);
private static final ClientInfoFactory CLIENT_INFO_FACTORY = LOCATOR.getFactory(ClientInfoFactory.class);
private static final MessageStoreFactory MESSAGE_STORE_FACTORY = LOCATOR.getFactory(MessageStoreFactory.class);
private static final MetricInfoFactory METRIC_INFO_FACTORY = LOCATOR.getFactory(MetricInfoFactory.class);

private static final DatastorePredicateFactory DATASTORE_PREDICATE_FACTORY = LOCATOR.getFactory(DatastorePredicateFactory.class);

@Override
public List<GwtTopic> findTopicsTree(String scopeId) throws GwtKapuaException {
List<GwtTopic> channelInfoList = new ArrayList<GwtTopic>();
HashMap<String, GwtTopic> topicMap = new HashMap<String, GwtTopic>();
ChannelInfoRegistryService channelInfoService = LOCATOR.getService(ChannelInfoRegistryService.class);
ChannelInfoQuery query = DATASTORE_FACTORY.newChannelInfoQuery(GwtKapuaCommonsModelConverter.convertKapuaId(scopeId));
ChannelInfoQuery query = CHANNEL_INFO_FACTORY.newQuery(GwtKapuaCommonsModelConverter.convertKapuaId(scopeId));
int offset = 0;
int limit = 250;
try {
Expand Down Expand Up @@ -271,17 +278,17 @@ public PagingLoadResult<GwtDatastoreDevice> findDevices(PagingLoadConfig config,
DeviceFactory deviceFactory = LOCATOR.getFactory(DeviceFactory.class);
List<GwtDatastoreDevice> devices = new ArrayList<GwtDatastoreDevice>();
KapuaId convertedScopeId = GwtKapuaCommonsModelConverter.convertKapuaId(scopeId);
ClientInfoQuery clientInfoQuery = DATASTORE_FACTORY.newClientInfoQuery(convertedScopeId);
ClientInfoQuery clientInfoQuery = CLIENT_INFO_FACTORY.newQuery(convertedScopeId);
if (!Strings.isNullOrEmpty(filter)) {
StorablePredicate predicate = new ChannelMatchPredicateImpl(ClientInfoField.CLIENT_ID.field(), filter);
StorablePredicate predicate = new ChannelMatchPredicateImpl(ClientInfoField.CLIENT_ID, filter);
clientInfoQuery.setPredicate(predicate);
}
clientInfoQuery.setAskTotalCount(true);
clientInfoQuery.setLimit(config.getLimit());
clientInfoQuery.setOffset(config.getOffset());
SortDirection sortDirection = config.getSortDir() == Style.SortDir.ASC ? SortDirection.ASC : SortDirection.DESC;
String sortField = config.getSortField().equals("friendlyDevice") ? ClientInfoField.CLIENT_ID.field() : ClientInfoField.TIMESTAMP.field();
clientInfoQuery.setSortFields(Collections.singletonList(SortField.of(sortDirection, sortField)));
clientInfoQuery.setSortFields(Collections.singletonList(SortField.of(sortField, sortDirection)));

ClientInfoListResult result = null;
try {
Expand Down Expand Up @@ -334,7 +341,7 @@ public ListLoadResult<GwtDatastoreAsset> findAssets(LoadConfig config, String sc
ChannelInfoRegistryService clientInfoService = LOCATOR.getService(ChannelInfoRegistryService.class);
List<GwtDatastoreAsset> asset = new ArrayList<GwtDatastoreAsset>();
KapuaId convertedScopeId = GwtKapuaCommonsModelConverter.convertKapuaId(scopeId);
ChannelInfoQuery query = DATASTORE_FACTORY.newChannelInfoQuery(convertedScopeId);
ChannelInfoQuery query = CHANNEL_INFO_FACTORY.newQuery(convertedScopeId);
query.setLimit(10000);
try {
ChannelInfoListResult result = clientInfoService.query(query);
Expand Down Expand Up @@ -418,7 +425,7 @@ public PagingLoadResult<GwtMessage> findMessagesByAssets(PagingLoadConfig loadCo

private ListLoadResult<GwtHeader> findHeaders(String scopeId, StorablePredicate predicate) throws GwtKapuaException {
MetricInfoRegistryService metricService = LOCATOR.getService(MetricInfoRegistryService.class);
MetricInfoQuery query = DATASTORE_FACTORY.newMetricInfoQuery(GwtKapuaCommonsModelConverter.convertKapuaId(scopeId));
MetricInfoQuery query = METRIC_INFO_FACTORY.newQuery(GwtKapuaCommonsModelConverter.convertKapuaId(scopeId));
query.setLimit(10000);
if (predicate != null) {
query.setPredicate(predicate);
Expand All @@ -444,14 +451,14 @@ private PagingLoadResult<GwtMessage> findMessages(PagingLoadConfig loadConfig, S
MessageStoreService messageService = LOCATOR.getService(MessageStoreService.class);
List<GwtMessage> messages;
int totalLength = 0;
MessageQuery query = DATASTORE_FACTORY.newDatastoreMessageQuery(GwtKapuaCommonsModelConverter.convertKapuaId(scopeId));
MessageQuery query = MESSAGE_STORE_FACTORY.newQuery(GwtKapuaCommonsModelConverter.convertKapuaId(scopeId));
query.setLimit(loadConfig.getLimit());
query.setOffset(loadConfig.getOffset());
AndPredicate andPredicate = DATASTORE_PREDICATE_FACTORY.newAndPredicate();
if (predicate != null) {
andPredicate.getPredicates().add(predicate);
}
RangePredicate dateRangePredicate = DATASTORE_PREDICATE_FACTORY.newRangePredicate(MessageField.TIMESTAMP.field(), startDate, endDate);
RangePredicate dateRangePredicate = DATASTORE_PREDICATE_FACTORY.newRangePredicate(MessageField.TIMESTAMP, startDate, endDate);
andPredicate.getPredicates().add(dateRangePredicate);
if (headers != null) {
OrPredicate metricsPredicate = DATASTORE_PREDICATE_FACTORY.newOrPredicate();
Expand All @@ -468,7 +475,7 @@ private PagingLoadResult<GwtMessage> findMessages(PagingLoadConfig loadConfig, S
} else if (sortField.equals("clientId")) {
sortField = MessageField.CLIENT_ID.field();
}
query.setSortFields(Collections.singletonList(SortField.of(SortDirection.valueOf(loadConfig.getSortDir().name()), sortField)));
query.setSortFields(Collections.singletonList(SortField.of(sortField, SortDirection.valueOf(loadConfig.getSortDir().name()))));
}
messages = getMessagesList(query, headers);
try {
Expand Down
Loading

0 comments on commit 88a036f

Please sign in to comment.