Skip to content

Commit

Permalink
TransactionScheduler: CLI and hookup for central-scheduler (solana-la…
Browse files Browse the repository at this point in the history
  • Loading branch information
apfitzge authored Nov 13, 2023
1 parent ae30572 commit 81a007b
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 43 deletions.
237 changes: 206 additions & 31 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,26 @@ use {
unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage},
},
crate::{
banking_trace::BankingPacketReceiver, tracer_packet_stats::TracerPacketStats,
banking_stage::{
consume_worker::ConsumeWorker,
packet_deserializer::PacketDeserializer,
transaction_scheduler::{
prio_graph_scheduler::PrioGraphScheduler,
scheduler_controller::SchedulerController, scheduler_error::SchedulerError,
},
},
banking_trace::BankingPacketReceiver,
tracer_packet_stats::TracerPacketStats,
validator::BlockProductionMethod,
},
crossbeam_channel::RecvTimeoutError,
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
histogram::Histogram,
solana_client::connection_cache::ConnectionCache,
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::blockstore_processor::TransactionStatusSender,
solana_measure::{measure, measure_us},
solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH},
solana_poh::poh_recorder::PohRecorder,
solana_poh::poh_recorder::{PohRecorder, TransactionRecorder},
solana_runtime::{bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache},
solana_sdk::timing::AtomicInterval,
solana_vote::vote_sender_types::ReplayVoteSender,
Expand Down Expand Up @@ -378,6 +387,20 @@ impl BankingStage {
prioritization_fee_cache,
)
}
BlockProductionMethod::CentralScheduler => Self::new_central_scheduler(
cluster_info,
poh_recorder,
non_vote_receiver,
tpu_vote_receiver,
gossip_vote_receiver,
num_threads,
transaction_status_sender,
replay_vote_sender,
log_messages_bytes_limit,
connection_cache,
bank_forks,
prioritization_fee_cache,
),
}
}

Expand Down Expand Up @@ -405,6 +428,15 @@ impl BankingStage {
TOTAL_BUFFERED_PACKETS / ((num_threads - NUM_VOTE_PROCESSING_THREADS) as usize);
// Keeps track of extraneous vote transactions for the vote threads
let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new());

let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());
let committer = Committer::new(
transaction_status_sender.clone(),
replay_vote_sender.clone(),
prioritization_fee_cache.clone(),
);
let transaction_recorder = poh_recorder.read().unwrap().new_recorder();

// Many banks that process transactions in parallel.
let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads)
.map(|id| {
Expand Down Expand Up @@ -432,48 +464,182 @@ impl BankingStage {
),
};

let mut packet_receiver =
PacketReceiver::new(id, packet_receiver, bank_forks.clone());
let poh_recorder = poh_recorder.clone();

let committer = Committer::new(
transaction_status_sender.clone(),
replay_vote_sender.clone(),
prioritization_fee_cache.clone(),
);
let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());
let forwarder = Forwarder::new(
poh_recorder.clone(),
bank_forks.clone(),
cluster_info.clone(),
connection_cache.clone(),
data_budget.clone(),
);
let consumer = Consumer::new(
committer,

Self::spawn_thread_local_multi_iterator_thread(
id,
packet_receiver,
bank_forks.clone(),
decision_maker.clone(),
committer.clone(),
transaction_recorder.clone(),
log_messages_bytes_limit,
forwarder,
unprocessed_transaction_storage,
)
})
.collect();
Self { bank_thread_hdls }
}

/// Builds a `BankingStage` that uses the central-scheduler block-production method.
///
/// Threads pushed onto `bank_thread_hdls`, in order:
/// * two legacy vote threads (ids 0 and 1) for gossip and TPU votes, spawned through
///   `spawn_thread_local_multi_iterator_thread`;
/// * `num_threads - NUM_VOTE_PROCESSING_THREADS` `ConsumeWorker` threads, each with its own
///   unbounded work channel and a clone of the shared finished-work sender;
/// * one scheduler thread running a `SchedulerController` fed by `non_vote_receiver`.
///
/// # Panics
///
/// Panics if `num_threads < MIN_TOTAL_THREADS`, if the `poh_recorder` lock is poisoned, or if
/// a thread cannot be spawned.
#[allow(clippy::too_many_arguments)]
pub fn new_central_scheduler(
cluster_info: &Arc<ClusterInfo>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
non_vote_receiver: BankingPacketReceiver,
tpu_vote_receiver: BankingPacketReceiver,
gossip_vote_receiver: BankingPacketReceiver,
num_threads: u32,
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: ReplayVoteSender,
log_messages_bytes_limit: Option<usize>,
connection_cache: Arc<ConnectionCache>,
bank_forks: Arc<RwLock<BankForks>>,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
) -> Self {
assert!(num_threads >= MIN_TOTAL_THREADS);
// Single thread to generate entries from many banks.
// This thread talks to poh_service and broadcasts the entries once they have been recorded.
// Once an entry has been recorded, its blockhash is registered with the bank.
let data_budget = Arc::new(DataBudget::default());
// Keeps track of extraneous vote transactions for the vote threads
let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new());

