Skip to content

Commit

Permalink
Merge branch '7.1.x' into 7.2.x
Browse files Browse the repository at this point in the history
  • Loading branch information
rayokota committed Mar 13, 2023
2 parents 8b41a6e + ba94729 commit 95313fe
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,11 @@ public void validate() {
}

public void validate(Object value) throws JsonProcessingException, ValidationException {
validate(rawSchema(), value);
}

public static void validate(Schema schema, Object value)
throws JsonProcessingException, ValidationException {
Object primitiveValue = NONE_MARKER;
if (isPrimitive(value)) {
primitiveValue = value;
Expand All @@ -297,7 +302,7 @@ public void validate(Object value) throws JsonProcessingException, ValidationExc
primitiveValue = ((TextNode) value).asText();
}
if (primitiveValue != NONE_MARKER) {
rawSchema().validate(primitiveValue);
schema.validate(primitiveValue);
} else {
Object jsonObject;
if (value instanceof ArrayNode) {
Expand All @@ -309,7 +314,7 @@ public void validate(Object value) throws JsonProcessingException, ValidationExc
} else {
jsonObject = objectMapper.convertValue(value, JSONObject.class);
}
rawSchema().validate(jsonObject);
schema.validate(jsonObject);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.SerializationException;
import org.everit.json.schema.CombinedSchema;
import org.everit.json.schema.ReferenceSchema;
import org.everit.json.schema.Schema;
import org.everit.json.schema.ValidationException;

import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -125,8 +128,6 @@ protected Object deserialize(
int length = buffer.limit() - 1 - idSize;
int start = buffer.position() + buffer.arrayOffset();

String typeName = schema.getString(typeProperty);

JsonNode jsonNode = null;
if (validate) {
try {
Expand All @@ -145,19 +146,30 @@ protected Object deserialize(
value = jsonNode != null
? objectMapper.convertValue(jsonNode, type)
: objectMapper.readValue(buffer.array(), start, length, type);
} else if (typeName != null) {
value = jsonNode != null
? deriveType(jsonNode, typeName)
: deriveType(buffer, length, start, typeName);
} else if (Object.class.equals(type)) {
value = jsonNode != null
? objectMapper.convertValue(jsonNode, type)
: objectMapper.readValue(buffer.array(), start, length, type);
} else {
// Return JsonNode if type is null
value = jsonNode != null
? jsonNode
: objectMapper.readTree(new ByteArrayInputStream(buffer.array(), start, length));
String typeName;
if (schema.rawSchema() instanceof CombinedSchema) {
if (jsonNode == null) {
jsonNode = objectMapper.readValue(buffer.array(), start, length, JsonNode.class);
}
typeName = getTypeName(schema.rawSchema(), jsonNode);
} else {
typeName = schema.getString(typeProperty);
}
if (typeName != null) {
value = jsonNode != null
? deriveType(jsonNode, typeName)
: deriveType(buffer, length, start, typeName);
} else if (Object.class.equals(type)) {
value = jsonNode != null
? objectMapper.convertValue(jsonNode, type)
: objectMapper.readValue(buffer.array(), start, length, type);
} else {
// Return JsonNode if type is null
value = jsonNode != null
? jsonNode
: objectMapper.readTree(new ByteArrayInputStream(buffer.array(), start, length));
}
}

if (includeSchemaAndVersion) {
Expand All @@ -183,6 +195,26 @@ protected Object deserialize(
}
}

private String getTypeName(Schema schema, JsonNode jsonNode) {
if (schema instanceof CombinedSchema) {
for (Schema subschema : ((CombinedSchema) schema).getSubschemas()) {
boolean valid = false;
try {
JsonSchema.validate(subschema, jsonNode);
valid = true;
} catch (Exception e) {
// noop
}
if (valid) {
return getTypeName(subschema, jsonNode);
}
}
} else if (schema instanceof ReferenceSchema) {
return getTypeName(((ReferenceSchema)schema).getReferredSchema(), jsonNode);
}
return (String) schema.getUnprocessedProperties().get(typeProperty);
}

private Object deriveType(
ByteBuffer buffer, int length, int start, String typeName
) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,17 @@
package io.confluent.kafka.serializers.json;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject;
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaString;
import io.confluent.kafka.schemaregistry.annotations.SchemaReference;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.json.JsonSchemaUtils;
import java.time.LocalDate;

import java.util.Collections;
import java.util.List;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.SerializationException;
import org.junit.Test;
Expand All @@ -43,6 +50,7 @@ public class KafkaJsonSchemaSerializerTest {
private final Properties config;
private final SchemaRegistryClient schemaRegistry;
private KafkaJsonSchemaSerializer<Object> serializer;
private KafkaJsonSchemaSerializer<Object> latestSerializer;
private KafkaJsonSchemaDeserializer<Object> deserializer;
private final String topic;

Expand All @@ -52,9 +60,16 @@ public KafkaJsonSchemaSerializerTest() {
config.put(KafkaJsonSchemaSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "bogus");
config.put(KafkaJsonSchemaSerializerConfig.FAIL_INVALID_SCHEMA, true);
config.put(KafkaJsonSchemaSerializerConfig.WRITE_DATES_AS_ISO8601, true);
schemaRegistry = new MockSchemaRegistryClient();
schemaRegistry = new MockSchemaRegistryClient(
Collections.singletonList(new JsonSchemaProvider()));
serializer = new KafkaJsonSchemaSerializer<>(schemaRegistry, new HashMap(config));
deserializer = getDeserializer(Object.class);
Properties latestConfig = new Properties(config);
latestConfig.put(KafkaJsonSchemaSerializerConfig.AUTO_REGISTER_SCHEMAS, false);
latestConfig.put(KafkaJsonSchemaSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "bogus");
latestConfig.put(KafkaJsonSchemaSerializerConfig.USE_LATEST_VERSION, true);
latestConfig.put(KafkaJsonSchemaSerializerConfig.LATEST_COMPATIBILITY_STRICT, false);
latestSerializer = new KafkaJsonSchemaSerializer<>(schemaRegistry, new HashMap(latestConfig));
topic = "test";
}

Expand Down Expand Up @@ -146,6 +161,84 @@ public void serializeInvalidUser() throws Exception {
assertEquals(user, deserialized);
}

@Test
public void serializeUserRef() throws Exception {
String schema = "{\n"
+ " \"$schema\": \"http://json-schema.org/draft-07/schema#\",\n"
+ " \"title\": \"Schema references\",\n"
+ " \"description\": \"List of schema references for multiple types in a single topic\",\n"
+ " \"oneOf\": [\n"
+ " { \"$ref\": \"customer.json\"},\n"
+ " { \"$ref\": \"user.json\"}\n"
+ " ]\n"
+ "}";

Customer customer = new Customer("acme", null);
User user = new User("john", "doe", (short) 50, "jack", null);
JsonSchema userSchema = JsonSchemaUtils.getSchema(user);
JsonSchema customerSchema = JsonSchemaUtils.getSchema(customer);
schemaRegistry.register("user", userSchema);
schemaRegistry.register("customer", customerSchema);
List<io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference> refs =
ImmutableList.of(
new io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference(
"user.json", "user", 1),
new io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference(
"customer.json", "customer", 1));
Map<String, String> resolvedRefs = ImmutableMap.of(
"user.json", userSchema.canonicalString(),
"customer.json", customerSchema.canonicalString());
JsonSchema jsonSchema = new JsonSchema(schema, refs, resolvedRefs, null);
schemaRegistry.register(topic + "-value", jsonSchema);

byte[] bytes = latestSerializer.serialize(topic, user);

// Test for javaType property
Object deserialized = getDeserializer(null).deserialize(topic, bytes);
assertEquals(user, deserialized);

bytes = latestSerializer.serialize(topic, customer);

// Test for javaType property
deserialized = getDeserializer(null).deserialize(topic, bytes);
assertEquals(customer, deserialized);
}

// Generate javaType property
@JsonSchemaInject(strings = {@JsonSchemaString(path="javaType",
value="io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializerTest$Customer")})
public static class Customer {
@JsonProperty
public String customerName;
@JsonProperty
public LocalDate acquireDate;

public Customer() {}

public Customer(String customerName, LocalDate acquireDate) {
this.customerName = customerName;
this.acquireDate = acquireDate;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Customer customer = (Customer) o;
return Objects.equals(customerName, customer.customerName)
&& Objects.equals(acquireDate, customer.acquireDate);
}

@Override
public int hashCode() {
return Objects.hash(customerName, acquireDate);
}
}

// Generate javaType property
@JsonSchemaInject(strings = {@JsonSchemaString(path="javaType",
value="io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializerTest$User")})
Expand Down

0 comments on commit 95313fe

Please sign in to comment.