Skip to content

Commit

Permalink
Add Eureka data-source extension (alibaba#1502)
Browse files Browse the repository at this point in the history
  • Loading branch information
pleasecheckhere2016 authored and mastertiller committed Aug 20, 2020
1 parent 467fc91 commit 95cf77b
Show file tree
Hide file tree
Showing 7 changed files with 475 additions and 0 deletions.
1 change: 1 addition & 0 deletions sentinel-extension/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
<module>sentinel-datasource-spring-cloud-config</module>
<module>sentinel-datasource-consul</module>
<module>sentinel-datasource-etcd</module>
<module>sentinel-datasource-eureka</module>
<module>sentinel-annotation-cdi-interceptor</module>
</modules>

Expand Down
64 changes: 64 additions & 0 deletions sentinel-extension/sentinel-datasource-eureka/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Sentinel DataSource Eureka

Sentinel DataSource Eureka provides integration with [Eureka](https://github.com/Netflix/eureka) so that Eureka
can be the dynamic rule data source of Sentinel.

To use Sentinel DataSource Eureka, you should add the following dependency:

```xml
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-eureka</artifactId>
<version>x.y.z</version>
</dependency>
```

Then you can create an `EurekaDataSource` and register to rule managers.

SDK usage:

```java
EurekaDataSource<List<FlowRule>> eurekaDataSource = new EurekaDataSource("app-id", "instance-id",
Arrays.asList("http://localhost:8761/eureka", "http://localhost:8762/eureka", "http://localhost:8763/eureka"),
"rule-key", new Converter<String, List<FlowRule>>() {
@Override
public List<FlowRule> convert(String o) {
return JSON.parseObject(o, new TypeReference<List<FlowRule>>() {
});
}
});
FlowRuleManager.register2Property(eurekaDataSource.getProperty());
```

Example for Spring Cloud Application:

```java
@Bean
public EurekaDataSource<List<FlowRule>> eurekaDataSource(EurekaInstanceConfig eurekaInstanceConfig, EurekaClientConfig eurekaClientConfig) {

List<String> serviceUrls = EndpointUtils.getServiceUrlsFromConfig(eurekaClientConfig,
eurekaInstanceConfig.getMetadataMap().get("zone"), eurekaClientConfig.shouldPreferSameZoneEureka());

EurekaDataSource<List<FlowRule>> eurekaDataSource = new EurekaDataSource(eurekaInstanceConfig.getAppname(),
eurekaInstanceConfig.getInstanceId(), serviceUrls, "flowrules", new Converter<String, List<FlowRule>>() {
@Override
public List<FlowRule> convert(String o) {
return JSON.parseObject(o, new TypeReference<List<FlowRule>>() {
});
}
});

FlowRuleManager.register2Property(eurekaDataSource.getProperty());
return eurekaDataSource;
}

```

To refresh the rule dynamically,you need to call [Eureka-REST-operations](https://github.com/Netflix/eureka/wiki/Eureka-REST-operations)
to update instance metadata:

```
PUT /eureka/apps/{appID}/{instanceID}/metadata?{ruleKey}={json of the rules}
```

Note: don't forget to encode your json string in the url.
66 changes: 66 additions & 0 deletions sentinel-extension/sentinel-datasource-eureka/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>sentinel-extension</artifactId>
<groupId>com.alibaba.csp</groupId>
<version>1.8.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>sentinel-datasource-eureka</artifactId>

<properties>
<spring.cloud.version>2.1.2.RELEASE</spring.cloud.version>
</properties>


<dependencies>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-extension</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${spring.cloud.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
<version>${spring.cloud.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</exclusion>
</exclusions>
</dependency>


</dependencies>


</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.datasource.eureka;

import com.alibaba.csp.sentinel.datasource.AutoRefreshDataSource;
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.util.AssertUtil;
import com.alibaba.csp.sentinel.util.StringUtil;
import com.alibaba.fastjson.JSON;

import java.io.*;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* <p>
* A {@link ReadableDataSource} based on Eureka. This class will automatically
* fetches the metadata of the instance every period.
* </p>
* <p>
* Limitations: Default refresh interval is 10s. Because there is synchronization between eureka servers,
* it may take longer to take effect.
* </p>
*
* @author: liyang
* @create: 2020-05-23 12:01
*/
public class EurekaDataSource<T> extends AutoRefreshDataSource<String, T> {

private static final long DEFAULT_REFRESH_MS = 10000;

/**
* connect timeout: 3s
*/
private static final int DEFAULT_CONNECT_TIMEOUT_MS = 3000;

/**
* read timeout: 30s
*/
private static final int DEFAULT_READ_TIMEOUT_MS = 30000;


private int connectTimeoutMills;


private int readTimeoutMills;

/**
* eureka instance appid
*/
private String appId;
/**
* eureka instance id
*/
private String instanceId;

/**
* collect of eureka server urls
*/
private List<String> serviceUrls;

/**
* metadata key of the rule source
*/
private String ruleKey;


public EurekaDataSource(String appId, String instanceId, List<String> serviceUrls, String ruleKey,
Converter<String, T> configParser) {
this(appId, instanceId, serviceUrls, ruleKey, configParser, DEFAULT_REFRESH_MS, DEFAULT_CONNECT_TIMEOUT_MS, DEFAULT_READ_TIMEOUT_MS);
}


public EurekaDataSource(String appId, String instanceId, List<String> serviceUrls, String ruleKey,
Converter<String, T> configParser, long refreshMs, int connectTimeoutMills,
int readTimeoutMills) {
super(configParser, refreshMs);
AssertUtil.notNull(appId, "appId can't be null");
AssertUtil.notNull(instanceId, "instanceId can't be null");
AssertUtil.assertNotEmpty(serviceUrls, "serviceUrls can't be empty");
AssertUtil.notNull(ruleKey, "ruleKey can't be null");
AssertUtil.assertState(connectTimeoutMills > 0, "connectTimeoutMills must be greater than 0");
AssertUtil.assertState(readTimeoutMills > 0, "readTimeoutMills must be greater than 0");

this.appId = appId;
this.instanceId = instanceId;
this.serviceUrls = ensureEndWithSlash(serviceUrls);
AssertUtil.assertNotEmpty(this.serviceUrls, "No available service url");
this.ruleKey = ruleKey;
this.connectTimeoutMills = connectTimeoutMills;
this.readTimeoutMills = readTimeoutMills;
}


private List<String> ensureEndWithSlash(List<String> serviceUrls) {
List<String> newServiceUrls = new ArrayList<>();
for (String serviceUrl : serviceUrls) {
if (StringUtil.isBlank(serviceUrl)) {
continue;
}
if (!serviceUrl.endsWith("/")) {
serviceUrl = serviceUrl + "/";
}
newServiceUrls.add(serviceUrl);
}
return newServiceUrls;
}

@Override
public String readSource() throws Exception {
return fetchStringSourceFromEurekaMetadata(this.appId, this.instanceId, this.serviceUrls, ruleKey);
}


private String fetchStringSourceFromEurekaMetadata(String appId, String instanceId, List<String> serviceUrls,
String ruleKey) throws Exception {
List<String> shuffleUrls = new ArrayList<>(serviceUrls.size());
shuffleUrls.addAll(serviceUrls);
Collections.shuffle(shuffleUrls);
for (int i = 0; i < shuffleUrls.size(); i++) {
String serviceUrl = shuffleUrls.get(i) + String.format("apps/%s/%s", appId, instanceId);
HttpURLConnection conn = null;
try {
conn = (HttpURLConnection) new URL(serviceUrl).openConnection();
conn.addRequestProperty("Accept", "application/json;charset=utf-8");

conn.setConnectTimeout(connectTimeoutMills);
conn.setReadTimeout(readTimeoutMills);
conn.setRequestMethod("GET");
conn.setDoOutput(true);
conn.connect();
RecordLog.debug("[EurekaDataSource] Request from eureka server: " + serviceUrl);
if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) {
String s = toString(conn.getInputStream());
String ruleString = JSON.parseObject(s)
.getJSONObject("instance")
.getJSONObject("metadata")
.getString(ruleKey);
return ruleString;
}
RecordLog.warn("[EurekaDataSource] Warn: retrying on another server if available " +
"due to response code: {}, response message: {}", conn.getResponseCode(), toString(conn.getErrorStream()));
} catch (Exception e) {
try {
if (conn != null) {
RecordLog.warn("[EurekaDataSource] Warn: failed to request " + conn.getURL() + " from "
+ InetAddress.getByName(conn.getURL().getHost()).getHostAddress(), e);
}
} catch (Exception e1) {
RecordLog.warn("[EurekaDataSource] Warn: failed to request ", e1);
//ignore
}
RecordLog.warn("[EurekaDataSource] Warn: failed to request,retrying on another server if available");

} finally {
if (conn != null) {
conn.disconnect();
}
}
}
throw new EurekaMetadataFetchException("Can't get any data");
}


public static class EurekaMetadataFetchException extends Exception {

public EurekaMetadataFetchException(String message) {
super(message);
}
}


private String toString(InputStream input) throws IOException {
if (input == null) {
return null;
}
InputStreamReader inputStreamReader = new InputStreamReader(input, "utf-8");
CharArrayWriter sw = new CharArrayWriter();
copy(inputStreamReader, sw);
return sw.toString();
}

private long copy(Reader input, Writer output) throws IOException {
char[] buffer = new char[1 << 12];
long count = 0;
for (int n = 0; (n = input.read(buffer)) >= 0; ) {
output.write(buffer, 0, n);
count += n;
}
return count;
}


}
Loading

0 comments on commit 95cf77b

Please sign in to comment.