forked from alibaba/Sentinel
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Eureka data-source extension (alibaba#1502)
- Loading branch information
1 parent
424c91e
commit a297d06
Showing
7 changed files
with
475 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
# Sentinel DataSource Eureka | ||
|
||
Sentinel DataSource Eureka provides integration with [Eureka](https://github.com/Netflix/eureka) so that Eureka | ||
can be the dynamic rule data source of Sentinel. | ||
|
||
To use Sentinel DataSource Eureka, you should add the following dependency: | ||
|
||
```xml | ||
<dependency> | ||
<groupId>com.alibaba.csp</groupId> | ||
<artifactId>sentinel-datasource-eureka</artifactId> | ||
<version>x.y.z</version> | ||
</dependency> | ||
``` | ||
|
||
Then you can create an `EurekaDataSource` and register to rule managers. | ||
|
||
SDK usage: | ||
|
||
```java | ||
EurekaDataSource<List<FlowRule>> eurekaDataSource = new EurekaDataSource("app-id", "instance-id", | ||
Arrays.asList("http://localhost:8761/eureka", "http://localhost:8762/eureka", "http://localhost:8763/eureka"), | ||
"rule-key", new Converter<String, List<FlowRule>>() { | ||
@Override | ||
public List<FlowRule> convert(String o) { | ||
return JSON.parseObject(o, new TypeReference<List<FlowRule>>() { | ||
}); | ||
} | ||
}); | ||
FlowRuleManager.register2Property(eurekaDataSource.getProperty()); | ||
``` | ||
|
||
Example for Spring Cloud Application: | ||
|
||
```java | ||
@Bean | ||
public EurekaDataSource<List<FlowRule>> eurekaDataSource(EurekaInstanceConfig eurekaInstanceConfig, EurekaClientConfig eurekaClientConfig) { | ||
|
||
List<String> serviceUrls = EndpointUtils.getServiceUrlsFromConfig(eurekaClientConfig, | ||
eurekaInstanceConfig.getMetadataMap().get("zone"), eurekaClientConfig.shouldPreferSameZoneEureka()); | ||
|
||
EurekaDataSource<List<FlowRule>> eurekaDataSource = new EurekaDataSource(eurekaInstanceConfig.getAppname(), | ||
eurekaInstanceConfig.getInstanceId(), serviceUrls, "flowrules", new Converter<String, List<FlowRule>>() { | ||
@Override | ||
public List<FlowRule> convert(String o) { | ||
return JSON.parseObject(o, new TypeReference<List<FlowRule>>() { | ||
}); | ||
} | ||
}); | ||
|
||
FlowRuleManager.register2Property(eurekaDataSource.getProperty()); | ||
return eurekaDataSource; | ||
} | ||
|
||
``` | ||
|
||
To refresh the rule dynamically,you need to call [Eureka-REST-operations](https://github.com/Netflix/eureka/wiki/Eureka-REST-operations) | ||
to update instance metadata: | ||
|
||
``` | ||
PUT /eureka/apps/{appID}/{instanceID}/metadata?{ruleKey}={json of the rules} | ||
``` | ||
|
||
Note: don't forget to encode your json string in the url. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<parent> | ||
<artifactId>sentinel-extension</artifactId> | ||
<groupId>com.alibaba.csp</groupId> | ||
<version>1.8.0-SNAPSHOT</version> | ||
</parent> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<artifactId>sentinel-datasource-eureka</artifactId> | ||
|
||
<properties> | ||
<spring.cloud.version>2.1.2.RELEASE</spring.cloud.version> | ||
</properties> | ||
|
||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>com.alibaba.csp</groupId> | ||
<artifactId>sentinel-datasource-extension</artifactId> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>com.alibaba</groupId> | ||
<artifactId>fastjson</artifactId> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>junit</groupId> | ||
<artifactId>junit</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.awaitility</groupId> | ||
<artifactId>awaitility</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.springframework.boot</groupId> | ||
<artifactId>spring-boot-starter-test</artifactId> | ||
<version>${spring.cloud.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.springframework.cloud</groupId> | ||
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId> | ||
<version>${spring.cloud.version}</version> | ||
<scope>test</scope> | ||
<exclusions> | ||
<exclusion> | ||
<groupId>com.google.code.gson</groupId> | ||
<artifactId>gson</artifactId> | ||
</exclusion> | ||
</exclusions> | ||
</dependency> | ||
|
||
|
||
</dependencies> | ||
|
||
|
||
</project> |
213 changes: 213 additions & 0 deletions
213
...rce-eureka/src/main/java/com/alibaba/csp/sentinel/datasource/eureka/EurekaDataSource.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,213 @@ | ||
/* | ||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package com.alibaba.csp.sentinel.datasource.eureka; | ||
|
||
import com.alibaba.csp.sentinel.datasource.AutoRefreshDataSource; | ||
import com.alibaba.csp.sentinel.datasource.Converter; | ||
import com.alibaba.csp.sentinel.datasource.ReadableDataSource; | ||
import com.alibaba.csp.sentinel.log.RecordLog; | ||
import com.alibaba.csp.sentinel.util.AssertUtil; | ||
import com.alibaba.csp.sentinel.util.StringUtil; | ||
import com.alibaba.fastjson.JSON; | ||
|
||
import java.io.*; | ||
import java.net.HttpURLConnection; | ||
import java.net.InetAddress; | ||
import java.net.URL; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.List; | ||
|
||
/** | ||
* <p> | ||
* A {@link ReadableDataSource} based on Eureka. This class will automatically | ||
* fetches the metadata of the instance every period. | ||
* </p> | ||
* <p> | ||
* Limitations: Default refresh interval is 10s. Because there is synchronization between eureka servers, | ||
* it may take longer to take effect. | ||
* </p> | ||
* | ||
* @author: liyang | ||
* @create: 2020-05-23 12:01 | ||
*/ | ||
public class EurekaDataSource<T> extends AutoRefreshDataSource<String, T> { | ||
|
||
private static final long DEFAULT_REFRESH_MS = 10000; | ||
|
||
/** | ||
* connect timeout: 3s | ||
*/ | ||
private static final int DEFAULT_CONNECT_TIMEOUT_MS = 3000; | ||
|
||
/** | ||
* read timeout: 30s | ||
*/ | ||
private static final int DEFAULT_READ_TIMEOUT_MS = 30000; | ||
|
||
|
||
private int connectTimeoutMills; | ||
|
||
|
||
private int readTimeoutMills; | ||
|
||
/** | ||
* eureka instance appid | ||
*/ | ||
private String appId; | ||
/** | ||
* eureka instance id | ||
*/ | ||
private String instanceId; | ||
|
||
/** | ||
* collect of eureka server urls | ||
*/ | ||
private List<String> serviceUrls; | ||
|
||
/** | ||
* metadata key of the rule source | ||
*/ | ||
private String ruleKey; | ||
|
||
|
||
public EurekaDataSource(String appId, String instanceId, List<String> serviceUrls, String ruleKey, | ||
Converter<String, T> configParser) { | ||
this(appId, instanceId, serviceUrls, ruleKey, configParser, DEFAULT_REFRESH_MS, DEFAULT_CONNECT_TIMEOUT_MS, DEFAULT_READ_TIMEOUT_MS); | ||
} | ||
|
||
|
||
public EurekaDataSource(String appId, String instanceId, List<String> serviceUrls, String ruleKey, | ||
Converter<String, T> configParser, long refreshMs, int connectTimeoutMills, | ||
int readTimeoutMills) { | ||
super(configParser, refreshMs); | ||
AssertUtil.notNull(appId, "appId can't be null"); | ||
AssertUtil.notNull(instanceId, "instanceId can't be null"); | ||
AssertUtil.assertNotEmpty(serviceUrls, "serviceUrls can't be empty"); | ||
AssertUtil.notNull(ruleKey, "ruleKey can't be null"); | ||
AssertUtil.assertState(connectTimeoutMills > 0, "connectTimeoutMills must be greater than 0"); | ||
AssertUtil.assertState(readTimeoutMills > 0, "readTimeoutMills must be greater than 0"); | ||
|
||
this.appId = appId; | ||
this.instanceId = instanceId; | ||
this.serviceUrls = ensureEndWithSlash(serviceUrls); | ||
AssertUtil.assertNotEmpty(this.serviceUrls, "No available service url"); | ||
this.ruleKey = ruleKey; | ||
this.connectTimeoutMills = connectTimeoutMills; | ||
this.readTimeoutMills = readTimeoutMills; | ||
} | ||
|
||
|
||
private List<String> ensureEndWithSlash(List<String> serviceUrls) { | ||
List<String> newServiceUrls = new ArrayList<>(); | ||
for (String serviceUrl : serviceUrls) { | ||
if (StringUtil.isBlank(serviceUrl)) { | ||
continue; | ||
} | ||
if (!serviceUrl.endsWith("/")) { | ||
serviceUrl = serviceUrl + "/"; | ||
} | ||
newServiceUrls.add(serviceUrl); | ||
} | ||
return newServiceUrls; | ||
} | ||
|
||
@Override | ||
public String readSource() throws Exception { | ||
return fetchStringSourceFromEurekaMetadata(this.appId, this.instanceId, this.serviceUrls, ruleKey); | ||
} | ||
|
||
|
||
private String fetchStringSourceFromEurekaMetadata(String appId, String instanceId, List<String> serviceUrls, | ||
String ruleKey) throws Exception { | ||
List<String> shuffleUrls = new ArrayList<>(serviceUrls.size()); | ||
shuffleUrls.addAll(serviceUrls); | ||
Collections.shuffle(shuffleUrls); | ||
for (int i = 0; i < shuffleUrls.size(); i++) { | ||
String serviceUrl = shuffleUrls.get(i) + String.format("apps/%s/%s", appId, instanceId); | ||
HttpURLConnection conn = null; | ||
try { | ||
conn = (HttpURLConnection) new URL(serviceUrl).openConnection(); | ||
conn.addRequestProperty("Accept", "application/json;charset=utf-8"); | ||
|
||
conn.setConnectTimeout(connectTimeoutMills); | ||
conn.setReadTimeout(readTimeoutMills); | ||
conn.setRequestMethod("GET"); | ||
conn.setDoOutput(true); | ||
conn.connect(); | ||
RecordLog.debug("[EurekaDataSource] Request from eureka server: " + serviceUrl); | ||
if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) { | ||
String s = toString(conn.getInputStream()); | ||
String ruleString = JSON.parseObject(s) | ||
.getJSONObject("instance") | ||
.getJSONObject("metadata") | ||
.getString(ruleKey); | ||
return ruleString; | ||
} | ||
RecordLog.warn("[EurekaDataSource] Warn: retrying on another server if available " + | ||
"due to response code: {}, response message: {}", conn.getResponseCode(), toString(conn.getErrorStream())); | ||
} catch (Exception e) { | ||
try { | ||
if (conn != null) { | ||
RecordLog.warn("[EurekaDataSource] Warn: failed to request " + conn.getURL() + " from " | ||
+ InetAddress.getByName(conn.getURL().getHost()).getHostAddress(), e); | ||
} | ||
} catch (Exception e1) { | ||
RecordLog.warn("[EurekaDataSource] Warn: failed to request ", e1); | ||
//ignore | ||
} | ||
RecordLog.warn("[EurekaDataSource] Warn: failed to request,retrying on another server if available"); | ||
|
||
} finally { | ||
if (conn != null) { | ||
conn.disconnect(); | ||
} | ||
} | ||
} | ||
throw new EurekaMetadataFetchException("Can't get any data"); | ||
} | ||
|
||
|
||
public static class EurekaMetadataFetchException extends Exception { | ||
|
||
public EurekaMetadataFetchException(String message) { | ||
super(message); | ||
} | ||
} | ||
|
||
|
||
private String toString(InputStream input) throws IOException { | ||
if (input == null) { | ||
return null; | ||
} | ||
InputStreamReader inputStreamReader = new InputStreamReader(input, "utf-8"); | ||
CharArrayWriter sw = new CharArrayWriter(); | ||
copy(inputStreamReader, sw); | ||
return sw.toString(); | ||
} | ||
|
||
private long copy(Reader input, Writer output) throws IOException { | ||
char[] buffer = new char[1 << 12]; | ||
long count = 0; | ||
for (int n = 0; (n = input.read(buffer)) >= 0; ) { | ||
output.write(buffer, 0, n); | ||
count += n; | ||
} | ||
return count; | ||
} | ||
|
||
|
||
} |
Oops, something went wrong.