Commit 6e8ca94
fix(server): ensure oldest listing to be started at the first existing entry by consuming partitions separately
jonasvoelcker committed Jun 21, 2024
1 parent 5810ffc commit 6e8ca94
Showing 1 changed file with 139 additions and 139 deletions.
278 changes: 139 additions & 139 deletions src/main/java/org/akhq/repositories/RecordRepository.java
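The commit replaces the single consumer that was assigned to every partition of the topic with one short-lived consumer per partition, each opened in a try-with-resources block and positioned at that partition's first offset before polling, so the oldest listing always starts at the first existing entry of each partition. Below is a minimal stand-alone sketch of that pattern with plain kafka-clients, not the AKHQ code itself; the names readOldestPerPartition, bootstrapServers and firstOffsets, and the two-second poll timeout, are illustrative assumptions.

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PerPartitionOldestRead {

    // Reads up to maxRecords from each partition, starting at that partition's first known offset.
    // firstOffsets maps every partition of the topic to the offset where reading should begin.
    static void readOldestPerPartition(String bootstrapServers,
                                       Map<TopicPartition, Long> firstOffsets,
                                       int maxRecords) {
        for (Map.Entry<TopicPartition, Long> entry : firstOffsets.entrySet()) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxRecords);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

            // One consumer per partition: each seek starts at that partition's own first offset,
            // so the poll budget of one partition cannot hide the first entries of another.
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(List.of(entry.getKey()));
                consumer.seek(entry.getKey(), entry.getValue());

                for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(2))) {
                    System.out.printf("%s-%d @ %d%n",
                        record.topic(), record.partition(), record.offset());
                }
            } // the consumer is closed even if seek or poll throws
        }
    }
}

Opening a consumer per partition costs extra connections, but the max.poll.records budget then applies per partition, so a busy partition cannot crowd the first entries of a quiet one out of the result.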
@@ -100,31 +100,31 @@ public Map<String, Record> getLastRecord(String clusterId, List<String> topicsNa
.map(partition -> new TopicPartition(partition.getTopic(), partition.getId()))
.collect(Collectors.toList());

KafkaConsumer<byte[], byte[]> consumer = kafkaModule.getConsumer(clusterId);
consumer.assign(topicPartitions);
ConcurrentHashMap<String, Record> records = new ConcurrentHashMap<>();

consumer
.endOffsets(consumer.assignment())
.forEach((topicPartition, offset) -> {
consumer.seek(topicPartition, Math.max(0, offset - 2));
});
try (KafkaConsumer<byte[], byte[]> consumer = kafkaModule.getConsumer(clusterId)) {
consumer.assign(topicPartitions);

ConcurrentHashMap<String, Record> records = new ConcurrentHashMap<>();
consumer
.endOffsets(consumer.assignment())
.forEach((topicPartition, offset) -> {
consumer.seek(topicPartition, Math.max(0, offset - 2));
});

this.poll(consumer)
.forEach(record -> {
if (!records.containsKey(record.topic())) {
records.put(record.topic(), newRecord(record, clusterId, topics.get(record.topic())));
} else {
Record current = records.get(record.topic());
if (current.getTimestamp().toInstant().toEpochMilli() < record.timestamp()) {
this.poll(consumer)
.forEach(record -> {
if (!records.containsKey(record.topic())) {
records.put(record.topic(), newRecord(record, clusterId, topics.get(record.topic())));
} else {
Record current = records.get(record.topic());
if (current.getTimestamp().toInstant().toEpochMilli() < record.timestamp()) {
records.put(record.topic(), newRecord(record, clusterId, topics.get(record.topic())));
}
}
}

});
});
}

consumer.close();
return records;
}

@@ -141,42 +141,38 @@ public List<Record> consume(String clusterId, Options options) throws ExecutionE
}

private List<Record> consumeOldest(Topic topic, Options options) {
KafkaConsumer<byte[], byte[]> consumer =
this.kafkaModule.getConsumer(options.clusterId, new Properties() {{
put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, options.size);
}});

Map<TopicPartition, Long> partitions = getTopicPartitionForSortOldest(topic, options, consumer);
List<Record> list = new ArrayList<>();

if (partitions.size() > 0) {
consumer.assign(partitions.keySet());
partitions.forEach(consumer::seek);
for (Map.Entry<TopicPartition, Long> partition : getTopicPartitionForSortOldest(topic, options).entrySet()) {
Properties properties = new Properties() {{
put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, options.size);
}};

if (log.isTraceEnabled()) {
partitions.forEach((topicPartition, first) ->
try (KafkaConsumer<byte[], byte[]> consumer = this.kafkaModule.getConsumer(options.clusterId, properties)) {
consumer.assign(List.of(partition.getKey()));
consumer.seek(partition.getKey(), partition.getValue());

if (log.isTraceEnabled()) {
log.trace(
"Consume [topic: {}] [partition: {}] [start: {}]",
topicPartition.topic(),
topicPartition.partition(),
first
)
);
}
partition.getKey().topic(),
partition.getKey().partition(),
partition.getValue()
);
}

ConsumerRecords<byte[], byte[]> records = this.poll(consumer);
ConsumerRecords<byte[], byte[]> records = this.poll(consumer);

for (ConsumerRecord<byte[], byte[]> record : records) {
Record current = newRecord(record, options, topic);
if (matchFilters(options, current)) {
filterMessageLength(current);
list.add(current);
for (ConsumerRecord<byte[], byte[]> record : records) {
Record current = newRecord(record, options, topic);
if (matchFilters(options, current)) {
filterMessageLength(current);
list.add(current);
}
}
}
}

consumer.close();

return list.stream()
.sorted(Comparator.comparing(Record::getTimestamp))
.limit(options.size)
@@ -187,51 +183,50 @@ public List<TimeOffset> getOffsetForTime(String clusterId, List<org.akhq.models.
return Debug.call(() -> {
Map<TopicPartition, Long> map = new HashMap<>();

KafkaConsumer<byte[], byte[]> consumer = this.kafkaModule.getConsumer(clusterId);

partitions
.forEach(partition -> map.put(
new TopicPartition(partition.getTopic(), partition.getPartition()),
timestamp
));

List<TimeOffset> collect = consumer.offsetsForTimes(map)
.entrySet()
.stream()
.map(r -> r.getValue() != null ? new TimeOffset(
r.getKey().topic(),
r.getKey().partition(),
r.getValue().offset()
) : null)
.filter(Objects::nonNull)
.collect(Collectors.toList());

consumer.close();

return collect;
try (KafkaConsumer<byte[], byte[]> consumer = this.kafkaModule.getConsumer(clusterId)) {
partitions
.forEach(partition -> map.put(
new TopicPartition(partition.getTopic(), partition.getPartition()),
timestamp
));

List<TimeOffset> collect = consumer.offsetsForTimes(map)
.entrySet()
.stream()
.map(r -> r.getValue() != null ? new TimeOffset(
r.getKey().topic(),
r.getKey().partition(),
r.getValue().offset()
) : null)
.filter(Objects::nonNull)
.collect(Collectors.toList());

return collect;
}
}, "Offsets for " + partitions + " Timestamp " + timestamp, null);
}

public Optional<Record> consumeSingleRecord(String clusterId, Topic topic, Options options) throws ExecutionException, InterruptedException {
return Debug.call(() -> {
Optional<Record> singleRecord = Optional.empty();
KafkaConsumer<byte[], byte[]> consumer = kafkaModule.getConsumer(clusterId, new Properties() {{

Map<TopicPartition, Long> partitions = getTopicPartitionForSortOldest(topic, options);

Properties properties = new Properties() {{
put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
}});
}};

Map<TopicPartition, Long> partitions = getTopicPartitionForSortOldest(topic, options, consumer);
consumer.assign(partitions.keySet());
partitions.forEach(consumer::seek);
try (KafkaConsumer<byte[], byte[]> consumer = kafkaModule.getConsumer(clusterId, properties)) {
consumer.assign(partitions.keySet());
partitions.forEach(consumer::seek);

ConsumerRecords<byte[], byte[]> records = this.poll(consumer);
if(!records.isEmpty()) {
singleRecord = Optional.of(newRecord(records.iterator().next(), options, topic));
ConsumerRecords<byte[], byte[]> records = this.poll(consumer);
if (!records.isEmpty()) {
singleRecord = Optional.of(newRecord(records.iterator().next(), options, topic));
}
}

consumer.close();
return singleRecord;

}, "Consume with options {}", Collections.singletonList(options.toString()));
}

@@ -246,21 +241,27 @@ public static class TimeOffset {
}


private Map<TopicPartition, Long> getTopicPartitionForSortOldest(Topic topic, Options options, KafkaConsumer<byte[], byte[]> consumer) {
return topic
.getPartitions()
.stream()
.map(partition -> getFirstOffsetForSortOldest(consumer, partition, options)
.map(offsetBound -> offsetBound.withTopicPartition(
new TopicPartition(
partition.getTopic(),
partition.getId()
)
))
)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toMap(OffsetBound::getTopicPartition, OffsetBound::getBegin));
private Map<TopicPartition, Long> getTopicPartitionForSortOldest(Topic topic, Options options) {
Properties properties = new Properties() {{
put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
}};

try (KafkaConsumer<byte[], byte[]> consumer = kafkaModule.getConsumer(options.clusterId, properties)) {
return topic
.getPartitions()
.stream()
.map(partition -> getFirstOffsetForSortOldest(consumer, partition, options)
.map(offsetBound -> offsetBound.withTopicPartition(
new TopicPartition(
partition.getTopic(),
partition.getId()
)
))
)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toMap(OffsetBound::getTopicPartition, OffsetBound::getBegin));
}
}

private List<Record> consumeNewest(Topic topic, Options options) {
@@ -655,8 +656,9 @@ public Flowable<Event<SearchEvent>> search(Topic topic, Options options) throws
AtomicInteger matchesCount = new AtomicInteger();

return Flowable.generate(() -> {
Map<TopicPartition, Long> partitions = getTopicPartitionForSortOldest(topic, options);

KafkaConsumer<byte[], byte[]> consumer = this.kafkaModule.getConsumer(options.clusterId);
Map<TopicPartition, Long> partitions = getTopicPartitionForSortOldest(topic, options, consumer);

if (partitions.size() == 0) {
return new SearchState(consumer, null);
@@ -1023,69 +1025,67 @@ public Flowable<Event<TailEvent>> tail(String clusterId, TailOptions options) {
}

public CopyResult copy(Topic fromTopic, String toClusterId, Topic toTopic, List<TopicController.OffsetCopy> offsets, RecordRepository.Options options) {
KafkaConsumer<byte[], byte[]> consumer = this.kafkaModule.getConsumer(
options.clusterId,
new Properties() {{
put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
}}
);

Map<TopicPartition, Long> partitions = getTopicPartitionForSortOldest(fromTopic, options, consumer);
Map<TopicPartition, Long> partitions = getTopicPartitionForSortOldest(fromTopic, options);

Map<TopicPartition, Long> filteredPartitions = partitions.entrySet().stream()
.filter(topicPartitionLongEntry -> offsets.stream()
.anyMatch(offsetCopy -> offsetCopy.getPartition() == topicPartitionLongEntry.getKey().partition()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

int counter = 0;
Properties properties = new Properties() {{
put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
}};

if (filteredPartitions.size() > 0) {
consumer.assign(filteredPartitions.keySet());
filteredPartitions.forEach(consumer::seek);
try (KafkaConsumer<byte[], byte[]> consumer = this.kafkaModule.getConsumer(options.clusterId, properties)) {
int counter = 0;

if (log.isTraceEnabled()) {
filteredPartitions.forEach((topicPartition, first) ->
log.trace(
"Consume [topic: {}] [partition: {}] [start: {}]",
topicPartition.topic(),
topicPartition.partition(),
first
)
);
}
if (filteredPartitions.size() > 0) {
consumer.assign(filteredPartitions.keySet());
filteredPartitions.forEach(consumer::seek);

Map<Partition, Long> partitionsLastOffsetMap = fromTopic.getPartitions()
.stream()
.collect(Collectors.toMap(Function.identity(), Partition::getLastOffset));
if (log.isTraceEnabled()) {
filteredPartitions.forEach((topicPartition, first) ->
log.trace(
"Consume [topic: {}] [partition: {}] [start: {}]",
topicPartition.topic(),
topicPartition.partition(),
first
)
);
}

boolean samePartition = toTopic.getPartitions().size() == fromTopic.getPartitions().size();
Map<Partition, Long> partitionsLastOffsetMap = fromTopic.getPartitions()
.stream()
.collect(Collectors.toMap(Function.identity(), Partition::getLastOffset));

KafkaProducer<byte[], byte[]> producer = kafkaModule.getProducer(toClusterId);
ConsumerRecords<byte[], byte[]> records;
do {
records = this.pollAndFilter(consumer, partitionsLastOffsetMap);
boolean samePartition = toTopic.getPartitions().size() == fromTopic.getPartitions().size();

for (ConsumerRecord<byte[], byte[]> record : records) {
System.out.println(record.offset() + "-" + record.partition());

counter++;
producer.send(new ProducerRecord<>(
toTopic.getName(),
samePartition ? record.partition() : null,
record.timestamp(),
record.key(),
record.value(),
record.headers()
));
}
KafkaProducer<byte[], byte[]> producer = kafkaModule.getProducer(toClusterId);
ConsumerRecords<byte[], byte[]> records;
do {
records = this.pollAndFilter(consumer, partitionsLastOffsetMap);

for (ConsumerRecord<byte[], byte[]> record : records) {
System.out.println(record.offset() + "-" + record.partition());

counter++;
producer.send(new ProducerRecord<>(
toTopic.getName(),
samePartition ? record.partition() : null,
record.timestamp(),
record.key(),
record.value(),
record.headers()
));
}

} while (!records.isEmpty());
} while (!records.isEmpty());

producer.flush();
}
consumer.close();
producer.flush();
}

return new CopyResult(counter);
return new CopyResult(counter);
}
}

/**

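Throughout the diff the trailing consumer.close() calls are replaced with try-with-resources blocks. The fragment below is a minimal sketch of the difference, again with plain kafka-clients rather than AKHQ's kafkaModule helper; getConsumer, bootstrapServers and the poll timeout are illustrative assumptions.

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ConsumerLifecycle {

    // Stand-in for kafkaModule.getConsumer(clusterId): a plain byte[]/byte[] consumer.
    static KafkaConsumer<byte[], byte[]> getConsumer(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        return new KafkaConsumer<>(props);
    }

    // Before: if assign() or poll() throws, close() is never reached and the
    // consumer's network connections and buffers leak.
    static long countWithExplicitClose(String bootstrapServers, List<TopicPartition> partitions) {
        KafkaConsumer<byte[], byte[]> consumer = getConsumer(bootstrapServers);
        consumer.assign(partitions);
        long count = consumer.poll(Duration.ofSeconds(2)).count();
        consumer.close();
        return count;
    }

    // After: try-with-resources closes the consumer on every exit path, including exceptions.
    static long countWithTryWithResources(String bootstrapServers, List<TopicPartition> partitions) {
        try (KafkaConsumer<byte[], byte[]> consumer = getConsumer(bootstrapServers)) {
            consumer.assign(partitions);
            return consumer.poll(Duration.ofSeconds(2)).count();
        }
    }
}

In the copy() path of the diff, producer.flush() also moves inside the consumer's try block, so the flush runs before the consumer is released rather than after an explicit close.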