Merge branch 'dev' into feat/server_options_api

Paulo authored and Paulo committed Nov 19, 2020
2 parents 0bcccdb + 025ce12 commit 1bfd953
Showing 11 changed files with 82 additions and 71 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -84,7 +84,7 @@ dependencies {
// micronaut
annotationProcessor "io.micronaut:micronaut-inject-java"
annotationProcessor "io.micronaut:micronaut-validation"
annotationProcessor 'io.micronaut.configuration:micronaut-openapi:1.5.2'
annotationProcessor 'io.micronaut.openapi:micronaut-openapi'
implementation "io.micronaut:micronaut-inject"
implementation "io.micronaut:micronaut-validation"
implementation "io.micronaut:micronaut-runtime"
2 changes: 1 addition & 1 deletion client/src/containers/Topic/Topic/Topic.jsx
@@ -49,7 +49,7 @@ class Topic extends Root {
{
clusterId,
topicId,
selectedTab: roles.topic && roles.topic['topic/data/read'] ? tabSelected : 'partitions',
selectedTab: roles.topic && roles.topic['topic/data/read'] ? tabSelected : 'configs',
topicInternal: this.props.location.internal
},
() => {
18 changes: 12 additions & 6 deletions client/src/containers/Topic/TopicList/TopicList.jsx
@@ -383,11 +383,21 @@ class TopicList extends Root {
{colName: 'Partitions', colSpan: partitionCols.length},
{colName: 'Replications', colSpan: replicationCols.length}
];
let onDetailsFunction = undefined;

if(!uiOptions.skipConsumerGroups) {
firstColumns.push({colName: 'Consumer Groups', colSpan: 1});
}

const actions = [constants.TABLE_CONFIG];
if(roles.topic && roles.topic['topic/data/read']) {
actions.push(constants.TABLE_DETAILS);
onDetailsFunction = (id) => `/ui/${selectedCluster}/topic/${id}/data`;
}
if(roles.topic && roles.topic['topic/delete']) {
actions.push(constants.TABLE_DELETE);
}

return (
<div>
<Header title="Topics" history={this.props.history} />
@@ -435,13 +445,9 @@
onDelete={topic => {
this.handleOnDelete(topic);
}}
onDetails={(id) => `/ui/${selectedCluster}/topic/${id}/data`}
onDetails={onDetailsFunction}
onConfig={(id) => `/ui/${selectedCluster}/topic/${id}/configs`}
actions={
roles.topic && roles.topic['topic/delete']
? [constants.TABLE_DELETE, constants.TABLE_DETAILS, constants.TABLE_CONFIG]
: [constants.TABLE_DETAILS, constants.TABLE_CONFIG]
}
actions={actions}
/>

{roles.topic['topic/insert'] && (
2 changes: 1 addition & 1 deletion gradle.properties
@@ -1,4 +1,4 @@
micronautVersion=2.0.1
micronautVersion=2.1.3
confluentVersion=5.5.1
kafkaVersion=2.6.0
lombokVersion=1.18.16
3 changes: 3 additions & 0 deletions helm/akhq/templates/deployment.yaml
@@ -41,6 +41,9 @@ spec:
securityContext:
{{ toYaml .Values.securityContext | indent 8 }}
{{- end }}
{{- if .Values.serviceAccountName }}
serviceAccountName: {{ .Values.serviceAccountName }}
{{- end }}
{{- if .Values.initContainers }}
initContainers:
{{- range $key, $value := .Values.initContainers }}
3 changes: 3 additions & 0 deletions helm/akhq/values.yaml
@@ -53,6 +53,9 @@ extraVolumes: []
# Any extra volume mounts to define for the akhq container
extraVolumeMounts: []

# Specify ServiceAccount for pod
serviceAccountName: null

# Add your own init container or uncomment and modify the example.
initContainers: {}
# create-keystore:
3 changes: 3 additions & 0 deletions src/main/java/org/akhq/controllers/TailController.java
@@ -5,6 +5,8 @@
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.sse.Event;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.micronaut.security.annotation.Secured;
import io.swagger.v3.oas.annotations.Operation;
import lombok.EqualsAndHashCode;
@@ -34,6 +36,7 @@ public TailController(RecordRepository recordRepository) {

@Secured(Role.ROLE_TOPIC_DATA_READ)
@Get(value = "api/{cluster}/tail/sse", produces = MediaType.TEXT_EVENT_STREAM)
@ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"topic data"}, summary = "Tail for data on multiple topic")
public Publisher<Event<TailRecord>> sse(
String cluster,
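The `@ExecuteOn(TaskExecutors.IO)` annotation added to this SSE endpoint (and to `TopicController` below) tells Micronaut to run the handler on the I/O executor instead of the Netty event loop, so blocking Kafka calls made while the stream is set up cannot stall unrelated requests. A minimal sketch of the pattern, assuming a Micronaut application with RxJava 2 on the classpath (as this project has); the controller, route and tick stream below are illustrative only, not code from this commit:

```java
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.sse.Event;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.reactivex.Flowable;
import org.reactivestreams.Publisher;

import java.util.concurrent.TimeUnit;

@Controller("/demo")
public class SseDemoController {

    // @ExecuteOn moves this handler off the Netty event loop onto the named
    // executor, so blocking setup work cannot block other requests.
    @ExecuteOn(TaskExecutors.IO)
    @Get(value = "/tail", produces = MediaType.TEXT_EVENT_STREAM)
    public Publisher<Event<String>> tail() {
        // Emit a server-sent event every second; a real endpoint would wrap
        // whatever (possibly blocking) source it tails.
        return Flowable.interval(1, TimeUnit.SECONDS)
                .map(i -> Event.of("tick " + i));
    }
}
```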
50 changes: 14 additions & 36 deletions src/main/java/org/akhq/controllers/TopicController.java
@@ -9,47 +9,18 @@
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Delete;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.annotation.Post;
import io.micronaut.http.annotation.QueryValue;
import io.micronaut.http.annotation.*;
import io.micronaut.http.sse.Event;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.micronaut.security.annotation.Secured;
import io.swagger.v3.oas.annotations.Operation;
import java.time.Instant;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.inject.Inject;

import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.akhq.configs.Role;
import org.akhq.models.AccessControl;
import org.akhq.models.Config;
import org.akhq.models.ConsumerGroup;
import org.akhq.models.LogDir;
import org.akhq.models.Partition;
import org.akhq.models.Record;
import org.akhq.models.Topic;
import org.akhq.models.TopicPartition;
import org.akhq.models.*;
import org.akhq.modules.AbstractKafkaWrapper;
import org.akhq.repositories.AccessControlListRepository;
import org.akhq.repositories.ConfigRepository;
import org.akhq.repositories.ConsumerGroupRepository;
import org.akhq.repositories.RecordRepository;
import org.akhq.repositories.TopicRepository;
import org.akhq.repositories.*;
import org.akhq.utils.Pagination;
import org.akhq.utils.ResultNextList;
import org.akhq.utils.ResultPagedList;
@@ -58,6 +29,12 @@
import org.codehaus.httpcache4j.uri.URIBuilder;
import org.reactivestreams.Publisher;

import java.time.Instant;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.inject.Inject;

@Slf4j
@Secured(Role.ROLE_TOPIC_READ)
@Controller
@@ -305,6 +282,7 @@ public HttpResponse<?> delete(String cluster, String topicName) throws Execution
}

@Secured(Role.ROLE_TOPIC_DATA_READ)
@ExecuteOn(TaskExecutors.IO)
@Get(value = "api/{cluster}/topic/{topicName}/data/search/{search}", produces = MediaType.TEXT_EVENT_STREAM)
@Operation(tags = {"topic data"}, summary = "Search for data for a topic")
public Publisher<Event<SearchRecord>> sse(
@@ -315,7 +293,7 @@ public Publisher<Event<SearchRecord>> sse(
Optional<RecordRepository.Options.Sort> sort,
Optional<String> timestamp,
Optional<String> search
) throws ExecutionException, InterruptedException {
) {
RecordRepository.Options options = dataSearchOptions(
cluster,
topicName,
62 changes: 40 additions & 22 deletions src/main/java/org/akhq/repositories/RecordRepository.java
@@ -540,36 +540,44 @@ public RecordMetadata delete(String clusterId, String topic, Integer partition,
)).get();
}

public Flowable<Event<SearchEvent>> search(String clusterId, Options options) throws ExecutionException, InterruptedException {
KafkaConsumer<byte[], byte[]> consumer = this.kafkaModule.getConsumer(options.clusterId);
Topic topic = topicRepository.findByName(clusterId, options.topic);
Map<TopicPartition, Long> partitions = getTopicPartitionForSortOldest(topic, options, consumer);

public Flowable<Event<SearchEvent>> search(String clusterId, Options options) {
AtomicInteger matchesCount = new AtomicInteger();

if (partitions.size() == 0) {
return Flowable.just(new SearchEvent(topic).end());
}
return Flowable.generate(() -> {
KafkaConsumer<byte[], byte[]> consumer = this.kafkaModule.getConsumer(options.clusterId);
Topic topic = topicRepository.findByName(clusterId, options.topic);
Map<TopicPartition, Long> partitions = getTopicPartitionForSortOldest(topic, options, consumer);

consumer.assign(partitions.keySet());
partitions.forEach(consumer::seek);
if (partitions.size() == 0) {
return new SearchState(consumer, null);
}

partitions.forEach((topicPartition, first) ->
log.trace(
"Search [topic: {}] [partition: {}] [start: {}]",
topicPartition.topic(),
topicPartition.partition(),
first
)
);
consumer.assign(partitions.keySet());
partitions.forEach(consumer::seek);

partitions.forEach((topicPartition, first) ->
log.trace(
"Search [topic: {}] [partition: {}] [start: {}]",
topicPartition.topic(),
topicPartition.partition(),
first
)
);

return new SearchState(consumer, new SearchEvent(topic));
}, (searchState, emitter) -> {
SearchEvent searchEvent = searchState.getSearchEvent();
KafkaConsumer<byte[], byte[]> consumer = searchState.getConsumer();

return Flowable.generate(() -> new SearchEvent(topic), (searchEvent, emitter) -> {
// end
if (searchEvent.emptyPoll == 666) {
if (searchEvent == null || searchEvent.emptyPoll == 666) {
Topic topic = topicRepository.findByName(clusterId, options.topic);

emitter.onNext(new SearchEvent(topic).end());
emitter.onComplete();
consumer.close();

return searchEvent;
return new SearchState(consumer, searchEvent);
}

SearchEvent currentEvent = new SearchEvent(searchEvent);
@@ -614,7 +622,7 @@ public Flowable<Event<SearchEvent>> search(String clusterId, Options options) th
emitter.onNext(currentEvent.progress(options));
}

return currentEvent;
return new SearchState(consumer, currentEvent);
});
}

@@ -849,6 +857,16 @@ public static class TailState {
private TailEvent tailEvent;
}

@ToString
@EqualsAndHashCode
@Getter
@AllArgsConstructor
public static class SearchState {
private final KafkaConsumer<byte[], byte[]> consumer;
private final SearchEvent searchEvent;
}


@ToString
@EqualsAndHashCode
@Getter
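The largest change in this merge is the `RecordRepository.search` refactor: the `KafkaConsumer` is no longer created eagerly before the stream is built, but inside `Flowable.generate`'s initial-state callable, and it travels between generator invocations in the new `SearchState` holder together with the last `SearchEvent`; on the terminating pass the consumer is closed and a final end event is emitted. That is what lets `search()` and the controller `sse()` methods drop their checked `throws` clauses and, combined with `@ExecuteOn(TaskExecutors.IO)`, moves the blocking setup onto the subscribing I/O thread. A minimal sketch of the `Flowable.generate` state-holder pattern (RxJava 2), using a plain iterator in place of the consumer — the class and method names here are illustrative, not from the commit:

```java
import io.reactivex.Emitter;
import io.reactivex.Flowable;
import io.reactivex.functions.BiFunction;

import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.Callable;

public class GenerateStateSketch {

    // Toy stand-in for SearchState: the resource (here an iterator, in AKHQ the
    // KafkaConsumer) and the last emitted value travel together between calls.
    static final class State {
        final Iterator<Integer> source;
        final Integer lastEmitted;

        State(Iterator<Integer> source, Integer lastEmitted) {
            this.source = source;
            this.lastEmitted = lastEmitted;
        }
    }

    static Flowable<Integer> numbers() {
        // The initial state is built lazily, on subscription; in the commit this
        // is where the consumer is obtained, assigned and seeked, so that setup
        // runs on the subscriber's (I/O) thread rather than the caller's.
        Callable<State> initialState = () ->
                new State(Arrays.asList(1, 2, 3).iterator(), null);

        // The generator emits at most one item per invocation and returns the
        // state to use next time, the same shape as the refactored search().
        BiFunction<State, Emitter<Integer>, State> generator = (state, emitter) -> {
            if (!state.source.hasNext()) {
                emitter.onComplete();              // nothing left: end the stream
                return state;                      // (AKHQ also closes the consumer here)
            }
            Integer next = state.source.next();
            emitter.onNext(next);                  // exactly one emission per call
            return new State(state.source, next);
        };

        return Flowable.generate(initialState, generator);
    }
}
```

Holding the generator in an explicitly typed `BiFunction` variable simply makes it obvious that the value-returning overload of `generate` is the one in play; the commit itself passes the lambdas inline.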
@@ -40,7 +40,7 @@ public void success() {
assertThat(roles, hasItem("topic/read"));
assertThat(roles, hasItem("registry/version/delete"));

assertEquals("test.*", ((List)userDetail.getAttributes("roles", "username").get("topics-filter-regexp")).get(0));
assertEquals("test.*", ((List)userDetail.getAttributes("roles", "username").get("topicsFilterRegexp")).get(0));
}

@Test
@@ -91,7 +91,7 @@ public void success() throws NamingException {
assertThat(roles, hasItem("topic/read"));
assertThat(roles, hasItem("registry/version/delete"));

assertEquals("test.*", ((List)userDetail.getAttributes("roles", "username").get("topics-filter-regexp")).get(0));
assertEquals("test.*", ((List)userDetail.getAttributes("roles", "username").get("topicsFilterRegexp")).get(0));
}

@Test
@@ -127,7 +127,7 @@ public void successWithMultipleLdapGroups() throws NamingException {
assertThat(roles, hasItem("registry/version/delete"));
assertThat(roles, hasItem("topic/data/read"));

List<String> topicsFilterList = (List)(userDetail.getAttributes("roles", "username").get("topics-filter-regexp"));
List<String> topicsFilterList = (List)(userDetail.getAttributes("roles", "username").get("topicsFilterRegexp"));
assertThat(topicsFilterList, hasSize(2));
assertThat(topicsFilterList, hasItem("test.*"));
assertThat(topicsFilterList, hasItem("test-operator.*"));
@@ -166,7 +166,7 @@ public void successWithLdapGroupAndUserRole() throws NamingException {
assertThat(roles, hasItem("registry/version/delete"));
assertThat(roles, hasItem("topic/data/read"));

List<String> topicsFilterList = (List)(userDetail.getAttributes("roles", "username").get("topics-filter-regexp"));
List<String> topicsFilterList = (List)(userDetail.getAttributes("roles", "username").get("topicsFilterRegexp"));
assertThat(topicsFilterList, hasSize(2));
assertThat(topicsFilterList, hasItem("test.*"));
assertThat(topicsFilterList, hasItem("test-operator.*"));
