Skip to content

Commit

Permalink
fix(ui): enhancement: prevent UI freezing with large messages (#970)
Browse files Browse the repository at this point in the history
also add a single messages downloading
Closes #564
  • Loading branch information
EdwinFajardoBarrera committed Jan 5, 2022
1 parent f895f5a commit 7e99277
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 4 deletions.
1 change: 1 addition & 0 deletions application.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ akhq:
topic-data:
size: 50 # max record per page (default: 50)
poll-timeout: 1000 # The time, in milliseconds, spent waiting in poll if data is not available in the buffer.
kafka-max-message-length: 1000000 # Maximum message length, in bytes, sent to the UI when retrieving a list of records. Longer values are truncated.

# Ui Global Options (optional)
ui-options:
Expand Down
14 changes: 13 additions & 1 deletion client/src/components/Table/Table.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ class Table extends Component {
}

renderActions(row) {
const { actions, onAdd, onDetails, onConfig, onDelete, onEdit, onRestart, onShare, idCol } = this.props;
const { actions, onAdd, onDetails, onConfig, onDelete, onEdit, onRestart, onShare, onDownload, idCol } = this.props;

let idColVal = idCol ? row[this.props.idCol] : row.id;

Expand Down Expand Up @@ -374,6 +374,18 @@ class Table extends Component {
</span>
</td>
)}
{actions.find(el => el === constants.TABLE_DOWNLOAD) && (
<td className="khq-row-action khq-row-action-main action-hover">
<span title="Download"
id="download"
onClick={() => {
onDownload && onDownload(row);
}}
>
<i className="fa fa-download" />
</span>
</td>
)}
</>
);
}
Expand Down
31 changes: 28 additions & 3 deletions client/src/containers/Topic/Topic/TopicData/TopicData.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ class TopicData extends Root {
roles: JSON.parse(sessionStorage.getItem('roles')),
canDeleteRecords: false,
percent: 0,
loading: true
loading: true,
canDownload: false
};

