Skip to content

Commit

Permalink
Bugfix Circuit Breaker half-open error handling (#1811)
Browse files Browse the repository at this point in the history
* bugfix cb half-open error handling

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* refactor logic

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* add fail function test

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* add ignores counter for internal state error

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* fix counter type from atomic to pure pointer

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* fix counter interface

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* make format

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>
Co-authored-by: Kiichiro YUKAWA <kyukawa315@gmail.com>
  • Loading branch information
hlts2 and vankichi committed Oct 19, 2022
1 parent 1b98802 commit e9d86ef
Show file tree
Hide file tree
Showing 4 changed files with 525 additions and 129 deletions.
33 changes: 15 additions & 18 deletions internal/circuitbreaker/breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
)

type breaker struct {
key string // breaker key for logging
count atomic.Value // type: *count
tripped int32 // tripped flag. when flag value is 1, breaker state is "Open" or "HalfOpen".
key string // breaker key for logging
count *count // type: *count
tripped int32 // tripped flag. when flag value is 1, breaker state is "Open" or "HalfOpen".

closedErrRate float32
closedErrShouldTrip Tripper
Expand All @@ -41,7 +41,8 @@ type breaker struct {

func newBreaker(key string, opts ...BreakerOption) (*breaker, error) {
b := &breaker{
key: key,
key: key,
count: &count{},
}
for _, opt := range append(defaultBreakerOpts, opts...) {
if err := opt(b); err != nil {
Expand All @@ -54,7 +55,6 @@ func newBreaker(key string, opts ...BreakerOption) (*breaker, error) {
log.Warn(oerr)
}
}
b.count.Store(&count{})

if b.closedErrShouldTrip == nil {
b.closedErrShouldTrip = NewRateTripper(b.closedErrRate, b.minSamples)
Expand All @@ -69,6 +69,7 @@ func newBreaker(key string, opts ...BreakerOption) (*breaker, error) {
// If the current breaker state is "Open", this function returns ErrCircuitBreakerOpenState.
func (b *breaker) do(ctx context.Context, fn func(ctx context.Context) (val interface{}, err error)) (val interface{}, st State, err error) {
if st, err := b.isReady(); err != nil {
b.count.onIgnore()
return nil, st, err
}
val, err = fn(ctx)
Expand Down Expand Up @@ -107,40 +108,36 @@ func (b *breaker) isReady() (st State, err error) {

// For flow control in the "Half-Open" state. It is limited to 50%.
// If this modulo is used, 1/2 of the requests will be error. And if an error occurs, mark as failures.
cnt := b.count.Load().(*count)
if cnt.Total()%2 == 0 {
cnt.onFail()
if b.count.Total()%2 == 0 {
return st, errors.ErrCircuitBreakerHalfOpenFlowLimitation
}
}
return st, nil
}

func (b *breaker) success() {
cnt := b.count.Load().(*count)
cnt.onSuccess()
b.count.onSuccess()

// halfOpenErrShouldTrip.ShouldTrip returns true when the sum of the number of successes and failures is greater than the b.minSamples and when the error rate is greater than the b.halfOpenErrRate.
// In other words, if the error rate is less than the b.halfOpenErrRate, it can be judged that the success rate is high, so this function change to the "Close" state from "Half-Open".
if st := b.currentState(); st == StateHalfOpen &&
cnt.Total() >= b.minSamples &&
!b.halfOpenErrShouldTrip.ShouldTrip(cnt) {
b.count.Successes()+b.count.Fails() >= b.minSamples &&
!b.halfOpenErrShouldTrip.ShouldTrip(b.count) {
log.Infof("the operation succeeded, circuit breaker state for '%s' changed,\tfrom: %s, to: %s", b.key, st.String(), StateClosed.String())
b.reset()
}
}

func (b *breaker) fail() {
cnt := b.count.Load().(*count)
cnt.onFail()
b.count.onFail()

var ok bool
var st State
switch st = b.currentState(); st {
case StateHalfOpen:
ok = b.halfOpenErrShouldTrip.ShouldTrip(cnt)
ok = b.halfOpenErrShouldTrip.ShouldTrip(b.count)
case StateClosed:
ok = b.closedErrShouldTrip.ShouldTrip(cnt)
ok = b.closedErrShouldTrip.ShouldTrip(b.count)
default:
return
}
Expand Down Expand Up @@ -172,14 +169,14 @@ func (b *breaker) reset() {
atomic.StoreInt32(&b.tripped, 0)
atomic.StoreInt64(&b.openExp, 0)
atomic.StoreInt64(&b.closedRefreshExp, time.Now().Add(b.cloedRefreshTimeout).UnixNano())
b.count.Load().(*count).reset()
b.count.reset()
}

func (b *breaker) trip() {
atomic.StoreInt32(&b.tripped, 1)
atomic.StoreInt64(&b.openExp, time.Now().Add(b.openTimeout).UnixNano())
atomic.StoreInt64(&b.closedRefreshExp, 0)
b.count.Load().(*count).reset()
b.count.reset()
}

func (b *breaker) isTripped() (ok bool) {
Expand Down
Loading

0 comments on commit e9d86ef

Please sign in to comment.