Use Hive native writers for creating empty bucket files
electrum committed Jul 14, 2023
1 parent 462c5dc commit a794b32
Showing 6 changed files with 46 additions and 90 deletions.
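The substance of the change is in HiveMetadata.createEmptyFiles() below: instead of building a Hadoop JobConf, configuring compression, and driving a Hive RecordWriter, the metadata layer now asks the connector's own HiveFileWriterFactory implementations for a writer at each missing bucket location and commits it without appending any rows. A minimal, self-contained sketch of that dispatch pattern, using simplified stand-in interfaces rather than the real Trino types:

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Simplified stand-ins for Trino's HiveFileWriterFactory and FileWriter;
// the shapes here are illustrative, not the actual connector SPI.
interface FileWriter
{
    void commit(); // finalizes the file; with no rows appended, the result is a valid empty file
}

interface FileWriterFactory
{
    // empty when this factory does not handle the requested format
    Optional<FileWriter> createFileWriter(String location, String format);
}

final class EmptyBucketFiles
{
    private EmptyBucketFiles() {}

    // mirrors the dispatch in createEmptyFiles(): the first factory that supports
    // the storage format wins, otherwise the format is rejected
    static void createEmptyFiles(Set<FileWriterFactory> factories, String directory, String format, List<String> fileNames)
    {
        for (String fileName : fileNames) {
            String location = directory + "/" + fileName;
            FileWriter writer = factories.stream()
                    .map(factory -> factory.createFileWriter(location, format))
                    .flatMap(Optional::stream)
                    .findFirst()
                    .orElseThrow(() -> new IllegalArgumentException("Writing not supported for " + format));
            writer.commit();
        }
    }
}
```

One consequence visible in the diff: the compression setup (selectCompressionCodec and the ZSTD-to-GZIP workaround for the Hive ORC writer) disappears entirely, since the new path passes HiveCompressionCodec.NONE, which is all an empty file needs.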
@@ -75,6 +75,7 @@
import static io.trino.plugin.hive.HiveFileSystemTestUtils.filterTable;
import static io.trino.plugin.hive.HiveFileSystemTestUtils.getSplitsCount;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS;
import static io.trino.plugin.hive.HiveTestUtils.getDefaultHiveFileWriterFactories;
import static io.trino.plugin.hive.HiveTestUtils.getDefaultHivePageSourceFactories;
import static io.trino.plugin.hive.HiveTestUtils.getDefaultHiveRecordCursorProviders;
import static io.trino.plugin.hive.TestingThriftHiveMetastoreBuilder.testingThriftHiveMetastoreBuilder;
@@ -146,6 +147,7 @@ public S3SelectTestHelper(String host,
this.hiveConfig,
new HiveMetastoreConfig(),
HiveMetastoreFactory.ofInstance(metastoreClient),
getDefaultHiveFileWriterFactories(hiveConfig, hdfsEnvironment),
new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS),
hdfsEnvironment,
hivePartitionManager,
109 changes: 32 additions & 77 deletions plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
@@ -29,9 +29,7 @@
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.TrinoOutputFile;
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.plugin.base.CatalogName;
@@ -58,7 +56,6 @@
import io.trino.plugin.hive.statistics.HiveStatisticsProvider;
import io.trino.plugin.hive.util.HiveBucketing;
import io.trino.plugin.hive.util.HiveUtil;
import io.trino.plugin.hive.util.HiveWriteUtils;
import io.trino.plugin.hive.util.SerdeConstants;
import io.trino.spi.ErrorType;
import io.trino.spi.Page;
@@ -130,8 +127,6 @@
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.mapred.JobConf;

import java.io.File;
import java.io.IOException;
@@ -166,7 +161,6 @@
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.hdfs.ConfigurationUtils.toJobConf;
import static io.trino.plugin.base.projection.ApplyProjectionUtil.extractSupportedProjectedColumns;
import static io.trino.plugin.base.projection.ApplyProjectionUtil.replaceWithNewVariables;
import static io.trino.plugin.hive.HiveAnalyzeProperties.getColumnNames;
@@ -179,7 +173,6 @@
import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.SYNTHESIZED;
import static io.trino.plugin.hive.HiveColumnHandle.createBaseColumn;
import static io.trino.plugin.hive.HiveColumnHandle.mergeRowIdColumnHandle;
import static io.trino.plugin.hive.HiveCompressionCodecs.selectCompressionCodec;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_COLUMN_ORDER_MISMATCH;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_CONCURRENT_MODIFICATION_DETECTED;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
@@ -188,7 +181,6 @@
import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_VIEW_TRANSLATION_ERROR;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR;
import static io.trino.plugin.hive.HivePartitionManager.extractPartitionValues;
import static io.trino.plugin.hive.HiveSessionProperties.NON_TRANSACTIONAL_OPTIMIZE_ENABLED;
import static io.trino.plugin.hive.HiveSessionProperties.getDeltaLakeCatalogName;
@@ -284,11 +276,8 @@
import static io.trino.plugin.hive.util.AcidTables.isFullAcidTable;
import static io.trino.plugin.hive.util.AcidTables.isTransactionalTable;
import static io.trino.plugin.hive.util.AcidTables.writeAcidVersionFile;
import static io.trino.plugin.hive.util.CompressionConfigUtil.configureCompression;
import static io.trino.plugin.hive.util.HiveBucketing.getHiveBucketHandle;
import static io.trino.plugin.hive.util.HiveBucketing.isSupportedBucketing;
import static io.trino.plugin.hive.util.HiveClassNames.HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS;
import static io.trino.plugin.hive.util.HiveClassNames.ORC_OUTPUT_FORMAT_CLASS;
import static io.trino.plugin.hive.util.HiveUtil.columnMetadataGetter;
import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeyColumnHandles;
import static io.trino.plugin.hive.util.HiveUtil.getRegularColumnHandles;
@@ -304,7 +293,6 @@
import static io.trino.plugin.hive.util.HiveWriteUtils.checkTableIsWritable;
import static io.trino.plugin.hive.util.HiveWriteUtils.checkedDelete;
import static io.trino.plugin.hive.util.HiveWriteUtils.createPartitionValues;
import static io.trino.plugin.hive.util.HiveWriteUtils.initializeSerializer;
import static io.trino.plugin.hive.util.HiveWriteUtils.isFileCreatedByQuery;
import static io.trino.plugin.hive.util.HiveWriteUtils.isS3FileSystem;
import static io.trino.plugin.hive.util.HiveWriteUtils.isWritableType;
@@ -378,6 +366,7 @@ public class HiveMetadata
private final CatalogName catalogName;
private final SemiTransactionalHiveMetastore metastore;
private final boolean autoCommit;
private final Set<HiveFileWriterFactory> fileWriterFactories;
private final TrinoFileSystemFactory fileSystemFactory;
private final HdfsEnvironment hdfsEnvironment;
private final HivePartitionManager partitionManager;
@@ -406,6 +395,7 @@ public HiveMetadata(
CatalogName catalogName,
SemiTransactionalHiveMetastore metastore,
boolean autoCommit,
Set<HiveFileWriterFactory> fileWriterFactories,
TrinoFileSystemFactory fileSystemFactory,
HdfsEnvironment hdfsEnvironment,
HivePartitionManager partitionManager,
@@ -433,6 +423,7 @@ public HiveMetadata(
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.metastore = requireNonNull(metastore, "metastore is null");
this.autoCommit = autoCommit;
this.fileWriterFactories = ImmutableSet.copyOf(requireNonNull(fileWriterFactories, "fileWriterFactories is null"));
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.partitionManager = requireNonNull(partitionManager, "partitionManager is null");
@@ -1742,12 +1733,13 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
partitionUpdates = PartitionUpdate.mergePartitionUpdates(partitionUpdates);

if (handle.getBucketProperty().isPresent() && isCreateEmptyBucketFiles(session)) {
List<PartitionUpdate> partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, table, true, partitionUpdates);
List<PartitionUpdate> partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, true, partitionUpdates);
// replace partitionUpdates before creating the empty files so that those files will be cleaned up if we end up rollback
partitionUpdates = PartitionUpdate.mergePartitionUpdates(concat(partitionUpdates, partitionUpdatesForMissingBuckets));
for (PartitionUpdate partitionUpdate : partitionUpdatesForMissingBuckets) {
Optional<Partition> partition = table.getPartitionColumns().isEmpty() ? Optional.empty() : Optional.of(buildPartitionObject(session, table, partitionUpdate));
createEmptyFiles(session, partitionUpdate.getWritePath(), table, partition, partitionUpdate.getFileNames());
Location writePath = Location.of(partitionUpdate.getWritePath().toString());
createEmptyFiles(session, writePath, table, partition, partitionUpdate.getFileNames());
}
if (handle.isTransactional()) {
AcidTransaction transaction = handle.getTransaction();
@@ -1826,19 +1818,15 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
private List<PartitionUpdate> computePartitionUpdatesForMissingBuckets(
ConnectorSession session,
HiveWritableTableHandle handle,
Table table,
boolean isCreateTable,
List<PartitionUpdate> partitionUpdates)
{
ImmutableList.Builder<PartitionUpdate> partitionUpdatesForMissingBucketsBuilder = ImmutableList.builder();
HiveStorageFormat storageFormat = table.getPartitionColumns().isEmpty() ? handle.getTableStorageFormat() : handle.getPartitionStorageFormat();
for (PartitionUpdate partitionUpdate : partitionUpdates) {
int bucketCount = handle.getBucketProperty().get().getBucketCount();

List<String> fileNamesForMissingBuckets = computeFileNamesForMissingBuckets(
session,
storageFormat,
partitionUpdate.getTargetPath(),
bucketCount,
isCreateTable && handle.isTransactional(),
partitionUpdate);
@@ -1857,8 +1845,6 @@ private List<PartitionUpdate> computePartitionUpdatesForMissingBuckets(

private List<String> computeFileNamesForMissingBuckets(
ConnectorSession session,
HiveStorageFormat storageFormat,
Path targetPath,
int bucketCount,
boolean transactionalCreateTable,
PartitionUpdate partitionUpdate)
@@ -1867,10 +1853,7 @@ private List<String> computeFileNamesForMissingBuckets(
// fast path for common case
return ImmutableList.of();
}
HdfsContext hdfsContext = new HdfsContext(session);
JobConf conf = toJobConf(hdfsEnvironment.getConfiguration(hdfsContext, targetPath));
configureCompression(conf, selectCompressionCodec(session, storageFormat));
String fileExtension = HiveWriterFactory.getFileExtension(conf, fromHiveStorageFormat(storageFormat));

Set<String> fileNames = ImmutableSet.copyOf(partitionUpdate.getFileNames());
Set<Integer> bucketsWithFiles = fileNames.stream()
.map(HiveWriterFactory::getBucketFromFileName)
@@ -1881,21 +1864,16 @@
if (bucketsWithFiles.contains(i)) {
continue;
}
String fileName;
if (transactionalCreateTable) {
fileName = computeTransactionalBucketedFilename(i) + fileExtension;
}
else {
fileName = computeNonTransactionalBucketedFilename(session.getQueryId(), i) + fileExtension;
}
missingFileNamesBuilder.add(fileName);
missingFileNamesBuilder.add(transactionalCreateTable
? computeTransactionalBucketedFilename(i)
: computeNonTransactionalBucketedFilename(session.getQueryId(), i));
}
List<String> missingFileNames = missingFileNamesBuilder.build();
verify(fileNames.size() + missingFileNames.size() == bucketCount);
return missingFileNames;
}

private void createEmptyFiles(ConnectorSession session, Path path, Table table, Optional<Partition> partition, List<String> fileNames)
private void createEmptyFiles(ConnectorSession session, Location path, Table table, Optional<Partition> partition, List<String> fileNames)
{
Properties schema;
StorageFormat format;
@@ -1908,48 +1886,24 @@ private void createEmptyFiles(ConnectorSession session, Path path, Table table,
format = table.getStorage().getStorageFormat();
}

HiveCompressionCodec compression = selectCompressionCodec(session, format);
if (format.getOutputFormat().equals(ORC_OUTPUT_FORMAT_CLASS) && (compression == HiveCompressionCodec.ZSTD)) {
compression = HiveCompressionCodec.GZIP; // ZSTD not supported by Hive ORC writer
}
JobConf conf = toJobConf(hdfsEnvironment.getConfiguration(new HdfsContext(session), path));
configureCompression(conf, compression);

// for simple line-oriented formats, just create an empty file directly
if (format.getOutputFormat().equals(HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS)) {
TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity());
for (String fileName : fileNames) {
TrinoOutputFile trinoOutputFile = fileSystem.newOutputFile(Location.of(path.toString()).appendPath(fileName));
try {
// create empty file
trinoOutputFile.create().close();
}
catch (IOException e) {
throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error write empty file to Hive", e);
}
}
return;
}

hdfsEnvironment.doAs(session.getIdentity(), () -> {
for (String fileName : fileNames) {
writeEmptyFile(session, new Path(path, fileName), conf, schema, format.getSerde(), format.getOutputFormat());
}
});
}

private static void writeEmptyFile(ConnectorSession session, Path target, JobConf conf, Properties properties, String serde, String outputFormatName)
{
// Some serializers such as Avro set a property in the schema.
initializeSerializer(conf, properties, serde);

// The code below is not a try with resources because RecordWriter is not Closeable.
FileSinkOperator.RecordWriter recordWriter = HiveWriteUtils.createRecordWriter(target, conf, properties, outputFormatName, session);
try {
recordWriter.close(false);
}
catch (IOException e) {
throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error write empty file to Hive", e);
for (String fileName : fileNames) {
Location location = path.appendPath(fileName);
fileWriterFactories.stream()
.map(factory -> factory.createFileWriter(
location,
ImmutableList.of(),
format,
HiveCompressionCodec.NONE,
schema,
session,
OptionalInt.empty(),
NO_ACID_TRANSACTION,
false,
WriterKind.INSERT))
.flatMap(Optional::stream)
.findFirst()
.orElseThrow(() -> new TrinoException(HIVE_UNSUPPORTED_FORMAT, "Writing not supported for " + format))
.commit();
}
}

@@ -2137,7 +2091,7 @@ private Table finishChangingTable(AcidOperation acidOperation, String changeDesc
}

if (handle.getBucketProperty().isPresent() && isCreateEmptyBucketFiles(session)) {
List<PartitionUpdate> partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, table, false, partitionUpdates);
List<PartitionUpdate> partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, false, partitionUpdates);
// replace partitionUpdates before creating the empty files so that those files will be cleaned up if we end up rollback
partitionUpdates = PartitionUpdate.mergePartitionUpdates(concat(partitionUpdates, partitionUpdatesForMissingBuckets));
for (PartitionUpdate partitionUpdate : partitionUpdatesForMissingBuckets) {
@@ -2154,7 +2108,8 @@ private Table finishChangingTable(AcidOperation acidOperation, String changeDesc
statistics,
handle.isRetriesEnabled());
}
createEmptyFiles(session, partitionUpdate.getWritePath(), table, partition, partitionUpdate.getFileNames());
Location writePath = Location.of(partitionUpdate.getWritePath().toString());
createEmptyFiles(session, writePath, table, partition, partitionUpdate.getFileNames());
}
}

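With the Hadoop-specific parameters gone, computeFileNamesForMissingBuckets() above reduces to plain bookkeeping: collect the bucket ids that already have a file, then generate a name for every bucket id that does not. A self-contained model of that logic, assuming a purely illustrative zero-padded naming convention (Trino's computeTransactionalBucketedFilename and computeNonTransactionalBucketedFilename encode more than this):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class MissingBuckets
{
    private MissingBuckets() {}

    // existingFileNames are assumed to start with a six-digit bucket id, e.g. "000002_0_<queryId>";
    // this convention is a stand-in for the real bucket file naming scheme
    static List<String> missingBucketFileNames(Set<String> existingFileNames, int bucketCount, String queryId)
    {
        Set<Integer> bucketsWithFiles = existingFileNames.stream()
                .map(name -> Integer.parseInt(name.substring(0, 6)))
                .collect(Collectors.toSet());

        List<String> missing = new ArrayList<>();
        for (int bucket = 0; bucket < bucketCount; bucket++) {
            if (!bucketsWithFiles.contains(bucket)) {
                missing.add(String.format("%06d_0_%s", bucket, queryId));
            }
        }
        // same invariant the diff verifies: every bucket is covered by an existing or a generated name
        if (existingFileNames.size() + missing.size() != bucketCount) {
            throw new IllegalStateException("bucket count mismatch");
        }
        return missing;
    }
}
```

Note that the generated names no longer carry a format-specific file extension; computing that extension was what the old code needed the JobConf and storage format for.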
@@ -13,6 +13,7 @@
*/
package io.trino.plugin.hive;

import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import io.airlift.concurrent.BoundedExecutor;
import io.airlift.json.JsonCodec;
@@ -56,6 +57,7 @@ public class HiveMetadataFactory
private final boolean hideDeltaLakeTables;
private final long perTransactionCacheMaximumSize;
private final HiveMetastoreFactory metastoreFactory;
private final Set<HiveFileWriterFactory> fileWriterFactories;
private final TrinoFileSystemFactory fileSystemFactory;
private final HdfsEnvironment hdfsEnvironment;
private final HivePartitionManager partitionManager;
@@ -86,6 +88,7 @@ public HiveMetadataFactory(
HiveConfig hiveConfig,
HiveMetastoreConfig hiveMetastoreConfig,
HiveMetastoreFactory metastoreFactory,
Set<HiveFileWriterFactory> fileWriterFactories,
TrinoFileSystemFactory fileSystemFactory,
HdfsEnvironment hdfsEnvironment,
HivePartitionManager partitionManager,
@@ -108,6 +111,7 @@
this(
catalogName,
metastoreFactory,
fileWriterFactories,
fileSystemFactory,
hdfsEnvironment,
partitionManager,
@@ -146,6 +150,7 @@ public HiveMetadataFactory(
public HiveMetadataFactory(
CatalogName catalogName,
HiveMetastoreFactory metastoreFactory,
Set<HiveFileWriterFactory> fileWriterFactories,
TrinoFileSystemFactory fileSystemFactory,
HdfsEnvironment hdfsEnvironment,
HivePartitionManager partitionManager,
@@ -192,6 +197,7 @@ public HiveMetadataFactory(
this.perTransactionCacheMaximumSize = perTransactionCacheMaximumSize;

this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null");
this.fileWriterFactories = ImmutableSet.copyOf(requireNonNull(fileWriterFactories, "fileWriterFactories is null"));
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.partitionManager = requireNonNull(partitionManager, "partitionManager is null");
@@ -248,6 +254,7 @@ public TransactionalMetadata create(ConnectorIdentity identity, boolean autoComm
catalogName,
metastore,
autoCommit,
fileWriterFactories,
fileSystemFactory,
hdfsEnvironment,
partitionManager,
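HiveMetadataFactory now just threads the injected Set<HiveFileWriterFactory> through to each HiveMetadata it creates. In Trino's Hive connector such sets are typically assembled with Guice multibindings; a hypothetical, self-contained sketch of that wiring pattern (the factory names and bindings here are assumptions, not the actual HiveModule contents):

```java
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.Multibinder;

import java.util.Set;

// Local stand-ins so the sketch compiles on its own; in the connector these would be
// HiveFileWriterFactory and its ORC/Parquet/RCFile implementations.
interface WriterFactory {}
class OrcWriterFactory implements WriterFactory {}
class ParquetWriterFactory implements WriterFactory {}

class WriterFactoryModule
        extends AbstractModule
{
    @Override
    protected void configure()
    {
        // every addBinding() contributes one element to a single injectable Set<WriterFactory>
        Multibinder<WriterFactory> factories = Multibinder.newSetBinder(binder(), WriterFactory.class);
        factories.addBinding().to(OrcWriterFactory.class).in(Scopes.SINGLETON);
        factories.addBinding().to(ParquetWriterFactory.class).in(Scopes.SINGLETON);
    }

    public static void main(String[] args)
    {
        Injector injector = Guice.createInjector(new WriterFactoryModule());
        Set<WriterFactory> factories = injector.getInstance(Key.get(new TypeLiteral<Set<WriterFactory>>() {}));
        System.out.println(factories.size()); // prints 2
    }
}
```

A constructor that declares such a set as an @Inject parameter receives the whole aggregated collection, which is the shape of the new fileWriterFactories argument above.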
@@ -833,6 +833,7 @@ protected final void setup(String databaseName, HiveConfig hiveConfig, HiveMetas
metadataFactory = new HiveMetadataFactory(
new CatalogName("hive"),
HiveMetastoreFactory.ofInstance(metastoreClient),
getDefaultHiveFileWriterFactories(hiveConfig, hdfsEnvironment),
new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS),
hdfsEnvironment,
partitionManager,
@@ -211,6 +211,7 @@ protected void setup(String host, int port, String databaseName, boolean s3Selec
config,
new HiveMetastoreConfig(),
HiveMetastoreFactory.ofInstance(metastoreClient),
getDefaultHiveFileWriterFactories(config, hdfsEnvironment),
new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS),
hdfsEnvironment,
hivePartitionManager,