Skip to content

Commit

Permalink
[executor] Introduce on-the-fly scheduling for Read/Allocate/Write ke…
Browse files Browse the repository at this point in the history
…ys (#814)

* [executor] refactor worker func

* [executor] renaming

* [executor] use blocking/dependency type from programmatic-deploy fork

* [executor] add key type support

* wip

* wip

* switch statements

* programmatic deploy fork + concurrent reads work

* sequential writes work

* w->r->r works

* r->r->w works

* w->r->r...w->r->r works

* r->r->w...r->r->w works

* all unit tests pass

* cleanup

* coverage 97%

* cleanup switch

* switch comments

* self review

* go mod tidy

* add done prints to tests

* rename isAllocateWrite

* remove id from task

* use dummy dep to keep track of all keys

* simplify for loop in adding bt dep

* fix integration bug and add unit test for it

* fix race condition with write-after-read(s) potentially

* run unit tests over multiple iterations

* cut down on unit testing time and add comments

* simplify num times we call add blocking if not exec

* new way to keep track of concurrent Reads

* self review

* have unit tests run under max time

* [executor] Simplify Logic + Speed Up Tests (#831)

* first pass

* need to set higher base dependencies

* remove extra logging from tests

* use right var

* tests passing

* cleanup task loop

* make tests faster

* ensure arr order matches

* add more comments

* tests are passing

* ensure we actually stop building early

* add comment about failure case

* add comment about deadlock

* better var names

* add larger unit tests

* ignore lint for rand

* add unique and conflicting keys randomly to txs

* fix for loops

* use max function

* make conflict keys 100 and pick 1-5

* make num slow chan consistent

* use set.Contains to speed up tests

* random perm for each unique key

* group var names

* use numTxs in generating blocking txs

* increase num conflict keys for concurrent Reads and Writes test

* [executor] multi-key conflict bug (#837)

* random perm per conflict key

* fix dont block on ourself and edit TestTwoConflictKeys to track this

* keep reading/readers relationship

* make reading a map

* comments

* clarify use of map for reading

* simplify rng for conflict and unique perm

* random perm per conflict key

* make maxDep a param

* add maxDep as const in chain

* placement of maxDep comment

---------

Co-authored-by: Patrick O'Grady <prohb125@gmail.com>
  • Loading branch information
wlawt and patrick-ogrady authored Apr 17, 2024
1 parent a0b6584 commit 203e78b
Show file tree
Hide file tree
Showing 13 changed files with 946 additions and 145 deletions.
9 changes: 7 additions & 2 deletions chain/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func HandlePreExecute(log logging.Logger, err error) bool {
}
}

// Note: This code is terrible and will be removed during the Vryx integration.
func BuildBlock(
ctx context.Context,
vm VM,
Expand Down Expand Up @@ -135,12 +136,15 @@ func BuildBlock(
// prepareStreamLock ensures we don't overwrite stream prefetching spawned
// asynchronously.
prepareStreamLock sync.Mutex

// stop is used to trigger that we should stop building, assuming we are no longer executing
stop bool
)

// Batch fetch items from mempool to unblock incoming RPC/Gossip traffic
mempool.StartStreaming(ctx)
b.Txs = []*Transaction{}
for time.Since(start) < vm.GetTargetBuildDuration() {
for time.Since(start) < vm.GetTargetBuildDuration() && !stop {
prepareStreamLock.Lock()
txs := mempool.Stream(ctx, streamBatch)
prepareStreamLock.Unlock()
Expand All @@ -157,7 +161,7 @@ func BuildBlock(
break
}

e := executor.New(streamBatch, vm.GetTransactionExecutionCores(), vm.GetExecutorBuildRecorder())
e := executor.New(streamBatch, vm.GetTransactionExecutionCores(), MaxKeyDependencies, vm.GetExecutorBuildRecorder())
pending := make(map[ids.ID]*Transaction, streamBatch)
var pendingLock sync.Mutex
for li, ltx := range txs {
Expand Down Expand Up @@ -372,6 +376,7 @@ func BuildBlock(
// stop building. This prevents a full mempool iteration looking for the
// "perfect fit".
if feeManager.LastConsumed(dimension) >= targetUnits[dimension] {
stop = true
return errBlockFull
}
}
Expand Down
4 changes: 4 additions & 0 deletions chain/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ const (
HeightKeyChunks = 1
TimestampKeyChunks = 1
FeeKeyChunks = 8 // 96 (per dimension) * 5 (num dimensions)

// MaxKeyDependencies must be greater than the maximum number of key dependencies
// any single task could have when executing a task.
MaxKeyDependencies = 100_000_000
)

func HeightKey(prefix []byte) []byte {
Expand Down
2 changes: 1 addition & 1 deletion chain/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (b *StatelessBlock) Execute(
t = b.GetTimestamp()

f = fetcher.New(im, numTxs, b.vm.GetStateFetchConcurrency())
e = executor.New(numTxs, b.vm.GetTransactionExecutionCores(), b.vm.GetExecutorVerifyRecorder())
e = executor.New(numTxs, b.vm.GetTransactionExecutionCores(), MaxKeyDependencies, b.vm.GetExecutorVerifyRecorder())
ts = tstate.New(numTxs * 2) // TODO: tune this heuristic
results = make([]*Result, numTxs)
)
Expand Down
1 change: 1 addition & 0 deletions examples/morpheusvm/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ require (
go.opentelemetry.io/otel/sdk v1.11.2 // indirect
go.opentelemetry.io/otel/trace v1.11.2 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/mock v0.4.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/morpheusvm/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,8 @@ go.opentelemetry.io/otel/trace v1.11.2/go.mod h1:4N+yC7QEz7TTsG9BSRLNAa63eg5E06O
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw=
go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
Expand Down
1 change: 1 addition & 0 deletions examples/tokenvm/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ require (
go.opentelemetry.io/otel/sdk v1.11.2 // indirect
go.opentelemetry.io/otel/trace v1.11.2 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/mock v0.4.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/tokenvm/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,8 @@ go.opentelemetry.io/otel/trace v1.11.2/go.mod h1:4N+yC7QEz7TTsG9BSRLNAa63eg5E06O
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw=
go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
Expand Down
Loading

0 comments on commit 203e78b

Please sign in to comment.