Skip to content

Commit

Permalink
[refactor] introduce an LRU cache inside certificate store (#9760)
Browse files Browse the repository at this point in the history
## Description 

This PR introduces an LRU cache in the certificate_store in order to
minimise the impact of read operations (only on the primary index
at the moment). That will help:
* reduce the number of reads against rocksdb
* avoid deserialisation costs of Certificate 

Relevant metrics for cache hits/misses have been added.

The cache has been set to hold 100 * 60 items (100 nodes x 60 rounds),
but we can tune this further. Also, we only populate the cache when we
write a certificate; for the cases we are interested in, I don't think
we need to re-populate on read.

## Test Plan 

Added unit tests

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] user-visible impact
- [ ] breaking change for client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
akichidis committed Mar 24, 2023
1 parent ca6066b commit 27ca30d
Show file tree
Hide file tree
Showing 18 changed files with 426 additions and 65 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions crates/sui-core/src/narwhal_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use narwhal_config::{Committee, Epoch, Parameters, WorkerCache, WorkerId};
use narwhal_executor::ExecutionState;
use narwhal_node::primary_node::PrimaryNode;
use narwhal_node::worker_node::WorkerNodes;
use narwhal_node::NodeStorage;
use narwhal_node::{CertificateStoreCacheMetrics, NodeStorage};
use narwhal_worker::TransactionValidator;
use prometheus::{register_int_gauge_with_registry, IntGauge, Registry};
use std::path::PathBuf;
Expand Down Expand Up @@ -83,6 +83,7 @@ pub struct NarwhalManager {
worker_nodes: WorkerNodes,
running: Mutex<Running>,
metrics: NarwhalManagerMetrics,
store_cache_metrics: CertificateStoreCacheMetrics,
}

