Skip to content

Commit

Permalink
[refactor] hyperledger-iroha#3412: Exclude transaction gossiping out …
Browse files Browse the repository at this point in the history
…of sumeragi

Signed-off-by: Shanin Roman <shanin1000@yandex.ru>
  • Loading branch information
Erigara committed Apr 27, 2023
1 parent b185545 commit d15c6bb
Show file tree
Hide file tree
Showing 7 changed files with 265 additions and 113 deletions.
12 changes: 12 additions & 0 deletions cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use iroha_config::{
};
use iroha_core::{
block_sync::{BlockSynchronizer, BlockSynchronizerHandle},
gossiper::{Gossiper, GossiperHandle},
handler::ThreadHandler,
kura::Kura,
prelude::{World, WorldStateView},
Expand Down Expand Up @@ -116,6 +117,7 @@ impl Drop for Iroha {
struct NetworkRelay {
sumeragi: Arc<Sumeragi>,
block_sync: BlockSynchronizerHandle,
gossiper: GossiperHandle,
network: IrohaNetwork,
shutdown_notify: Arc<Notify>,
#[cfg(debug_assertions)]
Expand Down Expand Up @@ -158,6 +160,7 @@ impl NetworkRelay {
self.sumeragi.incoming_message(data.into_v1());
}
BlockSync(data) => self.block_sync.message(data.into_v1()).await,
Gossiper(data) => self.gossiper.gossip(*data).await,
Health => {}
}
}
Expand Down Expand Up @@ -293,11 +296,20 @@ impl Iroha {
)
.start();

let gossiper = Gossiper::from_configuration(
&config.sumeragi,
network.clone(),
Arc::clone(&queue),
Arc::clone(&sumeragi),
)
.start();

let freeze_status = Arc::new(AtomicBool::new(false));

NetworkRelay {
sumeragi: Arc::clone(&sumeragi),
block_sync,
gossiper,
network: network.clone(),
shutdown_notify: Arc::clone(&notify_shutdown),
#[cfg(debug_assertions)]
Expand Down
167 changes: 167 additions & 0 deletions core/src/gossiper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
//! Gossiper is actor which is responsible for transaction gossiping

use std::{sync::Arc, time::Duration};

use iroha_config::sumeragi::Configuration;
use iroha_data_model::transaction::{
AcceptedTransaction, Transaction, TransactionLimits, VersionedAcceptedTransaction,
VersionedSignedTransaction,
};
use iroha_p2p::Broadcast;
use parity_scale_codec::{Decode, Encode};
use tokio::sync::mpsc;

use crate::{
prelude::WorldStateView, queue::Queue, sumeragi::Sumeragi, IrohaNetwork, NetworkMessage,
};

/// [`Gossiper`] actor handle.
#[derive(Clone)]
pub struct GossiperHandle {
message_sender: mpsc::Sender<TransactionGossip>,
}

impl GossiperHandle {
/// Send [`TransactionGossip`] to actor
pub async fn gossip(&self, gossip: TransactionGossip) {
self.message_sender
.send(gossip)
.await
.expect("Gossiper must handle messages until there is at least one handle to it")
}
}

/// Actor to gossip transactions and receive transaction gossips
pub struct Gossiper {
/// The size of batch that is being gossiped. Smaller size leads
/// to longer time to synchronise, useful if you have high packet loss.
gossip_batch_size: u32,
/// The time between gossiping. More frequent gossiping shortens
/// the time to sync, but can overload the network.
gossip_period: Duration,
/// Address of queue
queue: Arc<Queue>,
/// [`iroha_p2p::Network`] actor handle
network: IrohaNetwork,
/// Sumearagi
sumeragi: Arc<Sumeragi>,
/// Limits that all transactions need to obey, in terms of size
/// of WASM blob and number of instructions.
transaction_limits: TransactionLimits,
}

impl Gossiper {
/// Start [`Self`] actor.
pub fn start(self) -> GossiperHandle {
let (message_sender, message_receiver) = mpsc::channel(1);
tokio::task::spawn(self.run(message_receiver));
GossiperHandle { message_sender }
}

/// Construct [`Self`] from configuration
pub fn from_configuration(
// Currently we are using configuration parameters from sumeragi not to break configuration
configuartion: &Configuration,
network: IrohaNetwork,
queue: Arc<Queue>,
sumeragi: Arc<Sumeragi>,
) -> Self {
Self {
queue,
sumeragi,
network,
transaction_limits: configuartion.transaction_limits,
gossip_batch_size: configuartion.gossip_batch_size,
gossip_period: Duration::from_millis(configuartion.gossip_period_ms),
}
}

async fn run(self, mut message_receiver: mpsc::Receiver<TransactionGossip>) {
let mut gossip_period = tokio::time::interval(self.gossip_period);
#[allow(clippy::arithmetic_side_effects)]
loop {
tokio::select! {
_ = gossip_period.tick() => self.gossip_transactions(),
transaction_gossip = message_receiver.recv() => {
let Some(transaction_gossip) = transaction_gossip else {
iroha_logger::info!("All handler to Gossiper are dropped. Shutting down...");
break;
};
self.handle_transaction_gossip(transaction_gossip);
}
}
tokio::task::yield_now().await;
}
}

fn gossip_transactions(&self) {
let txs = self
.queue
.n_random_transactions(self.gossip_batch_size, &self.sumeragi.wsv_mutex_access());

if txs.is_empty() {
iroha_logger::debug!("Nothing to gossip");
return;
}

iroha_logger::trace!(tx_count = txs.len(), "Gossiping transactions");
self.network.broadcast(Broadcast {
data: NetworkMessage::Gossiper(Box::new(TransactionGossip::new(txs))),
});
}

fn handle_transaction_gossip(&self, TransactionGossip { txs }: TransactionGossip) {
iroha_logger::trace!(size = txs.len(), "Received new transaction gossip");
for tx in txs {
enqueue_transaction(
tx,
&self.queue,
&self.sumeragi.wsv_mutex_access(),
&self.transaction_limits,
);
}
}
}

fn enqueue_transaction(
tx: VersionedSignedTransaction,
queue: &Queue,
wsv: &WorldStateView,
transaction_limits: &TransactionLimits,
) {
let tx = tx.into_v1();

match AcceptedTransaction::accept::<false>(tx, transaction_limits) {
Ok(tx) => match queue.push(tx.into(), wsv) {
Ok(_) => {}
Err(crate::queue::Failure {
tx,
err: crate::queue::Error::InBlockchain,
}) => {
iroha_logger::debug!(tx_hash = %tx.hash(), "Transaction already in blockchain, ignoring...")
}
Err(crate::queue::Failure { tx, err }) => {
iroha_logger::error!(?err, tx_hash = %tx.hash(), "Failed to enqueue transaction.")
}
},
Err(err) => iroha_logger::error!(%err, "Transaction rejected"),
}
}

/// Message for gossiping batches of transactions.
#[derive(Decode, Encode, Debug, Clone)]
pub struct TransactionGossip {
/// Batch of transactions.
pub txs: Vec<VersionedSignedTransaction>,
}

impl TransactionGossip {
/// Constructor.
pub fn new(txs: Vec<VersionedAcceptedTransaction>) -> Self {
Self {
// Converting into non-accepted transaction because it's not possible
// to guarantee that the sending peer checked transaction limits
txs: txs.into_iter().map(Into::into).collect(),
}
}
}
4 changes: 4 additions & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

pub mod block;
pub mod block_sync;
pub mod gossiper;
pub mod kura;
pub mod modules;
pub mod queue;
Expand All @@ -14,6 +15,7 @@ pub mod wsv;
use core::time::Duration;

use dashmap::{DashMap, DashSet};
use gossiper::TransactionGossip;
use iroha_data_model::{permission::Permissions, prelude::*};
use parity_scale_codec::{Decode, Encode};
use tokio::sync::broadcast;
Expand Down Expand Up @@ -58,6 +60,8 @@ pub enum NetworkMessage {
SumeragiPacket(Box<SumeragiPacket>),
/// Block sync message
BlockSync(Box<BlockSyncMessage>),
/// Gossipper message
Gossiper(Box<TransactionGossip>),
/// Health check message
Health,
}
Expand Down
Loading

0 comments on commit d15c6bb

Please sign in to comment.