Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
Merge pull request #979 from grafana/speculative-fixes
Browse files Browse the repository at this point in the history
Speculative execution fixes
  • Loading branch information
Dieterbe authored Aug 7, 2018
2 parents adbae33 + 3428869 commit 8d392cb
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 15 deletions.
30 changes: 18 additions & 12 deletions api/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func (s *Server) peerQuery(ctx context.Context, data cluster.Traceable, name, pa
// across the cluster, except to the local peer. If any peer fails, requests to
// other peers are aborted. If enough peers have been heard from (based on
// speculation-threshold configuration), and we are missing the others, try to
// speculatively query other members of the shard group.
// speculatively query each other member of each shard group.
// ctx: request context
// data: request to be submitted
// name: name to be used in logging & tracing
Expand All @@ -323,7 +323,6 @@ func (s *Server) peerQuerySpeculative(ctx context.Context, data cluster.Traceabl
defer cancel()

originalPeers := make(map[string]struct{}, len(peerGroups))
pendingResponses := make(map[int32]struct{}, len(peerGroups))
receivedResponses := make(map[int32]struct{}, len(peerGroups))

responses := make(chan struct {
Expand Down Expand Up @@ -357,15 +356,20 @@ func (s *Server) peerQuerySpeculative(ctx context.Context, data cluster.Traceabl
for group, peers := range peerGroups {
peer := peers[0]
originalPeers[peer.GetName()] = struct{}{}
pendingResponses[group] = struct{}{}
go askPeer(group, peer)
}

result := make(map[string]PeerResponse)

specCheckTicker := time.NewTicker(5 * time.Millisecond)
var ticker *time.Ticker
var tickChan <-chan time.Time
if speculationThreshold != 1 {
ticker = time.NewTicker(5 * time.Millisecond)
tickChan = ticker.C
defer ticker.Stop()
}

for len(pendingResponses) > 0 {
for len(receivedResponses) < len(peerGroups) {
select {
case resp := <-responses:
if _, ok := receivedResponses[resp.shardGroup]; ok {
Expand All @@ -379,18 +383,20 @@ func (s *Server) peerQuerySpeculative(ctx context.Context, data cluster.Traceabl

result[resp.data.peer.GetName()] = resp.data
receivedResponses[resp.shardGroup] = struct{}{}
delete(pendingResponses, resp.shardGroup)
delete(originalPeers, resp.data.peer.GetName())

case <-specCheckTicker.C:
case <-tickChan:
// Check if it's time to speculate!
percentReceived := 1 - (float64(len(pendingResponses)) / float64(len(peerGroups)))
if percentReceived > speculationThreshold {
percentReceived := float64(len(receivedResponses)) / float64(len(peerGroups))
if percentReceived >= speculationThreshold {
// kick off speculative queries to other members now
specCheckTicker.Stop()
ticker.Stop()
speculativeAttempts.Inc()
for shardGroup := range pendingResponses {
eligiblePeers := peerGroups[shardGroup][1:]
for shardGroup, peers := range peerGroups {
if _, ok := receivedResponses[shardGroup]; ok {
continue
}
eligiblePeers := peers[1:]
for _, peer := range eligiblePeers {
speculativeRequests.Inc()
go askPeer(shardGroup, peer)
Expand Down
13 changes: 10 additions & 3 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type partitionCandidates struct {
nodes []Node
}

// return the list of nodes to broadcast requests to
// MembersForQuery returns the list of nodes to broadcast requests to
// If partitions are assigned to nodes in groups
// (a[0,1], b[0,1], c[2,3], d[2,3] as opposed to a[0,1], b[0,2], c[1,3], d[2,3]),
// only 1 member per partition is returned.
Expand Down Expand Up @@ -133,7 +133,10 @@ func MembersForQuery() ([]Node, error) {
count := int(atomic.AddUint32(&counter, 1))

LOOP:
// for every partition...
for _, candidates := range membersMap {

// prefer the local node if it serves this partition
if candidates.nodes[0].GetName() == thisNode.GetName() {
if _, ok := selectedMembers[thisNode.GetName()]; !ok {
selectedMembers[thisNode.GetName()] = struct{}{}
Expand All @@ -142,14 +145,18 @@ LOOP:
continue LOOP
}

// for remote nodes, try to pick one we've already included

for _, n := range candidates.nodes {
if _, ok := selectedMembers[n.GetName()]; ok {
continue LOOP
}
}

// if no nodes have been selected yet then grab a node from
// the set of available nodes in such a way that nodes are
// weighted fairly across MembersForQuery calls

selected := candidates.nodes[count%len(candidates.nodes)]
selectedMembers[selected.GetName()] = struct{}{}
answer = append(answer, selected)
Expand All @@ -159,7 +166,7 @@ LOOP:
}

// MembersForSpeculativeQuery returns a prioritized list of nodes for each shard group
// TODO - this assumes that the partition set for each node is perfectly aligned
// keyed by the first (lowest) partition of their shard group
func MembersForSpeculativeQuery() (map[int32][]Node, error) {
thisNode := Manager.ThisNode()
allNodes := Manager.MemberList()
Expand Down Expand Up @@ -192,7 +199,7 @@ func MembersForSpeculativeQuery() (map[int32][]Node, error) {
}

for _, shard := range membersMap {
// Shuffle to avoid always choosing the same peer firsts
// Shuffle to avoid always choosing the same peer first
for i := len(shard) - 1; i > 0; i-- {
j := rand.Intn(i + 1)
shard[i], shard[j] = shard[j], shard[i]
Expand Down

0 comments on commit 8d392cb

Please sign in to comment.