Skip to content

Commit

Permalink
fix for updateTableParameters which is not excluding partition column…
Browse files Browse the repository at this point in the history
…s and updateTableProperties boolean check
  • Loading branch information
kumudkumartirupati committed Jul 8, 2022
1 parent a998586 commit 243d486
Showing 1 changed file with 17 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@
import java.util.stream.Collectors;

import static org.apache.hudi.aws.utils.S3Utils.s3aToS3;
import static org.apache.hudi.common.util.MapUtils.nonEmpty;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
import static org.apache.hudi.common.util.MapUtils.isNullOrEmpty;
import static org.apache.hudi.hive.util.HiveSchemaUtil.getPartitionKeyType;
import static org.apache.hudi.hive.util.HiveSchemaUtil.parquetSchemaToMapSchema;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
Expand Down Expand Up @@ -193,7 +193,7 @@ public void dropPartitions(String tableName, List<String> partitionsToDrop) {
*/
@Override
public void updateTableProperties(String tableName, Map<String, String> tableProperties) {
if (nonEmpty(tableProperties)) {
if (isNullOrEmpty(tableProperties)) {
return;
}
try {
Expand All @@ -210,10 +210,7 @@ public void updateTableSchema(String tableName, MessageType newSchema) {
try {
Table table = getTable(awsGlue, databaseName, tableName);
Map<String, String> newSchemaMap = parquetSchemaToMapSchema(newSchema, config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE), false);
List<Column> newColumns = newSchemaMap.keySet().stream().map(key -> {
String keyType = getPartitionKeyType(newSchemaMap, key);
return new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
}).collect(Collectors.toList());
List<Column> newColumns = getColumnsFromSchema(newSchemaMap);
StorageDescriptor sd = table.getStorageDescriptor();
sd.setColumns(newColumns);

Expand Down Expand Up @@ -258,15 +255,7 @@ public void createTable(String tableName,
try {
Map<String, String> mapSchema = parquetSchemaToMapSchema(storageSchema, config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE), false);

List<Column> schemaWithoutPartitionKeys = new ArrayList<>();
for (String key : mapSchema.keySet()) {
String keyType = getPartitionKeyType(mapSchema, key);
Column column = new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
// In Glue, the full schema should exclude the partition keys
if (!config.getSplitStrings(META_SYNC_PARTITION_FIELDS).contains(key)) {
schemaWithoutPartitionKeys.add(column);
}
}
List<Column> schemaWithoutPartitionKeys = getColumnsFromSchema(mapSchema);

// now create the schema partition
List<Column> schemaPartitionKeys = config.getSplitStrings(META_SYNC_PARTITION_FIELDS).stream().map(partitionKey -> {
Expand Down Expand Up @@ -419,6 +408,19 @@ public void deleteLastReplicatedTimeStamp(String tableName) {
throw new UnsupportedOperationException("Not supported: `deleteLastReplicatedTimeStamp`");
}

private List<Column> getColumnsFromSchema(Map<String, String> mapSchema) {
List<Column> cols = new ArrayList<>();
for (String key : mapSchema.keySet()) {
// In Glue, the full schema should exclude the partition keys
if (!syncConfig.partitionFields.contains(key)) {
String keyType = getPartitionKeyType(mapSchema, key);
Column column = new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
cols.add(column);
}
}
return cols;
}

private enum TableType {
MANAGED_TABLE,
EXTERNAL_TABLE,
Expand Down

0 comments on commit 243d486

Please sign in to comment.