diff --git a/nomad/vault.go b/nomad/vault.go index 013f38be9214..9def07a65054 100644 --- a/nomad/vault.go +++ b/nomad/vault.go @@ -240,6 +240,10 @@ type vaultClient struct { // setConfigLock serializes access to the SetConfig method setConfigLock sync.Mutex + + // consts as struct fields for overriding in tests + maxRevokeBatchSize int + revocationIntv time.Duration } // NewVaultClient returns a Vault client from the given config. If the client @@ -257,12 +261,14 @@ func NewVaultClient(c *config.VaultConfig, logger log.Logger, purgeFn PurgeVault } v := &vaultClient{ - config: c, - logger: logger.Named("vault"), - limiter: rate.NewLimiter(requestRateLimit, int(requestRateLimit)), - revoking: make(map[*structs.VaultAccessor]time.Time), - purgeFn: purgeFn, - tomb: &tomb.Tomb{}, + config: c, + logger: logger.Named("vault"), + limiter: rate.NewLimiter(requestRateLimit, int(requestRateLimit)), + revoking: make(map[*structs.VaultAccessor]time.Time), + purgeFn: purgeFn, + tomb: &tomb.Tomb{}, + maxRevokeBatchSize: maxVaultRevokeBatchSize, + revocationIntv: vaultRevocationIntv, } if v.config.IsEnabled() { @@ -1231,7 +1237,7 @@ const maxVaultRevokeBatchSize = 1000 // revokeDaemon should be called in a goroutine and is used to periodically // revoke Vault accessors that failed the original revocation func (v *vaultClient) revokeDaemon() { - ticker := time.NewTicker(vaultRevocationIntv) + ticker := time.NewTicker(v.revocationIntv) defer ticker.Stop() for { @@ -1253,8 +1259,8 @@ func (v *vaultClient) revokeDaemon() { // Build the list of accessors that need to be revoked while pruning any TTL'd checks toRevoke := len(v.revoking) - if toRevoke > maxVaultRevokeBatchSize { - toRevoke = maxVaultRevokeBatchSize + if toRevoke > v.maxRevokeBatchSize { + toRevoke = v.maxRevokeBatchSize } revoking := make([]*structs.VaultAccessor, 0, toRevoke) ttlExpired := []*structs.VaultAccessor{} @@ -1265,7 +1271,7 @@ func (v *vaultClient) revokeDaemon() { revoking = append(revoking, va) } - if 
len(revoking) >= toRevoke { + if len(revoking)+len(ttlExpired) >= toRevoke { break } } diff --git a/nomad/vault_test.go b/nomad/vault_test.go index 3f94d79446e6..29d037b8b081 100644 --- a/nomad/vault_test.go +++ b/nomad/vault_test.go @@ -1436,7 +1436,7 @@ func TestVaultClient_RevokeTokens_PreEstablishs(t *testing.T) { } } -// TestVaultClient_RevokeTokens_failures_TTL asserts that +// TestVaultClient_RevokeTokens_Failures_TTL asserts that // the registered TTL doesn't get extended on retries func TestVaultClient_RevokeTokens_Failures_TTL(t *testing.T) { t.Parallel() @@ -1694,6 +1694,79 @@ func TestVaultClient_RevokeTokens_Idempotent(t *testing.T) { require.Errorf(t, err, "failed to purge token: %v", s) } +// TestVaultClient_RevokeDaemon_Bounded asserts that token revocation +// batches are bounded in size. +func TestVaultClient_RevokeDaemon_Bounded(t *testing.T) { + t.Parallel() + v := testutil.NewTestVault(t) + defer v.Stop() + + // Set the config's token in a new test role + v.Config.Token = defaultTestVaultWhitelistRoleAndToken(v, t, 5) + + // Disable client until we can change settings for testing + conf := v.Config.Copy() + conf.Enabled = helper.BoolToPtr(false) + + const ( + batchSize = 100 + batches = 3 + ) + resultCh := make(chan error, batches) + + // Purge function asserts batches are always <= batchSize + purge := func(vas []*structs.VaultAccessor) error { + if len(vas) > batchSize { + resultCh <- fmt.Errorf("too many Vault accessors in batch: %d > %d", len(vas), batchSize) + } else { + resultCh <- nil + } + + return nil + } + + logger := testlog.HCLogger(t) + client, err := NewVaultClient(conf, logger, purge) + if err != nil { + t.Fatalf("failed to build vault client: %v", err) + } + + // Override settings for testing and then enable client + client.maxRevokeBatchSize = batchSize + client.revocationIntv = 3 * time.Millisecond + conf = v.Config.Copy() + conf.Enabled = helper.BoolToPtr(true) + require.NoError(t, client.SetConfig(conf)) + + 
client.SetActive(true) + defer client.Stop() + + waitForConnection(client, t) + + // Create more tokens in Nomad than can fit in a batch; they don't need + // to exist in Vault. + accessors := make([]*structs.VaultAccessor, batchSize*3) + for i := 0; i < len(accessors); i++ { + accessors[i] = &structs.VaultAccessor{Accessor: "abcd"} + } + + // Mark for revocation + require.NoError(t, client.MarkForRevocation(accessors)) + + // Wait for tokens to be revoked + for i := 0; i < batches; i++ { + select { + case err := <-resultCh: + require.NoError(t, err) + case <-time.After(10 * time.Second): + // 10 seconds should be plenty long to process 3 + // batches at a 3ms tick interval! + t.Errorf("timed out processing %d batches. %d/%d complete in 10s", + batches, i, batches) + } + } +} + func waitForConnection(v *vaultClient, t *testing.T) { testutil.WaitForResult(func() (bool, error) { return v.ConnectionEstablished()