From 1ee68c5f077bff40ae63e6a5200a60d3caf5b830 Mon Sep 17 00:00:00 2001
From: Juuso Mantila <82364577+solita-juusoma@users.noreply.github.com>
Date: Fri, 17 Sep 2021 17:44:47 +0300
Subject: [PATCH] Add missing Schema Registry ACLs (#297)
* Update testcontainers.version to 1.15.3 that includes JNA dependency (testcontainers-java#3834)
* Add required ACLs for schema-registry instances
* Fix typo in schemaregistryinstance class and add CREATE acl for schema-registry to schema topic
* Change offset topic ACL DescribeConfigs to Describe
---
pom.xml | 2 +-
.../platform/SchemaRegistryInstance.java | 31 ++++++++++++++++---
.../roles/acls/AclsBindingsBuilder.java | 19 +++++++++++-
.../integration/AccessControlManagerIT.java | 13 +++++++-
4 files changed, 58 insertions(+), 7 deletions(-)
diff --git a/pom.xml b/pom.xml
index 07f94ccac..98f70a97c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -428,7 +428,7 @@
1.4
3.6.0
4.13.1
- 1.15.2
+ 1.15.3
3.2.0
6.1.0
6.1.0-ce
diff --git a/src/main/java/com/purbon/kafka/topology/model/users/platform/SchemaRegistryInstance.java b/src/main/java/com/purbon/kafka/topology/model/users/platform/SchemaRegistryInstance.java
index 737431d87..697c4e653 100644
--- a/src/main/java/com/purbon/kafka/topology/model/users/platform/SchemaRegistryInstance.java
+++ b/src/main/java/com/purbon/kafka/topology/model/users/platform/SchemaRegistryInstance.java
@@ -9,11 +9,17 @@ public class SchemaRegistryInstance extends User {
private static final String DEFAULT_SCHEMA_TOPIC = "_schemas";
+ private static final String DEFAULT_CONSUMER_OFFSETS_TOPIC = "__consumer_offsets";
+
private static final String DEFAULT_SCHEMA_REGISTRY_GROUP = "schema-registry";
@JsonInclude(Include.NON_EMPTY)
private Optional topic;
+ @JsonInclude(Include.NON_EMPTY)
+ private Optional consumer_offsets_topic;
+
+ @JsonInclude(Include.NON_EMPTY)
private Optional group;
public SchemaRegistryInstance() {
@@ -21,12 +27,17 @@ public SchemaRegistryInstance() {
}
public SchemaRegistryInstance(String principal) {
- this(principal, Optional.empty(), Optional.empty());
+ this(principal, Optional.empty(), Optional.empty(), Optional.empty());
}
- public SchemaRegistryInstance(String principal, Optional topic, Optional group) {
+ public SchemaRegistryInstance(
+ String principal,
+ Optional topic,
+ Optional consumer_offsets_topic,
+ Optional group) {
super(principal);
this.topic = topic;
+ this.consumer_offsets_topic = consumer_offsets_topic;
this.group = group;
}
@@ -34,14 +45,22 @@ public String topicString() {
return topic.orElse(DEFAULT_SCHEMA_TOPIC);
}
- public void setTopic(Optional topic) {
- this.topic = topic;
+ public String consumerOffsetsTopicString() {
+ return consumer_offsets_topic.orElse(DEFAULT_CONSUMER_OFFSETS_TOPIC);
}
public String groupString() {
return group.orElse(DEFAULT_SCHEMA_REGISTRY_GROUP);
}
+ public void setTopic(Optional topic) {
+ this.topic = topic;
+ }
+
+ public void setConsumer_offsets_topic(Optional consumer_offsets_topic) {
+ this.consumer_offsets_topic = consumer_offsets_topic;
+ }
+
public void setGroup(Optional group) {
this.group = group;
}
@@ -50,6 +69,10 @@ public Optional getTopic() {
return topic;
}
+ public Optional getConsumer_offsets_topic() {
+ return consumer_offsets_topic;
+ }
+
public Optional getGroup() {
return group;
}
diff --git a/src/main/java/com/purbon/kafka/topology/roles/acls/AclsBindingsBuilder.java b/src/main/java/com/purbon/kafka/topology/roles/acls/AclsBindingsBuilder.java
index a89eb2218..6eb871f64 100644
--- a/src/main/java/com/purbon/kafka/topology/roles/acls/AclsBindingsBuilder.java
+++ b/src/main/java/com/purbon/kafka/topology/roles/acls/AclsBindingsBuilder.java
@@ -213,12 +213,29 @@ private Stream streamsAppStream(
private Stream schemaRegistryAclsStream(SchemaRegistryInstance schemaRegistry) {
String principal = translate(schemaRegistry.getPrincipal());
List bindings =
- Stream.of(AclOperation.DESCRIBE_CONFIGS, AclOperation.WRITE, AclOperation.READ)
+ Stream.of(
+ AclOperation.CREATE,
+ AclOperation.DESCRIBE_CONFIGS,
+ AclOperation.DESCRIBE,
+ AclOperation.WRITE,
+ AclOperation.READ)
.map(
aclOperation ->
buildTopicLevelAcl(
principal, schemaRegistry.topicString(), PatternType.LITERAL, aclOperation))
.collect(Collectors.toList());
+
+ bindings.add(
+ buildTopicLevelAcl(
+ principal,
+ schemaRegistry.consumerOffsetsTopicString(),
+ PatternType.LITERAL,
+ AclOperation.DESCRIBE));
+
+ bindings.add(
+ buildGroupLevelAcl(
+ principal, schemaRegistry.groupString(), PatternType.LITERAL, AclOperation.READ));
+
return bindings.stream();
}
diff --git a/src/test/java/com/purbon/kafka/topology/integration/AccessControlManagerIT.java b/src/test/java/com/purbon/kafka/topology/integration/AccessControlManagerIT.java
index 11858abdf..4726a21b5 100644
--- a/src/test/java/com/purbon/kafka/topology/integration/AccessControlManagerIT.java
+++ b/src/test/java/com/purbon/kafka/topology/integration/AccessControlManagerIT.java
@@ -509,7 +509,18 @@ private void verifySchemaRegistryAcls(Platform platform)
Collection acls = kafkaAdminClient.describeAcls(filter).values().get();
- assertEquals(3, acls.size());
+ ResourcePatternFilter groupResourceFilter =
+ new ResourcePatternFilter(ResourceType.GROUP, null, PatternType.ANY);
+
+ AccessControlEntryFilter groupEntryFilter =
+ new AccessControlEntryFilter(
+ sr.getPrincipal(), null, AclOperation.ANY, AclPermissionType.ALLOW);
+ AclBindingFilter groupFilter = new AclBindingFilter(groupResourceFilter, groupEntryFilter);
+
+ Collection groupAcls = kafkaAdminClient.describeAcls(groupFilter).values().get();
+
+ assertEquals(6, acls.size());
+ assertEquals(1, groupAcls.size());
}
}