diff --git a/Cargo.lock b/Cargo.lock index 9198572afd43..c5f83fddd86a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3951,9 +3951,9 @@ dependencies = [ [[package]] name = "lz4" -version = "1.23.2" +version = "1.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aac20ed6991e01bf6a2e68cc73df2b389707403662a8ba89f68511fb340f724c" +checksum = "7e9e2dd86df36ce760a60f6ff6ad526f7ba1f14ba0356f8254fb6905e6494df1" dependencies = [ "libc", "lz4-sys", @@ -3961,9 +3961,9 @@ dependencies = [ [[package]] name = "lz4-sys" -version = "1.9.2" +version = "1.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dca79aa95d8b3226213ad454d328369853be3a1382d89532a854f4d69640acae" +checksum = "57d27b317e207b10f69f5e75494119e391a96f48861ae870d1da6edac98ca900" dependencies = [ "cc", "libc", @@ -5669,6 +5669,25 @@ dependencies = [ "snap", ] +[[package]] +name = "parity-db" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a7511a0bec4a336b5929999d02b560d2439c993cccf98c26481484e811adc43" +dependencies = [ + "blake2", + "crc32fast", + "fs2", + "hex", + "libc", + "log", + "lz4", + "memmap2 0.5.0", + "parking_lot 0.12.1", + "rand 0.8.5", + "snap", +] + [[package]] name = "parity-scale-codec" version = "3.1.5" @@ -6756,11 +6775,12 @@ dependencies = [ "futures", "itertools", "kvdb", + "kvdb-memorydb", "kvdb-shared-tests", "lazy_static", "log", "lru 0.8.0", - "parity-db", + "parity-db 0.4.2", "parity-scale-codec", "parity-util-mem", "parking_lot 0.11.2", @@ -7154,7 +7174,7 @@ dependencies = [ "pallet-im-online", "pallet-staking", "pallet-transaction-payment-rpc-runtime-api", - "parity-db", + "parity-db 0.4.2", "polkadot-approval-distribution", "polkadot-availability-bitfield-distribution", "polkadot-availability-distribution", @@ -8583,7 +8603,7 @@ dependencies = [ "kvdb-rocksdb", "linked-hash-map", "log", - "parity-db", + "parity-db 0.3.16", "parity-scale-codec", "parking_lot 0.12.1", "sc-client-api", diff --git a/node/core/approval-voting/src/approval_db/v1/mod.rs b/node/core/approval-voting/src/approval_db/v1/mod.rs index 03b7aa68f134..858bcb8c36fe 100644 --- a/node/core/approval-voting/src/approval_db/v1/mod.rs +++ b/node/core/approval-voting/src/approval_db/v1/mod.rs @@ -90,41 +90,45 @@ impl Backend for DbBackend { match op { BackendWriteOp::WriteStoredBlockRange(stored_block_range) => { tx.put_vec( - self.config.col_data, + self.config.col_approval_data, &STORED_BLOCKS_KEY, stored_block_range.encode(), ); }, BackendWriteOp::DeleteStoredBlockRange => { - tx.delete(self.config.col_data, &STORED_BLOCKS_KEY); + tx.delete(self.config.col_approval_data, &STORED_BLOCKS_KEY); }, BackendWriteOp::WriteBlocksAtHeight(h, blocks) => { - tx.put_vec(self.config.col_data, &blocks_at_height_key(h), blocks.encode()); + tx.put_vec( + self.config.col_approval_data, + &blocks_at_height_key(h), + blocks.encode(), + ); }, BackendWriteOp::DeleteBlocksAtHeight(h) => { - tx.delete(self.config.col_data, &blocks_at_height_key(h)); + tx.delete(self.config.col_approval_data, &blocks_at_height_key(h)); }, BackendWriteOp::WriteBlockEntry(block_entry) => { let block_entry: BlockEntry = block_entry.into(); tx.put_vec( - self.config.col_data, + self.config.col_approval_data, &block_entry_key(&block_entry.block_hash), block_entry.encode(), ); }, BackendWriteOp::DeleteBlockEntry(hash) => { - tx.delete(self.config.col_data, &block_entry_key(&hash)); + tx.delete(self.config.col_approval_data, &block_entry_key(&hash)); }, BackendWriteOp::WriteCandidateEntry(candidate_entry) => { let candidate_entry: CandidateEntry = candidate_entry.into(); tx.put_vec( - self.config.col_data, + self.config.col_approval_data, &candidate_entry_key(&candidate_entry.candidate.hash()), candidate_entry.encode(), ); }, BackendWriteOp::DeleteCandidateEntry(candidate_hash) => { - tx.delete(self.config.col_data, &candidate_entry_key(&candidate_hash)); + tx.delete(self.config.col_approval_data, &candidate_entry_key(&candidate_hash)); }, } } @@ -149,7 +153,9 @@ pub type Bitfield = BitVec; #[derive(Debug, Clone, Copy)] pub struct Config { /// The column family in the database where data is stored. - pub col_data: u32, + pub col_approval_data: u32, + /// The column of the database where rolling session window data is stored. + pub col_session_data: u32, } /// Details pertaining to our assignment on a block. @@ -243,10 +249,10 @@ pub type Result = std::result::Result; pub(crate) fn load_decode( store: &dyn Database, - col_data: u32, + col_approval_data: u32, key: &[u8], ) -> Result> { - match store.get(col_data, key)? { + match store.get(col_approval_data, key)? { None => Ok(None), Some(raw) => D::decode(&mut &raw[..]).map(Some).map_err(Into::into), } @@ -303,7 +309,7 @@ pub fn load_stored_blocks( store: &dyn Database, config: &Config, ) -> SubsystemResult> { - load_decode(store, config.col_data, STORED_BLOCKS_KEY) + load_decode(store, config.col_approval_data, STORED_BLOCKS_KEY) .map_err(|e| SubsystemError::with_origin("approval-voting", e)) } @@ -313,7 +319,7 @@ pub fn load_blocks_at_height( config: &Config, block_number: &BlockNumber, ) -> SubsystemResult> { - load_decode(store, config.col_data, &blocks_at_height_key(*block_number)) + load_decode(store, config.col_approval_data, &blocks_at_height_key(*block_number)) .map(|x| x.unwrap_or_default()) .map_err(|e| SubsystemError::with_origin("approval-voting", e)) } @@ -324,7 +330,7 @@ pub fn load_block_entry( config: &Config, block_hash: &Hash, ) -> SubsystemResult> { - load_decode(store, config.col_data, &block_entry_key(block_hash)) + load_decode(store, config.col_approval_data, &block_entry_key(block_hash)) .map(|u: Option| u.map(|v| v.into())) .map_err(|e| SubsystemError::with_origin("approval-voting", e)) } @@ -335,7 +341,7 @@ pub fn load_candidate_entry( config: &Config, candidate_hash: &CandidateHash, ) -> SubsystemResult> { - load_decode(store, config.col_data, &candidate_entry_key(candidate_hash)) + load_decode(store, config.col_approval_data, &candidate_entry_key(candidate_hash)) .map(|u: Option| u.map(|v| v.into())) .map_err(|e| SubsystemError::with_origin("approval-voting", e)) } diff --git a/node/core/approval-voting/src/approval_db/v1/tests.rs b/node/core/approval-voting/src/approval_db/v1/tests.rs index 548c64bcef03..06923c6a539f 100644 --- a/node/core/approval-voting/src/approval_db/v1/tests.rs +++ b/node/core/approval-voting/src/approval_db/v1/tests.rs @@ -28,9 +28,12 @@ use std::{collections::HashMap, sync::Arc}; use ::test_helpers::{dummy_candidate_receipt, dummy_candidate_receipt_bad_sig, dummy_hash}; const DATA_COL: u32 = 0; -const NUM_COLUMNS: u32 = 1; +const SESSION_DATA_COL: u32 = 1; -const TEST_CONFIG: Config = Config { col_data: DATA_COL }; +const NUM_COLUMNS: u32 = 2; + +const TEST_CONFIG: Config = + Config { col_approval_data: DATA_COL, col_session_data: SESSION_DATA_COL }; fn make_db() -> (DbBackend, Arc) { let db = kvdb_memorydb::create(NUM_COLUMNS); diff --git a/node/core/approval-voting/src/import.rs b/node/core/approval-voting/src/import.rs index df713143750f..20629dd022d4 100644 --- a/node/core/approval-voting/src/import.rs +++ b/node/core/approval-voting/src/import.rs @@ -632,14 +632,15 @@ pub(crate) mod tests { pub(crate) use sp_runtime::{Digest, DigestItem}; use std::{pin::Pin, sync::Arc}; - use crate::{ - approval_db::v1::Config as DatabaseConfig, criteria, BlockEntry, APPROVAL_SESSIONS, - }; + use crate::{approval_db::v1::Config as DatabaseConfig, criteria, BlockEntry}; const DATA_COL: u32 = 0; - const NUM_COLUMNS: u32 = 1; + const SESSION_DATA_COL: u32 = 1; + + const NUM_COLUMNS: u32 = 2; - const TEST_CONFIG: DatabaseConfig = DatabaseConfig { col_data: DATA_COL }; + const TEST_CONFIG: DatabaseConfig = + DatabaseConfig { col_approval_data: DATA_COL, col_session_data: SESSION_DATA_COL }; #[derive(Default)] struct MockClock; @@ -654,22 +655,23 @@ pub(crate) mod tests { } fn blank_state() -> State { + let db = kvdb_memorydb::create(NUM_COLUMNS); + let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[]); + let db: Arc = Arc::new(db); State { session_window: None, keystore: Arc::new(LocalKeystore::in_memory()), slot_duration_millis: 6_000, clock: Box::new(MockClock::default()), assignment_criteria: Box::new(MockAssignmentCriteria), + db, + db_config: TEST_CONFIG, } } fn single_session_state(index: SessionIndex, info: SessionInfo) -> State { State { - session_window: Some(RollingSessionWindow::with_session_info( - APPROVAL_SESSIONS, - index, - vec![info], - )), + session_window: Some(RollingSessionWindow::with_session_info(index, vec![info])), ..blank_state() } } @@ -782,11 +784,8 @@ pub(crate) mod tests { .map(|(r, c, g)| (r.hash(), r.clone(), *c, *g)) .collect::>(); - let session_window = RollingSessionWindow::with_session_info( - APPROVAL_SESSIONS, - session, - vec![session_info], - ); + let session_window = + RollingSessionWindow::with_session_info(session, vec![session_info]); let header = header.clone(); Box::pin(async move { @@ -891,11 +890,8 @@ pub(crate) mod tests { .collect::>(); let test_fut = { - let session_window = RollingSessionWindow::with_session_info( - APPROVAL_SESSIONS, - session, - vec![session_info], - ); + let session_window = + RollingSessionWindow::with_session_info(session, vec![session_info]); let header = header.clone(); Box::pin(async move { @@ -1089,11 +1085,8 @@ pub(crate) mod tests { .map(|(r, c, g)| (r.hash(), r.clone(), *c, *g)) .collect::>(); - let session_window = Some(RollingSessionWindow::with_session_info( - APPROVAL_SESSIONS, - session, - vec![session_info], - )); + let session_window = + Some(RollingSessionWindow::with_session_info(session, vec![session_info])); let header = header.clone(); Box::pin(async move { @@ -1304,38 +1297,6 @@ pub(crate) mod tests { } ); - // Caching of sesssions needs sessoion of first unfinalied block. - assert_matches!( - handle.recv().await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber( - s_tx, - )) => { - let _ = s_tx.send(Ok(header.number)); - } - ); - - assert_matches!( - handle.recv().await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash( - block_number, - s_tx, - )) => { - assert_eq!(block_number, header.number); - let _ = s_tx.send(Ok(Some(header.hash()))); - } - ); - - assert_matches!( - handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionIndexForChild(s_tx), - )) => { - assert_eq!(h, header.hash()); - let _ = s_tx.send(Ok(session)); - } - ); - // determine_new_blocks exits early as the parent_hash is in the DB assert_matches!( diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index e9757071f15e..b96992df2c88 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -44,8 +44,7 @@ use polkadot_node_subsystem_util::{ database::Database, metrics::{self, prometheus}, rolling_session_window::{ - new_session_window_size, RollingSessionWindow, SessionWindowSize, SessionWindowUpdate, - SessionsUnavailable, + DatabaseParams, RollingSessionWindow, SessionWindowUpdate, SessionsUnavailable, }, TimeoutExt, }; @@ -97,8 +96,6 @@ use crate::{ #[cfg(test)] mod tests; -pub const APPROVAL_SESSIONS: SessionWindowSize = new_session_window_size!(6); - const APPROVAL_CHECKING_TIMEOUT: Duration = Duration::from_secs(120); /// How long are we willing to wait for approval signatures? /// @@ -118,7 +115,9 @@ const LOG_TARGET: &str = "parachain::approval-voting"; #[derive(Debug, Clone)] pub struct Config { /// The column family in the DB where approval-voting data is stored. - pub col_data: u32, + pub col_approval_data: u32, + /// The of the DB where rolling session info is stored. + pub col_session_data: u32, /// The slot duration of the consensus algorithm, in milliseconds. Should be evenly /// divisible by 500. pub slot_duration_millis: u64, @@ -358,7 +357,10 @@ impl ApprovalVotingSubsystem { keystore, slot_duration_millis: config.slot_duration_millis, db, - db_config: DatabaseConfig { col_data: config.col_data }, + db_config: DatabaseConfig { + col_approval_data: config.col_approval_data, + col_session_data: config.col_session_data, + }, mode: Mode::Syncing(sync_oracle), metrics, } @@ -367,7 +369,10 @@ impl ApprovalVotingSubsystem { /// Revert to the block corresponding to the specified `hash`. /// The operation is not allowed for blocks older than the last finalized one. pub fn revert_to(&self, hash: Hash) -> Result<(), SubsystemError> { - let config = approval_db::v1::Config { col_data: self.db_config.col_data }; + let config = approval_db::v1::Config { + col_approval_data: self.db_config.col_approval_data, + col_session_data: self.db_config.col_session_data, + }; let mut backend = approval_db::v1::DbBackend::new(self.db.clone(), config); let mut overlay = OverlayedBackend::new(&backend); @@ -634,6 +639,9 @@ struct State { slot_duration_millis: u64, clock: Box, assignment_criteria: Box, + // Require for `RollingSessionWindow`. + db_config: DatabaseConfig, + db: Arc, } #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] @@ -655,8 +663,17 @@ impl State { match session_window { None => { let sender = ctx.sender().clone(); - self.session_window = - Some(RollingSessionWindow::new(sender, APPROVAL_SESSIONS, head).await?); + self.session_window = Some( + RollingSessionWindow::new( + sender, + head, + DatabaseParams { + db: self.db.clone(), + db_column: self.db_config.col_session_data, + }, + ) + .await?, + ); Ok(None) }, Some(mut session_window) => { @@ -751,7 +768,7 @@ async fn run( where B: Backend, { - if let Err(err) = db_sanity_check(subsystem.db, subsystem.db_config) { + if let Err(err) = db_sanity_check(subsystem.db.clone(), subsystem.db_config.clone()) { gum::warn!(target: LOG_TARGET, ?err, "Could not run approval vote DB sanity check"); } @@ -761,6 +778,8 @@ where slot_duration_millis: subsystem.slot_duration_millis, clock, assignment_criteria, + db_config: subsystem.db_config, + db: subsystem.db, }; let mut wakeups = Wakeups::default(); diff --git a/node/core/approval-voting/src/tests.rs b/node/core/approval-voting/src/tests.rs index d5c8d3c01da4..b9063c8ade25 100644 --- a/node/core/approval-voting/src/tests.rs +++ b/node/core/approval-voting/src/tests.rs @@ -14,6 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . +use crate::tests::test_constants::TEST_CONFIG; + use super::*; use polkadot_node_primitives::{ approval::{ @@ -111,9 +113,12 @@ fn make_sync_oracle(val: bool) -> (Box, TestSyncOracleHan pub mod test_constants { use crate::approval_db::v1::Config as DatabaseConfig; const DATA_COL: u32 = 0; - pub(crate) const NUM_COLUMNS: u32 = 1; + const SESSION_DATA_COL: u32 = 1; + + pub(crate) const NUM_COLUMNS: u32 = 2; - pub(crate) const TEST_CONFIG: DatabaseConfig = DatabaseConfig { col_data: DATA_COL }; + pub(crate) const TEST_CONFIG: DatabaseConfig = + DatabaseConfig { col_approval_data: DATA_COL, col_session_data: SESSION_DATA_COL }; } struct MockSupportsParachains; @@ -487,8 +492,9 @@ fn test_harness>( context, ApprovalVotingSubsystem::with_config( Config { - col_data: test_constants::TEST_CONFIG.col_data, + col_approval_data: test_constants::TEST_CONFIG.col_approval_data, slot_duration_millis: SLOT_DURATION_MILLIS, + col_session_data: TEST_CONFIG.col_session_data, }, Arc::new(db), Arc::new(keystore), @@ -810,38 +816,38 @@ async fn import_block( } ); - assert_matches!( - overseer_recv(overseer).await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber( - s_tx, - )) => { - let _ = s_tx.send(Ok(number)); - } - ); + if !fork { + assert_matches!( + overseer_recv(overseer).await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber( + s_tx, + )) => { + let _ = s_tx.send(Ok(number)); + } + ); - assert_matches!( - overseer_recv(overseer).await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash( - block_number, - s_tx, - )) => { - assert_eq!(block_number, number); - let _ = s_tx.send(Ok(Some(hashes[number as usize].0))); - } - ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash( + block_number, + s_tx, + )) => { + assert_eq!(block_number, number); + let _ = s_tx.send(Ok(Some(hashes[number as usize].0))); + } + ); - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionIndexForChild(s_tx), - )) => { - assert_eq!(h, hashes[number as usize].0); - let _ = s_tx.send(Ok(number.into())); - } - ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + h, + RuntimeApiRequest::SessionIndexForChild(s_tx), + )) => { + assert_eq!(h, hashes[number as usize].0); + let _ = s_tx.send(Ok(number.into())); + } + ); - if !fork { assert_matches!( overseer_recv(overseer).await, AllMessages::RuntimeApi( diff --git a/node/core/dispute-coordinator/src/db/v1.rs b/node/core/dispute-coordinator/src/db/v1.rs index 2c643d341de2..bb1456a59745 100644 --- a/node/core/dispute-coordinator/src/db/v1.rs +++ b/node/core/dispute-coordinator/src/db/v1.rs @@ -99,10 +99,10 @@ impl DbBackend { encoded = ?candidate_votes_session_prefix(index), "Cleaning votes for session index" ); - tx.delete_prefix(self.config.col_data, &candidate_votes_session_prefix(index)); + tx.delete_prefix(self.config.col_dispute_data, &candidate_votes_session_prefix(index)); } // New watermark: - tx.put_vec(self.config.col_data, CLEANED_VOTES_WATERMARK_KEY, clean_until.encode()); + tx.put_vec(self.config.col_dispute_data, CLEANED_VOTES_WATERMARK_KEY, clean_until.encode()); Ok(()) } } @@ -148,21 +148,32 @@ impl Backend for DbBackend { self.add_vote_cleanup_tx(&mut tx, session)?; // Actually write the earliest session. - tx.put_vec(self.config.col_data, EARLIEST_SESSION_KEY, session.encode()); + tx.put_vec( + self.config.col_dispute_data, + EARLIEST_SESSION_KEY, + session.encode(), + ); }, BackendWriteOp::WriteRecentDisputes(recent_disputes) => { - tx.put_vec(self.config.col_data, RECENT_DISPUTES_KEY, recent_disputes.encode()); + tx.put_vec( + self.config.col_dispute_data, + RECENT_DISPUTES_KEY, + recent_disputes.encode(), + ); }, BackendWriteOp::WriteCandidateVotes(session, candidate_hash, votes) => { gum::trace!(target: LOG_TARGET, ?session, "Writing candidate votes"); tx.put_vec( - self.config.col_data, + self.config.col_dispute_data, &candidate_votes_key(session, &candidate_hash), votes.encode(), ); }, BackendWriteOp::DeleteCandidateVotes(session, candidate_hash) => { - tx.delete(self.config.col_data, &candidate_votes_key(session, &candidate_hash)); + tx.delete( + self.config.col_dispute_data, + &candidate_votes_key(session, &candidate_hash), + ); }, } } @@ -195,7 +206,9 @@ fn candidate_votes_session_prefix(session: SessionIndex) -> [u8; 15 + 4] { #[derive(Debug, Clone)] pub struct ColumnConfiguration { /// The column in the key-value DB where data is stored. - pub col_data: u32, + pub col_dispute_data: u32, + /// The column in the key-value DB where session data is stored. + pub col_session_data: u32, } /// Tracked votes on candidates, for the purposes of dispute resolution. @@ -257,8 +270,12 @@ impl From for crate::error::Error { /// Result alias for DB errors. pub type Result = std::result::Result; -fn load_decode(db: &dyn Database, col_data: u32, key: &[u8]) -> Result> { - match db.get(col_data, key)? { +fn load_decode( + db: &dyn Database, + col_dispute_data: u32, + key: &[u8], +) -> Result> { + match db.get(col_dispute_data, key)? { None => Ok(None), Some(raw) => D::decode(&mut &raw[..]).map(Some).map_err(Into::into), } @@ -271,7 +288,7 @@ pub(crate) fn load_candidate_votes( session: SessionIndex, candidate_hash: &CandidateHash, ) -> SubsystemResult> { - load_decode(db, config.col_data, &candidate_votes_key(session, candidate_hash)) + load_decode(db, config.col_dispute_data, &candidate_votes_key(session, candidate_hash)) .map_err(|e| SubsystemError::with_origin("dispute-coordinator", e)) } @@ -280,7 +297,7 @@ pub(crate) fn load_earliest_session( db: &dyn Database, config: &ColumnConfiguration, ) -> SubsystemResult> { - load_decode(db, config.col_data, EARLIEST_SESSION_KEY) + load_decode(db, config.col_dispute_data, EARLIEST_SESSION_KEY) .map_err(|e| SubsystemError::with_origin("dispute-coordinator", e)) } @@ -289,7 +306,7 @@ pub(crate) fn load_recent_disputes( db: &dyn Database, config: &ColumnConfiguration, ) -> SubsystemResult> { - load_decode(db, config.col_data, RECENT_DISPUTES_KEY) + load_decode(db, config.col_dispute_data, RECENT_DISPUTES_KEY) .map_err(|e| SubsystemError::with_origin("dispute-coordinator", e)) } @@ -347,7 +364,7 @@ fn load_cleaned_votes_watermark( db: &dyn Database, config: &ColumnConfiguration, ) -> FatalResult> { - load_decode(db, config.col_data, CLEANED_VOTES_WATERMARK_KEY) + load_decode(db, config.col_dispute_data, CLEANED_VOTES_WATERMARK_KEY) .map_err(|e| FatalError::DbReadFailed(e)) } @@ -362,7 +379,7 @@ mod tests { let db = kvdb_memorydb::create(1); let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[0]); let store = Arc::new(db); - let config = ColumnConfiguration { col_data: 0 }; + let config = ColumnConfiguration { col_dispute_data: 0, col_session_data: 1 }; DbBackend::new(store, config, Metrics::default()) } diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index 6289eb2f11a2..03abd8f59d60 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -35,7 +35,8 @@ use polkadot_node_subsystem::{ overseer, ActivatedLeaf, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, }; use polkadot_node_subsystem_util::{ - database::Database, rolling_session_window::RollingSessionWindow, + database::Database, + rolling_session_window::{DatabaseParams, RollingSessionWindow}, }; use polkadot_primitives::v2::{ScrapedOnChainVotes, ValidatorIndex, ValidatorPair}; @@ -117,12 +118,17 @@ pub struct DisputeCoordinatorSubsystem { #[derive(Debug, Clone, Copy)] pub struct Config { /// The data column in the store to use for dispute data. - pub col_data: u32, + pub col_dispute_data: u32, + /// The data column in the store to use for session data. + pub col_session_data: u32, } impl Config { fn column_config(&self) -> db::v1::ColumnConfiguration { - db::v1::ColumnConfiguration { col_data: self.col_data } + db::v1::ColumnConfiguration { + col_dispute_data: self.col_dispute_data, + col_session_data: self.col_session_data, + } } } @@ -199,17 +205,21 @@ impl DisputeCoordinatorSubsystem { B: Backend + 'static, { loop { - let (first_leaf, rolling_session_window) = match get_rolling_session_window(ctx).await { - Ok(Some(update)) => update, - Ok(None) => { - gum::info!(target: LOG_TARGET, "received `Conclude` signal, exiting"); - return Ok(None) - }, - Err(e) => { - e.split()?.log(); - continue - }, - }; + let db_params = + DatabaseParams { db: self.store.clone(), db_column: self.config.col_session_data }; + + let (first_leaf, rolling_session_window) = + match get_rolling_session_window(ctx, db_params).await { + Ok(Some(update)) => update, + Ok(None) => { + gum::info!(target: LOG_TARGET, "received `Conclude` signal, exiting"); + return Ok(None) + }, + Err(e) => { + e.split()?.log(); + continue + }, + }; let mut overlay_db = OverlayedBackend::new(&mut backend); let (participations, votes, spam_slots, ordering_provider) = match self @@ -352,12 +362,13 @@ impl DisputeCoordinatorSubsystem { #[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)] async fn get_rolling_session_window( ctx: &mut Context, + db_params: DatabaseParams, ) -> Result> { if let Some(leaf) = { wait_for_first_leaf(ctx) }.await? { let sender = ctx.sender().clone(); Ok(Some(( leaf.clone(), - RollingSessionWindow::new(sender, DISPUTE_WINDOW, leaf.hash) + RollingSessionWindow::new(sender, leaf.hash, db_params) .await .map_err(JfyiError::RollingSessionWindow)?, ))) diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index c6fe328d9537..a1ad315d2ea0 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -176,7 +176,7 @@ impl Default for TestState { let db = kvdb_memorydb::create(1); let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[]); let db = Arc::new(db); - let config = Config { col_data: 0 }; + let config = Config { col_dispute_data: 0, col_session_data: 1 }; let genesis_header = Header { parent_hash: Hash::zero(), @@ -251,6 +251,7 @@ impl TestState { session: SessionIndex, ) { // Order of messages is not fixed (different on initializing): + #[derive(Debug)] struct FinishedSteps { got_session_information: bool, got_scraping_information: bool, @@ -268,7 +269,8 @@ impl TestState { let mut finished_steps = FinishedSteps::new(); while !finished_steps.is_done() { - match overseer_recv(virtual_overseer).await { + let recv = overseer_recv(virtual_overseer).await; + match recv { AllMessages::RuntimeApi(RuntimeApiMessage::Request( h, RuntimeApiRequest::SessionIndexForChild(tx), @@ -282,36 +284,38 @@ impl TestState { let _ = tx.send(Ok(session)); // Queries for fetching earliest unfinalized block session. See `RollingSessionWindow`. - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber( - s_tx, - )) => { - let _ = s_tx.send(Ok(block_number)); - } - ); + if self.known_session.is_none() { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber( + s_tx, + )) => { + let _ = s_tx.send(Ok(block_number)); + } + ); - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash( - number, - s_tx, - )) => { - assert_eq!(block_number, number); - let _ = s_tx.send(Ok(Some(block_hash))); - } - ); + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash( + number, + s_tx, + )) => { + assert_eq!(block_number, number); + let _ = s_tx.send(Ok(Some(block_hash))); + } + ); - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionIndexForChild(s_tx), - )) => { - assert_eq!(h, block_hash); - let _ = s_tx.send(Ok(session)); - } - ); + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + h, + RuntimeApiRequest::SessionIndexForChild(s_tx), + )) => { + assert_eq!(h, block_hash); + let _ = s_tx.send(Ok(session)); + } + ); + } // No queries, if subsystem knows about this session already. if self.known_session == Some(session) { @@ -754,6 +758,7 @@ fn approval_vote_import_works() { let approval_votes = [(ValidatorIndex(4), approval_vote.into_validator_signature())] .into_iter() .collect(); + handle_approval_vote_request(&mut virtual_overseer, &candidate_hash1, approval_votes) .await; @@ -2255,6 +2260,7 @@ fn resume_dispute_with_local_statement() { }, }) .await; + handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new()) .await; @@ -2469,6 +2475,7 @@ fn resume_dispute_with_local_statement_without_local_key() { test_state }) }); + // No keys: test_state.subsystem_keystore = make_keystore(vec![Sr25519Keyring::Two.to_seed()].into_iter()).into(); diff --git a/node/service/Cargo.toml b/node/service/Cargo.toml index 82109f2e6ce4..e6e073546a13 100644 --- a/node/service/Cargo.toml +++ b/node/service/Cargo.toml @@ -71,7 +71,8 @@ serde_json = "1.0.81" thiserror = "1.0.31" kvdb = "0.12.0" kvdb-rocksdb = { version = "0.16.0", optional = true } -parity-db = { version = "0.3.16", optional = true } +parity-db = { version = "0.4.2", optional = true } + async-trait = "0.1.57" lru = "0.8" diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs index 3619d05c7592..18218a8aba8e 100644 --- a/node/service/src/lib.rs +++ b/node/service/src/lib.rs @@ -943,7 +943,8 @@ where let parachains_db = open_database(&config.database)?; let approval_voting_config = ApprovalVotingConfig { - col_data: parachains_db::REAL_COLUMNS.col_approval_data, + col_approval_data: parachains_db::REAL_COLUMNS.col_approval_data, + col_session_data: parachains_db::REAL_COLUMNS.col_session_window_data, slot_duration_millis: slot_duration.as_millis() as u64, }; @@ -966,7 +967,8 @@ where }; let dispute_coordinator_config = DisputeCoordinatorConfig { - col_data: parachains_db::REAL_COLUMNS.col_dispute_coordinator_data, + col_dispute_data: parachains_db::REAL_COLUMNS.col_dispute_coordinator_data, + col_session_data: parachains_db::REAL_COLUMNS.col_session_window_data, }; let rpc_handlers = service::spawn_tasks(service::SpawnTasksParams { @@ -1537,7 +1539,8 @@ fn revert_chain_selection(db: Arc, hash: Hash) -> sp_blockchain::R fn revert_approval_voting(db: Arc, hash: Hash) -> sp_blockchain::Result<()> { let config = approval_voting_subsystem::Config { - col_data: parachains_db::REAL_COLUMNS.col_approval_data, + col_approval_data: parachains_db::REAL_COLUMNS.col_approval_data, + col_session_data: parachains_db::REAL_COLUMNS.col_session_window_data, slot_duration_millis: Default::default(), }; diff --git a/node/service/src/parachains_db/mod.rs b/node/service/src/parachains_db/mod.rs index de12a8ac1a32..74e7e13dc657 100644 --- a/node/service/src/parachains_db/mod.rs +++ b/node/service/src/parachains_db/mod.rs @@ -23,6 +23,7 @@ mod upgrade; const LOG_TARGET: &str = "parachain::db"; +/// Column configuration per version. #[cfg(any(test, feature = "full-node"))] pub(crate) mod columns { pub mod v0 { @@ -31,12 +32,17 @@ pub(crate) mod columns { pub mod v1 { pub const NUM_COLUMNS: u32 = 5; + } + pub mod v2 { + pub const NUM_COLUMNS: u32 = 6; pub const COL_AVAILABILITY_DATA: u32 = 0; pub const COL_AVAILABILITY_META: u32 = 1; pub const COL_APPROVAL_DATA: u32 = 2; pub const COL_CHAIN_SELECTION_DATA: u32 = 3; pub const COL_DISPUTE_COORDINATOR_DATA: u32 = 4; + pub const COL_SESSION_WINDOW_DATA: u32 = 5; + pub const ORDERED_COL: &[u32] = &[COL_AVAILABILITY_META, COL_CHAIN_SELECTION_DATA, COL_DISPUTE_COORDINATOR_DATA]; } @@ -56,16 +62,19 @@ pub struct ColumnsConfig { pub col_chain_selection_data: u32, /// The column used by dispute coordinator for data. pub col_dispute_coordinator_data: u32, + /// The column used for session window data. + pub col_session_window_data: u32, } /// The real columns used by the parachains DB. #[cfg(any(test, feature = "full-node"))] pub const REAL_COLUMNS: ColumnsConfig = ColumnsConfig { - col_availability_data: columns::v1::COL_AVAILABILITY_DATA, - col_availability_meta: columns::v1::COL_AVAILABILITY_META, - col_approval_data: columns::v1::COL_APPROVAL_DATA, - col_chain_selection_data: columns::v1::COL_CHAIN_SELECTION_DATA, - col_dispute_coordinator_data: columns::v1::COL_DISPUTE_COORDINATOR_DATA, + col_availability_data: columns::v2::COL_AVAILABILITY_DATA, + col_availability_meta: columns::v2::COL_AVAILABILITY_META, + col_approval_data: columns::v2::COL_APPROVAL_DATA, + col_chain_selection_data: columns::v2::COL_CHAIN_SELECTION_DATA, + col_dispute_coordinator_data: columns::v2::COL_DISPUTE_COORDINATOR_DATA, + col_session_window_data: columns::v2::COL_SESSION_WINDOW_DATA, }; #[derive(PartialEq)] @@ -83,11 +92,18 @@ pub struct CacheSizes { pub availability_meta: usize, /// Cache used by approval data. pub approval_data: usize, + /// Cache used by session window data + pub session_data: usize, } impl Default for CacheSizes { fn default() -> Self { - CacheSizes { availability_data: 25, availability_meta: 1, approval_data: 5 } + CacheSizes { + availability_data: 25, + availability_meta: 1, + approval_data: 5, + session_data: 1, + } } } @@ -106,17 +122,20 @@ pub fn open_creating_rocksdb( let path = root.join("parachains").join("db"); - let mut db_config = DatabaseConfig::with_columns(columns::v1::NUM_COLUMNS); + let mut db_config = DatabaseConfig::with_columns(columns::v2::NUM_COLUMNS); let _ = db_config .memory_budget - .insert(columns::v1::COL_AVAILABILITY_DATA, cache_sizes.availability_data); + .insert(columns::v2::COL_AVAILABILITY_DATA, cache_sizes.availability_data); + let _ = db_config + .memory_budget + .insert(columns::v2::COL_AVAILABILITY_META, cache_sizes.availability_meta); let _ = db_config .memory_budget - .insert(columns::v1::COL_AVAILABILITY_META, cache_sizes.availability_meta); + .insert(columns::v2::COL_APPROVAL_DATA, cache_sizes.approval_data); let _ = db_config .memory_budget - .insert(columns::v1::COL_APPROVAL_DATA, cache_sizes.approval_data); + .insert(columns::v2::COL_SESSION_WINDOW_DATA, cache_sizes.session_data); let path_str = path .to_str() @@ -127,7 +146,7 @@ pub fn open_creating_rocksdb( let db = Database::open(&db_config, &path_str)?; let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new( db, - columns::v1::ORDERED_COL, + columns::v2::ORDERED_COL, ); Ok(Arc::new(db)) @@ -147,12 +166,12 @@ pub fn open_creating_paritydb( std::fs::create_dir_all(&path_str)?; upgrade::try_upgrade_db(&path, DatabaseKind::ParityDB)?; - let db = parity_db::Db::open_or_create(&upgrade::paritydb_version_1_config(&path)) + let db = parity_db::Db::open_or_create(&upgrade::paritydb_version_2_config(&path)) .map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))?; let db = polkadot_node_subsystem_util::database::paritydb_impl::DbAdapter::new( db, - columns::v1::ORDERED_COL, + columns::v2::ORDERED_COL, ); Ok(Arc::new(db)) } diff --git a/node/service/src/parachains_db/upgrade.rs b/node/service/src/parachains_db/upgrade.rs index 73321ae04c09..01d4fb62f7f6 100644 --- a/node/service/src/parachains_db/upgrade.rs +++ b/node/service/src/parachains_db/upgrade.rs @@ -28,7 +28,7 @@ type Version = u32; const VERSION_FILE_NAME: &'static str = "parachain_db_version"; /// Current db version. -const CURRENT_VERSION: Version = 1; +const CURRENT_VERSION: Version = 2; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -36,7 +36,7 @@ pub enum Error { Io(#[from] io::Error), #[error("The version file format is incorrect")] CorruptedVersionFile, - #[error("Future version (expected {current:?}, found {got:?})")] + #[error("Parachains DB has a future version (expected {current:?}, found {got:?})")] FutureVersion { current: Version, got: Version }, } @@ -56,6 +56,8 @@ pub(crate) fn try_upgrade_db(db_path: &Path, db_kind: DatabaseKind) -> Result<() match get_db_version(db_path)? { // 0 -> 1 migration Some(0) => migrate_from_version_0_to_1(db_path, db_kind)?, + // 1 -> 2 migration + Some(1) => migrate_from_version_1_to_2(db_path, db_kind)?, // Already at current version, do nothing. Some(CURRENT_VERSION) => (), // This is an arbitrary future version, we don't handle it. @@ -112,6 +114,19 @@ fn migrate_from_version_0_to_1(path: &Path, db_kind: DatabaseKind) -> Result<(), }) } +fn migrate_from_version_1_to_2(path: &Path, db_kind: DatabaseKind) -> Result<(), Error> { + gum::info!(target: LOG_TARGET, "Migrating parachains db from version 1 to version 2 ..."); + + match db_kind { + DatabaseKind::ParityDB => paritydb_migrate_from_version_1_to_2(path), + DatabaseKind::RocksDB => rocksdb_migrate_from_version_1_to_2(path), + } + .and_then(|result| { + gum::info!(target: LOG_TARGET, "Migration complete! "); + Ok(result) + }) +} + /// Migration from version 0 to version 1: /// * the number of columns has changed from 3 to 5; fn rocksdb_migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> { @@ -129,6 +144,22 @@ fn rocksdb_migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> { Ok(()) } +/// Migration from version 1 to version 2: +/// * the number of columns has changed from 5 to 6; +fn rocksdb_migrate_from_version_1_to_2(path: &Path) -> Result<(), Error> { + use kvdb_rocksdb::{Database, DatabaseConfig}; + + let db_path = path + .to_str() + .ok_or_else(|| super::other_io_error("Invalid database path".into()))?; + let db_cfg = DatabaseConfig::with_columns(super::columns::v1::NUM_COLUMNS); + let mut db = Database::open(&db_cfg, db_path)?; + + db.add_column()?; + + Ok(()) +} + // This currently clears columns which had their configs altered between versions. // The columns to be changed are constrained by the `allowed_columns` vector. fn paritydb_fix_columns( @@ -190,7 +221,18 @@ fn paritydb_fix_columns( pub(crate) fn paritydb_version_1_config(path: &Path) -> parity_db::Options { let mut options = parity_db::Options::with_columns(&path, super::columns::v1::NUM_COLUMNS as u8); - for i in columns::v1::ORDERED_COL { + for i in columns::v2::ORDERED_COL { + options.columns[*i as usize].btree_index = true; + } + + options +} + +/// Database configuration for version 2. +pub(crate) fn paritydb_version_2_config(path: &Path) -> parity_db::Options { + let mut options = + parity_db::Options::with_columns(&path, super::columns::v2::NUM_COLUMNS as u8); + for i in columns::v2::ORDERED_COL { options.columns[*i as usize].btree_index = true; } @@ -202,8 +244,8 @@ pub(crate) fn paritydb_version_1_config(path: &Path) -> parity_db::Options { pub(crate) fn paritydb_version_0_config(path: &Path) -> parity_db::Options { let mut options = parity_db::Options::with_columns(&path, super::columns::v1::NUM_COLUMNS as u8); - options.columns[super::columns::v1::COL_AVAILABILITY_META as usize].btree_index = true; - options.columns[super::columns::v1::COL_CHAIN_SELECTION_DATA as usize].btree_index = true; + options.columns[super::columns::v2::COL_AVAILABILITY_META as usize].btree_index = true; + options.columns[super::columns::v2::COL_CHAIN_SELECTION_DATA as usize].btree_index = true; options } @@ -218,17 +260,30 @@ fn paritydb_migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> { paritydb_fix_columns( path, paritydb_version_1_config(path), - vec![super::columns::v1::COL_DISPUTE_COORDINATOR_DATA], + vec![super::columns::v2::COL_DISPUTE_COORDINATOR_DATA], )?; Ok(()) } +/// Migration from version 1 to version 2: +/// - add a new column for session information storage +fn paritydb_migrate_from_version_1_to_2(path: &Path) -> Result<(), Error> { + let mut options = paritydb_version_1_config(path); + + // Adds the session info column. + parity_db::Db::add_column(&mut options, Default::default()) + .map_err(|e| other_io_error(format!("Error adding column {:?}", e)))?; + + Ok(()) +} + #[cfg(test)] mod tests { + use super::{columns::v2::*, *}; + #[test] - fn test_paritydb_migrate_0_1() { - use super::{columns::v1::*, *}; + fn test_paritydb_migrate_0_to_1() { use parity_db::Db; let db_dir = tempfile::tempdir().unwrap(); @@ -246,13 +301,119 @@ mod tests { try_upgrade_db(&path, DatabaseKind::ParityDB).unwrap(); let db = Db::open(&paritydb_version_1_config(&path)).unwrap(); + assert_eq!(db.get(COL_DISPUTE_COORDINATOR_DATA as u8, b"1234").unwrap(), None); assert_eq!( - db.get(super::columns::v1::COL_DISPUTE_COORDINATOR_DATA as u8, b"1234").unwrap(), - None + db.get(COL_AVAILABILITY_META as u8, b"5678").unwrap(), + Some("somevalue".as_bytes().to_vec()) ); + } + + #[test] + fn test_paritydb_migrate_1_to_2() { + use parity_db::Db; + + let db_dir = tempfile::tempdir().unwrap(); + let path = db_dir.path(); + + // We need to properly set db version for upgrade to work. + fs::write(version_file_path(path), "1").expect("Failed to write DB version"); + + { + let db = Db::open_or_create(&paritydb_version_1_config(&path)).unwrap(); + + // Write some dummy data + db.commit(vec![( + COL_DISPUTE_COORDINATOR_DATA as u8, + b"1234".to_vec(), + Some(b"somevalue".to_vec()), + )]) + .unwrap(); + + assert_eq!(db.num_columns(), columns::v1::NUM_COLUMNS as u8); + } + + try_upgrade_db(&path, DatabaseKind::ParityDB).unwrap(); + + let db = Db::open(&paritydb_version_2_config(&path)).unwrap(); + + assert_eq!(db.num_columns(), columns::v2::NUM_COLUMNS as u8); + assert_eq!( - db.get(super::columns::v1::COL_AVAILABILITY_META as u8, b"5678").unwrap(), + db.get(COL_DISPUTE_COORDINATOR_DATA as u8, b"1234").unwrap(), Some("somevalue".as_bytes().to_vec()) ); + + // Test we can write the new column. + db.commit(vec![( + COL_SESSION_WINDOW_DATA as u8, + b"1337".to_vec(), + Some(b"0xdeadb00b".to_vec()), + )]) + .unwrap(); + + // Read back data from new column. + assert_eq!( + db.get(COL_SESSION_WINDOW_DATA as u8, b"1337").unwrap(), + Some("0xdeadb00b".as_bytes().to_vec()) + ); + } + + #[test] + fn test_rocksdb_migrate_1_to_2() { + use kvdb::{DBKey, DBOp}; + use kvdb_rocksdb::{Database, DatabaseConfig}; + use polkadot_node_subsystem_util::database::{ + kvdb_impl::DbAdapter, DBTransaction, KeyValueDB, + }; + + let db_dir = tempfile::tempdir().unwrap(); + let db_path = db_dir.path().to_str().unwrap(); + let db_cfg = DatabaseConfig::with_columns(super::columns::v1::NUM_COLUMNS); + let db = Database::open(&db_cfg, db_path).unwrap(); + assert_eq!(db.num_columns(), super::columns::v1::NUM_COLUMNS as u32); + + // We need to properly set db version for upgrade to work. + fs::write(version_file_path(db_dir.path()), "1").expect("Failed to write DB version"); + { + let db = DbAdapter::new(db, columns::v2::ORDERED_COL); + db.write(DBTransaction { + ops: vec![DBOp::Insert { + col: COL_DISPUTE_COORDINATOR_DATA, + key: DBKey::from_slice(b"1234"), + value: b"0xdeadb00b".to_vec(), + }], + }) + .unwrap(); + } + + try_upgrade_db(&db_dir.path(), DatabaseKind::RocksDB).unwrap(); + + let db_cfg = DatabaseConfig::with_columns(super::columns::v2::NUM_COLUMNS); + let db = Database::open(&db_cfg, db_path).unwrap(); + + assert_eq!(db.num_columns(), super::columns::v2::NUM_COLUMNS); + + let db = DbAdapter::new(db, columns::v2::ORDERED_COL); + + assert_eq!( + db.get(COL_DISPUTE_COORDINATOR_DATA, b"1234").unwrap(), + Some("0xdeadb00b".as_bytes().to_vec()) + ); + + // Test we can write the new column. + db.write(DBTransaction { + ops: vec![DBOp::Insert { + col: COL_SESSION_WINDOW_DATA, + key: DBKey::from_slice(b"1337"), + value: b"0xdeadb00b".to_vec(), + }], + }) + .unwrap(); + + // Read back data from new column. + assert_eq!( + db.get(COL_SESSION_WINDOW_DATA, b"1337").unwrap(), + Some("0xdeadb00b".as_bytes().to_vec()) + ); } } diff --git a/node/subsystem-util/Cargo.toml b/node/subsystem-util/Cargo.toml index ab886c0c4078..d390fd2b42cc 100644 --- a/node/subsystem-util/Cargo.toml +++ b/node/subsystem-util/Cargo.toml @@ -34,7 +34,7 @@ sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "maste kvdb = "0.12.0" parity-util-mem = { version = "0.12.0", default-features = false } -parity-db = { version = "0.3.13" } +parity-db = { version = "0.4.2"} [dev-dependencies] assert_matches = "1.4.0" @@ -46,3 +46,4 @@ lazy_static = "1.4.0" polkadot-primitives-test-helpers = { path = "../../primitives/test-helpers" } kvdb-shared-tests = "0.10.0" tempfile = "3.1.0" +kvdb-memorydb = "0.12.0" diff --git a/node/subsystem-util/src/rolling_session_window.rs b/node/subsystem-util/src/rolling_session_window.rs index 700feb2ccff8..beac31292b7d 100644 --- a/node/subsystem-util/src/rolling_session_window.rs +++ b/node/subsystem-util/src/rolling_session_window.rs @@ -19,8 +19,13 @@ //! This is useful for consensus components which need to stay up-to-date about recent sessions but don't //! care about the state of particular blocks. +use super::database::{DBTransaction, Database}; +use kvdb::{DBKey, DBOp}; + +use parity_scale_codec::{Decode, Encode}; pub use polkadot_node_primitives::{new_session_window_size, SessionWindowSize}; use polkadot_primitives::v2::{BlockNumber, Hash, SessionIndex, SessionInfo}; +use std::sync::Arc; use futures::channel::oneshot; use polkadot_node_subsystem::{ @@ -29,7 +34,11 @@ use polkadot_node_subsystem::{ overseer, }; +// The window size is equal to the `approval-voting` and `dispute-coordinator` constants that +// have been obsoleted. +const SESSION_WINDOW_SIZE: SessionWindowSize = new_session_window_size!(6); const LOG_TARGET: &str = "parachain::rolling-session-window"; +const STORED_ROLLING_SESSION_WINDOW: &[u8] = b"Rolling_session_window"; /// Sessions unavailable in state to cache. #[derive(Debug, Clone, thiserror::Error)] @@ -94,55 +103,176 @@ pub enum SessionWindowUpdate { Unchanged, } +/// A structure to store rolling session database parameters. +#[derive(Clone)] +pub struct DatabaseParams { + /// Database reference. + pub db: Arc, + /// The column which stores the rolling session info. + pub db_column: u32, +} /// A rolling window of sessions and cached session info. pub struct RollingSessionWindow { earliest_session: SessionIndex, session_info: Vec, window_size: SessionWindowSize, + // The option is just to enable some approval-voting tests to force feed sessions + // in the window without dealing with the DB. + db_params: Option, +} + +/// The rolling session data we persist in the database. +#[derive(Encode, Decode, Default)] +struct StoredWindow { + earliest_session: SessionIndex, + session_info: Vec, } impl RollingSessionWindow { /// Initialize a new session info cache with the given window size. + /// Invariant: The database always contains the earliest session. Then, + /// we can always extend the session info vector using chain state. pub async fn new( mut sender: Sender, - window_size: SessionWindowSize, block_hash: Hash, + db_params: DatabaseParams, ) -> Result where Sender: overseer::SubsystemSender + overseer::SubsystemSender, { + // At first, determine session window start using the chain state. let session_index = get_session_index_for_child(&mut sender, block_hash).await?; let earliest_non_finalized_block_session = Self::earliest_non_finalized_block_session(&mut sender).await?; // This will increase the session window to cover the full unfinalized chain. - let window_start = std::cmp::min( - session_index.saturating_sub(window_size.get() - 1), + let on_chain_window_start = std::cmp::min( + session_index.saturating_sub(SESSION_WINDOW_SIZE.get() - 1), earliest_non_finalized_block_session, ); - match load_all_sessions(&mut sender, block_hash, window_start, session_index).await { - Err(kind) => Err(SessionsUnavailable { - kind, - info: Some(SessionsUnavailableInfo { - window_start, - window_end: session_index, - block_hash, + // Fetch session information from DB. + let maybe_stored_window = Self::db_load(db_params.clone()); + + // Get the DB stored sessions and recompute window start based on DB data. + let (mut window_start, stored_sessions) = + if let Some(mut stored_window) = maybe_stored_window { + // Check if DB is ancient. + if earliest_non_finalized_block_session > + stored_window.earliest_session + stored_window.session_info.len() as u32 + { + // If ancient, we scrap it and fetch from chain state. + stored_window.session_info.clear(); + } + + // The session window might extend beyond the last finalized block, but that's fine as we'll prune it at + // next update. + let window_start = if stored_window.session_info.len() > 0 { + // If there is at least one entry in db, we always take the DB as source of truth. + stored_window.earliest_session + } else { + on_chain_window_start + }; + + (window_start, stored_window.session_info) + } else { + (on_chain_window_start, Vec::new()) + }; + + // Compute the amount of sessions missing from the window that will be fetched from chain state. + let sessions_missing_count = session_index + .saturating_sub(window_start) + .saturating_add(1) + .saturating_sub(stored_sessions.len() as u32); + + // Extend from chain state. + let sessions = if sessions_missing_count > 0 { + match extend_sessions_from_chain_state( + stored_sessions, + &mut sender, + block_hash, + &mut window_start, + session_index, + ) + .await + { + Err(kind) => Err(SessionsUnavailable { + kind, + info: Some(SessionsUnavailableInfo { + window_start, + window_end: session_index, + block_hash, + }), }), - }), - Ok(s) => Ok(Self { earliest_session: window_start, session_info: s, window_size }), + Ok(sessions) => Ok(sessions), + }? + } else { + // There are no new sessions to be fetched from chain state. + Vec::new() + }; + + Ok(Self { + earliest_session: window_start, + session_info: sessions, + window_size: SESSION_WINDOW_SIZE, + db_params: Some(db_params), + }) + } + + // Load session information from the parachains db. + fn db_load(db_params: DatabaseParams) -> Option { + match db_params.db.get(db_params.db_column, STORED_ROLLING_SESSION_WINDOW).ok()? { + None => None, + Some(raw) => { + let maybe_decoded = StoredWindow::decode(&mut &raw[..]).map(Some); + match maybe_decoded { + Ok(decoded) => decoded, + Err(err) => { + gum::warn!( + target: LOG_TARGET, + ?err, + "Failed decoding db entry; will start with onchain session infos and self-heal DB entry on next update." + ); + None + }, + } + }, + } + } + + // Saves/Updates all sessions in the database. + // TODO: https://github.com/paritytech/polkadot/issues/6144 + fn db_save(&mut self, stored_window: StoredWindow) { + if let Some(db_params) = self.db_params.as_ref() { + match db_params.db.write(DBTransaction { + ops: vec![DBOp::Insert { + col: db_params.db_column, + key: DBKey::from_slice(STORED_ROLLING_SESSION_WINDOW), + value: stored_window.encode(), + }], + }) { + Ok(_) => {}, + Err(err) => { + gum::warn!(target: LOG_TARGET, ?err, "Failed writing db entry"); + }, + } } } /// Initialize a new session info cache with the given window size and /// initial data. + /// This is only used in `approval voting` tests. pub fn with_session_info( - window_size: SessionWindowSize, earliest_session: SessionIndex, session_info: Vec, ) -> Self { - RollingSessionWindow { earliest_session, session_info, window_size } + RollingSessionWindow { + earliest_session, + session_info, + window_size: SESSION_WINDOW_SIZE, + db_params: None, + } } /// Access the session info for the given session index, if stored within the window. @@ -262,11 +392,6 @@ impl RollingSessionWindow { + overseer::SubsystemSender, { let session_index = get_session_index_for_child(sender, block_hash).await?; - let earliest_non_finalized_block_session = - Self::earliest_non_finalized_block_session(sender).await?; - - let old_window_start = self.earliest_session; - let latest = self.latest_session(); // Either cached or ancient. @@ -274,6 +399,10 @@ impl RollingSessionWindow { return Ok(SessionWindowUpdate::Unchanged) } + let earliest_non_finalized_block_session = + Self::earliest_non_finalized_block_session(sender).await?; + + let old_window_start = self.earliest_session; let old_window_end = latest; // Ensure we keep sessions up to last finalized block by adjusting the window start. @@ -283,16 +412,34 @@ impl RollingSessionWindow { earliest_non_finalized_block_session, ); - // keep some of the old window, if applicable. - let overlap_start = window_start.saturating_sub(old_window_start); + // Never look back past earliest session, since if sessions beyond were not needed or available + // in the past remains valid for the future (window only advances forward). + let mut window_start = std::cmp::max(window_start, self.earliest_session); + + let mut sessions = self.session_info.clone(); + let sessions_out_of_window = window_start.saturating_sub(old_window_start) as usize; - let fresh_start = if latest < window_start { window_start } else { latest + 1 }; + let sessions = if sessions_out_of_window < sessions.len() { + // Drop sessions based on how much the window advanced. + sessions.split_off((window_start as usize).saturating_sub(old_window_start as usize)) + } else { + // Window has jumped such that we need to fetch all sessions from on chain. + Vec::new() + }; - match load_all_sessions(sender, block_hash, fresh_start, session_index).await { + match extend_sessions_from_chain_state( + sessions, + sender, + block_hash, + &mut window_start, + session_index, + ) + .await + { Err(kind) => Err(SessionsUnavailable { kind, info: Some(SessionsUnavailableInfo { - window_start: fresh_start, + window_start, window_end: session_index, block_hash, }), @@ -305,15 +452,19 @@ impl RollingSessionWindow { new_window_end: session_index, }; - let outdated = std::cmp::min(overlap_start as usize, self.session_info.len()); - self.session_info.drain(..outdated); - self.session_info.extend(s); + self.session_info = s; + // we need to account for this case: // window_start ................................... session_index // old_window_start ........... latest let new_earliest = std::cmp::max(window_start, old_window_start); self.earliest_session = new_earliest; + // Update current window in DB. + self.db_save(StoredWindow { + earliest_session: self.earliest_session, + session_info: self.session_info.clone(), + }); Ok(update) }, } @@ -354,13 +505,23 @@ async fn get_session_index_for_child( } } -async fn load_all_sessions( +/// Attempts to extend db stored sessions with sessions missing between `start` and up to `end_inclusive`. +/// Runtime session info fetching errors are ignored if that doesn't create a gap in the window. +async fn extend_sessions_from_chain_state( + stored_sessions: Vec, sender: &mut impl overseer::SubsystemSender, block_hash: Hash, - start: SessionIndex, + window_start: &mut SessionIndex, end_inclusive: SessionIndex, ) -> Result, SessionsUnavailableReason> { - let mut v = Vec::new(); + // Start from the db sessions. + let mut sessions = stored_sessions; + // We allow session fetch failures only if we won't create a gap in the window by doing so. + // If `allow_failure` is set to true here, fetching errors are ignored until we get a first session. + let mut allow_failure = sessions.is_empty(); + + let start = *window_start + sessions.len() as u32; + for i in start..=end_inclusive { let (tx, rx) = oneshot::channel(); sender @@ -370,22 +531,58 @@ async fn load_all_sessions( )) .await; - let session_info = match rx.await { - Ok(Ok(Some(s))) => s, - Ok(Ok(None)) => return Err(SessionsUnavailableReason::Missing(i)), - Ok(Err(e)) => return Err(SessionsUnavailableReason::RuntimeApi(e)), - Err(canceled) => return Err(SessionsUnavailableReason::RuntimeApiUnavailable(canceled)), + match rx.await { + Ok(Ok(Some(session_info))) => { + // We do not allow failure anymore after having at least 1 session in window. + allow_failure = false; + sessions.push(session_info); + }, + Ok(Ok(None)) if !allow_failure => return Err(SessionsUnavailableReason::Missing(i)), + Ok(Ok(None)) => { + // Handle `allow_failure` true. + // If we didn't get the session, we advance window start. + *window_start += 1; + gum::debug!( + target: LOG_TARGET, + session = ?i, + "Session info missing from runtime." + ); + }, + Ok(Err(e)) if !allow_failure => return Err(SessionsUnavailableReason::RuntimeApi(e)), + Err(canceled) if !allow_failure => + return Err(SessionsUnavailableReason::RuntimeApiUnavailable(canceled)), + Ok(Err(err)) => { + // Handle `allow_failure` true. + // If we didn't get the session, we advance window start. + *window_start += 1; + gum::debug!( + target: LOG_TARGET, + session = ?i, + ?err, + "Error while fetching session information." + ); + }, + Err(err) => { + // Handle `allow_failure` true. + // If we didn't get the session, we advance window start. + *window_start += 1; + gum::debug!( + target: LOG_TARGET, + session = ?i, + ?err, + "Channel error while fetching session information." + ); + }, }; - - v.push(session_info); } - Ok(v) + Ok(sessions) } #[cfg(test)] mod tests { use super::*; + use crate::database::kvdb_impl::DbAdapter; use assert_matches::assert_matches; use polkadot_node_subsystem::{ messages::{AllMessages, AvailabilityRecoveryMessage}, @@ -395,7 +592,16 @@ mod tests { use polkadot_primitives::v2::Header; use sp_core::testing::TaskExecutor; - pub const TEST_WINDOW_SIZE: SessionWindowSize = new_session_window_size!(6); + const SESSION_DATA_COL: u32 = 0; + + const NUM_COLUMNS: u32 = 1; + + fn dummy_db_params() -> DatabaseParams { + let db = kvdb_memorydb::create(NUM_COLUMNS); + let db = DbAdapter::new(db, &[]); + let db: Arc = Arc::new(db); + DatabaseParams { db, db_column: SESSION_DATA_COL } + } fn dummy_session_info(index: SessionIndex) -> SessionInfo { SessionInfo { @@ -420,7 +626,10 @@ mod tests { session: SessionIndex, window: Option, expect_requests_from: SessionIndex, - ) { + db_params: Option, + ) -> RollingSessionWindow { + let db_params = db_params.unwrap_or(dummy_db_params()); + let header = Header { digest: Default::default(), extrinsics_root: Default::default(), @@ -448,9 +657,8 @@ mod tests { let test_fut = { Box::pin(async move { let window = match window { - None => RollingSessionWindow::new(sender.clone(), TEST_WINDOW_SIZE, hash) - .await - .unwrap(), + None => + RollingSessionWindow::new(sender.clone(), hash, db_params).await.unwrap(), Some(mut window) => { window.cache_session_info_for_head(sender, hash).await.unwrap(); window @@ -461,6 +669,8 @@ mod tests { window.session_info, (expected_start_session..=session).map(dummy_session_info).collect::>(), ); + + window }) }; @@ -522,12 +732,43 @@ mod tests { } }); - futures::executor::block_on(futures::future::join(test_fut, aux_fut)); + let (window, _) = futures::executor::block_on(futures::future::join(test_fut, aux_fut)); + window + } + + #[test] + fn cache_session_info_start_empty_db() { + let db_params = dummy_db_params(); + + let window = cache_session_info_test( + (10 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1), + 10, + None, + (10 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1), + Some(db_params.clone()), + ); + + let window = cache_session_info_test( + (11 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1), + 11, + Some(window), + 11, + None, + ); + assert_eq!(window.session_info.len(), SESSION_WINDOW_SIZE.get() as usize); + + cache_session_info_test( + (11 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1), + 12, + None, + 12, + Some(db_params), + ); } #[test] fn cache_session_info_first_early() { - cache_session_info_test(0, 1, None, 0); + cache_session_info_test(0, 1, None, 0, None); } #[test] @@ -535,19 +776,21 @@ mod tests { let window = RollingSessionWindow { earliest_session: 1, session_info: vec![dummy_session_info(1)], - window_size: TEST_WINDOW_SIZE, + window_size: SESSION_WINDOW_SIZE, + db_params: Some(dummy_db_params()), }; - cache_session_info_test(1, 2, Some(window), 2); + cache_session_info_test(1, 2, Some(window), 2, None); } #[test] fn cache_session_info_first_late() { cache_session_info_test( - (100 as SessionIndex).saturating_sub(TEST_WINDOW_SIZE.get() - 1), + (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1), 100, None, - (100 as SessionIndex).saturating_sub(TEST_WINDOW_SIZE.get() - 1), + (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1), + None, ); } @@ -560,48 +803,88 @@ mod tests { dummy_session_info(51), dummy_session_info(52), ], - window_size: TEST_WINDOW_SIZE, + window_size: SESSION_WINDOW_SIZE, + db_params: Some(dummy_db_params()), }; cache_session_info_test( - (100 as SessionIndex).saturating_sub(TEST_WINDOW_SIZE.get() - 1), + (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1), 100, Some(window), - (100 as SessionIndex).saturating_sub(TEST_WINDOW_SIZE.get() - 1), + (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1), + None, ); } #[test] fn cache_session_info_roll_full() { - let start = 99 - (TEST_WINDOW_SIZE.get() - 1); + let start = 99 - (SESSION_WINDOW_SIZE.get() - 1); let window = RollingSessionWindow { earliest_session: start, session_info: (start..=99).map(dummy_session_info).collect(), - window_size: TEST_WINDOW_SIZE, + window_size: SESSION_WINDOW_SIZE, + db_params: Some(dummy_db_params()), }; cache_session_info_test( - (100 as SessionIndex).saturating_sub(TEST_WINDOW_SIZE.get() - 1), + (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1), 100, Some(window), 100, // should only make one request. + None, + ); + } + + #[test] + fn cache_session_info_roll_many_full_db() { + let db_params = dummy_db_params(); + let start = 97 - (SESSION_WINDOW_SIZE.get() - 1); + let window = RollingSessionWindow { + earliest_session: start, + session_info: (start..=97).map(dummy_session_info).collect(), + window_size: SESSION_WINDOW_SIZE, + db_params: Some(db_params.clone()), + }; + + cache_session_info_test( + (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1), + 100, + Some(window), + 98, + None, + ); + + // We expect the session to be populated from DB, and only fetch 101 from on chain. + cache_session_info_test( + (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1), + 101, + None, + 101, + Some(db_params.clone()), ); + + // Session warps in the future. + let window = cache_session_info_test(195, 200, None, 195, Some(db_params)); + + assert_eq!(window.session_info.len(), SESSION_WINDOW_SIZE.get() as usize); } #[test] fn cache_session_info_roll_many_full() { - let start = 97 - (TEST_WINDOW_SIZE.get() - 1); + let start = 97 - (SESSION_WINDOW_SIZE.get() - 1); let window = RollingSessionWindow { earliest_session: start, session_info: (start..=97).map(dummy_session_info).collect(), - window_size: TEST_WINDOW_SIZE, + window_size: SESSION_WINDOW_SIZE, + db_params: Some(dummy_db_params()), }; cache_session_info_test( - (100 as SessionIndex).saturating_sub(TEST_WINDOW_SIZE.get() - 1), + (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1), 100, Some(window), 98, + None, ); } @@ -611,7 +894,8 @@ mod tests { let window = RollingSessionWindow { earliest_session: start, session_info: (0..=1).map(dummy_session_info).collect(), - window_size: TEST_WINDOW_SIZE, + window_size: SESSION_WINDOW_SIZE, + db_params: Some(dummy_db_params()), }; cache_session_info_test( @@ -619,6 +903,7 @@ mod tests { 2, Some(window), 2, // should only make one request. + None, ); } @@ -628,14 +913,17 @@ mod tests { let window = RollingSessionWindow { earliest_session: start, session_info: (0..=1).map(dummy_session_info).collect(), - window_size: TEST_WINDOW_SIZE, + window_size: SESSION_WINDOW_SIZE, + db_params: Some(dummy_db_params()), }; - cache_session_info_test(0, 3, Some(window), 2); + let actual_window_size = window.session_info.len() as u32; + + cache_session_info_test(0, 3, Some(window), actual_window_size, None); } #[test] - fn any_session_stretch_for_unfinalized_chain() { + fn cache_session_fails_for_gap_in_window() { // Session index of the tip of our fake test chain. let session: SessionIndex = 100; let genesis_session: SessionIndex = 0; @@ -664,7 +952,8 @@ mod tests { let test_fut = { let sender = ctx.sender().clone(); Box::pin(async move { - let res = RollingSessionWindow::new(sender, TEST_WINDOW_SIZE, hash).await; + let res = RollingSessionWindow::new(sender, hash, dummy_db_params()).await; + assert!(res.is_err()); }) }; @@ -713,6 +1002,135 @@ mod tests { ); // Unfinalized chain starts at geneisis block, so session 0 is how far we stretch. + // First 50 sessions are missing. + for i in genesis_session..=50 { + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + h, + RuntimeApiRequest::SessionInfo(j, s_tx), + )) => { + assert_eq!(h, hash); + assert_eq!(i, j); + let _ = s_tx.send(Ok(None)); + } + ); + } + // next 10 sessions are present + for i in 51..=60 { + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + h, + RuntimeApiRequest::SessionInfo(j, s_tx), + )) => { + assert_eq!(h, hash); + assert_eq!(i, j); + let _ = s_tx.send(Ok(Some(dummy_session_info(i)))); + } + ); + } + // gap of 1 session + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + h, + RuntimeApiRequest::SessionInfo(j, s_tx), + )) => { + assert_eq!(h, hash); + assert_eq!(61, j); + let _ = s_tx.send(Ok(None)); + } + ); + }); + + futures::executor::block_on(futures::future::join(test_fut, aux_fut)); + } + + #[test] + fn any_session_stretch_with_failure_allowed_for_unfinalized_chain() { + // Session index of the tip of our fake test chain. + let session: SessionIndex = 100; + let genesis_session: SessionIndex = 0; + + let header = Header { + digest: Default::default(), + extrinsics_root: Default::default(), + number: 5, + state_root: Default::default(), + parent_hash: Default::default(), + }; + + let finalized_header = Header { + digest: Default::default(), + extrinsics_root: Default::default(), + number: 0, + state_root: Default::default(), + parent_hash: Default::default(), + }; + + let pool = TaskExecutor::new(); + let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone()); + + let hash = header.hash(); + + let test_fut = { + let sender = ctx.sender().clone(); + Box::pin(async move { + let res = RollingSessionWindow::new(sender, hash, dummy_db_params()).await; + assert!(res.is_ok()); + let rsw = res.unwrap(); + // Since first 50 sessions are missing the earliest should be 50. + assert_eq!(rsw.earliest_session, 50); + assert_eq!(rsw.session_info.len(), 51); + }) + }; + + let aux_fut = Box::pin(async move { + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + h, + RuntimeApiRequest::SessionIndexForChild(s_tx), + )) => { + assert_eq!(h, hash); + let _ = s_tx.send(Ok(session)); + } + ); + + assert_matches!( + handle.recv().await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber( + s_tx, + )) => { + let _ = s_tx.send(Ok(finalized_header.number)); + } + ); + + assert_matches!( + handle.recv().await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash( + block_number, + s_tx, + )) => { + assert_eq!(block_number, finalized_header.number); + let _ = s_tx.send(Ok(Some(finalized_header.hash()))); + } + ); + + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + h, + RuntimeApiRequest::SessionIndexForChild(s_tx), + )) => { + assert_eq!(h, finalized_header.hash()); + let _ = s_tx.send(Ok(0)); + } + ); + + // Unfinalized chain starts at geneisis block, so session 0 is how far we stretch. + // We also test if failure is allowed for 50 first missing sessions. for i in genesis_session..=session { assert_matches!( handle.recv().await, @@ -723,7 +1141,7 @@ mod tests { assert_eq!(h, hash); assert_eq!(i, j); - let _ = s_tx.send(Ok(if i == session { + let _ = s_tx.send(Ok(if i < 50 { None } else { Some(dummy_session_info(i)) @@ -739,7 +1157,7 @@ mod tests { #[test] fn any_session_unavailable_for_caching_means_no_change() { let session: SessionIndex = 6; - let start_session = session.saturating_sub(TEST_WINDOW_SIZE.get() - 1); + let start_session = session.saturating_sub(SESSION_WINDOW_SIZE.get() - 1); let header = Header { digest: Default::default(), @@ -765,7 +1183,7 @@ mod tests { let test_fut = { let sender = ctx.sender().clone(); Box::pin(async move { - let res = RollingSessionWindow::new(sender, TEST_WINDOW_SIZE, hash).await; + let res = RollingSessionWindow::new(sender, hash, dummy_db_params()).await; assert!(res.is_err()); }) }; @@ -857,7 +1275,7 @@ mod tests { Box::pin(async move { let sender = ctx.sender().clone(); let window = - RollingSessionWindow::new(sender, TEST_WINDOW_SIZE, hash).await.unwrap(); + RollingSessionWindow::new(sender, hash, dummy_db_params()).await.unwrap(); assert_eq!(window.earliest_session, session); assert_eq!(window.session_info, vec![dummy_session_info(session)]);