Skip to content

Commit

Permalink
[fix] hyperledger-iroha#3408: Fix `public_keys_cannot_be_burned_to_no…
Browse files Browse the repository at this point in the history
…thing` test

Signed-off-by: Daniil Polyakov <arjentix@gmail.com>
  • Loading branch information
Arjentix committed May 3, 2023
1 parent 6daae99 commit f372f32
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 63 deletions.
34 changes: 18 additions & 16 deletions client/tests/integration/burn_public_keys.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,29 @@
#![allow(clippy::pedantic, clippy::restriction)]

use std::thread;

use iroha_client::client::{account, transaction, Client};
use iroha_crypto::{KeyPair, PublicKey};
use iroha_data_model::prelude::*;
use test_network::*;

use super::Configuration;

fn submit_and_get(
client: &mut Client,
instructions: impl IntoIterator<Item = InstructionBox>,
submitter: Option<(AccountId, KeyPair)>,
) -> TransactionValue {
let hash = if let Some((account_id, keypair)) = submitter {
let tx = TransactionBuilder::new(account_id, Vec::from_iter(instructions), 100_000)
.sign(keypair)
.unwrap();

client.submit_transaction(tx).unwrap()
let tx = if let Some((account_id, keypair)) = submitter {
let tx = TransactionBuilder::new(account_id, Vec::from_iter(instructions), 100_000);
tx.sign(keypair).unwrap()
} else {
client.submit_all(instructions).unwrap()
let tx = client
.build_transaction(instructions, UnlimitedMetadata::default())
.unwrap();
client.sign_transaction(tx).unwrap()
};

thread::sleep(Configuration::pipeline_time() * 2);
let hash = tx.hash();
let _ = client.submit_transaction_blocking(tx);

// thread::sleep(Configuration::pipeline_time() * 2);

client.request(transaction::by_hash(*hash)).unwrap()
}
Expand All @@ -36,7 +35,6 @@ fn account_keys_count(client: &mut Client, account_id: AccountId) -> usize {
}

#[test]
#[ignore = "TODO (#3408): Fix this flaky test. For some reason Iroha sometimes just ignores last transaction sometimes"]
fn public_keys_cannot_be_burned_to_nothing() {
const KEYS_COUNT: usize = 3;
let charlie_id: AccountId = "charlie@wonderland".parse().expect("Valid");
Expand Down Expand Up @@ -72,7 +70,11 @@ fn public_keys_cannot_be_burned_to_nothing() {
let charlie = client.request(account::by_id(charlie_id.clone())).unwrap();
let mut keys = charlie.signatories();
let burn = |key: PublicKey| InstructionBox::from(BurnBox::new(key, charlie_id.clone()));
let burn_keys_leaving_one = keys.by_ref().take(KEYS_COUNT - 1).cloned().map(burn);
let burn_keys_leaving_one = keys
.by_ref()
.filter(|pub_key| pub_key != &charlie_initial_keypair.public_key())
.cloned()
.map(burn);

let mut committed_txn = submit_and_get(
&mut client,
Expand All @@ -83,11 +85,11 @@ fn public_keys_cannot_be_burned_to_nothing() {
assert_eq!(keys_count, 1);
assert!(matches!(committed_txn, TransactionValue::Transaction(_)));

let burn_the_last_key = keys.cloned().map(burn);
let burn_the_last_key = burn(charlie_initial_keypair.public_key().clone());

committed_txn = submit_and_get(
&mut client,
burn_the_last_key,
std::iter::once(burn_the_last_key),
Some((charlie_id.clone(), charlie_initial_keypair)),
);
keys_count = charlie_keys_count(&mut client);
Expand Down
112 changes: 69 additions & 43 deletions core/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use eyre::{Report, Result};
use iroha_config::queue::Configuration;
use iroha_crypto::HashOf;
use iroha_data_model::transaction::prelude::*;
use iroha_logger::{debug, info, trace, warn};
use iroha_primitives::{must_use::MustUse, riffle_iter::RiffleIter};
use rand::seq::IteratorRandom;
use thiserror::Error;
Expand Down Expand Up @@ -162,50 +163,67 @@ impl Queue {
tx: VersionedAcceptedTransaction,
wsv: &WorldStateView,
) -> Result<(), Failure> {
match self.check_tx(&tx, wsv) {
Err(err) => Err(Failure { tx, err }),
Ok(MustUse(signature_check)) => {
// Get `txs_len` before entry to avoid deadlock
let txs_len = self.txs.len();
let hash = tx.hash();
let entry = match self.txs.entry(hash) {
Entry::Occupied(mut old_tx) => {
// MST case
old_tx
.get_mut()
.as_mut_v1()
.signatures
.extend(tx.as_v1().signatures.clone());
return Ok(());
}
Entry::Vacant(entry) => entry,
};
if txs_len >= self.max_txs {
return Err(Failure {
tx,
err: Error::Full,
});
}
trace!(?tx, "Pushing to the queue");
let signature_check_succeed = match self.check_tx(&tx, wsv) {
Err(err) => {
warn!("Failed to evaluate signature check");
return Err(Failure { tx, err });
}
Ok(MustUse(signature_check)) => signature_check,
};

// Insert entry first so that the `tx` popped from `queue` will always have a `(hash, tx)` record in `txs`.
entry.insert(tx);
let queue_to_push = if signature_check {
&self.queue
} else {
&self.signature_buffer
};
queue_to_push.push(hash).map_err(|err_hash| {
let (_, err_tx) = self
.txs
.remove(&err_hash)
.expect("Inserted just before match");
Failure {
tx: err_tx,
err: Error::Full,
}
})
// Get `txs_len` before entry to avoid deadlock
let txs_len = self.txs.len();
let hash = tx.hash();
let entry = match self.txs.entry(hash) {
Entry::Occupied(mut old_tx) => {
// MST case
old_tx
.get_mut()
.as_mut_v1()
.signatures
.extend(tx.as_v1().signatures.clone());
info!("Signature added to existing multisignature transaction");
return Ok(());
}
Entry::Vacant(entry) => entry,
};
if txs_len >= self.max_txs {
warn!(
max = self.max_txs,
"Achieved maximum amount of transactions"
);
return Err(Failure {
tx,
err: Error::Full,
});
}

// Insert entry first so that the `tx` popped from `queue` will always have a `(hash, tx)` record in `txs`.
entry.insert(tx);
let queue_to_push = if signature_check_succeed {
&self.queue
} else {
info!("New multisignature transaction detected");
&self.signature_buffer
};
let res = queue_to_push.push(hash).map_err(|err_hash| {
warn!("Concrete sub-queue to push is full");
let (_, err_tx) = self
.txs
.remove(&err_hash)
.expect("Inserted just before match");
Failure {
tx: err_tx,
err: Error::Full,
}
});
trace!(
"Transaction queue length = {}, multisig transaction queue length = {}",
self.queue.len(),
self.signature_buffer.len()
);
res
}

/// Pop single transaction from the signature buffer. Record all visited and not removed transactions in `seen`.
Expand Down Expand Up @@ -240,21 +258,29 @@ impl Queue {
expired_transactions: &mut Vec<VersionedAcceptedTransaction>,
) -> Option<VersionedAcceptedTransaction> {
loop {
let hash = queue.pop()?;
let Some(hash) = queue.pop() else {
trace!("Queue is empty");
return None;
};
let entry = match self.txs.entry(hash) {
Entry::Occupied(entry) => entry,
// FIXME: Reachable under high load. Investigate, see if it's a problem.
// As practice shows this code is not `unreachable!()`.
// When transactions are submitted quickly it can be reached.
Entry::Vacant(_) => continue,
Entry::Vacant(_) => {
warn!("Looks like we're experiencing a high load");
continue;
}
};

let tx = entry.get();
if tx.is_in_blockchain(wsv) {
debug!("Transaction is already in blockchain");
entry.remove_entry();
continue;
}
if tx.is_expired(self.tx_time_to_live) {
debug!("Transaction is expired");
let (_, tx) = entry.remove_entry();
expired_transactions.push(tx);
continue;
Expand Down
15 changes: 11 additions & 4 deletions core/src/sumeragi/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,7 @@ fn update_state(state: &mut State, sumeragi: &Sumeragi, committed_block: &Versio
}

fn cache_transaction(state: &mut State, sumeragi: &Sumeragi) {
let transaction_cache = &mut state.transaction_cache;
transaction_cache.retain(|tx| {
state.transaction_cache.retain(|tx| {
!tx.is_in_blockchain(&state.wsv) && !tx.is_expired(sumeragi.queue.tx_time_to_live)
});
}
Expand Down Expand Up @@ -704,8 +703,9 @@ fn process_message_independent(
if voting_block.is_none() {
let cache_full = state.transaction_cache.len() >= sumeragi.queue.txs_in_block;
let deadline_reached = round_start_time.elapsed() > sumeragi.block_time;
let cache_non_empty = !state.transaction_cache.is_empty();

if cache_full || (deadline_reached && !state.transaction_cache.is_empty()) {
if cache_full || (deadline_reached && cache_non_empty) {
let transactions = state.transaction_cache.clone();
info!(txns=%transactions.len(), "Creating block...");

Expand Down Expand Up @@ -913,14 +913,21 @@ pub(crate) fn run(
state
.transaction_cache
// Checking if transactions are in the blockchain is costly
.retain(|tx| !tx.is_expired(sumeragi.queue.tx_time_to_live));
.retain(|tx| {
let expired = tx.is_expired(sumeragi.queue.tx_time_to_live);
if expired {
debug!(?tx, "Transaction expired")
}
expired
});

let mut expired_transactions = Vec::new();
sumeragi.queue.get_transactions_for_block(
&state.wsv,
&mut state.transaction_cache,
&mut expired_transactions,
);
debug!("Transaction cache: {:?}", state.transaction_cache);
sumeragi.send_events(
expired_transactions
.iter()
Expand Down
2 changes: 2 additions & 0 deletions primitives/src/riffle_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
/// ```ignore
/// [(a0,a1,a2,..),(b0,b1,b2,..)] -> (a0,b0,a1,b1,a2,b2,..)
/// ```
#[derive(Clone)]
pub struct RiffleIterator<A, B> {
left_iter: A,
right_iter: B,
state: RiffleState,
}

#[derive(Clone, Copy)]
enum RiffleState {
CurrentLeft,
CurrentRight,
Expand Down

0 comments on commit f372f32

Please sign in to comment.