searchFilterTypes = [
Expand Down Expand Up @@ -113,6 +114,7 @@ class TopicData extends Root {
() => {
if(query.get('single') !== null) {
this._getSingleMessage(query.get('partition'), query.get('offset'));
this.setState({ canDownload: true })
} else {
this._getMessages();
}
Expand Down Expand Up @@ -346,10 +348,23 @@ class TopicData extends Root {
console.error('Failed to copy: ', err);
}

this.setState({ canDownload: true })

this.props.history.push(pathToShare)
this._getSingleMessage(row.partition, row.offset);
}

_handleDownload({ key, value: data }) {
const hasKey = key && key !== null && key !== 'null';

const a = document.createElement('a');
a.href = URL.createObjectURL( new Blob([data], { type:'text/json' }) );
a.download = `${hasKey ? key : 'file'}.json`;

a.click();
a.remove();
}

_showDeleteModal = deleteMessage => {
this.setState({ showDeleteModal: true, deleteMessage });
};
Expand Down Expand Up @@ -383,7 +398,9 @@ class TopicData extends Root {
messages.forEach(message => {
let messageToPush = {
key: message.key || 'null',
value: message.value || 'null',
value: message.truncated
? message.value + '...\nToo large message. Full body in share button.' || 'null'
: message.value || 'null',
timestamp: message.timestamp,
partition: JSON.stringify(message.partition) || '',
offset: JSON.stringify(message.offset) || '',
Expand Down Expand Up @@ -642,9 +659,14 @@ class TopicData extends Root {
datetime,
isSearching,
canDeleteRecords,
canDownload,
percent,
loading
} = this.state;

let actions = canDeleteRecords ? [constants.TABLE_DELETE, constants.TABLE_SHARE] : [constants.TABLE_SHARE]
if (canDownload) actions.push(constants.TABLE_DOWNLOAD)

let date = moment(datetime);
const { history } = this.props;
const firstColumns = [
Expand Down Expand Up @@ -965,7 +987,10 @@ class TopicData extends Root {
onShare={row => {
this._handleOnShare(row);
}}
actions={canDeleteRecords ? [constants.TABLE_DELETE, constants.TABLE_SHARE] : [constants.TABLE_SHARE]}
onDownload={row => {
this._handleDownload(row);
}}
actions={actions}
onExpand={obj => {
return Object.keys(obj.headers).map(header => {
return (
Expand Down
2 changes: 2 additions & 0 deletions client/src/utils/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export const TABLE_DETAILS = 'details';
export const TABLE_CONFIG = 'config';
export const TABLE_RESTART = 'restart';
export const TABLE_SHARE = 'share';
export const TABLE_DOWNLOAD = 'download'; // row action: download a record as a JSON file (semicolon added for consistency with siblings)

// Tab names/route names
export const CLUSTER = 'cluster';
Expand Down Expand Up @@ -65,6 +66,7 @@ export default {
TABLE_CONFIG,
TABLE_RESTART,
TABLE_SHARE,
TABLE_DOWNLOAD,
CLUSTER,
NODE,
TOPIC,
Expand Down
1 change: 1 addition & 0 deletions docs/docs/configuration/akhq.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ These parameters are the default values used in the topic creation page.
## Topic Data
* `akhq.topic-data.size`: max record per page (default: 50)
* `akhq.topic-data.poll-timeout`: The time, in milliseconds, spent waiting in poll if data is not available in the buffer (default: 1000).
* `akhq.topic-data.kafka-max-message-length`: Maximum message length, in bytes, sent to the UI when retrieving a list of records; longer values are truncated (default: 1000000).

## Ui Settings
### Topics
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/org/akhq/models/Record.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.micronaut.context.annotation.Value;
import kafka.coordinator.group.GroupMetadataManager;
import kafka.coordinator.transaction.TransactionLog;
import kafka.coordinator.transaction.TxnKey;
Expand Down Expand Up @@ -71,12 +72,15 @@ public class Record {
private byte[] bytesValue;

@Getter(AccessLevel.NONE)
@Setter(AccessLevel.NONE)
private String value;

private final List<String> exceptions = new ArrayList<>();

private byte MAGIC_BYTE;

private Boolean truncated;

public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte[] bytesKey, byte[] bytesValue, Map<String, String> headers, Topic topic) {
this.MAGIC_BYTE = schemaRegistryType.getMagicByte();
this.topic = topic;
Expand All @@ -88,6 +92,7 @@ public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte
this.bytesValue = bytesValue;
this.valueSchemaId = getAvroSchemaId(this.bytesValue);
this.headers = headers;
this.truncated = false;
}

public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record, SchemaRegistryType schemaRegistryType, Deserializer kafkaAvroDeserializer,
Expand Down Expand Up @@ -118,6 +123,7 @@ public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record
this.kafkaProtoDeserializer = kafkaProtoDeserializer;
this.avroToJsonSerializer = avroToJsonSerializer;
this.kafkaJsonDeserializer = kafkaJsonDeserializer;
this.truncated = false;
}

public String getKey() {
Expand Down Expand Up @@ -145,6 +151,14 @@ public String getValue() {
return this.value;
}

/**
 * Replaces the record's decoded value.
 *
 * Written explicitly (the field is annotated {@code @Setter(AccessLevel.NONE)})
 * so the repository layer can substitute a truncated preview for oversized
 * messages; see {@code RecordRepository#filterMessageLength}.
 *
 * @param value the new value; may be {@code null}
 */
public void setValue(String value) {
this.value = value;
}

/**
 * Marks whether this record's value was truncated before being sent to the UI.
 *
 * Set to {@code true} by {@code RecordRepository#filterMessageLength} when the
 * value exceeds the configured maximum; the UI appends a "too large" notice
 * to truncated values.
 *
 * @param truncated {@code true} if the value was shortened
 */
public void setTruncated(Boolean truncated) {
this.truncated = truncated;
}

private String convertToString(byte[] payload, Integer schemaId, boolean isKey) {
if (payload == null) {
return null;
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/org/akhq/repositories/RecordRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.kafka.common.header.internals.RecordHeader;
import org.codehaus.httpcache4j.uri.URIBuilder;

import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -76,6 +77,9 @@ public class RecordRepository extends AbstractRepository {
@Value("${akhq.clients-defaults.consumer.properties.max.poll.records:50}")
protected int maxPollRecords;

@Value("${akhq.topic-data.kafka-max-message-length}")
private int maxKafkaMessageLength;

public Map<String, Record> getLastRecord(String clusterId, List<String> topicsName) throws ExecutionException, InterruptedException {
Map<String, Topic> topics = topicRepository.findByName(clusterId, topicsName).stream()
.collect(Collectors.toMap(Topic::getName, Function.identity()));
Expand Down Expand Up @@ -153,6 +157,7 @@ private List<Record> consumeOldest(Topic topic, Options options) {
for (ConsumerRecord<byte[], byte[]> record : records) {
Record current = newRecord(record, options, topic);
if (searchFilter(options, current)) {
filterMessageLength(current);
list.add(current);
}
}
Expand Down Expand Up @@ -311,6 +316,7 @@ private List<Record> consumeNewest(Topic topic, Options options) {
}
Record current = newRecord(record, options, topic);
if (searchFilter(options, current)) {
filterMessageLength(current);
list.add(current);
}
}
Expand Down Expand Up @@ -1268,5 +1274,14 @@ private static class EndOffsetBound {
private final long end;
private final KafkaConsumer<byte[], byte[]> consumer;
}

/**
 * Truncates a record's value when its UTF-8 byte length exceeds the configured
 * {@code akhq.topic-data.kafka-max-message-length}, and flags the record as
 * truncated so the UI can indicate that the body is incomplete.
 *
 * @param record the record to inspect; mutated in place when oversized
 */
private void filterMessageLength(Record record) {
    String value = record.getValue();
    // Tombstones / records without a value have nothing to truncate and would
    // otherwise NPE on getBytes().
    if (value == null) {
        return;
    }
    int bytesLength = value.getBytes(StandardCharsets.UTF_8).length;
    if (bytesLength > maxKafkaMessageLength) {
        // NOTE(review): the preview keeps maxKafkaMessageLength / 1000 *characters*
        // (not bytes) — presumably a deliberately small preview; confirm intent.
        int substringChars = maxKafkaMessageLength / 1000;
        record.setValue(value.substring(0, substringChars));
        record.setTruncated(true);
    }
}
}

1 change: 1 addition & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ akhq:
topic-data:
size: 50
poll-timeout: 1000
kafka-max-message-length: 1000000

security:
default-group: admin
Expand Down

0 comments on commit 7e99277

Please sign in to comment.