Configurable json inclusions in Avro to Json serializing #799

Merged (2 commits) on Aug 30, 2021
4 changes: 4 additions & 0 deletions README.md
@@ -275,6 +275,10 @@ akhq:
#### Pagination
* `akhq.pagination.page-size` number of topics per page (default : 25)

#### Avro Serializer
* `akhq.avro-serializer.json.serialization.inclusions` is a list of ObjectMapper serialization inclusions used when converting Avro messages to a
more readable JSON format in the UI. Supports the enum values of Jackson's JsonInclude.Include

#### Topic List
* `akhq.topic.internal-regexps` is list of regexp to be considered as internal (internal topic can't be deleted or updated)
* `akhq.topic.stream-regexps` is list of regexp to be considered as internal stream topic
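For context on what these inclusion values do, here is a minimal Jackson sketch (illustration only, not part of the PR): with `NON_EMPTY` inclusion, null and empty fields are omitted from the output, which is what makes the rendered Avro JSON more compact in the UI. The `SampleRecord` class is hypothetical.

```java
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.List;

public class InclusionDemo {
    // Hypothetical payload, used only to illustrate the inclusion settings.
    static class SampleRecord {
        public String name = "akhq";
        public String comment = null;         // dropped by NON_NULL / NON_EMPTY
        public List<String> tags = List.of(); // dropped by NON_EMPTY
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper()
                .setSerializationInclusion(Include.NON_EMPTY);
        // Expected output: {"name":"akhq"}
        System.out.println(mapper.writeValueAsString(new SampleRecord()));
    }
}
```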
7 changes: 7 additions & 0 deletions application.example.yml
@@ -131,6 +131,13 @@ akhq:
page-size: 25 # number of elements per page (default : 25)
threads: 16 # Number of parallel threads to resolve page

# Configure avro-to-json serializer
avro-serializer:
json.serialization.inclusions: # ObjectMapper serialization inclusions used for avro-to-json conversion for display in the UI.
# Supports the enum values of Jackson's JsonInclude.Include
- NON_NULL
- NON_EMPTY

# Topic list display options (optional)
topic:
retention: 172800000 # default retention period when creating topic
9 changes: 6 additions & 3 deletions src/main/java/org/akhq/models/Record.java
@@ -45,6 +45,8 @@ public class Record {
private Deserializer kafkaProtoDeserializer;
@JsonIgnore
private Deserializer kafkaJsonDeserializer;
@JsonIgnore
private AvroToJsonSerializer avroToJsonSerializer;

@JsonIgnore
private SchemaRegistryClient client;
@@ -85,7 +87,7 @@ public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte
}

public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record, SchemaRegistryType schemaRegistryType, Deserializer kafkaAvroDeserializer,
Deserializer kafkaJsonDeserializer, Deserializer kafkaProtoDeserializer,
Deserializer kafkaJsonDeserializer, Deserializer kafkaProtoDeserializer, AvroToJsonSerializer avroToJsonSerializer,
ProtobufToJsonDeserializer protobufToJsonDeserializer, byte[] bytesValue, Topic topic) {
if (schemaRegistryType == SchemaRegistryType.TIBCO) {
this.MAGIC_BYTE = (byte) 0x80;
@@ -109,6 +111,7 @@ public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record
this.kafkaAvroDeserializer = kafkaAvroDeserializer;
this.protobufToJsonDeserializer = protobufToJsonDeserializer;
this.kafkaProtoDeserializer = kafkaProtoDeserializer;
this.avroToJsonSerializer = avroToJsonSerializer;
this.kafkaJsonDeserializer = kafkaJsonDeserializer;
}

@@ -154,7 +157,7 @@ private String convertToString(byte[] payload, Integer schemaId, boolean isKey)
}

Message dynamicMessage = (Message)toType;
return AvroToJsonSerializer.getMapper().readTree(JsonFormat.printer().print(dynamicMessage)).toString();
return avroToJsonSerializer.getMapper().readTree(JsonFormat.printer().print(dynamicMessage)).toString();
} else if ( schema.schemaType().equals(JsonSchema.TYPE) ) {
toType = kafkaJsonDeserializer.deserialize(topic.getName(), payload);
if ( !(toType instanceof JsonNode) ) {
@@ -173,7 +176,7 @@ private String convertToString(byte[] payload, Integer schemaId, boolean isKey)
}

GenericRecord record = (GenericRecord) toType;
return AvroToJsonSerializer.toJson(record);
return avroToJsonSerializer.toJson(record);

} catch (Exception exception) {
this.exceptions.add(exception.getMessage());
6 changes: 6 additions & 0 deletions src/main/java/org/akhq/repositories/RecordRepository.java
@@ -18,6 +18,7 @@
import org.akhq.models.Topic;
import org.akhq.modules.AvroSerializer;
import org.akhq.modules.KafkaModule;
import org.akhq.utils.AvroToJsonSerializer;
import org.akhq.utils.Debug;
import org.apache.kafka.clients.admin.DeletedRecords;
import org.apache.kafka.clients.admin.RecordsToDelete;
@@ -49,6 +50,9 @@ public class RecordRepository extends AbstractRepository {
@Inject
private ConfigRepository configRepository;

@Inject
private AvroToJsonSerializer avroToJsonSerializer;

@Inject
private TopicRepository topicRepository;

@@ -433,6 +437,7 @@ private Record newRecord(ConsumerRecord<byte[], byte[]> record, String clusterId
this.schemaRegistryRepository.getKafkaAvroDeserializer(clusterId),
schemaRegistryType == SchemaRegistryType.CONFLUENT? this.schemaRegistryRepository.getKafkaJsonDeserializer(clusterId):null,
schemaRegistryType == SchemaRegistryType.CONFLUENT? this.schemaRegistryRepository.getKafkaProtoDeserializer(clusterId):null,
this.avroToJsonSerializer,
this.customDeserializerRepository.getProtobufToJsonDeserializer(clusterId),
avroWireFormatConverter.convertValueToWireFormat(record, client,
this.schemaRegistryRepository.getSchemaRegistryType(clusterId)),
@@ -450,6 +455,7 @@ private Record newRecord(ConsumerRecord<byte[], byte[]> record, BaseOptions opti
this.schemaRegistryRepository.getKafkaAvroDeserializer(options.clusterId),
schemaRegistryType == SchemaRegistryType.CONFLUENT? this.schemaRegistryRepository.getKafkaJsonDeserializer(options.clusterId):null,
schemaRegistryType == SchemaRegistryType.CONFLUENT? this.schemaRegistryRepository.getKafkaProtoDeserializer(options.clusterId):null,
this.avroToJsonSerializer,
this.customDeserializerRepository.getProtobufToJsonDeserializer(options.clusterId),
avroWireFormatConverter.convertValueToWireFormat(record, client,
this.schemaRegistryRepository.getSchemaRegistryType(options.clusterId)),
2 changes: 1 addition & 1 deletion src/main/java/org/akhq/utils/AvroDeserializer.java
@@ -26,7 +26,7 @@ public class AvroDeserializer {
private static final String TIME_MICROS = "time-micros";
private static final String TIMESTAMP_MILLIS = "timestamp-millis";
private static final String TIMESTAMP_MICROS = "timestamp-micros";

private static final DecimalConversion DECIMAL_CONVERSION = new DecimalConversion();
private static final UUIDConversion UUID_CONVERSION = new UUIDConversion();
private static final DateConversion DATE_CONVERSION = new DateConversion();
50 changes: 33 additions & 17 deletions src/main/java/org/akhq/utils/AvroToJsonSerializer.java
@@ -1,32 +1,48 @@
package org.akhq.utils;

import com.fasterxml.jackson.annotation.JsonInclude;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import javax.inject.Singleton;

import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.micronaut.context.annotation.Value;
import io.micronaut.core.annotation.Nullable;
import org.apache.avro.generic.GenericRecord;

import java.io.IOException;
import java.util.Map;
import java.util.TimeZone;

@Singleton
public class AvroToJsonSerializer {
private static final ObjectMapper MAPPER = new ObjectMapper()
.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
.setSerializationInclusion(JsonInclude.Include.NON_EMPTY)
.registerModule(new JavaTimeModule())
.registerModule(new Jdk8Module())
.setTimeZone(TimeZone.getDefault());
private final ObjectMapper mapper;

public static String toJson(GenericRecord record) throws IOException {
Map<String, Object> map = AvroDeserializer.recordDeserializer(record);
public AvroToJsonSerializer(@Value("${akhq.avro-serializer.json.serialization.inclusions}") @Nullable List<Include> jsonInclusions) {
Owner

You are changing the default configuration; please add the old default to application.yml.

Contributor Author

Added the old default to src/main/resources/application.yml

  avro-serializer:
    json.serialization.inclusions:
      - NON_NULL
      - NON_EMPTY

List<Include> inclusions = jsonInclusions != null ? jsonInclusions : Collections.emptyList();
this.mapper = createObjectMapper(inclusions);
}

return MAPPER.writeValueAsString(map);
private ObjectMapper createObjectMapper(List<Include> jsonInclusions) {
ObjectMapper objectMapper = new ObjectMapper()
.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
.registerModule(new JavaTimeModule())
.registerModule(new Jdk8Module())
.setTimeZone(TimeZone.getDefault());
for (Include include : jsonInclusions) {
objectMapper = objectMapper.setSerializationInclusion(include);
}
return objectMapper;
}

public String toJson(GenericRecord record) throws IOException {
Map<String, Object> map = AvroDeserializer.recordDeserializer(record);
return mapper.writeValueAsString(map);
}

public static ObjectMapper getMapper() {
return MAPPER;
public ObjectMapper getMapper() {
return mapper;
}
}
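With the constructor introduced above, the inclusions can also be supplied directly, for example from a unit test. A minimal sketch under that assumption (the GenericRecord construction is elided and would depend on an actual Avro schema):

```java
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import org.akhq.utils.AvroToJsonSerializer;
import org.apache.avro.generic.GenericRecord;

import java.util.List;

class AvroToJsonSerializerSketch {
    String render(GenericRecord record) throws Exception {
        // Mirrors the defaults restored in src/main/resources/application.yml;
        // in the running application, Micronaut injects this list from
        // akhq.avro-serializer.json.serialization.inclusions instead.
        AvroToJsonSerializer serializer =
                new AvroToJsonSerializer(List.of(Include.NON_NULL, Include.NON_EMPTY));
        // Null and empty Avro fields are omitted from the resulting JSON.
        return serializer.toJson(record);
    }
}
```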
5 changes: 5 additions & 0 deletions src/main/resources/application.yml
@@ -103,6 +103,11 @@ akhq:
page-size: 25
threads: 16

avro-serializer:
json.serialization.inclusions:
- NON_NULL
- NON_EMPTY

topic:
replication: 1
retention: 86400000