Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Balancing #107

Merged
merged 28 commits into from
Jan 9, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
531 changes: 531 additions & 0 deletions balancing/balance_breaker.go

Large diffs are not rendered by default.

351 changes: 351 additions & 0 deletions balancing/balance_breaker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,351 @@
package balancing

import (
"fmt"
"net/http"
"sync"
"testing"
"time"

"github.com/allegro/akubra/metrics"
"github.com/allegro/akubra/storages/config"
"github.com/stretchr/testify/require"
)

func TestResponseTimeBalancingMemberElects(t *testing.T) {
balancer := &ResponseTimeBalancer{}
member, err := balancer.Elect()
require.Error(t, err)
require.Nil(t, member)

balancer = &ResponseTimeBalancer{
Nodes: []Node{
&nodeMock{active: true},
},
}

member, err = balancer.Elect()
require.NoError(t, err)
require.NotNil(t, member)

firstNode := &nodeMock{err: fmt.Errorf("first"), time: 1, calls: 1, active: true}
secondNode := &nodeMock{err: fmt.Errorf("second"), time: 1, calls: 2, active: true}
balancer = &ResponseTimeBalancer{
Nodes: []Node{
firstNode,
secondNode,
},
}
member, err = balancer.Elect()
require.NoError(t, err)
require.Equal(t, firstNode, member)

balancer = &ResponseTimeBalancer{
Nodes: []Node{
&nodeMock{err: fmt.Errorf("first"), time: 1, calls: 1, active: false},
&nodeMock{err: fmt.Errorf("second"), time: 1, calls: 2, active: true},
},
}

member, err = balancer.Elect()
require.NoError(t, err)
require.Equal(t, secondNode, member)

balancer = &ResponseTimeBalancer{
Nodes: []Node{
&nodeMock{err: fmt.Errorf("first"), time: 1, calls: 1, active: false},
&nodeMock{err: fmt.Errorf("second"), time: 1, calls: 2, active: false},
},
}
member, err = balancer.Elect()
require.Error(t, err)
require.Equal(t, nil, member)
}

type nodeMock struct {
err error
calls float64
time float64
active bool
member interface{}
}

func (node *nodeMock) Member() interface{} {
return node.member
}

func (node *nodeMock) Calls() float64 {
return node.calls
}

func (node *nodeMock) Time() float64 {
return node.time
}

func (node *nodeMock) IsActive() bool {
return node.active
}

func (node *nodeMock) SetActive(bool) {
}

func (node *nodeMock) Update(time.Duration) {
}

func TestCallMeter(t *testing.T) {
callMeter := newCallMeter(5*time.Second, 5*time.Second)
require.Implements(t, (*Node)(nil), callMeter)

callMeter.Update(time.Millisecond)
require.Equal(t, float64(time.Millisecond), callMeter.Time(), "Time summary missmatch")
require.Equal(t, float64(1), callMeter.Calls(), "Number of calls missmatch")
}

func TestCallMeterConcurrency(t *testing.T) {
numberOfSamples := 10000
sampleDuration := time.Millisecond
waitGroup := &sync.WaitGroup{}
waitGroup.Add(numberOfSamples)
callMeter := newCallMeter(5*time.Second, 5*time.Second)
for i := 0; i < numberOfSamples; i++ {
go func() {
callMeter.Update(sampleDuration)
waitGroup.Done()
}()
}
waitGroup.Wait()
require.Equal(t, float64(numberOfSamples*int(sampleDuration)), callMeter.Time())
require.Equal(t, float64(numberOfSamples), callMeter.Calls())
}

func TestCallMeterRetention(t *testing.T) {
numberOfSamples := 100
timer := &mockTimer{
baseTime: time.Now(),
advanceDur: 100 * time.Millisecond}

callMeter := newCallMeter(5*time.Second, 1*time.Second)
callMeter.now = timer.now

for i := 0; i < numberOfSamples; i++ {
callMeter.Update(time.Millisecond)
timer.advance()
}

require.InDelta(t, float64(callMeter.resolution/timer.advanceDur), callMeter.Calls(), float64(1))
period := 2 * time.Second
require.InDelta(t, float64(period/timer.advanceDur), callMeter.CallsIn(period), float64(1))
timer.advanceDur = 2 * time.Second
timer.advance()
require.Equal(t, float64(0), callMeter.Calls())
}

type mockTimer struct {
baseTime time.Time
advanceDur time.Duration
mx sync.Mutex
}

func (timer *mockTimer) now() time.Time {
timer.mx.Lock()
defer timer.mx.Unlock()
return timer.baseTime
}

