Skip to content

Commit

Permalink
fix(topic-data): live tail not working
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Nov 18, 2020
1 parent 5b75aa2 commit 025ce12
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 40 deletions.
4 changes: 1 addition & 3 deletions src/main/java/org/akhq/controllers/TailController.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.micronaut.security.annotation.Secured;
import io.reactivex.schedulers.Schedulers;
import io.swagger.v3.oas.annotations.Operation;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand Down Expand Up @@ -36,8 +35,8 @@ public TailController(RecordRepository recordRepository) {
}

@Secured(Role.ROLE_TOPIC_DATA_READ)
@ExecuteOn(TaskExecutors.IO)
@Get(value = "api/{cluster}/tail/sse", produces = MediaType.TEXT_EVENT_STREAM)
@ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"topic data"}, summary = "Tail for data on multiple topic")
public Publisher<Event<TailRecord>> sse(
String cluster,
Expand All @@ -51,7 +50,6 @@ public Publisher<Event<TailRecord>> sse(

return recordRepository
.tail(cluster, options)
.observeOn(Schedulers.io())
.map(event -> {
TailRecord tailRecord = new TailRecord();
tailRecord.offsets = getOffsets(event);
Expand Down
47 changes: 10 additions & 37 deletions src/main/java/org/akhq/controllers/TopicController.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,50 +9,18 @@
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Delete;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.annotation.Post;
import io.micronaut.http.annotation.QueryValue;
import io.micronaut.http.annotation.*;
import io.micronaut.http.sse.Event;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.micronaut.security.annotation.Secured;
import io.reactivex.schedulers.Schedulers;
import io.swagger.v3.oas.annotations.Operation;
import java.time.Instant;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.inject.Inject;

import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.akhq.configs.Role;
import org.akhq.models.AccessControl;
import org.akhq.models.Config;
import org.akhq.models.ConsumerGroup;
import org.akhq.models.LogDir;
import org.akhq.models.Partition;
import org.akhq.models.Record;
import org.akhq.models.Topic;
import org.akhq.models.TopicPartition;
import org.akhq.models.*;
import org.akhq.modules.AbstractKafkaWrapper;
import org.akhq.repositories.AccessControlListRepository;
import org.akhq.repositories.ConfigRepository;
import org.akhq.repositories.ConsumerGroupRepository;
import org.akhq.repositories.RecordRepository;
import org.akhq.repositories.TopicRepository;
import org.akhq.repositories.*;
import org.akhq.utils.Pagination;
import org.akhq.utils.ResultNextList;
import org.akhq.utils.ResultPagedList;
Expand All @@ -61,6 +29,12 @@
import org.codehaus.httpcache4j.uri.URIBuilder;
import org.reactivestreams.Publisher;

import java.time.Instant;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.inject.Inject;

@Slf4j
@Secured(Role.ROLE_TOPIC_READ)
@Controller
Expand Down Expand Up @@ -332,7 +306,6 @@ public Publisher<Event<SearchRecord>> sse(

return recordRepository
.search(cluster, options)
.observeOn(Schedulers.io())
.map(event -> {
SearchRecord searchRecord = new SearchRecord(
event.getData().getPercent(),
Expand Down

0 comments on commit 025ce12

Please sign in to comment.