fix(topic data): search raise an error and never stop consuming message
tchiotludo committed Jan 21, 2021
1 parent 5add70d commit 5d0a139
Showing 3 changed files with 19 additions and 10 deletions.
src/main/java/org/akhq/controllers/TopicController.java (6 changes: 4 additions & 2 deletions)
@@ -305,7 +305,7 @@ public Publisher<Event<SearchRecord>> sse(
         Optional<String> searchByValue,
         Optional<String> searchByHeaderKey,
         Optional<String> searchByHeaderValue
-    ) {
+    ) throws ExecutionException, InterruptedException {
         RecordRepository.Options options = dataSearchOptions(
             cluster,
             topicName,
@@ -319,8 +319,10 @@
             searchByHeaderValue
         );
 
+        Topic topic = topicRepository.findByName(cluster, topicName);
+
         return recordRepository
-            .search(cluster, options)
+            .search(topic, options)
             .map(event -> {
                 SearchRecord searchRecord = new SearchRecord(
                     event.getData().getPercent(),
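The key change in the controller is that the topic lookup now happens before the reactive pipeline is built, which is presumably why the method gains `throws ExecutionException, InterruptedException`: a failing `topicRepository.findByName` now surfaces to the HTTP layer instead of only erroring inside the stream once a client subscribes. A minimal standalone sketch of that eager-versus-deferred difference with RxJava 2 (stand-in names, not AKHQ's code; `Flowable.fromCallable` stands in for the repository's `Flowable.generate`-based stream):

```java
import io.reactivex.Flowable;

public class EagerTopicLookup {
    // Stand-in for topicRepository.findByName(): throws when the topic is unknown.
    static String findByName(String name) {
        if (!"known-topic".equals(name)) {
            throw new IllegalStateException("topic not found: " + name);
        }
        return name;
    }

    public static void main(String[] args) {
        // Old shape: the lookup is deferred into the stream, so an unknown topic
        // only shows up as an onError once somebody subscribes.
        Flowable<String> lazy = Flowable.fromCallable(() -> findByName("missing-topic"));
        lazy.subscribe(
            value -> { },
            error -> System.out.println("deferred error: " + error.getMessage())
        );

        // New shape: the lookup runs before the stream is assembled, so the
        // exception propagates straight to the caller (here: main).
        String topic = findByName("known-topic");
        Flowable.just(topic).blockingSubscribe(System.out::println);
    }
}
```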
src/main/java/org/akhq/repositories/RecordRepository.java (13 changes: 7 additions & 6 deletions)
@@ -540,12 +540,14 @@ public RecordMetadata delete(String clusterId, String topic, Integer partition,
         )).get();
     }
 
-    public Flowable<Event<SearchEvent>> search(String clusterId, Options options) {
+    public Flowable<Event<SearchEvent>> search(Topic topic, Options options) throws ExecutionException, InterruptedException {
         AtomicInteger matchesCount = new AtomicInteger();
 
+        Properties properties = new Properties();
+        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, options.getSize());
+
         return Flowable.generate(() -> {
-            KafkaConsumer<byte[], byte[]> consumer = this.kafkaModule.getConsumer(options.clusterId);
-            Topic topic = topicRepository.findByName(clusterId, options.topic);
+            KafkaConsumer<byte[], byte[]> consumer = this.kafkaModule.getConsumer(options.clusterId, properties);
             Map<TopicPartition, Long> partitions = getTopicPartitionForSortOldest(topic, options, consumer);
 
             if (partitions.size() == 0) {
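The new `Properties` passed to `kafkaModule.getConsumer` cap `max.poll.records` at the requested page size, so a single `poll()` can no longer hand back an arbitrarily large batch; per the commit title, that unbounded polling appears to be what kept the search consuming. A standalone sketch of the same setting with the plain Kafka client (the broker address, group id and topic name are placeholders, not values from AKHQ):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class BoundedPoll {
    public static void main(String[] args) {
        int pageSize = 50; // stands in for options.getSize()

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "akhq-search-sketch");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // The setting added by the commit: one poll() never returns more than a page.
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, pageSize);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
            System.out.println("polled " + records.count() + " record(s), capped at " + pageSize);
        }
    }
}
```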
@@ -571,7 +573,6 @@ public Flowable<Event<SearchEvent>> search(String clusterId, Options options) {
 
                 // end
                 if (searchEvent == null || searchEvent.emptyPoll == 666) {
-                    Topic topic = topicRepository.findByName(clusterId, options.topic);
 
                     emitter.onNext(new SearchEvent(topic).end());
                     emitter.onComplete();
@@ -629,7 +630,7 @@ public Flowable<Event<SearchEvent>> search(String clusterId, Options options) {
     private static boolean searchFilter(BaseOptions options, Record record) {
 
         if (options.getSearch() != null) {
-            if(!search(options.getSearch(), Arrays.asList(record.getKey(), record.getValue()))) return false;
+            return search(options.getSearch(), Arrays.asList(record.getKey(), record.getValue()));
         } else {
             if (options.getSearchByKey() != null) {
                 if (!search(options.getSearchByKey(), Collections.singletonList(record.getKey()))) return false;
@@ -644,7 +645,7 @@ private static boolean searchFilter(BaseOptions options, Record record) {
             }
 
             if (options.getSearchByHeaderValue() != null) {
-                if (!search(options.getSearchByHeaderValue(), record.getHeaders().values())) return false;
+                return search(options.getSearchByHeaderValue(), record.getHeaders().values());
             }
         }
         return true;
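The two `searchFilter` hunks are a cleanup rather than a behaviour change: from what the hunks show, each branch previously fell through to the trailing `return true;`, so `if (!search(...)) return false;` decides exactly the same thing as `return search(...);`. A trivial illustration with a stand-in predicate (not AKHQ's `search()`):

```java
public class SearchFilterEquivalence {
    // Stand-in predicate, not AKHQ's search().
    static boolean oldStyle(String term, String value) {
        if (!value.contains(term)) return false;
        return true;
    }

    static boolean newStyle(String term, String value) {
        return value.contains(term);
    }

    public static void main(String[] args) {
        System.out.println(oldStyle("key", "record-key") == newStyle("key", "record-key")); // true
        System.out.println(oldStyle("xyz", "record-key") == newStyle("xyz", "record-key")); // true
    }
}
```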
src/test/java/org/akhq/repositories/RecordRepositoryTest.java (10 changes: 8 additions & 2 deletions)
@@ -2,6 +2,7 @@
 
 import io.micronaut.context.env.Environment;
 import lombok.extern.slf4j.Slf4j;
+import org.akhq.models.Topic;
 import org.codehaus.httpcache4j.uri.URIBuilder;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Disabled;
@@ -26,7 +27,10 @@
 public class RecordRepositoryTest extends AbstractTest {
     @Inject
     private RecordRepository repository;
-
+
+
+    @Inject
+    private TopicRepository topicRepository;
 
     @Inject
     private Environment environment;
@@ -223,8 +227,10 @@ private int searchAll(RecordRepository.Options options) throws ExecutionException
         AtomicInteger size = new AtomicInteger();
         AtomicBoolean hasNext = new AtomicBoolean(true);
 
+        Topic topic = topicRepository.findByName(options.getClusterId(), options.getTopic());
+
         do {
-            repository.search(KafkaTestCluster.CLUSTER_ID, options).blockingSubscribe(event -> {
+            repository.search(topic, options).blockingSubscribe(event -> {
                 size.addAndGet(event.getData().getRecords().size());
 
                 assertTrue(event.getData().getPercent() >= 0);
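On the test side, the helper now resolves the `Topic` once through the injected `TopicRepository` and passes it to `search(topic, options)`, draining each returned `Flowable` with `blockingSubscribe` before the `do`/`while` decides whether to search again. A minimal RxJava-only sketch of what `blockingSubscribe` does there (no AKHQ types):

```java
import io.reactivex.Flowable;

import java.util.concurrent.atomic.AtomicInteger;

public class BlockingDrain {
    public static void main(String[] args) {
        AtomicInteger size = new AtomicInteger();

        // blockingSubscribe() runs the stream to completion on the calling thread,
        // so the counter is fully updated before the next statement executes.
        Flowable.range(1, 3).blockingSubscribe(size::addAndGet);

        System.out.println("total = " + size.get()); // 1 + 2 + 3 = 6
    }
}
```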
