Skip to content

Commit

Permalink
schema_registry/seq_writer: Introduce delete_config
Browse files Browse the repository at this point in the history
Adds `seq_writer::delete_config`, which deletes the stored compatibility
level for a given subject by producing an empty-valued config write.

Also adds `sharded_store::get_subject_config_written_at` which returns
the sequence number history of a subject's config. This differs from
`get_subject_written_at` in that the sequence markers returned cannot
be used to perm delete the subject itself, only the config. Therefore,
there is no need to soft-delete a subject before fetching this info.
  • Loading branch information
oleiman committed Oct 23, 2023
1 parent 03fd0fa commit ac100ad
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 20 deletions.
67 changes: 67 additions & 0 deletions src/v/pandaproxy/schema_registry/seq_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,73 @@ ss::future<bool> seq_writer::write_config(
});
}

ss::future<std::optional<bool>> seq_writer::do_delete_config(
subject sub, model::offset write_at, seq_writer& seq) {
vlog(plog.debug, "delete config sub={} offset={}", sub, write_at);

try {
co_await seq._store.get_compatibility(sub, default_to_global::no);
} catch (const exception&) {
// subject config already blank
co_return false;
}

std::vector<seq_marker> sequences{
co_await _store.get_subject_config_written_at(sub)};

storage::record_batch_builder rb{
model::record_batch_type::raft_data, model::offset{0}};

std::vector<config_key> keys;
for (const auto& s : sequences) {
vlog(
plog.debug,
"Deleting config: tombstoning config_key for sub={} at {}",
sub,
s);

vassert(
s.key_type == seq_marker_key_type::config,
"Unexpected key type: {}",
s.key_type);

auto key = config_key{.seq{s.seq}, .node{s.node}, .sub{sub}};
keys.push_back(key);
rb.add_raw_kv(to_json_iobuf(std::move(key)), std::nullopt);
}

auto ts_batch = std::move(rb).build();
kafka::partition_produce_response res
= co_await _client.local().produce_record_batch(
model::schema_registry_internal_tp, std::move(ts_batch));

if (res.error_code != kafka::error_code::none) {
vlog(
plog.error,
"Error writing to subject topic: {} {}",
res.error_code,
res.error_message);
throw kafka::exception(res.error_code, *res.error_message);
}

auto applier = consume_to_store(seq._store, seq);
auto offset = res.base_offset;
for (const auto& k : keys) {
co_await applier.apply(offset, k, std::nullopt);
seq.advance_offset_inner(offset);
++offset;
}

co_return true;
}

ss::future<bool> seq_writer::delete_config(subject sub) {
return sequenced_write(
[this, sub{std::move(sub)}](model::offset write_at, seq_writer& seq) {
return do_delete_config(sub, write_at, seq);
});
}

/// Impermanent delete: update a version with is_deleted=true
ss::future<std::optional<bool>> seq_writer::do_delete_subject_version(
subject sub,
Expand Down
5 changes: 5 additions & 0 deletions src/v/pandaproxy/schema_registry/seq_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class seq_writer final : public ss::peering_sharded_service<seq_writer> {
ss::future<bool>
write_config(std::optional<subject> sub, compatibility_level compat);

ss::future<bool> delete_config(subject sub);

ss::future<bool>
delete_subject_version(subject sub, schema_version version);

Expand Down Expand Up @@ -75,6 +77,9 @@ class seq_writer final : public ss::peering_sharded_service<seq_writer> {
model::offset write_at,
seq_writer& seq);

ss::future<std::optional<bool>>
do_delete_config(subject sub, model::offset write_at, seq_writer& seq);

ss::future<std::optional<bool>> do_delete_subject_version(
subject sub,
schema_version version,
Expand Down
16 changes: 13 additions & 3 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,15 @@ sharded_store::get_subject_written_at(subject sub) {
});
}

ss::future<std::vector<seq_marker>>
sharded_store::get_subject_config_written_at(subject sub) {
auto sub_shard{shard_for(sub)};
co_return co_await _store.invoke_on(
sub_shard, _smp_opts, [sub{std::move(sub)}](store& s) {
return s.store::get_subject_config_written_at(sub).value();
});
}

ss::future<std::vector<seq_marker>>
sharded_store::get_subject_version_written_at(subject sub, schema_version ver) {
auto sub_shard{shard_for(sub)};
Expand Down Expand Up @@ -510,11 +519,12 @@ ss::future<bool> sharded_store::set_compatibility(
});
}

