Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support views in Delta lake Connector #11763

Merged
merged 3 commits into from
Dec 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions plugin/trino-delta-lake/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -423,10 +423,12 @@
<exclude>**/TestDeltaLakeAdlsConnectorSmokeTest.java</exclude>
<exclude>**/TestDeltaLakeGlueMetastore.java</exclude>
<exclude>**/TestDeltaLakeCleanUpGlueMetastore.java</exclude>
<exclude>**/TestDeltaLakeSharedGlueMetastoreViews.java</exclude>
<exclude>**/TestDeltaLakeSharedGlueMetastoreWithTableRedirections.java</exclude>
<exclude>**/TestDeltaLakeTableWithCustomLocationUsingGlueMetastore.java</exclude>
<exclude>**/TestDeltaLakeRenameToWithGlueMetastore.java</exclude>
<exclude>**/TestDeltaLakeRegisterTableProcedureWithGlue.java</exclude>
<exclude>**/TestDeltaLakeViewsGlueMetastore.java</exclude>
<exclude>**/TestDeltaLakeGcsConnectorSmokeTest.java</exclude>
</excludes>
</configuration>
Expand Down Expand Up @@ -474,10 +476,12 @@
<include>**/TestDeltaLakeAdlsConnectorSmokeTest.java</include>
<include>**/TestDeltaLakeGlueMetastore.java</include>
<include>**/TestDeltaLakeCleanUpGlueMetastore.java</include>
<include>**/TestDeltaLakeSharedGlueMetastoreViews.java</include>
<include>**/TestDeltaLakeSharedGlueMetastoreWithTableRedirections.java</include>
<include>**/TestDeltaLakeTableWithCustomLocationUsingGlueMetastore.java</include>
<include>**/TestDeltaLakeRenameToWithGlueMetastore.java</include>
<include>**/TestDeltaLakeRegisterTableProcedureWithGlue.java</include>
<include>**/TestDeltaLakeViewsGlueMetastore.java</include>
</includes>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import io.trino.plugin.hive.HiveType;
import io.trino.plugin.hive.SchemaAlreadyExistsException;
import io.trino.plugin.hive.TableAlreadyExistsException;
import io.trino.plugin.hive.TrinoViewHiveMetastore;
import io.trino.plugin.hive.metastore.Column;
import io.trino.plugin.hive.metastore.Database;
import io.trino.plugin.hive.metastore.HivePrincipal;
Expand Down Expand Up @@ -82,6 +83,7 @@
import io.trino.spi.connector.ConnectorTableLayout;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableProperties;
import io.trino.spi.connector.ConnectorViewDefinition;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.ConstraintApplicationResult;
import io.trino.spi.connector.ProjectionApplicationResult;
Expand Down Expand Up @@ -284,6 +286,7 @@ public class DeltaLakeMetadata
private final HdfsEnvironment hdfsEnvironment;
private final TypeManager typeManager;
private final AccessControlMetadata accessControlMetadata;
private final TrinoViewHiveMetastore trinoViewHiveMetastore;
private final CheckpointWriterManager checkpointWriterManager;
private final long defaultCheckpointInterval;
private final int domainCompactionThreshold;
Expand All @@ -307,6 +310,7 @@ public DeltaLakeMetadata(
HdfsEnvironment hdfsEnvironment,
TypeManager typeManager,
AccessControlMetadata accessControlMetadata,
TrinoViewHiveMetastore trinoViewHiveMetastore,
int domainCompactionThreshold,
boolean unsafeWritesEnabled,
JsonCodec<DataFileInfo> dataFileInfoCodec,
Expand All @@ -327,6 +331,7 @@ public DeltaLakeMetadata(
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.accessControlMetadata = requireNonNull(accessControlMetadata, "accessControlMetadata is null");
this.trinoViewHiveMetastore = requireNonNull(trinoViewHiveMetastore, "trinoViewHiveMetastore is null");
this.domainCompactionThreshold = domainCompactionThreshold;
this.unsafeWritesEnabled = unsafeWritesEnabled;
this.dataFileInfoCodec = requireNonNull(dataFileInfoCodec, "dataFileInfoCodec is null");
Expand Down Expand Up @@ -2075,6 +2080,36 @@ public Map<String, Object> getSchemaProperties(ConnectorSession session, Catalog
return db.map(DeltaLakeSchemaProperties::fromDatabase).orElseThrow(() -> new SchemaNotFoundException(schema));
}

/**
 * Creates a Trino view in the Hive metastore, optionally replacing an existing one.
 * All view handling is delegated to the shared {@code TrinoViewHiveMetastore} helper.
 */
@Override
public void createView(ConnectorSession session, SchemaTableName viewName, ConnectorViewDefinition definition, boolean replace)
{
trinoViewHiveMetastore.createView(session, viewName, definition, replace);
}

/**
 * Drops the given view from the Hive metastore.
 * Note: the session is not forwarded — the helper operates on the view name alone.
 */
@Override
public void dropView(ConnectorSession session, SchemaTableName viewName)
{
trinoViewHiveMetastore.dropView(viewName);
}

/**
 * Lists views in the given schema (or in all schemas when absent),
 * delegating to the Hive-metastore-backed view store.
 */
@Override
public List<SchemaTableName> listViews(ConnectorSession session, Optional<String> schemaName)
{
return trinoViewHiveMetastore.listViews(schemaName);
}

/**
 * Returns the definitions of all views in the given schema (or all schemas when absent),
 * delegating to the Hive-metastore-backed view store.
 */
@Override
public Map<SchemaTableName, ConnectorViewDefinition> getViews(ConnectorSession session, Optional<String> schemaName)
{
return trinoViewHiveMetastore.getViews(schemaName);
}

/**
 * Looks up a single view definition by name; empty when the name does not
 * resolve to a Trino view in the metastore.
 */
@Override
public Optional<ConnectorViewDefinition> getView(ConnectorSession session, SchemaTableName viewName)
{
return trinoViewHiveMetastore.getView(viewName);
}

@Override
public void createRole(ConnectorSession session, String role, Optional<TrinoPrincipal> grantor)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriterManager;
import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriterFactory;
import io.trino.plugin.hive.NodeVersion;
import io.trino.plugin.hive.TrinoViewHiveMetastore;
import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore;
import io.trino.plugin.hive.security.AccessControlMetadata;
import io.trino.spi.NodeManager;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.spi.type.TypeManager;
Expand Down Expand Up @@ -58,6 +61,7 @@ public class DeltaLakeMetadataFactory
private final boolean useUniqueTableLocation;

private final boolean allowManagedTableRename;
private final String trinoVersion;

@Inject
public DeltaLakeMetadataFactory(
Expand All @@ -76,7 +80,8 @@ public DeltaLakeMetadataFactory(
CheckpointWriterManager checkpointWriterManager,
DeltaLakeRedirectionsProvider deltaLakeRedirectionsProvider,
CachingExtendedStatisticsAccess statisticsAccess,
@AllowDeltaLakeManagedTableRename boolean allowManagedTableRename)
@AllowDeltaLakeManagedTableRename boolean allowManagedTableRename,
NodeVersion nodeVersion)
{
this.hiveMetastoreFactory = requireNonNull(hiveMetastoreFactory, "hiveMetastore is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
Expand All @@ -99,6 +104,7 @@ public DeltaLakeMetadataFactory(
this.deleteSchemaLocationsFallback = deltaLakeConfig.isDeleteSchemaLocationsFallback();
this.useUniqueTableLocation = deltaLakeConfig.isUniqueTableLocation();
this.allowManagedTableRename = allowManagedTableRename;
this.trinoVersion = requireNonNull(nodeVersion, "nodeVersion is null").toString();
}

public DeltaLakeMetadata create(ConnectorIdentity identity)
Expand All @@ -107,18 +113,25 @@ public DeltaLakeMetadata create(ConnectorIdentity identity)
CachingHiveMetastore cachingHiveMetastore = memoizeMetastore(
hiveMetastoreFactory.createMetastore(Optional.of(identity)),
perTransactionMetastoreCacheMaximumSize);
AccessControlMetadata accessControlMetadata = accessControlMetadataFactory.create(cachingHiveMetastore);
HiveMetastoreBackedDeltaLakeMetastore deltaLakeMetastore = new HiveMetastoreBackedDeltaLakeMetastore(
cachingHiveMetastore,
transactionLogAccess,
typeManager,
statisticsAccess,
fileSystemFactory);
TrinoViewHiveMetastore trinoViewHiveMetastore = new TrinoViewHiveMetastore(
cachingHiveMetastore,
accessControlMetadata.isUsingSystemSecurity(),
trinoVersion,
"Trino Delta Lake connector");
return new DeltaLakeMetadata(
deltaLakeMetastore,
fileSystemFactory,
hdfsEnvironment,
typeManager,
accessControlMetadataFactory.create(cachingHiveMetastore),
accessControlMetadata,
trinoViewHiveMetastore,
domainCompactionThreshold,
unsafeWritesEnabled,
dataFileInfoCodec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ protected QueryRunner createQueryRunner()
protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR is missing a test where Trino views for Delta get created on a Glue-backed metastore.
Consider adding such a test in trino-delta-lake.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test added

switch (connectorBehavior) {
case SUPPORTS_CREATE_VIEW:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a follow-up to this PR, could you please create an issue (with good-first-issue label) to add support for comments on Delta views ?

return true;

case SUPPORTS_RENAME_SCHEMA:
return false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
case SUPPORTS_DELETE:
case SUPPORTS_UPDATE:
case SUPPORTS_MERGE:
case SUPPORTS_CREATE_VIEW:
return true;

default:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableMap;
import io.trino.Session;
import io.trino.plugin.deltalake.metastore.TestingDeltaLakeMetastoreModule;
import io.trino.plugin.hive.TestingHivePlugin;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Optional;

import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static com.google.inject.util.Modules.EMPTY_MODULE;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;

/**
* Tests querying views on a schema which has a mix of Hive and Delta Lake tables.
*/
public abstract class BaseDeltaLakeSharedMetastoreViewsTest
ebyhr marked this conversation as resolved.
Show resolved Hide resolved
extends AbstractTestQueryFramework
{
protected static final String DELTA_CATALOG_NAME = "delta_lake";
protected static final String HIVE_CATALOG_NAME = "hive";
protected static final String SCHEMA = "test_shared_schema_views_" + randomNameSuffix();

private String dataDirectory;
private HiveMetastore metastore;

/**
 * Builds a query runner with two catalogs — Delta Lake and Hive — that share
 * a single metastore instance, so views created through one catalog are
 * visible through the other. Statement order matters: each plugin must be
 * installed before its catalog is created.
 */
@Override
protected QueryRunner createQueryRunner()
throws Exception
{
Session session = testSessionBuilder()
.setCatalog(DELTA_CATALOG_NAME)
.setSchema(SCHEMA)
.build();
DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session).build();

// The metastore implementation is supplied by the concrete subclass (e.g. file-backed).
this.dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("delta_lake_data").toString();
this.metastore = createTestMetastore(dataDirectory);

queryRunner.installPlugin(new TestingDeltaLakePlugin(Optional.of(new TestingDeltaLakeMetastoreModule(metastore)), EMPTY_MODULE));
queryRunner.createCatalog(DELTA_CATALOG_NAME, "delta-lake");

// Second catalog on the SAME metastore — this is what makes the views shared.
queryRunner.installPlugin(new TestingHivePlugin(metastore));

ImmutableMap<String, String> hiveProperties = ImmutableMap.<String, String>builder()
.put("hive.allow-drop-table", "true")
.buildOrThrow();

queryRunner.createCatalog(HIVE_CATALOG_NAME, "hive", hiveProperties);
queryRunner.execute("CREATE SCHEMA " + SCHEMA);

return queryRunner;
}

protected abstract HiveMetastore createTestMetastore(String dataDirectory);

ebyhr marked this conversation as resolved.
Show resolved Hide resolved
// A view over a constant, created via the Delta catalog, must be queryable
// through the Hive catalog and show up there as a VIEW in information_schema.
@Test
public void testViewWithLiteralColumnCreatedInDeltaLakeIsReadableInHive()
{
    String viewName = "delta_view_" + randomNameSuffix();
    String qualifiedDeltaView = DELTA_CATALOG_NAME + "." + SCHEMA + "." + viewName;
    String qualifiedHiveView = HIVE_CATALOG_NAME + "." + SCHEMA + "." + viewName;
    try {
        assertUpdate("CREATE VIEW " + qualifiedDeltaView + " AS SELECT 1 bee");
        // Same result expected no matter which catalog resolves the view
        assertQuery("SELECT * FROM " + qualifiedDeltaView, "VALUES 1");
        assertQuery("SELECT * FROM " + qualifiedHiveView, "VALUES 1");
        assertQuery(
                format("SELECT table_type FROM %s.information_schema.tables WHERE table_name = '%s' AND table_schema='%s'", HIVE_CATALOG_NAME, viewName, SCHEMA),
                "VALUES 'VIEW'");
    }
    finally {
        assertUpdate("DROP VIEW IF EXISTS " + qualifiedDeltaView);
    }
}

// A view that selects from a Delta table, created via the Delta catalog,
// must be readable through the Hive catalog and be reported there as a VIEW.
@Test
public void testViewOnDeltaLakeTableCreatedInDeltaLakeIsReadableInHive()
{
    String tableName = "delta_table_" + randomNameSuffix();
    String qualifiedTable = DELTA_CATALOG_NAME + "." + SCHEMA + "." + tableName;
    String viewName = "delta_view_" + randomNameSuffix();
    String qualifiedDeltaView = DELTA_CATALOG_NAME + "." + SCHEMA + "." + viewName;
    String qualifiedHiveView = HIVE_CATALOG_NAME + "." + SCHEMA + "." + viewName;
    try {
        assertUpdate("CREATE TABLE " + qualifiedTable + " AS SELECT 1 bee", 1);
        assertUpdate("CREATE VIEW " + qualifiedDeltaView + " AS SELECT * from " + qualifiedTable);
        assertQuery("SELECT * FROM " + qualifiedDeltaView, "VALUES 1");
        assertQuery("SELECT * FROM " + qualifiedHiveView, "VALUES 1");
        assertQuery(
                format("SELECT table_type FROM %s.information_schema.tables WHERE table_name = '%s' AND table_schema='%s'", HIVE_CATALOG_NAME, viewName, SCHEMA),
                "VALUES 'VIEW'");
    }
    finally {
        assertUpdate("DROP TABLE IF EXISTS " + qualifiedTable);
        assertUpdate("DROP VIEW IF EXISTS " + qualifiedDeltaView);
    }
}

// The mirror-image scenario: a view over a constant created via the Hive
// catalog must be readable through the Delta Lake catalog.
@Test
public void testViewWithLiteralColumnCreatedInHiveIsReadableInDeltaLake()
{
    String viewName = "trino_view_on_hive_" + randomNameSuffix();
    String qualifiedHiveView = HIVE_CATALOG_NAME + "." + SCHEMA + "." + viewName;
    String qualifiedDeltaView = DELTA_CATALOG_NAME + "." + SCHEMA + "." + viewName;
    try {
        assertUpdate("CREATE VIEW " + qualifiedHiveView + " AS SELECT 1 bee");
        assertQuery("SELECT * FROM " + qualifiedHiveView, "VALUES 1");
        assertQuery("SELECT * FROM " + qualifiedDeltaView, "VALUES 1");
        assertQuery(
                format("SELECT table_type FROM %s.information_schema.tables WHERE table_name = '%s' AND table_schema='%s'", HIVE_CATALOG_NAME, viewName, SCHEMA),
                "VALUES 'VIEW'");
    }
    finally {
        assertUpdate("DROP VIEW IF EXISTS " + qualifiedHiveView);
    }
}

/**
 * Verifies that a Trino view defined OVER a Hive table, created via the Hive
 * catalog, is readable through the Delta Lake catalog and reported there as a VIEW.
 */
@Test
public void testViewOnHiveTableCreatedInHiveIsReadableInDeltaLake()
{
    String hiveTableName = "hive_table_" + randomNameSuffix();
    String hiveTable = format("%s.%s.%s", HIVE_CATALOG_NAME, SCHEMA, hiveTableName);
    String trinoViewOnHiveName = "trino_view_on_hive_" + randomNameSuffix();
    String trinoViewOnHive = format("%s.%s.%s", HIVE_CATALOG_NAME, SCHEMA, trinoViewOnHiveName);
    String trinoViewOnHiveOnDeltaCatalog = format("%s.%s.%s", DELTA_CATALOG_NAME, SCHEMA, trinoViewOnHiveName);
    try {
        assertUpdate(format("CREATE TABLE %s AS SELECT 1 bee", hiveTable), 1);
        // Bug fix: the view previously selected a literal ("SELECT 1 bee") and never
        // referenced the Hive table, so the "view on Hive table" path was not exercised.
        assertUpdate(format("CREATE VIEW %s AS SELECT * FROM %s", trinoViewOnHive, hiveTable));
        assertQuery(format("SELECT * FROM %s", trinoViewOnHive), "VALUES 1");
        assertQuery(format("SELECT * FROM %s", trinoViewOnHiveOnDeltaCatalog), "VALUES 1");
        assertQuery(format("SELECT table_type FROM %s.information_schema.tables WHERE table_name = '%s' AND table_schema='%s'", DELTA_CATALOG_NAME, trinoViewOnHiveName, SCHEMA), "VALUES 'VIEW'");
    }
    finally {
        // Drop the view before the table it references.
        assertUpdate(format("DROP VIEW IF EXISTS %s", trinoViewOnHive));
        assertUpdate(format("DROP TABLE IF EXISTS %s", hiveTable));
    }
}

/**
 * Drops the shared schema and removes the on-disk data directory.
 * Runs even if tests failed ({@code alwaysRun = true}); a no-op when
 * the query runner was never fully initialized.
 */
@AfterClass(alwaysRun = true)
public void cleanup()
        throws IOException
{
    if (metastore == null) {
        return;
    }
    metastore.dropDatabase(SCHEMA, false);
    deleteRecursively(Path.of(dataDirectory), ALLOW_INSECURE);
}
}
ebyhr marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake;

import io.trino.plugin.hive.metastore.HiveMetastore;

import java.io.File;

import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore;

/**
 * Runs the shared-metastore view tests against a file-backed Hive metastore
 * rooted in the query runner's data directory.
 */
public class TestDeltaLakeSharedFileMetastoreViews
        extends BaseDeltaLakeSharedMetastoreViewsTest
{
    @Override
    protected HiveMetastore createTestMetastore(String dataDirectory)
    {
        File metastoreDirectory = new File(dataDirectory);
        return createTestingFileHiveMetastore(metastoreDirectory);
    }
}
Loading