Make number of scheduler workers reloadable #11593

Merged: 32 commits, Jan 6, 2022
Changes from 4 commits
Commits (32)
1908187
Working POC
angrycub Nov 20, 2021
0071e55
Unexport setupNewWorkers; improve comments
angrycub Nov 23, 2021
763671a
Added some VSCode codetours
angrycub Nov 23, 2021
339316a
Update shutdown to use context
angrycub Nov 30, 2021
1a985b3
Apply suggestions from code review
angrycub Dec 1, 2021
22f93b7
Implement GET for SchedulerWorker API + tests
angrycub Dec 1, 2021
16f9dd4
Merge branch 'f-reload-num-schedulers' of github.com:hashicorp/nomad …
angrycub Dec 1, 2021
48428e7
Wired API, refactors, more testing
angrycub Dec 3, 2021
1258128
Merge branch 'main' into f-reload-num-schedulers
angrycub Dec 6, 2021
1845577
Fix linter complaints
angrycub Dec 6, 2021
9c4e5c4
Updating worker to cache EnabledScheduler list
angrycub Dec 6, 2021
0d8b7ec
Refactor `unsafe...` func names to `...Locked`
angrycub Dec 8, 2021
f5bb227
Passing enabled schedulers list to worker
angrycub Dec 10, 2021
292518b
Add note about scheduler death
angrycub Dec 10, 2021
1337f04
Worker API refactor
angrycub Dec 10, 2021
bd345e0
Made handler methods public for OpenAPI, remove unused test bool
angrycub Dec 10, 2021
31687cd
Implement SchedulerWorker status part 1
angrycub Dec 10, 2021
3739987
Fix broken Pause logic; split WorkloadWaiting status
angrycub Dec 11, 2021
7fe5949
Added scheduler info api
angrycub Dec 11, 2021
60d53fa
Added worker info api to api package
angrycub Dec 11, 2021
3d755aa
bugfixes
angrycub Dec 11, 2021
4ee6b8c
Adding stringer to build deps
angrycub Dec 13, 2021
71dab36
Changing route to /v1/agent/schedulers
angrycub Dec 20, 2021
1dc9f96
Adding docs for scheduler worker api
angrycub Dec 21, 2021
0417332
Adding API test for bad worker info
angrycub Dec 22, 2021
420a158
Add changelog message
angrycub Dec 23, 2021
fd016de
typo in changelog 🤦
angrycub Dec 23, 2021
167c6a3
Incorporate API code review feedback
angrycub Jan 3, 2022
f4f610b
Incorporate api-docs feedback
angrycub Jan 4, 2022
689fa77
Updates to worker/leader code from code review
angrycub Jan 4, 2022
982c397
Fix test response type
angrycub Jan 5, 2022
7581957
Set both statuses in markStopped so they are atomic
angrycub Jan 6, 2022
57 changes: 57 additions & 0 deletions .tours/scheduler-worker---hot-reload.tour
@@ -0,0 +1,57 @@
{
"$schema": "https://aka.ms/codetour-schema",
"title": "Scheduler Worker - Hot Reload",
"steps": [
{
"file": "nomad/server.go",
"description": "## Server.Reload()\n\nServer configuration reloads start here.",
"line": 782,
"selection": {
"start": {
"line": 780,
"character": 4
},
"end": {
"line": 780,
"character": 10
}
}
},
{
"file": "nomad/server.go",
"description": "## Did NumSchedulers change?\nIf the number of schedulers has changed between the running configuration and the new one we need to adopt that change in realtime.",
"line": 812
},
{
"file": "nomad/server.go",
"description": "## Server.setupNewWorkers()\n\nsetupNewWorkers performs three tasks:\n\n- makes a copy of the existing worker pointers\n\n- creates a fresh array and loads a new set of workers into them\n\n- iterates through the \"old\" workers and shuts them down in individual\n goroutines for maximum parallelism",
"line": 1482,
"selection": {
"start": {
"line": 1480,
"character": 4
},
"end": {
"line": 1480,
"character": 12
}
}
},
{
"file": "nomad/server.go",
"description": "Once all of the work in setupNewWorkers is complete, we stop the old ones.",
"line": 1485
},
{
"file": "nomad/server.go",
"description": "The `stopOldWorkers` function iterates through the array of workers and calls their `Shutdown` method\nas a goroutine to prevent blocking.",
"line": 1505
},
{
"file": "nomad/worker.go",
"description": "The `Shutdown` method sets `w.stop` to true signaling that we intend for the `Worker` to stop the next time we consult it. We also manually unpause the `Worker` by setting w.paused to false and sending a `Broadcast()` via the cond.",
"line": 110
}
],
"ref": "f-reload-num-schedulers"
}
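To make the hot-reload flow easier to follow outside the editor, here is a minimal, runnable sketch of the swap-and-drain pattern the tour describes. The names (`pool`, `resize`, the context-based `Shutdown`) are illustrative stand-ins rather than Nomad's actual types; the real implementation is `setupNewWorkers`/`stopOldWorkers` in the `nomad/server.go` diff below.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// worker stands in for nomad's *Worker: it runs until its context is
// cancelled, and Shutdown triggers that cancellation.
type worker struct {
	id     int
	cancel context.CancelFunc
}

func (w *worker) run(ctx context.Context) {
	<-ctx.Done() // a real worker would dequeue and process evaluations here
}

func (w *worker) Shutdown() { w.cancel() }

// pool mirrors the swap-and-drain shape of setupNewWorkers/stopOldWorkers:
// build the replacement set first, then stop the old set in goroutines.
type pool struct {
	mu      sync.Mutex
	workers []*worker
}

func (p *pool) resize(ctx context.Context, n int) {
	p.mu.Lock()
	old := append([]*worker(nil), p.workers...) // copy the old pointers

	p.workers = make([]*worker, 0, n)
	for i := 0; i < n; i++ {
		wctx, cancel := context.WithCancel(ctx)
		w := &worker{id: i, cancel: cancel}
		go w.run(wctx)
		p.workers = append(p.workers, w)
	}
	p.mu.Unlock()

	// Stop the old workers asynchronously so the reload never blocks on them.
	for _, w := range old {
		go w.Shutdown()
	}
}

func main() {
	p := &pool{}
	ctx := context.Background()
	p.resize(ctx, 8)
	p.resize(ctx, 4) // simulate a NumSchedulers reload from 8 to 4
	fmt.Println("workers now:", len(p.workers))
}
```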
66 changes: 66 additions & 0 deletions .tours/scheduler-worker---pause.tour
@@ -0,0 +1,66 @@
{
"$schema": "https://aka.ms/codetour-schema",
"title": "Scheduler Worker - Pause",
"steps": [
{
"file": "nomad/leader.go",
"description": "## Server.establishLeadership()\n\nUpon becoming a leader, the server pauses a subset of the workers to allow for the additional burden of the leader's goroutines. The `handlePausableWorkers` function takes a boolean that states whether or not the current node is a leader or not. Because we are in `establishLeadership` we use `true` rather than calling `s.IsLeader()`",
"line": 233,
"selection": {
"start": {
"line": 233,
"character": 4
},
"end": {
"line": 233,
"character": 12
}
}
},
{
"file": "nomad/leader.go",
"description": "## Server.handlePausableWorkers()\n\nhandlePausableWorkers ranges over a slice of Workers and manipulates their paused state by calling their `SetPause` method.",
"line": 443,
"selection": {
"start": {
"line": 443,
"character": 18
},
"end": {
"line": 443,
"character": 26
}
}
},
{
"file": "nomad/leader.go",
"description": "## Server.pausableWorkers()\n\nThe pausableWorkers function provides a consistent slice of workers that the server can pause and unpause. Since the Worker array is never mutated, the same slice is returned by pausableWorkers on every invocation.\nThis comment is interesting/potentially confusing\n\n```golang\n // Disabling 3/4 of the workers frees CPU for raft and the\n\t// plan applier which uses 1/2 the cores.\n``` \n\nHowever, the key point is that it will return a slice containg 3/4th of the workers.",
"line": 1100,
"selection": {
"start": {
"line": 1104,
"character": 1
},
"end": {
"line": 1105,
"character": 43
}
}
},
{
"file": "nomad/worker.go",
"description": "## Worker.SetPause()\n\nThe `SetPause` function is used to signal an intention to pause the worker. Because the worker's work is happening in the `run()` goroutine, pauses happen asynchronously.",
"line": 91
},
{
"file": "nomad/worker.go",
"description": "## Worker.dequeueEvaluation()\n\nCalls checkPaused, which will be the function we wait in if the scheduler is set to be paused. \n\n> **NOTE:** This is called here rather than in run() because this function loops in case of an error fetching a evaluation.",
"line": 206
},
{
"file": "nomad/worker.go",
"description": "## Worker.checkPaused()\n\nWhen `w.paused` is `true`, we call the `Wait()` function on the condition. Execution of this goroutine will stop here until it receives a `Broadcast()` or a `Signal()`. At this point, the `Worker` is paused.",
"line": 104
}
]
}
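The pause path boils down to a standard `sync.Cond` pattern. The sketch below is an illustrative reduction, assuming simplified field names rather than the actual `Worker` struct: `SetPause` flips a flag under a mutex and broadcasts, and the run loop parks in `checkPaused` while the flag is set.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// pausableWorker is an illustrative reduction of the pause mechanism the
// tour describes, not nomad's actual Worker type.
type pausableWorker struct {
	pauseLock sync.Mutex
	pauseCond *sync.Cond
	paused    bool
}

func newPausableWorker() *pausableWorker {
	w := &pausableWorker{}
	w.pauseCond = sync.NewCond(&w.pauseLock)
	return w
}

func (w *pausableWorker) SetPause(p bool) {
	w.pauseLock.Lock()
	w.paused = p
	w.pauseLock.Unlock()
	w.pauseCond.Broadcast() // wake any goroutine waiting in checkPaused
}

func (w *pausableWorker) checkPaused() {
	w.pauseLock.Lock()
	for w.paused {
		w.pauseCond.Wait() // releases the lock while parked, reacquires on wake
	}
	w.pauseLock.Unlock()
}

func (w *pausableWorker) run(iterations int) {
	for i := 0; i < iterations; i++ {
		w.checkPaused() // analogous to the call made from dequeueEvaluation
		fmt.Println("dequeue attempt", i)
	}
}

func main() {
	w := newPausableWorker()
	w.SetPause(true)

	go w.run(3)
	time.Sleep(100 * time.Millisecond) // the run loop is parked in checkPaused

	w.SetPause(false) // Broadcast lets the loop continue
	time.Sleep(100 * time.Millisecond)
}
```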
51 changes: 51 additions & 0 deletions .tours/scheduler-worker---unpause.tour
@@ -0,0 +1,51 @@
{
"$schema": "https://aka.ms/codetour-schema",
"title": "Scheduler Worker - Unpause",
"steps": [
{
"file": "nomad/leader.go",
"description": "## revokeLeadership()\n\nAs a server transistions from leader to non-leader, the pausableWorkers are resumed since the other leader goroutines are stopped providing extra capacity.",
"line": 1040,
"selection": {
"start": {
"line": 1038,
"character": 10
},
"end": {
"line": 1038,
"character": 20
}
}
},
{
"file": "nomad/leader.go",
"description": "## handlePausableWorkers()\n\nThe handlePausableWorkers method is called with `false`. We fetch the pausableWorkers and call their SetPause method with `false`.\n",
"line": 443,
"selection": {
"start": {
"line": 443,
"character": 18
},
"end": {
"line": 443,
"character": 27
}
}
},
{
"file": "nomad/worker.go",
"description": "## Worker.SetPause()\n\nDuring unpause, p is false. We update w.paused in the mutex, and then call Broadcast on the cond. This wakes the goroutine sitting in the Wait() inside of `checkPaused()`",
"line": 91
},
{
"file": "nomad/worker.go",
"description": "## Worker.checkPaused()\n\nOnce the goroutine receives the `Broadcast()` message from `SetPause()`, execution continues here. Now that `w.paused == false`, we exit the loop and return to the caller (the `dequeueEvaluation()` function).",
"line": 104
},
{
"file": "nomad/worker.go",
"description": "## Worker.dequeueEvaluation\n\nWe return back into dequeueEvaluation after the call to checkPaused. At this point the worker will either stop (if that signal boolean has been set) or continue looping after returning to run().",
"line": 207
}
]
}
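Pause and unpause converge on the new `handlePausableWorkers` helper (see the `nomad/leader.go` diff below), whose argument is simply "is this node the leader right now?". A small, hypothetical harness showing both transitions:

```go
package main

import "fmt"

// pausable is any worker that can be paused and unpaused; in Nomad the
// Worker type fills this role through its SetPause method.
type pausable interface {
	SetPause(bool)
}

// fakeWorker is a hypothetical stand-in used only for this illustration.
type fakeWorker struct{ id int }

func (f *fakeWorker) SetPause(p bool) { fmt.Printf("worker %d paused=%v\n", f.id, p) }

// handlePausableWorkers mirrors the helper added in nomad/leader.go: the
// boolean argument is whether this node is currently the leader.
func handlePausableWorkers(workers []pausable, isLeader bool) {
	for _, w := range workers {
		w.SetPause(isLeader)
	}
}

func main() {
	workers := []pausable{&fakeWorker{1}, &fakeWorker{2}, &fakeWorker{3}}
	handlePausableWorkers(workers, true)  // establishLeadership path: pause
	handlePausableWorkers(workers, false) // revokeLeadership path: resume
}
```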
36 changes: 36 additions & 0 deletions .tours/scheduler-worker.tour
@@ -0,0 +1,36 @@
{
"$schema": "https://aka.ms/codetour-schema",
"title": "Scheduler Worker - Start",
"steps": [
{
"file": "nomad/server.go",
"description": "## Server.NewServer()\n\nScheduler workers are started as the agent starts the `server` go routines.",
"line": 402
},
{
"file": "nomad/server.go",
"description": "## Server.setupWorkers()\n\nThe `setupWorkers()` function validates that there are enabled Schedulers by type and count. It then creates s.config.NumSchedulers by calling `NewWorker()`\n\nThe `_core` scheduler _**must**_ be enabled. **TODO: why?**\n",
"line": 1443,
"selection": {
"start": {
"line": 1442,
"character": 4
},
"end": {
"line": 1442,
"character": 12
}
}
},
{
"file": "nomad/worker.go",
"description": "## Worker.NewWorker\n\nNewWorker creates the Worker and starts `run()` in a goroutine.",
"line": 78
},
{
"file": "nomad/worker.go",
"description": "## Worker.run()\n\nThe `run()` function runs in a loop until it's paused, it's stopped, or the server indicates that it is shutting down. All of the work the `Worker` performs should be\nimplemented in or called from here.\n",
"line": 152
}
]
}
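A condensed, hypothetical version of the startup path described above: validate that the core scheduler is enabled, then launch `NumSchedulers` workers, each running its loop in its own goroutine. Here `"_core"` stands in for `structs.JobTypeCore`, and the worker itself is reduced to a stub.

```go
package main

import (
	"context"
	"fmt"
)

// simpleWorker is a stub standing in for nomad's Worker.
type simpleWorker struct{ id int }

func newSimpleWorker(ctx context.Context, id int) *simpleWorker {
	w := &simpleWorker{id: id}
	go w.run(ctx) // NewWorker starts run() in a goroutine
	return w
}

func (w *simpleWorker) run(ctx context.Context) {
	// A real worker loops here, dequeuing evaluations until it is paused,
	// stopped, or the server shuts down; the stub only waits for cancellation.
	<-ctx.Done()
}

// setupWorkers condenses the startup path: require the core scheduler, then
// launch num workers.
func setupWorkers(ctx context.Context, enabled []string, num int) ([]*simpleWorker, error) {
	core := false
	for _, s := range enabled {
		if s == "_core" {
			core = true
		}
	}
	if !core {
		return nil, fmt.Errorf("invalid configuration: %q scheduler not enabled", "_core")
	}

	workers := make([]*simpleWorker, 0, num)
	for i := 0; i < num; i++ {
		workers = append(workers, newSimpleWorker(ctx, i))
	}
	return workers, nil
}

func main() {
	ws, err := setupWorkers(context.Background(), []string{"service", "batch", "_core"}, 4)
	fmt.Println("workers:", len(ws), "err:", err)
}
```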
14 changes: 8 additions & 6 deletions nomad/leader.go
@@ -230,9 +230,7 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {

// Disable workers to free half the cores for use in the plan queue and
// evaluation broker
for _, w := range s.pausableWorkers() {
w.SetPause(true)
}
s.handlePausableWorkers(true)

// Initialize and start the autopilot routine
s.getOrCreateAutopilotConfig()
@@ -442,6 +440,12 @@ ERR_WAIT:
}
}

func (s *Server) handlePausableWorkers(isLeader bool) {
for _, w := range s.pausableWorkers() {
w.SetPause(isLeader)
}
}

// diffNamespaces is used to perform a two-way diff between the local namespaces
// and the remote namespaces to determine which namespaces need to be deleted or
// updated.
@@ -1081,9 +1085,7 @@ func (s *Server) revokeLeadership() error {
}

// Unpause our worker if we paused previously
for _, w := range s.pausableWorkers() {
w.SetPause(false)
}
s.handlePausableWorkers(false)

return nil
}
57 changes: 51 additions & 6 deletions nomad/server.go
@@ -399,7 +399,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntr
}

// Initialize the scheduling workers
if err := s.setupWorkers(); err != nil {
if err := s.setupWorkers(s.shutdownCtx); err != nil {
s.Shutdown()
s.logger.Error("failed to start workers", "error", err)
return nil, fmt.Errorf("Failed to start workers: %v", err)
@@ -558,7 +558,7 @@ func (s *Server) reloadTLSConnections(newTLSConfig *config.TLSConfig) error {

// Check if we can reload the RPC listener
if s.rpcListener == nil || s.rpcCancel == nil {
s.logger.Warn("unable to reload configuration due to uninitialized rpc listner")
s.logger.Warn("unable to reload configuration due to uninitialized rpc listener")
return fmt.Errorf("can't reload uninitialized RPC listener")
}

@@ -809,6 +809,15 @@ func (s *Server) Reload(newConfig *Config) error {
s.EnterpriseState.ReloadLicense(newConfig)
}

if newConfig.NumSchedulers != s.config.NumSchedulers {
s.logger.Debug("changing number of schedulers", "from", s.config.NumSchedulers, "to", newConfig.NumSchedulers)
s.config.NumSchedulers = newConfig.NumSchedulers
if err := s.setupNewWorkers(); err != nil {
s.logger.Error("error creating new workers", "error", err)
_ = multierror.Append(&mErr, err)
}
}

return mErr.ErrorOrNil()
}

@@ -1431,7 +1440,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
}

// setupWorkers is used to start the scheduling workers
func (s *Server) setupWorkers() error {
func (s *Server) setupWorkers(ctx context.Context) error {
// Check if all the schedulers are disabled
if len(s.config.EnabledSchedulers) == 0 || s.config.NumSchedulers == 0 {
s.logger.Warn("no enabled schedulers")
@@ -1453,19 +1462,55 @@ func (s *Server) setupWorkers() error {
if !foundCore {
return fmt.Errorf("invalid configuration: %q scheduler not enabled", structs.JobTypeCore)
}

s.logger.Info("starting scheduling worker(s)", "num_workers", s.config.NumSchedulers, "schedulers", s.config.EnabledSchedulers)
// Start the workers
for i := 0; i < s.config.NumSchedulers; i++ {
if w, err := NewWorker(s); err != nil {
if w, err := NewWorker(ctx, s); err != nil {
return err
} else {
s.logger.Debug("started scheduling worker", "id", w.ID(), "index", i+1, "of", s.config.NumSchedulers)

s.workers = append(s.workers, w)
}
}
s.logger.Info("starting scheduling worker(s)", "num_workers", s.config.NumSchedulers, "schedulers", s.config.EnabledSchedulers)
s.logger.Info("started scheduling worker(s)", "num_workers", s.config.NumSchedulers, "schedulers", s.config.EnabledSchedulers)
return nil
}

// setupNewWorkers() is used to start a new set of workers after a configuration
// change and a hot reload.
func (s *Server) setupNewWorkers() error {
// make a copy of the s.workers array so we can safely stop those goroutines asynchronously
oldWorkers := make([]*Worker, len(s.workers))
defer s.stopOldWorkers(oldWorkers)
for i, w := range s.workers {
oldWorkers[i] = w
}
s.logger.Info(fmt.Sprintf("marking %v current schedulers for shutdown", len(oldWorkers)))

// build a clean backing array and call setupWorkers like in the normal startup path
s.workers = make([]*Worker, 0, s.config.NumSchedulers)
err := s.setupWorkers(s.shutdownCtx)
if err != nil {
return err
}

// if we're the leader, we need to pause all of the pausable workers.
s.handlePausableWorkers(s.IsLeader())

return nil
}

// stopOldWorkers is called once setupNewWorkers has created the new worker
// array to asynchronously stop each of the old workers individually.
func (s *Server) stopOldWorkers(oldWorkers []*Worker) {
workerCount := len(oldWorkers)
for i, w := range oldWorkers {
s.logger.Debug("stopping old scheduling worker", "id", w.ID(), "index", i+1, "of", workerCount)
go w.Shutdown()
}
}

// numPeers is used to check on the number of known peers, including the local
// node.
func (s *Server) numPeers() (int, error) {
23 changes: 21 additions & 2 deletions nomad/server_test.go
@@ -540,13 +540,13 @@ func TestServer_InvalidSchedulers(t *testing.T) {
}

config.EnabledSchedulers = []string{"batch"}
err := s.setupWorkers()
err := s.setupWorkers(s.shutdownCtx)
require.NotNil(err)
require.Contains(err.Error(), "scheduler not enabled")

// Set the config to have an unknown scheduler
config.EnabledSchedulers = []string{"batch", structs.JobTypeCore, "foo"}
err = s.setupWorkers()
err = s.setupWorkers(s.shutdownCtx)
require.NotNil(err)
require.Contains(err.Error(), "foo")
}
@@ -577,3 +577,22 @@ func TestServer_RPCNameAndRegionValidation(t *testing.T) {
tc.name, tc.region, tc.expected)
}
}

func TestServer_Reload_NumSchedulers(t *testing.T) {
t.Parallel()

s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 8
c.Region = "global"
})
defer cleanupS1()

require.Equal(t, s1.config.NumSchedulers, len(s1.workers))

config := DefaultConfig()
config.NumSchedulers = 4
require.NoError(t, s1.Reload(config))

time.Sleep(1 * time.Second)
require.Equal(t, config.NumSchedulers, len(s1.workers))
}
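The commit history also adds a scheduler worker API at `/v1/agent/schedulers`. A quick way to inspect the workers after a reload is a plain HTTP GET; the address below is Nomad's default HTTP port and the response shape is not shown in this diff, so treat this sketch as an assumption-laden probe rather than a supported client.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Assumes a local agent on the default port; the endpoint path comes from
	// the commit "Changing route to /v1/agent/schedulers".
	resp, err := http.Get("http://127.0.0.1:4646/v1/agent/schedulers")
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Println("read failed:", err)
		return
	}
	fmt.Println(resp.Status)
	fmt.Println(string(body)) // raw JSON describing each scheduler worker
}
```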