Add --unified-scheduler-handler-threads (#35195)
* Add --unified-scheduler-handler-threads

* Adjust value name

* Warn if the flag was ignored

* Tweak message a bit
ryoqun authored Feb 22, 2024
1 parent 537c3d8 commit 024d6ec
Showing 10 changed files with 120 additions and 21 deletions.
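In short, the commit threads a new `Option<usize>` setting from the CLI down to `SchedulerPool`. Note that in this commit the pool still pins the handler count to 1 (see the `assert_eq!` in `new()` below); the CPU-derived default only feeds the CLI help text for now. As a standalone sketch of that default calculation, mirroring `calculate_default_handler_count` (the `main` harness here is illustrative, not part of the commit):

```rust
use std::thread;

// Mirrors calculate_default_handler_count(): a quarter of the detected
// cores (but at least 1), or 4 when core detection fails.
fn default_handler_count(detected_cpu_core_count: Option<usize>) -> usize {
    detected_cpu_core_count
        .map(|core_count| (core_count / 4).max(1))
        .unwrap_or(4)
}

fn main() {
    assert_eq!(default_handler_count(Some(32)), 8); // big server: 8 handlers
    assert_eq!(default_handler_count(Some(4)), 1); // small box: floor at 1
    assert_eq!(default_handler_count(None), 4); // detection failed: assume 4

    // In practice the detected count comes from the standard library:
    let detected = thread::available_parallelism().ok().map(|n| n.get());
    println!("default handler count here: {}", default_handler_count(detected));
}
```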
1 change: 1 addition & 0 deletions Cargo.lock


9 changes: 9 additions & 0 deletions core/src/validator.rs
@@ -262,6 +262,7 @@ pub struct ValidatorConfig {
pub generator_config: Option<GeneratorConfig>,
pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
pub wen_restart_proto_path: Option<PathBuf>,
pub unified_scheduler_handler_threads: Option<usize>,
}

impl Default for ValidatorConfig {
@@ -329,6 +330,7 @@ impl Default for ValidatorConfig {
generator_config: None,
use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
wen_restart_proto_path: None,
unified_scheduler_handler_threads: None,
}
}
}
@@ -813,9 +815,16 @@ impl Validator {
match &config.block_verification_method {
BlockVerificationMethod::BlockstoreProcessor => {
info!("no scheduler pool is installed for block verification...");
if let Some(count) = config.unified_scheduler_handler_threads {
warn!(
"--unified-scheduler-handler-threads={count} is ignored because unified \
scheduler isn't enabled"
);
}
}
BlockVerificationMethod::UnifiedScheduler => {
let scheduler_pool = DefaultSchedulerPool::new_dyn(
config.unified_scheduler_handler_threads,
config.runtime_config.log_messages_bytes_limit,
transaction_status_sender.clone(),
Some(replay_vote_sender.clone()),
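The same guard appears here and in `ledger-tool/src/ledger_utils.rs` below: if the operator passed the flag but the blockstore processor is doing block verification, the setting is loudly reported as ignored rather than silently dropped. A condensed, standalone sketch of that pattern (`println!` standing in for `warn!`, names illustrative):

```rust
enum BlockVerificationMethod {
    BlockstoreProcessor,
    UnifiedScheduler,
}

// Warn when a scheduler-specific setting can have no effect.
fn warn_if_ignored(method: BlockVerificationMethod, handler_threads: Option<usize>) {
    if let (BlockVerificationMethod::BlockstoreProcessor, Some(count)) =
        (method, handler_threads)
    {
        println!(
            "warning: --unified-scheduler-handler-threads={count} is ignored because \
             unified scheduler isn't enabled"
        );
    }
}

fn main() {
    warn_if_ignored(BlockVerificationMethod::BlockstoreProcessor, Some(8)); // warns
    warn_if_ignored(BlockVerificationMethod::UnifiedScheduler, Some(8)); // silent
}
```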
9 changes: 9 additions & 0 deletions ledger-tool/src/ledger_utils.rs
@@ -291,9 +291,17 @@ pub fn load_and_process_ledger(
"Using: block-verification-method: {}",
block_verification_method,
);
let unified_scheduler_handler_threads =
value_t!(arg_matches, "unified_scheduler_handler_threads", usize).ok();
match block_verification_method {
BlockVerificationMethod::BlockstoreProcessor => {
info!("no scheduler pool is installed for block verification...");
if let Some(count) = unified_scheduler_handler_threads {
warn!(
"--unified-scheduler-handler-threads={count} is ignored because unified \
scheduler isn't enabled"
);
}
}
BlockVerificationMethod::UnifiedScheduler => {
let no_transaction_status_sender = None;
@@ -303,6 +311,7 @@
.write()
.unwrap()
.install_scheduler_pool(DefaultSchedulerPool::new_dyn(
unified_scheduler_handler_threads,
process_options.runtime_config.log_messages_bytes_limit,
no_transaction_status_sender,
no_replay_vote_sender,
13 changes: 12 additions & 1 deletion ledger-tool/src/main.rs
@@ -28,7 +28,7 @@ use {
input_parsers::{cluster_type_of, pubkey_of, pubkeys_of},
input_validators::{
is_parsable, is_pow2, is_pubkey, is_pubkey_or_keypair, is_slot, is_valid_percentage,
validate_maximum_full_snapshot_archives_to_retain,
is_within_range, validate_maximum_full_snapshot_archives_to_retain,
validate_maximum_incremental_snapshot_archives_to_retain,
},
},
@@ -72,6 +72,7 @@ use {
transaction::{MessageHash, SanitizedTransaction, SimpleAddressLoader},
},
solana_stake_program::stake_state::{self, PointValue},
solana_unified_scheduler_pool::DefaultSchedulerPool,
solana_vote_program::{
self,
vote_state::{self, VoteState},
@@ -852,6 +853,16 @@ fn main() {
.hidden(hidden_unless_forced())
.help(BlockVerificationMethod::cli_message()),
)
.arg(
Arg::with_name("unified_scheduler_handler_threads")
.long("unified-scheduler-handler-threads")
.value_name("COUNT")
.takes_value(true)
.validator(|s| is_within_range(s, 1..))
.global(true)
.hidden(hidden_unless_forced())
.help(DefaultSchedulerPool::cli_message()),
)
.arg(
Arg::with_name("output_format")
.long("output")
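For reference, a minimal standalone clap 2.x program showing the declare-then-read-back pattern used above; `validate_count` is a hand-rolled stand-in for `solana_clap_utils::input_validators::is_within_range`, and `value_t!(...).ok()` is the same call that turns an absent flag into `None` in `ledger_utils.rs`:

```rust
use clap::{value_t, App, Arg};

// Stand-in for is_within_range(s, 1..): accept integers >= 1.
fn validate_count(s: String) -> Result<(), String> {
    match s.parse::<usize>() {
        Ok(n) if n >= 1 => Ok(()),
        _ => Err(format!("expected an integer >= 1, got '{s}'")),
    }
}

fn main() {
    let matches = App::new("demo")
        .arg(
            Arg::with_name("unified_scheduler_handler_threads")
                .long("unified-scheduler-handler-threads")
                .value_name("COUNT")
                .takes_value(true)
                .validator(validate_count),
        )
        .get_matches();

    // An absent flag (or a failed parse) becomes None, exactly like the
    // value_t!(...).ok() read-back in ledger_utils.rs.
    let handler_threads =
        value_t!(matches, "unified_scheduler_handler_threads", usize).ok();
    println!("override: {handler_threads:?}");
}
```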
1 change: 1 addition & 0 deletions local-cluster/src/validator_configs.rs
@@ -68,6 +68,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
generator_config: config.generator_config.clone(),
use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
wen_restart_proto_path: config.wen_restart_proto_path.clone(),
unified_scheduler_handler_threads: config.unified_scheduler_handler_threads,
}
}

1 change: 1 addition & 0 deletions programs/sbf/Cargo.lock


94 changes: 74 additions & 20 deletions unified-scheduler-pool/src/lib.rs
@@ -34,7 +34,7 @@ use {
marker::PhantomData,
sync::{
atomic::{AtomicU64, Ordering::Relaxed},
Arc, Mutex, Weak,
Arc, Mutex, OnceLock, Weak,
},
thread::{self, JoinHandle},
},
@@ -48,6 +48,7 @@ type AtomicSchedulerId = AtomicU64;
#[derive(Debug)]
pub struct SchedulerPool<S: SpawnableScheduler<TH>, TH: TaskHandler> {
scheduler_inners: Mutex<Vec<S::Inner>>,
handler_count: usize,
handler_context: HandlerContext,
// weak_self could be elided by changing InstalledScheduler::take_scheduler()'s receiver to
// Arc<Self> from &Self, because SchedulerPool is used in the form of Arc<SchedulerPool>
@@ -83,13 +84,20 @@
// Some internal impl and test code want an actual concrete type, NOT the
// `dyn InstalledSchedulerPool`. So don't merge this into `Self::new_dyn()`.
fn new(
handler_count: Option<usize>,
log_messages_bytes_limit: Option<usize>,
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: Option<ReplayVoteSender>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
) -> Arc<Self> {
let handler_count = handler_count.unwrap_or(1);
// we're hard-coding the number of handler threads to 1, meaning this impl is still
// single-threaded.
assert_eq!(handler_count, 1); // replace this with assert!(handler_count >= 1) later

Arc::new_cyclic(|weak_self| Self {
scheduler_inners: Mutex::default(),
handler_count,
handler_context: HandlerContext {
log_messages_bytes_limit,
transaction_status_sender,
@@ -105,12 +113,14 @@
// This apparently-meaningless wrapper is handy, because some callers explicitly want
// `dyn InstalledSchedulerPool` to be returned for type inference convenience.
pub fn new_dyn(
handler_count: Option<usize>,
log_messages_bytes_limit: Option<usize>,
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: Option<ReplayVoteSender>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
) -> InstalledSchedulerPoolArc {
Self::new(
handler_count,
log_messages_bytes_limit,
transaction_status_sender,
replay_vote_sender,
@@ -145,6 +155,37 @@
S::spawn(self.self_arc(), context)
}
}

pub fn default_handler_count() -> usize {
Self::calculate_default_handler_count(
thread::available_parallelism()
.ok()
.map(|non_zero| non_zero.get()),
)
}

pub fn calculate_default_handler_count(detected_cpu_core_count: Option<usize>) -> usize {
// Divide by 4 so that handler threads don't consume all available CPUs, sparing
// capacity for other active forks and other subsystems.
// Also, if available_parallelism fails (which should be very rare), fall back to 4
// threads, as a reasonably conservative assumption about modern multi-core systems
// ranging from engineers' laptops to production servers.
detected_cpu_core_count
.map(|core_count| (core_count / 4).max(1))
.unwrap_or(4)
}

pub fn cli_message() -> &'static str {
static MESSAGE: OnceLock<String> = OnceLock::new();

MESSAGE.get_or_init(|| {
format!(
"Change the number of the unified scheduler's transaction execution threads \
dedicated to each block, otherwise calculated as cpu_cores/4 [default: {}]",
Self::default_handler_count()
)
})
}
}
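
`cli_message()` above uses `std::sync::OnceLock` (stable since Rust 1.70) so the default-bearing help string is formatted once and handed out as a `&'static str` thereafter. The pattern in isolation, with illustrative names:

```rust
use std::sync::OnceLock;

// Format the message on the first call only; later calls return the same
// &'static str, borrowed from the String stored in the static.
fn help_message(default_count: usize) -> &'static str {
    static MESSAGE: OnceLock<String> = OnceLock::new();
    MESSAGE.get_or_init(|| format!("thread count [default: {default_count}]"))
}

fn main() {
    let first = help_message(8);
    let second = help_message(99); // argument ignored: already initialized
    assert_eq!(first, second);
    println!("{first}");
}
```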

impl<S, TH> InstalledSchedulerPool for SchedulerPool<S, TH>
@@ -372,7 +413,6 @@ pub struct PooledSchedulerInner<S: SpawnableScheduler<TH>, TH: TaskHandler> {
struct ThreadManager<S: SpawnableScheduler<TH>, TH: TaskHandler> {
scheduler_id: SchedulerId,
pool: Arc<SchedulerPool<S, TH>>,
handler_count: usize,
new_task_sender: Sender<NewTaskPayload>,
new_task_receiver: Receiver<NewTaskPayload>,
session_result_sender: Sender<Option<ResultWithTimings>>,
@@ -384,28 +424,24 @@ struct ThreadManager<S: SpawnableScheduler<TH>, TH: TaskHandler> {

impl<TH: TaskHandler> PooledScheduler<TH> {
fn do_spawn(pool: Arc<SchedulerPool<Self, TH>>, initial_context: SchedulingContext) -> Self {
// we're hard-coding the number of handler threads to 1, meaning this impl is still
// single-threaded.
let handler_count = 1;

Self::from_inner(
PooledSchedulerInner::<Self, TH> {
thread_manager: ThreadManager::new(pool, handler_count),
thread_manager: ThreadManager::new(pool),
},
initial_context,
)
}
}

impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
fn new(pool: Arc<SchedulerPool<S, TH>>, handler_count: usize) -> Self {
fn new(pool: Arc<SchedulerPool<S, TH>>) -> Self {
let (new_task_sender, new_task_receiver) = unbounded();
let (session_result_sender, session_result_receiver) = unbounded();
let handler_count = pool.handler_count;

Self {
scheduler_id: pool.new_scheduler_id(),
pool,
handler_count,
new_task_sender,
new_task_receiver,
session_result_sender,
@@ -477,7 +513,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
// 5. the handler thread reply back to the scheduler thread as an executed task.
// 6. the scheduler thread post-processes the executed task.
let scheduler_main_loop = || {
let handler_count = self.handler_count;
let handler_count = self.pool.handler_count;
let session_result_sender = self.session_result_sender.clone();
let new_task_receiver = self.new_task_receiver.clone();

@@ -613,7 +649,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
.unwrap(),
);

self.handler_threads = (0..self.handler_count)
self.handler_threads = (0..self.pool.handler_count)
.map({
|thx| {
thread::Builder::new()
@@ -760,7 +796,7 @@ mod tests {

let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool =
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache);

// this indirectly proves that there should be a circular link, because there's only
// one Arc at this moment
@@ -775,7 +811,7 @@

let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool =
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache);
let bank = Arc::new(Bank::default_for_tests());
let context = SchedulingContext::new(bank);
let scheduler = pool.take_scheduler(context);
@@ -789,7 +825,8 @@
solana_logger::setup();

let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache);
let pool =
DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache);
let bank = Arc::new(Bank::default_for_tests());
let context = &SchedulingContext::new(bank);

@@ -817,7 +854,8 @@ mod tests {
solana_logger::setup();

let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache);
let pool =
DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache);
let bank = Arc::new(Bank::default_for_tests());
let context = &SchedulingContext::new(bank);
let mut scheduler = pool.do_take_scheduler(context.clone());
@@ -835,7 +873,8 @@
solana_logger::setup();

let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache);
let pool =
DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache);
let old_bank = &Arc::new(Bank::default_for_tests());
let new_bank = &Arc::new(Bank::default_for_tests());
assert!(!Arc::ptr_eq(old_bank, new_bank));
@@ -861,7 +900,7 @@
let mut bank_forks = bank_forks.write().unwrap();
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool =
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache);
bank_forks.install_scheduler_pool(pool);
}

@@ -875,7 +914,7 @@

let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool =
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache);

let bank = Bank::default_for_tests();
let bank_forks = BankForks::new_rw_arc(bank);
@@ -928,7 +967,7 @@ mod tests {
let bank = setup_dummy_fork_graph(bank);
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool =
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache);
let context = SchedulingContext::new(bank.clone());

assert_eq!(bank.transaction_count(), 0);
@@ -953,7 +992,7 @@

let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool =
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache);
let context = SchedulingContext::new(bank.clone());
let mut scheduler = pool.take_scheduler(context);

@@ -1159,6 +1198,7 @@ mod tests {
None,
None,
None,
None,
ignored_prioritization_fee_cache,
);
let scheduler = pool.take_scheduler(context);
@@ -1193,4 +1233,18 @@ mod tests {
fn test_scheduler_schedule_execution_recent_blockhash_edge_case_without_race() {
do_test_scheduler_schedule_execution_recent_blockhash_edge_case::<false>();
}

#[test]
fn test_default_handler_count() {
for (detected, expected) in [(32, 8), (4, 1), (2, 1)] {
assert_eq!(
DefaultSchedulerPool::calculate_default_handler_count(Some(detected)),
expected
);
}
assert_eq!(
DefaultSchedulerPool::calculate_default_handler_count(None),
4
);
}
}
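
Downstream of these changes, `ThreadManager::start_threads` spawns `pool.handler_count` handler threads that all drain the same crossbeam channel, which is what makes the count a real parallelism knob once the `assert_eq!(handler_count, 1)` is lifted. A toy sketch of that fan-out shape (thread names and the `u64` task type are illustrative, not from the commit):

```rust
use crossbeam_channel::unbounded;
use std::thread;

fn main() {
    let (task_sender, task_receiver) = unbounded::<u64>();
    let handler_count = 4; // would come from SchedulerPool::handler_count

    // Each clone of the receiver is a handle onto the *same* queue, so the
    // handlers share the work rather than duplicating it.
    let handlers: Vec<_> = (0..handler_count)
        .map(|thx| {
            let receiver = task_receiver.clone();
            thread::Builder::new()
                .name(format!("handler{thx:02}"))
                .spawn(move || {
                    while let Ok(task) = receiver.recv() {
                        // a real handler would execute the task here
                        println!("{}: task {task}", thread::current().name().unwrap());
                    }
                })
                .unwrap()
        })
        .collect();

    for task in 0..16 {
        task_sender.send(task).unwrap();
    }
    // Dropping every sender disconnects the channel; recv() then returns Err
    // and each handler loop exits cleanly.
    drop(task_sender);

    for handler in handlers {
        handler.join().unwrap();
    }
}
```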
1 change: 1 addition & 0 deletions validator/Cargo.toml
@@ -61,6 +61,7 @@ solana-streamer = { workspace = true }
solana-svm = { workspace = true }
solana-test-validator = { workspace = true }
solana-tpu-client = { workspace = true }
solana-unified-scheduler-pool = { workspace = true }
solana-version = { workspace = true }
solana-vote-program = { workspace = true }
symlink = { workspace = true }
