diff --git a/snow/engine/snowman/event/queue.go b/snow/engine/snowman/event/queue.go index 34818c317f4..86850d350a2 100644 --- a/snow/engine/snowman/event/queue.go +++ b/snow/engine/snowman/event/queue.go @@ -5,7 +5,6 @@ package event import ( "context" - "math" "github.com/ava-labs/avalanchego/utils/set" ) @@ -18,6 +17,8 @@ type Job interface { type job[T comparable] struct { // If empty, the job is ready to be executed. dependencies set.Set[T] + // If true, the job should be cancelled. + shouldCancel bool // If nil, the job has already been executed or cancelled. job Job } @@ -66,28 +67,22 @@ func (q *Queue[_]) NumDependencies() int { // // It is safe to call the queue during the execution of a job. func (q *Queue[T]) Fulfill(ctx context.Context, dependency T) error { - return q.resolveDependency(ctx, dependency, 0, Job.Execute) + return q.resolveDependency(ctx, dependency, false) } // Abandon a dependency. If any dependencies for a job are abandoned, the job -// will be cancelled. +// will be cancelled. The job will only be cancelled once all dependencies are +// resolved. // // It is safe to call the queue during the cancelling of a job. func (q *Queue[T]) Abandon(ctx context.Context, dependency T) error { - return q.resolveDependency(ctx, dependency, math.MaxInt, Job.Cancel) + return q.resolveDependency(ctx, dependency, true) } -// resolveDependency the provided dependency and execute the operation on all of -// the unexecuted jobs that have no more than [minDependencies]. -// -// For example, if [minDependencies] is 0, only jobs that have no more -// outstanding dependencies will be executed. If [minDependencies] is MaxInt, -// all jobs will be executed. func (q *Queue[T]) resolveDependency( ctx context.Context, dependency T, - minDependencies int, - operation func(Job, context.Context) error, + shouldCancel bool, ) error { jobs := q.jobs[dependency] delete(q.jobs, dependency) @@ -96,9 +91,10 @@ func (q *Queue[T]) resolveDependency( // Removing the dependency keeps the queue in a consistent state. // However, it isn't strictly needed. job.dependencies.Remove(dependency) + job.shouldCancel = shouldCancel || job.shouldCancel userJob := job.job - if userJob == nil || job.dependencies.Len() > minDependencies { + if userJob == nil || job.dependencies.Len() != 0 { continue } @@ -106,7 +102,13 @@ func (q *Queue[T]) resolveDependency( // with this job again. job.job = nil - if err := operation(userJob, ctx); err != nil { + var err error + if job.shouldCancel { + err = userJob.Cancel(ctx) + } else { + err = userJob.Execute(ctx) + } + if err != nil { return err } } diff --git a/snow/engine/snowman/event/queue_test.go b/snow/engine/snowman/event/queue_test.go index a5fbd451503..9d8d09fc345 100644 --- a/snow/engine/snowman/event/queue_test.go +++ b/snow/engine/snowman/event/queue_test.go @@ -31,9 +31,16 @@ func (j *testJob) Cancel(ctx context.Context) error { return j.cancel(ctx) } -func newQueueWithJob[T comparable](t *testing.T, job Job, dependencies ...T) *Queue[T] { +func newQueueWithJob[T comparable](t *testing.T, job Job, shouldCancel bool, dependencies ...T) *Queue[T] { q := NewQueue[T]() require.NoError(t, q.Register(context.Background(), job, dependencies...)) + if shouldCancel { + for _, dependency := range dependencies { + for _, j := range q.jobs[dependency] { + j.shouldCancel = true + } + } + } return q } @@ -183,25 +190,25 @@ func TestQueue_Fulfill(t *testing.T) { }, { name: "single dependency", - queue: newQueueWithJob(t, userJob, 1), + queue: newQueueWithJob(t, userJob, false, 1), shouldExecute: true, expectedQueue: NewQueue[int](), }, { name: "non-existent dependency", - queue: newQueueWithJob(t, userJob, 2), + queue: newQueueWithJob(t, userJob, false, 2), shouldExecute: false, - expectedQueue: newQueueWithJob(t, userJob, 2), + expectedQueue: newQueueWithJob(t, userJob, false, 2), }, { name: "incomplete dependencies", - queue: newQueueWithJob(t, userJob, 1, 2), + queue: newQueueWithJob(t, userJob, false, 1, 2), shouldExecute: false, - expectedQueue: newQueueWithJob(t, userJob, 2), + expectedQueue: newQueueWithJob(t, userJob, false, 2), }, { name: "duplicate dependency", - queue: newQueueWithJob(t, userJob, 1, 1), + queue: newQueueWithJob(t, userJob, false, 1, 1), shouldExecute: true, expectedQueue: NewQueue[int](), }, @@ -221,16 +228,16 @@ func TestQueue_Fulfill(t *testing.T) { } func TestQueue_Abandon(t *testing.T) { - var calledAbandon bool + var calledCancel bool userJob := &testJob{ execute: func(context.Context) error { return errUnexpectedInvocation }, cancel: func(context.Context) error { - if calledAbandon { + if calledCancel { return errDuplicateInvocation } - calledAbandon = true + calledCancel = true return nil }, } @@ -249,25 +256,25 @@ func TestQueue_Abandon(t *testing.T) { }, { name: "single dependency", - queue: newQueueWithJob(t, userJob, 1), + queue: newQueueWithJob(t, userJob, false, 1), shouldCancel: true, expectedQueue: NewQueue[int](), }, { name: "non-existent dependency", - queue: newQueueWithJob(t, userJob, 2), + queue: newQueueWithJob(t, userJob, false, 2), shouldCancel: false, - expectedQueue: newQueueWithJob(t, userJob, 2), + expectedQueue: newQueueWithJob(t, userJob, false, 2), }, { name: "incomplete dependencies", - queue: newQueueWithJob(t, userJob, 1, 2), - shouldCancel: true, - expectedQueue: newQueueWithJob(t, nil, 2), + queue: newQueueWithJob(t, userJob, false, 1, 2), + shouldCancel: false, + expectedQueue: newQueueWithJob(t, userJob, true, 2), }, { name: "duplicate dependency", - queue: newQueueWithJob(t, userJob, 1, 1), + queue: newQueueWithJob(t, userJob, false, 1, 1), shouldCancel: true, expectedQueue: NewQueue[int](), }, @@ -277,10 +284,10 @@ func TestQueue_Abandon(t *testing.T) { require := require.New(t) // Reset the variable between tests - calledAbandon = false + calledCancel = false require.NoError(test.queue.Abandon(context.Background(), 1)) - require.Equal(test.shouldCancel, calledAbandon) + require.Equal(test.shouldCancel, calledCancel) require.Equal(test.expectedQueue, test.queue) }) }