Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Iceberg Glue and Hadoop catalog support #9646

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public GlueHiveMetastore(
this.columnStatisticsProvider = columnStatisticsProviderFactory.createGlueColumnStatisticsProvider(glueClient, stats);
}

private static AWSGlueAsync createAsyncGlueClient(GlueHiveMetastoreConfig config, Optional<RequestHandler2> requestHandler, RequestMetricCollector metricsCollector)
public static AWSGlueAsync createAsyncGlueClient(GlueHiveMetastoreConfig config, Optional<RequestHandler2> requestHandler, RequestMetricCollector metricsCollector)
{
ClientConfiguration clientConfig = new ClientConfiguration()
.withMaxConnections(config.getMaxGlueConnections())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,11 @@
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.LockComponent;
import org.apache.hadoop.hive.metastore.api.LockLevel;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
Expand Down Expand Up @@ -1666,7 +1668,23 @@ private void acquireSharedLock(DataOperationType operation, boolean isDynamicPar
request.addLockComponent(createLockComponentForOperation(partition.getTableName(), operation, isDynamicPartitionWrite, Optional.of(partition.getPartitionId())));
}

LockRequest lockRequest = request.build();
acquireLock(identity, format("hive transaction %s for query %s", transactionId, queryId), request.build());
}

/**
 * Acquires an EXCLUSIVE table-level lock in the Hive metastore for the given table.
 * Delegates the wait/retry loop to {@code acquireLock}.
 *
 * @return the metastore lock id, to be passed to {@code releaseTableLock}
 * @throws IllegalStateException if the identity carries no user name (the metastore
 *         requires a lock owner)
 */
@Override
public long acquireTableExclusiveLock(HiveIdentity identity, String queryId, String dbName, String tableName)
{
    // EXCLUSIVE + TABLE level: blocks all concurrent readers and writers of the table
    LockComponent lockComponent = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, dbName);
    lockComponent.setTablename(tableName);
    LockRequest lockRequest = new LockRequestBuilder(queryId)
            .addLockComponent(lockComponent)
            // fail fast with a clear message instead of an unchecked Optional.get()
            // throwing a bare NoSuchElementException
            .setUser(identity.getUsername().orElseThrow(() ->
                    new IllegalStateException("No user name available for lock request in query " + queryId)))
            .build();
    return acquireLock(identity, format("query %s", queryId), lockRequest);
}

private long acquireLock(HiveIdentity identity, String context, LockRequest lockRequest)
{
try {
LockResponse response = retry()
.stopOn(NoSuchTxnException.class, TxnAbortedException.class, MetaException.class)
Expand All @@ -1681,10 +1699,10 @@ private void acquireSharedLock(DataOperationType operation, boolean isDynamicPar
while (response.getState() == LockState.WAITING) {
if (Duration.nanosSince(waitStart).compareTo(maxWaitForLock) > 0) {
// timed out
throw unlockSuppressing(identity, lockId, new TrinoException(HIVE_TABLE_LOCK_NOT_ACQUIRED, format("Timed out waiting for lock %d in hive transaction %s for query %s", lockId, transactionId, queryId)));
throw unlockSuppressing(identity, lockId, new TrinoException(HIVE_TABLE_LOCK_NOT_ACQUIRED, format("Timed out waiting for lock %d for %s", lockId, context)));
}

log.debug("Waiting for lock %d in hive transaction %s for query %s", lockId, transactionId, queryId);
log.debug("Waiting for lock %d for %s", lockId, context);

response = retry()
.stopOn(NoSuchTxnException.class, NoSuchLockException.class, TxnAbortedException.class, MetaException.class)
Expand All @@ -1698,6 +1716,8 @@ private void acquireSharedLock(DataOperationType operation, boolean isDynamicPar
if (response.getState() != LockState.ACQUIRED) {
throw unlockSuppressing(identity, lockId, new TrinoException(HIVE_TABLE_LOCK_NOT_ACQUIRED, "Could not acquire lock. Lock in state " + response.getState()));
}

return response.getLockid();
}
catch (TException e) {
throw new TrinoException(HIVE_METASTORE_ERROR, e);
Expand All @@ -1710,15 +1730,16 @@ private void acquireSharedLock(DataOperationType operation, boolean isDynamicPar
/**
 * Best-effort release of {@code lockId} while propagating a primary failure.
 * Any {@code RuntimeException} thrown by the release is attached to
 * {@code exception} as suppressed rather than thrown, so the original failure
 * is never masked by an unlock failure.
 *
 * @return the same {@code exception} instance, for {@code throw unlockSuppressing(...)} call sites
 */
private <T extends Exception> T unlockSuppressing(HiveIdentity identity, long lockId, T exception)
{
    try {
        // note: exactly one release call — the stale pre-rename call to
        // unlockTableLock was diff residue and would double-release the lock
        releaseTableLock(identity, lockId);
    }
    catch (RuntimeException e) {
        // record, do not rethrow: the caller's original exception takes precedence
        exception.addSuppressed(e);
    }
    return exception;
}

private void unlockTableLock(HiveIdentity identity, long lockId)
@Override
public void releaseTableLock(HiveIdentity identity, long lockId)
{
try {
retry()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,16 @@ default void acquireTableWriteLock(HiveIdentity identity, String queryId, long t
throw new UnsupportedOperationException();
}

/**
 * Acquires an EXCLUSIVE table-level metastore lock on {@code dbName.tableName}
 * and returns the lock id for a later {@code releaseTableLock} call.
 * Default: unsupported — implemented only by metastores with lock support
 * (e.g. the Thrift metastore).
 */
default long acquireTableExclusiveLock(HiveIdentity identity, String queryId, String dbName, String tableName)
{
    throw new UnsupportedOperationException();
}

/**
 * Releases a metastore lock previously obtained via {@code acquireTableExclusiveLock}.
 * Default: unsupported — implemented only by metastores with lock support.
 */
default void releaseTableLock(HiveIdentity identity, long lockId)
{
    throw new UnsupportedOperationException();
}

default void updateTableWriteId(HiveIdentity identity, String dbName, String tableName, long transactionId, long writeId, OptionalLong rowCountChange)
{
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public static org.apache.hadoop.hive.metastore.api.Table toMetastoreApiTable(Tab
return result;
}

static org.apache.hadoop.hive.metastore.api.Table toMetastoreApiTable(Table table)
public static org.apache.hadoop.hive.metastore.api.Table toMetastoreApiTable(Table table)
{
org.apache.hadoop.hive.metastore.api.Table result = new org.apache.hadoop.hive.metastore.api.Table();
result.setDbName(table.getDatabaseName());
Expand Down
10 changes: 10 additions & 0 deletions plugin/trino-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@
<artifactId>units</artifactId>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-glue</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
Expand Down
Loading