Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector-V2][Iceberg] Support Iceberg Kerberos #7246

Merged
merged 12 commits into from
Aug 14, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.kerberos;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

public class KerberosConfig {

public static final Option<String> KERBEROS_PRINCIPAL =
Options.key("kerberos_principal")
.stringType()
.noDefaultValue()
.withDescription("When use kerberos, we should set kerberos user principal");

public static final Option<String> KRB5_PATH =
Options.key("krb5_path")
.stringType()
.defaultValue("/etc/krb5.conf")
.withDescription(
"When use kerberos, we should set krb5 path file path such as '/seatunnel/krb5.conf' or use the default path '/etc/krb5.conf'");

public static final Option<String> KERBEROS_KEYTAB_PATH =
Options.key("kerberos_keytab_path")
.stringType()
.noDefaultValue()
.withDescription("When using kerberos, We should specify the keytab path");
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,9 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
WRITE_SEATUNNEL_ROW_ERROR(
"COMMON-23",
"<connector> write SeaTunnelRow failed, the SeaTunnelRow value is '<seaTunnelRow>'."),

SQL_TEMPLATE_HANDLED_ERROR(
"COMMON-24",
"The table of <tableName> has no <keyName>, but the template \n <template> \n which has the place holder named <placeholder>. Please use the option named <optionName> to specify sql template"),

VERSION_NOT_SUPPORTED("COMMON-25", "<identifier> <version> is unsupported."),
OPERATION_NOT_SUPPORTED("COMMON-26", "<identifier> <operation> is unsupported."),
CONVERT_TO_SEATUNNEL_PROPS_BLANK_ERROR(
Expand All @@ -78,6 +76,7 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
"COMMON-33",
"The datetime format '<datetime>' of field '<field>' is not supported. Please check the datetime format."),
UNSUPPORTED_METHOD("COMMON-34", "'<identifier>' unsupported the method '<methodName>'"),
KERBEROS_AUTHORIZED_FAILED("COMMON-35", "Kerberos authorized failed"),
;

private final String code;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.kerberos.KerberosConfig;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.common.utils.DateTimeUtils;
Expand All @@ -34,7 +35,7 @@
import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;

public class BaseSinkConfig {
public class BaseSinkConfig extends KerberosConfig {
public static final String SEATUNNEL = "seatunnel";
public static final String NON_PARTITION = "NON_PARTITION";
public static final String TRANSACTION_ID_SPLIT = "_";
Expand Down Expand Up @@ -228,25 +229,6 @@ public class BaseSinkConfig {
.noDefaultValue()
.withDescription("The remote user name of hdfs");

public static final Option<String> KRB5_PATH =
Options.key("krb5_path")
.stringType()
.defaultValue("/etc/krb5.conf")
.withDescription(
"When use kerberos, we should set krb5 path file path such as '/seatunnel/krb5.conf' or use the default path '/etc/krb5.conf");

public static final Option<String> KERBEROS_PRINCIPAL =
Options.key("kerberos_principal")
.stringType()
.noDefaultValue()
.withDescription("Kerberos principal");

public static final Option<String> KERBEROS_KEYTAB_PATH =
Options.key("kerberos_keytab_path")
.stringType()
.noDefaultValue()
.withDescription("Kerberos keytab file path");

public static final Option<Integer> MAX_ROWS_IN_MEMORY =
Options.key("max_rows_in_memory")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,21 @@

import org.apache.seatunnel.shade.com.google.common.collect.ImmutableList;

import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.common.DynClasses;
import org.apache.iceberg.common.DynMethods;

import lombok.extern.slf4j.Slf4j;
import sun.security.krb5.KrbException;

import java.io.IOException;
import java.io.Serializable;
Expand Down Expand Up @@ -62,7 +69,7 @@ public Catalog loadCatalog() {
* @param config
* @return
*/
private Object loadHadoopConfig(CommonConfig config) {
public Object loadHadoopConfig(CommonConfig config) {
Class<?> configClass =
DynClasses.builder()
.impl("org.apache.hadoop.hdfs.HdfsConfiguration")
Expand All @@ -80,7 +87,6 @@ private Object loadHadoopConfig(CommonConfig config) {
log.info("Hadoop not found on classpath, not creating Hadoop config");
return null;
}

try {
Object result = configClass.getDeclaredConstructor().newInstance();
DynMethods.BoundMethod addResourceMethod =
Expand Down Expand Up @@ -109,6 +115,8 @@ private Object loadHadoopConfig(CommonConfig config) {
});
}
config.getHadoopProps().forEach(setMethod::invoke);
// kerberos authentication
doKerberosLogin((Configuration) result);
log.info("Hadoop config initialized: {}", configClass.getName());
return result;
} catch (InstantiationException
Expand All @@ -121,4 +129,59 @@ private Object loadHadoopConfig(CommonConfig config) {
}
return null;
}

/**
* kerberos authentication
*
* @param configuration Configuration
*/
private Configuration doKerberosLogin(Configuration configuration) {
String kerberosKrb5ConfPath = config.getKerberosKrb5ConfPath();
String kerberosKeytabPath = config.getKerberosKeytabPath();
String kerberosPrincipal = config.getKerberosPrincipal();

if (StringUtils.isNotEmpty(kerberosPrincipal)
&& StringUtils.isNotEmpty(kerberosKrb5ConfPath)
&& StringUtils.isNotEmpty(kerberosKeytabPath)) {
try {
System.setProperty("java.security.krb5.conf", kerberosKrb5ConfPath);
System.setProperty("krb.principal", kerberosPrincipal);
doKerberosAuthentication(configuration, kerberosPrincipal, kerberosKeytabPath);
} catch (Exception e) {
throw new IcebergConnectorException(
CommonErrorCode.KERBEROS_AUTHORIZED_FAILED,
String.format("Kerberos authentication failed: %s", e.getMessage()));
}
} else {
log.warn(
"Kerberos authentication is not configured, it will skip kerberos authentication");
}

return configuration;
}

public static void doKerberosAuthentication(
Configuration configuration, String principal, String keytabPath) {
if (StringUtils.isBlank(principal) || StringUtils.isBlank(keytabPath)) {
log.warn(
"Principal [{}] or keytabPath [{}] is empty, it will skip kerberos authentication",
principal,
keytabPath);
} else {
configuration.set("hadoop.security.authentication", "kerberos");
UserGroupInformation.setConfiguration(configuration);
try {
log.info(
"Start Kerberos authentication using principal {} and keytab {}",
principal,
keytabPath);
sun.security.krb5.Config.refresh();
UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
log.info("Kerberos authentication successful,UGI {}", loginUser);
} catch (IOException | KrbException e) {
throw new SeaTunnelException("check connectivity failed, " + e.getMessage(), e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ public OptionRule optionRule() {
CommonConfig.KEY_NAMESPACE,
CommonConfig.KEY_TABLE,
CommonConfig.CATALOG_PROPS)
.optional(CommonConfig.HADOOP_PROPS, KEY_CASE_SENSITIVE)
.optional(
CommonConfig.HADOOP_PROPS,
CommonConfig.KERBEROS_PRINCIPAL,
CommonConfig.KERBEROS_KEYTAB_PATH,
CommonConfig.KRB5_PATH,
KEY_CASE_SENSITIVE)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.kerberos.KerberosConfig;
import org.apache.seatunnel.common.config.ConfigRuntimeException;

import lombok.Getter;
Expand All @@ -33,7 +34,7 @@

@Getter
@ToString
public class CommonConfig implements Serializable {
public class CommonConfig extends KerberosConfig implements Serializable {
private static final long serialVersionUID = 239821141534421580L;

public static final Option<String> KEY_CATALOG_NAME =
Expand Down Expand Up @@ -89,6 +90,12 @@ public class CommonConfig implements Serializable {
private Map<String, String> hadoopProps;
private String hadoopConfPath;

// kerberos

private String kerberosPrincipal;
private String kerberosKeytabPath;
private String kerberosKrb5ConfPath;

public CommonConfig(ReadonlyConfig pluginConfig) {
this.catalogName = checkArgumentNotNull(pluginConfig.get(KEY_CATALOG_NAME));
this.namespace = pluginConfig.get(KEY_NAMESPACE);
Expand All @@ -99,6 +106,15 @@ public CommonConfig(ReadonlyConfig pluginConfig) {
if (pluginConfig.toConfig().hasPath(KEY_CASE_SENSITIVE.key())) {
this.caseSensitive = pluginConfig.get(KEY_CASE_SENSITIVE);
}
if (pluginConfig.getOptional(KERBEROS_PRINCIPAL).isPresent()) {
this.kerberosPrincipal = pluginConfig.getOptional(KERBEROS_PRINCIPAL).get();
}
if (pluginConfig.getOptional(KRB5_PATH).isPresent()) {
this.kerberosKrb5ConfPath = pluginConfig.getOptional(KRB5_PATH).get();
}
if (pluginConfig.getOptional(KERBEROS_KEYTAB_PATH).isPresent()) {
this.kerberosKeytabPath = pluginConfig.getOptional(KERBEROS_KEYTAB_PATH).get();
}
validate();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.concurrent.atomic.AtomicInteger;

public class IcebergTypeMapper {

public static SeaTunnelDataType<?> mapping(String field, @NonNull Type icebergType) {
switch (icebergType.typeId()) {
case BOOLEAN:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ public OptionRule optionRule() {
.optional(
SinkConfig.TABLE_PROPS,
SinkConfig.HADOOP_PROPS,
SinkConfig.KERBEROS_PRINCIPAL,
SinkConfig.KERBEROS_KEYTAB_PATH,
SinkConfig.KRB5_PATH,
SinkConfig.WRITE_PROPS,
SinkConfig.AUTO_CREATE_PROPS,
SinkConfig.TABLE_PRIMARY_KEYS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
public abstract class AbstractSplitEnumerator
implements SourceSplitEnumerator<IcebergFileScanTaskSplit, IcebergSplitEnumeratorState> {

protected final SourceSplitEnumerator.Context<IcebergFileScanTaskSplit> context;
protected final Context<IcebergFileScanTaskSplit> context;
protected final SourceConfig sourceConfig;
protected final Map<Integer, List<IcebergFileScanTaskSplit>> pendingSplits;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class IcebergSourceReader implements SourceReader<SeaTunnelRow, IcebergFi

private static final long POLL_WAIT_MS = 1000;

private final SourceReader.Context context;
private final Context context;
private final Queue<IcebergFileScanTaskSplit> pendingSplits;
private final Deserializer deserializer;
private final Schema tableSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,21 +274,19 @@ private static void commitSchemaUpdates(Table table, SchemaChangeWrapper wrapper
log.info("Schema for table {} updated with new columns", table.name());
}

private static boolean columnExists(org.apache.iceberg.Schema schema, SchemaAddColumn update) {
private static boolean columnExists(Schema schema, SchemaAddColumn update) {
Types.StructType struct =
update.parentName() == null
? schema.asStruct()
: schema.findType(update.parentName()).asStructType();
return struct.field(update.name()) != null;
}

private static boolean typeMatches(
org.apache.iceberg.Schema schema, SchemaModifyColumn update) {
private static boolean typeMatches(Schema schema, SchemaModifyColumn update) {
return schema.findType(update.name()).typeId() == update.type().typeId();
}

private static boolean findColumns(
org.apache.iceberg.Schema schema, SchemaDeleteColumn deleteColumn) {
private static boolean findColumns(Schema schema, SchemaDeleteColumn deleteColumn) {
return schema.findField(deleteColumn.name()) != null;
}

Expand All @@ -300,8 +298,7 @@ public static Type toIcebergType(SeaTunnelDataType rowType) {
return IcebergTypeMapper.toIcebergType(rowType);
}

public static PartitionSpec createPartitionSpec(
org.apache.iceberg.Schema schema, List<String> partitionBy) {
public static PartitionSpec createPartitionSpec(Schema schema, List<String> partitionBy) {
if (partitionBy.isEmpty()) {
return PartitionSpec.unpartitioned();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ static void setUpBeforeClass() throws Exception {
configs.put(CommonConfig.KEY_CATALOG_NAME.key(), CATALOG_NAME);
configs.put(CommonConfig.CATALOG_PROPS.key(), catalogProps);
configs.put(SinkConfig.TABLE_DEFAULT_PARTITION_KEYS.key(), "dt_col");
// hadoop config directory
configs.put(CommonConfig.HADOOP_CONF_PATH_PROP.key(), "/tmp/hadoop/conf");
// hadoop kerberos config
configs.put(CommonConfig.KERBEROS_PRINCIPAL.key(), "hive/xxxx@xxxx.COM");
configs.put(
CommonConfig.KERBEROS_KEYTAB_PATH.key(), "/tmp/hadoop/conf/hive.service.keytab");
configs.put(CommonConfig.KRB5_PATH.key(), "/tmp/hadoop/conf/krb5.conf");
icebergCatalog = new IcebergCatalog(CATALOG_NAME, ReadonlyConfig.fromMap(configs));
icebergCatalog.open();
}
Expand Down
Loading