diff --git a/client/src/containers/Topic/TopicList/TopicList.jsx b/client/src/containers/Topic/TopicList/TopicList.jsx
index 3ed872446..73fa24fd0 100644
--- a/client/src/containers/Topic/TopicList/TopicList.jsx
+++ b/client/src/containers/Topic/TopicList/TopicList.jsx
@@ -183,12 +183,6 @@ class TopicList extends Root {
           internal: topic.internal
         }
         collapseConsumerGroups[topic.name] = false;
-
-        this.getApi(uriTopicLastRecord(selectedCluster, topic.name))
-          .then(value => {
-            tableTopics[topic.name].lastWrite = value.data.timestamp || ''
-            setState()
-          })
       });
       this.setState({collapseConsumerGroups});
       setState()
@@ -203,6 +197,14 @@ class TopicList extends Root {
         });
         setState();
       });
+
+      this.getApi(uriTopicLastRecord(selectedCluster, encodeURIComponent(topicsName)))
+        .then(value => {
+          topics.forEach((topic) => {
+            tableTopics[topic.name].lastWrite = value.data[topic.name] ? value.data[topic.name].timestamp : ''
+          });
+          setState();
+        });
   }
diff --git a/client/src/utils/endpoints.js b/client/src/utils/endpoints.js
index 9b1303993..d99784d26 100644
--- a/client/src/utils/endpoints.js
+++ b/client/src/utils/endpoints.js
@@ -290,8 +290,8 @@ export const uriTopicDataEmpty = (clusterId, topicName) => {
   return `${apiUrl}/${clusterId}/topic/${topicName}/data/empty`;
 }
 
