Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mempool: remove disconnected orphans #1152

Merged
merged 5 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions mempool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ common = { path = '../common' }
crypto = { path = '../crypto' }
logging = { path = '../logging' }
mempool-types = { path = 'types' }
p2p-types = { path = '../p2p/types' }
pos_accounting = { path = '../pos_accounting' }
rpc = { path = '../rpc' }
serialization = { path = '../serialization' }
Expand Down
3 changes: 3 additions & 0 deletions mempool/src/interface/mempool_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ pub trait MempoolInterface: Send + Sync {
/// Get the fee rate such that it would put the new transaction in the top X MB of the mempool
/// making it less likely to get rejected or trimmed in the case the mempool is full
fn get_fee_rate(&self, in_top_x_mb: usize) -> Result<FeeRate, Error>;

/// Notify mempool given peer has disconnected
fn notify_peer_disconnected(&mut self, peer_id: p2p_types::PeerId);
}

#[async_trait::async_trait]
Expand Down
6 changes: 5 additions & 1 deletion mempool/src/interface/mempool_interface_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,11 @@ impl MempoolInterface for Mempool {
}

fn get_fee_rate(&self, in_top_x_mb: usize) -> Result<FeeRate, Error> {
self.get_fee_rate(in_top_x_mb).map_err(Error::Policy)
Ok(self.get_fee_rate(in_top_x_mb)?)
}

fn notify_peer_disconnected(&mut self, peer_id: p2p_types::PeerId) {
self.on_peer_disconnected(peer_id)
}
}

Expand Down
4 changes: 4 additions & 0 deletions mempool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1108,6 +1108,10 @@ impl<M: MemoryUsageEstimator> Mempool<M> {
Ok(())
}

pub fn on_peer_disconnected(&mut self, peer_id: p2p_types::PeerId) {
self.orphans.remove_by_origin(TxOrigin::Peer(peer_id));
}

pub fn get_fee_rate(&self, in_top_x_mb: usize) -> Result<FeeRate, MempoolPolicyError> {
let mut total_size = 0;
self.store
Expand Down
33 changes: 32 additions & 1 deletion mempool/src/pool/orphans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crypto::random::{make_pseudo_rng, Rng};
use logging::log;
use utils::{const_value::ConstValue, ensure};

use super::{OrphanPoolError, Time, TxDependency, TxEntry};
use super::{OrphanPoolError, Time, TxDependency, TxEntry, TxOrigin};
use crate::config;
pub use detect::OrphanType;

Expand Down Expand Up @@ -64,6 +64,9 @@ struct TxOrphanPoolMaps {

/// Transactions indexed by their dependencies
by_deps: BTreeSet<(TxDependency, InternalId)>,

/// Transactions indexed by the origin
by_origin: BTreeSet<(TxOrigin, InternalId)>,
}

impl TxOrphanPoolMaps {
Expand All @@ -72,6 +75,7 @@ impl TxOrphanPoolMaps {
by_tx_id: BTreeMap::new(),
by_insertion_time: BTreeSet::new(),
by_deps: BTreeSet::new(),
by_origin: BTreeSet::new(),
}
}

Expand All @@ -82,6 +86,9 @@ impl TxOrphanPoolMaps {
let inserted = self.by_insertion_time.insert((entry.creation_time(), iid));
assert!(inserted, "Tx entry already in insertion time map");

let inserted = self.by_origin.insert((entry.origin(), iid));
assert!(inserted, "Tx entry already in the origin map");

self.by_deps.extend(entry.requires().map(|dep| (dep, iid)));
}

Expand All @@ -91,6 +98,9 @@ impl TxOrphanPoolMaps {
let removed = self.by_insertion_time.remove(&(entry.creation_time(), iid));
assert!(removed, "Tx entry not present in the insertion time map");

let removed = self.by_origin.remove(&(entry.origin(), iid));
assert!(removed, "Tx entry not present in the origin map");

entry.requires().for_each(|dep| {
self.by_deps.remove(&(dep, iid));
})
Expand Down Expand Up @@ -263,6 +273,27 @@ impl TxOrphanPool {

n_evicted
}

/// Remove orphans for given originator
pub fn remove_by_origin(&mut self, origin: TxOrigin) -> usize {
let mut n_removed = 0;

while let Some(iid) = self.pick_by_origin(origin) {
let _ = self.remove_at(iid);
n_removed += 1;
}

n_removed
}

/// Pick one orphan from given origin
fn pick_by_origin(&self, origin: TxOrigin) -> Option<InternalId> {
self.maps
.by_origin
.range((origin, InternalId::ZERO)..=(origin, InternalId::MAX))
.map(|(_origin, iid)| *iid)
.next()
}
alfiedotwtf marked this conversation as resolved.
Show resolved Hide resolved
}

#[cfg(test)]
Expand Down
32 changes: 30 additions & 2 deletions mempool/src/pool/orphans/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ fn check_integrity(orphans: &TxOrphanPool) {
});
}

