Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle missing StorageDescriptor in Hive Glue tables #17655

Merged
merged 3 commits into from
May 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
import static java.lang.Float.intBitsToFloat;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Locale.ENGLISH;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hive.common.FileUtils.unescapePathName;
import static org.apache.hadoop.hive.metastore.ColumnType.typeToThriftType;
Expand Down Expand Up @@ -164,6 +165,10 @@ public class MetastoreUtil
// rather than copying the old transient_lastDdlTime to hive partition.
private static final String TRANSIENT_LAST_DDL_TIME = "transient_lastDdlTime";
private static final Set<String> STATS_PROPERTIES = ImmutableSet.of(NUM_FILES, NUM_ROWS, RAW_DATA_SIZE, TOTAL_SIZE, TRANSIENT_LAST_DDL_TIME);
// Table parameter key that isIcebergTable reads to decide whether a table is an Iceberg table.
public static final String ICEBERG_TABLE_TYPE_NAME = "table_type";
// Value of the "table_type" parameter (compared case-insensitively) that identifies an Iceberg table.
public static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg";
// Table parameter key that isDeltaLakeTable reads to find the table's Spark data source provider.
public static final String SPARK_TABLE_PROVIDER_KEY = "spark.sql.sources.provider";
// Provider value (compared case-insensitively) that identifies a Delta Lake table.
public static final String DELTA_LAKE_PROVIDER = "delta";

private MetastoreUtil()
{
Expand Down Expand Up @@ -968,4 +973,25 @@ public static void deleteDirectoryRecursively(HdfsContext context, HdfsEnvironme
log.warn(e, "Failed to delete path: " + path.toString());
}
}

/**
 * Reports whether the given metastore table is a Delta Lake table.
 * The decision is delegated to the parameter-map overload using the table's own parameters.
 *
 * @param table metastore table to inspect
 * @return {@code true} if the table's parameters identify it as a Delta Lake table
 */
public static boolean isDeltaLakeTable(Table table)
{
    Map<String, String> parameters = table.getParameters();
    return isDeltaLakeTable(parameters);
}

/**
 * Reports whether the given table parameters describe a Delta Lake table, i.e. whether the
 * {@code SPARK_TABLE_PROVIDER_KEY} parameter equals {@code DELTA_LAKE_PROVIDER} ignoring case.
 *
 * @param tableParameters table parameters to inspect
 * @return {@code true} if the provider parameter is present, non-null and names Delta Lake
 */
public static boolean isDeltaLakeTable(Map<String, String> tableParameters)
{
    // A single lookup covers both "key absent" and "key mapped to null"; the latter
    // previously passed the containsKey check and then failed with a NullPointerException.
    String provider = tableParameters.get(SPARK_TABLE_PROVIDER_KEY);
    return provider != null && provider.toLowerCase(ENGLISH).equals(DELTA_LAKE_PROVIDER);
}

/**
 * Reports whether the given metastore table is an Iceberg table.
 * The decision is delegated to the parameter-map overload using the table's own parameters.
 *
 * @param table metastore table to inspect
 * @return {@code true} if the table's parameters identify it as an Iceberg table
 */
public static boolean isIcebergTable(Table table)
{
    Map<String, String> parameters = table.getParameters();
    return isIcebergTable(parameters);
}

/**
 * Reports whether the given table parameters describe an Iceberg table, i.e. whether the
 * {@code ICEBERG_TABLE_TYPE_NAME} parameter equals {@code ICEBERG_TABLE_TYPE_VALUE} ignoring case.
 * A missing (or null) parameter value yields {@code false}.
 *
 * @param tableParameters table parameters to inspect
 * @return {@code true} if the table type parameter names Iceberg
 */
