feat(topicdata) support JSONSchema while producing message (#919)
txs4444 committed Dec 6, 2021
1 parent f73ddb2 commit a198839
Showing 18 changed files with 406 additions and 82 deletions.
13 changes: 11 additions & 2 deletions src/main/java/org/akhq/configs/SchemaRegistryType.java
@@ -1,6 +1,15 @@
 package org.akhq.configs;
 
+import lombok.Getter;
+
+@Getter
 public enum SchemaRegistryType {
-    CONFLUENT,
-    TIBCO
+    CONFLUENT((byte) 0x0),
+    TIBCO((byte) 0x80);
+
+    private byte magicByte;
+
+    SchemaRegistryType(byte magicByte) {
+        this.magicByte = magicByte;
+    }
 }
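For illustration, a minimal sketch (not part of this commit; the example class is hypothetical) of how callers can now read the wire-format magic byte from the enum instead of branching on the registry type:

import org.akhq.configs.SchemaRegistryType;

public class MagicByteExample {
    public static void main(String[] args) {
        // TIBCO registries prefix messages with 0x80, Confluent with 0x00
        byte confluent = SchemaRegistryType.CONFLUENT.getMagicByte();
        byte tibco = SchemaRegistryType.TIBCO.getMagicByte();
        System.out.printf("confluent=0x%02X tibco=0x%02X%n", confluent & 0xFF, tibco & 0xFF);
    }
}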
4 changes: 3 additions & 1 deletion src/main/java/org/akhq/controllers/SchemaController.java
@@ -128,7 +128,9 @@ public Schema redirectId(
         String cluster,
         Integer id
     ) throws IOException, RestClientException, ExecutionException, InterruptedException {
-        return this.schemaRepository.getById(cluster, id);
+        return this.schemaRepository
+            .getById(cluster, id)
+            .orElse(null);
     }
 
     @Get("api/{cluster}/schema/{subject}/version")
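A hedged sketch (illustrative, not from this commit; the helper class is hypothetical) of a caller using the Optional-returning getById introduced below in SchemaRegistryRepository — the controller keeps the old null-on-missing contract via orElse(null), but callers can also fail fast:

import org.akhq.models.Schema;
import org.akhq.repositories.SchemaRegistryRepository;

import java.util.Optional;

// getById now returns Optional<Schema>, so "schema not found" is handled
// explicitly instead of surfacing later as a NullPointerException.
class SchemaLookup {
    static Schema byIdOrFail(SchemaRegistryRepository repository, String cluster, Integer id) throws Exception {
        Optional<Schema> schema = repository.getById(cluster, id);
        return schema.orElseThrow(() -> new IllegalArgumentException("No schema registered with id " + id));
    }
}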
6 changes: 1 addition & 5 deletions src/main/java/org/akhq/models/Record.java
@@ -78,11 +78,7 @@ public class Record {
     private byte MAGIC_BYTE;
 
     public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte[] bytesKey, byte[] bytesValue, Map<String, String> headers, Topic topic) {
-        if (schemaRegistryType == SchemaRegistryType.TIBCO) {
-            this.MAGIC_BYTE = (byte) 0x80;
-        } else {
-            this.MAGIC_BYTE = 0x0;
-        }
+        this.MAGIC_BYTE = schemaRegistryType.getMagicByte();
         this.topic = topic;
         this.partition = record.partition();
         this.offset = record.offset();
src/main/java/org/akhq/modules/AvroSerializer.java → src/main/java/org/akhq/modules/schemaregistry/AvroSerializer.java
@@ -1,13 +1,13 @@
-package org.akhq.modules;
+package org.akhq.modules.schemaregistry;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
 import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import lombok.extern.slf4j.Slf4j;
 import org.akhq.configs.SchemaRegistryType;
 import org.apache.avro.Conversions;
@@ -18,16 +18,17 @@
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.io.*;
 
-import javax.inject.Singleton;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Map;
+import java.util.Objects;
 import java.util.TimeZone;
+import javax.inject.Singleton;
 
 @Singleton
 @Slf4j
-public class AvroSerializer {
+public class AvroSerializer implements SchemaSerializer {
     private static final ObjectMapper MAPPER = new ObjectMapper()
         .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
         .setSerializationInclusion(JsonInclude.Include.NON_NULL)
Expand All @@ -37,31 +38,32 @@ public class AvroSerializer {
.setTimeZone(TimeZone.getDefault());
private static final TypeReference<Map<String, Object>> TYPE_REFERENCE = new TypeReference<>() {};

private final int MAGIC_BYTE;

public static final int SCHEMA_ID_SIZE = 4;
private final int schemaId;
private final AvroSchema avroSchema;
private final SchemaRegistryType schemaRegistryType;

private final SchemaRegistryClient registryClient;

public AvroSerializer(SchemaRegistryClient registryClient, SchemaRegistryType schemaRegistryType) {
this.registryClient = registryClient;
public static boolean supports(ParsedSchema parsedSchema) {
return Objects.equals(AvroSchema.TYPE, parsedSchema.schemaType());
}

if (schemaRegistryType == SchemaRegistryType.TIBCO) {
MAGIC_BYTE = (byte) 0x80;
public static AvroSerializer newInstance(int schemaId, ParsedSchema parsedSchema, SchemaRegistryType schemaRegistryType) {
if (supports(parsedSchema)) {
return new AvroSerializer(schemaId, (AvroSchema) parsedSchema, schemaRegistryType);
} else {
MAGIC_BYTE = 0x0;
String errorMsg = String.format("Schema %s has not supported schema type expected %s but found %s", parsedSchema.name(), AvroSchema.TYPE, parsedSchema.schemaType());
throw new IllegalArgumentException(errorMsg);
}
}

public byte[] toAvro(String json, int schemaId) {
byte[] asBytes;
@Override
public byte[] serialize(String json) {
try {
Schema schema = this.registryClient.getById(schemaId);
asBytes = this.fromJsonToAvro(json.trim(), schema, schemaId);
} catch (IOException | RestClientException e) {
throw new RuntimeException(String.format("Can't retrieve schema %d in registry", schemaId), e);
return this.fromJsonToAvro(json.trim(), avroSchema.rawSchema(), schemaId);
} catch (IOException e) {
log.error("Cannot serialize value", e);
throw new RuntimeException("Cannot serialize value", e);
}
return asBytes;
}

private byte[] fromJsonToAvro(String json, Schema schema, int schemaId) throws IOException {
@@ -89,7 +91,7 @@ private byte[] fromJsonToAvro(String json, Schema schema, int schemaId) throws I
 
         GenericDatumWriter<Object> w = new GenericDatumWriter<>(schema, genericData);
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        outputStream.write(MAGIC_BYTE);
+        outputStream.write(schemaRegistryType.getMagicByte());
         outputStream.write(ByteBuffer.allocate(SCHEMA_ID_SIZE).putInt(schemaId).array());
 
         Encoder e = EncoderFactory.get().binaryEncoder(outputStream, null);
@@ -99,4 +101,10 @@ private byte[] fromJsonToAvro(String json, Schema schema, int schemaId) throws I
 
         return outputStream.toByteArray();
     }
+
+    private AvroSerializer(int schemaId, AvroSchema avroSchema, SchemaRegistryType schemaRegistryType) {
+        this.schemaId = schemaId;
+        this.avroSchema = avroSchema;
+        this.schemaRegistryType = schemaRegistryType;
+    }
 }
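A usage sketch (assumed wiring, not part of the commit) of the refactored AvroSerializer: the schema is resolved from the registry once, parsed, and handed to the serializer, which only frames and encodes the JSON payload:

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.akhq.configs.SchemaRegistryType;
import org.akhq.modules.schemaregistry.AvroSerializer;
import org.akhq.modules.schemaregistry.SchemaSerializer;

// Hypothetical helper: fetch + parse the schema up front, then let the serializer
// write magic byte + 4-byte schema id + Avro binary body.
class AvroProduceSketch {
    static byte[] encode(SchemaRegistryClient client, int schemaId, String json) throws Exception {
        ParsedSchema parsed = client.getSchemaById(schemaId);
        SchemaSerializer serializer = AvroSerializer.newInstance(schemaId, parsed, SchemaRegistryType.CONFLUENT);
        return serializer.serialize(json);
    }
}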
70 changes: 70 additions & 0 deletions src/main/java/org/akhq/modules/schemaregistry/JsonSchemaSerializer.java
@@ -0,0 +1,70 @@
+package org.akhq.modules.schemaregistry;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
+import io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaSerializer;
+import lombok.extern.slf4j.Slf4j;
+import org.akhq.configs.SchemaRegistryType;
+import org.everit.json.schema.ValidationException;
+import org.json.JSONObject;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+
+@Slf4j
+public class JsonSchemaSerializer extends AbstractKafkaJsonSchemaSerializer<String> implements SchemaSerializer {
+    private final int schemaId;
+    private final JsonSchema jsonSchema;
+    private final SchemaRegistryType schemaRegistryType;
+
+    public static JsonSchemaSerializer newInstance(int schemaId, ParsedSchema parsedSchema, SchemaRegistryType schemaRegistryType) {
+        if (supports(parsedSchema)) {
+            return new JsonSchemaSerializer(schemaId, (JsonSchema) parsedSchema, schemaRegistryType);
+        }
+        String errorMsg = String.format("Schema %s has not supported schema type expected %s but found %s", parsedSchema.name(), JsonSchema.TYPE, parsedSchema.schemaType());
+        throw new IllegalArgumentException(errorMsg);
+    }
+
+    @Override
+    public byte[] serialize(String json) {
+        try {
+            JSONObject jsonObject = new JSONObject(json);
+            jsonSchema.validate(jsonObject);
+        } catch (JsonProcessingException e) {
+            String errorMsg = String.format("Provided json [%s] is not valid according to schema", json);
+            log.error(errorMsg);
+            throw new RuntimeException(errorMsg, e);
+        } catch (ValidationException e) {
+            String validationErrorMsg = String.format(
+                "Provided json message is not valid according to jsonSchema (id=%d): %s",
+                schemaId,
+                e.getMessage()
+            );
+            throw new IllegalArgumentException(validationErrorMsg);
+        }
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            out.write(schemaRegistryType.getMagicByte());
+            out.write(ByteBuffer.allocate(idSize).putInt(schemaId).array());
+            out.write(json.getBytes(StandardCharsets.UTF_8));
+            byte[] bytes = out.toByteArray();
+            out.close();
+            return bytes;
+        } catch (IOException e) {
+            throw new RuntimeException(String.format("Could not serialize json [%s]", json), e);
+        }
+    }
+
+    public static boolean supports(ParsedSchema parsedSchema) {
+        return Objects.equals(JsonSchema.TYPE, parsedSchema.schemaType());
+    }
+
+    private JsonSchemaSerializer(int schemaId, JsonSchema jsonSchema, SchemaRegistryType schemaRegistryType) {
+        this.schemaId = schemaId;
+        this.jsonSchema = jsonSchema;
+        this.schemaRegistryType = schemaRegistryType;
+    }
+}
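A usage sketch (assumed, not part of the commit; the schema string and id are hypothetical) of the new JsonSchemaSerializer: the payload is validated against the JSON Schema, then framed like an Avro record — magic byte, 4-byte schema id, raw UTF-8 JSON:

import io.confluent.kafka.schemaregistry.json.JsonSchema;
import org.akhq.configs.SchemaRegistryType;
import org.akhq.modules.schemaregistry.JsonSchemaSerializer;
import org.akhq.modules.schemaregistry.SchemaSerializer;

class JsonSchemaProduceSketch {
    public static void main(String[] args) {
        // Hypothetical schema and id, for illustration only.
        JsonSchema schema = new JsonSchema("{\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"integer\"}},\"required\":[\"id\"]}");
        SchemaSerializer serializer = JsonSchemaSerializer.newInstance(42, schema, SchemaRegistryType.CONFLUENT);

        byte[] payload = serializer.serialize("{\"id\": 1}");
        // 1 magic byte + 4 schema-id bytes + the JSON body itself
        System.out.println(payload.length + " bytes");

        // An invalid payload fails validation before anything is written, e.g.
        // serializer.serialize("{\"id\": \"not-a-number\"}") throws IllegalArgumentException.
    }
}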
57 changes: 57 additions & 0 deletions src/main/java/org/akhq/modules/schemaregistry/RecordWithSchemaSerializerFactory.java
@@ -0,0 +1,57 @@
+package org.akhq.modules.schemaregistry;
+
+import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.akhq.configs.Connection;
+import org.akhq.configs.SchemaRegistryType;
+import org.akhq.modules.KafkaModule;
+
+import javax.inject.Singleton;
+import java.io.IOException;
+
+@Singleton
+@RequiredArgsConstructor
+@Slf4j
+public class RecordWithSchemaSerializerFactory {
+    private final KafkaModule kafkaModule;
+
+    public SchemaSerializer createSerializer(String clusterId, int schemaId) {
+        ParsedSchema parsedSchema = retrieveSchema(clusterId, schemaId);
+        SchemaRegistryType schemaRegistryType = getSchemaRegistryType(clusterId);
+        return createSerializer(schemaId, parsedSchema, schemaRegistryType);
+    }
+
+    public SchemaSerializer createSerializer(int schemaId, ParsedSchema parsedSchema, SchemaRegistryType schemaRegistryType) {
+        if (JsonSchemaSerializer.supports(parsedSchema)) {
+            return JsonSchemaSerializer.newInstance(schemaId, parsedSchema, schemaRegistryType);
+        } if (AvroSerializer.supports(parsedSchema)) {
+            return AvroSerializer.newInstance(schemaId, parsedSchema, schemaRegistryType);
+        } else {
+            String errorMsg = String.format("Schema with id %d has unsupported schema type %s", schemaId, parsedSchema.schemaType());
+            throw new IllegalStateException(errorMsg);
+        }
+    }
+
+    private ParsedSchema retrieveSchema(String clusterId, int schemaId) {
+        SchemaRegistryClient registryClient = kafkaModule.getRegistryClient(clusterId);
+        try {
+            return registryClient.getSchemaById(schemaId);
+        } catch (IOException|RestClientException e) {
+            String errorMsg = String.format("Can't retrieve schema %d in registry", schemaId);
+            log.error(errorMsg, e);
+            throw new RuntimeException(errorMsg, e);
+        }
+    }
+
+    private SchemaRegistryType getSchemaRegistryType(String clusterId) {
+        SchemaRegistryType schemaRegistryType = SchemaRegistryType.CONFLUENT;
+        Connection.SchemaRegistry schemaRegistry = this.kafkaModule.getConnection(clusterId).getSchemaRegistry();
+        if (schemaRegistry != null) {
+            schemaRegistryType = schemaRegistry.getType();
+        }
+        return schemaRegistryType;
+    }
+}
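A sketch (assumed caller, not part of the commit) of the factory in use: given a cluster and a schema id, it fetches the schema, inspects its type, and hands back whichever serializer supports it (JSON Schema or Avro); any other type is rejected:

import org.akhq.modules.schemaregistry.RecordWithSchemaSerializerFactory;
import org.akhq.modules.schemaregistry.SchemaSerializer;

// Hypothetical helper: the produce path further below uses exactly this shape.
class FactorySketch {
    static byte[] encodeWithSchema(RecordWithSchemaSerializerFactory factory, String clusterId, int schemaId, String json) {
        SchemaSerializer serializer = factory.createSerializer(clusterId, schemaId);
        return serializer.serialize(json);
    }
}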
7 changes: 7 additions & 0 deletions src/main/java/org/akhq/modules/schemaregistry/SchemaSerializer.java
@@ -0,0 +1,7 @@
+package org.akhq.modules.schemaregistry;
+
+public interface SchemaSerializer {
+
+    byte[] serialize(String value);
+
+}
13 changes: 9 additions & 4 deletions src/main/java/org/akhq/repositories/RecordRepository.java
@@ -17,8 +17,9 @@
 import org.akhq.models.Partition;
 import org.akhq.models.Record;
 import org.akhq.models.Topic;
-import org.akhq.modules.AvroSerializer;
 import org.akhq.modules.KafkaModule;
+import org.akhq.modules.schemaregistry.SchemaSerializer;
+import org.akhq.modules.schemaregistry.RecordWithSchemaSerializerFactory;
 import org.akhq.utils.AvroToJsonSerializer;
 import org.akhq.utils.Debug;
 import org.apache.kafka.clients.admin.DeletedRecords;
@@ -60,6 +61,9 @@ public class RecordRepository extends AbstractRepository {
     @Inject
     private SchemaRegistryRepository schemaRegistryRepository;
 
+    @Inject
+    private RecordWithSchemaSerializerFactory serializerFactory;
+
     @Inject
     private CustomDeserializerRepository customDeserializerRepository;
 
@@ -588,13 +592,13 @@ public RecordMetadata produce(
         Optional<Integer> keySchemaId,
         Optional<Integer> valueSchemaId
     ) throws ExecutionException, InterruptedException {
-        AvroSerializer avroSerializer = this.schemaRegistryRepository.getAvroSerializer(clusterId);
         byte[] keyAsBytes = null;
         byte[] valueAsBytes;
 
         if (key.isPresent()) {
             if (keySchemaId.isPresent()) {
-                keyAsBytes = avroSerializer.toAvro(key.get(), keySchemaId.get());
+                SchemaSerializer keySerializer = serializerFactory.createSerializer(clusterId, keySchemaId.get());
+                keyAsBytes = keySerializer.serialize(key.get());
             } else {
                 keyAsBytes = key.get().getBytes();
             }
@@ -609,7 +613,8 @@
         }
 
         if (value != null && valueSchemaId.isPresent()) {
-            valueAsBytes = avroSerializer.toAvro(value, valueSchemaId.get());
+            SchemaSerializer valueSerializer = serializerFactory.createSerializer(clusterId, valueSchemaId.get());
+            valueAsBytes = valueSerializer.serialize(value);
         } else {
             valueAsBytes = value != null ? value.getBytes() : null;
         }
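Condensed sketch (illustrative only; the helper and its parameter names are hypothetical) of the branching the produce path now applies to both key and value: a present schema id routes the payload through the factory, otherwise the raw string bytes are sent unchanged:

import org.akhq.modules.schemaregistry.RecordWithSchemaSerializerFactory;
import org.akhq.modules.schemaregistry.SchemaSerializer;

import java.util.Optional;

class ProducePathSketch {
    // Mirrors the key/value handling above: schema-aware when an id is given,
    // plain UTF-8 bytes otherwise.
    static byte[] toBytes(RecordWithSchemaSerializerFactory factory, String clusterId, String payload, Optional<Integer> schemaId) {
        if (payload != null && schemaId.isPresent()) {
            SchemaSerializer serializer = factory.createSerializer(clusterId, schemaId.get());
            return serializer.serialize(payload);
        }
        return payload != null ? payload.getBytes() : null;
    }
}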
16 changes: 3 additions & 13 deletions src/main/java/org/akhq/repositories/SchemaRegistryRepository.java
@@ -15,7 +15,6 @@
 import org.akhq.configs.Connection;
 import org.akhq.configs.SchemaRegistryType;
 import org.akhq.models.Schema;
-import org.akhq.modules.AvroSerializer;
 import org.akhq.modules.KafkaModule;
 import org.akhq.utils.PagedList;
 import org.akhq.utils.Pagination;
@@ -38,7 +37,6 @@ public class SchemaRegistryRepository extends AbstractRepository {
     private final Map<String, Deserializer> kafkaAvroDeserializers = new HashMap<>();
     private final Map<String, Deserializer> kafkaJsonDeserializers = new HashMap<>();
     private final Map<String, Deserializer> kafkaProtoDeserializers = new HashMap<>();
-    private AvroSerializer avroSerializer;
 
     public PagedList<Schema> list(String clusterId, Pagination pagination, Optional<String> search) throws IOException, RestClientException, ExecutionException, InterruptedException {
         return PagedList.of(all(clusterId, search), pagination, list -> this.toSchemasLatestVersion(list, clusterId));
@@ -111,16 +109,16 @@ public boolean exist(String clusterId, String subject) throws IOException, RestC
         return found;
     }
 
-    public Schema getById(String clusterId, Integer id) throws IOException, RestClientException, ExecutionException, InterruptedException {
+    public Optional<Schema> getById(String clusterId, Integer id) throws IOException, RestClientException, ExecutionException, InterruptedException {
         for (String subject: this.all(clusterId, Optional.empty())) {
             for (Schema version: this.getAllVersions(clusterId, subject)) {
                 if (version.getId().equals(id)) {
-                    return version;
+                    return Optional.of(version);
                 }
             }
         }
 
-        return null;
+        return Optional.empty();
     }
 
     public Schema getLatestVersion(String clusterId, String subject) throws IOException, RestClientException {
@@ -303,14 +301,6 @@ public Deserializer getKafkaProtoDeserializer(String clusterId) {
         return this.kafkaProtoDeserializers.get(clusterId);
     }
 
-    public AvroSerializer getAvroSerializer(String clusterId) {
-        if(this.avroSerializer == null){
-            this.avroSerializer = new AvroSerializer(this.kafkaModule.getRegistryClient(clusterId),
-                    getSchemaRegistryType(clusterId));
-        }
-        return this.avroSerializer;
-    }
-
     public SchemaRegistryType getSchemaRegistryType(String clusterId) {
         SchemaRegistryType schemaRegistryType = SchemaRegistryType.CONFLUENT;
         Connection.SchemaRegistry schemaRegistry = this.kafkaModule.getConnection(clusterId).getSchemaRegistry();
(Diff truncated: the remaining changed files in this commit are not shown.)