Skip to content

Commit

Permalink
eval: add notification method when set enabled called.
Browse files Browse the repository at this point in the history
  • Loading branch information
jrasell committed May 30, 2022
1 parent cf3f426 commit 8a3df7f
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 9 deletions.
108 changes: 108 additions & 0 deletions helper/broker/notify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package broker

import (
"time"

"github.com/hashicorp/nomad/helper"
)

// GenericNotifier allows a process to send updates to many subscribers in an
// easy manner.
type GenericNotifier struct {

// publishCh is the channel used to receive the update which will be sent
// to all subscribers.
publishCh chan interface{}

// subscribeCh and unsubscribeCh are the channels used to modify the
// subscription membership mapping.
subscribeCh chan chan interface{}
unsubscribeCh chan chan interface{}
}

// NewGenericNotifier returns a generic notifier which can be used by a process
// to notify many subscribers when a specific update is triggered.
func NewGenericNotifier() *GenericNotifier {
return &GenericNotifier{
publishCh: make(chan interface{}, 1),
subscribeCh: make(chan chan interface{}, 1),
unsubscribeCh: make(chan chan interface{}, 1),
}
}

// Notify allows the implementer to notify all subscribers will a specific
// update. There is no guarantee the order in which subscribers receive the
// message which is sent linearly.
func (g *GenericNotifier) Notify(msg interface{}) {
select {
case g.publishCh <- msg:
default:
}
}

// Run is a long-lived process which handles updating subscribers as well as
// ensuring any update is sent to them. The passed stopCh is used to coordinate
// shutdown.
func (g *GenericNotifier) Run(stopCh <-chan struct{}) {

// Store our subscribers inline with a map. This map can only be accessed
// via a single channel update at a time, meaning we can manage with
// without using a lock.
subscribers := map[chan interface{}]struct{}{}

for {
select {
case <-stopCh:
return
case msgCh := <-g.subscribeCh:
subscribers[msgCh] = struct{}{}
case msgCh := <-g.unsubscribeCh:
delete(subscribers, msgCh)
case update := <-g.publishCh:
for subscriberCh := range subscribers {

// The subscribers channels are buffered, but ensure we don't
// block the whole process on this.
select {
case subscriberCh <- update:
default:
}
}
}
}
}

// WaitForChange allows a subscriber to wait until there is a notification
// change, or the timeout is reached. The function will block until one
// condition ie met.
func (g *GenericNotifier) WaitForChange(timeout time.Duration) interface{} {

// Create a channel and subscribe to any update. This channel is buffered
// to ensure we do not block the main broker process.
updateCh := make(chan interface{}, 1)
g.subscribeCh <- updateCh

// Create a timeout timer and use the helper to ensure this routine doesn't
// panic and making the stop call clear.
timeoutTimer, timeoutStop := helper.NewSafeTimer(timeout)

// Defer a function which performs all the required cleanup of the
// subscriber once it has been notified of a change, or reached its wait
// timeout.
defer func() {
g.unsubscribeCh <- updateCh
close(updateCh)
timeoutStop()
}()

// Enter the main loop which listens for an update or timeout and returns
// this information to the subscriber.
for {
select {
case <-timeoutTimer.C:
return "wait timed out after " + timeout.String()
case update := <-updateCh:
return update
}
}
}
55 changes: 55 additions & 0 deletions helper/broker/notify_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package broker

import (
"sync"
"testing"
"time"

"github.com/hashicorp/nomad/ci"
"github.com/stretchr/testify/require"
)

