vault: expired tokens count toward batch limit
As of 0.11.3, Vault token revocation and purging were done in batches.
However, the batch size was limited only by the number of *non-expired*
tokens being revoked.

Due to bugs prior to 0.11.3, *expired* tokens were not properly purged.
Long-lived clusters could have thousands to *millions* of very old
expired tokens that never got purged from the state store.

Since these expired tokens did not count against the batch limit, very
large batches could be created and overwhelm servers.

This commit ensures expired tokens count toward the batch limit with
this one-line change:

```
- if len(revoking) >= toRevoke {
+ if len(revoking)+len(ttlExpired) >= toRevoke {
```
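
For context, here is a minimal, self-contained sketch of the batching logic this line sits in. The `buildBatch` helper and `accessor` type are hypothetical stand-ins for illustration only (the real code iterates `v.revoking` inside `revokeDaemon` and works with `*structs.VaultAccessor`); the closing length check is the line this commit changes.

```go
package main

import (
	"fmt"
	"time"
)

// accessor stands in for *structs.VaultAccessor in this sketch.
type accessor struct{ id string }

// buildBatch mirrors the batch-building step: split the pending map into
// tokens still needing revocation and tokens whose revocation TTL has
// already expired, stopping once the combined batch reaches the limit.
func buildBatch(pending map[*accessor]time.Time, limit int, now time.Time) (revoking, ttlExpired []*accessor) {
	toRevoke := len(pending)
	if toRevoke > limit {
		toRevoke = limit
	}

	revoking = make([]*accessor, 0, toRevoke)
	for va, deadline := range pending {
		if now.After(deadline) {
			ttlExpired = append(ttlExpired, va)
		} else {
			revoking = append(revoking, va)
		}

		// The fix: count expired tokens too, so a backlog of very old
		// expired tokens can no longer blow past the batch limit.
		if len(revoking)+len(ttlExpired) >= toRevoke {
			break
		}
	}
	return revoking, ttlExpired
}

func main() {
	// Simulate a long-lived cluster with thousands of long-expired tokens.
	pending := make(map[*accessor]time.Time, 5000)
	for i := 0; i < 5000; i++ {
		pending[&accessor{id: fmt.Sprint(i)}] = time.Now().Add(-24 * time.Hour)
	}

	revoking, expired := buildBatch(pending, 1000, time.Now())
	fmt.Println(len(revoking)+len(expired) <= 1000) // prints true with the fix
}
```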

However, this code was difficult to test because it runs inside a
periodically executing loop. Most of the changes here exist to make this
one-line change testable and to add a test for it.
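
The testability change follows a simple pattern: the package-level constants `maxVaultRevokeBatchSize` and `vaultRevocationIntv` are mirrored as fields on `vaultClient` so a test can shrink them before the daemon starts. A minimal sketch of that pattern, using a hypothetical `worker` type (and made-up default values) rather than the real client:

```go
package main

import (
	"fmt"
	"time"
)

// Package-level defaults, analogous to maxVaultRevokeBatchSize and
// vaultRevocationIntv in nomad/vault.go (values here are illustrative).
const (
	defaultBatchSize = 1000
	defaultInterval  = 5 * time.Minute
)

// worker carries the "constants" as fields so tests can override them
// before the periodic loop starts.
type worker struct {
	maxBatchSize int
	interval     time.Duration
}

func newWorker() *worker {
	return &worker{
		maxBatchSize: defaultBatchSize,
		interval:     defaultInterval,
	}
}

func main() {
	w := newWorker()

	// In a test, shrink both knobs so batches stay small and ticks are fast.
	w.maxBatchSize = 100
	w.interval = 3 * time.Millisecond

	fmt.Println(w.maxBatchSize, w.interval)
}
```
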
schmichael committed Jul 28, 2020
1 parent f651278 commit 21f8f71
Showing 2 changed files with 90 additions and 11 deletions.
26 changes: 16 additions & 10 deletions nomad/vault.go
@@ -240,6 +240,10 @@ type vaultClient struct {

// setConfigLock serializes access to the SetConfig method
setConfigLock sync.Mutex

// consts as struct fields for overriding in tests
maxRevokeBatchSize int
revocationIntv time.Duration
}

// NewVaultClient returns a Vault client from the given config. If the client
@@ -257,12 +261,14 @@ func NewVaultClient(c *config.VaultConfig, logger log.Logger, purgeFn PurgeVault
}

v := &vaultClient{
- config: c,
- logger: logger.Named("vault"),
- limiter: rate.NewLimiter(requestRateLimit, int(requestRateLimit)),
- revoking: make(map[*structs.VaultAccessor]time.Time),
- purgeFn: purgeFn,
- tomb: &tomb.Tomb{},
+ config: c,
+ logger: logger.Named("vault"),
+ limiter: rate.NewLimiter(requestRateLimit, int(requestRateLimit)),
+ revoking: make(map[*structs.VaultAccessor]time.Time),
+ purgeFn: purgeFn,
+ tomb: &tomb.Tomb{},
+ maxRevokeBatchSize: maxVaultRevokeBatchSize,
+ revocationIntv: vaultRevocationIntv,
}

if v.config.IsEnabled() {
@@ -1231,7 +1237,7 @@ const maxVaultRevokeBatchSize = 1000
// revokeDaemon should be called in a goroutine and is used to periodically
// revoke Vault accessors that failed the original revocation
func (v *vaultClient) revokeDaemon() {
- ticker := time.NewTicker(vaultRevocationIntv)
+ ticker := time.NewTicker(v.revocationIntv)
defer ticker.Stop()

for {
@@ -1253,8 +1259,8 @@

// Build the list of accessors that need to be revoked while pruning any TTL'd checks
toRevoke := len(v.revoking)
- if toRevoke > maxVaultRevokeBatchSize {
- toRevoke = maxVaultRevokeBatchSize
+ if toRevoke > v.maxRevokeBatchSize {
+ toRevoke = v.maxRevokeBatchSize
}
revoking := make([]*structs.VaultAccessor, 0, toRevoke)
ttlExpired := []*structs.VaultAccessor{}
@@ -1265,7 +1271,7 @@
revoking = append(revoking, va)
}

- if len(revoking) >= toRevoke {
+ if len(revoking)+len(ttlExpired) >= toRevoke {
break
}
}
75 changes: 74 additions & 1 deletion nomad/vault_test.go
@@ -1436,7 +1436,7 @@ func TestVaultClient_RevokeTokens_PreEstablishs(t *testing.T) {
}
}

- // TestVaultClient_RevokeTokens_failures_TTL asserts that
+ // TestVaultClient_RevokeTokens_Failures_TTL asserts that
// the registered TTL doesn't get extended on retries
func TestVaultClient_RevokeTokens_Failures_TTL(t *testing.T) {
t.Parallel()
@@ -1694,6 +1694,79 @@ func TestVaultClient_RevokeTokens_Idempotent(t *testing.T) {
require.Errorf(t, err, "failed to purge token: %v", s)
}

// TestVaultClient_RevokeDaemon_Bounded asserts that token revocation
// batches are bounded in size.
func TestVaultClient_RevokeDaemon_Bounded(t *testing.T) {
t.Parallel()
v := testutil.NewTestVault(t)
defer v.Stop()

// Set the configs token in a new test role
v.Config.Token = defaultTestVaultWhitelistRoleAndToken(v, t, 5)

// Disable client until we can change settings for testing
conf := v.Config.Copy()
conf.Enabled = helper.BoolToPtr(false)

const (
batchSize = 100
batches = 3
)
resultCh := make(chan error, batches)

// Purge function asserts batches are always < batchSize
purge := func(vas []*structs.VaultAccessor) error {
if len(vas) > batchSize {
resultCh <- fmt.Errorf("too many Vault accessors in batch: %d > %d", len(vas), batchSize)
} else {
resultCh <- nil
}

return nil
}

logger := testlog.HCLogger(t)
client, err := NewVaultClient(conf, logger, purge)
if err != nil {
t.Fatalf("failed to build vault client: %v", err)
}

// Override settings for testing and then enable client
client.maxRevokeBatchSize = batchSize
client.revocationIntv = 3 * time.Millisecond
conf = v.Config.Copy()
conf.Enabled = helper.BoolToPtr(true)
require.NoError(t, client.SetConfig(conf))

client.SetActive(true)
defer client.Stop()

waitForConnection(client, t)

// Create more tokens in Nomad than can fit in a batch; they don't need
// to exist in Vault.
accessors := make([]*structs.VaultAccessor, batchSize*3)
for i := 0; i < len(accessors); i++ {
accessors[i] = &structs.VaultAccessor{Accessor: "abcd"}
}

// Mark for revocation
require.NoError(t, client.MarkForRevocation(accessors))

// Wait for tokens to be revoked
for i := 0; i < batches; i++ {
select {
case err := <-resultCh:
require.NoError(t, err)
case <-time.After(10 * time.Second):
// 10 seconds should be plenty long to process 3
// batches at a 3ms tick interval!
t.Errorf("timed out processing %d batches. %d/%d complete in 10s",
batches, i, batches)
}
}
}

func waitForConnection(v *vaultClient, t *testing.T) {
testutil.WaitForResult(func() (bool, error) {
return v.ConnectionEstablished()
