Skip to content

Commit

Permalink
Copy commitToExistingTable into subclasses
Browse files Browse the repository at this point in the history
In the following commit, the implementation will diverge.
  • Loading branch information
findepi committed Oct 15, 2021
1 parent 79a6f11 commit 803aeba
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,44 +224,7 @@ protected void commitNewTable(TableMetadata metadata)
metastore.createTable(identity, table, privileges);
}

protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
{
String newMetadataLocation = writeNewMetadata(metadata, version + 1);

// TODO: use metastore locking

Table table;
try {
Table currentTable = getTable();

checkState(currentMetadataLocation != null, "No current metadata location for existing table");
String metadataLocation = currentTable.getParameters().get(METADATA_LOCATION);
if (!currentMetadataLocation.equals(metadataLocation)) {
throw new CommitFailedException("Metadata location [%s] is not same as table metadata location [%s] for %s",
currentMetadataLocation, metadataLocation, getSchemaTableName());
}

table = Table.builder(currentTable)
.setDataColumns(toHiveColumns(metadata.schema().columns()))
.withStorage(storage -> storage.setLocation(metadata.location()))
.setParameter(METADATA_LOCATION, newMetadataLocation)
.setParameter(PREVIOUS_METADATA_LOCATION, currentMetadataLocation)
.build();
}
catch (RuntimeException e) {
try {
io().deleteFile(newMetadataLocation);
}
catch (RuntimeException ex) {
e.addSuppressed(ex);
}
throw e;
}

PrincipalPrivileges privileges = buildInitialPrivilegeSet(table.getOwner());
HiveIdentity identity = new HiveIdentity(session);
metastore.replaceTable(identity, database, tableName, table, privileges);
}
protected abstract void commitToExistingTable(TableMetadata base, TableMetadata metadata);

@Override
public FileIO io()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,23 @@
*/
package io.trino.plugin.iceberg.catalog.file;

import io.trino.plugin.hive.authentication.HiveIdentity;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.PrincipalPrivileges;
import io.trino.plugin.hive.metastore.Table;
import io.trino.plugin.iceberg.catalog.AbstractMetastoreTableOperations;
import io.trino.spi.connector.ConnectorSession;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.io.FileIO;

import javax.annotation.concurrent.NotThreadSafe;

import java.util.Optional;

import static com.google.common.base.Preconditions.checkState;
import static io.trino.plugin.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet;

@NotThreadSafe
public class FileMetastoreTableOperations
extends AbstractMetastoreTableOperations
Expand All @@ -37,4 +45,44 @@ public FileMetastoreTableOperations(
{
super(fileIo, metastore, session, database, table, owner, location);
}

@Override
protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
{
String newMetadataLocation = writeNewMetadata(metadata, version + 1);

// TODO: use metastore locking

Table table;
try {
Table currentTable = getTable();

checkState(currentMetadataLocation != null, "No current metadata location for existing table");
String metadataLocation = currentTable.getParameters().get(METADATA_LOCATION);
if (!currentMetadataLocation.equals(metadataLocation)) {
throw new CommitFailedException("Metadata location [%s] is not same as table metadata location [%s] for %s",
currentMetadataLocation, metadataLocation, getSchemaTableName());
}

table = Table.builder(currentTable)
.setDataColumns(toHiveColumns(metadata.schema().columns()))
.withStorage(storage -> storage.setLocation(metadata.location()))
.setParameter(METADATA_LOCATION, newMetadataLocation)
.setParameter(PREVIOUS_METADATA_LOCATION, currentMetadataLocation)
.build();
}
catch (RuntimeException e) {
try {
io().deleteFile(newMetadataLocation);
}
catch (RuntimeException ex) {
e.addSuppressed(ex);
}
throw e;
}

PrincipalPrivileges privileges = buildInitialPrivilegeSet(table.getOwner());
HiveIdentity identity = new HiveIdentity(session);
metastore.replaceTable(identity, database, tableName, table, privileges);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,23 @@
*/
package io.trino.plugin.iceberg.catalog.hms;

import io.trino.plugin.hive.authentication.HiveIdentity;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.PrincipalPrivileges;
import io.trino.plugin.hive.metastore.Table;
import io.trino.plugin.iceberg.catalog.AbstractMetastoreTableOperations;
import io.trino.spi.connector.ConnectorSession;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.io.FileIO;

import javax.annotation.concurrent.NotThreadSafe;

import java.util.Optional;

import static com.google.common.base.Preconditions.checkState;
import static io.trino.plugin.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet;

@NotThreadSafe
public class HiveMetastoreTableOperations
extends AbstractMetastoreTableOperations
Expand All @@ -37,4 +45,44 @@ public HiveMetastoreTableOperations(
{
super(fileIo, metastore, session, database, table, owner, location);
}

@Override
protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
{
String newMetadataLocation = writeNewMetadata(metadata, version + 1);

// TODO: use metastore locking

Table table;
try {
Table currentTable = getTable();

checkState(currentMetadataLocation != null, "No current metadata location for existing table");
String metadataLocation = currentTable.getParameters().get(METADATA_LOCATION);
if (!currentMetadataLocation.equals(metadataLocation)) {
throw new CommitFailedException("Metadata location [%s] is not same as table metadata location [%s] for %s",
currentMetadataLocation, metadataLocation, getSchemaTableName());
}

table = Table.builder(currentTable)
.setDataColumns(toHiveColumns(metadata.schema().columns()))
.withStorage(storage -> storage.setLocation(metadata.location()))
.setParameter(METADATA_LOCATION, newMetadataLocation)
.setParameter(PREVIOUS_METADATA_LOCATION, currentMetadataLocation)
.build();
}
catch (RuntimeException e) {
try {
io().deleteFile(newMetadataLocation);
}
catch (RuntimeException ex) {
e.addSuppressed(ex);
}
throw e;
}

PrincipalPrivileges privileges = buildInitialPrivilegeSet(table.getOwner());
HiveIdentity identity = new HiveIdentity(session);
metastore.replaceTable(identity, database, tableName, table, privileges);
}
}

0 comments on commit 803aeba

Please sign in to comment.