From b5d21a919191a5c6a325fa5851fe9cb59c853a48 Mon Sep 17 00:00:00 2001 From: Alex Munda Date: Tue, 29 Jun 2021 10:02:30 -0500 Subject: [PATCH] Always allow idempotency key meta. Tests for idempotent dispatch --- nomad/job_endpoint.go | 3 +- nomad/job_endpoint_test.go | 69 ++++++++++++++++++++++++++++++++++---- 2 files changed, 65 insertions(+), 7 deletions(-) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index e5581fc003d2..29df8fb757eb 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -2039,7 +2039,8 @@ func validateDispatchRequest(req *structs.JobDispatchRequest, job *structs.Job) for k := range req.Meta { _, req := required[k] _, opt := optional[k] - if !req && !opt { + // Always allow the idempotency key + if !req && !opt && k != MetaDispatchIdempotencyKey { unpermitted[k] = struct{}{} } } diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 889dacba1212..b893941a546b 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -6133,14 +6133,24 @@ func TestJobEndpoint_Dispatch(t *testing.T) { reqInputDataTooLarge := &structs.JobDispatchRequest{ Payload: make([]byte, DispatchPayloadSizeLimit+100), } + reqIdempotentMeta := &structs.JobDispatchRequest{ + Meta: map[string]string{ + MetaDispatchIdempotencyKey: "foo", + }, + } + + type existingIdempotentChildJob struct { + isTerminal bool + } type testCase struct { - name string - parameterizedJob *structs.Job - dispatchReq *structs.JobDispatchRequest - noEval bool - err bool - errStr string + name string + parameterizedJob *structs.Job + dispatchReq *structs.JobDispatchRequest + noEval bool + err bool + errStr string + existingIdempotentJob *existingIdempotentChildJob } cases := []testCase{ { @@ -6233,6 +6243,32 @@ func TestJobEndpoint_Dispatch(t *testing.T) { err: true, errStr: "stopped", }, + { + name: "idempotent meta key, no existing child job", + parameterizedJob: d1, + dispatchReq: reqIdempotentMeta, + err: false, + existingIdempotentJob: nil, + }, + { + name: "idempotent meta key, w/ existing non-terminal child job", + parameterizedJob: d1, + dispatchReq: reqIdempotentMeta, + err: true, + errStr: "dispatch violates idempotency key of non-terminal child job", + existingIdempotentJob: &existingIdempotentChildJob{ + isTerminal: false, + }, + }, + { + name: "idempotent meta key, w/ existing terminal job", + parameterizedJob: d1, + dispatchReq: reqIdempotentMeta, + err: false, + existingIdempotentJob: &existingIdempotentChildJob{ + isTerminal: true, + }, + }, } for _, tc := range cases { @@ -6266,6 +6302,27 @@ func TestJobEndpoint_Dispatch(t *testing.T) { Namespace: tc.parameterizedJob.Namespace, } + // Dispatch with the same request so a child job w/ the idempotency key exists + if tc.existingIdempotentJob != nil { + var initialDispatchResp structs.JobDispatchResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.Dispatch", tc.dispatchReq, &initialDispatchResp); err != nil { + t.Fatalf("Unexpected error dispatching initial idempotent job: %v", err) + } + + if tc.existingIdempotentJob.isTerminal { + eval, err := s1.State().EvalByID(nil, initialDispatchResp.EvalID) + if err != nil { + t.Fatalf("Unexpected error fetching eval %v", err) + } + eval = eval.Copy() + eval.Status = structs.EvalStatusComplete + err = s1.State().UpsertEvals(structs.MsgTypeTestSetup, initialDispatchResp.Index+1, []*structs.Evaluation{eval}) + if err != nil { + t.Fatalf("Unexpected error completing eval %v", err) + } + } + } + var dispatchResp structs.JobDispatchResponse dispatchErr := msgpackrpc.CallWithCodec(codec, "Job.Dispatch", tc.dispatchReq, &dispatchResp)