Skip to content

Commit

Permalink
cst/cache: Add file size to access time tracker
Browse files Browse the repository at this point in the history
* The access time tracker now also tracks file sizes.
* The key to the tracker map is now the full file path instead of a hash.
* The serialized form of the tracker now contains a version to distinguish
  between the old and new data structures.
* The recursive directory walker uses tracker entries to avoid file stat
  operations.
  • Loading branch information
abhijat committed Jun 12, 2024
1 parent 229c8dd commit d76659a
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 110 deletions.
116 changes: 71 additions & 45 deletions src/v/cloud_storage/access_time_tracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <seastar/core/coroutine.hh>
#include <seastar/core/smp.hh>
#include <seastar/util/defer.hh>

#include <absl/container/btree_map.h>
#include <absl/container/btree_set.h>
Expand Down Expand Up @@ -51,20 +52,22 @@ namespace cloud_storage {
// is defined by hand in the read/write methods so that it can be done
// with streaming.
struct table_header
: serde::envelope<table_header, serde::version<0>, serde::compat_version<0>> {
: serde::envelope<table_header, serde::version<1>, serde::compat_version<0>> {
// Number of entries in the tracker table when the header was written.
size_t table_size{0};
// Layout version of the serialized entries; defaults to v1.
// NOTE(review): presumably a header written without this field keeps the
// v1 default when decoded -- confirm against serde's envelope handling.
tracker_version version{tracker_version::v1};

// Fields handed to serde, in serialization order.
auto serde_fields() { return std::tie(table_size); }
auto serde_fields() { return std::tie(table_size, version); }
};

ss::future<> access_time_tracker::write(ss::output_stream<char>& out) {
ss::future<> access_time_tracker::write(
ss::output_stream<char>& out, tracker_version version) {
// This lock protects us from the _table being mutated while we
// are iterating over it and yielding during the loop.
auto lock_guard = co_await ss::get_units(_table_lock, 1);

_dirty = false;

const table_header h{.table_size = _table.size()};
const table_header h{.table_size = _table.size(), .version = version};
iobuf header_buf;
serde::write(header_buf, h);
co_await write_iobuf_to_output_stream(std::move(header_buf), out);
Expand All @@ -74,11 +77,15 @@ ss::future<> access_time_tracker::write(ss::output_stream<char>& out) {

size_t i = 0;
iobuf serialize_buf;
for (auto it : _table) {
serde::write(serialize_buf, it.first);
serde::write(serialize_buf, it.second);
for (const auto& [path, metadata] : _table) {
serde::write(serialize_buf, path);
serde::write(serialize_buf, metadata.atime_sec);
serde::write(serialize_buf, metadata.size);
++i;
if (i % chunk_count == 0 || i == _table.size()) {
iobuf chunk_size;
serde::write(chunk_size, serialize_buf.size_bytes());
co_await write_iobuf_to_output_stream(std::move(chunk_size), out);
for (const auto& f : serialize_buf) {
co_await out.write(f.get(), f.size());
}
Expand Down Expand Up @@ -139,64 +146,81 @@ ss::future<> access_time_tracker::read(ss::input_stream<char>& in) {
auto tmp = co_await in.read_exactly(header_size);
header_buf.append(tmp.get(), tmp.size());
auto h_parser = iobuf_parser(std::move(header_buf));
table_header h = serde::read_nested<table_header>(h_parser, 0);
auto h = serde::read_nested<table_header>(h_parser, 0);

// How many items to consume per stream read()
constexpr size_t chunk_count = 2048;
auto defer = ss::defer([&] {
lock_guard.return_all();
// Drop writes accumulated while reading
_pending_upserts.clear();
});

// Skip loading data for older version
if (h.version == tracker_version::v1) {
co_return;
}

for (size_t i = 0; i < h.table_size; i += chunk_count) {
auto item_count = std::min(chunk_count, h.table_size - i);
auto tmp_buf = co_await in.read_exactly(item_count * table_item_size);
while (!in.eof()) {
auto chunk_sz_buf = co_await in.read_exactly(sizeof(size_t));
if (chunk_sz_buf.empty() && in.eof()) {
break;
}

iobuf chunk_sz;
chunk_sz.append(std::move(chunk_sz_buf));
auto chunk_sz_parser = iobuf_parser{std::move(chunk_sz)};
auto chunk_size = serde::read<size_t>(chunk_sz_parser);
auto tmp_buf = co_await in.read_exactly(chunk_size);
iobuf items_buf;
items_buf.append(std::move(tmp_buf));
auto parser = iobuf_parser(std::move(items_buf));
for (size_t j = 0; j < item_count; ++j) {
uint32_t hash = serde::read_nested<uint32_t>(parser, 0);
timestamp_t t = serde::read_nested<timestamp_t>(parser, 0);
_table.emplace(hash, t);
while (parser.bytes_left() > 0) {
auto path = serde::read_nested<ss::sstring>(parser, 0);
auto atime = serde::read_nested<uint32_t>(parser, 0);
auto size = serde::read_nested<uint64_t>(parser, 0);
_table.emplace(
path, file_metadata{.atime_sec = atime, .size = size});
}
}

lock_guard.return_all();
// Any writes while we were reading are dropped
_pending_upserts.clear();
vassert(
_table.size() == h.table_size,
"unexpected tracker size, loaded {} items, expected {} items",
_table.size(),
h.table_size);
}

// Records the access time (and, in the newer variant, the file size) for a
// cache entry. Keys rejected by should_track() are ignored. If the table
// lock cannot be taken immediately the update is parked in
// _pending_upserts instead of being applied to _table.
// NOTE(review): this span is a rendered diff; the pre-change
// (add_timestamp, hash key) and post-change (add, path key) lines appear
// back to back.
void access_time_tracker::add_timestamp(
std::string_view key, std::chrono::system_clock::time_point ts) {
if (!should_track(key)) {
void access_time_tracker::add(
ss::sstring path, std::chrono::system_clock::time_point atime, size_t size) {
if (!should_track(path)) {
return;
}

// The access time is reduced to whole seconds since the clock's epoch and
// stored in 32 bits.
uint32_t seconds = std::chrono::time_point_cast<std::chrono::seconds>(ts)
uint32_t seconds = std::chrono::time_point_cast<std::chrono::seconds>(atime)
.time_since_epoch()
.count();

uint32_t hash = xxhash_32(key.data(), key.size());

// Non-blocking attempt to take the table lock; an empty result means
// another holder currently has it.
auto units = seastar::try_get_units(_table_lock, 1);
if (units.has_value()) {
// Got lock, update main table
_table[hash] = seconds;
_table[path] = {.atime_sec = seconds, .size = size};
_dirty = true;
} else {
// Locked during serialization, defer write
_pending_upserts[hash] = seconds;
_pending_upserts[path] = {.atime_sec = seconds, .size = size};
}
}

void access_time_tracker::remove_timestamp(std::string_view key) noexcept {
void access_time_tracker::remove(std::string_view key) noexcept {
try {
uint32_t hash = xxhash_32(key.data(), key.size());

ss::sstring k{key.data(), key.size()};
auto units = seastar::try_get_units(_table_lock, 1);
if (units.has_value()) {
// Unlocked, update main table
_table.erase(hash);
_table.erase(k);
_dirty = true;
} else {
// Locked during serialization, defer write
_pending_upserts[hash] = std::nullopt;
_pending_upserts[k] = std::nullopt;
}
} catch (...) {
vassert(
Expand All @@ -209,15 +233,15 @@ void access_time_tracker::remove_timestamp(std::string_view key) noexcept {

ss::future<>
access_time_tracker::trim(const fragmented_vector<file_list_item>& existent) {
absl::btree_set<uint32_t> existent_hashes;
absl::btree_set<ss::sstring> existent_hashes;
for (const auto& i : existent) {
existent_hashes.insert(xxhash_32(i.path.data(), i.path.size()));
existent_hashes.insert(i.path);
}

auto lock_guard = co_await ss::get_units(_table_lock, 1);

table_t tmp;
for (auto it : _table) {
for (const auto& it : _table) {
if (existent_hashes.contains(it.first)) {
tmp.insert(it);
}
Expand All @@ -233,18 +257,20 @@ access_time_tracker::trim(const fragmented_vector<file_list_item>& existent) {
on_released_table_lock();
}

// Looks up `key` in the tracker table and returns what is stored for it, or
// std::nullopt when the key has no entry.
// NOTE(review): this span is a rendered diff; the pre-change
// estimate_timestamp() (hash lookup, returns a time_point) and the
// post-change get() (path lookup, returns file_metadata) are interleaved.
std::optional<std::chrono::system_clock::time_point>
access_time_tracker::estimate_timestamp(std::string_view key) const {
uint32_t hash = xxhash_32(key.data(), key.size());
auto it = _table.find(hash);
if (it == _table.end()) {
return std::nullopt;
std::optional<file_metadata>
access_time_tracker::get(const std::string& key) const {
if (auto it = _table.find(key); it != _table.end()) {
return it->second;
}
// Pre-change path: rebuild a time_point from the stored seconds value.
auto seconds = std::chrono::seconds(it->second);
std::chrono::system_clock::time_point ts(seconds);
return ts;
return std::nullopt;
}

/// Reports the current value of the dirty flag, which is raised when the
/// table is modified and lowered again by write().
bool access_time_tracker::is_dirty() const {
    return _dirty;
}


/// Converts the stored access time (whole seconds since the system_clock
/// epoch) back into a system_clock time point.
std::chrono::system_clock::time_point file_metadata::time_point() const {
    const std::chrono::seconds since_epoch{atime_sec};
    return std::chrono::system_clock::time_point{since_epoch};
}

} // namespace cloud_storage
58 changes: 28 additions & 30 deletions src/v/cloud_storage/access_time_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,50 +27,48 @@

namespace cloud_storage {

/// Access time tracker maintains map from filename hash to
/// the timestamp that represents the time when the file was
/// accessed last.
///
/// It is possible to have conflicts. In case of conflict
/// 'add_timestamp' method will overwrite another key. For that
/// key we will observe larger access time. When one of the
/// conflicted entries will be deleted another will be deleted
/// as well. This is OK because the code in the
/// 'cloud_storage/cache_service' is ready for that.
/// Version tag for the serialized access time tracker data.
enum class tracker_version : uint8_t {
    v1 = 0,
    v2 = 1,
};

/// Per-file record kept by the access time tracker.
struct file_metadata {
    /// Last access time, in whole seconds.
    uint32_t atime_sec;

    /// File size as reported by the caller that added the entry.
    uint64_t size;

    /// Returns atime_sec expressed as a system_clock time point.
    auto time_point() const -> std::chrono::system_clock::time_point;
};

/// Access time tracker maps cache entry file paths to their last accessed
/// timestamp and file size.
class access_time_tracker {
using timestamp_t = uint32_t;
using table_t = absl::btree_map<uint32_t, timestamp_t>;

// Serialized size of each pair in table_t
static constexpr size_t table_item_size = 8;
using table_t = absl::btree_map<ss::sstring, file_metadata>;

public:
/// Add access time to the container.
void add_timestamp(
std::string_view key, std::chrono::system_clock::time_point ts);
/// Add metadata to the container.
void add(
ss::sstring path,
std::chrono::system_clock::time_point atime,
size_t size);

/// Remove key from the container.
void remove_timestamp(std::string_view) noexcept;
void remove(std::string_view) noexcept;

/// Return access time estimate (it can differ if there is a conflict
/// on file name hash).
std::optional<std::chrono::system_clock::time_point>
estimate_timestamp(std::string_view key) const;
/// Return file metadata for key.
std::optional<file_metadata> get(const std::string& key) const;

ss::future<> write(ss::output_stream<char>&);
ss::future<> write(
ss::output_stream<char>&, tracker_version version = tracker_version::v2);
ss::future<> read(ss::input_stream<char>&);

/// Returns true if tracker has new data which wasn't serialized
/// to disk.
bool is_dirty() const;

/// Remove every key which isn't present in list of existing files
/// Remove every key which isn't present in list of input files
ss::future<> trim(const fragmented_vector<file_list_item>&);

size_t size() const { return _table.size(); }

private:
/// Returns true if the key's access time should be tracked.
/// Returns true if the key's metadata should be tracked.
/// We do not wish to track index files and transaction manifests
/// as they are just an appendage to segment/chunk files and are
/// purged along with them.
Expand All @@ -79,17 +77,17 @@ class access_time_tracker {
/// Drain _pending_upserts for any writes made while table lock was held
void on_released_table_lock();

absl::btree_map<uint32_t, timestamp_t> _table;
table_t _table;

// Lock taken during async loops over the table (ser/de and trim())
// modifications may proceed without the lock if it is not taken.
// When releasing lock, drain _pending_upserts.
ss::semaphore _table_lock{1};

// Calls into add_timestamp/remove_timestamp populate this
// if the _serialization_lock is unavailable. The serialization code is
// responsible for draining it upon releasing the lock.
absl::btree_map<uint32_t, std::optional<timestamp_t>> _pending_upserts;
// Calls into add/remove populate this if _table_lock is unavailable. The
// code holding the lock (ser/de and trim()) is responsible for draining it
// upon releasing the lock.
absl::btree_map<ss::sstring, std::optional<file_metadata>> _pending_upserts;

bool _dirty{false};
};
Expand Down
32 changes: 17 additions & 15 deletions src/v/cloud_storage/cache_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ cache::remove_segment_full(const file_list_item& file_stat) {

// Remove key if possible to make sure there is no resource
// leak
_access_time_tracker.remove_timestamp(std::string_view(file_stat.path));
_access_time_tracker.remove(file_stat.path);

vlog(
cst_log.trace,
Expand Down Expand Up @@ -773,8 +773,7 @@ cache::trim_exhaustive(uint64_t size_to_delete, size_t objects_to_delete) {
// exhaustive trim because they are occupying too much space.
try {
co_await delete_file_and_empty_parents(file_stat.path);
_access_time_tracker.remove_timestamp(
std::string_view(file_stat.path));
_access_time_tracker.remove(file_stat.path);

_current_cache_size -= std::min(
file_stat.size, _current_cache_size);
Expand Down Expand Up @@ -1053,19 +1052,22 @@ ss::future<std::optional<cache_item>> cache::_get(std::filesystem::path key) {
vlog(cst_log.debug, "Trying to get {} from archival cache.", key.native());
probe.get();
ss::file cache_file;

size_t data_size{0};
try {
auto source = (_cache_dir / key).native();
cache_file = co_await ss::open_file_dma(source, ss::open_flags::ro);
data_size = co_await cache_file.size();

// Bump access time of the file
if (ss::this_shard_id() == 0) {
_access_time_tracker.add_timestamp(
source, std::chrono::system_clock::now());
_access_time_tracker.add(
source, std::chrono::system_clock::now(), data_size);
} else {
ssx::spawn_with_gate(_gate, [this, source] {
return container().invoke_on(0, [source](cache& c) {
c._access_time_tracker.add_timestamp(
source, std::chrono::system_clock::now());
ssx::spawn_with_gate(_gate, [this, source, data_size] {
return container().invoke_on(0, [source, data_size](cache& c) {
c._access_time_tracker.add(
source, std::chrono::system_clock::now(), data_size);
});
});
}
Expand All @@ -1078,7 +1080,6 @@ ss::future<std::optional<cache_item>> cache::_get(std::filesystem::path key) {
}
}

auto data_size = co_await cache_file.size();
probe.cached_get();
co_return std::optional(cache_item{std::move(cache_file), data_size});
}
Expand Down Expand Up @@ -1265,7 +1266,7 @@ ss::future<> cache::_invalidate(const std::filesystem::path& key) {
try {
auto path = (_cache_dir / key).native();
auto stat = co_await ss::file_stat(path);
_access_time_tracker.remove_timestamp(key.native());
_access_time_tracker.remove(key.native());
co_await delete_file_and_empty_parents(path);
_current_cache_size -= stat.size;
_current_cache_objects -= 1;
Expand Down Expand Up @@ -1473,15 +1474,16 @@ cache::trim_carryover(uint64_t delete_bytes, uint64_t delete_objects) {
auto rel_path = _cache_dir
/ std::filesystem::relative(
std::filesystem::path(file_stat.path), _cache_dir);
auto estimate = _access_time_tracker.estimate_timestamp(
rel_path.native());
if (estimate != file_stat.access_time) {

if (auto estimate = _access_time_tracker.get(rel_path.native());
estimate.has_value()
&& estimate->time_point() != file_stat.access_time) {
vlog(
cst_log.trace,
"carryover file {} was accessed ({}) since the last trim ({}), "
"ignoring",
rel_path.native(),
estimate->time_since_epoch().count(),
estimate->atime_sec,
file_stat.access_time.time_since_epoch().count());
// The file was accessed since we get the stats
continue;
Expand Down
Loading

0 comments on commit d76659a

Please sign in to comment.