diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DefaultDegradeRuleManager.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DefaultDegradeRuleManager.java new file mode 100644 index 0000000000..06214c3d8d --- /dev/null +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DefaultDegradeRuleManager.java @@ -0,0 +1,134 @@ +package com.alibaba.csp.sentinel.slots.block.degrade; + +import com.alibaba.csp.sentinel.log.RecordLog; +import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; +import com.alibaba.csp.sentinel.property.PropertyListener; +import com.alibaba.csp.sentinel.property.SentinelProperty; +import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker; +import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.ExceptionCircuitBreaker; +import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.ResponseTimeCircuitBreaker; +import com.alibaba.csp.sentinel.util.StringUtil; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * @author wuwen + */ +public class DefaultDegradeRuleManager { + + public static final String DEFAULT_KEY = "*"; + + private static volatile Map> circuitBreakers = new ConcurrentHashMap<>(); + private static volatile Set rules = new HashSet<>(); + + private static final DefaultDegradeRuleManager.RulePropertyListener LISTENER = new DefaultDegradeRuleManager.RulePropertyListener(); + private static SentinelProperty> currentProperty + = new DynamicSentinelProperty<>(); + + static { + currentProperty.addListener(LISTENER); + } + + + static List getDefaultCircuitBreakers(String resourceName) { + List circuitBreakers = DefaultDegradeRuleManager.circuitBreakers.get(resourceName); + if (circuitBreakers == null && !rules.isEmpty()) { + return DefaultDegradeRuleManager.circuitBreakers.computeIfAbsent(resourceName, + r -> rules.stream().map(DefaultDegradeRuleManager::newCircuitBreakerFrom).collect(Collectors.toList())); + } + return circuitBreakers; + } + + /** + * Load {@link DegradeRule}s, former rules will be replaced. + * + * @param rules new rules to load. + */ + public static void loadRules(List rules) { + try { + currentProperty.updateValue(rules); + } catch (Throwable e) { + RecordLog.error("[DefaultDegradeRuleManager] Unexpected error when loading degrade rules", e); + } + } + + public static boolean isValidDefaultRule(DegradeRule rule) { + if (!DegradeRuleManager.isValidRule(rule)) { + return false; + } + + return rule.getResource().equals(DEFAULT_KEY); + } + + /** + * Create a circuit breaker instance from provided circuit breaking rule. + * + * @param rule a valid circuit breaking rule + * @return new circuit breaker based on provided rule; null if rule is invalid or unsupported type + */ + private static CircuitBreaker newCircuitBreakerFrom(/*@Valid*/ DegradeRule rule) { + switch (rule.getGrade()) { + case RuleConstant.DEGRADE_GRADE_RT: + return new ResponseTimeCircuitBreaker(rule); + case RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO: + case RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT: + return new ExceptionCircuitBreaker(rule); + default: + return null; + } + } + + + private static class RulePropertyListener implements PropertyListener> { + + private synchronized void reloadFrom(List list) { + + if (list == null) { + return; + } + + Set rules = new HashSet<>(); + List cbs = new ArrayList<>(); + + for (DegradeRule rule : list) { + if (!isValidDefaultRule(rule)) { + RecordLog.warn("[DefaultDegradeRuleManager] Ignoring invalid rule when loading new rules: {}", rule); + } else { + + if (StringUtil.isBlank(rule.getLimitApp())) { + rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT); + } + CircuitBreaker cb = newCircuitBreakerFrom(rule); + cbs.add(cb); + rules.add(rule); + } + } + + Map> cbMap = new ConcurrentHashMap<>(8); + + DefaultDegradeRuleManager.circuitBreakers.forEach((k, v) -> cbMap.put(k, cbs)); + + DefaultDegradeRuleManager.rules = rules; + DefaultDegradeRuleManager.circuitBreakers = cbMap; + } + + @Override + public void configUpdate(List conf) { + reloadFrom(conf); + RecordLog.info("[DefaultDegradeRuleManager] Degrade rules has been updated to: {}", rules); + } + + @Override + public void configLoad(List conf) { + reloadFrom(conf); + RecordLog.info("[DefaultDegradeRuleManager] Degrade rules loaded: {}", rules); + } + } +} diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DefaultDegradeSlot.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DefaultDegradeSlot.java new file mode 100644 index 0000000000..4d2c97fc9a --- /dev/null +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DefaultDegradeSlot.java @@ -0,0 +1,76 @@ +package com.alibaba.csp.sentinel.slots.block.degrade; + +import com.alibaba.csp.sentinel.Constants; +import com.alibaba.csp.sentinel.Entry; +import com.alibaba.csp.sentinel.context.Context; +import com.alibaba.csp.sentinel.node.DefaultNode; +import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot; +import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; +import com.alibaba.csp.sentinel.slots.block.BlockException; +import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker; +import com.alibaba.csp.sentinel.spi.Spi; + +import java.util.List; + +/** + * @author wuwen + */ +@Spi(order = Constants.ORDER_DEGRADE_SLOT + 100) +public class DefaultDegradeSlot extends AbstractLinkedProcessorSlot { + + @Override + public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, + boolean prioritized, Object... args) throws Throwable { + performChecking(context, resourceWrapper); + + fireEntry(context, resourceWrapper, node, count, prioritized, args); + } + + private void performChecking(Context context, ResourceWrapper r) throws BlockException { + + if (DegradeRuleManager.hasConfig(r.getName())) { + return; + } + + List circuitBreakers = DefaultDegradeRuleManager.getDefaultCircuitBreakers(r.getName()); + + if (circuitBreakers == null || circuitBreakers.isEmpty()) { + return; + } + + for (CircuitBreaker cb : circuitBreakers) { + if (!cb.tryPass(context)) { + throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule()); + } + } + } + + @Override + public void exit(Context context, ResourceWrapper r, int count, Object... args) { + Entry curEntry = context.getCurEntry(); + if (curEntry.getBlockError() != null) { + fireExit(context, r, count, args); + return; + } + + if (DegradeRuleManager.hasConfig(r.getName())) { + fireExit(context, r, count, args); + return; + } + + List circuitBreakers = DefaultDegradeRuleManager.getDefaultCircuitBreakers(r.getName()); + + if (circuitBreakers == null || circuitBreakers.isEmpty()) { + return; + } + + if (curEntry.getBlockError() == null) { + // passed request + for (CircuitBreaker circuitBreaker : circuitBreakers) { + circuitBreaker.onRequestComplete(context); + } + } + + fireExit(context, r, count, args); + } +} diff --git a/sentinel-core/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot b/sentinel-core/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot index afd9777162..2e887a9350 100644 --- a/sentinel-core/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot +++ b/sentinel-core/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot @@ -6,4 +6,5 @@ com.alibaba.csp.sentinel.slots.statistic.StatisticSlot com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot com.alibaba.csp.sentinel.slots.system.SystemSlot com.alibaba.csp.sentinel.slots.block.flow.FlowSlot -com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot \ No newline at end of file +com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot +com.alibaba.csp.sentinel.slots.block.degrade.DefaultDegradeSlot \ No newline at end of file diff --git a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/DefaultSlotChainBuilderTest.java b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/DefaultSlotChainBuilderTest.java index ecdea6dad1..489843f19f 100644 --- a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/DefaultSlotChainBuilderTest.java +++ b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/DefaultSlotChainBuilderTest.java @@ -18,6 +18,7 @@ import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot; import com.alibaba.csp.sentinel.slotchain.ProcessorSlotChain; import com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot; +import com.alibaba.csp.sentinel.slots.block.degrade.DefaultDegradeSlot; import com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot; import com.alibaba.csp.sentinel.slots.block.flow.FlowSlot; import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot; @@ -70,6 +71,9 @@ public void testBuild() { next = next.getNext(); assertTrue(next instanceof DegradeSlot); + next = next.getNext(); + assertTrue(next instanceof DefaultDegradeSlot); + next = next.getNext(); assertNull(next); diff --git a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/degrade/CircuitBreakingIntegrationTest.java b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/degrade/CircuitBreakingIntegrationTest.java index eb9dac797c..0716f6e5c3 100755 --- a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/degrade/CircuitBreakingIntegrationTest.java +++ b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/degrade/CircuitBreakingIntegrationTest.java @@ -44,16 +44,17 @@ public class CircuitBreakingIntegrationTest extends AbstractTimeBasedTest { @Before public void setUp() { - DegradeRuleManager.loadRules(new ArrayList()); + DegradeRuleManager.loadRules(new ArrayList<>()); } @After - public void tearDown() throws Exception { - DegradeRuleManager.loadRules(new ArrayList()); + public void tearDown() { + DegradeRuleManager.loadRules(new ArrayList<>()); + DefaultDegradeRuleManager.loadRules(new ArrayList<>()); } @Test - public void testSlowRequestMode() throws Exception { + public void testSlowRequestMode() { CircuitBreakerStateChangeObserver observer = mock(CircuitBreakerStateChangeObserver.class); setCurrentMillis(System.currentTimeMillis() / 1000 * 1000); int retryTimeoutSec = 5; @@ -115,7 +116,69 @@ public void testSlowRequestMode() throws Exception { } @Test - public void testExceptionRatioMode() throws Exception { + public void testSlowRequestModeUseDefaultRule() { + CircuitBreakerStateChangeObserver observer = mock(CircuitBreakerStateChangeObserver.class); + setCurrentMillis(System.currentTimeMillis() / 1000 * 1000); + int retryTimeoutSec = 5; + int maxRt = 50; + int statIntervalMs = 20000; + int minRequestAmount = 10; + String res = "CircuitBreakingIntegrationTest_testSlowRequestModeUseDefaultRule"; + EventObserverRegistry.getInstance().addStateChangeObserver(res, observer); + + DefaultDegradeRuleManager.loadRules(Arrays.asList( + new DegradeRule(DefaultDegradeRuleManager.DEFAULT_KEY).setTimeWindow(retryTimeoutSec).setCount(maxRt) + .setStatIntervalMs(statIntervalMs).setMinRequestAmount(minRequestAmount) + .setSlowRatioThreshold(0.8d).setGrade(0))); + + // Try first N requests where N = minRequestAmount. + for (int i = 0; i < minRequestAmount; i++) { + if (i < 7) { + assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20))); + } else { + assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(-20, -10))); + } + } + + // Till now slow ratio should be 70%. + assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20))); + assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20))); + assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20))); + assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20))); + assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20))); + assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20))); + // Circuit breaker has transformed to OPEN since here. + verify(observer) + .onStateChange(eq(State.CLOSED), eq(State.OPEN), any(DegradeRule.class), anyDouble()); + assertEquals(State.OPEN, DefaultDegradeRuleManager.getDefaultCircuitBreakers(res).get(0).currentState()); + assertFalse(entryAndSleepFor(res, 1)); + + sleepSecond(1); + assertFalse(entryAndSleepFor(res, 1)); + sleepSecond(retryTimeoutSec); + // Test HALF-OPEN to OPEN. + assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20))); + + verify(observer) + .onStateChange(eq(State.OPEN), eq(State.HALF_OPEN), any(DegradeRule.class), nullable(Double.class)); + verify(observer) + .onStateChange(eq(State.HALF_OPEN), eq(State.OPEN), any(DegradeRule.class), anyDouble()); + // Wait for next retry timeout; + reset(observer); + sleepSecond(retryTimeoutSec + 1); + assertTrue(entryAndSleepFor(res, maxRt - ThreadLocalRandom.current().nextInt(10, 20))); + verify(observer) + .onStateChange(eq(State.OPEN), eq(State.HALF_OPEN), any(DegradeRule.class), nullable(Double.class)); + verify(observer) + .onStateChange(eq(State.HALF_OPEN), eq(State.CLOSED), any(DegradeRule.class), nullable(Double.class)); + // Now circuit breaker has been closed. + assertTrue(entryAndSleepFor(res, maxRt + ThreadLocalRandom.current().nextInt(10, 20))); + + EventObserverRegistry.getInstance().removeStateChangeObserver(res); + } + + @Test + public void testExceptionRatioMode() { CircuitBreakerStateChangeObserver observer = mock(CircuitBreakerStateChangeObserver.class); setCurrentMillis(System.currentTimeMillis() / 1000 * 1000); int retryTimeoutSec = 5; @@ -169,7 +232,7 @@ public void testExceptionRatioMode() throws Exception { } @Test - public void testExceptionCountMode() throws Throwable { + public void testExceptionCountMode() { // TODO } @@ -188,7 +251,7 @@ private void verifyState(List breakers, int target) { } @Test - public void testMultipleHalfOpenedBreakers() throws Exception { + public void testMultipleHalfOpenedBreakers() { CircuitBreakerStateChangeObserver observer = mock(CircuitBreakerStateChangeObserver.class); setCurrentMillis(System.currentTimeMillis() / 1000 * 1000); int retryTimeoutSec = 2; diff --git a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/spi/SpiLoaderTest.java b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/spi/SpiLoaderTest.java index 7eb8f13e75..6fde7524f2 100644 --- a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/spi/SpiLoaderTest.java +++ b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/spi/SpiLoaderTest.java @@ -22,6 +22,7 @@ import com.alibaba.csp.sentinel.slotchain.SlotChainBuilder; import com.alibaba.csp.sentinel.slots.DefaultSlotChainBuilder; import com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot; +import com.alibaba.csp.sentinel.slots.block.degrade.DefaultDegradeSlot; import com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot; import com.alibaba.csp.sentinel.slots.block.flow.FlowSlot; import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot; @@ -98,13 +99,14 @@ public void testLoadInstanceList() { prototypeSlotClasses.add(NodeSelectorSlot.class); prototypeSlotClasses.add(ClusterBuilderSlot.class); - List> singletonSlotClasses = new ArrayList<>(6); + List> singletonSlotClasses = new ArrayList<>(7); singletonSlotClasses.add(LogSlot.class); singletonSlotClasses.add(StatisticSlot.class); singletonSlotClasses.add(AuthoritySlot.class); singletonSlotClasses.add(SystemSlot.class); singletonSlotClasses.add(FlowSlot.class); singletonSlotClasses.add(DegradeSlot.class); + singletonSlotClasses.add(DefaultDegradeSlot.class); for (int i = 0; i < slots1.size(); i++) { ProcessorSlot slot1 = slots1.get(i); @@ -148,7 +150,7 @@ public void testLoadInstanceListSorted() { assertNotNull(sortedSlots); // Total 8 default slot in sentinel-core - assertEquals(8, sortedSlots.size()); + assertEquals(9, sortedSlots.size()); // Verify the order of slot int index = 0; @@ -160,6 +162,7 @@ public void testLoadInstanceListSorted() { assertTrue(sortedSlots.get(index++) instanceof SystemSlot); assertTrue(sortedSlots.get(index++) instanceof FlowSlot); assertTrue(sortedSlots.get(index++) instanceof DegradeSlot); + assertTrue(sortedSlots.get(index++) instanceof DefaultDegradeSlot); } @Test @@ -177,7 +180,7 @@ public void testLoadLowestPriorityInstance() { assertNotNull(slot); // NodeSelectorSlot is lowest order priority with @Spi(order = -1000) among all slots - assertTrue(slot instanceof DegradeSlot); + assertTrue(slot instanceof DefaultDegradeSlot); } @Test