Commit

Port ParquetFileWriter to use TrinoOutputFile
homar authored and sopel39 committed Nov 17, 2022
1 parent b998f19 commit 0ebcf02
Showing 14 changed files with 110 additions and 275 deletions.
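
This commit swaps the Hadoop FileSystem/Path plumbing in the Delta Lake sinks for Trino's own file system abstraction: each sink creates a TrinoFileSystem from the session once, passes plain String locations around, and hands ParquetFileWriter a TrinoOutputFile instead of a Hadoop OutputStream. A minimal sketch of that output-side pattern follows. The class and method names are invented for illustration, and it assumes TrinoOutputFile sits in io.trino.filesystem next to TrinoFileSystem, as the call sites below suggest; only the create, newOutputFile, and deleteFile calls are taken from the diff.

import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.TrinoOutputFile;
import io.trino.spi.connector.ConnectorSession;

import java.io.Closeable;

// Illustrative only; not part of the commit.
final class ParquetOutputSketch
{
    private final TrinoFileSystem fileSystem;

    ParquetOutputSketch(TrinoFileSystemFactory fileSystemFactory, ConnectorSession session)
    {
        // One file system per sink, bound to the session; replaces HdfsEnvironment/HdfsContext lookups
        this.fileSystem = fileSystemFactory.create(session);
    }

    TrinoOutputFile outputFor(String location)
    {
        // ParquetFileWriter now receives this TrinoOutputFile instead of the OutputStream from Hadoop's FileSystem.create(Path)
        return fileSystem.newOutputFile(location);
    }

    Closeable rollbackActionFor(String location)
    {
        // Rollback deletes through the same abstraction; previously fileSystem.delete(path, false) on the Hadoop FileSystem
        return () -> fileSystem.deleteFile(location);
    }
}
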
@@ -17,10 +17,9 @@
import io.airlift.json.JsonCodec;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.TrinoInputFile;
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.parquet.writer.ParquetSchemaConverter;
import io.trino.parquet.writer.ParquetWriterOptions;
@@ -40,7 +39,6 @@
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.joda.time.DateTimeZone;
@@ -83,8 +81,7 @@ public class DeltaLakeMergeSink
{
private static final JsonCodec<List<String>> PARTITIONS_CODEC = listJsonCodec(String.class);

private final TrinoFileSystemFactory fileSystemFactory;
private final HdfsEnvironment hdfsEnvironment;
private final TrinoFileSystem fileSystem;
private final ConnectorSession session;
private final DateTimeZone parquetDateTimeZone;
private final String trinoVersion;
@@ -99,7 +96,6 @@ public class DeltaLakeMergeSink

public DeltaLakeMergeSink(
TrinoFileSystemFactory fileSystemFactory,
HdfsEnvironment hdfsEnvironment,
ConnectorSession session,
DateTimeZone parquetDateTimeZone,
String trinoVersion,
@@ -110,9 +106,8 @@ public DeltaLakeMergeSink(
ConnectorPageSink insertPageSink,
List<DeltaLakeColumnHandle> tableColumns)
{
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.session = requireNonNull(session, "session is null");
this.fileSystem = fileSystemFactory.create(session);
this.parquetDateTimeZone = requireNonNull(parquetDateTimeZone, "parquetDateTimeZone is null");
this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null");
this.dataFileInfoCodec = requireNonNull(dataFileInfoCodec, "dataFileInfoCodec is null");
@@ -176,11 +171,10 @@ private List<Slice> rewriteFile(Path sourcePath, FileDeletion deletion)
try {
Path rootTablePath = new Path(rootTableLocation);
String sourceRelativePath = rootTablePath.toUri().relativize(sourcePath.toUri()).toString();
FileSystem fileSystem = hdfsEnvironment.getFileSystem(new HdfsContext(session.getIdentity()), rootTablePath);

Path targetPath = new Path(sourcePath.getParent(), session.getQueryId() + "_" + randomUUID());
String targetRelativePath = rootTablePath.toUri().relativize(targetPath.toUri()).toString();
FileWriter fileWriter = createParquetFileWriter(fileSystem, targetPath, dataColumns);
FileWriter fileWriter = createParquetFileWriter(targetPath.toString(), dataColumns);

DeltaLakeWriter writer = new DeltaLakeWriter(
fileSystem,
@@ -201,7 +195,7 @@ private List<Slice> rewriteFile(Path sourcePath, FileDeletion deletion)
}
}

private FileWriter createParquetFileWriter(FileSystem fileSystem, Path path, List<DeltaLakeColumnHandle> dataColumns)
private FileWriter createParquetFileWriter(String path, List<DeltaLakeColumnHandle> dataColumns)
{
ParquetWriterOptions parquetWriterOptions = ParquetWriterOptions.builder()
.setMaxBlockSize(getParquetWriterBlockSize(session))
@@ -210,7 +204,7 @@ private FileWriter createParquetFileWriter(FileSystem fileSystem, Path path, Lis
CompressionCodecName compressionCodecName = getCompressionCodec(session).getParquetCompressionCodec();

try {
Closeable rollbackAction = () -> fileSystem.delete(path, false);
Closeable rollbackAction = () -> fileSystem.deleteFile(path);

List<Type> parquetTypes = dataColumns.stream()
.map(column -> {
@@ -233,7 +227,7 @@ private FileWriter createParquetFileWriter(FileSystem fileSystem, Path path, Lis
false);

return new ParquetFileWriter(
fileSystem.create(path),
fileSystem.newOutputFile(path),
rollbackAction,
parquetTypes,
dataColumnNames,
@@ -304,7 +298,7 @@ private Optional<DataFileInfo> rewriteParquetFile(Path path, ImmutableLongBitmap
private ReaderPageSource createParquetPageSource(Path path)
throws IOException
{
TrinoInputFile inputFile = fileSystemFactory.create(session).newInputFile(path.toString());
TrinoInputFile inputFile = fileSystem.newInputFile(path.toString());

return ParquetPageSourceFactory.createPageSource(
inputFile,
@@ -17,11 +17,12 @@
import com.google.common.collect.Streams;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.MoreFutures;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.parquet.writer.ParquetSchemaConverter;
@@ -42,7 +43,6 @@
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.mapred.JobConf;
@@ -99,7 +99,7 @@ public class DeltaLakePageSink
private final List<Type> partitionColumnTypes;

private final PageIndexer pageIndexer;
private final HdfsEnvironment hdfsEnvironment;
private final TrinoFileSystem fileSystem;

private final int maxOpenWriters;

Expand All @@ -126,6 +126,7 @@ public DeltaLakePageSink(
List<String> originalPartitionColumns,
PageIndexerFactory pageIndexerFactory,
HdfsEnvironment hdfsEnvironment,
TrinoFileSystemFactory fileSystemFactory,
int maxOpenWriters,
JsonCodec<DataFileInfo> dataFileInfoCodec,
String outputPath,
Expand All @@ -138,7 +139,7 @@ public DeltaLakePageSink(

requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");

this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.fileSystem = requireNonNull(fileSystemFactory, "fileSystemFactory is null").create(session);
this.maxOpenWriters = maxOpenWriters;
this.dataFileInfoCodec = requireNonNull(dataFileInfoCodec, "dataFileInfoCodec is null");

Expand Down Expand Up @@ -223,12 +224,6 @@ public long getValidationCpuNanos()

@Override
public CompletableFuture<Collection<Slice>> finish()
{
ListenableFuture<Collection<Slice>> result = hdfsEnvironment.doAs(session.getIdentity(), this::doFinish);
return MoreFutures.toCompletableFuture(result);
}

private ListenableFuture<Collection<Slice>> doFinish()
{
for (int writerIndex = 0; writerIndex < writers.size(); writerIndex++) {
closeWriter(writerIndex);
@@ -237,16 +232,11 @@ private ListenableFuture<Collection<Slice>> doFinish()

List<Slice> result = dataFileInfos.build();

return Futures.immediateFuture(result);
return MoreFutures.toCompletableFuture(Futures.immediateFuture(result));
}

@Override
public void abort()
{
hdfsEnvironment.doAs(session.getIdentity(), this::doAbort);
}

private void doAbort()
{
List<Closeable> rollbackActions = Streams.concat(
writers.stream()
@@ -275,22 +265,18 @@ private void doAbort()
@Override
public CompletableFuture<?> appendPage(Page page)
{
if (page.getPositionCount() > 0) {
hdfsEnvironment.doAs(session.getIdentity(), () -> doAppend(page));
if (page.getPositionCount() == 0) {
return NOT_BLOCKED;
}

return NOT_BLOCKED;
}

private void doAppend(Page page)
{
while (page.getPositionCount() > MAX_PAGE_POSITIONS) {
Page chunk = page.getRegion(0, MAX_PAGE_POSITIONS);
page = page.getRegion(MAX_PAGE_POSITIONS, page.getPositionCount() - MAX_PAGE_POSITIONS);
writePage(chunk);
}

writePage(page);
return NOT_BLOCKED;
}

private void writePage(Page page)
@@ -389,29 +375,24 @@ private int[] getWriterIndexes(Page page)

FileWriter fileWriter;
if (isOptimizedParquetWriter) {
fileWriter = createParquetFileWriter(filePath);
fileWriter = createParquetFileWriter(filePath.toString());
}
else {
fileWriter = createRecordFileWriter(filePath);
}

Path rootTableLocation = new Path(outputPath);
try {
DeltaLakeWriter writer = new DeltaLakeWriter(
hdfsEnvironment.getFileSystem(session.getIdentity(), rootTableLocation, conf),
fileWriter,
rootTableLocation,
partitionName.map(partition -> new Path(partition, fileName).toString()).orElse(fileName),
partitionValues,
stats,
dataColumnHandles);

writers.set(writerIndex, writer);
memoryUsage += writer.getMemoryUsage();
}
catch (IOException e) {
throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Unable to create writer for location: " + outputPath, e);
}
DeltaLakeWriter writer = new DeltaLakeWriter(
fileSystem,
fileWriter,
rootTableLocation,
partitionName.map(partition -> new Path(partition, fileName).toString()).orElse(fileName),
partitionValues,
stats,
dataColumnHandles);

writers.set(writerIndex, writer);
memoryUsage += writer.getMemoryUsage();
}
verify(writers.size() == pageIndexer.getMaxIndex() + 1);
verify(!writers.contains(null));
@@ -470,7 +451,7 @@ public static List<String> createPartitionValues(List<Type> partitionColumnTypes
.collect(toList());
}

private FileWriter createParquetFileWriter(Path path)
private FileWriter createParquetFileWriter(String path)
{
ParquetWriterOptions parquetWriterOptions = ParquetWriterOptions.builder()
.setMaxBlockSize(getParquetWriterBlockSize(session))
@@ -479,8 +460,7 @@ private FileWriter createParquetFileWriter(Path path)
CompressionCodecName compressionCodecName = getCompressionCodec(session).getParquetCompressionCodec();

try {
FileSystem fileSystem = hdfsEnvironment.getFileSystem(session.getIdentity(), path, conf);
Closeable rollbackAction = () -> fileSystem.delete(path, false);
Closeable rollbackAction = () -> fileSystem.deleteFile(path);

List<Type> parquetTypes = dataColumnTypes.stream()
.map(type -> {
@@ -501,7 +481,7 @@ private FileWriter createParquetFileWriter(Path path)

ParquetSchemaConverter schemaConverter = new ParquetSchemaConverter(parquetTypes, dataColumnNames, false, false);
return new ParquetFileWriter(
fileSystem.create(path),
fileSystem.newOutputFile(path),
rollbackAction,
parquetTypes,
dataColumnNames,
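
With the TrinoFileSystem created for the session up front, DeltaLakePageSink drops the hdfsEnvironment.doAs(session.getIdentity(), ...) wrappers around finish(), abort(), and appendPage(). Reassembled from the hunks above, the resulting finish() reduces to the sketch below; writers, closeWriter, and dataFileInfos are the existing members shown in the diff.

@Override
public CompletableFuture<Collection<Slice>> finish()
{
    // Writers are closed inline; no doAs(...) wrapper is needed because file access is already session-scoped
    for (int writerIndex = 0; writerIndex < writers.size(); writerIndex++) {
        closeWriter(writerIndex);
    }

    List<Slice> result = dataFileInfos.build();
    return MoreFutures.toCompletableFuture(Futures.immediateFuture(result));
}
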
@@ -84,6 +84,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
tableHandle.getPartitionedBy(),
pageIndexerFactory,
hdfsEnvironment,
fileSystemFactory,
maxPartitionsPerWriter,
dataFileInfoCodec,
tableHandle.getLocation(),
@@ -102,6 +103,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
tableHandle.getMetadataEntry().getOriginalPartitionColumns(),
pageIndexerFactory,
hdfsEnvironment,
fileSystemFactory,
maxPartitionsPerWriter,
dataFileInfoCodec,
tableHandle.getLocation(),
@@ -123,6 +125,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
optimizeHandle.getOriginalPartitionColumns(),
pageIndexerFactory,
hdfsEnvironment,
fileSystemFactory,
maxPartitionsPerWriter,
dataFileInfoCodec,
executeHandle.getTableLocation(),
@@ -144,7 +147,6 @@ public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transaction

return new DeltaLakeMergeSink(
fileSystemFactory,
hdfsEnvironment,
session,
parquetDateTimeZone,
trinoVersion,
@@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableSet;
import io.airlift.json.JsonCodec;
import io.airlift.slice.Slice;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
@@ -101,9 +102,8 @@ public class DeltaLakeUpdatablePageSource
private final long fileSize;
private final ConnectorSession session;
private final ExecutorService executorService;
private final TrinoFileSystemFactory fileSystemFactory;
private final TrinoFileSystem fileSystem;
private final HdfsEnvironment hdfsEnvironment;
private final HdfsContext hdfsContext;
private final DateTimeZone parquetDateTimeZone;
private final ParquetReaderOptions parquetReaderOptions;
private final TypeManager typeManager;
@@ -148,9 +148,8 @@ public DeltaLakeUpdatablePageSource(
this.fileSize = fileSize;
this.session = requireNonNull(session, "session is null");
this.executorService = requireNonNull(executorService, "executorService is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.fileSystem = fileSystemFactory.create(session);
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.hdfsContext = requireNonNull(hdfsContext, "hdfsContext is null");
this.parquetDateTimeZone = requireNonNull(parquetDateTimeZone, "parquetDateTimeZone is null");
this.parquetReaderOptions = requireNonNull(parquetReaderOptions, "parquetReaderOptions is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
@@ -570,7 +569,7 @@ private DataFileInfo copyParquetPageSource(DeltaLakeWriter fileWriter)
private ReaderPageSource createParquetPageSource(TupleDomain<HiveColumnHandle> parquetPredicate, List<HiveColumnHandle> columns)
{
return ParquetPageSourceFactory.createPageSource(
fileSystemFactory.create(session).newInputFile(path, fileSize),
fileSystem.newInputFile(path, fileSize),
0,
fileSize,
columns,
@@ -612,7 +611,7 @@ private DeltaLakeWriter createWriter(Path targetFile, List<DeltaLakeColumnMetada
.collect(toImmutableList()));

return new DeltaLakeWriter(
hdfsEnvironment.getFileSystem(hdfsContext, targetFile),
fileSystem,
recordFileWriter,
tablePath,
relativePath.toString(),
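
On the read side, DeltaLakeMergeSink and DeltaLakeUpdatablePageSource resolve Parquet inputs through the same session-scoped file system. The sketch below shows only that pattern; the class and method names are invented for illustration, and the two newInputFile overloads are the ones called in the diff above.

import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.TrinoInputFile;
import io.trino.spi.connector.ConnectorSession;

// Illustrative only; not part of the commit.
final class ParquetInputSketch
{
    private final TrinoFileSystem fileSystem;

    ParquetInputSketch(TrinoFileSystemFactory fileSystemFactory, ConnectorSession session)
    {
        this.fileSystem = fileSystemFactory.create(session);
    }

    TrinoInputFile inputFor(String location)
    {
        // Shape used by DeltaLakeMergeSink.createParquetPageSource(path)
        return fileSystem.newInputFile(location);
    }

    TrinoInputFile inputFor(String location, long fileSize)
    {
        // Shape used by DeltaLakeUpdatablePageSource, which already holds the file size as a field
        return fileSystem.newInputFile(location, fileSize);
    }
}
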