Check task pause first when driver leave suspended state (facebookincubator#11006)

Summary:
Pull Request resolved: facebookincubator#11006

When a driver thread leaves the suspended state, it first checks whether the task has been terminated.
If it has, the thread returns without waiting for the task to be resumed. This assumes that the driver
thread only leaves the suspended state when there is no spilling on the associated query task. That
assumption might not always hold with the global arbitration optimization, which decouples the memory
arbitration request from the memory arbitration operation. So we need to let the driver thread wait
until the task has been resumed. Otherwise, the driver thread might continue execution after leaving
the suspended state until it next checks the task state, which might cause concurrent updates to the
operator state.

The sequence that triggers this with the global arbitration optimization:
T1. The driver tries to reserve memory (which puts the driver thread in the reclaimable state), and this
triggers memory arbitration.
T2. The driver's memory arbitration succeeds and the driver is about to leave the suspended state.
T3. The background global memory arbitration kicks off and tries to reclaim from the driver, as it is in
the reclaimable state. The memory arbitration pauses the task execution.
T4. The task is terminated by the coordinator for some reason.
T5. The driver thread tries to leave the suspended state, realizes that the task has been terminated, and
so just leaves the suspended state.
T6. The driver thread continues execution after the memory reservation, as it does not notice that the
task has been terminated.
Given that, the driver execution and the spill could both operate on the same operator state in parallel.

This PR changes the driver thread to wait for the task resume signal first when leaving the suspended
state in T5. A unit test is added, and the change is verified with global arbitration optimization shadow.
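
The ordering change can be illustrated with a small, self-contained sketch. It is not the Velox implementation: `Action`, `TaskFlags`, `leaveSuspendedOld`, and `leaveSuspendedNew` are hypothetical names; the nested-suspension check (`numSuspensions > 1`), locking, and thread bookkeeping are omitted; and the actual wait is reduced to a return value.

```cpp
// Hypothetical, simplified stand-ins for illustration only; these are not
// the actual Velox types or functions.
enum class Action { kReturnNone, kReturnTerminate, kWaitForResume };

struct TaskFlags {
  bool pauseRequested;
  bool terminateRequested;
};

// Old ordering: termination is checked before the pause state, so a
// terminated task lets the driver leave even while the task is paused.
Action leaveSuspendedOld(const TaskFlags& flags) {
  if (flags.terminateRequested) {
    return Action::kReturnTerminate;
  }
  if (!flags.pauseRequested) {
    return Action::kReturnNone;
  }
  return Action::kWaitForResume;
}

// New ordering: the pause state is checked first. Termination is only
// reported once no pause is pending; otherwise the driver has to wait.
Action leaveSuspendedNew(const TaskFlags& flags) {
  if (!flags.pauseRequested) {
    return flags.terminateRequested ? Action::kReturnTerminate
                                    : Action::kReturnNone;
  }
  return Action::kWaitForResume;
}
```

With both flags set (the state reached at T5), `leaveSuspendedOld` returns `Action::kReturnTerminate` immediately, whereas `leaveSuspendedNew` returns `Action::kWaitForResume`, so the driver cannot run concurrently with the reclaim.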

Reviewed By: tanjialiang, oerling

Differential Revision: D62727855

fbshipit-source-id: c711e8c7c90873a6ea14ec249f78aa0e071f724b
xiaoxmeng authored and facebook-github-bot committed Sep 17, 2024
1 parent 6d3fbfe commit a86ad08
Showing 3 changed files with 52 additions and 8 deletions.
15 changes: 8 additions & 7 deletions velox/exec/Task.cpp
@@ -2703,18 +2703,19 @@ StopReason Task::leaveSuspended(ThreadState& state) {
       ++numThreads_;
     }
   });
-  if (state.isTerminated) {
-    return StopReason::kAlreadyTerminated;
-  }
-  if (terminateRequested_) {
-    state.isTerminated = true;
-    return StopReason::kTerminate;
-  }
   if (state.numSuspensions > 1 || !pauseRequested_) {
+    if (state.isTerminated) {
+      return StopReason::kAlreadyTerminated;
+    }
+    if (terminateRequested_) {
+      state.isTerminated = true;
+      return StopReason::kTerminate;
+    }
     // If we have more than one suspension requests on this driver thread or
     // the task has been resumed, then we return here.
     return StopReason::kNone;
   }
+
   VELOX_CHECK_GT(state.numSuspensions, 0);
   VELOX_CHECK_GE(numThreads_, 0);
   leaveGuard.dismiss();
2 changes: 1 addition & 1 deletion velox/exec/Task.h
@@ -1120,7 +1120,7 @@ class Task : public std::enable_shared_from_this<Task> {
   // queued split groups.
   std::queue<uint32_t> queuedSplitGroups_;

-  TaskState state_ = TaskState::kRunning;
+  TaskState state_{TaskState::kRunning};

   // Stores splits state structure for each plan node. At construction populated
   // with all leaf plan nodes that require splits. Afterwards accessed with
43 changes: 43 additions & 0 deletions velox/exec/tests/DriverTest.cpp
@@ -1416,6 +1416,49 @@ DEBUG_ONLY_TEST_F(DriverTest, driverSuspensionCalledFromOffThread) {
   VELOX_ASSERT_THROW(driver->task()->leaveSuspended(driver->state()), "");
 }

+// This test case verifies that a driver thread which tries to leave the
+// suspended state after the task terminates waits until the task is resumed.
+DEBUG_ONLY_TEST_F(DriverTest, driverSuspendedAfterTaskTerminateBeforeResume) {
+  std::shared_ptr<Driver> driver;
+  std::atomic_bool triggerSuspended{false};
+  std::atomic_bool taskPaused{false};
+  // std::atomic_bool driverExecutionWaitFlag{true};
+  folly::EventCount taskPausedWait;
+  std::atomic_bool driverLeaveSuspended{false};
+  SCOPED_TESTVALUE_SET(
+      "facebook::velox::exec::Values::getOutput",
+      std::function<void(const exec::Values*)>([&](const exec::Values* values) {
+        if (triggerSuspended.exchange(true)) {
+          return;
+        }
+        driver = values->testingOperatorCtx()->driver()->shared_from_this();
+        driver->task()->enterSuspended(driver->state());
+        driver->task()->requestPause().wait();
+        taskPaused = true;
+        taskPausedWait.notifyAll();
+        const StopReason ret = driver->task()->leaveSuspended(driver->state());
+        ASSERT_EQ(ret, StopReason::kAlreadyTerminated);
+        driverLeaveSuspended = true;
+      }));
+
+  auto task = createAndStartTaskToReadValues(1);
+
+  taskPausedWait.await([&]() { return taskPaused.load(); });
+  task->requestCancel().wait();
+  // Wait for 1 second and check that the driver is still in the suspended
+  // state while the task has not been resumed.
+  std::this_thread::sleep_for(std::chrono::milliseconds(1'000));
+  ASSERT_FALSE(driverLeaveSuspended);
+
+  Task::resume(task);
+  std::this_thread::sleep_for(std::chrono::milliseconds(1'000));
+  // Check that the driver leaves the suspended state after the task is
+  // resumed. The 1 second wait above avoids timing flakiness.
+  ASSERT_TRUE(driverLeaveSuspended);
+
+  ASSERT_TRUE(waitForTaskCancelled(task.get(), 100'000'000));
+}
+
 DEBUG_ONLY_TEST_F(DriverTest, driverThreadContext) {
   ASSERT_TRUE(driverThreadContext() == nullptr);
   std::thread nonDriverThread(