Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TransactionScheduler: MultiIteratorConsumeScheduler #33332

Closed

Conversation

apfitzge
Copy link
Contributor

@apfitzge apfitzge commented Sep 20, 2023

Problem

Multi-Iterator-based scheduler for consuming transactions.

Summary of Changes

  • Add a central multi-iterator scheduler which generates batches for execution
  • Each mutli-iterator inner-iteration, the scheduler will attempt to create a batch per thread
  • Checks are in-place to ensure that each batch:
    • has no conflicting locks,
    • locks do not conflict with batches on other threads (currently being built or already scheduled),
    • locks are not taken on accounts for needed by higher-priority unschedulable txs (multiple thread conflicts)

Fixes #

@apfitzge apfitzge force-pushed the scheduler/consume_scheduling branch 4 times, most recently from 8a35f64 to c222ce8 Compare September 21, 2023 00:07
@codecov
Copy link

codecov bot commented Sep 21, 2023

Codecov Report

Merging #33332 (22bb0a5) into master (3b1cbae) will increase coverage by 0.0%.
The diff coverage is 96.6%.

@@           Coverage Diff            @@
##           master   #33332    +/-   ##
========================================
  Coverage    81.9%    81.9%            
========================================
  Files         796      798     +2     
  Lines      215861   216168   +307     
========================================
+ Hits       176938   177211   +273     
- Misses      38923    38957    +34     

@apfitzge apfitzge marked this pull request as ready for review September 22, 2023 16:27
Copy link
Contributor

@tao-stones tao-stones left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neat code 👍🏼 , a lot to unpack with MI and stuff. Some comments/questions for the first round.

solana_sdk::{clock::Slot, transaction::SanitizedTransaction},
};

const QUEUED_TRANSACTION_LIMIT: usize = 64 * 100;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps comment on why 64 and 100?

