Skip to content

Commit

Permalink
Add test parameter isSingleThreaded in SharedArbitratorTest.cpp (face…
Browse files Browse the repository at this point in the history
…bookincubator#10730)

Summary:
Separated from facebookincubator#10600

Make the 3 cases parameterized:

SharedArbitrationTest.reclaimToJoinBuilder
SharedArbitrationTest.reclaimToAggregation
SharedArbitrationTest.reclaimToOrderBy

In the patch we still run them with `isSingleThreaded=false` only. `isSingleThreaded=true` will be toggled on in facebookincubator#10600 with essential code fixes.

Pull Request resolved: facebookincubator#10730

Reviewed By: tanjialiang

Differential Revision: D61176283

Pulled By: xiaoxmeng

fbshipit-source-id: 19a2565a29cbea1fb741e95c65ebc6549338936f
  • Loading branch information
zhztheplayer authored and facebook-github-bot committed Aug 13, 2024
1 parent 83ff1dd commit 2abfadf
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 9 deletions.
69 changes: 63 additions & 6 deletions velox/common/memory/tests/SharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ class FakeMemoryOperatorFactory : public Operator::PlanNodeTranslator {
uint32_t maxDrivers_{1};
};

class SharedArbitrationTest : public exec::test::HiveConnectorTestBase {
class SharedArbitrationTestBase : public exec::test::HiveConnectorTestBase {
protected:
static void SetUpTestCase() {
exec::test::HiveConnectorTestBase::SetUpTestCase();
Expand All @@ -252,7 +252,6 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase {
fuzzerOpts_.stringLength = 1024;
fuzzerOpts_.allowLazyVector = false;
vector_ = makeRowVector(rowType_, fuzzerOpts_);
executor_ = std::make_unique<folly::CPUThreadPoolExecutor>(32);
numAddedPools_ = 0;
}

Expand Down Expand Up @@ -298,13 +297,55 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase {

static inline FakeMemoryOperatorFactory* fakeOperatorFactory_;
std::unique_ptr<memory::MemoryManager> memoryManager_;
SharedArbitrator* arbitrator_;
SharedArbitrator* arbitrator_{nullptr};
RowTypePtr rowType_;
VectorFuzzer::Options fuzzerOpts_;
RowVectorPtr vector_;
std::atomic_uint64_t numAddedPools_{0};
};

namespace {
std::unique_ptr<folly::Executor> newMultiThreadedExecutor() {
return std::make_unique<folly::CPUThreadPoolExecutor>(32);
}

struct TestParam {
bool isSingleThreaded{false};
};
} // namespace

/// A test fixture that runs cases within multi-threaded execution mode.
class SharedArbitrationTest : public SharedArbitrationTestBase {
protected:
void SetUp() override {
SharedArbitrationTestBase::SetUp();
executor_ = newMultiThreadedExecutor();
}
};
/// A test fixture that runs cases within both single-threaded and
/// multi-threaded execution modes.
class SharedArbitrationTestWithThreadingModes
: public testing::WithParamInterface<TestParam>,
public SharedArbitrationTestBase {
public:
static std::vector<TestParam> getTestParams() {
return std::vector<TestParam>({{false}});
}

protected:
void SetUp() override {
SharedArbitrationTestBase::SetUp();
isSingleThreaded_ = GetParam().isSingleThreaded;
if (isSingleThreaded_) {
executor_ = nullptr;
} else {
executor_ = newMultiThreadedExecutor();
}
}

bool isSingleThreaded_{false};
};

DEBUG_ONLY_TEST_F(SharedArbitrationTest, queryArbitrationStateCheck) {
const std::vector<RowVectorPtr> vectors =
createVectors(rowType_, 32, 32 << 20);
Expand Down Expand Up @@ -430,7 +471,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, skipNonReclaimableTaskTest) {
ASSERT_EQ(taskPausedCount, 1);
}

DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) {
DEBUG_ONLY_TEST_P(SharedArbitrationTestWithThreadingModes, reclaimToOrderBy) {
const int numVectors = 32;
std::vector<RowVectorPtr> vectors;
for (int i = 0; i < numVectors; ++i) {
Expand Down Expand Up @@ -495,6 +536,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) {
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(orderByQueryCtx)
.singleThreaded(isSingleThreaded_)
.plan(PlanBuilder()
.values(vectors)
.orderBy({"c0 ASC NULLS LAST"}, false)
Expand All @@ -511,6 +553,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) {
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(fakeMemoryQueryCtx)
.singleThreaded(isSingleThreaded_)
.plan(PlanBuilder()
.values(vectors)
.addNode([&](std::string id, core::PlanNodePtr input) {
Expand All @@ -530,7 +573,9 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) {
}
}

DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) {
DEBUG_ONLY_TEST_P(
SharedArbitrationTestWithThreadingModes,
reclaimToAggregation) {
const int numVectors = 32;
std::vector<RowVectorPtr> vectors;
for (int i = 0; i < numVectors; ++i) {
Expand Down Expand Up @@ -595,6 +640,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) {
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(aggregationQueryCtx)
.singleThreaded(isSingleThreaded_)
.plan(PlanBuilder()
.values(vectors)
.singleAggregation({"c0", "c1"}, {"array_agg(c2)"})
Expand All @@ -612,6 +658,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) {
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(fakeMemoryQueryCtx)
.singleThreaded(isSingleThreaded_)
.plan(PlanBuilder()
.values(vectors)
.addNode([&](std::string id, core::PlanNodePtr input) {
Expand All @@ -631,7 +678,9 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) {
}
}

DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToJoinBuilder) {
DEBUG_ONLY_TEST_P(
SharedArbitrationTestWithThreadingModes,
reclaimToJoinBuilder) {
const int numVectors = 32;
std::vector<RowVectorPtr> vectors;
for (int i = 0; i < numVectors; ++i) {
Expand Down Expand Up @@ -697,6 +746,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToJoinBuilder) {
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(joinQueryCtx)
.singleThreaded(isSingleThreaded_)
.plan(PlanBuilder(planNodeIdGenerator)
.values(vectors)
.project({"c0 AS t0", "c1 AS t1", "c2 AS t2"})
Expand Down Expand Up @@ -724,6 +774,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToJoinBuilder) {
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(fakeMemoryQueryCtx)
.singleThreaded(isSingleThreaded_)
.plan(PlanBuilder()
.values(vectors)
.addNode([&](std::string id, core::PlanNodePtr input) {
Expand Down Expand Up @@ -1300,6 +1351,12 @@ TEST_F(SharedArbitrationTest, reserveReleaseCounters) {
ASSERT_EQ(arbitrator_->stats().numShrinks, numRootPools);
}
}

VELOX_INSTANTIATE_TEST_SUITE_P(
SharedArbitrationTestWithThreadingModes,
SharedArbitrationTestWithThreadingModes,
testing::ValuesIn(
SharedArbitrationTestWithThreadingModes::getTestParams()));
} // namespace facebook::velox::memory

int main(int argc, char** argv) {
Expand Down
8 changes: 7 additions & 1 deletion velox/exec/tests/utils/AssertQueryBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,13 @@ AssertQueryBuilder& AssertQueryBuilder::destination(int32_t destination) {
}

AssertQueryBuilder& AssertQueryBuilder::singleThreaded(bool singleThreaded) {
params_.singleThreaded = singleThreaded;
if (singleThreaded) {
params_.singleThreaded = true;
executor_ = nullptr;
return *this;
}
params_.singleThreaded = false;
executor_ = newExecutor();
return *this;
}

Expand Down
8 changes: 6 additions & 2 deletions velox/exec/tests/utils/AssertQueryBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,13 @@ class AssertQueryBuilder {
std::pair<std::unique_ptr<TaskCursor>, std::vector<RowVectorPtr>>
readCursor();

static std::unique_ptr<folly::Executor> newExecutor() {
return std::make_unique<folly::CPUThreadPoolExecutor>(
std::thread::hardware_concurrency());
}

// Used by the created task as the default driver executor.
std::unique_ptr<folly::Executor> executor_{
new folly::CPUThreadPoolExecutor(std::thread::hardware_concurrency())};
std::unique_ptr<folly::Executor> executor_{newExecutor()};
DuckDbQueryRunner* const duckDbQueryRunner_;
CursorParameters params_;
std::unordered_map<std::string, std::string> configs_;
Expand Down

0 comments on commit 2abfadf

Please sign in to comment.