
Commit

[HUDI-3983] Flink engine support for comprehensive schema evolution (RFC-33)
trushev committed Jul 10, 2022
1 parent 63f95ab commit c71cb55
Showing 18 changed files with 1,170 additions and 107 deletions.
@@ -279,7 +279,7 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
if (!historySchemaStr.isEmpty() || Boolean.parseBoolean(config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key()))) {
InternalSchema internalSchema;
Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(new Schema.Parser().parse(config.getSchema()));
Schema avroSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
if (historySchemaStr.isEmpty()) {
internalSchema = AvroInternalSchemaConverter.convert(avroSchema);
internalSchema.setSchemaId(Long.parseLong(instantTime));
@@ -1667,16 +1667,14 @@ public void reOrderColPosition(String colName, String referColName, TableChange.
private Pair<InternalSchema, HoodieTableMetaClient> getInternalSchemaAndMetaClient() {
HoodieTableMetaClient metaClient = createMetaClient(true);
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
Option<InternalSchema> internalSchemaOption = schemaUtil.getTableInternalSchemaFromCommitMetadata();
if (!internalSchemaOption.isPresent()) {
throw new HoodieException(String.format("cannot find schema for current table: %s", config.getBasePath()));
}
return Pair.of(internalSchemaOption.get(), metaClient);
InternalSchema internalSchema = getInternalSchema(schemaUtil);
return Pair.of(internalSchema, metaClient);
}

private void commitTableChange(InternalSchema newSchema, HoodieTableMetaClient metaClient) {
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
InternalSchema oldSchema = getInternalSchema(schemaUtil);
String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse(SerDeHelper.inheritSchemas(oldSchema, ""));
Schema schema = AvroInternalSchemaConverter.convert(newSchema, config.getTableName());
String commitActionType = CommitUtils.getCommitActionType(WriteOperationType.ALTER_SCHEMA, metaClient.getTableType());
String instantTime = HoodieActiveTimeline.createNewInstantTime();
@@ -1692,10 +1690,20 @@ private void commitTableChange(InternalSchema newSchema, HoodieTableMetaClient m
throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", io);
}
Map<String, String> extraMeta = new HashMap<>();
extraMeta.put(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(newSchema.setSchemaId(Long.getLong(instantTime))));
extraMeta.put(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(newSchema.setSchemaId(Long.parseLong(instantTime))));
// try to save history schemas
FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(metaClient);
schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(newSchema, historySchemaStr));
commitStats(instantTime, Collections.emptyList(), Option.of(extraMeta), commitActionType);
}

private InternalSchema getInternalSchema(TableSchemaResolver schemaUtil) {
return schemaUtil.getTableInternalSchemaFromCommitMetadata().orElseGet(() -> {
try {
return AvroInternalSchemaConverter.convert(schemaUtil.getTableAvroSchema());
} catch (Exception e) {
throw new HoodieException(String.format("cannot find schema for current table: %s", config.getBasePath()));
}
});
}
}
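
One change in this file swaps Long.getLong(instantTime) for Long.parseLong(instantTime) when tagging the latest schema. A minimal standalone sketch (values are illustrative, not from the commit) of why the original call was a bug: Long.getLong reads a system property named by its argument rather than parsing the string.

public class InstantTimeParsingDemo {
  public static void main(String[] args) {
    String instantTime = "20220710120000"; // illustrative Hudi instant time
    // Long.getLong treats the argument as a system property *name* and
    // returns null when no such property is set -- the schema id would be lost.
    System.out.println(Long.getLong(instantTime));   // prints: null
    // Long.parseLong parses the string itself, as the fixed code intends.
    System.out.println(Long.parseLong(instantTime)); // prints: 20220710120000
  }
}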
@@ -20,10 +20,20 @@

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.utils.MergingIterator;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -36,13 +46,18 @@
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Helper to read records from previous version of base file and run Merge.
@@ -130,4 +145,48 @@ protected Void getResult() {
return null;
}
}

protected Iterator<GenericRecord> getRecordIterator(
HoodieTable<T, ?, ?, ?> table,
HoodieMergeHandle<T, ?, ?, ?> mergeHandle,
HoodieBaseFile baseFile,
HoodieFileReader<GenericRecord> reader,
Schema readSchema) throws IOException {
Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema());
if (!querySchemaOpt.isPresent()) {
querySchemaOpt = new TableSchemaResolver(table.getMetaClient()).getTableInternalSchemaFromCommitMetadata();
}
boolean needToReWriteRecord = false;
Map<String, String> renameCols = new HashMap<>();
// TODO support bootstrap
if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
// check implicitly add columns, and position reorder(spark sql may change cols order)
InternalSchema querySchema = AvroSchemaEvolutionUtils.reconcileSchema(readSchema, querySchemaOpt.get());
long commitInstantTime = Long.parseLong(FSUtils.getCommitTime(mergeHandle.getOldFilePath().getName()));
InternalSchema writeInternalSchema = InternalSchemaCache.searchSchemaAndCache(commitInstantTime, table.getMetaClient(), table.getConfig().getInternalSchemaCacheEnable());
if (writeInternalSchema.isEmptySchema()) {
throw new HoodieException(String.format("cannot find file schema for current commit %s", commitInstantTime));
}
List<String> colNamesFromQuerySchema = querySchema.getAllColsFullName();
List<String> colNamesFromWriteSchema = writeInternalSchema.getAllColsFullName();
List<String> sameCols = colNamesFromWriteSchema.stream()
.filter(f -> colNamesFromQuerySchema.contains(f)
&& writeInternalSchema.findIdByName(f) == querySchema.findIdByName(f)
&& writeInternalSchema.findIdByName(f) != -1
&& writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList());
readSchema = AvroInternalSchemaConverter
.convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false, false).mergeSchema(), readSchema.getName());
Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());
needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
|| SchemaCompatibility.checkReaderWriterCompatibility(readSchema, writeSchemaFromFile).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
if (needToReWriteRecord) {
renameCols = InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
}
}
if (needToReWriteRecord) {
return HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), readSchema, renameCols);
} else {
return reader.getRecordIterator(readSchema);
}
}
}
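
The rewrite decision in getRecordIterator above comes down to comparing the merged read schema against the schema the old base file was written with, using Avro's reader/writer compatibility check. A self-contained sketch of that check with illustrative record and field names (not taken from the commit):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaCompatibility;

public class SchemaCompatibilityDemo {
  public static void main(String[] args) {
    // Schema the old base file was written with.
    Schema writeSchema = SchemaBuilder.record("trip").fields()
        .requiredInt("id")
        .requiredString("driver")
        .endRecord();
    // Evolved schema the query reads with (new optional column added).
    Schema readSchema = SchemaBuilder.record("trip").fields()
        .requiredInt("id")
        .requiredString("driver")
        .optionalDouble("fare")
        .endRecord();
    SchemaCompatibility.SchemaCompatibilityType type = SchemaCompatibility
        .checkReaderWriterCompatibility(readSchema, writeSchema)
        .getType();
    // COMPATIBLE here: old records can be promoted to the evolved read schema,
    // so the merge helper would rewrite them with the new schema.
    System.out.println(type);
  }
}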
@@ -18,25 +18,14 @@

package org.apache.hudi.table.action.commit;

import org.apache.avro.SchemaCompatibility;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -52,10 +41,6 @@

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
BaseMergeHelper<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> {
@@ -94,45 +79,12 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hood
BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());

Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema());
boolean needToReWriteRecord = false;
Map<String, String> renameCols = new HashMap<>();
// TODO support bootstrap
if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
// check implicitly add columns, and position reorder(spark sql may change cols order)
InternalSchema querySchema = AvroSchemaEvolutionUtils.reconcileSchema(readSchema, querySchemaOpt.get());
long commitInstantTime = Long.valueOf(FSUtils.getCommitTime(mergeHandle.getOldFilePath().getName()));
InternalSchema writeInternalSchema = InternalSchemaCache.searchSchemaAndCache(commitInstantTime, table.getMetaClient(), table.getConfig().getInternalSchemaCacheEnable());
if (writeInternalSchema.isEmptySchema()) {
throw new HoodieException(String.format("cannot find file schema for current commit %s", commitInstantTime));
}
List<String> colNamesFromQuerySchema = querySchema.getAllColsFullName();
List<String> colNamesFromWriteSchema = writeInternalSchema.getAllColsFullName();
List<String> sameCols = colNamesFromWriteSchema.stream()
.filter(f -> colNamesFromQuerySchema.contains(f)
&& writeInternalSchema.findIdByName(f) == querySchema.findIdByName(f)
&& writeInternalSchema.findIdByName(f) != -1
&& writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList());
readSchema = AvroInternalSchemaConverter
.convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false, false).mergeSchema(), readSchema.getName());
Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());
needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
|| SchemaCompatibility.checkReaderWriterCompatibility(readSchema, writeSchemaFromFile).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
if (needToReWriteRecord) {
renameCols = InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
}
}

try {
final Iterator<GenericRecord> readerIterator;
if (baseFile.getBootstrapBaseFile().isPresent()) {
readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
} else {
if (needToReWriteRecord) {
readerIterator = HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), readSchema, renameCols);
} else {
readerIterator = reader.getRecordIterator(readSchema);
}
readerIterator = getRecordIterator(table, mergeHandle, baseFile, reader, readSchema);
}

ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
@@ -85,7 +85,7 @@ public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List
if (baseFile.getBootstrapBaseFile().isPresent()) {
readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
} else {
readerIterator = reader.getRecordIterator(readSchema);
readerIterator = getRecordIterator(table, mergeHandle, baseFile, reader, readSchema);
}

ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
@@ -22,6 +22,7 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.internal.schema.InternalSchema;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
@@ -37,8 +38,8 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReade

private HoodieUnMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize,
LogRecordScannerCallback callback, Option<InstantRange> instantRange) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, false);
LogRecordScannerCallback callback, Option<InstantRange> instantRange, InternalSchema internalSchema) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, false, true, Option.empty(), internalSchema);
this.callback = callback;
}

@@ -84,6 +85,7 @@ public static class Builder extends AbstractHoodieLogRecordReader.Builder {
private Option<InstantRange> instantRange = Option.empty();
// specific configurations
private LogRecordScannerCallback callback;
private InternalSchema internalSchema = InternalSchema.getEmptyInternalSchema();

public Builder withFileSystem(FileSystem fs) {
this.fs = fs;
@@ -135,10 +137,15 @@ public Builder withLogRecordScannerCallback(LogRecordScannerCallback callback) {
return this;
}

public Builder withInternalSchema(InternalSchema internalSchema) {
this.internalSchema = internalSchema;
return this;
}

@Override
public HoodieUnMergedLogRecordScanner build() {
return new HoodieUnMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
latestInstantTime, readBlocksLazily, reverseReader, bufferSize, callback, instantRange);
latestInstantTime, readBlocksLazily, reverseReader, bufferSize, callback, instantRange, internalSchema);
}
}
}
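
A hedged sketch of how a caller might thread an InternalSchema through the new builder hook. Only withFileSystem, withLogRecordScannerCallback, withInternalSchema and build() appear in this diff, so the newBuilder() factory and the remaining setters are assumptions, mirroring the merged scanner's builder:

import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.internal.schema.InternalSchema;

import java.util.List;

class UnMergedScannerExample {
  static HoodieUnMergedLogRecordScanner buildScanner(
      FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
      String latestInstantTime, InternalSchema internalSchema,
      HoodieUnMergedLogRecordScanner.LogRecordScannerCallback callback) {
    return HoodieUnMergedLogRecordScanner.newBuilder() // factory method assumed
        .withFileSystem(fs)
        .withBasePath(basePath)                   // assumed inherited setter
        .withLogFilePaths(logFilePaths)           // assumed inherited setter
        .withReaderSchema(readerSchema)           // assumed inherited setter
        .withLatestInstantTime(latestInstantTime) // assumed inherited setter
        .withInternalSchema(internalSchema)       // new in this commit
        .withLogRecordScannerCallback(callback)
        .build();
  }
}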
@@ -21,6 +21,7 @@
import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy;
import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType;
@@ -96,6 +97,12 @@ private FlinkOptions() {
+ "The semantics is best effort because the compaction job would finally merge all changes of a record into one.\n"
+ " default false to have UPSERT semantics");

public static final ConfigOption<Boolean> SCHEMA_EVOLUTION_ENABLED = ConfigOptions
.key(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key())
.booleanType()
.defaultValue(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue())
.withDescription(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.doc());

// ------------------------------------------------------------------------
// Metadata table Options
// ------------------------------------------------------------------------
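
A minimal sketch of enabling the new option from a Flink job configuration; the table path is illustrative, and the option simply forwards HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE into the FlinkOptions namespace:

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

public class EnableSchemaEvolutionDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH, "file:///tmp/hudi_demo_table"); // illustrative path
    // Turn on schema-on-read evolution; the default mirrors
    // HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE's default value.
    conf.setBoolean(FlinkOptions.SCHEMA_EVOLUTION_ENABLED, true);
  }
}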
@@ -37,6 +37,7 @@
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
import org.apache.hudi.sink.meta.CkpMetadata;
import org.apache.hudi.table.HoodieTable;
@@ -197,7 +198,8 @@ protected void loadRecords(String partitionPath) throws Exception {

if (latestCommitTime.isPresent()) {
BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
Schema schema = new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema();
TableSchemaResolver schemaResolver = new TableSchemaResolver(this.hoodieTable.getMetaClient());
Schema schema = schemaResolver.getTableAvroSchema();

List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
.getLatestMergedFileSlicesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp())
@@ -229,7 +231,9 @@ protected void loadRecords(String partitionPath) throws Exception {
.filter(logFile -> isValidFile(logFile.getFileStatus()))
.map(logFile -> logFile.getPath().toString())
.collect(toList());
HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, latestCommitTime.get().getTimestamp(),
InternalSchema internalSchema = schemaResolver.getTableInternalSchemaFromCommitMetadata()
.orElse(InternalSchema.getEmptyInternalSchema());
HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, internalSchema, latestCommitTime.get().getTimestamp(),
writeConfig, hadoopConf);

try {
@@ -453,6 +453,7 @@ private MergeOnReadInputFormat mergeOnReadInputFormat(
this.requiredPos,
this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value
this.conf,
getParquetConf(this.conf, this.hadoopConf),
this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE)
);
(remaining changed files not shown)
