Skip to content

Commit

Permalink
Merge pull request #9160 from hashicorp/f-sysbatch
Browse files Browse the repository at this point in the history
core: implement system batch scheduler
  • Loading branch information
Mahmood Ali committed Aug 16, 2021
2 parents 91fb72f + 141ea60 commit fdb8684
Show file tree
Hide file tree
Showing 36 changed files with 2,761 additions and 607 deletions.
7 changes: 4 additions & 3 deletions api/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,10 @@ const (

// PreemptionConfig specifies whether preemption is enabled based on scheduler type
type PreemptionConfig struct {
SystemSchedulerEnabled bool
BatchSchedulerEnabled bool
ServiceSchedulerEnabled bool
SystemSchedulerEnabled bool
SysBatchSchedulerEnabled bool
BatchSchedulerEnabled bool
ServiceSchedulerEnabled bool
}

// SchedulerGetConfiguration is used to query the current Scheduler configuration.
Expand Down
18 changes: 11 additions & 7 deletions client/allocrunner/taskrunner/restarts/restarts.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@ const (
// jitter is the percent of jitter added to restart delays.
jitter = 0.25

ReasonNoRestartsAllowed = "Policy allows no restarts"
ReasonUnrecoverableErrror = "Error was unrecoverable"
ReasonWithinPolicy = "Restart within policy"
ReasonDelay = "Exceeded allowed attempts, applying a delay"
ReasonNoRestartsAllowed = "Policy allows no restarts"
ReasonUnrecoverableError = "Error was unrecoverable"
ReasonWithinPolicy = "Restart within policy"
ReasonDelay = "Exceeded allowed attempts, applying a delay"
)

func NewRestartTracker(policy *structs.RestartPolicy, jobType string, tlc *structs.TaskLifecycleConfig) *RestartTracker {
// Batch jobs should not restart if they exit successfully
onSuccess := jobType != structs.JobTypeBatch
onSuccess := true

// Batch & SysBatch jobs should not restart if they exit successfully
if jobType == structs.JobTypeBatch || jobType == structs.JobTypeSysBatch {
onSuccess = false
}

// Prestart sidecars should get restarted on success
if tlc != nil && tlc.Hook == structs.TaskLifecycleHookPrestart {
Expand Down Expand Up @@ -201,7 +205,7 @@ func (r *RestartTracker) GetState() (string, time.Duration) {
if r.startErr != nil {
// If the error is not recoverable, do not restart.
if !structs.IsRecoverable(r.startErr) {
r.reason = ReasonUnrecoverableErrror
r.reason = ReasonUnrecoverableError
return structs.TaskNotRestarting, 0
}
} else if r.exitRes != nil {
Expand Down
7 changes: 4 additions & 3 deletions command/agent/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,10 @@ func (s *HTTPServer) schedulerUpdateConfig(resp http.ResponseWriter, req *http.R
SchedulerAlgorithm: structs.SchedulerAlgorithm(conf.SchedulerAlgorithm),
MemoryOversubscriptionEnabled: conf.MemoryOversubscriptionEnabled,
PreemptionConfig: structs.PreemptionConfig{
SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled,
BatchSchedulerEnabled: conf.PreemptionConfig.BatchSchedulerEnabled,
ServiceSchedulerEnabled: conf.PreemptionConfig.ServiceSchedulerEnabled},
SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled,
SysBatchSchedulerEnabled: conf.PreemptionConfig.SysBatchSchedulerEnabled,
BatchSchedulerEnabled: conf.PreemptionConfig.BatchSchedulerEnabled,
ServiceSchedulerEnabled: conf.PreemptionConfig.ServiceSchedulerEnabled},
}

if err := args.Config.Validate(); err != nil {
Expand Down
8 changes: 8 additions & 0 deletions command/agent/operator_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ func TestOperator_SchedulerGetConfiguration(t *testing.T) {

// Only system jobs can preempt other jobs by default.
require.True(out.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.False(out.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.False(out.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.False(out.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
require.False(out.SchedulerConfig.MemoryOversubscriptionEnabled)
Expand Down Expand Up @@ -319,6 +320,8 @@ func TestOperator_SchedulerSetConfiguration(t *testing.T) {
err = s.RPC("Operator.SchedulerGetConfiguration", &args, &reply)
require.Nil(err)
require.True(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.True(reply.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
require.True(reply.SchedulerConfig.MemoryOversubscriptionEnabled)
})
Expand All @@ -330,6 +333,7 @@ func TestOperator_SchedulerCASConfiguration(t *testing.T) {
require := require.New(t)
body := bytes.NewBuffer([]byte(`{"PreemptionConfig": {
"SystemSchedulerEnabled": true,
"SysBatchSchedulerEnabled":true,
"BatchSchedulerEnabled":true
}}`))
req, _ := http.NewRequest("PUT", "/v1/operator/scheduler/configuration", body)
Expand All @@ -352,7 +356,9 @@ func TestOperator_SchedulerCASConfiguration(t *testing.T) {
t.Fatalf("err: %v", err)
}
require.True(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.True(reply.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.True(reply.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)

// Create a CAS request, bad index
{
Expand Down Expand Up @@ -393,7 +399,9 @@ func TestOperator_SchedulerCASConfiguration(t *testing.T) {
t.Fatalf("err: %v", err)
}
require.False(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
})
}

Expand Down
3 changes: 2 additions & 1 deletion e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ import (
_ "github.com/hashicorp/nomad/e2e/rescheduling"
_ "github.com/hashicorp/nomad/e2e/scaling"
_ "github.com/hashicorp/nomad/e2e/scalingpolicies"
_ "github.com/hashicorp/nomad/e2e/scheduler_sysbatch"
_ "github.com/hashicorp/nomad/e2e/scheduler_system"
_ "github.com/hashicorp/nomad/e2e/spread"
_ "github.com/hashicorp/nomad/e2e/systemsched"
_ "github.com/hashicorp/nomad/e2e/taskevents"
_ "github.com/hashicorp/nomad/e2e/vaultsecrets"
_ "github.com/hashicorp/nomad/e2e/volumes"
Expand Down
24 changes: 24 additions & 0 deletions e2e/e2eutil/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,30 @@ func WaitForAllocStopped(t *testing.T, nomadClient *api.Client, allocID string)
})
}

func WaitForAllocStatus(t *testing.T, nomadClient *api.Client, allocID string, status string) {
testutil.WaitForResultRetries(retries, func() (bool, error) {
time.Sleep(time.Millisecond * 100)
alloc, _, err := nomadClient.Allocations().Info(allocID, nil)
if err != nil {
return false, err
}
switch alloc.ClientStatus {
case status:
return true, nil
default:
return false, fmt.Errorf("expected %s alloc, but was: %s", status, alloc.ClientStatus)
}
}, func(err error) {
t.Fatalf("failed to wait on alloc: %v", err)
})
}

func WaitForAllocsStatus(t *testing.T, nomadClient *api.Client, allocIDs []string, status string) {
for _, allocID := range allocIDs {
WaitForAllocStatus(t, nomadClient, allocID, status)
}
}

func AllocIDsFromAllocationListStubs(allocs []*api.AllocationListStub) []string {
allocIDs := make([]string, 0, len(allocs))
for _, alloc := range allocs {
Expand Down
30 changes: 30 additions & 0 deletions e2e/scheduler_sysbatch/input/sysbatch_dispatch.nomad
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
job "sysbatchjob" {
datacenters = ["dc1"]

type = "sysbatch"

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}

parameterized {
payload = "forbidden"
meta_required = ["KEY"]
}

group "sysbatch_job_group" {
count = 1

task "sysbatch_task" {
driver = "docker"

config {
image = "busybox:1"

command = "/bin/sh"
args = ["-c", "echo hi; sleep 1"]
}
}
}
}
25 changes: 25 additions & 0 deletions e2e/scheduler_sysbatch/input/sysbatch_job_fast.nomad
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
job "sysbatchjob" {
datacenters = ["dc1"]

type = "sysbatch"

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}

group "sysbatch_job_group" {
count = 1

task "sysbatch_task" {
driver = "docker"

config {
image = "busybox:1"

command = "/bin/sh"
args = ["-c", "echo hi; sleep 1"]
}
}
}
}
25 changes: 25 additions & 0 deletions e2e/scheduler_sysbatch/input/sysbatch_job_slow.nomad
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
job "sysbatchjob" {
datacenters = ["dc1"]

type = "sysbatch"

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}

group "sysbatch_job_group" {
count = 1

task "sysbatch_task" {
driver = "docker"

config {
image = "busybox:1"

command = "/bin/sh"
args = ["-c", "echo hi; sleep 1000000"]
}
}
}
}
30 changes: 30 additions & 0 deletions e2e/scheduler_sysbatch/input/sysbatch_periodic.nomad
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
job "sysbatchjob" {
datacenters = ["dc1"]

type = "sysbatch"

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}

periodic {
cron = "*/15 * * * * *"
prohibit_overlap = true
}

group "sysbatch_job_group" {
count = 1

task "sysbatch_task" {
driver = "docker"

config {
image = "busybox:1"

command = "/bin/sh"
args = ["-c", "echo hi; sleep 1"]
}
}
}
}
Loading

0 comments on commit fdb8684

Please sign in to comment.