Implement logical types conversion for serializer/deserializer #237

Open · wants to merge 1 commit into base: master
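This change adds an opt-in flag, logicalTypesConversionEnabled (default false), that registers Avro's built-in logical-type conversions on the generic datum reader/writer used for GENERIC_RECORD data. A minimal sketch of how a client might enable it, assuming the Map-based GlueSchemaRegistryConfiguration constructor exercised in the tests below; the class name and region value are illustrative only:

import com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration;
import com.amazonaws.services.schemaregistry.serializers.avro.AvroSerializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;

import java.util.HashMap;
import java.util.Map;

public class EnableLogicalTypesExample {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AWSSchemaRegistryConstants.AWS_REGION, "us-west-2");
        // New key introduced by this PR; conversion stays off unless explicitly set to true.
        configs.put(AWSSchemaRegistryConstants.LOGICAL_TYPES_CONVERSION_ENABLED, true);

        GlueSchemaRegistryConfiguration config = new GlueSchemaRegistryConfiguration(configs);
        // AvroSerializer now takes the configuration and picks the flag up from it.
        AvroSerializer serializer = new AvroSerializer(config);
    }
}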
File: GlueSchemaRegistryConfiguration.java
@@ -68,6 +68,8 @@ public class GlueSchemaRegistryConfiguration {
private List<SerializationFeature> jacksonSerializationFeatures;
private List<DeserializationFeature> jacksonDeserializationFeatures;

+private boolean logicalTypesConversionEnabled;

public GlueSchemaRegistryConfiguration(String region) {
Map<String, Object> config = new HashMap<>();
config.put(AWSSchemaRegistryConstants.AWS_REGION, region);
@@ -104,6 +106,7 @@ private void buildSchemaRegistryConfigs(Map<String, ?> configs) {
validateAndSetUserAgent(configs);
validateAndSetSecondaryDeserializer(configs);
validateAndSetProxyUrl(configs);
+validateAndSetLogicalTypesConversionEnabled(configs);
}

private void validateAndSetSecondaryDeserializer(Map<String, ?> configs) {
@@ -130,6 +133,12 @@ private void validateAndSetUserAgent(Map<String, ?> configs) {
}
}

+private void validateAndSetLogicalTypesConversionEnabled(Map<String, ?> configs) {
+    if (isPresent(configs, AWSSchemaRegistryConstants.LOGICAL_TYPES_CONVERSION_ENABLED)) {
+        this.logicalTypesConversionEnabled = (Boolean) configs.get(AWSSchemaRegistryConstants.LOGICAL_TYPES_CONVERSION_ENABLED);
+    }
+}

private void validateAndSetCompressionType(Map<String, ?> configs) {
if (isPresent(configs, AWSSchemaRegistryConstants.COMPRESSION_TYPE) && validateCompressionType(
(String) configs.get(AWSSchemaRegistryConstants.COMPRESSION_TYPE))) {
File: AWSSchemaRegistryConstants.java
@@ -172,6 +172,11 @@ public final class AWSSchemaRegistryConstants {
*/
public static final String USER_AGENT_APP = "userAgentApp";

+/**
+ * Boolean indicating whether logical types in Avro data should be converted.
+ */
+public static final String LOGICAL_TYPES_CONVERSION_ENABLED = "logicalTypesConversionEnabled";

/**
* Private constructor to avoid initialization of the class.
*/
File: AvroDeserializer.java
@@ -51,6 +51,9 @@ public class AvroDeserializer implements GlueSchemaRegistryDataFormatDeserializer {
@Setter
private AvroRecordType avroRecordType;

+@Setter
+private boolean logicalTypesConversionEnabled;

@NonNull
@Getter
@VisibleForTesting
@@ -65,6 +68,7 @@ public class AvroDeserializer implements GlueSchemaRegistryDataFormatDeserializer {
public AvroDeserializer(GlueSchemaRegistryConfiguration configs) {
this.schemaRegistrySerDeConfigs = configs;
this.avroRecordType = configs.getAvroRecordType();
+this.logicalTypesConversionEnabled = configs.isLogicalTypesConversionEnabled();
this.datumReaderCache =
CacheBuilder
.newBuilder()
@@ -111,7 +115,7 @@ private BinaryDecoder getBinaryDecoder(byte[] data, int start, int end) {
private class DatumReaderCache extends CacheLoader<String, DatumReader<Object>> {
@Override
public DatumReader<Object> load(String schema) throws Exception {
-return DatumReaderInstance.from(schema, avroRecordType);
+return DatumReaderInstance.from(schema, avroRecordType, logicalTypesConversionEnabled);
}
}
}
File: DatumReaderInstance.java
@@ -29,7 +29,7 @@ public class DatumReaderInstance {
* @throws IllegalAccessException can be thrown readerClass.newInstance() from
* java.lang.Class implementation
*/
-public static DatumReader<Object> from(String writerSchemaDefinition, AvroRecordType avroRecordType)
+public static DatumReader<Object> from(String writerSchemaDefinition, AvroRecordType avroRecordType, boolean logicalTypesConversionEnabled)
throws InstantiationException, IllegalAccessException {

Schema writerSchema = AVRO_UTILS.parseSchema(writerSchemaDefinition);
@@ -47,7 +47,11 @@ public static DatumReader<Object> from(String writerSchemaDefinition, AvroRecordType avroRecordType, boolean logicalTypesConversionEnabled)
case GENERIC_RECORD:
log.debug("Using GenericDatumReader for de-serializing Avro message, schema: {})",
writerSchema.toString());
-return new GenericDatumReader<>(writerSchema);
+if (logicalTypesConversionEnabled) {
+    return new GenericDatumReader<>(writerSchema, writerSchema, GenericDataWithLogicalTypesConversion.getInstance());
+} else {
+    return new GenericDatumReader<>(writerSchema);
+}

default:
String message = String.format("Unsupported AvroRecordType: %s",
File: GenericDataWithLogicalTypesConversion.java (new file)
@@ -0,0 +1,27 @@
+package com.amazonaws.services.schemaregistry.deserializers.avro;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Conversions;
+import org.apache.avro.data.TimeConversions;
+import org.apache.avro.generic.GenericData;
+
+@Slf4j
+public class GenericDataWithLogicalTypesConversion {
+    private static final GenericData INSTANCE = new GenericData();
+
+    static {
+        INSTANCE.addLogicalTypeConversion(new Conversions.DecimalConversion());
+        INSTANCE.addLogicalTypeConversion(new Conversions.UUIDConversion());
+        INSTANCE.addLogicalTypeConversion(new TimeConversions.DateConversion());
+        INSTANCE.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
+        INSTANCE.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
+        INSTANCE.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
+        INSTANCE.addLogicalTypeConversion(new TimeConversions.LocalTimestampMicrosConversion());
+        INSTANCE.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
+        INSTANCE.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
+    }
+
+    public static GenericData getInstance() {
+        return INSTANCE;
+    }
+}
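For reference, registering these conversions changes the Java types that generic readers and writers exchange for logical types. A rough mapping, assuming Avro 1.10+ (whose TimeConversions are java.time-based); schema here stands for any parsed org.apache.avro.Schema and is hypothetical:

// Sketch: a reader backed by the shared conversion-aware GenericData instance,
// mirroring what DatumReaderInstance does when the flag is enabled.
GenericData genericData = GenericDataWithLogicalTypesConversion.getInstance();
DatumReader<Object> reader = new GenericDatumReader<>(schema, schema, genericData);
// With the conversions registered, fields decode as:
//   decimal (bytes)                 -> java.math.BigDecimal    (raw: ByteBuffer)
//   uuid (string)                   -> java.util.UUID          (raw: CharSequence)
//   date (int)                      -> java.time.LocalDate     (raw: Integer)
//   time-millis / time-micros       -> java.time.LocalTime     (raw: Integer / Long)
//   local-timestamp-millis / micros -> java.time.LocalDateTime (raw: Long)
//   timestamp-millis / micros       -> java.time.Instant       (raw: Long)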
File: GlueSchemaRegistrySerializerFactory.java
@@ -45,7 +45,7 @@ public GlueSchemaRegistryDataFormatSerializer getInstance(@NonNull DataFormat dataFormat,
@NonNull GlueSchemaRegistryConfiguration glueSchemaRegistryConfig) {
switch (dataFormat) {
case AVRO:
-this.serializerMap.computeIfAbsent(dataFormat, key -> new AvroSerializer());
+this.serializerMap.computeIfAbsent(dataFormat, key -> new AvroSerializer(glueSchemaRegistryConfig));

log.debug("Returning Avro serializer instance from GlueSchemaRegistrySerializerFactory");
return this.serializerMap.get(dataFormat);
File: AvroSerializer.java
@@ -15,6 +15,7 @@
package com.amazonaws.services.schemaregistry.serializers.avro;

import com.amazonaws.services.schemaregistry.common.GlueSchemaRegistryDataFormatSerializer;
+import com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration;
import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
import com.amazonaws.services.schemaregistry.utils.AVROUtils;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
@@ -45,12 +46,14 @@
public class AvroSerializer implements GlueSchemaRegistryDataFormatSerializer {
private AVROUtils avroUtils = AVROUtils.getInstance();
private static final long MAX_DATUM_WRITER_CACHE_SIZE = 100;
+private final boolean logicalTypesConversionEnabled;

@NonNull
@VisibleForTesting
protected final LoadingCache<DatumWriterCacheKey, DatumWriter<Object>> datumWriterCache;

-public AvroSerializer() {
+public AvroSerializer(GlueSchemaRegistryConfiguration glueSchemaRegistryConfig) {
+    this.logicalTypesConversionEnabled = glueSchemaRegistryConfig.isLogicalTypesConversionEnabled();
this.datumWriterCache =
CacheBuilder
.newBuilder()
@@ -61,7 +64,7 @@ public AvroSerializer() {
@Override
public byte[] serialize(Object data) {
byte[] bytes;
-bytes = serialize(data, createDatumWriter(data));
+bytes = serialize(data, createDatumWriter(data, logicalTypesConversionEnabled));

return bytes;
}
@@ -74,19 +77,19 @@ public byte[] serialize(Object data) {
* @param object the Avro message
* @return Avro datum writer for serialization
*/
-private DatumWriter<Object> createDatumWriter(Object object) {
+private DatumWriter<Object> createDatumWriter(Object object, boolean logicalTypesConversionEnabled) {
org.apache.avro.Schema schema = AVROUtils.getInstance()
.getSchema(object);
if (object instanceof SpecificRecord) {
return getSpecificDatumWriter(schema);
} else if (object instanceof GenericRecord) {
-return getGenericDatumWriter(schema);
+return getGenericDatumWriter(schema, logicalTypesConversionEnabled);
} else if (object instanceof GenericData.EnumSymbol) {
-return getGenericDatumWriter(schema);
+return getGenericDatumWriter(schema, logicalTypesConversionEnabled);
} else if (object instanceof GenericData.Array) {
-return getGenericDatumWriter(schema);
+return getGenericDatumWriter(schema, logicalTypesConversionEnabled);
} else if (object instanceof GenericData.Fixed) {
-return getGenericDatumWriter(schema);
+return getGenericDatumWriter(schema, logicalTypesConversionEnabled);
} else {
String message =
String.format("Unsupported type passed for serialization: %s", object);
@@ -96,13 +99,13 @@ private DatumWriter<Object> createDatumWriter(Object object) {

@SneakyThrows
private DatumWriter<Object> getSpecificDatumWriter(Schema schema) {
-DatumWriterCacheKey datumWriterCacheKey = new DatumWriterCacheKey(schema, AvroRecordType.SPECIFIC_RECORD);
+DatumWriterCacheKey datumWriterCacheKey = new DatumWriterCacheKey(schema, AvroRecordType.SPECIFIC_RECORD, false);
return datumWriterCache.get(datumWriterCacheKey);
}

@SneakyThrows
-private DatumWriter<Object> getGenericDatumWriter(Schema schema) {
-DatumWriterCacheKey datumWriterCacheKey = new DatumWriterCacheKey(schema, AvroRecordType.GENERIC_RECORD);
+private DatumWriter<Object> getGenericDatumWriter(Schema schema, boolean logicalTypesConversionEnabled) {
+DatumWriterCacheKey datumWriterCacheKey = new DatumWriterCacheKey(schema, AvroRecordType.GENERIC_RECORD, logicalTypesConversionEnabled);
return datumWriterCache.get(datumWriterCacheKey);
}

@@ -160,14 +163,16 @@ private static class DatumWriterCacheKey {
private final Schema schema;
@NonNull
private final AvroRecordType avroRecordType;
+private final boolean logicalTypesConversionEnabled;
}

private static class DatumWriterCache extends CacheLoader<DatumWriterCacheKey, DatumWriter<Object>> {
@Override
public DatumWriter<Object> load(DatumWriterCacheKey datumWriterCacheKey) {
Schema schema = datumWriterCacheKey.getSchema();
AvroRecordType avroRecordType = datumWriterCacheKey.getAvroRecordType();
-return DatumWriterInstance.get(schema, avroRecordType);
+boolean logicalTypesConversionEnabled = datumWriterCacheKey.isLogicalTypesConversionEnabled();
+return DatumWriterInstance.get(schema, avroRecordType, logicalTypesConversionEnabled);
}
}
}
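Note that DatumWriterCacheKey now carries the flag, so the same schema can map to two distinct cached writers: one expecting raw base types and one accepting the converted Java types, as the DatumWriterInstance diff below shows. A hypothetical illustration (schema stands for any parsed Avro Schema):

// Same schema, different flag -> distinct cache entries and distinct writers.
DatumWriter<Object> plain     = DatumWriterInstance.get(schema, AvroRecordType.GENERIC_RECORD, false);
DatumWriter<Object> converted = DatumWriterInstance.get(schema, AvroRecordType.GENERIC_RECORD, true);
// 'plain' expects raw base types (ByteBuffer, Integer, Long, ...);
// 'converted' accepts BigDecimal, LocalDate, Instant, ... via the registered conversions.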
File: DatumWriterInstance.java
@@ -1,5 +1,6 @@
package com.amazonaws.services.schemaregistry.serializers.avro;

+import com.amazonaws.services.schemaregistry.deserializers.avro.GenericDataWithLogicalTypesConversion;
import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import org.apache.avro.Schema;
@@ -8,12 +9,16 @@
import org.apache.avro.specific.SpecificDatumWriter;

public class DatumWriterInstance {
-public static DatumWriter<Object> get(Schema schema, AvroRecordType avroRecordType) {
+public static DatumWriter<Object> get(Schema schema, AvroRecordType avroRecordType, boolean logicalTypesConversionEnabled) {
switch (avroRecordType) {
case SPECIFIC_RECORD:
return new SpecificDatumWriter<>(schema);
case GENERIC_RECORD:
-return new GenericDatumWriter<>(schema);
+if (logicalTypesConversionEnabled) {
+    return new GenericDatumWriter<>(schema, GenericDataWithLogicalTypesConversion.getInstance());
+} else {
+    return new GenericDatumWriter<>(schema);
+}
case UNKNOWN:
default:
String message =
File: AvroDeserializerTest.java
@@ -50,6 +50,7 @@
import java.util.Map;
import java.util.UUID;

+import static com.amazonaws.services.schemaregistry.utils.RecordGenerator.AVRO_USER_LOGICAL_TYPES_SCHEMA_FILE_PATH;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -87,6 +88,7 @@ public class AvroDeserializerTest {
public void setup() {
this.configs.put(AWSSchemaRegistryConstants.AWS_ENDPOINT, "https://test");
this.configs.put(AWSSchemaRegistryConstants.AWS_REGION, "us-west-2");
+this.configs.put(AWSSchemaRegistryConstants.LOGICAL_TYPES_CONVERSION_ENABLED, true);
this.schemaRegistrySerDeConfigs = new GlueSchemaRegistryConfiguration(this.configs);

MockitoAnnotations.initMocks(this);
@@ -298,6 +300,28 @@ public void testDeserialize_genericRecord_equalsOriginal(AWSSchemaRegistryConstants.COMPRESSION compressionType) {
assertEquals(1, avroDeserializer.getDatumReaderCache().size());
}

+/**
+ * Test whether the serialized generic record with logical types can be de-serialized back to the
+ * generic record instance with conversions.
+ */
+@ParameterizedTest
+@EnumSource(AWSSchemaRegistryConstants.COMPRESSION.class)
+public void testDeserialize_genericRecord_with_logicalTypes_equalsOriginal(AWSSchemaRegistryConstants.COMPRESSION compressionType) {
+    GenericRecord genericRecord = RecordGenerator.createGenericAvroRecordWithLogicalTypes();
+
+    ByteBuffer serializedData = createBasicSerializedData(genericRecord, compressionType.name(), DataFormat.AVRO);
+    org.apache.avro.Schema schema = SchemaLoader.loadAvroSchema(AVRO_USER_LOGICAL_TYPES_SCHEMA_FILE_PATH);
+    AvroDeserializer avroDeserializer = createAvroDeserializer(AvroRecordType.GENERIC_RECORD);
+
+    com.amazonaws.services.schemaregistry.common.Schema schemaObject = new com.amazonaws.services.schemaregistry.common.Schema(
+            schema.toString(), DataFormat.AVRO.name(), "testAvroSchema");
+
+    Object deserializedObject = avroDeserializer.deserialize(serializedData, schemaObject);
+    assertGenericRecord(genericRecord, deserializedObject);
+    // Assert the instance is getting cached.
+    assertEquals(1, avroDeserializer.getDatumReaderCache().size());
+}

public void assertGenericRecord(GenericRecord genericRecord, Object deserializedObject) {
assertTrue(deserializedObject instanceof GenericRecord);
assertTrue(deserializedObject.equals(genericRecord));
File: AvroSerializerTest.java
@@ -1,16 +1,21 @@
package com.amazonaws.services.schemaregistry.serializers.avro;

+import com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration;
+import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import com.amazonaws.services.schemaregistry.utils.RecordGenerator;
import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.Test;

+import java.util.HashMap;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class AvroSerializerTest {

@Test
public void serialize_WhenSerializeIsCalled_ReturnsCachedInstance() {
-AvroSerializer avroSerializer = new AvroSerializer();
+GlueSchemaRegistryConfiguration config = new GlueSchemaRegistryConfiguration("eu-west-1");
+AvroSerializer avroSerializer = new AvroSerializer(config);

User specificUserRecord = RecordGenerator.createSpecificAvroRecord();
GenericRecord genericUserRecord = RecordGenerator.createGenericUserMapAvroRecord();
File: RecordGenerator.java
@@ -22,7 +22,9 @@
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

+import java.math.BigDecimal;
import java.time.Instant;
+import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
@@ -42,6 +44,7 @@ public final class RecordGenerator {
public static final String AVRO_USER_ARRAY_STRING_SCHEMA_FILE = "src/test/resources/avro/user_array_String.avsc";
public static final String AVRO_USER_MAP_SCHEMA_FILE = "src/test/resources/avro/user_map.avsc";
public static final String AVRO_USER_MIXED_TYPE_SCHEMA_FILE = "src/test/resources/avro/user3.avsc";
+public static final String AVRO_USER_LOGICAL_TYPES_SCHEMA_FILE_PATH = "src/test/resources/avro/user4.avsc";
public static final String JSON_PERSON_SCHEMA_FILE_PATH =
"src/test/resources/json/schema/draft07/person.schema.json";
public static final String JSON_PERSON_DATA_FILE_PATH = "src/test/resources/json/person1.json";
@@ -322,6 +325,21 @@ public static GenericData.Record createGenericMultipleTypesAvroRecord() {
return genericRecordWithAllTypes;
}

+/**
+ * Test helper method to generate a test GenericRecord with logical types.
+ *
+ * @return generic Avro record
+ */
+public static GenericRecord createGenericAvroRecordWithLogicalTypes() {
+    Schema schema = SchemaLoader.loadAvroSchema(AVRO_USER_LOGICAL_TYPES_SCHEMA_FILE_PATH);
+    GenericRecord genericRecord = new GenericData.Record(schema);
+    genericRecord.put("name", "Sylvestre");
+    genericRecord.put("dateOfBirth", LocalDate.parse("2021-05-01"));
+    genericRecord.put("age", new BigDecimal("1.56"));
+
+    return genericRecord;
+}

/**
* Helper method to create a test user object
*
File: serializer-deserializer/src/test/resources/avro/user4.avsc (new file, 10 additions)
@@ -0,0 +1,10 @@
+{
+    "namespace": "com.amazonaws.services.schemaregistry.serializers.avro",
+    "type": "record",
+    "name": "User4",
+    "fields": [
+        {"name": "name", "type": "string" },
+        {"name": "dateOfBirth", "type": { "type": "int", "logicalType": "date"} },
+        {"name": "age", "type": { "type": "bytes", "logicalType": "decimal", "precision": 12, "scale": 2 } }
+    ]
+}
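Putting the pieces together, a round trip of a User4 record with the flag enabled would look roughly like the following sketch (reusing the test helpers above and the config from the first example; not part of the PR itself):

GenericRecord record = RecordGenerator.createGenericAvroRecordWithLogicalTypes();
byte[] bytes = new AvroSerializer(config).serialize(record);
// Deserializing those bytes with an AvroDeserializer built from the same config
// returns a GenericRecord whose fields carry the converted types:
//   deserialized.get("dateOfBirth") -> java.time.LocalDate 2021-05-01
//   deserialized.get("age")         -> java.math.BigDecimal 1.56
// With the flag left at its default (false), the writer instead expects the raw
// base types (int days since epoch, ByteBuffer of unscaled decimal bytes), and
// writing the LocalDate/BigDecimal values above would fail.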