From 07da7c60b95710b6e78653bfeff930b4d2db8103 Mon Sep 17 00:00:00 2001 From: z00484332 Date: Tue, 18 Oct 2022 16:42:42 +0800 Subject: [PATCH 1/9] [HUDI-5000] Support schema evolution for Hive --- .../hudi/hadoop/HoodieParquetInputFormat.java | 3 + .../hudi/hadoop/SchemaEvolutionContext.java | 344 ++++++++++++++++++ .../AbstractRealtimeRecordReader.java | 29 +- .../RealtimeCompactedRecordReader.java | 2 + packaging/hudi-hadoop-mr-bundle/pom.xml | 1 + 5 files changed, 373 insertions(+), 6 deletions(-) create mode 100644 hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java index 7b79f61e49bc..c168924e65fe 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java @@ -76,6 +76,9 @@ public RecordReader getRecordReader(final InputSpli return createBootstrappingRecordReader(split, job, reporter); } + // adapt schema evolution + new SchemaEvolutionContext(split, job).doEvolutionForParquetFormat(); + if (LOG.isDebugEnabled()) { LOG.debug("EMPLOYING DEFAULT RECORD READER - " + split); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java new file mode 100644 index 000000000000..66013789c741 --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.hadoop; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.util.InternalSchemaCache; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.TablePathUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader; +import org.apache.hudi.hadoop.realtime.RealtimeSplit; +import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.Type; +import org.apache.hudi.internal.schema.Types; +import org.apache.hudi.internal.schema.action.InternalSchemaMerger; +import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; +import org.apache.hudi.internal.schema.utils.InternalSchemaUtils; + +import org.apache.avro.Schema; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import java.io.IOException; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * This class is responsible for calculating names and types of fields that are actual at a certain point in time for hive. + * If field is renamed in queried schema, its old name will be returned, which is relevant at the provided time. + * If type of field is changed, its old type will be returned, and projection will be created that will convert the old type to the queried one. 
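+ * For example (mirroring the tests in this patch series), if a column col2 is later renamed to aaa and col1 is
+ * widened from float to double, a Hive query that projects aaa over an older file is rewritten to read col2,
+ * and the projected names and types published in the JobConf (ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
+ * serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES) as well as any pushed-down filter expressions
+ * are adjusted to match the schema the file was written with.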
+ */ +public class SchemaEvolutionContext { + + private static final Logger LOG = LogManager.getLogger(SchemaEvolutionContext.class); + + private static final String HIVE_TMP_READ_COLUMN_NAMES_CONF_STR = "hive.tmp.io.file.readcolumn.ids"; + private static final String HIVE_TMP_COLUMNS = "hive.tmp.columns"; + private static final String HIVE_EVOLUTION_ENABLE = "hudi.hive.schema.evolution"; + + private final InputSplit split; + private final JobConf job; + private HoodieTableMetaClient metaClient; + public Option internalSchemaOption; + + public SchemaEvolutionContext(InputSplit split, JobConf job) throws IOException { + this(split, job, Option.empty()); + } + + public SchemaEvolutionContext(InputSplit split, JobConf job, Option metaClientOption) throws IOException { + this.split = split; + this.job = job; + this.metaClient = metaClientOption.isPresent() ? metaClientOption.get() : setUpHoodieTableMetaClient(); + try { + TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient); + this.internalSchemaOption = schemaUtil.getTableInternalSchemaFromCommitMetadata(); + } catch (Exception e) { + internalSchemaOption = Option.empty(); + LOG.warn(String.format("failed to get internal Schema from hudi table:%s", metaClient.getBasePathV2()), e); + } + LOG.info(String.format("finish init schema evolution for split: %s", split)); + } + + private HoodieTableMetaClient setUpHoodieTableMetaClient() throws IOException { + Path inputPath = ((FileSplit)split).getPath(); + FileSystem fs = inputPath.getFileSystem(job); + Option tablePath = TablePathUtils.getTablePath(fs, inputPath); + return HoodieTableMetaClient.builder().setBasePath(tablePath.get().toString()).setConf(job).build(); + } + + /** + * Do schema evolution for RealtimeInputFormat. + * + * @param realtimeRecordReader recordReader for RealtimeInputFormat. + * @return + */ + public void doEvolutionForRealtimeInputFormat(AbstractRealtimeRecordReader realtimeRecordReader) throws Exception { + if (!(split instanceof RealtimeSplit)) { + LOG.warn(String.format("expect realtime split for mor table, but find other type split %s", split)); + return; + } + if (internalSchemaOption.isPresent()) { + Schema tableAvroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema(); + List requiredColumns = getRequireColumn(job); + InternalSchema prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(), + requiredColumns); + // Add partitioning fields to writer schema for resulting row to contain null values for these fields + String partitionFields = job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""); + List partitioningFields = partitionFields.length() > 0 ? 
Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
+                : new ArrayList<>();
+            Schema writerSchema = AvroInternalSchemaConverter.convert(internalSchemaOption.get(), tableAvroSchema.getName());
+            writerSchema = HoodieRealtimeRecordReaderUtils.addPartitionFields(writerSchema, partitioningFields);
+            Map<String, Schema.Field> schemaFieldsMap = HoodieRealtimeRecordReaderUtils.getNameToFieldMap(writerSchema);
+            // we should get the column names from HoodieParquetInputFormat#HIVE_TMP_COLUMNS, since serdeConstants#LIST_COLUMNS may have been changed by HoodieParquetInputFormat#setColumnNameList
+            Schema hiveSchema = realtimeRecordReader.constructHiveOrderedSchema(writerSchema, schemaFieldsMap, job.get(HIVE_TMP_COLUMNS));
+            Schema readerSchema = AvroInternalSchemaConverter.convert(prunedInternalSchema, tableAvroSchema.getName());
+            // set up the evolved schemas on the record reader
+            realtimeRecordReader.setWriterSchema(writerSchema);
+            realtimeRecordReader.setReaderSchema(readerSchema);
+            realtimeRecordReader.setHiveSchema(hiveSchema);
+            RealtimeSplit realtimeSplit = (RealtimeSplit) split;
+            LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
+                realtimeSplit.getDeltaLogPaths(), realtimeSplit.getPath(), requiredColumns));
+        }
+    }
+
+    /**
+     * Do schema evolution for ParquetFormat.
+     */
+    public void doEvolutionForParquetFormat() {
+        if (internalSchemaOption.isPresent()) {
+            // reading hoodie schema evolution table
+            job.setBoolean(HIVE_EVOLUTION_ENABLE, true);
+            Path finalPath = ((FileSplit)split).getPath();
+            InternalSchema prunedSchema;
+            List<String> requiredColumns = getRequireColumn(job);
+            // No need to trigger schema evolution for count(*)/count(1) operations
+            boolean disableSchemaEvolution =
+                requiredColumns.isEmpty() || (requiredColumns.size() == 1 && requiredColumns.get(0).isEmpty());
+            if (!disableSchemaEvolution) {
+                prunedSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(), requiredColumns);
+                InternalSchema querySchema = prunedSchema;
+                Long commitTime = Long.valueOf(FSUtils.getCommitTime(finalPath.getName()));
+                InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitTime, metaClient, false);
+                InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchema, true,
+                    true).mergeSchema();
+                List<Types.Field> fields = mergedInternalSchema.columns();
+                setColumnNameList(job, fields);
+                setColumnTypeList(job, fields);
+                pushDownFilter(job, querySchema, fileSchema);
+            }
+        }
+    }
+
+    public void setColumnTypeList(JobConf job, List<Types.Field> fields) {
+        List<TypeInfo> fullTypeInfos = TypeInfoUtils.getTypeInfosFromTypeString(job.get(serdeConstants.LIST_COLUMN_TYPES));
+        List<Integer> tmpColIdList = Arrays.stream(job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR).split(","))
+            .map(Integer::parseInt).collect(Collectors.toList());
+        if (tmpColIdList.size() != fields.size()) {
+            throw new HoodieException(String.format("The size of hive.io.file.readcolumn.ids: %s is not equal to projection columns: %s",
+                job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR), fields.stream().map(Types.Field::name).collect(Collectors.joining(","))));
+        }
+        List<TypeInfo> fieldTypes = new ArrayList<>();
+        for (int i = 0; i < tmpColIdList.size(); i++) {
+            Types.Field field = fields.get(i);
+            TypeInfo typeInfo = TypeInfoUtils.getTypeInfosFromTypeString(fullTypeInfos.get(tmpColIdList.get(i)).getQualifiedName()).get(0);
+            TypeInfo fieldType = constructHiveSchemaFromType(field.type(), typeInfo);
+            fieldTypes.add(fieldType);
+        }
+        for (int i = 0; i < tmpColIdList.size(); i++) {
+            TypeInfo typeInfo = fieldTypes.get(i);
+            if
(!(typeInfo instanceof PrimitiveTypeInfo)) { + int index = tmpColIdList.get(i); + fullTypeInfos.remove(index); + fullTypeInfos.add(index, typeInfo); + } + } + List fullColTypeList = TypeInfoUtils.getTypeStringsFromTypeInfo(fullTypeInfos); + String fullColTypeListString = String.join(",", fullColTypeList); + job.set(serdeConstants.LIST_COLUMN_TYPES, fullColTypeListString); + } + + private TypeInfo constructHiveSchemaFromType(Type type, TypeInfo typeInfo) { + switch (type.typeId()) { + case RECORD: + Types.RecordType record = (Types.RecordType)type; + List fields = record.fields(); + ArrayList fieldTypes = new ArrayList<>(); + ArrayList fieldNames = new ArrayList<>(); + for (int index = 0; index < fields.size(); index++) { + StructTypeInfo structTypeInfo = (StructTypeInfo)typeInfo; + TypeInfo subTypeInfo = getSchemaSubTypeInfo(structTypeInfo.getAllStructFieldTypeInfos().get(index), fields.get(index).type()); + fieldTypes.add(subTypeInfo); + String name = fields.get(index).name(); + fieldNames.add(name); + } + StructTypeInfo structTypeInfo = new StructTypeInfo(); + structTypeInfo.setAllStructFieldNames(fieldNames); + structTypeInfo.setAllStructFieldTypeInfos(fieldTypes); + return structTypeInfo; + case ARRAY: + ListTypeInfo listTypeInfo = (ListTypeInfo)typeInfo; + Types.ArrayType array = (Types.ArrayType) type; + TypeInfo subTypeInfo = getSchemaSubTypeInfo(listTypeInfo.getListElementTypeInfo(), array.elementType()); + listTypeInfo.setListElementTypeInfo(subTypeInfo); + return listTypeInfo; + case MAP: + Types.MapType map = (Types.MapType)type; + MapTypeInfo mapTypeInfo = (MapTypeInfo)typeInfo; + TypeInfo keyType = getSchemaSubTypeInfo(mapTypeInfo.getMapKeyTypeInfo(), map.keyType()); + TypeInfo valueType = getSchemaSubTypeInfo(mapTypeInfo.getMapValueTypeInfo(), map.valueType()); + MapTypeInfo mapType = new MapTypeInfo(); + mapType.setMapKeyTypeInfo(keyType); + mapType.setMapValueTypeInfo(valueType); + return mapType; + case BOOLEAN: + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case DATE: + case TIMESTAMP: + case STRING: + case UUID: + case FIXED: + case BINARY: + case DECIMAL: + return typeInfo; + case TIME: + throw new UnsupportedOperationException(String.format("cannot convert %s type to hive", new Object[] { type })); + default: + LOG.error(String.format("cannot convert unknown type: %s to Hive", new Object[] { type })); + throw new UnsupportedOperationException(String.format("cannot convert unknown type: %s to Hive", new Object[] { type })); + } + } + + private TypeInfo getSchemaSubTypeInfo(TypeInfo hoodieTypeInfo, Type hiveType) { + TypeInfo subTypeInfo = TypeInfoUtils.getTypeInfosFromTypeString(hoodieTypeInfo.getQualifiedName()).get(0); + TypeInfo typeInfo; + if (subTypeInfo instanceof PrimitiveTypeInfo) { + typeInfo = subTypeInfo; + } else { + typeInfo = constructHiveSchemaFromType(hiveType, subTypeInfo); + } + return typeInfo; + } + + private void pushDownFilter(JobConf job, InternalSchema querySchema, InternalSchema fileSchema) { + String filterExprSerialized = job.get(TableScanDesc.FILTER_EXPR_CONF_STR); + if (filterExprSerialized != null) { + ExprNodeGenericFuncDesc filterExpr = SerializationUtilities.deserializeExpression(filterExprSerialized); + LinkedList exprNodes = new LinkedList(); + exprNodes.add(filterExpr); + while (!exprNodes.isEmpty()) { + int size = exprNodes.size(); + for (int i = 0; i < size; i++) { + ExprNodeDesc expr = exprNodes.poll(); + if (expr instanceof ExprNodeColumnDesc) { + String oldColumn = ((ExprNodeColumnDesc)expr).getColumn(); + String 
newColumn = InternalSchemaUtils.reBuildFilterName(oldColumn, fileSchema, querySchema); + ((ExprNodeColumnDesc)expr).setColumn(newColumn); + } + List children = expr.getChildren(); + if (children != null) { + exprNodes.addAll(children); + } + } + } + String filterText = filterExpr.getExprString(); + String serializedFilterExpr = SerializationUtilities.serializeExpression(filterExpr); + if (LOG.isDebugEnabled()) { + LOG.debug("Pushdown initiated with filterText = " + filterText + ", filterExpr = " + + filterExpr + ", serializedFilterExpr = " + serializedFilterExpr); + } + job.set(TableScanDesc.FILTER_TEXT_CONF_STR, filterText); + job.set(TableScanDesc.FILTER_EXPR_CONF_STR, serializedFilterExpr); + } + } + + private void setColumnNameList(JobConf job, List fields) { + if (fields == null) { + return; + } + List tmpColIdList = Arrays.asList(job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR).split(",")); + if (fields.size() != tmpColIdList.size()) { + return; + } + StringBuilder readColumnNames = new StringBuilder(); + List tmpColNameList = Arrays.asList(job.get(serdeConstants.LIST_COLUMNS).split(",")); + List fullColNamelist = new ArrayList(tmpColNameList); + for (int index = 0; index < fields.size(); index++) { + String colName = fields.get(index).name(); + if (readColumnNames.length() > 0) { + readColumnNames.append(','); + } + readColumnNames.append(colName); + int id = Integer.parseInt(tmpColIdList.get(index)); + if (!colName.equals(fullColNamelist.get(id))) { + fullColNamelist.remove(id); + fullColNamelist.add(id, colName); + } + } + String readColumnNamesString = readColumnNames.toString(); + job.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, readColumnNamesString); + String fullColNamelistString = String.join(",", fullColNamelist); + job.set(serdeConstants.LIST_COLUMNS, fullColNamelistString); + } + + public static List getRequireColumn(JobConf jobConf) { + String originColumnString = jobConf.get(HIVE_TMP_READ_COLUMN_NAMES_CONF_STR); + if (StringUtils.isNullOrEmpty(originColumnString)) { + jobConf.set(HIVE_TMP_READ_COLUMN_NAMES_CONF_STR, jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)); + } + String hoodieFullColumnString = jobConf.get(HIVE_TMP_COLUMNS); + if (StringUtils.isNullOrEmpty(hoodieFullColumnString)) { + jobConf.set(HIVE_TMP_COLUMNS, jobConf.get(serdeConstants.LIST_COLUMNS)); + } + String tableColumnString = jobConf.get(HIVE_TMP_READ_COLUMN_NAMES_CONF_STR); + List tableColumns = Arrays.asList(tableColumnString.split(",")); + return new ArrayList<>(tableColumns); + } +} \ No newline at end of file diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java index dfdda9dfc825..47da956578f7 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java @@ -23,6 +23,8 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.hadoop.SchemaEvolutionContext; import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; import org.apache.avro.Schema; @@ -55,6 +57,7 @@ public abstract class AbstractRealtimeRecordReader { private Schema writerSchema; private Schema hiveSchema; private 
HoodieTableMetaClient metaClient; + protected SchemaEvolutionContext schemaEvolutionContext; public AbstractRealtimeRecordReader(RealtimeSplit split, JobConf job) { this.split = split; @@ -69,7 +72,12 @@ public AbstractRealtimeRecordReader(RealtimeSplit split, JobConf job) { } this.usesCustomPayload = usesCustomPayload(metaClient); LOG.info("usesCustomPayload ==> " + this.usesCustomPayload); - init(); + schemaEvolutionContext = new SchemaEvolutionContext(split, job, Option.of(metaClient)); + if (schemaEvolutionContext.internalSchemaOption.isPresent()) { + schemaEvolutionContext.doEvolutionForRealtimeInputFormat(this); + } else { + init(); + } } catch (Exception e) { throw new HoodieException("Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e); } @@ -99,7 +107,7 @@ private void init() throws Exception { jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR), partitioningFields); Map schemaFieldsMap = HoodieRealtimeRecordReaderUtils.getNameToFieldMap(writerSchema); - hiveSchema = constructHiveOrderedSchema(writerSchema, schemaFieldsMap); + hiveSchema = constructHiveOrderedSchema(writerSchema, schemaFieldsMap, jobConf.get(hive_metastoreConstants.META_TABLE_COLUMNS)); // TODO(vc): In the future, the reader schema should be updated based on log files & be able // to null out fields not present before @@ -108,10 +116,7 @@ private void init() throws Exception { split.getDeltaLogPaths(), split.getPath(), projectionFields)); } - private Schema constructHiveOrderedSchema(Schema writerSchema, Map schemaFieldsMap) { - // Get all column names of hive table - String hiveColumnString = jobConf.get(hive_metastoreConstants.META_TABLE_COLUMNS); - LOG.info("Hive Columns : " + hiveColumnString); + public Schema constructHiveOrderedSchema(Schema writerSchema, Map schemaFieldsMap, String hiveColumnString) { String[] hiveColumns = hiveColumnString.split(","); LOG.info("Hive Columns : " + hiveColumnString); List hiveSchemaFields = new ArrayList<>(); @@ -154,4 +159,16 @@ public RealtimeSplit getSplit() { public JobConf getJobConf() { return jobConf; } + + public void setReaderSchema(Schema readerSchema) { + this.readerSchema = readerSchema; + } + + public void setWriterSchema(Schema writerSchema) { + this.writerSchema = writerSchema; + } + + public void setHiveSchema(Schema hiveSchema) { + this.hiveSchema = hiveSchema; + } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java index b1bd3df50f3b..c0e783536283 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java @@ -35,6 +35,7 @@ import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; +import org.apache.hudi.internal.schema.InternalSchema; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -92,6 +93,7 @@ private HoodieMergedLogRecordScanner getMergedLogRecordScanner() throws IOExcept .withBitCaskDiskMapCompressionEnabled(jobConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())) .withUseScanV2(jobConf.getBoolean(HoodieRealtimeConfig.USE_LOG_RECORD_READER_SCAN_V2, false)) + 
.withInternalSchema(schemaEvolutionContext.internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema())) .build(); } diff --git a/packaging/hudi-hadoop-mr-bundle/pom.xml b/packaging/hudi-hadoop-mr-bundle/pom.xml index 340152d19099..92859002d7a2 100644 --- a/packaging/hudi-hadoop-mr-bundle/pom.xml +++ b/packaging/hudi-hadoop-mr-bundle/pom.xml @@ -88,6 +88,7 @@ com.yammer.metrics:metrics-core commons-io:commons-io org.openjdk.jol:jol-core + com.github.ben-manes.caffeine:caffeine From 8faa0e37b8f43c6f462f35a116c0c12f6f13cf54 Mon Sep 17 00:00:00 2001 From: z00484332 Date: Wed, 19 Oct 2022 19:06:59 +0800 Subject: [PATCH 2/9] [HUDI-5000] Support schema evolution for Hive/presto --- .../hudi/hadoop/SchemaEvolutionContext.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java index 66013789c741..61953485cfb5 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java @@ -93,6 +93,10 @@ public SchemaEvolutionContext(InputSplit split, JobConf job, Option tablePath = TablePathUtils.getTablePath(fs, inputPath); - return HoodieTableMetaClient.builder().setBasePath(tablePath.get().toString()).setConf(job).build(); + try { + Path inputPath = ((FileSplit)split).getPath(); + FileSystem fs = inputPath.getFileSystem(job); + Option tablePath = TablePathUtils.getTablePath(fs, inputPath); + return HoodieTableMetaClient.builder().setBasePath(tablePath.get().toString()).setConf(job).build(); + } catch (Exception e) { + LOG.warn(String.format("Not a valid hoodie table, table path: %s", ((FileSplit)split).getPath()), e); + return null; + } } /** From 6287b2c48eea93e662c0f40f5955fdf3cec5691d Mon Sep 17 00:00:00 2001 From: z00484332 Date: Sat, 22 Oct 2022 00:34:13 +0800 Subject: [PATCH 3/9] [HUDI-5000] Support schema evolution for Hive/presto --- .../TestHiveTableSchemaEvolution.java | 112 ++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java new file mode 100644 index 000000000000..4264868ba111 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hudi.functional; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hudi.HoodieSparkUtils; +import org.apache.hudi.hadoop.HoodieParquetInputFormat; +import org.apache.hudi.hadoop.SchemaEvolutionContext; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.hudi.HoodieSparkSessionExtension; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.util.Date; + +@Tag("functional") +public class TestHiveTableSchemaEvolution { + + private SparkSession sparkSession = null; + + @BeforeEach + public void setUp() { + initSparkContexts("HiveSchemaEvolution"); + } + + private void initSparkContexts(String appName) { + SparkConf sparkConf = new SparkConf(); + if (HoodieSparkUtils.gteqSpark3_2()) { + sparkConf.set("spark.sql.catalog.spark_catalog", + "org.apache.spark.sql.hudi.catalog.HoodieCatalog"); + } + sparkSession = SparkSession.builder().appName(appName) + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .withExtensions(new HoodieSparkSessionExtension()) + .config("hoodie.insert.shuffle.parallelism", "4") + .config("hoodie.upsert.shuffle.parallelism", "4") + .config("hoodie.delete.shuffle.parallelism", "4") + .config("hoodie.support.write.lock", "false") + .config("spark.sql.session.timeZone", "CTT") + .config("spark.sql.hive.convertMetastoreParquet", "false") + .config(sparkConf) + .master("local[1]").getOrCreate(); + sparkSession.sparkContext().setLogLevel("ERROR"); + } + + @Test + public void testCopyOnWriteTableForHive() throws Exception { + String tableName = "huditest" + new Date().getTime(); + File file = new File(System.getProperty("java.io.tmpdir") + tableName); + if (HoodieSparkUtils.gteqSpark3_1()) { + // sparkSession.sql("drop table if exists " + tableName); + sparkSession.sql("set hoodie.schema.on.read.enable=true"); + String path = new Path(file.getCanonicalPath()).toUri().toString(); + sparkSession.sql("create table " + tableName + "(col0 int, col1 float, col2 string) using hudi options(type='cow', primaryKey='col0', preCombineField='col1') location '" + path +"'"); + sparkSession.sql("insert into " + tableName + " values(1, 1.1, 'text')"); + sparkSession.sql("alter table " + tableName + " alter column col1 type double"); + sparkSession.sql("alter table " + tableName + " rename column col2 to aaa"); + + HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat(); + JobConf jobConf = new JobConf(); + inputFormat.setConf(jobConf); + FileInputFormat.setInputPaths(jobConf, path); + InputSplit[] splits = inputFormat.getSplits(jobConf, 1); + assertEvolutionResult("cow", splits[0], jobConf); + } + } + + private void assertEvolutionResult(String tableType, InputSplit split, JobConf jobConf) throws Exception { + jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "col1,aaa"); + jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "6,7"); + jobConf.set(serdeConstants.LIST_COLUMNS, "_hoodie_commit_time,_hoodie_commit_seqno," + + "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,aaa"); + jobConf.set(serdeConstants.LIST_COLUMN_TYPES, 
"string,string,string,string,string,int,double,string"); + + SchemaEvolutionContext schemaEvolutionContext = new SchemaEvolutionContext(split, jobConf); + if ("cow".equals(tableType)) { + schemaEvolutionContext.doEvolutionForParquetFormat(); + } else { + // mot table + } + + assertEquals(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), "col1,col2"); + assertEquals(jobConf.get(serdeConstants.LIST_COLUMNS), "_hoodie_commit_time,_hoodie_commit_seqno," + + "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,col2"); + assertEquals(jobConf.get(serdeConstants.LIST_COLUMN_TYPES), "string,string,string,string,string,int,double,string"); + } +} From 11d8108e89bc1de462978acbaee3905f9cb9edba Mon Sep 17 00:00:00 2001 From: z00484332 Date: Sat, 22 Oct 2022 10:53:36 +0800 Subject: [PATCH 4/9] [HUDI-5000] Support schema evolution for Hive/presto --- .../TestHiveTableSchemaEvolution.java | 52 +++++++++++++++++-- 1 file changed, 48 insertions(+), 4 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java index 4264868ba111..e782b65bd7d9 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java @@ -17,23 +17,33 @@ */ package org.apache.hudi.functional; -import static org.junit.jupiter.api.Assertions.assertEquals; - import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; import org.apache.hudi.HoodieSparkUtils; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.hadoop.HoodieParquetInputFormat; import org.apache.hudi.hadoop.SchemaEvolutionContext; +import org.apache.hudi.hadoop.realtime.HoodieEmptyRecordReader; +import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader; +import org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader; +import org.apache.hudi.hadoop.realtime.RealtimeSplit; import org.apache.spark.SparkConf; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.hudi.HoodieSparkSessionExtension; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat; import java.io.File; import java.util.Date; @@ -73,7 +83,6 @@ public void testCopyOnWriteTableForHive() throws Exception { String tableName = "huditest" + new Date().getTime(); File file = new File(System.getProperty("java.io.tmpdir") + tableName); if (HoodieSparkUtils.gteqSpark3_1()) { - // sparkSession.sql("drop table if exists " + tableName); sparkSession.sql("set hoodie.schema.on.read.enable=true"); String path = new Path(file.getCanonicalPath()).toUri().toString(); sparkSession.sql("create table " + tableName + "(col0 int, col1 float, col2 string) using hudi options(type='cow', primaryKey='col0', 
preCombineField='col1') location '" + path +"'");
@@ -90,6 +99,28 @@ public void testCopyOnWriteTableForHive() {
     }
   }
+  @Test
+  public void testMergeOnReadTableForHive() throws Exception {
+    String tableName = "huditest" + new Date().getTime();
+    File file = new File(System.getProperty("java.io.tmpdir") + tableName);
+    if (HoodieSparkUtils.gteqSpark3_1()) {
+      sparkSession.sql("set hoodie.schema.on.read.enable=true");
+      String path = new Path(file.getCanonicalPath()).toUri().toString();
+      sparkSession.sql("create table " + tableName + "(col0 int, col1 float, col2 string) using hudi options(type='mor', primaryKey='col0', preCombineField='col1') location '" + path +"'");
+      sparkSession.sql("insert into " + tableName + " values(1, 1.1, 'text')");
+      sparkSession.sql("insert into " + tableName + " values(2, 1.2, 'text2')");
+      sparkSession.sql("alter table " + tableName + " alter column col1 type double");
+      sparkSession.sql("alter table " + tableName + " rename column col2 to aaa");
+
+      HoodieRealtimeInputFormat inputFormat = new HoodieRealtimeInputFormat();
+      JobConf jobConf = new JobConf();
+      inputFormat.setConf(jobConf);
+      FileInputFormat.setInputPaths(jobConf, path);
+      InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
+      assertEvolutionResult("mor", splits[0], jobConf);
+    }
+  }
+
   private void assertEvolutionResult(String tableType, InputSplit split, JobConf jobConf) throws Exception {
     jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "col1,aaa");
     jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "6,7");
@@ -102,6 +133,19 @@ private void assertEvolutionResult(String tableType, InputSplit split, JobConf j
       schemaEvolutionContext.doEvolutionForParquetFormat();
     } else {
       // mor table
+      RealtimeSplit realtimeSplit = (RealtimeSplit) split;
+      RecordReader<NullWritable, ArrayWritable> recordReader;
+      // for log only split, set the parquet reader as empty.
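+      // HoodieEmptyRecordReader supplies no base-file records, so such splits are served entirely from the merged log records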
+ if (FSUtils.isLogFile(realtimeSplit.getPath())) { + recordReader = new HoodieRealtimeRecordReader(realtimeSplit, jobConf, new HoodieEmptyRecordReader(realtimeSplit, jobConf)); + } else { + // create a RecordReader to be used by HoodieRealtimeRecordReader + recordReader = new MapredParquetInputFormat().getRecordReader(realtimeSplit, jobConf, null); + } + RealtimeCompactedRecordReader realtimeCompactedRecordReader = new RealtimeCompactedRecordReader(realtimeSplit, jobConf, recordReader); + // mor table also run with doEvolutionForParquetFormat in HoodieParquetInputFormat + schemaEvolutionContext.doEvolutionForParquetFormat(); + schemaEvolutionContext.doEvolutionForRealtimeInputFormat(realtimeCompactedRecordReader); } assertEquals(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), "col1,col2"); @@ -109,4 +153,4 @@ private void assertEvolutionResult(String tableType, InputSplit split, JobConf j + "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,col2"); assertEquals(jobConf.get(serdeConstants.LIST_COLUMN_TYPES), "string,string,string,string,string,int,double,string"); } -} +} \ No newline at end of file From e47fda7bf01414a4258b0dc54818a2e30b62ba3c Mon Sep 17 00:00:00 2001 From: z00484332 Date: Mon, 24 Oct 2022 15:18:56 +0800 Subject: [PATCH 5/9] [HUDI-5000] Support schema evolution for Hive/presto --- .../hudi/hadoop/realtime/RealtimeCompactedRecordReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java index c0e783536283..8e1f8530b474 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java @@ -45,7 +45,7 @@ import java.util.Map; import java.util.Set; -class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader +public class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader implements RecordReader { private static final Logger LOG = LogManager.getLogger(AbstractRealtimeRecordReader.class); From edb0ca12b2889c8617102bb0502b3c747292cdf5 Mon Sep 17 00:00:00 2001 From: z00484332 Date: Mon, 24 Oct 2022 22:28:31 +0800 Subject: [PATCH 6/9] [HUDI-5000] Support schema evolution for Hive/presto --- .../apache/hudi/functional/TestHiveTableSchemaEvolution.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java index e782b65bd7d9..e2be688c928e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java @@ -21,8 +21,6 @@ import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -134,7 +132,7 @@ private void 
assertEvolutionResult(String tableType, InputSplit split, JobConf j } else { // mot table RealtimeSplit realtimeSplit = (RealtimeSplit) split; - RecordReader recordReader; + RecordReader recordReader; // for log only split, set the parquet reader as empty. if (FSUtils.isLogFile(realtimeSplit.getPath())) { recordReader = new HoodieRealtimeRecordReader(realtimeSplit, jobConf, new HoodieEmptyRecordReader(realtimeSplit, jobConf)); From ce351e5cffcc7a9bcbdee2ea07bd07390f70b93c Mon Sep 17 00:00:00 2001 From: z00484332 Date: Tue, 25 Oct 2022 19:07:36 +0800 Subject: [PATCH 7/9] [HUDI-5000] Support schema evolution for Hive/presto --- packaging/hudi-presto-bundle/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/packaging/hudi-presto-bundle/pom.xml b/packaging/hudi-presto-bundle/pom.xml index e29f6b709ef9..709029336a8c 100644 --- a/packaging/hudi-presto-bundle/pom.xml +++ b/packaging/hudi-presto-bundle/pom.xml @@ -71,6 +71,7 @@ org.apache.parquet:parquet-avro org.apache.avro:avro + com.github.ben-manes.caffeine:caffeine org.codehaus.jackson:* org.apache.commons:commons-lang3 org.apache.hbase:hbase-common From d76cb8da4634364b540faf91b66c2fc9644476c4 Mon Sep 17 00:00:00 2001 From: z00484332 Date: Fri, 28 Oct 2022 14:14:40 +0800 Subject: [PATCH 8/9] [HUDI-5000] Support schema evolution for Hive/presto --- .../hudi/hadoop/SchemaEvolutionContext.java | 502 +++++++++--------- .../TestHiveTableSchemaEvolution.java | 180 +++---- 2 files changed, 341 insertions(+), 341 deletions(-) diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java index 61953485cfb5..59184d14b350 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java @@ -74,280 +74,280 @@ */ public class SchemaEvolutionContext { - private static final Logger LOG = LogManager.getLogger(SchemaEvolutionContext.class); + private static final Logger LOG = LogManager.getLogger(SchemaEvolutionContext.class); - private static final String HIVE_TMP_READ_COLUMN_NAMES_CONF_STR = "hive.tmp.io.file.readcolumn.ids"; - private static final String HIVE_TMP_COLUMNS = "hive.tmp.columns"; - private static final String HIVE_EVOLUTION_ENABLE = "hudi.hive.schema.evolution"; + private static final String HIVE_TMP_READ_COLUMN_NAMES_CONF_STR = "hive.tmp.io.file.readcolumn.ids"; + private static final String HIVE_TMP_COLUMNS = "hive.tmp.columns"; + private static final String HIVE_EVOLUTION_ENABLE = "hudi.hive.schema.evolution"; - private final InputSplit split; - private final JobConf job; - private HoodieTableMetaClient metaClient; - public Option internalSchemaOption; + private final InputSplit split; + private final JobConf job; + private HoodieTableMetaClient metaClient; + public Option internalSchemaOption; - public SchemaEvolutionContext(InputSplit split, JobConf job) throws IOException { - this(split, job, Option.empty()); - } + public SchemaEvolutionContext(InputSplit split, JobConf job) throws IOException { + this(split, job, Option.empty()); + } - public SchemaEvolutionContext(InputSplit split, JobConf job, Option metaClientOption) throws IOException { - this.split = split; - this.job = job; - this.metaClient = metaClientOption.isPresent() ? 
metaClientOption.get() : setUpHoodieTableMetaClient(); - if (this.metaClient == null) { - internalSchemaOption = Option.empty(); - return; - } - try { - TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient); - this.internalSchemaOption = schemaUtil.getTableInternalSchemaFromCommitMetadata(); - } catch (Exception e) { - internalSchemaOption = Option.empty(); - LOG.warn(String.format("failed to get internal Schema from hudi table:%s", metaClient.getBasePathV2()), e); - } - LOG.info(String.format("finish init schema evolution for split: %s", split)); + public SchemaEvolutionContext(InputSplit split, JobConf job, Option metaClientOption) throws IOException { + this.split = split; + this.job = job; + this.metaClient = metaClientOption.isPresent() ? metaClientOption.get() : setUpHoodieTableMetaClient(); + if (this.metaClient == null) { + internalSchemaOption = Option.empty(); + return; } - - private HoodieTableMetaClient setUpHoodieTableMetaClient() throws IOException { - try { - Path inputPath = ((FileSplit)split).getPath(); - FileSystem fs = inputPath.getFileSystem(job); - Option tablePath = TablePathUtils.getTablePath(fs, inputPath); - return HoodieTableMetaClient.builder().setBasePath(tablePath.get().toString()).setConf(job).build(); - } catch (Exception e) { - LOG.warn(String.format("Not a valid hoodie table, table path: %s", ((FileSplit)split).getPath()), e); - return null; - } + try { + TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient); + this.internalSchemaOption = schemaUtil.getTableInternalSchemaFromCommitMetadata(); + } catch (Exception e) { + internalSchemaOption = Option.empty(); + LOG.warn(String.format("failed to get internal Schema from hudi table:%s", metaClient.getBasePathV2()), e); } + LOG.info(String.format("finish init schema evolution for split: %s", split)); + } - /** - * Do schema evolution for RealtimeInputFormat. - * - * @param realtimeRecordReader recordReader for RealtimeInputFormat. - * @return - */ - public void doEvolutionForRealtimeInputFormat(AbstractRealtimeRecordReader realtimeRecordReader) throws Exception { - if (!(split instanceof RealtimeSplit)) { - LOG.warn(String.format("expect realtime split for mor table, but find other type split %s", split)); - return; - } - if (internalSchemaOption.isPresent()) { - Schema tableAvroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema(); - List requiredColumns = getRequireColumn(job); - InternalSchema prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(), - requiredColumns); - // Add partitioning fields to writer schema for resulting row to contain null values for these fields - String partitionFields = job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""); - List partitioningFields = partitionFields.length() > 0 ? 
Arrays.stream(partitionFields.split("/")).collect(Collectors.toList()) - : new ArrayList<>(); - Schema writerSchema = AvroInternalSchemaConverter.convert(internalSchemaOption.get(), tableAvroSchema.getName()); - writerSchema = HoodieRealtimeRecordReaderUtils.addPartitionFields(writerSchema, partitioningFields); - Map schemaFieldsMap = HoodieRealtimeRecordReaderUtils.getNameToFieldMap(writerSchema); - // we should get HoodieParquetInputFormat#HIVE_TMP_COLUMNS,since serdeConstants#LIST_COLUMNS maybe change by HoodieParquetInputFormat#setColumnNameList - Schema hiveSchema = realtimeRecordReader.constructHiveOrderedSchema(writerSchema, schemaFieldsMap, job.get(HIVE_TMP_COLUMNS)); - Schema readerSchema = AvroInternalSchemaConverter.convert(prunedInternalSchema, tableAvroSchema.getName()); - // setUp evolution schema - realtimeRecordReader.setWriterSchema(writerSchema); - realtimeRecordReader.setReaderSchema(readerSchema); - realtimeRecordReader.setHiveSchema(hiveSchema); - RealtimeSplit realtimeSplit = (RealtimeSplit) split; - LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s", - realtimeSplit.getDeltaLogPaths(), realtimeSplit.getPath(), requiredColumns)); - } + private HoodieTableMetaClient setUpHoodieTableMetaClient() throws IOException { + try { + Path inputPath = ((FileSplit)split).getPath(); + FileSystem fs = inputPath.getFileSystem(job); + Option tablePath = TablePathUtils.getTablePath(fs, inputPath); + return HoodieTableMetaClient.builder().setBasePath(tablePath.get().toString()).setConf(job).build(); + } catch (Exception e) { + LOG.warn(String.format("Not a valid hoodie table, table path: %s", ((FileSplit)split).getPath()), e); + return null; } + } - /** - * Do schema evolution for ParquetFormat. - */ - public void doEvolutionForParquetFormat() { - if (internalSchemaOption.isPresent()) { - // reading hoodie schema evolution table - job.setBoolean(HIVE_EVOLUTION_ENABLE, true); - Path finalPath = ((FileSplit)split).getPath(); - InternalSchema prunedSchema; - List requiredColumns = getRequireColumn(job); - // No need trigger schema evolution for count(*)/count(1) operation - boolean disableSchemaEvolution = - requiredColumns.isEmpty() || (requiredColumns.size() == 1 && requiredColumns.get(0).isEmpty()); - if (!disableSchemaEvolution) { - prunedSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(), requiredColumns); - InternalSchema querySchema = prunedSchema; - Long commitTime = Long.valueOf(FSUtils.getCommitTime(finalPath.getName())); - InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitTime, metaClient, false); - InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchema, true, - true).mergeSchema(); - List fields = mergedInternalSchema.columns(); - setColumnNameList(job, fields); - setColumnTypeList(job, fields); - pushDownFilter(job, querySchema, fileSchema); - } - } + /** + * Do schema evolution for RealtimeInputFormat. + * + * @param realtimeRecordReader recordReader for RealtimeInputFormat. 
+ * @return + */ + public void doEvolutionForRealtimeInputFormat(AbstractRealtimeRecordReader realtimeRecordReader) throws Exception { + if (!(split instanceof RealtimeSplit)) { + LOG.warn(String.format("expect realtime split for mor table, but find other type split %s", split)); + return; + } + if (internalSchemaOption.isPresent()) { + Schema tableAvroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema(); + List requiredColumns = getRequireColumn(job); + InternalSchema prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(), + requiredColumns); + // Add partitioning fields to writer schema for resulting row to contain null values for these fields + String partitionFields = job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""); + List partitioningFields = partitionFields.length() > 0 ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList()) + : new ArrayList<>(); + Schema writerSchema = AvroInternalSchemaConverter.convert(internalSchemaOption.get(), tableAvroSchema.getName()); + writerSchema = HoodieRealtimeRecordReaderUtils.addPartitionFields(writerSchema, partitioningFields); + Map schemaFieldsMap = HoodieRealtimeRecordReaderUtils.getNameToFieldMap(writerSchema); + // we should get HoodieParquetInputFormat#HIVE_TMP_COLUMNS,since serdeConstants#LIST_COLUMNS maybe change by HoodieParquetInputFormat#setColumnNameList + Schema hiveSchema = realtimeRecordReader.constructHiveOrderedSchema(writerSchema, schemaFieldsMap, job.get(HIVE_TMP_COLUMNS)); + Schema readerSchema = AvroInternalSchemaConverter.convert(prunedInternalSchema, tableAvroSchema.getName()); + // setUp evolution schema + realtimeRecordReader.setWriterSchema(writerSchema); + realtimeRecordReader.setReaderSchema(readerSchema); + realtimeRecordReader.setHiveSchema(hiveSchema); + RealtimeSplit realtimeSplit = (RealtimeSplit) split; + LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s", + realtimeSplit.getDeltaLogPaths(), realtimeSplit.getPath(), requiredColumns)); } + } - public void setColumnTypeList(JobConf job, List fields) { - List fullTypeInfos = TypeInfoUtils.getTypeInfosFromTypeString(job.get(serdeConstants.LIST_COLUMN_TYPES)); - List tmpColIdList = Arrays.stream(job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR).split(",")) - .map(Integer::parseInt).collect(Collectors.toList()); - if (tmpColIdList.size() != fields.size()) { - throw new HoodieException(String.format("The size of hive.io.file.readcolumn.ids: %s is not equal to projection columns: %s", - job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR), fields.stream().map(Types.Field::name).collect(Collectors.joining(",")))); - } - List fieldTypes = new ArrayList<>(); - for (int i = 0; i < tmpColIdList.size(); i++) { - Types.Field field = fields.get(i); - TypeInfo typeInfo = TypeInfoUtils.getTypeInfosFromTypeString(fullTypeInfos.get(tmpColIdList.get(i)).getQualifiedName()).get(0); - TypeInfo fieldType = constructHiveSchemaFromType(field.type(), typeInfo); - fieldTypes.add(fieldType); - } - for (int i = 0; i < tmpColIdList.size(); i++) { - TypeInfo typeInfo = fieldTypes.get(i); - if (!(typeInfo instanceof PrimitiveTypeInfo)) { - int index = tmpColIdList.get(i); - fullTypeInfos.remove(index); - fullTypeInfos.add(index, typeInfo); - } - } - List fullColTypeList = TypeInfoUtils.getTypeStringsFromTypeInfo(fullTypeInfos); - String fullColTypeListString = String.join(",", fullColTypeList); - job.set(serdeConstants.LIST_COLUMN_TYPES, fullColTypeListString); + 
+  /**
+   * Do schema evolution for ParquetFormat.
+   */
+  public void doEvolutionForParquetFormat() {
+    if (internalSchemaOption.isPresent()) {
+      // reading hoodie schema evolution table
+      job.setBoolean(HIVE_EVOLUTION_ENABLE, true);
+      Path finalPath = ((FileSplit)split).getPath();
+      InternalSchema prunedSchema;
+      List<String> requiredColumns = getRequireColumn(job);
+      // No need to trigger schema evolution for count(*)/count(1) operations
+      boolean disableSchemaEvolution =
+          requiredColumns.isEmpty() || (requiredColumns.size() == 1 && requiredColumns.get(0).isEmpty());
+      if (!disableSchemaEvolution) {
+        prunedSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(), requiredColumns);
+        InternalSchema querySchema = prunedSchema;
+        Long commitTime = Long.valueOf(FSUtils.getCommitTime(finalPath.getName()));
+        InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitTime, metaClient, false);
+        InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchema, true,
+            true).mergeSchema();
+        List<Types.Field> fields = mergedInternalSchema.columns();
+        setColumnNameList(job, fields);
+        setColumnTypeList(job, fields);
+        pushDownFilter(job, querySchema, fileSchema);
+      }
     }
+  }
-  private TypeInfo constructHiveSchemaFromType(Type type, TypeInfo typeInfo) {
-    switch (type.typeId()) {
-      case RECORD:
-        Types.RecordType record = (Types.RecordType)type;
-        List<Types.Field> fields = record.fields();
-        ArrayList<TypeInfo> fieldTypes = new ArrayList<>();
-        ArrayList<String> fieldNames = new ArrayList<>();
-        for (int index = 0; index < fields.size(); index++) {
-          StructTypeInfo structTypeInfo = (StructTypeInfo)typeInfo;
-          TypeInfo subTypeInfo = getSchemaSubTypeInfo(structTypeInfo.getAllStructFieldTypeInfos().get(index), fields.get(index).type());
-          fieldTypes.add(subTypeInfo);
-          String name = fields.get(index).name();
-          fieldNames.add(name);
-        }
-        StructTypeInfo structTypeInfo = new StructTypeInfo();
-        structTypeInfo.setAllStructFieldNames(fieldNames);
-        structTypeInfo.setAllStructFieldTypeInfos(fieldTypes);
-        return structTypeInfo;
-      case ARRAY:
-        ListTypeInfo listTypeInfo = (ListTypeInfo)typeInfo;
-        Types.ArrayType array = (Types.ArrayType) type;
-        TypeInfo subTypeInfo = getSchemaSubTypeInfo(listTypeInfo.getListElementTypeInfo(), array.elementType());
-        listTypeInfo.setListElementTypeInfo(subTypeInfo);
-        return listTypeInfo;
-      case MAP:
-        Types.MapType map = (Types.MapType)type;
-        MapTypeInfo mapTypeInfo = (MapTypeInfo)typeInfo;
-        TypeInfo keyType = getSchemaSubTypeInfo(mapTypeInfo.getMapKeyTypeInfo(), map.keyType());
-        TypeInfo valueType = getSchemaSubTypeInfo(mapTypeInfo.getMapValueTypeInfo(), map.valueType());
-        MapTypeInfo mapType = new MapTypeInfo();
-        mapType.setMapKeyTypeInfo(keyType);
-        mapType.setMapValueTypeInfo(valueType);
-        return mapType;
-      case BOOLEAN:
-      case INT:
-      case LONG:
-      case FLOAT:
-      case DOUBLE:
-      case DATE:
-      case TIMESTAMP:
-      case STRING:
-      case UUID:
-      case FIXED:
-      case BINARY:
-      case DECIMAL:
-        return typeInfo;
-      case TIME:
-        throw new UnsupportedOperationException(String.format("cannot convert %s type to hive", new Object[] { type }));
-      default:
-        LOG.error(String.format("cannot convert unknown type: %s to Hive", new Object[] { type }));
-        throw new UnsupportedOperationException(String.format("cannot convert unknown type: %s to Hive", new Object[] { type }));
-    }
+  public void setColumnTypeList(JobConf job, List<Types.Field> fields) {
+    List<TypeInfo> fullTypeInfos = TypeInfoUtils.getTypeInfosFromTypeString(job.get(serdeConstants.LIST_COLUMN_TYPES));
+    List<Integer> tmpColIdList = Arrays.stream(job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR).split(","))
+        .map(Integer::parseInt).collect(Collectors.toList());
+    if (tmpColIdList.size() != fields.size()) {
+      throw new HoodieException(String.format("The size of hive.io.file.readcolumn.ids: %s is not equal to projection columns: %s",
+          job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR), fields.stream().map(Types.Field::name).collect(Collectors.joining(","))));
+    }
+    List<TypeInfo> fieldTypes = new ArrayList<>();
+    for (int i = 0; i < tmpColIdList.size(); i++) {
+      Types.Field field = fields.get(i);
+      TypeInfo typeInfo = TypeInfoUtils.getTypeInfosFromTypeString(fullTypeInfos.get(tmpColIdList.get(i)).getQualifiedName()).get(0);
+      TypeInfo fieldType = constructHiveSchemaFromType(field.type(), typeInfo);
+      fieldTypes.add(fieldType);
     }
+    for (int i = 0; i < tmpColIdList.size(); i++) {
+      TypeInfo typeInfo = fieldTypes.get(i);
+      if (!(typeInfo instanceof PrimitiveTypeInfo)) {
+        int index = tmpColIdList.get(i);
+        fullTypeInfos.remove(index);
+        fullTypeInfos.add(index, typeInfo);
+      }
+    }
+    List<String> fullColTypeList = TypeInfoUtils.getTypeStringsFromTypeInfo(fullTypeInfos);
+    String fullColTypeListString = String.join(",", fullColTypeList);
+    job.set(serdeConstants.LIST_COLUMN_TYPES, fullColTypeListString);
+  }
-  private TypeInfo getSchemaSubTypeInfo(TypeInfo hoodieTypeInfo, Type hiveType) {
-    TypeInfo subTypeInfo = TypeInfoUtils.getTypeInfosFromTypeString(hoodieTypeInfo.getQualifiedName()).get(0);
-    TypeInfo typeInfo;
-    if (subTypeInfo instanceof PrimitiveTypeInfo) {
-      typeInfo = subTypeInfo;
-    } else {
-      typeInfo = constructHiveSchemaFromType(hiveType, subTypeInfo);
+  private TypeInfo constructHiveSchemaFromType(Type type, TypeInfo typeInfo) {
+    switch (type.typeId()) {
+      case RECORD:
+        Types.RecordType record = (Types.RecordType)type;
+        List<Types.Field> fields = record.fields();
+        ArrayList<TypeInfo> fieldTypes = new ArrayList<>();
+        ArrayList<String> fieldNames = new ArrayList<>();
+        for (int index = 0; index < fields.size(); index++) {
+          StructTypeInfo structTypeInfo = (StructTypeInfo)typeInfo;
+          TypeInfo subTypeInfo = getSchemaSubTypeInfo(structTypeInfo.getAllStructFieldTypeInfos().get(index), fields.get(index).type());
+          fieldTypes.add(subTypeInfo);
+          String name = fields.get(index).name();
+          fieldNames.add(name);
         }
+        StructTypeInfo structTypeInfo = new StructTypeInfo();
+        structTypeInfo.setAllStructFieldNames(fieldNames);
+        structTypeInfo.setAllStructFieldTypeInfos(fieldTypes);
+        return structTypeInfo;
+      case ARRAY:
+        ListTypeInfo listTypeInfo = (ListTypeInfo)typeInfo;
+        Types.ArrayType array = (Types.ArrayType) type;
+        TypeInfo subTypeInfo = getSchemaSubTypeInfo(listTypeInfo.getListElementTypeInfo(), array.elementType());
+        listTypeInfo.setListElementTypeInfo(subTypeInfo);
+        return listTypeInfo;
+      case MAP:
+        Types.MapType map = (Types.MapType)type;
+        MapTypeInfo mapTypeInfo = (MapTypeInfo)typeInfo;
+        TypeInfo keyType = getSchemaSubTypeInfo(mapTypeInfo.getMapKeyTypeInfo(), map.keyType());
+        TypeInfo valueType = getSchemaSubTypeInfo(mapTypeInfo.getMapValueTypeInfo(), map.valueType());
+        MapTypeInfo mapType = new MapTypeInfo();
+        mapType.setMapKeyTypeInfo(keyType);
+        mapType.setMapValueTypeInfo(valueType);
+        return mapType;
+      case BOOLEAN:
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+      case DATE:
+      case TIMESTAMP:
+      case STRING:
+      case UUID:
+      case FIXED:
+      case BINARY:
+      case DECIMAL:
+        return typeInfo;
+      case TIME:
+        throw new UnsupportedOperationException(String.format("cannot convert %s type to hive", new Object[] { type }));
+      default:
+        LOG.error(String.format("cannot convert unknown type: %s to Hive", new Object[] { type }));
+        throw new UnsupportedOperationException(String.format("cannot convert unknown type: %s to Hive", new Object[] { type }));
     }
+  }
-  private void pushDownFilter(JobConf job, InternalSchema querySchema, InternalSchema fileSchema) {
-    String filterExprSerialized = job.get(TableScanDesc.FILTER_EXPR_CONF_STR);
-    if (filterExprSerialized != null) {
-      ExprNodeGenericFuncDesc filterExpr = SerializationUtilities.deserializeExpression(filterExprSerialized);
-      LinkedList<ExprNodeDesc> exprNodes = new LinkedList<>();
-      exprNodes.add(filterExpr);
-      while (!exprNodes.isEmpty()) {
-        int size = exprNodes.size();
-        for (int i = 0; i < size; i++) {
-          ExprNodeDesc expr = exprNodes.poll();
-          if (expr instanceof ExprNodeColumnDesc) {
-            String oldColumn = ((ExprNodeColumnDesc)expr).getColumn();
-            String newColumn = InternalSchemaUtils.reBuildFilterName(oldColumn, fileSchema, querySchema);
-            ((ExprNodeColumnDesc)expr).setColumn(newColumn);
-          }
-          List<ExprNodeDesc> children = expr.getChildren();
-          if (children != null) {
-            exprNodes.addAll(children);
-          }
-        }
-      }
-      String filterText = filterExpr.getExprString();
-      String serializedFilterExpr = SerializationUtilities.serializeExpression(filterExpr);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Pushdown initiated with filterText = " + filterText + ", filterExpr = "
-            + filterExpr + ", serializedFilterExpr = " + serializedFilterExpr);
-      }
-      job.set(TableScanDesc.FILTER_TEXT_CONF_STR, filterText);
-      job.set(TableScanDesc.FILTER_EXPR_CONF_STR, serializedFilterExpr);
-    }
+  private TypeInfo getSchemaSubTypeInfo(TypeInfo hoodieTypeInfo, Type hiveType) {
+    TypeInfo subTypeInfo = TypeInfoUtils.getTypeInfosFromTypeString(hoodieTypeInfo.getQualifiedName()).get(0);
+    TypeInfo typeInfo;
+    if (subTypeInfo instanceof PrimitiveTypeInfo) {
+      typeInfo = subTypeInfo;
+    } else {
+      typeInfo = constructHiveSchemaFromType(hiveType, subTypeInfo);
     }
+    return typeInfo;
+  }
-  private void setColumnNameList(JobConf job, List<Types.Field> fields) {
-    if (fields == null) {
-      return;
-    }
-    List<String> tmpColIdList = Arrays.asList(job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR).split(","));
-    if (fields.size() != tmpColIdList.size()) {
-      return;
+  private void pushDownFilter(JobConf job, InternalSchema querySchema, InternalSchema fileSchema) {
+    String filterExprSerialized = job.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+    if (filterExprSerialized != null) {
+      ExprNodeGenericFuncDesc filterExpr = SerializationUtilities.deserializeExpression(filterExprSerialized);
+      LinkedList<ExprNodeDesc> exprNodes = new LinkedList<>();
+      exprNodes.add(filterExpr);
+      while (!exprNodes.isEmpty()) {
+        int size = exprNodes.size();
+        for (int i = 0; i < size; i++) {
+          ExprNodeDesc expr = exprNodes.poll();
+          if (expr instanceof ExprNodeColumnDesc) {
+            String oldColumn = ((ExprNodeColumnDesc)expr).getColumn();
+            String newColumn = InternalSchemaUtils.reBuildFilterName(oldColumn, fileSchema, querySchema);
+            ((ExprNodeColumnDesc)expr).setColumn(newColumn);
+          }
+          List<ExprNodeDesc> children = expr.getChildren();
+          if (children != null) {
+            exprNodes.addAll(children);
+          }
         }
-    StringBuilder readColumnNames = new StringBuilder();
-    List<String> tmpColNameList = Arrays.asList(job.get(serdeConstants.LIST_COLUMNS).split(","));
-    List<String> fullColNamelist = new ArrayList<>(tmpColNameList);
-    for (int index = 0; index < fields.size(); index++) {
-      String colName = fields.get(index).name();
-      if (readColumnNames.length() > 0) {
-        readColumnNames.append(',');
-      }
-      readColumnNames.append(colName);
-      int id = Integer.parseInt(tmpColIdList.get(index));
-      if (!colName.equals(fullColNamelist.get(id))) {
-        fullColNamelist.remove(id);
-        fullColNamelist.add(id, colName);
-      }
-    }
-    String readColumnNamesString = readColumnNames.toString();
-    job.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, readColumnNamesString);
-    String fullColNamelistString = String.join(",", fullColNamelist);
-    job.set(serdeConstants.LIST_COLUMNS, fullColNamelistString);
+      }
+      String filterText = filterExpr.getExprString();
+      String serializedFilterExpr = SerializationUtilities.serializeExpression(filterExpr);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Pushdown initiated with filterText = " + filterText + ", filterExpr = "
+            + filterExpr + ", serializedFilterExpr = " + serializedFilterExpr);
+      }
+      job.set(TableScanDesc.FILTER_TEXT_CONF_STR, filterText);
+      job.set(TableScanDesc.FILTER_EXPR_CONF_STR, serializedFilterExpr);
     }
+  }
-  public static List<String> getRequireColumn(JobConf jobConf) {
-    String originColumnString = jobConf.get(HIVE_TMP_READ_COLUMN_NAMES_CONF_STR);
-    if (StringUtils.isNullOrEmpty(originColumnString)) {
-      jobConf.set(HIVE_TMP_READ_COLUMN_NAMES_CONF_STR, jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
-    }
-    String hoodieFullColumnString = jobConf.get(HIVE_TMP_COLUMNS);
-    if (StringUtils.isNullOrEmpty(hoodieFullColumnString)) {
-      jobConf.set(HIVE_TMP_COLUMNS, jobConf.get(serdeConstants.LIST_COLUMNS));
-    }
-    String tableColumnString = jobConf.get(HIVE_TMP_READ_COLUMN_NAMES_CONF_STR);
-    List<String> tableColumns = Arrays.asList(tableColumnString.split(","));
-    return new ArrayList<>(tableColumns);
+  private void setColumnNameList(JobConf job, List<Types.Field> fields) {
+    if (fields == null) {
+      return;
+    }
+    List<String> tmpColIdList = Arrays.asList(job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR).split(","));
+    if (fields.size() != tmpColIdList.size()) {
+      return;
+    }
+    StringBuilder readColumnNames = new StringBuilder();
+    List<String> tmpColNameList = Arrays.asList(job.get(serdeConstants.LIST_COLUMNS).split(","));
+    List<String> fullColNamelist = new ArrayList<>(tmpColNameList);
+    for (int index = 0; index < fields.size(); index++) {
+      String colName = fields.get(index).name();
+      if (readColumnNames.length() > 0) {
+        readColumnNames.append(',');
+      }
+      readColumnNames.append(colName);
+      int id = Integer.parseInt(tmpColIdList.get(index));
+      if (!colName.equals(fullColNamelist.get(id))) {
+        fullColNamelist.remove(id);
+        fullColNamelist.add(id, colName);
+      }
+    }
+    String readColumnNamesString = readColumnNames.toString();
+    job.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, readColumnNamesString);
+    String fullColNamelistString = String.join(",", fullColNamelist);
+    job.set(serdeConstants.LIST_COLUMNS, fullColNamelistString);
+  }
+
+  public static List<String> getRequireColumn(JobConf jobConf) {
+    String originColumnString = jobConf.get(HIVE_TMP_READ_COLUMN_NAMES_CONF_STR);
+    if (StringUtils.isNullOrEmpty(originColumnString)) {
+      jobConf.set(HIVE_TMP_READ_COLUMN_NAMES_CONF_STR, jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
+    }
+    String hoodieFullColumnString = jobConf.get(HIVE_TMP_COLUMNS);
+    if (StringUtils.isNullOrEmpty(hoodieFullColumnString)) {
+      jobConf.set(HIVE_TMP_COLUMNS, jobConf.get(serdeConstants.LIST_COLUMNS));
     }
+    String tableColumnString = jobConf.get(HIVE_TMP_READ_COLUMN_NAMES_CONF_STR);
+    List<String> tableColumns = Arrays.asList(tableColumnString.split(","));
+    return new ArrayList<>(tableColumns);
+  }
 }
\ No newline at end of file
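Note (illustration, not part of the patch): a minimal standalone sketch of the kind of Hive type rewrite that setColumnTypeList and constructHiveSchemaFromType perform on the columns.types entries. It uses only Hive's TypeInfo utilities; the array&lt;float&gt; to array&lt;double&gt; rewrite mirrors the float-to-double evolution of col1 exercised by the test below, and the class name is made up for the sketch.

import java.util.List;

import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class HiveTypeRewriteSketch {
  public static void main(String[] args) {
    // Parse a Hive type string the same way setColumnTypeList parses the columns.types property.
    List<TypeInfo> parsed = TypeInfoUtils.getTypeInfosFromTypeString("array<float>");
    ListTypeInfo arrayType = (ListTypeInfo) parsed.get(0);
    // Swap in the evolved element type (float widened to double), analogous to the
    // ARRAY branch of constructHiveSchemaFromType replacing the nested element type.
    arrayType.setListElementTypeInfo(TypeInfoFactory.doubleTypeInfo);
    // Render back to a type string that could be written into columns.types.
    System.out.println(arrayType.getTypeName()); // prints: array<double>
  }
}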
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
index e2be688c928e..7e13eb54be02 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
@@ -49,106 +49,106 @@ @Tag("functional")
 public class TestHiveTableSchemaEvolution {
-    private SparkSession sparkSession = null;
+  private SparkSession sparkSession = null;
 
-    @BeforeEach
-    public void setUp() {
-        initSparkContexts("HiveSchemaEvolution");
-    }
+  @BeforeEach
+  public void setUp() {
+    initSparkContexts("HiveSchemaEvolution");
+  }
 
-    private void initSparkContexts(String appName) {
-        SparkConf sparkConf = new SparkConf();
-        if (HoodieSparkUtils.gteqSpark3_2()) {
-            sparkConf.set("spark.sql.catalog.spark_catalog",
-                "org.apache.spark.sql.hudi.catalog.HoodieCatalog");
-        }
-        sparkSession = SparkSession.builder().appName(appName)
-            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-            .withExtensions(new HoodieSparkSessionExtension())
-            .config("hoodie.insert.shuffle.parallelism", "4")
-            .config("hoodie.upsert.shuffle.parallelism", "4")
-            .config("hoodie.delete.shuffle.parallelism", "4")
-            .config("hoodie.support.write.lock", "false")
-            .config("spark.sql.session.timeZone", "CTT")
-            .config("spark.sql.hive.convertMetastoreParquet", "false")
-            .config(sparkConf)
-            .master("local[1]").getOrCreate();
-        sparkSession.sparkContext().setLogLevel("ERROR");
+  private void initSparkContexts(String appName) {
+    SparkConf sparkConf = new SparkConf();
+    if (HoodieSparkUtils.gteqSpark3_2()) {
+      sparkConf.set("spark.sql.catalog.spark_catalog",
+          "org.apache.spark.sql.hudi.catalog.HoodieCatalog");
     }
+    sparkSession = SparkSession.builder().appName(appName)
+        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+        .withExtensions(new HoodieSparkSessionExtension())
+        .config("hoodie.insert.shuffle.parallelism", "4")
+        .config("hoodie.upsert.shuffle.parallelism", "4")
+        .config("hoodie.delete.shuffle.parallelism", "4")
+        .config("hoodie.support.write.lock", "false")
+        .config("spark.sql.session.timeZone", "CTT")
+        .config("spark.sql.hive.convertMetastoreParquet", "false")
+        .config(sparkConf)
+        .master("local[1]").getOrCreate();
+    sparkSession.sparkContext().setLogLevel("ERROR");
+  }
 
-    @Test
-    public void testCopyOnWriteTableForHive() throws Exception {
-        String tableName = "huditest" + new Date().getTime();
-        File file = new File(System.getProperty("java.io.tmpdir") + tableName);
-        if (HoodieSparkUtils.gteqSpark3_1()) {
-            sparkSession.sql("set hoodie.schema.on.read.enable=true");
-            String path = new Path(file.getCanonicalPath()).toUri().toString();
-            sparkSession.sql("create table " + tableName + "(col0 int, col1 float, col2 string) using hudi options(type='cow', primaryKey='col0', preCombineField='col1') location '" + path +"'");
-            sparkSession.sql("insert into " + tableName + " values(1, 1.1, 'text')");
-            sparkSession.sql("alter table " + tableName + " alter column col1 type double");
-            sparkSession.sql("alter table " + tableName + " rename column col2 to aaa");
+  @Test
+  public void testCopyOnWriteTableForHive() throws Exception {
+    String tableName = "huditest" + new Date().getTime();
+    File file = new File(System.getProperty("java.io.tmpdir") + tableName);
+    if (HoodieSparkUtils.gteqSpark3_1()) {
+      sparkSession.sql("set hoodie.schema.on.read.enable=true");
+      String path = new Path(file.getCanonicalPath()).toUri().toString();
+      sparkSession.sql("create table " + tableName + "(col0 int, col1 float, col2 string) using hudi options(type='cow', primaryKey='col0', preCombineField='col1') location '" + path +"'");
+      sparkSession.sql("insert into " + tableName + " values(1, 1.1, 'text')");
+      sparkSession.sql("alter table " + tableName + " alter column col1 type double");
+      sparkSession.sql("alter table " + tableName + " rename column col2 to aaa");
 
-            HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat();
-            JobConf jobConf = new JobConf();
-            inputFormat.setConf(jobConf);
-            FileInputFormat.setInputPaths(jobConf, path);
-            InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
-            assertEvolutionResult("cow", splits[0], jobConf);
-        }
+      HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat();
+      JobConf jobConf = new JobConf();
+      inputFormat.setConf(jobConf);
+      FileInputFormat.setInputPaths(jobConf, path);
+      InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
+      assertEvolutionResult("cow", splits[0], jobConf);
     }
+  }
 
-    @Test
-    public void testMergeOnReadTableForHive() throws Exception {
-        String tableName = "huditest" + new Date().getTime();
-        File file = new File(System.getProperty("java.io.tmpdir") + tableName);
-        if (HoodieSparkUtils.gteqSpark3_1()) {
-            sparkSession.sql("set hoodie.schema.on.read.enable=true");
-            String path = new Path(file.getCanonicalPath()).toUri().toString();
-            sparkSession.sql("create table " + tableName + "(col0 int, col1 float, col2 string) using hudi options(type='cow', primaryKey='col0', preCombineField='col1') location '" + path +"'");
-            sparkSession.sql("insert into " + tableName + " values(1, 1.1, 'text')");
-            sparkSession.sql("insert into " + tableName + " values(2, 1.2, 'text2')");
-            sparkSession.sql("alter table " + tableName + " alter column col1 type double");
-            sparkSession.sql("alter table " + tableName + " rename column col2 to aaa");
+  @Test
+  public void testMergeOnReadTableForHive() throws Exception {
+    String tableName = "huditest" + new Date().getTime();
+    File file = new File(System.getProperty("java.io.tmpdir") + tableName);
+    if (HoodieSparkUtils.gteqSpark3_1()) {
+      sparkSession.sql("set hoodie.schema.on.read.enable=true");
+      String path = new Path(file.getCanonicalPath()).toUri().toString();
+      sparkSession.sql("create table " + tableName + "(col0 int, col1 float, col2 string) using hudi options(type='cow', primaryKey='col0', preCombineField='col1') location '" + path +"'");
+      sparkSession.sql("insert into " + tableName + " values(1, 1.1, 'text')");
+      sparkSession.sql("insert into " + tableName + " values(2, 1.2, 'text2')");
+      sparkSession.sql("alter table " + tableName + " alter column col1 type double");
+      sparkSession.sql("alter table " + tableName + " rename column col2 to aaa");
 
-            HoodieRealtimeInputFormat inputFormat = new HoodieRealtimeInputFormat();
-            JobConf jobConf = new JobConf();
-            inputFormat.setConf(jobConf);
-            FileInputFormat.setInputPaths(jobConf, path);
-            InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
-            assertEvolutionResult("mor", splits[0], jobConf);
-        }
+      HoodieRealtimeInputFormat inputFormat = new HoodieRealtimeInputFormat();
+      JobConf jobConf = new JobConf();
+      inputFormat.setConf(jobConf);
+      FileInputFormat.setInputPaths(jobConf, path);
+      InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
+      assertEvolutionResult("mor", splits[0], jobConf);
     }
+  }
 
-    private void assertEvolutionResult(String tableType, InputSplit split, JobConf jobConf) throws Exception {
-        jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "col1,aaa");
-        jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "6,7");
-        jobConf.set(serdeConstants.LIST_COLUMNS, "_hoodie_commit_time,_hoodie_commit_seqno,"
-            + "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,aaa");
-        jobConf.set(serdeConstants.LIST_COLUMN_TYPES, "string,string,string,string,string,int,double,string");
-
-        SchemaEvolutionContext schemaEvolutionContext = new SchemaEvolutionContext(split, jobConf);
-        if ("cow".equals(tableType)) {
-            schemaEvolutionContext.doEvolutionForParquetFormat();
-        } else {
-            // mot table
-            RealtimeSplit realtimeSplit = (RealtimeSplit) split;
-            RecordReader recordReader;
-            // for log only split, set the parquet reader as empty.
-            if (FSUtils.isLogFile(realtimeSplit.getPath())) {
-                recordReader = new HoodieRealtimeRecordReader(realtimeSplit, jobConf, new HoodieEmptyRecordReader(realtimeSplit, jobConf));
-            } else {
-                // create a RecordReader to be used by HoodieRealtimeRecordReader
-                recordReader = new MapredParquetInputFormat().getRecordReader(realtimeSplit, jobConf, null);
-            }
-            RealtimeCompactedRecordReader realtimeCompactedRecordReader = new RealtimeCompactedRecordReader(realtimeSplit, jobConf, recordReader);
-            // mor table also run with doEvolutionForParquetFormat in HoodieParquetInputFormat
-            schemaEvolutionContext.doEvolutionForParquetFormat();
-            schemaEvolutionContext.doEvolutionForRealtimeInputFormat(realtimeCompactedRecordReader);
-        }
+  private void assertEvolutionResult(String tableType, InputSplit split, JobConf jobConf) throws Exception {
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "col1,aaa");
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "6,7");
+    jobConf.set(serdeConstants.LIST_COLUMNS, "_hoodie_commit_time,_hoodie_commit_seqno,"
+        + "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,aaa");
+    jobConf.set(serdeConstants.LIST_COLUMN_TYPES, "string,string,string,string,string,int,double,string");
-        assertEquals(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), "col1,col2");
-        assertEquals(jobConf.get(serdeConstants.LIST_COLUMNS), "_hoodie_commit_time,_hoodie_commit_seqno,"
-            + "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,col2");
-        assertEquals(jobConf.get(serdeConstants.LIST_COLUMN_TYPES), "string,string,string,string,string,int,double,string");
+    SchemaEvolutionContext schemaEvolutionContext = new SchemaEvolutionContext(split, jobConf);
+    if ("cow".equals(tableType)) {
+      schemaEvolutionContext.doEvolutionForParquetFormat();
+    } else {
+      // mot table
+      RealtimeSplit realtimeSplit = (RealtimeSplit) split;
+      RecordReader recordReader;
+      // for log only split, set the parquet reader as empty.
+      if (FSUtils.isLogFile(realtimeSplit.getPath())) {
+        recordReader = new HoodieRealtimeRecordReader(realtimeSplit, jobConf, new HoodieEmptyRecordReader(realtimeSplit, jobConf));
+      } else {
+        // create a RecordReader to be used by HoodieRealtimeRecordReader
+        recordReader = new MapredParquetInputFormat().getRecordReader(realtimeSplit, jobConf, null);
+      }
+      RealtimeCompactedRecordReader realtimeCompactedRecordReader = new RealtimeCompactedRecordReader(realtimeSplit, jobConf, recordReader);
+      // mor table also run with doEvolutionForParquetFormat in HoodieParquetInputFormat
+      schemaEvolutionContext.doEvolutionForParquetFormat();
+      schemaEvolutionContext.doEvolutionForRealtimeInputFormat(realtimeCompactedRecordReader);
    }
+
+    assertEquals(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), "col1,col2");
+    assertEquals(jobConf.get(serdeConstants.LIST_COLUMNS), "_hoodie_commit_time,_hoodie_commit_seqno,"
+        + "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,col2");
+    assertEquals(jobConf.get(serdeConstants.LIST_COLUMN_TYPES), "string,string,string,string,string,int,double,string");
+  }
 }
\ No newline at end of file
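Note (illustration, not part of the patch series): a small standalone sketch of the JobConf state that assertEvolutionResult above prepares and expects back. The string keys are the plain values behind serdeConstants.LIST_COLUMNS/LIST_COLUMN_TYPES and the ColumnProjectionUtils constants; the before/after values mirror the test's assertions for a table where col2 was renamed to aaa and col1 was widened from float to double. The class name is hypothetical.

import org.apache.hadoop.mapred.JobConf;

public class SchemaEvolutionConfSketch {
  public static void main(String[] args) {
    JobConf jobConf = new JobConf();
    // Projection as Hive submits it after the evolution (renamed column aaa, evolved type double).
    jobConf.set("hive.io.file.readcolumn.ids", "6,7");
    jobConf.set("hive.io.file.readcolumn.names", "col1,aaa");
    jobConf.set("columns", "_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,"
        + "_hoodie_partition_path,_hoodie_file_name,col0,col1,aaa");
    jobConf.set("columns.types", "string,string,string,string,string,int,double,string");

    // Given a real split and table, doEvolutionForParquetFormat() rewrites the projected
    // names back to what the parquet file actually stores while keeping the evolved types,
    // so the test expects afterwards:
    //   hive.io.file.readcolumn.names -> "col1,col2"
    //   columns                       -> the same list with aaa replaced by col2
    //   columns.types                 -> unchanged
    System.out.println(jobConf.get("hive.io.file.readcolumn.names") + " / " + jobConf.get("columns.types"));
  }
}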
From 552ad3ddcc716636f2a783125ff5c140c56e6326 Mon Sep 17 00:00:00 2001
From: z00484332
Date: Fri, 28 Oct 2022 14:48:25 +0800
Subject: [PATCH 9/9] [HUDI-5000] Support schema evolution for Hive/presto

---
 .../apache/hudi/functional/TestHiveTableSchemaEvolution.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
index 7e13eb54be02..071b954a17f7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hudi.functional;
 
 import org.apache.hadoop.fs.Path;
@@ -83,7 +84,7 @@ public void testCopyOnWriteTableForHive() throws Exception {
     if (HoodieSparkUtils.gteqSpark3_1()) {
       sparkSession.sql("set hoodie.schema.on.read.enable=true");
       String path = new Path(file.getCanonicalPath()).toUri().toString();
-      sparkSession.sql("create table " + tableName + "(col0 int, col1 float, col2 string) using hudi options(type='cow', primaryKey='col0', preCombineField='col1') location '" + path +"'");
+      sparkSession.sql("create table " + tableName + "(col0 int, col1 float, col2 string) using hudi options(type='cow', primaryKey='col0', preCombineField='col1') location '" + path + "'");
       sparkSession.sql("insert into " + tableName + " values(1, 1.1, 'text')");
       sparkSession.sql("alter table " + tableName + " alter column col1 type double");
       sparkSession.sql("alter table " + tableName + " rename column col2 to aaa");
@@ -104,7 +105,7 @@ public void testMergeOnReadTableForHive() throws Exception {
     if (HoodieSparkUtils.gteqSpark3_1()) {
       sparkSession.sql("set hoodie.schema.on.read.enable=true");
       String path = new Path(file.getCanonicalPath()).toUri().toString();
-      sparkSession.sql("create table " + tableName + "(col0 int, col1 float, col2 string) using hudi options(type='cow', primaryKey='col0', preCombineField='col1') location '" + path +"'");
+      sparkSession.sql("create table " + tableName + "(col0 int, col1 float, col2 string) using hudi options(type='cow', primaryKey='col0', preCombineField='col1') location '" + path + "'");
       sparkSession.sql("insert into " + tableName + " values(1, 1.1, 'text')");
       sparkSession.sql("insert into " + tableName + " values(2, 1.2, 'text2')");
       sparkSession.sql("alter table " + tableName + " alter column col1 type double");