From e7703d132b282b1d1f677ca6059f93570d2d0a4a Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Fri, 11 Nov 2022 16:30:24 -0500
Subject: [PATCH 1/3] keyring: update handle to state inside replication loop

When keyring replication starts, we take a handle to the state store. But
whenever a snapshot is restored, this handle is invalidated and no longer
points to a state store that is receiving new keys. This leaks a bunch of
memory too!

In addition to operator-initiated restores, when fresh servers are added to
existing clusters with large-enough state, the keyring replication can get
started quickly enough that it's running before the snapshot from the
existing cluster has been restored.

Fix this by updating the handle to the state store whenever the store's
abandon channel is closed. Refactor the query for key metadata to use
blocking queries for efficiency.
---
 .changelog/15227.txt    |  3 +++
 nomad/encrypter.go      | 56 +++++++++++++++++++++++------------------
 nomad/encrypter_test.go | 28 +++++++++++++++++++++
 3 files changed, 63 insertions(+), 24 deletions(-)
 create mode 100644 .changelog/15227.txt

diff --git a/.changelog/15227.txt b/.changelog/15227.txt
new file mode 100644
index 000000000000..a22d4bc7a50b
--- /dev/null
+++ b/.changelog/15227.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+keyring: Fixed a bug where replication would stop after snapshot restores
+```
diff --git a/nomad/encrypter.go b/nomad/encrypter.go
index 23b53db724df..ecab8682e44e 100644
--- a/nomad/encrypter.go
+++ b/nomad/encrypter.go
@@ -18,10 +18,11 @@ import (
 	log "github.com/hashicorp/go-hclog"
 	kms "github.com/hashicorp/go-kms-wrapping/v2"
 	"github.com/hashicorp/go-kms-wrapping/v2/aead"
-	"golang.org/x/time/rate"
+	memdb "github.com/hashicorp/go-memdb"
 
 	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/helper/crypto"
+	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
@@ -434,15 +435,11 @@ func (krr *KeyringReplicator) stop() {
 const keyringReplicationRate = 10
 
 func (krr *KeyringReplicator) run(ctx context.Context) {
-	limiter := rate.NewLimiter(keyringReplicationRate, keyringReplicationRate)
 	krr.logger.Debug("starting encryption key replication")
 	defer krr.logger.Debug("exiting key replication")
 
-	retryErrTimer, stop := helper.NewSafeTimer(time.Second * 1)
-	defer stop()
-
-START:
 	store := krr.srv.fsm.State()
+	minIndex := uint64(0)
 
 	for {
 		select {
@@ -450,19 +447,26 @@ START:
 			return
 		case <-ctx.Done():
 			return
+		case <-store.AbandonCh():
+			store = krr.srv.fsm.State()
+			minIndex = uint64(0)
 		default:
-			// Rate limit how often we attempt replication
-			err := limiter.Wait(ctx)
+			// ensure that we can abandon the blocking query periodically and
+			// give our other contexts a chance to fire
+			queryCtx, queryCancel := context.WithTimeout(ctx, time.Second)
+			defer queryCancel()
+
+			var rawIter any
+			var err error
+			rawIter, minIndex, err = store.BlockingQuery(getRootKeyMetas, minIndex, queryCtx)
 			if err != nil {
-				goto ERR_WAIT // rate limit exceeded
-			}
-
-			ws := store.NewWatchSet()
-			iter, err := store.RootKeyMetas(ws)
-			if err != nil {
-				krr.logger.Error("failed to fetch keyring", "error", err)
-				goto ERR_WAIT
+				if queryCtx.Err() == nil {
+					// we get errors for closed context but don't want to log on that
+					krr.logger.Error("failed to fetch keyring", "error", err)
+				}
+				continue
 			}
+			iter := rawIter.(memdb.ResultIterator)
 
 			for {
 				raw := iter.Next()
@@ -488,16 +492,20 @@ START:
 		}
 	}
 
-ERR_WAIT:
-	retryErrTimer.Reset(1 * time.Second)
+}
 
-	select {
-	case <-retryErrTimer.C:
-		goto START
-	case <-ctx.Done():
-		return
+// getRootKeyMetas implements state.QueryFn and is run as a blocking query to
+// detect new key metadata
+func getRootKeyMetas(ws memdb.WatchSet, store *state.StateStore) (interface{}, uint64, error) {
+	iter, err := store.RootKeyMetas(ws)
+	if err != nil {
+		return nil, 0, err
 	}
-
+	index, err := store.Index(state.TableRootKeyMeta)
+	if err != nil {
+		return nil, 0, err
+	}
+	return iter, index, nil
 }
 
 // replicateKey replicates a single key from peer servers that was present in
diff --git a/nomad/encrypter_test.go b/nomad/encrypter_test.go
index 43766d8c4b5b..b16e4eae67b6 100644
--- a/nomad/encrypter_test.go
+++ b/nomad/encrypter_test.go
@@ -1,6 +1,7 @@
 package nomad
 
 import (
+	"bytes"
 	"context"
 	"os"
 	"path/filepath"
@@ -8,6 +9,7 @@ import (
 	"time"
 
 	msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
+	"github.com/shoenig/test/must"
 	"github.com/stretchr/testify/require"
 
 	"github.com/hashicorp/nomad/ci"
@@ -300,6 +302,32 @@ func TestEncrypter_KeyringReplication(t *testing.T) {
 		time.Second*5, time.Second,
 		"expected new servers to get replicated keys")
 
+	// Scenario: reload a snapshot
+
+	t.Logf("taking snapshot of node5")
+
+	snapshot, err := srv5.fsm.Snapshot()
+	must.NoError(t, err)
+
+	defer snapshot.Release()
+
+	// Persist so we can read it back
+	buf := bytes.NewBuffer(nil)
+	sink := &MockSink{buf, false}
+	must.NoError(t, snapshot.Persist(sink))
+
+	must.NoError(t, srv5.fsm.Restore(sink))
+
+	// rotate the key
+
+	err = msgpackrpc.CallWithCodec(codec, "Keyring.Rotate", rotateReq, &rotateResp)
+	require.NoError(t, err)
+	keyID4 := rotateResp.Key.KeyID
+
+	require.Eventually(t, checkReplicationFn(keyID4),
+		time.Second*5, time.Second,
+		"expected new servers to get replicated keys after snapshot restore")
+
 }
 
 func TestEncrypter_EncryptDecrypt(t *testing.T) {
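A note on the pattern patch 1 adopts: a blocking query runs a read and, if nothing has changed since the index the caller last saw, parks on a watch set instead of returning. The sketch below is a minimal, self-contained illustration of that shape rather than Nomad's implementation; the store type, queryFn alias, and blockingQuery method are stand-ins, and only the getRootKeyMetas signature comes from the patch above.

```go
package sketch

import (
	"context"

	memdb "github.com/hashicorp/go-memdb"
)

// store stands in for Nomad's *state.StateStore; only what the pattern
// needs is modeled here.
type store struct{ db *memdb.MemDB }

// queryFn mirrors the shape of getRootKeyMetas above: fill a watch set
// and return a result along with the index it was read at.
type queryFn func(memdb.WatchSet, *store) (interface{}, uint64, error)

// blockingQuery re-runs fn until its result index moves past minIndex,
// parking on the watch set in between so the caller isn't busy-polling.
func (s *store) blockingQuery(ctx context.Context, fn queryFn, minIndex uint64) (interface{}, uint64, error) {
	for {
		ws := memdb.NewWatchSet()
		resp, index, err := fn(ws, s)
		if err != nil {
			return nil, index, err
		}
		if index > minIndex {
			return resp, index, nil
		}
		// Park until something fn watched changes or ctx fires; this is
		// why the replication loop wraps its context with a one-second
		// timeout, so the query gets abandoned periodically.
		if err := ws.WatchCtx(ctx); err != nil {
			return nil, index, err
		}
	}
}
```

Seen this way, resetting minIndex to zero when the handle is swapped is load-bearing: it forces the next query to return immediately with the full key set from the restored snapshot instead of parking until a new write happens.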
From b3f8ff53dc9b9b1aae5996e8c99cc55ebfabdda4 Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Wed, 16 Nov 2022 12:20:53 -0500
Subject: [PATCH 2/3] move binding for AbandonCh

---
 nomad/encrypter.go | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/nomad/encrypter.go b/nomad/encrypter.go
index ecab8682e44e..7c87cfabda32 100644
--- a/nomad/encrypter.go
+++ b/nomad/encrypter.go
@@ -439,6 +439,7 @@ func (krr *KeyringReplicator) run(ctx context.Context) {
 	defer krr.logger.Debug("exiting key replication")
 
 	store := krr.srv.fsm.State()
+	abandonCh := store.AbandonCh()
 	minIndex := uint64(0)
 
 	for {
@@ -447,12 +448,15 @@
 			return
 		case <-ctx.Done():
 			return
-		case <-store.AbandonCh():
+		case <-abandonCh:
+			// reset the blocking query to make sure we get all the keys from a
+			// restored snapshot
 			store = krr.srv.fsm.State()
+			abandonCh = store.AbandonCh()
 			minIndex = uint64(0)
 		default:
-			// ensure that we can abandon the blocking query periodically and
-			// give our other contexts a chance to fire
+			// ensure that we abandon the blocking query periodically to give
+			// other contexts a chance to fire
 			queryCtx, queryCancel := context.WithTimeout(ctx, time.Second)
 			defer queryCancel()
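The one-line move in patch 2 is easier to see in isolation: the channel being selected on must belong to the same store handle being read, so the two should be rebound together. A minimal sketch, where fakeStore, fakeFSM, and readOnce are illustrative stand-ins rather than Nomad types:

```go
package sketch

import "context"

// fakeStore and fakeFSM stand in for Nomad's state.StateStore and
// raft FSM; only the handle/abandon-channel relationship is modeled.
type fakeStore struct{ abandonCh chan struct{} }

// AbandonCh returns a channel that is closed when this store handle
// is replaced, e.g. by a snapshot restore.
func (s *fakeStore) AbandonCh() <-chan struct{} { return s.abandonCh }

type fakeFSM struct{ current *fakeStore }

// State returns the live store handle, which changes after a restore.
func (f *fakeFSM) State() *fakeStore { return f.current }

func watchLoop(ctx context.Context, fsm *fakeFSM, readOnce func(*fakeStore)) {
	// Bind the handle and its abandon channel together, once per handle.
	store := fsm.State()
	abandonCh := store.AbandonCh()

	for {
		select {
		case <-ctx.Done():
			return
		case <-abandonCh:
			// The old handle is dead: re-acquire the live store and,
			// just as importantly, the abandon channel of the new handle.
			store = fsm.State()
			abandonCh = store.AbandonCh()
		default:
			readOnce(store)
		}
	}
}
```

In patch 1 the expression store.AbandonCh() was re-evaluated against the current store variable on each pass of the loop, which behaves the same; hoisting the binding makes the handle/channel pairing explicit and saves the repeated call.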
From 82fce0a282e3a5b79221c043f3ffedcf6c25282d Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Wed, 16 Nov 2022 13:44:45 -0500
Subject: [PATCH 3/3] back out blocking query in favor of limiter and simplify
 code by removing ERR_WAIT block

---
 nomad/encrypter.go | 51 ++++++++++------------------------------------
 1 file changed, 11 insertions(+), 40 deletions(-)

diff --git a/nomad/encrypter.go b/nomad/encrypter.go
index 7c87cfabda32..1f773bd95d64 100644
--- a/nomad/encrypter.go
+++ b/nomad/encrypter.go
@@ -18,11 +18,10 @@ import (
 	log "github.com/hashicorp/go-hclog"
 	kms "github.com/hashicorp/go-kms-wrapping/v2"
 	"github.com/hashicorp/go-kms-wrapping/v2/aead"
-	memdb "github.com/hashicorp/go-memdb"
+	"golang.org/x/time/rate"
 
 	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/helper/crypto"
-	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
@@ -432,15 +431,13 @@ func (krr *KeyringReplicator) stop() {
 	krr.stopFn()
 }
 
-const keyringReplicationRate = 10
+const keyringReplicationRate = 5
 
 func (krr *KeyringReplicator) run(ctx context.Context) {
 	krr.logger.Debug("starting encryption key replication")
 	defer krr.logger.Debug("exiting key replication")
 
-	store := krr.srv.fsm.State()
-	abandonCh := store.AbandonCh()
-	minIndex := uint64(0)
+	limiter := rate.NewLimiter(keyringReplicationRate, keyringReplicationRate)
 
 	for {
 		select {
@@ -448,30 +445,18 @@
 			return
 		case <-ctx.Done():
 			return
-		case <-abandonCh:
-			// reset the blocking query to make sure we get all the keys from a
-			// restored snapshot
-			store = krr.srv.fsm.State()
-			abandonCh = store.AbandonCh()
-			minIndex = uint64(0)
 		default:
-			// ensure that we abandon the blocking query periodically to give
-			// other contexts a chance to fire
-			queryCtx, queryCancel := context.WithTimeout(ctx, time.Second)
-			defer queryCancel()
-
-			var rawIter any
-			var err error
-			rawIter, minIndex, err = store.BlockingQuery(getRootKeyMetas, minIndex, queryCtx)
+			err := limiter.Wait(ctx)
 			if err != nil {
-				if queryCtx.Err() == nil {
-					// we get errors for closed context but don't want to log on that
-					krr.logger.Error("failed to fetch keyring", "error", err)
-				}
-				continue
+				continue // rate limit exceeded
 			}
-			iter := rawIter.(memdb.ResultIterator)
+			store := krr.srv.fsm.State()
+			iter, err := store.RootKeyMetas(nil)
+			if err != nil {
+				krr.logger.Error("failed to fetch keyring", "error", err)
+				continue
+			}
 			for {
 				raw := iter.Next()
 				if raw == nil {
@@ -498,20 +483,6 @@
 
 }
 
-// getRootKeyMetas implements state.QueryFn and is run as a blocking query to
-// detect new key metadata
-func getRootKeyMetas(ws memdb.WatchSet, store *state.StateStore) (interface{}, uint64, error) {
-	iter, err := store.RootKeyMetas(ws)
-	if err != nil {
-		return nil, 0, err
-	}
-	index, err := store.Index(state.TableRootKeyMeta)
-	if err != nil {
-		return nil, 0, err
-	}
-	return iter, index, nil
-}
-
 // replicateKey replicates a single key from peer servers that was present in
 // the state store but missing from the keyring. Returns an error only if no
 // peers have this key.
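With the blocking query backed out, the loop's pacing comes entirely from the token bucket in golang.org/x/time/rate. A minimal usage sketch of the limiter as the patch applies it, where runLoop and replicateOnce are illustrative stand-ins and only the constant's value comes from the patch:

```go
package sketch

import (
	"context"

	"golang.org/x/time/rate"
)

// keyringReplicationRate matches the constant in the patch: five
// passes per second, with an equal burst.
const keyringReplicationRate = 5

func runLoop(ctx context.Context, replicateOnce func(context.Context) error) {
	limiter := rate.NewLimiter(keyringReplicationRate, keyringReplicationRate)
	for {
		// Wait blocks until a token is available and errors only when
		// ctx is cancelled (or the request can never be satisfied), so
		// the loop winds down naturally on shutdown.
		if err := limiter.Wait(ctx); err != nil {
			return
		}
		if err := replicateOnce(ctx); err != nil {
			continue // the real loop logs and retries on the next token
		}
	}
}
```

One intentional difference from the patch: on a Wait error the patch continues and relies on the outer select's ctx.Done() case to exit, while this sketch returns directly. And because the loop now fetches krr.srv.fsm.State() fresh on every pass, a restored snapshot is picked up on the next token without any abandon-channel plumbing.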