diff --git a/nomad/vault.go b/nomad/vault.go index 6f23a47f5b06..19b73035129b 100644 --- a/nomad/vault.go +++ b/nomad/vault.go @@ -242,6 +242,10 @@ type vaultClient struct { // setConfigLock serializes access to the SetConfig method setConfigLock sync.Mutex + // consts as struct fields for overriding in tests + maxRevokeBatchSize int + revocationIntv time.Duration + entHandler taskClientHandler } @@ -267,13 +271,15 @@ func NewVaultClient(c *config.VaultConfig, logger log.Logger, purgeFn PurgeVault } v := &vaultClient{ - config: c, - logger: logger.Named("vault"), - limiter: rate.NewLimiter(requestRateLimit, int(requestRateLimit)), - revoking: make(map[*structs.VaultAccessor]time.Time), - purgeFn: purgeFn, - tomb: &tomb.Tomb{}, - entHandler: delegate, + config: c, + logger: logger.Named("vault"), + limiter: rate.NewLimiter(requestRateLimit, int(requestRateLimit)), + revoking: make(map[*structs.VaultAccessor]time.Time), + purgeFn: purgeFn, + tomb: &tomb.Tomb{}, + maxRevokeBatchSize: maxVaultRevokeBatchSize, + revocationIntv: vaultRevocationIntv, + entHandler: delegate, } if v.config.IsEnabled() { @@ -1259,19 +1265,22 @@ func (v *vaultClient) parallelRevoke(ctx context.Context, accessors []*structs.V } // maxVaultRevokeBatchSize is the maximum tokens a revokeDaemon should revoke -// at any given time. +// and purge at any given time. // // Limiting the revocation batch size is beneficial for few reasons: // * A single revocation failure of any entry in batch result into retrying the whole batch; // the larger the batch is the higher likelihood of such failure // * Smaller batch sizes result into more co-operativeness: provides hooks for // reconsidering token TTL and leadership steps down. +// * Batches limit the size of the Raft message purging tokens. Due to bugs +// pre-0.11.3, expired tokens were not properly purged, so users upgrading from +// older versions may have huge numbers (millions) of expired tokens to purge. const maxVaultRevokeBatchSize = 1000 // revokeDaemon should be called in a goroutine and is used to periodically // revoke Vault accessors that failed the original revocation func (v *vaultClient) revokeDaemon() { - ticker := time.NewTicker(vaultRevocationIntv) + ticker := time.NewTicker(v.revocationIntv) defer ticker.Stop() for { @@ -1293,8 +1302,8 @@ func (v *vaultClient) revokeDaemon() { // Build the list of accessors that need to be revoked while pruning any TTL'd checks toRevoke := len(v.revoking) - if toRevoke > maxVaultRevokeBatchSize { - toRevoke = maxVaultRevokeBatchSize + if toRevoke > v.maxRevokeBatchSize { + toRevoke = v.maxRevokeBatchSize } revoking := make([]*structs.VaultAccessor, 0, toRevoke) ttlExpired := []*structs.VaultAccessor{} @@ -1305,7 +1314,10 @@ func (v *vaultClient) revokeDaemon() { revoking = append(revoking, va) } - if len(revoking) >= toRevoke { + // Batches should consider tokens to be revoked + // as well as expired tokens to ensure the Raft + // message is reasonably sized. + if len(revoking)+len(ttlExpired) >= toRevoke { break } } diff --git a/nomad/vault_test.go b/nomad/vault_test.go index cd7f3d13d738..6c2784f6c12b 100644 --- a/nomad/vault_test.go +++ b/nomad/vault_test.go @@ -8,6 +8,7 @@ import ( "math/rand" "reflect" "strings" + "sync/atomic" "testing" "time" @@ -1436,7 +1437,7 @@ func TestVaultClient_RevokeTokens_PreEstablishs(t *testing.T) { } } -// TestVaultClient_RevokeTokens_failures_TTL asserts that +// TestVaultClient_RevokeTokens_Failures_TTL asserts that // the registered TTL doesn't get extended on retries func TestVaultClient_RevokeTokens_Failures_TTL(t *testing.T) { t.Parallel() @@ -1694,6 +1695,81 @@ func TestVaultClient_RevokeTokens_Idempotent(t *testing.T) { require.Errorf(t, err, "failed to purge token: %v", s) } +// TestVaultClient_RevokeDaemon_Bounded asserts that token revocation +// batches are bounded in size. +func TestVaultClient_RevokeDaemon_Bounded(t *testing.T) { + t.Parallel() + v := testutil.NewTestVault(t) + defer v.Stop() + + // Set the configs token in a new test role + v.Config.Token = defaultTestVaultWhitelistRoleAndToken(v, t, 5) + + // Disable client until we can change settings for testing + conf := v.Config.Copy() + conf.Enabled = helper.BoolToPtr(false) + + const ( + batchSize = 100 + batches = 3 + ) + resultCh := make(chan error, batches) + var totalPurges int64 + + // Purge function asserts batches are always < batchSize + purge := func(vas []*structs.VaultAccessor) error { + if len(vas) > batchSize { + resultCh <- fmt.Errorf("too many Vault accessors in batch: %d > %d", len(vas), batchSize) + } else { + resultCh <- nil + } + atomic.AddInt64(&totalPurges, int64(len(vas))) + + return nil + } + + logger := testlog.HCLogger(t) + client, err := NewVaultClient(conf, logger, purge, nil) + require.NoError(t, err) + + // Override settings for testing and then enable client + client.maxRevokeBatchSize = batchSize + client.revocationIntv = 3 * time.Millisecond + conf = v.Config.Copy() + conf.Enabled = helper.BoolToPtr(true) + require.NoError(t, client.SetConfig(conf)) + + client.SetActive(true) + defer client.Stop() + + waitForConnection(client, t) + + // Create more tokens in Nomad than can fit in a batch; they don't need + // to exist in Vault. + accessors := make([]*structs.VaultAccessor, batchSize*batches) + for i := 0; i < len(accessors); i++ { + accessors[i] = &structs.VaultAccessor{Accessor: "abcd"} + } + + // Mark for revocation + require.NoError(t, client.MarkForRevocation(accessors)) + + // Wait for tokens to be revoked + for i := 0; i < batches; i++ { + select { + case err := <-resultCh: + require.NoError(t, err) + case <-time.After(10 * time.Second): + // 10 seconds should be plenty long to process 3 + // batches at a 3ms tick interval! + t.Errorf("timed out processing %d batches. %d/%d complete in 10s", + batches, i, batches) + } + } + + require.Equal(t, int64(len(accessors)), atomic.LoadInt64(&totalPurges)) +} + func waitForConnection(v *vaultClient, t *testing.T) { testutil.WaitForResult(func() (bool, error) { return v.ConnectionEstablished()