From 4a14604e664078094ad220f4e85b668b164afebf Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 28 Jul 2020 14:59:16 -0700 Subject: [PATCH] vault: expired tokens count toward batch limit As of 0.11.3 Vault token revocation and purging was done in batches. However the batch size was only limited by the number of *non-expired* tokens being revoked. Due to bugs prior to 0.11.3, *expired* tokens were not properly purged. Long-lived clusters could have thousands to *millions* of very old expired tokens that never got purged from the state store. Since these expired tokens did not count against the batch limit, very large batches could be created and overwhelm servers. This commit ensures expired tokens count toward the batch limit with this one line change: ``` - if len(revoking) >= toRevoke { + if len(revoking)+len(ttlExpired) >= toRevoke { ``` However, this code was difficult to test due to being in a periodically executing loop. Most of the changes are to make this one line change testable and test it. --- nomad/vault.go | 36 ++++++++++++++------- nomad/vault_test.go | 78 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 101 insertions(+), 13 deletions(-) diff --git a/nomad/vault.go b/nomad/vault.go index 6f23a47f5b06..19b73035129b 100644 --- a/nomad/vault.go +++ b/nomad/vault.go @@ -242,6 +242,10 @@ type vaultClient struct { // setConfigLock serializes access to the SetConfig method setConfigLock sync.Mutex + // consts as struct fields for overriding in tests + maxRevokeBatchSize int + revocationIntv time.Duration + entHandler taskClientHandler } @@ -267,13 +271,15 @@ func NewVaultClient(c *config.VaultConfig, logger log.Logger, purgeFn PurgeVault } v := &vaultClient{ - config: c, - logger: logger.Named("vault"), - limiter: rate.NewLimiter(requestRateLimit, int(requestRateLimit)), - revoking: make(map[*structs.VaultAccessor]time.Time), - purgeFn: purgeFn, - tomb: &tomb.Tomb{}, - entHandler: delegate, + config: c, + logger: logger.Named("vault"), + limiter: rate.NewLimiter(requestRateLimit, int(requestRateLimit)), + revoking: make(map[*structs.VaultAccessor]time.Time), + purgeFn: purgeFn, + tomb: &tomb.Tomb{}, + maxRevokeBatchSize: maxVaultRevokeBatchSize, + revocationIntv: vaultRevocationIntv, + entHandler: delegate, } if v.config.IsEnabled() { @@ -1259,19 +1265,22 @@ func (v *vaultClient) parallelRevoke(ctx context.Context, accessors []*structs.V } // maxVaultRevokeBatchSize is the maximum tokens a revokeDaemon should revoke -// at any given time. +// and purge at any given time. // // Limiting the revocation batch size is beneficial for few reasons: // * A single revocation failure of any entry in batch result into retrying the whole batch; // the larger the batch is the higher likelihood of such failure // * Smaller batch sizes result into more co-operativeness: provides hooks for // reconsidering token TTL and leadership steps down. +// * Batches limit the size of the Raft message purging tokens. Due to bugs +// pre-0.11.3, expired tokens were not properly purged, so users upgrading from +// older versions may have huge numbers (millions) of expired tokens to purge. const maxVaultRevokeBatchSize = 1000 // revokeDaemon should be called in a goroutine and is used to periodically // revoke Vault accessors that failed the original revocation func (v *vaultClient) revokeDaemon() { - ticker := time.NewTicker(vaultRevocationIntv) + ticker := time.NewTicker(v.revocationIntv) defer ticker.Stop() for { @@ -1293,8 +1302,8 @@ func (v *vaultClient) revokeDaemon() { // Build the list of accessors that need to be revoked while pruning any TTL'd checks toRevoke := len(v.revoking) - if toRevoke > maxVaultRevokeBatchSize { - toRevoke = maxVaultRevokeBatchSize + if toRevoke > v.maxRevokeBatchSize { + toRevoke = v.maxRevokeBatchSize } revoking := make([]*structs.VaultAccessor, 0, toRevoke) ttlExpired := []*structs.VaultAccessor{} @@ -1305,7 +1314,10 @@ func (v *vaultClient) revokeDaemon() { revoking = append(revoking, va) } - if len(revoking) >= toRevoke { + // Batches should consider tokens to be revoked + // as well as expired tokens to ensure the Raft + // message is reasonably sized. + if len(revoking)+len(ttlExpired) >= toRevoke { break } } diff --git a/nomad/vault_test.go b/nomad/vault_test.go index cd7f3d13d738..6c2784f6c12b 100644 --- a/nomad/vault_test.go +++ b/nomad/vault_test.go @@ -8,6 +8,7 @@ import ( "math/rand" "reflect" "strings" + "sync/atomic" "testing" "time" @@ -1436,7 +1437,7 @@ func TestVaultClient_RevokeTokens_PreEstablishs(t *testing.T) { } } -// TestVaultClient_RevokeTokens_failures_TTL asserts that +// TestVaultClient_RevokeTokens_Failures_TTL asserts that // the registered TTL doesn't get extended on retries func TestVaultClient_RevokeTokens_Failures_TTL(t *testing.T) { t.Parallel() @@ -1694,6 +1695,81 @@ func TestVaultClient_RevokeTokens_Idempotent(t *testing.T) { require.Errorf(t, err, "failed to purge token: %v", s) } +// TestVaultClient_RevokeDaemon_Bounded asserts that token revocation +// batches are bounded in size. +func TestVaultClient_RevokeDaemon_Bounded(t *testing.T) { + t.Parallel() + v := testutil.NewTestVault(t) + defer v.Stop() + + // Set the configs token in a new test role + v.Config.Token = defaultTestVaultWhitelistRoleAndToken(v, t, 5) + + // Disable client until we can change settings for testing + conf := v.Config.Copy() + conf.Enabled = helper.BoolToPtr(false) + + const ( + batchSize = 100 + batches = 3 + ) + resultCh := make(chan error, batches) + var totalPurges int64 + + // Purge function asserts batches are always < batchSize + purge := func(vas []*structs.VaultAccessor) error { + if len(vas) > batchSize { + resultCh <- fmt.Errorf("too many Vault accessors in batch: %d > %d", len(vas), batchSize) + } else { + resultCh <- nil + } + atomic.AddInt64(&totalPurges, int64(len(vas))) + + return nil + } + + logger := testlog.HCLogger(t) + client, err := NewVaultClient(conf, logger, purge, nil) + require.NoError(t, err) + + // Override settings for testing and then enable client + client.maxRevokeBatchSize = batchSize + client.revocationIntv = 3 * time.Millisecond + conf = v.Config.Copy() + conf.Enabled = helper.BoolToPtr(true) + require.NoError(t, client.SetConfig(conf)) + + client.SetActive(true) + defer client.Stop() + + waitForConnection(client, t) + + // Create more tokens in Nomad than can fit in a batch; they don't need + // to exist in Vault. + accessors := make([]*structs.VaultAccessor, batchSize*batches) + for i := 0; i < len(accessors); i++ { + accessors[i] = &structs.VaultAccessor{Accessor: "abcd"} + } + + // Mark for revocation + require.NoError(t, client.MarkForRevocation(accessors)) + + // Wait for tokens to be revoked + for i := 0; i < batches; i++ { + select { + case err := <-resultCh: + require.NoError(t, err) + case <-time.After(10 * time.Second): + // 10 seconds should be plenty long to process 3 + // batches at a 3ms tick interval! + t.Errorf("timed out processing %d batches. %d/%d complete in 10s", + batches, i, batches) + } + } + + require.Equal(t, int64(len(accessors)), atomic.LoadInt64(&totalPurges)) +} + func waitForConnection(v *vaultClient, t *testing.T) { testutil.WaitForResult(func() (bool, error) { return v.ConnectionEstablished()