Skip to content

Commit

Permalink
Kafka: Add possibility to push null avro message with a string key (#372
Browse files Browse the repository at this point in the history
)
  • Loading branch information
alexandrepa authored Oct 7, 2024
1 parent d078b1a commit ce8942a
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -387,38 +387,41 @@ public boolean isJsonMessageType(String name) {
}

private ProducerRecord<GenericRecord, GenericRecord> mapToAvroKeyMessageRecord(Schema schemaMessage, Schema schemaKey, String topic, Map<?, Object> avroRecord) {
GenericRecordBuilder genericRecordBuilderMessage = new GenericRecordBuilder(schemaMessage);
if (avroRecord.get("value") != null) {
((Map<String, Object>) avroRecord.get("value"))
.forEach((fieldName, value) -> genericRecordBuilderMessage.set(fieldName, wrapIn(value, schemaMessage.getField(fieldName).schema())));
}
GenericRecord genericRecordMessage = buildGenericRecordMessage(schemaMessage, avroRecord);

GenericRecordBuilder genericRecordBuilderKey = new GenericRecordBuilder(schemaKey);
Map<String, Object> keyValue = (Map<String, Object>) avroRecord.get("key");
keyValue.forEach((fieldName, value) -> genericRecordBuilderKey.set(fieldName, wrapIn(value, schemaKey.getField(fieldName).schema())));
GenericData.Record recordKey = genericRecordBuilderKey.build();

ProducerRecord<GenericRecord, GenericRecord> producerRecord = new ProducerRecord<>(topic, recordKey, genericRecordBuilderMessage.build());
ProducerRecord<GenericRecord, GenericRecord> producerRecord = new ProducerRecord<>(topic, recordKey, genericRecordMessage);
((Map<String, String>) avroRecord.get("headers"))
.forEach((key, value) -> producerRecord.headers().add(key, value.getBytes(UTF_8)));

return producerRecord;
}

private ProducerRecord<String, GenericRecord> mapToAvroRecord(Schema schema, String topic, Map<String, Object> avroRecord) {
GenericRecordBuilder genericRecordBuilderMessage = new GenericRecordBuilder(schema);
((Map<String, Object>) avroRecord.get("value"))
.forEach((fieldName, value) -> genericRecordBuilderMessage.set(fieldName, wrapIn(value, schema.getField(fieldName).schema())));
GenericRecord genericRecordMessage = buildGenericRecordMessage(schema, avroRecord);

String messageKey = (String) avroRecord.get("key");

ProducerRecord<String, GenericRecord> producerRecord = new ProducerRecord<>(topic, messageKey, genericRecordBuilderMessage.build());
ProducerRecord<String, GenericRecord> producerRecord = new ProducerRecord<>(topic, messageKey, genericRecordMessage);
((Map<String, String>) avroRecord.get("headers"))
.forEach((key, value) -> producerRecord.headers().add(key, value.getBytes(UTF_8)));

return producerRecord;
}

private @NotNull GenericRecord buildGenericRecordMessage(Schema schemaMessage, Map<?, Object> avroRecord) {
GenericRecordBuilder genericRecordBuilderMessage = new GenericRecordBuilder(schemaMessage);
if (avroRecord.get("value") != null) {
((Map<String, Object>) avroRecord.get("value"))
.forEach((fieldName, value) -> genericRecordBuilderMessage.set(fieldName, wrapIn(value, schemaMessage.getField(fieldName).schema())));
}
return genericRecordBuilderMessage.build();
}

private Object wrapIn(Object value, Schema schema) {
if (schema.getType().equals(Schema.Type.RECORD)) {
GenericRecordBuilder genericRecordBuilder = new GenericRecordBuilder(schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,32 @@ Feature: to interact with a spring boot service having a connection to a kafka q
- "?e .*received user with messageKey a-key on users-with-key-0@0: \\{\"id\": 1, \"name\": \"bob\"}"
"""

Scenario: we can push a message with a key in a kafka topic
Given this avro schema:
"""yml
type: record
name: user
fields:
- name: id
type: ["null", "int"]
default: null
- name: name
type: ["null", "string"]
default: null
"""
When these users are consumed from the users-with-key topic:
"""yml
headers:
uuid: some-id
value: null
key: a-key
"""
Then we have received 1 messages on the topic users-with-key
And the logs contain:
"""yml
- "?e .*received user with messageKey a-key on users-with-key-0@0: \\{\"id\": null, \"name\": null}"
"""

Scenario: we can push a message with an avro key in a kafka topic
Given this avro schema:
"""yml
Expand Down

0 comments on commit ce8942a

Please sign in to comment.