diff --git a/nomad/encrypter.go b/nomad/encrypter.go index e6e05bcce79e..f13dc84be537 100644 --- a/nomad/encrypter.go +++ b/nomad/encrypter.go @@ -155,12 +155,33 @@ const keyIDHeader = "kid" // SignClaims signs the identity claim for the task and returns an // encoded JWT with both the claim and its signature func (e *Encrypter) SignClaims(claim *structs.IdentityClaims) (string, error) { - e.lock.RLock() - defer e.lock.RUnlock() - keyset, err := e.activeKeySetLocked() + getActiveKeyset := func() (*keyset, error) { + e.lock.RLock() + defer e.lock.RUnlock() + keyset, err := e.activeKeySetLocked() + return keyset, err + } + + // If a key is rotated immediately following a leader election, plans that + // are in-flight may get signed before the new leader has the key. Allow for + // a short timeout-and-retry to avoid rejecting plans + keyset, err := getActiveKeyset() if err != nil { - return "", err + ctx, cancel := context.WithTimeout(e.srv.shutdownCtx, 5*time.Second) + defer cancel() + for { + select { + case <-ctx.Done(): + return "", err + default: + time.Sleep(50 * time.Millisecond) + keyset, err = getActiveKeyset() + if keyset != nil { + break + } + } + } } token := jwt.NewWithClaims(&jwt.SigningMethodEd25519{}, claim) @@ -435,7 +456,10 @@ START: return default: // Rate limit how often we attempt replication - limiter.Wait(ctx) + err := limiter.Wait(ctx) + if err != nil { + goto ERR_WAIT // rate limit exceeded + } ws := store.NewWatchSet() iter, err := store.RootKeyMetas(ws) @@ -461,7 +485,8 @@ START: getReq := &structs.KeyringGetRootKeyRequest{ KeyID: keyID, QueryOptions: structs.QueryOptions{ - Region: krr.srv.config.Region, + Region: krr.srv.config.Region, + MinQueryIndex: keyMeta.ModifyIndex - 1, }, } getResp := &structs.KeyringGetRootKeyResponse{} @@ -479,7 +504,7 @@ START: getReq.AllowStale = true for _, peer := range krr.getAllPeers() { err = krr.srv.forwardServer(peer, "Keyring.Get", getReq, getResp) - if err == nil { + if err == nil && getResp.Key != nil 
{ break } } diff --git a/nomad/encrypter_test.go b/nomad/encrypter_test.go index b006b503d1cc..98d71758600c 100644 --- a/nomad/encrypter_test.go +++ b/nomad/encrypter_test.go @@ -123,8 +123,8 @@ func TestEncrypter_Restore(t *testing.T) { } } -// TestKeyringReplicator exercises key replication between servers -func TestKeyringReplicator(t *testing.T) { +// TestEncrypter_KeyringReplication exercises key replication between servers +func TestEncrypter_KeyringReplication(t *testing.T) { ci.Parallel(t) @@ -267,6 +267,31 @@ func TestKeyringReplicator(t *testing.T) { require.Eventually(t, checkReplicationFn(keyID3), time.Second*5, time.Second, "expected keys to be replicated to followers after election") + + // Scenario: new members join the cluster + + srv4, cleanupSRV4 := TestServer(t, func(c *Config) { + c.BootstrapExpect = 0 + c.NumSchedulers = 0 + }) + defer cleanupSRV4() + srv5, cleanupSRV5 := TestServer(t, func(c *Config) { + c.BootstrapExpect = 0 + c.NumSchedulers = 0 + }) + defer cleanupSRV5() + + TestJoin(t, srv4, srv5) + TestJoin(t, srv5, srv1) + servers = []*Server{srv1, srv2, srv3, srv4, srv5} + + testutil.WaitForLeader(t, srv4.RPC) + testutil.WaitForLeader(t, srv5.RPC) + + require.Eventually(t, checkReplicationFn(keyID3), + time.Second*5, time.Second, + "expected new servers to get replicated keys") + } func TestEncrypter_EncryptDecrypt(t *testing.T) { diff --git a/nomad/keyring_endpoint.go b/nomad/keyring_endpoint.go index 78d4808ce298..9b7e27ad90f7 100644 --- a/nomad/keyring_endpoint.go +++ b/nomad/keyring_endpoint.go @@ -264,7 +264,20 @@ func (k *Keyring) Get(args *structs.KeyringGetRootKeyRequest, reply *structs.Key Key: key, } reply.Key = rootKey - reply.Index = keyMeta.ModifyIndex + + // Use the last index that affected the root key meta table + index, err := s.Index(state.TableRootKeyMeta) + if err != nil { + return err + } + + // Ensure we never set the index to zero, otherwise a blocking query + // cannot be used. 
We floor the index at one, since realistically + // the first write must have a higher index. + if index == 0 { + index = 1 + } + reply.Index = index return nil }, } diff --git a/nomad/leader.go b/nomad/leader.go index 141b1df1035e..b6b2895fae67 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -303,9 +303,6 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Initialize scheduler configuration. schedulerConfig := s.getOrCreateSchedulerConfig() - // Create the first root key if it doesn't already exist - go s.initializeKeyring(stopCh) - // Initialize the ClusterID _, _ = s.ClusterID() // todo: use cluster ID for stuff, later! @@ -350,6 +347,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Further clean ups and follow up that don't block RPC consistency + // Create the first root key if it doesn't already exist + go s.initializeKeyring(stopCh) + // Restore the periodic dispatcher state if err := s.restorePeriodicDispatcher(); err != nil { return err @@ -2005,7 +2005,7 @@ func (s *Server) initializeKeyring(stopCh <-chan struct{}) { break } } - // we might have lost leadershuip during the version check + // we might have lost leadership during the version check if !s.IsLeader() { return }