Skip to content

Commit

Permalink
Merge pull request #1603 from hashicorp/f-job-validate
Browse files Browse the repository at this point in the history
Job Register validates Vault Tokens
  • Loading branch information
dadgar committed Aug 17, 2016
2 parents 6a76a25 + 0fffe68 commit 7c65bda
Show file tree
Hide file tree
Showing 9 changed files with 575 additions and 6 deletions.
37 changes: 37 additions & 0 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nomad

import (
"fmt"
"strings"
"time"

"github.com/armon/go-metrics"
Expand Down Expand Up @@ -67,6 +68,42 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
}
}

// Ensure that the job has permissions for the requested Vault tokens
desiredPolicies := structs.VaultPoliciesSet(args.Job.VaultPolicies())
if len(desiredPolicies) != 0 {
vconf := j.srv.config.VaultConfig
if !vconf.Enabled {
return fmt.Errorf("Vault not enabled and Vault policies requested")
}

// Have to check if the user has permissions
if !vconf.AllowUnauthenticated {
if args.Job.VaultToken == "" {
return fmt.Errorf("Vault policies requested but missing Vault Token")
}

vault := j.srv.vault
s, err := vault.LookupToken(args.Job.VaultToken)
if err != nil {
return err
}

allowedPolicies, err := PoliciesFrom(s)
if err != nil {
return err
}

subset, offending := structs.SliceStringIsSubset(allowedPolicies, desiredPolicies)
if !subset {
return fmt.Errorf("Passed Vault Token doesn't allow access to the following policies: %s",
strings.Join(offending, ", "))
}
}
}

// Clear the Vault token
args.Job.VaultToken = ""

// Commit this update via Raft
_, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
if err != nil {
Expand Down
184 changes: 184 additions & 0 deletions nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nomad

import (
"fmt"
"reflect"
"strings"
"testing"
Expand Down Expand Up @@ -360,6 +361,189 @@ func TestJobEndpoint_Register_EnforceIndex(t *testing.T) {
}
}

func TestJobEndpoint_Register_Vault_Disabled(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
c.VaultConfig.Enabled = false
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create the register request with a job asking for a vault policy
job := mock.Job()
job.TaskGroups[0].Tasks[0].Vault = &structs.Vault{Policies: []string{"foo"}}
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}

// Fetch the response
var resp structs.JobRegisterResponse
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
if err == nil || !strings.Contains(err.Error(), "Vault not enabled") {
t.Fatalf("expected Vault not enabled error: %v", err)
}
}

func TestJobEndpoint_Register_Vault_AllowUnauthenticated(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Enable vault and allow authenticated
s1.config.VaultConfig.Enabled = true
s1.config.VaultConfig.AllowUnauthenticated = true

// Replace the Vault Client on the server
s1.vault = &TestVaultClient{}

// Create the register request with a job asking for a vault policy
job := mock.Job()
job.TaskGroups[0].Tasks[0].Vault = &structs.Vault{Policies: []string{"foo"}}
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}

// Fetch the response
var resp structs.JobRegisterResponse
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
if err != nil {
t.Fatalf("bad: %v", err)
}

// Check for the job in the FSM
state := s1.fsm.State()
out, err := state.JobByID(job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("expected job")
}
if out.CreateIndex != resp.JobModifyIndex {
t.Fatalf("index mis-match")
}
}

func TestJobEndpoint_Register_Vault_NoToken(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Enable vault
s1.config.VaultConfig.Enabled = true
s1.config.VaultConfig.AllowUnauthenticated = false

// Replace the Vault Client on the server
s1.vault = &TestVaultClient{}

// Create the register request with a job asking for a vault policy but
// don't send a Vault token
job := mock.Job()
job.TaskGroups[0].Tasks[0].Vault = &structs.Vault{Policies: []string{"foo"}}
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}

// Fetch the response
var resp structs.JobRegisterResponse
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
if err == nil || !strings.Contains(err.Error(), "missing Vault Token") {
t.Fatalf("expected Vault not enabled error: %v", err)
}
}

