Skip to content

Commit

Permalink
Merge pull request #4710 from hashicorp/f-preemption-core
Browse files Browse the repository at this point in the history
Implement preemption for system jobs.
  • Loading branch information
Preetha authored Oct 15, 2018
2 parents 1b06413 + 13a3d80 commit 92836a3
Show file tree
Hide file tree
Showing 25 changed files with 2,464 additions and 35 deletions.
50 changes: 50 additions & 0 deletions api/operator.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package api

import "strconv"

// Operator can be used to perform low-level operator tasks for Nomad.
type Operator struct {
c *Client
Expand Down Expand Up @@ -106,3 +108,51 @@ func (op *Operator) RaftRemovePeerByID(id string, q *WriteOptions) error {
resp.Body.Close()
return nil
}

// SchedulerConfiguration is the scheduler configuration read from and written
// to the /v1/operator/scheduler/config endpoint.
type SchedulerConfiguration struct {
// PreemptionConfig specifies whether to enable eviction of lower
// priority jobs to place higher priority jobs.
PreemptionConfig PreemptionConfig

// CreateIndex/ModifyIndex store the create/modify indexes of this configuration.
// ModifyIndex is also the value SchedulerCASConfiguration sends as the "cas"
// query parameter.
CreateIndex uint64
ModifyIndex uint64
}

// PreemptionConfig specifies whether preemption is enabled based on scheduler type.
type PreemptionConfig struct {
// SystemSchedulerEnabled reports whether preemption is enabled for the
// system scheduler.
SystemSchedulerEnabled bool
}

// SchedulerGetConfiguration fetches the current scheduler configuration from
// /v1/operator/scheduler/config. On failure both the configuration and the
// query metadata are nil.
func (op *Operator) SchedulerGetConfiguration(q *QueryOptions) (*SchedulerConfiguration, *QueryMeta, error) {
	conf := new(SchedulerConfiguration)
	meta, err := op.c.query("/v1/operator/scheduler/config", conf, q)
	if err != nil {
		return nil, nil, err
	}
	return conf, meta, nil
}

// SchedulerSetConfiguration writes conf to /v1/operator/scheduler/config,
// replacing the current scheduler configuration. It returns the write
// metadata, or a nil metadata and the error when the write fails.
func (op *Operator) SchedulerSetConfiguration(conf *SchedulerConfiguration, q *WriteOptions) (*WriteMeta, error) {
	// The response body is decoded into a bool, but the value is not
	// surfaced to the caller of this unconditional write.
	var discarded bool
	meta, err := op.c.write("/v1/operator/scheduler/config", conf, &discarded, q)
	if err != nil {
		return nil, err
	}
	return meta, nil
}

// SchedulerCASConfiguration performs a Check-And-Set update of the scheduler
// configuration. conf.ModifyIndex is sent as the "cas" query parameter, and
// the boolean decoded from the response is returned: true on success, false
// on failure. When the request itself errors, false and nil metadata are
// returned alongside the error.
func (op *Operator) SchedulerCASConfiguration(conf *SchedulerConfiguration, q *WriteOptions) (bool, *WriteMeta, error) {
	endpoint := "/v1/operator/scheduler/config?cas=" + strconv.FormatUint(conf.ModifyIndex, 10)

	var applied bool
	meta, err := op.c.write(endpoint, conf, &applied, q)
	if err != nil {
		return false, nil, err
	}
	return applied, meta, nil
}
66 changes: 66 additions & 0 deletions api/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package api
import (
"strings"
"testing"

"github.com/hashicorp/consul/testutil/retry"
"github.com/stretchr/testify/require"
)

func TestOperator_RaftGetConfiguration(t *testing.T) {
Expand Down Expand Up @@ -51,3 +54,66 @@ func TestOperator_RaftRemovePeerByID(t *testing.T) {
t.Fatalf("err: %v", err)
}
}

// TestAPI_OperatorSchedulerGetSetConfiguration reads the scheduler
// configuration, disables system scheduler preemption through
// SchedulerSetConfiguration, and verifies that a subsequent read reflects
// the change.
func TestAPI_OperatorSchedulerGetSetConfiguration(t *testing.T) {
	t.Parallel()
	require := require.New(t)
	c, s := makeClient(t, nil, nil)
	defer s.Stop()

	operator := c.Operator()

	// Retry the initial read until it succeeds.
	var config *SchedulerConfiguration
	retry.Run(t, func(r *retry.R) {
		var err error
		config, _, err = operator.SchedulerGetConfiguration(nil)
		r.Check(err)
	})
	require.True(config.PreemptionConfig.SystemSchedulerEnabled)

	// Change a config setting
	newConf := &SchedulerConfiguration{PreemptionConfig: PreemptionConfig{SystemSchedulerEnabled: false}}
	_, err := operator.SchedulerSetConfiguration(newConf, nil)
	require.Nil(err)

	// The value written above must be what is read back; asserting the old
	// value here would only pass if the write had been lost.
	config, _, err = operator.SchedulerGetConfiguration(nil)
	require.Nil(err)
	require.False(config.PreemptionConfig.SystemSchedulerEnabled)
}

// TestAPI_OperatorSchedulerCASConfiguration checks that a Check-And-Set
// update is rejected when given a stale ModifyIndex and accepted when given
// the current one.
func TestAPI_OperatorSchedulerCASConfiguration(t *testing.T) {
	t.Parallel()
	require := require.New(t)
	c, s := makeClient(t, nil, nil)
	defer s.Stop()

	operator := c.Operator()

	// Retry the initial read until it succeeds.
	var config *SchedulerConfiguration
	retry.Run(t, func(r *retry.R) {
		var err error
		config, _, err = operator.SchedulerGetConfiguration(nil)
		r.Check(err)
	})
	require.True(config.PreemptionConfig.SystemSchedulerEnabled)

	// The cases run in order: the invalid index first, then the valid one.
	cases := []struct {
		modifyIndex uint64
		applied     bool
	}{
		{modifyIndex: config.ModifyIndex - 1, applied: false}, // invalid ModifyIndex
		{modifyIndex: config.ModifyIndex, applied: true},      // valid ModifyIndex
	}

	for _, tc := range cases {
		newConf := &SchedulerConfiguration{
			PreemptionConfig: PreemptionConfig{SystemSchedulerEnabled: false},
			ModifyIndex:      tc.modifyIndex,
		}
		resp, _, err := operator.SchedulerCASConfiguration(newConf, nil)
		require.Nil(err)
		require.Equal(tc.applied, resp)
	}
}
2 changes: 2 additions & 0 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/system/gc", s.wrap(s.GarbageCollectRequest))
s.mux.HandleFunc("/v1/system/reconcile/summaries", s.wrap(s.ReconcileJobSummaries))

s.mux.HandleFunc("/v1/operator/scheduler/config", s.wrap(s.OperatorSchedulerConfiguration))

if uiEnabled {
s.mux.Handle("/ui/", http.StripPrefix("/ui/", handleUI(http.FileServer(&UIAssetWrapper{FileSystem: assetFS()}))))
} else {
Expand Down
64 changes: 64 additions & 0 deletions command/agent/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,67 @@ func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Re

return out, nil
}

// OperatorSchedulerConfiguration handles requests for the scheduler
// configuration. A GET returns the current configuration in its api
// representation. A PUT replaces the configuration with the one decoded from
// the request body; when the request carries a "cas" query parameter the
// write is performed as a check-and-set against that index and the RPC's
// boolean result is returned, otherwise true is returned. Any other method
// yields a 404 coded error.
func (s *HTTPServer) OperatorSchedulerConfiguration(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
	switch req.Method {
	case http.MethodGet:
		var args structs.GenericRequest
		if s.parse(resp, req, &args.Region, &args.QueryOptions) {
			return nil, nil
		}

		var current structs.SchedulerConfiguration
		err := s.agent.RPC("Operator.SchedulerGetConfiguration", &args, &current)
		if err != nil {
			return nil, err
		}

		// Translate the internal struct into its api package counterpart.
		return api.SchedulerConfiguration{
			PreemptionConfig: api.PreemptionConfig{
				SystemSchedulerEnabled: current.PreemptionConfig.SystemSchedulerEnabled,
			},
			CreateIndex: current.CreateIndex,
			ModifyIndex: current.ModifyIndex,
		}, nil

	case http.MethodPut:
		var args structs.SchedulerSetConfigRequest
		s.parseWriteRequest(req, &args.WriteRequest)

		var body api.SchedulerConfiguration
		if err := decodeBody(req, &body); err != nil {
			msg := fmt.Sprintf("Error parsing scheduler config: %v", err)
			return nil, CodedError(http.StatusBadRequest, msg)
		}

		args.Config = structs.SchedulerConfiguration{
			PreemptionConfig: structs.PreemptionConfig{
				SystemSchedulerEnabled: body.PreemptionConfig.SystemSchedulerEnabled,
			},
		}

		// A "cas" parameter switches the write to check-and-set mode, using
		// the supplied value as the expected ModifyIndex.
		query := req.URL.Query()
		if _, present := query["cas"]; present {
			index, err := strconv.ParseUint(query.Get("cas"), 10, 64)
			if err != nil {
				msg := fmt.Sprintf("Error parsing cas value: %v", err)
				return nil, CodedError(http.StatusBadRequest, msg)
			}
			args.Config.ModifyIndex = index
			args.CAS = true
		}

		var applied bool
		if err := s.agent.RPC("Operator.SchedulerSetConfiguration", &args, &applied); err != nil {
			return nil, err
		}

		// The RPC result is only meaningful for CAS writes; plain writes
		// always report true.
		if args.CAS {
			return applied, nil
		}
		return true, nil

	default:
		// NOTE(review): unsupported methods are reported as 404 here; 405
		// (Method Not Allowed) is the conventional status — confirm before
		// changing, since clients may depend on the current code.
		return nil, CodedError(http.StatusNotFound, ErrInvalidMethod)
	}
}
100 changes: 100 additions & 0 deletions command/agent/operator_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestHTTP_OperatorRaftConfiguration(t *testing.T) {
Expand Down Expand Up @@ -257,3 +258,102 @@ func TestOperator_ServerHealth_Unhealthy(t *testing.T) {
})
})
}

