Fix Hive write to bucketed table when native writers disabled #18672

Merged: 2 commits, Aug 14, 2023
@@ -75,7 +75,6 @@
import static io.trino.plugin.hive.HiveFileSystemTestUtils.filterTable;
import static io.trino.plugin.hive.HiveFileSystemTestUtils.getSplitsCount;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS;
import static io.trino.plugin.hive.HiveTestUtils.getDefaultHiveFileWriterFactories;
import static io.trino.plugin.hive.HiveTestUtils.getDefaultHivePageSourceFactories;
import static io.trino.plugin.hive.HiveTestUtils.getDefaultHiveRecordCursorProviders;
import static io.trino.plugin.hive.TestingThriftHiveMetastoreBuilder.testingThriftHiveMetastoreBuilder;
@@ -147,7 +146,6 @@ public S3SelectTestHelper(String host,
this.hiveConfig,
new HiveMetastoreConfig(),
HiveMetastoreFactory.ofInstance(metastoreClient),
getDefaultHiveFileWriterFactories(hiveConfig, hdfsEnvironment),
new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS),
hdfsEnvironment,
hivePartitionManager,
109 changes: 77 additions & 32 deletions plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
@@ -32,6 +32,7 @@
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.TrinoOutputFile;
@losipiuk (Member) commented on Aug 14, 2023:

Can you change the commit summary of "Fix Hive write to bucketed table when native writers disabled" to Reverts "Use Hive native writers for creating empty bucket files", so it is obvious this is a single-commit revert?

Add the context for why it is reverted in the commit message body.

The PR author (Member) replied:

I am following the pattern from #18612.
The revert commit is clearly marked as such in the message, but not in the title.

Here, I didn't expand in the commit body on why I am reverting, because it's all in the title/summary line.

import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.plugin.base.CatalogName;
@@ -58,6 +59,7 @@
import io.trino.plugin.hive.statistics.HiveStatisticsProvider;
import io.trino.plugin.hive.util.HiveBucketing;
import io.trino.plugin.hive.util.HiveUtil;
import io.trino.plugin.hive.util.HiveWriteUtils;
import io.trino.plugin.hive.util.SerdeConstants;
import io.trino.spi.ErrorType;
import io.trino.spi.Page;
@@ -126,6 +128,8 @@
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.mapred.JobConf;

import java.io.FileNotFoundException;
import java.io.IOException;
@@ -158,6 +162,8 @@
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.filesystem.hdfs.HadoopPaths.hadoopPath;
import static io.trino.hdfs.ConfigurationUtils.toJobConf;
import static io.trino.plugin.base.projection.ApplyProjectionUtil.extractSupportedProjectedColumns;
import static io.trino.plugin.base.projection.ApplyProjectionUtil.replaceWithNewVariables;
import static io.trino.plugin.hive.HiveAnalyzeProperties.getColumnNames;
@@ -170,6 +176,7 @@
import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.SYNTHESIZED;
import static io.trino.plugin.hive.HiveColumnHandle.createBaseColumn;
import static io.trino.plugin.hive.HiveColumnHandle.mergeRowIdColumnHandle;
import static io.trino.plugin.hive.HiveCompressionCodecs.selectCompressionCodec;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_COLUMN_ORDER_MISMATCH;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_CONCURRENT_MODIFICATION_DETECTED;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
@@ -178,6 +185,7 @@
import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_VIEW_TRANSLATION_ERROR;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR;
import static io.trino.plugin.hive.HivePartitionManager.extractPartitionValues;
import static io.trino.plugin.hive.HiveSessionProperties.NON_TRANSACTIONAL_OPTIMIZE_ENABLED;
import static io.trino.plugin.hive.HiveSessionProperties.getDeltaLakeCatalogName;
@@ -275,8 +283,11 @@
import static io.trino.plugin.hive.util.AcidTables.isFullAcidTable;
import static io.trino.plugin.hive.util.AcidTables.isTransactionalTable;
import static io.trino.plugin.hive.util.AcidTables.writeAcidVersionFile;
import static io.trino.plugin.hive.util.CompressionConfigUtil.configureCompression;
import static io.trino.plugin.hive.util.HiveBucketing.getHiveBucketHandle;
import static io.trino.plugin.hive.util.HiveBucketing.isSupportedBucketing;
import static io.trino.plugin.hive.util.HiveClassNames.HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS;
import static io.trino.plugin.hive.util.HiveClassNames.ORC_OUTPUT_FORMAT_CLASS;
import static io.trino.plugin.hive.util.HiveUtil.columnMetadataGetter;
import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeyColumnHandles;
import static io.trino.plugin.hive.util.HiveUtil.getRegularColumnHandles;
@@ -291,6 +302,7 @@
import static io.trino.plugin.hive.util.HiveUtil.verifyPartitionTypeSupported;
import static io.trino.plugin.hive.util.HiveWriteUtils.checkTableIsWritable;
import static io.trino.plugin.hive.util.HiveWriteUtils.createPartitionValues;
import static io.trino.plugin.hive.util.HiveWriteUtils.initializeSerializer;
import static io.trino.plugin.hive.util.HiveWriteUtils.isFileCreatedByQuery;
import static io.trino.plugin.hive.util.HiveWriteUtils.isWritableType;
import static io.trino.plugin.hive.util.RetryDriver.retry;
@@ -363,7 +375,6 @@ public class HiveMetadata
private final CatalogName catalogName;
private final SemiTransactionalHiveMetastore metastore;
private final boolean autoCommit;
private final Set<HiveFileWriterFactory> fileWriterFactories;
private final TrinoFileSystemFactory fileSystemFactory;
private final HdfsEnvironment hdfsEnvironment;
private final HivePartitionManager partitionManager;
@@ -392,7 +403,6 @@ public HiveMetadata(
CatalogName catalogName,
SemiTransactionalHiveMetastore metastore,
boolean autoCommit,
Set<HiveFileWriterFactory> fileWriterFactories,
TrinoFileSystemFactory fileSystemFactory,
HdfsEnvironment hdfsEnvironment,
HivePartitionManager partitionManager,
@@ -420,7 +430,6 @@ public HiveMetadata(
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.metastore = requireNonNull(metastore, "metastore is null");
this.autoCommit = autoCommit;
this.fileWriterFactories = ImmutableSet.copyOf(requireNonNull(fileWriterFactories, "fileWriterFactories is null"));
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.partitionManager = requireNonNull(partitionManager, "partitionManager is null");
@@ -1739,13 +1748,12 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
partitionUpdates = PartitionUpdate.mergePartitionUpdates(partitionUpdates);

if (handle.getBucketProperty().isPresent() && isCreateEmptyBucketFiles(session)) {
List<PartitionUpdate> partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, true, partitionUpdates);
List<PartitionUpdate> partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, table, true, partitionUpdates);
// replace partitionUpdates before creating the empty files so that those files will be cleaned up if we end up rollback
partitionUpdates = PartitionUpdate.mergePartitionUpdates(concat(partitionUpdates, partitionUpdatesForMissingBuckets));
for (PartitionUpdate partitionUpdate : partitionUpdatesForMissingBuckets) {
Optional<Partition> partition = table.getPartitionColumns().isEmpty() ? Optional.empty() : Optional.of(buildPartitionObject(session, table, partitionUpdate));
Location writePath = Location.of(partitionUpdate.getWritePath().toString());
createEmptyFiles(session, writePath, table, partition, partitionUpdate.getFileNames());
createEmptyFiles(session, hadoopPath(partitionUpdate.getWritePath()), table, partition, partitionUpdate.getFileNames());
}
if (handle.isTransactional()) {
AcidTransaction transaction = handle.getTransaction();
@@ -1824,15 +1832,19 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
private List<PartitionUpdate> computePartitionUpdatesForMissingBuckets(
ConnectorSession session,
HiveWritableTableHandle handle,
Table table,
boolean isCreateTable,
List<PartitionUpdate> partitionUpdates)
{
ImmutableList.Builder<PartitionUpdate> partitionUpdatesForMissingBucketsBuilder = ImmutableList.builder();
HiveStorageFormat storageFormat = table.getPartitionColumns().isEmpty() ? handle.getTableStorageFormat() : handle.getPartitionStorageFormat();
for (PartitionUpdate partitionUpdate : partitionUpdates) {
int bucketCount = handle.getBucketProperty().get().getBucketCount();

List<String> fileNamesForMissingBuckets = computeFileNamesForMissingBuckets(
session,
storageFormat,
hadoopPath(partitionUpdate.getTargetPath()),
bucketCount,
isCreateTable && handle.isTransactional(),
partitionUpdate);
@@ -1851,6 +1863,8 @@ private List<PartitionUpdate> computePartitionUpdatesForMissingBuckets(

private List<String> computeFileNamesForMissingBuckets(
ConnectorSession session,
HiveStorageFormat storageFormat,
Path targetPath,
int bucketCount,
boolean transactionalCreateTable,
PartitionUpdate partitionUpdate)
@@ -1859,7 +1873,10 @@ private List<String> computeFileNamesForMissingBuckets(
// fast path for common case
return ImmutableList.of();
}

HdfsContext hdfsContext = new HdfsContext(session);
JobConf conf = toJobConf(hdfsEnvironment.getConfiguration(hdfsContext, targetPath));
configureCompression(conf, selectCompressionCodec(session, storageFormat));
String fileExtension = HiveWriterFactory.getFileExtension(conf, fromHiveStorageFormat(storageFormat));
Set<String> fileNames = ImmutableSet.copyOf(partitionUpdate.getFileNames());
Set<Integer> bucketsWithFiles = fileNames.stream()
.map(HiveWriterFactory::getBucketFromFileName)
@@ -1870,16 +1887,21 @@ private List<String> computeFileNamesForMissingBuckets(
if (bucketsWithFiles.contains(i)) {
continue;
}
missingFileNamesBuilder.add(transactionalCreateTable
? computeTransactionalBucketedFilename(i)
: computeNonTransactionalBucketedFilename(session.getQueryId(), i));
String fileName;
if (transactionalCreateTable) {
fileName = computeTransactionalBucketedFilename(i) + fileExtension;
}
else {
fileName = computeNonTransactionalBucketedFilename(session.getQueryId(), i) + fileExtension;
}
missingFileNamesBuilder.add(fileName);
}
List<String> missingFileNames = missingFileNamesBuilder.build();
verify(fileNames.size() + missingFileNames.size() == bucketCount);
return missingFileNames;
}

private void createEmptyFiles(ConnectorSession session, Location path, Table table, Optional<Partition> partition, List<String> fileNames)
private void createEmptyFiles(ConnectorSession session, Path path, Table table, Optional<Partition> partition, List<String> fileNames)
{
Properties schema;
StorageFormat format;
@@ -1892,24 +1914,48 @@ private void createEmptyFiles(ConnectorSession session, Location path, Table tab
format = table.getStorage().getStorageFormat();
}

for (String fileName : fileNames) {
Location location = path.appendPath(fileName);
fileWriterFactories.stream()
.map(factory -> factory.createFileWriter(
location,
ImmutableList.of(),
format,
HiveCompressionCodec.NONE,
schema,
session,
OptionalInt.empty(),
NO_ACID_TRANSACTION,
false,
WriterKind.INSERT))
.flatMap(Optional::stream)
.findFirst()
.orElseThrow(() -> new TrinoException(HIVE_UNSUPPORTED_FORMAT, "Writing not supported for " + format))
.commit();
HiveCompressionCodec compression = selectCompressionCodec(session, format);
if (format.getOutputFormat().equals(ORC_OUTPUT_FORMAT_CLASS) && (compression == HiveCompressionCodec.ZSTD)) {
compression = HiveCompressionCodec.GZIP; // ZSTD not supported by Hive ORC writer
}
JobConf conf = toJobConf(hdfsEnvironment.getConfiguration(new HdfsContext(session), path));
configureCompression(conf, compression);

// for simple line-oriented formats, just create an empty file directly
if (format.getOutputFormat().equals(HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS)) {
TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity());
for (String fileName : fileNames) {
TrinoOutputFile trinoOutputFile = fileSystem.newOutputFile(Location.of(path.toString()).appendPath(fileName));
try {
// create empty file
trinoOutputFile.create().close();
}
catch (IOException e) {
throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error write empty file to Hive", e);
}
}
return;
}

hdfsEnvironment.doAs(session.getIdentity(), () -> {
for (String fileName : fileNames) {
writeEmptyFile(session, new Path(path, fileName), conf, schema, format.getSerde(), format.getOutputFormat());
}
});
}

private static void writeEmptyFile(ConnectorSession session, Path target, JobConf conf, Properties properties, String serde, String outputFormatName)
{
// Some serializers such as Avro set a property in the schema.
initializeSerializer(conf, properties, serde);

// The code below is not a try with resources because RecordWriter is not Closeable.
FileSinkOperator.RecordWriter recordWriter = HiveWriteUtils.createRecordWriter(target, conf, properties, outputFormatName, session);
try {
recordWriter.close(false);
}
catch (IOException e) {
throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error write empty file to Hive", e);
}
}

@@ -2097,7 +2143,7 @@ private Table finishChangingTable(AcidOperation acidOperation, String changeDesc
}

if (handle.getBucketProperty().isPresent() && isCreateEmptyBucketFiles(session)) {
List<PartitionUpdate> partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, false, partitionUpdates);
List<PartitionUpdate> partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, table, false, partitionUpdates);
// replace partitionUpdates before creating the empty files so that those files will be cleaned up if we end up rollback
partitionUpdates = PartitionUpdate.mergePartitionUpdates(concat(partitionUpdates, partitionUpdatesForMissingBuckets));
for (PartitionUpdate partitionUpdate : partitionUpdatesForMissingBuckets) {
@@ -2114,8 +2160,7 @@ private Table finishChangingTable(AcidOperation acidOperation, String changeDesc
statistics,
handle.isRetriesEnabled());
}
Location writePath = Location.of(partitionUpdate.getWritePath().toString());
createEmptyFiles(session, writePath, table, partition, partitionUpdate.getFileNames());
createEmptyFiles(session, hadoopPath(partitionUpdate.getWritePath()), table, partition, partitionUpdate.getFileNames());
}
}
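
For context on the computeFileNamesForMissingBuckets change above, a minimal standalone sketch of the idea follows: every bucket index up to the table's bucket count that has no data file yet gets a generated file name, and empty files are then created for those names. The class name, helper name, and zero-padded naming scheme below are illustrative assumptions; the actual code delegates to HiveWriterFactory's filename helpers and appends the extension derived from the JobConf and storage format.

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class MissingBucketFilesSketch
{
    // Simplified stand-in for computeFileNamesForMissingBuckets: every bucket index in
    // [0, bucketCount) without an existing file gets a generated file name.
    static List<String> missingBucketFileNames(int bucketCount, Set<Integer> bucketsWithFiles, String queryId, String fileExtension)
    {
        List<String> missing = new ArrayList<>();
        for (int bucket = 0; bucket < bucketCount; bucket++) {
            if (!bucketsWithFiles.contains(bucket)) {
                // Illustrative naming only; the real code uses HiveWriterFactory's
                // computeTransactionalBucketedFilename/computeNonTransactionalBucketedFilename.
                missing.add("%06d_0_%s%s".formatted(bucket, queryId, fileExtension));
            }
        }
        return missing;
    }

    public static void main(String[] args)
    {
        // Buckets 0 and 2 already have data files, so names are produced for buckets 1 and 3.
        System.out.println(missingBucketFileNames(4, Set.of(0, 2), "20230814_queryid", ".gz"));
    }
}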

@@ -13,7 +13,6 @@
*/
package io.trino.plugin.hive;

import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import io.airlift.concurrent.BoundedExecutor;
import io.airlift.json.JsonCodec;
@@ -57,7 +56,6 @@ public class HiveMetadataFactory
private final boolean hideDeltaLakeTables;
private final long perTransactionCacheMaximumSize;
private final HiveMetastoreFactory metastoreFactory;
private final Set<HiveFileWriterFactory> fileWriterFactories;
private final TrinoFileSystemFactory fileSystemFactory;
private final HdfsEnvironment hdfsEnvironment;
private final HivePartitionManager partitionManager;
@@ -88,7 +86,6 @@ public HiveMetadataFactory(
HiveConfig hiveConfig,
HiveMetastoreConfig hiveMetastoreConfig,
HiveMetastoreFactory metastoreFactory,
Set<HiveFileWriterFactory> fileWriterFactories,
TrinoFileSystemFactory fileSystemFactory,
HdfsEnvironment hdfsEnvironment,
HivePartitionManager partitionManager,
@@ -111,7 +108,6 @@
this(
catalogName,
metastoreFactory,
fileWriterFactories,
fileSystemFactory,
hdfsEnvironment,
partitionManager,
@@ -150,7 +146,6 @@ public HiveMetadataFactory(
public HiveMetadataFactory(
CatalogName catalogName,
HiveMetastoreFactory metastoreFactory,
Set<HiveFileWriterFactory> fileWriterFactories,
TrinoFileSystemFactory fileSystemFactory,
HdfsEnvironment hdfsEnvironment,
HivePartitionManager partitionManager,
@@ -197,7 +192,6 @@ public HiveMetadataFactory(
this.perTransactionCacheMaximumSize = perTransactionCacheMaximumSize;

this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null");
this.fileWriterFactories = ImmutableSet.copyOf(requireNonNull(fileWriterFactories, "fileWriterFactories is null"));
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.partitionManager = requireNonNull(partitionManager, "partitionManager is null");
@@ -254,7 +248,6 @@ public TransactionalMetadata create(ConnectorIdentity identity, boolean autoComm
catalogName,
metastore,
autoCommit,
fileWriterFactories,
fileSystemFactory,
hdfsEnvironment,
partitionManager,
@@ -834,7 +834,6 @@ protected final void setup(String databaseName, HiveConfig hiveConfig, HiveMetas
metadataFactory = new HiveMetadataFactory(
new CatalogName("hive"),
HiveMetastoreFactory.ofInstance(metastoreClient),
getDefaultHiveFileWriterFactories(hiveConfig, hdfsEnvironment),
new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS),
hdfsEnvironment,
partitionManager,
@@ -213,7 +213,6 @@ protected void setup(String host, int port, String databaseName, boolean s3Selec
config,
new HiveMetastoreConfig(),
HiveMetastoreFactory.ofInstance(metastoreClient),
getDefaultHiveFileWriterFactories(config, hdfsEnvironment),
new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS),
hdfsEnvironment,
hivePartitionManager,