diff --git a/velox/common/memory/SharedArbitrator.h b/velox/common/memory/SharedArbitrator.h index 665cae71ce12..6c784f5373f2 100644 --- a/velox/common/memory/SharedArbitrator.h +++ b/velox/common/memory/SharedArbitrator.h @@ -345,8 +345,8 @@ class SharedArbitrator : public memory::MemoryArbitrator { // Invoked to run global arbitration to reclaim free or used memory from the // other queries. The global arbitration run is protected by the exclusive - // lock of 'arbitrationLock_' for serial execution. The function returns true - // on success, false on failure. + // lock of 'arbitrationLock_' for serial execution mode. The function returns + // true on success, false on failure. bool runGlobalArbitration(ArbitrationOperation* op); // Gets the mim/max memory capacity growth targets for 'op'. The min and max diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp index 59e14c460691..e2ddafccb726 100644 --- a/velox/common/memory/tests/MockSharedArbitratorTest.cpp +++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp @@ -1117,7 +1117,7 @@ TEST_F(MockSharedArbitrationTest, shrinkPools) { } // This test verifies local arbitration runs from the same query has to wait for -// serial execution. +// serial execution mode. DEBUG_ONLY_TEST_F( MockSharedArbitrationTest, localArbitrationRunsFromSameQuery) { diff --git a/velox/common/memory/tests/SharedArbitratorTest.cpp b/velox/common/memory/tests/SharedArbitratorTest.cpp index 281246bc8353..3dd90d2b437b 100644 --- a/velox/common/memory/tests/SharedArbitratorTest.cpp +++ b/velox/common/memory/tests/SharedArbitratorTest.cpp @@ -305,26 +305,26 @@ class SharedArbitrationTestBase : public exec::test::HiveConnectorTestBase { }; namespace { -std::unique_ptr newMultiThreadedExecutor() { +std::unique_ptr newParallelExecutor() { return std::make_unique(32); } struct TestParam { - bool isSingleThreaded{false}; + bool isSerialExecutionMode{false}; }; } // namespace -/// A test fixture that runs cases within multi-threaded execution mode. +/// A test fixture that runs cases within parallel execution mode. class SharedArbitrationTest : public SharedArbitrationTestBase { protected: void SetUp() override { SharedArbitrationTestBase::SetUp(); - executor_ = newMultiThreadedExecutor(); + executor_ = newParallelExecutor(); } }; -/// A test fixture that runs cases within both single-threaded and -/// multi-threaded execution modes. -class SharedArbitrationTestWithThreadingModes +/// A test fixture that runs cases within both serial and +/// parallel execution modes. +class SharedArbitrationTestWithExecutionModes : public testing::WithParamInterface, public SharedArbitrationTestBase { public: @@ -335,15 +335,15 @@ class SharedArbitrationTestWithThreadingModes protected: void SetUp() override { SharedArbitrationTestBase::SetUp(); - isSingleThreaded_ = GetParam().isSingleThreaded; - if (isSingleThreaded_) { + isSerialExecutionMode_ = GetParam().isSerialExecutionMode; + if (isSerialExecutionMode_) { executor_ = nullptr; } else { - executor_ = newMultiThreadedExecutor(); + executor_ = newParallelExecutor(); } } - bool isSingleThreaded_{false}; + bool isSerialExecutionMode_{false}; }; DEBUG_ONLY_TEST_F(SharedArbitrationTest, queryArbitrationStateCheck) { @@ -525,7 +525,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, skipNonReclaimableTaskTest) { ASSERT_EQ(taskPausedCount, 1); } -DEBUG_ONLY_TEST_P(SharedArbitrationTestWithThreadingModes, reclaimToOrderBy) { +DEBUG_ONLY_TEST_P(SharedArbitrationTestWithExecutionModes, reclaimToOrderBy) { const int numVectors = 32; std::vector vectors; for (int i = 0; i < numVectors; ++i) { @@ -590,7 +590,7 @@ DEBUG_ONLY_TEST_P(SharedArbitrationTestWithThreadingModes, reclaimToOrderBy) { auto task = AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(orderByQueryCtx) - .singleThreaded(isSingleThreaded_) + .serialExecution(isSerialExecutionMode_) .plan(PlanBuilder() .values(vectors) .orderBy({"c0 ASC NULLS LAST"}, false) @@ -607,7 +607,7 @@ DEBUG_ONLY_TEST_P(SharedArbitrationTestWithThreadingModes, reclaimToOrderBy) { auto task = AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(fakeMemoryQueryCtx) - .singleThreaded(isSingleThreaded_) + .serialExecution(isSerialExecutionMode_) .plan(PlanBuilder() .values(vectors) .addNode([&](std::string id, core::PlanNodePtr input) { @@ -628,7 +628,7 @@ DEBUG_ONLY_TEST_P(SharedArbitrationTestWithThreadingModes, reclaimToOrderBy) { } DEBUG_ONLY_TEST_P( - SharedArbitrationTestWithThreadingModes, + SharedArbitrationTestWithExecutionModes, reclaimToAggregation) { const int numVectors = 32; std::vector vectors; @@ -694,7 +694,7 @@ DEBUG_ONLY_TEST_P( auto task = AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(aggregationQueryCtx) - .singleThreaded(isSingleThreaded_) + .serialExecution(isSerialExecutionMode_) .plan(PlanBuilder() .values(vectors) .singleAggregation({"c0", "c1"}, {"array_agg(c2)"}) @@ -712,7 +712,7 @@ DEBUG_ONLY_TEST_P( auto task = AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(fakeMemoryQueryCtx) - .singleThreaded(isSingleThreaded_) + .serialExecution(isSerialExecutionMode_) .plan(PlanBuilder() .values(vectors) .addNode([&](std::string id, core::PlanNodePtr input) { @@ -733,7 +733,7 @@ DEBUG_ONLY_TEST_P( } DEBUG_ONLY_TEST_P( - SharedArbitrationTestWithThreadingModes, + SharedArbitrationTestWithExecutionModes, reclaimToJoinBuilder) { const int numVectors = 32; std::vector vectors; @@ -800,7 +800,7 @@ DEBUG_ONLY_TEST_P( auto task = AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(joinQueryCtx) - .singleThreaded(isSingleThreaded_) + .serialExecution(isSerialExecutionMode_) .plan(PlanBuilder(planNodeIdGenerator) .values(vectors) .project({"c0 AS t0", "c1 AS t1", "c2 AS t2"}) @@ -828,7 +828,7 @@ DEBUG_ONLY_TEST_P( auto task = AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(fakeMemoryQueryCtx) - .singleThreaded(isSingleThreaded_) + .serialExecution(isSerialExecutionMode_) .plan(PlanBuilder() .values(vectors) .addNode([&](std::string id, core::PlanNodePtr input) { @@ -1407,10 +1407,10 @@ TEST_F(SharedArbitrationTest, reserveReleaseCounters) { } VELOX_INSTANTIATE_TEST_SUITE_P( - SharedArbitrationTestWithThreadingModes, - SharedArbitrationTestWithThreadingModes, + SharedArbitrationTestWithExecutionModes, + SharedArbitrationTestWithExecutionModes, testing::ValuesIn( - SharedArbitrationTestWithThreadingModes::getTestParams())); + SharedArbitrationTestWithExecutionModes::getTestParams())); } // namespace facebook::velox::memory int main(int argc, char** argv) { diff --git a/velox/core/QueryCtx.h b/velox/core/QueryCtx.h index 7df296f1ff55..c2274cf4af44 100644 --- a/velox/core/QueryCtx.h +++ b/velox/core/QueryCtx.h @@ -65,7 +65,6 @@ class QueryCtx : public std::enable_shared_from_this { folly::Executor* executor() const { return executor_; - ; } bool isExecutorSupplied() const { diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index f511d247a117..bea69c0a352c 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -779,7 +779,7 @@ StopReason Driver::runInternal( result->estimateFlatSize(), result->size()); } - // This code path is used only in single-threaded execution. + // This code path is used only in serial execution mode. blockingReason_ = BlockingReason::kWaitForConsumer; guard.notThrown(); return StopReason::kBlock; diff --git a/velox/exec/Driver.h b/velox/exec/Driver.h index 7ceb696aab45..9410dd7d82e5 100644 --- a/velox/exec/Driver.h +++ b/velox/exec/Driver.h @@ -241,7 +241,7 @@ class BlockingState { } /// Moves out the blocking future stored inside. Can be called only once. - /// Used in single-threaded execution. + /// Used in serial execution mode. ContinueFuture future() { return std::move(future_); } @@ -616,7 +616,7 @@ struct DriverFactory { static void registerAdapter(DriverAdapter adapter); - bool supportsSingleThreadedExecution() const { + bool supportsSerialExecution() const { return !needsPartitionedOutput() && !needsExchangeClient() && !needsLocalExchange(); } diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 50eccd815d60..9ae3661962e1 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -550,7 +550,7 @@ velox::memory::MemoryPool* Task::addExchangeClientPool( return childPools_.back().get(); } -bool Task::supportsSingleThreadedExecution() const { +bool Task::supportSerialExecutionMode() const { if (consumerSupplier_) { return false; } @@ -560,7 +560,7 @@ bool Task::supportsSingleThreadedExecution() const { planFragment_, nullptr, &driverFactories, queryCtx_->queryConfig(), 1); for (const auto& factory : driverFactories) { - if (!factory->supportsSingleThreadedExecution()) { + if (!factory->supportsSerialExecution()) { return false; } } @@ -570,18 +570,18 @@ bool Task::supportsSingleThreadedExecution() const { RowVectorPtr Task::next(ContinueFuture* future) { checkExecutionMode(ExecutionMode::kSerial); - // NOTE: Task::next() is single-threaded execution so locking is not required + // NOTE: Task::next() is serial execution so locking is not required // to access Task object. VELOX_CHECK_EQ( core::ExecutionStrategy::kUngrouped, planFragment_.executionStrategy, - "Single-threaded execution supports only ungrouped execution"); + "Serial execution mode supports only ungrouped execution"); if (!splitsStates_.empty()) { for (const auto& it : splitsStates_) { VELOX_CHECK( it.second.noMoreSplits, - "Single-threaded execution requires all splits to be added before " + "Serial execution mode requires all splits to be added before " "calling Task::next()."); } } @@ -595,7 +595,7 @@ RowVectorPtr Task::next(ContinueFuture* future) { if (driverFactories_.empty()) { VELOX_CHECK_NULL( consumerSupplier_, - "Single-threaded execution doesn't support delivering results to a " + "Serial execution mode doesn't support delivering results to a " "callback"); taskStats_.executionStartTimeMs = getCurrentTimeMs(); @@ -605,7 +605,7 @@ RowVectorPtr Task::next(ContinueFuture* future) { // In Task::next() we always assume ungrouped execution. for (const auto& factory : driverFactories_) { - VELOX_CHECK(factory->supportsSingleThreadedExecution()); + VELOX_CHECK(factory->supportsSerialExecution()); numDriversUngrouped_ += factory->numDrivers; numTotalDrivers_ += factory->numTotalDrivers; taskStats_.pipelineStats.emplace_back( @@ -943,7 +943,7 @@ void Task::resume(std::shared_ptr self) { // event. The Driver gets enqueued by the promise realization. // // Do not continue the driver if no executor is supplied, - // This usually happens in single-threaded execution. + // This usually happens in serial execution mode. Driver::enqueue(driver); } } diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 6a18d3fbcef1..4e24c0814463 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -165,9 +165,9 @@ class Task : public std::enable_shared_from_this { /// splits groups processed concurrently. void start(uint32_t maxDrivers, uint32_t concurrentSplitGroups = 1); - /// If this returns true, this Task supports the single-threaded execution API + /// If this returns true, this Task supports the serial execution API /// next(). - bool supportsSingleThreadedExecution() const; + bool supportSerialExecutionMode() const; /// Single-threaded execution API. Runs the query and returns results one /// batch at a time. Returns nullptr if query evaluation is finished and no diff --git a/velox/exec/tests/AssertQueryBuilderTest.cpp b/velox/exec/tests/AssertQueryBuilderTest.cpp index 01f6d73f8bdd..573d7349e509 100644 --- a/velox/exec/tests/AssertQueryBuilderTest.cpp +++ b/velox/exec/tests/AssertQueryBuilderTest.cpp @@ -34,17 +34,17 @@ TEST_F(AssertQueryBuilderTest, basic) { .assertResults(data); } -TEST_F(AssertQueryBuilderTest, singleThreaded) { +TEST_F(AssertQueryBuilderTest, serialExecution) { auto data = makeRowVector({makeFlatVector({1, 2, 3})}); PlanBuilder builder; const auto& plan = builder.values({data}).planNode(); AssertQueryBuilder(plan, duckDbQueryRunner_) - .singleThreaded(true) + .serialExecution(true) .assertResults("VALUES (1), (2), (3)"); - AssertQueryBuilder(plan).singleThreaded(true).assertResults(data); + AssertQueryBuilder(plan).serialExecution(true).assertResults(data); } TEST_F(AssertQueryBuilderTest, orderedResults) { diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index c07d462e5fd0..b76db56e1eff 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -6658,7 +6658,7 @@ TEST_F(HashJoinTest, leftJoinPreserveProbeOrder) { .planNode(); auto result = AssertQueryBuilder(plan) .config(core::QueryConfig::kPreferredOutputBatchRows, "1") - .singleThreaded(true) + .serialExecution(true) .copyResults(pool_.get()); ASSERT_EQ(result->size(), 3); auto* v1 = diff --git a/velox/exec/tests/QueryAssertionsTest.cpp b/velox/exec/tests/QueryAssertionsTest.cpp index b41aea713515..358ee59d4e77 100644 --- a/velox/exec/tests/QueryAssertionsTest.cpp +++ b/velox/exec/tests/QueryAssertionsTest.cpp @@ -32,15 +32,15 @@ class QueryAssertionsTest : public OperatorTestBase { void assertQueryWithThreadingConfigs( const core::PlanNodePtr& plan, const std::string& duckDbSql) { - CursorParameters multiThreadedParams{}; - multiThreadedParams.planNode = plan; - multiThreadedParams.singleThreaded = false; - assertQuery(multiThreadedParams, duckDbSql); - - CursorParameters singleThreadedParams{}; - singleThreadedParams.planNode = plan; - singleThreadedParams.singleThreaded = true; - assertQuery(singleThreadedParams, duckDbSql); + CursorParameters parallelParams{}; + parallelParams.planNode = plan; + parallelParams.serialExecution = false; + assertQuery(parallelParams, duckDbSql); + + CursorParameters serialParams{}; + serialParams.planNode = plan; + serialParams.serialExecution = true; + assertQuery(serialParams, duckDbSql); } }; diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index 145ede14480d..6c6b9a9f0d46 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -3277,7 +3277,7 @@ TEST_F(TableScanTest, remainingFilterLazyWithMultiReferences) { writeToFile(file->getPath(), {vector}); CursorParameters params; params.copyResult = false; - params.singleThreaded = true; + params.serialExecution = true; params.planNode = PlanBuilder() .tableScan(schema, {}, "NOT (c0 % 2 == 0 AND c2 % 3 == 0)") @@ -3318,7 +3318,7 @@ TEST_F( writeToFile(file->getPath(), {vector}); CursorParameters params; params.copyResult = false; - params.singleThreaded = true; + params.serialExecution = true; params.planNode = PlanBuilder() .tableScan(schema, {}, "c0 % 7 == 0 AND c1 % 2 == 0") .planNode(); diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp index a3bf36810b2d..307d168a3f1a 100644 --- a/velox/exec/tests/TaskTest.cpp +++ b/velox/exec/tests/TaskTest.cpp @@ -462,7 +462,7 @@ class TestBadMemoryTranslator : public exec::Operator::PlanNodeTranslator { class TaskTest : public HiveConnectorTestBase { protected: static std::pair, std::vector> - executeSingleThreaded( + executeSerial( core::PlanFragment plan, const std::unordered_map>& filePaths = {}) { @@ -480,7 +480,7 @@ class TaskTest : public HiveConnectorTestBase { task->noMoreSplits(nodeId); } - VELOX_CHECK(task->supportsSingleThreadedExecution()); + VELOX_CHECK(task->supportSerialExecutionMode()); vector_size_t numRows = 0; std::vector results; @@ -727,7 +727,7 @@ TEST_F(TaskTest, testTerminateDeadlock) { cursor->task()->toString().find("zombie drivers:"), std::string::npos); } -TEST_F(TaskTest, singleThreadedExecution) { +TEST_F(TaskTest, serialExecution) { auto data = makeRowVector({ makeFlatVector(1'000, [](auto row) { return row; }), }); @@ -746,7 +746,7 @@ TEST_F(TaskTest, singleThreadedExecution) { uint64_t numCreatedTasks = Task::numCreatedTasks(); uint64_t numDeletedTasks = Task::numDeletedTasks(); { - auto [task, results] = executeSingleThreaded(plan); + auto [task, results] = executeSerial(plan); assertEqualResults( std::vector{expectedResult, expectedResult}, results); } @@ -779,7 +779,7 @@ TEST_F(TaskTest, singleThreadedExecution) { ++numCreatedTasks; ++numDeletedTasks; { - auto [task, results] = executeSingleThreaded(plan); + auto [task, results] = executeSerial(plan); assertEqualResults({expectedResult}, results); } ASSERT_EQ(numCreatedTasks + 1, Task::numCreatedTasks()); @@ -799,16 +799,16 @@ TEST_F(TaskTest, singleThreadedExecution) { { auto [task, results] = - executeSingleThreaded(plan, {{scanId, {filePath->getPath()}}}); + executeSerial(plan, {{scanId, {filePath->getPath()}}}); assertEqualResults({expectedResult}, results); } // Query failure. plan = PlanBuilder().values({data, data}).project({"c0 / 0"}).planFragment(); - VELOX_ASSERT_THROW(executeSingleThreaded(plan), "division by zero"); + VELOX_ASSERT_THROW(executeSerial(plan), "division by zero"); } -TEST_F(TaskTest, singleThreadedHashJoin) { +TEST_F(TaskTest, serialHashJoin) { auto left = makeRowVector( {"t_c0", "t_c1"}, { @@ -850,7 +850,7 @@ TEST_F(TaskTest, singleThreadedHashJoin) { }); { - auto [task, results] = executeSingleThreaded( + auto [task, results] = executeSerial( plan, {{leftScanId, {leftPath->getPath()}}, {rightScanId, {rightPath->getPath()}}}); @@ -858,7 +858,7 @@ TEST_F(TaskTest, singleThreadedHashJoin) { } } -TEST_F(TaskTest, singleThreadedCrossJoin) { +TEST_F(TaskTest, serialCrossJoin) { auto left = makeRowVector({"t_c0"}, {makeFlatVector({1, 2, 3})}); auto leftPath = TempFilePath::create(); writeToFile(leftPath->getPath(), {left}); @@ -888,7 +888,7 @@ TEST_F(TaskTest, singleThreadedCrossJoin) { }); { - auto [task, results] = executeSingleThreaded( + auto [task, results] = executeSerial( plan, {{leftScanId, {leftPath->getPath()}}, {rightScanId, {rightPath->getPath()}}}); @@ -896,7 +896,7 @@ TEST_F(TaskTest, singleThreadedCrossJoin) { } } -TEST_F(TaskTest, singleThreadedExecutionExternalBlockable) { +TEST_F(TaskTest, serialExecutionExternalBlockable) { exec::Operator::registerOperator( std::make_unique()); auto data = makeRowVector({ @@ -967,7 +967,7 @@ TEST_F(TaskTest, singleThreadedExecutionExternalBlockable) { EXPECT_EQ(3, results.size()); } -TEST_F(TaskTest, supportsSingleThreadedExecution) { +TEST_F(TaskTest, supportSerialExecutionMode) { auto plan = PlanBuilder() .tableScan(ROW({"c0"}, {BIGINT()})) .project({"c0 % 10"}) @@ -980,9 +980,9 @@ TEST_F(TaskTest, supportsSingleThreadedExecution) { core::QueryCtx::create(), Task::ExecutionMode::kSerial); - // PartitionedOutput does not support single threaded execution, therefore the + // PartitionedOutput does not support serial execution mode, therefore the // task doesn't support it either. - ASSERT_FALSE(task->supportsSingleThreadedExecution()); + ASSERT_FALSE(task->supportSerialExecutionMode()); } TEST_F(TaskTest, updateBroadCastOutputBuffers) { @@ -1286,7 +1286,7 @@ DEBUG_ONLY_TEST_F(TaskTest, inconsistentExecutionMode) { { // Scenario 2: Serial execution starts first then kicks in Parallel - // execution. + // execution mode. auto data = makeRowVector({ makeFlatVector(1'000, [](auto row) { return row; }), @@ -1792,9 +1792,7 @@ DEBUG_ONLY_TEST_F(TaskTest, resumeAfterTaskFinish) { waitForAllTasksToBeDeleted(); } -DEBUG_ONLY_TEST_F( - TaskTest, - singleThreadedLongRunningOperatorInTaskReclaimerAbort) { +DEBUG_ONLY_TEST_F(TaskTest, serialLongRunningOperatorInTaskReclaimerAbort) { auto data = makeRowVector({ makeFlatVector(1'000, [](auto row) { return row; }), }); diff --git a/velox/exec/tests/utils/AssertQueryBuilder.cpp b/velox/exec/tests/utils/AssertQueryBuilder.cpp index d22f8b9d2011..b7792d42a43e 100644 --- a/velox/exec/tests/utils/AssertQueryBuilder.cpp +++ b/velox/exec/tests/utils/AssertQueryBuilder.cpp @@ -62,13 +62,13 @@ AssertQueryBuilder& AssertQueryBuilder::destination(int32_t destination) { return *this; } -AssertQueryBuilder& AssertQueryBuilder::singleThreaded(bool singleThreaded) { - if (singleThreaded) { - params_.singleThreaded = true; +AssertQueryBuilder& AssertQueryBuilder::serialExecution(bool serial) { + if (serial) { + params_.serialExecution = true; executor_ = nullptr; return *this; } - params_.singleThreaded = false; + params_.serialExecution = false; executor_ = newExecutor(); return *this; } diff --git a/velox/exec/tests/utils/AssertQueryBuilder.h b/velox/exec/tests/utils/AssertQueryBuilder.h index 1358af608ede..257fe624a7f8 100644 --- a/velox/exec/tests/utils/AssertQueryBuilder.h +++ b/velox/exec/tests/utils/AssertQueryBuilder.h @@ -39,9 +39,9 @@ class AssertQueryBuilder { /// Default is 0. AssertQueryBuilder& destination(int32_t destination); - /// Use single-threaded execution to execute the Velox plan. + /// Use serial execution mode to execute the Velox plan. /// Default is false. - AssertQueryBuilder& singleThreaded(bool singleThreaded); + AssertQueryBuilder& serialExecution(bool serial); /// Set configuration property. May be called multiple times to set multiple /// properties. diff --git a/velox/exec/tests/utils/Cursor.cpp b/velox/exec/tests/utils/Cursor.cpp index 00942857eda2..631c5bf525cf 100644 --- a/velox/exec/tests/utils/Cursor.cpp +++ b/velox/exec/tests/utils/Cursor.cpp @@ -214,10 +214,10 @@ class MultiThreadedTaskCursor : public TaskCursorBase { maxDrivers_{params.maxDrivers}, numConcurrentSplitGroups_{params.numConcurrentSplitGroups}, numSplitGroups_{params.numSplitGroups} { - VELOX_CHECK(!params.singleThreaded) + VELOX_CHECK(!params.serialExecution) VELOX_CHECK( queryCtx_->isExecutorSupplied(), - "Executor should be set in multi-threaded task cursor") + "Executor should be set in parallel task cursor") queue_ = std::make_shared(params.bufferedBytes); // Captured as a shared_ptr by the consumer callback of task_. @@ -322,10 +322,10 @@ class SingleThreadedTaskCursor : public TaskCursorBase { public: explicit SingleThreadedTaskCursor(const CursorParameters& params) : TaskCursorBase(params, nullptr) { - VELOX_CHECK(params.singleThreaded) + VELOX_CHECK(params.serialExecution) VELOX_CHECK( !queryCtx_->isExecutorSupplied(), - "Executor should not be set in single-threaded task cursor") + "Executor should not be set in serial task cursor") task_ = Task::create( taskId_, @@ -339,8 +339,8 @@ class SingleThreadedTaskCursor : public TaskCursorBase { } VELOX_CHECK( - task_->supportsSingleThreadedExecution(), - "Plan doesn't support single-threaded execution") + task_->supportSerialExecutionMode(), + "Plan doesn't support serial execution mode") } ~SingleThreadedTaskCursor() override { @@ -402,7 +402,7 @@ class SingleThreadedTaskCursor : public TaskCursorBase { }; std::unique_ptr TaskCursor::create(const CursorParameters& params) { - if (params.singleThreaded) { + if (params.serialExecution) { return std::make_unique(params); } return std::make_unique(params); diff --git a/velox/exec/tests/utils/Cursor.h b/velox/exec/tests/utils/Cursor.h index 6f4a0de7d2a9..314eb8958760 100644 --- a/velox/exec/tests/utils/Cursor.h +++ b/velox/exec/tests/utils/Cursor.h @@ -63,8 +63,8 @@ struct CursorParameters { bool copyResult = true; - /// If true, use single threaded execution. - bool singleThreaded = false; + /// If true, use serial execution mode. Use parallel execution mode otherwise. + bool serialExecution = false; /// If both 'queryConfigs' and 'queryCtx' are specified, the configurations in /// 'queryCtx' will be overridden by 'queryConfig'.