// TestOperator_SchedulerGetConfiguration issues a GET against the scheduler
// configuration handler and checks that it answers with an
// api.SchedulerConfiguration that has system scheduler preemption enabled.
func TestOperator_SchedulerGetConfiguration(t *testing.T) {
	t.Parallel()
	httpTest(t, nil, func(s *TestAgent) {
		require := require.New(t)

		emptyBody := new(bytes.Buffer)
		req, _ := http.NewRequest(http.MethodGet, "/v1/operator/scheduler/config", emptyBody)
		recorder := httptest.NewRecorder()

		obj, err := s.Server.OperatorSchedulerConfiguration(recorder, req)
		require.Nil(err)
		require.Equal(http.StatusOK, recorder.Code)

		conf, isConf := obj.(api.SchedulerConfiguration)
		require.True(isConf)
		require.True(conf.PreemptionConfig.SystemSchedulerEnabled)
	})
}

// TestOperator_SchedulerSetConfiguration PUTs a scheduler configuration
// through the HTTP handler and then reads it back over RPC to confirm the
// stored value.
func TestOperator_SchedulerSetConfiguration(t *testing.T) {
	t.Parallel()
	httpTest(t, nil, func(s *TestAgent) {
		require := require.New(t)

		payload := bytes.NewBufferString(`{"PreemptionConfig": {
"SystemSchedulerEnabled": true
}}`)
		req, _ := http.NewRequest(http.MethodPut, "/v1/operator/scheduler/config", payload)
		recorder := httptest.NewRecorder()

		_, err := s.Server.OperatorSchedulerConfiguration(recorder, req)
		require.Nil(err)
		require.Equal(http.StatusOK, recorder.Code)

		// Read the configuration back through the RPC layer.
		var args structs.GenericRequest
		args.QueryOptions = structs.QueryOptions{Region: s.Config.Region}

		var stored structs.SchedulerConfiguration
		err = s.RPC("Operator.SchedulerGetConfiguration", &args, &stored)
		require.Nil(err)
		require.True(stored.PreemptionConfig.SystemSchedulerEnabled)
	})
}

