From 4feb16a202005376740ec48af40d203505088afb Mon Sep 17 00:00:00 2001 From: yunfeiyanggzq Date: Fri, 4 Sep 2020 09:22:57 +0800 Subject: [PATCH] unit tests for cluster concurrent flow control checker Signed-off-by: yunfeiyanggzq --- .../ConcurrentClusterFlowCheckerTest.java | 125 ++++++++++++++++++ .../CurrentConcurrencyManagerTest.java | 49 +++++++ .../concurrent/TokenCacheNodeManagerTest.java | 79 +++++++++++ .../sentinel/test/AbstractTimeBasedTest.java | 57 ++++++++ 4 files changed, 310 insertions(+) create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/ConcurrentClusterFlowCheckerTest.java create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/CurrentConcurrencyManagerTest.java create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/TokenCacheNodeManagerTest.java create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/test/AbstractTimeBasedTest.java diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/ConcurrentClusterFlowCheckerTest.java b/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/ConcurrentClusterFlowCheckerTest.java new file mode 100644 index 0000000000..3a7e2e9618 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/ConcurrentClusterFlowCheckerTest.java @@ -0,0 +1,125 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.flow; + +import com.alibaba.csp.sentinel.cluster.TokenResult; +import com.alibaba.csp.sentinel.cluster.TokenResultStatus; +import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager; +import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.CurrentConcurrencyManager; +import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.TokenCacheNodeManager; +import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionManager; +import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant; +import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.flow.ClusterFlowConfig; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; +import com.alibaba.csp.sentinel.test.AbstractTimeBasedTest; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * @author yunfeiyanggzq + */ +public class ConcurrentClusterFlowCheckerTest extends AbstractTimeBasedTest { + @Before + public void setUp() { + FlowRule rule = new FlowRule(); + ClusterFlowConfig config = new ClusterFlowConfig(); + config.setResourceTimeout(500); + config.setClientOfflineTime(1000); + config.setFlowId(111L); + config.setThresholdType(ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL); + rule.setClusterConfig(config); + rule.setClusterMode(true); + rule.setCount(10); + rule.setResource("test"); + rule.setGrade(RuleConstant.FLOW_GRADE_THREAD); + ArrayList rules = new ArrayList<>(); + rules.add(rule); + ClusterFlowRuleManager.registerPropertyIfAbsent("1-name"); + ClusterFlowRuleManager.loadRules("1-name", rules); + } + + @Test + public void testEasyAcquireAndRelease() throws InterruptedException { + setCurrentMillis(System.currentTimeMillis()); + FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(111L); + ArrayList list = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + TokenResult result = ConcurrentClusterFlowChecker.acquireConcurrentToken("127.0.0.1", rule, 1); + Assert.assertTrue("fail to acquire token", + result.getStatus() == TokenResultStatus.OK && result.getTokenId() != 0); + list.add(result.getTokenId()); + } + for (int i = 0; i < 10; i++) { + TokenResult result = ConcurrentClusterFlowChecker.acquireConcurrentToken("127.0.0.1", rule, 1); + Assert.assertTrue("fail to acquire block token", + result.getStatus() == TokenResultStatus.BLOCKED); + } + for (int i = 0; i < 10; i++) { + TokenResult result = ConcurrentClusterFlowChecker.releaseConcurrentToken(list.get(i)); + Assert.assertTrue("fail to release token", + result.getStatus() == TokenResultStatus.RELEASE_OK); + } + Assert.assertTrue("fail to release token", + CurrentConcurrencyManager.get(111L).get() == 0 && TokenCacheNodeManager.getSize() == 0); + } + + @Test + public void testConcurrentAcquireAndRelease() throws InterruptedException { + setCurrentMillis(System.currentTimeMillis()); + final FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(111L); + final CountDownLatch countDownLatch = new CountDownLatch(1000); + ExecutorService pool = Executors.newFixedThreadPool(100); + + for (long i = 0; i < 1000; i++) { + Runnable task = new Runnable() { + @Override + public void run() { + assert rule != null; + TokenResult result = ConcurrentClusterFlowChecker.acquireConcurrentToken("127.0.0.1", rule, 1); + Assert.assertTrue("concurrent control fail", CurrentConcurrencyManager.get(111L).get() <= rule.getCount()); + if (result.getStatus() == TokenResultStatus.OK) { + ConcurrentClusterFlowChecker.releaseConcurrentToken(result.getTokenId()); + } + countDownLatch.countDown(); + } + }; + pool.execute(task); + } + countDownLatch.await(); + pool.shutdown(); + assert rule != null; + Assert.assertTrue("fail to acquire and release token", + CurrentConcurrencyManager.get(rule.getClusterConfig().getFlowId()).get() == 0 && TokenCacheNodeManager.getSize() == 0); + } + + @Test + public void testReleaseExpiredToken() throws InterruptedException { + ConnectionManager.addConnection("test", "127.0.0.1"); + FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(111L); + for (int i = 0; i < 10; i++) { + ConcurrentClusterFlowChecker.acquireConcurrentToken("127.0.0.1", rule, 1); + } + Thread.sleep(3000); + Assert.assertTrue("fail to acquire and release token", CurrentConcurrencyManager.get(rule.getClusterConfig().getFlowId()).get() == 0 && TokenCacheNodeManager.getSize() == 0); + } +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/CurrentConcurrencyManagerTest.java b/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/CurrentConcurrencyManagerTest.java new file mode 100644 index 0000000000..dff3ca949d --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/CurrentConcurrencyManagerTest.java @@ -0,0 +1,49 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent; + +import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.CurrentConcurrencyManager; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class CurrentConcurrencyManagerTest { + @Test + public void updateTest() throws InterruptedException { + CurrentConcurrencyManager.put(111L, 0); + CurrentConcurrencyManager.put(222L, 0); + final CountDownLatch countDownLatch = new CountDownLatch(1000); + ExecutorService pool = Executors.newFixedThreadPool(100); + for (int i = 0; i < 1000; i++) { + Runnable task = new Runnable() { + @Override + public void run() { + CurrentConcurrencyManager.addConcurrency(111L, 1); + CurrentConcurrencyManager.addConcurrency(222L, 2); + countDownLatch.countDown(); + } + }; + pool.execute(task); + } + countDownLatch.await(); + pool.shutdown(); + Assert.assertEquals(1000, CurrentConcurrencyManager.get(111L).get()); + Assert.assertEquals(2000, CurrentConcurrencyManager.get(222L).get()); + } +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/TokenCacheNodeManagerTest.java b/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/TokenCacheNodeManagerTest.java new file mode 100644 index 0000000000..8ca4f97a40 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/TokenCacheNodeManagerTest.java @@ -0,0 +1,79 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent; + +import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager; +import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.TokenCacheNode; +import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.TokenCacheNodeManager; +import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant; +import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.flow.ClusterFlowConfig; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; +import com.alibaba.csp.sentinel.test.AbstractTimeBasedTest; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +public class TokenCacheNodeManagerTest extends AbstractTimeBasedTest { + @Before + public void setUp() { + FlowRule rule = new FlowRule(); + ClusterFlowConfig config = new ClusterFlowConfig(); + config.setResourceTimeout(500); + config.setClientOfflineTime(1000); + config.setFlowId(111L); + config.setThresholdType(ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL); + rule.setClusterConfig(config); + rule.setClusterMode(true); + rule.setCount(10); + rule.setResource("test"); + rule.setGrade(RuleConstant.FLOW_GRADE_THREAD); + ArrayList rules = new ArrayList<>(); + rules.add(rule); + ClusterFlowRuleManager.registerPropertyIfAbsent("1-name"); + ClusterFlowRuleManager.loadRules("1-name", rules); + } + + @Test + public void testPutTokenCacheNode() throws InterruptedException { + setCurrentMillis(System.currentTimeMillis()); + + for (long i = 0; i < 100; i++) { + final TokenCacheNode node = new TokenCacheNode(); + node.setTokenId(i); + node.setFlowId(111L); + node.setResourceTimeout(10000L); + node.setClientTimeout(10000L); + node.setClientAddress("localhost"); + if (TokenCacheNodeManager.validToken(node)) { + TokenCacheNodeManager.putTokenCacheNode(node.getTokenId(), node); + + } + } + Assert.assertEquals(100, TokenCacheNodeManager.getSize()); + for (int i = 0; i < 100; i++) { + TokenCacheNodeManager.getTokenCacheNode((long) (Math.random() * 100)); + } + List keyList = new ArrayList<>(TokenCacheNodeManager.getCacheKeySet()); + for (int i = 0; i < 100; i++) { + Assert.assertEquals(i, (long) keyList.get(i)); + TokenCacheNodeManager.removeTokenCacheNode(i); + } + } +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/test/AbstractTimeBasedTest.java b/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/test/AbstractTimeBasedTest.java new file mode 100644 index 0000000000..6e8bab4d1e --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/test/AbstractTimeBasedTest.java @@ -0,0 +1,57 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.test; + +import com.alibaba.csp.sentinel.util.TimeUtil; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +/** + * Mock support for {@link TimeUtil} + * + * @author jason + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({TimeUtil.class}) +public abstract class AbstractTimeBasedTest { + + private long currentMillis = 0; + + { + PowerMockito.mockStatic(TimeUtil.class); + PowerMockito.when(TimeUtil.currentTimeMillis()).thenReturn(currentMillis); + } + + protected final void useActualTime() { + PowerMockito.when(TimeUtil.currentTimeMillis()).thenCallRealMethod(); + } + + protected final void setCurrentMillis(long cur) { + currentMillis = cur; + PowerMockito.when(TimeUtil.currentTimeMillis()).thenReturn(currentMillis); + } + + protected final void sleep(int t) { + currentMillis += t; + PowerMockito.when(TimeUtil.currentTimeMillis()).thenReturn(currentMillis); + } + + protected final void sleepSecond(int timeSec) { + sleep(timeSec * 1000); + } +}