fn random_peer_origin(rng: &mut impl Rng) -> TxOrigin {
TxOrigin::Peer(p2p_types::PeerId::from_u64(rng.gen_range(0u64..20)))
}

fn random_tx_entry(rng: &mut impl Rng) -> TxEntry {
let n_inputs = rng.gen_range(1..=10);
let inputs: Vec<_> = (0..n_inputs)
Expand All @@ -73,7 +77,14 @@ fn random_tx_entry(rng: &mut impl Rng) -> TxEntry {
let transaction = SignedTransaction::new(transaction, signatures).unwrap();
let insertion_time = Time::from_secs(rng.gen());

TxEntry::new(transaction, insertion_time, crate::TxOrigin::LocalMempool)
let origin = match rng.gen_range(0..4) {
0 | 1 => random_peer_origin(rng),
2 => TxOrigin::LocalMempool,
3 => TxOrigin::LocalP2p,
_ => panic!("out of range"),
};

TxEntry::new(transaction, insertion_time, origin)
}

#[rstest]
Expand Down Expand Up @@ -138,7 +149,7 @@ fn simulation(#[case] seed: Seed) {

for _ in 0..300 {
let len_before = orphans.len();
match rng.gen_range(0..=4) {
match rng.gen_range(0..=5) {
// Insert a random tx
0..=1 => {
let entry = random_tx_entry(&mut rng);
Expand Down Expand Up @@ -180,6 +191,23 @@ fn simulation(#[case] seed: Seed) {
assert!(orphans.len() <= len_before);
}

// Delete all txs by origin
5..=5 => {
let origin = match rng.gen_range(0..=5) {
0..=3 => random_peer_origin(&mut rng),
4..=4 => TxOrigin::LocalMempool,
5..=5 => TxOrigin::LocalP2p,
_ => panic!("out of range"),
};
orphans.remove_by_origin(origin);
let count = orphans
.maps
.by_origin
.range((origin, InternalId::ZERO)..=(origin, InternalId::MAX))
.count();
assert_eq!(count, 0, "Removing txs by origin {origin:?} failed");
}

// This should not be generated
i => panic!("Out of range: {i}"),
}
Expand Down
1 change: 1 addition & 0 deletions mocks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pos_accounting = { path = '../pos_accounting/' }
subsystem = { path = "../subsystem/" }
utils = { path = "../utils/" }
utxo = { path = "../utxo/" }
p2p-types = { path = "../p2p/types" }

async-trait.workspace = true
mockall.workspace = true
Expand Down
4 changes: 4 additions & 0 deletions mocks/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ impl MempoolInterface for MempoolInterfaceMock {
fn get_fee_rate(&self, _in_top_x_mb: usize) -> Result<FeeRate, Error> {
Ok(FeeRate::new(Amount::ZERO))
}

fn notify_peer_disconnected(&mut self, _peer_id: p2p_types::PeerId) {
unimplemented!()
}
}

#[async_trait::async_trait]
Expand Down
10 changes: 5 additions & 5 deletions node-lib/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ async fn initialize(
let p2p = p2p::make_p2p(
Arc::clone(&chain_config),
Arc::new(node_config.p2p.unwrap_or_default().into()),
chainstate.clone(),
mempool.clone(),
subsystem::Handle::clone(&chainstate),
subsystem::Handle::clone(&mempool),
Default::default(),
peerdb_storage,
)?;
Expand All @@ -134,9 +134,9 @@ async fn initialize(
blockprod::make_blockproduction(
Arc::clone(&chain_config),
Arc::new(node_config.blockprod.unwrap_or_default().into()),
chainstate.clone(),
mempool.clone(),
p2p.clone(),
subsystem::Handle::clone(&chainstate),
subsystem::Handle::clone(&mempool),
subsystem::Handle::clone(&p2p),
Default::default(),
)?,
);
Expand Down
18 changes: 15 additions & 3 deletions p2p/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ where
},

event = self.sync_event_receiver.poll_next() => {
self.handle_peer_event(event?);
self.handle_peer_event(event?).await;
},
}
}
Expand Down Expand Up @@ -276,16 +276,28 @@ where
}

/// Sends an event to the corresponding peer.
fn handle_peer_event(&mut self, event: SyncingEvent) {
async fn handle_peer_event(&mut self, event: SyncingEvent) {
match event {
SyncingEvent::Connected {
peer_id,
services,
sync_rx,
} => self.register_peer(peer_id, services, sync_rx),
SyncingEvent::Disconnected { peer_id } => self.unregister_peer(peer_id),
SyncingEvent::Disconnected { peer_id } => {
Self::notify_mempool_peer_disconnected(&self.mempool_handle, peer_id).await;
self.unregister_peer(peer_id);
}
}
}

async fn notify_mempool_peer_disconnected(mempool_handle: &MempoolHandle, peer_id: PeerId) {
mempool_handle
.call_mut(move |mempool| mempool.notify_peer_disconnected(peer_id))
.await
.unwrap_or_else(|err| {
log::error!("Mempool dead upon peer {peer_id} disconnect: {err}");
})
}
}

/// Returns a receiver for the chainstate `NewTip` events.
Expand Down
1 change: 1 addition & 0 deletions p2p/types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ pub mod services;
pub mod socket_address;

pub use global_ip::IsGlobalIp;
pub use peer_id::PeerId;
4 changes: 4 additions & 0 deletions p2p/types/src/peer_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ impl PeerId {
let id = NEXT_PEER_ID.fetch_add(1, Ordering::Relaxed);
Self(id)
}

pub fn from_u64(n: u64) -> Self {
iljakuklic marked this conversation as resolved.
Show resolved Hide resolved
Self(n)
}
}

impl std::fmt::Display for PeerId {
Expand Down
71 changes: 71 additions & 0 deletions test/functional/mempool_orphan_peer_disconnected.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#!/usr/bin/env python3
# Copyright (c) 2023 RBB S.r.l
# opensource@mintlayer.org
# SPDX-License-Identifier: MIT
# Licensed under the MIT License;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Mempool orphan from disconnected peer

Check that:
* A peer sees an orphan transaction
* When the originator disconnects, the orphan is removed
"""

from test_framework.test_framework import BitcoinTestFramework
from test_framework.mintlayer import (make_tx, reward_input, tx_input)
import scalecodec

class MempoolOrphanFromDisconnectedPeerTest(BitcoinTestFramework):

def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 2
self.extra_args = [[], []]

def setup_network(self):
self.setup_nodes()
self.sync_all(self.nodes[0:1])
self.connect_nodes(0, 1)

def run_test(self):
node0 = self.nodes[0]
node1 = self.nodes[1]

# Get genesis ID
genesis_id = node0.chainstate_best_block_id()

(tx1, tx1_id) = make_tx([ reward_input(genesis_id) ], [ 1_000_000 ] )
(tx2, tx2_id) = make_tx([ tx_input(tx1_id) ], [ 900_000 ] )

# Submit two transactions that build on top of each other but only propagate the second one
node0.mempool_submit_transaction(tx1)
node0.p2p_submit_transaction(tx2)

# Check the node gets the orphan transaction
self.wait_until(lambda: node1.mempool_contains_orphan_tx(tx2_id), timeout = 5)

# Now disconnect the nodes and check the orphan is gone
self.disconnect_nodes(0, 1)
self.wait_until(lambda: not node1.mempool_contains_orphan_tx(tx2_id), timeout = 5)

# Some final sanity checks
assert node0.mempool_contains_tx(tx1_id)
assert node0.mempool_contains_tx(tx2_id)
assert not node1.mempool_contains_tx(tx1_id)
assert not node1.mempool_contains_tx(tx2_id)
assert not node1.mempool_contains_orphan_tx(tx1_id)
assert not node1.mempool_contains_orphan_tx(tx2_id)


if __name__ == '__main__':
MempoolOrphanFromDisconnectedPeerTest().main()
1 change: 1 addition & 0 deletions test/functional/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class UnicodeOnWindowsError(ValueError):
'mempool_basic_reorg.py',
'mempool_eviction.py',
'mempool_ibd.py',
'mempool_orphan_peer_disconnected.py',
'mempool_submit_orphan.py',
'mempool_submit_tx.py',
'mempool_timelocked_tx.py',
Expand Down
Loading