From d56b063bf0dc577917111c5364bf921ac69e9861 Mon Sep 17 00:00:00 2001
From: Leonid Solovey <38634160+lsolovey@users.noreply.github.com>
Date: Fri, 17 Sep 2021 17:58:24 +0300
Subject: [PATCH] Configurable RBAC role bindings for schema registry subjects
 (#283) (#284)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: lsolovey <leonidsolovey@crd.com>
Co-authored-by: Pere Urbón <purbon@users.noreply.github.com>
---
 docs/futures/what-acl-management.rst          | 10 +++++-
 .../topology/BindingsBuilderProvider.java     |  3 +-
 .../BuildBindingsForSchemaAuthorization.java  |  7 +++-
 .../kafka/topology/model/users/Schemas.java   | 25 ++++++++++++++
 .../roles/rbac/RBACBindingsBuilder.java       | 11 +++---
 .../kafka/topology/TopologySerdesTest.java    | 34 ++++++++++++++++---
 src/test/resources/descriptor-with-rbac.yaml  |  5 +++
 7 files changed, 83 insertions(+), 12 deletions(-)

diff --git a/docs/futures/what-acl-management.rst b/docs/futures/what-acl-management.rst
index c67114506..a921c587d 100644
--- a/docs/futures/what-acl-management.rst
+++ b/docs/futures/what-acl-management.rst
@@ -391,6 +391,10 @@ It is possible in RBAC to assign permission for a given principal to access a gi
 This is possible with Julie Ops with a topology like the one below, where *User:App0* will
 have access to schemas in subjects *transactions* and *User:App1* to subject *contracts*.
 
+By default, Julie Ops grants `ResourceOwner` role for subjects, and creates non-prefixed (literal) role bindings.
+It's possible to specify different role, and create prefixed role bindings for subjects,
+as shown in the example below for *User:App2*.
+
 .. code-block:: YAML
 
   ---
@@ -417,7 +421,11 @@ have access to schemas in subjects *transactions* and *User:App1* to subject *co
           - principal: "User:App1"
             subjects:
               - "contracts"
-
+          - principal: "User:App2"
+            subjects:
+              - "myapp"
+            role: "DeveloperRead"
+            prefixed: true
 
 Cluster wide roles
 ^^^^^^^^^^^
diff --git a/src/main/java/com/purbon/kafka/topology/BindingsBuilderProvider.java b/src/main/java/com/purbon/kafka/topology/BindingsBuilderProvider.java
index d0a9d41e7..97eb72767 100644
--- a/src/main/java/com/purbon/kafka/topology/BindingsBuilderProvider.java
+++ b/src/main/java/com/purbon/kafka/topology/BindingsBuilderProvider.java
@@ -38,7 +38,8 @@ List<TopologyAclBinding> buildBindingsForSchemaRegistry(SchemaRegistryInstance s
 
   List<TopologyAclBinding> buildBindingsForControlCenter(String principal, String appId);
 
-  default List<TopologyAclBinding> setSchemaAuthorization(String principal, List<String> subjects) {
+  default List<TopologyAclBinding> setSchemaAuthorization(
+      String principal, List<String> subjects, String role, boolean prefixed) {
     return Collections.emptyList();
   }
 
diff --git a/src/main/java/com/purbon/kafka/topology/actions/access/builders/rbac/BuildBindingsForSchemaAuthorization.java b/src/main/java/com/purbon/kafka/topology/actions/access/builders/rbac/BuildBindingsForSchemaAuthorization.java
index f9368bd35..904faa552 100644
--- a/src/main/java/com/purbon/kafka/topology/actions/access/builders/rbac/BuildBindingsForSchemaAuthorization.java
+++ b/src/main/java/com/purbon/kafka/topology/actions/access/builders/rbac/BuildBindingsForSchemaAuthorization.java
@@ -23,7 +23,10 @@ public BuildBindingsForSchemaAuthorization(
   protected void execute() throws IOException {
     bindings =
         builderProvider.setSchemaAuthorization(
-            schemaAuthorization.getPrincipal(), schemaAuthorization.getSubjects());
+            schemaAuthorization.getPrincipal(),
+            schemaAuthorization.getSubjects(),
+            schemaAuthorization.getRole(),
+            schemaAuthorization.isPrefixed());
   }
 
   @Override
@@ -32,6 +35,8 @@ protected Map<String, Object> props() {
     map.put("Operation", getClass().getName());
     map.put("Principal", schemaAuthorization.getPrincipal());
     map.put("Subjects", schemaAuthorization.getSubjects());
+    map.put("Role", schemaAuthorization.getRole());
+    map.put("IsPrefixed", schemaAuthorization.isPrefixed());
     return map;
   }
 }
diff --git a/src/main/java/com/purbon/kafka/topology/model/users/Schemas.java b/src/main/java/com/purbon/kafka/topology/model/users/Schemas.java
index 3d4c80c64..cb800ed02 100644
--- a/src/main/java/com/purbon/kafka/topology/model/users/Schemas.java
+++ b/src/main/java/com/purbon/kafka/topology/model/users/Schemas.java
@@ -1,14 +1,23 @@
 package com.purbon.kafka.topology.model.users;
 
+import static com.purbon.kafka.topology.roles.rbac.RBACPredefinedRoles.RESOURCE_OWNER;
+
 import com.purbon.kafka.topology.model.User;
 import java.util.List;
 
 public class Schemas extends User {
 
   private List<String> subjects;
+  private String role;
+  private boolean prefixed;
 
   public Schemas() {
     super("");
+
+    // using default role RESOURCE_OWNER and non-prefixed (literal) binding
+    // for backward compatibility
+    this.role = RESOURCE_OWNER;
+    this.prefixed = false;
   }
 
   public List<String> getSubjects() {
@@ -18,4 +27,20 @@ public List<String> getSubjects() {
   public void setSubjects(List<String> subjects) {
     this.subjects = subjects;
   }
+
+  public String getRole() {
+    return role;
+  }
+
+  public void setRole(String role) {
+    this.role = role;
+  }
+
+  public boolean isPrefixed() {
+    return prefixed;
+  }
+
+  public void setPrefixed(boolean prefixed) {
+    this.prefixed = prefixed;
+  }
 }
diff --git a/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java b/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java
index a1cca0dc5..515d211a3 100644
--- a/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java
+++ b/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java
@@ -27,6 +27,7 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.kafka.common.resource.PatternType;
 
 public class RBACBindingsBuilder implements BindingsBuilderProvider {
 
@@ -364,14 +365,14 @@ public List<TopologyAclBinding> setClusterLevelRole(
   }
 
   @Override
-  public List<TopologyAclBinding> setSchemaAuthorization(String principal, List<String> subjects) {
+  public List<TopologyAclBinding> setSchemaAuthorization(
+      String principal, List<String> subjects, String role, boolean prefixed) {
+
+    String patternType = prefixed ? PatternType.PREFIXED.name() : PatternType.LITERAL.name();
     return subjects.stream()
         .map(
             subject ->
-                apiClient
-                    .bind(principal, RESOURCE_OWNER)
-                    .forSchemaSubject(subject)
-                    .apply("Subject", subject))
+                apiClient.bind(principal, role).forSchemaSubject(subject, patternType).apply())
         .filter(Objects::nonNull)
         .collect(Collectors.toList());
   }
diff --git a/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java b/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java
index d6bc3c1c7..cff5b4798 100644
--- a/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java
+++ b/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java
@@ -4,6 +4,8 @@
 import static com.purbon.kafka.topology.Constants.*;
 import static com.purbon.kafka.topology.model.SubjectNameStrategy.TOPIC_NAME_STRATEGY;
 import static com.purbon.kafka.topology.model.SubjectNameStrategy.TOPIC_RECORD_NAME_STRATEGY;
+import static com.purbon.kafka.topology.roles.rbac.RBACPredefinedRoles.DEVELOPER_READ;
+import static com.purbon.kafka.topology.roles.rbac.RBACPredefinedRoles.RESOURCE_OWNER;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -22,6 +24,7 @@
 import com.purbon.kafka.topology.model.users.KSqlApp;
 import com.purbon.kafka.topology.model.users.KStream;
 import com.purbon.kafka.topology.model.users.Producer;
+import com.purbon.kafka.topology.model.users.Schemas;
 import com.purbon.kafka.topology.model.users.platform.ControlCenterInstance;
 import com.purbon.kafka.topology.model.users.platform.KsqlServerInstance;
 import com.purbon.kafka.topology.model.users.platform.SchemaRegistryInstance;
@@ -350,10 +353,25 @@ public void testWithRBACDescriptor() {
     Project myProject = topology.getProjects().get(0);
 
     assertEquals(2, myProject.getRbacRawRoles().size());
-    assertEquals(2, myProject.getSchemas().size());
-    assertEquals("User:App0", myProject.getSchemas().get(0).getPrincipal());
-    assertEquals(1, myProject.getSchemas().get(0).getSubjects().size());
-
+    assertEquals(3, myProject.getSchemas().size());
+    assertSchemas(
+        myProject.getSchemas().get(0),
+        "User:App0",
+        Collections.singletonList("transactions"),
+        RESOURCE_OWNER,
+        false);
+    assertSchemas(
+        myProject.getSchemas().get(1),
+        "User:App1",
+        Collections.singletonList("contracts"),
+        RESOURCE_OWNER,
+        false);
+    assertSchemas(
+        myProject.getSchemas().get(2),
+        "User:App2",
+        Collections.singletonList("myapp"),
+        DEVELOPER_READ,
+        true);
     Connector connector = myProject.getConnectors().get(0);
     assertEquals(true, connector.getConnectors().isPresent());
     assertEquals("jdbc-sync", connector.getConnectors().get().get(0));
@@ -501,4 +519,12 @@ private List<Consumer> buildConsumers() {
     consumers.add(new Consumer("app2"));
     return consumers;
   }
+
+  private void assertSchemas(
+      Schemas schemas, String principal, List<String> subjects, String role, boolean prefixed) {
+    assertEquals("Schemas principal", principal, schemas.getPrincipal());
+    assertEquals("Schemas subjects", subjects, schemas.getSubjects());
+    assertEquals("Schemas role", role, schemas.getRole());
+    assertEquals("Schemas isPrefixed", prefixed, schemas.isPrefixed());
+  }
 }
diff --git a/src/test/resources/descriptor-with-rbac.yaml b/src/test/resources/descriptor-with-rbac.yaml
index 29f996371..5f5ee1fc0 100644
--- a/src/test/resources/descriptor-with-rbac.yaml
+++ b/src/test/resources/descriptor-with-rbac.yaml
@@ -36,6 +36,11 @@ projects:
       - principal: "User:App1"
         subjects:
           - "contracts"
+      - principal: "User:App2"
+        subjects:
+          - "myapp"
+        role: "DeveloperRead"
+        prefixed: true
     rbac:
       - ResourceOwner:
           - principal: "User:Foo"