From 826ef285e7ffd80f743348cef60530b52ec746e1 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 14 Feb 2017 16:02:18 -0800 Subject: [PATCH 1/3] Add server metrics --- nomad/server.go | 3 ++ nomad/vault.go | 86 +++++++++++++++++++++++++++++++++++++++--- nomad/vault_test.go | 4 ++ nomad/vault_testing.go | 11 ++++-- 4 files changed, 95 insertions(+), 9 deletions(-) diff --git a/nomad/server.go b/nomad/server.go index 3d1446f33b17..112b9e2b90d4 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -286,6 +286,9 @@ func NewServer(config *Config, consulSyncer *consul.Syncer, logger *log.Logger) // Emit metrics for the blocked eval tracker. go blockedEvals.EmitStats(time.Second, s.shutdownCh) + // Emit metrics for the Vault client. + go s.vault.EmitStats(time.Second, s.shutdownCh) + // Emit metrics go s.heartbeatStats() diff --git a/nomad/vault.go b/nomad/vault.go index 898825d6ea63..19f533bceb66 100644 --- a/nomad/vault.go +++ b/nomad/vault.go @@ -13,6 +13,7 @@ import ( "gopkg.in/tomb.v2" + metrics "github.com/armon/go-metrics" multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs/config" @@ -135,6 +136,21 @@ type VaultClient interface { // Running returns whether the Vault client is running Running() bool + + // Stats returns the Vault clients statistics + Stats() *VaultStats + + // EmitStats emits that clients statistics at the given period until stopCh + // is called. + EmitStats(period time.Duration, stopCh chan struct{}) +} + +// VaultStats returns all the stats about Vault tokens created and managed by +// Nomad. +type VaultStats struct { + // TrackedForRevoke is the count of tokens that are being tracked to be + // revoked since they could not be immediately revoked. + TrackedForRevoke int } // PurgeVaultAccessor is called to remove VaultAccessors from the system. If @@ -204,6 +220,10 @@ type vaultClient struct { tomb *tomb.Tomb logger *log.Logger + // stats stores the stats + stats *VaultStats + statsLock sync.RWMutex + // l is used to lock the configuration aspects of the client such that // multiple callers can't cause conflicting config updates l sync.Mutex @@ -227,6 +247,7 @@ func NewVaultClient(c *config.VaultConfig, logger *log.Logger, purgeFn PurgeVaul revoking: make(map[*structs.VaultAccessor]time.Time), purgeFn: purgeFn, tomb: &tomb.Tomb{}, + stats: new(VaultStats), } if v.config.IsEnabled() { @@ -821,7 +842,6 @@ func (v *vaultClient) CreateToken(ctx context.Context, a *structs.Allocation, ta if !v.Enabled() { return nil, fmt.Errorf("Vault integration disabled") } - if !v.Active() { return nil, structs.NewRecoverableError(fmt.Errorf("Vault client not active"), true) } @@ -833,6 +853,9 @@ func (v *vaultClient) CreateToken(ctx context.Context, a *structs.Allocation, ta return nil, fmt.Errorf("Connection to Vault failed: %v", err) } + // Track how long the request takes + defer metrics.MeasureSince([]string{"nomad", "vault", "create_token"}, time.Now()) + // Retrieve the Vault block for the task policies := a.Job.VaultPolicies() if policies == nil { @@ -908,6 +931,9 @@ func (v *vaultClient) LookupToken(ctx context.Context, token string) (*vapi.Secr return nil, fmt.Errorf("Connection to Vault failed: %v", err) } + // Track how long the request takes + defer metrics.MeasureSince([]string{"nomad", "vault", "lookup_token"}, time.Now()) + // Ensure we are under our rate limit if err := v.limiter.Wait(ctx); err != nil { return nil, err @@ -943,6 +969,9 @@ func (v *vaultClient) RevokeTokens(ctx context.Context, accessors []*structs.Vau return fmt.Errorf("Vault client not active") } + // Track how long the request takes + defer metrics.MeasureSince([]string{"nomad", "vault", "revoke_tokens"}, time.Now()) + // Check if we have established a connection with Vault. If not just add it // to the queue if established, err := v.ConnectionEstablished(); !established && err == nil { @@ -952,22 +981,29 @@ func (v *vaultClient) RevokeTokens(ctx context.Context, accessors []*structs.Vau v.storeForRevocation(accessors) } + // Track that we are abandoning these accessors. + metrics.IncrCounter([]string{"nomad", "vault", "undistributed_tokens_abandoned"}, float32(len(accessors))) return nil } // Attempt to revoke immediately and if it fails, add it to the revoke queue err := v.parallelRevoke(ctx, accessors) - if !committed { + if err != nil { // If it is uncommitted, it is a best effort revoke as it will shortly // TTL within the cubbyhole and has not been leaked to any outside // system - return nil - } + if !committed { + metrics.IncrCounter([]string{"nomad", "vault", "undistributed_tokens_abandoned"}, float32(len(accessors))) + return nil + } - if err != nil { v.logger.Printf("[WARN] vault: failed to revoke tokens. Will reattempt til TTL: %v", err) v.storeForRevocation(accessors) return nil + } else if !committed { + // Mark that it was revoked but there is nothing to purge so exit + metrics.IncrCounter([]string{"nomad", "vault", "undistributed_tokens_revoked"}, float32(len(accessors))) + return nil } if err := v.purgeFn(accessors); err != nil { @@ -976,6 +1012,9 @@ func (v *vaultClient) RevokeTokens(ctx context.Context, accessors []*structs.Vau return nil } + // Track that it was revoked successfully + metrics.IncrCounter([]string{"nomad", "vault", "distributed_tokens_revoked"}, float32(len(accessors))) + return nil } @@ -984,10 +1023,13 @@ func (v *vaultClient) RevokeTokens(ctx context.Context, accessors []*structs.Vau // time. func (v *vaultClient) storeForRevocation(accessors []*structs.VaultAccessor) { v.revLock.Lock() + v.statsLock.Lock() now := time.Now() for _, a := range accessors { v.revoking[a] = now.Add(time.Duration(a.CreationTTL) * time.Second) } + v.stats.TrackedForRevoke = len(v.revoking) + v.statsLock.Unlock() v.revLock.Unlock() } @@ -1103,12 +1145,19 @@ func (v *vaultClient) revokeDaemon() { continue } + // Track that tokens were revoked successfully + metrics.IncrCounter([]string{"nomad", "vault", "distributed_tokens_revoked"}, float32(len(revoking))) + // Can delete from the tracked list now that we have purged v.revLock.Lock() + v.statsLock.Lock() for _, va := range revoking { delete(v.revoking, va) } + v.stats.TrackedForRevoke = len(v.revoking) + v.statsLock.Unlock() v.revLock.Unlock() + } } } @@ -1137,3 +1186,30 @@ func (v *vaultClient) setLimit(l rate.Limit) { defer v.l.Unlock() v.limiter = rate.NewLimiter(l, int(l)) } + +// Stats is used to query the state of the blocked eval tracker. +func (v *vaultClient) Stats() *VaultStats { + // Allocate a new stats struct + stats := new(VaultStats) + + v.statsLock.RLock() + defer v.statsLock.RUnlock() + + // Copy all the stats + stats.TrackedForRevoke = v.stats.TrackedForRevoke + + return stats +} + +// EmitStats is used to export metrics about the blocked eval tracker while enabled +func (v *vaultClient) EmitStats(period time.Duration, stopCh chan struct{}) { + for { + select { + case <-time.After(period): + stats := v.Stats() + metrics.SetGauge([]string{"nomad", "vault", "distributed_tokens_revoking"}, float32(stats.TrackedForRevoke)) + case <-stopCh: + return + } + } +} diff --git a/nomad/vault_test.go b/nomad/vault_test.go index 38c9ceea7e6d..648ca26945bc 100644 --- a/nomad/vault_test.go +++ b/nomad/vault_test.go @@ -993,6 +993,10 @@ func TestVaultClient_RevokeTokens_PreEstablishs(t *testing.T) { if len(client.revoking) != 2 { t.Fatalf("didn't add to revoke loop") } + + if client.Stats().TrackedForRevoke != 2 { + t.Fatalf("didn't add to revoke loop") + } } func TestVaultClient_RevokeTokens_Root(t *testing.T) { diff --git a/nomad/vault_testing.go b/nomad/vault_testing.go index efeea31bb5d5..d5a361c9e5cb 100644 --- a/nomad/vault_testing.go +++ b/nomad/vault_testing.go @@ -2,6 +2,7 @@ package nomad import ( "context" + "time" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs/config" @@ -134,7 +135,9 @@ func (v *TestVaultClient) RevokeTokens(ctx context.Context, accessors []*structs return nil } -func (v *TestVaultClient) Stop() {} -func (v *TestVaultClient) SetActive(enabled bool) {} -func (v *TestVaultClient) SetConfig(config *config.VaultConfig) error { return nil } -func (v *TestVaultClient) Running() bool { return true } +func (v *TestVaultClient) Stop() {} +func (v *TestVaultClient) SetActive(enabled bool) {} +func (v *TestVaultClient) SetConfig(config *config.VaultConfig) error { return nil } +func (v *TestVaultClient) Running() bool { return true } +func (v *TestVaultClient) Stats() *VaultStats { return new(VaultStats) } +func (v *TestVaultClient) EmitStats(period time.Duration, stopCh chan struct{}) {} From 639afa9253d74588c0a511286f970bc31fc4f324 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 14 Feb 2017 16:26:49 -0800 Subject: [PATCH 2/3] Include alloc on server side err/logs --- nomad/node_endpoint.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index a09c6232d7aa..d7516025cd7e 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -1061,7 +1061,7 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest, secret, err := n.srv.vault.CreateToken(ctx, alloc, task) if err != nil { - wrapped := fmt.Errorf("failed to create token for task %q: %v", task, err) + wrapped := fmt.Errorf("failed to create token for task %q on alloc %q: %v", task, alloc.ID, err) if rerr, ok := err.(*structs.RecoverableError); ok && rerr.Recoverable { // If the error is recoverable, propogate it return structs.NewRecoverableError(wrapped, true) @@ -1117,10 +1117,10 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest, // If there was an error revoke the created tokens if createErr != nil { - n.srv.logger.Printf("[ERR] nomad.node: Vault token creation failed: %v", createErr) + n.srv.logger.Printf("[ERR] nomad.node: Vault token creation for alloc %q failed: %v", alloc.ID, createErr) if revokeErr := n.srv.vault.RevokeTokens(context.Background(), accessors, false); revokeErr != nil { - n.srv.logger.Printf("[ERR] nomad.node: Vault token revocation failed: %v", revokeErr) + n.srv.logger.Printf("[ERR] nomad.node: Vault token revocation for alloc %q failed: %v", alloc.ID, revokeErr) } if rerr, ok := createErr.(*structs.RecoverableError); ok { @@ -1136,7 +1136,7 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest, req := structs.VaultAccessorsRequest{Accessors: accessors} _, index, err := n.srv.raftApply(structs.VaultAccessorRegisterRequestType, &req) if err != nil { - n.srv.logger.Printf("[ERR] nomad.client: Register Vault accessors failed: %v", err) + n.srv.logger.Printf("[ERR] nomad.client: Register Vault accessors for alloc %q failed: %v", alloc.ID, err) // Determine if we can recover from the error retry := false From 22b64a53cfb04b92b105bbe8b11f3b96cf08c3f9 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 14 Feb 2017 16:46:54 -0800 Subject: [PATCH 3/3] Better derive token logging --- client/vaultclient/vaultclient.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/client/vaultclient/vaultclient.go b/client/vaultclient/vaultclient.go index 701b2d479c0c..a5d0f512c72d 100644 --- a/client/vaultclient/vaultclient.go +++ b/client/vaultclient/vaultclient.go @@ -249,7 +249,13 @@ func (c *vaultClient) DeriveToken(alloc *structs.Allocation, taskNames []string) // Use the token supplied to interact with vault c.client.SetToken("") - return c.tokenDeriver(alloc, taskNames, c.client) + tokens, err := c.tokenDeriver(alloc, taskNames, c.client) + if err != nil { + c.logger.Printf("[ERR] client.vault: failed to derive token for allocation %q and tasks %v: %v", alloc.ID, taskNames, err) + return nil, err + } + + return tokens, nil } // GetConsulACL creates a vault API client and reads from vault a consul ACL