diff --git a/pom.xml b/pom.xml
index 07f94ccac..98f70a97c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -428,7 +428,7 @@
1.4
3.6.0
4.13.1
- 1.15.2
+ 1.15.3
3.2.0
6.1.0
6.1.0-ce
diff --git a/src/main/java/com/purbon/kafka/topology/model/users/platform/SchemaRegistryInstance.java b/src/main/java/com/purbon/kafka/topology/model/users/platform/SchemaRegistryInstance.java
index 737431d87..697c4e653 100644
--- a/src/main/java/com/purbon/kafka/topology/model/users/platform/SchemaRegistryInstance.java
+++ b/src/main/java/com/purbon/kafka/topology/model/users/platform/SchemaRegistryInstance.java
@@ -9,11 +9,17 @@ public class SchemaRegistryInstance extends User {
private static final String DEFAULT_SCHEMA_TOPIC = "_schemas";
+ private static final String DEFAULT_CONSUMER_OFFSETS_TOPIC = "__consumer_offsets";
+
private static final String DEFAULT_SCHEMA_REGISTRY_GROUP = "schema-registry";
@JsonInclude(Include.NON_EMPTY)
private Optional topic;
+ @JsonInclude(Include.NON_EMPTY)
+ private Optional consumer_offsets_topic;
+
+ @JsonInclude(Include.NON_EMPTY)
private Optional group;
public SchemaRegistryInstance() {
@@ -21,12 +27,17 @@ public SchemaRegistryInstance() {
}
public SchemaRegistryInstance(String principal) {
- this(principal, Optional.empty(), Optional.empty());
+ this(principal, Optional.empty(), Optional.empty(), Optional.empty());
}
- public SchemaRegistryInstance(String principal, Optional topic, Optional group) {
+ public SchemaRegistryInstance(
+ String principal,
+ Optional topic,
+ Optional consumer_offsets_topic,
+ Optional group) {
super(principal);
this.topic = topic;
+ this.consumer_offsets_topic = consumer_offsets_topic;
this.group = group;
}
@@ -34,14 +45,22 @@ public String topicString() {
return topic.orElse(DEFAULT_SCHEMA_TOPIC);
}
- public void setTopic(Optional topic) {
- this.topic = topic;
+ public String consumerOffsetsTopicString() {
+ return consumer_offsets_topic.orElse(DEFAULT_CONSUMER_OFFSETS_TOPIC);
}
public String groupString() {
return group.orElse(DEFAULT_SCHEMA_REGISTRY_GROUP);
}
+ public void setTopic(Optional topic) {
+ this.topic = topic;
+ }
+
+ public void setConsumer_offsets_topic(Optional consumer_offsets_topic) {
+ this.consumer_offsets_topic = consumer_offsets_topic;
+ }
+
public void setGroup(Optional group) {
this.group = group;
}
@@ -50,6 +69,10 @@ public Optional getTopic() {
return topic;
}
+ public Optional getConsumer_offsets_topic() {
+ return consumer_offsets_topic;
+ }
+
public Optional getGroup() {
return group;
}
diff --git a/src/main/java/com/purbon/kafka/topology/roles/acls/AclsBindingsBuilder.java b/src/main/java/com/purbon/kafka/topology/roles/acls/AclsBindingsBuilder.java
index 078ea01eb..bd19fc665 100644
--- a/src/main/java/com/purbon/kafka/topology/roles/acls/AclsBindingsBuilder.java
+++ b/src/main/java/com/purbon/kafka/topology/roles/acls/AclsBindingsBuilder.java
@@ -213,12 +213,29 @@ private Stream streamsAppStream(
private Stream schemaRegistryAclsStream(SchemaRegistryInstance schemaRegistry) {
String principal = translate(schemaRegistry.getPrincipal());
List bindings =
- Stream.of(AclOperation.DESCRIBE_CONFIGS, AclOperation.WRITE, AclOperation.READ)
+ Stream.of(
+ AclOperation.CREATE,
+ AclOperation.DESCRIBE_CONFIGS,
+ AclOperation.DESCRIBE,
+ AclOperation.WRITE,
+ AclOperation.READ)
.map(
aclOperation ->
buildTopicLevelAcl(
principal, schemaRegistry.topicString(), PatternType.LITERAL, aclOperation))
.collect(Collectors.toList());
+
+ bindings.add(
+ buildTopicLevelAcl(
+ principal,
+ schemaRegistry.consumerOffsetsTopicString(),
+ PatternType.LITERAL,
+ AclOperation.DESCRIBE));
+
+ bindings.add(
+ buildGroupLevelAcl(
+ principal, schemaRegistry.groupString(), PatternType.LITERAL, AclOperation.READ));
+
return bindings.stream();
}
diff --git a/src/test/java/com/purbon/kafka/topology/integration/AccessControlManagerIT.java b/src/test/java/com/purbon/kafka/topology/integration/AccessControlManagerIT.java
index b87c4ce39..12520e899 100644
--- a/src/test/java/com/purbon/kafka/topology/integration/AccessControlManagerIT.java
+++ b/src/test/java/com/purbon/kafka/topology/integration/AccessControlManagerIT.java
@@ -509,7 +509,18 @@ private void verifySchemaRegistryAcls(Platform platform)
Collection acls = kafkaAdminClient.describeAcls(filter).values().get();
- assertEquals(3, acls.size());
+ ResourcePatternFilter groupResourceFilter =
+ new ResourcePatternFilter(ResourceType.GROUP, null, PatternType.ANY);
+
+ AccessControlEntryFilter groupEntryFilter =
+ new AccessControlEntryFilter(
+ sr.getPrincipal(), null, AclOperation.ANY, AclPermissionType.ALLOW);
+ AclBindingFilter groupFilter = new AclBindingFilter(groupResourceFilter, groupEntryFilter);
+
+ Collection groupAcls = kafkaAdminClient.describeAcls(groupFilter).values().get();
+
+ assertEquals(6, acls.size());
+ assertEquals(1, groupAcls.size());
}
}