Skip to content

Commit

Permalink
change cancelling symantics
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph committed Jun 18, 2024
1 parent 2d4472f commit e99eee0
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 33 deletions.
30 changes: 16 additions & 14 deletions snow/engine/snowman/event/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package event

import (
"context"
"math"

"github.com/ava-labs/avalanchego/utils/set"
)
Expand All @@ -18,6 +17,8 @@ type Job interface {
type job[T comparable] struct {
// If empty, the job is ready to be executed.
dependencies set.Set[T]
// If true, the job should be cancelled.
shouldCancel bool
// If nil, the job has already been executed or cancelled.
job Job
}
Expand Down Expand Up @@ -66,28 +67,22 @@ func (q *Queue[_]) NumDependencies() int {
//
// It is safe to call the queue during the execution of a job.
func (q *Queue[T]) Fulfill(ctx context.Context, dependency T) error {
return q.resolveDependency(ctx, dependency, 0, Job.Execute)
return q.resolveDependency(ctx, dependency, false)
}

// Abandon a dependency. If any dependencies for a job are abandoned, the job
// will be cancelled.
// will be cancelled. The job will only be cancelled once all dependencies are
// resolved.
//
// It is safe to call the queue during the cancelling of a job.
func (q *Queue[T]) Abandon(ctx context.Context, dependency T) error {
return q.resolveDependency(ctx, dependency, math.MaxInt, Job.Cancel)
return q.resolveDependency(ctx, dependency, true)
}

// resolveDependency the provided dependency and execute the operation on all of
// the unexecuted jobs that have no more than [minDependencies].
//
// For example, if [minDependencies] is 0, only jobs that have no more
// outstanding dependencies will be executed. If [minDependencies] is MaxInt,
// all jobs will be executed.
func (q *Queue[T]) resolveDependency(
ctx context.Context,
dependency T,
minDependencies int,
operation func(Job, context.Context) error,
shouldCancel bool,
) error {
jobs := q.jobs[dependency]
delete(q.jobs, dependency)
Expand All @@ -96,17 +91,24 @@ func (q *Queue[T]) resolveDependency(
// Removing the dependency keeps the queue in a consistent state.
// However, it isn't strictly needed.
job.dependencies.Remove(dependency)
job.shouldCancel = shouldCancel || job.shouldCancel

userJob := job.job
if userJob == nil || job.dependencies.Len() > minDependencies {
if userJob == nil || job.dependencies.Len() != 0 {
continue
}

// Mark the job as cancelled so that any reentrant calls do not interact
// with this job again.
job.job = nil

if err := operation(userJob, ctx); err != nil {
var err error
if job.shouldCancel {
err = userJob.Cancel(ctx)
} else {
err = userJob.Execute(ctx)
}
if err != nil {
return err
}
}
Expand Down
45 changes: 26 additions & 19 deletions snow/engine/snowman/event/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,16 @@ func (j *testJob) Cancel(ctx context.Context) error {
return j.cancel(ctx)
}

func newQueueWithJob[T comparable](t *testing.T, job Job, dependencies ...T) *Queue[T] {
func newQueueWithJob[T comparable](t *testing.T, job Job, shouldCancel bool, dependencies ...T) *Queue[T] {
q := NewQueue[T]()
require.NoError(t, q.Register(context.Background(), job, dependencies...))
if shouldCancel {
for _, dependency := range dependencies {
for _, j := range q.jobs[dependency] {
j.shouldCancel = true
}
}
}
return q
}

Expand Down Expand Up @@ -183,25 +190,25 @@ func TestQueue_Fulfill(t *testing.T) {
},
{
name: "single dependency",
queue: newQueueWithJob(t, userJob, 1),
queue: newQueueWithJob(t, userJob, false, 1),
shouldExecute: true,
expectedQueue: NewQueue[int](),
},
{
name: "non-existent dependency",
queue: newQueueWithJob(t, userJob, 2),
queue: newQueueWithJob(t, userJob, false, 2),
shouldExecute: false,
expectedQueue: newQueueWithJob(t, userJob, 2),
expectedQueue: newQueueWithJob(t, userJob, false, 2),
},
{
name: "incomplete dependencies",
queue: newQueueWithJob(t, userJob, 1, 2),
queue: newQueueWithJob(t, userJob, false, 1, 2),
shouldExecute: false,
expectedQueue: newQueueWithJob(t, userJob, 2),
expectedQueue: newQueueWithJob(t, userJob, false, 2),
},
{
name: "duplicate dependency",
queue: newQueueWithJob(t, userJob, 1, 1),
queue: newQueueWithJob(t, userJob, false, 1, 1),
shouldExecute: true,
expectedQueue: NewQueue[int](),
},
Expand All @@ -221,16 +228,16 @@ func TestQueue_Fulfill(t *testing.T) {
}

func TestQueue_Abandon(t *testing.T) {
var calledAbandon bool
var calledCancel bool
userJob := &testJob{
execute: func(context.Context) error {
return errUnexpectedInvocation
},
cancel: func(context.Context) error {
if calledAbandon {
if calledCancel {
return errDuplicateInvocation
}
calledAbandon = true
calledCancel = true
return nil
},
}
Expand All @@ -249,25 +256,25 @@ func TestQueue_Abandon(t *testing.T) {
},
{
name: "single dependency",
queue: newQueueWithJob(t, userJob, 1),
queue: newQueueWithJob(t, userJob, false, 1),
shouldCancel: true,
expectedQueue: NewQueue[int](),
},
{
name: "non-existent dependency",
queue: newQueueWithJob(t, userJob, 2),
queue: newQueueWithJob(t, userJob, false, 2),
shouldCancel: false,
expectedQueue: newQueueWithJob(t, userJob, 2),
expectedQueue: newQueueWithJob(t, userJob, false, 2),
},
{
name: "incomplete dependencies",
queue: newQueueWithJob(t, userJob, 1, 2),
shouldCancel: true,
expectedQueue: newQueueWithJob(t, nil, 2),
queue: newQueueWithJob(t, userJob, false, 1, 2),
shouldCancel: false,
expectedQueue: newQueueWithJob(t, userJob, true, 2),
},
{
name: "duplicate dependency",
queue: newQueueWithJob(t, userJob, 1, 1),
queue: newQueueWithJob(t, userJob, false, 1, 1),
shouldCancel: true,
expectedQueue: NewQueue[int](),
},
Expand All @@ -277,10 +284,10 @@ func TestQueue_Abandon(t *testing.T) {
require := require.New(t)

// Reset the variable between tests
calledAbandon = false
calledCancel = false

require.NoError(test.queue.Abandon(context.Background(), 1))
require.Equal(test.shouldCancel, calledAbandon)
require.Equal(test.shouldCancel, calledCancel)
require.Equal(test.expectedQueue, test.queue)
})
}
Expand Down

0 comments on commit e99eee0

Please sign in to comment.