Skip to content

Commit

Permalink
#270 support prefix for producer transaction id and consumer group (#279
Browse files Browse the repository at this point in the history
)

* added support for resource prefix to be used for producer transaction id and consumer group

* updated to reference all arg constructors to consumer and producer

Co-authored-by: Magnus Smith <magnus.smith@engineering.digital.dwp.gov.uk>
  • Loading branch information
MagnusSmith and Magnus Smith authored May 12, 2021
1 parent b12a2c8 commit c039805
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ public Consumer() {
}

public Consumer(String principal) {
this(principal, null);
}

public Consumer(String principal, String group) {
super(principal);
group = Optional.empty();
this.group = Optional.ofNullable(group);
}

public String groupString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ public Producer() {
}

public Producer(String principal) {
this(principal, null, null);
}

public Producer(String principal, String transactionId, Boolean idempotence) {
super(principal);
transactionId = Optional.empty();
idempotence = Optional.empty();
this.transactionId = Optional.ofNullable(transactionId);
this.idempotence = Optional.ofNullable(idempotence);
}

public Optional<String> getTransactionId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,14 @@ private Stream<AclBinding> producerAclsStream(Producer producer, String topic, b
bindings.add(
buildTransactionIdLevelAcl(
producer.getPrincipal(),
transactionId,
PatternType.LITERAL,
evaluateResourcePattern(transactionId),
evaluateResourcePatternType(transactionId),
AclOperation.DESCRIBE));
bindings.add(
buildTransactionIdLevelAcl(
producer.getPrincipal(),
transactionId,
PatternType.LITERAL,
evaluateResourcePattern(transactionId),
evaluateResourcePatternType(transactionId),
AclOperation.WRITE));
});

Expand All @@ -170,7 +170,11 @@ private Stream<AclBinding> consumerAclsStream(Consumer consumer, String topic, b
return Stream.of(
buildTopicLevelAcl(principal, topic, patternType, AclOperation.DESCRIBE),
buildTopicLevelAcl(principal, topic, patternType, AclOperation.READ),
buildGroupLevelAcl(principal, consumer.groupString(), patternType, AclOperation.READ));
buildGroupLevelAcl(
principal,
evaluateResourcePattern(consumer.groupString()),
evaluateResourcePatternType(consumer.groupString()),
AclOperation.READ));
}

private Stream<AclBinding> streamsAppStream(
Expand Down Expand Up @@ -287,4 +291,16 @@ private AclBinding buildGroupLevelAcl(
.addControlEntry("*", op, AclPermissionType.ALLOW)
.build();
}

private boolean isResourcePrefixed(String res) {
return res.length() > 1 && res.endsWith("*");
}

private String evaluateResourcePattern(String res) {
return isResourcePrefixed(res) ? res.replaceFirst(".$", "") : res;
}

private PatternType evaluateResourcePatternType(String res) {
return isResourcePrefixed(res) ? PatternType.PREFIXED : PatternType.LITERAL;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
Expand Down Expand Up @@ -56,6 +55,23 @@ public void testConsumerAclsBuilder() {
.contains(buildGroupLevelAcl("User:foo", "*", PatternType.LITERAL, AclOperation.READ));
}

@Test
public void testConsumerAclsBuilderWithGroupPrefix() {

Consumer consumer = new Consumer("User:foo", "foo*");

List<TopologyAclBinding> aclBindings =
builder.buildBindingsForConsumers(Collections.singleton(consumer), "bar", false);
assertThat(aclBindings.size()).isEqualTo(3);
assertThat(aclBindings)
.contains(buildTopicLevelAcl("User:foo", "bar", PatternType.LITERAL, AclOperation.READ));
assertThat(aclBindings)
.contains(
buildTopicLevelAcl("User:foo", "bar", PatternType.LITERAL, AclOperation.DESCRIBE));
assertThat(aclBindings)
.contains(buildGroupLevelAcl("User:foo", "foo", PatternType.PREFIXED, AclOperation.READ));
}

@Test
public void testProducerAclsBuilder() {
Producer producer = new Producer("User:foo");
Expand All @@ -71,8 +87,7 @@ public void testProducerAclsBuilder() {

@Test
public void testProducerWithTxIdAclsBuilder() {
Producer producer = new Producer("User:foo");
producer.setTransactionId(Optional.of("1234"));
Producer producer = new Producer("User:foo", "1234", true);
List<TopologyAclBinding> aclBindings =
builder.buildBindingsForProducers(Collections.singleton(producer), "bar", false);
assertThat(aclBindings.size()).isEqualTo(5);
Expand Down Expand Up @@ -103,10 +118,36 @@ public void testProducerWithTxIdAclsBuilder() {
.contains(buildClusterLevelAcl(producer.getPrincipal(), AclOperation.IDEMPOTENT_WRITE));
}

@Test
public void testProducerWithTxIdPrefixAclsBuilder() {
Producer producer = new Producer("User:foo", "foo*", true);
List<TopologyAclBinding> aclBindings =
builder.buildBindingsForProducers(Collections.singleton(producer), "bar", false);
assertThat(aclBindings.size()).isEqualTo(5);

assertThat(aclBindings)
.contains(buildTopicLevelAcl("User:foo", "bar", PatternType.LITERAL, AclOperation.WRITE));
assertThat(aclBindings)
.contains(
buildTopicLevelAcl("User:foo", "bar", PatternType.LITERAL, AclOperation.DESCRIBE));

assertThat(aclBindings)
.contains(
buildTransactionIdLevelAcl(
"User:foo", "foo", PatternType.PREFIXED, AclOperation.DESCRIBE));

assertThat(aclBindings)
.contains(
buildTransactionIdLevelAcl(
"User:foo", "foo", PatternType.PREFIXED, AclOperation.WRITE));

assertThat(aclBindings)
.contains(buildClusterLevelAcl(producer.getPrincipal(), AclOperation.IDEMPOTENT_WRITE));
}

@Test
public void testIdempotenceProducerAclsBuilder() {
Producer producer = new Producer("User:foo");
producer.setIdempotence(Optional.of(true));
Producer producer = new Producer("User:foo", null, true);
List<TopologyAclBinding> aclBindings =
builder.buildBindingsForProducers(Collections.singleton(producer), "bar", false);
assertThat(aclBindings.size()).isEqualTo(3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.purbon.kafka.topology.model.users.Producer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -78,8 +77,7 @@ public TestTopologyBuilder addConsumer(String user) {
}

public TestTopologyBuilder addConsumer(String user, String group) {
Consumer consumer = new Consumer(user);
consumer.setGroup(Optional.of(group));
Consumer consumer = new Consumer(user, group);
consumers.add(consumer);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,7 @@ public void producerWithTxAclsCreation()
throws ExecutionException, InterruptedException, IOException {

List<Producer> producers = new ArrayList<>();
Producer producer = new Producer("User:Producer12");
producer.setTransactionId(Optional.of("1234"));
Producer producer = new Producer("User:Producer12", "1234", true);
producers.add(producer);

Project project = new ProjectImpl("project");
Expand All @@ -245,8 +244,7 @@ public void producerWithIdempotenceAclsCreation()
throws ExecutionException, InterruptedException, IOException {

List<Producer> producers = new ArrayList<>();
Producer producer = new Producer("User:Producer13");
producer.setIdempotence(Optional.of(true));
Producer producer = new Producer("User:Producer13", null, true);
producers.add(producer);

Project project = new ProjectImpl("project");
Expand Down

0 comments on commit c039805

Please sign in to comment.