Skip to content

Commit

Permalink
Merge pull request #21502 from vbotbuildovich/backport-pr-18786-v24.1…
Browse files Browse the repository at this point in the history
….x-370

[v24.1.x] cst/cache: tracker based trim
  • Loading branch information
piyushredpanda authored Jul 18, 2024
2 parents 94f5424 + 11b5fef commit bae012b
Show file tree
Hide file tree
Showing 13 changed files with 597 additions and 193 deletions.
157 changes: 108 additions & 49 deletions src/v/cloud_storage/access_time_tracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@

#include <seastar/core/coroutine.hh>
#include <seastar/core/smp.hh>
#include <seastar/util/defer.hh>

#include <absl/container/btree_map.h>
#include <absl/container/btree_set.h>

#include <exception>
#include <ranges>
#include <variant>

namespace absl {
Expand Down Expand Up @@ -51,20 +53,22 @@ namespace cloud_storage {
// is defined by hand in the read/write methods so that it can be done
// with streaming.
struct table_header
: serde::envelope<table_header, serde::version<0>, serde::compat_version<0>> {
: serde::envelope<table_header, serde::version<1>, serde::compat_version<0>> {
// Number of entries in the tracker table at the time the header was built;
// read() asserts that the number of loaded items matches this value.
size_t table_size{0};
// Serialization format of the items that follow the header. Defaults to v1;
// read() skips loading table data when this is v1.
// NOTE(review): presumably a header written before this field existed
// deserializes with the v1 default -- confirm against serde envelope rules.
tracker_version version{tracker_version::v1};

auto serde_fields() { return std::tie(table_size); }
auto serde_fields() { return std::tie(table_size, version); }
};

/// Serializes the tracker to `out`: first a table_header (entry count and
/// format version), then the table entries in chunks. The table lock is held
/// for the duration, so concurrent add/remove calls are deferred into
/// _pending_upserts instead of touching _table.
ss::future<> access_time_tracker::write(ss::output_stream<char>& out) {
ss::future<> access_time_tracker::write(
ss::output_stream<char>& out, tracker_version version) {
// This lock protects us from the _table being mutated while we
// are iterating over it and yielding during the loop.
auto lock_guard = co_await ss::get_units(_table_lock, 1);

// The dirty flag is cleared up front, before any data has been written.
_dirty = false;

const table_header h{.table_size = _table.size()};
const table_header h{.table_size = _table.size(), .version = version};
iobuf header_buf;
serde::write(header_buf, h);
co_await write_iobuf_to_output_stream(std::move(header_buf), out);
Expand All @@ -74,11 +78,15 @@ ss::future<> access_time_tracker::write(ss::output_stream<char>& out) {

// NOTE(review): chunk_count is declared in lines hidden by the hunk marker
// above; it is the number of items accumulated before a chunk is flushed.
size_t i = 0;
iobuf serialize_buf;
for (auto it : _table) {
serde::write(serialize_buf, it.first);
serde::write(serialize_buf, it.second);
for (const auto& [path, metadata] : _table) {
// Each item is written as: path, access time in seconds, size.
serde::write(serialize_buf, path);
serde::write(serialize_buf, metadata.atime_sec);
serde::write(serialize_buf, metadata.size);
++i;
// Flush on every full chunk and on the final (possibly partial) one.
if (i % chunk_count == 0 || i == _table.size()) {
// Every chunk is prefixed with its serialized byte size so that the
// reader knows how many bytes to consume for it.
iobuf chunk_size;
serde::write(chunk_size, serialize_buf.size_bytes());
co_await write_iobuf_to_output_stream(std::move(chunk_size), out);
for (const auto& f : serialize_buf) {
co_await out.write(f.get(), f.size());
}
Expand All @@ -91,7 +99,9 @@ ss::future<> access_time_tracker::write(ss::output_stream<char>& out) {
}

bool access_time_tracker::should_track(std::string_view key) const {
if (key.ends_with(".tx") || key.ends_with(".index")) {
if (
key.ends_with(".tx") || key.ends_with(".index")
|| key.ends_with(cache_tmp_file_extension)) {
return false;
}

Expand Down Expand Up @@ -139,64 +149,81 @@ ss::future<> access_time_tracker::read(ss::input_stream<char>& in) {
auto tmp = co_await in.read_exactly(header_size);
header_buf.append(tmp.get(), tmp.size());
auto h_parser = iobuf_parser(std::move(header_buf));
table_header h = serde::read_nested<table_header>(h_parser, 0);
auto h = serde::read_nested<table_header>(h_parser, 0);

// How many items to consume per stream read()
constexpr size_t chunk_count = 2048;
auto defer = ss::defer([&] {
lock_guard.return_all();
// Drop writes accumulated while reading
_pending_upserts.clear();
});

// Skip loading data for older version
if (h.version == tracker_version::v1) {
co_return;
}

for (size_t i = 0; i < h.table_size; i += chunk_count) {
auto item_count = std::min(chunk_count, h.table_size - i);
auto tmp_buf = co_await in.read_exactly(item_count * table_item_size);
while (!in.eof()) {
auto chunk_sz_buf = co_await in.read_exactly(sizeof(size_t));
if (chunk_sz_buf.empty() && in.eof()) {
break;
}

iobuf chunk_sz;
chunk_sz.append(std::move(chunk_sz_buf));
auto chunk_sz_parser = iobuf_parser{std::move(chunk_sz)};
auto chunk_size = serde::read<size_t>(chunk_sz_parser);
auto tmp_buf = co_await in.read_exactly(chunk_size);
iobuf items_buf;
items_buf.append(std::move(tmp_buf));
auto parser = iobuf_parser(std::move(items_buf));
for (size_t j = 0; j < item_count; ++j) {
uint32_t hash = serde::read_nested<uint32_t>(parser, 0);
timestamp_t t = serde::read_nested<timestamp_t>(parser, 0);
_table.emplace(hash, t);
while (parser.bytes_left() > 0) {
auto path = serde::read_nested<ss::sstring>(parser, 0);
auto atime = serde::read_nested<uint32_t>(parser, 0);
auto size = serde::read_nested<uint64_t>(parser, 0);
_table.emplace(
path, file_metadata{.atime_sec = atime, .size = size});
}
}

lock_guard.return_all();
// Any writes while we were reading are dropped
_pending_upserts.clear();
vassert(
_table.size() == h.table_size,
"unexpected tracker size, loaded {} items, expected {} items",
_table.size(),
h.table_size);
}

/// Records the access time (and, in the newer signature, the size) of an
/// entry. Keys rejected by should_track() are ignored. If the table lock is
/// currently held, the update is parked in _pending_upserts instead of being
/// applied to _table.
void access_time_tracker::add_timestamp(
std::string_view key, std::chrono::system_clock::time_point ts) {
if (!should_track(key)) {
void access_time_tracker::add(
ss::sstring path, std::chrono::system_clock::time_point atime, size_t size) {
if (!should_track(path)) {
return;
}

// Access time is kept as whole seconds since the clock epoch; the count is
// narrowed to 32 bits when stored.
uint32_t seconds = std::chrono::time_point_cast<std::chrono::seconds>(ts)
uint32_t seconds = std::chrono::time_point_cast<std::chrono::seconds>(atime)
.time_since_epoch()
.count();

uint32_t hash = xxhash_32(key.data(), key.size());

// Non-blocking attempt: this function is synchronous and cannot wait for
// the lock.
auto units = seastar::try_get_units(_table_lock, 1);
if (units.has_value()) {
// Got lock, update main table
_table[hash] = seconds;
_table[path] = {.atime_sec = seconds, .size = size};
_dirty = true;
} else {
// Locked during serialization, defer write
_pending_upserts[hash] = seconds;
_pending_upserts[path] = {.atime_sec = seconds, .size = size};
}
}

/// Removes `key` from the tracker. If the table lock is currently held, the
/// removal is recorded in _pending_upserts as an empty optional instead of
/// erasing from _table directly. Declared noexcept: any exception thrown
/// inside is caught and routed to the vassert in the catch block.
void access_time_tracker::remove_timestamp(std::string_view key) noexcept {
void access_time_tracker::remove(std::string_view key) noexcept {
try {
uint32_t hash = xxhash_32(key.data(), key.size());

// The containers are keyed by ss::sstring, so build an owning key.
ss::sstring k{key.data(), key.size()};
auto units = seastar::try_get_units(_table_lock, 1);
if (units.has_value()) {
// Unlocked, update main table
_table.erase(hash);
_table.erase(k);
_dirty = true;
} else {
// Locked during serialization, defer write
// (std::nullopt marks the key as pending removal)
_pending_upserts[hash] = std::nullopt;
_pending_upserts[k] = std::nullopt;
}
} catch (...) {
vassert(
Expand All @@ -207,22 +234,42 @@ void access_time_tracker::remove_timestamp(std::string_view key) noexcept {
}
}

/// Reconciles the table with the list of files in `existent`: entries whose
/// key is not in the list are dropped, and (newer signature) when
/// `add_entries` is set, listed files that pass should_track() and are not
/// yet tracked are inserted with the access time and size from the list.
/// The table lock is held while the replacement table is built.
ss::future<>
access_time_tracker::trim(const fragmented_vector<file_list_item>& existent) {
absl::btree_set<uint32_t> existent_hashes;
ss::future<> access_time_tracker::sync(
const fragmented_vector<file_list_item>& existent,
add_entries_t add_entries) {
// Build the set of listed keys before taking the lock.
absl::btree_set<ss::sstring> paths;
for (const auto& i : existent) {
existent_hashes.insert(xxhash_32(i.path.data(), i.path.size()));
paths.insert(i.path);
}

auto lock_guard = co_await ss::get_units(_table_lock, 1);

// Surviving entries are copied into a fresh table rather than erased in
// place; the loop yields between items.
table_t tmp;
for (auto it : _table) {
if (existent_hashes.contains(it.first)) {

for (const auto& it : _table) {
if (paths.contains(it.first)) {
tmp.insert(it);
}
co_await ss::maybe_yield();
}

if (add_entries) {
// Only add trackable files that did not survive from the old table.
auto should_add = [this, &tmp](const auto& e) {
return should_track(e.path) && !tmp.contains(e.path);
};
for (const auto& entry : existent | std::views::filter(should_add)) {
// Any addition mutates the table, so mark it dirty here; the size
// comparison below would miss the case of equal drops and additions.
_dirty = true;
tmp.insert(
{entry.path,
{static_cast<uint32_t>(
std::chrono::time_point_cast<std::chrono::seconds>(
entry.access_time)
.time_since_epoch()
.count()),
entry.size}});
}
}

if (_table.size() != tmp.size()) {
// We dropped one or more entries, therefore mutated the table.
_dirty = true;
Expand All @@ -233,18 +280,30 @@ access_time_tracker::trim(const fragmented_vector<file_list_item>& existent) {
on_released_table_lock();
}

/// Looks `key` up in the table. The older variant hashes the key and returns
/// the stored seconds converted to a time point; the newer variant returns a
/// copy of the stored file_metadata. Both return std::nullopt when the key is
/// not present. Only _table is consulted, not _pending_upserts.
std::optional<std::chrono::system_clock::time_point>
access_time_tracker::estimate_timestamp(std::string_view key) const {
uint32_t hash = xxhash_32(key.data(), key.size());
auto it = _table.find(hash);
if (it == _table.end()) {
return std::nullopt;
std::optional<file_metadata>
access_time_tracker::get(const std::string& key) const {
if (auto it = _table.find(key); it != _table.end()) {
return it->second;
}
auto seconds = std::chrono::seconds(it->second);
std::chrono::system_clock::time_point ts(seconds);
return ts;
return std::nullopt;
}

/// Reports the current value of the dirty flag, which is set when the table
/// is mutated and cleared by write().
bool access_time_tracker::is_dirty() const {
    return _dirty;
}

/// Produces one file_list_item per tracked entry (access time, path, size)
/// and returns the list sorted by access time in ascending order, so the
/// least recently accessed entries come first.
fragmented_vector<file_list_item> access_time_tracker::lru_entries() const {
    fragmented_vector<file_list_item> result;
    result.reserve(_table.size());

    for (auto it = _table.begin(); it != _table.end(); ++it) {
        const auto& meta = it->second;
        result.emplace_back(meta.time_point(), it->first, meta.size);
    }

    // Sort key: the access time of each item, compared with the default
    // (ascending) ordering.
    auto by_access_time = [](const auto& entry) { return entry.access_time; };
    std::ranges::sort(result, {}, by_access_time);
    return result;
}

/// Converts the stored access time, kept as whole seconds since the
/// system_clock epoch, into a system_clock time point.
std::chrono::system_clock::time_point file_metadata::time_point() const {
    const std::chrono::seconds since_epoch{atime_sec};
    return std::chrono::system_clock::time_point{since_epoch};
}

} // namespace cloud_storage
65 changes: 34 additions & 31 deletions src/v/cloud_storage/access_time_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,50 +27,53 @@

namespace cloud_storage {

/// Access time tracker maintains a map from filename hash to
/// the timestamp that represents the time when the file was
/// last accessed.
///
/// Hash conflicts are possible. In case of a conflict the
/// 'add_timestamp' method will overwrite the other key's entry, so for
/// that key we will observe a larger access time. When one of the
/// conflicting entries is deleted, the other one is deleted
/// as well. This is OK because the code in
/// 'cloud_storage/cache_service' is ready for that.
/// Serialization format version of the access time tracker. The value is
/// carried in the serialized table header; read() does not load table data
/// from a v1 file.
enum class tracker_version : uint8_t {
    v1,
    v2,
};

/// Per-entry metadata kept by the access time tracker.
///
/// The members carry default initializers so that a default-initialized
/// instance holds zeros rather than indeterminate values. The type remains an
/// aggregate, so positional and designated initialization keep working.
struct file_metadata {
    /// Last access time, in whole seconds since the system_clock epoch.
    uint32_t atime_sec{0};
    /// File size as supplied by the caller that added the entry.
    uint64_t size{0};
    /// Returns atime_sec converted to a system_clock time point.
    std::chrono::system_clock::time_point time_point() const;
};

/// Access time tracker maps cache entry file paths to their last accessed
/// timestamp and file size.
class access_time_tracker {
using timestamp_t = uint32_t;
using table_t = absl::btree_map<uint32_t, timestamp_t>;

// Serialized size of each pair in table_t
static constexpr size_t table_item_size = 8;
using table_t = absl::btree_map<ss::sstring, file_metadata>;

public:
/// Add access time to the container.
void add_timestamp(
std::string_view key, std::chrono::system_clock::time_point ts);
/// Add metadata to the container. Keys rejected by should_track() are
/// ignored. If the table lock is held, the update is deferred.
void add(
ss::sstring path,
std::chrono::system_clock::time_point atime,
size_t size);

/// Remove key from the container. If the table lock is held, the removal
/// is deferred.
void remove_timestamp(std::string_view) noexcept;
void remove(std::string_view) noexcept;

/// Return access time estimate (it can differ if there is a conflict
/// on file name hash).
std::optional<std::chrono::system_clock::time_point>
estimate_timestamp(std::string_view key) const;
/// Return file metadata for key, or std::nullopt if the key is not tracked.
std::optional<file_metadata> get(const std::string& key) const;

/// Serialize the table to the stream: a header followed by the entries.
/// The version argument is stored in the header.
ss::future<> write(ss::output_stream<char>&);
ss::future<> write(
ss::output_stream<char>&, tracker_version version = tracker_version::v2);
/// Load the table from the stream.
ss::future<> read(ss::input_stream<char>&);

/// Returns true if tracker has new data which wasn't serialized
/// to disk.
bool is_dirty() const;

/// Remove every key which isn't present in list of existing files
ss::future<> trim(const fragmented_vector<file_list_item>&);
using add_entries_t = ss::bool_class<struct trim_additive_tag>;
/// Remove every key which isn't present in list of input files. When
/// add_entries is set, listed files that should be tracked and are not
/// yet in the table are added with the access time and size from the list.
ss::future<> sync(
const fragmented_vector<file_list_item>&,
add_entries_t add_entries = add_entries_t::no);

/// Number of entries currently in the table.
size_t size() const { return _table.size(); }

/// Returns all tracked entries sorted by access time, oldest first.
fragmented_vector<file_list_item> lru_entries() const;

private:
/// Returns true if the key's access time should be tracked.
/// Returns true if the key's metadata should be tracked.
/// We do not wish to track index files and transaction manifests
/// as they are just an appendage to segment/chunk files and are
/// purged along with them.
Expand All @@ -79,17 +82,17 @@ class access_time_tracker {
/// Drain _pending_upserts for any writes made while table lock was held
void on_released_table_lock();

absl::btree_map<uint32_t, timestamp_t> _table;
table_t _table;

// Lock taken during async loops over the table (ser/de and trim()/sync()).
// Modifications may proceed without the lock if it is not taken.
// When releasing lock, drain _pending_upserts.
ss::semaphore _table_lock{1};

// Calls into add_timestamp/remove_timestamp populate this
// if the _table_lock is unavailable. The serialization code is
// responsible for draining it upon releasing the lock.
absl::btree_map<uint32_t, std::optional<timestamp_t>> _pending_upserts;
// Calls into add/remove populate this if the _table_lock is
// unavailable. The code holding the lock is responsible for draining it
// upon releasing the lock. An empty optional denotes a pending removal.
absl::btree_map<ss::sstring, std::optional<file_metadata>> _pending_upserts;

// Set when the table is mutated; cleared by write().
bool _dirty{false};
};
Expand Down
Loading

0 comments on commit bae012b

Please sign in to comment.