diff --git a/Cargo.lock b/Cargo.lock index 90f7d35f1a4a..ec683c0590cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7241,12 +7241,17 @@ version = "0.9.13" dependencies = [ "assert_matches", "async-trait", + "bitvec", "clap", "color-eyre", "futures 0.3.19", "futures-timer", + "kvdb", + "parity-scale-codec", "parity-util-mem", "polkadot-cli", + "polkadot-erasure-coding", + "polkadot-node-core-av-store", "polkadot-node-core-backing", "polkadot-node-core-candidate-validation", "polkadot-node-core-dispute-coordinator", diff --git a/node/malus/Cargo.toml b/node/malus/Cargo.toml index 6e70902fc5c7..1e87aa638fa5 100644 --- a/node/malus/Cargo.toml +++ b/node/malus/Cargo.toml @@ -23,7 +23,9 @@ polkadot-node-core-backing = { path = "../core/backing" } polkadot-node-primitives = { path = "../primitives" } polkadot-primitives = { path = "../../primitives" } polkadot-node-core-pvf = { path = "../core/pvf" } +polkadot-node-core-av-store = { path = "../core/av-store" } parity-util-mem = { version = "0.10.0", default-features = false, features = ["jemalloc-global"] } +parity-scale-codec = { version = "2.3.1", features = ["derive"] } color-eyre = { version = "0.5.11", default-features = false } assert_matches = "1.5" async-trait = "0.1.52" @@ -32,11 +34,14 @@ clap = { version = "3.0", features = ["derive"] } futures = "0.3.19" futures-timer = "3.0.2" tracing = "0.1.26" +bitvec = "0.20.1" +kvdb = "0.10.0" +erasure = { package = "polkadot-erasure-coding", path = "../../erasure-coding" } +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } [features] default = [] [dev-dependencies] polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" } -sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } futures = { version = "0.3.19", features = ["thread-pool"] } diff --git a/node/malus/src/malus.rs b/node/malus/src/malus.rs index a94c195cf56c..2c4712574ee3 100644 --- a/node/malus/src/malus.rs +++ b/node/malus/src/malus.rs @@ -32,10 +32,10 @@ use variants::*; #[clap(about = "Malus - the nemesis of polkadot.", version)] #[clap(rename_all = "kebab-case")] enum NemesisVariant { + /// Store garbage chunks in the availability store + StoreMaliciousAvailableData(RunCmd), /// Suggest a candidate with an invalid proof of validity. SuggestGarbageCandidate(RunCmd), - /// Back a candidate with a specifically crafted proof of validity. - BackGarbageCandidate(RunCmd), /// Delayed disputing of ancestors that are perfectly fine. DisputeAncestor(RunCmd), @@ -63,10 +63,10 @@ impl MalusCli { /// Launch a malus node. fn launch(self) -> eyre::Result<()> { match self.variant { - NemesisVariant::BackGarbageCandidate(cmd) => - polkadot_cli::run_node(run_cmd(cmd), BackGarbageCandidate)?, + NemesisVariant::StoreMaliciousAvailableData(cmd) => + polkadot_cli::run_node(run_cmd(cmd), StoreMaliciousAvailableDataWrapper)?, NemesisVariant::SuggestGarbageCandidate(cmd) => - polkadot_cli::run_node(run_cmd(cmd), SuggestGarbageCandidate)?, + polkadot_cli::run_node(run_cmd(cmd), BackGarbageCandidateWrapper)?, NemesisVariant::DisputeAncestor(cmd) => polkadot_cli::run_node(run_cmd(cmd), DisputeValidCandidates)?, NemesisVariant::PvfPrepareWorker(cmd) => { diff --git a/node/malus/src/tests.rs b/node/malus/src/tests.rs index 57072336852b..b1d04cb1607c 100644 --- a/node/malus/src/tests.rs +++ b/node/malus/src/tests.rs @@ -14,16 +14,19 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use super::*; - use polkadot_node_subsystem_test_helpers::*; use polkadot_node_subsystem::{ messages::{AllMessages, AvailabilityStoreMessage}, - overseer::{dummy::DummySubsystem, gen::TimeoutExt, Subsystem}, + overseer::{self, dummy::DummySubsystem, gen::TimeoutExt, OverseerSignal, Subsystem}, SubsystemError, }; +use futures::Future; + +// Filter wrapping related types. +use crate::interceptor::*; + #[derive(Clone, Debug)] struct BlackHoleInterceptor; @@ -131,7 +134,8 @@ fn integrity_test_pass() { AvailabilityStoreMessage::QueryChunk(Default::default(), 0.into(), tx), ) .await; - let _ = rx.timeout(std::time::Duration::from_millis(100)).await.unwrap(); + let resp = rx.timeout(std::time::Duration::from_secs(10)).await.unwrap(); + println!("RESP {:?}", resp); overseer }, sub_intercepted, diff --git a/node/malus/src/variants/dispute_valid_candidates.rs b/node/malus/src/variants/dispute_valid_candidates.rs index aa5626c44bef..e18ce523c603 100644 --- a/node/malus/src/variants/dispute_valid_candidates.rs +++ b/node/malus/src/variants/dispute_valid_candidates.rs @@ -35,11 +35,9 @@ use crate::interceptor::*; // Import extra types relevant to the particular // subsystem. -use polkadot_node_core_backing::CandidateBackingSubsystem; use polkadot_node_subsystem::messages::{ ApprovalDistributionMessage, CandidateBackingMessage, DisputeCoordinatorMessage, }; -use sp_keystore::SyncCryptoStorePtr; use std::sync::Arc; @@ -104,17 +102,10 @@ impl OverseerGen for DisputeValidCandidates { RuntimeClient::Api: ParachainHost + BabeApi + AuthorityDiscoveryApi, Spawner: 'static + SpawnNamed + Clone + Unpin, { - let spawner = args.spawner.clone(); - let crypto_store_ptr = args.keystore.clone() as SyncCryptoStorePtr; let filter = ReplaceApprovalsWithDisputes; prepared_overseer_builder(args)? - .replace_candidate_backing(move |cb| { - InterceptedSubsystem::new( - CandidateBackingSubsystem::new(spawner, crypto_store_ptr, cb.params.metrics), - filter, - ) - }) + .replace_candidate_backing(move |cb| InterceptedSubsystem::new(cb, filter)) .build_with_connector(connector) .map_err(|e| e.into()) } diff --git a/node/malus/src/variants/mod.rs b/node/malus/src/variants/mod.rs index aab3203f5bf3..9f876a166e09 100644 --- a/node/malus/src/variants/mod.rs +++ b/node/malus/src/variants/mod.rs @@ -18,9 +18,11 @@ mod back_garbage_candidate; mod dispute_valid_candidates; +mod store_malicious_available_data; mod suggest_garbage_candidate; pub(crate) use self::{ - back_garbage_candidate::BackGarbageCandidate, dispute_valid_candidates::DisputeValidCandidates, - suggest_garbage_candidate::SuggestGarbageCandidate, + dispute_valid_candidates::DisputeValidCandidates, + store_malicious_available_data::StoreMaliciousAvailableDataWrapper, + suggest_garbage_candidate::BackGarbageCandidateWrapper, }; diff --git a/node/malus/src/variants/store_malicious_available_data.rs b/node/malus/src/variants/store_malicious_available_data.rs new file mode 100644 index 000000000000..02be632c7a53 --- /dev/null +++ b/node/malus/src/variants/store_malicious_available_data.rs @@ -0,0 +1,353 @@ +// Copyright 2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! A malicious node that replaces approvals with invalid disputes +//! against valid candidates. +//! +//! Attention: For usage with `zombienet` only! + +#![allow(missing_docs)] + +use bitvec::{order::Lsb0 as BitOrderLsb0, vec::BitVec}; +use kvdb::{DBTransaction, KeyValueDB}; + +use polkadot_cli::{ + prepared_overseer_builder, + service::{ + AuthorityDiscoveryApi, AuxStore, BabeApi, Block, Error, HeaderBackend, Overseer, + OverseerConnector, OverseerGen, OverseerGenArgs, OverseerHandle, ParachainHost, + ProvideRuntimeApi, SpawnNamed, + }, +}; + +// Filter wrapping related types. +use crate::interceptor::*; + +// Import extra types relevant to the particular +// subsystem. +use parity_scale_codec::{Decode, Encode, Error as CodecError, Input}; +use polkadot_node_core_av_store::Config as AvailabilityConfig; +use polkadot_node_primitives::{AvailableData, ErasureChunk}; +use polkadot_node_subsystem::messages::AvailabilityStoreMessage; +use polkadot_primitives::v1::{BlockNumber, CandidateHash, Hash, ValidatorIndex}; + +use std::{ + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; + +const AVAILABLE_PREFIX: &[u8; 9] = b"available"; +const CHUNK_PREFIX: &[u8; 5] = b"chunk"; +const META_PREFIX: &[u8; 4] = b"meta"; +const PRUNE_BY_TIME_PREFIX: &[u8; 13] = b"prune_by_time"; + +// We have some keys we want to map to empty values because existence of the key is enough. We use this because +// rocksdb doesn't support empty values. +const TOMBSTONE_VALUE: &[u8] = &*b" "; + +/// Replace outgoing approval messages with disputes. +#[derive(Clone)] +pub(crate) struct StoreMaliciousAvailableData { + db: Arc, + config: AvailabilityConfig, +} + +impl MessageInterceptor for StoreMaliciousAvailableData +where + Sender: overseer::SubsystemSender + Clone + Send + 'static, +{ + type Message = AvailabilityStoreMessage; + + fn intercept_incoming( + &self, + _sender: &mut Sender, + msg: FromOverseer, + ) -> Option> { + match msg { + FromOverseer::Communication { + msg: + AvailabilityStoreMessage::StoreAvailableData { + candidate_hash, + n_validators, + available_data, + tx, + }, + } => { + let res = store_malicious_available_data( + &self.db, + &self.config, + candidate_hash, + 0, + n_validators as _, + available_data, + ); + + match res { + Ok(()) => { + let _ = tx.send(Ok(())); + }, + Err(_) => { + let _ = tx.send(Err(())); + }, + } + + // We needn't actually drop this message, but there is no benefit in handling it. + None + }, + _ => Some(msg), + } + } + + fn intercept_outgoing(&self, msg: AllMessages) -> Option { + Some(msg) + } +} + +// Meta information about a candidate. +#[derive(Debug, Encode, Decode)] +struct CandidateMeta { + state: State, + data_available: bool, + chunks_stored: BitVec, +} + +fn store_malicious_available_data( + db: &Arc, + config: &AvailabilityConfig, + candidate_hash: CandidateHash, + replaced_chunk_index: usize, + n_validators: usize, + available_data: AvailableData, +) -> Result<(), Error> { + let mut tx = DBTransaction::new(); + + let mut meta = match load_meta(&db, &config, &candidate_hash)? { + Some(m) => { + if m.data_available { + return Ok(()) // already stored. + } + + m + }, + None => { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Getting time should work. QED"); + + // Write a pruning record. + let prune_at = now + Duration::from_secs(60 * 60); + write_pruning_key(&mut tx, &config, prune_at, &candidate_hash); + + CandidateMeta { + state: State::Unavailable(BETimestamp(0)), + data_available: false, + chunks_stored: BitVec::new(), + } + }, + }; + + let mut chunks = erasure::obtain_chunks_v1(n_validators, &available_data).unwrap(); + chunks[replaced_chunk_index].fill(42); + let branches = erasure::branches(chunks.as_ref()); + + let erasure_chunks = chunks.iter().zip(branches.map(|(proof, _)| proof)).enumerate().map( + |(index, (chunk, proof))| ErasureChunk { + chunk: chunk.clone(), + proof, + index: ValidatorIndex(index as u32), + }, + ); + + for chunk in erasure_chunks { + write_chunk(&mut tx, &config, &candidate_hash, chunk.index, &chunk); + } + + meta.data_available = true; + meta.chunks_stored = bitvec::bitvec![BitOrderLsb0, u8; 1; n_validators]; + + write_meta(&mut tx, &config, &candidate_hash, &meta); + write_available_data(&mut tx, &config, &candidate_hash, &available_data); + + db.write(tx)?; + + Ok(()) +} + +/// Generates an overseer that disputes instead of approving valid candidates. +pub(crate) struct StoreMaliciousAvailableDataWrapper; + +impl OverseerGen for StoreMaliciousAvailableDataWrapper { + fn generate<'a, Spawner, RuntimeClient>( + &self, + connector: OverseerConnector, + args: OverseerGenArgs<'a, Spawner, RuntimeClient>, + ) -> Result<(Overseer>, OverseerHandle), Error> + where + RuntimeClient: 'static + ProvideRuntimeApi + HeaderBackend + AuxStore, + RuntimeClient::Api: ParachainHost + BabeApi + AuthorityDiscoveryApi, + Spawner: 'static + SpawnNamed + Clone + Unpin, + { + let filter = StoreMaliciousAvailableData { + db: args.parachains_db.clone(), + config: args.availability_config.clone(), + }; + + prepared_overseer_builder(args)? + .replace_availability_store(|av| InterceptedSubsystem::new(av, filter)) + .build_with_connector(connector) + .map_err(|e| e.into()) + } +} + +/// Unix time wrapper with big-endian encoding. +#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord)] +struct BETimestamp(u64); + +impl Encode for BETimestamp { + fn size_hint(&self) -> usize { + std::mem::size_of::() + } + + fn using_encoded R>(&self, f: F) -> R { + f(&self.0.to_be_bytes()) + } +} + +impl Decode for BETimestamp { + fn decode(value: &mut I) -> Result { + <[u8; 8]>::decode(value).map(u64::from_be_bytes).map(Self) + } +} + +impl From for BETimestamp { + fn from(d: Duration) -> Self { + BETimestamp(d.as_secs()) + } +} + +impl Into for BETimestamp { + fn into(self) -> Duration { + Duration::from_secs(self.0) + } +} + +/// [`BlockNumber`] wrapper with big-endian encoding. +#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord)] +struct BEBlockNumber(BlockNumber); + +impl Encode for BEBlockNumber { + fn size_hint(&self) -> usize { + std::mem::size_of::() + } + + fn using_encoded R>(&self, f: F) -> R { + f(&self.0.to_be_bytes()) + } +} + +impl Decode for BEBlockNumber { + fn decode(value: &mut I) -> Result { + <[u8; std::mem::size_of::()]>::decode(value) + .map(BlockNumber::from_be_bytes) + .map(Self) + } +} + +#[derive(Debug, Encode, Decode)] +enum State { + /// Candidate data was first observed at the given time but is not available in any block. + #[codec(index = 0)] + Unavailable(BETimestamp), + /// The candidate was first observed at the given time and was included in the given list of unfinalized blocks, which may be + /// empty. The timestamp here is not used for pruning. Either one of these blocks will be finalized or the state will regress to + /// `State::Unavailable`, in which case the same timestamp will be reused. Blocks are sorted ascending first by block number and + /// then hash. + #[codec(index = 1)] + Unfinalized(BETimestamp, Vec<(BEBlockNumber, Hash)>), + /// Candidate data has appeared in a finalized block and did so at the given time. + #[codec(index = 2)] + Finalized(BETimestamp), +} + +fn load_meta( + db: &Arc, + config: &AvailabilityConfig, + hash: &CandidateHash, +) -> Result, Error> { + let key = (META_PREFIX, hash).encode(); + + query_inner(db, config.col_meta, &key) +} + +fn write_meta( + tx: &mut DBTransaction, + config: &AvailabilityConfig, + hash: &CandidateHash, + meta: &CandidateMeta, +) { + let key = (META_PREFIX, hash).encode(); + + tx.put_vec(config.col_meta, &key, meta.encode()); +} + +fn write_chunk( + tx: &mut DBTransaction, + config: &AvailabilityConfig, + candidate_hash: &CandidateHash, + chunk_index: ValidatorIndex, + erasure_chunk: &ErasureChunk, +) { + let key = (CHUNK_PREFIX, candidate_hash, chunk_index).encode(); + + tx.put_vec(config.col_data, &key, erasure_chunk.encode()); +} + +fn write_available_data( + tx: &mut DBTransaction, + config: &AvailabilityConfig, + hash: &CandidateHash, + available_data: &AvailableData, +) { + let key = (AVAILABLE_PREFIX, hash).encode(); + + tx.put_vec(config.col_data, &key[..], available_data.encode()); +} + +fn query_inner( + db: &Arc, + column: u32, + key: &[u8], +) -> Result, Error> { + match db.get(column, key) { + Ok(Some(raw)) => { + let res = D::decode(&mut &raw[..]).expect("Should Decode. QED"); + Ok(Some(res)) + }, + Ok(None) => Ok(None), + Err(err) => Err(err.into()), + } +} + +fn write_pruning_key( + tx: &mut DBTransaction, + config: &AvailabilityConfig, + t: impl Into, + h: &CandidateHash, +) { + let t = t.into(); + let key = (PRUNE_BY_TIME_PREFIX, t, h).encode(); + tx.put(config.col_meta, &key, TOMBSTONE_VALUE); +} diff --git a/node/malus/src/variants/suggest_garbage_candidate.rs b/node/malus/src/variants/suggest_garbage_candidate.rs index 82198c0a86b7..ac2ddab5e58e 100644 --- a/node/malus/src/variants/suggest_garbage_candidate.rs +++ b/node/malus/src/variants/suggest_garbage_candidate.rs @@ -14,11 +14,10 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -//! A malicious overseer proposing a garbage block. +//! A malicious node that replaces approvals with invalid disputes +//! against valid candidates. //! -//! Supposed to be used with regular nodes or in conjunction -//! with [`malus-back-garbage-candidate.rs`](./malus-back-garbage-candidate.rs) -//! to simulate a coordinated attack. +//! Attention: For usage with `zombienet` only! #![allow(missing_docs)] @@ -30,40 +29,42 @@ use polkadot_cli::{ ProvideRuntimeApi, SpawnNamed, }, }; +use polkadot_node_primitives::{AvailableData, BlockData, PoV}; +use polkadot_primitives::v1::{CandidateCommitments, CandidateDescriptor, CandidateHash, Hash}; -// Import extra types relevant to the particular -// subsystem. -use polkadot_node_core_backing::CandidateBackingSubsystem; -use polkadot_node_primitives::Statement; -use polkadot_node_subsystem::{ - messages::{CandidateBackingMessage, StatementDistributionMessage}, - overseer::{self, SubsystemSender}, -}; -use polkadot_node_subsystem_util as util; // Filter wrapping related types. use crate::interceptor::*; -use polkadot_primitives::v1::{ - CandidateCommitments, CandidateReceipt, CommittedCandidateReceipt, CompactStatement, Hash, - Signed, -}; -use sp_keystore::SyncCryptoStorePtr; -use util::metered; -use std::sync::Arc; +// Import extra types relevant to the particular +// subsystem. +use polkadot_node_subsystem::messages::{AvailabilityStoreMessage, CandidateBackingMessage}; +use polkadot_primitives::v1::{CandidateReceipt, CommittedCandidateReceipt}; -use crate::shared::*; +use std::sync::{Arc, Mutex}; -/// Replaces the seconded PoV data -/// of outgoing messages by some garbage data. +struct NotedCandidate { + candidate: CandidateReceipt, + relay_parent: Hash, +} + +#[derive(Default)] +struct Inner { + map: std::collections::HashMap, +} + +/// Replace outgoing approval messages with disputes. #[derive(Clone)] -struct ReplacePoVBytes -where - Sender: Send, -{ - queue: metered::UnboundedMeteredSender<(Sender, Hash, CandidateReceipt)>, +struct NoteCandidate { + inner: Arc>, } -impl MessageInterceptor for ReplacePoVBytes +/// Replace outgoing approval messages with disputes. +#[derive(Clone)] +struct BackGarbageCandidate { + inner: Arc>, +} + +impl MessageInterceptor for NoteCandidate where Sender: overseer::SubsystemSender + Clone + Send + 'static, { @@ -71,20 +72,27 @@ where fn intercept_incoming( &self, - sender: &mut Sender, + _sender: &mut Sender, msg: FromOverseer, ) -> Option> { match msg { FromOverseer::Communication { - msg: CandidateBackingMessage::Second(hash, candidate_receipt, _pov), + msg: CandidateBackingMessage::Second(relay_parent, candidate, pov), } => { - self.queue - .unbounded_send((sender.clone(), hash, candidate_receipt.clone())) - .unwrap(); - - None + let mut candidate_cache = self.inner.lock().unwrap(); + candidate_cache.map.insert( + candidate.hash(), + NotedCandidate { + candidate: candidate.clone(), + relay_parent: relay_parent.clone(), + }, + ); + Some(FromOverseer::Communication { + msg: CandidateBackingMessage::Second(relay_parent, candidate, pov), + }) }, - other => Some(other), + FromOverseer::Communication { msg } => Some(FromOverseer::Communication { msg }), + FromOverseer::Signal(signal) => Some(FromOverseer::Signal(signal)), } } @@ -93,10 +101,104 @@ where } } -/// Generates an overseer that exposes bad behavior. -pub(crate) struct SuggestGarbageCandidate; +impl MessageInterceptor for BackGarbageCandidate +where + Sender: overseer::SubsystemSender + Clone + Send + 'static, +{ + type Message = AvailabilityStoreMessage; + + fn intercept_incoming( + &self, + _sender: &mut Sender, + msg: FromOverseer, + ) -> Option> { + Some(msg) + } + + fn intercept_outgoing(&self, msg: AllMessages) -> Option { + match msg { + AllMessages::AvailabilityStore(AvailabilityStoreMessage::StoreAvailableData { + candidate_hash, + n_validators, + available_data, + tx, + }) => { + let pov = Arc::new(PoV { block_data: BlockData(vec![0; 256]) }); + let malicious_available_data = AvailableData { + pov: pov.clone(), + validation_data: available_data.validation_data.clone(), + }; + + let pov_hash = pov.hash(); + let validation_data_hash = malicious_available_data.validation_data.hash(); + + let inner = self.inner.lock().unwrap(); + let cache = inner.map.get(&candidate_hash).unwrap(); + let relay_parent = cache.relay_parent.clone(); + let candidate_cache = cache.candidate.clone(); + let validation_code_hash = candidate_cache.descriptor().validation_code_hash; + + let erasure_root = { + let chunks = + erasure::obtain_chunks_v1(n_validators as usize, &available_data).unwrap(); + + let branches = erasure::branches(chunks.as_ref()); + branches.root() + }; + let (collator_id, collator_signature) = { + use polkadot_primitives::v1::CollatorPair; + use sp_core::crypto::Pair; + + let collator_pair = CollatorPair::generate().0; + let signature_payload = polkadot_primitives::v1::collator_signature_payload( + &relay_parent, + &candidate_cache.descriptor().para_id, + &validation_data_hash, + &pov_hash, + &validation_code_hash, + ); + + (collator_pair.public(), collator_pair.sign(&signature_payload)) + }; + let malicious_commitments = CandidateCommitments { + upward_messages: Vec::new(), + horizontal_messages: Vec::new(), + new_validation_code: None, + head_data: vec![1, 2, 3, 4, 5].into(), + processed_downward_messages: 0, + hrmp_watermark: available_data.validation_data.relay_parent_number, + }; + let malicious_candidate = CommittedCandidateReceipt { + descriptor: CandidateDescriptor { + para_id: candidate_cache.descriptor().para_id, + relay_parent, + collator: collator_id, + persisted_validation_data_hash: validation_data_hash, + pov_hash, + erasure_root, + signature: collator_signature, + para_head: malicious_commitments.head_data.hash(), + validation_code_hash, + }, + commitments: malicious_commitments.clone(), + }; + let malicious_candidate_hash = malicious_candidate.hash(); + Some(AllMessages::AvailabilityStore(AvailabilityStoreMessage::StoreAvailableData { + candidate_hash: malicious_candidate_hash, + n_validators, + available_data: malicious_available_data, + tx, + })) + }, + msg => Some(msg), + } + } +} + +/// Generates an overseer that disputes instead of approving valid candidates. +pub(crate) struct BackGarbageCandidateWrapper; -impl OverseerGen for SuggestGarbageCandidate { +impl OverseerGen for BackGarbageCandidateWrapper { fn generate<'a, Spawner, RuntimeClient>( &self, connector: OverseerConnector, @@ -107,65 +209,17 @@ impl OverseerGen for SuggestGarbageCandidate { RuntimeClient::Api: ParachainHost + BabeApi + AuthorityDiscoveryApi, Spawner: 'static + SpawnNamed + Clone + Unpin, { - let spawner = args.spawner.clone(); - let (sink, source) = metered::unbounded(); - let keystore = args.keystore.clone() as SyncCryptoStorePtr; - - let filter = ReplacePoVBytes { queue: sink }; - - let keystore2 = keystore.clone(); - let spawner2 = spawner.clone(); - - let result = prepared_overseer_builder(args)? - .replace_candidate_backing(move |cb| { - InterceptedSubsystem::new( - CandidateBackingSubsystem::new(spawner2, keystore2, cb.params.metrics), - filter, - ) + let inner = Inner { map: std::collections::HashMap::new() }; + let inner_mut = Arc::new(Mutex::new(inner)); + let note_candidate = NoteCandidate { inner: inner_mut.clone() }; + let back_garbage_candidate = BackGarbageCandidate { inner: inner_mut.clone() }; + + prepared_overseer_builder(args)? + .replace_candidate_backing(move |cb| InterceptedSubsystem::new(cb, note_candidate)) + .replace_availability_store(move |av| { + InterceptedSubsystem::new(av, back_garbage_candidate) }) .build_with_connector(connector) - .map_err(|e| e.into()); - - launch_processing_task( - &spawner, - source, - move |(mut subsystem_sender, hash, candidate_receipt): (_, Hash, CandidateReceipt)| { - let keystore = keystore.clone(); - async move { - tracing::info!( - target = MALUS, - "Replacing seconded candidate pov with something else" - ); - - let committed_candidate_receipt = CommittedCandidateReceipt { - descriptor: candidate_receipt.descriptor.clone(), - commitments: CandidateCommitments::default(), - }; - - let statement = Statement::Seconded(committed_candidate_receipt); - - if let Ok(validator) = - util::Validator::new(hash, keystore.clone(), &mut subsystem_sender).await - { - let signed_statement: Signed = validator - .sign(keystore, statement) - .await - .expect("Signing works. qed") - .expect("Something must come out of this. qed"); - - subsystem_sender - .send_message(StatementDistributionMessage::Share( - hash, - signed_statement, - )) - .await; - } else { - tracing::info!("We are not a validator. Not siging anything."); - } - } - }, - ); - - result + .map_err(|e| e.into()) } }