ss::future<bool> sharded_store::clear_compatibility(subject sub) {
ss::future<bool>
sharded_store::clear_compatibility(seq_marker marker, subject sub) {
auto sub_shard{shard_for(sub)};
co_return co_await _store.invoke_on(
sub_shard, _smp_opts, [sub{std::move(sub)}](store& s) {
return s.clear_compatibility(sub).value();
sub_shard, _smp_opts, [marker, sub{std::move(sub)}](store& s) {
return s.clear_compatibility(marker, sub).value();
});
}

Expand Down
7 changes: 6 additions & 1 deletion src/v/pandaproxy/schema_registry/sharded_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ class sharded_store {
///\brief Get sequence number history (errors out if not soft-deleted)
ss::future<std::vector<seq_marker>> get_subject_written_at(subject sub);

///\brief Get sequence number history of subject config. Subject need
/// not be soft-deleted first
ss::future<std::vector<seq_marker>>
get_subject_config_written_at(subject sub);

///\brief Get sequence number history (errors out if not soft-deleted)
ss::future<std::vector<seq_marker>>
get_subject_version_written_at(subject sub, schema_version version);
Expand All @@ -126,7 +131,7 @@ class sharded_store {
seq_marker marker, subject sub, compatibility_level compatibility);

///\brief Clear the compatibility level for a subject.
ss::future<bool> clear_compatibility(subject sub);
ss::future<bool> clear_compatibility(seq_marker marker, subject sub);

///\brief Check if the provided schema is compatible with the subject and
/// version, according the the current compatibility.
Expand Down
36 changes: 24 additions & 12 deletions src/v/pandaproxy/schema_registry/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -1286,19 +1286,31 @@ struct consume_to_store {
}
try {
vlog(plog.debug, "Applying: {}", key);
if (!val) {
co_await _store.clear_compatibility(*key.sub);
} else if (key.sub) {
co_await _store.set_compatibility(
seq_marker{
.seq = key.seq,
.node = key.node,
.version{invalid_schema_version}, // Not applicable
.key_type = seq_marker_key_type::config},
*key.sub,
val->compat);
} else {
if (key.sub.has_value()) {
if (!val.has_value()) {
co_await _store.clear_compatibility(
seq_marker{
.seq = key.seq,
.node = key.node,
.version{invalid_schema_version}, // Not applicable
.key_type = seq_marker_key_type::config},
*key.sub);
} else {
co_await _store.set_compatibility(
seq_marker{
.seq = key.seq,
.node = key.node,
.version{invalid_schema_version}, // Not applicable
.key_type = seq_marker_key_type::config},
*key.sub,
val->compat);
}
} else if (val.has_value()) {
co_await _store.set_compatibility(val->compat);
} else {
vlog(
plog.warn,
"Tried to apply config with neither subject nor value");
}
} catch (const exception& e) {
vlog(plog.debug, "Error replaying: {}: {}", key, e);
Expand Down
32 changes: 31 additions & 1 deletion src/v/pandaproxy/schema_registry/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,34 @@ class store {
}
}

/// \brief Return the seq_marker write history of a subject, but only
/// config_keys
///
/// \return A vector (possibly empty)
result<std::vector<seq_marker>>
get_subject_config_written_at(const subject& sub) const {
auto sub_it = BOOST_OUTCOME_TRYX(
get_subject_iter(sub, include_deleted::yes));

// This should never happen (how can a record get into the
// store without an originating sequenced record?), but return
// an error instead of vasserting out.
if (sub_it->second.written_at.empty()) {
return not_found(sub);
}

std::vector<seq_marker> result;
std::copy_if(
sub_it->second.written_at.begin(),
sub_it->second.written_at.end(),
std::back_inserter(result),
[](const auto& sm) {
return sm.key_type == seq_marker_key_type::config;
});

return result;
}

/// \brief Return the seq_marker write history of a version.
///
/// \return A vector with at least one element
Expand Down Expand Up @@ -466,9 +494,11 @@ class store {
}

///\brief Clear the compatibility level for a subject.
result<bool> clear_compatibility(const subject& sub) {
result<bool>
clear_compatibility(const seq_marker& marker, const subject& sub) {
auto sub_it = BOOST_OUTCOME_TRYX(
get_subject_iter(sub, include_deleted::yes));
std::erase(sub_it->second.written_at, marker);
return std::exchange(sub_it->second.compatibility, std::nullopt)
!= std::nullopt;
}
Expand Down
7 changes: 4 additions & 3 deletions src/v/pandaproxy/schema_registry/test/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,8 @@ BOOST_AUTO_TEST_CASE(test_store_subject_compat) {
BOOST_REQUIRE(s.get_compatibility().value() == global_expected);

// Clearing compatibility should fallback to global
BOOST_REQUIRE(s.clear_compatibility(subject0).value() == true);
BOOST_REQUIRE(
s.clear_compatibility(dummy_marker, subject0).value() == true);
BOOST_REQUIRE(
s.get_compatibility(subject0, fallback).value() == global_expected);
}
Expand Down Expand Up @@ -584,7 +585,7 @@ BOOST_AUTO_TEST_CASE(test_store_delete_subject) {
d_res.error().code(), pps::error_code::subject_soft_deleted);

// Clearing the compatibility of a soft-deleted subject is allowed
BOOST_REQUIRE(s.clear_compatibility(subject0).has_value());
BOOST_REQUIRE(s.clear_compatibility(dummy_marker, subject0).has_value());

v_res = s.get_versions(subject0, pps::include_deleted::yes);
BOOST_REQUIRE(v_res.has_value());
Expand Down Expand Up @@ -622,7 +623,7 @@ BOOST_AUTO_TEST_CASE(test_store_delete_subject) {

// Clearing the compatibility of a hard-deleted subject should fail
BOOST_REQUIRE(
s.clear_compatibility(subject0).error().code()
s.clear_compatibility(dummy_marker, subject0).error().code()
== pps::error_code::subject_not_found);
}

Expand Down
5 changes: 5 additions & 0 deletions src/v/pandaproxy/schema_registry/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,11 @@ struct seq_marker {
schema_version version;
seq_marker_key_type key_type{seq_marker_key_type::invalid};

// Note that matching nullopts is possible on the seq and node fields.
// This is intentional; both fields are particular to redpanda, so making
// them optional provides compatibility with non-rp schema registries. If
// either is not present, we can assume a collision has not occurred.
friend bool operator==(const seq_marker&, const seq_marker&) = default;
friend std::ostream& operator<<(std::ostream& os, const seq_marker& v);
};

Expand Down

0 comments on commit ac100ad

Please sign in to comment.