Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry all servers on RPC call failure #1735

Merged
merged 14 commits into from
Sep 27, 2016
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
589 changes: 344 additions & 245 deletions client/client.go

Large diffs are not rendered by default.

779 changes: 0 additions & 779 deletions client/rpcproxy/rpcproxy.go

This file was deleted.

818 changes: 0 additions & 818 deletions client/rpcproxy/rpcproxy_test.go

This file was deleted.

84 changes: 0 additions & 84 deletions client/rpcproxy/server_endpoint.go

This file was deleted.

77 changes: 0 additions & 77 deletions client/rpcproxy/server_endpoint_test.go

This file was deleted.

111 changes: 111 additions & 0 deletions client/serverlist.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package client

import (
"math/rand"
"net"
"sort"
"strings"
"sync"
)

// serverlist is a prioritized randomized list of nomad servers. Users should
// call all() to retrieve the full list, followed by failed(e) on each endpoint
// that's failed and good(e) when a valid endpoint is found.
type serverlist struct {
e endpoints
mu sync.RWMutex
}

func newServerList() *serverlist {
return &serverlist{}
}

// set the server list to a new list. The new list will be shuffled and sorted
// by priority.
func (s *serverlist) set(in endpoints) {
s.mu.Lock()
s.e = in
s.mu.Unlock()
}

// all returns a copy of the full server list, shuffled and then sorted by
// priority
func (s *serverlist) all() endpoints {
s.mu.RLock()
out := make(endpoints, len(s.e))
copy(out, s.e)
s.mu.RUnlock()

// Randomize the order
for i, j := range rand.Perm(len(out)) {
out[i], out[j] = out[j], out[i]
}

// Sort by priority
sort.Sort(out)
return out
}

// failed endpoint will be deprioritized if its still in the list.
func (s *serverlist) failed(e *endpoint) {
s.mu.Lock()
defer s.mu.Unlock()
for _, cur := range s.e {
if cur.equal(e) {
cur.priority++
return
}
}
}

// good endpoint will get promoted to the highest priority if it's still in the
// list.
func (s *serverlist) good(e *endpoint) {
s.mu.Lock()
defer s.mu.Unlock()
for _, cur := range s.e {
if cur.equal(e) {
cur.priority = 0
return
}
}
}

func (e endpoints) Len() int {
return len(e)
}

func (e endpoints) Less(i int, j int) bool {
// Sort only by priority as endpoints should be shuffled and ordered
// only by priority
return e[i].priority < e[j].priority
}

func (e endpoints) Swap(i int, j int) {
e[i], e[j] = e[j], e[i]
}

type endpoints []*endpoint

func (e endpoints) String() string {
names := make([]string, 0, len(e))
for _, endpoint := range e {
names = append(names, endpoint.name)
}
return strings.Join(names, ",")
}

type endpoint struct {
name string
addr net.Addr

// 0 being the highest priority
priority int
}

// equal returns true if the name and addr match between two endpoints.
// Priority is ignored because the same endpoint may be added by discovery and
// heartbeating with different priorities.
func (e *endpoint) equal(o *endpoint) bool {
return e.name == o.name && e.addr == o.addr
}
117 changes: 117 additions & 0 deletions client/serverlist_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package client

import (
"log"
"os"
"strings"
"testing"
)

func TestServerList(t *testing.T) {
s := newServerList()

// New lists should be empty
if e := s.all(); len(e) != 0 {
t.Fatalf("expected empty list to return an empty list, but received: %+q", e)
}

mklist := func() endpoints {
return endpoints{
&endpoint{"b", nil, 1},
&endpoint{"c", nil, 1},
&endpoint{"g", nil, 2},
&endpoint{"d", nil, 1},
&endpoint{"e", nil, 1},
&endpoint{"f", nil, 1},
&endpoint{"h", nil, 2},
&endpoint{"a", nil, 0},
}
}
s.set(mklist())

orig := mklist()
all := s.all()
if len(all) != len(orig) {
t.Fatalf("expected %d endpoints but only have %d", len(orig), len(all))
}

// Assert list is properly randomized+sorted
for i, pri := range []int{0, 1, 1, 1, 1, 1, 2, 2} {
if all[i].priority != pri {
t.Errorf("expected endpoint %d (%+q) to be priority %d", i, all[i], pri)
}
}

// Subsequent sets should reshuffle (try multiple times as they may
// shuffle in the same order)
tries := 0
max := 3
for ; tries < max; tries++ {
if s.all().String() == s.all().String() {
// eek, matched; try again in case we just got unlucky
continue
}
break
}
if tries == max {
t.Fatalf("after %d attempts servers were still not random reshuffled", tries)
}

// Mark an endpoint as failed enough that it should be at the end of the list
sa := &endpoint{"a", nil, 0}
s.failed(sa)
s.failed(sa)
s.failed(sa)
all2 := s.all()
if len(all2) != len(orig) {
t.Fatalf("marking should not have changed list length")
}
if all2[len(all)-1].name != sa.name {
t.Fatalf("failed endpoint should be at end of list: %+q", all2)
}

// But if the bad endpoint succeeds even once it should be bumped to the top group
s.good(sa)
found := false
for _, e := range s.all() {
if e.name == sa.name {
if e.priority != 0 {
t.Fatalf("server newly marked good should have highest priority")
}
found = true
}
}
if !found {
t.Fatalf("what happened to endpoint A?!")
}
}

// TestClient_ServerList tests client methods that interact with the internal
// nomad server list.
func TestClient_ServerList(t *testing.T) {
// manually create a mostly empty client to avoid spinning up a ton of
// goroutines that complicate testing
client := Client{servers: newServerList(), logger: log.New(os.Stderr, "", log.Ltime|log.Lshortfile)}

if s := client.GetServers(); len(s) != 0 {
t.Fatalf("expected server lit to be empty but found: %+q", s)
}
if err := client.SetServers(nil); err != noServersErr {
t.Fatalf("expected setting an empty list to return a 'no servers' error but received %v", err)
}
if err := client.SetServers([]string{"not-a-real-domain.fake"}); err == nil {
t.Fatalf("expected setting a bad server to return an error")
}
if err := client.SetServers([]string{"bad.fake", "127.0.0.1:1234", "127.0.0.1"}); err != nil {
t.Fatalf("expected setting at least one good server to succeed but received: %v", err)
}
s := client.GetServers()
if len(s) != 2 {
t.Fatalf("expected 2 servers but received: %+q", s)
}
for _, host := range s {
if !strings.HasPrefix(host, "127.0.0.1:") {
t.Errorf("expected both servers to be localhost and include port but found: %s", host)
}
}
}
Loading