diff --git a/Cargo.lock b/Cargo.lock index d7a765dd846a5..f6d05a319e55f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5749,10 +5749,14 @@ dependencies = [ "fail 0.5.1", "fastcrypto", "futures", + "lru", "narwhal-config", "narwhal-crypto", "narwhal-test-utils", "narwhal-types", + "parking_lot 0.12.1", + "prometheus", + "tap", "tempfile", "tokio", "tracing", diff --git a/crates/sui-core/src/narwhal_manager/mod.rs b/crates/sui-core/src/narwhal_manager/mod.rs index fd6f8d1b0bb8b..381745e7206d0 100644 --- a/crates/sui-core/src/narwhal_manager/mod.rs +++ b/crates/sui-core/src/narwhal_manager/mod.rs @@ -11,7 +11,7 @@ use narwhal_config::{Committee, Epoch, Parameters, WorkerCache, WorkerId}; use narwhal_executor::ExecutionState; use narwhal_node::primary_node::PrimaryNode; use narwhal_node::worker_node::WorkerNodes; -use narwhal_node::NodeStorage; +use narwhal_node::{CertificateStoreCacheMetrics, NodeStorage}; use narwhal_worker::TransactionValidator; use prometheus::{register_int_gauge_with_registry, IntGauge, Registry}; use std::path::PathBuf; @@ -83,6 +83,7 @@ pub struct NarwhalManager { worker_nodes: WorkerNodes, running: Mutex, metrics: NarwhalManagerMetrics, + store_cache_metrics: CertificateStoreCacheMetrics, } impl NarwhalManager { @@ -98,6 +99,9 @@ impl NarwhalManager { let worker_nodes = WorkerNodes::new(config.registry_service.clone(), config.parameters.clone()); + let store_cache_metrics = + CertificateStoreCacheMetrics::new(&config.registry_service.default_registry()); + Self { primary_node, worker_nodes, @@ -107,6 +111,7 @@ impl NarwhalManager { storage_base_path: config.storage_base_path, running: Mutex::new(Running::False), metrics, + store_cache_metrics, } } @@ -134,7 +139,7 @@ impl NarwhalManager { // Create a new store let store_path = self.get_store_path(committee.epoch()); - let store = NodeStorage::reopen(store_path); + let store = NodeStorage::reopen(store_path, Some(self.store_cache_metrics.clone())); let name = self.primary_keypair.public().clone(); diff --git a/narwhal/consensus/src/tests/consensus_tests.rs b/narwhal/consensus/src/tests/consensus_tests.rs index 3fd436ca10df2..e690e63cdc7c1 100644 --- a/narwhal/consensus/src/tests/consensus_tests.rs +++ b/narwhal/consensus/src/tests/consensus_tests.rs @@ -34,7 +34,7 @@ async fn test_consensus_recovery_with_bullshark() { let _guard = setup_tracing(); // GIVEN - let storage = NodeStorage::reopen(temp_dir()); + let storage = NodeStorage::reopen(temp_dir(), None); let consensus_store = storage.consensus_store; let certificate_store = storage.certificate_store; @@ -154,7 +154,7 @@ async fn test_consensus_recovery_with_bullshark() { let (tx_consensus_round_updates, _rx_consensus_round_updates) = watch::channel(ConsensusRound::default()); - let storage = NodeStorage::reopen(temp_dir()); + let storage = NodeStorage::reopen(temp_dir(), None); let consensus_store = storage.consensus_store; let certificate_store = storage.certificate_store; diff --git a/narwhal/consensus/src/tests/consensus_utils.rs b/narwhal/consensus/src/tests/consensus_utils.rs index 3317c30c4cefa..9e054e4260deb 100644 --- a/narwhal/consensus/src/tests/consensus_utils.rs +++ b/narwhal/consensus/src/tests/consensus_utils.rs @@ -1,8 +1,9 @@ +use std::num::NonZeroUsize; // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 use config::AuthorityIdentifier; use std::sync::Arc; -use storage::CertificateStore; +use storage::{CertificateStore, CertificateStoreCache}; use store::rocks::MetricConf; use store::{reopen, rocks, rocks::DBMap, rocks::ReadWriteOptions}; use types::{ @@ -55,5 +56,6 @@ pub fn make_certificate_store(store_path: &std::path::Path) -> CertificateStore certificate_map, certificate_digest_by_round_map, certificate_digest_by_origin_map, + CertificateStoreCache::new(NonZeroUsize::new(100).unwrap(), None), ) } diff --git a/narwhal/executor/tests/consensus_integration_tests.rs b/narwhal/executor/tests/consensus_integration_tests.rs index fd21263b9007e..f4a7106d297b3 100644 --- a/narwhal/executor/tests/consensus_integration_tests.rs +++ b/narwhal/executor/tests/consensus_integration_tests.rs @@ -22,7 +22,7 @@ use types::{Certificate, PreSubscribedBroadcastSender, Round, TransactionProto}; #[tokio::test] async fn test_recovery() { // Create storage - let storage = NodeStorage::reopen(temp_dir()); + let storage = NodeStorage::reopen(temp_dir(), None); let consensus_store = storage.consensus_store; let certificate_store = storage.certificate_store; diff --git a/narwhal/node/src/lib.rs b/narwhal/node/src/lib.rs index d1628c3546067..2f1a0524a5748 100644 --- a/narwhal/node/src/lib.rs +++ b/narwhal/node/src/lib.rs @@ -4,7 +4,7 @@ use config::WorkerId; use executor::SubscriberError; use futures::future::try_join_all; use futures::stream::FuturesUnordered; -pub use storage::NodeStorage; +pub use storage::{CertificateStoreCacheMetrics, NodeStorage}; use thiserror::Error; pub mod execution_state; diff --git a/narwhal/node/src/main.rs b/narwhal/node/src/main.rs index c00ac325a62ee..e72854a1fc0ed 100644 --- a/narwhal/node/src/main.rs +++ b/narwhal/node/src/main.rs @@ -23,7 +23,7 @@ use node::{ }; use prometheus::Registry; use std::sync::Arc; -use storage::NodeStorage; +use storage::{CertificateStoreCacheMetrics, NodeStorage}; use sui_keys::keypair_file::{ read_authority_keypair_from_file, read_network_keypair_from_file, write_authority_keypair_to_file, write_keypair_to_file, @@ -273,13 +273,15 @@ async fn run( }; // Make the data store. - let store = NodeStorage::reopen(store_path); + let registry_service = RegistryService::new(Registry::new()); + let certificate_store_cache_metrics = + CertificateStoreCacheMetrics::new(®istry_service.default_registry()); + + let store = NodeStorage::reopen(store_path, Some(certificate_store_cache_metrics)); // The channel returning the result for each transaction's execution. let (_tx_transaction_confirmation, _rx_transaction_confirmation) = channel(100); - let registry_service = RegistryService::new(Registry::new()); - // Check whether to run a primary, a worker, or an entire authority. let (primary, worker) = match matches.subcommand() { // Spawn the primary and consensus core. diff --git a/narwhal/node/tests/node_test.rs b/narwhal/node/tests/node_test.rs index c2c2368648da7..df5943a3fe8ed 100644 --- a/narwhal/node/tests/node_test.rs +++ b/narwhal/node/tests/node_test.rs @@ -35,7 +35,7 @@ async fn simple_primary_worker_node_start_stop() { let key_pair = authority.keypair(); let network_key_pair = authority.network_keypair(); - let store = NodeStorage::reopen(temp_dir()); + let store = NodeStorage::reopen(temp_dir(), None); let (tx_confirmation, _rx_confirmation) = channel(10); let execution_state = Arc::new(SimpleExecutionState::new(tx_confirmation)); @@ -114,7 +114,7 @@ async fn primary_node_restart() { let key_pair = authority.keypair(); let network_key_pair = authority.network_keypair(); - let store = NodeStorage::reopen(temp_dir()); + let store = NodeStorage::reopen(temp_dir(), None); let (tx_confirmation, _rx_confirmation) = channel(10); let execution_state = Arc::new(SimpleExecutionState::new(tx_confirmation)); diff --git a/narwhal/primary/src/tests/certificate_fetcher_tests.rs b/narwhal/primary/src/tests/certificate_fetcher_tests.rs index e6c502a27a80c..9c8018cd738e1 100644 --- a/narwhal/primary/src/tests/certificate_fetcher_tests.rs +++ b/narwhal/primary/src/tests/certificate_fetcher_tests.rs @@ -158,7 +158,7 @@ async fn fetch_certificates_basic() { let (tx_fetch_resp, rx_fetch_resp) = mpsc::channel(1000); // Create test stores. - let store = NodeStorage::reopen(temp_dir()); + let store = NodeStorage::reopen(temp_dir(), None); let certificate_store = store.certificate_store.clone(); let payload_store = store.payload_store.clone(); diff --git a/narwhal/primary/src/tests/common.rs b/narwhal/primary/src/tests/common.rs index 3baccd3b1ad5c..b445f93f0dc9d 100644 --- a/narwhal/primary/src/tests/common.rs +++ b/narwhal/primary/src/tests/common.rs @@ -1,9 +1,10 @@ +use std::num::NonZeroUsize; // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 use config::{AuthorityIdentifier, WorkerId}; use crypto::NetworkKeyPair; use std::time::Duration; -use storage::{CertificateStore, HeaderStore, PayloadStore}; +use storage::{CertificateStore, CertificateStoreCache, HeaderStore, PayloadStore}; use store::{reopen, rocks, rocks::DBMap, rocks::ReadWriteOptions}; use test_utils::{ temp_dir, PrimaryToWorkerMockServer, CERTIFICATES_CF, CERTIFICATE_DIGEST_BY_ORIGIN_CF, @@ -53,6 +54,7 @@ pub fn create_db_stores() -> (HeaderStore, CertificateStore, PayloadStore) { certificate_map, certificate_digest_by_round_map, certificate_digest_by_origin_map, + CertificateStoreCache::new(NonZeroUsize::new(100).unwrap(), None), ), PayloadStore::new(payload_map), ) diff --git a/narwhal/primary/src/tests/primary_tests.rs b/narwhal/primary/src/tests/primary_tests.rs index f731d226cc844..04d5f36c53ece 100644 --- a/narwhal/primary/src/tests/primary_tests.rs +++ b/narwhal/primary/src/tests/primary_tests.rs @@ -26,8 +26,8 @@ use std::{ sync::Arc, time::Duration, }; -use storage::PayloadToken; use storage::{CertificateStore, VoteDigestStore}; +use storage::{CertificateStoreCache, PayloadToken}; use storage::{NodeStorage, PayloadStore}; use store::rocks::{DBMap, MetricConf, ReadWriteOptions}; use test_utils::{make_optimal_signed_certificates, temp_dir, CommitteeFixture}; @@ -60,7 +60,7 @@ async fn get_network_peers_from_admin_server() { let worker_1_keypair = authority_1.worker(worker_id).keypair().copy(); // Make the data store. - let store = NodeStorage::reopen(temp_dir()); + let store = NodeStorage::reopen(temp_dir(), None); let (tx_new_certificates, rx_new_certificates) = types::metered_channel::channel( CHANNEL_CAPACITY, @@ -1177,6 +1177,7 @@ async fn test_process_payload_availability_when_failures() { certificate_map, certificate_digest_by_round_map, certificate_digest_by_origin_map, + CertificateStoreCache::new(NonZeroUsize::new(100).unwrap(), None), ); let payload_store = PayloadStore::new(payload_map); diff --git a/narwhal/primary/tests/integration_tests_proposer_api.rs b/narwhal/primary/tests/integration_tests_proposer_api.rs index 839f8a5d24727..419eb2584e642 100644 --- a/narwhal/primary/tests/integration_tests_proposer_api.rs +++ b/narwhal/primary/tests/integration_tests_proposer_api.rs @@ -74,7 +74,7 @@ async fn test_rounds_errors() { }; // AND create separate data stores - let store_primary = NodeStorage::reopen(temp_dir()); + let store_primary = NodeStorage::reopen(temp_dir(), None); // Spawn the primary let (tx_new_certificates, rx_new_certificates) = @@ -183,7 +183,7 @@ async fn test_rounds_return_successful_response() { }; // AND create separate data stores - let store_primary = NodeStorage::reopen(temp_dir()); + let store_primary = NodeStorage::reopen(temp_dir(), None); // Spawn the primary let (tx_new_certificates, rx_new_certificates) = @@ -284,8 +284,8 @@ async fn test_node_read_causal_signed_certificates() { let authority_2 = fixture.authorities().nth(1).unwrap(); // Make the data store. - let primary_store_1 = NodeStorage::reopen(temp_dir()); - let primary_store_2: NodeStorage = NodeStorage::reopen(temp_dir()); + let primary_store_1 = NodeStorage::reopen(temp_dir(), None); + let primary_store_2: NodeStorage = NodeStorage::reopen(temp_dir(), None); let mut collection_ids: Vec = Vec::new(); diff --git a/narwhal/primary/tests/integration_tests_validator_api.rs b/narwhal/primary/tests/integration_tests_validator_api.rs index b5402586d41c3..301fa3f5108f5 100644 --- a/narwhal/primary/tests/integration_tests_validator_api.rs +++ b/narwhal/primary/tests/integration_tests_validator_api.rs @@ -48,7 +48,7 @@ async fn test_get_collections() { let worker_keypair = author.worker(worker_id).keypair().copy(); // Make the data store. - let store = NodeStorage::reopen(temp_dir()); + let store = NodeStorage::reopen(temp_dir(), None); let mut header_digests = Vec::new(); // Blocks/Collections @@ -243,7 +243,7 @@ async fn test_remove_collections() { let worker_keypair = author.worker(worker_id).keypair().copy(); // Make the data store. - let store = NodeStorage::reopen(temp_dir()); + let store = NodeStorage::reopen(temp_dir(), None); let mut header_digests = Vec::new(); // Blocks/Collections let mut collection_digests = Vec::new(); @@ -470,8 +470,8 @@ async fn test_read_causal_signed_certificates() { let authority_2 = fixture.authorities().nth(1).unwrap(); // Make the data store. - let primary_store_1 = NodeStorage::reopen(temp_dir()); - let primary_store_2: NodeStorage = NodeStorage::reopen(temp_dir()); + let primary_store_1 = NodeStorage::reopen(temp_dir(), None); + let primary_store_2: NodeStorage = NodeStorage::reopen(temp_dir(), None); let mut collection_digests: Vec = Vec::new(); @@ -698,8 +698,8 @@ async fn test_read_causal_unsigned_certificates() { let network_keypair_2 = authority_2.network_keypair().copy(); // Make the data store. - let primary_store_1 = NodeStorage::reopen(temp_dir()); - let primary_store_2: NodeStorage = NodeStorage::reopen(temp_dir()); + let primary_store_1 = NodeStorage::reopen(temp_dir(), None); + let primary_store_2: NodeStorage = NodeStorage::reopen(temp_dir(), None); let mut collection_digests: Vec = Vec::new(); @@ -936,8 +936,8 @@ async fn test_get_collections_with_missing_certificates() { }; // AND create separate data stores for the 2 primaries - let store_primary_1 = NodeStorage::reopen(temp_dir()); - let store_primary_2 = NodeStorage::reopen(temp_dir()); + let store_primary_1 = NodeStorage::reopen(temp_dir(), None); + let store_primary_2 = NodeStorage::reopen(temp_dir(), None); // The certificate_1 will be stored in primary 1 let (certificate_1, batch_1) = fixture_certificate( diff --git a/narwhal/storage/Cargo.toml b/narwhal/storage/Cargo.toml index 0959d65ee2de6..828c1d0535115 100644 --- a/narwhal/storage/Cargo.toml +++ b/narwhal/storage/Cargo.toml @@ -19,7 +19,11 @@ types = { path = "../types", package = "narwhal-types" } store = { path = "../../crates/typed-store", package = "typed-store" } config = { path = "../config", package = "narwhal-config" } workspace-hack = { version = "0.1", path = "../../crates/workspace-hack" } +prometheus = "0.13.3" fail = "0.5.1" +lru = "0.10" +parking_lot = "0.12.1" +tap = "1.0.1" [dev-dependencies] test-utils = { path = "../test-utils", package = "narwhal-test-utils" } diff --git a/narwhal/storage/src/certificate_store.rs b/narwhal/storage/src/certificate_store.rs index 47ac5f141b91b..f1115b6c41edc 100644 --- a/narwhal/storage/src/certificate_store.rs +++ b/narwhal/storage/src/certificate_store.rs @@ -1,7 +1,14 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 use fastcrypto::hash::Hash; +use lru::LruCache; +use parking_lot::Mutex; +use prometheus::{register_int_counter_with_registry, IntCounter, Registry}; +use std::collections::HashMap; +use std::num::NonZeroUsize; +use std::sync::Arc; use std::{cmp::Ordering, collections::BTreeMap, iter}; +use tap::Tap; use crate::NotifySubscribers; use config::AuthorityIdentifier; @@ -11,6 +18,181 @@ use store::{ }; use types::{Certificate, CertificateDigest, Round, StoreResult}; +#[derive(Clone)] +pub struct CertificateStoreCacheMetrics { + hit: IntCounter, + miss: IntCounter, +} + +impl CertificateStoreCacheMetrics { + pub fn new(registry: &Registry) -> Self { + Self { + hit: register_int_counter_with_registry!( + "certificate_store_cache_hit", + "The number of hits in the cache", + registry + ) + .unwrap(), + miss: register_int_counter_with_registry!( + "certificate_store_cache_miss", + "The number of miss in the cache", + registry + ) + .unwrap(), + } + } +} + +/// A cache trait to be used as temporary in-memory store when accessing the underlying +/// certificate_store. Using the cache allows to skip rocksdb access giving us benefits +/// both on less disk access (when value not in db's cache) and also avoiding any additional +/// deserialization costs. +pub trait Cache { + fn write(&self, certificate: Certificate); + fn write_all(&self, certificate: Vec); + fn read(&self, digest: &CertificateDigest) -> Option; + + /// Returns the certificates by performing a look up in the cache. The method is expected to + /// always return a result for every provided digest (when found will be Some, None otherwise) + /// and in the same order. + fn read_all( + &self, + digests: Vec, + ) -> Vec<(CertificateDigest, Option)>; + fn contains(&self, digest: &CertificateDigest) -> bool; + fn remove(&self, digest: &CertificateDigest); + fn remove_all(&self, digests: Vec); +} + +/// An LRU cache for the certificate store. +#[derive(Clone)] +pub struct CertificateStoreCache { + cache: Arc>>, + metrics: Option, +} + +impl CertificateStoreCache { + pub fn new(size: NonZeroUsize, metrics: Option) -> Self { + Self { + cache: Arc::new(Mutex::new(LruCache::new(size))), + metrics, + } + } + + fn report_result(&self, is_hit: bool) { + if let Some(metrics) = self.metrics.as_ref() { + if is_hit { + metrics.hit.inc() + } else { + metrics.miss.inc() + } + } + } +} + +impl Cache for CertificateStoreCache { + fn write(&self, certificate: Certificate) { + let mut guard = self.cache.lock(); + guard.put(certificate.digest(), certificate); + } + + fn write_all(&self, certificate: Vec) { + let mut guard = self.cache.lock(); + for cert in certificate { + guard.put(cert.digest(), cert); + } + } + + /// Fetches the certificate for the provided digest. This method will update the LRU record + /// and mark it as "last accessed". + fn read(&self, digest: &CertificateDigest) -> Option { + let mut guard = self.cache.lock(); + guard + .get(digest) + .cloned() + .tap(|v| self.report_result(v.is_some())) + } + + /// Fetches the certificates for the provided digests. This method will update the LRU records + /// and mark them as "last accessed". + fn read_all( + &self, + digests: Vec, + ) -> Vec<(CertificateDigest, Option)> { + let mut guard = self.cache.lock(); + digests + .into_iter() + .map(move |id| { + ( + id, + guard + .get(&id) + .cloned() + .tap(|v| self.report_result(v.is_some())), + ) + }) + .collect() + } + + /// Checks whether the value exists in the LRU cache. The method does not update the LRU record, thus + /// it will not count as a "last access" for the provided digest. + fn contains(&self, digest: &CertificateDigest) -> bool { + let guard = self.cache.lock(); + guard + .contains(digest) + .tap(|result| self.report_result(*result)) + } + + fn remove(&self, digest: &CertificateDigest) { + let mut guard = self.cache.lock(); + let _ = guard.pop(digest); + } + + fn remove_all(&self, digests: Vec) { + let mut guard = self.cache.lock(); + for digest in digests { + let _ = guard.pop(&digest); + } + } +} + +/// An implementation that basically disables the caching functionality when used for CertificateStore. +#[derive(Clone)] +struct NoCache {} + +impl Cache for NoCache { + fn write(&self, _certificate: Certificate) { + // no-op + } + + fn write_all(&self, _certificate: Vec) { + // no-op + } + + fn read(&self, _digest: &CertificateDigest) -> Option { + None + } + + fn read_all( + &self, + digests: Vec, + ) -> Vec<(CertificateDigest, Option)> { + digests.into_iter().map(|digest| (digest, None)).collect() + } + + fn contains(&self, _digest: &CertificateDigest) -> bool { + false + } + + fn remove(&self, _digest: &CertificateDigest) { + // no-op + } + + fn remove_all(&self, _digests: Vec) { + // no-op + } +} + /// The main storage when we have to deal with certificates. It maintains /// two storages, one main which saves the certificates by their ids, and a /// secondary one which acts as an index to allow us fast retrieval based @@ -19,7 +201,7 @@ use types::{Certificate, CertificateDigest, Round, StoreResult}; /// `notify_read` someone can wait to hear until a certificate by a specific /// id has been written in storage. #[derive(Clone)] -pub struct CertificateStore { +pub struct CertificateStore { /// Holds the certificates by their digest id certificates_by_id: DBMap, /// A secondary index that keeps the certificate digest ids @@ -34,19 +216,23 @@ pub struct CertificateStore { certificate_id_by_origin: DBMap<(AuthorityIdentifier, Round), CertificateDigest>, /// The pub/sub to notify for a write that happened for a certificate digest id notify_subscribers: NotifySubscribers, + /// An LRU cache to keep recent certificates + cache: Arc, } -impl CertificateStore { +impl CertificateStore { pub fn new( certificates_by_id: DBMap, certificate_id_by_round: DBMap<(Round, AuthorityIdentifier), CertificateDigest>, certificate_id_by_origin: DBMap<(AuthorityIdentifier, Round), CertificateDigest>, - ) -> CertificateStore { + certificate_store_cache: T, + ) -> CertificateStore { Self { certificates_by_id, certificate_id_by_round, certificate_id_by_origin, notify_subscribers: NotifySubscribers::new(), + cache: Arc::new(certificate_store_cache), } } @@ -85,6 +271,9 @@ impl CertificateStore { self.notify_subscribers.notify(&id, &certificate); } + // insert in cache + self.cache.write(certificate); + result } @@ -131,12 +320,19 @@ impl CertificateStore { let result = batch.write(); if result.is_ok() { - for (_id, certificate) in certificates { + for (_id, certificate) in &certificates { self.notify_subscribers - .notify(&certificate.digest(), &certificate); + .notify(&certificate.digest(), certificate); } } + self.cache.write_all( + certificates + .into_iter() + .map(|(_, certificate)| certificate) + .collect(), + ); + result } @@ -149,6 +345,10 @@ impl CertificateStore { ))) }); + if let Some(certificate) = self.cache.read(&id) { + return Ok(Some(certificate)); + } + self.certificates_by_id.get(&id) } @@ -166,7 +366,7 @@ impl CertificateStore { }); match self.certificate_id_by_origin.get(&(origin, round))? { - Some(d) => self.certificates_by_id.get(&d), + Some(d) => self.read(d), None => Ok(None), } } @@ -180,6 +380,10 @@ impl CertificateStore { ))) }); + if self.cache.contains(id) { + return Ok(true); + } + self.certificates_by_id.contains_key(id) } @@ -195,7 +399,31 @@ impl CertificateStore { ))) }); - self.certificates_by_id.multi_get(ids) + let mut found = HashMap::new(); + let mut missing = Vec::new(); + + // first find whatever we can from our local cache + let ids: Vec = ids.into_iter().collect(); + for (id, certificate) in self.cache.read_all(ids.clone()) { + if let Some(certificate) = certificate { + found.insert(id, certificate.clone()); + } else { + missing.push(id); + } + } + + // then fallback for all the misses on the storage + let from_store = self.certificates_by_id.multi_get(&missing)?; + from_store + .iter() + .zip(missing) + .for_each(|(certificate, id)| { + if let Some(certificate) = certificate { + found.insert(id, certificate.clone()); + } + }); + + Ok(ids.into_iter().map(|id| found.get(&id).cloned()).collect()) } /// Waits to get notified until the requested certificate becomes available @@ -241,7 +469,13 @@ impl CertificateStore { batch = batch.delete_batch(&self.certificate_id_by_round, iter::once(key))?; // execute the batch (atomically) and return the result - batch.write() + let result = batch.write(); + + if result.is_ok() { + self.cache.remove(&id); + } + + result } /// Deletes multiple certificates in an atomic way. @@ -264,10 +498,16 @@ impl CertificateStore { batch = batch.delete_batch(&self.certificate_id_by_round, keys_by_round)?; // delete the certificates by its ids - batch = batch.delete_batch(&self.certificates_by_id, ids)?; + batch = batch.delete_batch(&self.certificates_by_id, ids.clone())?; // execute the batch (atomically) and return the result - batch.write() + let result = batch.write(); + + if result.is_ok() { + self.cache.remove_all(ids); + } + + result } /// Retrieves all the certificates with round >= the provided round. @@ -375,7 +615,7 @@ impl CertificateStore { .next() { if name == origin { - return self.certificates_by_id.get(&digest); + return self.read(digest); } } Ok(None) @@ -447,10 +687,12 @@ impl CertificateStore { #[cfg(test)] mod test { - use crate::certificate_store::CertificateStore; + use crate::certificate_store::{CertificateStore, NoCache}; + use crate::{Cache, CertificateStoreCache}; use config::AuthorityIdentifier; use fastcrypto::hash::Hash; use futures::future::join_all; + use std::num::NonZeroUsize; use std::{ collections::{BTreeSet, HashSet}, time::Instant, @@ -464,6 +706,38 @@ mod test { use types::{Certificate, CertificateDigest, Round}; fn new_store(path: std::path::PathBuf) -> CertificateStore { + let (certificate_map, certificate_id_by_round_map, certificate_id_by_origin_map) = + create_db_maps(path); + + let store_cache = CertificateStoreCache::new(NonZeroUsize::new(100).unwrap(), None); + + CertificateStore::new( + certificate_map, + certificate_id_by_round_map, + certificate_id_by_origin_map, + store_cache, + ) + } + + fn new_store_no_cache(path: std::path::PathBuf) -> CertificateStore { + let (certificate_map, certificate_id_by_round_map, certificate_id_by_origin_map) = + create_db_maps(path); + + CertificateStore::new( + certificate_map, + certificate_id_by_round_map, + certificate_id_by_origin_map, + NoCache {}, + ) + } + + fn create_db_maps( + path: std::path::PathBuf, + ) -> ( + DBMap, + DBMap<(Round, AuthorityIdentifier), CertificateDigest>, + DBMap<(AuthorityIdentifier, Round), CertificateDigest>, + ) { const CERTIFICATES_CF: &str = "certificates"; const CERTIFICATE_ID_BY_ROUND_CF: &str = "certificate_id_by_round"; const CERTIFICATE_ID_BY_ORIGIN_CF: &str = "certificate_id_by_origin"; @@ -480,16 +754,10 @@ mod test { ) .expect("Cannot open database"); - let (certificate_map, certificate_id_by_round_map, certificate_id_by_origin_map) = reopen!(&rocksdb, + reopen!(&rocksdb, CERTIFICATES_CF;, CERTIFICATE_ID_BY_ROUND_CF;<(Round, AuthorityIdentifier), CertificateDigest>, CERTIFICATE_ID_BY_ORIGIN_CF;<(AuthorityIdentifier, Round), CertificateDigest> - ); - - CertificateStore::new( - certificate_map, - certificate_id_by_round_map, - certificate_id_by_origin_map, ) } @@ -524,9 +792,12 @@ mod test { #[tokio::test] async fn test_write_and_read() { - // GIVEN - let store = new_store(temp_dir()); + test_write_and_read_by_store_type(new_store(temp_dir())).await; + test_write_and_read_by_store_type(new_store_no_cache(temp_dir())).await; + } + async fn test_write_and_read_by_store_type(store: CertificateStore) { + // GIVEN // create certificates for 10 rounds let certs = certificates(10); @@ -544,9 +815,12 @@ mod test { #[tokio::test] async fn test_write_all_and_read_all() { - // GIVEN - let store = new_store(temp_dir()); + test_write_all_and_read_all_by_store_type(new_store(temp_dir())).await; + test_write_all_and_read_all_by_store_type(new_store_no_cache(temp_dir())).await; + } + async fn test_write_all_and_read_all_by_store_type(store: CertificateStore) { + // GIVEN // create certificates for 10 rounds let certs = certificates(10); let ids = certs @@ -557,6 +831,12 @@ mod test { // store them in both main and secondary index store.write_all(certs.clone()).unwrap(); + // AND if running with cache, just remove a few items to ensure that they'll be fetched + // from storage + store.cache.remove(&ids[0]); + store.cache.remove(&ids[3]); + store.cache.remove(&ids[9]); + // WHEN let result = store.read_all(ids).unwrap(); @@ -812,10 +1092,13 @@ mod test { } #[tokio::test] - async fn test_delete() { - // GIVEN - let store = new_store(temp_dir()); + async fn test_delete_by_store_type() { + test_delete(new_store(temp_dir())).await; + test_delete(new_store_no_cache(temp_dir())).await; + } + async fn test_delete(store: CertificateStore) { + // GIVEN // create certificates for 10 rounds let certs = certificates(10); @@ -834,10 +1117,13 @@ mod test { } #[tokio::test] - async fn test_delete_all() { - // GIVEN - let store = new_store(temp_dir()); + async fn test_delete_all_by_store_type() { + test_delete_all(new_store(temp_dir())).await; + test_delete_all(new_store_no_cache(temp_dir())).await; + } + async fn test_delete_all(store: CertificateStore) { + // GIVEN // create certificates for 10 rounds let certs = certificates(10); @@ -853,4 +1139,41 @@ mod test { assert!(store.read(to_delete[0]).unwrap().is_none()); assert!(store.read(to_delete[1]).unwrap().is_none()); } + + #[test] + fn test_cache() { + // cache should hold up to 5 elements + let cache = CertificateStoreCache::new(NonZeroUsize::new(5).unwrap(), None); + + let certificates = certificates(5); + + // write 20 certificates + for cert in &certificates { + cache.write(cert.clone()); + } + + for (i, cert) in certificates.iter().enumerate() { + // first 15 certificates should not exist + if i < 15 { + assert!(cache.read(&cert.digest()).is_none()); + } else { + assert!(cache.read(&cert.digest()).is_some()); + } + } + + // now the same should happen when we use a write_all & read_all + let cache = CertificateStoreCache::new(NonZeroUsize::new(5).unwrap(), None); + + cache.write_all(certificates.clone()); + + let result = cache.read_all(certificates.iter().map(|c| c.digest()).collect()); + for (i, (_, cert)) in result.iter().enumerate() { + // first 15 certificates should not exist + if i < 15 { + assert!(cert.is_none()); + } else { + assert!(cert.is_some()); + } + } + } } diff --git a/narwhal/storage/src/node_store.rs b/narwhal/storage/src/node_store.rs index a2033fefa30eb..6beefb98dede4 100644 --- a/narwhal/storage/src/node_store.rs +++ b/narwhal/storage/src/node_store.rs @@ -3,8 +3,12 @@ use crate::payload_store::PayloadStore; use crate::proposer_store::ProposerKey; use crate::vote_digest_store::VoteDigestStore; -use crate::{CertificateStore, HeaderStore, ProposerStore}; +use crate::{ + CertificateStore, CertificateStoreCache, CertificateStoreCacheMetrics, HeaderStore, + ProposerStore, +}; use config::{AuthorityIdentifier, WorkerId}; +use std::num::NonZeroUsize; use std::sync::Arc; use std::time::Duration; use store::metrics::SamplingInterval; @@ -25,7 +29,7 @@ pub struct NodeStorage { pub proposer_store: ProposerStore, pub vote_digest_store: VoteDigestStore, pub header_store: HeaderStore, - pub certificate_store: CertificateStore, + pub certificate_store: CertificateStore, pub payload_store: PayloadStore, pub batch_store: DBMap, pub consensus_store: Arc, @@ -44,8 +48,16 @@ impl NodeStorage { pub(crate) const LAST_COMMITTED_CF: &'static str = "last_committed"; pub(crate) const SUB_DAG_INDEX_CF: &'static str = "sub_dag"; + // 100 nodes * 60 rounds (assuming 1 round/sec this will hold data for about the last 1 minute + // which should be more than enough for advancing the protocol and also help other nodes) + // TODO: take into account committee size instead of having fixed 100. + pub(crate) const CERTIFICATE_STORE_CACHE_SIZE: usize = 100 * 60; + /// Open or reopen all the storage of the node. - pub fn reopen + Send>(store_path: Path) -> Self { + pub fn reopen + Send>( + store_path: Path, + certificate_store_cache_metrics: Option, + ) -> Self { let mut metrics_conf = MetricConf::with_db_name("consensus_epoch"); metrics_conf.read_sample_interval = SamplingInterval::new(Duration::from_secs(60), 0); let rocksdb = open_cf( @@ -94,10 +106,16 @@ impl NodeStorage { let proposer_store = ProposerStore::new(last_proposed_map); let vote_digest_store = VoteDigestStore::new(votes_map); let header_store = HeaderStore::new(header_map); - let certificate_store = CertificateStore::new( + + let certificate_store_cache = CertificateStoreCache::new( + NonZeroUsize::new(Self::CERTIFICATE_STORE_CACHE_SIZE).unwrap(), + certificate_store_cache_metrics, + ); + let certificate_store = CertificateStore::::new( certificate_map, certificate_digest_by_round_map, certificate_digest_by_origin_map, + certificate_store_cache, ); let payload_store = PayloadStore::new(payload_map); let batch_store = batch_map; diff --git a/narwhal/test-utils/src/cluster.rs b/narwhal/test-utils/src/cluster.rs index 7c7a3fee93720..4d8f44216aa58 100644 --- a/narwhal/test-utils/src/cluster.rs +++ b/narwhal/test-utils/src/cluster.rs @@ -356,7 +356,7 @@ impl PrimaryNodeDetails { let (tx_transaction_confirmation, mut rx_transaction_confirmation) = channel(100); // Primary node - let primary_store: NodeStorage = NodeStorage::reopen(store_path.clone()); + let primary_store: NodeStorage = NodeStorage::reopen(store_path.clone(), None); self.node .start( @@ -460,7 +460,7 @@ impl WorkerNodeDetails { temp_dir() }; - let worker_store = NodeStorage::reopen(store_path.clone()); + let worker_store = NodeStorage::reopen(store_path.clone(), None); self.node .start( self.primary_key.clone(), diff --git a/narwhal/worker/src/tests/worker_tests.rs b/narwhal/worker/src/tests/worker_tests.rs index ffc6fb7b028a6..03982a3337037 100644 --- a/narwhal/worker/src/tests/worker_tests.rs +++ b/narwhal/worker/src/tests/worker_tests.rs @@ -381,7 +381,7 @@ async fn get_network_peers_from_admin_server() { let worker_1_keypair = authority_1.worker(worker_id).keypair().copy(); // Make the data store. - let store = NodeStorage::reopen(temp_dir()); + let store = NodeStorage::reopen(temp_dir(), None); let (tx_new_certificates, rx_new_certificates) = test_utils::test_new_certificates_channel!(CHANNEL_CAPACITY);