Skip to content

Commit

Permalink
blockprod: Fix test race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
iljakuklic committed Oct 13, 2023
1 parent 1251575 commit 3426273
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 57 deletions.
24 changes: 11 additions & 13 deletions blockprod/src/detail/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,19 +464,17 @@ impl BlockProduction {
// scratch every time a different timestamp is attempted. That is more costly
// in terms of computational resources but will allow the node to include more
// transactions since the passing time may release some time locks.
let collected_transactions = {
let accumulator = self
.collect_transactions(
current_tip_index.block_id(),
min_constructed_block_timestamp,
transactions.clone(),
transaction_ids.clone(),
packing_strategy,
)
.await?;

accumulator.transactions().clone()
};
let collected_transactions = self
.collect_transactions(
current_tip_index.block_id(),
min_constructed_block_timestamp,
transactions.clone(),
transaction_ids.clone(),
packing_strategy,
)
.await?
.transactions()
.clone();

let block_body = BlockBody::new(block_reward, collected_transactions);

Expand Down
29 changes: 15 additions & 14 deletions blockprod/src/detail/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,7 @@ mod produce_block {
chain_config,
Arc::new(test_blockprod_config()),
chainstate.clone(),
mempool,
mempool.clone(),
p2p,
Default::default(),
prepare_thread_pool(1),
Expand All @@ -938,7 +938,7 @@ mod produce_block {
job_finished_receiver.await.expect("Job finished receiver closed");

assert_job_count(&block_production, 0).await;
assert_process_block(&chainstate, new_block).await;
assert_process_block(&chainstate, &mempool, new_block).await;
}
});

Expand All @@ -962,7 +962,7 @@ mod produce_block {
chain_config,
Arc::new(test_blockprod_config()),
chainstate.clone(),
mempool,
mempool.clone(),
p2p,
Default::default(),
prepare_thread_pool(1),
Expand All @@ -983,7 +983,7 @@ mod produce_block {
job_finished_receiver.await.expect("Job finished receiver closed");

assert_job_count(&block_production, 0).await;
assert_process_block(&chainstate, new_block).await;
assert_process_block(&chainstate, &mempool, new_block).await;
}
});

Expand Down Expand Up @@ -1095,7 +1095,7 @@ mod produce_block {
chain_config,
Arc::new(test_blockprod_config()),
chainstate.clone(),
mempool,
mempool.clone(),
p2p,
Default::default(),
prepare_thread_pool(1),
Expand All @@ -1115,7 +1115,7 @@ mod produce_block {
job_finished_receiver.await.expect("Job finished receiver closed");

assert_job_count(&block_production, 0).await;
assert_process_block(&chainstate, new_block).await;
assert_process_block(&chainstate, &mempool, new_block).await;
}
});

Expand Down Expand Up @@ -1152,7 +1152,7 @@ mod produce_block {
chain_config,
Arc::new(test_blockprod_config()),
chainstate.clone(),
mempool,
mempool.clone(),
p2p,
Default::default(),
prepare_thread_pool(1),
Expand All @@ -1174,7 +1174,7 @@ mod produce_block {
job_finished_receiver.await.expect("Job finished receiver closed");

assert_job_count(&block_production, 0).await;
assert_process_block(&chainstate, new_block).await;
assert_process_block(&chainstate, &mempool, new_block).await;
}
});

Expand Down Expand Up @@ -1209,7 +1209,7 @@ mod produce_block {
chain_config.clone(),
Arc::new(test_blockprod_config()),
chainstate.clone(),
mempool,
mempool.clone(),
p2p,
Default::default(),
prepare_thread_pool(1),
Expand Down Expand Up @@ -1240,7 +1240,7 @@ mod produce_block {
job_finished_receiver.await.expect("Job finished receiver closed");

assert_job_count(&block_production, 0).await;
assert_process_block(&chainstate, new_block).await;
assert_process_block(&chainstate, &mempool, new_block).await;
}
});

