Skip to content

Commit

Permalink
Make unified scheduler's new task code fallible
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed Apr 27, 2024
1 parent 90bea33 commit 410dbc5
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 49 deletions.
8 changes: 6 additions & 2 deletions core/tests/unified_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ use {
std::{
collections::HashMap,
sync::{Arc, Mutex},
thread::sleep,
time::Duration,
},
};

Expand All @@ -55,7 +57,7 @@ fn test_scheduler_waited_by_drop_bank_service() {
info!("Stalling at StallingHandler::handle()...");
*LOCK_TO_STALL.lock().unwrap();
// Wait a bit for the replay stage to prune banks
std::thread::sleep(std::time::Duration::from_secs(3));
sleep(Duration::from_secs(3));
info!("Now entering into DefaultTaskHandler::handle()...");

DefaultTaskHandler::handle(result, timings, bank, transaction, index, handler_context);
Expand Down Expand Up @@ -107,7 +109,9 @@ fn test_scheduler_waited_by_drop_bank_service() {
// Delay transaction execution to ensure transaction execution happens after termintion has
// been started
let lock_to_stall = LOCK_TO_STALL.lock().unwrap();
pruned_bank.schedule_transaction_executions([(&tx, &0)].into_iter());
pruned_bank
.schedule_transaction_executions([(&tx, &0)].into_iter())
.unwrap();
drop(pruned_bank);
assert_eq!(pool_raw.pooled_scheduler_count(), 0);
drop(lock_to_stall);
Expand Down
10 changes: 5 additions & 5 deletions ledger/src/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,7 @@ fn process_batches(
// scheduling always succeeds here without being blocked on actual transaction executions.
// The transaction execution errors will be collected via the blocking fn called
// BankWithScheduler::wait_for_completed_scheduler(), if any.
schedule_batches_for_execution(bank, batches);
Ok(())
schedule_batches_for_execution(bank, batches)
} else {
debug!(
"process_batches()/rebatch_and_execute_batches({} batches)",
Expand All @@ -364,7 +363,7 @@ fn process_batches(
fn schedule_batches_for_execution(
bank: &BankWithScheduler,
batches: &[TransactionBatchWithIndexes],
) {
) -> Result<()> {
for TransactionBatchWithIndexes {
batch,
transaction_indexes,
Expand All @@ -375,8 +374,9 @@ fn schedule_batches_for_execution(
.sanitized_transactions()
.iter()
.zip(transaction_indexes.iter()),
);
)?;
}
Ok(())
}

fn rebatch_transactions<'a>(
Expand Down Expand Up @@ -4766,7 +4766,7 @@ pub mod tests {
mocked_scheduler
.expect_schedule_execution()
.times(txs.len())
.returning(|_| ());
.returning(|_| Ok(()));
mocked_scheduler
.expect_wait_for_termination()
.with(mockall::predicate::eq(true))
Expand Down
22 changes: 13 additions & 9 deletions runtime/src/installed_scheduler_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ use {
log::*,
solana_program_runtime::timings::ExecuteTimings,
solana_sdk::{
clock::Slot,
hash::Hash,
slot_history::Slot,
transaction::{Result, SanitizedTransaction},
},
std::{
fmt::Debug,
ops::Deref,
sync::{Arc, RwLock},
thread,
},
};
#[cfg(feature = "dev-context-only-utils")]
Expand Down Expand Up @@ -105,7 +106,7 @@ pub trait InstalledScheduler: Send + Sync + Debug + 'static {
fn schedule_execution<'a>(
&'a self,
transaction_with_index: &'a (&'a SanitizedTransaction, usize),
);
) -> Result<()>;

/// Wait for a scheduler to terminate after processing.
///
Expand Down Expand Up @@ -290,7 +291,7 @@ impl BankWithScheduler {
pub fn schedule_transaction_executions<'a>(
&self,
transactions_with_indexes: impl ExactSizeIterator<Item = (&'a SanitizedTransaction, &'a usize)>,
) {
) -> Result<()> {
trace!(
"schedule_transaction_executions(): {} txs",
transactions_with_indexes.len()
Expand All @@ -300,8 +301,10 @@ impl BankWithScheduler {
let scheduler = scheduler_guard.as_ref().unwrap();

for (sanitized_transaction, &index) in transactions_with_indexes {
scheduler.schedule_execution(&(sanitized_transaction, index));
scheduler.schedule_execution(&(sanitized_transaction, index))?;
}

Ok(())
}

// take needless &mut only to communicate its semantic mutability to humans...
Expand Down Expand Up @@ -356,7 +359,7 @@ impl BankWithSchedulerInner {
"wait_for_scheduler_termination(slot: {}, reason: {:?}): started at {:?}...",
bank.slot(),
reason,
std::thread::current(),
thread::current(),
);

let mut scheduler = scheduler.write().unwrap();
Expand All @@ -378,14 +381,14 @@ impl BankWithSchedulerInner {
reason,
was_noop,
result_with_timings.as_ref().map(|(result, _)| result),
std::thread::current(),
thread::current(),
);

result_with_timings
}

fn drop_scheduler(&self) {
if std::thread::panicking() {
if thread::panicking() {
error!(
"BankWithSchedulerInner::drop_scheduler(): slot: {} skipping due to already panicking...",
self.bank.slot(),
Expand Down Expand Up @@ -573,11 +576,12 @@ mod tests {
mocked
.expect_schedule_execution()
.times(1)
.returning(|(_, _)| ());
.returning(|(_, _)| Ok(()));
}),
);

let bank = BankWithScheduler::new(bank, Some(mocked_scheduler));
bank.schedule_transaction_executions([(&tx0, &0)].into_iter());
bank.schedule_transaction_executions([(&tx0, &0)].into_iter())
.unwrap();
}
}
Loading

0 comments on commit 410dbc5

Please sign in to comment.