Skip to content

Commit

Permalink
Always allow idempotency key meta. Tests for idempotent dispatch
Browse files Browse the repository at this point in the history
  • Loading branch information
alexmunda committed Jun 29, 2021
1 parent c4be87a commit b5d21a9
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 7 deletions.
3 changes: 2 additions & 1 deletion nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2039,7 +2039,8 @@ func validateDispatchRequest(req *structs.JobDispatchRequest, job *structs.Job)
for k := range req.Meta {
_, req := required[k]
_, opt := optional[k]
if !req && !opt {
// Always allow the idempotency key
if !req && !opt && k != MetaDispatchIdempotencyKey {
unpermitted[k] = struct{}{}
}
}
Expand Down
69 changes: 63 additions & 6 deletions nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6133,14 +6133,24 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
reqInputDataTooLarge := &structs.JobDispatchRequest{
Payload: make([]byte, DispatchPayloadSizeLimit+100),
}
reqIdempotentMeta := &structs.JobDispatchRequest{
Meta: map[string]string{
MetaDispatchIdempotencyKey: "foo",
},
}

type existingIdempotentChildJob struct {
isTerminal bool
}

type testCase struct {
name string
parameterizedJob *structs.Job
dispatchReq *structs.JobDispatchRequest
noEval bool
err bool
errStr string
name string
parameterizedJob *structs.Job
dispatchReq *structs.JobDispatchRequest
noEval bool
err bool
errStr string
existingIdempotentJob *existingIdempotentChildJob
}
cases := []testCase{
{
Expand Down Expand Up @@ -6233,6 +6243,32 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
err: true,
errStr: "stopped",
},
{
name: "idempotent meta key, no existing child job",
parameterizedJob: d1,
dispatchReq: reqIdempotentMeta,
err: false,
existingIdempotentJob: nil,
},
{
name: "idempotent meta key, w/ existing non-terminal child job",
parameterizedJob: d1,
dispatchReq: reqIdempotentMeta,
err: true,
errStr: "dispatch violates idempotency key of non-terminal child job",
existingIdempotentJob: &existingIdempotentChildJob{
isTerminal: false,
},
},
{
name: "idempotent meta key, w/ existing terminal job",
parameterizedJob: d1,
dispatchReq: reqIdempotentMeta,
err: false,
existingIdempotentJob: &existingIdempotentChildJob{
isTerminal: true,
},
},
}

for _, tc := range cases {
Expand Down Expand Up @@ -6266,6 +6302,27 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
Namespace: tc.parameterizedJob.Namespace,
}

// Dispatch with the same request so a child job w/ the idempotency key exists
if tc.existingIdempotentJob != nil {
var initialDispatchResp structs.JobDispatchResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Dispatch", tc.dispatchReq, &initialDispatchResp); err != nil {
t.Fatalf("Unexpected error dispatching initial idempotent job: %v", err)
}

if tc.existingIdempotentJob.isTerminal {
eval, err := s1.State().EvalByID(nil, initialDispatchResp.EvalID)
if err != nil {
t.Fatalf("Unexpected error fetching eval %v", err)
}
eval = eval.Copy()
eval.Status = structs.EvalStatusComplete
err = s1.State().UpsertEvals(structs.MsgTypeTestSetup, initialDispatchResp.Index+1, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("Unexpected error completing eval %v", err)
}
}
}

var dispatchResp structs.JobDispatchResponse
dispatchErr := msgpackrpc.CallWithCodec(codec, "Job.Dispatch", tc.dispatchReq, &dispatchResp)

Expand Down

0 comments on commit b5d21a9

Please sign in to comment.