Skip to content

Commit

Permalink
[HUDI-6804] Fix hive read schema evolution MOR table (#9573)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy authored Sep 5, 2023
1 parent a3eea2f commit 31bc565
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public class SchemaEvolutionContext {

private final InputSplit split;
private final JobConf job;
private HoodieTableMetaClient metaClient;
private final HoodieTableMetaClient metaClient;
public Option<InternalSchema> internalSchemaOption;

public SchemaEvolutionContext(InputSplit split, JobConf job) throws IOException {
Expand Down Expand Up @@ -149,6 +149,7 @@ public void doEvolutionForRealtimeInputFormat(AbstractRealtimeRecordReader realt
realtimeRecordReader.setWriterSchema(writerSchema);
realtimeRecordReader.setReaderSchema(readerSchema);
realtimeRecordReader.setHiveSchema(hiveSchema);
internalSchemaOption = Option.of(prunedInternalSchema);
RealtimeSplit realtimeSplit = (RealtimeSplit) split;
LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
realtimeSplit.getDeltaLogPaths(), realtimeSplit.getPath(), requiredColumns));
Expand All @@ -171,7 +172,7 @@ public void doEvolutionForParquetFormat() {
if (!disableSchemaEvolution) {
prunedSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(), requiredColumns);
InternalSchema querySchema = prunedSchema;
Long commitTime = Long.valueOf(FSUtils.getCommitTime(finalPath.getName()));
long commitTime = Long.parseLong(FSUtils.getCommitTime(finalPath.getName()));
InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitTime, metaClient, false);
InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchema, true,
true).mergeSchema();
Expand Down Expand Up @@ -258,10 +259,10 @@ private TypeInfo constructHiveSchemaFromType(Type type, TypeInfo typeInfo) {
case DECIMAL:
return typeInfo;
case TIME:
throw new UnsupportedOperationException(String.format("cannot convert %s type to hive", new Object[] { type }));
throw new UnsupportedOperationException(String.format("cannot convert %s type to hive", type));
default:
LOG.error(String.format("cannot convert unknown type: %s to Hive", new Object[] { type }));
throw new UnsupportedOperationException(String.format("cannot convert unknown type: %s to Hive", new Object[] { type }));
LOG.error(String.format("cannot convert unknown type: %s to Hive", type));
throw new UnsupportedOperationException(String.format("cannot convert unknown type: %s to Hive", type));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,46 @@
package org.apache.hudi.functional;

import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.SchemaEvolutionContext;
import org.apache.hudi.hadoop.realtime.HoodieEmptyRecordReader;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader;
import org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader;
import org.apache.hudi.hadoop.realtime.RealtimeSplit;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;

import com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import static org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Tag("functional")
public class TestHiveTableSchemaEvolution {

private SparkSession sparkSession = null;
private SparkSession spark = null;

@TempDir
java.nio.file.Path basePath;
Expand All @@ -61,90 +68,98 @@ public void setUp() {
initSparkContexts("HiveSchemaEvolution");
}

@AfterEach
public void clean() {
if (spark != null) {
spark.close();
}
}

private void initSparkContexts(String appName) {
SparkConf sparkConf = getSparkConfForTest(appName);

sparkSession = SparkSession.builder()
spark = SparkSession.builder()
.config("hoodie.support.write.lock", "false")
.config("spark.sql.session.timeZone", "CTT")
.config("spark.sql.hive.convertMetastoreParquet", "false")
.config(sparkConf)
.getOrCreate();

sparkSession.sparkContext().setLogLevel("ERROR");
spark.sparkContext().setLogLevel("ERROR");
}

@Test
public void testCopyOnWriteTableForHive() throws Exception {
String tableName = "huditest" + new Date().getTime();
@ParameterizedTest
@ValueSource(strings = {"cow", "mor"})
public void testHiveReadSchemaEvolutionTable(String tableType) throws Exception {
if (HoodieSparkUtils.gteqSpark3_1()) {
sparkSession.sql("set hoodie.schema.on.read.enable=true");
String tableName = "hudi_test" + new Date().getTime();
String path = new Path(basePath.toAbsolutePath().toString()).toUri().toString();
sparkSession.sql("create table " + tableName + "(col0 int, col1 float, col2 string) using hudi options(type='cow', primaryKey='col0', preCombineField='col1') location '" + path + "'");
sparkSession.sql("insert into " + tableName + " values(1, 1.1, 'text')");
sparkSession.sql("alter table " + tableName + " alter column col1 type double");
sparkSession.sql("alter table " + tableName + " rename column col2 to aaa");

HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat();
spark.sql("set hoodie.schema.on.read.enable=true");
spark.sql(String.format("create table %s (col0 int, col1 float, col2 string) using hudi "
+ "tblproperties (type='%s', primaryKey='col0', preCombineField='col1') location '%s'",
tableName, tableType, path));
spark.sql(String.format("insert into %s values(1, 1.1, 'text')", tableName));
spark.sql(String.format("update %s set col2 = 'text2' where col0 = 1", tableName));
spark.sql(String.format("alter table %s alter column col1 type double", tableName));
spark.sql(String.format("alter table %s rename column col2 to col2_new", tableName));

JobConf jobConf = new JobConf();
inputFormat.setConf(jobConf);
jobConf.set(ColumnProjectionUtils.READ_ALL_COLUMNS, "false");
jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "col1,col2_new");
jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "6,7");
jobConf.set(serdeConstants.LIST_COLUMNS, "_hoodie_commit_time,_hoodie_commit_seqno,"
+ "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,col2_new");
jobConf.set(serdeConstants.LIST_COLUMN_TYPES, "string,string,string,string,string,int,double,string");
FileInputFormat.setInputPaths(jobConf, path);
InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
assertEvolutionResult("cow", splits[0], jobConf);
}
}

@Test
public void testMergeOnReadTableForHive() throws Exception {
String tableName = "huditest" + new Date().getTime();
if (HoodieSparkUtils.gteqSpark3_1()) {
sparkSession.sql("set hoodie.schema.on.read.enable=true");
String path = new Path(basePath.toAbsolutePath().toString()).toUri().toString();
sparkSession.sql("create table " + tableName + "(col0 int, col1 float, col2 string) using hudi options(type='cow', primaryKey='col0', preCombineField='col1') location '" + path + "'");
sparkSession.sql("insert into " + tableName + " values(1, 1.1, 'text')");
sparkSession.sql("insert into " + tableName + " values(2, 1.2, 'text2')");
sparkSession.sql("alter table " + tableName + " alter column col1 type double");
sparkSession.sql("alter table " + tableName + " rename column col2 to aaa");

HoodieRealtimeInputFormat inputFormat = new HoodieRealtimeInputFormat();
JobConf jobConf = new JobConf();
HoodieParquetInputFormat inputFormat = "cow".equals(tableType) ? new HoodieParquetInputFormat()
: new HoodieParquetRealtimeInputFormat();
inputFormat.setConf(jobConf);
FileInputFormat.setInputPaths(jobConf, path);
InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
assertEvolutionResult("mor", splits[0], jobConf);
}
}

private void assertEvolutionResult(String tableType, InputSplit split, JobConf jobConf) throws Exception {
jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "col1,aaa");
jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "6,7");
jobConf.set(serdeConstants.LIST_COLUMNS, "_hoodie_commit_time,_hoodie_commit_seqno,"
+ "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,aaa");
jobConf.set(serdeConstants.LIST_COLUMN_TYPES, "string,string,string,string,string,int,double,string");

SchemaEvolutionContext schemaEvolutionContext = new SchemaEvolutionContext(split, jobConf);
if ("cow".equals(tableType)) {
schemaEvolutionContext.doEvolutionForParquetFormat();
} else {
// mot table
RealtimeSplit realtimeSplit = (RealtimeSplit) split;
RecordReader recordReader;
// for log only split, set the parquet reader as empty.
if (FSUtils.isLogFile(realtimeSplit.getPath())) {
recordReader = new HoodieRealtimeRecordReader(realtimeSplit, jobConf, new HoodieEmptyRecordReader(realtimeSplit, jobConf));
InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
assertEquals(1, splits.length);

RecordReader<NullWritable, ArrayWritable> recordReader = inputFormat.getRecordReader(splits[0], jobConf, null);
List<List<Writable>> records = getWritableList(recordReader);
assertEquals(1, records.size());
List<Writable> record1 = records.get(0);
if ("cow".equals(tableType)) {
// col1, col2_new
assertEquals(2, record1.size());

Writable c1 = record1.get(0);
assertTrue(c1 instanceof DoubleWritable);
assertEquals("1.1", c1.toString().substring(0, 3));

Writable c2 = record1.get(1);
assertTrue(c2 instanceof Text);
assertEquals("text2", c2.toString());
} else {
// create a RecordReader to be used by HoodieRealtimeRecordReader
recordReader = new MapredParquetInputFormat().getRecordReader(realtimeSplit, jobConf, null);
// _hoodie_record_key,_hoodie_commit_time,_hoodie_partition_path, col1, col2_new
assertEquals(5, record1.size());

Writable c1 = record1.get(3);
assertTrue(c1 instanceof DoubleWritable);
assertEquals("1.1", c1.toString().substring(0, 3));

Writable c2 = record1.get(4);
assertTrue(c2 instanceof Text);
assertEquals("text2", c2.toString());
}
RealtimeCompactedRecordReader realtimeCompactedRecordReader = new RealtimeCompactedRecordReader(realtimeSplit, jobConf, recordReader);
// mor table also run with doEvolutionForParquetFormat in HoodieParquetInputFormat
schemaEvolutionContext.doEvolutionForParquetFormat();
schemaEvolutionContext.doEvolutionForRealtimeInputFormat(realtimeCompactedRecordReader);
recordReader.close();
}
}

assertEquals(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), "col1,col2");
assertEquals(jobConf.get(serdeConstants.LIST_COLUMNS), "_hoodie_commit_time,_hoodie_commit_seqno,"
+ "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,col2");
assertEquals(jobConf.get(serdeConstants.LIST_COLUMN_TYPES), "string,string,string,string,string,int,double,string");
private List<List<Writable>> getWritableList(RecordReader<NullWritable, ArrayWritable> recordReader) throws IOException {
List<List<Writable>> records = new ArrayList<>();
NullWritable key = recordReader.createKey();
ArrayWritable writable = recordReader.createValue();
while (writable != null && recordReader.next(key, writable)) {
records.add(Arrays.stream(writable.get())
.filter(Objects::nonNull)
.collect(Collectors.toList()));
}
return records;
}
}

0 comments on commit 31bc565

Please sign in to comment.