Skip to content

Commit

Permalink
[1.16] Revert proof queue changes (#13878), (#13703) (#13902)
Browse files Browse the repository at this point in the history
* Revert "Change proof queue data structure (#13878)"

This reverts commit daea1c7.

* Revert "Proof queue with more accurate calculation of remaining txns in the pipeline (#13703)"

This reverts commit 387c649.
  • Loading branch information
vusirikala authored Jul 2, 2024
1 parent d8ac339 commit 7680255
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 391 deletions.
2 changes: 1 addition & 1 deletion consensus/src/block_storage/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ impl BlockStore {

counters::CONSENSUS_PROPOSAL_PENDING_ROUNDS.observe(pending_rounds as f64);
counters::CONSENSUS_PROPOSAL_PENDING_DURATION
.observe_duration(oldest_not_committed_spent_in_pipeline);
.observe(oldest_not_committed_spent_in_pipeline.as_secs_f64());

if pending_rounds > 1 {
// TODO cleanup
Expand Down
23 changes: 4 additions & 19 deletions consensus/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,13 +301,10 @@ pub static CONSENSUS_PROPOSAL_PENDING_ROUNDS: Lazy<Histogram> = Lazy::new(|| {
});

/// duration pending when creating proposal
pub static CONSENSUS_PROPOSAL_PENDING_DURATION: Lazy<DurationHistogram> = Lazy::new(|| {
DurationHistogram::new(
register_histogram!(
"aptos_consensus_proposal_pending_duration",
"duration pending when creating proposal",
)
.unwrap(),
pub static CONSENSUS_PROPOSAL_PENDING_DURATION: Lazy<Histogram> = Lazy::new(|| {
register_avg_counter(
"aptos_consensus_proposal_pending_duration",
"duration pending when creating proposal",
)
});

Expand Down Expand Up @@ -748,16 +745,6 @@ pub static NUM_TXNS_PER_BLOCK: Lazy<Histogram> = Lazy::new(|| {
.unwrap()
});

/// Histogram for the number of bytes in the committed blocks.
pub static NUM_BYTES_PER_BLOCK: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"aptos_consensus_num_bytes_per_block",
"Histogram for the number of bytes per (committed) blocks.",
exponential_buckets(/*start=*/ 500.0, /*factor=*/ 1.4, /*count=*/ 32).unwrap()
)
.unwrap()
});

// Histogram buckets that expand DEFAULT_BUCKETS with more granularity:
// * 0.3 to 2.0: step 0.1
// * 2.0 to 4.0: step 0.2
Expand Down Expand Up @@ -1059,8 +1046,6 @@ pub fn update_counters_for_committed_blocks(blocks_to_commit: &[Arc<PipelinedBlo
observe_block(block.block().timestamp_usecs(), BlockStage::COMMITTED);
let txn_status = block.compute_result().compute_status_for_input_txns();
NUM_TXNS_PER_BLOCK.observe(txn_status.len() as f64);
NUM_BYTES_PER_BLOCK
.observe(block.block().payload().map_or(0, |payload| payload.size()) as f64);
COMMITTED_BLOCKS_COUNT.inc();
LAST_COMMITTED_ROUND.set(block.round() as i64);
LAST_COMMITTED_VERSION.set(block.compute_result().num_leaves() as i64);
Expand Down
7 changes: 1 addition & 6 deletions consensus/src/quorum_store/batch_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,7 @@ impl BatchCoordinator {
let peer_id = persist_requests[0].author();
let batches = persist_requests
.iter()
.map(|persisted_value| {
(
persisted_value.batch_info().clone(),
persisted_value.summary(),
)
})
.map(|persisted_value| persisted_value.batch_info().clone())
.collect();
let signed_batch_infos = batch_store.persist(persist_requests);
if !signed_batch_infos.is_empty() {
Expand Down
112 changes: 2 additions & 110 deletions consensus/src/quorum_store/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

use aptos_metrics_core::{
exponential_buckets, op_counters::DurationHistogram, register_avg_counter, register_histogram,
register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge,
Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge,
register_histogram_vec, register_int_counter, register_int_counter_vec, Histogram,
HistogramVec, IntCounter, IntCounterVec,
};
use once_cell::sync::Lazy;
use std::time::Duration;
Expand All @@ -28,14 +28,6 @@ static TRANSACTION_COUNT_BUCKETS: Lazy<Vec<f64>> = Lazy::new(|| {
.unwrap()
});

static PROOF_COUNT_BUCKETS: Lazy<Vec<f64>> = Lazy::new(|| {
[
1.0, 3.0, 5.0, 7.0, 10.0, 12.0, 15.0, 20.0, 25.0, 30.0, 40.0, 50.0, 60.0, 75.0, 100.0,
125.0, 150.0, 200.0, 250.0, 300.0, 500.0,
]
.to_vec()
});

static BYTE_BUCKETS: Lazy<Vec<f64>> = Lazy::new(|| {
exponential_buckets(
/*start=*/ 500.0, /*factor=*/ 1.5, /*count=*/ 25,
Expand Down Expand Up @@ -90,46 +82,6 @@ pub static PROOF_MANAGER_MAIN_LOOP: Lazy<DurationHistogram> = Lazy::new(|| {
)
});

pub static PROOF_QUEUE_ADD_BATCH_SUMMARIES_DURATION: Lazy<DurationHistogram> = Lazy::new(|| {
DurationHistogram::new(
register_histogram!(
"quorum_store_proof_queue_add_batch_summaries_duration",
"Duration of adding batch summaries to proof queue"
)
.unwrap(),
)
});

pub static PROOF_QUEUE_COMMIT_DURATION: Lazy<DurationHistogram> = Lazy::new(|| {
DurationHistogram::new(
register_histogram!(
"quorum_store_proof_queue_commit_duration",
"Duration of committing proofs from proof queue"
)
.unwrap(),
)
});

pub static PROOF_QUEUE_UPDATE_TIMESTAMP_DURATION: Lazy<DurationHistogram> = Lazy::new(|| {
DurationHistogram::new(
register_histogram!(
"quorum_store_proof_queue_update_block_timestamp_duration",
"Duration of updating block timestamp in proof queue"
)
.unwrap(),
)
});

pub static PROOF_QUEUE_REMAINING_TXNS_DURATION: Lazy<DurationHistogram> = Lazy::new(|| {
DurationHistogram::new(
register_histogram!(
"quorum_store_proof_queue_remaining_txns_duration",
"Duration of calculating remaining txns in proof queue"
)
.unwrap(),
)
});

/// Duration of each run of the event loop.
pub static BATCH_GENERATOR_MAIN_LOOP: Lazy<DurationHistogram> = Lazy::new(|| {
DurationHistogram::new(
Expand Down Expand Up @@ -352,58 +304,6 @@ pub fn pos_to_commit(bucket: u64, secs: f64) {
.observe(secs);
}

//////////////////////
// Proof Queue
//////////////////////

pub static PROOFS_WITHOUT_BATCH_DATA: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"quorum_store_proofs_without_batch_data",
"Number of proofs received without batch data"
)
.unwrap()
});

pub static TXNS_WITH_DUPLICATE_BATCHES: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"quorum_store_txns_with_duplicate_batches",
"Number of transactions received with duplicate batches"
)
.unwrap()
});

pub static TXNS_IN_PROOF_QUEUE: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"quorum_store_txns_in_proof_queue",
"Number of transactions in the proof queue"
)
.unwrap()
});

pub static PROOFS_IN_PROOF_QUEUE: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"quorum_store_proofs_in_proof_queue",
"Number of proofs in the proof queue"
)
.unwrap()
});

