Skip to content

Commit

Permalink
feat(schema): fallback in case of subject matching the schema id but …
Browse files Browse the repository at this point in the history
…not the TopicNameStrategy (#1820)

Fix #1795
  • Loading branch information
AlexisSouquiere authored Jun 27, 2024
1 parent 9f687f6 commit ef7aa28
Showing 1 changed file with 22 additions and 3 deletions.
25 changes: 22 additions & 3 deletions src/main/java/org/akhq/controllers/SchemaController.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,21 @@ private Schema registerSchema(String cluster, @Body Schema schema) throws IOExce
return register;
}

/**
* Find a subject by the schema id
* In case of several subjects matching the schema id, we use the topic name to get the most relevant subject that
* matches the topic name (TopicNameStrategy). If there is no topic or if the topic doesn't match any subject,
* return the first subject that matches the schema id.
*
* @param request - The HTTP request
* @param cluster - The cluster name
* @param id - The schema id
* @param topic - (Optional) The topic name
* @return the most relevant subject
*
* @throws IOException
* @throws RestClientException
*/
@Get("api/{cluster}/schema/id/{id}")
@Operation(tags = {"schema registry"}, summary = "Find a subject by the schema id")
public Schema getSubjectBySchemaIdAndTopic(
Expand All @@ -168,11 +183,15 @@ public Schema getSubjectBySchemaIdAndTopic(
// TODO Do the check on the subject name too
checkIfClusterAllowed(cluster);

return this.schemaRepository.getSubjectsBySchemaId(cluster, id)
.stream()
List<Schema> schemas = this.schemaRepository.getSubjectsBySchemaId(cluster, id);

// No topic, return the first subject that matches
// If several subjects match the topic, return the first one
return schemas.stream()
.filter(s -> topic == null || s.getSubject().contains(topic))
.findFirst()
.orElse(null);
// If there is a topic but no match, return the first one that matches to handle subjects not following TopicNameStrategy
.orElseGet(() -> schemas.isEmpty() ? null : schemas.get(0));
}

@Get("api/{cluster}/schema/{subject}/version")
Expand Down

0 comments on commit ef7aa28

Please sign in to comment.