Skip to content

Commit

Permalink
[fix](multi-catalog)fix paimon meta properties convert (#37249) (#37958)
Browse files Browse the repository at this point in the history
bp #37249

Co-authored-by: slothever <18522955+wsjz@users.noreply.github.com>
  • Loading branch information
morningman and wsjz authored Jul 16, 2024
1 parent b6e5281 commit ba66ff5
Show file tree
Hide file tree
Showing 9 changed files with 204 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopUGI;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.datasource.property.constants.PaimonProperties;
import org.apache.doris.fs.remote.dfs.DFSFileSystem;
Expand Down Expand Up @@ -55,8 +57,11 @@ public abstract class PaimonExternalCatalog extends ExternalCatalog {
PaimonProperties.WAREHOUSE
);

public PaimonExternalCatalog(long catalogId, String name, String comment) {
public PaimonExternalCatalog(long catalogId, String name, String resource,
Map<String, String> props, String comment) {
super(catalogId, name, InitCatalogLog.Type.PAIMON, comment);
props = PropertyConverter.convertToMetaProperties(props);
catalogProperty = new CatalogProperty(resource, props);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.doris.datasource.paimon;

import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.CosProperties;
import org.apache.doris.datasource.property.constants.ObsProperties;
Expand All @@ -34,8 +33,7 @@ public class PaimonFileExternalCatalog extends PaimonExternalCatalog {

public PaimonFileExternalCatalog(long catalogId, String name, String resource,
Map<String, String> props, String comment) {
super(catalogId, name, comment);
catalogProperty = new CatalogProperty(resource, props);
super(catalogId, name, resource, props, comment);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.doris.datasource.paimon;

import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.datasource.property.constants.PaimonProperties;

Expand All @@ -37,8 +36,7 @@ public class PaimonHMSExternalCatalog extends PaimonExternalCatalog {

public PaimonHMSExternalCatalog(long catalogId, String name, String resource,
Map<String, String> props, String comment) {
super(catalogId, name, comment);
catalogProperty = new CatalogProperty(resource, props);
super(catalogId, name, resource, props, comment);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.doris.common.credentials.CloudCredential;
import org.apache.doris.common.credentials.CloudCredentialWithEndpoint;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogMgr;
import org.apache.doris.datasource.InitCatalogLog.Type;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
Expand Down Expand Up @@ -250,13 +249,14 @@ private static Map<String, String> convertToS3Properties(Map<String, String> pro
return s3Properties;
}

private static String checkRegion(String endpoint, String region, String regionKey) {
public static String checkRegion(String endpoint, String region, String regionKey) {
if (Strings.isNullOrEmpty(region)) {
region = S3Properties.getRegionOfEndpoint(endpoint);
}
if (Strings.isNullOrEmpty(region)) {
String errorMsg = String.format("Required property '%s' when region is not in endpoint.", regionKey);
Util.logAndThrowRuntimeException(LOG, errorMsg, new IllegalArgumentException(errorMsg));
String errorMsg = String.format("No '%s' info found, using SDK default region: us-east-1", regionKey);
LOG.warn(errorMsg);
return "us-east-1";
}
return region;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ private static Map<String, String> getBeAWSPropertiesFromS3(Map<String, String>
Map<String, String> beProperties = new HashMap<>();
String endpoint = properties.get(S3Properties.ENDPOINT);
beProperties.put(S3Properties.Env.ENDPOINT, endpoint);
String region = S3Properties.getRegionOfEndpoint(endpoint);
String region = PropertyConverter.checkRegion(endpoint, properties.get(S3Properties.Env.REGION),
S3Properties.Env.REGION);
beProperties.put(S3Properties.Env.REGION, properties.getOrDefault(S3Properties.REGION, region));
if (properties.containsKey(S3Properties.ACCESS_KEY)) {
beProperties.put(S3Properties.Env.ACCESS_KEY, properties.get(S3Properties.ACCESS_KEY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class S3Properties extends BaseProperties {

Expand Down Expand Up @@ -76,6 +77,8 @@ public class S3Properties extends BaseProperties {
WebIdentityTokenCredentialsProvider.class.getName(),
IAMInstanceCredentialsProvider.class.getName());

private static final Pattern IPV4_PORT_PATTERN = Pattern.compile("((?:\\d{1,3}\\.){3}\\d{1,3}:\\d{1,5})");

public static Map<String, String> credentialToMap(CloudCredentialWithEndpoint credential) {
Map<String, String> resMap = new HashMap<>();
resMap.put(S3Properties.ENDPOINT, credential.getEndpoint());
Expand Down Expand Up @@ -125,11 +128,18 @@ public static CloudCredentialWithEndpoint getEnvironmentCredentialWithEndpoint(M
}
String endpoint = props.get(Env.ENDPOINT);
String region = props.getOrDefault(Env.REGION, S3Properties.getRegionOfEndpoint(endpoint));
props.putIfAbsent(Env.REGION, PropertyConverter.checkRegion(endpoint, region, Env.REGION));
return new CloudCredentialWithEndpoint(endpoint, region, credential);
}

public static String getRegionOfEndpoint(String endpoint) {
String[] endpointSplit = endpoint.split("\\.");
if (IPV4_PORT_PATTERN.matcher(endpoint).find()) {
// if endpoint contains '192.168.0.1:8999', return null region
return null;
}
String[] endpointSplit = endpoint.replace("http://", "")
.replace("https://", "")
.split("\\.");
if (endpointSplit.length < 2) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
import org.apache.doris.datasource.property.constants.CosProperties;
import org.apache.doris.datasource.property.constants.DLFProperties;
import org.apache.doris.datasource.property.constants.GCSProperties;
import org.apache.doris.datasource.property.constants.GlueProperties;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.datasource.property.constants.MinioProperties;
import org.apache.doris.datasource.property.constants.ObsProperties;
Expand Down Expand Up @@ -677,4 +679,103 @@ public void testS3PropertiesConvertor() {
Assertions.assertEquals("region", beProperties.get(S3Properties.Env.REGION));
Assertions.assertEquals("false", beProperties.get(PropertyConverter.USE_PATH_STYLE));
}

@Test
public void testMetaPropertiesConvertor() {
// test region parser
Assertions.assertNull(S3Properties.getRegionOfEndpoint("http://192.168.2.30:9099/com.region.test/dir"));
Assertions.assertEquals("cn-beijing",
S3Properties.getRegionOfEndpoint("http://dlf.cn-beijing.aliyuncs.com/com.region.test/dir"));
Assertions.assertEquals("oss-cn-beijing",
S3Properties.getRegionOfEndpoint("http://oss-cn-beijing.aliyuncs.com/com.region.test/dir"));
Assertions.assertEquals("us-east-1",
S3Properties.getRegionOfEndpoint("http://s3.us-east-1.amazonaws.com/com.region.test/dir"));

//1. dlf
Map<String, String> props = new HashMap<>();
// iceberg.catalog.type
props.put("type", "hms");
props.put("hive.metastore.type", "dlf");
props.put(DLFProperties.PROXY_MODE, "DLF_ONLY");
props.put(DLFProperties.ENDPOINT, "dlf.cn-beijing.aliyuncs.com");
props.put(DLFProperties.UID, "20239444");
props.put(DLFProperties.ACCESS_KEY, "akk");
props.put(DLFProperties.SECRET_KEY, "skk");
props.put(DLFProperties.REGION, "cn-beijing");
props.put(DLFProperties.ACCESS_PUBLIC, "false");
Map<String, String> res = PropertyConverter.convertToMetaProperties(new HashMap<>(props));
Assertions.assertEquals(25, res.size());
Assertions.assertEquals("akk", res.get(S3Properties.Env.ACCESS_KEY));
Assertions.assertEquals("skk", res.get(S3Properties.Env.SECRET_KEY));
Assertions.assertEquals("akk", res.get(DataLakeConfig.CATALOG_ACCESS_KEY_ID));
Assertions.assertEquals("skk", res.get(DataLakeConfig.CATALOG_ACCESS_KEY_SECRET));
Assertions.assertEquals("dlf.cn-beijing.aliyuncs.com", res.get(DataLakeConfig.CATALOG_ENDPOINT));
Assertions.assertEquals("oss-cn-beijing-internal.aliyuncs.com", res.get(S3Properties.Env.ENDPOINT));
Assertions.assertEquals("cn-beijing", res.get(DataLakeConfig.CATALOG_REGION_ID));
Assertions.assertEquals("oss-cn-beijing", res.get(S3Properties.Env.REGION));

props.put(DLFProperties.ACCESS_PUBLIC, "true");
res = PropertyConverter.convertToMetaProperties(new HashMap<>(props));
Assertions.assertEquals(25, res.size());
Assertions.assertEquals("oss-cn-beijing.aliyuncs.com", res.get(S3Properties.Env.ENDPOINT));
Assertions.assertEquals("oss-cn-beijing", res.get(S3Properties.Env.REGION));

props.put(OssProperties.OSS_HDFS_ENABLED, "true");
res = PropertyConverter.convertToMetaProperties(new HashMap<>(props));
Assertions.assertEquals(28, res.size());
Assertions.assertEquals("com.aliyun.jindodata.oss.JindoOssFileSystem", res.get("fs.oss.impl"));
Assertions.assertEquals("com.aliyun.jindodata.oss.OSS", res.get("fs.AbstractFileSystem.oss.impl"));
Assertions.assertEquals("false", res.get(DataLakeConfig.CATALOG_CREATE_DEFAULT_DB));
Assertions.assertEquals("cn-beijing", res.get(S3Properties.Env.REGION));

// 2. glue
Map<String, String> props2 = new HashMap<>();
props2.put("hive.metastore.type", "glue");
props2.put("aws.glue.endpoint", "glue.us-east-1.amazonaws.com");
props2.put("aws.glue.access-key", "akk");
props2.put("aws.glue.secret-key", "skk");
props2.put("aws.region", "us-east-1");
res = PropertyConverter.convertToMetaProperties(props2);
Assertions.assertEquals(16, res.size());
Assertions.assertEquals("akk", res.get(S3Properties.Env.ACCESS_KEY));
Assertions.assertEquals("skk", res.get(S3Properties.Env.SECRET_KEY));
Assertions.assertEquals("s3.us-east-1.amazonaws.com", res.get(S3Properties.Env.ENDPOINT));
Assertions.assertEquals("us-east-1", res.get(S3Properties.Env.REGION));

Map<String, String> props3 = new HashMap<>();
props3.put("hive.metastore.type", "glue");
props3.put(GlueProperties.ENDPOINT, "glue.us-east-1.amazonaws.com");
props3.put(GlueProperties.ACCESS_KEY, "akk");
props3.put(GlueProperties.SECRET_KEY, "skk");
res = PropertyConverter.convertToMetaProperties(props3);
Assertions.assertEquals(16, res.size());
Assertions.assertEquals("akk", res.get(S3Properties.Env.ACCESS_KEY));
Assertions.assertEquals("skk", res.get(S3Properties.Env.SECRET_KEY));
Assertions.assertEquals("s3.us-east-1.amazonaws.com", res.get(S3Properties.Env.ENDPOINT));
Assertions.assertEquals("us-east-1", res.get(S3Properties.Env.REGION));

// 3. s3 env
Map<String, String> props4 = new HashMap<>();
props4.put("hive.metastore.type", "hms");
props4.put(S3Properties.Env.ENDPOINT, "s3.us-west-2.amazonaws.com");
props4.put(S3Properties.Env.ACCESS_KEY, "akk");
props4.put(S3Properties.Env.SECRET_KEY, "skk");
res = PropertyConverter.convertToMetaProperties(new HashMap<>(props4));
Assertions.assertEquals(9, res.size());
Assertions.assertEquals("akk", res.get(S3Properties.Env.ACCESS_KEY));
Assertions.assertEquals("skk", res.get(S3Properties.Env.SECRET_KEY));
Assertions.assertEquals("s3.us-west-2.amazonaws.com", res.get(S3Properties.Env.ENDPOINT));
Assertions.assertEquals("us-west-2", res.get(S3Properties.Env.REGION));

props4.put(S3Properties.Env.ENDPOINT, "http://172.23.56.19:9033");
res = PropertyConverter.convertToMetaProperties(new HashMap<>(props4));
Assertions.assertEquals(9, res.size());
Assertions.assertEquals("http://172.23.56.19:9033", res.get(S3Properties.Env.ENDPOINT));
Assertions.assertEquals("us-east-1", res.get(S3Properties.Env.REGION));

props4.put(S3Properties.Env.REGION, "north");
res = PropertyConverter.convertToMetaProperties(new HashMap<>(props4));
Assertions.assertEquals(9, res.size());
Assertions.assertEquals("north", res.get(S3Properties.Env.REGION));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !no_region1 --
2024-01-02T10:04:05.100 2024-01-02T02:04:05.123456

-- !no_region2 --
1 2024-01-02T10:04:05.100 2024-01-02T10:04:05.120 2024-01-02T10:04:05.123 2024-01-02T10:04:05.123400 2024-01-02T10:04:05.123450 2024-01-02T10:04:05.123456 2024-01-02T10:04:05.123456 2024-01-02T10:04:05.123456 2024-01-02T10:04:05.123456 2024-01-02T02:04:05.100 2024-01-02T02:04:05.120 2024-01-02T02:04:05.123 2024-01-02T02:04:05.123400 2024-01-02T02:04:05.123450 2024-01-02T02:04:05.123456 2024-01-02T02:04:05.123456 2024-01-02T02:04:05.123456 2024-01-02T02:04:05.123456

-- !region1 --
2024-01-02T10:04:05.100 2024-01-02T02:04:05.123456

-- !region2 --
1 2024-01-02T10:04:05.100 2024-01-02T10:04:05.120 2024-01-02T10:04:05.123 2024-01-02T10:04:05.123400 2024-01-02T10:04:05.123450 2024-01-02T10:04:05.123456 2024-01-02T10:04:05.123456 2024-01-02T10:04:05.123456 2024-01-02T10:04:05.123456 2024-01-02T02:04:05.100 2024-01-02T02:04:05.120 2024-01-02T02:04:05.123 2024-01-02T02:04:05.123400 2024-01-02T02:04:05.123450 2024-01-02T02:04:05.123456 2024-01-02T02:04:05.123456 2024-01-02T02:04:05.123456 2024-01-02T02:04:05.123456
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_paimon_minio", "p0,external,doris,external_docker,external_docker_doris") {
String enabled = context.config.otherConfigs.get("enablePaimonTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
String catalog_name = "test_paimon_minio"
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
String table_name = "ts_scale_orc"

sql """drop catalog if exists ${catalog_name}"""

sql """
CREATE CATALOG ${catalog_name} PROPERTIES (
'type' = 'paimon',
'warehouse' = 's3://warehouse/wh',
's3.endpoint' = 'http://${externalEnvIp}:${minio_port}',
's3.access_key' = 'admin',
's3.secret_key' = 'password',
's3.path.style.access' = 'true'
);
"""
sql """switch `${catalog_name}`"""
sql """show databases; """
sql """use `${catalog_name}`.`flink_paimon`"""
order_qt_no_region1 """select ts1,ts19 from ${table_name} """
order_qt_no_region2 """select * from ${table_name} """
sql """drop catalog if exists ${catalog_name}"""

sql """drop catalog if exists ${catalog_name}_with_region"""
sql """
CREATE CATALOG ${catalog_name}_with_region PROPERTIES (
'type' = 'paimon',
'warehouse' = 's3://warehouse/wh',
's3.endpoint' = 'http://${externalEnvIp}:${minio_port}',
's3.access_key' = 'admin',
's3.secret_key' = 'password',
's3.region' = 'us-west-2',
's3.path.style.access' = 'true'
);
"""
sql """switch `${catalog_name}_with_region`"""
sql """show databases; """
sql """use `${catalog_name}_with_region`.`flink_paimon`"""
order_qt_region1 """select ts1,ts19 from ${table_name} """
order_qt_region2 """select * from ${table_name} """
sql """drop catalog if exists ${catalog_name}_with_region"""
}
}


0 comments on commit ba66ff5

Please sign in to comment.