[HUDI-915] Add missing partitionpath to records COW (#8666)
In a bootstrapped table, when an upsert touched a file group, the records in that file group that were not themselves updated would end up with null values in their partition column. This patch fixes the issue.
jonvex authored May 17, 2023
1 parent 647ad0f commit efc30f6
Showing 12 changed files with 515 additions and 59 deletions.
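In short, the change threads the table's partition fields and their parsed values through HoodieMergeHandle so that the bootstrap file reader can stamp them onto every record it carries over during the merge. A condensed sketch of the Spark copy-on-write upsert path, following the diff below (table and upsertHandle stand in for the executor's own references):

if (upsertHandle.baseFileForMerge().getBootstrapBaseFile().isPresent()) {
  // Parse the partition column values for this file group from its partition path.
  Option<String[]> partitionFields = table.getMetaClient().getTableConfig().getPartitionFields();
  Object[] partitionValues = SparkPartitionUtils.getPartitionFieldVals(
      partitionFields,
      upsertHandle.getPartitionPath(),
      table.getMetaClient().getTableConfig().getBootstrapBasePath().get(),
      upsertHandle.getWriterSchema(),
      table.getHadoopConf());
  // Carried on the handle so the bootstrap record iterator can set the partition
  // column on records copied from the bootstrap data file.
  upsertHandle.setPartitionFields(partitionFields);
  upsertHandle.setPartitionValues(partitionValues);
}
HoodieMergeHelper.newInstance().runMerge(table, upsertHandle);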
@@ -116,6 +116,9 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
protected Option<BaseKeyGenerator> keyGeneratorOpt;
private HoodieBaseFile baseFileToMerge;

+ protected Option<String[]> partitionFields = Option.empty();
+ protected Object[] partitionValues = new Object[0];
+
public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
@@ -476,4 +479,20 @@ public IOType getIOType() {
public HoodieBaseFile baseFileForMerge() {
return baseFileToMerge;
}

+ public void setPartitionFields(Option<String[]> partitionFields) {
+ this.partitionFields = partitionFields;
+ }
+
+ public Option<String[]> getPartitionFields() {
+ return this.partitionFields;
+ }
+
+ public void setPartitionValues(Object[] partitionValues) {
+ this.partitionValues = partitionValues;
+ }
+
+ public Object[] getPartitionValues() {
+ return this.partitionValues;
+ }
}
@@ -18,7 +18,6 @@

package org.apache.hudi.table.action.commit;

- import org.apache.hudi.client.utils.MergingIterator;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
@@ -119,14 +118,12 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
if (baseFile.getBootstrapBaseFile().isPresent()) {
Path bootstrapFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
- bootstrapFileReader =
- HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(bootstrapFileConfig, bootstrapFilePath);
-
- recordIterator = new MergingIterator<>(
- baseFileRecordIterator,
- bootstrapFileReader.getRecordIterator(),
- (left, right) ->
- left.joinWith(right, mergeHandle.getWriterSchemaWithMetaFields()));
+ bootstrapFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).newBootstrapFileReader(
+ baseFileReader,
+ HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(bootstrapFileConfig, bootstrapFilePath),
+ mergeHandle.getPartitionFields(),
+ mergeHandle.getPartitionValues());
+ recordIterator = bootstrapFileReader.getRecordIterator(mergeHandle.getWriterSchemaWithMetaFields());
recordSchema = mergeHandle.getWriterSchemaWithMetaFields();
} else {
recordIterator = baseFileRecordIterator;
@@ -18,8 +18,6 @@

package org.apache.hudi.client.clustering.run.strategy;

- import org.apache.hudi.AvroConversionUtils;
- import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
@@ -54,7 +52,6 @@
import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RowCustomColumnsSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RowSpatialCurveSortPartitioner;
- import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -72,8 +69,6 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
- import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil;
- import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.sources.BaseRelation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,6 +85,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

+ import static org.apache.hudi.client.utils.SparkPartitionUtils.getPartitionFieldVals;
import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
import static org.apache.hudi.common.table.log.HoodieFileSliceReader.getFileSliceReader;
import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
@@ -379,16 +375,7 @@ private HoodieFileReader getBaseOrBootstrapFileReader(SerializableConfiguration
if (partitionFields.isPresent()) {
int startOfPartitionPath = bootstrapFilePath.indexOf(bootstrapBasePath) + bootstrapBasePath.length() + 1;
String partitionFilePath = bootstrapFilePath.substring(startOfPartitionPath, bootstrapFilePath.lastIndexOf("/"));
- CachingPath bootstrapCachingPath = new CachingPath(bootstrapBasePath);
- SparkParsePartitionUtil sparkParsePartitionUtil = SparkAdapterSupport$.MODULE$.sparkAdapter().getSparkParsePartitionUtil();
- partitionValues = HoodieSparkUtils.parsePartitionColumnValues(
- partitionFields.get(),
- partitionFilePath,
- bootstrapCachingPath,
- AvroConversionUtils.convertAvroSchemaToStructType(baseFileReader.getSchema()),
- hadoopConf.get().get("timeZone", SQLConf.get().sessionLocalTimeZone()),
- sparkParsePartitionUtil,
- hadoopConf.get().getBoolean("spark.sql.sources.validatePartitionColumns", true));
+ partitionValues = getPartitionFieldVals(partitionFields, partitionFilePath, bootstrapBasePath, baseFileReader.getSchema(), hadoopConf.get());
}
baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).newBootstrapFileReader(
baseFileReader,
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.utils;

import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.CachingPath;

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil;
import org.apache.spark.sql.internal.SQLConf;

public class SparkPartitionUtils {

public static Object[] getPartitionFieldVals(Option<String[]> partitionFields,
String partitionPath,
String basePath,
Schema writerSchema,
Configuration hadoopConf) {
if (!partitionFields.isPresent()) {
return new Object[0];
}
SparkParsePartitionUtil sparkParsePartitionUtil = SparkAdapterSupport$.MODULE$.sparkAdapter().getSparkParsePartitionUtil();
return HoodieSparkUtils.parsePartitionColumnValues(
partitionFields.get(),
partitionPath,
new CachingPath(basePath),
AvroConversionUtils.convertAvroSchemaToStructType(writerSchema),
hadoopConf.get("timeZone", SQLConf.get().sessionLocalTimeZone()),
sparkParsePartitionUtil,
hadoopConf.getBoolean("spark.sql.sources.validatePartitionColumns", true));
}
}
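Callers pass the table's partition fields together with the file's partition path relative to the bootstrap base path, and get back the parsed partition column values (an empty array when the table has no partition fields). A minimal usage sketch, where tableConfig, writerSchema, and hadoopConf are stand-ins and the partition path literal is only illustrative:

Option<String[]> partitionFields = tableConfig.getPartitionFields();
Object[] partitionValues = SparkPartitionUtils.getPartitionFieldVals(
    partitionFields,
    "datestr=2023-05-17",                      // partition path relative to the bootstrap base path (illustrative)
    tableConfig.getBootstrapBasePath().get(),  // base path of the bootstrap source table
    writerSchema,                              // Avro schema used to type the parsed values
    hadoopConf);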
@@ -30,6 +30,7 @@
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.WriteStatus;
+ import org.apache.hudi.client.utils.SparkPartitionUtils;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
@@ -222,6 +223,14 @@ protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?,
throw new HoodieUpsertException(
"Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
} else {
+ if (upsertHandle.baseFileForMerge().getBootstrapBaseFile().isPresent()) {
+ Option<String[]> partitionFields = getMetaClient().getTableConfig().getPartitionFields();
+ Object[] partitionValues = SparkPartitionUtils.getPartitionFieldVals(partitionFields, upsertHandle.getPartitionPath(),
+ getMetaClient().getTableConfig().getBootstrapBasePath().get(),
+ upsertHandle.getWriterSchema(), getHadoopConf());
+ upsertHandle.setPartitionFields(partitionFields);
+ upsertHandle.setPartitionValues(partitionValues);
+ }
HoodieMergeHelper.newInstance().runMerge(this, upsertHandle);
}

@@ -19,6 +19,7 @@
package org.apache.hudi.table.action.commit;

import org.apache.hudi.client.WriteStatus;
+ import org.apache.hudi.client.utils.SparkPartitionUtils;
import org.apache.hudi.client.utils.SparkValidatorUtils;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
@@ -366,6 +367,15 @@ protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?,
throw new HoodieUpsertException(
"Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
} else {
+ if (upsertHandle.baseFileForMerge().getBootstrapBaseFile().isPresent()) {
+ Option<String[]> partitionFields = table.getMetaClient().getTableConfig().getPartitionFields();
+ Object[] partitionValues = SparkPartitionUtils.getPartitionFieldVals(partitionFields, upsertHandle.getPartitionPath(),
+ table.getMetaClient().getTableConfig().getBootstrapBasePath().get(),
+ upsertHandle.getWriterSchema(), table.getHadoopConf());
+ upsertHandle.setPartitionFields(partitionFields);
+ upsertHandle.setPartitionValues(partitionValues);
+ }
+
HoodieMergeHelper.newInstance().runMerge(table, upsertHandle);
}

@@ -21,7 +21,6 @@
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieRecord;
- import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;

@@ -64,35 +63,21 @@ public Set<String> filterRowKeys(Set<String> candidateRowKeys) {
public ClosableIterator<HoodieRecord<T>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
ClosableIterator<HoodieRecord<T>> skeletonIterator = skeletonFileReader.getRecordIterator(readerSchema, requestedSchema);
ClosableIterator<HoodieRecord<T>> dataFileIterator = dataFileReader.getRecordIterator(HoodieAvroUtils.removeMetadataFields(readerSchema), requestedSchema);
- return new ClosableIterator<HoodieRecord<T>>() {
+ return new HoodieBootstrapRecordIterator<T>(skeletonIterator, dataFileIterator, readerSchema, partitionFields, partitionValues) {
@Override
- public void close() {
- skeletonIterator.close();
- dataFileIterator.close();
- }
-
- @Override
- public boolean hasNext() {
- return skeletonIterator.hasNext() && dataFileIterator.hasNext();
+ protected void setPartitionPathField(int position, Object fieldValue, T row) {
+ setPartitionField(position, fieldValue, row);
}
+ };
+ }

+ public ClosableIterator<HoodieRecord<T>> getRecordIterator(Schema schema) throws IOException {
+ ClosableIterator<HoodieRecord<T>> skeletonIterator = skeletonFileReader.getRecordIterator(schema);
+ ClosableIterator<HoodieRecord<T>> dataFileIterator = dataFileReader.getRecordIterator(dataFileReader.getSchema());
+ return new HoodieBootstrapRecordIterator<T>(skeletonIterator, dataFileIterator, schema, partitionFields, partitionValues) {
@Override
- public HoodieRecord<T> next() {
- HoodieRecord<T> dataRecord = dataFileIterator.next();
- HoodieRecord<T> skeletonRecord = skeletonIterator.next();
- HoodieRecord<T> ret = dataRecord.prependMetaFields(readerSchema, readerSchema,
- new MetadataValues().setCommitTime(skeletonRecord.getRecordKey(readerSchema, HoodieRecord.COMMIT_TIME_METADATA_FIELD))
- .setCommitSeqno(skeletonRecord.getRecordKey(readerSchema, HoodieRecord.COMMIT_SEQNO_METADATA_FIELD))
- .setRecordKey(skeletonRecord.getRecordKey(readerSchema, HoodieRecord.RECORD_KEY_METADATA_FIELD))
- .setPartitionPath(skeletonRecord.getRecordKey(readerSchema, HoodieRecord.PARTITION_PATH_METADATA_FIELD))
- .setFileName(skeletonRecord.getRecordKey(readerSchema, HoodieRecord.FILENAME_METADATA_FIELD)), null);
- if (partitionFields.isPresent()) {
- for (int i = 0; i < partitionValues.length; i++) {
- int position = readerSchema.getField(partitionFields.get()[i]).pos();
- setPartitionField(position, partitionValues[i], ret.getData());
- }
- }
- return ret;
+ protected void setPartitionPathField(int position, Object fieldValue, T row) {
+ setPartitionField(position, fieldValue, row);
}
};
}
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.io.storage;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;

import org.apache.avro.Schema;

import static org.apache.hudi.common.util.ValidationUtils.checkState;

public abstract class HoodieBootstrapRecordIterator<T> implements ClosableIterator<HoodieRecord<T>> {

protected ClosableIterator<HoodieRecord<T>> skeletonIterator;
protected ClosableIterator<HoodieRecord<T>> dataFileIterator;
private final Option<String[]> partitionFields;
private final Object[] partitionValues;

protected Schema schema;

public HoodieBootstrapRecordIterator(ClosableIterator<HoodieRecord<T>> skeletonIterator,
ClosableIterator<HoodieRecord<T>> dataFileIterator,
Schema schema,
Option<String[]> partitionFields,
Object[] partitionValues) {
this.skeletonIterator = skeletonIterator;
this.dataFileIterator = dataFileIterator;
this.schema = schema;
this.partitionFields = partitionFields;
this.partitionValues = partitionValues;
}

@Override
public void close() {

}

@Override
public boolean hasNext() {
checkState(skeletonIterator.hasNext() == dataFileIterator.hasNext());
return skeletonIterator.hasNext();
}

@Override
public HoodieRecord<T> next() {
HoodieRecord<T> dataRecord = dataFileIterator.next();
HoodieRecord<T> skeletonRecord = skeletonIterator.next();
HoodieRecord<T> ret = dataRecord.prependMetaFields(schema, schema,
new MetadataValues().setCommitTime(skeletonRecord.getRecordKey(schema, HoodieRecord.COMMIT_TIME_METADATA_FIELD))
.setCommitSeqno(skeletonRecord.getRecordKey(schema, HoodieRecord.COMMIT_SEQNO_METADATA_FIELD))
.setRecordKey(skeletonRecord.getRecordKey(schema, HoodieRecord.RECORD_KEY_METADATA_FIELD))
.setPartitionPath(skeletonRecord.getRecordKey(schema, HoodieRecord.PARTITION_PATH_METADATA_FIELD))
.setFileName(skeletonRecord.getRecordKey(schema, HoodieRecord.FILENAME_METADATA_FIELD)), null);
if (partitionFields.isPresent()) {
for (int i = 0; i < partitionValues.length; i++) {
int position = schema.getField(partitionFields.get()[i]).pos();
setPartitionPathField(position, partitionValues[i], ret.getData());
}
}
return ret;
}

protected abstract void setPartitionPathField(int position, Object fieldValue, T row);
}
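For context, the bootstrap file reader shown earlier plugs into this class by creating an anonymous subclass and delegating the single abstract method to its engine-specific field setter, roughly:

return new HoodieBootstrapRecordIterator<T>(skeletonIterator, dataFileIterator, schema,
    partitionFields, partitionValues) {
  @Override
  protected void setPartitionPathField(int position, Object fieldValue, T row) {
    // Engine-specific write of the partition column value into the record payload.
    setPartitionField(position, fieldValue, row);
  }
};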