From 456f2a6c87e712b3f8cf368f674ee0fab4cab22a Mon Sep 17 00:00:00 2001 From: Ke Date: Mon, 24 Jun 2024 20:55:42 -0700 Subject: [PATCH] Add sort verification for writer fuzzer (#10235) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/10235 Reviewed By: xiaoxmeng Differential Revision: D58762369 Pulled By: kewang1024 fbshipit-source-id: 2ba3aab92957b698699148f533eb5a5a9e8ba7c0 --- velox/docs/develop/testing/writer-fuzzer.rst | 10 +- velox/exec/fuzzer/PrestoQueryRunner.cpp | 33 ++- velox/exec/fuzzer/WriterFuzzer.cpp | 220 ++++++++++++++++--- velox/exec/fuzzer/WriterFuzzerRunner.h | 4 +- velox/exec/tests/TableWriteTest.cpp | 20 +- velox/exec/tests/utils/PlanBuilder.cpp | 12 +- velox/exec/tests/utils/PlanBuilder.h | 3 +- 7 files changed, 241 insertions(+), 61 deletions(-) diff --git a/velox/docs/develop/testing/writer-fuzzer.rst b/velox/docs/develop/testing/writer-fuzzer.rst index 20dbbba369b7..ab78732a1e54 100644 --- a/velox/docs/develop/testing/writer-fuzzer.rst +++ b/velox/docs/develop/testing/writer-fuzzer.rst @@ -3,17 +3,19 @@ Writer Fuzzer ============= Writer fuzzer tests table write plan with up to 5 regular columns, up to -3 partition keys and up to 3 bucket columns. +3 partition keys, up to 3 bucket columns, up to 3 sorted columns. -At each iteration, fuzzer randomly generate a table write plan with different -table properties including un-partitioned and partitioned, non-bucketed and bucketed. +At each iteration, fuzzer randomly generates a table write plan with different +table properties including un-partitioned and partitioned, non-bucketed and bucketed, +sorted and unsorted. The fuzzer then generates inputs and runs the query plan and compares the results with PrestoDB. As of now, we compare: 1. How many rows were written. 2. Output directories have the same directory layout and hierarchy. -3. Same data were written by velox and prestoDB. +3. Same data were written by velox and prestoDB including bucket number. +4. Data of sorted columns is in the same order if table is sorted. How to run ---------- diff --git a/velox/exec/fuzzer/PrestoQueryRunner.cpp b/velox/exec/fuzzer/PrestoQueryRunner.cpp index 519da8085a6f..bb078fe48819 100644 --- a/velox/exec/fuzzer/PrestoQueryRunner.cpp +++ b/velox/exec/fuzzer/PrestoQueryRunner.cpp @@ -573,8 +573,13 @@ std::optional PrestoQueryRunner::toSql( // Returns a CTAS sql with specified table properties from TableWriteNode, // example sql: - // CREATE TABLE tmp_write WITH (PARTITIONED_BY = ARRAY['p0'], BUCKETED_COUNT = - // 20, BUCKETED_BY = ARRAY['b0', 'b1']) AS SELECT * FROM tmp + // CREATE TABLE tmp_write WITH ( + // PARTITIONED_BY = ARRAY['p0'], + // BUCKETED_COUNT = 2, BUCKETED_BY = ARRAY['b0', 'b1'], + // SORTED_BY = ARRAY['s0 ASC', 's1 DESC'], + // FORMAT = 'ORC' + // ) + // AS SELECT * FROM tmp std::stringstream sql; sql << "CREATE TABLE tmp_write"; std::vector partitionKeys; @@ -583,31 +588,43 @@ std::optional PrestoQueryRunner::toSql( partitionKeys.push_back(insertTableHandle->inputColumns()[i]->name()); } } + sql << " WITH ("; if (insertTableHandle->isPartitioned()) { - sql << " WITH (PARTITIONED_BY = ARRAY["; + sql << " PARTITIONED_BY = ARRAY["; for (int i = 0; i < partitionKeys.size(); ++i) { appendComma(i, sql); sql << "'" << partitionKeys[i] << "'"; } - sql << "]"; + sql << "], "; if (insertTableHandle->bucketProperty() != nullptr) { const auto bucketCount = insertTableHandle->bucketProperty()->bucketCount(); const auto bucketColumns = insertTableHandle->bucketProperty()->bucketedBy(); - sql << ", BUCKET_COUNT = " << bucketCount << ", BUCKETED_BY = ARRAY["; + sql << " BUCKET_COUNT = " << bucketCount << ", BUCKETED_BY = ARRAY["; for (int i = 0; i < bucketColumns.size(); ++i) { appendComma(i, sql); sql << "'" << bucketColumns[i] << "'"; } - sql << "]"; + sql << "], "; + + const auto sortColumns = insertTableHandle->bucketProperty()->sortedBy(); + if (!sortColumns.empty()) { + sql << " SORTED_BY = ARRAY["; + for (int i = 0; i < sortColumns.size(); ++i) { + appendComma(i, sql); + sql << "'" << sortColumns[i]->sortColumn() << " " + << (sortColumns[i]->sortOrder().isAscending() ? "ASC" : "DESC") + << "'"; + } + sql << "], "; + } } - sql << ")"; } - sql << " AS SELECT * FROM tmp"; + sql << "FORMAT = 'ORC') AS SELECT * FROM tmp"; return sql.str(); } diff --git a/velox/exec/fuzzer/WriterFuzzer.cpp b/velox/exec/fuzzer/WriterFuzzer.cpp index eab4bccced8e..46f789857274 100644 --- a/velox/exec/fuzzer/WriterFuzzer.cpp +++ b/velox/exec/fuzzer/WriterFuzzer.cpp @@ -17,18 +17,22 @@ #include +#include #include #include "velox/common/base/Fs.h" #include "velox/common/encode/Base64.h" #include "velox/common/file/FileSystems.h" #include "velox/connectors/hive/HiveConnector.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/connectors/hive/TableHandle.h" +#include "velox/dwio/dwrf/reader/DwrfReader.h" #include "velox/exec/fuzzer/FuzzerUtil.h" #include "velox/exec/fuzzer/PrestoQueryRunner.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" #include "velox/expression/fuzzer/FuzzerToolkit.h" +#include "velox/functions/prestosql/tests/utils/FunctionBaseTest.h" #include "velox/vector/VectorSaver.h" #include "velox/vector/fuzzer/VectorFuzzer.h" @@ -54,6 +58,7 @@ DEFINE_double( "(expressed as double from 0 to 1)."); using namespace facebook::velox::connector::hive; +using namespace facebook::velox::test; namespace facebook::velox::exec::test { @@ -113,9 +118,11 @@ class WriterFuzzer { const std::vector& names, const std::vector& types, int32_t partitionOffset, + const std::vector& partitionKeys, int32_t bucketCount, const std::vector& bucketColumns, - const std::vector& partitionKeys, + int32_t sortColumnOffset, + const std::vector>& sortBy, const std::string& outputDirectoryPath); // Generates table column handles based on table column properties @@ -152,13 +159,27 @@ class WriterFuzzer { RowTypePtr generateOutputType( const std::vector& names, const std::vector& types, - const int32_t partitionCount, const int32_t bucketCount); - // Check the table properties and see if the table is bucketed. - bool isBucketed(const int32_t partitionCount, const int32_t bucketCount) { - return partitionCount > 0 && bucketCount > 0; - } + // Generates a sql that reads sorted columns from a single split of a bucketed + // and sorted table. + // For example, for a table sorted by age, reading a split that belongs to ds + // = 2022-01-01 and bucket 1: + // SELECT age FROM temp_write where ds = '2022-01-01' and "$bucket" = 1 + std::string sortSql( + const std::shared_ptr& split, + const std::vector& names, + const std::vector& types, + int32_t partitionOffset, + const std::vector& partitionKeys, + const std::vector>& sortBy); + + // When concatenating a partition value, if it's non-varchar, no change, eg: + // age = 10 + // If it's varchar, we need to add single quote and also escape the single + // quote in partition value, eg: + // city = '''SF''' + std::string partitionToSql(const TypePtr& type, std::string partitionValue); const std::vector kRegularColumnTypes_{ BOOLEAN(), @@ -170,8 +191,29 @@ class WriterFuzzer { VARBINARY(), TIMESTAMP(), }; + + // Supported sorted column types: + const std::vector kSupportedSortColumnTypes_{ + BOOLEAN(), + TINYINT(), + SMALLINT(), + INTEGER(), + BIGINT(), + VARCHAR(), + TIMESTAMP(), + }; + + // Supported order types: + // https://github.com/prestodb/presto/blob/c542429ba989887de6208daaed4d7b4e34b49b3b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/SortingColumn.java#L101 + // ASCENDING(ASC_NULLS_FIRST, 1), + // DESCENDING(DESC_NULLS_LAST, 0); + const std::vector kSortOrderTypes_{ + core::SortOrder{true, true}, + core::SortOrder{false, false}, + }; + // Supported bucket column types: - // https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/HiveBucketing.java#L142 + // https://github.com/prestodb/presto/blob/10143be627beb2c61aba5b3d36af473d2a8ef65e/presto-hive/src/main/java/com/facebook/presto/hive/HiveBucketing.java#L133 const std::vector kSupportedBucketColumnTypes_{ BOOLEAN(), TINYINT(), @@ -181,9 +223,10 @@ class WriterFuzzer { VARCHAR(), TIMESTAMP(), }; + // Supported partition key column types // According to VectorHasher::typeKindSupportsValueIds and - // https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java#L575 + // https://github.com/prestodb/presto/blob/10143be627beb2c61aba5b3d36af473d2a8ef65e/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java#L593 const std::vector kPartitionKeyTypes_{ BOOLEAN(), TINYINT(), @@ -255,24 +298,43 @@ void WriterFuzzer::go() { std::vector names; std::vector types; - std::vector bucketColumns; + int32_t partitionOffset = 0; std::vector partitionKeys; + int32_t bucketCount = 0; + std::vector bucketColumns; + int32_t sortColumnOffset = 0; + std::vector> sortBy; // Regular table columns generateColumns(5, "c", kRegularColumnTypes_, 2, names, types); - // 50% of times test bucketed write. - int32_t bucketCount = 0; - if (vectorFuzzer_.coinToss(0.5)) { - bucketColumns = generateColumns( - 5, "b", kSupportedBucketColumnTypes_, 1, names, types); - bucketCount = - boost::random::uniform_int_distribution(1, 3)(rng_); - } - // 50% of times test partitioned write. - const auto partitionOffset = names.size(); if (vectorFuzzer_.coinToss(0.5)) { + // 50% of times test bucketed write. + if (vectorFuzzer_.coinToss(0.5)) { + bucketColumns = generateColumns( + 5, "b", kSupportedBucketColumnTypes_, 1, names, types); + bucketCount = + boost::random::uniform_int_distribution(1, 3)(rng_); + + // TODO: sort columns can overlap as bucket columns + // 50% of times test ordered write. + if (vectorFuzzer_.coinToss(0.5)) { + sortColumnOffset = names.size(); + auto sortColumns = generateColumns( + 3, "s", kSupportedSortColumnTypes_, 1, names, types); + sortBy.reserve(sortColumns.size()); + for (const auto& sortByColumn : sortColumns) { + sortBy.push_back(std::make_shared( + sortByColumn, + kSortOrderTypes_.at( + boost::random::uniform_int_distribution( + 0, 1)(rng_)))); + } + } + } + + partitionOffset = names.size(); partitionKeys = generateColumns(3, "p", kPartitionKeyTypes_, 1, names, types); } @@ -284,9 +346,11 @@ void WriterFuzzer::go() { names, types, partitionOffset, + partitionKeys, bucketCount, bucketColumns, - partitionKeys, + sortColumnOffset, + sortBy, tempDirPath->getPath()); LOG(INFO) << "==============================> Done with iteration " @@ -354,16 +418,21 @@ void WriterFuzzer::verifyWriter( const std::vector& names, const std::vector& types, const int32_t partitionOffset, + const std::vector& partitionKeys, const int32_t bucketCount, const std::vector& bucketColumns, - const std::vector& partitionKeys, + const int32_t sortColumnOffset, + const std::vector>& sortBy, const std::string& outputDirectoryPath) { - const auto plan = - PlanBuilder() - .values(input) - .tableWrite( - outputDirectoryPath, partitionKeys, bucketCount, bucketColumns) - .planNode(); + const auto plan = PlanBuilder() + .values(input) + .tableWrite( + outputDirectoryPath, + partitionKeys, + bucketCount, + bucketColumns, + sortBy) + .planNode(); const auto maxDrivers = boost::random::uniform_int_distribution(1, 16)(rng_); @@ -403,23 +472,60 @@ void WriterFuzzer::verifyWriter( auto splits = makeSplits(outputDirectoryPath); auto columnHandles = getTableColumnHandles(names, types, partitionOffset, bucketCount); - const auto rowType = - generateOutputType(names, types, partitionKeys.size(), bucketCount); + const auto rowType = generateOutputType(names, types, bucketCount); auto readPlan = PlanBuilder() .tableScan(rowType, {}, "", rowType, columnHandles) .planNode(); auto actual = execute(readPlan, maxDrivers, splits); std::string bucketSql = ""; - if (isBucketed(partitionKeys.size(), bucketCount)) { + if (bucketCount > 0) { bucketSql = ", \"$bucket\""; } - auto reference_data = referenceQueryRunner_->execute( + auto referenceData = referenceQueryRunner_->execute( "SELECT *" + bucketSql + " FROM tmp_write"); VELOX_CHECK( - assertEqualResults(reference_data, {actual}), + assertEqualResults(referenceData, {actual}), "Velox and reference DB results don't match"); + // 4. Verifies sorting. + if (sortBy.size() > 0) { + const std::vector sortColumnNames = { + names.begin() + sortColumnOffset, + names.begin() + sortColumnOffset + sortBy.size()}; + const std::vector sortColumnTypes = { + types.begin() + sortColumnOffset, + types.begin() + sortColumnOffset + sortBy.size()}; + + // Read from each file and check if data is sorted as presto sorted result. + for (const auto& split : splits) { + auto splitReadPlan = PlanBuilder() + .tableScan(generateOutputType( + sortColumnNames, sortColumnTypes, 0)) + .planNode(); + auto singleSplitData = execute(splitReadPlan, 1, {split}); + + auto const singleSplitReferenceSql = sortSql( + std::dynamic_pointer_cast(split.connectorSplit), + names, + types, + partitionOffset, + partitionKeys, + sortBy); + + const auto referenceResult = + referenceQueryRunner_->execute(singleSplitReferenceSql); + const auto& referenceData = referenceResult.at(0); + for (int i = 1; i < referenceResult.size(); ++i) { + referenceData->append(referenceResult.at(i).get()); + } + fuzzer::compareVectors( + singleSplitData, referenceData, "velox", "prestoDB"); + LOG(INFO) << "Sort Verification succeed for split: " + << split.connectorSplit->toString(); + } + } + LOG(INFO) << "Verified results against reference DB"; } @@ -444,7 +550,7 @@ WriterFuzzer::getTableColumnHandles( names.at(i), columnType, types.at(i), types.at(i))}); } // If table is bucketed, add synthesized $bucket column. - if (isBucketed(names.size() - partitionOffset, bucketCount)) { + if (bucketCount > 0) { columnHandle.insert( {"$bucket", std::make_shared( @@ -579,14 +685,13 @@ std::map WriterFuzzer::getPartitionNameAndFilecount( RowTypePtr WriterFuzzer::generateOutputType( const std::vector& names, const std::vector& types, - const int32_t partitionCount, const int32_t bucketCount) { std::vector outputNames{names}; std::vector outputTypes; for (auto type : types) { outputTypes.emplace_back(type); } - if (isBucketed(partitionCount, bucketCount)) { + if (bucketCount > 0) { outputNames.emplace_back("$bucket"); outputTypes.emplace_back(INTEGER()); } @@ -594,5 +699,50 @@ RowTypePtr WriterFuzzer::generateOutputType( return {ROW(std::move(outputNames), std::move(outputTypes))}; } +std::string WriterFuzzer::sortSql( + const std::shared_ptr& split, + const std::vector& names, + const std::vector& types, + int32_t partitionOffset, + const std::vector& partitionKeys, + const std::vector>& sortBy) { + // For a split, extract the partition filters and bucket filters. + std::stringstream whereSql; + whereSql << "WHERE "; + for (int i = partitionOffset; i < partitionOffset + partitionKeys.size(); + ++i) { + const auto& partitionKey = names.at(i); + auto partitionValue = split->partitionKeys.at(partitionKey); + if (partitionValue.has_value()) { + whereSql << partitionKey << " = " + << partitionToSql(types.at(i), partitionValue.value()); + } else { + whereSql << partitionKey << " IS NULL"; + } + whereSql << " AND "; + } + whereSql << "\"$bucket\" = " << split->tableBucketNumber.value(); + + std::stringstream selectedColumns; + for (int i = 0; i < sortBy.size(); ++i) { + if (i > 0) { + selectedColumns << ", "; + } + selectedColumns << sortBy.at(i)->sortColumn(); + } + return "SELECT " + selectedColumns.str() + " FROM tmp_write " + + whereSql.str(); +} + +std::string WriterFuzzer::partitionToSql( + const TypePtr& type, + std::string partitionValue) { + if (type->isVarchar()) { + RE2::Replace(&partitionValue, "'", "''"); + return "'" + partitionValue + "'"; + } + return partitionValue; +} + } // namespace } // namespace facebook::velox::exec::test diff --git a/velox/exec/fuzzer/WriterFuzzerRunner.h b/velox/exec/fuzzer/WriterFuzzerRunner.h index 61ccaf0e6533..ca6fbbfa7019 100644 --- a/velox/exec/fuzzer/WriterFuzzerRunner.h +++ b/velox/exec/fuzzer/WriterFuzzerRunner.h @@ -37,8 +37,8 @@ namespace facebook::velox::exec::test { /// automatically generate and execute table writer tests. /// It works in following steps: /// -/// 1. Pick different table write properties. Eg: partitioned. -/// (TODO: bucketed, sorted). +/// 1. Pick different table write properties. Eg: partitioned, bucketed, +/// sorted. /// 2. Generate corresponding table write query plan. /// 3. Generate a random set of input data (vector). /// 4. Execute the query plan. diff --git a/velox/exec/tests/TableWriteTest.cpp b/velox/exec/tests/TableWriteTest.cpp index 24cb3ebb5bdf..a3b97f979d34 100644 --- a/velox/exec/tests/TableWriteTest.cpp +++ b/velox/exec/tests/TableWriteTest.cpp @@ -3596,7 +3596,15 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, reclaimFromSortTableWriter) { auto writerPlan = PlanBuilder() .values(vectors) - .tableWrite(outputDirectory->getPath(), {"c0"}, 4, {"c1"}, {"c2"}) + .tableWrite( + outputDirectory->getPath(), + {"c0"}, + 4, + {"c1"}, + { + std::make_shared( + "c2", core::SortOrder{false, false}), + }) .project({TableWriteTraits::rowCountColumnName()}) .singleAggregation( {}, @@ -3965,7 +3973,15 @@ DEBUG_ONLY_TEST_F( auto writerPlan = PlanBuilder() .values(vectors) - .tableWrite(outputDirectory->getPath(), {"c0"}, 4, {"c1"}, {"c2"}) + .tableWrite( + outputDirectory->getPath(), + {"c0"}, + 4, + {"c1"}, + { + std::make_shared( + "c2", core::SortOrder{false, false}), + }) .project({TableWriteTraits::rowCountColumnName()}) .singleAggregation( {}, diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index 7df17add510b..a1db6fb23544 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -57,24 +57,18 @@ std::shared_ptr buildHiveBucketProperty( const RowTypePtr rowType, int32_t bucketCount, const std::vector& bucketColumns, - const std::vector& sortByColumns) { + const std::vector>& sortBy) { std::vector bucketTypes; bucketTypes.reserve(bucketColumns.size()); for (const auto& bucketColumn : bucketColumns) { bucketTypes.push_back(rowType->childAt(rowType->getChildIdx(bucketColumn))); } - std::vector> sortedBy; - sortedBy.reserve(sortByColumns.size()); - for (const auto& sortByColumn : sortByColumns) { - sortedBy.push_back(std::make_shared( - sortByColumn, core::SortOrder{false, false})); - } return std::make_shared( HiveBucketProperty::Kind::kHiveCompatible, bucketCount, bucketColumns, bucketTypes, - sortedBy); + sortBy); } } // namespace @@ -385,7 +379,7 @@ PlanBuilder& PlanBuilder::tableWrite( const std::vector& partitionBy, int32_t bucketCount, const std::vector& bucketedBy, - const std::vector& sortBy, + const std::vector>& sortBy, const dwio::common::FileFormat fileFormat, const std::vector& aggregates, const std::string& connectorId, diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index 682c2d78be20..471940f6766e 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -440,7 +440,8 @@ class PlanBuilder { const std::vector& partitionBy, int32_t bucketCount, const std::vector& bucketedBy, - const std::vector& sortBy, + const std::vector< + std::shared_ptr>& sortBy, const dwio::common::FileFormat fileFormat = dwio::common::FileFormat::DWRF, const std::vector& aggregates = {},