Port OrcFileWriterFactory.createOrcDataSink to TrinoFileSystem #14627

Merged
Changes from 1 commit

HivePageSinkProvider.java
@@ -20,6 +20,7 @@
import io.airlift.event.client.EventClient;
import io.airlift.json.JsonCodec;
import io.airlift.units.DataSize;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
import io.trino.plugin.hive.metastore.HivePageSinkMetadataProvider;
@@ -58,6 +59,7 @@ public class HivePageSinkProvider
implements ConnectorPageSinkProvider
{
private final Set<HiveFileWriterFactory> fileWriterFactories;
private final TrinoFileSystemFactory fileSystemFactory;
private final HdfsEnvironment hdfsEnvironment;
private final PageSorter pageSorter;
private final HiveMetastoreFactory metastoreFactory;
@@ -79,6 +81,7 @@ public class HivePageSinkProvider
@Inject
public HivePageSinkProvider(
Set<HiveFileWriterFactory> fileWriterFactories,
TrinoFileSystemFactory fileSystemFactory,
HdfsEnvironment hdfsEnvironment,
PageSorter pageSorter,
HiveMetastoreFactory metastoreFactory,
@@ -93,6 +96,7 @@ public HivePageSinkProvider(
HiveWriterStats hiveWriterStats)
{
this.fileWriterFactories = ImmutableSet.copyOf(requireNonNull(fileWriterFactories, "fileWriterFactories is null"));
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.pageSorter = requireNonNull(pageSorter, "pageSorter is null");
this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null");
@@ -154,6 +158,7 @@ private HivePageSink createPageSink(HiveWritableTableHandle handle, boolean isCr

HiveWriterFactory writerFactory = new HiveWriterFactory(
fileWriterFactories,
fileSystemFactory,
handle.getSchemaName(),
handle.getTableName(),
isCreateTable,
HiveWriterFactory.java
@@ -13,13 +13,16 @@
*/
package io.trino.plugin.hive;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.airlift.event.client.EventClient;
import io.airlift.units.DataSize;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.plugin.hive.HiveSessionProperties.InsertExistingPartitionsBehavior;
@@ -44,7 +47,6 @@
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -86,7 +88,6 @@
import static io.trino.plugin.hive.HiveErrorCode.HIVE_PATH_ALREADY_EXISTS;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_TABLE_READ_ONLY;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_OPEN_ERROR;
import static io.trino.plugin.hive.HiveSessionProperties.getInsertExistingPartitionsBehavior;
import static io.trino.plugin.hive.HiveSessionProperties.getTemporaryStagingDirectoryPath;
import static io.trino.plugin.hive.HiveSessionProperties.getTimestampPrecision;
@@ -112,7 +113,6 @@
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
import static org.apache.hadoop.hive.ql.io.AcidUtils.deleteDeltaSubdir;
import static org.apache.hadoop.hive.ql.io.AcidUtils.deltaSubdir;
@@ -126,6 +126,7 @@ public class HiveWriterFactory
private static final Pattern BUCKET_FROM_FILENAME_PATTERN = Pattern.compile("(0[0-9]+)_.*");

private final Set<HiveFileWriterFactory> fileWriterFactories;
private final TrinoFileSystemFactory fileSystemFactory;
private final String schemaName;
private final String tableName;
private final AcidTransaction transaction;
@@ -146,7 +147,6 @@ public class HiveWriterFactory

private final HivePageSinkMetadataProvider pageSinkMetadataProvider;
private final TypeManager typeManager;
private final HdfsEnvironment hdfsEnvironment;
private final PageSorter pageSorter;
private final JobConf conf;

@@ -172,6 +172,7 @@ public class HiveWriterFactory

public HiveWriterFactory(
Set<HiveFileWriterFactory> fileWriterFactories,
TrinoFileSystemFactory fileSystemFactory,
String schemaName,
String tableName,
boolean isCreateTable,
@@ -199,6 +200,7 @@ public HiveWriterFactory(
HiveWriterStats hiveWriterStats)
{
this.fileWriterFactories = ImmutableSet.copyOf(requireNonNull(fileWriterFactories, "fileWriterFactories is null"));
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
this.transaction = requireNonNull(transaction, "transaction is null");
@@ -214,7 +216,6 @@ public HiveWriterFactory(

this.typeManager = requireNonNull(typeManager, "typeManager is null");

this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.pageSorter = requireNonNull(pageSorter, "pageSorter is null");
this.sortBufferSize = requireNonNull(sortBufferSize, "sortBufferSize is null");
this.maxOpenSortFiles = maxOpenSortFiles;
@@ -358,13 +359,19 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt
if (!writeInfo.getWriteMode().isWritePathSameAsTargetPath()) {
// When target path is different from write path,
// verify that the target directory for the partition does not already exist
if (HiveWriteUtils.pathExists(new HdfsContext(session), hdfsEnvironment, writeInfo.getTargetPath())) {
throw new TrinoException(HIVE_PATH_ALREADY_EXISTS, format(
"Target directory for new partition '%s' of table '%s.%s' already exists: %s",
partitionName,
schemaName,
tableName,
writeInfo.getTargetPath()));
String writeInfoTargetPath = writeInfo.getTargetPath().toString();
try {
if (fileSystemFactory.create(session).newInputFile(writeInfoTargetPath).exists()) {
throw new TrinoException(HIVE_PATH_ALREADY_EXISTS, format(
"Target directory for new partition '%s' of table '%s.%s' already exists: %s",
partitionName,
schemaName,
tableName,
writeInfo.getTargetPath()));
}
}
catch (IOException e) {
throw new TrinoException(HIVE_FILESYSTEM_ERROR, format("Error while accessing: %s", writeInfoTargetPath), e);
}
}
}
@@ -570,26 +577,19 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt
};

if (!sortedBy.isEmpty()) {
FileSystem fileSystem;
TrinoFileSystem fileSystem;
Path tempFilePath;
if (sortedWritingTempStagingPathEnabled) {
String tempPrefix = sortedWritingTempStagingPath.replace(
"${USER}",
new HdfsContext(session).getIdentity().getUser());
tempPrefix = setSchemeToFileIfAbsent(tempPrefix);
tempFilePath = new Path(tempPrefix, ".tmp-sort." + path.getParent().getName() + "." + path.getName());
}
else {
tempFilePath = new Path(path.getParent(), ".tmp-sort." + path.getName());
}
try {
Configuration configuration = new Configuration(outputConf);
// Explicitly set the default FS to local file system to avoid getting HDFS when sortedWritingTempStagingPath specifies no scheme
configuration.set(FS_DEFAULT_NAME_KEY, "file:///");
fileSystem = hdfsEnvironment.getFileSystem(session.getIdentity(), tempFilePath, configuration);
}
catch (IOException e) {
throw new TrinoException(HIVE_WRITER_OPEN_ERROR, e);
}
fileSystem = fileSystemFactory.create(session);

List<Type> types = dataColumns.stream()
.map(column -> column.getHiveType().getType(typeManager, getTimestampPrecision(session)))
@@ -643,17 +643,9 @@ public interface RowIdSortingFileWriterMaker

public SortingFileWriter makeRowIdSortingWriter(FileWriter deleteFileWriter, Path path)
{
FileSystem fileSystem;
Path tempFilePath = new Path(path.getParent(), ".tmp-sort." + path.getName());
try {
Configuration configuration = new Configuration(conf);
// Explicitly set the default FS to local file system to avoid getting HDFS when sortedWritingTempStagingPath specifies no scheme
configuration.set(FS_DEFAULT_NAME_KEY, "file:///");
fileSystem = hdfsEnvironment.getFileSystem(session.getIdentity(), tempFilePath, configuration);
}
catch (IOException e) {
throw new TrinoException(HIVE_WRITER_OPEN_ERROR, e);
}
String parentPath = setSchemeToFileIfAbsent(path.getParent().toString());
Path tempFilePath = new Path(parentPath, ".tmp-sort." + path.getName());
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
// The ORC columns are: operation, originalTransaction, bucket, rowId, row
// The deleted rows should be sorted by originalTransaction, then by rowId
List<Integer> sortFields = ImmutableList.of(1, 3);
@@ -816,6 +808,17 @@ public static String getFileExtension(JobConf conf, StorageFormat storageFormat)
}
}

@VisibleForTesting
static String setSchemeToFileIfAbsent(String pathString)
{
Path path = new Path(pathString);
String scheme = path.toUri().getScheme();
if (scheme == null || scheme.equals("")) {
return "file:///" + pathString;
}
return pathString;
}
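For reference, a hypothetical test sketch (not part of this change) illustrating the contract of the helper above; it assumes the TestNG-style assertions used elsewhere in the Hive plugin tests, and the paths are made up:

import static io.trino.plugin.hive.HiveWriterFactory.setSchemeToFileIfAbsent;
import static org.testng.Assert.assertEquals;

@Test
public void testSetSchemeToFileIfAbsent()
{
    // a bare path carries no URI scheme, so the local "file" scheme is prepended
    assertEquals(setSchemeToFileIfAbsent("/tmp/hive-staging"), "file:////tmp/hive-staging");
    // paths that already carry a scheme are returned unchanged
    assertEquals(setSchemeToFileIfAbsent("hdfs://namenode/tmp/staging"), "hdfs://namenode/tmp/staging");
    assertEquals(setSchemeToFileIfAbsent("file:///tmp/staging"), "file:///tmp/staging");
}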

private static class DataColumn
{
private final String name;
SortingFileWriter.java
@@ -17,6 +17,8 @@
import com.google.common.io.Closer;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoInputFile;
import io.trino.orc.OrcDataSink;
import io.trino.orc.OrcDataSource;
import io.trino.orc.OrcDataSourceId;
@@ -32,7 +34,6 @@
import io.trino.spi.connector.SortOrder;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeOperators;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.openjdk.jol.info.ClassLayout;

@@ -66,7 +67,7 @@ public class SortingFileWriter

private static final int INSTANCE_SIZE = toIntExact(ClassLayout.parseClass(SortingFileWriter.class).instanceSize());

private final FileSystem fileSystem;
private final TrinoFileSystem fileSystem;
private final Path tempFilePrefix;
private final int maxOpenTempFiles;
private final List<Type> types;
@@ -80,7 +81,7 @@ public class SortingFileWriter
private final TypeOperators typeOperators;

public SortingFileWriter(
FileSystem fileSystem,
TrinoFileSystem fileSystem,
Path tempFilePrefix,
FileWriter outputWriter,
DataSize maxMemory,
@@ -212,12 +213,13 @@ private void mergeFiles(Iterable<TempFile> files, Consumer<Page> consumer)
Collection<Iterator<Page>> iterators = new ArrayList<>();

for (TempFile tempFile : files) {
Path file = tempFile.getPath();
String file = tempFile.getPath();
TrinoInputFile inputFile = fileSystem.newInputFile(file);
OrcDataSource dataSource = new HdfsOrcDataSource(
new OrcDataSourceId(file.toString()),
fileSystem.getFileStatus(file).getLen(),
new OrcDataSourceId(file),
inputFile.length(),
new OrcReaderOptions(),
fileSystem.open(file),
inputFile,
new FileFormatDataSourceStats());
closer.register(dataSource);
iterators.add(new TempFileReader(types, dataSource));
@@ -227,10 +229,7 @@ private void mergeFiles(Iterable<TempFile> files, Consumer<Page> consumer)
.forEachRemaining(consumer);

for (TempFile tempFile : files) {
Path file = tempFile.getPath();
if (!fileSystem.delete(file, false)) {
throw new IOException("Failed to delete temporary file: " + file);
}
fileSystem.deleteFile(tempFile.getPath());
}
}
catch (IOException e) {
@@ -240,7 +239,7 @@ private void mergeFiles(Iterable<TempFile> files, Consumer<Page> consumer)

private void writeTempFile(Consumer<TempFileWriter> consumer)
{
Path tempFile = getTempFileName();
String tempFile = getTempFileName();

try (TempFileWriter writer = new TempFileWriter(types, tempFileSinkFactory.createSink(fileSystem, tempFile))) {
consumer.accept(writer);
@@ -253,36 +252,34 @@ private void writeTempFile(Consumer<TempFileWriter> consumer)
}
}

private void cleanupFile(Path file)
private void cleanupFile(String file)
{
try {
if (!fileSystem.delete(file, false)) {
throw new IOException("Delete failed");
}
fileSystem.deleteFile(file);
}
catch (IOException e) {
log.warn(e, "Failed to delete temporary file: %s", file);
}
}

private Path getTempFileName()
private String getTempFileName()
{
return new Path(tempFilePrefix + "." + nextFileId.getAndIncrement());
return tempFilePrefix + "." + nextFileId.getAndIncrement();
}

private static class TempFile
{
private final Path path;
private final String path;
private final long size;

public TempFile(Path path, long size)
public TempFile(String path, long size)
{
checkArgument(size >= 0, "size is negative");
this.path = requireNonNull(path, "path is null");
this.size = size;
}

public Path getPath()
public String getPath()
{
return path;
}
@@ -304,7 +301,7 @@ public String toString()

public interface TempFileSinkFactory
{
OrcDataSink createSink(FileSystem fileSystem, Path path)
OrcDataSink createSink(TrinoFileSystem fileSystem, String path)
throws IOException;
}
}
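The TempFileSinkFactory contract above now hands implementations a TrinoFileSystem and a String path instead of a Hadoop FileSystem and Path. A minimal sketch of a conforming factory, assuming an OutputStreamOrcDataSink constructor that wraps the OutputStream returned by TrinoOutputFile#create(); this is an illustration only, not the implementation used by OrcFileWriterFactory.createOrcDataSink in this PR:

// Hypothetical sketch, not code from this PR: a TempFileSinkFactory that writes each
// sorted temp file through the TrinoFileSystem abstraction. Assumes
// io.trino.orc.OutputStreamOrcDataSink wrapping the stream from TrinoOutputFile#create().
SortingFileWriter.TempFileSinkFactory sinkFactory = (fileSystem, path) ->
        new OutputStreamOrcDataSink(fileSystem.newOutputFile(path).create());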