Skip to content

Commit

Permalink
Nodes generate Secret ID and used for retrieving allocations and regi…
Browse files Browse the repository at this point in the history
…stering
  • Loading branch information
dadgar committed Aug 17, 2016
1 parent 803df05 commit ba51037
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 47 deletions.
55 changes: 35 additions & 20 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,33 +486,45 @@ func (c *Client) getAllocRunners() map[string]*AllocRunner {
return runners
}

// nodeID restores a persistent unique ID or generates a new one
func (c *Client) nodeID() (string, error) {
// nodeIDs restores the nodes persistent unique ID and SecretID or generates new
// ones
func (c *Client) nodeID() (id string, secret string, err error) {
// Do not persist in dev mode
if c.config.DevMode {
return structs.GenerateUUID(), nil
return structs.GenerateUUID(), structs.GenerateUUID(), nil
}

// Attempt to read existing ID
path := filepath.Join(c.config.StateDir, "client-id")
buf, err := ioutil.ReadFile(path)
idPath := filepath.Join(c.config.StateDir, "client-id")
idBuf, err := ioutil.ReadFile(idPath)
if err != nil && !os.IsNotExist(err) {
return "", err
return "", "", err
}

// Attempt to read existing secret ID
secretPath := filepath.Join(c.config.StateDir, "secret-id")
secretBuf, err := ioutil.ReadFile(secretPath)
if err != nil && !os.IsNotExist(err) {
return "", "", err
}

// Use existing ID if any
if len(buf) != 0 {
return string(buf), nil
if len(idBuf) != 0 && len(secretBuf) != 0 {
return string(idBuf), string(secretBuf), nil
}

// Generate new ID
id := structs.GenerateUUID()
id = structs.GenerateUUID()
secret = structs.GenerateUUID()

// Persist the ID
if err := ioutil.WriteFile(path, []byte(id), 0700); err != nil {
return "", err
// Persist the IDs
if err := ioutil.WriteFile(idPath, []byte(id), 0700); err != nil {
return "", "", err
}
return id, nil
if err := ioutil.WriteFile(secretPath, []byte(secret), 0700); err != nil {
return "", "", err
}
return id, secret, nil
}

// setupNode is used to setup the initial node
Expand All @@ -523,11 +535,13 @@ func (c *Client) setupNode() error {
c.config.Node = node
}
// Generate an iD for the node
var err error
node.ID, err = c.nodeID()
id, secretID, err := c.nodeID()
if err != nil {
return fmt.Errorf("node ID setup failed: %v", err)
}

node.ID = id
node.SecretID = secretID
if node.Attributes == nil {
node.Attributes = make(map[string]string)
}
Expand Down Expand Up @@ -827,6 +841,8 @@ func (c *Client) retryRegisterNode() {
for {
if err := c.registerNode(); err == nil {
break
} else {
c.logger.Printf("[ERR] client: %v", err)
}
select {
case <-time.After(c.retryIntv(registerRetryIntv)):
Expand Down Expand Up @@ -992,8 +1008,10 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
// The request and response for getting the map of allocations that should
// be running on the Node to their AllocModifyIndex which is incremented
// when the allocation is updated by the servers.
n := c.Node()
req := structs.NodeSpecificRequest{
NodeID: c.Node().ID,
NodeID: n.ID,
SecretID: n.SecretID,
QueryOptions: structs.QueryOptions{
Region: c.Region(),
AllowStale: true,
Expand Down Expand Up @@ -1417,10 +1435,7 @@ func (c *Client) collectHostStats() {

// emitStats pushes host resource usage stats to remote metrics collection sinks
func (c *Client) emitStats(hStats *stats.HostStats) {
nodeID, err := c.nodeID()
if err != nil {
return
}
nodeID := c.Node().ID
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "total"}, float32(hStats.Memory.Total))
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "available"}, float32(hStats.Memory.Available))
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "used"}, float32(hStats.Memory.Used))
Expand Down
70 changes: 57 additions & 13 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,16 +352,26 @@ func TestClient_UpdateAllocStatus(t *testing.T) {
})
defer c1.Shutdown()

// Wait til the node is ready
waitTilNodeReady(c1, t)

job := mock.Job()
alloc := mock.Alloc()
alloc.NodeID = c1.Node().ID
alloc.Job = job
alloc.JobID = job.ID
originalStatus := "foo"
alloc.ClientStatus = originalStatus

// Insert at zero so they are pulled
state := s1.State()
if err := state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)); err != nil {
if err := state.UpsertJob(0, job); err != nil {
t.Fatal(err)
}
if err := state.UpsertJobSummary(100, mock.JobSummary(alloc.JobID)); err != nil {
t.Fatal(err)
}
state.UpsertAllocs(100, []*structs.Allocation{alloc})
state.UpsertAllocs(101, []*structs.Allocation{alloc})

testutil.WaitForResult(func() (bool, error) {
out, err := state.AllocByID(alloc.ID)
Expand Down Expand Up @@ -391,21 +401,29 @@ func TestClient_WatchAllocs(t *testing.T) {
})
defer c1.Shutdown()

// Wait til the node is ready
waitTilNodeReady(c1, t)

// Create mock allocations
job := mock.Job()
alloc1 := mock.Alloc()
alloc1.JobID = job.ID
alloc1.Job = job
alloc1.NodeID = c1.Node().ID
alloc2 := mock.Alloc()
alloc2.NodeID = c1.Node().ID
alloc2.JobID = job.ID
alloc2.Job = job

// Insert at zero so they are pulled
state := s1.State()
if err := state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID)); err != nil {
if err := state.UpsertJob(100, job); err != nil {
t.Fatal(err)
}
if err := state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)); err != nil {
if err := state.UpsertJobSummary(101, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(100,
[]*structs.Allocation{alloc1, alloc2})
err := state.UpsertAllocs(102, []*structs.Allocation{alloc1, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand All @@ -421,7 +439,7 @@ func TestClient_WatchAllocs(t *testing.T) {
})

// Delete one allocation
err = state.DeleteEval(101, nil, []string{alloc1.ID})
err = state.DeleteEval(103, nil, []string{alloc1.ID})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand All @@ -432,8 +450,7 @@ func TestClient_WatchAllocs(t *testing.T) {
alloc2_2 := new(structs.Allocation)
*alloc2_2 = *alloc2
alloc2_2.DesiredStatus = structs.AllocDesiredStatusStop
err = state.UpsertAllocs(102,
[]*structs.Allocation{alloc2_2})
err = state.UpsertAllocs(104, []*structs.Allocation{alloc2_2})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand All @@ -459,6 +476,18 @@ func TestClient_WatchAllocs(t *testing.T) {
})
}

func waitTilNodeReady(client *Client, t *testing.T) {
testutil.WaitForResult(func() (bool, error) {
n := client.Node()
if n.Status != structs.NodeStatusReady {
return false, fmt.Errorf("node not registered")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}

func TestClient_SaveRestoreState(t *testing.T) {
ctestutil.ExecCompatible(t)
s1, _ := testServer(t, nil)
Expand All @@ -471,18 +500,27 @@ func TestClient_SaveRestoreState(t *testing.T) {
})
defer c1.Shutdown()

// Wait til the node is ready
waitTilNodeReady(c1, t)

// Create mock allocations
job := mock.Job()
alloc1 := mock.Alloc()
alloc1.NodeID = c1.Node().ID
alloc1.Job = job
alloc1.JobID = job.ID
task := alloc1.Job.TaskGroups[0].Tasks[0]
task.Config["command"] = "/bin/sleep"
task.Config["args"] = []string{"10"}
task.Config["args"] = []string{"100"}

state := s1.State()
if err := state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID)); err != nil {
if err := state.UpsertJob(100, job); err != nil {
t.Fatal(err)
}
if err := state.UpsertAllocs(100, []*structs.Allocation{alloc1}); err != nil {
if err := state.UpsertJobSummary(101, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
if err := state.UpsertAllocs(102, []*structs.Allocation{alloc1}); err != nil {
t.Fatalf("err: %v", err)
}

Expand All @@ -491,7 +529,13 @@ func TestClient_SaveRestoreState(t *testing.T) {
c1.allocLock.RLock()
ar := c1.allocs[alloc1.ID]
c1.allocLock.RUnlock()
return ar != nil && ar.Alloc().ClientStatus == structs.AllocClientStatusRunning, nil
if ar == nil {
return false, fmt.Errorf("nil alloc runner")
}
if ar.Alloc().ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("client status: got %v; want %v", ar.Alloc().ClientStatus, structs.AllocClientStatusRunning)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
Expand Down
9 changes: 5 additions & 4 deletions nomad/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import (
func Node() *structs.Node {
node := &structs.Node{
ID: structs.GenerateUUID(),
SecretID: structs.GenerateUUID(),
Datacenter: "dc1",
Name: "foobar",
Attributes: map[string]string{
"kernel.name": "linux",
"arch": "x86",
"version": "0.1.0",
"driver.exec": "1",
"kernel.name": "linux",
"arch": "x86",
"nomad.version": "0.5.0",
"driver.exec": "1",
},
Resources: &structs.Resources{
CPU: 4000,
Expand Down
64 changes: 61 additions & 3 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nomad

import (
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -57,6 +58,18 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
if args.Node.Name == "" {
return fmt.Errorf("missing node name for client registration")
}
if len(args.Node.Attributes) == 0 {
return fmt.Errorf("missing attributes for client registration")
}
if args.Node.SecretID == "" {
// COMPAT: Remove after 0.6
// Need to check if this node is <0.4.x since SecretID is new in 0.5
if pre, err := nodePreSecretID(args.Node); err != nil {
return err
} else if !pre {
return fmt.Errorf("missing node secret ID for client registration")
}
}

// Default the status if none is given
if args.Node.Status == "" {
Expand Down Expand Up @@ -135,6 +148,22 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
return nil
}

// nodePreSecretID is a helper that returns whether the node is on a version
// that is before SecretIDs were introduced
func nodePreSecretID(node *structs.Node) (bool, error) {
a := node.Attributes
if a == nil {
return false, fmt.Errorf("node doesn't have attributes set")
}

v, ok := a["nomad.version"]
if !ok {
return false, fmt.Errorf("missing Nomad version in attributes")
}

return !strings.HasPrefix(v, "0.5"), nil
}

// updateNodeUpdateResponse assumes the n.srv.peerLock is held for reading.
func (n *Node) constructNodeServerInfoResponse(snap *state.StateSnapshot, reply *structs.NodeUpdateResponse) error {
reply.LeaderRPCAddr = n.srv.raft.Leader()
Expand Down Expand Up @@ -217,7 +246,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct

// Verify the arguments
if args.NodeID == "" {
return fmt.Errorf("missing node ID for client deregistration")
return fmt.Errorf("missing node ID for client status update")
}
if !structs.ValidNodeStatus(args.Status) {
return fmt.Errorf("invalid status for node")
Expand All @@ -236,6 +265,9 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
return fmt.Errorf("node not found")
}

// XXX: Could use the SecretID here but have to update the heartbeat system
// to track SecretIDs.

// Update the timestamp of when the node status was updated
node.StatusUpdatedAt = time.Now().Unix()

Expand Down Expand Up @@ -423,15 +455,18 @@ func (n *Node) GetNode(args *structs.NodeSpecificRequest,
}

// Setup the output
reply.Node = out
if out != nil {
// Clear the secret ID
reply.Node = out.Copy()
reply.Node.SecretID = ""
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the nodes table
index, err := snap.Index("nodes")
if err != nil {
return err
}
reply.Node = nil
reply.Index = index
}

Expand Down Expand Up @@ -524,11 +559,34 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,
if err != nil {
return err
}
allocs, err := snap.AllocsByNode(args.NodeID)

// Look for the node
node, err := snap.NodeByID(args.NodeID)
if err != nil {
return err
}

var allocs []*structs.Allocation
if node != nil {
// COMPAT: Remove in 0.6
// Check if the node should have a SecretID set
if args.SecretID == "" {
if pre, err := nodePreSecretID(node); err != nil {
return err
} else if !pre {
return fmt.Errorf("missing node secret ID for client status update")
}
} else if args.SecretID != node.SecretID {
return fmt.Errorf("node secret ID does not match")
}

var err error
allocs, err = snap.AllocsByNode(args.NodeID)
if err != nil {
return err
}
}

reply.Allocs = make(map[string]uint64)
// Setup the output
if len(allocs) != 0 {
Expand Down
Loading

0 comments on commit ba51037

Please sign in to comment.