// Built once here and cloned into every thread that needs them.
let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());
let committer = Committer::new(
transaction_status_sender.clone(),
replay_vote_sender.clone(),
prioritization_fee_cache.clone(),
);
let transaction_recorder = poh_recorder.read().unwrap().new_recorder();

// + 1 for the central scheduler thread
let mut bank_thread_hdls = Vec::with_capacity(num_threads as usize + 1);

// Spawn legacy voting threads first: 1 gossip, 1 tpu
for (id, packet_receiver, vote_source) in [
(0, gossip_vote_receiver, VoteSource::Gossip),
(1, tpu_vote_receiver, VoteSource::Tpu),
] {
bank_thread_hdls.push(Self::spawn_thread_local_multi_iterator_thread(
id,
packet_receiver,
bank_forks.clone(),
decision_maker.clone(),
committer.clone(),
transaction_recorder.clone(),
log_messages_bytes_limit,
Forwarder::new(
poh_recorder.clone(),
bank_forks.clone(),
cluster_info.clone(),
connection_cache.clone(),
data_budget.clone(),
),
UnprocessedTransactionStorage::new_vote_storage(
latest_unprocessed_votes.clone(),
vote_source,
),
));
}

// Create channels for communication between scheduler and workers
// Every thread that is not a vote thread becomes a worker; each worker gets its own
// work channel, while finished work flows back over one shared channel.
let num_workers = (num_threads).saturating_sub(NUM_VOTE_PROCESSING_THREADS);
let (work_senders, work_receivers): (Vec<Sender<_>>, Vec<Receiver<_>>) =
(0..num_workers).map(|_| unbounded()).unzip();
let (finished_work_sender, finished_work_receiver) = unbounded();

// Spawn the worker threads
// NOTE(review): this listing appears to interleave lines removed by the commit with the
// lines it added (e.g. the second `.name(...)` call and the `Self::process_loop(...)`
// call below) — confirm against the real file before relying on it.
for (index, work_receiver) in work_receivers.into_iter().enumerate() {
// Worker ids continue after the ids used by the vote threads.
let id = (index as u32).saturating_add(NUM_VOTE_PROCESSING_THREADS);
let consume_worker = ConsumeWorker::new(
work_receiver,
Consumer::new(
committer.clone(),
poh_recorder.read().unwrap().new_recorder(),
QosService::new(id),
log_messages_bytes_limit,
);
),
finished_work_sender.clone(),
poh_recorder.read().unwrap().new_leader_bank_notifier(),
);

bank_thread_hdls.push(
Builder::new()
.name(format!("solBanknStgTx{id:02}"))
.name(format!("solCoWorker{id:02}"))
.spawn(move || {
Self::process_loop(
&mut packet_receiver,
&decision_maker,
&forwarder,
&consumer,
id,
unprocessed_transaction_storage,
);
// The value returned by `run` is deliberately discarded; the thread simply ends.
let _ = consume_worker.run();
})
.unwrap()
})
.collect();
.unwrap(),
)
}

// Spawn the central scheduler thread
bank_thread_hdls.push({
let packet_deserializer =
PacketDeserializer::new(non_vote_receiver, bank_forks.clone());
let scheduler = PrioGraphScheduler::new(work_senders, finished_work_receiver);
let scheduler_controller = SchedulerController::new(
decision_maker.clone(),
packet_deserializer,
bank_forks,
scheduler,
);
Builder::new()
.name("solBnkTxSched".to_string())
// A disconnected receive channel ends the thread silently; a disconnected send
// channel is logged as unexpected.
.spawn(move || match scheduler_controller.run() {
Ok(_) => {}
Err(SchedulerError::DisconnectedRecvChannel(_)) => {}
Err(SchedulerError::DisconnectedSendChannel(_)) => {
warn!("Unexpected worker disconnect from scheduler")
}
})
.unwrap()
});

Self { bank_thread_hdls }
}

