Skip to content

Commit

Permalink
tests: don't mutate global structs in core scheduler tests (#16120)
Browse files Browse the repository at this point in the history
Some of the core scheduler tests need the maximum batch size for writes to be
smaller than the usual `structs.MaxUUIDsPerWriteRequest`. But they do so by
unsafely modifying the global struct, which creates test flakes in other tests.

Modify the functions under test to take a batch size parameter. Production code
will pass the global while the tests can inject smaller values. Turn
`structs.MaxUUIDsPerWriteRequest` into a constant, and add a semgrep rule for
avoiding this kind of thing in the future.
  • Loading branch information
tgross committed Feb 10, 2023
1 parent 5d5e389 commit 1d2a243
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 25 deletions.
13 changes: 13 additions & 0 deletions .semgrep/protect_globals.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
rules:
- id: "no-overriding-struct-globals"
patterns:
- pattern: |
structs.$A = ...
message: "Mutating global structs is never safe"
languages:
- "go"
severity: "ERROR"
fix: " "
paths:
# including tests!
include: ["*"]
19 changes: 10 additions & 9 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ OUTER:
// jobReap contacts the leader and issues a reap on the passed jobs
func (c *CoreScheduler) jobReap(jobs []*structs.Job, leaderACL string) error {
// Call to the leader to issue the reap
for _, req := range c.partitionJobReap(jobs, leaderACL) {
for _, req := range c.partitionJobReap(jobs, leaderACL, maxIdsPerReap) {
var resp structs.JobBatchDeregisterResponse
if err := c.srv.RPC("Job.BatchDeregister", req, &resp); err != nil {
c.logger.Error("batch job reap failed", "error", err)
Expand All @@ -192,7 +192,7 @@ func (c *CoreScheduler) jobReap(jobs []*structs.Job, leaderACL string) error {
// partitionJobReap returns a list of JobBatchDeregisterRequests to make,
// ensuring a single request does not contain too many jobs. This is necessary
// to ensure that the Raft transaction does not become too large.
func (c *CoreScheduler) partitionJobReap(jobs []*structs.Job, leaderACL string) []*structs.JobBatchDeregisterRequest {
func (c *CoreScheduler) partitionJobReap(jobs []*structs.Job, leaderACL string, batchSize int) []*structs.JobBatchDeregisterRequest {
option := &structs.JobDeregisterOptions{Purge: true}
var requests []*structs.JobBatchDeregisterRequest
submittedJobs := 0
Expand All @@ -205,7 +205,7 @@ func (c *CoreScheduler) partitionJobReap(jobs []*structs.Job, leaderACL string)
},
}
requests = append(requests, req)
available := maxIdsPerReap
available := batchSize

if remaining := len(jobs) - submittedJobs; remaining > 0 {
if remaining <= available {
Expand Down Expand Up @@ -358,7 +358,7 @@ func olderVersionTerminalAllocs(allocs []*structs.Allocation, job *structs.Job,
// allocs.
func (c *CoreScheduler) evalReap(evals, allocs []string) error {
// Call to the leader to issue the reap
for _, req := range c.partitionEvalReap(evals, allocs) {
for _, req := range c.partitionEvalReap(evals, allocs, maxIdsPerReap) {
var resp structs.GenericResponse
if err := c.srv.RPC("Eval.Reap", req, &resp); err != nil {
c.logger.Error("eval reap failed", "error", err)
Expand All @@ -372,7 +372,8 @@ func (c *CoreScheduler) evalReap(evals, allocs []string) error {
// partitionEvalReap returns a list of EvalDeleteRequest to make, ensuring a single
// request does not contain too many allocations and evaluations. This is
// necessary to ensure that the Raft transaction does not become too large.
func (c *CoreScheduler) partitionEvalReap(evals, allocs []string) []*structs.EvalDeleteRequest {

func (c *CoreScheduler) partitionEvalReap(evals, allocs []string, batchSize int) []*structs.EvalDeleteRequest {
var requests []*structs.EvalDeleteRequest
submittedEvals, submittedAllocs := 0, 0
for submittedEvals != len(evals) || submittedAllocs != len(allocs) {
Expand All @@ -382,7 +383,7 @@ func (c *CoreScheduler) partitionEvalReap(evals, allocs []string) []*structs.Eva
},
}
requests = append(requests, req)
available := maxIdsPerReap
available := batchSize

// Add the allocs first
if remaining := len(allocs) - submittedAllocs; remaining > 0 {
Expand Down Expand Up @@ -598,7 +599,7 @@ OUTER:
// deployments.
func (c *CoreScheduler) deploymentReap(deployments []string) error {
// Call to the leader to issue the reap
for _, req := range c.partitionDeploymentReap(deployments) {
for _, req := range c.partitionDeploymentReap(deployments, maxIdsPerReap) {
var resp structs.GenericResponse
if err := c.srv.RPC("Deployment.Reap", req, &resp); err != nil {
c.logger.Error("deployment reap failed", "error", err)
Expand All @@ -612,7 +613,7 @@ func (c *CoreScheduler) deploymentReap(deployments []string) error {
// partitionDeploymentReap returns a list of DeploymentDeleteRequest to make,
// ensuring a single request does not contain too many deployments. This is
// necessary to ensure that the Raft transaction does not become too large.
func (c *CoreScheduler) partitionDeploymentReap(deployments []string) []*structs.DeploymentDeleteRequest {
func (c *CoreScheduler) partitionDeploymentReap(deployments []string, batchSize int) []*structs.DeploymentDeleteRequest {
var requests []*structs.DeploymentDeleteRequest
submittedDeployments := 0
for submittedDeployments != len(deployments) {
Expand All @@ -622,7 +623,7 @@ func (c *CoreScheduler) partitionDeploymentReap(deployments []string) []*structs
},
}
requests = append(requests, req)
available := maxIdsPerReap
available := batchSize

if remaining := len(deployments) - submittedDeployments; remaining > 0 {
if remaining <= available {
Expand Down
26 changes: 10 additions & 16 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1852,12 +1852,11 @@ func TestCoreScheduler_PartitionEvalReap(t *testing.T) {
}
core := NewCoreScheduler(s1, snap)

// Set the max ids per reap to something lower.
maxIdsPerReap = 2

evals := []string{"a", "b", "c"}
allocs := []string{"1", "2", "3"}
requests := core.(*CoreScheduler).partitionEvalReap(evals, allocs)

// Set the max ids per reap to something lower.
requests := core.(*CoreScheduler).partitionEvalReap(evals, allocs, 2)
if len(requests) != 3 {
t.Fatalf("Expected 3 requests got: %v", requests)
}
Expand Down Expand Up @@ -1895,11 +1894,9 @@ func TestCoreScheduler_PartitionDeploymentReap(t *testing.T) {
}
core := NewCoreScheduler(s1, snap)

// Set the max ids per reap to something lower.
maxIdsPerReap = 2

deployments := []string{"a", "b", "c"}
requests := core.(*CoreScheduler).partitionDeploymentReap(deployments)
// Set the max ids per reap to something lower.
requests := core.(*CoreScheduler).partitionDeploymentReap(deployments, 2)
if len(requests) != 2 {
t.Fatalf("Expected 2 requests got: %v", requests)
}
Expand All @@ -1917,7 +1914,6 @@ func TestCoreScheduler_PartitionDeploymentReap(t *testing.T) {

func TestCoreScheduler_PartitionJobReap(t *testing.T) {
ci.Parallel(t)
require := require.New(t)

s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
Expand All @@ -1929,18 +1925,16 @@ func TestCoreScheduler_PartitionJobReap(t *testing.T) {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)
jobs := []*structs.Job{mock.Job(), mock.Job(), mock.Job()}

// Set the max ids per reap to something lower.
maxIdsPerReap = 2

jobs := []*structs.Job{mock.Job(), mock.Job(), mock.Job()}
requests := core.(*CoreScheduler).partitionJobReap(jobs, "")
require.Len(requests, 2)
requests := core.(*CoreScheduler).partitionJobReap(jobs, "", 2)
require.Len(t, requests, 2)

first := requests[0]
second := requests[1]
require.Len(first.Jobs, 2)
require.Len(second.Jobs, 1)
require.Len(t, first.Jobs, 2)
require.Len(t, second.Jobs, 1)
}

// Tests various scenarios when allocations are eligible to be GCed
Expand Down

0 comments on commit 1d2a243

Please sign in to comment.