Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix Circuit Breaker half-open error handling #1811

Merged
merged 8 commits into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 15 additions & 18 deletions internal/circuitbreaker/breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
)

type breaker struct {
key string // breaker key for logging
count atomic.Value // type: *count
tripped int32 // tripped flag. when flag value is 1, breaker state is "Open" or "HalfOpen".
key string // breaker key for logging
count *count // type: *count
tripped int32 // tripped flag. when flag value is 1, breaker state is "Open" or "HalfOpen".

closedErrRate float32
closedErrShouldTrip Tripper
Expand All @@ -41,7 +41,8 @@ type breaker struct {

func newBreaker(key string, opts ...BreakerOption) (*breaker, error) {
b := &breaker{
key: key,
key: key,
count: &count{},
hlts2 marked this conversation as resolved.
Show resolved Hide resolved
}
for _, opt := range append(defaultBreakerOpts, opts...) {
if err := opt(b); err != nil {
Expand All @@ -54,7 +55,6 @@ func newBreaker(key string, opts ...BreakerOption) (*breaker, error) {
log.Warn(oerr)
}
}
b.count.Store(&count{})

if b.closedErrShouldTrip == nil {
b.closedErrShouldTrip = NewRateTripper(b.closedErrRate, b.minSamples)
Expand All @@ -69,6 +69,7 @@ func newBreaker(key string, opts ...BreakerOption) (*breaker, error) {
// If the current breaker state is "Open", this function returns ErrCircuitBreakerOpenState.
func (b *breaker) do(ctx context.Context, fn func(ctx context.Context) (val interface{}, err error)) (val interface{}, st State, err error) {
if st, err := b.isReady(); err != nil {
b.count.onIgnore()
return nil, st, err
}
val, err = fn(ctx)
Expand Down Expand Up @@ -107,40 +108,36 @@ func (b *breaker) isReady() (st State, err error) {

// For flow control in the "Half-Open" state. It is limited to 50%.
// If this modulo is used, 1/2 of the requests will be error. And if an error occurs, mark as failures.
cnt := b.count.Load().(*count)
if cnt.Total()%2 == 0 {
cnt.onFail()
if b.count.Total()%2 == 0 {
return st, errors.ErrCircuitBreakerHalfOpenFlowLimitation
}
}
return st, nil
}

func (b *breaker) success() {
cnt := b.count.Load().(*count)
cnt.onSuccess()
b.count.onSuccess()

// halfOpenErrShouldTrip.ShouldTrip returns true when the sum of the number of successes and failures is greater than the b.minSamples and when the error rate is greater than the b.halfOpenErrRate.
// In other words, if the error rate is less than the b.halfOpenErrRate, it can be judged that the success rate is high, so this function change to the "Close" state from "Half-Open".
if st := b.currentState(); st == StateHalfOpen &&
cnt.Total() >= b.minSamples &&
!b.halfOpenErrShouldTrip.ShouldTrip(cnt) {
b.count.Successes()+b.count.Fails() >= b.minSamples &&
!b.halfOpenErrShouldTrip.ShouldTrip(b.count) {
log.Infof("the operation succeeded, circuit breaker state for '%s' changed,\tfrom: %s, to: %s", b.key, st.String(), StateClosed.String())
b.reset()
}
}

func (b *breaker) fail() {
cnt := b.count.Load().(*count)
cnt.onFail()
b.count.onFail()

var ok bool
var st State
switch st = b.currentState(); st {
case StateHalfOpen:
ok = b.halfOpenErrShouldTrip.ShouldTrip(cnt)
ok = b.halfOpenErrShouldTrip.ShouldTrip(b.count)
case StateClosed:
ok = b.closedErrShouldTrip.ShouldTrip(cnt)
ok = b.closedErrShouldTrip.ShouldTrip(b.count)
default:
return
}
Expand Down Expand Up @@ -172,14 +169,14 @@ func (b *breaker) reset() {
atomic.StoreInt32(&b.tripped, 0)
atomic.StoreInt64(&b.openExp, 0)
atomic.StoreInt64(&b.closedRefreshExp, time.Now().Add(b.cloedRefreshTimeout).UnixNano())
b.count.Load().(*count).reset()
b.count.reset()
}

func (b *breaker) trip() {
atomic.StoreInt32(&b.tripped, 1)
atomic.StoreInt64(&b.openExp, time.Now().Add(b.openTimeout).UnixNano())
atomic.StoreInt64(&b.closedRefreshExp, 0)
b.count.Load().(*count).reset()
b.count.reset()
}

func (b *breaker) isTripped() (ok bool) {
Expand Down
Loading