diff --git a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/S3SelectTestHelper.java b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/S3SelectTestHelper.java
index 00a8d48dbe0f..a6ef12f7c602 100644
--- a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/S3SelectTestHelper.java
+++ b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/S3SelectTestHelper.java
@@ -75,7 +75,6 @@
 import static io.trino.plugin.hive.HiveFileSystemTestUtils.filterTable;
 import static io.trino.plugin.hive.HiveFileSystemTestUtils.getSplitsCount;
 import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS;
-import static io.trino.plugin.hive.HiveTestUtils.getDefaultHiveFileWriterFactories;
 import static io.trino.plugin.hive.HiveTestUtils.getDefaultHivePageSourceFactories;
 import static io.trino.plugin.hive.HiveTestUtils.getDefaultHiveRecordCursorProviders;
 import static io.trino.plugin.hive.TestingThriftHiveMetastoreBuilder.testingThriftHiveMetastoreBuilder;
@@ -147,7 +146,6 @@ public S3SelectTestHelper(String host,
                 this.hiveConfig,
                 new HiveMetastoreConfig(),
                 HiveMetastoreFactory.ofInstance(metastoreClient),
-                getDefaultHiveFileWriterFactories(hiveConfig, hdfsEnvironment),
                 new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS),
                 hdfsEnvironment,
                 hivePartitionManager,
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
index 943b40d8be29..9783b9f87fac 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
@@ -32,6 +32,7 @@
 import io.trino.filesystem.Location;
 import io.trino.filesystem.TrinoFileSystem;
 import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.filesystem.TrinoOutputFile;
 import io.trino.hdfs.HdfsContext;
 import io.trino.hdfs.HdfsEnvironment;
 import io.trino.plugin.base.CatalogName;
@@ -58,6 +59,7 @@
 import io.trino.plugin.hive.statistics.HiveStatisticsProvider;
 import io.trino.plugin.hive.util.HiveBucketing;
 import io.trino.plugin.hive.util.HiveUtil;
+import io.trino.plugin.hive.util.HiveWriteUtils;
 import io.trino.plugin.hive.util.SerdeConstants;
 import io.trino.spi.ErrorType;
 import io.trino.spi.Page;
@@ -126,6 +128,8 @@
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaParseException;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.mapred.JobConf;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -158,6 +162,8 @@
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static com.google.common.collect.Iterables.concat;
 import static com.google.common.collect.Iterables.getOnlyElement;
+import static io.trino.filesystem.hdfs.HadoopPaths.hadoopPath;
+import static io.trino.hdfs.ConfigurationUtils.toJobConf;
 import static io.trino.plugin.base.projection.ApplyProjectionUtil.extractSupportedProjectedColumns;
 import static io.trino.plugin.base.projection.ApplyProjectionUtil.replaceWithNewVariables;
 import static io.trino.plugin.hive.HiveAnalyzeProperties.getColumnNames;
@@ -170,6 +176,7 @@
 import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.SYNTHESIZED;
 import static io.trino.plugin.hive.HiveColumnHandle.createBaseColumn;
 import static io.trino.plugin.hive.HiveColumnHandle.mergeRowIdColumnHandle;
+import static io.trino.plugin.hive.HiveCompressionCodecs.selectCompressionCodec;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_COLUMN_ORDER_MISMATCH;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_CONCURRENT_MODIFICATION_DETECTED;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
@@ -178,6 +185,7 @@
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_VIEW_TRANSLATION_ERROR;
+import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR;
 import static io.trino.plugin.hive.HivePartitionManager.extractPartitionValues;
 import static io.trino.plugin.hive.HiveSessionProperties.NON_TRANSACTIONAL_OPTIMIZE_ENABLED;
 import static io.trino.plugin.hive.HiveSessionProperties.getDeltaLakeCatalogName;
@@ -275,8 +283,11 @@
 import static io.trino.plugin.hive.util.AcidTables.isFullAcidTable;
 import static io.trino.plugin.hive.util.AcidTables.isTransactionalTable;
 import static io.trino.plugin.hive.util.AcidTables.writeAcidVersionFile;
+import static io.trino.plugin.hive.util.CompressionConfigUtil.configureCompression;
 import static io.trino.plugin.hive.util.HiveBucketing.getHiveBucketHandle;
 import static io.trino.plugin.hive.util.HiveBucketing.isSupportedBucketing;
+import static io.trino.plugin.hive.util.HiveClassNames.HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS;
+import static io.trino.plugin.hive.util.HiveClassNames.ORC_OUTPUT_FORMAT_CLASS;
 import static io.trino.plugin.hive.util.HiveUtil.columnMetadataGetter;
 import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeyColumnHandles;
 import static io.trino.plugin.hive.util.HiveUtil.getRegularColumnHandles;
@@ -291,6 +302,7 @@
 import static io.trino.plugin.hive.util.HiveUtil.verifyPartitionTypeSupported;
 import static io.trino.plugin.hive.util.HiveWriteUtils.checkTableIsWritable;
 import static io.trino.plugin.hive.util.HiveWriteUtils.createPartitionValues;
+import static io.trino.plugin.hive.util.HiveWriteUtils.initializeSerializer;
 import static io.trino.plugin.hive.util.HiveWriteUtils.isFileCreatedByQuery;
 import static io.trino.plugin.hive.util.HiveWriteUtils.isWritableType;
 import static io.trino.plugin.hive.util.RetryDriver.retry;
@@ -363,7 +375,6 @@ public class HiveMetadata
     private final CatalogName catalogName;
     private final SemiTransactionalHiveMetastore metastore;
     private final boolean autoCommit;
-    private final Set<HiveFileWriterFactory> fileWriterFactories;
     private final TrinoFileSystemFactory fileSystemFactory;
     private final HdfsEnvironment hdfsEnvironment;
     private final HivePartitionManager partitionManager;
@@ -392,7 +403,6 @@ public HiveMetadata(
             CatalogName catalogName,
             SemiTransactionalHiveMetastore metastore,
             boolean autoCommit,
-            Set<HiveFileWriterFactory> fileWriterFactories,
             TrinoFileSystemFactory fileSystemFactory,
             HdfsEnvironment hdfsEnvironment,
             HivePartitionManager partitionManager,
@@ -420,7 +430,6 @@ public HiveMetadata(
         this.catalogName = requireNonNull(catalogName, "catalogName is null");
         this.metastore = requireNonNull(metastore, "metastore is null");
         this.autoCommit = autoCommit;
-        this.fileWriterFactories = ImmutableSet.copyOf(requireNonNull(fileWriterFactories, "fileWriterFactories is null"));
         this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
         this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
         this.partitionManager = requireNonNull(partitionManager, "partitionManager is null");
@@ -1739,13 +1748,12 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
         partitionUpdates = PartitionUpdate.mergePartitionUpdates(partitionUpdates);
 
         if (handle.getBucketProperty().isPresent() && isCreateEmptyBucketFiles(session)) {
-            List<PartitionUpdate> partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, true, partitionUpdates);
+            List<PartitionUpdate> partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, table, true, partitionUpdates);
             // replace partitionUpdates before creating the empty files so that those files will be cleaned up if we end up rollback
             partitionUpdates = PartitionUpdate.mergePartitionUpdates(concat(partitionUpdates, partitionUpdatesForMissingBuckets));
             for (PartitionUpdate partitionUpdate : partitionUpdatesForMissingBuckets) {
                 Optional<Partition> partition = table.getPartitionColumns().isEmpty() ? Optional.empty() : Optional.of(buildPartitionObject(session, table, partitionUpdate));
-                Location writePath = Location.of(partitionUpdate.getWritePath().toString());
-                createEmptyFiles(session, writePath, table, partition, partitionUpdate.getFileNames());
+                createEmptyFiles(session, hadoopPath(partitionUpdate.getWritePath()), table, partition, partitionUpdate.getFileNames());
             }
             if (handle.isTransactional()) {
                 AcidTransaction transaction = handle.getTransaction();
@@ -1824,15 +1832,19 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
 
     private List<PartitionUpdate> computePartitionUpdatesForMissingBuckets(
             ConnectorSession session,
             HiveWritableTableHandle handle,
+            Table table,
             boolean isCreateTable,
             List<PartitionUpdate> partitionUpdates)
     {
         ImmutableList.Builder<PartitionUpdate> partitionUpdatesForMissingBucketsBuilder = ImmutableList.builder();
+        HiveStorageFormat storageFormat = table.getPartitionColumns().isEmpty() ? handle.getTableStorageFormat() : handle.getPartitionStorageFormat();
         for (PartitionUpdate partitionUpdate : partitionUpdates) {
             int bucketCount = handle.getBucketProperty().get().getBucketCount();
             List<String> fileNamesForMissingBuckets = computeFileNamesForMissingBuckets(
                     session,
+                    storageFormat,
+                    hadoopPath(partitionUpdate.getTargetPath()),
                     bucketCount,
                     isCreateTable && handle.isTransactional(),
                     partitionUpdate);
@@ -1851,6 +1863,8 @@ private List<PartitionUpdate> computePartitionUpdatesForMissingBuckets(
 
     private List<String> computeFileNamesForMissingBuckets(
             ConnectorSession session,
+            HiveStorageFormat storageFormat,
+            Path targetPath,
             int bucketCount,
             boolean transactionalCreateTable,
             PartitionUpdate partitionUpdate)
@@ -1859,7 +1873,10 @@ private List<String> computeFileNamesForMissingBuckets(
             // fast path for common case
             return ImmutableList.of();
         }
-
+        HdfsContext hdfsContext = new HdfsContext(session);
+        JobConf conf = toJobConf(hdfsEnvironment.getConfiguration(hdfsContext, targetPath));
+        configureCompression(conf, selectCompressionCodec(session, storageFormat));
+        String fileExtension = HiveWriterFactory.getFileExtension(conf, fromHiveStorageFormat(storageFormat));
         Set<String> fileNames = ImmutableSet.copyOf(partitionUpdate.getFileNames());
         Set<Integer> bucketsWithFiles = fileNames.stream()
                 .map(HiveWriterFactory::getBucketFromFileName)
@@ -1870,16 +1887,21 @@ private List<String> computeFileNamesForMissingBuckets(
             if (bucketsWithFiles.contains(i)) {
                 continue;
             }
-            missingFileNamesBuilder.add(transactionalCreateTable
-                    ? computeTransactionalBucketedFilename(i)
-                    : computeNonTransactionalBucketedFilename(session.getQueryId(), i));
+            String fileName;
+            if (transactionalCreateTable) {
+                fileName = computeTransactionalBucketedFilename(i) + fileExtension;
+            }
+            else {
+                fileName = computeNonTransactionalBucketedFilename(session.getQueryId(), i) + fileExtension;
+            }
+            missingFileNamesBuilder.add(fileName);
         }
         List<String> missingFileNames = missingFileNamesBuilder.build();
         verify(fileNames.size() + missingFileNames.size() == bucketCount);
         return missingFileNames;
     }
 
-    private void createEmptyFiles(ConnectorSession session, Location path, Table table, Optional<Partition> partition, List<String> fileNames)
+    private void createEmptyFiles(ConnectorSession session, Path path, Table table, Optional<Partition> partition, List<String> fileNames)
     {
         Properties schema;
         StorageFormat format;
@@ -1892,24 +1914,48 @@ private void createEmptyFiles(ConnectorSession session, Location path, Table tab
             format = table.getStorage().getStorageFormat();
         }
 
-        for (String fileName : fileNames) {
-            Location location = path.appendPath(fileName);
-            fileWriterFactories.stream()
-                    .map(factory -> factory.createFileWriter(
-                            location,
-                            ImmutableList.of(),
-                            format,
-                            HiveCompressionCodec.NONE,
-                            schema,
-                            session,
-                            OptionalInt.empty(),
-                            NO_ACID_TRANSACTION,
-                            false,
-                            WriterKind.INSERT))
-                    .flatMap(Optional::stream)
-                    .findFirst()
-                    .orElseThrow(() -> new TrinoException(HIVE_UNSUPPORTED_FORMAT, "Writing not supported for " + format))
-                    .commit();
+        HiveCompressionCodec compression = selectCompressionCodec(session, format);
+        if (format.getOutputFormat().equals(ORC_OUTPUT_FORMAT_CLASS) && (compression == HiveCompressionCodec.ZSTD)) {
+            compression = HiveCompressionCodec.GZIP; // ZSTD not supported by Hive ORC writer
+        }
+        JobConf conf = toJobConf(hdfsEnvironment.getConfiguration(new HdfsContext(session), path));
+        configureCompression(conf, compression);
+
+        // for simple line-oriented formats, just create an empty file directly
+        if (format.getOutputFormat().equals(HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS)) {
+            TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity());
+            for (String fileName : fileNames) {
+                TrinoOutputFile trinoOutputFile = fileSystem.newOutputFile(Location.of(path.toString()).appendPath(fileName));
+                try {
+                    // create empty file
+                    trinoOutputFile.create().close();
+                }
+                catch (IOException e) {
+                    throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error write empty file to Hive", e);
+                }
+            }
+            return;
+        }
+
+        hdfsEnvironment.doAs(session.getIdentity(), () -> {
+            for (String fileName : fileNames) {
+                writeEmptyFile(session, new Path(path, fileName), conf, schema, format.getSerde(), format.getOutputFormat());
+            }
+        });
+    }
+
+    private static void writeEmptyFile(ConnectorSession session, Path target, JobConf conf, Properties properties, String serde, String outputFormatName)
+    {
+        // Some serializers such as Avro set a property in the schema.
+        initializeSerializer(conf, properties, serde);
+
+        // The code below is not a try with resources because RecordWriter is not Closeable.
+        FileSinkOperator.RecordWriter recordWriter = HiveWriteUtils.createRecordWriter(target, conf, properties, outputFormatName, session);
+        try {
+            recordWriter.close(false);
+        }
+        catch (IOException e) {
+            throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error write empty file to Hive", e);
         }
     }
 
@@ -2097,7 +2143,7 @@ private Table finishChangingTable(AcidOperation acidOperation, String changeDesc
         }
 
         if (handle.getBucketProperty().isPresent() && isCreateEmptyBucketFiles(session)) {
-            List<PartitionUpdate> partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, false, partitionUpdates);
+            List<PartitionUpdate> partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, table, false, partitionUpdates);
             // replace partitionUpdates before creating the empty files so that those files will be cleaned up if we end up rollback
             partitionUpdates = PartitionUpdate.mergePartitionUpdates(concat(partitionUpdates, partitionUpdatesForMissingBuckets));
             for (PartitionUpdate partitionUpdate : partitionUpdatesForMissingBuckets) {
@@ -2114,8 +2160,7 @@ private Table finishChangingTable(AcidOperation acidOperation, String changeDesc
                             statistics,
                             handle.isRetriesEnabled());
                 }
-                Location writePath = Location.of(partitionUpdate.getWritePath().toString());
-                createEmptyFiles(session, writePath, table, partition, partitionUpdate.getFileNames());
+                createEmptyFiles(session, hadoopPath(partitionUpdate.getWritePath()), table, partition, partitionUpdate.getFileNames());
             }
         }
 
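Reviewer sketch (not part of the patch): with fileWriterFactories removed from HiveMetadata, an empty bucket file is produced by configuring compression on a JobConf, deriving the bucket file name plus extension, and opening and immediately closing a Hadoop RecordWriter so the output format still emits a structurally valid file. The snippet below only stitches together calls that appear in this diff; the helper name createEmptyBucketFile is hypothetical, and it assumes it lives inside HiveMetadata with the same fields and static imports as above.

    // Hypothetical helper, for illustration only; mirrors computeFileNamesForMissingBuckets()
    // and writeEmptyFile() in the hunks above.
    private void createEmptyBucketFile(ConnectorSession session, Path targetPath, Properties schema, HiveStorageFormat storageFormat, int bucket)
    {
        // Compression must be set on the JobConf first; for compressed text formats the
        // file extension derived below carries the codec suffix.
        JobConf conf = toJobConf(hdfsEnvironment.getConfiguration(new HdfsContext(session), targetPath));
        configureCompression(conf, selectCompressionCodec(session, storageFormat));

        StorageFormat format = fromHiveStorageFormat(storageFormat);
        String fileName = computeNonTransactionalBucketedFilename(session.getQueryId(), bucket)
                + HiveWriterFactory.getFileExtension(conf, format);

        // Some serializers such as Avro set a property in the schema.
        initializeSerializer(conf, schema, format.getSerde());

        // Opening and immediately closing the RecordWriter makes the Hive output format write
        // a zero-row file that still has the headers/footers readers expect.
        FileSinkOperator.RecordWriter recordWriter = HiveWriteUtils.createRecordWriter(
                new Path(targetPath, fileName), conf, schema, format.getOutputFormat(), session);
        try {
            recordWriter.close(false);
        }
        catch (IOException e) {
            throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error write empty file to Hive", e);
        }
    }
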
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java
index 991fdd36310b..4419261eb5e7 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java
@@ -13,7 +13,6 @@
  */
 package io.trino.plugin.hive;
 
-import com.google.common.collect.ImmutableSet;
 import com.google.inject.Inject;
 import io.airlift.concurrent.BoundedExecutor;
 import io.airlift.json.JsonCodec;
@@ -57,7 +56,6 @@ public class HiveMetadataFactory
     private final boolean hideDeltaLakeTables;
     private final long perTransactionCacheMaximumSize;
     private final HiveMetastoreFactory metastoreFactory;
-    private final Set<HiveFileWriterFactory> fileWriterFactories;
     private final TrinoFileSystemFactory fileSystemFactory;
     private final HdfsEnvironment hdfsEnvironment;
     private final HivePartitionManager partitionManager;
@@ -88,7 +86,6 @@ public HiveMetadataFactory(
             HiveConfig hiveConfig,
             HiveMetastoreConfig hiveMetastoreConfig,
             HiveMetastoreFactory metastoreFactory,
-            Set<HiveFileWriterFactory> fileWriterFactories,
             TrinoFileSystemFactory fileSystemFactory,
             HdfsEnvironment hdfsEnvironment,
             HivePartitionManager partitionManager,
@@ -111,7 +108,6 @@ public HiveMetadataFactory(
         this(
                 catalogName,
                 metastoreFactory,
-                fileWriterFactories,
                 fileSystemFactory,
                 hdfsEnvironment,
                 partitionManager,
@@ -150,7 +146,6 @@ public HiveMetadataFactory(
     public HiveMetadataFactory(
             CatalogName catalogName,
             HiveMetastoreFactory metastoreFactory,
-            Set<HiveFileWriterFactory> fileWriterFactories,
             TrinoFileSystemFactory fileSystemFactory,
             HdfsEnvironment hdfsEnvironment,
             HivePartitionManager partitionManager,
@@ -197,7 +192,6 @@ public HiveMetadataFactory(
 
         this.perTransactionCacheMaximumSize = perTransactionCacheMaximumSize;
         this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null");
-        this.fileWriterFactories = ImmutableSet.copyOf(requireNonNull(fileWriterFactories, "fileWriterFactories is null"));
         this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
         this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
         this.partitionManager = requireNonNull(partitionManager, "partitionManager is null");
@@ -254,7 +248,6 @@ public TransactionalMetadata create(ConnectorIdentity identity, boolean autoComm
                 catalogName,
                 metastore,
                 autoCommit,
-                fileWriterFactories,
                 fileSystemFactory,
                 hdfsEnvironment,
                 partitionManager,
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java
index c90eed86a59c..7855fb66848d 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java
@@ -834,7 +834,6 @@ protected final void setup(String databaseName, HiveConfig hiveConfig, HiveMetas
         metadataFactory = new HiveMetadataFactory(
                 new CatalogName("hive"),
                 HiveMetastoreFactory.ofInstance(metastoreClient),
-                getDefaultHiveFileWriterFactories(hiveConfig, hdfsEnvironment),
                 new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS),
                 hdfsEnvironment,
                 partitionManager,
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java
index e22c4e290c3d..01d42a2ceeda 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java
@@ -213,7 +213,6 @@ protected void setup(String host, int port, String databaseName, boolean s3Selec
                 config,
                 new HiveMetastoreConfig(),
                 HiveMetastoreFactory.ofInstance(metastoreClient),
-                getDefaultHiveFileWriterFactories(config, hdfsEnvironment),
                 new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS),
                 hdfsEnvironment,
                 hivePartitionManager,
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java
index 9a436a92b8f0..978579d5f158 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java
@@ -2135,7 +2135,21 @@ public void testEmptyBucketedTable()
 
     private void testEmptyBucketedTable(Session session, HiveStorageFormat storageFormat)
     {
-        testEmptyBucketedTable(session, storageFormat, true);
+        for (HiveCompressionCodec compressionCodec : HiveCompressionCodec.values()) {
+            if ((storageFormat == HiveStorageFormat.AVRO) && (compressionCodec == HiveCompressionCodec.LZ4)) {
+                continue;
+            }
+            if ((storageFormat == HiveStorageFormat.PARQUET) && (compressionCodec == HiveCompressionCodec.LZ4)) {
+                // TODO (https://github.com/trinodb/trino/issues/9142) Support LZ4 compression with native Parquet writer
+                continue;
+            }
+            testEmptyBucketedTable(
+                    Session.builder(session)
+                            .setCatalogSessionProperty(session.getCatalog().orElseThrow(), "compression_codec", compressionCodec.name())
+                            .build(),
+                    storageFormat,
+                    true);
+        }
         testEmptyBucketedTable(session, storageFormat, false);
     }
 
@@ -8808,7 +8822,25 @@ private List<TestingHiveStorageFormat> getAllTestingHiveStorageFormat()
                 // REGEX format is read-only
                 continue;
             }
-            formats.add(new TestingHiveStorageFormat(getSession(), hiveStorageFormat));
+
+            Session defaultSession = getSession();
+            String catalogName = defaultSession.getCatalog().orElseThrow();
+            for (boolean enabled : List.of(true, false)) {
+                Session session = Session.builder(defaultSession)
+                        .setCatalogSessionProperty(catalogName, "avro_native_reader_enabled", Boolean.toString(enabled))
+                        .setCatalogSessionProperty(catalogName, "avro_native_writer_enabled", Boolean.toString(enabled))
+                        .setCatalogSessionProperty(catalogName, "csv_native_reader_enabled", Boolean.toString(enabled))
+                        .setCatalogSessionProperty(catalogName, "csv_native_writer_enabled", Boolean.toString(enabled))
+                        .setCatalogSessionProperty(catalogName, "json_native_reader_enabled", Boolean.toString(enabled))
+                        .setCatalogSessionProperty(catalogName, "json_native_writer_enabled", Boolean.toString(enabled))
+                        .setCatalogSessionProperty(catalogName, "regex_native_reader_enabled", Boolean.toString(enabled))
+                        .setCatalogSessionProperty(catalogName, "text_file_native_reader_enabled", Boolean.toString(enabled))
+                        .setCatalogSessionProperty(catalogName, "text_file_native_writer_enabled", Boolean.toString(enabled))
+                        .setCatalogSessionProperty(catalogName, "sequence_file_native_reader_enabled", Boolean.toString(enabled))
+                        .setCatalogSessionProperty(catalogName, "sequence_file_native_writer_enabled", Boolean.toString(enabled))
+                        .build();
+                formats.add(new TestingHiveStorageFormat(session, hiveStorageFormat));
+            }
         }
         return formats.build();
     }
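Reviewer note (not part of the patch): both test hunks repeat the same pattern of cloning the base Session and overriding catalog session properties, so every storage format gets exercised across the compression-codec and native reader/writer matrix. A hedged sketch of a helper that would factor out that pattern (the name sessionWithCatalogProperties is hypothetical; it assumes java.util.Map is imported):

    // Hypothetical helper, shown only to illustrate the Session-per-variant pattern used above.
    private static Session sessionWithCatalogProperties(Session base, Map<String, String> properties)
    {
        String catalog = base.getCatalog().orElseThrow();
        Session.SessionBuilder builder = Session.builder(base);
        // Apply each catalog session property, e.g. "compression_codec" or "csv_native_reader_enabled".
        properties.forEach((name, value) -> builder.setCatalogSessionProperty(catalog, name, value));
        return builder.build();
    }

With such a helper, testEmptyBucketedTable() would call it once per HiveCompressionCodec value and getAllTestingHiveStorageFormat() once per native-reader/writer toggle, instead of repeating the builder chain inline.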