Add --unified-scheduler-handler-threads #35195

Merged (4 commits) on Feb 22, 2024
1 change: 1 addition & 0 deletions Cargo.lock


9 changes: 9 additions & 0 deletions core/src/validator.rs
@@ -262,6 +262,7 @@ pub struct ValidatorConfig {
pub generator_config: Option<GeneratorConfig>,
pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
pub wen_restart_proto_path: Option<PathBuf>,
pub unified_scheduler_handler_threads: Option<usize>,
}

impl Default for ValidatorConfig {
@@ -329,6 +330,7 @@ impl Default for ValidatorConfig {
generator_config: None,
use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
wen_restart_proto_path: None,
unified_scheduler_handler_threads: None,
}
}
}
@@ -813,9 +815,16 @@ impl Validator {
match &config.block_verification_method {
BlockVerificationMethod::BlockstoreProcessor => {
info!("no scheduler pool is installed for block verification...");
if let Some(count) = config.unified_scheduler_handler_threads {
warn!(
"--unified-scheduler-handler-threads={count} is ignored because unified \
scheduler isn't enabled"
);
}
}
BlockVerificationMethod::UnifiedScheduler => {
let scheduler_pool = DefaultSchedulerPool::new_dyn(
config.unified_scheduler_handler_threads,
config.runtime_config.log_messages_bytes_limit,
transaction_status_sender.clone(),
Some(replay_vote_sender.clone()),
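The warn-then-ignore logic added in this hunk follows a simple pattern: a mode-specific `Option` value triggers a warning when it is set but the selected mode never consumes it. A minimal standalone sketch of that pattern (hypothetical types, returning the message instead of logging so the behavior is easy to check — not the actual validator code):

```rust
// Sketch: a verification-method enum and a helper that reports when an
// Option-typed CLI value is set but the chosen mode ignores it.
#[derive(Debug, PartialEq)]
enum BlockVerificationMethod {
    BlockstoreProcessor,
    UnifiedScheduler,
}

// Returns Some(warning text) only when the flag would be silently dropped.
fn ignored_flag_warning(
    method: &BlockVerificationMethod,
    handler_threads: Option<usize>,
) -> Option<String> {
    match (method, handler_threads) {
        (BlockVerificationMethod::BlockstoreProcessor, Some(count)) => Some(format!(
            "--unified-scheduler-handler-threads={count} is ignored because unified scheduler isn't enabled"
        )),
        // Either the flag is unset, or UnifiedScheduler will consume it.
        _ => None,
    }
}

fn main() {
    // Flag set but unified scheduler not selected: warn.
    assert!(
        ignored_flag_warning(&BlockVerificationMethod::BlockstoreProcessor, Some(8)).is_some()
    );
    // Unified scheduler selected: the value is consumed, no warning.
    assert!(ignored_flag_warning(&BlockVerificationMethod::UnifiedScheduler, Some(8)).is_none());
    // Flag unset: nothing to warn about.
    assert!(
        ignored_flag_warning(&BlockVerificationMethod::BlockstoreProcessor, None).is_none()
    );
}
```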
9 changes: 9 additions & 0 deletions ledger-tool/src/ledger_utils.rs
@@ -291,9 +291,17 @@ pub fn load_and_process_ledger(
"Using: block-verification-method: {}",
block_verification_method,
);
let unified_scheduler_handler_threads =
value_t!(arg_matches, "unified_scheduler_handler_threads", usize).ok();
match block_verification_method {
BlockVerificationMethod::BlockstoreProcessor => {
info!("no scheduler pool is installed for block verification...");
if let Some(count) = unified_scheduler_handler_threads {
warn!(
"--unified-scheduler-handler-threads={count} is ignored because unified \
scheduler isn't enabled"
);
}
}
BlockVerificationMethod::UnifiedScheduler => {
let no_transaction_status_sender = None;
@@ -303,6 +311,7 @@
.write()
.unwrap()
.install_scheduler_pool(DefaultSchedulerPool::new_dyn(
unified_scheduler_handler_threads,
process_options.runtime_config.log_messages_bytes_limit,
no_transaction_status_sender,
no_replay_vote_sender,
13 changes: 12 additions & 1 deletion ledger-tool/src/main.rs
@@ -28,7 +28,7 @@ use {
input_parsers::{cluster_type_of, pubkey_of, pubkeys_of},
input_validators::{
is_parsable, is_pow2, is_pubkey, is_pubkey_or_keypair, is_slot, is_valid_percentage,
validate_maximum_full_snapshot_archives_to_retain,
is_within_range, validate_maximum_full_snapshot_archives_to_retain,
validate_maximum_incremental_snapshot_archives_to_retain,
},
},
@@ -72,6 +72,7 @@ use {
transaction::{MessageHash, SanitizedTransaction, SimpleAddressLoader},
},
solana_stake_program::stake_state::{self, PointValue},
solana_unified_scheduler_pool::DefaultSchedulerPool,
solana_vote_program::{
self,
vote_state::{self, VoteState},
@@ -847,6 +848,16 @@ fn main() {
.hidden(hidden_unless_forced())
.help(BlockVerificationMethod::cli_message()),
)
.arg(
Arg::with_name("unified_scheduler_handler_threads")
.long("unified-scheduler-handler-threads")
Comment on lines +852 to +853
@ryoqun (Member, Author) — Feb 14, 2024

Outside the unified-scheduler-pool code, I think "handler thread count" is too verbose and implementation-oriented naming, so I used "handler threads" intentionally. Also note that there are a bunch of --*-threads CLI flags in solana-validator as precedents, so ending the flag in -threads should be a no-brainer.

On the other hand, "handler" is kept for technical rigor, because there is an additional thread per scheduler, the scheduler thread, and that thread doesn't count toward --unified-scheduler-handler-threads.

.value_name("COUNT")
.takes_value(true)
.validator(|s| is_within_range(s, 1..))
@ryoqun (Member, Author) — Feb 14, 2024

I thought about introducing a sensible upper bound like 1024, but opted not to because this flag doesn't need to be that friendly, and such a restriction would hamper some pathological stress testing (admittedly not a strong argument, as that is a very niche and hypothetical use case).
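The `.validator(|s| is_within_range(s, 1..))` call enforces only a lower bound: any parseable count of at least 1 passes, with no upper limit. A sketch of that behavior using a hypothetical stand-in function (not the real solana-clap-utils `is_within_range` helper):

```rust
// Hypothetical stand-in for a half-open range check `1..`:
// accept any usize >= 1, reject 0 and anything unparseable.
fn at_least_one(s: &str) -> Result<(), String> {
    match s.parse::<usize>() {
        Ok(n) if n >= 1 => Ok(()),
        Ok(n) => Err(format!("{n} is below the minimum of 1")),
        Err(e) => Err(format!("unable to parse {s}: {e}")),
    }
}

fn main() {
    assert!(at_least_one("1").is_ok());
    // No upper bound, per the review comment above.
    assert!(at_least_one("1024").is_ok());
    assert!(at_least_one("0").is_err());
    assert!(at_least_one("eight").is_err());
}
```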

.global(true)
.hidden(hidden_unless_forced())
.help(DefaultSchedulerPool::cli_message()),
Comment on lines +852 to +859
@ryoqun (Member, Author) — Feb 14, 2024

The builder-method invocation order and the lack of .global(true) are the subtle differences from the solana-validator counterpart. However, this is my best attempt at staying consistent with the surrounding convention in each place (by the way, it's hard to find consistency anywhere; they're already inconsistent, so this is my arbitrary best effort)...

)
.arg(
Arg::with_name("output_format")
.long("output")
1 change: 1 addition & 0 deletions local-cluster/src/validator_configs.rs
@@ -68,6 +68,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
generator_config: config.generator_config.clone(),
use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
wen_restart_proto_path: config.wen_restart_proto_path.clone(),
unified_scheduler_handler_threads: config.unified_scheduler_handler_threads,
}
}

1 change: 1 addition & 0 deletions programs/sbf/Cargo.lock


94 changes: 74 additions & 20 deletions unified-scheduler-pool/src/lib.rs
@@ -34,7 +34,7 @@ use {
marker::PhantomData,
sync::{
atomic::{AtomicU64, Ordering::Relaxed},
Arc, Mutex, Weak,
Arc, Mutex, OnceLock, Weak,
},
thread::{self, JoinHandle},
},
@@ -48,6 +48,7 @@ type AtomicSchedulerId = AtomicU64;
#[derive(Debug)]
pub struct SchedulerPool<S: SpawnableScheduler<TH>, TH: TaskHandler> {
scheduler_inners: Mutex<Vec<S::Inner>>,
handler_count: usize,
handler_context: HandlerContext,
// weak_self could be elided by changing InstalledScheduler::take_scheduler()'s receiver to
// Arc<Self> from &Self, because SchedulerPool is used as in the form of Arc<SchedulerPool>
@@ -83,13 +84,20 @@
// Some internal impl and test code want an actual concrete type, NOT the
// `dyn InstalledSchedulerPool`. So don't merge this into `Self::new_dyn()`.
fn new(
handler_count: Option<usize>,
log_messages_bytes_limit: Option<usize>,
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: Option<ReplayVoteSender>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
) -> Arc<Self> {
let handler_count = handler_count.unwrap_or(1);
// we're hard-coding the number of handler thread to 1, meaning this impl is currently
// single-threaded still.
assert_eq!(handler_count, 1); // replace this with assert!(handler_count >= 1) later

Comment on lines +93 to +97
@ryoqun (Member, Author)

So, --unified-scheduler-handler-threads is meaningless for now.

(The diff would be a lot clearer if I had more foresight at #34676..)

Arc::new_cyclic(|weak_self| Self {
scheduler_inners: Mutex::default(),
handler_count,
handler_context: HandlerContext {
log_messages_bytes_limit,
transaction_status_sender,
@@ -105,12 +113,14 @@
// This apparently-meaningless wrapper is handy, because some callers explicitly want
// `dyn InstalledSchedulerPool` to be returned for type inference convenience.
pub fn new_dyn(
handler_count: Option<usize>,
log_messages_bytes_limit: Option<usize>,
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: Option<ReplayVoteSender>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
) -> InstalledSchedulerPoolArc {
Self::new(
handler_count,
log_messages_bytes_limit,
transaction_status_sender,
replay_vote_sender,
@@ -145,6 +155,37 @@
S::spawn(self.self_arc(), context)
}
}

pub fn default_handler_count() -> usize {
Self::calculate_default_handler_count(
thread::available_parallelism()
.ok()
.map(|non_zero| non_zero.get()),
)
}

pub fn calculate_default_handler_count(detected_cpu_core_count: Option<usize>) -> usize {
// Divide by 4 just not to consume all available CPUs just with handler threads, sparing for
// other active forks and other subsystems.
// Also, if available_parallelism fails (which should be very rare), use 4 threads,
// as a relatively conservatism assumption of modern multi-core systems ranging from
// engineers' laptops to production servers.
detected_cpu_core_count
.map(|core_count| (core_count / 4).max(1))
.unwrap_or(4)
}
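The floor division and clamping in calculate_default_handler_count are easy to see with a few concrete inputs; the function below mirrors the one above verbatim, with the edge cases spelled out:

```rust
// Mirror of calculate_default_handler_count above: integer division floors,
// .max(1) guarantees at least one handler thread, and a failed CPU-count
// detection falls back to 4.
fn calculate_default_handler_count(detected_cpu_core_count: Option<usize>) -> usize {
    detected_cpu_core_count
        .map(|core_count| (core_count / 4).max(1))
        .unwrap_or(4)
}

fn main() {
    assert_eq!(calculate_default_handler_count(Some(32)), 8); // 32 / 4
    assert_eq!(calculate_default_handler_count(Some(7)), 1); // 7 / 4 floors to 1
    assert_eq!(calculate_default_handler_count(Some(2)), 1); // 2 / 4 = 0, clamped up by .max(1)
    assert_eq!(calculate_default_handler_count(None), 4); // detection failed
}
```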

pub fn cli_message() -> &'static str {
static MESSAGE: OnceLock<String> = OnceLock::new();
@ryoqun (Member, Author) — Feb 14, 2024

Back at the very beginning of this looong unified-scheduler journey, OnceLock hadn't been stabilized yet.. ;) ref: #30746 (review)


MESSAGE.get_or_init(|| {
format!(
"Change the number of the unified scheduler's transaction execution threads \
@ryoqun (Member, Author) — Feb 14, 2024

While I'm leaving the implementation jargon ("handler") in the CLI flag name, I intentionally avoided the word "handler" in this CLI message, instead using the phrase "transaction execution threads". That's my habit for achieving both grep-ability and a rephrase-based quick introduction of unfamiliar words (here, the "handler" in the CLI flag) for humans.

dedicated to each block, otherwise calculated as cpu_cores/4 [default: {}]",
@ryoqun (Member, Author) — Feb 14, 2024

"dedicated to each block" carries rather important significance here, to head off the misunderstanding that this number is the global thread count inside the whole solana-validator process.

Also, this whole message is specifically worded to put "otherwise calculated as cpu_cores/4" at the end, so that it sits lexically close to the actual default value.

Self::default_handler_count()
)
})
}
}
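cli_message() above uses a function-local static OnceLock to build its help String exactly once, lazily, and hand out a &'static str thereafter. A minimal sketch of the same pattern (the message text here is simplified, not the real help string):

```rust
use std::sync::OnceLock;

// Build a formatted message on first call; every later call returns the same
// cached &'static str, regardless of its argument.
fn cli_message(default_count: usize) -> &'static str {
    static MESSAGE: OnceLock<String> = OnceLock::new();
    MESSAGE.get_or_init(|| format!("threads [default: {default_count}]"))
}

fn main() {
    let first = cli_message(4);
    assert_eq!(first, "threads [default: 4]");
    // A different argument on a later call is ignored: the String was already
    // initialized, so the very same allocation is returned.
    let second = cli_message(8);
    assert_eq!(second, "threads [default: 4]");
    assert!(std::ptr::eq(first, second));
}
```

This is why the real code can interpolate `Self::default_handler_count()` into the message without recomputing it on every clap invocation.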

impl<S, TH> InstalledSchedulerPool for SchedulerPool<S, TH>
@@ -372,7 +413,6 @@ pub struct PooledSchedulerInner<S: SpawnableScheduler<TH>, TH: TaskHandler> {
struct ThreadManager<S: SpawnableScheduler<TH>, TH: TaskHandler> {
scheduler_id: SchedulerId,
pool: Arc<SchedulerPool<S, TH>>,
handler_count: usize,
new_task_sender: Sender<NewTaskPayload>,
new_task_receiver: Receiver<NewTaskPayload>,
session_result_sender: Sender<Option<ResultWithTimings>>,
@@ -384,28 +424,24 @@ struct ThreadManager<S: SpawnableScheduler<TH>, TH: TaskHandler> {

impl<TH: TaskHandler> PooledScheduler<TH> {
fn do_spawn(pool: Arc<SchedulerPool<Self, TH>>, initial_context: SchedulingContext) -> Self {
// we're hard-coding the number of handler thread to 1, meaning this impl is currently
// single-threaded still.
let handler_count = 1;

Self::from_inner(
PooledSchedulerInner::<Self, TH> {
thread_manager: ThreadManager::new(pool, handler_count),
thread_manager: ThreadManager::new(pool),
},
initial_context,
)
}
}

impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
fn new(pool: Arc<SchedulerPool<S, TH>>, handler_count: usize) -> Self {
fn new(pool: Arc<SchedulerPool<S, TH>>) -> Self {
let (new_task_sender, new_task_receiver) = unbounded();
let (session_result_sender, session_result_receiver) = unbounded();
let handler_count = pool.handler_count;

Self {
scheduler_id: pool.new_scheduler_id(),
pool,
handler_count,
new_task_sender,
new_task_receiver,
session_result_sender,
@@ -477,7 +513,7 @@
// 5. the handler thread reply back to the scheduler thread as an executed task.
// 6. the scheduler thread post-processes the executed task.
let scheduler_main_loop = || {
let handler_count = self.handler_count;
let handler_count = self.pool.handler_count;
let session_result_sender = self.session_result_sender.clone();
let new_task_receiver = self.new_task_receiver.clone();

@@ -613,7 +649,7 @@
.unwrap(),
);

self.handler_threads = (0..self.handler_count)
self.handler_threads = (0..self.pool.handler_count)
.map({
|thx| {
thread::Builder::new()
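The truncated loop above spawns one named handler thread per pool.handler_count via thread::Builder. A self-contained sketch of that spawn-and-join shape (the thread name format and the trivial payload are made up; the real threads run the handler main loop):

```rust
use std::thread;

// Spawn `handler_count` named threads, one per index, then join them all,
// mirroring the (0..pool.handler_count).map(...) shape above.
fn spawn_handlers(handler_count: usize) -> Vec<usize> {
    let handles: Vec<thread::JoinHandle<usize>> = (0..handler_count)
        .map(|thx| {
            thread::Builder::new()
                .name(format!("handler{thx:02}")) // hypothetical name format
                .spawn(move || thx) // real code runs the handler loop here
                .unwrap()
        })
        .collect();
    // Joining in spawn order keeps the results deterministic.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    assert_eq!(spawn_handlers(3), vec![0, 1, 2]);
}
```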
@@ -760,7 +796,7 @@ mod tests {

let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool =
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache);

// this indirectly proves that there should be circular link because there's only one Arc
// at this moment now
@@ -775,7 +811,7 @@

let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool =
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache);
let bank = Arc::new(Bank::default_for_tests());
let context = SchedulingContext::new(bank);
let scheduler = pool.take_scheduler(context);
@@ -789,7 +825,8 @@
solana_logger::setup();

let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache);
let pool =
DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache);
let bank = Arc::new(Bank::default_for_tests());
let context = &SchedulingContext::new(bank);

@@ -817,7 +854,8 @@ mod tests {
solana_logger::setup();

let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache);
let pool =
DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache);
let bank = Arc::new(Bank::default_for_tests());
let context = &SchedulingContext::new(bank);
let mut scheduler = pool.do_take_scheduler(context.clone());
@@ -835,7 +873,8 @@
solana_logger::setup();

let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache);
let pool =
DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache);
let old_bank = &Arc::new(Bank::default_for_tests());
let new_bank = &Arc::new(Bank::default_for_tests());
assert!(!Arc::ptr_eq(old_bank, new_bank));
@@ -861,7 +900,7 @@
let mut bank_forks = bank_forks.write().unwrap();
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool =
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache);
bank_forks.install_scheduler_pool(pool);
}

Expand All @@ -875,7 +914,7 @@ mod tests {

let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool =
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache);

let bank = Bank::default_for_tests();
let bank_forks = BankForks::new_rw_arc(bank);
@@ -928,7 +967,7 @@ mod tests {
let bank = setup_dummy_fork_graph(bank);
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool =
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache);
let context = SchedulingContext::new(bank.clone());

assert_eq!(bank.transaction_count(), 0);
@@ -953,7 +992,7 @@

let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool =
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache);
let context = SchedulingContext::new(bank.clone());
let mut scheduler = pool.take_scheduler(context);

@@ -1159,6 +1198,7 @@ mod tests {
None,
None,
None,
None,
ignored_prioritization_fee_cache,
);
let scheduler = pool.take_scheduler(context);
@@ -1193,4 +1233,18 @@
fn test_scheduler_schedule_execution_recent_blockhash_edge_case_without_race() {
do_test_scheduler_schedule_execution_recent_blockhash_edge_case::<false>();
}

#[test]
fn test_default_handler_count() {
for (detected, expected) in [(32, 8), (4, 1), (2, 1)] {
assert_eq!(
DefaultSchedulerPool::calculate_default_handler_count(Some(detected)),
expected
);
}
assert_eq!(
DefaultSchedulerPool::calculate_default_handler_count(None),
4
);
}
}
1 change: 1 addition & 0 deletions validator/Cargo.toml
@@ -61,6 +61,7 @@ solana-streamer = { workspace = true }
solana-svm = { workspace = true }
solana-test-validator = { workspace = true }
solana-tpu-client = { workspace = true }
solana-unified-scheduler-pool = { workspace = true }
solana-version = { workspace = true }
solana-vote-program = { workspace = true }
symlink = { workspace = true }