Remove support for legacy Hive Parquet writer
electrum committed Jul 26, 2023 · 1 parent 7991527 · commit 1339286
Showing 12 changed files with 22 additions and 151 deletions.
11 changes: 2 additions & 9 deletions docs/src/main/sphinx/connector/hive.rst
@@ -1666,15 +1666,8 @@ with Parquet files performed by the Hive connector.
optimized parquet reader by default. The equivalent catalog session
property is ``parquet_optimized_reader_enabled``.
- ``true``
* - ``parquet.optimized-writer.enabled``
- Whether the optimized writer is used when writing Parquet files.
Set this property to ``false`` to disable the optimized parquet writer by
default. The equivalent catalog session property is
``parquet_optimized_writer_enabled``.
- ``true``
* - ``parquet.optimized-writer.validation-percentage``
- Percentage of parquet files to validate after write by re-reading the whole file
when ``parquet.optimized-writer.enabled`` is set to ``true``.
* - ``parquet.writer.validation-percentage``
- Percentage of Parquet files to validate after write by re-reading the whole file.
The equivalent catalog session property is ``parquet_optimized_writer_validation_percentage``.
Validation can be turned off by setting this property to ``0``.
- ``5``
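(Not part of this commit.) For orientation, a minimal sketch of how the renamed validation knob is driven at query time, written in the style of the product tests further down in this diff. The catalog name "hive", the table name, and the import path for the onTrino() helper are assumptions; the catalog session property name itself is unchanged by this commit.

    import static io.trino.tests.product.utils.QueryExecutors.onTrino;

    public final class ParquetWriterValidationSketch
    {
        private ParquetWriterValidationSketch() {}

        public static void writeWithFullValidation()
        {
            // Server-side equivalent in the catalog properties file:
            //   parquet.writer.validation-percentage=100
            // (parquet.optimized-writer.validation-percentage remains as a legacy alias)
            onTrino().executeQuery("SET SESSION hive.parquet_optimized_writer_validation_percentage = 100");
            onTrino().executeQuery("CREATE TABLE parquet_validation_sketch (n BIGINT) WITH (format = 'PARQUET')");
            onTrino().executeQuery("INSERT INTO parquet_validation_sketch VALUES 1, 2, 3");
        }
    }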
@@ -128,7 +128,6 @@ public final class HiveSessionProperties
private static final String QUERY_PARTITION_FILTER_REQUIRED_SCHEMAS = "query_partition_filter_required_schemas";
private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled";
private static final String TIMESTAMP_PRECISION = "timestamp_precision";
private static final String PARQUET_OPTIMIZED_WRITER_ENABLED = "parquet_optimized_writer_enabled";
private static final String DYNAMIC_FILTERING_WAIT_TIMEOUT = "dynamic_filtering_wait_timeout";
private static final String OPTIMIZE_SYMLINK_LISTING = "optimize_symlink_listing";
private static final String HIVE_VIEWS_LEGACY_TRANSLATION = "hive_views_legacy_translation";
@@ -572,11 +571,6 @@ public HiveSessionProperties(
HiveTimestampPrecision.class,
hiveConfig.getTimestampPrecision(),
false),
booleanProperty(
PARQUET_OPTIMIZED_WRITER_ENABLED,
"Enable optimized writer",
parquetWriterConfig.isParquetOptimizedWriterEnabled(),
false),
durationProperty(
DYNAMIC_FILTERING_WAIT_TIMEOUT,
"Duration to wait for completion of dynamic filters during split generation",
@@ -1002,11 +996,6 @@ public static HiveTimestampPrecision getTimestampPrecision(ConnectorSession sess
return session.getProperty(TIMESTAMP_PRECISION, HiveTimestampPrecision.class);
}

public static boolean isParquetOptimizedWriterEnabled(ConnectorSession session)
{
return session.getProperty(PARQUET_OPTIMIZED_WRITER_ENABLED, Boolean.class);
}

public static Duration getDynamicFilteringWaitTimeout(ConnectorSession session)
{
return session.getProperty(DYNAMIC_FILTERING_WAIT_TIMEOUT, Duration.class);
@@ -98,10 +98,6 @@ public Optional<FileWriter> createFileWriter(
boolean useAcidSchema,
WriterKind writerKind)
{
if (!HiveSessionProperties.isParquetOptimizedWriterEnabled(session)) {
return Optional.empty();
}

if (!MAPRED_PARQUET_OUTPUT_FORMAT_CLASS.equals(storageFormat.getOutputFormat())) {
return Optional.empty();
}
@@ -15,6 +15,7 @@

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.DefunctConfig;
import io.airlift.configuration.LegacyConfig;
import io.airlift.units.DataSize;
import io.airlift.units.MaxDataSize;
@@ -24,14 +25,17 @@
import jakarta.validation.constraints.DecimalMin;
import org.apache.parquet.hadoop.ParquetWriter;

@DefunctConfig({
"hive.parquet.optimized-writer.enabled",
"parquet.experimental-optimized-writer.enabled",
"parquet.optimized-writer.enabled",
})
public class ParquetWriterConfig
{
public static final String PARQUET_WRITER_MAX_BLOCK_SIZE = "2GB";
public static final String PARQUET_WRITER_MIN_PAGE_SIZE = "8kB";
public static final String PARQUET_WRITER_MAX_PAGE_SIZE = "8MB";

private boolean parquetOptimizedWriterEnabled = true;

private DataSize blockSize = DataSize.ofBytes(ParquetWriter.DEFAULT_BLOCK_SIZE);
private DataSize pageSize = DataSize.ofBytes(ParquetWriter.DEFAULT_PAGE_SIZE);
private int batchSize = ParquetWriterOptions.DEFAULT_BATCH_SIZE;
@@ -66,20 +70,6 @@ public ParquetWriterConfig setPageSize(DataSize pageSize)
return this;
}

public boolean isParquetOptimizedWriterEnabled()
{
return parquetOptimizedWriterEnabled;
}

@Config("parquet.optimized-writer.enabled")
@LegacyConfig({"hive.parquet.optimized-writer.enabled", "parquet.experimental-optimized-writer.enabled"})
@ConfigDescription("Enable optimized Parquet writer")
public ParquetWriterConfig setParquetOptimizedWriterEnabled(boolean parquetOptimizedWriterEnabled)
{
this.parquetOptimizedWriterEnabled = parquetOptimizedWriterEnabled;
return this;
}

@Config("parquet.writer.batch-size")
@ConfigDescription("Maximum number of rows passed to the writer in each batch")
public ParquetWriterConfig setBatchSize(int batchSize)
@@ -100,7 +90,8 @@ public double getValidationPercentage()
return validationPercentage;
}

@Config("parquet.optimized-writer.validation-percentage")
@Config("parquet.writer.validation-percentage")
@LegacyConfig("parquet.optimized-writer.validation-percentage")
@ConfigDescription("Percentage of parquet files to validate after write by re-reading the whole file")
public ParquetWriterConfig setValidationPercentage(double validationPercentage)
{
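(Not part of this diff.) With the enable flag gone, ParquetWriterConfig's tuning surface reduces to the setters that remain in this class, and the properties listed in @DefunctConfig are rejected outright if a catalog file still sets them (airlift fails configuration loading for defunct properties). Below is a minimal sketch of building the config programmatically, using only methods visible in this file; the values are arbitrary examples, not recommendations.

    import io.airlift.units.DataSize;

    import static io.airlift.units.DataSize.Unit.MEGABYTE;

    public final class ParquetWriterConfigSketch
    {
        private ParquetWriterConfigSketch() {}

        public static ParquetWriterConfig exampleConfig()
        {
            // Only the optimized-writer toggle is gone; block size, page size,
            // batch size and validation percentage are configured as before.
            return new ParquetWriterConfig()
                    .setBlockSize(DataSize.of(128, MEGABYTE))
                    .setPageSize(DataSize.of(1, MEGABYTE))
                    .setBatchSize(10_000)
                    .setValidationPercentage(5.0);
        }
    }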
@@ -5267,21 +5267,7 @@ public Object[][] timestampPrecisionAndValues()
@Test(dataProvider = "timestampPrecisionAndValues")
public void testParquetTimestampPredicatePushdown(HiveTimestampPrecision timestampPrecision, LocalDateTime value)
{
doTestParquetTimestampPredicatePushdown(getSession(), timestampPrecision, value);
}

@Test(dataProvider = "timestampPrecisionAndValues")
public void testParquetTimestampPredicatePushdownHiveWriter(HiveTimestampPrecision timestampPrecision, LocalDateTime value)
{
Session session = Session.builder(getSession())
.setCatalogSessionProperty("hive", "parquet_optimized_writer_enabled", "false")
.build();
doTestParquetTimestampPredicatePushdown(session, timestampPrecision, value);
}

private void doTestParquetTimestampPredicatePushdown(Session baseSession, HiveTimestampPrecision timestampPrecision, LocalDateTime value)
{
Session session = withTimestampPrecision(baseSession, timestampPrecision);
Session session = withTimestampPrecision(getSession(), timestampPrecision);
String tableName = "test_parquet_timestamp_predicate_pushdown_" + randomNameSuffix();
assertUpdate("DROP TABLE IF EXISTS " + tableName);
assertUpdate("CREATE TABLE " + tableName + " (t TIMESTAMP) WITH (format = 'PARQUET')");
@@ -5378,25 +5364,11 @@ public void testParquetLongDecimalPredicatePushdown()

@Test
public void testParquetDictionaryPredicatePushdown()
{
testParquetDictionaryPredicatePushdown(getSession());
}

@Test
public void testParquetDictionaryPredicatePushdownWithHiveWriter()
{
testParquetDictionaryPredicatePushdown(
Session.builder(getSession())
.setCatalogSessionProperty("hive", "parquet_optimized_writer_enabled", "false")
.build());
}

private void testParquetDictionaryPredicatePushdown(Session session)
{
String tableName = "test_parquet_dictionary_pushdown_" + randomNameSuffix();
assertUpdate(session, "DROP TABLE IF EXISTS " + tableName);
assertUpdate(session, "CREATE TABLE " + tableName + " (n BIGINT) WITH (format = 'PARQUET')");
assertUpdate(session, "INSERT INTO " + tableName + " VALUES 1, 1, 2, 2, 4, 4, 5, 5", 8);
assertUpdate("DROP TABLE IF EXISTS " + tableName);
assertUpdate("CREATE TABLE " + tableName + " (n BIGINT) WITH (format = 'PARQUET')");
assertUpdate("INSERT INTO " + tableName + " VALUES 1, 1, 2, 2, 4, 4, 5, 5", 8);
assertNoDataRead("SELECT * FROM " + tableName + " WHERE n = 3");
}

@@ -7830,7 +7802,7 @@ public void testUseSortedProperties()
public void testCreateTableWithCompressionCodec(HiveCompressionCodec compressionCodec)
{
testWithAllStorageFormats((session, hiveStorageFormat) -> {
if (isNativeParquetWriter(session, hiveStorageFormat) && compressionCodec == HiveCompressionCodec.LZ4) {
if (hiveStorageFormat == HiveStorageFormat.PARQUET && compressionCodec == HiveCompressionCodec.LZ4) {
// TODO (https://github.com/trinodb/trino/issues/9142) Support LZ4 compression with native Parquet writer
assertThatThrownBy(() -> testCreateTableWithCompressionCodec(session, hiveStorageFormat, compressionCodec))
.hasMessage("Unsupported codec: LZ4");
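(Not part of this diff.) To make the guarded case concrete: with the legacy writer removed there is no fallback for LZ4, so an LZ4-compressed Parquet write always fails. A hedged sketch of how that could be exercised inside a test method of this class; the table name is made up, assertQueryFails is assumed to be available from the test framework, and the assumption is that the error surfaces on the data write rather than on table creation.

    Session lz4Session = Session.builder(getSession())
            .setCatalogSessionProperty("hive", "compression_codec", "LZ4")
            .build();
    String tableName = "test_parquet_lz4_" + randomNameSuffix();
    assertUpdate(lz4Session, "CREATE TABLE " + tableName + " (n BIGINT) WITH (format = 'PARQUET')");
    assertQueryFails(lz4Session, "INSERT INTO " + tableName + " VALUES 1", "Unsupported codec: LZ4");
    assertUpdate("DROP TABLE " + tableName);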
@@ -8829,16 +8801,8 @@ private static void testWithStorageFormat(TestingHiveStorageFormat storageFormat
}
}

private boolean isNativeParquetWriter(Session session, HiveStorageFormat storageFormat)
{
return storageFormat == HiveStorageFormat.PARQUET &&
"true".equals(session.getCatalogProperties("hive").get("parquet_optimized_writer_enabled"));
}

private List<TestingHiveStorageFormat> getAllTestingHiveStorageFormat()
{
Session session = getSession();
String catalog = session.getCatalog().orElseThrow();
ImmutableList.Builder<TestingHiveStorageFormat> formats = ImmutableList.builder();
for (HiveStorageFormat hiveStorageFormat : HiveStorageFormat.values()) {
if (hiveStorageFormat == HiveStorageFormat.CSV) {
@@ -8849,20 +8813,7 @@ private List<TestingHiveStorageFormat> getAllTestingHiveStorageFormat()
// REGEX format is read-only
continue;
}
if (hiveStorageFormat == HiveStorageFormat.PARQUET) {
formats.add(new TestingHiveStorageFormat(
Session.builder(session)
.setCatalogSessionProperty(catalog, "parquet_optimized_writer_enabled", "false")
.build(),
hiveStorageFormat));
formats.add(new TestingHiveStorageFormat(
Session.builder(session)
.setCatalogSessionProperty(catalog, "parquet_optimized_writer_enabled", "true")
.build(),
hiveStorageFormat));
continue;
}
formats.add(new TestingHiveStorageFormat(session, hiveStorageFormat));
formats.add(new TestingHiveStorageFormat(getSession(), hiveStorageFormat));
}
return formats.build();
}
@@ -498,15 +498,10 @@ public void testParquetPageSourceGzip(int rowCount, long fileSizePadding)
}

@Test(dataProvider = "rowCount")
public void testOptimizedParquetWriter(int rowCount)
public void testParquetWriter(int rowCount)
throws Exception
{
ConnectorSession session = getHiveSession(
new HiveConfig(),
new ParquetWriterConfig()
.setParquetOptimizedWriterEnabled(true)
.setValidationPercentage(100.0));
assertTrue(HiveSessionProperties.isParquetOptimizedWriterEnabled(session));
ConnectorSession session = getHiveSession(new HiveConfig(), new ParquetWriterConfig().setValidationPercentage(100));

List<TestColumn> testColumns = getTestColumnsSupportedByParquet();
assertThatFileFormat(PARQUET)
@@ -26,8 +26,6 @@ protected QueryRunner createQueryRunner()
return HiveQueryRunner.builder()
.setHiveProperties(
ImmutableMap.of(
// TODO (https://github.com/trinodb/trino/issues/9359) use optimized writer
"parquet.optimized-writer.enabled", "false",
"parquet.use-column-index", "true",
"parquet.max-buffer-size", "1MB",
"parquet.optimized-reader.enabled", "false"))
@@ -26,8 +26,6 @@ protected QueryRunner createQueryRunner()
return HiveQueryRunner.builder()
.setHiveProperties(
ImmutableMap.of(
// TODO (https://github.com/trinodb/trino/issues/9359) use optimized writer
"parquet.optimized-writer.enabled", "false",
"parquet.use-column-index", "true",
"parquet.max-buffer-size", "1MB",
"parquet.optimized-reader.enabled", "true"))
@@ -32,7 +32,6 @@ public class TestParquetWriterConfig
public void testDefaults()
{
assertRecordedDefaults(recordDefaults(ParquetWriterConfig.class)
.setParquetOptimizedWriterEnabled(true)
.setBlockSize(DataSize.ofBytes(ParquetWriter.DEFAULT_BLOCK_SIZE))
.setPageSize(DataSize.ofBytes(ParquetWriter.DEFAULT_PAGE_SIZE))
.setBatchSize(ParquetWriterOptions.DEFAULT_BATCH_SIZE)
@@ -45,15 +44,11 @@ public void testLegacyProperties()
assertDeprecatedEquivalence(
ParquetWriterConfig.class,
Map.of(
"parquet.optimized-writer.enabled", "true",
"parquet.writer.validation-percentage", "42",
"parquet.writer.block-size", "33MB",
"parquet.writer.page-size", "7MB"),
Map.of(
"parquet.experimental-optimized-writer.enabled", "true",
"hive.parquet.writer.block-size", "33MB",
"hive.parquet.writer.page-size", "7MB"),
Map.of(
"hive.parquet.optimized-writer.enabled", "true",
"parquet.optimized-writer.validation-percentage", "42",
"hive.parquet.writer.block-size", "33MB",
"hive.parquet.writer.page-size", "7MB"));
}
@@ -62,14 +57,12 @@ public void testLegacyProperties()
public void testExplicitPropertyMappings()
{
Map<String, String> properties = Map.of(
"parquet.optimized-writer.enabled", "false",
"parquet.writer.block-size", "234MB",
"parquet.writer.page-size", "6MB",
"parquet.writer.batch-size", "100",
"parquet.optimized-writer.validation-percentage", "10");
"parquet.writer.validation-percentage", "10");

ParquetWriterConfig expected = new ParquetWriterConfig()
.setParquetOptimizedWriterEnabled(false)
.setBlockSize(DataSize.of(234, MEGABYTE))
.setPageSize(DataSize.of(6, MEGABYTE))
.setBatchSize(100)
@@ -79,18 +79,7 @@ public void testSnappyCompressedParquetTableCreatedInHive()
@Test(groups = HIVE_COMPRESSION)
public void testSnappyCompressedParquetTableCreatedInTrino()
{
testSnappyCompressedParquetTableCreatedInTrino(false);
}

@Test(groups = HIVE_COMPRESSION)
public void testSnappyCompressedParquetTableCreatedInTrinoWithNativeWriter()
{
testSnappyCompressedParquetTableCreatedInTrino(true);
}

private void testSnappyCompressedParquetTableCreatedInTrino(boolean optimizedParquetWriter)
{
String tableName = "table_trino_parquet_snappy" + (optimizedParquetWriter ? "_native_writer" : "");
String tableName = "table_trino_parquet_snappy";
onTrino().executeQuery("DROP TABLE IF EXISTS " + tableName);
onTrino().executeQuery(format(
"CREATE TABLE %s (" +
@@ -101,7 +90,6 @@ private void testSnappyCompressedParquetTableCreatedInTrino(boolean optimizedPar

String catalog = (String) onTrino().executeQuery("SELECT CURRENT_CATALOG").getOnlyValue();
onTrino().executeQuery("SET SESSION " + catalog + ".compression_codec = 'SNAPPY'");
onTrino().executeQuery("SET SESSION " + catalog + ".parquet_optimized_writer_enabled = " + optimizedParquetWriter);
onTrino().executeQuery(format("INSERT INTO %s VALUES(1, 'test data')", tableName));

assertThat(onTrino().executeQuery("SELECT * FROM " + tableName)).containsExactlyInOrder(row(1, "test data"));
@@ -393,13 +393,6 @@ public void testReadTrinoCreatedParquetTable()
testReadTrinoCreatedTable("using_parquet", "PARQUET");
}

@Test(groups = {HIVE_SPARK, PROFILE_SPECIFIC_TESTS})
public void testReadTrinoCreatedParquetTableWithHiveWriter()
{
onTrino().executeQuery("SET SESSION " + TRINO_CATALOG + ".parquet_optimized_writer_enabled = false");
testReadTrinoCreatedTable("using_hive_parquet", "PARQUET");
}

private void testReadTrinoCreatedTable(String tableName, String tableFormat)
{
String sparkTableName = "trino_created_table_" + tableName + "_" + randomNameSuffix();
@@ -243,7 +243,6 @@ public static StorageFormat[] storageFormatsWithConfiguration()
return new StorageFormat[] {
storageFormat("ORC", ImmutableMap.of("hive.orc_optimized_writer_validate", "true")),
storageFormat("PARQUET", ImmutableMap.of("hive.parquet_optimized_writer_validation_percentage", "100")),
storageFormat("PARQUET", ImmutableMap.of("hive.parquet_optimized_writer_enabled", "false")),
storageFormat("RCBINARY", ImmutableMap.of("hive.rcfile_optimized_writer_validate", "true")),
storageFormat("RCTEXT", ImmutableMap.of("hive.rcfile_optimized_writer_validate", "true")),
storageFormat("SEQUENCEFILE"),
@@ -785,19 +784,6 @@ public void testLargeParquetInsert()
"task_writer_count", "1")));
}

@Test(groups = STORAGE_FORMATS_DETAILED)
public void testLargeParquetInsertWithHiveWriter()
{
DataSize reducedRowGroupSize = DataSize.ofBytes(ParquetWriter.DEFAULT_PAGE_SIZE / 4);
runLargeInsert(storageFormat(
"PARQUET",
ImmutableMap.of(
"hive.parquet_optimized_writer_enabled", "false",
"hive.parquet_writer_page_size", reducedRowGroupSize.toBytesValueString(),
"task_scale_writers_enabled", "false",
"task_writer_count", "1")));
}

@Test(groups = STORAGE_FORMATS_DETAILED)
public void testLargeOrcInsert()
{
