From ac100add6446718e9abbdbd5a42bef361b8fb350 Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Tue, 19 Sep 2023 16:39:45 -0700 Subject: [PATCH] schema_registry/seq_writer: Introduce delete_config Adds `seq_writer::delete_config`, which deletes the stored compatibility level for a given subject by producing an empty-valued config write. Also adds `sharded_store::get_subject_config_written_at` which returns the sequence number history of a subject's config. This differs from `get_subject_written_at` in that the sequence markers returned cannot be used to perm delete the subject itself, only the config. Therefore, there is no need to soft-delete a subject before fetching this info. --- .../pandaproxy/schema_registry/seq_writer.cc | 67 +++++++++++++++++++ src/v/pandaproxy/schema_registry/seq_writer.h | 5 ++ .../schema_registry/sharded_store.cc | 16 ++++- .../schema_registry/sharded_store.h | 7 +- src/v/pandaproxy/schema_registry/storage.h | 36 ++++++---- src/v/pandaproxy/schema_registry/store.h | 32 ++++++++- .../pandaproxy/schema_registry/test/store.cc | 7 +- src/v/pandaproxy/schema_registry/types.h | 5 ++ 8 files changed, 155 insertions(+), 20 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/seq_writer.cc b/src/v/pandaproxy/schema_registry/seq_writer.cc index da8f7b87d46ac..f6cb455f423cf 100644 --- a/src/v/pandaproxy/schema_registry/seq_writer.cc +++ b/src/v/pandaproxy/schema_registry/seq_writer.cc @@ -228,6 +228,73 @@ ss::future seq_writer::write_config( }); } +ss::future> seq_writer::do_delete_config( + subject sub, model::offset write_at, seq_writer& seq) { + vlog(plog.debug, "delete config sub={} offset={}", sub, write_at); + + try { + co_await seq._store.get_compatibility(sub, default_to_global::no); + } catch (const exception&) { + // subject config already blank + co_return false; + } + + std::vector sequences{ + co_await _store.get_subject_config_written_at(sub)}; + + storage::record_batch_builder rb{ + model::record_batch_type::raft_data, model::offset{0}}; + + std::vector keys; + for (const auto& s : sequences) { + vlog( + plog.debug, + "Deleting config: tombstoning config_key for sub={} at {}", + sub, + s); + + vassert( + s.key_type == seq_marker_key_type::config, + "Unexpected key type: {}", + s.key_type); + + auto key = config_key{.seq{s.seq}, .node{s.node}, .sub{sub}}; + keys.push_back(key); + rb.add_raw_kv(to_json_iobuf(std::move(key)), std::nullopt); + } + + auto ts_batch = std::move(rb).build(); + kafka::partition_produce_response res + = co_await _client.local().produce_record_batch( + model::schema_registry_internal_tp, std::move(ts_batch)); + + if (res.error_code != kafka::error_code::none) { + vlog( + plog.error, + "Error writing to subject topic: {} {}", + res.error_code, + res.error_message); + throw kafka::exception(res.error_code, *res.error_message); + } + + auto applier = consume_to_store(seq._store, seq); + auto offset = res.base_offset; + for (const auto& k : keys) { + co_await applier.apply(offset, k, std::nullopt); + seq.advance_offset_inner(offset); + ++offset; + } + + co_return true; +} + +ss::future seq_writer::delete_config(subject sub) { + return sequenced_write( + [this, sub{std::move(sub)}](model::offset write_at, seq_writer& seq) { + return do_delete_config(sub, write_at, seq); + }); +} + /// Impermanent delete: update a version with is_deleted=true ss::future> seq_writer::do_delete_subject_version( subject sub, diff --git a/src/v/pandaproxy/schema_registry/seq_writer.h b/src/v/pandaproxy/schema_registry/seq_writer.h index 081daea072c7e..2dddc9ba6bc60 100644 --- a/src/v/pandaproxy/schema_registry/seq_writer.h +++ b/src/v/pandaproxy/schema_registry/seq_writer.h @@ -47,6 +47,8 @@ class seq_writer final : public ss::peering_sharded_service { ss::future write_config(std::optional sub, compatibility_level compat); + ss::future delete_config(subject sub); + ss::future delete_subject_version(subject sub, schema_version version); @@ -75,6 +77,9 @@ class seq_writer final : public ss::peering_sharded_service { model::offset write_at, seq_writer& seq); + ss::future> + do_delete_config(subject sub, model::offset write_at, seq_writer& seq); + ss::future> do_delete_subject_version( subject sub, schema_version version, diff --git a/src/v/pandaproxy/schema_registry/sharded_store.cc b/src/v/pandaproxy/schema_registry/sharded_store.cc index e1becfae68d97..67e15dc180a1b 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.cc +++ b/src/v/pandaproxy/schema_registry/sharded_store.cc @@ -459,6 +459,15 @@ sharded_store::get_subject_written_at(subject sub) { }); } +ss::future> +sharded_store::get_subject_config_written_at(subject sub) { + auto sub_shard{shard_for(sub)}; + co_return co_await _store.invoke_on( + sub_shard, _smp_opts, [sub{std::move(sub)}](store& s) { + return s.store::get_subject_config_written_at(sub).value(); + }); +} + ss::future> sharded_store::get_subject_version_written_at(subject sub, schema_version ver) { auto sub_shard{shard_for(sub)}; @@ -510,11 +519,12 @@ ss::future sharded_store::set_compatibility( }); } -ss::future sharded_store::clear_compatibility(subject sub) { +ss::future +sharded_store::clear_compatibility(seq_marker marker, subject sub) { auto sub_shard{shard_for(sub)}; co_return co_await _store.invoke_on( - sub_shard, _smp_opts, [sub{std::move(sub)}](store& s) { - return s.clear_compatibility(sub).value(); + sub_shard, _smp_opts, [marker, sub{std::move(sub)}](store& s) { + return s.clear_compatibility(marker, sub).value(); }); } diff --git a/src/v/pandaproxy/schema_registry/sharded_store.h b/src/v/pandaproxy/schema_registry/sharded_store.h index f9b317154123d..c6473f0e6bc69 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.h +++ b/src/v/pandaproxy/schema_registry/sharded_store.h @@ -103,6 +103,11 @@ class sharded_store { ///\brief Get sequence number history (errors out if not soft-deleted) ss::future> get_subject_written_at(subject sub); + ///\brief Get sequence number history of subject config. Subject need + /// not be soft-deleted first + ss::future> + get_subject_config_written_at(subject sub); + ///\brief Get sequence number history (errors out if not soft-deleted) ss::future> get_subject_version_written_at(subject sub, schema_version version); @@ -126,7 +131,7 @@ class sharded_store { seq_marker marker, subject sub, compatibility_level compatibility); ///\brief Clear the compatibility level for a subject. - ss::future clear_compatibility(subject sub); + ss::future clear_compatibility(seq_marker marker, subject sub); ///\brief Check if the provided schema is compatible with the subject and /// version, according the the current compatibility. diff --git a/src/v/pandaproxy/schema_registry/storage.h b/src/v/pandaproxy/schema_registry/storage.h index 38aa1121930db..2028ec309b023 100644 --- a/src/v/pandaproxy/schema_registry/storage.h +++ b/src/v/pandaproxy/schema_registry/storage.h @@ -1286,19 +1286,31 @@ struct consume_to_store { } try { vlog(plog.debug, "Applying: {}", key); - if (!val) { - co_await _store.clear_compatibility(*key.sub); - } else if (key.sub) { - co_await _store.set_compatibility( - seq_marker{ - .seq = key.seq, - .node = key.node, - .version{invalid_schema_version}, // Not applicable - .key_type = seq_marker_key_type::config}, - *key.sub, - val->compat); - } else { + if (key.sub.has_value()) { + if (!val.has_value()) { + co_await _store.clear_compatibility( + seq_marker{ + .seq = key.seq, + .node = key.node, + .version{invalid_schema_version}, // Not applicable + .key_type = seq_marker_key_type::config}, + *key.sub); + } else { + co_await _store.set_compatibility( + seq_marker{ + .seq = key.seq, + .node = key.node, + .version{invalid_schema_version}, // Not applicable + .key_type = seq_marker_key_type::config}, + *key.sub, + val->compat); + } + } else if (val.has_value()) { co_await _store.set_compatibility(val->compat); + } else { + vlog( + plog.warn, + "Tried to apply config with neither subject nor value"); } } catch (const exception& e) { vlog(plog.debug, "Error replaying: {}: {}", key, e); diff --git a/src/v/pandaproxy/schema_registry/store.h b/src/v/pandaproxy/schema_registry/store.h index ee5859f4879c0..d37936d0fa93b 100644 --- a/src/v/pandaproxy/schema_registry/store.h +++ b/src/v/pandaproxy/schema_registry/store.h @@ -241,6 +241,34 @@ class store { } } + /// \brief Return the seq_marker write history of a subject, but only + /// config_keys + /// + /// \return A vector (possibly empty) + result> + get_subject_config_written_at(const subject& sub) const { + auto sub_it = BOOST_OUTCOME_TRYX( + get_subject_iter(sub, include_deleted::yes)); + + // This should never happen (how can a record get into the + // store without an originating sequenced record?), but return + // an error instead of vasserting out. + if (sub_it->second.written_at.empty()) { + return not_found(sub); + } + + std::vector result; + std::copy_if( + sub_it->second.written_at.begin(), + sub_it->second.written_at.end(), + std::back_inserter(result), + [](const auto& sm) { + return sm.key_type == seq_marker_key_type::config; + }); + + return result; + } + /// \brief Return the seq_marker write history of a version. /// /// \return A vector with at least one element @@ -466,9 +494,11 @@ class store { } ///\brief Clear the compatibility level for a subject. - result clear_compatibility(const subject& sub) { + result + clear_compatibility(const seq_marker& marker, const subject& sub) { auto sub_it = BOOST_OUTCOME_TRYX( get_subject_iter(sub, include_deleted::yes)); + std::erase(sub_it->second.written_at, marker); return std::exchange(sub_it->second.compatibility, std::nullopt) != std::nullopt; } diff --git a/src/v/pandaproxy/schema_registry/test/store.cc b/src/v/pandaproxy/schema_registry/test/store.cc index 37dedae537455..483c0413a8b4e 100644 --- a/src/v/pandaproxy/schema_registry/test/store.cc +++ b/src/v/pandaproxy/schema_registry/test/store.cc @@ -471,7 +471,8 @@ BOOST_AUTO_TEST_CASE(test_store_subject_compat) { BOOST_REQUIRE(s.get_compatibility().value() == global_expected); // Clearing compatibility should fallback to global - BOOST_REQUIRE(s.clear_compatibility(subject0).value() == true); + BOOST_REQUIRE( + s.clear_compatibility(dummy_marker, subject0).value() == true); BOOST_REQUIRE( s.get_compatibility(subject0, fallback).value() == global_expected); } @@ -584,7 +585,7 @@ BOOST_AUTO_TEST_CASE(test_store_delete_subject) { d_res.error().code(), pps::error_code::subject_soft_deleted); // Clearing the compatibility of a soft-deleted subject is allowed - BOOST_REQUIRE(s.clear_compatibility(subject0).has_value()); + BOOST_REQUIRE(s.clear_compatibility(dummy_marker, subject0).has_value()); v_res = s.get_versions(subject0, pps::include_deleted::yes); BOOST_REQUIRE(v_res.has_value()); @@ -622,7 +623,7 @@ BOOST_AUTO_TEST_CASE(test_store_delete_subject) { // Clearing the compatibility of a hard-deleted subject should fail BOOST_REQUIRE( - s.clear_compatibility(subject0).error().code() + s.clear_compatibility(dummy_marker, subject0).error().code() == pps::error_code::subject_not_found); } diff --git a/src/v/pandaproxy/schema_registry/types.h b/src/v/pandaproxy/schema_registry/types.h index da0cdeae7bfe0..69a2ecb6eaeb6 100644 --- a/src/v/pandaproxy/schema_registry/types.h +++ b/src/v/pandaproxy/schema_registry/types.h @@ -315,6 +315,11 @@ struct seq_marker { schema_version version; seq_marker_key_type key_type{seq_marker_key_type::invalid}; + // Note that matching nullopts is possible on the seq and node fields. + // This is intentional; both fields are particular to redpanda, so making + // them optional provides compatibility with non-rp schema registries. If + // either is not present, we can assume a collision has not occurred. + friend bool operator==(const seq_marker&, const seq_marker&) = default; friend std::ostream& operator<<(std::ostream& os, const seq_marker& v); };