Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improvements to scheduler config API #4865

Merged
merged 4 commits into from
Nov 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 23 additions & 15 deletions api/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,19 @@ type SchedulerConfiguration struct {
// SchedulerConfigurationResponse is the response object that wraps SchedulerConfiguration
type SchedulerConfigurationResponse struct {
// SchedulerConfig contains scheduler config options
SchedulerConfig SchedulerConfiguration
SchedulerConfig *SchedulerConfiguration

// CreateIndex/ModifyIndex store the create/modify indexes of this configuration.
CreateIndex uint64
ModifyIndex uint64
QueryMeta
}

// SchedulerSetConfigurationResponse is the response object used
// when updating scheduler configuration
type SchedulerSetConfigurationResponse struct {
// Updated returns whether the config was actually updated
// Only set when the request uses CAS
Updated bool

WriteMeta
}

// PreemptionConfig specifies whether preemption is enabled based on scheduler type
Expand All @@ -137,32 +145,32 @@ type PreemptionConfig struct {
// SchedulerGetConfiguration is used to query the current Scheduler configuration.
func (op *Operator) SchedulerGetConfiguration(q *QueryOptions) (*SchedulerConfigurationResponse, *QueryMeta, error) {
var resp SchedulerConfigurationResponse
qm, err := op.c.query("/v1/operator/scheduler/config", &resp, q)
qm, err := op.c.query("/v1/operator/scheduler/configuration", &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, qm, nil
}

// SchedulerSetConfiguration is used to set the current Scheduler configuration.
func (op *Operator) SchedulerSetConfiguration(conf *SchedulerConfiguration, q *WriteOptions) (*WriteMeta, error) {
var out bool
wm, err := op.c.write("/v1/operator/scheduler/config", conf, &out, q)
func (op *Operator) SchedulerSetConfiguration(conf *SchedulerConfiguration, q *WriteOptions) (*SchedulerSetConfigurationResponse, *WriteMeta, error) {
var out SchedulerSetConfigurationResponse
wm, err := op.c.write("/v1/operator/scheduler/configuration", conf, &out, q)
if err != nil {
return nil, err
return nil, nil, err
}
return wm, nil
return &out, wm, nil
}

// SchedulerCASConfiguration is used to perform a Check-And-Set update on the
// Scheduler configuration. The ModifyIndex value will be respected. Returns
// true on success or false on failures.
func (op *Operator) SchedulerCASConfiguration(conf *SchedulerConfiguration, q *WriteOptions) (bool, *WriteMeta, error) {
var out bool
wm, err := op.c.write("/v1/operator/scheduler/config?cas="+strconv.FormatUint(conf.ModifyIndex, 10), conf, &out, q)
func (op *Operator) SchedulerCASConfiguration(conf *SchedulerConfiguration, q *WriteOptions) (*SchedulerSetConfigurationResponse, *WriteMeta, error) {
var out SchedulerSetConfigurationResponse
wm, err := op.c.write("/v1/operator/scheduler/configuration?cas="+strconv.FormatUint(conf.ModifyIndex, 10), conf, &out, q)
if err != nil {
return false, nil, err
return nil, nil, err
}

return out, wm, nil
return &out, wm, nil
}
18 changes: 11 additions & 7 deletions api/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ func TestAPI_OperatorSchedulerGetSetConfiguration(t *testing.T) {

// Change a config setting
newConf := &SchedulerConfiguration{PreemptionConfig: PreemptionConfig{SystemSchedulerEnabled: false}}
_, err := operator.SchedulerSetConfiguration(newConf, nil)
resp, wm, err := operator.SchedulerSetConfiguration(newConf, nil)
require.Nil(err)
require.NotZero(wm.LastIndex)
require.False(resp.Updated)

config, _, err = operator.SchedulerGetConfiguration(nil)
require.Nil(err)
Expand All @@ -99,21 +101,23 @@ func TestAPI_OperatorSchedulerCASConfiguration(t *testing.T) {
{
newConf := &SchedulerConfiguration{
PreemptionConfig: PreemptionConfig{SystemSchedulerEnabled: false},
ModifyIndex: config.ModifyIndex - 1,
ModifyIndex: config.SchedulerConfig.ModifyIndex - 1,
}
resp, _, err := operator.SchedulerCASConfiguration(newConf, nil)
resp, wm, err := operator.SchedulerCASConfiguration(newConf, nil)
require.Nil(err)
require.False(resp)
require.NotZero(wm.LastIndex)
require.False(resp.Updated)
}

// Pass a valid ModifyIndex
{
newConf := &SchedulerConfiguration{
PreemptionConfig: PreemptionConfig{SystemSchedulerEnabled: false},
ModifyIndex: config.ModifyIndex,
ModifyIndex: config.SchedulerConfig.ModifyIndex,
}
resp, _, err := operator.SchedulerCASConfiguration(newConf, nil)
resp, wm, err := operator.SchedulerCASConfiguration(newConf, nil)
require.Nil(err)
require.True(resp)
require.NotZero(wm.LastIndex)
require.True(resp.Updated)
}
}
2 changes: 1 addition & 1 deletion command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/system/gc", s.wrap(s.GarbageCollectRequest))
s.mux.HandleFunc("/v1/system/reconcile/summaries", s.wrap(s.ReconcileJobSummaries))

s.mux.HandleFunc("/v1/operator/scheduler/config", s.wrap(s.OperatorSchedulerConfiguration))
s.mux.HandleFunc("/v1/operator/scheduler/configuration", s.wrap(s.OperatorSchedulerConfiguration))

if uiEnabled {
s.mux.Handle("/ui/", http.StripPrefix("/ui/", handleUI(http.FileServer(&UIAssetWrapper{FileSystem: assetFS()}))))
Expand Down
93 changes: 43 additions & 50 deletions command/agent/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,66 +215,59 @@ func (s *HTTPServer) OperatorSchedulerConfiguration(resp http.ResponseWriter, re
// Switch on the method
switch req.Method {
case "GET":
var args structs.GenericRequest
if done := s.parse(resp, req, &args.Region, &args.QueryOptions); done {
return nil, nil
}
return s.schedulerGetConfig(resp, req)

var reply structs.SchedulerConfigurationResponse
if err := s.agent.RPC("Operator.SchedulerGetConfiguration", &args, &reply); err != nil {
return nil, err
}
case "PUT", "POST":
return s.schedulerUpdateConfig(resp, req)

out := api.SchedulerConfiguration{
PreemptionConfig: api.PreemptionConfig{SystemSchedulerEnabled: reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled},
CreateIndex: reply.CreateIndex,
ModifyIndex: reply.ModifyIndex,
}

resp := api.SchedulerConfigurationResponse{
SchedulerConfig: out,
CreateIndex: out.CreateIndex,
ModifyIndex: out.ModifyIndex,
}
default:
return nil, CodedError(405, ErrInvalidMethod)
}
}

return resp, nil
func (s *HTTPServer) schedulerGetConfig(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var args structs.GenericRequest
if done := s.parse(resp, req, &args.Region, &args.QueryOptions); done {
return nil, nil
}

case "PUT":
var args structs.SchedulerSetConfigRequest
s.parseWriteRequest(req, &args.WriteRequest)
var reply structs.SchedulerConfigurationResponse
if err := s.agent.RPC("Operator.SchedulerGetConfiguration", &args, &reply); err != nil {
return nil, err
}
setMeta(resp, &reply.QueryMeta)

var conf api.SchedulerConfiguration
if err := decodeBody(req, &conf); err != nil {
return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing scheduler config: %v", err))
}
return reply, nil
}

args.Config = structs.SchedulerConfiguration{
PreemptionConfig: structs.PreemptionConfig{SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled},
}
func (s *HTTPServer) schedulerUpdateConfig(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var args structs.SchedulerSetConfigRequest
s.parseWriteRequest(req, &args.WriteRequest)

// Check for cas value
params := req.URL.Query()
if _, ok := params["cas"]; ok {
casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64)
if err != nil {
return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing cas value: %v", err))
}
args.Config.ModifyIndex = casVal
args.CAS = true
}
var conf api.SchedulerConfiguration
if err := decodeBody(req, &conf); err != nil {
return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing scheduler config: %v", err))
}

var reply bool
if err := s.agent.RPC("Operator.SchedulerSetConfiguration", &args, &reply); err != nil {
return nil, err
}
args.Config = structs.SchedulerConfiguration{
PreemptionConfig: structs.PreemptionConfig{SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled},
}

// Only use the out value if this was a CAS
if !args.CAS {
return true, nil
// Check for cas value
params := req.URL.Query()
if _, ok := params["cas"]; ok {
casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64)
if err != nil {
return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing cas value: %v", err))
}
return reply, nil
args.Config.ModifyIndex = casVal
args.CAS = true
}

default:
return nil, CodedError(404, ErrInvalidMethod)
var reply structs.SchedulerSetConfigurationResponse
if err := s.agent.RPC("Operator.SchedulerSetConfiguration", &args, &reply); err != nil {
return nil, err
}
setIndex(resp, reply.Index)
return reply, nil
}
38 changes: 26 additions & 12 deletions command/agent/operator_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,12 +264,12 @@ func TestOperator_SchedulerGetConfiguration(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) {
require := require.New(t)
body := bytes.NewBuffer(nil)
req, _ := http.NewRequest("GET", "/v1/operator/scheduler/config", body)
req, _ := http.NewRequest("GET", "/v1/operator/scheduler/configuration", body)
resp := httptest.NewRecorder()
obj, err := s.Server.OperatorSchedulerConfiguration(resp, req)
require.Nil(err)
require.Equal(200, resp.Code)
out, ok := obj.(api.SchedulerConfigurationResponse)
out, ok := obj.(structs.SchedulerConfigurationResponse)
require.True(ok)
require.True(out.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
})
Expand All @@ -282,11 +282,14 @@ func TestOperator_SchedulerSetConfiguration(t *testing.T) {
body := bytes.NewBuffer([]byte(`{"PreemptionConfig": {
"SystemSchedulerEnabled": true
}}`))
req, _ := http.NewRequest("PUT", "/v1/operator/scheduler/config", body)
req, _ := http.NewRequest("PUT", "/v1/operator/scheduler/configuration", body)
resp := httptest.NewRecorder()
_, err := s.Server.OperatorSchedulerConfiguration(resp, req)
setResp, err := s.Server.OperatorSchedulerConfiguration(resp, req)
require.Nil(err)
require.Equal(200, resp.Code)
schedSetResp, ok := setResp.(structs.SchedulerSetConfigurationResponse)
require.True(ok)
require.NotZero(schedSetResp.Index)

args := structs.GenericRequest{
QueryOptions: structs.QueryOptions{
Expand All @@ -308,11 +311,14 @@ func TestOperator_SchedulerCASConfiguration(t *testing.T) {
body := bytes.NewBuffer([]byte(`{"PreemptionConfig": {
"SystemSchedulerEnabled": true
}}`))
req, _ := http.NewRequest("PUT", "/v1/operator/scheduler/config", body)
req, _ := http.NewRequest("PUT", "/v1/operator/scheduler/configuration", body)
resp := httptest.NewRecorder()
_, err := s.Server.OperatorSchedulerConfiguration(resp, req)
setResp, err := s.Server.OperatorSchedulerConfiguration(resp, req)
require.Nil(err)
require.Equal(200, resp.Code)
schedSetResp, ok := setResp.(structs.SchedulerSetConfigurationResponse)
require.True(ok)
require.NotZero(schedSetResp.Index)

args := structs.GenericRequest{
QueryOptions: structs.QueryOptions{
Expand All @@ -331,23 +337,31 @@ func TestOperator_SchedulerCASConfiguration(t *testing.T) {
buf := bytes.NewBuffer([]byte(`{"PreemptionConfig": {
"SystemSchedulerEnabled": false
}}`))
req, _ := http.NewRequest("PUT", fmt.Sprintf("/v1/operator/scheduler/config?cas=%d", reply.ModifyIndex-1), buf)
req, _ := http.NewRequest("PUT", fmt.Sprintf("/v1/operator/scheduler/configuration?cas=%d", reply.QueryMeta.Index-1), buf)
resp := httptest.NewRecorder()
obj, err := s.Server.OperatorSchedulerConfiguration(resp, req)
setResp, err := s.Server.OperatorSchedulerConfiguration(resp, req)
require.Nil(err)
require.False(obj.(bool))
// Verify that the response has Updated=false
schedSetResp, ok := setResp.(structs.SchedulerSetConfigurationResponse)
require.True(ok)
require.NotZero(schedSetResp.Index)
require.False(schedSetResp.Updated)
}

// Create a CAS request, good index
{
buf := bytes.NewBuffer([]byte(`{"PreemptionConfig": {
"SystemSchedulerEnabled": false
}}`))
req, _ := http.NewRequest("PUT", fmt.Sprintf("/v1/operator/scheduler/config?cas=%d", reply.ModifyIndex), buf)
req, _ := http.NewRequest("PUT", fmt.Sprintf("/v1/operator/scheduler/configuration?cas=%d", reply.QueryMeta.Index), buf)
resp := httptest.NewRecorder()
obj, err := s.Server.OperatorSchedulerConfiguration(resp, req)
setResp, err := s.Server.OperatorSchedulerConfiguration(resp, req)
require.Nil(err)
require.True(obj.(bool))
// Verify that the response has Updated=true
schedSetResp, ok := setResp.(structs.SchedulerSetConfigurationResponse)
require.True(ok)
require.NotZero(schedSetResp.Index)
require.True(schedSetResp.Updated)
}

// Verify the update
Expand Down
22 changes: 11 additions & 11 deletions nomad/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func (op *Operator) ServerHealth(args *structs.GenericRequest, reply *autopilot.
}

// SchedulerSetConfiguration is used to set the current Scheduler configuration.
func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRequest, reply *bool) error {
func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRequest, reply *structs.SchedulerSetConfigurationResponse) error {
if done, err := op.srv.forward("Operator.SchedulerSetConfiguration", args, args, reply); done {
return err
}
Expand All @@ -300,18 +300,20 @@ func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRe
}

// Apply the update
resp, _, err := op.srv.raftApply(structs.SchedulerConfigRequestType, args)
resp, index, err := op.srv.raftApply(structs.SchedulerConfigRequestType, args)
if err != nil {
op.logger.Error("failed applying Scheduler configuration", "error", err)
return err
} else if respErr, ok := resp.(error); ok {
return respErr
}

// Check if the return type is a bool.
// Check if the return type is a bool
// Only applies to CAS requests
if respBool, ok := resp.(bool); ok {
*reply = respBool
reply.Updated = respBool
}
reply.Index = index
return nil
}

Expand All @@ -330,19 +332,17 @@ func (op *Operator) SchedulerGetConfiguration(args *structs.GenericRequest, repl
}

state := op.srv.fsm.State()
_, config, err := state.SchedulerConfig()
index, config, err := state.SchedulerConfig()

if err != nil {
return err
} else if config == nil {
return fmt.Errorf("scheduler config not initialized yet")
}

resp := &structs.SchedulerConfigurationResponse{
SchedulerConfig: *config,
CreateIndex: config.CreateIndex,
ModifyIndex: config.ModifyIndex,
}
*reply = *resp
reply.SchedulerConfig = config
reply.QueryMeta.Index = index
op.srv.setQueryMeta(&reply.QueryMeta)

return nil
}
Loading