Skip to content

Commit

Permalink
Refactor event.Blocker into job.Scheduler (#3125)
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph committed Jun 19, 2024
1 parent 2e72c7c commit e740b44
Show file tree
Hide file tree
Showing 9 changed files with 675 additions and 358 deletions.
43 changes: 14 additions & 29 deletions snow/engine/snowman/issuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,45 +10,30 @@ import (

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/consensus/snowman"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/snow/engine/snowman/job"
)

var _ job.Job[ids.ID] = (*issuer)(nil)

// issuer issues [blk] into to consensus after its dependencies are met.
type issuer struct {
t *Transitive
nodeID ids.NodeID // nodeID of the peer that provided this block
blk snowman.Block
issuedMetric prometheus.Counter
abandoned bool
deps set.Set[ids.ID]
push bool
issuedMetric prometheus.Counter
}

func (i *issuer) Dependencies() set.Set[ids.ID] {
return i.deps
}

// Mark that a dependency has been met
func (i *issuer) Fulfill(ctx context.Context, id ids.ID) {
i.deps.Remove(id)
i.Update(ctx)
}

// Abandon the attempt to issue [i.block]
func (i *issuer) Abandon(ctx context.Context, _ ids.ID) {
if !i.abandoned {
blkID := i.blk.ID()
i.t.removeFromPending(i.blk)
i.t.addToNonVerifieds(i.blk)
i.t.blocked.Abandon(ctx, blkID)
func (i *issuer) Execute(ctx context.Context, _ []ids.ID, abandoned []ids.ID) error {
if len(abandoned) == 0 {
// If the parent block wasn't abandoned, this block can be issued.
return i.t.deliver(ctx, i.nodeID, i.blk, i.push, i.issuedMetric)
}
i.abandoned = true
}

func (i *issuer) Update(ctx context.Context) {
if i.abandoned || i.deps.Len() != 0 || i.t.errs.Errored() {
return
}
// Issue the block into consensus
i.t.errs.Add(i.t.deliver(ctx, i.nodeID, i.blk, i.push, i.issuedMetric))
// If the parent block was abandoned, this block should be abandoned as
// well.
blkID := i.blk.ID()
i.t.removeFromPending(i.blk)
i.t.addToNonVerifieds(i.blk)
return i.t.blocked.Abandon(ctx, blkID)
}
109 changes: 109 additions & 0 deletions snow/engine/snowman/job/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

// Package job provides a Scheduler to manage and execute Jobs with
// dependencies.
package job

import "context"

// Job is a unit of work that can be executed based on the result of resolving
// requested dependencies.
type Job[T any] interface {
Execute(ctx context.Context, fulfilled []T, abandoned []T) error
}

type job[T comparable] struct {
// Once all dependencies are resolved, the job will be executed.
numUnresolved int
fulfilled []T
abandoned []T
job Job[T]
}

// Scheduler implements a dependency graph for jobs. Jobs can be registered with
// dependencies, and once all dependencies are resolved, the job will be
// executed.
type Scheduler[T comparable] struct {
// dependents maps a dependency to the jobs that depend on it.
dependents map[T][]*job[T]
}

func NewScheduler[T comparable]() *Scheduler[T] {
return &Scheduler[T]{
dependents: make(map[T][]*job[T]),
}
}

// Schedule a job to be executed once all of its dependencies are resolved. If a
// job is scheduled with no dependencies, it's executed immediately.
//
// In order to prevent a memory leak, all dependencies must eventually either be
// fulfilled or abandoned.
//
// While registering a job with duplicate dependencies is discouraged, it is
// allowed.
func (s *Scheduler[T]) Schedule(ctx context.Context, userJob Job[T], dependencies ...T) error {
numUnresolved := len(dependencies)
if numUnresolved == 0 {
return userJob.Execute(ctx, nil, nil)
}

j := &job[T]{
numUnresolved: numUnresolved,
job: userJob,
}
for _, d := range dependencies {
s.dependents[d] = append(s.dependents[d], j)
}
return nil
}

// NumDependencies returns the number of dependencies that jobs are currently
// blocking on.
func (s *Scheduler[_]) NumDependencies() int {
return len(s.dependents)
}

// Fulfill a dependency. If all dependencies for a job are resolved, the job
// will be executed.
//
// It is safe to call the scheduler during the execution of a job.
func (s *Scheduler[T]) Fulfill(ctx context.Context, dependency T) error {
return s.resolveDependency(ctx, dependency, true)
}

// Abandon a dependency. If all dependencies for a job are resolved, the job
// will be executed.
//
// It is safe to call the scheduler during the execution of a job.
func (s *Scheduler[T]) Abandon(ctx context.Context, dependency T) error {
return s.resolveDependency(ctx, dependency, false)
}

func (s *Scheduler[T]) resolveDependency(
ctx context.Context,
dependency T,
fulfilled bool,
) error {
jobs := s.dependents[dependency]
delete(s.dependents, dependency)

for _, job := range jobs {
job.numUnresolved--
if fulfilled {
job.fulfilled = append(job.fulfilled, dependency)
} else {
job.abandoned = append(job.abandoned, dependency)
}

if job.numUnresolved > 0 {
continue
}

if err := job.job.Execute(ctx, job.fulfilled, job.abandoned); err != nil {
return err
}
}
return nil
}
Loading

0 comments on commit e740b44

Please sign in to comment.