Skip to content

Commit

Permalink
Merge pull request #5911 from hashicorp/b-rpc-consistent-reads
Browse files Browse the repository at this point in the history
Block rpc handling until state store is caught up
  • Loading branch information
Mahmood Ali authored Aug 20, 2019
2 parents 6fee34f + 3fb788b commit 97705ed
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 18 deletions.
4 changes: 4 additions & 0 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
return err
}

s.setConsistentReadReady()

return nil
}

Expand Down Expand Up @@ -767,6 +769,8 @@ func (s *Server) iterateJobStatusMetrics(jobs *memdb.ResultIterator) {
func (s *Server) revokeLeadership() error {
defer metrics.MeasureSince([]string{"nomad", "leader", "revoke_leadership"}, time.Now())

s.resetConsistentReadReady()

// Clear the leader token since we are no longer the leader.
s.setLeaderAcl("")

Expand Down
19 changes: 19 additions & 0 deletions nomad/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,25 @@ func TestLeader_RevokeLeadership_MultipleTimes(t *testing.T) {
require.Nil(t, s1.revokeLeadership())
}

func TestLeader_TransitionsUpdateConsistencyRead(t *testing.T) {
s1 := TestServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

testutil.WaitForResult(func() (bool, error) {
return s1.isReadyForConsistentReads(), nil
}, func(err error) {
require.Fail(t, "should have finished establish leader loop")
})

require.Nil(t, s1.revokeLeadership())
require.False(t, s1.isReadyForConsistentReads())

ch := make(chan struct{})
require.Nil(t, s1.establishLeadership(ch))
require.True(t, s1.isReadyForConsistentReads())
}

// Test doing an inplace upgrade on a server from raft protocol 2 to 3
// This verifies that removing the server and adding it back with a uuid works
// even if the server's address stays the same.
Expand Down
8 changes: 6 additions & 2 deletions nomad/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ CHECK_LEADER:
isLeader, remoteServer := r.getLeader()

// Handle the case we are the leader
if isLeader {
if isLeader && r.Server.isReadyForConsistentReads() {
return false, nil
}

Expand All @@ -457,7 +457,11 @@ CHECK_LEADER:
}
}

// No leader found and hold time exceeded
// hold time exceeeded without being ready to respond
if isLeader {
return true, structs.ErrNotReadyForConsistentReads
}

return true, structs.ErrNoLeader
}

Expand Down
41 changes: 41 additions & 0 deletions nomad/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,47 @@ func TestRPC_forwardLeader(t *testing.T) {
}
}

func TestRPC_WaitForConsistentReads(t *testing.T) {
t.Parallel()
s1 := TestServer(t, func(c *Config) {
c.RPCHoldTimeout = 20 * time.Millisecond
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

isLeader, _ := s1.getLeader()
require.True(t, isLeader)
require.True(t, s1.isReadyForConsistentReads())

s1.resetConsistentReadReady()
require.False(t, s1.isReadyForConsistentReads())

codec := rpcClient(t, s1)

get := &structs.JobListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "default",
},
}

// check timeout while waiting for consistency
var resp structs.JobListResponse
err := msgpackrpc.CallWithCodec(codec, "Job.List", get, &resp)
require.Error(t, err)
require.Contains(t, err.Error(), structs.ErrNotReadyForConsistentReads.Error())

// check we wait and block
go func() {
time.Sleep(5 * time.Millisecond)
s1.setConsistentReadReady()
}()

err = msgpackrpc.CallWithCodec(codec, "Job.List", get, &resp)
require.NoError(t, err)

}

func TestRPC_forwardRegion(t *testing.T) {
t.Parallel()
s1 := TestServer(t, nil)
Expand Down
18 changes: 18 additions & 0 deletions nomad/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ type Server struct {
// join/leave from the region.
reconcileCh chan serf.Member

// used to track when the server is ready to serve consistent reads, updated atomically
readyForConsistentReads int32

// eventCh is used to receive events from the serf cluster
eventCh chan serf.Event

Expand Down Expand Up @@ -1400,6 +1403,21 @@ func (s *Server) getLeaderAcl() string {
return s.leaderAcl
}

// Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write
func (s *Server) setConsistentReadReady() {
atomic.StoreInt32(&s.readyForConsistentReads, 1)
}

// Atomically reset readiness state flag on leadership revoke
func (s *Server) resetConsistentReadReady() {
atomic.StoreInt32(&s.readyForConsistentReads, 0)
}

// Returns true if this server is ready to serve consistent reads
func (s *Server) isReadyForConsistentReads() bool {
return atomic.LoadInt32(&s.readyForConsistentReads) == 1
}

// Regions returns the known regions in the cluster.
func (s *Server) Regions() []string {
s.peerLock.RLock()
Expand Down
34 changes: 18 additions & 16 deletions nomad/structs/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import (
)

const (
errNoLeader = "No cluster leader"
errNoRegionPath = "No path to region"
errTokenNotFound = "ACL token not found"
errPermissionDenied = "Permission denied"
errNoNodeConn = "No path to node"
errUnknownMethod = "Unknown rpc method"
errUnknownNomadVersion = "Unable to determine Nomad version"
errNodeLacksRpc = "Node does not support RPC; requires 0.8 or later"
errNoLeader = "No cluster leader"
errNotReadyForConsistentReads = "Not ready to serve consistent reads"
errNoRegionPath = "No path to region"
errTokenNotFound = "ACL token not found"
errPermissionDenied = "Permission denied"
errNoNodeConn = "No path to node"
errUnknownMethod = "Unknown rpc method"
errUnknownNomadVersion = "Unable to determine Nomad version"
errNodeLacksRpc = "Node does not support RPC; requires 0.8 or later"

// Prefix based errors that are used to check if the error is of a given
// type. These errors should be created with the associated constructor.
Expand All @@ -26,14 +27,15 @@ const (
)

var (
ErrNoLeader = errors.New(errNoLeader)
ErrNoRegionPath = errors.New(errNoRegionPath)
ErrTokenNotFound = errors.New(errTokenNotFound)
ErrPermissionDenied = errors.New(errPermissionDenied)
ErrNoNodeConn = errors.New(errNoNodeConn)
ErrUnknownMethod = errors.New(errUnknownMethod)
ErrUnknownNomadVersion = errors.New(errUnknownNomadVersion)
ErrNodeLacksRpc = errors.New(errNodeLacksRpc)
ErrNoLeader = errors.New(errNoLeader)
ErrNotReadyForConsistentReads = errors.New(errNotReadyForConsistentReads)
ErrNoRegionPath = errors.New(errNoRegionPath)
ErrTokenNotFound = errors.New(errTokenNotFound)
ErrPermissionDenied = errors.New(errPermissionDenied)
ErrNoNodeConn = errors.New(errNoNodeConn)
ErrUnknownMethod = errors.New(errUnknownMethod)
ErrUnknownNomadVersion = errors.New(errUnknownNomadVersion)
ErrNodeLacksRpc = errors.New(errNodeLacksRpc)
)

// IsErrNoLeader returns whether the error is due to there being no leader.
Expand Down

0 comments on commit 97705ed

Please sign in to comment.