Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
change everything to transaction (#6440)
Browse files Browse the repository at this point in the history
  • Loading branch information
NikVolf committed Jun 21, 2020
1 parent 6c16d15 commit ef2a6c1
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 73 deletions.
121 changes: 61 additions & 60 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,15 @@ pub use generic_proto::LegacyConnectionKillError;
const REQUEST_TIMEOUT_SEC: u64 = 40;
/// Interval at which we perform time based maintenance
const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1100);
/// Interval at which we propagate extrinsics;
/// Interval at which we propagate transactions;
const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(2900);

/// Maximim number of known block hashes to keep for a peer.
const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead
/// Maximim number of known extrinsic hashes to keep for a peer.
/// Maximim number of known transaction hashes to keep for a peer.
///
/// This should be approx. 2 blocks full of transactions for the network to function properly.
const MAX_KNOWN_EXTRINSICS: usize = 10240; // ~300kb per peer + overhead.
const MAX_KNOWN_TRANSACTIONS: usize = 10240; // ~300kb per peer + overhead.

/// Maximim number of transaction validation request we keep at any moment.
const MAX_PENDING_TRANSACTIONS: usize = 8192;
Expand All @@ -106,25 +106,25 @@ mod rep {
pub const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout");
/// Reputation change when we are a light client and a peer is behind us.
pub const PEER_BEHIND_US_LIGHT: Rep = Rep::new(-(1 << 8), "Useless for a light peer");
/// Reputation change when a peer sends us any extrinsic.
/// Reputation change when a peer sends us any transaction.
///
/// This forces node to verify it, thus the negative value here. Once extrinsic is verified,
/// reputation change should be refunded with `ANY_EXTRINSIC_REFUND`
pub const ANY_EXTRINSIC: Rep = Rep::new(-(1 << 4), "Any extrinsic");
/// Reputation change when a peer sends us any extrinsic that is not invalid.
pub const ANY_EXTRINSIC_REFUND: Rep = Rep::new(1 << 4, "Any extrinsic (refund)");
/// Reputation change when a peer sends us an extrinsic that we didn't know about.
pub const GOOD_EXTRINSIC: Rep = Rep::new(1 << 7, "Good extrinsic");
/// Reputation change when a peer sends us a bad extrinsic.
pub const BAD_EXTRINSIC: Rep = Rep::new(-(1 << 12), "Bad extrinsic");
/// This forces node to verify it, thus the negative value here. Once transaction is verified,
/// reputation change should be refunded with `ANY_TRANSACTION_REFUND`
pub const ANY_TRANSACTION: Rep = Rep::new(-(1 << 4), "Any transaction");
/// Reputation change when a peer sends us any transaction that is not invalid.
pub const ANY_TRANSACTION_REFUND: Rep = Rep::new(1 << 4, "Any transaction (refund)");
/// Reputation change when a peer sends us an transaction that we didn't know about.
pub const GOOD_TRANSACTION: Rep = Rep::new(1 << 7, "Good transaction");
/// Reputation change when a peer sends us a bad transaction.
pub const BAD_TRANSACTION: Rep = Rep::new(-(1 << 12), "Bad transaction");
/// We sent an RPC query to the given node, but it failed.
pub const RPC_FAILED: Rep = Rep::new(-(1 << 12), "Remote call failed");
/// We received a message that failed to decode.
pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
/// We received an unexpected response.
pub const UNEXPECTED_RESPONSE: Rep = Rep::new_fatal("Unexpected response packet");
/// We received an unexpected extrinsic packet.
pub const UNEXPECTED_EXTRINSICS: Rep = Rep::new_fatal("Unexpected extrinsics packet");
/// We received an unexpected transaction packet.
pub const UNEXPECTED_TRANSACTIONS: Rep = Rep::new_fatal("Unexpected transactions packet");
/// We received an unexpected light node request.
pub const UNEXPECTED_REQUEST: Rep = Rep::new_fatal("Unexpected block request packet");
/// Peer has different genesis.
Expand All @@ -145,7 +145,7 @@ struct Metrics {
fork_targets: Gauge<U64>,
finality_proofs: GaugeVec<U64>,
justifications: GaugeVec<U64>,
propagated_extrinsics: Counter<U64>,
propagated_transactions: Counter<U64>,
}

impl Metrics {
Expand Down Expand Up @@ -191,8 +191,8 @@ impl Metrics {
)?;
register(g, r)?
},
propagated_extrinsics: register(Counter::new(
"sync_propagated_extrinsics",
propagated_transactions: register(Counter::new(
"sync_propagated_transactions",
"Number of transactions propagated to at least one peer",
)?, r)?,
})
Expand Down Expand Up @@ -221,11 +221,11 @@ impl Future for PendingTransaction {
pub struct Protocol<B: BlockT, H: ExHashT> {
/// Interval at which we call `tick`.
tick_timeout: Pin<Box<dyn Stream<Item = ()> + Send>>,
/// Interval at which we call `propagate_extrinsics`.
/// Interval at which we call `propagate_transactions`.
propagate_timeout: Pin<Box<dyn Stream<Item = ()> + Send>>,
/// Pending list of messages to return from `poll` as a priority.
pending_messages: VecDeque<CustomMessageOutcome<B>>,
/// Pending extrinsic verification tasks.
/// Pending transactions verification tasks.
pending_transactions: FuturesUnordered<PendingTransaction>,
config: ProtocolConfig,
genesis_hash: B::Hash,
Expand Down Expand Up @@ -280,7 +280,7 @@ struct Peer<B: BlockT, H: ExHashT> {
/// Requests we are no longer interested in.
obsolete_requests: HashMap<message::RequestId, Instant>,
/// Holds a set of transactions known to this peer.
known_extrinsics: LruHashSet<H>,
known_transactions: LruHashSet<H>,
/// Holds a set of blocks known to this peer.
known_blocks: LruHashSet<B::Hash>,
/// Request counter,
Expand Down Expand Up @@ -601,7 +601,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
return outcome;
},
GenericMessage::Transactions(m) =>
self.on_extrinsics(who, m),
self.on_transactions(who, m),
GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(who, request),
GenericMessage::RemoteCallResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected RemoteCallResponse"),
Expand Down Expand Up @@ -720,8 +720,8 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
// Print some diagnostics.
if let Some(peer) = self.context_data.peers.get(&who) {
debug!(target: "sync", "Clogged peer {} (protocol_version: {:?}; roles: {:?}; \
known_extrinsics: {:?}; known_blocks: {:?}; best_hash: {:?}; best_number: {:?})",
who, peer.info.protocol_version, peer.info.roles, peer.known_extrinsics, peer.known_blocks,
known_transactions: {:?}; known_blocks: {:?}; best_hash: {:?}; best_number: {:?})",
who, peer.info.protocol_version, peer.info.roles, peer.known_transactions, peer.known_blocks,
peer.info.best_hash, peer.info.best_number);
} else {
debug!(target: "sync", "Peer clogged before being properly connected");
Expand Down Expand Up @@ -1048,7 +1048,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
let peer = Peer {
info,
block_request: None,
known_extrinsics: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_EXTRINSICS)
known_transactions: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS)
.expect("Constant is nonzero")),
known_blocks: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_BLOCKS)
.expect("Constant is nonzero")),
Expand Down Expand Up @@ -1137,28 +1137,29 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
.map(|(peer_id, peer)| (peer_id, peer.info.roles))
}

