Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CORE-5083 schema_registry: last subject deletes schema #20847

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ ss::future<bool> sharded_store::has_schema(schema_id id) {
});
}

/// Erase the schema definition registered under `id`.
///
/// The request is routed to the shard selected by shard_for(id); the
/// returned future is the one produced by that invoke_on call.
ss::future<> sharded_store::delete_schema(schema_id id) {
    const auto owner_shard = shard_for(id);
    return _store.invoke_on(
      owner_shard, _smp_opts, [id](store& local_store) {
          local_store.delete_schema(id);
      });
}

ss::future<subject_schema>
sharded_store::has_schema(canonical_schema schema, include_deleted inc_del) {
auto versions = co_await get_versions(schema.sub(), inc_del);
Expand Down Expand Up @@ -394,7 +399,7 @@ ss::future<bool> sharded_store::is_referenced(subject sub, schema_version ver) {
// Find whether any subject version reference any of the schema
co_return co_await _store.map_reduce0(
[refs{std::move(references)}](store& s) {
return s.subject_versions_has_any_of(refs);
return s.subject_versions_has_any_of(refs, include_deleted::no);
},
false,
std::logical_or<>{});
Expand Down Expand Up @@ -503,11 +508,30 @@ sharded_store::get_subject_version_written_at(subject sub, schema_version ver) {

// Delete version `ver` of subject `sub`, then erase the underlying schema
// definition when no subject version refers to it any more.
//
// Returns the value produced by store::delete_subject_version on the shard
// selected by shard_for(sub).
//
// NOTE(review): this span comes from a rendered diff view, so pre-change and
// post-change lines appear back to back without +/- markers - confirm the
// exact statement sequence against the merged file.
ss::future<bool> sharded_store::delete_subject_version(
subject sub, schema_version ver, force force) {
auto sub_shard{shard_for(sub)};
co_return co_await _store.invoke_on(
auto sub_shard = shard_for(sub);
auto [schema_id, result] = co_await _store.invoke_on(
sub_shard, _smp_opts, [sub{std::move(sub)}, ver, force](store& s) {
return s.delete_subject_version(sub, ver, force).value();
// Resolve the schema id before deleting (soft-deleted entries included);
// presumably the lookup would no longer succeed once the version is gone.
auto schema_id = s.get_subject_version_id(
sub, ver, include_deleted::yes)
.value()
.id;
auto result = s.delete_subject_version(sub, ver, force).value();
BenPope marked this conversation as resolved.
Show resolved Hide resolved
// Hand both the id and the delete outcome back to the calling shard.
return std::make_pair(schema_id, result);
});

// Ask the stores whether any subject version - including versions of
// subjects flagged as deleted - still uses this schema id; the per-store
// answers are OR-ed together.
auto remaining_subjects_exist = co_await _store.map_reduce0(
[schema_id](store& s) {
return s.subject_versions_has_any_of(
{schema_id}, include_deleted::yes);
},
false,
std::logical_or{});

// Nothing refers to the schema any longer: drop its definition too.
if (!remaining_subjects_exist) {
co_await delete_schema(schema_id);
}

co_return result;
}

ss::future<mode> sharded_store::get_mode() {
Expand Down
1 change: 1 addition & 0 deletions src/v/pandaproxy/schema_registry/sharded_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ class sharded_store {
private:
ss::future<bool>
upsert_schema(schema_id id, canonical_schema_definition def);
ss::future<> delete_schema(schema_id id);

struct insert_subject_result {
schema_version version;
Expand Down
14 changes: 9 additions & 5 deletions src/v/pandaproxy/schema_registry/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -431,11 +431,13 @@ class store {
return has_ids;
}

bool subject_versions_has_any_of(const schema_id_set& ids) {
return absl::c_any_of(_subjects, [&ids](const auto& s) {
return absl::c_any_of(s.second.versions, [&ids, &s](const auto& v) {
return !s.second.deleted && ids.contains(v.id);
});
/// Report whether any version of any subject in this store uses one of
/// the schema ids in `ids`.
///
/// Subjects flagged as deleted are only considered when `inc_del` is set.
bool subject_versions_has_any_of(
  const schema_id_set& ids, include_deleted inc_del) {
    return absl::c_any_of(_subjects, [&ids, inc_del](const auto& entry) {
        // Visibility is a per-subject property, so decide it once
        // before scanning the subject's versions.
        const bool subject_visible = inc_del || !entry.second.deleted;
        return subject_visible
               && absl::c_any_of(
                 entry.second.versions, [&ids](const auto& version) {
                     return ids.contains(version.id);
                 });
    });
}

Expand Down Expand Up @@ -628,6 +630,8 @@ class store {
.second;
}

/// Erase the entry keyed by `id` from `_schemas`.
void delete_schema(schema_id id) {
    _schemas.erase(id);
}

struct insert_subject_result {
schema_version version;
bool inserted;
Expand Down
42 changes: 42 additions & 0 deletions tests/rptest/tests/schema_registry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2937,6 +2937,48 @@ def test_delete_subject_last_clears_config(self):
assert result_raw.status_code == requests.codes.not_found
assert result_raw.json()["error_code"] == 40408

@cluster(num_nodes=3)
def test_hard_delete_subject_deletes_schema(self):
    """
    Register a schema under a single subject, soft delete and then hard
    delete that subject, and check that fetching the schema by id
    eventually answers not_found with error code 40403.
    """
    subject = "example_topic-key"
    payload = json.dumps({"schema": schema1_def})

    self.logger.debug("Posting schema 1 as a subject key")
    result_raw = self._post_subjects_subject_versions(
        subject=subject, data=payload, auth=self.super_auth)
    self.logger.debug(result_raw)
    assert result_raw.status_code == requests.codes.ok, f'Code: {result_raw.status_code}'
    assert result_raw.json() == {'id': 1}, f"Json: {result_raw.json()}"

    # Soft delete first, then make the deletion permanent.
    delete_steps = (
        ("Soft delete subject", False),
        ("Then hard delete subject", True),
    )
    for step_message, is_permanent in delete_steps:
        self.logger.debug(step_message)
        result_raw = self._delete_subject(subject=subject,
                                          permanent=is_permanent,
                                          auth=self.super_auth)
        self.logger.debug(result_raw)
        assert result_raw.status_code == requests.codes.ok, f'Code: {result_raw.status_code}'

    def schema_is_gone():
        # A failing assert raises; wait_until below is called with
        # retry_on_exc=True, so the probe is repeated until the timeout.
        self.logger.debug("Sending get schema 1")
        result_raw = self._get_schemas_ids_id(id=1, auth=self.super_auth)
        self.logger.debug(result_raw)
        assert result_raw.status_code == requests.codes.not_found, f'Code: {result_raw.status_code}'
        assert (result_raw.json()["error_code"] == 40403
                ), f"Json: {result_raw.json()}"
        return True

    self.logger.debug("Wait until get schema 1 now eventually fails")
    wait_until(schema_is_gone,
               timeout_sec=30,
               retry_on_exc=True,
               err_msg="Failed to delete schema 1 in time")


class SchemaRegistryTest(SchemaRegistryTestMethods):
"""
Expand Down