Skip to content

Commit

Permalink
Configurable RBAC role bindings for schema registry subjects (#283) (#…
Browse files Browse the repository at this point in the history
…284)

Co-authored-by: lsolovey <leonidsolovey@crd.com>
Co-authored-by: Pere Urbón <purbon@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 17, 2021
1 parent 1ee68c5 commit d56b063
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 12 deletions.
10 changes: 9 additions & 1 deletion docs/futures/what-acl-management.rst
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,10 @@ It is possible in RBAC to assign permission for a given principal to access a gi
This is possible with Julie Ops with a topology like the one below, where *User:App0* will
have access to schemas in subjects *transactions* and *User:App1* to subject *contracts*.

By default, Julie Ops grants `ResourceOwner` role for subjects, and creates non-prefixed (literal) role bindings.
It's possible to specify different role, and create prefixed role bindings for subjects,
as shown in the example below for *User:App2*.

.. code-block:: YAML
---
Expand All @@ -417,7 +421,11 @@ have access to schemas in subjects *transactions* and *User:App1* to subject *co
- principal: "User:App1"
subjects:
- "contracts"
- principal: "User:App2"
subjects:
- "myapp"
role: "DeveloperRead"
prefixed: true
Cluster wide roles
^^^^^^^^^^^
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ List<TopologyAclBinding> buildBindingsForSchemaRegistry(SchemaRegistryInstance s

List<TopologyAclBinding> buildBindingsForControlCenter(String principal, String appId);

default List<TopologyAclBinding> setSchemaAuthorization(String principal, List<String> subjects) {
default List<TopologyAclBinding> setSchemaAuthorization(
String principal, List<String> subjects, String role, boolean prefixed) {
return Collections.emptyList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ public BuildBindingsForSchemaAuthorization(
protected void execute() throws IOException {
bindings =
builderProvider.setSchemaAuthorization(
schemaAuthorization.getPrincipal(), schemaAuthorization.getSubjects());
schemaAuthorization.getPrincipal(),
schemaAuthorization.getSubjects(),
schemaAuthorization.getRole(),
schemaAuthorization.isPrefixed());
}

@Override
Expand All @@ -32,6 +35,8 @@ protected Map<String, Object> props() {
map.put("Operation", getClass().getName());
map.put("Principal", schemaAuthorization.getPrincipal());
map.put("Subjects", schemaAuthorization.getSubjects());
map.put("Role", schemaAuthorization.getRole());
map.put("IsPrefixed", schemaAuthorization.isPrefixed());
return map;
}
}
25 changes: 25 additions & 0 deletions src/main/java/com/purbon/kafka/topology/model/users/Schemas.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
package com.purbon.kafka.topology.model.users;

import static com.purbon.kafka.topology.roles.rbac.RBACPredefinedRoles.RESOURCE_OWNER;

import com.purbon.kafka.topology.model.User;
import java.util.List;

public class Schemas extends User {

private List<String> subjects;
private String role;
private boolean prefixed;

public Schemas() {
super("");

// using default role RESOURCE_OWNER and non-prefixed (literal) binding
// for backward compatibility
this.role = RESOURCE_OWNER;
this.prefixed = false;
}

public List<String> getSubjects() {
Expand All @@ -18,4 +27,20 @@ public List<String> getSubjects() {
public void setSubjects(List<String> subjects) {
this.subjects = subjects;
}

public String getRole() {
return role;
}

public void setRole(String role) {
this.role = role;
}

public boolean isPrefixed() {
return prefixed;
}

public void setPrefixed(boolean prefixed) {
this.prefixed = prefixed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.resource.PatternType;

public class RBACBindingsBuilder implements BindingsBuilderProvider {

Expand Down Expand Up @@ -364,14 +365,14 @@ public List<TopologyAclBinding> setClusterLevelRole(
}

@Override
public List<TopologyAclBinding> setSchemaAuthorization(String principal, List<String> subjects) {
public List<TopologyAclBinding> setSchemaAuthorization(
String principal, List<String> subjects, String role, boolean prefixed) {

String patternType = prefixed ? PatternType.PREFIXED.name() : PatternType.LITERAL.name();
return subjects.stream()
.map(
subject ->
apiClient
.bind(principal, RESOURCE_OWNER)
.forSchemaSubject(subject)
.apply("Subject", subject))
apiClient.bind(principal, role).forSchemaSubject(subject, patternType).apply())
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
Expand Down
34 changes: 30 additions & 4 deletions src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import static com.purbon.kafka.topology.Constants.*;
import static com.purbon.kafka.topology.model.SubjectNameStrategy.TOPIC_NAME_STRATEGY;
import static com.purbon.kafka.topology.model.SubjectNameStrategy.TOPIC_RECORD_NAME_STRATEGY;
import static com.purbon.kafka.topology.roles.rbac.RBACPredefinedRoles.DEVELOPER_READ;
import static com.purbon.kafka.topology.roles.rbac.RBACPredefinedRoles.RESOURCE_OWNER;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
Expand All @@ -22,6 +24,7 @@
import com.purbon.kafka.topology.model.users.KSqlApp;
import com.purbon.kafka.topology.model.users.KStream;
import com.purbon.kafka.topology.model.users.Producer;
import com.purbon.kafka.topology.model.users.Schemas;
import com.purbon.kafka.topology.model.users.platform.ControlCenterInstance;
import com.purbon.kafka.topology.model.users.platform.KsqlServerInstance;
import com.purbon.kafka.topology.model.users.platform.SchemaRegistryInstance;
Expand Down Expand Up @@ -350,10 +353,25 @@ public void testWithRBACDescriptor() {
Project myProject = topology.getProjects().get(0);

assertEquals(2, myProject.getRbacRawRoles().size());
assertEquals(2, myProject.getSchemas().size());
assertEquals("User:App0", myProject.getSchemas().get(0).getPrincipal());
assertEquals(1, myProject.getSchemas().get(0).getSubjects().size());

assertEquals(3, myProject.getSchemas().size());
assertSchemas(
myProject.getSchemas().get(0),
"User:App0",
Collections.singletonList("transactions"),
RESOURCE_OWNER,
false);
assertSchemas(
myProject.getSchemas().get(1),
"User:App1",
Collections.singletonList("contracts"),
RESOURCE_OWNER,
false);
assertSchemas(
myProject.getSchemas().get(2),
"User:App2",
Collections.singletonList("myapp"),
DEVELOPER_READ,
true);
Connector connector = myProject.getConnectors().get(0);
assertEquals(true, connector.getConnectors().isPresent());
assertEquals("jdbc-sync", connector.getConnectors().get().get(0));
Expand Down Expand Up @@ -501,4 +519,12 @@ private List<Consumer> buildConsumers() {
consumers.add(new Consumer("app2"));
return consumers;
}

private void assertSchemas(
Schemas schemas, String principal, List<String> subjects, String role, boolean prefixed) {
assertEquals("Schemas principal", principal, schemas.getPrincipal());
assertEquals("Schemas subjects", subjects, schemas.getSubjects());
assertEquals("Schemas role", role, schemas.getRole());
assertEquals("Schemas isPrefixed", prefixed, schemas.isPrefixed());
}
}
5 changes: 5 additions & 0 deletions src/test/resources/descriptor-with-rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ projects:
- principal: "User:App1"
subjects:
- "contracts"
- principal: "User:App2"
subjects:
- "myapp"
role: "DeveloperRead"
prefixed: true
rbac:
- ResourceOwner:
- principal: "User:Foo"
Expand Down

0 comments on commit d56b063

Please sign in to comment.