Fix fetching Hadoop configuration in Iceberg #1459

Merged · 2 commits · Sep 12, 2019

1 change: 0 additions & 1 deletion presto-iceberg/README.md
@@ -77,7 +77,6 @@ as a time travel feature which lets you query your table's snapshot at a given t
filtered on a property name, similar to how view listing works in `ThriftHiveMetastore`.
* Predicate pushdown is currently broken, which means delete is also broken. The code from the
original `getTableLayouts()` implementation needs to be updated for `applyFilter()`.
-* All of the `HdfsContext` calls that use `/tmp` need to be fixed.
* `HiveConfig` needs to be removed. We might need to split out separate config classes in the
Hive connector for the components that are reused in Iceberg.
* We should try to remove `HiveColumnHandle`. This will require replacing or abstracting
@@ -0,0 +1,69 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.plugin.iceberg;

import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext;
import io.prestosql.spi.PrestoException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.hadoop.HadoopOutputFile;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;

import java.io.IOException;

import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
import static java.util.Objects.requireNonNull;

public class HdfsFileIo
        implements FileIO
{
    private final HdfsEnvironment environment;
    private final HdfsContext context;

    public HdfsFileIo(HdfsEnvironment environment, HdfsContext context)
    {
        this.environment = requireNonNull(environment, "environment is null");
        this.context = requireNonNull(context, "context is null");
    }

    @Override
    public InputFile newInputFile(String path)
    {
        Configuration configuration = environment.getConfiguration(context, new Path(path));
        return HadoopInputFile.fromLocation(path, configuration);
    }

    @Override
    public OutputFile newOutputFile(String path)
    {
        Configuration configuration = environment.getConfiguration(context, new Path(path));
        return HadoopOutputFile.fromPath(new Path(path), configuration);
    }

    @Override
    public void deleteFile(String pathString)
    {
        Path path = new Path(pathString);
        try {
            environment.getFileSystem(context, path).delete(path, false);
        }
        catch (IOException e) {
            throw new PrestoException(HIVE_FILESYSTEM_ERROR, "Failed to delete file: " + path, e);
        }
    }
}
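For orientation, a minimal usage sketch of the new `FileIO` implementation follows. The variable names and the metadata path are hypothetical; it assumes an injected `HdfsEnvironment` and an active `ConnectorSession`, as in the call sites changed below.

```java
// Hypothetical wiring; "session" and "hdfsEnvironment" are assumed to be in scope.
HdfsContext context = new HdfsContext(session, "iceberg_schema", "orders");
FileIO fileIo = new HdfsFileIo(hdfsEnvironment, context);

// The Hadoop Configuration is resolved for the actual file being read,
// rather than for a hard-coded file:///tmp path.
InputFile metadataFile = fileIo.newInputFile("hdfs://namenode/warehouse/orders/metadata/00000.metadata.json");
long length = metadataFile.getLength();
```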
@@ -14,6 +14,8 @@
package io.prestosql.plugin.iceberg;

import io.airlift.log.Logger;
+import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext;
import io.prestosql.plugin.hive.HiveType;
import io.prestosql.plugin.hive.metastore.Column;
import io.prestosql.plugin.hive.metastore.HiveMetastore;
@@ -23,7 +25,6 @@
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.connector.TableNotFoundException;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.mapred.FileInputFormat;
@@ -33,7 +34,6 @@
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.exceptions.CommitFailedException;
-import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.hive.HiveTypeConverter;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
@@ -92,24 +92,24 @@ public class HiveTableOperations
private boolean shouldRefresh = true;
private int version = -1;

-public HiveTableOperations(Configuration configuration, HiveMetastore metastore, String database, String table)
+public HiveTableOperations(HiveMetastore metastore, HdfsEnvironment hdfsEnvironment, HdfsContext hdfsContext, String database, String table)
{
-this(configuration, metastore, database, table, Optional.empty(), Optional.empty());
+this(new HdfsFileIo(hdfsEnvironment, hdfsContext), metastore, database, table, Optional.empty(), Optional.empty());
}

-public HiveTableOperations(Configuration configuration, HiveMetastore metastore, String database, String table, String owner, String location)
+public HiveTableOperations(HiveMetastore metastore, HdfsEnvironment hdfsEnvironment, HdfsContext hdfsContext, String database, String table, String owner, String location)
{
-this(configuration,
+this(new HdfsFileIo(hdfsEnvironment, hdfsContext),
metastore,
database,
table,
Optional.of(requireNonNull(owner, "owner is null")),
Optional.of(requireNonNull(location, "location is null")));
}

-private HiveTableOperations(Configuration configuration, HiveMetastore metastore, String database, String table, Optional<String> owner, Optional<String> location)
+private HiveTableOperations(FileIO fileIo, HiveMetastore metastore, String database, String table, Optional<String> owner, Optional<String> location)
{
-this.fileIo = new HadoopFileIO(configuration);
+this.fileIo = requireNonNull(fileIo, "fileIo is null");
this.metastore = requireNonNull(metastore, "metastore is null");
this.database = requireNonNull(database, "database is null");
this.tableName = requireNonNull(table, "table is null");
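Net effect of the constructor changes above: callers no longer hand `HiveTableOperations` a Hadoop `Configuration`; they pass the metastore plus the HDFS environment and context, and the operations object wraps them in an `HdfsFileIo`. Roughly, the new construction path looks like this (mirroring `IcebergUtil.getIcebergTable` further down in this PR; `schemaName` and `tableName` are placeholders):

```java
// Sketch of the updated call shape, not a verbatim excerpt.
HdfsContext context = new HdfsContext(session, schemaName, tableName);
TableOperations operations = new HiveTableOperations(metastore, hdfsEnvironment, context, schemaName, tableName);
org.apache.iceberg.Table icebergTable = new BaseTable(operations, schemaName + "." + tableName);
```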
@@ -150,8 +150,7 @@ public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTabl
{
IcebergTableHandle table = IcebergTableHandle.from(tableName);
if (table.getTableType() == TableType.PARTITIONS) {
-Configuration configuration = getConfiguration(session, tableName.getSchemaName());
-org.apache.iceberg.Table icebergTable = getIcebergTable(table.getSchemaName(), table.getTableName(), configuration, metastore);
+org.apache.iceberg.Table icebergTable = getIcebergTable(metastore, hdfsEnvironment, session, table.getSchemaTableName());
return Optional.of(new PartitionTable(table, session, typeManager, icebergTable));
}
return Optional.empty();
@@ -187,8 +186,7 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
{
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
-Configuration configuration = getConfiguration(session, table.getSchemaName());
-org.apache.iceberg.Table icebergTable = getIcebergTable(table.getSchemaName(), table.getTableName(), configuration, metastore);
+org.apache.iceberg.Table icebergTable = getIcebergTable(metastore, hdfsEnvironment, session, table.getSchemaTableName());
List<HiveColumnHandle> columns = getColumns(icebergTable.schema(), icebergTable.spec(), typeManager);
return columns.stream().collect(toMap(HiveColumnHandle::getName, identity()));
}
@@ -284,15 +282,14 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
.orElseThrow(() -> new SchemaNotFoundException(schemaName));

HdfsContext hdfsContext = new HdfsContext(session, schemaName, tableName);
-Path targetPath = getTableDefaultLocation(database, hdfsContext, hdfsEnvironment, schemaName, tableName);
-Configuration configuration = hdfsEnvironment.getConfiguration(hdfsContext, targetPath);
+String targetPath = getTableDefaultLocation(database, hdfsContext, hdfsEnvironment, schemaName, tableName).toString();

-TableOperations operations = new HiveTableOperations(configuration, metastore, schemaName, tableName, session.getUser(), targetPath.toString());
+TableOperations operations = new HiveTableOperations(metastore, hdfsEnvironment, hdfsContext, schemaName, tableName, session.getUser(), targetPath);
if (operations.current() != null) {
throw new TableAlreadyExistsException(schemaTableName);
}

-TableMetadata metadata = newTableMetadata(operations, schema, partitionSpec, targetPath.toString());
+TableMetadata metadata = newTableMetadata(operations, schema, partitionSpec, targetPath);

transaction = createTableTransaction(operations, metadata);

@@ -302,7 +299,7 @@
SchemaParser.toJson(metadata.schema()),
PartitionSpecParser.toJson(metadata.spec()),
getColumns(metadata.schema(), metadata.spec(), typeManager),
-targetPath.toString(),
+targetPath,
getFileFormat(tableMetadata.getProperties()));
}

@@ -316,8 +313,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle)
{
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
-Configuration configuration = getConfiguration(session, table.getSchemaName());
-org.apache.iceberg.Table icebergTable = getIcebergTable(table.getSchemaName(), table.getTableName(), configuration, metastore);
+org.apache.iceberg.Table icebergTable = getIcebergTable(metastore, hdfsEnvironment, session, table.getSchemaTableName());

for (NestedField column : icebergTable.schema().columns()) {
io.prestosql.spi.type.Type type = toPrestoType(column.type(), typeManager);
@@ -358,8 +354,11 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,

AppendFiles appendFiles = transaction.newFastAppend();
for (CommitTaskData task : commitTasks) {
+HdfsContext context = new HdfsContext(session, table.getSchemaName(), table.getTableName());
+Configuration configuration = hdfsEnvironment.getConfiguration(context, new Path(task.getPath()));

DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
-.withInputFile(HadoopInputFile.fromLocation(task.getPath(), getConfiguration(session, table.getSchemaName())))
+.withInputFile(HadoopInputFile.fromLocation(task.getPath(), configuration))
.withFormat(table.getFileFormat())
.withMetrics(task.getMetrics().metrics());

@@ -405,8 +404,7 @@ public void renameTable(ConnectorSession session, ConnectorTableHandle tableHand
public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata column)
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
-Configuration configuration = getConfiguration(session, handle.getSchemaName());
-org.apache.iceberg.Table icebergTable = getIcebergTable(handle.getSchemaName(), handle.getTableName(), configuration, metastore);
+org.apache.iceberg.Table icebergTable = getIcebergTable(metastore, hdfsEnvironment, session, handle.getSchemaTableName());
icebergTable.updateSchema().addColumn(column.getName(), toIcebergType(column.getType())).commit();
}

@@ -415,8 +413,7 @@ public void dropColumn(ConnectorSession session, ConnectorTableHandl
{
IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle;
HiveColumnHandle handle = (HiveColumnHandle) column;
-Configuration configuration = getConfiguration(session, icebergTableHandle.getSchemaName());
-org.apache.iceberg.Table icebergTable = getIcebergTable(icebergTableHandle.getSchemaName(), icebergTableHandle.getTableName(), configuration, metastore);
+org.apache.iceberg.Table icebergTable = getIcebergTable(metastore, hdfsEnvironment, session, icebergTableHandle.getSchemaTableName());
icebergTable.updateSchema().deleteColumn(handle.getName()).commit();
}

@@ -425,23 +422,17 @@ public void renameColumn(ConnectorSession session, ConnectorTableHan
{
IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle;
HiveColumnHandle columnHandle = (HiveColumnHandle) source;
-Configuration configuration = getConfiguration(session, icebergTableHandle.getSchemaName());
-org.apache.iceberg.Table icebergTable = getIcebergTable(icebergTableHandle.getSchemaName(), icebergTableHandle.getTableName(), configuration, metastore);
+org.apache.iceberg.Table icebergTable = getIcebergTable(metastore, hdfsEnvironment, session, icebergTableHandle.getSchemaTableName());
icebergTable.updateSchema().renameColumn(columnHandle.getName(), target).commit();
}

-private ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName schemaTableName)
+private ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName table)
{
-String schema = schemaTableName.getSchemaName();
-String tableName = schemaTableName.getTableName();

-if (!metastore.getTable(schema, tableName).isPresent()) {
-throw new TableNotFoundException(schemaTableName);
+if (!metastore.getTable(table.getSchemaName(), table.getTableName()).isPresent()) {
+throw new TableNotFoundException(table);
}

-Configuration configuration = hdfsEnvironment.getConfiguration(new HdfsContext(session, schema), new Path("file:///tmp"));

-org.apache.iceberg.Table icebergTable = getIcebergTable(schema, tableName, configuration, metastore);
+org.apache.iceberg.Table icebergTable = getIcebergTable(metastore, hdfsEnvironment, session, table);

List<ColumnMetadata> columns = getColumnMetadatas(icebergTable);

@@ -451,7 +442,7 @@ private ConnectorTableMetadata getTableMetadata(ConnectorSession session, Schema
properties.put(PARTITIONING_PROPERTY, toPartitionFields(icebergTable.spec()));
}

-return new ConnectorTableMetadata(schemaTableName, columns, properties.build(), Optional.empty());
+return new ConnectorTableMetadata(table, columns, properties.build(), Optional.empty());
}

private List<ColumnMetadata> getColumnMetadatas(org.apache.iceberg.Table table)
@@ -479,11 +470,6 @@ private static Schema toIcebergSchema(List<ColumnMetadata> columns)
return TypeUtil.assignFreshIds(schema, nextFieldId::getAndIncrement);
}

-private Configuration getConfiguration(ConnectorSession session, String schemaName)
-{
-return hdfsEnvironment.getConfiguration(new HdfsContext(session, schemaName), new Path("file:///tmp"));
-}

@Override
public ColumnHandle getUpdateRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
{
@@ -507,8 +493,8 @@ public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;

-Configuration configuration = getConfiguration(session, handle.getSchemaName());
-org.apache.iceberg.Table icebergTable = getIcebergTable(handle.getSchemaName(), handle.getTableName(), configuration, metastore);
+org.apache.iceberg.Table icebergTable = getIcebergTable(metastore, hdfsEnvironment, session, handle.getSchemaTableName());

icebergTable.newDelete()
.deleteFromRowFilter(toIcebergExpression(handle.getPredicate(), session))
.commit();
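The repeated edit in this file is the same substitution: every call site that previously resolved a Hadoop `Configuration` against the dummy path `file:///tmp` (through the now-deleted `getConfiguration` helper) instead passes the metastore, `HdfsEnvironment`, and session to `getIcebergTable`, which defers configuration lookup to the real file paths. Schematically:

```java
// Before (simplified): configuration resolved against a dummy local path
Configuration configuration = hdfsEnvironment.getConfiguration(new HdfsContext(session, schemaName), new Path("file:///tmp"));
org.apache.iceberg.Table icebergTable = getIcebergTable(schemaName, tableName, configuration, metastore);

// After (simplified): configuration is resolved per actual path inside HdfsFileIo
org.apache.iceberg.Table icebergTable = getIcebergTable(metastore, hdfsEnvironment, session, schemaTableName);
```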
@@ -14,16 +14,13 @@
package io.prestosql.plugin.iceberg;

import io.prestosql.plugin.hive.HdfsEnvironment;
-import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext;
import io.prestosql.plugin.hive.metastore.HiveMetastore;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorSplitManager;
import io.prestosql.spi.connector.ConnectorSplitSource;
import io.prestosql.spi.connector.ConnectorTableHandle;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
import io.prestosql.spi.connector.classloader.ClassLoaderSafeConnectorSplitSource;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;

@@ -52,8 +49,7 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, Co
IcebergTableHandle table = (IcebergTableHandle) handle;

HiveMetastore metastore = transactionManager.get(transaction).getMetastore();
-Configuration configuration = hdfsEnvironment.getConfiguration(new HdfsContext(session, table.getSchemaName()), new Path("file:///tmp"));
-Table icebergTable = getIcebergTable(table.getSchemaName(), table.getTableName(), configuration, metastore);
+Table icebergTable = getIcebergTable(metastore, hdfsEnvironment, session, table.getSchemaTableName());

TableScan tableScan = getTableScan(session, table.getPredicate(), table.getSnapshotId(), icebergTable);

@@ -15,16 +15,18 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext;
import io.prestosql.plugin.hive.HiveColumnHandle;
import io.prestosql.plugin.hive.HiveColumnHandle.ColumnType;
import io.prestosql.plugin.hive.HiveType;
import io.prestosql.plugin.hive.HiveTypeTranslator;
import io.prestosql.plugin.hive.TypeTranslator;
import io.prestosql.plugin.hive.metastore.HiveMetastore;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.spi.type.TypeManager;
-import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
@@ -66,10 +68,11 @@ public static boolean isIcebergTable(io.prestosql.plugin.hive.metastore.Table ta
return ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(table.getParameters().get(TABLE_TYPE_PROP));
}

-public static Table getIcebergTable(String database, String tableName, Configuration configuration, HiveMetastore metastore)
+public static Table getIcebergTable(HiveMetastore metastore, HdfsEnvironment hdfsEnvironment, ConnectorSession session, SchemaTableName table)
{
-TableOperations operations = new HiveTableOperations(configuration, metastore, database, tableName);
-return new BaseTable(operations, database + "." + tableName);
+HdfsContext context = new HdfsContext(session, table.getSchemaName(), table.getTableName());
+TableOperations operations = new HiveTableOperations(metastore, hdfsEnvironment, context, table.getSchemaName(), table.getTableName());
+return new BaseTable(operations, table.getSchemaName() + "." + table.getTableName());
}

public static List<HiveColumnHandle> getColumns(Schema schema, PartitionSpec spec, TypeManager typeManager)