Skip to content

Commit

Permalink
[Feature][Connector-V2][Iceberg] Support Iceberg Kerberos (#7246)
Browse files Browse the repository at this point in the history
  • Loading branch information
luzongzhu authored Aug 14, 2024
1 parent b5140f5 commit e300120
Show file tree
Hide file tree
Showing 12 changed files with 150 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.kerberos;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

public class KerberosConfig {

public static final Option<String> KERBEROS_PRINCIPAL =
Options.key("kerberos_principal")
.stringType()
.noDefaultValue()
.withDescription("When use kerberos, we should set kerberos user principal");

public static final Option<String> KRB5_PATH =
Options.key("krb5_path")
.stringType()
.defaultValue("/etc/krb5.conf")
.withDescription(
"When use kerberos, we should set krb5 path file path such as '/seatunnel/krb5.conf' or use the default path '/etc/krb5.conf'");

public static final Option<String> KERBEROS_KEYTAB_PATH =
Options.key("kerberos_keytab_path")
.stringType()
.noDefaultValue()
.withDescription("When using kerberos, We should specify the keytab path");
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,9 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
WRITE_SEATUNNEL_ROW_ERROR(
"COMMON-23",
"<connector> write SeaTunnelRow failed, the SeaTunnelRow value is '<seaTunnelRow>'."),

SQL_TEMPLATE_HANDLED_ERROR(
"COMMON-24",
"The table of <tableName> has no <keyName>, but the template \n <template> \n which has the place holder named <placeholder>. Please use the option named <optionName> to specify sql template"),

VERSION_NOT_SUPPORTED("COMMON-25", "<identifier> <version> is unsupported."),
OPERATION_NOT_SUPPORTED("COMMON-26", "<identifier> <operation> is unsupported."),
CONVERT_TO_SEATUNNEL_PROPS_BLANK_ERROR(
Expand All @@ -78,6 +76,7 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
"COMMON-33",
"The datetime format '<datetime>' of field '<field>' is not supported. Please check the datetime format."),
UNSUPPORTED_METHOD("COMMON-34", "'<identifier>' unsupported the method '<methodName>'"),
KERBEROS_AUTHORIZED_FAILED("COMMON-35", "Kerberos authorized failed"),
;

private final String code;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.kerberos.KerberosConfig;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.common.utils.DateTimeUtils;
Expand All @@ -34,7 +35,7 @@
import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;

public class BaseSinkConfig {
public class BaseSinkConfig extends KerberosConfig {
public static final String SEATUNNEL = "seatunnel";
public static final String NON_PARTITION = "NON_PARTITION";
public static final String TRANSACTION_ID_SPLIT = "_";
Expand Down Expand Up @@ -228,25 +229,6 @@ public class BaseSinkConfig {
.noDefaultValue()
.withDescription("The remote user name of hdfs");

public static final Option<String> KRB5_PATH =
Options.key("krb5_path")
.stringType()
.defaultValue("/etc/krb5.conf")
.withDescription(
"When use kerberos, we should set krb5 path file path such as '/seatunnel/krb5.conf' or use the default path '/etc/krb5.conf");

public static final Option<String> KERBEROS_PRINCIPAL =
Options.key("kerberos_principal")
.stringType()
.noDefaultValue()
.withDescription("Kerberos principal");

public static final Option<String> KERBEROS_KEYTAB_PATH =
Options.key("kerberos_keytab_path")
.stringType()
.noDefaultValue()
.withDescription("Kerberos keytab file path");

public static final Option<Integer> MAX_ROWS_IN_MEMORY =
Options.key("max_rows_in_memory")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,21 @@

import org.apache.seatunnel.shade.com.google.common.collect.ImmutableList;

import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.common.DynClasses;
import org.apache.iceberg.common.DynMethods;

import lombok.extern.slf4j.Slf4j;
import sun.security.krb5.KrbException;

import java.io.IOException;
import java.io.Serializable;
Expand Down Expand Up @@ -62,7 +69,7 @@ public Catalog loadCatalog() {
* @param config
* @return
*/
private Object loadHadoopConfig(CommonConfig config) {
public Object loadHadoopConfig(CommonConfig config) {
Class<?> configClass =
DynClasses.builder()
.impl("org.apache.hadoop.hdfs.HdfsConfiguration")
Expand All @@ -80,7 +87,6 @@ private Object loadHadoopConfig(CommonConfig config) {
log.info("Hadoop not found on classpath, not creating Hadoop config");
return null;
}

try {
Object result = configClass.getDeclaredConstructor().newInstance();
DynMethods.BoundMethod addResourceMethod =
Expand Down Expand Up @@ -109,6 +115,8 @@ private Object loadHadoopConfig(CommonConfig config) {
});
}
config.getHadoopProps().forEach(setMethod::invoke);
// kerberos authentication
doKerberosLogin((Configuration) result);
log.info("Hadoop config initialized: {}", configClass.getName());
return result;
} catch (InstantiationException
Expand All @@ -121,4 +129,59 @@ private Object loadHadoopConfig(CommonConfig config) {
}
return null;
}

/**
* kerberos authentication
*
* @param configuration Configuration
*/
private Configuration doKerberosLogin(Configuration configuration) {
String kerberosKrb5ConfPath = config.getKerberosKrb5ConfPath();
String kerberosKeytabPath = config.getKerberosKeytabPath();
String kerberosPrincipal = config.getKerberosPrincipal();

if (StringUtils.isNotEmpty(kerberosPrincipal)
&& StringUtils.isNotEmpty(kerberosKrb5ConfPath)
&& StringUtils.isNotEmpty(kerberosKeytabPath)) {
try {
System.setProperty("java.security.krb5.conf", kerberosKrb5ConfPath);
System.setProperty("krb.principal", kerberosPrincipal);
doKerberosAuthentication(configuration, kerberosPrincipal, kerberosKeytabPath);
} catch (Exception e) {
throw new IcebergConnectorException(
CommonErrorCode.KERBEROS_AUTHORIZED_FAILED,
String.format("Kerberos authentication failed: %s", e.getMessage()));
}
} else {
log.warn(
"Kerberos authentication is not configured, it will skip kerberos authentication");
}

return configuration;
}

public static void doKerberosAuthentication(
Configuration configuration, String principal, String keytabPath) {
if (StringUtils.isBlank(principal) || StringUtils.isBlank(keytabPath)) {
log.warn(
"Principal [{}] or keytabPath [{}] is empty, it will skip kerberos authentication",
principal,
keytabPath);
} else {
configuration.set("hadoop.security.authentication", "kerberos");
UserGroupInformation.setConfiguration(configuration);
try {
log.info(
"Start Kerberos authentication using principal {} and keytab {}",
principal,
keytabPath);
sun.security.krb5.Config.refresh();
UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
log.info("Kerberos authentication successful,UGI {}", loginUser);
} catch (IOException | KrbException e) {
throw new SeaTunnelException("check connectivity failed, " + e.getMessage(), e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ public OptionRule optionRule() {
CommonConfig.KEY_NAMESPACE,
CommonConfig.KEY_TABLE,
CommonConfig.CATALOG_PROPS)
.optional(CommonConfig.HADOOP_PROPS, KEY_CASE_SENSITIVE)
.optional(
CommonConfig.HADOOP_PROPS,
CommonConfig.KERBEROS_PRINCIPAL,
CommonConfig.KERBEROS_KEYTAB_PATH,
CommonConfig.KRB5_PATH,
KEY_CASE_SENSITIVE)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.kerberos.KerberosConfig;
import org.apache.seatunnel.common.config.ConfigRuntimeException;

import lombok.Getter;
Expand All @@ -33,7 +34,7 @@

@Getter
@ToString
public class CommonConfig implements Serializable {
public class CommonConfig extends KerberosConfig implements Serializable {
private static final long serialVersionUID = 239821141534421580L;

public static final Option<String> KEY_CATALOG_NAME =
Expand Down Expand Up @@ -89,6 +90,12 @@ public class CommonConfig implements Serializable {
private Map<String, String> hadoopProps;
private String hadoopConfPath;

// kerberos

private String kerberosPrincipal;
private String kerberosKeytabPath;
private String kerberosKrb5ConfPath;

public CommonConfig(ReadonlyConfig pluginConfig) {
this.catalogName = checkArgumentNotNull(pluginConfig.get(KEY_CATALOG_NAME));
this.namespace = pluginConfig.get(KEY_NAMESPACE);
Expand All @@ -99,6 +106,15 @@ public CommonConfig(ReadonlyConfig pluginConfig) {
if (pluginConfig.toConfig().hasPath(KEY_CASE_SENSITIVE.key())) {
this.caseSensitive = pluginConfig.get(KEY_CASE_SENSITIVE);
}
if (pluginConfig.getOptional(KERBEROS_PRINCIPAL).isPresent()) {
this.kerberosPrincipal = pluginConfig.getOptional(KERBEROS_PRINCIPAL).get();
}
if (pluginConfig.getOptional(KRB5_PATH).isPresent()) {
this.kerberosKrb5ConfPath = pluginConfig.getOptional(KRB5_PATH).get();
}
if (pluginConfig.getOptional(KERBEROS_KEYTAB_PATH).isPresent()) {
this.kerberosKeytabPath = pluginConfig.getOptional(KERBEROS_KEYTAB_PATH).get();
}
validate();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.concurrent.atomic.AtomicInteger;

public class IcebergTypeMapper {

public static SeaTunnelDataType<?> mapping(String field, @NonNull Type icebergType) {
switch (icebergType.typeId()) {
case BOOLEAN:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ public OptionRule optionRule() {
.optional(
SinkConfig.TABLE_PROPS,
SinkConfig.HADOOP_PROPS,
SinkConfig.KERBEROS_PRINCIPAL,
SinkConfig.KERBEROS_KEYTAB_PATH,
SinkConfig.KRB5_PATH,
SinkConfig.WRITE_PROPS,
SinkConfig.AUTO_CREATE_PROPS,
SinkConfig.TABLE_PRIMARY_KEYS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
public abstract class AbstractSplitEnumerator
implements SourceSplitEnumerator<IcebergFileScanTaskSplit, IcebergSplitEnumeratorState> {

protected final SourceSplitEnumerator.Context<IcebergFileScanTaskSplit> context;
protected final Context<IcebergFileScanTaskSplit> context;
protected final SourceConfig sourceConfig;
protected final Map<Integer, List<IcebergFileScanTaskSplit>> pendingSplits;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class IcebergSourceReader implements SourceReader<SeaTunnelRow, IcebergFi

private static final long POLL_WAIT_MS = 1000;

private final SourceReader.Context context;
private final Context context;
private final Queue<IcebergFileScanTaskSplit> pendingSplits;
private final Deserializer deserializer;
private final Schema tableSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,21 +274,19 @@ private static void commitSchemaUpdates(Table table, SchemaChangeWrapper wrapper
log.info("Schema for table {} updated with new columns", table.name());
}

private static boolean columnExists(org.apache.iceberg.Schema schema, SchemaAddColumn update) {
private static boolean columnExists(Schema schema, SchemaAddColumn update) {
Types.StructType struct =
update.parentName() == null
? schema.asStruct()
: schema.findType(update.parentName()).asStructType();
return struct.field(update.name()) != null;
}

private static boolean typeMatches(
org.apache.iceberg.Schema schema, SchemaModifyColumn update) {
private static boolean typeMatches(Schema schema, SchemaModifyColumn update) {
return schema.findType(update.name()).typeId() == update.type().typeId();
}

private static boolean findColumns(
org.apache.iceberg.Schema schema, SchemaDeleteColumn deleteColumn) {
private static boolean findColumns(Schema schema, SchemaDeleteColumn deleteColumn) {
return schema.findField(deleteColumn.name()) != null;
}

Expand All @@ -300,8 +298,7 @@ public static Type toIcebergType(SeaTunnelDataType rowType) {
return IcebergTypeMapper.toIcebergType(rowType);
}

public static PartitionSpec createPartitionSpec(
org.apache.iceberg.Schema schema, List<String> partitionBy) {
public static PartitionSpec createPartitionSpec(Schema schema, List<String> partitionBy) {
if (partitionBy.isEmpty()) {
return PartitionSpec.unpartitioned();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ static void setUpBeforeClass() throws Exception {
configs.put(CommonConfig.KEY_CATALOG_NAME.key(), CATALOG_NAME);
configs.put(CommonConfig.CATALOG_PROPS.key(), catalogProps);
configs.put(SinkConfig.TABLE_DEFAULT_PARTITION_KEYS.key(), "dt_col");
// hadoop config directory
configs.put(CommonConfig.HADOOP_CONF_PATH_PROP.key(), "/tmp/hadoop/conf");
// hadoop kerberos config
configs.put(CommonConfig.KERBEROS_PRINCIPAL.key(), "hive/xxxx@xxxx.COM");
configs.put(
CommonConfig.KERBEROS_KEYTAB_PATH.key(), "/tmp/hadoop/conf/hive.service.keytab");
configs.put(CommonConfig.KRB5_PATH.key(), "/tmp/hadoop/conf/krb5.conf");
icebergCatalog = new IcebergCatalog(CATALOG_NAME, ReadonlyConfig.fromMap(configs));
icebergCatalog.open();
}
Expand Down

0 comments on commit e300120

Please sign in to comment.