Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Balancing #107

Merged
merged 28 commits into from
Jan 9, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions balancing/balance_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ func (h *histogram) unshiftData(now time.Time) {
h.growSeries()
}

// Breaker is interface od citcuit breaker
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

of

type Breaker interface {
Record(duration time.Duration, success bool) bool
ShouldOpen() bool
Expand All @@ -252,6 +253,7 @@ func newBreaker(retention int, timeLimit time.Duration,
}
}

// NodeBreaker is implementation of Breaker interface
type NodeBreaker struct {
rate float64
limit time.Duration
Expand All @@ -265,6 +267,7 @@ type NodeBreaker struct {
state *openStateTracker
}

// Record collects call data and returns bool if breaker should be open
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

open -> opened

func (breaker *NodeBreaker) Record(duration time.Duration, success bool) bool {
breaker.timeData.Add(float64(duration))
successValue := float64(1)
Expand All @@ -275,6 +278,7 @@ func (breaker *NodeBreaker) Record(duration time.Duration, success bool) bool {
return breaker.ShouldOpen()
}

// ShouldOpen checks if breaker should be open
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

func (breaker *NodeBreaker) ShouldOpen() bool {
exceeded := breaker.limitsExceeded()
if breaker.state != nil {
Expand Down Expand Up @@ -348,12 +352,14 @@ type lenLimitCounter struct {
mx sync.Mutex
mjarco marked this conversation as resolved.
Show resolved Hide resolved
}

// Add acumates new values
func (counter *lenLimitCounter) Add(value float64) {
index := counter.nextIdx
counter.values[index] = value
counter.nextIdx = (counter.nextIdx + 1) % cap(counter.values)
}

// Sum returns sum of valuesś
mjarco marked this conversation as resolved.
Show resolved Hide resolved
func (counter *lenLimitCounter) Sum() float64 {
sum := float64(0)
for _, v := range counter.values {
Expand All @@ -362,6 +368,7 @@ func (counter *lenLimitCounter) Sum() float64 {
return sum
}

// Percentile return value for given percentile
func (counter *lenLimitCounter) Percentile(percentile float64) float64 {
snapshot := make([]float64, len(counter.values))
copy(snapshot, counter.values)
Expand Down Expand Up @@ -442,26 +449,28 @@ func (tracker *openStateTracker) currentState(now time.Time, limitsExceeded bool
return tracker.state, changed
}

// MeasuredStorage coordinates metrics collection
type MeasuredStorage struct {
Node
Breaker
http.RoundTripper
Name string
}

// RoundTrip implements http.RoundTripper
func (ms *MeasuredStorage) RoundTrip(req *http.Request) (*http.Response, error) {
start := time.Now()
resp, err := ms.RoundTripper.RoundTrip(req)
duration := time.Since(start)
success := backend.IsSuccessful(resp, err)
open := ms.Breaker.Record(duration, success)
fmt.Printf("Updated %s with success %t, and it's open %t\n", ms.Name, success, open)
ms.Node.Update(duration)
ms.Node.SetActive(!open)
return resp, err
}

func NewBalancerPrioritySet(storagesConfig config.Storages, backends map[string]http.RoundTripper) BalancerPrioritySet {
// NewBalancerPrioritySet configures prioritized balancers stack
func NewBalancerPrioritySet(storagesConfig config.Storages, backends map[string]http.RoundTripper) *BalancerPrioritySet {
priorities := make([]int, 0)
priotitiesFilter := make(map[int]struct{})
priorityStorage := make(map[int][]*MeasuredStorage)
Expand Down Expand Up @@ -492,7 +501,7 @@ func NewBalancerPrioritySet(storagesConfig config.Storages, backends map[string]
priorityStorage[storageConfig.Priority], mstorage)
}
sort.Ints(priorities)
bps := BalancerPrioritySet{balancers: []*ResponseTimeBalancer{}}
bps := &BalancerPrioritySet{balancers: []*ResponseTimeBalancer{}}
for _, key := range priorities {
nodes := make([]Node, 0)
for _, node := range priorityStorage[key] {
Expand All @@ -504,10 +513,12 @@ func NewBalancerPrioritySet(storagesConfig config.Storages, backends map[string]
return bps
}

// BalancerPrioritySet selects storage by priority and availability
type BalancerPrioritySet struct {
balancers []*ResponseTimeBalancer
}

// GetMostAvailable returns balancer member
func (bps *BalancerPrioritySet) GetMostAvailable() *MeasuredStorage {
for _, balancer := range bps.balancers {
node, err := balancer.Elect()
Expand Down
44 changes: 15 additions & 29 deletions balancing/balance_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,15 +255,6 @@ func TestBreakerRecoveryPeriodsProgressionResetIfOpen(t *testing.T) {
require.False(t, breaker.ShouldOpen(), "breaker should be closed after stats reset")
}

// func TestConcurentBreakerCalls(t *testing.T) {
// timer := &mockTimer{
// baseTime: time.Now(),
// advanceDur: 1000 * time.Millisecond}

// breaker := makeTestBreakerWithTimer(timer.now)
// asyncOpenBreaker(breaker)
// }

func openBreaker(breaker Breaker) {
for i := 0; i < 11; i++ {
breaker.Record(1*time.Millisecond, false)
Expand All @@ -286,36 +277,36 @@ func TestPriorityLayersPicker(t *testing.T) {
Priority: 0,
BreakerProbeSize: 10,
BreakerErrorRate: 0.09,
BreakerTimeLimit: metrics.Interval{500 * time.Millisecond},
BreakerTimeLimit: metrics.Interval{Duration: 500 * time.Millisecond},
BreakerTimeLimitPercentile: 0.9,
BreakerBasicCutOutDuration: metrics.Interval{time.Second},
BreakerMaxCutOutDuration: metrics.Interval{180 * time.Second},
MeterResolution: metrics.Interval{5 * time.Second},
MeterRetention: metrics.Interval{10 * time.Second},
BreakerBasicCutOutDuration: metrics.Interval{Duration: time.Second},
BreakerMaxCutOutDuration: metrics.Interval{Duration: 180 * time.Second},
MeterResolution: metrics.Interval{Duration: 5 * time.Second},
MeterRetention: metrics.Interval{Duration: 10 * time.Second},
},
{
Name: "first-b",
Priority: 0,
BreakerProbeSize: 10,
BreakerErrorRate: 0.09,
BreakerTimeLimit: metrics.Interval{500 * time.Millisecond},
BreakerTimeLimit: metrics.Interval{Duration: 500 * time.Millisecond},
BreakerTimeLimitPercentile: 0.9,
BreakerBasicCutOutDuration: metrics.Interval{time.Second},
BreakerMaxCutOutDuration: metrics.Interval{180 * time.Second},
MeterResolution: metrics.Interval{5 * time.Second},
MeterRetention: metrics.Interval{10 * time.Second},
BreakerBasicCutOutDuration: metrics.Interval{Duration: time.Second},
BreakerMaxCutOutDuration: metrics.Interval{Duration: 180 * time.Second},
MeterResolution: metrics.Interval{Duration: 5 * time.Second},
MeterRetention: metrics.Interval{Duration: 10 * time.Second},
},
{
Name: "second",
Priority: 1,
BreakerProbeSize: 1000,
BreakerErrorRate: 0.1,
BreakerTimeLimit: metrics.Interval{500 * time.Millisecond},
BreakerTimeLimit: metrics.Interval{Duration: 500 * time.Millisecond},
BreakerTimeLimitPercentile: 0.9,
BreakerBasicCutOutDuration: metrics.Interval{time.Second},
BreakerMaxCutOutDuration: metrics.Interval{180 * time.Second},
MeterResolution: metrics.Interval{5 * time.Second},
MeterRetention: metrics.Interval{10 * time.Second},
BreakerBasicCutOutDuration: metrics.Interval{Duration: time.Second},
BreakerMaxCutOutDuration: metrics.Interval{Duration: 180 * time.Second},
MeterResolution: metrics.Interval{Duration: 5 * time.Second},
MeterRetention: metrics.Interval{Duration: 10 * time.Second},
},
}
errFirstStorageResponse := fmt.Errorf("Error from first-a")
Expand All @@ -328,7 +319,6 @@ func TestPriorityLayersPicker(t *testing.T) {
}
balancerSet := NewBalancerPrioritySet(config, backends)
require.NotNil(t, balancerSet)
fmt.Println("First expected")

member := balancerSet.GetMostAvailable()
require.NotNil(t, member, "Member should be not nil")
Expand All @@ -340,14 +330,12 @@ func TestPriorityLayersPicker(t *testing.T) {

require.Equal(t, errFirstStorageResponse, err)
require.Nil(t, resp, err)
fmt.Println("Second expected")

member = balancerSet.GetMostAvailable()
resp, err = member.RoundTrip(&http.Request{})
require.Equal(t, errSecondStorageResponse, err)
require.Nil(t, resp, err)

fmt.Println("Third expected")
member = balancerSet.GetMostAvailable()
resp, err = member.RoundTrip(&http.Request{})
require.Equal(t, errThirdStorageResponse, err)
Expand All @@ -361,5 +349,3 @@ type MockRoundTripper struct {
func (mrt *MockRoundTripper) RoundTrip(request *http.Request) (*http.Response, error) {
return nil, mrt.err
}

var ErrMockResponse = fmt.Errorf("Mock error")
41 changes: 0 additions & 41 deletions balancing/balancer.go

This file was deleted.

77 changes: 0 additions & 77 deletions balancing/balancer_test.go

This file was deleted.

64 changes: 0 additions & 64 deletions balancing/breaker.go

This file was deleted.

Loading