From cb9b476ced841899371c5922295cb17b703f980e Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Mon, 14 Aug 2023 13:28:37 +0200 Subject: [PATCH 1/2] Fix Hive write to bucketed table when native writers disabled This reverts commit a794b32a7846a555e1d5da9697cea6e54681b578. --- .../hive/s3select/S3SelectTestHelper.java | 2 - .../io/trino/plugin/hive/HiveMetadata.java | 109 +++++++++++++----- .../plugin/hive/HiveMetadataFactory.java | 7 -- .../trino/plugin/hive/AbstractTestHive.java | 1 - .../hive/AbstractTestHiveFileSystem.java | 1 - .../plugin/hive/BaseHiveConnectorTest.java | 16 ++- 6 files changed, 92 insertions(+), 44 deletions(-) diff --git a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/S3SelectTestHelper.java b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/S3SelectTestHelper.java index 00a8d48dbe0f..a6ef12f7c602 100644 --- a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/S3SelectTestHelper.java +++ b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/S3SelectTestHelper.java @@ -75,7 +75,6 @@ import static io.trino.plugin.hive.HiveFileSystemTestUtils.filterTable; import static io.trino.plugin.hive.HiveFileSystemTestUtils.getSplitsCount; import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS; -import static io.trino.plugin.hive.HiveTestUtils.getDefaultHiveFileWriterFactories; import static io.trino.plugin.hive.HiveTestUtils.getDefaultHivePageSourceFactories; import static io.trino.plugin.hive.HiveTestUtils.getDefaultHiveRecordCursorProviders; import static io.trino.plugin.hive.TestingThriftHiveMetastoreBuilder.testingThriftHiveMetastoreBuilder; @@ -147,7 +146,6 @@ public S3SelectTestHelper(String host, this.hiveConfig, new HiveMetastoreConfig(), HiveMetastoreFactory.ofInstance(metastoreClient), - getDefaultHiveFileWriterFactories(hiveConfig, hdfsEnvironment), new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS), hdfsEnvironment, hivePartitionManager, diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java index 943b40d8be29..9783b9f87fac 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java @@ -32,6 +32,7 @@ import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.filesystem.TrinoOutputFile; import io.trino.hdfs.HdfsContext; import io.trino.hdfs.HdfsEnvironment; import io.trino.plugin.base.CatalogName; @@ -58,6 +59,7 @@ import io.trino.plugin.hive.statistics.HiveStatisticsProvider; import io.trino.plugin.hive.util.HiveBucketing; import io.trino.plugin.hive.util.HiveUtil; +import io.trino.plugin.hive.util.HiveWriteUtils; import io.trino.plugin.hive.util.SerdeConstants; import io.trino.spi.ErrorType; import io.trino.spi.Page; @@ -126,6 +128,8 @@ import org.apache.avro.Schema; import org.apache.avro.SchemaParseException; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.mapred.JobConf; import java.io.FileNotFoundException; import java.io.IOException; @@ -158,6 +162,8 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Iterables.concat; import static com.google.common.collect.Iterables.getOnlyElement; +import static io.trino.filesystem.hdfs.HadoopPaths.hadoopPath; +import static io.trino.hdfs.ConfigurationUtils.toJobConf; import static io.trino.plugin.base.projection.ApplyProjectionUtil.extractSupportedProjectedColumns; import static io.trino.plugin.base.projection.ApplyProjectionUtil.replaceWithNewVariables; import static io.trino.plugin.hive.HiveAnalyzeProperties.getColumnNames; @@ -170,6 +176,7 @@ import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.SYNTHESIZED; import static io.trino.plugin.hive.HiveColumnHandle.createBaseColumn; import static io.trino.plugin.hive.HiveColumnHandle.mergeRowIdColumnHandle; +import static io.trino.plugin.hive.HiveCompressionCodecs.selectCompressionCodec; import static io.trino.plugin.hive.HiveErrorCode.HIVE_COLUMN_ORDER_MISMATCH; import static io.trino.plugin.hive.HiveErrorCode.HIVE_CONCURRENT_MODIFICATION_DETECTED; import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR; @@ -178,6 +185,7 @@ import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR; import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT; import static io.trino.plugin.hive.HiveErrorCode.HIVE_VIEW_TRANSLATION_ERROR; +import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR; import static io.trino.plugin.hive.HivePartitionManager.extractPartitionValues; import static io.trino.plugin.hive.HiveSessionProperties.NON_TRANSACTIONAL_OPTIMIZE_ENABLED; import static io.trino.plugin.hive.HiveSessionProperties.getDeltaLakeCatalogName; @@ -275,8 +283,11 @@ import static io.trino.plugin.hive.util.AcidTables.isFullAcidTable; import static io.trino.plugin.hive.util.AcidTables.isTransactionalTable; import static io.trino.plugin.hive.util.AcidTables.writeAcidVersionFile; +import static io.trino.plugin.hive.util.CompressionConfigUtil.configureCompression; import static io.trino.plugin.hive.util.HiveBucketing.getHiveBucketHandle; import static io.trino.plugin.hive.util.HiveBucketing.isSupportedBucketing; +import static io.trino.plugin.hive.util.HiveClassNames.HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS; +import static io.trino.plugin.hive.util.HiveClassNames.ORC_OUTPUT_FORMAT_CLASS; import static io.trino.plugin.hive.util.HiveUtil.columnMetadataGetter; import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeyColumnHandles; import static io.trino.plugin.hive.util.HiveUtil.getRegularColumnHandles; @@ -291,6 +302,7 @@ import static io.trino.plugin.hive.util.HiveUtil.verifyPartitionTypeSupported; import static io.trino.plugin.hive.util.HiveWriteUtils.checkTableIsWritable; import static io.trino.plugin.hive.util.HiveWriteUtils.createPartitionValues; +import static io.trino.plugin.hive.util.HiveWriteUtils.initializeSerializer; import static io.trino.plugin.hive.util.HiveWriteUtils.isFileCreatedByQuery; import static io.trino.plugin.hive.util.HiveWriteUtils.isWritableType; import static io.trino.plugin.hive.util.RetryDriver.retry; @@ -363,7 +375,6 @@ public class HiveMetadata private final CatalogName catalogName; private final SemiTransactionalHiveMetastore metastore; private final boolean autoCommit; - private final Set fileWriterFactories; private final TrinoFileSystemFactory fileSystemFactory; private final HdfsEnvironment hdfsEnvironment; private final HivePartitionManager partitionManager; @@ -392,7 +403,6 @@ public HiveMetadata( CatalogName catalogName, SemiTransactionalHiveMetastore metastore, boolean autoCommit, - Set fileWriterFactories, TrinoFileSystemFactory fileSystemFactory, HdfsEnvironment hdfsEnvironment, HivePartitionManager partitionManager, @@ -420,7 +430,6 @@ public HiveMetadata( this.catalogName = requireNonNull(catalogName, "catalogName is null"); this.metastore = requireNonNull(metastore, "metastore is null"); this.autoCommit = autoCommit; - this.fileWriterFactories = ImmutableSet.copyOf(requireNonNull(fileWriterFactories, "fileWriterFactories is null")); this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.partitionManager = requireNonNull(partitionManager, "partitionManager is null"); @@ -1739,13 +1748,12 @@ public Optional finishCreateTable(ConnectorSession sess partitionUpdates = PartitionUpdate.mergePartitionUpdates(partitionUpdates); if (handle.getBucketProperty().isPresent() && isCreateEmptyBucketFiles(session)) { - List partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, true, partitionUpdates); + List partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, table, true, partitionUpdates); // replace partitionUpdates before creating the empty files so that those files will be cleaned up if we end up rollback partitionUpdates = PartitionUpdate.mergePartitionUpdates(concat(partitionUpdates, partitionUpdatesForMissingBuckets)); for (PartitionUpdate partitionUpdate : partitionUpdatesForMissingBuckets) { Optional partition = table.getPartitionColumns().isEmpty() ? Optional.empty() : Optional.of(buildPartitionObject(session, table, partitionUpdate)); - Location writePath = Location.of(partitionUpdate.getWritePath().toString()); - createEmptyFiles(session, writePath, table, partition, partitionUpdate.getFileNames()); + createEmptyFiles(session, hadoopPath(partitionUpdate.getWritePath()), table, partition, partitionUpdate.getFileNames()); } if (handle.isTransactional()) { AcidTransaction transaction = handle.getTransaction(); @@ -1824,15 +1832,19 @@ public Optional finishCreateTable(ConnectorSession sess private List computePartitionUpdatesForMissingBuckets( ConnectorSession session, HiveWritableTableHandle handle, + Table table, boolean isCreateTable, List partitionUpdates) { ImmutableList.Builder partitionUpdatesForMissingBucketsBuilder = ImmutableList.builder(); + HiveStorageFormat storageFormat = table.getPartitionColumns().isEmpty() ? handle.getTableStorageFormat() : handle.getPartitionStorageFormat(); for (PartitionUpdate partitionUpdate : partitionUpdates) { int bucketCount = handle.getBucketProperty().get().getBucketCount(); List fileNamesForMissingBuckets = computeFileNamesForMissingBuckets( session, + storageFormat, + hadoopPath(partitionUpdate.getTargetPath()), bucketCount, isCreateTable && handle.isTransactional(), partitionUpdate); @@ -1851,6 +1863,8 @@ private List computePartitionUpdatesForMissingBuckets( private List computeFileNamesForMissingBuckets( ConnectorSession session, + HiveStorageFormat storageFormat, + Path targetPath, int bucketCount, boolean transactionalCreateTable, PartitionUpdate partitionUpdate) @@ -1859,7 +1873,10 @@ private List computeFileNamesForMissingBuckets( // fast path for common case return ImmutableList.of(); } - + HdfsContext hdfsContext = new HdfsContext(session); + JobConf conf = toJobConf(hdfsEnvironment.getConfiguration(hdfsContext, targetPath)); + configureCompression(conf, selectCompressionCodec(session, storageFormat)); + String fileExtension = HiveWriterFactory.getFileExtension(conf, fromHiveStorageFormat(storageFormat)); Set fileNames = ImmutableSet.copyOf(partitionUpdate.getFileNames()); Set bucketsWithFiles = fileNames.stream() .map(HiveWriterFactory::getBucketFromFileName) @@ -1870,16 +1887,21 @@ private List computeFileNamesForMissingBuckets( if (bucketsWithFiles.contains(i)) { continue; } - missingFileNamesBuilder.add(transactionalCreateTable - ? computeTransactionalBucketedFilename(i) - : computeNonTransactionalBucketedFilename(session.getQueryId(), i)); + String fileName; + if (transactionalCreateTable) { + fileName = computeTransactionalBucketedFilename(i) + fileExtension; + } + else { + fileName = computeNonTransactionalBucketedFilename(session.getQueryId(), i) + fileExtension; + } + missingFileNamesBuilder.add(fileName); } List missingFileNames = missingFileNamesBuilder.build(); verify(fileNames.size() + missingFileNames.size() == bucketCount); return missingFileNames; } - private void createEmptyFiles(ConnectorSession session, Location path, Table table, Optional partition, List fileNames) + private void createEmptyFiles(ConnectorSession session, Path path, Table table, Optional partition, List fileNames) { Properties schema; StorageFormat format; @@ -1892,24 +1914,48 @@ private void createEmptyFiles(ConnectorSession session, Location path, Table tab format = table.getStorage().getStorageFormat(); } - for (String fileName : fileNames) { - Location location = path.appendPath(fileName); - fileWriterFactories.stream() - .map(factory -> factory.createFileWriter( - location, - ImmutableList.of(), - format, - HiveCompressionCodec.NONE, - schema, - session, - OptionalInt.empty(), - NO_ACID_TRANSACTION, - false, - WriterKind.INSERT)) - .flatMap(Optional::stream) - .findFirst() - .orElseThrow(() -> new TrinoException(HIVE_UNSUPPORTED_FORMAT, "Writing not supported for " + format)) - .commit(); + HiveCompressionCodec compression = selectCompressionCodec(session, format); + if (format.getOutputFormat().equals(ORC_OUTPUT_FORMAT_CLASS) && (compression == HiveCompressionCodec.ZSTD)) { + compression = HiveCompressionCodec.GZIP; // ZSTD not supported by Hive ORC writer + } + JobConf conf = toJobConf(hdfsEnvironment.getConfiguration(new HdfsContext(session), path)); + configureCompression(conf, compression); + + // for simple line-oriented formats, just create an empty file directly + if (format.getOutputFormat().equals(HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS)) { + TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity()); + for (String fileName : fileNames) { + TrinoOutputFile trinoOutputFile = fileSystem.newOutputFile(Location.of(path.toString()).appendPath(fileName)); + try { + // create empty file + trinoOutputFile.create().close(); + } + catch (IOException e) { + throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error write empty file to Hive", e); + } + } + return; + } + + hdfsEnvironment.doAs(session.getIdentity(), () -> { + for (String fileName : fileNames) { + writeEmptyFile(session, new Path(path, fileName), conf, schema, format.getSerde(), format.getOutputFormat()); + } + }); + } + + private static void writeEmptyFile(ConnectorSession session, Path target, JobConf conf, Properties properties, String serde, String outputFormatName) + { + // Some serializers such as Avro set a property in the schema. + initializeSerializer(conf, properties, serde); + + // The code below is not a try with resources because RecordWriter is not Closeable. + FileSinkOperator.RecordWriter recordWriter = HiveWriteUtils.createRecordWriter(target, conf, properties, outputFormatName, session); + try { + recordWriter.close(false); + } + catch (IOException e) { + throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error write empty file to Hive", e); } } @@ -2097,7 +2143,7 @@ private Table finishChangingTable(AcidOperation acidOperation, String changeDesc } if (handle.getBucketProperty().isPresent() && isCreateEmptyBucketFiles(session)) { - List partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, false, partitionUpdates); + List partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, table, false, partitionUpdates); // replace partitionUpdates before creating the empty files so that those files will be cleaned up if we end up rollback partitionUpdates = PartitionUpdate.mergePartitionUpdates(concat(partitionUpdates, partitionUpdatesForMissingBuckets)); for (PartitionUpdate partitionUpdate : partitionUpdatesForMissingBuckets) { @@ -2114,8 +2160,7 @@ private Table finishChangingTable(AcidOperation acidOperation, String changeDesc statistics, handle.isRetriesEnabled()); } - Location writePath = Location.of(partitionUpdate.getWritePath().toString()); - createEmptyFiles(session, writePath, table, partition, partitionUpdate.getFileNames()); + createEmptyFiles(session, hadoopPath(partitionUpdate.getWritePath()), table, partition, partitionUpdate.getFileNames()); } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java index 991fdd36310b..4419261eb5e7 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java @@ -13,7 +13,6 @@ */ package io.trino.plugin.hive; -import com.google.common.collect.ImmutableSet; import com.google.inject.Inject; import io.airlift.concurrent.BoundedExecutor; import io.airlift.json.JsonCodec; @@ -57,7 +56,6 @@ public class HiveMetadataFactory private final boolean hideDeltaLakeTables; private final long perTransactionCacheMaximumSize; private final HiveMetastoreFactory metastoreFactory; - private final Set fileWriterFactories; private final TrinoFileSystemFactory fileSystemFactory; private final HdfsEnvironment hdfsEnvironment; private final HivePartitionManager partitionManager; @@ -88,7 +86,6 @@ public HiveMetadataFactory( HiveConfig hiveConfig, HiveMetastoreConfig hiveMetastoreConfig, HiveMetastoreFactory metastoreFactory, - Set fileWriterFactories, TrinoFileSystemFactory fileSystemFactory, HdfsEnvironment hdfsEnvironment, HivePartitionManager partitionManager, @@ -111,7 +108,6 @@ public HiveMetadataFactory( this( catalogName, metastoreFactory, - fileWriterFactories, fileSystemFactory, hdfsEnvironment, partitionManager, @@ -150,7 +146,6 @@ public HiveMetadataFactory( public HiveMetadataFactory( CatalogName catalogName, HiveMetastoreFactory metastoreFactory, - Set fileWriterFactories, TrinoFileSystemFactory fileSystemFactory, HdfsEnvironment hdfsEnvironment, HivePartitionManager partitionManager, @@ -197,7 +192,6 @@ public HiveMetadataFactory( this.perTransactionCacheMaximumSize = perTransactionCacheMaximumSize; this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null"); - this.fileWriterFactories = ImmutableSet.copyOf(requireNonNull(fileWriterFactories, "fileWriterFactories is null")); this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.partitionManager = requireNonNull(partitionManager, "partitionManager is null"); @@ -254,7 +248,6 @@ public TransactionalMetadata create(ConnectorIdentity identity, boolean autoComm catalogName, metastore, autoCommit, - fileWriterFactories, fileSystemFactory, hdfsEnvironment, partitionManager, diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java index c90eed86a59c..7855fb66848d 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java @@ -834,7 +834,6 @@ protected final void setup(String databaseName, HiveConfig hiveConfig, HiveMetas metadataFactory = new HiveMetadataFactory( new CatalogName("hive"), HiveMetastoreFactory.ofInstance(metastoreClient), - getDefaultHiveFileWriterFactories(hiveConfig, hdfsEnvironment), new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS), hdfsEnvironment, partitionManager, diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java index e22c4e290c3d..01d42a2ceeda 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java @@ -213,7 +213,6 @@ protected void setup(String host, int port, String databaseName, boolean s3Selec config, new HiveMetastoreConfig(), HiveMetastoreFactory.ofInstance(metastoreClient), - getDefaultHiveFileWriterFactories(config, hdfsEnvironment), new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS), hdfsEnvironment, hivePartitionManager, diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java index 9a436a92b8f0..e671dd5ca5a5 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java @@ -2135,7 +2135,21 @@ public void testEmptyBucketedTable() private void testEmptyBucketedTable(Session session, HiveStorageFormat storageFormat) { - testEmptyBucketedTable(session, storageFormat, true); + for (HiveCompressionCodec compressionCodec : HiveCompressionCodec.values()) { + if ((storageFormat == HiveStorageFormat.AVRO) && (compressionCodec == HiveCompressionCodec.LZ4)) { + continue; + } + if ((storageFormat == HiveStorageFormat.PARQUET) && (compressionCodec == HiveCompressionCodec.LZ4)) { + // TODO (https://github.com/trinodb/trino/issues/9142) Support LZ4 compression with native Parquet writer + continue; + } + testEmptyBucketedTable( + Session.builder(session) + .setCatalogSessionProperty(session.getCatalog().orElseThrow(), "compression_codec", compressionCodec.name()) + .build(), + storageFormat, + true); + } testEmptyBucketedTable(session, storageFormat, false); } From 2f1e8ecf7576691fa13a320241ad511a6282843d Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Mon, 14 Aug 2023 12:43:01 +0200 Subject: [PATCH 2/2] Add Hive coverage with native format session toggles `TestHiveConnectorTest.testWithAllStorageFormats` is supposed to exercise format configurations, including different reader/writer implementations. For example it exercised two Parquet writers until 13392860aad16167c1b93d9275ceee270b3287c8 when one was removed. This commit adds same improved test coverage for other file formats which have two execution paths. --- .../plugin/hive/BaseHiveConnectorTest.java | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java index e671dd5ca5a5..978579d5f158 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java @@ -8822,7 +8822,25 @@ private List getAllTestingHiveStorageFormat() // REGEX format is read-only continue; } - formats.add(new TestingHiveStorageFormat(getSession(), hiveStorageFormat)); + + Session defaultSession = getSession(); + String catalogName = defaultSession.getCatalog().orElseThrow(); + for (boolean enabled : List.of(true, false)) { + Session session = Session.builder(defaultSession) + .setCatalogSessionProperty(catalogName, "avro_native_reader_enabled", Boolean.toString(enabled)) + .setCatalogSessionProperty(catalogName, "avro_native_writer_enabled", Boolean.toString(enabled)) + .setCatalogSessionProperty(catalogName, "csv_native_reader_enabled", Boolean.toString(enabled)) + .setCatalogSessionProperty(catalogName, "csv_native_writer_enabled", Boolean.toString(enabled)) + .setCatalogSessionProperty(catalogName, "json_native_reader_enabled", Boolean.toString(enabled)) + .setCatalogSessionProperty(catalogName, "json_native_writer_enabled", Boolean.toString(enabled)) + .setCatalogSessionProperty(catalogName, "regex_native_reader_enabled", Boolean.toString(enabled)) + .setCatalogSessionProperty(catalogName, "text_file_native_reader_enabled", Boolean.toString(enabled)) + .setCatalogSessionProperty(catalogName, "text_file_native_writer_enabled", Boolean.toString(enabled)) + .setCatalogSessionProperty(catalogName, "sequence_file_native_reader_enabled", Boolean.toString(enabled)) + .setCatalogSessionProperty(catalogName, "sequence_file_native_writer_enabled", Boolean.toString(enabled)) + .build(); + formats.add(new TestingHiveStorageFormat(session, hiveStorageFormat)); + } } return formats.build(); }