func TestGenericNotifier(t *testing.T) {
ci.Parallel(t)

// Create the new notifier.
stopChan := make(chan struct{})
defer close(stopChan)

notifier := NewGenericNotifier()
go notifier.Run(stopChan)

// Ensure we have buffered channels.
require.Equal(t, 1, cap(notifier.publishCh))
require.Equal(t, 1, cap(notifier.subscribeCh))
require.Equal(t, 1, cap(notifier.unsubscribeCh))

// Test that the timeout works.
var timeoutWG sync.WaitGroup

for i := 0; i < 6; i++ {
go func(wg *sync.WaitGroup) {
wg.Add(1)
msg := notifier.WaitForChange(100 * time.Millisecond)
require.Equal(t, "wait timed out after 100ms", msg)
wg.Done()
}(&timeoutWG)
}
timeoutWG.Wait()

// Test that all subscribers recieve an update when a single notification
// is sent.
var notifiedWG sync.WaitGroup

for i := 0; i < 6; i++ {
go func(wg *sync.WaitGroup) {
wg.Add(1)
msg := notifier.WaitForChange(3 * time.Second)
require.Equal(t, "we got an update and not a timeout", msg)
wg.Done()
}(&notifiedWG)
}

notifier.Notify("we got an update and not a timeout")
notifiedWG.Wait()
}
12 changes: 10 additions & 2 deletions nomad/eval_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"errors"
"fmt"
"math/rand"
"strconv"
"sync"
"time"

metrics "github.com/armon/go-metrics"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/broker"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/lib/delayheap"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -48,8 +50,10 @@ type EvalBroker struct {
nackTimeout time.Duration
deliveryLimit int

enabled bool
stats *BrokerStats
enabled bool
enabledNotifier *broker.GenericNotifier

stats *BrokerStats

// evals tracks queued evaluations by ID to de-duplicate enqueue.
// The counter is the number of times we've attempted delivery,
Expand Down Expand Up @@ -131,6 +135,7 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration,
nackTimeout: timeout,
deliveryLimit: deliveryLimit,
enabled: false,
enabledNotifier: broker.NewGenericNotifier(),
stats: new(BrokerStats),
evals: make(map[string]int),
jobEvals: make(map[structs.NamespacedID]string),
Expand Down Expand Up @@ -176,6 +181,9 @@ func (b *EvalBroker) SetEnabled(enabled bool) {
if !enabled {
b.flush()
}

// Notify all subscribers to state changes of the broker enabled value.
b.enabledNotifier.Notify("eval broker enabled status changed to " + strconv.FormatBool(enabled))
}

// Enqueue is used to enqueue a new evaluation
Expand Down
24 changes: 17 additions & 7 deletions nomad/eval_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,18 +126,28 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,
args.SchedulerVersion, scheduler.SchedulerVersion)
}

// If the eval broker is not enabled, but we are the leader, we have
// nothing to do. Not checking this will return errors to the workers which
// does nothing but clog up the logs.
if !e.srv.evalBroker.Enabled() {
return nil
}

// Ensure there is a default timeout
if args.Timeout <= 0 {
args.Timeout = DefaultDequeueTimeout
}

// If the eval broker is paused, attempt to block and wait for a state
// change before returning. This avoids a tight loop and mimics the
// behaviour where there are no evals to process.
//
// The call can return because either the timeout is reached or the broker
// SetEnabled function was called to modify its state. It is possible this
// is because of leadership transition, therefore the RPC should exit to
// allow all safety checks and RPC forwarding to occur again.
//
// The log line is trace, because the default worker timeout is 500ms which
// produces a large amount of logging.
if !e.srv.evalBroker.Enabled() {
message := e.srv.evalBroker.enabledNotifier.WaitForChange(args.Timeout)
e.logger.Trace("eval broker wait for un-pause", "message", message)
return nil
}

// Attempt the dequeue
eval, token, err := e.srv.evalBroker.Dequeue(args.Schedulers, args.Timeout)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions nomad/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,10 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntr
return nil, fmt.Errorf("failed to create volume watcher: %v", err)
}

// Start the eval broker notification system so any subscribers can get
// updates when the processes SetEnabled is triggered.
go s.evalBroker.enabledNotifier.Run(s.shutdownCh)

// Setup the node drainer.
s.setupNodeDrainer()

Expand Down

0 comments on commit 8a3df7f

Please sign in to comment.