diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java
index a976fcce7e16..70d9dd653490 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java
@@ -17,10 +17,9 @@
 import io.airlift.json.JsonCodec;
 import io.airlift.slice.Slice;
 import io.airlift.slice.Slices;
+import io.trino.filesystem.TrinoFileSystem;
 import io.trino.filesystem.TrinoFileSystemFactory;
 import io.trino.filesystem.TrinoInputFile;
-import io.trino.hdfs.HdfsContext;
-import io.trino.hdfs.HdfsEnvironment;
 import io.trino.parquet.ParquetReaderOptions;
 import io.trino.parquet.writer.ParquetSchemaConverter;
 import io.trino.parquet.writer.ParquetWriterOptions;
@@ -40,7 +39,6 @@
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.type.TimestampWithTimeZoneType;
 import io.trino.spi.type.Type;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.joda.time.DateTimeZone;
@@ -83,8 +81,7 @@ public class DeltaLakeMergeSink
 {
     private static final JsonCodec> PARTITIONS_CODEC = listJsonCodec(String.class);
 
-    private final TrinoFileSystemFactory fileSystemFactory;
-    private final HdfsEnvironment hdfsEnvironment;
+    private final TrinoFileSystem fileSystem;
     private final ConnectorSession session;
     private final DateTimeZone parquetDateTimeZone;
     private final String trinoVersion;
@@ -99,7 +96,6 @@ public class DeltaLakeMergeSink
     public DeltaLakeMergeSink(
             TrinoFileSystemFactory fileSystemFactory,
-            HdfsEnvironment hdfsEnvironment,
             ConnectorSession session,
             DateTimeZone parquetDateTimeZone,
             String trinoVersion,
@@ -110,9 +106,8 @@ public DeltaLakeMergeSink(
             ConnectorPageSink insertPageSink,
             List tableColumns)
     {
-        this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
-        this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
         this.session = requireNonNull(session, "session is null");
+        this.fileSystem = fileSystemFactory.create(session);
         this.parquetDateTimeZone = requireNonNull(parquetDateTimeZone, "parquetDateTimeZone is null");
         this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null");
         this.dataFileInfoCodec = requireNonNull(dataFileInfoCodec, "dataFileInfoCodec is null");
@@ -176,11 +171,10 @@ private List rewriteFile(Path sourcePath, FileDeletion deletion)
         try {
             Path rootTablePath = new Path(rootTableLocation);
             String sourceRelativePath = rootTablePath.toUri().relativize(sourcePath.toUri()).toString();
-            FileSystem fileSystem = hdfsEnvironment.getFileSystem(new HdfsContext(session.getIdentity()), rootTablePath);
             Path targetPath = new Path(sourcePath.getParent(), session.getQueryId() + "_" + randomUUID());
             String targetRelativePath = rootTablePath.toUri().relativize(targetPath.toUri()).toString();
-            FileWriter fileWriter = createParquetFileWriter(fileSystem, targetPath, dataColumns);
+            FileWriter fileWriter = createParquetFileWriter(targetPath.toString(), dataColumns);
 
             DeltaLakeWriter writer = new DeltaLakeWriter(
                     fileSystem,
@@ -201,7 +195,7 @@ private List rewriteFile(Path sourcePath, FileDeletion deletion)
         }
     }
 
-    private FileWriter createParquetFileWriter(FileSystem fileSystem, Path path, List dataColumns)
+    private FileWriter createParquetFileWriter(String path, List dataColumns)
     {
         ParquetWriterOptions parquetWriterOptions = ParquetWriterOptions.builder()
                 .setMaxBlockSize(getParquetWriterBlockSize(session))
@@ -210,7 +204,7 @@ private FileWriter createParquetFileWriter(FileSystem fileSystem, Path path, Lis
         CompressionCodecName compressionCodecName = getCompressionCodec(session).getParquetCompressionCodec();
 
         try {
-            Closeable rollbackAction = () -> fileSystem.delete(path, false);
+            Closeable rollbackAction = () -> fileSystem.deleteFile(path);
 
             List parquetTypes = dataColumns.stream()
                     .map(column -> {
@@ -233,7 +227,7 @@ private FileWriter createParquetFileWriter(FileSystem fileSystem, Path path, Lis
                             false);
 
             return new ParquetFileWriter(
-                    fileSystem.create(path),
+                    fileSystem.newOutputFile(path),
                     rollbackAction,
                     parquetTypes,
                     dataColumnNames,
@@ -304,7 +298,7 @@ private Optional rewriteParquetFile(Path path, ImmutableLongBitmap
     private ReaderPageSource createParquetPageSource(Path path)
             throws IOException
    {
-        TrinoInputFile inputFile = fileSystemFactory.create(session).newInputFile(path.toString());
+        TrinoInputFile inputFile = fileSystem.newInputFile(path.toString());
 
         return ParquetPageSourceFactory.createPageSource(
                 inputFile,
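
Note: the change above shows the pattern repeated throughout this patch: the per-call Hadoop FileSystem obtained from HdfsEnvironment is replaced by a TrinoFileSystem created once per sink from the session, with files addressed by string paths, deleteFile() rollback actions, and TrinoOutputFile/TrinoInputFile handles. A minimal sketch of that usage, under the assumption that a caller supplies the factory, session, table location, and rollback list (the class and method names below are illustrative, not part of the patch):

    import io.trino.filesystem.TrinoFileSystem;
    import io.trino.filesystem.TrinoFileSystemFactory;
    import io.trino.filesystem.TrinoOutputFile;
    import io.trino.spi.connector.ConnectorSession;

    import java.io.Closeable;
    import java.util.List;
    import java.util.UUID;

    public final class MergeTargetSketch
    {
        private MergeTargetSketch() {}

        // Create the session-bound file system once, address files by string path,
        // register a deleteFile() rollback, and hand the TrinoOutputFile to the writer.
        public static TrinoOutputFile prepareTarget(
                TrinoFileSystemFactory fileSystemFactory,
                ConnectorSession session,
                String tableLocation,
                List<Closeable> rollbackActions)
        {
            TrinoFileSystem fileSystem = fileSystemFactory.create(session);
            String targetPath = tableLocation + "/" + session.getQueryId() + "_" + UUID.randomUUID();
            rollbackActions.add(() -> fileSystem.deleteFile(targetPath));
            return fileSystem.newOutputFile(targetPath);
        }
    }
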
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java
index 7c454c3fa8f5..dc968048adfe 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java
@@ -17,11 +17,12 @@
 import com.google.common.collect.Streams;
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import io.airlift.concurrent.MoreFutures;
 import io.airlift.json.JsonCodec;
 import io.airlift.log.Logger;
 import io.airlift.slice.Slice;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.filesystem.TrinoFileSystemFactory;
 import io.trino.hdfs.HdfsContext;
 import io.trino.hdfs.HdfsEnvironment;
 import io.trino.parquet.writer.ParquetSchemaConverter;
@@ -42,7 +43,6 @@
 import io.trino.spi.type.Type;
 import io.trino.spi.type.TypeManager;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.mapred.JobConf;
@@ -99,7 +99,7 @@ public class DeltaLakePageSink
     private final List partitionColumnTypes;
     private final PageIndexer pageIndexer;
-    private final HdfsEnvironment hdfsEnvironment;
+    private final TrinoFileSystem fileSystem;
 
     private final int maxOpenWriters;
@@ -126,6 +126,7 @@ public DeltaLakePageSink(
             List originalPartitionColumns,
             PageIndexerFactory pageIndexerFactory,
             HdfsEnvironment hdfsEnvironment,
+            TrinoFileSystemFactory fileSystemFactory,
             int maxOpenWriters,
             JsonCodec dataFileInfoCodec,
             String outputPath,
@@ -138,7 +139,7 @@ public DeltaLakePageSink(
 
         requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
 
-        this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
+        this.fileSystem = requireNonNull(fileSystemFactory, "fileSystemFactory is null").create(session);
         this.maxOpenWriters = maxOpenWriters;
         this.dataFileInfoCodec = requireNonNull(dataFileInfoCodec, "dataFileInfoCodec is null");
@@ -223,12 +224,6 @@ public long getValidationCpuNanos()
     @Override
     public CompletableFuture> finish()
-    {
-        ListenableFuture> result = hdfsEnvironment.doAs(session.getIdentity(), this::doFinish);
-        return MoreFutures.toCompletableFuture(result);
-    }
-
-    private ListenableFuture> doFinish()
     {
         for (int writerIndex = 0; writerIndex < writers.size(); writerIndex++) {
             closeWriter(writerIndex);
@@ -237,16 +232,11 @@ private ListenableFuture> doFinish()
 
         List result = dataFileInfos.build();
 
-        return Futures.immediateFuture(result);
+        return MoreFutures.toCompletableFuture(Futures.immediateFuture(result));
     }
 
     @Override
     public void abort()
-    {
-        hdfsEnvironment.doAs(session.getIdentity(), this::doAbort);
-    }
-
-    private void doAbort()
     {
         List rollbackActions = Streams.concat(
                 writers.stream()
@@ -275,15 +265,10 @@ private void doAbort()
     @Override
     public CompletableFuture appendPage(Page page)
     {
-        if (page.getPositionCount() > 0) {
-            hdfsEnvironment.doAs(session.getIdentity(), () -> doAppend(page));
+        if (page.getPositionCount() == 0) {
+            return NOT_BLOCKED;
         }
 
-        return NOT_BLOCKED;
-    }
-
-    private void doAppend(Page page)
-    {
         while (page.getPositionCount() > MAX_PAGE_POSITIONS) {
             Page chunk = page.getRegion(0, MAX_PAGE_POSITIONS);
             page = page.getRegion(MAX_PAGE_POSITIONS, page.getPositionCount() - MAX_PAGE_POSITIONS);
@@ -291,6 +276,7 @@ private void doAppend(Page page)
         }
 
         writePage(page);
+        return NOT_BLOCKED;
     }
 
     private void writePage(Page page)
@@ -389,29 +375,24 @@ private int[] getWriterIndexes(Page page)
             FileWriter fileWriter;
             if (isOptimizedParquetWriter) {
-                fileWriter = createParquetFileWriter(filePath);
+                fileWriter = createParquetFileWriter(filePath.toString());
             }
             else {
                 fileWriter = createRecordFileWriter(filePath);
             }
 
             Path rootTableLocation = new Path(outputPath);
-            try {
-                DeltaLakeWriter writer = new DeltaLakeWriter(
-                        hdfsEnvironment.getFileSystem(session.getIdentity(), rootTableLocation, conf),
-                        fileWriter,
-                        rootTableLocation,
-                        partitionName.map(partition -> new Path(partition, fileName).toString()).orElse(fileName),
-                        partitionValues,
-                        stats,
-                        dataColumnHandles);
-
-                writers.set(writerIndex, writer);
-                memoryUsage += writer.getMemoryUsage();
-            }
-            catch (IOException e) {
-                throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Unable to create writer for location: " + outputPath, e);
-            }
+            DeltaLakeWriter writer = new DeltaLakeWriter(
+                    fileSystem,
+                    fileWriter,
+                    rootTableLocation,
+                    partitionName.map(partition -> new Path(partition, fileName).toString()).orElse(fileName),
+                    partitionValues,
+                    stats,
+                    dataColumnHandles);
+
+            writers.set(writerIndex, writer);
+            memoryUsage += writer.getMemoryUsage();
         }
 
         verify(writers.size() == pageIndexer.getMaxIndex() + 1);
         verify(!writers.contains(null));
@@ -470,7 +451,7 @@ public static List createPartitionValues(List partitionColumnTypes
                 .collect(toList());
     }
 
-    private FileWriter createParquetFileWriter(Path path)
+    private FileWriter createParquetFileWriter(String path)
     {
         ParquetWriterOptions parquetWriterOptions = ParquetWriterOptions.builder()
                 .setMaxBlockSize(getParquetWriterBlockSize(session))
@@ -479,8 +460,7 @@ private FileWriter createParquetFileWriter(Path path)
         CompressionCodecName compressionCodecName = getCompressionCodec(session).getParquetCompressionCodec();
 
         try {
-            FileSystem fileSystem = hdfsEnvironment.getFileSystem(session.getIdentity(), path, conf);
-            Closeable rollbackAction = () -> fileSystem.delete(path, false);
+            Closeable rollbackAction = () -> fileSystem.deleteFile(path);
 
             List parquetTypes = dataColumnTypes.stream()
                     .map(type -> {
@@ -501,7 +481,7 @@ private FileWriter createParquetFileWriter(Path path)
             ParquetSchemaConverter schemaConverter = new ParquetSchemaConverter(parquetTypes, dataColumnNames, false, false);
 
             return new ParquetFileWriter(
-                    fileSystem.create(path),
+                    fileSystem.newOutputFile(path),
                     rollbackAction,
                     parquetTypes,
                     dataColumnNames,
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSinkProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSinkProvider.java
index 564df6614ff4..b5bb49e5a3e6 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSinkProvider.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSinkProvider.java
@@ -84,6 +84,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
                 tableHandle.getPartitionedBy(),
                 pageIndexerFactory,
                 hdfsEnvironment,
+                fileSystemFactory,
                 maxPartitionsPerWriter,
                 dataFileInfoCodec,
                 tableHandle.getLocation(),
@@ -102,6 +103,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
                 tableHandle.getMetadataEntry().getOriginalPartitionColumns(),
                 pageIndexerFactory,
                 hdfsEnvironment,
+                fileSystemFactory,
                 maxPartitionsPerWriter,
                 dataFileInfoCodec,
                 tableHandle.getLocation(),
@@ -123,6 +125,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
                 optimizeHandle.getOriginalPartitionColumns(),
                 pageIndexerFactory,
                 hdfsEnvironment,
+                fileSystemFactory,
                 maxPartitionsPerWriter,
                 dataFileInfoCodec,
                 executeHandle.getTableLocation(),
@@ -144,7 +147,6 @@ public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transaction
         return new DeltaLakeMergeSink(
                 fileSystemFactory,
-                hdfsEnvironment,
                 session,
                 parquetDateTimeZone,
                 trinoVersion,
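
Note: with the identity-scoped hdfsEnvironment.doAs(...) wrappers removed, the page sink's public methods run directly; the caller's identity now travels with the session-bound TrinoFileSystem instead. A rough sketch of the resulting appendPage control flow (the MAX_PAGE_POSITIONS value and the abstract writePage hook are assumptions for illustration, not the sink's actual values):

    import io.trino.spi.Page;

    import java.util.concurrent.CompletableFuture;

    abstract class PageChunkingSketch
    {
        // Assumed limit; the real sink defines its own constant.
        private static final int MAX_PAGE_POSITIONS = 4096;
        private static final CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);

        // No identity wrapper any more: early return for empty pages, chunked writes for large ones.
        public CompletableFuture<?> appendPage(Page page)
        {
            if (page.getPositionCount() == 0) {
                return NOT_BLOCKED;
            }
            while (page.getPositionCount() > MAX_PAGE_POSITIONS) {
                Page chunk = page.getRegion(0, MAX_PAGE_POSITIONS);
                page = page.getRegion(MAX_PAGE_POSITIONS, page.getPositionCount() - MAX_PAGE_POSITIONS);
                writePage(chunk);
            }
            writePage(page);
            return NOT_BLOCKED;
        }

        abstract void writePage(Page page);
    }
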
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeUpdatablePageSource.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeUpdatablePageSource.java
index d3b6966ddea8..57dc313f7b13 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeUpdatablePageSource.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeUpdatablePageSource.java
@@ -17,6 +17,7 @@
 import com.google.common.collect.ImmutableSet;
 import io.airlift.json.JsonCodec;
 import io.airlift.slice.Slice;
+import io.trino.filesystem.TrinoFileSystem;
 import io.trino.filesystem.TrinoFileSystemFactory;
 import io.trino.hdfs.HdfsContext;
 import io.trino.hdfs.HdfsEnvironment;
@@ -101,9 +102,8 @@ public class DeltaLakeUpdatablePageSource
     private final long fileSize;
     private final ConnectorSession session;
     private final ExecutorService executorService;
-    private final TrinoFileSystemFactory fileSystemFactory;
+    private final TrinoFileSystem fileSystem;
     private final HdfsEnvironment hdfsEnvironment;
-    private final HdfsContext hdfsContext;
     private final DateTimeZone parquetDateTimeZone;
     private final ParquetReaderOptions parquetReaderOptions;
     private final TypeManager typeManager;
@@ -148,9 +148,8 @@ public DeltaLakeUpdatablePageSource(
         this.fileSize = fileSize;
         this.session = requireNonNull(session, "session is null");
         this.executorService = requireNonNull(executorService, "executorService is null");
-        this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
+        this.fileSystem = fileSystemFactory.create(session);
         this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
-        this.hdfsContext = requireNonNull(hdfsContext, "hdfsContext is null");
         this.parquetDateTimeZone = requireNonNull(parquetDateTimeZone, "parquetDateTimeZone is null");
         this.parquetReaderOptions = requireNonNull(parquetReaderOptions, "parquetReaderOptions is null");
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
@@ -570,7 +569,7 @@ private DataFileInfo copyParquetPageSource(DeltaLakeWriter fileWriter)
     private ReaderPageSource createParquetPageSource(TupleDomain parquetPredicate, List columns)
     {
         return ParquetPageSourceFactory.createPageSource(
-                fileSystemFactory.create(session).newInputFile(path, fileSize),
+                fileSystem.newInputFile(path, fileSize),
                 0,
                 fileSize,
                 columns,
@@ -612,7 +611,7 @@ private DeltaLakeWriter createWriter(Path targetFile, List dataColumnNames, List dataColumnTypes,
@@ -189,9 +193,15 @@ private static DeltaLakeJsonFileStatistics readStatistics(
             typeForColumn.put(dataColumnNames.get(i), dataColumnTypes.get(i));
         }
 
-        ImmutableMultimap.Builder metadataForColumn = ImmutableMultimap.builder();
-        try (ParquetFileReader parquetReader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(tableLocation, relativeFilePath), fs.getConf()))) {
-            for (BlockMetaData blockMetaData : parquetReader.getRowGroups()) {
+        TrinoInputFile inputFile = fileSystem.newInputFile(new Path(tableLocation, relativeFilePath).toString());
+        try (TrinoParquetDataSource trinoParquetDataSource = new TrinoParquetDataSource(
+                inputFile,
+                new ParquetReaderOptions(),
+                new FileFormatDataSourceStats())) {
+            ParquetMetadata parquetMetadata = MetadataReader.readFooter(trinoParquetDataSource, Optional.empty());
+
+            ImmutableMultimap.Builder metadataForColumn = ImmutableMultimap.builder();
+            for (BlockMetaData blockMetaData : parquetMetadata.getBlocks()) {
                 for (ColumnChunkMetaData columnChunkMetaData : blockMetaData.getColumns()) {
                     if (columnChunkMetaData.getPath().size() != 1) {
                         continue; // Only base column stats are supported
@@ -200,9 +210,9 @@ private static DeltaLakeJsonFileStatistics readStatistics(
                     metadataForColumn.put(columnName, columnChunkMetaData);
                 }
             }
-        }
-        return mergeStats(metadataForColumn.build(), typeForColumn.buildOrThrow(), rowCount);
+            return mergeStats(metadataForColumn.build(), typeForColumn.buildOrThrow(), rowCount);
+        }
     }
 
     @VisibleForTesting
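
Note: readStatistics above now reads the Parquet footer through TrinoParquetDataSource and MetadataReader.readFooter instead of ParquetFileReader over a HadoopInputFile. A self-contained sketch of that footer read, assuming the TrinoParquetDataSource import path from the Hive Parquet package; it only sums row counts, whereas the real method merges per-column statistics from each block's getColumns():

    import io.trino.filesystem.TrinoFileSystem;
    import io.trino.filesystem.TrinoInputFile;
    import io.trino.parquet.ParquetReaderOptions;
    import io.trino.parquet.reader.MetadataReader;
    import io.trino.plugin.hive.FileFormatDataSourceStats;
    import io.trino.plugin.hive.parquet.TrinoParquetDataSource;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;

    import java.io.IOException;
    import java.util.Optional;

    final class FooterStatsSketch
    {
        private FooterStatsSketch() {}

        // Sketch: open the written file through TrinoInputFile and read row-group metadata from the footer.
        static long totalRows(TrinoFileSystem fileSystem, String filePath)
                throws IOException
        {
            TrinoInputFile inputFile = fileSystem.newInputFile(filePath);
            try (TrinoParquetDataSource dataSource = new TrinoParquetDataSource(inputFile, new ParquetReaderOptions(), new FileFormatDataSourceStats())) {
                ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
                long rows = 0;
                for (BlockMetaData block : parquetMetadata.getBlocks()) {
                    // per-column ColumnChunkMetaData, as merged by readStatistics(), is available via block.getColumns()
                    rows += block.getRowCount();
                }
                return rows;
            }
        }
    }
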
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java
index 8a3b39d35b1b..0054de61cbef 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java
@@ -126,7 +126,7 @@ public class HiveWriterFactory
     private static final Pattern BUCKET_FROM_FILENAME_PATTERN = Pattern.compile("(0[0-9]+)_.*");
 
     private final Set fileWriterFactories;
-    private final TrinoFileSystemFactory fileSystemFactory;
+    private final TrinoFileSystem fileSystem;
     private final String schemaName;
     private final String tableName;
     private final AcidTransaction transaction;
@@ -200,7 +200,7 @@ public HiveWriterFactory(
             HiveWriterStats hiveWriterStats)
     {
         this.fileWriterFactories = ImmutableSet.copyOf(requireNonNull(fileWriterFactories, "fileWriterFactories is null"));
-        this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
+        this.fileSystem = fileSystemFactory.create(session);
         this.schemaName = requireNonNull(schemaName, "schemaName is null");
         this.tableName = requireNonNull(tableName, "tableName is null");
         this.transaction = requireNonNull(transaction, "transaction is null");
@@ -361,7 +361,7 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt
         // verify that the target directory for the partition does not already exist
         String writeInfoTargetPath = writeInfo.getTargetPath().toString();
         try {
-            if (fileSystemFactory.create(session).newInputFile(writeInfoTargetPath).exists()) {
+            if (fileSystem.newInputFile(writeInfoTargetPath).exists()) {
                 throw new TrinoException(HIVE_PATH_ALREADY_EXISTS, format(
                         "Target directory for new partition '%s' of table '%s.%s' already exists: %s",
                         partitionName,
@@ -577,7 +577,6 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt
         };
 
         if (!sortedBy.isEmpty()) {
-            TrinoFileSystem fileSystem;
             Path tempFilePath;
             if (sortedWritingTempStagingPathEnabled) {
                 String tempPrefix = sortedWritingTempStagingPath.replace(
@@ -589,7 +588,6 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt
             else {
                 tempFilePath = new Path(path.getParent(), ".tmp-sort." + path.getName());
             }
-            fileSystem = fileSystemFactory.create(session);
 
             List types = dataColumns.stream()
                     .map(column -> column.getHiveType().getType(typeManager, getTimestampPrecision(session)))
@@ -645,7 +643,6 @@ public SortingFileWriter makeRowIdSortingWriter(FileWriter deleteFileWriter, Pat
     {
         String parentPath = setSchemeToFileIfAbsent(path.getParent().toString());
         Path tempFilePath = new Path(parentPath, ".tmp-sort." + path.getName());
-        TrinoFileSystem fileSystem = fileSystemFactory.create(session);
         // The ORC columns are: operation, originalTransaction, bucket, rowId, row
         // The deleted rows should be sorted by originalTransaction, then by rowId
         List sortFields = ImmutableList.of(1, 3);
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/HdfsParquetDataSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/HdfsParquetDataSource.java
deleted file mode 100644
index 971fd502bb47..000000000000
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/HdfsParquetDataSource.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.hive.parquet;
-
-import io.airlift.slice.Slice;
-import io.trino.hdfs.FSDataInputStreamTail;
-import io.trino.parquet.AbstractParquetDataSource;
-import io.trino.parquet.ParquetDataSourceId;
-import io.trino.parquet.ParquetReaderOptions;
-import io.trino.plugin.hive.FileFormatDataSourceStats;
-import io.trino.spi.TrinoException;
-import org.apache.hadoop.fs.FSDataInputStream;
-
-import java.io.IOException;
-
-import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
-import static java.lang.String.format;
-
-public class HdfsParquetDataSource
-        extends AbstractParquetDataSource
-{
-    private final FSDataInputStream inputStream;
-    private final FileFormatDataSourceStats stats;
-
-    public HdfsParquetDataSource(
-            ParquetDataSourceId id,
-            long estimatedSize,
-            FSDataInputStream inputStream,
-            FileFormatDataSourceStats stats,
-            ParquetReaderOptions options)
-    {
-        super(id, estimatedSize, options);
-        this.inputStream = inputStream;
-        this.stats = stats;
-    }
-
-    @Override
-    public void close()
-            throws IOException
-    {
-        inputStream.close();
-    }
-
-    @Override
-    protected Slice readTailInternal(int length)
-    {
-        try {
-            // Handle potentially imprecise file lengths by reading the footer
-            long readStart = System.nanoTime();
-            FSDataInputStreamTail fileTail = FSDataInputStreamTail.readTail(getId().toString(), getEstimatedSize(), inputStream, length);
-            Slice tailSlice = fileTail.getTailSlice();
-            stats.readDataBytesPerSecond(tailSlice.length(), System.nanoTime() - readStart);
-            return tailSlice;
-        }
-        catch (IOException e) {
-            throw new TrinoException(HIVE_FILESYSTEM_ERROR, format("Error reading tail from %s with length %s", getId(), length), e);
-        }
-    }
-
-    @Override
-    protected void readInternal(long position, byte[] buffer, int bufferOffset, int bufferLength)
-    {
-        try {
-            long readStart = System.nanoTime();
-            inputStream.readFully(position, buffer, bufferOffset, bufferLength);
-            stats.readDataBytesPerSecond(bufferLength, System.nanoTime() - readStart);
-        }
-        catch (IOException e) {
-            throw new TrinoException(HIVE_FILESYSTEM_ERROR, format("Error reading from %s at position %s", getId(), position), e);
-        }
-    }
-}
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriter.java
index d31e47a7c0b7..8b8991958ab0 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriter.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriter.java
@@ -14,6 +14,7 @@
 package io.trino.plugin.hive.parquet;
 
 import com.google.common.collect.ImmutableList;
+import io.trino.filesystem.TrinoOutputFile;
 import io.trino.parquet.ParquetDataSource;
 import io.trino.parquet.writer.ParquetWriter;
 import io.trino.parquet.writer.ParquetWriterOptions;
@@ -62,7 +63,7 @@ public class ParquetFileWriter
     private long validationCpuNanos;
 
     public ParquetFileWriter(
-            OutputStream outputStream,
+            TrinoOutputFile outputFile,
             Closeable rollbackAction,
             List fileColumnTypes,
             List fileColumnNames,
@@ -74,8 +75,9 @@
             String trinoVersion,
             Optional parquetTimeZone,
             Optional> validationInputFactory)
+            throws IOException
     {
-        requireNonNull(outputStream, "outputStream is null");
+        OutputStream outputStream = outputFile.create();
         requireNonNull(trinoVersion, "trinoVersion is null");
 
         this.validationInputFactory = requireNonNull(validationInputFactory, "validationInputFactory is null");
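
Note: ParquetFileWriter now receives a TrinoOutputFile and opens the stream in its own constructor, which is where the added throws IOException comes from. A minimal sketch of that constructor shape (the class below is illustrative, not the real writer):

    import io.trino.filesystem.TrinoOutputFile;

    import java.io.IOException;
    import java.io.OutputStream;

    class OutputFileWriterSketch
    {
        private final OutputStream outputStream;

        // The writer, not the caller, opens the stream; failures surface from the constructor.
        OutputFileWriterSketch(TrinoOutputFile outputFile)
                throws IOException
        {
            this.outputStream = outputFile.create();
        }

        void close()
                throws IOException
        {
            outputStream.close();
        }
    }
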
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriterFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriterFactory.java
index 1e228b7b892f..3caca76235b5 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriterFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriterFactory.java
@@ -13,9 +13,10 @@
  */
 package io.trino.plugin.hive.parquet;
 
-import io.trino.hdfs.HdfsEnvironment;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.filesystem.TrinoInputFile;
 import io.trino.parquet.ParquetDataSource;
-import io.trino.parquet.ParquetDataSourceId;
 import io.trino.parquet.ParquetReaderOptions;
 import io.trino.parquet.writer.ParquetSchemaConverter;
 import io.trino.parquet.writer.ParquetWriterOptions;
@@ -32,7 +33,6 @@
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.type.Type;
 import io.trino.spi.type.TypeManager;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
@@ -66,7 +66,7 @@ public class ParquetFileWriterFactory
         implements HiveFileWriterFactory
 {
-    private final HdfsEnvironment hdfsEnvironment;
+    private final TrinoFileSystemFactory fileSystemFactory;
     private final NodeVersion nodeVersion;
     private final TypeManager typeManager;
     private final DateTimeZone parquetTimeZone;
@@ -74,13 +74,13 @@ public class ParquetFileWriterFactory
     @Inject
     public ParquetFileWriterFactory(
-            HdfsEnvironment hdfsEnvironment,
+            TrinoFileSystemFactory fileSystemFactory,
             NodeVersion nodeVersion,
             TypeManager typeManager,
             HiveConfig hiveConfig,
             FileFormatDataSourceStats readStats)
     {
-        this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
+        this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
         this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
         this.parquetTimeZone = hiveConfig.getParquetDateTimeZone();
@@ -125,10 +125,11 @@ public Optional createFileWriter(
                 .mapToInt(inputColumnNames::indexOf)
                 .toArray();
 
+        String pathString = path.toString();
         try {
-            FileSystem fileSystem = hdfsEnvironment.getFileSystem(session.getIdentity(), path, conf);
+            TrinoFileSystem fileSystem = fileSystemFactory.create(session);
 
-            Closeable rollbackAction = () -> fileSystem.delete(path, false);
+            Closeable rollbackAction = () -> fileSystem.deleteFile(pathString);
 
             ParquetSchemaConverter schemaConverter = new ParquetSchemaConverter(
                     fileColumnTypes,
@@ -140,12 +141,8 @@ public Optional createFileWriter(
             if (isParquetOptimizedWriterValidate(session)) {
                 validationInputFactory = Optional.of(() -> {
                     try {
-                        return new HdfsParquetDataSource(
-                                new ParquetDataSourceId(path.toString()),
-                                fileSystem.getFileStatus(path).getLen(),
-                                fileSystem.open(path),
-                                readStats,
-                                new ParquetReaderOptions());
+                        TrinoInputFile inputFile = fileSystem.newInputFile(pathString);
+                        return new TrinoParquetDataSource(inputFile, new ParquetReaderOptions(), readStats);
                     }
                     catch (IOException e) {
                         throw new TrinoException(HIVE_WRITE_VALIDATION_FAILED, e);
@@ -154,7 +151,7 @@ public Optional createFileWriter(
             }
 
             return Optional.of(new ParquetFileWriter(
-                    fileSystem.create(path, false),
+                    fileSystem.newOutputFile(pathString),
                     rollbackAction,
                     fileColumnTypes,
                     fileColumnNames,
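
Note: unlike the sinks, the injected ParquetFileWriterFactory keeps the TrinoFileSystemFactory and creates a file system inside createFileWriter, since the factory is a long-lived singleton while the file system is session-scoped. A sketch of that split, assuming a javax.inject singleton and reusing the exists() check seen in HiveWriterFactory above (class and method names are illustrative):

    import io.trino.filesystem.TrinoFileSystem;
    import io.trino.filesystem.TrinoFileSystemFactory;
    import io.trino.spi.connector.ConnectorSession;

    import javax.inject.Inject;

    import java.io.IOException;

    import static java.util.Objects.requireNonNull;

    public class SessionScopedFileSystemSketch
    {
        private final TrinoFileSystemFactory fileSystemFactory;

        @Inject
        public SessionScopedFileSystemSketch(TrinoFileSystemFactory fileSystemFactory)
        {
            // the singleton keeps only the factory; a TrinoFileSystem is created per request below
            this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
        }

        public boolean targetExists(ConnectorSession session, String path)
                throws IOException
        {
            TrinoFileSystem fileSystem = fileSystemFactory.create(session);
            return fileSystem.newInputFile(path).exists();
        }
    }
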
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java
index a61ea6e4f7f0..336562017127 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java
@@ -456,7 +456,7 @@ public void testOptimizedParquetWriter(int rowCount)
                 .withSession(session)
                 .withColumns(testColumns)
                 .withRowsCount(rowCount)
-                .withFileWriterFactory(new ParquetFileWriterFactory(HDFS_ENVIRONMENT, new NodeVersion("test-version"), TESTING_TYPE_MANAGER, new HiveConfig(), STATS))
+                .withFileWriterFactory(new ParquetFileWriterFactory(HDFS_FILE_SYSTEM_FACTORY, new NodeVersion("test-version"), TESTING_TYPE_MANAGER, new HiveConfig(), STATS))
                 .isReadableByPageSource(PARQUET_PAGE_SOURCE_FACTORY);
     }
 
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/ParquetTester.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/ParquetTester.java
index 34c9543453fc..271a853977f6 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/ParquetTester.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/ParquetTester.java
@@ -22,7 +22,8 @@
 import io.airlift.slice.Slice;
 import io.airlift.slice.Slices;
 import io.airlift.units.DataSize;
-import io.trino.parquet.ParquetDataSourceId;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.filesystem.TrinoInputFile;
 import io.trino.parquet.ParquetReaderOptions;
 import io.trino.parquet.writer.ParquetSchemaConverter;
 import io.trino.parquet.writer.ParquetWriter;
@@ -63,7 +64,6 @@
 import io.trino.spi.type.Type;
 import io.trino.spi.type.VarcharType;
 import io.trino.testing.TestingConnectorSession;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
@@ -113,6 +113,7 @@
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITE_VALIDATION_FAILED;
 import static io.trino.plugin.hive.HiveSessionProperties.getParquetMaxReadBlockSize;
 import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
+import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY;
 import static io.trino.plugin.hive.HiveTestUtils.getHiveSession;
 import static io.trino.plugin.hive.util.HiveUtil.isArrayType;
 import static io.trino.plugin.hive.util.HiveUtil.isMapType;
@@ -776,15 +777,10 @@ private static void writeParquetColumnTrino(
             pageBuilder.declarePositions(size);
             writer.write(pageBuilder.build());
             writer.close();
-            Path path = new Path(outputFile.getPath());
-            FileSystem fileSystem = HDFS_ENVIRONMENT.getFileSystem(SESSION.getIdentity(), path, newEmptyConfiguration());
+            TrinoFileSystem fileSystem = HDFS_FILE_SYSTEM_FACTORY.create(SESSION);
             try {
-                writer.validate(new HdfsParquetDataSource(
-                        new ParquetDataSourceId(path.toString()),
-                        fileSystem.getFileStatus(path).getLen(),
-                        fileSystem.open(path),
-                        new FileFormatDataSourceStats(),
-                        new ParquetReaderOptions()));
+                TrinoInputFile inputFile = fileSystem.newInputFile(outputFile.getPath());
+                writer.validate(new TrinoParquetDataSource(inputFile, new ParquetReaderOptions(), new FileFormatDataSourceStats()));
             }
             catch (IOException e) {
                 throw new TrinoException(HIVE_WRITE_VALIDATION_FAILED, e);
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestHdfsParquetDataSource.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestHdfsParquetDataSource.java
index 5fa0c0c6af75..071fc45f6cee 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestHdfsParquetDataSource.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestHdfsParquetDataSource.java
@@ -16,40 +16,42 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableListMultimap;
 import com.google.common.collect.ListMultimap;
-import io.airlift.slice.BasicSliceInput;
 import io.airlift.slice.Slice;
 import io.airlift.slice.Slices;
 import io.airlift.units.DataSize;
+import io.trino.filesystem.TrinoFileSystem;
 import io.trino.parquet.ChunkReader;
 import io.trino.parquet.DiskRange;
-import io.trino.parquet.ParquetDataSourceId;
 import io.trino.parquet.ParquetReaderOptions;
 import io.trino.plugin.hive.FileFormatDataSourceStats;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.PositionedReadable;
-import org.apache.hadoop.fs.Seekable;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import java.io.IOException;
-import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.UUID;
 import java.util.stream.IntStream;
 
-import static java.util.Objects.requireNonNull;
+import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY;
+import static io.trino.plugin.hive.HiveTestUtils.SESSION;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class TestHdfsParquetDataSource
 {
     @Test(dataProvider = "testPlanReadOrderingProvider")
     public void testPlanReadOrdering(DataSize maxBufferSize)
+            throws IOException
     {
         Slice testingInput = Slices.wrappedIntArray(IntStream.range(0, 1000).toArray());
-        HdfsParquetDataSource dataSource = new HdfsParquetDataSource(
-                new ParquetDataSourceId("test"),
-                0,
-                new FSDataInputStream(new TestingSliceInputStream(testingInput.getInput())),
-                new FileFormatDataSourceStats(),
-                new ParquetReaderOptions().withMaxBufferSize(maxBufferSize));
+        String path = "/tmp/" + UUID.randomUUID();
+        TrinoFileSystem trinoFileSystem = HDFS_FILE_SYSTEM_FACTORY.create(SESSION);
+        try (OutputStream outputStream = trinoFileSystem.newOutputFile(path).create()) {
+            outputStream.write(testingInput.getBytes());
+        }
+        TrinoParquetDataSource dataSource = new TrinoParquetDataSource(
+                trinoFileSystem.newInputFile(path),
+                new ParquetReaderOptions().withMaxBufferSize(maxBufferSize),
+                new FileFormatDataSourceStats());
         ListMultimap chunkReaders = dataSource.planRead(ImmutableListMultimap.builder()
                 .putAll("test", new DiskRange(0, 300), new DiskRange(400, 100), new DiskRange(700, 200))
@@ -71,67 +73,4 @@ public Object[][] testPlanReadOrderingProvider()
                 {DataSize.ofBytes(100000000)}, // All small ranges
         };
     }
-
-    private static class TestingSliceInputStream
-            extends InputStream
-            implements Seekable, PositionedReadable
-    {
-        private final BasicSliceInput sliceInput;
-
-        public TestingSliceInputStream(BasicSliceInput sliceInput)
-        {
-            this.sliceInput = requireNonNull(sliceInput, "sliceInput is null");
-        }
-
-        @Override
-        public int read(long position, byte[] buffer, int offset, int length)
-                throws IOException
-        {
-            long currentPosition = sliceInput.position();
-            sliceInput.setPosition(position);
-            int bytesRead = sliceInput.read(buffer, offset, length);
-            sliceInput.setPosition(currentPosition);
-            return bytesRead;
-        }
-
-        @Override
-        public void readFully(long position, byte[] buffer, int offset, int length)
-        {
-            long currentPosition = sliceInput.position();
-            sliceInput.setPosition(position);
-            sliceInput.readFully(buffer, offset, length);
-            sliceInput.setPosition(currentPosition);
-        }
-
-        @Override
-        public void readFully(long position, byte[] buffer)
-        {
-            readFully(position, buffer, 0, buffer.length);
-        }
-
-        @Override
-        public void seek(long pos)
-        {
-            sliceInput.setPosition(pos);
-        }
-
-        @Override
-        public long getPos()
-        {
-            return sliceInput.position();
-        }
-
-        @Override
-        public boolean seekToNewSource(long targetPos)
-        {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public int read()
-                throws IOException
-        {
-            return sliceInput.read();
-        }
-    }
 }
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java
index 0b16a0bb0e10..6bba6c9230eb 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java
@@ -19,6 +19,7 @@
 import io.airlift.units.DataSize;
 import io.trino.filesystem.TrinoFileSystem;
 import io.trino.filesystem.TrinoInputFile;
+import io.trino.filesystem.TrinoOutputFile;
 import io.trino.orc.OrcDataSink;
 import io.trino.orc.OrcDataSource;
 import io.trino.orc.OrcReaderOptions;
@@ -43,7 +44,6 @@
 import java.io.Closeable;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -170,7 +170,7 @@ private IcebergFileWriter createParquetWriter(
                 .collect(toImmutableList());
 
         try {
-            OutputStream outputStream = fileSystem.newOutputFile(outputPath).create();
+            TrinoOutputFile outputFile = fileSystem.newOutputFile(outputPath);
 
             Closeable rollbackAction = () -> fileSystem.deleteFile(outputPath);
 
@@ -182,7 +182,7 @@ private IcebergFileWriter createParquetWriter(
 
             return new IcebergParquetFileWriter(
                     metricsConfig,
-                    outputStream,
+                    outputFile,
                     rollbackAction,
                     fileColumnTypes,
                     fileColumnNames,
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java
index 6664a326341c..faa7f978f6bb 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java
@@ -14,6 +14,7 @@
 package io.trino.plugin.iceberg;
 
 import io.trino.filesystem.TrinoFileSystem;
+import io.trino.filesystem.TrinoOutputFile;
 import io.trino.parquet.writer.ParquetWriterOptions;
 import io.trino.plugin.hive.parquet.ParquetFileWriter;
 import io.trino.spi.type.Type;
@@ -24,7 +25,7 @@
 import org.apache.parquet.schema.MessageType;
 
 import java.io.Closeable;
-import java.io.OutputStream;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -42,7 +43,7 @@ public class IcebergParquetFileWriter
 
     public IcebergParquetFileWriter(
             MetricsConfig metricsConfig,
-            OutputStream outputStream,
+            TrinoOutputFile outputFile,
             Closeable rollbackAction,
             List fileColumnTypes,
             List fileColumnNames,
@@ -54,8 +55,9 @@ public IcebergParquetFileWriter(
             String trinoVersion,
             String outputPath,
             TrinoFileSystem fileSystem)
+            throws IOException
     {
-        super(outputStream,
+        super(outputFile,
                 rollbackAction,
                 fileColumnTypes,
                 fileColumnNames,
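
Note: taken together, writes now flow through TrinoOutputFile and reads through TrinoInputFile, which is the round trip the updated TestHdfsParquetDataSource exercises end to end. A closing sketch of that round trip, assuming a caller-supplied TrinoFileSystem (the "/tmp" prefix mirrors the test; real callers derive paths from table locations):

    import io.trino.filesystem.TrinoFileSystem;
    import io.trino.filesystem.TrinoInputFile;

    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.UUID;

    final class RoundTripSketch
    {
        private RoundTripSketch() {}

        // Write bytes through TrinoOutputFile, then reopen the same path as a TrinoInputFile,
        // which can be handed to a reader such as TrinoParquetDataSource.
        static TrinoInputFile writeAndReopen(TrinoFileSystem fileSystem, byte[] bytes)
                throws IOException
        {
            String path = "/tmp/" + UUID.randomUUID();
            try (OutputStream outputStream = fileSystem.newOutputFile(path).create()) {
                outputStream.write(bytes);
            }
            return fileSystem.newInputFile(path);
        }
    }
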