diff --git a/.daily_canary b/.daily_canary index 899c44f0baf3..b0c8ecafff33 100644 --- a/.daily_canary +++ b/.daily_canary @@ -3,4 +3,4 @@ ( V ) / . \ | +---=---' /--x-m- /--n-n---xXx--/--yY------>>>----<<<>>]]{{}}---||-/\---.. 2024__ -!..!! \ No newline at end of file +!..! diff --git a/doc/build_apps/api.rst b/doc/build_apps/api.rst index ea18460c7044..7b944c5ede28 100644 --- a/doc/build_apps/api.rst +++ b/doc/build_apps/api.rst @@ -125,7 +125,7 @@ Historical Queries .. doxygenclass:: ccf::historical::AbstractStateCache :project: CCF - :members: set_default_expiry_duration, get_state_at, get_store_at, get_store_range, drop_cached_states + :members: set_default_expiry_duration, set_soft_cache_limit, get_state_at, get_store_at, get_store_range, drop_cached_states .. doxygenstruct:: ccf::historical::State :project: CCF diff --git a/include/ccf/historical_queries_interface.h b/include/ccf/historical_queries_interface.h index f94516d81aa8..f4bc6ba9f4d4 100644 --- a/include/ccf/historical_queries_interface.h +++ b/include/ccf/historical_queries_interface.h @@ -49,6 +49,8 @@ namespace ccf::historical using ExpiryDuration = std::chrono::seconds; + using CacheSize = size_t; + /** Stores the progress of historical query requests. * * A request will generally need to be made multiple times (with the same @@ -79,6 +81,13 @@ namespace ccf::historical virtual void set_default_expiry_duration( ExpiryDuration seconds_until_expiry) = 0; + /** Set the cache limit (in bytes) to evict least recently used requests + * from the cache after its size grows beyond this limit. The limit is not + * strict. It is estimated based on serialized states' sizes approximation + * and is checked once per tick, and so it can overflow for a short time. + */ + virtual void set_soft_cache_limit(CacheSize cache_limit) = 0; + /** EXPERIMENTAL: Set the tracking of deletes on missing keys for historical * queries. 
* diff --git a/samples/apps/logging/logging.cpp b/samples/apps/logging/logging.cpp index 9ab2807b79b0..59081fdd90d6 100644 --- a/samples/apps/logging/logging.cpp +++ b/samples/apps/logging/logging.cpp @@ -460,6 +460,12 @@ namespace loggingapp PUBLIC_RECORDS, context, 10000, 20); context.get_indexing_strategies().install_strategy(index_per_public_key); + // According to manual observation it's enough to start evicting old + // requests on historical perf test, but not too small to get stuck + // because of a single request being larger than the cache. + constexpr size_t cache_limit = 1024 * 1024 * 10; // 10 MB + context.get_historical_state().set_soft_cache_limit(cache_limit); + const ccf::AuthnPolicies auth_policies = { ccf::jwt_auth_policy, ccf::user_cert_auth_policy, diff --git a/src/node/historical_queries.h b/src/node/historical_queries.h index 0eec326355f0..46599b5123fe 100644 --- a/src/node/historical_queries.h +++ b/src/node/historical_queries.h @@ -64,6 +64,7 @@ FMT_END_NAMESPACE namespace ccf::historical { static constexpr auto slow_fetch_threshold = std::chrono::milliseconds(1000); + static constexpr size_t soft_to_raw_ratio{5}; static std::optional get_signature( const ccf::kv::StorePtr& sig_store) @@ -95,10 +96,9 @@ namespace ccf::historical // whether to keep all the writes so that we can build a diff later bool track_deletes_on_missing_keys_v = false; - enum class RequestStage + enum class StoreStage { Fetching, - Untrusted, Trusted, }; @@ -132,7 +132,7 @@ namespace ccf::historical struct StoreDetails { std::chrono::milliseconds time_until_fetch = {}; - RequestStage current_stage = RequestStage::Fetching; + StoreStage current_stage = StoreStage::Fetching; ccf::crypto::Sha256Hash entry_digest = {}; ccf::ClaimsDigest claims_digest = {}; ccf::kv::StorePtr store = nullptr; @@ -234,11 +234,13 @@ namespace ccf::historical return {}; } - void adjust_ranges( + std::pair, std::vector> adjust_ranges( const SeqNoCollection& new_seqnos, bool should_include_receipts,
SeqNo earliest_ledger_secret_seqno) { + std::vector removed{}, added{}; + bool any_diff = false; // If a seqno is earlier than the earliest known ledger secret, we will @@ -266,6 +268,7 @@ namespace ccf::historical { // No longer looking for a seqno which was previously requested. // Remove it from my_stores + removed.push_back(prev_it->first); prev_it = my_stores.erase(prev_it); any_diff |= true; } @@ -279,6 +282,7 @@ namespace ccf::historical { // If this is too early for known secrets, just record that it // was requested but don't add it to all_stores yet + added.push_back(*new_it); prev_it = my_stores.insert_or_assign(prev_it, *new_it, nullptr); any_too_early = true; } @@ -293,6 +297,7 @@ namespace ccf::historical details = std::make_shared(); all_stores.insert_or_assign(all_it, *new_it, details); } + added.push_back(*new_it); prev_it = my_stores.insert_or_assign(prev_it, *new_it, details); } any_diff |= true; @@ -311,7 +316,7 @@ namespace ccf::historical if (!any_diff && (should_include_receipts == include_receipts)) { HISTORICAL_LOG("Identical to previous request"); - return; + return {removed, added}; } include_receipts = should_include_receipts; @@ -331,6 +336,7 @@ namespace ccf::historical populate_receipts(seqno); } } + return {removed, added}; } void populate_receipts(ccf::SeqNo new_seqno) @@ -493,6 +499,136 @@ namespace ccf::historical ExpiryDuration default_expiry_duration = std::chrono::seconds(1800); + // These two combine into an effective O(log(N)) lookup/add/remove by + // handle. + std::list lru_requests; + std::map::iterator> lru_lookup; + + // To maintain the estimated size consumed by all requests. Gets updated + // when ledger entries are fetched, and when requests are dropped. 
+ std::unordered_map> store_to_requests; + std::unordered_map raw_store_sizes{}; + + CacheSize soft_store_cache_limit{1ll * 1024 * 1024 * 512 /*512 MB*/}; + CacheSize soft_store_cache_limit_raw = + soft_store_cache_limit / soft_to_raw_ratio; + CacheSize estimated_store_cache_size{0}; + + void add_request_ref(SeqNo seq, CompoundHandle handle) + { + auto it = store_to_requests.find(seq); + + if (it == store_to_requests.end()) + { + store_to_requests.insert({seq, {handle}}); + auto size = raw_store_sizes.find(seq); + if (size != raw_store_sizes.end()) + { + estimated_store_cache_size += size->second; + } + } + else + { + it->second.insert(handle); + } + } + + void add_request_refs(CompoundHandle handle) + { + for (const auto& [seq, _] : requests.at(handle).my_stores) + { + add_request_ref(seq, handle); + } + } + + void remove_request_ref(SeqNo seq, CompoundHandle handle) + { + auto it = store_to_requests.find(seq); + assert(it != store_to_requests.end()); + + it->second.erase(handle); + if (it->second.empty()) + { + store_to_requests.erase(it); + auto size = raw_store_sizes.find(seq); + if (size != raw_store_sizes.end()) + { + estimated_store_cache_size -= size->second; + raw_store_sizes.erase(size); + } + } + } + + void remove_request_refs(CompoundHandle handle) + { + for (const auto& [seq, _] : requests.at(handle).my_stores) + { + remove_request_ref(seq, handle); + } + } + + void lru_promote(CompoundHandle handle) + { + auto it = lru_lookup.find(handle); + if (it != lru_lookup.end()) + { + lru_requests.erase(it->second); + it->second = lru_requests.insert(lru_requests.begin(), handle); + } + else + { + lru_lookup[handle] = lru_requests.insert(lru_requests.begin(), handle); + add_request_refs(handle); + } + } + + void lru_shrink_to_fit(size_t threshold) + { + while (estimated_store_cache_size > threshold) + { + if (lru_requests.empty()) + { + LOG_FAIL_FMT( + "LRU shrink to {} requested but cache is already empty", threshold); + return; + } + + const auto handle = 
lru_requests.back(); + LOG_DEBUG_FMT( + "Cache size shrinking (reached {} / {}). Dropping {}", + estimated_store_cache_size, + threshold, + handle); + + remove_request_refs(handle); + lru_lookup.erase(handle); + + requests.erase(handle); + lru_requests.pop_back(); + } + } + + void lru_evict(CompoundHandle handle) + { + auto it = lru_lookup.find(handle); + if (it != lru_lookup.end()) + { + remove_request_refs(handle); + lru_requests.erase(it->second); + lru_lookup.erase(it); + } + } + + void update_store_raw_size(SeqNo seq, size_t new_size) + { + auto& stored_size = raw_store_sizes[seq]; + assert(!stored_size || stored_size == new_size); + + estimated_store_cache_size -= stored_size; + estimated_store_cache_size += new_size; + stored_size = new_size; + } + void fetch_entry_at(ccf::SeqNo seqno) { fetch_entries_range(seqno, seqno); @@ -577,7 +713,7 @@ namespace ccf::historical { // Deserialisation includes a GCM integrity check, so all entries // have been verified by the time we get here. - details->current_stage = RequestStage::Trusted; + details->current_stage = StoreStage::Trusted; details->has_commit_evidence = has_commit_evidence; details->entry_digest = entry_digest; @@ -760,6 +896,8 @@ namespace ccf::historical HISTORICAL_LOG("First time I've seen handle {}", handle); } + lru_promote(handle); + Request& request = it->second; update_earliest_known_ledger_secret(); @@ -772,9 +910,18 @@ namespace ccf::historical seqnos.size(), *seqnos.begin(), include_receipts); - request.adjust_ranges( + auto [removed, added] = request.adjust_ranges( seqnos, include_receipts, earliest_secret_.valid_from); + for (auto seq : removed) + { + remove_request_ref(seq, handle); + } + for (auto seq : added) + { + add_request_ref(seq, handle); + } + // If the earliest target entry cannot be deserialised with the earliest // known ledger secret, record the target seqno and begin fetching the // previous historical ledger secret. 
@@ -789,10 +936,9 @@ namespace ccf::historical for (auto seqno : seqnos) { auto target_details = request.get_store_details(seqno); - if ( target_details != nullptr && - target_details->current_stage == RequestStage::Trusted && + target_details->current_stage == StoreStage::Trusted && (!request.include_receipts || target_details->receipt != nullptr)) { // Have this store, associated txid and receipt and trust it - add @@ -823,6 +969,7 @@ namespace ccf::historical { if (request_it->second.get_store_details(seqno) != nullptr) { + lru_evict(request_it->first); request_it = requests.erase(request_it); } else @@ -977,6 +1124,12 @@ namespace ccf::historical default_expiry_duration = duration; } + void set_soft_cache_limit(CacheSize cache_limit) + { + soft_store_cache_limit = cache_limit; + soft_store_cache_limit_raw = soft_store_cache_limit / soft_to_raw_ratio; + } + void track_deletes_on_missing_keys(bool track) { track_deletes_on_missing_keys_v = track; @@ -985,8 +1138,8 @@ namespace ccf::historical bool drop_cached_states(const CompoundHandle& handle) { std::lock_guard guard(requests_lock); + lru_evict(handle); const auto erased_count = requests.erase(handle); - HISTORICAL_LOG("Dropping historical request {}", handle); return erased_count > 0; } @@ -1000,8 +1153,7 @@ namespace ccf::historical std::lock_guard guard(requests_lock); const auto it = all_stores.find(seqno); auto details = it == all_stores.end() ? 
nullptr : it->second.lock(); - if ( - details == nullptr || details->current_stage != RequestStage::Fetching) + if (details == nullptr || details->current_stage != StoreStage::Fetching) { // Unexpected entry, we already have it or weren't asking for it - // ignore this resubmission @@ -1094,6 +1246,7 @@ namespace ccf::historical std::move(claims_digest), has_commit_evidence); + update_store_raw_size(seqno, size); return true; } @@ -1246,6 +1399,7 @@ namespace ccf::historical { LOG_DEBUG_FMT( "Dropping expired historical query with handle {}", it->first); + lru_evict(it->first); it = requests.erase(it); } else @@ -1256,6 +1410,8 @@ namespace ccf::historical } } + lru_shrink_to_fit(soft_store_cache_limit_raw); + { auto it = all_stores.begin(); std::optional> range_to_request = @@ -1269,7 +1425,7 @@ namespace ccf::historical } else { - if (details->current_stage == RequestStage::Fetching) + if (details->current_stage == StoreStage::Fetching) { details->time_until_fetch -= elapsed_ms; if (details->time_until_fetch.count() <= 0) @@ -1435,6 +1591,11 @@ namespace ccf::historical StateCacheImpl::set_default_expiry_duration(duration); } + void set_soft_cache_limit(CacheSize cache_limit) override + { + StateCacheImpl::set_soft_cache_limit(cache_limit); + } + void track_deletes_on_missing_keys(bool track) override { StateCacheImpl::track_deletes_on_missing_keys(track); diff --git a/src/node/rpc/test/node_stub.h b/src/node/rpc/test/node_stub.h index 8c62e15ca057..fd539f19e47a 100644 --- a/src/node/rpc/test/node_stub.h +++ b/src/node/rpc/test/node_stub.h @@ -161,6 +161,8 @@ namespace ccf historical::ExpiryDuration seconds_until_expiry) {} + void set_soft_cache_limit(historical::CacheSize cache_limit){}; + void track_deletes_on_missing_keys(bool track) {} ccf::kv::ReadOnlyStorePtr get_store_at( diff --git a/src/node/test/historical_queries.cpp b/src/node/test/historical_queries.cpp index e7fa424d1b64..8ae0c8203d4d 100644 --- a/src/node/test/historical_queries.cpp +++ 
b/src/node/test/historical_queries.cpp @@ -237,6 +237,19 @@ std::map> construct_host_ledger( return ledger; } +size_t get_cache_limit_for_entries( + const std::vector>& entries) +{ + return ccf::historical::soft_to_raw_ratio * + std::accumulate( + entries.begin(), + entries.end(), + 0ll, + [&](size_t prev, const std::vector& entry) { + return prev + entry.size(); + }); +} + TEST_CASE("StateCache point queries") { auto state = create_and_init_state(); @@ -949,6 +962,134 @@ TEST_CASE("Incremental progress") } } +TEST_CASE("StateCache soft zero limit with increasing") +{ + // Try to get two states. Shouldn't be able to retrieve anything with 0 cache + // limit. After increasing to the size of first state only that one is + // available, but later overwritten by the second one, and finally all are + // evicted after setting again to 0. + // + // ! DISCLAIMER ! If you change this bear in mind that each attempt to get the + // store promotes the handle, and so requests eviction order changes, + // therefore this test in particular is quite fragile. + + auto state = create_and_init_state(); + auto& kv_store = *state.kv_store; + + auto seq_low = write_transactions_and_signature(kv_store, 1); + auto seq_high = write_transactions_and_signature(kv_store, 1); + REQUIRE(kv_store.current_version() == seq_high); + + auto ledger = construct_host_ledger(state.kv_store->get_consensus()); + REQUIRE(ledger.size() == seq_high); + + auto stub_writer = std::make_shared(); + ccf::historical::StateCache cache( + kv_store, state.ledger_secrets, stub_writer); + + cache.set_soft_cache_limit(0); + + REQUIRE(!cache.get_state_at(0, seq_low)); + cache.tick(std::chrono::milliseconds(100)); + + cache.handle_ledger_entry(seq_low, ledger.at(seq_low)); + cache.tick(std::chrono::milliseconds(100)); + + // Ledger entry fetched and instantly removed because of the parent request + // eviction due to zero cache limit.
+ REQUIRE(!cache.get_state_at(0, seq_low)); + + cache.set_soft_cache_limit( + get_cache_limit_for_entries({ledger.at(seq_low), ledger.at(seq_high)}) - 1); + + cache.handle_ledger_entry(seq_low, ledger.at(seq_low)); + cache.tick(std::chrono::milliseconds(100)); + + REQUIRE(cache.get_state_at(0, seq_low)); + REQUIRE(!cache.get_state_at(1, seq_high)); + + cache.tick(std::chrono::milliseconds(100)); + + cache.handle_ledger_entry(seq_high, ledger.at(seq_high)); + + // Both available because tick hasn't been called yet. + REQUIRE(cache.get_state_at(0, seq_low)); + REQUIRE(cache.get_state_at(1, seq_high)); + + cache.tick(std::chrono::milliseconds(100)); + + REQUIRE(!cache.get_state_at(0, seq_low)); + REQUIRE(cache.get_state_at(1, seq_high)); + + cache.set_soft_cache_limit(0); + cache.tick(std::chrono::milliseconds(100)); + + REQUIRE(!cache.get_state_at(0, seq_low)); + REQUIRE(!cache.get_state_at(1, seq_high)); +} + +TEST_CASE("StateCache dropping state explicitly") +{ + // Adding two states to the cache (limit is hit). Drop state2 and add state3. + // State1 should remain available, as well as state3, state2 was force-evicted + // from LRU because it's been dropped explicitly. + // + // ! DISCLAIMER ! If you change this bear in mind that each attempt to get the + // store promotes the handle, and so requests eviction order changes, + // therefore this test in particular is quite fragile. 
+ + auto state = create_and_init_state(); + auto& kv_store = *state.kv_store; + + auto seq_low = write_transactions_and_signature(kv_store, 1); + auto seq_mid = write_transactions_and_signature(kv_store, 1); + auto seq_high = write_transactions_and_signature(kv_store, 1); + REQUIRE(kv_store.current_version() == seq_high); + + auto ledger = construct_host_ledger(state.kv_store->get_consensus()); + REQUIRE(ledger.size() == seq_high); + + auto stub_writer = std::make_shared(); + ccf::historical::StateCache cache( + kv_store, state.ledger_secrets, stub_writer); + + REQUIRE(!cache.get_state_at(0, seq_low)); + REQUIRE(!cache.get_state_at(1, seq_mid)); + + cache.tick(std::chrono::milliseconds(100)); + + cache.handle_ledger_entry(seq_low, ledger.at(seq_low)); + cache.handle_ledger_entry(seq_mid, ledger.at(seq_mid)); + + REQUIRE(cache.get_state_at(0, seq_low)); + REQUIRE(cache.get_state_at(1, seq_mid)); + + cache.set_soft_cache_limit( + get_cache_limit_for_entries( + {ledger.at(seq_low), ledger.at(seq_mid), ledger.at(seq_high)}) - + 1); + + cache.tick(std::chrono::milliseconds(100)); + + REQUIRE(!cache.get_state_at(2, seq_high)); + cache.tick(std::chrono::milliseconds(100)); + + // Ledger entries not provided yet, so previous are not evicted. + REQUIRE(cache.get_state_at(0, seq_low)); + REQUIRE(cache.get_state_at(1, seq_mid)); + REQUIRE(!cache.get_state_at(2, seq_high)); + + cache.drop_cached_states(1); + cache.tick(std::chrono::milliseconds(100)); + + cache.handle_ledger_entry(seq_high, ledger.at(seq_high)); + cache.tick(std::chrono::milliseconds(100)); + + REQUIRE(cache.get_state_at(0, seq_low)); + REQUIRE(!cache.get_state_at(1, seq_mid)); + REQUIRE(cache.get_state_at(2, seq_high)); +} + TEST_CASE("StateCache sparse queries") { auto state = create_and_init_state();