Skip to content

Commit

Permalink
keyring: update handle to state inside replication loop
Browse files Browse the repository at this point in the history
When keyring replication starts, we take a handle to the state store. But
whenever a snapshot is restored, this handle is invalidated and no longer points
to a state store that is receiving new keys. This leaks a bunch of memory too!

In addition to operator-initiated restores, when fresh servers are added to
existing clusters with large-enough state, keyring replication can start
quickly enough that it is already running before the snapshot from the existing
cluster has been restored.

Fix this by updating the handle to the state store whenever the store's abandon
channel is closed.
  • Loading branch information
tgross committed Nov 15, 2022
1 parent 9ad9029 commit 9a1c450
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 4 deletions.
3 changes: 3 additions & 0 deletions .changelog/15227.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
keyring: Fixed a bug where replication would stop after snapshot restores
```
10 changes: 6 additions & 4 deletions nomad/encrypter.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,31 +434,33 @@ func (krr *KeyringReplicator) stop() {
const keyringReplicationRate = 10

func (krr *KeyringReplicator) run(ctx context.Context) {
limiter := rate.NewLimiter(keyringReplicationRate, keyringReplicationRate)
krr.logger.Debug("starting encryption key replication")
defer krr.logger.Debug("exiting key replication")

limiter := rate.NewLimiter(keyringReplicationRate, keyringReplicationRate)
store := krr.srv.fsm.State()

retryErrTimer, stop := helper.NewSafeTimer(time.Second * 1)
defer stop()

START:
store := krr.srv.fsm.State()

for {
select {
case <-krr.srv.shutdownCtx.Done():
return
case <-ctx.Done():
return
case <-store.AbandonCh():
store = krr.srv.fsm.State()
default:
// Rate limit how often we attempt replication
err := limiter.Wait(ctx)
if err != nil {
goto ERR_WAIT // rate limit exceeded
}

ws := store.NewWatchSet()
iter, err := store.RootKeyMetas(ws)
iter, err := store.RootKeyMetas(nil)
if err != nil {
krr.logger.Error("failed to fetch keyring", "error", err)
goto ERR_WAIT
Expand Down
26 changes: 26 additions & 0 deletions nomad/encrypter_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package nomad

import (
"bytes"
"context"
"os"
"path/filepath"
"testing"
"time"

msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/ci"
Expand Down Expand Up @@ -300,6 +302,30 @@ func TestEncrypter_KeyringReplication(t *testing.T) {
time.Second*5, time.Second,
"expected new servers to get replicated keys")

// Scenario: reload a snapshot

snapshot, err := srv5.fsm.Snapshot()
must.NoError(t, err)

defer snapshot.Release()

// Persist so we can read it back
buf := bytes.NewBuffer(nil)
sink := &MockSink{buf, false}
must.NoError(t, snapshot.Persist(sink))

srv5.fsm.Restore(sink)

// rotate the key

err = msgpackrpc.CallWithCodec(codec, "Keyring.Rotate", rotateReq, &rotateResp)
require.NoError(t, err)
keyID4 := rotateResp.Key.KeyID

require.Eventually(t, checkReplicationFn(keyID4),
time.Second*5, time.Second,
"expected new servers to get replicated keys after snapshot restore")

}

func TestEncrypter_EncryptDecrypt(t *testing.T) {
Expand Down

0 comments on commit 9a1c450

Please sign in to comment.