pub static NUM_PROOFS_IN_PROOF_QUEUE_AFTER_PULL: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"quorum_store_num_proofs_left_in_proof_queue_after_pull",
"Histogram for the number of proofs left in the proof queue after block proposal generation.",
PROOF_COUNT_BUCKETS.clone(),
).unwrap()
});

pub static NUM_TXNS_IN_PROOF_QUEUE_AFTER_PULL: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"quorum_store_num_txns_left_in_proof_queue_after_pull",
"Histogram for the number of transactions left in the proof queue after block proposal generation.",
TRANSACTION_COUNT_BUCKETS.clone(),
).unwrap()
});

/// Histogram for the number of total txns left after adding or cleaning batches.
pub static NUM_TOTAL_TXNS_LEFT_ON_UPDATE: Lazy<Histogram> = Lazy::new(|| {
register_avg_counter(
Expand All @@ -412,14 +312,6 @@ pub static NUM_TOTAL_TXNS_LEFT_ON_UPDATE: Lazy<Histogram> = Lazy::new(|| {
)
});

pub static NUM_UNIQUE_TOTAL_TXNS_LEFT_ON_UPDATE: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"quorum_store_num_unique_total_txns_left_on_update",
"Histogram for the number of total txns left after adding or cleaning batches, without duplicates.",
TRANSACTION_COUNT_BUCKETS.clone()
).unwrap()
});

