vault: expired tokens count toward batch limit #8553

Merged (1 commit) on Jul 29, 2020
36 changes: 24 additions & 12 deletions nomad/vault.go
@@ -242,6 +242,10 @@ type vaultClient struct {
 	// setConfigLock serializes access to the SetConfig method
 	setConfigLock sync.Mutex
 
+	// consts as struct fields for overriding in tests
+	maxRevokeBatchSize int
+	revocationIntv     time.Duration
+
 	entHandler taskClientHandler
 }

@@ -267,13 +271,15 @@ func NewVaultClient(c *config.VaultConfig, logger log.Logger, purgeFn PurgeVault
 	}
 
 	v := &vaultClient{
-		config:     c,
-		logger:     logger.Named("vault"),
-		limiter:    rate.NewLimiter(requestRateLimit, int(requestRateLimit)),
-		revoking:   make(map[*structs.VaultAccessor]time.Time),
-		purgeFn:    purgeFn,
-		tomb:       &tomb.Tomb{},
-		entHandler: delegate,
+		config:             c,
+		logger:             logger.Named("vault"),
+		limiter:            rate.NewLimiter(requestRateLimit, int(requestRateLimit)),
+		revoking:           make(map[*structs.VaultAccessor]time.Time),
+		purgeFn:            purgeFn,
+		tomb:               &tomb.Tomb{},
+		maxRevokeBatchSize: maxVaultRevokeBatchSize,
+		revocationIntv:     vaultRevocationIntv,
+		entHandler:         delegate,
 	}
 
 	if v.config.IsEnabled() {
@@ -1259,19 +1265,22 @@ func (v *vaultClient) parallelRevoke(ctx context.Context, accessors []*structs.V
 }
 
 // maxVaultRevokeBatchSize is the maximum number of tokens a revokeDaemon should revoke
-// at any given time.
+// and purge at any given time.
 //
 // Limiting the revocation batch size is beneficial for a few reasons:
 // * A single revocation failure of any entry in a batch results in retrying the
 //   whole batch; the larger the batch, the higher the likelihood of such a failure.
 // * Smaller batch sizes are more cooperative: they provide hooks for
 //   reconsidering token TTLs and for leadership step-downs.
+// * Batches limit the size of the Raft message purging tokens. Due to bugs
+//   pre-0.11.3, expired tokens were not properly purged, so users upgrading from
+//   older versions may have huge numbers (millions) of expired tokens to purge.
 const maxVaultRevokeBatchSize = 1000
 
 // revokeDaemon should be called in a goroutine and is used to periodically
 // revoke Vault accessors that failed the original revocation
 func (v *vaultClient) revokeDaemon() {
-	ticker := time.NewTicker(vaultRevocationIntv)
+	ticker := time.NewTicker(v.revocationIntv)
 	defer ticker.Stop()
 
 	for {
@@ -1293,8 +1302,8 @@ func (v *vaultClient) revokeDaemon() {
 
 			// Build the list of accessors that need to be revoked while pruning any TTL'd checks
 			toRevoke := len(v.revoking)
-			if toRevoke > maxVaultRevokeBatchSize {
-				toRevoke = maxVaultRevokeBatchSize
+			if toRevoke > v.maxRevokeBatchSize {
+				toRevoke = v.maxRevokeBatchSize
 			}
 			revoking := make([]*structs.VaultAccessor, 0, toRevoke)
 			ttlExpired := []*structs.VaultAccessor{}
@@ -1305,7 +1314,10 @@ func (v *vaultClient) revokeDaemon() {
 					revoking = append(revoking, va)
 				}
 
-				if len(revoking) >= toRevoke {
+				// Batches should consider tokens to be revoked
+				// as well as expired tokens to ensure the Raft
+				// message is reasonably sized.
+				if len(revoking)+len(ttlExpired) >= toRevoke {
Review comment (Contributor): Maybe add a comment indicating that maxVaultRevokeBatchSize is meant both to constrain the batch size when submitting requests to Vault and to restrict the size of Raft messages, hence the need to account for ttlExpired too.
 					break
 				}
 			}
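To make the reasoning in the maxVaultRevokeBatchSize comment concrete: tokens queued for revocation and tokens whose TTL has already expired count against the same budget, because both end up in the same Raft purge message. Below is a minimal standalone sketch of that partitioning, with names and types simplified from the diff (VaultAccessor stands in for structs.VaultAccessor); it is an illustration, not the actual implementation.

```go
package main

import (
	"fmt"
	"time"
)

// VaultAccessor is a simplified stand-in for structs.VaultAccessor.
type VaultAccessor struct{ Accessor string }

// partitionBatch mirrors the loop in revokeDaemon: it walks the pending
// set and splits it into accessors to revoke now and accessors whose TTL
// has expired, stopping once the combined count reaches max so the Raft
// purge message stays bounded.
func partitionBatch(pending map[*VaultAccessor]time.Time, max int, now time.Time) (revoking, ttlExpired []*VaultAccessor) {
	for va, validTo := range pending {
		if now.After(validTo) {
			ttlExpired = append(ttlExpired, va)
		} else {
			revoking = append(revoking, va)
		}

		// Expired tokens are purged rather than revoked, but they still
		// grow the Raft message, so they count toward the same budget.
		if len(revoking)+len(ttlExpired) >= max {
			break
		}
	}
	return revoking, ttlExpired
}

func main() {
	now := time.Now()
	pending := make(map[*VaultAccessor]time.Time)
	for i := 0; i < 2000; i++ {
		// Half the accessors are already expired.
		validTo := now.Add(time.Hour)
		if i%2 == 0 {
			validTo = now.Add(-time.Hour)
		}
		pending[&VaultAccessor{Accessor: fmt.Sprintf("accessor-%d", i)}] = validTo
	}

	revoking, expired := partitionBatch(pending, 1000, now)
	fmt.Println(len(revoking)+len(expired) <= 1000) // true: the batch never exceeds the limit
}
```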
78 changes: 77 additions & 1 deletion nomad/vault_test.go
@@ -8,6 +8,7 @@ import (
 	"math/rand"
 	"reflect"
 	"strings"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -1436,7 +1437,7 @@ func TestVaultClient_RevokeTokens_PreEstablishs(t *testing.T) {
 	}
 }
 
-// TestVaultClient_RevokeTokens_failures_TTL asserts that
+// TestVaultClient_RevokeTokens_Failures_TTL asserts that
 // the registered TTL doesn't get extended on retries
 func TestVaultClient_RevokeTokens_Failures_TTL(t *testing.T) {
 	t.Parallel()
@@ -1694,6 +1695,81 @@ func TestVaultClient_RevokeTokens_Idempotent(t *testing.T) {
 	require.Errorf(t, err, "failed to purge token: %v", s)
 }
 
+// TestVaultClient_RevokeDaemon_Bounded asserts that token revocation
+// batches are bounded in size.
+func TestVaultClient_RevokeDaemon_Bounded(t *testing.T) {
+	t.Parallel()
+	v := testutil.NewTestVault(t)
+	defer v.Stop()
+
+	// Set the config's token in a new test role
+	v.Config.Token = defaultTestVaultWhitelistRoleAndToken(v, t, 5)
+
+	// Disable client until we can change settings for testing
+	conf := v.Config.Copy()
+	conf.Enabled = helper.BoolToPtr(false)
Review comment (Contributor), on lines +1706 to +1710: I suspect that copying isn't necessary here, since we are still initializing the config. If it is necessary, it's unclear why the token can be modified above without copying.

Reply (Member, Author): I just noticed that vaultClient keeps a reference to the Config, so I copied it out of an abundance of caution. (See the sketch of the aliasing hazard below.)

+	const (
+		batchSize = 100
+		batches   = 3
+	)
+	resultCh := make(chan error, batches)
+	var totalPurges int64
+
+	// Purge function asserts batches are never larger than batchSize
+	purge := func(vas []*structs.VaultAccessor) error {
+		if len(vas) > batchSize {
+			resultCh <- fmt.Errorf("too many Vault accessors in batch: %d > %d", len(vas), batchSize)
+		} else {
+			resultCh <- nil
+		}
+		atomic.AddInt64(&totalPurges, int64(len(vas)))
+
+		return nil
Review comment (Contributor): This is a good assertion as-is, but I would like to also check that we actually received all accessors, not just that we received 3 batches no bigger than the expected size. Maybe add a counter for the number of purge calls and for how many accessors have been received so far. Also, you could send nil on resultCh (or close it) only once all accessors have been received across the 3 batches, and then avoid the loop below.

Reply (Member, Author): Much better idea, thanks. (A sketch of the suggested shape follows.)
+	}
+
+	logger := testlog.HCLogger(t)
+	client, err := NewVaultClient(conf, logger, purge, nil)
+	require.NoError(t, err)
+
+	// Override settings for testing and then enable client
+	client.maxRevokeBatchSize = batchSize
+	client.revocationIntv = 3 * time.Millisecond
+	conf = v.Config.Copy()
+	conf.Enabled = helper.BoolToPtr(true)
+	require.NoError(t, client.SetConfig(conf))
+
+	client.SetActive(true)
+	defer client.Stop()
+
+	waitForConnection(client, t)
+
+	// Create more tokens in Nomad than can fit in a batch; they don't need
+	// to exist in Vault.
+	accessors := make([]*structs.VaultAccessor, batchSize*batches)
+	for i := 0; i < len(accessors); i++ {
+		accessors[i] = &structs.VaultAccessor{Accessor: "abcd"}
+	}
+
+	// Mark for revocation
+	require.NoError(t, client.MarkForRevocation(accessors))
+
+	// Wait for tokens to be revoked
+	for i := 0; i < batches; i++ {
+		select {
+		case err := <-resultCh:
+			require.NoError(t, err)
+		case <-time.After(10 * time.Second):
+			// 10 seconds should be plenty long to process 3
+			// batches at a 3ms tick interval!
+			t.Errorf("timed out processing %d batches. %d/%d complete in 10s",
+				batches, i, batches)
+		}
+	}
+
+	require.Equal(t, int64(len(accessors)), atomic.LoadInt64(&totalPurges))
+}

 func waitForConnection(v *vaultClient, t *testing.T) {
 	testutil.WaitForResult(func() (bool, error) {
 		return v.ConnectionEstablished()