Skip to content

Commit

Permalink
Add default circuit breaker rule support (#2232)
Browse files Browse the repository at this point in the history
  • Loading branch information
wuwen5 authored Nov 8, 2022
1 parent 8de6cb9 commit 061371b
Show file tree
Hide file tree
Showing 6 changed files with 292 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package com.alibaba.csp.sentinel.slots.block.degrade;

import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.property.DynamicSentinelProperty;
import com.alibaba.csp.sentinel.property.PropertyListener;
import com.alibaba.csp.sentinel.property.SentinelProperty;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker;
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.ExceptionCircuitBreaker;
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.ResponseTimeCircuitBreaker;
import com.alibaba.csp.sentinel.util.StringUtil;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
* @author wuwen
*/
public class DefaultDegradeRuleManager {

public static final String DEFAULT_KEY = "*";

private static volatile Map<String, List<CircuitBreaker>> circuitBreakers = new ConcurrentHashMap<>();
private static volatile Set<DegradeRule> rules = new HashSet<>();

private static final DefaultDegradeRuleManager.RulePropertyListener LISTENER = new DefaultDegradeRuleManager.RulePropertyListener();
private static SentinelProperty<List<DegradeRule>> currentProperty
= new DynamicSentinelProperty<>();

static {
currentProperty.addListener(LISTENER);
}


static List<CircuitBreaker> getDefaultCircuitBreakers(String resourceName) {
List<CircuitBreaker> circuitBreakers = DefaultDegradeRuleManager.circuitBreakers.get(resourceName);
if (circuitBreakers == null && !rules.isEmpty()) {
return DefaultDegradeRuleManager.circuitBreakers.computeIfAbsent(resourceName,
r -> rules.stream().map(DefaultDegradeRuleManager::newCircuitBreakerFrom).collect(Collectors.toList()));
}
return circuitBreakers;
}

/**
* Load {@link DegradeRule}s, former rules will be replaced.
*
* @param rules new rules to load.
*/
public static void loadRules(List<DegradeRule> rules) {
try {
currentProperty.updateValue(rules);
} catch (Throwable e) {
RecordLog.error("[DefaultDegradeRuleManager] Unexpected error when loading degrade rules", e);
}
}

public static boolean isValidDefaultRule(DegradeRule rule) {
if (!DegradeRuleManager.isValidRule(rule)) {
return false;
}

return rule.getResource().equals(DEFAULT_KEY);
}

/**
* Create a circuit breaker instance from provided circuit breaking rule.
*
* @param rule a valid circuit breaking rule
* @return new circuit breaker based on provided rule; null if rule is invalid or unsupported type
*/
private static CircuitBreaker newCircuitBreakerFrom(/*@Valid*/ DegradeRule rule) {
switch (rule.getGrade()) {
case RuleConstant.DEGRADE_GRADE_RT:
return new ResponseTimeCircuitBreaker(rule);
case RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO:
case RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT:
return new ExceptionCircuitBreaker(rule);
default:
return null;
}
}


private static class RulePropertyListener implements PropertyListener<List<DegradeRule>> {

private synchronized void reloadFrom(List<DegradeRule> list) {

if (list == null) {
return;
}

Set<DegradeRule> rules = new HashSet<>();
List<CircuitBreaker> cbs = new ArrayList<>();

for (DegradeRule rule : list) {
if (!isValidDefaultRule(rule)) {
RecordLog.warn("[DefaultDegradeRuleManager] Ignoring invalid rule when loading new rules: {}", rule);
} else {

if (StringUtil.isBlank(rule.getLimitApp())) {
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}
CircuitBreaker cb = newCircuitBreakerFrom(rule);
cbs.add(cb);
rules.add(rule);
}
}

Map<String, List<CircuitBreaker>> cbMap = new ConcurrentHashMap<>(8);

DefaultDegradeRuleManager.circuitBreakers.forEach((k, v) -> cbMap.put(k, cbs));

DefaultDegradeRuleManager.rules = rules;
DefaultDegradeRuleManager.circuitBreakers = cbMap;
}

@Override
public void configUpdate(List<DegradeRule> conf) {
reloadFrom(conf);
RecordLog.info("[DefaultDegradeRuleManager] Degrade rules has been updated to: {}", rules);
}

@Override
public void configLoad(List<DegradeRule> conf) {
reloadFrom(conf);
RecordLog.info("[DefaultDegradeRuleManager] Degrade rules loaded: {}", rules);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.alibaba.csp.sentinel.slots.block.degrade;

import com.alibaba.csp.sentinel.Constants;
import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.node.DefaultNode;
import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker;
import com.alibaba.csp.sentinel.spi.Spi;

import java.util.List;

/**
* @author wuwen
*/
@Spi(order = Constants.ORDER_DEGRADE_SLOT + 100)
public class DefaultDegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
performChecking(context, resourceWrapper);

fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

private void performChecking(Context context, ResourceWrapper r) throws BlockException {

if (DegradeRuleManager.hasConfig(r.getName())) {
return;
}

List<CircuitBreaker> circuitBreakers = DefaultDegradeRuleManager.getDefaultCircuitBreakers(r.getName());

if (circuitBreakers == null || circuitBreakers.isEmpty()) {
return;
}

for (CircuitBreaker cb : circuitBreakers) {
if (!cb.tryPass(context)) {
throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
}
}
}

@Override
public void exit(Context context, ResourceWrapper r, int count, Object... args) {
Entry curEntry = context.getCurEntry();
if (curEntry.getBlockError() != null) {
fireExit(context, r, count, args);
return;
}

if (DegradeRuleManager.hasConfig(r.getName())) {
fireExit(context, r, count, args);
return;
}

List<CircuitBreaker> circuitBreakers = DefaultDegradeRuleManager.getDefaultCircuitBreakers(r.getName());

if (circuitBreakers == null || circuitBreakers.isEmpty()) {
return;
}

if (curEntry.getBlockError() == null) {
// passed request
for (CircuitBreaker circuitBreaker : circuitBreakers) {
circuitBreaker.onRequestComplete(context);
}
}

fireExit(context, r, count, args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
com.alibaba.csp.sentinel.slots.block.degrade.DefaultDegradeSlot
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot;
import com.alibaba.csp.sentinel.slotchain.ProcessorSlotChain;
import com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot;
import com.alibaba.csp.sentinel.slots.block.degrade.DefaultDegradeSlot;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot;
import com.alibaba.csp.sentinel.slots.block.flow.FlowSlot;
import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot;
Expand Down Expand Up @@ -70,6 +71,9 @@ public void testBuild() {
next = next.getNext();
assertTrue(next instanceof DegradeSlot);

next = next.getNext();
assertTrue(next instanceof DefaultDegradeSlot);

next = next.getNext();
assertNull(next);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,17 @@ public class CircuitBreakingIntegrationTest extends AbstractTimeBasedTest {

@Before
public void setUp() {
DegradeRuleManager.loadRules(new ArrayList<DegradeRule>());
DegradeRuleManager.loadRules(new ArrayList<>());
}

@After
public void tearDown() throws Exception {
DegradeRuleManager.loadRules(new ArrayList<DegradeRule>());
public void tearDown() {
DegradeRuleManager.loadRules(new ArrayList<>());
DefaultDegradeRuleManager.loadRules(new ArrayList<>());
}

@Test
public void testSlowRequestMode() throws Exception {
public void testSlowRequestMode() {
CircuitBreakerStateChangeObserver observer = mock(CircuitBreakerStateChangeObserver.class);
setCurrentMillis(System.currentTimeMillis() / 1000 * 1000);
int retryTimeoutSec = 5;
Expand Down Expand Up @@ -115,7 +116,69 @@ public void testSlowRequestMode() throws Exception {
}

@Test
public void testExceptionRatioMode() throws Exception {
public void testSlowRequestModeUseDefaultRule() {
CircuitBreakerStateChangeObserver observer = mock(CircuitBreakerStateChangeObserver.class);
setCurrentMillis(System.currentTimeMillis() / 1000 * 1000);
int retryTimeoutSec = 5;
int maxRt = 50;
int statIntervalMs = 20000;
int minRequestAmount = 10;
String res = "CircuitBreakingIntegrationTest_testSlowRequestModeUseDefaultRule";
EventObserverRegistry.getInstance().addStateChangeObserver(res, observer);

DefaultDegradeRuleManager.loadRules(Arrays.asList(
new DegradeRule(DefaultDegradeRuleManager.DEFAULT_KEY).setTimeWindow(retryTimeoutSec).setCount(maxRt)
.setStatIntervalMs(statIntervalMs).setMinRequestAmount(minRequestAmount)
.setSlowRatioThreshold(0.8d).setGrade(0)));

// Try first N requests where N = minRequestAmount.
for (int i = 0; i < minRequestAmount; i++) {
if (i < 7) {
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));
} else {
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(-20, -10)));
}
}

// Till now slow ratio should be 70%.
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));
// Circuit breaker has transformed to OPEN since here.
verify(observer)
.onStateChange(eq(State.CLOSED), eq(State.OPEN), any(DegradeRule.class), anyDouble());
assertEquals(State.OPEN, DefaultDegradeRuleManager.getDefaultCircuitBreakers(res).get(0).currentState());
assertFalse(entryAndSleepFor(res, 1));

sleepSecond(1);
assertFalse(entryAndSleepFor(res, 1));
sleepSecond(retryTimeoutSec);
// Test HALF-OPEN to OPEN.
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));

verify(observer)
.onStateChange(eq(State.OPEN), eq(State.HALF_OPEN), any(DegradeRule.class), nullable(Double.class));
verify(observer)
.onStateChange(eq(State.HALF_OPEN), eq(State.OPEN), any(DegradeRule.class), anyDouble());
// Wait for next retry timeout;
reset(observer);
sleepSecond(retryTimeoutSec + 1);
assertTrue(entryAndSleepFor(res, maxRt - ThreadLocalRandom.current().nextInt(10, 20)));
verify(observer)
.onStateChange(eq(State.OPEN), eq(State.HALF_OPEN), any(DegradeRule.class), nullable(Double.class));
verify(observer)
.onStateChange(eq(State.HALF_OPEN), eq(State.CLOSED), any(DegradeRule.class), nullable(Double.class));
// Now circuit breaker has been closed.
assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20)));

EventObserverRegistry.getInstance().removeStateChangeObserver(res);
}

@Test
public void testExceptionRatioMode() {
CircuitBreakerStateChangeObserver observer = mock(CircuitBreakerStateChangeObserver.class);
setCurrentMillis(System.currentTimeMillis() / 1000 * 1000);
int retryTimeoutSec = 5;
Expand Down Expand Up @@ -169,7 +232,7 @@ public void testExceptionRatioMode() throws Exception {
}

@Test
public void testExceptionCountMode() throws Throwable {
public void testExceptionCountMode() {
// TODO
}

Expand All @@ -188,7 +251,7 @@ private void verifyState(List<CircuitBreaker> breakers, int target) {
}

@Test
public void testMultipleHalfOpenedBreakers() throws Exception {
public void testMultipleHalfOpenedBreakers() {
CircuitBreakerStateChangeObserver observer = mock(CircuitBreakerStateChangeObserver.class);
setCurrentMillis(System.currentTimeMillis() / 1000 * 1000);
int retryTimeoutSec = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.alibaba.csp.sentinel.slotchain.SlotChainBuilder;
import com.alibaba.csp.sentinel.slots.DefaultSlotChainBuilder;
import com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot;
import com.alibaba.csp.sentinel.slots.block.degrade.DefaultDegradeSlot;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot;
import com.alibaba.csp.sentinel.slots.block.flow.FlowSlot;
import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot;
Expand Down Expand Up @@ -98,13 +99,14 @@ public void testLoadInstanceList() {
prototypeSlotClasses.add(NodeSelectorSlot.class);
prototypeSlotClasses.add(ClusterBuilderSlot.class);

List<Class<? extends ProcessorSlot>> singletonSlotClasses = new ArrayList<>(6);
List<Class<? extends ProcessorSlot>> singletonSlotClasses = new ArrayList<>(7);
singletonSlotClasses.add(LogSlot.class);
singletonSlotClasses.add(StatisticSlot.class);
singletonSlotClasses.add(AuthoritySlot.class);
singletonSlotClasses.add(SystemSlot.class);
singletonSlotClasses.add(FlowSlot.class);
singletonSlotClasses.add(DegradeSlot.class);
singletonSlotClasses.add(DefaultDegradeSlot.class);

for (int i = 0; i < slots1.size(); i++) {
ProcessorSlot slot1 = slots1.get(i);
Expand Down Expand Up @@ -148,7 +150,7 @@ public void testLoadInstanceListSorted() {
assertNotNull(sortedSlots);

// Total 8 default slot in sentinel-core
assertEquals(8, sortedSlots.size());
assertEquals(9, sortedSlots.size());

// Verify the order of slot
int index = 0;
Expand All @@ -160,6 +162,7 @@ public void testLoadInstanceListSorted() {
assertTrue(sortedSlots.get(index++) instanceof SystemSlot);
assertTrue(sortedSlots.get(index++) instanceof FlowSlot);
assertTrue(sortedSlots.get(index++) instanceof DegradeSlot);
assertTrue(sortedSlots.get(index++) instanceof DefaultDegradeSlot);
}

@Test
Expand All @@ -177,7 +180,7 @@ public void testLoadLowestPriorityInstance() {
assertNotNull(slot);

// NodeSelectorSlot is lowest order priority with @Spi(order = -1000) among all slots
assertTrue(slot instanceof DegradeSlot);
assertTrue(slot instanceof DefaultDegradeSlot);
}

@Test
Expand Down

0 comments on commit 061371b

Please sign in to comment.