/// Called when peer sends us new extrinsics
fn on_extrinsics(
/// Called when peer sends us new transactions
fn on_transactions(
&mut self,
who: PeerId,
extrinsics: message::Transactions<B::Extrinsic>
transactions: message::Transactions<B::Extrinsic>
) {
// sending extrinsic to light node is considered a bad behavior
// sending transaction to light node is considered a bad behavior
if !self.config.roles.is_full() {
trace!(target: "sync", "Peer {} is trying to send extrinsic to the light node", who);
trace!(target: "sync", "Peer {} is trying to send transactions to the light node", who);
self.behaviour.disconnect_peer(&who);
self.peerset_handle.report_peer(who, rep::UNEXPECTED_EXTRINSICS);
self.peerset_handle.report_peer(who, rep::UNEXPECTED_TRANSACTIONS);
return;
}

// Accept extrinsics only when fully synced
// Accept transactions only when fully synced
if self.sync.status().state != SyncState::Idle {
trace!(target: "sync", "{} Ignoring extrinsics while syncing", who);
trace!(target: "sync", "{} Ignoring transactions while syncing", who);
return;
}
trace!(target: "sync", "Received {} extrinsics from {}", extrinsics.len(), who);

trace!(target: "sync", "Received {} transactions from {}", transactions.len(), who);
if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) {
for t in extrinsics {
for t in transactions {
if self.pending_transactions.len() > MAX_PENDING_TRANSACTIONS {
debug!(
target: "sync",
Expand All @@ -1169,9 +1170,9 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}

let hash = self.transaction_pool.hash_of(&t);
peer.known_extrinsics.insert(hash);
peer.known_transactions.insert(hash);

self.peerset_handle.report_peer(who.clone(), rep::ANY_EXTRINSIC);
self.peerset_handle.report_peer(who.clone(), rep::ANY_TRANSACTION);

self.pending_transactions.push(PendingTransaction {
peer_id: who.clone(),
Expand All @@ -1181,45 +1182,45 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
}

fn on_handle_extrinsic_import(&mut self, who: PeerId, import: TransactionImport) {
fn on_handle_transaction_import(&mut self, who: PeerId, import: TransactionImport) {
match import {
TransactionImport::KnownGood => self.peerset_handle.report_peer(who, rep::ANY_EXTRINSIC_REFUND),
TransactionImport::NewGood => self.peerset_handle.report_peer(who, rep::GOOD_EXTRINSIC),
TransactionImport::Bad => self.peerset_handle.report_peer(who, rep::BAD_EXTRINSIC),
TransactionImport::KnownGood => self.peerset_handle.report_peer(who, rep::ANY_TRANSACTION_REFUND),
TransactionImport::NewGood => self.peerset_handle.report_peer(who, rep::GOOD_TRANSACTION),
TransactionImport::Bad => self.peerset_handle.report_peer(who, rep::BAD_TRANSACTION),
TransactionImport::None => {},
}
}

/// Propagate one extrinsic.
pub fn propagate_extrinsic(
/// Propagate one transaction.
pub fn propagate_transaction(
&mut self,
hash: &H,
) {
debug!(target: "sync", "Propagating extrinsic [{:?}]", hash);
debug!(target: "sync", "Propagating transaction [{:?}]", hash);
// Accept transactions only when fully synced
if self.sync.status().state != SyncState::Idle {
return;
}
if let Some(extrinsic) = self.transaction_pool.transaction(hash) {
let propagated_to = self.do_propagate_extrinsics(&[(hash.clone(), extrinsic)]);
if let Some(transaction) = self.transaction_pool.transaction(hash) {
let propagated_to = self.do_propagate_transactions(&[(hash.clone(), transaction)]);
self.transaction_pool.on_broadcasted(propagated_to);
}
}

fn do_propagate_extrinsics(
fn do_propagate_transactions(
&mut self,
extrinsics: &[(H, B::Extrinsic)],
transactions: &[(H, B::Extrinsic)],
) -> HashMap<H, Vec<String>> {
let mut propagated_to = HashMap::new();
for (who, peer) in self.context_data.peers.iter_mut() {
// never send extrinsics to the light node
// never send transactions to the light node
if !peer.info.roles.is_full() {
continue;
}

let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics
let (hashes, to_send): (Vec<_>, Vec<_>) = transactions
.iter()
.filter(|&(ref hash, _)| peer.known_extrinsics.insert(hash.clone()))
.filter(|&(ref hash, _)| peer.known_transactions.insert(hash.clone()))
.cloned()
.unzip();

Expand All @@ -1244,22 +1245,22 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {

if propagated_to.len() > 0 {
if let Some(ref metrics) = self.metrics {
metrics.propagated_extrinsics.inc();
metrics.propagated_transactions.inc();
}
}

propagated_to
}

/// Call when we must propagate ready extrinsics to peers.
pub fn propagate_extrinsics(&mut self) {
debug!(target: "sync", "Propagating extrinsics");
/// Call when we must propagate ready transactions to peers.
pub fn propagate_transactions(&mut self) {
debug!(target: "sync", "Propagating transactions");
// Accept transactions only when fully synced
if self.sync.status().state != SyncState::Idle {
return;
}
let extrinsics = self.transaction_pool.transactions();
let propagated_to = self.do_propagate_extrinsics(&extrinsics);
let transactions = self.transaction_pool.transactions();
let propagated_to = self.do_propagate_transactions(&transactions);
self.transaction_pool.on_broadcasted(propagated_to);
}

Expand Down Expand Up @@ -1983,7 +1984,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
}

while let Poll::Ready(Some(())) = self.propagate_timeout.poll_next_unpin(cx) {
self.propagate_extrinsics();
self.propagate_transactions();
}

for (id, mut r) in self.sync.block_requests() {
Expand Down Expand Up @@ -2011,7 +2012,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
self.pending_messages.push_back(event);
}
if let Poll::Ready(Some((peer_id, result))) = self.pending_transactions.poll_next_unpin(cx) {
self.on_handle_extrinsic_import(peer_id, result);
self.on_handle_transaction_import(peer_id, result);
}
if let Some(message) = self.pending_messages.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message));
Expand Down Expand Up @@ -2050,7 +2051,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
}
Some(Fallback::Transactions) => {
if let Ok(m) = message::Transactions::decode(&mut message.as_ref()) {
self.on_extrinsics(peer_id, m);
self.on_transactions(peer_id, m);
} else {
warn!(target: "sub-libp2p", "Failed to decode transactions list");
}
Expand Down
18 changes: 9 additions & 9 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,15 +587,15 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
/// All transactions will be fetched from the `TransactionPool` that was passed at
/// initialization as part of the configuration and propagated to peers.
pub fn trigger_repropagate(&self) {
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PropagateExtrinsics);
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PropagateTransactions);
}

/// You must call when new transaction is imported by the transaction pool.
///
/// This transaction will be fetched from the `TransactionPool` that was passed at
/// initialization as part of the configuration and propagated to peers.
pub fn propagate_extrinsic(&self, hash: H) {
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PropagateExtrinsic(hash));
pub fn propagate_transaction(&self, hash: H) {
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PropagateTransaction(hash));
}

/// Make sure an important block is propagated to peers.
Expand Down Expand Up @@ -798,8 +798,8 @@ impl<B, H> NetworkStateInfo for NetworkService<B, H>
///
/// Each entry corresponds to a method of `NetworkService`.
enum ServiceToWorkerMsg<B: BlockT, H: ExHashT> {
PropagateExtrinsic(H),
PropagateExtrinsics,
PropagateTransaction(H),
PropagateTransactions,
RequestJustification(B::Hash, NumberFor<B>),
AnnounceBlock(B::Hash, Vec<u8>),
GetValue(record::Key),
Expand Down Expand Up @@ -1119,10 +1119,10 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
this.network_service.user_protocol_mut().announce_block(hash, data),
ServiceToWorkerMsg::RequestJustification(hash, number) =>
this.network_service.user_protocol_mut().request_justification(&hash, number),
ServiceToWorkerMsg::PropagateExtrinsic(hash) =>
this.network_service.user_protocol_mut().propagate_extrinsic(&hash),
ServiceToWorkerMsg::PropagateExtrinsics =>
this.network_service.user_protocol_mut().propagate_extrinsics(),
ServiceToWorkerMsg::PropagateTransaction(hash) =>
this.network_service.user_protocol_mut().propagate_transaction(&hash),
ServiceToWorkerMsg::PropagateTransactions =>
this.network_service.user_protocol_mut().propagate_transactions(),
ServiceToWorkerMsg::GetValue(key) =>
this.network_service.get_value(&key),
ServiceToWorkerMsg::PutValue(key, value) =>
Expand Down
8 changes: 4 additions & 4 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1067,7 +1067,7 @@ ServiceBuilder<

spawn_handle.spawn(
"on-transaction-imported",
extrinsic_notifications(transaction_pool.clone(), network.clone()),
transaction_notifications(transaction_pool.clone(), network.clone()),
);

// Prometheus metrics.
Expand Down Expand Up @@ -1245,18 +1245,18 @@ ServiceBuilder<
}
}

async fn extrinsic_notifications<TBl, TExPool>(
async fn transaction_notifications<TBl, TExPool>(
transaction_pool: Arc<TExPool>,
network: Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>
)
where
TBl: BlockT,
TExPool: MaintainedTransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash>,
{
// extrinsic notifications
// transaction notifications
transaction_pool.import_notification_stream()
.for_each(move |hash| {
network.propagate_extrinsic(hash);
network.propagate_transaction(hash);
let status = transaction_pool.status();
telemetry!(SUBSTRATE_INFO; "txpool.import";
"ready" => status.ready,
Expand Down

0 comments on commit ef2a6c1

Please sign in to comment.