Skip to content

Commit

Permalink
Refactor degrade hierarchy with new circuit breaker mechanism and imp…
Browse files Browse the repository at this point in the history
…rove strategy

* Add `CircuitBreaker` abstraction (with half-open state) and add circuit breaker state change event observer support.
* Improve circuit breaking strategy (avg RT → slow request ratio) and make statistics of each rule dependent (to support arbitrary statistic interval).
* Add simple "trial" mechanism (aka. half-open).
* Refactor mechanism of metric recording and state change handling for circuit breakers: record RT and error when requests have completed (i.e. `onExit`, based on alibaba#1420).

Signed-off-by: Eric Zhao <sczyh16@gmail.com>
  • Loading branch information
sczyh30 authored and wavesZh committed Jul 28, 2020
1 parent 17c3ff7 commit 7d0724d
Show file tree
Hide file tree
Showing 10 changed files with 902 additions and 205 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,12 @@
*/
package com.alibaba.csp.sentinel.slots.block.degrade;

import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.node.ClusterNode;
import com.alibaba.csp.sentinel.node.DefaultNode;
import com.alibaba.csp.sentinel.slots.block.AbstractRule;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.Objects;

/**
* <p>
Expand All @@ -52,47 +45,45 @@
* </ul>
*
* @author jialiang.linjl
* @author Eric Zhao
*/
public class DegradeRule extends AbstractRule {

@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static ScheduledExecutorService pool = Executors.newScheduledThreadPool(
Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("sentinel-degrade-reset-task", true));

public DegradeRule() {}

public DegradeRule(String resourceName) {
setResource(resourceName);
}

/**
* RT threshold or exception ratio threshold count.
* Circuit breaking strategy (0: average RT, 1: exception ratio, 2: exception count).
*/
private double count;
private int grade = RuleConstant.DEGRADE_GRADE_RT;

/**
* Degrade recover timeout (in seconds) when degradation occurs.
* Threshold count.
*/
private int timeWindow;
private double count;

/**
* Degrade strategy (0: average RT, 1: exception ratio, 2: exception count).
* Recovery timeout (in seconds) when circuit breaker opens. After the timeout, the circuit breaker will
* transform to half-open state for trying a few requests.
*/
private int grade = RuleConstant.DEGRADE_GRADE_RT;
private int timeWindow;

/**
* Minimum number of consecutive slow requests that can trigger RT circuit breaking.
* Minimum number of requests (in an active statistic time span) that can trigger circuit breaking.
*
* @since 1.7.0
*/
private int rtSlowRequestAmount = RuleConstant.DEGRADE_DEFAULT_SLOW_REQUEST_AMOUNT;
private int minRequestAmount = RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT;

/**
* Minimum number of requests (in an active statistic time span) that can trigger circuit breaking.
*
* @since 1.7.0
* The threshold of slow request ratio in RT mode.
*/
private int minRequestAmount = RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT;
private double slowRatioThreshold = 1.0d;

private int statIntervalMs = 1000;

public int getGrade() {
return grade;
Expand Down Expand Up @@ -121,21 +112,30 @@ public DegradeRule setTimeWindow(int timeWindow) {
return this;
}

public int getRtSlowRequestAmount() {
return rtSlowRequestAmount;
public int getMinRequestAmount() {
return minRequestAmount;
}

public DegradeRule setRtSlowRequestAmount(int rtSlowRequestAmount) {
this.rtSlowRequestAmount = rtSlowRequestAmount;
public DegradeRule setMinRequestAmount(int minRequestAmount) {
this.minRequestAmount = minRequestAmount;
return this;
}

public int getMinRequestAmount() {
return minRequestAmount;
public double getSlowRatioThreshold() {
return slowRatioThreshold;
}

public DegradeRule setMinRequestAmount(int minRequestAmount) {
this.minRequestAmount = minRequestAmount;
public DegradeRule setSlowRatioThreshold(double slowRatioThreshold) {
this.slowRatioThreshold = slowRatioThreshold;
return this;
}

public int getStatIntervalMs() {
return statIntervalMs;
}

public DegradeRule setStatIntervalMs(int statIntervalMs) {
this.statIntervalMs = statIntervalMs;
return this;
}

Expand All @@ -144,23 +144,19 @@ public boolean equals(Object o) {
if (this == o) { return true; }
if (o == null || getClass() != o.getClass()) { return false; }
if (!super.equals(o)) { return false; }
DegradeRule that = (DegradeRule) o;
return Double.compare(that.count, count) == 0 &&
timeWindow == that.timeWindow &&
grade == that.grade &&
rtSlowRequestAmount == that.rtSlowRequestAmount &&
minRequestAmount == that.minRequestAmount;
DegradeRule rule = (DegradeRule)o;
return Double.compare(rule.count, count) == 0 &&
timeWindow == rule.timeWindow &&
grade == rule.grade &&
minRequestAmount == rule.minRequestAmount &&
Double.compare(rule.slowRatioThreshold, slowRatioThreshold) == 0 &&
statIntervalMs == rule.statIntervalMs;
}

@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + new Double(count).hashCode();
result = 31 * result + timeWindow;
result = 31 * result + grade;
result = 31 * result + rtSlowRequestAmount;
result = 31 * result + minRequestAmount;
return result;
return Objects.hash(super.hashCode(), count, timeWindow, grade, minRequestAmount,
slowRatioThreshold, statIntervalMs);
}

@Override
Expand All @@ -171,84 +167,15 @@ public String toString() {
", count=" + count +
", limitApp=" + getLimitApp() +
", timeWindow=" + timeWindow +
", rtSlowRequestAmount=" + rtSlowRequestAmount +
", minRequestAmount=" + minRequestAmount +
"}";
", slowRatioThreshold=" + slowRatioThreshold +
", statIntervalMs=" + statIntervalMs +
'}';
}

// Internal implementation (will be deprecated and moved outside).

private AtomicLong passCount = new AtomicLong(0);
private final AtomicBoolean cut = new AtomicBoolean(false);

@Override
public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
if (cut.get()) {
return false;
}

ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());
if (clusterNode == null) {
return true;
}

if (grade == RuleConstant.DEGRADE_GRADE_RT) {
double rt = clusterNode.avgRt();
if (rt < this.count) {
passCount.set(0);
return true;
}

// Sentinel will degrade the service only if count exceeds.
if (passCount.incrementAndGet() < rtSlowRequestAmount) {
return true;
}
} else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
double exception = clusterNode.exceptionQps();
double success = clusterNode.successQps();
double total = clusterNode.totalQps();
// If total amount is less than minRequestAmount, the request will pass.
if (total < minRequestAmount) {
return true;
}

// In the same aligned statistic time window,
// "success" (aka. completed count) = exception count + non-exception count (realSuccess)
double realSuccess = success - exception;
if (realSuccess <= 0 && exception < minRequestAmount) {
return true;
}

if (exception / success < count) {
return true;
}
} else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
double exception = clusterNode.totalException();
if (exception < count) {
return true;
}
}

if (cut.compareAndSet(false, true)) {
ResetTask resetTask = new ResetTask(this);
pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
}

@Deprecated
public boolean passCheck(Context context, DefaultNode node, int count, Object... args) {
return false;
}

private static final class ResetTask implements Runnable {

private DegradeRule rule;

ResetTask(DegradeRule rule) {
this.rule = rule;
}

@Override
public void run() {
rule.passCount.set(0);
rule.cut.set(false);
}
}
}
Loading

0 comments on commit 7d0724d

Please sign in to comment.