Read latest configuration independently from main loop #379

Merged · 3 commits · Jan 8, 2020
37 changes: 18 additions & 19 deletions api.go
@@ -138,6 +138,11 @@ type Raft struct {
// the log/snapshot.
configurations configurations

// Holds a copy of the latest configuration which can be read
// independently from main loop.
latestConfiguration Configuration
Member

Should we be returning the latest configuration or the latest committed configuration? I may well be misremembering, but I feel like we shouldn't be using/returning the config until it's committed, right?

Member

nm. GetConfiguration docs state:

GetConfiguration returns the latest configuration. This may not yet be currently in use. This may not yet be committed...

So this preserves that behaviour, although frankly it seems strange to me that Consul or an application would "see" a config that is not the one actually in use and may never get committed by a quorum 🤔.

I guess that is an edge-case issue we can solve separately, though; this preserves the same behaviour while fixing the performance issue.

Member Author

I was wondering about the same thing, but wanted to preserve current behaviour.

latestConfigurationLock sync.RWMutex
Member
@banks banks Jan 7, 2020

I think this would probably work well for most use cases, but there is a risk that very heavy reads of GetConfiguration can starve the main loop on a config change - i.e. cause raft to block indefinitely.

For example, in Consul, if some script calling agent info on a server accidentally got into a hot loop and called it constantly, it could bring the whole of raft grinding to a halt. While that's hopefully not super likely, it feels like a pretty bad outcome.

I think in this case atomic.Value might be a better way to synchronize Load and Store of this copy. Then readers can read without blocking using latestConfiguration.Load(); high contention might slow the leader loop a little due to cross-processor contention on the cache line, but it can't starve it indefinitely.

What do you think?

Member Author

I agree that it would be better to use atomic and I will change it.
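A minimal, self-contained sketch of the atomic.Value approach being suggested — the types and field names here are illustrative stand-ins, not this repo's actual definitions:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Configuration stands in for raft's Configuration type.
type Configuration struct {
	Servers []string
}

// Clone returns a deep copy so readers never share the writer's backing slice.
func (c Configuration) Clone() Configuration {
	return Configuration{Servers: append([]string(nil), c.Servers...)}
}

// Raft holds the copy; only the main loop ever calls Store.
type Raft struct {
	latestConfiguration atomic.Value // always holds a Configuration
}

func (r *Raft) setLatestConfiguration(c Configuration) {
	r.latestConfiguration.Store(c.Clone())
}

// getLatestConfiguration is non-blocking for readers: a hot loop of
// callers can contend on the cache line but cannot block the single writer.
func (r *Raft) getLatestConfiguration() Configuration {
	v := r.latestConfiguration.Load()
	if v == nil {
		return Configuration{}
	}
	return v.(Configuration).Clone()
}

func main() {
	r := &Raft{}
	r.setLatestConfiguration(Configuration{Servers: []string{"s1", "s2", "s3"}})
	fmt.Println(r.getLatestConfiguration().Servers)
}
```

The key property is that Load never takes a lock, so unlike the RWMutex in this revision of the diff, even pathological read traffic cannot starve the main loop's Store.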


// RPC chan comes from the transport layer
rpcCh <-chan RPC

@@ -603,18 +608,17 @@ func (r *Raft) restoreSnapshot() error {
r.setLastSnapshot(snapshot.Index, snapshot.Term)

// Update the configuration
var conf Configuration
var index uint64
if snapshot.Version > 0 {
r.configurations.committed = snapshot.Configuration
r.configurations.committedIndex = snapshot.ConfigurationIndex
r.configurations.latest = snapshot.Configuration
r.configurations.latestIndex = snapshot.ConfigurationIndex
conf = snapshot.Configuration
index = snapshot.ConfigurationIndex
} else {
configuration := decodePeers(snapshot.Peers, r.trans)
r.configurations.committed = configuration
r.configurations.committedIndex = snapshot.Index
r.configurations.latest = configuration
r.configurations.latestIndex = snapshot.Index
conf = decodePeers(snapshot.Peers, r.trans)
index = snapshot.Index
}
r.setCommittedConfiguration(conf, index)
r.setLatestConfiguration(conf, index)

// Success!
return nil
@@ -746,19 +750,14 @@ func (r *Raft) VerifyLeader() Future {
}
}

// GetConfiguration returns the latest configuration and its associated index
// currently in use. This may not yet be committed. This must not be called on
// the main thread (which can access the information directly).
// GetConfiguration returns the latest configuration. This may not yet be
// committed. The main loop can access this directly.
func (r *Raft) GetConfiguration() ConfigurationFuture {
configReq := &configurationsFuture{}
configReq.init()
select {
case <-r.shutdownCh:
configReq.respond(ErrRaftShutdown)
return configReq
case r.configurationsCh <- configReq:
return configReq
}
configReq.configurations = configurations{latest: r.getLatestConfiguration()}
configReq.respond(nil)
return configReq
Member

Does this make case future := <-r.configurationsCh: in the leader/follower loops dead code now? Should we remove it?

Member Author

We are still using it in our tests and adding an alternative would be more work. I decided to leave it as is so that this PR doesn't get out of hand. But it is essentially dead code now.

Member Author

In theory configurations could be an atomic value, since we only ever load and store it. That way I could get rid of the channel and fix the tests. But that would mean changes in quite a few places.

Member

🤔 hmm. Making part of the internal workings atomic and accessing it directly in tests could introduce other issues, though. Atomics prevent data races but not logical ones. I think the usage here is fine, as we are limiting to only a single writer and wrapping concurrent readers so they don't have direct access, but making internal state atomic and then tweaking it from tests could get brittle fast if we ever update the atomic value or change our assumptions about it.

Does leaving tests using a different code path to real callers cause issues? If not, I'm inclined to leave it too.

Oh, we also still use it in takeSnapshot. In those places we also rely on being able to get both the latest and the committed configs, which means GetConfiguration can't do the same job.

So I say it's OK to leave it as it is for now, since those call sites need a lower-level and more accurate answer; this just optimises for external callers.

Member Author

👍
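The channel path being discussed — a caller enqueues a future and the main loop, as the single owner of the configurations state, fills in both latest and committed — can be sketched roughly like this, with stand-in types rather than the library's:

```go
package main

import "fmt"

// configurations stands in for raft's internal pair of configs.
type configurations struct {
	latest, committed string
}

// configFuture is a simplified stand-in for configurationsFuture.
type configFuture struct {
	configs configurations
	done    chan struct{}
}

func main() {
	configurationsCh := make(chan *configFuture)
	shutdownCh := make(chan struct{})

	// Main loop: the single goroutine that owns the state, so it can
	// hand out a consistent snapshot of latest AND committed together.
	go func() {
		state := configurations{latest: "v2", committed: "v1"}
		for {
			select {
			case f := <-configurationsCh:
				f.configs = state
				close(f.done)
			case <-shutdownCh:
				return
			}
		}
	}()

	// Caller: enqueue a future and wait for the main loop to respond.
	f := &configFuture{done: make(chan struct{})}
	configurationsCh <- f
	<-f.done
	fmt.Println(f.configs.latest, f.configs.committed)
	close(shutdownCh)
}
```

This illustrates why the channel can't simply be deleted: call sites like takeSnapshot need both configs atomically, which the new read-only copy of the latest config alone cannot provide.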

}

// AddPeer (deprecated) is used to add a new peer into the cluster. This must be
54 changes: 34 additions & 20 deletions raft.go
@@ -623,8 +623,7 @@ func (r *Raft) leaderLoop() {
// value.
if r.configurations.latestIndex > oldCommitIndex &&
r.configurations.latestIndex <= commitIndex {
r.configurations.committed = r.configurations.latest
r.configurations.committedIndex = r.configurations.latestIndex
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
if !hasVote(r.configurations.committed, r.localID) {
stepDown = true
}
@@ -1043,8 +1042,7 @@ func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) {

r.dispatchLogs([]*logFuture{&future.logFuture})
index := future.Index()
r.configurations.latest = configuration
r.configurations.latestIndex = index
r.setLatestConfiguration(configuration, index)
r.leaderState.commitment.setConfiguration(configuration)
r.startStopReplication()
}
@@ -1329,8 +1327,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
return
}
if entry.Index <= r.configurations.latestIndex {
r.configurations.latest = r.configurations.committed
r.configurations.latestIndex = r.configurations.committedIndex
r.setLatestConfiguration(r.configurations.committed, r.configurations.committedIndex)
}
newEntries = a.Entries[i:]
break
@@ -1365,8 +1362,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
idx := min(a.LeaderCommitIndex, r.getLastIndex())
r.setCommitIndex(idx)
if r.configurations.latestIndex <= idx {
r.configurations.committed = r.configurations.latest
r.configurations.committedIndex = r.configurations.latestIndex
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
}
r.processLogs(idx, nil)
metrics.MeasureSince([]string{"raft", "rpc", "appendEntries", "processLogs"}, start)
@@ -1383,15 +1379,11 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
// called from the main thread, or from NewRaft() before any threads have begun.
func (r *Raft) processConfigurationLogEntry(entry *Log) {
if entry.Type == LogConfiguration {
r.configurations.committed = r.configurations.latest
r.configurations.committedIndex = r.configurations.latestIndex
r.configurations.latest = DecodeConfiguration(entry.Data)
r.configurations.latestIndex = entry.Index
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
r.setLatestConfiguration(DecodeConfiguration(entry.Data), entry.Index)
} else if entry.Type == LogAddPeerDeprecated || entry.Type == LogRemovePeerDeprecated {
r.configurations.committed = r.configurations.latest
r.configurations.committedIndex = r.configurations.latestIndex
r.configurations.latest = decodePeers(entry.Data, r.trans)
r.configurations.latestIndex = entry.Index
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
r.setLatestConfiguration(decodePeers(entry.Data, r.trans), entry.Index)
}
}

@@ -1606,10 +1598,8 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
r.setLastSnapshot(req.LastLogIndex, req.LastLogTerm)

// Restore the peer set
r.configurations.latest = reqConfiguration
r.configurations.latestIndex = reqConfigurationIndex
r.configurations.committed = reqConfiguration
r.configurations.committedIndex = reqConfigurationIndex
r.setLatestConfiguration(reqConfiguration, reqConfigurationIndex)
r.setCommittedConfiguration(reqConfiguration, reqConfigurationIndex)

// Compact logs, continue even if this fails
if err := r.compactLogs(req.LastLogIndex); err != nil {
@@ -1796,3 +1786,27 @@ func (r *Raft) timeoutNow(rpc RPC, req *TimeoutNowRequest) {
r.candidateFromLeadershipTransfer = true
rpc.Respond(&TimeoutNowResponse{}, nil)
}

// setLatestConfiguration stores the latest configuration and updates a copy of it.
func (r *Raft) setLatestConfiguration(c Configuration, i uint64) {
r.configurations.latest = c
r.configurations.latestIndex = i
r.latestConfigurationLock.Lock()
r.latestConfiguration = c.Clone()
r.latestConfigurationLock.Unlock()
}

// setCommittedConfiguration stores the committed configuration.
func (r *Raft) setCommittedConfiguration(c Configuration, i uint64) {
r.configurations.committed = c
r.configurations.committedIndex = i
}

// getLatestConfiguration reads the configuration from a copy of the main
// configuration, which means it can be accessed independently from the main
// loop.
func (r *Raft) getLatestConfiguration() Configuration {
r.latestConfigurationLock.RLock()
defer r.latestConfigurationLock.RUnlock()
return r.latestConfiguration.Clone()
}