Configurable json inclusions in Avro to Json serializing #799

Merged: 2 commits, Aug 30, 2021
Changes from 1 commit
8 changes: 6 additions & 2 deletions src/main/java/org/akhq/models/Record.java
@@ -45,6 +45,8 @@ public class Record {
     private Deserializer kafkaProtoDeserializer;
     @JsonIgnore
     private Deserializer kafkaJsonDeserializer;
+    @JsonIgnore
+    private AvroToJsonSerializer avroToJsonSerializer;
 
     @JsonIgnore
     private SchemaRegistryClient client;
@@ -86,7 +88,9 @@ public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte
 
     public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record, SchemaRegistryType schemaRegistryType, Deserializer kafkaAvroDeserializer,
                   Deserializer kafkaJsonDeserializer, Deserializer kafkaProtoDeserializer,
+                  AvroToJsonSerializer avroToJsonSerializer,
                   ProtobufToJsonDeserializer protobufToJsonDeserializer, byte[] bytesValue, Topic topic) {
+        this.avroToJsonSerializer = avroToJsonSerializer;
         if (schemaRegistryType == SchemaRegistryType.TIBCO) {
             this.MAGIC_BYTE = (byte) 0x80;
         } else {
@@ -154,7 +158,7 @@ private String convertToString(byte[] payload, Integer schemaId, boolean isKey)
             }
 
             Message dynamicMessage = (Message)toType;
-            return AvroToJsonSerializer.getMapper().readTree(JsonFormat.printer().print(dynamicMessage)).toString();
+            return avroToJsonSerializer.getMapper().readTree(JsonFormat.printer().print(dynamicMessage)).toString();
         } else if ( schema.schemaType().equals(JsonSchema.TYPE) ) {
             toType = kafkaJsonDeserializer.deserialize(topic.getName(), payload);
             if ( !(toType instanceof JsonNode) ) {
@@ -173,7 +177,7 @@ private String convertToString(byte[] payload, Integer schemaId, boolean isKey)
             }
 
             GenericRecord record = (GenericRecord) toType;
-            return AvroToJsonSerializer.toJson(record);
+            return avroToJsonSerializer.toJson(record);
 
         } catch (Exception exception) {
             this.exceptions.add(exception.getMessage());
6 changes: 6 additions & 0 deletions src/main/java/org/akhq/repositories/RecordRepository.java
@@ -18,6 +18,7 @@
 import org.akhq.models.Topic;
 import org.akhq.modules.AvroSerializer;
 import org.akhq.modules.KafkaModule;
+import org.akhq.utils.AvroToJsonSerializer;
 import org.akhq.utils.Debug;
 import org.apache.kafka.clients.admin.DeletedRecords;
 import org.apache.kafka.clients.admin.RecordsToDelete;
@@ -49,6 +50,9 @@ public class RecordRepository extends AbstractRepository {
     @Inject
     private ConfigRepository configRepository;
 
+    @Inject
+    private AvroToJsonSerializer avroToJsonSerializer;
+
     @Inject
     private TopicRepository topicRepository;
 
@@ -433,6 +437,7 @@ private Record newRecord(ConsumerRecord<byte[], byte[]> record, String clusterId
             this.schemaRegistryRepository.getKafkaAvroDeserializer(clusterId),
             schemaRegistryType == SchemaRegistryType.CONFLUENT? this.schemaRegistryRepository.getKafkaJsonDeserializer(clusterId):null,
             schemaRegistryType == SchemaRegistryType.CONFLUENT? this.schemaRegistryRepository.getKafkaProtoDeserializer(clusterId):null,
+            this.avroToJsonSerializer,
             this.customDeserializerRepository.getProtobufToJsonDeserializer(clusterId),
             avroWireFormatConverter.convertValueToWireFormat(record, client,
                 this.schemaRegistryRepository.getSchemaRegistryType(clusterId)),
@@ -450,6 +455,7 @@ private Record newRecord(ConsumerRecord<byte[], byte[]> record, BaseOptions opti
             this.schemaRegistryRepository.getKafkaAvroDeserializer(options.clusterId),
             schemaRegistryType == SchemaRegistryType.CONFLUENT? this.schemaRegistryRepository.getKafkaJsonDeserializer(options.clusterId):null,
             schemaRegistryType == SchemaRegistryType.CONFLUENT? this.schemaRegistryRepository.getKafkaProtoDeserializer(options.clusterId):null,
+            this.avroToJsonSerializer,
             this.customDeserializerRepository.getProtobufToJsonDeserializer(options.clusterId),
             avroWireFormatConverter.convertValueToWireFormat(record, client,
                 this.schemaRegistryRepository.getSchemaRegistryType(options.clusterId)),
2 changes: 1 addition & 1 deletion src/main/java/org/akhq/utils/AvroDeserializer.java
@@ -26,7 +26,7 @@ public class AvroDeserializer {
     private static final String TIME_MICROS = "time-micros";
     private static final String TIMESTAMP_MILLIS = "timestamp-millis";
     private static final String TIMESTAMP_MICROS = "timestamp-micros";
-
+
     private static final DecimalConversion DECIMAL_CONVERSION = new DecimalConversion();
     private static final UUIDConversion UUID_CONVERSION = new UUIDConversion();
     private static final DateConversion DATE_CONVERSION = new DateConversion();
51 changes: 34 additions & 17 deletions src/main/java/org/akhq/utils/AvroToJsonSerializer.java
@@ -1,32 +1,49 @@
 package org.akhq.utils;
 
-import com.fasterxml.jackson.annotation.JsonInclude;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import javax.inject.Singleton;
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
 import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import io.micronaut.context.annotation.Value;
+import io.micronaut.core.annotation.Nullable;
 import org.apache.avro.generic.GenericRecord;
 
-import java.io.IOException;
-import java.util.Map;
-import java.util.TimeZone;
-
+@Singleton
 public class AvroToJsonSerializer {
-    private static final ObjectMapper MAPPER = new ObjectMapper()
-        .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
-        .setSerializationInclusion(JsonInclude.Include.NON_NULL)
-        .setSerializationInclusion(JsonInclude.Include.NON_EMPTY)
-        .registerModule(new JavaTimeModule())
-        .registerModule(new Jdk8Module())
-        .setTimeZone(TimeZone.getDefault());
+    private final ObjectMapper mapper;
 
-    public static String toJson(GenericRecord record) throws IOException {
-        Map<String, Object> map = AvroDeserializer.recordDeserializer(record);
+    public AvroToJsonSerializer(@Value("${akhq.avro-serializer.json.serialization.inclusions}") @Nullable List<Include> jsonInclusions) {
Owner:
You are changing the default configuration; please add the old default in application.yml.

Contributor Author:
Added the old default to src/main/resources/application.yml:

  avro-serializer:
    json.serialization.inclusions:
      - NON_NULL
      - NON_EMPTY
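
The entries bind to Jackson's JsonInclude.Include constants, so other policies can be configured the same way; for instance, a hypothetical override (not part of this PR) restoring Jackson's serialize-everything default:

```yaml
# Hypothetical user override: ALWAYS is Jackson's default inclusion,
# so null and empty fields are serialized again.
akhq:
  avro-serializer:
    json.serialization.inclusions:
      - ALWAYS
```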

+        List<Include> inclusions = jsonInclusions != null ? jsonInclusions : Collections.emptyList();
+        this.mapper = createObjectMapper(inclusions);
+    }
 
-        return MAPPER.writeValueAsString(map);
+    private ObjectMapper createObjectMapper(List<Include> jsonInclusions) {
+        ObjectMapper objectMapper = new ObjectMapper()
+            .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
+            .registerModule(new JavaTimeModule())
+            .registerModule(new Jdk8Module())
+            .setTimeZone(TimeZone.getDefault());
+        System.out.println("Inclusions: " + jsonInclusions);
Owner:
remove this ;)

Contributor Author:
Whoops.. done!

+        for (Include include : jsonInclusions) {
+            objectMapper = objectMapper.setSerializationInclusion(include);
+        }
+        return objectMapper;
+    }
 
+    public String toJson(GenericRecord record) throws IOException {
+        Map<String, Object> map = AvroDeserializer.recordDeserializer(record);
+        return mapper.writeValueAsString(map);
+    }
+
-    public static ObjectMapper getMapper() {
-        return MAPPER;
+    public ObjectMapper getMapper() {
+        return mapper;
     }
 }
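
Since toJson() just serializes the Map produced by AvroDeserializer.recordDeserializer(), the effect of the configurable inclusions can be reproduced with Jackson alone. A minimal sketch, with hypothetical record contents (not code from this PR), contrasting an unconfigured mapper with the shipped NON_EMPTY default:

```java
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class InclusionDemo {
    public static void main(String[] args) throws Exception {
        // Stand-in for the Map a deserialized Avro record becomes.
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("name", "value");
        record.put("missing", null);
        record.put("tags", List.of());

        // No inclusions configured: everything is serialized.
        System.out.println(new ObjectMapper().writeValueAsString(record));
        // -> {"name":"value","missing":null,"tags":[]}

        // NON_EMPTY (which subsumes NON_NULL), as in the shipped default:
        ObjectMapper nonEmpty = new ObjectMapper()
            .setSerializationInclusion(Include.NON_EMPTY);
        System.out.println(nonEmpty.writeValueAsString(record));
        // -> {"name":"value"}
    }
}
```

Note that each setSerializationInclusion(...) call replaces the previous default, so with the default [NON_NULL, NON_EMPTY] list the effective policy is NON_EMPTY, which also excludes nulls.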