Skip to content

Commit

Permalink
self review
Browse files Browse the repository at this point in the history
  • Loading branch information
wlawt committed Apr 4, 2024
1 parent 437a371 commit a6d133a
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 9 deletions.
9 changes: 6 additions & 3 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ import (
)

// Executor sequences the concurrent execution of
// tasks with arbitrary keys on-the-fly.
// tasks with arbitrary conflicts on-the-fly.
//
// Executor ensures that conflicting tasks
// are executed in the order they were queued.
// Tasks with no keys are executed immediately.
// Tasks with no conflicts are executed immediately.
type Executor struct {
metrics Metrics

Expand Down Expand Up @@ -93,9 +93,11 @@ func (e *Executor) runTask(t *task) {

t.l.Lock()
for _, bt := range t.blocking {
// Reads would have zero dependencies
if bt.dependencies.Load() == 0 || bt.dependencies.Add(-1) > 0 {
continue
}
// This might happen in Read-after-Read, so don't enqueue again
if !bt.executed {
e.executable <- bt
}
Expand All @@ -112,7 +114,7 @@ func (e *Executor) runTask(t *task) {
func (e *Executor) Run(keys state.Keys, f func() error) {
e.outstanding.Add(1)

// Generate task
// Add task to map
id := len(e.tasks)
t := &task{
id: id,
Expand Down Expand Up @@ -163,6 +165,7 @@ func (e *Executor) Run(keys state.Keys, f func() error) {
e.update(id, k, v)
}

// Start execution if there are no blocking dependencies
if t.dependencies.Load() > 0 {
if e.metrics != nil {
e.metrics.RecordBlocked()
Expand Down
7 changes: 1 addition & 6 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package executor

import (
"errors"
"fmt"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -169,7 +168,7 @@ func TestEarlyExit(t *testing.T) {
})
}
require.ErrorIs(e.Wait(), terr) // no task running
require.True(len(completed) < 500)
require.Less(len(completed), 500)
}

func TestStop(t *testing.T) {
Expand Down Expand Up @@ -295,7 +294,6 @@ func TestWriteThenRead(t *testing.T) {
})
}
require.NoError(e.Wait())
fmt.Printf("completed %v\n", completed)
require.Equal(0, completed[0]) // Write first to execute
// 1..99 are ran in parallel, so non-deterministic
require.Len(completed, 100)
Expand Down Expand Up @@ -330,7 +328,6 @@ func TestReadThenWrite(t *testing.T) {
})
}
require.NoError(e.Wait())
fmt.Printf("completed %v\n", completed)
// 0..9 are ran in parallel, so non-deterministic
require.Equal(10, completed[10]) // First write to execute
// 11..99 are ran in parallel, so non-deterministic
Expand Down Expand Up @@ -369,7 +366,6 @@ func TestWriteThenReadRepeated(t *testing.T) {
})
}
require.NoError(e.Wait())
fmt.Printf("completed %v\n", completed)
require.Equal(0, completed[0]) // First write to execute
// 1..48 are ran in parallel, so non-deterministic
require.Equal(49, completed[49]) // Second write to execute
Expand Down Expand Up @@ -409,7 +405,6 @@ func TestReadThenWriteRepeated(t *testing.T) {
})
}
require.NoError(e.Wait())
fmt.Printf("completed %v\n", completed)
// 0..9 are ran in parallel, so non-deterministic
require.Equal(10, completed[10])
require.Equal(11, completed[11])
Expand Down

0 comments on commit a6d133a

Please sign in to comment.