diff --git a/velox/common/time/CpuWallTimer.cpp b/velox/common/time/CpuWallTimer.cpp
index 7a1e137ba91f..e21944b05d1c 100644
--- a/velox/common/time/CpuWallTimer.cpp
+++ b/velox/common/time/CpuWallTimer.cpp
@@ -17,6 +17,7 @@
 #include "velox/common/time/CpuWallTimer.h"
 
 namespace facebook::velox {
+
 CpuWallTimer::CpuWallTimer(CpuWallTiming& timing) : timing_(timing) {
   ++timing_.count;
   cpuTimeStart_ = process::threadCpuNanos();
@@ -29,4 +30,5 @@ CpuWallTimer::~CpuWallTimer() {
       std::chrono::steady_clock::now() - wallTimeStart_);
   timing_.wallNanos += duration.count();
 }
+
 } // namespace facebook::velox
diff --git a/velox/common/time/CpuWallTimer.h b/velox/common/time/CpuWallTimer.h
index f60f23c19dc9..6562364a2942 100644
--- a/velox/common/time/CpuWallTimer.h
+++ b/velox/common/time/CpuWallTimer.h
@@ -57,19 +57,14 @@ class CpuWallTimer {
   CpuWallTiming& timing_;
 };
 
-// Keeps track of elapsed CPU and wall time from construction time.
-// Composes delta CpuWallTiming upon destruction and passes it to the user
-// callback, where it can be added to the user's CpuWallTiming using
-// CpuWallTiming::add().
-template <typename F>
-class DeltaCpuWallTimer {
+/// Keeps track of elapsed CPU and wall time from construction time.
+class DeltaCpuWallTimeStopWatch {
  public:
-  explicit DeltaCpuWallTimer(F&& func)
+  explicit DeltaCpuWallTimeStopWatch()
       : wallTimeStart_(std::chrono::steady_clock::now()),
-        cpuTimeStart_(process::threadCpuNanos()),
-        func_(std::move(func)) {}
+        cpuTimeStart_(process::threadCpuNanos()) {}
 
-  ~DeltaCpuWallTimer() {
+  CpuWallTiming elapsed() const {
     // NOTE: End the cpu-time timing first, and then end the wall-time timing,
     // so as to avoid the counter-intuitive phenomenon that the final calculated
     // cpu-time is slightly larger than the wall-time.
@@ -78,8 +73,7 @@ class DeltaCpuWallTimer {
         std::chrono::duration_cast<std::chrono::nanoseconds>(
             std::chrono::steady_clock::now() - wallTimeStart_)
             .count();
-    const CpuWallTiming deltaTiming{1, wallTimeDuration, cpuTimeDuration};
-    func_(deltaTiming);
+    return CpuWallTiming{1, wallTimeDuration, cpuTimeDuration};
   }
 
  private:
@@ -87,6 +81,22 @@ class DeltaCpuWallTimer {
   // counting earlier than cpu-time.
   const std::chrono::steady_clock::time_point wallTimeStart_;
   const uint64_t cpuTimeStart_;
+};
+
+/// Composes delta CpuWallTiming upon destruction and passes it to the user
+/// callback, where it can be added to the user's CpuWallTiming using
+/// CpuWallTiming::add().
+template <typename F>
+class DeltaCpuWallTimer {
+ public:
+  explicit DeltaCpuWallTimer(F&& func) : func_(std::move(func)) {}
+
+  ~DeltaCpuWallTimer() {
+    func_(timer_.elapsed());
+  }
+
+ private:
+  DeltaCpuWallTimeStopWatch timer_;
   F func_;
 };
 
diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp
index b818e17d2c00..d730d39e8d81 100644
--- a/velox/exec/Driver.cpp
+++ b/velox/exec/Driver.cpp
@@ -399,20 +399,22 @@ size_t OpCallStatusRaw::callDuration() const {
       : fmt::format("null::{}", operatorMethod);
 }
 
-CpuWallTiming Driver::processLazyTiming(
+CpuWallTiming Driver::processLazyIoStats(
     Operator& op,
     const CpuWallTiming& timing) {
   if (&op == operators_[0].get()) {
     return timing;
   }
   auto lockStats = op.stats().wlock();
+
+  // Checks and tries to update cpu time from lazy loads.
   auto it = lockStats->runtimeStats.find(LazyVector::kCpuNanos);
   if (it == lockStats->runtimeStats.end()) {
     // Return early if no lazy activity. Lazy CPU and wall times are recorded
     // together, checking one is enough.
     return timing;
   }
-  int64_t cpu = it->second.sum;
+  const int64_t cpu = it->second.sum;
   auto cpuDelta = std::max<int64_t>(0, cpu - lockStats->lastLazyCpuNanos);
   if (cpuDelta == 0) {
     // Return early if no change. Checking one counter is enough. If this did
@@ -421,15 +423,29 @@ CpuWallTiming Driver::processLazyTiming(
     return timing;
   }
   lockStats->lastLazyCpuNanos = cpu;
+
+  // Checks and tries to update wall time from lazy loads.
   int64_t wallDelta = 0;
   it = lockStats->runtimeStats.find(LazyVector::kWallNanos);
   if (it != lockStats->runtimeStats.end()) {
-    int64_t wall = it->second.sum;
+    const int64_t wall = it->second.sum;
     wallDelta = std::max<int64_t>(0, wall - lockStats->lastLazyWallNanos);
     if (wallDelta > 0) {
       lockStats->lastLazyWallNanos = wall;
     }
   }
+
+  // Checks and tries to update input bytes from lazy loads.
+  int64_t inputBytesDelta = 0;
+  it = lockStats->runtimeStats.find(LazyVector::kInputBytes);
+  if (it != lockStats->runtimeStats.end()) {
+    const int64_t inputBytes = it->second.sum;
+    inputBytesDelta = inputBytes - lockStats->lastLazyInputBytes;
+    if (inputBytesDelta > 0) {
+      lockStats->lastLazyInputBytes = inputBytes;
+    }
+  }
+
   lockStats.unlock();
   cpuDelta = std::min<int64_t>(cpuDelta, timing.cpuNanos);
   wallDelta = std::min<int64_t>(wallDelta, timing.wallNanos);
@@ -439,6 +455,11 @@ CpuWallTiming Driver::processLazyTiming(
       static_cast<uint64_t>(wallDelta),
       static_cast<uint64_t>(cpuDelta),
   });
+  // Re-acquire the stats lock: 'lockStats' was unlocked above, and the lazy IO
+  // input bytes are credited to the source operator, like the timing above.
+  lockStats = operators_[0]->stats().wlock();
+  lockStats->inputBytes += inputBytesDelta;
+  lockStats->outputBytes += inputBytesDelta;
   return CpuWallTiming{
       1,
       timing.wallNanos - wallDelta,
@@ -1016,7 +1037,7 @@ void Driver::withDeltaCpuWallTimer(
   // opTimingMember upon destruction of the timer when withDeltaCpuWallTimer
   // ends. The timer is created on the stack to avoid heap allocation
   auto f = [op, opTimingMember, this](const CpuWallTiming& elapsedTime) {
-    auto elapsedSelfTime = processLazyTiming(*op, elapsedTime);
+    auto elapsedSelfTime = processLazyIoStats(*op, elapsedTime);
     op->stats().withWLock([&](auto& lockedStats) {
       (lockedStats.*opTimingMember).add(elapsedSelfTime);
     });
diff --git a/velox/exec/Driver.h b/velox/exec/Driver.h
index 6563f204b17f..a1f9d087321d 100644
--- a/velox/exec/Driver.h
+++ b/velox/exec/Driver.h
@@ -532,13 +532,12 @@ class Driver : public std::enable_shared_from_this<Driver> {
       TimingMemberPtr opTimingMember,
       Func&& opFunction);
 
-  // Adjusts 'timing' by removing the lazy load wall and CPU times
-  // accrued since last time timing information was recorded for
-  // 'op'. The accrued lazy load times are credited to the source
-  // operator of 'this'. The per-operator runtimeStats for lazy load
-  // are left in place to reflect which operator triggered the load
-  // but these do not bias the op's timing.
-  CpuWallTiming processLazyTiming(Operator& op, const CpuWallTiming& timing);
+  // Adjusts 'timing' by removing the lazy load wall time, CPU time, and input
+  // bytes accrued since last time timing information was recorded for 'op'. The
+  // accrued lazy load times are credited to the source operator of 'this'. The
+  // per-operator runtimeStats for lazy load are left in place to reflect which
+  // operator triggered the load but these do not bias the op's timing.
+  CpuWallTiming processLazyIoStats(Operator& op, const CpuWallTiming& timing);
 
   inline void validateOperatorOutputResult(
       const RowVectorPtr& result,
diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h
index 1c56a3bfb294..711a59da0c71 100644
--- a/velox/exec/Operator.h
+++ b/velox/exec/Operator.h
@@ -177,6 +177,7 @@ struct OperatorStats {
   // Last recorded values for lazy loading times for loads triggered by 'this'.
   int64_t lastLazyCpuNanos{0};
   int64_t lastLazyWallNanos{0};
+  int64_t lastLazyInputBytes{0};
 
   // Total null keys processed by the operator.
   // Currently populated only by HashJoin/HashBuild.
diff --git a/velox/exec/tests/PrintPlanWithStatsTest.cpp b/velox/exec/tests/PrintPlanWithStatsTest.cpp
index 9b52725c3fee..3539c301341b 100644
--- a/velox/exec/tests/PrintPlanWithStatsTest.cpp
+++ b/velox/exec/tests/PrintPlanWithStatsTest.cpp
@@ -152,6 +152,7 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) {
      {{"-- Project\\[4\\]\\[expressions: \\(c0:INTEGER, ROW\\[\"c0\"\\]\\), \\(p1:BIGINT, plus\\(ROW\\[\"c1\"\\],1\\)\\), \\(p2:BIGINT, plus\\(ROW\\[\"c1\"\\],ROW\\[\"u_c1\"\\]\\)\\)\\] -> c0:INTEGER, p1:BIGINT, p2:BIGINT"},
       {"   Output: 2000 rows \\(.+\\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1"},
       {"      dataSourceLazyCpuNanos[ ]* sum: .+, count: .+, min: .+, max: .+"},
+      {"      dataSourceLazyInputBytes[ ]* sum: .+, count: .+, min: .+, max: .+"},
       {"      dataSourceLazyWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"},
       {"      runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"},
       {"      runningFinishWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"},
@@ -269,6 +270,7 @@ TEST_F(PrintPlanWithStatsTest, partialAggregateWithTableScan) {
      {{"-- Aggregation\\[1\\]\\[PARTIAL \\[c5\\] a0 := max\\(ROW\\[\"c0\"\\]\\), a1 := sum\\(ROW\\[\"c1\"\\]\\), a2 := sum\\(ROW\\[\"c2\"\\]\\), a3 := sum\\(ROW\\[\"c3\"\\]\\), a4 := sum\\(ROW\\[\"c4\"\\]\\)\\] -> c5:VARCHAR, a0:BIGINT, a1:BIGINT, a2:BIGINT, a3:DOUBLE, a4:DOUBLE"},
       {"   Output: .+, Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1"},
       {"      dataSourceLazyCpuNanos\\s+sum: .+, count: .+, min: .+, max: .+"},
+      {"      dataSourceLazyInputBytes\\s+sum: .+, count: .+, min: .+, max: .+"},
       {"      dataSourceLazyWallNanos\\s+sum: .+, count: .+, min: .+, max: .+"},
       {"      distinctKey0\\s+sum: .+, count: 1, min: .+, max: .+"},
       {"      hashtable.capacity\\s+sum: 1252, count: 1, min: 1252, max: 1252"},
@@ -345,6 +347,7 @@ TEST_F(PrintPlanWithStatsTest, tableWriterWithTableScan) {
      {{R"(-- TableWrite\[1\]\[.+InsertTableHandle .+)"},
       {"   Output: .+, Physical written output: .+, Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1"},
       {"      dataSourceLazyCpuNanos\\s+sum: .+, count: .+, min: .+, max: .+"},
+      {"      dataSourceLazyInputBytes\\s+sum: .+, count: .+, min: .+, max: .+"},
       {"      dataSourceLazyWallNanos\\s+sum: .+, count: .+, min: .+, max: .+"},
       {"      numWrittenFiles\\s+sum: .+, count: 1, min: .+, max: .+"},
       {"      runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"},
diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp
index db96a5089460..85643fd4134d 100644
--- a/velox/exec/tests/TableScanTest.cpp
+++ b/velox/exec/tests/TableScanTest.cpp
@@ -3815,7 +3815,12 @@ TEST_F(TableScanTest, structLazy) {
                 .project({"cardinality(c2.c0)"})
                 .planNode();
 
-  assertQuery(op, {filePath}, "select c0 % 3 from tmp");
+  auto task = assertQuery(op, {filePath}, "select c0 % 3 from tmp");
+
+  // Ensure lazy stats are attributed to table scan.
+  const auto stats = task->taskStats();
+  EXPECT_GT(stats.pipelineStats[0].operatorStats[0].inputBytes, 0);
+  EXPECT_GT(stats.pipelineStats[0].operatorStats[0].outputBytes, 0);
 }
 
 TEST_F(TableScanTest, interleaveLazyEager) {
diff --git a/velox/vector/LazyVector.cpp b/velox/vector/LazyVector.cpp
index d0cc09c3d59e..a614ead8452c 100644
--- a/velox/vector/LazyVector.cpp
+++ b/velox/vector/LazyVector.cpp
@@ -23,16 +23,37 @@
 #include "velox/vector/SelectivityVector.h"
 
 namespace facebook::velox {
-
 namespace {
-void writeIOTiming(const CpuWallTiming& delta) {
-  addThreadLocalRuntimeStat(
-      LazyVector::kWallNanos,
-      RuntimeCounter(delta.wallNanos, RuntimeCounter::Unit::kNanos));
-  addThreadLocalRuntimeStat(
-      LazyVector::kCpuNanos,
-      RuntimeCounter(delta.cpuNanos, RuntimeCounter::Unit::kNanos));
-}
+
+// Convenience class to record cpu and wall time from construction, updating
+// thread local stats at destruction, including input bytes of the vector passed
+// as parameter.
+class LazyIoStatsRecorder {
+ public:
+  LazyIoStatsRecorder(VectorPtr* vector) : vector_(vector) {}
+
+  ~LazyIoStatsRecorder() {
+    auto cpuDelta = timer_.elapsed();
+    addThreadLocalRuntimeStat(
+        LazyVector::kWallNanos,
+        RuntimeCounter(cpuDelta.wallNanos, RuntimeCounter::Unit::kNanos));
+    addThreadLocalRuntimeStat(
+        LazyVector::kCpuNanos,
+        RuntimeCounter(cpuDelta.cpuNanos, RuntimeCounter::Unit::kNanos));
+
+    if (*vector_) {
+      addThreadLocalRuntimeStat(
+          LazyVector::kInputBytes,
+          RuntimeCounter(
+              (*vector_)->estimateFlatSize(), RuntimeCounter::Unit::kBytes));
+    }
+  }
+
+ private:
+  DeltaCpuWallTimeStopWatch timer_;
+  VectorPtr* vector_;
+};
+
 } // namespace
 
 void VectorLoader::load(
@@ -41,7 +62,7 @@
     vector_size_t resultSize,
     VectorPtr* result) {
   {
-    DeltaCpuWallTimer timer([&](auto& delta) { writeIOTiming(delta); });
+    LazyIoStatsRecorder recorder(result);
     loadInternal(rows, hook, resultSize, result);
   }
   if (hook) {
diff --git a/velox/vector/LazyVector.h b/velox/vector/LazyVector.h
index 30354af615ab..3957002a1d31 100644
--- a/velox/vector/LazyVector.h
+++ b/velox/vector/LazyVector.h
@@ -224,6 +224,8 @@ class LazyVector : public BaseVector {
  public:
   static constexpr const char* kCpuNanos = "dataSourceLazyCpuNanos";
   static constexpr const char* kWallNanos = "dataSourceLazyWallNanos";
+  static constexpr const char* kInputBytes = "dataSourceLazyInputBytes";
+
   LazyVector(
       velox::memory::MemoryPool* pool,
       TypePtr type,
diff --git a/velox/vector/tests/LazyVectorTest.cpp b/velox/vector/tests/LazyVectorTest.cpp
index 2ad3cbc8e446..aee10a50bc42 100644
--- a/velox/vector/tests/LazyVectorTest.cpp
+++ b/velox/vector/tests/LazyVectorTest.cpp
@@ -639,9 +639,11 @@ TEST_F(LazyVectorTest, runtimeStats) {
   std::sort(stats.begin(), stats.end(), [](auto& x, auto& y) {
     return x.first < y.first;
   });
-  ASSERT_EQ(stats.size(), 2);
+  ASSERT_EQ(stats.size(), 3);
   ASSERT_EQ(stats[0].first, LazyVector::kCpuNanos);
   ASSERT_GE(stats[0].second.value, 0);
-  ASSERT_EQ(stats[1].first, LazyVector::kWallNanos);
+  ASSERT_EQ(stats[1].first, LazyVector::kInputBytes);
   ASSERT_GE(stats[1].second.value, 0);
+  ASSERT_EQ(stats[2].first, LazyVector::kWallNanos);
+  ASSERT_GE(stats[2].second.value, 0);
 }