From b79d485c8ba0feb1458c1b2eb97fba6659394cda Mon Sep 17 00:00:00 2001 From: Dain Sundstrom Date: Mon, 14 Mar 2022 11:09:57 -0700 Subject: [PATCH] Fix metadata caching with Hive ACID Add explicit AcidTransactionOwner to pass through the username even when there is a caching layer in place. --- .../plugin/hive/HiveMetastoreClosure.java | 25 ++++++--- .../hive/metastore/AcidTransactionOwner.java | 52 +++++++++++++++++++ .../metastore/ForwardingHiveMetastore.java | 10 ++-- .../plugin/hive/metastore/HiveMetastore.java | 18 +++++-- .../hive/metastore/HiveTransaction.java | 6 ++- .../SemiTransactionalHiveMetastore.java | 17 ++++-- .../metastore/cache/CachingHiveMetastore.java | 20 ++++--- .../thrift/BridgingHiveMetastore.java | 25 ++++++--- .../metastore/thrift/ThriftHiveMetastore.java | 44 ++++++++++++---- .../metastore/thrift/ThriftMetastore.java | 28 ++++++++-- .../hms/HiveMetastoreTableOperations.java | 2 + 11 files changed, 202 insertions(+), 45 deletions(-) create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/AcidTransactionOwner.java diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetastoreClosure.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetastoreClosure.java index 850fc07f33e7..4224d0a8ec74 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetastoreClosure.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetastoreClosure.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import io.trino.plugin.hive.acid.AcidOperation; import io.trino.plugin.hive.acid.AcidTransaction; +import io.trino.plugin.hive.metastore.AcidTransactionOwner; import io.trino.plugin.hive.metastore.Database; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.metastore.HivePrincipal; @@ -305,9 +306,9 @@ public Set listTablePrivileges(String databaseName, String ta return delegate.listTablePrivileges(databaseName, tableName, tableOwner, principal); } - public long openTransaction() + public long openTransaction(AcidTransactionOwner transactionOwner) { - return delegate.openTransaction(); + return delegate.openTransaction(transactionOwner); } public void commitTransaction(long transactionId) @@ -325,9 +326,14 @@ public void sendTransactionHeartbeat(long transactionId) delegate.sendTransactionHeartbeat(transactionId); } - public void acquireSharedReadLock(String queryId, long transactionId, List fullTables, List partitions) + public void acquireSharedReadLock( + AcidTransactionOwner transactionOwner, + String queryId, + long transactionId, + List fullTables, + List partitions) { - delegate.acquireSharedReadLock(queryId, transactionId, fullTables, partitions); + delegate.acquireSharedReadLock(transactionOwner, queryId, transactionId, fullTables, partitions); } public String getValidWriteIds(List tables, long currentTransactionId) @@ -345,9 +351,16 @@ public long allocateWriteId(String dbName, String tableName, long transactionId) return delegate.allocateWriteId(dbName, tableName, transactionId); } - public void acquireTableWriteLock(String queryId, long transactionId, String dbName, String tableName, DataOperationType operation, boolean isPartitioned) + public void acquireTableWriteLock( + AcidTransactionOwner transactionOwner, + String queryId, + long transactionId, + String dbName, + String tableName, + DataOperationType operation, + boolean isPartitioned) { - delegate.acquireTableWriteLock(queryId, transactionId, dbName, tableName, operation, isPartitioned); + delegate.acquireTableWriteLock(transactionOwner, queryId, transactionId, dbName, tableName, operation, isPartitioned); } public void updateTableWriteId(String dbName, String tableName, long transactionId, long writeId, OptionalLong rowCountChange) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/AcidTransactionOwner.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/AcidTransactionOwner.java new file mode 100644 index 000000000000..c930e52e098c --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/AcidTransactionOwner.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.metastore; + +import static java.util.Objects.requireNonNull; + +// this is just a type safe wrapper to make the APIs cleaner +public class AcidTransactionOwner +{ + private final String owner; + + public AcidTransactionOwner(String owner) + { + this.owner = requireNonNull(owner, "owner is null"); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AcidTransactionOwner that = (AcidTransactionOwner) o; + return owner.equals(that.owner); + } + + @Override + public int hashCode() + { + return owner.hashCode(); + } + + @Override + public String toString() + { + return owner; + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/ForwardingHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/ForwardingHiveMetastore.java index b8e1b915b8c1..d783fd74cf36 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/ForwardingHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/ForwardingHiveMetastore.java @@ -346,9 +346,9 @@ public Set listTablePrivileges(String databaseName, } @Override - public long openTransaction() + public long openTransaction(AcidTransactionOwner transactionOwner) { - return delegate.openTransaction(); + return delegate.openTransaction(transactionOwner); } @Override @@ -365,12 +365,13 @@ public void sendTransactionHeartbeat(long transactionId) @Override public void acquireSharedReadLock( + AcidTransactionOwner transactionOwner, String queryId, long transactionId, List fullTables, List partitions) { - delegate.acquireSharedReadLock(queryId, transactionId, fullTables, partitions); + delegate.acquireSharedReadLock(transactionOwner, queryId, transactionId, fullTables, partitions); } @Override @@ -393,6 +394,7 @@ public long allocateWriteId(String dbName, String tableName, long transactionId) @Override public void acquireTableWriteLock( + AcidTransactionOwner transactionOwner, String queryId, long transactionId, String dbName, @@ -400,7 +402,7 @@ public void acquireTableWriteLock( DataOperationType operation, boolean isDynamicPartitionWrite) { - delegate.acquireTableWriteLock(queryId, transactionId, dbName, tableName, operation, isDynamicPartitionWrite); + delegate.acquireTableWriteLock(transactionOwner, queryId, transactionId, dbName, tableName, operation, isDynamicPartitionWrite); } @Override diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/HiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/HiveMetastore.java index c2aefc3201cc..e8ad78aec96e 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/HiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/HiveMetastore.java @@ -142,7 +142,7 @@ default void updatePartitionStatistics(Table table, String partitionName, Functi */ Set listTablePrivileges(String databaseName, String tableName, Optional tableOwner, Optional principal); - default long openTransaction() + default long openTransaction(AcidTransactionOwner transactionOwner) { throw new UnsupportedOperationException(); } @@ -162,7 +162,12 @@ default void sendTransactionHeartbeat(long transactionId) throw new UnsupportedOperationException(); } - default void acquireSharedReadLock(String queryId, long transactionId, List fullTables, List partitions) + default void acquireSharedReadLock( + AcidTransactionOwner transactionOwner, + String queryId, + long transactionId, + List fullTables, + List partitions) { throw new UnsupportedOperationException(); } @@ -182,7 +187,14 @@ default long allocateWriteId(String dbName, String tableName, long transactionId throw new UnsupportedOperationException(); } - default void acquireTableWriteLock(String queryId, long transactionId, String dbName, String tableName, DataOperationType operation, boolean isDynamicPartitionWrite) + default void acquireTableWriteLock( + AcidTransactionOwner transactionOwner, + String queryId, + long transactionId, + String dbName, + String tableName, + DataOperationType operation, + boolean isDynamicPartitionWrite) { throw new UnsupportedOperationException(); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/HiveTransaction.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/HiveTransaction.java index e560c977b63a..66b3d337bc60 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/HiveTransaction.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/HiveTransaction.java @@ -60,7 +60,10 @@ public AcidTransaction getTransaction() return transaction; } - public ValidTxnWriteIdList getValidWriteIds(HiveMetastoreClosure metastore, HiveTableHandle tableHandle) + public ValidTxnWriteIdList getValidWriteIds( + AcidTransactionOwner transactionOwner, + HiveMetastoreClosure metastore, + HiveTableHandle tableHandle) { List lockedTables; List lockedPartitions; @@ -76,6 +79,7 @@ public ValidTxnWriteIdList getValidWriteIds(HiveMetastoreClosure metastore, Hive // Different calls for same table might need to lock different partitions so acquire locks every time metastore.acquireSharedReadLock( + transactionOwner, queryId, transactionId, lockedTables, diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java index 38bae577ab2a..2367645699bd 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java @@ -1300,6 +1300,7 @@ private AcidTransaction beginOperation(ConnectorSession session, Table table, Ac // because we need the writeId in order to write the delta files. HiveTransaction hiveTransaction = makeHiveTransaction(session, transactionId -> { acquireTableWriteLock( + new AcidTransactionOwner(session.getUser()), queryId, transactionId, table.getDatabaseName(), @@ -1322,7 +1323,8 @@ private HiveTransaction makeHiveTransaction(ConnectorSession session, Function heartbeatTask = heartbeatExecutor.scheduleAtFixedRate( @@ -1357,7 +1359,7 @@ public synchronized Optional getValidWriteIds(ConnectorSess .get()); } - return Optional.of(currentHiveTransaction.get().getValidWriteIds(delegate, tableHandle)); + return Optional.of(currentHiveTransaction.get().getValidWriteIds(new AcidTransactionOwner(session.getUser()), delegate, tableHandle)); } public synchronized void cleanupQuery(ConnectorSession session) @@ -3601,9 +3603,16 @@ private long allocateWriteId(String dbName, String tableName, long transactionId return delegate.allocateWriteId(dbName, tableName, transactionId); } - private void acquireTableWriteLock(String queryId, long transactionId, String dbName, String tableName, DataOperationType operation, boolean isPartitioned) + private void acquireTableWriteLock( + AcidTransactionOwner transactionOwner, + String queryId, + long transactionId, + String dbName, + String tableName, + DataOperationType operation, + boolean isPartitioned) { - delegate.acquireTableWriteLock(queryId, transactionId, dbName, tableName, operation, isPartitioned); + delegate.acquireTableWriteLock(transactionOwner, queryId, transactionId, dbName, tableName, operation, isPartitioned); } public void updateTableWriteId(String dbName, String tableName, long transactionId, long writeId, OptionalLong rowCountChange) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java index 5674b659ab17..a5ad8363e2bf 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java @@ -29,6 +29,7 @@ import io.trino.plugin.hive.PartitionStatistics; import io.trino.plugin.hive.acid.AcidOperation; import io.trino.plugin.hive.acid.AcidTransaction; +import io.trino.plugin.hive.metastore.AcidTransactionOwner; import io.trino.plugin.hive.metastore.Database; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.metastore.HivePartitionName; @@ -890,9 +891,9 @@ private Optional loadConfigValue(String name) } @Override - public long openTransaction() + public long openTransaction(AcidTransactionOwner transactionOwner) { - return delegate.openTransaction(); + return delegate.openTransaction(transactionOwner); } @Override @@ -914,9 +915,14 @@ public void sendTransactionHeartbeat(long transactionId) } @Override - public void acquireSharedReadLock(String queryId, long transactionId, List fullTables, List partitions) + public void acquireSharedReadLock( + AcidTransactionOwner transactionOwner, + String queryId, + long transactionId, + List fullTables, + List partitions) { - delegate.acquireSharedReadLock(queryId, transactionId, fullTables, partitions); + delegate.acquireSharedReadLock(transactionOwner, queryId, transactionId, fullTables, partitions); } @Override @@ -937,14 +943,16 @@ public long allocateWriteId(String dbName, String tableName, long transactionId) } @Override - public void acquireTableWriteLock(String queryId, + public void acquireTableWriteLock( + AcidTransactionOwner transactionOwner, + String queryId, long transactionId, String dbName, String tableName, DataOperationType operation, boolean isDynamicPartitionWrite) { - delegate.acquireTableWriteLock(queryId, transactionId, dbName, tableName, operation, isDynamicPartitionWrite); + delegate.acquireTableWriteLock(transactionOwner, queryId, transactionId, dbName, tableName, operation, isDynamicPartitionWrite); } @Override diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/BridgingHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/BridgingHiveMetastore.java index 37f7f1dcf3a6..db8d01f8b372 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/BridgingHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/BridgingHiveMetastore.java @@ -20,6 +20,7 @@ import io.trino.plugin.hive.acid.AcidOperation; import io.trino.plugin.hive.acid.AcidTransaction; import io.trino.plugin.hive.authentication.HiveIdentity; +import io.trino.plugin.hive.metastore.AcidTransactionOwner; import io.trino.plugin.hive.metastore.Database; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.metastore.HivePrincipal; @@ -471,9 +472,9 @@ public Optional getConfigValue(String name) } @Override - public long openTransaction() + public long openTransaction(AcidTransactionOwner transactionOwner) { - return delegate.openTransaction(identity); + return delegate.openTransaction(identity, transactionOwner); } @Override @@ -495,9 +496,14 @@ public void sendTransactionHeartbeat(long transactionId) } @Override - public void acquireSharedReadLock(String queryId, long transactionId, List fullTables, List partitions) + public void acquireSharedReadLock( + AcidTransactionOwner transactionOwner, + String queryId, + long transactionId, + List fullTables, + List partitions) { - delegate.acquireSharedReadLock(identity, queryId, transactionId, fullTables, partitions); + delegate.acquireSharedReadLock(identity, transactionOwner, queryId, transactionId, fullTables, partitions); } @Override @@ -513,9 +519,16 @@ public long allocateWriteId(String dbName, String tableName, long transactionId) } @Override - public void acquireTableWriteLock(String queryId, long transactionId, String dbName, String tableName, DataOperationType operation, boolean isDynamicPartitionWrite) + public void acquireTableWriteLock( + AcidTransactionOwner transactionOwner, + String queryId, + long transactionId, + String dbName, + String tableName, + DataOperationType operation, + boolean isDynamicPartitionWrite) { - delegate.acquireTableWriteLock(identity, queryId, transactionId, dbName, tableName, operation, isDynamicPartitionWrite); + delegate.acquireTableWriteLock(identity, transactionOwner, queryId, transactionId, dbName, tableName, operation, isDynamicPartitionWrite); } @Override diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastore.java index efbeb99dd239..6fca52c2dea9 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastore.java @@ -36,6 +36,7 @@ import io.trino.plugin.hive.acid.AcidOperation; import io.trino.plugin.hive.acid.AcidTransaction; import io.trino.plugin.hive.authentication.HiveIdentity; +import io.trino.plugin.hive.metastore.AcidTransactionOwner; import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.HiveColumnStatistics; import io.trino.plugin.hive.metastore.HivePrincipal; @@ -1574,15 +1575,15 @@ public Set listTablePrivileges(String databaseName, String ta } @Override - public long openTransaction(HiveIdentity identity) + public long openTransaction(HiveIdentity identity, AcidTransactionOwner transactionOwner) { - checkArgument(!identity.getUsername().map(String::isEmpty).orElse(true), "User should be provided to open transaction"); + requireNonNull(transactionOwner, "transactionOwner is null"); try { return retry() .stopOnIllegalExceptions() .run("openTransaction", stats.getOpenTransaction().wrap(() -> { try (ThriftMetastoreClient metastoreClient = createMetastoreClient(identity)) { - return metastoreClient.openTransaction(identity.getUsername().get()); + return metastoreClient.openTransaction(transactionOwner.toString()); } })); } @@ -1658,19 +1659,34 @@ public void sendTransactionHeartbeat(HiveIdentity identity, long transactionId) } @Override - public void acquireSharedReadLock(HiveIdentity identity, String queryId, long transactionId, List fullTables, List partitions) + public void acquireSharedReadLock( + HiveIdentity identity, + AcidTransactionOwner transactionOwner, + String queryId, + long transactionId, + List fullTables, + List partitions) { - acquireSharedLock(identity, queryId, transactionId, fullTables, partitions, DataOperationType.SELECT, false); + acquireSharedLock(identity, transactionOwner, queryId, transactionId, fullTables, partitions, DataOperationType.SELECT, false); } @Override - public void acquireTableWriteLock(HiveIdentity identity, String queryId, long transactionId, String dbName, String tableName, DataOperationType operation, boolean isDynamicPartitionWrite) + public void acquireTableWriteLock( + HiveIdentity identity, + AcidTransactionOwner transactionOwner, + String queryId, + long transactionId, + String dbName, + String tableName, + DataOperationType operation, + boolean isDynamicPartitionWrite) { - acquireSharedLock(identity, queryId, transactionId, ImmutableList.of(new SchemaTableName(dbName, tableName)), Collections.emptyList(), operation, isDynamicPartitionWrite); + acquireSharedLock(identity, transactionOwner, queryId, transactionId, ImmutableList.of(new SchemaTableName(dbName, tableName)), Collections.emptyList(), operation, isDynamicPartitionWrite); } private void acquireSharedLock( HiveIdentity identity, + AcidTransactionOwner transactionOwner, String queryId, long transactionId, List fullTables, @@ -1679,7 +1695,7 @@ private void acquireSharedLock( boolean isDynamicPartitionWrite) { requireNonNull(operation, "operation is null"); - checkArgument(!identity.getUsername().map(String::isEmpty).orElse(true), "User should be provided to acquire locks"); + requireNonNull(transactionOwner, "transactionOwner is null"); requireNonNull(queryId, "queryId is null"); if (fullTables.isEmpty() && partitions.isEmpty()) { @@ -1688,7 +1704,7 @@ private void acquireSharedLock( LockRequestBuilder request = new LockRequestBuilder(queryId) .setTransactionId(transactionId) - .setUser(identity.getUsername().get()); + .setUser(transactionOwner.toString()); for (SchemaTableName table : fullTables) { request.addLockComponent(createLockComponentForOperation(table, operation, isDynamicPartitionWrite, Optional.empty())); @@ -1702,13 +1718,19 @@ private void acquireSharedLock( } @Override - public long acquireTableExclusiveLock(HiveIdentity identity, String queryId, String dbName, String tableName) + public long acquireTableExclusiveLock( + HiveIdentity identity, + AcidTransactionOwner transactionOwner, + String queryId, + String dbName, + String tableName) { + requireNonNull(transactionOwner, "transactionOwner is null"); LockComponent lockComponent = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, dbName); lockComponent.setTablename(tableName); LockRequest lockRequest = new LockRequestBuilder(queryId) .addLockComponent(lockComponent) - .setUser(identity.getUsername().get()) + .setUser(transactionOwner.toString()) .build(); return acquireLock(identity, format("query %s", queryId), lockRequest); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastore.java index f3177a53d6ff..b497b260d188 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastore.java @@ -18,6 +18,7 @@ import io.trino.plugin.hive.acid.AcidOperation; import io.trino.plugin.hive.acid.AcidTransaction; import io.trino.plugin.hive.authentication.HiveIdentity; +import io.trino.plugin.hive.metastore.AcidTransactionOwner; import io.trino.plugin.hive.metastore.HivePrincipal; import io.trino.plugin.hive.metastore.HivePrivilegeInfo; import io.trino.plugin.hive.metastore.HivePrivilegeInfo.HivePrivilege; @@ -134,7 +135,7 @@ default Optional> getFields(HiveIdentity identity, String data return Optional.of(table.get().getSd().getCols()); } - default long openTransaction(HiveIdentity identity) + default long openTransaction(HiveIdentity identity, AcidTransactionOwner transactionOwner) { throw new UnsupportedOperationException(); } @@ -154,7 +155,13 @@ default void sendTransactionHeartbeat(HiveIdentity identity, long transactionId) throw new UnsupportedOperationException(); } - default void acquireSharedReadLock(HiveIdentity identity, String queryId, long transactionId, List fullTables, List partitions) + default void acquireSharedReadLock( + HiveIdentity identity, + AcidTransactionOwner transactionOwner, + String queryId, + long transactionId, + List fullTables, + List partitions) { throw new UnsupportedOperationException(); } @@ -174,12 +181,25 @@ default long allocateWriteId(HiveIdentity identity, String dbName, String tableN throw new UnsupportedOperationException(); } - default void acquireTableWriteLock(HiveIdentity identity, String queryId, long transactionId, String dbName, String tableName, DataOperationType operation, boolean isDynamicPartitionWrite) + default void acquireTableWriteLock( + HiveIdentity identity, + AcidTransactionOwner transactionOwner, + String queryId, + long transactionId, + String dbName, + String tableName, + DataOperationType operation, + boolean isDynamicPartitionWrite) { throw new UnsupportedOperationException(); } - default long acquireTableExclusiveLock(HiveIdentity identity, String queryId, String dbName, String tableName) + default long acquireTableExclusiveLock( + HiveIdentity identity, + AcidTransactionOwner transactionOwner, + String queryId, + String dbName, + String tableName) { throw new UnsupportedOperationException(); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperations.java index 97f53c3eb82c..2eea11aea064 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperations.java @@ -14,6 +14,7 @@ package io.trino.plugin.iceberg.catalog.hms; import io.trino.plugin.hive.authentication.HiveIdentity; +import io.trino.plugin.hive.metastore.AcidTransactionOwner; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.metastore.MetastoreUtil; import io.trino.plugin.hive.metastore.PrincipalPrivileges; @@ -67,6 +68,7 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata) long lockId = thriftMetastore.acquireTableExclusiveLock( identity, + new AcidTransactionOwner(session.getUser()), session.getQueryId(), database, tableName);