Skip to content

Commit

Permalink
[HUDI-5244] Fix bugs in schema evolution client with lost operation f…
Browse files Browse the repository at this point in the history
…ield and not found schema (apache#7248)

* [HUDI-5244] Fix bugs in schema evolution client with lost operation field and not found schema
  • Loading branch information
trushev authored and satishkotha committed Dec 12, 2022
1 parent 63668f9 commit 321451d
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
if (!historySchemaStr.isEmpty() || Boolean.parseBoolean(config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key()))) {
InternalSchema internalSchema;
Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(new Schema.Parser().parse(config.getSchema()));
Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema(), config.allowOperationMetadataField());
if (historySchemaStr.isEmpty()) {
internalSchema = AvroInternalSchemaConverter.convert(avroSchema);
internalSchema.setSchemaId(Long.parseLong(instantTime));
Expand Down Expand Up @@ -1670,16 +1670,13 @@ public void reOrderColPosition(String colName, String referColName, TableChange.
private Pair<InternalSchema, HoodieTableMetaClient> getInternalSchemaAndMetaClient() {
HoodieTableMetaClient metaClient = createMetaClient(true);
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
Option<InternalSchema> internalSchemaOption = schemaUtil.getTableInternalSchemaFromCommitMetadata();
if (!internalSchemaOption.isPresent()) {
throw new HoodieException(String.format("cannot find schema for current table: %s", config.getBasePath()));
}
return Pair.of(internalSchemaOption.get(), metaClient);
return Pair.of(getInternalSchema(schemaUtil), metaClient);
}

private void commitTableChange(InternalSchema newSchema, HoodieTableMetaClient metaClient) {
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElseGet(
() -> SerDeHelper.inheritSchemas(getInternalSchema(schemaUtil), ""));
Schema schema = AvroInternalSchemaConverter.convert(newSchema, config.getTableName());
String commitActionType = CommitUtils.getCommitActionType(WriteOperationType.ALTER_SCHEMA, metaClient.getTableType());
String instantTime = HoodieActiveTimeline.createNewInstantTime();
Expand All @@ -1701,4 +1698,14 @@ private void commitTableChange(InternalSchema newSchema, HoodieTableMetaClient m
schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(newSchema, historySchemaStr));
commitStats(instantTime, Collections.emptyList(), Option.of(extraMeta), commitActionType);
}

private InternalSchema getInternalSchema(TableSchemaResolver schemaUtil) {
return schemaUtil.getTableInternalSchemaFromCommitMetadata().orElseGet(() -> {
try {
return AvroInternalSchemaConverter.convert(schemaUtil.getTableAvroSchema());
} catch (Exception e) {
throw new HoodieException(String.format("cannot find schema for current table: %s", config.getBasePath()));
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.table.action.commit;

import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.internal.schema.Types;
import org.apache.hudi.testutils.HoodieJavaClientTestBase;

import org.apache.avro.Schema;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.Collections;

import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
import static org.junit.jupiter.api.Assertions.assertEquals;

/**
* Tests for schema evolution client api.
*/
public class TestSchemaEvolutionClient extends HoodieJavaClientTestBase {

private static final Schema SCHEMA = getSchemaFromResource(TestSchemaEvolutionClient.class, "/exampleSchema.avsc");

@BeforeEach
public void setUpClient() throws IOException {
HoodieJavaWriteClient<RawTripTestPayload> writeClient = getWriteClient();
this.writeClient = writeClient;
prepareTable(writeClient);
}

@AfterEach
public void closeClient() {
if (writeClient != null) {
writeClient.close();
}
}

@Test
public void testUpdateColumnType() {
writeClient.updateColumnType("number", Types.LongType.get());
assertEquals(Types.LongType.get(), getFieldByName("number").type());
}

private HoodieJavaWriteClient<RawTripTestPayload> getWriteClient() {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
.withEngineType(EngineType.JAVA)
.withPath(basePath)
.withSchema(SCHEMA.toString())
.build();
return new HoodieJavaWriteClient<>(context, config);
}

private void prepareTable(HoodieJavaWriteClient<RawTripTestPayload> writeClient) throws IOException {
String commitTime = "1";
writeClient.startCommitWithTime(commitTime);
//language=JSON
String jsonRow = "{\"_row_key\": \"1\", \"time\": \"2000-01-01T00:00:00.000Z\", \"number\": 1}";
RawTripTestPayload payload = new RawTripTestPayload(jsonRow);
HoodieAvroRecord<RawTripTestPayload> record = new HoodieAvroRecord<>(
new HoodieKey(payload.getRowKey(), payload.getPartitionPath()), payload);
writeClient.insert(Collections.singletonList(record), commitTime);
}

private Types.Field getFieldByName(String fieldName) {
return new TableSchemaResolver(metaClient)
.getTableInternalSchemaFromCommitMetadata()
.get()
.findField(fieldName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ public static Schema createHoodieWriteSchema(String originalSchema) {
return createHoodieWriteSchema(new Schema.Parser().parse(originalSchema));
}

public static Schema createHoodieWriteSchema(String originalSchema, boolean withOperationField) {
return addMetadataFields(new Schema.Parser().parse(originalSchema), withOperationField);
}

/**
* Adds the Hoodie metadata fields to the given schema.
*
Expand Down

0 comments on commit 321451d

Please sign in to comment.