Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify proof queue data structure [Cherrypicked from (#13866)] #13878

Merged
merged 1 commit into from
Jul 1, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 47 additions & 50 deletions consensus/src/quorum_store/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,10 @@ pub struct ProofQueue {
author_to_batches: HashMap<PeerId, BTreeMap<BatchSortKey, BatchInfo>>,
// ProofOfStore and insertion_time. None if committed
batch_to_proof: HashMap<BatchKey, Option<(ProofOfStore, Instant)>>,
// Map of txn_summary = (sender, sequence number, hash) to all the batches that contain
// the transaction. This helps in counting the number of unique transactions in the pipeline.
txn_summary_to_batches: HashMap<TransactionSummary, HashSet<BatchKey>>,
// List of batches for which we received txn summaries from the batch coordinator
batches_with_txn_summary: HashSet<BatchKey>,
// Number of batches in which the txn_summary = (sender, sequence number, hash) has been included
txn_summary_num_occurrences: HashMap<TransactionSummary, u64>,
// List of transaction summaries for each batch
batch_to_txn_summaries: HashMap<BatchKey, Vec<TransactionSummary>>,
// Expiration index
expirations: TimeExpirations<BatchSortKey>,
latest_block_timestamp: u64,
Expand All @@ -218,8 +217,8 @@ impl ProofQueue {
my_peer_id,
author_to_batches: HashMap::new(),
batch_to_proof: HashMap::new(),
txn_summary_to_batches: HashMap::new(),
batches_with_txn_summary: HashSet::new(),
txn_summary_num_occurrences: HashMap::new(),
batch_to_txn_summaries: HashMap::new(),
expirations: TimeExpirations::new(),
latest_block_timestamp: 0,
remaining_txns_with_duplicates: 0,
Expand Down Expand Up @@ -250,30 +249,15 @@ impl ProofQueue {
}

fn remaining_txns_without_duplicates(&self) -> u64 {
// All the batch keys for which batch_to_proof is not None. This is the set of unexpired and uncommitted proofs.
let unexpired_batch_keys = self
.batch_to_proof
.iter()
.filter(|(_, proof)| proof.is_some())
.map(|(batch_key, _)| batch_key)
.collect::<HashSet<_>>();
let mut remaining_txns = self
.txn_summary_to_batches
.iter()
.filter(|(_, batches)| {
batches
.iter()
.any(|batch_key| unexpired_batch_keys.contains(batch_key))
})
.count() as u64;
let mut remaining_txns = self.txn_summary_num_occurrences.len() as u64;

// If no txn summaries are recorded for a batch_key, it means we've received the proof but haven't yet received the
// transaction summaries of the batch from the batch coordinator. Add the number of txns in the batch to remaining_txns.
remaining_txns += self
.batch_to_proof
.iter()
.filter_map(|(batch_key, proof)| {
if proof.is_some() && !self.batches_with_txn_summary.contains(batch_key) {
if proof.is_some() && !self.batch_to_txn_summaries.contains_key(batch_key) {
Some(proof.as_ref().unwrap().0.num_txns())
} else {
None
Expand Down Expand Up @@ -322,13 +306,19 @@ impl ProofQueue {
let start = Instant::now();
for (batch_info, txn_summaries) in batch_summaries {
let batch_key = BatchKey::from_info(&batch_info);
for txn_summary in txn_summaries {
self.txn_summary_to_batches
.entry(txn_summary)
.or_default()
.insert(batch_key.clone());
if self
.batch_to_txn_summaries
.insert(batch_key, txn_summaries.clone())
.is_none()
{
for txn_summary in txn_summaries {
if let Some(count) = self.txn_summary_num_occurrences.get_mut(&txn_summary) {
*count += 1;
} else {
self.txn_summary_num_occurrences.insert(txn_summary, 1);
}
}
}
self.batches_with_txn_summary.insert(batch_key);
}
counters::PROOF_QUEUE_ADD_BATCH_SUMMARIES_DURATION.observe_duration(start.elapsed());
}
Expand Down Expand Up @@ -472,11 +462,17 @@ impl ProofQueue {
num_expired_but_not_committed += 1;
counters::GAP_BETWEEN_BATCH_EXPIRATION_AND_CURRENT_TIME_WHEN_COMMIT
.observe((block_timestamp - batch.expiration()) as f64);
self.txn_summary_to_batches.retain(|_, batches| {
batches.remove(&key.batch_key);
!batches.is_empty()
});
self.batches_with_txn_summary.remove(&key.batch_key);
if let Some(txn_summaries) = self.batch_to_txn_summaries.get(&key.batch_key)
{
for txn_summary in txn_summaries {
if let Some(count) =
self.txn_summary_num_occurrences.get_mut(txn_summary)
{
*count -= 1;
};
}
}
self.batch_to_txn_summaries.remove(&key.batch_key);
self.dec_remaining(&batch.author(), batch.num_txns());
}
claims::assert_some!(self.batch_to_proof.remove(&key.batch_key));
Expand All @@ -486,6 +482,8 @@ impl ProofQueue {
}
}
}
self.txn_summary_num_occurrences
.retain(|_, count| *count > 0);
counters::PROOF_QUEUE_UPDATE_TIMESTAMP_DURATION.observe_duration(start.elapsed());
counters::NUM_PROOFS_EXPIRED_WHEN_COMMIT.inc_by(num_expired_but_not_committed);
}
Expand All @@ -501,20 +499,20 @@ impl ProofQueue {
.observe(remaining_txns_without_duplicates as f64);
// Count the number of transactions that are included in more than one batch
counters::TXNS_WITH_DUPLICATE_BATCHES.set(
self.txn_summary_to_batches
self.txn_summary_num_occurrences
.iter()
.filter(|(_, batches)| batches.len() > 1)
.filter(|(_, count)| **count > 1)
.count() as i64,
);

counters::TXNS_IN_PROOF_QUEUE.set(self.txn_summary_to_batches.len() as i64);
counters::TXNS_IN_PROOF_QUEUE.set(self.txn_summary_num_occurrences.len() as i64);

// count the number of batches with proofs but without txn summaries
counters::PROOFS_WITHOUT_BATCH_DATA.set(
self.batch_to_proof
.iter()
.map(|(batch_key, proof)| {
if proof.is_some() && !self.batches_with_txn_summary.contains(batch_key) {
if proof.is_some() && !self.batch_to_txn_summaries.contains_key(batch_key) {
1
} else {
0
Expand Down Expand Up @@ -546,18 +544,17 @@ impl ProofQueue {
self.dec_remaining(&batch.author(), batch.num_txns());
}
self.batch_to_proof.insert(batch_key.clone(), None);
self.batches_with_txn_summary.remove(&batch_key);
}
let batch_keys = batches
.iter()
.map(BatchKey::from_info)
.collect::<HashSet<_>>();
self.txn_summary_to_batches.retain(|_, batches| {
for batch_key in &batch_keys {
batches.remove(batch_key);
if let Some(txn_summaries) = self.batch_to_txn_summaries.get(&batch_key) {
for txn_summary in txn_summaries {
if let Some(count) = self.txn_summary_num_occurrences.get_mut(txn_summary) {
*count -= 1;
};
}
}
!batches.is_empty()
});
self.batch_to_txn_summaries.remove(&batch_key);
}
self.txn_summary_num_occurrences
.retain(|_, count| *count > 0);
counters::PROOF_QUEUE_COMMIT_DURATION.observe_duration(start.elapsed());
}
}
Loading