Skip to content

Commit

Permalink
DGS-11418 Ensure aliases are properly qualified
Browse files Browse the repository at this point in the history
  • Loading branch information
rayokota committed Jun 24, 2024
1 parent 22f6421 commit ab26883
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public ExtendedSchema(Schema schema, List<String> aliases) {
this.aliases = aliases;
}

public ExtendedSchema copy() {
return new ExtendedSchema(this, aliases);
}

@io.swagger.v3.oas.annotations.media.Schema(description = ALIASES_DESC)
@JsonProperty("aliases")
public List<String> getAliases() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ public static String qualifiedContextFor(String tenant, String qualifiedSubject)

public static QualifiedSubject qualifySubjectWithParent(
String tenant, String parent, String subjectWithoutTenant) {
return qualifySubjectWithParent(tenant, parent, subjectWithoutTenant, false);
}

public static QualifiedSubject qualifySubjectWithParent(
String tenant, String parent, String subjectWithoutTenant, boolean prefixTenant) {
// Since the subject has no tenant, pass the default tenant
QualifiedSubject qualifiedSubject =
QualifiedSubject.create(DEFAULT_TENANT, subjectWithoutTenant);
Expand All @@ -181,6 +186,13 @@ public static QualifiedSubject qualifySubjectWithParent(
DEFAULT_TENANT, qualifiedParent.getContext(), subjectWithoutTenant);
}
}
if (prefixTenant) {
// Prefix the tenant if prefixTenant is true.
// For example, references are stored without tenant prefixes,
// while alias replacements need the tenant.
qualifiedSubject = new QualifiedSubject(
tenant, qualifiedSubject.getContext(), qualifiedSubject.getSubject());
}
return qualifiedSubject;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@

package io.confluent.kafka.schemaregistry.rest.filters;

import static io.confluent.kafka.schemaregistry.utils.QualifiedSubject.DEFAULT_TENANT;
import static io.confluent.kafka.schemaregistry.utils.QualifiedSubject.TENANT_DELIMITER;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.kafka.schemaregistry.client.rest.entities.Config;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.utils.QualifiedSubject;
import java.io.IOException;
import java.net.URI;
import java.net.URLDecoder;
Expand Down Expand Up @@ -131,10 +129,9 @@ private String replaceAlias(String subject) {
}
String alias = config.getAlias();
if (alias != null && !alias.isEmpty()) {
if (!DEFAULT_TENANT.equals(schemaRegistry.tenant())) {
alias = schemaRegistry.tenant() + TENANT_DELIMITER + alias;
}
return alias;
QualifiedSubject qualAlias =
QualifiedSubject.qualifySubjectWithParent(schemaRegistry.tenant(), subject, alias, true);
return qualAlias.toQualifiedSubject();
} else {
return subject;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public void schemaDeleted(
addToSchemaHashToGuid(schemaKey, schemaValue);
for (SchemaReference ref : schemaValue.getReferences()) {
QualifiedSubject refSubject = QualifiedSubject.qualifySubjectWithParent(
tenant(), schemaKey.getSubject(), ref.getSubject());
tenant(), schemaKey.getSubject(), ref.getSubject(), true);
SchemaKey refKey = new SchemaKey(refSubject.toQualifiedSubject(), ref.getVersion());
Map<String, Map<SchemaKey, Set<Integer>>> ctxRefBy =
referencedBy.getOrDefault(tenant(), Collections.emptyMap());
Expand Down Expand Up @@ -226,7 +226,7 @@ public void schemaRegistered(
addToSchemaHashToGuid(schemaKey, schemaValue);
for (SchemaReference ref : schemaValue.getReferences()) {
QualifiedSubject refSubject = QualifiedSubject.qualifySubjectWithParent(
tenant(), schemaKey.getSubject(), ref.getSubject());
tenant(), schemaKey.getSubject(), ref.getSubject(), true);
SchemaKey refKey = new SchemaKey(refSubject.toQualifiedSubject(), ref.getVersion());
Map<String, Map<SchemaKey, Set<Integer>>> ctxRefBy =
referencedBy.computeIfAbsent(tenant(), k -> new ConcurrentHashMap<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1925,7 +1925,10 @@ private Map<String, List<String>> getAliases(String subjectPrefix)
if (alias == null) {
continue;
}
List<String> aliases = subjectToAliases.computeIfAbsent(alias, k -> new ArrayList<>());
QualifiedSubject qualAlias =
QualifiedSubject.qualifySubjectWithParent(tenant(), subjectPrefix, alias, true);
List<String> aliases = subjectToAliases.computeIfAbsent(
qualAlias.toQualifiedSubject(), k -> new ArrayList<>());
aliases.add(configValue.getSubject());
}
return subjectToAliases;
Expand Down

0 comments on commit ab26883

Please sign in to comment.