Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the flow of request to fix #1638 for circuit breakers #1645

Merged
merged 4 commits into from
Aug 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@
*/
package com.alibaba.csp.sentinel;

import java.util.Iterator;
import java.util.LinkedList;

import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.context.NullContext;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.node.Node;
import com.alibaba.csp.sentinel.slotchain.ProcessorSlot;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.util.function.BiConsumer;

/**
* Linked entry within current context.
Expand All @@ -35,6 +40,8 @@ class CtEntry extends Entry {

protected ProcessorSlot<Object> chain;
protected Context context;
protected LinkedList<BiConsumer<Context, Entry>> exitHandlers;


CtEntry(ResourceWrapper resourceWrapper, ProcessorSlot<Object> chain, Context context) {
super(resourceWrapper);
Expand Down Expand Up @@ -102,10 +109,32 @@ protected void exitForContext(Context context, int count, Object... args) throws
protected void clearEntryContext() {
this.context = null;
}

@Override
public void whenComplete(BiConsumer<Context, Entry> consumer) {
if (this.exitHandlers == null) {
this.exitHandlers = new LinkedList<>();
}
this.exitHandlers.add(consumer);
}

@Override
protected Entry trueExit(int count, Object... args) throws ErrorEntryFreeException {
exitForContext(context, count, args);

if (this.exitHandlers != null) {
Iterator<BiConsumer<Context, Entry>> it = this.exitHandlers.iterator();
BiConsumer<Context, Entry> cur;
while (it.hasNext()) {
cur = it.next();
try {
cur.accept(this.context, this);
} catch (Exception e) {
RecordLog.warn("Error invoking exit handler", e);
}
}
this.exitHandlers = null;
}

return parent;
}
Expand Down
10 changes: 10 additions & 0 deletions sentinel-core/src/main/java/com/alibaba/csp/sentinel/Entry.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.util.TimeUtil;
import com.alibaba.csp.sentinel.util.function.BiConsumer;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.node.Node;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
Expand Down Expand Up @@ -178,4 +179,13 @@ public void setOriginNode(Node originNode) {
this.originNode = originNode;
}

/**
* Like `CompletableFuture` since JDK8 it guarantees specified consumer
* is invoked when this entry exited.
* Use it when you did some STATEFUL operations on entries.
*
* @param consumer
*/
public abstract void whenComplete(BiConsumer<Context, Entry> consumer);

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker;
import com.alibaba.csp.sentinel.spi.SpiOrder;
import com.alibaba.csp.sentinel.util.TimeUtil;

/**
* A {@link ProcessorSlot} dedicates to circuit breaking.
Expand All @@ -40,18 +39,18 @@ public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
performChecking(resourceWrapper);
performChecking(context, resourceWrapper);

fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

void performChecking(ResourceWrapper r) throws BlockException {
void performChecking(Context context, ResourceWrapper r) throws BlockException {
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
return;
}
for (CircuitBreaker cb : circuitBreakers) {
if (!cb.tryPass()) {
if (!cb.tryPass(context, r)) {
throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
}
}
Expand All @@ -71,14 +70,9 @@ public void exit(Context context, ResourceWrapper r, int count, Object... args)
}

if (curEntry.getBlockError() == null) {
long completeTime = curEntry.getCompleteTimestamp();
if (completeTime <= 0) {
completeTime = TimeUtil.currentTimeMillis();
}
long rt = completeTime - curEntry.getCreateTimestamp();
Throwable error = curEntry.getError();
// passed request
for (CircuitBreaker circuitBreaker : circuitBreakers) {
circuitBreaker.onRequestComplete(rt, error);
circuitBreaker.onRequestComplete(context, r);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

import java.util.concurrent.atomic.AtomicReference;

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;
import com.alibaba.csp.sentinel.util.AssertUtil;
import com.alibaba.csp.sentinel.util.TimeUtil;
import com.alibaba.csp.sentinel.util.function.BiConsumer;

/**
* @author Eric Zhao
Expand Down Expand Up @@ -61,14 +65,14 @@ public State currentState() {
}

@Override
public boolean tryPass() {
public boolean tryPass(Context context, ResourceWrapper r) {
// Template implementation.
if (currentState.get() == State.CLOSED) {
return true;
}
if (currentState.get() == State.OPEN) {
// For half-open state we allow a request for trial.
return retryTimeoutArrived() && fromOpenToHalfOpen();
return retryTimeoutArrived() && fromOpenToHalfOpen(context);
}
return false;
}
Expand All @@ -91,30 +95,44 @@ protected boolean fromCloseToOpen(double snapshotValue) {
if (currentState.compareAndSet(prev, State.OPEN)) {
updateNextRetryTimestamp();

for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
observer.onStateChange(prev, State.OPEN, rule, snapshotValue);
}
notifyObservers(prev, State.OPEN, snapshotValue);
return true;
}
return false;
}

protected boolean fromOpenToHalfOpen() {
protected boolean fromOpenToHalfOpen(Context context) {
if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
observer.onStateChange(State.OPEN, State.HALF_OPEN, rule, null);
}
notifyObservers(State.OPEN, State.HALF_OPEN, null);
Entry entry = context.getCurEntry();
entry.whenComplete(new BiConsumer<Context, Entry>() {
jasonjoo2010 marked this conversation as resolved.
Show resolved Hide resolved

@Override
public void accept(Context context, Entry entry) {
if (entry.getBlockError() != null) {
// Fallback to OPEN due to detecting request is blocked
currentState.compareAndSet(State.HALF_OPEN, State.OPEN);
jasonjoo2010 marked this conversation as resolved.
Show resolved Hide resolved
notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);
return;
}

}
});
return true;
}
return false;
}

private void notifyObservers(CircuitBreaker.State prevState, CircuitBreaker.State newState, Double snapshotValue) {
for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
observer.onStateChange(prevState, newState, rule, snapshotValue);
}
}

protected boolean fromHalfOpenToOpen(double snapshotValue) {
if (currentState.compareAndSet(State.HALF_OPEN, State.OPEN)) {
updateNextRetryTimestamp();
for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
observer.onStateChange(State.HALF_OPEN, State.OPEN, rule, snapshotValue);
}
notifyObservers(State.HALF_OPEN, State.OPEN, snapshotValue);
return true;
}
return false;
Expand All @@ -123,9 +141,7 @@ protected boolean fromHalfOpenToOpen(double snapshotValue) {
protected boolean fromHalfOpenToClose() {
if (currentState.compareAndSet(State.HALF_OPEN, State.CLOSED)) {
resetStat();
for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
observer.onStateChange(State.HALF_OPEN, State.CLOSED, rule, null);
}
notifyObservers(State.HALF_OPEN, State.CLOSED, null);
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;

import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;

/**
Expand All @@ -32,11 +34,13 @@ public interface CircuitBreaker {
DegradeRule getRule();

/**
* Acquires permission of an invocation only if it is available at the time of invocation.
* Acquires permission of an invocation only if it is available at the time of invoking.
*
* @param context
* @param r
* @return {@code true} if permission was acquired and {@code false} otherwise
*/
boolean tryPass();
boolean tryPass(Context context, ResourceWrapper r);