/// Histogram for the number of total batches/PoS left after adding or cleaning batches.
pub static NUM_TOTAL_PROOFS_LEFT_ON_UPDATE: Lazy<Histogram> = Lazy::new(|| {
register_avg_counter(
Expand Down
15 changes: 3 additions & 12 deletions consensus/src/quorum_store/proof_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
},
};
use aptos_consensus_types::{
common::{Payload, PayloadFilter, ProofWithData, TransactionSummary},
common::{Payload, PayloadFilter, ProofWithData},
proof_of_store::{BatchInfo, ProofOfStore, ProofOfStoreMsg},
request_response::{GetPayloadCommand, GetPayloadResponse},
};
Expand All @@ -29,7 +29,7 @@ use std::{
#[derive(Debug)]
pub enum ProofManagerCommand {
ReceiveProofs(ProofOfStoreMsg),
ReceiveBatches(Vec<(BatchInfo, Vec<TransactionSummary>)>),
ReceiveBatches(Vec<BatchInfo>),
CommitNotification(u64, Vec<BatchInfo>),
Shutdown(tokio::sync::oneshot::Sender<()>),
}
Expand Down Expand Up @@ -166,19 +166,10 @@ impl ProofManager {
self.proofs_for_consensus.remaining_txns_and_proofs();
}

pub(crate) fn receive_batches(
&mut self,
batch_summaries: Vec<(BatchInfo, Vec<TransactionSummary>)>,
) {
pub(crate) fn receive_batches(&mut self, batches: Vec<BatchInfo>) {
if self.allow_batches_without_pos_in_proposal {
let batches = batch_summaries
.iter()
.map(|(batch_info, _)| batch_info.clone())
.collect();
self.batch_queue.add_batches(batches);
}
self.proofs_for_consensus
.add_batch_summaries(batch_summaries);
}

pub(crate) fn handle_commit_notification(
Expand Down
68 changes: 1 addition & 67 deletions consensus/src/quorum_store/tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::quorum_store::utils::ProofQueue;
use aptos_consensus_types::{
common::TransactionSummary,
proof_of_store::{BatchId, BatchInfo, ProofOfStore},
};
use aptos_consensus_types::proof_of_store::{BatchId, BatchInfo, ProofOfStore};
use aptos_crypto::HashValue;
use aptos_types::{aggregate_signature::AggregateSignature, PeerId};
use maplit::hashset;
Expand Down Expand Up @@ -96,66 +93,3 @@ fn test_proof_queue_sorting() {
assert_eq!(count_author_0, 2);
assert_eq!(count_author_1, 2);
}

#[test]
fn test_proof_calculate_remaining_txns_and_proofs() {
let my_peer_id = PeerId::random();
let mut proof_queue = ProofQueue::new(my_peer_id);

let author_0 = PeerId::random();
let author_1 = PeerId::random();

let author_0_batches = vec![
proof_of_store(author_0, BatchId::new_for_test(0), 100),
proof_of_store(author_0, BatchId::new_for_test(1), 200),
proof_of_store(author_0, BatchId::new_for_test(2), 50),
proof_of_store(author_0, BatchId::new_for_test(3), 300),
];
let info_1 = author_0_batches[0].info().clone();
let info_2 = author_0_batches[3].info().clone();
proof_queue.add_batch_summaries(vec![(info_1, vec![TransactionSummary::new(
PeerId::ONE,
1,
HashValue::zero(),
)])]);
for batch in author_0_batches {
proof_queue.push(batch);
}

let author_1_batches = vec![
proof_of_store(author_1, BatchId::new_for_test(4), 500),
proof_of_store(author_1, BatchId::new_for_test(5), 400),
proof_of_store(author_1, BatchId::new_for_test(6), 600),
proof_of_store(author_1, BatchId::new_for_test(7), 50),
];
let info_3 = author_1_batches[1].info().clone();
let info_4 = author_1_batches[3].info().clone();
for batch in author_1_batches {
proof_queue.push(batch);
}
assert_eq!(proof_queue.remaining_txns_and_proofs(), (8, 8));

proof_queue.add_batch_summaries(vec![(info_3, vec![TransactionSummary::new(
PeerId::ONE,
1,
HashValue::zero(),
)])]);

assert_eq!(proof_queue.remaining_txns_and_proofs(), (7, 8));

proof_queue.add_batch_summaries(vec![(info_2, vec![TransactionSummary::new(
PeerId::ONE,
2,
HashValue::zero(),
)])]);

assert_eq!(proof_queue.remaining_txns_and_proofs(), (7, 8));

proof_queue.add_batch_summaries(vec![(info_4, vec![TransactionSummary::new(
PeerId::ONE,
2,
HashValue::zero(),
)])]);

assert_eq!(proof_queue.remaining_txns_and_proofs(), (6, 8));
}
18 changes: 1 addition & 17 deletions consensus/src/quorum_store/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use anyhow::ensure;
use aptos_consensus_types::{
common::{BatchPayload, TransactionSummary},
common::BatchPayload,
proof_of_store::{BatchId, BatchInfo},
};
use aptos_crypto::{hash::CryptoHash, HashValue};
Expand Down Expand Up @@ -57,22 +57,6 @@ impl PersistedValue {
pub fn payload(&self) -> &Option<Vec<SignedTransaction>> {
&self.maybe_payload
}

pub fn summary(&self) -> Vec<TransactionSummary> {
if let Some(payload) = &self.maybe_payload {
return payload
.iter()
.map(|txn| {
TransactionSummary::new(
txn.sender(),
txn.sequence_number(),
txn.committed_hash(),
)
})
.collect();
}
vec![]
}
}

impl Deref for PersistedValue {
Expand Down
Loading

0 comments on commit 7680255

Please sign in to comment.