-export const uriTopicLastRecord = (clusterId, topicName) => {
-  return `${apiUrl}/${clusterId}/topic/${topicName}/last-record`;
+export const uriTopicLastRecord = (clusterId, topicList) => {
+  return `${apiUrl}/${clusterId}/topic/last-record?topics=${topicList}`;
 }
 
 export default {
diff --git a/src/main/java/org/akhq/controllers/TopicController.java b/src/main/java/org/akhq/controllers/TopicController.java
index 47117a17b..318cfc89a 100644
--- a/src/main/java/org/akhq/controllers/TopicController.java
+++ b/src/main/java/org/akhq/controllers/TopicController.java
@@ -211,10 +211,10 @@ public Topic home(String cluster, String topicName) throws ExecutionException, InterruptedException {
         return this.topicRepository.findByName(cluster, topicName);
     }
 
-    @Get("api/{cluster}/topic/{topicName}/last-record")
-    @Operation(tags = {"topic"}, summary = "Retrieve the last record of a topic")
-    public Record lastRecord(String cluster, String topicName) throws ExecutionException, InterruptedException {
-        return this.recordRepository.getLastRecord(cluster, topicName).orElse(new Record());
+    @Get("api/{cluster}/topic/last-record")
+    @Operation(tags = {"topic"}, summary = "Retrieve the last record for a list of topics")
+    public Map<String, Record> lastRecord(String cluster, List<String> topics) throws ExecutionException, InterruptedException {
+        return this.recordRepository.getLastRecord(cluster, topics);
     }
 
     @Get("api/{cluster}/topic/{topicName}/partitions")
diff --git a/src/main/java/org/akhq/repositories/RecordRepository.java b/src/main/java/org/akhq/repositories/RecordRepository.java
index 9368b81c0..32faede51 100644
--- a/src/main/java/org/akhq/repositories/RecordRepository.java
+++ b/src/main/java/org/akhq/repositories/RecordRepository.java
@@ -28,6 +28,7 @@
 import org.codehaus.httpcache4j.uri.URIBuilder;
 
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -61,39 +62,44 @@ public class RecordRepository extends AbstractRepository {
     @Value("${akhq.clients-defaults.consumer.properties.max.poll.records:50}")
     protected int maxPollRecords;
 
-    public Optional<Record> getLastRecord(String clusterId, String topicName) throws ExecutionException, InterruptedException {
-        KafkaConsumer<byte[], byte[]> consumer = kafkaModule.getConsumer(clusterId);
-        Topic topic = topicRepository.findByName(clusterId, topicName);
+    public Map<String, Record> getLastRecord(String clusterId, List<String> topicsName) throws ExecutionException, InterruptedException {
+        List<Topic> topics = topicRepository.findByName(clusterId, topicsName);
 
-        List<TopicPartition> topicPartitions = topic
-            .getPartitions()
+        List<TopicPartition> topicPartitions = topics
             .stream()
+            .flatMap(topic -> topic.getPartitions().stream())
             .map(partition -> new TopicPartition(partition.getTopic(), partition.getId()))
             .collect(Collectors.toList());
 
+        // raise max.poll.records so a single poll can cover the tail of every assigned partition
+        KafkaConsumer<byte[], byte[]> consumer = kafkaModule.getConsumer(clusterId, new Properties() {{
+            put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, topicPartitions.size() * 3);
+        }});
         consumer.assign(topicPartitions);
 
-        Optional<ConsumerRecord<byte[], byte[]>> latestRecord =
-            getLatestRecordFromOffsetDifference(consumer, 1)
-                .or(() -> getLatestRecordFromOffsetDifference(consumer, 2));
-
-        consumer.close();
-        return latestRecord.map(record -> newRecord(record, clusterId));
-    }
-    private Optional<ConsumerRecord<byte[], byte[]>> getLatestRecordFromOffsetDifference(KafkaConsumer<byte[], byte[]> consumer, int offsetDifference) {
-        AtomicReference<ConsumerRecord<byte[], byte[]>> latestRecord = new AtomicReference<>();
         consumer
             .endOffsets(consumer.assignment())
-            .forEach((topicPartition, offset) -> consumer.seek(topicPartition, Math.max(0, offset - offsetDifference)));
+            .forEach((topicPartition, offset) -> {
+                consumer.seek(topicPartition, Math.max(0, offset - 2));
+            });
 
-        AtomicLong maxTimestamp = new AtomicLong();
-        this.poll(consumer).forEach(record -> {
-            if (record.timestamp() > maxTimestamp.get()) {
-                maxTimestamp.set(record.timestamp());
-                latestRecord.set(record);
-            }
-        });
-        return Optional.ofNullable(latestRecord.get());
+        ConcurrentHashMap<String, Record> records = new ConcurrentHashMap<>();
+
+        // keep only the most recent record seen for each topic
+        this.poll(consumer)
+            .forEach(record -> {
+                if (!records.containsKey(record.topic())) {
+                    records.put(record.topic(), newRecord(record, clusterId));
+                } else {
+                    Record current = records.get(record.topic());
+                    if (current.getTimestamp().toInstant().toEpochMilli() < record.timestamp()) {
+                        records.put(record.topic(), newRecord(record, clusterId));
+                    }
+                }
+            });
+
+        consumer.close();
+        return records;
     }
 
     public List<Record> consume(String clusterId, Options options) throws ExecutionException, InterruptedException {
@@ -414,9 +419,10 @@ private ConsumerRecords<byte[], byte[]> poll(KafkaConsumer<byte[], byte[]> consumer) {
 
     private Record newRecord(ConsumerRecord<byte[], byte[]> record, String clusterId) {
         return new Record(
-                record,
-                this.schemaRegistryRepository.getKafkaAvroDeserializer(clusterId),
-                avroWireFormatConverter.convertValueToWireFormat(record, this.kafkaModule.getRegistryClient(clusterId))
+            record,
+            this.schemaRegistryRepository.getKafkaAvroDeserializer(clusterId),
+            this.customDeserializerRepository.getProtobufToJsonDeserializer(clusterId),
+            avroWireFormatConverter.convertValueToWireFormat(record, this.kafkaModule.getRegistryClient(clusterId))
         );
     }
diff --git a/src/test/java/org/akhq/repositories/RecordRepositoryTest.java b/src/test/java/org/akhq/repositories/RecordRepositoryTest.java
index c78cf7369..f34db83cf 100644
--- a/src/test/java/org/akhq/repositories/RecordRepositoryTest.java
+++ b/src/test/java/org/akhq/repositories/RecordRepositoryTest.java
@@ -12,10 +12,7 @@
 import org.akhq.models.Record;
 
 import javax.inject.Inject;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Optional;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -218,8 +215,8 @@ public void searchAvro() throws ExecutionException, InterruptedException {
 
     @Test
     void lastRecordTest() throws ExecutionException, InterruptedException {
-        Optional<Record> record = repository.getLastRecord(KafkaTestCluster.CLUSTER_ID, KafkaTestCluster.TOPIC_RANDOM);
-        assertTrue(record.isPresent());;
+        Map<String, Record> record = repository.getLastRecord(KafkaTestCluster.CLUSTER_ID, Collections.singletonList(KafkaTestCluster.TOPIC_RANDOM));
+        assertTrue(record.containsKey(KafkaTestCluster.TOPIC_RANDOM));
     }
 
     private int searchAll(RecordRepository.Options options) throws ExecutionException, InterruptedException {