Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature]finish cluster concurrent flow control rule checker #1631

Merged
merged 4 commits into from
Sep 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,15 @@ public TokenResult requestParamToken(Long flowId, int acquireCount, Collection<O
}
}

/**
 * Stub implementation of the concurrent-token request.
 *
 * <p>The body performs no work and always returns {@code null}; none of the
 * arguments are read. Callers must therefore be prepared for a {@code null}
 * result from this implementation.
 *
 * @param clientAddress address of the requesting client (unused here)
 * @param ruleId        id of the flow rule the token is requested for (unused here)
 * @param acquireCount  number of tokens requested (unused here)
 * @return always {@code null}
 */
@Override
public TokenResult requestConcurrentToken(String clientAddress, Long ruleId, int acquireCount) {
return null;
}

/**
 * Stub implementation of the concurrent-token release.
 *
 * <p>The body is empty: the given token id is ignored and nothing is released.
 *
 * @param tokenId id of the token to release (unused here)
 */
@Override
public void releaseConcurrentToken(Long tokenId) {
}

private void logForResult(TokenResult result) {
switch (result.getStatus()) {
case TokenResultStatus.NO_RULE_EXISTS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ public final class ClusterConstants {
public static final int MSG_TYPE_PING = 0;
public static final int MSG_TYPE_FLOW = 1;
public static final int MSG_TYPE_PARAM_FLOW = 2;
// Message type codes for cluster concurrent flow control. Judging by the names, 3 is
// the token-acquire request and 4 is the token-release request -- NOTE(review): confirm
// against the request/response codecs that consume these values.
public static final int MSG_TYPE_CONCURRENT_FLOW_ACQUIRE = 3;
public static final int MSG_TYPE_CONCURRENT_FLOW_RELEASE = 4;


public static final int RESPONSE_STATUS_BAD = -1;
public static final int RESPONSE_STATUS_OK = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.flow;

import com.alibaba.csp.sentinel.cluster.TokenResult;
import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager;
import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.CurrentConcurrencyManager;
import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.TokenCacheNode;
import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.TokenCacheNodeManager;
import com.alibaba.csp.sentinel.cluster.server.log.ClusterServerStatLogUtil;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;

import java.util.concurrent.atomic.AtomicInteger;

/**
* @author yunfeiyanggzq
*/
final public class ConcurrentClusterFlowChecker {

/**
 * Computes the cluster-wide threshold for the given rule.
 *
 * <p>For {@link ClusterRuleConstant#FLOW_THRESHOLD_GLOBAL} the rule count is used as is.
 * For every other threshold type (including
 * {@link ClusterRuleConstant#FLOW_THRESHOLD_AVG_LOCAL}) the rule count is multiplied by
 * the connected count reported by {@link ClusterFlowRuleManager} for the rule's flow id.
 *
 * @param rule the cluster flow rule; its cluster config is read
 * @return the threshold to compare the current concurrency against
 */
public static double calcGlobalThreshold(FlowRule rule) {
    final double ruleCount = rule.getCount();
    if (rule.getClusterConfig().getThresholdType() == ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL) {
        return ruleCount;
    }
    // AVG_LOCAL and any unrecognised type: scale the per-client count by the connected count.
    final int connected = ClusterFlowRuleManager.getConnectedCount(rule.getClusterConfig().getFlowId());
    return ruleCount * connected;
}

public static TokenResult acquireConcurrentToken(/*@Valid*/ String clientAddress, FlowRule rule, int acquireCount) {
long flowId = rule.getClusterConfig().getFlowId();
AtomicInteger nowCalls = CurrentConcurrencyManager.get(flowId);
if (nowCalls == null) {
RecordLog.warn("[ConcurrentClusterFlowChecker] Fail to get nowCalls by flowId<{}>", flowId);
return new TokenResult(TokenResultStatus.FAIL);
}

// check before enter the lock to improve the efficiency
if (nowCalls.get() + acquireCount > calcGlobalThreshold(rule)) {
ClusterServerStatLogUtil.log("concurrent|block|" + flowId, acquireCount);
return new TokenResult(TokenResultStatus.BLOCKED);
}

// ensure the atomicity of operations
// lock different nowCalls to improve the efficiency
synchronized (nowCalls) {
// check again whether the request can pass.
if (nowCalls.get() + acquireCount > calcGlobalThreshold(rule)) {
ClusterServerStatLogUtil.log("concurrent|block|" + flowId, acquireCount);
return new TokenResult(TokenResultStatus.BLOCKED);
} else {
nowCalls.getAndAdd(acquireCount);
}
}
ClusterServerStatLogUtil.log("concurrent|pass|" + flowId, acquireCount);
TokenCacheNode node = TokenCacheNode.generateTokenCacheNode(rule, acquireCount, clientAddress);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may need to pay attention to the memory and GC footprint when a large number of requests come in.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The maximum memory used is bounded by rules * maxConcurrency, so I think memory usage is controllable. Each token's information is small, and tokens are replaced quickly as they are acquired and released. There may be some young GC in the case you mention, but I think that is acceptable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could provide a benchmark of this scenario. Frequent young GC (YGC) may need optimizing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will do it, Thanks !

Copy link
Contributor Author

@yunfeiyanggzq yunfeiyanggzq Aug 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

图片
Because the run is so short, the image is not statistically accurate.
图片
图片

the code is as follows:

       @Before
    /**
     * Registers a single cluster-mode, thread-grade flow rule (flowId 111, global
     * threshold of 1000) under the namespace "1-name" before each test.
     */
    public void setUp() {
        FlowRule threadRule = new FlowRule();

        // Cluster-side settings of the rule.
        ClusterFlowConfig clusterConfig = new ClusterFlowConfig();
        clusterConfig.setResourceTimeout(500);
        clusterConfig.setClientOfflineTime(1000);
        clusterConfig.setFlowId(111L);
        clusterConfig.setThresholdType(ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL);

        // Rule-level settings.
        threadRule.setClusterConfig(clusterConfig);
        threadRule.setClusterMode(true);
        threadRule.setCount(1000);
        threadRule.setResource("test");
        threadRule.setGrade(RuleConstant.FLOW_GRADE_THREAD);

        ArrayList<FlowRule> ruleList = new ArrayList<>();
        ruleList.add(threadRule);

        ClusterFlowRuleManager.registerPropertyIfAbsent("1-name");
        ClusterFlowRuleManager.loadRules("1-name", ruleList);
    }
   
   @Test
    /**
     * Fires one million acquire/release cycles from a 100-thread pool and verifies that
     * the concurrency counter never exceeds the rule count and that, once all tasks are
     * done, both the counter and the token cache are back to zero.
     *
     * <p>Threshold violations are counted inside the workers and asserted on the test
     * thread: an {@code AssertionError} thrown in a pool thread would not fail the test
     * and would skip the latch count-down, leaving {@code await()} blocked forever.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    public void testConcurrentAcquireAndRelease() throws InterruptedException {
        setCurrentMillis(System.currentTimeMillis());
        final FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(111L);
        Assert.assertNotNull("rule for flowId 111 is not loaded", rule);

        final int totalRequests = 1000000;
        final CountDownLatch countDownLatch = new CountDownLatch(totalRequests);
        final AtomicInteger success = new AtomicInteger(0);
        final AtomicInteger violations = new AtomicInteger(0);
        ExecutorService pool = Executors.newFixedThreadPool(100);

        // The task is stateless, so a single instance can be submitted repeatedly.
        final Runnable task = new Runnable() {
            @Override
            public void run() {
                try {
                    TokenResult result = ConcurrentClusterFlowChecker.acquireConcurrentToken("127.0.0.1", rule, 1);
                    if (CurrentConcurrencyManager.get(111L).get() > rule.getCount()) {
                        violations.incrementAndGet();
                    }
                    if (result.getStatus() == TokenResultStatus.OK) {
                        success.incrementAndGet();
                        ConcurrentClusterFlowChecker.releaseConcurrentToken(result.getTokenId());
                    }
                } finally {
                    // Always count down so that a failing task cannot hang the test.
                    countDownLatch.countDown();
                }
            }
        };

        try {
            for (int i = 0; i < totalRequests; i++) {
                pool.execute(task);
            }
            countDownLatch.await();
        } finally {
            // Release the worker threads even if await() is interrupted.
            pool.shutdown();
        }

        System.out.println(success.get()+"成功的");
        Assert.assertEquals("concurrent control fail", 0, violations.get());
        Assert.assertTrue("fail to acquire and release token",
                CurrentConcurrencyManager.get(rule.getClusterConfig().getFlowId()).get() == 0 && TokenCacheNodeManager.getSize() == 0);
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

每秒请求量越小gc越少

TokenCacheNodeManager.putTokenCacheNode(node.getTokenId(), node);
TokenResult tokenResult = new TokenResult(TokenResultStatus.OK);
tokenResult.setTokenId(node.getTokenId());
return tokenResult;
}

/**
 * Releases a previously acquired concurrent token and gives its count back to the
 * flow's concurrency counter.
 *
 * <p>Outcomes:
 * <ul>
 * <li>{@code ALREADY_RELEASE} - the token is not in the cache, or it was removed by
 * someone else between the lookup and the removal;</li>
 * <li>{@code NO_RULE_EXISTS} - the rule of the token's flow id is no longer loaded, or
 * its concurrency counter has been removed in the meantime;</li>
 * <li>{@code RELEASE_OK} - the token was removed and the counter decremented.</li>
 * </ul>
 *
 * @param tokenId id of the token to release
 * @return the result carrying one of the statuses listed above
 */
public static TokenResult releaseConcurrentToken(/*@Valid*/ long tokenId) {
    TokenCacheNode node = TokenCacheNodeManager.getTokenCacheNode(tokenId);
    if (node == null) {
        RecordLog.info("[ConcurrentClusterFlowChecker] Token<{}> is already released", tokenId);
        return new TokenResult(TokenResultStatus.ALREADY_RELEASE);
    }
    FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(node.getFlowId());
    if (rule == null) {
        RecordLog.info("[ConcurrentClusterFlowChecker] Fail to get rule by flowId<{}>", node.getFlowId());
        return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
    }
    // Only the caller that actually removes the node may decrement the counter.
    if (TokenCacheNodeManager.removeTokenCacheNode(tokenId) == null) {
        RecordLog.info("[ConcurrentClusterFlowChecker] Token<{}> is already released for flowId<{}>", tokenId, node.getFlowId());
        return new TokenResult(TokenResultStatus.ALREADY_RELEASE);
    }
    int acquireCount = node.getAcquireCount();
    AtomicInteger nowCalls = CurrentConcurrencyManager.get(node.getFlowId());
    if (nowCalls == null) {
        // The counter is dropped when the rule is removed; a rule reload may race with
        // this release after the rule lookup above, so there is nothing left to decrement.
        RecordLog.warn("[ConcurrentClusterFlowChecker] Fail to get nowCalls by flowId<{}>", node.getFlowId());
        return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
    }
    nowCalls.getAndAdd(-acquireCount);
    ClusterServerStatLogUtil.log("concurrent|release|" + rule.getClusterConfig().getFlowId(), acquireCount);
    return new TokenResult(TokenResultStatus.RELEASE_OK);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
*/
package com.alibaba.csp.sentinel.cluster.flow;

import java.util.Collection;

import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
import com.alibaba.csp.sentinel.cluster.TokenResult;
import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
import com.alibaba.csp.sentinel.cluster.TokenService;
import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager;
import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterParamFlowRuleManager;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule;

import java.util.Collection;

/**
* Default implementation for cluster {@link TokenService}.
*
Expand Down Expand Up @@ -61,10 +61,35 @@ public TokenResult requestParamToken(Long ruleId, int acquireCount, Collection<O
return ClusterParamFlowChecker.acquireClusterToken(rule, acquireCount, params);
}

/**
 * Requests concurrent tokens for the given client and rule.
 *
 * @param clientAddress address of the requesting client; must be non-empty
 * @param ruleId        id of the cluster flow rule; must be positive
 * @param acquireCount  number of tokens to acquire; must be positive
 * @return a bad-request result for invalid arguments, {@code NO_RULE_EXISTS} when no
 *         rule is registered for {@code ruleId}, otherwise the checker's result
 */
@Override
public TokenResult requestConcurrentToken(String clientAddress, Long ruleId, int acquireCount) {
    // Reject malformed requests before touching the rule registry.
    if (notValidRequest(clientAddress, ruleId, acquireCount)) {
        return badRequest();
    }
    final FlowRule flowRule = ClusterFlowRuleManager.getFlowRuleById(ruleId);
    return flowRule == null
        ? new TokenResult(TokenResultStatus.NO_RULE_EXISTS)
        : ConcurrentClusterFlowChecker.acquireConcurrentToken(clientAddress, flowRule, acquireCount);
}

/**
 * Releases the concurrent token with the given id; a {@code null} id is ignored.
 *
 * @param tokenId id of the token to release, may be {@code null}
 */
@Override
public void releaseConcurrentToken(Long tokenId) {
    if (tokenId != null) {
        ConcurrentClusterFlowChecker.releaseConcurrentToken(tokenId);
    }
}

/**
 * Tells whether a token request is malformed.
 *
 * @param id    rule/flow id of the request
 * @param count number of tokens requested
 * @return {@code true} if the id is {@code null} or not positive, or the count is not positive
 */
private boolean notValidRequest(Long id, int count) {
    final boolean idOk = id != null && id > 0;
    final boolean countOk = count > 0;
    return !(idOk && countOk);
}

/**
 * Tells whether a concurrent token request is malformed.
 *
 * @param address client address of the request
 * @param id      rule/flow id of the request
 * @param count   number of tokens requested
 * @return {@code true} if the address is {@code null} or empty, the id is {@code null}
 *         or not positive, or the count is not positive
 */
private boolean notValidRequest(String address, Long id, int count) {
    if (address == null || address.isEmpty()) {
        return true;
    }
    if (id == null || id <= 0) {
        return true;
    }
    return count <= 0;
}

/**
 * Builds the result returned for malformed requests.
 *
 * @return a new {@link TokenResult} with status {@link TokenResultStatus#BAD_REQUEST}
 */
private TokenResult badRequest() {
    TokenResult rejected = new TokenResult(TokenResultStatus.BAD_REQUEST);
    return rejected;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,10 @@
*/
package com.alibaba.csp.sentinel.cluster.flow.rule;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterMetricStatistics;
import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.CurrentConcurrencyManager;
import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterMetric;
import com.alibaba.csp.sentinel.cluster.server.ServerConstants;
import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager;
import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionManager;
import com.alibaba.csp.sentinel.cluster.server.util.ClusterRuleUtil;
import com.alibaba.csp.sentinel.log.RecordLog;
Expand All @@ -41,6 +34,9 @@
import com.alibaba.csp.sentinel.util.function.Function;
import com.alibaba.csp.sentinel.util.function.Predicate;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
* Manager for cluster flow rules.
*
Expand All @@ -54,12 +50,12 @@ public final class ClusterFlowRuleManager {
* for a specific namespace to do rule management manually.
*/
public static final Function<String, SentinelProperty<List<FlowRule>>> DEFAULT_PROPERTY_SUPPLIER =
new Function<String, SentinelProperty<List<FlowRule>>>() {
@Override
public SentinelProperty<List<FlowRule>> apply(String namespace) {
return new DynamicSentinelProperty<>();
}
};
new Function<String, SentinelProperty<List<FlowRule>>>() {
@Override
public SentinelProperty<List<FlowRule>> apply(String namespace) {
return new DynamicSentinelProperty<>();
}
};

/**
* (flowId, clusterRule)
Expand Down Expand Up @@ -87,7 +83,7 @@ public SentinelProperty<List<FlowRule>> apply(String namespace) {
* Cluster flow rule property supplier for a specific namespace.
*/
private static volatile Function<String, SentinelProperty<List<FlowRule>>> propertySupplier
= DEFAULT_PROPERTY_SUPPLIER;
= DEFAULT_PROPERTY_SUPPLIER;

private static final Object UPDATE_LOCK = new Object();

Expand Down Expand Up @@ -118,18 +114,18 @@ public static void register2Property(String namespace) {
AssertUtil.notEmpty(namespace, "namespace cannot be empty");
if (propertySupplier == null) {
RecordLog.warn(
"[ClusterFlowRuleManager] Cluster flow property supplier is absent, cannot register property");
"[ClusterFlowRuleManager] Cluster flow property supplier is absent, cannot register property");
return;
}
SentinelProperty<List<FlowRule>> property = propertySupplier.apply(namespace);
if (property == null) {
RecordLog.warn(
"[ClusterFlowRuleManager] Wrong created property from cluster flow property supplier, ignoring");
"[ClusterFlowRuleManager] Wrong created property from cluster flow property supplier, ignoring");
return;
}
synchronized (UPDATE_LOCK) {
RecordLog.info("[ClusterFlowRuleManager] Registering new property to cluster flow rule manager"
+ " for namespace <{}>", namespace);
+ " for namespace <{}>", namespace);
registerPropertyInternal(namespace, property);
}
}
Expand Down Expand Up @@ -180,7 +176,7 @@ public static void removeProperty(String namespace) {
PROPERTY_MAP.remove(namespace);
}
RecordLog.info("[ClusterFlowRuleManager] Removing property from cluster flow rule manager"
+ " for namespace <{}>", namespace);
+ " for namespace <{}>", namespace);
}
}

Expand Down Expand Up @@ -253,7 +249,7 @@ public static List<FlowRule> getFlowRules(String namespace) {
* Load flow rules for a specific namespace. The former rules of the namespace will be replaced.
*
* @param namespace a valid namespace
* @param rules rule list
* @param rules rule list
*/
public static void loadRules(String namespace, List<FlowRule> rules) {
AssertUtil.notEmpty(namespace, "namespace cannot be empty");
Expand All @@ -278,6 +274,9 @@ private static void clearAndResetRulesFor(/*@Valid*/ String namespace) {
for (Long flowId : flowIdSet) {
FLOW_RULES.remove(flowId);
FLOW_NAMESPACE_MAP.remove(flowId);
if (CurrentConcurrencyManager.containsFlowId(flowId)) {
CurrentConcurrencyManager.remove(flowId);
}
}
flowIdSet.clear();
} else {
Expand All @@ -293,6 +292,9 @@ private static void clearAndResetRulesConditional(/*@Valid*/ String namespace, P
FLOW_RULES.remove(flowId);
FLOW_NAMESPACE_MAP.remove(flowId);
ClusterMetricStatistics.removeMetric(flowId);
if (CurrentConcurrencyManager.containsFlowId(flowId)) {
CurrentConcurrencyManager.remove(flowId);
}
}
}
oldIdSet.clear();
Expand Down Expand Up @@ -335,7 +337,7 @@ private static void applyClusterFlowRule(List<FlowRule> list, /*@Valid*/ String
}
if (!FlowRuleUtil.isValidRule(rule)) {
RecordLog.warn(
"[ClusterFlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule);
"[ClusterFlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule);
continue;
}
if (StringUtil.isBlank(rule.getLimitApp())) {
Expand All @@ -351,10 +353,13 @@ private static void applyClusterFlowRule(List<FlowRule> list, /*@Valid*/ String
ruleMap.put(flowId, rule);
FLOW_NAMESPACE_MAP.put(flowId, namespace);
flowIdSet.add(flowId);
if (!CurrentConcurrencyManager.containsFlowId(flowId)) {
CurrentConcurrencyManager.put(flowId, 0);
}

// Prepare cluster metric from valid flow ID.
ClusterMetricStatistics.putMetricIfAbsent(flowId,
new ClusterMetric(clusterConfig.getSampleCount(), clusterConfig.getWindowIntervalMs()));
new ClusterMetric(clusterConfig.getSampleCount(), clusterConfig.getWindowIntervalMs()));
}

// Cleanup unused cluster metrics.
Expand All @@ -381,16 +386,17 @@ public FlowRulePropertyListener(String namespace) {
public synchronized void configUpdate(List<FlowRule> conf) {
applyClusterFlowRule(conf, namespace);
RecordLog.info("[ClusterFlowRuleManager] Cluster flow rules received for namespace <{}>: {}",
namespace, FLOW_RULES);
namespace, FLOW_RULES);
}

@Override
public synchronized void configLoad(List<FlowRule> conf) {
applyClusterFlowRule(conf, namespace);
RecordLog.info("[ClusterFlowRuleManager] Cluster flow rules loaded for namespace <{}>: {}",
namespace, FLOW_RULES);
namespace, FLOW_RULES);
}
}

private ClusterFlowRuleManager() {}
private ClusterFlowRuleManager() {
}
}
Loading