Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up leadership establishment #8036

Merged
merged 8 commits into from
Jun 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion nomad/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ type ConsulACLsAPI interface {
// RevokeTokens instructs Consul to revoke the given token accessors.
RevokeTokens(context.Context, []*structs.SITokenAccessor, bool) bool

// MarkForRevocation marks the tokens for background revocation
MarkForRevocation([]*structs.SITokenAccessor)

// Stop is used to stop background token revocations. Intended to be used
// on Nomad Server shutdown.
Stop()
Expand Down Expand Up @@ -143,6 +146,10 @@ type consulACLsAPI struct {
}

func NewConsulACLsAPI(aclClient consul.ACLsAPI, logger hclog.Logger, purgeFunc PurgeSITokenAccessorFunc) *consulACLsAPI {
if purgeFunc == nil {
purgeFunc = func([]*structs.SITokenAccessor) error { return nil }
}

c := &consulACLsAPI{
aclClient: aclClient,
limiter: rate.NewLimiter(siTokenRequestRateLimit, int(siTokenRequestRateLimit)),
Expand Down Expand Up @@ -285,6 +292,10 @@ func (c *consulACLsAPI) RevokeTokens(ctx context.Context, accessors []*structs.S
return false
}

// MarkForRevocation marks the given SI token accessors for background
// revocation by handing them to storeForRevocation. It does not contact
// Consul itself and returns nothing to the caller.
func (c *consulACLsAPI) MarkForRevocation(accessors []*structs.SITokenAccessor) {
c.storeForRevocation(accessors)
}

func (c *consulACLsAPI) storeForRevocation(accessors []*structs.SITokenAccessor) {
c.bgRevokeLock.Lock()
defer c.bgRevokeLock.Unlock()
Expand Down Expand Up @@ -369,6 +380,10 @@ func (c *consulACLsAPI) bgRetryRevokeDaemon() {
}
}

// maxConsulRevocationBatchSize is the maximum number of tokens that a single
// bgRetryRevoke pass should revoke at any given time.
const maxConsulRevocationBatchSize = 1000

func (c *consulACLsAPI) bgRetryRevoke() {
c.bgRevokeLock.Lock()
defer c.bgRevokeLock.Unlock()
Expand All @@ -380,7 +395,11 @@ func (c *consulACLsAPI) bgRetryRevoke() {

// unlike vault tokens, SI tokens do not have a TTL, and so we must keep
// trying to remove SI token accessors (at most maxConsulRevocationBatchSize
// per pass), every time, until they're gone
toPurge := make([]*structs.SITokenAccessor, len(c.bgRetryRevocation), len(c.bgRetryRevocation))
toRevoke := len(c.bgRetryRevocation)
if toRevoke > maxConsulRevocationBatchSize {
toRevoke = maxConsulRevocationBatchSize
}
toPurge := make([]*structs.SITokenAccessor, toRevoke)
copy(toPurge, c.bgRetryRevocation)

if err := c.parallelRevoke(context.Background(), toPurge); err != nil {
Expand Down
33 changes: 33 additions & 0 deletions nomad/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ func (mps *mockPurgingServer) purgeFunc(accessors []*structs.SITokenAccessor) er
}

// RevokeTokens is the mock implementation of RevokeTokens. It ignores the
// context and delegates to storeForRevocation, passing the accessors and the
// committed flag through and returning its result.
func (m *mockConsulACLsAPI) RevokeTokens(_ context.Context, accessors []*structs.SITokenAccessor, committed bool) bool {
return m.storeForRevocation(accessors, committed)
}

// MarkForRevocation is the mock implementation of MarkForRevocation. It
// delegates to storeForRevocation with committed set to true and discards the
// returned value.
func (m *mockConsulACLsAPI) MarkForRevocation(accessors []*structs.SITokenAccessor) {
m.storeForRevocation(accessors, true)
}

func (m *mockConsulACLsAPI) storeForRevocation(accessors []*structs.SITokenAccessor, committed bool) bool {
m.lock.Lock()
defer m.lock.Unlock()

Expand Down Expand Up @@ -168,6 +176,31 @@ func TestConsulACLsAPI_RevokeTokens(t *testing.T) {
})
}

// TestConsulACLsAPI_MarkForRevocation verifies that an accessor passed to
// MarkForRevocation ends up in the bgRetryRevocation list.
func TestConsulACLsAPI_MarkForRevocation(t *testing.T) {
t.Parallel()

logger := testlog.HCLogger(t)
aclAPI := consul.NewMockACLsAPI(logger)

// a nil purge function is passed to the constructor here
c := NewConsulACLsAPI(aclAPI, logger, nil)

// create a token first so there is a generated accessor ID to mark
generated, err := c.CreateToken(context.Background(), ServiceIdentityRequest{
ClusterID: uuid.Generate(),
AllocID: uuid.Generate(),
TaskName: "task1-sidecar-proxy",
TaskKind: structs.NewTaskKind(structs.ConnectProxyPrefix, "service1"),
})
require.NoError(t, err)

// set the mock error (to nil) only after CreateToken has been called for setup
aclAPI.SetError(nil)

accessors := []*structs.SITokenAccessor{{AccessorID: generated.AccessorID}}
c.MarkForRevocation(accessors)
// the marked accessor should now be the only entry stored for background revocation
require.Len(t, c.bgRetryRevocation, 1)
require.Contains(t, c.bgRetryRevocation, accessors[0])
}

func TestConsulACLsAPI_bgRetryRevoke(t *testing.T) {
t.Parallel()

Expand Down
34 changes: 21 additions & 13 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,19 +251,17 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {

// Activate the vault client
s.vault.SetActive(true)
// Cleanup orphaned Vault token accessors
if err := s.revokeVaultAccessorsOnRestore(); err != nil {
return err
}

// Cleanup orphaned Service Identity token accessors
if err := s.revokeSITokenAccessorsOnRestore(); err != nil {
return err
}

// Enable the periodic dispatcher, since we are now the leader.
s.periodicDispatcher.SetEnabled(true)

// Activate RPC now that local FSM caught up with Raft (as evident by Barrier call success)
// and all leader related components (e.g. broker queue) are enabled.
// Auxiliary processes (e.g. background, bookkeeping, and cleanup tasks) can start after.
s.setConsistentReadReady()

// Further cleanups and follow-ups that don't block RPC consistency

// Restore the periodic dispatcher state
if err := s.restorePeriodicDispatcher(); err != nil {
return err
Expand Down Expand Up @@ -313,7 +311,15 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
return err
}

s.setConsistentReadReady()
// Cleanup orphaned Vault token accessors
if err := s.revokeVaultAccessorsOnRestore(); err != nil {
return err
}

// Cleanup orphaned Service Identity token accessors
if err := s.revokeSITokenAccessorsOnRestore(); err != nil {
return err
}

return nil
}
Expand Down Expand Up @@ -390,7 +396,9 @@ func (s *Server) revokeVaultAccessorsOnRestore() error {
}

if len(revoke) != 0 {
if err := s.vault.RevokeTokens(context.Background(), revoke, true); err != nil {
s.logger.Info("revoking vault accessors after becoming leader", "accessors", len(revoke))

if err := s.vault.MarkForRevocation(revoke); err != nil {
return fmt.Errorf("failed to revoke tokens: %v", err)
}
}
Expand Down Expand Up @@ -436,8 +444,8 @@ func (s *Server) revokeSITokenAccessorsOnRestore() error {
}

if len(toRevoke) > 0 {
ctx := context.Background()
s.consulACLs.RevokeTokens(ctx, toRevoke, true)
s.logger.Info("revoking consul accessors after becoming leader", "accessors", len(toRevoke))
s.consulACLs.MarkForRevocation(toRevoke)
}

return nil
Expand Down
46 changes: 44 additions & 2 deletions nomad/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ type VaultClient interface {
// RevokeTokens takes a set of tokens accessor and revokes the tokens
RevokeTokens(ctx context.Context, accessors []*structs.VaultAccessor, committed bool) error

// MarkForRevocation marks the tokens for revocation in the background
MarkForRevocation(accessors []*structs.VaultAccessor) error

// Stop is used to stop token renewal
Stop()

Expand Down Expand Up @@ -249,6 +252,9 @@ func NewVaultClient(c *config.VaultConfig, logger log.Logger, purgeFn PurgeVault
if logger == nil {
return nil, fmt.Errorf("must pass valid logger")
}
if purgeFn == nil {
purgeFn = func(accessors []*structs.VaultAccessor) error { return nil }
}

v := &vaultClient{
config: c,
Expand Down Expand Up @@ -1128,6 +1134,19 @@ func (v *vaultClient) RevokeTokens(ctx context.Context, accessors []*structs.Vau
return nil
}

// MarkForRevocation stores the given accessors for later revocation instead
// of revoking them inline.
//
// It returns nil without storing anything when the Vault client is not
// enabled, and an error when the client is enabled but not active. Otherwise
// the accessors are handed to storeForRevocation and nil is returned.
func (v *vaultClient) MarkForRevocation(accessors []*structs.VaultAccessor) error {
	switch {
	case !v.Enabled():
		// Nothing to track when Vault integration is disabled.
		return nil
	case !v.Active():
		return fmt.Errorf("Vault client not active")
	}

	v.storeForRevocation(accessors)
	return nil
}

// storeForRevocation stores the passed set of accessors for revocation. It
// captures their effective TTL by storing their create TTL plus the current
// time.
Expand Down Expand Up @@ -1207,6 +1226,16 @@ func (v *vaultClient) parallelRevoke(ctx context.Context, accessors []*structs.V
return g.Wait()
}

// maxVaultRevokeBatchSize is the maximum number of tokens that revokeDaemon
// should revoke at any given time.
//
// Limiting the revocation batch size is beneficial for a few reasons:
// * A single revocation failure of any entry in a batch results in retrying the whole batch;
//   the larger the batch, the higher the likelihood of such a failure.
// * Smaller batch sizes result in more cooperativeness: they provide hooks for
//   reconsidering token TTLs and leadership step-downs.
const maxVaultRevokeBatchSize = 1000

// revokeDaemon should be called in a goroutine and is used to periodically
// revoke Vault accessors that failed the original revocation
func (v *vaultClient) revokeDaemon() {
Expand All @@ -1231,13 +1260,22 @@ func (v *vaultClient) revokeDaemon() {
}

// Build the list of accessors that need to be revoked while pruning any TTL'd checks
revoking := make([]*structs.VaultAccessor, 0, len(v.revoking))
toRevoke := len(v.revoking)
if toRevoke > maxVaultRevokeBatchSize {
toRevoke = maxVaultRevokeBatchSize
}
revoking := make([]*structs.VaultAccessor, 0, toRevoke)
ttlExpired := []*structs.VaultAccessor{}
for va, ttl := range v.revoking {
if now.After(ttl) {
delete(v.revoking, va)
ttlExpired = append(ttlExpired, va)
} else {
revoking = append(revoking, va)
}

if len(revoking) >= toRevoke {
break
}
}

if err := v.parallelRevoke(context.Background(), revoking); err != nil {
Expand All @@ -1249,6 +1287,10 @@ func (v *vaultClient) revokeDaemon() {
// Unlock before a potentially expensive operation
v.revLock.Unlock()

// purge all explicitly revoked as well as ttl expired tokens
// and only remove them locally on purge success
revoking = append(revoking, ttlExpired...)

// Call the passed in token revocation function
if err := v.purgeFn(revoking); err != nil {
// Can continue since revocation is idempotent
Expand Down
27 changes: 27 additions & 0 deletions nomad/vault_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,33 @@ func TestVaultClient_CreateToken_Prestart(t *testing.T) {
}
}

// TestVaultClient_MarkForRevocation verifies that accessors passed to
// MarkForRevocation on an enabled, active client are tracked for revocation.
func TestVaultClient_MarkForRevocation(t *testing.T) {
vconfig := &config.VaultConfig{
Enabled: helper.BoolToPtr(true),
Token: uuid.Generate(),
Addr: "http://127.0.0.1:0",
}
logger := testlog.HCLogger(t)
// a nil purge function is passed to the constructor here
client, err := NewVaultClient(vconfig, logger, nil)
require.NoError(t, err)

// MarkForRevocation returns an error unless the client is active
client.SetActive(true)
defer client.Stop()

// Create some VaultAccessors
vas := []*structs.VaultAccessor{
mock.VaultAccessor(),
mock.VaultAccessor(),
}

err = client.MarkForRevocation(vas)
require.NoError(t, err)

// Both accessors should now be tracked for revocation
require.Len(t, client.revoking, 2)
require.Equal(t, 2, client.stats().TrackedForRevoke)

}
func TestVaultClient_RevokeTokens_PreEstablishs(t *testing.T) {
t.Parallel()
vconfig := &config.VaultConfig{
Expand Down
5 changes: 5 additions & 0 deletions nomad/vault_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ func (v *TestVaultClient) RevokeTokens(ctx context.Context, accessors []*structs
return nil
}

// MarkForRevocation appends the given accessors to RevokedTokens and always
// returns nil.
func (v *TestVaultClient) MarkForRevocation(accessors []*structs.VaultAccessor) error {
v.RevokedTokens = append(v.RevokedTokens, accessors...)
return nil
}

// Stop is a no-op on the test client.
func (v *TestVaultClient) Stop() {}
// SetActive is a no-op on the test client; the enabled argument is ignored.
func (v *TestVaultClient) SetActive(enabled bool) {}
// SetConfig ignores the given config on the test client and always returns nil.
func (v *TestVaultClient) SetConfig(config *config.VaultConfig) error { return nil }
Expand Down