let num_scheduled = scheduler.schedule(&mut container).unwrap();
assert_eq!(num_scheduled, 4);
assert_eq!(collect_work(&work_receivers[0]).1, [txids!([3, 1])]);
assert_eq!(collect_work(&work_receivers[1]).1, [txids!([2, 0])]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious why round robin when scheduling non-conflict txs, instead of putting all non-conflict txs into one threads? Former yields 2 small batch, later ends with one larger batch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's say we have N distinct markets near the top of our queue.

Conflict graph might look something like this:

graph LR;

A1((A1)) --> A2((A2)) --> A3((A3)) --> A4((..))
B1((B1)) --> B2((B2)) --> B3((B3)) --> B4((..))
C1((C1)) --> C2((C2)) --> C3((C3)) --> C4((..))
D1((D1)) --> D2((D2)) --> D3((D3)) --> D4((..))
Loading

This view makes it somewhat clear a reasonable choice is to have A txs on one thread, B txs on another, and so on.

But the priority queue might look like this:

graph LR;
A1((A1));
A2((A2));
A3((A3));
A4((A4));

B1((B1));
B2((B2));
B3((B3));
B4((B4));

C1((C1));
C2((C2));
C3((C3));
C4((C4));

D1((D1));
D2((D2));
D3((D3));
D4((D4));

A1 --> A2 --> B1 --> C1 --> B2 --> D1 --> B3 --> A3 --> D2 --> C2 --> C3 --> A4 --> B4 --> C4 --> D3 --> D4
Loading

I'll just walk through the first few steps here, and maybe that will clear it up. Assume we start with a completely empty schedule:

  1. See A1 - can be scheduled on any thread. Choose thread with fewest scheduled. Send to thread 0.
  2. See A2 - skip for this inner-iteration since it conflicts with A1
  3. See B1 - can be scheduled on any thread. Choose thread with fewest scheduled. Send to thread 1.
  4. See C1 - can be scheduled on any thread. Choose thread with fewest scheduled. Send to thread 2.
  5. See B2 - skip for this inner-iteration since it conflicts with B1
  6. See D1 - can be scheduled on any thread. Choose thread with fewest scheduled. Send to thread 3.
  7. Remaining iterators reach end of queue since there's no more txs that don't conflict. 4 iterators are at A1, B1, C1, D1.
  8. See A2 - can only be scheduled on thread 0. Send to thread 0.
  9. See B2 - can only be scheduled on thread 1. Send to thread 1.

And so on. By round-robining non-conflicting txs at the beginning of our MI, we are effectively distributing the distinct top-of-queue conflict chains into our threads.

Obviously this is not perfect, since if we have non-conflicting txs at top-of-queue, but the txs have shared lower-priority conflicts (i.e. lower priority that conflict with both A1 and B1 for example) will cause later blocks where we have to wait for txs to finish.

assert_eq!(collect_work(&work_receivers[1]).1, [txids!([1])]);

// Cannot schedule even on next pass because of lock conflicts
let num_scheduled = scheduler.schedule(&mut container).unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still bit concerned about container being a snapshot during scheduling cycle. If extend this test case to real world of 4 threads and tons of transactions want to write lock Openbook account, would be better if container can take in newly received higher prio txs between scheduling to help prio fee works better?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Snapshotting the top-of-queue is an artifact of the MI approach, as we need a slice to iterate over.

Effectively, this is running multiple priority auctions within each slot - basically as often as the MI outer iteration(s) take. Where we allow new txs to enter the auction between outer iterations.

I agree streaming would probably be better. However, while we still require non-conflicting batches the MI approach is an efficient way to create these batches.

We still see significant performance benefit from batching, so I don't think we want to get rid of it entirely. See #33332 (comment) - we also want to build these batches at the same time to distribute the work parallelly.

So let's look at what we could do instead of serializing the top-of-queue.

  1. Pop - assign to thread 0
  2. Pop - if conflict, move to some "blocked" buffer. Otherwise schedule to thread 1
  3. Pop - if conflict, move to some "blocked" buffer. Otherwise schedule to thread 2
  4. ...
    Once our batches are full or we want to send them, we will need to push our "blocked" transactions back into our main queue (which may have been receiving while we were doing this). This may only push some subset of them in, because we can only unblock the ones that would be now (likely) be unblocked; however, it's still an O(nlogn) operation for every set of batches we send out. MI only pushes back into the main queue at the end of the outer iteration, each inner iteration is O(n).

Once we don't need to scan for non-conflicting txs, we might be able to take a streaming pop approach. As most conflicts would just be schedulable in the current batches - although we still need some sort of blocking buffer because conflicts across threads.

.map(|thread_id| {
(
thread_id,
batches_per_thread[thread_id].len() + in_flight_per_thread[thread_id],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to handle out of bounds error here?

let outstanding = self.in_flight_tracker.num_in_flight_per_thread();
for (thread_id, outstanding_count) in outstanding.iter().enumerate() {
if *outstanding_count > QUEUED_TRANSACTION_LIMIT {
schedulable_threads.remove(thread_id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a test to demonstrate how thread are removed then add back to scheduler due to change of outstanding trnasactions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have a test to test the removal, but no test to check if it was added back in.

const MAX_TRANSACTIONS_PER_SCHEDULING_PASS: usize = 100_000;
let target_scanner_batch_size: usize = TARGET_NUM_TRANSACTIONS_PER_BATCH * num_threads;
let ids = container
.take_top_n(MAX_TRANSACTIONS_PER_SCHEDULING_PASS)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current banking_stage has 700_000 buffer limit, so theoretically that's the limit per pass. Wounding why 100_000 is chosen here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just performance. It's very unlikely we'll schedule 700k before the end of a slot, so just took the top 100k - which is still unlikely to happen but will be faster to serialize the top-of-queue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would taking 10K results too much overhead? Otherwise, it'd be even faster, and likely visits priority queue multiple times per slot (to give prioritized txs more chances for landing).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can test w/ using 10k, since that's probably large enough.
We do want it to be a fairly large buffer because the MI will look ahead for non-conflicting txs.
So if we have some really competitive event (NFT mints for example), we may have nearly all our top-of-queue txs conflicting, MI would "look past" them and schedule some non-conflicting txs.

100k is nearly always going to have some non-conflicting txs - 10k will also probably nearly always have non-conflicting as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, there might be a sweet spot for the size and frequency of snapshot, and most depends on the traffic pattern. Will be good to have some data to help reasoning about it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Development

Successfully merging this pull request may close these issues.

2 participants