diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ExecutionStrategyUtil.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ExecutionStrategyUtil.java new file mode 100644 index 000000000000..b70eed700908 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ExecutionStrategyUtil.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.clustering.run.strategy; + +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.RewriteAvroPayload; +import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.keygen.BaseKeyGenerator; +import org.apache.hudi.keygen.KeyGenUtils; +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; + +import java.io.IOException; + +public class ExecutionStrategyUtil { + + /** + * Transform IndexedRecord into HoodieRecord. + * + * @param indexedRecord indexedRecord. + * @param writeConfig writeConfig. + * @return hoodieRecord. + * @param + */ + public static HoodieRecord transform(IndexedRecord indexedRecord, + HoodieWriteConfig writeConfig) { + + GenericRecord record = (GenericRecord) indexedRecord; + Option keyGeneratorOpt = Option.empty(); + + if (!writeConfig.populateMetaFields()) { + try { + TypedProperties typedProperties = new TypedProperties(writeConfig.getProps()); + keyGeneratorOpt = Option.of((BaseKeyGenerator) + HoodieSparkKeyGeneratorFactory.createKeyGenerator(typedProperties)); + } catch (IOException e) { + throw new HoodieIOException( + "Only BaseKeyGenerators are supported when meta columns are disabled ", e); + } + } + + String key = KeyGenUtils.getRecordKeyFromGenericRecord(record, keyGeneratorOpt); + String partition = KeyGenUtils.getPartitionPathFromGenericRecord(record, keyGeneratorOpt); + HoodieKey hoodieKey = new HoodieKey(key, partition); + + HoodieRecordPayload avroPayload = new RewriteAvroPayload(record); + HoodieRecord hoodieRecord = new HoodieAvroRecord(hoodieKey, avroPayload); + return hoodieRecord; + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index 80b9f400f614..c3a4ec3bd469 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -30,12 +30,10 @@ import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.ClusteringOperation; -import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.RewriteAvroPayload; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.util.CollectionUtils; @@ -47,7 +45,6 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.exception.HoodieClusteringException; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory; import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerWithRowsFactory; import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner; @@ -57,16 +54,12 @@ import org.apache.hudi.io.IOUtils; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; -import org.apache.hudi.keygen.BaseKeyGenerator; -import org.apache.hudi.keygen.KeyGenUtils; -import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; @@ -424,21 +417,6 @@ private JavaRDD[] convertStreamToArray(Stream> * Transform IndexedRecord into HoodieRecord. */ private static HoodieRecord transform(IndexedRecord indexedRecord, HoodieWriteConfig writeConfig) { - GenericRecord record = (GenericRecord) indexedRecord; - Option keyGeneratorOpt = Option.empty(); - if (!writeConfig.populateMetaFields()) { - try { - keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps())); - } catch (IOException e) { - throw new HoodieIOException("Only BaseKeyGenerators are supported when meta columns are disabled ", e); - } - } - String key = KeyGenUtils.getRecordKeyFromGenericRecord(record, keyGeneratorOpt); - String partition = KeyGenUtils.getPartitionPathFromGenericRecord(record, keyGeneratorOpt); - HoodieKey hoodieKey = new HoodieKey(key, partition); - - HoodieRecordPayload avroPayload = new RewriteAvroPayload(record); - HoodieRecord hoodieRecord = new HoodieAvroRecord(hoodieKey, avroPayload); - return hoodieRecord; + return ExecutionStrategyUtil.transform(indexedRecord, writeConfig); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java index bb6d3df5f105..601d2ec8a7f4 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java @@ -24,33 +24,25 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.client.utils.ConcatenatingIterator; import org.apache.hudi.common.config.SerializableSchema; -import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.ClusteringGroupInfo; import org.apache.hudi.common.model.ClusteringOperation; -import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.RewriteAvroPayload; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.exception.HoodieClusteringException; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.storage.HoodieFileReaderFactory; -import org.apache.hudi.keygen.BaseKeyGenerator; -import org.apache.hudi.keygen.KeyGenUtils; -import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -170,21 +162,6 @@ private Iterator> readRecordsForGroupBaseFiles(List transform(IndexedRecord indexedRecord) { - GenericRecord record = (GenericRecord) indexedRecord; - Option keyGeneratorOpt = Option.empty(); - if (!getWriteConfig().populateMetaFields()) { - try { - keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(getWriteConfig().getProps()))); - } catch (IOException e) { - throw new HoodieIOException("Only BaseKeyGenerators are supported when meta columns are disabled ", e); - } - } - String key = KeyGenUtils.getRecordKeyFromGenericRecord(record, keyGeneratorOpt); - String partition = KeyGenUtils.getPartitionPathFromGenericRecord(record, keyGeneratorOpt); - HoodieKey hoodieKey = new HoodieKey(key, partition); - - HoodieRecordPayload avroPayload = new RewriteAvroPayload(record); - HoodieRecord hoodieRecord = new HoodieAvroRecord(hoodieKey, avroPayload); - return hoodieRecord; + return ExecutionStrategyUtil.transform(indexedRecord, getWriteConfig()); } }