diff --git a/Cargo.lock b/Cargo.lock index d222b9ed2e..000658dc6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11354,6 +11354,7 @@ dependencies = [ "parity-db", "parity-scale-codec", "parking_lot 0.12.1", + "prometheus-client 0.21.2", "rand 0.8.5", "rayon", "schnorrkel", @@ -11366,6 +11367,7 @@ dependencies = [ "subspace-core-primitives", "subspace-erasure-coding", "subspace-farmer-components", + "subspace-metrics", "subspace-networking", "subspace-proof-of-space", "subspace-rpc-primitives", @@ -11449,6 +11451,19 @@ dependencies = [ "tracing", ] +[[package]] +name = "subspace-metrics" +version = "0.1.0" +dependencies = [ + "actix-web", + "async-mutex", + "libp2p 0.52.2", + "parking_lot 0.12.1", + "prometheus", + "prometheus-client 0.21.2", + "tracing", +] + [[package]] name = "subspace-networking" version = "0.1.0" @@ -11482,6 +11497,7 @@ dependencies = [ "serde", "serde_json", "subspace-core-primitives", + "subspace-metrics", "tempfile", "thiserror", "tokio", @@ -11638,6 +11654,7 @@ dependencies = [ "sp-std", "sp-transaction-pool", "sp-version", + "static_assertions", "subspace-core-primitives", "subspace-runtime-primitives", "subspace-verification", @@ -11675,6 +11692,7 @@ dependencies = [ "pallet-transaction-payment-rpc-runtime-api", "parity-scale-codec", "parking_lot 0.12.1", + "prometheus-client 0.21.2", "sc-basic-authorship", "sc-chain-spec", "sc-client-api", @@ -11716,6 +11734,7 @@ dependencies = [ "subspace-archiving", "subspace-core-primitives", "subspace-fraud-proof", + "subspace-metrics", "subspace-networking", "subspace-proof-of-space", "subspace-runtime-primitives", @@ -11806,6 +11825,7 @@ dependencies = [ "sp-std", "sp-transaction-pool", "sp-version", + "static_assertions", "subspace-core-primitives", "subspace-runtime-primitives", "subspace-verification", diff --git a/Cargo.toml b/Cargo.toml index d47f46baa1..54d3c1a290 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "domains/service", "domains/test/runtime/*", 
"domains/test/service", + "shared/*", "test/subspace-test-client", "test/subspace-test-runtime", "test/subspace-test-service", diff --git a/crates/pallet-subspace/src/lib.rs b/crates/pallet-subspace/src/lib.rs index ad53d12475..42caf9112c 100644 --- a/crates/pallet-subspace/src/lib.rs +++ b/crates/pallet-subspace/src/lib.rs @@ -1,4 +1,3 @@ -#![feature(assert_matches, const_option)] // Copyright (C) 2019-2021 Parity Technologies (UK) Ltd. // Copyright (C) 2021 Subspace Labs, Inc. // SPDX-License-Identifier: Apache-2.0 @@ -16,6 +15,7 @@ // limitations under the License. #![doc = include_str!("../README.md")] #![cfg_attr(not(feature = "std"), no_std)] +#![feature(assert_matches, const_option, let_chains)] #![warn(unused_must_use, unsafe_code, unused_variables, unused_must_use)] extern crate alloc; @@ -47,15 +47,19 @@ pub use pallet::*; use scale_info::TypeInfo; use schnorrkel::SignatureError; use sp_consensus_slots::Slot; +#[cfg(feature = "pot")] +use sp_consensus_subspace::consensus::is_proof_of_time_valid; use sp_consensus_subspace::consensus::verify_solution; use sp_consensus_subspace::digests::CompatibleDigestItem; use sp_consensus_subspace::offence::{OffenceDetails, OffenceError, OnOffenceHandler}; -#[cfg(feature = "pot")] -use sp_consensus_subspace::PotParameters; use sp_consensus_subspace::{ ChainConstants, EquivocationProof, FarmerPublicKey, FarmerSignature, SignedVote, Vote, }; +#[cfg(feature = "pot")] +use sp_consensus_subspace::{PotParameters, PotParametersChange, WrappedPotOutput}; use sp_runtime::generic::DigestItem; +#[cfg(feature = "pot")] +use sp_runtime::traits::CheckedSub; use sp_runtime::traits::{BlockNumberProvider, Hash, One, Zero}; #[cfg(not(feature = "pot"))] use sp_runtime::traits::{SaturatedConversion, Saturating}; @@ -68,12 +72,14 @@ use sp_std::collections::btree_map::BTreeMap; use sp_std::prelude::*; use subspace_core_primitives::crypto::Scalar; #[cfg(feature = "pot")] -use subspace_core_primitives::PotProof; +use 
subspace_core_primitives::BlockHash; use subspace_core_primitives::{ ArchivedHistorySegment, HistorySize, PublicKey, Randomness, RewardSignature, SectorId, - SectorIndex, SegmentHeader, SegmentIndex, SolutionRange, + SectorIndex, SegmentHeader, SegmentIndex, SlotNumber, SolutionRange, }; use subspace_solving::REWARD_SIGNING_CONTEXT; +#[cfg(feature = "pot")] +use subspace_verification::derive_pot_entropy; #[cfg(not(feature = "pot"))] use subspace_verification::derive_randomness; use subspace_verification::{ @@ -121,6 +127,7 @@ impl EraChangeTrigger for NormalEraChange { #[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, MaxEncodedLen, TypeInfo)] struct VoteVerificationData { + #[cfg(not(feature = "pot"))] global_randomness: Randomness, solution_range: SolutionRange, current_slot: Slot, @@ -152,7 +159,8 @@ mod pallet { use sp_std::prelude::*; use subspace_core_primitives::crypto::Scalar; use subspace_core_primitives::{ - HistorySize, Randomness, SectorIndex, SegmentHeader, SegmentIndex, SolutionRange, + Blake3Hash, HistorySize, Randomness, SectorIndex, SegmentHeader, SegmentIndex, + SolutionRange, }; pub(super) struct InitialSolutionRanges { @@ -200,10 +208,25 @@ mod pallet { // TODO: Remove when switching to PoT by default type GlobalRandomnessUpdateInterval: Get; - /// The amount of time, in blocks, between updates of global randomness. + /// Number of slots between slot arrival and when corresponding block can be produced. + /// + /// Practically this means future proof of time proof needs to be revealed this many slots + /// ahead before block can be authored even though solution is available before that. #[pallet::constant] type BlockAuthoringDelay: Get; + /// Interval, in blocks, between blockchain entropy injection into proof of time chain. + #[pallet::constant] + type PotEntropyInjectionInterval: Get; + + /// Interval, in entropy injection intervals, where to take entropy for injection from. 
+ #[pallet::constant] + type PotEntropyInjectionLookbackDepth: Get; + + /// Delay after block, in slots, when entropy injection takes effect. + #[pallet::constant] + type PotEntropyInjectionDelay: Get; + /// The amount of time, in blocks, that each era should last. /// NOTE: Currently it is not possible to change the era duration after /// the chain has started. Attempting to do so will brick block production. @@ -292,6 +315,13 @@ mod pallet { RootFarmer(FarmerPublicKey), } + #[derive(Debug, Copy, Clone, Encode, Decode, TypeInfo)] + pub(super) struct PotEntropyValue { + /// Target slot at which entropy should be injected (when known) + pub(super) target_slot: Option, + pub(super) entropy: Blake3Hash, + } + #[pallet::genesis_config] pub struct GenesisConfig { /// Whether rewards should be enabled. @@ -378,6 +408,7 @@ mod pallet { pub(super) type GlobalRandomnesses = StorageValue<_, sp_consensus_subspace::GlobalRandomnesses, ValueQuery>; + // TODO: Clarify when this value is updated (when it is updated, right now it is not) /// Number of iterations for proof of time per slot #[pallet::storage] pub(super) type PotSlotIterations = StorageValue<_, NonZeroU32>; @@ -468,6 +499,12 @@ mod pallet { >, >; + /// Entropy that needs to be injected into proof of time chain at specific slot associated with + /// block number it came from. + #[pallet::storage] + pub(super) type PotEntropy = + StorageValue<_, BTreeMap, ValueQuery>; + /// The current block randomness, updated at block initialization. When the proof of time feature /// is enabled it derived from PoT otherwise PoR. #[pallet::storage] @@ -559,7 +596,6 @@ mod pallet { } /// Farmer vote, currently only used for extra rewards to farmers. 
- // TODO: Proper weight #[pallet::call_index(3)] #[pallet::weight((::WeightInfo::vote(), DispatchClass::Operational, Pays::No))] // Suppression because the custom syntax will also generate an enum and we need enum to have @@ -783,7 +819,7 @@ impl Pallet { } fn do_initialize(block_number: T::BlockNumber) { - let pre_digest = >::digest() + let pre_digest = frame_system::Pallet::::digest() .logs .iter() .find_map(|s| s.as_subspace_pre_digest::()) @@ -890,7 +926,8 @@ impl Pallet { // Extract PoR randomness from pre-digest. #[cfg(not(feature = "pot"))] - let block_randomness = derive_randomness(pre_digest.solution(), pre_digest.slot().into()); + let block_randomness = + derive_randomness(pre_digest.solution(), SlotNumber::from(pre_digest.slot())); #[cfg(feature = "pot")] let block_randomness = pre_digest @@ -925,11 +962,90 @@ impl Pallet { )); } - // TODO: Take adjustment of iterations into account once we have it #[cfg(feature = "pot")] - frame_system::Pallet::::deposit_log(DigestItem::pot_slot_iterations( - PotSlotIterations::::get().expect("Always instantiated during genesis; qed"), - )); + { + let pot_slot_iterations = + PotSlotIterations::::get().expect("Always initialized during genesis; qed"); + let pot_entropy_injection_interval = T::PotEntropyInjectionInterval::get(); + let pot_entropy_injection_delay = T::PotEntropyInjectionDelay::get(); + + // TODO: Take adjustment of iterations into account once we have it + frame_system::Pallet::::deposit_log(DigestItem::pot_slot_iterations( + pot_slot_iterations, + )); + + let mut entropy = PotEntropy::::get(); + let lookback_in_blocks = pot_entropy_injection_interval + * T::BlockNumber::from(T::PotEntropyInjectionLookbackDepth::get()); + let last_entropy_injection_block = + block_number / pot_entropy_injection_interval * pot_entropy_injection_interval; + let maybe_entropy_source_block_number = + last_entropy_injection_block.checked_sub(&lookback_in_blocks); + + if (block_number % pot_entropy_injection_interval).is_zero() 
{ + let current_block_entropy = derive_pot_entropy( + pre_digest.solution().chunk, + pre_digest.pot_info().proof_of_time(), + ); + // Collect entropy every `T::PotEntropyInjectionInterval` blocks + entropy.insert( + block_number, + PotEntropyValue { + target_slot: None, + entropy: current_block_entropy, + }, + ); + + // Update target slot for entropy injection once we know it + if let Some(entropy_source_block_number) = maybe_entropy_source_block_number { + if let Some(entropy_value) = entropy.get_mut(&entropy_source_block_number) { + let target_slot = pre_digest + .slot() + .saturating_add(pot_entropy_injection_delay); + debug!( + target: "runtime::subspace", + "Pot entropy injection will happen at slot {target_slot:?}", + ); + entropy_value.target_slot.replace(target_slot); + } + } + + PotEntropy::::put(entropy.clone()); + } + + // Deposit consensus log item with parameters change in case corresponding entropy is + // available + if let Some(entropy_source_block_number) = maybe_entropy_source_block_number { + let maybe_entropy_value = entropy.get(&entropy_source_block_number).copied(); + if let Some(PotEntropyValue { + target_slot, + entropy, + }) = maybe_entropy_value + { + let target_slot = target_slot + .expect("Target slot is guaranteed to be present due to logic above; qed"); + + frame_system::Pallet::::deposit_log(DigestItem::pot_parameters_change( + PotParametersChange { + slot: target_slot, + // TODO: Take adjustment of iterations into account once we have it + slot_iterations: pot_slot_iterations, + entropy, + }, + )); + } + } + + // Clean up old values we'll no longer need + if let Some(entry) = entropy.first_entry() { + if let Some(target_slot) = entry.get().target_slot + && target_slot < pre_digest.slot() + { + entry.remove(); + PotEntropy::::put(entropy); + } + } + } } fn do_finalize(_block_number: T::BlockNumber) { @@ -1083,12 +1199,44 @@ impl Pallet { /// Proof of time parameters #[cfg(feature = "pot")] pub fn pot_parameters() -> PotParameters { + 
let block_number = frame_system::Pallet::::block_number(); + let pot_slot_iterations = + PotSlotIterations::::get().expect("Always initialized during genesis; qed"); + let pot_entropy_injection_interval = T::PotEntropyInjectionInterval::get(); + + let entropy = PotEntropy::::get(); + let lookback_in_blocks = pot_entropy_injection_interval + * T::BlockNumber::from(T::PotEntropyInjectionLookbackDepth::get()); + let last_entropy_injection_block = + block_number / pot_entropy_injection_interval * pot_entropy_injection_interval; + let maybe_entropy_source_block_number = + last_entropy_injection_block.checked_sub(&lookback_in_blocks); + + let mut next_change = None; + + if let Some(entropy_source_block_number) = maybe_entropy_source_block_number { + let maybe_entropy_value = entropy.get(&entropy_source_block_number).copied(); + if let Some(PotEntropyValue { + target_slot, + entropy, + }) = maybe_entropy_value + { + let target_slot = target_slot.expect( + "Always present due to identical check present in block initialization; qed", + ); + + next_change.replace(PotParametersChange { + slot: target_slot, + // TODO: Take adjustment of iterations into account once we have it + slot_iterations: pot_slot_iterations, + entropy, + }); + } + } + PotParameters::V0 { - slot_iterations: PotSlotIterations::::get() - .expect("Always instantiated during genesis; qed"), - // TODO: This is where adjustment for number of iterations and entropy injection will - // happen for runtime API calls - next_change: None, + slot_iterations: pot_slot_iterations, + next_change, } } @@ -1246,9 +1394,6 @@ fn current_vote_verification_data(is_block_initialized: bool) -> Vote .next .unwrap_or(global_randomnesses.current) }, - // TODO: This is invalid and must be fixed for PoT - #[cfg(feature = "pot")] - global_randomness: Default::default(), solution_range: if is_block_initialized { solution_ranges.voting_current } else { @@ -1282,6 +1427,10 @@ enum CheckVoteError { UnknownSegmentCommitment, 
InvalidHistorySize, InvalidSolution(String), + #[cfg(feature = "pot")] + InvalidProofOfTime, + #[cfg(feature = "pot")] + InvalidFutureProofOfTime, DuplicateVote, Equivocated(SubspaceEquivocationOffence), } @@ -1301,6 +1450,10 @@ impl From for TransactionValidityError { CheckVoteError::UnknownSegmentCommitment => InvalidTransaction::Call, CheckVoteError::InvalidHistorySize => InvalidTransaction::Call, CheckVoteError::InvalidSolution(_) => InvalidTransaction::Call, + #[cfg(feature = "pot")] + CheckVoteError::InvalidProofOfTime => InvalidTransaction::Future, + #[cfg(feature = "pot")] + CheckVoteError::InvalidFutureProofOfTime => InvalidTransaction::Call, CheckVoteError::DuplicateVote => InvalidTransaction::Call, CheckVoteError::Equivocated(_) => InvalidTransaction::BadSigner, }) @@ -1316,6 +1469,10 @@ fn check_vote( parent_hash, slot, solution, + #[cfg(feature = "pot")] + proof_of_time, + #[cfg(feature = "pot")] + future_proof_of_time, } = &signed_vote.vote; let height = *height; let slot = *slot; @@ -1364,7 +1521,7 @@ fn check_vote( let current_vote_verification_data = current_vote_verification_data::(pre_dispatch); let parent_vote_verification_data = ParentVoteVerificationData::::get() - .expect("Above check for block number ensures that this value is always present"); + .expect("Above check for block number ensures that this value is always present; qed"); if pre_dispatch { // New time slot is already set, whatever time slot is in the vote it must be smaller or the @@ -1380,7 +1537,7 @@ fn check_vote( } let parent_slot = if pre_dispatch { - // For pre-dispatch parent slot is `current_slot` if the parent vote verification data (it + // For pre-dispatch parent slot is `current_slot` in the parent vote verification data (it // was updated in current block because initialization hook was already called) if vote is // at the same height as the current block, otherwise it is one level older and // `parent_slot` from parent vote verification data needs to be taken instead 
@@ -1390,8 +1547,8 @@ fn check_vote( parent_vote_verification_data.parent_slot } } else { - // Otherwise parent slot is `current_slot` if the current vote verification data (that - // wan't updated from parent block because initialization hook wasn't called yet) if vote + // Otherwise parent slot is `current_slot` in the current vote verification data (that + // wasn't updated from parent block because initialization hook wasn't called yet) if vote // is at the same height as the current block, otherwise it is one level older and // `parent_slot` from current vote verification data needs to be taken instead if height == current_block_number { @@ -1473,9 +1630,8 @@ fn check_vote( (&VerifySolutionParams { #[cfg(not(feature = "pot"))] global_randomness: vote_verification_data.global_randomness, - // TODO: This is incorrect, find a way to verify votes #[cfg(feature = "pot")] - proof_of_time: PotProof::default(), + proof_of_time: *proof_of_time, solution_range: vote_verification_data.solution_range, piece_check_params: Some(PieceCheckParams { max_pieces_in_sector: T::MaxPiecesInSector::get(), @@ -1496,6 +1652,36 @@ fn check_vote( return Err(CheckVoteError::InvalidSolution(error)); } + // Cheap proof of time verification is possible here because proof of time must have already + // been seen by this node due to votes requiring the same authoring delay as blocks + #[cfg(feature = "pot")] + if !is_proof_of_time_valid( + BlockHash::try_from(parent_hash.as_ref()) + .expect("Must be able to convert to block hash type"), + SlotNumber::from(slot), + WrappedPotOutput::from(*proof_of_time), + ) { + debug!(target: "runtime::subspace", "Invalid proof of time"); + + return Err(CheckVoteError::InvalidProofOfTime); + } + + // During pre-dispatch we have already verified proofs of time up to future proof of time of + // current block, which vote can't exceed, this must be possible to verify cheaply + #[cfg(feature = "pot")] + if pre_dispatch + && !is_proof_of_time_valid( + 
BlockHash::try_from(parent_hash.as_ref()) + .expect("Must be able to convert to block hash type"), + SlotNumber::from(slot + T::BlockAuthoringDelay::get()), + WrappedPotOutput::from(*future_proof_of_time), + ) + { + debug!(target: "runtime::subspace", "Invalid future proof of time"); + + return Err(CheckVoteError::InvalidFutureProofOfTime); + } + let key = ( solution.public_key.clone(), solution.sector_index, diff --git a/crates/pallet-subspace/src/mock.rs b/crates/pallet-subspace/src/mock.rs index e2768017e9..cb860f2754 100644 --- a/crates/pallet-subspace/src/mock.rs +++ b/crates/pallet-subspace/src/mock.rs @@ -31,6 +31,8 @@ use sp_consensus_slots::Slot; #[cfg(feature = "pot")] use sp_consensus_subspace::digests::PreDigestPotInfo; use sp_consensus_subspace::digests::{CompatibleDigestItem, PreDigest}; +#[cfg(feature = "pot")] +use sp_consensus_subspace::PotExtension; use sp_consensus_subspace::{FarmerSignature, KzgExtension, PosExtension, SignedVote, Vote}; use sp_core::crypto::UncheckedFrom; use sp_core::H256; @@ -45,9 +47,9 @@ use subspace_archiving::archiver::{Archiver, NewArchivedSegment}; use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; use subspace_core_primitives::crypto::Scalar; use subspace_core_primitives::{ - ArchivedBlockProgress, ArchivedHistorySegment, Blake2b256Hash, HistorySize, LastArchivedBlock, - Piece, PieceOffset, PublicKey, Randomness, RecordedHistorySegment, SegmentCommitment, - SegmentHeader, SegmentIndex, SlotNumber, Solution, SolutionRange, + ArchivedBlockProgress, ArchivedHistorySegment, Blake2b256Hash, BlockNumber, HistorySize, + LastArchivedBlock, Piece, PieceOffset, PublicKey, Randomness, RecordedHistorySegment, + SegmentCommitment, SegmentHeader, SegmentIndex, SlotNumber, Solution, SolutionRange, }; use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::auditing::audit_sector; @@ -162,6 +164,9 @@ parameter_types! 
{ #[cfg(not(feature = "pot"))] pub const GlobalRandomnessUpdateInterval: u64 = 10; pub const BlockAuthoringDelay: SlotNumber = 2; + pub const PotEntropyInjectionInterval: BlockNumber = 5; + pub const PotEntropyInjectionLookbackDepth: u8 = 2; + pub const PotEntropyInjectionDelay: SlotNumber = 4; pub const EraDuration: u32 = 4; // 1GB pub const InitialSolutionRange: SolutionRange = INITIAL_SOLUTION_RANGE; @@ -184,6 +189,9 @@ impl Config for Test { type RuntimeEvent = RuntimeEvent; type GlobalRandomnessUpdateInterval = GlobalRandomnessUpdateInterval; type BlockAuthoringDelay = BlockAuthoringDelay; + type PotEntropyInjectionInterval = PotEntropyInjectionInterval; + type PotEntropyInjectionLookbackDepth = PotEntropyInjectionLookbackDepth; + type PotEntropyInjectionDelay = PotEntropyInjectionDelay; type EraDuration = EraDuration; type InitialSolutionRange = InitialSolutionRange; type SlotProbability = SlotProbability; @@ -300,6 +308,10 @@ pub fn new_test_ext() -> TestExternalities { ext.register_extension(KzgExtension::new(Kzg::new(embedded_kzg_settings()))); ext.register_extension(PosExtension::new::()); + #[cfg(feature = "pot")] + ext.register_extension(PotExtension::new(Box::new( + |parent_hash, slot, proof_of_time| todo!(), + ))); ext } diff --git a/crates/sc-consensus-subspace/src/import_queue.rs b/crates/sc-consensus-subspace/src/import_queue.rs index c7a054437e..9497291e9a 100644 --- a/crates/sc-consensus-subspace/src/import_queue.rs +++ b/crates/sc-consensus-subspace/src/import_queue.rs @@ -234,16 +234,47 @@ where return Ok(CheckedHeader::Deferred(header, slot)); } + #[cfg(feature = "pot")] + let slot_iterations; + #[cfg(feature = "pot")] + let pot_seed; + #[cfg(feature = "pot")] + let next_slot = slot + Slot::from(1); + #[cfg(feature = "pot")] + // The change to number of iterations might have happened before `next_slot` + if let Some(parameters_change) = subspace_digest_items.pot_parameters_change + && parameters_change.slot <= next_slot + { + slot_iterations = 
parameters_change.slot_iterations; + // Only if entropy injection happens exactly on next slot we need to mix it in + if parameters_change.slot == next_slot { + pot_seed = pre_digest + .pot_info() + .proof_of_time() + .seed_with_entropy(¶meters_change.entropy); + } else { + pot_seed = pre_digest.pot_info().proof_of_time().seed(); + } + } else { + slot_iterations = subspace_digest_items.pot_slot_iterations; + pot_seed = pre_digest.pot_info().proof_of_time().seed(); + } + // TODO: Extend/optimize this check once we have checkpoints in justifications // Check proof of time between slot of the block and future proof of time + // Here during stateless verification we do not have access to parent block, thus only + // verify proofs after proof of time of at current slot up until future proof of time + // (inclusive), during block import we verify the rest. #[cfg(feature = "pot")] if !self .pot_verifier - .is_proof_valid( - pre_digest.pot_info().proof_of_time().seed(), - subspace_digest_items.pot_slot_iterations, + .is_output_valid( + next_slot, + pot_seed, + slot_iterations, self.chain_constants.block_authoring_delay(), pre_digest.pot_info().future_proof_of_time(), + subspace_digest_items.pot_parameters_change, ) .await { diff --git a/crates/sc-consensus-subspace/src/lib.rs b/crates/sc-consensus-subspace/src/lib.rs index 81dcd3308f..a87e709cb2 100644 --- a/crates/sc-consensus-subspace/src/lib.rs +++ b/crates/sc-consensus-subspace/src/lib.rs @@ -16,7 +16,7 @@ // along with this program. If not, see . #![doc = include_str!("../README.md")] -#![feature(try_blocks)] +#![feature(let_chains, try_blocks)] #![forbid(unsafe_code)] #![warn(missing_docs)] @@ -401,6 +401,10 @@ where /// Handle use to report telemetries. pub telemetry: Option, + /// Proof of time verifier + #[cfg(feature = "pot")] + pub pot_verifier: PotVerifier, + /// Stream with proof of time slots. 
#[cfg(feature = "pot")] pub pot_slot_info_stream: PotSlotInfoStream, @@ -424,6 +428,8 @@ pub fn start_subspace, ) -> Result @@ -476,6 +482,8 @@ where pending_solutions: Default::default(), #[cfg(feature = "pot")] pot_checkpoints: Default::default(), + #[cfg(feature = "pot")] + pot_verifier, _pos_table: PhantomData::, }; @@ -745,6 +753,8 @@ where let correct_global_randomness; #[cfg(feature = "pot")] let pot_seed; + #[cfg(feature = "pot")] + let slot_iterations; let correct_solution_range; if block_number.is_one() { @@ -759,6 +769,11 @@ where } #[cfg(feature = "pot")] { + slot_iterations = self + .client + .runtime_api() + .pot_parameters(parent_hash)? + .slot_iterations(); pot_seed = self.pot_verifier.genesis_seed(); } @@ -782,7 +797,19 @@ where }; } #[cfg(feature = "pot")] + // In case parameters change in the very first slot after slot of the parent block, + // account for them + if let Some(parameters_change) = subspace_digest_items.pot_parameters_change + && parameters_change.slot == (parent_slot + Slot::from(1)) { + slot_iterations = parameters_change.slot_iterations; + pot_seed = parent_subspace_digest_items + .pre_digest + .pot_info() + .proof_of_time() + .seed_with_entropy(¶meters_change.entropy); + } else { + slot_iterations = subspace_digest_items.pot_slot_iterations; pot_seed = parent_subspace_digest_items .pre_digest .pot_info() @@ -802,13 +829,21 @@ where } #[cfg(feature = "pot")] // TODO: Extend/optimize this check once we have checkpoints in justifications + // Here we check that there is continuity from parent block's proof of time (but not future + // entropy since this block may be produced before slot corresponding to parent block's + // future proof of time) to current block's proof of time. During stateless verification we + // do not have access to parent block, thus only verify proofs after proof of time of at + // current slot up until future proof of time (inclusive), here during block import we + // verify the rest. 
if !self .pot_verifier - .is_proof_valid( + .is_output_valid( + parent_slot + Slot::from(1), pot_seed, - subspace_digest_items.pot_slot_iterations, + slot_iterations, slots_since_parent, subspace_digest_items.pre_digest.pot_info().proof_of_time(), + subspace_digest_items.pot_parameters_change, ) .await { diff --git a/crates/sc-consensus-subspace/src/slot_worker.rs b/crates/sc-consensus-subspace/src/slot_worker.rs index f268853cc0..8225ae441f 100644 --- a/crates/sc-consensus-subspace/src/slot_worker.rs +++ b/crates/sc-consensus-subspace/src/slot_worker.rs @@ -32,6 +32,8 @@ use sc_consensus_slots::{ BackoffAuthoringBlocksStrategy, SimpleSlotWorker, SlotInfo, SlotLenienceType, SlotProportion, }; #[cfg(feature = "pot")] +use sc_proof_of_time::verifier::PotVerifier; +#[cfg(feature = "pot")] use sc_proof_of_time::PotSlotWorker; use sc_telemetry::TelemetryHandle; use sc_utils::mpsc::tracing_unbounded; @@ -58,11 +60,11 @@ use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; use std::sync::Arc; -#[cfg(feature = "pot")] -use subspace_core_primitives::PotCheckpoints; #[cfg(not(feature = "pot"))] use subspace_core_primitives::Randomness; use subspace_core_primitives::{BlockNumber, PublicKey, RewardSignature, SectorId, Solution}; +#[cfg(feature = "pot")] +use subspace_core_primitives::{PotCheckpoints, PotOutput}; use subspace_proof_of_space::Table; use subspace_verification::{ check_reward_signature, verify_solution, PieceCheckParams, VerifySolutionParams, @@ -140,6 +142,8 @@ where #[cfg(feature = "pot")] // TODO: Substrate should make `fn claim_slot` take `&mut self`, the we'll not need `Mutex` pub(super) pot_checkpoints: Mutex>, + #[cfg(feature = "pot")] + pub(super) pot_verifier: PotVerifier, pub(super) _pos_table: PhantomData, } @@ -153,7 +157,14 @@ where SO: SyncOracle + Send + Sync, { fn on_proof(&mut self, slot: Slot, checkpoints: PotCheckpoints) { - self.pot_checkpoints.lock().insert(slot, checkpoints); + { + let mut pot_checkpoints = 
self.pot_checkpoints.lock(); + + // Remove checkpoints from future slots, if present they are out of date anyway + pot_checkpoints.retain(|&stored_slot, _checkpoints| stored_slot < slot); + + pot_checkpoints.insert(slot, checkpoints); + } if self.sync_oracle.is_major_syncing() { debug!( @@ -303,27 +314,90 @@ where #[cfg(feature = "pot")] let (proof_of_time, future_proof_of_time, new_checkpoints) = { - let mut pot_checkpoints = self.pot_checkpoints.lock(); - - // Remove checkpoints from old slots we will not need anymore - pot_checkpoints.retain(|&stored_slot, _checkpoints| stored_slot > parent_slot); - - let proof_of_time = pot_checkpoints.get(&slot)?.output(); + // TODO: These variables and code block below are only necessary to work around + // https://github.com/rust-lang/rust/issues/57478 + let proof_of_time; + let future_slot; + let future_proof_of_time; + let new_checkpoints; + { + let mut pot_checkpoints = self.pot_checkpoints.lock(); + + // Remove checkpoints from old slots we will not need anymore + pot_checkpoints.retain(|&stored_slot, _checkpoints| stored_slot > parent_slot); + + proof_of_time = pot_checkpoints.get(&slot)?.output(); + + // Future slot for which proof must be available before authoring block at this slot + future_slot = slot + self.chain_constants.block_authoring_delay(); + let parent_future_slot = parent_slot + self.chain_constants.block_authoring_delay(); + future_proof_of_time = pot_checkpoints.get(&future_slot)?.output(); + + // New checkpoints that were produced since parent block's future slot up to current + // future slot (inclusive) + new_checkpoints = pot_checkpoints + .iter() + .filter_map(|(&stored_slot, &checkpoints)| { + (stored_slot > parent_future_slot && stored_slot <= future_slot) + .then_some(checkpoints) + }) + .collect::>(); + } - // Future slot for which proof must be available before authoring block at this slot - let future_slot = slot + self.chain_constants.block_authoring_delay(); - let parent_future_slot = 
parent_slot + self.chain_constants.block_authoring_delay(); - let future_proof_of_time = pot_checkpoints.get(&future_slot)?.output(); + let pot_parameters = runtime_api.pot_parameters(parent_hash).ok()?; + let slot_iterations; + let pot_seed; + let after_parent_slot = parent_slot + Slot::from(1); + + if parent_header.number().is_zero() { + slot_iterations = pot_parameters.slot_iterations(); + pot_seed = self.pot_verifier.genesis_seed(); + } else { + let pot_info = parent_pre_digest.pot_info(); + // The change to number of iterations might have happened before + // `after_parent_slot` + if let Some(parameters_change) = pot_parameters.next_parameters_change() + && parameters_change.slot <= after_parent_slot + { + slot_iterations = parameters_change.slot_iterations; + // Only if entropy injection happens exactly after parent slot we need to \ + // mix it in + if parameters_change.slot == after_parent_slot { + pot_seed = pot_info + .proof_of_time() + .seed_with_entropy(¶meters_change.entropy); + } else { + pot_seed = pot_info + .proof_of_time().seed(); + } + } else { + slot_iterations = pot_parameters.slot_iterations(); + pot_seed = pot_info + .proof_of_time() + .seed(); + } + }; - // New checkpoints that were produced since parent block's future slot up to current - // future slot (inclusive) - let new_checkpoints = pot_checkpoints - .iter() - .filter_map(|(&stored_slot, &checkpoints)| { - (stored_slot > parent_future_slot && stored_slot <= future_slot) - .then_some(checkpoints) - }) - .collect::>(); + // Ensure proof of time and future proof of time included in upcoming block are valid + if !self + .pot_verifier + .is_output_valid( + after_parent_slot, + pot_seed, + slot_iterations, + Slot::from(u64::from(future_slot) - u64::from(parent_slot)), + future_proof_of_time, + pot_parameters.next_parameters_change(), + ) + .await + { + warn!( + target: "subspace", + "Proof of time or future proof of time is invalid, skipping block \ + production at slot {slot:?}" + ); + 
return None; + } (proof_of_time, future_proof_of_time, new_checkpoints) }; @@ -481,8 +555,16 @@ where // verification wouldn't be possible due to missing (for now) segment commitment info!(target: "subspace", "🗳️ Claimed vote at slot {slot}"); - self.create_vote(solution, slot, parent_header, parent_hash) - .await; + self.create_vote( + parent_header, + slot, + solution, + #[cfg(feature = "pot")] + proof_of_time, + #[cfg(feature = "pot")] + future_proof_of_time, + ) + .await; } } Err(error) => { @@ -645,11 +727,13 @@ where { async fn create_vote( &self, - solution: Solution, - slot: Slot, parent_header: &Block::Header, - parent_hash: Block::Hash, + slot: Slot, + solution: Solution, + #[cfg(feature = "pot")] proof_of_time: PotOutput, + #[cfg(feature = "pot")] future_proof_of_time: PotOutput, ) { + let parent_hash = parent_header.hash(); let runtime_api = self.client.runtime_api(); if self.should_backoff(slot, parent_header) { @@ -662,6 +746,10 @@ where parent_hash: parent_header.hash(), slot, solution: solution.clone(), + #[cfg(feature = "pot")] + proof_of_time, + #[cfg(feature = "pot")] + future_proof_of_time, }; let signature = match self.sign_reward(vote.hash(), &solution.public_key).await { diff --git a/crates/sc-proof-of-time/src/lib.rs b/crates/sc-proof-of-time/src/lib.rs index e874d9e0ca..f09e3ee506 100644 --- a/crates/sc-proof-of-time/src/lib.rs +++ b/crates/sc-proof-of-time/src/lib.rs @@ -1,6 +1,7 @@ //! Subspace proof of time implementation. 
-pub mod gossip; +#![feature(let_chains, stmt_expr_attributes)] + mod slots; pub mod source; pub mod verifier; diff --git a/crates/sc-proof-of-time/src/source.rs b/crates/sc-proof-of-time/src/source.rs index 8e3f49917a..9404067f06 100644 --- a/crates/sc-proof-of-time/src/source.rs +++ b/crates/sc-proof-of-time/src/source.rs @@ -1,11 +1,15 @@ -use crate::gossip::{GossipCheckpoints, PotGossipWorker}; +pub mod gossip; +mod state; +mod timekeeper; + +use crate::source::gossip::{GossipCheckpoints, PotGossipWorker}; +use crate::source::state::{NextSlotInput, PotState}; +use crate::source::timekeeper::run_timekeeper; use crate::verifier::PotVerifier; -use atomic::Atomic; -use derive_more::{Deref, DerefMut, From}; +use derive_more::{Deref, DerefMut}; use futures::channel::mpsc; -use futures::executor::block_on; -use futures::{select, SinkExt, StreamExt}; -use sc_client_api::{BlockImportNotification, BlockchainEvents}; +use futures::{select, StreamExt}; +use sc_client_api::BlockchainEvents; use sc_network::PeerId; use sc_network_gossip::{Network as GossipNetwork, Syncing as GossipSyncing}; use sp_api::{ApiError, ProvideRuntimeApi}; @@ -14,24 +18,30 @@ use sp_consensus_slots::Slot; #[cfg(feature = "pot")] use sp_consensus_subspace::digests::extract_pre_digest; #[cfg(feature = "pot")] +use sp_consensus_subspace::digests::extract_subspace_digest_items; +#[cfg(feature = "pot")] use sp_consensus_subspace::ChainConstants; +#[cfg(feature = "pot")] +use sp_consensus_subspace::FarmerSignature; use sp_consensus_subspace::{FarmerPublicKey, SubspaceApi as SubspaceRuntimeApi}; use sp_runtime::traits::Block as BlockT; #[cfg(feature = "pot")] -use sp_runtime::traits::{Header, Zero}; +use sp_runtime::traits::Header as HeaderT; +#[cfg(feature = "pot")] +use sp_runtime::traits::Zero; use std::marker::PhantomData; use std::num::NonZeroU32; -#[cfg(feature = "pot")] -use std::sync::atomic::Ordering; use std::sync::Arc; use std::thread; -use subspace_core_primitives::{PotCheckpoints, PotSeed, 
SlotNumber}; -use subspace_proof_of_time::PotError; +use subspace_core_primitives::{PotCheckpoints, PotSeed}; +#[cfg(feature = "pot")] +use tracing::warn; use tracing::{debug, error}; const LOCAL_PROOFS_CHANNEL_CAPACITY: usize = 10; const SLOTS_CHANNEL_CAPACITY: usize = 10; const GOSSIP_OUTGOING_CHANNEL_CAPACITY: usize = 10; +const GOSSIP_INCOMING_CHANNEL_CAPACITY: usize = 10; /// Proof of time slot information pub struct PotSlotInfo { @@ -54,16 +64,16 @@ struct TimekeeperCheckpoints { } /// Stream with proof of time slots -#[derive(Debug, Deref, DerefMut, From)] +#[derive(Debug, Deref, DerefMut)] pub struct PotSlotInfoStream(mpsc::Receiver); -/// Source of proofs of time. +/// Worker producing proofs of time. /// /// Depending on configuration may produce proofs of time locally, send/receive via gossip and keep /// up to day with blockchain reorgs. #[derive(Debug)] #[must_use = "Proof of time source doesn't do anything unless run() method is called"] -pub struct PotSource { +pub struct PotSourceWorker { client: Arc, #[cfg(feature = "pot")] chain_constants: ChainConstants, @@ -71,15 +81,11 @@ pub struct PotSource { outgoing_messages_sender: mpsc::Sender, incoming_messages_receiver: mpsc::Receiver<(PeerId, GossipCheckpoints)>, slot_sender: mpsc::Sender, - #[cfg(feature = "pot")] - current_slot_iterations: Arc>, - // TODO: Make this shared with Timekeeper instead so it can follow latest parameters - // automatically, this will implement Timekeeper "reset" - next_slot_and_seed: (Slot, PotSeed), + state: Arc, _block: PhantomData, } -impl PotSource +impl PotSourceWorker where Block: BlockT, Client: BlockchainEvents + HeaderBackend + ProvideRuntimeApi, @@ -98,13 +104,16 @@ where { #[cfg(feature = "pot")] let chain_constants; + #[cfg(feature = "pot")] + let mut maybe_next_parameters_change; let start_slot; let start_seed; - let current_slot_iterations; + let slot_iterations; #[cfg(feature = "pot")] { let best_hash = client.info().best_hash; - chain_constants = 
client.runtime_api().chain_constants(best_hash)?; + let runtime_api = client.runtime_api(); + chain_constants = runtime_api.chain_constants(best_hash)?; let best_header = client.header(best_hash)?.ok_or_else(|| { ApiError::UnknownBlock(format!("Parent block {best_hash} not found")) @@ -118,63 +127,76 @@ where // Next slot after the best one seen best_pre_digest.slot() + chain_constants.block_authoring_delay() + Slot::from(1) }; - // TODO: Support parameters change - start_seed = if best_header.number().is_zero() { - pot_verifier.genesis_seed() + + let pot_parameters = runtime_api.pot_parameters(best_hash)?; + maybe_next_parameters_change = pot_parameters.next_parameters_change(); + + if let Some(parameters_change) = maybe_next_parameters_change + && parameters_change.slot == start_slot + { + start_seed = best_pre_digest.pot_info().future_proof_of_time().seed_with_entropy(¶meters_change.entropy); + slot_iterations = parameters_change.slot_iterations; + maybe_next_parameters_change.take(); } else { - best_pre_digest.pot_info().future_proof_of_time().seed() - }; - // TODO: Support parameters change - current_slot_iterations = client - .runtime_api() - .pot_parameters(best_hash)? 
- .slot_iterations(start_slot); + start_seed = if best_header.number().is_zero() { + pot_verifier.genesis_seed() + } else { + best_pre_digest.pot_info().future_proof_of_time().seed() + }; + slot_iterations = pot_parameters.slot_iterations(); + } } #[cfg(not(feature = "pot"))] { start_slot = Slot::from(1); start_seed = pot_verifier.genesis_seed(); - current_slot_iterations = NonZeroU32::new(100_000_000).expect("Not zero; qed"); + slot_iterations = NonZeroU32::new(100_000_000).expect("Not zero; qed"); } + let state = Arc::new(PotState::new( + NextSlotInput { + slot: start_slot, + slot_iterations, + seed: start_seed, + }, + #[cfg(feature = "pot")] + maybe_next_parameters_change, + pot_verifier.clone(), + )); + let (timekeeper_checkpoints_sender, timekeeper_checkpoints_receiver) = mpsc::channel(LOCAL_PROOFS_CHANNEL_CAPACITY); let (slot_sender, slot_receiver) = mpsc::channel(SLOTS_CHANNEL_CAPACITY); if is_timekeeper { + let state = Arc::clone(&state); let pot_verifier = pot_verifier.clone(); thread::Builder::new() .name("timekeeper".to_string()) .spawn(move || { - if let Err(error) = run_timekeeper( - start_seed, - start_slot, - current_slot_iterations, - pot_verifier, - timekeeper_checkpoints_sender, - ) { + if let Err(error) = + run_timekeeper(state, pot_verifier, timekeeper_checkpoints_sender) + { error!(%error, "Timekeeper exited with an error"); } }) .expect("Thread creation must not panic"); } - let current_slot_iterations = Arc::new(Atomic::new(current_slot_iterations)); - let (outgoing_messages_sender, outgoing_messages_receiver) = mpsc::channel(GOSSIP_OUTGOING_CHANNEL_CAPACITY); let (incoming_messages_sender, incoming_messages_receiver) = - mpsc::channel(GOSSIP_OUTGOING_CHANNEL_CAPACITY); - let gossip = PotGossipWorker::new( + mpsc::channel(GOSSIP_INCOMING_CHANNEL_CAPACITY); + let gossip_worker = PotGossipWorker::new( outgoing_messages_receiver, incoming_messages_sender, pot_verifier, - Arc::clone(¤t_slot_iterations), + Arc::clone(&state), network, sync, ); - 
let source = Self { + let source_worker = Self { client, #[cfg(feature = "pot")] chain_constants, @@ -182,13 +204,13 @@ where outgoing_messages_sender, incoming_messages_receiver, slot_sender, - #[cfg(feature = "pot")] - current_slot_iterations, - next_slot_and_seed: (start_slot, start_seed), + state, _block: PhantomData, }; - Ok((source, gossip, PotSlotInfoStream(slot_receiver))) + let pot_slot_info_stream = PotSlotInfoStream(slot_receiver); + + Ok((source_worker, gossip_worker, pot_slot_info_stream)) } /// Run proof of time source @@ -199,12 +221,12 @@ where select! { // List of blocks that the client has finalized. timekeeper_checkpoints = self.timekeeper_checkpoints_receiver.select_next_some() => { - self.handle_timekeeper_checkpoints(timekeeper_checkpoints).await; + self.handle_timekeeper_checkpoints(timekeeper_checkpoints); } // List of blocks that the client has finalized. maybe_gossip_checkpoints = self.incoming_messages_receiver.next() => { if let Some((sender, gossip_checkpoints)) = maybe_gossip_checkpoints { - self.handle_gossip_checkpoints(sender, gossip_checkpoints).await; + self.handle_gossip_checkpoints(sender, gossip_checkpoints); } else { debug!("Incoming gossip messages stream ended, exiting"); return; @@ -212,7 +234,10 @@ where } maybe_import_notification = import_notification_stream.next() => { if let Some(import_notification) = maybe_import_notification { - self.handle_import_notification(import_notification).await; + self.handle_block_import_notification( + import_notification.hash, + &import_notification.header, + ); } else { debug!("Import notifications stream ended, exiting"); return; @@ -222,17 +247,22 @@ where } } - async fn handle_timekeeper_checkpoints( - &mut self, - timekeeper_checkpoints: TimekeeperCheckpoints, - ) { + fn handle_timekeeper_checkpoints(&mut self, timekeeper_checkpoints: TimekeeperCheckpoints) { let TimekeeperCheckpoints { + slot, seed, slot_iterations, - slot, checkpoints, } = timekeeper_checkpoints; + debug!( + 
?slot, + %seed, + %slot_iterations, + output = %checkpoints.output(), + "Received timekeeper proof", + ); + if self .outgoing_messages_sender .try_send(GossipCheckpoints { @@ -246,130 +276,96 @@ where debug!(%slot, "Gossip is not able to keep-up with slot production"); } - // It doesn't matter if receiver is dropped - let _ = self - .slot_sender - .send(PotSlotInfo { slot, checkpoints }) - .await; - - self.next_slot_and_seed = (slot + Slot::from(1), checkpoints.output().seed()); + // We don't care if block production is too slow or block production is not enabled on this + // node at all + let _ = self.slot_sender.try_send(PotSlotInfo { slot, checkpoints }); } // TODO: Follow both verified and unverified checkpoints to start secondary timekeeper ASAP in // case verification succeeds - async fn handle_gossip_checkpoints( + fn handle_gossip_checkpoints( &mut self, _sender: PeerId, gossip_checkpoints: GossipCheckpoints, ) { - let (next_slot, next_seed) = self.next_slot_and_seed; - if gossip_checkpoints.slot == next_slot && gossip_checkpoints.seed == next_seed { - // It doesn't matter if receiver is dropped - let _ = self - .slot_sender - .send(PotSlotInfo { - slot: gossip_checkpoints.slot, - checkpoints: gossip_checkpoints.checkpoints, - }) - .await; + let expected_next_slot_input = NextSlotInput { + slot: gossip_checkpoints.slot, + slot_iterations: gossip_checkpoints.slot_iterations, + seed: gossip_checkpoints.seed, + }; - self.next_slot_and_seed = ( - gossip_checkpoints.slot + Slot::from(1), - gossip_checkpoints.checkpoints.output().seed(), - ); + if self + .state + .try_extend( + expected_next_slot_input, + gossip_checkpoints.slot, + gossip_checkpoints.checkpoints.output(), + #[cfg(feature = "pot")] + None, + ) + .is_ok() + { + // We don't care if block production is too slow or block production is not enabled on + // this node at all + let _ = self.slot_sender.try_send(PotSlotInfo { + slot: gossip_checkpoints.slot, + checkpoints: gossip_checkpoints.checkpoints, + 
}); } } #[cfg(not(feature = "pot"))] - async fn handle_import_notification( + fn handle_block_import_notification( &mut self, - _import_notification: BlockImportNotification, + _block_hash: Block::Hash, + _header: &Block::Header, ) { } #[cfg(feature = "pot")] - async fn handle_import_notification( - &mut self, - import_notification: BlockImportNotification, - ) { - let pre_digest = match extract_pre_digest(&import_notification.header) { - Ok(pre_digest) => pre_digest, - Err(error) => { - error!( - %error, - block_number = %import_notification.header.number(), - block_hash = %import_notification.hash, - "Failed to extract pre-digest from header" - ); - return; - } - }; - let pot_parameters = match self - .client - .runtime_api() - .pot_parameters(import_notification.hash) + fn handle_block_import_notification(&self, block_hash: Block::Hash, header: &Block::Header) { + let subspace_digest_items = match extract_subspace_digest_items::< + Block::Header, + FarmerPublicKey, + FarmerPublicKey, + FarmerSignature, + >(header) { - Ok(pot_parameters) => pot_parameters, + Ok(pre_digest) => pre_digest, Err(error) => { error!( %error, - block_number = %import_notification.header.number(), - block_hash = %import_notification.hash, - "Failed to get proof of time parameters" + block_number = %header.number(), + %block_hash, + "Failed to extract Subspace digest items from header" ); return; } }; - let next_slot = - pre_digest.slot() + self.chain_constants.block_authoring_delay() + Slot::from(1); - self.current_slot_iterations - .store(pot_parameters.slot_iterations(next_slot), Ordering::Relaxed); - - // In case block import is ahead of timekeeper and gossip, update `next_slot_and_seed` - if next_slot >= self.next_slot_and_seed.0 { - // TODO: Account for entropy injection here - self.next_slot_and_seed = ( - next_slot, - pre_digest.pot_info().future_proof_of_time().seed(), - ); - - // TODO: Try to get higher time slot using verifier, we are behind and need to catch up - // and may 
have already received newer proofs via gossip - } - } -} - -/// Runs timekeeper, must be running on a fast dedicated CPU core -fn run_timekeeper( - mut seed: PotSeed, - slot: Slot, - slot_iterations: NonZeroU32, - pot_verifier: PotVerifier, - mut proofs_sender: mpsc::Sender, -) -> Result<(), PotError> { - let mut slot = SlotNumber::from(slot); - loop { - let checkpoints = subspace_proof_of_time::prove(seed, slot_iterations)?; - - pot_verifier.inject_verified_checkpoints(seed, slot_iterations, checkpoints); - - let slot_info = TimekeeperCheckpoints { - seed, - slot_iterations, - slot: Slot::from(slot), - checkpoints, - }; - - seed = checkpoints.output().seed(); - - if let Err(error) = proofs_sender.try_send(slot_info) { - if let Err(error) = block_on(proofs_sender.send(error.into_inner())) { - debug!(%error, "Couldn't send checkpoints, channel is closed"); - return Ok(()); - } + let best_slot = + subspace_digest_items.pre_digest.slot() + self.chain_constants.block_authoring_delay(); + let best_proof = subspace_digest_items + .pre_digest + .pot_info() + .future_proof_of_time(); + + // This will do one of 3 things depending on circumstances: + // * if block import is ahead of timekeeper and gossip, it will update next slot input + // * if block import is on a different PoT chain, it will update next slot input to the + // correct fork + // * if block import is on the same PoT chain this will essentially do nothing + if self + .state + .update( + best_slot, + best_proof, + #[cfg(feature = "pot")] + Some(subspace_digest_items.pot_parameters_change), + ) + .is_some() + { + warn!("Proof of time chain reorg happened"); } - - slot += 1; } } diff --git a/crates/sc-proof-of-time/src/gossip.rs b/crates/sc-proof-of-time/src/source/gossip.rs similarity index 90% rename from crates/sc-proof-of-time/src/gossip.rs rename to crates/sc-proof-of-time/src/source/gossip.rs index 8d7a561f87..a0559ebb53 100644 --- a/crates/sc-proof-of-time/src/gossip.rs +++ 
b/crates/sc-proof-of-time/src/source/gossip.rs @@ -1,7 +1,7 @@ //! PoT gossip functionality. +use crate::source::state::PotState; use crate::verifier::PotVerifier; -use atomic::Atomic; use futures::channel::mpsc; use futures::{FutureExt, SinkExt, StreamExt}; use parity_scale_codec::{Decode, Encode}; @@ -31,15 +31,15 @@ pub fn pot_gossip_peers_set_config() -> NonDefaultSetConfig { } #[derive(Debug, Copy, Clone, Encode, Decode)] -pub(crate) struct GossipCheckpoints { +pub(super) struct GossipCheckpoints { /// Slot number - pub(crate) slot: Slot, + pub(super) slot: Slot, /// Proof of time seed - pub(crate) seed: PotSeed, + pub(super) seed: PotSeed, /// Iterations per slot - pub(crate) slot_iterations: NonZeroU32, + pub(super) slot_iterations: NonZeroU32, /// Proof of time checkpoints - pub(crate) checkpoints: PotCheckpoints, + pub(super) checkpoints: PotCheckpoints, } /// PoT gossip worker @@ -59,11 +59,11 @@ where Block: BlockT, { /// Instantiate gossip worker - pub(crate) fn new( + pub(super) fn new( outgoing_messages_receiver: mpsc::Receiver, incoming_messages_sender: mpsc::Sender<(PeerId, GossipCheckpoints)>, pot_verifier: PotVerifier, - current_slot_iterations: Arc>, + state: Arc, network: Network, sync: Arc, ) -> Self @@ -73,11 +73,7 @@ where { let topic = <::Hashing as HashT>::hash(b"checkpoints"); - let validator = Arc::new(PotGossipValidator::new( - pot_verifier, - current_slot_iterations, - topic, - )); + let validator = Arc::new(PotGossipValidator::new(pot_verifier, state, topic)); let engine = GossipEngine::new(network, sync, GOSSIP_PROTOCOL, validator, None); Self { @@ -146,7 +142,7 @@ where Block: BlockT, { pot_verifier: PotVerifier, - current_slot_iterations: Arc>, + state: Arc, topic: Block::Hash, } @@ -155,14 +151,10 @@ where Block: BlockT, { /// Creates the validator. 
- fn new( - pot_verifier: PotVerifier, - current_slot_iterations: Arc>, - topic: Block::Hash, - ) -> Self { + fn new(pot_verifier: PotVerifier, state: Arc, topic: Block::Hash) -> Self { Self { pot_verifier, - current_slot_iterations, + state, topic, } } @@ -184,7 +176,10 @@ where Ok(message) => { // TODO: Gossip validation should be non-blocking! // TODO: Check that slot number is not too far in the past of future - let current_slot_iterations = self.current_slot_iterations.load(Ordering::Relaxed); + let current_slot_iterations = self + .state + .next_slot_input(Ordering::Relaxed) + .slot_iterations; // Check that number of slot iterations is between 2/3 and 1.5 of current slot // iterations, otherwise ignore diff --git a/crates/sc-proof-of-time/src/source/state.rs b/crates/sc-proof-of-time/src/source/state.rs new file mode 100644 index 0000000000..9ea4c84446 --- /dev/null +++ b/crates/sc-proof-of-time/src/source/state.rs @@ -0,0 +1,187 @@ +use crate::verifier::PotVerifier; +use atomic::Atomic; +use sp_consensus_slots::Slot; +#[cfg(feature = "pot")] +use sp_consensus_subspace::PotParametersChange; +use std::num::NonZeroU32; +use std::sync::atomic::Ordering; +use subspace_core_primitives::{PotOutput, PotSeed}; + +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub(super) struct NextSlotInput { + pub(super) slot: Slot, + pub(super) slot_iterations: NonZeroU32, + pub(super) seed: PotSeed, +} + +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +struct InnerState { + next_slot_input: NextSlotInput, + #[cfg(feature = "pot")] + parameters_change: Option, +} + +impl InnerState { + pub(super) fn update( + mut self, + mut best_slot: Slot, + mut best_output: PotOutput, + #[cfg(feature = "pot")] maybe_updated_parameters_change: Option< + Option, + >, + pot_verifier: &PotVerifier, + ) -> Self { + #[cfg(feature = "pot")] + if let Some(updated_parameters_change) = maybe_updated_parameters_change { + self.parameters_change = updated_parameters_change; + } + + loop { + let next_slot = 
best_slot + Slot::from(1); + let next_slot_iterations; + let next_seed; + + #[cfg(feature = "pot")] + // The change to number of iterations might have happened before `next_slot` + if let Some(parameters_change) = self.parameters_change + && parameters_change.slot <= next_slot + { + next_slot_iterations = parameters_change.slot_iterations; + // Only if entropy injection happens on this exact slot we need to mix it in + if parameters_change.slot == next_slot { + next_seed = best_output.seed_with_entropy(¶meters_change.entropy); + } else { + next_seed = best_output.seed(); + } + } else { + next_slot_iterations = self.next_slot_input.slot_iterations; + next_seed = best_output.seed(); + } + #[cfg(not(feature = "pot"))] + { + next_slot_iterations = self.next_slot_input.slot_iterations; + next_seed = best_output.seed(); + } + + self.next_slot_input = NextSlotInput { + slot: next_slot, + slot_iterations: next_slot_iterations, + seed: next_seed, + }; + + // Advance further as far as possible using previously verified proofs/checkpoints + if let Some(checkpoints) = pot_verifier.get_checkpoints(next_seed, next_slot_iterations) + { + best_slot = best_slot + Slot::from(1); + best_output = checkpoints.output(); + } else { + break; + } + } + + self + } +} + +#[derive(Debug)] +pub(super) struct PotState { + inner_state: Atomic, + verifier: PotVerifier, +} + +impl PotState { + pub(super) fn new( + next_slot_input: NextSlotInput, + #[cfg(feature = "pot")] parameters_change: Option, + verifier: PotVerifier, + ) -> Self { + let inner = InnerState { + next_slot_input, + #[cfg(feature = "pot")] + parameters_change, + }; + + Self { + inner_state: Atomic::new(inner), + verifier, + } + } + + pub(super) fn next_slot_input(&self, ordering: Ordering) -> NextSlotInput { + self.inner_state.load(ordering).next_slot_input + } + + /// Extend state if it matches provided expected next slot input. 
+ /// + /// Returns `Ok(new_next_slot_input)` if state was extended successfully and + /// `Err(existing_next_slot_input)` in case state was changed in the meantime. + pub(super) fn try_extend( + &self, + expected_previous_next_slot_input: NextSlotInput, + best_slot: Slot, + best_output: PotOutput, + #[cfg(feature = "pot")] maybe_updated_parameters_change: Option< + Option, + >, + ) -> Result { + let old_inner_state = self.inner_state.load(Ordering::Acquire); + if expected_previous_next_slot_input != old_inner_state.next_slot_input { + return Err(old_inner_state.next_slot_input); + } + + let new_inner_state = old_inner_state.update( + best_slot, + best_output, + #[cfg(feature = "pot")] + maybe_updated_parameters_change, + &self.verifier, + ); + + // Use `compare_exchange` to ensure we only update previously known value and not + // accidentally override something that doesn't match expectations anymore + self.inner_state + .compare_exchange( + old_inner_state, + new_inner_state, + Ordering::AcqRel, + // We don't care about the value read in case of failure + Ordering::Acquire, + ) + .map(|_old_inner_state| new_inner_state.next_slot_input) + .map_err(|existing_inner_state| existing_inner_state.next_slot_input) + } + + /// Update state, overriding PoT chain if it doesn't match provided values. + /// + /// Returns `Some(next_slot_input)` if reorg happened. 
+ #[cfg(feature = "pot")] + pub(super) fn update( + &self, + best_slot: Slot, + best_output: PotOutput, + #[cfg(feature = "pot")] maybe_updated_parameters_change: Option< + Option, + >, + ) -> Option { + let mut best_state = None; + // Use `fetch_update` such that we don't accidentally downgrade best slot to smaller value + let previous_best_state = self + .inner_state + .fetch_update(Ordering::AcqRel, Ordering::Acquire, |inner_state| { + best_state = Some(inner_state.update( + best_slot, + best_output, + #[cfg(feature = "pot")] + maybe_updated_parameters_change, + &self.verifier, + )); + + best_state + }) + .expect("Callback always returns `Some`; qed"); + let best_state = best_state.expect("Replaced with `Some` above; qed"); + + (previous_best_state.next_slot_input != best_state.next_slot_input) + .then_some(best_state.next_slot_input) + } +} diff --git a/crates/sc-proof-of-time/src/source/timekeeper.rs b/crates/sc-proof-of-time/src/source/timekeeper.rs new file mode 100644 index 0000000000..8d393d1b8d --- /dev/null +++ b/crates/sc-proof-of-time/src/source/timekeeper.rs @@ -0,0 +1,54 @@ +use crate::source::state::PotState; +use crate::source::TimekeeperCheckpoints; +use crate::verifier::PotVerifier; +use futures::channel::mpsc; +use futures::executor::block_on; +use futures::SinkExt; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use subspace_proof_of_time::PotError; +use tracing::debug; + +/// Runs timekeeper, must be running on a fast dedicated CPU core +pub(super) fn run_timekeeper( + state: Arc, + pot_verifier: PotVerifier, + mut proofs_sender: mpsc::Sender, +) -> Result<(), PotError> { + let mut next_slot_input = state.next_slot_input(Ordering::Acquire); + + loop { + let checkpoints = + subspace_proof_of_time::prove(next_slot_input.seed, next_slot_input.slot_iterations)?; + + let slot_info = TimekeeperCheckpoints { + seed: next_slot_input.seed, + slot_iterations: next_slot_input.slot_iterations, + slot: next_slot_input.slot, + checkpoints, + }; + + 
pot_verifier.inject_verified_checkpoints( + next_slot_input.seed, + next_slot_input.slot_iterations, + checkpoints, + ); + + next_slot_input = state + .try_extend( + next_slot_input, + next_slot_input.slot, + checkpoints.output(), + #[cfg(feature = "pot")] + None, + ) + .unwrap_or_else(|next_slot_input| next_slot_input); + + if let Err(error) = proofs_sender.try_send(slot_info) { + if let Err(error) = block_on(proofs_sender.send(error.into_inner())) { + debug!(%error, "Couldn't send checkpoints, channel is closed"); + return Ok(()); + } + } + } +} diff --git a/crates/sc-proof-of-time/src/verifier.rs b/crates/sc-proof-of-time/src/verifier.rs index b949f11ebd..930c3cad4c 100644 --- a/crates/sc-proof-of-time/src/verifier.rs +++ b/crates/sc-proof-of-time/src/verifier.rs @@ -8,9 +8,11 @@ use futures::channel::oneshot; use lru::LruCache; use parking_lot::Mutex; use sp_consensus_slots::Slot; +#[cfg(feature = "pot")] +use sp_consensus_subspace::PotParametersChange; use std::num::{NonZeroU32, NonZeroUsize}; use std::sync::Arc; -use subspace_core_primitives::{PotCheckpoints, PotProof, PotSeed}; +use subspace_core_primitives::{PotCheckpoints, PotOutput, PotSeed}; use tracing::trace; #[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] @@ -62,24 +64,44 @@ impl PotVerifier { self.genesis_seed } - /// Verify a single proof of time that is `slots` slots away from `seed`. + pub fn get_checkpoints( + &self, + seed: PotSeed, + slot_iterations: NonZeroU32, + ) -> Option { + let cache_key = CacheKey { + seed, + slot_iterations, + }; + + self.cache + .lock() + .peek(&cache_key) + .and_then(|value| value.checkpoints.try_lock()?.as_ref().copied()) + } + + /// Verify sequence of proofs of time that covers `slots` slots starting at `slot` with provided + /// initial `seed`. 
+ /// + /// In case `maybe_parameters_change` is present, it will not affect provided `seed` and + /// `slot_iterations`, meaning if parameters change occurred at `slot`, provided `seed` and + /// `slot_iterations` must already account for that. /// /// NOTE: Potentially much slower than checkpoints, prefer [`Self::verify_checkpoints()`] /// whenever possible. - pub async fn is_proof_valid( + // TODO: Version of this API that never invokes proving, just checks + pub async fn is_output_valid( &self, + #[cfg(feature = "pot")] mut slot: Slot, mut seed: PotSeed, - slot_iterations: NonZeroU32, + #[cfg_attr(not(feature = "pot"), allow(unused_mut))] mut slot_iterations: NonZeroU32, slots: Slot, - proof: PotProof, + output: PotOutput, + #[cfg(feature = "pot")] mut maybe_parameters_change: Option, ) -> bool { let mut slots = u64::from(slots); loop { - if slots == 0 { - return proof.seed() == seed; - } - // TODO: This "proxy" is a workaround for https://github.com/rust-lang/rust/issues/57478 let (result_sender, result_receiver) = oneshot::channel(); std::thread::spawn({ @@ -90,20 +112,41 @@ impl PotVerifier { async move { // Result doesn't matter here let _ = result_sender - .send(verifier.derive_next_seed(seed, slot_iterations).await); + .send(verifier.calculate_output(seed, slot_iterations).await); } }); } }); - seed = match result_receiver.await { - Ok(Some(seed)) => seed, - _ => { - return false; - } + let Ok(Some(calculated_proof)) = result_receiver.await else { + return false; }; slots -= 1; + + if slots == 0 { + return output == calculated_proof; + } + + #[cfg(feature = "pot")] + { + slot = slot + Slot::from(1); + } + + #[cfg(feature = "pot")] + if let Some(parameters_change) = maybe_parameters_change + && parameters_change.slot == slot + { + slot_iterations = parameters_change.slot_iterations; + seed = calculated_proof.seed_with_entropy(¶meters_change.entropy); + maybe_parameters_change.take(); + } else { + seed = calculated_proof.seed(); + } + #[cfg(not(feature = 
"pot"))] + { + seed = calculated_proof.seed(); + } } } @@ -111,11 +154,11 @@ impl PotVerifier { // TODO: False-positive, lock is not actually held over await point, remove suppression once // fixed upstream #[allow(clippy::await_holding_lock)] - async fn derive_next_seed( + async fn calculate_output( &self, seed: PotSeed, slot_iterations: NonZeroU32, - ) -> Option { + ) -> Option { let cache_key = CacheKey { seed, slot_iterations, @@ -128,7 +171,7 @@ impl PotVerifier { drop(cache); let correct_checkpoints = cache_value.checkpoints.lock().await; if let Some(correct_checkpoints) = correct_checkpoints.as_ref() { - return Some(correct_checkpoints.output().seed()); + return Some(correct_checkpoints.output()); } // There was another verification for these inputs and it wasn't successful, @@ -178,9 +221,9 @@ impl PotVerifier { return None; }; - let seed = generated_checkpoints.output().seed(); + let proof = generated_checkpoints.output(); checkpoints.replace(generated_checkpoints); - return Some(seed); + return Some(proof); } } diff --git a/crates/sc-proof-of-time/src/verifier/tests.rs b/crates/sc-proof-of-time/src/verifier/tests.rs index 56c3d178c2..f79a71270e 100644 --- a/crates/sc-proof-of-time/src/verifier/tests.rs +++ b/crates/sc-proof-of-time/src/verifier/tests.rs @@ -1,7 +1,13 @@ use crate::verifier::PotVerifier; use futures::executor::block_on; use sp_consensus_slots::Slot; +#[cfg(feature = "pot")] +use sp_consensus_subspace::PotParametersChange; +#[cfg(feature = "pot")] +use std::mem; use std::num::{NonZeroU32, NonZeroUsize}; +#[cfg(feature = "pot")] +use subspace_core_primitives::Blake3Hash; use subspace_core_primitives::PotSeed; const SEED: [u8; 16] = [ @@ -17,11 +23,15 @@ fn test_basic() { let verifier = PotVerifier::new(genesis_seed, NonZeroUsize::new(1000).unwrap()); // Expected to be valid - assert!(block_on(verifier.is_proof_valid( + assert!(block_on(verifier.is_output_valid( + #[cfg(feature = "pot")] + Slot::from(1), genesis_seed, slot_iterations, 
Slot::from(1), - checkpoints_1.output() + checkpoints_1.output(), + #[cfg(feature = "pot")] + None ))); assert!(block_on(verifier.verify_checkpoints( genesis_seed, @@ -30,18 +40,26 @@ fn test_basic() { ))); // Invalid number of slots - assert!(!block_on(verifier.is_proof_valid( + assert!(!block_on(verifier.is_output_valid( + #[cfg(feature = "pot")] + Slot::from(1), genesis_seed, slot_iterations, Slot::from(2), - checkpoints_1.output() + checkpoints_1.output(), + #[cfg(feature = "pot")] + None ))); // Invalid seed - assert!(!block_on(verifier.is_proof_valid( + assert!(!block_on(verifier.is_output_valid( + #[cfg(feature = "pot")] + Slot::from(1), checkpoints_1.output().seed(), slot_iterations, Slot::from(1), - checkpoints_1.output() + checkpoints_1.output(), + #[cfg(feature = "pot")] + None ))); // Invalid number of iterations assert!(!block_on( @@ -58,17 +76,25 @@ fn test_basic() { let checkpoints_2 = subspace_proof_of_time::prove(seed_1, slot_iterations).unwrap(); // Expected to be valid - assert!(block_on(verifier.is_proof_valid( + assert!(block_on(verifier.is_output_valid( + #[cfg(feature = "pot")] + Slot::from(2), seed_1, slot_iterations, Slot::from(1), - checkpoints_2.output() + checkpoints_2.output(), + #[cfg(feature = "pot")] + None ))); - assert!(block_on(verifier.is_proof_valid( + assert!(block_on(verifier.is_output_valid( + #[cfg(feature = "pot")] + Slot::from(1), genesis_seed, slot_iterations, Slot::from(2), - checkpoints_2.output() + checkpoints_2.output(), + #[cfg(feature = "pot")] + None ))); assert!(block_on(verifier.verify_checkpoints( seed_1, @@ -77,28 +103,135 @@ fn test_basic() { ))); // Invalid number of slots - assert!(!block_on(verifier.is_proof_valid( + assert!(!block_on(verifier.is_output_valid( + #[cfg(feature = "pot")] + Slot::from(1), seed_1, slot_iterations, Slot::from(2), - checkpoints_2.output() + checkpoints_2.output(), + #[cfg(feature = "pot")] + None ))); // Invalid seed - assert!(!block_on(verifier.is_proof_valid( + 
assert!(!block_on(verifier.is_output_valid( + #[cfg(feature = "pot")] + Slot::from(1), seed_1, slot_iterations, Slot::from(2), - checkpoints_2.output() + checkpoints_2.output(), + #[cfg(feature = "pot")] + None ))); // Invalid number of iterations assert!(!block_on( - verifier.is_proof_valid( + verifier.is_output_valid( + #[cfg(feature = "pot")] + Slot::from(1), genesis_seed, slot_iterations .checked_mul(NonZeroU32::new(2).unwrap()) .unwrap(), Slot::from(2), - checkpoints_2.output() + checkpoints_2.output(), + #[cfg(feature = "pot")] + None ) )); } + +#[cfg(feature = "pot")] +#[test] +fn parameters_change() { + let genesis_seed = PotSeed::from(SEED); + let slot_iterations_1 = NonZeroU32::new(512).unwrap(); + let entropy = [1; mem::size_of::()]; + let checkpoints_1 = subspace_proof_of_time::prove(genesis_seed, slot_iterations_1).unwrap(); + let slot_iterations_2 = slot_iterations_1.saturating_mul(NonZeroU32::new(2).unwrap()); + let checkpoints_2 = subspace_proof_of_time::prove( + checkpoints_1.output().seed_with_entropy(&entropy), + slot_iterations_2, + ) + .unwrap(); + let checkpoints_3 = + subspace_proof_of_time::prove(checkpoints_2.output().seed(), slot_iterations_2).unwrap(); + + let verifier = PotVerifier::new(genesis_seed, NonZeroUsize::new(1000).unwrap()); + + // Changing parameters after first slot + assert!(block_on(verifier.is_output_valid( + Slot::from(1), + genesis_seed, + slot_iterations_1, + Slot::from(1), + checkpoints_1.output(), + Some(PotParametersChange { + slot: Slot::from(2), + slot_iterations: slot_iterations_2, + entropy, + }) + ))); + // Changing parameters in the middle + assert!(block_on(verifier.is_output_valid( + Slot::from(1), + genesis_seed, + slot_iterations_1, + Slot::from(3), + checkpoints_3.output(), + Some(PotParametersChange { + slot: Slot::from(2), + slot_iterations: slot_iterations_2, + entropy, + }) + ))); + // Changing parameters on last slot + assert!(block_on(verifier.is_output_valid( + Slot::from(1), + genesis_seed, + 
slot_iterations_1, + Slot::from(2), + checkpoints_2.output(), + Some(PotParametersChange { + slot: Slot::from(2), + slot_iterations: slot_iterations_2, + entropy, + }) + ))); + // Not changing parameters because changes apply to the very first slot that is verified + assert!(block_on(verifier.is_output_valid( + Slot::from(2), + checkpoints_1.output().seed_with_entropy(&entropy), + slot_iterations_2, + Slot::from(2), + checkpoints_3.output(), + Some(PotParametersChange { + slot: Slot::from(2), + slot_iterations: slot_iterations_2, + entropy, + }) + ))); + + // Missing parameters change + assert!(!block_on(verifier.is_output_valid( + Slot::from(1), + genesis_seed, + slot_iterations_1, + Slot::from(3), + checkpoints_3.output(), + None + ))); + // Invalid slot + assert!(!block_on(verifier.is_output_valid( + Slot::from(2), + genesis_seed, + slot_iterations_1, + Slot::from(3), + checkpoints_3.output(), + Some(PotParametersChange { + slot: Slot::from(2), + slot_iterations: slot_iterations_2, + entropy, + }) + ))); +} diff --git a/crates/sp-consensus-subspace/src/digests.rs b/crates/sp-consensus-subspace/src/digests.rs index f976d596ba..d9e42f9f82 100644 --- a/crates/sp-consensus-subspace/src/digests.rs +++ b/crates/sp-consensus-subspace/src/digests.rs @@ -31,7 +31,7 @@ use sp_std::fmt; #[cfg(feature = "pot")] use sp_std::num::NonZeroU32; #[cfg(feature = "pot")] -use subspace_core_primitives::PotProof; +use subspace_core_primitives::PotOutput; #[cfg(not(feature = "pot"))] use subspace_core_primitives::Randomness; use subspace_core_primitives::{SegmentCommitment, SegmentIndex, Solution, SolutionRange}; @@ -87,9 +87,9 @@ pub enum PreDigestPotInfo { #[codec(index = 0)] V0 { /// Proof of time for this slot - proof_of_time: PotProof, + proof_of_time: PotOutput, /// Future proof of time - future_proof_of_time: PotProof, + future_proof_of_time: PotOutput, }, } @@ -98,7 +98,7 @@ impl PreDigestPotInfo { /// Proof of time for this slot #[cfg(feature = "pot")] #[inline] - pub fn 
proof_of_time(&self) -> PotProof { + pub fn proof_of_time(&self) -> PotOutput { let Self::V0 { proof_of_time, .. } = self; *proof_of_time } @@ -106,7 +106,7 @@ impl PreDigestPotInfo { /// Future proof of time #[cfg(feature = "pot")] #[inline] - pub fn future_proof_of_time(&self) -> PotProof { + pub fn future_proof_of_time(&self) -> PotOutput { let Self::V0 { future_proof_of_time, .. @@ -133,7 +133,8 @@ pub trait CompatibleDigestItem: Sized { /// If this item is a Subspace signature, return the signature. fn as_subspace_seal(&self) -> Option; - /// Number of iterations for proof of time per slot + /// Number of iterations for proof of time per slot, corresponds to slot that directly follows + /// parent block's slot and can change before slot for which block is produced #[cfg(feature = "pot")] fn pot_slot_iterations(pot_slot_iterations: NonZeroU32) -> Self; @@ -525,7 +526,8 @@ pub struct SubspaceDigestItems { pub pre_digest: PreDigest, /// Signature (seal) if present pub signature: Option, - /// Number of iterations for proof of time per slot + /// Number of iterations for proof of time per slot, corresponds to slot that directly follows + /// parent block's slot and can change before slot for which block is produced #[cfg(feature = "pot")] pub pot_slot_iterations: NonZeroU32, /// Global randomness diff --git a/crates/sp-consensus-subspace/src/lib.rs b/crates/sp-consensus-subspace/src/lib.rs index 6e04e6c8e4..bde78a1b26 100644 --- a/crates/sp-consensus-subspace/src/lib.rs +++ b/crates/sp-consensus-subspace/src/lib.rs @@ -43,14 +43,17 @@ use sp_runtime_interface::{pass_by, runtime_interface}; #[cfg(feature = "pot")] use sp_std::num::NonZeroU32; use sp_std::vec::Vec; +#[cfg(feature = "std")] use subspace_core_primitives::crypto::kzg::Kzg; #[cfg(feature = "pot")] -use subspace_core_primitives::Blake2b256Hash; +use subspace_core_primitives::BlockHash; #[cfg(not(feature = "pot"))] use subspace_core_primitives::Randomness; +#[cfg(feature = "pot")] +use 
subspace_core_primitives::{Blake3Hash, PotOutput}; use subspace_core_primitives::{ BlockNumber, HistorySize, PotCheckpoints, PublicKey, RewardSignature, SegmentCommitment, - SegmentHeader, SegmentIndex, Solution, SolutionRange, PUBLIC_KEY_LENGTH, + SegmentHeader, SegmentIndex, SlotNumber, Solution, SolutionRange, PUBLIC_KEY_LENGTH, REWARD_SIGNATURE_LENGTH, }; #[cfg(feature = "std")] @@ -59,6 +62,7 @@ use subspace_proof_of_space::chia::ChiaTable; use subspace_proof_of_space::shim::ShimTable; #[cfg(feature = "std")] use subspace_proof_of_space::PosTableType; +#[cfg(feature = "std")] use subspace_proof_of_space::Table; use subspace_solving::REWARD_SIGNING_CONTEXT; use subspace_verification::{check_reward_signature, VerifySolutionParams}; @@ -136,21 +140,22 @@ pub type EquivocationProof
= sp_consensus_slots::EquivocationProof { slot: Slot, /// Solution (includes PoR). solution: Solution, + /// Proof of time for this slot + #[cfg(feature = "pot")] + proof_of_time: PotOutput, + /// Future proof of time + #[cfg(feature = "pot")] + future_proof_of_time: PotOutput, }, } @@ -500,6 +511,24 @@ impl<'a> PassBy for WrappedVerifySolutionParams<'a> { type PassBy = pass_by::Codec; } +/// Wrapped proof of time output for the purposes of runtime interface. +#[derive(Debug, Encode, Decode)] +#[cfg(feature = "pot")] +pub struct WrappedPotOutput(PotOutput); + +#[cfg(feature = "pot")] +impl From for WrappedPotOutput { + #[inline] + fn from(value: PotOutput) -> Self { + Self(value) + } +} + +#[cfg(feature = "pot")] +impl PassBy for WrappedPotOutput { + type PassBy = pass_by::Codec; +} + #[cfg(feature = "std")] sp_externalities::decl_extension! { /// A KZG extension. @@ -531,6 +560,22 @@ impl PosExtension { } } +#[cfg(all(feature = "std", feature = "pot"))] +sp_externalities::decl_extension! { + /// A Poof of time extension. + pub struct PotExtension(Box bool) + Send + Sync>); +} + +#[cfg(all(feature = "std", feature = "pot"))] +impl PotExtension { + /// Create new instance. + pub fn new( + verifier: Box bool) + Send + Sync>, + ) -> Self { + Self(verifier) + } +} + /// Consensus-related runtime interface #[runtime_interface] pub trait Consensus { @@ -538,7 +583,7 @@ pub trait Consensus { fn verify_solution( &mut self, solution: WrappedSolution, - slot: u64, + slot: SlotNumber, params: WrappedVerifySolutionParams<'_>, ) -> Result<(), String> { use sp_externalities::ExternalitiesExt; @@ -577,6 +622,25 @@ pub trait Consensus { Ok(()) } + + /// Verify whether `proof_of_time` is valid at specified `slot` if built on top of `parent_hash` + /// fork of the chain. 
+ #[cfg(feature = "pot")] + fn is_proof_of_time_valid( + &mut self, + parent_hash: BlockHash, + slot: SlotNumber, + proof_of_time: WrappedPotOutput, + ) -> bool { + use sp_externalities::ExternalitiesExt; + + let verifier = &self + .extension::() + .expect("No `PotExtension` associated for the current context!") + .0; + + verifier(parent_hash, slot, proof_of_time.0) + } } #[cfg(not(feature = "pot"))] @@ -647,7 +711,8 @@ sp_api::decl_runtime_apis! { pub enum PotParameters { /// Initial version of the parameters V0 { - /// Base number of iterations per slot + /// Number of iterations for proof of time per slot, corresponds to slot that directly + /// follows parent block's slot and can change before slot for which block is produced slot_iterations: NonZeroU32, /// Optional next scheduled change of parameters next_change: Option, @@ -656,21 +721,22 @@ pub enum PotParameters { #[cfg(feature = "pot")] impl PotParameters { - /// Number of iterations for proof of time per slot, taking into account potential future change - pub fn slot_iterations(&self, slot: Slot) -> NonZeroU32 { + /// Number of iterations for proof of time per slot, corresponds to slot that directly follows + /// parent block's slot and can change before slot for which block is produced + pub fn slot_iterations(&self) -> NonZeroU32 { let Self::V0 { - slot_iterations, - next_change, + slot_iterations, .. } = self; - if let Some(next_change) = next_change { - if next_change.slot >= slot { - return next_change.iterations; - } - } - *slot_iterations } + + /// Get next proof of time parameters change if any + pub fn next_parameters_change(&self) -> Option { + let Self::V0 { next_change, .. 
} = self; + + *next_change + } } #[cfg(feature = "pot")] diff --git a/crates/sp-lightclient/src/mock.rs b/crates/sp-lightclient/src/mock.rs index c4a1041862..541708408d 100644 --- a/crates/sp-lightclient/src/mock.rs +++ b/crates/sp-lightclient/src/mock.rs @@ -3,6 +3,8 @@ use codec::{Decode, Encode}; use frame_support::sp_io::TestExternalities; use scale_info::TypeInfo; use sp_arithmetic::traits::Zero; +#[cfg(feature = "pot")] +use sp_consensus_subspace::PotExtension; use sp_consensus_subspace::{KzgExtension, PosExtension}; use sp_runtime::traits::{BlakeTwo256, Header as HeaderT}; use std::collections::{BTreeMap, HashMap}; @@ -203,6 +205,10 @@ pub fn new_test_ext() -> TestExternalities { ext.register_extension(KzgExtension::new(Kzg::new(embedded_kzg_settings()))); ext.register_extension(PosExtension::new::()); + #[cfg(feature = "pot")] + ext.register_extension(PotExtension::new(Box::new( + |parent_hash, slot, proof_of_time| todo!(), + ))); ext } diff --git a/crates/sp-lightclient/src/tests.rs b/crates/sp-lightclient/src/tests.rs index 21aa800a90..3954b37dc2 100644 --- a/crates/sp-lightclient/src/tests.rs +++ b/crates/sp-lightclient/src/tests.rs @@ -26,7 +26,7 @@ use subspace_archiving::archiver::{Archiver, NewArchivedSegment}; use subspace_core_primitives::crypto::kzg; use subspace_core_primitives::crypto::kzg::Kzg; #[cfg(feature = "pot")] -use subspace_core_primitives::PotProof; +use subspace_core_primitives::PotOutput; use subspace_core_primitives::{ BlockWeight, HistorySize, PublicKey, Randomness, Record, RecordedHistorySegment, SegmentCommitment, SegmentIndex, SlotNumber, Solution, SolutionRange, @@ -130,9 +130,9 @@ struct ValidHeaderParams<'a> { #[cfg(not(feature = "pot"))] global_randomness: Randomness, #[cfg(feature = "pot")] - proof_of_time: PotProof, + proof_of_time: PotOutput, #[cfg(feature = "pot")] - future_proof_of_time: PotProof, + future_proof_of_time: PotOutput, farmer_parameters: &'a FarmerParameters, } @@ -797,10 +797,10 @@ fn 
test_reorg_to_heavier_smaller_chain() { global_randomness: digests_at_2.global_randomness, // TODO: Correct value #[cfg(feature = "pot")] - proof_of_time: PotProof::default(), + proof_of_time: PotOutput::default(), // TODO: Correct value #[cfg(feature = "pot")] - future_proof_of_time: PotProof::default(), + future_proof_of_time: PotOutput::default(), farmer_parameters: &farmer_parameters, }); seal_header(&keypair, &mut header); diff --git a/crates/subspace-core-primitives/src/lib.rs b/crates/subspace-core-primitives/src/lib.rs index 450a781960..23e79fb2c7 100644 --- a/crates/subspace-core-primitives/src/lib.rs +++ b/crates/subspace-core-primitives/src/lib.rs @@ -131,6 +131,9 @@ impl Randomness { /// Block number in Subspace network. pub type BlockNumber = u32; +/// Block hash in Subspace network. +pub type BlockHash = [u8; 32]; + /// Slot number in Subspace network. pub type SlotNumber = u64; @@ -329,7 +332,7 @@ impl PotSeed { } } -/// Proof of time checkpoint +/// Proof of time output, can be intermediate checkpoint or final slot output #[derive( Debug, Default, @@ -348,15 +351,15 @@ impl PotSeed { MaxEncodedLen, )] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -pub struct PotProof(#[cfg_attr(feature = "serde", serde(with = "hex::serde"))] [u8; Self::SIZE]); +pub struct PotOutput(#[cfg_attr(feature = "serde", serde(with = "hex::serde"))] [u8; Self::SIZE]); -impl fmt::Display for PotProof { +impl fmt::Display for PotOutput { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", hex::encode(self.0)) } } -impl PotProof { +impl PotOutput { /// Size of proof of time proof in bytes pub const SIZE: usize = 16; @@ -371,6 +374,15 @@ impl PotProof { pub fn seed(&self) -> PotSeed { PotSeed(self.0) } + + /// Derive seed from proof of time with entropy injection + #[inline] + pub fn seed_with_entropy(&self, entropy: &Blake3Hash) -> PotSeed { + let hash = blake3_hash_list(&[entropy, &self.0]); + let mut seed = PotSeed::default(); + 
seed.copy_from_slice(&hash[..Self::SIZE]); + seed + } } /// Proof of time checkpoints, result of proving @@ -388,7 +400,7 @@ impl PotProof { TypeInfo, MaxEncodedLen, )] -pub struct PotCheckpoints([PotProof; Self::NUM_CHECKPOINTS.get() as usize]); +pub struct PotCheckpoints([PotOutput; Self::NUM_CHECKPOINTS.get() as usize]); impl PotCheckpoints { /// Number of PoT checkpoints produced (used to optimize verification) @@ -396,7 +408,7 @@ impl PotCheckpoints { /// Get proof of time output out of checkpoints (last checkpoint) #[inline] - pub fn output(&self) -> PotProof { + pub fn output(&self) -> PotOutput { self.0[Self::NUM_CHECKPOINTS.get() as usize - 1] } } diff --git a/crates/subspace-farmer/Cargo.toml b/crates/subspace-farmer/Cargo.toml index 3f12c6fced..09a15464a4 100644 --- a/crates/subspace-farmer/Cargo.toml +++ b/crates/subspace-farmer/Cargo.toml @@ -32,6 +32,7 @@ memmap2 = "0.7.1" parity-db = "0.4.6" parity-scale-codec = "3.6.5" parking_lot = "0.12.1" +prometheus-client = "0.21.2" rand = "0.8.5" rayon = "1.7.0" schnorrkel = "0.9.1" @@ -45,6 +46,7 @@ subspace-erasure-coding = { version = "0.1.0", path = "../subspace-erasure-codin subspace-farmer-components = { version = "0.1.0", path = "../subspace-farmer-components" } subspace-solving = { version = "0.1.0", path = "../subspace-solving" } subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" } +subspace-metrics = { version = "0.1.0", path = "../../shared/subspace-metrics" } subspace-networking = { version = "0.1.0", path = "../subspace-networking" } subspace-proof-of-space = { version = "0.1.0", path = "../subspace-proof-of-space", features = ["chia"] } subspace-rpc-primitives = { version = "0.1.0", path = "../subspace-rpc-primitives" } diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 64b2419281..a88be78d3d 100644 --- 
a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -25,6 +25,7 @@ use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces; use subspace_farmer::utils::run_future_in_dedicated_thread; use subspace_farmer::{Identity, NodeClient, NodeRpcClient}; use subspace_farmer_components::plotting::PlottedSector; +use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter}; use subspace_networking::libp2p::identity::{ed25519, Keypair}; use subspace_networking::utils::piece_provider::PieceProvider; use subspace_proof_of_space::Table; @@ -52,6 +53,7 @@ where dev, tmp, mut disk_farms, + metrics_endpoints, } = farming_args; // Override the `--enable_private_ips` flag with `--dev` @@ -107,7 +109,9 @@ where let (piece_cache, piece_cache_worker) = PieceCache::new(node_client.clone(), peer_id); - let (node, mut node_runner) = { + let metrics_endpoints_are_specified = !metrics_endpoints.is_empty(); + + let (node, mut node_runner, metrics_registry) = { if dsn.bootstrap_nodes.is_empty() { dsn.bootstrap_nodes = farmer_app_info.dsn_bootstrap_nodes.clone(); } @@ -120,9 +124,19 @@ where Arc::downgrade(&readers_and_pieces), node_client.clone(), piece_cache.clone(), + metrics_endpoints_are_specified, )? 
}; + if metrics_endpoints_are_specified { + let prometheus_task = start_prometheus_metrics_server( + metrics_endpoints, + RegistryAdapter::Libp2p(metrics_registry), + )?; + + let _prometheus_worker = tokio::spawn(prometheus_task); + } + let kzg = Kzg::new(embedded_kzg_settings()); let erasure_coding = ErasureCoding::new( NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize).unwrap(), diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs index 8d284f247d..7086d4329c 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs @@ -1,5 +1,6 @@ use crate::DsnArgs; use parking_lot::Mutex; +use prometheus_client::registry::Registry; use std::collections::HashSet; use std::path::PathBuf; use std::sync::{Arc, Weak}; @@ -8,6 +9,7 @@ use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces; use subspace_farmer::{NodeClient, NodeRpcClient}; use subspace_networking::libp2p::identity::Keypair; use subspace_networking::libp2p::kad::RecordKey; +use subspace_networking::libp2p::metrics::Metrics; use subspace_networking::libp2p::multiaddr::Protocol; use subspace_networking::utils::multihash::ToMultihash; use subspace_networking::utils::strip_peer_id; @@ -44,7 +46,8 @@ pub(super) fn configure_dsn( weak_readers_and_pieces: Weak>>, node_client: NodeRpcClient, piece_cache: PieceCache, -) -> Result<(Node, NodeRunner), anyhow::Error> { + initialize_metrics: bool, +) -> Result<(Node, NodeRunner, Registry), anyhow::Error> { let networking_parameters_registry = NetworkingParametersManager::new( &base_path.join("known_addresses.bin"), strip_peer_id(bootstrap_nodes.clone()) @@ -54,6 +57,10 @@ pub(super) fn configure_dsn( ) .map(Box::new)?; + // Metrics + let mut metrics_registry = Registry::default(); + let metrics = initialize_metrics.then(|| Metrics::new(&mut 
metrics_registry)); + let default_config = Config::new( protocol_prefix, keypair, @@ -183,6 +190,7 @@ pub(super) fn configure_dsn( })), bootstrap_addresses: bootstrap_nodes, external_addresses, + metrics, ..default_config }; @@ -201,7 +209,7 @@ pub(super) fn configure_dsn( .detach(); // Consider returning HandlerId instead of each `detach()` calls for other usages. - (node, node_runner) + (node, node_runner, metrics_registry) }) .map_err(Into::into) } diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs index af56017038..8fb32b5cef 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs @@ -8,6 +8,7 @@ use bytesize::ByteSize; use clap::{Parser, ValueHint}; use ss58::parse_ss58_reward_address; use std::fs; +use std::net::SocketAddr; use std::num::NonZeroU8; use std::path::PathBuf; use std::str::FromStr; @@ -80,6 +81,10 @@ struct FarmingArgs { /// Do not print info about configured farms on startup. #[arg(long)] no_info: bool, + /// Defines endpoints for the prometheus metrics server. It doesn't start without at least + /// one specified endpoint. 
Format: 127.0.0.1:8080 + #[arg(long, alias = "metrics-endpoint")] + metrics_endpoints: Vec, } fn cache_percentage_parser(s: &str) -> anyhow::Result { @@ -195,6 +200,7 @@ impl FromStr for DiskFarm { } } +#[allow(clippy::large_enum_variant)] #[derive(Debug, Parser)] #[clap(about, version)] enum Command { diff --git a/crates/subspace-networking/Cargo.toml b/crates/subspace-networking/Cargo.toml index 4747a31d70..63e553af0d 100644 --- a/crates/subspace-networking/Cargo.toml +++ b/crates/subspace-networking/Cargo.toml @@ -40,6 +40,7 @@ rand = "0.8.5" serde = { version = "1.0.159", features = ["derive"] } serde_json = "1.0.97" subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" } +subspace-metrics = { version = "0.1.0", path = "../../shared/subspace-metrics" } tempfile = "3.4.0" thiserror = "1.0.38" tokio = { version = "1.28.2", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] } diff --git a/crates/subspace-networking/examples/metrics.rs b/crates/subspace-networking/examples/metrics.rs index a6616edfad..96c6e0d3f3 100644 --- a/crates/subspace-networking/examples/metrics.rs +++ b/crates/subspace-networking/examples/metrics.rs @@ -8,7 +8,8 @@ use parking_lot::Mutex; use prometheus_client::registry::Registry; use std::sync::Arc; use std::time::Duration; -use subspace_networking::{start_prometheus_metrics_server, Config, Node}; +use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter}; +use subspace_networking::{Config, Node}; use tokio::time::sleep; use tracing::{error, info}; @@ -29,8 +30,10 @@ async fn main() { // Init prometheus let prometheus_metrics_server_address = "127.0.0.1:63000".parse().unwrap(); - match start_prometheus_metrics_server(vec![prometheus_metrics_server_address], metric_registry) - { + match start_prometheus_metrics_server( + vec![prometheus_metrics_server_address], + RegistryAdapter::Libp2p(metric_registry), + ) { Err(err) => { error!( ?prometheus_metrics_server_address, diff 
--git a/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs b/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs index 59f45c67d9..12078c48b4 100644 --- a/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs +++ b/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs @@ -13,8 +13,9 @@ use std::error::Error; use std::fmt::{Display, Formatter}; use std::net::SocketAddr; use std::sync::Arc; +use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter}; use subspace_networking::libp2p::multiaddr::Protocol; -use subspace_networking::{peer_id, start_prometheus_metrics_server, Config}; +use subspace_networking::{peer_id, Config}; use tracing::{debug, info, Level}; use tracing_subscriber::fmt::Subscriber; use tracing_subscriber::util::SubscriberInitExt; @@ -142,7 +143,12 @@ async fn main() -> Result<(), Box> { metrics_endpoints_are_specified.then(|| Metrics::new(&mut metric_registry)); let prometheus_task = metrics_endpoints_are_specified - .then(|| start_prometheus_metrics_server(metrics_endpoints, metric_registry)) + .then(|| { + start_prometheus_metrics_server( + metrics_endpoints, + RegistryAdapter::Libp2p(metric_registry), + ) + }) .transpose()?; let config = Config { diff --git a/crates/subspace-networking/src/lib.rs b/crates/subspace-networking/src/lib.rs index 13f984cbed..72398bd482 100644 --- a/crates/subspace-networking/src/lib.rs +++ b/crates/subspace-networking/src/lib.rs @@ -50,5 +50,4 @@ pub use protocols::request_response::handlers::segment_header::{ }; pub use shared::NewPeerInfo; pub use utils::multihash::Multihash; -pub use utils::prometheus::start_prometheus_metrics_server; pub use utils::unique_record_binary_heap::{KeyWrapper, UniqueRecordBinaryHeap}; diff --git a/crates/subspace-networking/src/utils.rs b/crates/subspace-networking/src/utils.rs index 587183a031..1cfdb4be9a 100644 --- a/crates/subspace-networking/src/utils.rs +++ b/crates/subspace-networking/src/utils.rs @@ -2,7 +2,6 
@@ pub mod multihash; pub mod piece_provider; -pub(crate) mod prometheus; #[cfg(test)] mod tests; pub(crate) mod unique_record_binary_heap; diff --git a/crates/subspace-networking/src/utils/prometheus.rs b/crates/subspace-networking/src/utils/prometheus.rs deleted file mode 100644 index e485409bd1..0000000000 --- a/crates/subspace-networking/src/utils/prometheus.rs +++ /dev/null @@ -1,40 +0,0 @@ -use actix_web::http::StatusCode; -use actix_web::web::Data; -use actix_web::{get, App, HttpResponse, HttpServer}; -use parking_lot::Mutex; -use prometheus_client::encoding::text::encode; -use prometheus_client::registry::Registry; -use std::error::Error; -use std::future::Future; -use std::net::SocketAddr; -use std::sync::Arc; -use tracing::info; - -type SharedRegistry = Arc>; - -#[get("/metrics")] -async fn metrics(registry: Data) -> Result> { - let mut encoded = String::new(); - encode(&mut encoded, ®istry.lock())?; - - let resp = HttpResponse::build(StatusCode::OK).body(encoded); - - Ok(resp) -} - -/// Start prometheus metrics server on the provided address. -pub fn start_prometheus_metrics_server( - endpoints: Vec, - registry: Registry, -) -> std::io::Result>> { - let shared_registry = Arc::new(Mutex::new(registry)); - let data = Data::new(shared_registry); - - info!(?endpoints, "Starting metrics server...",); - - Ok( - HttpServer::new(move || App::new().app_data(data.clone()).service(metrics)) - .bind(endpoints.as_slice())? - .run(), - ) -} diff --git a/crates/subspace-node/src/lib.rs b/crates/subspace-node/src/lib.rs index d1905bf243..d70d95e2ef 100644 --- a/crates/subspace-node/src/lib.rs +++ b/crates/subspace-node/src/lib.rs @@ -217,7 +217,7 @@ pub struct Cli { pub dsn_in_connections: u32, /// Defines max established outgoing swarm connection limit for DSN. - #[arg(long, default_value_t = 100)] + #[arg(long, default_value_t = 150)] pub dsn_out_connections: u32, /// Defines max pending incoming connection limit for DSN. 
@@ -225,11 +225,11 @@ pub struct Cli { pub dsn_pending_in_connections: u32, /// Defines max pending outgoing swarm connection limit for DSN. - #[arg(long, default_value_t = 100)] + #[arg(long, default_value_t = 150)] pub dsn_pending_out_connections: u32, /// Defines target total (in and out) connection number for DSN that should be maintained. - #[arg(long, default_value_t = 50)] + #[arg(long, default_value_t = 30)] pub dsn_target_connections: u32, /// Determines whether we allow keeping non-global (private, shared, loopback..) addresses diff --git a/crates/subspace-proof-of-time/src/aes.rs b/crates/subspace-proof-of-time/src/aes.rs index 94b780f9dc..10253b7160 100644 --- a/crates/subspace-proof-of-time/src/aes.rs +++ b/crates/subspace-proof-of-time/src/aes.rs @@ -9,7 +9,7 @@ extern crate alloc; use aes::cipher::generic_array::GenericArray; use aes::cipher::{BlockDecrypt, BlockEncrypt, KeyInit}; use aes::Aes128; -use subspace_core_primitives::{PotCheckpoints, PotKey, PotProof, PotSeed}; +use subspace_core_primitives::{PotCheckpoints, PotKey, PotOutput, PotSeed}; /// Creates the AES based proof. #[inline(always)] @@ -48,7 +48,7 @@ fn create_generic(seed: PotSeed, key: PotKey, checkpoint_iterations: u32) -> Pot pub(crate) fn verify_sequential( seed: PotSeed, key: PotKey, - checkpoints: &[PotProof], + checkpoints: &[PotOutput], checkpoint_iterations: u32, ) -> bool { assert_eq!(checkpoint_iterations % 2, 0); @@ -77,7 +77,7 @@ pub(crate) fn verify_sequential( #[cfg(test)] mod tests { use super::*; - use subspace_core_primitives::{PotKey, PotProof, PotSeed}; + use subspace_core_primitives::{PotKey, PotOutput, PotSeed}; const SEED: [u8; 16] = [ 0xd6, 0x66, 0xcc, 0xd8, 0xd5, 0x93, 0xc2, 0x3d, 0xa8, 0xdb, 0x6b, 0x5b, 0x14, 0x13, 0xb1, @@ -120,7 +120,7 @@ mod tests { // Decryption of invalid cipher text fails. 
let mut checkpoints_1 = checkpoints; - checkpoints_1[0] = PotProof::from(BAD_CIPHER); + checkpoints_1[0] = PotOutput::from(BAD_CIPHER); assert!(!verify_sequential( seed, key, diff --git a/crates/subspace-proof-of-time/src/lib.rs b/crates/subspace-proof-of-time/src/lib.rs index 41cc33d1b8..cd4df7e4c0 100644 --- a/crates/subspace-proof-of-time/src/lib.rs +++ b/crates/subspace-proof-of-time/src/lib.rs @@ -4,7 +4,7 @@ mod aes; use core::num::NonZeroU32; -use subspace_core_primitives::{PotCheckpoints, PotProof, PotSeed}; +use subspace_core_primitives::{PotCheckpoints, PotOutput, PotSeed}; /// Proof of time error #[derive(Debug)] @@ -50,7 +50,7 @@ pub fn prove(seed: PotSeed, iterations: NonZeroU32) -> Result Result { let num_checkpoints = checkpoints.len() as u32; if iterations.get() % (num_checkpoints * 2) != 0 { diff --git a/crates/subspace-runtime/Cargo.toml b/crates/subspace-runtime/Cargo.toml index 77cce2e889..01250dbc58 100644 --- a/crates/subspace-runtime/Cargo.toml +++ b/crates/subspace-runtime/Cargo.toml @@ -58,6 +58,7 @@ sp-session = { version = "4.0.0-dev", default-features = false, git = "https://g sp-std = { version = "8.0.0", default-features = false, git = "https://github.com/subspace/substrate", rev = "55c157cff49b638a59d81a9f971f0f9a66829c71" } sp-transaction-pool = { version = "4.0.0-dev", default-features = false, git = "https://github.com/subspace/substrate", rev = "55c157cff49b638a59d81a9f971f0f9a66829c71" } sp-version = { version = "22.0.0", default-features = false, git = "https://github.com/subspace/substrate", rev = "55c157cff49b638a59d81a9f971f0f9a66829c71" } +static_assertions = "1.1.0" subspace-core-primitives = { version = "0.1.0", default-features = false, path = "../subspace-core-primitives" } subspace-runtime-primitives = { version = "0.1.0", default-features = false, path = "../subspace-runtime-primitives" } subspace-verification = { version = "0.1.0", default-features = false, path = "../subspace-verification" } diff --git 
a/crates/subspace-runtime/src/lib.rs b/crates/subspace-runtime/src/lib.rs index 3cbd9de70a..de6ac2cb54 100644 --- a/crates/subspace-runtime/src/lib.rs +++ b/crates/subspace-runtime/src/lib.rs @@ -86,6 +86,7 @@ use sp_std::prelude::*; #[cfg(feature = "std")] use sp_version::NativeVersion; use sp_version::RuntimeVersion; +use static_assertions::const_assert; use subspace_core_primitives::crypto::Scalar; use subspace_core_primitives::objects::BlockObjectMapping; use subspace_core_primitives::{ @@ -159,7 +160,24 @@ const SLOT_PROBABILITY: (u64, u64) = (1, 6); const GLOBAL_RANDOMNESS_UPDATE_INTERVAL: BlockNumber = 256; /// Number of slots between slot arrival and when corresponding block can be produced. -const BLOCK_AUTHORING_DELAY: SlotNumber = 6; +const BLOCK_AUTHORING_DELAY: SlotNumber = 4; + +/// Interval, in blocks, between blockchain entropy injection into proof of time chain. +const POT_ENTROPY_INJECTION_INTERVAL: BlockNumber = 20; + +/// Interval, in entropy injection intervals, where to take entropy for injection from. +const POT_ENTROPY_INJECTION_LOOKBACK_DEPTH: u8 = 5; + +/// Delay after block, in slots, when entropy injection takes effect. +const POT_ENTROPY_INJECTION_DELAY: SlotNumber = 15; + +// Entropy injection interval must be bigger than injection delay or else we may end up in a +// situation where we'll need to do more than one injection at the same slot +const_assert!(POT_ENTROPY_INJECTION_INTERVAL as u64 > POT_ENTROPY_INJECTION_DELAY); +// Entropy injection delay must be bigger than block authoring delay or else we may include +// invalid future proofs in parent block, +1 ensures we do not have unnecessary reorgs that will +// inevitably happen otherwise +const_assert!(POT_ENTROPY_INJECTION_DELAY > BLOCK_AUTHORING_DELAY + 1); /// Era duration in blocks. const ERA_DURATION_IN_BLOCKS: BlockNumber = 2016; @@ -281,6 +299,9 @@ impl frame_system::Config for Runtime { parameter_types! 
{ pub const BlockAuthoringDelay: SlotNumber = BLOCK_AUTHORING_DELAY; + pub const PotEntropyInjectionInterval: BlockNumber = POT_ENTROPY_INJECTION_INTERVAL; + pub const PotEntropyInjectionLookbackDepth: u8 = POT_ENTROPY_INJECTION_LOOKBACK_DEPTH; + pub const PotEntropyInjectionDelay: SlotNumber = POT_ENTROPY_INJECTION_DELAY; pub const SlotProbability: (u64, u64) = SLOT_PROBABILITY; pub const ExpectedBlockTime: Moment = MILLISECS_PER_BLOCK; pub const ExpectedVotesPerBlock: u32 = EXPECTED_VOTES_PER_BLOCK; @@ -304,6 +325,9 @@ impl pallet_subspace::Config for Runtime { type RuntimeEvent = RuntimeEvent; type GlobalRandomnessUpdateInterval = ConstU32; type BlockAuthoringDelay = BlockAuthoringDelay; + type PotEntropyInjectionInterval = PotEntropyInjectionInterval; + type PotEntropyInjectionLookbackDepth = PotEntropyInjectionLookbackDepth; + type PotEntropyInjectionDelay = PotEntropyInjectionDelay; type EraDuration = ConstU32; type InitialSolutionRange = ConstU64; type SlotProbability = SlotProbability; @@ -1301,6 +1325,8 @@ impl_runtime_apis! { parent_hash, slot, solution, + proof_of_time, + future_proof_of_time, } = vote; Subspace::submit_vote(SignedVote { @@ -1309,6 +1335,8 @@ impl_runtime_apis! 
{ parent_hash, slot, solution: solution.into_reward_address_format::(), + proof_of_time, + future_proof_of_time, }, signature, }) diff --git a/crates/subspace-service/Cargo.toml b/crates/subspace-service/Cargo.toml index d4d343ee0a..dba1e254d0 100644 --- a/crates/subspace-service/Cargo.toml +++ b/crates/subspace-service/Cargo.toml @@ -29,6 +29,7 @@ jsonrpsee = { version = "0.16.2", features = ["server"] } pallet-transaction-payment-rpc = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "55c157cff49b638a59d81a9f971f0f9a66829c71" } parity-scale-codec = "3.6.5" parking_lot = "0.12.1" +prometheus-client = "0.21.2" sc-basic-authorship = { version = "0.10.0-dev", git = "https://github.com/subspace/substrate", rev = "55c157cff49b638a59d81a9f971f0f9a66829c71" } sc-chain-spec = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "55c157cff49b638a59d81a9f971f0f9a66829c71" } sc-client-api = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "55c157cff49b638a59d81a9f971f0f9a66829c71" } @@ -69,6 +70,7 @@ static_assertions = "1.1.0" subspace-archiving = { version = "0.1.0", path = "../subspace-archiving" } subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" } subspace-fraud-proof = { version = "0.1.0", path = "../subspace-fraud-proof" } +subspace-metrics = { version = "0.1.0", path = "../../shared/subspace-metrics" } subspace-networking = { version = "0.1.0", path = "../subspace-networking" } subspace-proof-of-space = { version = "0.1.0", path = "../subspace-proof-of-space" } subspace-runtime-primitives = { version = "0.1.0", path = "../subspace-runtime-primitives" } diff --git a/crates/subspace-service/src/dsn.rs b/crates/subspace-service/src/dsn.rs index 525f13430d..16400ce1d2 100644 --- a/crates/subspace-service/src/dsn.rs +++ b/crates/subspace-service/src/dsn.rs @@ -1,3 +1,4 @@ +use prometheus_client::registry::Registry; use sc_client_api::AuxStore; use 
sc_consensus_subspace::archiver::SegmentHeadersStore; use std::collections::HashSet; @@ -6,12 +7,13 @@ use std::path::PathBuf; use std::sync::Arc; use subspace_core_primitives::{SegmentHeader, SegmentIndex}; use subspace_networking::libp2p::kad::Mode as KademliaMode; +use subspace_networking::libp2p::metrics::Metrics; use subspace_networking::libp2p::{identity, Multiaddr}; use subspace_networking::utils::strip_peer_id; use subspace_networking::{ CreationError, NetworkParametersPersistenceError, NetworkingParametersManager, Node, - NodeRunner, PeerInfoProvider, SegmentHeaderBySegmentIndexesRequestHandler, - SegmentHeaderRequest, SegmentHeaderResponse, + NodeRunner, PeerInfoProvider, PieceByIndexRequestHandler, + SegmentHeaderBySegmentIndexesRequestHandler, SegmentHeaderRequest, SegmentHeaderResponse, }; use thiserror::Error; use tracing::{debug, error, trace}; @@ -73,12 +75,16 @@ pub(crate) fn create_dsn_instance( dsn_protocol_version: String, dsn_config: DsnConfig, segment_headers_store: SegmentHeadersStore, -) -> Result<(Node, NodeRunner<()>), DsnConfigurationError> + enable_metrics: bool, +) -> Result<(Node, NodeRunner<()>, Option), DsnConfigurationError> where AS: AuxStore + Sync + Send + 'static, { trace!("Subspace networking starting."); + let mut metric_registry = Registry::default(); + let metrics = enable_metrics.then(|| Metrics::new(&mut metric_registry)); + let networking_parameters_registry = dsn_config .base_path .map(|path| { @@ -112,8 +118,10 @@ where listen_on: dsn_config.listen_on, allow_non_global_addresses_in_dht: dsn_config.allow_non_global_addresses_in_dht, networking_parameters_registry, - request_response_protocols: vec![SegmentHeaderBySegmentIndexesRequestHandler::create( - move |_, req| { + request_response_protocols: vec![ + // We need to enable protocol to request pieces + PieceByIndexRequestHandler::create(|_, _| async { None }), + SegmentHeaderBySegmentIndexesRequestHandler::create(move |_, req| { let segment_indexes = match req { 
SegmentHeaderRequest::SegmentIndexes { segment_indexes } => { segment_indexes.clone() @@ -162,8 +170,8 @@ where }; async move { result } - }, - )], + }), + ], max_established_incoming_connections: dsn_config.max_in_connections, max_established_outgoing_connections: dsn_config.max_out_connections, max_pending_incoming_connections: dsn_config.max_pending_in_connections, @@ -176,9 +184,12 @@ where bootstrap_addresses: dsn_config.bootstrap_nodes, external_addresses: dsn_config.external_addresses, kademlia_mode: Some(KademliaMode::Client), + metrics, ..default_networking_config }; - subspace_networking::construct(networking_config).map_err(Into::into) + subspace_networking::construct(networking_config) + .map(|(node, node_runner)| (node, node_runner, enable_metrics.then_some(metric_registry))) + .map_err(Into::into) } diff --git a/crates/subspace-service/src/lib.rs b/crates/subspace-service/src/lib.rs index de44ab5be6..a86f322d13 100644 --- a/crates/subspace-service/src/lib.rs +++ b/crates/subspace-service/src/lib.rs @@ -19,6 +19,7 @@ const_option, impl_trait_in_assoc_type, int_roundings, + let_chains, type_alias_impl_trait, type_changing_struct_update )] @@ -37,9 +38,13 @@ use domain_runtime_primitives::{BlockNumber as DomainNumber, Hash as DomainHash} pub use dsn::DsnConfig; use frame_system_rpc_runtime_api::AccountNonceApi; use futures::channel::oneshot; +#[cfg(feature = "pot")] +use futures::executor::block_on; +use futures::FutureExt; use jsonrpsee::RpcModule; use pallet_transaction_payment_rpc_runtime_api::TransactionPaymentApi; use parking_lot::Mutex; +use prometheus_client::registry::Registry; use sc_basic_authorship::ProposerFactory; use sc_client_api::execution_extensions::ExtensionsFactory; use sc_client_api::{ @@ -56,9 +61,9 @@ use sc_consensus_subspace::{ use sc_executor::{NativeElseWasmExecutor, NativeExecutionDispatch}; use sc_network::NetworkService; #[cfg(feature = "pot")] -use sc_proof_of_time::gossip::pot_gossip_peers_set_config; +use 
sc_proof_of_time::source::gossip::pot_gossip_peers_set_config; #[cfg(feature = "pot")] -use sc_proof_of_time::source::PotSource; +use sc_proof_of_time::source::PotSourceWorker; #[cfg(feature = "pot")] use sc_proof_of_time::verifier::PotVerifier; use sc_service::error::Error as ServiceError; @@ -70,6 +75,10 @@ use sp_block_builder::BlockBuilder; use sp_blockchain::HeaderMetadata; use sp_consensus::Error as ConsensusError; use sp_consensus_slots::Slot; +#[cfg(feature = "pot")] +use sp_consensus_subspace::digests::extract_pre_digest; +#[cfg(feature = "pot")] +use sp_consensus_subspace::PotExtension; use sp_consensus_subspace::{FarmerPublicKey, KzgExtension, PosExtension, SubspaceApi}; use sp_core::offchain; use sp_core::traits::SpawnEssentialNamed; @@ -79,6 +88,8 @@ use sp_externalities::Extensions; use sp_objects::ObjectsApi; use sp_offchain::OffchainWorkerApi; use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor}; +#[cfg(feature = "pot")] +use sp_runtime::traits::{Header, Zero}; use sp_session::SessionKeys; use sp_transaction_pool::runtime_api::TaggedTransactionQueue; use static_assertions::const_assert; @@ -90,6 +101,7 @@ use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; #[cfg(feature = "pot")] use subspace_core_primitives::PotSeed; use subspace_fraud_proof::verifier_api::VerifierClient; +use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter}; use subspace_networking::libp2p::multiaddr::Protocol; use subspace_networking::libp2p::Multiaddr; use subspace_networking::Node; @@ -185,6 +197,8 @@ pub enum SubspaceNetworking { node: Node, /// Bootstrap nodes used (that can be also sent to the farmer over RPC) bootstrap_nodes: Vec, + /// DSN metrics registry (libp2p type). 
+ metrics_registry: Option, }, /// Networking must be instantiated internally Create { @@ -213,16 +227,23 @@ pub struct SubspaceConfiguration { pub is_timekeeper: bool, } -struct SubspaceExtensionsFactory { +struct SubspaceExtensionsFactory { kzg: Kzg, + #[cfg_attr(not(feature = "pot"), allow(dead_code))] + client: Arc, + #[cfg(feature = "pot")] + pot_verifier: PotVerifier, domain_genesis_receipt_ext: Option>, _pos_table: PhantomData, } -impl ExtensionsFactory for SubspaceExtensionsFactory +impl ExtensionsFactory + for SubspaceExtensionsFactory where PosTable: Table, Block: BlockT, + Client: HeaderBackend + ProvideRuntimeApi + Send + Sync + 'static, + Client::Api: SubspaceApi, { fn extensions_for( &self, @@ -233,6 +254,118 @@ where let mut exts = Extensions::new(); exts.register(KzgExtension::new(self.kzg.clone())); exts.register(PosExtension::new::()); + #[cfg(feature = "pot")] + exts.register(PotExtension::new({ + let client = Arc::clone(&self.client); + let pot_verifier = self.pot_verifier.clone(); + + Box::new(move |parent_hash, slot, proof_of_time| { + let parent_hash = { + let mut converted_parent_hash = Block::Hash::default(); + converted_parent_hash.as_mut().copy_from_slice(&parent_hash); + converted_parent_hash + }; + + let parent_header = match client.header(parent_hash) { + Ok(Some(parent_header)) => parent_header, + Ok(None) => { + error!( + %parent_hash, + "Header not found during proof of time verification" + ); + + return false; + } + Err(error) => { + error!( + %error, + %parent_hash, + "Failed to retrieve header during proof of time verification" + ); + + return false; + } + }; + let parent_pre_digest = match extract_pre_digest(&parent_header) { + Ok(parent_pre_digest) => parent_pre_digest, + Err(error) => { + error!( + %error, + %parent_hash, + parent_number = %parent_header.number(), + "Failed to extract pre-digest from parent header during proof of time \ + verification, this must never happen" + ); + + return false; + } + }; + + let parent_slot 
= parent_pre_digest.slot(); + if slot <= *parent_slot { + return false; + } + + let pot_parameters = match client.runtime_api().pot_parameters(parent_hash) { + Ok(pot_parameters) => pot_parameters, + Err(error) => { + debug!( + %error, + %parent_hash, + parent_number = %parent_header.number(), + "Failed to retrieve proof of time parameters during proof of time \ + verification" + ); + + return false; + } + }; + + let slot_iterations; + let pot_seed; + let after_parent_slot = parent_slot + Slot::from(1); + + if parent_header.number().is_zero() { + slot_iterations = pot_parameters.slot_iterations(); + pot_seed = pot_verifier.genesis_seed(); + } else { + let pot_info = parent_pre_digest.pot_info(); + // The change to number of iterations might have happened before + // `after_parent_slot` + if let Some(parameters_change) = pot_parameters.next_parameters_change() + && parameters_change.slot <= after_parent_slot + { + slot_iterations = parameters_change.slot_iterations; + // Only if entropy injection happens exactly after parent slot we need to + // mix it in + if parameters_change.slot == after_parent_slot { + pot_seed = pot_info + .proof_of_time() + .seed_with_entropy(&parameters_change.entropy); + } else { + pot_seed = pot_info + .proof_of_time().seed(); + } + } else { + slot_iterations = pot_parameters.slot_iterations(); + pot_seed = pot_info + .proof_of_time() + .seed(); + } + }; + + // Ensure proof of time and future proof of time included in upcoming block are + // valid + block_on(pot_verifier.is_output_valid( + after_parent_slot, + pot_seed, + slot_iterations, + Slot::from(slot - u64::from(parent_slot)), + proof_of_time, + pot_parameters.next_parameters_change(), + )) + }) + })); if let Some(ext) = self.domain_genesis_receipt_ext.clone() { exts.register(GenesisReceiptExtension::new(ext)); } @@ -341,15 +474,23 @@ where let domain_genesis_receipt_ext = construct_domain_genesis_block_builder.map(|f| f(backend.clone(), executor.clone())); - client - 
.execution_extensions() - .set_extensions_factory(SubspaceExtensionsFactory:: { + let client = Arc::new(client); + + #[cfg(feature = "pot")] + let pot_verifier = PotVerifier::new( + PotSeed::from_genesis(client.info().genesis_hash.as_ref(), pot_external_entropy), + POT_VERIFIER_CACHE_SIZE, + ); + client.execution_extensions().set_extensions_factory( + SubspaceExtensionsFactory:: { kzg: kzg.clone(), + client: Arc::clone(&client), + #[cfg(feature = "pot")] + pot_verifier: pot_verifier.clone(), domain_genesis_receipt_ext, _pos_table: PhantomData, - }); - - let client = Arc::new(client); + }, + ); let telemetry = telemetry.map(|(worker, telemetry)| { task_manager @@ -394,11 +535,6 @@ where let fraud_proof_block_import = sc_consensus_fraud_proof::block_import(client.clone(), client.clone(), proof_verifier); - #[cfg(feature = "pot")] - let pot_verifier = PotVerifier::new( - PotSeed::from_genesis(client.info().genesis_hash.as_ref(), pot_external_entropy), - POT_VERIFIER_CACHE_SIZE, - ); let (block_import, subspace_link) = sc_consensus_subspace::block_import::< PosTable, _, @@ -539,7 +675,7 @@ type FullNode = NewFull< /// Builds a new service for a full client. pub async fn new_full( - config: SubspaceConfiguration, + mut config: SubspaceConfiguration, partial_components: PartialComponents, enable_rpc_extensions: bool, block_proposal_slot_portion: SlotProportion, @@ -583,11 +719,12 @@ where mut telemetry, } = other; - let (node, bootstrap_nodes) = match config.subspace_networking { + let (node, bootstrap_nodes, dsn_metrics_registry) = match config.subspace_networking { SubspaceNetworking::Reuse { node, bootstrap_nodes, - } => (node, bootstrap_nodes), + metrics_registry, + } => (node, bootstrap_nodes, metrics_registry), SubspaceNetworking::Create { config: dsn_config } => { let dsn_protocol_version = hex::encode(client.chain_info().genesis_hash); @@ -597,10 +734,11 @@ where "Setting DSN protocol version..." 
); - let (node, mut node_runner) = create_dsn_instance( + let (node, mut node_runner, dsn_metrics_registry) = create_dsn_instance( dsn_protocol_version, dsn_config.clone(), segment_headers_store.clone(), + config.base.prometheus_config.is_some(), )?; info!("Subspace networking initialized: Node ID is {}", node.id()); @@ -630,7 +768,7 @@ where ), ); - (node, dsn_config.bootstrap_nodes) + (node, dsn_config.bootstrap_nodes, dsn_metrics_registry) } }; @@ -788,17 +926,17 @@ where #[cfg(feature = "pot")] let pot_slot_info_stream = { - let (pot_source, pot_gossip_worker, pot_slot_info_stream) = PotSource::new( + let (pot_source_worker, pot_gossip_worker, pot_slot_info_stream) = PotSourceWorker::new( config.is_timekeeper, client.clone(), - pot_verifier, + pot_verifier.clone(), network_service.clone(), sync_service.clone(), ) .map_err(|error| Error::Other(error.into()))?; let spawn_essential_handle = task_manager.spawn_essential_handle(); - spawn_essential_handle.spawn("pot-source", Some("pot"), pot_source.run()); + spawn_essential_handle.spawn("pot-source", Some("pot"), pot_source_worker.run()); spawn_essential_handle.spawn_blocking("pot-gossip", Some("pot"), pot_gossip_worker.run()); pot_slot_info_stream @@ -856,6 +994,8 @@ where max_block_proposal_slot_portion: None, telemetry: None, #[cfg(feature = "pot")] + pot_verifier, + #[cfg(feature = "pot")] pot_slot_info_stream, }; @@ -873,6 +1013,26 @@ where ); } + // We replace the Substrate implementation of metrics server with our own. 
+ if let Some(prometheus_config) = config.base.prometheus_config.take() { + let registry = if let Some(dsn_metrics_registry) = dsn_metrics_registry { + RegistryAdapter::Both(dsn_metrics_registry, prometheus_config.registry) + } else { + RegistryAdapter::Substrate(prometheus_config.registry) + }; + + let metrics_server = + start_prometheus_metrics_server(vec![prometheus_config.port], registry)?.map(|error| { + debug!(?error, "Metrics server error."); + }); + + task_manager.spawn_handle().spawn( + "node-metrics-server", + Some("node-metrics-server"), + metrics_server, + ); + }; + let rpc_handlers = sc_service::spawn_tasks(SpawnTasksParams { network: network_service.clone(), client: client.clone(), diff --git a/crates/subspace-verification/src/lib.rs b/crates/subspace-verification/src/lib.rs index ff9969506e..6e3a47447f 100644 --- a/crates/subspace-verification/src/lib.rs +++ b/crates/subspace-verification/src/lib.rs @@ -30,13 +30,12 @@ use subspace_archiving::archiver; use subspace_core_primitives::crypto::kzg::Kzg; use subspace_core_primitives::crypto::{ blake2b_256_254_hash_to_scalar, blake2b_256_hash_list, blake2b_256_hash_with_key, + blake3_hash_list, Scalar, }; -#[cfg(feature = "pot")] -use subspace_core_primitives::PotProof; use subspace_core_primitives::{ - Blake2b256Hash, BlockNumber, BlockWeight, HistorySize, PublicKey, Randomness, Record, - RewardSignature, SectorId, SectorSlotChallenge, SegmentCommitment, SlotNumber, Solution, - SolutionRange, + Blake2b256Hash, Blake3Hash, BlockNumber, BlockWeight, HistorySize, PotOutput, PublicKey, + Randomness, Record, RewardSignature, SectorId, SectorSlotChallenge, SegmentCommitment, + SlotNumber, Solution, SolutionRange, }; use subspace_proof_of_space::Table; @@ -169,7 +168,7 @@ pub struct VerifySolutionParams { pub global_randomness: Randomness, /// Proof of time for which solution is built #[cfg(feature = "pot")] - pub proof_of_time: PotProof, + pub proof_of_time: PotOutput, /// Solution range pub solution_range: 
SolutionRange, /// Parameters for checking piece validity. @@ -339,6 +338,11 @@ pub fn derive_randomness( ])) } +/// Derive proof of time entropy from chunk and proof of time for injection purposes. +pub fn derive_pot_entropy(chunk: Scalar, proof_of_time: PotOutput) -> Blake3Hash { + blake3_hash_list(&[&chunk.to_bytes(), proof_of_time.as_ref()]) +} + /// Derives next solution range based on the total era slots and slot probability pub fn derive_next_solution_range( start_slot: SlotNumber, diff --git a/shared/subspace-metrics/Cargo.toml b/shared/subspace-metrics/Cargo.toml new file mode 100644 index 0000000000..d9570ae300 --- /dev/null +++ b/shared/subspace-metrics/Cargo.toml @@ -0,0 +1,45 @@ +[package] +name = "subspace-metrics" +version = "0.1.0" +edition = "2021" +authors = [ + "Shamil Gadelshin " +] +description = "Metrics for Subspace Network" +license = "Apache-2.0" +homepage = "https://subspace.network" +repository = "https://github.com/subspace/subspace" +include = [ + "/src", + "/Cargo.toml", +] + +[dependencies] +async-mutex = "1.4.0" +actix-web = "4.3.1" +parking_lot = "0.12.1" +prometheus = { version = "0.13.0", default-features = false } +prometheus-client = "0.21.2" +tracing = "0.1.37" + +[dependencies.libp2p] +version = "0.52.2" +default-features = false +features = [ + "autonat", + "dns", + "gossipsub", + "identify", + "kad", + "macros", + "metrics", + "noise", + "ping", + "plaintext", + "request-response", + "serde", + "tcp", + "tokio", + "websocket", + "yamux", +] diff --git a/shared/subspace-metrics/src/lib.rs b/shared/subspace-metrics/src/lib.rs new file mode 100644 index 0000000000..c6abad979d --- /dev/null +++ b/shared/subspace-metrics/src/lib.rs @@ -0,0 +1,92 @@ +#![warn(missing_docs)] +//! This Rust module serves as a bridge between two different Prometheus metrics libraries +//! used within our frameworks — Substrate and Libp2p. +//! The module exposes a web server endpoint at "/metrics" that outputs metrics in Prometheus +//! format. 
It adapts metrics from either or both of the following libraries: +//! - Official Rust Prometheus client (registry aliased as Libp2pMetricsRegistry) +//! - TiKV's Prometheus client (registry aliased as SubstrateMetricsRegistry) + +use actix_web::http::StatusCode; +use actix_web::web::Data; +use actix_web::{get, App, HttpResponse, HttpServer}; +use parking_lot::Mutex; +use prometheus::{Encoder, Registry as SubstrateMetricsRegistry, TextEncoder}; +use prometheus_client::encoding::text::encode; +use prometheus_client::registry::Registry as Libp2pMetricsRegistry; +use std::error::Error; +use std::future::Future; +use std::net::SocketAddr; +use std::ops::DerefMut; +use std::sync::Arc; +use tracing::info; + +type SharedRegistry = Arc<Mutex<RegistryAdapter>>; + +/// A metrics registry adapter for Libp2p and Substrate frameworks. +/// It specifies which metrics registry or registries are in use. +pub enum RegistryAdapter { + /// Uses only the Libp2p metrics registry. + Libp2p(Libp2pMetricsRegistry), + /// Uses only the Substrate metrics registry. + Substrate(SubstrateMetricsRegistry), + /// We use both Substrate and Libp2p metrics registries. + Both(Libp2pMetricsRegistry, SubstrateMetricsRegistry), +} + +#[get("/metrics")] +async fn metrics(registry: Data<SharedRegistry>) -> Result<HttpResponse, Box<dyn Error>> { + let encoded_metrics = match registry.lock().deref_mut() { + RegistryAdapter::Libp2p(libp2p_registry) => { + let mut encoded = String::new(); + encode(&mut encoded, libp2p_registry)?; + + encoded + } + RegistryAdapter::Substrate(substrate_registry) => { + let encoder = TextEncoder::new(); + let mut encoded = String::new(); + unsafe { + encoder.encode(&substrate_registry.gather(), &mut encoded.as_mut_vec())?; + } + encoded + } + RegistryAdapter::Both(libp2p_registry, substrate_registry) => { + // We combine outputs of both metrics registries in one string. 
+ let mut libp2p_encoded = String::new(); + encode(&mut libp2p_encoded, libp2p_registry)?; + + let encoder = TextEncoder::new(); + let mut substrate_encoded = String::new(); + unsafe { + encoder.encode( + &substrate_registry.gather(), + &mut substrate_encoded.as_mut_vec(), + )?; + } + + // libp2p string contains #EOF, order is important here. + substrate_encoded + &libp2p_encoded + } + }; + + let resp = HttpResponse::build(StatusCode::OK).body(encoded_metrics); + + Ok(resp) +} + +/// Start prometheus metrics server on the provided address. +pub fn start_prometheus_metrics_server( + endpoints: Vec<SocketAddr>, + registry: RegistryAdapter, +) -> std::io::Result<impl Future<Output = std::io::Result<()>>> { + let shared_registry = Arc::new(Mutex::new(registry)); + let data = Data::new(shared_registry); + + info!(?endpoints, "Starting metrics server...",); + + Ok( + HttpServer::new(move || App::new().app_data(data.clone()).service(metrics)) + .bind(endpoints.as_slice())? + .run(), + ) +} diff --git a/test/subspace-test-runtime/Cargo.toml b/test/subspace-test-runtime/Cargo.toml index bbd93c594c..2edfa865d0 100644 --- a/test/subspace-test-runtime/Cargo.toml +++ b/test/subspace-test-runtime/Cargo.toml @@ -54,6 +54,7 @@ sp-session = { version = "4.0.0-dev", default-features = false, git = "https://g sp-std = { version = "8.0.0", default-features = false, git = "https://github.com/subspace/substrate", rev = "55c157cff49b638a59d81a9f971f0f9a66829c71" } sp-transaction-pool = { version = "4.0.0-dev", default-features = false, git = "https://github.com/subspace/substrate", rev = "55c157cff49b638a59d81a9f971f0f9a66829c71" } sp-version = { version = "22.0.0", default-features = false, git = "https://github.com/subspace/substrate", rev = "55c157cff49b638a59d81a9f971f0f9a66829c71" } +static_assertions = "1.1.0" subspace-core-primitives = { version = "0.1.0", default-features = false, path = "../../crates/subspace-core-primitives" } subspace-runtime-primitives = { version = "0.1.0", default-features = false, path = 
"../../crates/subspace-runtime-primitives" } subspace-verification = { version = "0.1.0", default-features = false, path = "../../crates/subspace-verification" } diff --git a/test/subspace-test-runtime/src/lib.rs b/test/subspace-test-runtime/src/lib.rs index 7e7a28a8ca..7079699c53 100644 --- a/test/subspace-test-runtime/src/lib.rs +++ b/test/subspace-test-runtime/src/lib.rs @@ -83,6 +83,7 @@ use sp_std::prelude::*; #[cfg(feature = "std")] use sp_version::NativeVersion; use sp_version::RuntimeVersion; +use static_assertions::const_assert; use subspace_core_primitives::objects::{BlockObject, BlockObjectMapping}; use subspace_core_primitives::{ HistorySize, Piece, Randomness, SegmentCommitment, SegmentHeader, SegmentIndex, SlotNumber, @@ -168,6 +169,23 @@ const GLOBAL_RANDOMNESS_UPDATE_INTERVAL: BlockNumber = 256; /// Number of slots between slot arrival and when corresponding block can be produced. const BLOCK_AUTHORING_DELAY: SlotNumber = 2; +/// Interval, in blocks, between blockchain entropy injection into proof of time chain. +const POT_ENTROPY_INJECTION_INTERVAL: BlockNumber = 5; + +/// Interval, in entropy injection intervals, where to take entropy for injection from. +const POT_ENTROPY_INJECTION_LOOKBACK_DEPTH: u8 = 2; + +/// Delay after block, in slots, when entropy injection takes effect. +const POT_ENTROPY_INJECTION_DELAY: SlotNumber = 4; + +// Entropy injection interval must be bigger than injection delay or else we may end up in a +// situation where we'll need to do more than one injection at the same slot +const_assert!(POT_ENTROPY_INJECTION_INTERVAL as u64 > POT_ENTROPY_INJECTION_DELAY); +// Entropy injection delay must be bigger than block authoring delay or else we may include +// invalid future proofs in parent block, +1 ensures we do not have unnecessary reorgs that will +// inevitably happen otherwise +const_assert!(POT_ENTROPY_INJECTION_DELAY > BLOCK_AUTHORING_DELAY + 1); + /// Era duration in blocks. 
const ERA_DURATION_IN_BLOCKS: BlockNumber = 2016; @@ -255,6 +273,9 @@ impl frame_system::Config for Runtime { parameter_types! { pub const BlockAuthoringDelay: SlotNumber = BLOCK_AUTHORING_DELAY; + pub const PotEntropyInjectionInterval: BlockNumber = POT_ENTROPY_INJECTION_INTERVAL; + pub const PotEntropyInjectionLookbackDepth: u8 = POT_ENTROPY_INJECTION_LOOKBACK_DEPTH; + pub const PotEntropyInjectionDelay: SlotNumber = POT_ENTROPY_INJECTION_DELAY; pub const SlotProbability: (u64, u64) = SLOT_PROBABILITY; pub const ExpectedBlockTime: Moment = MILLISECS_PER_BLOCK; pub const ShouldAdjustSolutionRange: bool = false; @@ -272,6 +293,9 @@ impl pallet_subspace::Config for Runtime { type RuntimeEvent = RuntimeEvent; type GlobalRandomnessUpdateInterval = ConstU32; type BlockAuthoringDelay = BlockAuthoringDelay; + type PotEntropyInjectionInterval = PotEntropyInjectionInterval; + type PotEntropyInjectionLookbackDepth = PotEntropyInjectionLookbackDepth; + type PotEntropyInjectionDelay = PotEntropyInjectionDelay; type EraDuration = ConstU32; type InitialSolutionRange = ConstU64; type SlotProbability = SlotProbability; @@ -1570,6 +1594,8 @@ impl_runtime_apis! { parent_hash, slot, solution, + proof_of_time, + future_proof_of_time, } = vote; Subspace::submit_vote(SignedVote { @@ -1578,6 +1604,8 @@ impl_runtime_apis! { parent_hash, slot, solution: solution.into_reward_address_format::(), + proof_of_time, + future_proof_of_time, }, signature, })