Skip to content

Commit

Permalink
fix(topic data): require key for produce onto compacted topic (#780)
Browse files Browse the repository at this point in the history
close #133
  • Loading branch information
Tim te Beek authored and tchiotludo committed Oct 24, 2021
1 parent b1fdcf4 commit 9894d0b
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 1 deletion.
4 changes: 3 additions & 1 deletion src/main/java/org/akhq/models/Topic.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,10 @@ public Boolean canDeleteRecords(String clusterId, ConfigRepository configReposit
return false;
}

List<Config> configs = configRepository.findByTopic(clusterId, this.getName());
return isCompacted(configRepository.findByTopic(clusterId, this.getName()));
}

public static boolean isCompacted(List<Config> configs) {
return configs != null && configs
.stream()
.filter(config -> config.getName().equals(TopicConfig.CLEANUP_POLICY_CONFIG))
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/org/akhq/repositories/RecordRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ public class RecordRepository extends AbstractRepository {
@Inject
private KafkaModule kafkaModule;

@Inject
private ConfigRepository configRepository;

@Inject
private TopicRepository topicRepository;

Expand Down Expand Up @@ -540,6 +543,14 @@ public RecordMetadata produce(
} else {
keyAsBytes = key.get().getBytes();
}
} else {
try {
if (Topic.isCompacted(configRepository.findByTopic(clusterId, value))) {
throw new IllegalArgumentException("Key missing for produce onto compacted topic");
}
} catch (ExecutionException ex) {
log.debug("Failed to determine if {} topic {} is compacted", clusterId, topic, ex);
}
}

if (value != null && valueSchemaId.isPresent()) {
Expand Down

0 comments on commit 9894d0b

Please sign in to comment.