Skip to content

Commit

Permalink
Support table redirections from Hive to Delta Lake
Browse files Browse the repository at this point in the history
The Hive connector can use the `hive.delta-lake-catalog-name`
configuration property to enable table redirects to
Delta Lake tables.
  • Loading branch information
findinpath authored and findepi committed Apr 20, 2022
1 parent 513d193 commit dcaca0a
Show file tree
Hide file tree
Showing 8 changed files with 493 additions and 3 deletions.
8 changes: 8 additions & 0 deletions plugin/trino-delta-lake/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@
<scope>runtime</scope>
</dependency>

<!-- NOTE(review): presumably added for com.amazonaws.auth.DefaultAWSCredentialsProviderChain, which TestDeltaLakeSharedGlueMetastore imports; confirm that runtime scope is intended -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
<scope>runtime</scope>
</dependency>

<!-- Trino SPI -->
<dependency>
<groupId>io.trino</groupId>
Expand Down Expand Up @@ -411,6 +417,7 @@
<exclude>**/TestDeltaLakeAdlsConnectorSmokeTest.java</exclude>
<exclude>**/TestDeltaLakeGlueMetastore.java</exclude>
<exclude>**/TestDelta*FailureRecoveryTest.java</exclude>
<exclude>**/TestDeltaLakeSharedGlueMetastore.java</exclude>
</excludes>
</configuration>
</plugin>
Expand Down Expand Up @@ -442,6 +449,7 @@
<include>**/TestDeltaLakeAdlsStorage.java</include>
<include>**/TestDeltaLakeAdlsConnectorSmokeTest.java</include>
<include>**/TestDeltaLakeGlueMetastore.java</include>
<include>**/TestDeltaLakeSharedGlueMetastore.java</include>
</includes>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake;

import io.trino.testing.AbstractTestQueryFramework;
import org.testng.annotations.Test;

import static io.trino.testing.sql.TestTable.randomTableSuffix;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertEquals;

/**
 * Shared assertions for a schema that holds one Hive table ({@code region}) and one
 * Delta Lake table ({@code nation}), queried through four catalogs: {@code hive},
 * {@code delta}, {@code hive_with_redirections} and {@code delta_with_redirections}.
 * <p>
 * Subclasses create the query runner and supply the expected
 * {@code SHOW CREATE SCHEMA} output for their metastore.
 */
