Balancing #107

Merged: 28 commits, Jan 9, 2019
Changes from 8 commits
45 changes: 24 additions & 21 deletions balancing/balance_breaker.go
@@ -117,7 +117,7 @@ func (meter *CallMeter) Calls() float64 {

// CallsInLastPeriod returns number of calls in last duration
func (meter *CallMeter) CallsInLastPeriod(period time.Duration) float64 {
lastPeriodSeries := meter.histogram.pickLastSeries(period)
lastPeriodSeries := meter.histogram.PickLastSeries(period)
sum := float64(0)
now := meter.now()
for _, series := range lastPeriodSeries {
@@ -146,7 +146,7 @@ func (meter *CallMeter) SetActive(active bool) {

// TimeSpent returns float64 representation of time spent in execution
func (meter *CallMeter) TimeSpent() float64 {
Review comment from @wookie41 (Contributor), Nov 2, 2018:
Won't this be too expensive to calculate?
It happens every time we want to know the node's weight, which is basically every call. With a lot of requests per second, on each call we have to traverse every series in the histogram for all of the active nodes just to elect a node.
We could simply keep a running aggregate instead of recalculating this each time.
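For illustration, a minimal sketch of the incremental aggregation this comment suggests, assuming the histogram can notify the meter whenever a bucket falls out of retention. The aggregatedMeter type, the evictBucket hook, and the field names are hypothetical and are not part of this PR.

```go
package balancing

import (
	"sync"
	"time"
)

// aggregatedMeter keeps running totals that are updated in O(1) per sample,
// instead of traversing every series of the histogram each time a node
// weight is needed. Purely illustrative; none of these names exist in the PR.
type aggregatedMeter struct {
	mx        sync.Mutex
	totalTime float64 // sum of time spent within the retention window
	calls     float64 // number of calls within the retention window
}

// Add records a single call in constant time.
func (m *aggregatedMeter) Add(duration time.Duration) {
	m.mx.Lock()
	defer m.mx.Unlock()
	m.totalTime += float64(duration)
	m.calls++
}

// evictBucket would be called when a histogram bucket falls out of retention,
// subtracting that bucket's contribution so the aggregate still reflects
// only the retained period.
func (m *aggregatedMeter) evictBucket(bucketTime, bucketCalls float64) {
	m.mx.Lock()
	defer m.mx.Unlock()
	m.totalTime -= bucketTime
	m.calls -= bucketCalls
}

// TimeSpent and Calls become constant-time reads.
func (m *aggregatedMeter) TimeSpent() float64 {
	m.mx.Lock()
	defer m.mx.Unlock()
	return m.totalTime
}

func (m *aggregatedMeter) Calls() float64 {
	m.mx.Lock()
	defer m.mx.Unlock()
	return m.calls
}
```

The trade-off is that the histogram has to call evictBucket reliably when data expires; otherwise the running totals drift from what a full traversal would report.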

allSeries := meter.histogram.pickLastSeries(meter.resolution)
allSeries := meter.histogram.PickLastSeries(meter.resolution)
sum := float64(0)
now := meter.now()

@@ -170,14 +170,12 @@ func (series *dataSeries) Add(value float64, dateTime time.Time) {
series.data = append(series.data, &timeValue{dateTime, value})
}

func (series *dataSeries) ValueRangeFun(timeStart, timeEnd time.Time, fun func(*timeValue)) []float64 {
dataRange := []float64{}
func (series *dataSeries) ValueRangeFun(timeStart, timeEnd time.Time, fun func(*timeValue)) {
for _, timeVal := range series.data {
if (timeStart == timeVal.date || timeStart.Before(timeVal.date)) && timeEnd.After(timeVal.date) {
fun(timeVal)
}
}
return dataRange
}

func (series *dataSeries) ValueRange(timeStart, timeEnd time.Time) []float64 {
@@ -226,7 +224,8 @@ func (h *histogram) pickSeries(at time.Time) *dataSeries {
return h.data[idx]
}

func (h *histogram) pickLastSeries(period time.Duration) []*dataSeries {
// PickLastSeries returns a slice of dataSeries tracking at least the given period of time
func (h *histogram) PickLastSeries(period time.Duration) []*dataSeries {
h.mx.Lock()
defer h.mx.Unlock()
if period > h.retention {
@@ -294,7 +293,7 @@ func newBreaker(retention int, callTimeLimit time.Duration,
closeDelay, maxDelay time.Duration) Breaker {
return &NodeBreaker{
timeData: newLenLimitCounter(retention),
successData: newLenLimitCounter(retention),
failures: newLenLimitCounter(retention),
rate: errorRate,
callTimeLimit: callTimeLimit,
timeLimitPercentile: timeLimitPercentile,
@@ -310,25 +309,25 @@ type NodeBreaker struct {
callTimeLimit time.Duration
timeLimitPercentile float64
timeData *lengthDelimitedCounter
successData *lengthDelimitedCounter
failures *lengthDelimitedCounter
now func() time.Time
closeDelay time.Duration
maxDelay time.Duration
state *openStateTracker
}

// Record collects call data and returns bool if breaker should be open
// Record collects call data and returns bool if breaker should be opened
func (breaker *NodeBreaker) Record(duration time.Duration, success bool) bool {
breaker.timeData.Add(float64(duration))
successValue := float64(1)
failValue := float64(1)
if success {
successValue = float64(0)
failValue = float64(0)
}
breaker.successData.Add(successValue)
breaker.failures.Add(failValue)
return breaker.ShouldOpen()
}

// ShouldOpen checks if breaker should be open
// ShouldOpen checks if breaker should be opened
func (breaker *NodeBreaker) ShouldOpen() bool {
exceeded := breaker.limitsExceeded()
if breaker.state != nil {
@@ -368,7 +367,7 @@ func (breaker *NodeBreaker) limitsExceeded() bool {
percentile := breaker.timeData.Percentile(breaker.timeLimitPercentile)
if percentile > float64(breaker.callTimeLimit) {
breaker.openBreaker()
log.Debugf("Breaker: time percentile exceeded %f", percentile)
log.Debugf("Breaker: time percentile exceeded %f / %f", percentile, float64(breaker.callTimeLimit))
return true
}
return false
@@ -384,12 +383,12 @@ func (breaker *NodeBreaker) openBreaker() {

func (breaker *NodeBreaker) reset() {
breaker.timeData.Reset()
breaker.successData.Reset()
breaker.failures.Reset()
}

func (breaker *NodeBreaker) errorRate() float64 {
sum := breaker.successData.Sum()
count := float64(len(breaker.successData.values))
sum := breaker.failures.Sum()
count := float64(len(breaker.failures.values))
return sum / count
}

@@ -522,24 +521,28 @@ func (ms *MeasuredStorage) RoundTrip(req *http.Request) (*http.Response, error)
log.Debugf("MeasuredStorage %s: Got request id %s\n", ms.Name, reqID)
resp, err := ms.RoundTripper.RoundTrip(req)
duration := time.Since(start)
success := backend.IsSuccessful(resp, err)
success := backendSuccess(resp, err)
open := ms.Breaker.Record(duration, success)
log.Debugf("MeasuredStorage %s: Request %s took %s was successful: %t, opened breaker %t\n", ms.Name, reqID, duration, success, open)
log.Debugf("s %s: Request %s took %s was successful: %t, opened breaker %t\n", ms.Name, reqID, duration, success, open)
Review comment (Contributor):
You replaced MeasuredStorage with s in the log message. Is this intentional?


ms.Node.UpdateTimeSpent(duration)
ms.Node.SetActive(!open)
raportMetrics(ms.RoundTripper, start, open)
reportMetrics(ms.RoundTripper, start, open)
return resp, err
}

func backendSuccess(response *http.Response, err error) bool {
return err == nil && response != nil && response.StatusCode < 500
}

// IsActive checks Breaker status and propagates it to the Node compound
func (ms *MeasuredStorage) IsActive() bool {
Review comment (Contributor):
I don't like that the method's name hides what it's actually doing; you have to look at the doc to know the whole truth. (A possible alternative is sketched after this function.)

isActive := !ms.Breaker.ShouldOpen()
ms.Node.SetActive(isActive)
return ms.Node.IsActive()
}
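One possible shape that would address this remark, sketched against the types shown in this diff. The method names updateActiveStatus and isActive are hypothetical; the PR itself keeps the combined IsActive.

```go
// updateActiveStatus makes the state change explicit: it asks the breaker
// and propagates the decision to the node.
func (ms *MeasuredStorage) updateActiveStatus() {
	ms.Node.SetActive(!ms.Breaker.ShouldOpen())
}

// isActive is then a side-effect-free read of the node state.
func (ms *MeasuredStorage) isActive() bool {
	return ms.Node.IsActive()
}
```

Callers that only need the cached state could use the read-only method, while the balancer loop decides when to refresh it via updateActiveStatus.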

func raportMetrics(rt http.RoundTripper, since time.Time, open bool) {
func reportMetrics(rt http.RoundTripper, since time.Time, open bool) {
if b, ok := rt.(*backend.Backend); ok {
prefix := fmt.Sprintf("reqs.backend.%s.balancer", b.Name)
metrics.UpdateSince(prefix+".duration", since)
23 changes: 14 additions & 9 deletions balancing/balance_breaker_test.go
@@ -107,27 +107,34 @@ func TestCallMeterConcurrency(t *testing.T) {
sampleDuration := time.Millisecond
waitGroup := &sync.WaitGroup{}
waitGroup.Add(numberOfSamples)
callMeter := newCallMeter(5*time.Second, 5*time.Second)
clockAdvance := time.Microsecond
timer := &mockTimer{
baseTime: time.Now(),
advanceDur: clockAdvance}
retention := 5 * time.Second
resolution := 5 * time.Second
callMeter := newCallMeterWithTimer(retention, resolution, timer.now)

for i := 0; i < numberOfSamples; i++ {
go func() {
callMeter.UpdateTimeSpent(sampleDuration)
timer.advance()
waitGroup.Done()
}()
}
waitGroup.Wait()
require.Equal(t, float64(numberOfSamples*int(sampleDuration)), callMeter.TimeSpent())
require.Equal(t, float64(numberOfSamples), callMeter.Calls())
require.Equal(t, float64(numberOfSamples*int(sampleDuration)), callMeter.TimeSpent(), "Time spent mismatch")
require.Equal(t, float64(numberOfSamples), callMeter.Calls(), "Number of calls mismatch")
}

func TestCallMeterRetention(t *testing.T) {
numberOfSamples := 100
timer := &mockTimer{
baseTime: time.Now(),
advanceDur: 100 * time.Millisecond}

callMeter := newCallMeter(5*time.Second, 1*time.Second)
callMeter.now = timer.now
callMeter.histogram.now = timer.now
retention := 5 * time.Second
resolution := time.Second
callMeter := newCallMeterWithTimer(retention, resolution, timer.now)

for i := 0; i < numberOfSamples; i++ {
callMeter.UpdateTimeSpent(time.Millisecond)
@@ -196,9 +203,7 @@ func TestCallMeterNoActivity(t *testing.T) {
require.Equal(t, expectedCalls, callMeter.Calls())
require.Equal(t, expectedTime, callMeter.TimeSpent())
timer.baseTime = timer.baseTime.Add(6*resolution + time.Second)
fmt.Println("Time shifted", timer.baseTime)
require.Equal(t, float64(0), callMeter.TimeSpent())
fmt.Println("Update time spent after delay")
callMeter.UpdateTimeSpent(timeSpent)
timer.baseTime = timer.baseTime.Add(time.Millisecond)
require.Equal(t, float64(timeSpent), callMeter.TimeSpent())
1 change: 0 additions & 1 deletion config/validator.go
@@ -67,7 +67,6 @@ func (c *YamlConfig) validateRegionCluster(policyName string, policies confregio
}

for _, policy := range policies.Shards {
println("dupa")
fmt.Printf("sharding policies %v\n", c.Shards)
_, exists := c.Shards[policy.ShardName]
if !exists {
62 changes: 43 additions & 19 deletions glide.lock

Some generated files are not rendered by default.

5 changes: 3 additions & 2 deletions glide.yaml
@@ -15,7 +15,6 @@ import:
- package: github.com/serialx/hashring
- package: github.com/sirupsen/logrus
version: ^1.0.3

- package: golang.org/x/sync
subpackages:
- syncmap
@@ -24,13 +23,15 @@ import:
- package: gopkg.in/tylerb/graceful.v1
version: ^1.2.15
- package: gopkg.in/yaml.v2
- package: github.com/bnogas/minio-go/
- package: github.com/bnogas/minio-go
version: verify_signature
subpackages:
- s3signer
- package: github.com/hashicorp/consul
subpackages:
- api
- package: github.com/jinzhu/gorm
version: ^1.9.2
testImport:
- package: github.com/DATA-DOG/go-sqlmock
version: ^1.2.0
2 changes: 1 addition & 1 deletion storages/shardclient.go
@@ -50,7 +50,7 @@ func (c *ShardClient) balancerRoundTrip(req *http.Request) (resp *http.Response,
for node := c.balancer.GetMostAvailable(notFoundNodes...); node != nil; node = c.balancer.GetMostAvailable(notFoundNodes...) {
log.Printf("Balancer roundTrip node loop %s %s", node.Name, reqID)
if node == nil {
return nil, fmt.Errorf("no avialable node")
return nil, fmt.Errorf("no available node")
}
resp, err = node.RoundTrip(req)
if (resp == nil && err != balancing.ErrNoActiveNodes) || resp.StatusCode == http.StatusNotFound {
23 changes: 0 additions & 23 deletions vendor/github.com/stretchr/testify/Godeps/Godeps.json

This file was deleted.

5 changes: 0 additions & 5 deletions vendor/github.com/stretchr/testify/Godeps/Readme

This file was deleted.