// TestOperator_SchedulerCASConfiguration seeds the scheduler configuration
// with a plain PUT, then issues check-and-set PUTs with a stale and a
// current index, expecting false and true respectively, and finally
// verifies over RPC that the accepted update was stored.
func TestOperator_SchedulerCASConfiguration(t *testing.T) {
	t.Parallel()
	httpTest(t, nil, func(s *TestAgent) {
		require := require.New(t)

		// Seed the configuration with an unconditional write.
		seed := bytes.NewBufferString(`{"PreemptionConfig": {
"SystemSchedulerEnabled": true
}}`)
		req, _ := http.NewRequest(http.MethodPut, "/v1/operator/scheduler/config", seed)
		resp := httptest.NewRecorder()
		_, err := s.Server.OperatorSchedulerConfiguration(resp, req)
		require.Nil(err)
		require.Equal(http.StatusOK, resp.Code)

		var args structs.GenericRequest
		args.QueryOptions = structs.QueryOptions{Region: s.Config.Region}

		var reply structs.SchedulerConfiguration
		if err := s.RPC("Operator.SchedulerGetConfiguration", &args, &reply); err != nil {
			t.Fatalf("err: %v", err)
		}
		require.True(reply.PreemptionConfig.SystemSchedulerEnabled)

		// CAS requests run in order: bad index first, then the good index.
		cases := []struct {
			index   uint64
			applied bool
		}{
			{index: reply.ModifyIndex - 1, applied: false},
			{index: reply.ModifyIndex, applied: true},
		}
		for _, tc := range cases {
			buf := bytes.NewBufferString(`{"PreemptionConfig": {
"SystemSchedulerEnabled": false
}}`)
			url := fmt.Sprintf("/v1/operator/scheduler/config?cas=%d", tc.index)
			casReq, _ := http.NewRequest(http.MethodPut, url, buf)
			casResp := httptest.NewRecorder()
			obj, err := s.Server.OperatorSchedulerConfiguration(casResp, casReq)
			require.Nil(err)
			require.Equal(tc.applied, obj.(bool))
		}

		// Verify the update
		if err := s.RPC("Operator.SchedulerGetConfiguration", &args, &reply); err != nil {
			t.Fatalf("err: %v", err)
		}
		require.False(reply.PreemptionConfig.SystemSchedulerEnabled)
	})
}
21 changes: 20 additions & 1 deletion nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyNodeEligibilityUpdate(buf[1:], log.Index)
case structs.BatchNodeUpdateDrainRequestType:
return n.applyBatchDrainUpdate(buf[1:], log.Index)
case structs.SchedulerConfigRequestType:
return n.applySchedulerConfigUpdate(buf[1:], log.Index)
}

