diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index a1015b058c6d..7f73453ca825 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -1345,9 +1345,8 @@ func apiConnectGatewayProxyToStructs(in *api.ConsulGatewayProxy) *structs.Consul return nil } - var bindAddresses map[string]*structs.ConsulGatewayBindAddress + bindAddresses := make(map[string]*structs.ConsulGatewayBindAddress) if in.EnvoyGatewayBindAddresses != nil { - bindAddresses = make(map[string]*structs.ConsulGatewayBindAddress) for k, v := range in.EnvoyGatewayBindAddresses { bindAddresses[k] = &structs.ConsulGatewayBindAddress{ Address: v.Address, diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go index fdde940a566e..87c0cb9bec3e 100644 --- a/e2e/e2e_test.go +++ b/e2e/e2e_test.go @@ -24,6 +24,7 @@ import ( _ "github.com/hashicorp/nomad/e2e/nodedrain" _ "github.com/hashicorp/nomad/e2e/nomad09upgrade" _ "github.com/hashicorp/nomad/e2e/nomadexec" + _ "github.com/hashicorp/nomad/e2e/periodic" _ "github.com/hashicorp/nomad/e2e/podman" _ "github.com/hashicorp/nomad/e2e/quotas" _ "github.com/hashicorp/nomad/e2e/rescheduling" diff --git a/e2e/e2eutil/job.go b/e2e/e2eutil/job.go index 072f7e7843d1..b954c762c22b 100644 --- a/e2e/e2eutil/job.go +++ b/e2e/e2eutil/job.go @@ -6,12 +6,12 @@ import ( "io/ioutil" "os/exec" "regexp" + "strings" ) // Register registers a jobspec from a file but with a unique ID. // The caller is responsible for recording that ID for later cleanup. 
func Register(jobID, jobFilePath string) error { - cmd := exec.Command("nomad", "job", "run", "-") stdin, err := cmd.StdinPipe() if err != nil { @@ -40,6 +40,33 @@ func Register(jobID, jobFilePath string) error { return nil } +// PeriodicForce forces a periodic job to dispatch, returning an error +// if the command fails +func PeriodicForce(jobID string) error { + // nomad job periodic force + cmd := exec.Command("nomad", "job", "periodic", "force", jobID) + + out, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("could not register job: %w\n%v", err, string(out)) + } + + return nil +} + +// JobInspectTemplate runs nomad job inspect and formats the output +// using the specified go template +func JobInspectTemplate(jobID, template string) (string, error) { + cmd := exec.Command("nomad", "job", "inspect", "-t", template, jobID) + out, err := cmd.CombinedOutput() + if err != nil { + return "", fmt.Errorf("could not inspect job: %w\n%v", err, string(out)) + } + outStr := string(out) + outStr = strings.TrimSuffix(outStr, "\n") + return outStr, nil +} + // Register registers a jobspec from a string, also with a unique ID. // The caller is responsible for recording that ID for later cleanup. 
func RegisterFromJobspec(jobID, jobspec string) error { diff --git a/e2e/periodic/input/simple.nomad b/e2e/periodic/input/simple.nomad new file mode 100644 index 000000000000..ea9920aa915d --- /dev/null +++ b/e2e/periodic/input/simple.nomad @@ -0,0 +1,28 @@ +job "periodic" { + datacenters = ["dc1"] + type = "batch" + + constraint { + attribute = "${attr.kernel.name}" + value = "linux" + } + + + periodic { + cron = "* * * * *" + prohibit_overlap = true + } + + group "group" { + task "task" { + driver = "docker" + + config { + image = "busybox:1" + command = "/bin/sh" + args = ["-c", "sleep 5"] + } + } + } +} + diff --git a/e2e/periodic/periodic.go b/e2e/periodic/periodic.go new file mode 100644 index 000000000000..8cfcd36a6429 --- /dev/null +++ b/e2e/periodic/periodic.go @@ -0,0 +1,82 @@ +package periodic + +import ( + "fmt" + + "github.com/hashicorp/nomad/e2e/e2eutil" + "github.com/hashicorp/nomad/e2e/framework" + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/require" +) + +type PeriodicTest struct { + framework.TC + jobIDs []string +} + +func init() { + framework.AddSuites(&framework.TestSuite{ + Component: "Periodic", + CanRunLocal: true, + Cases: []framework.TestCase{ + new(PeriodicTest), + }, + }) +} + +func (tc *PeriodicTest) BeforeAll(f *framework.F) { + e2eutil.WaitForLeader(f.T(), tc.Nomad()) +} + +func (tc *PeriodicTest) AfterEach(f *framework.F) { + nomadClient := tc.Nomad() + j := nomadClient.Jobs() + + for _, id := range tc.jobIDs { + j.Deregister(id, true, nil) + } + _, err := e2eutil.Command("nomad", "system", "gc") + f.NoError(err) +} + +func (tc *PeriodicTest) TestPeriodicDispatch_Basic(f *framework.F) { + t := f.T() + + uuid := uuid.Generate() + jobID := fmt.Sprintf("periodicjob-%s", uuid[0:8]) + tc.jobIDs = append(tc.jobIDs, jobID) + + // register job + e2eutil.Register(jobID, "periodic/input/simple.nomad") + + // force dispatch + require.NoError(t, e2eutil.PeriodicForce(jobID)) + + 
// Get the child job ID + childID, err := e2eutil.JobInspectTemplate(jobID, `{{with index . 1}}{{printf "%s" .ID}}{{end}}`) + require.NoError(t, err) + require.NotEmpty(t, childID) + + testutil.WaitForResult(func() (bool, error) { + status, err := e2eutil.JobInspectTemplate(jobID, `{{with index . 1}}{{printf "%s" .Status}}{{end}}`) + require.NoError(t, err) + require.NotEmpty(t, status) + if status == "dead" { + return true, nil + } + return false, fmt.Errorf("expected periodic job to be dead, got %s", status) + }, func(err error) { + require.NoError(t, err) + }) + + // Assert there are no pending children + pending, err := e2eutil.JobInspectTemplate(jobID, `{{with index . 0}}{{printf "%d" .JobSummary.Children.Pending}}{{end}}`) + require.NoError(t, err) + require.Equal(t, "0", pending) + + // Assert there is exactly one dead child + dead, err := e2eutil.JobInspectTemplate(jobID, `{{with index . 0}}{{printf "%d" .JobSummary.Children.Dead}}{{end}}`) + require.NoError(t, err) + require.Equal(t, "1", dead) +} diff --git a/e2e/terraform/Makefile b/e2e/terraform/Makefile index 9a473a26d7a4..863d4f50c106 100644 --- a/e2e/terraform/Makefile +++ b/e2e/terraform/Makefile @@ -1,6 +1,9 @@ NOMAD_SHA ?= $(shell git rev-parse HEAD) PKG_PATH = $(shell pwd)/../../pkg/linux_amd64/nomad +# The version of nomad that gets deployed depends on an order of precedence +# linked below +# https://github.com/hashicorp/nomad/blob/master/e2e/terraform/README.md#nomad-version dev-cluster: terraform apply -auto-approve \ -var="nomad_sha=$(NOMAD_SHA)" diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 20ecf18f32df..83949dc0e22b 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -2,6 +2,7 @@ package nomad import ( "bytes" + "context" "fmt" "reflect" "strings" @@ -3422,3 +3423,59 @@ func TestFSM_ACLEvents(t *testing.T) { }) } } + +// TestFSM_EventBroker_JobRegisterFSMEvents asserts that only a single job +// register event is emitted when registering a job +func 
TestFSM_EventBroker_JobRegisterFSMEvents(t *testing.T) { + t.Parallel() + fsm := testFSM(t) + + job := mock.Job() + eval := mock.Eval() + eval.JobID = job.ID + + req := structs.JobRegisterRequest{ + Job: job, + Eval: eval, + } + buf, err := structs.Encode(structs.JobRegisterRequestType, req) + require.NoError(t, err) + + resp := fsm.Apply(makeLog(buf)) + require.Nil(t, resp) + + broker, err := fsm.State().EventBroker() + require.NoError(t, err) + + subReq := &stream.SubscribeRequest{ + Topics: map[structs.Topic][]string{ + structs.TopicJob: {"*"}, + }, + } + + sub, err := broker.Subscribe(subReq) + require.NoError(t, err) + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(500*time.Millisecond)) + defer cancel() + + // consume the queue + var events []structs.Event + for { + out, err := sub.Next(ctx) + if len(out.Events) == 0 { + break + } + + // consume the queue until the deadline has exceeded or until we've + // received more events than expected + if err == context.DeadlineExceeded || len(events) > 1 { + break + } + + events = append(events, out.Events...) 
+ } + + require.Len(t, events, 1) + require.Equal(t, structs.TypeJobRegistered, events[0].Type) +} diff --git a/nomad/job_endpoint_hook_connect.go b/nomad/job_endpoint_hook_connect.go index 40f7eeeb0a11..e7249097f716 100644 --- a/nomad/job_endpoint_hook_connect.go +++ b/nomad/job_endpoint_hook_connect.go @@ -345,7 +345,7 @@ func gatewayProxyForBridge(gateway *structs.ConsulGateway) *structs.ConsulGatewa func gatewayBindAddresses(ingress *structs.ConsulIngressConfigEntry) map[string]*structs.ConsulGatewayBindAddress { if ingress == nil || len(ingress.Listeners) == 0 { - return nil + return make(map[string]*structs.ConsulGatewayBindAddress) } addresses := make(map[string]*structs.ConsulGatewayBindAddress) diff --git a/nomad/job_endpoint_hook_connect_test.go b/nomad/job_endpoint_hook_connect_test.go index 53252c2acf4c..494dca56c17f 100644 --- a/nomad/job_endpoint_hook_connect_test.go +++ b/nomad/job_endpoint_hook_connect_test.go @@ -412,12 +412,12 @@ func TestJobEndpointConnect_gatewayProxyIsDefault(t *testing.T) { func TestJobEndpointConnect_gatewayBindAddresses(t *testing.T) { t.Run("nil", func(t *testing.T) { result := gatewayBindAddresses(nil) - require.Nil(t, result) + require.Empty(t, result) }) t.Run("no listeners", func(t *testing.T) { result := gatewayBindAddresses(&structs.ConsulIngressConfigEntry{Listeners: nil}) - require.Nil(t, result) + require.Empty(t, result) }) t.Run("simple", func(t *testing.T) { diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index ccb8a4c19107..a3fe090542cc 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -875,7 +875,8 @@ func ConnectIngressGatewayJob(mode string, inject bool) *structs.Job { Connect: &structs.ConsulConnect{ Gateway: &structs.ConsulGateway{ Proxy: &structs.ConsulGatewayProxy{ - ConnectTimeout: helper.TimeToPtr(3 * time.Second), + ConnectTimeout: helper.TimeToPtr(3 * time.Second), + EnvoyGatewayBindAddresses: make(map[string]*structs.ConsulGatewayBindAddress), }, Ingress: 
&structs.ConsulIngressConfigEntry{ Listeners: []*structs.ConsulIngressListener{{ diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 1c749827327d..bcbc6c16787e 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -4412,6 +4412,7 @@ func (s *StateStore) setJobStatuses(index uint64, txn *txn, if err := s.setJobStatus(index, txn, existing.(*structs.Job), evalDelete, forceStatus); err != nil { return err } + } return nil @@ -4427,9 +4428,7 @@ func (s *StateStore) setJobStatus(index uint64, txn *txn, // Capture the current status so we can check if there is a change oldStatus := job.Status - if index == job.CreateIndex { - oldStatus = "" - } + firstPass := index == job.CreateIndex newStatus := forceStatus // If forceStatus is not set, compute the jobs status. @@ -4441,8 +4440,12 @@ func (s *StateStore) setJobStatus(index uint64, txn *txn, } } - // Fast-path if nothing has changed. + // Fast-path if the job status has not changed. + // Still update the job summary if necessary. if oldStatus == newStatus { + if err := s.setJobSummary(txn, job, index, oldStatus, newStatus, firstPass); err != nil { + return err + } return nil } @@ -4460,64 +4463,72 @@ func (s *StateStore) setJobStatus(index uint64, txn *txn, } // Update the children summary - if updated.ParentID != "" { - // Try to update the summary of the parent job summary - summaryRaw, err := txn.First("job_summary", "id", updated.Namespace, updated.ParentID) - if err != nil { - return fmt.Errorf("unable to retrieve summary for parent job: %v", err) - } + if err := s.setJobSummary(txn, updated, index, oldStatus, newStatus, firstPass); err != nil { + return fmt.Errorf("job summary update failed %w", err) + } + return nil +} - // Only continue if the summary exists. 
It could not exist if the parent - // job was removed - if summaryRaw != nil { - existing := summaryRaw.(*structs.JobSummary) - pSummary := existing.Copy() - if pSummary.Children == nil { - pSummary.Children = new(structs.JobChildrenSummary) - } +func (s *StateStore) setJobSummary(txn *txn, updated *structs.Job, index uint64, oldStatus, newStatus string, firstPass bool) error { + if updated.ParentID == "" { + return nil + } - // Determine the transition and update the correct fields - children := pSummary.Children + // Try to update the summary of the parent job summary + summaryRaw, err := txn.First("job_summary", "id", updated.Namespace, updated.ParentID) + if err != nil { + return fmt.Errorf("unable to retrieve summary for parent job: %v", err) + } - // Decrement old status - if oldStatus != "" { - switch oldStatus { - case structs.JobStatusPending: - children.Pending-- - case structs.JobStatusRunning: - children.Running-- - case structs.JobStatusDead: - children.Dead-- - default: - return fmt.Errorf("unknown old job status %q", oldStatus) - } - } + // Only continue if the summary exists. 
It could not exist if the parent + // job was removed + if summaryRaw != nil { + existing := summaryRaw.(*structs.JobSummary) + pSummary := existing.Copy() + if pSummary.Children == nil { + pSummary.Children = new(structs.JobChildrenSummary) + } + + // Determine the transition and update the correct fields + children := pSummary.Children - // Increment new status - switch newStatus { + // Decrement old status + if !firstPass { + switch oldStatus { case structs.JobStatusPending: - children.Pending++ + children.Pending-- case structs.JobStatusRunning: - children.Running++ + children.Running-- case structs.JobStatusDead: - children.Dead++ + children.Dead-- default: - return fmt.Errorf("unknown new job status %q", newStatus) + return fmt.Errorf("unknown old job status %q", oldStatus) } + } - // Update the index - pSummary.ModifyIndex = index + // Increment new status + switch newStatus { + case structs.JobStatusPending: + children.Pending++ + case structs.JobStatusRunning: + children.Running++ + case structs.JobStatusDead: + children.Dead++ + default: + return fmt.Errorf("unknown new job status %q", newStatus) + } - // Insert the summary - if err := txn.Insert("job_summary", pSummary); err != nil { - return fmt.Errorf("job summary insert failed: %v", err) - } - if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { - return fmt.Errorf("index update failed: %v", err) - } + // Update the index + pSummary.ModifyIndex = index + + // Insert the summary + if err := txn.Insert("job_summary", pSummary); err != nil { + return fmt.Errorf("job summary insert failed: %v", err) + } + if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) } } - return nil } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 7bff0f1ca719..1c42e677f1c6 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -6839,32 +6839,20 @@ func 
TestStateStore_UpdateJobStability(t *testing.T) { // Insert a job twice to get two versions job := mock.Job() - if err := state.UpsertJob(structs.MsgTypeTestSetup, 1, job); err != nil { - t.Fatalf("bad: %v", err) - } + require.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 1, job)) - if err := state.UpsertJob(structs.MsgTypeTestSetup, 2, job); err != nil { - t.Fatalf("bad: %v", err) - } + require.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 2, job.Copy())) // Update the stability to true err := state.UpdateJobStability(3, job.Namespace, job.ID, 0, true) - if err != nil { - t.Fatalf("bad: %v", err) - } + require.NoError(t, err) // Check that the job was updated properly ws := memdb.NewWatchSet() - jout, _ := state.JobByIDAndVersion(ws, job.Namespace, job.ID, 0) - if err != nil { - t.Fatalf("bad: %v", err) - } - if jout == nil { - t.Fatalf("bad: %#v", jout) - } - if !jout.Stable { - t.Fatalf("job not marked stable %#v", jout) - } + jout, err := state.JobByIDAndVersion(ws, job.Namespace, job.ID, 0) + require.NoError(t, err) + require.NotNil(t, jout) + require.True(t, jout.Stable, "job not marked as stable") // Update the stability to false err = state.UpdateJobStability(3, job.Namespace, job.ID, 0, false) @@ -6873,16 +6861,10 @@ func TestStateStore_UpdateJobStability(t *testing.T) { } // Check that the job was updated properly - jout, _ = state.JobByIDAndVersion(ws, job.Namespace, job.ID, 0) - if err != nil { - t.Fatalf("bad: %v", err) - } - if jout == nil { - t.Fatalf("bad: %#v", jout) - } - if jout.Stable { - t.Fatalf("job marked stable %#v", jout) - } + jout, err = state.JobByIDAndVersion(ws, job.Namespace, job.ID, 0) + require.NoError(t, err) + require.NotNil(t, jout) + require.False(t, jout.Stable) } // Test that nonexistent deployment can't be promoted diff --git a/nomad/structs/services_test.go b/nomad/structs/services_test.go index 12b3b5b55c4a..493c6fefcc29 100644 --- a/nomad/structs/services_test.go +++ b/nomad/structs/services_test.go @@ 
-293,6 +293,23 @@ func TestConsulConnect_CopyEquals(t *testing.T) { require.False(t, c.Equals(o)) } +func TestConsulConnect_GatewayProxy_CopyEquals(t *testing.T) { + t.Parallel() + + c := &ConsulGatewayProxy{ + ConnectTimeout: helper.TimeToPtr(1 * time.Second), + EnvoyGatewayBindTaggedAddresses: false, + EnvoyGatewayBindAddresses: make(map[string]*ConsulGatewayBindAddress), + } + + require.NoError(t, c.Validate()) + + // Copies should be equivalent + o := c.Copy() + require.Equal(t, c, o) + require.True(t, c.Equals(o)) +} + func TestSidecarTask_MergeIntoTask(t *testing.T) { t.Parallel()