diff --git a/be/src/exec/es/es_scan_reader.cpp b/be/src/exec/es/es_scan_reader.cpp index 7dc38091df3d4f..87131a76476b4b 100644 --- a/be/src/exec/es/es_scan_reader.cpp +++ b/be/src/exec/es/es_scan_reader.cpp @@ -60,6 +60,9 @@ ESScanReader::ESScanReader(const std::string& target, if (props.find(KEY_QUERY) != props.end()) { _query = props.at(KEY_QUERY); } + if (props.find(KEY_HTTP_SSL_ENABLED) != props.end()) { + std::istringstream(props.at(KEY_HTTP_SSL_ENABLED)) >> std::boolalpha >> _use_ssl_client; + } std::string batch_size_str = props.at(KEY_BATCH_SIZE); _batch_size = atoi(batch_size_str.c_str()); @@ -103,6 +106,9 @@ Status ESScanReader::open() { } _network_client.set_basic_auth(_user_name, _passwd); _network_client.set_content_type("application/json"); + if (_use_ssl_client) { + _network_client.use_untrusted_ssl(); + } // phase open, we cached the first response for `get_next` phase Status status = _network_client.execute_post_request(_query, &_cached_response); if (!status.ok() || _network_client.get_http_status() != 200) { @@ -134,6 +140,9 @@ Status ESScanReader::get_next(bool* scan_eos, std::unique_ptr& scr _network_client.set_basic_auth(_user_name, _passwd); _network_client.set_content_type("application/json"); _network_client.set_timeout_ms(_http_timeout_ms); + if (_use_ssl_client) { + _network_client.use_untrusted_ssl(); + } RETURN_IF_ERROR(_network_client.execute_post_request( ESScrollQueryBuilder::build_next_scroll_body(_scroll_id, _scroll_keep_alive), &response)); @@ -188,6 +197,9 @@ Status ESScanReader::close() { _network_client.set_method(DELETE); _network_client.set_content_type("application/json"); _network_client.set_timeout_ms(5 * 1000); + if (_use_ssl_client) { + _network_client.use_untrusted_ssl(); + } std::string response; RETURN_IF_ERROR(_network_client.execute_delete_request( ESScrollQueryBuilder::build_clear_scroll_body(_scroll_id), &response)); diff --git a/be/src/exec/es/es_scan_reader.h b/be/src/exec/es/es_scan_reader.h index 
6a1f9d4a41066c..f162dc3bca36be 100644 --- a/be/src/exec/es/es_scan_reader.h +++ b/be/src/exec/es/es_scan_reader.h @@ -40,6 +40,7 @@ class ESScanReader { static constexpr const char* KEY_BATCH_SIZE = "batch_size"; static constexpr const char* KEY_TERMINATE_AFTER = "limit"; static constexpr const char* KEY_DOC_VALUES_MODE = "doc_values_mode"; + static constexpr const char* KEY_HTTP_SSL_ENABLED = "http_ssl_enabled"; ESScanReader(const std::string& target, const std::map& props, bool doc_value_mode); ~ESScanReader(); @@ -63,6 +64,8 @@ class ESScanReader { std::string _query; // Elasticsearch shards to fetch document std::string _shards; + // whether use ssl client + bool _use_ssl_client = false; // distinguish the first scroll phase and the following scroll bool _is_first; diff --git a/be/src/http/http_client.h b/be/src/http/http_client.h index cdbe1435c8cd93..5fec678345383f 100644 --- a/be/src/http/http_client.h +++ b/be/src/http/http_client.h @@ -66,6 +66,12 @@ class HttpClient { curl_easy_setopt(_curl, CURLOPT_COPYPOSTFIELDS, post_body.c_str()); } + // Currently, only fake SSL configurations are supported + void use_untrusted_ssl() { + curl_easy_setopt(_curl, CURLOPT_SSL_VERIFYPEER, 0L); + curl_easy_setopt(_curl, CURLOPT_SSL_VERIFYHOST, 0L); + } + // TODO(zc): support set header // void set_header(const std::string& key, const std::string& value) { // _cntl.http_request().SetHeader(key, value); diff --git a/docs/en/extending-doris/doris-on-es.md b/docs/en/extending-doris/doris-on-es.md index 3a7209a16745a1..9be1bedb67f219 100644 --- a/docs/en/extending-doris/doris-on-es.md +++ b/docs/en/extending-doris/doris-on-es.md @@ -328,6 +328,63 @@ This term does not match any term in the dictionary, and will not return any res The type of `k4.keyword` is `keyword`, and writing data into ES is a complete term, so it can be matched +### Enable node discovery mechanism, default is true(es\_nodes\_discovery=true) + +``` +CREATE EXTERNAL TABLE `test` ( + `k1` bigint(20) COMMENT 
"", + `k2` datetime COMMENT "", + `k3` varchar(20) COMMENT "", + `k4` varchar(100) COMMENT "", + `k5` float COMMENT "" +) ENGINE=ELASTICSEARCH +PROPERTIES ( +"hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200", +"index" = "test", +"type" = "doc", +"user" = "root", +"password" = "root", + +"nodes_discovery" = "true" +); +``` + +Parameter Description: + +Parameter | Description +---|--- +**nodes\_discovery** | Whether or not to enable ES node discovery. The default is true + +When this is true, Doris finds from ES all available data nodes on which the relevant shards are allocated. Set it to false if the addresses of the ES data nodes cannot be accessed by Doris BE, e.g. the ES cluster is deployed in an intranet that is isolated from the public Internet and users access it through a proxy; the addresses listed in `hosts` are then used instead + +### Whether the ES cluster enables https access mode; if enabled, set the value to `true`, default is false (http\_ssl\_enabled=true) + +``` +CREATE EXTERNAL TABLE `test` ( + `k1` bigint(20) COMMENT "", + `k2` datetime COMMENT "", + `k3` varchar(20) COMMENT "", + `k4` varchar(100) COMMENT "", + `k5` float COMMENT "" +) ENGINE=ELASTICSEARCH +PROPERTIES ( +"hosts" = "https://192.168.0.1:8200,https://192.168.0.2:8200", +"index" = "test", +"type" = "doc", +"user" = "root", +"password" = "root", + +"http_ssl_enabled" = "true" +); +``` + +Parameter Description: + +Parameter | Description +---|--- +**http\_ssl\_enabled** | Whether ES cluster enables https access mode + +The current FE/BE implementation trusts all certificates and host names. This is a temporary solution; user-configured certificates will be supported later ### Query usage diff --git a/docs/zh-CN/extending-doris/doris-on-es.md b/docs/zh-CN/extending-doris/doris-on-es.md index a1c314829e5ce0..4a046f08f83f8a 100644 --- a/docs/zh-CN/extending-doris/doris-on-es.md +++ b/docs/zh-CN/extending-doris/doris-on-es.md @@ -325,6 +325,64 @@ POST /_analyze `k4.keyword` 的类型是`keyword`,数据写入ES中是一个完整的term,所以可以匹配 +### 开启节点自动发现, 默认为true(nodes\_discovery=true) + +``` 
+CREATE EXTERNAL TABLE `test` ( + `k1` bigint(20) COMMENT "", + `k2` datetime COMMENT "", + `k3` varchar(20) COMMENT "", + `k4` varchar(100) COMMENT "", + `k5` float COMMENT "" +) ENGINE=ELASTICSEARCH +PROPERTIES ( +"hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200", +"index" = "test", +"type" = "doc", +"user" = "root", +"password" = "root", + +"nodes_discovery" = "true" +); +``` + +参数说明: + +参数 | 说明 +---|--- +**nodes\_discovery** | 是否开启es节点发现,默认为true + +当配置为true时,Doris将从ES找到所有可用的相关数据节点(在上面分配的分片)。如果Doris BE无法访问ES数据节点的地址,则设置为false,例如ES集群部署在与公共Internet隔离的内网,用户通过代理访问 + +### ES集群是否开启https访问模式,如果开启应设置为`true`,默认为false(http\_ssl\_enabled=true) + +``` +CREATE EXTERNAL TABLE `test` ( + `k1` bigint(20) COMMENT "", + `k2` datetime COMMENT "", + `k3` varchar(20) COMMENT "", + `k4` varchar(100) COMMENT "", + `k5` float COMMENT "" +) ENGINE=ELASTICSEARCH +PROPERTIES ( +"hosts" = "https://192.168.0.1:8200,https://192.168.0.2:8200", +"index" = "test", +"type" = "doc", +"user" = "root", +"password" = "root", + +"http_ssl_enabled" = "true" +); +``` + +参数说明: + +参数 | 说明 +---|--- +**http\_ssl\_enabled** | ES集群是否开启https访问模式 + +目前FE/BE的实现方式为信任所有证书,这是临时解决方案,后续会使用真实的用户配置证书 + ### 查询用法 完成在Doris中建立ES外表后,除了无法使用Doris中的数据模型(rollup、预聚合、物化视图等)外并无区别 diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index cd54710f846062..a4b5b626bdf81c 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -4150,6 +4150,9 @@ public static void getDdlStmt(String dbName, Table table, List createTab sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isDocValueScanEnable()).append("\",\n"); sb.append("\"max_docvalue_fields\" = \"").append(esTable.maxDocValueFields()).append("\",\n"); - sb.append("\"enable_keyword_sniff\" = \"").append(esTable.isKeywordSniffEnable()).append("\"\n"); + // every property except the last one must end with a comma, otherwise the generated DDL cannot be parsed + sb.append("\"enable_keyword_sniff\" = \"").append(esTable.isKeywordSniffEnable()).append("\",\n"); + sb.append("\"nodes_discovery\" = 
\"").append(esTable.isNodesDiscovery()).append("\",\n"); + sb.append("\"http_ssl_enabled\" = \"").append(esTable.isHttpSslEnabled()).append("\"\n"); sb.append(")"); } else if (table.getType() == TableType.HIVE) { HiveTable hiveTable = (HiveTable) table; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java index 8cae113f26a073..cbd2b7786d988c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java @@ -24,6 +24,7 @@ import org.apache.doris.external.elasticsearch.EsMetaStateTracker; import org.apache.doris.external.elasticsearch.EsRestClient; import org.apache.doris.external.elasticsearch.EsTablePartitions; +import org.apache.doris.external.elasticsearch.EsUtil; import org.apache.doris.thrift.TEsTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; @@ -63,6 +64,8 @@ public class EsTable extends Table { public static final String DOC_VALUE_SCAN = "enable_docvalue_scan"; public static final String KEYWORD_SNIFF = "enable_keyword_sniff"; public static final String MAX_DOCVALUE_FIELDS = "max_docvalue_fields"; + public static final String NODES_DISCOVERY = "nodes_discovery"; + public static final String HTTP_SSL_ENABLED = "http_ssl_enabled"; private String hosts; private String[] seeds; @@ -87,6 +90,10 @@ public class EsTable extends Table { // would downgrade to extract value from `stored_fields` private int maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS; + private boolean nodesDiscovery = true; + + private boolean httpSslEnabled = false; + // Solr doc_values vs stored_fields performance-smackdown indicate: // It is possible to notice that retrieving an high number of fields leads // to a sensible worsening of performance if DocValues are used. 
@@ -138,6 +145,13 @@ public boolean isKeywordSniffEnable() { return enableKeywordSniff; } + public boolean isNodesDiscovery() { + return nodesDiscovery; + } + + public boolean isHttpSslEnabled() { + return httpSslEnabled; + } private void validate(Map properties) throws DdlException { if (properties == null) { @@ -185,36 +199,40 @@ private void validate(Map properties) throws DdlException { // enable doc value scan for Elasticsearch if (properties.containsKey(DOC_VALUE_SCAN)) { - try { - enableDocValueScan = Boolean.parseBoolean(properties.get(DOC_VALUE_SCAN).trim()); - } catch (Exception e) { - throw new DdlException("fail to parse enable_docvalue_scan, enable_docvalue_scan= " - + properties.get(VERSION).trim() + " ,`enable_docvalue_scan`" - + " should be like 'true' or 'false', value should be double quotation marks"); - } + enableDocValueScan = EsUtil.getBoolean(properties, DOC_VALUE_SCAN); } if (properties.containsKey(KEYWORD_SNIFF)) { - try { - enableKeywordSniff = Boolean.parseBoolean(properties.get(KEYWORD_SNIFF).trim()); - } catch (Exception e) { - throw new DdlException("fail to parse enable_keyword_sniff, enable_keyword_sniff= " - + properties.get(VERSION).trim() + " ,`enable_keyword_sniff`" - + " should be like 'true' or 'false', value should be double quotation marks"); + enableKeywordSniff = EsUtil.getBoolean(properties, KEYWORD_SNIFF); + } + + if (properties.containsKey(NODES_DISCOVERY)) { + nodesDiscovery = EsUtil.getBoolean(properties, NODES_DISCOVERY); + } + + if (properties.containsKey(HTTP_SSL_ENABLED)) { + httpSslEnabled = EsUtil.getBoolean(properties, HTTP_SSL_ENABLED); + // check protocol + for (String seed : seeds) { + if (httpSslEnabled && seed.startsWith("http://")) { + throw new DdlException("if http_ssl_enabled is true, the https protocol must be used"); + } + if (!httpSslEnabled && seed.startsWith("https://")) { + throw new DdlException("if http_ssl_enabled is false, the http protocol must be used"); + } } - } else { - enableKeywordSniff 
= true; } if (!Strings.isNullOrEmpty(properties.get(TYPE)) && !Strings.isNullOrEmpty(properties.get(TYPE).trim())) { mappingType = properties.get(TYPE).trim(); } + if (!Strings.isNullOrEmpty(properties.get(TRANSPORT)) && !Strings.isNullOrEmpty(properties.get(TRANSPORT).trim())) { transport = properties.get(TRANSPORT).trim(); if (!(TRANSPORT_HTTP.equals(transport) || TRANSPORT_THRIFT.equals(transport))) { - throw new DdlException("transport of ES table must be http(recommend) or thrift(reserved inner usage)," + throw new DdlException("transport of ES table must be http/https(recommend) or thrift(reserved inner usage)," + " but value is " + transport); } } @@ -241,6 +259,8 @@ private void validate(Map properties) throws DdlException { tableContext.put("enableDocValueScan", String.valueOf(enableDocValueScan)); tableContext.put("enableKeywordSniff", String.valueOf(enableKeywordSniff)); tableContext.put("maxDocValueFields", String.valueOf(maxDocValueFields)); + tableContext.put(NODES_DISCOVERY, String.valueOf(nodesDiscovery)); + tableContext.put(HTTP_SSL_ENABLED, String.valueOf(httpSslEnabled)); } public TTableDescriptor toThrift() { @@ -323,7 +343,16 @@ public void readFields(DataInput in) throws IOException { maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS; } } - + if (tableContext.containsKey(NODES_DISCOVERY)) { + nodesDiscovery = Boolean.parseBoolean(tableContext.get(NODES_DISCOVERY)); + } else { + nodesDiscovery = true; + } + if (tableContext.containsKey(HTTP_SSL_ENABLED)) { + httpSslEnabled = Boolean.parseBoolean(tableContext.get(HTTP_SSL_ENABLED)); + } else { + httpSslEnabled = false; + } PartitionType partType = PartitionType.valueOf(Text.readString(in)); if (partType == PartitionType.UNPARTITIONED) { partitionInfo = SinglePartitionInfo.read(in); @@ -357,6 +386,8 @@ public void readFields(DataInput in) throws IOException { tableContext.put("transport", transport); tableContext.put("enableDocValueScan", "false"); tableContext.put(KEYWORD_SNIFF, "true"); + 
tableContext.put(NODES_DISCOVERY, "true"); + tableContext.put(HTTP_SSL_ENABLED, "false"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java index 73d8daf5b31242..31bfc5074a3d0e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java @@ -19,6 +19,9 @@ import org.apache.doris.thrift.TNetworkAddress; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.util.List; import java.util.Map; @@ -38,7 +41,9 @@ public class EsNodeInfo { private boolean hasThrift; private TNetworkAddress thriftAddress; - public EsNodeInfo(String id, Map map) throws DorisEsException { + private static final Logger LOG = LogManager.getLogger(EsNodeInfo.class); + + public EsNodeInfo(String id, Map map, boolean httpSslEnabled) { this.id = id; EsMajorVersion version = EsMajorVersion.parse((String) map.get("version")); this.name = (String) map.get("name"); @@ -66,7 +71,7 @@ public EsNodeInfo(String id, Map map) throws DorisEsException { String address = (String) httpMap.get("publish_address"); if (address != null) { String[] scratch = address.split(":"); - this.publishAddress = new TNetworkAddress(scratch[0], Integer.valueOf(scratch[1])); + this.publishAddress = new TNetworkAddress((httpSslEnabled ? 
"https://" : "") + scratch[0], Integer.parseInt(scratch[1])); this.hasHttp = true; } else { this.publishAddress = null; @@ -96,6 +101,24 @@ public EsNodeInfo(String id, Map map) throws DorisEsException { } } + public EsNodeInfo(String id, String seed) { + this.id = id; + String[] scratch = seed.split(":"); + int port = 80; + if (scratch.length == 3) { + port = Integer.parseInt(scratch[2]); + } + String remoteHost = scratch[0] + ":" + scratch[1]; + this.name = remoteHost; + this.host = remoteHost; + this.ip = remoteHost; + this.isClient = true; + this.isData = true; + this.isIngest = true; + this.publishAddress = new TNetworkAddress(remoteHost, port); + this.hasHttp = true; + } + public boolean hasHttp() { return hasHttp; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java index 98e895e61ca500..2d94ee8df10551 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java @@ -59,7 +59,7 @@ public void registerTable(EsTable esTable) { } esTables.put(esTable.getId(), esTable); esClients.put(esTable.getId(), - new EsRestClient(esTable.getSeeds(), esTable.getUserName(), esTable.getPasswd())); + new EsRestClient(esTable.getSeeds(), esTable.getUserName(), esTable.getPasswd(), esTable.isHttpSslEnabled())); LOG.info("register a new table [{}] to sync list", esTable); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java index f2868aa5eedc34..7e1983a625e180 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java @@ -27,11 +27,20 @@ import 
org.codehaus.jackson.map.SerializationConfig; import java.io.IOException; +import java.security.SecureRandom; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSession; +import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; import okhttp3.Credentials; import okhttp3.OkHttpClient; import okhttp3.Request; @@ -51,13 +60,16 @@ public class EsRestClient { private static OkHttpClient networkClient = new OkHttpClient.Builder() .readTimeout(10, TimeUnit.SECONDS) .build(); + + private static OkHttpClient sslNetworkClient; private Request.Builder builder; private String[] nodes; private String currentNode; private int currentNodeIndex = 0; + private boolean httpSslEnable; - public EsRestClient(String[] nodes, String authUser, String authPassword) { + public EsRestClient(String[] nodes, String authUser, String authPassword, boolean httpSslEnable) { this.nodes = nodes; this.builder = new Request.Builder(); if (!Strings.isEmpty(authUser) && !Strings.isEmpty(authPassword)) { @@ -65,6 +77,7 @@ public EsRestClient(String[] nodes, String authUser, String authPassword) { Credentials.basic(authUser, authPassword)); } this.currentNode = nodes[currentNodeIndex]; + this.httpSslEnable = httpSslEnable; } private void selectNextNode() { @@ -83,7 +96,7 @@ public Map getHttpNodes() throws DorisEsException { } Map nodesMap = new HashMap<>(); for (Map.Entry> entry : nodesData.entrySet()) { - EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue()); + EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue(), httpSslEnable); if (node.hasHttp()) { nodesMap.put(node.getId(), node); } @@ -141,6 +154,20 @@ public EsShardPartitions searchShards(String indexName) throws 
DorisEsException } return EsShardPartitions.findShardPartitions(indexName, searchShards); } + + /** + * Lazily creates the trust-all https client held in the static sslNetworkClient field; static synchronized so that all EsRestClient instances lock on the class while initializing it + **/ + private static synchronized OkHttpClient getOrCreateSslNetworkClient() { + if (sslNetworkClient == null) { + sslNetworkClient = new OkHttpClient.Builder() + .readTimeout(10, TimeUnit.SECONDS) + .sslSocketFactory(createSSLSocketFactory(), new TrustAllCerts()) + .hostnameVerifier(new TrustAllHostnameVerifier()) + .build(); + } + return sslNetworkClient; + } /** * execute request for specific path,it will try again nodes.length times if it fails @@ -151,6 +178,12 @@ public EsShardPartitions searchShards(String indexName) throws DorisEsException private String execute(String path) throws DorisEsException { int retrySize = nodes.length; DorisEsException scratchExceptionForThrow = null; + OkHttpClient httpClient; + if (httpSslEnable) { + httpClient = getOrCreateSslNetworkClient(); + } else { + httpClient = networkClient; + } for (int i = 0; i < retrySize; i++) { // maybe should add HTTP schema to the address // actually, at this time we can only process http protocol @@ -170,7 +203,7 @@ private String execute(String path) throws DorisEsException { LOG.trace("es rest client request URL: {}", currentNode + "/" + path); } try { - response = networkClient.newCall(request).execute(); + response = httpClient.newCall(request).execute(); if (response.isSuccessful()) { return response.body().string(); } @@ -207,4 +240,33 @@ private T parseContent(String response, String key) { } return (T) (key != null ? 
map.get(key) : map); } + + /** + * support https + **/ + private static class TrustAllCerts implements X509TrustManager { + public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {} + + public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {} + + public X509Certificate[] getAcceptedIssuers() {return new X509Certificate[0];} + } + + private static class TrustAllHostnameVerifier implements HostnameVerifier { + public boolean verify(String hostname, SSLSession session) { + return true; + } + } + + private static SSLSocketFactory createSSLSocketFactory() { + SSLSocketFactory ssfFactory; + try { + SSLContext sc = SSLContext.getInstance("TLS"); + sc.init(null, new TrustManager[]{new TrustAllCerts()}, new SecureRandom()); + ssfFactory = sc.getSocketFactory(); + } catch (Exception e) { + throw new DorisEsException("Errors happens when create ssl socket"); + } + return ssfFactory; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java index 05c4fe08b55347..d6bc5e3d1ca527 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java @@ -24,7 +24,6 @@ import org.apache.doris.catalog.RangePartitionInfo; import org.apache.doris.catalog.SinglePartitionInfo; import org.apache.doris.common.DdlException; -import org.apache.doris.thrift.TNetworkAddress; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -36,7 +35,6 @@ import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.Random; /** * save the dynamic info parsed from es cluster state such as shard routing, partition info @@ -109,22 +107,6 @@ public static EsTablePartitions 
fromShardPartitions(EsTable esTable, EsShardPart } return esTablePartitions; } - - public void addHttpAddress(Map nodesInfo) { - for (EsShardPartitions indexState : partitionedIndexStates.values()) { - indexState.addHttpAddress(nodesInfo); - } - for (EsShardPartitions indexState : unPartitionedIndexStates.values()) { - indexState.addHttpAddress(nodesInfo); - } - - } - - public TNetworkAddress randomAddress(Map nodesInfo) { - int seed = new Random().nextInt() % nodesInfo.size(); - EsNodeInfo[] nodeInfos = (EsNodeInfo[]) nodesInfo.values().toArray(); - return nodeInfos[seed].getPublishAddress(); - } public PartitionInfo getPartitionInfo() { return partitionInfo; diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java index d25a01fe7898ae..454fb5fbc9706b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java @@ -21,8 +21,12 @@ import org.apache.doris.analysis.PartitionDesc; import org.apache.doris.analysis.RangePartitionDesc; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; + import org.json.JSONObject; +import java.util.Map; + public class EsUtil { public static void analyzePartitionAndDistributionDesc(PartitionDesc partitionDesc, @@ -82,4 +86,19 @@ public static JSONObject getJsonObject(JSONObject jsonObject, String key, int fr return null; } } + + /** + * Reads the boolean table property `name`; only "true" or "false" (case-insensitive, surrounding whitespace ignored) is accepted. + * Boolean.parseBoolean never throws and maps every other text to false, so the value is validated explicitly instead. + * @throws DdlException if the value is missing or is neither "true" nor "false" + **/ + public static boolean getBoolean(Map properties, String name) throws DdlException { + Object raw = properties.get(name); + String property = raw == null ? null : raw.toString().trim(); + boolean enabled = "true".equalsIgnoreCase(property); + if (!enabled && !"false".equalsIgnoreCase(property)) { + throw new DdlException(String.format("fail to parse %s, %s = %s, `%s` should be like 'true' or 'false', value should be double quotation marks", name, name, property, name)); + } + return enabled; + } } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java index de1bb76d60667f..bb5d416f56cb08 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java @@ -19,6 +19,7 @@ import org.apache.doris.catalog.EsTable; +import java.util.HashMap; import java.util.Map; /** @@ -37,7 +38,15 @@ public PartitionPhase(EsRestClient client) { @Override public void execute(SearchContext context) throws DorisEsException { shardPartitions = client.searchShards(context.sourceIndex()); - nodesInfo = client.getHttpNodes(); + if (context.nodesDiscovery()) { + nodesInfo = client.getHttpNodes(); + } else { + nodesInfo = new HashMap<>(); + String[] seeds = context.esTable().getSeeds(); + for (int i = 0; i < seeds.length; i++) { + nodesInfo.put(String.valueOf(i), new EsNodeInfo(String.valueOf(i), seeds[i])); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/SearchContext.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/SearchContext.java index 06b4c7dde8f12a..3e9e03dc9f2b50 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/SearchContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/SearchContext.java @@ -84,12 +84,16 @@ public class SearchContext { // the ES cluster version private EsMajorVersion version; + // whether the nodes needs to be discovered + private boolean nodesDiscovery; + public SearchContext(EsTable table) { this.table = table; fullSchema = table.getFullSchema(); sourceIndex = table.getIndexName(); type = table.getMappingType(); + nodesDiscovery = table.isNodesDiscovery(); } @@ -142,4 +146,8 @@ public EsShardPartitions partitions() { public EsTablePartitions tablePartitions() throws Exception { return 
EsTablePartitions.fromShardPartitions(table, shardPartitions); } + + public boolean nodesDiscovery() { + return nodesDiscovery; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java index d075946a3d781e..e662de08931f1c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java @@ -148,6 +148,7 @@ protected void toThrift(TPlanNode msg) { Map properties = Maps.newHashMap(); properties.put(EsTable.USER, table.getUserName()); properties.put(EsTable.PASSWORD, table.getPasswd()); + properties.put(EsTable.HTTP_SSL_ENABLED, String.valueOf(table.isHttpSslEnabled())); TEsScanNode esScanNode = new TEsScanNode(desc.getId().asInt()); esScanNode.setProperties(properties); if (table.isDocValueScanEnable()) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/PartitionPhaseTest.java b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/PartitionPhaseTest.java index aa82dff94e34e6..ae858017ed23d8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/PartitionPhaseTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/PartitionPhaseTest.java @@ -50,7 +50,7 @@ public void testWorkFlow(@Injectable EsRestClient client) throws Exception { Map> nodesData = (Map>) mapper.readValue(jsonParser, Map.class).get("nodes"); Map nodesMap = new HashMap<>(); for (Map.Entry> entry : nodesData.entrySet()) { - EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue()); + EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue(), false); if (node.hasHttp()) { nodesMap.put(node.getId(), node); }