Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Glue catalog cleanup #18543

Merged
merged 6 commits into from
Aug 7, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ private List<String> listNamespaces(ConnectorSession session, Optional<String> n
public void dropNamespace(ConnectorSession session, String namespace)
{
try {
glueTableCache.invalidateAll();
stats.getDeleteDatabase().call(() ->
glueClient.deleteDatabase(new DeleteDatabaseRequest().withName(namespace)));
}
Expand Down Expand Up @@ -522,7 +523,7 @@ public void unregisterTable(ConnectorSession session, SchemaTableName schemaTabl

private com.amazonaws.services.glue.model.Table dropTableFromMetastore(ConnectorSession session, SchemaTableName schemaTableName)
{
com.amazonaws.services.glue.model.Table table = getTable(session, schemaTableName)
com.amazonaws.services.glue.model.Table table = getTableAndCacheMetadata(session, schemaTableName)
.orElseThrow(() -> new TableNotFoundException(schemaTableName));
if (!isIcebergTable(getTableParameters(table))) {
throw new UnknownTableTypeException(schemaTableName);
Expand All @@ -542,7 +543,7 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa
{
boolean newTableCreated = false;
try {
com.amazonaws.services.glue.model.Table table = getTable(session, from)
com.amazonaws.services.glue.model.Table table = getTableAndCacheMetadata(session, from)
.orElseThrow(() -> new TableNotFoundException(from));
Map<String, String> tableParameters = new HashMap<>(getTableParameters(table));
FileIO io = loadTable(session, from).io();
Expand All @@ -559,10 +560,7 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa
metadataLocation,
tableParameters,
cacheTableMetadata);
CreateTableRequest createTableRequest = new CreateTableRequest()
.withDatabaseName(to.getSchemaName())
.withTableInput(tableInput);
stats.getCreateTable().call(() -> glueClient.createTable(createTableRequest));
createTable(to.getSchemaName(), tableInput);
newTableCreated = true;
deleteTable(from.getSchemaName(), from.getTableName());
}
Expand All @@ -581,86 +579,94 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa
}
}

private Optional<com.amazonaws.services.glue.model.Table> getTable(ConnectorSession session, SchemaTableName schemaTableName)
private Optional<com.amazonaws.services.glue.model.Table> getTableAndCacheMetadata(ConnectorSession session, SchemaTableName schemaTableName)
{
com.amazonaws.services.glue.model.Table table;
try {
com.amazonaws.services.glue.model.Table table = stats.getGetTable().call(() ->
glueClient.getTable(new GetTableRequest()
.withDatabaseName(schemaTableName.getSchemaName())
.withName(schemaTableName.getTableName()))
.getTable());

Map<String, String> parameters = getTableParameters(table);
if (isIcebergTable(parameters) && !tableMetadataCache.containsKey(schemaTableName)) {
if (viewCache.containsKey(schemaTableName) || materializedViewCache.containsKey(schemaTableName)) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Glue table cache inconsistency. Table cannot also be a view/materialized view");
}
table = getTable(schemaTableName, false);
}
catch (TableNotFoundException e) {
return Optional.empty();
}

String metadataLocation = parameters.get(METADATA_LOCATION_PROP);
try {
// Cache the TableMetadata while we have the Table retrieved anyway
TableOperations operations = tableOperationsProvider.createTableOperations(
this,
session,
schemaTableName.getSchemaName(),
schemaTableName.getTableName(),
Optional.empty(),
Optional.empty());
FileIO io = operations.io();
tableMetadataCache.put(schemaTableName, TableMetadataParser.read(io, io.newInputFile(metadataLocation)));
}
catch (RuntimeException e) {
LOG.warn(e, "Failed to cache table metadata from table at %s", metadataLocation);
}
Map<String, String> parameters = getTableParameters(table);
if (isIcebergTable(parameters) && !tableMetadataCache.containsKey(schemaTableName)) {
if (viewCache.containsKey(schemaTableName) || materializedViewCache.containsKey(schemaTableName)) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Glue table cache inconsistency. Table cannot also be a view/materialized view");
}
else if (isTrinoMaterializedView(getTableType(table), parameters)) {
if (viewCache.containsKey(schemaTableName) || tableMetadataCache.containsKey(schemaTableName)) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Glue table cache inconsistency. Materialized View cannot also be a table or view");
}

try {
createMaterializedViewDefinition(session, schemaTableName, table)
.ifPresent(materializedView -> materializedViewCache.put(schemaTableName, materializedView));
}
catch (RuntimeException e) {
LOG.warn(e, "Failed to cache materialized view from %s", schemaTableName);
}
String metadataLocation = parameters.get(METADATA_LOCATION_PROP);
try {
// Cache the TableMetadata while we have the Table retrieved anyway
TableOperations operations = tableOperationsProvider.createTableOperations(
this,
session,
schemaTableName.getSchemaName(),
schemaTableName.getTableName(),
Optional.empty(),
Optional.empty());
FileIO io = operations.io();
tableMetadataCache.put(schemaTableName, TableMetadataParser.read(io, io.newInputFile(metadataLocation)));
}
else if (isPrestoView(parameters) && !viewCache.containsKey(schemaTableName)) {
if (materializedViewCache.containsKey(schemaTableName) || tableMetadataCache.containsKey(schemaTableName)) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Glue table cache inconsistency. View cannot also be a materialized view or table");
}

try {
TrinoViewUtil.getView(schemaTableName,
Optional.ofNullable(table.getViewOriginalText()),
getTableType(table),
parameters,
Optional.ofNullable(table.getOwner()))
.ifPresent(viewDefinition -> viewCache.put(schemaTableName, viewDefinition));
}
catch (RuntimeException e) {
LOG.warn(e, "Failed to cache view from %s", schemaTableName);
}
catch (RuntimeException e) {
LOG.warn(e, "Failed to cache table metadata from table at %s", metadataLocation);
}
}
else if (isTrinoMaterializedView(getTableType(table), parameters)) {
if (viewCache.containsKey(schemaTableName) || tableMetadataCache.containsKey(schemaTableName)) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Glue table cache inconsistency. Materialized View cannot also be a table or view");
}

return Optional.of(table);
try {
createMaterializedViewDefinition(session, schemaTableName, table)
.ifPresent(materializedView -> materializedViewCache.put(schemaTableName, materializedView));
}
catch (RuntimeException e) {
LOG.warn(e, "Failed to cache materialized view from %s", schemaTableName);
}
}
catch (EntityNotFoundException e) {
return Optional.empty();
else if (isPrestoView(parameters) && !viewCache.containsKey(schemaTableName)) {
if (materializedViewCache.containsKey(schemaTableName) || tableMetadataCache.containsKey(schemaTableName)) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Glue table cache inconsistency. View cannot also be a materialized view or table");
}

try {
TrinoViewUtil.getView(schemaTableName,
Optional.ofNullable(table.getViewOriginalText()),
getTableType(table),
parameters,
Optional.ofNullable(table.getOwner()))
.ifPresent(viewDefinition -> viewCache.put(schemaTableName, viewDefinition));
}
catch (RuntimeException e) {
LOG.warn(e, "Failed to cache view from %s", schemaTableName);
}
}

return Optional.of(table);
}

private void createTable(String schemaName, TableInput tableInput)
{
glueTableCache.invalidateAll();
stats.getCreateTable().call(() ->
glueClient.createTable(new CreateTableRequest()
.withDatabaseName(schemaName)
.withTableInput(tableInput)));
}

private void updateTable(String schemaName, TableInput tableInput)
{
glueTableCache.invalidateAll();
stats.getUpdateTable().call(() ->
glueClient.updateTable(new UpdateTableRequest()
.withDatabaseName(schemaName)
.withTableInput(tableInput)));
}

private void deleteTable(String schema, String table)
{
glueTableCache.invalidateAll();
stats.getDeleteTable().call(() ->
glueClient.deleteTable(new DeleteTableRequest()
.withDatabaseName(schema)
Expand Down Expand Up @@ -720,25 +726,19 @@ public void createView(ConnectorSession session, SchemaTableName schemaViewName,

private void doCreateView(ConnectorSession session, SchemaTableName schemaViewName, TableInput viewTableInput, boolean replace)
{
Optional<com.amazonaws.services.glue.model.Table> existing = getTable(session, schemaViewName);
Optional<com.amazonaws.services.glue.model.Table> existing = getTableAndCacheMetadata(session, schemaViewName);
if (existing.isPresent()) {
if (!replace || !isPrestoView(getTableParameters(existing.get()))) {
// TODO: ViewAlreadyExists is misleading if the name is used by a table https://github.com/trinodb/trino/issues/10037
throw new ViewAlreadyExistsException(schemaViewName);
}

stats.getUpdateTable().call(() ->
glueClient.updateTable(new UpdateTableRequest()
.withDatabaseName(schemaViewName.getSchemaName())
.withTableInput(viewTableInput)));
updateTable(schemaViewName.getSchemaName(), viewTableInput);
return;
}

try {
stats.getCreateTable().call(() ->
glueClient.createTable(new CreateTableRequest()
.withDatabaseName(schemaViewName.getSchemaName())
.withTableInput(viewTableInput)));
createTable(schemaViewName.getSchemaName(), viewTableInput);
}
catch (AlreadyExistsException e) {
throw new ViewAlreadyExistsException(schemaViewName);
Expand All @@ -750,18 +750,15 @@ public void renameView(ConnectorSession session, SchemaTableName source, SchemaT
{
boolean newTableCreated = false;
try {
com.amazonaws.services.glue.model.Table existingView = getTable(session, source)
com.amazonaws.services.glue.model.Table existingView = getTableAndCacheMetadata(session, source)
.orElseThrow(() -> new TableNotFoundException(source));
viewCache.remove(source);
TableInput viewTableInput = getViewTableInput(
target.getTableName(),
existingView.getViewOriginalText(),
existingView.getOwner(),
createViewProperties(session, trinoVersion, TRINO_CREATED_BY_VALUE));
CreateTableRequest createTableRequest = new CreateTableRequest()
.withDatabaseName(target.getSchemaName())
.withTableInput(viewTableInput);
stats.getCreateTable().call(() -> glueClient.createTable(createTableRequest));
createTable(target.getSchemaName(), viewTableInput);
newTableCreated = true;
deleteTable(source.getSchemaName(), source.getTableName());
}
Expand Down Expand Up @@ -846,7 +843,7 @@ public Optional<ConnectorViewDefinition> getView(ConnectorSession session, Schem
return Optional.empty();
}

Optional<com.amazonaws.services.glue.model.Table> table = getTable(session, viewName);
Optional<com.amazonaws.services.glue.model.Table> table = getTableAndCacheMetadata(session, viewName);
if (table.isEmpty()) {
return Optional.empty();
}
Expand Down Expand Up @@ -904,10 +901,7 @@ private void updateView(ConnectorSession session, SchemaTableName viewName, Conn
createViewProperties(session, trinoVersion, TRINO_CREATED_BY_VALUE));

try {
stats.getUpdateTable().call(() ->
glueClient.updateTable(new UpdateTableRequest()
.withDatabaseName(viewName.getSchemaName())
.withTableInput(viewTableInput)));
updateTable(viewName.getSchemaName(), viewTableInput);
}
catch (AmazonServiceException e) {
throw new TrinoException(ICEBERG_CATALOG_ERROR, e);
Expand Down Expand Up @@ -953,7 +947,7 @@ public void createMaterializedView(
boolean replace,
boolean ignoreExisting)
{
Optional<com.amazonaws.services.glue.model.Table> existing = getTable(session, viewName);
Optional<com.amazonaws.services.glue.model.Table> existing = getTableAndCacheMetadata(session, viewName);

if (existing.isPresent()) {
if (!isTrinoMaterializedView(getTableType(existing.get()), getTableParameters(existing.get()))) {
Expand All @@ -978,10 +972,7 @@ public void createMaterializedView(

if (existing.isPresent()) {
try {
stats.getUpdateTable().call(() ->
glueClient.updateTable(new UpdateTableRequest()
.withDatabaseName(viewName.getSchemaName())
.withTableInput(materializedViewTableInput)));
updateTable(viewName.getSchemaName(), materializedViewTableInput);
}
catch (RuntimeException e) {
try {
Expand Down Expand Up @@ -1032,10 +1023,7 @@ private void updateMaterializedView(ConnectorSession session, SchemaTableName vi
createMaterializedViewProperties(session, newDefinition.getStorageTable().orElseThrow().getSchemaTableName()));

try {
stats.getUpdateTable().call(() ->
glueClient.updateTable(new UpdateTableRequest()
.withDatabaseName(viewName.getSchemaName())
.withTableInput(materializedViewTableInput)));
updateTable(viewName.getSchemaName(), materializedViewTableInput);
}
catch (AmazonServiceException e) {
throw new TrinoException(ICEBERG_CATALOG_ERROR, e);
Expand All @@ -1045,7 +1033,7 @@ private void updateMaterializedView(ConnectorSession session, SchemaTableName vi
@Override
public void dropMaterializedView(ConnectorSession session, SchemaTableName viewName)
{
com.amazonaws.services.glue.model.Table view = getTable(session, viewName)
com.amazonaws.services.glue.model.Table view = getTableAndCacheMetadata(session, viewName)
.orElseThrow(() -> new MaterializedViewNotFoundException(viewName));

if (!isTrinoMaterializedView(getTableType(view), getTableParameters(view))) {
Expand Down Expand Up @@ -1085,7 +1073,7 @@ protected Optional<ConnectorMaterializedViewDefinition> doGetMaterializedView(Co
return Optional.empty();
}

Optional<com.amazonaws.services.glue.model.Table> maybeTable = getTable(session, viewName);
Optional<com.amazonaws.services.glue.model.Table> maybeTable = getTableAndCacheMetadata(session, viewName);
if (maybeTable.isEmpty()) {
return Optional.empty();
}
Expand Down Expand Up @@ -1139,18 +1127,15 @@ public void renameMaterializedView(ConnectorSession session, SchemaTableName sou
{
boolean newTableCreated = false;
try {
com.amazonaws.services.glue.model.Table glueTable = getTable(session, source)
com.amazonaws.services.glue.model.Table glueTable = getTableAndCacheMetadata(session, source)
.orElseThrow(() -> new TableNotFoundException(source));
materializedViewCache.remove(source);
Map<String, String> tableParameters = getTableParameters(glueTable);
if (!isTrinoMaterializedView(getTableType(glueTable), tableParameters)) {
throw new TrinoException(UNSUPPORTED_TABLE_TYPE, "Not a Materialized View: " + source);
}
TableInput tableInput = getMaterializedViewTableInput(target.getTableName(), glueTable.getViewOriginalText(), glueTable.getOwner(), tableParameters);
CreateTableRequest createTableRequest = new CreateTableRequest()
.withDatabaseName(target.getSchemaName())
.withTableInput(tableInput);
stats.getCreateTable().call(() -> glueClient.createTable(createTableRequest));
createTable(target.getSchemaName(), tableInput);
newTableCreated = true;
deleteTable(source.getSchemaName(), source.getTableName());
}
Expand Down Expand Up @@ -1186,7 +1171,7 @@ public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session,
tableName.getSchemaName(),
tableName.getTableName().substring(0, metadataMarkerIndex));

Optional<com.amazonaws.services.glue.model.Table> table = getTable(session, new SchemaTableName(tableNameBase.getSchemaName(), tableNameBase.getTableName()));
Optional<com.amazonaws.services.glue.model.Table> table = getTableAndCacheMetadata(session, new SchemaTableName(tableNameBase.getSchemaName(), tableNameBase.getTableName()));

if (table.isEmpty() || VIRTUAL_VIEW.name().equals(getTableTypeNullable(table.get()))) {
return Optional.empty();
Expand Down