Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Speculative execution fixes #979

Merged
merged 6 commits into from
Aug 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions api/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func (s *Server) peerQuery(ctx context.Context, data cluster.Traceable, name, pa
// across the cluster, except to the local peer. If any peer fails requests to
// other peers are aborted. If enough peers have been heard from (based on
// speculation-threshold configuration), and we are missing the others, try to
// speculatively query other members of the shard group.
// speculatively query each other member of each shard group.
// ctx: request context
// data: request to be submitted
// name: name to be used in logging & tracing
Expand All @@ -323,7 +323,6 @@ func (s *Server) peerQuerySpeculative(ctx context.Context, data cluster.Traceabl
defer cancel()

originalPeers := make(map[string]struct{}, len(peerGroups))
pendingResponses := make(map[int32]struct{}, len(peerGroups))
receivedResponses := make(map[int32]struct{}, len(peerGroups))

responses := make(chan struct {
Expand Down Expand Up @@ -357,15 +356,20 @@ func (s *Server) peerQuerySpeculative(ctx context.Context, data cluster.Traceabl
for group, peers := range peerGroups {
peer := peers[0]
originalPeers[peer.GetName()] = struct{}{}
pendingResponses[group] = struct{}{}
go askPeer(group, peer)
}

result := make(map[string]PeerResponse)

specCheckTicker := time.NewTicker(5 * time.Millisecond)
var ticker *time.Ticker
var tickChan <-chan time.Time
if speculationThreshold != 1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's nice. so the ticker doesn't even get instantiated if speculationThreshold == 1

ticker = time.NewTicker(5 * time.Millisecond)
tickChan = ticker.C
defer ticker.Stop()
}

for len(pendingResponses) > 0 {
for len(receivedResponses) < len(peerGroups) {
select {
case resp := <-responses:
if _, ok := receivedResponses[resp.shardGroup]; ok {
Expand All @@ -379,18 +383,20 @@ func (s *Server) peerQuerySpeculative(ctx context.Context, data cluster.Traceabl

result[resp.data.peer.GetName()] = resp.data
receivedResponses[resp.shardGroup] = struct{}{}
delete(pendingResponses, resp.shardGroup)
delete(originalPeers, resp.data.peer.GetName())

case <-specCheckTicker.C:
case <-tickChan:
// Check if it's time to speculate!
percentReceived := 1 - (float64(len(pendingResponses)) / float64(len(peerGroups)))
if percentReceived > speculationThreshold {
percentReceived := float64(len(receivedResponses)) / float64(len(peerGroups))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is pendingResponses still worth anything? It seems superfluous. The current uses are:

  1. To drive the main loop (for len(pendingResponses) > 0) which could be for len(receivedResponses) < len(peerGroups)
  2. To decide upon who to speculate which could loop on peerGroups and continue if in recievedResponses.

I'm not sure it's worth the extra allocation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed, thanks

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's changing the logic, but it's actually making it correct, so that's good.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the change in logic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this particular line should not change logic. but the line below it does.
i explained in depth in the commit message
(i generally try to make commit messages explain everything)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that was my analysis as well 👍

if percentReceived >= speculationThreshold {
// kick off speculative queries to other members now
specCheckTicker.Stop()
ticker.Stop()
speculativeAttempts.Inc()
for shardGroup := range pendingResponses {
eligiblePeers := peerGroups[shardGroup][1:]
for shardGroup, peers := range peerGroups {
if _, ok := receivedResponses[shardGroup]; ok {
continue
}
eligiblePeers := peers[1:]
for _, peer := range eligiblePeers {
speculativeRequests.Inc()
go askPeer(shardGroup, peer)
Expand Down
13 changes: 10 additions & 3 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type partitionCandidates struct {
nodes []Node
}

// return the list of nodes to broadcast requests to
// MembersForQuery returns the list of nodes to broadcast requests to
// If partitions are assinged to nodes in groups
// (a[0,1], b[0,1], c[2,3], d[2,3] as opposed to a[0,1], b[0,2], c[1,3], d[2,3]),
// only 1 member per partition is returned.
Expand Down Expand Up @@ -133,7 +133,10 @@ func MembersForQuery() ([]Node, error) {
count := int(atomic.AddUint32(&counter, 1))

LOOP:
// for every partition...
for _, candidates := range membersMap {

// prefer the local node if it serves this partition
if candidates.nodes[0].GetName() == thisNode.GetName() {
if _, ok := selectedMembers[thisNode.GetName()]; !ok {
selectedMembers[thisNode.GetName()] = struct{}{}
Expand All @@ -142,14 +145,18 @@ LOOP:
continue LOOP
}

// for remote nodes, try to pick one we've already included

for _, n := range candidates.nodes {
if _, ok := selectedMembers[n.GetName()]; ok {
continue LOOP
}
}

// if no nodes have been selected yet then grab a node from
// the set of available nodes in such a way that nodes are
// weighted fairly across MembersForQuery calls

selected := candidates.nodes[count%len(candidates.nodes)]
selectedMembers[selected.GetName()] = struct{}{}
answer = append(answer, selected)
Expand All @@ -159,7 +166,7 @@ LOOP:
}

// MembersForSpeculativeQuery returns a prioritized list of nodes for each shard group
// TODO - this assumes that the partition set for each node is perfectly aligned
// keyed by the first (lowest) partition of their shard group
func MembersForSpeculativeQuery() (map[int32][]Node, error) {
thisNode := Manager.ThisNode()
allNodes := Manager.MemberList()
Expand Down Expand Up @@ -192,7 +199,7 @@ func MembersForSpeculativeQuery() (map[int32][]Node, error) {
}

for _, shard := range membersMap {
// Shuffle to avoid always choosing the same peer firsts
// Shuffle to avoid always choosing the same peer first
for i := len(shard) - 1; i > 0; i-- {
j := rand.Intn(i + 1)
shard[i], shard[j] = shard[j], shard[i]
Expand Down