feat(topic-data): improve last record
tchiotludo committed Nov 11, 2020
1 parent b155827 commit 4f6f23b
Showing 5 changed files with 52 additions and 44 deletions.
17 changes: 11 additions & 6 deletions client/src/containers/Topic/TopicList/TopicList.jsx
@@ -183,12 +183,6 @@ class TopicList extends Root {
           internal: topic.internal
         }
         collapseConsumerGroups[topic.name] = false;
-
-        this.getApi(uriTopicLastRecord(selectedCluster, topic.name))
-          .then(value => {
-            tableTopics[topic.name].lastWrite = value.data.timestamp || ''
-            setState()
-          })
       });
       this.setState({collapseConsumerGroups});
       setState()
@@ -203,6 +197,17 @@ class TopicList extends Root {
         });
         setState();
       });
+
+    console.log(encodeURIComponent(topicsName))
+    this.getApi(uriTopicLastRecord(selectedCluster, encodeURIComponent(topicsName)))
+      .then(value => {
+        topics.forEach((topic) => {
+          console.log(topic.name)
+          console.log(tableTopics[topic.name]);
+          tableTopics[topic.name].lastWrite = value.data[topic.name] ? value.data[topic.name].timestamp : ''
+        });
+        setState();
+      });
   }


4 changes: 2 additions & 2 deletions client/src/utils/endpoints.js
@@ -290,8 +290,8 @@ export const uriTopicDataEmpty = (clusterId, topicName) => {
   return `${apiUrl}/${clusterId}/topic/${topicName}/data/empty`;
 }
 
-export const uriTopicLastRecord = (clusterId, topicName) => {
-  return `${apiUrl}/${clusterId}/topic/${topicName}/last-record`;
+export const uriTopicLastRecord = (clusterId, topicList) => {
+  return `${apiUrl}/${clusterId}/topic/last-record?topics=${topicList}`;
 }
 
 export default {
8 changes: 4 additions & 4 deletions src/main/java/org/akhq/controllers/TopicController.java
@@ -211,10 +211,10 @@ public Topic home(String cluster, String topicName) throws ExecutionException, I
         return this.topicRepository.findByName(cluster, topicName);
     }
 
-    @Get("api/{cluster}/topic/{topicName}/last-record")
-    @Operation(tags = {"topic"}, summary = "Retrieve the last record of a topic")
-    public Record lastRecord(String cluster, String topicName) throws ExecutionException, InterruptedException {
-        return this.recordRepository.getLastRecord(cluster, topicName).orElse(new Record());
+    @Get("api/{cluster}/topic/last-record")
+    @Operation(tags = {"topic"}, summary = "Retrieve the last record for a list of topics")
+    public Map<String, Record> lastRecord(String cluster, List<String> topics) throws ExecutionException, InterruptedException {
+        return this.recordRepository.getLastRecord(cluster, topics);
     }
 
     @Get("api/{cluster}/topic/{topicName}/partitions")
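The route change above also changes the response shape: instead of a single Record, the endpoint now returns a JSON object keyed by topic name. The client sends the names as one comma-separated "topics" query value (the JSX change passes encodeURIComponent(topicsName), which comma-joins the array and percent-encodes the commas), and Micronaut's parameter binding converts the decoded value into the controller's List<String> topics. Below is a minimal sketch of calling the new endpoint with Micronaut's HTTP client; the helper class, the passed-in client, and the cluster id are illustrative assumptions, not part of the commit.

    import io.micronaut.core.type.Argument;
    import io.micronaut.http.HttpRequest;
    import io.micronaut.http.client.HttpClient;
    import org.akhq.models.Record;

    import java.net.URLEncoder;
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.Map;

    class LastRecordClient {
        // Hypothetical helper: fetch the last record of several topics in one round trip.
        static Map<String, Record> fetchLastRecords(HttpClient client, String cluster, List<String> topics) {
            // Comma-join the topic names and percent-encode them, mirroring the JSX client.
            String query = URLEncoder.encode(String.join(",", topics), StandardCharsets.UTF_8);
            return client.toBlocking().retrieve(
                HttpRequest.GET("/api/" + cluster + "/topic/last-record?topics=" + query),
                Argument.mapOf(String.class, Record.class)
            );
        }
    }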
58 changes: 32 additions & 26 deletions src/main/java/org/akhq/repositories/RecordRepository.java
@@ -28,6 +28,7 @@
 import org.codehaus.httpcache4j.uri.URIBuilder;
 
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -61,39 +62,43 @@ public class RecordRepository extends AbstractRepository {
     @Value("${akhq.clients-defaults.consumer.properties.max.poll.records:50}")
     protected int maxPollRecords;
 
-    public Optional<Record> getLastRecord(String clusterId, String topicName) throws ExecutionException, InterruptedException {
-        KafkaConsumer<byte[], byte[]> consumer = kafkaModule.getConsumer(clusterId);
-        Topic topic = topicRepository.findByName(clusterId, topicName);
+    public Map<String, Record> getLastRecord(String clusterId, List<String> topicsName) throws ExecutionException, InterruptedException {
+        List<Topic> topics = topicRepository.findByName(clusterId, topicsName);
 
-        List<TopicPartition> topicPartitions = topic
-            .getPartitions()
+        List<TopicPartition> topicPartitions = topics
             .stream()
+            .flatMap(topic -> topic.getPartitions().stream())
             .map(partition -> new TopicPartition(partition.getTopic(), partition.getId()))
             .collect(Collectors.toList());
 
+        KafkaConsumer<byte[], byte[]> consumer = kafkaModule.getConsumer(clusterId, new Properties() {{
+            put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, topicPartitions.size() * 3);
+        }});
         consumer.assign(topicPartitions);
-        Optional<ConsumerRecord<byte[], byte[]>> latestRecord =
-            getLatestRecordFromOffsetDifference(consumer, 1)
-                .or(() -> getLatestRecordFromOffsetDifference(consumer, 2));
 
-        consumer.close();
-        return latestRecord.map(record -> newRecord(record, clusterId));
-    }
-
-    private Optional<ConsumerRecord<byte[], byte[]>> getLatestRecordFromOffsetDifference(KafkaConsumer<byte[], byte[]> consumer, int offsetDifference) {
-        AtomicReference<ConsumerRecord<byte[], byte[]>> latestRecord = new AtomicReference<>();
         consumer
             .endOffsets(consumer.assignment())
-            .forEach((topicPartition, offset) -> consumer.seek(topicPartition, Math.max(0, offset - offsetDifference)));
+            .forEach((topicPartition, offset) -> {
+                consumer.seek(topicPartition, Math.max(0, offset - 2));
+            });
 
-        AtomicLong maxTimestamp = new AtomicLong();
-        this.poll(consumer).forEach(record -> {
-            if (record.timestamp() > maxTimestamp.get()) {
-                maxTimestamp.set(record.timestamp());
-                latestRecord.set(record);
-            }
-        });
-        return Optional.ofNullable(latestRecord.get());
+        ConcurrentHashMap<String, Record> records = new ConcurrentHashMap<>();
+
+        this.poll(consumer)
+            .forEach(record -> {
+                if (!records.containsKey(record.topic())) {
+                    records.put(record.topic(), newRecord(record, clusterId));
+                } else {
+                    Record current = records.get(record.topic());
+                    if (current.getTimestamp().toInstant().toEpochMilli() < record.timestamp()) {
+                        records.put(record.topic(), newRecord(record, clusterId));
+                    }
+                }
+
+            });
+
+        consumer.close();
+        return records;
     }
 
     public List<Record> consume(String clusterId, Options options) throws ExecutionException, InterruptedException {
@@ -414,9 +419,10 @@ private ConsumerRecords<byte[], byte[]> poll(KafkaConsumer<byte[], byte[]> consu
 
     private Record newRecord(ConsumerRecord<byte[], byte[]> record, String clusterId) {
         return new Record(
-            record,
-            this.schemaRegistryRepository.getKafkaAvroDeserializer(clusterId),
-            avroWireFormatConverter.convertValueToWireFormat(record, this.kafkaModule.getRegistryClient(clusterId))
+            record,
+            this.schemaRegistryRepository.getKafkaAvroDeserializer(clusterId),
+            this.customDeserializerRepository.getProtobufToJsonDeserializer(clusterId),
+            avroWireFormatConverter.convertValueToWireFormat(record, this.kafkaModule.getRegistryClient(clusterId))
         );
     }
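The repository rewrite above is the heart of the commit: one consumer is assigned every partition of every requested topic, each partition is rewound to its end offset minus two (a margin that appears to tolerate a tail position with no readable record, such as a transactional control marker; the old code retried with offset differences 1 and then 2 for the same reason), and a single bounded poll is reduced to the newest record per topic. The following is a self-contained sketch of that seek-then-poll technique against the plain kafka-clients API, not AKHQ code; the bootstrap servers, byte-array deserializers, and five-second poll timeout are assumptions for illustration.

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    class LastRecordSketch {
        static Map<String, ConsumerRecord<byte[], byte[]>> lastRecords(String bootstrapServers, List<TopicPartition> partitions) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            // Cap one poll at a few records per partition, as the commit does.
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, partitions.size() * 3);

            Map<String, ConsumerRecord<byte[], byte[]>> newest = new HashMap<>();
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(partitions);
                // Rewind every partition to (endOffset - 2) so one poll returns the tail records.
                consumer.endOffsets(partitions)
                    .forEach((tp, end) -> consumer.seek(tp, Math.max(0, end - 2)));

                // Keep only the record with the highest timestamp per topic.
                for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(5))) {
                    newest.merge(record.topic(), record,
                        (current, candidate) -> candidate.timestamp() > current.timestamp() ? candidate : current);
                }
            }
            return newest;
        }
    }

One trade-off the commit accepts: a single poll is not guaranteed to return records from every assigned partition, so a topic can come back without an entry in the map; the client change above already treats a missing key as an empty lastWrite.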
9 changes: 3 additions & 6 deletions src/test/java/org/akhq/repositories/RecordRepositoryTest.java
@@ -12,10 +12,7 @@
 import org.akhq.models.Record;
 
 import javax.inject.Inject;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Optional;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -218,8 +215,8 @@ public void searchAvro() throws ExecutionException, InterruptedException {
 
     @Test
     void lastRecordTest() throws ExecutionException, InterruptedException {
-        Optional<Record> record = repository.getLastRecord(KafkaTestCluster.CLUSTER_ID, KafkaTestCluster.TOPIC_RANDOM);
-        assertTrue(record.isPresent());;
+        Map<String, Record> record = repository.getLastRecord(KafkaTestCluster.CLUSTER_ID, Collections.singletonList(KafkaTestCluster.TOPIC_RANDOM));
+        assertTrue(record.containsKey(KafkaTestCluster.TOPIC_RANDOM));
     }
 
     private int searchAll(RecordRepository.Options options) throws ExecutionException, InterruptedException {
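The updated test only exercises the single-topic case. A sketch of a multi-topic assertion in the same test class is below; TOPIC_COMPACTED is a hypothetical second fixture name, so substitute whichever topics KafkaTestCluster actually defines.

    @Test
    void lastRecordMultipleTopicsTest() throws ExecutionException, InterruptedException {
        // TOPIC_COMPACTED is an assumed fixture; the point is passing several topics at once.
        Map<String, Record> records = repository.getLastRecord(
            KafkaTestCluster.CLUSTER_ID,
            Arrays.asList(KafkaTestCluster.TOPIC_RANDOM, KafkaTestCluster.TOPIC_COMPACTED)
        );

        // Expect one entry per topic that had a readable record near its end offsets.
        assertTrue(records.containsKey(KafkaTestCluster.TOPIC_RANDOM));
        assertTrue(records.containsKey(KafkaTestCluster.TOPIC_COMPACTED));
    }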
