Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Cancel from the Job interface #3127

Merged
merged 5 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions snow/engine/snowman/issuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/ava-labs/avalanchego/snow/engine/snowman/job"
)

var _ job.Job = (*issuer)(nil)
var _ job.Job[ids.ID] = (*issuer)(nil)

// issuer issues [blk] into consensus after its dependencies are met.
type issuer struct {
Expand All @@ -24,11 +24,14 @@ type issuer struct {
issuedMetric prometheus.Counter
}

func (i *issuer) Execute(ctx context.Context) error {
return i.t.deliver(ctx, i.nodeID, i.blk, i.push, i.issuedMetric)
}
func (i *issuer) Execute(ctx context.Context, _ []ids.ID, abandoned []ids.ID) error {
if len(abandoned) == 0 {
// If the parent block wasn't abandoned, this block can be issued.
return i.t.deliver(ctx, i.nodeID, i.blk, i.push, i.issuedMetric)
}

func (i *issuer) Cancel(ctx context.Context) error {
// If the parent block was abandoned, this block should be abandoned as
// well.
blkID := i.blk.ID()
i.t.removeFromPending(i.blk)
i.t.addToNonVerifieds(i.blk)
Expand Down
84 changes: 35 additions & 49 deletions snow/engine/snowman/job/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,25 @@
// dependencies.
package job

import (
"context"
import "context"

"github.com/ava-labs/avalanchego/utils/set"
)

// Job is a unit of work that can be executed or cancelled.
type Job interface {
Execute(context.Context) error
Cancel(context.Context) error
// Job is a unit of work that can be executed based on the result of resolving
// requested dependencies.
type Job[T any] interface {
marun marked this conversation as resolved.
Show resolved Hide resolved
Execute(ctx context.Context, fulfilled []T, abandoned []T) error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this was the price for getting rid of Cancel - needing to provide more context to Execute in the form of the fulfilled/abandoned slices?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. I think this makes it implicitly clear that every dependency is going to be resolved before execution, whereas before, the reliance on no short-circuiting for Cancel wasn't clear.

}

type job[T comparable] struct {
// If empty, the job is ready to be executed.
dependencies set.Set[T]
// If true, the job should be cancelled.
shouldCancel bool
// If nil, the job has already been executed or cancelled.
job Job
// Once all dependencies are resolved, the job will be executed.
numUnresolved int
fulfilled []T
abandoned []T
job Job[T]
}

// Scheduler implements a dependency graph for jobs. Jobs can be registered with
// dependencies, and once all dependencies are fulfilled, the job will be
// executed. If any of the dependencies are abandoned, the job will be
// cancelled.
// dependencies, and once all dependencies are resolved, the job will be
// executed.
type Scheduler[T comparable] struct {
// dependents maps a dependency to the jobs that depend on it.
dependents map[T][]*job[T]
Expand All @@ -41,22 +35,22 @@ func NewScheduler[T comparable]() *Scheduler[T] {
}
}

// Schedule a job to be executed once all of its dependencies are fulfilled.
// Schedule a job to be executed once all of its dependencies are resolved.
//
// In order to prevent a memory leak, all dependencies must eventually either be
// fulfilled or abandoned.
//
// While registering a job with duplicate dependencies is discouraged, it is
// allowed and treated similarly to registering the job with the dependencies
// de-duplicated.
func (s *Scheduler[T]) Schedule(ctx context.Context, userJob Job, dependencies ...T) error {
if len(dependencies) == 0 {
return userJob.Execute(ctx)
// allowed.
func (s *Scheduler[T]) Schedule(ctx context.Context, userJob Job[T], dependencies ...T) error {
numUnresolved := len(dependencies)
if numUnresolved == 0 {
return userJob.Execute(ctx, nil, nil)
}

j := &job[T]{
dependencies: set.Of(dependencies...),
job: userJob,
numUnresolved: numUnresolved,
job: userJob,
}
for _, d := range dependencies {
s.dependents[d] = append(s.dependents[d], j)
Expand All @@ -70,51 +64,43 @@ func (s *Scheduler[_]) NumDependencies() int {
return len(s.dependents)
}

// Fulfill a dependency. If all dependencies for a job are fulfilled, the job
// Fulfill a dependency. If all dependencies for a job are resolved, the job
// will be executed.
//
// It is safe to call the scheduler during the execution of a job.
func (s *Scheduler[T]) Fulfill(ctx context.Context, dependency T) error {
return s.resolveDependency(ctx, dependency, false)
return s.resolveDependency(ctx, dependency, true)
}

// Abandon a dependency. If any dependencies for a job are abandoned, the job
// will be cancelled. The job will only be cancelled once all dependencies are
// resolved.
// Abandon a dependency. If all dependencies for a job are resolved, the job
// will be executed.
//
// It is safe to call the scheduler during the cancelling of a job.
// It is safe to call the scheduler during the execution of a job.
func (s *Scheduler[T]) Abandon(ctx context.Context, dependency T) error {
return s.resolveDependency(ctx, dependency, true)
return s.resolveDependency(ctx, dependency, false)
}

func (s *Scheduler[T]) resolveDependency(
ctx context.Context,
dependency T,
shouldCancel bool,
fulfilled bool,
) error {
jobs := s.dependents[dependency]
delete(s.dependents, dependency)

for _, job := range jobs {
job.dependencies.Remove(dependency)
job.shouldCancel = shouldCancel || job.shouldCancel
job.numUnresolved--
if fulfilled {
job.fulfilled = append(job.fulfilled, dependency)
} else {
job.abandoned = append(job.abandoned, dependency)
}

userJob := job.job
if userJob == nil || job.dependencies.Len() != 0 {
if job.numUnresolved > 0 {
continue
}

// Mark the job as handled so that any reentrant calls do not interact
// with this job again.
job.job = nil

var err error
if job.shouldCancel {
err = userJob.Cancel(ctx)
} else {
err = userJob.Execute(ctx)
}
if err != nil {
if err := job.job.Execute(ctx, job.fulfilled, job.abandoned); err != nil {
return err
}
}
Expand Down
Loading
Loading