diff --git a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/S3SelectTestHelper.java b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/S3SelectTestHelper.java
index a6ef12f7c602..00a8d48dbe0f 100644
--- a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/S3SelectTestHelper.java
+++ b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/S3SelectTestHelper.java
@@ -75,6 +75,7 @@
 import static io.trino.plugin.hive.HiveFileSystemTestUtils.filterTable;
 import static io.trino.plugin.hive.HiveFileSystemTestUtils.getSplitsCount;
 import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS;
+import static io.trino.plugin.hive.HiveTestUtils.getDefaultHiveFileWriterFactories;
 import static io.trino.plugin.hive.HiveTestUtils.getDefaultHivePageSourceFactories;
 import static io.trino.plugin.hive.HiveTestUtils.getDefaultHiveRecordCursorProviders;
 import static io.trino.plugin.hive.TestingThriftHiveMetastoreBuilder.testingThriftHiveMetastoreBuilder;
@@ -146,6 +147,7 @@ public S3SelectTestHelper(String host,
                 this.hiveConfig,
                 new HiveMetastoreConfig(),
                 HiveMetastoreFactory.ofInstance(metastoreClient),
+                getDefaultHiveFileWriterFactories(hiveConfig, hdfsEnvironment),
                 new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS),
                 hdfsEnvironment,
                 hivePartitionManager,
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
index 6475f3e9cfe5..c4aaf8c97f16 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
@@ -29,9 +29,7 @@
 import io.airlift.slice.Slice;
 import io.airlift.units.DataSize;
 import io.trino.filesystem.Location;
-import io.trino.filesystem.TrinoFileSystem;
 import io.trino.filesystem.TrinoFileSystemFactory;
-import io.trino.filesystem.TrinoOutputFile;
 import io.trino.hdfs.HdfsContext;
 import io.trino.hdfs.HdfsEnvironment;
 import io.trino.plugin.base.CatalogName;
@@ -58,7 +56,6 @@
 import io.trino.plugin.hive.statistics.HiveStatisticsProvider;
 import io.trino.plugin.hive.util.HiveBucketing;
 import io.trino.plugin.hive.util.HiveUtil;
-import io.trino.plugin.hive.util.HiveWriteUtils;
 import io.trino.plugin.hive.util.SerdeConstants;
 import io.trino.spi.ErrorType;
 import io.trino.spi.Page;
@@ -130,8 +127,6 @@
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.mapred.JobConf;

 import java.io.File;
 import java.io.IOException;
@@ -166,7 +161,6 @@
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static com.google.common.collect.Iterables.concat;
 import static com.google.common.collect.Iterables.getOnlyElement;
-import static io.trino.hdfs.ConfigurationUtils.toJobConf;
 import static io.trino.plugin.base.projection.ApplyProjectionUtil.extractSupportedProjectedColumns;
 import static io.trino.plugin.base.projection.ApplyProjectionUtil.replaceWithNewVariables;
 import static io.trino.plugin.hive.HiveAnalyzeProperties.getColumnNames;
@@ -179,7 +173,6 @@
 import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.SYNTHESIZED;
 import static io.trino.plugin.hive.HiveColumnHandle.createBaseColumn;
 import static io.trino.plugin.hive.HiveColumnHandle.mergeRowIdColumnHandle;
-import static io.trino.plugin.hive.HiveCompressionCodecs.selectCompressionCodec;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_COLUMN_ORDER_MISMATCH;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_CONCURRENT_MODIFICATION_DETECTED;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
@@ -188,7 +181,6 @@
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_VIEW_TRANSLATION_ERROR;
-import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR;
 import static io.trino.plugin.hive.HivePartitionManager.extractPartitionValues;
 import static io.trino.plugin.hive.HiveSessionProperties.NON_TRANSACTIONAL_OPTIMIZE_ENABLED;
 import static io.trino.plugin.hive.HiveSessionProperties.getDeltaLakeCatalogName;
@@ -284,11 +276,8 @@
 import static io.trino.plugin.hive.util.AcidTables.isFullAcidTable;
 import static io.trino.plugin.hive.util.AcidTables.isTransactionalTable;
 import static io.trino.plugin.hive.util.AcidTables.writeAcidVersionFile;
-import static io.trino.plugin.hive.util.CompressionConfigUtil.configureCompression;
 import static io.trino.plugin.hive.util.HiveBucketing.getHiveBucketHandle;
 import static io.trino.plugin.hive.util.HiveBucketing.isSupportedBucketing;
-import static io.trino.plugin.hive.util.HiveClassNames.HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS;
-import static io.trino.plugin.hive.util.HiveClassNames.ORC_OUTPUT_FORMAT_CLASS;
 import static io.trino.plugin.hive.util.HiveUtil.columnMetadataGetter;
 import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeyColumnHandles;
 import static io.trino.plugin.hive.util.HiveUtil.getRegularColumnHandles;
@@ -304,7 +293,6 @@
 import static io.trino.plugin.hive.util.HiveWriteUtils.checkTableIsWritable;
 import static io.trino.plugin.hive.util.HiveWriteUtils.checkedDelete;
 import static io.trino.plugin.hive.util.HiveWriteUtils.createPartitionValues;
-import static io.trino.plugin.hive.util.HiveWriteUtils.initializeSerializer;
 import static io.trino.plugin.hive.util.HiveWriteUtils.isFileCreatedByQuery;
 import static io.trino.plugin.hive.util.HiveWriteUtils.isS3FileSystem;
 import static io.trino.plugin.hive.util.HiveWriteUtils.isWritableType;
@@ -378,6 +366,7 @@ public class HiveMetadata
     private final CatalogName catalogName;
     private final SemiTransactionalHiveMetastore metastore;
     private final boolean autoCommit;
+    private final Set<HiveFileWriterFactory> fileWriterFactories;
     private final TrinoFileSystemFactory fileSystemFactory;
     private final HdfsEnvironment hdfsEnvironment;
     private final HivePartitionManager partitionManager;
@@ -406,6 +395,7 @@ public HiveMetadata(
             CatalogName catalogName,
             SemiTransactionalHiveMetastore metastore,
             boolean autoCommit,
+            Set<HiveFileWriterFactory> fileWriterFactories,
             TrinoFileSystemFactory fileSystemFactory,
             HdfsEnvironment hdfsEnvironment,
             HivePartitionManager partitionManager,
@@ -433,6 +423,7 @@ public HiveMetadata(
         this.catalogName = requireNonNull(catalogName, "catalogName is null");
         this.metastore = requireNonNull(metastore, "metastore is null");
         this.autoCommit = autoCommit;
+        this.fileWriterFactories = ImmutableSet.copyOf(requireNonNull(fileWriterFactories, "fileWriterFactories is null"));
        this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
         this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
         this.partitionManager = requireNonNull(partitionManager, "partitionManager is null");
@@ -1742,12 +1733,13 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
         partitionUpdates = PartitionUpdate.mergePartitionUpdates(partitionUpdates);

         if (handle.getBucketProperty().isPresent() && isCreateEmptyBucketFiles(session)) {
-            List<PartitionUpdate> partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, table, true, partitionUpdates);
+            List<PartitionUpdate> partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, true, partitionUpdates);
             // replace partitionUpdates before creating the empty files so that those files will be cleaned up if we end up rollback
             partitionUpdates = PartitionUpdate.mergePartitionUpdates(concat(partitionUpdates, partitionUpdatesForMissingBuckets));
             for (PartitionUpdate partitionUpdate : partitionUpdatesForMissingBuckets) {
                 Optional<Partition> partition = table.getPartitionColumns().isEmpty() ? Optional.empty() : Optional.of(buildPartitionObject(session, table, partitionUpdate));
-                createEmptyFiles(session, partitionUpdate.getWritePath(), table, partition, partitionUpdate.getFileNames());
+                Location writePath = Location.of(partitionUpdate.getWritePath().toString());
+                createEmptyFiles(session, writePath, table, partition, partitionUpdate.getFileNames());
             }
             if (handle.isTransactional()) {
                 AcidTransaction transaction = handle.getTransaction();
@@ -1826,19 +1818,15 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSess
     private List<PartitionUpdate> computePartitionUpdatesForMissingBuckets(
             ConnectorSession session,
             HiveWritableTableHandle handle,
-            Table table,
             boolean isCreateTable,
             List<PartitionUpdate> partitionUpdates)
     {
         ImmutableList.Builder<PartitionUpdate> partitionUpdatesForMissingBucketsBuilder = ImmutableList.builder();
-        HiveStorageFormat storageFormat = table.getPartitionColumns().isEmpty() ? handle.getTableStorageFormat() : handle.getPartitionStorageFormat();
         for (PartitionUpdate partitionUpdate : partitionUpdates) {
             int bucketCount = handle.getBucketProperty().get().getBucketCount();
             List<String> fileNamesForMissingBuckets = computeFileNamesForMissingBuckets(
                     session,
-                    storageFormat,
-                    partitionUpdate.getTargetPath(),
                     bucketCount,
                     isCreateTable && handle.isTransactional(),
                     partitionUpdate);
@@ -1857,8 +1845,6 @@ private List<PartitionUpdate> computePartitionUpdatesForMissingBuckets(
     private List<String> computeFileNamesForMissingBuckets(
             ConnectorSession session,
-            HiveStorageFormat storageFormat,
-            Path targetPath,
             int bucketCount,
             boolean transactionalCreateTable,
             PartitionUpdate partitionUpdate)
@@ -1867,10 +1853,7 @@ private List<String> computeFileNamesForMissingBuckets(
             // fast path for common case
             return ImmutableList.of();
         }
-        HdfsContext hdfsContext = new HdfsContext(session);
-        JobConf conf = toJobConf(hdfsEnvironment.getConfiguration(hdfsContext, targetPath));
-        configureCompression(conf, selectCompressionCodec(session, storageFormat));
-        String fileExtension = HiveWriterFactory.getFileExtension(conf, fromHiveStorageFormat(storageFormat));
+
         Set<String> fileNames = ImmutableSet.copyOf(partitionUpdate.getFileNames());
         Set<Integer> bucketsWithFiles = fileNames.stream()
                 .map(HiveWriterFactory::getBucketFromFileName)
                 .collect(toImmutableSet());
@@ -1881,21 +1864,16 @@ private List<String> computeFileNamesForMissingBuckets(
             if (bucketsWithFiles.contains(i)) {
                 continue;
             }
-            String fileName;
-            if (transactionalCreateTable) {
-                fileName = computeTransactionalBucketedFilename(i) + fileExtension;
-            }
-            else {
-                fileName = computeNonTransactionalBucketedFilename(session.getQueryId(), i) + fileExtension;
-            }
-            missingFileNamesBuilder.add(fileName);
+            missingFileNamesBuilder.add(transactionalCreateTable
+                    ? computeTransactionalBucketedFilename(i)
+                    : computeNonTransactionalBucketedFilename(session.getQueryId(), i));
         }
         List<String> missingFileNames = missingFileNamesBuilder.build();
         verify(fileNames.size() + missingFileNames.size() == bucketCount);
         return missingFileNames;
     }

-    private void createEmptyFiles(ConnectorSession session, Path path, Table table, Optional<Partition> partition, List<String> fileNames)
+    private void createEmptyFiles(ConnectorSession session, Location path, Table table, Optional<Partition> partition, List<String> fileNames)
     {
         Properties schema;
         StorageFormat format;
@@ -1908,48 +1886,24 @@ private void createEmptyFiles(ConnectorSession session, Path path, Table table,
             format = table.getStorage().getStorageFormat();
         }
-        HiveCompressionCodec compression = selectCompressionCodec(session, format);
-        if (format.getOutputFormat().equals(ORC_OUTPUT_FORMAT_CLASS) && (compression == HiveCompressionCodec.ZSTD)) {
-            compression = HiveCompressionCodec.GZIP; // ZSTD not supported by Hive ORC writer
-        }
-        JobConf conf = toJobConf(hdfsEnvironment.getConfiguration(new HdfsContext(session), path));
-        configureCompression(conf, compression);
-
-        // for simple line-oriented formats, just create an empty file directly
-        if (format.getOutputFormat().equals(HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS)) {
-            TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity());
-            for (String fileName : fileNames) {
-                TrinoOutputFile trinoOutputFile = fileSystem.newOutputFile(Location.of(path.toString()).appendPath(fileName));
-                try {
-                    // create empty file
-                    trinoOutputFile.create().close();
-                }
-                catch (IOException e) {
-                    throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error write empty file to Hive", e);
-                }
-            }
-            return;
-        }
-
-        hdfsEnvironment.doAs(session.getIdentity(), () -> {
-            for (String fileName : fileNames) {
-                writeEmptyFile(session, new Path(path, fileName), conf, schema, format.getSerde(), format.getOutputFormat());
-            }
-        });
-    }
-
-    private static void writeEmptyFile(ConnectorSession session, Path target, JobConf conf, Properties properties, String serde, String outputFormatName)
-    {
-        // Some serializers such as Avro set a property in the schema.
-        initializeSerializer(conf, properties, serde);
-
-        // The code below is not a try with resources because RecordWriter is not Closeable.
-        FileSinkOperator.RecordWriter recordWriter = HiveWriteUtils.createRecordWriter(target, conf, properties, outputFormatName, session);
-        try {
-            recordWriter.close(false);
-        }
-        catch (IOException e) {
-            throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error write empty file to Hive", e);
+        for (String fileName : fileNames) {
+            Location location = path.appendPath(fileName);
+            fileWriterFactories.stream()
+                    .map(factory -> factory.createFileWriter(
+                            location,
+                            ImmutableList.of(),
+                            format,
+                            HiveCompressionCodec.NONE,
+                            schema,
+                            session,
+                            OptionalInt.empty(),
+                            NO_ACID_TRANSACTION,
+                            false,
+                            WriterKind.INSERT))
+                    .flatMap(Optional::stream)
+                    .findFirst()
+                    .orElseThrow(() -> new TrinoException(HIVE_UNSUPPORTED_FORMAT, "Writing not supported for " + format))
+                    .commit();
         }
     }
@@ -2137,7 +2091,7 @@ private Table finishChangingTable(AcidOperation acidOperation, String changeDesc
         }
         if (handle.getBucketProperty().isPresent() && isCreateEmptyBucketFiles(session)) {
-            List<PartitionUpdate> partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, table, false, partitionUpdates);
+            List<PartitionUpdate> partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, false, partitionUpdates);
             // replace partitionUpdates before creating the empty files so that those files will be cleaned up if we end up rollback
             partitionUpdates = PartitionUpdate.mergePartitionUpdates(concat(partitionUpdates, partitionUpdatesForMissingBuckets));
             for (PartitionUpdate partitionUpdate : partitionUpdatesForMissingBuckets) {
@@ -2154,7 +2108,8 @@ private Table finishChangingTable(AcidOperation acidOperation, String changeDesc
                             statistics,
                             handle.isRetriesEnabled());
                 }
-                createEmptyFiles(session, partitionUpdate.getWritePath(), table, partition, partitionUpdate.getFileNames());
+                Location writePath = Location.of(partitionUpdate.getWritePath().toString());
+                createEmptyFiles(session, writePath, table, partition, partitionUpdate.getFileNames());
             }
         }
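
Note on the HiveMetadata change above: empty bucket files are no longer produced through Hadoop's `FileSinkOperator.RecordWriter` and a compression-configured `JobConf`; instead, `createEmptyFiles` offers the target location and storage format to every registered `HiveFileWriterFactory` and commits the first writer that is returned, without appending any pages. The sketch below is a minimal, self-contained illustration of that "first supporting factory wins" idiom; `Writer`, `WriterFactory`, and `EmptyFileCreator` are hypothetical stand-ins, not Trino's real interfaces.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-ins for HiveFileWriterFactory and its writers, used only to
// illustrate the selection pattern in createEmptyFiles above.
interface Writer
{
    void commit();
}

interface WriterFactory
{
    // Returns a writer if this factory supports the given format, otherwise empty.
    Optional<Writer> createWriter(String format, String location);
}

class EmptyFileCreator
{
    static void createEmptyFile(List<WriterFactory> factories, String format, String location)
    {
        // Ask every factory; the first non-empty result wins, mirroring
        // fileWriterFactories.stream()...flatMap(Optional::stream).findFirst() in the diff.
        Writer writer = factories.stream()
                .map(factory -> factory.createWriter(format, location))
                .flatMap(Optional::stream)
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Writing not supported for " + format));
        // Committing without writing any data leaves an empty, format-valid file behind.
        writer.commit();
    }
}
```

Because the writer factories own format and compression handling, the empty-file path no longer needs the ORC/ZSTD workaround or the special case for line-oriented formats that the removed code carried.
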
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java
index 4419261eb5e7..991fdd36310b 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java
@@ -13,6 +13,7 @@
  */
 package io.trino.plugin.hive;

+import com.google.common.collect.ImmutableSet;
 import com.google.inject.Inject;
 import io.airlift.concurrent.BoundedExecutor;
 import io.airlift.json.JsonCodec;
@@ -56,6 +57,7 @@ public class HiveMetadataFactory
     private final boolean hideDeltaLakeTables;
     private final long perTransactionCacheMaximumSize;
     private final HiveMetastoreFactory metastoreFactory;
+    private final Set<HiveFileWriterFactory> fileWriterFactories;
     private final TrinoFileSystemFactory fileSystemFactory;
     private final HdfsEnvironment hdfsEnvironment;
     private final HivePartitionManager partitionManager;
@@ -86,6 +88,7 @@ public HiveMetadataFactory(
             HiveConfig hiveConfig,
             HiveMetastoreConfig hiveMetastoreConfig,
             HiveMetastoreFactory metastoreFactory,
+            Set<HiveFileWriterFactory> fileWriterFactories,
             TrinoFileSystemFactory fileSystemFactory,
             HdfsEnvironment hdfsEnvironment,
             HivePartitionManager partitionManager,
@@ -108,6 +111,7 @@ public HiveMetadataFactory(
         this(
                 catalogName,
                 metastoreFactory,
+                fileWriterFactories,
                 fileSystemFactory,
                 hdfsEnvironment,
                 partitionManager,
@@ -146,6 +150,7 @@ public HiveMetadataFactory(
     public HiveMetadataFactory(
             CatalogName catalogName,
             HiveMetastoreFactory metastoreFactory,
+            Set<HiveFileWriterFactory> fileWriterFactories,
             TrinoFileSystemFactory fileSystemFactory,
             HdfsEnvironment hdfsEnvironment,
             HivePartitionManager partitionManager,
@@ -192,6 +197,7 @@ public HiveMetadataFactory(
         this.perTransactionCacheMaximumSize = perTransactionCacheMaximumSize;
         this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null");
+        this.fileWriterFactories = ImmutableSet.copyOf(requireNonNull(fileWriterFactories, "fileWriterFactories is null"));
         this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
         this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
         this.partitionManager = requireNonNull(partitionManager, "partitionManager is null");
@@ -248,6 +254,7 @@ public TransactionalMetadata create(ConnectorIdentity identity, boolean autoComm
                 catalogName,
                 metastore,
                 autoCommit,
+                fileWriterFactories,
                 fileSystemFactory,
                 hdfsEnvironment,
                 partitionManager,
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java
index 6fdaaa4729a4..59cfe7403c11 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java
@@ -25,7 +25,7 @@
 import io.trino.hdfs.TrinoFileSystemCache;
 import io.trino.hdfs.TrinoFileSystemCacheStats;
 import io.trino.plugin.base.CatalogName;
-import io.trino.plugin.hive.avro.AvroHiveFileWriterFactory;
+import io.trino.plugin.hive.avro.AvroFileWriterFactory;
 import io.trino.plugin.hive.avro.AvroHivePageSourceFactory;
 import io.trino.plugin.hive.fs.CachingDirectoryLister;
 import io.trino.plugin.hive.fs.TransactionScopeCachingDirectoryListerFactory;
@@ -161,7 +161,7 @@ public void configure(Binder binder)
         fileWriterFactoryBinder.addBinding().to(SimpleSequenceFileWriterFactory.class).in(Scopes.SINGLETON);
         fileWriterFactoryBinder.addBinding().to(OrcFileWriterFactory.class).in(Scopes.SINGLETON);
         fileWriterFactoryBinder.addBinding().to(RcFileFileWriterFactory.class).in(Scopes.SINGLETON);
-        fileWriterFactoryBinder.addBinding().to(AvroHiveFileWriterFactory.class).in(Scopes.SINGLETON);
+        fileWriterFactoryBinder.addBinding().to(AvroFileWriterFactory.class).in(Scopes.SINGLETON);
         configBinder(binder).bindConfig(ParquetReaderConfig.class);
         configBinder(binder).bindConfig(ParquetWriterConfig.class);
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroHiveFileWriterFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroFileWriterFactory.java
similarity index 98%
rename from plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroHiveFileWriterFactory.java
rename to plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroFileWriterFactory.java
index b7a6347fc934..8b3d420862fe 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroHiveFileWriterFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroFileWriterFactory.java
@@ -55,7 +55,7 @@
 import static io.trino.plugin.hive.util.HiveUtil.getColumnTypes;
 import static java.util.Objects.requireNonNull;

-public class AvroHiveFileWriterFactory
+public class AvroFileWriterFactory
         implements HiveFileWriterFactory
 {
     private final TrinoFileSystemFactory fileSystemFactory;
@@ -63,7 +63,7 @@ public class AvroHiveFileWriterFactory
     private final NodeVersion nodeVersion;

     @Inject
-    public AvroHiveFileWriterFactory(
+    public AvroFileWriterFactory(
             TrinoFileSystemFactory fileSystemFactory,
             TypeManager typeManager,
             NodeVersion nodeVersion)
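
Note on where the injected `Set<HiveFileWriterFactory>` comes from: HiveModule registers each writer factory on a Guice set binder (the `fileWriterFactoryBinder` lines above), and HiveMetadataFactory now receives the whole set and forwards it to HiveMetadata. The sketch below illustrates that general multibinding pattern with hypothetical `FormatWriterFactory`/`AvroWriterFactory`/`OrcWriterFactory` classes; only the `Multibinder`/`Set` injection mechanics are meant to match the module above.

```java
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Scopes;
import com.google.inject.multibindings.Multibinder;

import java.util.Set;

// Hypothetical factory interface and implementations standing in for
// HiveFileWriterFactory, AvroFileWriterFactory, OrcFileWriterFactory, etc.
interface FormatWriterFactory {}

class AvroWriterFactory implements FormatWriterFactory {}

class OrcWriterFactory implements FormatWriterFactory {}

class WritersModule
        extends AbstractModule
{
    @Override
    protected void configure()
    {
        // Each addBinding() contributes one element to the injected Set<FormatWriterFactory>,
        // the same pattern HiveModule uses for its fileWriterFactoryBinder.
        Multibinder<FormatWriterFactory> factoryBinder = Multibinder.newSetBinder(binder(), FormatWriterFactory.class);
        factoryBinder.addBinding().to(AvroWriterFactory.class).in(Scopes.SINGLETON);
        factoryBinder.addBinding().to(OrcWriterFactory.class).in(Scopes.SINGLETON);
    }
}

class MetadataFactoryExample
{
    private final Set<FormatWriterFactory> fileWriterFactories;

    @Inject
    MetadataFactoryExample(Set<FormatWriterFactory> fileWriterFactories)
    {
        // Like HiveMetadataFactory after this change, the consumer simply receives the whole set.
        this.fileWriterFactories = fileWriterFactories;
    }

    int factoryCount()
    {
        return fileWriterFactories.size();
    }
}

class MultibindingDemo
{
    public static void main(String[] args)
    {
        Injector injector = Guice.createInjector(new WritersModule());
        // Prints 2: one element per addBinding() call in WritersModule.
        System.out.println(injector.getInstance(MetadataFactoryExample.class).factoryCount());
    }
}
```
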
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java
index 06cb66a91c2f..38fc963b1625 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java
@@ -833,6 +833,7 @@ protected final void setup(String databaseName, HiveConfig hiveConfig, HiveMetas
         metadataFactory = new HiveMetadataFactory(
                 new CatalogName("hive"),
                 HiveMetastoreFactory.ofInstance(metastoreClient),
+                getDefaultHiveFileWriterFactories(hiveConfig, hdfsEnvironment),
                 new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS),
                 hdfsEnvironment,
                 partitionManager,
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java
index 2d6011c376d4..14a0b70da56a 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java
@@ -211,6 +211,7 @@ protected void setup(String host, int port, String databaseName, boolean s3Selec
                 config,
                 new HiveMetastoreConfig(),
                 HiveMetastoreFactory.ofInstance(metastoreClient),
+                getDefaultHiveFileWriterFactories(config, hdfsEnvironment),
                 new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS),
                 hdfsEnvironment,
                 hivePartitionManager,
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java
index a1c6efef140a..4420eb0fc366 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java
@@ -2136,21 +2136,12 @@ public void testEmptyBucketedTable()
                 // REGEX format is readonly
                 continue;
             }
-            for (HiveCompressionCodec compressionCodec : HiveCompressionCodec.values()) {
-                if ((storageFormat == HiveStorageFormat.AVRO) && (compressionCodec == HiveCompressionCodec.LZ4)) {
-                    continue;
-                }
-                if ((storageFormat == HiveStorageFormat.PARQUET) && (compressionCodec == HiveCompressionCodec.LZ4)) {
-                    // TODO (https://github.com/trinodb/trino/issues/9142) Support LZ4 compression with native Parquet writer
-                    continue;
-                }
-                testEmptyBucketedTable(storageFormat, compressionCodec, true);
-            }
-            testEmptyBucketedTable(storageFormat, HiveCompressionCodec.GZIP, false);
+            testEmptyBucketedTable(storageFormat, true);
+            testEmptyBucketedTable(storageFormat, false);
         }
     }

-    private void testEmptyBucketedTable(HiveStorageFormat storageFormat, HiveCompressionCodec compressionCodec, boolean createEmpty)
+    private void testEmptyBucketedTable(HiveStorageFormat storageFormat, boolean createEmpty)
     {
         String tableName = "test_empty_bucketed_table";
@@ -2178,7 +2169,6 @@ private void testEmptyBucketedTable(HiveStorageFormat storageFormat, HiveCompres
         Session session = Session.builder(getSession())
                 .setSystemProperty("task_writer_count", "4")
                 .setCatalogSessionProperty(catalog, "create_empty_bucket_files", String.valueOf(createEmpty))
-                .setCatalogSessionProperty(catalog, "compression_codec", compressionCodec.name())
                 .build();
         assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('a0', 'b0', 'c0')", 1);
         assertUpdate(session, "INSERT INTO " + tableName + " VALUES ('a1', 'b1', 'c1')", 1);
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java
index cd1dc6f20123..0d32453ad86f 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java
@@ -24,7 +24,7 @@
 import io.trino.hive.formats.compression.CompressionKind;
 import io.trino.orc.OrcReaderOptions;
 import io.trino.orc.OrcWriterOptions;
-import io.trino.plugin.hive.avro.AvroHiveFileWriterFactory;
+import io.trino.plugin.hive.avro.AvroFileWriterFactory;
 import io.trino.plugin.hive.avro.AvroHivePageSourceFactory;
 import io.trino.plugin.hive.line.CsvFileWriterFactory;
 import io.trino.plugin.hive.line.CsvPageSourceFactory;
@@ -434,7 +434,7 @@ public void testAvro(int rowCount, long fileSizePadding)
                 .withColumns(getTestColumnsSupportedByAvro())
                 .withRowsCount(rowCount)
                 .withFileSizePadding(fileSizePadding)
-                .withFileWriterFactory(new AvroHiveFileWriterFactory(FILE_SYSTEM_FACTORY, TESTING_TYPE_MANAGER, new NodeVersion("test_version")))
+                .withFileWriterFactory(new AvroFileWriterFactory(FILE_SYSTEM_FACTORY, TESTING_TYPE_MANAGER, new NodeVersion("test_version")))
                 .isReadableByPageSource(new AvroHivePageSourceFactory(FILE_SYSTEM_FACTORY, STATS))
                 .isReadableByRecordCursor(createGenericHiveRecordCursorProvider(HDFS_ENVIRONMENT));
     }
@@ -599,7 +599,7 @@ public void testTruncateVarcharColumn()
         assertThatFileFormat(AVRO)
                 .withWriteColumns(ImmutableList.of(writeColumn))
                 .withReadColumns(ImmutableList.of(readColumn))
-                .withFileWriterFactory(new AvroHiveFileWriterFactory(FILE_SYSTEM_FACTORY, TESTING_TYPE_MANAGER, new NodeVersion("test_version")))
+                .withFileWriterFactory(new AvroFileWriterFactory(FILE_SYSTEM_FACTORY, TESTING_TYPE_MANAGER, new NodeVersion("test_version")))
                 .isReadableByRecordCursor(createGenericHiveRecordCursorProvider(HDFS_ENVIRONMENT))
                 .isReadableByPageSource(new AvroHivePageSourceFactory(FILE_SYSTEM_FACTORY, STATS));
@@ -638,7 +638,7 @@ public void testAvroProjectedColumns(int rowCount)
                 .withWriteColumns(writeColumns)
                 .withReadColumns(readColumns)
                 .withRowsCount(rowCount)
-                .withFileWriterFactory(new AvroHiveFileWriterFactory(FILE_SYSTEM_FACTORY, TESTING_TYPE_MANAGER, new NodeVersion("test_version")))
+                .withFileWriterFactory(new AvroFileWriterFactory(FILE_SYSTEM_FACTORY, TESTING_TYPE_MANAGER, new NodeVersion("test_version")))
                 .isReadableByRecordCursorPageSource(createGenericHiveRecordCursorProvider(HDFS_ENVIRONMENT))
                 .isReadableByPageSource(new AvroHivePageSourceFactory(FILE_SYSTEM_FACTORY, STATS));
     }