From 47d3b363858d976a1b1ed0d54bc39dc34b6597ee Mon Sep 17 00:00:00 2001 From: Raphael Date: Wed, 9 Nov 2022 07:42:55 +0100 Subject: [PATCH] feat(topicdata): support duplicate header keys (#1258) close #1257 Co-authored-by: rafanyan --- .../Topic/Topic/TopicData/TopicData.jsx | 10 ++++---- .../Topic/TopicProduce/TopicProduce.jsx | 7 ++++-- .../org/akhq/controllers/TopicController.java | 4 ++-- src/main/java/org/akhq/models/Record.java | 23 ++++++++++++++++--- .../akhq/repositories/RecordRepository.java | 13 +++++------ .../akhq/controllers/TopicControllerTest.java | 8 +++---- .../repositories/RecordRepositoryTest.java | 2 +- 7 files changed, 43 insertions(+), 24 deletions(-) diff --git a/client/src/containers/Topic/Topic/TopicData/TopicData.jsx b/client/src/containers/Topic/Topic/TopicData/TopicData.jsx index 25e9b612d..4739b723d 100644 --- a/client/src/containers/Topic/Topic/TopicData/TopicData.jsx +++ b/client/src/containers/Topic/Topic/TopicData/TopicData.jsx @@ -452,7 +452,7 @@ class TopicData extends Root { timestamp: message.timestamp, partition: JSON.stringify(message.partition) || '', offset: JSON.stringify(message.offset) || '', - headers: message.headers || {}, + headers: message.headers || [], schema: { key: message.keySchemaId, value: message.valueSchemaId }, exceptions: message.exceptions || [] }; @@ -1018,7 +1018,7 @@ class TopicData extends Root { type: 'text', expand: true, cell: obj => { - return
{Object.keys(obj.headers).length}
; + return
{obj.headers.length}
; } }, { @@ -1078,7 +1078,7 @@ class TopicData extends Root { }} actions={actions} onExpand={obj => { - return Object.keys(obj.headers).map(header => { + return obj.headers.map(header => { return ( - {header} + {header.key} - {obj.headers[header]} + {header.value} ); diff --git a/client/src/containers/Topic/TopicProduce/TopicProduce.jsx b/client/src/containers/Topic/TopicProduce/TopicProduce.jsx index 1850d5b0c..ee6f32ace 100644 --- a/client/src/containers/Topic/TopicProduce/TopicProduce.jsx +++ b/client/src/containers/Topic/TopicProduce/TopicProduce.jsx @@ -185,11 +185,14 @@ class TopicProduce extends Form { keyValueSeparator: formData.keyValueSeparator }; - let headers = {}; + const headers = []; Object.keys(formData).forEach(key => { if (key.includes('hKey')) { let keyNumbers = key.replace(/\D/g, ''); - headers[formData[key]] = formData[`hValue${keyNumbers}`]; + headers.push({ + key: formData[key], + value: formData[`hValue${keyNumbers}`] + }); } }); diff --git a/src/main/java/org/akhq/controllers/TopicController.java b/src/main/java/org/akhq/controllers/TopicController.java index 02e42be22..c170f2687 100644 --- a/src/main/java/org/akhq/controllers/TopicController.java +++ b/src/main/java/org/akhq/controllers/TopicController.java @@ -145,7 +145,7 @@ public List produce( Optional key, Optional partition, Optional timestamp, - Map headers, + List> headers, Optional keySchema, Optional valueSchema, Boolean multiMessage, @@ -300,7 +300,7 @@ public Record deleteRecordApi(String cluster, String topicName, Integer partitio schemaRegistryRepository.getSchemaRegistryType(cluster), Base64.getDecoder().decode(key), null, - new HashMap<>(), + new ArrayList<>(), topicRepository.findByName(cluster, topicName) ); } diff --git a/src/main/java/org/akhq/models/Record.java b/src/main/java/org/akhq/models/Record.java index 53fd6fec6..243eb1939 100644 --- a/src/main/java/org/akhq/models/Record.java +++ b/src/main/java/org/akhq/models/Record.java @@ -29,6 +29,7 @@ import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.*; +import java.util.stream.Collectors; @ToString @EqualsAndHashCode @@ -43,7 +44,7 @@ public class Record { private TimestampType timestampType; private Integer keySchemaId; private Integer valueSchemaId; - private Map headers = new HashMap<>(); + private List> headers = new ArrayList<>(); @JsonIgnore private Deserializer kafkaAvroDeserializer; @JsonIgnore @@ -81,7 +82,7 @@ public class Record { private Boolean truncated; - public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte[] bytesKey, byte[] bytesValue, Map headers, Topic topic) { + public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte[] bytesKey, byte[] bytesValue, List> headers, Topic topic) { this.MAGIC_BYTE = schemaRegistryType.getMagicByte(); this.topic = topic; this.partition = record.partition(); @@ -114,7 +115,8 @@ public Record(SchemaRegistryClient client, ConsumerRecord record this.bytesValue = bytesValue; this.valueSchemaId = getAvroSchemaId(this.bytesValue); for (Header header: record.headers()) { - this.headers.put(header.key(), header.value() != null ? new String(header.value()) : null); + String headerValue = header.value() != null ? new String(header.value()) : null; + this.headers.add(new KeyValue<>(header.key(), headerValue)); } this.kafkaAvroDeserializer = kafkaAvroDeserializer; @@ -264,6 +266,20 @@ private String convertToString(byte[] payload, Integer schemaId, boolean isKey) } } + public Collection getHeadersKeySet() { + return headers + .stream() + .map(KeyValue::getKey) + .collect(Collectors.toList()); + } + + public Collection getHeadersValues() { + return headers + .stream() + .map(KeyValue::getValue) + .collect(Collectors.toList()); + } + private Integer getAvroSchemaId(byte[] payload) { if (topic.isInternalTopic()) { return null; @@ -281,4 +297,5 @@ private Integer getAvroSchemaId(byte[] payload) { } return null; } + } diff --git a/src/main/java/org/akhq/repositories/RecordRepository.java b/src/main/java/org/akhq/repositories/RecordRepository.java index c00b19ab4..1c6a57720 100644 --- a/src/main/java/org/akhq/repositories/RecordRepository.java +++ b/src/main/java/org/akhq/repositories/RecordRepository.java @@ -481,7 +481,7 @@ public List produce( String clusterId, String topic, Optional value, - Map headers, + List> headers, Optional key, Optional partition, Optional timestamp, @@ -509,7 +509,7 @@ public List produce( private RecordMetadata produce( String clusterId, String topic, byte[] value, - Map headers, + List> headers, byte[] key, Optional partition, Optional timestamp @@ -522,8 +522,7 @@ private RecordMetadata produce( timestamp.orElse(null), key, value, - (headers == null ? ImmutableMap.of() : headers) - .entrySet() + headers == null ? Collections.emptyList() : headers .stream() .filter(entry -> StringUtils.isNotEmpty(entry.getKey())) .map(entry -> new RecordHeader( @@ -592,7 +591,7 @@ public RecordMetadata produce( String clusterId, String topic, Optional value, - Map headers, + List> headers, Optional key, Optional partition, Optional timestamp, @@ -743,13 +742,13 @@ private static boolean searchFilter(BaseOptions options, Record record) { } if (options.getSearchByHeaderKey() != null) { - if (!search(options.getSearchByHeaderKey(), record.getHeaders().keySet())) { + if (!search(options.getSearchByHeaderKey(), record.getHeadersKeySet())) { return false; } } if (options.getSearchByHeaderValue() != null) { - return search(options.getSearchByHeaderValue(), record.getHeaders().values()); + return search(options.getSearchByHeaderValue(), record.getHeadersValues()); } } return true; diff --git a/src/test/java/org/akhq/controllers/TopicControllerTest.java b/src/test/java/org/akhq/controllers/TopicControllerTest.java index d1f3193b3..861625f14 100644 --- a/src/test/java/org/akhq/controllers/TopicControllerTest.java +++ b/src/test/java/org/akhq/controllers/TopicControllerTest.java @@ -177,9 +177,9 @@ void produce() { paramMap.put("value", "my-value"); paramMap.put("key", "my-key"); paramMap.put("partition", 1); - paramMap.put("headers", ImmutableMap.of( - "my-header-1", "1", - "my-header-2", "2")); + paramMap.put("headers", List.of( + new KeyValue<>("my-header-1", "1"), + new KeyValue<>("my-header-2", "2"))); paramMap.put("multiMessage", false); List response = this.retrieveList(HttpRequest.POST( CREATE_TOPIC_URL + "/data", paramMap @@ -190,7 +190,7 @@ void produce() { assertEquals("my-value", response.get(0).getValue()); assertEquals(1, response.get(0).getPartition()); assertEquals(2, response.get(0).getHeaders().size()); - assertEquals("1", response.get(0).getHeaders().get("my-header-1")); + assertEquals("1", response.get(0).getHeaders().get(0).getValue()); } @Test diff --git a/src/test/java/org/akhq/repositories/RecordRepositoryTest.java b/src/test/java/org/akhq/repositories/RecordRepositoryTest.java index 9bf7b10f6..8d98832c6 100644 --- a/src/test/java/org/akhq/repositories/RecordRepositoryTest.java +++ b/src/test/java/org/akhq/repositories/RecordRepositoryTest.java @@ -251,7 +251,7 @@ void produceAndConsumeRecordUsingJsonSchema() throws ExecutionException, Interru KafkaTestCluster.CLUSTER_ID, KafkaTestCluster.TOPIC_JSON_SCHEMA, Optional.of(recordAsJsonString), - Collections.emptyMap(), + Collections.emptyList(), Optional.of(keyJsonString), Optional.empty(), Optional.empty(),