From 21f8f7152f0f5291ef3b41e7fbde927ea2a45577 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 28 Jul 2020 14:59:16 -0700 Subject: [PATCH] vault: expired tokens count toward batch limit As of 0.11.3 Vault token revocation and purging was done in batches. However the batch size was only limited by the number of *non-expired* tokens being revoked. Due to bugs prior to 0.11.3, *expired* tokens were not properly purged. Long-lived clusters could have thousands to *millions* of very old expired tokens that never got purged from the state store. Since these expired tokens did not count against the batch limit, very large batches could be created and overwhelm servers. This commit ensures expired tokens count toward the batch limit with this one line change: ``` - if len(revoking) >= toRevoke { + if len(revoking)+len(ttlExpired) >= toRevoke { ``` However, this code was difficult to test due to being in a periodically executing loop. Most of the changes are to make this one line change testable and test it. --- nomad/vault.go | 26 ++++++++++------ nomad/vault_test.go | 75 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 90 insertions(+), 11 deletions(-) diff --git a/nomad/vault.go b/nomad/vault.go index 013f38be9214..9def07a65054 100644 --- a/nomad/vault.go +++ b/nomad/vault.go @@ -240,6 +240,10 @@ type vaultClient struct { // setConfigLock serializes access to the SetConfig method setConfigLock sync.Mutex + + // consts as struct fields for overriding in tests + maxRevokeBatchSize int + revocationIntv time.Duration } // NewVaultClient returns a Vault client from the given config. If the client @@ -257,12 +261,14 @@ func NewVaultClient(c *config.VaultConfig, logger log.Logger, purgeFn PurgeVault } v := &vaultClient{ - config: c, - logger: logger.Named("vault"), - limiter: rate.NewLimiter(requestRateLimit, int(requestRateLimit)), - revoking: make(map[*structs.VaultAccessor]time.Time), - purgeFn: purgeFn, - tomb: &tomb.Tomb{}, + config: c, + logger: logger.Named("vault"), + limiter: rate.NewLimiter(requestRateLimit, int(requestRateLimit)), + revoking: make(map[*structs.VaultAccessor]time.Time), + purgeFn: purgeFn, + tomb: &tomb.Tomb{}, + maxRevokeBatchSize: maxVaultRevokeBatchSize, + revocationIntv: vaultRevocationIntv, } if v.config.IsEnabled() { @@ -1231,7 +1237,7 @@ const maxVaultRevokeBatchSize = 1000 // revokeDaemon should be called in a goroutine and is used to periodically // revoke Vault accessors that failed the original revocation func (v *vaultClient) revokeDaemon() { - ticker := time.NewTicker(vaultRevocationIntv) + ticker := time.NewTicker(v.revocationIntv) defer ticker.Stop() for { @@ -1253,8 +1259,8 @@ func (v *vaultClient) revokeDaemon() { // Build the list of accessors that need to be revoked while pruning any TTL'd checks toRevoke := len(v.revoking) - if toRevoke > maxVaultRevokeBatchSize { - toRevoke = maxVaultRevokeBatchSize + if toRevoke > v.maxRevokeBatchSize { + toRevoke = v.maxRevokeBatchSize } revoking := make([]*structs.VaultAccessor, 0, toRevoke) ttlExpired := []*structs.VaultAccessor{} @@ -1265,7 +1271,7 @@ func (v *vaultClient) revokeDaemon() { revoking = append(revoking, va) } - if len(revoking) >= toRevoke { + if len(revoking)+len(ttlExpired) >= toRevoke { break } } diff --git a/nomad/vault_test.go b/nomad/vault_test.go index 3f94d79446e6..29d037b8b081 100644 --- a/nomad/vault_test.go +++ b/nomad/vault_test.go @@ -1436,7 +1436,7 @@ func TestVaultClient_RevokeTokens_PreEstablishs(t *testing.T) { } } -// TestVaultClient_RevokeTokens_failures_TTL asserts that +// TestVaultClient_RevokeTokens_Failures_TTL asserts that // the registered TTL doesn't get extended on retries func TestVaultClient_RevokeTokens_Failures_TTL(t *testing.T) { t.Parallel() @@ -1694,6 +1694,79 @@ func TestVaultClient_RevokeTokens_Idempotent(t *testing.T) { require.Errorf(t, err, "failed to purge token: %v", s) } +// TestVaultClient_RevokeDaemon_Bounded asserts that token revocation +// batches are bounded in size. +func TestVaultClient_RevokeDaemon_Bounded(t *testing.T) { + t.Parallel() + v := testutil.NewTestVault(t) + defer v.Stop() + + // Set the configs token in a new test role + v.Config.Token = defaultTestVaultWhitelistRoleAndToken(v, t, 5) + + // Disable client until we can change settings for testing + conf := v.Config.Copy() + conf.Enabled = helper.BoolToPtr(false) + + const ( + batchSize = 100 + batches = 3 + ) + resultCh := make(chan error, batches) + + // Purge function asserts batches are always < batchSize + purge := func(vas []*structs.VaultAccessor) error { + if len(vas) > batchSize { + resultCh <- fmt.Errorf("too many Vault accessors in batch: %d > %d", len(vas), batchSize) + } else { + resultCh <- nil + } + + return nil + } + + logger := testlog.HCLogger(t) + client, err := NewVaultClient(conf, logger, purge) + if err != nil { + t.Fatalf("failed to build vault client: %v", err) + } + + // Override settings for testing and then enable client + client.maxRevokeBatchSize = batchSize + client.revocationIntv = 3 * time.Millisecond + conf = v.Config.Copy() + conf.Enabled = helper.BoolToPtr(true) + require.NoError(t, client.SetConfig(conf)) + + client.SetActive(true) + defer client.Stop() + + waitForConnection(client, t) + + // Create more tokens in Nomad than can fit in a batch; they don't need + // to exist in Vault. + accessors := make([]*structs.VaultAccessor, batchSize*3) + for i := 0; i < len(accessors); i++ { + accessors[i] = &structs.VaultAccessor{Accessor: "abcd"} + } + + // Mark for revocation + require.NoError(t, client.MarkForRevocation(accessors)) + + // Wait for tokens to be revoked + for i := 0; i < batches; i++ { + select { + case err := <-resultCh: + require.NoError(t, err) + case <-time.After(10 * time.Second): + // 10 seconds should be plenty long to process 3 + // batches at a 3ms tick interval! + t.Errorf("timed out processing %d batches. %d/%d complete in 10s", + batches, i, batches) + } + } +} + func waitForConnection(v *vaultClient, t *testing.T) { testutil.WaitForResult(func() (bool, error) { return v.ConnectionEstablished()