diff --git a/src/main/java/com/purbon/kafka/topology/model/users/platform/KsqlServerInstance.java b/src/main/java/com/purbon/kafka/topology/model/users/platform/KsqlServerInstance.java index 197f3b81c..59eb8e62a 100644 --- a/src/main/java/com/purbon/kafka/topology/model/users/platform/KsqlServerInstance.java +++ b/src/main/java/com/purbon/kafka/topology/model/users/platform/KsqlServerInstance.java @@ -19,8 +19,10 @@ public String getOwner() { return owner; } - public String commandTopic() { - return String.format("_confluent-ksql-%s_command_topic", ksqlDbId); + public String commandTopic() { return String.format("_confluent-ksql-%s_command_topic", ksqlDbId); } + + public String internalTopics() { + return String.format("_confluent-ksql-%s", ksqlDbId); } public String processingLogTopic() { diff --git a/src/main/java/com/purbon/kafka/topology/roles/acls/AclsBindingsBuilder.java b/src/main/java/com/purbon/kafka/topology/roles/acls/AclsBindingsBuilder.java index 29ff6c554..fd77ce54a 100644 --- a/src/main/java/com/purbon/kafka/topology/roles/acls/AclsBindingsBuilder.java +++ b/src/main/java/com/purbon/kafka/topology/roles/acls/AclsBindingsBuilder.java @@ -355,7 +355,7 @@ private Stream ksqlServerStream(KsqlServerInstance ksqlServer) { bindings.add( buildTopicLevelAcl( - principal, ksqlServer.commandTopic(), PatternType.LITERAL, AclOperation.ALL)); + principal, ksqlServer.internalTopics(), PatternType.PREFIXED, AclOperation.ALL)); bindings.add( buildTopicLevelAcl( principal, ksqlServer.processingLogTopic(), PatternType.LITERAL, AclOperation.ALL));