Skip to content

Commit

Permalink
schema_registry: last subject deletes schema
Browse files Browse the repository at this point in the history
When the last subject version corresponding to a schema is deleted,
Schema Registry should also remove the schema.

On the topic, the schema is already considered deleted when the last
subject version is deleted, and compaction takes care of removing it.
However, without this change, the schema is only removed from memory
after compaction has run and the node has then been restarted. This
is not ideal for use cases where schemas are frequently created and
deleted (e.g. Serverless), so instead we remove the schema from
memory as soon as its last subject version is deleted.
  • Loading branch information
pgellert committed Jul 5, 2024
1 parent 1edb81a commit d85de7b
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 3 deletions.
30 changes: 27 additions & 3 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ ss::future<bool> sharded_store::has_schema(schema_id id) {
});
}

/// Erases the schema identified by `id` from the store instance living on
/// the shard selected by shard_for(id).
ss::future<> sharded_store::delete_schema(schema_id id) {
    const auto owner_shard = shard_for(id);
    return _store.invoke_on(owner_shard, _smp_opts, [id](store& local) {
        local.delete_schema(id);
    });
}

ss::future<subject_schema>
sharded_store::has_schema(canonical_schema schema, include_deleted inc_del) {
auto versions = co_await get_versions(schema.sub(), inc_del);
Expand Down Expand Up @@ -503,11 +508,30 @@ sharded_store::get_subject_version_written_at(subject sub, schema_version ver) {

// Deletes version `ver` of subject `sub` on the shard chosen by
// shard_for(sub), then checks across the stores whether any subject version
// still references the schema id that version pointed at; if none does, the
// schema is erased via delete_schema(). The co_return value is the result of
// store::delete_subject_version().
//
// NOTE(review): this listing is a rendered diff without +/- markers. The
// `auto sub_shard{shard_for(sub)};` / `co_return co_await _store.invoke_on(`
// pair and the bare `return s.delete_subject_version(...)` line look like the
// replaced (pre-change) side of the hunk -- confirm against the repository.
ss::future<bool> sharded_store::delete_subject_version(
subject sub, schema_version ver, force force) {
auto sub_shard{shard_for(sub)};
co_return co_await _store.invoke_on(
// Resolve the target shard before `sub` is moved into the lambda capture.
auto sub_shard = shard_for(sub);
auto [schema_id, result] = co_await _store.invoke_on(
sub_shard, _smp_opts, [sub{std::move(sub)}, ver, force](store& s) {
return s.delete_subject_version(sub, ver, force).value();
// Fetch the schema id (looked up with include_deleted::yes) before the
// subject version is deleted below.
auto schema_id = s.get_subject_version_id(
sub, ver, include_deleted::yes)
.value()
.id;
auto result = s.delete_subject_version(sub, ver, force).value();
return std::make_pair(schema_id, result);
});

// Combine the per-store answers with logical OR, starting from `false`:
// true if any store reports a subject version referencing schema_id
// (queried with include_deleted::yes).
auto remaining_subjects_exist = co_await _store.map_reduce0(
[schema_id](store& s) {
return s.subject_versions_has_any_of(
{schema_id}, include_deleted::yes);
},
false,
std::logical_or{});

// No subject version references the schema any more: drop the schema too.
if (!remaining_subjects_exist) {
co_await delete_schema(schema_id);
}

co_return result;
}

ss::future<mode> sharded_store::get_mode() {
Expand Down
1 change: 1 addition & 0 deletions src/v/pandaproxy/schema_registry/sharded_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ class sharded_store {
private:
ss::future<bool>
upsert_schema(schema_id id, canonical_schema_definition def);
ss::future<> delete_schema(schema_id id);

struct insert_subject_result {
schema_version version;
Expand Down
2 changes: 2 additions & 0 deletions src/v/pandaproxy/schema_registry/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,8 @@ class store {
.second;
}

/// Erase the entry keyed by `id` from `_schemas`.
void delete_schema(schema_id id) {
    _schemas.erase(id);
}

struct insert_subject_result {
schema_version version;
bool inserted;
Expand Down
42 changes: 42 additions & 0 deletions tests/rptest/tests/schema_registry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2937,6 +2937,48 @@ def test_delete_subject_last_clears_config(self):
assert result_raw.status_code == requests.codes.not_found
assert result_raw.json()["error_code"] == 40408

@cluster(num_nodes=3)
def test_hard_delete_subject_deletes_schema(self):
    """
    Register schema 1 under a single subject, soft delete and then
    permanently delete that subject, and verify that fetching the
    schema by id eventually answers 404 with error_code 40403.
    """
    subject = "example_topic-key"
    payload = json.dumps({"schema": schema1_def})

    self.logger.debug("Posting schema 1 as a subject key")
    resp = self._post_subjects_subject_versions(subject=subject,
                                                data=payload,
                                                auth=self.super_auth)
    self.logger.debug(resp)
    assert resp.status_code == requests.codes.ok, f'Code: {resp.status_code}'
    assert resp.json() == {'id': 1}, f"Json: {resp.json()}"

    # Two delete passes over the same subject: non-permanent first,
    # permanent second. Each must answer 200.
    delete_steps = (
        ("Soft delete subject", False),
        ("Then hard delete subject", True),
    )
    for log_line, is_permanent in delete_steps:
        self.logger.debug(log_line)
        resp = self._delete_subject(subject=subject,
                                    permanent=is_permanent,
                                    auth=self.super_auth)
        self.logger.debug(resp)
        assert resp.status_code == requests.codes.ok, f'Code: {resp.status_code}'

    def schema_no_longer_present():
        # Raises AssertionError while the schema is still served;
        # wait_until retries on exceptions (retry_on_exc=True).
        self.logger.debug("Sending get schema 1")
        lookup = self._get_schemas_ids_id(id=1, auth=self.super_auth)
        self.logger.debug(lookup)
        assert lookup.status_code == requests.codes.not_found, f'Code: {lookup.status_code}'
        assert lookup.json()["error_code"] == 40403, \
            f"Json: {lookup.json()}"
        return True

    self.logger.debug("Wait until get schema 1 now eventually fails")
    wait_until(schema_no_longer_present,
               timeout_sec=30,
               retry_on_exc=True,
               err_msg="Failed to delete schema 1 in time")


class SchemaRegistryTest(SchemaRegistryTestMethods):
"""
Expand Down

0 comments on commit d85de7b

Please sign in to comment.