Skip to content

Commit

Permalink
Add per-driver cpu time slicing support (apache#8083)
Browse files Browse the repository at this point in the history
Summary:
Support per-driver cpu time slicing. It provides an advisory method
on the driver to check if the driver has been running too long on the
thread. The driver execution framework respects this limit by checking
it when switching operators during driver pipeline execution. Long
running operator methods shall also respect this limit by checking it
inside the operator method execution, such as in table scan and the
spill read path in hash join.

Add one query config, driver_cpu_time_slice_limit_ms, to allow the query
system to configure this limit for query execution, and add a metric,
driver_yield_count, to monitor the cpu yield events in production.

Unit test is added to verify.

The followup is to add a system-wide config in Prestissimo to configure
this for slow queries which are blocked in the queue for too long.

Pull Request resolved: facebookincubator/velox#8083

Reviewed By: Yuhta

Differential Revision: D52240407

Pulled By: xiaoxmeng

fbshipit-source-id: 282ad89bfbae4361302c38d7480c5353756b6acb
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Dec 19, 2023
1 parent 0f32527 commit f0b1ae0
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 58 deletions.
7 changes: 7 additions & 0 deletions velox/common/base/Counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@
namespace facebook::velox {

void registerVeloxMetrics() {
/// ================== Task Execution Counters =================
// The number of times that a driver has yielded from the thread when it
// exceeds the per-driver cpu time slice limit, if enforced.
DEFINE_METRIC(kMetricDriverYieldCount, facebook::velox::StatType::COUNT);

/// ================== Cache Counters =================

// Tracks hive handle generation latency in range of [0, 100s] and reports
// P50, P90, P99, and P100.
DEFINE_HISTOGRAM_METRIC(
Expand Down
3 changes: 3 additions & 0 deletions velox/common/base/Counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ constexpr folly::StringPiece kMetricArbitratorArbitrationTimeMs{
constexpr folly::StringPiece kMetricArbitratorFreeCapacityBytes{
"velox.arbitrator_free_capacity_bytes"};

/// Metric key for the driver yield counter: registered as a COUNT stat in
/// registerVeloxMetrics() and recorded by Driver::recordYieldCount().
constexpr folly::StringPiece kMetricDriverYieldCount{
"velox.driver_yield_count"};

constexpr folly::StringPiece kMetricSpilledInputBytes{
"velox.spill_input_bytes"};

Expand Down
14 changes: 12 additions & 2 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,11 +312,11 @@ class QueryConfig {
static constexpr const char* kSparkBloomFilterExpectedNumItems =
"spark.bloom_filter.expected_num_items";

// The default number of bits to use for the bloom filter.
/// The default number of bits to use for the bloom filter.
static constexpr const char* kSparkBloomFilterNumBits =
"spark.bloom_filter.num_bits";

// The max number of bits to use for the bloom filter.
/// The max number of bits to use for the bloom filter.
static constexpr const char* kSparkBloomFilterMaxNumBits =
"spark.bloom_filter.max_num_bits";

Expand Down Expand Up @@ -358,6 +358,12 @@ class QueryConfig {
static constexpr const char* kMaxSplitPreloadPerDriver =
"max_split_preload_per_driver";

/// If not zero, specifies the cpu time slice limit in ms that a driver thread
/// can continuously run without yielding. If it is zero, then there is no
/// limit.
static constexpr const char* kDriverCpuTimeSliceLimitMs =
"driver_cpu_time_slice_limit_ms";

uint64_t queryMaxMemoryPerNode() const {
return toCapacity(
get<std::string>(kQueryMaxMemoryPerNode, "0B"), CapacityUnit::BYTE);
Expand Down Expand Up @@ -714,6 +720,10 @@ class QueryConfig {
return get<int32_t>(kMaxSplitPreloadPerDriver, 2);
}

/// Returns the value configured under 'kDriverCpuTimeSliceLimitMs', i.e. the
/// per-driver cpu time slice limit in milliseconds. Falls back to 0 when the
/// config is not set; per the config's documentation zero means no limit.
uint32_t driverCpuTimeSliceLimitMs() const {
  constexpr uint32_t kDefaultLimitMs{0};
  return get<uint32_t>(kDriverCpuTimeSliceLimitMs, kDefaultLimitMs);
}

template <typename T>
T get(const std::string& key, const T& defaultValue) const {
return config_->get<T>(key, defaultValue);
Expand Down
5 changes: 5 additions & 0 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ Generic Configuration
- true
- Whether to enable caches in expression evaluation. If set to true, optimizations including vector pools and
evalWithMemo are enabled.
* - driver_cpu_time_slice_limit_ms
- integer
- 0
- If it is not zero, specifies the time limit that a driver can continuously
run on a thread before yielding. If it is zero, then there is no limit.

.. _expression-evaluation-conf:

Expand Down
14 changes: 14 additions & 0 deletions velox/docs/metrics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,20 @@ in max bucket. It also allows to specify the value percentiles to report for
monitoring. This allows BaseStatsReporter and the backend monitoring service to
optimize the aggregated data storage.

Task Execution
--------------
.. list-table::
:widths: 40 10 50
:header-rows: 1

* - Metric Name
- Type
- Description
* - driver_yield_count
- Count
- The number of times that a driver has yielded from the thread when it
hits the per-driver cpu time slice limit if enforced.

Memory Management
-----------------

Expand Down
49 changes: 42 additions & 7 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <folly/executors/QueuedImmediateExecutor.h>
#include <folly/executors/thread_factory/InitThreadFactory.h>
#include <gflags/gflags.h>
#include "velox/common/base/Counters.h"
#include "velox/common/base/StatsReporter.h"
#include "velox/common/process/TraceContext.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/common/time/Timer.h"
Expand All @@ -44,7 +46,7 @@ class CancelGuard {
}

~CancelGuard() {
bool onTerminateCalled = false;
bool onTerminateCalled{false};
if (isThrow_) {
// Runtime error. Driver is on thread, hence safe.
state_->isTerminated = true;
Expand All @@ -55,10 +57,11 @@ class CancelGuard {
}

private:
Task* task_;
ThreadState* state_;
std::function<void(StopReason reason)> onTerminate_;
bool isThrow_ = true;
Task* const task_;
ThreadState* const state_;
const std::function<void(StopReason reason)> onTerminate_;

bool isThrow_{true};
};

// Checks if output channel is produced using identity projection and returns
Expand Down Expand Up @@ -257,6 +260,7 @@ void Driver::init(
std::vector<std::unique_ptr<Operator>> operators) {
VELOX_CHECK_NULL(ctx_);
ctx_ = std::move(ctx);
cpuSliceMs_ = ctx_->queryConfig().driverCpuTimeSliceLimitMs();
VELOX_CHECK(operators_.empty());
operators_ = std::move(operators);
curOperatorId_ = operators_.size() - 1;
Expand Down Expand Up @@ -434,14 +438,22 @@ CpuWallTiming Driver::processLazyTiming(
timing.cpuNanos >= cpuDelta ? timing.cpuNanos - cpuDelta : 0};
}

// Advisory check of whether this driver has used up its cpu time slice.
// 'cpuSliceMs_' is loaded from the query config's driverCpuTimeSliceLimitMs()
// in Driver::init(). Returns false unconditionally when the slice is zero (no
// limit); otherwise returns true once execTimeMs() has reached the slice.
bool Driver::shouldYield() const {
  // Short-circuit keeps execTimeMs() from being evaluated when no limit is
  // configured.
  return cpuSliceMs_ != 0 && execTimeMs() >= cpuSliceMs_;
}

StopReason Driver::runInternal(
std::shared_ptr<Driver>& self,
std::shared_ptr<BlockingState>& blockingState,
RowVectorPtr& result) {
const auto now = getCurrentTimeMicro();
const auto queuedTime = (now - queueTimeStartMicros_) * 1'000;
// Update the next operator's queueTime.
auto stop = closed_ ? StopReason::kTerminate : task()->enter(state_, now);
StopReason stop =
closed_ ? StopReason::kTerminate : task()->enter(state_, now);
if (stop != StopReason::kNone) {
if (stop == StopReason::kTerminate) {
// ctx_ still has a reference to the Task. 'this' is not on
Expand Down Expand Up @@ -501,7 +513,18 @@ StopReason Driver::runInternal(
return stop;
}

auto op = operators_[i].get();
auto* op = operators_[i].get();

if (FOLLY_UNLIKELY(shouldYield())) {
recordYieldCount();
stop = StopReason::kYield;
future = ContinueFuture{folly::Unit{}};
blockingState = std::make_shared<BlockingState>(
self, std::move(future), op, blockingReason_);
guard.notThrown();
return stop;
}

// In case we are blocked, this index will point to the operator, whose
// queuedTime we should update.
curOperatorId_ = i;
Expand Down Expand Up @@ -711,6 +734,18 @@ StopReason Driver::runInternal(

#undef CALL_OPERATOR

// static
// Returns a reference to the single function-local counter that
// recordYieldCount() increments on every driver yield.
std::atomic_uint64_t& Driver::yieldCount() {
  static std::atomic<uint64_t> yields{0};
  return yields;
}

// static
// Accounts for one driver yield: increments the counter returned by
// yieldCount() and reports the 'kMetricDriverYieldCount' metric.
void Driver::recordYieldCount() {
  yieldCount().fetch_add(1);
  RECORD_METRIC_VALUE(kMetricDriverYieldCount);
}

// static
void Driver::run(std::shared_ptr<Driver> self) {
process::TraceContext trace("Driver::run");
Expand Down
Loading

0 comments on commit f0b1ae0

Please sign in to comment.