[HUDI-4098] Support HMS for flink HoodieCatalog

[minor] Following 4152, refactor the classes for the plan selection strategy (apache#6060)

[HUDI-4367] Support copyToTable on call (apache#6054)

[HUDI-4335] Bug fixes in AWSGlueCatalogSyncClient post schema evolution. (apache#5995)

* Fix updateTableParameters so that it excludes partition columns, and fix the updateTableProperties boolean check

* Fix serde parameters getting overridden on table property update

* Remove stale syncConfig

[HUDI-4276] Reconcile schema: inject null values for missing fields and add new fields (apache#6017)

* [HUDI-4276] Reconcile schema: inject null values for missing fields and add new fields.

* fix comments

Co-authored-by: public (bdcee5037027) <mengtao0326@qq.com>
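
As a minimal sketch, this behaviour can be enabled on the write path; the property name below is assumed to be what HoodieCommonConfig.RECONCILE_SCHEMA (referenced in the diff further down) resolves to, so verify it against your Hudi version:

# assumed key for HoodieCommonConfig.RECONCILE_SCHEMA
hoodie.datasource.write.reconcile.schema=true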

[HUDI-3500] Add call procedure for RepairsCommand (apache#6053)

[HUDI-2150] Rename/Restructure configs for better modularity (apache#6061)

- Move clean-related configuration to HoodieCleanConfig
- Move archival-related configuration to HoodieArchivalConfig
- Move hoodie.compaction.payload.class to HoodiePayloadConfig (see the sketch below)
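
A minimal, self-contained sketch of a write config assembled with the relocated builders; the values are illustrative and the builder calls mirror the test diffs further down in this commit:

import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class ConfigExample {
  public static void main(String[] args) {
    // Illustrative values; clean and archival settings now live in their own configs.
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi/test-trip-table")   // assumed base path for the example
        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
        .forTable("test-trip-table")
        .build();
    System.out.println(cfg.getTableName());
  }
}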

[MINOR] Bump xalan from 2.7.1 to 2.7.2 (apache#6062)

Bumps xalan from 2.7.1 to 2.7.2.

---
updated-dependencies:
- dependency-name: xalan:xalan
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

[HUDI-4324] Remove use_jdbc config from hudi sync (apache#6072)

* [HUDI-4324] Remove use_jdbc config from hudi sync
* Users should use HIVE_SYNC_MODE instead
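
In properties form the replacement looks roughly like the sketch below; jdbc is the mode used elsewhere in this commit, and other modes (e.g. hms) are assumed alternatives:

# removed by this change:
# hoodie.datasource.hive_sync.use_jdbc=true
# set an explicit sync mode instead, for example:
hoodie.datasource.hive_sync.mode=jdbc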

[HUDI-3730][RFC-55] Improve hudi-sync classes design and simplify configs (apache#5695)

* [HUDI-4146] RFC for improving Hive/Meta sync class design and hierarchies

Co-authored-by: jian.feng <jian.feng@shopee.com>
Co-authored-by: Raymond Xu <2701446+xushiyan@users.noreply.github.com>

[HUDI-4323] Make database/table names optional in sync tool (apache#6073)

* [HUDI-4323] Make database/table names optional in sync tool
* Infer these properties from the table config (see the sketch below)
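
A hedged sketch of the effect, reusing the property names from the docker demo config in this commit (the exact inference source is an assumption):

# before: database/table had to be set explicitly
# hoodie.datasource.hive_sync.database=default
# hoodie.datasource.hive_sync.table=docker_hoodie_sync_valid_test_2
# after: both may be omitted and are inferred from the table config
# (assumed: hoodie.table.name and related keys in .hoodie/hoodie.properties)
hoodie.datasource.hive_sync.mode=jdbc
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000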

[MINOR] Update RFCs status (apache#6078)
cuibo01 committed Jul 11, 2022
1 parent c744848 commit f72a622
Showing 123 changed files with 5,663 additions and 1,283 deletions.
2 changes: 1 addition & 1 deletion conf/hudi-defaults.conf.template
@@ -20,7 +20,7 @@

# Example:
# hoodie.datasource.hive_sync.jdbcurl jdbc:hive2://localhost:10000
# hoodie.datasource.hive_sync.use_jdbc true
# hoodie.datasource.hive_sync.mode jdbc
# hoodie.datasource.hive_sync.support_timestamp false
# hoodie.index.type BLOOM
# hoodie.metadata.enable false
3 changes: 2 additions & 1 deletion docker/demo/config/hoodie-incr.properties
@@ -28,5 +28,6 @@ hoodie.deltastreamer.source.hoodieincr.path=/docker_hoodie_sync_valid_test
hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=true
# hive sync
hoodie.datasource.hive_sync.table=docker_hoodie_sync_valid_test_2
hoodie.datasource.hive_sync.mode=jdbc
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000
hoodie.datasource.hive_sync.partition_fields=partition
2 changes: 2 additions & 0 deletions docker/demo/sparksql-incremental.commands
@@ -47,6 +47,7 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor").
option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor").
option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
option(HiveSyncConfigHolder.HIVE_SYNC_MODE.key(), "jdbc").
option(HiveSyncConfigHolder.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(HiveSyncConfigHolder.HIVE_USER.key(), "hive").
option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive").
@@ -79,6 +80,7 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor_bs").
option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor_bs").
option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
option(HiveSyncConfigHolder.HIVE_SYNC_MODE.key(), "jdbc").
option(HiveSyncConfigHolder.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(HiveSyncConfigHolder.HIVE_USER.key(), "hive").
option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive").
@@ -64,9 +64,9 @@
import java.util.stream.Collectors;

import static org.apache.hudi.aws.utils.S3Utils.s3aToS3;
import static org.apache.hudi.common.util.MapUtils.nonEmpty;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
import static org.apache.hudi.common.util.MapUtils.isNullOrEmpty;
import static org.apache.hudi.hive.util.HiveSchemaUtil.getPartitionKeyType;
import static org.apache.hudi.hive.util.HiveSchemaUtil.parquetSchemaToMapSchema;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
@@ -193,11 +193,11 @@ public void dropPartitions(String tableName, List<String> partitionsToDrop) {
*/
@Override
public void updateTableProperties(String tableName, Map<String, String> tableProperties) {
if (nonEmpty(tableProperties)) {
if (isNullOrEmpty(tableProperties)) {
return;
}
try {
updateTableParameters(awsGlue, databaseName, tableName, tableProperties, true);
updateTableParameters(awsGlue, databaseName, tableName, tableProperties, false);
} catch (Exception e) {
throw new HoodieGlueSyncException("Fail to update properties for table " + tableId(databaseName, tableName), e);
}
@@ -210,10 +210,7 @@ public void updateTableSchema(String tableName, MessageType newSchema) {
try {
Table table = getTable(awsGlue, databaseName, tableName);
Map<String, String> newSchemaMap = parquetSchemaToMapSchema(newSchema, config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE), false);
List<Column> newColumns = newSchemaMap.keySet().stream().map(key -> {
String keyType = getPartitionKeyType(newSchemaMap, key);
return new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
}).collect(Collectors.toList());
List<Column> newColumns = getColumnsFromSchema(newSchemaMap);
StorageDescriptor sd = table.getStorageDescriptor();
sd.setColumns(newColumns);

@@ -258,15 +255,7 @@ public void createTable(String tableName,
try {
Map<String, String> mapSchema = parquetSchemaToMapSchema(storageSchema, config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE), false);

List<Column> schemaWithoutPartitionKeys = new ArrayList<>();
for (String key : mapSchema.keySet()) {
String keyType = getPartitionKeyType(mapSchema, key);
Column column = new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
// In Glue, the full schema should exclude the partition keys
if (!config.getSplitStrings(META_SYNC_PARTITION_FIELDS).contains(key)) {
schemaWithoutPartitionKeys.add(column);
}
}
List<Column> schemaWithoutPartitionKeys = getColumnsFromSchema(mapSchema);

// now create the schema partition
List<Column> schemaPartitionKeys = config.getSplitStrings(META_SYNC_PARTITION_FIELDS).stream().map(partitionKey -> {
@@ -419,6 +408,19 @@ public void deleteLastReplicatedTimeStamp(String tableName) {
throw new UnsupportedOperationException("Not supported: `deleteLastReplicatedTimeStamp`");
}

private List<Column> getColumnsFromSchema(Map<String, String> mapSchema) {
List<Column> cols = new ArrayList<>();
for (String key : mapSchema.keySet()) {
// In Glue, the full schema should exclude the partition keys
if (!config.getSplitStrings(META_SYNC_PARTITION_FIELDS).contains(key)) {
String keyType = getPartitionKeyType(mapSchema, key);
Column column = new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
cols.add(column);
}
}
return cols;
}

private enum TableType {
MANAGED_TABLE,
EXTERNAL_TABLE,
@@ -35,7 +35,7 @@
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieBootstrapConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieSavepointException;
@@ -538,7 +538,7 @@ private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, Stri
private static HoodieWriteConfig getWriteConfig(String basePath, Boolean rollbackUsingMarkers, boolean lazyCleanPolicy) {
return HoodieWriteConfig.newBuilder().withPath(basePath)
.withRollbackUsingMarkers(rollbackUsingMarkers)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(lazyCleanPolicy ? HoodieFailedWritesCleaningPolicy.LAZY :
.withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(lazyCleanPolicy ? HoodieFailedWritesCleaningPolicy.LAZY :
HoodieFailedWritesCleaningPolicy.EAGER).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
}
@@ -30,7 +30,8 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.client.HoodieTimelineArchiver;
@@ -72,7 +73,8 @@ public void init() throws Exception {
// Generate archive
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.forTable("test-trip-table").build();
@@ -37,7 +37,8 @@
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.NumericUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.client.HoodieTimelineArchiver;
@@ -212,7 +213,8 @@ public void testShowArchivedCommits(boolean enableMetadataTable) throws Exceptio
// Generate archive
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath1)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
@@ -266,7 +268,8 @@ public void testShowArchivedCommitsWithMultiCommitsFile(boolean enableMetadataTa
// Generate archive
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath1)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
@@ -39,7 +39,8 @@
import org.apache.hudi.common.testutils.CompactionTestUtils;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieSparkTable;
@@ -166,7 +167,8 @@ private void generateArchive() throws IOException {
// Generate archive
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.forTable("test-trip-table").build();
@@ -39,6 +39,7 @@
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -276,15 +277,21 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
TableSchemaResolver schemaUtil = new TableSchemaResolver(table.getMetaClient());
String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
if (!historySchemaStr.isEmpty()) {
InternalSchema internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
SerDeHelper.parseSchemas(historySchemaStr));
if (!historySchemaStr.isEmpty() || Boolean.parseBoolean(config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key()))) {
InternalSchema internalSchema;
Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(new Schema.Parser().parse(config.getSchema()));
InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.evolveSchemaFromNewAvroSchema(avroSchema, internalSchema);
if (historySchemaStr.isEmpty()) {
internalSchema = AvroInternalSchemaConverter.convert(avroSchema);
internalSchema.setSchemaId(Long.parseLong(instantTime));
} else {
internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
SerDeHelper.parseSchemas(historySchemaStr));
}
InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.reconcileSchema(avroSchema, internalSchema);
if (evolvedSchema.equals(internalSchema)) {
metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(evolvedSchema));
//TODO save history schema by metaTable
schemasManager.persistHistorySchemaStr(instantTime, historySchemaStr);
schemasManager.persistHistorySchemaStr(instantTime, historySchemaStr.isEmpty() ? SerDeHelper.inheritSchemas(evolvedSchema, "") : historySchemaStr);
} else {
evolvedSchema.setSchemaId(Long.parseLong(instantTime));
String newSchemaStr = SerDeHelper.toJson(evolvedSchema);