Skip to content

Commit

Permalink
Flush transaction log cache in Delta flush_metadata_cache procedure
Browse files Browse the repository at this point in the history
Co-Authored-By: Marius Grama <findinpath@gmail.com>
  • Loading branch information
ebyhr and findinpath committed Mar 22, 2023
1 parent 7158624 commit 3f88633
Show file tree
Hide file tree
Showing 9 changed files with 379 additions and 2 deletions.
14 changes: 14 additions & 0 deletions docs/src/main/sphinx/connector/delta-lake.rst
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,20 @@ existing Delta Lake table from the metastores without deleting the data::

CALL example.system.unregister_table(schema_name => 'testdb', table_name => 'customer_orders')

.. _delta-lake-flush-metadata-cache:

Flush metadata cache
^^^^^^^^^^^^^^^^^^^^

* ``system.flush_metadata_cache()``

Flush all metadata caches.

* ``system.flush_metadata_cache(schema_name => ..., table_name => ...)``

Flush metadata cache entries connected with the selected table.
The procedure requires named parameters to be passed.

.. _delta-lake-write-support:

Updating data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.plugin.deltalake.metastore.DeltaLakeMetastore;
import io.trino.plugin.deltalake.procedure.DropExtendedStatsProcedure;
import io.trino.plugin.deltalake.procedure.FlushMetadataCacheProcedure;
import io.trino.plugin.deltalake.procedure.OptimizeTableProcedure;
import io.trino.plugin.deltalake.procedure.RegisterTableProcedure;
import io.trino.plugin.deltalake.procedure.UnregisterTableProcedure;
Expand Down Expand Up @@ -147,6 +148,7 @@ public void setup(Binder binder)
procedures.addBinding().toProvider(VacuumProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(RegisterTableProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(UnregisterTableProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(FlushMetadataCacheProcedure.class).in(Scopes.SINGLETON);

Multibinder<TableProcedureMetadata> tableProcedures = newSetBinder(binder, TableProcedureMetadata.class);
tableProcedures.addBinding().toProvider(OptimizeTableProcedure.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ protected void setup(Binder binder)
bindMetastoreModule("file", new DeltaLakeFileMetastoreModule());
bindMetastoreModule("glue", new DeltaLakeGlueMetastoreModule());

install(new DecoratedHiveMetastoreModule(true));
install(new DecoratedHiveMetastoreModule(false));
}

private void bindMetastoreModule(String name, Module module)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake.procedure;

import com.google.common.collect.ImmutableList;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
import io.trino.plugin.hive.metastore.Table;
import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore;
import io.trino.spi.TrinoException;
import io.trino.spi.classloader.ThreadContextClassLoader;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.procedure.Procedure;

import javax.inject.Inject;
import javax.inject.Provider;

import java.lang.invoke.MethodHandle;
import java.util.Optional;

import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.getTableLocation;
import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.verifyDeltaLakeTable;
import static io.trino.spi.StandardErrorCode.INVALID_PROCEDURE_ARGUMENT;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.invoke.MethodHandles.lookup;
import static java.util.Objects.requireNonNull;

/**
 * Provider of the {@code system.flush_metadata_cache} procedure.
 *
 * <p>The procedure accepts two optional VARCHAR arguments, {@code SCHEMA_NAME} and
 * {@code TABLE_NAME}:
 * <ul>
 * <li>neither given: the caching metastore (when one is bound) and the whole
 * transaction log cache are flushed;</li>
 * <li>both given: only the entries of that table are invalidated, after checking
 * that the table exists and passes {@code verifyDeltaLakeTable};</li>
 * <li>exactly one given: the call is rejected with {@code INVALID_PROCEDURE_ARGUMENT}.</li>
 * </ul>
 */
public class FlushMetadataCacheProcedure
        implements Provider<Procedure>
{
    private static final String PROCEDURE_NAME = "flush_metadata_cache";

    private static final String PARAM_SCHEMA_NAME = "SCHEMA_NAME";
    private static final String PARAM_TABLE_NAME = "TABLE_NAME";

    // Handle to the public entry point below; bound to this instance in get().
    private static final MethodHandle FLUSH_METADATA_CACHE;

    static {
        try {
            FLUSH_METADATA_CACHE = lookup().unreflect(FlushMetadataCacheProcedure.class.getMethod("flushMetadataCache", ConnectorSession.class, String.class, String.class));
        }
        catch (ReflectiveOperationException e) {
            throw new AssertionError(e);
        }
    }

    private final HiveMetastoreFactory metastoreFactory;
    private final Optional<CachingHiveMetastore> cachingHiveMetastore;
    private final TransactionLogAccess transactionLogAccess;

    @Inject
    public FlushMetadataCacheProcedure(
            HiveMetastoreFactory metastoreFactory,
            Optional<CachingHiveMetastore> cachingHiveMetastore,
            TransactionLogAccess transactionLogAccess)
    {
        this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null");
        this.cachingHiveMetastore = requireNonNull(cachingHiveMetastore, "cachingHiveMetastore is null");
        this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null");
    }

    /**
     * Builds the procedure definition in the {@code system} schema, bound to this instance.
     */
    @Override
    public Procedure get()
    {
        // NOTE(review): the trailing (false, null) presumably mean "not required, default null" — confirm against the SPI.
        ImmutableList<Procedure.Argument> arguments = ImmutableList.of(
                new Procedure.Argument(PARAM_SCHEMA_NAME, VARCHAR, false, null),
                new Procedure.Argument(PARAM_TABLE_NAME, VARCHAR, false, null));
        MethodHandle boundHandle = FLUSH_METADATA_CACHE.bindTo(this);
        // NOTE(review): the final 'true' presumably enforces named arguments — confirm against the SPI.
        return new Procedure("system", PROCEDURE_NAME, arguments, boundHandle, true);
    }

    /**
     * Procedure entry point. Runs the flush with this class's class loader installed as the
     * thread context class loader for the duration of the call.
     *
     * @param session session whose identity is used to create the metastore for the table lookup
     * @param schemaName schema of the table to invalidate, or {@code null}
     * @param tableName name of the table to invalidate, or {@code null}
     */
    public void flushMetadataCache(ConnectorSession session, String schemaName, String tableName)
    {
        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
            doFlushMetadataCache(session, Optional.ofNullable(schemaName), Optional.ofNullable(tableName));
        }
    }

    // Dispatches between "flush everything" and "invalidate one table" based on which arguments are present.
    private void doFlushMetadataCache(ConnectorSession session, Optional<String> schemaName, Optional<String> tableName)
    {
        // Supplying only one of the two names is not a supported combination.
        if (schemaName.isPresent() != tableName.isPresent()) {
            throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, "Illegal parameter set passed");
        }

        // No names at all: drop every cached entry.
        if (schemaName.isEmpty()) {
            if (cachingHiveMetastore.isPresent()) {
                cachingHiveMetastore.get().flushCache();
            }
            transactionLogAccess.flushCache();
            return;
        }

        // Both names present: resolve the table first so missing or non-Delta tables are reported.
        String schema = schemaName.get();
        String name = tableName.get();
        HiveMetastore metastore = metastoreFactory.createMetastore(Optional.of(session.getIdentity()));
        Optional<Table> lookup = metastore.getTable(schema, name);
        if (lookup.isEmpty()) {
            throw new TableNotFoundException(new SchemaTableName(schema, name));
        }
        Table table = lookup.get();
        verifyDeltaLakeTable(table);

        if (cachingHiveMetastore.isPresent()) {
            cachingHiveMetastore.get().invalidateTable(table.getDatabaseName(), table.getTableName());
        }
        transactionLogAccess.invalidateCaches(getTableLocation(table));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ public TableSnapshot loadSnapshot(SchemaTableName table, String tableLocation, C
return snapshot;
}

/**
 * Drops every entry from both caches held by this object: the table snapshot
 * cache and the active data file cache.
 */
public void flushCache()
{
tableSnapshots.invalidateAll();
activeDataFileCache.invalidateAll();
}

public void invalidateCaches(String tableLocation)
{
tableSnapshots.invalidate(tableLocation);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableMap;
import io.trino.plugin.hive.containers.HiveMinioDataLake;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.thrift.BridgingHiveMetastore;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

import java.io.IOException;

import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG;
import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.createS3DeltaLakeQueryRunner;
import static io.trino.plugin.hive.TestingThriftHiveMetastoreBuilder.testingThriftHiveMetastoreBuilder;
import static io.trino.plugin.hive.containers.HiveHadoop.HIVE3_IMAGE;

/**
 * Tests for the Delta Lake {@code system.flush_metadata_cache} procedure.
 *
 * <p>The query runner is created with {@code hive.metastore-cache-ttl} set to 10 minutes.
 * The tests change metastore state through a separate, direct metastore client and check
 * that the changes become visible to queries only after the procedure is called.
 */
public class TestDeltaLakeFlushMetadataCacheProcedure
        extends AbstractTestQueryFramework
{
    private static final String BUCKET_NAME = "delta-lake-test-flush-metadata-cache";

    // Direct metastore client used to modify metadata behind the query runner's back.
    private HiveMetastore metastore;

    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        // Register the data lake for cleanup so its containers are stopped when the class finishes;
        // previously it was started here and never closed.
        HiveMinioDataLake hiveMinioDataLake = closeAfterClass(new HiveMinioDataLake(BUCKET_NAME, HIVE3_IMAGE));
        hiveMinioDataLake.start();
        metastore = new BridgingHiveMetastore(
                testingThriftHiveMetastoreBuilder()
                        .metastoreClient(hiveMinioDataLake.getHiveHadoop().getHiveMetastoreEndpoint())
                        .build());

        return createS3DeltaLakeQueryRunner(
                DELTA_CATALOG,
                "default",
                ImmutableMap.of("hive.metastore-cache-ttl", "10m"),
                hiveMinioDataLake.getMinio().getMinioAddress(),
                hiveMinioDataLake.getHiveHadoop());
    }

    @AfterClass(alwaysRun = true)
    public void tearDown()
            throws IOException
    {
        metastore = null;
    }

    @Test
    public void testFlushMetadataCache()
    {
        assertUpdate("CREATE SCHEMA cached WITH (location = 's3://" + BUCKET_NAME + "/cached')");
        assertUpdate("CREATE TABLE cached.cached AS SELECT * FROM tpch.tiny.nation", 25);

        // Fill caches by reading the table once
        assertQuerySucceeds("SELECT name, regionkey FROM cached.cached");

        // Verify that table cache is flushed
        String showTablesSql = "SHOW TABLES FROM cached";
        // Fill caches
        assertQuery(showTablesSql, "VALUES 'cached'");

        // Rename table outside Trino
        metastore.renameTable("cached", "cached", "cached", "renamed");

        // Should still return old table name from cache
        assertQuery(showTablesSql, "VALUES 'cached'");

        // Should return new table name after cache flush
        assertUpdate("CALL system.flush_metadata_cache(schema_name => 'cached', table_name => 'cached')");
        assertQuery(showTablesSql, "VALUES 'renamed'");

        // Verify that schema cache is flushed
        // Use the same catalog name the query runner was created with
        String showSchemasSql = "SHOW SCHEMAS FROM " + DELTA_CATALOG;
        // Fill caches
        assertQuery(showSchemasSql, "VALUES ('cached'), ('information_schema'), ('default')");

        // Drop a table and a schema outside Trino
        metastore.dropTable("cached", "renamed", false);
        metastore.dropDatabase("cached", false);

        // Should still return old schemas from cache
        assertQuery(showSchemasSql, "VALUES ('cached'), ('information_schema'), ('default')");

        // Should not return the old schema name after cache flush
        assertUpdate("CALL system.flush_metadata_cache()");
        assertQuery(showSchemasSql, "VALUES ('information_schema'), ('default')");
    }

    @Test
    public void testFlushMetadataCacheTableNotFound()
    {
        assertQueryFails(
                "CALL system.flush_metadata_cache(schema_name => 'test_not_existing_schema', table_name => 'test_not_existing_table')",
                "Table 'test_not_existing_schema.test_not_existing_table' not found");
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,43 @@ public void testTableSnapshotsActiveDataFilesCache()
"00000000000000000010.checkpoint.parquet", 2));
}

@Test
public void testFlushSnapshotAndActiveFileCache()
        throws Exception
{
    // Set up access to the "person" test table with a 10-minute data file cache TTL.
    String tableName = "person";
    String tableDir = getClass().getClassLoader().getResource("databricks/" + tableName).toURI().toString();
    DeltaLakeConfig cacheConfig = new DeltaLakeConfig();
    cacheConfig.setDataFileCacheTtl(new Duration(10, TimeUnit.MINUTES));
    setupTransactionLogAccess(tableName, tableDir, cacheConfig);

    // First read: each transaction log file is opened once, the checkpoint twice.
    ImmutableMap<String, Integer> opensAfterFirstRead = ImmutableMap.of(
            "_last_checkpoint", 1,
            "00000000000000000011.json", 1,
            "00000000000000000012.json", 1,
            "00000000000000000013.json", 1,
            "00000000000000000014.json", 1,
            "00000000000000000010.checkpoint.parquet", 2);
    List<AddFileEntry> filesBeforeFlush = transactionLogAccess.getActiveFiles(tableSnapshot, SESSION);
    assertEquals(filesBeforeFlush.size(), 12);
    assertThat(accessTrackingFileSystemFactory.getOpenCount()).containsExactlyInAnyOrderEntriesOf(opensAfterFirstRead);

    // Flush all cache and then load snapshot and get active files
    transactionLogAccess.flushCache();
    transactionLogAccess.loadSnapshot(new SchemaTableName("schema", tableName), tableDir, SESSION);
    List<AddFileEntry> filesAfterFlush = transactionLogAccess.getActiveFiles(tableSnapshot, SESSION);

    // Same result, but every file had to be opened again, so all counts double.
    ImmutableMap<String, Integer> opensAfterSecondRead = ImmutableMap.of(
            "_last_checkpoint", 2,
            "00000000000000000011.json", 2,
            "00000000000000000012.json", 2,
            "00000000000000000013.json", 2,
            "00000000000000000014.json", 2,
            "00000000000000000010.checkpoint.parquet", 4);
    assertEquals(filesAfterFlush.size(), 12);
    assertThat(accessTrackingFileSystemFactory.getOpenCount()).containsExactlyInAnyOrderEntriesOf(opensAfterSecondRead);
}

@Test
public void testTableSnapshotsActiveDataFilesCacheDisabled()
throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public TestingDeltaLakeMetastoreModule(HiveMetastore metastore)
public void setup(Binder binder)
{
binder.bind(HiveMetastoreFactory.class).annotatedWith(RawHiveMetastoreFactory.class).toInstance(HiveMetastoreFactory.ofInstance(metastore));
install(new DecoratedHiveMetastoreModule(true));
install(new DecoratedHiveMetastoreModule(false));

binder.bind(Key.get(boolean.class, HideDeltaLakeTables.class)).toInstance(false);
binder.bind(Key.get(boolean.class, AllowDeltaLakeManagedTableRename.class)).toInstance(true);
Expand Down
Loading

0 comments on commit 3f88633

Please sign in to comment.