From 97eac4687c2c2301f34c26608ba8f959a46baf09 Mon Sep 17 00:00:00 2001 From: riccardomodanese Date: Tue, 12 May 2020 16:26:16 +0200 Subject: [PATCH 1/3] Security broker filter cleanup - add authorizer Signed-off-by: riccardomodanese --- .../core/plugin/KapuaConnectionContext.java | 45 +++++ .../core/plugin/KapuaSecurityAclChecker.java | 24 +++ .../plugin/KapuaSecurityBrokerFilter.java | 189 +++++++++--------- .../core/plugin/KapuaSecurityContext.java | 32 ++- .../AdminAuthenticationLogic.java | 1 + .../UserAuthenticationLogic.java | 24 +-- .../core/plugin/authorization/Authorizer.java | 33 +++ .../authorization/DefaultAuthorizer.java | 63 ++++++ .../broker/core/setting/BrokerSettingKey.java | 4 + .../resources/kapua-broker-setting.properties | 3 + 10 files changed, 293 insertions(+), 125 deletions(-) create mode 100644 broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityAclChecker.java create mode 100644 broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authorization/Authorizer.java create mode 100644 broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authorization/DefaultAuthorizer.java diff --git a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaConnectionContext.java b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaConnectionContext.java index 45b85174ecb..326e46b7b6a 100644 --- a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaConnectionContext.java +++ b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaConnectionContext.java @@ -33,6 +33,12 @@ public class KapuaConnectionContext { protected static final Logger logger = LoggerFactory.getLogger(KapuaConnectionContext.class); + public static final int BROKER_CONNECT_IDX = 0; + public static final int DEVICE_MANAGE_IDX = 1; + public static final int DATA_VIEW_IDX = 2; + public static final int DATA_MANAGE_IDX = 3; + public static final int DEVICE_VIEW_IDX = 4; + private String brokerId; private KapuaPrincipal principal; private String userName; @@ -50,6 +56,9 @@ public class KapuaConnectionContext { private String brokerIpOrHostName; private Certificate[] clientCertificates; + private boolean admin; + private boolean provisioning; + //flag to help the correct lifecycle handling private boolean missing; @@ -188,6 +197,42 @@ public boolean[] getHasPermissions() { return hasPermissions; } + public boolean isAdmin() { + return admin; + } + + public void setAdmin(boolean admin) { + this.admin = admin; + } + + public boolean isProvisioning() { + return provisioning; + } + + public void setProvisioning(boolean provisioning) { + this.provisioning = provisioning; + } + + public boolean isBrokerConnect() { + return hasPermissions[BROKER_CONNECT_IDX]; + } + + public boolean isDeviceView() { + return hasPermissions[DEVICE_VIEW_IDX]; + } + + public boolean isDeviceManage() { + return hasPermissions[DEVICE_MANAGE_IDX]; + } + + public boolean isDataView() { + return hasPermissions[DATA_VIEW_IDX]; + } + + public boolean isDataManage() { + return hasPermissions[DATA_MANAGE_IDX]; + } + public String getBrokerIpOrHostName() { return brokerIpOrHostName; } diff --git a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityAclChecker.java b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityAclChecker.java new file mode 100644 index 00000000000..56d45a44670 --- /dev/null +++ b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityAclChecker.java @@ -0,0 +1,24 @@ +/******************************************************************************* + * Copyright (c) 2020 Eurotech and/or its affiliates and others + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eurotech - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.broker.core.plugin; + +import org.apache.activemq.security.AuthorizationMap; + +public class KapuaSecurityAclChecker { + + private AuthorizationMap authMap; + private boolean hasDataView; + private boolean hasDataManage; + private boolean hasDeviceView; + private boolean hasDeviceManage; + +} diff --git a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityBrokerFilter.java b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityBrokerFilter.java index f255f7ca6c3..e8df74a4918 100644 --- a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityBrokerFilter.java +++ b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityBrokerFilter.java @@ -42,6 +42,9 @@ import org.eclipse.kapua.broker.core.message.MessageConstants; import org.eclipse.kapua.broker.core.plugin.authentication.Authenticator; import org.eclipse.kapua.broker.core.plugin.authentication.DefaultAuthenticator; +import org.eclipse.kapua.broker.core.plugin.authorization.Authorizer; +import org.eclipse.kapua.broker.core.plugin.authorization.Authorizer.ActionType; +import org.eclipse.kapua.broker.core.plugin.authorization.DefaultAuthorizer; import org.eclipse.kapua.broker.core.plugin.metric.LoginMetric; import org.eclipse.kapua.broker.core.plugin.metric.PublishMetric; import org.eclipse.kapua.broker.core.plugin.metric.SubscribeMetric; @@ -68,7 +71,6 @@ import org.slf4j.LoggerFactory; import javax.jms.JMSException; -import javax.jms.MessageListener; import javax.security.auth.login.CredentialException; import java.io.IOException; import java.text.MessageFormat; @@ -111,6 +113,7 @@ public class KapuaSecurityBrokerFilter extends BrokerFilter { private static final String BROKER_IP_RESOLVER_CLASS_NAME; private static final String BROKER_ID_RESOLVER_CLASS_NAME; private static final String AUTHENTICATOR_CLASS_NAME; + private static final String AUTHORIZER_CLASS_NAME; private static final Long STEALING_LINK_INITIALIZATION_MAX_WAIT_TIME; private static boolean stealingLinkEnabled; private Future stealingLinkManagerFuture; @@ -121,6 +124,7 @@ public class KapuaSecurityBrokerFilter extends BrokerFilter { BROKER_IP_RESOLVER_CLASS_NAME = config.getString(BrokerSettingKey.BROKER_IP_RESOLVER_CLASS_NAME); BROKER_ID_RESOLVER_CLASS_NAME = config.getString(BrokerSettingKey.BROKER_ID_RESOLVER_CLASS_NAME); AUTHENTICATOR_CLASS_NAME = config.getString(BrokerSettingKey.AUTHENTICATOR_CLASS_NAME); + AUTHORIZER_CLASS_NAME = config.getString(BrokerSettingKey.AUTHORIZER_CLASS_NAME); STEALING_LINK_INITIALIZATION_MAX_WAIT_TIME = config.getLong(BrokerSettingKey.STEALING_LINK_INITIALIZATION_MAX_WAIT_TIME); stealingLinkEnabled = config.getBoolean(BrokerSettingKey.BROKER_STEALING_LINK_ENABLED); } @@ -133,6 +137,7 @@ public class KapuaSecurityBrokerFilter extends BrokerFilter { protected static final Map CONNECTION_MAP = new ConcurrentHashMap<>(); private static final String CONNECTOR_NAME_VM = String.format("vm://%s", BrokerSetting.getInstance().getString(BrokerSettingKey.BROKER_NAME)); private Authenticator authenticator; + private Authorizer authorizer; private AuthenticationService authenticationService = KapuaLocator.getInstance().getService(AuthenticationService.class); private CredentialsFactory credentialsFactory = KapuaLocator.getInstance().getFactory(CredentialsFactory.class); @@ -158,8 +163,10 @@ public KapuaSecurityBrokerFilter(Broker next) { public void start() throws Exception { logger.info(">>> Security broker filter: calling start..."); - logger.info(">>> Security broker filter: calling start... Initialize authenticator"); + logger.info(">>> Security broker filter: calling start... Initialize authenticator {}", AUTHENTICATOR_CLASS_NAME); authenticator = ClassUtil.newInstance(AUTHENTICATOR_CLASS_NAME, DefaultAuthenticator.class, new Class[] { Map.class }, new Object[] { options }); + logger.info(">>> Security broker filter: calling start... Initialize authorizer {}", AUTHORIZER_CLASS_NAME); + authorizer = ClassUtil.newInstance(AUTHORIZER_CLASS_NAME, DefaultAuthorizer.class, new Class[] {}, new Object[] {}); logger.info(">>> Security broker filter: calling start... Initialize broker ip resolver"); brokerIpResolver = ClassUtil.newInstance(BROKER_IP_RESOLVER_CLASS_NAME, DefaultBrokerIpResolver.class); logger.info(">>> Security broker filter: calling start... Initialize broker id resolver"); @@ -221,11 +228,8 @@ protected void registerStealingLinkManager() { */ protected void subscribeStealingLinkManager() throws JMSException, KapuaException { stealingLinkManagerConsumer = new JmsConsumerWrapper( - String.format(KapuaSecurityBrokerFilter.CONNECT_MESSAGE_TOPIC_PATTERN + ".>", SystemSetting.getInstance().getMessageClassifier(), "*", "*"), - false, true, new MessageListener() { - - @Override - public void onMessage(javax.jms.Message message) { + String.format(KapuaSecurityBrokerFilter.CONNECT_MESSAGE_TOPIC_PATTERN + ".>", SystemSetting.getInstance().getMessageClassifier(), "*", "*"), + false, true, message -> { // just for logging purpose String destination = null; String messageId = null; @@ -241,63 +245,67 @@ public void onMessage(javax.jms.Message message) { messageBrokerId = message.getStringProperty(MessageConstants.PROPERTY_BROKER_ID); if (!brokerId.equals(messageBrokerId)) { logger.debug("Received connect message from another broker id: '{}' topic: '{}' - message id: '{}'", messageBrokerId, destination, messageId); - KapuaConnectionContext kcc = null; - // try parsing from message context (if the message is coming from other brokers it has these fields evaluated) - try { - logger.debug("Get connected device informations from the message session"); - kcc = parseMessageSession(message); - } catch (JMSException | KapuaException e) { - logger.debug("Get connected device informations from the topic"); - // otherwise looking for these informations by looking at the topic - kcc = parseTopicInfo(message); - } + KapuaConnectionContext kcc = getKapuaConnectionContext(message); if (CONNECTION_MAP.get(kcc.getFullClientId()) != null) { logger.debug("Stealing link detected - broker id: '{}' topic: '{}' - message id: '{}'", messageBrokerId, destination, messageId); // iterate over all connected clients - for (Connection conn : getClients()) { - logger.debug("Checking if {} equals {}", kcc.getFullClientId(), conn.getConnectionId()); - if (kcc.getFullClientId().equals(conn.getConnectionId())) { - // Include Exception to notify the security broker filter - logger.info("New connection detected for {} on another broker. Stopping the current connection...", kcc.getFullClientId()); - loginMetric.getRemoteStealingLinkDisconnect().inc(); - conn.serviceExceptionAsync(new IOException(new KapuaDuplicateClientIdException(kcc.getFullClientId()))); - - // assume only one connection since this broker should have already handled any duplicates - return; - } - } + disconnectClients(kcc); } } } catch (Exception e) { logger.error("Cannot enforce stealing link check in the received message on topic '{}'", destination, e); } - } + }); + } - private KapuaConnectionContext parseMessageSession(javax.jms.Message message) throws JMSException, KapuaException { - Long scopeId = message.propertyExists(MessageConstants.PROPERTY_SCOPE_ID) ? message.getLongProperty(MessageConstants.PROPERTY_SCOPE_ID) : null; - String clientId = message.getStringProperty(MessageConstants.PROPERTY_CLIENT_ID); - if (scopeId == null || scopeId <= 0 || StringUtils.isEmpty(clientId)) { - logger.debug("Invalid message context. Try parsing the topic."); - throw new KapuaException(KapuaErrorCodes.ILLEGAL_ARGUMENT, "Invalid message context"); - } - return new KapuaConnectionContext(scopeId, clientId, MULTI_ACCOUNT_CLIENT_ID); + private void disconnectClients(KapuaConnectionContext kcc) throws Exception { + for (Connection conn : getClients()) { + logger.debug("Checking if {} equals {}", kcc.getFullClientId(), conn.getConnectionId()); + if (kcc.getFullClientId().equals(conn.getConnectionId())) { + // Include Exception to notify the security broker filter + logger.info("New connection detected for {} on another broker. Stopping the current connection...", kcc.getFullClientId()); + loginMetric.getRemoteStealingLinkDisconnect().inc(); + conn.serviceExceptionAsync(new IOException(new KapuaDuplicateClientIdException(kcc.getFullClientId()))); + // assume only one connection since this broker should have already handled any duplicates + return; } + } + } - private KapuaConnectionContext parseTopicInfo(javax.jms.Message message) throws JMSException, KapuaException { - String originalTopic = message.getStringProperty(MessageConstants.PROPERTY_ORIGINAL_TOPIC); - String[] topic = originalTopic.split("\\."); - if (topic.length != 5) { - logger.error("Invalid topic format. Cannot process connect message."); - throw new KapuaException(KapuaErrorCodes.ILLEGAL_ARGUMENT, "wrong connect message topic"); - } - String accountName = topic[1]; - String clientId = topic[2]; - Account account = KapuaSecurityUtils.doPrivileged(() -> accountService.findByName(accountName)); - Long scopeId = account.getId().getId().longValue(); - return new KapuaConnectionContext(scopeId, clientId, MULTI_ACCOUNT_CLIENT_ID); - } + private KapuaConnectionContext getKapuaConnectionContext(javax.jms.Message message) throws JMSException, KapuaException { + // try parsing from message context (if the message is coming from other brokers it has these fields evaluated) + try { + logger.debug("Get connected device informations from the message session"); + return parseMessageSession(message); + } catch (JMSException | KapuaException e) { + logger.debug("Get connected device informations from the topic"); + // otherwise looking for these informations by looking at the topic + return parseTopicInfo(message); + } + } - }); + private KapuaConnectionContext parseMessageSession(javax.jms.Message message) throws JMSException, KapuaException { + Long scopeId = message.propertyExists(MessageConstants.PROPERTY_SCOPE_ID) ? message.getLongProperty(MessageConstants.PROPERTY_SCOPE_ID) : null; + String clientId = message.getStringProperty(MessageConstants.PROPERTY_CLIENT_ID); + if (scopeId == null || scopeId <= 0 || StringUtils.isEmpty(clientId)) { + logger.debug("Invalid message context. Try parsing the topic."); + throw new KapuaException(KapuaErrorCodes.ILLEGAL_ARGUMENT, "Invalid message context"); + } + return new KapuaConnectionContext(scopeId, clientId, MULTI_ACCOUNT_CLIENT_ID); + } + + private KapuaConnectionContext parseTopicInfo(javax.jms.Message message) throws JMSException, KapuaException { + String originalTopic = message.getStringProperty(MessageConstants.PROPERTY_ORIGINAL_TOPIC); + String[] topic = originalTopic.split("\\."); + if (topic.length != 5) { + logger.error("Invalid topic format. Cannot process connect message."); + throw new KapuaException(KapuaErrorCodes.ILLEGAL_ARGUMENT, "wrong connect message topic"); + } + String accountName = topic[1]; + String clientId = topic[2]; + Account account = KapuaSecurityUtils.doPrivileged(() -> accountService.findByName(accountName)); + Long scopeId = account.getId().getId().longValue(); + return new KapuaConnectionContext(scopeId, clientId, MULTI_ACCOUNT_CLIENT_ID); } /** @@ -507,6 +515,18 @@ public void removeConnection(ConnectionContext context, ConnectionInfo info, Thr context.setSecurityContext(null); } + private Account getAccount(KapuaId accountId) { + try { + return KapuaSecurityUtils.doPrivileged(() -> accountService.find(accountId)); + } + catch(AuthenticationException e) { + throw (AuthenticationException) e; + } + catch (Exception e) { + throw new ShiroException("Error while find account!", e); + } + } + // ------------------------------------------------------------------ // // Destinations @@ -545,22 +565,19 @@ private void internalSend(ProducerBrokerExchange producerExchange, Message messa messageSend.setProperty(MessageConstants.HEADER_KAPUA_RECEIVED_TIMESTAMP, KapuaDateUtils.getKapuaSysDate().toEpochMilli()); if (!isBrokerContext(producerExchange.getConnectionContext())) { KapuaSecurityContext kapuaSecurityContext = getKapuaSecurityContext(producerExchange.getConnectionContext()); - if (!messageSend.getDestination().isTemporary()) { - Set allowedACLs = kapuaSecurityContext.getAuthorizationMap().getWriteACLs(messageSend.getDestination()); - if (allowedACLs != null && !kapuaSecurityContext.isInOneOf(allowedACLs)) { - String message = MessageFormat.format("User {0} ({1} - {2} - conn id {3}) is not authorized to write to: {4}", - kapuaSecurityContext.getUserName(), - ((KapuaPrincipal) kapuaSecurityContext.getMainPrincipal()).getClientId(), - ((KapuaPrincipal) kapuaSecurityContext.getMainPrincipal()).getClientIp(), - kapuaSecurityContext.getConnectionId(), - messageSend.getDestination()); - logger.warn(message); - publishMetric.getMessageSizeNotAllowed().update(messageSend.getSize()); - publishMetric.getNotAllowedMessages().inc(); - // IMPORTANT - // restored the throw exception because otherwise we got acl's issues - throw new SecurityException(message); - } + if (!messageSend.getDestination().isTemporary() && !authorizer.isAllowed(ActionType.WRITE, kapuaSecurityContext, messageSend.getDestination())) { + String message = MessageFormat.format("User {0} ({1} - {2} - conn id {3}) is not authorized to write to: {4}", + kapuaSecurityContext.getUserName(), + ((KapuaPrincipal) kapuaSecurityContext.getMainPrincipal()).getClientId(), + ((KapuaPrincipal) kapuaSecurityContext.getMainPrincipal()).getClientIp(), + kapuaSecurityContext.getConnectionId(), + messageSend.getDestination()); + logger.warn(message); + publishMetric.getMessageSizeNotAllowed().update(messageSend.getSize()); + publishMetric.getNotAllowedMessages().inc(); + // IMPORTANT + // restored the throw exception because otherwise we got acl's issues + throw new SecurityException(message); } if (isLwt(originalTopic)) { //handle the missing message case @@ -627,23 +644,18 @@ private Subscription internalAddConsumer(ConnectionContext context, ConsumerInfo destination = sb.toString(); } info.getDestination().setPhysicalName(destination); - - if (!info.getDestination().isTemporary()) { - Set allowedACLs = null; - allowedACLs = kapuaSecurityContext.getAuthorizationMap().getReadACLs(info.getDestination()); - if (allowedACLs != null && !kapuaSecurityContext.isInOneOf(allowedACLs)) { - String message = MessageFormat.format("User {0} ({1} - {2} - conn id {3}) is not authorized to read from: {4}", - kapuaSecurityContext.getUserName(), - ((KapuaPrincipal) kapuaSecurityContext.getMainPrincipal()).getClientId(), - ((KapuaPrincipal) kapuaSecurityContext.getMainPrincipal()).getClientIp(), - kapuaSecurityContext.getConnectionId(), - info.getDestination()); - logger.warn(message); - subscribeMetric.getNotAllowedMessages().inc(); - // IMPORTANT - // restored the throw exception because otherwise we got acl's issues - throw new SecurityException(message); - } + if (!info.getDestination().isTemporary() && !authorizer.isAllowed(ActionType.READ, kapuaSecurityContext, info.getDestination())) { + String message = MessageFormat.format("User {0} ({1} - {2} - conn id {3}) is not authorized to read from: {4}", + kapuaSecurityContext.getUserName(), + ((KapuaPrincipal) kapuaSecurityContext.getMainPrincipal()).getClientId(), + ((KapuaPrincipal) kapuaSecurityContext.getMainPrincipal()).getClientIp(), + kapuaSecurityContext.getConnectionId(), + info.getDestination()); + logger.warn(message); + subscribeMetric.getNotAllowedMessages().inc(); + // IMPORTANT + // restored the throw exception because otherwise we got acl's issues + throw new SecurityException(message); } } subscribeMetric.getAllowedMessages().inc(); @@ -693,11 +705,10 @@ protected AuthorizationEntry createAuthorizationEntry(KapuaConnectionContext kcc protected KapuaSecurityContext getKapuaSecurityContext(ConnectionContext context) { SecurityContext securityContext = context.getSecurityContext(); - if (!(securityContext instanceof KapuaSecurityContext)) { - throw new SecurityException("Invalid SecurityContext."); + if (securityContext instanceof KapuaSecurityContext) { + return (KapuaSecurityContext) securityContext; } - - return (KapuaSecurityContext) securityContext; + throw new SecurityException("Invalid SecurityContext."); } } diff --git a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityContext.java b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityContext.java index 048a25de221..17019b04916 100644 --- a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityContext.java +++ b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityContext.java @@ -17,6 +17,7 @@ import org.apache.activemq.security.AuthorizationMap; import org.apache.activemq.security.SecurityContext; +import org.eclipse.kapua.KapuaException; import org.eclipse.kapua.commons.security.KapuaSession; import org.eclipse.kapua.model.id.KapuaId; import org.eclipse.kapua.service.authentication.KapuaPrincipal; @@ -36,10 +37,8 @@ public class KapuaSecurityContext extends SecurityContext { private String brokerConnectionId; private AuthorizationMap authMap; - private boolean hasDataView; - private boolean hasDataManage; - private boolean hasDeviceView; - private boolean hasDeviceManage; + + private KapuaConnectionContext kcc; //flag to help the correct lifecycle handling private boolean missing; @@ -48,6 +47,7 @@ public KapuaSecurityContext(KapuaConnectionContext kcc, AuthorizationMap authMap) { super(kcc.getPrincipal().getName()); + this.kcc = kcc; this.principal = kcc.getPrincipal(); this.kapuaSession = KapuaSession.createFrom(); principals = new HashSet(); @@ -59,6 +59,14 @@ public KapuaSecurityContext(KapuaConnectionContext kcc, this.brokerConnectionId = kcc.getConnectionId(); } + public KapuaPrincipal getKapuaPrincipal() throws KapuaException { + return principal; + } + + public KapuaConnectionContext getKapuaConnectionContext() { + return kcc; + } + public Principal getMainPrincipal() { return principal; } @@ -83,22 +91,6 @@ public ConnectorDescriptor getConnectorDescriptor() { return connectorDescriptor; } - public boolean hasDataView() { - return hasDataView; - } - - public boolean hasDataManage() { - return hasDataManage; - } - - public boolean hasDeviceView() { - return hasDeviceView; - } - - public boolean hasDeviceManage() { - return hasDeviceManage; - } - public KapuaSession getKapuaSession() { return kapuaSession; } diff --git a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/AdminAuthenticationLogic.java b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/AdminAuthenticationLogic.java index 3a663b07f08..34414aac69f 100644 --- a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/AdminAuthenticationLogic.java +++ b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/AdminAuthenticationLogic.java @@ -37,6 +37,7 @@ public AdminAuthenticationLogic(Map options) { @Override public List connect(KapuaConnectionContext kcc) throws KapuaException { + kcc.setAdmin(true); return buildAuthorizationMap(kcc); } diff --git a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/UserAuthenticationLogic.java b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/UserAuthenticationLogic.java index b469e8f1454..ac6130f636e 100644 --- a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/UserAuthenticationLogic.java +++ b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/UserAuthenticationLogic.java @@ -44,11 +44,6 @@ public class UserAuthenticationLogic extends AuthenticationLogic { protected String aclDataAccCli; protected String aclCtrlAccNotify; - protected static final int BROKER_CONNECT_IDX = 0; - protected static final int DEVICE_MANAGE_IDX = 1; - protected static final int DATA_VIEW_IDX = 2; - protected static final int DATA_MANAGE_IDX = 3; - /** * Default constructor * @@ -73,11 +68,9 @@ public List connect(KapuaConnectionContext kcc) throws Kapua Context loginNormalUserTimeContext = loginMetric.getNormalUserTime().time(); Context loginCheckAccessTimeContext = loginMetric.getCheckAccessTime().time(); - boolean[] hasPermissions = checkPermissions(kcc); + updatePermissions(kcc); loginCheckAccessTimeContext.stop(); - kcc.updatePermissions(hasPermissions); - Context loginFindDeviceConnectionTimeContext = loginMetric.getFindDeviceConnectionTime().time(); DeviceConnection deviceConnection = KapuaSecurityUtils.doPrivileged(() -> deviceConnectionService.findByClientId(kcc.getScopeId(), kcc.getClientId())); loginFindDeviceConnectionTimeContext.stop(); @@ -159,14 +152,14 @@ protected List buildAuthorizationMap(KapuaConnectionContext // addConnection checks BROKER_CONNECT_IDX permission before call this method // then here user has BROKER_CONNECT_IDX permission and if check isn't needed // if (hasPermissions[BROKER_CONNECT_IDX]) { - if (kcc.getHasPermissions()[DEVICE_MANAGE_IDX]) { + if (kcc.isDeviceManage()) { ael.add(createAuthorizationEntry(kcc, Acl.ALL, formatAcl(aclCtrlAcc, kcc))); } else { ael.add(createAuthorizationEntry(kcc, Acl.ALL, formatAclFull(aclCtrlAccCli, kcc))); } - if (kcc.getHasPermissions()[DATA_MANAGE_IDX]) { + if (kcc.isDataManage()) { ael.add(createAuthorizationEntry(kcc, Acl.ALL, formatAcl(aclDataAcc, kcc))); - } else if (kcc.getHasPermissions()[DATA_VIEW_IDX]) { + } else if (kcc.isDataView()) { ael.add(createAuthorizationEntry(kcc, Acl.READ_ADMIN, formatAcl(aclDataAcc, kcc))); ael.add(createAuthorizationEntry(kcc, Acl.WRITE, formatAclFull(aclDataAccCli, kcc))); } else { @@ -182,19 +175,18 @@ protected List buildAuthorizationMap(KapuaConnectionContext return ael; } - protected boolean[] checkPermissions(KapuaConnectionContext kcc) throws KapuaException { + protected void updatePermissions(KapuaConnectionContext kcc) throws KapuaException { List permissions = new ArrayList<>(); permissions.add(permissionFactory.newPermission(BROKER_DOMAIN, Actions.connect, kcc.getScopeId())); permissions.add(permissionFactory.newPermission(DEVICE_MANAGEMENT_DOMAIN, Actions.write, kcc.getScopeId())); permissions.add(permissionFactory.newPermission(DATASTORE_DOMAIN, Actions.read, kcc.getScopeId())); permissions.add(permissionFactory.newPermission(DATASTORE_DOMAIN, Actions.write, kcc.getScopeId())); - boolean[] hasPermissions = authorizationService.isPermitted(permissions); + permissions.add(permissionFactory.newPermission(DEVICE_MANAGEMENT_DOMAIN, Actions.read, kcc.getScopeId())); + kcc.updatePermissions(authorizationService.isPermitted(permissions)); - if (!hasPermissions[BROKER_CONNECT_IDX]) { + if (!kcc.isBrokerConnect()) { throw new KapuaIllegalAccessException(permissionFactory.newPermission(BROKER_DOMAIN, Actions.connect, kcc.getScopeId()).toString()); } - - return hasPermissions; } /** diff --git a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authorization/Authorizer.java b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authorization/Authorizer.java new file mode 100644 index 00000000000..5a1f375e10d --- /dev/null +++ b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authorization/Authorizer.java @@ -0,0 +1,33 @@ +/******************************************************************************* + * Copyright (c) 2020 Eurotech and/or its affiliates and others + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eurotech - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.broker.core.plugin.authorization; + +import org.apache.activemq.command.ActiveMQDestination; +import org.eclipse.kapua.KapuaException; +import org.eclipse.kapua.broker.core.plugin.KapuaSecurityContext; + +/** + * Broker authorizer definition.
+ * This class is meant to allow custom authorization pluggability. + * + */ +public interface Authorizer { + + public enum ActionType { + READ, + WRITE, + ADMIN + } + + boolean isAllowed(ActionType actionType, KapuaSecurityContext kapuaSecurityContext, ActiveMQDestination destination) throws KapuaException; + +} diff --git a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authorization/DefaultAuthorizer.java b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authorization/DefaultAuthorizer.java new file mode 100644 index 00000000000..9fea2a0bed0 --- /dev/null +++ b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authorization/DefaultAuthorizer.java @@ -0,0 +1,63 @@ +/******************************************************************************* + * Copyright (c) 2020 Eurotech and/or its affiliates and others + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eurotech - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.broker.core.plugin.authorization; + +import java.util.Set; + +import org.apache.activemq.command.ActiveMQDestination; +import org.eclipse.kapua.KapuaException; +import org.eclipse.kapua.broker.core.plugin.KapuaSecurityContext; + +/** + * Default authorizer implementation. + * + */ +public class DefaultAuthorizer implements Authorizer { + + @Override + public boolean isAllowed(ActionType actionType, KapuaSecurityContext kapuaSecurityContext, ActiveMQDestination destination) throws KapuaException { + switch (actionType) { + case READ: + return isConsumeAllowed(kapuaSecurityContext, destination); + case WRITE: + return isSendAllowed(kapuaSecurityContext, destination); + case ADMIN: + return isAdminAllowed(kapuaSecurityContext, destination); + default: + return false; + } + } + + protected boolean isSendAllowed(KapuaSecurityContext kapuaSecurityContext, ActiveMQDestination destination) throws KapuaException { + Set allowedACLs = kapuaSecurityContext.getAuthorizationMap().getWriteACLs(destination); + if (allowedACLs != null && !kapuaSecurityContext.isInOneOf(allowedACLs)) { + return false; + } + return true; + } + + protected boolean isConsumeAllowed(KapuaSecurityContext kapuaSecurityContext, ActiveMQDestination destination) throws KapuaException { + Set allowedACLs = kapuaSecurityContext.getAuthorizationMap().getReadACLs(destination); + if (allowedACLs != null && !kapuaSecurityContext.isInOneOf(allowedACLs)) { + return false; + } + return true; + } + + protected boolean isAdminAllowed(KapuaSecurityContext kapuaSecurityContext, ActiveMQDestination destination) throws KapuaException { + Set allowedACLs = kapuaSecurityContext.getAuthorizationMap().getAdminACLs(destination); + if (allowedACLs != null && !kapuaSecurityContext.isInOneOf(allowedACLs)) { + return false; + } + return true; + } +} diff --git a/broker/core/src/main/java/org/eclipse/kapua/broker/core/setting/BrokerSettingKey.java b/broker/core/src/main/java/org/eclipse/kapua/broker/core/setting/BrokerSettingKey.java index 56c39f4b4df..0f64094c5c5 100644 --- a/broker/core/src/main/java/org/eclipse/kapua/broker/core/setting/BrokerSettingKey.java +++ b/broker/core/src/main/java/org/eclipse/kapua/broker/core/setting/BrokerSettingKey.java @@ -51,6 +51,10 @@ public enum BrokerSettingKey implements SettingKey { * Authenticator implementation */ AUTHENTICATOR_CLASS_NAME("broker.authenticator_class_name"), + /** + * Authorizer implementation + */ + AUTHORIZER_CLASS_NAME("broker.authorizer_class_name"), /** * Enable/disable the clustered stealing link feature */ diff --git a/broker/core/src/main/resources/kapua-broker-setting.properties b/broker/core/src/main/resources/kapua-broker-setting.properties index 2efbedfa1e3..fca82e243ca 100644 --- a/broker/core/src/main/resources/kapua-broker-setting.properties +++ b/broker/core/src/main/resources/kapua-broker-setting.properties @@ -11,6 +11,9 @@ broker.jaxb_context_class_name=org.eclipse.kapua.broker.core.BrokerJAXBContextPr #Authenticator implementation #A custom implementation will be used if provided, otherwise the default implementation is used. broker.authenticator_class_name= +#Authorizer implementation +#A custom implementation will be used if provided, otherwise the default implementation is used. +broker.authorizer_class_name= #System message creator custom implementation. #A custom implementation will be used if provided (implementation of the SystemMessageCreator interface), otherwise the default implementation is used. broker.system.message_creator_class_name= From d99d2941e6917648fb9eb9bdf0e1c36d18e4dde9 Mon Sep 17 00:00:00 2001 From: riccardomodanese Date: Thu, 14 May 2020 14:00:20 +0200 Subject: [PATCH 2/3] Security broker filter cleanup - merge security/connection context Signed-off-by: riccardomodanese --- .../system/DefaultSystemMessageCreator.java | 8 +- .../message/system/SystemMessageCreator.java | 6 +- .../core/plugin/KapuaConnectionContext.java | 259 ------------------ .../plugin/KapuaSecurityBrokerFilter.java | 163 +++++------ .../core/plugin/KapuaSecurityContext.java | 232 ++++++++++++++-- .../AdminAuthenticationLogic.java | 24 +- .../authentication/AuthenticationLogic.java | 54 ++-- .../plugin/authentication/Authenticator.java | 18 +- .../authentication/DefaultAuthenticator.java | 42 +-- .../UserAuthenticationLogic.java | 130 ++++----- .../pool/JmsAssistantProducerWrapper.java | 10 +- .../broker/core/setting/BrokerSettingKey.java | 4 + .../resources/kapua-broker-setting.properties | 1 + .../resources/kapua-broker-setting.properties | 1 + 14 files changed, 443 insertions(+), 509 deletions(-) delete mode 100644 broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaConnectionContext.java diff --git a/broker/core/src/main/java/org/eclipse/kapua/broker/core/message/system/DefaultSystemMessageCreator.java b/broker/core/src/main/java/org/eclipse/kapua/broker/core/message/system/DefaultSystemMessageCreator.java index 078556f2e52..24c6a8a6a3b 100644 --- a/broker/core/src/main/java/org/eclipse/kapua/broker/core/message/system/DefaultSystemMessageCreator.java +++ b/broker/core/src/main/java/org/eclipse/kapua/broker/core/message/system/DefaultSystemMessageCreator.java @@ -13,7 +13,7 @@ import java.util.Map; -import org.eclipse.kapua.broker.core.plugin.KapuaConnectionContext; +import org.eclipse.kapua.broker.core.plugin.KapuaSecurityContext; import com.google.common.base.Splitter; @@ -31,11 +31,11 @@ public class DefaultSystemMessageCreator implements SystemMessageCreator { private static final String USERNAME_KEY = "Username"; @Override - public String createMessage(SystemMessageType systemMessageType, KapuaConnectionContext kbc) { + public String createMessage(SystemMessageType systemMessageType, KapuaSecurityContext kapuaSecurityContext) { StringBuilder builder = new StringBuilder(); builder.append(EVENT_KEY).append(PAIR_SEPARATOR).append(systemMessageType.name()); - builder.append(FIELD_SEPARATOR).append(DEVICE_ID_KEY).append(PAIR_SEPARATOR).append(kbc.getClientId()); - builder.append(FIELD_SEPARATOR).append(USERNAME_KEY).append(PAIR_SEPARATOR).append(kbc.getUserName()); + builder.append(FIELD_SEPARATOR).append(DEVICE_ID_KEY).append(PAIR_SEPARATOR).append(kapuaSecurityContext.getClientId()); + builder.append(FIELD_SEPARATOR).append(USERNAME_KEY).append(PAIR_SEPARATOR).append(kapuaSecurityContext.getUserName()); return builder.toString(); } diff --git a/broker/core/src/main/java/org/eclipse/kapua/broker/core/message/system/SystemMessageCreator.java b/broker/core/src/main/java/org/eclipse/kapua/broker/core/message/system/SystemMessageCreator.java index 251452fc32d..ac8f83934ed 100644 --- a/broker/core/src/main/java/org/eclipse/kapua/broker/core/message/system/SystemMessageCreator.java +++ b/broker/core/src/main/java/org/eclipse/kapua/broker/core/message/system/SystemMessageCreator.java @@ -11,7 +11,7 @@ *******************************************************************************/ package org.eclipse.kapua.broker.core.message.system; -import org.eclipse.kapua.broker.core.plugin.KapuaConnectionContext; +import org.eclipse.kapua.broker.core.plugin.KapuaSecurityContext; /** * System message creator (i.e message sent by the broker on device connect) @@ -29,9 +29,9 @@ enum SystemMessageType { * Create a system message * * @param systemMessageType - * @param kbc + * @param kapuaSecurityContext * @return */ - String createMessage(SystemMessageType systemMessageType, KapuaConnectionContext kbc); + String createMessage(SystemMessageType systemMessageType, KapuaSecurityContext kapuaSecurityContext); } diff --git a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaConnectionContext.java b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaConnectionContext.java deleted file mode 100644 index 326e46b7b6a..00000000000 --- a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaConnectionContext.java +++ /dev/null @@ -1,259 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2017, 2020 Eurotech and/or its affiliates and others - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * Eurotech - initial API and implementation - *******************************************************************************/ -package org.eclipse.kapua.broker.core.plugin; - -import org.apache.activemq.command.ConnectionInfo; -import org.eclipse.kapua.model.id.KapuaId; -import org.eclipse.kapua.service.authentication.KapuaPrincipal; -import org.eclipse.kapua.service.authentication.token.AccessToken; -import org.eclipse.kapua.service.device.registry.connection.DeviceConnection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.security.cert.Certificate; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.List; - -/** - * Connection information container - * - * @since 1.0 - */ -public class KapuaConnectionContext { - - protected static final Logger logger = LoggerFactory.getLogger(KapuaConnectionContext.class); - - public static final int BROKER_CONNECT_IDX = 0; - public static final int DEVICE_MANAGE_IDX = 1; - public static final int DATA_VIEW_IDX = 2; - public static final int DATA_MANAGE_IDX = 3; - public static final int DEVICE_VIEW_IDX = 4; - - private String brokerId; - private KapuaPrincipal principal; - private String userName; - private KapuaId scopeId; - private KapuaId userId; - private String accountName; - private String clientId; - private String fullClientId; - private String clientIp; - private String connectionId; - private String oldConnectionId; - private KapuaId kapuaConnectionId; - private ConnectorDescriptor connectorDescriptor; - private boolean[] hasPermissions; - private String brokerIpOrHostName; - private Certificate[] clientCertificates; - - private boolean admin; - private boolean provisioning; - - //flag to help the correct lifecycle handling - private boolean missing; - - // use to track the allowed destinations for debug purpose - private List authDestinations; - - public KapuaConnectionContext(Long scopeId, String clientId, String fullClientIdPattern) { - authDestinations = new ArrayList<>(); - this.clientId = clientId; - updateFullClientId(scopeId, fullClientIdPattern); - } - - public KapuaConnectionContext(String brokerId, ConnectionInfo info) { - authDestinations = new ArrayList<>(); - this.brokerId = brokerId; - userName = info.getUserName(); - clientId = info.getClientId(); - clientIp = info.getClientIp(); - connectionId = info.getConnectionId().getValue(); - if(info.getTransportContext() instanceof Certificate[]) { - clientCertificates = (Certificate[]) info.getTransportContext(); - } - } - - public KapuaConnectionContext(String brokerId, String brokerIpOrHostName, KapuaPrincipal kapuaPrincipal, String accountName, ConnectionInfo info, String fullClientIdPattern, boolean missing) { - authDestinations = new ArrayList<>(); - this.brokerId = brokerId; - this.brokerIpOrHostName = brokerIpOrHostName; - this.accountName = accountName; - this.missing = missing; - userName = info.getUserName(); - clientId = kapuaPrincipal.getClientId(); - scopeId = kapuaPrincipal.getAccountId(); - clientIp = info.getClientIp(); - connectionId = info.getConnectionId().getValue(); - updateFullClientId(fullClientIdPattern); - } - - public void update(AccessToken accessToken, String accountName, KapuaId scopeId, KapuaId userId, String connectorName, String brokerIpOrHostName, String fullClientIdPattern) { - this.accountName = accountName; - this.scopeId = scopeId; - this.userId = userId; - this.brokerIpOrHostName = brokerIpOrHostName; - connectorDescriptor = ConnectorDescriptorProviders.getDescriptor(connectorName); - if (connectorDescriptor == null) { - throw new IllegalStateException(String.format("Unable to find connector descriptor for connector '%s'", connectorName)); - } - updateFullClientId(fullClientIdPattern); - principal = new KapuaPrincipalImpl(accessToken, - userName, - clientId, - clientIp); - } - - public void updatePermissions(boolean[] hasPermissions) { - this.hasPermissions = hasPermissions; - } - - public void updateKapuaConnectionId(DeviceConnection deviceConnection) { - kapuaConnectionId = deviceConnection != null ? deviceConnection.getId() : null; - } - - public void updateOldConnectionId(String oldConnectionId) { - this.oldConnectionId = oldConnectionId; - } - - private void updateFullClientId(String fullClientIdPattern) { - fullClientId = MessageFormat.format(fullClientIdPattern, scopeId.getId().longValue(), clientId); - } - - private void updateFullClientId(Long scopeId, String fullClientIdPattern) { - fullClientId = MessageFormat.format(fullClientIdPattern, scopeId, clientId); - } - - public String getFullClientId() { - return fullClientId; - } - - public String getAccountName() { - return accountName; - } - - public String getBrokerId() { - return brokerId; - } - - public Certificate[] getClientCertificates() { - return clientCertificates; - } - - public String getUserName() { - return userName; - } - - public KapuaId getScopeId() { - return scopeId; - } - - public long getScopeIdAsLong() { - return scopeId != null ? scopeId.getId().longValue() : 0; - } - - public String getClientId() { - return clientId; - } - - public String getClientIp() { - return clientIp; - } - - public KapuaPrincipal getPrincipal() { - return principal; - } - - public String getConnectionId() { - return connectionId; - } - - public String getOldConnectionId() { - return oldConnectionId; - } - - public ConnectorDescriptor getConnectorDescriptor() { - return connectorDescriptor; - } - - public KapuaId getKapuaConnectionId() { - return kapuaConnectionId; - } - - public KapuaId getUserId() { - return userId; - } - - public boolean[] getHasPermissions() { - return hasPermissions; - } - - public boolean isAdmin() { - return admin; - } - - public void setAdmin(boolean admin) { - this.admin = admin; - } - - public boolean isProvisioning() { - return provisioning; - } - - public void setProvisioning(boolean provisioning) { - this.provisioning = provisioning; - } - - public boolean isBrokerConnect() { - return hasPermissions[BROKER_CONNECT_IDX]; - } - - public boolean isDeviceView() { - return hasPermissions[DEVICE_VIEW_IDX]; - } - - public boolean isDeviceManage() { - return hasPermissions[DEVICE_MANAGE_IDX]; - } - - public boolean isDataView() { - return hasPermissions[DATA_VIEW_IDX]; - } - - public boolean isDataManage() { - return hasPermissions[DATA_MANAGE_IDX]; - } - - public String getBrokerIpOrHostName() { - return brokerIpOrHostName; - } - - public boolean isMissing() { - return missing; - } - - public void addAuthDestinationToLog(String message) { - if (logger.isDebugEnabled()) { - authDestinations.add(message); - } - } - - public void logAuthDestinationToLog() { - if (!authDestinations.isEmpty()) { - logger.debug("Authorization map:"); - for (String str : authDestinations) { - logger.debug(str); - } - } - } - -} diff --git a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityBrokerFilter.java b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityBrokerFilter.java index e8df74a4918..1bd0031c5ff 100644 --- a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityBrokerFilter.java +++ b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityBrokerFilter.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2016, 2020 Eurotech and/or its affiliates and others + * Copyright (c) 2011, 2020 Eurotech and/or its affiliates and others * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 @@ -115,10 +115,18 @@ public class KapuaSecurityBrokerFilter extends BrokerFilter { private static final String AUTHENTICATOR_CLASS_NAME; private static final String AUTHORIZER_CLASS_NAME; private static final Long STEALING_LINK_INITIALIZATION_MAX_WAIT_TIME; + private static final int DEFAULT_PUBLISHED_MESSAGE_SIZE_LOG_THRESHOLD = 100000; + private static boolean stealingLinkEnabled; private Future stealingLinkManagerFuture; private static final String ERROR = "@@ error"; + + /** + * publish message size threshold for printing message information + */ + private static int publishInfoMessageSizeLimit; + static { BrokerSetting config = BrokerSetting.getInstance(); BROKER_IP_RESOLVER_CLASS_NAME = config.getString(BrokerSettingKey.BROKER_IP_RESOLVER_CLASS_NAME); @@ -127,6 +135,7 @@ public class KapuaSecurityBrokerFilter extends BrokerFilter { AUTHORIZER_CLASS_NAME = config.getString(BrokerSettingKey.AUTHORIZER_CLASS_NAME); STEALING_LINK_INITIALIZATION_MAX_WAIT_TIME = config.getLong(BrokerSettingKey.STEALING_LINK_INITIALIZATION_MAX_WAIT_TIME); stealingLinkEnabled = config.getBoolean(BrokerSettingKey.BROKER_STEALING_LINK_ENABLED); + publishInfoMessageSizeLimit = BrokerSetting.getInstance().getInt(BrokerSettingKey.PUBLISHED_MESSAGE_SIZE_LOG_THRESHOLD, DEFAULT_PUBLISHED_MESSAGE_SIZE_LOG_THRESHOLD); } protected BrokerIpResolver brokerIpResolver; @@ -245,11 +254,11 @@ protected void subscribeStealingLinkManager() throws JMSException, KapuaExceptio messageBrokerId = message.getStringProperty(MessageConstants.PROPERTY_BROKER_ID); if (!brokerId.equals(messageBrokerId)) { logger.debug("Received connect message from another broker id: '{}' topic: '{}' - message id: '{}'", messageBrokerId, destination, messageId); - KapuaConnectionContext kcc = getKapuaConnectionContext(message); - if (CONNECTION_MAP.get(kcc.getFullClientId()) != null) { + KapuaSecurityContext kapuaSecurityContext = getKapuaSecurityContext(message); + if (CONNECTION_MAP.get(kapuaSecurityContext.getFullClientId()) != null) { logger.debug("Stealing link detected - broker id: '{}' topic: '{}' - message id: '{}'", messageBrokerId, destination, messageId); // iterate over all connected clients - disconnectClients(kcc); + disconnectClients(kapuaSecurityContext); } } } catch (Exception e) { @@ -258,21 +267,21 @@ protected void subscribeStealingLinkManager() throws JMSException, KapuaExceptio }); } - private void disconnectClients(KapuaConnectionContext kcc) throws Exception { + private void disconnectClients(KapuaSecurityContext kapuaSecurityContext) throws Exception { for (Connection conn : getClients()) { - logger.debug("Checking if {} equals {}", kcc.getFullClientId(), conn.getConnectionId()); - if (kcc.getFullClientId().equals(conn.getConnectionId())) { + logger.debug("Checking if {} equals {}", kapuaSecurityContext.getFullClientId(), conn.getConnectionId()); + if (kapuaSecurityContext.getFullClientId().equals(conn.getConnectionId())) { // Include Exception to notify the security broker filter - logger.info("New connection detected for {} on another broker. Stopping the current connection...", kcc.getFullClientId()); + logger.info("New connection detected for {} on another broker. Stopping the current connection...", kapuaSecurityContext.getFullClientId()); loginMetric.getRemoteStealingLinkDisconnect().inc(); - conn.serviceExceptionAsync(new IOException(new KapuaDuplicateClientIdException(kcc.getFullClientId()))); + conn.serviceExceptionAsync(new IOException(new KapuaDuplicateClientIdException(kapuaSecurityContext.getFullClientId()))); // assume only one connection since this broker should have already handled any duplicates return; } } } - private KapuaConnectionContext getKapuaConnectionContext(javax.jms.Message message) throws JMSException, KapuaException { + private KapuaSecurityContext getKapuaSecurityContext(javax.jms.Message message) throws JMSException, KapuaException { // try parsing from message context (if the message is coming from other brokers it has these fields evaluated) try { logger.debug("Get connected device informations from the message session"); @@ -284,17 +293,17 @@ private KapuaConnectionContext getKapuaConnectionContext(javax.jms.Message messa } } - private KapuaConnectionContext parseMessageSession(javax.jms.Message message) throws JMSException, KapuaException { + private KapuaSecurityContext parseMessageSession(javax.jms.Message message) throws JMSException, KapuaException { Long scopeId = message.propertyExists(MessageConstants.PROPERTY_SCOPE_ID) ? message.getLongProperty(MessageConstants.PROPERTY_SCOPE_ID) : null; String clientId = message.getStringProperty(MessageConstants.PROPERTY_CLIENT_ID); if (scopeId == null || scopeId <= 0 || StringUtils.isEmpty(clientId)) { logger.debug("Invalid message context. Try parsing the topic."); throw new KapuaException(KapuaErrorCodes.ILLEGAL_ARGUMENT, "Invalid message context"); } - return new KapuaConnectionContext(scopeId, clientId, MULTI_ACCOUNT_CLIENT_ID); + return new KapuaSecurityContext(scopeId, clientId); } - private KapuaConnectionContext parseTopicInfo(javax.jms.Message message) throws JMSException, KapuaException { + private KapuaSecurityContext parseTopicInfo(javax.jms.Message message) throws JMSException, KapuaException { String originalTopic = message.getStringProperty(MessageConstants.PROPERTY_ORIGINAL_TOPIC); String[] topic = originalTopic.split("\\."); if (topic.length != 5) { @@ -305,7 +314,7 @@ private KapuaConnectionContext parseTopicInfo(javax.jms.Message message) throws String clientId = topic[2]; Account account = KapuaSecurityUtils.doPrivileged(() -> accountService.findByName(accountName)); Long scopeId = account.getId().getId().longValue(); - return new KapuaConnectionContext(scopeId, clientId, MULTI_ACCOUNT_CLIENT_ID); + return new KapuaSecurityContext(scopeId, clientId); } /** @@ -395,29 +404,32 @@ private void addExternalConnection(ConnectionContext context, ConnectionInfo inf // Clean-up credentials possibly associated with the current thread by previous connection. ThreadContext.unbindSubject(); Context loginTotalContext = loginMetric.getAddConnectionTime().time(); - KapuaConnectionContext kcc = new KapuaConnectionContext(brokerIdResolver.getBrokerId(this), info); + KapuaSecurityContext kapuaSecurityContext = null; try { - logger.info("User name {} - client id: {}, connection id: {}", kcc.getUserName(), kcc.getClientId(), kcc.getConnectionId()); + logger.info("User name {} - client id: {}, connection id: {}", info.getUserName(), info.getClientId(), info.getConnectionId()); Context loginShiroLoginTimeContext = loginMetric.getShiroLoginTime().time(); - LoginCredentials credentials = credentialsFactory.newUsernamePasswordCredentials(kcc.getUserName(), info.getPassword()); + LoginCredentials credentials = credentialsFactory.newUsernamePasswordCredentials(info.getUserName(), info.getPassword()); AccessToken accessToken = authenticationService.login(credentials); final Account account = getAccount(accessToken.getScopeId()); - kcc.update(accessToken, account.getName(), accessToken.getScopeId(), accessToken.getUserId(), (((TransportConnector) context.getConnector()).getName()), - brokerIpResolver.getBrokerIpOrHostName(), MULTI_ACCOUNT_CLIENT_ID); - kcc.updateOldConnectionId(CONNECTION_MAP.get(kcc.getFullClientId())); + KapuaPrincipal principal = new KapuaPrincipalImpl(accessToken, + info.getUserName(), + info.getClientId(), + info.getClientIp()); + kapuaSecurityContext = new KapuaSecurityContext(principal, brokerIdResolver.getBrokerId(this), brokerIpResolver.getBrokerIpOrHostName(), + account.getName(), info, (((TransportConnector) context.getConnector()).getName())); + kapuaSecurityContext.updateOldConnectionId(CONNECTION_MAP.get(kapuaSecurityContext.getFullClientId())); loginShiroLoginTimeContext.stop(); - List authorizationEntries = authenticator.connect(kcc); - CONNECTION_MAP.put(kcc.getFullClientId(), info.getConnectionId().getValue()); - DefaultAuthorizationMap authMap = buildAuthorization(kcc, authorizationEntries); - context.setSecurityContext(new KapuaSecurityContext(kcc, authMap)); + CONNECTION_MAP.put(kapuaSecurityContext.getFullClientId(), info.getConnectionId().getValue()); + buildAuthorization(kapuaSecurityContext, authenticator.connect(kapuaSecurityContext)); + context.setSecurityContext(kapuaSecurityContext); // multiple account stealing link fix - info.setClientId(kcc.getFullClientId()); - context.setClientId(kcc.getFullClientId()); + info.setClientId(kapuaSecurityContext.getFullClientId()); + context.setClientId(kapuaSecurityContext.getFullClientId()); } catch (KapuaAuthenticationException e) { loginMetric.getFailure().inc(); @@ -426,7 +438,7 @@ private void addExternalConnection(ConnectionContext context, ConnectionInfo inf if (KapuaAuthenticationErrorCodes.UNKNOWN_LOGIN_CREDENTIAL.equals(errorCode) || KapuaAuthenticationErrorCodes.INVALID_LOGIN_CREDENTIALS.equals(errorCode) || KapuaAuthenticationErrorCodes.INVALID_CREDENTIALS_TYPE_PROVIDED.equals(errorCode)) { - logger.warn("Invalid username or password for user {} ({})", kcc.getUserName(), e.getMessage()); + logger.warn("Invalid username or password for user {} ({})", info.getUserName(), e.getMessage()); // activeMQ will map CredentialException into a CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD message (see javadoc on top of this method) CredentialException ce = new CredentialException("Invalid username and/or password or disabled or expired account!"); ce.setStackTrace(e.getStackTrace()); @@ -436,7 +448,7 @@ private void addExternalConnection(ConnectionContext context, ConnectionInfo inf } else if (KapuaAuthenticationErrorCodes.LOCKED_LOGIN_CREDENTIAL.equals(errorCode) || KapuaAuthenticationErrorCodes.DISABLED_LOGIN_CREDENTIAL.equals(errorCode) || KapuaAuthenticationErrorCodes.EXPIRED_LOGIN_CREDENTIALS.equals(errorCode)) { - logger.warn("User {} not authorized ({})", kcc.getUserName(), e.getMessage()); + logger.warn("User {} not authorized ({})", info.getUserName(), e.getMessage()); // activeMQ-MQ will map SecurityException into a CONNECTION_REFUSED_NOT_AUTHORIZED message (see javadoc on top of this method) SecurityException se = new SecurityException("User not authorized!"); se.setStackTrace(e.getStackTrace()); @@ -462,48 +474,30 @@ private void addExternalConnection(ConnectionContext context, ConnectionInfo inf } } - private Account getAccount(KapuaId scopeId) { - final Account account; - try { - account = KapuaSecurityUtils.doPrivileged(() -> accountService.find(scopeId)); - } catch (AuthenticationException e) { - // to preserve the original exception message (if possible) - throw e; - } catch (Exception e) { - throw new ShiroException("Error while find account!", e); - } - return account; - } - @Override public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { if (!isPassThroughConnection(context)) { Context loginRemoveConnectionTimeContext = loginMetric.getRemoveConnectionTime().time(); - KapuaConnectionContext kcc = null; + KapuaSecurityContext kapuaSecurityContext = getKapuaSecurityContext(context); try { - KapuaSecurityContext kapuaSecurityContext = getKapuaSecurityContext(context); KapuaPrincipal kapuaPrincipal = ((KapuaPrincipal) kapuaSecurityContext.getMainPrincipal()); - //get account name - final Account account = getAccount(kapuaPrincipal.getAccountId()); - kcc = new KapuaConnectionContext(brokerIdResolver.getBrokerId(this), brokerIpResolver.getBrokerIpOrHostName(), kapuaPrincipal, account.getName(), info, MULTI_ACCOUNT_CLIENT_ID, kapuaSecurityContext.isMissing()); - kcc.updateOldConnectionId(CONNECTION_MAP.get(kcc.getFullClientId())); + kapuaSecurityContext.updateOldConnectionId(CONNECTION_MAP.get(kapuaSecurityContext.getFullClientId())); // TODO fix the kapua session when run as feature will be implemented KapuaSecurityUtils.setSession(new KapuaSession(kapuaPrincipal)); - authenticator.disconnect(kcc, error); + authenticator.disconnect(kapuaSecurityContext, error); // multiple account stealing link fix - info.setClientId(kcc.getFullClientId()); - // context may be null according to isPassThroughConnection(context) - context.setClientId(kcc.getFullClientId()); + info.setClientId(kapuaSecurityContext.getFullClientId()); + context.setClientId(kapuaSecurityContext.getFullClientId()); } finally { loginRemoveConnectionTimeContext.stop(); authenticationService.logout(); - if (kcc != null && kcc.getFullClientId() != null) { - if (info.getConnectionId().getValue().equals(CONNECTION_MAP.get(kcc.getFullClientId()))) { + if (kapuaSecurityContext != null && kapuaSecurityContext.getFullClientId() != null) { + if (info.getConnectionId().getValue().equals(CONNECTION_MAP.get(kapuaSecurityContext.getFullClientId()))) { // cleanup stealing link detection map - CONNECTION_MAP.remove(kcc.getFullClientId()); + CONNECTION_MAP.remove(kapuaSecurityContext.getFullClientId()); } else { - logger.info("Cannot find client id in the connection map. May be it's due to a stealing link. ({})", kcc.getFullClientId()); + logger.info("Cannot find client id in the connection map. May be it's due to a stealing link. ({})", kapuaSecurityContext.getFullClientId()); } } else { logger.warn("Cannot find Kapua connection context or client id is null"); @@ -515,16 +509,17 @@ public void removeConnection(ConnectionContext context, ConnectionInfo info, Thr context.setSecurityContext(null); } - private Account getAccount(KapuaId accountId) { + private Account getAccount(KapuaId scopeId) { + final Account account; try { - return KapuaSecurityUtils.doPrivileged(() -> accountService.find(accountId)); - } - catch(AuthenticationException e) { - throw (AuthenticationException) e; - } - catch (Exception e) { + account = KapuaSecurityUtils.doPrivileged(() -> accountService.find(scopeId)); + } catch (AuthenticationException e) { + // to preserve the original exception message (if possible) + throw e; + } catch (Exception e) { throw new ShiroException("Error while find account!", e); } + return account; } // ------------------------------------------------------------------ @@ -562,15 +557,17 @@ private void internalSend(ProducerBrokerExchange producerExchange, Message messa ActiveMQTopic destinationTopic = (ActiveMQTopic) destination; originalTopic = destinationTopic.getTopicName().substring(VT_TOPIC_PREFIX.length()); } + int messageSize = messageSend.getSize(); messageSend.setProperty(MessageConstants.HEADER_KAPUA_RECEIVED_TIMESTAMP, KapuaDateUtils.getKapuaSysDate().toEpochMilli()); if (!isBrokerContext(producerExchange.getConnectionContext())) { KapuaSecurityContext kapuaSecurityContext = getKapuaSecurityContext(producerExchange.getConnectionContext()); + KapuaPrincipal kapuaPrincipal = ((KapuaPrincipal) kapuaSecurityContext.getMainPrincipal()); if (!messageSend.getDestination().isTemporary() && !authorizer.isAllowed(ActionType.WRITE, kapuaSecurityContext, messageSend.getDestination())) { String message = MessageFormat.format("User {0} ({1} - {2} - conn id {3}) is not authorized to write to: {4}", kapuaSecurityContext.getUserName(), - ((KapuaPrincipal) kapuaSecurityContext.getMainPrincipal()).getClientId(), - ((KapuaPrincipal) kapuaSecurityContext.getMainPrincipal()).getClientIp(), - kapuaSecurityContext.getConnectionId(), + kapuaPrincipal.getClientId(), + kapuaPrincipal.getClientIp(), + kapuaSecurityContext.getKapuaConnectionId(), messageSend.getDestination()); logger.warn(message); publishMetric.getMessageSizeNotAllowed().update(messageSend.getSize()); @@ -581,17 +578,25 @@ private void internalSend(ProducerBrokerExchange producerExchange, Message messa } if (isLwt(originalTopic)) { //handle the missing message case - logger.info("Detected missing message for client {}... Flag session to tell disconnector to avoid disconnect event sending", ((KapuaPrincipal) kapuaSecurityContext.getMainPrincipal()).getClientId()); + logger.info("Detected missing message for client {}... Flag session to tell disconnector to avoid disconnect event sending", kapuaPrincipal.getClientId()); kapuaSecurityContext.setMissing(); } // FIX #164 - messageSend.setProperty(MessageConstants.HEADER_KAPUA_CONNECTION_ID, Base64.getEncoder().encodeToString(SerializationUtils.serialize(kapuaSecurityContext.getConnectionId()))); - messageSend.setProperty(MessageConstants.HEADER_KAPUA_CLIENT_ID, ((KapuaPrincipal) kapuaSecurityContext.getMainPrincipal()).getClientId()); + messageSend.setProperty(MessageConstants.HEADER_KAPUA_CONNECTION_ID, Base64.getEncoder().encodeToString(SerializationUtils.serialize(kapuaSecurityContext.getKapuaConnectionId()))); + messageSend.setProperty(MessageConstants.HEADER_KAPUA_CLIENT_ID, kapuaPrincipal.getClientId()); messageSend.setProperty(MessageConstants.HEADER_KAPUA_CONNECTOR_DEVICE_PROTOCOL, Base64.getEncoder().encodeToString(SerializationUtils.serialize(kapuaSecurityContext.getConnectorDescriptor()))); messageSend.setProperty(MessageConstants.HEADER_KAPUA_SESSION, Base64.getEncoder().encodeToString(SerializationUtils.serialize(kapuaSecurityContext.getKapuaSession()))); messageSend.setProperty(MessageConstants.HEADER_KAPUA_BROKER_CONTEXT, false); + if (publishInfoMessageSizeLimit < messageSize) { + logger.info("Published message size over threshold. size: {} - destination: {} - account id: {} - username: {} - clientId: {}", + messageSize, messageSend.getDestination().getPhysicalName(), kapuaPrincipal.getAccountId(), kapuaPrincipal.getName(), kapuaPrincipal.getClientId()); + } } else { + if (publishInfoMessageSizeLimit < messageSize) { + logger.info("Published message size over threshold. size: {} - destination: {}", + messageSize, messageSend.getDestination().getPhysicalName()); + } messageSend.setProperty(MessageConstants.HEADER_KAPUA_BROKER_CONTEXT, true); } publishMetric.getMessageSizeAllowed().update(messageSend.getSize()); @@ -649,7 +654,7 @@ private Subscription internalAddConsumer(ConnectionContext context, ConsumerInfo kapuaSecurityContext.getUserName(), ((KapuaPrincipal) kapuaSecurityContext.getMainPrincipal()).getClientId(), ((KapuaPrincipal) kapuaSecurityContext.getMainPrincipal()).getClientIp(), - kapuaSecurityContext.getConnectionId(), + kapuaSecurityContext.getKapuaConnectionId(), info.getDestination()); logger.warn(message); subscribeMetric.getNotAllowedMessages().inc(); @@ -668,34 +673,36 @@ private Subscription internalAddConsumer(ConnectionContext context, ConsumerInfo // // ------------------------------------------------------------------ - protected DefaultAuthorizationMap buildAuthorization(KapuaConnectionContext kcc, List authorizationEntries) { + protected void buildAuthorization(KapuaSecurityContext kapuaSecurityContext, List authorizationEntries) { @SuppressWarnings("rawtypes") List entries = new ArrayList<>(); for (org.eclipse.kapua.broker.core.plugin.authentication.AuthorizationEntry entry : authorizationEntries) { - entries.add(createAuthorizationEntry(kcc, entry.getAcl(), entry.getAddress())); + entries.add(createAuthorizationEntry(kapuaSecurityContext, entry.getAcl(), entry.getAddress())); // added to support the vt topic name space for durable subscriptions if (entry.getAcl().isRead()) { - entries.add(createAuthorizationEntry(kcc, entry.getAcl(), MessageFormat.format(VT_DURABLE_PREFIX.get(0), kcc.getFullClientId(), entry.getAddress()))); - entries.add(createAuthorizationEntry(kcc, entry.getAcl(), MessageFormat.format(VT_DURABLE_PREFIX.get(1), kcc.getFullClientId(), entry.getAddress()))); + entries.add(createAuthorizationEntry(kapuaSecurityContext, entry.getAcl(), MessageFormat.format(VT_DURABLE_PREFIX.get(0), kapuaSecurityContext.getFullClientId(), entry.getAddress()))); + // logger.info("pattern {} - clientid {} - topic {} - evaluated {}", new Object[]{JmsConstants.ACL_VT_DURABLE_PREFIX[1], clientId, topic, + // MessageFormat.format(JmsConstants.ACL_VT_DURABLE_PREFIX[1], fullClientId, topic)}); + entries.add(createAuthorizationEntry(kapuaSecurityContext, entry.getAcl(), MessageFormat.format(VT_DURABLE_PREFIX.get(1), kapuaSecurityContext.getFullClientId(), entry.getAddress()))); } } - return new DefaultAuthorizationMap(entries); + kapuaSecurityContext.setAuthorizationMap(new DefaultAuthorizationMap(entries)); } - protected AuthorizationEntry createAuthorizationEntry(KapuaConnectionContext kcc, Acl acl, String address) { + protected AuthorizationEntry createAuthorizationEntry(KapuaSecurityContext kapuaSecurityContext, Acl acl, String address) { AuthorizationEntry authorizationEntry = new AuthorizationEntry(); authorizationEntry.setDestination(ActiveMQDestination.createDestination(address, ActiveMQDestination.TOPIC_TYPE)); Set writeACLs = new HashSet<>(); Set readACLs = new HashSet<>(); Set adminACLs = new HashSet<>(); if (acl.isRead()) { - readACLs.add(kcc.getPrincipal()); + readACLs.add(kapuaSecurityContext.getPrincipal()); } if (acl.isWrite()) { - writeACLs.add(kcc.getPrincipal()); + writeACLs.add(kapuaSecurityContext.getPrincipal()); } if (acl.isAdmin()) { - adminACLs.add(kcc.getPrincipal()); + adminACLs.add(kapuaSecurityContext.getPrincipal()); } authorizationEntry.setWriteACLs(writeACLs); authorizationEntry.setReadACLs(readACLs); diff --git a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityContext.java b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityContext.java index 17019b04916..575a12786fa 100644 --- a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityContext.java +++ b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityContext.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2016, 2020 Eurotech and/or its affiliates and others + * Copyright (c) 2011, 2020 Eurotech and/or its affiliates and others * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 @@ -11,16 +11,26 @@ *******************************************************************************/ package org.eclipse.kapua.broker.core.plugin; +import java.math.BigInteger; import java.security.Principal; +import java.security.cert.Certificate; +import java.text.MessageFormat; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Set; +import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.security.AuthorizationMap; import org.apache.activemq.security.SecurityContext; import org.eclipse.kapua.KapuaException; +import org.eclipse.kapua.commons.model.id.KapuaEid; import org.eclipse.kapua.commons.security.KapuaSession; import org.eclipse.kapua.model.id.KapuaId; import org.eclipse.kapua.service.authentication.KapuaPrincipal; +import org.eclipse.kapua.service.device.registry.connection.DeviceConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Kapua security context implementation of ActiveMQ broker {@link SecurityContext} @@ -29,44 +39,87 @@ */ public class KapuaSecurityContext extends SecurityContext { + protected static final Logger logger = LoggerFactory.getLogger(KapuaSecurityContext.class); + + public static final int BROKER_CONNECT_IDX = 0; + public static final int DEVICE_MANAGE_IDX = 1; + public static final int DATA_VIEW_IDX = 2; + public static final int DATA_MANAGE_IDX = 3; + public static final int DEVICE_VIEW_IDX = 4; + private KapuaPrincipal principal; private KapuaSession kapuaSession; - private KapuaId connectionId; + private KapuaId kapuaConnectionId; + private String connectionId; private Set principals; private ConnectorDescriptor connectorDescriptor; - private String brokerConnectionId; - private AuthorizationMap authMap; + private AuthorizationMap authorizationMap; + + private String brokerId; + private KapuaId scopeId; + private KapuaId userId; + private String accountName; + private String clientId; + private String fullClientId; + private String clientIp; + private String oldConnectionId; + private boolean[] hasPermissions; + private String brokerIpOrHostName; + private Certificate[] clientCertificates; - private KapuaConnectionContext kcc; + private boolean admin; + private boolean provisioning; //flag to help the correct lifecycle handling private boolean missing; - public KapuaSecurityContext(KapuaConnectionContext kcc, - AuthorizationMap authMap) { - super(kcc.getPrincipal().getName()); + // use to track the allowed destinations for debug purpose + private List authDestinations; - this.kcc = kcc; - this.principal = kcc.getPrincipal(); - this.kapuaSession = KapuaSession.createFrom(); - principals = new HashSet(); - principals.add(principal); + public KapuaSecurityContext(Long scopeId, String clientId) { + super(null); + this.scopeId = new KapuaEid(BigInteger.valueOf(scopeId)); + this.clientId = clientId; + updateFullClientId(scopeId); + } - this.authMap = authMap; - this.connectionId = kcc.getKapuaConnectionId(); - this.connectorDescriptor = kcc.getConnectorDescriptor(); - this.brokerConnectionId = kcc.getConnectionId(); + public KapuaSecurityContext(KapuaPrincipal principal, String brokerId, String brokerIpOrHostName, String accountName, ConnectionInfo info, String connectorName) { + super(principal!=null ? principal.getName() : null); + this.principal = principal; + this.brokerId = brokerId; + this.brokerIpOrHostName = brokerIpOrHostName; + this.accountName = accountName; + principals = new HashSet(); + if (principal != null) { + userId = principal.getUserId(); + scopeId = principal.getAccountId(); + principals.add(principal); + } + authDestinations = new ArrayList<>(); + clientId = principal.getClientId(); + scopeId = principal.getAccountId(); + clientId = info.getClientId(); + clientIp = info.getClientIp(); + connectionId = info.getConnectionId().getValue(); + kapuaSession = KapuaSession.createFrom(); + if (connectorName == null) { + throw new IllegalStateException("Connector name is empty!"); + } + connectorDescriptor = ConnectorDescriptorProviders.getDescriptor(connectorName); + if (connectorDescriptor == null) { + throw new IllegalStateException(String.format("Unable to find connector descriptor for connector '%s'", connectorName)); + } + if(info.getTransportContext() instanceof Certificate[]) { + clientCertificates = (Certificate[]) info.getTransportContext(); + } + updateFullClientId(); } public KapuaPrincipal getKapuaPrincipal() throws KapuaException { return principal; } - public KapuaConnectionContext getKapuaConnectionContext() { - return kcc; - } - public Principal getMainPrincipal() { return principal; } @@ -75,16 +128,20 @@ public Set getPrincipals() { return principals; } + public void setAuthorizationMap(AuthorizationMap authorizationMap) { + this.authorizationMap = authorizationMap; + } + public AuthorizationMap getAuthorizationMap() { - return authMap; + return authorizationMap; } - public KapuaId getConnectionId() { - return connectionId; + public KapuaId getKapuaConnectionId() { + return kapuaConnectionId; } - public String getBrokerConnectionId() { - return brokerConnectionId; + public String getConnectionId() { + return connectionId; } public ConnectorDescriptor getConnectorDescriptor() { @@ -102,4 +159,127 @@ public void setMissing() { public boolean isMissing() { return missing; } + + public void updatePermissions(boolean[] hasPermissions) { + this.hasPermissions = hasPermissions; + } + + public void updateKapuaConnectionId(DeviceConnection deviceConnection) { + kapuaConnectionId = deviceConnection != null ? deviceConnection.getId() : null; + } + + public void updateOldConnectionId(String oldConnectionId) { + this.oldConnectionId = oldConnectionId; + } + + private void updateFullClientId() { + fullClientId = MessageFormat.format(KapuaSecurityBrokerFilter.MULTI_ACCOUNT_CLIENT_ID, scopeId.getId().longValue(), clientId); + } + + private void updateFullClientId(Long scopeId) { + fullClientId = MessageFormat.format(KapuaSecurityBrokerFilter.MULTI_ACCOUNT_CLIENT_ID, scopeId, clientId); + } + + public String getFullClientId() { + return fullClientId; + } + + public String getAccountName() { + return accountName; + } + + public String getBrokerId() { + return brokerId; + } + + public Certificate[] getClientCertificates() { + return clientCertificates; + } + + public KapuaId getScopeId() { + return scopeId; + } + + public long getScopeIdAsLong() { + return scopeId != null ? scopeId.getId().longValue() : 0; + } + + public String getClientId() { + return clientId; + } + + public String getClientIp() { + return clientIp; + } + + public KapuaPrincipal getPrincipal() { + return principal; + } + + public String getOldConnectionId() { + return oldConnectionId; + } + + public KapuaId getUserId() { + return userId; + } + + public boolean[] getHasPermissions() { + return hasPermissions; + } + + public boolean isAdmin() { + return admin; + } + + public void setAdmin(boolean admin) { + this.admin = admin; + } + + public boolean isProvisioning() { + return provisioning; + } + + public void setProvisioning(boolean provisioning) { + this.provisioning = provisioning; + } + + public boolean isBrokerConnect() { + return hasPermissions[BROKER_CONNECT_IDX]; + } + + public boolean isDeviceView() { + return hasPermissions[DEVICE_VIEW_IDX]; + } + + public boolean isDeviceManage() { + return hasPermissions[DEVICE_MANAGE_IDX]; + } + + public boolean isDataView() { + return hasPermissions[DATA_VIEW_IDX]; + } + + public boolean isDataManage() { + return hasPermissions[DATA_MANAGE_IDX]; + } + + public String getBrokerIpOrHostName() { + return brokerIpOrHostName; + } + + public void addAuthDestinationToLog(String message) { + if (logger.isDebugEnabled()) { + authDestinations.add(message); + } + } + + public void logAuthDestinationToLog() { + if (!authDestinations.isEmpty()) { + logger.debug("Authorization map:"); + for (String str : authDestinations) { + logger.debug(str); + } + } + } } diff --git a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/AdminAuthenticationLogic.java b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/AdminAuthenticationLogic.java index 34414aac69f..86e5a007e33 100644 --- a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/AdminAuthenticationLogic.java +++ b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/AdminAuthenticationLogic.java @@ -17,7 +17,7 @@ import org.eclipse.kapua.KapuaException; import org.eclipse.kapua.broker.core.plugin.Acl; -import org.eclipse.kapua.broker.core.plugin.KapuaConnectionContext; +import org.eclipse.kapua.broker.core.plugin.KapuaSecurityContext; /** * Admin profile authentication logic implementation @@ -36,26 +36,26 @@ public AdminAuthenticationLogic(Map options) { } @Override - public List connect(KapuaConnectionContext kcc) throws KapuaException { - kcc.setAdmin(true); - return buildAuthorizationMap(kcc); + public List connect(KapuaSecurityContext kapuaSecurityContext) throws KapuaException { + kapuaSecurityContext.setAdmin(true); + return buildAuthorizationMap(kapuaSecurityContext); } @Override - public boolean disconnect(KapuaConnectionContext kcc, Throwable error) { - boolean stealingLinkDetected = isStealingLink(kcc, error); - logger.debug("Old connection id: {} - new connection id: {} - error: {} - error cause: {}", kcc.getOldConnectionId(), kcc.getConnectionId(), error, (error!=null ? error.getCause() : "null"), error); + public boolean disconnect(KapuaSecurityContext kapuaSecurityContext, Throwable error) { + boolean stealingLinkDetected = isStealingLink(kapuaSecurityContext, error); + logger.debug("Old connection id: {} - new connection id: {} - error: {} - error cause: {}", kapuaSecurityContext.getOldConnectionId(), kapuaSecurityContext.getConnectionId(), error, (error!=null ? error.getCause() : "null"), error); if (stealingLinkDetected) { loginMetric.getAdminStealingLinkDisconnect().inc(); } - return !stealingLinkDetected && !kcc.isMissing(); + return !stealingLinkDetected && !kapuaSecurityContext.isMissing(); } - protected List buildAuthorizationMap(KapuaConnectionContext kcc) { + protected List buildAuthorizationMap(KapuaSecurityContext kapuaSecurityContext) { ArrayList ael = new ArrayList(); - ael.add(createAuthorizationEntry(kcc, Acl.ALL, aclHash)); - ael.add(createAuthorizationEntry(kcc, Acl.WRITE_ADMIN, aclAdvisory)); - kcc.logAuthDestinationToLog(); + ael.add(createAuthorizationEntry(kapuaSecurityContext, Acl.ALL, aclHash)); + ael.add(createAuthorizationEntry(kapuaSecurityContext, Acl.WRITE_ADMIN, aclAdvisory)); + kapuaSecurityContext.logAuthDestinationToLog(); return ael; } diff --git a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/AuthenticationLogic.java b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/AuthenticationLogic.java index ab86630fdf5..ce29c8c7130 100644 --- a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/AuthenticationLogic.java +++ b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/AuthenticationLogic.java @@ -14,7 +14,7 @@ import org.eclipse.kapua.KapuaException; import org.eclipse.kapua.broker.BrokerDomains; import org.eclipse.kapua.broker.core.plugin.Acl; -import org.eclipse.kapua.broker.core.plugin.KapuaConnectionContext; +import org.eclipse.kapua.broker.core.plugin.KapuaSecurityContext; import org.eclipse.kapua.broker.core.plugin.KapuaDuplicateClientIdException; import org.eclipse.kapua.broker.core.plugin.metric.ClientMetric; import org.eclipse.kapua.broker.core.plugin.metric.LoginMetric; @@ -89,62 +89,62 @@ protected AuthenticationLogic(String addressPrefix, String addressClassifier, St /** * Execute the connect logic returning the authorization list (ACL) * - * @param kcc + * @param kapuaSecurityContext * @return * @throws KapuaException */ - public abstract List connect(KapuaConnectionContext kcc) + public abstract List connect(KapuaSecurityContext kapuaSecurityContext) throws KapuaException; /** * Execute the disconnection logic * - * @param kcc + * @param kapuaSecurityContext * @param error * @return true send disconnect message (if the disconnection is a clean disconnection) * false don't send disconnect message (the disconnection is caused by a stealing link or the device is currently connected to another node) */ - public abstract boolean disconnect(KapuaConnectionContext kcc, Throwable error); + public abstract boolean disconnect(KapuaSecurityContext kapuaSecurityContext, Throwable error); /** - * @param kcc + * @param kapuaSecurityContext * @return */ - protected abstract List buildAuthorizationMap(KapuaConnectionContext kcc); + protected abstract List buildAuthorizationMap(KapuaSecurityContext kapuaSecurityContext); /** * Format the ACL resource based on the pattern and the account name looking into the connection context property * * @param pattern - * @param kcc + * @param kapuaSecurityContext * @return */ - protected String formatAcl(String pattern, KapuaConnectionContext kcc) { - return MessageFormat.format(pattern, kcc.getAccountName()); + protected String formatAcl(String pattern, KapuaSecurityContext kapuaSecurityContext) { + return MessageFormat.format(pattern, kapuaSecurityContext.getAccountName()); } /** * Format the ACL resource based on the pattern and the account name and client id looking into the connection context property * * @param pattern - * @param kcc + * @param kapuaSecurityContext * @return */ - protected String formatAclFull(String pattern, KapuaConnectionContext kcc) { - return MessageFormat.format(pattern, kcc.getAccountName(), kcc.getClientId()); + protected String formatAclFull(String pattern, KapuaSecurityContext kapuaSecurityContext) { + return MessageFormat.format(pattern, kapuaSecurityContext.getAccountName(), kapuaSecurityContext.getClientId()); } /** * Create the authorization entry base on the ACL and the resource address * - * @param kcc + * @param kapuaSecurityContext * @param acl * @param address * @return */ - protected AuthorizationEntry createAuthorizationEntry(KapuaConnectionContext kcc, Acl acl, String address) { + protected AuthorizationEntry createAuthorizationEntry(KapuaSecurityContext kapuaSecurityContext, Acl acl, String address) { AuthorizationEntry entry = new AuthorizationEntry(address, acl); - kcc.addAuthDestinationToLog(MessageFormat.format(PERMISSION_LOG, + kapuaSecurityContext.addAuthDestinationToLog(MessageFormat.format(PERMISSION_LOG, acl.isRead() ? "r" : "_", acl.isWrite() ? "w" : "_", acl.isAdmin() ? "a" : "_", @@ -260,25 +260,25 @@ protected ConnectionUserCouplingMode loadConnectionUserCouplingModeFromConfig(Ma } } - protected boolean isStealingLink(KapuaConnectionContext kcc, Throwable error) { + protected boolean isStealingLink(KapuaSecurityContext kapuaSecurityContext, Throwable error) { boolean stealingLinkDetected = false; - if (kcc.getOldConnectionId() != null) { - stealingLinkDetected = !kcc.getOldConnectionId().equals(kcc.getConnectionId()); + if (kapuaSecurityContext.getOldConnectionId() != null) { + stealingLinkDetected = !kapuaSecurityContext.getOldConnectionId().equals(kapuaSecurityContext.getConnectionId()); } else { logger.error("Cannot find connection id for client id {} on connection map. Correct connection id is {} - IP: {}", - kcc.getClientId(), - kcc.getConnectionId(), - kcc.getClientIp()); + kapuaSecurityContext.getClientId(), + kapuaSecurityContext.getConnectionId(), + kapuaSecurityContext.getClientIp()); } if (!stealingLinkDetected && (error instanceof KapuaDuplicateClientIdException || (error!=null && error.getCause() instanceof KapuaDuplicateClientIdException))) { stealingLinkDetected = true; logger.warn("Detected Stealing link for cliend id {} - account id {} - last connection id was {} - current connection id is {} - IP: {} - No disconnection info will be added!", - kcc.getClientId(), - kcc.getScopeId(), - kcc.getOldConnectionId(), - kcc.getConnectionId(), - kcc.getClientIp()); + kapuaSecurityContext.getClientId(), + kapuaSecurityContext.getScopeId(), + kapuaSecurityContext.getOldConnectionId(), + kapuaSecurityContext.getConnectionId(), + kapuaSecurityContext.getClientIp()); } return stealingLinkDetected; } diff --git a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/Authenticator.java b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/Authenticator.java index b0d275a3d24..4b36cd1a77a 100644 --- a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/Authenticator.java +++ b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/Authenticator.java @@ -14,7 +14,7 @@ import java.util.List; import org.eclipse.kapua.KapuaException; -import org.eclipse.kapua.broker.core.plugin.KapuaConnectionContext; +import org.eclipse.kapua.broker.core.plugin.KapuaSecurityContext; /** * Authenticator api @@ -33,35 +33,35 @@ public interface Authenticator { /** * Execute the connect logic returning the authorization list (ACL) * - * @param kcc + * @param kapuaSecurityContext * @return * @throws KapuaException * if any checks fails (credential not valid, profile missing, ...) */ - public abstract List connect(KapuaConnectionContext kcc) + public abstract List connect(KapuaSecurityContext kapuaSecurityContext) throws KapuaException; /** * Execute the disconnect logic * - * @param kcc + * @param kapuaSecurityContext * @param error * not null if the disconnection is due to an error not related to the client (network I/O error, server side error, ...) */ - public abstract void disconnect(KapuaConnectionContext kcc, Throwable error); + public abstract void disconnect(KapuaSecurityContext kapuaSecurityContext, Throwable error); /** * Send the connect message (this message is mainly for internal use to enforce the stealing link) * - * @param kcc + * @param kapuaSecurityContext */ - public abstract void sendConnectMessage(KapuaConnectionContext kcc); + public abstract void sendConnectMessage(KapuaSecurityContext kapuaSecurityContext); /** * Send the disconnect message (this message is mainly for internal use) * - * @param kcc + * @param kapuaSecurityContext */ - public abstract void sendDisconnectMessage(KapuaConnectionContext kcc); + public abstract void sendDisconnectMessage(KapuaSecurityContext kapuaSecurityContext); } diff --git a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/DefaultAuthenticator.java b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/DefaultAuthenticator.java index 1251a8bef15..2023bb8ab09 100644 --- a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/DefaultAuthenticator.java +++ b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/DefaultAuthenticator.java @@ -17,7 +17,7 @@ import org.eclipse.kapua.broker.core.message.system.DefaultSystemMessageCreator; import org.eclipse.kapua.broker.core.message.system.SystemMessageCreator; import org.eclipse.kapua.broker.core.message.system.SystemMessageCreator.SystemMessageType; -import org.eclipse.kapua.broker.core.plugin.KapuaConnectionContext; +import org.eclipse.kapua.broker.core.plugin.KapuaSecurityContext; import org.eclipse.kapua.broker.core.plugin.metric.ClientMetric; import org.eclipse.kapua.broker.core.plugin.metric.LoginMetric; import org.eclipse.kapua.broker.core.pool.JmsAssistantProducerPool; @@ -73,56 +73,56 @@ public DefaultAuthenticator(Map options) throws KapuaException { } @Override - public List connect(KapuaConnectionContext kcc) + public List connect(KapuaSecurityContext kapuaSecurityContext) throws KapuaException { List authorizationEntries = null; - if (isAdminUser(kcc)) { + if (isAdminUser(kapuaSecurityContext)) { loginMetric.getKapuasysTokenAttempt().inc(); - authorizationEntries = adminAuthenticationLogic.connect(kcc); + authorizationEntries = adminAuthenticationLogic.connect(kapuaSecurityContext); clientMetric.getConnectedKapuasys().inc(); } else { loginMetric.getNormalUserAttempt().inc(); - authorizationEntries = userAuthenticationLogic.connect(kcc); + authorizationEntries = userAuthenticationLogic.connect(kapuaSecurityContext); clientMetric.getConnectedClient().inc(); - sendConnectMessage(kcc); + sendConnectMessage(kapuaSecurityContext); } return authorizationEntries; } @Override - public void disconnect(KapuaConnectionContext kcc, Throwable error) { - if (isAdminUser(kcc)) { + public void disconnect(KapuaSecurityContext kapuaSecurityContext, Throwable error) { + if (isAdminUser(kapuaSecurityContext)) { clientMetric.getDisconnectionKapuasys().inc(); - adminAuthenticationLogic.disconnect(kcc, error); + adminAuthenticationLogic.disconnect(kapuaSecurityContext, error); } else { clientMetric.getDisconnectionClient().inc(); - if (userAuthenticationLogic.disconnect(kcc, error)) { - sendDisconnectMessage(kcc); + if (userAuthenticationLogic.disconnect(kapuaSecurityContext, error)) { + sendDisconnectMessage(kapuaSecurityContext); } } } @Override - public void sendConnectMessage(KapuaConnectionContext kcc) { - sendMessage(kcc, Authenticator.ADDRESS_CONNECT_PATTERN_KEY, SystemMessageType.CONNECT); + public void sendConnectMessage(KapuaSecurityContext kapuaSecurityContext) { + sendMessage(kapuaSecurityContext, Authenticator.ADDRESS_CONNECT_PATTERN_KEY, SystemMessageType.CONNECT); } @Override - public void sendDisconnectMessage(KapuaConnectionContext kcc) { - sendMessage(kcc, Authenticator.ADDRESS_DISCONNECT_PATTERN_KEY, SystemMessageType.DISCONNECT); + public void sendDisconnectMessage(KapuaSecurityContext kapuaSecurityContext) { + sendMessage(kapuaSecurityContext, Authenticator.ADDRESS_DISCONNECT_PATTERN_KEY, SystemMessageType.DISCONNECT); } - private void sendMessage(KapuaConnectionContext kcc, String messageAddressPattern, SystemMessageType systemMessageType) { + private void sendMessage(KapuaSecurityContext kapuaSecurityContext, String messageAddressPattern, SystemMessageType systemMessageType) { if (systemMessageType != null) { Context loginSendLogingUpdateMsgTimeContex = loginMetric.getSendLoginUpdateMsgTime().time(); - String message = systemMessageCreator.createMessage(systemMessageType, kcc); + String message = systemMessageCreator.createMessage(systemMessageType, kapuaSecurityContext); JmsAssistantProducerWrapper producerWrapper = null; try { producerWrapper = JmsAssistantProducerPool.getIOnstance(DESTINATIONS.NO_DESTINATION).borrowObject(); producerWrapper.send(String.format((String) options.get(messageAddressPattern), - SystemSetting.getInstance().getMessageClassifier(), kcc.getAccountName(), kcc.getClientId()), + SystemSetting.getInstance().getMessageClassifier(), kapuaSecurityContext.getAccountName(), kapuaSecurityContext.getClientId()), message, - kcc); + kapuaSecurityContext); } catch (Exception e) { logger.error("Exception sending the {} message: {}", systemMessageType.name().toLowerCase(), e.getMessage(), e); } finally { @@ -137,9 +137,9 @@ private void sendMessage(KapuaConnectionContext kcc, String messageAddressPatter } } - protected boolean isAdminUser(KapuaConnectionContext kcc) { + protected boolean isAdminUser(KapuaSecurityContext kapuaSecurityContext) { String adminUsername = SystemSetting.getInstance().getString(SystemSettingKey.SYS_ADMIN_USERNAME); - return kcc.getUserName().equals(adminUsername); + return kapuaSecurityContext.getUserName().equals(adminUsername); } } diff --git a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/UserAuthenticationLogic.java b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/UserAuthenticationLogic.java index ac6130f636e..141ca01faff 100644 --- a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/UserAuthenticationLogic.java +++ b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/UserAuthenticationLogic.java @@ -16,7 +16,7 @@ import org.eclipse.kapua.KapuaException; import org.eclipse.kapua.KapuaIllegalAccessException; import org.eclipse.kapua.broker.core.plugin.Acl; -import org.eclipse.kapua.broker.core.plugin.KapuaConnectionContext; +import org.eclipse.kapua.broker.core.plugin.KapuaSecurityContext; import org.eclipse.kapua.commons.security.KapuaSecurityUtils; import org.eclipse.kapua.model.domain.Actions; import org.eclipse.kapua.service.authorization.permission.Permission; @@ -64,66 +64,66 @@ public UserAuthenticationLogic(Map options) { } @Override - public List connect(KapuaConnectionContext kcc) throws KapuaException { + public List connect(KapuaSecurityContext kapuaSecurityContext) throws KapuaException { Context loginNormalUserTimeContext = loginMetric.getNormalUserTime().time(); Context loginCheckAccessTimeContext = loginMetric.getCheckAccessTime().time(); - updatePermissions(kcc); + updatePermissions(kapuaSecurityContext); loginCheckAccessTimeContext.stop(); Context loginFindDeviceConnectionTimeContext = loginMetric.getFindDeviceConnectionTime().time(); - DeviceConnection deviceConnection = KapuaSecurityUtils.doPrivileged(() -> deviceConnectionService.findByClientId(kcc.getScopeId(), kcc.getClientId())); + DeviceConnection deviceConnection = KapuaSecurityUtils.doPrivileged(() -> deviceConnectionService.findByClientId(kapuaSecurityContext.getScopeId(), kapuaSecurityContext.getClientId())); loginFindDeviceConnectionTimeContext.stop(); // enforce the user-device bound - enforceDeviceConnectionUserBound(KapuaSecurityUtils.doPrivileged(() -> deviceConnectionService.getConfigValues(kcc.getScopeId())), deviceConnection, kcc.getScopeId(), kcc.getUserId()); + enforceDeviceConnectionUserBound(KapuaSecurityUtils.doPrivileged(() -> deviceConnectionService.getConfigValues(kapuaSecurityContext.getScopeId())), deviceConnection, kapuaSecurityContext.getScopeId(), kapuaSecurityContext.getUserId()); Context loginUpdateDeviceConnectionTimeContext = loginMetric.getUpdateDeviceConnectionTime().time(); { - deviceConnection = upsertDeviceConnection(kcc, deviceConnection); - kcc.updateKapuaConnectionId(deviceConnection); + deviceConnection = upsertDeviceConnection(kapuaSecurityContext, deviceConnection); + kapuaSecurityContext.updateKapuaConnectionId(deviceConnection); } loginUpdateDeviceConnectionTimeContext.stop(); - List authorizationEntries = buildAuthorizationMap(kcc); + List authorizationEntries = buildAuthorizationMap(kapuaSecurityContext); loginNormalUserTimeContext.stop(); return authorizationEntries; } @Override - public boolean disconnect(KapuaConnectionContext kcc, Throwable error) { - boolean stealingLinkDetected = isStealingLink(kcc, error); + public boolean disconnect(KapuaSecurityContext kapuaSecurityContext, Throwable error) { + boolean stealingLinkDetected = isStealingLink(kapuaSecurityContext, error); boolean deviceOwnedByTheCurrentNode = true; - logger.debug("Old connection id: {} - new connection id: {} - error: {} - error cause: {}", kcc.getOldConnectionId(), kcc.getConnectionId(), error, (error!=null ? error.getCause() : "null"), error); + logger.debug("Old connection id: {} - new connection id: {} - error: {} - error cause: {}", kapuaSecurityContext.getOldConnectionId(), kapuaSecurityContext.getConnectionId(), error, (error!=null ? error.getCause() : "null"), error); if (stealingLinkDetected) { loginMetric.getStealingLinkDisconnect().inc(); logger.debug("Skip device connection status update since is coming from a stealing link condition. Client id: {} - Connection id: {}", - kcc.getClientId(), - kcc.getConnectionId()); + kapuaSecurityContext.getClientId(), + kapuaSecurityContext.getConnectionId()); } else { // update device connection (if the disconnection wasn't caused by a stealing link) DeviceConnection deviceConnection; try { - deviceConnection = KapuaSecurityUtils.doPrivileged(() -> deviceConnectionService.findByClientId(kcc.getScopeId(), kcc.getClientId())); + deviceConnection = KapuaSecurityUtils.doPrivileged(() -> deviceConnectionService.findByClientId(kapuaSecurityContext.getScopeId(), kapuaSecurityContext.getClientId())); } catch (Exception e) { throw new ShiroException("Error while looking for device connection on updating the device status!", e); } // the device connection must be not null if (deviceConnection != null) { - if (kcc.getBrokerIpOrHostName() == null) { + if (kapuaSecurityContext.getBrokerIpOrHostName() == null) { logger.warn("Broker Ip or host name is not correctly set! Please check the configuration!"); } - else if (!kcc.getBrokerIpOrHostName().equals(deviceConnection.getServerIp())) { + else if (!kapuaSecurityContext.getBrokerIpOrHostName().equals(deviceConnection.getServerIp())) { //the device is connected to a different node so skip to update the status! deviceOwnedByTheCurrentNode = false; logger.warn("Detected disconnection for client connected to another node: cliend id {} - account id {} - last connection id was {} - current connection id is {} - IP: {} - No disconnection info will be added!", - kcc.getClientId(), - kcc.getScopeId(), - kcc.getOldConnectionId(), - kcc.getConnectionId(), - kcc.getClientIp()); + kapuaSecurityContext.getClientId(), + kapuaSecurityContext.getScopeId(), + kapuaSecurityContext.getOldConnectionId(), + kapuaSecurityContext.getConnectionId(), + kapuaSecurityContext.getClientIp()); } if(deviceOwnedByTheCurrentNode) { //update status only if the old status wasn't missing @@ -131,7 +131,7 @@ else if (!kcc.getBrokerIpOrHostName().equals(deviceConnection.getServerIp())) { logger.warn("Skipping device status update for device {} since last status was MISSING!", deviceConnection.getClientId()); } else { - deviceConnection.setStatus(error == null && !kcc.isMissing() ? DeviceConnectionStatus.DISCONNECTED : DeviceConnectionStatus.MISSING); + deviceConnection.setStatus(error == null && !kapuaSecurityContext.isMissing() ? DeviceConnectionStatus.DISCONNECTED : DeviceConnectionStatus.MISSING); try { KapuaSecurityUtils.doPrivileged(() -> deviceConnectionService.update(deviceConnection)); } catch (Exception e) { @@ -141,81 +141,81 @@ else if (!kcc.getBrokerIpOrHostName().equals(deviceConnection.getServerIp())) { } } } - return !stealingLinkDetected && deviceOwnedByTheCurrentNode && !kcc.isMissing(); + return !stealingLinkDetected && deviceOwnedByTheCurrentNode && !kapuaSecurityContext.isMissing(); } @Override - protected List buildAuthorizationMap(KapuaConnectionContext kcc) { + protected List buildAuthorizationMap(KapuaSecurityContext kapuaSecurityContext) { ArrayList ael = new ArrayList<>(); - ael.add(createAuthorizationEntry(kcc, Acl.WRITE_ADMIN, aclAdvisory)); + ael.add(createAuthorizationEntry(kapuaSecurityContext, Acl.WRITE_ADMIN, aclAdvisory)); // addConnection checks BROKER_CONNECT_IDX permission before call this method // then here user has BROKER_CONNECT_IDX permission and if check isn't needed // if (hasPermissions[BROKER_CONNECT_IDX]) { - if (kcc.isDeviceManage()) { - ael.add(createAuthorizationEntry(kcc, Acl.ALL, formatAcl(aclCtrlAcc, kcc))); + if (kapuaSecurityContext.isDeviceManage()) { + ael.add(createAuthorizationEntry(kapuaSecurityContext, Acl.ALL, formatAcl(aclCtrlAcc, kapuaSecurityContext))); } else { - ael.add(createAuthorizationEntry(kcc, Acl.ALL, formatAclFull(aclCtrlAccCli, kcc))); + ael.add(createAuthorizationEntry(kapuaSecurityContext, Acl.ALL, formatAclFull(aclCtrlAccCli, kapuaSecurityContext))); } - if (kcc.isDataManage()) { - ael.add(createAuthorizationEntry(kcc, Acl.ALL, formatAcl(aclDataAcc, kcc))); - } else if (kcc.isDataView()) { - ael.add(createAuthorizationEntry(kcc, Acl.READ_ADMIN, formatAcl(aclDataAcc, kcc))); - ael.add(createAuthorizationEntry(kcc, Acl.WRITE, formatAclFull(aclDataAccCli, kcc))); + if (kapuaSecurityContext.isDataManage()) { + ael.add(createAuthorizationEntry(kapuaSecurityContext, Acl.ALL, formatAcl(aclDataAcc, kapuaSecurityContext))); + } else if (kapuaSecurityContext.isDataView()) { + ael.add(createAuthorizationEntry(kapuaSecurityContext, Acl.READ_ADMIN, formatAcl(aclDataAcc, kapuaSecurityContext))); + ael.add(createAuthorizationEntry(kapuaSecurityContext, Acl.WRITE, formatAclFull(aclDataAccCli, kapuaSecurityContext))); } else { - ael.add(createAuthorizationEntry(kcc, Acl.ALL, formatAclFull(aclDataAccCli, kcc))); + ael.add(createAuthorizationEntry(kapuaSecurityContext, Acl.ALL, formatAclFull(aclDataAccCli, kapuaSecurityContext))); } - ael.add(createAuthorizationEntry(kcc, Acl.WRITE_ADMIN, formatAcl(aclCtrlAccReply, kcc))); + ael.add(createAuthorizationEntry(kapuaSecurityContext, Acl.WRITE_ADMIN, formatAcl(aclCtrlAccReply, kapuaSecurityContext))); // Write notify to any client Id and any application and operation - ael.add(createAuthorizationEntry(kcc, Acl.WRITE, formatAclFull(aclCtrlAccNotify, kcc))); + ael.add(createAuthorizationEntry(kapuaSecurityContext, Acl.WRITE, formatAclFull(aclCtrlAccNotify, kapuaSecurityContext))); - kcc.logAuthDestinationToLog(); + kapuaSecurityContext.logAuthDestinationToLog(); return ael; } - protected void updatePermissions(KapuaConnectionContext kcc) throws KapuaException { + protected void updatePermissions(KapuaSecurityContext kapuaSecurityContext) throws KapuaException { List permissions = new ArrayList<>(); - permissions.add(permissionFactory.newPermission(BROKER_DOMAIN, Actions.connect, kcc.getScopeId())); - permissions.add(permissionFactory.newPermission(DEVICE_MANAGEMENT_DOMAIN, Actions.write, kcc.getScopeId())); - permissions.add(permissionFactory.newPermission(DATASTORE_DOMAIN, Actions.read, kcc.getScopeId())); - permissions.add(permissionFactory.newPermission(DATASTORE_DOMAIN, Actions.write, kcc.getScopeId())); - permissions.add(permissionFactory.newPermission(DEVICE_MANAGEMENT_DOMAIN, Actions.read, kcc.getScopeId())); - kcc.updatePermissions(authorizationService.isPermitted(permissions)); - - if (!kcc.isBrokerConnect()) { - throw new KapuaIllegalAccessException(permissionFactory.newPermission(BROKER_DOMAIN, Actions.connect, kcc.getScopeId()).toString()); + permissions.add(permissionFactory.newPermission(BROKER_DOMAIN, Actions.connect, kapuaSecurityContext.getScopeId())); + permissions.add(permissionFactory.newPermission(DEVICE_MANAGEMENT_DOMAIN, Actions.write, kapuaSecurityContext.getScopeId())); + permissions.add(permissionFactory.newPermission(DATASTORE_DOMAIN, Actions.read, kapuaSecurityContext.getScopeId())); + permissions.add(permissionFactory.newPermission(DATASTORE_DOMAIN, Actions.write, kapuaSecurityContext.getScopeId())); + permissions.add(permissionFactory.newPermission(DEVICE_MANAGEMENT_DOMAIN, Actions.read, kapuaSecurityContext.getScopeId())); + kapuaSecurityContext.updatePermissions(authorizationService.isPermitted(permissions)); + + if (!kapuaSecurityContext.isBrokerConnect()) { + throw new KapuaIllegalAccessException(permissionFactory.newPermission(BROKER_DOMAIN, Actions.connect, kapuaSecurityContext.getScopeId()).toString()); } } /** - * Create a new {@link DeviceConnection} or updates the existing one using the info provided in the {@link KapuaConnectionContext}. + * Create a new {@link DeviceConnection} or updates the existing one using the info provided in the {@link KapuaSecurityContext}. * - * @param kcc The {@link KapuaConnectionContext} of the currect connection + * @param kapuaSecurityContext The {@link KapuaSecurityContext} of the currect connection * @param deviceConnection The {@link DeviceConnection} to update, or null if it needs to be created * @return The created/updated {@link DeviceConnection} * @throws KapuaException */ - protected DeviceConnection upsertDeviceConnection(KapuaConnectionContext kcc, DeviceConnection deviceConnection) throws KapuaException { + protected DeviceConnection upsertDeviceConnection(KapuaSecurityContext kapuaSecurityContext, DeviceConnection deviceConnection) throws KapuaException { if (deviceConnection == null) { - DeviceConnectionCreator deviceConnectionCreator = deviceConnectionFactory.newCreator(kcc.getScopeId()); + DeviceConnectionCreator deviceConnectionCreator = deviceConnectionFactory.newCreator(kapuaSecurityContext.getScopeId()); deviceConnectionCreator.setStatus(DeviceConnectionStatus.CONNECTED); - deviceConnectionCreator.setClientId(kcc.getClientId()); - deviceConnectionCreator.setClientIp(kcc.getClientIp()); - deviceConnectionCreator.setProtocol(kcc.getConnectorDescriptor().getTransportProtocol()); - deviceConnectionCreator.setServerIp(kcc.getBrokerIpOrHostName()); - deviceConnectionCreator.setUserId(kcc.getUserId()); + deviceConnectionCreator.setClientId(kapuaSecurityContext.getClientId()); + deviceConnectionCreator.setClientIp(kapuaSecurityContext.getClientIp()); + deviceConnectionCreator.setProtocol(kapuaSecurityContext.getConnectorDescriptor().getTransportProtocol()); + deviceConnectionCreator.setServerIp(kapuaSecurityContext.getBrokerIpOrHostName()); + deviceConnectionCreator.setUserId(kapuaSecurityContext.getUserId()); deviceConnectionCreator.setUserCouplingMode(ConnectionUserCouplingMode.INHERITED); deviceConnectionCreator.setAllowUserChange(false); deviceConnection = KapuaSecurityUtils.doPrivileged(() -> deviceConnectionService.create(deviceConnectionCreator)); } else { deviceConnection.setStatus(DeviceConnectionStatus.CONNECTED); - deviceConnection.setClientIp(kcc.getClientIp()); - deviceConnection.setProtocol(kcc.getConnectorDescriptor().getTransportProtocol()); - deviceConnection.setServerIp(kcc.getBrokerIpOrHostName()); - deviceConnection.setUserId(kcc.getUserId()); + deviceConnection.setClientIp(kapuaSecurityContext.getClientIp()); + deviceConnection.setProtocol(kapuaSecurityContext.getConnectorDescriptor().getTransportProtocol()); + deviceConnection.setServerIp(kapuaSecurityContext.getBrokerIpOrHostName()); + deviceConnection.setUserId(kapuaSecurityContext.getUserId()); deviceConnection.setAllowUserChange(false); DeviceConnection deviceConnectionToUpdate = deviceConnection; KapuaSecurityUtils.doPrivileged(() -> deviceConnectionService.update(deviceConnectionToUpdate)); @@ -225,17 +225,17 @@ protected DeviceConnection upsertDeviceConnection(KapuaConnectionContext kcc, De // } // TODO manage the stealing link event (may be a good idea to use different connect status (connect -stealing)? - String previousConnectionId = kcc.getOldConnectionId(); + String previousConnectionId = kapuaSecurityContext.getOldConnectionId(); if (previousConnectionId != null) { loginMetric.getStealingLinkConnect().inc(); // stealing link detected, skip info logger.warn("Detected Stealing link for cliend id {} - account - last connection id was {} - current connection id is {} - IP: {} - No connection status changes!", - kcc.getClientId(), - kcc.getAccountName(), + kapuaSecurityContext.getClientId(), + kapuaSecurityContext.getAccountName(), previousConnectionId, - kcc.getConnectionId(), - kcc.getClientIp()); + kapuaSecurityContext.getConnectionId(), + kapuaSecurityContext.getClientIp()); } } return deviceConnection; diff --git a/broker/core/src/main/java/org/eclipse/kapua/broker/core/pool/JmsAssistantProducerWrapper.java b/broker/core/src/main/java/org/eclipse/kapua/broker/core/pool/JmsAssistantProducerWrapper.java index d38153708de..55bec907f9c 100644 --- a/broker/core/src/main/java/org/eclipse/kapua/broker/core/pool/JmsAssistantProducerWrapper.java +++ b/broker/core/src/main/java/org/eclipse/kapua/broker/core/pool/JmsAssistantProducerWrapper.java @@ -19,7 +19,7 @@ import org.eclipse.kapua.KapuaException; import org.eclipse.kapua.broker.core.message.JmsUtil; import org.eclipse.kapua.broker.core.message.MessageConstants; -import org.eclipse.kapua.broker.core.plugin.KapuaConnectionContext; +import org.eclipse.kapua.broker.core.plugin.KapuaSecurityContext; /** * Broker ({@link JmsProducerWrapper}) implementation.
@@ -40,13 +40,13 @@ public JmsAssistantProducerWrapper(ActiveMQConnectionFactory vmconnFactory, Stri * @param message * @throws JMSException */ - public void send(String topic, String message, KapuaConnectionContext kcc) throws JMSException { + public void send(String topic, String message, KapuaSecurityContext kapuaSecurityContext) throws JMSException { TextMessage textMessage = session.createTextMessage(); Topic jmsTopic = session.createTopic(topic); - textMessage.setStringProperty(MessageConstants.PROPERTY_BROKER_ID, kcc.getBrokerId()); - textMessage.setStringProperty(MessageConstants.PROPERTY_CLIENT_ID, kcc.getClientId()); - textMessage.setLongProperty(MessageConstants.PROPERTY_SCOPE_ID, kcc.getScopeIdAsLong()); + textMessage.setStringProperty(MessageConstants.PROPERTY_BROKER_ID, kapuaSecurityContext.getBrokerId()); + textMessage.setStringProperty(MessageConstants.PROPERTY_CLIENT_ID, kapuaSecurityContext.getClientId()); + textMessage.setLongProperty(MessageConstants.PROPERTY_SCOPE_ID, kapuaSecurityContext.getScopeIdAsLong()); textMessage.setStringProperty(MessageConstants.PROPERTY_ORIGINAL_TOPIC, JmsUtil.convertMqttWildCardToJms(topic)); textMessage.setLongProperty(MessageConstants.PROPERTY_ENQUEUED_TIMESTAMP, System.currentTimeMillis()); textMessage.setText(message); diff --git a/broker/core/src/main/java/org/eclipse/kapua/broker/core/setting/BrokerSettingKey.java b/broker/core/src/main/java/org/eclipse/kapua/broker/core/setting/BrokerSettingKey.java index 0f64094c5c5..8d148b3770b 100644 --- a/broker/core/src/main/java/org/eclipse/kapua/broker/core/setting/BrokerSettingKey.java +++ b/broker/core/src/main/java/org/eclipse/kapua/broker/core/setting/BrokerSettingKey.java @@ -79,6 +79,10 @@ public enum BrokerSettingKey implements SettingKey { * Broker name (used also for the vm connector name) */ BROKER_NAME("broker.name"), + /** + * Publish message info if the message size is over the specified threshold + */ + PUBLISHED_MESSAGE_SIZE_LOG_THRESHOLD("broker.security.published.message_size.log_threshold"), /** * Camel default route configuration file name. (please specify just the name. The file path will be discovered by the class loader) * Used by the {@link CamelKapuaDefaultRouter} to load the routing configuration. diff --git a/broker/core/src/main/resources/kapua-broker-setting.properties b/broker/core/src/main/resources/kapua-broker-setting.properties index fca82e243ca..4b504cf26e8 100644 --- a/broker/core/src/main/resources/kapua-broker-setting.properties +++ b/broker/core/src/main/resources/kapua-broker-setting.properties @@ -27,6 +27,7 @@ broker.client_pool.no_dest_max_size=10 #No destination broker internal use client pool - minimum size broker.client_pool.no_dest_min_size=5 broker.name=message-broker +broker.security.published.message_size.log_threshold=100000 #Camel default route configuration file name. #NOTE: if included in the classpath please specify just the name since the file path will be discovered by the class loader. # otherwise please provide a valid url. diff --git a/qa/integration/src/test/resources/kapua-broker-setting.properties b/qa/integration/src/test/resources/kapua-broker-setting.properties index 18c64c29ef5..598380ffc6d 100644 --- a/qa/integration/src/test/resources/kapua-broker-setting.properties +++ b/qa/integration/src/test/resources/kapua-broker-setting.properties @@ -20,6 +20,7 @@ broker.client_pool.no_dest_max_size=10 #No destination broker internal use client pool - minimum size broker.client_pool.no_dest_min_size=5 broker.name=message-broker +broker.security.published.message_size.log_threshold=100000 #Camel default route configuration file name. #NOTE: if included in the classpath please specify just the name since the file path will be discovered by the class loader. # otherwise please provide a valid url. From 23317beb6026ee9e845ba2878292b1b59e245324 Mon Sep 17 00:00:00 2001 From: riccardomodanese Date: Tue, 19 May 2020 10:44:41 +0200 Subject: [PATCH 3/3] add MQTT connector for internal use - change broker.port to broker.connector.internal.port on properties files Signed-off-by: riccardomodanese --- assembly/broker/configurations/activemq.xml | 2 + .../broker/configurations}/shiro.ini | 0 assembly/broker/docker/Dockerfile | 2 +- .../plugin/KapuaSecurityBrokerFilter.java | 112 ++++++++++++------ .../core/plugin/KapuaSecurityContext.java | 2 +- .../authentication/DefaultAuthenticator.java | 15 ++- .../core/plugin/metric/ClientMetric.java | 4 +- .../core/plugin/metric/LoginMetric.java | 20 +++- .../core/plugin/metric/SecurityMetrics.java | 1 + .../setting/system/SystemSettingKey.java | 16 ++- .../kapua/commons/util/SystemUtils.java | 4 +- .../kapua-environment-setting.properties | 6 +- .../kapua/commons/util/SystemUtilsTest.java | 4 +- .../kapua-environment-setting.properties | 2 +- deployment/docker/compose/docker-compose.yml | 2 + .../src/test/resources/activemq.xml | 2 + .../resources/kapua-broker-setting.properties | 6 + .../kapua-environment-setting.properties | 6 +- .../kapua-environment-setting.properties | 2 +- .../kapua-environment-setting.properties | 2 +- .../kapua-environment-setting.properties | 2 +- .../kapua-environment-setting.properties | 2 +- .../kapua-environment-setting.properties | 2 +- .../kapua-environment-setting.properties | 2 +- .../kapua-environment-setting.properties | 2 +- .../kapua-environment-setting.properties | 2 +- .../transport/mqtt/MqttClientFactoryImpl.java | 2 +- .../resources/mqtt-client-setting.properties | 4 +- 28 files changed, 159 insertions(+), 69 deletions(-) rename {broker/core/src/main/resources => assembly/broker/configurations}/shiro.ini (100%) diff --git a/assembly/broker/configurations/activemq.xml b/assembly/broker/configurations/activemq.xml index d32a2720f35..83ca3629e15 100644 --- a/assembly/broker/configurations/activemq.xml +++ b/assembly/broker/configurations/activemq.xml @@ -172,6 +172,8 @@ + stealingLinkManagerFuture; + private static final String CONNECTOR_NAME_VM = String.format("vm://%s", BrokerSetting.getInstance().getString(BrokerSettingKey.BROKER_NAME)); + private static final String CONNECTOR_NAME_INTERNAL; + private static final String ERROR = "@@ error"; /** @@ -133,6 +137,7 @@ public class KapuaSecurityBrokerFilter extends BrokerFilter { BROKER_ID_RESOLVER_CLASS_NAME = config.getString(BrokerSettingKey.BROKER_ID_RESOLVER_CLASS_NAME); AUTHENTICATOR_CLASS_NAME = config.getString(BrokerSettingKey.AUTHENTICATOR_CLASS_NAME); AUTHORIZER_CLASS_NAME = config.getString(BrokerSettingKey.AUTHORIZER_CLASS_NAME); + CONNECTOR_NAME_INTERNAL = SystemSetting.getInstance().getString(SystemSettingKey.BROKER_INTERNAL_CONNECTOR_NAME); STEALING_LINK_INITIALIZATION_MAX_WAIT_TIME = config.getLong(BrokerSettingKey.STEALING_LINK_INITIALIZATION_MAX_WAIT_TIME); stealingLinkEnabled = config.getBoolean(BrokerSettingKey.BROKER_STEALING_LINK_ENABLED); publishInfoMessageSizeLimit = BrokerSetting.getInstance().getInt(BrokerSettingKey.PUBLISHED_MESSAGE_SIZE_LOG_THRESHOLD, DEFAULT_PUBLISHED_MESSAGE_SIZE_LOG_THRESHOLD); @@ -144,7 +149,6 @@ public class KapuaSecurityBrokerFilter extends BrokerFilter { protected String brokerId; protected static final Map CONNECTION_MAP = new ConcurrentHashMap<>(); - private static final String CONNECTOR_NAME_VM = String.format("vm://%s", BrokerSetting.getInstance().getString(BrokerSettingKey.BROKER_NAME)); private Authenticator authenticator; private Authorizer authorizer; @@ -342,14 +346,29 @@ protected void unregisterStealingLinkManager() { */ private boolean isPassThroughConnection(ConnectionContext context) { if (context != null) { - return context.getConnector() == null || - CONNECTOR_NAME_VM.equals(((TransportConnector) context.getConnector()).getName()) || - context.getConnection().isNetworkConnection(); + if (context.getConnector() == null || + CONNECTOR_NAME_VM.equals(((TransportConnector) context.getConnector()).getName()) || + CONNECTOR_NAME_INTERNAL.equals(((TransportConnector) context.getConnector()).getName())) { + return true; + } + + // network connector + if (context.getConnection().isNetworkConnection()) { + return true; + } } return false; } + private boolean isInternalConnector(ConnectionContext context) { + if (context != null && + (context.getConnector() == null || CONNECTOR_NAME_INTERNAL.equals(((TransportConnector) context.getConnector()).getName()))) { + return true; + } + return false; + } + /** * Check if security context is broker context * Return true if security context is a broker context or if is a pass through connection @@ -359,13 +378,13 @@ private boolean isPassThroughConnection(ConnectionContext context) { * @param context * @return */ - private boolean isBrokerContext(ConnectionContext context) { + private boolean isTrustedContext(ConnectionContext context) { if (context == null) { return false; } else if (context.getSecurityContext() != null) { return context.getSecurityContext().isBrokerContext(); } else { - return isPassThroughConnection(context); + return isPassThroughConnection(context) || isInternalConnector(context); } } @@ -396,10 +415,22 @@ public void addConnection(ConnectionContext context, ConnectionInfo info) addExternalConnection(context, info); loginMetric.getSuccess().inc(); } + else if (isInternalConnector(context)) { + logger.debug("Internal connector: Login from {}", info.getUserName()); + //this connection type is not so frequent so we can read values from configuration each time without having big performance impact + loginMetric.getInternalConnectorAttempt().inc(); + String username = SystemSetting.getInstance().getString(SystemSettingKey.BROKER_INTERNAL_CONNECTOR_USERNAME); + String pass = SystemSetting.getInstance().getString(SystemSettingKey.BROKER_INTERNAL_CONNECTOR_PASSWORD); + if (username==null || !username.equals(info.getUserName()) || + pass==null || !pass.equals(info.getPassword())) { + throw new SecurityException("User not allowed!"); + } + loginMetric.getInternalConnectorConnected().inc(); + } super.addConnection(context, info); } - private void addExternalConnection(ConnectionContext context, ConnectionInfo info) + protected void addExternalConnection(ConnectionContext context, ConnectionInfo info) throws CredentialException, KapuaException { // Clean-up credentials possibly associated with the current thread by previous connection. ThreadContext.unbindSubject(); @@ -424,6 +455,7 @@ private void addExternalConnection(ConnectionContext context, ConnectionInfo inf loginShiroLoginTimeContext.stop(); CONNECTION_MAP.put(kapuaSecurityContext.getFullClientId(), info.getConnectionId().getValue()); + buildAuthorization(kapuaSecurityContext, authenticator.connect(kapuaSecurityContext)); context.setSecurityContext(kapuaSecurityContext); @@ -475,38 +507,46 @@ private void addExternalConnection(ConnectionContext context, ConnectionInfo inf } @Override - public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) - throws Exception { + public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { if (!isPassThroughConnection(context)) { - Context loginRemoveConnectionTimeContext = loginMetric.getRemoveConnectionTime().time(); - KapuaSecurityContext kapuaSecurityContext = getKapuaSecurityContext(context); - try { - KapuaPrincipal kapuaPrincipal = ((KapuaPrincipal) kapuaSecurityContext.getMainPrincipal()); - kapuaSecurityContext.updateOldConnectionId(CONNECTION_MAP.get(kapuaSecurityContext.getFullClientId())); - // TODO fix the kapua session when run as feature will be implemented - KapuaSecurityUtils.setSession(new KapuaSession(kapuaPrincipal)); - authenticator.disconnect(kapuaSecurityContext, error); - // multiple account stealing link fix - info.setClientId(kapuaSecurityContext.getFullClientId()); - context.setClientId(kapuaSecurityContext.getFullClientId()); - } finally { - loginRemoveConnectionTimeContext.stop(); - authenticationService.logout(); - if (kapuaSecurityContext != null && kapuaSecurityContext.getFullClientId() != null) { - if (info.getConnectionId().getValue().equals(CONNECTION_MAP.get(kapuaSecurityContext.getFullClientId()))) { - // cleanup stealing link detection map - CONNECTION_MAP.remove(kapuaSecurityContext.getFullClientId()); - } else { - logger.info("Cannot find client id in the connection map. May be it's due to a stealing link. ({})", kapuaSecurityContext.getFullClientId()); - } + removeExternalConnection(context, info, error); + } + else if (isInternalConnector(context)) { + logger.debug("Internal connector: Logout from {}", info.getUserName()); + loginMetric.getInternalConnectorDisconnected().inc(); + } + super.removeConnection(context, info, error); + context.setSecurityContext(null); + } + + protected void removeExternalConnection(ConnectionContext context, ConnectionInfo info, Throwable error) + throws Exception { + logger.debug("Throwable on remove connection: {}", error!=null ? error.getMessage() : "N/A"); + Context loginRemoveConnectionTimeContext = loginMetric.getRemoveConnectionTime().time(); + KapuaSecurityContext kapuaSecurityContext = getKapuaSecurityContext(context); + try { + KapuaPrincipal kapuaPrincipal = ((KapuaPrincipal) kapuaSecurityContext.getMainPrincipal()); + kapuaSecurityContext.updateOldConnectionId(CONNECTION_MAP.get(kapuaSecurityContext.getFullClientId())); + // TODO fix the kapua session when run as feature will be implemented + KapuaSecurityUtils.setSession(new KapuaSession(kapuaPrincipal)); + authenticator.disconnect(kapuaSecurityContext, error); + // multiple account stealing link fix + info.setClientId(kapuaSecurityContext.getFullClientId()); + context.setClientId(kapuaSecurityContext.getFullClientId()); + } finally { + loginRemoveConnectionTimeContext.stop(); + authenticationService.logout(); + if (kapuaSecurityContext != null && kapuaSecurityContext.getFullClientId() != null) { + if (info.getConnectionId().getValue().equals(CONNECTION_MAP.get(kapuaSecurityContext.getFullClientId()))) { + // cleanup stealing link detection map + CONNECTION_MAP.remove(kapuaSecurityContext.getFullClientId()); } else { - logger.warn("Cannot find Kapua connection context or client id is null"); + logger.info("Cannot find client id in the connection map. May be it's due to a stealing link. ({})", kapuaSecurityContext.getFullClientId()); } + } else { + logger.warn("Cannot find Kapua connection context or client id is null"); } } - super.removeConnection(context, info, error); - // context may be null according to isPassThroughConnection(context) - context.setSecurityContext(null); } private Account getAccount(KapuaId scopeId) { @@ -559,7 +599,7 @@ private void internalSend(ProducerBrokerExchange producerExchange, Message messa } int messageSize = messageSend.getSize(); messageSend.setProperty(MessageConstants.HEADER_KAPUA_RECEIVED_TIMESTAMP, KapuaDateUtils.getKapuaSysDate().toEpochMilli()); - if (!isBrokerContext(producerExchange.getConnectionContext())) { + if (!isTrustedContext(producerExchange.getConnectionContext())) { KapuaSecurityContext kapuaSecurityContext = getKapuaSecurityContext(producerExchange.getConnectionContext()); KapuaPrincipal kapuaPrincipal = ((KapuaPrincipal) kapuaSecurityContext.getMainPrincipal()); if (!messageSend.getDestination().isTemporary() && !authorizer.isAllowed(ActionType.WRITE, kapuaSecurityContext, messageSend.getDestination())) { @@ -629,7 +669,7 @@ public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) private Subscription internalAddConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { info.setClientId(context.getClientId()); - if (!isBrokerContext(context)) { + if (!isTrustedContext(context)) { String[] destinationsPath = info.getDestination().getDestinationPaths(); String destination = info.getDestination().getPhysicalName(); KapuaSecurityContext kapuaSecurityContext = getKapuaSecurityContext(context); diff --git a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityContext.java b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityContext.java index 575a12786fa..514f325723a 100644 --- a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityContext.java +++ b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/KapuaSecurityContext.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2011, 2020 Eurotech and/or its affiliates and others + * Copyright (c) 2016, 2020 Eurotech and/or its affiliates and others * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 diff --git a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/DefaultAuthenticator.java b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/DefaultAuthenticator.java index 2023bb8ab09..7a7349818f8 100644 --- a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/DefaultAuthenticator.java +++ b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/authentication/DefaultAuthenticator.java @@ -44,9 +44,11 @@ public class DefaultAuthenticator implements Authenticator { protected static final Logger logger = LoggerFactory.getLogger(DefaultAuthenticator.class); private static final String SYSTEM_MESSAGE_CREATOR_CLASS_NAME; + private static final String ADMIN_USERNAME; static { SYSTEM_MESSAGE_CREATOR_CLASS_NAME = BrokerSetting.getInstance().getString(BrokerSettingKey.SYSTEM_MESSAGE_CREATOR_CLASS_NAME); + ADMIN_USERNAME = SystemSetting.getInstance().getString(SystemSettingKey.SYS_ADMIN_USERNAME); } private Map options; @@ -80,7 +82,8 @@ public List connect(KapuaSecurityContext kapuaSecurityContex loginMetric.getKapuasysTokenAttempt().inc(); authorizationEntries = adminAuthenticationLogic.connect(kapuaSecurityContext); clientMetric.getConnectedKapuasys().inc(); - } else { + } + else { loginMetric.getNormalUserAttempt().inc(); authorizationEntries = userAuthenticationLogic.connect(kapuaSecurityContext); clientMetric.getConnectedClient().inc(); @@ -92,10 +95,11 @@ public List connect(KapuaSecurityContext kapuaSecurityContex @Override public void disconnect(KapuaSecurityContext kapuaSecurityContext, Throwable error) { if (isAdminUser(kapuaSecurityContext)) { - clientMetric.getDisconnectionKapuasys().inc(); + clientMetric.getDisconnectedKapuasys().inc(); adminAuthenticationLogic.disconnect(kapuaSecurityContext, error); - } else { - clientMetric.getDisconnectionClient().inc(); + } + else { + clientMetric.getDisconnectedClient().inc(); if (userAuthenticationLogic.disconnect(kapuaSecurityContext, error)) { sendDisconnectMessage(kapuaSecurityContext); } @@ -138,8 +142,7 @@ private void sendMessage(KapuaSecurityContext kapuaSecurityContext, String messa } protected boolean isAdminUser(KapuaSecurityContext kapuaSecurityContext) { - String adminUsername = SystemSetting.getInstance().getString(SystemSettingKey.SYS_ADMIN_USERNAME); - return kapuaSecurityContext.getUserName().equals(adminUsername); + return kapuaSecurityContext.getUserName().equals(ADMIN_USERNAME); } } diff --git a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/metric/ClientMetric.java b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/metric/ClientMetric.java index 381edf54a2c..ab382ecf91d 100644 --- a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/metric/ClientMetric.java +++ b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/metric/ClientMetric.java @@ -44,11 +44,11 @@ public Counter getConnectedKapuasys() { return connectedKapuasys; } - public Counter getDisconnectionClient() { + public Counter getDisconnectedClient() { return disconnectionClient; } - public Counter getDisconnectionKapuasys() { + public Counter getDisconnectedKapuasys() { return disconnectionKapuasys; } diff --git a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/metric/LoginMetric.java b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/metric/LoginMetric.java index 7918b84fc3f..1fccbbadfbf 100644 --- a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/metric/LoginMetric.java +++ b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/metric/LoginMetric.java @@ -25,11 +25,14 @@ public class LoginMetric { private Counter invalidUserPassword; private Counter invalidClientId; private Counter kapuasysTokenAttempt; + private Counter internalConnectorAttempt; + private Counter internalConnectorConnected; private Counter normalUserAttempt; private Counter stealingLinkConnect; private Counter stealingLinkDisconnect; private Counter adminStealingLinkDisconnect; - protected Counter remoteStealingLinkDisconnect; + private Counter remoteStealingLinkDisconnect; + private Counter internalConnectorDisconnected; private Timer addConnectionTime; private Timer normalUserTime; private Timer shiroLoginTime; @@ -52,6 +55,9 @@ private LoginMetric() { invalidUserPassword = metricsService.getCounter(SecurityMetrics.METRIC_MODULE_NAME, SecurityMetrics.METRIC_COMPONENT_LOGIN, SecurityMetrics.METRIC_FAILURE_PASSWORD, SecurityMetrics.METRIC_COUNT); invalidClientId = metricsService.getCounter(SecurityMetrics.METRIC_MODULE_NAME, SecurityMetrics.METRIC_COMPONENT_LOGIN, SecurityMetrics.METRIC_FAILURE_CLIENT_ID, SecurityMetrics.METRIC_COUNT); kapuasysTokenAttempt = metricsService.getCounter(SecurityMetrics.METRIC_MODULE_NAME, SecurityMetrics.METRIC_COMPONENT_LOGIN, SecurityMetrics.METRIC_KAPUASYS, SecurityMetrics.METRIC_COUNT); + internalConnectorAttempt = metricsService.getCounter(SecurityMetrics.METRIC_MODULE_NAME, SecurityMetrics.METRIC_COMPONENT_LOGIN, SecurityMetrics.METRIC_INTERNAL_CONNECTOR, SecurityMetrics.METRIC_COUNT); + internalConnectorConnected = metricsService.getCounter(SecurityMetrics.METRIC_MODULE_NAME, SecurityMetrics.METRIC_COMPONENT_LOGIN, SecurityMetrics.METRIC_INTERNAL_CONNECTOR, SecurityMetrics.METRIC_CONNECT, SecurityMetrics.METRIC_COUNT); + internalConnectorDisconnected = metricsService.getCounter(SecurityMetrics.METRIC_MODULE_NAME, SecurityMetrics.METRIC_COMPONENT_LOGIN, SecurityMetrics.METRIC_INTERNAL_CONNECTOR, SecurityMetrics.METRIC_DISCONNECT, SecurityMetrics.METRIC_COUNT); normalUserAttempt = metricsService.getCounter(SecurityMetrics.METRIC_MODULE_NAME, SecurityMetrics.METRIC_COMPONENT_LOGIN, SecurityMetrics.METRIC_NORMAL, SecurityMetrics.METRIC_COUNT); stealingLinkConnect = metricsService.getCounter(SecurityMetrics.METRIC_MODULE_NAME, SecurityMetrics.METRIC_COMPONENT_LOGIN, SecurityMetrics.METRIC_STEALING_LINK, SecurityMetrics.METRIC_CONNECT, SecurityMetrics.METRIC_COUNT); stealingLinkDisconnect = metricsService.getCounter(SecurityMetrics.METRIC_MODULE_NAME, SecurityMetrics.METRIC_COMPONENT_LOGIN, SecurityMetrics.METRIC_STEALING_LINK, SecurityMetrics.METRIC_DISCONNECT, SecurityMetrics.METRIC_COUNT); @@ -89,6 +95,10 @@ public Counter getKapuasysTokenAttempt() { return kapuasysTokenAttempt; } + public Counter getInternalConnectorAttempt() { + return internalConnectorAttempt; + } + public Counter getNormalUserAttempt() { return normalUserAttempt; } @@ -109,8 +119,12 @@ public Counter getAdminStealingLinkDisconnect() { return adminStealingLinkDisconnect; } - public void setAdminStealingLinkDisconnect(Counter adminStealingLinkDisconnect) { - this.adminStealingLinkDisconnect = adminStealingLinkDisconnect; + public Counter getInternalConnectorConnected() { + return internalConnectorConnected; + } + + public Counter getInternalConnectorDisconnected() { + return internalConnectorDisconnected; } public Timer getAddConnectionTime() { diff --git a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/metric/SecurityMetrics.java b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/metric/SecurityMetrics.java index 049d9cc51b5..f1578dbdad8 100644 --- a/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/metric/SecurityMetrics.java +++ b/broker/core/src/main/java/org/eclipse/kapua/broker/core/plugin/metric/SecurityMetrics.java @@ -27,6 +27,7 @@ private SecurityMetrics() { } public static final String METRIC_CONNECTED = "connected"; public static final String METRIC_DISCONNECT = "disconnect"; public static final String METRIC_DISCONNECTED = "disconnected"; + public static final String METRIC_INTERNAL_CONNECTOR = "internal_connector"; public static final String METRIC_SUCCESS = "success"; public static final String METRIC_FAILURE = "failure"; public static final String METRIC_FAILURE_PASSWORD = METRIC_FAILURE + "_password"; diff --git a/commons/src/main/java/org/eclipse/kapua/commons/setting/system/SystemSettingKey.java b/commons/src/main/java/org/eclipse/kapua/commons/setting/system/SystemSettingKey.java index 60ef60ec8b8..711a4c68f86 100644 --- a/commons/src/main/java/org/eclipse/kapua/commons/setting/system/SystemSettingKey.java +++ b/commons/src/main/java/org/eclipse/kapua/commons/setting/system/SystemSettingKey.java @@ -172,9 +172,21 @@ public enum SystemSettingKey implements SettingKey { */ BROKER_HOST("broker.host"), /** - * Broker port + * Broker internal port */ - BROKER_PORT("broker.port"), + BROKER_INTERNAL_CONNECTOR_PORT("broker.connector.internal.port"), + /** + * Broker internal connector name + */ + BROKER_INTERNAL_CONNECTOR_NAME("broker.connector.internal.name"), + /** + * Internal connector username + */ + BROKER_INTERNAL_CONNECTOR_USERNAME("broker.connector.internal.username"), + /** + * Internal connector password + */ + BROKER_INTERNAL_CONNECTOR_PASSWORD("broker.connector.internal.password"), /** * Metrics JMX disabled diff --git a/commons/src/main/java/org/eclipse/kapua/commons/util/SystemUtils.java b/commons/src/main/java/org/eclipse/kapua/commons/util/SystemUtils.java index 84b69ae5c9b..fa408115fb8 100644 --- a/commons/src/main/java/org/eclipse/kapua/commons/util/SystemUtils.java +++ b/commons/src/main/java/org/eclipse/kapua/commons/util/SystemUtils.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2016, 2020 Eurotech and/or its affiliates and others + * Copyright (c) 2011, 2020 Eurotech and/or its affiliates and others * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 @@ -40,7 +40,7 @@ public static URI getNodeURI() return new URI(envConfig.getString(SystemSettingKey.BROKER_SCHEME), null, envConfig.getString(SystemSettingKey.BROKER_HOST), - envConfig.getInt(SystemSettingKey.BROKER_PORT), + envConfig.getInt(SystemSettingKey.BROKER_INTERNAL_CONNECTOR_PORT), null, null, null); diff --git a/commons/src/main/resources/kapua-environment-setting.properties b/commons/src/main/resources/kapua-environment-setting.properties index 422efe20566..71f3e23edd6 100644 --- a/commons/src/main/resources/kapua-environment-setting.properties +++ b/commons/src/main/resources/kapua-environment-setting.properties @@ -53,7 +53,11 @@ commons.db.character.wildcard.single=_ # broker.scheme=tcp broker.host=localhost -broker.port=1883 +broker.connector.internal.port=1893 +broker.connector.internal.name=internalMqtt +#please keep these parameters aligned in according with transport.credential.username and transport.credential.password of transport-mqtt module +broker.connector.internal.username=internalUsername +broker.connector.internal.password=internalPassword character.encoding=UTF-8 diff --git a/commons/src/test/java/org/eclipse/kapua/commons/util/SystemUtilsTest.java b/commons/src/test/java/org/eclipse/kapua/commons/util/SystemUtilsTest.java index 51c7c6a490e..b6555e4c6b1 100644 --- a/commons/src/test/java/org/eclipse/kapua/commons/util/SystemUtilsTest.java +++ b/commons/src/test/java/org/eclipse/kapua/commons/util/SystemUtilsTest.java @@ -32,7 +32,7 @@ public void privateConstrutorTest() throws Exception { @Test public void getNodeUriTest() throws Exception { try { - URI uri = new URI("tcp://localhost:1884"); + URI uri = new URI("tcp://localhost:1893"); assertEquals(SystemUtils.getNodeURI(), uri); } catch (Exception ex) { fail("The URI is incorrect!"); @@ -42,7 +42,7 @@ public void getNodeUriTest() throws Exception { @Test public void getNodeUriUdpTest() throws Exception { try { - URI uri = new URI("udp://localhost:1884"); + URI uri = new URI("udp://localhost:1893"); assertNotEquals(SystemUtils.getNodeURI(), uri); } catch (Exception ex) { //expected diff --git a/commons/src/test/resources/kapua-environment-setting.properties b/commons/src/test/resources/kapua-environment-setting.properties index 1f61cecdc8b..7bbf177c97a 100644 --- a/commons/src/test/resources/kapua-environment-setting.properties +++ b/commons/src/test/resources/kapua-environment-setting.properties @@ -51,7 +51,7 @@ commons.db.pool.borrow.timeout=15000 # broker.scheme=tcp broker.host=localhost -broker.port=1884 +broker.connector.internal.port=1893 character.encoding=UTF-8 diff --git a/deployment/docker/compose/docker-compose.yml b/deployment/docker/compose/docker-compose.yml index 05e21b6e703..1611fb76cbf 100644 --- a/deployment/docker/compose/docker-compose.yml +++ b/deployment/docker/compose/docker-compose.yml @@ -23,6 +23,8 @@ services: - 5672:5672 broker: image: kapua/kapua-broker:${IMAGE_VERSION} + expose: + - 1893 ports: - 1883:1883 - 8883:8883 diff --git a/qa/integration/src/test/resources/activemq.xml b/qa/integration/src/test/resources/activemq.xml index dcc0af9afc2..bae1df1a818 100644 --- a/qa/integration/src/test/resources/activemq.xml +++ b/qa/integration/src/test/resources/activemq.xml @@ -176,6 +176,8 @@ +