Skip to content

Commit

Permalink
fix(merge-join): Produce output before advancing key comparison
Browse files Browse the repository at this point in the history
Summary:
Before we start reading keys from the next batch of input, we need to
make sure we are not still holding an output_ that wraps lazy vectors from the
previous batch, since lazy vectors need to be materialized in order.

Differential Revision: D66169184
  • Loading branch information
pedroerp authored and facebook-github-bot committed Nov 20, 2024
1 parent 473902a commit 857143b
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 46 deletions.
14 changes: 12 additions & 2 deletions velox/exec/MergeJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -911,7 +911,6 @@ RowVectorPtr MergeJoin::doGetOutput() {
if (outputSize_ == outputBatchSize_) {
return std::move(output_);
}

addOutputRowForRightJoin(rightInput_, rightIndex_);

++rightIndex_;
Expand Down Expand Up @@ -945,6 +944,10 @@ RowVectorPtr MergeJoin::doGetOutput() {
if (index_ == input_->size()) {
// Ran out of rows on the left side.
input_ = nullptr;
if (output_) {
output_->resize(outputSize_);
return std::move(output_);
}
return nullptr;
}
}
Expand Down Expand Up @@ -1024,6 +1027,10 @@ RowVectorPtr MergeJoin::doGetOutput() {
if (index_ == input_->size()) {
// Ran out of rows on the left side.
input_ = nullptr;
if (output_) {
output_->resize(outputSize_);
return std::move(output_);
}
return nullptr;
}
compareResult = compare();
Expand All @@ -1042,7 +1049,6 @@ RowVectorPtr MergeJoin::doGetOutput() {
if (outputSize_ == outputBatchSize_) {
return std::move(output_);
}

addOutputRowForRightJoin(rightInput_, rightIndex_);
++rightIndex_;
} else {
Expand All @@ -1052,6 +1058,10 @@ RowVectorPtr MergeJoin::doGetOutput() {
if (rightIndex_ == rightInput_->size()) {
// Ran out of rows on the right side.
rightInput_ = nullptr;
if (output_) {
output_->resize(outputSize_);
return std::move(output_);
}
return nullptr;
}
compareResult = compare();
Expand Down
159 changes: 115 additions & 44 deletions velox/exec/tests/MergeJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,87 @@ class MergeJoinTest : public HiveConnectorTestBase {
return params;
}

std::vector<RowVectorPtr> generateInput(const std::vector<VectorPtr>& keys) {
std::vector<RowVectorPtr> data;
data.reserve(keys.size());
vector_size_t startRow = 0;

for (const auto& key : keys) {
auto payload = makeFlatVector<int32_t>(
key->size(), [startRow](auto row) { return (startRow + row) * 10; });
data.push_back(makeRowVector({key, payload}));
startRow += key->size();
}
return data;
}

// VectorLoader used by the tests to verify how lazy vectors get materialized.
// Every loader carries the id of the batch it belongs to and shares a counter
// with the other loaders. Loading fails if a vector from an earlier batch is
// loaded after a vector from a later batch, or if the same vector is loaded
// more than once.
class MySimpleVectorLoader : public VectorLoader {
 public:
  // 'batchId' identifies the batch this loader belongs to, 'count' is the
  // counter shared between loaders, and 'loader' produces the actual vector
  // for the requested rows.
  explicit MySimpleVectorLoader(
      size_t batchId,
      const std::shared_ptr<size_t>& count,
      std::function<VectorPtr(RowSet)> loader)
      : batchId_{batchId}, count_{count}, loader_{loader} {}

  void loadInternal(
      RowSet rows,
      ValueHook* /*hook*/,
      vector_size_t /*resultSize*/,
      VectorPtr* result) override {
    // Raise the shared counter to this batch id if we are the first loader of
    // a newer batch. Afterwards the counter can only be larger than our id if
    // a later batch has already been loaded.
    if (*count_ < batchId_) {
      *count_ = batchId_;
    }
    VELOX_CHECK_GE(batchId_, *count_, "Lazy vectors loaded out of order.");
    VELOX_CHECK(!loaded_, "Trying to load a lazy vector twice.");

    *result = loader_(rows);
    // Only mark as loaded once the wrapped loader has returned.
    loaded_ = true;
  }

 private:
  // Id of the batch this loader was created for.
  const size_t batchId_;
  // Highest batch id seen by any loader sharing this counter.
  const std::shared_ptr<size_t> count_;
  // Set after the first successful load.
  bool loaded_{false};
  // Callback that produces the loaded vector.
  std::function<VectorPtr(RowSet)> loader_;
};

// Generates lazy vectors to ensure the merge join operator is loading them as
// expected.
std::vector<RowVectorPtr> generateLazyInput(
const std::vector<VectorPtr>& keys) {
std::vector<RowVectorPtr> data;
data.reserve(keys.size());
vector_size_t startRow = 0;

size_t batchId = 0;
auto counter = std::make_shared<size_t>(0);

for (const auto& key : keys) {
auto payload = std::make_shared<LazyVector>(
pool(),
CppToType<int32_t>::create(),
key->size(),
std::make_unique<MySimpleVectorLoader>(batchId, counter, [=](RowSet) {
return makeFlatVector<int32_t>(key->size(), [startRow](auto row) {
return (startRow + row) * 10;
});
}));

auto lazyKeys = std::make_shared<LazyVector>(
pool(),
CppToType<int32_t>::create(),
key->size(),
std::make_unique<MySimpleVectorLoader>(
batchId, counter, [=](RowSet) { return key; }));

data.push_back(makeRowVector({lazyKeys, payload}));
startRow += key->size();
++batchId;
}
return data;
}

template <typename T>
void testJoin(
std::function<T(vector_size_t /*row*/)> leftKeyAt,
Expand Down Expand Up @@ -99,38 +180,18 @@ class MergeJoinTest : public HiveConnectorTestBase {
void testJoin(
const std::vector<VectorPtr>& leftKeys,
const std::vector<VectorPtr>& rightKeys) {
std::vector<RowVectorPtr> left;
left.reserve(leftKeys.size());
vector_size_t startRow = 0;
for (const auto& key : leftKeys) {
auto payload = makeFlatVector<int32_t>(
key->size(), [startRow](auto row) { return (startRow + row) * 10; });
left.push_back(makeRowVector({key, payload}));
startRow += key->size();
}

std::vector<RowVectorPtr> right;
right.reserve(rightKeys.size());
startRow = 0;
for (const auto& key : rightKeys) {
auto payload = makeFlatVector<int32_t>(
key->size(), [startRow](auto row) { return (startRow + row) * 20; });
right.push_back(makeRowVector({key, payload}));
startRow += key->size();
}

createDuckDbTable("t", left);
createDuckDbTable("u", right);
createDuckDbTable("t", generateInput(leftKeys));
createDuckDbTable("u", generateInput(rightKeys));

// Test INNER join.
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
auto plan = PlanBuilder(planNodeIdGenerator)
.values(left)
.values(generateLazyInput(leftKeys))
.mergeJoin(
{"c0"},
{"u_c0"},
PlanBuilder(planNodeIdGenerator)
.values(right)
.values(generateLazyInput(rightKeys))
.project({"c1 AS u_c1", "c0 AS u_c0"})
.planNode(),
"",
Expand All @@ -156,12 +217,12 @@ class MergeJoinTest : public HiveConnectorTestBase {
// Test LEFT join.
planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
auto leftPlan = PlanBuilder(planNodeIdGenerator)
.values(left)
.values(generateLazyInput(leftKeys))
.mergeJoin(
{"c0"},
{"u_c0"},
PlanBuilder(planNodeIdGenerator)
.values(right)
.values(generateLazyInput(rightKeys))
.project({"c1 as u_c1", "c0 as u_c0"})
.planNode(),
"",
Expand All @@ -187,12 +248,12 @@ class MergeJoinTest : public HiveConnectorTestBase {
// Test RIGHT join.
planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
auto rightPlan = PlanBuilder(planNodeIdGenerator)
.values(right)
.values(generateLazyInput(rightKeys))
.mergeJoin(
{"c0"},
{"u_c0"},
PlanBuilder(planNodeIdGenerator)
.values(left)
.values(generateLazyInput(leftKeys))
.project({"c1 as u_c1", "c0 as u_c0"})
.planNode(),
"",
Expand Down Expand Up @@ -222,12 +283,12 @@ class MergeJoinTest : public HiveConnectorTestBase {
// Test FULL join.
planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
auto fullPlan = PlanBuilder(planNodeIdGenerator)
.values(right)
.values(generateLazyInput(rightKeys))
.mergeJoin(
{"c0"},
{"u_c0"},
PlanBuilder(planNodeIdGenerator)
.values(left)
.values(generateLazyInput(leftKeys))
.project({"c1 as u_c1", "c0 as u_c0"})
.planNode(),
"",
Expand Down Expand Up @@ -282,10 +343,15 @@ TEST_F(MergeJoinTest, allRowsMatch) {
makeFlatVector<int32_t>(7, [](auto /* row */) { return 5; })};

testJoin(leftKeys, rightKeys);

testJoin(rightKeys, leftKeys);
}

// Runs testJoin with left keys equal to the row number and right keys that
// equal the row number for the first ten rows and are shifted up by 10240 for
// every row after that.
TEST_F(MergeJoinTest, keySkew) {
  const auto leftKeyAt = [](auto row) { return row; };
  const auto rightKeyAt = [](auto row) {
    return row < 10 ? row : row + 10240;
  };

  testJoin<int32_t>(leftKeyAt, rightKeyAt);
}

TEST_F(MergeJoinTest, aggregationOverJoin) {
auto left =
makeRowVector({"t_c0"}, {makeFlatVector<int32_t>({1, 2, 3, 4, 5})});
Expand Down Expand Up @@ -613,25 +679,30 @@ TEST_F(MergeJoinTest, numDrivers) {
}

TEST_F(MergeJoinTest, lazyVectors) {
// a dataset of multiple row groups with multiple columns. We create
// A dataset of multiple row groups with multiple columns. We create
// different dictionary wrappings for different columns and load the
// rows in scope at different times. We make 11000 repeats of 300
// followed by ascending rows. These will hits one 300 from the
// rows in scope at different times. We make 11000 repeats of 300
// followed by ascending rows. These will hit one 300 from the
// right side and cover more than one batch, so that we test lazy
// loading where we buffer multiple batches of input.
auto leftVectors = makeRowVector(
{makeFlatVector<int32_t>(
30'000, [](auto row) { return row < 11000 ? 300 : row; }),
makeFlatVector<int64_t>(30'000, [](auto row) { return row % 23; }),
makeFlatVector<int32_t>(30'000, [](auto row) { return row % 31; }),
makeFlatVector<StringView>(30'000, [](auto row) {
return StringView::makeInline(fmt::format("{} string", row % 43));
})});
auto leftVectors = makeRowVector({
makeFlatVector<int32_t>(
30'000, [](auto row) { return row < 11000 ? 300 : row; }),
makeFlatVector<int64_t>(30'000, [](auto row) { return row % 23; }),
makeFlatVector<int32_t>(30'000, [](auto row) { return row % 31; }),
makeFlatVector<StringView>(
30'000,
[](auto row) {
return StringView::makeInline(fmt::format("{} string", row % 43));
}),
});

auto rightVectors = makeRowVector(
{"rc0", "rc1"},
{makeFlatVector<int32_t>(10'000, [](auto row) { return row * 3; }),
makeFlatVector<int64_t>(10'000, [](auto row) { return row % 31; })});
{
makeFlatVector<int32_t>(10'000, [](auto row) { return row * 3; }),
makeFlatVector<int64_t>(10'000, [](auto row) { return row % 31; }),
});

auto leftFile = TempFilePath::create();
writeToFile(leftFile->getPath(), leftVectors);
Expand Down

0 comments on commit 857143b

Please sign in to comment.