
Split HiveTableOperations into two copies
This replaces `HiveTableOperations` with `HiveMetastoreTableOperations`
and `FileMetastoreTableOperations` suitable for `HIVE_METASTORE` and
`TESTING_FILE_METASTORE` Iceberg catalogs respectively, along with
necessary interfaces.
findepi committed Oct 15, 2021
1 parent 8a59a45 commit 63d4340
Showing 14 changed files with 265 additions and 54 deletions.
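
In outline: the commit introduces an `IcebergTableOperations` interface (Iceberg's `TableOperations` plus an `initializeFromMetadata` hook) and an `IcebergTableOperationsProvider` factory interface, both in the new `io.trino.plugin.iceberg.catalog` package. The old `HiveTableOperations` becomes the abstract base `AbstractMetastoreTableOperations`, with its fields and helpers widened from private to protected; `FileMetastoreTableOperations` (shown below) and `HiveMetastoreTableOperations` (among the changed files not included in this excerpt) are its concrete subclasses.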
@@ -21,6 +21,8 @@
import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore;
import io.trino.plugin.hive.metastore.cache.CachingHiveMetastoreModule;
import io.trino.plugin.hive.metastore.cache.ForCachingHiveMetastore;
+import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
+import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider;
import io.trino.plugin.iceberg.catalog.file.IcebergFileMetastoreCatalogModule;
import io.trino.plugin.iceberg.catalog.hms.IcebergHiveMetastoreCatalogModule;

@@ -49,7 +51,7 @@ protected void setup(Binder binder)
if (metastore.isPresent()) {
binder.bind(HiveMetastore.class).annotatedWith(ForCachingHiveMetastore.class).toInstance(metastore.get());
install(new CachingHiveMetastoreModule());
-binder.bind(HiveTableOperationsProvider.class).in(Scopes.SINGLETON);
+binder.bind(IcebergTableOperationsProvider.class).to(FileMetastoreTableOperationsProvider.class).in(Scopes.SINGLETON);
}
else {
bindCatalogModule(HIVE_METASTORE, new IcebergHiveMetastoreCatalogModule());
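With the binding above, any class that injects the `IcebergTableOperationsProvider` interface receives `FileMetastoreTableOperationsProvider` whenever a pre-built metastore instance is supplied. A minimal hypothetical consumer (the class name is illustrative, not part of this commit):

import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;

import javax.inject.Inject;

import static java.util.Objects.requireNonNull;

// Hypothetical consumer, for illustration only: Guice supplies
// FileMetastoreTableOperationsProvider through the binding above.
public class ExampleConsumer
{
    private final IcebergTableOperationsProvider tableOperationsProvider;

    @Inject
    public ExampleConsumer(IcebergTableOperationsProvider tableOperationsProvider)
    {
        this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null");
    }
}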
IcebergUtil.java

@@ -18,6 +18,8 @@
import io.airlift.slice.SliceUtf8;
import io.trino.plugin.hive.HdfsEnvironment.HdfsContext;
import io.trino.plugin.hive.metastore.HiveMetastore;
+import io.trino.plugin.iceberg.catalog.IcebergTableOperations;
+import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorSession;
@@ -107,7 +109,7 @@
import static org.apache.iceberg.types.Type.TypeID.BINARY;
import static org.apache.iceberg.types.Type.TypeID.FIXED;

-final class IcebergUtil
+public final class IcebergUtil
{
private static final Pattern SIMPLE_NAME = Pattern.compile("[a-z][a-z0-9]*");

@@ -118,7 +120,7 @@ public static boolean isIcebergTable(io.trino.plugin.hive.metastore.Table table)
return ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(table.getParameters().get(TABLE_TYPE_PROP));
}

-public static Table loadIcebergTable(HiveMetastore metastore, HiveTableOperationsProvider tableOperationsProvider, ConnectorSession session, SchemaTableName table)
+public static Table loadIcebergTable(HiveMetastore metastore, IcebergTableOperationsProvider tableOperationsProvider, ConnectorSession session, SchemaTableName table)
{
TableOperations operations = tableOperationsProvider.createTableOperations(
metastore,
@@ -134,12 +136,12 @@ public static Table loadIcebergTable(HiveMetastore metastore, HiveTableOperation

public static Table getIcebergTableWithMetadata(
HiveMetastore metastore,
-HiveTableOperationsProvider tableOperationsProvider,
+IcebergTableOperationsProvider tableOperationsProvider,
ConnectorSession session,
SchemaTableName table,
TableMetadata tableMetadata)
{
-HiveTableOperations operations = (HiveTableOperations) tableOperationsProvider.createTableOperations(
+IcebergTableOperations operations = tableOperationsProvider.createTableOperations(
metastore,
new HdfsContext(session),
session.getQueryId(),
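Since `IcebergUtil` is now public, callers outside `io.trino.plugin.iceberg` can reach it. A hypothetical call site for the updated `loadIcebergTable` signature (the enclosing method and the table name are illustrative):

import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.iceberg.IcebergUtil;
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
import org.apache.iceberg.Table;

// Hypothetical method inside some caller class; metastore and
// tableOperationsProvider would be injected collaborators in real code.
Table loadExample(HiveMetastore metastore, IcebergTableOperationsProvider tableOperationsProvider, ConnectorSession session)
{
    return IcebergUtil.loadIcebergTable(
            metastore,
            tableOperationsProvider,
            session,
            new SchemaTableName("examples", "orders")); // illustrative table
}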
TrinoCatalogFactory.java

@@ -31,6 +31,7 @@
import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.plugin.hive.NodeVersion;
import io.trino.plugin.hive.metastore.HiveMetastore;
+import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
import io.trino.spi.TrinoException;
import io.trino.spi.type.TypeManager;

@@ -46,7 +47,7 @@ public class TrinoCatalogFactory
private final HiveMetastore metastore;
private final HdfsEnvironment hdfsEnvironment;
private final TypeManager typeManager;
-private final HiveTableOperationsProvider tableOperationsProvider;
+private final IcebergTableOperationsProvider tableOperationsProvider;
private final String trinoVersion;
private final CatalogType catalogType;
private final boolean isUniqueTableLocation;
@@ -58,7 +59,7 @@ public TrinoCatalogFactory(
HiveMetastore metastore,
HdfsEnvironment hdfsEnvironment,
TypeManager typeManager,
-HiveTableOperationsProvider tableOperationsProvider,
+IcebergTableOperationsProvider tableOperationsProvider,
NodeVersion nodeVersion)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
TrinoHiveCatalog.java

@@ -60,6 +60,7 @@
import io.trino.plugin.hive.metastore.HivePrincipal;
import io.trino.plugin.hive.metastore.PrincipalPrivileges;
import io.trino.plugin.hive.util.HiveUtil;
+import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.CatalogSchemaTableName;
import io.trino.spi.connector.ColumnMetadata;
@@ -152,7 +153,7 @@ class TrinoHiveCatalog
private final HiveMetastore metastore;
private final HdfsEnvironment hdfsEnvironment;
private final TypeManager typeManager;
-private final HiveTableOperationsProvider tableOperationsProvider;
+private final IcebergTableOperationsProvider tableOperationsProvider;
private final String trinoVersion;
private final boolean useUniqueTableLocation;

@@ -164,7 +165,7 @@ public TrinoHiveCatalog(
HiveMetastore metastore,
HdfsEnvironment hdfsEnvironment,
TypeManager typeManager,
-HiveTableOperationsProvider tableOperationsProvider,
+IcebergTableOperationsProvider tableOperationsProvider,
String trinoVersion,
boolean useUniqueTableLocation)
{
HiveTableOperations.java → AbstractMetastoreTableOperations.java

@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.trino.plugin.iceberg;
+package io.trino.plugin.iceberg.catalog;

import io.airlift.log.Logger;
import io.trino.plugin.hive.authentication.HiveIdentity;
@@ -20,6 +20,7 @@
import io.trino.plugin.hive.metastore.PrincipalPrivileges;
import io.trino.plugin.hive.metastore.StorageFormat;
import io.trino.plugin.hive.metastore.Table;
+import io.trino.plugin.iceberg.UnknownTableTypeException;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
@@ -30,7 +31,6 @@
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
-import org.apache.iceberg.TableOperations;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.io.FileIO;
@@ -69,34 +69,41 @@
import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION;

@NotThreadSafe
-public class HiveTableOperations
-implements TableOperations
+public abstract class AbstractMetastoreTableOperations
+implements IcebergTableOperations
{
-private static final Logger log = Logger.get(HiveTableOperations.class);
+private static final Logger log = Logger.get(AbstractMetastoreTableOperations.class);

public static final String METADATA_LOCATION = "metadata_location";
public static final String PREVIOUS_METADATA_LOCATION = "previous_metadata_location";
-private static final String METADATA_FOLDER_NAME = "metadata";
+protected static final String METADATA_FOLDER_NAME = "metadata";

-private static final StorageFormat STORAGE_FORMAT = StorageFormat.create(
+protected static final StorageFormat STORAGE_FORMAT = StorageFormat.create(
LazySimpleSerDe.class.getName(),
FileInputFormat.class.getName(),
FileOutputFormat.class.getName());

-private final HiveMetastore metastore;
-private final ConnectorSession session;
-private final String database;
-private final String tableName;
-private final Optional<String> owner;
-private final Optional<String> location;
-private final FileIO fileIo;
-
-private TableMetadata currentMetadata;
-private String currentMetadataLocation;
-private boolean shouldRefresh = true;
-private int version = -1;
-
-HiveTableOperations(FileIO fileIo, HiveMetastore metastore, ConnectorSession session, String database, String table, Optional<String> owner, Optional<String> location)
+protected final HiveMetastore metastore;
+protected final ConnectorSession session;
+protected final String database;
+protected final String tableName;
+protected final Optional<String> owner;
+protected final Optional<String> location;
+protected final FileIO fileIo;
+
+protected TableMetadata currentMetadata;
+protected String currentMetadataLocation;
+protected boolean shouldRefresh = true;
+protected int version = -1;
+
+protected AbstractMetastoreTableOperations(
+FileIO fileIo,
+HiveMetastore metastore,
+ConnectorSession session,
+String database,
+String table,
+Optional<String> owner,
+Optional<String> location)
{
this.fileIo = requireNonNull(fileIo, "fileIo is null");
this.metastore = requireNonNull(metastore, "metastore is null");
@@ -107,6 +114,7 @@ public class HiveTableOperations
this.location = requireNonNull(location, "location is null");
}

+@Override
public void initializeFromMetadata(TableMetadata tableMetadata)
{
checkState(currentMetadata == null, "already initialized");
@@ -263,18 +271,18 @@ public LocationProvider locationProvider()
return getLocationProvider(getSchemaTableName(), metadata.location(), metadata.properties());
}

-private Table getTable()
+protected Table getTable()
{
return metastore.getTable(new HiveIdentity(session), database, tableName)
.orElseThrow(() -> new TableNotFoundException(getSchemaTableName()));
}

-private SchemaTableName getSchemaTableName()
+protected SchemaTableName getSchemaTableName()
{
return new SchemaTableName(database, tableName);
}

-private String writeNewMetadata(TableMetadata metadata, int newVersion)
+protected String writeNewMetadata(TableMetadata metadata, int newVersion)
{
String newTableMetadataFilePath = newTableMetadataFilePath(metadata, newVersion);
OutputFile newMetadataLocation = fileIo.newOutputFile(newTableMetadataFilePath);
@@ -285,7 +293,7 @@ private String writeNewMetadata(TableMetadata metadata, int newVersion)
return newTableMetadataFilePath;
}

-private void refreshFromMetadataLocation(String newLocation)
+protected void refreshFromMetadataLocation(String newLocation)
{
// use null-safe equality check because new tables have a null metadata location
if (Objects.equals(currentMetadataLocation, newLocation)) {
@@ -312,13 +320,13 @@ private void refreshFromMetadataLocation(String newLocation)
shouldRefresh = false;
}

-private static String newTableMetadataFilePath(TableMetadata meta, int newVersion)
+protected static String newTableMetadataFilePath(TableMetadata meta, int newVersion)
{
String codec = meta.property(METADATA_COMPRESSION, METADATA_COMPRESSION_DEFAULT);
return metadataFileLocation(meta, format("%05d-%s%s", newVersion, randomUUID(), getFileExtension(codec)));
}

-private static String metadataFileLocation(TableMetadata metadata, String filename)
+protected static String metadataFileLocation(TableMetadata metadata, String filename)
{
String location = metadata.properties().get(WRITE_METADATA_LOCATION);
if (location != null) {
Expand All @@ -327,7 +335,7 @@ private static String metadataFileLocation(TableMetadata metadata, String filena
return format("%s/%s/%s", metadata.location(), METADATA_FOLDER_NAME, filename);
}

-private static int parseVersion(String metadataLocation)
+protected static int parseVersion(String metadataLocation)
{
int versionStart = metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this will be 0
int versionEnd = metadataLocation.indexOf('-', versionStart);
@@ -340,7 +348,7 @@ private static int parseVersion(String metadataLocation)
}
}

-private static List<Column> toHiveColumns(List<NestedField> columns)
+protected static List<Column> toHiveColumns(List<NestedField> columns)
{
return columns.stream()
.map(column -> new Column(
IcebergTableOperations.java (new)

@@ -0,0 +1,23 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.catalog;

import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;

public interface IcebergTableOperations
extends TableOperations
{
void initializeFromMetadata(TableMetadata tableMetadata);
}
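
`initializeFromMetadata` is the hook that `IcebergUtil.getIcebergTableWithMetadata` (above) relies on: when the caller already holds a `TableMetadata`, the operations object is seeded directly and the initial refresh from the metastore is skipped. A condensed sketch of that pattern; the trailing `createTableOperations` arguments and the `BaseTable` wrapping are assumptions based on the surrounding code, not copied from this excerpt:

// Seed the operations with metadata we already hold, then wrap them in a
// BaseTable; no metastore round-trip is needed to read the current state.
IcebergTableOperations operations = tableOperationsProvider.createTableOperations(
        metastore,
        new HdfsContext(session),
        session.getQueryId(),
        session,
        table.getSchemaName(),
        table.getTableName(),
        Optional.empty(),   // owner (assumed for the sketch)
        Optional.empty());  // location (assumed for the sketch)
operations.initializeFromMetadata(tableMetadata);
Table icebergTable = new BaseTable(operations, table.toString()); // display name simplified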
IcebergTableOperationsProvider.java (new)

@@ -0,0 +1,33 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.catalog;

import io.trino.plugin.hive.HdfsEnvironment.HdfsContext;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.spi.connector.ConnectorSession;

import java.util.Optional;

public interface IcebergTableOperationsProvider
{
IcebergTableOperations createTableOperations(
HiveMetastore hiveMetastore,
HdfsContext hdfsContext,
String queryId,
ConnectorSession session,
String database,
String table,
Optional<String> owner,
Optional<String> location);
}
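
A sketch of what an implementation of this interface can look like. The real `FileMetastoreTableOperationsProvider` is referenced in the module wiring above but its file is not part of this excerpt, so the class below, and in particular how the `FileIO` is obtained, is an assumption for illustration:

import io.trino.plugin.hive.HdfsEnvironment.HdfsContext;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperations;
import io.trino.spi.connector.ConnectorSession;
import org.apache.iceberg.io.FileIO;

import java.util.Optional;
import java.util.function.BiFunction;

// Hypothetical provider implementation; fileIoFactory stands in for however
// the real code builds a FileIO from the HDFS context and query id.
public class ExampleFileMetastoreTableOperationsProvider
        implements IcebergTableOperationsProvider
{
    private final BiFunction<HdfsContext, String, FileIO> fileIoFactory;

    public ExampleFileMetastoreTableOperationsProvider(BiFunction<HdfsContext, String, FileIO> fileIoFactory)
    {
        this.fileIoFactory = fileIoFactory;
    }

    @Override
    public IcebergTableOperations createTableOperations(
            HiveMetastore hiveMetastore,
            HdfsContext hdfsContext,
            String queryId,
            ConnectorSession session,
            String database,
            String table,
            Optional<String> owner,
            Optional<String> location)
    {
        return new FileMetastoreTableOperations(
                fileIoFactory.apply(hdfsContext, queryId), // assumed FileIO creation
                hiveMetastore,
                session,
                database,
                table,
                owner,
                location);
    }
}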
FileMetastoreTableOperations.java (new)

@@ -0,0 +1,40 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.catalog.file;

import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.iceberg.catalog.AbstractMetastoreTableOperations;
import io.trino.spi.connector.ConnectorSession;
import org.apache.iceberg.io.FileIO;

import javax.annotation.concurrent.NotThreadSafe;

import java.util.Optional;

@NotThreadSafe
public class FileMetastoreTableOperations
extends AbstractMetastoreTableOperations
{
public FileMetastoreTableOperations(
FileIO fileIo,
HiveMetastore metastore,
ConnectorSession session,
String database,
String table,
Optional<String> owner,
Optional<String> location)
{
super(fileIo, metastore, session, database, table, owner, location);
}
}
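
In this commit `FileMetastoreTableOperations` adds no behavior of its own; it simply gives the abstract base a concrete, instantiable form for the file-backed testing metastore, while any metastore-specific behavior belongs in the sibling `HiveMetastoreTableOperations` (among the changed files not included in this excerpt). Direct construction, e.g. from a test (argument values are illustrative):

// Hypothetical direct construction; production code obtains instances
// through FileMetastoreTableOperationsProvider instead.
IcebergTableOperations operations = new FileMetastoreTableOperations(
        fileIo,            // an org.apache.iceberg.io.FileIO from the test harness
        metastore,         // a file-backed HiveMetastore
        session,           // a ConnectorSession
        "testdb",          // illustrative database name
        "orders",          // illustrative table name
        Optional.empty(),  // owner
        Optional.empty()); // location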