Skip to content

Commit

Permalink
scheduler prioritize by fees/cost
Browse files Browse the repository at this point in the history
  • Loading branch information
apfitzge committed Jan 23, 2024
1 parent bb829c0 commit a155566
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,11 +257,13 @@ impl PrioGraphScheduler {
pub fn receive_completed(
&mut self,
container: &mut TransactionStateContainer,
lamports_per_signature: u64,
) -> Result<(usize, usize), SchedulerError> {
let mut total_num_transactions: usize = 0;
let mut total_num_retryable: usize = 0;
loop {
let (num_transactions, num_retryable) = self.try_receive_completed(container)?;
let (num_transactions, num_retryable) =
self.try_receive_completed(container, lamports_per_signature)?;
if num_transactions == 0 {
break;
}
Expand All @@ -276,6 +278,7 @@ impl PrioGraphScheduler {
fn try_receive_completed(
&mut self,
container: &mut TransactionStateContainer,
lamports_per_signature: u64,
) -> Result<(usize, usize), SchedulerError> {
match self.finished_consume_work_receiver.try_recv() {
Ok(FinishedConsumeWork {
Expand Down Expand Up @@ -307,6 +310,7 @@ impl PrioGraphScheduler {
transaction,
max_age_slot,
},
lamports_per_signature,
);
retryable_iter.next();
continue;
Expand Down Expand Up @@ -500,6 +504,8 @@ mod tests {
std::borrow::Borrow,
};

const TEST_LAMPORTS_PER_SIGNATURE: u64 = 5_000;

macro_rules! txid {
($value:expr) => {
TransactionId::new($value)
Expand Down Expand Up @@ -581,6 +587,7 @@ mod tests {
compute_unit_limit: 1,
},
transaction_cost,
TEST_LAMPORTS_PER_SIGNATURE,
);
}

Expand Down Expand Up @@ -753,7 +760,9 @@ mod tests {
retryable_indexes: vec![],
})
.unwrap();
scheduler.receive_completed(&mut container).unwrap();
scheduler
.receive_completed(&mut container, TEST_LAMPORTS_PER_SIGNATURE)
.unwrap();
let scheduling_summary = scheduler
.schedule(&mut container, test_pre_graph_filter, test_pre_lock_filter)
.unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,11 @@ impl SchedulerController {

/// Receives completed transactions from the workers and updates metrics.
fn receive_completed(&mut self) -> Result<(), SchedulerError> {
let ((num_transactions, num_retryable), receive_completed_time_us) =
measure_us!(self.scheduler.receive_completed(&mut self.container)?);
let bank = self.bank_forks.read().unwrap().working_bank();
let lamports_per_signature = bank.get_lamports_per_signature();
let ((num_transactions, num_retryable), receive_completed_time_us) = measure_us!(self
.scheduler
.receive_completed(&mut self.container, lamports_per_signature)?);
saturating_add_assign!(self.count_metrics.num_finished, num_transactions);
saturating_add_assign!(self.count_metrics.num_retryable, num_retryable);
saturating_add_assign!(
Expand Down Expand Up @@ -299,6 +302,7 @@ impl SchedulerController {
fn buffer_packets(&mut self, packets: Vec<ImmutableDeserializedPacket>) {
// Sanitize packets, generate IDs, and insert into the container.
let bank = self.bank_forks.read().unwrap().working_bank();
let lamports_per_signature = bank.get_lamports_per_signature();
let last_slot_in_epoch = bank.epoch_schedule().get_last_slot_in_epoch(bank.epoch());
let transaction_account_lock_limit = bank.get_transaction_account_lock_limit();
let feature_set = &bank.feature_set;
Expand Down Expand Up @@ -355,6 +359,7 @@ impl SchedulerController {
transaction_ttl,
priority_details,
transaction_cost,
lamports_per_signature,
) {
saturating_add_assign!(self.count_metrics.num_dropped_on_capacity, 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,17 @@ impl TransactionStateContainer {
transaction_ttl: SanitizedTransactionTTL,
transaction_priority_details: TransactionPriorityDetails,
transaction_cost: TransactionCost,
lamports_per_signature: u64,
) -> bool {
let priority_id =
TransactionPriorityId::new(transaction_priority_details.priority, transaction_id);
self.id_to_transaction_state.insert(
transaction_id,
TransactionState::new(
transaction_ttl,
transaction_priority_details,
transaction_cost,
),
let state = TransactionState::new(
transaction_ttl,
transaction_priority_details,
transaction_cost,
);
let priority = Self::calculate_prioritization(lamports_per_signature, &state);
self.id_to_transaction_state.insert(transaction_id, state);

let priority_id = TransactionPriorityId::new(priority, transaction_id);
self.push_id_into_queue(priority_id)
}

Expand All @@ -120,11 +120,13 @@ impl TransactionStateContainer {
&mut self,
transaction_id: TransactionId,
transaction_ttl: SanitizedTransactionTTL,
lamports_per_signature: u64,
) {
let transaction_state = self
.get_mut_transaction_state(&transaction_id)
.expect("transaction must exist");
let priority_id = TransactionPriorityId::new(transaction_state.priority(), transaction_id);
let priority = Self::calculate_prioritization(lamports_per_signature, transaction_state);
let priority_id = TransactionPriorityId::new(priority, transaction_id);
transaction_state.transition_to_unprocessed(transaction_ttl);
self.push_id_into_queue(priority_id);
}
Expand All @@ -149,6 +151,16 @@ impl TransactionStateContainer {
.remove(id)
.expect("transaction must exist");
}

/// Calculate prioritization for a transaction.
fn calculate_prioritization(lamports_per_signature: u64, state: &TransactionState) -> u64 {
let compute_unit_price = state.transaction_priority_details().priority;
let signature_fees =
lamports_per_signature * state.transaction_ttl().transaction.signatures().len() as u64;
let cost = state.transaction_cost().sum();

signature_fees / cost + compute_unit_price
}
}

#[cfg(test)]
Expand All @@ -169,6 +181,8 @@ mod tests {
},
};

const TEST_LAMPORTS_PER_SIGNATURE: u64 = 5_000;

fn test_transaction(
priority: u64,
) -> (
Expand Down Expand Up @@ -216,6 +230,7 @@ mod tests {
transaction_ttl,
transaction_priority_details,
transaction_cost,
TEST_LAMPORTS_PER_SIGNATURE,
);
}
}
Expand Down

0 comments on commit a155566

Please sign in to comment.