Skip to content

Commit

Permalink
Add missing Schema Registry ACLs (#297)
Browse files Browse the repository at this point in the history
* Update testcontainers.version to 1.15.3 that includes JNA dependency (testcontainers-java#3834)

* Add required ACLs for schema-registry instances

* Fix typo in schemaregistryinstance class and add CREATE acl for schema-registry to schema topic

* Change offset topic ACL DescribeConfigs to Describe
  • Loading branch information
solita-juusoma authored Sep 17, 2021
1 parent 011b0d0 commit 1ee68c5
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 7 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@
<commons.version>1.4</commons.version>
<mockito.version>3.6.0</mockito.version>
<junit.version>4.13.1</junit.version>
<testcontainers.version>1.15.2</testcontainers.version>
<testcontainers.version>1.15.3</testcontainers.version>
<jedis.version>3.2.0</jedis.version>
<confluent.version>6.1.0</confluent.version>
<confluent-ce.version>6.1.0-ce</confluent-ce.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,39 +9,58 @@ public class SchemaRegistryInstance extends User {

private static final String DEFAULT_SCHEMA_TOPIC = "_schemas";

private static final String DEFAULT_CONSUMER_OFFSETS_TOPIC = "__consumer_offsets";

private static final String DEFAULT_SCHEMA_REGISTRY_GROUP = "schema-registry";

@JsonInclude(Include.NON_EMPTY)
private Optional<String> topic;

@JsonInclude(Include.NON_EMPTY)
private Optional<String> consumer_offsets_topic;

@JsonInclude(Include.NON_EMPTY)
private Optional<String> group;

public SchemaRegistryInstance() {
this("");
}

public SchemaRegistryInstance(String principal) {
this(principal, Optional.empty(), Optional.empty());
this(principal, Optional.empty(), Optional.empty(), Optional.empty());
}

public SchemaRegistryInstance(String principal, Optional<String> topic, Optional<String> group) {
public SchemaRegistryInstance(
String principal,
Optional<String> topic,
Optional<String> consumer_offsets_topic,
Optional<String> group) {
super(principal);
this.topic = topic;
this.consumer_offsets_topic = consumer_offsets_topic;
this.group = group;
}

public String topicString() {
return topic.orElse(DEFAULT_SCHEMA_TOPIC);
}

public void setTopic(Optional<String> topic) {
this.topic = topic;
public String consumerOffsetsTopicString() {
return consumer_offsets_topic.orElse(DEFAULT_CONSUMER_OFFSETS_TOPIC);
}

public String groupString() {
return group.orElse(DEFAULT_SCHEMA_REGISTRY_GROUP);
}

public void setTopic(Optional<String> topic) {
this.topic = topic;
}

public void setConsumer_offsets_topic(Optional<String> consumer_offsets_topic) {
this.consumer_offsets_topic = consumer_offsets_topic;
}

public void setGroup(Optional<String> group) {
this.group = group;
}
Expand All @@ -50,6 +69,10 @@ public Optional<String> getTopic() {
return topic;
}

public Optional<String> getConsumer_offsets_topic() {
return consumer_offsets_topic;
}

public Optional<String> getGroup() {
return group;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,29 @@ private Stream<AclBinding> streamsAppStream(
private Stream<AclBinding> schemaRegistryAclsStream(SchemaRegistryInstance schemaRegistry) {
String principal = translate(schemaRegistry.getPrincipal());
List<AclBinding> bindings =
Stream.of(AclOperation.DESCRIBE_CONFIGS, AclOperation.WRITE, AclOperation.READ)
Stream.of(
AclOperation.CREATE,
AclOperation.DESCRIBE_CONFIGS,
AclOperation.DESCRIBE,
AclOperation.WRITE,
AclOperation.READ)
.map(
aclOperation ->
buildTopicLevelAcl(
principal, schemaRegistry.topicString(), PatternType.LITERAL, aclOperation))
.collect(Collectors.toList());

bindings.add(
buildTopicLevelAcl(
principal,
schemaRegistry.consumerOffsetsTopicString(),
PatternType.LITERAL,
AclOperation.DESCRIBE));

bindings.add(
buildGroupLevelAcl(
principal, schemaRegistry.groupString(), PatternType.LITERAL, AclOperation.READ));

return bindings.stream();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,18 @@ private void verifySchemaRegistryAcls(Platform platform)

Collection<AclBinding> acls = kafkaAdminClient.describeAcls(filter).values().get();

assertEquals(3, acls.size());
ResourcePatternFilter groupResourceFilter =
new ResourcePatternFilter(ResourceType.GROUP, null, PatternType.ANY);

AccessControlEntryFilter groupEntryFilter =
new AccessControlEntryFilter(
sr.getPrincipal(), null, AclOperation.ANY, AclPermissionType.ALLOW);
AclBindingFilter groupFilter = new AclBindingFilter(groupResourceFilter, groupEntryFilter);

Collection<AclBinding> groupAcls = kafkaAdminClient.describeAcls(groupFilter).values().get();

assertEquals(6, acls.size());
assertEquals(1, groupAcls.size());
}
}

Expand Down

0 comments on commit 1ee68c5

Please sign in to comment.