Skip to content

Commit

Permalink
Refactror load balancer, decouple it from fadeIn
Browse files Browse the repository at this point in the history
Updates #2346

Signed-off-by: Roman Zavodskikh <roman.zavodskikh@zalando.de>
  • Loading branch information
Roman Zavodskikh committed Jun 12, 2023
1 parent ee19c08 commit c131ef0
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 142 deletions.
10 changes: 5 additions & 5 deletions filters/builtin/creationmetrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,17 @@ func TestRouteCreationMetrics_reportRouteCreationTimes(t *testing.T) {
func TestRouteCreationMetrics_startTimes(t *testing.T) {
for _, tt := range []struct {
name string
route routing.Route
route *routing.Route
expected map[string]time.Time
}{
{
name: "no start time provided",
route: routing.Route{},
route: &routing.Route{},
expected: map[string]time.Time{},
},
{
name: "start time from origin marker",
route: routing.Route{Filters: []*routing.RouteFilter{
route: &routing.Route{Filters: []*routing.RouteFilter{
{Filter: &OriginMarker{Origin: "origin", Id: "config0", Created: time0}},
{Filter: &OriginMarker{Origin: "origin", Id: "config1", Created: time1}},
}},
Expand All @@ -94,9 +94,9 @@ func TestRouteCreationMetrics_startTimes(t *testing.T) {
} {
t.Run(tt.name, func(t *testing.T) {
metrics := NewRouteCreationMetrics(&metricstest.MockMetrics{})
assert.Equal(t, tt.expected, metrics.startTimes(&tt.route))
assert.Equal(t, tt.expected, metrics.startTimes(tt.route))
//should be cached
assert.Empty(t, metrics.startTimes(&tt.route))
assert.Empty(t, metrics.startTimes(tt.route))
})
}
}
Expand Down
18 changes: 18 additions & 0 deletions filters/fadein/fadein_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,15 @@ func TestPostProcessor(t *testing.T) {
rt, _ := createRouting(t, routes)

foo := route(rt, "/foo")
foo.Lock()
defer foo.Unlock()
if foo == nil || foo.LBFadeInDuration != 0 {
t.Fatal("failed to preserve non-LB route")
}

bar := route(rt, "/bar")
bar.Lock()
defer bar.Unlock()
if bar == nil || bar.LBFadeInDuration != time.Minute {
t.Fatal("failed to postprocess LB route")
}
Expand All @@ -253,6 +257,8 @@ func TestPostProcessor(t *testing.T) {
}

baz := route(rt, "/baz")
baz.Lock()
defer baz.Unlock()
if baz == nil || baz.LBFadeInDuration != 0 {
t.Fatal("failed to preserve non-fade LB route")
}
Expand All @@ -265,6 +271,8 @@ func TestPostProcessor(t *testing.T) {

rt, _ := createRouting(t, routes)
r := route(rt, "/")
r.Lock()
defer r.Unlock()
if r == nil || len(r.LBEndpoints) == 0 || !r.LBEndpoints[0].Detected.IsZero() {
t.Fatal("failed to ignore invalid LB endpoint")
}
Expand All @@ -277,6 +285,8 @@ func TestPostProcessor(t *testing.T) {

rt, _ := createRouting(t, routes)
r := route(rt, "/")
r.Lock()
defer r.Unlock()
if r == nil || len(r.LBEndpoints) == 0 || !r.LBEndpoints[0].Detected.IsZero() {
t.Fatal("failed to ignore negative duration")
}
Expand All @@ -296,6 +306,8 @@ func TestPostProcessor(t *testing.T) {

update(nextRoutes)
r := route(rt, "/")
r.Lock()
defer r.Unlock()

var found bool
for _, ep := range r.LBEndpoints {
Expand Down Expand Up @@ -329,6 +341,8 @@ func TestPostProcessor(t *testing.T) {
update(initialRoutes)

r := route(rt, "/")
r.Lock()
defer r.Unlock()

var found bool
for _, ep := range r.LBEndpoints {
Expand Down Expand Up @@ -363,6 +377,8 @@ func TestPostProcessor(t *testing.T) {
update(initialRoutes)

r := route(rt, "/")
r.Lock()
defer r.Unlock()

var found bool
for _, ep := range r.LBEndpoints {
Expand Down Expand Up @@ -400,6 +416,8 @@ func TestPostProcessor(t *testing.T) {
nextRoutes := fmt.Sprintf(nextRoutesFmt, nows(t))
update(nextRoutes)
r := route(rt, "/")
r.Lock()
defer r.Unlock()

var found bool
for _, ep := range r.LBEndpoints {
Expand Down
165 changes: 28 additions & 137 deletions loadbalancer/algorithm.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package loadbalancer
import (
"errors"
"fmt"
"math"
"math/rand"
"net/url"
"sort"
Expand Down Expand Up @@ -53,162 +52,69 @@ var (
defaultAlgorithm = newRoundRobin
)

func fadeInState(now time.Time, duration time.Duration, detected time.Time) (time.Duration, bool) {
rel := now.Sub(detected)
return rel, rel > 0 && rel < duration
}

func fadeIn(now time.Time, duration time.Duration, exponent float64, detected time.Time) float64 {
rel, fadingIn := fadeInState(now, duration, detected)
if !fadingIn {
return 1
}

return math.Pow(float64(rel)/float64(duration), exponent)
}

func shiftWeighted(rnd *rand.Rand, ctx *routing.LBContext, w []float64, now time.Time) routing.LBEndpoint {
var sum float64
weightSums := w
rt := ctx.Route
ep := ctx.Route.LBEndpoints
for _, epi := range ep {
wi := fadeIn(now, rt.LBFadeInDuration, rt.LBFadeInExponent, epi.Detected)
sum += wi
weightSums = append(weightSums, sum)
}

choice := ep[len(weightSums)-1]
r := rnd.Float64() * sum
for i := range weightSums {
if weightSums[i] > r {
choice = ep[i]
break
}
}

return choice
}

func shiftToRemaining(rnd *rand.Rand, ctx *routing.LBContext, wi []int, wf []float64, now time.Time) routing.LBEndpoint {
notFadingIndexes := wi
ep := ctx.Route.LBEndpoints

// if all endpoints are fading, the simplest approach is to use the oldest,
// this departs from the desired curve, but guarantees monotonic fade-in. From
// the perspective of the oldest endpoint, this is temporarily the same as if
// there was no fade-in.
if len(notFadingIndexes) == 0 {
return shiftWeighted(rnd, ctx, wf, now)
}

// otherwise equally distribute between the old endpoints
return ep[notFadingIndexes[rnd.Intn(len(notFadingIndexes))]]
}

func withFadeIn(rnd *rand.Rand, ctx *routing.LBContext, notFadingIndexes []int, choice int, algo routing.LBAlgorithm) routing.LBEndpoint {
ep := ctx.Route.LBEndpoints
now := time.Now()
f := fadeIn(
now,
ctx.Route.LBFadeInDuration,
ctx.Route.LBFadeInExponent,
ctx.Route.LBEndpoints[choice].Detected,
)

if rnd.Float64() < f {
return ep[choice]
}
for i := 0; i < len(ep); i++ {
if _, fadingIn := fadeInState(now, ctx.Route.LBFadeInDuration, ep[i].Detected); !fadingIn {
notFadingIndexes = append(notFadingIndexes, i)
}
}

switch a := algo.(type) {
case *roundRobin:
return shiftToRemaining(a.rnd, ctx, notFadingIndexes, a.fadingWeights, now)
case *random:
return shiftToRemaining(a.rand, ctx, notFadingIndexes, a.fadingWeights, now)
case *consistentHash:
// If all endpoints are fading, normal consistent hash result
if len(notFadingIndexes) == 0 {
return ep[choice]
}
// otherwise calculate consistent hash again using endpoints which are not fading
return ep[a.chooseConsistentHashEndpoint(ctx, skipFadingEndpoints(notFadingIndexes))]
default:
return ep[choice]
}
}

type roundRobin struct {
mx sync.Mutex
index int
rnd *rand.Rand
notFadingIndexes []int
fadingWeights []float64
mx sync.Mutex
index int
rnd *rand.Rand
}

func newRoundRobin(endpoints []string) routing.LBAlgorithm {
rnd := rand.New(rand.NewSource(time.Now().UnixNano())) // #nosec
return &roundRobin{
index: rnd.Intn(len(endpoints)),
rnd: rnd,

// preallocating frequently used slice
notFadingIndexes: make([]int, 0, len(endpoints)),
fadingWeights: make([]float64, 0, len(endpoints)),
}
}

// Apply implements routing.LBAlgorithm with a roundrobin algorithm.
func (r *roundRobin) Apply(ctx *routing.LBContext) routing.LBEndpoint {
ctx.Route.Lock()
defer ctx.Route.Unlock()

if len(ctx.Route.LBEndpoints) == 1 {
return ctx.Route.LBEndpoints[0]
}

r.mx.Lock()
defer r.mx.Unlock()
r.index = (r.index + 1) % len(ctx.Route.LBEndpoints)

if ctx.Route.LBFadeInDuration <= 0 {
return ctx.Route.LBEndpoints[r.index]
endpoints := ctx.Route.GoodEndpoints
if len(endpoints) == 0 {
endpoints = ctx.Route.LBEndpoints
}
r.index = (r.index + 1) % len(endpoints)

return withFadeIn(r.rnd, ctx, r.notFadingIndexes, r.index, r)
return endpoints[r.index]
}

type random struct {
rand *rand.Rand
notFadingIndexes []int
fadingWeights []float64
rand *rand.Rand
}

func newRandom(endpoints []string) routing.LBAlgorithm {
t := time.Now().UnixNano()
// #nosec
return &random{
rand: rand.New(rand.NewSource(t)),

// preallocating frequently used slice
notFadingIndexes: make([]int, 0, len(endpoints)),
fadingWeights: make([]float64, 0, len(endpoints)),
}
}

// Apply implements routing.LBAlgorithm with a stateless random algorithm.
func (r *random) Apply(ctx *routing.LBContext) routing.LBEndpoint {
ctx.Route.Lock()
defer ctx.Route.Unlock()

if len(ctx.Route.LBEndpoints) == 1 {
return ctx.Route.LBEndpoints[0]
}

i := r.rand.Intn(len(ctx.Route.LBEndpoints))
if ctx.Route.LBFadeInDuration <= 0 {
return ctx.Route.LBEndpoints[i]
endpoints := ctx.Route.GoodEndpoints
if len(endpoints) == 0 {
endpoints = ctx.Route.LBEndpoints
}
i := r.rand.Intn(len(endpoints))

return withFadeIn(r.rand, ctx, r.notFadingIndexes, i, r)
return endpoints[i]
}

type (
Expand All @@ -217,9 +123,9 @@ type (
hash uint64 // hash of endpoint
}
consistentHash struct {
hashRing []endpointHash // list of endpoints sorted by hash value
rand *rand.Rand
notFadingIndexes []int
hashRing []endpointHash // list of endpoints sorted by hash value
rand *rand.Rand
allHosts map[string]struct{}
}
)

Expand All @@ -232,15 +138,16 @@ func (ch *consistentHash) Swap(i, j int) {
func newConsistentHashInternal(endpoints []string, hashesPerEndpoint int) routing.LBAlgorithm {
rnd := rand.New(rand.NewSource(time.Now().UnixNano())) // #nosec
ch := &consistentHash{
hashRing: make([]endpointHash, hashesPerEndpoint*len(endpoints)),
rand: rnd,
notFadingIndexes: make([]int, 0, len(endpoints)),
hashRing: make([]endpointHash, hashesPerEndpoint*len(endpoints)),
rand: rnd,
allHosts: map[string]struct{}{},
}
for i, ep := range endpoints {
endpointStartIndex := hashesPerEndpoint * i
for j := 0; j < hashesPerEndpoint; j++ {
ch.hashRing[endpointStartIndex+j] = endpointHash{i, hash(fmt.Sprintf("%s-%d", ep, j))}
}
ch.allHosts[ep] = struct{}{}
}
sort.Sort(ch)
return ch
Expand Down Expand Up @@ -310,12 +217,7 @@ func (ch *consistentHash) Apply(ctx *routing.LBContext) routing.LBEndpoint {
}

choice := ch.chooseConsistentHashEndpoint(ctx, noSkippedEndpoints)

if ctx.Route.LBFadeInDuration <= 0 {
return ctx.Route.LBEndpoints[choice]
}

return withFadeIn(ch.rand, ctx, ch.notFadingIndexes, choice, ch)
return ctx.Route.LBEndpoints[choice]
}

func (ch *consistentHash) chooseConsistentHashEndpoint(ctx *routing.LBContext, skipEndpoint func(int) bool) int {
Expand All @@ -334,17 +236,6 @@ func (ch *consistentHash) chooseConsistentHashEndpoint(ctx *routing.LBContext, s
return choice
}

func skipFadingEndpoints(notFadingEndpoints []int) func(int) bool {
return func(i int) bool {
for _, notFadingEndpoint := range notFadingEndpoints {
if i == notFadingEndpoint {
return false
}
}
return true
}
}

func noSkippedEndpoints(_ int) bool {
return false
}
Expand Down
1 change: 1 addition & 0 deletions loadbalancer/fadein_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func testFadeIn(
Host: eps[i],
Detected: detectionTimes[i],
})
ctx.Route.GoodEndpoints = ctx.Route.LBEndpoints
}

rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
Expand Down
Loading

0 comments on commit c131ef0

Please sign in to comment.