From fbc15ebde3475b2d60c079367452e0c54556b21d Mon Sep 17 00:00:00 2001 From: Eric Bowden Date: Sun, 31 Jan 2021 13:34:24 -0600 Subject: [PATCH] fix(schema-registry): fixed no schema configuration (#576) bug from commit aada01e36407a3cc1799263c0996fb100905829a that broke browsing topic data when config files without a 'schema-registry' section present were used. --- .../org/akhq/controllers/TopicController.java | 7 +++--- .../akhq/repositories/RecordRepository.java | 14 ++++-------- .../SchemaRegistryRepository.java | 22 +++++++++++++------ 3 files changed, 22 insertions(+), 21 deletions(-) diff --git a/src/main/java/org/akhq/controllers/TopicController.java b/src/main/java/org/akhq/controllers/TopicController.java index 951e2e36b..2e3eaf612 100644 --- a/src/main/java/org/akhq/controllers/TopicController.java +++ b/src/main/java/org/akhq/controllers/TopicController.java @@ -17,7 +17,6 @@ import io.swagger.v3.oas.annotations.Operation; import lombok.*; import lombok.extern.slf4j.Slf4j; -import org.akhq.configs.Connection; import org.akhq.configs.Role; import org.akhq.models.*; import org.akhq.modules.AbstractKafkaWrapper; @@ -57,7 +56,7 @@ public class TopicController extends AbstractController { @Inject private AccessControlListRepository aclRepository; @Inject - private Connection.SchemaRegistry schemaRegistry; + private SchemaRegistryRepository schemaRegistryRepository; @Value("${akhq.topic.replication}") private Short replicationFactor; @@ -149,7 +148,7 @@ public Record produce( keySchema, valueSchema ), - schemaRegistry.getType(), + schemaRegistryRepository.getSchemaRegistryType(cluster), key.map(String::getBytes).orElse(null), value.getBytes(), headers @@ -279,7 +278,7 @@ public Record deleteRecordApi(String cluster, String topicName, Integer partitio partition, Base64.getDecoder().decode(key) ), - schemaRegistry.getType(), + schemaRegistryRepository.getSchemaRegistryType(cluster), Base64.getDecoder().decode(key), null, new HashMap<>() diff --git a/src/main/java/org/akhq/repositories/RecordRepository.java b/src/main/java/org/akhq/repositories/RecordRepository.java index 51ffe8235..9f6080dbb 100644 --- a/src/main/java/org/akhq/repositories/RecordRepository.java +++ b/src/main/java/org/akhq/repositories/RecordRepository.java @@ -9,7 +9,6 @@ import io.reactivex.Flowable; import lombok.*; import lombok.extern.slf4j.Slf4j; -import org.akhq.configs.Connection; import org.akhq.controllers.TopicController; import org.akhq.models.Partition; import org.akhq.models.Record; @@ -32,8 +31,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.inject.Inject; @@ -57,9 +54,6 @@ public class RecordRepository extends AbstractRepository { @Inject private AvroWireFormatConverter avroWireFormatConverter; - @Inject - private Connection.SchemaRegistry schemaRegistry; - @Value("${akhq.topic-data.poll-timeout:1000}") protected int pollTimeout; @@ -424,22 +418,22 @@ private ConsumerRecords poll(KafkaConsumer consu private Record newRecord(ConsumerRecord record, String clusterId) { return new Record( record, - this.schemaRegistry.getType(), + this.schemaRegistryRepository.getSchemaRegistryType(clusterId), this.schemaRegistryRepository.getKafkaAvroDeserializer(clusterId), this.customDeserializerRepository.getProtobufToJsonDeserializer(clusterId), avroWireFormatConverter.convertValueToWireFormat(record, this.kafkaModule.getRegistryClient(clusterId), - this.kafkaModule.getConnection(clusterId).getSchemaRegistry().getType()) + this.schemaRegistryRepository.getSchemaRegistryType(clusterId)) ); } private Record newRecord(ConsumerRecord record, BaseOptions options) { return new Record( record, - this.schemaRegistry.getType(), + this.schemaRegistryRepository.getSchemaRegistryType(options.clusterId), this.schemaRegistryRepository.getKafkaAvroDeserializer(options.clusterId), this.customDeserializerRepository.getProtobufToJsonDeserializer(options.clusterId), avroWireFormatConverter.convertValueToWireFormat(record, this.kafkaModule.getRegistryClient(options.clusterId), - this.kafkaModule.getConnection(options.clusterId).getSchemaRegistry().getType()) + this.schemaRegistryRepository.getSchemaRegistryType(options.clusterId)) ); } diff --git a/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java b/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java index 65a0740f1..3214e3feb 100644 --- a/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java +++ b/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java @@ -2,13 +2,13 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import io.confluent.kafka.schemaregistry.ParsedSchema; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.RestService; import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference; import io.confluent.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.schemaregistry.utils.JacksonMapper; import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import org.akhq.configs.Connection; import org.akhq.configs.SchemaRegistryType; import org.akhq.models.Schema; import org.akhq.modules.AvroSerializer; @@ -21,7 +21,6 @@ import javax.inject.Singleton; import java.io.IOException; import java.lang.reflect.InvocationTargetException; -import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -40,14 +39,14 @@ public class SchemaRegistryRepository extends AbstractRepository { private AvroSerializer avroSerializer; public PagedList list(String clusterId, Pagination pagination, Optional search) throws IOException, RestClientException, ExecutionException, InterruptedException { - return PagedList.of(all(clusterId, search), pagination, list -> this.toSchemasLastestVersion(list, clusterId)); + return PagedList.of(all(clusterId, search), pagination, list -> this.toSchemasLatestVersion(list, clusterId)); } public List listAll(String clusterId, Optional search) throws IOException, RestClientException { - return toSchemasLastestVersion(all(clusterId, search), clusterId); + return toSchemasLatestVersion(all(clusterId, search), clusterId); } - private List toSchemasLastestVersion(List subjectList, String clusterId){ + private List toSchemasLatestVersion(List subjectList, String clusterId){ return subjectList .stream() .map(s -> { try { @@ -229,7 +228,7 @@ public void updateConfig(String clusterId, String subject, Schema.Config config) public Deserializer getKafkaAvroDeserializer(String clusterId) { if (!this.kafkaAvroDeserializers.containsKey(clusterId)) { Deserializer deserializer; - SchemaRegistryType schemaRegistryType = this.kafkaModule.getConnection(clusterId).getSchemaRegistry().getType(); + SchemaRegistryType schemaRegistryType = getSchemaRegistryType(clusterId); if (schemaRegistryType == SchemaRegistryType.TIBCO) { try { deserializer = (Deserializer) Class.forName("com.tibco.messaging.kafka.avro.AvroDeserializer").getDeclaredConstructor().newInstance(); @@ -257,11 +256,20 @@ public Deserializer getKafkaAvroDeserializer(String clusterId) { public AvroSerializer getAvroSerializer(String clusterId) { if(this.avroSerializer == null){ this.avroSerializer = new AvroSerializer(this.kafkaModule.getRegistryClient(clusterId), - this.kafkaModule.getConnection(clusterId).getSchemaRegistry().getType()); + getSchemaRegistryType(clusterId)); } return this.avroSerializer; } + public SchemaRegistryType getSchemaRegistryType(String clusterId) { + SchemaRegistryType schemaRegistryType = SchemaRegistryType.CONFLUENT; + Connection.SchemaRegistry schemaRegistry = this.kafkaModule.getConnection(clusterId).getSchemaRegistry(); + if (schemaRegistry != null) { + schemaRegistryType = schemaRegistry.getType(); + } + return schemaRegistryType; + } + static { JacksonMapper.INSTANCE.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); }