func TestJobEndpoint_Register_Vault_Policies(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Enable vault
s1.config.VaultConfig.Enabled = true
s1.config.VaultConfig.AllowUnauthenticated = false

// Replace the Vault Client on the server
tvc := &TestVaultClient{}
s1.vault = tvc

// Add three tokens: one that allows the requesting policy, one that does
// not and one that returns an error
policy := "foo"

badToken := structs.GenerateUUID()
badPolicies := []string{"a", "b", "c"}
tvc.SetLookupTokenAllowedPolicies(badToken, badPolicies)

goodToken := structs.GenerateUUID()
goodPolicies := []string{"foo", "bar", "baz"}
tvc.SetLookupTokenAllowedPolicies(goodToken, goodPolicies)

errToken := structs.GenerateUUID()
expectedErr := fmt.Errorf("return errors from vault")
tvc.SetLookupTokenError(errToken, expectedErr)

// Create the register request with a job asking for a vault policy but
// send the bad Vault token
job := mock.Job()
job.VaultToken = badToken
job.TaskGroups[0].Tasks[0].Vault = &structs.Vault{Policies: []string{policy}}
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}

// Fetch the response
var resp structs.JobRegisterResponse
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
if err == nil || !strings.Contains(err.Error(),
"doesn't allow access to the following policies: "+policy) {
t.Fatalf("expected permission denied error: %v", err)
}

// Use the err token
job.VaultToken = errToken
err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
if err == nil || !strings.Contains(err.Error(), expectedErr.Error()) {
t.Fatalf("expected permission denied error: %v", err)
}

// Use the good token
job.VaultToken = goodToken

// Fetch the response
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("bad: %v", err)
}

// Check for the job in the FSM
state := s1.fsm.State()
out, err := state.JobByID(job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("expected job")
}
if out.CreateIndex != resp.JobModifyIndex {
t.Fatalf("index mis-match")
}
if out.VaultToken != "" {
t.Fatalf("vault token not cleared")
}
}

func TestJobEndpoint_Evaluate(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
Expand Down
41 changes: 41 additions & 0 deletions nomad/structs/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,44 @@ func CopySliceConstraints(s []*Constraint) []*Constraint {
}
return c
}

// SliceStringIsSubset returns whether the smaller set of strings is a subset of
// the larger. If the smaller slice is not a subset, the offending elements are
// returned.
func SliceStringIsSubset(larger, smaller []string) (bool, []string) {
largerSet := make(map[string]struct{}, len(larger))
for _, l := range larger {
largerSet[l] = struct{}{}
}

subset := true
var offending []string
for _, s := range smaller {
if _, ok := largerSet[s]; !ok {
subset = false
offending = append(offending, s)
}
}

return subset, offending
}

// VaultPoliciesSet takes the structure returned by VaultPolicies and returns
// the set of required policies
func VaultPoliciesSet(policies map[string]map[string][]string) []string {
set := make(map[string]struct{})

for _, tgp := range policies {
for _, tp := range tgp {
for _, p := range tp {
set[p] = struct{}{}
}
}
}

flattened := make([]string, 0, len(set))
for p := range set {
flattened = append(flattened, p)
}
return flattened
}
15 changes: 15 additions & 0 deletions nomad/structs/funcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,18 @@ func TestGenerateUUID(t *testing.T) {
}
}
}

func TestSliceStringIsSubset(t *testing.T) {
l := []string{"a", "b", "c"}
s := []string{"d"}

sub, offending := SliceStringIsSubset(l, l[:1])
if !sub || len(offending) != 0 {
t.Fatalf("bad %v %v", sub, offending)
}

sub, offending = SliceStringIsSubset(l, s)
if sub || len(offending) == 0 || offending[0] != "d" {
t.Fatalf("bad %v %v", sub, offending)
}
}
20 changes: 20 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1222,6 +1222,26 @@ func (j *Job) IsPeriodic() bool {
return j.Periodic != nil
}

// VaultPolicies returns the set of Vault policies per task group, per task
func (j *Job) VaultPolicies() map[string]map[string][]string {
policies := make(map[string]map[string][]string, len(j.TaskGroups))

for _, tg := range j.TaskGroups {
tgPolicies := make(map[string][]string, len(tg.Tasks))
policies[tg.Name] = tgPolicies

for _, task := range tg.Tasks {
if task.Vault == nil {
continue
}

tgPolicies[task.Name] = task.Vault.Policies
}
}

return policies
}

// JobListStub is used to return a subset of job information
// for the job list
type JobListStub struct {
Expand Down
Loading

0 comments on commit 7c65bda

Please sign in to comment.