Skip to content

Commit

Permalink
Fix metadata caching with Hive ACID
Browse files Browse the repository at this point in the history
Add explicit AcidTransactionOwner to pass through the username even when
there is a caching layer in place.
  • Loading branch information
dain committed Mar 15, 2022
1 parent 876f3af commit b79d485
Show file tree
Hide file tree
Showing 11 changed files with 202 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.collect.ImmutableList;
import io.trino.plugin.hive.acid.AcidOperation;
import io.trino.plugin.hive.acid.AcidTransaction;
import io.trino.plugin.hive.metastore.AcidTransactionOwner;
import io.trino.plugin.hive.metastore.Database;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.HivePrincipal;
Expand Down Expand Up @@ -305,9 +306,9 @@ public Set<HivePrivilegeInfo> listTablePrivileges(String databaseName, String ta
return delegate.listTablePrivileges(databaseName, tableName, tableOwner, principal);
}

public long openTransaction()
public long openTransaction(AcidTransactionOwner transactionOwner)
{
return delegate.openTransaction();
return delegate.openTransaction(transactionOwner);
}

public void commitTransaction(long transactionId)
Expand All @@ -325,9 +326,14 @@ public void sendTransactionHeartbeat(long transactionId)
delegate.sendTransactionHeartbeat(transactionId);
}

public void acquireSharedReadLock(String queryId, long transactionId, List<SchemaTableName> fullTables, List<HivePartition> partitions)
public void acquireSharedReadLock(
AcidTransactionOwner transactionOwner,
String queryId,
long transactionId,
List<SchemaTableName> fullTables,
List<HivePartition> partitions)
{
delegate.acquireSharedReadLock(queryId, transactionId, fullTables, partitions);
delegate.acquireSharedReadLock(transactionOwner, queryId, transactionId, fullTables, partitions);
}

public String getValidWriteIds(List<SchemaTableName> tables, long currentTransactionId)
Expand All @@ -345,9 +351,16 @@ public long allocateWriteId(String dbName, String tableName, long transactionId)
return delegate.allocateWriteId(dbName, tableName, transactionId);
}

public void acquireTableWriteLock(String queryId, long transactionId, String dbName, String tableName, DataOperationType operation, boolean isPartitioned)
public void acquireTableWriteLock(
AcidTransactionOwner transactionOwner,
String queryId,
long transactionId,
String dbName,
String tableName,
DataOperationType operation,
boolean isPartitioned)
{
delegate.acquireTableWriteLock(queryId, transactionId, dbName, tableName, operation, isPartitioned);
delegate.acquireTableWriteLock(transactionOwner, queryId, transactionId, dbName, tableName, operation, isPartitioned);
}

public void updateTableWriteId(String dbName, String tableName, long transactionId, long writeId, OptionalLong rowCountChange)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive.metastore;

import static java.util.Objects.requireNonNull;

/**
 * Type-safe wrapper for the username that owns a Hive ACID transaction.
 * <p>
 * Exists purely to make metastore APIs self-documenting: the owner must be
 * passed explicitly through every layer (including caching decorators) rather
 * than being derived from ambient identity, so locks and transactions are
 * opened as the correct user.
 * <p>
 * Value semantics: two instances are equal iff their owner strings are equal.
 * {@link #toString()} intentionally returns the bare owner string, since the
 * value is handed directly to the Hive metastore as the username.
 */
public final class AcidTransactionOwner
{
    private final String owner;

    public AcidTransactionOwner(String owner)
    {
        this.owner = requireNonNull(owner, "owner is null");
    }

    @Override
    public boolean equals(Object o)
    {
        if (this == o) {
            return true;
        }
        // getClass() comparison (not instanceof) keeps equals symmetric;
        // the class is final, so no subclass can exist anyway
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        AcidTransactionOwner that = (AcidTransactionOwner) o;
        return owner.equals(that.owner);
    }

    @Override
    public int hashCode()
    {
        return owner.hashCode();
    }

