Skip to content

Commit

Permalink
use wake-up channel instead of waiting for ack
Browse files Browse the repository at this point in the history
  • Loading branch information
tgross committed Nov 16, 2022
1 parent 5b7c8f2 commit 63e7389
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 44 deletions.
32 changes: 0 additions & 32 deletions nomad/eval_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,35 +770,3 @@ func (e *Eval) Allocations(args *structs.EvalSpecificRequest,
}}
return e.srv.blockingRPC(&opts)
}

// cancelCancelableEvals asks the eval broker for one batch of cancelable
// evaluations, marks a copy of each one as canceled, and writes the batch back
// through a single raft apply. It returns nil when the broker has nothing to
// cancel, and the raft error (after logging a warning) when the apply fails.
func cancelCancelableEvals(srv *Server) error {
	const cancelDesc = "canceled after more recent eval was processed"

	// We *can* send larger raft logs but rough benchmarks show that a smaller
	// page size strikes a balance between throughput and time we block the FSM
	// apply for other operations
	batch := srv.evalBroker.Cancelable(structs.MaxUUIDsPerWriteRequest / 10)
	if len(batch) == 0 {
		return nil
	}

	// Replace each entry with a modified copy so the objects handed out by
	// the broker are never mutated in place.
	for idx := range batch {
		canceled := batch[idx].Copy()
		canceled.Status = structs.EvalStatusCancelled
		canceled.StatusDescription = cancelDesc
		canceled.UpdateModifyTime()
		batch[idx] = canceled
	}

	req := &structs.EvalUpdateRequest{
		Evals:        batch,
		WriteRequest: structs.WriteRequest{Region: srv.Region()},
	}
	if _, _, err := srv.raftApply(structs.EvalUpdateRequestType, req); err != nil {
		srv.logger.Warn("eval cancel failed", "error", err, "method", "ack")
		return err
	}
	return nil
}
66 changes: 54 additions & 12 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
go s.reapDupBlockedEvaluations(stopCh)

// Reap any cancelable evaluations
go s.reapCancelableEvaluations(stopCh)
s.reapCancelableEvalsCh = s.reapCancelableEvaluations(stopCh)

// Periodically unblock failed allocations
go s.periodicUnblockFailedEvals(stopCh)
Expand Down Expand Up @@ -998,19 +998,61 @@ func (s *Server) reapDupBlockedEvaluations(stopCh chan struct{}) {
// reapCancelableEvaluations is used to reap evaluations that were marked
// cancelable by the eval broker and should be canceled. These get swept up
// whenever an eval Acks, but this ensures that we don't have a straggling batch
// when the cluster doesn't have any more work to do
func (s *Server) reapCancelableEvaluations(stopCh chan struct{}) {
timer, cancel := helper.NewSafeTimer(s.config.EvalReapCancelableInterval)
defer cancel()
for {
select {
case <-stopCh:
return
case <-timer.C:
cancelCancelableEvals(s)
timer.Reset(s.config.EvalReapCancelableInterval)
// when the cluster doesn't have any more work to do. Returns a wake-up channel
// that can be used to trigger a new reap without waiting for the timer
func (s *Server) reapCancelableEvaluations(stopCh chan struct{}) chan struct{} {
	// Capacity of one: a single wake-up can be queued while the reaper is
	// busy without the sender having to wait for it to be received.
	wake := make(chan struct{}, 1)

	// reapLoop runs until stopCh fires, reaping one batch per wake-up or
	// timer tick.
	reapLoop := func() {
		timer, cancel := helper.NewSafeTimer(s.config.EvalReapCancelableInterval)
		defer cancel()

		for {
			select {
			case <-stopCh:
				return

			case <-wake:
				// The returned error is dropped on purpose:
				// cancelCancelableEvals logs a warning itself when the
				// raft apply fails.
				_ = cancelCancelableEvals(s)

			case <-timer.C:
				_ = cancelCancelableEvals(s)
				// Only the timer path re-arms the timer; an explicit
				// wake-up leaves the current countdown untouched.
				timer.Reset(s.config.EvalReapCancelableInterval)
			}
		}
	}
	go reapLoop()

	return wake
}

// cancelCancelableEvals fetches one page of cancelable evaluations from the
// eval broker and persists them with a canceled status. Each evaluation is
// copied before it is modified, and the whole page is submitted in a single
// raft apply. Returns nil if there was nothing to cancel; otherwise returns
// the raft apply error, which is also logged as a warning.
func cancelCancelableEvals(srv *Server) error {
	const cancelDesc = "canceled after more recent eval was processed"

	// We *can* send larger raft logs but rough benchmarks show that a smaller
	// page size strikes a balance between throughput and time we block the FSM
	// apply for other operations
	pageSize := structs.MaxUUIDsPerWriteRequest / 10

	evals := srv.evalBroker.Cancelable(pageSize)
	if len(evals) == 0 {
		return nil
	}

	for pos := range evals {
		updated := evals[pos].Copy()
		updated.Status = structs.EvalStatusCancelled
		updated.StatusDescription = cancelDesc
		updated.UpdateModifyTime()
		evals[pos] = updated
	}

	request := &structs.EvalUpdateRequest{
		Evals:        evals,
		WriteRequest: structs.WriteRequest{Region: srv.Region()},
	}

	_, _, applyErr := srv.raftApply(structs.EvalUpdateRequestType, request)
	if applyErr == nil {
		return nil
	}
	srv.logger.Warn("eval cancel failed", "error", applyErr, "method", "ack")
	return applyErr
}

// periodicUnblockFailedEvals periodically unblocks failed, blocked evaluations.
Expand Down
4 changes: 4 additions & 0 deletions nomad/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ type Server struct {
// transitions to collide and create inconsistent state.
brokerLock sync.Mutex

// reapCancelableEvalsCh is used to signal the cancelable evals reaper to wake up
reapCancelableEvalsCh chan struct{}

// deploymentWatcher is used to watch deployments and their allocations and
// make the required calls to continue to transition the deployment.
deploymentWatcher *deploymentwatcher.Watcher
Expand Down Expand Up @@ -362,6 +365,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntr
readyForConsistentReads: &atomic.Bool{},
eventCh: make(chan serf.Event, 256),
evalBroker: evalBroker,
reapCancelableEvalsCh: make(chan struct{}),
blockedEvals: NewBlockedEvals(evalBroker, logger),
rpcTLS: incomingTLS,
aclCache: aclCache,
Expand Down

0 comments on commit 63e7389

Please sign in to comment.