diff --git a/src/main/java/org/akhq/controllers/TopicController.java b/src/main/java/org/akhq/controllers/TopicController.java
index 943b391c4..03ffd2207 100644
--- a/src/main/java/org/akhq/controllers/TopicController.java
+++ b/src/main/java/org/akhq/controllers/TopicController.java
@@ -305,7 +305,7 @@ public Publisher<Event<SearchRecord>> sse(
         Optional<String> searchByValue,
         Optional<String> searchByHeaderKey,
         Optional<String> searchByHeaderValue
-    ) {
+    ) throws ExecutionException, InterruptedException {
         RecordRepository.Options options = dataSearchOptions(
             cluster,
             topicName,
@@ -319,8 +319,10 @@ public Publisher<Event<SearchRecord>> sse(
             searchByHeaderValue
         );
 
+        Topic topic = topicRepository.findByName(cluster, topicName);
+
         return recordRepository
-            .search(cluster, options)
+            .search(topic, options)
             .map(event -> {
                 SearchRecord searchRecord = new SearchRecord(
                     event.getData().getPercent(),
diff --git a/src/main/java/org/akhq/repositories/RecordRepository.java b/src/main/java/org/akhq/repositories/RecordRepository.java
index e3447a90c..aac018076 100644
--- a/src/main/java/org/akhq/repositories/RecordRepository.java
+++ b/src/main/java/org/akhq/repositories/RecordRepository.java
@@ -540,12 +540,14 @@ public RecordMetadata delete(String clusterId, String topic, Integer partition,
         )).get();
     }
 
-    public Flowable<Event<SearchEvent>> search(String clusterId, Options options) {
+    public Flowable<Event<SearchEvent>> search(Topic topic, Options options) throws ExecutionException, InterruptedException {
         AtomicInteger matchesCount = new AtomicInteger();
 
+        Properties properties = new Properties();
+        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, options.getSize());
+
         return Flowable.generate(() -> {
-            KafkaConsumer<byte[], byte[]> consumer = this.kafkaModule.getConsumer(options.clusterId);
-            Topic topic = topicRepository.findByName(clusterId, options.topic);
+            KafkaConsumer<byte[], byte[]> consumer = this.kafkaModule.getConsumer(options.clusterId, properties);
 
             Map<TopicPartition, Long> partitions = getTopicPartitionForSortOldest(topic, options, consumer);
             if (partitions.size() == 0) {
@@ -571,7 +573,6 @@ public Flowable<Event<SearchEvent>> search(String clusterId, Options options) {
 
             // end
             if (searchEvent == null || searchEvent.emptyPoll == 666) {
-                Topic topic = topicRepository.findByName(clusterId, options.topic);
 
                 emitter.onNext(new SearchEvent(topic).end());
                 emitter.onComplete();
@@ -629,7 +630,7 @@ public Flowable<Event<SearchEvent>> search(String clusterId, Options options) {
 
     private static boolean searchFilter(BaseOptions options, Record record) {
         if (options.getSearch() != null) {
-            if(!search(options.getSearch(), Arrays.asList(record.getKey(), record.getValue()))) return false;
+            return search(options.getSearch(), Arrays.asList(record.getKey(), record.getValue()));
         } else {
             if (options.getSearchByKey() != null) {
                 if (!search(options.getSearchByKey(), Collections.singletonList(record.getKey()))) return false;
@@ -644,7 +645,7 @@ private static boolean searchFilter(BaseOptions options, Record record) {
             }
 
             if (options.getSearchByHeaderValue() != null) {
-                if (!search(options.getSearchByHeaderValue(), record.getHeaders().values())) return false;
+                return search(options.getSearchByHeaderValue(), record.getHeaders().values());
             }
         }
         return true;
diff --git a/src/test/java/org/akhq/repositories/RecordRepositoryTest.java b/src/test/java/org/akhq/repositories/RecordRepositoryTest.java
index 89c2cf0ca..0cb622025 100644
--- a/src/test/java/org/akhq/repositories/RecordRepositoryTest.java
+++ b/src/test/java/org/akhq/repositories/RecordRepositoryTest.java
@@ -2,6 +2,7 @@
 
 import io.micronaut.context.env.Environment;
 import lombok.extern.slf4j.Slf4j;
+import org.akhq.models.Topic;
 import org.codehaus.httpcache4j.uri.URIBuilder;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Disabled;
@@ -26,7 +27,10 @@ public class RecordRepositoryTest extends AbstractTest {
 
     @Inject
     private RecordRepository repository;
-
+
+    @Inject
+    private TopicRepository topicRepository;
+
     @Inject
     private Environment environment;
 
@@ -223,8 +227,10 @@ private int searchAll(RecordRepository.Options options) throws ExecutionExceptio
         AtomicInteger size = new AtomicInteger();
         AtomicBoolean hasNext = new AtomicBoolean(true);
 
+        Topic topic = topicRepository.findByName(options.getClusterId(), options.getTopic());
+
        do {
-            repository.search(KafkaTestCluster.CLUSTER_ID, options).blockingSubscribe(event -> {
+            repository.search(topic, options).blockingSubscribe(event -> {
                 size.addAndGet(event.getData().getRecords().size());
 
                 assertTrue(event.getData().getPercent() >= 0);
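
Usage note (not part of the diff): a minimal sketch of how a caller would use the refactored API, mirroring the TopicController and RecordRepositoryTest changes above. The Topic is now resolved once via topicRepository.findByName() and passed to search(), which configures max.poll.records from options.getSize() instead of looking the topic up on every emission. Names not taken from the diff (clusterId, topicName, options, the log line) are placeholders.

    // hypothetical caller; findByName() and search() now declare ExecutionException/InterruptedException
    Topic topic = topicRepository.findByName(clusterId, topicName);

    recordRepository
        .search(topic, options)                 // Flowable<Event<SearchEvent>>
        .blockingSubscribe(event -> {
            SearchEvent data = event.getData();
            // percent and records per emitted event, as used in searchAll() above
            log.info("progress={}%, batch={} records", data.getPercent(), data.getRecords().size());
        });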