Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Json support for java8 datetimes #320

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,26 @@ The recommended way to use the AWS Glue Schema Registry Library for Java is to c

```

### Jackson Support for Java 8 Date Types in JSON

To support Java 8 Dates in JSON add the [Jackson JSR310 Datatype](https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jsr310) dependency to implementing project class path.

For example, in a Maven based project include the latest dependency.

```xml
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.16.1</version>
</dependency>
```

Then add the following configuration property to specify the fully qualified class path to the JavaTimeModule like so.

```java
properties.put(AWSSchemaRegistryConstants.REGISTER_JAVA_TIME_MODULE, "com.fasterxml.jackson.datatype.jsr310.JavaTimeModule");
```

#### Producer for Kafka with PROTOBUF format

```java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.amazonaws.services.schemaregistry.utils.ProtobufMessageType;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.module.SimpleModule;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.EnumUtils;
Expand Down Expand Up @@ -58,6 +59,8 @@ public class GlueSchemaRegistryConfiguration {
private Map<String, String> metadata;
private String secondaryDeserializer;
private URI proxyUrl;
private String javaTimeModuleClass;
private String objectMapperFactory = "com.amazonaws.services.schemaregistry.utils.json.DefaultObjectMapperFactory";

/**
* Name of the application using the serializer/deserializer.
Expand Down Expand Up @@ -104,6 +107,8 @@ private void buildSchemaRegistryConfigs(Map<String, ?> configs) {
validateAndSetUserAgent(configs);
validateAndSetSecondaryDeserializer(configs);
validateAndSetProxyUrl(configs);
validateAndSetJavaTimeModule(configs);
validateAndSetObjectMapperFactory(configs);
}

private void validateAndSetSecondaryDeserializer(Map<String, ?> configs) {
Expand Down Expand Up @@ -323,6 +328,29 @@ private void validateAndSetJacksonDeserializationFeatures(Map<String, ?> configs
}
}

private void validateAndSetJavaTimeModule(Map<String, ?> configs) {
if (isPresent(configs, AWSSchemaRegistryConstants.REGISTER_JAVA_TIME_MODULE)) {
String moduleClassName = String.valueOf(configs.get(AWSSchemaRegistryConstants.REGISTER_JAVA_TIME_MODULE));
this.javaTimeModuleClass = moduleClassName;
}
}

private void validateAndSetObjectMapperFactory(Map<String, ?> configs) {
if (isPresent(configs, AWSSchemaRegistryConstants.OBJECT_MAPPER_FACTORY)) {
this.objectMapperFactory = String.valueOf(configs.get(AWSSchemaRegistryConstants.OBJECT_MAPPER_FACTORY));
}
}

public SimpleModule loadJavaTimeModule() {
try {
Class<?> moduleClass = Class.forName(this.getJavaTimeModuleClass());
return (SimpleModule) moduleClass.getConstructor().newInstance();
} catch (Exception e) {
String message = String.format("Invalid JavaTimeModule specified: %s", this.javaTimeModuleClass);
throw new AWSSchemaRegistryException(message, e);
}
}

private boolean isPresent(Map<String, ?> configs,
String key) {
if (!GlueSchemaRegistryUtils.getInstance()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,19 @@ public final class AWSSchemaRegistryConstants {
*/
public static final String USER_AGENT_APP = "userAgentApp";

/**
* Jackson Java Time Module to use for handling Java 8 Date/Time.
* By default, no module is registered.
* Ex: com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
*/
public static final String REGISTER_JAVA_TIME_MODULE = "registerJavaTimeModule";

/**
* Factory for creating custom Jackson ObjectMapper instances for use in handling JSON.
* Default: com.amazonaws.services.schemaregistry.utils.json.DefaultObjectMapperFactory
*/
public static final String OBJECT_MAPPER_FACTORY = "objectMapperFactory";

/**
* Private constructor to avoid initialization of the class.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,7 @@
import org.mockito.junit.jupiter.MockitoExtension;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.glue.model.CreateSchemaRequest;
import software.amazon.awssdk.services.glue.model.CreateSchemaResponse;
import software.amazon.awssdk.services.glue.model.DataFormat;
import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
import software.amazon.awssdk.services.glue.model.GetSchemaByDefinitionRequest;
import software.amazon.awssdk.services.glue.model.GetSchemaByDefinitionResponse;
import software.amazon.awssdk.services.glue.model.GetSchemaVersionRequest;
import software.amazon.awssdk.services.glue.model.GetSchemaVersionResponse;
import software.amazon.awssdk.services.glue.model.GetTagsRequest;
import software.amazon.awssdk.services.glue.model.GetTagsResponse;
import software.amazon.awssdk.services.glue.model.MetadataKeyValuePair;
import software.amazon.awssdk.services.glue.model.PutSchemaVersionMetadataRequest;
import software.amazon.awssdk.services.glue.model.PutSchemaVersionMetadataResponse;
import software.amazon.awssdk.services.glue.model.QuerySchemaVersionMetadataRequest;
import software.amazon.awssdk.services.glue.model.QuerySchemaVersionMetadataResponse;
import software.amazon.awssdk.services.glue.model.RegisterSchemaVersionRequest;
import software.amazon.awssdk.services.glue.model.RegisterSchemaVersionResponse;
import software.amazon.awssdk.services.glue.model.RegistryId;
import software.amazon.awssdk.services.glue.model.SchemaId;
import software.amazon.awssdk.services.glue.model.*;

import java.io.File;
import java.io.IOException;
Expand All @@ -68,10 +50,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;

@ExtendWith(MockitoExtension.class)
public class AWSSchemaRegistryClientTest {
Expand Down Expand Up @@ -693,6 +672,31 @@ public void testQuerySchemaTags_clientThrowsException_throwsException() throws N
assertEquals(expectedMsg, awsSchemaRegistryException.getMessage());
}

@Test
public void testCreateSchema_existingSchema_throwsAlreadyExistsException() throws NoSuchFieldException, IllegalAccessException {
String testSchemaName = "test-schema";
String testSchemaDefinition = "test-schema-definition";
awsSchemaRegistryClient = configureAWSSchemaRegistryClientWithSerdeConfig(awsSchemaRegistryClient,
glueSchemaRegistryConfiguration);

AWSSchemaRegistryClient awsSchemaRegistryClientSpy = spy(awsSchemaRegistryClient);
AlreadyExistsException ex = AlreadyExistsException.builder().message("Already exists").build();
when(mockGlueClient.createSchema(any(CreateSchemaRequest.class))).thenThrow(ex);

UUID expectedId = UUID.randomUUID();
String expectedDataFormat = DataFormat.AVRO.toString();
Map<String, String> expectedMetadata = getMetadata();
doReturn(expectedId).when(awsSchemaRegistryClientSpy).registerSchemaVersion(
testSchemaDefinition,
testSchemaName,
expectedDataFormat,
expectedMetadata
);

UUID actualId = awsSchemaRegistryClientSpy.createSchema(testSchemaName, expectedDataFormat, testSchemaDefinition, expectedMetadata);
assertEquals(expectedId, actualId);
}

private Map<String, String> getMetadata() {
Map<String, String> metadata = new HashMap<>();
metadata.put("event-source-1", "topic1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -32,11 +34,7 @@
import java.util.Map;
import java.util.Properties;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.*;

/**
* Unit tests for testing configuration elements.
Expand Down Expand Up @@ -410,4 +408,55 @@ public void testBuildConfig_invalidProxyUrl_throwsException() {
Exception exception = assertThrows(AWSSchemaRegistryException.class, () -> new GlueSchemaRegistryConfiguration(props));
assertEquals("Proxy URL property is not a valid URL: "+proxy, exception.getMessage());
}

@Test
public void testBuildConfig_javaTimeModuleEnabledMock_succeeds() {
Properties props = createTestProperties();
String moduleClassName = "com.amazonaws.services.schemaregistry.common.configs.TestableSimpleModule";
props.put(AWSSchemaRegistryConstants.REGISTER_JAVA_TIME_MODULE, moduleClassName);
GlueSchemaRegistryConfiguration cfg = new GlueSchemaRegistryConfiguration(props);
assertNotNull(cfg.loadJavaTimeModule());
}

@Test
public void testBuildConfig_javaTimeModuleEnabledWithoutDependency_throwException() {
Properties props = createTestProperties();
String moduleClassName = "com.fasterxml.jackson.datatype.jsr310.JavaTimeModule";
props.put(AWSSchemaRegistryConstants.REGISTER_JAVA_TIME_MODULE, moduleClassName);
GlueSchemaRegistryConfiguration cfg = new GlueSchemaRegistryConfiguration(props);
Exception exception = assertThrows(AWSSchemaRegistryException.class, () -> cfg.loadJavaTimeModule());
String message = String.format("Invalid JavaTimeModule specified: %s", moduleClassName);
assertEquals(message, exception.getMessage());
}

@Test
public void testBuildConfig_specifyObjectMapperFactory_succeeds() {
Properties props = createTestProperties();
String factoryClass = "com.amazonaws.services.schemaregistry.common.configs.TestableSimpleModule"; // could be any string
props.put(AWSSchemaRegistryConstants.OBJECT_MAPPER_FACTORY, factoryClass);
GlueSchemaRegistryConfiguration cfg = new GlueSchemaRegistryConfiguration(props);
assertEquals(factoryClass, cfg.getObjectMapperFactory());
}

@Test
public void testBuildConfig_jacksonSerializationFeatures_succeeds() {
Properties props = createTestProperties();
List<String> features = new ArrayList<>();
features.add(SerializationFeature.INDENT_OUTPUT.toString());
features.add(SerializationFeature.WRAP_EXCEPTIONS.toString());
props.put(AWSSchemaRegistryConstants.JACKSON_SERIALIZATION_FEATURES, features);
GlueSchemaRegistryConfiguration cfg = new GlueSchemaRegistryConfiguration(props);
assertEquals(cfg.getJacksonSerializationFeatures().size(), 2);
}

@Test
public void testBuildConfig_jacksonDeserializationFeatures_succeeds() {
Properties props = createTestProperties();
List<String> features = new ArrayList<>();
features.add(DeserializationFeature.WRAP_EXCEPTIONS.toString());
features.add(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES.toString());
props.put(AWSSchemaRegistryConstants.JACKSON_DESERIALIZATION_FEATURES, features);
GlueSchemaRegistryConfiguration cfg = new GlueSchemaRegistryConfiguration(props);
assertEquals(cfg.getJacksonDeserializationFeatures().size(), 2);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.amazonaws.services.schemaregistry.common.configs;

import com.fasterxml.jackson.databind.module.SimpleModule;

public class TestableSimpleModule extends SimpleModule {
}
4 changes: 4 additions & 0 deletions integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.amazonaws.services.schemaregistry.integrationtests.kafka;

import com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import com.amazonaws.services.schemaregistry.utils.json.ObjectMapperUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.InvalidDefinitionException;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.junit.jupiter.api.Test;

import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.*;

public class ObjectMapperDateTimeSupportTest {

@Test
void testCreate_useJavaTimeModule_succeeds() {
Map<String, Object> map = new HashMap<>();
map.put(AWSSchemaRegistryConstants.AWS_REGION, "US-West-1");
map.put(AWSSchemaRegistryConstants.REGISTER_JAVA_TIME_MODULE, "com.fasterxml.jackson.datatype.jsr310.JavaTimeModule");
GlueSchemaRegistryConfiguration cfg = new GlueSchemaRegistryConfiguration(map);

ObjectMapper om = ObjectMapperUtils.create(cfg);
assertNotNull(cfg.loadJavaTimeModule());
assertNotNull(om);
}

@Test
void testCreate_noJavaTimeModule_failsSerializingJava8DateTime() throws JsonProcessingException {
Map<String, Object> map = new HashMap<>();
map.put(AWSSchemaRegistryConstants.AWS_REGION, "US-West-1");
GlueSchemaRegistryConfiguration cfg = new GlueSchemaRegistryConfiguration(map);

ObjectMapper om = ObjectMapperUtils.create(cfg);

MyTestObject myObj = new MyTestObject("obj1", LocalDateTime.now());

assertThrows(InvalidDefinitionException.class, () -> om.writeValueAsString(myObj));
}

@Test
void testCreate_withJavaTimeModule_succeedsSerializingAndDeserializingJava8DateTime() throws JsonProcessingException {
Map<String, Object> map = new HashMap<>();
map.put(AWSSchemaRegistryConstants.AWS_REGION, "US-West-1");
map.put(AWSSchemaRegistryConstants.REGISTER_JAVA_TIME_MODULE, "com.fasterxml.jackson.datatype.jsr310.JavaTimeModule");
GlueSchemaRegistryConfiguration cfg = new GlueSchemaRegistryConfiguration(map);

ObjectMapper om = ObjectMapperUtils.create(cfg);

MyTestObject expectedObj = new MyTestObject("obj1", LocalDateTime.now());
String jsonStr = om.writeValueAsString(expectedObj);

MyTestObject actualObj = om.readValue(jsonStr, MyTestObject.class);
assertNotNull(actualObj);
assertEquals(expectedObj.getCreated(), actualObj.getCreated());
}

@Data
@NoArgsConstructor
@AllArgsConstructor
static class MyTestObject {
String name;
LocalDateTime created;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ private String produceRecordsWithKPL(String streamName,
byte[] serializedBytes = dataFormatSerializer.serialize(record);

putFutures.add(producer.addUserRecord(streamName, Long.toString(timestamp.toEpochMilli()), null,
ByteBuffer.wrap(serializedBytes), gsrSchema));
ByteBuffer.wrap(serializedBytes), null, gsrSchema));
}

String shardId = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
public class FromConnectTest {
private static final JsonNodeFactory JSON_NODE_FACTORY = TypeConverter.JSON_NODE_FACTORY;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final JsonValidator JSON_VALIDATOR = new JsonValidator();
private static final JsonValidator JSON_VALIDATOR = new JsonValidator(OBJECT_MAPPER);

private ConnectSchemaToJsonSchemaConverter connectSchemaToJsonSchemaConverter;
private ConnectValueToJsonNodeConverter connectValueToJsonNodeConverter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@
import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
import com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema;
import com.amazonaws.services.schemaregistry.common.Schema;
import com.amazonaws.services.schemaregistry.utils.json.ObjectMapperUtils;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import lombok.Builder;
import lombok.Data;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -56,19 +55,7 @@ public class JsonDeserializer implements GlueSchemaRegistryDataFormatDeserialize
@Builder
public JsonDeserializer(GlueSchemaRegistryConfiguration configs) {
this.schemaRegistrySerDeConfigs = configs;
JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);
this.objectMapper = new ObjectMapper();
this.objectMapper.setNodeFactory(jsonNodeFactory);
if (configs != null) {
if (!CollectionUtils.isEmpty(configs.getJacksonSerializationFeatures())) {
configs.getJacksonSerializationFeatures()
.forEach(this.objectMapper::enable);
}
if (!CollectionUtils.isEmpty(configs.getJacksonDeserializationFeatures())) {
configs.getJacksonDeserializationFeatures()
.forEach(this.objectMapper::enable);
}
}
this.objectMapper = ObjectMapperUtils.create(configs);
}

/**
Expand Down
Loading
Loading