Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Server side Vault telemetry #2318

Merged
merged 3 commits into from
Feb 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion client/vaultclient/vaultclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,13 @@ func (c *vaultClient) DeriveToken(alloc *structs.Allocation, taskNames []string)
// Use the token supplied to interact with vault
c.client.SetToken("")

return c.tokenDeriver(alloc, taskNames, c.client)
tokens, err := c.tokenDeriver(alloc, taskNames, c.client)
if err != nil {
c.logger.Printf("[ERR] client.vault: failed to derive token for allocation %q and tasks %v: %v", alloc.ID, taskNames, err)
return nil, err
}

return tokens, nil
}

// GetConsulACL creates a vault API client and reads from vault a consul ACL
Expand Down
8 changes: 4 additions & 4 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,7 +1061,7 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest,

secret, err := n.srv.vault.CreateToken(ctx, alloc, task)
if err != nil {
wrapped := fmt.Errorf("failed to create token for task %q: %v", task, err)
wrapped := fmt.Errorf("failed to create token for task %q on alloc %q: %v", task, alloc.ID, err)
if rerr, ok := err.(*structs.RecoverableError); ok && rerr.Recoverable {
// If the error is recoverable, propogate it
return structs.NewRecoverableError(wrapped, true)
Expand Down Expand Up @@ -1117,10 +1117,10 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest,

// If there was an error revoke the created tokens
if createErr != nil {
n.srv.logger.Printf("[ERR] nomad.node: Vault token creation failed: %v", createErr)
n.srv.logger.Printf("[ERR] nomad.node: Vault token creation for alloc %q failed: %v", alloc.ID, createErr)

if revokeErr := n.srv.vault.RevokeTokens(context.Background(), accessors, false); revokeErr != nil {
n.srv.logger.Printf("[ERR] nomad.node: Vault token revocation failed: %v", revokeErr)
n.srv.logger.Printf("[ERR] nomad.node: Vault token revocation for alloc %q failed: %v", alloc.ID, revokeErr)
}

if rerr, ok := createErr.(*structs.RecoverableError); ok {
Expand All @@ -1136,7 +1136,7 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest,
req := structs.VaultAccessorsRequest{Accessors: accessors}
_, index, err := n.srv.raftApply(structs.VaultAccessorRegisterRequestType, &req)
if err != nil {
n.srv.logger.Printf("[ERR] nomad.client: Register Vault accessors failed: %v", err)
n.srv.logger.Printf("[ERR] nomad.client: Register Vault accessors for alloc %q failed: %v", alloc.ID, err)

// Determine if we can recover from the error
retry := false
Expand Down
3 changes: 3 additions & 0 deletions nomad/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ func NewServer(config *Config, consulSyncer *consul.Syncer, logger *log.Logger)
// Emit metrics for the blocked eval tracker.
go blockedEvals.EmitStats(time.Second, s.shutdownCh)

// Emit metrics for the Vault client.
go s.vault.EmitStats(time.Second, s.shutdownCh)

// Emit metrics
go s.heartbeatStats()

Expand Down
86 changes: 81 additions & 5 deletions nomad/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"gopkg.in/tomb.v2"

metrics "github.com/armon/go-metrics"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
Expand Down Expand Up @@ -135,6 +136,21 @@ type VaultClient interface {

// Running returns whether the Vault client is running
Running() bool

// Stats returns the Vault clients statistics
Stats() *VaultStats

// EmitStats emits that clients statistics at the given period until stopCh
// is called.
EmitStats(period time.Duration, stopCh chan struct{})
}

// VaultStats returns all the stats about Vault tokens created and managed by
// Nomad.
type VaultStats struct {
// TrackedForRevoke is the count of tokens that are being tracked to be
// revoked since they could not be immediately revoked.
TrackedForRevoke int
}

// PurgeVaultAccessor is called to remove VaultAccessors from the system. If
Expand Down Expand Up @@ -204,6 +220,10 @@ type vaultClient struct {
tomb *tomb.Tomb
logger *log.Logger

// stats stores the stats
stats *VaultStats
statsLock sync.RWMutex

// l is used to lock the configuration aspects of the client such that
// multiple callers can't cause conflicting config updates
l sync.Mutex
Expand All @@ -227,6 +247,7 @@ func NewVaultClient(c *config.VaultConfig, logger *log.Logger, purgeFn PurgeVaul
revoking: make(map[*structs.VaultAccessor]time.Time),
purgeFn: purgeFn,
tomb: &tomb.Tomb{},
stats: new(VaultStats),
}

if v.config.IsEnabled() {
Expand Down Expand Up @@ -821,7 +842,6 @@ func (v *vaultClient) CreateToken(ctx context.Context, a *structs.Allocation, ta
if !v.Enabled() {
return nil, fmt.Errorf("Vault integration disabled")
}

if !v.Active() {
return nil, structs.NewRecoverableError(fmt.Errorf("Vault client not active"), true)
}
Expand All @@ -833,6 +853,9 @@ func (v *vaultClient) CreateToken(ctx context.Context, a *structs.Allocation, ta
return nil, fmt.Errorf("Connection to Vault failed: %v", err)
}

// Track how long the request takes
defer metrics.MeasureSince([]string{"nomad", "vault", "create_token"}, time.Now())

// Retrieve the Vault block for the task
policies := a.Job.VaultPolicies()
if policies == nil {
Expand Down Expand Up @@ -908,6 +931,9 @@ func (v *vaultClient) LookupToken(ctx context.Context, token string) (*vapi.Secr
return nil, fmt.Errorf("Connection to Vault failed: %v", err)
}

// Track how long the request takes
defer metrics.MeasureSince([]string{"nomad", "vault", "lookup_token"}, time.Now())

// Ensure we are under our rate limit
if err := v.limiter.Wait(ctx); err != nil {
return nil, err
Expand Down Expand Up @@ -943,6 +969,9 @@ func (v *vaultClient) RevokeTokens(ctx context.Context, accessors []*structs.Vau
return fmt.Errorf("Vault client not active")
}

// Track how long the request takes
defer metrics.MeasureSince([]string{"nomad", "vault", "revoke_tokens"}, time.Now())

// Check if we have established a connection with Vault. If not just add it
// to the queue
if established, err := v.ConnectionEstablished(); !established && err == nil {
Expand All @@ -952,22 +981,29 @@ func (v *vaultClient) RevokeTokens(ctx context.Context, accessors []*structs.Vau
v.storeForRevocation(accessors)
}

// Track that we are abandoning these accessors.
metrics.IncrCounter([]string{"nomad", "vault", "undistributed_tokens_abandoned"}, float32(len(accessors)))
return nil
}

// Attempt to revoke immediately and if it fails, add it to the revoke queue
err := v.parallelRevoke(ctx, accessors)
if !committed {
if err != nil {
// If it is uncommitted, it is a best effort revoke as it will shortly
// TTL within the cubbyhole and has not been leaked to any outside
// system
return nil
}
if !committed {
metrics.IncrCounter([]string{"nomad", "vault", "undistributed_tokens_abandoned"}, float32(len(accessors)))
return nil
}

if err != nil {
v.logger.Printf("[WARN] vault: failed to revoke tokens. Will reattempt til TTL: %v", err)
v.storeForRevocation(accessors)
return nil
} else if !committed {
// Mark that it was revoked but there is nothing to purge so exit
metrics.IncrCounter([]string{"nomad", "vault", "undistributed_tokens_revoked"}, float32(len(accessors)))
return nil
}

if err := v.purgeFn(accessors); err != nil {
Expand All @@ -976,6 +1012,9 @@ func (v *vaultClient) RevokeTokens(ctx context.Context, accessors []*structs.Vau
return nil
}

// Track that it was revoked successfully
metrics.IncrCounter([]string{"nomad", "vault", "distributed_tokens_revoked"}, float32(len(accessors)))

return nil
}

Expand All @@ -984,10 +1023,13 @@ func (v *vaultClient) RevokeTokens(ctx context.Context, accessors []*structs.Vau
// time.
func (v *vaultClient) storeForRevocation(accessors []*structs.VaultAccessor) {
v.revLock.Lock()
v.statsLock.Lock()
now := time.Now()
for _, a := range accessors {
v.revoking[a] = now.Add(time.Duration(a.CreationTTL) * time.Second)
}
v.stats.TrackedForRevoke = len(v.revoking)
v.statsLock.Unlock()
v.revLock.Unlock()
}

Expand Down Expand Up @@ -1103,12 +1145,19 @@ func (v *vaultClient) revokeDaemon() {
continue
}

// Track that tokens were revoked successfully
metrics.IncrCounter([]string{"nomad", "vault", "distributed_tokens_revoked"}, float32(len(revoking)))

// Can delete from the tracked list now that we have purged
v.revLock.Lock()
v.statsLock.Lock()
for _, va := range revoking {
delete(v.revoking, va)
}
v.stats.TrackedForRevoke = len(v.revoking)
v.statsLock.Unlock()
v.revLock.Unlock()

}
}
}
Expand Down Expand Up @@ -1137,3 +1186,30 @@ func (v *vaultClient) setLimit(l rate.Limit) {
defer v.l.Unlock()
v.limiter = rate.NewLimiter(l, int(l))
}

// Stats is used to query the state of the blocked eval tracker.
func (v *vaultClient) Stats() *VaultStats {
// Allocate a new stats struct
stats := new(VaultStats)

v.statsLock.RLock()
defer v.statsLock.RUnlock()

// Copy all the stats
stats.TrackedForRevoke = v.stats.TrackedForRevoke

return stats
}

// EmitStats is used to export metrics about the blocked eval tracker while enabled
func (v *vaultClient) EmitStats(period time.Duration, stopCh chan struct{}) {
for {
select {
case <-time.After(period):
stats := v.Stats()
metrics.SetGauge([]string{"nomad", "vault", "distributed_tokens_revoking"}, float32(stats.TrackedForRevoke))
case <-stopCh:
return
}
}
}
4 changes: 4 additions & 0 deletions nomad/vault_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,10 @@ func TestVaultClient_RevokeTokens_PreEstablishs(t *testing.T) {
if len(client.revoking) != 2 {
t.Fatalf("didn't add to revoke loop")
}

if client.Stats().TrackedForRevoke != 2 {
t.Fatalf("didn't add to revoke loop")
}
}

func TestVaultClient_RevokeTokens_Root(t *testing.T) {
Expand Down
11 changes: 7 additions & 4 deletions nomad/vault_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nomad

import (
"context"
"time"

"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
Expand Down Expand Up @@ -134,7 +135,9 @@ func (v *TestVaultClient) RevokeTokens(ctx context.Context, accessors []*structs
return nil
}

func (v *TestVaultClient) Stop() {}
func (v *TestVaultClient) SetActive(enabled bool) {}
func (v *TestVaultClient) SetConfig(config *config.VaultConfig) error { return nil }
func (v *TestVaultClient) Running() bool { return true }
func (v *TestVaultClient) Stop() {}
func (v *TestVaultClient) SetActive(enabled bool) {}
func (v *TestVaultClient) SetConfig(config *config.VaultConfig) error { return nil }
func (v *TestVaultClient) Running() bool { return true }
func (v *TestVaultClient) Stats() *VaultStats { return new(VaultStats) }
func (v *TestVaultClient) EmitStats(period time.Duration, stopCh chan struct{}) {}