
[HUDI-3981] Flink engine support for comprehensive schema evolution #5830

Merged: 1 commit, Nov 30, 2022

Conversation

trushev
Contributor

@trushev trushev commented Jun 10, 2022

Change Logs

This PR adds support for reading with Flink when comprehensive schema evolution (RFC-33) is enabled and the table history contains the operations add column, rename column, change column type, and drop column.

Impact

User-facing feature change: comprehensive schema evolution in Flink.

Risk level: medium

This change added tests and can be verified as follows:

  • Added unit test TestCastMap to verify that type conversion is correct
  • Added integration test ITTestSchemaEvolution to verify that a table with added, renamed, cast, and dropped columns is read as expected.

Documentation Update

Schema evolution is documented at https://hudi.apache.org/docs/schema_evolution.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@trushev trushev changed the title [HUDI-3981] Flink engine support for comprehensive schema evolution(R… [HUDI-3981] Flink engine support for comprehensive schema evolution(RFC-33) Jun 10, 2022
@danny0405
Contributor

If it is ready for review, you can ping someone for help :)

@trushev
Contributor Author

trushev commented Jun 13, 2022

@xiarixiaoyao could you pls review this PR :)

@xiarixiaoyao
Contributor

@trushev thanks for your contribution, I will review it in the next few days.

}

public static LogicalType[] project(List<DataType> fieldTypes, int[] selectedFields) {
return Arrays.stream(selectedFields)
Contributor

It would be better to support nested column projection in the future.
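For context, here is a minimal sketch (an assumption, not necessarily the exact Hudi code) of what a flat projection utility like the project(...) above typically does: selected top-level field indices are mapped to their LogicalTypes, so nested columns inside a ROW type cannot be selected individually, which is the limitation this comment points at.

import java.util.Arrays;
import java.util.List;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;

public class ProjectionSketch {
  // Maps each selected top-level field index to its logical type; nothing here can reach
  // into the nested fields of a ROW column, hence the suggestion to support nested projection later.
  public static LogicalType[] project(List<DataType> fieldTypes, int[] selectedFields) {
    return Arrays.stream(selectedFields)
        .mapToObj(i -> fieldTypes.get(i).getLogicalType())
        .toArray(LogicalType[]::new);
  }
}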

@xiarixiaoyao
Contributor

Partial review, still looking @trushev. Overall, looks good.

@xiarixiaoyao
Contributor

@danny0405 @XuQianJin-Stars could you please help review this PR, thanks very much.

@trushev
Contributor Author

trushev commented Jul 1, 2022

Sorry for the force push; rebased on the latest master to get the fix [HUDI-4258].

@danny0405 danny0405 self-assigned this Jul 3, 2022
@trushev
Contributor Author

trushev commented Jul 7, 2022

Resolved conflict with master

@trushev
Contributor Author

trushev commented Jul 10, 2022

Resolved conflict with master

@trushev
Contributor Author

trushev commented Jul 12, 2022

I think it is ready to merge

@danny0405
Contributor

Thanks, I will take a look this week; before that, please do not merge.

@trushev
Contributor Author

trushev commented Jul 15, 2022

OK, then I will fix the typo in the commit message (HUDI-3983 => HUDI-3981) along with the comment fixes.

if (!internalSchemaOption.isPresent()) {
throw new HoodieException(String.format("cannot find schema for current table: %s", config.getBasePath()));
}
return Pair.of(internalSchemaOption.get(), metaClient);
Contributor

I took a quick look at the PR and feel that the schema-related code is too invasive and spread everywhere, which is hard to maintain and prone to bugs; we need a neater approach to the code engineering.

Contributor Author

OK, thanks for the review. I will think about decoupling schema evolution from the rest of the code.

boolean needToReWriteRecord = false;
Map<String, String> renameCols = new HashMap<>();
// TODO support bootstrap
if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
Contributor

Do we need to check the schema evolution for each file, or for each read/commit?

Contributor Author

I just moved this code snippet from HoodieMergeHelper to BaseMergeHelper as is. Anyway, I will think about avoiding the unnecessary checks you pointed out.

Contributor

@trushev
can we avoid moving this code snippet? I don't think Flink evolution needs to modify this code.
#6358 and #7183 will optimize this code.

@danny0405
we need to check evolution for each base file.
Once we have made multiple column changes, different base files may have different schemas, and we cannot use the schema of the current table to read these files directly; an exception will be thrown.

tableA: a int, b string, c double, and there exist three files in this table: f1, f2, f3.

Drop column c from tableA and add a new column d, then update tableA, but we only update f2 and f3; f1 is not touched.
The schemas are now:

schema1 from tableA: a int, b string, d long
schema2 from f2, f3: a int, b string, d long
schema3 from f1:     a int, b string, c double

We should not use schema1 to read f1.
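As a small self-contained toy illustration (not Hudi code) of this point: each base file keeps the schema it was written with, so the reader has to resolve the file's own schema first and adapt its rows to the current table schema, rather than reading every file with schema1.

import java.util.List;
import java.util.Map;

public class PerFileSchemaExample {
  public static void main(String[] args) {
    // Schemas as written per file (schema2 for f2/f3, schema3 for the untouched f1).
    Map<String, List<String>> fileSchemas = Map.of(
        "f1", List.of("a int", "b string", "c double"),
        "f2", List.of("a int", "b string", "d long"),
        "f3", List.of("a int", "b string", "d long"));
    // Current table schema (schema1).
    List<String> tableSchema = List.of("a int", "b string", "d long");

    fileSchemas.forEach((file, schema) -> {
      if (schema.equals(tableSchema)) {
        System.out.println(file + ": read directly with the table schema");
      } else {
        System.out.println(file + ": read with " + schema + ", then adapt rows to " + tableSchema);
      }
    });
  }
}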

Contributor Author

@trushev trushev Nov 15, 2022

@trushev can we avoid moving this code snippet? I don't think Flink evolution needs to modify this code. #6358 and #7183 will optimize this code

@xiarixiaoyao This code has to be moved from HoodieMergeHelper to BaseMergeHelper due to the current class hierarchy.

I don't want to modify that code; I just want to reuse it in Flink.
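A schematic sketch of the class hierarchy being referred to (the extends relationships and the FlinkMergeHelper name are inferred from this thread, and the bodies are elided), showing why code that Flink needs to reuse has to sit in the shared base class:

// Shared base class: the schema-evolution snippet was moved here so that it is visible
// to every engine-specific merge helper.
abstract class BaseMergeHelper {
  // schema evolution handling reused by all engines (the snippet discussed above)
}

// Spark/Java merge helper, where the snippet originally lived.
class HoodieMergeHelper extends BaseMergeHelper {
}

// Flink merge helper, which also needs the snippet and therefore reuses it from the base class.
class FlinkMergeHelper extends BaseMergeHelper {
}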

@@ -135,10 +137,15 @@ public Builder withLogRecordScannerCallback(LogRecordScannerCallback callback) {
return this;
}

public Builder withInternalSchema(InternalSchema internalSchema) {
this.internalSchema = internalSchema;
return this;
Contributor

There is already a read schema; why do we pass around another schema? Whatever it is, please use just one schema!

Contributor Author

@trushev trushev Jul 22, 2022

I used the second schema here to be consistent with HoodieMergedLogRecordScanner which already uses this approach to scan logs in HoodieMergeOnReadRDD#scanLog. Do you think it is a bad practice?

Contributor

Yes, a very confusing practice. The reader/format should be deterministic with respect to one static, given schema; it should not care about how the schema is generated or where it comes from, that is, it should not be coupled to the evolution logic.

Contributor Author

I reverted the changes in HoodieMergedLogRecordScanner. Now there is only one schema, InternalSchema, which wraps org.apache.avro.Schema. The same approach is used in HoodieUnMergedLogRecordScanner.

@trushev trushev changed the title [HUDI-3981] Flink engine support for comprehensive schema evolution(RFC-33) [WIP][HUDI-3981] Flink engine support for comprehensive schema evolution(RFC-33) Jul 23, 2022
@yihua yihua added the schema-and-data-types, flink, and priority:major labels Sep 13, 2022
@flashJd
Contributor

flashJd commented Sep 23, 2022

@trushev Good job. I've tested it and it works on the whole, but there are a few small defects, which I'll point out.

@flashJd
Contributor

flashJd commented Sep 23, 2022

@danny0405 @xiarixiaoyao this PR has been pending for two months; when can we merge it? Spark only supports full schema evolution in Spark 3.x.x, and my Spark version is 2.4.

@@ -120,6 +121,12 @@ private FlinkOptions() {
.withDescription("The default partition name in case the dynamic partition"
+ " column value is null/empty string");

public static final ConfigOption<Boolean> SCHEMA_EVOLUTION_ENABLED = ConfigOptions
.key(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key())
.booleanType()
Contributor

There is no need to add the option if the key is the same as Hoodie core's.

Contributor

No worries, just add a tool in OptionsResolver

Contributor Author

Replaced with deprecated conf.getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), false)

Contributor Author

Added OptionsResolver.isSchemaEvolutionEnabled
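A minimal sketch of such a helper, assuming it simply resolves the shared Hoodie core key against the Flink job configuration with a default of false, as discussed in this thread (an illustration, not necessarily the merged implementation):

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.config.HoodieCommonConfig;

public class OptionsResolverSketch {
  // Returns true when comprehensive schema evolution is enabled for the job,
  // falling back to false when the option is not set.
  public static boolean isSchemaEvolutionEnabled(Configuration conf) {
    return conf.getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), false);
  }
}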

@@ -102,4 +108,9 @@ public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetad
return Option.empty();
}
}

private static void setLatestInternalSchema(HoodieWriteConfig config, HoodieTableMetaClient metaClient) {
Option<InternalSchema> internalSchema = new TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata();
Contributor

Add pre-condition check in case of null values.

Contributor Author

replaced with isPresent()

Option<RowDataProjection> castProjection;
InternalSchema fileSchema = internalSchemaManager.getFileSchema(path.getName());
if (fileSchema.isEmptySchema()) {
castProjection = Option.empty();
Contributor

Can return HoodieParquetReader directly here when we know castProjection is empty.

Contributor Author

copy-pasted

}

private static void assertSchemasAreNotEmpty(InternalSchema schema1, InternalSchema schema2) {
Preconditions.checkArgument(!schema1.isEmptySchema(), "InternalSchema cannot be empty here");
Contributor

There is no need to bind the schema validations together, and we can give a more detailed exception message for each schema.

Contributor Author

Removed the method and replaced the message "InternalSchema..." with "querySchema...".

@@ -87,4 +88,8 @@ public Object[] projectAsValues(RowData rowData) {
}
return values;
}

protected @Nullable Object rewriteVal(int pos, @Nullable Object val) {
return val;
Contributor

rewriteVal => getVal. Usually we do not override implemented methods, only abstract methods; the override is not very friendly for base class performance.

Contributor Author

@trushev trushev Nov 28, 2022

renamed rewriteVal => getVal
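A toy, self-contained illustration (simplified names; not the PR's classes) of the pattern under discussion: the base projection exposes a single value hook that returns the value unchanged, and a schema-evolution subclass overrides only that hook to cast values whose type changed between the file schema and the query schema.

import java.util.function.BiFunction;

public class ProjectionHookExample {
  static class RowProjection {
    // Base behavior: pass the value through unchanged.
    protected Object getVal(int pos, Object val) {
      return val;
    }
    Object[] project(Object[] row) {
      Object[] out = new Object[row.length];
      for (int i = 0; i < row.length; i++) {
        out[i] = getVal(i, row[i]);
      }
      return out;
    }
  }

  static class CastingRowProjection extends RowProjection {
    private final BiFunction<Integer, Object, Object> castMap; // stands in for the PR's CastMap

    CastingRowProjection(BiFunction<Integer, Object, Object> castMap) {
      this.castMap = castMap;
    }

    @Override
    protected Object getVal(int pos, Object val) {
      return castMap.apply(pos, val); // cast only where the schema evolved
    }
  }

  public static void main(String[] args) {
    RowProjection plain = new RowProjection();
    RowProjection casting = new CastingRowProjection((pos, val) ->
        pos == 1 && val instanceof Integer ? ((Integer) val).longValue() : val); // int column evolved to long
    Object[] row = {"id-1", 42};
    System.out.println(java.util.Arrays.toString(plain.project(row)));   // values unchanged
    System.out.println(java.util.Arrays.toString(casting.project(row))); // second value cast to Long
  }
}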

castMap.add(1, new DecimalType(), new VarCharType());
DecimalData val = DecimalData.fromBigDecimal(BigDecimal.ONE, 2, 1);
assertEquals(DecimalData.fromBigDecimal(BigDecimal.ONE, 3, 2), castMap.castIfNeeded(0, val));
assertEquals(BinaryStringData.fromString("1.0"), castMap.castIfNeeded(1, val));
Contributor

For float, double, and decimal data types, what happens when the target data type has precision loss, do we throw an exception here? And exactly what is the data type precedence (i.e., which data types are castable) for each type?

Contributor Author

do we throw exception here

No, we follow Spark's implementation org.apache.hudi.client.utils.SparkInternalSchemaConverter#convertDoubleType.

what kind of data type is castable here

  • Float => Double, Decimal
  • Double => Decimal
  • Decimal => Decimal (change precision or scale)
  • String => Decimal
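As a small self-contained illustration of the directions listed above (using Flink's DecimalData as in the test snippet earlier; this is not the PR's CastMap code, just the conversions it permits):

import java.math.BigDecimal;
import org.apache.flink.table.data.DecimalData;

public class CastPrecedenceExample {
  public static void main(String[] args) {
    float f = 1.5f;
    double asDouble = f;                                                  // Float => Double
    BigDecimal fromDouble = BigDecimal.valueOf(asDouble);                 // Double => Decimal
    DecimalData rescaled = DecimalData.fromBigDecimal(fromDouble, 5, 2);  // Decimal => Decimal (new precision/scale)
    DecimalData fromString = DecimalData.fromBigDecimal(new BigDecimal("1.0"), 3, 1); // String => Decimal
    System.out.println(asDouble + " " + rescaled + " " + fromString);
  }
}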

Contributor

Thanks. I see we return null when CastMap casts a type that is not in its precedence list; is that reasonable?

Contributor Author

The following example throws an exception:

CastMap castMap = new CastMap();
castMap.add(0, new BigIntType(), new IntType()); // <---- error, cast long to int is unsupported
java.lang.IllegalArgumentException: Cannot create cast BIGINT => INT at pos 0

Contributor Author

@trushev trushev Nov 28, 2022

The following example throws an exception as well:

CastMap castMap = new CastMap();
castMap.add(0, new IntType(), new BigIntType()); // cast int => long
castMap.castIfNeeded(0, "wrong arg"); // <----- error, expected int but actual is string
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Number

InternalSchema fileSchema = internalSchemaManager.getFileSchema(path.getName());
if (fileSchema.isEmptySchema()) {
return new HoodieParquetSplitReader(
ParquetSplitReaderUtil.genPartColumnarRowReader(
Contributor

Can the HoodieParquetSplitReader be shared?

Contributor Author

You mean shared with another file split? I guess not, because ParquetColumnarRowSplitReader is not shareable. Currently, we always create a new Parquet reader for each file.


public HoodieParquetSplitReader(ParquetColumnarRowSplitReader reader) {
this.reader = reader;
}
Contributor

ParquetColumnarRowSplitReader can implement HoodieParquetReader directly.

Contributor Author

@trushev trushev Nov 28, 2022

I avoided it on purpose because:

  1. ParquetColumnarRowSplitReader is copied from Flink, and I'd like to avoid any changes in this class.
  2. We would have to maintain 3 versions of it: 1.13.x, 1.14.x, 1.15.x.
  3. There is a note in ParquetSplitReaderUtil:
 * <p>NOTE: reference from Flink release 1.11.2 {@code ParquetSplitReaderUtil}, modify to support INT64
 * based TIMESTAMP_MILLIS as ConvertedType, should remove when Flink supports that.

I think if we remove ParquetSplitReaderUtil, then we will want to remove ParquetColumnarRowSplitReader as well.

@danny0405
Contributor

@hudi-bot run azure

@danny0405
Contributor

Can you squash and force push here? I didn't see the Azure CI history; let's re-trigger it.

@trushev
Contributor Author

trushev commented Nov 29, 2022

Can you squash and force push here? I didn't see the Azure CI history; let's re-trigger it.

Done, waiting for Azure.

@trushev
Contributor Author

trushev commented Nov 29, 2022

The CI build failure is due to a broken master branch. I've pushed the fix: #7319.

@trushev
Contributor Author

trushev commented Nov 29, 2022

It looks like Azure doesn't run on this PR anymore. A verifying PR has been opened: #7321.

@trushev trushev changed the title [HUDI-3981][RFC-33] Flink engine support for comprehensive schema evolution [HUDI-3981] Flink engine support for comprehensive schema evolution Nov 30, 2022
@hudi-bot

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

Contributor

@danny0405 danny0405 left a comment

+1

@danny0405 danny0405 merged commit da89e12 into apache:master Nov 30, 2022
@trushev
Contributor Author

trushev commented Nov 30, 2022

@danny0405 @xiarixiaoyao @flashJd thank you for reviewing this PR.

@xiarixiaoyao
Contributor

@trushev
Thanks a lot for contributing this feature and waiting patiently for the review.

@XuQianJin-Stars
Contributor

@trushev Thanks a lot for contributing this feature.

@waywtdcc
Contributor

waywtdcc commented Dec 4, 2022

@trushev @danny0405 Hello, can this PR be merged into 0.12.1 to support Flink schema evolution? Do I need to merge other PRs?

@trushev
Contributor Author

trushev commented Dec 5, 2022

@trushev @danny0405 Hello, can this PR be merged into 0.12.1 to support Flink schema evolution? Do I need to merge other PRs?

Yes, there are several commits that this PR depends on. I think it is not a big deal to backport the feature; I'm just not sure about the release policy. Is such a change suitable for a minor update 0.12.1 -> 0.12.2?

@waywtdcc
Contributor

Hope this PR can be merged into 0.12.2.

neverdizzy pushed a commit to neverdizzy/hudi that referenced this pull request Dec 13, 2022
@voonhous
Member

@trushev I've read through the PR and noticed that the scope of the changes included here is limited to supporting Hudi Full Schema Evolution (HFSE).

Prior to HFSE, Hudi has been relying on Avro's native Schema-Resolution (ASR) to perform schema evolution when performing UPSERTs via Spark, where schema changes are applied implicitly.

These implicit schema changes do not write to .schema and hence, the feature here will not support ASR reads via Flink.

I provided some examples (Mainly on Spark) in this issue here: #7444.

I was wondering if you have any plans on supporting ASR reads via Flink.

If there are none, I plan on adding support for ASR reads via Flink. I wanted to clarify to prevent duplicated effort on the same feature.

@trushev
Contributor Author

trushev commented Dec 16, 2022

@voonhous
As I understand it, you are talking about this case:

Flink SQL>
-- write with schema1
create table tbl(`id` int primary key, `value` int)
    partitioned by (`id`)
    with ('connector'='hudi', 'path'='/tmp/tbl');
insert into tbl values (1, 10);

-- write with schema2 int => double
drop table tbl;
create table tbl(`id` int primary key, `value` double)
    partitioned by (`id`)
    with ('connector'='hudi', 'path'='/tmp/tbl');
insert into tbl values (2, 20.0);

-- read all data
select * from tbl; -- throws an exception because tbl consists of two partitioned files: (1, 10) and (2, 20.0)
Caused by: java.lang.IllegalArgumentException: Unexpected type: INT32

Whereas if we remove partitioned by (`id`) from the SQL above, tbl will consist of two unpartitioned files, (1, 10.0) and (2, 20.0), and the read query will work fine:

select * from tbl;
+----+-------------+--------------------------------+
| op |          id |                          value |
+----+-------------+--------------------------------+
| +I |           1 |                           10.0 |
| +I |           2 |                           20.0 |
+----+-------------+--------------------------------+

In my opinion it's a good idea to support the scenario you described for Spark in #7480.
Currently, I have no plans to implement it, so you can do it if you wish.

@voonhous
Member

@trushev

Yes, this is what I intend to work on.

What you described are operations made entirely in Flink SQL. I was thinking of cross-engine operations.

i.e., for tables that were evolved using Avro Schema Resolution (ASR) via Spark but read using Flink, the same error will be thrown too.
