diff --git a/nomad/leader.go b/nomad/leader.go index 0b08fc60cc9d..e3e55d49f935 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -289,6 +289,8 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { return err } + s.setConsistentReadReady() + return nil } @@ -714,6 +716,8 @@ func (s *Server) iterateJobSummaryMetrics(summary *structs.JobSummary) { func (s *Server) revokeLeadership() error { defer metrics.MeasureSince([]string{"nomad", "leader", "revoke_leadership"}, time.Now()) + s.resetConsistentReadReady() + // Clear the leader token since we are no longer the leader. s.setLeaderAcl("") diff --git a/nomad/leader_test.go b/nomad/leader_test.go index 7cb3a0ed2bf1..19ad39df88e2 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -1126,6 +1126,25 @@ func TestLeader_RevokeLeadership_MultipleTimes(t *testing.T) { require.Nil(t, s1.revokeLeadership()) } +func TestLeader_TransitionsUpdateConsistencyRead(t *testing.T) { + s1 := TestServer(t, nil) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + testutil.WaitForResult(func() (bool, error) { + return s1.isReadyForConsistentReads(), nil + }, func(err error) { + require.Fail(t, "should have finished establish leader loop") + }) + + require.Nil(t, s1.revokeLeadership()) + require.False(t, s1.isReadyForConsistentReads()) + + ch := make(chan struct{}) + require.Nil(t, s1.establishLeadership(ch)) + require.True(t, s1.isReadyForConsistentReads()) +} + // Test doing an inplace upgrade on a server from raft protocol 2 to 3 // This verifies that removing the server and adding it back with a uuid works // even if the server's address stays the same. 
diff --git a/nomad/rpc.go b/nomad/rpc.go index ed662762a94e..e6ea18fcb2d6 100644 --- a/nomad/rpc.go +++ b/nomad/rpc.go @@ -432,7 +432,7 @@ CHECK_LEADER: isLeader, remoteServer := r.getLeader() // Handle the case we are the leader - if isLeader { + if isLeader && r.Server.isReadyForConsistentReads() { return false, nil } @@ -457,7 +457,11 @@ CHECK_LEADER: } } - // No leader found and hold time exceeded + // hold time exceeded without being ready to respond + if isLeader { + return true, structs.ErrNotReadyForConsistentReads + } + return true, structs.ErrNoLeader } diff --git a/nomad/rpc_test.go b/nomad/rpc_test.go index a9413e525b74..b97667063209 100644 --- a/nomad/rpc_test.go +++ b/nomad/rpc_test.go @@ -74,6 +74,47 @@ func TestRPC_forwardLeader(t *testing.T) { } } +func TestRPC_WaitForConsistentReads(t *testing.T) { + t.Parallel() + s1 := TestServer(t, func(c *Config) { + c.RPCHoldTimeout = 20 * time.Millisecond + }) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + isLeader, _ := s1.getLeader() + require.True(t, isLeader) + require.True(t, s1.isReadyForConsistentReads()) + + s1.resetConsistentReadReady() + require.False(t, s1.isReadyForConsistentReads()) + + codec := rpcClient(t, s1) + + get := &structs.JobListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: "default", + }, + } + + // check timeout while waiting for consistency + var resp structs.JobListResponse + err := msgpackrpc.CallWithCodec(codec, "Job.List", get, &resp) + require.Error(t, err) + require.Contains(t, err.Error(), structs.ErrNotReadyForConsistentReads.Error()) + + // check we wait and block + go func() { + time.Sleep(5 * time.Millisecond) + s1.setConsistentReadReady() + }() + + err = msgpackrpc.CallWithCodec(codec, "Job.List", get, &resp) + require.NoError(t, err) + +} + func TestRPC_forwardRegion(t *testing.T) { t.Parallel() s1 := TestServer(t, nil) diff --git a/nomad/server.go b/nomad/server.go index 2df2dca6a2a8..ecf9d0533417 100644 --- 
a/nomad/server.go +++ b/nomad/server.go @@ -167,6 +167,9 @@ type Server struct { // join/leave from the region. reconcileCh chan serf.Member + // readyForConsistentReads is used to track when the server is ready to serve consistent reads, updated atomically + readyForConsistentReads int32 + // eventCh is used to receive events from the serf cluster eventCh chan serf.Event @@ -1400,6 +1403,21 @@ func (s *Server) getLeaderAcl() string { return s.leaderAcl } +// setConsistentReadReady atomically sets the readiness state flag when leadership is obtained, to indicate that the server is past its barrier write +func (s *Server) setConsistentReadReady() { + atomic.StoreInt32(&s.readyForConsistentReads, 1) +} + +// resetConsistentReadReady atomically resets the readiness state flag on leadership revoke +func (s *Server) resetConsistentReadReady() { + atomic.StoreInt32(&s.readyForConsistentReads, 0) +} + +// isReadyForConsistentReads returns true if this server is ready to serve consistent reads +func (s *Server) isReadyForConsistentReads() bool { + return atomic.LoadInt32(&s.readyForConsistentReads) == 1 +} + // Regions returns the known regions in the cluster. 
func (s *Server) Regions() []string { s.peerLock.RLock() diff --git a/nomad/structs/errors.go b/nomad/structs/errors.go index f1646125048f..5b9166d3fa3e 100644 --- a/nomad/structs/errors.go +++ b/nomad/structs/errors.go @@ -7,14 +7,15 @@ import ( ) const ( - errNoLeader = "No cluster leader" - errNoRegionPath = "No path to region" - errTokenNotFound = "ACL token not found" - errPermissionDenied = "Permission denied" - errNoNodeConn = "No path to node" - errUnknownMethod = "Unknown rpc method" - errUnknownNomadVersion = "Unable to determine Nomad version" - errNodeLacksRpc = "Node does not support RPC; requires 0.8 or later" + errNoLeader = "No cluster leader" + errNotReadyForConsistentReads = "Not ready to serve consistent reads" + errNoRegionPath = "No path to region" + errTokenNotFound = "ACL token not found" + errPermissionDenied = "Permission denied" + errNoNodeConn = "No path to node" + errUnknownMethod = "Unknown rpc method" + errUnknownNomadVersion = "Unable to determine Nomad version" + errNodeLacksRpc = "Node does not support RPC; requires 0.8 or later" // Prefix based errors that are used to check if the error is of a given // type. These errors should be created with the associated constructor. 
@@ -26,14 +27,15 @@ const ( ) var ( - ErrNoLeader = errors.New(errNoLeader) - ErrNoRegionPath = errors.New(errNoRegionPath) - ErrTokenNotFound = errors.New(errTokenNotFound) - ErrPermissionDenied = errors.New(errPermissionDenied) - ErrNoNodeConn = errors.New(errNoNodeConn) - ErrUnknownMethod = errors.New(errUnknownMethod) - ErrUnknownNomadVersion = errors.New(errUnknownNomadVersion) - ErrNodeLacksRpc = errors.New(errNodeLacksRpc) + ErrNoLeader = errors.New(errNoLeader) + ErrNotReadyForConsistentReads = errors.New(errNotReadyForConsistentReads) + ErrNoRegionPath = errors.New(errNoRegionPath) + ErrTokenNotFound = errors.New(errTokenNotFound) + ErrPermissionDenied = errors.New(errPermissionDenied) + ErrNoNodeConn = errors.New(errNoNodeConn) + ErrUnknownMethod = errors.New(errUnknownMethod) + ErrUnknownNomadVersion = errors.New(errUnknownNomadVersion) + ErrNodeLacksRpc = errors.New(errNodeLacksRpc) ) // IsErrNoLeader returns whether the error is due to there being no leader.