Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BE: SchemaRegistry references support #3747

Merged
merged 18 commits into from
Jul 12, 2023
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,11 @@ public Mono<ResponseEntity<Void>> deleteTopic(
.operationName("deleteTopic")
.build();

return accessControlService.validateAccess(context).then(
topicsService.deleteTopic(getCluster(clusterName), topicName).map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
return accessControlService.validateAccess(context)
.then(
topicsService.deleteTopic(getCluster(clusterName), topicName)
.thenReturn(ResponseEntity.ok().<Void>build())
).doOnEach(sig -> auditService.audit(context, sig));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,21 @@
import com.provectus.kafka.ui.model.CompatibilityCheckResponseDTO;
import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
import com.provectus.kafka.ui.model.NewSchemaSubjectDTO;
import com.provectus.kafka.ui.model.SchemaReferenceDTO;
import com.provectus.kafka.ui.model.SchemaSubjectDTO;
import com.provectus.kafka.ui.model.SchemaTypeDTO;
import com.provectus.kafka.ui.service.SchemaRegistryService;
import com.provectus.kafka.ui.sr.model.Compatibility;
import com.provectus.kafka.ui.sr.model.CompatibilityCheckResponse;
import com.provectus.kafka.ui.sr.model.NewSubject;
import com.provectus.kafka.ui.sr.model.SchemaReference;
import com.provectus.kafka.ui.sr.model.SchemaType;
import java.util.List;
import java.util.Optional;
import org.mapstruct.Mapper;


@Mapper(componentModel = "spring")
@Mapper
public interface KafkaSrMapper {

default SchemaSubjectDTO toDto(SchemaRegistryService.SubjectWithCompatibilityLevel s) {
Expand All @@ -24,9 +27,12 @@ default SchemaSubjectDTO toDto(SchemaRegistryService.SubjectWithCompatibilityLev
.subject(s.getSubject())
.schema(s.getSchema())
.schemaType(SchemaTypeDTO.fromValue(Optional.ofNullable(s.getSchemaType()).orElse(SchemaType.AVRO).getValue()))
.references(toDto(s.getReferences()))
.compatibilityLevel(s.getCompatibility().toString());
}

List<SchemaReferenceDTO> toDto(List<SchemaReference> references);

CompatibilityCheckResponseDTO toDto(CompatibilityCheckResponse ccr);

CompatibilityLevelDTO.CompatibilityEnum toDto(Compatibility compatibility);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
Expand Down Expand Up @@ -217,7 +218,9 @@ private String convertSchema(SchemaMetadata schema, ParsedSchema parsedSchema) {
case AVRO -> new AvroJsonSchemaConverter()
.convert(basePath, ((AvroSchema) parsedSchema).rawSchema())
.toJson();
case JSON -> schema.getSchema();
case JSON ->
//need to use confluent JsonSchema since it includes resolved references
((JsonSchema) parsedSchema).rawSchema().toString();
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
import com.provectus.kafka.ui.sr.model.NewSubject;
import com.provectus.kafka.ui.sr.model.SchemaSubject;
import com.provectus.kafka.ui.util.ReactiveFailover;
import com.provectus.kafka.ui.util.WebClientConfigurator;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
Expand Down Expand Up @@ -92,7 +91,7 @@ public Mono<SubjectWithCompatibilityLevel> getLatestSchemaVersionBySubject(Kafka
private Mono<SubjectWithCompatibilityLevel> getSchemaSubject(KafkaCluster cluster, String schemaName,
String version) {
return api(cluster)
.mono(c -> c.getSubjectVersion(schemaName, version))
.mono(c -> c.getSubjectVersion(schemaName, version, false))
.zipWith(getSchemaCompatibilityInfoOrGlobal(cluster, schemaName))
.map(t -> new SubjectWithCompatibilityLevel(t.getT1(), t.getT2()))
.onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.error(new SchemaNotFoundException()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.provectus.kafka.ui.service.integration.odd;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
import com.provectus.kafka.ui.sr.model.SchemaReference;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import reactor.core.publisher.Mono;

// logic copied from AbstractSchemaProvider:resolveReferences
// https://github.com/confluentinc/schema-registry/blob/fd59613e2c5adf62e36705307f420712e4c8c1ea/client/src/main/java/io/confluent/kafka/schemaregistry/AbstractSchemaProvider.java#L54
class SchemaReferencesResolver {

private final KafkaSrClientApi client;

SchemaReferencesResolver(KafkaSrClientApi client) {
this.client = client;
}

Mono<ImmutableMap<String, String>> resolve(List<SchemaReference> refs) {
return resolveReferences(refs, new Resolving(ImmutableMap.of(), ImmutableSet.of()))
.map(Resolving::resolved);
}

private record Resolving(ImmutableMap<String, String> resolved, ImmutableSet<String> visited) {

Resolving visit(String name) {
return new Resolving(resolved, ImmutableSet.<String>builder().addAll(visited).add(name).build());
}

Resolving resolve(String ref, String schema) {
return new Resolving(ImmutableMap.<String, String>builder().putAll(resolved).put(ref, schema).build(), visited);
}
}

private Mono<Resolving> resolveReferences(@Nullable List<SchemaReference> refs, Resolving initState) {
Mono<Resolving> result = Mono.just(initState);
for (SchemaReference reference : Optional.ofNullable(refs).orElse(List.of())) {
result = result.flatMap(state -> {
if (state.visited().contains(reference.getName())) {
return Mono.just(state);
} else {
final var newState = state.visit(reference.getName());
return client.getSubjectVersion(reference.getSubject(), String.valueOf(reference.getVersion()), true)
.flatMap(subj ->
resolveReferences(subj.getReferences(), newState)
.map(withNewRefs -> withNewRefs.resolve(reference.getName(), subj.getSchema())));
}
});
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.provectus.kafka.ui.model.Statistics;
import com.provectus.kafka.ui.service.StatisticsCache;
import com.provectus.kafka.ui.service.integration.odd.schema.DataSetFieldsExtractors;
import com.provectus.kafka.ui.sr.model.SchemaSubject;
import java.net.URI;
import java.util.List;
import java.util.Map;
Expand All @@ -24,6 +25,8 @@
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

@Slf4j
@RequiredArgsConstructor
Expand Down Expand Up @@ -100,12 +103,20 @@ private Mono<List<DataSetField>> getTopicSchema(KafkaCluster cluster,
return Mono.just(List.of());
}
String subject = topic + (isKey ? "-key" : "-value");
return cluster.getSchemaRegistryClient()
.mono(client -> client.getSubjectVersion(subject, "latest"))
.map(subj -> DataSetFieldsExtractors.extract(subj, topicOddrn, isKey))
return getSubjWithResolvedRefs(cluster, subject)
.map(t -> DataSetFieldsExtractors.extract(t.getT1(), t.getT2(), topicOddrn, isKey))
.onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.just(List.of()))
.onErrorMap(WebClientResponseException.class, err ->
new IllegalStateException("Error retrieving subject %s".formatted(subject), err));
}

private Mono<Tuple2<SchemaSubject, Map<String, String>>> getSubjWithResolvedRefs(KafkaCluster cluster,
String subjectName) {
return cluster.getSchemaRegistryClient()
.mono(client ->
client.getSubjectVersion(subjectName, "latest", false)
.flatMap(subj -> new SchemaReferencesResolver(client).resolve(subj.getReferences())
.map(resolvedRefs -> Tuples.of(subj, resolvedRefs))));
}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.provectus.kafka.ui.service.integration.odd.schema;

import com.google.common.collect.ImmutableSet;
import com.provectus.kafka.ui.sr.model.SchemaSubject;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
Expand All @@ -14,8 +14,8 @@ final class AvroExtractor {
private AvroExtractor() {
}

static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
var schema = new Schema.Parser().parse(subject.getSchema());
static List<DataSetField> extract(AvroSchema avroSchema, KafkaPath topicOddrn, boolean isKey) {
var schema = avroSchema.rawSchema();
List<DataSetField> result = new ArrayList<>();
result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
extract(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,30 @@

import com.provectus.kafka.ui.sr.model.SchemaSubject;
import com.provectus.kafka.ui.sr.model.SchemaType;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.opendatadiscovery.client.model.DataSetField;
import org.opendatadiscovery.client.model.DataSetFieldType;
import org.opendatadiscovery.oddrn.model.KafkaPath;

public final class DataSetFieldsExtractors {

public static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
public static List<DataSetField> extract(SchemaSubject subject,
Map<String, String> resolvedRefs,
KafkaPath topicOddrn,
boolean isKey) {
SchemaType schemaType = Optional.ofNullable(subject.getSchemaType()).orElse(SchemaType.AVRO);
return switch (schemaType) {
case AVRO -> AvroExtractor.extract(subject, topicOddrn, isKey);
case JSON -> JsonSchemaExtractor.extract(subject, topicOddrn, isKey);
case PROTOBUF -> ProtoExtractor.extract(subject, topicOddrn, isKey);
case AVRO -> AvroExtractor.extract(
new AvroSchema(subject.getSchema(), List.of(), resolvedRefs, null), topicOddrn, isKey);
case JSON -> JsonSchemaExtractor.extract(
new JsonSchema(subject.getSchema(), List.of(), resolvedRefs, null), topicOddrn, isKey);
case PROTOBUF -> ProtoExtractor.extract(
new ProtobufSchema(subject.getSchema(), List.of(), resolvedRefs, null, null), topicOddrn, isKey);
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ final class JsonSchemaExtractor {
private JsonSchemaExtractor() {
}

static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
Schema schema = new JsonSchema(subject.getSchema()).rawSchema();
static List<DataSetField> extract(JsonSchema jsonSchema, KafkaPath topicOddrn, boolean isKey) {
Schema schema = jsonSchema.rawSchema();
List<DataSetField> result = new ArrayList<>();
result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
extract(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import com.google.protobuf.UInt32Value;
import com.google.protobuf.UInt64Value;
import com.google.protobuf.Value;
import com.provectus.kafka.ui.sr.model.SchemaSubject;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -42,8 +41,8 @@ final class ProtoExtractor {
private ProtoExtractor() {
}

static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
Descriptor schema = new ProtobufSchema(subject.getSchema()).toDescriptor();
static List<DataSetField> extract(ProtobufSchema protobufSchema, KafkaPath topicOddrn, boolean isKey) {
Descriptor schema = protobufSchema.toDescriptor();
List<DataSetField> result = new ArrayList<>();
result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
var rootOddrn = topicOddrn.oddrn() + "/columns/" + (isKey ? "key" : "value");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
import com.provectus.kafka.ui.model.NewSchemaSubjectDTO;
import com.provectus.kafka.ui.model.SchemaReferenceDTO;
import com.provectus.kafka.ui.model.SchemaSubjectDTO;
import com.provectus.kafka.ui.model.SchemaSubjectsResponseDTO;
import com.provectus.kafka.ui.model.SchemaTypeDTO;
Expand Down Expand Up @@ -190,6 +191,58 @@ void shouldCreateNewProtobufSchema() {
Assertions.assertEquals(schema, actual.getSchema());
}


@Test
void shouldCreateNewProtobufSchemaWithRefs() {
NewSchemaSubjectDTO requestBody = new NewSchemaSubjectDTO()
.schemaType(SchemaTypeDTO.PROTOBUF)
.subject(subject + "-ref")
.schema("""
syntax = "proto3";
message MyRecord {
int32 id = 1;
string name = 2;
}
""");

webTestClient
.post()
.uri("/api/clusters/{clusterName}/schemas", LOCAL)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromPublisher(Mono.just(requestBody), NewSchemaSubjectDTO.class))
.exchange()
.expectStatus()
.isOk();

requestBody = new NewSchemaSubjectDTO()
.schemaType(SchemaTypeDTO.PROTOBUF)
.subject(subject)
.schema("""
syntax = "proto3";
import "MyRecord.proto";
message MyRecordWithRef {
int32 id = 1;
MyRecord my_ref = 2;
}
""")
.references(List.of(new SchemaReferenceDTO().name("MyRecord.proto").subject(subject + "-ref").version(1)));

SchemaSubjectDTO actual = webTestClient
.post()
.uri("/api/clusters/{clusterName}/schemas", LOCAL)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromPublisher(Mono.just(requestBody), NewSchemaSubjectDTO.class))
.exchange()
.expectStatus()
.isOk()
.expectBody(SchemaSubjectDTO.class)
.returnResult()
.getResponseBody();

Assertions.assertNotNull(actual);
Assertions.assertEquals(requestBody.getReferences(), actual.getReferences());
}

@Test
public void shouldReturnBackwardAsGlobalCompatibilityLevelByDefault() {
webTestClient
Expand Down Expand Up @@ -278,7 +331,7 @@ public void shouldOkWhenCreateNewSchemaThenGetAndUpdateItsCompatibilityLevel() {
void shouldCreateNewSchemaWhenSubjectIncludesNonAsciiCharacters() {
String schema =
"{\"subject\":\"test/test\",\"schemaType\":\"JSON\",\"schema\":"
+ "\"{\\\"type\\\": \\\"string\\\"}\"}";
+ "\"{\\\"type\\\": \\\"string\\\"}\"}";

webTestClient
.post()
Expand Down
Loading