Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#270 support prefix for producer transaction id and consumer group #279

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ public Consumer() {
}

public Consumer(String principal) {
this(principal, null);
}

public Consumer(String principal, String group) {
super(principal);
group = Optional.empty();
this.group = Optional.ofNullable(group);
}

public String groupString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ public Producer() {
}

public Producer(String principal) {
this(principal, null, null);
}

public Producer(String principal, String transactionId, Boolean idempotence) {
super(principal);
transactionId = Optional.empty();
idempotence = Optional.empty();
this.transactionId = Optional.ofNullable(transactionId);
this.idempotence = Optional.ofNullable(idempotence);
}

public Optional<String> getTransactionId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,14 @@ private Stream<AclBinding> producerAclsStream(Producer producer, String topic, b
bindings.add(
buildTransactionIdLevelAcl(
producer.getPrincipal(),
transactionId,
PatternType.LITERAL,
evaluateResourcePattern(transactionId),
evaluateResourcePatternType(transactionId),
AclOperation.DESCRIBE));
bindings.add(
buildTransactionIdLevelAcl(
producer.getPrincipal(),
transactionId,
PatternType.LITERAL,
evaluateResourcePattern(transactionId),
evaluateResourcePatternType(transactionId),
AclOperation.WRITE));
});

Expand All @@ -170,7 +170,11 @@ private Stream<AclBinding> consumerAclsStream(Consumer consumer, String topic, b
return Stream.of(
buildTopicLevelAcl(principal, topic, patternType, AclOperation.DESCRIBE),
buildTopicLevelAcl(principal, topic, patternType, AclOperation.READ),
buildGroupLevelAcl(principal, consumer.groupString(), patternType, AclOperation.READ));
buildGroupLevelAcl(
principal,
evaluateResourcePattern(consumer.groupString()),
evaluateResourcePatternType(consumer.groupString()),
AclOperation.READ));
}

private Stream<AclBinding> streamsAppStream(
Expand Down Expand Up @@ -287,4 +291,16 @@ private AclBinding buildGroupLevelAcl(
.addControlEntry("*", op, AclPermissionType.ALLOW)
.build();
}

private boolean isResourcePrefixed(String res) {
return res.length() > 1 && res.endsWith("*");
}

private String evaluateResourcePattern(String res) {
return isResourcePrefixed(res) ? res.replaceFirst(".$", "") : res;
}

private PatternType evaluateResourcePatternType(String res) {
return isResourcePrefixed(res) ? PatternType.PREFIXED : PatternType.LITERAL;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
Expand Down Expand Up @@ -56,6 +55,23 @@ public void testConsumerAclsBuilder() {
.contains(buildGroupLevelAcl("User:foo", "*", PatternType.LITERAL, AclOperation.READ));
}

@Test
public void testConsumerAclsBuilderWithGroupPrefix() {

Consumer consumer = new Consumer("User:foo", "foo*");

List<TopologyAclBinding> aclBindings =
builder.buildBindingsForConsumers(Collections.singleton(consumer), "bar", false);
assertThat(aclBindings.size()).isEqualTo(3);
assertThat(aclBindings)
.contains(buildTopicLevelAcl("User:foo", "bar", PatternType.LITERAL, AclOperation.READ));
assertThat(aclBindings)
.contains(
buildTopicLevelAcl("User:foo", "bar", PatternType.LITERAL, AclOperation.DESCRIBE));
assertThat(aclBindings)
.contains(buildGroupLevelAcl("User:foo", "foo", PatternType.PREFIXED, AclOperation.READ));
}

@Test
public void testProducerAclsBuilder() {
Producer producer = new Producer("User:foo");
Expand All @@ -71,8 +87,7 @@ public void testProducerAclsBuilder() {

@Test
public void testProducerWithTxIdAclsBuilder() {
Producer producer = new Producer("User:foo");
producer.setTransactionId(Optional.of("1234"));
Producer producer = new Producer("User:foo", "1234", true);
List<TopologyAclBinding> aclBindings =
builder.buildBindingsForProducers(Collections.singleton(producer), "bar", false);
assertThat(aclBindings.size()).isEqualTo(5);
Expand Down Expand Up @@ -103,10 +118,36 @@ public void testProducerWithTxIdAclsBuilder() {
.contains(buildClusterLevelAcl(producer.getPrincipal(), AclOperation.IDEMPOTENT_WRITE));
}

@Test
public void testProducerWithTxIdPrefixAclsBuilder() {
Producer producer = new Producer("User:foo", "foo*", true);
List<TopologyAclBinding> aclBindings =
builder.buildBindingsForProducers(Collections.singleton(producer), "bar", false);
assertThat(aclBindings.size()).isEqualTo(5);

assertThat(aclBindings)
.contains(buildTopicLevelAcl("User:foo", "bar", PatternType.LITERAL, AclOperation.WRITE));
assertThat(aclBindings)
.contains(
buildTopicLevelAcl("User:foo", "bar", PatternType.LITERAL, AclOperation.DESCRIBE));

assertThat(aclBindings)
.contains(
buildTransactionIdLevelAcl(
"User:foo", "foo", PatternType.PREFIXED, AclOperation.DESCRIBE));

assertThat(aclBindings)
.contains(
buildTransactionIdLevelAcl(
"User:foo", "foo", PatternType.PREFIXED, AclOperation.WRITE));

assertThat(aclBindings)
.contains(buildClusterLevelAcl(producer.getPrincipal(), AclOperation.IDEMPOTENT_WRITE));
}

@Test
public void testIdempotenceProducerAclsBuilder() {
Producer producer = new Producer("User:foo");
producer.setIdempotence(Optional.of(true));
Producer producer = new Producer("User:foo", null, true);
List<TopologyAclBinding> aclBindings =
builder.buildBindingsForProducers(Collections.singleton(producer), "bar", false);
assertThat(aclBindings.size()).isEqualTo(3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.purbon.kafka.topology.model.users.Producer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -78,8 +77,7 @@ public TestTopologyBuilder addConsumer(String user) {
}

public TestTopologyBuilder addConsumer(String user, String group) {
Consumer consumer = new Consumer(user);
consumer.setGroup(Optional.of(group));
Consumer consumer = new Consumer(user, group);
consumers.add(consumer);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,7 @@ public void producerWithTxAclsCreation()
throws ExecutionException, InterruptedException, IOException {

List<Producer> producers = new ArrayList<>();
Producer producer = new Producer("User:Producer12");
producer.setTransactionId(Optional.of("1234"));
Producer producer = new Producer("User:Producer12", "1234", true);
producers.add(producer);

Project project = new ProjectImpl("project");
Expand All @@ -245,8 +244,7 @@ public void producerWithIdempotenceAclsCreation()
throws ExecutionException, InterruptedException, IOException {

List<Producer> producers = new ArrayList<>();
Producer producer = new Producer("User:Producer13");
producer.setIdempotence(Optional.of(true));
Producer producer = new Producer("User:Producer13", null, true);
producers.add(producer);

Project project = new ProjectImpl("project");
Expand Down