Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v24.1.x] cst/cache: tracker based trim #21502

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 108 additions & 49 deletions src/v/cloud_storage/access_time_tracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@

#include <seastar/core/coroutine.hh>
#include <seastar/core/smp.hh>
#include <seastar/util/defer.hh>

#include <absl/container/btree_map.h>
#include <absl/container/btree_set.h>

#include <exception>
#include <ranges>
#include <variant>

namespace absl {
Expand Down Expand Up @@ -51,20 +53,22 @@ namespace cloud_storage {
// is defined by hand in the read/write methods so that it can be done
// with streaming.
struct table_header
: serde::envelope<table_header, serde::version<0>, serde::compat_version<0>> {
: serde::envelope<table_header, serde::version<1>, serde::compat_version<0>> {
size_t table_size{0};
tracker_version version{tracker_version::v1};

auto serde_fields() { return std::tie(table_size); }
auto serde_fields() { return std::tie(table_size, version); }
};

ss::future<> access_time_tracker::write(ss::output_stream<char>& out) {
ss::future<> access_time_tracker::write(
ss::output_stream<char>& out, tracker_version version) {
// This lock protects us from the _table being mutated while we
// are iterating over it and yielding during the loop.
auto lock_guard = co_await ss::get_units(_table_lock, 1);

_dirty = false;

const table_header h{.table_size = _table.size()};
const table_header h{.table_size = _table.size(), .version = version};
iobuf header_buf;
serde::write(header_buf, h);
co_await write_iobuf_to_output_stream(std::move(header_buf), out);
Expand All @@ -74,11 +78,15 @@ ss::future<> access_time_tracker::write(ss::output_stream<char>& out) {

size_t i = 0;
iobuf serialize_buf;
for (auto it : _table) {
serde::write(serialize_buf, it.first);
serde::write(serialize_buf, it.second);
for (const auto& [path, metadata] : _table) {
serde::write(serialize_buf, path);
serde::write(serialize_buf, metadata.atime_sec);
serde::write(serialize_buf, metadata.size);
++i;
if (i % chunk_count == 0 || i == _table.size()) {
iobuf chunk_size;
serde::write(chunk_size, serialize_buf.size_bytes());
co_await write_iobuf_to_output_stream(std::move(chunk_size), out);
for (const auto& f : serialize_buf) {
co_await out.write(f.get(), f.size());
}
Expand All @@ -91,7 +99,9 @@ ss::future<> access_time_tracker::write(ss::output_stream<char>& out) {
}

bool access_time_tracker::should_track(std::string_view key) const {
if (key.ends_with(".tx") || key.ends_with(".index")) {
if (
key.ends_with(".tx") || key.ends_with(".index")
|| key.ends_with(cache_tmp_file_extension)) {
return false;
}

Expand Down Expand Up @@ -139,64 +149,81 @@ ss::future<> access_time_tracker::read(ss::input_stream<char>& in) {
auto tmp = co_await in.read_exactly(header_size);
header_buf.append(tmp.get(), tmp.size());
auto h_parser = iobuf_parser(std::move(header_buf));
table_header h = serde::read_nested<table_header>(h_parser, 0);
auto h = serde::read_nested<table_header>(h_parser, 0);

// How many items to consume per stream read()
constexpr size_t chunk_count = 2048;
auto defer = ss::defer([&] {
lock_guard.return_all();
// Drop writes accumulated while reading
_pending_upserts.clear();
});

// Skip loading data for older version
if (h.version == tracker_version::v1) {
co_return;
}

for (size_t i = 0; i < h.table_size; i += chunk_count) {
auto item_count = std::min(chunk_count, h.table_size - i);
auto tmp_buf = co_await in.read_exactly(item_count * table_item_size);
while (!in.eof()) {
auto chunk_sz_buf = co_await in.read_exactly(sizeof(size_t));
if (chunk_sz_buf.empty() && in.eof()) {
break;
}

iobuf chunk_sz;
chunk_sz.append(std::move(chunk_sz_buf));
auto chunk_sz_parser = iobuf_parser{std::move(chunk_sz)};
auto chunk_size = serde::read<size_t>(chunk_sz_parser);
auto tmp_buf = co_await in.read_exactly(chunk_size);
iobuf items_buf;
items_buf.append(std::move(tmp_buf));
auto parser = iobuf_parser(std::move(items_buf));
for (size_t j = 0; j < item_count; ++j) {
uint32_t hash = serde::read_nested<uint32_t>(parser, 0);
timestamp_t t = serde::read_nested<timestamp_t>(parser, 0);
_table.emplace(hash, t);
while (parser.bytes_left() > 0) {
auto path = serde::read_nested<ss::sstring>(parser, 0);
auto atime = serde::read_nested<uint32_t>(parser, 0);
auto size = serde::read_nested<uint64_t>(parser, 0);
_table.emplace(
path, file_metadata{.atime_sec = atime, .size = size});
}
}

lock_guard.return_all();
// Any writes while we were reading are dropped
_pending_upserts.clear();
vassert(
_table.size() == h.table_size,
"unexpected tracker size, loaded {} items, expected {} items",
_table.size(),
h.table_size);
}

void access_time_tracker::add_timestamp(
std::string_view key, std::chrono::system_clock::time_point ts) {
if (!should_track(key)) {
void access_time_tracker::add(
ss::sstring path, std::chrono::system_clock::time_point atime, size_t size) {
if (!should_track(path)) {
return;
}

uint32_t seconds = std::chrono::time_point_cast<std::chrono::seconds>(ts)
uint32_t seconds = std::chrono::time_point_cast<std::chrono::seconds>(atime)
.time_since_epoch()
.count();

uint32_t hash = xxhash_32(key.data(), key.size());

auto units = seastar::try_get_units(_table_lock, 1);
if (units.has_value()) {
// Got lock, update main table
_table[hash] = seconds;
_table[path] = {.atime_sec = seconds, .size = size};
_dirty = true;
} else {
// Locked during serialization, defer write
_pending_upserts[hash] = seconds;
_pending_upserts[path] = {.atime_sec = seconds, .size = size};
}
}

void access_time_tracker::remove_timestamp(std::string_view key) noexcept {
void access_time_tracker::remove(std::string_view key) noexcept {
try {
uint32_t hash = xxhash_32(key.data(), key.size());

ss::sstring k{key.data(), key.size()};
auto units = seastar::try_get_units(_table_lock, 1);
if (units.has_value()) {
// Unlocked, update main table
_table.erase(hash);
_table.erase(k);
_dirty = true;
} else {
// Locked during serialization, defer write
_pending_upserts[hash] = std::nullopt;
_pending_upserts[k] = std::nullopt;
}
} catch (...) {
vassert(
Expand All @@ -207,22 +234,42 @@ void access_time_tracker::remove_timestamp(std::string_view key) noexcept {
}
}

ss::future<>
access_time_tracker::trim(const fragmented_vector<file_list_item>& existent) {
absl::btree_set<uint32_t> existent_hashes;
ss::future<> access_time_tracker::sync(
const fragmented_vector<file_list_item>& existent,
add_entries_t add_entries) {
absl::btree_set<ss::sstring> paths;
for (const auto& i : existent) {
existent_hashes.insert(xxhash_32(i.path.data(), i.path.size()));
paths.insert(i.path);
}

auto lock_guard = co_await ss::get_units(_table_lock, 1);

table_t tmp;
for (auto it : _table) {
if (existent_hashes.contains(it.first)) {

for (const auto& it : _table) {
if (paths.contains(it.first)) {
tmp.insert(it);
}
co_await ss::maybe_yield();
}

if (add_entries) {
auto should_add = [this, &tmp](const auto& e) {
return should_track(e.path) && !tmp.contains(e.path);
};
for (const auto& entry : existent | std::views::filter(should_add)) {
_dirty = true;
tmp.insert(
{entry.path,
{static_cast<uint32_t>(
std::chrono::time_point_cast<std::chrono::seconds>(
entry.access_time)
.time_since_epoch()
.count()),
entry.size}});
}
}

if (_table.size() != tmp.size()) {
// We dropped one or more entries, therefore mutated the table.
_dirty = true;
Expand All @@ -233,18 +280,30 @@ access_time_tracker::trim(const fragmented_vector<file_list_item>& existent) {
on_released_table_lock();
}

std::optional<std::chrono::system_clock::time_point>
access_time_tracker::estimate_timestamp(std::string_view key) const {
uint32_t hash = xxhash_32(key.data(), key.size());
auto it = _table.find(hash);
if (it == _table.end()) {
return std::nullopt;
std::optional<file_metadata>
access_time_tracker::get(const std::string& key) const {
if (auto it = _table.find(key); it != _table.end()) {
return it->second;
}
auto seconds = std::chrono::seconds(it->second);
std::chrono::system_clock::time_point ts(seconds);
return ts;
return std::nullopt;
}

bool access_time_tracker::is_dirty() const {
    // Set by add()/remove()/sync() whenever the table mutates; cleared at
    // the start of write(), so "dirty" means unserialized changes exist.
    return _dirty;
}

// Returns a snapshot of every tracked entry ordered least-recently-used
// first (ascending access time).
fragmented_vector<file_list_item> access_time_tracker::lru_entries() const {
    fragmented_vector<file_list_item> entries;
    entries.reserve(_table.size());
    // Flatten the map into file_list_items carrying atime, path and size.
    for (const auto& [file_path, meta] : _table) {
        entries.emplace_back(meta.time_point(), file_path, meta.size);
    }
    // Oldest access time first; comparator form is equivalent to sorting
    // with an access_time projection under std::less.
    std::ranges::sort(entries, [](const auto& lhs, const auto& rhs) {
        return lhs.access_time < rhs.access_time;
    });
    return entries;
}

// Widens the stored 32-bit epoch-seconds value back into a full
// system_clock::time_point.
std::chrono::system_clock::time_point file_metadata::time_point() const {
    const std::chrono::seconds since_epoch{atime_sec};
    return std::chrono::system_clock::time_point{since_epoch};
}

} // namespace cloud_storage
65 changes: 34 additions & 31 deletions src/v/cloud_storage/access_time_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,50 +27,53 @@

namespace cloud_storage {

/// Access time tracker maintains map from filename hash to
/// the timestamp that represents the time when the file was
/// accessed last.
///
/// It is possible to have conflicts. In case of conflict
/// 'add_timestamp' method will overwrite another key. For that
/// key we will observe larger access time. When one of the
/// conflicted entries will be deleted another will be deleted
/// as well. This is OK because the code in the
/// 'cloud_storage/cache_service' is ready for that.
// Serialization format version for the on-disk tracker state. read()
// skips loading payload data written by the legacy v1 format; v2 stores
// full file paths with their metadata (atime + size).
enum class tracker_version : uint8_t { v1, v2 };

// Per-file record kept by the access time tracker.
struct file_metadata {
    // Last access time, in seconds since the system_clock epoch
    // (as computed by access_time_tracker::add()).
    uint32_t atime_sec;
    // Size of the cached file — presumably bytes; confirm against the
    // callers that pass it into access_time_tracker::add().
    uint64_t size;
    // Reconstructs atime_sec as a system_clock::time_point.
    std::chrono::system_clock::time_point time_point() const;
};

/// Access time tracker maps cache entry file paths to their last accessed
/// timestamp and file size.
class access_time_tracker {
using timestamp_t = uint32_t;
using table_t = absl::btree_map<uint32_t, timestamp_t>;

// Serialized size of each pair in table_t
static constexpr size_t table_item_size = 8;
using table_t = absl::btree_map<ss::sstring, file_metadata>;

public:
/// Add access time to the container.
void add_timestamp(
std::string_view key, std::chrono::system_clock::time_point ts);
/// Add metadata to the container.
void add(
ss::sstring path,
std::chrono::system_clock::time_point atime,
size_t size);

/// Remove key from the container.
void remove_timestamp(std::string_view) noexcept;
void remove(std::string_view) noexcept;

/// Return access time estimate (it can differ if there is a conflict
/// on file name hash).
std::optional<std::chrono::system_clock::time_point>
estimate_timestamp(std::string_view key) const;
/// Return file metadata for key.
std::optional<file_metadata> get(const std::string& key) const;

ss::future<> write(ss::output_stream<char>&);
ss::future<> write(
ss::output_stream<char>&, tracker_version version = tracker_version::v2);
ss::future<> read(ss::input_stream<char>&);

/// Returns true if tracker has new data which wasn't serialized
/// to disk.
bool is_dirty() const;

/// Remove every key which isn't present in list of existing files
ss::future<> trim(const fragmented_vector<file_list_item>&);
using add_entries_t = ss::bool_class<struct trim_additive_tag>;
/// Remove every key which isn't present in list of input files
ss::future<> sync(
const fragmented_vector<file_list_item>&,
add_entries_t add_entries = add_entries_t::no);

size_t size() const { return _table.size(); }

fragmented_vector<file_list_item> lru_entries() const;

private:
/// Returns true if the key's access time should be tracked.
/// Returns true if the key's metadata should be tracked.
/// We do not wish to track index files and transaction manifests
/// as they are just an appendage to segment/chunk files and are
/// purged along with them.
Expand All @@ -79,17 +82,17 @@ class access_time_tracker {
/// Drain _pending_upserts for any writes made while table lock was held
void on_released_table_lock();

absl::btree_map<uint32_t, timestamp_t> _table;
table_t _table;

// Lock taken during async loops over the table (ser/de and trim())
// modifications may proceed without the lock if it is not taken.
// When releasing lock, drain _pending_upserts.
ss::semaphore _table_lock{1};

// Calls into add_timestamp/remove_timestamp populate this
// if the _serialization_lock is unavailable. The serialization code is
// responsible for draining it upon releasing the lock.
absl::btree_map<uint32_t, std::optional<timestamp_t>> _pending_upserts;
// Calls into add/remove populate this if the _serialization_lock is
// unavailable. The serialization code is responsible for draining it upon
// releasing the lock.
absl::btree_map<ss::sstring, std::optional<file_metadata>> _pending_upserts;

bool _dirty{false};
};
Expand Down
Loading
Loading