TransactionScheduler: Schedule Filter (solana-labs#34252)
apfitzge authored Nov 30, 2023
1 parent 479b7ee commit 18309ba
Showing 2 changed files with 132 additions and 31 deletions.
113 changes: 85 additions & 28 deletions core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs
@@ -15,6 +15,7 @@ use {
crossbeam_channel::{Receiver, Sender, TryRecvError},
itertools::izip,
prio_graph::{AccessKind, PrioGraph},
solana_measure::measure_us,
solana_sdk::{
pubkey::Pubkey, saturating_add_assign, slot_history::Slot,
transaction::SanitizedTransaction,
@@ -47,6 +48,8 @@ impl PrioGraphScheduler {

/// Schedule transactions from the given `TransactionStateContainer` to be consumed by the
/// worker threads. Returns a summary of scheduling, or an error.
/// `filter` is used to drop transactions that should be skipped before scheduling:
/// it should set `false` for transactions that should be dropped, and `true` otherwise.
///
/// Uses a `PrioGraph` to perform look-ahead during the scheduling of transactions.
/// This, combined with internal tracking of threads' in-flight transactions, allows
@@ -55,6 +58,7 @@ impl PrioGraphScheduler {
pub(crate) fn schedule(
&mut self,
container: &mut TransactionStateContainer,
filter: impl Fn(&[&SanitizedTransaction], &mut [bool]),
) -> Result<SchedulingSummary, SchedulerError> {
let num_threads = self.consume_work_senders.len();
let mut batches = Batches::new(num_threads);
@@ -67,15 +71,60 @@
let mut blocking_locks = ReadWriteAccountSet::default();
let mut prio_graph = PrioGraph::new(|id: &TransactionPriorityId, _graph_node| *id);

// Create the initial look-ahead window.
for _ in 0..self.look_ahead_window_size {
let Some(id) = container.pop() else {
break;
};
// Track metrics on filter.
let mut num_filtered_out: usize = 0;
let mut total_filter_time_us: u64 = 0;

let mut window_budget = self.look_ahead_window_size;
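// Pops up to `window_budget` transactions from the container in chunks of at most
// `MAX_FILTER_CHUNK_SIZE`, runs `filter` over each chunk, inserts passing
// transactions into the prio-graph, and drops the rest from the container.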
let mut chunked_pops = |container: &mut TransactionStateContainer,
prio_graph: &mut PrioGraph<_, _, _, _>,
window_budget: &mut usize| {
while *window_budget > 0 {
const MAX_FILTER_CHUNK_SIZE: usize = 128;
let mut filter_array = [true; MAX_FILTER_CHUNK_SIZE];
let mut ids = Vec::with_capacity(MAX_FILTER_CHUNK_SIZE);
let mut txs = Vec::with_capacity(MAX_FILTER_CHUNK_SIZE);

let chunk_size = (*window_budget).min(MAX_FILTER_CHUNK_SIZE);
for _ in 0..chunk_size {
if let Some(id) = container.pop() {
ids.push(id);
} else {
break;
}
}
*window_budget = window_budget.saturating_sub(chunk_size);

ids.iter().for_each(|id| {
let transaction = container.get_transaction_ttl(&id.id).unwrap();
txs.push(&transaction.transaction);
});

let (_, filter_us) = measure_us!(filter(&txs, &mut filter_array[..chunk_size]));
saturating_add_assign!(total_filter_time_us, filter_us);

for (id, filter_result) in ids.iter().zip(&filter_array[..chunk_size]) {
if *filter_result {
let transaction = container.get_transaction_ttl(&id.id).unwrap();
prio_graph.insert_transaction(
*id,
Self::get_transaction_account_access(transaction),
);
} else {
saturating_add_assign!(num_filtered_out, 1);
container.remove_by_id(&id.id);
}
}

let transaction = container.get_transaction_ttl(&id.id).unwrap();
prio_graph.insert_transaction(id, Self::get_transaction_account_access(transaction));
}
if ids.len() != chunk_size {
break;
}
}
};

// Create the initial look-ahead window.
// Check transactions against the filter; remove any that fail from the container.
chunked_pops(container, &mut prio_graph, &mut window_budget);

let mut unblock_this_batch =
Vec::with_capacity(self.consume_work_senders.len() * TARGET_NUM_TRANSACTIONS_PER_BATCH);
@@ -92,15 +141,6 @@ impl PrioGraphScheduler {
while let Some(id) = prio_graph.pop() {
unblock_this_batch.push(id);

// Push next transaction from container into the `PrioGraph` look-ahead window.
if let Some(next_id) = container.pop() {
let transaction = container.get_transaction_ttl(&next_id.id).unwrap();
prio_graph.insert_transaction(
next_id,
Self::get_transaction_account_access(transaction),
);
}

// Should always be in the container; panic during the initial testing phase.
// Later, we can replace this with a `continue` in case it does happen.
let Some(transaction_state) = container.get_mut_transaction_state(&id.id) else {
@@ -175,6 +215,10 @@
// Send all non-empty batches
saturating_add_assign!(num_sent, self.send_batches(&mut batches)?);

// Refresh window budget and do chunked pops
saturating_add_assign!(window_budget, unblock_this_batch.len());
chunked_pops(container, &mut prio_graph, &mut window_budget);

// Unblock all transactions that were blocked by the transactions that were just sent.
for id in unblock_this_batch.drain(..) {
prio_graph.unblock(&id);
@@ -202,6 +246,8 @@
Ok(SchedulingSummary {
num_scheduled,
num_unschedulable,
num_filtered_out,
filter_time_us: total_filter_time_us,
})
}
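
For illustration only (not part of this diff): a caller inside the crate could drive the new signature with a pass-through closure, mirroring the `test_filter` helper added in the tests below. `schedule_keep_all` is a hypothetical wrapper and assumes a scheduler and container constructed elsewhere:

// Hypothetical helper, sketched to show the new call shape.
fn schedule_keep_all(
    scheduler: &mut PrioGraphScheduler,
    container: &mut TransactionStateContainer,
) -> Result<SchedulingSummary, SchedulerError> {
    // Keep every transaction: the filter marks all results `true`.
    scheduler.schedule(container, |_txs, results| results.fill(true))
}

The returned `SchedulingSummary` now also carries `num_filtered_out` and `filter_time_us`, which the scheduler controller below folds into its count and timing metrics.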

@@ -393,6 +439,10 @@ pub(crate) struct SchedulingSummary {
pub num_scheduled: usize,
/// Number of transactions that were not scheduled due to conflicts.
pub num_unschedulable: usize,
/// Number of transactions that were dropped by the filter.
pub num_filtered_out: usize,
/// Time spent filtering transactions.
pub filter_time_us: u64,
}

struct Batches {
@@ -551,14 +601,18 @@ mod tests {
.unzip()
}

fn test_filter(_txs: &[&SanitizedTransaction], results: &mut [bool]) {
results.fill(true);
}

#[test]
fn test_schedule_disconnected_channel() {
let (mut scheduler, work_receivers, _finished_work_sender) = create_test_frame(1);
let mut container = create_container([(&Keypair::new(), &[Pubkey::new_unique()], 1, 1)]);

drop(work_receivers); // explicitly drop receivers
assert_matches!(
scheduler.schedule(&mut container),
scheduler.schedule(&mut container, test_filter),
Err(SchedulerError::DisconnectedSendChannel(_))
);
}
@@ -571,7 +625,7 @@
(&Keypair::new(), &[Pubkey::new_unique()], 2, 2),
]);

let scheduling_summary = scheduler.schedule(&mut container).unwrap();
let scheduling_summary = scheduler.schedule(&mut container, test_filter).unwrap();
assert_eq!(scheduling_summary.num_scheduled, 2);
assert_eq!(scheduling_summary.num_unschedulable, 0);
assert_eq!(collect_work(&work_receivers[0]).1, vec![txids!([1, 0])]);
@@ -586,7 +640,7 @@
(&Keypair::new(), &[pubkey], 1, 2),
]);

let scheduling_summary = scheduler.schedule(&mut container).unwrap();
let scheduling_summary = scheduler.schedule(&mut container, test_filter).unwrap();
assert_eq!(scheduling_summary.num_scheduled, 2);
assert_eq!(scheduling_summary.num_unschedulable, 0);
assert_eq!(
@@ -604,7 +658,7 @@
);

// expect 4 full batches to be scheduled
let scheduling_summary = scheduler.schedule(&mut container).unwrap();
let scheduling_summary = scheduler.schedule(&mut container, test_filter).unwrap();
assert_eq!(
scheduling_summary.num_scheduled,
4 * TARGET_NUM_TRANSACTIONS_PER_BATCH
@@ -624,7 +678,7 @@
let mut container =
create_container((0..4).map(|i| (Keypair::new(), [Pubkey::new_unique()], 1, i)));

let scheduling_summary = scheduler.schedule(&mut container).unwrap();
let scheduling_summary = scheduler.schedule(&mut container, test_filter).unwrap();
assert_eq!(scheduling_summary.num_scheduled, 4);
assert_eq!(scheduling_summary.num_unschedulable, 0);
assert_eq!(collect_work(&work_receivers[0]).1, [txids!([3, 1])]);
@@ -656,7 +710,7 @@
// fact they eventually join means that the scheduler will schedule them
// onto the same thread to avoid causing [4], which conflicts with both
// chains, to be un-schedulable.
let scheduling_summary = scheduler.schedule(&mut container).unwrap();
let scheduling_summary = scheduler.schedule(&mut container, test_filter).unwrap();
assert_eq!(scheduling_summary.num_scheduled, 5);
assert_eq!(scheduling_summary.num_unschedulable, 0);
assert_eq!(
@@ -697,15 +751,18 @@
// Because the look-ahead window is shortened to a size of 4, the scheduler does
// not have knowledge of the joining at transaction [4] until after [0] and [1]
// have been scheduled.
let scheduling_summary = scheduler.schedule(&mut container).unwrap();
let scheduling_summary = scheduler.schedule(&mut container, test_filter).unwrap();
assert_eq!(scheduling_summary.num_scheduled, 4);
assert_eq!(scheduling_summary.num_unschedulable, 2);
let (thread_0_work, thread_0_ids) = collect_work(&work_receivers[0]);
assert_eq!(thread_0_ids, [txids!([0, 2])]);
assert_eq!(collect_work(&work_receivers[1]).1, [txids!([1, 3])]);
assert_eq!(thread_0_ids, [txids!([0]), txids!([2])]);
assert_eq!(
collect_work(&work_receivers[1]).1,
[txids!([1]), txids!([3])]
);

// Cannot schedule even on next pass because of lock conflicts
let scheduling_summary = scheduler.schedule(&mut container).unwrap();
let scheduling_summary = scheduler.schedule(&mut container, test_filter).unwrap();
assert_eq!(scheduling_summary.num_scheduled, 0);
assert_eq!(scheduling_summary.num_unschedulable, 2);

@@ -717,7 +774,7 @@
})
.unwrap();
scheduler.receive_completed(&mut container).unwrap();
let scheduling_summary = scheduler.schedule(&mut container).unwrap();
let scheduling_summary = scheduler.schedule(&mut container, test_filter).unwrap();
assert_eq!(scheduling_summary.num_scheduled, 2);
assert_eq!(scheduling_summary.num_unschedulable, 0);

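A filter that rejects everything could be exercised along these lines; this is a hypothetical test, not part of the commit, reusing the `create_test_frame`/`create_container` helpers shown above and checking the new `num_filtered_out` counter:

#[test]
fn test_schedule_filter_drops_all() {
    let (mut scheduler, _work_receivers, _finished_work_sender) = create_test_frame(1);
    let mut container = create_container([
        (&Keypair::new(), &[Pubkey::new_unique()], 1, 1),
        (&Keypair::new(), &[Pubkey::new_unique()], 2, 2),
    ]);

    // Reject every transaction: nothing reaches the workers, and both
    // transactions are dropped from the container and counted as filtered out.
    let reject_all = |_txs: &[&SanitizedTransaction], results: &mut [bool]| results.fill(false);
    let scheduling_summary = scheduler.schedule(&mut container, reject_all).unwrap();
    assert_eq!(scheduling_summary.num_scheduled, 0);
    assert_eq!(scheduling_summary.num_unschedulable, 0);
    assert_eq!(scheduling_summary.num_filtered_out, 2);
}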
50 changes: 47 additions & 3 deletions core/src/banking_stage/transaction_scheduler/scheduler_controller.rs
@@ -18,7 +18,7 @@ use {
crossbeam_channel::RecvTimeoutError,
solana_accounts_db::transaction_error_metrics::TransactionErrorMetrics,
solana_measure::measure_us,
solana_runtime::bank_forks::BankForks,
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{
clock::MAX_PROCESSING_AGE, saturating_add_assign, timing::AtomicInterval,
transaction::SanitizedTransaction,
@@ -113,9 +113,13 @@ impl SchedulerController {
decision: &BufferedPacketsDecision,
) -> Result<(), SchedulerError> {
match decision {
BufferedPacketsDecision::Consume(_bank_start) => {
BufferedPacketsDecision::Consume(bank_start) => {
let (scheduling_summary, schedule_time_us) =
measure_us!(self.scheduler.schedule(&mut self.container)?);
measure_us!(self
.scheduler
.schedule(&mut self.container, |txs, results| {
Self::pre_scheduling_filter(txs, results, &bank_start.working_bank)
})?);
saturating_add_assign!(
self.count_metrics.num_scheduled,
scheduling_summary.num_scheduled
@@ -124,6 +128,14 @@
self.count_metrics.num_unschedulable,
scheduling_summary.num_unschedulable
);
saturating_add_assign!(
self.count_metrics.num_schedule_filtered_out,
scheduling_summary.num_filtered_out
);
saturating_add_assign!(
self.timing_metrics.schedule_filter_time_us,
scheduling_summary.filter_time_us
);
saturating_add_assign!(self.timing_metrics.schedule_time_us, schedule_time_us);
}
BufferedPacketsDecision::Forward => {
@@ -140,6 +152,25 @@
Ok(())
}

fn pre_scheduling_filter(
transactions: &[&SanitizedTransaction],
results: &mut [bool],
bank: &Bank,
) {
let lock_results = vec![Ok(()); transactions.len()];
let mut error_counters = TransactionErrorMetrics::default();
let check_results = bank.check_transactions(
transactions,
&lock_results,
MAX_PROCESSING_AGE,
&mut error_counters,
);
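// `check_transactions` returns one entry per transaction, pairing a Result with
// additional data that is ignored here; only the Result decides whether the
// transaction passes the filter.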

for ((check_result, _), result) in check_results.into_iter().zip(results.iter_mut()) {
*result = check_result.is_ok();
}
}

/// Clears the transaction state container.
/// This only clears pending transactions, and does **not** clear in-flight transactions.
fn clear_container(&mut self) {
@@ -315,6 +346,8 @@ struct SchedulerCountMetrics {
num_scheduled: usize,
/// Number of transactions that were unschedulable.
num_unschedulable: usize,
/// Number of transactions that were filtered out during scheduling.
num_schedule_filtered_out: usize,
/// Number of completed transactions received from workers.
num_finished: usize,
/// Number of transactions that were retryable.
@@ -352,6 +385,11 @@ impl SchedulerCountMetrics {
("num_buffered", self.num_buffered, i64),
("num_scheduled", self.num_scheduled, i64),
("num_unschedulable", self.num_unschedulable, i64),
(
"num_schedule_filtered_out",
self.num_schedule_filtered_out,
i64
),
("num_finished", self.num_finished, i64),
("num_retryable", self.num_retryable, i64),
("num_dropped_on_receive", self.num_dropped_on_receive, i64),
@@ -380,6 +418,7 @@ impl SchedulerCountMetrics {
|| self.num_buffered != 0
|| self.num_scheduled != 0
|| self.num_unschedulable != 0
|| self.num_schedule_filtered_out != 0
|| self.num_finished != 0
|| self.num_retryable != 0
|| self.num_dropped_on_receive != 0
@@ -395,6 +434,7 @@ impl SchedulerCountMetrics {
self.num_buffered = 0;
self.num_scheduled = 0;
self.num_unschedulable = 0;
self.num_schedule_filtered_out = 0;
self.num_finished = 0;
self.num_retryable = 0;
self.num_dropped_on_receive = 0;
@@ -415,6 +455,8 @@ struct SchedulerTimingMetrics {
receive_time_us: u64,
/// Time spent buffering packets.
buffer_time_us: u64,
/// Time spent filtering transactions during scheduling.
schedule_filter_time_us: u64,
/// Time spent scheduling transactions.
schedule_time_us: u64,
/// Time spent clearing transactions from the container.
@@ -442,6 +484,7 @@ impl SchedulerTimingMetrics {
("decision_time_us", self.decision_time_us, i64),
("receive_time_us", self.receive_time_us, i64),
("buffer_time_us", self.buffer_time_us, i64),
("schedule_filter_time_us", self.schedule_filter_time_us, i64),
("schedule_time_us", self.schedule_time_us, i64),
("clear_time_us", self.clear_time_us, i64),
("clean_time_us", self.clean_time_us, i64),
@@ -457,6 +500,7 @@ impl SchedulerTimingMetrics {
self.decision_time_us = 0;
self.receive_time_us = 0;
self.buffer_time_us = 0;
self.schedule_filter_time_us = 0;
self.schedule_time_us = 0;
self.clear_time_us = 0;
self.clean_time_us = 0;