Skip to content

Commit

Permalink
Merge pull request #20847 from pgellert/sr/fix-permanent-delete-schema
Browse files Browse the repository at this point in the history
CORE-5083 schema_registry: last subject deletes schema
  • Loading branch information
aanthony-rp authored Jul 5, 2024
2 parents fa0a2e1 + d85de7b commit 0b32183
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 9 deletions.
32 changes: 28 additions & 4 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ ss::future<bool> sharded_store::has_schema(schema_id id) {
});
}

ss::future<> sharded_store::delete_schema(schema_id id) {
return _store.invoke_on(
shard_for(id), _smp_opts, [id](store& s) { s.delete_schema(id); });
}

ss::future<subject_schema>
sharded_store::has_schema(canonical_schema schema, include_deleted inc_del) {
auto versions = co_await get_versions(schema.sub(), inc_del);
Expand Down Expand Up @@ -394,7 +399,7 @@ ss::future<bool> sharded_store::is_referenced(subject sub, schema_version ver) {
// Find whether any subject version reference any of the schema
co_return co_await _store.map_reduce0(
[refs{std::move(references)}](store& s) {
return s.subject_versions_has_any_of(refs);
return s.subject_versions_has_any_of(refs, include_deleted::no);
},
false,
std::logical_or<>{});
Expand Down Expand Up @@ -503,11 +508,30 @@ sharded_store::get_subject_version_written_at(subject sub, schema_version ver) {

ss::future<bool> sharded_store::delete_subject_version(
subject sub, schema_version ver, force force) {
auto sub_shard{shard_for(sub)};
co_return co_await _store.invoke_on(
auto sub_shard = shard_for(sub);
auto [schema_id, result] = co_await _store.invoke_on(
sub_shard, _smp_opts, [sub{std::move(sub)}, ver, force](store& s) {
return s.delete_subject_version(sub, ver, force).value();
auto schema_id = s.get_subject_version_id(
sub, ver, include_deleted::yes)
.value()
.id;
auto result = s.delete_subject_version(sub, ver, force).value();
return std::make_pair(schema_id, result);
});

auto remaining_subjects_exist = co_await _store.map_reduce0(
[schema_id](store& s) {
return s.subject_versions_has_any_of(
{schema_id}, include_deleted::yes);
},
false,
std::logical_or{});

if (!remaining_subjects_exist) {
co_await delete_schema(schema_id);
}

co_return result;
}

ss::future<mode> sharded_store::get_mode() {
Expand Down
1 change: 1 addition & 0 deletions src/v/pandaproxy/schema_registry/sharded_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ class sharded_store {
private:
ss::future<bool>
upsert_schema(schema_id id, canonical_schema_definition def);
ss::future<> delete_schema(schema_id id);

struct insert_subject_result {
schema_version version;
Expand Down
14 changes: 9 additions & 5 deletions src/v/pandaproxy/schema_registry/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -431,11 +431,13 @@ class store {
return has_ids;
}

bool subject_versions_has_any_of(const schema_id_set& ids) {
return absl::c_any_of(_subjects, [&ids](const auto& s) {
return absl::c_any_of(s.second.versions, [&ids, &s](const auto& v) {
return !s.second.deleted && ids.contains(v.id);
});
bool subject_versions_has_any_of(
const schema_id_set& ids, include_deleted inc_del) {
return absl::c_any_of(_subjects, [&ids, inc_del](const auto& s) {
return absl::c_any_of(
s.second.versions, [&ids, &s, inc_del](const auto& v) {
return (inc_del || !s.second.deleted) && ids.contains(v.id);
});
});
}

Expand Down Expand Up @@ -628,6 +630,8 @@ class store {
.second;
}

void delete_schema(schema_id id) { _schemas.erase(id); }

struct insert_subject_result {
schema_version version;
bool inserted;
Expand Down
42 changes: 42 additions & 0 deletions tests/rptest/tests/schema_registry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2937,6 +2937,48 @@ def test_delete_subject_last_clears_config(self):
assert result_raw.status_code == requests.codes.not_found
assert result_raw.json()["error_code"] == 40408

@cluster(num_nodes=3)
def test_hard_delete_subject_deletes_schema(self):
subject = "example_topic-key"
schema_1_data = json.dumps({"schema": schema1_def})

self.logger.debug("Posting schema 1 as a subject key")
result_raw = self._post_subjects_subject_versions(subject=subject,
data=schema_1_data,
auth=self.super_auth)
self.logger.debug(result_raw)
assert result_raw.status_code == requests.codes.ok, f'Code: {result_raw.status_code}'
assert result_raw.json() == {'id': 1}, f"Json: {result_raw.json()}"

self.logger.debug("Soft delete subject")
result_raw = self._delete_subject(subject=subject,
permanent=False,
auth=self.super_auth)
self.logger.debug(result_raw)
assert result_raw.status_code == requests.codes.ok, f'Code: {result_raw.status_code}'

self.logger.debug("Then hard delete subject")
result_raw = self._delete_subject(subject=subject,
permanent=True,
auth=self.super_auth)
self.logger.debug(result_raw)
assert result_raw.status_code == requests.codes.ok, f'Code: {result_raw.status_code}'

def schema_no_longer_present():
self.logger.debug("Sending get schema 1")
result_raw = self._get_schemas_ids_id(id=1, auth=self.super_auth)
self.logger.debug(result_raw)
assert result_raw.status_code == requests.codes.not_found, f'Code: {result_raw.status_code}'
assert result_raw.json()["error_code"] == 40403, \
f"Json: {result_raw.json()}"
return True

self.logger.debug("Wait until get schema 1 now eventually fails")
wait_until(schema_no_longer_present,
timeout_sec=30,
retry_on_exc=True,
err_msg="Failed to delete schema 1 in time")


class SchemaRegistryTest(SchemaRegistryTestMethods):
"""
Expand Down

0 comments on commit 0b32183

Please sign in to comment.