fix: validate block sync messages
Signed-off-by: Marin Veršić <marin.versic101@gmail.com>
mversic authored and nxsaken committed Aug 21, 2024
1 parent 6c8cbf2 commit 00680d4
Showing 8 changed files with 156 additions and 74 deletions.
5 changes: 3 additions & 2 deletions client/benches/tps/utils.rs
@@ -1,4 +1,4 @@
use std::{fmt, fs::File, io::BufReader, path::Path, sync::mpsc, thread, time};
use std::{fmt, fs::File, io::BufReader, num::NonZeroUsize, path::Path, sync::mpsc, thread, time};

use eyre::{Result, WrapErr};
use iroha::{
@@ -108,7 +108,8 @@ impl Config {
.expect("Must be some")
.state()
.view();
let mut blocks = state_view.all_blocks().skip(blocks_out_of_measure as usize);
let mut blocks =
state_view.all_blocks(NonZeroUsize::new(blocks_out_of_measure as usize + 1).unwrap());
let (txs_accepted, txs_rejected) = (0..self.blocks)
.map(|_| {
let block = blocks
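The `all_blocks` change above replaces a skip over a genesis-rooted iterator with a 1-based starting height, so skipping `n` leading blocks becomes starting at height `n + 1`. A minimal standalone sketch of that off-by-one, where `all_blocks` is a hypothetical stand-in yielding plain heights instead of Iroha blocks:

// Sketch only: `all_blocks` here pretends the chain is 10 blocks high and
// yields heights; the real method returns committed blocks from the state view.
use std::num::NonZeroUsize;

fn all_blocks(start: NonZeroUsize) -> impl Iterator<Item = usize> {
    start.get()..=10
}

fn main() {
    let blocks_out_of_measure = 3_usize;
    // Old style: iterate from genesis and skip the first `n` blocks.
    let old: Vec<usize> = (1..=10).skip(blocks_out_of_measure).collect();
    // New style: start directly at height `n + 1` (heights are 1-based).
    let start = NonZeroUsize::new(blocks_out_of_measure + 1).unwrap();
    let new: Vec<usize> = all_blocks(start).collect();
    assert_eq!(old, new); // both yield heights 4..=10
}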
1 change: 0 additions & 1 deletion client/tests/integration/roles.rs
@@ -174,7 +174,6 @@ fn role_permissions_are_deduplicated() {
.add_permission(allow_alice_to_transfer_rose_1)
.add_permission(allow_alice_to_transfer_rose_2);

println!("KITA: {role:?}");
test_client
.submit_blocking(Register::role(role))
.expect("failed to register role");
187 changes: 130 additions & 57 deletions core/src/block_sync.rs
@@ -2,7 +2,7 @@
use std::{
collections::BTreeSet,
fmt::Debug,
num::{NonZeroU32, NonZeroU64},
num::{NonZeroU32, NonZeroUsize},
sync::Arc,
time::Duration,
};
@@ -50,7 +50,7 @@ pub struct BlockSynchronizer {
gossip_size: NonZeroU32,
network: IrohaNetwork,
state: Arc<State>,
seen_blocks: BTreeSet<(NonZeroU64, HashOf<SignedBlock>)>,
seen_blocks: BTreeSet<(NonZeroUsize, HashOf<SignedBlock>)>,
latest_height: usize,
}

@@ -91,7 +91,7 @@ impl BlockSynchronizer {
self.latest_height = now_height;

self.seen_blocks
.retain(|(height, _hash)| height.get() >= now_height as u64);
.retain(|(height, _hash)| height.get() >= now_height);

if let Some(random_peer) = self.network.online_peers(Self::random_peer) {
self.request_latest_blocks_from_peer(random_peer.id().clone())
@@ -101,7 +101,7 @@

/// Get a random online peer.
#[allow(clippy::disallowed_types)]
pub fn random_peer(peers: &std::collections::HashSet<PeerId>) -> Option<Peer> {
fn random_peer(peers: &std::collections::HashSet<PeerId>) -> Option<Peer> {
use rand::{seq::IteratorRandom, SeedableRng};

let rng = &mut rand::rngs::StdRng::from_entropy();
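The truncated body above picks one online peer at random. A small sketch of the assumed selection step, using `IteratorRandom::choose` with the same `rand` imports and strings standing in for `PeerId`s:

// Sketch only: how the hidden remainder of `random_peer` is assumed to choose,
// not the actual Iroha implementation.
use rand::{seq::IteratorRandom, SeedableRng};

fn main() {
    let peers = ["peer-a", "peer-b", "peer-c"];
    let rng = &mut rand::rngs::StdRng::from_entropy();
    let picked = peers.iter().choose(rng);
    assert!(picked.is_some());
}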
@@ -115,13 +115,13 @@ impl BlockSynchronizer {
(state_view.prev_block_hash(), state_view.latest_block_hash())
};
message::Message::GetBlocksAfter(message::GetBlocksAfter::new(
latest_hash,
self.peer_id.clone(),
prev_hash,
latest_hash,
self.seen_blocks
.iter()
.map(|(_height, hash)| hash.clone())
.map(|(_height, hash)| *hash)
.collect(),
self.peer_id.clone(),
))
.send_to(&self.network, peer_id)
.await;
@@ -152,53 +152,52 @@

pub mod message {
//! Module containing messages for [`BlockSynchronizer`](super::BlockSynchronizer).
use std::num::NonZeroUsize;

use super::*;

/// Get blocks after some block
#[derive(Debug, Clone, Decode, Encode)]
#[derive(Debug, Clone, Encode)]
pub struct GetBlocksAfter {
/// Hash of latest available block
pub latest_hash: Option<HashOf<SignedBlock>>,
/// Peer id
pub peer_id: PeerId,
/// Hash of second to latest block
pub prev_hash: Option<HashOf<SignedBlock>>,
/// Hash of latest available block
pub latest_hash: Option<HashOf<SignedBlock>>,
/// The block hashes already seen
pub seen_blocks: BTreeSet<HashOf<SignedBlock>>,
/// Peer id
pub peer_id: PeerId,
}

impl GetBlocksAfter {
/// Construct [`GetBlocksAfter`].
pub const fn new(
latest_hash: Option<HashOf<SignedBlock>>,
peer_id: PeerId,
prev_hash: Option<HashOf<SignedBlock>>,
latest_hash: Option<HashOf<SignedBlock>>,
seen_blocks: BTreeSet<HashOf<SignedBlock>>,
peer_id: PeerId,
) -> Self {
Self {
latest_hash,
peer_id,
prev_hash,
latest_hash,
seen_blocks,
peer_id,
}
}
}

/// Message variant to share blocks to peer
#[derive(Debug, Clone, Decode, Encode)]
#[derive(Debug, Clone, Encode)]
pub struct ShareBlocks {
/// Blocks
pub blocks: Vec<SignedBlock>,
/// Peer id
pub peer_id: PeerId,
/// Blocks
pub blocks: Vec<SignedBlock>,
}

impl ShareBlocks {
/// Construct [`ShareBlocks`].
pub const fn new(blocks: Vec<SignedBlock>, peer_id: PeerId) -> Self {
Self { blocks, peer_id }
Self { peer_id, blocks }
}
}

@@ -214,13 +213,13 @@ pub mod message {
impl Message {
/// Handles the incoming message.
#[iroha_futures::telemetry_future]
pub async fn handle_message(&self, block_sync: &mut BlockSynchronizer) {
pub(super) async fn handle_message(&self, block_sync: &mut BlockSynchronizer) {
match self {
Message::GetBlocksAfter(GetBlocksAfter {
latest_hash,
peer_id,
prev_hash,
latest_hash,
seen_blocks,
peer_id,
}) => {
let local_latest_block_hash = block_sync.state.view().latest_block_hash();

@@ -230,42 +229,37 @@ pub mod message {
return;
}

let start_height = match prev_hash {
Some(hash) => match block_sync.kura.get_block_height_by_hash(hash) {
None => {
error!(
peer_id=%block_sync.peer_id,
block=%hash,
"Block hash not found"
);
return;
}
// It's get blocks *after*, so we add 1.
Some(height) => height
.checked_add(1)
.expect("INTERNAL BUG: Block height exceeds usize::MAX"),
},
None => nonzero_ext::nonzero!(1_usize),
let start_height = if let Some(hash) = *prev_hash {
let Some(height) = block_sync.kura.get_block_height_by_hash(hash) else {
error!(
peer=%block_sync.peer_id,
block=%hash,
"Block hash not found"
);

return;
};

height
.checked_add(1)
.expect("INTERNAL BUG: Blockchain height overflow")
} else {
nonzero_ext::nonzero!(1_usize)
};

let blocks = (start_height.get()..)
.take(block_sync.gossip_size.get() as usize + 1)
.map_while(|height| {
NonZeroUsize::new(height)
.and_then(|height| block_sync.kura.get_block_by_height(height))
})
let blocks = block_sync
.state
.view()
.all_blocks(start_height)
.skip_while(|block| Some(block.hash()) == *latest_hash)
.filter(|block| !seen_blocks.contains(&block.hash()))
.skip_while(|block| seen_blocks.contains(&block.hash()))
.take(block_sync.gossip_size.get() as usize)
.map(|block| (*block).clone())
.collect::<Vec<_>>();

if blocks.is_empty() {
// The only case where the blocks array could be empty is if we got queried for blocks
// after the latest hash. There is a check earlier in the function that returns early
// so it should not be possible for us to get here.
error!(hash=?prev_hash, "Blocks array is empty but shouldn't be.");
} else {
if !blocks.is_empty() {
trace!(hash=?prev_hash, "Sharing blocks after hash");

Message::ShareBlocks(ShareBlocks::new(blocks, block_sync.peer_id.clone()))
.send_to(&block_sync.network, peer_id.clone())
.await;
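With this rewrite the response is built by walking the state view from `start_height`, dropping the leading blocks the requester already has (first the block matching its `latest_hash`, then the already-seen prefix), and capping the share at `gossip_size`. Note that `skip_while` drops only a leading run, unlike the previous `filter`; a standalone iterator sketch of that difference, with plain integers in place of block hashes:

// Sketch only: contrasts `skip_while` (stops skipping at the first gap) with
// `filter` (drops matches anywhere in the stream).
fn main() {
    let seen = [1, 2, 4];
    let heights = [1, 2, 3, 4, 5];

    let skipped: Vec<i32> = heights
        .iter()
        .copied()
        .skip_while(|h| seen.contains(h))
        .collect();
    let filtered: Vec<i32> = heights
        .iter()
        .copied()
        .filter(|h| !seen.contains(h))
        .collect();

    assert_eq!(skipped, [3, 4, 5]); // 4 is kept because it comes after the first gap
    assert_eq!(filtered, [3, 5]);
}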
@@ -275,9 +269,13 @@
use crate::sumeragi::message::BlockSyncUpdate;

for block in blocks.clone() {
block_sync
.seen_blocks
.insert((block.header().height(), block.hash()));
let height = block
.header()
.height()
.try_into()
.expect("INTERNAL BUG: block height exceeds usize::MAX");

block_sync.seen_blocks.insert((height, block.hash()));
let msg = BlockSyncUpdate::from(&block);
block_sync.sumeragi.incoming_block_message(msg);
}
@@ -288,7 +286,7 @@ pub mod message {
/// Send this message over the network to the specified `peer`.
#[iroha_futures::telemetry_future]
#[log("TRACE")]
pub async fn send_to(self, network: &IrohaNetwork, peer: PeerId) {
pub(super) async fn send_to(self, network: &IrohaNetwork, peer: PeerId) {
let data = NetworkMessage::BlockSync(Box::new(self));
let message = Post {
data,
@@ -297,4 +295,79 @@ pub mod message {
network.post(message);
}
}

mod candidate {
use parity_scale_codec::Input;

use super::*;

#[derive(Decode)]
struct GetBlocksAfterCandidate {
peer: PeerId,
prev_hash: Option<HashOf<SignedBlock>>,
latest_hash: Option<HashOf<SignedBlock>>,
seen_blocks: BTreeSet<HashOf<SignedBlock>>,
}

#[derive(Decode)]
struct ShareBlocksCandidate {
peer: PeerId,
blocks: Vec<SignedBlock>,
}

impl GetBlocksAfterCandidate {
fn validate(self) -> Result<GetBlocksAfter, parity_scale_codec::Error> {
if self.prev_hash.is_some() && self.latest_hash.is_none() {
return Err(parity_scale_codec::Error::from(
"Latest hash must be defined if previous hash is",
));
}

Ok(GetBlocksAfter {
peer_id: self.peer,
prev_hash: self.prev_hash,
latest_hash: self.latest_hash,
seen_blocks: self.seen_blocks,
})
}
}

impl ShareBlocksCandidate {
fn validate(self) -> Result<ShareBlocks, parity_scale_codec::Error> {
if self.blocks.is_empty() {
return Err(parity_scale_codec::Error::from("Blocks are empty"));
}

if !self.blocks.windows(2).all(|wnd| {
wnd[1].header().height.get() == wnd[0].header().height.get() + 1
&& wnd[1].header().prev_block_hash == Some(wnd[0].hash())
}) {
return Err(parity_scale_codec::Error::from(
"Blocks are not ordered correctly",
));
}

Ok(ShareBlocks {
peer_id: self.peer,
blocks: self.blocks,
})
}
}

impl Decode for ShareBlocks {
fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
ShareBlocksCandidate::decode(input)?
.validate()
.map_err(Into::into)
}
}

impl Decode for GetBlocksAfter {
fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
GetBlocksAfterCandidate::decode(input)?
.validate()
.map_err(Into::into)
}
}
}
}
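The new `candidate` module above is the core of the fix: both message types now derive only `Encode`, and `Decode` goes through a private candidate struct whose `validate` rejects malformed messages (a defined `prev_hash` without a `latest_hash`, an empty block list, or blocks that do not form a contiguous chain) before they ever reach `handle_message`. A minimal standalone sketch of this decode-then-validate pattern with parity-scale-codec, using a hypothetical `Span` type in place of the real messages:

// Sketch only: the same decode-then-validate pattern on a hypothetical `Span`
// type. Assumes the `derive` feature of parity-scale-codec is enabled.
use parity_scale_codec::{Decode, Encode, Error, Input};

#[derive(Debug, Encode)]
struct Span {
    start: u32,
    end: u32,
}

// The unchecked wire representation derives `Decode`; the public type does not.
#[derive(Decode)]
struct SpanCandidate {
    start: u32,
    end: u32,
}

impl SpanCandidate {
    fn validate(self) -> Result<Span, Error> {
        if self.end < self.start {
            return Err(Error::from("Span end precedes start"));
        }

        Ok(Span {
            start: self.start,
            end: self.end,
        })
    }
}

impl Decode for Span {
    fn decode<I: Input>(input: &mut I) -> Result<Self, Error> {
        SpanCandidate::decode(input)?.validate()
    }
}

fn main() {
    // A well-formed message round-trips; a malformed one is rejected at decode time.
    let ok = Span { start: 1, end: 2 }.encode();
    assert!(Span::decode(&mut &ok[..]).is_ok());

    let bad = (2_u32, 1_u32).encode();
    assert!(Span::decode(&mut &bad[..]).is_err());
}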
4 changes: 2 additions & 2 deletions core/src/kura.rs
@@ -284,11 +284,11 @@ impl Kura {
}

/// Search through blocks for the height of the block with the given hash.
pub fn get_block_height_by_hash(&self, hash: &HashOf<SignedBlock>) -> Option<NonZeroUsize> {
pub fn get_block_height_by_hash(&self, hash: HashOf<SignedBlock>) -> Option<NonZeroUsize> {
self.block_data
.lock()
.iter()
.position(|(block_hash, _block_arc)| block_hash == hash)
.position(|(block_hash, _block_arc)| *block_hash == hash)
.and_then(|idx| idx.checked_add(1))
.and_then(NonZeroUsize::new)
}
10 changes: 6 additions & 4 deletions core/src/smartcontracts/isi/block.rs
@@ -12,6 +12,7 @@ use iroha_data_model::{
},
};
use iroha_telemetry::metrics;
use nonzero_ext::nonzero;

use super::*;
use crate::{smartcontracts::ValidQuery, state::StateReadOnly};
@@ -24,7 +25,7 @@ impl ValidQuery for FindBlocks {
state_ro: &'state impl StateReadOnly,
) -> Result<impl Iterator<Item = Self::Item> + 'state, QueryExecutionFail> {
Ok(state_ro
.all_blocks()
.all_blocks(nonzero!(1_usize))
.rev()
.filter(move |block| filter.applies(block))
.map(|block| (*block).clone()))
@@ -39,7 +40,7 @@ impl ValidQuery for FindBlockHeaders {
state_ro: &'state impl StateReadOnly,
) -> Result<impl Iterator<Item = Self::Item> + 'state, QueryExecutionFail> {
Ok(state_ro
.all_blocks()
.all_blocks(nonzero!(1_usize))
.rev()
.filter(move |block| filter.applies(block.header()))
.map(|block| block.header().clone()))
@@ -52,8 +53,9 @@ impl ValidSingularQuery for FindBlockHeaderByHash {
let hash = self.hash;

let block = state_ro
.all_blocks()
.find(|block| block.hash() == hash)
.kura()
.get_block_height_by_hash(hash)
.and_then(|height| state_ro.kura().get_block_by_height(height))
.ok_or_else(|| QueryExecutionFail::Find(FindError::Block(hash)))?;

Ok(block.header().clone())
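`FindBlockHeaderByHash` no longer scans every block looking for a matching hash; it resolves the hash to a height through Kura and then fetches that single block. A simplified sketch of the two-step lookup, with a hypothetical in-memory `Kura` and string blocks standing in for the real types:

// Sketch only: hash -> height via a positional scan, then height -> block,
// mirroring the shape of the lookup above on toy types.
use std::num::NonZeroUsize;

struct Kura {
    // (hash, block) pairs stored in height order; height = index + 1.
    block_data: Vec<(u64, String)>,
}

impl Kura {
    fn get_block_height_by_hash(&self, hash: u64) -> Option<NonZeroUsize> {
        self.block_data
            .iter()
            .position(|(block_hash, _block)| *block_hash == hash)
            .and_then(|idx| idx.checked_add(1))
            .and_then(NonZeroUsize::new)
    }

    fn get_block_by_height(&self, height: NonZeroUsize) -> Option<&String> {
        self.block_data
            .get(height.get() - 1)
            .map(|(_hash, block)| block)
    }
}

fn main() {
    let kura = Kura {
        block_data: vec![(0xaa, "genesis".into()), (0xbb, "block 2".into())],
    };

    let found = kura
        .get_block_height_by_hash(0xbb)
        .and_then(|height| kura.get_block_by_height(height));
    assert_eq!(found, Some(&"block 2".to_string()));
}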
5 changes: 4 additions & 1 deletion core/src/smartcontracts/isi/query.rs
@@ -551,7 +551,10 @@ mod tests {
async fn find_block_header_by_hash() -> Result<()> {
let state = state_with_test_blocks_and_transactions(1, 1, 1)?;
let state_view = state.view();
let block = state_view.all_blocks().last().expect("state is empty");
let block = state_view
.all_blocks(nonzero!(1_usize))
.last()
.expect("state is empty");

assert_eq!(
FindBlockHeaderByHash::new(block.hash()).execute(&state_view)?,
(Diffs for the remaining two changed files are not shown.)