Skip to content

Commit

Permalink
Fix the bug of circuit breaker half-open state transformation when re…
Browse files Browse the repository at this point in the history
…quest is blocked by upcoming rules (alibaba#1645)

* Refactor the workflow to fix the bug that circuit breaker may remain half-open state forever when the request is blocked by upcoming rules: revert the state change in exit handler (as a temporary workaround)
* Add exit handler in Entry as a per-invocation hook.
  • Loading branch information
jasonjoo2010 authored and mastertiller committed Aug 20, 2020
1 parent 3468ad7 commit c59377b
Show file tree
Hide file tree
Showing 12 changed files with 297 additions and 153 deletions.
29 changes: 29 additions & 0 deletions sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@
*/
package com.alibaba.csp.sentinel;

import java.util.Iterator;
import java.util.LinkedList;

import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.context.NullContext;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.node.Node;
import com.alibaba.csp.sentinel.slotchain.ProcessorSlot;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.util.function.BiConsumer;

/**
* Linked entry within current context.
Expand All @@ -35,6 +40,8 @@ class CtEntry extends Entry {

protected ProcessorSlot<Object> chain;
protected Context context;
protected LinkedList<BiConsumer<Context, Entry>> exitHandlers;


CtEntry(ResourceWrapper resourceWrapper, ProcessorSlot<Object> chain, Context context) {
super(resourceWrapper);
Expand Down Expand Up @@ -102,10 +109,32 @@ protected void exitForContext(Context context, int count, Object... args) throws
protected void clearEntryContext() {
this.context = null;
}

@Override
public void whenComplete(BiConsumer<Context, Entry> consumer) {
if (this.exitHandlers == null) {
this.exitHandlers = new LinkedList<>();
}
this.exitHandlers.add(consumer);
}

@Override
protected Entry trueExit(int count, Object... args) throws ErrorEntryFreeException {
exitForContext(context, count, args);

if (this.exitHandlers != null) {
Iterator<BiConsumer<Context, Entry>> it = this.exitHandlers.iterator();
BiConsumer<Context, Entry> cur;
while (it.hasNext()) {
cur = it.next();
try {
cur.accept(this.context, this);
} catch (Exception e) {
RecordLog.warn("Error invoking exit handler", e);
}
}
this.exitHandlers = null;
}

return parent;
}
Expand Down
10 changes: 10 additions & 0 deletions sentinel-core/src/main/java/com/alibaba/csp/sentinel/Entry.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.util.TimeUtil;
import com.alibaba.csp.sentinel.util.function.BiConsumer;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.node.Node;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
Expand Down Expand Up @@ -178,4 +179,13 @@ public void setOriginNode(Node originNode) {
this.originNode = originNode;
}

/**
* Like `CompletableFuture` since JDK8 it guarantees specified consumer
* is invoked when this entry exited.
* Use it when you did some STATEFUL operations on entries.
*
* @param consumer
*/
public abstract void whenComplete(BiConsumer<Context, Entry> consumer);

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker;
import com.alibaba.csp.sentinel.spi.SpiOrder;
import com.alibaba.csp.sentinel.util.TimeUtil;

/**
* A {@link ProcessorSlot} dedicates to circuit breaking.
Expand All @@ -40,18 +39,18 @@ public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
performChecking(resourceWrapper);
performChecking(context, resourceWrapper);

fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

void performChecking(ResourceWrapper r) throws BlockException {
void performChecking(Context context, ResourceWrapper r) throws BlockException {
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
return;
}
for (CircuitBreaker cb : circuitBreakers) {
if (!cb.tryPass()) {
if (!cb.tryPass(context, r)) {
throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
}
}
Expand All @@ -71,14 +70,9 @@ public void exit(Context context, ResourceWrapper r, int count, Object... args)
}

if (curEntry.getBlockError() == null) {
long completeTime = curEntry.getCompleteTimestamp();
if (completeTime <= 0) {
completeTime = TimeUtil.currentTimeMillis();
}
long rt = completeTime - curEntry.getCreateTimestamp();
Throwable error = curEntry.getError();
// passed request
for (CircuitBreaker circuitBreaker : circuitBreakers) {
circuitBreaker.onRequestComplete(rt, error);
circuitBreaker.onRequestComplete(context, r);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

import java.util.concurrent.atomic.AtomicReference;

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;
import com.alibaba.csp.sentinel.util.AssertUtil;
import com.alibaba.csp.sentinel.util.TimeUtil;
import com.alibaba.csp.sentinel.util.function.BiConsumer;

/**
* @author Eric Zhao
Expand Down Expand Up @@ -61,14 +65,14 @@ public State currentState() {
}

@Override
public boolean tryPass() {
public boolean tryPass(Context context, ResourceWrapper r) {
// Template implementation.
if (currentState.get() == State.CLOSED) {
return true;
}
if (currentState.get() == State.OPEN) {
// For half-open state we allow a request for trial.
return retryTimeoutArrived() && fromOpenToHalfOpen();
return retryTimeoutArrived() && fromOpenToHalfOpen(context);
}
return false;
}
Expand All @@ -91,30 +95,44 @@ protected boolean fromCloseToOpen(double snapshotValue) {
if (currentState.compareAndSet(prev, State.OPEN)) {
updateNextRetryTimestamp();

for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
observer.onStateChange(prev, State.OPEN, rule, snapshotValue);
}
notifyObservers(prev, State.OPEN, snapshotValue);
return true;
}
return false;
}

protected boolean fromOpenToHalfOpen() {
protected boolean fromOpenToHalfOpen(Context context) {
if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
observer.onStateChange(State.OPEN, State.HALF_OPEN, rule, null);
}
notifyObservers(State.OPEN, State.HALF_OPEN, null);
Entry entry = context.getCurEntry();
entry.whenComplete(new BiConsumer<Context, Entry>() {

@Override
public void accept(Context context, Entry entry) {
if (entry.getBlockError() != null) {
// Fallback to OPEN due to detecting request is blocked
currentState.compareAndSet(State.HALF_OPEN, State.OPEN);
notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);
return;
}

}
});
return true;
}
return false;
}

private void notifyObservers(CircuitBreaker.State prevState, CircuitBreaker.State newState, Double snapshotValue) {
for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
observer.onStateChange(prevState, newState, rule, snapshotValue);
}
}

protected boolean fromHalfOpenToOpen(double snapshotValue) {
if (currentState.compareAndSet(State.HALF_OPEN, State.OPEN)) {
updateNextRetryTimestamp();
for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
observer.onStateChange(State.HALF_OPEN, State.OPEN, rule, snapshotValue);
}
notifyObservers(State.HALF_OPEN, State.OPEN, snapshotValue);
return true;
}
return false;
Expand All @@ -123,9 +141,7 @@ protected boolean fromHalfOpenToOpen(double snapshotValue) {
protected boolean fromHalfOpenToClose() {
if (currentState.compareAndSet(State.HALF_OPEN, State.CLOSED)) {
resetStat();
for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
observer.onStateChange(State.HALF_OPEN, State.CLOSED, rule, null);
}
notifyObservers(State.HALF_OPEN, State.CLOSED, null);
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;

import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;

/**
Expand All @@ -32,11 +34,13 @@ public interface CircuitBreaker {
DegradeRule getRule();

/**
* Acquires permission of an invocation only if it is available at the time of invocation.
* Acquires permission of an invocation only if it is available at the time of invoking.
*
* @param context
* @param r
* @return {@code true} if permission was acquired and {@code false} otherwise
*/
boolean tryPass();
boolean tryPass(Context context, ResourceWrapper r);

/**
* Get current state of the circuit breaker.
Expand All @@ -46,13 +50,12 @@ public interface CircuitBreaker {
State currentState();

/**
* Record a completed request with the given response time and error (if present) and
* handle state transformation of the circuit breaker.
* Called when a `passed` invocation finished.
*
* @param rt the response time of this entry
* @param error the error of this entry (if present)
* @param context context of current invocation
* @param wrapper current resource
*/
void onRequestComplete(long rt, Throwable error);
void onRequestComplete(Context context, ResourceWrapper wrapper);

/**
* Circuit breaker state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

import java.util.List;

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray;
import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder;
Expand Down Expand Up @@ -60,7 +63,12 @@ protected void resetStat() {
}

@Override
public void onRequestComplete(long rt, Throwable error) {
public void onRequestComplete(Context context, ResourceWrapper r) {
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
Throwable error = entry.getError();
SimpleErrorCounter counter = stat.currentWindow().value();
if (error != null) {
counter.getErrorCount().add(1);
Expand All @@ -74,14 +82,17 @@ private void handleStateChangeWhenThresholdExceeded(Throwable error) {
if (currentState.get() == State.OPEN) {
return;
}

if (currentState.get() == State.HALF_OPEN) {
// In detecting request
if (error == null) {
fromHalfOpenToClose();
} else {
fromHalfOpenToOpen(1.0d);
}
return;
}

List<SimpleErrorCounter> counters = stat.values();
long errCount = 0;
long totalCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@

import java.util.List;

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray;
import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder;
import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap;
import com.alibaba.csp.sentinel.util.AssertUtil;
import com.alibaba.csp.sentinel.util.TimeUtil;

/**
* @author Eric Zhao
Expand Down Expand Up @@ -57,8 +61,17 @@ public void resetStat() {
}

@Override
public void onRequestComplete(long rt, Throwable error) {
public void onRequestComplete(Context context, ResourceWrapper wrapper) {
SlowRequestCounter counter = slidingCounter.currentWindow().value();
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
long completeTime = entry.getCompleteTimestamp();
if (completeTime <= 0) {
completeTime = TimeUtil.currentTimeMillis();
}
long rt = completeTime - entry.getCreateTimestamp();
if (rt > maxAllowedRt) {
counter.slowCount.add(1);
}
Expand All @@ -71,7 +84,9 @@ private void handleStateChangeWhenThresholdExceeded(long rt) {
if (currentState.get() == State.OPEN) {
return;
}

if (currentState.get() == State.HALF_OPEN) {
// In detecting request
// TODO: improve logic for half-open recovery
if (rt > maxAllowedRt) {
fromHalfOpenToOpen(1.0d);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.util.function;

/**
* BiConsumer interface from JDK 8.
*/
public interface BiConsumer<T, U> {

void accept(T t, U u);
}
Loading

0 comments on commit c59377b

Please sign in to comment.