/// Spawns a single banking thread that drives `Self::process_loop`.
///
/// A `PacketReceiver` is built from `id`, `packet_receiver` and `bank_forks`, and a `Consumer`
/// from `committer`, `transaction_recorder`, a fresh `QosService` for `id` and
/// `log_messages_bytes_limit`. Both are moved into the new thread together with
/// `decision_maker`, `forwarder` and `unprocessed_transaction_storage`.
///
/// The thread is named `solBanknStgTx{id:02}`.
///
/// # Panics
///
/// Panics if the thread cannot be spawned.
fn spawn_thread_local_multi_iterator_thread(
    id: u32,
    packet_receiver: BankingPacketReceiver,
    bank_forks: Arc<RwLock<BankForks>>,
    decision_maker: DecisionMaker,
    committer: Committer,
    transaction_recorder: TransactionRecorder,
    log_messages_bytes_limit: Option<usize>,
    forwarder: Forwarder,
    unprocessed_transaction_storage: UnprocessedTransactionStorage,
) -> JoinHandle<()> {
    // Construction order matches the original: receiver first, then QoS service and consumer.
    let mut receiver = PacketReceiver::new(id, packet_receiver, bank_forks);
    let qos_service = QosService::new(id);
    let consumer = Consumer::new(
        committer,
        transaction_recorder,
        qos_service,
        log_messages_bytes_limit,
    );

    let thread_name = format!("solBanknStgTx{id:02}");
    // Everything the loop needs is moved into the closure so the thread owns its state.
    let run_loop = move || {
        Self::process_loop(
            &mut receiver,
            &decision_maker,
            &forwarder,
            &consumer,
            id,
            unprocessed_transaction_storage,
        )
    };

    Builder::new().name(thread_name).spawn(run_loop).unwrap()
}

#[allow(clippy::too_many_arguments)]
fn process_buffered_packets(
decision_maker: &DecisionMaker,
Expand Down Expand Up @@ -793,8 +959,7 @@ mod tests {
with_vers.into_iter().map(|(b, _)| b).collect()
}

#[test]
fn test_banking_stage_entries_only() {
fn test_banking_stage_entries_only(block_production_method: BlockProductionMethod) {
solana_logger::setup();
let GenesisConfigInfo {
genesis_config,
Expand Down Expand Up @@ -829,7 +994,7 @@ mod tests {
let (replay_vote_sender, _replay_vote_receiver) = unbounded();

let banking_stage = BankingStage::new(
BlockProductionMethod::ThreadLocalMultiIterator,
block_production_method,
&cluster_info,
&poh_recorder,
non_vote_receiver,
Expand Down Expand Up @@ -922,6 +1087,16 @@ mod tests {
Blockstore::destroy(ledger_path.path()).unwrap();
}

/// Runs the shared entries-only scenario with the thread-local multi-iterator method.
#[test]
fn test_banking_stage_entries_only_thread_local_multi_iterator() {
    let block_production_method = BlockProductionMethod::ThreadLocalMultiIterator;
    test_banking_stage_entries_only(block_production_method);
}

/// Runs the shared entries-only scenario with the central-scheduler method.
#[test]
fn test_banking_stage_entries_only_central_scheduler() {
    let block_production_method = BlockProductionMethod::CentralScheduler;
    test_banking_stage_entries_only(block_production_method);
}

#[test]
fn test_banking_stage_entryfication() {
solana_logger::setup();
Expand Down
1 change: 1 addition & 0 deletions core/src/banking_stage/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub(super) struct PreBalanceInfo {
pub mint_decimals: HashMap<Pubkey, u8>,
}

#[derive(Clone)]
pub struct Committer {
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: ReplayVoteSender,
Expand Down
1 change: 1 addition & 0 deletions core/src/banking_stage/decision_maker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ impl BufferedPacketsDecision {
}
}

#[derive(Clone)]
pub struct DecisionMaker {
my_pubkey: Pubkey,
poh_recorder: Arc<RwLock<PohRecorder>>,
Expand Down
18 changes: 6 additions & 12 deletions core/src/banking_stage/transaction_scheduler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
mod batch_id_generator;
#[allow(dead_code)]
mod in_flight_tracker;
pub(crate) mod prio_graph_scheduler;
pub(crate) mod scheduler_controller;
pub(crate) mod scheduler_error;
mod thread_aware_account_locks;

mod transaction_id_generator;
mod transaction_priority_id;
#[allow(dead_code)]
mod transaction_state;
#[allow(dead_code)]
mod transaction_state_container;

mod batch_id_generator;
#[allow(dead_code)]
mod in_flight_tracker;
#[allow(dead_code)]
mod prio_graph_scheduler;
#[allow(dead_code)]
mod scheduler_controller;
mod scheduler_error;
#[allow(dead_code)]
mod transaction_id_generator;
1 change: 1 addition & 0 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ impl BlockVerificationMethod {
/// Selects which block-production implementation `BankingStage::new` sets up.
pub enum BlockProductionMethod {
/// The default variant (marked `#[default]`).
/// NOTE(review): presumably selects the legacy thread-local multi-iterator banking
/// threads — confirm against `BankingStage::new`.
#[default]
ThreadLocalMultiIterator,
/// Makes `BankingStage::new` dispatch to `BankingStage::new_central_scheduler`.
CentralScheduler,
}

impl BlockProductionMethod {
Expand Down

0 comments on commit 81a007b

Please sign in to comment.