fix(schema-registry): fixed no schema configuration (#576)
Fixes a bug introduced in commit aada01e that broke browsing topic data when a config file without a 'schema-registry' section was used.
ebowden-tibco committed Jan 31, 2021
1 parent aada01e commit fbc15eb
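
Root cause: since commit aada01e the registry type was read through kafkaModule.getConnection(clusterId).getSchemaRegistry().getType(), and getSchemaRegistry() returns null for a connection whose config has no 'schema-registry' section, so browsing topic data failed with a NullPointerException. This commit routes every call site through one null-safe accessor that falls back to SchemaRegistryType.CONFLUENT. A standalone sketch of the pattern (the classes below are simplified stand-ins for org.akhq.configs.Connection, not the project's real types):

// Standalone sketch of the null-guard added in this commit; the types are
// simplified stand-ins for org.akhq.configs.Connection and friends.
public class SchemaRegistryTypeFallback {

    enum SchemaRegistryType { CONFLUENT, TIBCO }

    /** Stand-in for Connection.SchemaRegistry. */
    static class SchemaRegistry {
        private final SchemaRegistryType type;
        SchemaRegistry(SchemaRegistryType type) { this.type = type; }
        SchemaRegistryType getType() { return type; }
    }

    /** Stand-in for Connection; schemaRegistry is null when the config
     *  file has no 'schema-registry' section. */
    static class Connection {
        private final SchemaRegistry schemaRegistry;
        Connection(SchemaRegistry schemaRegistry) { this.schemaRegistry = schemaRegistry; }
        SchemaRegistry getSchemaRegistry() { return schemaRegistry; }
    }

    // Before the fix (aada01e): NPE when getSchemaRegistry() returns null.
    static SchemaRegistryType typeBefore(Connection connection) {
        return connection.getSchemaRegistry().getType();
    }

    // After the fix (fbc15eb): default to CONFLUENT when the section is absent.
    static SchemaRegistryType typeAfter(Connection connection) {
        SchemaRegistry schemaRegistry = connection.getSchemaRegistry();
        return schemaRegistry != null ? schemaRegistry.getType() : SchemaRegistryType.CONFLUENT;
    }

    public static void main(String[] args) {
        Connection noRegistry = new Connection(null);
        System.out.println(typeAfter(noRegistry));      // CONFLUENT
        try {
            typeBefore(noRegistry);
        } catch (NullPointerException e) {
            System.out.println("old code path: NPE");   // the #576 symptom
        }
    }
}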
Showing 3 changed files with 22 additions and 21 deletions.
7 changes: 3 additions & 4 deletions src/main/java/org/akhq/controllers/TopicController.java
@@ -17,7 +17,6 @@
 import io.swagger.v3.oas.annotations.Operation;
 import lombok.*;
 import lombok.extern.slf4j.Slf4j;
-import org.akhq.configs.Connection;
 import org.akhq.configs.Role;
 import org.akhq.models.*;
 import org.akhq.modules.AbstractKafkaWrapper;
@@ -57,7 +56,7 @@ public class TopicController extends AbstractController {
     @Inject
     private AccessControlListRepository aclRepository;
     @Inject
-    private Connection.SchemaRegistry schemaRegistry;
+    private SchemaRegistryRepository schemaRegistryRepository;
 
     @Value("${akhq.topic.replication}")
     private Short replicationFactor;
@@ -149,7 +148,7 @@ public Record produce(
                 keySchema,
                 valueSchema
             ),
-            schemaRegistry.getType(),
+            schemaRegistryRepository.getSchemaRegistryType(cluster),
             key.map(String::getBytes).orElse(null),
             value.getBytes(),
             headers
@@ -279,7 +278,7 @@ public Record deleteRecordApi(String cluster, String topicName, Integer partition
                 partition,
                 Base64.getDecoder().decode(key)
             ),
-            schemaRegistry.getType(),
+            schemaRegistryRepository.getSchemaRegistryType(cluster),
             Base64.getDecoder().decode(key),
             null,
             new HashMap<>()
14 changes: 4 additions & 10 deletions src/main/java/org/akhq/repositories/RecordRepository.java
@@ -9,7 +9,6 @@
 import io.reactivex.Flowable;
 import lombok.*;
 import lombok.extern.slf4j.Slf4j;
-import org.akhq.configs.Connection;
 import org.akhq.controllers.TopicController;
 import org.akhq.models.Partition;
 import org.akhq.models.Record;
@@ -32,8 +31,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.inject.Inject;
@@ -57,9 +54,6 @@ public class RecordRepository extends AbstractRepository {
     @Inject
     private AvroWireFormatConverter avroWireFormatConverter;
 
-    @Inject
-    private Connection.SchemaRegistry schemaRegistry;
-
     @Value("${akhq.topic-data.poll-timeout:1000}")
     protected int pollTimeout;
 
@@ -424,22 +418,22 @@ private ConsumerRecords<byte[], byte[]> poll(KafkaConsumer<byte[], byte[]> consu
     private Record newRecord(ConsumerRecord<byte[], byte[]> record, String clusterId) {
         return new Record(
             record,
-            this.schemaRegistry.getType(),
+            this.schemaRegistryRepository.getSchemaRegistryType(clusterId),
             this.schemaRegistryRepository.getKafkaAvroDeserializer(clusterId),
             this.customDeserializerRepository.getProtobufToJsonDeserializer(clusterId),
             avroWireFormatConverter.convertValueToWireFormat(record, this.kafkaModule.getRegistryClient(clusterId),
-                    this.kafkaModule.getConnection(clusterId).getSchemaRegistry().getType())
+                    this.schemaRegistryRepository.getSchemaRegistryType(clusterId))
         );
     }
 
     private Record newRecord(ConsumerRecord<byte[], byte[]> record, BaseOptions options) {
         return new Record(
             record,
-            this.schemaRegistry.getType(),
+            this.schemaRegistryRepository.getSchemaRegistryType(options.clusterId),
             this.schemaRegistryRepository.getKafkaAvroDeserializer(options.clusterId),
             this.customDeserializerRepository.getProtobufToJsonDeserializer(options.clusterId),
             avroWireFormatConverter.convertValueToWireFormat(record, this.kafkaModule.getRegistryClient(options.clusterId),
-                    this.kafkaModule.getConnection(options.clusterId).getSchemaRegistry().getType())
+                    this.schemaRegistryRepository.getSchemaRegistryType(options.clusterId))
         );
     }
 
22 changes: 15 additions & 7 deletions src/main/java/org/akhq/repositories/SchemaRegistryRepository.java
@@ -2,13 +2,13 @@
 
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import io.confluent.kafka.schemaregistry.ParsedSchema;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.RestService;
 import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
 import io.confluent.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.kafka.schemaregistry.utils.JacksonMapper;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import org.akhq.configs.Connection;
 import org.akhq.configs.SchemaRegistryType;
 import org.akhq.models.Schema;
 import org.akhq.modules.AvroSerializer;
@@ -21,7 +21,6 @@
 import javax.inject.Singleton;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -40,14 +39,14 @@ public class SchemaRegistryRepository extends AbstractRepository {
     private AvroSerializer avroSerializer;
 
     public PagedList<Schema> list(String clusterId, Pagination pagination, Optional<String> search) throws IOException, RestClientException, ExecutionException, InterruptedException {
-        return PagedList.of(all(clusterId, search), pagination, list -> this.toSchemasLastestVersion(list, clusterId));
+        return PagedList.of(all(clusterId, search), pagination, list -> this.toSchemasLatestVersion(list, clusterId));
     }
 
     public List<Schema> listAll(String clusterId, Optional<String> search) throws IOException, RestClientException {
-        return toSchemasLastestVersion(all(clusterId, search), clusterId);
+        return toSchemasLatestVersion(all(clusterId, search), clusterId);
     }
 
-    private List<Schema> toSchemasLastestVersion(List<String> subjectList, String clusterId){
+    private List<Schema> toSchemasLatestVersion(List<String> subjectList, String clusterId){
         return subjectList .stream()
             .map(s -> {
                 try {
@@ -229,7 +228,7 @@ public void updateConfig(String clusterId, String subject, Schema.Config config)
     public Deserializer getKafkaAvroDeserializer(String clusterId) {
         if (!this.kafkaAvroDeserializers.containsKey(clusterId)) {
             Deserializer deserializer;
-            SchemaRegistryType schemaRegistryType = this.kafkaModule.getConnection(clusterId).getSchemaRegistry().getType();
+            SchemaRegistryType schemaRegistryType = getSchemaRegistryType(clusterId);
             if (schemaRegistryType == SchemaRegistryType.TIBCO) {
                 try {
                     deserializer = (Deserializer) Class.forName("com.tibco.messaging.kafka.avro.AvroDeserializer").getDeclaredConstructor().newInstance();
@@ -257,11 +256,20 @@ public Deserializer getKafkaAvroDeserializer(String clusterId) {
     public AvroSerializer getAvroSerializer(String clusterId) {
         if(this.avroSerializer == null){
             this.avroSerializer = new AvroSerializer(this.kafkaModule.getRegistryClient(clusterId),
-                    this.kafkaModule.getConnection(clusterId).getSchemaRegistry().getType());
+                    getSchemaRegistryType(clusterId));
         }
         return this.avroSerializer;
     }
 
+    public SchemaRegistryType getSchemaRegistryType(String clusterId) {
+        SchemaRegistryType schemaRegistryType = SchemaRegistryType.CONFLUENT;
+        Connection.SchemaRegistry schemaRegistry = this.kafkaModule.getConnection(clusterId).getSchemaRegistry();
+        if (schemaRegistry != null) {
+            schemaRegistryType = schemaRegistry.getType();
+        }
+        return schemaRegistryType;
+    }
+
     static {
         JacksonMapper.INSTANCE.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
     }
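
For reference, the situation that triggered #576 is simply a connection defined without a schema-registry block. A hypothetical sketch (the YAML below is an assumed AKHQ-style config, not taken from this commit) showing that the missing section comes back as null, which the new getSchemaRegistryType() now tolerates by defaulting to CONFLUENT:

import java.util.Map;
import org.yaml.snakeyaml.Yaml;

// Hypothetical sketch: an AKHQ-style connection with no 'schema-registry'
// section. The absent key yields null, which the pre-fix code dereferenced
// via getSchemaRegistry().getType(); the new accessor defaults to CONFLUENT.
public class MissingRegistrySectionDemo {
    public static void main(String[] args) {
        String config = String.join("\n",
            "akhq:",
            "  connections:",
            "    my-cluster:",
            "      properties:",
            "        bootstrap.servers: \"kafka:9092\"");
        // note: no 'schema-registry' section under my-cluster

        Map<String, Object> root = new Yaml().load(config);
        Map<String, Object> akhq = (Map<String, Object>) root.get("akhq");
        Map<String, Object> connections = (Map<String, Object>) akhq.get("connections");
        Map<String, Object> cluster = (Map<String, Object>) connections.get("my-cluster");

        System.out.println(cluster.get("schema-registry")); // prints: null
    }
}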