public static boolean isIcebergTable(Map<String, String> tableParameters)
{
    String tableType = tableParameters.get(ICEBERG_TABLE_TYPE_NAME);
    return ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(tableType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import static com.facebook.presto.hive.metastore.MetastoreUtil.convertPredicateToParts;
import static com.facebook.presto.hive.metastore.MetastoreUtil.extractPartitionValues;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getHiveBasicStatistics;
import static com.facebook.presto.hive.metastore.MetastoreUtil.isIcebergTable;
import static com.facebook.presto.hive.metastore.MetastoreUtil.makePartName;
import static com.facebook.presto.hive.metastore.MetastoreUtil.toPartitionValues;
import static com.facebook.presto.hive.metastore.MetastoreUtil.updateStatisticsParameters;
Expand Down Expand Up @@ -119,8 +120,6 @@ public class FileHiveMetastore
private static final String ADMIN_ROLE_NAME = "admin";
private static final String PRESTO_SCHEMA_FILE_NAME = ".prestoSchema";
private static final String PRESTO_PERMISSIONS_DIRECTORY_NAME = ".prestoPermissions";
private static final String ICEBERG_TABLE_TYPE_NAME = "table_type";
private static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg";
// todo there should be a way to manage the admins list
private static final Set<String> ADMIN_USERS = ImmutableSet.of("admin", "hive", "hdfs");

Expand Down Expand Up @@ -262,7 +261,7 @@ else if (table.getTableType().equals(EXTERNAL_TABLE)) {
if (!externalFileSystem.isDirectory(externalLocation)) {
throw new PrestoException(HIVE_METASTORE_ERROR, "External table location does not exist");
}
if (isChildDirectory(catalogDirectory, externalLocation) && !isIcebergTable(table.getParameters())) {
if (isChildDirectory(catalogDirectory, externalLocation) && !isIcebergTable(table)) {
throw new PrestoException(HIVE_METASTORE_ERROR, "External table location can not be inside the system metadata directory");
}
}
Expand Down Expand Up @@ -451,7 +450,7 @@ public synchronized MetastoreOperationResult replaceTable(MetastoreContext metas
checkArgument(!newTable.getTableType().equals(TEMPORARY_TABLE), "temporary tables must never be stored in the metastore");

Table table = getRequiredTable(metastoreContext, databaseName, tableName);
if ((!table.getTableType().equals(VIRTUAL_VIEW) || !newTable.getTableType().equals(VIRTUAL_VIEW)) && !isIcebergTable(table.getParameters())) {
if ((!table.getTableType().equals(VIRTUAL_VIEW) || !newTable.getTableType().equals(VIRTUAL_VIEW)) && !isIcebergTable(table)) {
throw new PrestoException(HIVE_METASTORE_ERROR, "Only views can be updated with replaceTable");
}
if (!table.getDatabaseName().equals(databaseName) || !table.getTableName().equals(tableName)) {
Expand Down Expand Up @@ -484,7 +483,7 @@ public synchronized MetastoreOperationResult renameTable(MetastoreContext metast

Table table = getRequiredTable(metastoreContext, databaseName, tableName);
getRequiredDatabase(metastoreContext, newDatabaseName);
if (isIcebergTable(table.getParameters())) {
if (isIcebergTable(table)) {
throw new PrestoException(NOT_SUPPORTED, "Rename not supported for Iceberg tables");
}

Expand All @@ -503,11 +502,6 @@ public synchronized MetastoreOperationResult renameTable(MetastoreContext metast
return EMPTY_RESULT;
}

// Returns true when the "table type" parameter equals the Iceberg marker value, ignoring case;
// a missing parameter yields false.
private static boolean isIcebergTable(Map<String, String> parameters)
{
    String tableType = parameters.get(ICEBERG_TABLE_TYPE_NAME);
    return ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(tableType);
}

@Override
public synchronized MetastoreOperationResult addColumn(MetastoreContext metastoreContext, String databaseName, String tableName, String columnName, HiveType columnType, String columnComment)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.amazonaws.services.glue.model.SerDeInfo;
import com.amazonaws.services.glue.model.StorageDescriptor;
import com.facebook.presto.hive.HiveBucketProperty;
import com.facebook.presto.hive.HiveStorageFormat;
import com.facebook.presto.hive.HiveType;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.Database;
Expand All @@ -40,6 +41,10 @@

import static com.facebook.presto.hive.BucketFunctionType.HIVE_COMPATIBLE;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_METADATA;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
import static com.facebook.presto.hive.HiveType.HIVE_INT;
import static com.facebook.presto.hive.metastore.MetastoreUtil.isDeltaLakeTable;
import static com.facebook.presto.hive.metastore.MetastoreUtil.isIcebergTable;
import static com.facebook.presto.hive.metastore.PrestoTableType.EXTERNAL_TABLE;
import static com.facebook.presto.hive.metastore.util.Memoizers.memoizeLast;
import static com.google.common.base.Strings.nullToEmpty;
Expand All @@ -66,28 +71,41 @@ public static Database convertDatabase(com.amazonaws.services.glue.model.Databas

// Converts an AWS Glue table description into a Presto metastore Table for database dbName.
// NOTE(review): this listing appears to be a rendered diff with its +/- markers stripped, so lines from
// before and after the change are interleaved (e.g. two declarations of "sd", two setParameters calls,
// two setConvertedStorage calls). It is not compilable as shown; read it as a before/after overlay.
public static Table convertTable(com.amazonaws.services.glue.model.Table glueTable, String dbName)
{
requireNonNull(glueTable.getStorageDescriptor(), "Table StorageDescriptor is null");
StorageDescriptor sd = glueTable.getStorageDescriptor();
// Parameters are converted once and reused for both the builder and the Iceberg/Delta Lake checks below.
Map<String, String> tableParameters = convertParameters(glueTable.getParameters());

Table.Builder tableBuilder = Table.builder()
.setDatabaseName(dbName)
.setTableName(glueTable.getName())
.setOwner(nullToEmpty(glueTable.getOwner()))
// Athena treats missing table type as EXTERNAL_TABLE.
.setTableType(PrestoTableType.optionalValueOf(glueTable.getTableType()).orElse(EXTERNAL_TABLE))
.setDataColumns(convertColumns(sd.getColumns()))
.setParameters(convertParameters(glueTable.getParameters()))
.setParameters(tableParameters)
.setViewOriginalText(Optional.ofNullable(glueTable.getViewOriginalText()))
.setViewExpandedText(Optional.ofNullable(glueTable.getViewExpandedText()));

if (glueTable.getPartitionKeys() != null) {
tableBuilder.setPartitionColumns(convertColumns(glueTable.getPartitionKeys()));
StorageDescriptor sd = glueTable.getStorageDescriptor();
if (isIcebergTable(tableParameters) || (sd == null && isDeltaLakeTable(tableParameters))) {
// Iceberg and Delta Lake tables do not use the StorageDescriptor field, but we need to return a Table so the caller can check that
// the table is an Iceberg/Delta table and decide whether to redirect or fail.
// A single placeholder column and PARQUET storage format are set in place of real metadata.
tableBuilder.setDataColumns(ImmutableList.of(new Column("dummy", HIVE_INT, Optional.empty(), Optional.empty())));
tableBuilder.getStorageBuilder().setStorageFormat(StorageFormat.fromHiveStorageFormat(HiveStorageFormat.PARQUET));
// NOTE(review): sd can be null on this path — the Delta Lake half of the condition requires sd == null and the
// Iceberg half does not check it — so sd.getLocation() can throw NullPointerException. Confirm and guard.
tableBuilder.getStorageBuilder().setLocation(sd.getLocation());
}
else {
tableBuilder.setPartitionColumns(ImmutableList.of());
// Any other table without a StorageDescriptor is rejected with HIVE_UNSUPPORTED_FORMAT.
if (sd == null) {
throw new PrestoException(HIVE_UNSUPPORTED_FORMAT, format("Table StorageDescriptor is null for table %s.%s (%s)", dbName, glueTable.getName(), glueTable));
}
tableBuilder.setDataColumns(convertColumns(sd.getColumns()));
// Missing partition keys are represented as an empty partition column list.
if (glueTable.getPartitionKeys() != null) {
tableBuilder.setPartitionColumns(convertColumns(glueTable.getPartitionKeys()));
}
else {
tableBuilder.setPartitionColumns(ImmutableList.of());
}

new StorageConverter().setConvertedStorage(sd, tableBuilder.getStorageBuilder());
}

new StorageConverter().setConvertedStorage(sd, tableBuilder.getStorageBuilder());
return tableBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,8 @@
import static com.facebook.presto.hive.metastore.MetastoreUtil.getHiveSchema;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getMetastoreHeaders;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getProtectMode;
import static com.facebook.presto.hive.metastore.MetastoreUtil.isDeltaLakeTable;
import static com.facebook.presto.hive.metastore.MetastoreUtil.isIcebergTable;
import static com.facebook.presto.hive.metastore.MetastoreUtil.isUserDefinedTypeEncodingEnabled;
import static com.facebook.presto.hive.metastore.MetastoreUtil.toPartitionValues;
import static com.facebook.presto.hive.metastore.MetastoreUtil.verifyOnline;
Expand Down Expand Up @@ -625,6 +627,10 @@ private ConnectorTableMetadata getTableMetadata(ConnectorSession session, Schema
throw new TableNotFoundException(tableName);
}

if (isIcebergTable(table.get()) || isDeltaLakeTable(table.get())) {
throw new PrestoException(HIVE_UNSUPPORTED_FORMAT, format("Not a Hive table '%s'", tableName));
}

Function<HiveColumnHandle, ColumnMetadata> metadataGetter = columnMetadataGetter(table.get(), typeManager, metastoreContext.getColumnConverter());
ImmutableList.Builder<ColumnMetadata> columns = ImmutableList.builder();
for (HiveColumnHandle columnHandle : hiveColumnHandles(table.get())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,7 @@ protected Transaction newTransaction()
return new HiveTransaction(transactionManager, metadataFactory.get());
}

interface Transaction
protected interface Transaction
extends AutoCloseable
{
ConnectorMetadata getMetadata();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.facebook.presto.hive.metastore.glue.converter.GlueToPrestoConverter.GluePartitionConverter;
import com.facebook.presto.spi.security.PrincipalType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

Expand All @@ -32,6 +33,10 @@
import java.util.List;

import static com.amazonaws.util.CollectionUtils.isNullOrEmpty;
import static com.facebook.presto.hive.metastore.MetastoreUtil.DELTA_LAKE_PROVIDER;
import static com.facebook.presto.hive.metastore.MetastoreUtil.ICEBERG_TABLE_TYPE_NAME;
import static com.facebook.presto.hive.metastore.MetastoreUtil.ICEBERG_TABLE_TYPE_VALUE;
import static com.facebook.presto.hive.metastore.MetastoreUtil.SPARK_TABLE_PROVIDER_KEY;
import static com.facebook.presto.hive.metastore.PrestoTableType.EXTERNAL_TABLE;
import static com.facebook.presto.hive.metastore.glue.TestingMetastoreObjects.getGlueTestColumn;
import static com.facebook.presto.hive.metastore.glue.TestingMetastoreObjects.getGlueTestDatabase;
Expand Down Expand Up @@ -186,6 +191,24 @@ public void testConvertTableWithoutTableType()
assertEquals(prestoTable.getTableType(), EXTERNAL_TABLE);
}

@Test
public void testIcebergTableNonNullStorageDescriptor()
{
    // Tag the shared Glue test table as Iceberg while its StorageDescriptor is still populated
    testTbl.setParameters(ImmutableMap.of(ICEBERG_TABLE_TYPE_NAME, ICEBERG_TABLE_TYPE_VALUE));
    assertNotNull(testTbl.getStorageDescriptor());

    String databaseName = testDb.getName();
    com.facebook.presto.hive.metastore.Table converted = GlueToPrestoConverter.convertTable(testTbl, databaseName);

    // The converted table is expected to expose exactly one data column
    int dataColumnCount = converted.getDataColumns().size();
    assertEquals(dataColumnCount, 1);
}

@Test
public void testDeltaTableNonNullStorageDescriptor()
{
    // Tag the shared Glue test table as Delta Lake while its StorageDescriptor is still populated
    testTbl.setParameters(ImmutableMap.of(SPARK_TABLE_PROVIDER_KEY, DELTA_LAKE_PROVIDER));
    assertNotNull(testTbl.getStorageDescriptor());

    String databaseName = testDb.getName();
    com.facebook.presto.hive.metastore.Table converted = GlueToPrestoConverter.convertTable(testTbl, databaseName);

    // The converted table is expected to expose exactly one data column
    int dataColumnCount = converted.getDataColumns().size();
    assertEquals(dataColumnCount, 1);
}

private static void assertColumnList(List<Column> actual, List<com.amazonaws.services.glue.model.Column> expected)
{
if (expected == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
*/
package com.facebook.presto.hive.metastore.glue;

import com.amazonaws.services.glue.AWSGlueAsync;
import com.amazonaws.services.glue.AWSGlueAsyncClientBuilder;
import com.amazonaws.services.glue.model.CreateTableRequest;
import com.amazonaws.services.glue.model.DeleteTableRequest;
import com.amazonaws.services.glue.model.TableInput;
import com.facebook.presto.hive.AbstractTestHiveClientLocal;
import com.facebook.presto.hive.HdfsConfiguration;
import com.facebook.presto.hive.HdfsConfigurationInitializer;
Expand All @@ -22,22 +27,37 @@
import com.facebook.presto.hive.MetastoreClientConfig;
import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.SchemaTableName;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.io.File;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.ExecutorService;

import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.presto.hive.HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER;
import static com.facebook.presto.hive.HiveQueryRunner.METASTORE_CONTEXT;
import static com.facebook.presto.hive.metastore.MetastoreUtil.DELTA_LAKE_PROVIDER;
import static com.facebook.presto.hive.metastore.MetastoreUtil.ICEBERG_TABLE_TYPE_NAME;
import static com.facebook.presto.hive.metastore.MetastoreUtil.ICEBERG_TABLE_TYPE_VALUE;
import static com.facebook.presto.hive.metastore.MetastoreUtil.SPARK_TABLE_PROVIDER_KEY;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getMetastoreHeaders;
import static com.facebook.presto.hive.metastore.MetastoreUtil.isDeltaLakeTable;
import static com.facebook.presto.hive.metastore.MetastoreUtil.isIcebergTable;
import static java.util.Locale.ENGLISH;
import static java.util.UUID.randomUUID;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.apache.hadoop.hive.metastore.TableType.EXTERNAL_TABLE;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

Expand Down Expand Up @@ -139,4 +159,57 @@ public void testGetPartitions()
dropTable(tablePartitionFormat);
}
}

/**
 * Verifies how the metastore client handles Glue tables created without a StorageDescriptor:
 * a plain table must fail with a "Table StorageDescriptor is null" error, while tables whose
 * parameters mark them as Iceberg or Delta Lake must still be returned and recognized as such.
 * Tables are created and deleted directly through a Glue client.
 */
@Test
public void testTableWithoutStorageDescriptor()
{
    // StorageDescriptor is an optional field for Glue tables. Iceberg and Delta Lake tables may not have it set.
    SchemaTableName table = temporaryTable("test_missing_storage_descriptor");
    DeleteTableRequest deleteTableRequest = new DeleteTableRequest()
            .withDatabaseName(table.getSchemaName())
            .withName(table.getTableName());
    AWSGlueAsync glueClient = AWSGlueAsyncClientBuilder.defaultClient();
    try {
        ConnectorSession session = newSession();
        MetastoreContext metastoreContext = new MetastoreContext(
                session.getIdentity(),
                session.getQueryId(),
                session.getClientInfo(),
                session.getSource(),
                getMetastoreHeaders(session),
                false,
                DEFAULT_COLUMN_CONVERTER_PROVIDER);
        TableInput tableInput = new TableInput()
                .withName(table.getTableName())
                .withTableType(EXTERNAL_TABLE.name());
        glueClient.createTable(new CreateTableRequest()
                .withDatabaseName(database)
                .withTableInput(tableInput));

        // Plain table: a missing StorageDescriptor must be reported as an error
        assertThatThrownBy(() -> getMetastoreClient().getTable(metastoreContext, table.getSchemaName(), table.getTableName()))
                .hasMessageStartingWith("Table StorageDescriptor is null for table");
        glueClient.deleteTable(deleteTableRequest);

        // Iceberg table
        tableInput = tableInput.withParameters(ImmutableMap.of(ICEBERG_TABLE_TYPE_NAME, ICEBERG_TABLE_TYPE_VALUE));
        glueClient.createTable(new CreateTableRequest()
                .withDatabaseName(database)
                .withTableInput(tableInput));
        assertTrue(isIcebergTable(getMetastoreClient().getTable(metastoreContext, table.getSchemaName(), table.getTableName()).orElseThrow(NoSuchElementException::new)));
        glueClient.deleteTable(deleteTableRequest);

        // Delta Lake table
        tableInput = tableInput.withParameters(ImmutableMap.of(SPARK_TABLE_PROVIDER_KEY, DELTA_LAKE_PROVIDER));
        glueClient.createTable(new CreateTableRequest()
                .withDatabaseName(database)
                .withTableInput(tableInput));
        assertTrue(isDeltaLakeTable(getMetastoreClient().getTable(metastoreContext, table.getSchemaName(), table.getTableName()).orElseThrow(NoSuchElementException::new)));
    }
    finally {
        try {
            // Table cannot be dropped through HiveMetastore since a TableHandle cannot be created
            glueClient.deleteTable(deleteTableRequest);
        }
        finally {
            // The client was created by this test, so shut it down here to release any resources it holds
            glueClient.shutdown();
        }
    }
}
}