    @Override
    public String toString()
    {
        // must stay the raw owner string — callers use it as the Hive username
        return owner;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,9 @@ public Set<HivePrivilegeInfo> listTablePrivileges(String databaseName,
}

@Override
public long openTransaction()
public long openTransaction(AcidTransactionOwner transactionOwner)
{
return delegate.openTransaction();
return delegate.openTransaction(transactionOwner);
}

@Override
Expand All @@ -365,12 +365,13 @@ public void sendTransactionHeartbeat(long transactionId)

@Override
public void acquireSharedReadLock(
AcidTransactionOwner transactionOwner,
String queryId,
long transactionId,
List<SchemaTableName> fullTables,
List<HivePartition> partitions)
{
delegate.acquireSharedReadLock(queryId, transactionId, fullTables, partitions);
delegate.acquireSharedReadLock(transactionOwner, queryId, transactionId, fullTables, partitions);
}

@Override
Expand All @@ -393,14 +394,15 @@ public long allocateWriteId(String dbName, String tableName, long transactionId)

@Override
public void acquireTableWriteLock(
AcidTransactionOwner transactionOwner,
String queryId,
long transactionId,
String dbName,
String tableName,
DataOperationType operation,
boolean isDynamicPartitionWrite)
{
delegate.acquireTableWriteLock(queryId, transactionId, dbName, tableName, operation, isDynamicPartitionWrite);
delegate.acquireTableWriteLock(transactionOwner, queryId, transactionId, dbName, tableName, operation, isDynamicPartitionWrite);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ default void updatePartitionStatistics(Table table, String partitionName, Functi
*/
Set<HivePrivilegeInfo> listTablePrivileges(String databaseName, String tableName, Optional<String> tableOwner, Optional<HivePrincipal> principal);

default long openTransaction()
default long openTransaction(AcidTransactionOwner transactionOwner)
{
throw new UnsupportedOperationException();
}
Expand All @@ -162,7 +162,12 @@ default void sendTransactionHeartbeat(long transactionId)
throw new UnsupportedOperationException();
}

default void acquireSharedReadLock(String queryId, long transactionId, List<SchemaTableName> fullTables, List<HivePartition> partitions)
default void acquireSharedReadLock(
AcidTransactionOwner transactionOwner,
String queryId,
long transactionId,
List<SchemaTableName> fullTables,
List<HivePartition> partitions)
{
throw new UnsupportedOperationException();
}
Expand All @@ -182,7 +187,14 @@ default long allocateWriteId(String dbName, String tableName, long transactionId
throw new UnsupportedOperationException();
}

default void acquireTableWriteLock(String queryId, long transactionId, String dbName, String tableName, DataOperationType operation, boolean isDynamicPartitionWrite)
default void acquireTableWriteLock(
AcidTransactionOwner transactionOwner,
String queryId,
long transactionId,
String dbName,
String tableName,
DataOperationType operation,
boolean isDynamicPartitionWrite)
{
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ public AcidTransaction getTransaction()
return transaction;
}

public ValidTxnWriteIdList getValidWriteIds(HiveMetastoreClosure metastore, HiveTableHandle tableHandle)
public ValidTxnWriteIdList getValidWriteIds(
AcidTransactionOwner transactionOwner,
HiveMetastoreClosure metastore,
HiveTableHandle tableHandle)
{
List<SchemaTableName> lockedTables;
List<HivePartition> lockedPartitions;
Expand All @@ -76,6 +79,7 @@ public ValidTxnWriteIdList getValidWriteIds(HiveMetastoreClosure metastore, Hive

// Different calls for same table might need to lock different partitions so acquire locks every time
metastore.acquireSharedReadLock(
transactionOwner,
queryId,
transactionId,
lockedTables,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1300,6 +1300,7 @@ private AcidTransaction beginOperation(ConnectorSession session, Table table, Ac
// because we need the writeId in order to write the delta files.
HiveTransaction hiveTransaction = makeHiveTransaction(session, transactionId -> {
acquireTableWriteLock(
new AcidTransactionOwner(session.getUser()),
queryId,
transactionId,
table.getDatabaseName(),
Expand All @@ -1322,7 +1323,8 @@ private HiveTransaction makeHiveTransaction(ConnectorSession session, Function<L
long heartbeatInterval = configuredTransactionHeartbeatInterval
.map(Duration::toMillis)
.orElseGet(this::getServerExpectedHeartbeatIntervalMillis);
long transactionId = delegate.openTransaction();
// TODO consider adding query id to the owner
long transactionId = delegate.openTransaction(new AcidTransactionOwner(session.getUser()));
log.debug("Using hive transaction %s for %s", transactionId, queryId);

ScheduledFuture<?> heartbeatTask = heartbeatExecutor.scheduleAtFixedRate(
Expand Down Expand Up @@ -1357,7 +1359,7 @@ public synchronized Optional<ValidTxnWriteIdList> getValidWriteIds(ConnectorSess
.get());
}

return Optional.of(currentHiveTransaction.get().getValidWriteIds(delegate, tableHandle));
return Optional.of(currentHiveTransaction.get().getValidWriteIds(new AcidTransactionOwner(session.getUser()), delegate, tableHandle));
}

public synchronized void cleanupQuery(ConnectorSession session)
Expand Down Expand Up @@ -3601,9 +3603,16 @@ private long allocateWriteId(String dbName, String tableName, long transactionId
return delegate.allocateWriteId(dbName, tableName, transactionId);
}

private void acquireTableWriteLock(String queryId, long transactionId, String dbName, String tableName, DataOperationType operation, boolean isPartitioned)
private void acquireTableWriteLock(
AcidTransactionOwner transactionOwner,
String queryId,
long transactionId,
String dbName,
String tableName,
DataOperationType operation,
boolean isPartitioned)
{
delegate.acquireTableWriteLock(queryId, transactionId, dbName, tableName, operation, isPartitioned);
delegate.acquireTableWriteLock(transactionOwner, queryId, transactionId, dbName, tableName, operation, isPartitioned);
}

public void updateTableWriteId(String dbName, String tableName, long transactionId, long writeId, OptionalLong rowCountChange)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.trino.plugin.hive.PartitionStatistics;
import io.trino.plugin.hive.acid.AcidOperation;
import io.trino.plugin.hive.acid.AcidTransaction;
import io.trino.plugin.hive.metastore.AcidTransactionOwner;
import io.trino.plugin.hive.metastore.Database;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.HivePartitionName;
Expand Down Expand Up @@ -890,9 +891,9 @@ private Optional<String> loadConfigValue(String name)
}

@Override
public long openTransaction()
public long openTransaction(AcidTransactionOwner transactionOwner)
{
return delegate.openTransaction();
return delegate.openTransaction(transactionOwner);
}

@Override
Expand All @@ -914,9 +915,14 @@ public void sendTransactionHeartbeat(long transactionId)
}

@Override
public void acquireSharedReadLock(String queryId, long transactionId, List<SchemaTableName> fullTables, List<HivePartition> partitions)
public void acquireSharedReadLock(
AcidTransactionOwner transactionOwner,
String queryId,
long transactionId,
List<SchemaTableName> fullTables,
List<HivePartition> partitions)
{
delegate.acquireSharedReadLock(queryId, transactionId, fullTables, partitions);
delegate.acquireSharedReadLock(transactionOwner, queryId, transactionId, fullTables, partitions);
}

@Override
Expand All @@ -937,14 +943,16 @@ public long allocateWriteId(String dbName, String tableName, long transactionId)
}

@Override
public void acquireTableWriteLock(String queryId,
public void acquireTableWriteLock(
AcidTransactionOwner transactionOwner,
String queryId,
long transactionId,
String dbName,
String tableName,
DataOperationType operation,
boolean isDynamicPartitionWrite)
{
delegate.acquireTableWriteLock(queryId, transactionId, dbName, tableName, operation, isDynamicPartitionWrite);
delegate.acquireTableWriteLock(transactionOwner, queryId, transactionId, dbName, tableName, operation, isDynamicPartitionWrite);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.plugin.hive.acid.AcidOperation;
import io.trino.plugin.hive.acid.AcidTransaction;
import io.trino.plugin.hive.authentication.HiveIdentity;
import io.trino.plugin.hive.metastore.AcidTransactionOwner;
import io.trino.plugin.hive.metastore.Database;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.HivePrincipal;
Expand Down Expand Up @@ -471,9 +472,9 @@ public Optional<String> getConfigValue(String name)
}

@Override
public long openTransaction()
public long openTransaction(AcidTransactionOwner transactionOwner)
{
return delegate.openTransaction(identity);
return delegate.openTransaction(identity, transactionOwner);
}

@Override
Expand All @@ -495,9 +496,14 @@ public void sendTransactionHeartbeat(long transactionId)
}

@Override
public void acquireSharedReadLock(String queryId, long transactionId, List<SchemaTableName> fullTables, List<HivePartition> partitions)
public void acquireSharedReadLock(
AcidTransactionOwner transactionOwner,
String queryId,
long transactionId,
List<SchemaTableName> fullTables,
List<HivePartition> partitions)
{
delegate.acquireSharedReadLock(identity, queryId, transactionId, fullTables, partitions);
delegate.acquireSharedReadLock(identity, transactionOwner, queryId, transactionId, fullTables, partitions);
}

@Override
Expand All @@ -513,9 +519,16 @@ public long allocateWriteId(String dbName, String tableName, long transactionId)
}

@Override
public void acquireTableWriteLock(String queryId, long transactionId, String dbName, String tableName, DataOperationType operation, boolean isDynamicPartitionWrite)
public void acquireTableWriteLock(
AcidTransactionOwner transactionOwner,
String queryId,
long transactionId,
String dbName,
String tableName,
DataOperationType operation,
boolean isDynamicPartitionWrite)
{
delegate.acquireTableWriteLock(identity, queryId, transactionId, dbName, tableName, operation, isDynamicPartitionWrite);
delegate.acquireTableWriteLock(identity, transactionOwner, queryId, transactionId, dbName, tableName, operation, isDynamicPartitionWrite);
}

@Override
Expand Down
Loading

0 comments on commit b79d485

Please sign in to comment.