Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: allow pausing and un-pausing of leader broker routine #13045

Merged
merged 6 commits into from
Jul 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changelog/13045.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
```release-note:improvement
cli: Added `scheduler get-config` and `scheduler set-config` commands to the operator CLI
```

```release-note:improvement
core: Added the ability to pause and un-pause the eval broker and blocked eval broker
```
4 changes: 4 additions & 0 deletions api/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ type SchedulerConfiguration struct {
// management ACL token
RejectJobRegistration bool

// PauseEvalBroker stops the leader evaluation broker process from running
// until the configuration is updated and written to the Nomad servers.
PauseEvalBroker bool

// CreateIndex/ModifyIndex store the create/modify indexes of this configuration.
CreateIndex uint64
ModifyIndex uint64
Expand Down
44 changes: 44 additions & 0 deletions api/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/hashicorp/nomad/api/internal/testutil"
"github.com/stretchr/testify/require"
)

func TestOperator_RaftGetConfiguration(t *testing.T) {
Expand Down Expand Up @@ -53,3 +54,46 @@ func TestOperator_RaftRemovePeerByID(t *testing.T) {
t.Fatalf("err: %v", err)
}
}

func TestOperator_SchedulerGetConfiguration(t *testing.T) {
testutil.Parallel(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()

schedulerConfig, _, err := c.Operator().SchedulerGetConfiguration(nil)
require.Nil(t, err)
require.NotEmpty(t, schedulerConfig)
}

func TestOperator_SchedulerSetConfiguration(t *testing.T) {
testutil.Parallel(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()

newSchedulerConfig := SchedulerConfiguration{
SchedulerAlgorithm: SchedulerAlgorithmSpread,
PreemptionConfig: PreemptionConfig{
SystemSchedulerEnabled: true,
SysBatchSchedulerEnabled: true,
BatchSchedulerEnabled: true,
ServiceSchedulerEnabled: true,
},
MemoryOversubscriptionEnabled: true,
RejectJobRegistration: true,
PauseEvalBroker: true,
}

schedulerConfigUpdateResp, _, err := c.Operator().SchedulerSetConfiguration(&newSchedulerConfig, nil)
require.Nil(t, err)
require.True(t, schedulerConfigUpdateResp.Updated)

// We can't exactly predict the query meta responses, so we test fields
// individually.
schedulerConfig, _, err := c.Operator().SchedulerGetConfiguration(nil)
require.Nil(t, err)
require.Equal(t, schedulerConfig.SchedulerConfig.SchedulerAlgorithm, SchedulerAlgorithmSpread)
require.True(t, schedulerConfig.SchedulerConfig.PauseEvalBroker)
jrasell marked this conversation as resolved.
Show resolved Hide resolved
require.True(t, schedulerConfig.SchedulerConfig.RejectJobRegistration)
require.True(t, schedulerConfig.SchedulerConfig.MemoryOversubscriptionEnabled)
require.Equal(t, newSchedulerConfig.PreemptionConfig, schedulerConfig.SchedulerConfig.PreemptionConfig)
}
1 change: 1 addition & 0 deletions command/agent/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ func (s *HTTPServer) schedulerUpdateConfig(resp http.ResponseWriter, req *http.R
SchedulerAlgorithm: structs.SchedulerAlgorithm(conf.SchedulerAlgorithm),
MemoryOversubscriptionEnabled: conf.MemoryOversubscriptionEnabled,
RejectJobRegistration: conf.RejectJobRegistration,
PauseEvalBroker: conf.PauseEvalBroker,
PreemptionConfig: structs.PreemptionConfig{
SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled,
SysBatchSchedulerEnabled: conf.PreemptionConfig.SysBatchSchedulerEnabled,
Expand Down
41 changes: 21 additions & 20 deletions command/agent/operator_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,32 +272,32 @@ func TestOperator_ServerHealth_Unhealthy(t *testing.T) {
func TestOperator_SchedulerGetConfiguration(t *testing.T) {
ci.Parallel(t)
httpTest(t, nil, func(s *TestAgent) {
require := require.New(t)
body := bytes.NewBuffer(nil)
req, _ := http.NewRequest("GET", "/v1/operator/scheduler/configuration", body)
resp := httptest.NewRecorder()
obj, err := s.Server.OperatorSchedulerConfiguration(resp, req)
require.Nil(err)
require.Equal(200, resp.Code)
require.Nil(t, err)
require.Equal(t, 200, resp.Code)
out, ok := obj.(structs.SchedulerConfigurationResponse)
require.True(ok)
require.True(t, ok)

// Only system jobs can preempt other jobs by default.
require.True(out.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.False(out.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.False(out.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.False(out.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
require.False(out.SchedulerConfig.MemoryOversubscriptionEnabled)
require.True(t, out.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.False(t, out.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.False(t, out.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.False(t, out.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
require.False(t, out.SchedulerConfig.MemoryOversubscriptionEnabled)
require.False(t, out.SchedulerConfig.PauseEvalBroker)
})
}

func TestOperator_SchedulerSetConfiguration(t *testing.T) {
ci.Parallel(t)
httpTest(t, nil, func(s *TestAgent) {
require := require.New(t)
body := bytes.NewBuffer([]byte(`
{
"MemoryOversubscriptionEnabled": true,
"PauseEvalBroker": true,
"PreemptionConfig": {
"SystemSchedulerEnabled": true,
"ServiceSchedulerEnabled": true
Expand All @@ -306,11 +306,11 @@ func TestOperator_SchedulerSetConfiguration(t *testing.T) {
req, _ := http.NewRequest("PUT", "/v1/operator/scheduler/configuration", body)
resp := httptest.NewRecorder()
setResp, err := s.Server.OperatorSchedulerConfiguration(resp, req)
require.Nil(err)
require.Equal(200, resp.Code)
require.Nil(t, err)
require.Equal(t, 200, resp.Code)
schedSetResp, ok := setResp.(structs.SchedulerSetConfigurationResponse)
require.True(ok)
require.NotZero(schedSetResp.Index)
require.True(t, ok)
require.NotZero(t, schedSetResp.Index)

args := structs.GenericRequest{
QueryOptions: structs.QueryOptions{
Expand All @@ -320,12 +320,13 @@ func TestOperator_SchedulerSetConfiguration(t *testing.T) {

var reply structs.SchedulerConfigurationResponse
err = s.RPC("Operator.SchedulerGetConfiguration", &args, &reply)
require.Nil(err)
require.True(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.True(reply.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
require.True(reply.SchedulerConfig.MemoryOversubscriptionEnabled)
require.Nil(t, err)
require.True(t, reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.False(t, reply.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.False(t, reply.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.True(t, reply.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
require.True(t, reply.SchedulerConfig.MemoryOversubscriptionEnabled)
require.True(t, reply.SchedulerConfig.PauseEvalBroker)
})
}

Expand Down
16 changes: 15 additions & 1 deletion command/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,21 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},

"operator scheduler": func() (cli.Command, error) {
return &OperatorSchedulerCommand{
Meta: meta,
}, nil
},
"operator scheduler get-config": func() (cli.Command, error) {
return &OperatorSchedulerGetConfig{
Meta: meta,
}, nil
},
"operator scheduler set-config": func() (cli.Command, error) {
return &OperatorSchedulerSetConfig{
Meta: meta,
}, nil
},
"operator snapshot": func() (cli.Command, error) {
return &OperatorSnapshotCommand{
Meta: meta,
Expand Down
6 changes: 3 additions & 3 deletions command/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ func (m *monitor) monitor(evalID string) int {
// Add the initial pending state
m.update(newEvalState())

m.ui.Info(fmt.Sprintf("%s: Monitoring evaluation %q",
formatTime(time.Now()), limit(evalID, m.length)))

for {
// Query the evaluation
eval, _, err := m.client.Evaluations().Info(evalID, nil)
Expand All @@ -194,9 +197,6 @@ func (m *monitor) monitor(evalID string) int {
return 1
}

m.ui.Info(fmt.Sprintf("%s: Monitoring evaluation %q",
formatTime(time.Now()), limit(eval.ID, m.length)))

// Create the new eval state.
state := newEvalState()
state.status = eval.Status
Expand Down
42 changes: 42 additions & 0 deletions command/operator_scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package command

import (
"strings"

"github.com/mitchellh/cli"
)

// Ensure OperatorSchedulerCommand satisfies the cli.Command interface.
var _ cli.Command = &OperatorSchedulerCommand{}

type OperatorSchedulerCommand struct {
Meta
}

func (o *OperatorSchedulerCommand) Help() string {
helpText := `
Usage: nomad operator scheduler <subcommand> [options]

This command groups subcommands for interacting with Nomad's scheduler
subsystem.

Get the scheduler configuration:

$ nomad operator scheduler get-config

Set the scheduler to use the spread algorithm:

$ nomad operator scheduler set-config -scheduler-algorithm=spread

Please see the individual subcommand help for detailed usage information.
`
return strings.TrimSpace(helpText)
}

func (o *OperatorSchedulerCommand) Synopsis() string {
return "Provides access to the scheduler configuration"
}

func (o *OperatorSchedulerCommand) Name() string { return "operator scheduler" }

func (o *OperatorSchedulerCommand) Run(_ []string) int { return cli.RunResultHelp }
118 changes: 118 additions & 0 deletions command/operator_scheduler_get_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package command

import (
"fmt"
"strings"

"github.com/mitchellh/cli"
"github.com/posener/complete"
)

// Ensure OperatorSchedulerGetConfig satisfies the cli.Command interface.
var _ cli.Command = &OperatorSchedulerGetConfig{}

type OperatorSchedulerGetConfig struct {
Meta

json bool
tmpl string
}

func (o *OperatorSchedulerGetConfig) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(o.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"-json": complete.PredictNothing,
"-t": complete.PredictAnything,
},
)
}

func (o *OperatorSchedulerGetConfig) AutocompleteArgs() complete.Predictor {
return complete.PredictNothing
}

func (o *OperatorSchedulerGetConfig) Name() string { return "operator scheduler get-config" }

func (o *OperatorSchedulerGetConfig) Run(args []string) int {

flags := o.Meta.FlagSet("get-config", FlagSetClient)
flags.BoolVar(&o.json, "json", false, "")
flags.StringVar(&o.tmpl, "t", "", "")
flags.Usage = func() { o.Ui.Output(o.Help()) }

if err := flags.Parse(args); err != nil {
return 1
}

// Set up a client.
client, err := o.Meta.Client()
if err != nil {
o.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}

// Fetch the current configuration.
resp, _, err := client.Operator().SchedulerGetConfiguration(nil)
if err != nil {
o.Ui.Error(fmt.Sprintf("Error querying scheduler configuration: %s", err))
return 1
}

// If the user has specified to output the scheduler config as JSON or
// using a template, perform this action for the entire object and exit the
// command.
if o.json || len(o.tmpl) > 0 {
out, err := Format(o.json, o.tmpl, resp)
if err != nil {
o.Ui.Error(err.Error())
return 1
}
o.Ui.Output(out)
return 0
}

schedConfig := resp.SchedulerConfig

// Output the information.
o.Ui.Output(formatKV([]string{
fmt.Sprintf("Scheduler Algorithm|%s", schedConfig.SchedulerAlgorithm),
fmt.Sprintf("Memory Oversubscription|%v", schedConfig.MemoryOversubscriptionEnabled),
fmt.Sprintf("Reject Job Registration|%v", schedConfig.RejectJobRegistration),
fmt.Sprintf("Pause Eval Broker|%v", schedConfig.PauseEvalBroker),
fmt.Sprintf("Preemption System Scheduler|%v", schedConfig.PreemptionConfig.SystemSchedulerEnabled),
fmt.Sprintf("Preemption Service Scheduler|%v", schedConfig.PreemptionConfig.ServiceSchedulerEnabled),
fmt.Sprintf("Preemption Batch Scheduler|%v", schedConfig.PreemptionConfig.BatchSchedulerEnabled),
fmt.Sprintf("Preemption SysBatch Scheduler|%v", schedConfig.PreemptionConfig.SysBatchSchedulerEnabled),
jrasell marked this conversation as resolved.
Show resolved Hide resolved
fmt.Sprintf("Modify Index|%v", resp.SchedulerConfig.ModifyIndex),
}))
return 0
}

func (o *OperatorSchedulerGetConfig) Synopsis() string {
return "Display the current scheduler configuration"
}

func (o *OperatorSchedulerGetConfig) Help() string {
helpText := `
Usage: nomad operator scheduler get-config [options]

Displays the current scheduler configuration.

If ACLs are enabled, this command requires a token with the 'operator:read'
capability.

General Options:

` + generalOptionsUsage(usageOptsDefault|usageOptsNoNamespace) + `

Scheduler Get Config Options:

-json
Output the scheduler config in its JSON format.

-t
Format and display the scheduler config using a Go template.
`

return strings.TrimSpace(helpText)
}
Loading