Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[executor] Introduce on-the-fly scheduling for Read/Allocate/Write keys #814

Merged
merged 52 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
7cabcd7
[executor] refactor worker func
wlawt Mar 28, 2024
8f1d6f9
[executor] renaming
wlawt Mar 28, 2024
af221fc
[executor] use blocking/dependency type from programmatic-deploy fork
wlawt Mar 28, 2024
38d0d12
[executor] add key type support
wlawt Mar 28, 2024
adcedb6
wip
wlawt Apr 2, 2024
e376a81
wip
wlawt Apr 2, 2024
35c7936
switch statements
wlawt Apr 3, 2024
54d739a
programmatic deploy fork + concurrent reads work
wlawt Apr 3, 2024
4f713d2
sequential writes work
wlawt Apr 3, 2024
46bbffb
w->r->r works
wlawt Apr 3, 2024
2cb8686
r->r->w works
wlawt Apr 3, 2024
f0a6a7f
w->r->r...w->r->r works
wlawt Apr 3, 2024
171c25c
r->r->w...r->r->w works
wlawt Apr 3, 2024
d22fd32
all unit tests pass
wlawt Apr 3, 2024
5665181
cleanup
wlawt Apr 3, 2024
c2d0a92
coverage 97%
wlawt Apr 3, 2024
95555a5
cleanup switch
wlawt Apr 3, 2024
05fd669
switch comments
wlawt Apr 3, 2024
c892947
self review
wlawt Apr 3, 2024
f50423a
go mod tidy
wlawt Apr 4, 2024
17ad618
add done prints to tests
wlawt Apr 9, 2024
bf5ffa1
rename isAllocateWrite
wlawt Apr 9, 2024
a64d5db
remove id from task
wlawt Apr 9, 2024
eac02e1
use dummy dep to keep track of all keys
wlawt Apr 9, 2024
71975ad
simplify for loop in adding bt dep
wlawt Apr 9, 2024
d3074af
fix integration bug and add unit test for it
wlawt Apr 9, 2024
8783dc8
fix race condition with write-after-read(s) potentially
wlawt Apr 10, 2024
bd5bbab
run unit tests over multiple iterations
wlawt Apr 10, 2024
b3b2353
cut down on unit testing time and add comments
wlawt Apr 10, 2024
553c5d7
simplify num times we call add blocking if not exec
wlawt Apr 10, 2024
2a9f12e
new way to keep track of concurrent Reads
wlawt Apr 11, 2024
1d2f636
self review
wlawt Apr 11, 2024
dd0acff
have unit tests run under max time
wlawt Apr 11, 2024
dafea5b
[executor] Simplify Logic + Speed Up Tests (#831)
patrick-ogrady Apr 12, 2024
6398126
better var names
wlawt Apr 15, 2024
42e2250
add larger unit tests
wlawt Apr 15, 2024
b1bbf9e
ignore lint for rand
wlawt Apr 16, 2024
09a732b
add unique and conflicting keys randomly to txs
wlawt Apr 16, 2024
6d084f7
fix for loops
wlawt Apr 16, 2024
08362f1
use max function
wlawt Apr 16, 2024
159e9a2
make conflict keys 100 and pick 1-5
wlawt Apr 16, 2024
ad56cbd
make num slow chan consistent
wlawt Apr 16, 2024
2d346d4
use set.Contains to speed up tests
wlawt Apr 16, 2024
d3b4643
random perm for each unique key
wlawt Apr 16, 2024
ef41d6a
group var names
wlawt Apr 16, 2024
53aeadd
use numTxs in generating blocking txs
wlawt Apr 16, 2024
79cef56
increase num conflict keys for concurrent Reads and Writes test
wlawt Apr 16, 2024
fd2bdde
[executor] multi-key conflict bug (#837)
wlawt Apr 17, 2024
180d43f
random perm per conflict key
wlawt Apr 16, 2024
6a9b6b3
make maxDep a param
wlawt Apr 17, 2024
2009f95
add maxDep as const in chain
wlawt Apr 17, 2024
a14e434
placement of maxDep comment
wlawt Apr 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 14 additions & 16 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,17 @@ import (
uatomic "go.uber.org/atomic"
)

// maxDependencies must be greater than the maximum number of dependencies
// any single task could have. This is used to ensure a dependent task
// does not begin executing a task until all dependencies have been enqueued.
const maxDependencies = 100_000_000

// Executor sequences the concurrent execution of
// tasks with arbitrary conflicts on-the-fly.
//
// Executor ensures that conflicting tasks
// are executed in the order they were queued.
// Tasks with no conflicts are executed immediately.
//
// It is assumed that no single task has more than [maxDependencies]. If
// this invariant is violated, some tasks will never execute and this code
// could deadlock.
// It is assumed that no single task has more than [maxDependencies].
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment should be above New imo.

// This is used to ensure a dependent task does not begin executing a
// task until all dependencies have been enqueued. If this invariant is
// violated, some tasks will never execute and this code could deadlock.
type Executor struct {
metrics Metrics

Expand All @@ -37,18 +33,20 @@ type Executor struct {
outstanding sync.WaitGroup
executable chan *task

tasks int
nodes map[string]*task
tasks int
maxDependencies int64
nodes map[string]*task

err uatomic.Error
}

// New creates a new [Executor].
func New(items, concurrency int, metrics Metrics) *Executor {
func New(items, concurrency int, maxDependencies int64, metrics Metrics) *Executor {
e := &Executor{
metrics: metrics,
nodes: make(map[string]*task, items*2), // TODO: tune this
executable: make(chan *task, items), // ensure we don't block while holding lock
metrics: metrics,
maxDependencies: maxDependencies,
nodes: make(map[string]*task, items*2), // TODO: tune this
executable: make(chan *task, items), // ensure we don't block while holding lock
}
e.workers.Add(concurrency)
for i := 0; i < concurrency; i++ {
Expand Down Expand Up @@ -159,7 +157,7 @@ func (e *Executor) Run(keys state.Keys, f func() error) {
// We can have more than 1 dependency per key (in the case that there
// are many readers for a single key), so we set this higher than we ever
// expect to see.
t.dependencies.Add(maxDependencies)
t.dependencies.Add(e.maxDependencies)

// Record dependencies
dependencies := set.NewSet[int](len(keys))
Expand Down Expand Up @@ -209,7 +207,7 @@ func (e *Executor) Run(keys state.Keys, f func() error) {
}

// Adjust dependency traker and execute if necessary
difference := maxDependencies - int64(dependencies.Len())
difference := e.maxDependencies - int64(dependencies.Len())
if t.dependencies.Add(-difference) > 0 {
if e.metrics != nil {
e.metrics.RecordBlocked()
Expand Down
42 changes: 22 additions & 20 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ import (
"github.com/ava-labs/hypersdk/state"
)

// Run several times to catch non-determinism
const numIterations = 10
const (
numIterations = 10 // Run several times to catch non-determinism
maxDependencies = 100_000_000
)

func generateNumbers(start int) []int {
array := make([]int, 9999)
Expand All @@ -32,7 +34,7 @@ func TestExecutorNoConflicts(t *testing.T) {
require = require.New(t)
l sync.Mutex
completed = make([]int, 0, 100)
e = New(100, 4, nil)
e = New(100, 4, maxDependencies, nil)
canWait = make(chan struct{})
)

Expand Down Expand Up @@ -63,7 +65,7 @@ func TestExecutorNoConflictsSlow(t *testing.T) {
require = require.New(t)
l sync.Mutex
completed = make([]int, 0, 100)
e = New(100, 4, nil)
e = New(100, 4, maxDependencies, nil)
slow = make(chan struct{})
)
for i := 0; i < 100; i++ {
Expand Down Expand Up @@ -97,7 +99,7 @@ func TestExecutorSimpleConflict(t *testing.T) {
conflictKey = ids.GenerateTestID().String()
l sync.Mutex
completed = make([]int, 0, 100)
e = New(100, 4, nil)
e = New(100, 4, maxDependencies, nil)
slow = make(chan struct{})
)
for i := 0; i < 100; i++ {
Expand Down Expand Up @@ -134,7 +136,7 @@ func TestExecutorMultiConflict(t *testing.T) {
conflictKey2 = ids.GenerateTestID().String()
l sync.Mutex
completed = make([]int, 0, 100)
e = New(100, 4, nil)
e = New(100, 4, maxDependencies, nil)
slow1 = make(chan struct{})
slow2 = make(chan struct{})
)
Expand Down Expand Up @@ -179,7 +181,7 @@ func TestEarlyExit(t *testing.T) {
require = require.New(t)
l sync.Mutex
completed = make([]int, 0, 500)
e = New(500, 4, nil)
e = New(500, 4, maxDependencies, nil)
terr = errors.New("uh oh")
)
for i := 0; i < 500; i++ {
Expand Down Expand Up @@ -207,7 +209,7 @@ func TestStop(t *testing.T) {
require = require.New(t)
l sync.Mutex
completed = make([]int, 0, 500)
e = New(500, 4, nil)
e = New(500, 4, maxDependencies, nil)
)
for i := 0; i < 500; i++ {
s := make(state.Keys, (i + 1))
Expand Down Expand Up @@ -238,7 +240,7 @@ func TestManyWrites(t *testing.T) {
l sync.Mutex
completed = make([]int, 0, 100)
answer = make([]int, 0, 100)
e = New(100, 4, nil)
e = New(100, 4, maxDependencies, nil)
slow = make(chan struct{})
)
for i := 0; i < 100; i++ {
Expand Down Expand Up @@ -277,7 +279,7 @@ func TestManyReads(t *testing.T) {
conflictKey = ids.GenerateTestID().String()
l sync.Mutex
completed = make([]int, 0, 100)
e = New(100, 4, nil)
e = New(100, 4, maxDependencies, nil)
slow = make(chan struct{})
)
for i := 0; i < 100; i++ {
Expand Down Expand Up @@ -317,7 +319,7 @@ func TestWriteThenRead(t *testing.T) {
conflictKey = ids.GenerateTestID().String()
l sync.Mutex
completed = make([]int, 0, 100)
e = New(100, 4, nil)
e = New(100, 4, maxDependencies, nil)
slow = make(chan struct{})
)
for i := 0; i < 100; i++ {
Expand Down Expand Up @@ -361,7 +363,7 @@ func TestReadThenWrite(t *testing.T) {
conflictKey = ids.GenerateTestID().String()
l sync.Mutex
completed = make([]int, 0, 100)
e = New(100, 4, nil)
e = New(100, 4, maxDependencies, nil)
slow = make(chan struct{})
)
for i := 0; i < 100; i++ {
Expand Down Expand Up @@ -407,7 +409,7 @@ func TestWriteThenReadRepeated(t *testing.T) {
conflictKey = ids.GenerateTestID().String()
l sync.Mutex
completed = make([]int, 0, 100)
e = New(100, 4, nil)
e = New(100, 4, maxDependencies, nil)
slow = make(chan struct{})
)
for i := 0; i < 100; i++ {
Expand Down Expand Up @@ -450,7 +452,7 @@ func TestReadThenWriteRepeated(t *testing.T) {
conflictKey = ids.GenerateTestID().String()
l sync.Mutex
completed = make([]int, 0, 100)
e = New(100, 4, nil)
e = New(100, 4, maxDependencies, nil)
slow = make(chan struct{})
)
for i := 0; i < 100; i++ {
Expand Down Expand Up @@ -494,7 +496,7 @@ func TestTwoConflictKeys(t *testing.T) {
conflictKey2 = ids.GenerateTestID().String()
l sync.Mutex
completed = make([]int, 0, 100)
e = New(100, 4, nil)
e = New(100, 4, maxDependencies, nil)
slow = make(chan struct{})
)
for i := 0; i < 100; i++ {
Expand Down Expand Up @@ -533,7 +535,7 @@ func TestLargeConcurrentRead(t *testing.T) {
numKeys = 10
numTxs = 100_000
completed = make([]int, 0, numTxs)
e = New(numTxs, 4, nil)
e = New(numTxs, 4, maxDependencies, nil)

numBlocking = 1000
blocking = set.Set[int]{}
Expand Down Expand Up @@ -608,7 +610,7 @@ func TestLargeSequentialWrites(t *testing.T) {
numKeys = 10
numTxs = 100_000
completed = make([]int, 0, numTxs)
e = New(numTxs, 4, nil)
e = New(numTxs, 4, maxDependencies, nil)

numBlocking = 1000
blocking = set.Set[int]{}
Expand Down Expand Up @@ -686,7 +688,7 @@ func TestLargeReadsThenWrites(t *testing.T) {
numTxs = 100_000
completed = make([]int, 0, numTxs)
answers = make([][]int, 5)
e = New(numTxs, 4, nil)
e = New(numTxs, 4, maxDependencies, nil)

numBlocking = 10000
blocking = set.Set[int]{}
Expand Down Expand Up @@ -764,7 +766,7 @@ func TestLargeWritesThenReads(t *testing.T) {
numTxs = 100_000
completed = make([]int, 0, numTxs)
answers = make([][]int, 5)
e = New(numTxs, 4, nil)
e = New(numTxs, 4, maxDependencies, nil)

numBlocking = 10000
blocking = set.Set[int]{}
Expand Down Expand Up @@ -840,7 +842,7 @@ func TestLargeRandomReadsAndWrites(t *testing.T) {
numKeys = 10
numTxs = 100000
completed = make([]int, 0, numTxs)
e = New(numTxs, 4, nil)
e = New(numTxs, 4, maxDependencies, nil)

conflictSize = 100
conflictKeys = make([]string, 0, conflictSize)
Expand Down