Skip to content

Commit

Permalink
fix(merge-join): Produce output before advancing key comparison (face…
Browse files Browse the repository at this point in the history
…bookincubator#11605)

Summary:

Before we start reading keys from the next batch of input, we need to
make sure we are not holding output_ wrapped around lazy vector from the last
batch, since lazy vectors need to be materialized in order.

Differential Revision: D66169184
  • Loading branch information
pedroerp authored and facebook-github-bot committed Nov 21, 2024
1 parent ebfb1e5 commit 82021a2
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 63 deletions.
32 changes: 13 additions & 19 deletions velox/exec/MergeJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ RowVectorPtr MergeJoin::getOutput() {
}
}
rightIndex_ = firstNonNullIndex;
if (rightIndex_ == rightInput_->size()) {
if (finishedRightBatch()) {
// Ran out of rows on the right side.
rightInput_ = nullptr;
}
Expand Down Expand Up @@ -884,12 +884,11 @@ RowVectorPtr MergeJoin::doGetOutput() {
return std::move(output_);
}
addOutputRowForLeftJoin(input_, index_);

++index_;
if (index_ == input_->size()) {
// Ran out of rows on the left side.

if (finishedLeftBatch()) {
input_ = nullptr;
return nullptr;
return produceOutput();
}
}
}
Expand All @@ -911,11 +910,10 @@ RowVectorPtr MergeJoin::doGetOutput() {
if (outputSize_ == outputBatchSize_) {
return std::move(output_);
}

addOutputRowForRightJoin(rightInput_, rightIndex_);

++rightIndex_;
if (rightIndex_ == rightInput_->size()) {
if (finishedRightBatch()) {
// Ran out of rows on the right side.
rightInput_ = nullptr;
return nullptr;
Expand All @@ -940,12 +938,11 @@ RowVectorPtr MergeJoin::doGetOutput() {
return std::move(output_);
}
addOutputRowForLeftJoin(input_, index_);

++index_;
if (index_ == input_->size()) {
// Ran out of rows on the left side.

if (finishedLeftBatch()) {
input_ = nullptr;
return nullptr;
return produceOutput();
}
}
}
Expand Down Expand Up @@ -1021,10 +1018,9 @@ RowVectorPtr MergeJoin::doGetOutput() {
index_ = firstNonNull(input_, leftKeys_, index_ + 1);
}

if (index_ == input_->size()) {
// Ran out of rows on the left side.
if (finishedLeftBatch()) {
input_ = nullptr;
return nullptr;
return produceOutput();
}
compareResult = compare();
}
Expand All @@ -1042,17 +1038,15 @@ RowVectorPtr MergeJoin::doGetOutput() {
if (outputSize_ == outputBatchSize_) {
return std::move(output_);
}

addOutputRowForRightJoin(rightInput_, rightIndex_);
++rightIndex_;
} else {
rightIndex_ = firstNonNull(rightInput_, rightKeys_, rightIndex_ + 1);
}

if (rightIndex_ == rightInput_->size()) {
// Ran out of rows on the right side.
if (finishedRightBatch()) {
rightInput_ = nullptr;
return nullptr;
return produceOutput();
}
compareResult = compare();
}
Expand Down Expand Up @@ -1105,7 +1099,7 @@ RowVectorPtr MergeJoin::doGetOutput() {
rightIndex_ = firstNonNull(rightInput_, rightKeys_, endRightIndex);
}

if (rightIndex_ == rightInput_->size()) {
if (finishedRightBatch()) {
// Ran out of rows on the right side.
rightInput_ = nullptr;
}
Expand Down
20 changes: 20 additions & 0 deletions velox/exec/MergeJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,26 @@ class MergeJoin : public Operator {
const RowVectorPtr& right,
vector_size_t rightIndex);

/// If all rows from the current left batch have been processed.
bool finishedLeftBatch() const {
return index_ == input_->size();
}

/// If all rows from the current right batch have been processed.
bool finishedRightBatch() const {
return rightIndex_ == rightInput_->size();
}

/// Properly resizes and produces the current output vector if one is
/// available.
RowVectorPtr produceOutput() {
if (output_) {
output_->resize(outputSize_);
return std::move(output_);
}
return nullptr;
}

/// Evaluates join filter on 'filterInput_' and returns 'output' that contains
/// a subset of rows on which the filter passed. Returns nullptr if no rows
/// passed the filter.
Expand Down
159 changes: 115 additions & 44 deletions velox/exec/tests/MergeJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,87 @@ class MergeJoinTest : public HiveConnectorTestBase {
return params;
}

std::vector<RowVectorPtr> generateInput(const std::vector<VectorPtr>& keys) {
std::vector<RowVectorPtr> data;
data.reserve(keys.size());
vector_size_t startRow = 0;

for (const auto& key : keys) {
auto payload = makeFlatVector<int32_t>(
key->size(), [startRow](auto row) { return (startRow + row) * 10; });
data.push_back(makeRowVector({key, payload}));
startRow += key->size();
}
return data;
}

// Lazy vector loader class to ensure they get loaded in the correct order,
// and only once.
class MySimpleVectorLoader : public VectorLoader {
public:
explicit MySimpleVectorLoader(
size_t batchId,
const std::shared_ptr<size_t>& count,
std::function<VectorPtr(RowSet)> loader)
: batchId_(batchId), count_(count), loader_(loader) {}

void loadInternal(
RowSet rows,
ValueHook* hook,
vector_size_t resultSize,
VectorPtr* result) override {
if (batchId_ > *count_) {
*count_ = batchId_;
}
VELOX_CHECK_GE(batchId_, *count_, "Lazy vectors loaded out of order.");
VELOX_CHECK(!loaded_, "Trying to load a lazy vector twice.");
*result = loader_(rows);
loaded_ = true;
}

private:
const size_t batchId_;
const std::shared_ptr<size_t> count_;
bool loaded_{false};
std::function<VectorPtr(RowSet)> loader_;
};

// Generates lazy vectors to ensure the merge join operator is loading them as
// expected.
std::vector<RowVectorPtr> generateLazyInput(
const std::vector<VectorPtr>& keys) {
std::vector<RowVectorPtr> data;
data.reserve(keys.size());
vector_size_t startRow = 0;

size_t batchId = 0;
auto counter = std::make_shared<size_t>(0);

for (const auto& key : keys) {
auto payload = std::make_shared<LazyVector>(
pool(),
CppToType<int32_t>::create(),
key->size(),
std::make_unique<MySimpleVectorLoader>(batchId, counter, [=](RowSet) {
return makeFlatVector<int32_t>(key->size(), [startRow](auto row) {
return (startRow + row) * 10;
});
}));

auto lazyKeys = std::make_shared<LazyVector>(
pool(),
CppToType<int32_t>::create(),
key->size(),
std::make_unique<MySimpleVectorLoader>(
batchId, counter, [=](RowSet) { return key; }));

data.push_back(makeRowVector({lazyKeys, payload}));
startRow += key->size();
++batchId;
}
return data;
}

template <typename T>
void testJoin(
std::function<T(vector_size_t /*row*/)> leftKeyAt,
Expand Down Expand Up @@ -99,38 +180,18 @@ class MergeJoinTest : public HiveConnectorTestBase {
void testJoin(
const std::vector<VectorPtr>& leftKeys,
const std::vector<VectorPtr>& rightKeys) {
std::vector<RowVectorPtr> left;
left.reserve(leftKeys.size());
vector_size_t startRow = 0;
for (const auto& key : leftKeys) {
auto payload = makeFlatVector<int32_t>(
key->size(), [startRow](auto row) { return (startRow + row) * 10; });
left.push_back(makeRowVector({key, payload}));
startRow += key->size();
}

std::vector<RowVectorPtr> right;
right.reserve(rightKeys.size());
startRow = 0;
for (const auto& key : rightKeys) {
auto payload = makeFlatVector<int32_t>(
key->size(), [startRow](auto row) { return (startRow + row) * 20; });
right.push_back(makeRowVector({key, payload}));
startRow += key->size();
}

createDuckDbTable("t", left);
createDuckDbTable("u", right);
createDuckDbTable("t", generateInput(leftKeys));
createDuckDbTable("u", generateInput(rightKeys));

// Test INNER join.
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
auto plan = PlanBuilder(planNodeIdGenerator)
.values(left)
.values(generateLazyInput(leftKeys))
.mergeJoin(
{"c0"},
{"u_c0"},
PlanBuilder(planNodeIdGenerator)
.values(right)
.values(generateLazyInput(rightKeys))
.project({"c1 AS u_c1", "c0 AS u_c0"})
.planNode(),
"",
Expand All @@ -156,12 +217,12 @@ class MergeJoinTest : public HiveConnectorTestBase {
// Test LEFT join.
planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
auto leftPlan = PlanBuilder(planNodeIdGenerator)
.values(left)
.values(generateLazyInput(leftKeys))
.mergeJoin(
{"c0"},
{"u_c0"},
PlanBuilder(planNodeIdGenerator)
.values(right)
.values(generateLazyInput(rightKeys))
.project({"c1 as u_c1", "c0 as u_c0"})
.planNode(),
"",
Expand All @@ -187,12 +248,12 @@ class MergeJoinTest : public HiveConnectorTestBase {
// Test RIGHT join.
planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
auto rightPlan = PlanBuilder(planNodeIdGenerator)
.values(right)
.values(generateLazyInput(rightKeys))
.mergeJoin(
{"c0"},
{"u_c0"},
PlanBuilder(planNodeIdGenerator)
.values(left)
.values(generateLazyInput(leftKeys))
.project({"c1 as u_c1", "c0 as u_c0"})
.planNode(),
"",
Expand Down Expand Up @@ -222,12 +283,12 @@ class MergeJoinTest : public HiveConnectorTestBase {
// Test FULL join.
planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
auto fullPlan = PlanBuilder(planNodeIdGenerator)
.values(right)
.values(generateLazyInput(rightKeys))
.mergeJoin(
{"c0"},
{"u_c0"},
PlanBuilder(planNodeIdGenerator)
.values(left)
.values(generateLazyInput(leftKeys))
.project({"c1 as u_c1", "c0 as u_c0"})
.planNode(),
"",
Expand Down Expand Up @@ -282,10 +343,15 @@ TEST_F(MergeJoinTest, allRowsMatch) {
makeFlatVector<int32_t>(7, [](auto /* row */) { return 5; })};

testJoin(leftKeys, rightKeys);

testJoin(rightKeys, leftKeys);
}

TEST_F(MergeJoinTest, keySkew) {
testJoin<int32_t>(
[](auto row) { return row; },
[](auto row) { return row < 10 ? row : row + 10240; });
}

TEST_F(MergeJoinTest, aggregationOverJoin) {
auto left =
makeRowVector({"t_c0"}, {makeFlatVector<int32_t>({1, 2, 3, 4, 5})});
Expand Down Expand Up @@ -613,25 +679,30 @@ TEST_F(MergeJoinTest, numDrivers) {
}

TEST_F(MergeJoinTest, lazyVectors) {
// a dataset of multiple row groups with multiple columns. We create
// A dataset of multiple row groups with multiple columns. We create
// different dictionary wrappings for different columns and load the
// rows in scope at different times. We make 11000 repeats of 300
// followed by ascending rows. These will hits one 300 from the
// rows in scope at different times. We make 11000 repeats of 300
// followed by ascending rows. These will hit one 300 from the
// right side and cover more than one batch, so that we test lazy
// loading where we buffer multiple batches of input.
auto leftVectors = makeRowVector(
{makeFlatVector<int32_t>(
30'000, [](auto row) { return row < 11000 ? 300 : row; }),
makeFlatVector<int64_t>(30'000, [](auto row) { return row % 23; }),
makeFlatVector<int32_t>(30'000, [](auto row) { return row % 31; }),
makeFlatVector<StringView>(30'000, [](auto row) {
return StringView::makeInline(fmt::format("{} string", row % 43));
})});
auto leftVectors = makeRowVector({
makeFlatVector<int32_t>(
30'000, [](auto row) { return row < 11000 ? 300 : row; }),
makeFlatVector<int64_t>(30'000, [](auto row) { return row % 23; }),
makeFlatVector<int32_t>(30'000, [](auto row) { return row % 31; }),
makeFlatVector<StringView>(
30'000,
[](auto row) {
return StringView::makeInline(fmt::format("{} string", row % 43));
}),
});

auto rightVectors = makeRowVector(
{"rc0", "rc1"},
{makeFlatVector<int32_t>(10'000, [](auto row) { return row * 3; }),
makeFlatVector<int64_t>(10'000, [](auto row) { return row % 31; })});
{
makeFlatVector<int32_t>(10'000, [](auto row) { return row * 3; }),
makeFlatVector<int64_t>(10'000, [](auto row) { return row % 31; }),
});

auto leftFile = TempFilePath::create();
writeToFile(leftFile->getPath(), leftVectors);
Expand Down

0 comments on commit 82021a2

Please sign in to comment.