diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 073a0e0aad1d2..d83237cb92536 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -116,6 +116,9 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
   protected Option<BaseKeyGenerator> keyGeneratorOpt;
   private HoodieBaseFile baseFileToMerge;
 
+  protected Option<String[]> partitionFields = Option.empty();
+  protected Object[] partitionValues = new Object[0];
+
   public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                            Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
                            TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
@@ -476,4 +479,20 @@ public IOType getIOType() {
   public HoodieBaseFile baseFileForMerge() {
     return baseFileToMerge;
   }
+
+  public void setPartitionFields(Option<String[]> partitionFields) {
+    this.partitionFields = partitionFields;
+  }
+
+  public Option<String[]> getPartitionFields() {
+    return this.partitionFields;
+  }
+
+  public void setPartitionValues(Object[] partitionValues) {
+    this.partitionValues = partitionValues;
+  }
+
+  public Object[] getPartitionValues() {
+    return this.partitionValues;
+  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index fa5711616aa1f..b7668a347969e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.table.action.commit;
 
-import org.apache.hudi.client.utils.MergingIterator;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -119,14 +118,12 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
     if (baseFile.getBootstrapBaseFile().isPresent()) {
       Path bootstrapFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
       Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
-      bootstrapFileReader =
-          HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(bootstrapFileConfig, bootstrapFilePath);
-
-      recordIterator = new MergingIterator<>(
-          baseFileRecordIterator,
-          bootstrapFileReader.getRecordIterator(),
-          (left, right) ->
-              left.joinWith(right, mergeHandle.getWriterSchemaWithMetaFields()));
+      bootstrapFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).newBootstrapFileReader(
+          baseFileReader,
+          HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(bootstrapFileConfig, bootstrapFilePath),
+          mergeHandle.getPartitionFields(),
+          mergeHandle.getPartitionValues());
+      recordIterator = bootstrapFileReader.getRecordIterator(mergeHandle.getWriterSchemaWithMetaFields());
       recordSchema = mergeHandle.getWriterSchemaWithMetaFields();
     } else {
       recordIterator = baseFileRecordIterator;
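A hedged sketch of the new merge read path for bootstrapped base files, built only from the methods introduced or touched above. The class and method names here (BootstrapMergeSketch, openBootstrapIterator) are illustrative and not part of the patch; raw types are used because the exact generic arities are not shown in the diff.

```java
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

// Sketch only: mirrors the new HoodieMergeHelper branch for bootstrapped base files.
class BootstrapMergeSketch {
  static ClosableIterator<HoodieRecord> openBootstrapIterator(HoodieFileReaderFactory readerFactory,
                                                              HoodieFileReader skeletonFileReader,
                                                              Configuration bootstrapFileConfig,
                                                              Path bootstrapFilePath,
                                                              HoodieMergeHandle<?, ?, ?, ?> mergeHandle) throws IOException {
    // One reader now wraps both the skeleton file and the external bootstrap data file,
    // carrying the partition fields/values that the engine-specific table set on the handle.
    HoodieFileReader bootstrapFileReader = readerFactory.newBootstrapFileReader(
        skeletonFileReader,
        readerFactory.getFileReader(bootstrapFileConfig, bootstrapFilePath),
        mergeHandle.getPartitionFields(),
        mergeHandle.getPartitionValues());
    // Records come back already stitched (meta fields plus data columns) in the writer schema,
    // so the caller no longer needs a MergingIterator.
    Schema writerSchemaWithMetaFields = mergeHandle.getWriterSchemaWithMetaFields();
    return bootstrapFileReader.getRecordIterator(writerSchemaWithMetaFields);
  }
}
```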
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index bf05e43fd900c..9210cd5eac61c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.client.clustering.run.strategy;
 
-import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
@@ -54,7 +52,6 @@
 import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveSortPartitioner;
 import org.apache.hudi.execution.bulkinsert.RowCustomColumnsSortPartitioner;
 import org.apache.hudi.execution.bulkinsert.RowSpatialCurveSortPartitioner;
-import org.apache.hudi.hadoop.CachingPath;
 import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -72,8 +69,6 @@
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil;
-import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.sources.BaseRelation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,6 +85,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.client.utils.SparkPartitionUtils.getPartitionFieldVals;
 import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
 import static org.apache.hudi.common.table.log.HoodieFileSliceReader.getFileSliceReader;
 import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
@@ -379,16 +375,7 @@ private HoodieFileReader getBaseOrBootstrapFileReader(SerializableConfiguration
     if (partitionFields.isPresent()) {
       int startOfPartitionPath = bootstrapFilePath.indexOf(bootstrapBasePath) + bootstrapBasePath.length() + 1;
       String partitionFilePath = bootstrapFilePath.substring(startOfPartitionPath, bootstrapFilePath.lastIndexOf("/"));
-      CachingPath bootstrapCachingPath = new CachingPath(bootstrapBasePath);
-      SparkParsePartitionUtil sparkParsePartitionUtil = SparkAdapterSupport$.MODULE$.sparkAdapter().getSparkParsePartitionUtil();
-      partitionValues = HoodieSparkUtils.parsePartitionColumnValues(
-          partitionFields.get(),
-          partitionFilePath,
-          bootstrapCachingPath,
-          AvroConversionUtils.convertAvroSchemaToStructType(baseFileReader.getSchema()),
-          hadoopConf.get().get("timeZone", SQLConf.get().sessionLocalTimeZone()),
-          sparkParsePartitionUtil,
-          hadoopConf.get().getBoolean("spark.sql.sources.validatePartitionColumns", true));
+      partitionValues = getPartitionFieldVals(partitionFields, partitionFilePath, bootstrapBasePath, baseFileReader.getSchema(), hadoopConf.get());
     }
     baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).newBootstrapFileReader(
         baseFileReader,
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java
new file mode 100644
index 0000000000000..6dc344ec7347b
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.SparkAdapterSupport$;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.CachingPath;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil;
+import org.apache.spark.sql.internal.SQLConf;
+
+public class SparkPartitionUtils {
+
+  public static Object[] getPartitionFieldVals(Option<String[]> partitionFields,
+                                               String partitionPath,
+                                               String basePath,
+                                               Schema writerSchema,
+                                               Configuration hadoopConf) {
+    if (!partitionFields.isPresent()) {
+      return new Object[0];
+    }
+    SparkParsePartitionUtil sparkParsePartitionUtil = SparkAdapterSupport$.MODULE$.sparkAdapter().getSparkParsePartitionUtil();
+    return HoodieSparkUtils.parsePartitionColumnValues(
+        partitionFields.get(),
+        partitionPath,
+        new CachingPath(basePath),
+        AvroConversionUtils.convertAvroSchemaToStructType(writerSchema),
+        hadoopConf.get("timeZone", SQLConf.get().sessionLocalTimeZone()),
+        sparkParsePartitionUtil,
+        hadoopConf.getBoolean("spark.sql.sources.validatePartitionColumns", true));
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 8306fbbe9d8fe..b8be53eeaf999 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -30,6 +30,7 @@
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.SparkPartitionUtils;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieData;
@@ -222,6 +223,14 @@ protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?,
+    if (getMetaClient().getTableConfig().getBootstrapBasePath().isPresent()) {
+      Option<String[]> partitionFields = getMetaClient().getTableConfig().getPartitionFields();
+      Object[] partitionValues = SparkPartitionUtils.getPartitionFieldVals(partitionFields, upsertHandle.getPartitionPath(),
+          getMetaClient().getTableConfig().getBootstrapBasePath().get(),
+          upsertHandle.getWriterSchema(), getHadoopConf());
+      upsertHandle.setPartitionFields(partitionFields);
+      upsertHandle.setPartitionValues(partitionValues);
+    }
     HoodieMergeHelper.newInstance().runMerge(this, upsertHandle);
   }
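The new SparkPartitionUtils helper centralizes the partition-value parsing that was previously inlined in MultipleSparkJobExecutionStrategy, and the HoodieSparkCopyOnWriteTable hunk above shows the real call site. A minimal usage sketch, assuming a Spark-capable classpath; the field name "datestr", the paths, and the schema here are made up for illustration:

```java
import org.apache.hudi.client.utils.SparkPartitionUtils;
import org.apache.hudi.common.util.Option;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hadoop.conf.Configuration;

class PartitionValsSketch {
  static Object[] example() {
    // Hypothetical writer schema containing the partition column "datestr".
    Schema writerSchema = SchemaBuilder.record("rec").fields()
        .requiredString("_row_key")
        .requiredString("datestr")
        .endRecord();
    // Hive-style partition directory relative to the bootstrap base path (made-up values).
    return SparkPartitionUtils.getPartitionFieldVals(
        Option.of(new String[] {"datestr"}),
        "datestr=2016-03-15",
        "/tmp/bootstrapBasePath",
        writerSchema,
        new Configuration());
  }
}
```

The returned array holds one parsed value per partition field, typed according to the Spark-inferred partition schema; callers then hand it to HoodieMergeHandle.setPartitionValues as shown above.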
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 47f031e0ac7d1..f8ba2c8e1e4a9 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.SparkPartitionUtils;
 import org.apache.hudi.client.utils.SparkValidatorUtils;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
@@ -366,6 +367,15 @@ protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?,
+    if (table.getMetaClient().getTableConfig().getBootstrapBasePath().isPresent()) {
+      Option<String[]> partitionFields = table.getMetaClient().getTableConfig().getPartitionFields();
+      Object[] partitionValues = SparkPartitionUtils.getPartitionFieldVals(partitionFields, upsertHandle.getPartitionPath(),
+          table.getMetaClient().getTableConfig().getBootstrapBasePath().get(),
+          upsertHandle.getWriterSchema(), table.getHadoopConf());
+      upsertHandle.setPartitionFields(partitionFields);
+      upsertHandle.setPartitionValues(partitionValues);
+    }
+
     HoodieMergeHelper.newInstance().runMerge(table, upsertHandle);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapFileReader.java
index 7184312012ad3..4dc1a73e6ddb3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapFileReader.java
@@ -21,7 +21,6 @@
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.MetadataValues;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 
@@ -64,35 +63,21 @@ public Set filterRowKeys(Set candidateRowKeys) {
   public ClosableIterator<HoodieRecord<T>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
     ClosableIterator<HoodieRecord<T>> skeletonIterator = skeletonFileReader.getRecordIterator(readerSchema, requestedSchema);
     ClosableIterator<HoodieRecord<T>> dataFileIterator = dataFileReader.getRecordIterator(HoodieAvroUtils.removeMetadataFields(readerSchema), requestedSchema);
-    return new ClosableIterator<HoodieRecord<T>>() {
+    return new HoodieBootstrapRecordIterator<T>(skeletonIterator, dataFileIterator, readerSchema, partitionFields, partitionValues) {
       @Override
-      public void close() {
-        skeletonIterator.close();
-        dataFileIterator.close();
-      }
-
-      @Override
-      public boolean hasNext() {
-        return skeletonIterator.hasNext() && dataFileIterator.hasNext();
+      protected void setPartitionPathField(int position, Object fieldValue, T row) {
+        setPartitionField(position, fieldValue, row);
       }
+    };
+  }
 
+  public ClosableIterator<HoodieRecord<T>> getRecordIterator(Schema schema) throws IOException {
+    ClosableIterator<HoodieRecord<T>> skeletonIterator = skeletonFileReader.getRecordIterator(schema);
+    ClosableIterator<HoodieRecord<T>> dataFileIterator = dataFileReader.getRecordIterator(dataFileReader.getSchema());
+    return new HoodieBootstrapRecordIterator<T>(skeletonIterator, dataFileIterator, schema, partitionFields, partitionValues) {
       @Override
-      public HoodieRecord<T> next() {
-        HoodieRecord<T> dataRecord = dataFileIterator.next();
-        HoodieRecord<T> skeletonRecord = skeletonIterator.next();
-        HoodieRecord<T> ret = dataRecord.prependMetaFields(readerSchema, readerSchema,
-            new MetadataValues().setCommitTime(skeletonRecord.getRecordKey(readerSchema, HoodieRecord.COMMIT_TIME_METADATA_FIELD))
-                .setCommitSeqno(skeletonRecord.getRecordKey(readerSchema, HoodieRecord.COMMIT_SEQNO_METADATA_FIELD))
-                .setRecordKey(skeletonRecord.getRecordKey(readerSchema, HoodieRecord.RECORD_KEY_METADATA_FIELD))
-                .setPartitionPath(skeletonRecord.getRecordKey(readerSchema, HoodieRecord.PARTITION_PATH_METADATA_FIELD))
-                .setFileName(skeletonRecord.getRecordKey(readerSchema, HoodieRecord.FILENAME_METADATA_FIELD)), null);
-        if (partitionFields.isPresent()) {
-          for (int i = 0; i < partitionValues.length; i++) {
-            int position = readerSchema.getField(partitionFields.get()[i]).pos();
-            setPartitionField(position, partitionValues[i], ret.getData());
-          }
-        }
-        return ret;
+      protected void setPartitionPathField(int position, Object fieldValue, T row) {
+        setPartitionField(position, fieldValue, row);
       }
     };
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java
new file mode 100644
index 0000000000000..43f2d1ad1ad58
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.MetadataValues;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+
+import org.apache.avro.Schema;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
+public abstract class HoodieBootstrapRecordIterator<T> implements ClosableIterator<HoodieRecord<T>> {
+
+  protected ClosableIterator<HoodieRecord<T>> skeletonIterator;
+  protected ClosableIterator<HoodieRecord<T>> dataFileIterator;
+  private final Option<String[]> partitionFields;
+  private final Object[] partitionValues;
+
+  protected Schema schema;
+
+  public HoodieBootstrapRecordIterator(ClosableIterator<HoodieRecord<T>> skeletonIterator,
+                                       ClosableIterator<HoodieRecord<T>> dataFileIterator,
+                                       Schema schema,
+                                       Option<String[]> partitionFields,
+                                       Object[] partitionValues) {
+    this.skeletonIterator = skeletonIterator;
+    this.dataFileIterator = dataFileIterator;
+    this.schema = schema;
+    this.partitionFields = partitionFields;
+    this.partitionValues = partitionValues;
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public boolean hasNext() {
+    checkState(skeletonIterator.hasNext() == dataFileIterator.hasNext());
+    return skeletonIterator.hasNext();
+  }
+
+  @Override
+  public HoodieRecord<T> next() {
+    HoodieRecord<T> dataRecord = dataFileIterator.next();
+    HoodieRecord<T> skeletonRecord = skeletonIterator.next();
+    HoodieRecord<T> ret = dataRecord.prependMetaFields(schema, schema,
+        new MetadataValues().setCommitTime(skeletonRecord.getRecordKey(schema, HoodieRecord.COMMIT_TIME_METADATA_FIELD))
+            .setCommitSeqno(skeletonRecord.getRecordKey(schema, HoodieRecord.COMMIT_SEQNO_METADATA_FIELD))
+            .setRecordKey(skeletonRecord.getRecordKey(schema, HoodieRecord.RECORD_KEY_METADATA_FIELD))
+            .setPartitionPath(skeletonRecord.getRecordKey(schema, HoodieRecord.PARTITION_PATH_METADATA_FIELD))
+            .setFileName(skeletonRecord.getRecordKey(schema, HoodieRecord.FILENAME_METADATA_FIELD)), null);
+    if (partitionFields.isPresent()) {
+      for (int i = 0; i < partitionValues.length; i++) {
+        int position = schema.getField(partitionFields.get()[i]).pos();
+        setPartitionPathField(position, partitionValues[i], ret.getData());
+      }
+    }
+    return ret;
+  }
+
+  protected abstract void setPartitionPathField(int position, Object fieldValue, T row);
+}
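HoodieBootstrapRecordIterator above leaves only the engine-specific write-back of parsed partition values abstract. A hedged sketch of what a concrete Avro-flavoured subclass could look like; this class is illustrative and not part of the patch (the anonymous subclasses in HoodieBootstrapFileReader delegate to that reader's own setPartitionField instead):

```java
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.io.storage.HoodieBootstrapRecordIterator;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;

// Illustrative only: writes each parsed partition value into the Avro record at its schema position.
class AvroBootstrapRecordIterator extends HoodieBootstrapRecordIterator<IndexedRecord> {

  AvroBootstrapRecordIterator(ClosableIterator<HoodieRecord<IndexedRecord>> skeletonIterator,
                              ClosableIterator<HoodieRecord<IndexedRecord>> dataFileIterator,
                              Schema schema,
                              Option<String[]> partitionFields,
                              Object[] partitionValues) {
    super(skeletonIterator, dataFileIterator, schema, partitionFields, partitionValues);
  }

  @Override
  protected void setPartitionPathField(int position, Object fieldValue, IndexedRecord row) {
    row.put(position, fieldValue);
  }
}
```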
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index d3c1de56773b3..1e6e9642c6429 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -71,13 +71,21 @@ public static HoodieTableMetaClient init(String basePath, HoodieTableType tableT
     return init(getDefaultHadoopConf(), basePath, tableType, properties);
   }
 
-  public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, String bootstrapBasePath, boolean bootstrapIndexEnable) throws IOException {
+  public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, String bootstrapBasePath, boolean bootstrapIndexEnable, String keyGenerator) throws IOException {
     Properties props = new Properties();
     props.setProperty(HoodieTableConfig.BOOTSTRAP_BASE_PATH.key(), bootstrapBasePath);
     props.put(HoodieTableConfig.BOOTSTRAP_INDEX_ENABLE.key(), bootstrapIndexEnable);
+    if (keyGenerator != null) {
+      props.put("hoodie.datasource.write.keygenerator.class", keyGenerator);
+      props.put("hoodie.datasource.write.partitionpath.field", "datestr");
+    }
     return init(getDefaultHadoopConf(), basePath, tableType, props);
   }
 
+  public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, String bootstrapBasePath, boolean bootstrapIndexEnable) throws IOException {
+    return init(basePath, tableType, bootstrapBasePath, bootstrapIndexEnable, null);
+  }
+
   public static HoodieTableMetaClient init(String basePath, HoodieFileFormat baseFileFormat) throws IOException {
     return init(getDefaultHadoopConf(), basePath, HoodieTableType.COPY_ON_WRITE, baseFileFormat);
   }
@@ -140,7 +148,8 @@ public static HoodieTableMetaClient init(Configuration hadoopConf, String basePa
         .setPayloadClass(HoodieAvroPayload.class);
 
     String keyGen = properties.getProperty("hoodie.datasource.write.keygenerator.class");
-    if (!Objects.equals(keyGen, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")) {
+    if (!Objects.equals(keyGen, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
+        && !properties.containsKey("hoodie.datasource.write.partitionpath.field")) {
       builder.setPartitionFields("some_nonexistent_field");
     }
 
@@ -149,10 +158,14 @@ public static HoodieTableMetaClient init(Configuration hadoopConf, String basePa
     return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, processedProperties);
   }
 
-  public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, String bootstrapBasePath, HoodieFileFormat baseFileFormat) throws IOException {
+  public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, String bootstrapBasePath, HoodieFileFormat baseFileFormat, String keyGenerator) throws IOException {
     Properties props = new Properties();
     props.setProperty(HoodieTableConfig.BOOTSTRAP_BASE_PATH.key(), bootstrapBasePath);
     props.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), baseFileFormat.name());
+    if (keyGenerator != null) {
+      props.put("hoodie.datasource.write.keygenerator.class", keyGenerator);
+      props.put("hoodie.datasource.write.partitionpath.field", "datestr");
+    }
     return init(getDefaultHadoopConf(), basePath, tableType, props);
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index cb90f2d27055c..b6cda7c63c548 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -194,15 +194,15 @@ private enum EffectiveMode {
 
   private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, EffectiveMode mode) throws Exception {
+    String keyGeneratorClass = partitioned ? SimpleKeyGenerator.class.getCanonicalName()
+        : NonpartitionedKeyGenerator.class.getCanonicalName();
     if (deltaCommit) {
-      metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ, bootstrapBasePath, true);
+      metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ, bootstrapBasePath, true, keyGeneratorClass);
     } else {
-      metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE, bootstrapBasePath, true);
+      metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE, bootstrapBasePath, true, keyGeneratorClass);
     }
 
     int totalRecords = 100;
-    String keyGeneratorClass = partitioned ? SimpleKeyGenerator.class.getCanonicalName()
-        : NonpartitionedKeyGenerator.class.getCanonicalName();
     final String bootstrapModeSelectorClass;
     final String bootstrapCommitInstantTs;
     final boolean checkNumRawFiles;
@@ -242,6 +242,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
     HoodieWriteConfig config = getConfigBuilder(schema.toString())
         .withAutoCommit(true)
         .withSchema(schema.toString())
+        .withKeyGenerator(keyGeneratorClass)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withMaxNumDeltaCommitsBeforeCompaction(1)
             .build())
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
new file mode 100644
index 0000000000000..c8f342aac35ce
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional;
+
+import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.client.bootstrap.selector.BootstrapRegexModeSelector;
+import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector;
+import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieBootstrapConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.keygen.ComplexKeyGenerator;
+import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
+import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.testutils.HoodieSparkClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.functions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.testutils.RawTripTestPayload.recordToString;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests different layouts for bootstrap base path
+ */
+@Tag("functional")
+public class TestBootstrapRead extends HoodieSparkClientTestBase {
+
+  @TempDir
+  public java.nio.file.Path tmpFolder;
+  protected String bootstrapBasePath = null;
+  protected String bootstrapTargetPath = null;
+  protected String hudiBasePath = null;
+
+  protected static int nInserts = 100;
+  protected static int nUpdates = 20;
+  protected static String[] dashPartitionPaths = {"2016-03-15", "2015-03-16", "2015-03-17"};
+  protected static String[] slashPartitionPaths = {"2016/03/15", "2015/03/16", "2015/03/17"};
+  protected String bootstrapType;
+  protected Boolean dashPartitions;
+  protected String tableType;
+  protected Integer nPartitions;
+
+  protected String[] partitionCols;
+  protected static String[] dropColumns = {"_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_file_name", "city_to_state"};
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    bootstrapBasePath = tmpFolder.toAbsolutePath() + "/bootstrapBasePath";
+    hudiBasePath = tmpFolder.toAbsolutePath() + "/hudiBasePath";
+    bootstrapTargetPath = tmpFolder.toAbsolutePath() + "/bootstrapTargetPath";
+    initSparkContexts();
+  }
+
+  @AfterEach
+  public void tearDown() throws IOException {
+    cleanupSparkContexts();
+    cleanupClients();
+    cleanupTestDataGenerator();
+  }
+
+  private static Stream<Arguments> testArgs() {
+    Stream.Builder<Arguments> b = Stream.builder();
+    //TODO: add dash partitions false with [HUDI-4944]
+    Boolean[] dashPartitions = {true/*,false*/};
+    String[] tableType = {"COPY_ON_WRITE", "MERGE_ON_READ"};
+    String[] bootstrapType = {"full", "metadata", "mixed"};
+    Integer[] nPartitions = {0, 1, 2};
+
+    for (String tt : tableType) {
+      for (Boolean dash : dashPartitions) {
+        for (String bt : bootstrapType) {
+          for (Integer n : nPartitions) {
+            //can't be mixed bootstrap if it's nonpartitioned
+            if (!bt.equals("mixed") || n > 0) {
+              b.add(Arguments.of(bt, dash, tt, n));
+            }
+          }
+        }
+      }
+    }
+    return b.build();
+  }
+
+  @ParameterizedTest
+  @MethodSource("testArgs")
+  public void runTests(String bootstrapType, Boolean dashPartitions, String tableType, Integer nPartitions) {
+    this.bootstrapType = bootstrapType;
+    this.dashPartitions = dashPartitions;
+    this.tableType = tableType;
+    this.nPartitions = nPartitions;
+    setupDirs();
+
+    //do bootstrap
+    Map<String, String> options = setBootstrapOptions();
+    Dataset<Row> bootstrapDf = sparkSession.emptyDataFrame();
+    bootstrapDf.write().format("hudi")
+        .options(options)
+        .mode(SaveMode.Overwrite)
+        .save(bootstrapTargetPath);
+    compareTables();
+
+    //do upserts
+    options = basicOptions();
+    doUpdate(options, "001");
+    compareTables();
+
+    doUpdate(options, "002");
+    compareTables();
+  }
+
+  private Map<String, String> basicOptions() {
+    Map<String, String> options = new HashMap<>();
+    options.put(DataSourceWriteOptions.TABLE_TYPE().key(), tableType);
+    options.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING().key(), "true");
+    options.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
+    //TODO: enable timeline server with [HUDI-6201]
+    options.put("hoodie.embed.timeline.server", "false");
+    if (nPartitions == 0) {
+      options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), NonpartitionedKeyGenerator.class.getName());
+    } else {
+      options.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), String.join(",", partitionCols));
+      if (nPartitions == 1) {
+        options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName());
+      } else {
+        options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), ComplexKeyGenerator.class.getName());
+      }
+    }
+    options.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
+    if (tableType.equals("MERGE_ON_READ")) {
+      options.put(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
+    }
+    options.put(HoodieWriteConfig.TBL_NAME.key(), "test");
+    return options;
+  }
+
+  private Map<String, String> setBootstrapOptions() {
+    Map<String, String> options = basicOptions();
+    options.put(DataSourceWriteOptions.OPERATION().key(), DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL());
+    options.put(HoodieBootstrapConfig.BASE_PATH.key(), bootstrapBasePath);
+
+    switch (bootstrapType) {
+      case "metadata":
+        options.put(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key(), MetadataOnlyBootstrapModeSelector.class.getName());
+        break;
+      case "full":
+        options.put(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key(), FullRecordBootstrapModeSelector.class.getName());
+        break;
+      case "mixed":
+        options.put(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key(), BootstrapRegexModeSelector.class.getName());
+        String regexPattern;
+        if (dashPartitions) {
+          regexPattern = "partition_path=2015-03-1[5-7]";
+        } else {
+          regexPattern = "partition_path=2015%2F03%2F1[5-7]";
+        }
+        if (nPartitions > 1) {
+          regexPattern = regexPattern + "\\/.*";
+        }
+        options.put(HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_PATTERN.key(), regexPattern);
+        break;
+      default:
+        throw new RuntimeException();
+    }
+    return options;
+  }
+
+  protected void doUpdate(Map<String, String> options, String instantTime) {
+    Dataset<Row> updates = generateTestUpdates(instantTime);
+    String nCompactCommits = "3";
+    updates.write().format("hudi")
+        .options(options)
+        .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), nCompactCommits)
+        .mode(SaveMode.Append)
+        .save(hudiBasePath);
+    if (bootstrapType.equals("mixed")) {
+      //mixed tables have a commit for each of the metadata and full bootstrap modes
+      nCompactCommits = "4";
+    }
+    updates.write().format("hudi")
+        .options(options)
+        .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), nCompactCommits)
+        .mode(SaveMode.Append)
+        .save(bootstrapTargetPath);
+  }
+
+  protected void compareTables() {
+    Map<String, String> readOpts = new HashMap<>();
+    if (tableType.equals("MERGE_ON_READ")) {
+      //Bootstrap MOR currently only has read optimized queries implemented
+      readOpts.put(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL());
+    }
+    Dataset<Row> hudiDf = sparkSession.read().options(readOpts).format("hudi").load(hudiBasePath);
+    Dataset<Row> bootstrapDf = sparkSession.read().format("hudi").load(bootstrapTargetPath);
+    if (nPartitions == 0) {
+      compareDf(hudiDf.drop(dropColumns), bootstrapDf.drop(dropColumns));
+      return;
+    }
+    compareDf(hudiDf.drop(dropColumns).drop(partitionCols), bootstrapDf.drop(dropColumns).drop(partitionCols));
+    compareDf(hudiDf.select("_row_key", partitionCols), bootstrapDf.select("_row_key", partitionCols));
+  }
+
+  protected void compareDf(Dataset<Row> df1, Dataset<Row> df2) {
+    assertEquals(0, df1.except(df2).count());
+    assertEquals(0, df2.except(df1).count());
+  }
+
+  protected void setupDirs() {
+    dataGen = new HoodieTestDataGenerator(dashPartitions ? dashPartitionPaths : slashPartitionPaths);
+    Dataset<Row> inserts = generateTestInserts();
+    if (nPartitions > 0) {
+      partitionCols = new String[nPartitions];
+      partitionCols[0] = "partition_path";
+      for (int i = 1; i < partitionCols.length; i++) {
+        partitionCols[i] = "partpath" + (i + 1);
+      }
+      inserts.write().partitionBy(partitionCols).save(bootstrapBasePath);
+    } else {
+      inserts.write().save(bootstrapBasePath);
+    }
+
+    inserts.write().format("hudi")
+        .options(basicOptions())
+        .mode(SaveMode.Overwrite)
+        .save(hudiBasePath);
+  }
+
+  public Dataset<Row> generateTestInserts() {
+    List<String> records = dataGen.generateInserts("000", nInserts).stream()
+        .map(r -> recordToString(r).get()).collect(Collectors.toList());
+    JavaRDD<String> rdd = jsc.parallelize(records);
+    return addPartitionColumns(sparkSession.read().json(rdd), nPartitions);
+  }
+
+  public Dataset<Row> generateTestUpdates(String instantTime) {
+    try {
+      List<String> records = dataGen.generateUpdates(instantTime, nUpdates).stream()
+          .map(r -> recordToString(r).get()).collect(Collectors.toList());
+      JavaRDD<String> rdd = jsc.parallelize(records);
+      return addPartitionColumns(sparkSession.read().json(rdd), nPartitions);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static Dataset<Row> addPartitionColumns(Dataset<Row> df, Integer nPartitions) {
+    if (nPartitions < 2) {
+      return df;
+    }
+    for (int i = 2; i <= nPartitions; i++) {
+      df = applyPartition(df, i);
+    }
+    return df;
+  }
+
+  private static Dataset<Row> applyPartition(Dataset<Row> df, Integer n) {
+    return df.withColumn("partpath" + n,
+        functions.md5(functions.concat_ws("," + n + ",",
+            df.col("partition_path"),
+            functions.hash(df.col("_row_key")).mod(n))));
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
index b65e02402e787..80c2744efcd98 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
@@ -53,6 +53,7 @@
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
+import org.apache.hudi.keygen.SimpleKeyGenerator;
 import org.apache.hudi.table.action.bootstrap.BootstrapUtils;
 import org.apache.hudi.testutils.HoodieSparkClientTestBase;
 
@@ -182,11 +183,13 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
     if (!HoodieSparkUtils.gteqSpark3_0()) {
      return;
     }
+    String keyGeneratorClass = partitioned ? SimpleKeyGenerator.class.getCanonicalName()
+        : NonpartitionedKeyGenerator.class.getCanonicalName();
     if (deltaCommit) {
-      metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ, bootstrapBasePath, HoodieFileFormat.ORC);
+      metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ, bootstrapBasePath, HoodieFileFormat.ORC, keyGeneratorClass);
     } else {
-      metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE, bootstrapBasePath, HoodieFileFormat.ORC);
+      metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE, bootstrapBasePath, HoodieFileFormat.ORC, keyGeneratorClass);
     }
 
     int totalRecords = 100;
@@ -229,6 +232,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
     HoodieWriteConfig config = getConfigBuilder(schema.toString(), partitioned)
         .withAutoCommit(true)
         .withSchema(schema.toString())
+        .withKeyGenerator(keyGeneratorClass)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withMaxNumDeltaCommitsBeforeCompaction(1)
             .build())