Skip to content

Commit

Permalink
Code cleanup to use concept serial / parallel to replace single-threa…
Browse files Browse the repository at this point in the history
…ded / multi-threaded as task execution modes (facebookincubator#10792)

Summary:
A code cleanup for task execution mode concepts, to uniformly use `serial` / `parallel`. Remove usages of `single-threaded` / `multi-threaded`.

Fixes facebookincubator#10745

Pull Request resolved: facebookincubator#10792

Reviewed By: Yuhta, bikramSingh91

Differential Revision: D61956385

Pulled By: xiaoxmeng

fbshipit-source-id: c601451cc5059aaec304d9e7c34506856f51fbdf
  • Loading branch information
zhztheplayer authored and facebook-github-bot committed Aug 30, 2024
1 parent 7af17de commit ede7d0b
Show file tree
Hide file tree
Showing 17 changed files with 86 additions and 89 deletions.
4 changes: 2 additions & 2 deletions velox/common/memory/SharedArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,8 @@ class SharedArbitrator : public memory::MemoryArbitrator {

// Invoked to run global arbitration to reclaim free or used memory from the
// other queries. The global arbitration run is protected by the exclusive
// lock of 'arbitrationLock_' for serial execution. The function returns true
// on success, false on failure.
// lock of 'arbitrationLock_' for serial execution mode. The function returns
// true on success, false on failure.
bool runGlobalArbitration(ArbitrationOperation* op);

// Gets the mim/max memory capacity growth targets for 'op'. The min and max
Expand Down
2 changes: 1 addition & 1 deletion velox/common/memory/tests/MockSharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1117,7 +1117,7 @@ TEST_F(MockSharedArbitrationTest, shrinkPools) {
}

// This test verifies local arbitration runs from the same query has to wait for
// serial execution.
// serial execution mode.
DEBUG_ONLY_TEST_F(
MockSharedArbitrationTest,
localArbitrationRunsFromSameQuery) {
Expand Down
46 changes: 23 additions & 23 deletions velox/common/memory/tests/SharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,26 +305,26 @@ class SharedArbitrationTestBase : public exec::test::HiveConnectorTestBase {
};

namespace {
std::unique_ptr<folly::Executor> newMultiThreadedExecutor() {
std::unique_ptr<folly::Executor> newParallelExecutor() {
return std::make_unique<folly::CPUThreadPoolExecutor>(32);
}

struct TestParam {
bool isSingleThreaded{false};
bool isSerialExecutionMode{false};
};
} // namespace

/// A test fixture that runs cases within multi-threaded execution mode.
/// A test fixture that runs cases within parallel execution mode.
class SharedArbitrationTest : public SharedArbitrationTestBase {
protected:
void SetUp() override {
SharedArbitrationTestBase::SetUp();
executor_ = newMultiThreadedExecutor();
executor_ = newParallelExecutor();
}
};
/// A test fixture that runs cases within both single-threaded and
/// multi-threaded execution modes.
class SharedArbitrationTestWithThreadingModes
/// A test fixture that runs cases within both serial and
/// parallel execution modes.
class SharedArbitrationTestWithExecutionModes
: public testing::WithParamInterface<TestParam>,
public SharedArbitrationTestBase {
public:
Expand All @@ -335,15 +335,15 @@ class SharedArbitrationTestWithThreadingModes
protected:
void SetUp() override {
SharedArbitrationTestBase::SetUp();
isSingleThreaded_ = GetParam().isSingleThreaded;
if (isSingleThreaded_) {
isSerialExecutionMode_ = GetParam().isSerialExecutionMode;
if (isSerialExecutionMode_) {
executor_ = nullptr;
} else {
executor_ = newMultiThreadedExecutor();
executor_ = newParallelExecutor();
}
}

bool isSingleThreaded_{false};
bool isSerialExecutionMode_{false};
};

DEBUG_ONLY_TEST_F(SharedArbitrationTest, queryArbitrationStateCheck) {
Expand Down Expand Up @@ -525,7 +525,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, skipNonReclaimableTaskTest) {
ASSERT_EQ(taskPausedCount, 1);
}

DEBUG_ONLY_TEST_P(SharedArbitrationTestWithThreadingModes, reclaimToOrderBy) {
DEBUG_ONLY_TEST_P(SharedArbitrationTestWithExecutionModes, reclaimToOrderBy) {
const int numVectors = 32;
std::vector<RowVectorPtr> vectors;
for (int i = 0; i < numVectors; ++i) {
Expand Down Expand Up @@ -590,7 +590,7 @@ DEBUG_ONLY_TEST_P(SharedArbitrationTestWithThreadingModes, reclaimToOrderBy) {
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(orderByQueryCtx)
.singleThreaded(isSingleThreaded_)
.serialExecution(isSerialExecutionMode_)
.plan(PlanBuilder()
.values(vectors)
.orderBy({"c0 ASC NULLS LAST"}, false)
Expand All @@ -607,7 +607,7 @@ DEBUG_ONLY_TEST_P(SharedArbitrationTestWithThreadingModes, reclaimToOrderBy) {
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(fakeMemoryQueryCtx)
.singleThreaded(isSingleThreaded_)
.serialExecution(isSerialExecutionMode_)
.plan(PlanBuilder()
.values(vectors)
.addNode([&](std::string id, core::PlanNodePtr input) {
Expand All @@ -628,7 +628,7 @@ DEBUG_ONLY_TEST_P(SharedArbitrationTestWithThreadingModes, reclaimToOrderBy) {
}

DEBUG_ONLY_TEST_P(
SharedArbitrationTestWithThreadingModes,
SharedArbitrationTestWithExecutionModes,
reclaimToAggregation) {
const int numVectors = 32;
std::vector<RowVectorPtr> vectors;
Expand Down Expand Up @@ -694,7 +694,7 @@ DEBUG_ONLY_TEST_P(
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(aggregationQueryCtx)
.singleThreaded(isSingleThreaded_)
.serialExecution(isSerialExecutionMode_)
.plan(PlanBuilder()
.values(vectors)
.singleAggregation({"c0", "c1"}, {"array_agg(c2)"})
Expand All @@ -712,7 +712,7 @@ DEBUG_ONLY_TEST_P(
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(fakeMemoryQueryCtx)
.singleThreaded(isSingleThreaded_)
.serialExecution(isSerialExecutionMode_)
.plan(PlanBuilder()
.values(vectors)
.addNode([&](std::string id, core::PlanNodePtr input) {
Expand All @@ -733,7 +733,7 @@ DEBUG_ONLY_TEST_P(
}

DEBUG_ONLY_TEST_P(
SharedArbitrationTestWithThreadingModes,
SharedArbitrationTestWithExecutionModes,
reclaimToJoinBuilder) {
const int numVectors = 32;
std::vector<RowVectorPtr> vectors;
Expand Down Expand Up @@ -800,7 +800,7 @@ DEBUG_ONLY_TEST_P(
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(joinQueryCtx)
.singleThreaded(isSingleThreaded_)
.serialExecution(isSerialExecutionMode_)
.plan(PlanBuilder(planNodeIdGenerator)
.values(vectors)
.project({"c0 AS t0", "c1 AS t1", "c2 AS t2"})
Expand Down Expand Up @@ -828,7 +828,7 @@ DEBUG_ONLY_TEST_P(
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(fakeMemoryQueryCtx)
.singleThreaded(isSingleThreaded_)
.serialExecution(isSerialExecutionMode_)
.plan(PlanBuilder()
.values(vectors)
.addNode([&](std::string id, core::PlanNodePtr input) {
Expand Down Expand Up @@ -1407,10 +1407,10 @@ TEST_F(SharedArbitrationTest, reserveReleaseCounters) {
}

VELOX_INSTANTIATE_TEST_SUITE_P(
SharedArbitrationTestWithThreadingModes,
SharedArbitrationTestWithThreadingModes,
SharedArbitrationTestWithExecutionModes,
SharedArbitrationTestWithExecutionModes,
testing::ValuesIn(
SharedArbitrationTestWithThreadingModes::getTestParams()));
SharedArbitrationTestWithExecutionModes::getTestParams()));
} // namespace facebook::velox::memory

int main(int argc, char** argv) {
Expand Down
1 change: 0 additions & 1 deletion velox/core/QueryCtx.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ class QueryCtx : public std::enable_shared_from_this<QueryCtx> {

folly::Executor* executor() const {
return executor_;
;
}

bool isExecutorSupplied() const {
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ StopReason Driver::runInternal(
result->estimateFlatSize(), result->size());
}

// This code path is used only in single-threaded execution.
// This code path is used only in serial execution mode.
blockingReason_ = BlockingReason::kWaitForConsumer;
guard.notThrown();
return StopReason::kBlock;
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ class BlockingState {
}

/// Moves out the blocking future stored inside. Can be called only once.
/// Used in single-threaded execution.
/// Used in serial execution mode.
ContinueFuture future() {
return std::move(future_);
}
Expand Down Expand Up @@ -616,7 +616,7 @@ struct DriverFactory {

static void registerAdapter(DriverAdapter adapter);

bool supportsSingleThreadedExecution() const {
bool supportsSerialExecution() const {
return !needsPartitionedOutput() && !needsExchangeClient() &&
!needsLocalExchange();
}
Expand Down
16 changes: 8 additions & 8 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ velox::memory::MemoryPool* Task::addExchangeClientPool(
return childPools_.back().get();
}

bool Task::supportsSingleThreadedExecution() const {
bool Task::supportSerialExecutionMode() const {
if (consumerSupplier_) {
return false;
}
Expand All @@ -560,7 +560,7 @@ bool Task::supportsSingleThreadedExecution() const {
planFragment_, nullptr, &driverFactories, queryCtx_->queryConfig(), 1);

for (const auto& factory : driverFactories) {
if (!factory->supportsSingleThreadedExecution()) {
if (!factory->supportsSerialExecution()) {
return false;
}
}
Expand All @@ -570,18 +570,18 @@ bool Task::supportsSingleThreadedExecution() const {

RowVectorPtr Task::next(ContinueFuture* future) {
checkExecutionMode(ExecutionMode::kSerial);
// NOTE: Task::next() is single-threaded execution so locking is not required
// NOTE: Task::next() is serial execution so locking is not required
// to access Task object.
VELOX_CHECK_EQ(
core::ExecutionStrategy::kUngrouped,
planFragment_.executionStrategy,
"Single-threaded execution supports only ungrouped execution");
"Serial execution mode supports only ungrouped execution");

if (!splitsStates_.empty()) {
for (const auto& it : splitsStates_) {
VELOX_CHECK(
it.second.noMoreSplits,
"Single-threaded execution requires all splits to be added before "
"Serial execution mode requires all splits to be added before "
"calling Task::next().");
}
}
Expand All @@ -595,7 +595,7 @@ RowVectorPtr Task::next(ContinueFuture* future) {
if (driverFactories_.empty()) {
VELOX_CHECK_NULL(
consumerSupplier_,
"Single-threaded execution doesn't support delivering results to a "
"Serial execution mode doesn't support delivering results to a "
"callback");

taskStats_.executionStartTimeMs = getCurrentTimeMs();
Expand All @@ -605,7 +605,7 @@ RowVectorPtr Task::next(ContinueFuture* future) {

// In Task::next() we always assume ungrouped execution.
for (const auto& factory : driverFactories_) {
VELOX_CHECK(factory->supportsSingleThreadedExecution());
VELOX_CHECK(factory->supportsSerialExecution());
numDriversUngrouped_ += factory->numDrivers;
numTotalDrivers_ += factory->numTotalDrivers;
taskStats_.pipelineStats.emplace_back(
Expand Down Expand Up @@ -943,7 +943,7 @@ void Task::resume(std::shared_ptr<Task> self) {
// event. The Driver gets enqueued by the promise realization.
//
// Do not continue the driver if no executor is supplied,
// This usually happens in single-threaded execution.
// This usually happens in serial execution mode.
Driver::enqueue(driver);
}
}
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ class Task : public std::enable_shared_from_this<Task> {
/// splits groups processed concurrently.
void start(uint32_t maxDrivers, uint32_t concurrentSplitGroups = 1);

/// If this returns true, this Task supports the single-threaded execution API
/// If this returns true, this Task supports the serial execution API
/// next().
bool supportsSingleThreadedExecution() const;
bool supportSerialExecutionMode() const;

/// Single-threaded execution API. Runs the query and returns results one
/// batch at a time. Returns nullptr if query evaluation is finished and no
Expand Down
6 changes: 3 additions & 3 deletions velox/exec/tests/AssertQueryBuilderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,17 @@ TEST_F(AssertQueryBuilderTest, basic) {
.assertResults(data);
}

TEST_F(AssertQueryBuilderTest, singleThreaded) {
TEST_F(AssertQueryBuilderTest, serialExecution) {
auto data = makeRowVector({makeFlatVector<int32_t>({1, 2, 3})});

PlanBuilder builder;
const auto& plan = builder.values({data}).planNode();

AssertQueryBuilder(plan, duckDbQueryRunner_)
.singleThreaded(true)
.serialExecution(true)
.assertResults("VALUES (1), (2), (3)");

AssertQueryBuilder(plan).singleThreaded(true).assertResults(data);
AssertQueryBuilder(plan).serialExecution(true).assertResults(data);
}

TEST_F(AssertQueryBuilderTest, orderedResults) {
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/HashJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6658,7 +6658,7 @@ TEST_F(HashJoinTest, leftJoinPreserveProbeOrder) {
.planNode();
auto result = AssertQueryBuilder(plan)
.config(core::QueryConfig::kPreferredOutputBatchRows, "1")
.singleThreaded(true)
.serialExecution(true)
.copyResults(pool_.get());
ASSERT_EQ(result->size(), 3);
auto* v1 =
Expand Down
18 changes: 9 additions & 9 deletions velox/exec/tests/QueryAssertionsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ class QueryAssertionsTest : public OperatorTestBase {
void assertQueryWithThreadingConfigs(
const core::PlanNodePtr& plan,
const std::string& duckDbSql) {
CursorParameters multiThreadedParams{};
multiThreadedParams.planNode = plan;
multiThreadedParams.singleThreaded = false;
assertQuery(multiThreadedParams, duckDbSql);

CursorParameters singleThreadedParams{};
singleThreadedParams.planNode = plan;
singleThreadedParams.singleThreaded = true;
assertQuery(singleThreadedParams, duckDbSql);
CursorParameters parallelParams{};
parallelParams.planNode = plan;
parallelParams.serialExecution = false;
assertQuery(parallelParams, duckDbSql);

CursorParameters serialParams{};
serialParams.planNode = plan;
serialParams.serialExecution = true;
assertQuery(serialParams, duckDbSql);
}
};

Expand Down
4 changes: 2 additions & 2 deletions velox/exec/tests/TableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3277,7 +3277,7 @@ TEST_F(TableScanTest, remainingFilterLazyWithMultiReferences) {
writeToFile(file->getPath(), {vector});
CursorParameters params;
params.copyResult = false;
params.singleThreaded = true;
params.serialExecution = true;
params.planNode =
PlanBuilder()
.tableScan(schema, {}, "NOT (c0 % 2 == 0 AND c2 % 3 == 0)")
Expand Down Expand Up @@ -3318,7 +3318,7 @@ TEST_F(
writeToFile(file->getPath(), {vector});
CursorParameters params;
params.copyResult = false;
params.singleThreaded = true;
params.serialExecution = true;
params.planNode = PlanBuilder()
.tableScan(schema, {}, "c0 % 7 == 0 AND c1 % 2 == 0")
.planNode();
Expand Down
Loading

0 comments on commit ede7d0b

Please sign in to comment.