From c6e5fd027199a5ce11a5c7f3a6048a135073588c Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Fri, 28 Oct 2022 15:55:27 +0530 Subject: [PATCH] [HUDI-3287] Remove hudi-spark dependencies from hudi-kafka-connect-bundle (#6079) --- .../keygen/factory/HoodieAvroKeyGeneratorFactory.java | 2 +- .../apache/hudi/connect/utils/KafkaConnectUtils.java | 10 ++++------ .../writers/KafkaConnectTransactionServices.java | 2 +- packaging/hudi-kafka-connect-bundle/pom.xml | 7 ------- 4 files changed, 6 insertions(+), 15 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java index 0e17aff24286..b24b9a8e2d9b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java @@ -54,7 +54,7 @@ public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOEx return Objects.isNull(keyGenerator) ? createAvroKeyGeneratorByType(props) : keyGenerator; } - private static KeyGenerator createAvroKeyGeneratorByType(TypedProperties props) throws IOException { + public static KeyGenerator createAvroKeyGeneratorByType(TypedProperties props) throws IOException { // Use KeyGeneratorType.SIMPLE as default keyGeneratorType String keyGeneratorType = props.getString(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), null); diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java index d62c9a768c4d..6b08bae2af94 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java @@ -18,8 +18,6 @@ package org.apache.hudi.connect.utils; -import com.google.protobuf.ByteString; -import org.apache.hadoop.conf.Configuration; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -36,9 +34,11 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.keygen.BaseKeyGenerator; import org.apache.hudi.keygen.CustomAvroKeyGenerator; -import org.apache.hudi.keygen.CustomKeyGenerator; import org.apache.hudi.keygen.KeyGenerator; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; + +import com.google.protobuf.ByteString; +import org.apache.hadoop.conf.Configuration; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.DescribeTopicsResult; import org.apache.kafka.clients.admin.TopicDescription; @@ -185,8 +185,7 @@ public static String getRecordKeyColumns(KeyGenerator keyGenerator) { * @return partition columns Returns the partition columns separated by comma. */ public static String getPartitionColumns(KeyGenerator keyGenerator, TypedProperties typedProperties) { - - if (keyGenerator instanceof CustomKeyGenerator || keyGenerator instanceof CustomAvroKeyGenerator) { + if (keyGenerator instanceof CustomAvroKeyGenerator) { return ((BaseKeyGenerator) keyGenerator).getPartitionPathFields().stream().map( pathField -> Arrays.stream(pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX)) .findFirst().orElse("Illegal partition path field format: '$pathField' for ${c.getClass.getSimpleName}")) @@ -200,7 +199,6 @@ public static String getPartitionColumns(KeyGenerator keyGenerator, TypedPropert return typedProperties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()); } - /** * Get the Metadata from the latest commit file. * diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java index 934dbadf1c75..f71d8480c3ef 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java @@ -84,7 +84,7 @@ public KafkaConnectTransactionServices(KafkaConnectConfigs connectConfigs) throw context = new HoodieJavaEngineContext(hadoopConf); try { - KeyGenerator keyGenerator = HoodieAvroKeyGeneratorFactory.createKeyGenerator( + KeyGenerator keyGenerator = HoodieAvroKeyGeneratorFactory.createAvroKeyGeneratorByType( new TypedProperties(connectConfigs.getProps())); String recordKeyFields = KafkaConnectUtils.getRecordKeyColumns(keyGenerator); String partitionColumns = KafkaConnectUtils.getPartitionColumns(keyGenerator, diff --git a/packaging/hudi-kafka-connect-bundle/pom.xml b/packaging/hudi-kafka-connect-bundle/pom.xml index 9222937ba75f..82d6f2501649 100644 --- a/packaging/hudi-kafka-connect-bundle/pom.xml +++ b/packaging/hudi-kafka-connect-bundle/pom.xml @@ -75,8 +75,6 @@ org.apache.hudi:hudi-common org.apache.hudi:hudi-client-common org.apache.hudi:hudi-java-client - org.apache.hudi:hudi-spark-client - org.apache.hudi:hudi-spark-common_${scala.binary.version} org.apache.hudi:hudi-kafka-connect org.apache.hudi:hudi-utilities_${scala.binary.version} org.apache.hudi:hudi-hive-sync @@ -321,11 +319,6 @@ - - org.apache.hudi - hudi-spark-common_${scala.binary.version} - ${project.version} -