/**
* Get current state of the circuit breaker.
Expand All @@ -46,13 +50,12 @@ public interface CircuitBreaker {
State currentState();

/**
* Record a completed request with the given response time and error (if present) and
* handle state transformation of the circuit breaker.
* Called when a `passed` invocation finished.
*
* @param rt the response time of this entry
* @param error the error of this entry (if present)
* @param context context of current invocation
* @param wrapper current resource
*/
void onRequestComplete(long rt, Throwable error);
void onRequestComplete(Context context, ResourceWrapper wrapper);

/**
* Circuit breaker state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

import java.util.List;

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray;
import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder;
Expand Down Expand Up @@ -60,7 +63,12 @@ protected void resetStat() {
}

@Override
public void onRequestComplete(long rt, Throwable error) {
public void onRequestComplete(Context context, ResourceWrapper r) {
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
Throwable error = entry.getError();
SimpleErrorCounter counter = stat.currentWindow().value();
if (error != null) {
counter.getErrorCount().add(1);
Expand All @@ -74,14 +82,17 @@ private void handleStateChangeWhenThresholdExceeded(Throwable error) {
if (currentState.get() == State.OPEN) {
return;
}

if (currentState.get() == State.HALF_OPEN) {
// In detecting request
if (error == null) {
fromHalfOpenToClose();
} else {
fromHalfOpenToOpen(1.0d);
}
return;
}

List<SimpleErrorCounter> counters = stat.values();
long errCount = 0;
long totalCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@

import java.util.List;

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray;
import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder;
import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap;
import com.alibaba.csp.sentinel.util.AssertUtil;
import com.alibaba.csp.sentinel.util.TimeUtil;

/**
* @author Eric Zhao
Expand Down Expand Up @@ -57,8 +61,17 @@ public void resetStat() {
}

@Override
public void onRequestComplete(long rt, Throwable error) {
public void onRequestComplete(Context context, ResourceWrapper wrapper) {
SlowRequestCounter counter = slidingCounter.currentWindow().value();
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
long completeTime = entry.getCompleteTimestamp();
if (completeTime <= 0) {
completeTime = TimeUtil.currentTimeMillis();
}
long rt = completeTime - entry.getCreateTimestamp();
if (rt > maxAllowedRt) {
counter.slowCount.add(1);
}
Expand All @@ -71,7 +84,9 @@ private void handleStateChangeWhenThresholdExceeded(long rt) {
if (currentState.get() == State.OPEN) {
return;
}

if (currentState.get() == State.HALF_OPEN) {
// In detecting request
// TODO: improve logic for half-open recovery
if (rt > maxAllowedRt) {
fromHalfOpenToOpen(1.0d);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.util.function;

/**
* BiConsumer interface from JDK 8.
*/
public interface BiConsumer<T, U> {

void accept(T t, U u);
}
Loading