diff --git a/nomad/consul.go b/nomad/consul.go index 14511ad4bad7..7036ee480a66 100644 --- a/nomad/consul.go +++ b/nomad/consul.go @@ -95,6 +95,9 @@ type ConsulACLsAPI interface { // RevokeTokens instructs Consul to revoke the given token accessors. RevokeTokens(context.Context, []*structs.SITokenAccessor, bool) bool + // MarkForRevocation marks the tokens for background revocation + MarkForRevocation([]*structs.SITokenAccessor) + // Stop is used to stop background token revocations. Intended to be used // on Nomad Server shutdown. Stop() @@ -143,6 +146,10 @@ type consulACLsAPI struct { } func NewConsulACLsAPI(aclClient consul.ACLsAPI, logger hclog.Logger, purgeFunc PurgeSITokenAccessorFunc) *consulACLsAPI { + if purgeFunc == nil { + purgeFunc = func([]*structs.SITokenAccessor) error { return nil } + } + c := &consulACLsAPI{ aclClient: aclClient, limiter: rate.NewLimiter(siTokenRequestRateLimit, int(siTokenRequestRateLimit)), @@ -285,6 +292,10 @@ func (c *consulACLsAPI) RevokeTokens(ctx context.Context, accessors []*structs.S return false } +func (c *consulACLsAPI) MarkForRevocation(accessors []*structs.SITokenAccessor) { + c.storeForRevocation(accessors) +} + func (c *consulACLsAPI) storeForRevocation(accessors []*structs.SITokenAccessor) { c.bgRevokeLock.Lock() defer c.bgRevokeLock.Unlock() @@ -369,6 +380,10 @@ func (c *consulACLsAPI) bgRetryRevokeDaemon() { } } +// maxConsulRevocationBatchSize is the maximum tokens a bgRetryRevoke should revoke +// at any given time. +const maxConsulRevocationBatchSize = 1000 + func (c *consulACLsAPI) bgRetryRevoke() { c.bgRevokeLock.Lock() defer c.bgRevokeLock.Unlock() @@ -380,7 +395,11 @@ func (c *consulACLsAPI) bgRetryRevoke() { // unlike vault tokens, SI tokens do not have a TTL, and so we must try to // remove all SI token accessors, every time, until they're gone - toPurge := make([]*structs.SITokenAccessor, len(c.bgRetryRevocation), len(c.bgRetryRevocation)) + toRevoke := len(c.bgRetryRevocation) + if toRevoke > maxConsulRevocationBatchSize { + toRevoke = maxConsulRevocationBatchSize + } + toPurge := make([]*structs.SITokenAccessor, toRevoke) copy(toPurge, c.bgRetryRevocation) if err := c.parallelRevoke(context.Background(), toPurge); err != nil { diff --git a/nomad/consul_test.go b/nomad/consul_test.go index 051c45e3e09e..333cd2e38196 100644 --- a/nomad/consul_test.go +++ b/nomad/consul_test.go @@ -65,6 +65,14 @@ func (mps *mockPurgingServer) purgeFunc(accessors []*structs.SITokenAccessor) er } func (m *mockConsulACLsAPI) RevokeTokens(_ context.Context, accessors []*structs.SITokenAccessor, committed bool) bool { + return m.storeForRevocation(accessors, committed) +} + +func (m *mockConsulACLsAPI) MarkForRevocation(accessors []*structs.SITokenAccessor) { + m.storeForRevocation(accessors, true) +} + +func (m *mockConsulACLsAPI) storeForRevocation(accessors []*structs.SITokenAccessor, committed bool) bool { m.lock.Lock() defer m.lock.Unlock() @@ -168,6 +176,31 @@ func TestConsulACLsAPI_RevokeTokens(t *testing.T) { }) } +func TestConsulACLsAPI_MarkForRevocation(t *testing.T) { + t.Parallel() + + logger := testlog.HCLogger(t) + aclAPI := consul.NewMockACLsAPI(logger) + + c := NewConsulACLsAPI(aclAPI, logger, nil) + + generated, err := c.CreateToken(context.Background(), ServiceIdentityRequest{ + ClusterID: uuid.Generate(), + AllocID: uuid.Generate(), + TaskName: "task1-sidecar-proxy", + TaskKind: structs.NewTaskKind(structs.ConnectProxyPrefix, "service1"), + }) + require.NoError(t, err) + + // set the mock error after calling CreateToken for setting up + aclAPI.SetError(nil) + + accessors := []*structs.SITokenAccessor{{AccessorID: generated.AccessorID}} + c.MarkForRevocation(accessors) + require.Len(t, c.bgRetryRevocation, 1) + require.Contains(t, c.bgRetryRevocation, accessors[0]) +} + func TestConsulACLsAPI_bgRetryRevoke(t *testing.T) { t.Parallel() diff --git a/nomad/leader.go b/nomad/leader.go index 18c36ef386ef..26b81871fea8 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -251,19 +251,17 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Activate the vault client s.vault.SetActive(true) - // Cleanup orphaned Vault token accessors - if err := s.revokeVaultAccessorsOnRestore(); err != nil { - return err - } - - // Cleanup orphaned Service Identity token accessors - if err := s.revokeSITokenAccessorsOnRestore(); err != nil { - return err - } // Enable the periodic dispatcher, since we are now the leader. s.periodicDispatcher.SetEnabled(true) + // Activate RPC now that local FSM caught up with Raft (as evident by Barrier call success) + // and all leader related components (e.g. broker queue) are enabled. + // Auxiliary processes (e.g. background, bookkeeping, and cleanup tasks can start after) + s.setConsistentReadReady() + + // Further clean ups and follow up that don't block RPC consistency + // Restore the periodic dispatcher state if err := s.restorePeriodicDispatcher(); err != nil { return err @@ -313,7 +311,15 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { return err } - s.setConsistentReadReady() + // Cleanup orphaned Vault token accessors + if err := s.revokeVaultAccessorsOnRestore(); err != nil { + return err + } + + // Cleanup orphaned Service Identity token accessors + if err := s.revokeSITokenAccessorsOnRestore(); err != nil { + return err + } return nil } @@ -390,7 +396,9 @@ func (s *Server) revokeVaultAccessorsOnRestore() error { } if len(revoke) != 0 { - if err := s.vault.RevokeTokens(context.Background(), revoke, true); err != nil { + s.logger.Info("revoking vault accessors after becoming leader", "accessors", len(revoke)) + + if err := s.vault.MarkForRevocation(revoke); err != nil { return fmt.Errorf("failed to revoke tokens: %v", err) } } @@ -436,8 +444,8 @@ func (s *Server) revokeSITokenAccessorsOnRestore() error { } if len(toRevoke) > 0 { - ctx := context.Background() - s.consulACLs.RevokeTokens(ctx, toRevoke, true) + s.logger.Info("revoking consul accessors after becoming leader", "accessors", len(toRevoke)) + s.consulACLs.MarkForRevocation(toRevoke) } return nil diff --git a/nomad/vault.go b/nomad/vault.go index 9dbcd0660088..1d943fc4d3ad 100644 --- a/nomad/vault.go +++ b/nomad/vault.go @@ -121,6 +121,9 @@ type VaultClient interface { // RevokeTokens takes a set of tokens accessor and revokes the tokens RevokeTokens(ctx context.Context, accessors []*structs.VaultAccessor, committed bool) error + // MarkForRevocation revokes the tokens in background + MarkForRevocation(accessors []*structs.VaultAccessor) error + // Stop is used to stop token renewal Stop() @@ -249,6 +252,9 @@ func NewVaultClient(c *config.VaultConfig, logger log.Logger, purgeFn PurgeVault if logger == nil { return nil, fmt.Errorf("must pass valid logger") } + if purgeFn == nil { + purgeFn = func(accessors []*structs.VaultAccessor) error { return nil } + } v := &vaultClient{ config: c, @@ -1128,6 +1134,19 @@ func (v *vaultClient) RevokeTokens(ctx context.Context, accessors []*structs.Vau return nil } +func (v *vaultClient) MarkForRevocation(accessors []*structs.VaultAccessor) error { + if !v.Enabled() { + return nil + } + + if !v.Active() { + return fmt.Errorf("Vault client not active") + } + + v.storeForRevocation(accessors) + return nil +} + // storeForRevocation stores the passed set of accessors for revocation. It // captures their effective TTL by storing their create TTL plus the current // time. @@ -1207,6 +1226,16 @@ func (v *vaultClient) parallelRevoke(ctx context.Context, accessors []*structs.V return g.Wait() } +// maxVaultRevokeBatchSize is the maximum tokens a revokeDaemon should revoke +// at any given time. +// +// Limiting the revocation batch size is beneficial for few reasons: +// * A single revocation failure of any entry in batch result into retrying the whole batch; +// the larger the batch is the higher likelihood of such failure +// * Smaller batch sizes result into more co-operativeness: provides hooks for +// reconsidering token TTL and leadership steps down. +const maxVaultRevokeBatchSize = 1000 + // revokeDaemon should be called in a goroutine and is used to periodically // revoke Vault accessors that failed the original revocation func (v *vaultClient) revokeDaemon() { @@ -1231,13 +1260,22 @@ func (v *vaultClient) revokeDaemon() { } // Build the list of accessors that need to be revoked while pruning any TTL'd checks - revoking := make([]*structs.VaultAccessor, 0, len(v.revoking)) + toRevoke := len(v.revoking) + if toRevoke > maxVaultRevokeBatchSize { + toRevoke = maxVaultRevokeBatchSize + } + revoking := make([]*structs.VaultAccessor, 0, toRevoke) + ttlExpired := []*structs.VaultAccessor{} for va, ttl := range v.revoking { if now.After(ttl) { - delete(v.revoking, va) + ttlExpired = append(ttlExpired, va) } else { revoking = append(revoking, va) } + + if len(revoking) >= toRevoke { + break + } } if err := v.parallelRevoke(context.Background(), revoking); err != nil { @@ -1249,6 +1287,10 @@ func (v *vaultClient) revokeDaemon() { // Unlock before a potentially expensive operation v.revLock.Unlock() + // purge all explicitly revoked as well as ttl expired tokens + // and only remove them locally on purge success + revoking = append(revoking, ttlExpired...) + // Call the passed in token revocation function if err := v.purgeFn(revoking); err != nil { // Can continue since revocation is idempotent diff --git a/nomad/vault_test.go b/nomad/vault_test.go index 99d7c78e40b6..3f94d79446e6 100644 --- a/nomad/vault_test.go +++ b/nomad/vault_test.go @@ -1365,6 +1365,33 @@ func TestVaultClient_CreateToken_Prestart(t *testing.T) { } } +func TestVaultClient_MarkForRevocation(t *testing.T) { + vconfig := &config.VaultConfig{ + Enabled: helper.BoolToPtr(true), + Token: uuid.Generate(), + Addr: "http://127.0.0.1:0", + } + logger := testlog.HCLogger(t) + client, err := NewVaultClient(vconfig, logger, nil) + require.NoError(t, err) + + client.SetActive(true) + defer client.Stop() + + // Create some VaultAccessors + vas := []*structs.VaultAccessor{ + mock.VaultAccessor(), + mock.VaultAccessor(), + } + + err = client.MarkForRevocation(vas) + require.NoError(t, err) + + // Wasn't committed + require.Len(t, client.revoking, 2) + require.Equal(t, 2, client.stats().TrackedForRevoke) + +} func TestVaultClient_RevokeTokens_PreEstablishs(t *testing.T) { t.Parallel() vconfig := &config.VaultConfig{ diff --git a/nomad/vault_testing.go b/nomad/vault_testing.go index ba0b1d923941..e3555173701e 100644 --- a/nomad/vault_testing.go +++ b/nomad/vault_testing.go @@ -135,6 +135,11 @@ func (v *TestVaultClient) RevokeTokens(ctx context.Context, accessors []*structs return nil } +func (v *TestVaultClient) MarkForRevocation(accessors []*structs.VaultAccessor) error { + v.RevokedTokens = append(v.RevokedTokens, accessors...) + return nil +} + func (v *TestVaultClient) Stop() {} func (v *TestVaultClient) SetActive(enabled bool) {} func (v *TestVaultClient) SetConfig(config *config.VaultConfig) error { return nil }