Skip to content

Commit

Permalink
Properly report lazy loaded inputBytes (facebookincubator#11097)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: facebookincubator#11097

Whenever there is lazy loading, TableScan would come out with zero
input and output bytes, and they would be attributed to the operator that in
fact loaded the lazy vector. Using the existing mechanism to save it locally
and periodically apply to the correct source operator.

Reviewed By: xiaoxmeng

Differential Revision: D63414708
  • Loading branch information
pedroerp authored and facebook-github-bot committed Sep 27, 2024
1 parent 28a8979 commit faaccef
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 36 deletions.
2 changes: 2 additions & 0 deletions velox/common/time/CpuWallTimer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "velox/common/time/CpuWallTimer.h"

namespace facebook::velox {

CpuWallTimer::CpuWallTimer(CpuWallTiming& timing) : timing_(timing) {
++timing_.count;
cpuTimeStart_ = process::threadCpuNanos();
Expand All @@ -29,4 +30,5 @@ CpuWallTimer::~CpuWallTimer() {
std::chrono::steady_clock::now() - wallTimeStart_);
timing_.wallNanos += duration.count();
}

} // namespace facebook::velox
34 changes: 22 additions & 12 deletions velox/common/time/CpuWallTimer.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,14 @@ class CpuWallTimer {
CpuWallTiming& timing_;
};

// Keeps track of elapsed CPU and wall time from construction time.
// Composes delta CpuWallTiming upon destruction and passes it to the user
// callback, where it can be added to the user's CpuWallTiming using
// CpuWallTiming::add().
template <typename F>
class DeltaCpuWallTimer {
/// Keeps track of elapsed CPU and wall time from construction time.
class DeltaCpuWallTimeStopWatch {
public:
explicit DeltaCpuWallTimer(F&& func)
explicit DeltaCpuWallTimeStopWatch()
: wallTimeStart_(std::chrono::steady_clock::now()),
cpuTimeStart_(process::threadCpuNanos()),
func_(std::move(func)) {}
cpuTimeStart_(process::threadCpuNanos()) {}

~DeltaCpuWallTimer() {
CpuWallTiming elapsed() const {
// NOTE: End the cpu-time timing first, and then end the wall-time timing,
// so as to avoid the counter-intuitive phenomenon that the final calculated
// cpu-time is slightly larger than the wall-time.
Expand All @@ -78,15 +73,30 @@ class DeltaCpuWallTimer {
std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::steady_clock::now() - wallTimeStart_)
.count();
const CpuWallTiming deltaTiming{1, wallTimeDuration, cpuTimeDuration};
func_(deltaTiming);
return CpuWallTiming{1, wallTimeDuration, cpuTimeDuration};
}

private:
// NOTE: Put `wallTimeStart_` before `cpuTimeStart_`, so that wall-time starts
// counting earlier than cpu-time.
const std::chrono::steady_clock::time_point wallTimeStart_;
const uint64_t cpuTimeStart_;
};

/// Composes delta CpuWallTiming upon destruction and passes it to the user
/// callback, where it can be added to the user's CpuWallTiming using
/// CpuWallTiming::add().
template <typename F>
class DeltaCpuWallTimer {
public:
explicit DeltaCpuWallTimer(F&& func) : func_(std::move(func)) {}

~DeltaCpuWallTimer() {
func_(timer_.elapsed());
}

private:
DeltaCpuWallTimeStopWatch timer_;
F func_;
};

Expand Down
26 changes: 22 additions & 4 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,20 +399,22 @@ size_t OpCallStatusRaw::callDuration() const {
: fmt::format("null::{}", operatorMethod);
}

CpuWallTiming Driver::processLazyTiming(
CpuWallTiming Driver::processLazyIoStats(
Operator& op,
const CpuWallTiming& timing) {
if (&op == operators_[0].get()) {
return timing;
}
auto lockStats = op.stats().wlock();

// Checks and tries to update cpu time from lazy loads.
auto it = lockStats->runtimeStats.find(LazyVector::kCpuNanos);
if (it == lockStats->runtimeStats.end()) {
// Return early if no lazy activity. Lazy CPU and wall times are recorded
// together, checking one is enough.
return timing;
}
int64_t cpu = it->second.sum;
const int64_t cpu = it->second.sum;
auto cpuDelta = std::max<int64_t>(0, cpu - lockStats->lastLazyCpuNanos);
if (cpuDelta == 0) {
// Return early if no change. Checking one counter is enough. If this did
Expand All @@ -421,15 +423,29 @@ CpuWallTiming Driver::processLazyTiming(
return timing;
}
lockStats->lastLazyCpuNanos = cpu;

// Checks and tries to update wall time from lazy loads.
int64_t wallDelta = 0;
it = lockStats->runtimeStats.find(LazyVector::kWallNanos);
if (it != lockStats->runtimeStats.end()) {
int64_t wall = it->second.sum;
const int64_t wall = it->second.sum;
wallDelta = std::max<int64_t>(0, wall - lockStats->lastLazyWallNanos);
if (wallDelta > 0) {
lockStats->lastLazyWallNanos = wall;
}
}

// Checks and tries to update input bytes from lazy loads.
int64_t inputBytesDelta = 0;
it = lockStats->runtimeStats.find(LazyVector::kInputBytes);
if (it != lockStats->runtimeStats.end()) {
const int64_t inputBytes = it->second.sum;
inputBytesDelta = inputBytes - lockStats->lastLazyInputBytes;
if (inputBytesDelta > 0) {
lockStats->lastLazyInputBytes = inputBytes;
}
}

lockStats.unlock();
cpuDelta = std::min<int64_t>(cpuDelta, timing.cpuNanos);
wallDelta = std::min<int64_t>(wallDelta, timing.wallNanos);
Expand All @@ -439,6 +455,8 @@ CpuWallTiming Driver::processLazyTiming(
static_cast<uint64_t>(wallDelta),
static_cast<uint64_t>(cpuDelta),
});
lockStats->inputBytes += inputBytesDelta;
lockStats->outputBytes += inputBytesDelta;
return CpuWallTiming{
1,
timing.wallNanos - wallDelta,
Expand Down Expand Up @@ -1016,7 +1034,7 @@ void Driver::withDeltaCpuWallTimer(
// opTimingMember upon destruction of the timer when withDeltaCpuWallTimer
// ends. The timer is created on the stack to avoid heap allocation
auto f = [op, opTimingMember, this](const CpuWallTiming& elapsedTime) {
auto elapsedSelfTime = processLazyTiming(*op, elapsedTime);
auto elapsedSelfTime = processLazyIoStats(*op, elapsedTime);
op->stats().withWLock([&](auto& lockedStats) {
(lockedStats.*opTimingMember).add(elapsedSelfTime);
});
Expand Down
13 changes: 6 additions & 7 deletions velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -532,13 +532,12 @@ class Driver : public std::enable_shared_from_this<Driver> {
TimingMemberPtr opTimingMember,
Func&& opFunction);

// Adjusts 'timing' by removing the lazy load wall and CPU times
// accrued since last time timing information was recorded for
// 'op'. The accrued lazy load times are credited to the source
// operator of 'this'. The per-operator runtimeStats for lazy load
// are left in place to reflect which operator triggered the load
// but these do not bias the op's timing.
CpuWallTiming processLazyTiming(Operator& op, const CpuWallTiming& timing);
// Adjusts 'timing' by removing the lazy load wall time, CPU time, and input
// bytes accrued since last time timing information was recorded for 'op'. The
// accrued lazy load times are credited to the source operator of 'this'. The
// per-operator runtimeStats for lazy load are left in place to reflect which
// operator triggered the load but these do not bias the op's timing.
CpuWallTiming processLazyIoStats(Operator& op, const CpuWallTiming& timing);

inline void validateOperatorOutputResult(
const RowVectorPtr& result,
Expand Down
1 change: 1 addition & 0 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ struct OperatorStats {
// Last recorded values for lazy loading times for loads triggered by 'this'.
int64_t lastLazyCpuNanos{0};
int64_t lastLazyWallNanos{0};
int64_t lastLazyInputBytes{0};

// Total null keys processed by the operator.
// Currently populated only by HashJoin/HashBuild.
Expand Down
3 changes: 3 additions & 0 deletions velox/exec/tests/PrintPlanWithStatsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) {
{{"-- Project\\[4\\]\\[expressions: \\(c0:INTEGER, ROW\\[\"c0\"\\]\\), \\(p1:BIGINT, plus\\(ROW\\[\"c1\"\\],1\\)\\), \\(p2:BIGINT, plus\\(ROW\\[\"c1\"\\],ROW\\[\"u_c1\"\\]\\)\\)\\] -> c0:INTEGER, p1:BIGINT, p2:BIGINT"},
{" Output: 2000 rows \\(.+\\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1"},
{" dataSourceLazyCpuNanos[ ]* sum: .+, count: .+, min: .+, max: .+"},
{" dataSourceLazyInputBytes[ ]* sum: .+, count: .+, min: .+, max: .+"},
{" dataSourceLazyWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"},
{" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"},
{" runningFinishWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"},
Expand Down Expand Up @@ -269,6 +270,7 @@ TEST_F(PrintPlanWithStatsTest, partialAggregateWithTableScan) {
{{"-- Aggregation\\[1\\]\\[PARTIAL \\[c5\\] a0 := max\\(ROW\\[\"c0\"\\]\\), a1 := sum\\(ROW\\[\"c1\"\\]\\), a2 := sum\\(ROW\\[\"c2\"\\]\\), a3 := sum\\(ROW\\[\"c3\"\\]\\), a4 := sum\\(ROW\\[\"c4\"\\]\\)\\] -> c5:VARCHAR, a0:BIGINT, a1:BIGINT, a2:BIGINT, a3:DOUBLE, a4:DOUBLE"},
{" Output: .+, Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1"},
{" dataSourceLazyCpuNanos\\s+sum: .+, count: .+, min: .+, max: .+"},
{" dataSourceLazyInputBytes\\s+sum: .+, count: .+, min: .+, max: .+"},
{" dataSourceLazyWallNanos\\s+sum: .+, count: .+, min: .+, max: .+"},
{" distinctKey0\\s+sum: .+, count: 1, min: .+, max: .+"},
{" hashtable.capacity\\s+sum: 1252, count: 1, min: 1252, max: 1252"},
Expand Down Expand Up @@ -345,6 +347,7 @@ TEST_F(PrintPlanWithStatsTest, tableWriterWithTableScan) {
{{R"(-- TableWrite\[1\]\[.+InsertTableHandle .+)"},
{" Output: .+, Physical written output: .+, Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1"},
{" dataSourceLazyCpuNanos\\s+sum: .+, count: .+, min: .+, max: .+"},
{" dataSourceLazyInputBytes\\s+sum: .+, count: .+, min: .+, max: .+"},
{" dataSourceLazyWallNanos\\s+sum: .+, count: .+, min: .+, max: .+"},
{" numWrittenFiles\\s+sum: .+, count: 1, min: .+, max: .+"},
{" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"},
Expand Down
7 changes: 6 additions & 1 deletion velox/exec/tests/TableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3815,7 +3815,12 @@ TEST_F(TableScanTest, structLazy) {
.project({"cardinality(c2.c0)"})
.planNode();

assertQuery(op, {filePath}, "select c0 % 3 from tmp");
auto task = assertQuery(op, {filePath}, "select c0 % 3 from tmp");

// Ensure lazy stats are attributed to table scan.
const auto stats = task->taskStats();
EXPECT_GT(stats.pipelineStats[0].operatorStats[0].inputBytes, 0);
EXPECT_GT(stats.pipelineStats[0].operatorStats[0].outputBytes, 0);
}

TEST_F(TableScanTest, interleaveLazyEager) {
Expand Down
41 changes: 31 additions & 10 deletions velox/vector/LazyVector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,37 @@
#include "velox/vector/SelectivityVector.h"

namespace facebook::velox {

namespace {
void writeIOTiming(const CpuWallTiming& delta) {
addThreadLocalRuntimeStat(
LazyVector::kWallNanos,
RuntimeCounter(delta.wallNanos, RuntimeCounter::Unit::kNanos));
addThreadLocalRuntimeStat(
LazyVector::kCpuNanos,
RuntimeCounter(delta.cpuNanos, RuntimeCounter::Unit::kNanos));
}

// Convenience class to record cpu and wall time from construction, updating
// thread local stats at destruction, including input bytes of the vector passed
// as parameter.
class LazyIoStatsRecorder {
public:
LazyIoStatsRecorder(VectorPtr* vector) : vector_(vector) {}

~LazyIoStatsRecorder() {
auto cpuDelta = timer_.elapsed();
addThreadLocalRuntimeStat(
LazyVector::kWallNanos,
RuntimeCounter(cpuDelta.wallNanos, RuntimeCounter::Unit::kNanos));
addThreadLocalRuntimeStat(
LazyVector::kCpuNanos,
RuntimeCounter(cpuDelta.cpuNanos, RuntimeCounter::Unit::kNanos));

if (*vector_) {
addThreadLocalRuntimeStat(
LazyVector::kInputBytes,
RuntimeCounter(
(*vector_)->estimateFlatSize(), RuntimeCounter::Unit::kBytes));
}
}

private:
DeltaCpuWallTimeStopWatch timer_;
VectorPtr* vector_;
};

} // namespace

void VectorLoader::load(
Expand All @@ -41,7 +62,7 @@ void VectorLoader::load(
vector_size_t resultSize,
VectorPtr* result) {
{
DeltaCpuWallTimer timer([&](auto& delta) { writeIOTiming(delta); });
LazyIoStatsRecorder recorder(result);
loadInternal(rows, hook, resultSize, result);
}
if (hook) {
Expand Down
2 changes: 2 additions & 0 deletions velox/vector/LazyVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ class LazyVector : public BaseVector {
public:
static constexpr const char* kCpuNanos = "dataSourceLazyCpuNanos";
static constexpr const char* kWallNanos = "dataSourceLazyWallNanos";
static constexpr const char* kInputBytes = "dataSourceLazyInputBytes";

LazyVector(
velox::memory::MemoryPool* pool,
TypePtr type,
Expand Down
6 changes: 4 additions & 2 deletions velox/vector/tests/LazyVectorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -639,9 +639,11 @@ TEST_F(LazyVectorTest, runtimeStats) {
std::sort(stats.begin(), stats.end(), [](auto& x, auto& y) {
return x.first < y.first;
});
ASSERT_EQ(stats.size(), 2);
ASSERT_EQ(stats.size(), 3);
ASSERT_EQ(stats[0].first, LazyVector::kCpuNanos);
ASSERT_GE(stats[0].second.value, 0);
ASSERT_EQ(stats[1].first, LazyVector::kWallNanos);
ASSERT_EQ(stats[1].first, LazyVector::kInputBytes);
ASSERT_GE(stats[1].second.value, 0);
ASSERT_EQ(stats[2].first, LazyVector::kWallNanos);
ASSERT_GE(stats[2].second.value, 0);
}

0 comments on commit faaccef

Please sign in to comment.