Skip to content

Commit

Permalink
feat(topicdata): support duplicate header keys (#1258)
Browse files Browse the repository at this point in the history
close #1257 

Co-authored-by: rafanyan <raphael.afanyan@nexthink.com>
  • Loading branch information
raph-af and raphael-afanyan committed Nov 9, 2022
1 parent b504381 commit 47d3b36
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 24 deletions.
10 changes: 5 additions & 5 deletions client/src/containers/Topic/Topic/TopicData/TopicData.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ class TopicData extends Root {
timestamp: message.timestamp,
partition: JSON.stringify(message.partition) || '',
offset: JSON.stringify(message.offset) || '',
headers: message.headers || {},
headers: message.headers || [],
schema: { key: message.keySchemaId, value: message.valueSchemaId },
exceptions: message.exceptions || []
};
Expand Down Expand Up @@ -1018,7 +1018,7 @@ class TopicData extends Root {
type: 'text',
expand: true,
cell: obj => {
return <div className="tail-headers">{Object.keys(obj.headers).length}</div>;
return <div className="tail-headers">{obj.headers.length}</div>;
}
},
{
Expand Down Expand Up @@ -1078,7 +1078,7 @@ class TopicData extends Root {
}}
actions={actions}
onExpand={obj => {
return Object.keys(obj.headers).map(header => {
return obj.headers.map(header => {
return (
<tr
className={'table-sm'}
Expand All @@ -1097,7 +1097,7 @@ class TopicData extends Root {
backgroundColor: '#171819'
}}
>
{header}
{header.key}
</td>
<td
style={{
Expand All @@ -1108,7 +1108,7 @@ class TopicData extends Root {
backgroundColor: '#171819'
}}
>
{obj.headers[header]}
{header.value}
</td>
</tr>
);
Expand Down
7 changes: 5 additions & 2 deletions client/src/containers/Topic/TopicProduce/TopicProduce.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,14 @@ class TopicProduce extends Form {
keyValueSeparator: formData.keyValueSeparator
};

let headers = {};
const headers = [];
Object.keys(formData).forEach(key => {
if (key.includes('hKey')) {
let keyNumbers = key.replace(/\D/g, '');
headers[formData[key]] = formData[`hValue${keyNumbers}`];
headers.push({
key: formData[key],
value: formData[`hValue${keyNumbers}`]
});
}
});

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/akhq/controllers/TopicController.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public List<Record> produce(
Optional<String> key,
Optional<Integer> partition,
Optional<String> timestamp,
Map<String, String> headers,
List<KeyValue<String, String>> headers,
Optional<Integer> keySchema,
Optional<Integer> valueSchema,
Boolean multiMessage,
Expand Down Expand Up @@ -300,7 +300,7 @@ public Record deleteRecordApi(String cluster, String topicName, Integer partitio
schemaRegistryRepository.getSchemaRegistryType(cluster),
Base64.getDecoder().decode(key),
null,
new HashMap<>(),
new ArrayList<>(),
topicRepository.findByName(cluster, topicName)
);
}
Expand Down
23 changes: 20 additions & 3 deletions src/main/java/org/akhq/models/Record.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.stream.Collectors;

@ToString
@EqualsAndHashCode
Expand All @@ -43,7 +44,7 @@ public class Record {
private TimestampType timestampType;
private Integer keySchemaId;
private Integer valueSchemaId;
private Map<String, String> headers = new HashMap<>();
private List<KeyValue<String, String>> headers = new ArrayList<>();
@JsonIgnore
private Deserializer kafkaAvroDeserializer;
@JsonIgnore
Expand Down Expand Up @@ -81,7 +82,7 @@ public class Record {

private Boolean truncated;

public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte[] bytesKey, byte[] bytesValue, Map<String, String> headers, Topic topic) {
public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte[] bytesKey, byte[] bytesValue, List<KeyValue<String, String>> headers, Topic topic) {
this.MAGIC_BYTE = schemaRegistryType.getMagicByte();
this.topic = topic;
this.partition = record.partition();
Expand Down Expand Up @@ -114,7 +115,8 @@ public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record
this.bytesValue = bytesValue;
this.valueSchemaId = getAvroSchemaId(this.bytesValue);
for (Header header: record.headers()) {
this.headers.put(header.key(), header.value() != null ? new String(header.value()) : null);
String headerValue = header.value() != null ? new String(header.value()) : null;
this.headers.add(new KeyValue<>(header.key(), headerValue));
}

this.kafkaAvroDeserializer = kafkaAvroDeserializer;
Expand Down Expand Up @@ -264,6 +266,20 @@ private String convertToString(byte[] payload, Integer schemaId, boolean isKey)
}
}

public Collection<String> getHeadersKeySet() {
return headers
.stream()
.map(KeyValue::getKey)
.collect(Collectors.toList());
}

public Collection<String> getHeadersValues() {
return headers
.stream()
.map(KeyValue::getValue)
.collect(Collectors.toList());
}

private Integer getAvroSchemaId(byte[] payload) {
if (topic.isInternalTopic()) {
return null;
Expand All @@ -281,4 +297,5 @@ private Integer getAvroSchemaId(byte[] payload) {
}
return null;
}

}
13 changes: 6 additions & 7 deletions src/main/java/org/akhq/repositories/RecordRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ public List<RecordMetadata> produce(
String clusterId,
String topic,
Optional<String> value,
Map<String, String> headers,
List<KeyValue<String, String>> headers,
Optional<String> key,
Optional<Integer> partition,
Optional<Long> timestamp,
Expand Down Expand Up @@ -509,7 +509,7 @@ public List<RecordMetadata> produce(
private RecordMetadata produce(
String clusterId,
String topic, byte[] value,
Map<String, String> headers,
List<KeyValue<String, String>> headers,
byte[] key,
Optional<Integer> partition,
Optional<Long> timestamp
Expand All @@ -522,8 +522,7 @@ private RecordMetadata produce(
timestamp.orElse(null),
key,
value,
(headers == null ? ImmutableMap.<String, String>of() : headers)
.entrySet()
headers == null ? Collections.emptyList() : headers
.stream()
.filter(entry -> StringUtils.isNotEmpty(entry.getKey()))
.map(entry -> new RecordHeader(
Expand Down Expand Up @@ -592,7 +591,7 @@ public RecordMetadata produce(
String clusterId,
String topic,
Optional<String> value,
Map<String, String> headers,
List<KeyValue<String, String>> headers,
Optional<String> key,
Optional<Integer> partition,
Optional<Long> timestamp,
Expand Down Expand Up @@ -743,13 +742,13 @@ private static boolean searchFilter(BaseOptions options, Record record) {
}

if (options.getSearchByHeaderKey() != null) {
if (!search(options.getSearchByHeaderKey(), record.getHeaders().keySet())) {
if (!search(options.getSearchByHeaderKey(), record.getHeadersKeySet())) {
return false;
}
}

if (options.getSearchByHeaderValue() != null) {
return search(options.getSearchByHeaderValue(), record.getHeaders().values());
return search(options.getSearchByHeaderValue(), record.getHeadersValues());
}
}
return true;
Expand Down
8 changes: 4 additions & 4 deletions src/test/java/org/akhq/controllers/TopicControllerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,9 @@ void produce() {
paramMap.put("value", "my-value");
paramMap.put("key", "my-key");
paramMap.put("partition", 1);
paramMap.put("headers", ImmutableMap.of(
"my-header-1", "1",
"my-header-2", "2"));
paramMap.put("headers", List.of(
new KeyValue<>("my-header-1", "1"),
new KeyValue<>("my-header-2", "2")));
paramMap.put("multiMessage", false);
List<Record> response = this.retrieveList(HttpRequest.POST(
CREATE_TOPIC_URL + "/data", paramMap
Expand All @@ -190,7 +190,7 @@ void produce() {
assertEquals("my-value", response.get(0).getValue());
assertEquals(1, response.get(0).getPartition());
assertEquals(2, response.get(0).getHeaders().size());
assertEquals("1", response.get(0).getHeaders().get("my-header-1"));
assertEquals("1", response.get(0).getHeaders().get(0).getValue());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ void produceAndConsumeRecordUsingJsonSchema() throws ExecutionException, Interru
KafkaTestCluster.CLUSTER_ID,
KafkaTestCluster.TOPIC_JSON_SCHEMA,
Optional.of(recordAsJsonString),
Collections.emptyMap(),
Collections.emptyList(),
Optional.of(keyJsonString),
Optional.empty(),
Optional.empty(),
Expand Down

0 comments on commit 47d3b36

Please sign in to comment.