Skip to content

Commit

Permalink
core: allow pausing and un-pausing of leader broker routine (#13045)
Browse files Browse the repository at this point in the history
* core: allow pause/un-pause of eval broker on region leader.

* agent: add ability to pause eval broker via scheduler config.

* cli: add operator scheduler commands to interact with config.

* api: add ability to pause eval broker via scheduler config

* e2e: add operator scheduler test for eval broker pause.

* docs: include new opertor scheduler CLI and pause eval API info.
  • Loading branch information
jrasell committed Jul 6, 2022
1 parent b9e084a commit 24220d0
Show file tree
Hide file tree
Showing 32 changed files with 1,388 additions and 75 deletions.
7 changes: 7 additions & 0 deletions .changelog/13045.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
```release-note:improvement
cli: Added `scheduler get-config` and `scheduler set-config` commands to the operator CLI
```

```release-note:improvement
core: Added the ability to pause and un-pause the eval broker and blocked eval broker
```
4 changes: 4 additions & 0 deletions api/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ type SchedulerConfiguration struct {
// management ACL token
RejectJobRegistration bool

// PauseEvalBroker stops the leader evaluation broker process from running
// until the configuration is updated and written to the Nomad servers.
PauseEvalBroker bool

// CreateIndex/ModifyIndex store the create/modify indexes of this configuration.
CreateIndex uint64
ModifyIndex uint64
Expand Down
44 changes: 44 additions & 0 deletions api/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/hashicorp/nomad/api/internal/testutil"
"github.com/stretchr/testify/require"
)

func TestOperator_RaftGetConfiguration(t *testing.T) {
Expand Down Expand Up @@ -53,3 +54,46 @@ func TestOperator_RaftRemovePeerByID(t *testing.T) {
t.Fatalf("err: %v", err)
}
}

func TestOperator_SchedulerGetConfiguration(t *testing.T) {
testutil.Parallel(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()

schedulerConfig, _, err := c.Operator().SchedulerGetConfiguration(nil)
require.Nil(t, err)
require.NotEmpty(t, schedulerConfig)
}

func TestOperator_SchedulerSetConfiguration(t *testing.T) {
testutil.Parallel(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()

newSchedulerConfig := SchedulerConfiguration{
SchedulerAlgorithm: SchedulerAlgorithmSpread,
PreemptionConfig: PreemptionConfig{
SystemSchedulerEnabled: true,
SysBatchSchedulerEnabled: true,
BatchSchedulerEnabled: true,
ServiceSchedulerEnabled: true,
},
MemoryOversubscriptionEnabled: true,
RejectJobRegistration: true,
PauseEvalBroker: true,
}

schedulerConfigUpdateResp, _, err := c.Operator().SchedulerSetConfiguration(&newSchedulerConfig, nil)
require.Nil(t, err)
require.True(t, schedulerConfigUpdateResp.Updated)

// We can't exactly predict the query meta responses, so we test fields
// individually.
schedulerConfig, _, err := c.Operator().SchedulerGetConfiguration(nil)
require.Nil(t, err)
require.Equal(t, schedulerConfig.SchedulerConfig.SchedulerAlgorithm, SchedulerAlgorithmSpread)
require.True(t, schedulerConfig.SchedulerConfig.PauseEvalBroker)
require.True(t, schedulerConfig.SchedulerConfig.RejectJobRegistration)
require.True(t, schedulerConfig.SchedulerConfig.MemoryOversubscriptionEnabled)
require.Equal(t, newSchedulerConfig.PreemptionConfig, schedulerConfig.SchedulerConfig.PreemptionConfig)
}
1 change: 1 addition & 0 deletions command/agent/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ func (s *HTTPServer) schedulerUpdateConfig(resp http.ResponseWriter, req *http.R
SchedulerAlgorithm: structs.SchedulerAlgorithm(conf.SchedulerAlgorithm),
MemoryOversubscriptionEnabled: conf.MemoryOversubscriptionEnabled,
RejectJobRegistration: conf.RejectJobRegistration,
PauseEvalBroker: conf.PauseEvalBroker,
PreemptionConfig: structs.PreemptionConfig{
SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled,
SysBatchSchedulerEnabled: conf.PreemptionConfig.SysBatchSchedulerEnabled,
Expand Down
41 changes: 21 additions & 20 deletions command/agent/operator_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,32 +272,32 @@ func TestOperator_ServerHealth_Unhealthy(t *testing.T) {
func TestOperator_SchedulerGetConfiguration(t *testing.T) {
ci.Parallel(t)
httpTest(t, nil, func(s *TestAgent) {
require := require.New(t)
body := bytes.NewBuffer(nil)
req, _ := http.NewRequest("GET", "/v1/operator/scheduler/configuration", body)
resp := httptest.NewRecorder()
obj, err := s.Server.OperatorSchedulerConfiguration(resp, req)
require.Nil(err)
require.Equal(200, resp.Code)
require.Nil(t, err)
require.Equal(t, 200, resp.Code)
out, ok := obj.(structs.SchedulerConfigurationResponse)
require.True(ok)
require.True(t, ok)

// Only system jobs can preempt other jobs by default.
require.True(out.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.False(out.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.False(out.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.False(out.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
require.False(out.SchedulerConfig.MemoryOversubscriptionEnabled)
require.True(t, out.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.False(t, out.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.False(t, out.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.False(t, out.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
require.False(t, out.SchedulerConfig.MemoryOversubscriptionEnabled)
require.False(t, out.SchedulerConfig.PauseEvalBroker)
})
}

func TestOperator_SchedulerSetConfiguration(t *testing.T) {
ci.Parallel(t)
httpTest(t, nil, func(s *TestAgent) {
require := require.New(t)
body := bytes.NewBuffer([]byte(`
{
"MemoryOversubscriptionEnabled": true,
"PauseEvalBroker": true,
"PreemptionConfig": {
"SystemSchedulerEnabled": true,
"ServiceSchedulerEnabled": true
Expand All @@ -306,11 +306,11 @@ func TestOperator_SchedulerSetConfiguration(t *testing.T) {
req, _ := http.NewRequest("PUT", "/v1/operator/scheduler/configuration", body)
resp := httptest.NewRecorder()
setResp, err := s.Server.OperatorSchedulerConfiguration(resp, req)
require.Nil(err)
require.Equal(200, resp.Code)
require.Nil(t, err)
require.Equal(t, 200, resp.Code)
schedSetResp, ok := setResp.(structs.SchedulerSetConfigurationResponse)
require.True(ok)
require.NotZero(schedSetResp.Index)
require.True(t, ok)
require.NotZero(t, schedSetResp.Index)

args := structs.GenericRequest{
QueryOptions: structs.QueryOptions{
Expand All @@ -320,12 +320,13 @@ func TestOperator_SchedulerSetConfiguration(t *testing.T) {

var reply structs.SchedulerConfigurationResponse
err = s.RPC("Operator.SchedulerGetConfiguration", &args, &reply)
require.Nil(err)
require.True(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.True(reply.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
require.True(reply.SchedulerConfig.MemoryOversubscriptionEnabled)
require.Nil(t, err)
require.True(t, reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.False(t, reply.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.False(t, reply.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.True(t, reply.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
require.True(t, reply.SchedulerConfig.MemoryOversubscriptionEnabled)
require.True(t, reply.SchedulerConfig.PauseEvalBroker)
})
}

Expand Down
16 changes: 15 additions & 1 deletion command/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,21 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},

"operator scheduler": func() (cli.Command, error) {
return &OperatorSchedulerCommand{
Meta: meta,
}, nil
},
"operator scheduler get-config": func() (cli.Command, error) {
return &OperatorSchedulerGetConfig{
Meta: meta,
}, nil
},
"operator scheduler set-config": func() (cli.Command, error) {
return &OperatorSchedulerSetConfig{
Meta: meta,
}, nil
},
"operator snapshot": func() (cli.Command, error) {
return &OperatorSnapshotCommand{
Meta: meta,
Expand Down
6 changes: 3 additions & 3 deletions command/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ func (m *monitor) monitor(evalID string) int {
// Add the initial pending state
m.update(newEvalState())

m.ui.Info(fmt.Sprintf("%s: Monitoring evaluation %q",
formatTime(time.Now()), limit(evalID, m.length)))

for {
// Query the evaluation
eval, _, err := m.client.Evaluations().Info(evalID, nil)
Expand All @@ -194,9 +197,6 @@ func (m *monitor) monitor(evalID string) int {
return 1
}

m.ui.Info(fmt.Sprintf("%s: Monitoring evaluation %q",
formatTime(time.Now()), limit(eval.ID, m.length)))

// Create the new eval state.
state := newEvalState()
state.status = eval.Status
Expand Down
42 changes: 42 additions & 0 deletions command/operator_scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package command

import (
"strings"

"github.com/mitchellh/cli"
)

// Ensure OperatorSchedulerCommand satisfies the cli.Command interface.
var _ cli.Command = &OperatorSchedulerCommand{}

type OperatorSchedulerCommand struct {
Meta
}

func (o *OperatorSchedulerCommand) Help() string {
helpText := `
Usage: nomad operator scheduler <subcommand> [options]
This command groups subcommands for interacting with Nomad's scheduler
subsystem.
Get the scheduler configuration:
$ nomad operator scheduler get-config
Set the scheduler to use the spread algorithm:
$ nomad operator scheduler set-config -scheduler-algorithm=spread
Please see the individual subcommand help for detailed usage information.
`
return strings.TrimSpace(helpText)
}

func (o *OperatorSchedulerCommand) Synopsis() string {
return "Provides access to the scheduler configuration"
}

func (o *OperatorSchedulerCommand) Name() string { return "operator scheduler" }

func (o *OperatorSchedulerCommand) Run(_ []string) int { return cli.RunResultHelp }
118 changes: 118 additions & 0 deletions command/operator_scheduler_get_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package command

import (
"fmt"
"strings"

"github.com/mitchellh/cli"
"github.com/posener/complete"
)

// Ensure OperatorSchedulerGetConfig satisfies the cli.Command interface.
var _ cli.Command = &OperatorSchedulerGetConfig{}

type OperatorSchedulerGetConfig struct {
Meta

json bool
tmpl string
}

func (o *OperatorSchedulerGetConfig) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(o.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"-json": complete.PredictNothing,
"-t": complete.PredictAnything,
},
)
}

func (o *OperatorSchedulerGetConfig) AutocompleteArgs() complete.Predictor {
return complete.PredictNothing
}

func (o *OperatorSchedulerGetConfig) Name() string { return "operator scheduler get-config" }

func (o *OperatorSchedulerGetConfig) Run(args []string) int {

flags := o.Meta.FlagSet("get-config", FlagSetClient)
flags.BoolVar(&o.json, "json", false, "")
flags.StringVar(&o.tmpl, "t", "", "")
flags.Usage = func() { o.Ui.Output(o.Help()) }

if err := flags.Parse(args); err != nil {
return 1
}

// Set up a client.
client, err := o.Meta.Client()
if err != nil {
o.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}

// Fetch the current configuration.
resp, _, err := client.Operator().SchedulerGetConfiguration(nil)
if err != nil {
o.Ui.Error(fmt.Sprintf("Error querying scheduler configuration: %s", err))
return 1
}

// If the user has specified to output the scheduler config as JSON or
// using a template, perform this action for the entire object and exit the
// command.
if o.json || len(o.tmpl) > 0 {
out, err := Format(o.json, o.tmpl, resp)
if err != nil {
o.Ui.Error(err.Error())
return 1
}
o.Ui.Output(out)
return 0
}

schedConfig := resp.SchedulerConfig

// Output the information.
o.Ui.Output(formatKV([]string{
fmt.Sprintf("Scheduler Algorithm|%s", schedConfig.SchedulerAlgorithm),
fmt.Sprintf("Memory Oversubscription|%v", schedConfig.MemoryOversubscriptionEnabled),
fmt.Sprintf("Reject Job Registration|%v", schedConfig.RejectJobRegistration),
fmt.Sprintf("Pause Eval Broker|%v", schedConfig.PauseEvalBroker),
fmt.Sprintf("Preemption System Scheduler|%v", schedConfig.PreemptionConfig.SystemSchedulerEnabled),
fmt.Sprintf("Preemption Service Scheduler|%v", schedConfig.PreemptionConfig.ServiceSchedulerEnabled),
fmt.Sprintf("Preemption Batch Scheduler|%v", schedConfig.PreemptionConfig.BatchSchedulerEnabled),
fmt.Sprintf("Preemption SysBatch Scheduler|%v", schedConfig.PreemptionConfig.SysBatchSchedulerEnabled),
fmt.Sprintf("Modify Index|%v", resp.SchedulerConfig.ModifyIndex),
}))
return 0
}

func (o *OperatorSchedulerGetConfig) Synopsis() string {
return "Display the current scheduler configuration"
}

func (o *OperatorSchedulerGetConfig) Help() string {
helpText := `
Usage: nomad operator scheduler get-config [options]
Displays the current scheduler configuration.
If ACLs are enabled, this command requires a token with the 'operator:read'
capability.
General Options:
` + generalOptionsUsage(usageOptsDefault|usageOptsNoNamespace) + `
Scheduler Get Config Options:
-json
Output the scheduler config in its JSON format.
-t
Format and display the scheduler config using a Go template.
`

return strings.TrimSpace(helpText)
}
Loading

0 comments on commit 24220d0

Please sign in to comment.