impl NarwhalManager {
Expand All @@ -98,6 +99,9 @@ impl NarwhalManager {
let worker_nodes =
WorkerNodes::new(config.registry_service.clone(), config.parameters.clone());

let store_cache_metrics =
CertificateStoreCacheMetrics::new(&config.registry_service.default_registry());

Self {
primary_node,
worker_nodes,
Expand All @@ -107,6 +111,7 @@ impl NarwhalManager {
storage_base_path: config.storage_base_path,
running: Mutex::new(Running::False),
metrics,
store_cache_metrics,
}
}

Expand Down Expand Up @@ -134,7 +139,7 @@ impl NarwhalManager {

// Create a new store
let store_path = self.get_store_path(committee.epoch());
let store = NodeStorage::reopen(store_path);
let store = NodeStorage::reopen(store_path, Some(self.store_cache_metrics.clone()));

let name = self.primary_keypair.public().clone();

Expand Down
4 changes: 2 additions & 2 deletions narwhal/consensus/src/tests/consensus_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async fn test_consensus_recovery_with_bullshark() {
let _guard = setup_tracing();

// GIVEN
let storage = NodeStorage::reopen(temp_dir());
let storage = NodeStorage::reopen(temp_dir(), None);

let consensus_store = storage.consensus_store;
let certificate_store = storage.certificate_store;
Expand Down Expand Up @@ -154,7 +154,7 @@ async fn test_consensus_recovery_with_bullshark() {
let (tx_consensus_round_updates, _rx_consensus_round_updates) =
watch::channel(ConsensusRound::default());

let storage = NodeStorage::reopen(temp_dir());
let storage = NodeStorage::reopen(temp_dir(), None);

let consensus_store = storage.consensus_store;
let certificate_store = storage.certificate_store;
Expand Down
4 changes: 3 additions & 1 deletion narwhal/consensus/src/tests/consensus_utils.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::num::NonZeroUsize;
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use config::AuthorityIdentifier;
use std::sync::Arc;
use storage::CertificateStore;
use storage::{CertificateStore, CertificateStoreCache};
use store::rocks::MetricConf;
use store::{reopen, rocks, rocks::DBMap, rocks::ReadWriteOptions};
use types::{
Expand Down Expand Up @@ -55,5 +56,6 @@ pub fn make_certificate_store(store_path: &std::path::Path) -> CertificateStore
certificate_map,
certificate_digest_by_round_map,
certificate_digest_by_origin_map,
CertificateStoreCache::new(NonZeroUsize::new(100).unwrap(), None),
)
}
2 changes: 1 addition & 1 deletion narwhal/executor/tests/consensus_integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use types::{Certificate, PreSubscribedBroadcastSender, Round, TransactionProto};
#[tokio::test]
async fn test_recovery() {
// Create storage
let storage = NodeStorage::reopen(temp_dir());
let storage = NodeStorage::reopen(temp_dir(), None);

let consensus_store = storage.consensus_store;
let certificate_store = storage.certificate_store;
Expand Down
2 changes: 1 addition & 1 deletion narwhal/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use config::WorkerId;
use executor::SubscriberError;
use futures::future::try_join_all;
use futures::stream::FuturesUnordered;
pub use storage::NodeStorage;
pub use storage::{CertificateStoreCacheMetrics, NodeStorage};
use thiserror::Error;

pub mod execution_state;
Expand Down
10 changes: 6 additions & 4 deletions narwhal/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use node::{
};
use prometheus::Registry;
use std::sync::Arc;
use storage::NodeStorage;
use storage::{CertificateStoreCacheMetrics, NodeStorage};
use sui_keys::keypair_file::{
read_authority_keypair_from_file, read_network_keypair_from_file,
write_authority_keypair_to_file, write_keypair_to_file,
Expand Down Expand Up @@ -273,13 +273,15 @@ async fn run(
};

// Make the data store.
let store = NodeStorage::reopen(store_path);
let registry_service = RegistryService::new(Registry::new());
let certificate_store_cache_metrics =
CertificateStoreCacheMetrics::new(&registry_service.default_registry());

let store = NodeStorage::reopen(store_path, Some(certificate_store_cache_metrics));

// The channel returning the result for each transaction's execution.
let (_tx_transaction_confirmation, _rx_transaction_confirmation) = channel(100);

let registry_service = RegistryService::new(Registry::new());

// Check whether to run a primary, a worker, or an entire authority.
let (primary, worker) = match matches.subcommand() {
// Spawn the primary and consensus core.
Expand Down
4 changes: 2 additions & 2 deletions narwhal/node/tests/node_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async fn simple_primary_worker_node_start_stop() {
let key_pair = authority.keypair();
let network_key_pair = authority.network_keypair();

let store = NodeStorage::reopen(temp_dir());
let store = NodeStorage::reopen(temp_dir(), None);

let (tx_confirmation, _rx_confirmation) = channel(10);
let execution_state = Arc::new(SimpleExecutionState::new(tx_confirmation));
Expand Down Expand Up @@ -114,7 +114,7 @@ async fn primary_node_restart() {
let key_pair = authority.keypair();
let network_key_pair = authority.network_keypair();

let store = NodeStorage::reopen(temp_dir());
let store = NodeStorage::reopen(temp_dir(), None);

let (tx_confirmation, _rx_confirmation) = channel(10);
let execution_state = Arc::new(SimpleExecutionState::new(tx_confirmation));
Expand Down
2 changes: 1 addition & 1 deletion narwhal/primary/src/tests/certificate_fetcher_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ async fn fetch_certificates_basic() {
let (tx_fetch_resp, rx_fetch_resp) = mpsc::channel(1000);

// Create test stores.
let store = NodeStorage::reopen(temp_dir());
let store = NodeStorage::reopen(temp_dir(), None);
let certificate_store = store.certificate_store.clone();
let payload_store = store.payload_store.clone();

Expand Down
4 changes: 3 additions & 1 deletion narwhal/primary/src/tests/common.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::num::NonZeroUsize;
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use config::{AuthorityIdentifier, WorkerId};
use crypto::NetworkKeyPair;
use std::time::Duration;
use storage::{CertificateStore, HeaderStore, PayloadStore};
use storage::{CertificateStore, CertificateStoreCache, HeaderStore, PayloadStore};
use store::{reopen, rocks, rocks::DBMap, rocks::ReadWriteOptions};
use test_utils::{
temp_dir, PrimaryToWorkerMockServer, CERTIFICATES_CF, CERTIFICATE_DIGEST_BY_ORIGIN_CF,
Expand Down Expand Up @@ -53,6 +54,7 @@ pub fn create_db_stores() -> (HeaderStore, CertificateStore, PayloadStore) {
certificate_map,
certificate_digest_by_round_map,
certificate_digest_by_origin_map,
CertificateStoreCache::new(NonZeroUsize::new(100).unwrap(), None),
),
PayloadStore::new(payload_map),
)
Expand Down
5 changes: 3 additions & 2 deletions narwhal/primary/src/tests/primary_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use std::{
sync::Arc,
time::Duration,
};
use storage::PayloadToken;
use storage::{CertificateStore, VoteDigestStore};
use storage::{CertificateStoreCache, PayloadToken};
use storage::{NodeStorage, PayloadStore};
use store::rocks::{DBMap, MetricConf, ReadWriteOptions};
use test_utils::{make_optimal_signed_certificates, temp_dir, CommitteeFixture};
Expand Down Expand Up @@ -60,7 +60,7 @@ async fn get_network_peers_from_admin_server() {
let worker_1_keypair = authority_1.worker(worker_id).keypair().copy();

// Make the data store.
let store = NodeStorage::reopen(temp_dir());
let store = NodeStorage::reopen(temp_dir(), None);

let (tx_new_certificates, rx_new_certificates) = types::metered_channel::channel(
CHANNEL_CAPACITY,
Expand Down Expand Up @@ -1177,6 +1177,7 @@ async fn test_process_payload_availability_when_failures() {
certificate_map,
certificate_digest_by_round_map,
certificate_digest_by_origin_map,
CertificateStoreCache::new(NonZeroUsize::new(100).unwrap(), None),
);
let payload_store = PayloadStore::new(payload_map);

Expand Down
8 changes: 4 additions & 4 deletions narwhal/primary/tests/integration_tests_proposer_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async fn test_rounds_errors() {
};

// AND create separate data stores
let store_primary = NodeStorage::reopen(temp_dir());
let store_primary = NodeStorage::reopen(temp_dir(), None);

// Spawn the primary
let (tx_new_certificates, rx_new_certificates) =
Expand Down Expand Up @@ -183,7 +183,7 @@ async fn test_rounds_return_successful_response() {
};

// AND create separate data stores
let store_primary = NodeStorage::reopen(temp_dir());
let store_primary = NodeStorage::reopen(temp_dir(), None);

// Spawn the primary
let (tx_new_certificates, rx_new_certificates) =
Expand Down Expand Up @@ -284,8 +284,8 @@ async fn test_node_read_causal_signed_certificates() {
let authority_2 = fixture.authorities().nth(1).unwrap();

// Make the data store.
let primary_store_1 = NodeStorage::reopen(temp_dir());
let primary_store_2: NodeStorage = NodeStorage::reopen(temp_dir());
let primary_store_1 = NodeStorage::reopen(temp_dir(), None);
let primary_store_2: NodeStorage = NodeStorage::reopen(temp_dir(), None);

let mut collection_ids: Vec<CertificateDigest> = Vec::new();

Expand Down
16 changes: 8 additions & 8 deletions narwhal/primary/tests/integration_tests_validator_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async fn test_get_collections() {
let worker_keypair = author.worker(worker_id).keypair().copy();

// Make the data store.
let store = NodeStorage::reopen(temp_dir());
let store = NodeStorage::reopen(temp_dir(), None);

let mut header_digests = Vec::new();
// Blocks/Collections
Expand Down Expand Up @@ -243,7 +243,7 @@ async fn test_remove_collections() {
let worker_keypair = author.worker(worker_id).keypair().copy();

// Make the data store.
let store = NodeStorage::reopen(temp_dir());
let store = NodeStorage::reopen(temp_dir(), None);
let mut header_digests = Vec::new();
// Blocks/Collections
let mut collection_digests = Vec::new();
Expand Down Expand Up @@ -470,8 +470,8 @@ async fn test_read_causal_signed_certificates() {
let authority_2 = fixture.authorities().nth(1).unwrap();

// Make the data store.
let primary_store_1 = NodeStorage::reopen(temp_dir());
let primary_store_2: NodeStorage = NodeStorage::reopen(temp_dir());
let primary_store_1 = NodeStorage::reopen(temp_dir(), None);
let primary_store_2: NodeStorage = NodeStorage::reopen(temp_dir(), None);

let mut collection_digests: Vec<CertificateDigest> = Vec::new();

Expand Down Expand Up @@ -698,8 +698,8 @@ async fn test_read_causal_unsigned_certificates() {
let network_keypair_2 = authority_2.network_keypair().copy();

// Make the data store.
let primary_store_1 = NodeStorage::reopen(temp_dir());
let primary_store_2: NodeStorage = NodeStorage::reopen(temp_dir());
let primary_store_1 = NodeStorage::reopen(temp_dir(), None);
let primary_store_2: NodeStorage = NodeStorage::reopen(temp_dir(), None);

let mut collection_digests: Vec<CertificateDigest> = Vec::new();

Expand Down Expand Up @@ -936,8 +936,8 @@ async fn test_get_collections_with_missing_certificates() {
};

// AND create separate data stores for the 2 primaries
let store_primary_1 = NodeStorage::reopen(temp_dir());
let store_primary_2 = NodeStorage::reopen(temp_dir());
let store_primary_1 = NodeStorage::reopen(temp_dir(), None);
let store_primary_2 = NodeStorage::reopen(temp_dir(), None);

// The certificate_1 will be stored in primary 1
let (certificate_1, batch_1) = fixture_certificate(
Expand Down
4 changes: 4 additions & 0 deletions narwhal/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ types = { path = "../types", package = "narwhal-types" }
store = { path = "../../crates/typed-store", package = "typed-store" }
config = { path = "../config", package = "narwhal-config" }
workspace-hack = { version = "0.1", path = "../../crates/workspace-hack" }
prometheus = "0.13.3"
fail = "0.5.1"
lru = "0.10"
parking_lot = "0.12.1"
tap = "1.0.1"

[dev-dependencies]
test-utils = { path = "../test-utils", package = "narwhal-test-utils" }
Expand Down
Loading

0 comments on commit 27ca30d

Please sign in to comment.