func (timer *mockTimer) advance() {
timer.mx.Lock()
defer timer.mx.Unlock()
timer.baseTime = timer.baseTime.Add(timer.advanceDur)
}

func TestHistogramRetention(t *testing.T) {
retention := 5 * time.Second
resolution := 1 * time.Second
hist := newTimeHistogram(retention, resolution)
series := hist.pickSeries(time.Now())
require.NotNil(t, series)
}

func TestBreaker(t *testing.T) {
breaker := makeTestBreaker()
require.Implements(t, (*Breaker)(nil), breaker)

breaker.Record(100*time.Millisecond, true)
require.False(t, breaker.ShouldOpen())

breaker = makeTestBreaker()
for i := 0; i < 100; i++ {
breaker.Record(1100*time.Millisecond, true)
}
require.True(t, breaker.ShouldOpen())

breaker = makeTestBreaker()
breaker.Record(1*time.Millisecond, false)
require.False(t, breaker.ShouldOpen())

breaker = makeTestBreaker()
for i := 0; i < 11; i++ {
breaker.Record(1*time.Millisecond, false)
}
require.True(t, breaker.ShouldOpen())
}

func makeTestBreaker() Breaker {
errorRate := 0.1
timeLimit := time.Second
retention := 100
timeLimitPercentile := 0.9
closeDelay := time.Second
maxDelay := 4 * time.Second
breaker := newBreaker(
retention, timeLimit, timeLimitPercentile,
errorRate, closeDelay, maxDelay)

return breaker
}

func makeTestBreakerWithTimer(now func() time.Time) Breaker {
breaker := makeTestBreaker()
nodebreaker := breaker.(*NodeBreaker)
nodebreaker.now = now
return nodebreaker
}

func TestBreakerRecoveryPeriodsProgression(t *testing.T) {
timer := &mockTimer{
baseTime: time.Now(),
advanceDur: 1000 * time.Millisecond}

breaker := makeTestBreakerWithTimer(timer.now)
openBreaker(breaker)
opentime := timer.now()
checkOpenFor(t, time.Second, breaker, timer)
require.False(t, breaker.ShouldOpen(),
fmt.Sprintf("should be in halfclosed state after %s", timer.now().Sub(opentime)))

openBreaker(breaker)
require.True(t, breaker.ShouldOpen(), fmt.Sprintf("should be in open"))

checkOpenFor(t, 2*time.Second, breaker, timer)
require.False(t, breaker.ShouldOpen())

openBreaker(breaker)
checkOpenFor(t, 4*time.Second, breaker, timer)
require.False(t, breaker.ShouldOpen())

openBreaker(breaker)
checkOpenFor(t, 4*time.Second, breaker, timer)
require.False(t, breaker.ShouldOpen())
}

func TestBreakerRecoveryPeriodsProgressionResetIfOpen(t *testing.T) {
timer := &mockTimer{
baseTime: time.Now(),
advanceDur: 1100 * time.Millisecond}

breaker := makeTestBreakerWithTimer(timer.now)
openBreaker(breaker)
checkOpenFor(t, time.Second, breaker, timer)
require.False(t, breaker.ShouldOpen())

timer.advance()
require.False(t, breaker.ShouldOpen())
openBreaker(breaker)
checkOpenFor(t, time.Second, breaker, timer)
require.False(t, breaker.ShouldOpen(), "breaker should be closed after stats reset")
}

func openBreaker(breaker Breaker) {
for i := 0; i < 11; i++ {
breaker.Record(1*time.Millisecond, false)
}
}

func checkOpenFor(t *testing.T, d time.Duration, breaker Breaker, timer *mockTimer) {
start := timer.now()
for timer.now().Sub(start) < d {
require.True(t, breaker.ShouldOpen(),
fmt.Sprintf("braker closed after %s", timer.now().Sub(start)))
timer.advance()
}
}

