Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Doris On ES][WIP] Support external ES table with SSL secured and configurable node sniffing #5325

Merged
merged 13 commits into from
Apr 12, 2021
12 changes: 12 additions & 0 deletions be/src/exec/es/es_scan_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ ESScanReader::ESScanReader(const std::string& target,
if (props.find(KEY_QUERY) != props.end()) {
_query = props.at(KEY_QUERY);
}
if (props.find(KEY_HTTP_SSL_ENABLED) != props.end()) {
std::istringstream(props.at(KEY_HTTP_SSL_ENABLED)) >> std::boolalpha >> _use_ssl_client;
}

std::string batch_size_str = props.at(KEY_BATCH_SIZE);
_batch_size = atoi(batch_size_str.c_str());
Expand Down Expand Up @@ -103,6 +106,9 @@ Status ESScanReader::open() {
}
_network_client.set_basic_auth(_user_name, _passwd);
_network_client.set_content_type("application/json");
if (_use_ssl_client) {
_network_client.use_untrusted_ssl();
}
// phase open, we cached the first response for `get_next` phase
Status status = _network_client.execute_post_request(_query, &_cached_response);
if (!status.ok() || _network_client.get_http_status() != 200) {
Expand Down Expand Up @@ -134,6 +140,9 @@ Status ESScanReader::get_next(bool* scan_eos, std::unique_ptr<ScrollParser>& scr
_network_client.set_basic_auth(_user_name, _passwd);
_network_client.set_content_type("application/json");
_network_client.set_timeout_ms(_http_timeout_ms);
if (_use_ssl_client) {
_network_client.use_untrusted_ssl();
}
RETURN_IF_ERROR(_network_client.execute_post_request(
ESScrollQueryBuilder::build_next_scroll_body(_scroll_id, _scroll_keep_alive),
&response));
Expand Down Expand Up @@ -188,6 +197,9 @@ Status ESScanReader::close() {
_network_client.set_method(DELETE);
_network_client.set_content_type("application/json");
_network_client.set_timeout_ms(5 * 1000);
if (_use_ssl_client) {
_network_client.use_untrusted_ssl();
}
std::string response;
RETURN_IF_ERROR(_network_client.execute_delete_request(
ESScrollQueryBuilder::build_clear_scroll_body(_scroll_id), &response));
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/es/es_scan_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class ESScanReader {
static constexpr const char* KEY_BATCH_SIZE = "batch_size";
static constexpr const char* KEY_TERMINATE_AFTER = "limit";
static constexpr const char* KEY_DOC_VALUES_MODE = "doc_values_mode";
static constexpr const char* KEY_HTTP_SSL_ENABLED = "http_ssl_enabled";
ESScanReader(const std::string& target, const std::map<std::string, std::string>& props,
bool doc_value_mode);
~ESScanReader();
Expand All @@ -63,6 +64,8 @@ class ESScanReader {
std::string _query;
// Elasticsearch shards to fetch document
std::string _shards;
// whether use ssl client
bool _use_ssl_client = false;
// distinguish the first scroll phase and the following scroll
bool _is_first;

Expand Down
6 changes: 6 additions & 0 deletions be/src/http/http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ class HttpClient {
curl_easy_setopt(_curl, CURLOPT_COPYPOSTFIELDS, post_body.c_str());
}

// Switches this client's curl handle to "trust everything" TLS mode by
// turning off both CURLOPT_SSL_VERIFYHOST and CURLOPT_SSL_VERIFYPEER.
// No other SSL configuration is offered by this class yet.
void use_untrusted_ssl() {
    const long verification_off = 0L;
    curl_easy_setopt(_curl, CURLOPT_SSL_VERIFYHOST, verification_off);
    curl_easy_setopt(_curl, CURLOPT_SSL_VERIFYPEER, verification_off);
}

// TODO(zc): support set header
// void set_header(const std::string& key, const std::string& value) {
// _cntl.http_request().SetHeader(key, value);
Expand Down
57 changes: 57 additions & 0 deletions docs/en/extending-doris/doris-on-es.md
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,63 @@ This term does not match any term in the dictionary, and will not return any res

The type of `k4.keyword` is `keyword`, and writing data into ES is a complete term, so it can be matched

### Enable node discovery mechanism, default is true (nodes\_discovery=true)

```
CREATE EXTERNAL TABLE `test` (
`k1` bigint(20) COMMENT "",
`k2` datetime COMMENT "",
`k3` varchar(20) COMMENT "",
`k4` varchar(100) COMMENT "",
`k5` float COMMENT ""
) ENGINE=ELASTICSEARCH
PROPERTIES (
"hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
"index" = "test",
"type" = "doc",
"user" = "root",
"password" = "root",

"nodes_discovery" = "true"
);
```

Parameter Description:

Parameter | Description
---|---
**nodes\_discovery** | Whether or not to enable ES node discovery. The default is true

When this is true, Doris finds all available related data nodes (the nodes the shards are allocated on) from ES. Set it to false if the addresses of the ES data nodes are not accessible from Doris BE, e.g. the ES cluster is deployed in an intranet isolated from the public Internet and users access it through a proxy

### Whether the ES cluster enables https access mode; if enabled, set the value to `true`, default is false (http\_ssl\_enabled=true)

```
CREATE EXTERNAL TABLE `test` (
`k1` bigint(20) COMMENT "",
`k2` datetime COMMENT "",
`k3` varchar(20) COMMENT "",
`k4` varchar(100) COMMENT "",
`k5` float COMMENT ""
) ENGINE=ELASTICSEARCH
PROPERTIES (
"hosts" = "https://192.168.0.1:8200,https://192.168.0.2:8200",
"index" = "test",
"type" = "doc",
"user" = "root",
"password" = "root",

"http_ssl_enabled" = "true"
);
```

Parameter Description:

Parameter | Description
---|---
**http\_ssl\_enabled** | Whether ES cluster enables https access mode

The current FE/BE implementation trusts all certificates. This is a temporary solution; real user-configured certificates will be used later

### Query usage

Expand Down
58 changes: 58 additions & 0 deletions docs/zh-CN/extending-doris/doris-on-es.md
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,64 @@ POST /_analyze

`k4.keyword` 的类型是`keyword`,数据写入ES中是一个完整的term,所以可以匹配

### 开启节点自动发现, 默认为true(nodes\_discovery=true)

```
CREATE EXTERNAL TABLE `test` (
`k1` bigint(20) COMMENT "",
`k2` datetime COMMENT "",
`k3` varchar(20) COMMENT "",
`k4` varchar(100) COMMENT "",
`k5` float COMMENT ""
) ENGINE=ELASTICSEARCH
PROPERTIES (
"hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
"index" = "test",
"type" = "doc",
"user" = "root",
"password" = "root",

"nodes_discovery" = "true"
);
```

参数说明:

参数 | 说明
---|---
**nodes\_discovery** | 是否开启es节点发现,默认为true

当配置为true时,Doris将从ES找到所有可用的相关数据节点(即分片所分配到的节点)。如果ES数据节点的地址无法被Doris BE访问,则设置为false,例如ES集群部署在与公共Internet隔离的内网,用户通过代理访问

### ES集群是否开启https访问模式,如果开启应设置为`true`,默认为false(http\_ssl\_enabled=true)

```
CREATE EXTERNAL TABLE `test` (
`k1` bigint(20) COMMENT "",
`k2` datetime COMMENT "",
`k3` varchar(20) COMMENT "",
`k4` varchar(100) COMMENT "",
`k5` float COMMENT ""
) ENGINE=ELASTICSEARCH
PROPERTIES (
"hosts" = "https://192.168.0.1:8200,https://192.168.0.2:8200",
"index" = "test",
"type" = "doc",
"user" = "root",
"password" = "root",

"http_ssl_enabled" = "true"
);
```

参数说明:

参数 | 说明
---|---
**http\_ssl\_enabled** | ES集群是否开启https访问模式

目前FE/BE的实现方式为信任所有证书,这是临时解决方案,后续会使用真实的用户配置证书

### 查询用法

完成在Doris中建立ES外表后,除了无法使用Doris中的数据模型(rollup、预聚合、物化视图等)外并无区别
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4150,6 +4150,8 @@ public static void getDdlStmt(String dbName, Table table, List<String> createTab
sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isDocValueScanEnable()).append("\",\n");
sb.append("\"max_docvalue_fields\" = \"").append(esTable.maxDocValueFields()).append("\",\n");
// Every property except the last one ends with a comma so the emitted
// PROPERTIES list stays comma-separated.
sb.append("\"enable_keyword_sniff\" = \"").append(esTable.isKeywordSniffEnable()).append("\",\n");
sb.append("\"nodes_discovery\" = \"").append(esTable.isNodesDiscovery()).append("\",\n");
sb.append("\"http_ssl_enabled\" = \"").append(esTable.isHttpSslEnabled()).append("\"\n");
sb.append(")");
} else if (table.getType() == TableType.HIVE) {
HiveTable hiveTable = (HiveTable) table;
Expand Down
65 changes: 48 additions & 17 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.external.elasticsearch.EsMetaStateTracker;
import org.apache.doris.external.elasticsearch.EsRestClient;
import org.apache.doris.external.elasticsearch.EsTablePartitions;
import org.apache.doris.external.elasticsearch.EsUtil;
import org.apache.doris.thrift.TEsTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
Expand Down Expand Up @@ -63,6 +64,8 @@ public class EsTable extends Table {
public static final String DOC_VALUE_SCAN = "enable_docvalue_scan";
public static final String KEYWORD_SNIFF = "enable_keyword_sniff";
public static final String MAX_DOCVALUE_FIELDS = "max_docvalue_fields";
public static final String NODES_DISCOVERY = "nodes_discovery";
public static final String HTTP_SSL_ENABLED = "http_ssl_enabled";

private String hosts;
private String[] seeds;
Expand All @@ -87,6 +90,10 @@ public class EsTable extends Table {
// would downgrade to extract value from `stored_fields`
private int maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS;

// value of the `nodes_discovery` table property; true unless the property overrides it
private boolean nodesDiscovery = true;

// value of the `http_ssl_enabled` table property; false unless the property overrides it
private boolean httpSslEnabled = false;

// Solr doc_values vs stored_fields performance-smackdown indicate:
// It is possible to notice that retrieving an high number of fields leads
// to a sensible worsening of performance if DocValues are used.
Expand Down Expand Up @@ -138,6 +145,13 @@ public boolean isKeywordSniffEnable() {
return enableKeywordSniff;
}

/**
 * @return the current value of the {@code nodesDiscovery} flag of this table
 */
public boolean isNodesDiscovery() {
    return this.nodesDiscovery;
}

/**
 * @return the current value of the {@code httpSslEnabled} flag of this table
 */
public boolean isHttpSslEnabled() {
    return this.httpSslEnabled;
}

private void validate(Map<String, String> properties) throws DdlException {
if (properties == null) {
Expand Down Expand Up @@ -185,36 +199,40 @@ private void validate(Map<String, String> properties) throws DdlException {

// enable doc value scan for Elasticsearch
if (properties.containsKey(DOC_VALUE_SCAN)) {
try {
enableDocValueScan = Boolean.parseBoolean(properties.get(DOC_VALUE_SCAN).trim());
} catch (Exception e) {
throw new DdlException("fail to parse enable_docvalue_scan, enable_docvalue_scan= "
+ properties.get(VERSION).trim() + " ,`enable_docvalue_scan`"
+ " should be like 'true' or 'false', value should be double quotation marks");
}
enableDocValueScan = EsUtil.getBoolean(properties, DOC_VALUE_SCAN);
}

if (properties.containsKey(KEYWORD_SNIFF)) {
try {
enableKeywordSniff = Boolean.parseBoolean(properties.get(KEYWORD_SNIFF).trim());
} catch (Exception e) {
throw new DdlException("fail to parse enable_keyword_sniff, enable_keyword_sniff= "
+ properties.get(VERSION).trim() + " ,`enable_keyword_sniff`"
+ " should be like 'true' or 'false', value should be double quotation marks");
enableKeywordSniff = EsUtil.getBoolean(properties, KEYWORD_SNIFF);
}

if (properties.containsKey(NODES_DISCOVERY)) {
nodesDiscovery = EsUtil.getBoolean(properties, NODES_DISCOVERY);
}

if (properties.containsKey(HTTP_SSL_ENABLED)) {
httpSslEnabled = EsUtil.getBoolean(properties, HTTP_SSL_ENABLED);
// check protocol
for (String seed : seeds) {
if (httpSslEnabled && seed.startsWith("http://")) {
throw new DdlException("if http_ssl_enabled is true, the https protocol must be used");
}
if (!httpSslEnabled && seed.startsWith("https://")) {
throw new DdlException("if http_ssl_enabled is false, the http protocol must be used");
}
}
} else {
enableKeywordSniff = true;
}

if (!Strings.isNullOrEmpty(properties.get(TYPE))
&& !Strings.isNullOrEmpty(properties.get(TYPE).trim())) {
mappingType = properties.get(TYPE).trim();
}

if (!Strings.isNullOrEmpty(properties.get(TRANSPORT))
&& !Strings.isNullOrEmpty(properties.get(TRANSPORT).trim())) {
transport = properties.get(TRANSPORT).trim();
if (!(TRANSPORT_HTTP.equals(transport) || TRANSPORT_THRIFT.equals(transport))) {
throw new DdlException("transport of ES table must be http(recommend) or thrift(reserved inner usage),"
throw new DdlException("transport of ES table must be http/https(recommend) or thrift(reserved inner usage),"
+ " but value is " + transport);
}
}
Expand All @@ -241,6 +259,8 @@ private void validate(Map<String, String> properties) throws DdlException {
tableContext.put("enableDocValueScan", String.valueOf(enableDocValueScan));
tableContext.put("enableKeywordSniff", String.valueOf(enableKeywordSniff));
tableContext.put("maxDocValueFields", String.valueOf(maxDocValueFields));
tableContext.put(NODES_DISCOVERY, String.valueOf(nodesDiscovery));
tableContext.put(HTTP_SSL_ENABLED, String.valueOf(httpSslEnabled));
}

public TTableDescriptor toThrift() {
Expand Down Expand Up @@ -323,7 +343,16 @@ public void readFields(DataInput in) throws IOException {
maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS;
}
}

if (tableContext.containsKey(NODES_DISCOVERY)) {
nodesDiscovery = Boolean.parseBoolean(tableContext.get(NODES_DISCOVERY));
} else {
nodesDiscovery = true;
}
if (tableContext.containsKey(HTTP_SSL_ENABLED)) {
httpSslEnabled = Boolean.parseBoolean(tableContext.get(HTTP_SSL_ENABLED));
} else {
httpSslEnabled = false;
}
PartitionType partType = PartitionType.valueOf(Text.readString(in));
if (partType == PartitionType.UNPARTITIONED) {
partitionInfo = SinglePartitionInfo.read(in);
Expand Down Expand Up @@ -357,6 +386,8 @@ public void readFields(DataInput in) throws IOException {
tableContext.put("transport", transport);
tableContext.put("enableDocValueScan", "false");
tableContext.put(KEYWORD_SNIFF, "true");
tableContext.put(NODES_DISCOVERY, "true");
tableContext.put(HTTP_SSL_ENABLED, "false");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

import org.apache.doris.thrift.TNetworkAddress;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;

Expand All @@ -38,7 +41,9 @@ public class EsNodeInfo {
private boolean hasThrift;
private TNetworkAddress thriftAddress;

public EsNodeInfo(String id, Map<String, Object> map) throws DorisEsException {
private static final Logger LOG = LogManager.getLogger(EsNodeInfo.class);

public EsNodeInfo(String id, Map<String, Object> map, boolean httpSslEnabled) {
this.id = id;
EsMajorVersion version = EsMajorVersion.parse((String) map.get("version"));
this.name = (String) map.get("name");
Expand Down Expand Up @@ -66,7 +71,7 @@ public EsNodeInfo(String id, Map<String, Object> map) throws DorisEsException {
String address = (String) httpMap.get("publish_address");
if (address != null) {
String[] scratch = address.split(":");
this.publishAddress = new TNetworkAddress(scratch[0], Integer.valueOf(scratch[1]));
this.publishAddress = new TNetworkAddress((httpSslEnabled ? "https://" : "") + scratch[0], Integer.parseInt(scratch[1]));
this.hasHttp = true;
} else {
this.publishAddress = null;
Expand Down Expand Up @@ -96,6 +101,24 @@ public EsNodeInfo(String id, Map<String, Object> map) throws DorisEsException {
}
}

/**
 * Builds a node entry directly from a seed address string.
 *
 * <p>The seed is read as {@code [scheme://]host[:port]}. The scheme, when present, is kept
 * as part of the host string and the port is split off. When the seed carries no port, the
 * port defaults to 443 for {@code https://} seeds and to 80 otherwise. The resulting node
 * is flagged as client, data and ingest node with an HTTP publish address.
 *
 * @param id   identifier to store for this node
 * @param seed seed address, e.g. {@code http://192.168.0.1:8200}
 * @throws NumberFormatException if the port part of the seed is not a number
 */
public EsNodeInfo(String id, String seed) {
    this.id = id;

    int schemeEnd = seed.indexOf("://");
    int hostStart = schemeEnd < 0 ? 0 : schemeEnd + 3;
    // The last ':' separates a port only when it lies after the scheme and
    // after any closing ']' of a bracketed IPv6 literal.
    int portSep = seed.lastIndexOf(':');
    boolean hasPortSep = portSep > hostStart && portSep > seed.lastIndexOf(']');

    int port = seed.startsWith("https://") ? 443 : 80;
    String remoteHost = seed;
    if (hasPortSep) {
        remoteHost = seed.substring(0, portSep);
        String portText = seed.substring(portSep + 1);
        // A dangling ':' with nothing behind it keeps the default port
        if (!portText.isEmpty()) {
            port = Integer.parseInt(portText);
        }
    }

    this.name = remoteHost;
    this.host = remoteHost;
    this.ip = remoteHost;
    this.isClient = true;
    this.isData = true;
    this.isIngest = true;
    this.publishAddress = new TNetworkAddress(remoteHost, port);
    this.hasHttp = true;
}

public boolean hasHttp() {
return hasHttp;
}
Expand Down
Loading