Skip to content

Commit

Permalink
Merge pull request #8036 from hashicorp/f-background-vault-revoke-on-…
Browse files Browse the repository at this point in the history
…restore

Speed up leadership establishment
  • Loading branch information
Mahmood Ali committed Jun 1, 2020
1 parent 9343ec7 commit 902996e
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 16 deletions.
21 changes: 20 additions & 1 deletion nomad/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ type ConsulACLsAPI interface {
// RevokeTokens instructs Consul to revoke the given token accessors.
RevokeTokens(context.Context, []*structs.SITokenAccessor, bool) bool

// MarkForRevocation marks the tokens for background revocation
MarkForRevocation([]*structs.SITokenAccessor)

// Stop is used to stop background token revocations. Intended to be used
// on Nomad Server shutdown.
Stop()
Expand Down Expand Up @@ -143,6 +146,10 @@ type consulACLsAPI struct {
}

func NewConsulACLsAPI(aclClient consul.ACLsAPI, logger hclog.Logger, purgeFunc PurgeSITokenAccessorFunc) *consulACLsAPI {
if purgeFunc == nil {
purgeFunc = func([]*structs.SITokenAccessor) error { return nil }
}

c := &consulACLsAPI{
aclClient: aclClient,
limiter: rate.NewLimiter(siTokenRequestRateLimit, int(siTokenRequestRateLimit)),
Expand Down Expand Up @@ -285,6 +292,10 @@ func (c *consulACLsAPI) RevokeTokens(ctx context.Context, accessors []*structs.S
return false
}

func (c *consulACLsAPI) MarkForRevocation(accessors []*structs.SITokenAccessor) {
c.storeForRevocation(accessors)
}

func (c *consulACLsAPI) storeForRevocation(accessors []*structs.SITokenAccessor) {
c.bgRevokeLock.Lock()
defer c.bgRevokeLock.Unlock()
Expand Down Expand Up @@ -369,6 +380,10 @@ func (c *consulACLsAPI) bgRetryRevokeDaemon() {
}
}

// maxConsulRevocationBatchSize is the maximum tokens a bgRetryRevoke should revoke
// at any given time.
const maxConsulRevocationBatchSize = 1000

func (c *consulACLsAPI) bgRetryRevoke() {
c.bgRevokeLock.Lock()
defer c.bgRevokeLock.Unlock()
Expand All @@ -380,7 +395,11 @@ func (c *consulACLsAPI) bgRetryRevoke() {

// unlike vault tokens, SI tokens do not have a TTL, and so we must try to
// remove all SI token accessors, every time, until they're gone
toPurge := make([]*structs.SITokenAccessor, len(c.bgRetryRevocation), len(c.bgRetryRevocation))
toRevoke := len(c.bgRetryRevocation)
if toRevoke > maxConsulRevocationBatchSize {
toRevoke = maxConsulRevocationBatchSize
}
toPurge := make([]*structs.SITokenAccessor, toRevoke)
copy(toPurge, c.bgRetryRevocation)

if err := c.parallelRevoke(context.Background(), toPurge); err != nil {
Expand Down
33 changes: 33 additions & 0 deletions nomad/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ func (mps *mockPurgingServer) purgeFunc(accessors []*structs.SITokenAccessor) er
}

func (m *mockConsulACLsAPI) RevokeTokens(_ context.Context, accessors []*structs.SITokenAccessor, committed bool) bool {
return m.storeForRevocation(accessors, committed)
}

func (m *mockConsulACLsAPI) MarkForRevocation(accessors []*structs.SITokenAccessor) {
m.storeForRevocation(accessors, true)
}

func (m *mockConsulACLsAPI) storeForRevocation(accessors []*structs.SITokenAccessor, committed bool) bool {
m.lock.Lock()
defer m.lock.Unlock()

Expand Down Expand Up @@ -168,6 +176,31 @@ func TestConsulACLsAPI_RevokeTokens(t *testing.T) {
})
}

func TestConsulACLsAPI_MarkForRevocation(t *testing.T) {
t.Parallel()

logger := testlog.HCLogger(t)
aclAPI := consul.NewMockACLsAPI(logger)

c := NewConsulACLsAPI(aclAPI, logger, nil)

generated, err := c.CreateToken(context.Background(), ServiceIdentityRequest{
ClusterID: uuid.Generate(),
AllocID: uuid.Generate(),
TaskName: "task1-sidecar-proxy",
TaskKind: structs.NewTaskKind(structs.ConnectProxyPrefix, "service1"),
})
require.NoError(t, err)

// set the mock error after calling CreateToken for setting up
aclAPI.SetError(nil)

accessors := []*structs.SITokenAccessor{{AccessorID: generated.AccessorID}}
c.MarkForRevocation(accessors)
require.Len(t, c.bgRetryRevocation, 1)
require.Contains(t, c.bgRetryRevocation, accessors[0])
}

func TestConsulACLsAPI_bgRetryRevoke(t *testing.T) {
t.Parallel()

Expand Down
34 changes: 21 additions & 13 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,19 +251,17 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {

// Activate the vault client
s.vault.SetActive(true)
// Cleanup orphaned Vault token accessors
if err := s.revokeVaultAccessorsOnRestore(); err != nil {
return err
}

// Cleanup orphaned Service Identity token accessors
if err := s.revokeSITokenAccessorsOnRestore(); err != nil {
return err
}

// Enable the periodic dispatcher, since we are now the leader.
s.periodicDispatcher.SetEnabled(true)

// Activate RPC now that local FSM caught up with Raft (as evident by Barrier call success)
// and all leader related components (e.g. broker queue) are enabled.
// Auxiliary processes (e.g. background, bookkeeping, and cleanup tasks can start after)
s.setConsistentReadReady()

// Further clean ups and follow up that don't block RPC consistency

// Restore the periodic dispatcher state
if err := s.restorePeriodicDispatcher(); err != nil {
return err
Expand Down Expand Up @@ -313,7 +311,15 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
return err
}

s.setConsistentReadReady()
// Cleanup orphaned Vault token accessors
if err := s.revokeVaultAccessorsOnRestore(); err != nil {
return err
}

// Cleanup orphaned Service Identity token accessors
if err := s.revokeSITokenAccessorsOnRestore(); err != nil {
return err
}

return nil
}
Expand Down Expand Up @@ -390,7 +396,9 @@ func (s *Server) revokeVaultAccessorsOnRestore() error {
}

if len(revoke) != 0 {
if err := s.vault.RevokeTokens(context.Background(), revoke, true); err != nil {
s.logger.Info("revoking vault accessors after becoming leader", "accessors", len(revoke))

if err := s.vault.MarkForRevocation(revoke); err != nil {
return fmt.Errorf("failed to revoke tokens: %v", err)
}
}
Expand Down Expand Up @@ -436,8 +444,8 @@ func (s *Server) revokeSITokenAccessorsOnRestore() error {
}

if len(toRevoke) > 0 {
ctx := context.Background()
s.consulACLs.RevokeTokens(ctx, toRevoke, true)
s.logger.Info("revoking consul accessors after becoming leader", "accessors", len(toRevoke))
s.consulACLs.MarkForRevocation(toRevoke)
}

return nil
Expand Down
46 changes: 44 additions & 2 deletions nomad/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ type VaultClient interface {
// RevokeTokens takes a set of tokens accessor and revokes the tokens
RevokeTokens(ctx context.Context, accessors []*structs.VaultAccessor, committed bool) error

// MarkForRevocation revokes the tokens in background
MarkForRevocation(accessors []*structs.VaultAccessor) error

// Stop is used to stop token renewal
Stop()

Expand Down Expand Up @@ -249,6 +252,9 @@ func NewVaultClient(c *config.VaultConfig, logger log.Logger, purgeFn PurgeVault
if logger == nil {
return nil, fmt.Errorf("must pass valid logger")
}
if purgeFn == nil {
purgeFn = func(accessors []*structs.VaultAccessor) error { return nil }
}

v := &vaultClient{
config: c,
Expand Down Expand Up @@ -1128,6 +1134,19 @@ func (v *vaultClient) RevokeTokens(ctx context.Context, accessors []*structs.Vau
return nil
}

func (v *vaultClient) MarkForRevocation(accessors []*structs.VaultAccessor) error {
if !v.Enabled() {
return nil
}

if !v.Active() {
return fmt.Errorf("Vault client not active")
}

v.storeForRevocation(accessors)
return nil
}

// storeForRevocation stores the passed set of accessors for revocation. It
// captures their effective TTL by storing their create TTL plus the current
// time.
Expand Down Expand Up @@ -1207,6 +1226,16 @@ func (v *vaultClient) parallelRevoke(ctx context.Context, accessors []*structs.V
return g.Wait()
}

// maxVaultRevokeBatchSize is the maximum tokens a revokeDaemon should revoke
// at any given time.
//
// Limiting the revocation batch size is beneficial for few reasons:
// * A single revocation failure of any entry in batch result into retrying the whole batch;
// the larger the batch is the higher likelihood of such failure
// * Smaller batch sizes result into more co-operativeness: provides hooks for
// reconsidering token TTL and leadership steps down.
const maxVaultRevokeBatchSize = 1000

// revokeDaemon should be called in a goroutine and is used to periodically
// revoke Vault accessors that failed the original revocation
func (v *vaultClient) revokeDaemon() {
Expand All @@ -1231,13 +1260,22 @@ func (v *vaultClient) revokeDaemon() {
}

// Build the list of accessors that need to be revoked while pruning any TTL'd checks
revoking := make([]*structs.VaultAccessor, 0, len(v.revoking))
toRevoke := len(v.revoking)
if toRevoke > maxVaultRevokeBatchSize {
toRevoke = maxVaultRevokeBatchSize
}
revoking := make([]*structs.VaultAccessor, 0, toRevoke)
ttlExpired := []*structs.VaultAccessor{}
for va, ttl := range v.revoking {
if now.After(ttl) {
delete(v.revoking, va)
ttlExpired = append(ttlExpired, va)
} else {
revoking = append(revoking, va)
}

if len(revoking) >= toRevoke {
break
}
}

if err := v.parallelRevoke(context.Background(), revoking); err != nil {
Expand All @@ -1249,6 +1287,10 @@ func (v *vaultClient) revokeDaemon() {
// Unlock before a potentially expensive operation
v.revLock.Unlock()

// purge all explicitly revoked as well as ttl expired tokens
// and only remove them locally on purge success
revoking = append(revoking, ttlExpired...)

// Call the passed in token revocation function
if err := v.purgeFn(revoking); err != nil {
// Can continue since revocation is idempotent
Expand Down
27 changes: 27 additions & 0 deletions nomad/vault_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,33 @@ func TestVaultClient_CreateToken_Prestart(t *testing.T) {
}
}

func TestVaultClient_MarkForRevocation(t *testing.T) {
vconfig := &config.VaultConfig{
Enabled: helper.BoolToPtr(true),
Token: uuid.Generate(),
Addr: "http://127.0.0.1:0",
}
logger := testlog.HCLogger(t)
client, err := NewVaultClient(vconfig, logger, nil)
require.NoError(t, err)

client.SetActive(true)
defer client.Stop()

// Create some VaultAccessors
vas := []*structs.VaultAccessor{
mock.VaultAccessor(),
mock.VaultAccessor(),
}

err = client.MarkForRevocation(vas)
require.NoError(t, err)

// Wasn't committed
require.Len(t, client.revoking, 2)
require.Equal(t, 2, client.stats().TrackedForRevoke)

}
func TestVaultClient_RevokeTokens_PreEstablishs(t *testing.T) {
t.Parallel()
vconfig := &config.VaultConfig{
Expand Down
5 changes: 5 additions & 0 deletions nomad/vault_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ func (v *TestVaultClient) RevokeTokens(ctx context.Context, accessors []*structs
return nil
}

func (v *TestVaultClient) MarkForRevocation(accessors []*structs.VaultAccessor) error {
v.RevokedTokens = append(v.RevokedTokens, accessors...)
return nil
}

func (v *TestVaultClient) Stop() {}
func (v *TestVaultClient) SetActive(enabled bool) {}
func (v *TestVaultClient) SetConfig(config *config.VaultConfig) error { return nil }
Expand Down

0 comments on commit 902996e

Please sign in to comment.