Table scan execution path code cleanup (facebookincubator#10893)
Summary:
No functional change; avoids copying RowSet as it is passed around the selective reader call stack.

Pull Request resolved: facebookincubator#10893

Reviewed By: Yuhta

Differential Revision: D61998690

Pulled By: xiaoxmeng

fbshipit-source-id: 08850710e1b0ce318416a6e87cdeeb8383cb03d3
xiaoxmeng authored and facebook-github-bot committed Sep 7, 2024
1 parent 030e439 commit 24dd2e9
Showing 113 changed files with 1,539 additions and 1,362 deletions.
12 changes: 6 additions & 6 deletions velox/common/base/BitSet.h
@@ -22,12 +22,12 @@
#include "velox/common/base/BitUtil.h"

namespace facebook::velox {
// Dynamic size dense bit set that Keeps track of maximum set bit.
/// Dynamic size dense bit set that keeps track of maximum set bit.
class BitSet {
public:
// Constructs a bitSet. 'min' is the lowest possible member of the
// set. Values below this are not present and inserting these is a
// no-op. 'min' is used when using this as an IN predicate filter.
/// Constructs a bitSet. 'min' is the lowest possible member of the set.
/// Values below this are not present and inserting these is a no-op. 'min' is
/// used when using this as an IN predicate filter.
explicit BitSet(int64_t min) : min_(min) {}

void insert(int64_t index) {
@@ -56,7 +56,7 @@ class BitSet {
return bits::isBitSet(bits_.data(), bit);
}

// Returns the largest element of the set or 'min_ - 1' if empty.
/// Returns the largest element of the set or 'min_ - 1' if empty.
int64_t max() const {
return lastSetBit_ + min_;
}
@@ -66,8 +66,8 @@
}

private:
std::vector<uint64_t> bits_;
const int64_t min_;
std::vector<uint64_t> bits_;
int64_t lastSetBit_ = -1;
};

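
Note: the doc-comment fixes above also describe the class's intended use as an IN-predicate filter. A minimal usage sketch, not part of this diff; the contains() name is inferred from the isBitSet call shown in the hunk:

    #include <cstdint>

    #include "velox/common/base/BitSet.h"

    using facebook::velox::BitSet;

    int main() {
      BitSet values(100); // 'min' = 100: values below it are never stored.
      values.insert(105);
      values.insert(99); // Below 'min', so a no-op by design.
      const bool hit = values.contains(105); // true (method name inferred).
      const int64_t top = values.max(); // 105; would be 99 (min - 1) if empty.
      return (hit && top == 105) ? 0 : 1;
    }
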
5 changes: 5 additions & 0 deletions velox/common/base/BitUtil.h
@@ -95,6 +95,11 @@ constexpr inline T roundUp(T value, U factor) {
return (value + (factor - 1)) / factor * factor;
}

template <typename T, typename U>
constexpr inline T divRoundUp(T value, U factor) {
return (value + (factor - 1)) / factor;
}

constexpr inline uint64_t lowMask(int32_t bits) {
return (1UL << bits) - 1;
}
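
Note: the new bits::divRoundUp is ceiling division on non-negative integers. A standalone sketch; the bit-word sizing use case is an assumption, not taken from this diff:

    #include <cstdint>

    // Standalone copy of the helper for illustration: ceil(value / factor)
    // without floating point, valid for non-negative inputs.
    template <typename T, typename U>
    constexpr T divRoundUp(T value, U factor) {
      return (value + (factor - 1)) / factor;
    }

    static_assert(divRoundUp(10, 3) == 4, "rounds up");
    static_assert(divRoundUp(10, 5) == 2, "exact division unchanged");
    // Typical use: number of 64-bit words needed to hold 129 bits.
    static_assert(divRoundUp(129, 64) == 3, "129 bits need 3 words");

    int main() {}
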
6 changes: 3 additions & 3 deletions velox/common/base/SelectivityInfo.h
@@ -58,8 +58,8 @@ class SelectivityInfo {
class SelectivityTimer {
public:
SelectivityTimer(SelectivityInfo& info, uint64_t numIn)
: startClocks_(folly::hardware_timestamp()),
totalClocks_(&info.timeClocks_) {
: totalClocks_(&info.timeClocks_),
startClocks_(folly::hardware_timestamp()) {
info.numIn_ += numIn;
}

@@ -72,8 +72,8 @@
}

private:
uint64_t startClocks_;
uint64_t* const totalClocks_;
uint64_t startClocks_;
};

} // namespace velox
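
Note: this reorder, like the BitSet and HiveDataSource constructor reorders elsewhere in this commit, keeps the member-initializer list in the same order as the member declarations. C++ initializes non-static members in declaration order regardless of the list order, so a mismatched list is misleading and draws -Wreorder. A minimal illustration of the pitfall, not Velox code:

    #include <cstddef>

    // The initializer list *looks* like size_ is set before data_, but members
    // initialize in declaration order: data_ first, reading a garbage size_.
    struct BadBuffer {
      explicit BadBuffer(size_t n) : size_(n), data_(new char[size_]) {}
      char* data_;  // Initialized first -- uses uninitialized size_!
      size_t size_; // Initialized second.
    };

    // Fixed: declaration order matches both the dependency and the list.
    struct GoodBuffer {
      explicit GoodBuffer(size_t n) : size_(n), data_(new char[size_]) {}
      size_t size_;
      char* data_;
    };
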
61 changes: 61 additions & 0 deletions velox/common/base/tests/BitUtilTest.cpp
@@ -870,6 +870,67 @@ TEST_F(BitUtilTest, storeBitsToByte) {
ASSERT_EQ(bytes[2], 0);
}

TEST_F(BitUtilTest, roundUp) {
struct {
uint64_t value;
uint64_t factor;
uint64_t expected;

std::string debugString() const {
return fmt::format(
"value: {}, factor: {}, expected: {}", value, factor, expected);
}
} testSettings[] = {
{10, 1, 10},
{10, 3, 12},
{10, 4, 12},
{10, 10, 10},
{10, 11, 11},
{10, 20, 20},
{11, 1, 11},
{11, 3, 12},
{11, 4, 12},
{11, 11, 11},
{11, 12, 12},
{11, 23, 23}};

for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
ASSERT_EQ(
bits::roundUp(testData.value, testData.factor), testData.expected);
}
}

TEST_F(BitUtilTest, divRoundUp) {
struct {
uint64_t value;
uint64_t factor;
uint64_t expected;

std::string debugString() const {
return fmt::format(
"value: {}, factor: {}, expected: {}", value, factor, expected);
}
} testSettings[] = {
{10, 1, 10},
{10, 3, 4},
{10, 4, 3},
{10, 10, 1},
{10, 11, 1},
{10, 20, 1},
{11, 1, 11},
{11, 3, 4},
{11, 4, 3},
{11, 11, 1},
{11, 12, 1},
{11, 23, 1}};

for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
ASSERT_EQ(
bits::divRoundUp(testData.value, testData.factor), testData.expected);
}
}
} // namespace bits
} // namespace velox
} // namespace facebook
18 changes: 9 additions & 9 deletions velox/connectors/hive/HiveConnectorUtil.cpp
@@ -583,14 +583,14 @@ void configureReaderOptions(
}

void configureRowReaderOptions(
dwio::common::RowReaderOptions& rowReaderOptions,
const std::unordered_map<std::string, std::string>& tableParameters,
const std::shared_ptr<common::ScanSpec>& scanSpec,
std::shared_ptr<common::MetadataFilter> metadataFilter,
const RowTypePtr& rowType,
const std::shared_ptr<const HiveConnectorSplit>& hiveSplit,
const std::shared_ptr<const HiveConfig>& hiveConfig,
const config::ConfigBase* sessionProperties) {
const config::ConfigBase* sessionProperties,
dwio::common::RowReaderOptions& rowReaderOptions) {
auto skipRowsIt =
tableParameters.find(dwio::common::TableParameter::kSkipHeaderLineCount);
if (skipRowsIt != tableParameters.end()) {
@@ -649,22 +649,22 @@ bool testFilters(
const dwio::common::Reader* reader,
const std::string& filePath,
const std::unordered_map<std::string, std::optional<std::string>>&
partitionKey,
partitionKeys,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
partitionKeysHandle) {
auto totalRows = reader->numberOfRows();
const auto totalRows = reader->numberOfRows();
const auto& fileTypeWithId = reader->typeWithId();
const auto& rowType = reader->rowType();
for (const auto& child : scanSpec->children()) {
if (child->filter()) {
const auto& name = child->fieldName();
auto iter = partitionKey.find(name);
auto iter = partitionKeys.find(name);
// By design, the partition key columns for Iceberg tables are included in
// the data files to facilitate partition transform and partition
// evolution, so we need to test both cases.
if (!rowType->containsChild(name) || iter != partitionKey.end()) {
if (iter != partitionKey.end() && iter->second.has_value()) {
auto handlesIter = partitionKeysHandle.find(name);
if (!rowType->containsChild(name) || iter != partitionKeys.end()) {
if (iter != partitionKeys.end() && iter->second.has_value()) {
const auto handlesIter = partitionKeysHandle.find(name);
VELOX_CHECK(handlesIter != partitionKeysHandle.end());

// This is a non-null partition key
@@ -684,7 +684,7 @@ bool testFilters(
}
} else {
const auto& typeWithId = fileTypeWithId->childByName(name);
auto columnStats = reader->columnStatistics(typeWithId->id());
const auto columnStats = reader->columnStatistics(typeWithId->id());
if (columnStats != nullptr &&
!testFilter(
child->filter(),
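
Note: for context, testFilters() decides whether a whole split can be skipped: each filtered column is checked against the split's constant partition-key value when it is a partition key, and against the file's column statistics otherwise. A simplified, self-contained sketch of that decision, with toy RangeFilter/ColumnStats types; the real code uses ScanSpec, Filter, and dwio column statistics:

    #include <cstdint>
    #include <string>
    #include <unordered_map>

    struct RangeFilter {
      int64_t min;
      int64_t max;
      // A concrete partition-key value either passes or fails outright.
      bool testValue(int64_t v) const { return v >= min && v <= max; }
      // File stats only prove a filter *may* pass: ranges must overlap.
      bool testRange(int64_t lo, int64_t hi) const {
        return hi >= min && lo <= max;
      }
    };

    struct ColumnStats {
      int64_t min;
      int64_t max;
    };

    bool testFilters(
        const std::unordered_map<std::string, RangeFilter>& filters,
        const std::unordered_map<std::string, int64_t>& partitionKeys,
        const std::unordered_map<std::string, ColumnStats>& fileStats) {
      for (const auto& [name, filter] : filters) {
        if (const auto it = partitionKeys.find(name);
            it != partitionKeys.end()) {
          if (!filter.testValue(it->second)) {
            return false; // No row in this split can pass: skip the split.
          }
        } else if (const auto it = fileStats.find(name);
                   it != fileStats.end()) {
          if (!filter.testRange(it->second.min, it->second.max)) {
            return false;
          }
        }
      }
      return true; // Every filter may pass; the split must be read.
    }
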
6 changes: 3 additions & 3 deletions velox/connectors/hive/HiveConnectorUtil.h
@@ -76,14 +76,14 @@ void configureReaderOptions(
const std::unordered_map<std::string, std::string>& tableParameters = {});

void configureRowReaderOptions(
dwio::common::RowReaderOptions& rowReaderOptions,
const std::unordered_map<std::string, std::string>& tableParameters,
const std::shared_ptr<common::ScanSpec>& scanSpec,
std::shared_ptr<common::MetadataFilter> metadataFilter,
const RowTypePtr& rowType,
const std::shared_ptr<const HiveConnectorSplit>& hiveSplit,
const std::shared_ptr<const HiveConfig>& hiveConfig = nullptr,
const config::ConfigBase* sessionProperties = nullptr);
const std::shared_ptr<const HiveConfig>& hiveConfig,
const config::ConfigBase* sessionProperties,
dwio::common::RowReaderOptions& rowReaderOptions);

bool testFilters(
const common::ScanSpec* scanSpec,
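
Note: the reordered signature also drops the defaulted hiveConfig and sessionProperties arguments, so callers now pass every input explicitly, with the mutated RowReaderOptions last. A hypothetical call-site fragment, variable names assumed:

    dwio::common::RowReaderOptions rowReaderOptions;
    configureRowReaderOptions(
        tableParameters, // Inputs first...
        scanSpec,
        metadataFilter,
        rowType,
        hiveSplit,
        hiveConfig,
        sessionProperties,
        rowReaderOptions); // ...out-parameter last.
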
136 changes: 70 additions & 66 deletions velox/connectors/hive/HiveDataSource.cpp
@@ -66,11 +66,11 @@ HiveDataSource::HiveDataSource(
folly::Executor* executor,
const ConnectorQueryCtx* connectorQueryCtx,
const std::shared_ptr<HiveConfig>& hiveConfig)
: pool_(connectorQueryCtx->memoryPool()),
fileHandleFactory_(fileHandleFactory),
: fileHandleFactory_(fileHandleFactory),
executor_(executor),
connectorQueryCtx_(connectorQueryCtx),
hiveConfig_(hiveConfig),
pool_(connectorQueryCtx->memoryPool()),
outputType_(outputType),
expressionEvaluator_(connectorQueryCtx->expressionEvaluator()) {
// Column handles keyed on the column alias, the name used in the query.
@@ -300,6 +300,7 @@ vector_size_t HiveDataSource::applyBucketConversion(
for (vector_size_t i = 0; i < rowVector->size(); ++i) {
VELOX_CHECK_EQ((partitions_[i] - bucketToKeep) % partitionBucketCount, 0);
}

if (remainingFilterExprSet_) {
for (vector_size_t i = 0; i < rowVector->size(); ++i) {
if (partitions_[i] != bucketToKeep) {
@@ -345,76 +346,76 @@ std::optional<RowVectorPtr> HiveDataSource::next(
output_ = BaseVector::create(readerOutputType_, 0, pool_);
}

auto rowsScanned = splitReader_->next(size, output_);
const auto rowsScanned = splitReader_->next(size, output_);
completedRows_ += rowsScanned;
if (rowsScanned == 0) {
splitReader_->updateRuntimeStats(runtimeStats_);
resetSplit();
return nullptr;
}

if (rowsScanned) {
VELOX_CHECK(
!output_->mayHaveNulls(), "Top-level row vector cannot have nulls");
auto rowsRemaining = output_->size();
VELOX_CHECK(
!output_->mayHaveNulls(), "Top-level row vector cannot have nulls");
auto rowsRemaining = output_->size();
if (rowsRemaining == 0) {
// no rows passed the pushed down filters.
return getEmptyOutput();
}

auto rowVector = std::dynamic_pointer_cast<RowVector>(output_);

// In case there is a remaining filter that excludes some but not all
// rows, collect the indices of the passing rows. If there is no filter,
// or it passes on all rows, leave this as null and let exec::wrap skip
// wrapping the results.
BufferPtr remainingIndices;
if (remainingFilterExprSet_) {
if (numBucketConversion_ > 0) {
filterRows_.resizeFill(rowVector->size());
} else {
filterRows_.resize(rowVector->size());
}
}
if (partitionFunction_) {
rowsRemaining = applyBucketConversion(rowVector, remainingIndices);
if (rowsRemaining == 0) {
// no rows passed the pushed down filters.
return getEmptyOutput();
}
}

auto rowVector = std::dynamic_pointer_cast<RowVector>(output_);

// In case there is a remaining filter that excludes some but not all
// rows, collect the indices of the passing rows. If there is no filter,
// or it passes on all rows, leave this as null and let exec::wrap skip
// wrapping the results.
BufferPtr remainingIndices;
if (remainingFilterExprSet_) {
if (numBucketConversion_ > 0) {
filterRows_.resizeFill(rowVector->size());
} else {
filterRows_.resize(rowVector->size());
}
}
if (partitionFunction_) {
rowsRemaining = applyBucketConversion(rowVector, remainingIndices);
if (rowsRemaining == 0) {
return getEmptyOutput();
}
if (remainingFilterExprSet_) {
rowsRemaining = evaluateRemainingFilter(rowVector);
VELOX_CHECK_LE(rowsRemaining, rowsScanned);
if (rowsRemaining == 0) {
// No rows passed the remaining filter.
return getEmptyOutput();
}
if (remainingFilterExprSet_) {
rowsRemaining = evaluateRemainingFilter(rowVector);
VELOX_CHECK_LE(rowsRemaining, rowsScanned);
if (rowsRemaining == 0) {
// No rows passed the remaining filter.
return getEmptyOutput();
}

if (rowsRemaining < rowVector->size()) {
// Some, but not all rows passed the remaining filter.
remainingIndices = filterEvalCtx_.selectedIndices;
}
if (rowsRemaining < rowVector->size()) {
// Some, but not all rows passed the remaining filter.
remainingIndices = filterEvalCtx_.selectedIndices;
}
}

if (outputType_->size() == 0) {
return exec::wrap(rowsRemaining, remainingIndices, rowVector);
}
if (outputType_->size() == 0) {
return exec::wrap(rowsRemaining, remainingIndices, rowVector);
}

std::vector<VectorPtr> outputColumns;
outputColumns.reserve(outputType_->size());
for (int i = 0; i < outputType_->size(); i++) {
auto& child = rowVector->childAt(i);
if (remainingIndices) {
// Disable dictionary values caching in expression eval so that we
// don't need to reallocate the result for every batch.
child->disableMemo();
}
outputColumns.emplace_back(
exec::wrapChild(rowsRemaining, remainingIndices, child));
std::vector<VectorPtr> outputColumns;
outputColumns.reserve(outputType_->size());
for (int i = 0; i < outputType_->size(); ++i) {
auto& child = rowVector->childAt(i);
if (remainingIndices) {
// Disable dictionary values caching in expression eval so that we
// don't need to reallocate the result for every batch.
child->disableMemo();
}

return std::make_shared<RowVector>(
pool_, outputType_, BufferPtr(nullptr), rowsRemaining, outputColumns);
outputColumns.emplace_back(
exec::wrapChild(rowsRemaining, remainingIndices, child));
}

splitReader_->updateRuntimeStats(runtimeStats_);
resetSplit();
return nullptr;
return std::make_shared<RowVector>(
pool_, outputType_, BufferPtr(nullptr), rowsRemaining, outputColumns);
}
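
Note: the restructuring of next() above is behavior-preserving: the rowsScanned == 0 (end-of-split) case now returns early, so the filter-and-wrap path no longer nests inside an if (rowsScanned) block. A self-contained toy of the same shape, with stand-in types, not Velox code:

    #include <algorithm>
    #include <iostream>
    #include <optional>

    // Early-return shape: exhaustion handled first, main path unnested.
    std::optional<int> nextBatch(int& rowsLeft, int batchSize) {
      const int rowsScanned = std::min(rowsLeft, batchSize);
      rowsLeft -= rowsScanned;
      if (rowsScanned == 0) {
        return std::nullopt; // End of split: caller resets and moves on.
      }
      return rowsScanned; // Filtering/wrapping would continue here, unnested.
    }

    int main() {
      int rowsLeft = 5;
      while (const auto batch = nextBatch(rowsLeft, 2)) {
        std::cout << *batch << '\n'; // Prints 2, 2, 1, then stops.
      }
    }
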

void HiveDataSource::addDynamicFilter(
@@ -505,15 +506,18 @@ vector_size_t HiveDataSource::evaluateRemainingFilter(RowVectorPtr& rowVector) {
filterLazyDecoded_,
filterLazyBaseRows_);
}
auto filterStartMicros = getCurrentTimeMicro();
expressionEvaluator_->evaluate(
remainingFilterExprSet_.get(), filterRows_, *rowVector, filterResult_);
auto res = exec::processFilterResults(
filterResult_, filterRows_, filterEvalCtx_, pool_);
uint64_t filterTimeUs{0};
vector_size_t rowsRemaining{0};
{
MicrosecondTimer timer(&filterTimeUs);
expressionEvaluator_->evaluate(
remainingFilterExprSet_.get(), filterRows_, *rowVector, filterResult_);
rowsRemaining = exec::processFilterResults(
filterResult_, filterRows_, filterEvalCtx_, pool_);
}
totalRemainingFilterTime_.fetch_add(
(getCurrentTimeMicro() - filterStartMicros) * 1000,
std::memory_order_relaxed);
return res;
filterTimeUs * 1000, std::memory_order_relaxed);
return rowsRemaining;
}

void HiveDataSource::resetSplit() {
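
Note: evaluateRemainingFilter() now measures the filter with Velox's scoped MicrosecondTimer instead of paired getCurrentTimeMicro() calls, so exactly the timed block is measured and no subtraction can drift. A minimal stand-in showing the RAII pattern, not the real class:

    #include <chrono>
    #include <cstdint>
    #include <iostream>

    // Minimal stand-in for velox::MicrosecondTimer: adds the elapsed time to
    // '*out' when the timer goes out of scope.
    class ScopedMicrosecondTimer {
     public:
      explicit ScopedMicrosecondTimer(uint64_t* out)
          : out_(out), start_(std::chrono::steady_clock::now()) {}
      ~ScopedMicrosecondTimer() {
        *out_ += std::chrono::duration_cast<std::chrono::microseconds>(
                     std::chrono::steady_clock::now() - start_)
                     .count();
      }

     private:
      uint64_t* const out_;
      const std::chrono::steady_clock::time_point start_;
    };

    int main() {
      uint64_t filterTimeUs = 0;
      {
        ScopedMicrosecondTimer timer(&filterTimeUs);
        // ... the filter evaluation being measured runs here ...
      }
      std::cout << "filter time: " << filterTimeUs << "us\n";
    }
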