Skip to content

Commit

Permalink
job_endpoint: Validate volume permissions
Browse files Browse the repository at this point in the history
  • Loading branch information
endocrimes committed Aug 12, 2019
1 parent 92250c3 commit 0ecaecf
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 27 deletions.
18 changes: 18 additions & 0 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,24 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilitySubmitJob) {
return structs.ErrPermissionDenied
}
// Validate Volume Permsissions
for _, tg := range args.Job.TaskGroups {
for _, vol := range tg.Volumes {
if vol.Type != structs.VolumeTypeHost {
return structs.ErrPermissionDenied
}

cfg, err := structs.ParseHostVolumeConfig(vol.Config)
if err != nil {
return structs.ErrPermissionDenied
}

if !aclObj.AllowHostVolumeOperation(cfg.Source, acl.HostVolumeCapabilityMount) {
return structs.ErrPermissionDenied
}
}
}

// Check if override is set and we do not have permissions
if args.PolicyOverride {
if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilitySentinelOverride) {
Expand Down
118 changes: 91 additions & 27 deletions nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,44 +106,108 @@ func TestJobEndpoint_Register(t *testing.T) {

func TestJobEndpoint_Register_ACL(t *testing.T) {
t.Parallel()

s1, root := TestACLServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create the register request
job := mock.Job()
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}
newVolumeJob := func() *structs.Job {
j := mock.Job()
tg := j.TaskGroups[0]
tg.Volumes = map[string]*structs.VolumeRequest{
"ca-certs": {
Type: structs.VolumeTypeHost,
Config: map[string]interface{}{
"source": "prod-ca-certs",
},
},
}

// Try without a token, expect failure
var resp structs.JobRegisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err == nil {
t.Fatalf("expected error")
}
tg.Tasks[0].VolumeMounts = []*structs.VolumeMount{
{
Volume: "ca-certs",
Destination: "/etc/ca-certificates",
ReadOnly: true,
},
}

// Try with a token
req.AuthToken = root.SecretID
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index == 0 {
t.Fatalf("bad index: %d", resp.Index)
return j
}

// Check for the node in the FSM
state := s1.fsm.State()
ws := memdb.NewWatchSet()
out, err := state.JobByID(ws, job.Namespace, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
submitJobPolicy := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob, acl.NamespaceCapabilitySubmitJob})

submitJobToken := mock.CreatePolicyAndToken(t, s1.State(), 1001, "test-submit-job", submitJobPolicy)

volumesPolicy := mock.HostVolumePolicy("prod-*", "", []string{acl.HostVolumeCapabilityMount})

submitJobWithVolumesToken := mock.CreatePolicyAndToken(t, s1.State(), 1002, "test-submit-volumes", submitJobPolicy+"\n"+volumesPolicy)

cases := []struct {
Name string
Job *structs.Job
Token string
ErrExpected bool
}{
{
Name: "without a token",
Job: mock.Job(),
Token: "",
ErrExpected: true,
},
{
Name: "with a token",
Job: mock.Job(),
Token: root.SecretID,
ErrExpected: false,
},
{
Name: "with a token that can submit a job, but not use a required volumes",
Job: newVolumeJob(),
Token: submitJobToken.SecretID,
ErrExpected: true,
},
{
Name: "with a token that can submit a job, and use all required volumes",
Job: newVolumeJob(),
Token: submitJobWithVolumesToken.SecretID,
ErrExpected: false,
},
}
if out == nil {
t.Fatalf("expected job")

for _, tt := range cases {
t.Run(tt.Name, func(t *testing.T) {
codec := rpcClient(t, s1)
req := &structs.JobRegisterRequest{
Job: tt.Job,
WriteRequest: structs.WriteRequest{Region: "global"},
}
req.AuthToken = tt.Token

// Try without a token, expect failure
var resp structs.JobRegisterResponse
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)

// If we expected an error, then the job should _not_ be registered.
if tt.ErrExpected {
require.Error(t, err, "expected error")
return
}

if !tt.ErrExpected {
require.NoError(t, err, "unexpected error")
}

require.NotEqual(t, 0, resp.Index)

state := s1.fsm.State()
ws := memdb.NewWatchSet()
out, err := state.JobByID(ws, tt.Job.Namespace, tt.Job.ID)
require.NoError(t, err)
require.NotNil(t, out)
require.Equal(t, tt.Job.TaskGroups, out.TaskGroups)
})
}
}

Expand Down
20 changes: 20 additions & 0 deletions nomad/mock/acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,26 @@ func NamespacePolicy(namespace string, policy string, capabilities []string) str
return policyHCL
}

// HostVolumePolicy is a helper for generating the policy hcl for a given
// host-volume. Either policy or capabilities may be nil but not both.
func HostVolumePolicy(vol string, policy string, capabilities []string) string {
policyHCL := fmt.Sprintf("host_volume %q {", vol)
if policy != "" {
policyHCL += fmt.Sprintf("\n\tpolicy = %q", policy)
}
if len(capabilities) != 0 {
for i, s := range capabilities {
if !strings.HasPrefix(s, "\"") {
capabilities[i] = strconv.Quote(s)
}
}

policyHCL += fmt.Sprintf("\n\tcapabilities = [%v]", strings.Join(capabilities, ","))
}
policyHCL += "\n}"
return policyHCL
}

// AgentPolicy is a helper for generating the hcl for a given agent policy.
func AgentPolicy(policy string) string {
return fmt.Sprintf("agent {\n\tpolicy = %q\n}\n", policy)
Expand Down

0 comments on commit 0ecaecf

Please sign in to comment.