func TestPriorityLayersPicker(t *testing.T) {
config := config.Storages{
{
Name: "first-a",
Priority: 0,
BreakerProbeSize: 10,
BreakerErrorRate: 0.09,
BreakerTimeLimit: metrics.Interval{Duration: 500 * time.Millisecond},
BreakerTimeLimitPercentile: 0.9,
BreakerBasicCutOutDuration: metrics.Interval{Duration: time.Second},
BreakerMaxCutOutDuration: metrics.Interval{Duration: 180 * time.Second},
MeterResolution: metrics.Interval{Duration: 5 * time.Second},
MeterRetention: metrics.Interval{Duration: 10 * time.Second},
},
{
Name: "first-b",
Priority: 0,
BreakerProbeSize: 10,
BreakerErrorRate: 0.09,
BreakerTimeLimit: metrics.Interval{Duration: 500 * time.Millisecond},
BreakerTimeLimitPercentile: 0.9,
BreakerBasicCutOutDuration: metrics.Interval{Duration: time.Second},
BreakerMaxCutOutDuration: metrics.Interval{Duration: 180 * time.Second},
MeterResolution: metrics.Interval{Duration: 5 * time.Second},
MeterRetention: metrics.Interval{Duration: 10 * time.Second},
},
{
Name: "second",
Priority: 1,
BreakerProbeSize: 1000,
BreakerErrorRate: 0.1,
BreakerTimeLimit: metrics.Interval{Duration: 500 * time.Millisecond},
BreakerTimeLimitPercentile: 0.9,
BreakerBasicCutOutDuration: metrics.Interval{Duration: time.Second},
BreakerMaxCutOutDuration: metrics.Interval{Duration: 180 * time.Second},
MeterResolution: metrics.Interval{Duration: 5 * time.Second},
MeterRetention: metrics.Interval{Duration: 10 * time.Second},
},
}
errFirstStorageResponse := fmt.Errorf("Error from first-a")
errSecondStorageResponse := fmt.Errorf("Error from first-b")
errThirdStorageResponse := fmt.Errorf("Error from second-b")
backends := map[string]http.RoundTripper{
"first-a": &MockRoundTripper{err: errFirstStorageResponse},
"first-b": &MockRoundTripper{err: errSecondStorageResponse},
"second": &MockRoundTripper{err: errThirdStorageResponse},
}
balancerSet := NewBalancerPrioritySet(config, backends)
require.NotNil(t, balancerSet)

member := balancerSet.GetMostAvailable()
require.NotNil(t, member, "Member should be not nil")
require.Implements(t, (*Node)(nil), member, "Member should implement Node interface")
require.Implements(t, (*Breaker)(nil), member, "Member should implement Breaker interface")
require.Implements(t, (*http.RoundTripper)(nil), member, "Member should implement `http.RoundTripper` interface")

resp, err := member.RoundTrip(&http.Request{})

require.Equal(t, errFirstStorageResponse, err)
require.Nil(t, resp, err)

member = balancerSet.GetMostAvailable()
resp, err = member.RoundTrip(&http.Request{})
require.Equal(t, errSecondStorageResponse, err)
require.Nil(t, resp, err)

member = balancerSet.GetMostAvailable()
resp, err = member.RoundTrip(&http.Request{})
require.Equal(t, errThirdStorageResponse, err)
require.Nil(t, resp, err)
}

type MockRoundTripper struct {
err error
}

func (mrt *MockRoundTripper) RoundTrip(request *http.Request) (*http.Response, error) {
return nil, mrt.err
}
2 changes: 1 addition & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func PrepareYamlConfig(
}
clustersMap := make(storageconfig.ClustersMap)
clustersMap["cluster1test"] = storageconfig.Cluster{
Backends: []string{"default"},
Storages: storageconfig.Storages{{Name: "default"}},
}

additionalRequestHeaders := httpHandlerConfig.AdditionalHeaders{
Expand Down
4 changes: 2 additions & 2 deletions httphandler/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type AccessMessageData struct {
Path string `json:"path"`
UserAgent string `json:"useragent"`
StatusCode int `json:"status"`
Duration float64 `json:"duration"`
Duration float64 `json:"duration_ms"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't json camel case? (duration_ms)

RespErr string `json:"error"`
ReqID string `json:"reqID"`
Time string `json:"ts"`
Expand All @@ -40,7 +40,7 @@ func NewAccessLogMessage(req http.Request,
req.Host,
req.URL.Path,
req.Header.Get("User-Agent"),
statusCode, duration * 1000, respErr,
statusCode, duration, respErr,
reqID, ts}
}

Expand Down
2 changes: 1 addition & 1 deletion httphandler/roundtripper_decorators.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (lrt *loggingRoundTripper) RoundTrip(req *http.Request) (resp *http.Respons
timeStart := time.Now()
resp, err = lrt.roundTripper.RoundTrip(req)

duration := time.Since(timeStart).Seconds()
duration := time.Since(timeStart).Seconds() * 1000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why multiply by 1000?

statusCode := http.StatusServiceUnavailable

if resp != nil {
Expand Down
3 changes: 0 additions & 3 deletions sharding/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,3 @@ func (slm *SyncLogMethod) UnmarshalYAML(unmarshal func(interface{}) error) error
slm.Method = method
return nil
}

// HumanSizeUnits is yaml deserializer
type HumanSizeUnits = types.HumanSizeUnits
Loading