Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cloud_storage: perform a full internal scrub at the end of TS tests #14349

Merged
merged 8 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion src/v/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,9 @@ static std::unique_ptr<scrubber> maybe_make_scrubber(
remote,
feature_table,
config::shard_local_cfg().cloud_storage_enable_scrubbing.bind(),
config::shard_local_cfg().cloud_storage_scrubbing_interval_ms.bind(),
config::shard_local_cfg()
.cloud_storage_partial_scrub_interval_ms.bind(),
config::shard_local_cfg().cloud_storage_full_scrub_interval_ms.bind(),
config::shard_local_cfg()
.cloud_storage_scrubbing_interval_jitter_ms.bind());
result->set_enabled(am_leader);
Expand Down Expand Up @@ -579,6 +581,26 @@ ss::future<std::error_code> ntp_archiver::process_anomalies(
co_return error;
}

/// Replicate a "reset scrubbing metadata" command via a batch obtained from
/// the archival metadata STM.
///
/// The replication deadline is the current lowres clock time plus the
/// cloud_storage_metadata_sync_timeout_ms configuration value.
///
/// \return the error code produced by the batch replication. On failure a
/// warning is logged; on success the scrubber's scheduler (if a scrubber
/// exists) is reset.
ss::future<std::error_code> ntp_archiver::reset_scrubbing_metadata() {
    const auto timeout = config::shard_local_cfg()
                           .cloud_storage_metadata_sync_timeout_ms.value();
    const auto replicate_deadline = ss::lowres_clock::now() + timeout;

    auto builder = _parent.archival_meta_stm()->batch_start(
      replicate_deadline, _as);
    builder.reset_scrubbing_metadata();
    auto ec = co_await builder.replicate();

    // Bail out early when the command could not be replicated: the
    // scheduler is left untouched in that case.
    if (ec != cluster::errc::success) {
        vlog(
          _rtclog.warn,
          "Failed to replicate reset scrubbing metadata command: {}",
          ec.message());
        co_return ec;
    }

    // Only reset the scheduler if a scrubber was created for this archiver.
    if (_scrubber) {
        _scrubber->reset_scheduler();
    }

    co_return ec;
}

