Skip to content

Commit

Permalink
feat(topicdata): adding download all messages feature (#1628)
Browse files Browse the repository at this point in the history
close #1487
  • Loading branch information
AlexisSouquiere committed Jan 16, 2024
1 parent 04c2d1b commit c89db95
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 7 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ dependencies {
implementation 'com.google.guava:guava:32.1.3-jre'
implementation 'com.google.code.gson:gson:2.10.1'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'

// avro
implementation 'org.apache.avro:avro:1.11.0'
Expand Down
56 changes: 56 additions & 0 deletions client/src/containers/Topic/Topic/TopicData/TopicData.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
uriSchemaId,
uriTopicData,
uriTopicDataDelete,
uriTopicDataDownload,
uriTopicDataSearch,
uriTopicDataSingleRecord,
uriTopicsPartitions,
Expand Down Expand Up @@ -63,6 +64,7 @@ class TopicData extends Root {
recordCount: 0,
showFilters: '',
showDeleteModal: false,
showDownloadModal: false,
deleteMessage: '',
compactMessageToDelete: '',
selectedCluster: this.props.clusterId,
Expand Down Expand Up @@ -528,6 +530,28 @@ class TopicData extends Root {
return tableMessages;
};

_downloadAllMatchingFilters = () => {
const { selectedCluster, selectedTopic } = this.state;

const filters = this._buildFilters();

this.getApi(uriTopicDataDownload(selectedCluster, selectedTopic, filters)).then(response => {
const a = document.createElement('a');
const type = 'application/json';
a.href = URL.createObjectURL(
new Blob([JSON.stringify(response.data, null, 2)], { type: type, endings: 'native' })
);
a.download = `${selectedTopic}.json`;

a.click();
a.remove();

this.setState({
showDownloadModal: false
});
});
};

_getNextPageOffsets = nextPage => {
let aux = nextPage.substring(nextPage.indexOf('after=') + 6);
let afterString = aux.substring(0, aux.indexOf('&'));
Expand Down Expand Up @@ -1026,6 +1050,26 @@ class TopicData extends Root {
</Dropdown.Menu>
</Dropdown>
</li>
<div
style={{ borderLeft: '1px solid ', padding: 0, margin: '0 10px' }}
className="nav-link"
></div>
<li>
<button
type="button"
aria-label="Toggle navigation"
onClick={() => {
this.setState({
showDownloadModal: true
});
}}
className="nav-link"
style={{ backgroundColor: 'transparent', borderColor: 'transparent' }}
>
<i className="fa fa-fw fa fa-download" aria-hidden="true" />
Download query result
</button>
</li>
</ul>
</div>
</nav>
Expand Down Expand Up @@ -1257,6 +1301,18 @@ class TopicData extends Root {
handleConfirm={this._deleteCompactMessage}
message={this.state.deleteMessage}
/>

<ConfirmModal
show={this.state.showDownloadModal}
handleCancel={() =>
this.setState({
showDownloadModal: false
})
}
handleConfirm={this._downloadAllMatchingFilters}
message="Do you want to download all the messages matching your filters ? If you did not set any, it will
download all the topic messages. For large topics, it can be slow and lead to high memory consumption."
/>
</React.Fragment>
);
}
Expand Down
8 changes: 8 additions & 0 deletions client/src/utils/endpoints.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ export const uriTopicDataSearch = (clusterId, topicId, filters, offsets) => {
return uri;
};

export const uriTopicDataDownload = (clusterId, topicId, filters) => {
let uri = `${apiUrl}/${clusterId}/topic/${topicId}/data/download`;
if (filters) {
uri = uri + `?${filters}`;
}
return uri;
};

export const uriTopicDataSingleRecord = (clusterId, topicId, partition, offset) => {
let uri = `${apiUrl}/${clusterId}/topic/${topicId}/data/record/${partition}/${offset}`;
return uri;
Expand Down
102 changes: 101 additions & 1 deletion src/main/java/org/akhq/controllers/TopicController.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.akhq.controllers;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.micronaut.context.annotation.Value;
Expand All @@ -11,11 +12,13 @@
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.*;
import io.micronaut.http.server.types.files.StreamedFile;
import io.micronaut.http.sse.Event;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.micronaut.security.annotation.Secured;
import io.micronaut.security.rules.SecurityRule;
import io.reactivex.schedulers.Schedulers;
import io.swagger.v3.oas.annotations.Operation;
import lombok.*;
import org.akhq.configs.security.Role;
Expand All @@ -32,10 +35,11 @@
import org.reactivestreams.Publisher;
import org.akhq.models.Record;

import java.io.IOException;
import java.io.*;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import jakarta.inject.Inject;
Expand Down Expand Up @@ -437,6 +441,102 @@ public Publisher<Event<SearchRecord>> sse(
});
}

@AKHQSecured(resource = Role.Resource.TOPIC_DATA, action = Role.Action.READ)
@ExecuteOn(TaskExecutors.IO)
@Get(value = "api/{cluster}/topic/{topicName}/data/download")
@Operation(tags = {"topic data download"}, summary = "Download data for a topic")
public HttpResponse<StreamedFile> download(
String cluster,
String topicName,
Optional<String> after,
Optional<Integer> partition,
Optional<RecordRepository.Options.Sort> sort,
Optional<String> timestamp,
Optional<String> endTimestamp,
Optional<String> searchByKey,
Optional<String> searchByValue,
Optional<String> searchByHeaderKey,
Optional<String> searchByHeaderValue,
Optional<String> searchByKeySubject,
Optional<String> searchByValueSubject
) throws ExecutionException, InterruptedException, IOException {
checkIfClusterAndResourceAllowed(cluster, topicName);

RecordRepository.Options options = dataSearchOptions(
cluster,
topicName,
after,
partition,
sort,
timestamp,
endTimestamp,
searchByKey,
searchByValue,
searchByHeaderKey,
searchByHeaderValue,
searchByKeySubject,
searchByValueSubject
);

// Set in MAX_POLL_RECORDS_CONFIG, big number increases speed
options.setSize(10000);

Topic topic = topicRepository.findByName(cluster, topicName);

ObjectMapper mapper = new ObjectMapper();
// For ZonedDatetime serialization
mapper.findAndRegisterModules();

PipedOutputStream out = new PipedOutputStream();
PipedInputStream in = new PipedInputStream(out);

new Thread(() -> {
try (out) {
out.write('[');

AtomicBoolean continueSearch = new AtomicBoolean(true);
AtomicBoolean isFirstBatch = new AtomicBoolean(true);

while(continueSearch.get()) {
recordRepository
.search(topic, options)
.observeOn(Schedulers.io())
.map(event -> {
if (!event.getData().getRecords().isEmpty()) {
if (!isFirstBatch.getAndSet(false)) {
// Add a comma between batches records
out.write(',');
}

byte[] bytes = mapper.writeValueAsString(event.getData().getRecords()).getBytes();
// Remove start [ and end ] to concatenate records in the same array
out.write(Arrays.copyOfRange(bytes, 1, bytes.length - 1));

} else {
// No more records, add the end array ] and stop here
if (event.getData().getEmptyPoll() == 1) {
out.write(']');
out.flush();
continueSearch.set(false);
}
else if (event.getData().getAfter() != null) {
// Continue to search from the last offsets
options.setAfter(event.getData().getAfter());
}
}

return 0;
}).blockingSubscribe();
}
} catch (IOException | ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}).start();

return HttpResponse.ok(new StreamedFile(in, MediaType.APPLICATION_JSON_TYPE));
}


@AKHQSecured(resource = Role.Resource.TOPIC_DATA, action = Role.Action.READ)
@Get("api/{cluster}/topic/{topicName}/data/record/{partition}/{offset}")
@Operation(tags = {"topic data"}, summary = "Get a single record by partition and offset")
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/org/akhq/models/Record.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
@Getter
@NoArgsConstructor
public class Record {
@JsonIgnore
private Topic topic;
private int partition;
private long offset;
Expand Down Expand Up @@ -78,10 +79,13 @@ public class Record {
@Setter(AccessLevel.NONE)
private String value;

@JsonIgnore
private final List<String> exceptions = new ArrayList<>();

@Getter(AccessLevel.NONE)
private byte MAGIC_BYTE;

@JsonIgnore
private Boolean truncated;

public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte[] bytesKey, byte[] bytesValue, List<KeyValue<String, String>> headers, Topic topic) {
Expand Down Expand Up @@ -276,13 +280,15 @@ private String convertToString(byte[] payload, Integer schemaId, boolean isKey)
}
}

@JsonIgnore
public Collection<String> getHeadersKeySet() {
return headers
.stream()
.map(KeyValue::getKey)
.collect(Collectors.toList());
}

@JsonIgnore
public Collection<String> getHeadersValues() {
return headers
.stream()
Expand Down
16 changes: 10 additions & 6 deletions src/main/java/org/akhq/repositories/RecordRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ public Flowable<Event<SearchEvent>> search(Topic topic, Options options) throws
KafkaConsumer<byte[], byte[]> consumer = searchState.getConsumer();

// end
if (searchEvent == null || searchEvent.emptyPoll == 666) {
if (searchEvent == null || searchEvent.emptyPoll >= 1) {
emitter.onNext(new SearchEvent(topic).end(searchEvent != null ? searchEvent.after: null));
emitter.onComplete();
consumer.close();
Expand All @@ -701,7 +701,7 @@ public Flowable<Event<SearchEvent>> search(Topic topic, Options options) throws
ConsumerRecords<byte[], byte[]> records = this.poll(consumer);

if (records.isEmpty()) {
currentEvent.emptyPoll++;
currentEvent.emptyPoll = 1;
} else {
currentEvent.emptyPoll = 0;
}
Expand All @@ -728,13 +728,17 @@ public Flowable<Event<SearchEvent>> search(Topic topic, Options options) throws

currentEvent.records = list;

if (currentEvent.emptyPoll >= 1) {
currentEvent.emptyPoll = 666;
// No more records, poll was empty: stop here
if (currentEvent.emptyPoll == 1) {
emitter.onNext(currentEvent.end(searchEvent.getAfter()));
} else if (matchesCount.get() >= options.getSize()) {
}
// More records than expected, send the records and then stop
else if (matchesCount.get() >= options.getSize()) {
currentEvent.emptyPoll = 666;
emitter.onNext(currentEvent.progress(options));
} else {
}
// Continue to search
else {
emitter.onNext(currentEvent.progress(options));
}

Expand Down

0 comments on commit c89db95

Please sign in to comment.