Skip to content

Commit

Permalink
schema_registry: last subject deletes schema
Browse files Browse the repository at this point in the history
When the last subject version corresponding to a schema is deleted,
Schema Registry should also remove the schema.

On the topic, the schema is already considered deleted when the last
subject version is deleted, and compaction takes care of removing it.
However, without this change, compaction needs to happen and then a node
restart needs to happen before the schema is removed from memory. This
is not ideal for use cases where schemas are frequently created and
deleted (e.g., Serverless), so instead we can remove the schema from
memory when its last subject version is deleted.
  • Loading branch information
pgellert committed Jul 4, 2024
1 parent 26b6202 commit 4711b4c
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ ss::future<bool> sharded_store::has_schema(schema_id id) {
});
}

/// Removes the schema identified by `id` from the per-shard store.
///
/// The request is dispatched to the shard returned by `shard_for(id)`,
/// where the local `store::delete_schema` is invoked.
///
/// \param id identifier of the schema to remove.
/// \return a future that resolves once the cross-shard invocation is done.
ss::future<> sharded_store::delete_schema(schema_id id) {
    const auto owner_shard = shard_for(id);
    return _store.invoke_on(owner_shard, _smp_opts, [id](store& local) {
        local.delete_schema(id);
    });
}

ss::future<subject_schema>
sharded_store::has_schema(canonical_schema schema, include_deleted inc_del) {
auto versions = co_await get_versions(schema.sub(), inc_del);
Expand Down
1 change: 1 addition & 0 deletions src/v/pandaproxy/schema_registry/sharded_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class sharded_store {
is_deleted deleted);

ss::future<bool> has_schema(schema_id id);
ss::future<> delete_schema(schema_id id);
ss::future<subject_schema> has_schema(
canonical_schema schema, include_deleted inc_del = include_deleted::no);

Expand Down
10 changes: 10 additions & 0 deletions src/v/pandaproxy/schema_registry/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -1492,8 +1492,18 @@ struct consume_to_store {
offset);
if (!val) {
try {
auto schema = co_await _store.get_subject_schema(
key.sub, key.version, include_deleted::yes);

co_await _store.delete_subject_version(
key.sub, key.version, force::yes);

auto remaining_subjects
= co_await _store.get_schema_subjects(
schema.id, include_deleted::yes);
if (remaining_subjects.empty()) {
co_await _store.delete_schema(schema.id);
}
} catch (exception& e) {
// This is allowed to throw not_found errors. When we
// tombstone all the records referring to a particular
Expand Down
2 changes: 2 additions & 0 deletions src/v/pandaproxy/schema_registry/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,8 @@ class store {
.second;
}

/// Removes the entry for `id` from `_schemas`.
///
/// \param id identifier of the schema to drop from this store.
void delete_schema(schema_id id) {
    _schemas.erase(id);
}

struct insert_subject_result {
schema_version version;
bool inserted;
Expand Down
42 changes: 42 additions & 0 deletions tests/rptest/tests/schema_registry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2937,6 +2937,48 @@ def test_delete_subject_last_clears_config(self):
assert result_raw.status_code == requests.codes.not_found
assert result_raw.json()["error_code"] == 40408

@cluster(num_nodes=3)
def test_hard_delete_subject_deletes_schema(self):
    """
    Register schema 1 under a single subject, soft-delete and then
    hard-delete that subject, and wait until fetching schema id 1
    answers not_found with error_code 40403.
    """
    subject = "example_topic-key"
    schema_1_data = json.dumps({"schema": schema1_def})

    def log_and_expect_ok(result_raw):
        # Shared step: log the raw response and require an OK status.
        self.logger.debug(result_raw)
        assert result_raw.status_code == requests.codes.ok, f'Code: {result_raw.status_code}'

    self.logger.debug("Posting schema 1 as a subject key")
    result_raw = self._post_subjects_subject_versions(
        subject=subject,
        data=schema_1_data,
        auth=self.super_auth,
    )
    log_and_expect_ok(result_raw)
    assert result_raw.json() == {'id': 1}, f"Json: {result_raw.json()}"

    # A soft delete is issued first, followed by the permanent delete.
    delete_steps = (
        ("Soft delete subject", False),
        ("Then hard delete subject", True),
    )
    for log_line, permanent in delete_steps:
        self.logger.debug(log_line)
        result_raw = self._delete_subject(
            subject=subject,
            permanent=permanent,
            auth=self.super_auth,
        )
        log_and_expect_ok(result_raw)

    def schema_no_longer_present():
        # Failed asserts raise; wait_until retries them (retry_on_exc=True).
        self.logger.debug("Sending get schema 1")
        result_raw = self._get_schemas_ids_id(id=1, auth=self.super_auth)
        self.logger.debug(result_raw)
        assert result_raw.status_code == requests.codes.not_found, f'Code: {result_raw.status_code}'
        assert result_raw.json()["error_code"] == 40403, \
            f"Json: {result_raw.json()}"
        return True

    self.logger.debug("Wait until get schema 1 now eventually fails")
    wait_until(
        schema_no_longer_present,
        timeout_sec=30,
        retry_on_exc=True,
        err_msg="Failed to delete schema 1 in time",
    )


class SchemaRegistryTest(SchemaRegistryTestMethods):
"""
Expand Down

0 comments on commit 4711b4c

Please sign in to comment.