ss::future<> ntp_archiver::upload_until_term_change() {
ss::lowres_clock::duration backoff = _conf->upload_loop_initial_backoff;

Expand Down
5 changes: 4 additions & 1 deletion src/v/archival/ntp_archiver_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#pragma once
#include "archival/archival_policy.h"
#include "archival/probe.h"
#include "archival/scrubber.h"
#include "archival/types.h"
#include "cloud_storage/cache_service.h"
#include "cloud_storage/fwd.h"
Expand Down Expand Up @@ -347,6 +348,8 @@ class ntp_archiver {
cloud_storage::scrub_status status,
cloud_storage::anomalies detected);

ss::future<std::error_code> reset_scrubbing_metadata();

private:
// Labels for contexts in which manifest uploads occur. Used for logging.
static constexpr const char* housekeeping_ctx_label = "housekeeping";
Expand Down Expand Up @@ -647,7 +650,7 @@ class ntp_archiver {
std::unique_ptr<housekeeping_job> _local_segment_merger;

// NTP level scrubbing job
std::unique_ptr<housekeeping_job> _scrubber;
std::unique_ptr<scrubber> _scrubber;

// The archival metadata stm has its own clean/dirty mechanism, but it
// is expensive to persistently mark it clean after each segment upload,
Expand Down
23 changes: 19 additions & 4 deletions src/v/archival/scrubber.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ scrubber::scrubber(
cloud_storage::remote& remote,
features::feature_table& feature_table,
config::binding<bool> config_enabled,
config::binding<std::chrono::milliseconds> interval,
config::binding<std::chrono::milliseconds> partial_interval,
config::binding<std::chrono::milliseconds> full_interval,
config::binding<std::chrono::milliseconds> jitter)
: _root_rtc(_as)
, _logger(archival_log, _root_rtc, archiver.get_ntp().path())
Expand All @@ -30,8 +31,20 @@ scrubber::scrubber(
, _feature_table(feature_table)
, _detector{_archiver.get_bucket_name(), _archiver.get_ntp(), _archiver.get_revision_id(), _remote, _logger, _as}
, _scheduler(
[this] { return _archiver.manifest().last_partition_scrub(); },
std::move(interval),
[this] {
const auto at = _archiver.manifest().last_partition_scrub();
const auto offset = _archiver.manifest().last_scrubbed_offset();
cloud_storage::scrub_status status;
if (!offset && at != model::timestamp::missing()) {
status = cloud_storage::scrub_status::full;
} else {
status = cloud_storage::scrub_status::partial;
VladLazar marked this conversation as resolved.
Show resolved Hide resolved
}

return std::make_tuple(at, status);
},
std::move(partial_interval),
std::move(full_interval),
std::move(jitter)) {
ssx::spawn_with_gate(_gate, [this] { return await_feature_enabled(); });
}
Expand Down Expand Up @@ -159,7 +172,7 @@ void scrubber::release() {
}

// Stop the scrubber: log the current gate count, request an abort on the
// scrubber's abort source (_as) and return the future produced by closing
// _gate.
//
// NOTE(review): this text comes from a diff view, so both the removed log
// line (via archival_log) and its replacement (via _logger) appear below.
// Presumably only the _logger variant exists in the merged file — confirm.
ss::future<> scrubber::stop() {
vlog(archival_log.info, "Stopping scrubber ({})...", _gate.get_count());
vlog(_logger.info, "Stopping scrubber ({})...", _gate.get_count());
_as.request_abort();
return _gate.close();
}
Expand Down Expand Up @@ -198,4 +211,6 @@ std::pair<bool, std::optional<ss::sstring>> scrubber::should_skip() const {
return {false, std::nullopt};
}

// Ask the scheduler to pick a fresh time-point for the next scrub.
void scrubber::reset_scheduler() {
    _scheduler.pick_next_scrub_time();
}

} // namespace archival
6 changes: 4 additions & 2 deletions src/v/archival/scrubber.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class scrubber : public housekeeping_job {
cloud_storage::remote& remote,
features::feature_table& feature_table,
config::binding<bool> config_enabled,
config::binding<std::chrono::milliseconds> interval,
config::binding<std::chrono::milliseconds> partial_interval,
config::binding<std::chrono::milliseconds> full_interval,
config::binding<std::chrono::milliseconds> jitter);

ss::future<> await_feature_enabled();
Expand All @@ -68,7 +69,8 @@ class scrubber : public housekeeping_job {

std::pair<bool, std::optional<ss::sstring>> should_skip() const;

model::timestamp next_scrub_at() const;
// Reset the scheduler and pick a new time-point for the next scrub
void reset_scheduler();

private:
ss::abort_source _as;
Expand Down
55 changes: 39 additions & 16 deletions src/v/archival/scrubber_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#pragma once

#include "cloud_storage/types.h"
#include "config/property.h"
#include "random/simple_time_jitter.h"

Expand Down Expand Up @@ -37,25 +38,41 @@ namespace archival {
template<typename Clock = std::chrono::system_clock>
class scrubber_scheduler {
public:
using last_scrub_type = std::function<model::timestamp()>;
using last_scrub_type = std::function<
std::tuple<model::timestamp, cloud_storage::scrub_status>()>;

// Construct a scheduler from a callback that reports the last scrub and from
// config bindings for the scrub intervals and the jitter.
//
// Each binding is moved into a member, the jittery timers are built from the
// bindings' current values, and a watch callback is registered on every
// binding. Each callback rebuilds the affected timer(s) from the current
// binding values and then calls pick_next_scrub_time().
//
// NOTE(review): this text comes from a diff view, so removed lines (the
// single `interval` / `_interval` / `_jittery_timer` variant) are interleaved
// with the added lines (separate partial and full scrub intervals and
// timers). Presumably only the partial/full variant exists in the merged
// file — confirm against the repository.
scrubber_scheduler(
last_scrub_type get_last_scrub_time,
config::binding<std::chrono::milliseconds> interval,
config::binding<std::chrono::milliseconds> partial_scrub_interval,
config::binding<std::chrono::milliseconds> full_scrub_interval,
config::binding<std::chrono::milliseconds> jitter)
: _get_last_scrub_time(std::move(get_last_scrub_time))
, _interval(std::move(interval))
, _partial_scrub_interval(std::move(partial_scrub_interval))
, _full_scrub_interval(std::move(full_scrub_interval))
, _jitter(std::move(jitter))
, _jittery_timer(_interval(), _jitter()) {
_interval.watch([this]() {
_jittery_timer = simple_time_jitter<model::timestamp_clock>(
_interval(), _jitter());
, _jittery_partial_scrub_timer(_partial_scrub_interval(), _jitter())
, _jittery_full_scrub_timer(_full_scrub_interval(), _jitter()) {
// A change of the partial scrub interval rebuilds only the partial timer.
_partial_scrub_interval.watch([this]() {
_jittery_partial_scrub_timer
= simple_time_jitter<model::timestamp_clock>(
_partial_scrub_interval(), _jitter());
pick_next_scrub_time();
});

// A change of the full scrub interval rebuilds only the full timer.
_full_scrub_interval.watch([this]() {
_jittery_full_scrub_timer
= simple_time_jitter<model::timestamp_clock>(
_full_scrub_interval(), _jitter());
pick_next_scrub_time();
});

// A change of the jitter rebuilds the timers, since each one is constructed
// from the jitter value.
_jitter.watch([this]() {
_jittery_timer = simple_time_jitter<model::timestamp_clock>(
_interval(), _jitter());
_jittery_partial_scrub_timer
= simple_time_jitter<model::timestamp_clock>(
_partial_scrub_interval(), _jitter());
_jittery_full_scrub_timer
= simple_time_jitter<model::timestamp_clock>(
_full_scrub_interval(), _jitter());
pick_next_scrub_time();
});
}
Expand All @@ -70,18 +87,22 @@ class scrubber_scheduler {
}

// Recompute _next_scrub_at from the last scrub reported by the
// _get_last_scrub_time callback (a timestamp and a scrub status).
//
// The timer is chosen by the reported status: the full-scrub timer when the
// status is scrub_status::full, the partial-scrub timer otherwise. If the
// timestamp is model::timestamp::missing(), the next scrub is placed at
// Clock::now() plus the timer's next_jitter_duration(); otherwise it is placed
// at the last scrub timestamp plus the timer's next_duration() in
// milliseconds.
//
// NOTE(review): this text comes from a diff view, so removed lines (using
// `last_scrub_time` and `_jittery_timer`) are interleaved with the added
// lines (using `at`, `status` and `timer`). Presumably only the latter exist
// in the merged file — confirm against the repository.
void pick_next_scrub_time() {
const auto last_scrub_time = _get_last_scrub_time();
const auto first_scrub = last_scrub_time == model::timestamp::missing();
const auto [at, status] = _get_last_scrub_time();
const auto first_scrub = at == model::timestamp::missing();

// Pick the timer that matches the status of the last scrub.
auto& timer = status == cloud_storage::scrub_status::full
? _jittery_full_scrub_timer
: _jittery_partial_scrub_timer;

if (first_scrub) {
// No previous scrub recorded: schedule relative to the current time.
const auto now = Clock::now();
_next_scrub_at = model::to_timestamp(
now + _jittery_timer.next_jitter_duration());
now + timer.next_jitter_duration());
} else {
// A previous scrub exists: schedule relative to its timestamp.
_next_scrub_at = model::timestamp{
last_scrub_time()
at()
+ std::chrono::duration_cast<std::chrono::milliseconds>(
_jittery_timer.next_duration())
timer.next_duration())
.count()};
}
}
Expand All @@ -101,10 +122,12 @@ class scrubber_scheduler {

private:
last_scrub_type _get_last_scrub_time;
config::binding<std::chrono::milliseconds> _interval;
config::binding<std::chrono::milliseconds> _partial_scrub_interval;
config::binding<std::chrono::milliseconds> _full_scrub_interval;
config::binding<std::chrono::milliseconds> _jitter;

simple_time_jitter<model::timestamp_clock> _jittery_timer;
simple_time_jitter<model::timestamp_clock> _jittery_partial_scrub_timer;
simple_time_jitter<model::timestamp_clock> _jittery_full_scrub_timer;
model::timestamp _next_scrub_at;
};

Expand Down
Loading
Loading