From 1e64daf7598d5210711bb8c9e1fded1d3e56c6b0 Mon Sep 17 00:00:00 2001 From: Sergii Druzkin <65374769+sdruzkin@users.noreply.github.com> Date: Wed, 9 Mar 2022 14:14:22 -0800 Subject: [PATCH] Expose ORC writer compression level as session property --- .../presto/hive/HiveSessionProperties.java | 17 +++++++++++++ .../presto/hive/OrcFileWriterConfig.java | 25 ++++++++++++++++++- .../presto/hive/OrcFileWriterFactory.java | 2 ++ .../presto/hive/TestOrcFileWriterConfig.java | 23 ++++++++++++++--- 4 files changed, 63 insertions(+), 4 deletions(-) diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java index ae5c1f7cff8e0..4482a6791b9bc 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Optional; +import java.util.OptionalInt; import java.util.concurrent.ThreadLocalRandom; import static com.facebook.presto.common.type.DoubleType.DOUBLE; @@ -35,6 +36,7 @@ import static com.facebook.presto.hive.HiveSessionProperties.InsertExistingPartitionsBehavior.APPEND; import static com.facebook.presto.hive.HiveSessionProperties.InsertExistingPartitionsBehavior.ERROR; import static com.facebook.presto.hive.HiveSessionProperties.InsertExistingPartitionsBehavior.OVERWRITE; +import static com.facebook.presto.hive.OrcFileWriterConfig.DEFAULT_COMPRESSION_LEVEL; import static com.facebook.presto.hive.metastore.MetastoreUtil.METASTORE_HEADERS; import static com.facebook.presto.hive.metastore.MetastoreUtil.USER_DEFINED_TYPE_ENCODING_ENABLED; import static com.facebook.presto.spi.StandardErrorCode.INVALID_SESSION_PROPERTY; @@ -69,6 +71,7 @@ public final class HiveSessionProperties private static final String ORC_OPTIMIZED_WRITER_MAX_STRIPE_SIZE = "orc_optimized_writer_max_stripe_size"; private static final String ORC_OPTIMIZED_WRITER_MAX_STRIPE_ROWS = "orc_optimized_writer_max_stripe_rows"; private static final String ORC_OPTIMIZED_WRITER_MAX_DICTIONARY_MEMORY = "orc_optimized_writer_max_dictionary_memory"; + private static final String ORC_OPTIMIZED_WRITER_COMPRESSION_LEVEL = "orc_optimized_writer_compression_level"; private static final String PAGEFILE_WRITER_MAX_STRIPE_SIZE = "pagefile_writer_max_stripe_size"; public static final String HIVE_STORAGE_FORMAT = "hive_storage_format"; private static final String COMPRESSION_CODEC = "compression_codec"; @@ -288,6 +291,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon "Experimental: ORC: Max dictionary memory", orcFileWriterConfig.getDictionaryMaxMemory(), false), + integerProperty( + ORC_OPTIMIZED_WRITER_COMPRESSION_LEVEL, + "Experimental: ORC: Compression level, works only for ZSTD and ZLIB compression kinds", + orcFileWriterConfig.getCompressionLevel(), + false), dataSizeSessionProperty( PAGEFILE_WRITER_MAX_STRIPE_SIZE, "PAGEFILE: Max stripe size", @@ -811,6 +819,15 @@ public static DataSize getOrcOptimizedWriterMaxDictionaryMemory(ConnectorSession return session.getProperty(ORC_OPTIMIZED_WRITER_MAX_DICTIONARY_MEMORY, DataSize.class); } + public static OptionalInt getCompressionLevel(ConnectorSession session) + { + int value = session.getProperty(ORC_OPTIMIZED_WRITER_COMPRESSION_LEVEL, Integer.class); + if (value != DEFAULT_COMPRESSION_LEVEL) { + return OptionalInt.of(value); + } + return OptionalInt.empty(); + } + public static DataSize getPageFileStripeMaxSize(ConnectorSession session) { return session.getProperty(PAGEFILE_WRITER_MAX_STRIPE_SIZE, DataSize.class); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/OrcFileWriterConfig.java b/presto-hive/src/main/java/com/facebook/presto/hive/OrcFileWriterConfig.java index c9c25c3b3e5fc..88006b8be7cfa 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/OrcFileWriterConfig.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/OrcFileWriterConfig.java @@ -22,6 +22,8 @@ import javax.validation.constraints.NotNull; +import java.util.OptionalInt; + import static com.facebook.presto.hive.OrcFileWriterConfig.StreamLayoutType.BY_COLUMN_SIZE; @SuppressWarnings("unused") @@ -33,6 +35,8 @@ public enum StreamLayoutType BY_COLUMN_SIZE, } + public static final int DEFAULT_COMPRESSION_LEVEL = Integer.MIN_VALUE; + private DataSize stripeMinSize = DefaultOrcWriterFlushPolicy.DEFAULT_STRIPE_MIN_SIZE; private DataSize stripeMaxSize = DefaultOrcWriterFlushPolicy.DEFAULT_STRIPE_MAX_SIZE; private int stripeMaxRowCount = DefaultOrcWriterFlushPolicy.DEFAULT_STRIPE_MAX_ROW_COUNT; @@ -44,6 +48,7 @@ public enum StreamLayoutType private boolean isDwrfStripeCacheEnabled; private DataSize dwrfStripeCacheMaxSize = OrcWriterOptions.DEFAULT_DWRF_STRIPE_CACHE_MAX_SIZE; private DwrfStripeCacheMode dwrfStripeCacheMode = OrcWriterOptions.DEFAULT_DWRF_STRIPE_CACHE_MODE; + private int compressionLevel = DEFAULT_COMPRESSION_LEVEL; public OrcWriterOptions.Builder toOrcWriterOptionsBuilder() { @@ -53,6 +58,11 @@ public OrcWriterOptions.Builder toOrcWriterOptionsBuilder() .withStripeMaxRowCount(stripeMaxRowCount) .build(); + OptionalInt resolvedCompressionLevel = OptionalInt.empty(); + if (compressionLevel != DEFAULT_COMPRESSION_LEVEL) { + resolvedCompressionLevel = OptionalInt.of(compressionLevel); + } + // Give separate copy to callers for isolation. return OrcWriterOptions.builder() .withFlushPolicy(flushPolicy) @@ -63,7 +73,8 @@ public OrcWriterOptions.Builder toOrcWriterOptionsBuilder() .withStreamLayoutFactory(getStreamLayoutFactory(streamLayoutType)) .withDwrfStripeCacheEnabled(isDwrfStripeCacheEnabled) .withDwrfStripeCacheMaxSize(dwrfStripeCacheMaxSize) - .withDwrfStripeCacheMode(dwrfStripeCacheMode); + .withDwrfStripeCacheMode(dwrfStripeCacheMode) + .withCompressionLevel(resolvedCompressionLevel); } @NotNull @@ -129,6 +140,18 @@ public OrcFileWriterConfig setDictionaryMaxMemory(DataSize dictionaryMaxMemory) return this; } + public int getCompressionLevel() + { + return compressionLevel; + } + + @Config("hive.orc.writer.compression-level") + public OrcFileWriterConfig setCompressionLevel(int compressionLevel) + { + this.compressionLevel = compressionLevel; + return this; + } + @NotNull public DataSize getStringStatisticsLimit() { diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/OrcFileWriterFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/OrcFileWriterFactory.java index fa706682478d6..aa3c2da38a3c4 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/OrcFileWriterFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/OrcFileWriterFactory.java @@ -60,6 +60,7 @@ import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT; import static com.facebook.presto.hive.HiveErrorCode.HIVE_WRITER_OPEN_ERROR; import static com.facebook.presto.hive.HiveErrorCode.HIVE_WRITE_VALIDATION_FAILED; +import static com.facebook.presto.hive.HiveSessionProperties.getCompressionLevel; import static com.facebook.presto.hive.HiveSessionProperties.getDwrfWriterStripeCacheMaxSize; import static com.facebook.presto.hive.HiveSessionProperties.getOrcMaxBufferSize; import static com.facebook.presto.hive.HiveSessionProperties.getOrcMaxMergeDistance; @@ -235,6 +236,7 @@ else if (com.facebook.hive.orc.OrcOutputFormat.class.getName().equals(storageFor .withIgnoreDictionaryRowGroupSizes(isExecutionBasedMemoryAccountingEnabled(session)) .withDwrfStripeCacheEnabled(isDwrfWriterStripeCacheEnabled(session)) .withDwrfStripeCacheMaxSize(getDwrfWriterStripeCacheMaxSize(session)) + .withCompressionLevel(getCompressionLevel(session)) .build(), fileInputColumnIndexes, ImmutableMap.builder() diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestOrcFileWriterConfig.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestOrcFileWriterConfig.java index ca2cdb35a2fe7..10efaa8a5338b 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestOrcFileWriterConfig.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestOrcFileWriterConfig.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Optional; +import java.util.OptionalInt; import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertFullMapping; import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; @@ -58,7 +59,8 @@ public void testDefaults() .setStreamLayoutType(BY_COLUMN_SIZE) .setDwrfStripeCacheEnabled(false) .setDwrfStripeCacheMaxSize(new DataSize(8, MEGABYTE)) - .setDwrfStripeCacheMode(INDEX_AND_FOOTER)); + .setDwrfStripeCacheMode(INDEX_AND_FOOTER) + .setCompressionLevel(Integer.MIN_VALUE)); } @Test @@ -76,6 +78,7 @@ public void testExplicitPropertyMappings() .put("hive.orc.writer.dwrf-stripe-cache-enabled", "true") .put("hive.orc.writer.dwrf-stripe-cache-max-size", "10MB") .put("hive.orc.writer.dwrf-stripe-cache-mode", "FOOTER") + .put("hive.orc.writer.compression-level", "5") .build(); OrcFileWriterConfig expected = new OrcFileWriterConfig() @@ -89,7 +92,8 @@ public void testExplicitPropertyMappings() .setStreamLayoutType(BY_STREAM_SIZE) .setDwrfStripeCacheEnabled(true) .setDwrfStripeCacheMaxSize(new DataSize(10, MEGABYTE)) - .setDwrfStripeCacheMode(FOOTER); + .setDwrfStripeCacheMode(FOOTER) + .setCompressionLevel(5); assertFullMapping(properties, expected); } @@ -115,6 +119,7 @@ public void testOrcWriterOptionsBuilder() StreamLayoutType streamLayoutType = BY_STREAM_SIZE; DataSize dwrfStripeCacheMaxSize = new DataSize(4, MEGABYTE); DwrfStripeCacheMode dwrfStripeCacheMode = INDEX; + int compressionLevel = 5; OrcFileWriterConfig config = new OrcFileWriterConfig() .setStripeMinSize(stripeMinSize) @@ -127,7 +132,8 @@ public void testOrcWriterOptionsBuilder() .setStreamLayoutType(streamLayoutType) .setDwrfStripeCacheEnabled(false) .setDwrfStripeCacheMaxSize(dwrfStripeCacheMaxSize) - .setDwrfStripeCacheMode(dwrfStripeCacheMode); + .setDwrfStripeCacheMode(dwrfStripeCacheMode) + .setCompressionLevel(5); assertEquals(stripeMinSize, config.getStripeMinSize()); assertEquals(stripeMaxSize, config.getStripeMaxSize()); @@ -140,6 +146,7 @@ public void testOrcWriterOptionsBuilder() assertFalse(config.isDwrfStripeCacheEnabled()); assertEquals(dwrfStripeCacheMaxSize, config.getDwrfStripeCacheMaxSize()); assertEquals(dwrfStripeCacheMode, config.getDwrfStripeCacheMode()); + assertEquals(compressionLevel, config.getCompressionLevel()); assertNotSame(config.toOrcWriterOptionsBuilder(), config.toOrcWriterOptionsBuilder()); OrcWriterOptions options = config.toOrcWriterOptionsBuilder().build(); @@ -153,6 +160,7 @@ public void testOrcWriterOptionsBuilder() assertEquals(maxCompressionBufferSize, options.getMaxCompressionBufferSize()); assertTrue(options.getStreamLayoutFactory() instanceof StreamSizeLayoutFactory); assertEquals(Optional.empty(), options.getDwrfStripeCacheOptions()); + assertEquals(OptionalInt.of(compressionLevel), options.getCompressionLevel()); } @Test @@ -168,4 +176,13 @@ public void testStreamLayoutOption() options = config.toOrcWriterOptionsBuilder().build(); assertTrue(options.getStreamLayoutFactory() instanceof ColumnSizeLayoutFactory); } + + @Test + public void testDefaultCompressionLevel() + { + OrcFileWriterConfig config = new OrcFileWriterConfig(); + OrcWriterOptions options = config.toOrcWriterOptionsBuilder().build(); + + assertEquals(OptionalInt.empty(), options.getCompressionLevel()); + } }