Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector-V2][Iceberg] Support Iceberg Kerberos #7246

Merged
merged 12 commits into from
Aug 14, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,9 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
WRITE_SEATUNNEL_ROW_ERROR(
"COMMON-23",
"<connector> write SeaTunnelRow failed, the SeaTunnelRow value is '<seaTunnelRow>'."),

SQL_TEMPLATE_HANDLED_ERROR(
"COMMON-24",
"The table of <tableName> has no <keyName>, but the template \n <template> \n which has the place holder named <placeholder>. Please use the option named <optionName> to specify sql template"),

VERSION_NOT_SUPPORTED("COMMON-25", "<identifier> <version> is unsupported."),
OPERATION_NOT_SUPPORTED("COMMON-26", "<identifier> <operation> is unsupported."),
CONVERT_TO_SEATUNNEL_PROPS_BLANK_ERROR(
Expand All @@ -77,6 +75,8 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
FORMAT_DATETIME_ERROR(
        "COMMON-33",
        "The datetime format '<datetime>' of field '<field>' is not supported. Please check the datetime format."),

// Error codes must be unique within this enum: "COMMON-24" is already used by
// SQL_TEMPLATE_HANDLED_ERROR above, so the next free code (COMMON-34) is used here.
// Message grammar fixed: "authorized failed" -> "authentication failed".
KERBEROS_AUTHORIZED_FAILED("COMMON-34", "Kerberos authentication failed"),
luzongzhu marked this conversation as resolved.
Show resolved Hide resolved
;

private final String code;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,21 @@

import org.apache.seatunnel.shade.com.google.common.collect.ImmutableList;

import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.common.DynClasses;
import org.apache.iceberg.common.DynMethods;

import lombok.extern.slf4j.Slf4j;
import sun.security.krb5.KrbException;

import java.io.IOException;
import java.io.Serializable;
Expand Down Expand Up @@ -62,7 +69,7 @@ public Catalog loadCatalog() {
* @param config
* @return
*/
private Object loadHadoopConfig(CommonConfig config) {
public Object loadHadoopConfig(CommonConfig config) {
Class<?> configClass =
DynClasses.builder()
.impl("org.apache.hadoop.hdfs.HdfsConfiguration")
Expand All @@ -80,7 +87,6 @@ private Object loadHadoopConfig(CommonConfig config) {
log.info("Hadoop not found on classpath, not creating Hadoop config");
return null;
}

try {
Object result = configClass.getDeclaredConstructor().newInstance();
DynMethods.BoundMethod addResourceMethod =
Expand Down Expand Up @@ -109,6 +115,8 @@ private Object loadHadoopConfig(CommonConfig config) {
});
}
config.getHadoopProps().forEach(setMethod::invoke);
// kerberos authentication
doKerberosLogin((Configuration) result);
log.info("Hadoop config initialized: {}", configClass.getName());
return result;
} catch (InstantiationException
Expand All @@ -121,4 +129,59 @@ private Object loadHadoopConfig(CommonConfig config) {
}
return null;
}

/**
* kerberos authentication
*
* @param configuration Configuration
*/
/**
 * Performs Kerberos login when all three kerberos options (principal, keytab path
 * and krb5.conf path) are configured; otherwise logs a warning and skips
 * authentication entirely.
 *
 * @param configuration hadoop Configuration the login is applied to
 * @return the same Configuration instance, for call chaining
 * @throws IcebergConnectorException if the Kerberos login fails
 */
private Configuration doKerberosLogin(Configuration configuration) {
    String kerberosKrb5ConfPath = config.getKerberosKrb5ConfPath();
    String kerberosKeytabPath = config.getKerberosKeytabPath();
    String kerberosPrincipal = config.getKerberosPrincipal();

    if (StringUtils.isNotEmpty(kerberosPrincipal)
            && StringUtils.isNotEmpty(kerberosKrb5ConfPath)
            && StringUtils.isNotEmpty(kerberosKeytabPath)) {
        try {
            // krb5.conf location must be set before the JVM initializes its
            // Kerberos configuration.
            System.setProperty("java.security.krb5.conf", kerberosKrb5ConfPath);
            System.setProperty("krb.principal", kerberosPrincipal);
            doKerberosAuthentication(configuration, kerberosPrincipal, kerberosKeytabPath);
        } catch (Exception e) {
            // Preserve the original exception as the cause instead of flattening
            // it to a message string, so the real failure is not lost.
            throw new IcebergConnectorException(
                    CommonErrorCode.KERBEROS_AUTHORIZED_FAILED,
                    String.format("Kerberos authentication failed: %s", e.getMessage()),
                    e);
        }
    } else {
        // NOTE(review): a partially-filled kerberos config (e.g. principal set but
        // keytab missing) also lands here silently — consider warning about which
        // option is missing.
        log.warn(
                "Kerberos authentication is not configured, it will skip kerberos authentication");
    }

    return configuration;
}

/**
 * Logs the given principal in from the keytab via Hadoop's UserGroupInformation.
 * Skips authentication (with a warning) when principal or keytab path is blank.
 *
 * @param configuration hadoop Configuration; "hadoop.security.authentication" is
 *     forced to "kerberos" before the login
 * @param principal kerberos principal, e.g. "user/host@REALM"
 * @param keytabPath local filesystem path of the keytab file
 * @throws SeaTunnelException if the Kerberos login fails
 */
public static void doKerberosAuthentication(
        Configuration configuration, String principal, String keytabPath) {
    if (StringUtils.isBlank(principal) || StringUtils.isBlank(keytabPath)) {
        log.warn(
                "Principal [{}] or keytabPath [{}] is empty, it will skip kerberos authentication",
                principal,
                keytabPath);
    } else {
        configuration.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(configuration);
        try {
            log.info(
                    "Start Kerberos authentication using principal {} and keytab {}",
                    principal,
                    keytabPath);
            // NOTE(review): sun.security.krb5.Config is a JDK-internal API; on
            // JDK 9+ this requires --add-exports and may break at runtime.
            // Confirm whether the refresh is still needed or can be replaced
            // with a public alternative.
            sun.security.krb5.Config.refresh();
            UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
            UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
            log.info("Kerberos authentication successful, UGI {}", loginUser);
        } catch (IOException | KrbException e) {
            // Original message "check connectivity failed" was a misleading
            // copy-paste — this is an authentication failure.
            throw new SeaTunnelException("Kerberos authentication failed, " + e.getMessage(), e);
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ public OptionRule optionRule() {
CommonConfig.KEY_NAMESPACE,
CommonConfig.KEY_TABLE,
CommonConfig.CATALOG_PROPS)
.optional(CommonConfig.HADOOP_PROPS, KEY_CASE_SENSITIVE)
.optional(
CommonConfig.HADOOP_PROPS,
CommonConfig.KERBEROS_PRINCIPAL,
CommonConfig.KERBEROS_KEYTAB_PATH,
CommonConfig.KERBEROS_KRB5_CONF_PATH,
KEY_CASE_SENSITIVE)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,25 @@ public class CommonConfig implements Serializable {
.defaultValue(false)
.withDescription(" the iceberg case_sensitive");

// Kerberos authentication options. Descriptions fixed: the originals were
// copy-pasted from a JDBC connector ("jdbc kerberos_principal"), and
// KERBEROS_KRB5_CONF_PATH wrongly described itself as the keytab path.
public static final Option<String> KERBEROS_PRINCIPAL =
        Options.key("kerberos_principal")
                .stringType()
                .noDefaultValue()
                .withDescription(
                        "Kerberos principal used to authenticate to the Iceberg catalog, e.g. 'user/host@REALM'");

public static final Option<String> KERBEROS_KEYTAB_PATH =
        Options.key("kerberos_keytab_path")
                .stringType()
                .noDefaultValue()
                .withDescription("Local path of the keytab file used for Kerberos authentication");

public static final Option<String> KERBEROS_KRB5_CONF_PATH =
        Options.key("kerberos_krb5_conf_path")
                .stringType()
                .noDefaultValue()
                .withDescription("Local path of the krb5.conf file used for Kerberos authentication");
luzongzhu marked this conversation as resolved.
Show resolved Hide resolved

private String catalogName;
private String namespace;
private String table;
Expand All @@ -88,6 +107,10 @@ public class CommonConfig implements Serializable {
private Map<String, String> catalogProps;
private Map<String, String> hadoopProps;
private String hadoopConfPath;
// kerberos
private String kerberosPrincipal;
private String kerberosKeytabPath;
private String kerberosKrb5ConfPath;

public CommonConfig(ReadonlyConfig pluginConfig) {
this.catalogName = checkArgumentNotNull(pluginConfig.get(KEY_CATALOG_NAME));
Expand All @@ -99,6 +122,15 @@ public CommonConfig(ReadonlyConfig pluginConfig) {
if (pluginConfig.toConfig().hasPath(KEY_CASE_SENSITIVE.key())) {
this.caseSensitive = pluginConfig.get(KEY_CASE_SENSITIVE);
}
if (pluginConfig.getOptional(KERBEROS_PRINCIPAL).isPresent()) {
this.kerberosPrincipal = pluginConfig.getOptional(KERBEROS_PRINCIPAL).get();
}
if (pluginConfig.getOptional(KERBEROS_KEYTAB_PATH).isPresent()) {
this.kerberosKeytabPath = pluginConfig.getOptional(KERBEROS_KEYTAB_PATH).get();
}
if (pluginConfig.getOptional(KERBEROS_KRB5_CONF_PATH).isPresent()) {
this.kerberosKrb5ConfPath = pluginConfig.getOptional(KERBEROS_KRB5_CONF_PATH).get();
}
validate();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.atomic.AtomicInteger;

public class IcebergTypeMapper {
private static int fieldId = 1;
luzongzhu marked this conversation as resolved.
Show resolved Hide resolved

public static SeaTunnelDataType<?> mapping(String field, @NonNull Type icebergType) {
switch (icebergType.typeId()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ public OptionRule optionRule() {
.optional(
SinkConfig.TABLE_PROPS,
SinkConfig.HADOOP_PROPS,
SinkConfig.KERBEROS_PRINCIPAL,
SinkConfig.KERBEROS_KEYTAB_PATH,
SinkConfig.KERBEROS_KRB5_CONF_PATH,
SinkConfig.WRITE_PROPS,
SinkConfig.AUTO_CREATE_PROPS,
SinkConfig.TABLE_PRIMARY_KEYS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
public abstract class AbstractSplitEnumerator
implements SourceSplitEnumerator<IcebergFileScanTaskSplit, IcebergSplitEnumeratorState> {

protected final SourceSplitEnumerator.Context<IcebergFileScanTaskSplit> context;
protected final Context<IcebergFileScanTaskSplit> context;
protected final SourceConfig sourceConfig;
protected final Map<Integer, List<IcebergFileScanTaskSplit>> pendingSplits;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class IcebergSourceReader implements SourceReader<SeaTunnelRow, IcebergFi

private static final long POLL_WAIT_MS = 1000;

private final SourceReader.Context context;
private final Context context;
private final Queue<IcebergFileScanTaskSplit> pendingSplits;
private final Deserializer deserializer;
private final Schema tableSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,21 +274,19 @@ private static void commitSchemaUpdates(Table table, SchemaChangeWrapper wrapper
log.info("Schema for table {} updated with new columns", table.name());
}

private static boolean columnExists(org.apache.iceberg.Schema schema, SchemaAddColumn update) {
/** Returns whether the column the update wants to add already exists in the schema. */
private static boolean columnExists(Schema schema, SchemaAddColumn update) {
    // Resolve the struct to search in: the schema root when the update has no
    // parent, otherwise the nested struct named by the parent.
    Types.StructType parentStruct;
    if (update.parentName() == null) {
        parentStruct = schema.asStruct();
    } else {
        parentStruct = schema.findType(update.parentName()).asStructType();
    }
    return parentStruct.field(update.name()) != null;
}

private static boolean typeMatches(
org.apache.iceberg.Schema schema, SchemaModifyColumn update) {
/** Returns whether the existing column's type id matches the requested modification's type id. */
private static boolean typeMatches(Schema schema, SchemaModifyColumn update) {
    Type.TypeID existing = schema.findType(update.name()).typeId();
    Type.TypeID requested = update.type().typeId();
    return existing == requested;
}

private static boolean findColumns(
org.apache.iceberg.Schema schema, SchemaDeleteColumn deleteColumn) {
/** Returns whether the column targeted by the delete request is present in the schema. */
private static boolean findColumns(Schema schema, SchemaDeleteColumn deleteColumn) {
    return null != schema.findField(deleteColumn.name());
}

Expand All @@ -300,8 +298,7 @@ public static Type toIcebergType(SeaTunnelDataType rowType) {
return IcebergTypeMapper.toIcebergType(rowType);
}

public static PartitionSpec createPartitionSpec(
org.apache.iceberg.Schema schema, List<String> partitionBy) {
public static PartitionSpec createPartitionSpec(Schema schema, List<String> partitionBy) {
if (partitionBy.isEmpty()) {
return PartitionSpec.unpartitioned();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ static void setUpBeforeClass() throws Exception {
configs.put(CommonConfig.KEY_CATALOG_NAME.key(), CATALOG_NAME);
configs.put(CommonConfig.CATALOG_PROPS.key(), catalogProps);
configs.put(SinkConfig.TABLE_DEFAULT_PARTITION_KEYS.key(), "dt_col");
// hadoop config directory
configs.put(CommonConfig.HADOOP_CONF_PATH_PROP.key(), "/tmp/hadoop/conf");
// hadoop kerberos config
configs.put(CommonConfig.KERBEROS_PRINCIPAL.key(), "hive/xxxx@xxxx.COM");
configs.put(
CommonConfig.KERBEROS_KEYTAB_PATH.key(), "/tmp/hadoop/conf/hive.service.keytab");
configs.put(CommonConfig.KERBEROS_KRB5_CONF_PATH.key(), "/tmp/hadoop/conf/krb5.conf");
icebergCatalog = new IcebergCatalog(CATALOG_NAME, ReadonlyConfig.fromMap(configs));
icebergCatalog.open();
}
Expand Down
Loading