From d972155bd7555a0dd79884a611f0526e99707c79 Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Thu, 21 Sep 2023 22:08:16 -0700 Subject: [PATCH] Tombstones and supporting code --- .../pandaproxy/schema_registry/seq_writer.cc | 59 +++++++++++++++++-- src/v/pandaproxy/schema_registry/seq_writer.h | 6 +- .../schema_registry/sharded_store.cc | 9 +++ .../schema_registry/sharded_store.h | 5 ++ src/v/pandaproxy/schema_registry/store.h | 25 ++++++++ 5 files changed, 95 insertions(+), 9 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/seq_writer.cc b/src/v/pandaproxy/schema_registry/seq_writer.cc index 2017e111dcf54..256b1c281ea99 100644 --- a/src/v/pandaproxy/schema_registry/seq_writer.cc +++ b/src/v/pandaproxy/schema_registry/seq_writer.cc @@ -229,19 +229,66 @@ ss::future seq_writer::write_config( } ss::future> seq_writer::do_delete_config( - std::optional sub, model::offset write_at, seq_writer& seq) { + subject sub, model::offset write_at, seq_writer& seq) { vlog(plog.debug, "delete config sub={} offset={}", sub, write_at); try { - if (sub.has_value()) { - (void)co_await seq._store.get_compatibility( - sub.value(), default_to_global::no); - } + (void)co_await seq._store.get_compatibility(sub, default_to_global::no); } catch (const exception&) { // subject config already blank co_return false; } + std::vector sequences{ + co_await _store.get_subject_config_written_at(sub)}; + + storage::record_batch_builder rb{ + model::record_batch_type::raft_data, model::offset{0}}; + + std::vector keys; + for (auto s : sequences) { + vlog( + plog.debug, + "Deleting config: tombstoning config_key for sub={} at {}", + sub, + s); + + switch (s.key_type) { + case seq_marker_key_type::config: { + auto key = config_key{.seq{s.seq}, .node{s.node}, .sub{sub}}; + keys.push_back(key); + rb.add_raw_kv(to_json_iobuf(std::move(key)), std::nullopt); + } break; + default: + vassert(false, "Unexpected key type: {}", s.key_type); + } + } + + auto ts_batch = std::move(rb).build(); + kafka::partition_produce_response res + = co_await _client.local().produce_record_batch( + model::schema_registry_internal_tp, std::move(ts_batch)); + + if (res.error_code != kafka::error_code::none) { + vlog( + plog.error, + "Error writing to subject topic: {} {}", + res.error_code, + res.error_message); + throw kafka::exception(res.error_code, *res.error_message); + } + + auto applier = consume_to_store(seq._store, seq); + auto offset = res.base_offset; + for (const auto& k : keys) { + co_await applier.apply(offset, k, std::nullopt); + seq.advance_offset_inner(offset); + offset++; + } + + // co_return true; + + // TODO(oren): seems like this might not be necessary? auto key = config_key{.seq{write_at}, .node{seq._node_id}, .sub{sub}}; auto batch = as_record_batch(key, std::nullopt); auto success = co_await seq.produce_and_check(write_at, std::move(batch)); @@ -256,7 +303,7 @@ ss::future> seq_writer::do_delete_config( } } -ss::future seq_writer::delete_config(std::optional sub) { +ss::future seq_writer::delete_config(subject sub) { return sequenced_write( [this, sub{std::move(sub)}](model::offset write_at, seq_writer& seq) { return do_delete_config(sub, write_at, seq); diff --git a/src/v/pandaproxy/schema_registry/seq_writer.h b/src/v/pandaproxy/schema_registry/seq_writer.h index 49a1deed4dd3f..2dddc9ba6bc60 100644 --- a/src/v/pandaproxy/schema_registry/seq_writer.h +++ b/src/v/pandaproxy/schema_registry/seq_writer.h @@ -47,7 +47,7 @@ class seq_writer final : public ss::peering_sharded_service { ss::future write_config(std::optional sub, compatibility_level compat); - ss::future delete_config(std::optional sub); + ss::future delete_config(subject sub); ss::future delete_subject_version(subject sub, schema_version version); @@ -77,8 +77,8 @@ class seq_writer final : public ss::peering_sharded_service { model::offset write_at, seq_writer& seq); - ss::future> do_delete_config( - std::optional sub, model::offset write_at, seq_writer& seq); + ss::future> + do_delete_config(subject sub, model::offset write_at, seq_writer& seq); ss::future> do_delete_subject_version( subject sub, diff --git a/src/v/pandaproxy/schema_registry/sharded_store.cc b/src/v/pandaproxy/schema_registry/sharded_store.cc index e1becfae68d97..7d2192b5ea96c 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.cc +++ b/src/v/pandaproxy/schema_registry/sharded_store.cc @@ -459,6 +459,15 @@ sharded_store::get_subject_written_at(subject sub) { }); } +ss::future> +sharded_store::get_subject_config_written_at(subject sub) { + auto sub_shard{shard_for(sub)}; + co_return co_await _store.invoke_on( + sub_shard, _smp_opts, [sub{std::move(sub)}](store& s) { + return s.store::get_subject_config_written_at(sub).value(); + }); +} + ss::future> sharded_store::get_subject_version_written_at(subject sub, schema_version ver) { auto sub_shard{shard_for(sub)}; diff --git a/src/v/pandaproxy/schema_registry/sharded_store.h b/src/v/pandaproxy/schema_registry/sharded_store.h index f9b317154123d..6802365aed744 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.h +++ b/src/v/pandaproxy/schema_registry/sharded_store.h @@ -103,6 +103,11 @@ class sharded_store { ///\brief Get sequence number history (errors out if not soft-deleted) ss::future> get_subject_written_at(subject sub); + ///\brief Get sequence number history of subject config. Subject need + /// not be soft-deleted first + ss::future> + get_subject_config_written_at(subject sub); + ///\brief Get sequence number history (errors out if not soft-deleted) ss::future> get_subject_version_written_at(subject sub, schema_version version); diff --git a/src/v/pandaproxy/schema_registry/store.h b/src/v/pandaproxy/schema_registry/store.h index ee5859f4879c0..20e872d857728 100644 --- a/src/v/pandaproxy/schema_registry/store.h +++ b/src/v/pandaproxy/schema_registry/store.h @@ -241,6 +241,31 @@ class store { } } + /// \brief Return the seq_marker write history of a subject, but only + /// config_keys + /// + /// \return A vector (possibly empty) + result> + get_subject_config_written_at(const subject& sub) const { + auto sub_it = BOOST_OUTCOME_TRYX( + get_subject_iter(sub, include_deleted::yes)); + + if (sub_it->second.written_at.empty()) { + return not_found(sub); + } + + std::vector result; + std::copy_if( + std::begin(sub_it->second.written_at), + std::end(sub_it->second.written_at), + std::back_inserter(result), + [](const auto& sm) { + return sm.key_type == seq_marker_key_type::config; + }); + + return std::move(result); + } + /// \brief Return the seq_marker write history of a version. /// /// \return A vector with at least one element