Expand Down Expand Up @@ -1354,7 +1354,7 @@ mod produce_block {
chain_config.clone(),
Arc::new(test_blockprod_config()),
chainstate.clone(),
mempool,
mempool.clone(),
p2p,
Default::default(),
prepare_thread_pool(1),
Expand Down Expand Up @@ -1400,7 +1400,7 @@ mod produce_block {

job_finished_receiver.await.expect("Job finished receiver closed");

assert_process_block(&chainstate, new_block.clone()).await;
assert_process_block(&chainstate, &mempool, new_block.clone()).await;
}
RequiredConsensus::PoS(_) => {
// Try no input data for PoS consensus
Expand Down Expand Up @@ -1457,7 +1457,8 @@ mod produce_block {

job_finished_receiver.await.expect("Job finished receiver closed");

let result = assert_process_block(&chainstate, new_block).await;
let result =
assert_process_block(&chainstate, &mempool, new_block).await;

// Update kernel input parameters for future PoS blocks

Expand Down Expand Up @@ -1528,7 +1529,7 @@ mod produce_block {

job_finished_receiver.await.expect("Job finished receiver closed");

assert_process_block(&chainstate, new_block.clone()).await;
assert_process_block(&chainstate, &mempool, new_block.clone()).await;
}
}
}
Expand Down
38 changes: 33 additions & 5 deletions blockprod/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ mod tests {
Block, ConsensusUpgrade, Destination, Genesis, NetUpgrades, PoSChainConfigBuilder,
TxOutput, UpgradeVersion,
},
primitives::{per_thousand::PerThousand, Amount, BlockHeight, H256},
primitives::{per_thousand::PerThousand, Amount, BlockHeight, Idable, H256},
time_getter::TimeGetter,
};
use crypto::{
Expand All @@ -154,13 +154,37 @@ mod tests {

pub async fn assert_process_block(
chainstate: &ChainstateHandle,
mempool: &MempoolHandle,
new_block: Block,
) -> BlockIndex {
chainstate
let block_id = new_block.get_id();

// Wait for mempool to be up-to-date with the new block. The subscriptions are not cleaned
// up but hopefully it's not too bad just for testing.
let (tip_sx, tip_rx) = tokio::sync::oneshot::channel();
let tip_sx = utils::sync::Mutex::new(Some(tip_sx));
mempool
.call_mut(move |m| {
m.subscribe_to_events(Arc::new({
move |evt| match evt {
mempool::event::MempoolEvent::NewTip(tip) => {
if let Some(tip_sx) = tip_sx.lock().unwrap().take() {
assert_eq!(tip.block_id(), &block_id);
tip_sx.send(()).unwrap();
}
}
mempool::event::MempoolEvent::TransactionProcessed(_) => (),
}
}))
})
.await
.unwrap();

let block_index = chainstate
.call_mut(move |this| {
let new_block_index = this
.process_block(new_block.clone(), BlockSource::Local)
.expect("Failed to process block: {:?}")
.expect("Failed to process block")
.expect("Failed to activate best chain");

assert_eq!(
Expand All @@ -170,7 +194,7 @@ mod tests {
);

let best_block_index =
this.get_best_block_index().expect("Failed to get best block index: {:?}");
this.get_best_block_index().expect("Failed to get best block index");

assert_eq!(
new_block_index.clone().into_gen_block_index().block_id(),
Expand All @@ -181,7 +205,11 @@ mod tests {
new_block_index
})
.await
.expect("New block is not the new tip: {:?}")
.expect("New block is not the new tip");

tip_rx.await.unwrap();

block_index
}

pub fn setup_blockprod_test(
Expand Down
6 changes: 2 additions & 4 deletions mempool/src/pool/collect_txs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ pub fn collect_txs<M>(
mempool.chainstate_handle.shallow_clone(),
);

let verifier_time = tx_accumulator.block_timestamp();

let best_index = mempool
.blocking_chainstate_handle()
.call(|c| c.get_best_block_index())?
Expand All @@ -106,7 +104,7 @@ pub fn collect_txs<M>(
.iter()
.map(|transaction| {
let _fee =
tx_verifier.connect_transaction(&tx_source, transaction, &verifier_time, None)?;
tx_verifier.connect_transaction(&tx_source, transaction, &block_timestamp, None)?;
Ok(transaction.transaction().get_id())
})
.collect::<Result<Vec<_>, TxValidationError>>()?;
Expand Down Expand Up @@ -195,7 +193,7 @@ pub fn collect_txs<M>(
let verification_result = tx_verifier.connect_transaction(
&tx_source,
next_tx.transaction(),
&verifier_time,
&block_timestamp,
None,
);

Expand Down
8 changes: 7 additions & 1 deletion test/functional/blockprod_generate_blocks_all_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ def run_test(self):
timeout = 5
)

old_tip = node.chainstate_best_block_id()

block_hex = node.blockprod_generate_block(
block_input_data,
transactions,
Expand All @@ -150,11 +152,15 @@ def run_test(self):
for expected_transaction in expected_transactions:
self.assert_transaction_in_block(expected_transaction, block)

old_tip = node.chainstate_best_block_id()
node.chainstate_submit_block(block_hex)
new_tip = node.chainstate_best_block_id()
assert(old_tip != new_tip)

self.wait_until(
lambda: node.mempool_local_best_block_id() == node.chainstate_best_block_id(),
timeout = 5
)

#
# Check chainstate and mempool is as expected
#
Expand Down
10 changes: 6 additions & 4 deletions test/functional/blockprod_generate_pos_blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,23 @@ def assert_tip(self, expected_block):
assert_equal(block, expected_block)

def generate_block(self, expected_height, block_input_data, transactions):
previous_block_id = self.nodes[0].chainstate_best_block_id()
node = self.nodes[0]
previous_block_id = node.chainstate_best_block_id()

# Block production may fail if the Job Manager found a new tip, so try and sleep
for _ in range(5):
try:
block_hex = self.nodes[0].blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
block_hex = node.blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
break
except JSONRPCException:
block_hex = self.nodes[0].blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
block_hex = node.blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
time.sleep(1)

block_hex_array = bytearray.fromhex(block_hex)
block = ScaleDecoder.get_decoder_class('BlockV1', ScaleBytes(block_hex_array)).decode()

self.nodes[0].chainstate_submit_block(block_hex)
node.chainstate_submit_block(block_hex)
self.wait_until(lambda: node.mempool_local_best_block_id() == node.chainstate_best_block_id(), timeout = 5)

self.assert_tip(block_hex)
self.assert_height(expected_height, block_hex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,21 +94,23 @@ def block_height(self, n):
return self.nodes[n].chainstate_block_height_in_main_chain(tip)

def generate_block(self, expected_height, block_input_data, transactions):
previous_block_id = self.nodes[0].chainstate_best_block_id()
node = self.nodes[0]
previous_block_id = node.chainstate_best_block_id()

# Block production may fail if the Job Manager found a new tip, so try and sleep
for _ in range(5):
try:
block_hex = self.nodes[0].blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
block_hex = node.blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
break
except JSONRPCException:
block_hex = self.nodes[0].blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
block_hex = node.blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
time.sleep(1)

block_hex_array = bytearray.fromhex(block_hex)
block = ScaleDecoder.get_decoder_class('BlockV1', ScaleBytes(block_hex_array)).decode()

self.nodes[0].chainstate_submit_block(block_hex)
node.chainstate_submit_block(block_hex)
self.wait_until(lambda: node.mempool_local_best_block_id() == node.chainstate_best_block_id(), timeout = 5)

self.assert_tip(block_hex)
self.assert_height(expected_height, block_hex)
Expand Down
10 changes: 6 additions & 4 deletions test/functional/blockprod_generate_pos_genesis_blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,23 @@ def assert_tip(self, expected_block):
assert_equal(block, expected_block)

def generate_block(self, expected_height, block_input_data, transactions):
previous_block_id = self.nodes[0].chainstate_best_block_id()
node = self.nodes[0]
previous_block_id = node.chainstate_best_block_id()

# Block production may fail if the Job Manager found a new tip, so try and sleep
for _ in range(5):
try:
block_hex = self.nodes[0].blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
block_hex = node.blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
break
except JSONRPCException:
block_hex = self.nodes[0].blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
block_hex = node.blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
time.sleep(1)

block_hex_array = bytearray.fromhex(block_hex)
block = ScaleDecoder.get_decoder_class('BlockV1', ScaleBytes(block_hex_array)).decode()

self.nodes[0].chainstate_submit_block(block_hex)
node.chainstate_submit_block(block_hex)
self.wait_until(lambda: node.mempool_local_best_block_id() == node.chainstate_best_block_id(), timeout = 5)

self.assert_tip(block_hex)
self.assert_height(expected_height, block_hex)
Expand Down
10 changes: 6 additions & 4 deletions test/functional/blockprod_generate_pow_blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,23 @@ def block_height(self, n):
return self.nodes[n].chainstate_block_height_in_main_chain(tip)

def generate_block(self, expected_height, block_input_data, transactions):
previous_block_id = self.nodes[0].chainstate_best_block_id()
node = self.nodes[0]
previous_block_id = node.chainstate_best_block_id()

# Block production may fail if the Job Manager found a new tip, so try and sleep
for _ in range(5):
try:
block_hex = self.nodes[0].blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
block_hex = node.blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
break
except JSONRPCException:
block_hex = self.nodes[0].blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
block_hex = node.blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
time.sleep(1)

block_hex_array = bytearray.fromhex(block_hex)
block = ScaleDecoder.get_decoder_class('BlockV1', ScaleBytes(block_hex_array)).decode()

self.nodes[0].chainstate_submit_block(block_hex)
node.chainstate_submit_block(block_hex)
self.wait_until(lambda: node.mempool_local_best_block_id() == node.chainstate_best_block_id(), timeout = 5)

self.assert_tip(block_hex)
self.assert_height(expected_height, block_hex)
Expand Down
Loading

0 comments on commit 3426273

Please sign in to comment.