Skip to content

Commit

Permalink
Reuse connections of the same address in ZooKeeper data-source (#788)
Browse files Browse the repository at this point in the history
* Fix the problem that too many connections (zkClient) are established
  • Loading branch information
linlinisme authored and sczyh30 committed Jun 27, 2019
1 parent dca4440 commit ba14676
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
package com.alibaba.csp.sentinel.datasource.zookeeper;

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
import com.alibaba.csp.sentinel.datasource.AbstractDataSource;
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.util.StringUtil;

import org.apache.curator.framework.AuthInfo;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* A read-only {@code DataSource} with ZooKeeper backend.
Expand All @@ -32,9 +32,13 @@ public class ZookeeperDataSource<T> extends AbstractDataSource<String, T> {
private static final int RETRY_TIMES = 3;
private static final int SLEEP_TIME = 1000;

private static volatile Map<String, CuratorFramework> zkClientMap = new HashMap<>();
private static final Object lock = new Object();


private final ExecutorService pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(1), new NamedThreadFactory("sentinel-zookeeper-ds-update"),
new ThreadPoolExecutor.DiscardOldestPolicy());
new ArrayBlockingQueue<Runnable>(1), new NamedThreadFactory("sentinel-zookeeper-ds-update"),
new ThreadPoolExecutor.DiscardOldestPolicy());

private NodeCacheListener listener;
private final String path;
Expand Down Expand Up @@ -116,16 +120,33 @@ public void nodeChanged() {
}
};

if (authInfos == null || authInfos.size() == 0) {
this.zkClient = CuratorFrameworkFactory.newClient(serverAddr, new ExponentialBackoffRetry(SLEEP_TIME, RETRY_TIMES));
String zkKey = getZkKey(serverAddr, authInfos);
if (zkClientMap.containsKey(zkKey)) {
this.zkClient = zkClientMap.get(zkKey);
} else {
this.zkClient = CuratorFrameworkFactory.builder().
connectString(serverAddr).
retryPolicy(new ExponentialBackoffRetry(SLEEP_TIME, RETRY_TIMES)).
authorization(authInfos).
build();
synchronized (lock) {
if (!zkClientMap.containsKey(zkKey)) {
CuratorFramework zc = null;
if (authInfos == null || authInfos.size() == 0) {
zc = CuratorFrameworkFactory.newClient(serverAddr, new ExponentialBackoffRetry(SLEEP_TIME, RETRY_TIMES));
} else {
zc = CuratorFrameworkFactory.builder().
connectString(serverAddr).
retryPolicy(new ExponentialBackoffRetry(SLEEP_TIME, RETRY_TIMES)).
authorization(authInfos).
build();
}
this.zkClient = zc;
this.zkClient.start();
Map<String, CuratorFramework> newZkClientMap = new HashMap<>(zkClientMap.size());
newZkClientMap.putAll(zkClientMap);
newZkClientMap.put(zkKey, zc);
zkClientMap = newZkClientMap;
} else {
this.zkClient = zkClientMap.get(zkKey);
}
}
}
this.zkClient.start();

this.nodeCache = new NodeCache(this.zkClient, this.path);
this.nodeCache.getListenable().addListener(this.listener, this.pool);
Expand Down Expand Up @@ -165,4 +186,31 @@ public void close() throws Exception {
private String getPath(String groupId, String dataId) {
return String.format("/%s/%s", groupId, dataId);
}

private String getZkKey(final String serverAddr, final List<AuthInfo> authInfos) {
if (authInfos == null || authInfos.size() == 0) {
return serverAddr;
}
StringBuilder builder = new StringBuilder(64);
builder.append(serverAddr).append(getAuthInfosKey(authInfos));
return builder.toString();
}

private String getAuthInfosKey(List<AuthInfo> authInfos) {
StringBuilder builder = new StringBuilder(32);
for (AuthInfo authInfo : authInfos) {
if (authInfo == null) {
builder.append("{}");
} else {
builder.append("{" + "sc=" + authInfo.getScheme() + ",au=" + Arrays.toString(authInfo.getAuth()) + "}");
}
}
return builder.toString();
}

protected CuratorFramework getZkClient() {
return this.zkClient;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.fastjson.JSON;
Expand All @@ -18,6 +19,7 @@
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import org.junit.Assert;
import org.junit.Test;

import java.util.Collections;
Expand All @@ -43,16 +45,17 @@ public void testZooKeeperDataSource() throws Exception {
final String path = "/sentinel-zk-ds-demo/flow-HK";

ReadableDataSource<String, List<FlowRule>> flowRuleDataSource = new ZookeeperDataSource<List<FlowRule>>(remoteAddress, path,
new Converter<String, List<FlowRule>>() {
@Override
public List<FlowRule> convert(String source) {
return JSON.parseObject(source, new TypeReference<List<FlowRule>>() {});
}
});
new Converter<String, List<FlowRule>>() {
@Override
public List<FlowRule> convert(String source) {
return JSON.parseObject(source, new TypeReference<List<FlowRule>>() {
});
}
});
FlowRuleManager.register2Property(flowRuleDataSource.getProperty());

CuratorFramework zkClient = CuratorFrameworkFactory.newClient(remoteAddress,
new ExponentialBackoffRetry(3, 1000));
new ExponentialBackoffRetry(3, 1000));
zkClient.start();
Stat stat = zkClient.checkExists().forPath(path);
if (stat == null) {
Expand All @@ -67,6 +70,9 @@ public List<FlowRule> convert(String source) {
server.stop();
}




@Test
public void testZooKeeperDataSourceAuthorization() throws Exception {
TestingServer server = new TestingServer(21812);
Expand Down Expand Up @@ -116,21 +122,21 @@ public List<FlowRule> convert(String source) {

private void publishThenTestFor(CuratorFramework zkClient, String path, String resourceName, long count) throws Exception {
FlowRule rule = new FlowRule().setResource(resourceName)
.setLimitApp("default")
.as(FlowRule.class)
.setCount(count)
.setGrade(RuleConstant.FLOW_GRADE_QPS);
.setLimitApp("default")
.as(FlowRule.class)
.setCount(count)
.setGrade(RuleConstant.FLOW_GRADE_QPS);
String ruleString = JSON.toJSONString(Collections.singletonList(rule));
zkClient.setData().forPath(path, ruleString.getBytes());

await().timeout(5, TimeUnit.SECONDS)
.until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
List<FlowRule> rules = FlowRuleManager.getRules();
return rules != null && !rules.isEmpty();
}
});
.until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
List<FlowRule> rules = FlowRuleManager.getRules();
return rules != null && !rules.isEmpty();
}
});

