diff --git a/velox/common/base/Counters.cpp b/velox/common/base/Counters.cpp index 8d4cd95d21b4..779233ce0aa8 100644 --- a/velox/common/base/Counters.cpp +++ b/velox/common/base/Counters.cpp @@ -20,6 +20,13 @@ namespace facebook::velox { void registerVeloxMetrics() { + /// ================== Task Execution Counters ================= + // The number of driver yield count when exceeds the per-driver cpu time slice + // limit if enforced. + DEFINE_METRIC(kMetricDriverYieldCount, facebook::velox::StatType::COUNT); + + /// ================== Cache Counters ================= + // Tracks hive handle generation latency in range of [0, 100s] and reports // P50, P90, P99, and P100. DEFINE_HISTOGRAM_METRIC( diff --git a/velox/common/base/Counters.h b/velox/common/base/Counters.h index aeb93ee2cfdd..e8c4f152944c 100644 --- a/velox/common/base/Counters.h +++ b/velox/common/base/Counters.h @@ -67,6 +67,9 @@ constexpr folly::StringPiece kMetricArbitratorArbitrationTimeMs{ constexpr folly::StringPiece kMetricArbitratorFreeCapacityBytes{ "velox.arbitrator_free_capacity_bytes"}; +constexpr folly::StringPiece kMetricDriverYieldCount{ + "velox.driver_yield_count"}; + constexpr folly::StringPiece kMetricSpilledInputBytes{ "velox.spill_input_bytes"}; diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 23440ffcaef7..f4f6b826d11a 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -312,11 +312,11 @@ class QueryConfig { static constexpr const char* kSparkBloomFilterExpectedNumItems = "spark.bloom_filter.expected_num_items"; - // The default number of bits to use for the bloom filter. + /// The default number of bits to use for the bloom filter. static constexpr const char* kSparkBloomFilterNumBits = "spark.bloom_filter.num_bits"; - // The max number of bits to use for the bloom filter. + /// The max number of bits to use for the bloom filter. static constexpr const char* kSparkBloomFilterMaxNumBits = "spark.bloom_filter.max_num_bits"; @@ -358,6 +358,12 @@ class QueryConfig { static constexpr const char* kMaxSplitPreloadPerDriver = "max_split_preload_per_driver"; + /// If not zero, specifies the cpu time slice limit in ms that a driver thread + /// can continuously run without yielding. If it is zero, then there is no + /// limit. + static constexpr const char* kDriverCpuTimeSliceLimitMs = + "driver_cpu_time_slice_limit_ms"; + uint64_t queryMaxMemoryPerNode() const { return toCapacity( get(kQueryMaxMemoryPerNode, "0B"), CapacityUnit::BYTE); @@ -714,6 +720,10 @@ class QueryConfig { return get(kMaxSplitPreloadPerDriver, 2); } + uint32_t driverCpuTimeSliceLimitMs() const { + return get(kDriverCpuTimeSliceLimitMs, 0); + } + template T get(const std::string& key, const T& defaultValue) const { return config_->get(key, defaultValue); diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index 275f894b1b5d..5c6b115246a1 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -118,6 +118,11 @@ Generic Configuration - true - Whether to enable caches in expression evaluation. If set to true, optimizations including vector pools and evalWithMemo are enabled. + * - driver_cpu_time_slice_limit_ms + - integer + - 0 + - If it is not zero, specifies the time limit that a driver can continuously + run on a thread before yield. If it is zero, then it no limit. .. _expression-evaluation-conf: diff --git a/velox/docs/metrics.rst b/velox/docs/metrics.rst index 219fa278a196..c1bbe30076cb 100644 --- a/velox/docs/metrics.rst +++ b/velox/docs/metrics.rst @@ -50,6 +50,20 @@ in max bucket. It also allows to specify the value percentiles to report for monitoring. This allows BaseStatsReporter and the backend monitoring service to optimize the aggregated data storage. +Task Execution +-------------- +.. list-table:: + :widths: 40 10 50 + :header-rows: 1 + + * - Metric Name + - Type + - Description + * - driver_yield_count + - Count + - The number of times that a driver has yielded from the thread when it + hits the per-driver cpu time slice limit if enforced. + Memory Management ----------------- diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 86b13c7b1a88..6ec52b1c54b8 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -19,6 +19,8 @@ #include #include #include +#include "velox/common/base/Counters.h" +#include "velox/common/base/StatsReporter.h" #include "velox/common/process/TraceContext.h" #include "velox/common/testutil/TestValue.h" #include "velox/common/time/Timer.h" @@ -44,7 +46,7 @@ class CancelGuard { } ~CancelGuard() { - bool onTerminateCalled = false; + bool onTerminateCalled{false}; if (isThrow_) { // Runtime error. Driver is on thread, hence safe. state_->isTerminated = true; @@ -55,10 +57,11 @@ class CancelGuard { } private: - Task* task_; - ThreadState* state_; - std::function onTerminate_; - bool isThrow_ = true; + Task* const task_; + ThreadState* const state_; + const std::function onTerminate_; + + bool isThrow_{true}; }; // Checks if output channel is produced using identity projection and returns @@ -257,6 +260,7 @@ void Driver::init( std::vector> operators) { VELOX_CHECK_NULL(ctx_); ctx_ = std::move(ctx); + cpuSliceMs_ = ctx_->queryConfig().driverCpuTimeSliceLimitMs(); VELOX_CHECK(operators_.empty()); operators_ = std::move(operators); curOperatorId_ = operators_.size() - 1; @@ -434,6 +438,13 @@ CpuWallTiming Driver::processLazyTiming( timing.cpuNanos >= cpuDelta ? timing.cpuNanos - cpuDelta : 0}; } +bool Driver::shouldYield() const { + if (cpuSliceMs_ == 0) { + return false; + } + return execTimeMs() >= cpuSliceMs_; +} + StopReason Driver::runInternal( std::shared_ptr& self, std::shared_ptr& blockingState, @@ -441,7 +452,8 @@ StopReason Driver::runInternal( const auto now = getCurrentTimeMicro(); const auto queuedTime = (now - queueTimeStartMicros_) * 1'000; // Update the next operator's queueTime. - auto stop = closed_ ? StopReason::kTerminate : task()->enter(state_, now); + StopReason stop = + closed_ ? StopReason::kTerminate : task()->enter(state_, now); if (stop != StopReason::kNone) { if (stop == StopReason::kTerminate) { // ctx_ still has a reference to the Task. 'this' is not on @@ -501,7 +513,18 @@ StopReason Driver::runInternal( return stop; } - auto op = operators_[i].get(); + auto* op = operators_[i].get(); + + if (FOLLY_UNLIKELY(shouldYield())) { + recordYieldCount(); + stop = StopReason::kYield; + future = ContinueFuture{folly::Unit{}}; + blockingState = std::make_shared( + self, std::move(future), op, blockingReason_); + guard.notThrown(); + return stop; + } + // In case we are blocked, this index will point to the operator, whose // queuedTime we should update. curOperatorId_ = i; @@ -711,6 +734,18 @@ StopReason Driver::runInternal( #undef CALL_OPERATOR +// static +std::atomic_uint64_t& Driver::yieldCount() { + static std::atomic_uint64_t count{0}; + return count; +} + +// static +void Driver::recordYieldCount() { + ++yieldCount(); + RECORD_METRIC_VALUE(kMetricDriverYieldCount); +} + // static void Driver::run(std::shared_ptr self) { process::TraceContext trace("Driver::run"); diff --git a/velox/exec/Driver.h b/velox/exec/Driver.h index aa614afe4126..fcac70efbbae 100644 --- a/velox/exec/Driver.h +++ b/velox/exec/Driver.h @@ -37,21 +37,21 @@ struct OperatorStats; class Task; enum class StopReason { - // Keep running. + /// Keep running. kNone, - // Go off thread and do not schedule more activity. + /// Go off thread and do not schedule more activity. kPause, - // Stop and free all. This is returned once and the thread that gets - // this value is responsible for freeing the state associated with - // the thread. Other threads will get kAlreadyTerminated after the - // first thread has received kTerminate. + /// Stop and free all. This is returned once and the thread that gets this + /// value is responsible for freeing the state associated with the thread. + /// Other threads will get kAlreadyTerminated after the first thread has + /// received kTerminate. kTerminate, kAlreadyTerminated, - // Go off thread and then enqueue to the back of the runnable queue. + /// Go off thread and then enqueue to the back of the runnable queue. kYield, - // Must wait for external events. + /// Must wait for external events. kBlock, - // No more data to produce. + /// No more data to produce. kAtEnd, kAlreadyOnThread }; @@ -60,53 +60,56 @@ std::string stopReasonString(StopReason reason); std::ostream& operator<<(std::ostream& out, const StopReason& reason); -// Represents a Driver's state. This is used for cancellation, forcing -// release of and for waiting for memory. The fields are serialized on -// the mutex of the Driver's Task. -// -// The Driver goes through the following states: -// Not on thread. It is created and has not started. All flags are false. -// -// Enqueued - The Driver is added to an executor but does not yet have a thread. -// isEnqueued is true. Next states are terminated or on thread. -// -// On thread - 'thread' is set to the thread that is running the Driver. Next -// states are blocked, terminated, suspended, enqueued. -// -// Blocked - The Driver is not on thread and is waiting for an external event. -// Next states are terminated, enqueued. -// -// Suspended - The Driver is on thread, 'thread' and 'isSuspended' are set. The -// thread does not manipulate the Driver's state and is suspended as in waiting -// for memory or out of process IO. This is different from Blocked in that here -// we keep the stack so that when the wait is over the control stack is not -// lost. Next states are on thread or terminated. -// -// Terminated - 'isTerminated' is set. The Driver cannot run after this and -// the state is final. -// -// CancelPool allows terminating or pausing a set of Drivers. The Task API -// allows starting or resuming Drivers. When terminate is requested the request -// is successful when all Drivers are off thread, blocked or suspended. When -// pause is requested, we have success when all Drivers are either enqueued, -// suspended, off thread or blocked. +/// Represents a Driver's state. This is used for cancellation, forcing +/// release of and for waiting for memory. The fields are serialized on +/// the mutex of the Driver's Task. +/// +/// The Driver goes through the following states: +/// Not on thread. It is created and has not started. All flags are false. +/// +/// Enqueued - The Driver is added to an executor but does not yet have a +/// thread. isEnqueued is true. Next states are terminated or on thread. +/// +/// On thread - 'thread' is set to the thread that is running the Driver. Next +/// states are blocked, terminated, suspended, enqueued. +/// +/// Blocked - The Driver is not on thread and is waiting for an external event. +/// Next states are terminated, enqueued. +/// +/// Suspended - The Driver is on thread, 'thread' and 'isSuspended' are set. The +/// thread does not manipulate the Driver's state and is suspended as in waiting +/// for memory or out of process IO. This is different from Blocked in that here +/// we keep the stack so that when the wait is over the control stack is not +/// lost. Next states are on thread or terminated. +/// +/// Terminated - 'isTerminated' is set. The Driver cannot run after this and +/// the state is final. +/// +/// CancelPool allows terminating or pausing a set of Drivers. The Task API +/// allows starting or resuming Drivers. When terminate is requested the request +/// is successful when all Drivers are off thread, blocked or suspended. When +/// pause is requested, we have success when all Drivers are either enqueued, +/// suspended, off thread or blocked. struct ThreadState { - // The thread currently running this. + /// The thread currently running this. std::atomic thread{std::thread::id()}; - // The tid of 'thread'. Allows finding the thread in a debugger. + /// The tid of 'thread'. Allows finding the thread in a debugger. std::atomic tid{0}; - // True if queued on an executor but not on thread. + /// True if queued on an executor but not on thread. std::atomic isEnqueued{false}; - // True if being terminated or already terminated. + /// True if being terminated or already terminated. std::atomic isTerminated{false}; - // True if there is a future outstanding that will schedule this on an - // executor thread when some promise is realized. + /// True if there is a future outstanding that will schedule this on an + /// executor thread when some promise is realized. bool hasBlockingFuture{false}; - // True if on thread but in a section waiting for RPC or memory - // strategy decision. The thread is not supposed to access its - // memory, which a third party can revoke while the thread is in - // this state. + /// True if on thread but in a section waiting for RPC or memory strategy + /// decision. The thread is not supposed to access its memory, which a third + /// party can revoke while the thread is in this state. bool isSuspended{false}; + /// The start execution time on thread in milliseconds. It is reset when the + /// driver goes off thread. This is used to track the time that a driver has + /// continuously run on a thread for per-driver cpu time slice enforcement. + size_t startExecTimeMs{0}; bool isOnThread() const { return thread != std::thread::id(); @@ -114,6 +117,7 @@ struct ThreadState { void setThread() { thread = std::this_thread::get_id(); + startExecTimeMs = getCurrentTimeMs(); #if !defined(__APPLE__) // This is a debugging feature disabled on the Mac since syscall // is deprecated on that platform. @@ -123,9 +127,20 @@ struct ThreadState { void clearThread() { thread = std::thread::id(); // no thread. + startExecTimeMs = 0; tid = 0; } + /// Returns the driver execution time on thread. Returns zero if the driver + /// is currently not running on thread. + size_t execTimeMs() const { + if (startExecTimeMs == 0) { + VELOX_CHECK(!isOnThread()); + return 0; + } + return getCurrentTimeMs() - startExecTimeMs; + } + std::string toJsonString() const { folly::dynamic obj = folly::dynamic::object; obj["onThread"] = std::to_string(isOnThread()); @@ -134,6 +149,7 @@ struct ThreadState { obj["isEnqueued"] = isEnqueued.load(); obj["hasBlockingFuture"] = hasBlockingFuture; obj["isSuspended"] = isSuspended; + obj["startExecTime"] = startExecTimeMs; return folly::toPrettyJson(obj); } }; @@ -323,6 +339,12 @@ class Driver : public std::enable_shared_from_this { return state_.isOnThread(); } + /// Returns the time in ms since this driver started execution on thread. The + /// function returns zero if this driver is off-thread. + uint64_t execTimeMs() const { + return state_.execTimeMs(); + } + bool isTerminated() const { return state_.isTerminated; } @@ -333,6 +355,10 @@ class Driver : public std::enable_shared_from_this { return state_; } + /// Returns true if this driver is running on thread and has exceeded the cpu + /// time slice limit if set. + bool shouldYield() const; + void initializeOperatorStats(std::vector& stats); /// Close operators and add operator stats to the task. @@ -387,6 +413,9 @@ class Driver : public std::enable_shared_from_this { return blockingReason_; } + /// Returns the process-wide number of driver cpu yields. + static std::atomic_uint64_t& yieldCount(); + static std::shared_ptr testingCreate( std::unique_ptr ctx = nullptr) { auto driver = new Driver(); @@ -400,6 +429,9 @@ class Driver : public std::enable_shared_from_this { private: Driver() = default; + // Invoked to record the driver cpu yield count. + static void recordYieldCount(); + void init( std::unique_ptr driverCtx, std::vector> operators); @@ -440,6 +472,9 @@ class Driver : public std::enable_shared_from_this { std::unique_ptr ctx_; + // If not zero, specifies the driver cpu time slice. + size_t cpuSliceMs_{0}; + bool operatorsInitialized_{false}; std::atomic_bool closed_{false}; diff --git a/velox/exec/tests/DriverTest.cpp b/velox/exec/tests/DriverTest.cpp index 92024f2e258e..4dbf664226d1 100644 --- a/velox/exec/tests/DriverTest.cpp +++ b/velox/exec/tests/DriverTest.cpp @@ -20,6 +20,7 @@ #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/testutil/TestValue.h" #include "velox/dwio/common/tests/utils/BatchMaker.h" +#include "velox/exec/PlanNodeStats.h" #include "velox/exec/Values.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/Cursor.h" @@ -1363,3 +1364,57 @@ DEBUG_ONLY_TEST_F(DriverTest, nonReclaimableSection) { auto plan = PlanBuilder().values(batches).planNode(); ASSERT_NO_THROW(AssertQueryBuilder(plan).copyResults(pool())); } + +DEBUG_ONLY_TEST_F(DriverTest, driverCpuTimeSlicingCheck) { + const int numBatches = 3; + std::vector batches; + for (int i = 0; i < numBatches; ++i) { + batches.push_back( + makeRowVector({"c0"}, {makeFlatVector({1, 2, 3})})); + } + + for (const auto& hasCpuTimeSliceLimit : {false, true}) { + SCOPED_TRACE(fmt::format("hasCpuSliceLimit: {}", hasCpuTimeSliceLimit)); + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Values::getOutput", + std::function([&](const exec::Values* + values) { + // Verify that no matter driver cpu time slicing is enforced or not, + // the driver start execution time is set properly. + ASSERT_NE( + values->testingOperatorCtx()->driver()->state().startExecTimeMs, + 0); + if (hasCpuTimeSliceLimit) { + std::this_thread::sleep_for(std::chrono::seconds(1)); // NOLINT + ASSERT_GT( + values->testingOperatorCtx()->driver()->state().execTimeMs(), + 0); + } + })); + auto planNodeIdGenerator = std::make_shared(); + auto fragment = + PlanBuilder(planNodeIdGenerator).values(batches).planFragment(); + std::unordered_map queryConfig; + if (hasCpuTimeSliceLimit) { + queryConfig.emplace(core::QueryConfig::kDriverCpuTimeSliceLimitMs, "500"); + } + const uint64_t oldYieldCount = Driver::yieldCount(); + auto task = Task::create( + "t0", + fragment, + 0, + std::make_shared( + driverExecutor_.get(), std::move(queryConfig)), + [](RowVectorPtr /*unused*/, ContinueFuture* /*unused*/) { + return exec::BlockingReason::kNotBlocked; + }); + task->start(1, 1); + ASSERT_TRUE(waitForTaskCompletion(task.get(), 600'000'000)); + if (hasCpuTimeSliceLimit) { + // NOTE: there is one additional yield for the empty output. + ASSERT_GE(Driver::yieldCount(), oldYieldCount + numBatches + 1); + } else { + ASSERT_EQ(Driver::yieldCount(), oldYieldCount); + } + } +}