Commit

Merge branch 'master' into se
trushev authored Jul 7, 2022
2 parents 88ce744 + 7eeaff9 commit 197780a
Showing 230 changed files with 15,078 additions and 3,101 deletions.
18 changes: 9 additions & 9 deletions docker/demo/sparksql-incremental.commands
@@ -21,7 +21,7 @@ import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.spark.sql.SaveMode;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.HoodieDataSourceHelpers;
-import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.hive.HiveSyncConfigHolder;
 import org.apache.hudi.sync.common.HoodieSyncConfig;
 import org.apache.hudi.hive.MultiPartKeysValueExtractor;
 import org.apache.hadoop.fs.FileSystem;
@@ -47,10 +47,10 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
 option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor").
 option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor").
 option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
-option(HiveSyncConfig.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
-option(HiveSyncConfig.HIVE_USER.key(), "hive").
-option(HiveSyncConfig.HIVE_PASS.key(), "hive").
-option(HiveSyncConfig.HIVE_SYNC_ENABLED.key(), "true").
+option(HiveSyncConfigHolder.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
+option(HiveSyncConfigHolder.HIVE_USER.key(), "hive").
+option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive").
+option(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key(), "true").
 option(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr").
 option(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
 option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key(), "true").
@@ -79,10 +79,10 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
 option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor_bs").
 option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor_bs").
 option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
-option(HiveSyncConfig.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
-option(HiveSyncConfig.HIVE_USER.key(), "hive").
-option(HiveSyncConfig.HIVE_PASS.key(), "hive").
-option(HiveSyncConfig.HIVE_SYNC_ENABLED.key(), "true").
+option(HiveSyncConfigHolder.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
+option(HiveSyncConfigHolder.HIVE_USER.key(), "hive").
+option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive").
+option(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key(), "true").
 option(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr").
 option(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
 option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key(), "true").
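For reference, a minimal Java sketch of the same Hive-sync option block after the rename. It is illustrative only and not part of this commit: the Dataset, table name, JDBC URL, and target path are placeholders, while the config keys are the ones used in the script above.

    // Illustrative sketch: Hive sync options after the HiveSyncConfig -> HiveSyncConfigHolder rename.
    // df, table name, JDBC URL and save path are placeholder values.
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.hive.HiveSyncConfigHolder;
    import org.apache.hudi.sync.common.HoodieSyncConfig;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;

    public class HiveSyncWriteSketch {
      public static void write(Dataset<Row> df) {
        df.write().format("hudi")
            .option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor")
            .option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor")
            .option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default")
            .option(HiveSyncConfigHolder.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000")
            .option(HiveSyncConfigHolder.HIVE_USER.key(), "hive")
            .option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive")
            .option(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key(), "true")
            .option(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr")
            .mode(SaveMode.Append)
            .save("/user/hive/warehouse/stock_ticks_derived_mor");
      }
    }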
AWSGlueCatalogSyncClient.java (per-file change counts not captured)
@@ -21,8 +21,8 @@
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.hive.AbstractHiveSyncHoodieClient;
 import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.sync.common.HoodieSyncClient;
 import org.apache.hudi.sync.common.model.Partition;

 import com.amazonaws.services.glue.AWSGlue;
@@ -50,10 +50,6 @@
 import com.amazonaws.services.glue.model.Table;
 import com.amazonaws.services.glue.model.TableInput;
 import com.amazonaws.services.glue.model.UpdateTableRequest;
-import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.schema.MessageType;
@@ -69,8 +65,12 @@

 import static org.apache.hudi.aws.utils.S3Utils.s3aToS3;
 import static org.apache.hudi.common.util.MapUtils.nonEmpty;
+import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
+import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
 import static org.apache.hudi.hive.util.HiveSchemaUtil.getPartitionKeyType;
 import static org.apache.hudi.hive.util.HiveSchemaUtil.parquetSchemaToMapSchema;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
 import static org.apache.hudi.sync.common.util.TableUtils.tableId;

 /**
@@ -79,18 +79,18 @@
  *
  * @Experimental
  */
-public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
+public class AWSGlueCatalogSyncClient extends HoodieSyncClient {

   private static final Logger LOG = LogManager.getLogger(AWSGlueCatalogSyncClient.class);
   private static final int MAX_PARTITIONS_PER_REQUEST = 100;
   private static final long BATCH_REQUEST_SLEEP_MILLIS = 1000L;
   private final AWSGlue awsGlue;
   private final String databaseName;

-  public AWSGlueCatalogSyncClient(HiveSyncConfig syncConfig, Configuration hadoopConf, FileSystem fs) {
-    super(syncConfig, hadoopConf, fs);
+  public AWSGlueCatalogSyncClient(HiveSyncConfig config) {
+    super(config);
     this.awsGlue = AWSGlueClientBuilder.standard().build();
-    this.databaseName = syncConfig.databaseName;
+    this.databaseName = config.getStringOrDefault(META_SYNC_DATABASE_NAME);
   }

   @Override
@@ -126,7 +126,7 @@ public void addPartitionsToTable(String tableName, List<String> partitionsToAdd)
       StorageDescriptor sd = table.getStorageDescriptor();
       List<PartitionInput> partitionInputs = partitionsToAdd.stream().map(partition -> {
         StorageDescriptor partitionSd = sd.clone();
-        String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
+        String fullPartitionPath = FSUtils.getPartitionPath(getBasePath(), partition).toString();
         List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
         partitionSd.setLocation(fullPartitionPath);
         return new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd);
@@ -160,7 +160,7 @@ public void updatePartitionsToTable(String tableName, List<String> changedPartit
       StorageDescriptor sd = table.getStorageDescriptor();
       List<BatchUpdatePartitionRequestEntry> updatePartitionEntries = changedPartitions.stream().map(partition -> {
         StorageDescriptor partitionSd = sd.clone();
-        String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
+        String fullPartitionPath = FSUtils.getPartitionPath(getBasePath(), partition).toString();
         List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
         sd.setLocation(fullPartitionPath);
         PartitionInput partitionInput = new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd);
@@ -204,12 +204,12 @@ public void updateTableProperties(String tableName, Map<String, String> tablePro
   }

   @Override
-  public void updateTableDefinition(String tableName, MessageType newSchema) {
+  public void updateTableSchema(String tableName, MessageType newSchema) {
     // ToDo Cascade is set in Hive meta sync, but need to investigate how to configure it for Glue meta
-    boolean cascade = syncConfig.partitionFields.size() > 0;
+    boolean cascade = config.getSplitStrings(META_SYNC_PARTITION_FIELDS).size() > 0;
     try {
       Table table = getTable(awsGlue, databaseName, tableName);
-      Map<String, String> newSchemaMap = parquetSchemaToMapSchema(newSchema, syncConfig.supportTimestamp, false);
+      Map<String, String> newSchemaMap = parquetSchemaToMapSchema(newSchema, config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE), false);
       List<Column> newColumns = newSchemaMap.keySet().stream().map(key -> {
         String keyType = getPartitionKeyType(newSchemaMap, key);
         return new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
@@ -237,21 +237,6 @@ public void updateTableDefinition(String tableName, MessageType newSchema) {
     }
   }

-  @Override
-  public List<FieldSchema> getTableCommentUsingMetastoreClient(String tableName) {
-    throw new UnsupportedOperationException("Not supported: `getTableCommentUsingMetastoreClient`");
-  }
-
-  @Override
-  public void updateTableComments(String tableName, List<FieldSchema> oldSchema, List<Schema.Field> newSchema) {
-    throw new UnsupportedOperationException("Not supported: `updateTableComments`");
-  }
-
-  @Override
-  public void updateTableComments(String tableName, List<FieldSchema> oldSchema, Map<String, String> newComments) {
-    throw new UnsupportedOperationException("Not supported: `updateTableComments`");
-  }
-
   @Override
   public void createTable(String tableName,
                           MessageType storageSchema,
@@ -265,26 +250,26 @@ public void createTable(String tableName,
     }
     CreateTableRequest request = new CreateTableRequest();
     Map<String, String> params = new HashMap<>();
-    if (!syncConfig.createManagedTable) {
+    if (!config.getBoolean(HIVE_CREATE_MANAGED_TABLE)) {
       params.put("EXTERNAL", "TRUE");
     }
     params.putAll(tableProperties);

     try {
-      Map<String, String> mapSchema = parquetSchemaToMapSchema(storageSchema, syncConfig.supportTimestamp, false);
+      Map<String, String> mapSchema = parquetSchemaToMapSchema(storageSchema, config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE), false);

       List<Column> schemaWithoutPartitionKeys = new ArrayList<>();
       for (String key : mapSchema.keySet()) {
         String keyType = getPartitionKeyType(mapSchema, key);
         Column column = new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
         // In Glue, the full schema should exclude the partition keys
-        if (!syncConfig.partitionFields.contains(key)) {
+        if (!config.getSplitStrings(META_SYNC_PARTITION_FIELDS).contains(key)) {
           schemaWithoutPartitionKeys.add(column);
         }
       }

       // now create the schema partition
-      List<Column> schemaPartitionKeys = syncConfig.partitionFields.stream().map(partitionKey -> {
+      List<Column> schemaPartitionKeys = config.getSplitStrings(META_SYNC_PARTITION_FIELDS).stream().map(partitionKey -> {
         String keyType = getPartitionKeyType(mapSchema, partitionKey);
         return new Column().withName(partitionKey).withType(keyType.toLowerCase()).withComment("");
       }).collect(Collectors.toList());
Expand All @@ -293,7 +278,7 @@ public void createTable(String tableName,
serdeProperties.put("serialization.format", "1");
storageDescriptor
.withSerdeInfo(new SerDeInfo().withSerializationLibrary(serdeClass).withParameters(serdeProperties))
.withLocation(s3aToS3(syncConfig.basePath))
.withLocation(s3aToS3(getBasePath()))
.withInputFormat(inputFormatClass)
.withOutputFormat(outputFormatClass)
.withColumns(schemaWithoutPartitionKeys);
@@ -320,7 +305,7 @@ public void createTable(String tableName,
   }

   @Override
-  public Map<String, String> getTableSchema(String tableName) {
+  public Map<String, String> getMetastoreSchema(String tableName) {
     try {
       // GlueMetastoreClient returns partition keys separate from Columns, hence get both and merge to
       // get the Schema of the table.
@@ -340,11 +325,6 @@ public Map<String, String> getTableSchema(String tableName) {
     }
   }

-  @Override
-  public boolean doesTableExist(String tableName) {
-    return tableExists(tableName);
-  }
-
   @Override
   public boolean tableExists(String tableName) {
     GetTableRequest request = new GetTableRequest()
@@ -412,11 +392,11 @@ public void close() {

   @Override
   public void updateLastCommitTimeSynced(String tableName) {
-    if (!activeTimeline.lastInstant().isPresent()) {
+    if (!getActiveTimeline().lastInstant().isPresent()) {
       LOG.warn("No commit in active timeline.");
       return;
     }
-    final String lastCommitTimestamp = activeTimeline.lastInstant().get().getTimestamp();
+    final String lastCommitTimestamp = getActiveTimeline().lastInstant().get().getTimestamp();
     try {
       updateTableParameters(awsGlue, databaseName, tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitTimestamp), false);
     } catch (Exception e) {
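As a quick illustrative summary of the accessor migration in this file (not part of the commit): direct field reads on HiveSyncConfig are replaced by typed lookups keyed by ConfigProperty constants. The helper class below is hypothetical; the getter names and keys are the ones introduced in the hunks above.

    // Hypothetical helper showing the new accessor style used by AWSGlueCatalogSyncClient.
    import java.util.List;

    import org.apache.hudi.hive.HiveSyncConfig;

    import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
    import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
    import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;

    final class SyncConfigSummary {
      static String describe(HiveSyncConfig config) {
        // Formerly: syncConfig.databaseName, syncConfig.partitionFields, syncConfig.supportTimestamp
        String database = config.getStringOrDefault(META_SYNC_DATABASE_NAME);
        List<String> partitionFields = config.getSplitStrings(META_SYNC_PARTITION_FIELDS);
        boolean supportTimestamp = config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE);
        return database + ", partitioned by " + partitionFields + ", timestamp support: " + supportTimestamp;
      }
    }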
AwsGlueCatalogSyncTool.java, renamed to AWSGlueCatalogSyncTool.java (per-file change counts not captured)
@@ -18,53 +18,44 @@

 package org.apache.hudi.aws.sync;

-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HiveSyncTool;

 import com.beust.jcommander.JCommander;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.conf.HiveConf;

+import java.util.Properties;
+
 /**
  * Currently Experimental. Utility class that implements syncing a Hudi Table with the
  * AWS Glue Data Catalog (https://docs.aws.amazon.com/glue/latest/dg/populate-data-catalog.html)
  * to enable querying via Glue ETLs, Athena etc.
- *
+ * <p>
  * Extends HiveSyncTool since most logic is similar to Hive syncing,
  * expect using a different client {@link AWSGlueCatalogSyncClient} that implements
  * the necessary functionality using Glue APIs.
  *
  * @Experimental
  */
-public class AwsGlueCatalogSyncTool extends HiveSyncTool {
-
-  public AwsGlueCatalogSyncTool(TypedProperties props, Configuration conf, FileSystem fs) {
-    super(props, new HiveConf(conf, HiveConf.class), fs);
-  }
+public class AWSGlueCatalogSyncTool extends HiveSyncTool {

-  public AwsGlueCatalogSyncTool(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf, FileSystem fs) {
-    super(hiveSyncConfig, hiveConf, fs);
+  public AWSGlueCatalogSyncTool(Properties props, Configuration hadoopConf) {
+    super(props, hadoopConf);
   }

   @Override
-  protected void initClient(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf) {
-    hoodieHiveClient = new AWSGlueCatalogSyncClient(hiveSyncConfig, hiveConf, fs);
+  protected void initSyncClient(HiveSyncConfig hiveSyncConfig) {
+    syncClient = new AWSGlueCatalogSyncClient(hiveSyncConfig);
   }

   public static void main(String[] args) {
-    // parse the params
-    final HiveSyncConfig cfg = new HiveSyncConfig();
-    JCommander cmd = new JCommander(cfg, null, args);
-    if (cfg.help || args.length == 0) {
+    final HiveSyncConfig.HiveSyncConfigParams params = new HiveSyncConfig.HiveSyncConfigParams();
+    JCommander cmd = JCommander.newBuilder().addObject(params).build();
+    cmd.parse(args);
+    if (params.isHelp()) {
       cmd.usage();
-      System.exit(1);
+      System.exit(0);
     }
-    FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
-    HiveConf hiveConf = new HiveConf();
-    hiveConf.addResource(fs.getConf());
-    new AwsGlueCatalogSyncTool(cfg, hiveConf, fs).syncHoodieTable();
+    new AWSGlueCatalogSyncTool(params.toProps(), new Configuration()).syncHoodieTable();
   }
 }
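For orientation, a hedged sketch of launching the reworked tool programmatically rather than through main(): it passes plain Properties plus a Hadoop Configuration, mirroring the constructor above. The property values are placeholders, and META_SYNC_BASE_PATH is assumed to be the HoodieSyncConfig constant for the table base path.

    // Hedged sketch: programmatic launch of AWSGlueCatalogSyncTool (constructor shown above).
    // All values are placeholders; META_SYNC_BASE_PATH is an assumed HoodieSyncConfig constant.
    import java.util.Properties;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.aws.sync.AWSGlueCatalogSyncTool;
    import org.apache.hudi.sync.common.HoodieSyncConfig;

    public class GlueSyncLauncher {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default");
        props.setProperty(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_mor");
        props.setProperty(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr");
        props.setProperty(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), "s3a://my-bucket/stock_ticks_mor"); // assumed key constant
        new AWSGlueCatalogSyncTool(props, new Configuration()).syncHoodieTable();
      }
    }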
(file name not captured)
@@ -56,7 +56,6 @@
 import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
 import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
 import static org.apache.hudi.metadata.HoodieMetadataPayload.unwrapStatisticValueWrapper;
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
 import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;

 /**
@@ -143,7 +142,7 @@ private List<Pair<String, BloomIndexFileInfo>> getBloomIndexFileInfoForPartition
     if (config.getBloomIndexPruneByRanges()) {
       // load column ranges from metadata index if column stats index is enabled and column_stats metadata partition is available
       if (config.getBloomIndexUseMetadata()
-          && getCompletedMetadataPartitions(hoodieTable.getMetaClient().getTableConfig()).contains(COLUMN_STATS.getPartitionPath())) {
+          && hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath())) {
         fileInfoList = loadColumnRangesFromMetaIndex(affectedPartitionPathList, context, hoodieTable);
       }
       // fallback to loading column ranges from files
(file name not captured)
@@ -37,7 +37,6 @@
 import java.util.ArrayList;
 import java.util.List;

-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
 import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;

 /**
@@ -64,7 +63,7 @@ private BloomFilter getBloomFilter() {
     HoodieTimer timer = new HoodieTimer().startTimer();
     try {
       if (config.getBloomIndexUseMetadata()
-          && getCompletedMetadataPartitions(hoodieTable.getMetaClient().getTableConfig())
+          && hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions()
           .contains(BLOOM_FILTERS.getPartitionPath())) {
         bloomFilter = hoodieTable.getMetadataTable().getBloomFilter(partitionPathFileIDPair.getLeft(), partitionPathFileIDPair.getRight())
             .orElseThrow(() -> new HoodieIndexException("BloomFilter missing for " + partitionPathFileIDPair.getRight()));
(file name not captured)
@@ -96,7 +96,6 @@
 import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
 import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
 import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;

@@ -579,7 +578,7 @@ private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, Opti
   }

   private void updateInitializedPartitionsInTableConfig(List<MetadataPartitionType> partitionTypes) {
-    Set<String> completedPartitions = getCompletedMetadataPartitions(dataMetaClient.getTableConfig());
+    Set<String> completedPartitions = dataMetaClient.getTableConfig().getMetadataPartitions();
     completedPartitions.addAll(partitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet()));
     dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", completedPartitions));
     HoodieTableConfig.update(dataMetaClient.getFs(), new Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
@@ -716,7 +715,7 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
   }

   public void dropMetadataPartitions(List<MetadataPartitionType> metadataPartitions) throws IOException {
-    Set<String> completedIndexes = getCompletedMetadataPartitions(dataMetaClient.getTableConfig());
+    Set<String> completedIndexes = dataMetaClient.getTableConfig().getMetadataPartitions();
     Set<String> inflightIndexes = getInflightMetadataPartitions(dataMetaClient.getTableConfig());

     for (MetadataPartitionType partitionType : metadataPartitions) {
@@ -806,7 +805,7 @@ private <T> void processAndCommit(String instantTime, ConvertMetadataFunction co

   private Set<String> getMetadataPartitionsToUpdate() {
     // fetch partitions to update from table config
-    Set<String> partitionsToUpdate = getCompletedMetadataPartitions(dataMetaClient.getTableConfig());
+    Set<String> partitionsToUpdate = dataMetaClient.getTableConfig().getMetadataPartitions();
     // add inflight indexes as well because the file groups have already been initialized, so writers can log updates
     // NOTE: Async HoodieIndexer can move some partition to inflight. While that partition is still being built,
     // the regular ingestion writers should not be blocked. They can go ahead and log updates to the metadata partition.
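To summarize the pattern applied across these hunks (illustrative, not from the commit): the removed HoodieTableMetadataUtil.getCompletedMetadataPartitions(tableConfig) helper is replaced by reading the completed set directly from the table config.

    // Illustrative sketch of the replacement call; the wrapper class itself is hypothetical.
    import java.util.Set;

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.metadata.MetadataPartitionType;

    final class MetadataPartitionCheck {
      static boolean columnStatsAvailable(HoodieTableMetaClient metaClient) {
        // New style: completed metadata partitions come straight from the table config.
        Set<String> completed = metaClient.getTableConfig().getMetadataPartitions();
        return completed.contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath());
      }
    }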
(Diffs for the remaining changed files in this commit were not loaded.)
