diff --git a/src/main/java/org/akhq/configs/SchemaRegistryType.java b/src/main/java/org/akhq/configs/SchemaRegistryType.java
index 1dc9c44f5..3410c912a 100644
--- a/src/main/java/org/akhq/configs/SchemaRegistryType.java
+++ b/src/main/java/org/akhq/configs/SchemaRegistryType.java
@@ -1,6 +1,15 @@
 package org.akhq.configs;

+import lombok.Getter;
+
+@Getter
 public enum SchemaRegistryType {
-    CONFLUENT,
-    TIBCO
+    CONFLUENT((byte) 0x0),
+    TIBCO((byte) 0x80);
+
+    private byte magicByte;
+
+    SchemaRegistryType(byte magicByte) {
+        this.magicByte = magicByte;
+    }
 }
diff --git a/src/main/java/org/akhq/controllers/SchemaController.java b/src/main/java/org/akhq/controllers/SchemaController.java
index 100fba7ba..b2b791746 100644
--- a/src/main/java/org/akhq/controllers/SchemaController.java
+++ b/src/main/java/org/akhq/controllers/SchemaController.java
@@ -128,7 +128,9 @@ public Schema redirectId(
         String cluster,
         Integer id
     ) throws IOException, RestClientException, ExecutionException, InterruptedException {
-        return this.schemaRepository.getById(cluster, id);
+        return this.schemaRepository
+            .getById(cluster, id)
+            .orElse(null);
     }

     @Get("api/{cluster}/schema/{subject}/version")
diff --git a/src/main/java/org/akhq/models/Record.java b/src/main/java/org/akhq/models/Record.java
index 57de8babf..06018b3d4 100644
--- a/src/main/java/org/akhq/models/Record.java
+++ b/src/main/java/org/akhq/models/Record.java
@@ -78,11 +78,7 @@ public class Record {
     private byte MAGIC_BYTE;

     public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte[] bytesKey, byte[] bytesValue, Map<String, byte[]> headers, Topic topic) {
-        if (schemaRegistryType == SchemaRegistryType.TIBCO) {
-            this.MAGIC_BYTE = (byte) 0x80;
-        } else {
-            this.MAGIC_BYTE = 0x0;
-        }
+        this.MAGIC_BYTE = schemaRegistryType.getMagicByte();
         this.topic = topic;
         this.partition = record.partition();
         this.offset = record.offset();
diff --git a/src/main/java/org/akhq/modules/AvroSerializer.java b/src/main/java/org/akhq/modules/schemaregistry/AvroSerializer.java
similarity index 67%
rename from src/main/java/org/akhq/modules/AvroSerializer.java
rename to src/main/java/org/akhq/modules/schemaregistry/AvroSerializer.java
index ed3e1b5cc..c2f0ed66b 100644
--- a/src/main/java/org/akhq/modules/AvroSerializer.java
+++ b/src/main/java/org/akhq/modules/schemaregistry/AvroSerializer.java
@@ -1,4 +1,4 @@
-package org.akhq.modules;
+package org.akhq.modules.schemaregistry;

 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -6,8 +6,8 @@
 import com.fasterxml.jackson.databind.SerializationFeature;
 import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import lombok.extern.slf4j.Slf4j;
 import org.akhq.configs.SchemaRegistryType;
 import org.apache.avro.Conversions;
@@ -18,16 +18,17 @@
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.io.*;

+import javax.inject.Singleton;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Map;
+import java.util.Objects;
 import java.util.TimeZone;
-import javax.inject.Singleton;

 @Singleton
 @Slf4j
-public class AvroSerializer {
+public class AvroSerializer implements SchemaSerializer {
     private static final ObjectMapper MAPPER = new ObjectMapper()
         .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
         .setSerializationInclusion(JsonInclude.Include.NON_NULL)
@@ -37,31 +38,32 @@ public class AvroSerializer {
         .setTimeZone(TimeZone.getDefault());

     private static final TypeReference<Map<String, Object>> TYPE_REFERENCE = new TypeReference<>() {};

-    private final int MAGIC_BYTE;
     public static final int SCHEMA_ID_SIZE = 4;
+
+    private final int schemaId;
+    private final AvroSchema avroSchema;
+    private final SchemaRegistryType schemaRegistryType;

-    private final SchemaRegistryClient registryClient;
-
-    public AvroSerializer(SchemaRegistryClient registryClient, SchemaRegistryType schemaRegistryType) {
-        this.registryClient = registryClient;
+    public static boolean supports(ParsedSchema parsedSchema) {
+        return Objects.equals(AvroSchema.TYPE, parsedSchema.schemaType());
+    }

-        if (schemaRegistryType == SchemaRegistryType.TIBCO) {
-            MAGIC_BYTE = (byte) 0x80;
+    public static AvroSerializer newInstance(int schemaId, ParsedSchema parsedSchema, SchemaRegistryType schemaRegistryType) {
+        if (supports(parsedSchema)) {
+            return new AvroSerializer(schemaId, (AvroSchema) parsedSchema, schemaRegistryType);
         } else {
-            MAGIC_BYTE = 0x0;
+            String errorMsg = String.format("Schema %s has an unsupported schema type: expected %s but found %s", parsedSchema.name(), AvroSchema.TYPE, parsedSchema.schemaType());
+            throw new IllegalArgumentException(errorMsg);
         }
     }

-    public byte[] toAvro(String json, int schemaId) {
-        byte[] asBytes;
+    @Override
+    public byte[] serialize(String json) {
         try {
-            Schema schema = this.registryClient.getById(schemaId);
-            asBytes = this.fromJsonToAvro(json.trim(), schema, schemaId);
-        } catch (IOException | RestClientException e) {
-            throw new RuntimeException(String.format("Can't retrieve schema %d in registry", schemaId), e);
+            return this.fromJsonToAvro(json.trim(), avroSchema.rawSchema(), schemaId);
+        } catch (IOException e) {
+            log.error("Cannot serialize value", e);
+            throw new RuntimeException("Cannot serialize value", e);
         }
-        return asBytes;
     }

     private byte[] fromJsonToAvro(String json, Schema schema, int schemaId) throws IOException {
@@ -89,7 +91,7 @@ private byte[] fromJsonToAvro(String json, Schema schema, int schemaId) throws I
         GenericDatumWriter<Object> w = new GenericDatumWriter<>(schema, genericData);

         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        outputStream.write(MAGIC_BYTE);
+        outputStream.write(schemaRegistryType.getMagicByte());
         outputStream.write(ByteBuffer.allocate(SCHEMA_ID_SIZE).putInt(schemaId).array());

         Encoder e = EncoderFactory.get().binaryEncoder(outputStream, null);
@@ -99,4 +101,10 @@ private byte[] fromJsonToAvro(String json, Schema schema, int schemaId) throws I

         return outputStream.toByteArray();
     }
+
+    private AvroSerializer(int schemaId, AvroSchema avroSchema, SchemaRegistryType schemaRegistryType) {
+        this.schemaId = schemaId;
+        this.avroSchema = avroSchema;
+        this.schemaRegistryType = schemaRegistryType;
+    }
 }
diff --git a/src/main/java/org/akhq/modules/schemaregistry/JsonSchemaSerializer.java b/src/main/java/org/akhq/modules/schemaregistry/JsonSchemaSerializer.java
new file mode 100644
index 000000000..deaabb934
--- /dev/null
+++ b/src/main/java/org/akhq/modules/schemaregistry/JsonSchemaSerializer.java
@@ -0,0 +1,70 @@
+package org.akhq.modules.schemaregistry;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
+import io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaSerializer;
+import lombok.extern.slf4j.Slf4j;
+import org.akhq.configs.SchemaRegistryType;
+import org.everit.json.schema.ValidationException;
+import org.json.JSONObject;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+
+@Slf4j
+public class JsonSchemaSerializer extends AbstractKafkaJsonSchemaSerializer implements SchemaSerializer {
+    private final int schemaId;
+    private final JsonSchema jsonSchema;
+    private final SchemaRegistryType schemaRegistryType;
+
+    public static JsonSchemaSerializer newInstance(int schemaId, ParsedSchema parsedSchema, SchemaRegistryType schemaRegistryType) {
+        if (supports(parsedSchema)) {
+            return new JsonSchemaSerializer(schemaId, (JsonSchema) parsedSchema, schemaRegistryType);
+        }
+        String errorMsg = String.format("Schema %s has an unsupported schema type: expected %s but found %s", parsedSchema.name(), JsonSchema.TYPE, parsedSchema.schemaType());
+        throw new IllegalArgumentException(errorMsg);
+    }
+
+    @Override
+    public byte[] serialize(String json) {
+        try {
+            JSONObject jsonObject = new JSONObject(json);
+            jsonSchema.validate(jsonObject);
+        } catch (JsonProcessingException e) {
+            String errorMsg = String.format("Provided json [%s] is not valid according to schema", json);
+            log.error(errorMsg);
+            throw new RuntimeException(errorMsg, e);
+        } catch (ValidationException e) {
+            String validationErrorMsg = String.format(
+                "Provided json message is not valid according to jsonSchema (id=%d): %s",
+                schemaId,
+                e.getMessage()
+            );
+            throw new IllegalArgumentException(validationErrorMsg);
+        }
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            out.write(schemaRegistryType.getMagicByte());
+            out.write(ByteBuffer.allocate(idSize).putInt(schemaId).array());
+            out.write(json.getBytes(StandardCharsets.UTF_8));
+            return out.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException(String.format("Could not serialize json [%s]", json), e);
+        }
+    }
+
+    public static boolean supports(ParsedSchema parsedSchema) {
+        return Objects.equals(JsonSchema.TYPE, parsedSchema.schemaType());
+    }
+
+    private JsonSchemaSerializer(int schemaId, JsonSchema jsonSchema, SchemaRegistryType schemaRegistryType) {
+        this.schemaId = schemaId;
+        this.jsonSchema = jsonSchema;
+        this.schemaRegistryType = schemaRegistryType;
+    }
+}
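Both serializers above emit the same wire framing: the registry's magic byte (0x0 for CONFLUENT, 0x80 for TIBCO), a 4-byte big-endian schema id, and then the encoded payload (UTF-8 JSON for JsonSchemaSerializer, Avro binary for AvroSerializer). A minimal sketch of reading that frame back, assuming a payload produced by JsonSchemaSerializer; the class and method names here are illustrative only:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class WireFormatSketch {
    // Splits a serialized record into magic byte, schema id and body.
    static void inspect(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        byte magicByte = buffer.get();     // 0x0 (CONFLUENT) or (byte) 0x80 (TIBCO)
        int schemaId = buffer.getInt();    // 4-byte schema id, big-endian
        byte[] body = new byte[buffer.remaining()];
        buffer.get(body);                  // remaining bytes: the JSON document itself
        System.out.printf("magic=%02x schemaId=%d json=%s%n",
            magicByte, schemaId, new String(body, StandardCharsets.UTF_8));
    }
}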
diff --git a/src/main/java/org/akhq/modules/schemaregistry/RecordWithSchemaSerializerFactory.java b/src/main/java/org/akhq/modules/schemaregistry/RecordWithSchemaSerializerFactory.java
new file mode 100644
index 000000000..f49e82091
--- /dev/null
+++ b/src/main/java/org/akhq/modules/schemaregistry/RecordWithSchemaSerializerFactory.java
@@ -0,0 +1,57 @@
+package org.akhq.modules.schemaregistry;
+
+import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.akhq.configs.Connection;
+import org.akhq.configs.SchemaRegistryType;
+import org.akhq.modules.KafkaModule;
+
+import javax.inject.Singleton;
+import java.io.IOException;
+
+@Singleton
+@RequiredArgsConstructor
+@Slf4j
+public class RecordWithSchemaSerializerFactory {
+    private final KafkaModule kafkaModule;
+
+    public SchemaSerializer createSerializer(String clusterId, int schemaId) {
+        ParsedSchema parsedSchema = retrieveSchema(clusterId, schemaId);
+        SchemaRegistryType schemaRegistryType = getSchemaRegistryType(clusterId);
+        return createSerializer(schemaId, parsedSchema, schemaRegistryType);
+    }
+
+    public SchemaSerializer createSerializer(int schemaId, ParsedSchema parsedSchema, SchemaRegistryType schemaRegistryType) {
+        if (JsonSchemaSerializer.supports(parsedSchema)) {
+            return JsonSchemaSerializer.newInstance(schemaId, parsedSchema, schemaRegistryType);
+        } else if (AvroSerializer.supports(parsedSchema)) {
+            return AvroSerializer.newInstance(schemaId, parsedSchema, schemaRegistryType);
+        } else {
+            String errorMsg = String.format("Schema with id %d has unsupported schema type %s", schemaId, parsedSchema.schemaType());
+            throw new IllegalStateException(errorMsg);
+        }
+    }
+
+    private ParsedSchema retrieveSchema(String clusterId, int schemaId) {
+        SchemaRegistryClient registryClient = kafkaModule.getRegistryClient(clusterId);
+        try {
+            return registryClient.getSchemaById(schemaId);
+        } catch (IOException | RestClientException e) {
+            String errorMsg = String.format("Can't retrieve schema %d in registry", schemaId);
+            log.error(errorMsg, e);
+            throw new RuntimeException(errorMsg, e);
+        }
+    }
+
+    private SchemaRegistryType getSchemaRegistryType(String clusterId) {
+        SchemaRegistryType schemaRegistryType = SchemaRegistryType.CONFLUENT;
+        Connection.SchemaRegistry schemaRegistry = this.kafkaModule.getConnection(clusterId).getSchemaRegistry();
+        if (schemaRegistry != null) {
+            schemaRegistryType = schemaRegistry.getType();
+        }
+        return schemaRegistryType;
+    }
+}
diff --git a/src/main/java/org/akhq/modules/schemaregistry/SchemaSerializer.java b/src/main/java/org/akhq/modules/schemaregistry/SchemaSerializer.java
new file mode 100644
index 000000000..ed72a5094
--- /dev/null
+++ b/src/main/java/org/akhq/modules/schemaregistry/SchemaSerializer.java
@@ -0,0 +1,7 @@
+package org.akhq.modules.schemaregistry;
+
+public interface SchemaSerializer {
+
+    byte[] serialize(String value);
+
+}
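The RecordRepository change below wires these pieces into the produce path. Roughly, the intended call pattern is the following sketch (assuming an injected RecordWithSchemaSerializerFactory; clusterId and schemaId are illustrative values):

import org.akhq.modules.schemaregistry.RecordWithSchemaSerializerFactory;
import org.akhq.modules.schemaregistry.SchemaSerializer;

class ProduceSketch {
    // The factory fetches the schema from the registry for the given cluster and
    // picks JsonSchemaSerializer or AvroSerializer based on the schema type.
    static byte[] toBytes(RecordWithSchemaSerializerFactory factory, String clusterId, int schemaId, String json) {
        SchemaSerializer serializer = factory.createSerializer(clusterId, schemaId);
        return serializer.serialize(json);
    }
}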
diff --git a/src/main/java/org/akhq/repositories/RecordRepository.java b/src/main/java/org/akhq/repositories/RecordRepository.java
index 4de6c1a16..36cc3be32 100644
--- a/src/main/java/org/akhq/repositories/RecordRepository.java
+++ b/src/main/java/org/akhq/repositories/RecordRepository.java
@@ -17,8 +17,9 @@
 import org.akhq.models.Partition;
 import org.akhq.models.Record;
 import org.akhq.models.Topic;
-import org.akhq.modules.AvroSerializer;
 import org.akhq.modules.KafkaModule;
+import org.akhq.modules.schemaregistry.SchemaSerializer;
+import org.akhq.modules.schemaregistry.RecordWithSchemaSerializerFactory;
 import org.akhq.utils.AvroToJsonSerializer;
 import org.akhq.utils.Debug;
 import org.apache.kafka.clients.admin.DeletedRecords;
@@ -60,6 +61,9 @@ public class RecordRepository extends AbstractRepository {
     @Inject
     private SchemaRegistryRepository schemaRegistryRepository;

+    @Inject
+    private RecordWithSchemaSerializerFactory serializerFactory;
+
     @Inject
     private CustomDeserializerRepository customDeserializerRepository;

@@ -588,13 +592,13 @@ public RecordMetadata produce(
         Optional<Integer> keySchemaId,
         Optional<Integer> valueSchemaId
     ) throws ExecutionException, InterruptedException {
-        AvroSerializer avroSerializer = this.schemaRegistryRepository.getAvroSerializer(clusterId);
         byte[] keyAsBytes = null;
         byte[] valueAsBytes;

         if (key.isPresent()) {
             if (keySchemaId.isPresent()) {
-                keyAsBytes = avroSerializer.toAvro(key.get(), keySchemaId.get());
+                SchemaSerializer keySerializer = serializerFactory.createSerializer(clusterId, keySchemaId.get());
+                keyAsBytes = keySerializer.serialize(key.get());
             } else {
                 keyAsBytes = key.get().getBytes();
             }
@@ -609,7 +613,8 @@ public RecordMetadata produce(
         }

         if (value != null && valueSchemaId.isPresent()) {
-            valueAsBytes = avroSerializer.toAvro(value, valueSchemaId.get());
+            SchemaSerializer valueSerializer = serializerFactory.createSerializer(clusterId, valueSchemaId.get());
+            valueAsBytes = valueSerializer.serialize(value);
         } else {
             valueAsBytes = value != null ? value.getBytes() : null;
         }
diff --git a/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java b/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java
index ade8f711f..790f9ba76 100644
--- a/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java
+++ b/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java
@@ -15,7 +15,6 @@
 import org.akhq.configs.Connection;
 import org.akhq.configs.SchemaRegistryType;
 import org.akhq.models.Schema;
-import org.akhq.modules.AvroSerializer;
 import org.akhq.modules.KafkaModule;
 import org.akhq.utils.PagedList;
 import org.akhq.utils.Pagination;
@@ -38,7 +37,6 @@ public class SchemaRegistryRepository extends AbstractRepository {
     private final Map kafkaAvroDeserializers = new HashMap<>();
     private final Map kafkaJsonDeserializers = new HashMap<>();
     private final Map kafkaProtoDeserializers = new HashMap<>();
-    private AvroSerializer avroSerializer;

     public PagedList<Schema> list(String clusterId, Pagination pagination, Optional<String> search) throws IOException, RestClientException, ExecutionException, InterruptedException {
         return PagedList.of(all(clusterId, search), pagination, list -> this.toSchemasLatestVersion(list, clusterId));
     }
@@ -111,16 +109,16 @@ public boolean exist(String clusterId, String subject) throws IOException, RestC
         return found;
     }

-    public Schema getById(String clusterId, Integer id) throws IOException, RestClientException, ExecutionException, InterruptedException {
+    public Optional<Schema> getById(String clusterId, Integer id) throws IOException, RestClientException, ExecutionException, InterruptedException {
         for (String subject: this.all(clusterId, Optional.empty())) {
             for (Schema version: this.getAllVersions(clusterId, subject)) {
                 if (version.getId().equals(id)) {
-                    return version;
+                    return Optional.of(version);
                 }
             }
         }

-        return null;
+        return Optional.empty();
     }

     public Schema getLatestVersion(String clusterId, String subject) throws IOException, RestClientException {
@@ -303,14 +301,6 @@ public Deserializer getKafkaProtoDeserializer(String clusterId) {
         return this.kafkaProtoDeserializers.get(clusterId);
     }

-    public AvroSerializer getAvroSerializer(String clusterId) {
-        if(this.avroSerializer == null){
-            this.avroSerializer = new AvroSerializer(this.kafkaModule.getRegistryClient(clusterId),
-                getSchemaRegistryType(clusterId));
-        }
-        return this.avroSerializer;
-    }
-
     public SchemaRegistryType getSchemaRegistryType(String clusterId) {
         SchemaRegistryType schemaRegistryType = SchemaRegistryType.CONFLUENT;
         Connection.SchemaRegistry schemaRegistry = this.kafkaModule.getConnection(clusterId).getSchemaRegistry();
diff --git a/src/test/java/org/akhq/KafkaTestCluster.java b/src/test/java/org/akhq/KafkaTestCluster.java
index 8441726cb..ab8249155 100644
--- a/src/test/java/org/akhq/KafkaTestCluster.java
+++ b/src/test/java/org/akhq/KafkaTestCluster.java
@@ -54,11 +54,12 @@ public class KafkaTestCluster implements Runnable, Stoppable {
     public static final String TOPIC_STREAM_MAP = "stream-map";
     public static final String TOPIC_STREAM_COUNT = "stream-count";
     public static final String TOPIC_CONNECT = "connect-sink";
+    public static final String TOPIC_JSON_SCHEMA = "json-schema-topic";

-    public static final int TOPIC_ALL_COUNT = 19;
-    public static final int TOPIC_HIDE_INTERNAL_COUNT = 11;
-    public static final int TOPIC_HIDE_INTERNAL_STREAM_COUNT = 9;
-    public static final int TOPIC_HIDE_STREAM_COUNT = 17;
+    public static final int TOPIC_ALL_COUNT = 20;
+    public static final int TOPIC_HIDE_INTERNAL_COUNT = 12;
+    public static final int TOPIC_HIDE_INTERNAL_STREAM_COUNT = 10;
+    public static final int TOPIC_HIDE_STREAM_COUNT = 18;

     public static final int CONSUMER_GROUP_COUNT = 6;
     public static final String CONSUMER_STREAM_TEST = "stream-test-example";
@@ -291,6 +292,10 @@ private void injectTestData() throws InterruptedException, ExecutionException {
         }
         log.debug("Huge topic created");

+        // empty topic used by the json schema tests
+        testUtils.createTopic(TOPIC_JSON_SCHEMA, 3, (short) 1);
+        log.debug("{} topic created", TOPIC_JSON_SCHEMA);
+
         // consumer groups
         for (int c = 0; c < 5; c++) {
             Properties properties = new Properties();
diff --git a/src/test/java/org/akhq/controllers/TopicControllerTest.java b/src/test/java/org/akhq/controllers/TopicControllerTest.java
index b34dd3b8f..576280f4f 100644
--- a/src/test/java/org/akhq/controllers/TopicControllerTest.java
+++ b/src/test/java/org/akhq/controllers/TopicControllerTest.java
@@ -26,6 +26,7 @@ class TopicControllerTest extends AbstractTest {
     public static final String TOPIC_URL = BASE_URL + "/" + KafkaTestCluster.TOPIC_COMPACTED;
     public static final String CREATE_TOPIC_NAME = UUID.randomUUID().toString();
     public static final String CREATE_TOPIC_URL = BASE_URL + "/" + CREATE_TOPIC_NAME;
+    public static final int DEFAULT_PAGE_SIZE = 5;

     @Test
     @Order(1)
@@ -43,12 +44,17 @@ void defaultsConfigsApi(){
     void listApi() {
         ResultPagedList<Topic> result;
+        int expectedPageCount = (int) Math.ceil((double) KafkaTestCluster.TOPIC_HIDE_INTERNAL_COUNT / DEFAULT_PAGE_SIZE);

         result = this.retrievePagedList(HttpRequest.GET(BASE_URL), Topic.class);
-        assertEquals(5, result.getResults().size());
+        assertEquals(expectedPageCount, result.getPage());
+        assertEquals(DEFAULT_PAGE_SIZE, result.getResults().size());

         result = this.retrievePagedList(HttpRequest.GET(BASE_URL + "?page=2"), Topic.class);
-        assertEquals(KafkaTestCluster.TOPIC_HIDE_INTERNAL_COUNT - 6, result.getResults().size());
-        assertEquals("stream-test-example-count-changelog", result.getResults().get(4).getName());
+        assertEquals(DEFAULT_PAGE_SIZE, result.getResults().size());
+
+        int expectedLastPageSize = KafkaTestCluster.TOPIC_HIDE_INTERNAL_COUNT - 2 * DEFAULT_PAGE_SIZE;
+        result = this.retrievePagedList(HttpRequest.GET(BASE_URL + "?page=3"), Topic.class);
+        assertEquals(expectedLastPageSize, result.getResults().size());
     }

     @Test
@@ -221,7 +227,7 @@ void dataDelete() {
     @Test
     @Order(6)
     void produceMultipleMessages() {
-        Map paramMap = new HashMap();
+        Map paramMap = new HashMap<>();
         paramMap.put("value", "key1_{\"test_1\":1}\n" +
             "key2_{\"test_1\":2}\n" +
             "key3_{\"test_1\":3}");
diff --git a/src/test/java/org/akhq/modules/AvroSchemaSerializerTest.java b/src/test/java/org/akhq/modules/AvroSchemaSerializerTest.java
index 7829ec9bb..695717731 100644
--- a/src/test/java/org/akhq/modules/AvroSchemaSerializerTest.java
+++ b/src/test/java/org/akhq/modules/AvroSchemaSerializerTest.java
@@ -1,13 +1,13 @@
 package org.akhq.modules;

-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import org.akhq.configs.SchemaRegistryType;
+import org.akhq.modules.schemaregistry.AvroSerializer;
 import org.apache.avro.SchemaBuilder;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;

 import java.io.IOException;
@@ -15,11 +15,10 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.Mockito.when;

 @ExtendWith(MockitoExtension.class)
 class AvroSchemaSerializerTest {
+    private static final int SCHEMA_ID = 3;

     private final org.apache.avro.Schema SCHEMA = SchemaBuilder
         .record("schema1").namespace("org.akhq")
@@ -41,35 +40,30 @@ class AvroSchemaSerializerTest {
         " \"rating\": 2.5\n" +
         "}";

-    private AvroSerializer cut;
-
-    @Mock
-    private SchemaRegistryClient schemaRegistryClient;
+    private AvroSerializer avroSerializer;

     @BeforeEach
-    void setUp() throws IOException, RestClientException {
-        cut = new AvroSerializer(schemaRegistryClient, SchemaRegistryType.CONFLUENT);
-        when(schemaRegistryClient.getById(anyInt())).thenReturn(SCHEMA);
+    void setUp() {
+        avroSerializer = AvroSerializer.newInstance(SCHEMA_ID, new AvroSchema(SCHEMA), SchemaRegistryType.CONFLUENT);
     }

     @Test
     void shouldSerializeSchemaId() {
-        int schemaId = 3;
-        byte[] bytes = cut.toAvro(VALID_JSON, schemaId);
+        byte[] bytes = avroSerializer.serialize(VALID_JSON);

         ByteBuffer buffer = ByteBuffer.wrap(bytes);
         byte magicBytes = buffer.get();
         int serializedSchemaId = buffer.getInt();

         assertEquals(0, magicBytes);
-        assertEquals(schemaId, serializedSchemaId);
+        assertEquals(SCHEMA_ID, serializedSchemaId);
     }

     @Test
     void shouldFailIfDoesntMatchSchemaId() {
         assertThrows(NullPointerException.class, () -> {
             int schemaId = 3;
-            cut.toAvro(INVALID_JSON, schemaId);
+            avroSerializer.serialize(INVALID_JSON);
         });
     }
diff --git a/src/test/java/org/akhq/modules/schemaregistry/JsonSchemaSerializerTest.java b/src/test/java/org/akhq/modules/schemaregistry/JsonSchemaSerializerTest.java
new file mode 100644
index 000000000..45d1e89c8
--- /dev/null
+++ b/src/test/java/org/akhq/modules/schemaregistry/JsonSchemaSerializerTest.java
@@ -0,0 +1,50 @@
+package org.akhq.modules.schemaregistry;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
+import org.akhq.configs.SchemaRegistryType;
+import org.akhq.utils.Album;
+import org.akhq.utils.ResourceTestUtil;
+import org.json.JSONObject;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class JsonSchemaSerializerTest {
+
+    @Test
+    public void serializeJsonStringWithMagicByteAndSchemaId() throws IOException {
+        JsonSchema jsonSchema = createJsonSchema("json_schema/album.json");
+        JsonSchemaSerializer jsonSchemaSerializer = JsonSchemaSerializer.newInstance(1, jsonSchema, SchemaRegistryType.CONFLUENT);
+        Album objectSatisfyingJsonSchema = new Album("title", List.of("artist_1", "artist_2"), 1989, List.of("song_1", "song_2"));
+        String recordAsJsonString = new ObjectMapper().writeValueAsString(objectSatisfyingJsonSchema);
+
+        byte[] serialize = jsonSchemaSerializer.serialize(recordAsJsonString);
+
+        assertEquals(SchemaRegistryType.CONFLUENT.getMagicByte(), serialize[0]);
+    }
+
+    @Test
+    public void failsWhenObjectDoesNotAdhereToSchema() throws IOException {
+        JsonSchema jsonSchema = createJsonSchema("json_schema/album.json");
+        JsonSchemaSerializer jsonSchemaSerializer = JsonSchemaSerializer.newInstance(1, jsonSchema, SchemaRegistryType.CONFLUENT);
+
+        JSONObject notSchemaValidObject = new JSONObject(Collections.singletonMap("any_property", "property value"));
+        try {
+            jsonSchemaSerializer.serialize(notSchemaValidObject.toString());
+            fail("Exception should be thrown");
+        } catch (Exception e) {
+            assertEquals(IllegalArgumentException.class, e.getClass());
+        }
+    }
+
+    private JsonSchema createJsonSchema(String resourcePath) throws IOException {
+        String schemaAsString = ResourceTestUtil.resourceAsString(resourcePath);
+        return new JsonSchema(schemaAsString);
+    }
+
+}
\ No newline at end of file
diff --git a/src/test/java/org/akhq/repositories/RecordRepositoryTest.java b/src/test/java/org/akhq/repositories/RecordRepositoryTest.java
index fd9e95f4f..8919c5ee5 100644
--- a/src/test/java/org/akhq/repositories/RecordRepositoryTest.java
+++ b/src/test/java/org/akhq/repositories/RecordRepositoryTest.java
@@ -1,20 +1,28 @@
 package org.akhq.repositories;

+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.micronaut.context.env.Environment;
 import lombok.extern.slf4j.Slf4j;
 import org.akhq.AbstractTest;
 import org.akhq.KafkaTestCluster;
 import org.akhq.models.Record;
+import org.akhq.models.Schema;
 import org.akhq.models.Topic;
+import org.akhq.utils.Album;
+import org.akhq.utils.ResourceTestUtil;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.codehaus.httpcache4j.uri.URIBuilder;
+import org.json.JSONObject;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;

+import javax.inject.Inject;
+import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import javax.inject.Inject;

 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
@@ -29,9 +37,15 @@ class RecordRepositoryTest extends AbstractTest {
     @Inject
     private TopicRepository topicRepository;

+    @Inject
+    private SchemaRegistryRepository schemaRegistryRepository;
+
     @Inject
     private Environment environment;

+    @Inject
+    private ObjectMapper objectMapper;
+
     @Test
     void consumeEmpty() throws ExecutionException, InterruptedException {
         RecordRepository.Options options = new RecordRepository.Options(environment, KafkaTestCluster.CLUSTER_ID, KafkaTestCluster.TOPIC_EMPTY);
@@ -225,6 +239,53 @@ void lastRecordTest() throws ExecutionException, InterruptedException {
         assertTrue(record.containsKey(KafkaTestCluster.TOPIC_RANDOM));
     }

+    @Test
+    void produceAndConsumeRecordUsingJsonSchema() throws ExecutionException, InterruptedException, IOException, RestClientException {
+        Schema keyJsonSchema = registerSchema("json_schema/key.json", KafkaTestCluster.TOPIC_JSON_SCHEMA + "-key");
+        Schema valueJsonSchema = registerSchema("json_schema/album.json", KafkaTestCluster.TOPIC_JSON_SCHEMA + "-value");
+        Album objectSatisfyingJsonSchema = new Album("title", List.of("artist_1", "artist_2"), 1989, List.of("song_1", "song_2"));
+        String recordAsJsonString = objectMapper.writeValueAsString(objectSatisfyingJsonSchema);
+        String keyJsonString = new JSONObject(Collections.singletonMap("id", "83fff9f8-b47a-4bf7-863b-9942c4369f06")).toString();
+
+        RecordMetadata producedRecordMetadata = repository.produce(
+            KafkaTestCluster.CLUSTER_ID,
+            KafkaTestCluster.TOPIC_JSON_SCHEMA,
+            recordAsJsonString,
+            Collections.emptyMap(),
+            Optional.of(keyJsonString),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.of(keyJsonSchema.getId()),
+            Optional.of(valueJsonSchema.getId())
+        );
+
+        RecordRepository.Options options = new RecordRepository.Options(environment, KafkaTestCluster.CLUSTER_ID, KafkaTestCluster.TOPIC_JSON_SCHEMA);
+        List<Record> records = consumeAllRecord(options);
+        Optional<Record> consumedRecord = records.stream()
+            .filter(record -> Objects.equals(record.getKey(), keyJsonString))
+            .findFirst();
+        assertTrue(consumedRecord.isPresent());
+        Record recordToAssert = consumedRecord.get();
+        assertEquals(keyJsonString, recordToAssert.getKey());
+        assertEquals(recordAsJsonString, recordToAssert.getValue());
+        assertEquals(valueJsonSchema.getId(), recordToAssert.getValueSchemaId());
+
+        // clear schema registry as it is shared between tests
+        schemaRegistryRepository.delete(KafkaTestCluster.CLUSTER_ID, keyJsonSchema.getSubject());
+        schemaRegistryRepository.delete(KafkaTestCluster.CLUSTER_ID, valueJsonSchema.getSubject());
+    }
+
+    private Schema registerSchema(String resourcePath, String subject) throws IOException, RestClientException {
+        String jsonSchemaRequest = ResourceTestUtil.resourceAsString(resourcePath);
+        return schemaRegistryRepository.register(
+            KafkaTestCluster.CLUSTER_ID,
+            subject,
+            "JSON",
+            jsonSchemaRequest,
+            Collections.emptyList()
+        );
+    }
+
     private int searchAll(RecordRepository.Options options) throws ExecutionException, InterruptedException {
         AtomicInteger size = new AtomicInteger();
         AtomicBoolean hasNext = new AtomicBoolean(true);
diff --git a/src/test/java/org/akhq/repositories/SchemaRegistryRepositoryTest.java b/src/test/java/org/akhq/repositories/SchemaRegistryRepositoryTest.java
index 571d3ff06..a0f0ec4b5 100644
--- a/src/test/java/org/akhq/repositories/SchemaRegistryRepositoryTest.java
+++ b/src/test/java/org/akhq/repositories/SchemaRegistryRepositoryTest.java
@@ -2,15 +2,15 @@

 import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
-import org.apache.avro.SchemaBuilder;
-import org.codehaus.httpcache4j.uri.URIBuilder;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
 import org.akhq.AbstractTest;
 import org.akhq.KafkaTestCluster;
 import org.akhq.models.Schema;
 import org.akhq.utils.PagedList;
 import org.akhq.utils.Pagination;
+import org.apache.avro.SchemaBuilder;
+import org.codehaus.httpcache4j.uri.URIBuilder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;

 import javax.inject.Inject;
 import java.io.IOException;
@@ -19,9 +19,7 @@
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;

-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.*;

 public class SchemaRegistryRepositoryTest extends AbstractTest {
     @Inject
@@ -138,13 +136,19 @@ void getLatestVersionWithReferences() throws IOException, RestClientException {

     @Test
     void register() throws IOException, RestClientException, ExecutionException, InterruptedException {
+        int numberOfSchemasAlreadyStored = repository.listAll(KafkaTestCluster.CLUSTER_ID, Optional.empty()).size();
+
         repository.register(KafkaTestCluster.CLUSTER_ID, SUBJECT_1, SCHEMA_1_V1.toString(), Collections.emptyList());
         repository.updateConfig(KafkaTestCluster.CLUSTER_ID, SUBJECT_1, new Schema.Config(Schema.Config.CompatibilityLevelConfig.FORWARD));
         repository.register(KafkaTestCluster.CLUSTER_ID, SUBJECT_1, SCHEMA_1_V2.toString(), Collections.emptyList());
         repository.register(KafkaTestCluster.CLUSTER_ID, SUBJECT_2, SCHEMA_2.toString(), Collections.emptyList());

-        assertEquals(5, repository.list(KafkaTestCluster.CLUSTER_ID, new Pagination(100, URIBuilder.empty(), 1), Optional.empty()).size());
-        assertEquals(SCHEMA_1_V2, repository.getLatestVersion(KafkaTestCluster.CLUSTER_ID, SUBJECT_1).getAvroSchema());
+        int expectedNumberAllSchemas = numberOfSchemasAlreadyStored + 2;
+        assertEquals(expectedNumberAllSchemas, repository.list(KafkaTestCluster.CLUSTER_ID, new Pagination(100, URIBuilder.empty(), 1), Optional.empty()).size());
+
+        Schema subject1Schema = repository.getLatestVersion(KafkaTestCluster.CLUSTER_ID, SUBJECT_1);
+        assertEquals(SCHEMA_1_V2, subject1Schema.getAvroSchema());
+        assertEquals(Schema.Config.CompatibilityLevelConfig.FORWARD, subject1Schema.getCompatibilityLevel());

         assertEquals(2, repository.getAllVersions(KafkaTestCluster.CLUSTER_ID, SUBJECT_1).size());
         assertEquals(SCHEMA_2, repository.getLatestVersion(KafkaTestCluster.CLUSTER_ID, SUBJECT_2).getAvroSchema());
@@ -152,10 +156,12 @@ void register() throws IOException, RestClientException, ExecutionException, Int

     @Test
     void delete() throws IOException, RestClientException, ExecutionException, InterruptedException {
-        repository.register(KafkaTestCluster.CLUSTER_ID, SUBJECT_1, SCHEMA_1_V1.toString(), Collections.emptyList());
+        Schema registeredSchema = repository.register(KafkaTestCluster.CLUSTER_ID, SUBJECT_1, SCHEMA_1_V1.toString(), Collections.emptyList());
+
         repository.delete(KafkaTestCluster.CLUSTER_ID, SUBJECT_1);
-        assertEquals(3, repository.list(KafkaTestCluster.CLUSTER_ID, new Pagination(100, URIBuilder.empty(), 1), Optional.empty()).size());
+
+        Optional<Schema> schemaById = repository.getById(KafkaTestCluster.CLUSTER_ID, registeredSchema.getId());
+        assertTrue(schemaById.isEmpty());
     }

     @Test
diff --git a/src/test/java/org/akhq/utils/ResourceTestUtil.java b/src/test/java/org/akhq/utils/ResourceTestUtil.java
new file mode 100644
index 000000000..7a31ebad4
--- /dev/null
+++ b/src/test/java/org/akhq/utils/ResourceTestUtil.java
@@ -0,0 +1,18 @@
+package org.akhq.utils;
+
+import com.google.common.io.CharStreams;
+import org.junit.platform.commons.util.ClassLoaderUtils;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+public class ResourceTestUtil {
+
+    public static String resourceAsString(String resourcePath) throws IOException {
+        try (
+            InputStreamReader isr = new InputStreamReader(ClassLoaderUtils.getDefaultClassLoader().getResourceAsStream(resourcePath))
+        ) {
+            return CharStreams.toString(isr);
+        }
+    }
+}
diff --git a/src/test/resources/json_schema/album.json b/src/test/resources/json_schema/album.json
new file mode 100644
index 000000000..1f55f6d4f
--- /dev/null
+++ b/src/test/resources/json_schema/album.json
@@ -0,0 +1,29 @@
+{
+  "type": "object",
+  "properties": {
+    "title": {
+      "type": "string"
+    },
+    "releaseYear": {
+      "type": "number"
+    },
+    "artists": {
+      "type": "array",
+      "items": {
+        "type" : "string"
+      }
+    },
+    "songsTitles": {
+      "type": "array",
+ "items": { + "type" : "string" + } + } + }, + "required": [ + "title", + "releaseYear", + "artists", + "songsTitles" + ] +} \ No newline at end of file diff --git a/src/test/resources/json_schema/key.json b/src/test/resources/json_schema/key.json new file mode 100644 index 000000000..d6497fcf3 --- /dev/null +++ b/src/test/resources/json_schema/key.json @@ -0,0 +1,11 @@ +{ + "type": "object", + "properties": { + "id": { + "description": "The unique identifier for a message", + "type": "string", + "format": "uuid" + } + }, + "required": [ "id" ] +} \ No newline at end of file