// Check enterprise only message types.
Expand Down Expand Up @@ -820,7 +822,7 @@ func (n *nomadFSM) applyPlanResults(buf []byte, index uint64) interface{} {
n.logger.Error("ApplyPlan failed", "error", err)
return err
}

n.handleUpsertedEvals(req.PreemptionEvals)
return nil
}

Expand Down Expand Up @@ -1840,6 +1842,23 @@ func (s *nomadSnapshot) persistACLTokens(sink raft.SnapshotSink,
return nil
}

// applySchedulerConfigUpdate decodes a SchedulerSetConfigRequest from buf and
// applies it to the state store at the given index. It panics if the request
// cannot be decoded. For a CAS request it returns the error from
// SchedulerCASConfig if there is one, otherwise the value that call reports;
// for a plain request it returns the result of SchedulerSetConfig.
func (n *nomadFSM) applySchedulerConfigUpdate(buf []byte, index uint64) interface{} {
	var req structs.SchedulerSetConfigRequest
	if err := structs.Decode(buf, &req); err != nil {
		panic(fmt.Errorf("failed to decode request: %v", err))
	}
	defer metrics.MeasureSince([]string{"nomad", "fsm", "scheduler-config"}, time.Now())

	// Unconditional write: hand the config straight to the state store.
	if !req.CAS {
		return n.state.SchedulerSetConfig(index, &req.Config)
	}

	// Check-and-set: the request's ModifyIndex is the expected index.
	applied, err := n.state.SchedulerCASConfig(index, req.Config.ModifyIndex, &req.Config)
	if err != nil {
		return err
	}
	return applied
}

// Release is a no-op, as we just need to GC the pointer
// to the state store snapshot. There is nothing to explicitly
// cleanup.
Expand Down
Loading

0 comments on commit 92836a3

Please sign in to comment.