From 2cf1ca3a4b2c074530738a457ff2a02dc37f3bbb Mon Sep 17 00:00:00 2001 From: James Petty Date: Wed, 22 Jul 2020 14:33:46 -0400 Subject: [PATCH] Avoid checking isSplittable for files smaller than initial split size For some input formats, the isSplittable check is non-trivial and can add a significant amount of time to split generation when handling a large number of very small files. This change allows files smaller than the max initial split size to avoid that check and considers them unsplittable instead. --- .../hive/BackgroundHiveSplitLoader.java | 3 + .../hive/util/InternalHiveSplitFactory.java | 9 +- .../hive/TestBackgroundHiveSplitLoader.java | 88 +++++++++++++++++-- 3 files changed, 92 insertions(+), 8 deletions(-) diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/BackgroundHiveSplitLoader.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/BackgroundHiveSplitLoader.java index 9fcf376dcc81..f70bd964ca9e 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/BackgroundHiveSplitLoader.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/BackgroundHiveSplitLoader.java @@ -89,6 +89,7 @@ import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_INVALID_PARTITION_VALUE; import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR; import static io.prestosql.plugin.hive.HivePartitionManager.partitionMatches; +import static io.prestosql.plugin.hive.HiveSessionProperties.getMaxInitialSplitSize; import static io.prestosql.plugin.hive.HiveSessionProperties.isForceLocalScheduling; import static io.prestosql.plugin.hive.metastore.MetastoreUtil.getHiveSchema; import static io.prestosql.plugin.hive.metastore.MetastoreUtil.getPartitionLocation; @@ -375,6 +376,7 @@ private ListenableFuture loadPartition(HivePartitionMetadata partition) partitionMatchSupplier, partition.getTableToPartitionMapping(), Optional.empty(), + getMaxInitialSplitSize(session), isForceLocalScheduling(session), s3SelectPushdownEnabled); lastResult = addSplitsToSource(targetSplits, splitFactory); @@ -413,6 +415,7 @@ private ListenableFuture loadPartition(HivePartitionMetadata partition) partitionMatchSupplier, partition.getTableToPartitionMapping(), bucketConversionRequiresWorkerParticipation ? bucketConversion : Optional.empty(), + getMaxInitialSplitSize(session), isForceLocalScheduling(session), s3SelectPushdownEnabled); diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/util/InternalHiveSplitFactory.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/util/InternalHiveSplitFactory.java index 76e517bed10a..4cd29e03c225 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/util/InternalHiveSplitFactory.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/util/InternalHiveSplitFactory.java @@ -14,6 +14,7 @@ package io.prestosql.plugin.hive.util; import com.google.common.collect.ImmutableList; +import io.airlift.units.DataSize; import io.prestosql.plugin.hive.AcidInfo; import io.prestosql.plugin.hive.HiveColumnHandle; import io.prestosql.plugin.hive.HivePartitionKey; @@ -62,6 +63,7 @@ public class InternalHiveSplitFactory private final TableToPartitionMapping tableToPartitionMapping; private final BooleanSupplier partitionMatchSupplier; private final Optional bucketConversion; + private final long minimumTargetSplitSizeInBytes; private final boolean forceLocalScheduling; private final boolean s3SelectPushdownEnabled; @@ -75,6 +77,7 @@ public InternalHiveSplitFactory( BooleanSupplier partitionMatchSupplier, TableToPartitionMapping tableToPartitionMapping, Optional bucketConversion, + DataSize minimumTargetSplitSize, boolean forceLocalScheduling, boolean s3SelectPushdownEnabled) { @@ -89,6 +92,8 @@ public InternalHiveSplitFactory( this.bucketConversion = requireNonNull(bucketConversion, "bucketConversion is null"); this.forceLocalScheduling = forceLocalScheduling; this.s3SelectPushdownEnabled = s3SelectPushdownEnabled; + this.minimumTargetSplitSizeInBytes = requireNonNull(minimumTargetSplitSize, "minimumTargetSplitSize is null").toBytes(); + checkArgument(minimumTargetSplitSizeInBytes > 0, "minimumTargetSplitSize must be > 0, found: %s", minimumTargetSplitSize); } public String getPartitionName() @@ -98,7 +103,9 @@ public String getPartitionName() public Optional createInternalHiveSplit(LocatedFileStatus status, OptionalInt bucketNumber, boolean splittable, Optional acidInfo) { - splittable = splittable && isSplittable(inputFormat, fileSystem, status.getPath()); + splittable = splittable && + status.getLen() > minimumTargetSplitSizeInBytes && + isSplittable(inputFormat, fileSystem, status.getPath()); return createInternalHiveSplit( status.getPath(), status.getBlockLocations(), diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestBackgroundHiveSplitLoader.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestBackgroundHiveSplitLoader.java index 84038f82d645..20a907b80dcf 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestBackgroundHiveSplitLoader.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestBackgroundHiveSplitLoader.java @@ -13,6 +13,7 @@ */ package io.prestosql.plugin.hive; +import com.google.common.base.Throwables; import com.google.common.collect.AbstractIterator; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; @@ -46,6 +47,12 @@ import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.Progressable; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -82,6 +89,7 @@ import static io.prestosql.plugin.hive.HiveColumnHandle.createBaseColumn; import static io.prestosql.plugin.hive.HiveColumnHandle.pathColumnHandle; import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_INVALID_BUCKET_FILES; +import static io.prestosql.plugin.hive.HiveSessionProperties.getMaxInitialSplitSize; import static io.prestosql.plugin.hive.HiveStorageFormat.CSV; import static io.prestosql.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; import static io.prestosql.plugin.hive.HiveTestUtils.SESSION; @@ -105,6 +113,7 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; public class TestBackgroundHiveSplitLoader { @@ -156,14 +165,79 @@ public void testNoPathFilter() public void testCsv() throws Exception { - assertSplitCount(CSV, ImmutableMap.of(), 33); - assertSplitCount(CSV, ImmutableMap.of("skip.header.line.count", "1"), 33); - assertSplitCount(CSV, ImmutableMap.of("skip.header.line.count", "2"), 1); - assertSplitCount(CSV, ImmutableMap.of("skip.footer.line.count", "1"), 1); - assertSplitCount(CSV, ImmutableMap.of("skip.header.line.count", "1", "skip.footer.line.count", "1"), 1); + DataSize fileSize = DataSize.of(2, GIGABYTE); + assertSplitCount(CSV, ImmutableMap.of(), fileSize, 33); + assertSplitCount(CSV, ImmutableMap.of("skip.header.line.count", "1"), fileSize, 33); + assertSplitCount(CSV, ImmutableMap.of("skip.header.line.count", "2"), fileSize, 1); + assertSplitCount(CSV, ImmutableMap.of("skip.footer.line.count", "1"), fileSize, 1); + assertSplitCount(CSV, ImmutableMap.of("skip.header.line.count", "1", "skip.footer.line.count", "1"), fileSize, 1); } - private void assertSplitCount(HiveStorageFormat storageFormat, Map tableProperties, int expectedSplitCount) + @Test + public void testSplittableNotCheckedOnSmallFiles() + throws Exception + { + DataSize initialSplitSize = getMaxInitialSplitSize(SESSION); + + Table table = table( + ImmutableList.of(), + Optional.empty(), + ImmutableMap.of(), + StorageFormat.create(LazySimpleSerDe.class.getName(), TestSplittableFailureInputFormat.class.getName(), TestSplittableFailureInputFormat.class.getName())); + + // Exactly minimum split size, no isSplittable check + BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( + ImmutableList.of(locatedFileStatus(new Path(SAMPLE_PATH), initialSplitSize.toBytes())), + TupleDomain.all(), + Optional.empty(), + table, + Optional.empty()); + + HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader); + backgroundHiveSplitLoader.start(hiveSplitSource); + + assertEquals(drainSplits(hiveSplitSource).size(), 1); + + // Large enough for isSplittable to be called + backgroundHiveSplitLoader = backgroundHiveSplitLoader( + ImmutableList.of(locatedFileStatus(new Path(SAMPLE_PATH), initialSplitSize.toBytes() + 1)), + TupleDomain.all(), + Optional.empty(), + table, + Optional.empty()); + + hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader); + backgroundHiveSplitLoader.start(hiveSplitSource); + + try { + drainSplits(hiveSplitSource); + fail("Expected split generation to call isSplittable and fail"); + } + catch (PrestoException e) { + Throwable cause = Throwables.getRootCause(e); + assertTrue(cause instanceof IllegalStateException); + assertEquals(cause.getMessage(), "isSplittable called"); + } + } + + public static final class TestSplittableFailureInputFormat + extends FileInputFormat + { + @Override + protected boolean isSplitable(FileSystem fs, Path filename) + { + throw new IllegalStateException("isSplittable called"); + } + + @Override + public RecordReader getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) + throws IOException + { + throw new UnsupportedOperationException(); + } + } + + private void assertSplitCount(HiveStorageFormat storageFormat, Map tableProperties, DataSize fileSize, int expectedSplitCount) throws Exception { Table table = table( @@ -173,7 +247,7 @@ private void assertSplitCount(HiveStorageFormat storageFormat, Map