Commit

Bench for justification of the accumulator thread
ryoqun committed Jan 13, 2024
1 parent 5050cc4 commit 6120684
Showing 4 changed files with 186 additions and 7 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

9 changes: 9 additions & 0 deletions unified-scheduler-pool/Cargo.toml
@@ -14,6 +14,7 @@
assert_matches = { workspace = true }
crossbeam-channel = { workspace = true }
derivative = { workspace = true }
log = { workspace = true }
qualifier_attr = { workspace = true }
solana-ledger = { workspace = true }
solana-program-runtime = { workspace = true }
solana-runtime = { workspace = true }
@@ -25,3 +26,11 @@
solana-vote = { workspace = true }
assert_matches = { workspace = true }
solana-logger = { workspace = true }
solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }
# See order-crates-for-publishing.py for using this unusual `path = "."`
solana-unified-scheduler-pool = { path = ".", features = ["dev-context-only-utils"] }

[features]
dev-context-only-utils = []
bench-drop-in-accumulator = []
bench-drop-in-scheduler = []
bench-conflicting-execution = []
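
These bench-* features are meant to be toggled per run rather than enabled by default: one of the two bench-drop-in-* features picks where an executed task is disposed of, and bench-conflicting-execution can be layered on top to force tasks to execute one at a time (see the cfg gates in benches/lib.rs below). As a rough illustration only (the exact invocation is not part of this commit, and the nightly toolchain is assumed because the bench crate uses #![feature(test)]), a run would presumably look something like:

cargo +nightly bench -p solana-unified-scheduler-pool --features bench-drop-in-accumulator
cargo +nightly bench -p solana-unified-scheduler-pool --features bench-drop-in-scheduler,bench-conflicting-execution
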
120 changes: 120 additions & 0 deletions unified-scheduler-pool/benches/lib.rs
@@ -0,0 +1,120 @@
#![feature(test)]

extern crate test;

use {
    solana_program_runtime::timings::ExecuteTimings,
    solana_runtime::{
        bank::Bank,
        bank_forks::BankForks,
        genesis_utils::{create_genesis_config, GenesisConfigInfo},
        installed_scheduler_pool::{InstalledScheduler, SchedulingContext},
        prioritization_fee_cache::PrioritizationFeeCache,
    },
    solana_sdk::{
        system_transaction,
        transaction::{Result, SanitizedTransaction},
    },
    solana_unified_scheduler_pool::{HandlerContext, PooledScheduler, SchedulerPool, TaskHandler},
    std::sync::Arc,
    test::Bencher,
};

#[derive(Debug)]
struct DummyTaskHandler;

impl TaskHandler for DummyTaskHandler {
    fn handle(
        _result: &mut Result<()>,
        _timings: &mut ExecuteTimings,
        _bank: &Arc<Bank>,
        _transaction: &SanitizedTransaction,
        _index: usize,
        _handler_context: &HandlerContext,
    ) {
    }
}

fn setup_dummy_fork_graph(bank: Bank) -> Arc<Bank> {
    let slot = bank.slot();
    let bank_fork = BankForks::new_rw_arc(bank);
    let bank = bank_fork.read().unwrap().get(slot).unwrap();
    bank.loaded_programs_cache
        .write()
        .unwrap()
        .set_fork_graph(bank_fork);
    bank
}

fn do_bench_tx_throughput(bencher: &mut Bencher) {
    solana_logger::setup();

    let GenesisConfigInfo {
        genesis_config,
        mint_keypair,
        ..
    } = create_genesis_config(10_000);
    // A single transfer is reused for every scheduled task; with DummyTaskHandler the
    // handler itself is a no-op, so only scheduling/accumulation overhead is measured.
    let tx0 = &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
        &mint_keypair,
        &solana_sdk::pubkey::new_rand(),
        2,
        genesis_config.hash(),
    ));
    let bank = Bank::new_for_tests(&genesis_config);
    let bank = setup_dummy_fork_graph(bank);
    let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
    let pool = SchedulerPool::<PooledScheduler<DummyTaskHandler>, _>::new(
        None,
        None,
        None,
        ignored_prioritization_fee_cache,
    );
    let context = SchedulingContext::new(bank.clone());

    assert_eq!(bank.transaction_count(), 0);
    let mut scheduler = pool.do_take_scheduler(context);
    bencher.iter(|| {
        // Each iteration schedules 10_000 copies of the same transaction, then resets the
        // session so the next iteration starts from a clean state.
        for _ in 0..10_000 {
            scheduler.schedule_execution(&(tx0, 0));
        }
        scheduler.pause_for_recent_blockhash();
        scheduler.clear_session_result_with_timings();
        scheduler.restart_session();
    });
}

#[cfg(all(
    feature = "bench-drop-in-accumulator",
    not(feature = "bench-conflicting-execution")
))]
#[bench]
fn bench_tx_throughput_drop_in_accumulator(bencher: &mut Bencher) {
    do_bench_tx_throughput(bencher)
}

#[cfg(all(
    feature = "bench-drop-in-scheduler",
    not(feature = "bench-conflicting-execution")
))]
#[bench]
fn bench_tx_throughput_drop_in_scheduler(bencher: &mut Bencher) {
    do_bench_tx_throughput(bencher)
}

#[cfg(all(
    feature = "bench-drop-in-accumulator",
    feature = "bench-conflicting-execution"
))]
#[bench]
fn bench_tx_throughput_drop_in_accumulator_conflicting(bencher: &mut Bencher) {
    do_bench_tx_throughput(bencher)
}

#[cfg(all(
    feature = "bench-drop-in-scheduler",
    feature = "bench-conflicting-execution"
))]
#[bench]
fn bench_tx_throughput_drop_in_scheduler_conflicting(bencher: &mut Bencher) {
    do_bench_tx_throughput(bencher)
}
62 changes: 55 additions & 7 deletions unified-scheduler-pool/src/lib.rs
@@ -13,6 +13,7 @@ use {
    crossbeam_channel::{select, unbounded, Receiver, SendError, Sender},
    derivative::Derivative,
    log::*,
    qualifier_attr::qualifiers,
    solana_ledger::blockstore_processor::{
        execute_batch, TransactionBatchWithIndexes, TransactionStatusSender,
    },
@@ -30,6 +31,7 @@ use {
    solana_unified_scheduler_logic::Task,
    solana_vote::vote_sender_types::ReplayVoteSender,
    std::{
        collections::VecDeque,
        fmt::Debug,
        marker::PhantomData,
        sync::{
@@ -82,6 +84,7 @@
{
    // Some internal impl and test code want an actual concrete type, NOT the
    // `dyn InstalledSchedulerPool`. So don't merge this into `Self::new_dyn()`.
    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    fn new(
        log_messages_bytes_limit: Option<usize>,
        transaction_status_sender: Option<TransactionStatusSender>,
@@ -136,6 +139,7 @@
            .push(scheduler);
    }

    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    fn do_take_scheduler(&self, context: SchedulingContext) -> S {
        // pop is intentional for filo, expecting relatively warmed-up scheduler due to having been
        // returned recently
@@ -429,6 +433,19 @@ impl<TH: TaskHandler> PooledScheduler<TH> {
            initial_context,
        )
    }

    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    fn clear_session_result_with_timings(&mut self) {
        assert_matches!(
            self.inner.thread_manager.take_session_result_with_timings(),
            (Ok(_), _)
        );
    }

    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    fn restart_session(&mut self) {
        self.inner.thread_manager.start_session(&self.context);
    }
}

impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
@@ -547,28 +564,59 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
        // by design or by means of offloading at the last resort.
        move || loop {
            let mut is_finished = false;

            #[cfg(not(feature = "bench-conflicting-execution"))]
            let is_conflicting = false;
            #[cfg(feature = "bench-conflicting-execution")]
            let is_conflicting = true;
            let mut tasks = VecDeque::with_capacity(10000);

            while !is_finished {
                select! {
                    recv(finished_task_receiver) -> executed_task => {
                        let executed_task = executed_task.unwrap();

                        active_task_count = active_task_count.checked_sub(1).unwrap();

                        if is_conflicting {
                            if let Some(task) = tasks.pop_front() {
                                runnable_task_sender
                                    .send_payload(task)
                                    .unwrap();
                            }
                        }

                        #[cfg(feature = "bench-drop-in-accumulator")]
                        {
                            executed_task_sender
                                .send(ExecutedTaskPayload::Payload(executed_task))
                                .unwrap();
                        }
                        #[cfg(feature = "bench-drop-in-scheduler")]
                        {
                            assert_matches!(executed_task.result_with_timings, (Ok(_), _));
                            drop(executed_task);
                        }
                    },
                    recv(new_task_receiver) -> message => {
                        let Ok(message) = message else {
                            break;
                        };
                        match message {
                            NewTaskPayload::Payload(task) => {
                                assert!(!session_ending);

                                // so, we're NOT scheduling at all here; rather, just execute
                                // tx straight off. the inter-tx locking deps aren't needed to
                                // be resolved in the case of single-threaded FIFO like this.
                                if !is_conflicting || active_task_count == 0 {
                                    runnable_task_sender
                                        .send_payload(task)
                                        .unwrap();
                                } else {
                                    tasks.push_back(task);
                                }
                                active_task_count = active_task_count.checked_add(1).unwrap();
                            }
                            NewTaskPayload::OpenSubchannel(context) => {
                                // signal about new SchedulingContext to both handler and
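The #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))] attributes above are what let benches/lib.rs reach otherwise-private items such as SchedulerPool::new, do_take_scheduler, clear_session_result_with_timings, and restart_session: the qualifiers attribute from the qualifier_attr crate adds the requested visibility to the item it decorates, and wrapping it in cfg_attr applies that only when the feature is enabled. A minimal sketch of the pattern (Pool and take are hypothetical names, not items from this commit):

use qualifier_attr::qualifiers;

struct Pool;

impl Pool {
    // Hypothetical method, not from the commit. In a normal build it stays private;
    // when built with --features dev-context-only-utils, cfg_attr applies
    // `qualifiers(pub)` and the item expands to `pub fn take(&self)`.
    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    fn take(&self) {}
}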
