Fix HDFS impersonation in Iceberg Connector
lxynov authored and electrum committed Jul 29, 2020
1 parent e313142 commit 6d7f350
Showing 8 changed files with 182 additions and 34 deletions.
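The commit applies one pattern throughout the connector: every HDFS operation (creating, opening, and deleting files, reading file status and footers) is wrapped in HdfsEnvironment.doAs(user, ...) so that it runs under the querying session user's identity rather than the Presto server's principal, either directly at the call site or through the new HdfsInputFile/HdfsOutputFile wrappers. The sketch below illustrates the idea only; ImpersonationSketch and its doAs signature are hypothetical stand-ins, not the HdfsEnvironment API from this repository, and it assumes Hadoop's UserGroupInformation proxy-user mechanism.

import java.io.IOException;
import java.io.UncheckedIOException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;

import org.apache.hadoop.security.UserGroupInformation;

// Hypothetical helper, for illustration only: executes an action as a Hadoop
// proxy user so that HDFS sees the end user's identity, which is the effect
// the doAs(...) calls in this commit rely on.
public final class ImpersonationSketch
{
    private ImpersonationSketch() {}

    public static <T> T doAs(String user, Callable<T> action)
    {
        try {
            // Proxy the end user on top of the service principal the server is
            // logged in as (the principal must be granted proxy-user rights in
            // the Hadoop configuration).
            UserGroupInformation proxy = UserGroupInformation.createProxyUser(
                    user, UserGroupInformation.getLoginUser());
            return proxy.doAs((PrivilegedExceptionAction<T>) action::call);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while impersonating " + user, e);
        }
    }
}

A call such as doAs(session.getUser(), () -> fileSystem.getFileStatus(path)) would then evaluate the filesystem access while impersonating the session user, mirroring the wrapped calls in the diff below.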
@@ -16,17 +16,14 @@
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext;
import io.prestosql.spi.PrestoException;
- import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
- import org.apache.iceberg.hadoop.HadoopInputFile;
- import org.apache.iceberg.hadoop.HadoopOutputFile;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;

import java.io.IOException;

- import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
+ import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static java.util.Objects.requireNonNull;

public class HdfsFileIo
@@ -44,26 +41,24 @@ public HdfsFileIo(HdfsEnvironment environment, HdfsContext context)
@Override
public InputFile newInputFile(String path)
{
- Configuration configuration = environment.getConfiguration(context, new Path(path));
- return HadoopInputFile.fromLocation(path, configuration);
+ return new HdfsInputFile(new Path(path), environment, context);
}

@Override
public OutputFile newOutputFile(String path)
{
- Configuration configuration = environment.getConfiguration(context, new Path(path));
- return HadoopOutputFile.fromPath(new Path(path), configuration);
+ return new HdfsOutputFile(new Path(path), environment, context);
}

@Override
public void deleteFile(String pathString)
{
Path path = new Path(pathString);
try {
- environment.getFileSystem(context, path).delete(path, false);
+ environment.doAs(context.getIdentity().getUser(), () -> environment.getFileSystem(context, path).delete(path, false));
}
catch (IOException e) {
- throw new PrestoException(HIVE_FILESYSTEM_ERROR, "Failed to delete file: " + path, e);
+ throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, "Failed to delete file: " + path, e);
}
}
}
@@ -0,0 +1,73 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.plugin.iceberg;

import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext;
import io.prestosql.spi.PrestoException;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SeekableInputStream;

import java.io.IOException;

import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static java.util.Objects.requireNonNull;

public class HdfsInputFile
implements InputFile
{
private final InputFile delegate;
private final HdfsEnvironment environment;
private final String user;

public HdfsInputFile(Path path, HdfsEnvironment environment, HdfsContext context)
{
requireNonNull(path, "path is null");
this.environment = requireNonNull(environment, "environment is null");
requireNonNull(context, "context is null");
try {
this.delegate = HadoopInputFile.fromPath(path, environment.getFileSystem(context, path), environment.getConfiguration(context, path));
}
catch (IOException e) {
throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, "Failed to create input file: " + path, e);
}
this.user = context.getIdentity().getUser();
}

@Override
public long getLength()
{
return environment.doAs(user, delegate::getLength);
}

@Override
public SeekableInputStream newStream()
{
return environment.doAs(user, delegate::newStream);
}

@Override
public String location()
{
return delegate.location();
}

@Override
public boolean exists()
{
return environment.doAs(user, delegate::exists);
}
}
@@ -0,0 +1,76 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.plugin.iceberg;

import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext;
import io.prestosql.spi.PrestoException;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.hadoop.HadoopOutputFile;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.PositionOutputStream;

import java.io.IOException;

import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static java.util.Objects.requireNonNull;

public class HdfsOutputFile
implements OutputFile
{
private final OutputFile delegate;
private final Path path;
private final HdfsEnvironment environment;
private final HdfsContext context;
private final String user;

public HdfsOutputFile(Path path, HdfsEnvironment environment, HdfsContext context)
{
this.path = requireNonNull(path, "path is null");
this.environment = requireNonNull(environment, "environment is null");
this.context = requireNonNull(context, "context is null");
try {
this.delegate = HadoopOutputFile.fromPath(path, environment.getFileSystem(context, path), environment.getConfiguration(context, path));
}
catch (IOException e) {
throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, "Failed to create output file: " + path.toString(), e);
}
this.user = context.getIdentity().getUser();
}

@Override
public PositionOutputStream create()
{
return environment.doAs(user, delegate::create);
}

@Override
public PositionOutputStream createOrOverwrite()
{
return environment.doAs(user, delegate::createOrOverwrite);
}

@Override
public String location()
{
return delegate.location();
}

@Override
public InputFile toInputFile()
{
return new HdfsInputFile(path, environment, context);
}
}
@@ -24,6 +24,7 @@
import io.prestosql.parquet.writer.ParquetWriterOptions;
import io.prestosql.plugin.hive.FileFormatDataSourceStats;
import io.prestosql.plugin.hive.HdfsEnvironment;
+ import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext;
import io.prestosql.plugin.hive.NodeVersion;
import io.prestosql.plugin.hive.orc.HdfsOrcDataSource;
import io.prestosql.plugin.hive.orc.OrcWriterConfig;
@@ -108,11 +109,12 @@ public IcebergFileWriter createFileWriter(
Schema icebergSchema,
JobConf jobConf,
ConnectorSession session,
+ HdfsContext hdfsContext,
FileFormat fileFormat)
{
switch (fileFormat) {
case PARQUET:
- return createParquetWriter(outputPath, icebergSchema, jobConf, session);
+ return createParquetWriter(outputPath, icebergSchema, jobConf, session, hdfsContext);
case ORC:
return createOrcWriter(outputPath, icebergSchema, jobConf, session);
}
@@ -123,7 +125,8 @@ private IcebergFileWriter createParquetWriter(
Path outputPath,
Schema icebergSchema,
JobConf jobConf,
- ConnectorSession session)
+ ConnectorSession session,
+ HdfsContext hdfsContext)
{
List<String> fileColumnNames = icebergSchema.columns().stream()
.map(Types.NestedField::name)
@@ -146,7 +149,7 @@ private IcebergFileWriter createParquetWriter(
.build();

return new IcebergParquetFileWriter(
- fileSystem.create(outputPath),
+ hdfsEnvironment.doAs(session.getUser(), () -> fileSystem.create(outputPath)),
rollbackAction,
fileColumnTypes,
convert(icebergSchema, "table"),
@@ -155,7 +158,8 @@ private IcebergFileWriter createParquetWriter(
IntStream.range(0, fileColumnNames.size()).toArray(),
getCompressionCodec(session).getParquetCompressionCodec(),
outputPath,
- jobConf);
+ hdfsEnvironment,
+ hdfsContext);
}
catch (IOException e) {
throw new PrestoException(ICEBERG_WRITER_OPEN_ERROR, "Error creating Parquet file", e);
@@ -170,9 +174,9 @@ private IcebergFileWriter createOrcWriter(
{
try {
FileSystem fileSystem = hdfsEnvironment.getFileSystem(session.getUser(), outputPath, jobConf);
- OrcDataSink orcDataSink = new OutputStreamOrcDataSink(fileSystem.create(outputPath));
+ OrcDataSink orcDataSink = hdfsEnvironment.doAs(session.getUser(), () -> new OutputStreamOrcDataSink(fileSystem.create(outputPath)));
Callable<Void> rollbackAction = () -> {
- fileSystem.delete(outputPath, false);
+ hdfsEnvironment.doAs(session.getUser(), () -> fileSystem.delete(outputPath, false));
return null;
};

@@ -191,9 +195,9 @@ private IcebergFileWriter createOrcWriter(
try {
return new HdfsOrcDataSource(
new OrcDataSourceId(outputPath.toString()),
- fileSystem.getFileStatus(outputPath).getLen(),
+ hdfsEnvironment.doAs(session.getUser(), () -> fileSystem.getFileStatus(outputPath).getLen()),
new OrcReaderOptions(),
- fileSystem.open(outputPath),
+ hdfsEnvironment.doAs(session.getUser(), () -> fileSystem.open(outputPath)),
readStats);
}
catch (IOException e) {
@@ -53,7 +53,6 @@
import io.prestosql.spi.security.PrestoPrincipal;
import io.prestosql.spi.statistics.ComputedStatistics;
import io.prestosql.spi.type.TypeManager;
- import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFiles;
@@ -65,7 +64,6 @@
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;
- import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
@@ -432,10 +430,9 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
AppendFiles appendFiles = transaction.newFastAppend();
for (CommitTaskData task : commitTasks) {
HdfsContext context = new HdfsContext(session, table.getSchemaName(), table.getTableName());
- Configuration configuration = hdfsEnvironment.getConfiguration(context, new Path(task.getPath()));

DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
- .withInputFile(HadoopInputFile.fromLocation(task.getPath(), configuration))
+ .withInputFile(new HdfsInputFile(new Path(task.getPath()), hdfsEnvironment, context))
.withFormat(table.getFileFormat())
.withMetrics(task.getMetrics().metrics());

@@ -86,6 +86,7 @@ public class IcebergPageSink
private final String outputPath;
private final IcebergFileWriterFactory fileWriterFactory;
private final HdfsEnvironment hdfsEnvironment;
+ private final HdfsContext hdfsContext;
private final JobConf jobConf;
private final JsonCodec<CommitTaskData> jsonCodec;
private final ConnectorSession session;
@@ -117,7 +118,7 @@ public IcebergPageSink(
this.outputPath = requireNonNull(outputPath, "outputPath is null");
this.fileWriterFactory = requireNonNull(fileWriterFactory, "fileWriterFactory is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
- requireNonNull(hdfsContext, "hdfsContext is null");
+ this.hdfsContext = requireNonNull(hdfsContext, "hdfsContext is null");
this.jobConf = toJobConf(hdfsEnvironment.getConfiguration(hdfsContext, new Path(outputPath)));
this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
this.session = requireNonNull(session, "session is null");
@@ -308,6 +309,7 @@ private WriteContext createWriter(Optional<String> partitionPath, Optional<Parti
outputSchema,
jobConf,
session,
+ hdfsContext,
fileFormat);

return new WriteContext(writer, outputPath, partitionData);
@@ -184,11 +184,9 @@ private ConnectorPageSource createDataPageSource(
{
switch (fileFormat) {
case ORC:
- FileSystem fileSystem = null;
FileStatus fileStatus = null;
try {
- fileSystem = hdfsEnvironment.getFileSystem(hdfsContext, path);
- fileStatus = fileSystem.getFileStatus(path);
+ fileStatus = hdfsEnvironment.doAs(session.getUser(), () -> hdfsEnvironment.getFileSystem(hdfsContext, path).getFileStatus(path));
}
catch (IOException e) {
throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, e);
@@ -358,11 +356,11 @@ private static ConnectorPageSource createParquetPageSource(
ParquetDataSource dataSource = null;
try {
FileSystem fileSystem = hdfsEnvironment.getFileSystem(user, path, configuration);
- FileStatus fileStatus = fileSystem.getFileStatus(path);
+ FileStatus fileStatus = hdfsEnvironment.doAs(user, () -> fileSystem.getFileStatus(path));
long fileSize = fileStatus.getLen();
FSDataInputStream inputStream = hdfsEnvironment.doAs(user, () -> fileSystem.open(path));
dataSource = buildHdfsParquetDataSource(inputStream, path, fileSize, fileFormatDataSourceStats, options);
- ParquetMetadata parquetMetadata = MetadataReader.readFooter(fileSystem, path, fileSize);
+ ParquetMetadata parquetMetadata = hdfsEnvironment.doAs(user, () -> MetadataReader.readFooter(fileSystem, path, fileSize));
FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
MessageType fileSchema = fileMetaData.getSchema();
