Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read latest configuration independently from main loop #379

Merged
merged 3 commits into from
Jan 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 18 additions & 19 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"strconv"
"sync"
"sync/atomic"
"time"

metrics "github.com/armon/go-metrics"
Expand Down Expand Up @@ -138,6 +139,10 @@ type Raft struct {
// the log/snapshot.
configurations configurations

// Holds a copy of the latest configuration which can be read
// independently from main loop.
latestConfiguration atomic.Value
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the record.. I looked it up and atomic.Value is itself a struct which means even as a non-pointer field like this it will be aligned to 64 bit boundaries and the atomically accessed pointer within it won't have alignment issues.


// RPC chan comes from the transport layer
rpcCh <-chan RPC

Expand Down Expand Up @@ -603,18 +608,17 @@ func (r *Raft) restoreSnapshot() error {
r.setLastSnapshot(snapshot.Index, snapshot.Term)

// Update the configuration
var conf Configuration
var index uint64
if snapshot.Version > 0 {
r.configurations.committed = snapshot.Configuration
r.configurations.committedIndex = snapshot.ConfigurationIndex
r.configurations.latest = snapshot.Configuration
r.configurations.latestIndex = snapshot.ConfigurationIndex
conf = snapshot.Configuration
index = snapshot.ConfigurationIndex
} else {
configuration := decodePeers(snapshot.Peers, r.trans)
r.configurations.committed = configuration
r.configurations.committedIndex = snapshot.Index
r.configurations.latest = configuration
r.configurations.latestIndex = snapshot.Index
conf = decodePeers(snapshot.Peers, r.trans)
index = snapshot.Index
}
r.setCommittedConfiguration(conf, index)
r.setLatestConfiguration(conf, index)

// Success!
return nil
Expand Down Expand Up @@ -746,19 +750,14 @@ func (r *Raft) VerifyLeader() Future {
}
}

// GetConfiguration returns the latest configuration and its associated index
// currently in use. This may not yet be committed. This must not be called on
// the main thread (which can access the information directly).
// GetConfiguration returns the latest configuration. This may not yet be
// committed. The main loop can access this directly.
func (r *Raft) GetConfiguration() ConfigurationFuture {
configReq := &configurationsFuture{}
configReq.init()
select {
case <-r.shutdownCh:
configReq.respond(ErrRaftShutdown)
return configReq
case r.configurationsCh <- configReq:
return configReq
}
configReq.configurations = configurations{latest: r.getLatestConfiguration()}
configReq.respond(nil)
return configReq
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this make case future := <-r.configurationsCh: in leader/follower loop dead code now? Should we remove that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are still using it in our tests and adding an alternative would be more work. I decided to leave it as is so that this PR doesn't get out of hand. But it is essentially dead code now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory configurations could be an atomic value, and we only ever load and store. That way I could get rid of the channel and fix the tests. But that would be quite some places to change.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 hmm. Making part of the internal workings atomic and accessing it directly in tests could introduce other issues though. Atomics prevent data races but not logical ones. I think the usage here is fine as we are limiting to only a single writer and wrapping concurrent readers so they don't have direct access but making internal state atomic and then tweaking it from tests could get brittle fast if we ever update or make assumptions about whether then atomic value.

Does leaving tests using a different code path to real callers cause issues? If not I'm inclined to leave it to.

Oh we also still use it in takeSnapshot. We also rely in those places on being able to get both the latest and the committed configs which means the GetConfiguration can't do the same job.

So I say it's OK to leave it for now as it is as those call sites need to lower level and more accurate answer, this just optimises for external callers.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}

// AddPeer (deprecated) is used to add a new peer into the cluster. This must be
Expand Down
57 changes: 37 additions & 20 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,8 +623,7 @@ func (r *Raft) leaderLoop() {
// value.
if r.configurations.latestIndex > oldCommitIndex &&
r.configurations.latestIndex <= commitIndex {
r.configurations.committed = r.configurations.latest
r.configurations.committedIndex = r.configurations.latestIndex
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
if !hasVote(r.configurations.committed, r.localID) {
stepDown = true
}
Expand Down Expand Up @@ -1043,8 +1042,7 @@ func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) {

r.dispatchLogs([]*logFuture{&future.logFuture})
index := future.Index()
r.configurations.latest = configuration
r.configurations.latestIndex = index
r.setLatestConfiguration(configuration, index)
r.leaderState.commitment.setConfiguration(configuration)
r.startStopReplication()
}
Expand Down Expand Up @@ -1329,8 +1327,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
return
}
if entry.Index <= r.configurations.latestIndex {
r.configurations.latest = r.configurations.committed
r.configurations.latestIndex = r.configurations.committedIndex
r.setLatestConfiguration(r.configurations.committed, r.configurations.committedIndex)
}
newEntries = a.Entries[i:]
break
Expand Down Expand Up @@ -1365,8 +1362,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
idx := min(a.LeaderCommitIndex, r.getLastIndex())
r.setCommitIndex(idx)
if r.configurations.latestIndex <= idx {
r.configurations.committed = r.configurations.latest
r.configurations.committedIndex = r.configurations.latestIndex
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
}
r.processLogs(idx, nil)
metrics.MeasureSince([]string{"raft", "rpc", "appendEntries", "processLogs"}, start)
Expand All @@ -1383,15 +1379,11 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
// called from the main thread, or from NewRaft() before any threads have begun.
func (r *Raft) processConfigurationLogEntry(entry *Log) {
if entry.Type == LogConfiguration {
r.configurations.committed = r.configurations.latest
r.configurations.committedIndex = r.configurations.latestIndex
r.configurations.latest = DecodeConfiguration(entry.Data)
r.configurations.latestIndex = entry.Index
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
r.setLatestConfiguration(DecodeConfiguration(entry.Data), entry.Index)
} else if entry.Type == LogAddPeerDeprecated || entry.Type == LogRemovePeerDeprecated {
r.configurations.committed = r.configurations.latest
r.configurations.committedIndex = r.configurations.latestIndex
r.configurations.latest = decodePeers(entry.Data, r.trans)
r.configurations.latestIndex = entry.Index
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
r.setLatestConfiguration(decodePeers(entry.Data, r.trans), entry.Index)
}
}

Expand Down Expand Up @@ -1606,10 +1598,8 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
r.setLastSnapshot(req.LastLogIndex, req.LastLogTerm)

// Restore the peer set
r.configurations.latest = reqConfiguration
r.configurations.latestIndex = reqConfigurationIndex
r.configurations.committed = reqConfiguration
r.configurations.committedIndex = reqConfigurationIndex
r.setLatestConfiguration(reqConfiguration, reqConfigurationIndex)
r.setCommittedConfiguration(reqConfiguration, reqConfigurationIndex)

// Compact logs, continue even if this fails
if err := r.compactLogs(req.LastLogIndex); err != nil {
Expand Down Expand Up @@ -1796,3 +1786,30 @@ func (r *Raft) timeoutNow(rpc RPC, req *TimeoutNowRequest) {
r.candidateFromLeadershipTransfer = true
rpc.Respond(&TimeoutNowResponse{}, nil)
}

// setLatestConfiguration stores the latest configuration and updates a copy of it.
func (r *Raft) setLatestConfiguration(c Configuration, i uint64) {
r.configurations.latest = c
r.configurations.latestIndex = i
r.latestConfiguration.Store(c.Clone())
}

// setCommittedConfiguration stores the committed configuration.
func (r *Raft) setCommittedConfiguration(c Configuration, i uint64) {
r.configurations.committed = c
r.configurations.committedIndex = i
}

// getLatestConfiguration reads the configuration from a copy of the main
// configuration, which means it can be accessed independently from the main
// loop.
func (r *Raft) getLatestConfiguration() Configuration {
// this switch catches the case where this is called without having set
// a configuration previously.
switch c := r.latestConfiguration.Load().(type) {
case Configuration:
return c
default:
return Configuration{}
}
}