Skip to content

Commit

Permalink
Add Iceberg Glue and Hadoop catalog support
Browse files Browse the repository at this point in the history
  • Loading branch information
jackye1995 committed Oct 15, 2021
1 parent 13a11c6 commit 83e90c0
Show file tree
Hide file tree
Showing 44 changed files with 1,873 additions and 461 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public GlueHiveMetastore(
this.columnStatisticsProvider = columnStatisticsProviderFactory.createGlueColumnStatisticsProvider(glueClient, stats);
}

private static AWSGlueAsync createAsyncGlueClient(GlueHiveMetastoreConfig config, Optional<RequestHandler2> requestHandler, RequestMetricCollector metricsCollector)
public static AWSGlueAsync createAsyncGlueClient(GlueHiveMetastoreConfig config, Optional<RequestHandler2> requestHandler, RequestMetricCollector metricsCollector)
{
ClientConfiguration clientConfig = new ClientConfiguration()
.withMaxConnections(config.getMaxGlueConnections())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,11 @@
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.LockComponent;
import org.apache.hadoop.hive.metastore.api.LockLevel;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
Expand Down Expand Up @@ -1666,7 +1668,23 @@ private void acquireSharedLock(DataOperationType operation, boolean isDynamicPar
request.addLockComponent(createLockComponentForOperation(partition.getTableName(), operation, isDynamicPartitionWrite, Optional.of(partition.getPartitionId())));
}

LockRequest lockRequest = request.build();
acquireLock(identity, format("hive transaction %s for query %s", transactionId, queryId), request.build());
}

@Override
public long acquireTableExclusiveLock(HiveIdentity identity, String queryId, String dbName, String tableName)
{
LockComponent lockComponent = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, dbName);
lockComponent.setTablename(tableName);
LockRequest lockRequest = new LockRequestBuilder(queryId)
.addLockComponent(lockComponent)
.setUser(identity.getUsername().get())
.build();
return acquireLock(identity, format("query %s", queryId), lockRequest);
}

private long acquireLock(HiveIdentity identity, String context, LockRequest lockRequest)
{
try {
LockResponse response = retry()
.stopOn(NoSuchTxnException.class, TxnAbortedException.class, MetaException.class)
Expand All @@ -1681,10 +1699,10 @@ private void acquireSharedLock(DataOperationType operation, boolean isDynamicPar
while (response.getState() == LockState.WAITING) {
if (Duration.nanosSince(waitStart).compareTo(maxWaitForLock) > 0) {
// timed out
throw unlockSuppressing(identity, lockId, new TrinoException(HIVE_TABLE_LOCK_NOT_ACQUIRED, format("Timed out waiting for lock %d in hive transaction %s for query %s", lockId, transactionId, queryId)));
throw unlockSuppressing(identity, lockId, new TrinoException(HIVE_TABLE_LOCK_NOT_ACQUIRED, format("Timed out waiting for lock %d for %s", lockId, context)));
}

log.debug("Waiting for lock %d in hive transaction %s for query %s", lockId, transactionId, queryId);
log.debug("Waiting for lock %d for %s", lockId, context);

response = retry()
.stopOn(NoSuchTxnException.class, NoSuchLockException.class, TxnAbortedException.class, MetaException.class)
Expand All @@ -1698,6 +1716,8 @@ private void acquireSharedLock(DataOperationType operation, boolean isDynamicPar
if (response.getState() != LockState.ACQUIRED) {
throw unlockSuppressing(identity, lockId, new TrinoException(HIVE_TABLE_LOCK_NOT_ACQUIRED, "Could not acquire lock. Lock in state " + response.getState()));
}

return response.getLockid();
}
catch (TException e) {
throw new TrinoException(HIVE_METASTORE_ERROR, e);
Expand All @@ -1710,15 +1730,16 @@ private void acquireSharedLock(DataOperationType operation, boolean isDynamicPar
private <T extends Exception> T unlockSuppressing(HiveIdentity identity, long lockId, T exception)
{
try {
unlockTableLock(identity, lockId);
releaseTableLock(identity, lockId);
}
catch (RuntimeException e) {
exception.addSuppressed(e);
}
return exception;
}

private void unlockTableLock(HiveIdentity identity, long lockId)
@Override
public void releaseTableLock(HiveIdentity identity, long lockId)
{
try {
retry()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,16 @@ default void acquireTableWriteLock(HiveIdentity identity, String queryId, long t
throw new UnsupportedOperationException();
}

default long acquireTableExclusiveLock(HiveIdentity identity, String queryId, String dbName, String tableName)
{
throw new UnsupportedOperationException();
}

default void releaseTableLock(HiveIdentity identity, long lockId)
{
throw new UnsupportedOperationException();
}

default void updateTableWriteId(HiveIdentity identity, String dbName, String tableName, long transactionId, long writeId, OptionalLong rowCountChange)
{
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public static org.apache.hadoop.hive.metastore.api.Table toMetastoreApiTable(Tab
return result;
}

static org.apache.hadoop.hive.metastore.api.Table toMetastoreApiTable(Table table)
public static org.apache.hadoop.hive.metastore.api.Table toMetastoreApiTable(Table table)
{
org.apache.hadoop.hive.metastore.api.Table result = new org.apache.hadoop.hive.metastore.api.Table();
result.setDbName(table.getDatabaseName());
Expand Down
10 changes: 10 additions & 0 deletions plugin/trino-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@
<artifactId>units</artifactId>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-glue</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
Expand Down
Loading

0 comments on commit 83e90c0

Please sign in to comment.