public abstract class BaseDeltaLakeSharedMetastoreTest
        extends AbstractTestQueryFramework
{
    protected final String schema = "test_shared_schema_" + randomTableSuffix();

    /**
     * Returns the exact {@code SHOW CREATE SCHEMA} text expected from a Hive catalog.
     *
     * @param catalogName catalog the statement is issued against
     */
    protected abstract String getExpectedHiveCreateSchema(String catalogName);

    /**
     * Returns the exact {@code SHOW CREATE SCHEMA} text expected from a Delta Lake catalog.
     *
     * @param catalogName catalog the statement is issued against
     */
    protected abstract String getExpectedDeltaLakeCreateSchema(String catalogName);

    @Test
    public void testReadInformationSchema()
    {
        assertListedInInformationSchemaTables("hive", "region");
        assertListedInInformationSchemaTables("delta", "nation");
        assertListedInInformationSchemaTables("hive_with_redirections", "region");
        assertListedInInformationSchemaTables("hive_with_redirections", "nation");
        assertListedInInformationSchemaTables("delta_with_redirections", "region");

        String columnsOfBothTables = "VALUES" +
                "('region', 'regionkey'), ('region', 'name'), ('region', 'comment'), " +
                "('nation', 'nationkey'), ('nation', 'name'), ('nation', 'regionkey'), ('nation', 'comment')";

        // Plain Hive catalog only reports the columns of the Hive table
        assertQuery(
                informationSchemaColumnsSql("hive"),
                "VALUES ('region', 'regionkey'), ('region', 'name'), ('region', 'comment')");
        // Plain Delta Lake catalog fails on the Hive table
        String deltaColumnsSql = informationSchemaColumnsSql("delta");
        assertThatThrownBy(() -> computeActual(deltaColumnsSql))
                .hasMessageContaining(format("%s.region is not a Delta Lake table", schema));
        // Catalogs with redirections report the columns of both tables
        assertQuery(informationSchemaColumnsSql("hive_with_redirections"), columnsOfBothTables);
        assertQuery(informationSchemaColumnsSql("delta_with_redirections"), columnsOfBothTables);
    }

    @Test
    public void testSelect()
    {
        assertQuery("SELECT * FROM delta." + schema + ".nation", "SELECT * FROM nation");
        assertQuery("SELECT * FROM hive." + schema + ".region", "SELECT * FROM region");
        for (String catalog : new String[] {"hive_with_redirections", "delta_with_redirections"}) {
            String qualifiedSchema = catalog + "." + schema;
            assertQuery("SELECT * FROM " + qualifiedSchema + ".nation", "SELECT * FROM nation");
            assertQuery("SELECT * FROM " + qualifiedSchema + ".region", "SELECT * FROM region");
        }

        assertQueryThrows("SELECT * FROM delta." + schema + ".region", "not a Delta Lake table");
        assertQueryThrows("SELECT * FROM hive." + schema + ".nation", "Cannot query Delta Lake table");
    }

    @Test
    public void testShowTables()
    {
        for (String catalog : new String[] {"delta", "hive", "hive_with_redirections", "delta_with_redirections"}) {
            assertQuery("SHOW TABLES FROM " + catalog + "." + schema, "VALUES 'region', 'nation'");
        }

        assertQueryThrows("SHOW CREATE TABLE delta." + schema + ".region", "not a Delta Lake table");
        assertQueryThrows("SHOW CREATE TABLE hive." + schema + ".nation", "Cannot query Delta Lake table");

        assertQueryThrows("DESCRIBE delta." + schema + ".region", "not a Delta Lake table");
        assertQueryThrows("DESCRIBE hive." + schema + ".nation", "Cannot query Delta Lake table");
    }

    @Test
    public void testShowSchemas()
    {
        String expectedSchemaRow = "VALUES '" + schema + "'";
        for (String catalog : new String[] {"hive", "delta", "hive_with_redirections"}) {
            assertThat(query("SHOW SCHEMAS FROM " + catalog))
                    .skippingTypesCheck()
                    .containsAll(expectedSchemaRow);
        }

        assertEquals(showCreateSchema("hive"), getExpectedHiveCreateSchema("hive"));
        assertEquals(showCreateSchema("delta"), getExpectedDeltaLakeCreateSchema("delta"));
        assertEquals(showCreateSchema("hive_with_redirections"), getExpectedHiveCreateSchema("hive_with_redirections"));
        assertEquals(showCreateSchema("delta_with_redirections"), getExpectedDeltaLakeCreateSchema("delta_with_redirections"));
    }

    /**
     * Asserts that {@code information_schema.tables} of the given catalog lists the table
     * under the shared test schema.
     */
    private void assertListedInInformationSchemaTables(String catalog, String table)
    {
        String sql = "SELECT table_schema FROM " + catalog + ".information_schema.tables" +
                " WHERE table_name = '" + table + "' AND table_schema='" + schema + "'";
        assertThat(query(sql))
                .skippingTypesCheck()
                .containsAll("VALUES '" + schema + "'");
    }

    /**
     * Builds the {@code information_schema.columns} query for the shared test schema.
     */
    private String informationSchemaColumnsSql(String catalog)
    {
        return "SELECT table_name, column_name from " + catalog + ".information_schema.columns WHERE table_schema = '" + schema + "'";
    }

    /**
     * Asserts that running the statement throws with a message containing the given fragment.
     */
    private void assertQueryThrows(String sql, String expectedMessageFragment)
    {
        assertThatThrownBy(() -> query(sql))
                .hasMessageContaining(expectedMessageFragment);
    }

    /**
     * Runs {@code SHOW CREATE SCHEMA} for the shared test schema and returns its single value.
     */
    private String showCreateSchema(String catalog)
    {
        return (String) computeActual("SHOW CREATE SCHEMA " + catalog + "." + schema).getOnlyValue();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.log.Logger;
import io.trino.Session;
import io.trino.plugin.hive.HdfsConfig;
import io.trino.plugin.hive.HdfsConfigurationInitializer;
import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.plugin.hive.HiveHdfsConfiguration;
import io.trino.plugin.hive.TestingHivePlugin;
import io.trino.plugin.hive.authentication.NoHdfsAuthentication;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.glue.DefaultGlueColumnStatisticsProviderFactory;
import io.trino.plugin.hive.metastore.glue.GlueHiveMetastore;
import io.trino.plugin.hive.metastore.glue.GlueHiveMetastoreConfig;
import io.trino.plugin.tpch.TpchPlugin;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;
import io.trino.tpch.TpchTable;
import org.testng.annotations.AfterClass;

import java.nio.file.Path;
import java.util.Optional;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME;
import static io.trino.testing.QueryAssertions.copyTpchTables;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;

/**
 * Tests metadata operations on a schema which has a mix of Hive and Delta Lake tables,
 * with all catalogs configured to use the Glue metastore.
 * <p>
 * Requires AWS credentials, which can be provided in any way supported by
 * {@link DefaultAWSCredentialsProviderChain}.
 * See https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default
 */
public class TestDeltaLakeSharedGlueMetastore
        extends BaseDeltaLakeSharedMetastoreTest
{
    private static final Logger LOG = Logger.get(TestDeltaLakeSharedGlueMetastore.class);

    // Both fields are assigned in createQueryRunner() and read by cleanup() and the expected-output helpers
    private Path dataDirectory;
    private HiveMetastore glueMetastore;

    /**
     * Creates a query runner with a {@code tpch} catalog and four catalogs under test:
     * {@code delta}, {@code delta_with_redirections}, {@code hive} and
     * {@code hive_with_redirections}. It then creates the test schema and copies
     * {@code nation} through the Delta Lake session and {@code region} through the Hive session.
     */
    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        Session deltaLakeSession = testSessionBuilder()
                .setCatalog("delta")
                .setSchema(schema)
                .build();
        Session hiveSession = testSessionBuilder()
                .setCatalog("hive")
                .setSchema(schema)
                .build();

        DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(deltaLakeSession).build();

        queryRunner.installPlugin(new TpchPlugin());
        queryRunner.createCatalog("tpch", "tpch");

        this.dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("delta_lake_data");
        // NOTE(review): File.deleteOnExit() only removes a directory that is empty at JVM exit;
        // confirm that the table data under it is cleaned up elsewhere
        this.dataDirectory.toFile().deleteOnExit();

        queryRunner.installPlugin(new DeltaLakePlugin());
        queryRunner.createCatalog(
                "delta",
                "delta-lake",
                ImmutableMap.<String, String>builder()
                        .put("hive.metastore", "glue")
                        .put("hive.metastore.glue.default-warehouse-dir", dataDirectory.toString())
                        .buildOrThrow());
        queryRunner.createCatalog(
                "delta_with_redirections",
                "delta-lake",
                ImmutableMap.<String, String>builder()
                        .put("hive.metastore", "glue")
                        .put("hive.metastore.glue.default-warehouse-dir", dataDirectory.toString())
                        .put("delta.hive-catalog-name", "hive")
                        .buildOrThrow());

        HdfsConfig hdfsConfig = new HdfsConfig();
        HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(
                new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), ImmutableSet.of()),
                hdfsConfig,
                new NoHdfsAuthentication());
        // The same metastore instance backs both Hive catalogs and is reused by cleanup()
        this.glueMetastore = new GlueHiveMetastore(
                hdfsEnvironment,
                new GlueHiveMetastoreConfig(),
                DefaultAWSCredentialsProviderChain.getInstance(),
                directExecutor(),
                new DefaultGlueColumnStatisticsProviderFactory(directExecutor(), directExecutor()),
                Optional.empty(),
                table -> true);
        queryRunner.installPlugin(new TestingHivePlugin(glueMetastore));
        queryRunner.createCatalog("hive", "hive");
        queryRunner.createCatalog(
                "hive_with_redirections",
                "hive",
                ImmutableMap.of("hive.delta-lake-catalog-name", "delta"));

        queryRunner.execute("CREATE SCHEMA " + schema + " WITH (location = '" + dataDirectory + "')");
        copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, deltaLakeSession, ImmutableList.of(TpchTable.NATION));
        copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, hiveSession, ImmutableList.of(TpchTable.REGION));

        return queryRunner;
    }

    /**
     * Drops the test database from the metastore. Failures are logged rather than rethrown,
     * so cleanup problems do not surface as test failures.
     */
    @AfterClass(alwaysRun = true)
    public void cleanup()
    {
        try {
            if (glueMetastore != null) {
                // Only the metastore entry is dropped here (deleteData = false); the data lives on local disk.
                // NOTE(review): the deleteOnExit hook registered in createQueryRunner() will not
                // remove a non-empty directory - confirm the data is cleaned up elsewhere
                glueMetastore.dropDatabase(schema, false);
            }
        }
        catch (Exception e) {
            LOG.error(e, "Failed to clean up Glue database: %s", schema);
        }
    }

    /**
     * Returns the {@code SHOW CREATE SCHEMA} text expected from a Hive catalog, which
     * includes an {@code AUTHORIZATION ROLE public} clause.
     *
     * @param catalogName catalog name to embed in the statement
     */
    @Override
    protected String getExpectedHiveCreateSchema(String catalogName)
    {
        String expectedHiveCreateSchema = "CREATE SCHEMA %s.%s\n" +
                "AUTHORIZATION ROLE public\n" +
                "WITH (\n" +
                "   location = '%s'\n" +
                ")";

        return format(expectedHiveCreateSchema, catalogName, schema, dataDirectory);
    }

    /**
     * Returns the {@code SHOW CREATE SCHEMA} text expected from a Delta Lake catalog,
     * which has no authorization clause.
     *
     * @param catalogName catalog name to embed in the statement
     */
    @Override
    protected String getExpectedDeltaLakeCreateSchema(String catalogName)
    {
        String expectedDeltaLakeCreateSchema = "CREATE SCHEMA %s.%s\n" +
                "WITH (\n" +
                "   location = '%s'\n" +
                ")";
        // Exactly one argument per %s specifier: catalog, schema, location
        return format(expectedDeltaLakeCreateSchema, catalogName, schema, dataDirectory);
    }
}
Loading

0 comments on commit dcaca0a

Please sign in to comment.