From c03eae6769ab7f4af4ce29051ce6dc29d6eec7f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pere=20Urb=C3=B3n?= Date: Thu, 30 Sep 2021 15:51:14 +0200 Subject: [PATCH] Add a possibility to add custom roles definitions for julieops (#336) * initial step into adding custom roles for julieops * add test and wire up transformations and actions for having custom role definitions in acls and rbac providers * add integration test for rbac * add connector level test for rbac * add sample actions and tests for ksqldb * simplify acl conversion * add early validation for wrong ACLs config values * add sample documentation page * ammend rbac julieroles test to make it more repetable * make role and operation be an alias --- docs/futures/define-custom-roles.rst | 128 ++++++++++++++++++ example/roles.yml | 31 +++++ .../kafka/topology/AccessControlManager.java | 25 +++- .../topology/BindingsBuilderProvider.java | 5 + .../purbon/kafka/topology/Configuration.java | 26 ++++ .../com/purbon/kafka/topology/Constants.java | 3 + .../com/purbon/kafka/topology/JulieOps.java | 3 +- .../access/builders/BuildBindingsForRole.java | 59 ++++++++ .../topology/api/mds/MDSApiClientBuilder.java | 1 + .../topology/model/Impl/ProjectImpl.java | 24 +++- .../kafka/topology/model/JulieRole.java | 26 ++++ .../kafka/topology/model/JulieRoleAcl.java | 59 ++++++++ .../kafka/topology/model/JulieRoles.java | 54 ++++++++ .../purbon/kafka/topology/model/Project.java | 5 + .../kafka/topology/model/users/Other.java | 93 +++++++++++++ .../roles/acls/AclsBindingsBuilder.java | 39 ++++++ .../roles/rbac/RBACBindingsBuilder.java | 52 +++++++ .../topology/serdes/JulieRolesSerdes.java | 34 +++++ .../serdes/TopologyCustomDeserializer.java | 49 +++++-- .../topology/AccessControlManagerTest.java | 70 ++++++++++ .../kafka/topology/ConfigurationTest.java | 41 ++++++ .../purbon/kafka/topology/JulieRolesTest.java | 73 ++++++++++ .../kafka/topology/TestTopologyBuilder.java | 17 +++ .../kafka/topology/TopologySerdesTest.java | 15 ++ 
.../integration/AccessControlManagerIT.java | 31 +++++ .../topology/integration/MDSBaseTest.java | 4 + .../integration/RBACPRoviderRbacIT.java | 49 +++++++ src/test/resources/descriptor.yaml | 6 + src/test/resources/roles-goodTest.yaml | 42 ++++++ src/test/resources/roles-rbac.yaml | 39 ++++++ src/test/resources/roles-wrong.yaml | 43 ++++++ src/test/resources/roles.yaml | 42 ++++++ 32 files changed, 1175 insertions(+), 13 deletions(-) create mode 100644 docs/futures/define-custom-roles.rst create mode 100644 example/roles.yml create mode 100644 src/main/java/com/purbon/kafka/topology/actions/access/builders/BuildBindingsForRole.java create mode 100644 src/main/java/com/purbon/kafka/topology/model/JulieRole.java create mode 100644 src/main/java/com/purbon/kafka/topology/model/JulieRoleAcl.java create mode 100644 src/main/java/com/purbon/kafka/topology/model/JulieRoles.java create mode 100644 src/main/java/com/purbon/kafka/topology/model/users/Other.java create mode 100644 src/main/java/com/purbon/kafka/topology/serdes/JulieRolesSerdes.java create mode 100644 src/test/java/com/purbon/kafka/topology/JulieRolesTest.java create mode 100644 src/test/resources/roles-goodTest.yaml create mode 100644 src/test/resources/roles-rbac.yaml create mode 100644 src/test/resources/roles-wrong.yaml create mode 100644 src/test/resources/roles.yaml diff --git a/docs/futures/define-custom-roles.rst b/docs/futures/define-custom-roles.rst new file mode 100644 index 000000000..57ce35b3d --- /dev/null +++ b/docs/futures/define-custom-roles.rst @@ -0,0 +1,128 @@ +Define custom roles for JulieOps +******************************* + +While JulieOps offers you as a user the possibility to manage the ACLs (and RBAC if you're using the Confluent Platform) for most common +applications deployments such as Consumers, Producers, Kafka Streams, Connectors and ksqlDB, it would be for some cases amazing to +be able to keep using the powerful abstractions of JulieOps but provide your own set of ACLs.
+ +For example: + +* If you are deploying a custom App and aim to give application specific roles +* Deploying applications that might not fit out of the box with generic permissions provided by JulieOps +* Or just if you are building your own roles based on Simple ACLs or Confluent RBAC + +and more. + +But, how can you get this with JulieOps? + +Defining your roles +------------------- + +First thing is to define your roles in a configuration file, this file should look like this: + +.. code-block:: YAML + + roles: + - name: "app" + acls: + - resourceType: "Topic" + resourceName: "{{topic}}" + patternType: "PREFIXED" + host: "*" + operation: "ALL" + permissionType: "ALLOW" + - resourceType: "Topic" + resourceName: "sourceTopic" + patternType: "LITERAL" + host: "*" + operation: "READ" + permissionType: "ALLOW" + - resourceType: "Topic" + resourceName: "targetTopic" + patternType: "LITERAL" + host: "*" + operation: "WRITE" + permissionType: "ALLOW" + - resourceType: "Group" + resourceName: "{{group}}" + patternType: "PREFIXED" + host: "*" + operation: "READ" + permissionType: "ALLOW" + +if you are using Confluent Platform RBAC functionality to define your own Access Control management, the only different property +per acl is **role**, so the file might look like this: + +..
 code-block:: YAML + + roles: + - name: "app" + acls: + - resourceType: "Topic" + resourceName: "{{topic}}" + patternType: "PREFIXED" + host: "*" + role: "ResourceOwner" + - resourceType: "Topic" + resourceName: "sourceTopic" + patternType: "LITERAL" + host: "*" + role: "DeveloperRead" + - resourceType: "Topic" + resourceName: "targetTopic" + patternType: "LITERAL" + host: "*" + role: "DeveloperWrite" + - resourceType: "Group" + resourceName: "{{group}}" + patternType: "PREFIXED" + host: "*" + role: "DeveloperRead" + - resourceType: "Subject" + resourceName: "Subject:foo" + patternType: "LITERAL" + host: "*" + role: "DeveloperRead" + - resourceType: "Connector" + resourceName: "Connector:con" + patternType: "LITERAL" + host: "*" + role: "SecurityAdmin" + - resourceType: "KsqlCluster" + resourceName: "KsqlCluster:ksql-cluster" + patternType: "LITERAL" + host: "*" + role: "ResourceOwner" + + +Plug this into JulieOps +----------------------- + +Once the roles are defined, the only thing you need to do is to configure your deployment to use it. This can be done using this +configuration variable in your property file: + + +.. code-block:: JAVA + + julie.roles=/path/to/the/roles/file + + +How would my new topology file look like +---------------------------------------- + +Once the new roles are set up, your topology can start using them just as the previous "hardcoded" roles. +Your topology file could look like this: + + +..
code-block:: YAML + + context: "contextOrg" + source: "source" + projects: + - name: "foo" + foo: + - principal: "User:banana" + group: "foo" + bar: + - principal: "User:bandana" + group: "bar" \ No newline at end of file diff --git a/example/roles.yml b/example/roles.yml new file mode 100644 index 000000000..7ef3b0a59 --- /dev/null +++ b/example/roles.yml @@ -0,0 +1,31 @@ +--- +roles: + rules: + - name: "security" + acls: + - resourceType: "Topic" + resourceName: "{{topic}}" + patternType: "" + host: "" + operation: "" + permissionType: "" + - resourceType: "Group" + resourceName: "{{group}}" + patternType: "" + host: "" + operation: "" + permissionType: "" + - name: "other" + acls: + - resourceType: "Topic" + resourceName: "" + patternType: "" + host: "" + operation: "" + permissionType: "" + - resourceType: "Group" + resourceName: "" + patternType: "" + host: "" + operation: "" + permissionType: "" \ No newline at end of file diff --git a/src/main/java/com/purbon/kafka/topology/AccessControlManager.java b/src/main/java/com/purbon/kafka/topology/AccessControlManager.java index 6bac9c8fc..0e9b8e951 100644 --- a/src/main/java/com/purbon/kafka/topology/AccessControlManager.java +++ b/src/main/java/com/purbon/kafka/topology/AccessControlManager.java @@ -9,6 +9,7 @@ import com.purbon.kafka.topology.actions.access.builders.rbac.*; import com.purbon.kafka.topology.model.Component; import com.purbon.kafka.topology.model.DynamicUser; +import com.purbon.kafka.topology.model.JulieRoles; import com.purbon.kafka.topology.model.Platform; import com.purbon.kafka.topology.model.Project; import com.purbon.kafka.topology.model.Topology; @@ -17,6 +18,7 @@ import com.purbon.kafka.topology.model.users.Consumer; import com.purbon.kafka.topology.model.users.KSqlApp; import com.purbon.kafka.topology.model.users.KStream; +import com.purbon.kafka.topology.model.users.Other; import com.purbon.kafka.topology.model.users.Producer; import com.purbon.kafka.topology.model.users.Schemas; import 
com.purbon.kafka.topology.model.users.platform.ControlCenterInstance; @@ -37,6 +39,7 @@ public class AccessControlManager { private static final Logger LOGGER = LogManager.getLogger(AccessControlManager.class); private final Configuration config; + private final JulieRoles julieRoles; private AccessControlProvider controlProvider; private BindingsBuilderProvider bindingsBuilder; private final List managedServiceAccountPrefixes; @@ -52,12 +55,21 @@ public AccessControlManager( AccessControlProvider controlProvider, BindingsBuilderProvider builderProvider, Configuration config) { + this(controlProvider, builderProvider, new JulieRoles(), config); + } + + public AccessControlManager( + AccessControlProvider controlProvider, + BindingsBuilderProvider builderProvider, + JulieRoles julieRoles, + Configuration config) { this.controlProvider = controlProvider; this.bindingsBuilder = builderProvider; this.config = config; this.managedServiceAccountPrefixes = config.getServiceAccountManagedPrefixes(); this.managedTopicPrefixes = config.getTopicManagedPrefixes(); this.managedGroupPrefixes = config.getGroupManagedPrefixes(); + this.julieRoles = julieRoles; } /** @@ -68,6 +80,7 @@ public AccessControlManager( * @param plan An Execution plan */ public void apply(final Topology topology, ExecutionPlan plan) throws IOException { + julieRoles.validateTopology(topology); List actions = buildProjectActions(topology); actions.addAll(buildPlatformLevelActions(topology)); buildUpdateBindingsActions(actions, loadActualClusterStateIfAvailable(plan)).forEach(plan::add); @@ -99,7 +112,7 @@ private Set providerBindings() { * @param topology A topology file * @return List A list of actions required based on the parameters */ - public List buildProjectActions(Topology topology) { + public List buildProjectActions(Topology topology) throws IOException { List actions = new ArrayList<>(); for (Project project : topology.getProjects()) { @@ -132,6 +145,16 @@ public List 
buildProjectActions(Topology topology) { } syncRbacRawRoles(project.getRbacRawRoles(), topicPrefix, actions); + + for (Map.Entry> other : project.getOthers().entrySet()) { + if (julieRoles.size() == 0) { + throw new IOException( + "Custom JulieRoles are being used without providing the required config file."); + } + actions.add( + new BuildBindingsForRole( + bindingsBuilder, julieRoles.get(other.getKey()), other.getValue())); + } } return actions; } diff --git a/src/main/java/com/purbon/kafka/topology/BindingsBuilderProvider.java b/src/main/java/com/purbon/kafka/topology/BindingsBuilderProvider.java index 97eb72767..69b17deb7 100644 --- a/src/main/java/com/purbon/kafka/topology/BindingsBuilderProvider.java +++ b/src/main/java/com/purbon/kafka/topology/BindingsBuilderProvider.java @@ -2,9 +2,11 @@ import com.purbon.kafka.topology.exceptions.ConfigurationException; import com.purbon.kafka.topology.model.Component; +import com.purbon.kafka.topology.model.JulieRoleAcl; import com.purbon.kafka.topology.model.users.Connector; import com.purbon.kafka.topology.model.users.Consumer; import com.purbon.kafka.topology.model.users.KSqlApp; +import com.purbon.kafka.topology.model.users.Other; import com.purbon.kafka.topology.model.users.Producer; import com.purbon.kafka.topology.model.users.platform.KsqlServerInstance; import com.purbon.kafka.topology.model.users.platform.SchemaRegistryInstance; @@ -56,4 +58,7 @@ default List setClusterLevelRole( Collection buildBindingsForKSqlServer(KsqlServerInstance ksqlServer); Collection buildBindingsForKSqlApp(KSqlApp app, String prefix); + + Collection buildBindingsForJulieRole( + Other other, String name, List acls) throws IOException; } diff --git a/src/main/java/com/purbon/kafka/topology/Configuration.java b/src/main/java/com/purbon/kafka/topology/Configuration.java index 10505de0c..5e8f44977 100644 --- a/src/main/java/com/purbon/kafka/topology/Configuration.java +++ b/src/main/java/com/purbon/kafka/topology/Configuration.java @@ -5,9 
+5,11 @@ import com.purbon.kafka.topology.api.ksql.KsqlClientConfig; import com.purbon.kafka.topology.exceptions.ConfigurationException; +import com.purbon.kafka.topology.model.JulieRoles; import com.purbon.kafka.topology.model.Project; import com.purbon.kafka.topology.model.Topic; import com.purbon.kafka.topology.model.Topology; +import com.purbon.kafka.topology.serdes.JulieRolesSerdes; import com.purbon.kafka.topology.serdes.TopologySerdes.FileType; import com.purbon.kafka.topology.utils.BasicAuth; import com.purbon.kafka.topology.utils.Pair; @@ -15,14 +17,20 @@ import com.typesafe.config.ConfigException; import com.typesafe.config.ConfigFactory; import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; import java.util.*; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class Configuration { + private static final Logger LOGGER = LogManager.getLogger(Configuration.class); + private final Map cliParams; private final Config config; @@ -361,6 +369,10 @@ public String getKafkaConnectClusterId() { return config.getString(MDS_KC_CLUSTER_ID_CONFIG); } + public String getKsqlDBClusterID() { + return config.getString(MDS_KSQLDB_CLUSTER_ID_CONFIG); + } + public Optional getSslTrustStoreLocation() { try { return Optional.of(config.getString(SSL_TRUSTSTORE_LOCATION)); @@ -477,4 +489,18 @@ public Optional getMdsBasicAuth() { } return Optional.ofNullable(auth); } + + public JulieRoles getJulieRoles() throws IOException { + JulieRolesSerdes serdes = new JulieRolesSerdes(); + try { + String path = config.getString(JULIE_ROLES); + return serdes.deserialise(Paths.get(path).toFile()); + } catch (ConfigException.Missing | ConfigException.WrongType ex) { + LOGGER.debug(ex); + return new JulieRoles(); + } catch (IOException e) { + LOGGER.error(e); + throw e; + 
} + } } diff --git a/src/main/java/com/purbon/kafka/topology/Constants.java b/src/main/java/com/purbon/kafka/topology/Constants.java index 5a01c1825..9d45cbc30 100644 --- a/src/main/java/com/purbon/kafka/topology/Constants.java +++ b/src/main/java/com/purbon/kafka/topology/Constants.java @@ -41,6 +41,7 @@ public class Constants { public static final String MDS_KAFKA_CLUSTER_ID_CONFIG = "topology.builder.mds.kafka.cluster.id"; static final String MDS_SR_CLUSTER_ID_CONFIG = "topology.builder.mds.schema.registry.cluster.id"; static final String MDS_KC_CLUSTER_ID_CONFIG = "topology.builder.mds.kafka.connect.cluster.id"; + static final String MDS_KSQLDB_CLUSTER_ID_CONFIG = "topology.builder.mds.ksqldb.cluster.id"; public static final String CONFLUENT_SCHEMA_REGISTRY_URL_CONFIG = "schema.registry.url"; static final String CONFLUENT_MONITORING_TOPIC_CONFIG = "confluent.monitoring.topic"; @@ -116,4 +117,6 @@ public class Constants { public static final String SSL_KEYSTORE_LOCATION = "ssl.keystore.location"; public static final String SSL_KEYSTORE_PASSWORD = "ssl.keystore.password"; public static final String SSL_KEY_PASSWORD = "ssl.key.password"; + + public static final String JULIE_ROLES = "julie.roles"; } diff --git a/src/main/java/com/purbon/kafka/topology/JulieOps.java b/src/main/java/com/purbon/kafka/topology/JulieOps.java index 6266f2143..5936060b7 100644 --- a/src/main/java/com/purbon/kafka/topology/JulieOps.java +++ b/src/main/java/com/purbon/kafka/topology/JulieOps.java @@ -131,7 +131,8 @@ public static JulieOps build( config.validateWith(topology); AccessControlManager accessControlManager = - new AccessControlManager(accessControlProvider, bindingsBuilderProvider, config); + new AccessControlManager( + accessControlProvider, bindingsBuilderProvider, config.getJulieRoles(), config); RestService restService = new RestService(config.getConfluentSchemaRegistryUrl()); Map schemaRegistryConfig = config.asMap(); diff --git 
a/src/main/java/com/purbon/kafka/topology/actions/access/builders/BuildBindingsForRole.java b/src/main/java/com/purbon/kafka/topology/actions/access/builders/BuildBindingsForRole.java new file mode 100644 index 000000000..154672294 --- /dev/null +++ b/src/main/java/com/purbon/kafka/topology/actions/access/builders/BuildBindingsForRole.java @@ -0,0 +1,59 @@ +package com.purbon.kafka.topology.actions.access.builders; + +import com.purbon.kafka.topology.BindingsBuilderProvider; +import com.purbon.kafka.topology.actions.BaseAccessControlAction; +import com.purbon.kafka.topology.model.JulieRole; +import com.purbon.kafka.topology.model.JulieRoleAcl; +import com.purbon.kafka.topology.model.users.Other; +import com.purbon.kafka.topology.utils.JinjaUtils; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class BuildBindingsForRole extends BaseAccessControlAction { + + private final BindingsBuilderProvider bindingsBuilder; + private final JulieRole julieRole; + private final List values; + + public BuildBindingsForRole( + BindingsBuilderProvider bindingsBuilder, JulieRole julieRole, List values) { + this.bindingsBuilder = bindingsBuilder; + this.julieRole = julieRole; + this.values = values; + } + + @Override + protected void execute() throws IOException { + for (Other other : values) { + var acls = + julieRole.getAcls().stream() + .map( + acl -> { + String resourceName = + JinjaUtils.serialise(acl.getResourceName(), other.asMap()); + return new JulieRoleAcl( + acl.getResourceType(), + resourceName, + acl.getPatternType(), + acl.getHost(), + acl.getOperation(), + acl.getPermissionType()); + }) + .collect(Collectors.toList()); + var instanceBindings = + bindingsBuilder.buildBindingsForJulieRole(other, julieRole.getName(), acls); + bindings.addAll(instanceBindings); + } + } + + @Override + protected Map props() { + Map map = new HashMap<>(); + map.put("Operation", 
getClass().getName()); + map.put("Role", julieRole.getName()); + return map; + } +} diff --git a/src/main/java/com/purbon/kafka/topology/api/mds/MDSApiClientBuilder.java b/src/main/java/com/purbon/kafka/topology/api/mds/MDSApiClientBuilder.java index e4c74b2eb..5464c98b6 100644 --- a/src/main/java/com/purbon/kafka/topology/api/mds/MDSApiClientBuilder.java +++ b/src/main/java/com/purbon/kafka/topology/api/mds/MDSApiClientBuilder.java @@ -23,6 +23,7 @@ public MDSApiClient build() { apiClient.setKafkaClusterId(config.getKafkaClusterId()); apiClient.setSchemaRegistryClusterID(config.getSchemaRegistryClusterId()); apiClient.setConnectClusterID(config.getKafkaConnectClusterId()); + apiClient.setKSqlClusterID(config.getKsqlDBClusterID()); LOGGER.debug(String.format("Connecting to an MDS server at %s", mdsServer)); return apiClient; diff --git a/src/main/java/com/purbon/kafka/topology/model/Impl/ProjectImpl.java b/src/main/java/com/purbon/kafka/topology/model/Impl/ProjectImpl.java index 205083bb5..62ab33f7a 100644 --- a/src/main/java/com/purbon/kafka/topology/model/Impl/ProjectImpl.java +++ b/src/main/java/com/purbon/kafka/topology/model/Impl/ProjectImpl.java @@ -11,10 +11,12 @@ import com.purbon.kafka.topology.model.users.Consumer; import com.purbon.kafka.topology.model.users.KSqlApp; import com.purbon.kafka.topology.model.users.KStream; +import com.purbon.kafka.topology.model.users.Other; import com.purbon.kafka.topology.model.users.Producer; import com.purbon.kafka.topology.model.users.Schemas; import com.purbon.kafka.topology.utils.JinjaUtils; import java.util.*; +import java.util.stream.Collectors; public class ProjectImpl implements Project, Cloneable { @@ -30,6 +32,7 @@ public class ProjectImpl implements Project, Cloneable { private PlatformSystem connectors; private PlatformSystem schemas; private Map> rbacRawRoles; + private List>> others; private List topics; @@ -53,7 +56,8 @@ public ProjectImpl(String name, Configuration config) { Optional.empty(), 
Optional.empty(), Optional.empty(), - new HashMap<>(), + Collections.emptyMap(), + Collections.emptyList(), config); } @@ -66,6 +70,7 @@ public ProjectImpl( Optional> schemas, Optional> ksqls, Map> rbacRawRoles, + List>> others, Configuration config) { this( name, @@ -78,6 +83,7 @@ public ProjectImpl( schemas.orElse(new PlatformSystem<>()), ksqls.orElse(new PlatformSystem<>()), rbacRawRoles, + others, config); } @@ -92,6 +98,7 @@ public ProjectImpl( PlatformSystem schemas, PlatformSystem ksqls, Map> rbacRawRoles, + List>> others, Configuration config) { this.name = name; this.topics = topics; @@ -103,6 +110,7 @@ public ProjectImpl( this.connectors = connectors; this.schemas = schemas; this.rbacRawRoles = rbacRawRoles; + this.others = others; this.config = config; this.prefixContext = new HashMap<>(); this.order = new ArrayList<>(); @@ -164,6 +172,19 @@ public void setConnectors(List connectors) { this.connectors = new PlatformSystem<>(connectors); } + public Map> getOthers() { + return this.others.stream() + .map(e -> Map.entry(e.getKey(), e.getValue().getAccessControlLists())) + .collect(Collectors.toMap(km -> km.getKey(), vm -> vm.getValue())); + } + + public void setOthers(Map> others) { + this.others = + others.entrySet().stream() + .map(entry -> Map.entry(entry.getKey(), new PlatformSystem<>(entry.getValue()))) + .collect(Collectors.toList()); + } + public List getTopics() { return topics; } @@ -237,6 +258,7 @@ public ProjectImpl clone() { new PlatformSystem<>(getSchemas()), new PlatformSystem<>(getKSqls()), getRbacRawRoles(), + others, config); project.setPrefixContextAndOrder(prefixContext, order); return project; diff --git a/src/main/java/com/purbon/kafka/topology/model/JulieRole.java b/src/main/java/com/purbon/kafka/topology/model/JulieRole.java new file mode 100644 index 000000000..73c20d498 --- /dev/null +++ b/src/main/java/com/purbon/kafka/topology/model/JulieRole.java @@ -0,0 +1,26 @@ +package com.purbon.kafka.topology.model; + +import 
com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; + +public class JulieRole { + + private String name; + private List acls; + + @JsonCreator + public JulieRole( + @JsonProperty("name") String name, @JsonProperty("acls") List acls) { + this.name = name; + this.acls = acls; + } + + public String getName() { + return name; + } + + public List getAcls() { + return acls; + } +} diff --git a/src/main/java/com/purbon/kafka/topology/model/JulieRoleAcl.java b/src/main/java/com/purbon/kafka/topology/model/JulieRoleAcl.java new file mode 100644 index 000000000..0faac770e --- /dev/null +++ b/src/main/java/com/purbon/kafka/topology/model/JulieRoleAcl.java @@ -0,0 +1,59 @@ +package com.purbon.kafka.topology.model; + +import com.fasterxml.jackson.annotation.JsonAlias; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class JulieRoleAcl { + + private String resourceType; + private String resourceName; + private String patternType; + private String host; + private String operation; + private String permissionType; + + @JsonCreator + public JulieRoleAcl( + @JsonProperty("resourceType") String resourceType, + @JsonProperty("resourceName") String resourceName, + @JsonProperty("patternType") String patternType, + @JsonProperty("host") String host, + @JsonProperty("operation") @JsonAlias("role") String operation, + @JsonProperty("permissionType") String permissionType) { + this.resourceType = resourceType; + this.resourceName = resourceName; + this.patternType = patternType; + this.host = host; + this.operation = operation; + this.permissionType = permissionType; + } + + public String getResourceType() { + return resourceType; + } + + public String getResourceName() { + return resourceName; + } + + public String getPatternType() { + return patternType; + } + + public String getHost() { + return host; + } + + public String getOperation() { + 
return operation; + } + + public String getPermissionType() { + return permissionType; + } + + public String getRole() { + return getOperation(); + } +} diff --git a/src/main/java/com/purbon/kafka/topology/model/JulieRoles.java b/src/main/java/com/purbon/kafka/topology/model/JulieRoles.java new file mode 100644 index 000000000..a7f9946e5 --- /dev/null +++ b/src/main/java/com/purbon/kafka/topology/model/JulieRoles.java @@ -0,0 +1,54 @@ +package com.purbon.kafka.topology.model; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.purbon.kafka.topology.model.users.Other; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class JulieRoles { + + private Map roles; + + public JulieRoles() { + this.roles = Collections.emptyMap(); + } + + @JsonCreator + public JulieRoles(@JsonProperty("roles") List roles) { + this.roles = roles.stream().collect(Collectors.toMap(JulieRole::getName, e -> e)); + } + + public List getRoles() { + return new ArrayList<>(roles.values()); + } + + public JulieRole get(String key) { + return roles.get(key); + } + + public void validateTopology(Topology topology) throws IOException { + if (roles.isEmpty()) { + return; + } + for (Project project : topology.getProjects()) { + for (Map.Entry> other : project.getOthers().entrySet()) { + if (!roles.containsKey(other.getKey())) { + throw new IOException( + "trying to deploy role: " + + other.getKey() + + " not available in the configured roles: " + + roles.keySet().stream().collect(Collectors.joining(", "))); + } + } + } + } + + public int size() { + return roles.size(); + } +} diff --git a/src/main/java/com/purbon/kafka/topology/model/Project.java b/src/main/java/com/purbon/kafka/topology/model/Project.java index b2202b83e..f48bc06b2 100644 --- a/src/main/java/com/purbon/kafka/topology/model/Project.java 
+++ b/src/main/java/com/purbon/kafka/topology/model/Project.java @@ -8,6 +8,7 @@ import com.purbon.kafka.topology.model.users.Consumer; import com.purbon.kafka.topology.model.users.KSqlApp; import com.purbon.kafka.topology.model.users.KStream; +import com.purbon.kafka.topology.model.users.Other; import com.purbon.kafka.topology.model.users.Producer; import com.purbon.kafka.topology.model.users.Schemas; import java.util.List; @@ -36,6 +37,10 @@ public interface Project { void setKSqls(List ksqls); + Map> getOthers(); + + void setOthers(Map> others); + List getConnectors(); KConnectArtefacts getConnectorArtefacts(); diff --git a/src/main/java/com/purbon/kafka/topology/model/users/Other.java b/src/main/java/com/purbon/kafka/topology/model/users/Other.java new file mode 100644 index 000000000..ac46a393a --- /dev/null +++ b/src/main/java/com/purbon/kafka/topology/model/users/Other.java @@ -0,0 +1,93 @@ +package com.purbon.kafka.topology.model.users; + +import com.purbon.kafka.topology.model.User; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +public class Other extends User { + + private Optional transactionId; + private Optional idempotence; + private Optional group; + private Optional topic; + + public Other() { + super(); + topic = Optional.empty(); + group = Optional.empty(); + transactionId = Optional.empty(); + idempotence = Optional.empty(); + } + + public Map asMap() { + Map map = new HashMap<>(); + map.put("topic", topicString()); + map.put("group", groupString()); + if (transactionId.isPresent()) { + map.put("transactionId", transactionId.get()); + } + return map; + } + + public String groupString() { + return group.orElse("*"); + } + + public Optional getGroup() { + return group; + } + + public void setGroup(Optional group) { + this.group = group; + } + + public String topicString() { + return topic.orElse(""); + } + + public Optional getTopic() { + return topic; + } + + public void 
setTopic(Optional topic) { + this.topic = topic; + } + + public Optional getTransactionId() { + return transactionId; + } + + public void setTransactionId(Optional transactionId) { + this.transactionId = transactionId; + } + + public Optional getIdempotence() { + return idempotence; + } + + public void setIdempotence(Optional idempotence) { + this.idempotence = idempotence; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof Other)) { + return false; + } + Other other = (Other) o; + return getPrincipal().equals(other.getPrincipal()) + && groupString().equals(other.groupString()) + && getTransactionId().equals(other.getTransactionId()) + && getIdempotence().equals(other.getIdempotence()); + } + + @Override + public int hashCode() { + return Objects.hash(groupString(), getPrincipal(), getTransactionId(), getIdempotence()); + } +} diff --git a/src/main/java/com/purbon/kafka/topology/roles/acls/AclsBindingsBuilder.java b/src/main/java/com/purbon/kafka/topology/roles/acls/AclsBindingsBuilder.java index 6eb871f64..87a2f5f85 100644 --- a/src/main/java/com/purbon/kafka/topology/roles/acls/AclsBindingsBuilder.java +++ b/src/main/java/com/purbon/kafka/topology/roles/acls/AclsBindingsBuilder.java @@ -6,9 +6,11 @@ import com.purbon.kafka.topology.Configuration; import com.purbon.kafka.topology.api.adminclient.AclBuilder; import com.purbon.kafka.topology.api.ccloud.CCloudCLI; +import com.purbon.kafka.topology.model.JulieRoleAcl; import com.purbon.kafka.topology.model.users.Connector; import com.purbon.kafka.topology.model.users.Consumer; import com.purbon.kafka.topology.model.users.KSqlApp; +import com.purbon.kafka.topology.model.users.Other; import com.purbon.kafka.topology.model.users.Producer; import com.purbon.kafka.topology.model.users.platform.KsqlServerInstance; import com.purbon.kafka.topology.model.users.platform.SchemaRegistryInstance; @@ -133,6 +135,43 @@ public Collection buildBindingsForKSqlApp(KSqlApp 
app, Strin return toList(ksqlAppStream(app, prefix)); } + @Override + public Collection buildBindingsForJulieRole( + Other other, String name, List acls) throws IOException { + + List bindings = new ArrayList<>(); + for (JulieRoleAcl acl : acls) { + var resourceType = ResourceType.fromString(acl.getResourceType()); + var patternType = PatternType.fromString(acl.getPatternType()); + var aclOperation = AclOperation.fromString(acl.getOperation()); + if (resourceType.isUnknown() || patternType.isUnknown() || aclOperation.isUnknown()) { + throw new IOException( + "Unknown ACL setting being used resourceType=" + + acl.getResourceType() + + " (" + + resourceType + + ")" + + ", patternType=" + + acl.getPatternType() + + " (" + + patternType + + ")" + + ", aclOperation=" + + acl.getOperation() + + " (" + + aclOperation + + ")"); + } + var binding = + new AclBuilder(other.getPrincipal()) + .addResource(resourceType, acl.getResourceName(), patternType) + .addControlEntry(acl.getHost(), aclOperation, AclPermissionType.ALLOW) + .build(); + bindings.add(new TopologyAclBinding(binding)); + } + return bindings; + } + private List toList(Stream bindingStream) { return bindingStream.map(TopologyAclBinding::new).collect(Collectors.toList()); } diff --git a/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java b/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java index 515d211a3..9ce0dae59 100644 --- a/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java +++ b/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java @@ -1,5 +1,6 @@ package com.purbon.kafka.topology.roles.rbac; +import static com.purbon.kafka.topology.api.mds.ClusterIDs.KSQL_CLUSTER_ID_LABEL; import static com.purbon.kafka.topology.roles.rbac.RBACPredefinedRoles.DEVELOPER_READ; import static com.purbon.kafka.topology.roles.rbac.RBACPredefinedRoles.DEVELOPER_WRITE; import static 
com.purbon.kafka.topology.roles.rbac.RBACPredefinedRoles.RESOURCE_OWNER; @@ -9,9 +10,11 @@ import com.purbon.kafka.topology.BindingsBuilderProvider; import com.purbon.kafka.topology.api.mds.MDSApiClient; import com.purbon.kafka.topology.model.Component; +import com.purbon.kafka.topology.model.JulieRoleAcl; import com.purbon.kafka.topology.model.users.Connector; import com.purbon.kafka.topology.model.users.Consumer; import com.purbon.kafka.topology.model.users.KSqlApp; +import com.purbon.kafka.topology.model.users.Other; import com.purbon.kafka.topology.model.users.Producer; import com.purbon.kafka.topology.model.users.platform.KsqlServerInstance; import com.purbon.kafka.topology.model.users.platform.SchemaRegistryInstance; @@ -342,6 +345,55 @@ public Collection buildBindingsForKSqlApp(KSqlApp app, Strin return bindings; } + @Override + public Collection buildBindingsForJulieRole( + Other other, String name, List acls) { + + var stream = acls.stream().map(acl -> julieRoleToBinding(other, acl)); + + return stream.collect(Collectors.toList()); + } + + private TopologyAclBinding julieRoleToBinding(Other other, JulieRoleAcl acl) { + + String resourceType = acl.getResourceType(); + + if (resourceType.equalsIgnoreCase("Subject")) { + String subjectName = acl.getResourceName().replaceFirst("Subject:", "").trim(); + return apiClient + .bind(other.getPrincipal(), acl.getRole()) + .forSchemaSubject(subjectName) + .apply("Subject", subjectName); + } else if (resourceType.equalsIgnoreCase("Connector")) { + String connectorName = acl.getResourceName().replaceFirst("Connector:", "").trim(); + return apiClient + .bind(other.getPrincipal(), acl.getRole()) + .forAKafkaConnector(connectorName) + .apply(acl.getResourceType(), connectorName); + } else if (resourceType.equalsIgnoreCase("KsqlCluster")) { + var clusterIds = apiClient.withClusterIDs().forKsql().asMap(); + var clusterId = clusterIds.get("clusters").get(KSQL_CLUSTER_ID_LABEL); + String resourceName = 
acl.getResourceName().replaceFirst("KsqlCluster:", "").trim(); + return apiClient + .bind(other.getPrincipal(), acl.getRole()) + .forKSqlServer(clusterId) + .apply(acl.getResourceType(), resourceName); + } + + String resourceName = acl.getResourceName(); + if (resourceName.contains(":")) { + var pos = resourceName.indexOf(":"); + resourceName = resourceName.substring(pos + 1); + } + + return apiClient.bind( + other.getPrincipal(), + acl.getRole(), + resourceName, + acl.getResourceType(), + acl.getPatternType()); + } + @Override public List setClusterLevelRole( String role, String principal, Component component) throws IOException { diff --git a/src/main/java/com/purbon/kafka/topology/serdes/JulieRolesSerdes.java b/src/main/java/com/purbon/kafka/topology/serdes/JulieRolesSerdes.java new file mode 100644 index 000000000..d803b5fcb --- /dev/null +++ b/src/main/java/com/purbon/kafka/topology/serdes/JulieRolesSerdes.java @@ -0,0 +1,34 @@ +package com.purbon.kafka.topology.serdes; + +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import com.purbon.kafka.topology.model.JulieRoles; +import com.purbon.kafka.topology.model.PlanMap; +import java.io.File; +import java.io.IOException; + +public class JulieRolesSerdes { + + ObjectMapper mapper; + + public JulieRolesSerdes() { + mapper = new ObjectMapper(new YAMLFactory()); + mapper.setVisibility(PropertyAccessor.FIELD, Visibility.ANY); + mapper.registerModule(new SimpleModule()); + mapper.registerModule(new Jdk8Module()); + mapper.findAndRegisterModules(); + } + + public JulieRoles deserialise(File file) throws IOException { + return mapper.readValue(file, 
JulieRoles.class); + } + + public String serialise(PlanMap planMap) throws JsonProcessingException { + return mapper.writeValueAsString(planMap); + } +} diff --git a/src/main/java/com/purbon/kafka/topology/serdes/TopologyCustomDeserializer.java b/src/main/java/com/purbon/kafka/topology/serdes/TopologyCustomDeserializer.java index 9074125fe..2647b466a 100644 --- a/src/main/java/com/purbon/kafka/topology/serdes/TopologyCustomDeserializer.java +++ b/src/main/java/com/purbon/kafka/topology/serdes/TopologyCustomDeserializer.java @@ -25,6 +25,7 @@ import com.purbon.kafka.topology.model.users.Consumer; import com.purbon.kafka.topology.model.users.KSqlApp; import com.purbon.kafka.topology.model.users.KStream; +import com.purbon.kafka.topology.model.users.Other; import com.purbon.kafka.topology.model.users.Producer; import com.purbon.kafka.topology.model.users.Schemas; import com.purbon.kafka.topology.model.users.platform.ControlCenter; @@ -37,6 +38,7 @@ import java.util.*; import java.util.function.Function; import java.util.regex.Pattern; +import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; import org.apache.logging.log4j.LogManager; @@ -72,6 +74,17 @@ public class TopologyCustomDeserializer extends StdDeserializer { private static final String STREAMS_NODE = "streams"; private static final String TABLES_NODE = "tables"; + private static List projectCoreKeys = + Arrays.asList( + NAME_KEY, + CONSUMERS_KEY, + PRODUCERS_KEY, + CONNECTORS_KEY, + STREAMS_KEY, + SCHEMAS_KEY, + KSQL_KEY, + RBAC_KEY); + private final Configuration config; TopologyCustomDeserializer(Configuration config) { @@ -163,23 +176,18 @@ private Project parseProject( JsonParser parser, JsonNode rootNode, Topology topology, Configuration config) throws IOException { + Iterable it = () -> rootNode.fieldNames(); List keys = - Arrays.asList( - CONSUMERS_KEY, - PROJECTS_KEY, - PRODUCERS_KEY, - CONNECTORS_KEY, - STREAMS_KEY, - SCHEMAS_KEY, - KSQL_KEY); - + 
StreamSupport.stream(it.spliterator(), false) + .filter(key -> !Arrays.asList(TOPICS_KEY, NAME_KEY).contains(key)) + .collect(Collectors.toList()); Map rootNodes = Maps.asMap(new HashSet<>(keys), (key) -> rootNode.get(key)); Map mapOfValues = new HashMap<>(); for (String key : rootNodes.keySet()) { JsonNode keyNode = rootNodes.get(key); if (keyNode != null) { - Optional optionalPlatformSystem = Optional.empty(); + Optional optionalPlatformSystem; switch (key) { case CONSUMERS_KEY: optionalPlatformSystem = doConsumerElements(parser, keyNode); @@ -199,6 +207,11 @@ private Project parseProject( case KSQL_KEY: optionalPlatformSystem = doKSqlElements(parser, keyNode); break; + default: + optionalPlatformSystem = Optional.empty(); + if (!key.equalsIgnoreCase(RBAC_KEY)) { + optionalPlatformSystem = doOtherElements(parser, keyNode); + } } optionalPlatformSystem.ifPresent(ps -> mapOfValues.put(key, ps)); } @@ -214,6 +227,7 @@ private Project parseProject( Optional.ofNullable(mapOfValues.get(SCHEMAS_KEY)), Optional.ofNullable(mapOfValues.get(KSQL_KEY)), parseOptionalRbacRoles(rootNode.get(RBAC_KEY)), + filterOthers(mapOfValues), config); project.setPrefixContextAndOrder(topology.asFullContext(), topology.getOrder()); @@ -233,6 +247,21 @@ private Project parseProject( return project; } + private List>> filterOthers( + Map mapOfValues) { + return mapOfValues.entrySet().stream() + .filter(entry -> !projectCoreKeys.contains(entry.getKey())) + .map(entry -> Map.entry(entry.getKey(), (PlatformSystem) entry.getValue())) + .collect(Collectors.toList()); + } + + private Optional doOtherElements(JsonParser parser, JsonNode node) + throws JsonProcessingException { + List others = + new JsonSerdesUtils().parseApplicationUser(parser, node, Other.class); + return Optional.of(new PlatformSystem(others)); + } + private Optional doConsumerElements(JsonParser parser, JsonNode node) throws JsonProcessingException { List consumers = diff --git 
a/src/test/java/com/purbon/kafka/topology/AccessControlManagerTest.java b/src/test/java/com/purbon/kafka/topology/AccessControlManagerTest.java index a102c8ac9..e96aa6026 100644 --- a/src/test/java/com/purbon/kafka/topology/AccessControlManagerTest.java +++ b/src/test/java/com/purbon/kafka/topology/AccessControlManagerTest.java @@ -739,4 +739,74 @@ public void testToProcessWildcardGroupOnlySelectedServiceAccounts() throws IOExc && b.getPrincipal().equals("User:NamespaceB_app2")) .count()); } + + @Test + public void testJulieRoleAclCreation() throws IOException { + Topic topicA = new TopicImpl("topicA"); + Topology topology = + TestTopologyBuilder.createProject() + .addTopic(topicA) + .addConsumer("User:app1") + .addOther("app", "User:app1", "foo") + .buildTopology(); + + Map cliOps = new HashMap<>(); + cliOps.put(BROKERS_OPTION, ""); + + Properties props = new Properties(); + props.put(JULIE_ROLES, TestUtils.getResourceFilename("/roles.yaml")); + + Configuration config = new Configuration(cliOps, props); + + accessControlManager = + new AccessControlManager( + aclsProvider, new AclsBindingsBuilder(config), config.getJulieRoles(), config); + + accessControlManager.apply(topology, plan); + + assertEquals( + 1, + plan.getActions().get(0).getBindings().stream() + .filter( + b -> + b.getResourceType().equals(ResourceType.TOPIC.name()) + && b.getResourceName().equals("foo") + && b.getPrincipal().equals("User:app1")) + .count()); + + assertEquals( + 1, + plan.getActions().get(0).getBindings().stream() + .filter( + b -> + b.getResourceType().equals(ResourceType.TOPIC.name()) + && b.getResourceName().equals("sourceTopic") + && b.getPrincipal().equals("User:app1")) + .count()); + } + + @Test(expected = IOException.class) + public void testWrongJulieRoleAclCreation() throws IOException { + Topic topicA = new TopicImpl("topicA"); + Topology topology = + TestTopologyBuilder.createProject() + .addTopic(topicA) + .addConsumer("User:app1") + .addOther("app", "User:app1", "foo") + 
.buildTopology(); + + Map cliOps = new HashMap<>(); + cliOps.put(BROKERS_OPTION, ""); + + Properties props = new Properties(); + props.put(JULIE_ROLES, TestUtils.getResourceFilename("/roles-wrong.yaml")); + + Configuration config = new Configuration(cliOps, props); + + accessControlManager = + new AccessControlManager( + aclsProvider, new AclsBindingsBuilder(config), config.getJulieRoles(), config); + + accessControlManager.apply(topology, plan); + } } diff --git a/src/test/java/com/purbon/kafka/topology/ConfigurationTest.java b/src/test/java/com/purbon/kafka/topology/ConfigurationTest.java index e26ccce48..3182e893c 100644 --- a/src/test/java/com/purbon/kafka/topology/ConfigurationTest.java +++ b/src/test/java/com/purbon/kafka/topology/ConfigurationTest.java @@ -11,12 +11,16 @@ import com.purbon.kafka.topology.model.Impl.ProjectImpl; import com.purbon.kafka.topology.model.Impl.TopicImpl; import com.purbon.kafka.topology.model.Impl.TopologyImpl; +import com.purbon.kafka.topology.model.JulieRole; +import com.purbon.kafka.topology.model.JulieRoleAcl; import com.purbon.kafka.topology.model.Project; import com.purbon.kafka.topology.model.Topic; import com.purbon.kafka.topology.model.Topology; import com.purbon.kafka.topology.model.schema.TopicSchemas; import com.purbon.kafka.topology.utils.TestUtils; +import java.io.IOException; import java.util.*; +import java.util.stream.Collectors; import org.junit.Before; import org.junit.Test; @@ -206,4 +210,41 @@ public void testKsqlServerWithoutScheme() { .hasMessageContaining("example.com:8083") .isInstanceOf(IllegalArgumentException.class); } + + @Test + public void testJulieRolesFetch() throws IOException { + + String rolesFile = TestUtils.getResourceFilename("/roles.yaml"); + props.put(JULIE_ROLES, rolesFile); + Configuration config = new Configuration(cliOps, props); + + var roles = config.getJulieRoles(); + assertThat(roles).isNotNull(); + assertThat(roles.getRoles()).hasSize(2); + for (JulieRole role : roles.getRoles()) { 
+ assertThat(role.getName()).isIn("app", "other"); + } + + JulieRole role = roles.get("app"); + List resources = + role.getAcls().stream().map(JulieRoleAcl::getResourceType).collect(Collectors.toList()); + assertThat(resources).contains("Topic", "Group"); + assertThat(role.getName()).isEqualTo("app"); + assertThat(role.getAcls()).hasSize(4); + + role = roles.get("other"); + resources = + role.getAcls().stream().map(JulieRoleAcl::getResourceType).collect(Collectors.toList()); + assertThat(resources).contains("Topic"); + assertThat(role.getName()).isEqualTo("other"); + assertThat(role.getAcls()).hasSize(2); + } + + @Test(expected = IOException.class) + public void testWrongFileJulieRoles() throws IOException { + String rolesFile = TestUtils.getResourceFilename("/descriptor.yaml"); + props.put(JULIE_ROLES, rolesFile); + Configuration config = new Configuration(cliOps, props); + config.getJulieRoles(); + } } diff --git a/src/test/java/com/purbon/kafka/topology/JulieRolesTest.java b/src/test/java/com/purbon/kafka/topology/JulieRolesTest.java new file mode 100644 index 000000000..316ea000c --- /dev/null +++ b/src/test/java/com/purbon/kafka/topology/JulieRolesTest.java @@ -0,0 +1,73 @@ +package com.purbon.kafka.topology; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.purbon.kafka.topology.model.JulieRole; +import com.purbon.kafka.topology.model.JulieRoleAcl; +import com.purbon.kafka.topology.model.JulieRoles; +import com.purbon.kafka.topology.model.Topology; +import com.purbon.kafka.topology.serdes.JulieRolesSerdes; +import com.purbon.kafka.topology.serdes.TopologySerdes; +import com.purbon.kafka.topology.utils.TestUtils; +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class JulieRolesTest { + + JulieRolesSerdes parser; + + @Before + public void before() { + this.parser = new JulieRolesSerdes(); + } + + @After + public 
void after() {} + + @Test + public void testSerdes() throws IOException { + JulieRoles roles = parser.deserialise(TestUtils.getResourceFile("/roles.yaml")); + + assertThat(roles.getRoles()).hasSize(2); + for (JulieRole role : roles.getRoles()) { + assertThat(role.getName()).isIn("app", "other"); + } + + JulieRole role = roles.get("app"); + List resources = + role.getAcls().stream().map(JulieRoleAcl::getResourceType).collect(Collectors.toList()); + assertThat(resources).contains("Topic", "Group"); + assertThat(role.getName()).isEqualTo("app"); + assertThat(role.getAcls()).hasSize(4); + assertThat(role.getAcls().get(0).getRole()).isEqualTo("ALL"); + + role = roles.get("other"); + resources = + role.getAcls().stream().map(JulieRoleAcl::getResourceType).collect(Collectors.toList()); + assertThat(resources).contains("Topic"); + assertThat(role.getName()).isEqualTo("other"); + assertThat(role.getAcls()).hasSize(2); + } + + @Test(expected = IOException.class) + public void testTopologyValidationException() throws IOException { + JulieRoles roles = parser.deserialise(TestUtils.getResourceFile("/roles.yaml")); + TopologySerdes topologySerdes = new TopologySerdes(); + + Topology topology = topologySerdes.deserialise(TestUtils.getResourceFile("/descriptor.yaml")); + roles.validateTopology(topology); + } + + @Test + public void testTopologyValidationCorrect() throws IOException { + JulieRoles roles = parser.deserialise(TestUtils.getResourceFile("/roles-goodTest.yaml")); + TopologySerdes topologySerdes = new TopologySerdes(); + + Topology topology = topologySerdes.deserialise(TestUtils.getResourceFile("/descriptor.yaml")); + roles.validateTopology(topology); + } +} diff --git a/src/test/java/com/purbon/kafka/topology/TestTopologyBuilder.java b/src/test/java/com/purbon/kafka/topology/TestTopologyBuilder.java index cec276edc..5c0e1aa35 100644 --- a/src/test/java/com/purbon/kafka/topology/TestTopologyBuilder.java +++ 
b/src/test/java/com/purbon/kafka/topology/TestTopologyBuilder.java @@ -7,9 +7,15 @@ import com.purbon.kafka.topology.model.Topic; import com.purbon.kafka.topology.model.Topology; import com.purbon.kafka.topology.model.users.Consumer; +import com.purbon.kafka.topology.model.users.Other; import com.purbon.kafka.topology.model.users.Producer; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -20,6 +26,7 @@ public class TestTopologyBuilder { private Set topics = new HashSet<>(); private final Set consumers = new HashSet<>(); private final Set producers = new HashSet<>(); + private final Collection>> others = new ArrayList<>(); public TestTopologyBuilder() { this(new Configuration(), "ctx", "project"); @@ -49,6 +56,8 @@ public Topology buildTopology() { project.setTopics(new ArrayList<>(topics)); project.setConsumers(new ArrayList<>(consumers)); project.setProducers(new ArrayList<>(producers)); + var map = others.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + project.setOthers(map); return topology; } @@ -82,6 +91,14 @@ public TestTopologyBuilder addConsumer(String user, String group) { return this; } + public TestTopologyBuilder addOther(String roleName, String principal, String topic) { + Other other = new Other(); + other.setPrincipal(principal); + other.setTopic(Optional.of(topic)); + others.add(Map.entry(roleName, Collections.singletonList(other))); + return this; + } + @SuppressWarnings("UnusedReturnValue") public TestTopologyBuilder removeConsumer(String user) { consumers.remove(new Consumer(user)); diff --git a/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java b/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java index cff5b4798..a74ac892f 100644 --- 
a/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java +++ b/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java @@ -260,6 +260,21 @@ public void testSchemaSerdes() { assertThat(topicCat.get().getSubjectNameStrategy()).isEqualTo(TOPIC_RECORD_NAME_STRATEGY); } + @Test + public void testOtherSerdes() { + Topology topology = parser.deserialise(TestUtils.getResourceFile("/descriptor.yaml")); + Project project = topology.getProjects().get(0); + + var others = project.getOthers(); + assertThat(others).hasSize(2); + var foos = others.get("foo"); + assertThat(foos.get(0).getPrincipal()).isEqualTo("User:banana"); + assertThat(foos.get(0).groupString()).isEqualTo("foo"); + var bars = others.get("bar"); + assertThat(bars.get(0).getPrincipal()).isEqualTo("User:banana"); + assertThat(bars.get(0).groupString()).isEqualTo("bar"); + } + @Test public void testKsqlSerdes() { Topology topology = parser.deserialise(TestUtils.getResourceFile("/descriptor.yaml")); diff --git a/src/test/java/com/purbon/kafka/topology/integration/AccessControlManagerIT.java b/src/test/java/com/purbon/kafka/topology/integration/AccessControlManagerIT.java index 4726a21b5..69e05dfd2 100644 --- a/src/test/java/com/purbon/kafka/topology/integration/AccessControlManagerIT.java +++ b/src/test/java/com/purbon/kafka/topology/integration/AccessControlManagerIT.java @@ -11,6 +11,7 @@ import com.purbon.kafka.topology.BackendController; import com.purbon.kafka.topology.Configuration; import com.purbon.kafka.topology.ExecutionPlan; +import com.purbon.kafka.topology.TestTopologyBuilder; import com.purbon.kafka.topology.api.adminclient.AclBuilder; import com.purbon.kafka.topology.api.adminclient.TopologyBuilderAdminClient; import com.purbon.kafka.topology.integration.containerutils.ContainerFactory; @@ -450,6 +451,36 @@ public void connectAclsCreation() throws ExecutionException, InterruptedExceptio verifyConnectAcls(connector); } + @Test + public void testJulieRoleAclCreation() + throws 
IOException, ExecutionException, InterruptedException { + Topic topicA = new TopicImpl("topicA"); + Topology topology = + TestTopologyBuilder.createProject() + .addTopic(topicA) + .addConsumer("User:app1") + .addOther("app", "User:app1", "foo") + .buildTopology(); + + Map cliOps = new HashMap<>(); + cliOps.put(BROKERS_OPTION, ""); + + Properties props = new Properties(); + props.put(JULIE_ROLES, TestUtils.getResourceFilename("/roles.yaml")); + + Configuration config = new Configuration(cliOps, props); + + accessControlManager = + new AccessControlManager( + aclsProvider, new AclsBindingsBuilder(config), config.getJulieRoles(), config); + + accessControlManager.apply(topology, plan); + + plan.run(); + + verifyAclsOfSize(7); + } + private void verifyAclsOfSize(int size) throws ExecutionException, InterruptedException { Collection acls = diff --git a/src/test/java/com/purbon/kafka/topology/integration/MDSBaseTest.java b/src/test/java/com/purbon/kafka/topology/integration/MDSBaseTest.java index a25b12eba..431dc2b3c 100644 --- a/src/test/java/com/purbon/kafka/topology/integration/MDSBaseTest.java +++ b/src/test/java/com/purbon/kafka/topology/integration/MDSBaseTest.java @@ -34,4 +34,8 @@ protected String getSchemaRegistryClusterID() { protected String getKafkaConnectClusterID() { return "connect-cluster"; } + + protected String getKSqlClusterID() { + return "ksqldb"; + } } diff --git a/src/test/java/com/purbon/kafka/topology/integration/RBACPRoviderRbacIT.java b/src/test/java/com/purbon/kafka/topology/integration/RBACPRoviderRbacIT.java index 43623a675..dc741c553 100644 --- a/src/test/java/com/purbon/kafka/topology/integration/RBACPRoviderRbacIT.java +++ b/src/test/java/com/purbon/kafka/topology/integration/RBACPRoviderRbacIT.java @@ -16,6 +16,7 @@ import com.purbon.kafka.topology.BackendController; import com.purbon.kafka.topology.Configuration; import com.purbon.kafka.topology.ExecutionPlan; +import com.purbon.kafka.topology.TestTopologyBuilder; import 
com.purbon.kafka.topology.api.mds.MDSApiClient; import com.purbon.kafka.topology.model.Impl.ProjectImpl; import com.purbon.kafka.topology.model.Impl.TopicImpl; @@ -83,6 +84,7 @@ public void before() throws IOException, InterruptedException { apiClient.setKafkaClusterId(getKafkaClusterID()); apiClient.setSchemaRegistryClusterID(getSchemaRegistryClusterID()); apiClient.setConnectClusterID(getKafkaConnectClusterID()); + apiClient.setKSqlClusterID(getKSqlClusterID()); plan = ExecutionPlan.init(cs, System.out); RBACProvider rbacProvider = new RBACProvider(apiClient); @@ -386,6 +388,53 @@ public void testRoleDeleteFlow() throws IOException { assertThat(bindings).hasSize(2); } + @Test + public void testJulieRoleAclCreation() throws IOException { + + BackendController cs = new BackendController(); + ExecutionPlan plan = ExecutionPlan.init(cs, System.out); + RBACProvider rbacProvider = Mockito.spy(new RBACProvider(apiClient)); + RBACBindingsBuilder bindingsBuilder = new RBACBindingsBuilder(apiClient); + String principal = "User:app" + System.currentTimeMillis(); + + Topology topology = + TestTopologyBuilder.createProject().addOther("app", principal, "foo").buildTopology(); + + Map cliOps = new HashMap<>(); + cliOps.put(BROKERS_OPTION, ""); + + Properties props = new Properties(); + props.put(JULIE_ROLES, TestUtils.getResourceFilename("/roles-rbac.yaml")); + + Configuration config = new Configuration(cliOps, props); + + accessControlManager = + new AccessControlManager(rbacProvider, bindingsBuilder, config.getJulieRoles(), config); + + accessControlManager.apply(topology, plan); + + plan.run(); + + List bindings = + getBindings(rbacProvider).stream() + .filter(binding -> binding.getPrincipal().equalsIgnoreCase(principal)) + .collect(Collectors.toList()); + + assertThat(bindings).hasSize(4); + + List roles = apiClient.lookupRoles(principal); + assertTrue(roles.contains(DEVELOPER_READ)); + + roles = + apiClient.lookupRoles( + principal, 
apiClient.withClusterIDs().forKafka().forKafkaConnect().asMap()); + assertTrue(roles.contains(SECURITY_ADMIN)); + + var clusters = apiClient.withClusterIDs().forKafka().forKsql().asMap(); + roles = apiClient.lookupRoles(principal, clusters); + assertTrue(roles.contains(RESOURCE_OWNER)); + } + private List getBindings(RBACProvider rbacProvider) { return rbacProvider.listAcls().values().stream() .flatMap(Collection::stream) diff --git a/src/test/resources/descriptor.yaml b/src/test/resources/descriptor.yaml index 0c6616e51..f8b6499a5 100644 --- a/src/test/resources/descriptor.yaml +++ b/src/test/resources/descriptor.yaml @@ -3,6 +3,12 @@ context: "contextOrg" source: "source" projects: - name: "foo" + foo: + - principal: "User:banana" + group: "foo" + bar: + - principal: "User:banana" + group: "bar" consumers: - principal: "User:App0" group: "foo" diff --git a/src/test/resources/roles-goodTest.yaml b/src/test/resources/roles-goodTest.yaml new file mode 100644 index 000000000..ebf1645c7 --- /dev/null +++ b/src/test/resources/roles-goodTest.yaml @@ -0,0 +1,42 @@ +--- +roles: + - name: "foo" + acls: + - resourceType: "Topic" + resourceName: "{{topic}}" + patternType: "PREFIXED" + host: "*" + operation: "ALL" + permissionType: "ALLOW" + - resourceType: "Topic" + resourceName: "sourceTopic" + patternType: "LITERAL" + host: "*" + operation: "ALL" + permissionType: "READ" + - resourceType: "Topic" + resourceName: "targetTopic" + patternType: "LITERAL" + host: "*" + operation: "ALL" + permissionType: "WRITE" + - resourceType: "Group" + resourceName: "{{group}}" + patternType: "PREFIXED" + host: "*" + operation: "READ" + permissionType: "ALLOW" + - name: "bar" + acls: + - resourceType: "Topic" + resourceName: "{{topic}}" + patternType: "LITERAL" + host: "*" + operation: "READ" + permissionType: "ALLOW" + - resourceType: "Group" + resourceName: "" + patternType: "" + host: "" + operation: "" + permissionType: "" \ No newline at end of file diff --git 
a/src/test/resources/roles-rbac.yaml b/src/test/resources/roles-rbac.yaml new file mode 100644 index 000000000..ab158ce64 --- /dev/null +++ b/src/test/resources/roles-rbac.yaml @@ -0,0 +1,39 @@ +--- +roles: + - name: "app" + acls: + - resourceType: "Topic" + resourceName: "{{topic}}" + patternType: "PREFIXED" + host: "*" + role: "ResourceOwner" + - resourceType: "Topic" + resourceName: "sourceTopic" + patternType: "LITERAL" + host: "*" + role: "DeveloperRead" + - resourceType: "Topic" + resourceName: "targetTopic" + patternType: "LITERAL" + host: "*" + role: "DeveloperWrite" + - resourceType: "Group" + resourceName: "{{group}}" + patternType: "PREFIXED" + host: "*" + role: "DeveloperRead" + - resourceType: "Subject" + resourceName: "Subject:foo" + patternType: "LITERAL" + host: "*" + role: "DeveloperRead" + - resourceType: "Connector" + resourceName: "Connector:con" + patternType: "LITERAL" + host: "*" + role: "SecurityAdmin" + - resourceType: "KsqlCluster" + resourceName: "KsqlCluster:ksql-cluster" + patternType: "LITERAL" + host: "*" + role: "ResourceOwner" \ No newline at end of file diff --git a/src/test/resources/roles-wrong.yaml b/src/test/resources/roles-wrong.yaml new file mode 100644 index 000000000..8379a6055 --- /dev/null +++ b/src/test/resources/roles-wrong.yaml @@ -0,0 +1,43 @@ +--- +roles: + - name: "app" + acls: + - resourceType: "Topics" + resourceName: "{{topic}}" + patternType: "PREFIXED" + host: "*" + operation: "ALL" + permissionType: "ALLOW" + role: "RBAC" + - resourceType: "Topic" + resourceName: "sourceTopic" + patternType: "LITERAL" + host: "*" + operation: "ALL" + permissionType: "READ" + - resourceType: "Topic" + resourceName: "targetTopic" + patternType: "LITERAL" + host: "*" + operation: "ALLs" + permissionType: "WRITE" + - resourceType: "Group" + resourceName: "{{group}}" + patternType: "PREFIXED" + host: "*" + operation: "READ" + permissionType: "ALLOW" + - name: "other" + acls: + - resourceType: "Topic" + resourceName: "{{topic}}" + 
patternType: "LITERAL" + host: "*" + operation: "READ" + permissionType: "ALLOW" + - resourceType: "Group" + resourceName: "" + patternType: "" + host: "" + operation: "" + permissionType: "" \ No newline at end of file diff --git a/src/test/resources/roles.yaml b/src/test/resources/roles.yaml new file mode 100644 index 000000000..884aa7918 --- /dev/null +++ b/src/test/resources/roles.yaml @@ -0,0 +1,42 @@ +--- +roles: + - name: "app" + acls: + - resourceType: "Topic" + resourceName: "{{topic}}" + patternType: "PREFIXED" + host: "*" + operation: "ALL" + permissionType: "ALLOW" + - resourceType: "Topic" + resourceName: "sourceTopic" + patternType: "LITERAL" + host: "*" + operation: "ALL" + permissionType: "READ" + - resourceType: "Topic" + resourceName: "targetTopic" + patternType: "LITERAL" + host: "*" + operation: "ALL" + permissionType: "WRITE" + - resourceType: "Group" + resourceName: "{{group}}" + patternType: "PREFIXED" + host: "*" + operation: "READ" + permissionType: "ALLOW" + - name: "other" + acls: + - resourceType: "Topic" + resourceName: "{{topic}}" + patternType: "LITERAL" + host: "*" + operation: "READ" + permissionType: "ALLOW" + - resourceType: "Group" + resourceName: "" + patternType: "" + host: "" + operation: "" + permissionType: "" \ No newline at end of file