From 45f73524bba77d70db380df5e3e6989f8dc31610 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Fri, 21 Oct 2022 10:04:39 -0400 Subject: [PATCH] leader: initialize keyring after we have consistent reads Wait until we're sure the FSM is current before we try to initialize the keyring. Also, if a key is rotated immediately following a leader election, plans that are in-flight may get signed before the new leader has the key. Allow for a short timeout-and-retry to avoid rejecting plans --- nomad/encrypter.go | 29 +++++++++++++++++++++++++---- nomad/leader.go | 8 ++++---- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/nomad/encrypter.go b/nomad/encrypter.go index 9b6a13864b87..f13dc84be537 100644 --- a/nomad/encrypter.go +++ b/nomad/encrypter.go @@ -155,12 +155,33 @@ const keyIDHeader = "kid" // SignClaims signs the identity claim for the task and returns an // encoded JWT with both the claim and its signature func (e *Encrypter) SignClaims(claim *structs.IdentityClaims) (string, error) { - e.lock.RLock() - defer e.lock.RUnlock() - keyset, err := e.activeKeySetLocked() + getActiveKeyset := func() (*keyset, error) { + e.lock.RLock() + defer e.lock.RUnlock() + keyset, err := e.activeKeySetLocked() + return keyset, err + } + + // If a key is rotated immediately following a leader election, plans that + // are in-flight may get signed before the new leader has the key. Allow for + // a short timeout-and-retry to avoid rejecting plans + keyset, err := getActiveKeyset() if err != nil { - return "", err + ctx, cancel := context.WithTimeout(e.srv.shutdownCtx, 5*time.Second) + defer cancel() + for { + select { + case <-ctx.Done(): + return "", err + default: + time.Sleep(50 * time.Millisecond) + keyset, err = getActiveKeyset() + if keyset != nil { + break + } + } + } } token := jwt.NewWithClaims(&jwt.SigningMethodEd25519{}, claim) diff --git a/nomad/leader.go b/nomad/leader.go index 141b1df1035e..b6b2895fae67 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -303,9 +303,6 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Initialize scheduler configuration. schedulerConfig := s.getOrCreateSchedulerConfig() - // Create the first root key if it doesn't already exist - go s.initializeKeyring(stopCh) - // Initialize the ClusterID _, _ = s.ClusterID() // todo: use cluster ID for stuff, later! @@ -350,6 +347,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Further clean ups and follow up that don't block RPC consistency + // Create the first root key if it doesn't already exist + go s.initializeKeyring(stopCh) + // Restore the periodic dispatcher state if err := s.restorePeriodicDispatcher(); err != nil { return err @@ -2005,7 +2005,7 @@ func (s *Server) initializeKeyring(stopCh <-chan struct{}) { break } } - // we might have lost leadershuip during the version check + // we might have lost leadership during the version check if !s.IsLeader() { return }