Skip to content

Commit

Permalink
keyring: fixes for keyring replication on cluster join (#14987)
Browse files Browse the repository at this point in the history
* keyring: don't unblock early if rate limit burst exceeded

The rate limiter returns an error and unblocks early if its burst limit is
exceeded (unless the burst limit is Inf). Ensure we're not unblocking early,
otherwise we'll only slow down the cases where we're already pausing to make
external RPC requests.

* keyring: set MinQueryIndex on stale queries

When keyring replication makes a stale query to non-leader peers to find a key
the leader doesn't have, we need to make sure the peer we're querying has had a
chance to catch up to the most current index for that key. Otherwise it's
possible for newly-added servers to query another newly-added server and get a
non-error nil response for that key ID.

Ensure that we're setting the correct reply index in the blocking query.

Note that the "not found" case does not return an error, just an empty key. So
as a belt-and-suspenders, update the handling of empty responses so that we
don't break the loop early if we hit a server that doesn't have the key.

* test for adding new servers to keyring

* leader: initialize keyring after we have consistent reads

Wait until we're sure the FSM is current before we try to initialize the
keyring.

Also, if a key is rotated immediately following a leader election, plans that
are in-flight may get signed before the new leader has the key. Allow for a
short timeout-and-retry to avoid rejecting plans
  • Loading branch information
tgross committed Oct 21, 2022
1 parent 5ed7404 commit 5732eb2
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 14 deletions.
39 changes: 32 additions & 7 deletions nomad/encrypter.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,33 @@ const keyIDHeader = "kid"
// SignClaims signs the identity claim for the task and returns an
// encoded JWT with both the claim and its signature
func (e *Encrypter) SignClaims(claim *structs.IdentityClaims) (string, error) {
e.lock.RLock()
defer e.lock.RUnlock()

keyset, err := e.activeKeySetLocked()
getActiveKeyset := func() (*keyset, error) {
e.lock.RLock()
defer e.lock.RUnlock()
keyset, err := e.activeKeySetLocked()
return keyset, err
}

// If a key is rotated immediately following a leader election, plans that
// are in-flight may get signed before the new leader has the key. Allow for
// a short timeout-and-retry to avoid rejecting plans
keyset, err := getActiveKeyset()
if err != nil {
return "", err
ctx, cancel := context.WithTimeout(e.srv.shutdownCtx, 5*time.Second)
defer cancel()
for {
select {
case <-ctx.Done():
return "", err
default:
time.Sleep(50 * time.Millisecond)
keyset, err = getActiveKeyset()
if keyset != nil {
break
}
}
}
}

token := jwt.NewWithClaims(&jwt.SigningMethodEd25519{}, claim)
Expand Down Expand Up @@ -435,7 +456,10 @@ START:
return
default:
// Rate limit how often we attempt replication
limiter.Wait(ctx)
err := limiter.Wait(ctx)
if err != nil {
goto ERR_WAIT // rate limit exceeded
}

ws := store.NewWatchSet()
iter, err := store.RootKeyMetas(ws)
Expand All @@ -461,7 +485,8 @@ START:
getReq := &structs.KeyringGetRootKeyRequest{
KeyID: keyID,
QueryOptions: structs.QueryOptions{
Region: krr.srv.config.Region,
Region: krr.srv.config.Region,
MinQueryIndex: keyMeta.ModifyIndex - 1,
},
}
getResp := &structs.KeyringGetRootKeyResponse{}
Expand All @@ -479,7 +504,7 @@ START:
getReq.AllowStale = true
for _, peer := range krr.getAllPeers() {
err = krr.srv.forwardServer(peer, "Keyring.Get", getReq, getResp)
if err == nil {
if err == nil && getResp.Key != nil {
break
}
}
Expand Down
29 changes: 27 additions & 2 deletions nomad/encrypter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ func TestEncrypter_Restore(t *testing.T) {
}
}

// TestKeyringReplicator exercises key replication between servers
func TestKeyringReplicator(t *testing.T) {
// TestEncrypter_KeyringReplication exercises key replication between servers
func TestEncrypter_KeyringReplication(t *testing.T) {

ci.Parallel(t)

Expand Down Expand Up @@ -267,6 +267,31 @@ func TestKeyringReplicator(t *testing.T) {
require.Eventually(t, checkReplicationFn(keyID3),
time.Second*5, time.Second,
"expected keys to be replicated to followers after election")

// Scenario: new members join the cluster

srv4, cleanupSRV4 := TestServer(t, func(c *Config) {
c.BootstrapExpect = 0
c.NumSchedulers = 0
})
defer cleanupSRV4()
srv5, cleanupSRV5 := TestServer(t, func(c *Config) {
c.BootstrapExpect = 0
c.NumSchedulers = 0
})
defer cleanupSRV5()

TestJoin(t, srv4, srv5)
TestJoin(t, srv5, srv1)
servers = []*Server{srv1, srv2, srv3, srv4, srv5}

testutil.WaitForLeader(t, srv4.RPC)
testutil.WaitForLeader(t, srv5.RPC)

require.Eventually(t, checkReplicationFn(keyID3),
time.Second*5, time.Second,
"expected new servers to get replicated keys")

}

func TestEncrypter_EncryptDecrypt(t *testing.T) {
Expand Down
15 changes: 14 additions & 1 deletion nomad/keyring_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,20 @@ func (k *Keyring) Get(args *structs.KeyringGetRootKeyRequest, reply *structs.Key
Key: key,
}
reply.Key = rootKey
reply.Index = keyMeta.ModifyIndex

// Use the last index that affected the policy table
index, err := s.Index(state.TableRootKeyMeta)
if err != nil {
return err
}

// Ensure we never set the index to zero, otherwise a blocking query
// cannot be used. We floor the index at one, since realistically
// the first write must have a higher index.
if index == 0 {
index = 1
}
reply.Index = index
return nil
},
}
Expand Down
8 changes: 4 additions & 4 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,6 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
// Initialize scheduler configuration.
schedulerConfig := s.getOrCreateSchedulerConfig()

// Create the first root key if it doesn't already exist
go s.initializeKeyring(stopCh)

// Initialize the ClusterID
_, _ = s.ClusterID()
// todo: use cluster ID for stuff, later!
Expand Down Expand Up @@ -350,6 +347,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {

// Further clean ups and follow up that don't block RPC consistency

// Create the first root key if it doesn't already exist
go s.initializeKeyring(stopCh)

// Restore the periodic dispatcher state
if err := s.restorePeriodicDispatcher(); err != nil {
return err
Expand Down Expand Up @@ -2005,7 +2005,7 @@ func (s *Server) initializeKeyring(stopCh <-chan struct{}) {
break
}
}
// we might have lost leadershuip during the version check
// we might have lost leadership during the version check
if !s.IsLeader() {
return
}
Expand Down

0 comments on commit 5732eb2

Please sign in to comment.