List<FlowRule> rules = FlowRuleManager.getRules();
boolean exists = false;
Expand All @@ -142,4 +148,77 @@ public Boolean call() throws Exception {
}
assertTrue(exists);
}


/**
* Test whether different dataSources can share the same zkClient when the connection parameters are the same.
* @throws Exception
*/
@Test
public void testZooKeeperDataSourceSameZkClient() throws Exception {
TestingServer server = new TestingServer(21813);
server.start();

final String remoteAddress = server.getConnectString();
final String flowPath = "/sentinel-zk-ds-demo/flow-HK";
final String degradePath = "/sentinel-zk-ds-demo/degrade-HK";


ZookeeperDataSource<List<FlowRule>> flowRuleZkDataSource = new ZookeeperDataSource<>(remoteAddress, flowPath,
new Converter<String, List<FlowRule>>() {
@Override
public List<FlowRule> convert(String source) {
return JSON.parseObject(source, new TypeReference<List<FlowRule>>() {
});
}
});
ZookeeperDataSource<List<DegradeRule>> degradeRuleZkDataSource = new ZookeeperDataSource<>(remoteAddress, degradePath,
new Converter<String, List<DegradeRule>>() {
@Override
public List<DegradeRule> convert(String source) {
return JSON.parseObject(source, new TypeReference<List<DegradeRule>>() {
});
}
});


Assert.assertTrue(flowRuleZkDataSource.getZkClient() == degradeRuleZkDataSource.getZkClient());


final String groupId = "sentinel-zk-ds-demo";
final String flowDataId = "flow-HK";
final String degradeDataId = "degrade-HK";
final String scheme = "digest";
final String auth = "root:123456";
AuthInfo authInfo = new AuthInfo(scheme, auth.getBytes());
List<AuthInfo> authInfoList = Collections.singletonList(authInfo);


ZookeeperDataSource<List<FlowRule>> flowRuleZkAutoDataSource = new ZookeeperDataSource<List<FlowRule>>(remoteAddress,
authInfoList, groupId, flowDataId,
new Converter<String, List<FlowRule>>() {
@Override
public List<FlowRule> convert(String source) {
return JSON.parseObject(source, new TypeReference<List<FlowRule>>() {
});
}
});

ZookeeperDataSource<List<DegradeRule>> degradeRuleZkAutoDataSource = new ZookeeperDataSource<List<DegradeRule>>(remoteAddress,
authInfoList, groupId, degradeDataId,
new Converter<String, List<DegradeRule>>() {
@Override
public List<DegradeRule> convert(String source) {
return JSON.parseObject(source, new TypeReference<List<DegradeRule>>() {
});
}
});

Assert.assertTrue(flowRuleZkAutoDataSource.getZkClient() == degradeRuleZkAutoDataSource.getZkClient());

server.stop();
}



}

0 comments on commit ba14676

Please sign in to comment.